From f600b1720e6d576f4744f205652ea3aa3030af63 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Fri, 27 Feb 2026 13:37:07 +0800 Subject: [PATCH] haha --- test.py | 21 +++ test1.py | 526 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 547 insertions(+) create mode 100644 test.py create mode 100644 test1.py diff --git a/test.py b/test.py new file mode 100644 index 0000000..d705741 --- /dev/null +++ b/test.py @@ -0,0 +1,21 @@ +from DrissionPage import ChromiumPage, ChromiumOptions + +page = ChromiumPage() + +page.get("https://flows.cdyylkj.com/miguMusicTL/") + +page.listen.start("api/migu/P2973") +page.ele('x://*[@placeholder="请输入您的移动手机号"]').input("18981818763") +page.ele('x://*[@id="app"]/div[1]/div[4]/div/div').click(by_js=True) +page.ele('x://button[normalize-space(text()) ="办理"]').click(by_js=True) + +res = page.listen.wait() +print(res.response.body) + + + + + + + + diff --git a/test1.py b/test1.py new file mode 100644 index 0000000..008c80b --- /dev/null +++ b/test1.py @@ -0,0 +1,526 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +天翼云盘订购页 http://yscnb.com/tyyp/ 自动化脚本(全部代码自包含) + +自动化流程: +1. 打开页面 +2. 勾选协议 +3. 点击「立即订购」 +4. 输入手机号 +5. 点击「获取验证码」 +6. 解析拼图验证码并执行滑块拖动 +7. 监听 hn_userEquitys/getYanZhenMa/v2 响应并输出 + +用法:python test1.py [--phone 手机号] + 调试时可在代码中修改 DEFAULT_PHONE +""" +from __future__ import annotations + +import argparse +from typing import Any +import base64 +import random +import time +from io import BytesIO + +import numpy as np +from PIL import Image + +from DrissionPage import ChromiumPage, ChromiumOptions + +# ========== 调试用:在代码中直接指定手机号 ========== +DEFAULT_PHONE = "17375712810" + +# 需要监听的 URL 特征(滑块通过后前端会请求此接口) +GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2" + + +# ---------- 拼图验证码计算 ---------- + +def _to_rgba_array(image_bytes: bytes) -> np.ndarray: + return np.array(Image.open(BytesIO(image_bytes)).convert("RGBA"), dtype=np.int16) + + +def _to_rgb_array(image_bytes: bytes) -> np.ndarray: + return np.array(Image.open(BytesIO(image_bytes)).convert("RGB"), dtype=np.int16) + + +def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]: + mask = alpha > threshold + if not mask.any(): + raise ValueError("拼图块 alpha 全透明,无法匹配") + ys, xs = np.where(mask) + return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1 + + +def calc_drag_distance_from_bytes( + bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12 +) -> dict: + """计算拼图目标位移(基于背景图 + 拼图块图)。""" + bg = _to_rgb_array(bg_bytes) + piece_rgba = _to_rgba_array(piece_bytes) + + bh, bw = bg.shape[:2] + ph, _ = piece_rgba.shape[:2] + if bh != ph: + raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}") + + alpha = piece_rgba[:, :, 3] + x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold) + + piece_crop = piece_rgba[y0:y1, x0:x1, :3] + mask = alpha[y0:y1, x0:x1] > alpha_threshold + ys, xs = np.where(mask) + piece_pixels = piece_crop[ys, xs] + + patch_h, patch_w = piece_crop.shape[:2] + if patch_w > bw or patch_h > bh: + raise ValueError("拼图块裁剪尺寸超过背景图") + + best_x = 0 + best_score = float("inf") + second_best = float("inf") + max_x = bw - patch_w + + for x in range(max_x + 1): + patch_pixels = bg[y0 + ys, x + xs] + score = float(np.abs(patch_pixels - piece_pixels).mean()) + if score < best_score: + second_best = best_score + best_score = score + best_x = x + elif score < second_best: + second_best = score + + drag_distance = best_x - x0 + confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf") + + return { + "target_x": int(best_x), + "piece_bbox_x0": int(x0), + "piece_bbox_y0": int(y0), + "piece_bbox_w": int(patch_w), + "piece_bbox_h": int(patch_h), + "bg_width": int(bw), + "bg_height": int(bh), + "drag_distance": int(drag_distance), + "best_score": best_score, + "second_best": second_best, + "confidence_ratio": confidence_ratio, + } + + +def parse_data_url(data_url: str) -> bytes: + if not data_url.startswith("data:image"): + raise ValueError("图片不是 data:image URL") + _, data = data_url.split(",", 1) + return base64.b64decode(data) + + +# ---------- 元素查找与点击 ---------- + +def click_safe(ele) -> None: + try: + ele.click() + return + except Exception: + pass + ele.click(by_js=True) + + +def find_first(page, selectors: list[str], timeout: float = 5): + for sel in selectors: + try: + ele = page.ele(sel, timeout=timeout) + if ele: + return ele + except Exception: + continue + return None + + +def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str: + """轮询等待 img 元素的 src 变为有效 data:image URL。""" + deadline = time.time() + timeout + while time.time() < deadline: + src = img_ele.attr("src") or "" + if src.startswith("data:image"): + _prefix, _, b64 = src.partition(",") + if b64.strip(): + return src + time.sleep(interval) + raise RuntimeError( + f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {(img_ele.attr('src') or '')[:80]}" + ) + + +# ---------- 滑块拖动(仿人轨迹) ---------- + +def _ease_out_quad(t: float) -> float: + return t * (2 - t) + + +def _ease_out_cubic(t: float) -> float: + return 1 - (1 - t) ** 3 + + +def _ease_out_bounce(t: float) -> float: + if t < 1 / 2.75: + return 7.5625 * t * t + elif t < 2 / 2.75: + t -= 1.5 / 2.75 + return 7.5625 * t * t + 0.75 + elif t < 2.5 / 2.75: + t -= 2.25 / 2.75 + return 7.5625 * t * t + 0.9375 + else: + t -= 2.625 / 2.75 + return 7.5625 * t * t + 0.984375 + + +def build_human_track(distance: int, num_steps: int = 0) -> list[dict]: + """生成仿人轨迹列表:加速-匀速-减速-过冲-回弹。""" + if distance == 0: + return [] + + dist = abs(distance) + sign = 1 if distance > 0 else -1 + + if num_steps <= 0: + num_steps = max(12, int(dist * random.uniform(0.25, 0.4))) + + overshoot = random.randint(max(2, int(dist * 0.03)), max(3, int(dist * 0.08))) + total = dist + overshoot + + easing = random.choice([_ease_out_quad, _ease_out_cubic]) + + raw_positions: list[float] = [] + for i in range(1, num_steps + 1): + t = i / num_steps + raw_positions.append(easing(t) * total) + + bounce_steps = random.randint(2, 4) + for j in range(1, bounce_steps + 1): + t = j / bounce_steps + raw_positions.append(total - _ease_out_bounce(t) * overshoot) + + track: list[dict] = [] + prev_x = 0.0 + for pos in raw_positions: + dx = round(pos - prev_x) + if dx == 0 and random.random() < 0.3: + continue + prev_x += dx + dy = random.choice([-1, 0, 0, 0, 1]) + dt = ( + random.uniform(0.005, 0.012) + if prev_x < dist * 0.6 + else random.uniform(0.008, 0.025) + ) + if random.random() < 0.03: + dt += random.uniform(0.02, 0.06) + track.append({"dx": sign * dx, "dy": dy, "dt": dt}) + + actual = sum(s["dx"] for s in track) + diff = distance - actual + if diff != 0: + track.append({"dx": diff, "dy": 0, "dt": random.uniform(0.01, 0.03)}) + + return track + + +def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None: + page.run_cdp( + "Input.dispatchMouseEvent", + type=event_type, + x=x, + y=y, + button=button, + clickCount=1 if event_type == "mousePressed" else 0, + ) + + +def _get_element_center(page, ele) -> tuple[int, int]: + rect = page.run_js( + """const r = arguments[0].getBoundingClientRect(); + return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}""", + ele, + ) + if rect and isinstance(rect, dict): + return int(rect["x"]), int(rect["y"]) + loc = ele.rect.midpoint + return int(loc[0]), int(loc[1]) + + +def drag_slider(page, slider_ele, distance: int) -> None: + """用 CDP 级鼠标事件完成拖拽,模拟真人操作。""" + cx, cy = _get_element_center(page, slider_ele) + + _dispatch_mouse(page, "mouseMoved", cx, cy) + time.sleep(random.uniform(0.03, 0.08)) + + _dispatch_mouse(page, "mousePressed", cx, cy) + time.sleep(random.uniform(0.02, 0.06)) + + cur_x, cur_y = cx, cy + track = build_human_track(distance) + for step in track: + cur_x += step["dx"] + cur_y += step["dy"] + _dispatch_mouse(page, "mouseMoved", cur_x, cur_y) + time.sleep(step["dt"]) + + time.sleep(random.uniform(0.02, 0.06)) + _dispatch_mouse(page, "mouseReleased", cur_x, cur_y) + + +# ---------- 核心自动化流程 ---------- + +def submit_phone( + page, + phone: str, + url: str = "http://yscnb.com/tyyp/", + alpha_threshold: int = 12, + distance_adjust: int = 0, + wait_page: float = 0.3, +) -> Any: + """ + 填写手机号、点击获取验证码、执行滑块,返回 getYanZhenMa/v2 接口响应体。 + """ + page.get(url) + time.sleep(wait_page) + + # 1. 勾选协议 + agree_checkbox = find_first( + page, + [ + "css:#color-input-red", + "css:input[name='color-input-red']", + 'x://input[@id="color-input-red"]', + "css:input.right-box[type='checkbox']", + ], + timeout=5, + ) + if agree_checkbox: + click_safe(agree_checkbox) + time.sleep(0.4) + + # 2. 立即订购 + order_btn = None + for attempt in range(4): + order_btn = find_first( + page, + [ + "css:div.paybg", + "css:.paybg", + 'x://button[contains(.,"立即订购")]', + 'x://a[contains(.,"立即订购")]', + 'x://span[contains(.,"立即订购")]', + 'x://div[contains(.,"立即订购")]', + 'x://*[contains(text(),"立即订购")]', + 'x://*[contains(.,"立即订购")]', + "css:.btn-order", + "css:button.btn-primary", + "css:button.btn", + "css:a.btn", + ], + timeout=1, + ) + if order_btn: + break + time.sleep(0.25) + + if order_btn: + try: + order_btn.run_js("this.scrollIntoView({block:'center'})") + time.sleep(0.05) + except Exception: + pass + click_safe(order_btn) + time.sleep(0.4) + else: + try: + page.run_js(""" + var nodes = document.querySelectorAll('button, a, span, div'); + for (var i = 0; i < nodes.length; i++) { + var t = (nodes[i].innerText || nodes[i].textContent || '').trim(); + if (t.indexOf('立即订购') >= 0) { + nodes[i].scrollIntoView({block: 'center'}); + nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window})); + return true; + } + } + return false; + """) + except Exception: + pass + time.sleep(0.4) + + # 3. 输入手机号 + phone_input = find_first( + page, + [ + 'x://*[@placeholder="请输入手机号码"]', + "css:input.inp-txt", + ], + timeout=8, + ) + if not phone_input: + raise RuntimeError("未找到手机号输入框") + + phone_input.input(phone, clear=True) + + agree = find_first( + page, + [ + "css:i.ico-checkbox", + 'x://i[contains(@class,"ico-checkbox")]', + ], + timeout=2, + ) + if agree: + try: + click_safe(agree) + except Exception: + pass + + # 4. 启动监听(必须在点击获取验证码之前) + page.listen.start(GET_YAN_ZHEN_MA_URL) + + send_btn = find_first( + page, + [ + "css:button.btn-code", + 'x://button[contains(text(),"获取验证码")]', + ], + timeout=8, + ) + if not send_btn: + raise RuntimeError("未找到「获取验证码」按钮") + + click_safe(send_btn) + + # 5. 等待滑块弹窗 + verify_box = find_first( + page, + ["css:.verifybox", "css:.verify-bar-area"], + timeout=6, + ) + if not verify_box: + raise RuntimeError("未检测到滑块验证码弹窗") + + bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5) + piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5) + slider = find_first(page, ["css:.verify-move-block"], timeout=5) + + if not bg_img or not piece_img or not slider: + raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)") + + bg_src = wait_for_data_src(bg_img, timeout=10) + piece_src = wait_for_data_src(piece_img, timeout=10) + bg_bytes = parse_data_url(bg_src) + piece_bytes = parse_data_url(piece_src) + + if len(bg_bytes) < 100 or len(piece_bytes) < 100: + raise RuntimeError("验证码图片数据异常") + + match = calc_drag_distance_from_bytes( + bg_bytes, piece_bytes, alpha_threshold=alpha_threshold + ) + bg_display_w = page.run_js( + "const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;", + bg_img, + ) + if not bg_display_w or bg_display_w <= 0: + bg_display_w = match["bg_width"] + scale = float(bg_display_w) / max(1, match["bg_width"]) + move_distance = int(round(match["drag_distance"] * scale)) + int(distance_adjust) + + # 6. 执行滑块并等待 getYanZhenMa 响应 + drag_slider(page, slider, move_distance) + time.sleep(0.5) + + packet = page.listen.wait(timeout=15, fit_count=False) + if packet is False: + raise RuntimeError(f"滑块拖动后 15s 内未收到 {GET_YAN_ZHEN_MA_URL} 响应") + if isinstance(packet, list): + packet = packet[0] if packet else None + if packet is None: + raise RuntimeError(f"未捕获到 {GET_YAN_ZHEN_MA_URL} 数据包") + + return packet.response.body + + +def submit_code(page, code: str) -> dict: + """填写短信验证码,点击确认订购按钮。""" + code_input = find_first( + page, + [ + 'x://input[@placeholder*="验证码"]', + 'x://input[@placeholder*="短信"]', + "css:input.inp-txt[type='text']", + "css:input[type='tel']", + "css:.code-input", + "css:input.verify-input", + ], + timeout=8, + ) + if not code_input: + raise RuntimeError("未找到验证码输入框") + + code_input.input(code, clear=True) + time.sleep(0.2) + + confirm_btn = find_first( + page, + [ + "css:img.btn-buy", + "css:.btn-buy", + 'x://img[contains(@class,"btn-buy")]', + 'x://*[contains(@class,"btn-buy")]', + ], + timeout=5, + ) + if not confirm_btn: + raise RuntimeError("未找到确认订购按钮(img.btn-buy)") + + try: + confirm_btn.run_js("this.scrollIntoView({block:'center'})") + time.sleep(0.05) + except Exception: + pass + + click_safe(confirm_btn) + return {"success": True, "message": "已点击确认订购"} + + +# ---------- 入口 ---------- + +def main() -> None: + parser = argparse.ArgumentParser(description="天翼云盘订购页自动化") + parser.add_argument( + "--phone", default=DEFAULT_PHONE, help="手机号码(默认用代码中的 DEFAULT_PHONE)" + ) + parser.add_argument("--url", default="http://yscnb.com/tyyp/", help="目标页面 URL") + parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口,0 表示新建") + args = parser.parse_args() + + if args.port: + co = ChromiumOptions().set_local_port(port=args.port) + page = ChromiumPage(addr_or_opts=co) + else: + page = ChromiumPage() + + print(f"打开页面: {args.url},手机号: {args.phone}") + body = submit_phone( + page=page, + phone=args.phone, + url=args.url, + ) + print("hn_userEquitys/getYanZhenMa/v2 响应:") + print(body) + + +if __name__ == "__main__": + main()