Files
fws_code/tyyp_service.py
2026-02-26 01:46:57 +08:00

228 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
天翼订购页 1.html 服务层:
- submit_phone: 填写手机号、获取验证码、滑动滑块,监听并返回 hn_userEquitys/getYanZhenMa/v2 响应
- submit_code: 填写验证码、点击确认订购按钮
"""
from __future__ import annotations
import time
from typing import Any
# 复用 tyyp_1html_dp 中的工具函数
from tyyp_1html_dp import (
build_human_track,
calc_drag_distance_from_bytes,
click_safe,
find_first,
parse_data_url,
wait_for_data_src,
_dispatch_mouse,
_get_element_center,
drag_slider,
)
# 需要监听的 URL 特征(滑块通过后前端会请求此接口)
GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2"
def _ensure_listen_before_action(page) -> None:
    """Start the page's network listener for the getYanZhenMa endpoint.

    Intended to be called before the "get verification code" button is
    clicked, so that the request fired once the slider captcha passes can be
    captured.
    """
    listener = page.listen
    listener.start(GET_YAN_ZHEN_MA_URL)
def _do_slider_and_wait_response(page, slider_ele, move_distance: int, timeout: float = 15) -> Any:
    """Drag the slider, then return the body of the captured getYanZhenMa/v2 response.

    Args:
        page: Page object whose ``listen`` attribute has already been started.
        slider_ele: Slider handle element handed to ``drag_slider``.
        move_distance: Distance passed to ``drag_slider``.
        timeout: Seconds to wait for the listener to capture a packet.

    Returns:
        ``response.body`` of the first captured packet.

    Raises:
        RuntimeError: If the wait reports failure (``False``) or yields no
            packet at all.
    """
    drag_slider(page, slider_ele, move_distance)
    # Short pause after the drag before polling the listener.
    time.sleep(0.5)

    captured = page.listen.wait(timeout=timeout, fit_count=False)
    if captured is False:
        raise RuntimeError(f"滑块拖动后 {timeout}s 内未收到 {GET_YAN_ZHEN_MA_URL} 响应")

    # The wait may hand back either a single packet or a list of packets;
    # in the list case only the first one is used.
    if isinstance(captured, list):
        first_packet = next(iter(captured), None)
    else:
        first_packet = captured

    if first_packet is None:
        raise RuntimeError(f"未捕获到 {GET_YAN_ZHEN_MA_URL} 数据包")
    return first_packet.response.body
def submit_phone(
    page,
    phone: str,
    url: str = "http://yscnb.com/tyyp/1.html",
    port: int = 0,
    alpha_threshold: int = 12,
    distance_adjust: int = 0,
    wait_page: float = 0.3,
) -> Any:
    """Fill in the phone number, request an SMS code, and solve the slider captcha.

    The flow is: open ``url``, tick the agreement checkbox, click the
    "order now" button, type the phone number, start the network listener,
    click "get verification code", compute the slider drag distance from the
    two captcha images, drag the slider, and return the captured
    getYanZhenMa/v2 response body.

    Args:
        page: Browser page object used for navigation, element lookup,
            JavaScript execution and network listening.
        phone: Phone number typed into the phone input.
        url: Address of the order page to open.
        port: Not used by this function; kept so existing callers that pass
            it keep working.
        alpha_threshold: Forwarded to ``calc_drag_distance_from_bytes``.
        distance_adjust: Extra offset added to the computed drag distance.
        wait_page: Seconds to sleep after the page has been opened.

    Returns:
        The response body returned by ``_do_slider_and_wait_response``.

    Raises:
        RuntimeError: If the phone input, the "get verification code" button,
            the captcha popup or any of its key elements cannot be found, if
            the captcha image data looks too small, or if no response is
            captured after the drag.
    """
    page.get(url)
    time.sleep(wait_page)

    _tick_landing_agreement(page)
    _click_order_now(page)
    _fill_phone_form(page, phone)
    bg_img, piece_img, slider = _open_slider_captcha(page)
    move_distance = _compute_move_distance(
        page, bg_img, piece_img, alpha_threshold, distance_adjust
    )
    return _do_slider_and_wait_response(page, slider, move_distance)


# JavaScript fallback used when no "order now" element could be located:
# dispatches a click on the first button/a/span/div whose text contains the label.
_ORDER_CLICK_FALLBACK_JS = """
var nodes = document.querySelectorAll('button, a, span, div');
for (var i = 0; i < nodes.length; i++) {
var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
if (t.indexOf('立即订购') >= 0) {
nodes[i].scrollIntoView({block: 'center'});
nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
return true;
}
}
return false;
"""


def _tick_landing_agreement(page) -> None:
    """Click the agreement checkbox on the landing page when one is found."""
    checkbox = find_first(page, [
        "css:#color-input-red",
        "css:input[name='color-input-red']",
        'x://input[@id="color-input-red"]',
        "css:input.right-box[type='checkbox']",
    ], timeout=5)
    if checkbox:
        click_safe(checkbox)
        time.sleep(0.4)


def _click_order_now(page) -> None:
    """Click the "order now" button, falling back to a JavaScript text search."""
    order_btn = None
    # Up to four lookup rounds with a short pause in between.
    for _ in range(4):
        order_btn = find_first(page, [
            "css:div.paybg",
            "css:.paybg",
            'x://button[contains(.,"立即订购")]',
            'x://a[contains(.,"立即订购")]',
            'x://span[contains(.,"立即订购")]',
            'x://div[contains(.,"立即订购")]',
            'x://*[contains(text(),"立即订购")]',
            'x://*[contains(.,"立即订购")]',
            "css:.btn-order",
            "css:button.btn-primary",
            "css:button.btn",
            "css:a.btn",
        ], timeout=1)
        if order_btn:
            break
        time.sleep(0.25)

    if order_btn:
        try:
            order_btn.run_js("this.scrollIntoView({block:'center'})")
            time.sleep(0.05)
        except Exception:
            # Scrolling is best-effort; the click below is still attempted.
            pass
        click_safe(order_btn)
    else:
        try:
            page.run_js(_ORDER_CLICK_FALLBACK_JS)
        except Exception:
            # Best-effort fallback; a missing form is reported by the
            # phone-input lookup that follows.
            pass
    time.sleep(0.4)


def _fill_phone_form(page, phone: str) -> None:
    """Type ``phone`` into the phone input and tick the form's agreement icon if present."""
    phone_input = find_first(page, [
        'x://input[@placeholder="请输入手机号码"]',
        "css:input.inp-txt",
    ], timeout=8)
    if not phone_input:
        raise RuntimeError("未找到手机号输入框")
    phone_input.input(phone, clear=True)

    agree = find_first(page, [
        "css:i.ico-checkbox",
        'x://i[contains(@class,"ico-checkbox")]',
    ], timeout=2)
    if agree:
        try:
            click_safe(agree)
        except Exception:
            # Ticking this checkbox is optional; failures are ignored.
            pass


def _open_slider_captcha(page):
    """Click "get verification code" and return ``(bg_img, piece_img, slider)`` elements."""
    # Listening must be started before the click that triggers the captcha flow.
    _ensure_listen_before_action(page)

    send_btn = find_first(page, [
        "css:button.btn-code",
        'x://button[contains(text(),"获取验证码")]',
    ], timeout=8)
    if not send_btn:
        raise RuntimeError("未找到「获取验证码」按钮")
    click_safe(send_btn)

    verify_box = find_first(page, [
        "css:.verifybox",
        "css:.verify-bar-area",
    ], timeout=6)
    if not verify_box:
        raise RuntimeError("未检测到滑块验证码弹窗")

    bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
    piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
    slider = find_first(page, ["css:.verify-move-block"], timeout=5)
    if not bg_img or not piece_img or not slider:
        raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")
    return bg_img, piece_img, slider


def _compute_move_distance(page, bg_img, piece_img, alpha_threshold: int, distance_adjust: int) -> int:
    """Compute the on-screen drag distance from the two captcha images."""
    bg_src = wait_for_data_src(bg_img, timeout=10)
    piece_src = wait_for_data_src(piece_img, timeout=10)
    # presumably decoded image bytes; only their length is checked here
    bg_bytes = parse_data_url(bg_src)
    piece_bytes = parse_data_url(piece_src)
    if len(bg_bytes) < 100 or len(piece_bytes) < 100:
        raise RuntimeError("验证码图片数据异常")

    match_info = calc_drag_distance_from_bytes(bg_bytes, piece_bytes, alpha_threshold=alpha_threshold)

    # Scale the distance by the ratio of the displayed background width to
    # the "bg_width" reported by the matcher.
    bg_display_w = page.run_js(
        "const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;",
        bg_img,
    )
    if not bg_display_w or bg_display_w <= 0:
        # No usable displayed width: fall back to the reported width.
        bg_display_w = match_info["bg_width"]
    scale = float(bg_display_w) / max(1, match_info["bg_width"])
    return int(round(match_info["drag_distance"] * scale)) + int(distance_adjust)
def submit_code(page, code: str) -> dict:
    """Type the SMS verification code and click the confirm-order button.

    Args:
        page: Browser page object on which the order form is already open.
        code: Verification code typed into the code input.

    Returns:
        ``{"success": True, "message": "已点击确认订购"}`` once the confirm
        button has been clicked.

    Raises:
        RuntimeError: If the code input or the confirm button cannot be found.
    """
    code_input = find_first(page, [
        # XPath has no CSS-style `*=` operator; a substring match on an
        # attribute must be written with contains().
        'x://input[contains(@placeholder,"验证码")]',
        'x://input[contains(@placeholder,"短信")]',
        "css:input.inp-txt[type='text']",
        "css:input[type='tel']",
        "css:.code-input",
        "css:input.verify-input",
    ], timeout=8)
    if not code_input:
        raise RuntimeError("未找到验证码输入框")
    code_input.input(code, clear=True)
    time.sleep(0.2)

    confirm_btn = find_first(page, [
        "css:img.btn-buy",
        "css:.btn-buy",
        'x://img[contains(@class,"btn-buy")]',
        'x://*[contains(@class,"btn-buy")]',
    ], timeout=5)
    if not confirm_btn:
        raise RuntimeError("未找到确认订购按钮img.btn-buy")

    try:
        confirm_btn.run_js("this.scrollIntoView({block:'center'})")
        time.sleep(0.05)
    except Exception:
        # Scrolling into view is best-effort; the click is attempted regardless.
        pass
    click_safe(confirm_btn)
    return {"success": True, "message": "已点击确认订购"}