Files
aichebao-automation-project/本田注入.py
2025-12-16 19:23:39 +08:00

872 lines
36 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import configparser
import os
import random
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
import cv2
from DrissionPage import *
from flask import Flask, request, jsonify
from loguru import logger
class Acb:
def __init__(self, a=None):
self.local_port = 9999
self.tmp_path = "本田/tmp" # 临时文件存储路径
self.user_data_path = "本田/user_data" # 用户数据存储路径
self.cache_path = "本田/cache_path" # 存储数据路径
self.a = a # 数据
def take_over_browser(self):
try:
self.options = ChromiumOptions()
self.options.set_local_port(self.local_port)
self.options.set_tmp_path(self.tmp_path)
self.options.set_cache_path(self.cache_path)
self.options.set_user_data_path(self.user_data_path)
# self.options.auto_port()
self.page = ChromiumPage(addr_or_opts=self.options)
self.page.set.window.max()
return True
except:
return False
def get_size(self):
# 定义图片文件路径
bg_file_path = './0.jpg'
tp_file_path = './1.jpg'
# 检查背景图片文件是否存在
if not os.path.exists(bg_file_path):
print(f"背景图片文件 {bg_file_path} 不存在,请检查文件路径。")
else:
# 读取背景图片
bg_img = cv2.imread(bg_file_path)
if bg_img is None:
print(f"无法读取背景图片文件 {bg_file_path},请检查文件完整性。")
else:
# 检查缺口图片文件是否存在
if not os.path.exists(tp_file_path):
print(f"缺口图片文件 {tp_file_path} 不存在,请检查文件路径。")
else:
# 读取缺口图片
tp_img = cv2.imread(tp_file_path)
if tp_img is None:
print(f"无法读取缺口图片文件 {tp_file_path},请检查文件完整性。")
else:
# 识别图片边缘
bg_edge = cv2.Canny(bg_img, 100, 200)
tp_edge = cv2.Canny(tp_img, 100, 200)
# 转换图片格式
bg_pic = cv2.cvtColor(bg_edge, cv2.COLOR_GRAY2RGB)
tp_pic = cv2.cvtColor(tp_edge, cv2.COLOR_GRAY2RGB)
# 缺口匹配
res = cv2.matchTemplate(bg_pic, tp_pic, cv2.TM_CCOEFF_NORMED)
# 寻找最优匹配
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res)
# 左上角点的坐标
tl = max_loc
return tl[0]
def add_car(self, tab):
tab.ele('x:(//span[normalize-space(text()) = "新增车主车辆"])[last()]', timeout=10).click()
time.sleep(1)
tab.ele('x:(//input[@labelname="VIN"])[last()]').input(self.a["VIN码"], clear=True)
time.sleep(5)
if tab.ele('x:(//p[normalize-space(text()) = "该车辆已存在,是否带入车主及车辆信息?"])[last()]',
timeout=10) or tab.ele('x:(//p[normalize-space(text()) = "该车辆已存在,是否带入车辆信息?"])[last()]',
timeout=10):
xx_ele = tab.ele('x:(//span[normalize-space(text()) = "确认"])[last()]', timeout=10)
xx_ele.click()
if self.a.get("换表里程", ''):
xx_ele = tab.ele('x:(//input[@labelname="换表里程"])[last()]')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.input(self.a["换表里程"], clear=True)
time.sleep(1)
if self.a.get("累计换表里程", ''):
xx_ele = tab.ele('x:(//input[@labelname="累计换表里程"])[last()]')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.input(self.a["累计换表里程"], clear=True)
time.sleep(1)
if self.a.get("发动机编号", ''):
xx_ele = tab.ele('x:(//input[@labelname="发动机编号"])[last()]')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.input(self.a["发动机编号"], clear=True)
time.sleep(1)
if self.a.get("电机号", ''):
xx_ele = tab.ele('x:(//input[@labelname="电机号"])[last()]')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.input(self.a["电机号"], clear=True)
time.sleep(1)
if self.a.get("动力类型", ''):
xx_ele = tab.ele('x:(//div[@labelname="动力类型"]//input)[last()]')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["动力类型"]}"])[last()]', timeout=10).click()
time.sleep(1)
if self.a.get("开始日期", ''):
xx_ele = tab.ele('x:(//input[@placeholder="开始日期"])[last()]')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.input(self.a["开始日期"], clear=True)
time.sleep(1)
if self.a.get("结束日期", ''):
xx_ele = tab.ele('x:(//input[@placeholder="结束日期"])[last()]')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.input(self.a["结束日期"] + "\n", clear=True)
time.sleep(1)
if self.a.get("车主手机号", ''):
tab.ele(
'x://*[@id="app"]/div[3]/div/div[2]/div/div[4]/section/form/div/div/div[1]/div/div/div/div[1]/input').input(
self.a["车主手机号"], clear=True)
time.sleep(1)
if self.a.get("车牌号", ''):
tab.ele(
'x://*[@id="app"]/div[3]/div/div[2]/div/div[2]/section/form/div/div/div[5]/div/div/div/input').input(
self.a["车牌号"], clear=True)
time.sleep(1)
if self.a.get("是否本公司购车", ''):
tab.ele('x:(//div[@labelname="是否本公司购车"])[last()]').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["是否本公司购车"]}"])[last()]', timeout=10).click()
time.sleep(1)
if self.a.get("销售顾问", ''):
tab.ele('x:(//input[@labelname="销售顾问"])[last()]').input(self.a["销售顾问"], clear=True)
time.sleep(1)
if self.a.get("车主名称", ''):
tab.ele('x:(//input[@labelname="车主名称"])[last()]').input(self.a["车主名称"], clear=True)
time.sleep(1)
if self.a.get("车主性质", ''):
tab.ele('x:(//div[@labelname="车主性质"])[last()]').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["车主性质"]}"])[last()]', timeout=10).click()
time.sleep(1)
if self.a.get("性别", ''):
tab.ele('x:(//div[@labelname="性别"])').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["性别"]}"])[last()]', timeout=10).click()
time.sleep(1)
if self.a.get("地址", ''):
tab.ele('x:(//input[@labelname="地址"])[last()]').input(self.a["地址"], clear=True)
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "提交"])[last()]', timeout=10).click()
def action(self):
# 接管浏览器
if self.take_over_browser():
logger.info("接管浏览器浏览器成功!!!")
else:
logger.error("接管浏览器失败!!!")
return
tab = self.page.new_tab(
url="https://bcp.dongfeng-honda.com/#/dop/dop/partsStorage/serviceReception/customerReception")
dl_ele = tab.ele('x:(//span[normalize-space(text()) = "登录"])[last()]', timeout=10)
if dl_ele:
tab.listen.start(targets="data:image/jpg;base64") # 抓包图片
tab.ele('x:(//input[@placeholder="用户名/手机号"])[last()]').input(vals=config.get('账号', '用户名'),
clear=True)
time.sleep(1)
tab.ele('x:(//input[@placeholder="密码"])[last()]').input(vals="", clear=True)
tab.ele('x:(//input[@placeholder="密码"])[last()]').input(vals=config.get('账号', '密码'))
time.sleep(3)
dl_ele.click()
for _, i in enumerate(tab.listen.wait(count=2)):
with open(f'./{_}.jpg', 'wb') as f:
f.write(i.response.body)
AA = self.get_size()
# 左键按住 滑标 元素
tab.actions.hold('@id=cutImage')
AA = AA + 5
tab.actions.right(AA)
# 模拟人的行为 移动过去,再移动回来
tab.actions.right(5)
time.sleep(random.randint(1, 3))
tab.actions.left(5)
time.sleep(random.randint(1, 3))
# 释放左键
tab.actions.release()
# 查询
time.sleep(5)
tab.get(
url="https://bcp.dongfeng-honda.com/#/dop/dop/partsStorage/serviceReception/customerReception")
time.sleep(3)
tab.ele('x://span[text()="新建"]').click()
time.sleep(1)
tab.ele('x:(//input[@labelname="VIN码"])[last()]').input(vals=self.a["VIN码"], clear=True)
time.sleep(1)
tab.ele('x:(//span[text()="查询"])[last()]').click()
time.sleep(5)
xx_ele = tab.ele('x:(//span[normalize-space(text()) = "选择"])[last()]', timeout=5)
if xx_ele:
xx_ele.click()
time.sleep(3)
else:
self.add_car(tab=tab)
time.sleep(5)
xx_ele = tab.ele('x:(//span[normalize-space(text()) = "选择"])[last()]', timeout=5)
if xx_ele:
xx_ele.click()
time.sleep(1)
xx_ele = tab.ele('x:(//span[normalize-space(text()) = "继续添加"])[last()]', timeout=10)
if xx_ele:
xx_ele.click()
time.sleep(1)
time.sleep(3)
xx_ele = tab.ele('x:(//div[@aria-label="当前活动"]//span[normalize-space(text()) = "确认"])[last()]',
timeout=10)
if xx_ele:
try:
xx_ele.click()
except:
pass
time.sleep(1)
if self.a.get("工单类型", ''):
time.sleep(1)
tab.ele('x:(//div[@labelname="工单类型"])[last()]').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["工单类型"]}"])[last()]').click()
time.sleep(1)
if self.a.get("维修类型", ''):
tab.ele('x:(//div[@labelname="维修类型"])[last()]').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a['维修类型']}"])[last()]').click()
time.sleep(1)
if self.a.get("二级维修类型", ''):
tab.ele('x:(//div[@labelname="二级维修类型"])[last()]').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["二级维修类型"]}"])[last()]').click()
time.sleep(1)
if self.a.get("出厂行驶里程", ''):
tab.ele('x:(//input[@labelname="进厂行驶里程"])[last()]').input(self.a["进厂行驶里程"], clear=True)
time.sleep(1)
tab.ele('x:(//input[@labelname="出厂行驶里程"])[last()]').input(self.a["出厂行驶里程"], clear=True)
time.sleep(1)
if self.a.get("预计交车时间", ''):
tab.ele('x:(//div[@labelname="预计交车时间"])').click()
time.sleep(1)
tab.ele('x:(//input[@placeholder="选择日期"])').input(vals=self.a["预计交车时间"].split()[0], clear=True)
time.sleep(1)
tab.ele('x:(//input[@placeholder="选择时间"])').input(vals=self.a["预计交车时间"].split()[1], clear=True)
time.sleep(1)
tab.ele('x:(//span[normalize-space(text()) = "确定"])[last()]').click()
time.sleep(1)
if self.a.get("送修保险公司", ''):
tab.ele('x:(//div[@labelname="送修保险公司"])').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["送修保险公司"]}"])[last()]').click()
time.sleep(1)
if self.a.get("报案号", ''):
tab.ele('x:(//input[@labelname="报案号"])').input(vals=self.a["报案号"], clear=True)
time.sleep(1)
if self.a.get("是否全钣喷", ''):
tab.ele('x:(//div[@labelname="是否全钣喷"])').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["是否全钣喷"]}"])[last()]').click()
time.sleep(1)
if self.a.get("三日电访时间", ''):
tab.ele('x:(//div[@labelname="三日电访时间"])').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["三日电访时间"]}"])[last()]').click()
time.sleep(1)
xx_ele = tab.ele('x:(//input[@labelname="送修人"])')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.input(vals=self.a["送修人"], clear=True)
time.sleep(1)
xx_ele = tab.ele('x:(//div[@labelname="送修人性别"])')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{self.a["送修人性别"]}"])[last()]').click()
time.sleep(1)
xx_ele = tab.ele('x:(//input[@labelname="送修人电话"])')
if xx_ele:
if not xx_ele.attrs.get('disabled', ""):
xx_ele.input(vals=self.a["送修人电话"], clear=True)
# # 维修套餐
# 维修套餐 = self.a.get("维修套餐")
# if 维修套餐:
# tab.ele('x:(//span[text() = "维修套餐"])[last()]').click()
# time.sleep(3)
# tab.ele('x:(//input[@labelname="套餐名称"])').input(维修套餐["套餐名称"])
# time.sleep(3)
# tab.ele('x://*[@aria-label="维修套餐"]//span[text() = "查询"]').click()
# time.sleep(3)
# tab.ele(f'x:(//span[normalize-space(text()) = "{维修套餐["套餐编号"]}"])[last()]').click()
# time.sleep(3)
# tab.ele('x://*[@aria-label="维修套餐"]//span[text() = "确认"]').click()
# time.sleep(5)
#
# wxxm_eles = tab.eles(f'x://*[@id="MicroAppMain"]/div/div[1]/div[7]/div[3]/table/tbody/tr')
# for i in 维修套餐["项目"]:
# for _, wxxm_ele in enumerate(wxxm_eles):
# print(wxxm_ele.ele('x:./td[6]').text)
# if wxxm_ele.ele('x:./td[6]').text == i.get('零部件代码'):
# tab.actions.scroll(on_ele=f"text={_ + 1}")
# time.sleep(1)
# tab.actions.scroll(on_ele=f"text=销售材料")
# time.sleep(1)
# wxxm_ele.ele('x:./td[8]/div/div/input').input(i["数量"], clear=True)
# break
#
# if wxxm_ele.ele('x:./td[6]').text == i.get(
# '车型分组代码'):
# tab.actions.scroll(on_ele=f"text={_ + 1}")
# time.sleep(1)
# tab.actions.scroll(on_ele=f"text=销售材料")
# time.sleep(1)
# wxxm_ele.ele('x:./td[8]/div/div/input').input(i["数量"], clear=True)
# time.sleep(1)
# wxxm_ele.ele('x:./td[10]/div/div/div/input').click()
# time.sleep(1)
# tab.ele(f'x:(//span[normalize-space(text()) = "{i["工时单价"]}"])[last()]').click()
# break
# else:
# self.a["项目"].append(i)
# 维修项目
n = 1 # 第几个项目
start = 0 # 材料判断是否主因零部件
tab.listen.start("dop/se_rpms_core/package/packageMaintenanceQuery")
维修套餐 = self.a.get("维修套餐")
if 维修套餐:
tab.ele('x:(//span[text() = "维修套餐"])[last()]').click()
time.sleep(3)
tab.ele('x:(//input[@labelname="套餐名称"])').input(维修套餐["套餐名称"])
time.sleep(1)
tab.ele('x://*[@aria-label="维修套餐"]//span[text() = "查询"]').click()
time.sleep(3)
tab.ele(f'x:(//span[normalize-space(text()) = "{维修套餐["套餐编号"]}"])[last()]').click()
time.sleep(3)
tab.ele('x://*[@aria-label="维修套餐"]//span[text() = "确认"]').click()
time.sleep(5)
packageRepairDTOList = None
for i in tab.listen.steps():
for i1 in i.response.body["rows"]:
if i1["packageCode"] == 维修套餐["套餐编号"]:
packageRepairDTOList = i1["packageRepairDTOList"]
break
if packageRepairDTOList:
break
wxxm_eles = tab.eles(f'x://*[@id="MicroAppMain"]/div/div[1]/div[7]/div[3]/table/tbody/tr')
while n <= len(wxxm_eles):
wxxm_ele = wxxm_eles[n - 1]
for i in 维修套餐["项目"]:
print(wxxm_ele.ele('x:./td[6]').text)
if i.get("项目类型") == "材料" and (packageRepairDTOList[n - 1].get('itemCode') == i.get('零部件代码') or i.get("零部件代码", "") == "") and i.get("维修项目") == packageRepairDTOList[n - 1].get('itemName'): # 材料
tab.actions.scroll(on_ele=f"text={n}")
time.sleep(1)
tab.actions.scroll(on_ele=f"text=销售材料")
time.sleep(1)
wxxm_ele.ele('x:./td[8]/div/div/input').input(i["数量"], clear=True)
input_ele = wxxm_ele.ele('x:./td[12]/div/label/span', timeout=1)
if not start and input_ele:
input_ele.click()
time.sleep(1)
start += 1
n += 1
break
# 工时
if i.get("项目类型") == "工时" and (packageRepairDTOList[n - 1].get('itemCode') == i.get('车型分组代码') or i.get("车型分组代码", "") == "") and i.get("维修项目") == packageRepairDTOList[n - 1].get('itemName'):
tab.actions.scroll(on_ele=f"text={n}")
time.sleep(1)
tab.actions.scroll(on_ele=f"text=销售材料")
time.sleep(1)
wxxm_ele.ele('x:./td[8]/div/div/input').input(i["数量"], clear=True)
time.sleep(1)
wxxm_ele.ele('x:./td[10]/div/div/div/input').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{i["工时单价"]}"])[last()]').click()
n += 1
break
else:
tab.actions.scroll(on_ele=f"text={n}")
time.sleep(1)
tab.actions.scroll(on_ele=f"text=销售材料")
time.sleep(1)
wxxm_ele.ele(f'x:.//span[normalize-space(text()) = "删除"]').click()
del packageRepairDTOList[n - 1]
# n = max(n - 1, 1)
time.sleep(1)
wxxm_eles = tab.eles(f'x://*[@id="MicroAppMain"]/div/div[1]/div[7]/div[3]/table/tbody/tr')
wxxm_eles = tab.eles(f'x://*[@id="MicroAppMain"]/div/div[1]/div[7]/div[3]/table/tbody/tr')
# 维修项目
if self.a.get("项目"):
for data1 in self.a.get("项目"):
if data1["项目类型"] == "工时":
tab.ele('x:(//span[text() = "维修项目"])[last()]').click()
time.sleep(1)
tab.ele('x:(//input[@labelname="维修项目名称"])').input(data1["维修项目"])
time.sleep(1)
tab.ele('x:(//input[@labelname="车型组"])').input(vals="", clear=True)
time.sleep(1)
tab.ele('x:(//span[normalize-space(text()) = "查询"])[last()]').click()
time.sleep(1)
ele = tab.ele(f'x:(//span[normalize-space(text()) = "{data1["车型分组代码"]}"])[last()]',
timeout=3) # 车型分组代码
if ele:
ele.click()
time.sleep(1)
tab.ele(
'x:(//div[@aria-label="维修项目"]//span[normalize-space(text()) = "确认"])[last()]').click()
time.sleep(1)
wxxm_ele = tab.ele(
f'x://*[@id="MicroAppMain"]/div/div[1]/div[7]/div[3]/table/tbody/tr[{n}]')
time.sleep(1)
tab.actions.scroll(on_ele=f"text={n}")
time.sleep(1)
tab.actions.scroll(on_ele=f"text=销售材料")
time.sleep(1)
wxxm_ele.ele('x:./td[8]/div/div/input').input(data1["数量"], clear=True)
time.sleep(1)
wxxm_ele.ele('x:./td[10]/div/div/div/input').click()
time.sleep(1)
tab.ele(f'x:(//span[normalize-space(text()) = "{data1["工时单价"]}"])[last()]').click()
n += 1
else:
tab.ele('x:(//span[normalize-space(text()) = "取消"])[last()]').click()
time.sleep(3)
else:
tab.ele('x:(//span[text() = "手动选择零部件"])[last()]').click()
time.sleep(1)
tab.ele(
f'x://*[@class="el-col el-col-7 fd-col-custCommon "]//input[@placeholder="请输入"]',
timeout=1).input(
f"{data1["维修项目"]}\n", )
time.sleep(1)
tab.ele('x:(//span[normalize-space(text()) = "查询"])[last()]').click()
time.sleep(15)
ele = tab.ele(f'x:(//span[normalize-space(text()) = "{data1["零部件代码"]}"])[last()]',
timeout=3)
if ele:
ele.click() # 车型分组代码
time.sleep(1)
tab.ele(
f'x://*[@class="btn-col el-col el-col-1"]//button[@class="el-button el-button--primary el-button--small"]').click() # 车型分组代码
time.sleep(1)
tab.ele('x:(//span[normalize-space(text()) = "确认"])[last()]').click()
time.sleep(1)
tab.actions.scroll(on_ele=f"text={n}")
time.sleep(1)
tab.actions.scroll(on_ele=f"text=销售材料")
time.sleep(1)
wxxm_ele = tab.ele(
f'x://*[@id="MicroAppMain"]/div/div[1]/div[7]/div[3]/table/tbody/tr[{n}]')
time.sleep(1)
wxxm_ele.ele('x:./td[8]/div/div/input').input(data1["数量"], clear=True)
input_ele = wxxm_ele.ele('x:./td[12]/div/label/span', timeout=1)
if not start and input_ele:
input_ele.click()
time.sleep(1)
start += 1
n += 1
else:
tab.ele('x:(//span[normalize-space(text()) = "取消"])[last()]').click()
time.sleep(3)
tab.ele('x:(//span[normalize-space(text()) = "保存"])[last()]').click()
time.sleep(15)
if tab.ele('x:(//p[normalize-space(text()) = "保存后,将生成电子委托书发送至服务号!"])[last()]'):
tab.close()
# self.page.close()
return True
else:
tab.close()
# self.page.close()
raise "注入失败"
app = Flask(__name__)
# ========== 线程池与任务存储 ==========
executor = ThreadPoolExecutor(max_workers=5) # 最多同时执行5个任务
tasks = {} # 存储任务状态 {task_id: {"status": "pending"/"running"/"success"/"error", "message": "..."}}
lock = threading.Lock() # 防止多线程修改冲突
# ========== 异步任务函数 ==========
def run_acb_task(task_id, data):
"""后台执行的 ACB 任务"""
try:
with lock:
tasks[task_id]["status"] = "running"
tasks[task_id]["message"] = "任务正在执行中"
acb_instance = Acb(a=data)
result = acb_instance.action() # 执行核心逻辑
with lock:
tasks[task_id]["status"] = "success"
tasks[task_id]["message"] = "注入成功"
tasks[task_id]["result"] = result
except Exception as e:
logger.exception("任务执行出错")
with lock:
tasks[task_id]["status"] = "error"
tasks[task_id]["message"] = str(e)
# ========== 路由部分 ==========
@app.route("/acb", methods=["POST"])
def create_task():
"""创建新任务"""
try:
data = request.get_json(force=True)
if not data:
return jsonify({"status": "error", "message": "请求体为空或格式错误"}), 400
# 生成任务 ID
task_id = str(uuid.uuid4())
# 初始化任务状态
with lock:
tasks[task_id] = {"status": "pending", "message": "任务已提交,等待执行"}
# 提交后台执行
executor.submit(run_acb_task, task_id, data)
return jsonify({"status": "true", "task_id": task_id, "message": "任务已创建"}), 200
except Exception as e:
logger.exception("创建任务出错")
return jsonify({"status": "error", "message": str(e)}), 500
@app.route("/acb/status/<task_id>", methods=["GET"])
def check_task_status(task_id):
"""查询任务执行状态"""
with lock:
task_info = tasks.get(task_id)
if not task_info:
return jsonify({"status": "error", "message": "任务ID不存在"}), 404
return jsonify({
"status": task_info["status"],
"message": task_info["message"],
"result": task_info.get("result")
})
# ========== 启动服务 ==========
if __name__ == "__main__":
# 配置文件路径
config_file = 'config.ini'
# 创建 ConfigParser 对象
config = configparser.ConfigParser()
# 检查文件是否存在
try:
with open(config_file, 'r') as f:
config.read_file(f)
except FileNotFoundError:
# 文件不存在,添加默认配置
config['账号'] = {
'用户名': 'WD20098746',
'密码': 'Zhengxifeng12',
}
config['App'] = {
'debug': 'true'
}
# 将配置写入文件
with open(config_file, 'w') as f:
config.write(f)
for i in range(3):
logger.info(f"当前账号配置信息,账号:{config.get('账号', '用户名')},密码:{config.get('账号', '密码')}")
time.sleep(1)
start = int(input("如果使用当前账号(请输入0输入1将重新设置账号信息)"))
if not start:
logger.success("程序启动。。。")
break
if start:
name = input("请输入账号:")
pwd = input("请输入密码:")
config.set('账号', '用户名', name)
config.set('账号', '密码', pwd)
# 将修改后的配置写入文件
with open(config_file, 'w') as f:
config.write(f)
# a = {
# "工单类型": "维修",
# "VIN码": "LVHFE468XP6000197",
# "车牌号": "京CQR336",
# "维修类型": "一般维修",
# "二级维修类型": "",
# "进厂行驶里程": 154546,
# "出厂行驶里程": 1578164,
# "预计交车时间": "2025-11-15 15:35",
# "送修保险公司": "太平财险",
# "报案号": "2025111301",
# "是否全钣喷": "是",
# "三日电访时间": "下午",
# "换表里程": "",
# "累计换表里程": "",
# "发动机编号": "AS77879ACB",
# "电机号": "",
# "动力类型": "混动",
# "是否本公司购车": "",
# "开始日期": "",
# "结束日期": "",
# "销售顾问": "郑喜峰",
# "车主手机号": "13910921511",
# "车主名称": "金克程",
# "车主性质": "个人",
# "性别": "男性",
# "地址": "空",
# "送修人": "金克程",
# "送修人性别": "男性",
# "送修人电话": "18108230647",
# "维修套餐": {
# "套餐名称": "FA1-20000公里保养",
# "套餐编号": "FA1A2",
# "项目": [
# {
# "项目类型": "材料",
# "维修项目": "放油口垫",
# "数量": 4.5,
# "零部件代码": "94109-14000."
# },
# # {
# # "项目类型": "材料",
# # "维修项目": "发动机保护剂",
# # "数量": 4.5,
# # "零部件代码": "08CH2-ADC-YW1"
# # },
#
# {
# "项目类型": "材料",
# "维修项目": "大黑桶全合成机油0W20日石4L",
# "数量": 0.5,
# "零部件代码": "08235-W99-A4PJ3"
# },
# # {
# # "项目类型": "材料",
# # "维修项目": "机油滤清器MAHLE",
# # "数量": 0.5,
# # "零部件代码": "15400-R5G-H01"
# # },
# {
# "项目类型": "材料",
# "维修项目": "化清",
# "数量": 0.5,
# "零部件代码": "TLY-HQ"
# },
# {
# "项目类型": "材料",
# "维修项目": "发动机缸压恢复剂",
# "数量": 0.5,
# "零部件代码": "123123"
# },
# {
# "项目类型": "工时",
# "维修项目": "轮胎交叉换位(含动平衡)",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": ""
# },
# {
# "项目类型": "工时",
# "维修项目": "常规保养检查",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "123456"
# },
# {
# "项目类型": "工时",
# "维修项目": "10000KM间隔保养",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "10000BY"
# },
# {
# "项目类型": "工时",
# "维修项目": "*拆装清洗节气门体",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "BY0026"
# },
# {
# "项目类型": "工时",
# "维修项目": "*检查前刹车片、后刹车片",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": ""
# },
# {
# "项目类型": "工时",
# "维修项目": "已告知预存保养费尊享工时折扣",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": ""
# },
# {
# "项目类型": "工时",
# "维修项目": "已告知微信预约绿色通道专享",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": ""
# },
# {
# "项目类型": "工时",
# "维修项目": "说明:本工单所有项目为建议维保内容",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": ""
# }
# ]
# },
# "项目": [
# {
# "项目类型": "材料",
# "维修项目": "发动机机油SN0W20 4L日石",
# "数量": 0.5,
# "零部件代码": "08234-W99-A4PJ3"
# },
# {
# "项目类型": "工时",
# "维修项目": "点烟器总成 - 更换",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "CU2"
# },
# {
# "项目类型": "工时",
# "维修项目": "拆装侧踏",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "RD5"
# },
# {
# "项目类型": "工时",
# "维修项目": "两轮轮胎动平衡-调整",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "CU2"
# },
# {
# "项目类型": "工时",
# "维修项目": "仪表灯泡- 更换",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "CU2"
# },
# {
# "项目类型": "工时",
# "维修项目": "驻车制动器-调整",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "RD7"
# },
# {
# "项目类型": "工时",
# "维修项目": "室内干洗",
# "数量": 1,
# "工时单价": 100,
# "车型分组代码": "RE2"
# }
# ]
# }
#
# Acb(
# a=a,
# ).action()
try:
acb = Acb()
acb.take_over_browser()
except Exception as e:
logger.warning(f"初始化浏览器控制失败: {e}")
app.run(host="0.0.0.0", port=5000, )