Files
fws_code/tyyp_service.py

228 lines
7.2 KiB
Python
Raw Permalink Normal View History

2026-02-26 01:46:57 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
天翼订购页 1.html 服务层
- submit_phone: 填写手机号获取验证码滑动滑块监听并返回 hn_userEquitys/getYanZhenMa/v2 响应
- submit_code: 填写验证码点击确认订购按钮
"""
from __future__ import annotations
import time
from typing import Any
# 复用 tyyp_1html_dp 中的工具函数
from tyyp_1html_dp import (
build_human_track,
calc_drag_distance_from_bytes,
click_safe,
find_first,
parse_data_url,
wait_for_data_src,
_dispatch_mouse,
_get_element_center,
drag_slider,
)
# 需要监听的 URL 特征(滑块通过后前端会请求此接口)
GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2"
def _ensure_listen_before_action(page) -> None:
"""在点击获取验证码之前启动监听,确保能捕获滑块通过后的 getYanZhenMa 响应。"""
page.listen.start(GET_YAN_ZHEN_MA_URL)
def _do_slider_and_wait_response(page, slider_ele, move_distance: int, timeout: float = 15) -> Any:
"""
执行滑块拖动然后等待 getYanZhenMa/v2 的响应并返回 response.body
"""
drag_slider(page, slider_ele, move_distance)
time.sleep(0.5)
packet = page.listen.wait(timeout=timeout, fit_count=False)
if packet is False:
raise RuntimeError(f"滑块拖动后 {timeout}s 内未收到 {GET_YAN_ZHEN_MA_URL} 响应")
if isinstance(packet, list):
packet = packet[0] if packet else None
if packet is None:
raise RuntimeError(f"未捕获到 {GET_YAN_ZHEN_MA_URL} 数据包")
return packet.response.body
def submit_phone(
page,
phone: str,
url: str = "http://yscnb.com/tyyp/1.html",
port: int = 0,
alpha_threshold: int = 12,
distance_adjust: int = 0,
wait_page: float = 0.3,
) -> Any:
"""
填写手机号点击获取验证码执行滑块并返回 getYanZhenMa/v2 接口的响应体
"""
page.get(url)
time.sleep(wait_page)
# 勾选协议
agree_checkbox = find_first(page, [
"css:#color-input-red",
"css:input[name='color-input-red']",
'x://input[@id="color-input-red"]',
"css:input.right-box[type='checkbox']",
], timeout=5)
if agree_checkbox:
click_safe(agree_checkbox)
time.sleep(0.4)
# 立即订购
order_btn = None
for attempt in range(4):
order_btn = find_first(page, [
"css:div.paybg",
"css:.paybg",
'x://button[contains(.,"立即订购")]',
'x://a[contains(.,"立即订购")]',
'x://span[contains(.,"立即订购")]',
'x://div[contains(.,"立即订购")]',
'x://*[contains(text(),"立即订购")]',
'x://*[contains(.,"立即订购")]',
"css:.btn-order",
"css:button.btn-primary",
"css:button.btn",
"css:a.btn",
], timeout=1)
if order_btn:
break
time.sleep(0.25)
if order_btn:
try:
order_btn.run_js("this.scrollIntoView({block:'center'})")
time.sleep(0.05)
except Exception:
pass
click_safe(order_btn)
time.sleep(0.4)
else:
try:
page.run_js("""
var nodes = document.querySelectorAll('button, a, span, div');
for (var i = 0; i < nodes.length; i++) {
var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
if (t.indexOf('立即订购') >= 0) {
nodes[i].scrollIntoView({block: 'center'});
nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
return true;
}
}
return false;
""")
except Exception:
pass
time.sleep(0.4)
phone_input = find_first(page, [
'x://input[@placeholder="请输入手机号码"]',
"css:input.inp-txt",
], timeout=8)
if not phone_input:
raise RuntimeError("未找到手机号输入框")
phone_input.input(phone, clear=True)
agree = find_first(page, [
"css:i.ico-checkbox",
'x://i[contains(@class,"ico-checkbox")]',
], timeout=2)
if agree:
try:
click_safe(agree)
except Exception:
pass
# 必须在点击获取验证码之前启动监听
_ensure_listen_before_action(page)
send_btn = find_first(page, [
"css:button.btn-code",
'x://button[contains(text(),"获取验证码")]',
], timeout=8)
if not send_btn:
raise RuntimeError("未找到「获取验证码」按钮")
click_safe(send_btn)
verify_box = find_first(page, [
"css:.verifybox",
"css:.verify-bar-area",
], timeout=6)
if not verify_box:
raise RuntimeError("未检测到滑块验证码弹窗")
bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
slider = find_first(page, ["css:.verify-move-block"], timeout=5)
if not bg_img or not piece_img or not slider:
raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")
bg_src = wait_for_data_src(bg_img, timeout=10)
piece_src = wait_for_data_src(piece_img, timeout=10)
bg_bytes = parse_data_url(bg_src)
piece_bytes = parse_data_url(piece_src)
if len(bg_bytes) < 100 or len(piece_bytes) < 100:
raise RuntimeError("验证码图片数据异常")
match = calc_drag_distance_from_bytes(bg_bytes, piece_bytes, alpha_threshold=alpha_threshold)
bg_display_w = page.run_js(
"const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;",
bg_img,
)
if not bg_display_w or bg_display_w <= 0:
bg_display_w = match["bg_width"]
scale = float(bg_display_w) / max(1, match["bg_width"])
move_distance = int(round(match["drag_distance"] * scale)) + int(distance_adjust)
body = _do_slider_and_wait_response(page, slider, move_distance)
return body
def submit_code(page, code: str) -> dict:
"""
填写短信验证码点击确认订购按钮img.btn-buy
返回操作结果
"""
code_input = find_first(page, [
'x://input[@placeholder*="验证码"]',
'x://input[@placeholder*="短信"]',
"css:input.inp-txt[type='text']",
"css:input[type='tel']",
"css:.code-input",
"css:input.verify-input",
], timeout=8)
if not code_input:
raise RuntimeError("未找到验证码输入框")
code_input.input(code, clear=True)
time.sleep(0.2)
confirm_btn = find_first(page, [
"css:img.btn-buy",
"css:.btn-buy",
'x://img[contains(@class,"btn-buy")]',
'x://*[contains(@class,"btn-buy")]',
], timeout=5)
if not confirm_btn:
raise RuntimeError("未找到确认订购按钮img.btn-buy")
try:
confirm_btn.run_js("this.scrollIntoView({block:'center'})")
time.sleep(0.05)
except Exception:
pass
click_safe(confirm_btn)
return {"success": True, "message": "已点击确认订购"}