Files
boss_dp/server/api/scheduled_task.py
2026-03-07 23:51:18 +08:00

304 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
定时任务管理 API。
"""
import logging
from datetime import datetime, time, timedelta
from django.utils import timezone
from rest_framework import status as http_status
from rest_framework.decorators import api_view
from common.protocol import TaskStatus, TaskType
from server.core.response import api_success, api_error
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
from server.models import BossAccount, ScheduledTask
logger = logging.getLogger("server.api.scheduled_task")
def _calculate_next_run(scheduled_task: ScheduledTask) -> datetime:
"""计算下次执行时间。"""
now = datetime.now()
execute_t = scheduled_task.execute_time
if scheduled_task.repeat_type == "once":
# 仅一次:返回今天或明天对应时间
next_dt = datetime.combine(now.date(), execute_t)
if next_dt <= now:
next_dt += timedelta(days=1)
return next_dt
elif scheduled_task.repeat_type == "daily":
# 每天
next_dt = datetime.combine(now.date(), execute_t)
if next_dt <= now:
next_dt += timedelta(days=1)
return next_dt
elif scheduled_task.repeat_type == "weekdays":
# 工作日(周一到周五)
next_dt = datetime.combine(now.date(), execute_t)
if next_dt <= now:
next_dt += timedelta(days=1)
# 跳过周末
while next_dt.weekday() >= 5:
next_dt += timedelta(days=1)
return next_dt
elif scheduled_task.repeat_type == "weekly":
# 每周(暂定周一)
days_ahead = 0 - now.weekday()
if days_ahead <= 0:
days_ahead += 7
next_dt = datetime.combine(now.date() + timedelta(days=days_ahead), execute_t)
return next_dt
elif scheduled_task.repeat_type == "monthly":
# 每月暂定1号
next_dt = datetime.combine(now.date().replace(day=1), execute_t)
if next_dt <= now:
# 下个月
if now.month == 12:
next_dt = next_dt.replace(year=now.year + 1, month=1)
else:
next_dt = next_dt.replace(month=now.month + 1)
return next_dt
# 默认
return datetime.combine(now.date(), execute_t)
def _map_task_type(scheduled_type: str) -> TaskType:
    """Translate a front-end scheduled-task type name into a ``TaskType``.

    Recognised names are ``"greeting"``, ``"rechat"`` and ``"collection"``;
    any other value falls back to ``TaskType.BOSS_RECRUIT``.
    """
    type_by_name = {
        "greeting": TaskType.BOSS_RECRUIT,
        "rechat": TaskType.BOSS_REPLY,
        "collection": TaskType.BOSS_COLLECT,
    }
    try:
        return type_by_name[scheduled_type]
    except KeyError:
        # Unknown names default to the recruit task type.
        return TaskType.BOSS_RECRUIT
@api_view(["GET", "POST"])
def scheduled_task_list(request):
    """List scheduled tasks (GET) or create a new one (POST).

    GET:
        Returns every scheduled task, newest first, optionally filtered by
        the ``account_id`` query parameter.

    POST:
        Reads ``name``, ``task_type``, ``account_id``, ``execute_time``
        (``HH:MM``, default ``"09:00"``), ``repeat_type`` (default
        ``"once"``), ``params`` (default ``{}``) and ``is_active`` (default
        ``True``) from the request body. Responds with 400 when a required
        field is missing or the time cannot be parsed, 404 when the account
        cannot be found, and 201 with the new task's summary on success.
    """
    if request.method == "GET":
        account_id = request.query_params.get("account_id")
        qs = ScheduledTask.objects.all()
        if account_id:
            qs = qs.filter(account_id=account_id)
        tasks = [
            {
                "id": t.id,
                "name": t.name,
                "task_type": t.task_type,
                "account_id": t.account_id,
                "account_name": t.account_name,
                "execute_time": t.execute_time.strftime("%H:%M") if t.execute_time else "",
                "repeat_type": t.repeat_type,
                "params": t.params or {},
                "is_active": t.is_active,
                "last_run_at": t.last_run_at.strftime("%Y-%m-%d %H:%M:%S") if t.last_run_at else None,
                "next_run_at": t.next_run_at.strftime("%Y-%m-%d %H:%M:%S") if t.next_run_at else None,
                "created_at": t.created_at.strftime("%Y-%m-%d %H:%M:%S"),
            }
            for t in qs.order_by("-created_at")
        ]
        return api_success(tasks)

    # POST: create a scheduled task.
    data = request.data
    name = data.get("name")
    task_type = data.get("task_type")
    account_id = data.get("account_id")
    execute_time_str = data.get("execute_time", "09:00")
    repeat_type = data.get("repeat_type", "once")
    params = data.get("params", {})
    is_active = data.get("is_active", True)

    if not name:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请填写任务名称")
    if not task_type:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请选择任务类型")
    if not account_id:
        return api_error(http_status.HTTP_400_BAD_REQUEST, "请选择执行账号")

    # Make sure the account exists. A malformed id (wrong type / not
    # convertible to the pk type) is reported the same way as a missing
    # account instead of surfacing as a server error.
    try:
        account = BossAccount.objects.get(pk=account_id)
    except (BossAccount.DoesNotExist, ValueError, TypeError):
        return api_error(http_status.HTTP_404_NOT_FOUND, "账号不存在")

    # Parse the execution time. strptime raises TypeError (not ValueError)
    # when the client sends a non-string such as a number or null.
    try:
        execute_time_obj = datetime.strptime(execute_time_str, "%H:%M").time()
    except (TypeError, ValueError):
        return api_error(http_status.HTTP_400_BAD_REQUEST, "时间格式错误,请使用 HH:MM")

    # Build the task in memory, compute its next run, then insert once so a
    # row is never persisted without next_run_at.
    scheduled_task = ScheduledTask(
        name=name,
        task_type=task_type,
        account_id=account_id,
        account_name=account.browser_name,
        execute_time=execute_time_obj,
        repeat_type=repeat_type,
        params=params,
        is_active=is_active,
    )
    scheduled_task.next_run_at = _calculate_next_run(scheduled_task)
    scheduled_task.save()

    return api_success({
        "id": scheduled_task.id,
        "name": scheduled_task.name,
        "task_type": scheduled_task.task_type,
        "next_run_at": scheduled_task.next_run_at.strftime("%Y-%m-%d %H:%M:%S") if scheduled_task.next_run_at else None,
    }, http_status=http_status.HTTP_201_CREATED)
@api_view(["GET", "PUT", "DELETE"])
def scheduled_task_detail(request, task_id: int):
    """Retrieve (GET), update (PUT) or delete (DELETE) one scheduled task.

    Responds with 404 when no task has primary key ``task_id``.

    PUT applies only the keys present in the request body (``name``,
    ``task_type``, ``execute_time`` as ``HH:MM``, ``repeat_type``,
    ``params``, ``is_active``), recomputes ``next_run_at`` and saves. An
    unparseable ``execute_time`` yields 400 and nothing is saved.
    """
    try:
        scheduled_task = ScheduledTask.objects.get(pk=task_id)
    except ScheduledTask.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "定时任务不存在")

    if request.method == "GET":
        return api_success({
            "id": scheduled_task.id,
            "name": scheduled_task.name,
            "task_type": scheduled_task.task_type,
            "account_id": scheduled_task.account_id,
            "account_name": scheduled_task.account_name,
            "execute_time": scheduled_task.execute_time.strftime("%H:%M") if scheduled_task.execute_time else "",
            "repeat_type": scheduled_task.repeat_type,
            "params": scheduled_task.params or {},
            "is_active": scheduled_task.is_active,
            "last_run_at": scheduled_task.last_run_at.strftime("%Y-%m-%d %H:%M:%S") if scheduled_task.last_run_at else None,
            "next_run_at": scheduled_task.next_run_at.strftime("%Y-%m-%d %H:%M:%S") if scheduled_task.next_run_at else None,
            "created_at": scheduled_task.created_at.strftime("%Y-%m-%d %H:%M:%S"),
        })

    if request.method == "PUT":
        data = request.data

        # Validate the only field that needs parsing first, so a bad value
        # is rejected before any attribute is touched. strptime raises
        # TypeError (not ValueError) for non-string input such as null.
        if "execute_time" in data:
            try:
                scheduled_task.execute_time = datetime.strptime(data["execute_time"], "%H:%M").time()
            except (TypeError, ValueError):
                return api_error(http_status.HTTP_400_BAD_REQUEST, "时间格式错误")

        # Remaining fields are copied through verbatim when supplied.
        for field in ("name", "task_type", "repeat_type", "params", "is_active"):
            if field in data:
                setattr(scheduled_task, field, data[field])

        # Schedule inputs may have changed; recompute the next run time.
        scheduled_task.next_run_at = _calculate_next_run(scheduled_task)
        scheduled_task.save()
        return api_success({
            "id": scheduled_task.id,
            "next_run_at": scheduled_task.next_run_at.strftime("%Y-%m-%d %H:%M:%S") if scheduled_task.next_run_at else None,
        })

    # DELETE
    scheduled_task.delete()
    return api_success({"msg": "删除成功"})
@api_view(["POST"])
def run_scheduled_task_now(request, task_id: int):
    """Run a scheduled task immediately (manual trigger).

    Steps: load the scheduled task and its account, check that the account's
    worker is online and has a send function, create a task through
    ``task_dispatcher``, dispatch it, then record ``last_run_at`` and a
    refreshed ``next_run_at``.

    Responses:
        404 - the scheduled task or its account does not exist.
        503 - the worker is offline, has no send function, or dispatch
              reported failure.
        409 - ``task_dispatcher.create_task`` raised ``ValueError``.
        success - ``task_id``, ``task_type`` and ``status`` of the new task.
    """
    try:
        scheduled_task = ScheduledTask.objects.get(pk=task_id)
    except ScheduledTask.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "定时任务不存在")

    # Validate the account.
    try:
        account = BossAccount.objects.get(pk=scheduled_task.account_id)
    except BossAccount.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "关联的账号不存在")

    # Validate that the worker is online.
    worker_id = account.worker_id
    if not worker_id or not worker_manager.is_online(worker_id):
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"账号 {account.browser_name} 对应的 Worker 不在线")

    # Resolve the send function *before* creating the task, so a missing
    # connection does not leave behind a created-but-never-dispatched task.
    send_fn = worker_manager.get_send_fn(worker_id)
    if not send_fn:
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, "Worker WebSocket 连接不存在")

    # Create the immediate task.
    from server.models import TaskCreate
    task_type = _map_task_type(scheduled_task.task_type)
    req = TaskCreate(
        task_type=task_type,
        worker_id=worker_id,
        account_name=account.browser_name,
        params=scheduled_task.params or {},
    )
    try:
        task = task_dispatcher.create_task(req)
    except ValueError as e:
        return api_error(http_status.HTTP_409_CONFLICT, str(e))

    # Dispatch the task; dispatch is a coroutine function, so bridge it
    # into this synchronous view.
    from asgiref.sync import async_to_sync
    success = async_to_sync(task_dispatcher.dispatch)(task, send_fn)
    if not success:
        return api_error(http_status.HTTP_503_SERVICE_UNAVAILABLE, f"任务派发失败: {task.error}")

    # Record the run and refresh the schedule.
    # NOTE(review): last_run_at uses timezone.now() while next_run_at comes
    # from _calculate_next_run, which is based on datetime.now(); confirm
    # both columns are meant to share one timezone convention.
    scheduled_task.last_run_at = timezone.now()
    scheduled_task.next_run_at = _calculate_next_run(scheduled_task)
    scheduled_task.save(update_fields=["last_run_at", "next_run_at"])

    return api_success({
        "task_id": task.task_id,
        "task_type": task.task_type,
        "status": task.status,
    })
@api_view(["POST"])
def toggle_scheduled_task(request, task_id: int):
    """Enable or disable a scheduled task by flipping its ``is_active`` flag.

    Responds with 404 when no task has primary key ``task_id``; otherwise
    persists only the ``is_active`` column and returns the task id together
    with the new flag value.
    """
    try:
        task = ScheduledTask.objects.get(pk=task_id)
    except ScheduledTask.DoesNotExist:
        return api_error(http_status.HTTP_404_NOT_FOUND, "定时任务不存在")

    # Invert the current state and write back just that one field.
    flipped = not task.is_active
    task.is_active = flipped
    task.save(update_fields=["is_active"])

    payload = {
        "id": task.id,
        "is_active": task.is_active,
    }
    return api_success(payload)