diff --git a/scripts/fill_boss_ids.py b/scripts/fill_boss_ids.py new file mode 100644 index 0000000..396e7d7 --- /dev/null +++ b/scripts/fill_boss_ids.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +一次性脚本:为缺少 boss_id 的账号触发检测登录,自动填充 boss_id。 +需确保服务已启动、对应 Worker 在线。 +用法: python scripts/fill_boss_ids.py [--base-url URL] [--user USER] [--password PASS] +""" +import argparse +import os +import sys + +# 项目根目录加入路径 +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import requests + +DEFAULT_BASE = os.getenv("BOSS_DP_API", "http://127.0.0.1:9000") +DEFAULT_USER = os.getenv("ADMIN_USERNAME", "admin") +DEFAULT_PASS = os.getenv("ADMIN_PASSWORD", "boss_dp_admin") + + +def main(): + parser = argparse.ArgumentParser(description="批量触发检测登录,填充 boss_id") + parser.add_argument("--base-url", default=DEFAULT_BASE, help="API 地址") + parser.add_argument("--user", default=DEFAULT_USER, help="管理员用户名") + parser.add_argument("--password", default=DEFAULT_PASS, help="管理员密码") + args = parser.parse_args() + + base = args.base_url.rstrip("/") + session = requests.Session() + + # 登录 + try: + r = session.post( + f"{base}/api/auth/login", + json={"username": args.user, "password": args.password}, + timeout=10, + ) + except requests.exceptions.ConnectionError as e: + print(f"连接失败,请检查服务是否启动、地址是否正确: {base}") + print(f" 错误: {e}") + return 1 + except requests.exceptions.Timeout: + print(f"请求超时: {base}") + return 1 + + if r.status_code != 200: + print(f"登录失败: HTTP {r.status_code}") + print(f" 响应: {r.text[:500] if r.text else '(无内容)'}") + if r.status_code == 502: + print(" 502 通常表示代理/网关无法连接后端,请确认服务已启动,或使用 --base-url 指定正确地址") + return 1 + print("登录成功") + + # 触发填充 + r = session.post(f"{base}/api/accounts/fill-boss-ids") + if r.status_code != 200: + print(f"请求失败: {r.status_code} {r.text}") + return 1 + + data = r.json() + if data.get("code") != 200: + print(f"接口错误: {data}") + return 1 + + d = data.get("data", {}) + print(d.get("message", "完成")) + print(f" 已触发: {d.get('submitted', 0)}, 跳过: {d.get('skipped', 0)}") + for e in d.get("errors", []): + print(f" - {e}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/fill_boss_ids_db.py b/scripts/fill_boss_ids_db.py new file mode 100644 index 0000000..01e79c5 --- /dev/null +++ b/scripts/fill_boss_ids_db.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +直接操作数据库,为缺少 boss_id 的账号填充。 +有 boss_username 的用其作为 boss_id(占位,真实 uid 需通过检测登录获取); +无 boss_username 的用 worker_id+browser_name 生成唯一标识。 +用法: python scripts/fill_boss_ids_db.py [--dry-run] +""" +import os +import sys + +# 项目根目录 +ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, ROOT) +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings") + +import django +django.setup() + +from django.db.models import Q + +from server.models import BossAccount + + +def main(): + dry_run = "--dry-run" in sys.argv + if dry_run: + print("【 dry-run 模式,不实际写入 】") + + qs = BossAccount.objects.filter(Q(boss_id="") | Q(boss_id__isnull=True)) + total = qs.count() + if total == 0: + print("没有需要填充的账号") + return 0 + + updated = 0 + for acc in qs: + if acc.boss_username: + new_id = str(acc.boss_username).strip() + else: + new_id = f"_{acc.worker_id}_{acc.browser_name}".replace(" ", "_")[:64] + + if not dry_run: + acc.boss_id = new_id + acc.save(update_fields=["boss_id"]) + updated += 1 + print(f" {'[dry] ' if dry_run else ''}id={acc.id} {acc.browser_name}@{acc.worker_id} -> boss_id={new_id}") + + print(f"共处理 {updated}/{total} 条") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/server/api/accounts.py b/server/api/accounts.py index 4bd4b63..2f364ac 100644 --- a/server/api/accounts.py +++ b/server/api/accounts.py @@ -3,18 +3,26 @@ BOSS 账号 API(需要登录): - POST /api/accounts -> 添加账号(绑定环境名称到电脑) - GET /api/accounts -> 查询所有账号(含电脑名称、在线、任务状态) +- POST /api/accounts/fill-boss-ids -> 批量填充缺失的 boss_id(触发 check_login) - GET /api/accounts/{id} -> 查询单个账号详情 - DELETE /api/accounts/{id} -> 删除账号 检测登录等操作统一通过 POST /api/tasks 提交,task_type 传 check_login 即可。 """ +import logging + +from asgiref.sync import async_to_sync from rest_framework import status from rest_framework.decorators import api_view +from common.protocol import TaskType from server.core.response import api_success, api_error -from server.models import BossAccount +from server.models import BossAccount, TaskCreate from server.serializers import BossAccountSerializer, AccountBindSerializer from server.core.worker_manager import worker_manager +from server.core.task_dispatcher import task_dispatcher + +logger = logging.getLogger("server.api.accounts") # ────────────────────────── 内部工具 ────────────────────────── @@ -49,7 +57,7 @@ def account_list(request): wid = ser.validated_data["worker_id"] bname = ser.validated_data["browser_name"] - account, _ = BossAccount.objects.get_or_create( + account, created = BossAccount.objects.get_or_create( worker_id=wid, browser_name=bname, defaults={ @@ -58,9 +66,71 @@ def account_list(request): "is_logged_in": False, }, ) + # 绑定后自动触发检测登录,填充 boss_id + if (created or not account.boss_id) and worker_manager.is_online(wid): + send_fn = worker_manager.get_send_fn(wid) + if send_fn: + req = TaskCreate(task_type=TaskType.CHECK_LOGIN, worker_id=wid, account_name=bname, params={}) + task = task_dispatcher.create_task(req) + if async_to_sync(task_dispatcher.dispatch)(task, send_fn): + worker_manager.set_current_task(wid, task.task_id) + account.current_task_id = task.task_id + account.current_task_status = task.status.value + account.save(update_fields=["current_task_id", "current_task_status"]) + logger.info("绑定账号后自动触发 check_login: %s@%s", bname, wid) return api_success(_enrich(account), http_status=status.HTTP_201_CREATED) +@api_view(["POST"]) +def fill_boss_ids(request): + """ + 为缺少 boss_id 的账号批量触发检测登录任务。 + 需确保对应 Worker 在线,任务执行完成后会自动更新 boss_id。 + """ + from django.db.models import Q + accounts = BossAccount.objects.filter(Q(boss_id="") | Q(boss_id__isnull=True)).exclude(browser_name="") + submitted = 0 + skipped = 0 + errors = [] + + for acc in accounts: + wid = acc.worker_id + bname = acc.browser_name + if not worker_manager.is_online(wid): + skipped += 1 + errors.append(f"{bname}@{wid}: Worker 不在线") + continue + send_fn = worker_manager.get_send_fn(wid) + if not send_fn: + skipped += 1 + errors.append(f"{bname}@{wid}: Worker 连接不可用") + continue + req = TaskCreate( + task_type=TaskType.CHECK_LOGIN, + worker_id=wid, + account_name=bname, + params={}, + ) + task = task_dispatcher.create_task(req) + success = async_to_sync(task_dispatcher.dispatch)(task, send_fn) + if success: + worker_manager.set_current_task(wid, task.task_id) + acc.current_task_id = task.task_id + acc.current_task_status = task.status.value + acc.save(update_fields=["current_task_id", "current_task_status"]) + submitted += 1 + else: + skipped += 1 + errors.append(f"{bname}@{wid}: 派发失败") + + return api_success({ + "submitted": submitted, + "skipped": skipped, + "errors": errors[:20], + "message": f"已为 {submitted} 个账号触发检测登录,{skipped} 个跳过", + }, http_status=status.HTTP_200_OK) + + @api_view(["GET"]) def account_list_by_worker(request, worker_id): """按 worker_id 查询账号列表,兼容 /api/accounts/worker-1 形式。""" diff --git a/server/urls.py b/server/urls.py index f138351..ac27557 100644 --- a/server/urls.py +++ b/server/urls.py @@ -26,6 +26,7 @@ urlpatterns = [ # ─── 账号 ─── path("api/accounts", accounts.account_list), + path("api/accounts/fill-boss-ids", accounts.fill_boss_ids), path("api/accounts/", accounts.account_detail), path("api/accounts/", accounts.account_list_by_worker),