# -*- coding: utf-8 -*-
"""
Scheduled-task management API.
"""
import logging
from datetime import datetime, time, timedelta

from django.utils import timezone
from rest_framework import status as http_status
from rest_framework.decorators import api_view

from common.protocol import TaskStatus, TaskType
from server.core.response import api_success, api_error
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
from server.models import BossAccount, ScheduledTask

logger = logging.getLogger("server.api.scheduled_task")

# Wire formats used for every time value exchanged with the client.
_TIME_FORMAT = "%H:%M"
_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Front-end task type -> back-end task type.
_TASK_TYPE_MAPPING = {
    "greeting": TaskType.BOSS_RECRUIT,
    "rechat": TaskType.BOSS_REPLY,
    "collection": TaskType.BOSS_COLLECT,
}


def _calculate_next_run(scheduled_task: ScheduledTask) -> datetime:
    """Compute the next execution time of a scheduled task.

    The result depends on ``scheduled_task.repeat_type``:

    * ``"once"`` / ``"daily"``: today at ``execute_time`` if that moment is
      still ahead, otherwise tomorrow.
    * ``"weekdays"``: same as daily, then moved past Saturday/Sunday.
    * ``"weekly"``: the next Monday at ``execute_time`` (today when today is
      Monday and the time is still ahead).
    * ``"monthly"``: the 1st of the month at ``execute_time`` (this month if
      still ahead, otherwise next month).
    * anything else: today at ``execute_time``, even if already past.

    Args:
        scheduled_task: Task whose ``execute_time`` and ``repeat_type`` are
            read; it is not modified.

    Returns:
        A naive ``datetime`` built from ``datetime.now()``.
    """
    # NOTE(review): this is a naive local time, while ``last_run_at`` is set
    # from ``timezone.now()`` in run_scheduled_task_now -- confirm against the
    # project's USE_TZ setting.
    now = datetime.now()
    execute_t = scheduled_task.execute_time
    repeat_type = scheduled_task.repeat_type
    today_run = datetime.combine(now.date(), execute_t)

    if repeat_type in ("once", "daily"):
        return today_run if today_run > now else today_run + timedelta(days=1)

    if repeat_type == "weekdays":
        next_dt = today_run if today_run > now else today_run + timedelta(days=1)
        # Skip Saturday (5) and Sunday (6).
        while next_dt.weekday() >= 5:
            next_dt += timedelta(days=1)
        return next_dt

    if repeat_type == "weekly":
        # Weekly runs are pinned to Monday for now. ``days_ahead`` is 0 on a
        # Monday so that today's slot is used when it has not passed yet;
        # previously a Monday always jumped a full week ahead.
        days_ahead = (7 - now.weekday()) % 7
        next_dt = today_run + timedelta(days=days_ahead)
        if next_dt <= now:
            next_dt += timedelta(days=7)
        return next_dt

    if repeat_type == "monthly":
        # Monthly runs are pinned to the 1st for now.
        next_dt = datetime.combine(now.date().replace(day=1), execute_t)
        if next_dt <= now:
            # Roll over to next month, wrapping the year after December.
            if now.month == 12:
                next_dt = next_dt.replace(year=now.year + 1, month=1)
            else:
                next_dt = next_dt.replace(month=now.month + 1)
        return next_dt

    # Unknown repeat type: fall back to today's slot.
    return today_run


def _map_task_type(scheduled_type: str) -> TaskType:
    """Map a front-end task type to the back-end ``TaskType``.

    Unknown values fall back to ``TaskType.BOSS_RECRUIT``.
    """
    return _TASK_TYPE_MAPPING.get(scheduled_type, TaskType.BOSS_RECRUIT)


def _parse_execute_time(raw):
    """Parse an ``HH:MM`` string into a ``datetime.time``.

    Raises:
        ValueError: if ``raw`` is a string that does not match ``HH:MM``.
        TypeError: if ``raw`` is not a string (e.g. ``None`` or a number).
    """
    return datetime.strptime(raw, _TIME_FORMAT).time()


def _format_dt(value):
    """Format a datetime as ``YYYY-MM-DD HH:MM:SS``; falsy values give ``None``."""
    return value.strftime(_DATETIME_FORMAT) if value else None


def _serialize_scheduled_task(task: ScheduledTask) -> dict:
    """Build the full JSON-ready representation shared by list and detail."""
    return {
        "id": task.id,
        "name": task.name,
        "task_type": task.task_type,
        "account_id": task.account_id,
        "account_name": task.account_name,
        "execute_time": task.execute_time.strftime(_TIME_FORMAT) if task.execute_time else "",
        "repeat_type": task.repeat_type,
        "params": task.params or {},
        "is_active": task.is_active,
        "last_run_at": _format_dt(task.last_run_at),
        "next_run_at": _format_dt(task.next_run_at),
        "created_at": _format_dt(task.created_at),
    }


@api_view(["GET", "POST"])
def scheduled_task_list(request):
    """
    GET  -> list scheduled tasks, optionally filtered by ``account_id``
    POST -> create a scheduled task
    """
    if request.method == "GET":
        account_id = request.query_params.get("account_id")
        qs = ScheduledTask.objects.all()
        if account_id:
            qs = qs.filter(account_id=account_id)
        qs = qs.order_by("-created_at")
        return api_success([_serialize_scheduled_task(t) for t in qs])

    # POST: create a scheduled task.
    data = request.data
    name = data.get("name")
    task_type = data.get("task_type")
    account_id = data.get("account_id")
    execute_time_str = data.get("execute_time", "09:00")
    repeat_type = data.get("repeat_type", "once")
    params = data.get("params", {})
    is_active = data.get("is_active", True)

    if not name:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请填写任务名称")
    if not task_type:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请选择任务类型")
    if not account_id:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请选择执行账号")

    # The referenced account must exist.
    try:
        account = BossAccount.objects.get(pk=account_id)
    except BossAccount.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "账号不存在")

    # Parse the execution time. TypeError covers non-string payloads such as
    # an explicit null, which would otherwise surface as a 500.
    try:
        execute_time_obj = _parse_execute_time(execute_time_str)
    except (TypeError, ValueError):
        return api_error(http_status.HTTP_400_BAD_REQUEST, "时间格式错误,请使用 HH:MM")

    scheduled_task = ScheduledTask.objects.create(
        name=name,
        task_type=task_type,
        account_id=account_id,
        account_name=account.browser_name,
        execute_time=execute_time_obj,
        repeat_type=repeat_type,
        params=params,
        is_active=is_active,
    )

    # The next run is derived from the stored fields, so it is set afterwards.
    scheduled_task.next_run_at = _calculate_next_run(scheduled_task)
    scheduled_task.save(update_fields=["next_run_at"])

    return api_success({
        "id": scheduled_task.id,
        "name": scheduled_task.name,
        "task_type": scheduled_task.task_type,
        "next_run_at": _format_dt(scheduled_task.next_run_at),
    }, http_status=http_status.HTTP_201_CREATED)


@api_view(["GET", "PUT", "DELETE"])
def scheduled_task_detail(request, task_id: int):
    """
    GET    -> fetch one scheduled task
    PUT    -> update the fields present in the request body
    DELETE -> delete the scheduled task
    """
    try:
        scheduled_task = ScheduledTask.objects.get(pk=task_id)
    except ScheduledTask.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "定时任务不存在")

    if request.method == "GET":
        return api_success(_serialize_scheduled_task(scheduled_task))

    if request.method == "PUT":
        data = request.data
        if "name" in data:
            scheduled_task.name = data["name"]
        if "task_type" in data:
            scheduled_task.task_type = data["task_type"]
        if "execute_time" in data:
            # TypeError covers non-string payloads (null, numbers).
            try:
                scheduled_task.execute_time = _parse_execute_time(data["execute_time"])
            except (TypeError, ValueError):
                return api_error(http_status.HTTP_400_BAD_REQUEST, "时间格式错误")
        if "repeat_type" in data:
            scheduled_task.repeat_type = data["repeat_type"]
        if "params" in data:
            scheduled_task.params = data["params"]
        if "is_active" in data:
            scheduled_task.is_active = data["is_active"]

        # Time or repeat type may have changed, so recompute the next run.
        scheduled_task.next_run_at = _calculate_next_run(scheduled_task)
        scheduled_task.save()

        return api_success({
            "id": scheduled_task.id,
            "next_run_at": _format_dt(scheduled_task.next_run_at),
        })

    # DELETE
    scheduled_task.delete()
    return api_success({"msg": "删除成功"})


@api_view(["POST"])
def run_scheduled_task_now(request, task_id: int):
    """
    Run a scheduled task immediately (manual trigger).

    Creates a task for the account's worker, dispatches it through the
    worker's send function, then records ``last_run_at`` and recomputes
    ``next_run_at``.
    """
    try:
        scheduled_task = ScheduledTask.objects.get(pk=task_id)
    except ScheduledTask.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "定时任务不存在")

    # The linked account must still exist.
    try:
        account = BossAccount.objects.get(pk=scheduled_task.account_id)
    except BossAccount.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "关联的账号不存在")

    # The account's worker must be online.
    worker_id = account.worker_id
    if not worker_id or not worker_manager.is_online(worker_id):
        return api_error(
            http_status.HTTP_503_SERVICE_UNAVAILABLE,
            f"账号 {account.browser_name} 对应的 Worker 不在线",
        )

    # Create the immediate task.
    from server.models import TaskCreate

    task_type = _map_task_type(scheduled_task.task_type)
    req = TaskCreate(
        task_type=task_type,
        worker_id=worker_id,
        account_name=account.browser_name,
        params=scheduled_task.params or {},
    )

    try:
        task = task_dispatcher.create_task(req)
    except ValueError as e:
        return api_error(http_status.HTTP_409_CONFLICT, str(e))

    # Dispatch the task to the worker.
    send_fn = worker_manager.get_send_fn(worker_id)
    if not send_fn:
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, "Worker WebSocket 连接不存在")

    from asgiref.sync import async_to_sync

    success = async_to_sync(task_dispatcher.dispatch)(task, send_fn)
    if not success:
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"任务派发失败: {task.error}")

    # Record this run and schedule the next one.
    scheduled_task.last_run_at = timezone.now()
    scheduled_task.next_run_at = _calculate_next_run(scheduled_task)
    scheduled_task.save(update_fields=["last_run_at", "next_run_at"])

    return api_success({
        "task_id": task.task_id,
        "task_type": task.task_type,
        "status": task.status,
    })


@api_view(["POST"])
def toggle_scheduled_task(request, task_id: int):
    """
    Enable or disable a scheduled task by flipping ``is_active``.
    """
    try:
        scheduled_task = ScheduledTask.objects.get(pk=task_id)
    except ScheduledTask.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "定时任务不存在")

    scheduled_task.is_active = not scheduled_task.is_active
    scheduled_task.save(update_fields=["is_active"])

    return api_success({
        "id": scheduled_task.id,
        "is_active": scheduled_task.is_active,
    })