From 5c9cfada28043cae7fb99141fc48bf6fe3d0c1bd Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Tue, 3 Mar 2026 10:50:32 +0800 Subject: [PATCH] haha --- server/api/tasks.py | 46 +++++++++++++++++++++++++++++++++- server/core/task_dispatcher.py | 9 ++++--- server/urls.py | 1 + server/ws/consumers.py | 26 +++++++++++++------ worker/tasks/base.py | 15 ++++++++++- worker/tasks/boss_recruit.py | 19 +++++++++++--- worker/tasks/check_login.py | 13 +++++++++- worker/ws_client.py | 39 ++++++++++++++++++++++++++-- 8 files changed, 150 insertions(+), 18 deletions(-) diff --git a/server/api/tasks.py b/server/api/tasks.py index 485080d..c571d34 100644 --- a/server/api/tasks.py +++ b/server/api/tasks.py @@ -12,7 +12,7 @@ from asgiref.sync import async_to_sync from rest_framework import status as http_status from rest_framework.decorators import api_view -from common.protocol import TaskStatus, TaskType +from common.protocol import MsgType, TaskStatus, TaskType, make_msg from server.core.response import api_success, api_error from server.models import BossAccount, TaskCreate, TaskLog, Task from server.serializers import TaskCreateSerializer @@ -295,6 +295,50 @@ def task_list(request): return api_success(_task_to_dict(task), http_status=http_status.HTTP_201_CREATED) +@api_view(["POST"]) +def task_cancel(request, task_id: str): + """ + 取消任务。 + - 若任务已结束(success/failed/cancelled)返回 409; + - 若任务可取消:先写入 cancelled,再向 Worker 下发 TASK_CANCEL。 + """ + task = Task.objects.filter(task_id=task_id).first() + if not task: + return api_error(http_status.HTTP_404_NOT_FOUND, f"任务 {task_id} 不存在") + + active_status_values = { + TaskStatus.PENDING.value, + TaskStatus.DISPATCHED.value, + TaskStatus.RUNNING.value, + } + if str(task.status) not in active_status_values: + return api_error(http_status.HTTP_409_CONFLICT, f"任务当前状态为 {task.status},不可取消") + + cancelled_task = task_dispatcher.cancel_task(task_id, error="任务已取消") + if not cancelled_task: + return api_error(http_status.HTTP_409_CONFLICT, "任务已结束或已被取消") + + if cancelled_task.worker_id: + worker_manager.set_current_task(cancelled_task.worker_id, None) + + if cancelled_task.account_name: + BossAccount.objects.filter( + worker_id=cancelled_task.worker_id, + browser_name=cancelled_task.account_name, + current_task_id=cancelled_task.task_id, + ).update(current_task_status=TaskStatus.CANCELLED.value) + + send_fn = worker_manager.get_send_fn(cancelled_task.worker_id) + if send_fn: + cancel_msg = make_msg(MsgType.TASK_CANCEL, task_id=cancelled_task.task_id) + try: + async_to_sync(send_fn)(cancel_msg) + except Exception as e: + logger.warning("向 Worker 下发任务取消失败 task_id=%s: %s", cancelled_task.task_id, e) + + return api_success(_task_to_dict(cancelled_task), msg="任务已取消") + + @api_view(["GET"]) def task_list_by_account(request, account_id: int): """ diff --git a/server/core/task_dispatcher.py b/server/core/task_dispatcher.py index ca7a500..5712973 100644 --- a/server/core/task_dispatcher.py +++ b/server/core/task_dispatcher.py @@ -207,19 +207,22 @@ class TaskDispatcher: task.save(update_fields=["status", "result", "error", "updated_at"]) logger.info("任务 %s 完成: status=%s", task_id, task.status) - def cancel_task(self, task_id: str) -> None: + def cancel_task(self, task_id: str, error: str = "任务已取消") -> Optional[Task]: """ 取消任务:仅对活动状态的任务生效。 + 返回取消后的任务对象,若任务不存在或不可取消则返回 None。 """ from django.utils import timezone as tz active_values = [s.value for s in self._ACTIVE_STATUSES] task = Task.objects.filter(task_id=task_id, status__in=active_values).first() if not task: - return + return None task.status = TaskStatus.CANCELLED.value + task.error = error task.updated_at = tz.now() - task.save(update_fields=["status", "updated_at"]) + task.save(update_fields=["status", "error", "updated_at"]) + return task # ─── 查询 ─── diff --git a/server/urls.py b/server/urls.py index 493efd6..23d33db 100644 --- a/server/urls.py +++ b/server/urls.py @@ -22,6 +22,7 @@ urlpatterns = [ # ─── 任务 ─── path("api/tasks", tasks.task_list), + path("api/tasks//cancel", tasks.task_cancel), path("api/tasks/", tasks.task_list_by_account), # ─── 账号 ─── diff --git a/server/ws/consumers.py b/server/ws/consumers.py index 078d247..8d9af40 100644 --- a/server/ws/consumers.py +++ b/server/ws/consumers.py @@ -9,7 +9,7 @@ from asgiref.sync import sync_to_async from channels.generic.websocket import AsyncWebsocketConsumer -from common.protocol import MsgType, TaskStatus, TaskType, make_msg +from common.protocol import MsgType, TaskStatus, make_msg from server.core.worker_manager import worker_manager logger = logging.getLogger("server.ws") @@ -155,6 +155,8 @@ class WorkerConsumer(AsyncWebsocketConsumer): """ if error is not None: msg = str(error).strip() + if msg in {"任务已取消", "task cancelled", "cancelled"}: + return None return msg or "任务执行失败" if isinstance(result, dict): @@ -184,13 +186,23 @@ class WorkerConsumer(AsyncWebsocketConsumer): if not task: return None - failure_reason = self._infer_failure_reason(result, error) - if failure_reason: - final_status = TaskStatus.FAILED.value - final_error = failure_reason + if task.status == TaskStatus.CANCELLED.value: + final_status = TaskStatus.CANCELLED.value + final_error = task.error or "任务已取消" else: - final_status = TaskStatus.SUCCESS.value - final_error = None + cancel_markers = {"任务已取消", "task cancelled", "cancelled"} + error_text = str(error).strip() if error is not None else "" + if error_text in cancel_markers: + final_status = TaskStatus.CANCELLED.value + final_error = "任务已取消" + else: + failure_reason = self._infer_failure_reason(result, error) + if failure_reason: + final_status = TaskStatus.FAILED.value + final_error = failure_reason + else: + final_status = TaskStatus.SUCCESS.value + final_error = None task.status = final_status task.result = result diff --git a/worker/tasks/base.py b/worker/tasks/base.py index e2aaaf6..12b1378 100644 --- a/worker/tasks/base.py +++ b/worker/tasks/base.py @@ -7,7 +7,11 @@ from __future__ import annotations import logging from abc import ABC, abstractmethod -from typing import Any, Callable, Coroutine, Dict, Optional +from typing import Any, Callable, Coroutine, Dict + + +class TaskCancelledError(Exception): + """任务被取消时抛出的异常。""" class BaseTaskHandler(ABC): @@ -27,6 +31,15 @@ class BaseTaskHandler(ABC): def __init__(self) -> None: self.logger = logging.getLogger(f"worker.tasks.{self.task_type or self.__class__.__name__}") + @staticmethod + def is_cancel_requested(cancel_event) -> bool: + return bool(cancel_event and getattr(cancel_event, "is_set", None) and cancel_event.is_set()) + + @classmethod + def ensure_not_cancelled(cls, cancel_event) -> None: + if cls.is_cancel_requested(cancel_event): + raise TaskCancelledError("任务已取消") + @abstractmethod async def execute( self, diff --git a/worker/tasks/boss_recruit.py b/worker/tasks/boss_recruit.py index 5fb1f08..5c9f0e2 100644 --- a/worker/tasks/boss_recruit.py +++ b/worker/tasks/boss_recruit.py @@ -18,7 +18,7 @@ from typing import Any, Callable, Coroutine, Dict, List from common.protocol import TaskType from worker.bit_browser import BitBrowserAPI from worker.browser_control import connect_browser -from worker.tasks.base import BaseTaskHandler +from worker.tasks.base import BaseTaskHandler, TaskCancelledError CHAT_INDEX_URL = "https://www.zhipin.com/web/chat/index" FRIEND_LIST_API = "wapi/zprelation/friend/getBossFriendListV2" @@ -61,7 +61,9 @@ class BossRecruitHandler(BaseTaskHandler): account_name = params.get("account_name", "") account_id = params.get("account_id", "") bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345") + cancel_event = params.get("_cancel_event") + self.ensure_not_cancelled(cancel_event) await progress_cb(task_id, "正在打开比特浏览器...") result = await asyncio.get_event_loop().run_in_executor( @@ -74,6 +76,7 @@ class BossRecruitHandler(BaseTaskHandler): account_id, bit_api_base, progress_cb, + cancel_event, ) return result @@ -86,10 +89,12 @@ class BossRecruitHandler(BaseTaskHandler): account_id: str, bit_api_base: str, progress_cb: Callable, + cancel_event, ) -> dict: """同步执行浏览器自动化(在线程池中运行)。""" _ = (task_id, progress_cb) + self.ensure_not_cancelled(cancel_event) bit_api = BitBrowserAPI(bit_api_base) addr, port = bit_api.get_browser_for_drission( browser_id=account_id or None, @@ -100,7 +105,7 @@ class BossRecruitHandler(BaseTaskHandler): browser = connect_browser(port=port) tab = browser.latest_tab - flow_result = self._recruit_flow_like_script(tab, job_title, max_greet) + flow_result = self._recruit_flow_like_script(tab, job_title, max_greet, cancel_event) collected = flow_result["details"] errors = flow_result["errors"] @@ -123,11 +128,12 @@ class BossRecruitHandler(BaseTaskHandler): return result - def _recruit_flow_like_script(self, tab, job_title: str, max_greet: int) -> dict: + def _recruit_flow_like_script(self, tab, job_title: str, max_greet: int, cancel_event) -> dict: """按 boss_dp/自动化.py 的 main 流程执行。""" collected: List[dict] = [] errors: List[str] = [] + self.ensure_not_cancelled(cancel_event) try: friend_list = self._open_chat_and_fetch_friend_list(tab) except Exception as e: @@ -146,6 +152,7 @@ class BossRecruitHandler(BaseTaskHandler): for i, friend in enumerate(friend_list[:total], start=1): try: + self.ensure_not_cancelled(cancel_event) name = str(friend.get("name", "")).strip() or f"候选人{i}" friend_job_name = str(friend.get("jobName", "")).strip() friend_job_id = str(friend.get("jobId", "")).strip() @@ -156,6 +163,7 @@ class BossRecruitHandler(BaseTaskHandler): continue messages = self._wait_history_messages(tab) + self.ensure_not_cancelled(cancel_event) has_contact_keyword = self._has_contact_keyword(messages) action_state = { @@ -165,6 +173,7 @@ class BossRecruitHandler(BaseTaskHandler): "exchange_confirmed": False, } if not has_contact_keyword: + self.ensure_not_cancelled(cancel_event) action_state = self._ask_and_exchange_wechat_like_script(tab) contacts = self._extract_contacts(messages) @@ -179,11 +188,15 @@ class BossRecruitHandler(BaseTaskHandler): **action_state, } ) + except TaskCancelledError: + self.logger.info("任务执行中收到取消信号,提前结束") + break except Exception as e: err_msg = f"处理第 {i} 个会话出错: {e}" self.logger.error(err_msg) errors.append(err_msg) + self.ensure_not_cancelled(cancel_event) return {"details": collected, "errors": errors} def _open_chat_and_fetch_friend_list(self, tab) -> list: diff --git a/worker/tasks/check_login.py b/worker/tasks/check_login.py index 88fafb7..a63e8f8 100644 --- a/worker/tasks/check_login.py +++ b/worker/tasks/check_login.py @@ -37,15 +37,21 @@ class CheckLoginHandler(BaseTaskHandler): account_name = params.get("account_name", "") account_id = params.get("account_id", "") bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345") + cancel_event = params.get("_cancel_event") + self.ensure_not_cancelled(cancel_event) await progress_cb(task_id, f"正在比特浏览器中查找环境: {account_name}...") result = await asyncio.get_event_loop().run_in_executor( None, self._run_sync, - account_name, account_id, bit_api_base, + account_name, + account_id, + bit_api_base, + cancel_event, ) + self.ensure_not_cancelled(cancel_event) status_text = "已登录" if result["is_logged_in"] else "未登录" await progress_cb(task_id, f"检测完成: {result['browser_name']} → {status_text} ({result['boss_username']})") return result @@ -55,8 +61,10 @@ class CheckLoginHandler(BaseTaskHandler): account_name: str, account_id: str, bit_api_base: str, + cancel_event, ) -> dict: """同步执行(线程池中运行)。""" + self.ensure_not_cancelled(cancel_event) bit_api = BitBrowserAPI(bit_api_base) # ── 1. 在比特浏览器分组中查找匹配的环境 ── @@ -100,6 +108,7 @@ class CheckLoginHandler(BaseTaskHandler): if not browser_id: raise RuntimeError("未指定 account_name 或 account_id,无法确定浏览器环境") + self.ensure_not_cancelled(cancel_event) # ── 2. 打开该浏览器 ── cdp_addr, port, browser_id = bit_api.open_browser(browser_id=browser_id) self.logger.info("已打开浏览器 %s (%s), CDP: %s", browser_name, browser_id, cdp_addr) @@ -112,6 +121,7 @@ class CheckLoginHandler(BaseTaskHandler): tab.get(CHAT_INDEX_URL) tab.wait.load_start() human_delay(3.0, 5.0) + self.ensure_not_cancelled(cancel_event) # ── 5. 查找 .user-name 元素,判断是否已登录 ── boss_username = "" @@ -140,6 +150,7 @@ class CheckLoginHandler(BaseTaskHandler): except Exception as e: self.logger.warning("环境 %s 查找 .user-name 失败: %s", browser_name, e) + self.ensure_not_cancelled(cancel_event) return { "browser_id": browser_id, "browser_name": browser_name, diff --git a/worker/ws_client.py b/worker/ws_client.py index a88a807..f803441 100644 --- a/worker/ws_client.py +++ b/worker/ws_client.py @@ -8,8 +8,9 @@ from __future__ import annotations import asyncio import json import logging +import threading import time -from typing import List, Optional +from typing import Dict, List, Optional, Set import websockets from websockets.exceptions import ConnectionClosed @@ -17,6 +18,7 @@ from websockets.exceptions import ConnectionClosed from common.protocol import MsgType, TaskStatus, make_msg from worker import config from worker.bit_browser import BitBrowserAPI +from worker.tasks.base import TaskCancelledError from worker.tasks.registry import get_handler logger = logging.getLogger("worker.ws_client") @@ -41,6 +43,8 @@ class WorkerWSClient: self._reconnect_delay = config.RECONNECT_DELAY self._last_browsers: List[dict] = [] self._heartbeat_count = 0 + self._cancel_events: Dict[str, threading.Event] = {} + self._cancelled_tasks: Set[str] = set() # ────────────────────────── 主循环 ────────────────────────── @@ -156,7 +160,7 @@ class WorkerWSClient: elif msg_type == MsgType.TASK_CANCEL.value: task_id = data.get("task_id", "") - logger.info("收到任务取消: %s(暂不支持中途取消)", task_id) + await self._handle_task_cancel(task_id) elif msg_type == MsgType.ERROR.value: logger.error("服务器错误: %s", data.get("detail", "")) @@ -171,10 +175,18 @@ class WorkerWSClient: account_name = data.get("account_name", "") params = data.get("params", {}) + pre_cancelled = task_id in self._cancelled_tasks + cancel_event = threading.Event() + if pre_cancelled: + cancel_event.set() + self._cancelled_tasks.discard(task_id) + self._cancel_events[task_id] = cancel_event + # 将 account_name 注入 params(供 handler 使用) if account_name: params.setdefault("account_name", account_name) params.setdefault("bit_api_base", self.bit_api.base_url) + params["_cancel_event"] = cancel_event logger.info("收到任务: %s (type=%s)", task_id, task_type) @@ -195,11 +207,34 @@ class WorkerWSClient: # 执行任务 try: + if cancel_event.is_set(): + raise TaskCancelledError("任务已取消") result = await handler.execute(task_id, params, progress_cb) + if cancel_event.is_set(): + raise TaskCancelledError("任务已取消") await self._send_result(ws, task_id, result=result) + except TaskCancelledError: + logger.info("任务 %s 已取消", task_id) + await self._send_result(ws, task_id, error="任务已取消") except Exception as e: logger.error("任务 %s 执行失败: %s", task_id, e, exc_info=True) await self._send_result(ws, task_id, error=str(e)) + finally: + params.pop("_cancel_event", None) + self._cancel_events.pop(task_id, None) + + async def _handle_task_cancel(self, task_id: str) -> None: + """处理服务端下发的任务取消。""" + if not task_id: + return + + cancel_event = self._cancel_events.get(task_id) + if cancel_event: + cancel_event.set() + logger.info("收到任务取消: %s,已标记取消", task_id) + else: + self._cancelled_tasks.add(task_id) + logger.info("收到任务取消: %s,任务尚未执行,已记录预取消", task_id) async def _send_result(self, ws, task_id: str, result=None, error: str = None) -> None: """上报任务最终结果。"""