Files
aichebao-automation-project/智慧门店接口.py
2025-11-25 11:37:17 +08:00

74 lines
1.5 KiB
Python

import uuid
import threading
from flask import Flask, request, jsonify
from loguru import logger
from 智慧门店自动注入程序 import Zh
app = Flask(__name__)
# 内存保存任务状态
TASKS = {} # {task_id:{status:"running/success/failed", result:{}}}
def run_task(task_id, payload):
"""
后台执行浏览器自动化任务
"""
try:
TASKS[task_id]["status"] = "running"
zh = Zh(a=payload)
# zh.login_page()
zh.action()
TASKS[task_id]["status"] = "success"
TASKS[task_id]["result"] = {"msg": "任务执行成功"}
except Exception as e:
logger.exception(e)
TASKS[task_id]["status"] = "failed"
TASKS[task_id]["result"] = {"error": str(e)}
@app.post("/create_task")
def create_task():
"""
提交任务数据,返回 task_id
"""
payload = request.json
if not payload:
return jsonify({"error": "缺少 JSON 数据"}), 400
task_id = str(uuid.uuid4())
TASKS[task_id] = {
"status": "waiting",
"result": None
}
# 创建后台线程执行
t = threading.Thread(target=run_task, args=(task_id, payload))
t.start()
return jsonify({"task_id": task_id})
@app.get("/task_status/<task_id>")
def task_status(task_id):
"""
查询任务执行状态
"""
task = TASKS.get(task_id)
if not task:
return jsonify({"error": "task_id 不存在"}), 404
return jsonify(task)
if __name__ == '__main__':
zh = Zh(a={})
zh.login_page()
app.run(port=5000)