diff --git a/server/apps.py b/server/apps.py new file mode 100644 index 0000000..739d858 --- /dev/null +++ b/server/apps.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +from django.apps import AppConfig + + +class ServerConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "server" + verbose_name = "BOSS 直聘自动化服务" diff --git a/server/asgi.py b/server/asgi.py new file mode 100644 index 0000000..79028b6 --- /dev/null +++ b/server/asgi.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +""" +ASGI 入口:HTTP (Django) + WebSocket (Channels)。 +""" +import os + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings") + +import django # noqa: E402 +django.setup() # noqa: E402 + +from channels.routing import ProtocolTypeRouter, URLRouter # noqa: E402 +from django.core.asgi import get_asgi_application # noqa: E402 + +from server.ws.routing import websocket_urlpatterns # noqa: E402 + +application = ProtocolTypeRouter({ + "http": get_asgi_application(), + "websocket": URLRouter(websocket_urlpatterns), +}) diff --git a/server/core/authentication.py b/server/core/authentication.py new file mode 100644 index 0000000..81c2d79 --- /dev/null +++ b/server/core/authentication.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +""" +DRF 自定义认证后端:从 Cookie 中读取 auth_token 校验。 +""" +from rest_framework.authentication import BaseAuthentication +from rest_framework.exceptions import AuthenticationFailed + + +class TokenUser: + """轻量用户对象(不依赖 Django auth 模块)。""" + + def __init__(self, username: str): + self.username = username + self.is_authenticated = True + + def __str__(self): + return self.username + + +class CookieTokenAuthentication(BaseAuthentication): + """ + 从请求 Cookie 中读取 auth_token,查数据库校验。 + """ + + def authenticate(self, request): + token = request.COOKIES.get("auth_token") + if not token: + return None # 未携带 token,交给权限类处理 + + # 延迟导入避免循环依赖 + from server.models import AuthToken + + try: + row = AuthToken.objects.get(token=token) + except AuthToken.DoesNotExist: + raise AuthenticationFailed("登录已失效,请重新登录") + + return (TokenUser(row.username), token) diff --git a/server/core/exception_handler.py b/server/core/exception_handler.py new file mode 100644 index 0000000..3d05357 --- /dev/null +++ b/server/core/exception_handler.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +""" +DRF 自定义异常处理器:统一错误响应格式。 +""" +from rest_framework.views import exception_handler +from rest_framework.response import Response + + +def custom_exception_handler(exc, context): + response = exception_handler(exc, context) + if response is not None: + # 统一格式:{"detail": "..."} + if isinstance(response.data, list): + response.data = {"detail": response.data[0] if response.data else str(exc)} + elif isinstance(response.data, dict) and "detail" not in response.data: + response.data = {"detail": str(response.data)} + return response diff --git a/server/manage.py b/server/manage.py new file mode 100644 index 0000000..3e717e5 --- /dev/null +++ b/server/manage.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Django 管理入口。""" +import os +import sys + + +def main(): + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings") + from django.core.management import execute_from_command_line + execute_from_command_line(sys.argv) + + +if __name__ == "__main__": + main() diff --git a/server/middleware.py b/server/middleware.py new file mode 100644 index 0000000..4f479a5 --- /dev/null +++ b/server/middleware.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +""" +自定义中间件。 +""" + + +class CorsMiddleware: + """简易 CORS 中间件,允许所有跨域请求。""" + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + response = self.get_response(request) + response["Access-Control-Allow-Origin"] = "*" + response["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS" + response["Access-Control-Allow-Headers"] = "*" + response["Access-Control-Allow-Credentials"] = "true" + if request.method == "OPTIONS": + response.status_code = 200 + response.content = b"" + return response diff --git a/server/migrations/0001_initial.py b/server/migrations/0001_initial.py new file mode 100644 index 0000000..a1afa5d --- /dev/null +++ b/server/migrations/0001_initial.py @@ -0,0 +1,69 @@ +# Generated by Django 6.0.2 on 2026-02-14 16:02 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='AuthToken', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('username', models.CharField(max_length=64, unique=True, verbose_name='用户名')), + ('token', models.CharField(max_length=64, verbose_name='当前有效 token')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')), + ], + options={ + 'verbose_name': '登录 Token', + 'verbose_name_plural': '登录 Token', + 'db_table': 'auth_token', + }, + ), + migrations.CreateModel( + name='TaskLog', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('task_id', models.CharField(max_length=32, unique=True, verbose_name='任务 ID')), + ('task_type', models.CharField(max_length=64, verbose_name='任务类型')), + ('worker_id', models.CharField(default='', max_length=64, verbose_name='执行的 Worker')), + ('status', models.CharField(default='', max_length=32, verbose_name='最终状态')), + ('params', models.JSONField(blank=True, null=True, verbose_name='任务参数')), + ('result', models.JSONField(blank=True, null=True, verbose_name='任务结果')), + ('error', models.TextField(blank=True, null=True, verbose_name='错误信息')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')), + ], + options={ + 'verbose_name': '任务日志', + 'verbose_name_plural': '任务日志', + 'db_table': 'task_log', + }, + ), + migrations.CreateModel( + name='BossAccount', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('worker_id', models.CharField(max_length=64, verbose_name='Worker 标识')), + ('browser_id', models.CharField(default='', max_length=128, verbose_name='比特浏览器窗口 ID')), + ('browser_name', models.CharField(default='', max_length=128, verbose_name='比特浏览器窗口名称(环境名)')), + ('boss_username', models.CharField(default='', max_length=128, verbose_name='BOSS 直聘用户名')), + ('is_logged_in', models.BooleanField(default=False, verbose_name='是否已登录')), + ('current_task_id', models.CharField(blank=True, max_length=32, null=True, verbose_name='当前检测任务 ID')), + ('current_task_status', models.CharField(blank=True, max_length=32, null=True, verbose_name='当前检测任务状态')), + ('checked_at', models.DateTimeField(blank=True, null=True, verbose_name='最近一次检测时间')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='更新时间')), + ], + options={ + 'verbose_name': 'BOSS 账号', + 'verbose_name_plural': 'BOSS 账号', + 'db_table': 'boss_account', + 'unique_together': {('worker_id', 'browser_id')}, + }, + ), + ] diff --git a/server/migrations/__init__.py b/server/migrations/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/server/migrations/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/server/serializers.py b/server/serializers.py new file mode 100644 index 0000000..c198e48 --- /dev/null +++ b/server/serializers.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +""" +DRF 序列化器。 +""" +from rest_framework import serializers + +from server.models import BossAccount, TaskLog + + +# ────────────────────────── 账号 ────────────────────────── + +class BossAccountSerializer(serializers.ModelSerializer): + """账号序列化器(读取用)。""" + # 运行时补充的字段(非数据库字段) + worker_name = serializers.CharField(read_only=True, default="") + worker_online = serializers.BooleanField(read_only=True, default=False) + + class Meta: + model = BossAccount + fields = [ + "id", "worker_id", "browser_id", "browser_name", + "boss_username", "is_logged_in", + "current_task_id", "current_task_status", + "checked_at", "created_at", "updated_at", + "worker_name", "worker_online", + ] + + +class AccountBindSerializer(serializers.Serializer): + """添加账号请求。""" + browser_name = serializers.CharField(max_length=128) + worker_id = serializers.CharField(max_length=64) + + +# ────────────────────────── 任务 ────────────────────────── + +class TaskCreateSerializer(serializers.Serializer): + """提交任务请求。""" + task_type = serializers.CharField(max_length=64) + worker_id = serializers.CharField(max_length=64, required=False, allow_blank=True, default="") + account_name = serializers.CharField(max_length=128, required=False, allow_blank=True, default="") + params = serializers.JSONField(required=False, default=dict) + + +class TaskOutSerializer(serializers.Serializer): + """任务信息响应。""" + task_id = serializers.CharField() + task_type = serializers.CharField() + status = serializers.CharField() + worker_id = serializers.CharField(allow_null=True) + account_name = serializers.CharField(allow_null=True) + params = serializers.DictField() + progress = serializers.CharField(allow_null=True) + result = serializers.JSONField(allow_null=True) + error = serializers.CharField(allow_null=True) + created_at = serializers.FloatField() + updated_at = serializers.FloatField() + + +# ────────────────────────── Worker ────────────────────────── + +class BrowserProfileSerializer(serializers.Serializer): + """浏览器窗口信息。""" + id = serializers.CharField() + name = serializers.CharField() + remark = serializers.CharField() + + +class WorkerOutSerializer(serializers.Serializer): + """Worker 信息响应。""" + worker_id = serializers.CharField() + worker_name = serializers.CharField() + browsers = BrowserProfileSerializer(many=True) + online = serializers.BooleanField() + current_task_id = serializers.CharField(allow_null=True) + + +# ────────────────────────── 认证 ────────────────────────── + +class LoginSerializer(serializers.Serializer): + """登录请求。""" + username = serializers.CharField(max_length=64) + password = serializers.CharField(max_length=128) diff --git a/server/settings.py b/server/settings.py new file mode 100644 index 0000000..15bd156 --- /dev/null +++ b/server/settings.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +""" +Django 配置文件。 +""" +import os +import sys + +from server.config import DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME + +# ─── 基础 ─── +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SECRET_KEY = os.getenv("DJANGO_SECRET_KEY", "boss-dp-secret-key-change-in-production") +DEBUG = os.getenv("DJANGO_DEBUG", "True").lower() in ("true", "1", "yes") +ALLOWED_HOSTS = ["*"] + +# ─── 应用 ─── +INSTALLED_APPS = [ + "django.contrib.contenttypes", + "rest_framework", + "channels", + "server", +] + +# ─── 中间件 ─── +MIDDLEWARE = [ + "django.middleware.common.CommonMiddleware", + "server.middleware.CorsMiddleware", +] + +# ─── URL / ASGI ─── +ROOT_URLCONF = "server.urls" +ASGI_APPLICATION = "server.asgi.application" + +# ─── 数据库 ─── +DATABASES = { + "default": { + "ENGINE": "django.db.backends.mysql", + "NAME": DB_NAME, + "USER": DB_USER, + "PASSWORD": DB_PASSWORD, + "HOST": DB_HOST, + "PORT": str(DB_PORT), + "OPTIONS": { + "charset": "utf8mb4", + }, + } +} + +DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" + +# ─── DRF ─── +REST_FRAMEWORK = { + "DEFAULT_AUTHENTICATION_CLASSES": [ + "server.core.authentication.CookieTokenAuthentication", + ], + "DEFAULT_PERMISSION_CLASSES": [ + "rest_framework.permissions.IsAuthenticated", + ], + "UNAUTHENTICATED_USER": None, + "DEFAULT_RENDERER_CLASSES": [ + "rest_framework.renderers.JSONRenderer", + ], + "DEFAULT_PARSER_CLASSES": [ + "rest_framework.parsers.JSONParser", + "rest_framework.parsers.FormParser", + "rest_framework.parsers.MultiPartParser", + ], + "EXCEPTION_HANDLER": "server.core.exception_handler.custom_exception_handler", +} + +# ─── Channels ─── +CHANNEL_LAYERS = { + "default": { + "BACKEND": "channels.layers.InMemoryChannelLayer", + } +} + +# ─── 时区 / 国际化 ─── +LANGUAGE_CODE = "zh-hans" +TIME_ZONE = "Asia/Shanghai" +USE_I18N = True +USE_TZ = False diff --git a/server/urls.py b/server/urls.py new file mode 100644 index 0000000..fe286a0 --- /dev/null +++ b/server/urls.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +""" +Django URL 路由配置。 +""" +from django.urls import path + +from server.api import auth, accounts, tasks, workers + +urlpatterns = [ + # ─── 健康检查 ─── + path("health", workers.health_check), + + # ─── 认证 ─── + path("api/auth/login", auth.login), + + # ─── Worker ─── + path("api/workers", workers.worker_list), + path("api/workers/", workers.worker_detail), + + # ─── 任务 ─── + path("api/tasks", tasks.task_list), + path("api/tasks/", tasks.task_detail), + + # ─── 账号 ─── + path("api/accounts", accounts.account_list), + path("api/accounts/", accounts.account_detail), +] diff --git a/server/ws/__init__.py b/server/ws/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/server/ws/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/server/ws/consumers.py b/server/ws/consumers.py new file mode 100644 index 0000000..7505f7c --- /dev/null +++ b/server/ws/consumers.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +""" +Django Channels WebSocket Consumer:处理 Worker 连接。 +""" +import asyncio +import json +import logging + +from channels.generic.websocket import AsyncWebsocketConsumer + +from common.protocol import MsgType, TaskStatus, TaskType, make_msg +from server.core.worker_manager import worker_manager +from server.core.task_dispatcher import task_dispatcher + +logger = logging.getLogger("server.ws") + + +class WorkerConsumer(AsyncWebsocketConsumer): + """ + Worker 连接流程: + 1. 建立连接 + 2. 等待第一条 register 消息 + 3. 持续收发消息(心跳、任务进度、任务结果等) + 4. 断开时注销 + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.worker_id = None + self._registered = False + + async def connect(self): + await self.accept() + # 设置 30 秒注册超时 + self._register_timeout = asyncio.get_event_loop().call_later( + 30, lambda: asyncio.ensure_future(self._timeout_close()) + ) + + async def _timeout_close(self): + if not self._registered: + logger.warning("WebSocket 连接超时(未在 30 秒内注册)") + await self.close(code=4003) + + async def disconnect(self, close_code): + if hasattr(self, "_register_timeout"): + self._register_timeout.cancel() + if self.worker_id: + worker_manager.unregister(self.worker_id) + logger.info("Worker %s WebSocket 断开", self.worker_id) + + async def _send_json(self, data: dict): + """发送 JSON 消息(供 worker_manager 存储为 send_fn)。""" + await self.send(text_data=json.dumps(data, ensure_ascii=False)) + + async def receive(self, text_data=None, bytes_data=None): + if text_data is None: + return + try: + data = json.loads(text_data) + except json.JSONDecodeError: + return + + msg_type = data.get("type", "") + + # ── 未注册:只接受 register ── + if not self._registered: + if msg_type != MsgType.REGISTER.value: + await self._send_json(make_msg(MsgType.ERROR, detail="首条消息必须是 register")) + await self.close(code=4001) + return + + self.worker_id = data.get("worker_id", "") + worker_name = data.get("worker_name", self.worker_id) + browsers = data.get("browsers", []) + + if not self.worker_id: + await self._send_json(make_msg(MsgType.ERROR, detail="worker_id 不能为空")) + await self.close(code=4002) + return + + worker_manager.register(self._send_json, self.worker_id, worker_name, browsers) + self._registered = True + if hasattr(self, "_register_timeout"): + self._register_timeout.cancel() + await self._send_json(make_msg(MsgType.REGISTER_ACK, worker_id=self.worker_id)) + logger.info("Worker %s 已连接", self.worker_id) + return + + # ── 已注册:处理各类消息 ── + if msg_type == MsgType.HEARTBEAT.value: + worker_manager.heartbeat(self.worker_id) + await self._send_json(make_msg(MsgType.HEARTBEAT_ACK)) + + elif msg_type == MsgType.BROWSER_LIST_UPDATE.value: + worker_manager.update_browsers(self.worker_id, data.get("browsers", [])) + + elif msg_type == MsgType.TASK_PROGRESS.value: + task_id = data.get("task_id", "") + progress = data.get("progress", "") + task_dispatcher.update_progress(task_id, progress) + logger.info("任务 %s 进度: %s", task_id, progress) + # 同步更新账号任务状态为 running + try: + self._update_account_task_status(task_id, TaskStatus.RUNNING.value) + except Exception: + pass + + elif msg_type == MsgType.TASK_RESULT.value: + task_id = data.get("task_id", "") + result = data.get("result") + error = data.get("error") + task_dispatcher.complete_task(task_id, result=result, error=error) + worker_manager.set_current_task(self.worker_id, None) + logger.info("任务 %s 已完成", task_id) + + # ── 将结果写入数据库 ── + try: + task_info = task_dispatcher.get_task(task_id) + if task_info: + final_status = task_info.status.value if hasattr(task_info.status, "value") else str(task_info.status) + self._save_task_log(task_id, task_info, result, error, final_status) + self._update_account_task_status(task_id, final_status) + # check_login 任务:更新账号登录状态 + task_type_val = task_info.task_type.value if hasattr(task_info.task_type, "value") else str(task_info.task_type) + if task_type_val == TaskType.CHECK_LOGIN.value and result and not error: + self._upsert_account_status(result) + except Exception as db_err: + logger.error("任务 %s 写入数据库失败: %s", task_id, db_err) + + else: + logger.warning("未知消息类型: %s (from %s)", msg_type, self.worker_id) + + # ────────────────────────── 数据库操作(同步) ────────────────────────── + + @staticmethod + def _save_task_log(task_id, task_info, result, error, final_status): + from server.models import TaskLog + TaskLog.objects.update_or_create( + task_id=task_id, + defaults={ + "task_type": task_info.task_type.value if hasattr(task_info.task_type, "value") else str(task_info.task_type), + "worker_id": task_info.worker_id or "", + "status": final_status, + "params": task_info.params, + "result": result, + "error": error, + }, + ) + + @staticmethod + def _update_account_task_status(task_id, task_status): + from server.models import BossAccount + BossAccount.objects.filter(current_task_id=task_id).update(current_task_status=task_status) + + def _upsert_account_status(self, result): + from server.models import BossAccount + from django.utils import timezone as tz + browser_id = result.get("browser_id", "") + browser_name = result.get("browser_name", "") + boss_username = result.get("boss_username", "") + is_logged_in = result.get("is_logged_in", False) + + # 优先按 worker_id + browser_name 匹配 + account = None + if browser_name: + account = BossAccount.objects.filter( + worker_id=self.worker_id, browser_name=browser_name, + ).first() + if not account and browser_id: + account = BossAccount.objects.filter( + worker_id=self.worker_id, browser_id=browser_id, + ).first() + + now = tz.now() + if account: + account.browser_id = browser_id or account.browser_id + account.browser_name = browser_name or account.browser_name + account.boss_username = boss_username + account.is_logged_in = is_logged_in + account.checked_at = now + account.save() + else: + BossAccount.objects.create( + worker_id=self.worker_id, + browser_id=browser_id or f"name:{browser_name}", + browser_name=browser_name, + boss_username=boss_username, + is_logged_in=is_logged_in, + checked_at=now, + ) + logger.info( + "账号状态更新: worker=%s, browser=%s(%s), username=%s, logged_in=%s", + self.worker_id, browser_name, browser_id, boss_username, is_logged_in, + ) diff --git a/server/ws/routing.py b/server/ws/routing.py new file mode 100644 index 0000000..39ccfc2 --- /dev/null +++ b/server/ws/routing.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- +""" +WebSocket URL 路由。 +""" +from django.urls import re_path + +from server.ws.consumers import WorkerConsumer + +websocket_urlpatterns = [ + re_path(r"^ws$", WorkerConsumer.as_asgi()), +]