Files
boss_dp/server/api/tasks.py

299 lines
12 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
2026-02-12 18:22:02 +08:00
任务提交与查询 API需要登录
2026-02-14 16:49:44 +08:00
统一任务入口前端通过 task_type 指定任务类型 check_loginboss_recruit
"""
2026-02-14 14:33:38 +08:00
import json
2026-02-14 16:49:44 +08:00
import logging
2026-03-01 23:18:23 +08:00
from datetime import datetime
2026-03-01 23:39:47 +08:00
from typing import Optional
2026-02-14 14:33:38 +08:00
2026-02-14 16:49:44 +08:00
from asgiref.sync import async_to_sync
from rest_framework import status as http_status
from rest_framework.decorators import api_view
2026-02-14 16:49:44 +08:00
from common.protocol import TaskStatus, TaskType
2026-02-26 01:27:35 +08:00
from server.core.response import api_success, api_error
2026-03-01 23:39:47 +08:00
from server.models import BossAccount, TaskCreate, TaskLog
from server.serializers import TaskCreateSerializer
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
2026-02-14 16:49:44 +08:00
logger = logging.getLogger("server.api.tasks")
2026-03-01 23:18:23 +08:00
def _format_timestamp(ts: float) -> str:
    """Render a POSIX timestamp as a string such as ``2026-03-01T20:19:51``.

    Args:
        ts: Seconds since the epoch.

    Returns:
        The timestamp formatted as ``%Y-%m-%dT%H:%M:%S``. The conversion goes
        through ``datetime.fromtimestamp`` without a ``tz`` argument, so the
        text is in the process's local time and carries no UTC offset.
    """
    moment = datetime.fromtimestamp(ts)
    return f"{moment:%Y-%m-%dT%H:%M:%S}"
2026-02-14 16:49:44 +08:00
def _task_to_dict(t) -> dict:
    """Convert an in-memory task object into a JSON-serializable dict.

    Args:
        t: Task object exposing ``task_id``, ``task_type``, ``status``,
            ``worker_id``, ``account_name``, ``params``, ``progress``,
            ``result``, ``error``, ``created_at`` and ``updated_at``.

    Returns:
        A dict with those fields. ``task_type`` and ``status`` are reduced to
        their ``.value`` when they have one (otherwise ``str()`` is used), and
        both timestamps are rendered by ``_format_timestamp``.
    """

    def _plain(member):
        # Enum-like members expose `.value`; anything else is stringified.
        if hasattr(member, "value"):
            return member.value
        return str(member)

    payload = {
        "task_id": t.task_id,
        "task_type": _plain(t.task_type),
        "status": _plain(t.status),
        "worker_id": t.worker_id,
        "account_name": t.account_name,
        "params": t.params,
        "progress": t.progress,
        "result": t.result,
        "error": t.error,
    }
    payload["created_at"] = _format_timestamp(t.created_at)
    payload["updated_at"] = _format_timestamp(t.updated_at)
    return payload
2026-03-01 23:39:47 +08:00
def _parse_limit(raw_value, default: int = 50, max_value: int = 200) -> int:
    """Parse a page-size parameter without ever raising on bad input.

    Args:
        raw_value: Raw value to interpret (typically a query-string value).
        default: Returned when ``raw_value`` cannot be converted with
            ``int()`` or is not strictly positive.
        max_value: Upper bound applied to any valid positive value.

    Returns:
        ``default`` for unparsable or non-positive input, otherwise the parsed
        integer capped at ``max_value``.
    """
    try:
        requested = int(raw_value)
    except (TypeError, ValueError):
        # Malformed input must not bubble up as a server error.
        return default
    return min(requested, max_value) if requested > 0 else default
def _parse_task_status(raw_status: Optional[str]) -> Optional[TaskStatus]:
    """Convert a raw status string into a ``TaskStatus`` member.

    Args:
        raw_status: Status text, or an empty/``None`` value when no status
            was supplied.

    Returns:
        The matching ``TaskStatus``; ``None`` when ``raw_status`` is empty or
        when ``TaskStatus(raw_status)`` raises ``ValueError``. Callers that
        need to tell "absent" from "invalid" must also inspect ``raw_status``.
    """
    parsed = None
    if raw_status:
        try:
            parsed = TaskStatus(raw_status)
        except ValueError:
            # Unrecognized value: report it the same way as "not parsed".
            parsed = None
    return parsed
def _task_log_account_name(task_log: TaskLog) -> Optional[str]:
    """Extract a recognizable environment/account name from a task log.

    The lookup order is ``result["browser_name"]``, ``result["account_name"]``,
    ``params["account_name"]``, ``params["browser_name"]``; the first truthy
    value wins. ``result`` / ``params`` that are not dicts are treated as
    empty.

    Args:
        task_log: Object exposing ``result`` and ``params`` attributes.

    Returns:
        The chosen value converted to ``str`` and stripped, or ``None`` when
        no candidate is truthy or the chosen one is blank after stripping.
    """
    result = task_log.result if isinstance(task_log.result, dict) else {}
    params = task_log.params if isinstance(task_log.params, dict) else {}
    lookup_order = (
        (result, "browser_name"),
        (result, "account_name"),
        (params, "account_name"),
        (params, "browser_name"),
    )
    for source, key in lookup_order:
        candidate = source.get(key)
        if candidate:
            # Only the first truthy candidate is considered; if it turns out
            # to be blank after stripping there is no fallback to later keys.
            return str(candidate).strip() or None
    return None
def _task_log_to_dict(task_log: TaskLog, account_name: Optional[str] = None) -> dict:
    """Convert a persisted task log into the unified task response shape.

    Args:
        task_log: Log record exposing ``task_id``, ``task_type``, ``status``,
            ``worker_id``, ``params``, ``result``, ``error`` and a
            ``created_at`` datetime.
        account_name: Name to report; when falsy the name is derived from the
            log via ``_task_log_account_name``.

    Returns:
        A dict with the same keys as the in-memory task representation.
        ``progress`` is always ``None``, non-dict ``params`` become ``{}``,
        and ``updated_at`` is filled from ``created_at``.
    """
    created_text = task_log.created_at.strftime("%Y-%m-%dT%H:%M:%S")
    resolved_name = account_name or _task_log_account_name(task_log)
    params = task_log.params if isinstance(task_log.params, dict) else {}
    return {
        "task_id": task_log.task_id,
        "task_type": task_log.task_type,
        "status": task_log.status,
        "worker_id": task_log.worker_id,
        "account_name": resolved_name,
        "params": params,
        "progress": None,
        "result": task_log.result,
        "error": task_log.error,
        "created_at": created_text,
        # updated_at is populated from created_at as well.
        "updated_at": created_text,
    }
def _is_task_log_for_account(task_log: TaskLog, account: BossAccount) -> bool:
    """Decide whether a task log belongs to the given account.

    Checks, in order:
      1. A log with a worker_id different from the account's is rejected.
      2. A log whose task_id equals the account's current_task_id matches.
      3. A log whose extracted name equals the account's browser_name matches.
      4. A log whose params carry the account primary key under ``id``,
         ``account_id`` or ``boss_id`` (compared as stripped strings) matches.

    Args:
        task_log: Log record exposing ``worker_id``, ``task_id``, ``params``
            and ``result``.
        account: Account exposing ``worker_id``, ``current_task_id``,
            ``browser_name`` and ``pk``.

    Returns:
        ``True`` when any positive check succeeds, ``False`` otherwise.
    """
    log_worker = task_log.worker_id
    if log_worker and log_worker != account.worker_id:
        return False

    current_task = account.current_task_id
    if current_task and task_log.task_id == current_task:
        return True

    log_name = _task_log_account_name(task_log)
    if log_name and log_name == account.browser_name:
        return True

    params = task_log.params if isinstance(task_log.params, dict) else {}
    account_pk = str(account.pk)
    referenced = (params.get(key) for key in ("id", "account_id", "boss_id"))
    return any(
        str(value).strip() == account_pk
        for value in referenced
        if value is not None
    )
def _list_tasks_by_account(account: BossAccount, task_status: Optional[TaskStatus], limit: int) -> list:
    """Aggregate the task list of one account from two sources.

    1) In-memory tasks held by ``task_dispatcher`` (live view).
    2) ``TaskLog`` rows (history that is still queryable after a restart).

    In-memory entries win when both sources contain the same task_id.

    Args:
        account: Account whose ``worker_id`` / ``browser_name`` select tasks.
        task_status: Optional status filter applied to both sources.
        limit: Maximum number of items to return.

    Returns:
        Task dicts sorted by ``created_at`` descending, at most ``limit`` long.
    """
    live_tasks = task_dispatcher.list_tasks(
        worker_id=account.worker_id,
        status=task_status,
        # Over-fetch: the dispatcher filters by worker only, so entries for
        # other accounts on the same worker are dropped below.
        limit=max(limit * 3, 100),
    )
    collected = {
        task.task_id: _task_to_dict(task)
        for task in live_tasks
        if task.account_name == account.browser_name
    }

    history = TaskLog.objects.filter(worker_id=account.worker_id).order_by("-created_at")
    if task_status:
        history = history.filter(status=task_status.value)

    # Fetch extra rows because many of them are discarded by the per-account
    # match, which would otherwise leave too few results.
    for entry in history[: max(limit * 8, 200)]:
        if entry.task_id in collected or not _is_task_log_for_account(entry, account):
            continue
        collected[entry.task_id] = _task_log_to_dict(entry, account_name=account.browser_name)

    newest_first = sorted(
        collected.values(),
        key=lambda item: item.get("created_at") or "",
        reverse=True,
    )
    return newest_first[:limit]
2026-02-14 16:49:44 +08:00
@api_view(["GET", "POST"])
def task_list(request):
    """
    Unified task endpoint.

    GET  -> list tasks. Supported query parameters:
        - ?worker_id=  : filter by worker ID
        - ?account_id= : filter by account primary key; when present the
                         account's merged task list is returned and
                         ``worker_id`` is not used
        - ?status=     : filter by task status (unsupported values -> 400)
        - ?limit=      : maximum number of items, default 50
    POST -> submit a new task (JSON or form-data). The target is selected by
        ``id`` / ``account_id`` / ``boss_id`` (all treated as the BossAccount
        primary key), or by ``worker_id`` / ``account_name``.

    Returns:
        GET: success response wrapping a list of task dicts.
        POST: success response (HTTP 201) wrapping the created task dict, or
        an error response: 404 (account / worker for the browser not found),
        400 (no target specified), 503 (worker offline, no WebSocket send
        function, or dispatch failed).

    Raises:
        rest_framework.exceptions.ValidationError: the POST payload fails
            ``TaskCreateSerializer`` validation.
    """
    if request.method == "GET":
        wid = request.query_params.get("worker_id")
        account_id = request.query_params.get("account_id")
        st = request.query_params.get("status")

        limit = _parse_limit(request.query_params.get("limit", 50))
        task_status = _parse_task_status(st)
        if st and task_status is None:
            return api_error(http_status.HTTP_400_BAD_REQUEST, f"不支持的任务状态: {st}")

        # When account_id is given, return that account's task list directly
        # (keeps the frontend "view tasks" action working).
        if account_id:
            try:
                account_id = int(account_id)
                account = BossAccount.objects.get(pk=account_id)
            except (ValueError, BossAccount.DoesNotExist):
                return api_error(
                    http_status.HTTP_404_NOT_FOUND,
                    f"未找到 id={account_id} 的账号",
                )
            return api_success(_list_tasks_by_account(account, task_status=task_status, limit=limit))

        tasks = task_dispatcher.list_tasks(worker_id=wid, status=task_status, limit=limit)
        return api_success([_task_to_dict(t) for t in tasks])

    # POST: submit a new task.
    data = request.data.copy()

    # With form-data, `params` may arrive as a JSON string; blank or
    # malformed text falls back to an empty dict.
    params_raw = data.get("params", {})
    if isinstance(params_raw, str):
        try:
            data["params"] = json.loads(params_raw) if params_raw.strip() else {}
        except (json.JSONDecodeError, ValueError):
            data["params"] = {}

    ser = TaskCreateSerializer(data=data)
    if not ser.is_valid():
        logger.warning("任务提交参数校验失败: data=%s, errors=%s", data, ser.errors)
        from rest_framework.exceptions import ValidationError
        raise ValidationError(ser.errors)

    validated = ser.validated_data.copy()

    # `id`, `account_id` and `boss_id` are equivalent aliases for the account
    # primary key. Pop every alias unconditionally: chaining the pops with
    # `or` short-circuits on the first truthy value, which would leave the
    # remaining aliases inside `validated` and forward them to
    # TaskCreate(**validated).
    pk_candidates = [validated.pop(key, None) for key in ("id", "account_id", "boss_id")]
    account_id = next((candidate for candidate in pk_candidates if candidate), None)
    if account_id is not None:
        try:
            account_id = int(account_id)
        except (TypeError, ValueError):
            account_id = None

    req = TaskCreate(**validated)
    target_worker_id = req.worker_id or ""
    account_name = req.account_name or ""

    # An explicit account primary key overrides worker_id / account_name
    # with the values stored on the account row.
    if account_id:
        try:
            account = BossAccount.objects.get(pk=account_id)
        except BossAccount.DoesNotExist:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到 id={account_id} 的账号",
            )
        target_worker_id = account.worker_id
        account_name = account.browser_name
        req.worker_id = target_worker_id
        req.account_name = account_name

    # Only an account name was given: ask the worker manager which worker
    # has that browser.
    if not target_worker_id and account_name:
        target_worker_id = worker_manager.find_worker_by_account(account_name)
        if not target_worker_id:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到拥有浏览器 '{account_name}' 的在线 Worker",
            )
        req.worker_id = target_worker_id

    if not target_worker_id:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请指定 id、boss_id、worker_id 或 account_name")

    if not worker_manager.is_online(target_worker_id):
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"Worker {target_worker_id} 不在线")

    req.worker_id = target_worker_id
    task = task_dispatcher.create_task(req)

    send_fn = worker_manager.get_send_fn(target_worker_id)
    if not send_fn:
        # The task was already created; mark it failed before reporting.
        task.status = TaskStatus.FAILED
        task.error = "Worker WebSocket 连接不存在"
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, "Worker WebSocket 连接不存在")

    success = async_to_sync(task_dispatcher.dispatch)(task, send_fn)
    if not success:
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"任务派发失败: {task.error}")

    worker_manager.set_current_task(target_worker_id, task.task_id)

    # check_login tasks: record the task on the matching account row.
    # Best effort: a failure here is logged and does not fail the request.
    if req.task_type == TaskType.CHECK_LOGIN and req.account_name:
        try:
            account = BossAccount.objects.filter(
                browser_name=req.account_name, worker_id=target_worker_id,
            ).first()
            if account:
                account.current_task_id = task.task_id
                # After a successful dispatch store DISPATCHED rather than
                # task.status.value, which may still be PENDING.
                account.current_task_status = TaskStatus.DISPATCHED.value
                account.save(update_fields=["current_task_id", "current_task_status"])
                logger.info("账号 %s 任务关联: task_id=%s, status=%s", req.account_name, task.task_id, account.current_task_status)
        except Exception as e:
            logger.warning("关联账号任务状态失败: %s", e)

    return api_success(_task_to_dict(task), http_status=http_status.HTTP_201_CREATED)
2026-02-14 16:49:44 +08:00
@api_view(["GET"])
def task_list_by_account(request, account_id: int):
    """List tasks for one account by its ID (lookup by task_id is not supported).

    Query parameters:
        - ?status= : optional task status filter (unsupported values -> 400)
        - ?limit=  : maximum number of items, default 50

    Args:
        request: The incoming request.
        account_id: Primary key of the account.

    Returns:
        A success response wrapping the account's merged task list, a 404
        error response when the account does not exist, or a 400 error
        response when ``status`` is not a supported value.
    """
    account = BossAccount.objects.filter(pk=account_id).first()
    if not account:
        return api_error(http_status.HTTP_404_NOT_FOUND, f"账号 {account_id} 不存在")

    query = request.query_params
    limit = _parse_limit(query.get("limit", 50))
    st = query.get("status")
    task_status = _parse_task_status(st)
    if st and task_status is None:
        # A status was supplied but did not parse: reject instead of ignoring.
        return api_error(http_status.HTTP_400_BAD_REQUEST, f"不支持的任务状态: {st}")

    tasks = _list_tasks_by_account(account, task_status=task_status, limit=limit)
    return api_success(tasks)