228 lines
7.2 KiB
Python
228 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
天翼订购页 1.html 服务层:
|
||
- submit_phone: 填写手机号、获取验证码、滑动滑块,监听并返回 hn_userEquitys/getYanZhenMa/v2 响应
|
||
- submit_code: 填写验证码、点击确认订购按钮
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import time
|
||
from typing import Any
|
||
|
||
# 复用 tyyp_1html_dp 中的工具函数
|
||
from tyyp_1html_dp import (
|
||
build_human_track,
|
||
calc_drag_distance_from_bytes,
|
||
click_safe,
|
||
find_first,
|
||
parse_data_url,
|
||
wait_for_data_src,
|
||
_dispatch_mouse,
|
||
_get_element_center,
|
||
drag_slider,
|
||
)
|
||
|
||
# 需要监听的 URL 特征(滑块通过后前端会请求此接口)
|
||
# URL fragment handed to the page listener. Per the original note, the front end
# requests this endpoint once the slider check has passed.
GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2"
|
||
|
||
|
||
def _ensure_listen_before_action(page) -> None:
    """Start listening for requests that match ``GET_YAN_ZHEN_MA_URL``.

    Meant to be called before the "get verification code" button is clicked, so
    the response requested after the slider is solved is not missed.
    """
    listener = page.listen
    listener.start(GET_YAN_ZHEN_MA_URL)
|
||
|
||
|
||
def _do_slider_and_wait_response(page, slider_ele, move_distance: int, timeout: float = 15) -> Any:
    """Drag the slider, then return the body of the captured getYanZhenMa/v2 response.

    Args:
        page: Page object whose listener has already been started.
        slider_ele: The slider handle element to drag.
        move_distance: Horizontal distance passed to ``drag_slider``.
        timeout: Seconds to wait for a matching packet.

    Returns:
        ``response.body`` of the first captured packet.

    Raises:
        RuntimeError: If the wait reports a timeout (``False``) or yields no packet.
    """
    drag_slider(page, slider_ele, move_distance)
    time.sleep(0.5)

    captured = page.listen.wait(timeout=timeout, fit_count=False)
    if captured is False:
        raise RuntimeError(f"滑块拖动后 {timeout}s 内未收到 {GET_YAN_ZHEN_MA_URL} 响应")

    # The wait may hand back a list of packets; only the first one is used.
    first_packet = captured
    if isinstance(captured, list):
        first_packet = captured[0] if captured else None

    if first_packet is None:
        raise RuntimeError(f"未捕获到 {GET_YAN_ZHEN_MA_URL} 数据包")

    return first_packet.response.body
|
||
|
||
|
||
def _click_order_entry(page) -> None:
    """Click the "立即订购" entry, using a scripted click when no locator matches."""
    order_locators = (
        "css:div.paybg",
        "css:.paybg",
        'x://button[contains(.,"立即订购")]',
        'x://a[contains(.,"立即订购")]',
        'x://span[contains(.,"立即订购")]',
        'x://div[contains(.,"立即订购")]',
        'x://*[contains(text(),"立即订购")]',
        'x://*[contains(.,"立即订购")]',
        "css:.btn-order",
        "css:button.btn-primary",
        "css:button.btn",
        "css:a.btn",
    )

    # Up to four lookup rounds, pausing briefly between unsuccessful ones.
    button = None
    for _ in range(4):
        button = find_first(page, list(order_locators), timeout=1)
        if button:
            break
        time.sleep(0.25)

    if not button:
        # Fallback: scan the DOM in the browser and dispatch a synthetic click.
        try:
            page.run_js("""
                var nodes = document.querySelectorAll('button, a, span, div');
                for (var i = 0; i < nodes.length; i++) {
                    var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
                    if (t.indexOf('立即订购') >= 0) {
                        nodes[i].scrollIntoView({block: 'center'});
                        nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
                        return true;
                    }
                }
                return false;
            """)
        except Exception:
            pass
        time.sleep(0.4)
        return

    # Scrolling into view is best-effort; the click is attempted regardless.
    try:
        button.run_js("this.scrollIntoView({block:'center'})")
        time.sleep(0.05)
    except Exception:
        pass
    click_safe(button)
    time.sleep(0.4)


def submit_phone(
    page,
    phone: str,
    url: str = "http://yscnb.com/tyyp/1.html",
    port: int = 0,
    alpha_threshold: int = 12,
    distance_adjust: int = 0,
    wait_page: float = 0.3,
) -> Any:
    """Enter the phone number, request an SMS code, and solve the slider captcha.

    Args:
        page: Browser page object used for navigation, lookups and JS execution.
        phone: Phone number typed into the phone input.
        url: Order page to open.
        port: Not used by this function; kept in the signature for callers.
        alpha_threshold: Forwarded to ``calc_drag_distance_from_bytes``.
        distance_adjust: Offset added to the computed drag distance.
        wait_page: Seconds to sleep after opening ``url``.

    Returns:
        The body of the getYanZhenMa/v2 response captured after the slider drag.

    Raises:
        RuntimeError: If the phone input, the send-code button, the captcha popup,
            or any captcha element is missing, or the captcha image data is too short.
    """
    page.get(url)
    time.sleep(wait_page)

    # Tick the agreement checkbox when the page has one.
    terms_checkbox = find_first(
        page,
        [
            "css:#color-input-red",
            "css:input[name='color-input-red']",
            'x://input[@id="color-input-red"]',
            "css:input.right-box[type='checkbox']",
        ],
        timeout=5,
    )
    if terms_checkbox:
        click_safe(terms_checkbox)
        time.sleep(0.4)

    _click_order_entry(page)

    phone_field = find_first(
        page,
        [
            'x://input[@placeholder="请输入手机号码"]',
            "css:input.inp-txt",
        ],
        timeout=8,
    )
    if not phone_field:
        raise RuntimeError("未找到手机号输入框")
    phone_field.input(phone, clear=True)

    # Optional checkbox icon next to the form; failures to click it are ignored.
    checkbox_icon = find_first(
        page,
        [
            "css:i.ico-checkbox",
            'x://i[contains(@class,"ico-checkbox")]',
        ],
        timeout=2,
    )
    if checkbox_icon:
        try:
            click_safe(checkbox_icon)
        except Exception:
            pass

    # The listener has to be running before the send-code button is clicked.
    _ensure_listen_before_action(page)

    code_button = find_first(
        page,
        [
            "css:button.btn-code",
            'x://button[contains(text(),"获取验证码")]',
        ],
        timeout=8,
    )
    if not code_button:
        raise RuntimeError("未找到「获取验证码」按钮")
    click_safe(code_button)

    captcha_popup = find_first(
        page,
        [
            "css:.verifybox",
            "css:.verify-bar-area",
        ],
        timeout=6,
    )
    if not captcha_popup:
        raise RuntimeError("未检测到滑块验证码弹窗")

    background_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
    puzzle_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
    slider_handle = find_first(page, ["css:.verify-move-block"], timeout=5)
    if not (background_img and puzzle_img and slider_handle):
        raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")

    background_src = wait_for_data_src(background_img, timeout=10)
    puzzle_src = wait_for_data_src(puzzle_img, timeout=10)
    background_bytes = parse_data_url(background_src)
    puzzle_bytes = parse_data_url(puzzle_src)
    if any(len(blob) < 100 for blob in (background_bytes, puzzle_bytes)):
        raise RuntimeError("验证码图片数据异常")

    match = calc_drag_distance_from_bytes(
        background_bytes, puzzle_bytes, alpha_threshold=alpha_threshold
    )

    # Scale the distance measured on the image data to the width rendered on the page.
    rendered_width = page.run_js(
        "const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;",
        background_img,
    )
    if not rendered_width or rendered_width <= 0:
        rendered_width = match["bg_width"]
    ratio = float(rendered_width) / max(1, match["bg_width"])
    move_distance = int(round(match["drag_distance"] * ratio)) + int(distance_adjust)

    return _do_slider_and_wait_response(page, slider_handle, move_distance)
|
||
|
||
|
||
def submit_code(page, code: str) -> dict:
    """Type the SMS verification code and click the confirm-order button.

    Args:
        page: Browser page object already showing the order form.
        code: Verification code to type into the code input.

    Returns:
        ``{"success": True, "message": "已点击确认订购"}`` once the button has been clicked.

    Raises:
        RuntimeError: If the code input or the confirm button cannot be found.
    """
    code_input = find_first(
        page,
        [
            # XPath has no `*=` operator (that is CSS attribute syntax), so substring
            # matching on the placeholder must use contains(). The previous form could
            # never match, leaving the lookup to fall through to the generic
            # `input.inp-txt` locator below.
            'x://input[contains(@placeholder,"验证码")]',
            'x://input[contains(@placeholder,"短信")]',
            "css:input.inp-txt[type='text']",
            "css:input[type='tel']",
            "css:.code-input",
            "css:input.verify-input",
        ],
        timeout=8,
    )
    if not code_input:
        raise RuntimeError("未找到验证码输入框")

    code_input.input(code, clear=True)
    time.sleep(0.2)

    confirm_btn = find_first(
        page,
        [
            "css:img.btn-buy",
            "css:.btn-buy",
            'x://img[contains(@class,"btn-buy")]',
            'x://*[contains(@class,"btn-buy")]',
        ],
        timeout=5,
    )
    if not confirm_btn:
        raise RuntimeError("未找到确认订购按钮(img.btn-buy)")

    # Scrolling into view is best-effort; the click is attempted regardless.
    try:
        confirm_btn.run_js("this.scrollIntoView({block:'center'})")
        time.sleep(0.05)
    except Exception:
        pass

    click_safe(confirm_btn)

    return {"success": True, "message": "已点击确认订购"}
|