Files
aichebao-automation-project/智慧门店自动注入程序.py
2025-12-03 12:58:35 +08:00

257 lines
8.2 KiB
Python

import time
import pinyin
import configparser
from loguru import *
from DrissionPage import *
class Zh:
    """Fill in a repair order on yunxiu.f6car.cn through a DrissionPage browser.

    The instance is built around one payload dict ``a``. ``action`` reads the
    following keys from it: ``姓名``, ``手机号``, ``车牌地区``, ``车牌号``,
    ``用途``, ``vin``, ``品牌``, ``发动机号``, ``业务分类``, ``当前里程`` and
    ``项目`` (an iterable of item names). ``车系`` is optional and defaults to
    ``"CC"``, the value that used to be hard-coded.

    NOTE(review): this class was reconstructed from a listing whose
    indentation had been stripped; places where the nesting had to be
    inferred are marked with NOTE(review) comments below.
    """

    # ERP frame page that hosts the repair-order editor; used for login and
    # for the tutorial pass.
    _FRAME_URL = 'https://yunxiu.f6car.cn/erp/view/index.html#/index/frame?pageType=vue&toUrl=%2Fmaintain%2Fui%2Findex.html&route=%2Frepair%2Fedit&_ts=1763622948082'
    # Stand-alone repair-order editor opened in a new tab for every payload.
    _EDIT_URL = "https://yunxiu.f6car.cn/maintain/ui/index.html#/repair/edit"
    # NOTE(review): credentials are hard-coded in source; they should be moved
    # to configuration or the environment. Kept unchanged here so behaviour
    # stays the same.
    _ACCOUNT = "13593343360"
    _PASSWORD = "Aa123456."
    _COMPANY = '太原市裕龙顺达汽车销售服务有限公司'
    # Series clicked when the payload carries no "车系" entry.
    _DEFAULT_SERIES = "CC"

    def __init__(self, a: dict) -> None:
        """Store the payload and the fixed browser settings.

        Args:
            a: Payload dict with the form values (see the class docstring).
        """
        self.local_port = 8800
        self.tmp_path = "智慧门店/tmp/"  # temporary file directory
        self.user_data_path = "智慧门店/user_data/"  # browser user-data directory
        self.cache_path = "智慧门店/cache_path/"  # browser cache directory
        self.a = a

    def take_over_browser(self) -> bool:
        """Create ``self.options`` / ``self.page`` and maximise the window.

        Returns:
            True when the page object was created and maximised, False when
            any step raised (the exception is logged, not propagated).
        """
        try:
            self.options = ChromiumOptions()
            self.options.set_local_port(self.local_port)
            self.options.set_tmp_path(self.tmp_path)
            self.options.set_cache_path(self.cache_path)
            self.options.set_user_data_path(self.user_data_path)
            self.page = ChromiumPage(addr_or_opts=self.options)
            self.page.set.window.max()
            return True
        except Exception:
            # Callers only get a bool, so keep the reason in the log.
            logger.exception("接管浏览器时发生异常")
            return False

    def _ensure_browser(self) -> bool:
        """Take over the browser and log the outcome; return whether it worked."""
        if self.take_over_browser():
            logger.info("接管浏览器浏览器成功!!!")
            return True
        logger.error("接管浏览器失败!!!")
        return False

    def login(self) -> None:
        """Open the frame page, log in if the login button is shown, then run the tutorial pass."""
        self.page.get(self._FRAME_URL)
        time.sleep(1)
        login_button = self.page.ele('text=立即登录', timeout=1)
        if login_button:
            self.page.ele('@placeholder=账号/手机号').input(vals=self._ACCOUNT, clear=True)
            self.page.ele('@placeholder=密码').input(vals=self._PASSWORD, clear=True)
            login_button.click()
            self.page.ele(f'text={self._COMPANY}').click()
        # NOTE(review): nesting reconstructed - the reload and tutorial pass
        # are run unconditionally here; confirm they were not meant to run
        # only after a fresh login.
        self.page.get(url=self._FRAME_URL)
        self.automate_newbie_tutorial()

    def automate_newbie_tutorial(self) -> None:
        """Dismiss the newcomer tutorial prompt and click through its steps."""
        start_prompt = self.page.ele('x://span[normalize-space(text()) = "开始新手学习"]', timeout=1)
        if start_prompt:
            try:
                # Presumably the dialog's header button; best effort only.
                self.page.ele('x:/html/body/div[3]/div/div[1]/button', timeout=1).click()
            except Exception as exc:
                logger.debug("关闭新手教程弹窗失败: {}", exc)
        # NOTE(review): nesting reconstructed - the loop runs whether or not
        # the start prompt was found; confirm against the original file.
        for _ in range(10):
            for label in ("下一步", "我知道了"):
                button = self.page.ele(f'x://span[normalize-space(text()) = "{label}"]', timeout=0.5)
                if button:
                    button.click()
                    time.sleep(1)

    def login_page(self) -> bool:
        """Take over the browser, log in and reopen the frame page.

        Returns:
            False when the browser could not be taken over, True otherwise.
        """
        if not self._ensure_browser():
            return False
        self.login()
        self.page.get(url=self._FRAME_URL)
        self.automate_newbie_tutorial()
        return True

    def action(self) -> bool:
        """Log in and type the payload into a new repair-order tab.

        Returns:
            False when the browser could not be taken over (nothing was
            filled in), True once every step has run.

        Raises:
            KeyError: a required payload key is missing.
            Exception: whatever DrissionPage raises when a required element
                cannot be found or clicked.
        """
        if not self._ensure_browser():
            return False
        self.login()
        tab = self.page.new_tab(url=self._EDIT_URL)
        time.sleep(3)
        self._fill_customer(tab)
        self._fill_vehicle(tab)
        self._select_model(tab)
        self._fill_order_header(tab)
        self._add_items(tab)
        return True

    def _series_name(self) -> str:
        """Return the payload's "车系" value, or the default when absent or empty."""
        return self.a.get("车系") or self._DEFAULT_SERIES

    def _fill_customer(self, tab) -> None:
        """Enter customer name and phone, then pick the "个人" option."""
        tab.ele("@placeholder=请输入姓名").input(vals=self.a["姓名"], clear=True)
        time.sleep(1)
        tab.ele("@placeholder=支持手机号搜索").input(vals=self.a["手机号"], clear=True)
        time.sleep(3)
        tab('text=个人').click()
        time.sleep(1)

    def _fill_vehicle(self, tab) -> None:
        """Enter plate region, plate number, usage and VIN."""
        tab.ele(
            'x://*[@id="customerAddSimple"]/form/div/div[3]/div/div/div/form/div/div/div[1]/div[1]/div/span/button').click()
        time.sleep(3)
        # The last match is clicked; earlier matches of the same text are skipped.
        tab.eles(f'text={self.a["车牌地区"]}')[-1].click()
        time.sleep(3)
        tab.ele('@placeholder=最多7个字符').input(self.a["车牌号"])
        time.sleep(3)
        tab.ele(f"text={self.a['用途']}").click()
        time.sleep(3)
        tab.ele('@placeholder=17位数字和字母组合').input(vals=self.a['vin'], clear=True)
        time.sleep(3)

    def _select_model(self, tab) -> None:
        """Pick the vehicle model; best effort, never raises ordinary errors.

        Tries the brand/series picker first and falls back to the
        "multiple models detected from VIN" chooser when that fails.
        """
        try:
            self._pick_model_by_brand(tab)
        except Exception as exc:
            logger.debug("按品牌选择车型失败, 尝试 VIN 多车型选择: {}", exc)
            try:
                tab.ele("@placeholder=通过VIN码检测出多款车型,请选择", timeout=0.5).click()
                tab.ele("x://div[@aria-label='多个车型选择']/div[2]/div/div[2]/div[1]").click()
            except Exception as fallback_exc:
                logger.warning("车型选择未完成: {}", fallback_exc)

    def _pick_model_by_brand(self, tab) -> None:
        """Walk the model picker: brand initial, brand, series, then two option clicks."""
        tab.ele("@placeholder=点击选择车型", timeout=0.5).click()
        time.sleep(3)
        brand = self.a["品牌"]
        # The picker is indexed by the upper-case first pinyin letter of the brand.
        initial = pinyin.get(brand[0], format="strip")[0].upper()
        tab.ele(f'x:(//li[normalize-space(text()) = "{initial}"])[last()]').click()
        time.sleep(3)
        tab.ele(f'@title={brand}').click()
        time.sleep(3)
        tab.ele(f'text={self._series_name()}').click()
        time.sleep(3)
        for _ in range(2):
            tab.ele('x://div[@aria-label="选择车型"]/div[2]/div/div/div[2]/div[4]/div[1]').click()
            time.sleep(3)

    def _fill_order_header(self, tab) -> None:
        """Enter engine number, business category and current mileage."""
        tab("@placeholder=请输入发动机号").input(self.a['发动机号'])
        time.sleep(1)
        tab('@placeholder=请选择').click()
        time.sleep(3)
        tab(f'text={self.a["业务分类"]}').click()
        time.sleep(3)
        tab('@max=9999999').input(self.a['当前里程'])
        time.sleep(3)

    def _add_items(self, tab) -> None:
        """Add every entry of the payload's "项目" list via the last table row."""
        for item in self.a["项目"]:
            last_row = tab('x:(//*[@id="app"]/div[1]/div[3]/div[3]/div/div[3]/table/tbody/tr)[last()]')
            name_input = last_row.ele('x:.//td[2]/div/div/div/div[1]/input')
            tab.actions.scroll(on_ele="text=材料名称")
            time.sleep(1)
            tab.actions.click(on_ele=name_input)
            time.sleep(1)
            tab('x://div[@x-placement="top-start"]//input[@class="au-input__inner"]').input(item)
            time.sleep(1)
            tab(f'text={item}').click()
            time.sleep(1)
# if __name__ == '__main__':
# a = {
# "姓名": "小红",
# "手机号": 17168360408,
# "车牌地区": "渝",
# "车牌号": "K8Y995",
# "用途": "乘用车",
# "vin": "LHGCV366XK8024194",
# "品牌": "大众",
# "车系": "CC",
# "发动机号": "gerggaegrg",
# "业务分类": "养护品",
# "当前里程": "231244",
# "项目": ["更换空气滤", "更换冷却液"]
# }
#
# zh = Zh(a=a)
# zh.login_page()
# zh.action()
import uuid
import threading
from flask import Flask, request, jsonify
from loguru import logger
app = Flask(import_name=__name__)
# In-memory task registry keyed by task_id:
#   {task_id: {"status": "waiting" | "running" | "success" | "failed", "result": dict or None}}
TASKS = dict()
def run_task(task_id, payload):
    """Run one browser-automation task and record its outcome in ``TASKS``.

    Intended to run in a background thread. The task entry's ``status`` is
    set to ``"running"`` first and ends as ``"success"`` or ``"failed"``;
    ``result`` receives a small dict with ``msg`` or ``error``.

    Args:
        task_id: Key of an existing entry in ``TASKS``.
        payload: Dict handed to ``Zh`` as its form data.
    """
    task = TASKS[task_id]
    try:
        task["status"] = "running"
        zh = Zh(a=payload)
        # zh.login_page()
        outcome = zh.action()
        # An explicit False is treated as "the automation did not run".
        # No return value (None) still counts as success, so an action()
        # that returns nothing keeps its previous behaviour.
        if outcome is False:
            task["status"] = "failed"
            task["result"] = {"error": "任务未执行: action() 返回 False"}
            return
        task["status"] = "success"
        task["result"] = {"msg": "任务执行成功"}
    except Exception as e:
        logger.exception(e)
        task["status"] = "failed"
        task["result"] = {"error": str(e)}
@app.post("/create_task")
def create_task():
    """Accept a JSON payload, start a background task and return its id.

    Returns:
        ``{"task_id": ...}`` on success, or ``{"error": ...}`` with HTTP 400
        when the body is missing, empty, not valid JSON, or not a JSON
        object.
    """
    # silent=True turns a wrong content type or unparsable body into None
    # instead of an HTTP error raised by Flask, so the JSON error reply
    # below is what the client actually receives.
    payload = request.get_json(silent=True)
    if not payload:
        return jsonify({"error": "缺少 JSON 数据"}), 400
    # The worker indexes the payload by key, so anything but an object would
    # only fail later inside the thread.
    if not isinstance(payload, dict):
        return jsonify({"error": "JSON 数据必须是对象"}), 400
    task_id = str(uuid.uuid4())
    TASKS[task_id] = {
        "status": "waiting",
        "result": None,
    }
    # Run the automation in a background thread so the request returns at once.
    worker = threading.Thread(target=run_task, args=(task_id, payload))
    worker.start()
    return jsonify({"task_id": task_id})
@app.get("/task_status/<task_id>")
def task_status(task_id):
    """Return the stored status/result entry for ``task_id``, or a 404 error."""
    record = TASKS.get(task_id)
    if record:
        return jsonify(record)
    return jsonify({"error": "task_id 不存在"}), 404
if __name__ == "__main__":
    # Log in once with an empty payload before the HTTP service starts
    # accepting tasks.
    Zh(a={}).login_page()
    app.run(host="0.0.0.0", port=5001)