#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Service layer for the Tianyi ordering page ``1.html``.

- submit_phone: fill in the phone number, request the SMS code, drag the
  slider captcha, then capture and return the response of
  ``hn_userEquitys/getYanZhenMa/v2``.
- submit_code: fill in the SMS code and click the confirm-order button.
"""

from __future__ import annotations

import time
from typing import Any

# Helper functions reused from tyyp_1html_dp.
from tyyp_1html_dp import (
    build_human_track,
    calc_drag_distance_from_bytes,
    click_safe,
    find_first,
    parse_data_url,
    wait_for_data_src,
    _dispatch_mouse,
    _get_element_center,
    drag_slider,
)

# URL fragment to listen for (the front end requests this endpoint once the
# slider captcha has been passed).
GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2"


def _ensure_listen_before_action(page) -> None:
    """Start the network listener for the getYanZhenMa endpoint.

    Must be called before the "get verification code" button is clicked so
    that the request fired after the slider passes is not missed.
    """
    page.listen.start(GET_YAN_ZHEN_MA_URL)


def _do_slider_and_wait_response(
    page,
    slider_ele,
    move_distance: int,
    timeout: float = 15,
) -> Any:
    """Drag the slider, then wait for the getYanZhenMa/v2 response.

    Args:
        page: Page object whose listener was already started.
        slider_ele: The draggable slider element.
        move_distance: Horizontal distance to drag, in display pixels.
        timeout: Seconds to wait for the listened packet.

    Returns:
        ``packet.response.body`` of the first captured packet.

    Raises:
        RuntimeError: If no matching packet is captured within ``timeout``.
    """
    drag_slider(page, slider_ele, move_distance)
    time.sleep(0.5)

    packet = page.listen.wait(timeout=timeout, fit_count=False)
    # The listener may hand back False (nothing captured), a list of packets,
    # or a single packet; normalise all three shapes to one packet.
    if packet is False:
        raise RuntimeError(
            f"滑块拖动后 {timeout}s 内未收到 {GET_YAN_ZHEN_MA_URL} 响应"
        )
    if isinstance(packet, list):
        packet = packet[0] if packet else None
    if packet is None:
        raise RuntimeError(f"未捕获到 {GET_YAN_ZHEN_MA_URL} 数据包")
    return packet.response.body


def submit_phone(
    page,
    phone: str,
    url: str = "http://yscnb.com/tyyp/1.html",
    port: int = 0,
    alpha_threshold: int = 12,
    distance_adjust: int = 0,
    wait_page: float = 0.3,
) -> Any:
    """Fill in the phone number, request the SMS code and solve the slider.

    Args:
        page: Browser page object (used via ``get``, ``run_js`` and
            ``listen``).
        phone: Phone number typed into the phone input.
        url: Address of the ordering page to open.
        port: Unused by this function; kept for interface compatibility.
        alpha_threshold: Forwarded to ``calc_drag_distance_from_bytes``.
        distance_adjust: Pixel offset added to the computed drag distance.
        wait_page: Seconds to sleep after opening ``url``.

    Returns:
        The response body of the getYanZhenMa/v2 request.

    Raises:
        RuntimeError: If a required page element is missing, the captcha
            image data looks invalid, or the response is not captured.
    """
    page.get(url)
    time.sleep(wait_page)

    # Tick the agreement checkbox if present.
    agree_checkbox = find_first(page, [
        "css:#color-input-red",
        "css:input[name='color-input-red']",
        'x://input[@id="color-input-red"]',
        "css:input.right-box[type='checkbox']",
    ], timeout=5)
    if agree_checkbox:
        click_safe(agree_checkbox)
        time.sleep(0.4)

    # "Order now" button: try several locators, retrying a few times.
    order_locators = [
        "css:div.paybg",
        "css:.paybg",
        'x://button[contains(.,"立即订购")]',
        'x://a[contains(.,"立即订购")]',
        'x://span[contains(.,"立即订购")]',
        'x://div[contains(.,"立即订购")]',
        'x://*[contains(text(),"立即订购")]',
        'x://*[contains(.,"立即订购")]',
        "css:.btn-order",
        "css:button.btn-primary",
        "css:button.btn",
        "css:a.btn",
    ]
    order_btn = None
    for _ in range(4):
        order_btn = find_first(page, order_locators, timeout=1)
        if order_btn:
            break
        time.sleep(0.25)

    if order_btn:
        try:
            order_btn.run_js("this.scrollIntoView({block:'center'})")
            time.sleep(0.05)
        except Exception:
            # Scrolling is best-effort; the click below is still attempted.
            pass
        click_safe(order_btn)
        time.sleep(0.4)
    else:
        # Fallback: look for the button text in the DOM and dispatch a
        # synthetic click from JavaScript. Best-effort, errors are ignored.
        try:
            page.run_js("""
                var nodes = document.querySelectorAll('button, a, span, div');
                for (var i = 0; i < nodes.length; i++) {
                    var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
                    if (t.indexOf('立即订购') >= 0) {
                        nodes[i].scrollIntoView({block: 'center'});
                        nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
                        return true;
                    }
                }
                return false;
            """)
        except Exception:
            pass
        time.sleep(0.4)

    phone_input = find_first(page, [
        'x://input[@placeholder="请输入手机号码"]',
        "css:input.inp-txt",
    ], timeout=8)
    if not phone_input:
        raise RuntimeError("未找到手机号输入框")
    phone_input.input(phone, clear=True)

    # Second agreement checkbox inside the form; optional, best-effort.
    agree = find_first(page, [
        "css:i.ico-checkbox",
        'x://i[contains(@class,"ico-checkbox")]',
    ], timeout=2)
    if agree:
        try:
            click_safe(agree)
        except Exception:
            pass

    # The listener must be running before "get verification code" is clicked.
    _ensure_listen_before_action(page)

    send_btn = find_first(page, [
        "css:button.btn-code",
        'x://button[contains(text(),"获取验证码")]',
    ], timeout=8)
    if not send_btn:
        raise RuntimeError("未找到「获取验证码」按钮")
    click_safe(send_btn)

    verify_box = find_first(page, [
        "css:.verifybox",
        "css:.verify-bar-area",
    ], timeout=6)
    if not verify_box:
        raise RuntimeError("未检测到滑块验证码弹窗")

    bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
    piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
    slider = find_first(page, ["css:.verify-move-block"], timeout=5)
    if not bg_img or not piece_img or not slider:
        raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")

    bg_src = wait_for_data_src(bg_img, timeout=10)
    piece_src = wait_for_data_src(piece_img, timeout=10)
    bg_bytes = parse_data_url(bg_src)
    piece_bytes = parse_data_url(piece_src)
    if len(bg_bytes) < 100 or len(piece_bytes) < 100:
        raise RuntimeError("验证码图片数据异常")

    match = calc_drag_distance_from_bytes(
        bg_bytes,
        piece_bytes,
        alpha_threshold=alpha_threshold,
    )

    # The image may be rendered at a different width than its intrinsic
    # size, so scale the computed distance by displayed / intrinsic width.
    bg_display_w = page.run_js(
        "const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;",
        bg_img,
    )
    if not bg_display_w or bg_display_w <= 0:
        bg_display_w = match["bg_width"]
    scale = float(bg_display_w) / max(1, match["bg_width"])
    move_distance = int(round(match["drag_distance"] * scale)) + int(distance_adjust)

    body = _do_slider_and_wait_response(page, slider, move_distance)
    return body


def submit_code(page, code: str) -> dict[str, Any]:
    """Fill in the SMS code and click the confirm-order button (img.btn-buy).

    Args:
        page: Browser page object already showing the order form.
        code: SMS verification code to type in.

    Returns:
        A dict with ``success`` and ``message`` keys describing the result.

    Raises:
        RuntimeError: If the code input or the confirm button is not found.
    """
    code_input = find_first(page, [
        # XPath has no `*=` operator (that is CSS syntax); use contains().
        'x://input[contains(@placeholder,"验证码")]',
        'x://input[contains(@placeholder,"短信")]',
        "css:input.inp-txt[type='text']",
        "css:input[type='tel']",
        "css:.code-input",
        "css:input.verify-input",
    ], timeout=8)
    if not code_input:
        raise RuntimeError("未找到验证码输入框")
    code_input.input(code, clear=True)
    time.sleep(0.2)

    confirm_btn = find_first(page, [
        "css:img.btn-buy",
        "css:.btn-buy",
        'x://img[contains(@class,"btn-buy")]',
        'x://*[contains(@class,"btn-buy")]',
    ], timeout=5)
    if not confirm_btn:
        raise RuntimeError("未找到确认订购按钮(img.btn-buy)")

    try:
        confirm_btn.run_js("this.scrollIntoView({block:'center'})")
        time.sleep(0.05)
    except Exception:
        # Scrolling is best-effort; the click below is still attempted.
        pass
    click_safe(confirm_btn)
    return {"success": True, "message": "已点击确认订购"}