This commit is contained in:
Your Name
2026-02-14 16:50:02 +08:00
parent 0ac1e9549c
commit 2e143fb0c0
14 changed files with 588 additions and 0 deletions

8
server/apps.py Normal file
View File

@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
from django.apps import AppConfig
class ServerConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "server"
verbose_name = "BOSS 直聘自动化服务"

20
server/asgi.py Normal file
View File

@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
"""
ASGI 入口HTTP (Django) + WebSocket (Channels)。
"""
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
import django # noqa: E402
django.setup() # noqa: E402
from channels.routing import ProtocolTypeRouter, URLRouter # noqa: E402
from django.core.asgi import get_asgi_application # noqa: E402
from server.ws.routing import websocket_urlpatterns # noqa: E402
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": URLRouter(websocket_urlpatterns),
})

View File

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
"""
DRF 自定义认证后端:从 Cookie 中读取 auth_token 校验。
"""
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
class TokenUser:
"""轻量用户对象(不依赖 Django auth 模块)。"""
def __init__(self, username: str):
self.username = username
self.is_authenticated = True
def __str__(self):
return self.username
class CookieTokenAuthentication(BaseAuthentication):
"""
从请求 Cookie 中读取 auth_token查数据库校验。
"""
def authenticate(self, request):
token = request.COOKIES.get("auth_token")
if not token:
return None # 未携带 token交给权限类处理
# 延迟导入避免循环依赖
from server.models import AuthToken
try:
row = AuthToken.objects.get(token=token)
except AuthToken.DoesNotExist:
raise AuthenticationFailed("登录已失效,请重新登录")
return (TokenUser(row.username), token)

View File

@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
"""
DRF 自定义异常处理器:统一错误响应格式。
"""
from rest_framework.views import exception_handler
from rest_framework.response import Response
def custom_exception_handler(exc, context):
response = exception_handler(exc, context)
if response is not None:
# 统一格式:{"detail": "..."}
if isinstance(response.data, list):
response.data = {"detail": response.data[0] if response.data else str(exc)}
elif isinstance(response.data, dict) and "detail" not in response.data:
response.data = {"detail": str(response.data)}
return response

15
server/manage.py Normal file
View File

@@ -0,0 +1,15 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Django 管理入口。"""
import os
import sys
def main():
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)
if __name__ == "__main__":
main()

22
server/middleware.py Normal file
View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
自定义中间件。
"""
class CorsMiddleware:
"""简易 CORS 中间件,允许所有跨域请求。"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
response["Access-Control-Allow-Origin"] = "*"
response["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
response["Access-Control-Allow-Headers"] = "*"
response["Access-Control-Allow-Credentials"] = "true"
if request.method == "OPTIONS":
response.status_code = 200
response.content = b""
return response

View File

@@ -0,0 +1,69 @@
# Generated by Django 6.0.2 on 2026-02-14 16:02
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='AuthToken',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('username', models.CharField(max_length=64, unique=True, verbose_name='用户名')),
('token', models.CharField(max_length=64, verbose_name='当前有效 token')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
],
options={
'verbose_name': '登录 Token',
'verbose_name_plural': '登录 Token',
'db_table': 'auth_token',
},
),
migrations.CreateModel(
name='TaskLog',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('task_id', models.CharField(max_length=32, unique=True, verbose_name='任务 ID')),
('task_type', models.CharField(max_length=64, verbose_name='任务类型')),
('worker_id', models.CharField(default='', max_length=64, verbose_name='执行的 Worker')),
('status', models.CharField(default='', max_length=32, verbose_name='最终状态')),
('params', models.JSONField(blank=True, null=True, verbose_name='任务参数')),
('result', models.JSONField(blank=True, null=True, verbose_name='任务结果')),
('error', models.TextField(blank=True, null=True, verbose_name='错误信息')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
],
options={
'verbose_name': '任务日志',
'verbose_name_plural': '任务日志',
'db_table': 'task_log',
},
),
migrations.CreateModel(
name='BossAccount',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('worker_id', models.CharField(max_length=64, verbose_name='Worker 标识')),
('browser_id', models.CharField(default='', max_length=128, verbose_name='比特浏览器窗口 ID')),
('browser_name', models.CharField(default='', max_length=128, verbose_name='比特浏览器窗口名称(环境名)')),
('boss_username', models.CharField(default='', max_length=128, verbose_name='BOSS 直聘用户名')),
('is_logged_in', models.BooleanField(default=False, verbose_name='是否已登录')),
('current_task_id', models.CharField(blank=True, max_length=32, null=True, verbose_name='当前检测任务 ID')),
('current_task_status', models.CharField(blank=True, max_length=32, null=True, verbose_name='当前检测任务状态')),
('checked_at', models.DateTimeField(blank=True, null=True, verbose_name='最近一次检测时间')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
('updated_at', models.DateTimeField(auto_now=True, verbose_name='更新时间')),
],
options={
'verbose_name': 'BOSS 账号',
'verbose_name_plural': 'BOSS 账号',
'db_table': 'boss_account',
'unique_together': {('worker_id', 'browser_id')},
},
),
]

View File

@@ -0,0 +1 @@
# -*- coding: utf-8 -*-

83
server/serializers.py Normal file
View File

@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
"""
DRF 序列化器。
"""
from rest_framework import serializers
from server.models import BossAccount, TaskLog
# ────────────────────────── 账号 ──────────────────────────
class BossAccountSerializer(serializers.ModelSerializer):
"""账号序列化器(读取用)。"""
# 运行时补充的字段(非数据库字段)
worker_name = serializers.CharField(read_only=True, default="")
worker_online = serializers.BooleanField(read_only=True, default=False)
class Meta:
model = BossAccount
fields = [
"id", "worker_id", "browser_id", "browser_name",
"boss_username", "is_logged_in",
"current_task_id", "current_task_status",
"checked_at", "created_at", "updated_at",
"worker_name", "worker_online",
]
class AccountBindSerializer(serializers.Serializer):
"""添加账号请求。"""
browser_name = serializers.CharField(max_length=128)
worker_id = serializers.CharField(max_length=64)
# ────────────────────────── 任务 ──────────────────────────
class TaskCreateSerializer(serializers.Serializer):
"""提交任务请求。"""
task_type = serializers.CharField(max_length=64)
worker_id = serializers.CharField(max_length=64, required=False, allow_blank=True, default="")
account_name = serializers.CharField(max_length=128, required=False, allow_blank=True, default="")
params = serializers.JSONField(required=False, default=dict)
class TaskOutSerializer(serializers.Serializer):
"""任务信息响应。"""
task_id = serializers.CharField()
task_type = serializers.CharField()
status = serializers.CharField()
worker_id = serializers.CharField(allow_null=True)
account_name = serializers.CharField(allow_null=True)
params = serializers.DictField()
progress = serializers.CharField(allow_null=True)
result = serializers.JSONField(allow_null=True)
error = serializers.CharField(allow_null=True)
created_at = serializers.FloatField()
updated_at = serializers.FloatField()
# ────────────────────────── Worker ──────────────────────────
class BrowserProfileSerializer(serializers.Serializer):
"""浏览器窗口信息。"""
id = serializers.CharField()
name = serializers.CharField()
remark = serializers.CharField()
class WorkerOutSerializer(serializers.Serializer):
"""Worker 信息响应。"""
worker_id = serializers.CharField()
worker_name = serializers.CharField()
browsers = BrowserProfileSerializer(many=True)
online = serializers.BooleanField()
current_task_id = serializers.CharField(allow_null=True)
# ────────────────────────── 认证 ──────────────────────────
class LoginSerializer(serializers.Serializer):
"""登录请求。"""
username = serializers.CharField(max_length=64)
password = serializers.CharField(max_length=128)

82
server/settings.py Normal file
View File

@@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
"""
Django 配置文件。
"""
import os
import sys
from server.config import DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME
# ─── 基础 ───
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SECRET_KEY = os.getenv("DJANGO_SECRET_KEY", "boss-dp-secret-key-change-in-production")
DEBUG = os.getenv("DJANGO_DEBUG", "True").lower() in ("true", "1", "yes")
ALLOWED_HOSTS = ["*"]
# ─── 应用 ───
INSTALLED_APPS = [
"django.contrib.contenttypes",
"rest_framework",
"channels",
"server",
]
# ─── 中间件 ───
MIDDLEWARE = [
"django.middleware.common.CommonMiddleware",
"server.middleware.CorsMiddleware",
]
# ─── URL / ASGI ───
ROOT_URLCONF = "server.urls"
ASGI_APPLICATION = "server.asgi.application"
# ─── 数据库 ───
DATABASES = {
"default": {
"ENGINE": "django.db.backends.mysql",
"NAME": DB_NAME,
"USER": DB_USER,
"PASSWORD": DB_PASSWORD,
"HOST": DB_HOST,
"PORT": str(DB_PORT),
"OPTIONS": {
"charset": "utf8mb4",
},
}
}
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
# ─── DRF ───
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": [
"server.core.authentication.CookieTokenAuthentication",
],
"DEFAULT_PERMISSION_CLASSES": [
"rest_framework.permissions.IsAuthenticated",
],
"UNAUTHENTICATED_USER": None,
"DEFAULT_RENDERER_CLASSES": [
"rest_framework.renderers.JSONRenderer",
],
"DEFAULT_PARSER_CLASSES": [
"rest_framework.parsers.JSONParser",
"rest_framework.parsers.FormParser",
"rest_framework.parsers.MultiPartParser",
],
"EXCEPTION_HANDLER": "server.core.exception_handler.custom_exception_handler",
}
# ─── Channels ───
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
}
}
# ─── 时区 / 国际化 ───
LANGUAGE_CODE = "zh-hans"
TIME_ZONE = "Asia/Shanghai"
USE_I18N = True
USE_TZ = False

27
server/urls.py Normal file
View File

@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
"""
Django URL 路由配置。
"""
from django.urls import path
from server.api import auth, accounts, tasks, workers
urlpatterns = [
# ─── 健康检查 ───
path("health", workers.health_check),
# ─── 认证 ───
path("api/auth/login", auth.login),
# ─── Worker ───
path("api/workers", workers.worker_list),
path("api/workers/<str:worker_id>", workers.worker_detail),
# ─── 任务 ───
path("api/tasks", tasks.task_list),
path("api/tasks/<str:task_id>", tasks.task_detail),
# ─── 账号 ───
path("api/accounts", accounts.account_list),
path("api/accounts/<int:account_id>", accounts.account_detail),
]

1
server/ws/__init__.py Normal file
View File

@@ -0,0 +1 @@
# -*- coding: utf-8 -*-

194
server/ws/consumers.py Normal file
View File

@@ -0,0 +1,194 @@
# -*- coding: utf-8 -*-
"""
Django Channels WebSocket Consumer处理 Worker 连接。
"""
import asyncio
import json
import logging
from channels.generic.websocket import AsyncWebsocketConsumer
from common.protocol import MsgType, TaskStatus, TaskType, make_msg
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
logger = logging.getLogger("server.ws")
class WorkerConsumer(AsyncWebsocketConsumer):
"""
Worker 连接流程:
1. 建立连接
2. 等待第一条 register 消息
3. 持续收发消息(心跳、任务进度、任务结果等)
4. 断开时注销
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.worker_id = None
self._registered = False
async def connect(self):
await self.accept()
# 设置 30 秒注册超时
self._register_timeout = asyncio.get_event_loop().call_later(
30, lambda: asyncio.ensure_future(self._timeout_close())
)
async def _timeout_close(self):
if not self._registered:
logger.warning("WebSocket 连接超时(未在 30 秒内注册)")
await self.close(code=4003)
async def disconnect(self, close_code):
if hasattr(self, "_register_timeout"):
self._register_timeout.cancel()
if self.worker_id:
worker_manager.unregister(self.worker_id)
logger.info("Worker %s WebSocket 断开", self.worker_id)
async def _send_json(self, data: dict):
"""发送 JSON 消息(供 worker_manager 存储为 send_fn"""
await self.send(text_data=json.dumps(data, ensure_ascii=False))
async def receive(self, text_data=None, bytes_data=None):
if text_data is None:
return
try:
data = json.loads(text_data)
except json.JSONDecodeError:
return
msg_type = data.get("type", "")
# ── 未注册:只接受 register ──
if not self._registered:
if msg_type != MsgType.REGISTER.value:
await self._send_json(make_msg(MsgType.ERROR, detail="首条消息必须是 register"))
await self.close(code=4001)
return
self.worker_id = data.get("worker_id", "")
worker_name = data.get("worker_name", self.worker_id)
browsers = data.get("browsers", [])
if not self.worker_id:
await self._send_json(make_msg(MsgType.ERROR, detail="worker_id 不能为空"))
await self.close(code=4002)
return
worker_manager.register(self._send_json, self.worker_id, worker_name, browsers)
self._registered = True
if hasattr(self, "_register_timeout"):
self._register_timeout.cancel()
await self._send_json(make_msg(MsgType.REGISTER_ACK, worker_id=self.worker_id))
logger.info("Worker %s 已连接", self.worker_id)
return
# ── 已注册:处理各类消息 ──
if msg_type == MsgType.HEARTBEAT.value:
worker_manager.heartbeat(self.worker_id)
await self._send_json(make_msg(MsgType.HEARTBEAT_ACK))
elif msg_type == MsgType.BROWSER_LIST_UPDATE.value:
worker_manager.update_browsers(self.worker_id, data.get("browsers", []))
elif msg_type == MsgType.TASK_PROGRESS.value:
task_id = data.get("task_id", "")
progress = data.get("progress", "")
task_dispatcher.update_progress(task_id, progress)
logger.info("任务 %s 进度: %s", task_id, progress)
# 同步更新账号任务状态为 running
try:
self._update_account_task_status(task_id, TaskStatus.RUNNING.value)
except Exception:
pass
elif msg_type == MsgType.TASK_RESULT.value:
task_id = data.get("task_id", "")
result = data.get("result")
error = data.get("error")
task_dispatcher.complete_task(task_id, result=result, error=error)
worker_manager.set_current_task(self.worker_id, None)
logger.info("任务 %s 已完成", task_id)
# ── 将结果写入数据库 ──
try:
task_info = task_dispatcher.get_task(task_id)
if task_info:
final_status = task_info.status.value if hasattr(task_info.status, "value") else str(task_info.status)
self._save_task_log(task_id, task_info, result, error, final_status)
self._update_account_task_status(task_id, final_status)
# check_login 任务:更新账号登录状态
task_type_val = task_info.task_type.value if hasattr(task_info.task_type, "value") else str(task_info.task_type)
if task_type_val == TaskType.CHECK_LOGIN.value and result and not error:
self._upsert_account_status(result)
except Exception as db_err:
logger.error("任务 %s 写入数据库失败: %s", task_id, db_err)
else:
logger.warning("未知消息类型: %s (from %s)", msg_type, self.worker_id)
# ────────────────────────── 数据库操作(同步) ──────────────────────────
@staticmethod
def _save_task_log(task_id, task_info, result, error, final_status):
from server.models import TaskLog
TaskLog.objects.update_or_create(
task_id=task_id,
defaults={
"task_type": task_info.task_type.value if hasattr(task_info.task_type, "value") else str(task_info.task_type),
"worker_id": task_info.worker_id or "",
"status": final_status,
"params": task_info.params,
"result": result,
"error": error,
},
)
@staticmethod
def _update_account_task_status(task_id, task_status):
from server.models import BossAccount
BossAccount.objects.filter(current_task_id=task_id).update(current_task_status=task_status)
def _upsert_account_status(self, result):
from server.models import BossAccount
from django.utils import timezone as tz
browser_id = result.get("browser_id", "")
browser_name = result.get("browser_name", "")
boss_username = result.get("boss_username", "")
is_logged_in = result.get("is_logged_in", False)
# 优先按 worker_id + browser_name 匹配
account = None
if browser_name:
account = BossAccount.objects.filter(
worker_id=self.worker_id, browser_name=browser_name,
).first()
if not account and browser_id:
account = BossAccount.objects.filter(
worker_id=self.worker_id, browser_id=browser_id,
).first()
now = tz.now()
if account:
account.browser_id = browser_id or account.browser_id
account.browser_name = browser_name or account.browser_name
account.boss_username = boss_username
account.is_logged_in = is_logged_in
account.checked_at = now
account.save()
else:
BossAccount.objects.create(
worker_id=self.worker_id,
browser_id=browser_id or f"name:{browser_name}",
browser_name=browser_name,
boss_username=boss_username,
is_logged_in=is_logged_in,
checked_at=now,
)
logger.info(
"账号状态更新: worker=%s, browser=%s(%s), username=%s, logged_in=%s",
self.worker_id, browser_name, browser_id, boss_username, is_logged_in,
)

11
server/ws/routing.py Normal file
View File

@@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
"""
WebSocket URL 路由。
"""
from django.urls import re_path
from server.ws.consumers import WorkerConsumer
websocket_urlpatterns = [
re_path(r"^ws$", WorkerConsumer.as_asgi()),
]