Files
boss_dp/server/api/tasks.py

122 lines
4.6 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
2026-02-12 18:22:02 +08:00
任务提交与查询 API需要登录
2026-02-14 16:49:44 +08:00
统一任务入口前端通过 task_type 指定任务类型 check_loginboss_recruit
"""
2026-02-14 14:33:38 +08:00
import json
2026-02-14 16:49:44 +08:00
import logging
2026-02-14 14:33:38 +08:00
2026-02-14 16:49:44 +08:00
from asgiref.sync import async_to_sync
from rest_framework import status as http_status
from rest_framework.decorators import api_view
2026-02-14 16:49:44 +08:00
from common.protocol import TaskStatus, TaskType
2026-02-26 01:27:35 +08:00
from server.core.response import api_success, api_error
2026-02-14 16:49:44 +08:00
from server.models import BossAccount, TaskCreate
from server.serializers import TaskCreateSerializer, TaskOutSerializer
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
2026-02-14 16:49:44 +08:00
logger = logging.getLogger("server.api.tasks")
def _task_to_dict(t) -> dict:
"""将 TaskInfo 转为可序列化字典。"""
return {
"task_id": t.task_id,
"task_type": t.task_type.value if hasattr(t.task_type, "value") else str(t.task_type),
"status": t.status.value if hasattr(t.status, "value") else str(t.status),
"worker_id": t.worker_id,
"account_name": t.account_name,
"params": t.params,
"progress": t.progress,
"result": t.result,
"error": t.error,
"created_at": t.created_at,
"updated_at": t.updated_at,
}
2026-02-14 16:49:44 +08:00
@api_view(["GET", "POST"])
def task_list(request):
"""
2026-02-14 16:49:44 +08:00
GET -> 查询任务列表支持 ?worker_id= / ?status= / ?limit= 过滤
POST -> 提交新任务支持 JSON form-data
"""
2026-02-14 16:49:44 +08:00
if request.method == "GET":
wid = request.query_params.get("worker_id")
st = request.query_params.get("status")
limit = int(request.query_params.get("limit", 50))
task_status = TaskStatus(st) if st else None
tasks = task_dispatcher.list_tasks(worker_id=wid, status=task_status, limit=limit)
2026-02-26 01:27:35 +08:00
return api_success([_task_to_dict(t) for t in tasks])
2026-02-14 16:49:44 +08:00
# POST: 提交新任务
data = request.data.copy()
# form-data 中 params 可能是 JSON 字符串
params_raw = data.get("params", {})
2026-02-14 14:33:38 +08:00
if isinstance(params_raw, str):
try:
2026-02-14 16:49:44 +08:00
data["params"] = json.loads(params_raw) if params_raw.strip() else {}
2026-02-14 14:33:38 +08:00
except (json.JSONDecodeError, ValueError):
2026-02-14 16:49:44 +08:00
data["params"] = {}
ser = TaskCreateSerializer(data=data)
ser.is_valid(raise_exception=True)
req = TaskCreate(**ser.validated_data)
target_worker_id = req.worker_id or ""
if not target_worker_id and req.account_name:
target_worker_id = worker_manager.find_worker_by_account(req.account_name)
if not target_worker_id:
2026-02-26 01:27:35 +08:00
return api_error(
http_status.HTTP_404_NOT_FOUND,
f"未找到拥有浏览器 '{req.account_name}' 的在线 Worker",
)
if not target_worker_id:
2026-02-26 01:27:35 +08:00
return api_error(http_status.HTTP_400_BAD_REQUEST, "请指定 worker_id 或 account_name")
if not worker_manager.is_online(target_worker_id):
2026-02-26 01:27:35 +08:00
return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"Worker {target_worker_id} 不在线")
req.worker_id = target_worker_id
task = task_dispatcher.create_task(req)
2026-02-14 16:49:44 +08:00
send_fn = worker_manager.get_send_fn(target_worker_id)
if not send_fn:
task.status = TaskStatus.FAILED
task.error = "Worker WebSocket 连接不存在"
2026-02-26 01:27:35 +08:00
return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, "Worker WebSocket 连接不存在")
2026-02-14 16:49:44 +08:00
success = async_to_sync(task_dispatcher.dispatch)(task, send_fn)
if not success:
2026-02-26 01:27:35 +08:00
return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"任务派发失败: {task.error}")
worker_manager.set_current_task(target_worker_id, task.task_id)
2026-02-14 16:49:44 +08:00
# check_login 任务:关联账号的任务状态
if req.task_type == TaskType.CHECK_LOGIN and req.account_name:
try:
account = BossAccount.objects.filter(
browser_name=req.account_name, worker_id=target_worker_id,
).first()
if account:
account.current_task_id = task.task_id
account.current_task_status = task.status.value
account.save(update_fields=["current_task_id", "current_task_status"])
except Exception as e:
logger.warning("关联账号任务状态失败: %s", e)
2026-02-26 01:27:35 +08:00
return api_success(_task_to_dict(task), http_status=http_status.HTTP_201_CREATED)
2026-02-14 16:49:44 +08:00
@api_view(["GET"])
def task_detail(request, task_id):
"""查询指定任务的状态和结果。"""
task = task_dispatcher.get_task(task_id)
if not task:
2026-02-26 01:27:35 +08:00
return api_error(http_status.HTTP_404_NOT_FOUND, f"任务 {task_id} 不存在")
return api_success(_task_to_dict(task))