This commit is contained in:
ddrwode
2026-03-03 10:50:32 +08:00
parent efb05ae172
commit 5c9cfada28
8 changed files with 150 additions and 18 deletions

View File

@@ -12,7 +12,7 @@ from asgiref.sync import async_to_sync
from rest_framework import status as http_status
from rest_framework.decorators import api_view
from common.protocol import TaskStatus, TaskType
from common.protocol import MsgType, TaskStatus, TaskType, make_msg
from server.core.response import api_success, api_error
from server.models import BossAccount, TaskCreate, TaskLog, Task
from server.serializers import TaskCreateSerializer
@@ -295,6 +295,50 @@ def task_list(request):
return api_success(_task_to_dict(task), http_status=http_status.HTTP_201_CREATED)
@api_view(["POST"])
def task_cancel(request, task_id: str):
"""
取消任务。
- 若任务已结束success/failed/cancelled返回 409
- 若任务可取消:先写入 cancelled再向 Worker 下发 TASK_CANCEL。
"""
task = Task.objects.filter(task_id=task_id).first()
if not task:
return api_error(http_status.HTTP_404_NOT_FOUND, f"任务 {task_id} 不存在")
active_status_values = {
TaskStatus.PENDING.value,
TaskStatus.DISPATCHED.value,
TaskStatus.RUNNING.value,
}
if str(task.status) not in active_status_values:
return api_error(http_status.HTTP_409_CONFLICT, f"任务当前状态为 {task.status},不可取消")
cancelled_task = task_dispatcher.cancel_task(task_id, error="任务已取消")
if not cancelled_task:
return api_error(http_status.HTTP_409_CONFLICT, "任务已结束或已被取消")
if cancelled_task.worker_id:
worker_manager.set_current_task(cancelled_task.worker_id, None)
if cancelled_task.account_name:
BossAccount.objects.filter(
worker_id=cancelled_task.worker_id,
browser_name=cancelled_task.account_name,
current_task_id=cancelled_task.task_id,
).update(current_task_status=TaskStatus.CANCELLED.value)
send_fn = worker_manager.get_send_fn(cancelled_task.worker_id)
if send_fn:
cancel_msg = make_msg(MsgType.TASK_CANCEL, task_id=cancelled_task.task_id)
try:
async_to_sync(send_fn)(cancel_msg)
except Exception as e:
logger.warning("向 Worker 下发任务取消失败 task_id=%s: %s", cancelled_task.task_id, e)
return api_success(_task_to_dict(cancelled_task), msg="任务已取消")
@api_view(["GET"])
def task_list_by_account(request, account_id: int):
"""

View File

@@ -207,19 +207,22 @@ class TaskDispatcher:
task.save(update_fields=["status", "result", "error", "updated_at"])
logger.info("任务 %s 完成: status=%s", task_id, task.status)
def cancel_task(self, task_id: str) -> None:
def cancel_task(self, task_id: str, error: str = "任务已取消") -> Optional[Task]:
"""
取消任务:仅对活动状态的任务生效。
返回取消后的任务对象,若任务不存在或不可取消则返回 None。
"""
from django.utils import timezone as tz
active_values = [s.value for s in self._ACTIVE_STATUSES]
task = Task.objects.filter(task_id=task_id, status__in=active_values).first()
if not task:
return
return None
task.status = TaskStatus.CANCELLED.value
task.error = error
task.updated_at = tz.now()
task.save(update_fields=["status", "updated_at"])
task.save(update_fields=["status", "error", "updated_at"])
return task
# ─── 查询 ───

View File

@@ -22,6 +22,7 @@ urlpatterns = [
# ─── 任务 ───
path("api/tasks", tasks.task_list),
path("api/tasks/<str:task_id>/cancel", tasks.task_cancel),
path("api/tasks/<int:account_id>", tasks.task_list_by_account),
# ─── 账号 ───

View File

@@ -9,7 +9,7 @@ from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from common.protocol import MsgType, TaskStatus, TaskType, make_msg
from common.protocol import MsgType, TaskStatus, make_msg
from server.core.worker_manager import worker_manager
logger = logging.getLogger("server.ws")
@@ -155,6 +155,8 @@ class WorkerConsumer(AsyncWebsocketConsumer):
"""
if error is not None:
msg = str(error).strip()
if msg in {"任务已取消", "task cancelled", "cancelled"}:
return None
return msg or "任务执行失败"
if isinstance(result, dict):
@@ -184,13 +186,23 @@ class WorkerConsumer(AsyncWebsocketConsumer):
if not task:
return None
failure_reason = self._infer_failure_reason(result, error)
if failure_reason:
final_status = TaskStatus.FAILED.value
final_error = failure_reason
if task.status == TaskStatus.CANCELLED.value:
final_status = TaskStatus.CANCELLED.value
final_error = task.error or "任务已取消"
else:
final_status = TaskStatus.SUCCESS.value
final_error = None
cancel_markers = {"任务已取消", "task cancelled", "cancelled"}
error_text = str(error).strip() if error is not None else ""
if error_text in cancel_markers:
final_status = TaskStatus.CANCELLED.value
final_error = "任务已取消"
else:
failure_reason = self._infer_failure_reason(result, error)
if failure_reason:
final_status = TaskStatus.FAILED.value
final_error = failure_reason
else:
final_status = TaskStatus.SUCCESS.value
final_error = None
task.status = final_status
task.result = result

View File

@@ -7,7 +7,11 @@ from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Coroutine, Dict, Optional
from typing import Any, Callable, Coroutine, Dict
class TaskCancelledError(Exception):
"""任务被取消时抛出的异常。"""
class BaseTaskHandler(ABC):
@@ -27,6 +31,15 @@ class BaseTaskHandler(ABC):
def __init__(self) -> None:
self.logger = logging.getLogger(f"worker.tasks.{self.task_type or self.__class__.__name__}")
@staticmethod
def is_cancel_requested(cancel_event) -> bool:
return bool(cancel_event and getattr(cancel_event, "is_set", None) and cancel_event.is_set())
@classmethod
def ensure_not_cancelled(cls, cancel_event) -> None:
if cls.is_cancel_requested(cancel_event):
raise TaskCancelledError("任务已取消")
@abstractmethod
async def execute(
self,

View File

@@ -18,7 +18,7 @@ from typing import Any, Callable, Coroutine, Dict, List
from common.protocol import TaskType
from worker.bit_browser import BitBrowserAPI
from worker.browser_control import connect_browser
from worker.tasks.base import BaseTaskHandler
from worker.tasks.base import BaseTaskHandler, TaskCancelledError
CHAT_INDEX_URL = "https://www.zhipin.com/web/chat/index"
FRIEND_LIST_API = "wapi/zprelation/friend/getBossFriendListV2"
@@ -61,7 +61,9 @@ class BossRecruitHandler(BaseTaskHandler):
account_name = params.get("account_name", "")
account_id = params.get("account_id", "")
bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345")
cancel_event = params.get("_cancel_event")
self.ensure_not_cancelled(cancel_event)
await progress_cb(task_id, "正在打开比特浏览器...")
result = await asyncio.get_event_loop().run_in_executor(
@@ -74,6 +76,7 @@ class BossRecruitHandler(BaseTaskHandler):
account_id,
bit_api_base,
progress_cb,
cancel_event,
)
return result
@@ -86,10 +89,12 @@ class BossRecruitHandler(BaseTaskHandler):
account_id: str,
bit_api_base: str,
progress_cb: Callable,
cancel_event,
) -> dict:
"""同步执行浏览器自动化(在线程池中运行)。"""
_ = (task_id, progress_cb)
self.ensure_not_cancelled(cancel_event)
bit_api = BitBrowserAPI(bit_api_base)
addr, port = bit_api.get_browser_for_drission(
browser_id=account_id or None,
@@ -100,7 +105,7 @@ class BossRecruitHandler(BaseTaskHandler):
browser = connect_browser(port=port)
tab = browser.latest_tab
flow_result = self._recruit_flow_like_script(tab, job_title, max_greet)
flow_result = self._recruit_flow_like_script(tab, job_title, max_greet, cancel_event)
collected = flow_result["details"]
errors = flow_result["errors"]
@@ -123,11 +128,12 @@ class BossRecruitHandler(BaseTaskHandler):
return result
def _recruit_flow_like_script(self, tab, job_title: str, max_greet: int) -> dict:
def _recruit_flow_like_script(self, tab, job_title: str, max_greet: int, cancel_event) -> dict:
"""按 boss_dp/自动化.py 的 main 流程执行。"""
collected: List[dict] = []
errors: List[str] = []
self.ensure_not_cancelled(cancel_event)
try:
friend_list = self._open_chat_and_fetch_friend_list(tab)
except Exception as e:
@@ -146,6 +152,7 @@ class BossRecruitHandler(BaseTaskHandler):
for i, friend in enumerate(friend_list[:total], start=1):
try:
self.ensure_not_cancelled(cancel_event)
name = str(friend.get("name", "")).strip() or f"候选人{i}"
friend_job_name = str(friend.get("jobName", "")).strip()
friend_job_id = str(friend.get("jobId", "")).strip()
@@ -156,6 +163,7 @@ class BossRecruitHandler(BaseTaskHandler):
continue
messages = self._wait_history_messages(tab)
self.ensure_not_cancelled(cancel_event)
has_contact_keyword = self._has_contact_keyword(messages)
action_state = {
@@ -165,6 +173,7 @@ class BossRecruitHandler(BaseTaskHandler):
"exchange_confirmed": False,
}
if not has_contact_keyword:
self.ensure_not_cancelled(cancel_event)
action_state = self._ask_and_exchange_wechat_like_script(tab)
contacts = self._extract_contacts(messages)
@@ -179,11 +188,15 @@ class BossRecruitHandler(BaseTaskHandler):
**action_state,
}
)
except TaskCancelledError:
self.logger.info("任务执行中收到取消信号,提前结束")
break
except Exception as e:
err_msg = f"处理第 {i} 个会话出错: {e}"
self.logger.error(err_msg)
errors.append(err_msg)
self.ensure_not_cancelled(cancel_event)
return {"details": collected, "errors": errors}
def _open_chat_and_fetch_friend_list(self, tab) -> list:

View File

@@ -37,15 +37,21 @@ class CheckLoginHandler(BaseTaskHandler):
account_name = params.get("account_name", "")
account_id = params.get("account_id", "")
bit_api_base = params.get("bit_api_base", "http://127.0.0.1:54345")
cancel_event = params.get("_cancel_event")
self.ensure_not_cancelled(cancel_event)
await progress_cb(task_id, f"正在比特浏览器中查找环境: {account_name}...")
result = await asyncio.get_event_loop().run_in_executor(
None,
self._run_sync,
account_name, account_id, bit_api_base,
account_name,
account_id,
bit_api_base,
cancel_event,
)
self.ensure_not_cancelled(cancel_event)
status_text = "已登录" if result["is_logged_in"] else "未登录"
await progress_cb(task_id, f"检测完成: {result['browser_name']}{status_text} ({result['boss_username']})")
return result
@@ -55,8 +61,10 @@ class CheckLoginHandler(BaseTaskHandler):
account_name: str,
account_id: str,
bit_api_base: str,
cancel_event,
) -> dict:
"""同步执行(线程池中运行)。"""
self.ensure_not_cancelled(cancel_event)
bit_api = BitBrowserAPI(bit_api_base)
# ── 1. 在比特浏览器分组中查找匹配的环境 ──
@@ -100,6 +108,7 @@ class CheckLoginHandler(BaseTaskHandler):
if not browser_id:
raise RuntimeError("未指定 account_name 或 account_id无法确定浏览器环境")
self.ensure_not_cancelled(cancel_event)
# ── 2. 打开该浏览器 ──
cdp_addr, port, browser_id = bit_api.open_browser(browser_id=browser_id)
self.logger.info("已打开浏览器 %s (%s), CDP: %s", browser_name, browser_id, cdp_addr)
@@ -112,6 +121,7 @@ class CheckLoginHandler(BaseTaskHandler):
tab.get(CHAT_INDEX_URL)
tab.wait.load_start()
human_delay(3.0, 5.0)
self.ensure_not_cancelled(cancel_event)
# ── 5. 查找 .user-name 元素,判断是否已登录 ──
boss_username = ""
@@ -140,6 +150,7 @@ class CheckLoginHandler(BaseTaskHandler):
except Exception as e:
self.logger.warning("环境 %s 查找 .user-name 失败: %s", browser_name, e)
self.ensure_not_cancelled(cancel_event)
return {
"browser_id": browser_id,
"browser_name": browser_name,

View File

@@ -8,8 +8,9 @@ from __future__ import annotations
import asyncio
import json
import logging
import threading
import time
from typing import List, Optional
from typing import Dict, List, Optional, Set
import websockets
from websockets.exceptions import ConnectionClosed
@@ -17,6 +18,7 @@ from websockets.exceptions import ConnectionClosed
from common.protocol import MsgType, TaskStatus, make_msg
from worker import config
from worker.bit_browser import BitBrowserAPI
from worker.tasks.base import TaskCancelledError
from worker.tasks.registry import get_handler
logger = logging.getLogger("worker.ws_client")
@@ -41,6 +43,8 @@ class WorkerWSClient:
self._reconnect_delay = config.RECONNECT_DELAY
self._last_browsers: List[dict] = []
self._heartbeat_count = 0
self._cancel_events: Dict[str, threading.Event] = {}
self._cancelled_tasks: Set[str] = set()
# ────────────────────────── 主循环 ──────────────────────────
@@ -156,7 +160,7 @@ class WorkerWSClient:
elif msg_type == MsgType.TASK_CANCEL.value:
task_id = data.get("task_id", "")
logger.info("收到任务取消: %s(暂不支持中途取消)", task_id)
await self._handle_task_cancel(task_id)
elif msg_type == MsgType.ERROR.value:
logger.error("服务器错误: %s", data.get("detail", ""))
@@ -171,10 +175,18 @@ class WorkerWSClient:
account_name = data.get("account_name", "")
params = data.get("params", {})
pre_cancelled = task_id in self._cancelled_tasks
cancel_event = threading.Event()
if pre_cancelled:
cancel_event.set()
self._cancelled_tasks.discard(task_id)
self._cancel_events[task_id] = cancel_event
# 将 account_name 注入 params供 handler 使用)
if account_name:
params.setdefault("account_name", account_name)
params.setdefault("bit_api_base", self.bit_api.base_url)
params["_cancel_event"] = cancel_event
logger.info("收到任务: %s (type=%s)", task_id, task_type)
@@ -195,11 +207,34 @@ class WorkerWSClient:
# 执行任务
try:
if cancel_event.is_set():
raise TaskCancelledError("任务已取消")
result = await handler.execute(task_id, params, progress_cb)
if cancel_event.is_set():
raise TaskCancelledError("任务已取消")
await self._send_result(ws, task_id, result=result)
except TaskCancelledError:
logger.info("任务 %s 已取消", task_id)
await self._send_result(ws, task_id, error="任务已取消")
except Exception as e:
logger.error("任务 %s 执行失败: %s", task_id, e, exc_info=True)
await self._send_result(ws, task_id, error=str(e))
finally:
params.pop("_cancel_event", None)
self._cancel_events.pop(task_id, None)
async def _handle_task_cancel(self, task_id: str) -> None:
"""处理服务端下发的任务取消。"""
if not task_id:
return
cancel_event = self._cancel_events.get(task_id)
if cancel_event:
cancel_event.set()
logger.info("收到任务取消: %s,已标记取消", task_id)
else:
self._cancelled_tasks.add(task_id)
logger.info("收到任务取消: %s,任务尚未执行,已记录预取消", task_id)
async def _send_result(self, ws, task_id: str, result=None, error: str = None) -> None:
"""上报任务最终结果。"""