Files
boss_dp/scripts/fill_boss_ids.py
ddrwode 038f105ec5 haha
2026-02-27 14:11:12 +08:00

76 lines
2.4 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
一次性脚本:为缺少 boss_id 的账号触发检测登录,自动填充 boss_id。
需确保服务已启动、对应 Worker 在线。
用法: python scripts/fill_boss_ids.py [--base-url URL] [--user USER] [--password PASS]
"""
import argparse
import os
import sys
# 项目根目录加入路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import requests
DEFAULT_BASE = os.getenv("BOSS_DP_API", "http://127.0.0.1:9000")
DEFAULT_USER = os.getenv("ADMIN_USERNAME", "admin")
DEFAULT_PASS = os.getenv("ADMIN_PASSWORD", "boss_dp_admin")
def main():
parser = argparse.ArgumentParser(description="批量触发检测登录,填充 boss_id")
parser.add_argument("--base-url", default=DEFAULT_BASE, help="API 地址")
parser.add_argument("--user", default=DEFAULT_USER, help="管理员用户名")
parser.add_argument("--password", default=DEFAULT_PASS, help="管理员密码")
args = parser.parse_args()
base = args.base_url.rstrip("/")
session = requests.Session()
# 登录
try:
r = session.post(
f"{base}/api/auth/login",
json={"username": args.user, "password": args.password},
timeout=10,
)
except requests.exceptions.ConnectionError as e:
print(f"连接失败,请检查服务是否启动、地址是否正确: {base}")
print(f" 错误: {e}")
return 1
except requests.exceptions.Timeout:
print(f"请求超时: {base}")
return 1
if r.status_code != 200:
print(f"登录失败: HTTP {r.status_code}")
print(f" 响应: {r.text[:500] if r.text else '(无内容)'}")
if r.status_code == 502:
print(" 502 通常表示代理/网关无法连接后端,请确认服务已启动,或使用 --base-url 指定正确地址")
return 1
print("登录成功")
# 触发填充
r = session.post(f"{base}/api/accounts/fill-boss-ids")
if r.status_code != 200:
print(f"请求失败: {r.status_code} {r.text}")
return 1
data = r.json()
if data.get("code") != 200:
print(f"接口错误: {data}")
return 1
d = data.get("data", {})
print(d.get("message", "完成"))
print(f" 已触发: {d.get('submitted', 0)}, 跳过: {d.get('skipped', 0)}")
for e in d.get("errors", []):
print(f" - {e}")
return 0
if __name__ == "__main__":
sys.exit(main())