#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 天翼云盘订购页 http://yscnb.com/tyyp/ 自动化脚本(全部代码自包含) 自动化流程: 1. 打开页面 2. 勾选协议 3. 点击「立即订购」 4. 输入手机号 5. 点击「获取验证码」 6. 解析拼图验证码并执行滑块拖动 7. 监听 hn_userEquitys/getYanZhenMa/v2 响应并输出 用法:python test1.py [--phone 手机号] 调试时可在代码中修改 DEFAULT_PHONE """ from __future__ import annotations import argparse import json import re import threading import uuid from typing import Any, Optional import base64 import random import time from io import BytesIO from datetime import datetime from pathlib import Path import numpy as np from PIL import Image, ImageDraw import requests from DrissionPage import ChromiumPage, ChromiumOptions from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field from tgebrowser_client import TgeBrowserClient # ========== 调试用:在代码中直接指定手机号 ========== DEFAULT_PHONE = "17375712810" # 需要监听的 URL 特征(滑块通过后前端会请求此接口) GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2" ORDER_PACKET_URL = "hn_userEquitys/common/order" DEFAULT_TARGET_URL = "http://yscnb.com/tyyp/" PROXY_SOURCE_URL = "http://47.109.106.79:7002/ProxIpServiceTxt" SESSION_IDLE_SECONDS = 180 MOBILE_UA_POOL = [ "Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (Linux; Android 13; M2012K11AC) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (Linux; Android 12; 2201123C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (Linux; Android 13; SM-S9180) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (Linux; Android 12; V2227A) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (iPhone; CPU iPhone OS 18_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Mobile/15E148 Safari/604.1", ] PROXY_FETCH_HEADERS = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "Cache-Control": "no-cache", "DNT": "1", "Pragma": "no-cache", "Proxy-Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0", } # ---------- 拼图验证码计算 ---------- def _to_rgba_array(image_bytes: bytes) -> np.ndarray: return np.array(Image.open(BytesIO(image_bytes)).convert("RGBA"), dtype=np.int16) def _to_rgb_array(image_bytes: bytes) -> np.ndarray: return np.array(Image.open(BytesIO(image_bytes)).convert("RGB"), dtype=np.int16) def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]: mask = alpha > threshold if not mask.any(): raise ValueError("拼图块 alpha 全透明,无法匹配") ys, xs = np.where(mask) return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1 def _to_gray(rgb: np.ndarray) -> np.ndarray: return ( rgb[:, :, 0] * 0.299 + rgb[:, :, 1] * 0.587 + rgb[:, :, 2] * 0.114 ).astype(np.float32) def _grad_x(gray: np.ndarray) -> np.ndarray: pad = np.pad(gray, ((0, 0), (1, 1)), mode="edge") return np.abs(pad[:, 2:] - pad[:, :-2]) * 0.5 def calc_drag_distance_from_bytes( bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12 ) -> dict: """计算拼图目标位移,输出多个候选位移提高滑块命中率。""" bg = _to_rgb_array(bg_bytes).astype(np.float32) piece_rgba = _to_rgba_array(piece_bytes).astype(np.float32) bh, bw = bg.shape[:2] ph, _ = piece_rgba.shape[:2] if bh != ph: raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}") alpha = piece_rgba[:, :, 3] x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold) piece_crop = piece_rgba[y0:y1, x0:x1, :3] alpha_crop = alpha[y0:y1, x0:x1] mask = alpha_crop > alpha_threshold ys, xs = np.where(mask) if len(xs) < 10: raise ValueError("拼图有效像素过少,无法稳定匹配") piece_gray = _to_gray(piece_crop) bg_gray = _to_gray(bg) piece_grad = _grad_x(piece_gray) bg_grad = _grad_x(bg_gray) piece_gray_pixels = piece_gray[ys, xs] piece_grad_pixels = piece_grad[ys, xs] weights = np.clip(alpha_crop[ys, xs].astype(np.float32), 1.0, 255.0) weights = weights / float(weights.sum()) patch_h, patch_w = piece_crop.shape[:2] if patch_w > bw or patch_h > bh: raise ValueError("拼图块裁剪尺寸超过背景图") max_x = bw - patch_w scores: list[tuple[float, int, float, float]] = [] for x in range(max_x + 1): patch_gray_pixels = bg_gray[y0 + ys, x + xs] patch_grad_pixels = bg_grad[y0 + ys, x + xs] color_score = float((np.abs(patch_gray_pixels - piece_gray_pixels) * weights).sum()) grad_score = float((np.abs(patch_grad_pixels - piece_grad_pixels) * weights).sum()) # 颜色 + 边缘加权,较纯 RGB 差值在压缩噪声下更稳定 score = color_score * 0.72 + grad_score * 0.28 scores.append((score, x, color_score, grad_score)) scores.sort(key=lambda item: item[0]) best_score, best_x, best_color_score, best_grad_score = scores[0] second_best = scores[1][0] if len(scores) > 1 else float("inf") drag_distance = best_x - x0 confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf") candidate_xs: list[int] = [] for _score, x, _color, _grad in scores: if any(abs(x - picked) <= 1 for picked in candidate_xs): continue candidate_xs.append(int(x)) if len(candidate_xs) >= 5: break candidate_distances = [int(x - x0) for x in candidate_xs] return { "target_x": int(best_x), "piece_bbox_x0": int(x0), "piece_bbox_y0": int(y0), "piece_bbox_w": int(patch_w), "piece_bbox_h": int(patch_h), "bg_width": int(bw), "bg_height": int(bh), "drag_distance": int(drag_distance), "best_score": best_score, "best_color_score": best_color_score, "best_grad_score": best_grad_score, "second_best": second_best, "confidence_ratio": confidence_ratio, "candidate_target_xs": candidate_xs, "candidate_drag_distances": candidate_distances, } def parse_data_url(data_url: str) -> bytes: if not data_url.startswith("data:image"): raise ValueError("图片不是 data:image URL") _, data = data_url.split(",", 1) return base64.b64decode(data) # ---------- 元素查找与点击 ---------- def click_safe(ele) -> None: try: ele.click() return except Exception: pass ele.click(by_js=True) def find_first(page, selectors: list[str], timeout: float = 5): for sel in selectors: try: ele = page.ele(sel, timeout=timeout) if ele: return ele except Exception: continue return None def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str: """轮询等待 img 元素的 src 变为有效 data:image URL。""" deadline = time.time() + timeout while time.time() < deadline: src = img_ele.attr("src") or "" if src.startswith("data:image"): _prefix, _, b64 = src.partition(",") if b64.strip(): return src time.sleep(interval) raise RuntimeError( f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {(img_ele.attr('src') or '')[:80]}" ) def _is_element_visible(page, selector: str) -> bool: try: visible = page.run_js( """ const el = document.querySelector(arguments[0]); if (!el) return false; const st = window.getComputedStyle(el); if (st.display === 'none' || st.visibility === 'hidden' || st.opacity === '0') return false; const r = el.getBoundingClientRect(); return r.width > 0 && r.height > 0; """, selector, ) return bool(visible) except Exception: return False def _read_slider_toast_text(page) -> str: try: text = page.run_js( """ const node = document.querySelector('.van-toast--text .van-toast__text') || document.querySelector('.van-toast__text'); if (!node) return ''; const popup = node.closest('.van-toast') || node; const st = window.getComputedStyle(popup); const visible = st.display !== 'none' && st.visibility !== 'hidden' && st.opacity !== '0'; if (!visible) return ''; return (node.innerText || node.textContent || '').trim(); """ ) return (text or "").strip() except Exception: return "" def _get_slider_layout_metrics(page, bg_img, slider_ele) -> dict: try: metrics = page.run_js( """ const bg = arguments[0]; const slider = arguments[1]; const bar = document.querySelector('.verify-bar-area'); const sub = slider ? slider.querySelector('.verify-sub-block') : null; const bgR = bg ? bg.getBoundingClientRect() : {width: 0}; const sliderR = slider ? slider.getBoundingClientRect() : {width: 0}; const barR = bar ? bar.getBoundingClientRect() : {width: 0}; const subR = sub ? sub.getBoundingClientRect() : {width: 0}; return { bg_display_w: Number(bgR.width || 0), slider_w: Number(sliderR.width || 0), bar_w: Number(barR.width || 0), sub_w: Number(subR.width || 0), }; """, bg_img, slider_ele, ) except Exception: metrics = None if not isinstance(metrics, dict): metrics = {} bar_w = float(metrics.get("bar_w", 0) or 0) slider_w = float(metrics.get("slider_w", 0) or 0) track_max = int(round(max(0.0, bar_w - slider_w))) if bar_w > 0 and slider_w > 0 else -1 return { "bg_display_w": float(metrics.get("bg_display_w", 0) or 0), "slider_w": slider_w, "bar_w": bar_w, "sub_w": float(metrics.get("sub_w", 0) or 0), "track_max": track_max, } def _build_move_distance_candidates( match: dict, scale: float, distance_adjust: int, track_max: int, ) -> list[int]: base_distances = match.get("candidate_drag_distances") or [int(match["drag_distance"])] micro_offsets = (0, -1, 1, -2, 2, -3, 3) candidates: list[int] = [] seen: set[int] = set() for base in base_distances[:4]: scaled = int(round(int(base) * scale)) + int(distance_adjust) for offset in micro_offsets: val = scaled + offset if track_max >= 0: val = max(0, min(track_max, val)) if val not in seen: seen.add(val) candidates.append(val) if not candidates: fallback = int(round(int(match["drag_distance"]) * scale)) + int(distance_adjust) if track_max >= 0: fallback = max(0, min(track_max, fallback)) candidates = [fallback] return candidates def _normalize_listen_packet(packet): if packet is False: return None if isinstance(packet, list): return packet[0] if packet else None return packet def _coerce_json_body(body: Any) -> Any: if not isinstance(body, str): return body raw = body.strip() if not raw: return body if (raw.startswith("{") and raw.endswith("}")) or (raw.startswith("[") and raw.endswith("]")): try: return json.loads(raw) except Exception: return body return body def _code_indicates_success(value: Any) -> Optional[bool]: if value is None: return None if isinstance(value, bool): return value if isinstance(value, (int, float)): return int(value) in (0, 1, 200) text = str(value).strip() if not text: return None lower = text.lower() if lower in {"0", "00", "000", "0000", "1", "200", "ok", "success", "true"}: return True if lower in {"false", "fail", "failed", "error", "err"}: return False if lower.lstrip("-").isdigit(): return int(lower) in (0, 1, 200) return False def _to_int_or_none(value: Any) -> Optional[int]: try: if value is None: return None if isinstance(value, bool): return int(value) if isinstance(value, (int, float)): return int(value) text = str(value).strip() if not text: return None if text.lstrip("-").isdigit(): return int(text) return None except Exception: return None def _extract_interface_message(data: dict) -> str: for key in ("msg", "message", "error", "errorMsg", "detail", "retMsg"): value = data.get(key) if isinstance(value, str) and value.strip(): return value.strip() return "" def _is_sms_already_sent_response(data: dict, message: str) -> bool: code_keys = ("code", "retCode", "resultCode", "statusCode", "errno") code_val = None for key in code_keys: if key in data: code_val = _to_int_or_none(data.get(key)) if code_val is not None: break if code_val != 1001: return False text = (message or "").strip() if not text: return False # 这类文案通常表示验证码已下发或短信频控,不应判定为滑块失败。 hints = ("验证码已发送", "短信验证码已发送", "请稍后重试", "请稍等") return any(h in text for h in hints) def _assert_interface_success(body: Any) -> None: data = _coerce_json_body(body) if not isinstance(data, dict): return fail_reasons: list[str] = [] message = _extract_interface_message(data) is_soft_success = _is_sms_already_sent_response(data, message) for key in ("success", "ok"): if key in data and not bool(data[key]): fail_reasons.append(f"{key}={data[key]}") for key in ("code", "retCode", "resultCode", "statusCode", "errno"): if key not in data: continue code_ok = _code_indicates_success(data[key]) if code_ok is False: if is_soft_success: continue fail_reasons.append(f"{key}={data[key]}") if message: lowered = message.lower() fail_words = ("失败", "错误", "无效", "fail", "error", "invalid", "请重新", "未通过") if any(word in lowered for word in fail_words): if is_soft_success: return fail_reasons.append(f"msg={message}") if fail_reasons: preview = str(data) if len(preview) > 300: preview = preview[:300] + "..." raise RuntimeError(f"接口返回失败: {', '.join(fail_reasons)}; body={preview}") def _wait_packet_or_feedback(page, timeout: float) -> tuple[Any, str]: deadline = time.time() + timeout last_toast = "" while time.time() < deadline: wait_span = min(0.35, max(0.05, deadline - time.time())) packet = _normalize_listen_packet(page.listen.wait(timeout=wait_span, fit_count=False)) if packet is not None: return packet, last_toast toast = _read_slider_toast_text(page) if toast: last_toast = toast return None, last_toast def _to_jsonable(value: Any) -> Any: if value is None or isinstance(value, (str, int, float, bool)): return value if isinstance(value, dict): return {str(k): _to_jsonable(v) for k, v in value.items()} if isinstance(value, (list, tuple, set)): return [_to_jsonable(v) for v in value] return str(value) def _analyze_slider_failure(match: dict, layout: dict, attempts: list[dict], last_reason: str) -> dict: hypotheses: list[str] = [] confidence_ratio = float(match.get("confidence_ratio", 0) or 0) if confidence_ratio < 1.03: hypotheses.append("拼图匹配置信度偏低,背景与缺口特征区分不明显,易出现位移误差。") toast_texts = [str(a.get("toast") or "") for a in attempts if a.get("toast")] if any("验证码错误" in t for t in toast_texts): hypotheses.append("前端返回“验证码错误”,通常是最终落点偏差或拖动轨迹被风控识别。") interface_errors = [str(a.get("interface_error") or "") for a in attempts if a.get("interface_error")] if interface_errors: hypotheses.append("接口已返回失败状态,说明滑块校验请求已发出但服务端未判定通过。") packet_count = sum(1 for a in attempts if a.get("packet_received")) if attempts and packet_count == 0: hypotheses.append("多次拖动均未捕获到目标接口,请检查接口监听地址是否变化。") if attempts and bool(attempts[-1].get("verify_bar_visible_after")): hypotheses.append("验证码弹窗仍可见,当前 challenge 未完成。") boundary_hits = sum(1 for a in attempts if a.get("boundary_hit")) if boundary_hits > 0: hypotheses.append("拖动距离命中轨道边界,存在位移被截断风险。") if not hypotheses and last_reason: hypotheses.append(last_reason) if not hypotheses: hypotheses.append("未识别出单一原因,建议查看保存的标注图和报告。") return { "summary": hypotheses[0], "hypotheses": hypotheses, "metrics": { "confidence_ratio": confidence_ratio, "best_score": match.get("best_score"), "second_best": match.get("second_best"), "attempt_count": len(attempts), "packet_count": packet_count, "track_max": layout.get("track_max"), }, } def _save_failure_artifacts( page, bg_bytes: bytes, piece_bytes: bytes, match: dict, layout: dict, move_distances: list[int], attempts: list[dict], last_reason: str, analysis: dict, ) -> Path: root = Path(__file__).resolve().parent / "captcha_failures" case_dir = root / datetime.now().strftime("%Y%m%d_%H%M%S_%f") case_dir.mkdir(parents=True, exist_ok=True) (case_dir / "bg.png").write_bytes(bg_bytes) (case_dir / "piece.png").write_bytes(piece_bytes) try: page.get_screenshot(path=str(case_dir / "page.png")) except Exception: pass try: bg_img = Image.open(BytesIO(bg_bytes)).convert("RGB") draw = ImageDraw.Draw(bg_img) y0 = int(match.get("piece_bbox_y0", 0) or 0) h = int(match.get("piece_bbox_h", 0) or 0) w = int(match.get("piece_bbox_w", 0) or 0) candidate_xs = match.get("candidate_target_xs") or [match.get("target_x", 0)] colors = [(255, 0, 0), (255, 140, 0), (0, 128, 255), (0, 180, 80), (160, 80, 255)] for idx, x in enumerate(candidate_xs[:5], start=1): color = colors[(idx - 1) % len(colors)] x = int(x) draw.rectangle([x, y0, x + w, y0 + h], outline=color, width=2 if idx == 1 else 1) draw.text((x + 2, max(0, y0 - 14)), f"#{idx}", fill=color) bg_img.save(case_dir / "bg_overlay.png") except Exception: pass report = { "created_at": datetime.now().isoformat(timespec="seconds"), "failure_reason": last_reason, "analysis": analysis, "match": _to_jsonable(match), "layout": _to_jsonable(layout), "move_distances": move_distances, "attempts": _to_jsonable(attempts), } (case_dir / "report.json").write_text( json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8", ) return case_dir # ---------- 滑块拖动(仿人轨迹) ---------- def _ease_out_quad(t: float) -> float: return t * (2 - t) def _ease_out_cubic(t: float) -> float: return 1 - (1 - t) ** 3 def _ease_out_bounce(t: float) -> float: if t < 1 / 2.75: return 7.5625 * t * t elif t < 2 / 2.75: t -= 1.5 / 2.75 return 7.5625 * t * t + 0.75 elif t < 2.5 / 2.75: t -= 2.25 / 2.75 return 7.5625 * t * t + 0.9375 else: t -= 2.625 / 2.75 return 7.5625 * t * t + 0.984375 def build_human_track(distance: int, num_steps: int = 0) -> list[dict]: """生成仿人轨迹列表:加速-匀速-减速-过冲-回弹。""" if distance == 0: return [] dist = abs(distance) sign = 1 if distance > 0 else -1 if num_steps <= 0: num_steps = max(12, int(dist * random.uniform(0.25, 0.4))) overshoot = random.randint(max(2, int(dist * 0.03)), max(3, int(dist * 0.08))) total = dist + overshoot easing = random.choice([_ease_out_quad, _ease_out_cubic]) raw_positions: list[float] = [] for i in range(1, num_steps + 1): t = i / num_steps raw_positions.append(easing(t) * total) bounce_steps = random.randint(2, 4) for j in range(1, bounce_steps + 1): t = j / bounce_steps raw_positions.append(total - _ease_out_bounce(t) * overshoot) track: list[dict] = [] prev_x = 0.0 for pos in raw_positions: dx = round(pos - prev_x) if dx == 0 and random.random() < 0.3: continue prev_x += dx dy = random.choice([-1, 0, 0, 0, 1]) dt = ( random.uniform(0.005, 0.012) if prev_x < dist * 0.6 else random.uniform(0.008, 0.025) ) if random.random() < 0.03: dt += random.uniform(0.02, 0.06) track.append({"dx": sign * dx, "dy": dy, "dt": dt}) actual = sum(s["dx"] for s in track) diff = distance - actual if diff != 0: track.append({"dx": diff, "dy": 0, "dt": random.uniform(0.01, 0.03)}) return track def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None: page.run_cdp( "Input.dispatchMouseEvent", type=event_type, x=x, y=y, button=button, clickCount=1 if event_type == "mousePressed" else 0, ) def _get_element_center(page, ele) -> tuple[int, int]: rect = page.run_js( """const r = arguments[0].getBoundingClientRect(); return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}""", ele, ) if rect and isinstance(rect, dict): return int(rect["x"]), int(rect["y"]) loc = ele.rect.midpoint return int(loc[0]), int(loc[1]) def drag_slider(page, slider_ele, distance: int) -> None: """用 CDP 级鼠标事件完成拖拽,模拟真人操作。""" cx, cy = _get_element_center(page, slider_ele) _dispatch_mouse(page, "mouseMoved", cx, cy) time.sleep(random.uniform(0.03, 0.08)) _dispatch_mouse(page, "mousePressed", cx, cy) time.sleep(random.uniform(0.02, 0.06)) cur_x, cur_y = cx, cy track = build_human_track(distance) for step in track: cur_x += step["dx"] cur_y += step["dy"] _dispatch_mouse(page, "mouseMoved", cur_x, cur_y) time.sleep(step["dt"]) time.sleep(random.uniform(0.02, 0.06)) _dispatch_mouse(page, "mouseReleased", cur_x, cur_y) # ---------- 核心自动化流程 ---------- def submit_phone( page, phone: str, url: str = "http://yscnb.com/tyyp/", alpha_threshold: int = 12, distance_adjust: int = 0, wait_page: float = 0.3, ) -> Any: """ 填写手机号、点击获取验证码、执行滑块,返回 getYanZhenMa/v2 接口响应体。 """ page.get(url) time.sleep(wait_page) # 1. 勾选协议 agree_checkbox = find_first( page, [ "css:#color-input-red", "css:input[name='color-input-red']", 'x://input[@id="color-input-red"]', "css:input.right-box[type='checkbox']", ], timeout=5, ) if agree_checkbox: click_safe(agree_checkbox) time.sleep(0.4) # 2. 立即订购 order_btn = None for attempt in range(4): order_btn = find_first( page, [ "css:div.paybg", "css:.paybg", 'x://button[contains(.,"立即订购")]', 'x://a[contains(.,"立即订购")]', 'x://span[contains(.,"立即订购")]', 'x://div[contains(.,"立即订购")]', 'x://*[contains(text(),"立即订购")]', 'x://*[contains(.,"立即订购")]', "css:.btn-order", "css:button.btn-primary", "css:button.btn", "css:a.btn", ], timeout=1, ) if order_btn: break time.sleep(0.25) if order_btn: try: order_btn.run_js("this.scrollIntoView({block:'center'})") time.sleep(0.05) except Exception: pass click_safe(order_btn) time.sleep(0.4) else: try: page.run_js(""" var nodes = document.querySelectorAll('button, a, span, div'); for (var i = 0; i < nodes.length; i++) { var t = (nodes[i].innerText || nodes[i].textContent || '').trim(); if (t.indexOf('立即订购') >= 0) { nodes[i].scrollIntoView({block: 'center'}); nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window})); return true; } } return false; """) except Exception: pass time.sleep(0.4) # 3. 输入手机号 phone_input = find_first( page, [ 'x://*[@placeholder="请输入手机号码"]', "css:input.inp-txt", ], timeout=8, ) if not phone_input: raise RuntimeError("未找到手机号输入框") phone_input.input(phone, clear=True) agree = find_first( page, [ "css:i.ico-checkbox", 'x://i[contains(@class,"ico-checkbox")]', ], timeout=2, ) if agree: try: click_safe(agree) except Exception: pass # 4. 启动监听(必须在点击获取验证码之前) page.listen.start(GET_YAN_ZHEN_MA_URL) send_btn = find_first( page, [ "css:button.btn-code", 'x://button[contains(text(),"获取验证码")]', ], timeout=8, ) if not send_btn: raise RuntimeError("未找到「获取验证码」按钮") click_safe(send_btn) # 5. 等待滑块弹窗 verify_box = find_first( page, ["css:.verifybox", "css:.verify-bar-area"], timeout=6, ) if not verify_box: raise RuntimeError("未检测到滑块验证码弹窗") bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5) piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5) slider = find_first(page, ["css:.verify-move-block"], timeout=5) if not bg_img or not piece_img or not slider: raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)") # 6. 每次重试前重新抓取验证码图片并重算位移(不复用旧距离) deadline = time.time() + 15.0 last_reason = "" attempts: list[dict[str, Any]] = [] max_attempts = 6 last_bg_bytes = b"" last_piece_bytes = b"" last_match: dict[str, Any] = {} last_layout: dict[str, Any] = {} last_move_distances: list[int] = [] for idx in range(1, max_attempts + 1): if time.time() >= deadline: break bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=2) piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=2) slider = find_first(page, ["css:.verify-move-block"], timeout=2) attempt: dict[str, Any] = { "index": idx, "move_distance": None, "boundary_hit": False, "packet_received": False, "toast": "", "interface_error": "", "verify_bar_visible_after": False, } if not bg_img or not piece_img or not slider: last_reason = "重试时未找到验证码关键元素(背景图/拼图块/滑块)" attempt["interface_error"] = last_reason attempts.append(attempt) break try: bg_src = wait_for_data_src(bg_img, timeout=4) piece_src = wait_for_data_src(piece_img, timeout=4) bg_bytes = parse_data_url(bg_src) piece_bytes = parse_data_url(piece_src) except Exception as e: last_reason = f"重试时读取验证码图片失败: {e}" attempt["interface_error"] = last_reason attempts.append(attempt) continue if len(bg_bytes) < 100 or len(piece_bytes) < 100: last_reason = f"验证码图片数据异常: bg={len(bg_bytes)}B piece={len(piece_bytes)}B" attempt["interface_error"] = last_reason attempts.append(attempt) continue try: match = calc_drag_distance_from_bytes( bg_bytes, piece_bytes, alpha_threshold=alpha_threshold ) except Exception as e: last_reason = f"重试时位移计算失败: {e}" attempt["interface_error"] = last_reason attempts.append(attempt) continue layout = _get_slider_layout_metrics(page, bg_img, slider) bg_display_w = layout["bg_display_w"] if layout["bg_display_w"] > 0 else match["bg_width"] scale = float(bg_display_w) / max(1, match["bg_width"]) track_max = int(layout["track_max"]) move_distances = _build_move_distance_candidates( match=match, scale=scale, distance_adjust=int(distance_adjust), track_max=track_max, ) pick_index = min(max(0, idx - 1), len(move_distances) - 1) move_distance = int(move_distances[pick_index]) attempt["move_distance"] = move_distance attempt["boundary_hit"] = bool(track_max >= 0 and move_distance in (0, track_max)) attempt["confidence_ratio"] = float(match.get("confidence_ratio", 0) or 0) attempt["candidate_count"] = len(move_distances) # 记录最后一次有效计算,用于失败样本保存与分析 last_bg_bytes = bg_bytes last_piece_bytes = piece_bytes last_match = match last_layout = layout last_move_distances = move_distances drag_slider(page, slider, move_distance) time.sleep(0.2) remaining = max(0.1, deadline - time.time()) packet, toast_text = _wait_packet_or_feedback(page, timeout=min(4.0, remaining)) attempt["toast"] = toast_text or "" if packet is not None: attempt["packet_received"] = True response = getattr(packet, "response", None) if response is None: attempt["interface_error"] = f"捕获到 {GET_YAN_ZHEN_MA_URL} 数据包但无 response 字段" last_reason = attempt["interface_error"] attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") attempts.append(attempt) if not attempt["verify_bar_visible_after"]: break continue body = _coerce_json_body(response.body) attempt["response_body"] = _to_jsonable(body) try: _assert_interface_success(body) return body except Exception as e: attempt["interface_error"] = str(e) last_reason = attempt["interface_error"] attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") attempts.append(attempt) if not attempt["verify_bar_visible_after"]: break continue if toast_text: last_reason = f"滑块验证失败:{toast_text}" elif _is_element_visible(page, ".verify-bar-area"): last_reason = f"第{idx}次拖动未触发 {GET_YAN_ZHEN_MA_URL}" else: last_reason = "验证码弹窗已关闭但未捕获接口响应" attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") attempt["interface_error"] = last_reason if not attempt["interface_error"] else attempt["interface_error"] attempts.append(attempt) analysis = _analyze_slider_failure( match=last_match, layout=last_layout, attempts=attempts, last_reason=last_reason, ) save_info = "" try: case_dir = _save_failure_artifacts( page=page, bg_bytes=last_bg_bytes, piece_bytes=last_piece_bytes, match=last_match, layout=last_layout, move_distances=last_move_distances, attempts=attempts, last_reason=last_reason, analysis=analysis, ) save_info = f"; 失败样本目录: {case_dir}" except Exception as e: save_info = f"; 失败样本保存失败: {e}" raise RuntimeError( f"滑块未通过或接口未返回成功结果: {last_reason or '未知原因'}" f"; 分析: {analysis.get('summary', '无')}" f"{save_info}" ) def submit_code(page, code: str) -> dict: """填写短信验证码,点击确认订购按钮。""" code_input = find_first( page, [ 'x://input[@placeholder*="验证码"]', 'x://input[@placeholder*="短信"]', "css:input.inp-txt[type='text']", "css:input[type='tel']", "css:.code-input", "css:input.verify-input", ], timeout=8, ) if not code_input: raise RuntimeError("未找到验证码输入框") code_input.input(code, clear=True) time.sleep(0.2) confirm_btn = find_first( page, [ "css:img.btn-buy", "css:.btn-buy", 'x://img[contains(@class,"btn-buy")]', 'x://*[contains(@class,"btn-buy")]', ], timeout=5, ) if not confirm_btn: raise RuntimeError("未找到确认订购按钮(img.btn-buy)") try: confirm_btn.run_js("this.scrollIntoView({block:'center'})") time.sleep(0.05) except Exception: pass click_safe(confirm_btn) return {"success": True, "message": "已点击确认订购"} # ---------- 入口 ---------- def main() -> None: parser = argparse.ArgumentParser(description="天翼云盘订购页自动化") parser.add_argument("--api", action="store_true", help="以 FastAPI 服务模式启动") parser.add_argument("--host", default="0.0.0.0", help="API 监听地址(--api 模式)") parser.add_argument("--api-port", type=int, default=8000, help="API 监听端口(--api 模式)") parser.add_argument( "--phone", default=DEFAULT_PHONE, help="手机号码(默认用代码中的 DEFAULT_PHONE)" ) parser.add_argument("--url", default=DEFAULT_TARGET_URL, help="目标页面 URL") parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口,0 表示新建") args = parser.parse_args() if args.api: import uvicorn uvicorn.run(app, host=args.host, port=args.api_port) return if args.port: co = ChromiumOptions().set_local_port(port=args.port) page = ChromiumPage(addr_or_opts=co) else: page = ChromiumPage() print(f"打开页面: {args.url},手机号: {args.phone}") body = submit_phone( page=page, phone=args.phone, url=args.url, ) print("hn_userEquitys/getYanZhenMa/v2 响应:") print(body) input_code(page=page, code=123123) def input_code(page, code): page.ele('x://input[@placeholder="请输入验证码"]').input(code, clear=True) time.sleep(0.5) page.listen.start(ORDER_PACKET_URL) page.ele('x://*[@id="app"]/div/img').click(by_js=True) time.sleep(0.5) page.ele('x://*[@id="app"]/div/div[7]/div/div[3]/button[2]').click(by_js=True) res = _normalize_listen_packet(page.listen.wait(timeout=15, fit_count=False)) if res is None: raise RuntimeError(f"提交验证码后未捕获到 {ORDER_PACKET_URL} 抓包数据") response = getattr(res, "response", None) if response is None: raise RuntimeError("提交验证码后抓包缺少 response 字段") return _coerce_json_body(response.body) # ---------- FastAPI 接口 ---------- class ApiSubmitPhoneRequest(BaseModel): phone: str = Field(..., description="手机号码") url: str = Field(DEFAULT_TARGET_URL, description="目标页面 URL") proxy_api_url: str = Field(PROXY_SOURCE_URL, description="代理来源接口(返回 ip:port 文本)") class ApiSubmitPhoneResponse(BaseModel): success: bool = True flow_id: str = Field(..., description="流程唯一标识符") data: Any = Field(..., description="滑块流程抓包响应体(getYanZhenMa)") phone: str = "" url: str = "" created_at: str = "" proxy: str = "" ua: str = "" class ApiSubmitCodeRequest(BaseModel): flow_id: str = Field(..., description="submit_phone 返回的流程唯一标识符") code: str = Field(..., description="短信验证码") class ApiSubmitCodeResponse(BaseModel): success: bool = True flow_id: str = "" data: Any = Field(..., description="验证码提交后的抓包响应体") app = FastAPI(title="test1 自动化 API", description="TgeBrowser + DrissionPage 天翼页面自动化接口") _flow_sessions: dict[str, dict[str, Any]] = {} _flow_lock = threading.Lock() _cleanup_thread_started = False def _choose_mobile_ua() -> str: return random.choice(MOBILE_UA_POOL) def _fetch_proxy_text(proxy_api_url: str) -> str: resp = requests.get( proxy_api_url, headers=PROXY_FETCH_HEADERS, timeout=15, ) resp.raise_for_status() text = (resp.text or "").strip() if not text: raise RuntimeError("代理接口返回为空") first_line = text.splitlines()[0].strip() if not first_line: raise RuntimeError("代理接口返回格式异常:首行为空") return first_line def _parse_proxy_addr(proxy_text: str) -> tuple[str, int]: m = re.match(r"^\s*((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\s*$", proxy_text) if not m: raise RuntimeError(f"代理格式不合法(期望 ip:port): {proxy_text}") host = m.group(1) port = int(m.group(2)) if not (1 <= port <= 65535): raise RuntimeError(f"代理端口不合法: {port}") return host, port def _connect_page_from_start_data(start_data: dict) -> ChromiumPage: port = start_data.get("port") ws = start_data.get("ws") if port: co = ChromiumOptions().set_local_port(port=int(port)) return ChromiumPage(addr_or_opts=co) if ws: return ChromiumPage(addr_or_opts=ws) raise RuntimeError("TgeBrowser 未返回 port 或 ws") def _create_tgebrowser_browser( client: TgeBrowserClient, browser_name: str, start_page_url: str, proxy_host: str, proxy_port: int, mobile_ua: str, ) -> dict: proxy_candidates = [ {"protocol": "http", "host": proxy_host, "port": proxy_port}, {"protocol": "http", "host": proxy_host, "port": str(proxy_port)}, {"protocol": "http", "server": f"{proxy_host}:{proxy_port}"}, ] last_error: Optional[Exception] = None for proxy_conf in proxy_candidates: try: return client.create_browser( browser_name=browser_name, start_page_url=start_page_url, proxy=proxy_conf, fingerprint={ "os": "Android", "platformVersion": 12, "userAgent": mobile_ua, }, ) except Exception as e: last_error = e continue raise RuntimeError(f"创建浏览器失败(代理配置尝试均失败): {last_error}") def _apply_mobile_ua(page: ChromiumPage, ua: str) -> None: try: page.run_cdp("Network.enable") except Exception: pass try: page.run_cdp("Network.setUserAgentOverride", userAgent=ua) except Exception: pass def _close_flow_resources(flow: dict[str, Any]) -> None: page = flow.get("page") client = flow.get("client") env_id = flow.get("env_id") try: if page: page.quit() except Exception: pass try: if client and env_id is not None: client.stop_browser(env_id=env_id) except Exception: pass def _cleanup_expired_flows() -> None: now_ts = time.time() expired: list[dict[str, Any]] = [] with _flow_lock: for flow_id, flow in list(_flow_sessions.items()): if flow.get("busy"): continue last_active_ts = float(flow.get("last_active_ts", now_ts)) if now_ts - last_active_ts > SESSION_IDLE_SECONDS: expired.append(_flow_sessions.pop(flow_id)) for flow in expired: _close_flow_resources(flow) def _flow_cleanup_worker() -> None: while True: try: _cleanup_expired_flows() except Exception: pass time.sleep(15) def _ensure_cleanup_thread() -> None: global _cleanup_thread_started with _flow_lock: if _cleanup_thread_started: return th = threading.Thread(target=_flow_cleanup_worker, name="flow_cleanup_worker", daemon=True) th.start() _cleanup_thread_started = True @app.on_event("startup") def _api_startup() -> None: _ensure_cleanup_thread() @app.post("/api/submit_phone", response_model=ApiSubmitPhoneResponse) def api_submit_phone(req: ApiSubmitPhoneRequest): _ensure_cleanup_thread() client: Optional[TgeBrowserClient] = None page: Optional[ChromiumPage] = None env_id: Optional[int] = None stored = False try: proxy_text = _fetch_proxy_text(req.proxy_api_url) proxy_host, proxy_port = _parse_proxy_addr(proxy_text) mobile_ua = _choose_mobile_ua() client = TgeBrowserClient() browser_name = f"tyyp_{req.phone[-4:]}_{int(time.time())}" create_data = _create_tgebrowser_browser( client=client, browser_name=browser_name, start_page_url=req.url, proxy_host=proxy_host, proxy_port=proxy_port, mobile_ua=mobile_ua, ) env_id = create_data.get("envId") if env_id is None: raise RuntimeError("创建浏览器失败:未返回 envId") start_data = client.start_browser(env_id=env_id) page = _connect_page_from_start_data(start_data) _apply_mobile_ua(page, mobile_ua) packet_body = submit_phone( page=page, phone=req.phone, url=req.url, ) flow_id = str(uuid.uuid4()) created = datetime.now() flow = { "flow_id": flow_id, "page": page, "client": client, "env_id": env_id, "phone": req.phone, "url": req.url, "proxy": proxy_text, "ua": mobile_ua, "created_at": created, "created_at_iso": created.isoformat(timespec="seconds"), "created_ts": time.time(), "last_active_ts": time.time(), "busy": False, } with _flow_lock: _flow_sessions[flow_id] = flow stored = True return ApiSubmitPhoneResponse( success=True, flow_id=flow_id, data=packet_body, phone=req.phone, url=req.url, created_at=flow["created_at_iso"], proxy=proxy_text, ua=mobile_ua, ) except Exception as e: if not stored: _close_flow_resources({"page": page, "client": client, "env_id": env_id}) raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/submit_code", response_model=ApiSubmitCodeResponse) def api_submit_code(req: ApiSubmitCodeRequest): _ensure_cleanup_thread() with _flow_lock: flow = _flow_sessions.get(req.flow_id) if not flow: raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {req.flow_id}") flow["busy"] = True flow["last_active_ts"] = time.time() page = flow.get("page") if not page: with _flow_lock: flow = _flow_sessions.get(req.flow_id) if flow: flow["busy"] = False flow["last_active_ts"] = time.time() raise HTTPException(status_code=500, detail="流程页面对象缺失") try: packet_body = input_code(page=page, code=req.code) return ApiSubmitCodeResponse( success=True, flow_id=req.flow_id, data=packet_body, ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) finally: with _flow_lock: flow = _flow_sessions.get(req.flow_id) if flow: flow["busy"] = False flow["last_active_ts"] = time.time() @app.get("/api/flow/{flow_id}") def api_get_flow(flow_id: str): with _flow_lock: flow = _flow_sessions.get(flow_id) if not flow: raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {flow_id}") return { "success": True, "flow_id": flow_id, "phone": flow.get("phone"), "url": flow.get("url"), "proxy": flow.get("proxy"), "ua": flow.get("ua"), "created_at": flow.get("created_at_iso"), "last_active_ts": flow.get("last_active_ts"), } @app.delete("/api/flow/{flow_id}") def api_close_flow(flow_id: str): with _flow_lock: flow = _flow_sessions.pop(flow_id, None) if not flow: return {"success": False, "message": "流程不存在"} _close_flow_resources(flow) return {"success": True, "message": "流程已关闭"} if __name__ == "__main__": main()