Files
boss_dp/server/api/tasks.py
2026-03-01 23:18:23 +08:00

188 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Task submission and query API (login required).
Unified task entry point: the frontend selects the task type via task_type (e.g. check_login, boss_recruit).
"""
import json
import logging
from datetime import datetime
from asgiref.sync import async_to_sync
from rest_framework import status as http_status
from rest_framework.decorators import api_view
from common.protocol import TaskStatus, TaskType
from server.core.response import api_success, api_error
from server.models import BossAccount, TaskCreate
from server.serializers import TaskCreateSerializer, TaskOutSerializer
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
# Module-level logger registered under the fixed name "server.api.tasks".
logger = logging.getLogger("server.api.tasks")
def _format_timestamp(ts: float) -> str:
    """Render a POSIX timestamp as ``YYYY-MM-DDTHH:MM:SS``.

    The conversion goes through ``datetime.fromtimestamp`` without a ``tz``
    argument, so the result is a naive local-time string such as
    ``2026-03-01T20:19:51``.
    """
    return datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S")
def _task_to_dict(t) -> dict:
    """Flatten a task object into a plain dictionary for an API response.

    ``task_type`` and ``status`` are emitted as their ``.value`` when that
    attribute exists and as ``str(...)`` otherwise. ``created_at`` and
    ``updated_at`` are passed through ``_format_timestamp``; every other
    field is copied from the task unchanged.
    """

    def _plain(member):
        # Objects exposing ``.value`` contribute that value; anything else is stringified.
        if hasattr(member, "value"):
            return member.value
        return str(member)

    type_text = _plain(t.task_type)
    status_text = _plain(t.status)
    created_text = _format_timestamp(t.created_at)
    updated_text = _format_timestamp(t.updated_at)

    return {
        "task_id": t.task_id,
        "task_type": type_text,
        "status": status_text,
        "worker_id": t.worker_id,
        "account_name": t.account_name,
        "params": t.params,
        "progress": t.progress,
        "result": t.result,
        "error": t.error,
        "created_at": created_text,
        "updated_at": updated_text,
    }
def _list_tasks_response(request):
    """Build the response for ``GET``: list tasks with optional filters.

    Reads ``worker_id``, ``account_id``, ``status`` and ``limit`` from the
    query string. A malformed ``limit`` or an unknown ``status`` yields a
    400 response; an unknown or non-numeric ``account_id`` yields a 404.
    """
    query = request.query_params
    wid = query.get("worker_id")
    account_id = query.get("account_id")
    st = query.get("status")

    # Bad query parameters are a client error: answer 400 instead of letting
    # ValueError escape the view. A missing or empty ``limit`` means 50.
    raw_limit = query.get("limit")
    try:
        limit = int(raw_limit) if raw_limit else 50
    except (TypeError, ValueError):
        return api_error(http_status.HTTP_400_BAD_REQUEST, "limit 必须为整数")
    try:
        task_status = TaskStatus(st) if st else None
    except ValueError:
        return api_error(http_status.HTTP_400_BAD_REQUEST, f"无效的任务状态: {st}")

    # When account_id is given, resolve the account first and filter by its
    # browser name; the account's worker is used if no worker_id was passed.
    account_name = None
    if account_id:
        try:
            account_id = int(account_id)
            account = BossAccount.objects.get(pk=account_id)
        except (ValueError, BossAccount.DoesNotExist):
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到 id={account_id} 的账号",
            )
        account_name = account.browser_name
        if not wid:
            wid = account.worker_id

    tasks = task_dispatcher.list_tasks(worker_id=wid, status=task_status, limit=limit)

    # NOTE(review): this filter runs after ``limit`` was applied by the
    # dispatcher, so fewer than ``limit`` rows may be returned.
    if account_name:
        tasks = [t for t in tasks if t.account_name == account_name]
    return api_success([_task_to_dict(t) for t in tasks])


def _submit_task_response(request):
    """Build the response for ``POST``: validate, route and dispatch a task.

    The target worker is resolved from, in order: the account primary key
    (``id`` / ``account_id`` / ``boss_id``), an explicit ``worker_id``, or a
    lookup of the online worker owning ``account_name``.

    Raises:
        rest_framework.exceptions.ValidationError: when the payload fails
            ``TaskCreateSerializer`` validation.
    """
    data = request.data.copy()

    # With form-data, ``params`` may arrive as a JSON string; an empty or
    # unparsable string is treated as an empty dict.
    params_raw = data.get("params", {})
    if isinstance(params_raw, str):
        try:
            data["params"] = json.loads(params_raw) if params_raw.strip() else {}
        except (json.JSONDecodeError, ValueError):
            data["params"] = {}

    ser = TaskCreateSerializer(data=data)
    if not ser.is_valid():
        logger.warning("任务提交参数校验失败: data=%s, errors=%s", data, ser.errors)
        from rest_framework.exceptions import ValidationError
        raise ValidationError(ser.errors)

    validated = ser.validated_data.copy()

    # ``id``, ``account_id`` and ``boss_id`` are aliases for the account
    # primary key. Pop ALL of them before choosing one: an ``or`` chain of
    # pops short-circuits and would leave the later aliases in ``validated``,
    # from where they would be passed on to ``TaskCreate``.
    alias_values = [validated.pop(key, None) for key in ("id", "account_id", "boss_id")]
    account_id = next((value for value in alias_values if value), None)
    if account_id is not None:
        try:
            account_id = int(account_id)
        except (TypeError, ValueError):
            account_id = None

    req = TaskCreate(**validated)
    target_worker_id = req.worker_id or ""
    account_name = req.account_name or ""

    # An account primary key overrides any worker/account name in the payload.
    if account_id:
        try:
            account = BossAccount.objects.get(pk=account_id)
        except BossAccount.DoesNotExist:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到 id={account_id} 的账号",
            )
        target_worker_id = account.worker_id
        account_name = account.browser_name
        req.account_name = account_name

    # No worker known yet: look up the worker via worker_manager by account name.
    if not target_worker_id and account_name:
        target_worker_id = worker_manager.find_worker_by_account(account_name)
        if not target_worker_id:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到拥有浏览器 '{account_name}' 的在线 Worker",
            )

    if not target_worker_id:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请指定 id、boss_id、worker_id 或 account_name")
    if not worker_manager.is_online(target_worker_id):
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"Worker {target_worker_id} 不在线")

    # Single point where the resolved worker is written back to the request.
    req.worker_id = target_worker_id
    task = task_dispatcher.create_task(req)

    send_fn = worker_manager.get_send_fn(target_worker_id)
    if not send_fn:
        # The task record already exists, so mark it failed before replying.
        task.status = TaskStatus.FAILED
        task.error = "Worker WebSocket 连接不存在"
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, "Worker WebSocket 连接不存在")

    # ``dispatch`` is a coroutine function; bridge it into this sync view.
    success = async_to_sync(task_dispatcher.dispatch)(task, send_fn)
    if not success:
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"任务派发失败: {task.error}")
    worker_manager.set_current_task(target_worker_id, task.task_id)

    # For check_login tasks, record the task on the matching account row.
    # This is best-effort: a failure is logged and does not fail the request.
    if req.task_type == TaskType.CHECK_LOGIN and req.account_name:
        try:
            account = BossAccount.objects.filter(
                browser_name=req.account_name, worker_id=target_worker_id,
            ).first()
            if account:
                account.current_task_id = task.task_id
                # Dispatch succeeded, so store DISPATCHED rather than
                # task.status.value, which may still read PENDING.
                account.current_task_status = TaskStatus.DISPATCHED.value
                account.save(update_fields=["current_task_id", "current_task_status"])
                logger.info("账号 %s 任务关联: task_id=%s, status=%s", req.account_name, task.task_id, account.current_task_status)
        except Exception as e:
            logger.warning("关联账号任务状态失败: %s", e)

    return api_success(_task_to_dict(task), http_status=http_status.HTTP_201_CREATED)


@api_view(["GET", "POST"])
def task_list(request):
    """List tasks (GET) or submit a new task (POST).

    GET query parameters:
        - ``worker_id``: filter by worker ID.
        - ``account_id``: filter by account ID (resolved to its account name).
        - ``status``: filter by task status.
        - ``limit``: maximum number of rows, default 50.

    POST accepts JSON or form-data and returns the created task with
    HTTP 201 on success.
    """
    if request.method == "GET":
        return _list_tasks_response(request)
    return _submit_task_response(request)
@api_view(["GET"])
def task_detail(request, task_id):
    """Return the status and result of a single task.

    Looks the task up through ``task_dispatcher.get_task``; a falsy result
    produces a 404 error response, otherwise the task is serialized with
    ``_task_to_dict``.
    """
    found = task_dispatcher.get_task(task_id)
    if found:
        return api_success(_task_to_dict(found))
    return api_error(http_status.HTTP_404_NOT_FOUND, f"任务 {task_id} 不存在")