Files
aichebao-automation-project/接口.py

93 lines
2.7 KiB
Python
Raw Normal View History

2025-11-10 15:27:47 +08:00
from flask import Flask, request, jsonify
from loguru import logger
2025-11-14 11:44:51 +08:00
from concurrent.futures import ThreadPoolExecutor
import uuid
import threading
import time
2025-11-10 15:27:47 +08:00
2025-12-03 13:17:36 +08:00
from 本田注入 import Acb # 你的自定义模块
2025-11-10 15:27:47 +08:00
app = Flask(__name__)
2025-11-14 11:44:51 +08:00
# ========== 线程池与任务存储 ==========
executor = ThreadPoolExecutor(max_workers=5) # 最多同时执行5个任务
tasks = {} # 存储任务状态 {task_id: {"status": "pending"/"running"/"success"/"error", "message": "..."}}
lock = threading.Lock() # 防止多线程修改冲突
2025-11-13 16:15:06 +08:00
2025-11-14 11:44:51 +08:00
# ========== 异步任务函数 ==========
def run_acb_task(task_id, data):
"""后台执行的 ACB 任务"""
2025-11-10 15:27:47 +08:00
try:
2025-11-14 11:44:51 +08:00
with lock:
tasks[task_id]["status"] = "running"
tasks[task_id]["message"] = "任务正在执行中"
acb_instance = Acb(a=data)
result = acb_instance.action() # 执行核心逻辑
with lock:
tasks[task_id]["status"] = "success"
tasks[task_id]["message"] = "注入成功"
tasks[task_id]["result"] = result
2025-11-13 16:15:06 +08:00
2025-11-14 11:44:51 +08:00
except Exception as e:
logger.exception("任务执行出错")
with lock:
tasks[task_id]["status"] = "error"
tasks[task_id]["message"] = str(e)
# ========== 路由部分 ==========
@app.route("/acb", methods=["POST"])
def create_task():
"""创建新任务"""
try:
2025-11-10 15:27:47 +08:00
data = request.get_json(force=True)
if not data:
return jsonify({"status": "error", "message": "请求体为空或格式错误"}), 400
2025-11-14 11:44:51 +08:00
# 生成任务 ID
task_id = str(uuid.uuid4())
# 初始化任务状态
with lock:
tasks[task_id] = {"status": "pending", "message": "任务已提交,等待执行"}
2025-11-10 15:27:47 +08:00
2025-11-14 11:44:51 +08:00
# 提交后台执行
executor.submit(run_acb_task, task_id, data)
return jsonify({"status": "true", "task_id": task_id, "message": "任务已创建"}), 200
2025-11-10 15:27:47 +08:00
except Exception as e:
2025-11-14 11:44:51 +08:00
logger.exception("创建任务出错")
2025-11-10 15:27:47 +08:00
return jsonify({"status": "error", "message": str(e)}), 500
2025-11-14 11:44:51 +08:00
@app.route("/acb/status/<task_id>", methods=["GET"])
def check_task_status(task_id):
"""查询任务执行状态"""
with lock:
task_info = tasks.get(task_id)
if not task_info:
return jsonify({"status": "error", "message": "任务ID不存在"}), 404
return jsonify({
"status": task_info["status"],
"message": task_info["message"],
"result": task_info.get("result")
})
# ========== 启动服务 ==========
2025-11-10 15:27:47 +08:00
if __name__ == "__main__":
2025-11-14 11:44:51 +08:00
try:
acb = Acb()
acb.take_over_browser()
except Exception as e:
logger.warning(f"初始化浏览器控制失败: {e}")
2025-11-10 18:30:38 +08:00
2025-11-14 11:44:51 +08:00
app.run(host="0.0.0.0", port=5000, debug=False)