#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Site flow automation (DrissionPage):
1) Open http://yscnb.com/tyyp/1.html
2) Tick the agreement checkbox and click the "立即订购" button
3) Enter the phone number
4) Click the "获取验证码" button
5) Decode the puzzle captcha shown in the popup and perform the drag

Note: the flow stops after the slider drag; the SMS code is never filled in
automatically.
"""
from __future__ import annotations

import argparse
import base64
import random
import re
import time
from io import BytesIO
from pathlib import Path

import numpy as np
from PIL import Image

# Selectors tried, in order, to locate the "立即订购" button.
_ORDER_BUTTON_SELECTORS = [
    "css:div.paybg",
    "css:.paybg",
    'x://button[contains(.,"立即订购")]',
    'x://a[contains(.,"立即订购")]',
    'x://span[contains(.,"立即订购")]',
    'x://div[contains(.,"立即订购")]',
    'x://*[contains(text(),"立即订购")]',
    'x://*[contains(.,"立即订购")]',
    "css:.btn-order",
    "css:.order-btn",
    "css:button.btn-primary",
    "css:button.btn",
    "css:a.btn",
]

# Fallback 1: if the page exposes jQuery as `$`, click the first matching element.
_JS_CLICK_ORDER_JQUERY = """
if (typeof $ !== 'undefined') {
    var el = $('button, a, span, div').filter(function () {
        return $(this).text().indexOf('立即订购') >= 0;
    }).first();
    if (el.length) {
        el[0].scrollIntoView({block: 'center'});
        el[0].click();
        return true;
    }
    el = $('*').filter(function () {
        return $(this).text().trim() === '立即订购';
    }).first();
    if (el.length) {
        el[0].scrollIntoView({block: 'center'});
        el[0].click();
        return true;
    }
}
return false;
"""

# Fallback 2: plain DOM scan. The first pass prefers short/exact labels on
# elements with a non-null offsetParent; the second pass accepts any match.
_JS_CLICK_ORDER_DOM = """
var nodes = document.querySelectorAll(
    'button, a, span, div, input[type=button], input[type=submit]');
for (var i = 0; i < nodes.length; i++) {
    var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
    if (t === '立即订购' || (t.indexOf('立即订购') >= 0 && t.length < 20)) {
        var el = nodes[i];
        if (el.offsetParent !== null || el.tagName === 'BODY') {
            el.scrollIntoView({block: 'center'});
            el.dispatchEvent(new MouseEvent(
                'click', {bubbles: true, cancelable: true, view: window}));
            return true;
        }
    }
}
for (var i = 0; i < nodes.length; i++) {
    var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
    if (t.indexOf('立即订购') >= 0) {
        nodes[i].scrollIntoView({block: 'center'});
        nodes[i].dispatchEvent(new MouseEvent(
            'click', {bubbles: true, cancelable: true, view: window}));
        return true;
    }
}
return false;
"""

# Fallback 3: look inside iframes whose document is reachable from the page.
_JS_CLICK_ORDER_IFRAME = """
var iframes = document.querySelectorAll('iframe');
for (var i = 0; i < iframes.length; i++) {
    try {
        var doc = iframes[i].contentDocument || iframes[i].contentWindow.document;
        var all = doc.querySelectorAll('*');
        for (var j = 0; j < all.length; j++) {
            var t = (all[j].innerText || all[j].textContent || '').trim();
            if (t.indexOf('立即订购') >= 0) {
                all[j].scrollIntoView({block: 'center'});
                all[j].dispatchEvent(new MouseEvent(
                    'click', {bubbles: true, cancelable: true, view: window}));
                return true;
            }
        }
    } catch (e) {}
}
return false;
"""

# Debug aid: describe up to five elements whose text contains "立即订购".
# NOTE(review): the loop header and text test were truncated in the previous
# revision (`for(var i=0;i=0) out.push(...)`), which is a JS syntax error; they
# are reconstructed here to mirror the other snippets - confirm the intent.
_JS_DEBUG_ORDER_ELEMENTS = """
var out = [], all = document.querySelectorAll('*');
for (var i = 0; i < all.length; i++) {
    var t = all[i].innerText || all[i].textContent || '';
    if (t.indexOf('立即订购') >= 0) {
        var cls = (typeof all[i].className === 'string') ? all[i].className : '';
        out.push(all[i].tagName
            + (all[i].id ? '#' + all[i].id : '')
            + (cls ? '.' + cls.split(' ')[0] : ''));
    }
}
return out.slice(0, 5).join(', ') || '无';
"""

_JS_ELEMENT_CENTER = """
const r = arguments[0].getBoundingClientRect();
return {x: Math.round(r.x + r.width / 2), y: Math.round(r.y + r.height / 2)};
"""

_JS_RENDERED_WIDTH = """
const el = arguments[0];
const r = el.getBoundingClientRect();
return r.width;
"""

_JS_MASK_VISIBLE = """
const m = document.querySelector('.mask');
if (!m) return false;
const s = window.getComputedStyle(m);
return s.display !== 'none' && s.visibility !== 'hidden' && s.opacity !== '0';
"""

# A CSS length number: optional sign, then digits with at most one decimal point.
_CSS_NUMBER = r"[+-]?(?:\d+(?:\.\d*)?|\.\d+)"


def _to_rgba_array(image_bytes: bytes) -> np.ndarray:
    """Decode encoded image bytes into an int16 array with R, G, B, A channels."""
    # int16 (not uint8) so later pixel subtraction cannot wrap around.
    return np.array(Image.open(BytesIO(image_bytes)).convert("RGBA"), dtype=np.int16)


def _to_rgb_array(image_bytes: bytes) -> np.ndarray:
    """Decode encoded image bytes into an int16 array with R, G, B channels."""
    return np.array(Image.open(BytesIO(image_bytes)).convert("RGB"), dtype=np.int16)


def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]:
    """Return the bounding box ``(x0, y0, x1, y1)`` of pixels with alpha > threshold.

    ``x1`` and ``y1`` are exclusive.

    Raises:
        ValueError: if no pixel exceeds the threshold.
    """
    mask = alpha > threshold
    if not mask.any():
        raise ValueError("拼图块 alpha 全透明,无法匹配")
    ys, xs = np.where(mask)
    return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1


def calc_drag_distance_from_bytes(bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12) -> dict:
    """Compute the horizontal offset of the puzzle piece within the background.

    The opaque pixels of the piece (alpha > ``alpha_threshold``) are compared
    against the background at every horizontal offset, keeping the piece's own
    vertical position; the offset with the smallest mean absolute RGB
    difference wins.

    Args:
        bg_bytes: encoded background image.
        piece_bytes: encoded piece image; must have the same height as the
            background.
        alpha_threshold: alpha value above which a piece pixel counts as opaque.

    Returns:
        A dict with the keys ``target_x``, ``piece_bbox_x0``, ``piece_bbox_y0``,
        ``piece_bbox_w``, ``piece_bbox_h``, ``bg_width``, ``bg_height``,
        ``drag_distance`` (``target_x - piece_bbox_x0``, in image pixels),
        ``best_score``, ``second_best`` and ``confidence_ratio``
        (``second_best / best_score``, or ``inf`` when the best score is 0).

    Raises:
        ValueError: if the heights differ, the piece is fully transparent, or
            the cropped piece is larger than the background.
    """
    bg = _to_rgb_array(bg_bytes)
    piece_rgba = _to_rgba_array(piece_bytes)

    bh, bw = bg.shape[:2]
    ph, _ = piece_rgba.shape[:2]
    if bh != ph:
        raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}")

    alpha = piece_rgba[:, :, 3]
    x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold)
    piece_crop = piece_rgba[y0:y1, x0:x1, :3]
    mask = alpha[y0:y1, x0:x1] > alpha_threshold
    ys, xs = np.where(mask)
    piece_pixels = piece_crop[ys, xs]

    patch_h, patch_w = piece_crop.shape[:2]
    if patch_w > bw or patch_h > bh:
        raise ValueError("拼图块裁剪尺寸超过背景图")

    best_x = 0
    best_score = float("inf")
    second_best = float("inf")
    max_x = bw - patch_w
    for x in range(max_x + 1):
        patch_pixels = bg[y0 + ys, x + xs]
        score = float(np.abs(patch_pixels - piece_pixels).mean())
        if score < best_score:
            second_best = best_score
            best_score = score
            best_x = x
        elif score < second_best:
            second_best = score

    drag_distance = best_x - x0
    confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf")
    return {
        "target_x": int(best_x),
        "piece_bbox_x0": int(x0),
        "piece_bbox_y0": int(y0),
        "piece_bbox_w": int(patch_w),
        "piece_bbox_h": int(patch_h),
        "bg_width": int(bw),
        "bg_height": int(bh),
        "drag_distance": int(drag_distance),
        "best_score": best_score,
        "second_best": second_best,
        "confidence_ratio": confidence_ratio,
    }


def parse_data_url(data_url: str) -> bytes:
    """Return the decoded payload of a ``data:image...`` URL.

    Everything after the first comma is decoded as base64 unconditionally.

    Raises:
        ValueError: if the URL does not start with ``data:image``, has no comma
            separating header and payload, or the payload is not valid base64
            (``binascii.Error`` is a ``ValueError`` subclass).
    """
    if not data_url.startswith("data:image"):
        raise ValueError("图片不是 data:image URL")
    _header, sep, payload = data_url.partition(",")
    if not sep:
        # Previously surfaced as an opaque tuple-unpacking error.
        raise ValueError("data:image URL 缺少数据部分")
    return base64.b64decode(payload)


def style_px(style_text: str, key: str, default: float) -> float:
    """Read a ``<key>: <number>px`` value from an inline style string.

    Args:
        style_text: inline CSS text, e.g. ``"left: 12px; top: 3px"``.
        key: property name to look up; matched as a whole name, so ``left``
            does not match inside ``margin-left``.
        default: value returned when the text is empty or the property is not
            present with a well-formed pixel value.

    Returns:
        The numeric value as a float, or ``default``.
    """
    if not style_text:
        return default
    pattern = rf"(?<![\w-]){re.escape(key)}\s*:\s*({_CSS_NUMBER})px"
    m = re.search(pattern, style_text)
    return float(m.group(1)) if m else default


def click_safe(ele) -> None:
    """Click an element, falling back to a JS-driven click if the first try raises."""
    try:
        ele.click()
    except Exception:
        # Best effort: whatever made the regular click fail, retry through JS.
        ele.click(by_js=True)


def _ease_out_quad(t: float) -> float:
    """Quadratic ease-out for ``t`` in [0, 1]."""
    return t * (2 - t)


def _ease_out_cubic(t: float) -> float:
    """Cubic ease-out for ``t`` in [0, 1]."""
    return 1 - (1 - t) ** 3


def _ease_out_bounce(t: float) -> float:
    """Piecewise-quadratic "bounce" ease-out for ``t`` in [0, 1]."""
    if t < 1 / 2.75:
        return 7.5625 * t * t
    if t < 2 / 2.75:
        t -= 1.5 / 2.75
        return 7.5625 * t * t + 0.75
    if t < 2.5 / 2.75:
        t -= 2.25 / 2.75
        return 7.5625 * t * t + 0.9375
    t -= 2.625 / 2.75
    return 7.5625 * t * t + 0.984375


def build_human_track(distance: int, num_steps: int = 0) -> list[dict]:
    """Build a list of pointer steps whose ``dx`` values sum to ``distance``.

    The track eases out towards a point slightly past the target (overshoot)
    and then returns to it over a few extra steps. Step sizes, vertical jitter
    and delays are randomised.

    Args:
        distance: signed horizontal distance in pixels; 0 yields an empty track.
        num_steps: number of forward steps; a value <= 0 picks one from the
            distance (at least 12).

    Returns:
        A list of ``{"dx": int, "dy": int, "dt": float}`` items, ``dt`` being a
        delay in seconds.
    """
    if distance == 0:
        return []

    dist = abs(distance)
    sign = 1 if distance > 0 else -1

    if num_steps <= 0:
        num_steps = max(12, int(dist * random.uniform(0.25, 0.4)))

    overshoot = random.randint(max(2, int(dist * 0.03)), max(3, int(dist * 0.08)))
    total = dist + overshoot  # go past the target first, then come back
    easing = random.choice([_ease_out_quad, _ease_out_cubic])

    # Forward phase.
    raw_positions: list[float] = []
    for i in range(1, num_steps + 1):
        t = i / num_steps
        raw_positions.append(easing(t) * total)

    # Return phase (2-4 steps).
    bounce_steps = random.randint(2, 4)
    for j in range(1, bounce_steps + 1):
        t = j / bounce_steps
        raw_positions.append(total - _ease_out_bounce(t) * overshoot)

    track: list[dict] = []
    prev_x = 0.0
    for pos in raw_positions:
        dx = round(pos - prev_x)
        if dx == 0 and random.random() < 0.3:
            continue
        prev_x += dx
        dy = random.choice([-1, 0, 0, 0, 1])
        # Shorter delays over the first 60% of the distance, longer afterwards.
        dt = random.uniform(0.005, 0.012) if prev_x < dist * 0.6 else random.uniform(0.008, 0.025)
        if random.random() < 0.03:
            dt += random.uniform(0.02, 0.06)
        track.append({"dx": sign * dx, "dy": dy, "dt": dt})

    # Final correction so the steps add up to exactly `distance`.
    actual = sum(s["dx"] for s in track)
    diff = distance - actual
    if diff != 0:
        track.append({"dx": diff, "dy": 0, "dt": random.uniform(0.01, 0.03)})
    return track


def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None:
    """Send one mouse event through CDP ``Input.dispatchMouseEvent``."""
    page.run_cdp(
        "Input.dispatchMouseEvent",
        type=event_type,
        x=x,
        y=y,
        button=button,
        clickCount=1 if event_type == "mousePressed" else 0,
    )


def _get_element_center(page, ele) -> tuple[int, int]:
    """Return the element's centre in viewport coordinates.

    Uses ``getBoundingClientRect`` in the page; if that does not yield a dict,
    falls back to the element's own rect information.
    """
    rect = page.run_js(_JS_ELEMENT_CENTER, ele)
    if rect and isinstance(rect, dict):
        return int(rect["x"]), int(rect["y"])
    # Fallback: viewport-relative midpoint, to stay consistent with the JS path
    # (`rect.midpoint` would be page-relative and wrong once the page scrolls).
    loc = ele.rect.viewport_midpoint
    return int(loc[0]), int(loc[1])


def drag_slider(page, slider_ele, distance: int) -> None:
    """Drag ``slider_ele`` horizontally by ``distance`` pixels using CDP mouse events.

    The pointer is moved to the element centre, pressed, moved along the track
    from ``build_human_track`` and released at the final position.
    """
    cx, cy = _get_element_center(page, slider_ele)

    # 1. Move to the slider centre.
    _dispatch_mouse(page, "mouseMoved", cx, cy)
    time.sleep(random.uniform(0.03, 0.08))

    # 2. Press.
    _dispatch_mouse(page, "mousePressed", cx, cy)
    time.sleep(random.uniform(0.02, 0.06))

    # 3. Follow the track.
    cur_x, cur_y = cx, cy
    for step in build_human_track(distance):
        cur_x += step["dx"]
        cur_y += step["dy"]
        _dispatch_mouse(page, "mouseMoved", cur_x, cur_y)
        time.sleep(step["dt"])

    # 4. Short pause at the end point.
    time.sleep(random.uniform(0.02, 0.06))

    # 5. Release.
    _dispatch_mouse(page, "mouseReleased", cur_x, cur_y)


def find_first(page, selectors: list[str], timeout: float = 5):
    """Return the first element found by any selector, or ``None``.

    Selectors are tried in order, each with its own ``timeout``; a selector
    whose lookup raises is skipped.
    """
    for sel in selectors:
        try:
            ele = page.ele(sel, timeout=timeout)
        except Exception:
            # Best effort: a failing selector must not stop the remaining ones.
            continue
        if ele:
            return ele
    return None


def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str:
    """Poll an img element until its ``src`` is a ``data:image`` URL with a payload.

    The attribute is read at least once, even when ``timeout`` is 0.

    Args:
        img_ele: element exposing ``attr("src")``.
        timeout: maximum time to wait, in seconds.
        interval: delay between polls, in seconds.

    Returns:
        The ``src`` value.

    Raises:
        RuntimeError: if no usable ``src`` shows up before the deadline.
    """
    # Monotonic clock: unaffected by wall-clock adjustments while waiting.
    deadline = time.monotonic() + timeout
    while True:
        src = img_ele.attr("src") or ""
        if src.startswith("data:image"):
            _prefix, _, b64 = src.partition(",")
            if b64.strip():
                return src
        if time.monotonic() >= deadline:
            break
        time.sleep(interval)
    raise RuntimeError(f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {src[:80]}")


def save_debug(debug_dir: Path, bg_bytes: bytes, piece_bytes: bytes) -> tuple[Path, Path]:
    """Write both captcha images into ``debug_dir`` and return their paths.

    The directory is created if needed; existing files are overwritten.
    """
    debug_dir.mkdir(parents=True, exist_ok=True)
    bg_path = debug_dir / "captcha_bg.png"
    piece_path = debug_dir / "captcha_piece.png"
    bg_path.write_bytes(bg_bytes)
    piece_path.write_bytes(piece_bytes)
    return bg_path, piece_path


def _click_order_button(page) -> bool:
    """Try to click the "立即订购" button; return whether any strategy clicked.

    Order of attempts: element selectors (up to four rounds), jQuery lookup,
    plain DOM scan, iframe scan. When all fail, a debug line and a warning are
    printed.
    """
    order_btn = None
    for _ in range(4):
        order_btn = find_first(page, _ORDER_BUTTON_SELECTORS, timeout=1)
        if order_btn:
            break
        time.sleep(0.25)

    if order_btn:
        try:
            order_btn.run_js("this.scrollIntoView({block:'center'})")
            time.sleep(0.05)
        except Exception:
            pass  # scrolling is optional; still attempt the click
        click_safe(order_btn)
        print("已点击立即订购")
        time.sleep(0.4)
        return True

    # Fallback 1: jQuery, if the page has it.
    try:
        jq_clicked = page.run_js(_JS_CLICK_ORDER_JQUERY)
    except Exception:
        jq_clicked = False
    if jq_clicked:
        print("已通过 jQuery 点击立即订购")
        time.sleep(0.4)
        return True

    # Fallback 2: plain DOM scan.
    if page.run_js(_JS_CLICK_ORDER_DOM):
        print("已通过 JS 点击立即订购")
        time.sleep(0.4)
        return True

    # Fallback 3: inside iframes.
    if page.run_js(_JS_CLICK_ORDER_IFRAME):
        print("已在 iframe 内点击立即订购")
        time.sleep(0.4)
        return True

    # Nothing worked: print which elements mention the label, then warn.
    try:
        info = page.run_js(_JS_DEBUG_ORDER_ELEMENTS)
        print(f"调试: 包含「立即订购」的元素(前5个): {info}")
    except Exception:
        pass  # debug output only
    print("警告: 未找到「立即订购」按钮,尝试继续...")
    return False


def run(args: argparse.Namespace) -> None:
    """Run the whole flow described in the module docstring.

    Args:
        args: parsed command-line options from ``build_parser``.

    Raises:
        RuntimeError: if the phone input, the send-code button, the captcha
            popup or one of its parts cannot be found, or the captcha image
            data is missing or implausibly small.
    """
    # Imported lazily so the pure helpers above work without DrissionPage.
    from DrissionPage import ChromiumOptions, ChromiumPage

    t_start = time.perf_counter()

    if args.port:
        co = ChromiumOptions().set_local_port(port=args.port)
        page = ChromiumPage(addr_or_opts=co)
    else:
        page = ChromiumPage()

    page.get(args.url)
    time.sleep(args.wait_page)

    # Landing page: tick the agreement checkbox (id=color-input-red) first,
    # then click "立即订购".
    agree_checkbox = find_first(page, [
        "css:#color-input-red",
        "css:input[name='color-input-red']",
        'x://input[@id="color-input-red"]',
        "css:input.right-box[type='checkbox']",
    ], timeout=5)
    if agree_checkbox:
        click_safe(agree_checkbox)
        print("已勾选协议复选框")
        time.sleep(0.4)  # wait after ticking

    # The flow continues even when the button was not found.
    _click_order_button(page)

    phone_input = find_first(page, [
        'x://input[@placeholder="请输入手机号码"]',
        "css:input.inp-txt",
    ], timeout=8)
    if not phone_input:
        raise RuntimeError("未找到手机号输入框")
    phone_input.input(args.phone, clear=True)
    print(f"已输入手机号: {args.phone}")

    if not args.skip_agree:
        agree = find_first(page, [
            "css:i.ico-checkbox",
            'x://i[contains(@class,"ico-checkbox")]',
        ], timeout=2)
        if agree:
            try:
                click_safe(agree)
                print("已点击同意勾选")
            except Exception:
                print("同意勾选点击失败,继续执行")

    send_btn = find_first(page, [
        "css:button.btn-code",
        'x://button[contains(text(),"获取验证码")]',
    ], timeout=8)
    if not send_btn:
        raise RuntimeError("未找到“获取验证码”按钮")
    click_safe(send_btn)
    print("已点击获取验证码,等待滑块弹窗")

    # Wait for the captcha popup.
    verify_box = find_first(page, [
        "css:.verifybox",
        "css:.verify-bar-area",
    ], timeout=6)
    if not verify_box:
        raise RuntimeError("未检测到滑块验证码弹窗")

    bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
    piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
    slider = find_first(page, ["css:.verify-move-block"], timeout=5)
    bar = find_first(page, ["css:.verify-bar-area"], timeout=5)  # presence check only
    if not bg_img or not piece_img or not slider or not bar:
        raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")

    bg_src = wait_for_data_src(bg_img, timeout=10)
    piece_src = wait_for_data_src(piece_img, timeout=10)
    bg_bytes = parse_data_url(bg_src)
    piece_bytes = parse_data_url(piece_src)
    if len(bg_bytes) < 100 or len(piece_bytes) < 100:
        raise RuntimeError(f"验证码图片数据异常: bg={len(bg_bytes)}B, piece={len(piece_bytes)}B")

    if args.debug_dir:
        bg_path, piece_path = save_debug(Path(args.debug_dir), bg_bytes, piece_bytes)
        print(f"已保存验证码图片: {bg_path} | {piece_path}")

    match = calc_drag_distance_from_bytes(bg_bytes, piece_bytes, alpha_threshold=args.alpha_threshold)

    # Map image pixels to on-screen pixels via the rendered width.
    bg_display_w = page.run_js(_JS_RENDERED_WIDTH, bg_img)
    if not bg_display_w or bg_display_w <= 0:
        bg_display_w = match["bg_width"]
    scale = float(bg_display_w) / max(1, match["bg_width"])
    move_distance = int(round(match["drag_distance"] * scale)) + int(args.distance_adjust)

    print(
        "滑块匹配结果: "
        f"bg_width={match['bg_width']}, bg_display_w={bg_display_w}, "
        f"target_x={match['target_x']}, drag_distance={match['drag_distance']}, "
        f"scale={scale:.4f}, move_distance={move_distance}, "
        f"confidence={match['confidence_ratio']:.4f}"
    )

    drag_slider(page, slider, move_distance)
    time.sleep(args.wait_result)

    elapsed = time.perf_counter() - t_start
    print(f"总耗时: {elapsed:.2f} 秒(程序开始 → 滑块滑动完成)")

    # Heuristic result check: is the `.mask` overlay still visible?
    still_visible = page.run_js(_JS_MASK_VISIBLE)
    if still_visible:
        print("拖动已执行,但验证码弹窗仍在,可能需要微调 --distance-adjust")
    else:
        print("滑块拖动完成,验证码弹窗已关闭(疑似验证通过)")


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser; ``--phone`` is the only required option."""
    root = Path(__file__).resolve().parent
    p = argparse.ArgumentParser(description="天翼订购页 1.html 自动滑块脚本")
    p.add_argument("--url", default="http://yscnb.com/tyyp/1.html", help="目标页面 URL")
    p.add_argument("--phone", required=True, help="手机号")
    p.add_argument("--port", type=int, default=0, help="连接已有浏览器端口(可选)")
    p.add_argument("--skip-agree", action="store_true", help="跳过勾选“同意办理”")
    p.add_argument("--alpha-threshold", type=int, default=12, help="拼图 alpha 透明阈值")
    p.add_argument("--distance-adjust", type=int, default=0, help="拖动距离微调像素")
    p.add_argument("--wait-page", type=float, default=0.3, help="打开页面后等待秒数")
    p.add_argument("--wait-result", type=float, default=0.5, help="拖动后等待结果秒数")
    p.add_argument("--debug-dir", default=str(root / "captcha_debug"), help="验证码图片输出目录")
    return p


def main() -> None:
    """Parse the command line and run the flow."""
    args = build_parser().parse_args()
    run(args)


if __name__ == "__main__":
    main()