Files
boss_dp/server/api/tasks.py
2026-02-27 20:31:56 +08:00

162 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Task submission and query API (login required).
Unified task entry point: the frontend selects the task type via task_type (e.g. check_login, boss_recruit).
"""
import json
import logging
from asgiref.sync import async_to_sync
from rest_framework import status as http_status
from rest_framework.decorators import api_view
from common.protocol import TaskStatus, TaskType
from server.core.response import api_success, api_error
from server.models import BossAccount, TaskCreate
from server.serializers import TaskCreateSerializer, TaskOutSerializer
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
# Module-level logger for the task API views; the logger name is a fixed
# string ("server.api.tasks") rather than being derived from __name__.
logger = logging.getLogger("server.api.tasks")
def _task_to_dict(t) -> dict:
    """Convert a task object into a plain dictionary for API responses.

    ``task_type`` and ``status`` are emitted as their ``.value`` when that
    attribute exists, otherwise as ``str(...)`` of the object. All remaining
    fields are copied from the task unchanged.
    """

    def _plain(member):
        # Enum-like members expose ``.value``; anything else is stringified.
        if hasattr(member, "value"):
            return member.value
        return str(member)

    payload = {
        "task_id": t.task_id,
        "task_type": _plain(t.task_type),
        "status": _plain(t.status),
    }
    # Pass-through fields, kept in the same order as the response schema.
    passthrough = (
        "worker_id",
        "account_name",
        "params",
        "progress",
        "result",
        "error",
        "created_at",
        "updated_at",
    )
    for field_name in passthrough:
        payload[field_name] = getattr(t, field_name)
    return payload
@api_view(["GET", "POST"])
def task_list(request):
    """List tasks (GET) or submit a new task (POST).

    GET:
        Optional query parameters ``worker_id``, ``status`` and ``limit``
        (default 50) filter the result. A non-integer ``limit`` or a
        ``status`` that is not a valid ``TaskStatus`` value yields a 400
        response instead of an unhandled exception.

    POST:
        Accepts JSON or form-data. ``params`` may arrive as a JSON string
        (form-data); an unparsable or blank string is treated as ``{}``.
        The target worker is resolved, in order, from: the account ``id`` /
        ``account_id``, the ``boss_id``, the explicit ``worker_id``, or an
        online worker found via ``account_name``. The task is then created
        and dispatched through the worker's send function.

    Returns:
        An ``api_success`` response (201 for a created task) or an
        ``api_error`` response with 400 / 404 / 503.

    Raises:
        rest_framework.exceptions.ValidationError: when the submitted data
            fails ``TaskCreateSerializer`` validation.
    """
    if request.method == "GET":
        wid = request.query_params.get("worker_id")
        st = request.query_params.get("status")
        limit_raw = request.query_params.get("limit", 50)
        # Reject malformed filters with 400 rather than letting int() /
        # TaskStatus() raise and surface as a 500.
        try:
            limit = int(limit_raw)
        except (TypeError, ValueError):
            return api_error(
                http_status.HTTP_400_BAD_REQUEST,
                f"limit 参数无效: {limit_raw}",
            )
        try:
            task_status = TaskStatus(st) if st else None
        except ValueError:
            return api_error(
                http_status.HTTP_400_BAD_REQUEST,
                f"status 参数无效: {st}",
            )
        tasks = task_dispatcher.list_tasks(worker_id=wid, status=task_status, limit=limit)
        return api_success([_task_to_dict(t) for t in tasks])

    # POST: submit a new task.
    data = request.data.copy()

    # With form-data, params may be a JSON string.
    params_raw = data.get("params", {})
    if isinstance(params_raw, str):
        try:
            data["params"] = json.loads(params_raw) if params_raw.strip() else {}
        except (json.JSONDecodeError, ValueError):
            data["params"] = {}

    ser = TaskCreateSerializer(data=data)
    if not ser.is_valid():
        logger.warning("任务提交参数校验失败: data=%s, errors=%s", data, ser.errors)
        from rest_framework.exceptions import ValidationError
        raise ValidationError(ser.errors)

    validated = ser.validated_data.copy()
    # Pop BOTH keys unconditionally: the previous `pop(...) or pop(...)`
    # short-circuited and left "account_id" in `validated` whenever "id" was
    # truthy, leaking it into TaskCreate(**validated).
    id_value = validated.pop("id", None)
    account_id_value = validated.pop("account_id", None)
    account_id = id_value or account_id_value
    boss_id = (validated.pop("boss_id", "") or "").strip()

    # form-data may deliver numbers as strings.
    if account_id is not None:
        try:
            account_id = int(account_id)
        except (TypeError, ValueError):
            account_id = None

    req = TaskCreate(**validated)
    target_worker_id = req.worker_id or ""
    account_name = req.account_name or ""

    # Resolve the account from the database: an explicit id takes precedence
    # over boss_id.
    account = None
    if account_id:
        try:
            account = BossAccount.objects.get(pk=account_id)
        except BossAccount.DoesNotExist:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到 id={account_id} 的账号",
            )
    elif boss_id:
        account = BossAccount.objects.filter(boss_id=boss_id).first()
        if not account:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到 boss_id={boss_id} 的账号",
            )

    if account is not None:
        # The stored account overrides whatever worker/account the request carried.
        target_worker_id = account.worker_id
        account_name = account.browser_name
        req.worker_id = target_worker_id
        req.account_name = account_name

    # Fall back to locating an online worker that owns the named browser.
    if not target_worker_id and account_name:
        target_worker_id = worker_manager.find_worker_by_account(account_name)
        if not target_worker_id:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到拥有浏览器 '{account_name}' 的在线 Worker",
            )

    if not target_worker_id:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请指定 id、boss_id、worker_id 或 account_name")

    if not worker_manager.is_online(target_worker_id):
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"Worker {target_worker_id} 不在线")

    req.worker_id = target_worker_id
    task = task_dispatcher.create_task(req)

    send_fn = worker_manager.get_send_fn(target_worker_id)
    if not send_fn:
        # The task was already created, so record the failure on it.
        task.status = TaskStatus.FAILED
        task.error = "Worker WebSocket 连接不存在"
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, "Worker WebSocket 连接不存在")

    # dispatch is a coroutine function; run it synchronously from this sync view.
    success = async_to_sync(task_dispatcher.dispatch)(task, send_fn)
    if not success:
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"任务派发失败: {task.error}")

    worker_manager.set_current_task(target_worker_id, task.task_id)

    # check_login tasks: record the task id/status on the matching account.
    if req.task_type == TaskType.CHECK_LOGIN and req.account_name:
        try:
            account = BossAccount.objects.filter(
                browser_name=req.account_name, worker_id=target_worker_id,
            ).first()
            if account:
                account.current_task_id = task.task_id
                account.current_task_status = task.status.value
                account.save(update_fields=["current_task_id", "current_task_status"])
        except Exception as e:
            # Best-effort bookkeeping: a failure here must not fail the
            # already-dispatched task, so it is only logged.
            logger.warning("关联账号任务状态失败: %s", e)

    return api_success(_task_to_dict(task), http_status=http_status.HTTP_201_CREATED)
@api_view(["GET"])
def task_detail(request, task_id):
    """Return the status and result of one task, or a 404 error if it is not found."""
    found = task_dispatcher.get_task(task_id)
    if found:
        return api_success(_task_to_dict(found))
    # The dispatcher returned nothing for this id.
    return api_error(http_status.HTTP_404_NOT_FOUND, f"任务 {task_id} 不存在")