Files
boss_dp/server/api/accounts.py
2026-03-01 14:02:32 +08:00

159 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
BOSS account API (login required).

- POST /api/accounts -> add an account (bind a browser environment name to a computer)
- GET /api/accounts -> list all accounts (with computer name, online state, task status)
- POST /api/accounts/fill-boss-ids -> bulk-fill missing boss_id values (triggers check_login)
- GET /api/accounts/{id} -> fetch a single account's details
- DELETE /api/accounts/{id} -> delete an account

Login checks and similar operations are submitted uniformly through
POST /api/tasks, passing check_login as the task_type.
"""
import logging
from asgiref.sync import async_to_sync
from rest_framework import status
from rest_framework.decorators import api_view
from common.protocol import TaskType
from server.core.response import api_success, api_error
from server.models import BossAccount, TaskCreate
from server.serializers import BossAccountSerializer, AccountBindSerializer
from server.core.worker_manager import worker_manager
from server.core.task_dispatcher import task_dispatcher
# Module-level logger; the channel name is a fixed string literal rather than
# being derived from __name__.
logger = logging.getLogger("server.api.accounts")
# ────────────────────────── Internal helpers ──────────────────────────
def _enrich(account: BossAccount) -> dict:
    """Serialize an account and attach its worker's name and online flag.

    Args:
        account: The account instance to serialize.

    Returns:
        The ``BossAccountSerializer`` output extended with two keys:
        ``worker_name`` (empty string when the worker manager returns no
        worker for ``account.worker_id``) and ``worker_online`` (``False``
        in that same case).
    """
    payload = BossAccountSerializer(account).data
    worker = worker_manager.get_worker(account.worker_id)
    if worker:
        payload["worker_name"] = worker.worker_name
        payload["worker_online"] = worker.online
    else:
        # Worker unknown to the manager: fall back to neutral values.
        payload["worker_name"] = ""
        payload["worker_online"] = False
    return payload
# ────────────────────────── Endpoints ──────────────────────────
@api_view(["GET", "POST"])
def account_list(request):
    """List accounts (GET) or bind a browser environment to a worker (POST).

    GET:
        Returns every account ordered by ``updated_at`` descending. An
        optional ``?worker_id=`` query parameter narrows the list to one
        worker.

    POST:
        Validates the body with ``AccountBindSerializer`` (invalid input
        raises via ``is_valid(raise_exception=True)``), then gets or creates
        the account identified by ``(worker_id, browser_name)``. When the
        account is new or has no ``boss_id`` and the worker is online, a
        ``check_login`` task is created and dispatched; on successful
        dispatch the task id and status are stored on the account.

    Returns:
        ``api_success`` with the enriched account list (GET) or the enriched
        account with HTTP 201 (POST).
    """
    if request.method == "GET":
        worker_id = request.query_params.get("worker_id")
        qs = BossAccount.objects.all().order_by("-updated_at")
        if worker_id:
            qs = qs.filter(worker_id=worker_id)
        return api_success([_enrich(a) for a in qs])

    # POST: bind the account.
    ser = AccountBindSerializer(data=request.data)
    ser.is_valid(raise_exception=True)
    wid = ser.validated_data["worker_id"]
    bname = ser.validated_data["browser_name"]
    account, created = BossAccount.objects.get_or_create(
        worker_id=wid,
        browser_name=bname,
        defaults={
            "browser_id": f"name:{bname}",
            "boss_username": "",
            "is_logged_in": False,
        },
    )

    # After binding, automatically trigger a login check so boss_id gets filled.
    if (created or not account.boss_id) and worker_manager.is_online(wid):
        send_fn = worker_manager.get_send_fn(wid)
        if send_fn:
            req = TaskCreate(
                task_type=TaskType.CHECK_LOGIN,
                worker_id=wid,
                account_name=bname,
                params={},
            )
            task = task_dispatcher.create_task(req)
            if async_to_sync(task_dispatcher.dispatch)(task, send_fn):
                # TaskStatus was referenced here without ever being imported,
                # which raised NameError after the task had already been sent.
                # Imported lazily at the point of use so the GET path is
                # unaffected.
                # NOTE(review): assumes TaskStatus lives in common.protocol
                # next to TaskType — confirm.
                from common.protocol import TaskStatus

                worker_manager.set_current_task(wid, task.task_id)
                account.current_task_id = task.task_id
                account.current_task_status = TaskStatus.DISPATCHED.value
                account.save(update_fields=["current_task_id", "current_task_status"])
                logger.info("绑定账号后自动触发 check_login: %s@%s, task_id=%s", bname, wid, task.task_id)

    # NOTE(review): 201 is returned even when the account already existed
    # (created is False); left unchanged so existing clients keep working.
    return api_success(_enrich(account), http_status=status.HTTP_201_CREATED)
@api_view(["POST"])
def fill_boss_ids(request):
    """Dispatch a ``check_login`` task for every account lacking a ``boss_id``.

    Accounts whose ``boss_id`` is empty or NULL (and whose ``browser_name`` is
    non-empty) are visited one by one. An account is skipped, with a reason
    recorded, when its worker is offline, has no send function, or the
    dispatch fails. On successful dispatch the task id and status are stored
    on the account.

    Returns:
        ``api_success`` (HTTP 200) with ``submitted`` and ``skipped`` counts,
        at most the first 20 ``errors``, and a summary ``message``.
    """
    from django.db.models import Q

    accounts = BossAccount.objects.filter(
        Q(boss_id="") | Q(boss_id__isnull=True)
    ).exclude(browser_name="")

    submitted = 0
    skipped = 0
    errors = []
    for acc in accounts:
        wid = acc.worker_id
        bname = acc.browser_name

        if not worker_manager.is_online(wid):
            skipped += 1
            errors.append(f"{bname}@{wid}: Worker 不在线")
            continue

        send_fn = worker_manager.get_send_fn(wid)
        if not send_fn:
            skipped += 1
            errors.append(f"{bname}@{wid}: Worker 连接不可用")
            continue

        req = TaskCreate(
            task_type=TaskType.CHECK_LOGIN,
            worker_id=wid,
            account_name=bname,
            params={},
        )
        task = task_dispatcher.create_task(req)
        if not async_to_sync(task_dispatcher.dispatch)(task, send_fn):
            skipped += 1
            errors.append(f"{bname}@{wid}: 派发失败")
            continue

        # TaskStatus was referenced here without ever being imported, so the
        # first successful dispatch raised NameError and aborted the batch.
        # Imported at the point of use (a cached lookup after the first pass)
        # so runs with nothing to dispatch are unaffected.
        # NOTE(review): assumes TaskStatus lives in common.protocol next to
        # TaskType — confirm.
        from common.protocol import TaskStatus

        worker_manager.set_current_task(wid, task.task_id)
        acc.current_task_id = task.task_id
        acc.current_task_status = TaskStatus.DISPATCHED.value
        acc.save(update_fields=["current_task_id", "current_task_status"])
        submitted += 1
        logger.info("账号检查登录: %s@%s, task_id=%s", bname, wid, task.task_id)

    return api_success({
        "submitted": submitted,
        "skipped": skipped,
        "errors": errors[:20],  # cap the payload at the first 20 reasons
        "message": f"已为 {submitted} 个账号触发检测登录,{skipped} 个跳过",
    }, http_status=status.HTTP_200_OK)
@api_view(["GET"])
def account_list_by_worker(request, worker_id):
    """Return the accounts bound to ``worker_id``, most recently updated first.

    Provided so that URLs of the form ``/api/accounts/worker-1`` work.
    """
    worker_accounts = BossAccount.objects.filter(worker_id=worker_id)
    ordered = worker_accounts.order_by("-updated_at")
    enriched = [_enrich(acc) for acc in ordered]
    return api_success(enriched)
@api_view(["GET", "DELETE"])
def account_detail(request, account_id):
    """Fetch (GET) or remove (DELETE) the account with primary key ``account_id``.

    Returns:
        A 404 ``api_error`` when no such account exists; otherwise the
        enriched account for GET, or a success message after deletion.
    """
    try:
        account = BossAccount.objects.get(pk=account_id)
    except BossAccount.DoesNotExist:
        return api_error(status.HTTP_404_NOT_FOUND, "账号不存在")

    if request.method != "GET":
        # The decorator only admits GET and DELETE, so this is the DELETE path.
        account.delete()
        return api_success(msg="账号已删除")

    return api_success(_enrich(account))