From fdafeae1f5bc9d536098eab2051200dfdb01660c Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Fri, 6 Mar 2026 11:19:17 +0800 Subject: [PATCH] haha --- 1.py | 2 +- common/protocol.py | 1 + server/api/filters.py | 67 +++++++++++++++++++++++++++++++++- server/urls.py | 1 + worker/tasks/registry.py | 2 ++ worker/tasks/sync_filters.py | 69 ++++++++++++++++++++++++++++++++++++ 6 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 worker/tasks/sync_filters.py diff --git a/1.py b/1.py index 343c7f9..cb3d197 100644 --- a/1.py +++ b/1.py @@ -370,5 +370,5 @@ def main1( if __name__ == "__main__": # greet_target=50:所有岗位合计打招呼 50 人;一轮不够则从第一个岗位再跑,直到够了或一轮无新增 main(filters=["初中及以下", "离职-随时到岗"], greet_target=50) - # main1() + main1() # main2() diff --git a/common/protocol.py b/common/protocol.py index 20e9c84..9cb48d3 100644 --- a/common/protocol.py +++ b/common/protocol.py @@ -44,6 +44,7 @@ class TaskType(str, Enum): BOSS_RECRUIT = "boss_recruit" # New greeting flow (from 1.py main) BOSS_REPLY = "boss_reply" # Old recruit flow, now reply flow CHECK_LOGIN = "check_login" + SYNC_FILTERS = "sync_filters" # Fetch recruit filter options from site def make_msg(msg_type: MsgType, **payload) -> dict: diff --git a/server/api/filters.py b/server/api/filters.py index 71005d0..8c995af 100644 --- a/server/api/filters.py +++ b/server/api/filters.py @@ -2,16 +2,25 @@ """ Filter APIs: - Recruit filter options (synced from site at worker startup) +- Trigger filter sync via task dispatch to worker """ +import logging + +from asgiref.sync import async_to_sync from rest_framework import status from rest_framework.decorators import api_view +from common.protocol import TaskStatus, TaskType from server.core.response import api_error, api_success -from server.models import BossAccount, RecruitFilterSnapshot +from server.core.task_dispatcher import task_dispatcher +from server.core.worker_manager import worker_manager +from server.models import BossAccount, RecruitFilterSnapshot, TaskCreate from server.serializers import ( RecruitFilterSnapshotSerializer, ) +logger = logging.getLogger("server.api.filters") + def _snapshot_payload(snapshot: RecruitFilterSnapshot) -> dict: data = RecruitFilterSnapshotSerializer(snapshot).data @@ -68,3 +77,59 @@ def recruit_filter_options(request): } ) return api_success(_snapshot_payload(snapshot)) + + +@api_view(["POST"]) +def recruit_filter_sync(request): + """ + 触发筛选条件同步:派发 sync_filters 任务给 Worker。 + 请求体: { "account_id": 123 } + 返回: 创建的任务信息(前端可用 task_id 轮询状态)。 + """ + account_id = request.data.get("account_id") + if not account_id: + return api_error(status.HTTP_400_BAD_REQUEST, "请提供 account_id") + + try: + account = BossAccount.objects.get(pk=int(account_id)) + except (ValueError, BossAccount.DoesNotExist): + return api_error(status.HTTP_404_NOT_FOUND, "账号不存在") + + target_worker_id = account.worker_id + if not target_worker_id: + return api_error(status.HTTP_400_BAD_REQUEST, "该账号未绑定 Worker") + + if not worker_manager.is_online(target_worker_id): + return api_error(status.HTTP_503_SERVICE_UNAVAILABLE, f"Worker {target_worker_id} 不在线") + + req = TaskCreate( + task_type=TaskType.SYNC_FILTERS, + worker_id=target_worker_id, + account_name=account.browser_name, + params={}, + ) + + try: + task = task_dispatcher.create_task(req) + except ValueError as e: + return api_error(status.HTTP_409_CONFLICT, str(e)) + + send_fn = worker_manager.get_send_fn(target_worker_id) + if not send_fn: + return api_error(status.HTTP_503_SERVICE_UNAVAILABLE, "Worker WebSocket 连接不存在") + + success = async_to_sync(task_dispatcher.dispatch)(task, send_fn) + if not success: + return api_error(status.HTTP_503_SERVICE_UNAVAILABLE, f"任务派发失败: {task.error}") + + logger.info("筛选条件同步任务已派发: task_id=%s, worker=%s, account=%s", task.task_id, target_worker_id, account.browser_name) + + return api_success( + { + "task_id": task.task_id, + "task_type": str(task.task_type), + "status": str(task.status), + }, + msg="筛选条件同步任务已派发", + http_status=status.HTTP_201_CREATED, + ) diff --git a/server/urls.py b/server/urls.py index 85b3809..a236ba0 100644 --- a/server/urls.py +++ b/server/urls.py @@ -37,6 +37,7 @@ urlpatterns = [ # ─── 招聘筛选快照 ─── path("api/filters/options", filters.recruit_filter_options), + path("api/filters/sync", filters.recruit_filter_sync), # ─── 话术管理 ─── path("api/scripts", scripts.script_list), diff --git a/worker/tasks/registry.py b/worker/tasks/registry.py index 97a7b26..5ecb38f 100644 --- a/worker/tasks/registry.py +++ b/worker/tasks/registry.py @@ -34,8 +34,10 @@ def register_all_handlers() -> None: from worker.tasks.boss_recruit import BossRecruitHandler from worker.tasks.boss_reply import BossReplyHandler from worker.tasks.check_login import CheckLoginHandler + from worker.tasks.sync_filters import SyncFiltersHandler register_handler(BossRecruitHandler) register_handler(BossReplyHandler) register_handler(CheckLoginHandler) + register_handler(SyncFiltersHandler) diff --git a/worker/tasks/sync_filters.py b/worker/tasks/sync_filters.py new file mode 100644 index 0000000..eaf601d --- /dev/null +++ b/worker/tasks/sync_filters.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +""" +同步招聘筛选条件任务。 +流程:调用 main1 从 BOSS 直聘网页抓取筛选项 → 保存到数据库快照表。 +""" +from __future__ import annotations + +import asyncio +from typing import Any, Callable, Coroutine, Dict + +from common.protocol import TaskType +from worker.tasks.base import BaseTaskHandler +from worker.recruit_filter_sync import sync_recruit_filters_for_account + + +class SyncFiltersHandler(BaseTaskHandler): + """从 BOSS 直聘网页同步招聘筛选条件。""" + + task_type = TaskType.SYNC_FILTERS.value + + async def execute( + self, + task_id: str, + params: Dict[str, Any], + progress_cb: Callable[[str, str], Coroutine], + ) -> Any: + account_name = params.get("account_name", "") + worker_id = params.get("worker_id", "") + bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345") + cancel_event = params.get("_cancel_event") + + self.ensure_not_cancelled(cancel_event) + await progress_cb(task_id, f"正在同步筛选条件: {account_name}...") + + payload = await asyncio.get_event_loop().run_in_executor( + None, + self._run_sync, + worker_id, + account_name, + bit_api_base, + cancel_event, + ) + + self.ensure_not_cancelled(cancel_event) + groups_count = len(payload.get("groups", [])) + options_count = len(payload.get("flat_options", [])) + await progress_cb(task_id, f"同步完成: {groups_count} 个分组, {options_count} 个筛选项") + return { + "groups": groups_count, + "flat_options": options_count, + "account_name": account_name, + } + + def _run_sync( + self, + worker_id: str, + account_name: str, + bit_api_base: str, + cancel_event, + ) -> dict: + self.ensure_not_cancelled(cancel_event) + payload = sync_recruit_filters_for_account( + worker_id=worker_id, + bit_api_base=bit_api_base, + account_name=account_name, + logger=self.logger, + ) + self.ensure_not_cancelled(cancel_event) + return payload