#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 天翼云盘订购页 http://yscnb.com/tyyp/ 自动化脚本(全部代码自包含) 自动化流程: 1. 打开页面 2. 勾选协议 3. 点击「立即订购」 4. 输入手机号 5. 点击「获取验证码」 6. 解析拼图验证码并执行滑块拖动 7. 监听 hn_userEquitys/getYanZhenMa/v2 响应并输出 用法:python test1.py [--phone 手机号] 调试时可在代码中修改 DEFAULT_PHONE """ from __future__ import annotations import argparse import json from typing import Any, Optional import base64 import random import time from io import BytesIO from datetime import datetime from pathlib import Path import numpy as np from PIL import Image, ImageDraw from DrissionPage import ChromiumPage, ChromiumOptions # ========== 调试用:在代码中直接指定手机号 ========== DEFAULT_PHONE = "17375712810" # 需要监听的 URL 特征(滑块通过后前端会请求此接口) GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2" # ---------- 拼图验证码计算 ---------- def _to_rgba_array(image_bytes: bytes) -> np.ndarray: return np.array(Image.open(BytesIO(image_bytes)).convert("RGBA"), dtype=np.int16) def _to_rgb_array(image_bytes: bytes) -> np.ndarray: return np.array(Image.open(BytesIO(image_bytes)).convert("RGB"), dtype=np.int16) def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]: mask = alpha > threshold if not mask.any(): raise ValueError("拼图块 alpha 全透明,无法匹配") ys, xs = np.where(mask) return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1 def _to_gray(rgb: np.ndarray) -> np.ndarray: return ( rgb[:, :, 0] * 0.299 + rgb[:, :, 1] * 0.587 + rgb[:, :, 2] * 0.114 ).astype(np.float32) def _grad_x(gray: np.ndarray) -> np.ndarray: pad = np.pad(gray, ((0, 0), (1, 1)), mode="edge") return np.abs(pad[:, 2:] - pad[:, :-2]) * 0.5 def calc_drag_distance_from_bytes( bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12 ) -> dict: """计算拼图目标位移,输出多个候选位移提高滑块命中率。""" bg = _to_rgb_array(bg_bytes).astype(np.float32) piece_rgba = _to_rgba_array(piece_bytes).astype(np.float32) bh, bw = bg.shape[:2] ph, _ = piece_rgba.shape[:2] if bh != ph: raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}") alpha = piece_rgba[:, :, 3] x0, 
y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold) piece_crop = piece_rgba[y0:y1, x0:x1, :3] alpha_crop = alpha[y0:y1, x0:x1] mask = alpha_crop > alpha_threshold ys, xs = np.where(mask) if len(xs) < 10: raise ValueError("拼图有效像素过少,无法稳定匹配") piece_gray = _to_gray(piece_crop) bg_gray = _to_gray(bg) piece_grad = _grad_x(piece_gray) bg_grad = _grad_x(bg_gray) piece_gray_pixels = piece_gray[ys, xs] piece_grad_pixels = piece_grad[ys, xs] weights = np.clip(alpha_crop[ys, xs].astype(np.float32), 1.0, 255.0) weights = weights / float(weights.sum()) patch_h, patch_w = piece_crop.shape[:2] if patch_w > bw or patch_h > bh: raise ValueError("拼图块裁剪尺寸超过背景图") max_x = bw - patch_w scores: list[tuple[float, int, float, float]] = [] for x in range(max_x + 1): patch_gray_pixels = bg_gray[y0 + ys, x + xs] patch_grad_pixels = bg_grad[y0 + ys, x + xs] color_score = float((np.abs(patch_gray_pixels - piece_gray_pixels) * weights).sum()) grad_score = float((np.abs(patch_grad_pixels - piece_grad_pixels) * weights).sum()) # 颜色 + 边缘加权,较纯 RGB 差值在压缩噪声下更稳定 score = color_score * 0.72 + grad_score * 0.28 scores.append((score, x, color_score, grad_score)) scores.sort(key=lambda item: item[0]) best_score, best_x, best_color_score, best_grad_score = scores[0] second_best = scores[1][0] if len(scores) > 1 else float("inf") drag_distance = best_x - x0 confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf") candidate_xs: list[int] = [] for _score, x, _color, _grad in scores: if any(abs(x - picked) <= 1 for picked in candidate_xs): continue candidate_xs.append(int(x)) if len(candidate_xs) >= 5: break candidate_distances = [int(x - x0) for x in candidate_xs] return { "target_x": int(best_x), "piece_bbox_x0": int(x0), "piece_bbox_y0": int(y0), "piece_bbox_w": int(patch_w), "piece_bbox_h": int(patch_h), "bg_width": int(bw), "bg_height": int(bh), "drag_distance": int(drag_distance), "best_score": best_score, "best_color_score": best_color_score, "best_grad_score": 
best_grad_score, "second_best": second_best, "confidence_ratio": confidence_ratio, "candidate_target_xs": candidate_xs, "candidate_drag_distances": candidate_distances, } def parse_data_url(data_url: str) -> bytes: if not data_url.startswith("data:image"): raise ValueError("图片不是 data:image URL") _, data = data_url.split(",", 1) return base64.b64decode(data) # ---------- 元素查找与点击 ---------- def click_safe(ele) -> None: try: ele.click() return except Exception: pass ele.click(by_js=True) def find_first(page, selectors: list[str], timeout: float = 5): for sel in selectors: try: ele = page.ele(sel, timeout=timeout) if ele: return ele except Exception: continue return None def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str: """轮询等待 img 元素的 src 变为有效 data:image URL。""" deadline = time.time() + timeout while time.time() < deadline: src = img_ele.attr("src") or "" if src.startswith("data:image"): _prefix, _, b64 = src.partition(",") if b64.strip(): return src time.sleep(interval) raise RuntimeError( f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {(img_ele.attr('src') or '')[:80]}" ) def _is_element_visible(page, selector: str) -> bool: try: visible = page.run_js( """ const el = document.querySelector(arguments[0]); if (!el) return false; const st = window.getComputedStyle(el); if (st.display === 'none' || st.visibility === 'hidden' || st.opacity === '0') return false; const r = el.getBoundingClientRect(); return r.width > 0 && r.height > 0; """, selector, ) return bool(visible) except Exception: return False def _read_slider_toast_text(page) -> str: try: text = page.run_js( """ const node = document.querySelector('.van-toast--text .van-toast__text') || document.querySelector('.van-toast__text'); if (!node) return ''; const popup = node.closest('.van-toast') || node; const st = window.getComputedStyle(popup); const visible = st.display !== 'none' && st.visibility !== 'hidden' && st.opacity !== '0'; if (!visible) return ''; return 
(node.innerText || node.textContent || '').trim(); """ ) return (text or "").strip() except Exception: return "" def _get_slider_layout_metrics(page, bg_img, slider_ele) -> dict: try: metrics = page.run_js( """ const bg = arguments[0]; const slider = arguments[1]; const bar = document.querySelector('.verify-bar-area'); const sub = slider ? slider.querySelector('.verify-sub-block') : null; const bgR = bg ? bg.getBoundingClientRect() : {width: 0}; const sliderR = slider ? slider.getBoundingClientRect() : {width: 0}; const barR = bar ? bar.getBoundingClientRect() : {width: 0}; const subR = sub ? sub.getBoundingClientRect() : {width: 0}; return { bg_display_w: Number(bgR.width || 0), slider_w: Number(sliderR.width || 0), bar_w: Number(barR.width || 0), sub_w: Number(subR.width || 0), }; """, bg_img, slider_ele, ) except Exception: metrics = None if not isinstance(metrics, dict): metrics = {} bar_w = float(metrics.get("bar_w", 0) or 0) slider_w = float(metrics.get("slider_w", 0) or 0) track_max = int(round(max(0.0, bar_w - slider_w))) if bar_w > 0 and slider_w > 0 else -1 return { "bg_display_w": float(metrics.get("bg_display_w", 0) or 0), "slider_w": slider_w, "bar_w": bar_w, "sub_w": float(metrics.get("sub_w", 0) or 0), "track_max": track_max, } def _build_move_distance_candidates( match: dict, scale: float, distance_adjust: int, track_max: int, ) -> list[int]: base_distances = match.get("candidate_drag_distances") or [int(match["drag_distance"])] micro_offsets = (0, -1, 1, -2, 2, -3, 3) candidates: list[int] = [] seen: set[int] = set() for base in base_distances[:4]: scaled = int(round(int(base) * scale)) + int(distance_adjust) for offset in micro_offsets: val = scaled + offset if track_max >= 0: val = max(0, min(track_max, val)) if val not in seen: seen.add(val) candidates.append(val) if not candidates: fallback = int(round(int(match["drag_distance"]) * scale)) + int(distance_adjust) if track_max >= 0: fallback = max(0, min(track_max, fallback)) candidates = 
[fallback] return candidates def _normalize_listen_packet(packet): if packet is False: return None if isinstance(packet, list): return packet[0] if packet else None return packet def _coerce_json_body(body: Any) -> Any: if not isinstance(body, str): return body raw = body.strip() if not raw: return body if (raw.startswith("{") and raw.endswith("}")) or (raw.startswith("[") and raw.endswith("]")): try: return json.loads(raw) except Exception: return body return body def _code_indicates_success(value: Any) -> Optional[bool]: if value is None: return None if isinstance(value, bool): return value if isinstance(value, (int, float)): return int(value) in (0, 1, 200) text = str(value).strip() if not text: return None lower = text.lower() if lower in {"0", "00", "000", "0000", "1", "200", "ok", "success", "true"}: return True if lower in {"false", "fail", "failed", "error", "err"}: return False if lower.lstrip("-").isdigit(): return int(lower) in (0, 1, 200) return False def _to_int_or_none(value: Any) -> Optional[int]: try: if value is None: return None if isinstance(value, bool): return int(value) if isinstance(value, (int, float)): return int(value) text = str(value).strip() if not text: return None if text.lstrip("-").isdigit(): return int(text) return None except Exception: return None def _extract_interface_message(data: dict) -> str: for key in ("msg", "message", "error", "errorMsg", "detail", "retMsg"): value = data.get(key) if isinstance(value, str) and value.strip(): return value.strip() return "" def _is_sms_already_sent_response(data: dict, message: str) -> bool: code_keys = ("code", "retCode", "resultCode", "statusCode", "errno") code_val = None for key in code_keys: if key in data: code_val = _to_int_or_none(data.get(key)) if code_val is not None: break if code_val != 1001: return False text = (message or "").strip() if not text: return False # 这类文案通常表示验证码已下发或短信频控,不应判定为滑块失败。 hints = ("验证码已发送", "短信验证码已发送", "请稍后重试", "请稍等") return any(h in text for h in hints) 
def _assert_interface_success(body: Any) -> None:
    """Raise RuntimeError if a response body looks like a failure.

    Non-dict bodies (after JSON coercion) are accepted as-is. Failure signals are:
    a falsy ``success``/``ok`` field, a status-code field judged not-success, or a
    message containing a failure keyword. A "code 1001 / SMS already sent" response
    is treated as a soft success: its code is ignored, and a failure keyword in its
    message ends the check without raising.
    """
    data = _coerce_json_body(body)
    if not isinstance(data, dict):
        return

    fail_reasons: list[str] = []
    message = _extract_interface_message(data)
    is_soft_success = _is_sms_already_sent_response(data, message)

    for key in ("success", "ok"):
        if key in data and not bool(data[key]):
            fail_reasons.append(f"{key}={data[key]}")

    for key in ("code", "retCode", "resultCode", "statusCode", "errno"):
        if key not in data:
            continue
        code_ok = _code_indicates_success(data[key])
        if code_ok is False:
            if is_soft_success:
                continue
            fail_reasons.append(f"{key}={data[key]}")

    if message:
        lowered = message.lower()
        fail_words = ("失败", "错误", "无效", "fail", "error", "invalid", "请重新", "未通过")
        if any(word in lowered for word in fail_words):
            if is_soft_success:
                return
            fail_reasons.append(f"msg={message}")

    if fail_reasons:
        preview = str(data)
        if len(preview) > 300:
            preview = preview[:300] + "..."
        raise RuntimeError(f"接口返回失败: {', '.join(fail_reasons)}; body={preview}")


def _wait_packet_or_feedback(page, timeout: float) -> tuple[Any, str]:
    """Wait up to ``timeout`` seconds for a listened packet, sampling toast text meanwhile.

    Returns ``(packet, last_toast)``; ``packet`` is None on timeout. ``last_toast``
    is the most recent non-empty toast text seen while waiting ("" if none).
    """
    deadline = time.monotonic() + timeout
    last_toast = ""
    while time.monotonic() < deadline:
        # Short slices so the toast can be sampled between listen waits.
        wait_span = min(0.35, max(0.05, deadline - time.monotonic()))
        packet = _normalize_listen_packet(page.listen.wait(timeout=wait_span, fit_count=False))
        if packet is not None:
            return packet, last_toast
        toast = _read_slider_toast_text(page)
        if toast:
            last_toast = toast
    return None, last_toast


def _to_jsonable(value: Any) -> Any:
    """Recursively convert ``value`` into something ``json.dumps`` emits as valid JSON.

    Dict keys become strings; lists/tuples/sets become lists; non-finite floats
    (inf, -inf, nan) become their string form because JSON has no token for them;
    any other unknown object becomes ``str(value)``.
    """
    if isinstance(value, float) and (value != value or value in (float("inf"), float("-inf"))):
        return str(value)
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, dict):
        return {str(k): _to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [_to_jsonable(v) for v in value]
    return str(value)


def _analyze_slider_failure(match: dict, layout: dict, attempts: list[dict], last_reason: str) -> dict:
    """Derive human-readable failure hypotheses from the recorded attempts.

    Returns a dict with ``summary`` (first hypothesis), ``hypotheses`` (all of them)
    and ``metrics``. The low-confidence hypothesis is only raised when ``match``
    actually carries a ``confidence_ratio``; an empty ``match`` means no match was
    computed and must not be reported as a poor match.
    """
    hypotheses: list[str] = []
    has_confidence = match.get("confidence_ratio") is not None
    confidence_ratio = float(match.get("confidence_ratio", 0) or 0)
    if has_confidence and confidence_ratio < 1.03:
        hypotheses.append("拼图匹配置信度偏低,背景与缺口特征区分不明显,易出现位移误差。")

    toast_texts = [str(a.get("toast") or "") for a in attempts if a.get("toast")]
    if any("验证码错误" in t for t in toast_texts):
        hypotheses.append("前端返回“验证码错误”,通常是最终落点偏差或拖动轨迹被风控识别。")

    interface_errors = [str(a.get("interface_error") or "") for a in attempts if a.get("interface_error")]
    if interface_errors:
        hypotheses.append("接口已返回失败状态,说明滑块校验请求已发出但服务端未判定通过。")

    packet_count = sum(1 for a in attempts if a.get("packet_received"))
    if attempts and packet_count == 0:
        hypotheses.append("多次拖动均未捕获到目标接口,请检查接口监听地址是否变化。")

    if attempts and bool(attempts[-1].get("verify_bar_visible_after")):
        hypotheses.append("验证码弹窗仍可见,当前 challenge 未完成。")

    boundary_hits = sum(1 for a in attempts if a.get("boundary_hit"))
    if boundary_hits > 0:
        hypotheses.append("拖动距离命中轨道边界,存在位移被截断风险。")

    if not hypotheses and last_reason:
        hypotheses.append(last_reason)
    if not hypotheses:
        hypotheses.append("未识别出单一原因,建议查看保存的标注图和报告。")

    return {
        "summary": hypotheses[0],
        "hypotheses": hypotheses,
        "metrics": {
            "confidence_ratio": confidence_ratio,
            "best_score": match.get("best_score"),
            "second_best": match.get("second_best"),
            "attempt_count": len(attempts),
            "packet_count": packet_count,
            "track_max": layout.get("track_max"),
        },
    }


def _save_failure_artifacts(
    page,
    bg_bytes: bytes,
    piece_bytes: bytes,
    match: dict,
    layout: dict,
    move_distances: list[int],
    attempts: list[dict],
    last_reason: str,
    analysis: dict,
) -> Path:
    """Write a failure case (images, screenshot, overlay, report.json) and return its directory.

    The case directory is ``captcha_failures/<timestamp>`` next to this file. The
    page screenshot and the annotated overlay are best-effort and skipped on error;
    the raw images and the JSON report are always written.
    """
    root = Path(__file__).resolve().parent / "captcha_failures"
    case_dir = root / datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    case_dir.mkdir(parents=True, exist_ok=True)

    (case_dir / "bg.png").write_bytes(bg_bytes)
    (case_dir / "piece.png").write_bytes(piece_bytes)

    try:
        page.get_screenshot(path=str(case_dir / "page.png"))
    except Exception:
        # Best-effort: a missing screenshot must not hide the original failure.
        pass

    try:
        bg_img = Image.open(BytesIO(bg_bytes)).convert("RGB")
        draw = ImageDraw.Draw(bg_img)
        y0 = int(match.get("piece_bbox_y0", 0) or 0)
        h = int(match.get("piece_bbox_h", 0) or 0)
        w = int(match.get("piece_bbox_w", 0) or 0)
        candidate_xs = match.get("candidate_target_xs") or [match.get("target_x", 0)]
        colors = [(255, 0, 0), (255, 140, 0), (0, 128, 255), (0, 180, 80), (160, 80, 255)]
        for idx, x in enumerate(candidate_xs[:5], start=1):
            color = colors[(idx - 1) % len(colors)]
            x = int(x)
            # The best candidate (#1) gets a thicker outline.
            draw.rectangle([x, y0, x + w, y0 + h], outline=color, width=2 if idx == 1 else 1)
            draw.text((x + 2, max(0, y0 - 14)), f"#{idx}", fill=color)
        bg_img.save(case_dir / "bg_overlay.png")
    except Exception:
        # Best-effort: e.g. bg_bytes is empty when no image was ever captured.
        pass

    report = {
        "created_at": datetime.now().isoformat(timespec="seconds"),
        "failure_reason": last_reason,
        "analysis": _to_jsonable(analysis),
        "match": _to_jsonable(match),
        "layout": _to_jsonable(layout),
        "move_distances": move_distances,
        "attempts": _to_jsonable(attempts),
    }
    (case_dir / "report.json").write_text(
        json.dumps(report, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return case_dir


# ---------- Slider dragging (human-like trajectory) ----------


def _ease_out_quad(t: float) -> float:
    """Quadratic ease-out: ``t * (2 - t)``."""
    return t * (2 - t)


def _ease_out_cubic(t: float) -> float:
    """Cubic ease-out: ``1 - (1 - t) ** 3``."""
    return 1 - (1 - t) ** 3


def _ease_out_bounce(t: float) -> float:
    """Piecewise-quadratic bounce ease-out (four segments split at 1/2.75, 2/2.75, 2.5/2.75)."""
    if t < 1 / 2.75:
        return 7.5625 * t * t
    elif t < 2 / 2.75:
        t -= 1.5 / 2.75
        return 7.5625 * t * t + 0.75
    elif t < 2.5 / 2.75:
        t -= 2.25 / 2.75
        return 7.5625 * t * t + 0.9375
    else:
        t -= 2.625 / 2.75
        return 7.5625 * t * t + 0.984375


def build_human_track(distance: int, num_steps: int = 0) -> list[dict]:
    """Build a human-like trajectory: ease-out approach, overshoot, then bounce back.

    Args:
        distance: Signed horizontal distance in pixels; 0 yields an empty track.
        num_steps: Number of approach steps; when <= 0 it is chosen at random
            (at least 12, otherwise 25–40% of the distance).

    Returns:
        A list of steps ``{"dx": int, "dy": int, "dt": float}``. The ``dx`` values
        always sum exactly to ``distance``.
    """
    if distance == 0:
        return []

    dist = abs(distance)
    sign = 1 if distance > 0 else -1

    if num_steps <= 0:
        num_steps = max(12, int(dist * random.uniform(0.25, 0.4)))

    overshoot = random.randint(max(2, int(dist * 0.03)), max(3, int(dist * 0.08)))
    total = dist + overshoot

    easing = random.choice([_ease_out_quad, _ease_out_cubic])

    raw_positions: list[float] = []
    for i in range(1, num_steps + 1):
        t = i / num_steps
        raw_positions.append(easing(t) * total)

    bounce_steps = random.randint(2, 4)
    for j in range(1, bounce_steps + 1):
        t = j / bounce_steps
        raw_positions.append(total - _ease_out_bounce(t) * overshoot)

    track: list[dict] = []
    prev_x = 0.0
    for pos in raw_positions:
        dx = round(pos - prev_x)
        # Randomly drop some zero-movement steps so pauses are irregular.
        if dx == 0 and random.random() < 0.3:
            continue
        prev_x += dx
        dy = random.choice([-1, 0, 0, 0, 1])
        # Faster steps over the first 60% of the distance, slower near the target.
        dt = (
            random.uniform(0.005, 0.012)
            if prev_x < dist * 0.6
            else random.uniform(0.008, 0.025)
        )
        if random.random() < 0.03:
            dt += random.uniform(0.02, 0.06)
        track.append({"dx": sign * dx, "dy": dy, "dt": dt})

    # Rounding can leave a residual; one final step makes the total exact.
    actual = sum(s["dx"] for s in track)
    diff = distance - actual
    if diff != 0:
        track.append({"dx": diff, "dy": 0, "dt": random.uniform(0.01, 0.03)})

    return track


def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None:
    """Send one CDP ``Input.dispatchMouseEvent``; ``clickCount`` is 1 only for mousePressed."""
    page.run_cdp(
        "Input.dispatchMouseEvent",
        type=event_type,
        x=x,
        y=y,
        button=button,
        clickCount=1 if event_type == "mousePressed" else 0,
    )


def _get_element_center(page, ele) -> tuple[int, int]:
    """Return the viewport centre of ``ele`` via JS, falling back to ``ele.rect.midpoint``."""
    rect = page.run_js(
        """const r = arguments[0].getBoundingClientRect();
           return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}""",
        ele,
    )
    if rect and isinstance(rect, dict):
        return int(rect["x"]), int(rect["y"])
    loc = ele.rect.midpoint
    return int(loc[0]), int(loc[1])


def drag_slider(page, slider_ele, distance: int) -> None:
    """Drag ``slider_ele`` by ``distance`` pixels using CDP-level mouse events.

    The button is always released, even if a move step raises, so a failed drag
    does not leave the page with the mouse button held down.
    """
    cx, cy = _get_element_center(page, slider_ele)

    _dispatch_mouse(page, "mouseMoved", cx, cy)
    time.sleep(random.uniform(0.03, 0.08))
    _dispatch_mouse(page, "mousePressed", cx, cy)
    time.sleep(random.uniform(0.02, 0.06))

    cur_x, cur_y = cx, cy
    try:
        for step in build_human_track(distance):
            cur_x += step["dx"]
            cur_y += step["dy"]
            _dispatch_mouse(page, "mouseMoved", cur_x, cur_y)
            time.sleep(step["dt"])
        time.sleep(random.uniform(0.02, 0.06))
    finally:
        _dispatch_mouse(page, "mouseReleased", cur_x, cur_y)


# ---------- Core automation flow ----------


def submit_phone(
    page,
    phone: str,
    url: str = "http://yscnb.com/tyyp/",
    alpha_threshold: int = 12,
    distance_adjust: int = 0,
    wait_page: float = 0.12,
) -> Any:
    """Enter the phone number, request the SMS code, solve the slider captcha.

    Args:
        page: DrissionPage page object.
        phone: Phone number typed into the form.
        url: Page to open.
        alpha_threshold: Opacity threshold forwarded to the jigsaw matcher.
        distance_adjust: Constant pixel correction added to every drag distance.
        wait_page: Seconds to sleep after navigation.

    Returns:
        The (JSON-coerced) response body of the getYanZhenMa/v2 request.

    Raises:
        RuntimeError: a required element is missing, or the slider did not pass /
            the endpoint did not report success within the attempt and time budget.
            In the latter case failure artifacts are saved and referenced in the message.
    """
    page.get(url)
    time.sleep(wait_page)

    # 1. Tick the agreement checkbox (1.html: #color-input-red)
    agree_checkbox = find_first(
        page,
        [
            "css:#color-input-red",
            "css:input[name='color-input-red']",
            'x://input[@id="color-input-red"]',
            "css:input.right-box[type='checkbox']",
        ],
        timeout=5,
    )
    if agree_checkbox:
        click_safe(agree_checkbox)
        time.sleep(0.03)

    # 2. "Order now" button (1.html: .paybg is preferred on mobile)
    order_btn = find_first(
        page,
        [
            "css:.paybg",
            "css:div.paybg",
            'x://button[contains(.,"立即订购")]',
            'x://a[contains(.,"立即订购")]',
            'x://span[contains(.,"立即订购")]',
            'x://div[contains(.,"立即订购")]',
            'x://*[contains(text(),"立即订购")]',
            'x://*[contains(.,"立即订购")]',
            "css:.btn-order",
            "css:button.btn-primary",
            "css:button.btn",
            "css:a.btn",
        ],
        timeout=3,
    )
    if order_btn:
        try:
            order_btn.run_js("this.scrollIntoView({block:'center'})")
            time.sleep(0.01)
        except Exception:
            pass
        click_safe(order_btn)
        time.sleep(0.05)
    else:
        # Fallback: .paybg in 1.html is a div, so click it directly from JS.
        try:
            page.run_js("""
                var btn = document.querySelector('.paybg');
                if (btn) { btn.scrollIntoView({block:'center'}); btn.click(); return true; }
                var nodes = document.querySelectorAll('button, a, span, div');
                for (var i = 0; i < nodes.length; i++) {
                    var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
                    if (t.indexOf('立即订购') >= 0) {
                        nodes[i].scrollIntoView({block: 'center'});
                        nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
                        return true;
                    }
                }
                return false;
            """)
        except Exception:
            pass
        time.sleep(0.05)

    # 3. Enter the phone number
    phone_input = find_first(
        page,
        [
            'x://*[@placeholder="请输入手机号码"]',
            "css:input.inp-txt",
        ],
        timeout=8,
    )
    if not phone_input:
        raise RuntimeError("未找到手机号输入框")
    phone_input.input(phone, clear=True)

    agree = find_first(
        page,
        [
            "css:i.ico-checkbox",
            'x://i[contains(@class,"ico-checkbox")]',
        ],
        timeout=2,
    )
    if agree:
        try:
            click_safe(agree)
        except Exception:
            pass

    # 4. Start listening (must happen before clicking "get verification code")
    page.listen.start(GET_YAN_ZHEN_MA_URL)

    send_btn = find_first(
        page,
        [
            "css:button.btn-code",
            'x://button[contains(text(),"获取验证码")]',
        ],
        timeout=8,
    )
    if not send_btn:
        raise RuntimeError("未找到「获取验证码」按钮")
    click_safe(send_btn)

    # 5. Wait for the slider popup
    verify_box = find_first(
        page,
        ["css:.verifybox", "css:.verify-bar-area"],
        timeout=6,
    )
    if not verify_box:
        raise RuntimeError("未检测到滑块验证码弹窗")

    bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
    piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
    slider = find_first(page, ["css:.verify-move-block"], timeout=5)
    if not bg_img or not piece_img or not slider:
        raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")

    # 6. Before every retry, re-fetch the captcha images and recompute the
    #    displacement (never reuse a previous distance).
    deadline = time.monotonic() + 15.0
    last_reason = ""
    attempts: list[dict[str, Any]] = []
    max_attempts = 6
    last_bg_bytes = b""
    last_piece_bytes = b""
    last_match: dict[str, Any] = {}
    last_layout: dict[str, Any] = {}
    last_move_distances: list[int] = []

    for idx in range(1, max_attempts + 1):
        if time.monotonic() >= deadline:
            break

        bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=2)
        piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=2)
        slider = find_first(page, ["css:.verify-move-block"], timeout=2)
        attempt: dict[str, Any] = {
            "index": idx,
            "move_distance": None,
            "boundary_hit": False,
            "packet_received": False,
            "toast": "",
            "interface_error": "",
            "verify_bar_visible_after": False,
        }
        if not bg_img or not piece_img or not slider:
            last_reason = "重试时未找到验证码关键元素(背景图/拼图块/滑块)"
            attempt["interface_error"] = last_reason
            attempts.append(attempt)
            break

        try:
            bg_src = wait_for_data_src(bg_img, timeout=4)
            piece_src = wait_for_data_src(piece_img, timeout=4)
            bg_bytes = parse_data_url(bg_src)
            piece_bytes = parse_data_url(piece_src)
        except Exception as e:
            last_reason = f"重试时读取验证码图片失败: {e}"
            attempt["interface_error"] = last_reason
            attempts.append(attempt)
            continue

        if len(bg_bytes) < 100 or len(piece_bytes) < 100:
            last_reason = f"验证码图片数据异常: bg={len(bg_bytes)}B piece={len(piece_bytes)}B"
            attempt["interface_error"] = last_reason
            attempts.append(attempt)
            continue

        try:
            match = calc_drag_distance_from_bytes(
                bg_bytes, piece_bytes, alpha_threshold=alpha_threshold
            )
        except Exception as e:
            last_reason = f"重试时位移计算失败: {e}"
            attempt["interface_error"] = last_reason
            attempts.append(attempt)
            continue

        layout = _get_slider_layout_metrics(page, bg_img, slider)
        # Image pixels -> display pixels; fall back to 1:1 when the width is unknown.
        bg_display_w = layout["bg_display_w"] if layout["bg_display_w"] > 0 else match["bg_width"]
        scale = float(bg_display_w) / max(1, match["bg_width"])
        track_max = int(layout["track_max"])
        move_distances = _build_move_distance_candidates(
            match=match,
            scale=scale,
            distance_adjust=int(distance_adjust),
            track_max=track_max,
        )
        # Attempt N uses the N-th candidate (clamped to the last one available).
        pick_index = min(max(0, idx - 1), len(move_distances) - 1)
        move_distance = int(move_distances[pick_index])
        attempt["move_distance"] = move_distance
        attempt["boundary_hit"] = bool(track_max >= 0 and move_distance in (0, track_max))
        attempt["confidence_ratio"] = float(match.get("confidence_ratio", 0) or 0)
        attempt["candidate_count"] = len(move_distances)

        # Remember the last valid computation for failure artifacts and analysis.
        last_bg_bytes = bg_bytes
        last_piece_bytes = piece_bytes
        last_match = match
        last_layout = layout
        last_move_distances = move_distances

        drag_slider(page, slider, move_distance)
        time.sleep(0.1)

        remaining = max(0.1, deadline - time.monotonic())
        packet, toast_text = _wait_packet_or_feedback(page, timeout=min(4.0, remaining))
        attempt["toast"] = toast_text or ""

        if packet is not None:
            attempt["packet_received"] = True
            response = getattr(packet, "response", None)
            if response is None:
                attempt["interface_error"] = f"捕获到 {GET_YAN_ZHEN_MA_URL} 数据包但无 response 字段"
                last_reason = attempt["interface_error"]
                attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area")
                attempts.append(attempt)
                if not attempt["verify_bar_visible_after"]:
                    break
                continue

            body = _coerce_json_body(response.body)
            attempt["response_body"] = _to_jsonable(body)
            try:
                _assert_interface_success(body)
            except Exception as e:
                attempt["interface_error"] = str(e)
                last_reason = attempt["interface_error"]
                attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area")
                attempts.append(attempt)
                # Popup gone means there is no challenge left to retry.
                if not attempt["verify_bar_visible_after"]:
                    break
                continue
            else:
                return body

        bar_visible = _is_element_visible(page, ".verify-bar-area")
        if toast_text:
            last_reason = f"滑块验证失败:{toast_text}"
        elif bar_visible:
            last_reason = f"第{idx}次拖动未触发 {GET_YAN_ZHEN_MA_URL}"
        else:
            last_reason = "验证码弹窗已关闭但未捕获接口响应"
        attempt["verify_bar_visible_after"] = bar_visible
        attempt["interface_error"] = attempt["interface_error"] or last_reason
        attempts.append(attempt)

    analysis = _analyze_slider_failure(
        match=last_match,
        layout=last_layout,
        attempts=attempts,
        last_reason=last_reason,
    )
    save_info = ""
    try:
        case_dir = _save_failure_artifacts(
            page=page,
            bg_bytes=last_bg_bytes,
            piece_bytes=last_piece_bytes,
            match=last_match,
            layout=last_layout,
            move_distances=last_move_distances,
            attempts=attempts,
            last_reason=last_reason,
            analysis=analysis,
        )
        save_info = f"; 失败样本目录: {case_dir}"
    except Exception as e:
        save_info = f"; 失败样本保存失败: {e}"

    raise RuntimeError(
        f"滑块未通过或接口未返回成功结果: {last_reason or '未知原因'}"
        f"; 分析: {analysis.get('summary', '无')}"
        f"{save_info}"
    )


def submit_code(page, code: str) -> dict:
    """Type the SMS verification code and click the confirm-order button.

    Returns ``{"success": True, "message": ...}`` once the click was issued.

    Raises:
        RuntimeError: the code input or the confirm button cannot be found.
    """
    code_input = find_first(
        page,
        [
            # XPath has no `*=` operator (that is CSS); substring match is contains().
            'x://input[contains(@placeholder,"验证码")]',
            'x://input[contains(@placeholder,"短信")]',
            "css:input.inp-txt[type='text']",
            "css:input[type='tel']",
            "css:.code-input",
            "css:input.verify-input",
        ],
        timeout=8,
    )
    if not code_input:
        raise RuntimeError("未找到验证码输入框")
    code_input.input(code, clear=True)
    time.sleep(0.2)

    confirm_btn = find_first(
        page,
        [
            "css:img.btn-buy",
            "css:.btn-buy",
            'x://img[contains(@class,"btn-buy")]',
            'x://*[contains(@class,"btn-buy")]',
        ],
        timeout=5,
    )
    if not confirm_btn:
        raise RuntimeError("未找到确认订购按钮(img.btn-buy)")
    try:
        confirm_btn.run_js("this.scrollIntoView({block:'center'})")
        time.sleep(0.05)
    except Exception:
        pass
    click_safe(confirm_btn)
    return {"success": True, "message": "已点击确认订购"}


# ---------- Entry point ----------


def main() -> None:
    """Parse CLI arguments, run the phone/captcha flow, then submit a verification code."""
    parser = argparse.ArgumentParser(description="天翼云盘订购页自动化")
    parser.add_argument(
        "--phone", default=DEFAULT_PHONE, help="手机号码(默认用代码中的 DEFAULT_PHONE)"
    )
    parser.add_argument("--url", default="http://yscnb.com/tyyp/", help="目标页面 URL")
    parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口,0 表示新建")
    args = parser.parse_args()

    if args.port:
        co = ChromiumOptions().set_local_port(port=args.port)
        page = ChromiumPage(addr_or_opts=co)
    else:
        page = ChromiumPage()

    print(f"打开页面: {args.url},手机号: {args.phone}")
    body = submit_phone(
        page=page,
        phone=args.phone,
        url=args.url,
    )
    print("hn_userEquitys/getYanZhenMa/v2 响应:")
    print(body)
    # NOTE(review): the code below is a hard-coded placeholder, not the SMS code
    # actually received — confirm whether this is intended outside debugging.
    input_code(page=page, code=123123)


def input_code(page, code):
    """Type the verification code, confirm, and return the hn_userEquitys/common/order response body.

    ``code`` may be a str or an int; it is converted to text before typing.
    Returns None when the captured packet has no response/body.
    NOTE(review): ``page.listen.wait()`` is called without a timeout here —
    confirm whether blocking until a packet arrives is intended.
    """
    page.ele('x://input[@placeholder="请输入验证码"]').input(str(code), clear=True)
    time.sleep(0.12)
    page.listen.start("hn_userEquitys/common/order")
    page.ele('x://*[@id="app"]/div/img').click(by_js=True)
    time.sleep(0.12)
    page.ele('x://*[@id="app"]/div/div[7]/div/div[3]/button[2]').click(by_js=True)
    res = page.listen.wait()
    body = getattr(getattr(res, "response", None), "body", None)
    return body


if __name__ == "__main__":
    main()