65 lines
1.7 KiB
Python
65 lines
1.7 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
任务处理器基类。
|
||
所有具体任务(如 BOSS 招聘)都继承此类,实现 execute 方法。
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from abc import ABC, abstractmethod
|
||
from typing import Any, Callable, Coroutine, Dict
|
||
|
||
|
||
class TaskCancelledError(Exception):
|
||
"""任务被取消时抛出的异常。"""
|
||
|
||
|
||
class BaseTaskHandler(ABC):
|
||
"""
|
||
任务处理器基类。
|
||
|
||
子类需实现:
|
||
execute(task_id, params, progress_callback) -> result
|
||
|
||
progress_callback 是一个异步函数:
|
||
await progress_callback(task_id, "当前进度描述")
|
||
"""
|
||
|
||
# 子类覆盖,声明处理哪个 task_type
|
||
task_type: str = ""
|
||
|
||
def __init__(self) -> None:
|
||
self.logger = logging.getLogger(f"worker.tasks.{self.task_type or self.__class__.__name__}")
|
||
|
||
@staticmethod
|
||
def is_cancel_requested(cancel_event) -> bool:
|
||
return bool(cancel_event and getattr(cancel_event, "is_set", None) and cancel_event.is_set())
|
||
|
||
@classmethod
|
||
def ensure_not_cancelled(cls, cancel_event) -> None:
|
||
if cls.is_cancel_requested(cancel_event):
|
||
raise TaskCancelledError("任务已取消")
|
||
|
||
@abstractmethod
|
||
async def execute(
|
||
self,
|
||
task_id: str,
|
||
params: Dict[str, Any],
|
||
progress_cb: Callable[[str, str], Coroutine],
|
||
) -> Any:
|
||
"""
|
||
执行任务。
|
||
|
||
Args:
|
||
task_id: 任务 ID
|
||
params: 任务参数
|
||
progress_cb: 进度上报回调 async def(task_id, progress_text)
|
||
|
||
Returns:
|
||
任务结果(可序列化为 JSON)
|
||
|
||
Raises:
|
||
Exception: 执行失败时抛出异常
|
||
"""
|
||
...
|