Files
boss_dp/server/api/tasks.py

93 lines
3.0 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
2026-02-12 18:22:02 +08:00
任务提交与查询 API需要登录
"""
from __future__ import annotations
from typing import List, Optional
2026-02-12 18:22:02 +08:00
from fastapi import APIRouter, Depends, HTTPException
from common.protocol import TaskStatus
from server.models import TaskCreate, TaskOut
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
2026-02-12 18:22:02 +08:00
from server.api.deps import require_auth
2026-02-12 18:22:02 +08:00
router = APIRouter(prefix="/api/tasks", tags=["tasks"], dependencies=[Depends(require_auth)])
@router.post("", response_model=TaskOut, status_code=201)
async def create_task(req: TaskCreate):
"""
提交一个新任务
2026-02-12 18:22:02 +08:00
路由规则worker_id > account_name
"""
target_worker_id = req.worker_id
if not target_worker_id and req.account_name:
target_worker_id = worker_manager.find_worker_by_account(req.account_name)
if not target_worker_id:
raise HTTPException(
status_code=404,
detail=f"未找到拥有浏览器 '{req.account_name}' 的在线 Worker",
)
if not target_worker_id:
2026-02-12 18:22:02 +08:00
raise HTTPException(status_code=400, detail="请指定 worker_id 或 account_name")
if not worker_manager.is_online(target_worker_id):
2026-02-12 18:22:02 +08:00
raise HTTPException(status_code=503, detail=f"Worker {target_worker_id} 不在线")
req.worker_id = target_worker_id
task = task_dispatcher.create_task(req)
ws = worker_manager.get_ws(target_worker_id)
if not ws:
task.status = TaskStatus.FAILED
task.error = "Worker WebSocket 连接不存在"
raise HTTPException(status_code=503, detail="Worker WebSocket 连接不存在")
success = await task_dispatcher.dispatch(task, ws.send_json)
if not success:
raise HTTPException(status_code=503, detail=f"任务派发失败: {task.error}")
worker_manager.set_current_task(target_worker_id, task.task_id)
return _to_out(task)
@router.get("", response_model=List[TaskOut])
async def list_tasks(
worker_id: Optional[str] = None,
status: Optional[TaskStatus] = None,
limit: int = 50,
):
"""查询任务列表,支持按 worker_id / status 过滤。"""
tasks = task_dispatcher.list_tasks(worker_id=worker_id, status=status, limit=limit)
return [_to_out(t) for t in tasks]
@router.get("/{task_id}", response_model=TaskOut)
async def get_task(task_id: str):
"""查询指定任务的状态和结果。"""
task = task_dispatcher.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail=f"任务 {task_id} 不存在")
return _to_out(task)
def _to_out(t) -> TaskOut:
return TaskOut(
task_id=t.task_id,
task_type=t.task_type,
status=t.status,
worker_id=t.worker_id,
account_name=t.account_name,
params=t.params,
progress=t.progress,
result=t.result,
error=t.error,
created_at=t.created_at,
updated_at=t.updated_at,
)