This commit is contained in:
ddrwode
2026-02-27 13:37:07 +08:00
parent 8fccaea689
commit f600b1720e
2 changed files with 547 additions and 0 deletions

21
test.py Normal file
View File

@@ -0,0 +1,21 @@
from DrissionPage import ChromiumPage, ChromiumOptions
page = ChromiumPage()
page.get("https://flows.cdyylkj.com/miguMusicTL/")
page.listen.start("api/migu/P2973")
page.ele('x://*[@placeholder="请输入您的移动手机号"]').input("18981818763")
page.ele('x://*[@id="app"]/div[1]/div[4]/div/div').click(by_js=True)
page.ele('x://button[normalize-space(text()) ="办理"]').click(by_js=True)
res = page.listen.wait()
print(res.response.body)

526
test1.py Normal file
View File

@@ -0,0 +1,526 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
天翼云盘订购页 http://yscnb.com/tyyp/ 自动化脚本(全部代码自包含)
自动化流程:
1. 打开页面
2. 勾选协议
3. 点击「立即订购」
4. 输入手机号
5. 点击「获取验证码」
6. 解析拼图验证码并执行滑块拖动
7. 监听 hn_userEquitys/getYanZhenMa/v2 响应并输出
用法python test1.py [--phone 手机号]
调试时可在代码中修改 DEFAULT_PHONE
"""
from __future__ import annotations
import argparse
from typing import Any
import base64
import random
import time
from io import BytesIO
import numpy as np
from PIL import Image
from DrissionPage import ChromiumPage, ChromiumOptions
# ========== 调试用:在代码中直接指定手机号 ==========
DEFAULT_PHONE = "17375712810"
# 需要监听的 URL 特征(滑块通过后前端会请求此接口)
GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2"
# ---------- 拼图验证码计算 ----------
def _to_rgba_array(image_bytes: bytes) -> np.ndarray:
return np.array(Image.open(BytesIO(image_bytes)).convert("RGBA"), dtype=np.int16)
def _to_rgb_array(image_bytes: bytes) -> np.ndarray:
return np.array(Image.open(BytesIO(image_bytes)).convert("RGB"), dtype=np.int16)
def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]:
mask = alpha > threshold
if not mask.any():
raise ValueError("拼图块 alpha 全透明,无法匹配")
ys, xs = np.where(mask)
return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1
def calc_drag_distance_from_bytes(
bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12
) -> dict:
"""计算拼图目标位移(基于背景图 + 拼图块图)。"""
bg = _to_rgb_array(bg_bytes)
piece_rgba = _to_rgba_array(piece_bytes)
bh, bw = bg.shape[:2]
ph, _ = piece_rgba.shape[:2]
if bh != ph:
raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}")
alpha = piece_rgba[:, :, 3]
x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold)
piece_crop = piece_rgba[y0:y1, x0:x1, :3]
mask = alpha[y0:y1, x0:x1] > alpha_threshold
ys, xs = np.where(mask)
piece_pixels = piece_crop[ys, xs]
patch_h, patch_w = piece_crop.shape[:2]
if patch_w > bw or patch_h > bh:
raise ValueError("拼图块裁剪尺寸超过背景图")
best_x = 0
best_score = float("inf")
second_best = float("inf")
max_x = bw - patch_w
for x in range(max_x + 1):
patch_pixels = bg[y0 + ys, x + xs]
score = float(np.abs(patch_pixels - piece_pixels).mean())
if score < best_score:
second_best = best_score
best_score = score
best_x = x
elif score < second_best:
second_best = score
drag_distance = best_x - x0
confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf")
return {
"target_x": int(best_x),
"piece_bbox_x0": int(x0),
"piece_bbox_y0": int(y0),
"piece_bbox_w": int(patch_w),
"piece_bbox_h": int(patch_h),
"bg_width": int(bw),
"bg_height": int(bh),
"drag_distance": int(drag_distance),
"best_score": best_score,
"second_best": second_best,
"confidence_ratio": confidence_ratio,
}
def parse_data_url(data_url: str) -> bytes:
if not data_url.startswith("data:image"):
raise ValueError("图片不是 data:image URL")
_, data = data_url.split(",", 1)
return base64.b64decode(data)
# ---------- 元素查找与点击 ----------
def click_safe(ele) -> None:
try:
ele.click()
return
except Exception:
pass
ele.click(by_js=True)
def find_first(page, selectors: list[str], timeout: float = 5):
for sel in selectors:
try:
ele = page.ele(sel, timeout=timeout)
if ele:
return ele
except Exception:
continue
return None
def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str:
"""轮询等待 img 元素的 src 变为有效 data:image URL。"""
deadline = time.time() + timeout
while time.time() < deadline:
src = img_ele.attr("src") or ""
if src.startswith("data:image"):
_prefix, _, b64 = src.partition(",")
if b64.strip():
return src
time.sleep(interval)
raise RuntimeError(
f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {(img_ele.attr('src') or '')[:80]}"
)
# ---------- 滑块拖动(仿人轨迹) ----------
def _ease_out_quad(t: float) -> float:
return t * (2 - t)
def _ease_out_cubic(t: float) -> float:
return 1 - (1 - t) ** 3
def _ease_out_bounce(t: float) -> float:
if t < 1 / 2.75:
return 7.5625 * t * t
elif t < 2 / 2.75:
t -= 1.5 / 2.75
return 7.5625 * t * t + 0.75
elif t < 2.5 / 2.75:
t -= 2.25 / 2.75
return 7.5625 * t * t + 0.9375
else:
t -= 2.625 / 2.75
return 7.5625 * t * t + 0.984375
def build_human_track(distance: int, num_steps: int = 0) -> list[dict]:
"""生成仿人轨迹列表:加速-匀速-减速-过冲-回弹。"""
if distance == 0:
return []
dist = abs(distance)
sign = 1 if distance > 0 else -1
if num_steps <= 0:
num_steps = max(12, int(dist * random.uniform(0.25, 0.4)))
overshoot = random.randint(max(2, int(dist * 0.03)), max(3, int(dist * 0.08)))
total = dist + overshoot
easing = random.choice([_ease_out_quad, _ease_out_cubic])
raw_positions: list[float] = []
for i in range(1, num_steps + 1):
t = i / num_steps
raw_positions.append(easing(t) * total)
bounce_steps = random.randint(2, 4)
for j in range(1, bounce_steps + 1):
t = j / bounce_steps
raw_positions.append(total - _ease_out_bounce(t) * overshoot)
track: list[dict] = []
prev_x = 0.0
for pos in raw_positions:
dx = round(pos - prev_x)
if dx == 0 and random.random() < 0.3:
continue
prev_x += dx
dy = random.choice([-1, 0, 0, 0, 1])
dt = (
random.uniform(0.005, 0.012)
if prev_x < dist * 0.6
else random.uniform(0.008, 0.025)
)
if random.random() < 0.03:
dt += random.uniform(0.02, 0.06)
track.append({"dx": sign * dx, "dy": dy, "dt": dt})
actual = sum(s["dx"] for s in track)
diff = distance - actual
if diff != 0:
track.append({"dx": diff, "dy": 0, "dt": random.uniform(0.01, 0.03)})
return track
def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None:
page.run_cdp(
"Input.dispatchMouseEvent",
type=event_type,
x=x,
y=y,
button=button,
clickCount=1 if event_type == "mousePressed" else 0,
)
def _get_element_center(page, ele) -> tuple[int, int]:
rect = page.run_js(
"""const r = arguments[0].getBoundingClientRect();
return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}""",
ele,
)
if rect and isinstance(rect, dict):
return int(rect["x"]), int(rect["y"])
loc = ele.rect.midpoint
return int(loc[0]), int(loc[1])
def drag_slider(page, slider_ele, distance: int) -> None:
"""用 CDP 级鼠标事件完成拖拽,模拟真人操作。"""
cx, cy = _get_element_center(page, slider_ele)
_dispatch_mouse(page, "mouseMoved", cx, cy)
time.sleep(random.uniform(0.03, 0.08))
_dispatch_mouse(page, "mousePressed", cx, cy)
time.sleep(random.uniform(0.02, 0.06))
cur_x, cur_y = cx, cy
track = build_human_track(distance)
for step in track:
cur_x += step["dx"]
cur_y += step["dy"]
_dispatch_mouse(page, "mouseMoved", cur_x, cur_y)
time.sleep(step["dt"])
time.sleep(random.uniform(0.02, 0.06))
_dispatch_mouse(page, "mouseReleased", cur_x, cur_y)
# ---------- 核心自动化流程 ----------
def submit_phone(
page,
phone: str,
url: str = "http://yscnb.com/tyyp/",
alpha_threshold: int = 12,
distance_adjust: int = 0,
wait_page: float = 0.3,
) -> Any:
"""
填写手机号、点击获取验证码、执行滑块,返回 getYanZhenMa/v2 接口响应体。
"""
page.get(url)
time.sleep(wait_page)
# 1. 勾选协议
agree_checkbox = find_first(
page,
[
"css:#color-input-red",
"css:input[name='color-input-red']",
'x://input[@id="color-input-red"]',
"css:input.right-box[type='checkbox']",
],
timeout=5,
)
if agree_checkbox:
click_safe(agree_checkbox)
time.sleep(0.4)
# 2. 立即订购
order_btn = None
for attempt in range(4):
order_btn = find_first(
page,
[
"css:div.paybg",
"css:.paybg",
'x://button[contains(.,"立即订购")]',
'x://a[contains(.,"立即订购")]',
'x://span[contains(.,"立即订购")]',
'x://div[contains(.,"立即订购")]',
'x://*[contains(text(),"立即订购")]',
'x://*[contains(.,"立即订购")]',
"css:.btn-order",
"css:button.btn-primary",
"css:button.btn",
"css:a.btn",
],
timeout=1,
)
if order_btn:
break
time.sleep(0.25)
if order_btn:
try:
order_btn.run_js("this.scrollIntoView({block:'center'})")
time.sleep(0.05)
except Exception:
pass
click_safe(order_btn)
time.sleep(0.4)
else:
try:
page.run_js("""
var nodes = document.querySelectorAll('button, a, span, div');
for (var i = 0; i < nodes.length; i++) {
var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
if (t.indexOf('立即订购') >= 0) {
nodes[i].scrollIntoView({block: 'center'});
nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
return true;
}
}
return false;
""")
except Exception:
pass
time.sleep(0.4)
# 3. 输入手机号
phone_input = find_first(
page,
[
'x://*[@placeholder="请输入手机号码"]',
"css:input.inp-txt",
],
timeout=8,
)
if not phone_input:
raise RuntimeError("未找到手机号输入框")
phone_input.input(phone, clear=True)
agree = find_first(
page,
[
"css:i.ico-checkbox",
'x://i[contains(@class,"ico-checkbox")]',
],
timeout=2,
)
if agree:
try:
click_safe(agree)
except Exception:
pass
# 4. 启动监听(必须在点击获取验证码之前)
page.listen.start(GET_YAN_ZHEN_MA_URL)
send_btn = find_first(
page,
[
"css:button.btn-code",
'x://button[contains(text(),"获取验证码")]',
],
timeout=8,
)
if not send_btn:
raise RuntimeError("未找到「获取验证码」按钮")
click_safe(send_btn)
# 5. 等待滑块弹窗
verify_box = find_first(
page,
["css:.verifybox", "css:.verify-bar-area"],
timeout=6,
)
if not verify_box:
raise RuntimeError("未检测到滑块验证码弹窗")
bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
slider = find_first(page, ["css:.verify-move-block"], timeout=5)
if not bg_img or not piece_img or not slider:
raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")
bg_src = wait_for_data_src(bg_img, timeout=10)
piece_src = wait_for_data_src(piece_img, timeout=10)
bg_bytes = parse_data_url(bg_src)
piece_bytes = parse_data_url(piece_src)
if len(bg_bytes) < 100 or len(piece_bytes) < 100:
raise RuntimeError("验证码图片数据异常")
match = calc_drag_distance_from_bytes(
bg_bytes, piece_bytes, alpha_threshold=alpha_threshold
)
bg_display_w = page.run_js(
"const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;",
bg_img,
)
if not bg_display_w or bg_display_w <= 0:
bg_display_w = match["bg_width"]
scale = float(bg_display_w) / max(1, match["bg_width"])
move_distance = int(round(match["drag_distance"] * scale)) + int(distance_adjust)
# 6. 执行滑块并等待 getYanZhenMa 响应
drag_slider(page, slider, move_distance)
time.sleep(0.5)
packet = page.listen.wait(timeout=15, fit_count=False)
if packet is False:
raise RuntimeError(f"滑块拖动后 15s 内未收到 {GET_YAN_ZHEN_MA_URL} 响应")
if isinstance(packet, list):
packet = packet[0] if packet else None
if packet is None:
raise RuntimeError(f"未捕获到 {GET_YAN_ZHEN_MA_URL} 数据包")
return packet.response.body
def submit_code(page, code: str) -> dict:
"""填写短信验证码,点击确认订购按钮。"""
code_input = find_first(
page,
[
'x://input[@placeholder*="验证码"]',
'x://input[@placeholder*="短信"]',
"css:input.inp-txt[type='text']",
"css:input[type='tel']",
"css:.code-input",
"css:input.verify-input",
],
timeout=8,
)
if not code_input:
raise RuntimeError("未找到验证码输入框")
code_input.input(code, clear=True)
time.sleep(0.2)
confirm_btn = find_first(
page,
[
"css:img.btn-buy",
"css:.btn-buy",
'x://img[contains(@class,"btn-buy")]',
'x://*[contains(@class,"btn-buy")]',
],
timeout=5,
)
if not confirm_btn:
raise RuntimeError("未找到确认订购按钮img.btn-buy")
try:
confirm_btn.run_js("this.scrollIntoView({block:'center'})")
time.sleep(0.05)
except Exception:
pass
click_safe(confirm_btn)
return {"success": True, "message": "已点击确认订购"}
# ---------- 入口 ----------
def main() -> None:
parser = argparse.ArgumentParser(description="天翼云盘订购页自动化")
parser.add_argument(
"--phone", default=DEFAULT_PHONE, help="手机号码(默认用代码中的 DEFAULT_PHONE"
)
parser.add_argument("--url", default="http://yscnb.com/tyyp/", help="目标页面 URL")
parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口0 表示新建")
args = parser.parse_args()
if args.port:
co = ChromiumOptions().set_local_port(port=args.port)
page = ChromiumPage(addr_or_opts=co)
else:
page = ChromiumPage()
print(f"打开页面: {args.url},手机号: {args.phone}")
body = submit_phone(
page=page,
phone=args.phone,
url=args.url,
)
print("hn_userEquitys/getYanZhenMa/v2 响应:")
print(body)
if __name__ == "__main__":
main()