1066 lines
34 KiB
Python
1066 lines
34 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
天翼云盘订购页 http://yscnb.com/tyyp/ 自动化脚本(全部代码自包含)
|
||
|
||
自动化流程:
|
||
1. 打开页面
|
||
2. 勾选协议
|
||
3. 点击「立即订购」
|
||
4. 输入手机号
|
||
5. 点击「获取验证码」
|
||
6. 解析拼图验证码并执行滑块拖动
|
||
7. 监听 hn_userEquitys/getYanZhenMa/v2 响应并输出
|
||
|
||
用法:python test1.py [--phone 手机号]
|
||
调试时可在代码中修改 DEFAULT_PHONE
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
from typing import Any, Optional
|
||
import base64
|
||
import random
|
||
import time
|
||
from io import BytesIO
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
from PIL import Image, ImageDraw
|
||
|
||
from DrissionPage import ChromiumPage, ChromiumOptions
|
||
|
||
# ========== Debug aid: set the phone number directly in code ==========
DEFAULT_PHONE = "17375712810"

# URL fragment to listen for (the front end requests this endpoint once the slider passes)
GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2"
|
||
|
||
|
||
# ---------- 拼图验证码计算 ----------
|
||
|
||
def _to_rgba_array(image_bytes: bytes) -> np.ndarray:
    """Decode encoded image bytes and return the RGBA pixels as an int16 array."""
    rgba_image = Image.open(BytesIO(image_bytes)).convert("RGBA")
    return np.array(rgba_image, dtype=np.int16)
|
||
|
||
|
||
def _to_rgb_array(image_bytes: bytes) -> np.ndarray:
    """Decode encoded image bytes and return the RGB pixels as an int16 array."""
    rgb_image = Image.open(BytesIO(image_bytes)).convert("RGB")
    return np.array(rgb_image, dtype=np.int16)
|
||
|
||
|
||
def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]:
    """Return the bounding box of all pixels whose alpha exceeds ``threshold``.

    The result is ``(x0, y0, x1, y1)`` where the right and bottom edges are
    exclusive (max index + 1), so it can be used directly for slicing.

    Raises:
        ValueError: if no pixel is above the threshold.
    """
    rows, cols = np.nonzero(alpha > threshold)
    if rows.size == 0:
        raise ValueError("拼图块 alpha 全透明,无法匹配")
    left, right = int(cols.min()), int(cols.max()) + 1
    top, bottom = int(rows.min()), int(rows.max()) + 1
    return left, top, right, bottom
|
||
|
||
|
||
def _to_gray(rgb: np.ndarray) -> np.ndarray:
    """Collapse the first three channels of ``rgb`` into a float32 luma image.

    Uses the weights 0.299 (R), 0.587 (G) and 0.114 (B).
    """
    red = rgb[:, :, 0]
    green = rgb[:, :, 1]
    blue = rgb[:, :, 2]
    luma = red * 0.299 + green * 0.587 + blue * 0.114
    return luma.astype(np.float32)
|
||
|
||
|
||
def _grad_x(gray: np.ndarray) -> np.ndarray:
    """Return the absolute horizontal central difference of a 2-D image.

    The image is padded by one edge-replicated column on each side, so the
    output has the same shape as ``gray``.
    """
    padded = np.pad(gray, ((0, 0), (1, 1)), mode="edge")
    right_neighbour = padded[:, 2:]
    left_neighbour = padded[:, :-2]
    return np.abs(right_neighbour - left_neighbour) * 0.5
|
||
|
||
|
||
def calc_drag_distance_from_bytes(
    bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12
) -> dict:
    """Locate the jigsaw gap in the background and derive drag distances.

    The opaque part of the piece (alpha above ``alpha_threshold``) is slid
    across the background at the piece's own vertical position. Every x
    offset gets an alpha-weighted score mixing grayscale difference (0.72)
    and horizontal-gradient difference (0.28); lower is better.

    Args:
        bg_bytes: encoded background image.
        piece_bytes: encoded puzzle-piece image with an alpha channel.
        alpha_threshold: alpha value above which a piece pixel counts as opaque.

    Returns:
        A dict with the best target x, the piece bounding box, the image
        size, the drag distance (target x minus the piece's own left edge),
        the scores, and up to five candidate positions that are more than
        one pixel apart from each other.

    Raises:
        ValueError: if the two images differ in height, the piece has too
            few opaque pixels, or the cropped piece is larger than the
            background.
    """
    background = _to_rgb_array(bg_bytes).astype(np.float32)
    piece = _to_rgba_array(piece_bytes).astype(np.float32)

    bg_h, bg_w = background.shape[:2]
    piece_h = piece.shape[0]
    if bg_h != piece_h:
        raise ValueError(f"背景与拼图块高度不一致: {bg_h} != {piece_h}")

    alpha = piece[:, :, 3]
    x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold)

    piece_crop = piece[y0:y1, x0:x1, :3]
    alpha_crop = alpha[y0:y1, x0:x1]
    # Coordinates (relative to the crop) of the opaque piece pixels.
    ys, xs = np.where(alpha_crop > alpha_threshold)
    if len(xs) < 10:
        raise ValueError("拼图有效像素过少,无法稳定匹配")

    piece_gray = _to_gray(piece_crop)
    bg_gray = _to_gray(background)
    piece_grad = _grad_x(piece_gray)
    bg_grad = _grad_x(bg_gray)

    piece_gray_px = piece_gray[ys, xs]
    piece_grad_px = piece_grad[ys, xs]
    # Per-pixel weights proportional to alpha, normalised to sum to 1.
    weights = np.clip(alpha_crop[ys, xs].astype(np.float32), 1.0, 255.0)
    weights = weights / float(weights.sum())

    patch_h, patch_w = piece_crop.shape[:2]
    if patch_w > bg_w or patch_h > bg_h:
        raise ValueError("拼图块裁剪尺寸超过背景图")

    rows = y0 + ys

    def _score_at(x: int) -> tuple[float, int, float, float]:
        cols = x + xs
        color = float((np.abs(bg_gray[rows, cols] - piece_gray_px) * weights).sum())
        grad = float((np.abs(bg_grad[rows, cols] - piece_grad_px) * weights).sum())
        # Colour + edge blend; steadier than a plain RGB difference under
        # compression noise.
        return color * 0.72 + grad * 0.28, x, color, grad

    ranked = sorted(
        (_score_at(x) for x in range(bg_w - patch_w + 1)),
        key=lambda entry: entry[0],
    )
    best_score, best_x, best_color_score, best_grad_score = ranked[0]
    second_best = ranked[1][0] if len(ranked) > 1 else float("inf")

    drag_distance = best_x - x0
    confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf")

    # Keep the best-scoring positions, skipping any within one pixel of a
    # position already chosen.
    candidate_xs: list[int] = []
    for _score, x, _color, _grad in ranked:
        if all(abs(x - chosen) > 1 for chosen in candidate_xs):
            candidate_xs.append(int(x))
            if len(candidate_xs) >= 5:
                break

    candidate_distances = [int(x - x0) for x in candidate_xs]

    return {
        "target_x": int(best_x),
        "piece_bbox_x0": int(x0),
        "piece_bbox_y0": int(y0),
        "piece_bbox_w": int(patch_w),
        "piece_bbox_h": int(patch_h),
        "bg_width": int(bg_w),
        "bg_height": int(bg_h),
        "drag_distance": int(drag_distance),
        "best_score": best_score,
        "best_color_score": best_color_score,
        "best_grad_score": best_grad_score,
        "second_best": second_best,
        "confidence_ratio": confidence_ratio,
        "candidate_target_xs": candidate_xs,
        "candidate_drag_distances": candidate_distances,
    }
|
||
|
||
|
||
def parse_data_url(data_url: str) -> bytes:
    """Decode the base64 payload of a ``data:image...`` URL into raw bytes.

    Args:
        data_url: a string of the form ``data:image/<type>;base64,<payload>``.

    Returns:
        The decoded payload (everything after the first comma).

    Raises:
        ValueError: if the string does not start with ``data:image``, has no
            comma separating header and payload, or the payload is not valid
            base64.
    """
    if not data_url.startswith("data:image"):
        raise ValueError("图片不是 data:image URL")
    _header, separator, payload = data_url.partition(",")
    if not separator:
        # Without this check the caller only saw a cryptic tuple-unpacking error.
        raise ValueError("data:image URL 缺少逗号分隔的数据部分")
    return base64.b64decode(payload)
|
||
|
||
|
||
# ---------- 元素查找与点击 ----------
|
||
|
||
def click_safe(ele) -> None:
    """Click ``ele`` normally; if that raises, click it again with ``by_js=True``.

    An exception raised by the JavaScript click itself is not caught.
    """
    native_click_ok = True
    try:
        ele.click()
    except Exception:
        native_click_ok = False
    if not native_click_ok:
        ele.click(by_js=True)
|
||
|
||
|
||
def find_first(page, selectors: list[str], timeout: float = 5):
    """Return the first truthy element found by trying ``selectors`` in order.

    Each selector is looked up with ``page.ele(selector, timeout=timeout)``,
    so the timeout applies per selector. A selector whose lookup raises is
    skipped. Returns ``None`` when nothing matches.
    """
    for selector in selectors:
        try:
            found = page.ele(selector, timeout=timeout)
            matched = bool(found)
        except Exception:
            matched = False
        if matched:
            return found
    return None
|
||
|
||
|
||
def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str:
    """Poll an img element until its ``src`` is a non-empty ``data:image`` URL.

    Args:
        img_ele: element exposing ``attr("src")``.
        timeout: maximum number of seconds to keep polling.
        interval: sleep between polls, in seconds.

    Returns:
        The full ``src`` string once it starts with ``data:image`` and has a
        non-blank part after the first comma.

    Raises:
        RuntimeError: if no such ``src`` appears before the deadline.
    """
    stop_at = time.time() + timeout
    while True:
        if time.time() >= stop_at:
            break
        current = img_ele.attr("src") or ""
        if current.startswith("data:image"):
            payload = current.partition(",")[2]
            if payload.strip():
                return current
        time.sleep(interval)

    leftover = (img_ele.attr('src') or '')[:80]
    raise RuntimeError(
        f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {leftover}"
    )
|
||
|
||
|
||
def _is_element_visible(page, selector: str) -> bool:
    """Report whether the first element matching ``selector`` is visible.

    The in-page script treats an element as visible when it exists, its
    computed style is not ``display: none`` / ``visibility: hidden`` /
    ``opacity: 0``, and its bounding rectangle has non-zero width and height.
    Any exception while running the script yields ``False``.
    """
    script = """
        const el = document.querySelector(arguments[0]);
        if (!el) return false;
        const st = window.getComputedStyle(el);
        if (st.display === 'none' || st.visibility === 'hidden' || st.opacity === '0') return false;
        const r = el.getBoundingClientRect();
        return r.width > 0 && r.height > 0;
        """
    try:
        return bool(page.run_js(script, selector))
    except Exception:
        return False
|
||
|
||
|
||
def _read_slider_toast_text(page) -> str:
    """Return the trimmed text of the currently visible toast, or ``""``.

    The in-page script looks up ``.van-toast__text`` (preferring the one
    inside ``.van-toast--text``) and returns its text only when the enclosing
    ``.van-toast`` is not hidden via display, visibility or opacity. Any
    exception results in an empty string.
    """
    script = """
        const node = document.querySelector('.van-toast--text .van-toast__text')
        || document.querySelector('.van-toast__text');
        if (!node) return '';
        const popup = node.closest('.van-toast') || node;
        const st = window.getComputedStyle(popup);
        const visible = st.display !== 'none' && st.visibility !== 'hidden' && st.opacity !== '0';
        if (!visible) return '';
        return (node.innerText || node.textContent || '').trim();
        """
    try:
        raw_text = page.run_js(script)
        return (raw_text or "").strip()
    except Exception:
        return ""
|
||
|
||
|
||
def _get_slider_layout_metrics(page, bg_img, slider_ele) -> dict:
    """Measure the rendered widths of the captcha widgets.

    An in-page script reads the bounding-rect widths of the background image,
    the slider handle, ``.verify-bar-area`` and the handle's
    ``.verify-sub-block`` child. If the script raises or does not return a
    dict, every width is reported as 0.

    Returns:
        A dict with float ``bg_display_w``, ``slider_w``, ``bar_w``,
        ``sub_w`` and an int ``track_max`` (bar width minus slider width,
        floored at 0 and rounded), or ``-1`` for ``track_max`` when either
        width is not positive.
    """
    script = """
        const bg = arguments[0];
        const slider = arguments[1];
        const bar = document.querySelector('.verify-bar-area');
        const sub = slider ? slider.querySelector('.verify-sub-block') : null;
        const bgR = bg ? bg.getBoundingClientRect() : {width: 0};
        const sliderR = slider ? slider.getBoundingClientRect() : {width: 0};
        const barR = bar ? bar.getBoundingClientRect() : {width: 0};
        const subR = sub ? sub.getBoundingClientRect() : {width: 0};
        return {
        bg_display_w: Number(bgR.width || 0),
        slider_w: Number(sliderR.width || 0),
        bar_w: Number(barR.width || 0),
        sub_w: Number(subR.width || 0),
        };
        """
    try:
        measured = page.run_js(script, bg_img, slider_ele)
    except Exception:
        measured = None
    widths = measured if isinstance(measured, dict) else {}

    def _width(key: str) -> float:
        return float(widths.get(key, 0) or 0)

    bar_w = _width("bar_w")
    slider_w = _width("slider_w")
    if bar_w > 0 and slider_w > 0:
        track_max = int(round(max(0.0, bar_w - slider_w)))
    else:
        track_max = -1

    return {
        "bg_display_w": _width("bg_display_w"),
        "slider_w": slider_w,
        "bar_w": bar_w,
        "sub_w": _width("sub_w"),
        "track_max": track_max,
    }
|
||
|
||
|
||
def _build_move_distance_candidates(
    match: dict,
    scale: float,
    distance_adjust: int,
    track_max: int,
) -> list[int]:
    """Expand the match result into an ordered, de-duplicated list of drag distances.

    The first four base distances (``candidate_drag_distances``, falling back
    to ``drag_distance``) are each scaled, shifted by ``distance_adjust`` and
    then varied by the offsets 0, -1, +1, -2, +2, -3, +3. When ``track_max``
    is non-negative every value is clamped to ``[0, track_max]``.
    """
    bases = match.get("candidate_drag_distances") or [int(match["drag_distance"])]
    clamp_enabled = track_max >= 0

    def _fit(pixels: int) -> int:
        return max(0, min(track_max, pixels)) if clamp_enabled else pixels

    ordered: list[int] = []
    used: set[int] = set()
    for base in bases[:4]:
        centre = int(round(int(base) * scale)) + int(distance_adjust)
        for delta in (0, -1, 1, -2, 2, -3, 3):
            pixels = _fit(centre + delta)
            if pixels in used:
                continue
            used.add(pixels)
            ordered.append(pixels)

    if ordered:
        return ordered
    # Defensive fallback kept from the original implementation.
    return [_fit(int(round(int(match["drag_distance"]) * scale)) + int(distance_adjust))]
|
||
|
||
|
||
def _normalize_listen_packet(packet):
    """Reduce a ``listen.wait`` result to a single packet or ``None``.

    ``False`` and an empty list become ``None``; a non-empty list yields its
    first item; anything else is returned unchanged.
    """
    if packet is False:
        return None
    if not isinstance(packet, list):
        return packet
    return packet[0] if packet else None
|
||
|
||
|
||
def _coerce_json_body(body: Any) -> Any:
    """Parse ``body`` as JSON when it is a string that looks like an object or array.

    Non-strings, blank strings, strings not wrapped in ``{}`` / ``[]`` and
    strings that fail to parse are returned unchanged.
    """
    if not isinstance(body, str):
        return body
    trimmed = body.strip()
    looks_like_object = trimmed.startswith("{") and trimmed.endswith("}")
    looks_like_array = trimmed.startswith("[") and trimmed.endswith("]")
    if not (looks_like_object or looks_like_array):
        return body
    try:
        return json.loads(trimmed)
    except Exception:
        return body
|
||
|
||
|
||
def _code_indicates_success(value: Any) -> Optional[bool]:
    """Interpret a status/result code taken from an interface response.

    Returns:
        ``None`` when the value carries no information (``None`` or a blank
        string), ``True`` for success codes (0, 1, 200 and textual forms such
        as ``"ok"`` / ``"success"`` / ``"0000"``), ``False`` otherwise.
        Booleans are returned as-is.
    """
    success_codes = (0, 1, 200)

    if value is None:
        return None
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        try:
            return int(value) in success_codes
        except (ValueError, OverflowError):
            # NaN / infinity cannot be converted to int; treat as not successful.
            return False

    text = str(value).strip()
    if not text:
        return None
    lower = text.lower()
    if lower in {"0", "00", "000", "0000", "1", "200", "ok", "success", "true"}:
        return True
    if lower in {"false", "fail", "failed", "error", "err"}:
        return False
    if lower.lstrip("-").isdigit():
        try:
            return int(lower) in success_codes
        except ValueError:
            # e.g. "--1" or "²" pass the isdigit() check but are not valid
            # integers; they are unknown codes, not a reason to crash.
            return False
    return False
|
||
|
||
|
||
def _to_int_or_none(value: Any) -> Optional[int]:
    """Convert ``value`` to an int, or return ``None`` when that is not possible.

    Numbers (including bools) go through ``int()``. Other values are
    stringified and accepted only when they consist of digits with optional
    leading minus signs. Any exception during conversion yields ``None``.
    """
    try:
        if value is None:
            return None
        if isinstance(value, (bool, int, float)):
            return int(value)
        text = str(value).strip()
        if text and text.lstrip("-").isdigit():
            return int(text)
        return None
    except Exception:
        return None
|
||
|
||
|
||
def _extract_interface_message(data: dict) -> str:
    """Return the first non-blank string message found in a response dict.

    Keys are checked in the order ``msg``, ``message``, ``error``,
    ``errorMsg``, ``detail``, ``retMsg``; the value is returned stripped.
    Returns ``""`` when none of them holds a non-blank string.
    """
    for key in ("msg", "message", "error", "errorMsg", "detail", "retMsg"):
        candidate = data.get(key)
        if not isinstance(candidate, str):
            continue
        stripped = candidate.strip()
        if stripped:
            return stripped
    return ""
|
||
|
||
|
||
def _is_sms_already_sent_response(data: dict, message: str) -> bool:
    """Detect a code-1001 response whose message says the SMS was already sent.

    The first code key (``code``, ``retCode``, ``resultCode``, ``statusCode``,
    ``errno``) that is present and converts to an int must equal 1001, and
    ``message`` must contain one of the known hint phrases.
    """
    code_keys = ("code", "retCode", "resultCode", "statusCode", "errno")
    parsed_codes = (_to_int_or_none(data.get(key)) for key in code_keys if key in data)
    code_val = next((code for code in parsed_codes if code is not None), None)
    if code_val != 1001:
        return False

    text = (message or "").strip()
    if not text:
        return False
    # Such wording usually means the code was already issued or SMS sending
    # is rate-limited; it must not be treated as a slider failure.
    hints = ("验证码已发送", "短信验证码已发送", "请稍后重试", "请稍等")
    for hint in hints:
        if hint in text:
            return True
    return False
|
||
|
||
|
||
def _assert_interface_success(body: Any) -> None:
    """Raise if a response body looks like a failure; return silently otherwise.

    Bodies that are not (or do not parse to) a dict are accepted as-is.
    For dicts, three signals are collected:

    * a falsy ``success`` / ``ok`` flag,
    * a code field that ``_code_indicates_success`` rejects,
    * a message containing a failure keyword.

    A response recognised by ``_is_sms_already_sent_response`` is a "soft
    success": its code failures are ignored, and a failure keyword in its
    message ends the check immediately without raising.

    Raises:
        RuntimeError: listing the failure reasons and a body preview of at
            most 300 characters.
    """
    payload = _coerce_json_body(body)
    if not isinstance(payload, dict):
        return

    message = _extract_interface_message(payload)
    soft_success = _is_sms_already_sent_response(payload, message)

    problems: list[str] = [
        f"{flag}={payload[flag]}"
        for flag in ("success", "ok")
        if flag in payload and not bool(payload[flag])
    ]

    for key in ("code", "retCode", "resultCode", "statusCode", "errno"):
        if key not in payload:
            continue
        if _code_indicates_success(payload[key]) is False and not soft_success:
            problems.append(f"{key}={payload[key]}")

    if message:
        lowered = message.lower()
        fail_words = ("失败", "错误", "无效", "fail", "error", "invalid", "请重新", "未通过")
        if any(word in lowered for word in fail_words):
            if soft_success:
                return
            problems.append(f"msg={message}")

    if not problems:
        return
    preview = str(payload)
    if len(preview) > 300:
        preview = preview[:300] + "..."
    raise RuntimeError(f"接口返回失败: {', '.join(problems)}; body={preview}")
|
||
|
||
|
||
def _wait_packet_or_feedback(page, timeout: float) -> tuple[Any, str]:
    """Wait for a listened packet while also sampling the toast text.

    The listener is polled in slices of 0.05-0.35 s until ``timeout`` seconds
    have passed. After each empty slice the visible toast text is read and
    remembered.

    Returns:
        ``(packet, last_toast)`` as soon as a packet arrives, otherwise
        ``(None, last_toast)`` after the deadline. ``last_toast`` is the most
        recent non-empty toast text seen, or ``""``.
    """
    stop_at = time.time() + timeout
    latest_toast = ""
    while time.time() < stop_at:
        slice_seconds = min(0.35, max(0.05, stop_at - time.time()))
        captured = _normalize_listen_packet(
            page.listen.wait(timeout=slice_seconds, fit_count=False)
        )
        if captured is not None:
            return captured, latest_toast
        latest_toast = _read_slider_toast_text(page) or latest_toast
    return None, latest_toast
|
||
|
||
|
||
def _to_jsonable(value: Any) -> Any:
    """Recursively convert ``value`` into something ``json.dumps`` accepts.

    ``None``, str, int, float and bool pass through; dicts get string keys;
    lists, tuples and sets become lists; everything else is stringified.
    """
    if isinstance(value, dict):
        return {str(key): _to_jsonable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [_to_jsonable(item) for item in value]
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    return str(value)
|
||
|
||
|
||
def _analyze_slider_failure(match: dict, layout: dict, attempts: list[dict], last_reason: str) -> dict:
    """Derive human-readable hypotheses for why the slider captcha failed.

    Args:
        match: result of the last successful displacement computation, or an
            empty dict when no computation ever succeeded.
        layout: last measured slider layout (only ``track_max`` is read).
        attempts: per-attempt records; the keys ``toast``,
            ``interface_error``, ``packet_received``,
            ``verify_bar_visible_after`` and ``boundary_hit`` are inspected.
        last_reason: the last failure reason recorded by the caller.

    Returns:
        A dict with ``summary`` (the first hypothesis), ``hypotheses`` (all of
        them, in detection order) and ``metrics``.
    """
    hypotheses: list[str] = []
    confidence_ratio = float(match.get("confidence_ratio", 0) or 0)

    # Judge confidence only when a match was actually computed. With an empty
    # ``match`` the ratio defaults to 0, which used to be misreported as "low
    # matching confidence" and hid the real failure reason.
    if "confidence_ratio" in match and confidence_ratio < 1.03:
        hypotheses.append("拼图匹配置信度偏低,背景与缺口特征区分不明显,易出现位移误差。")

    toast_texts = [str(a.get("toast") or "") for a in attempts if a.get("toast")]
    if any("验证码错误" in t for t in toast_texts):
        hypotheses.append("前端返回“验证码错误”,通常是最终落点偏差或拖动轨迹被风控识别。")

    if any(a.get("interface_error") for a in attempts):
        hypotheses.append("接口已返回失败状态,说明滑块校验请求已发出但服务端未判定通过。")

    packet_count = sum(1 for a in attempts if a.get("packet_received"))
    if attempts and packet_count == 0:
        hypotheses.append("多次拖动均未捕获到目标接口,请检查接口监听地址是否变化。")

    if attempts and bool(attempts[-1].get("verify_bar_visible_after")):
        hypotheses.append("验证码弹窗仍可见,当前 challenge 未完成。")

    if any(a.get("boundary_hit") for a in attempts):
        hypotheses.append("拖动距离命中轨道边界,存在位移被截断风险。")

    # Fall back to the caller's reason, then to a generic hint.
    if not hypotheses and last_reason:
        hypotheses.append(last_reason)
    if not hypotheses:
        hypotheses.append("未识别出单一原因,建议查看保存的标注图和报告。")

    return {
        "summary": hypotheses[0],
        "hypotheses": hypotheses,
        "metrics": {
            "confidence_ratio": confidence_ratio,
            "best_score": match.get("best_score"),
            "second_best": match.get("second_best"),
            "attempt_count": len(attempts),
            "packet_count": packet_count,
            "track_max": layout.get("track_max"),
        },
    }
|
||
|
||
|
||
def _save_failure_artifacts(
    page,
    bg_bytes: bytes,
    piece_bytes: bytes,
    match: dict,
    layout: dict,
    move_distances: list[int],
    attempts: list[dict],
    last_reason: str,
    analysis: dict,
) -> Path:
    """Write a failed captcha case to disk for later inspection.

    A timestamped directory is created under ``captcha_failures`` next to
    this file. It receives the raw background and piece images, a page
    screenshot, the background annotated with up to five candidate boxes,
    and ``report.json``. The screenshot and the overlay are best-effort:
    errors while producing them are ignored.

    Returns:
        The path of the created case directory.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    case_dir = Path(__file__).resolve().parent / "captcha_failures" / stamp
    case_dir.mkdir(parents=True, exist_ok=True)

    for file_name, blob in (("bg.png", bg_bytes), ("piece.png", piece_bytes)):
        (case_dir / file_name).write_bytes(blob)

    try:
        page.get_screenshot(path=str(case_dir / "page.png"))
    except Exception:
        pass

    try:
        overlay = Image.open(BytesIO(bg_bytes)).convert("RGB")
        pen = ImageDraw.Draw(overlay)
        top = int(match.get("piece_bbox_y0", 0) or 0)
        box_h = int(match.get("piece_bbox_h", 0) or 0)
        box_w = int(match.get("piece_bbox_w", 0) or 0)
        targets = match.get("candidate_target_xs") or [match.get("target_x", 0)]
        palette = [(255, 0, 0), (255, 140, 0), (0, 128, 255), (0, 180, 80), (160, 80, 255)]
        for rank, raw_x in enumerate(targets[:5]):
            colour = palette[rank % len(palette)]
            left = int(raw_x)
            # The best candidate (rank 0) gets a thicker outline.
            pen.rectangle(
                [left, top, left + box_w, top + box_h],
                outline=colour,
                width=1 if rank else 2,
            )
            pen.text((left + 2, max(0, top - 14)), f"#{rank + 1}", fill=colour)
        overlay.save(case_dir / "bg_overlay.png")
    except Exception:
        pass

    report = {
        "created_at": datetime.now().isoformat(timespec="seconds"),
        "failure_reason": last_reason,
        "analysis": analysis,
        "match": _to_jsonable(match),
        "layout": _to_jsonable(layout),
        "move_distances": move_distances,
        "attempts": _to_jsonable(attempts),
    }
    report_text = json.dumps(report, ensure_ascii=False, indent=2)
    (case_dir / "report.json").write_text(report_text, encoding="utf-8")
    return case_dir
|
||
|
||
|
||
# ---------- 滑块拖动(仿人轨迹) ----------
|
||
|
||
def _ease_out_quad(t: float) -> float:
    """Quadratic ease-out: ``t * (2 - t)``."""
    return (2 - t) * t
|
||
|
||
|
||
def _ease_out_cubic(t: float) -> float:
    """Cubic ease-out: ``1 - (1 - t) ** 3``."""
    remaining = 1 - t
    return 1 - remaining ** 3
|
||
|
||
|
||
def _ease_out_bounce(t: float) -> float:
    """Piecewise-quadratic "bounce" ease-out built from four parabolic segments."""
    gain = 7.5625
    span = 2.75
    if t < 1 / span:
        return gain * t * t
    if t < 2 / span:
        t -= 1.5 / span
        return gain * t * t + 0.75
    if t < 2.5 / span:
        t -= 2.25 / span
        return gain * t * t + 0.9375
    t -= 2.625 / span
    return gain * t * t + 0.984375
|
||
|
||
|
||
def build_human_track(distance: int, num_steps: int = 0) -> list[dict]:
    """Build a human-looking drag track: ease out, overshoot, then bounce back.

    Args:
        distance: signed horizontal distance to cover, in pixels.
        num_steps: number of main-phase steps; when not positive it is chosen
            from the distance (at least 12).

    Returns:
        A list of steps, each ``{"dx": int, "dy": int, "dt": float}``, whose
        ``dx`` values sum exactly to ``distance``. An empty list is returned
        for a zero distance.
    """
    if distance == 0:
        return []

    magnitude = abs(distance)
    direction = -1 if distance < 0 else 1

    if num_steps <= 0:
        num_steps = max(12, int(magnitude * random.uniform(0.25, 0.4)))

    overshoot = random.randint(
        max(2, int(magnitude * 0.03)), max(3, int(magnitude * 0.08))
    )
    peak = magnitude + overshoot
    easing = random.choice([_ease_out_quad, _ease_out_cubic])

    # Main phase: ease out towards the overshoot peak.
    waypoints: list[float] = [
        easing(i / num_steps) * peak for i in range(1, num_steps + 1)
    ]
    # Return phase: settle back from the peak onto the target distance.
    bounce_steps = random.randint(2, 4)
    waypoints.extend(
        peak - _ease_out_bounce(j / bounce_steps) * overshoot
        for j in range(1, bounce_steps + 1)
    )

    track: list[dict] = []
    travelled = 0.0
    for waypoint in waypoints:
        dx = round(waypoint - travelled)
        # Randomly drop some zero-length steps.
        if dx == 0 and random.random() < 0.3:
            continue
        travelled += dx
        dy = random.choice([-1, 0, 0, 0, 1])
        if travelled < magnitude * 0.6:
            dt = random.uniform(0.005, 0.012)
        else:
            dt = random.uniform(0.008, 0.025)
        # Occasional longer pause.
        if random.random() < 0.03:
            dt += random.uniform(0.02, 0.06)
        track.append({"dx": direction * dx, "dy": dy, "dt": dt})

    # Correct any rounding drift so the track ends exactly on ``distance``.
    shortfall = distance - sum(step["dx"] for step in track)
    if shortfall != 0:
        track.append({"dx": shortfall, "dy": 0, "dt": random.uniform(0.01, 0.03)})

    return track
|
||
|
||
|
||
def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None:
    """Send one ``Input.dispatchMouseEvent`` CDP command through ``page.run_cdp``.

    ``clickCount`` is 1 for ``"mousePressed"`` events and 0 for every other
    event type.
    """
    click_count = 1 if event_type == "mousePressed" else 0
    page.run_cdp(
        "Input.dispatchMouseEvent",
        type=event_type,
        x=x,
        y=y,
        button=button,
        clickCount=click_count,
    )
|
||
|
||
|
||
def _get_element_center(page, ele) -> tuple[int, int]:
    """Return the integer centre point of ``ele``.

    The centre is computed in the page from ``getBoundingClientRect()``. If
    the script does not return a non-empty dict, ``ele.rect.midpoint`` is
    used instead.
    """
    script = """const r = arguments[0].getBoundingClientRect();
        return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}"""
    centre = page.run_js(script, ele)
    if isinstance(centre, dict) and centre:
        return int(centre["x"]), int(centre["y"])
    midpoint = ele.rect.midpoint
    return int(midpoint[0]), int(midpoint[1])
|
||
|
||
|
||
def drag_slider(page, slider_ele, distance: int) -> None:
    """Drag the slider by ``distance`` pixels using CDP-level mouse events.

    The pointer moves to the centre of ``slider_ele``, presses the button,
    follows the track from ``build_human_track`` step by step, and releases.

    The release is issued in a ``finally`` block so that an error in the
    middle of the drag does not leave the mouse button held down, which
    would interfere with later attempts.
    """
    cx, cy = _get_element_center(page, slider_ele)

    _dispatch_mouse(page, "mouseMoved", cx, cy)
    time.sleep(random.uniform(0.03, 0.08))

    _dispatch_mouse(page, "mousePressed", cx, cy)
    cur_x, cur_y = cx, cy
    try:
        time.sleep(random.uniform(0.02, 0.06))

        for step in build_human_track(distance):
            cur_x += step["dx"]
            cur_y += step["dy"]
            _dispatch_mouse(page, "mouseMoved", cur_x, cur_y)
            time.sleep(step["dt"])

        # Short settle time before letting go.
        time.sleep(random.uniform(0.02, 0.06))
    finally:
        _dispatch_mouse(page, "mouseReleased", cur_x, cur_y)
|
||
|
||
|
||
# ---------- 核心自动化流程 ----------
|
||
|
||
def submit_phone(
    page,
    phone: str,
    url: str = "http://yscnb.com/tyyp/",
    alpha_threshold: int = 12,
    distance_adjust: int = 0,
    wait_page: float = 0.3,
) -> Any:
    """
    Enter the phone number, click "get verification code", solve the slider
    captcha, and return the body of the getYanZhenMa/v2 interface response.

    Args:
        page: browser page object (main() passes a DrissionPage ChromiumPage).
        phone: phone number typed into the phone input.
        url: page to open first.
        alpha_threshold: forwarded to calc_drag_distance_from_bytes.
        distance_adjust: pixel offset forwarded to
            _build_move_distance_candidates.
        wait_page: seconds to sleep after opening the page.

    Returns:
        The response body (parsed by _coerce_json_body) of the first captured
        packet that passes _assert_interface_success.

    Raises:
        RuntimeError: if a required element is missing, or if the slider is
            not passed within at most 6 attempts / 15 seconds. In the latter
            case failure artifacts are saved first and their location is
            included in the message.
    """
    page.get(url)
    time.sleep(wait_page)

    # 1. Tick the agreement checkbox
    agree_checkbox = find_first(
        page,
        [
            "css:#color-input-red",
            "css:input[name='color-input-red']",
            'x://input[@id="color-input-red"]',
            "css:input.right-box[type='checkbox']",
        ],
        timeout=5,
    )
    if agree_checkbox:
        click_safe(agree_checkbox)
        time.sleep(0.4)

    # 2. Click "order now"; the lookup is retried up to 4 times
    order_btn = None
    for attempt in range(4):
        order_btn = find_first(
            page,
            [
                "css:div.paybg",
                "css:.paybg",
                'x://button[contains(.,"立即订购")]',
                'x://a[contains(.,"立即订购")]',
                'x://span[contains(.,"立即订购")]',
                'x://div[contains(.,"立即订购")]',
                'x://*[contains(text(),"立即订购")]',
                'x://*[contains(.,"立即订购")]',
                "css:.btn-order",
                "css:button.btn-primary",
                "css:button.btn",
                "css:a.btn",
            ],
            timeout=1,
        )
        if order_btn:
            break
        time.sleep(0.25)

    if order_btn:
        try:
            order_btn.run_js("this.scrollIntoView({block:'center'})")
            time.sleep(0.05)
        except Exception:
            pass
        click_safe(order_btn)
        time.sleep(0.4)
    else:
        # No element found via selectors: fall back to an in-page text search
        # and dispatch a synthetic click on the first node containing the label.
        try:
            page.run_js("""
            var nodes = document.querySelectorAll('button, a, span, div');
            for (var i = 0; i < nodes.length; i++) {
            var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
            if (t.indexOf('立即订购') >= 0) {
            nodes[i].scrollIntoView({block: 'center'});
            nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
            return true;
            }
            }
            return false;
            """)
        except Exception:
            pass
        time.sleep(0.4)

    # 3. Enter the phone number
    phone_input = find_first(
        page,
        [
            'x://*[@placeholder="请输入手机号码"]',
            "css:input.inp-txt",
        ],
        timeout=8,
    )
    if not phone_input:
        raise RuntimeError("未找到手机号输入框")

    phone_input.input(phone, clear=True)

    # Click the checkbox icon next to the form if present (best-effort).
    agree = find_first(
        page,
        [
            "css:i.ico-checkbox",
            'x://i[contains(@class,"ico-checkbox")]',
        ],
        timeout=2,
    )
    if agree:
        try:
            click_safe(agree)
        except Exception:
            pass

    # 4. Start listening (must happen before clicking "get verification code")
    page.listen.start(GET_YAN_ZHEN_MA_URL)

    send_btn = find_first(
        page,
        [
            "css:button.btn-code",
            'x://button[contains(text(),"获取验证码")]',
        ],
        timeout=8,
    )
    if not send_btn:
        raise RuntimeError("未找到「获取验证码」按钮")

    click_safe(send_btn)

    # 5. Wait for the slider captcha popup
    verify_box = find_first(
        page,
        ["css:.verifybox", "css:.verify-bar-area"],
        timeout=6,
    )
    if not verify_box:
        raise RuntimeError("未检测到滑块验证码弹窗")

    bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
    piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
    slider = find_first(page, ["css:.verify-move-block"], timeout=5)

    if not bg_img or not piece_img or not slider:
        raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")

    # 6. Before every retry, re-read the captcha images and recompute the
    #    displacement (old distances are never reused)
    deadline = time.time() + 15.0
    last_reason = ""
    attempts: list[dict[str, Any]] = []
    max_attempts = 6
    last_bg_bytes = b""
    last_piece_bytes = b""
    last_match: dict[str, Any] = {}
    last_layout: dict[str, Any] = {}
    last_move_distances: list[int] = []

    for idx in range(1, max_attempts + 1):
        if time.time() >= deadline:
            break

        bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=2)
        piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=2)
        slider = find_first(page, ["css:.verify-move-block"], timeout=2)

        # Per-attempt record; it ends up in the failure report.
        attempt: dict[str, Any] = {
            "index": idx,
            "move_distance": None,
            "boundary_hit": False,
            "packet_received": False,
            "toast": "",
            "interface_error": "",
            "verify_bar_visible_after": False,
        }

        if not bg_img or not piece_img or not slider:
            last_reason = "重试时未找到验证码关键元素(背景图/拼图块/滑块)"
            attempt["interface_error"] = last_reason
            attempts.append(attempt)
            break

        try:
            bg_src = wait_for_data_src(bg_img, timeout=4)
            piece_src = wait_for_data_src(piece_img, timeout=4)
            bg_bytes = parse_data_url(bg_src)
            piece_bytes = parse_data_url(piece_src)
        except Exception as e:
            last_reason = f"重试时读取验证码图片失败: {e}"
            attempt["interface_error"] = last_reason
            attempts.append(attempt)
            continue

        # Reject payloads too small to be real images.
        if len(bg_bytes) < 100 or len(piece_bytes) < 100:
            last_reason = f"验证码图片数据异常: bg={len(bg_bytes)}B piece={len(piece_bytes)}B"
            attempt["interface_error"] = last_reason
            attempts.append(attempt)
            continue

        try:
            match = calc_drag_distance_from_bytes(
                bg_bytes, piece_bytes, alpha_threshold=alpha_threshold
            )
        except Exception as e:
            last_reason = f"重试时位移计算失败: {e}"
            attempt["interface_error"] = last_reason
            attempts.append(attempt)
            continue

        layout = _get_slider_layout_metrics(page, bg_img, slider)
        # Scale image-space distances to the displayed size; fall back to the
        # image's own width (scale 1) when the displayed width is unknown.
        bg_display_w = layout["bg_display_w"] if layout["bg_display_w"] > 0 else match["bg_width"]
        scale = float(bg_display_w) / max(1, match["bg_width"])
        track_max = int(layout["track_max"])
        move_distances = _build_move_distance_candidates(
            match=match,
            scale=scale,
            distance_adjust=int(distance_adjust),
            track_max=track_max,
        )
        # Attempt N uses the N-th candidate, clamped to the last available one.
        pick_index = min(max(0, idx - 1), len(move_distances) - 1)
        move_distance = int(move_distances[pick_index])

        attempt["move_distance"] = move_distance
        attempt["boundary_hit"] = bool(track_max >= 0 and move_distance in (0, track_max))
        attempt["confidence_ratio"] = float(match.get("confidence_ratio", 0) or 0)
        attempt["candidate_count"] = len(move_distances)

        # Remember the last valid computation for failure-sample saving and analysis
        last_bg_bytes = bg_bytes
        last_piece_bytes = piece_bytes
        last_match = match
        last_layout = layout
        last_move_distances = move_distances

        drag_slider(page, slider, move_distance)
        time.sleep(0.2)

        remaining = max(0.1, deadline - time.time())
        packet, toast_text = _wait_packet_or_feedback(page, timeout=min(4.0, remaining))
        attempt["toast"] = toast_text or ""
        if packet is not None:
            attempt["packet_received"] = True
            response = getattr(packet, "response", None)
            if response is None:
                attempt["interface_error"] = f"捕获到 {GET_YAN_ZHEN_MA_URL} 数据包但无 response 字段"
                last_reason = attempt["interface_error"]
                attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area")
                attempts.append(attempt)
                # Retry only while the captcha bar is still on screen.
                if not attempt["verify_bar_visible_after"]:
                    break
                continue

            body = _coerce_json_body(response.body)
            attempt["response_body"] = _to_jsonable(body)
            try:
                _assert_interface_success(body)
                return body
            except Exception as e:
                attempt["interface_error"] = str(e)
                last_reason = attempt["interface_error"]
                attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area")
                attempts.append(attempt)
                # Retry only while the captcha bar is still on screen.
                if not attempt["verify_bar_visible_after"]:
                    break
                continue

        # No packet was captured for this attempt: classify the outcome.
        if toast_text:
            last_reason = f"滑块验证失败:{toast_text}"
        elif _is_element_visible(page, ".verify-bar-area"):
            last_reason = f"第{idx}次拖动未触发 {GET_YAN_ZHEN_MA_URL}"
        else:
            last_reason = "验证码弹窗已关闭但未捕获接口响应"
        attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area")
        attempt["interface_error"] = last_reason if not attempt["interface_error"] else attempt["interface_error"]
        attempts.append(attempt)

    # Every attempt failed (or the deadline passed): analyse, save artifacts, raise.
    analysis = _analyze_slider_failure(
        match=last_match,
        layout=last_layout,
        attempts=attempts,
        last_reason=last_reason,
    )
    save_info = ""
    try:
        case_dir = _save_failure_artifacts(
            page=page,
            bg_bytes=last_bg_bytes,
            piece_bytes=last_piece_bytes,
            match=last_match,
            layout=last_layout,
            move_distances=last_move_distances,
            attempts=attempts,
            last_reason=last_reason,
            analysis=analysis,
        )
        save_info = f"; 失败样本目录: {case_dir}"
    except Exception as e:
        save_info = f"; 失败样本保存失败: {e}"

    raise RuntimeError(
        f"滑块未通过或接口未返回成功结果: {last_reason or '未知原因'}"
        f"; 分析: {analysis.get('summary', '无')}"
        f"{save_info}"
    )
|
||
|
||
|
||
def submit_code(page, code: str) -> dict:
    """Type the SMS verification code and click the confirm-order button.

    Args:
        page: browser page object.
        code: SMS verification code to enter.

    Returns:
        ``{"success": True, "message": "已点击确认订购"}`` once the confirm
        button has been clicked.

    Raises:
        RuntimeError: if the code input or the confirm button cannot be found.
    """
    code_input = find_first(
        page,
        [
            # XPath has no `*=` operator (that is CSS attribute-selector
            # syntax); substring matching is done with contains().
            'x://input[contains(@placeholder,"验证码")]',
            'x://input[contains(@placeholder,"短信")]',
            "css:input.inp-txt[type='text']",
            "css:input[type='tel']",
            "css:.code-input",
            "css:input.verify-input",
        ],
        timeout=8,
    )
    if not code_input:
        raise RuntimeError("未找到验证码输入框")

    code_input.input(code, clear=True)
    time.sleep(0.2)

    confirm_btn = find_first(
        page,
        [
            "css:img.btn-buy",
            "css:.btn-buy",
            'x://img[contains(@class,"btn-buy")]',
            'x://*[contains(@class,"btn-buy")]',
        ],
        timeout=5,
    )
    if not confirm_btn:
        raise RuntimeError("未找到确认订购按钮(img.btn-buy)")

    # Best-effort scroll into view; a failure here must not block the click.
    try:
        confirm_btn.run_js("this.scrollIntoView({block:'center'})")
        time.sleep(0.05)
    except Exception:
        pass

    click_safe(confirm_btn)
    return {"success": True, "message": "已点击确认订购"}
|
||
|
||
|
||
# ---------- 入口 ----------
|
||
|
||
def main() -> None:
    """Command-line entry point: run the phone/captcha flow, then submit a code.

    Parses ``--phone``, ``--url`` and ``--port``, attaches to an existing
    browser when a port is given (otherwise starts a new one), prints the
    getYanZhenMa/v2 response and finally calls ``input_code``.
    """
    parser = argparse.ArgumentParser(description="天翼云盘订购页自动化")
    parser.add_argument(
        "--phone",
        default=DEFAULT_PHONE,
        help="手机号码(默认用代码中的 DEFAULT_PHONE)",
    )
    parser.add_argument("--url", default="http://yscnb.com/tyyp/", help="目标页面 URL")
    parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口,0 表示新建")
    args = parser.parse_args()

    if not args.port:
        page = ChromiumPage()
    else:
        options = ChromiumOptions().set_local_port(port=args.port)
        page = ChromiumPage(addr_or_opts=options)

    print(f"打开页面: {args.url},手机号: {args.phone}")
    body = submit_phone(page=page, phone=args.phone, url=args.url)
    print("hn_userEquitys/getYanZhenMa/v2 响应:")
    print(body)

    # NOTE(review): the SMS code is a hard-coded placeholder value.
    input_code(page=page, code=123123)
|
||
|
||
|
||
def input_code(page, code, timeout: Optional[float] = None):
    """Enter the SMS code, confirm the order and print the order response.

    Args:
        page: browser page object.
        code: verification code typed into the code input.
        timeout: seconds to wait for the ``hn_userEquitys/common/order``
            packet. ``None`` (the default) passes no limit to the listener,
            as before.

    Raises:
        RuntimeError: if no packet with a response was captured. Previously
            this case failed with an unhelpful AttributeError.
    """
    page.ele('x://input[@placeholder="请输入验证码"]').input(code, clear=True)
    time.sleep(0.5)

    # Start listening before the clicks that trigger the order request.
    page.listen.start("hn_userEquitys/common/order")
    page.ele('x://*[@id="app"]/div/img').click(by_js=True)
    time.sleep(0.5)
    page.ele('x://*[@id="app"]/div/div[7]/div/div[3]/button[2]').click(by_js=True)

    res = _normalize_listen_packet(page.listen.wait(timeout=timeout))
    response = getattr(res, "response", None)
    if response is None:
        raise RuntimeError("未捕获到 hn_userEquitys/common/order 接口响应")
    print(response.body)
|
||
|
||
# Run the automation only when this file is executed as a script.
if __name__ == "__main__":
    main()
|