This commit is contained in:
ddrwode
2026-03-06 11:19:17 +08:00
parent 7b351039f8
commit fdafeae1f5
6 changed files with 140 additions and 2 deletions

2
1.py
View File

@@ -370,5 +370,5 @@ def main1(
if __name__ == "__main__": if __name__ == "__main__":
# greet_target=50所有岗位合计打招呼 50 人;一轮不够则从第一个岗位再跑,直到够了或一轮无新增 # greet_target=50所有岗位合计打招呼 50 人;一轮不够则从第一个岗位再跑,直到够了或一轮无新增
main(filters=["初中及以下", "离职-随时到岗"], greet_target=50) main(filters=["初中及以下", "离职-随时到岗"], greet_target=50)
# main1() main1()
# main2() # main2()

View File

@@ -44,6 +44,7 @@ class TaskType(str, Enum):
BOSS_RECRUIT = "boss_recruit" # New greeting flow (from 1.py main) BOSS_RECRUIT = "boss_recruit" # New greeting flow (from 1.py main)
BOSS_REPLY = "boss_reply" # Old recruit flow, now reply flow BOSS_REPLY = "boss_reply" # Old recruit flow, now reply flow
CHECK_LOGIN = "check_login" CHECK_LOGIN = "check_login"
SYNC_FILTERS = "sync_filters" # Fetch recruit filter options from site
def make_msg(msg_type: MsgType, **payload) -> dict: def make_msg(msg_type: MsgType, **payload) -> dict:

View File

@@ -2,16 +2,25 @@
""" """
Filter APIs: Filter APIs:
- Recruit filter options (synced from site at worker startup) - Recruit filter options (synced from site at worker startup)
- Trigger filter sync via task dispatch to worker
""" """
import logging
from asgiref.sync import async_to_sync
from rest_framework import status from rest_framework import status
from rest_framework.decorators import api_view from rest_framework.decorators import api_view
from common.protocol import TaskStatus, TaskType
from server.core.response import api_error, api_success from server.core.response import api_error, api_success
from server.models import BossAccount, RecruitFilterSnapshot from server.core.task_dispatcher import task_dispatcher
from server.core.worker_manager import worker_manager
from server.models import BossAccount, RecruitFilterSnapshot, TaskCreate
from server.serializers import ( from server.serializers import (
RecruitFilterSnapshotSerializer, RecruitFilterSnapshotSerializer,
) )
logger = logging.getLogger("server.api.filters")
def _snapshot_payload(snapshot: RecruitFilterSnapshot) -> dict: def _snapshot_payload(snapshot: RecruitFilterSnapshot) -> dict:
data = RecruitFilterSnapshotSerializer(snapshot).data data = RecruitFilterSnapshotSerializer(snapshot).data
@@ -68,3 +77,59 @@ def recruit_filter_options(request):
} }
) )
return api_success(_snapshot_payload(snapshot)) return api_success(_snapshot_payload(snapshot))
@api_view(["POST"])
def recruit_filter_sync(request):
"""
触发筛选条件同步:派发 sync_filters 任务给 Worker。
请求体: { "account_id": 123 }
返回: 创建的任务信息(前端可用 task_id 轮询状态)。
"""
account_id = request.data.get("account_id")
if not account_id:
return api_error(status.HTTP_400_BAD_REQUEST, "请提供 account_id")
try:
account = BossAccount.objects.get(pk=int(account_id))
except (ValueError, BossAccount.DoesNotExist):
return api_error(status.HTTP_404_NOT_FOUND, "账号不存在")
target_worker_id = account.worker_id
if not target_worker_id:
return api_error(status.HTTP_400_BAD_REQUEST, "该账号未绑定 Worker")
if not worker_manager.is_online(target_worker_id):
return api_error(status.HTTP_503_SERVICE_UNAVAILABLE, f"Worker {target_worker_id} 不在线")
req = TaskCreate(
task_type=TaskType.SYNC_FILTERS,
worker_id=target_worker_id,
account_name=account.browser_name,
params={},
)
try:
task = task_dispatcher.create_task(req)
except ValueError as e:
return api_error(status.HTTP_409_CONFLICT, str(e))
send_fn = worker_manager.get_send_fn(target_worker_id)
if not send_fn:
return api_error(status.HTTP_503_SERVICE_UNAVAILABLE, "Worker WebSocket 连接不存在")
success = async_to_sync(task_dispatcher.dispatch)(task, send_fn)
if not success:
return api_error(status.HTTP_503_SERVICE_UNAVAILABLE, f"任务派发失败: {task.error}")
logger.info("筛选条件同步任务已派发: task_id=%s, worker=%s, account=%s", task.task_id, target_worker_id, account.browser_name)
return api_success(
{
"task_id": task.task_id,
"task_type": str(task.task_type),
"status": str(task.status),
},
msg="筛选条件同步任务已派发",
http_status=status.HTTP_201_CREATED,
)

View File

@@ -37,6 +37,7 @@ urlpatterns = [
# ─── 招聘筛选快照 ─── # ─── 招聘筛选快照 ───
path("api/filters/options", filters.recruit_filter_options), path("api/filters/options", filters.recruit_filter_options),
path("api/filters/sync", filters.recruit_filter_sync),
# ─── 话术管理 ─── # ─── 话术管理 ───
path("api/scripts", scripts.script_list), path("api/scripts", scripts.script_list),

View File

@@ -34,8 +34,10 @@ def register_all_handlers() -> None:
from worker.tasks.boss_recruit import BossRecruitHandler from worker.tasks.boss_recruit import BossRecruitHandler
from worker.tasks.boss_reply import BossReplyHandler from worker.tasks.boss_reply import BossReplyHandler
from worker.tasks.check_login import CheckLoginHandler from worker.tasks.check_login import CheckLoginHandler
from worker.tasks.sync_filters import SyncFiltersHandler
register_handler(BossRecruitHandler) register_handler(BossRecruitHandler)
register_handler(BossReplyHandler) register_handler(BossReplyHandler)
register_handler(CheckLoginHandler) register_handler(CheckLoginHandler)
register_handler(SyncFiltersHandler)

View File

@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
"""
同步招聘筛选条件任务。
流程:调用 main1 从 BOSS 直聘网页抓取筛选项 → 保存到数据库快照表。
"""
from __future__ import annotations
import asyncio
from typing import Any, Callable, Coroutine, Dict
from common.protocol import TaskType
from worker.tasks.base import BaseTaskHandler
from worker.recruit_filter_sync import sync_recruit_filters_for_account
class SyncFiltersHandler(BaseTaskHandler):
"""从 BOSS 直聘网页同步招聘筛选条件。"""
task_type = TaskType.SYNC_FILTERS.value
async def execute(
self,
task_id: str,
params: Dict[str, Any],
progress_cb: Callable[[str, str], Coroutine],
) -> Any:
account_name = params.get("account_name", "")
worker_id = params.get("worker_id", "")
bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345")
cancel_event = params.get("_cancel_event")
self.ensure_not_cancelled(cancel_event)
await progress_cb(task_id, f"正在同步筛选条件: {account_name}...")
payload = await asyncio.get_event_loop().run_in_executor(
None,
self._run_sync,
worker_id,
account_name,
bit_api_base,
cancel_event,
)
self.ensure_not_cancelled(cancel_event)
groups_count = len(payload.get("groups", []))
options_count = len(payload.get("flat_options", []))
await progress_cb(task_id, f"同步完成: {groups_count} 个分组, {options_count} 个筛选项")
return {
"groups": groups_count,
"flat_options": options_count,
"account_name": account_name,
}
def _run_sync(
self,
worker_id: str,
account_name: str,
bit_api_base: str,
cancel_event,
) -> dict:
self.ensure_not_cancelled(cancel_event)
payload = sync_recruit_filters_for_account(
worker_id=worker_id,
bit_api_base=bit_api_base,
account_name=account_name,
logger=self.logger,
)
self.ensure_not_cancelled(cancel_event)
return payload