# -*- coding: utf-8 -*-
"""
Task submission and query API (login required).

Unified task entry point: the frontend selects the kind of task through
``task_type`` (e.g. check_login, boss_recruit).
"""
import json
import logging

from asgiref.sync import async_to_sync
from rest_framework import status as http_status
from rest_framework.decorators import api_view

from common.protocol import TaskStatus, TaskType
from server.core.response import api_success, api_error
from server.models import BossAccount, TaskCreate
from server.serializers import TaskCreateSerializer, TaskOutSerializer
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher

logger = logging.getLogger("server.api.tasks")


def _task_to_dict(t) -> dict:
    """Convert a task object into a plain, serializable dictionary.

    ``task_type`` and ``status`` are emitted as their ``.value`` when the
    attribute exists, otherwise as ``str(...)`` of the object. All other
    fields are copied through unchanged.
    """
    task_type = t.task_type.value if hasattr(t.task_type, "value") else str(t.task_type)
    status = t.status.value if hasattr(t.status, "value") else str(t.status)
    return {
        "task_id": t.task_id,
        "task_type": task_type,
        "status": status,
        "worker_id": t.worker_id,
        "account_name": t.account_name,
        "params": t.params,
        "progress": t.progress,
        "result": t.result,
        "error": t.error,
        "created_at": t.created_at,
        "updated_at": t.updated_at,
    }


@api_view(["GET", "POST"])
def task_list(request):
    """List tasks (GET) or submit a new task (POST).

    GET  -> returns the task list; supports the ``?worker_id=``,
            ``?status=`` and ``?limit=`` filters. A non-integer ``limit`` or
            an unrecognised ``status`` is answered with HTTP 400.
    POST -> submits a new task (JSON and form-data are both accepted). The
            target worker is resolved from ``id`` (database account),
            ``worker_id`` or ``account_name``, in that order of precedence.

    Serializer validation errors propagate as exceptions because
    ``is_valid(raise_exception=True)`` is used.
    """
    if request.method == "GET":
        wid = request.query_params.get("worker_id")
        st = request.query_params.get("status")

        # Query parameters are client input: reject malformed values with a
        # 400 instead of letting ValueError bubble up as a server error.
        raw_limit = request.query_params.get("limit", 50)
        try:
            limit = int(raw_limit)
        except (TypeError, ValueError):
            return api_error(
                http_status.HTTP_400_BAD_REQUEST,
                f"limit 参数无效: {raw_limit}",
            )

        task_status = None
        if st:
            # NOTE(review): assumes TaskStatus is an Enum whose constructor
            # raises ValueError for unknown values -- confirm in
            # common.protocol.
            try:
                task_status = TaskStatus(st)
            except ValueError:
                return api_error(
                    http_status.HTTP_400_BAD_REQUEST,
                    f"status 参数无效: {st}",
                )

        tasks = task_dispatcher.list_tasks(worker_id=wid, status=task_status, limit=limit)
        return api_success([_task_to_dict(t) for t in tasks])

    # POST: submit a new task.
    data = request.data.copy()

    # With form-data, ``params`` may arrive as a JSON string. Blank or
    # unparsable strings deliberately fall back to an empty dict.
    params_raw = data.get("params", {})
    if isinstance(params_raw, str):
        try:
            data["params"] = json.loads(params_raw) if params_raw.strip() else {}
        except (json.JSONDecodeError, ValueError):
            data["params"] = {}

    ser = TaskCreateSerializer(data=data)
    ser.is_valid(raise_exception=True)
    validated = ser.validated_data.copy()
    # ``id`` is not passed to TaskCreate; it is only used for the account
    # lookup below.
    account_id = validated.pop("id", None)
    req = TaskCreate(**validated)

    target_worker_id = req.worker_id or ""
    account_name = req.account_name or ""

    # When an id is given, resolve worker_id and account_name straight from
    # the database; they override whatever the request carried.
    if account_id:
        try:
            account = BossAccount.objects.get(pk=account_id)
        except BossAccount.DoesNotExist:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到 id={account_id} 的账号",
            )
        target_worker_id = account.worker_id
        account_name = account.browser_name
        req.worker_id = target_worker_id
        req.account_name = account_name

    # No worker yet but an account name is known: ask the worker manager
    # which worker owns that account.
    if not target_worker_id and account_name:
        target_worker_id = worker_manager.find_worker_by_account(account_name)
        if not target_worker_id:
            return api_error(
                http_status.HTTP_404_NOT_FOUND,
                f"未找到拥有浏览器 '{account_name}' 的在线 Worker",
            )
        req.worker_id = target_worker_id

    if not target_worker_id:
        return api_error(
            http_status.HTTP_400_BAD_REQUEST,
            "请指定 id、worker_id 或 account_name",
        )

    if not worker_manager.is_online(target_worker_id):
        return api_error(
            http_status.HTTP_503_SERVICE_UNAVAILABLE,
            f"Worker {target_worker_id} 不在线",
        )

    req.worker_id = target_worker_id
    task = task_dispatcher.create_task(req)

    send_fn = worker_manager.get_send_fn(target_worker_id)
    if not send_fn:
        # The task was already created, so record the failure on it before
        # reporting the error to the caller.
        task.status = TaskStatus.FAILED
        task.error = "Worker WebSocket 连接不存在"
        return api_error(
            http_status.HTTP_503_SERVICE_UNAVAILABLE,
            "Worker WebSocket 连接不存在",
        )

    # ``dispatch`` is a coroutine function; run it from this synchronous view.
    success = async_to_sync(task_dispatcher.dispatch)(task, send_fn)
    if not success:
        return api_error(
            http_status.HTTP_503_SERVICE_UNAVAILABLE,
            f"任务派发失败: {task.error}",
        )

    worker_manager.set_current_task(target_worker_id, task.task_id)

    # check_login tasks: mirror the task id/status onto the matching account.
    # This is best-effort -- a failure here is logged and does not fail the
    # request, since the task has already been dispatched.
    if req.task_type == TaskType.CHECK_LOGIN and req.account_name:
        try:
            account = BossAccount.objects.filter(
                browser_name=req.account_name,
                worker_id=target_worker_id,
            ).first()
            if account:
                account.current_task_id = task.task_id
                account.current_task_status = task.status.value
                account.save(update_fields=["current_task_id", "current_task_status"])
        except Exception as e:
            logger.warning("关联账号任务状态失败: %s", e)

    return api_success(_task_to_dict(task), http_status=http_status.HTTP_201_CREATED)


@api_view(["GET"])
def task_detail(request, task_id):
    """Return the status and result of the task identified by ``task_id``.

    Responds with HTTP 404 when the dispatcher has no such task.
    """
    task = task_dispatcher.get_task(task_id)
    if not task:
        return api_error(http_status.HTTP_404_NOT_FOUND, f"任务 {task_id} 不存在")
    return api_success(_task_to_dict(task))