550 lines
19 KiB
Python
550 lines
19 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
站点流程自动化(DrissionPage):
|
|||
|
|
1) 打开 http://yscnb.com/tyyp/1.html
|
|||
|
|
2) 输入手机号
|
|||
|
|
3) 点击“获取验证码”
|
|||
|
|
4) 解析弹窗中的拼图验证码并执行拖动
|
|||
|
|
|
|||
|
|
说明:仅实现到“滑块拖动”步骤,不会自动填写短信验证码。
|
|||
|
|
"""
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
import base64
|
|||
|
|
import random
|
|||
|
|
import re
|
|||
|
|
import time
|
|||
|
|
from io import BytesIO
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
import numpy as np
|
|||
|
|
from PIL import Image
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _to_rgba_array(image_bytes: bytes) -> np.ndarray:
|
|||
|
|
return np.array(Image.open(BytesIO(image_bytes)).convert("RGBA"), dtype=np.int16)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _to_rgb_array(image_bytes: bytes) -> np.ndarray:
|
|||
|
|
return np.array(Image.open(BytesIO(image_bytes)).convert("RGB"), dtype=np.int16)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]:
|
|||
|
|
mask = alpha > threshold
|
|||
|
|
if not mask.any():
|
|||
|
|
raise ValueError("拼图块 alpha 全透明,无法匹配")
|
|||
|
|
ys, xs = np.where(mask)
|
|||
|
|
return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1
|
|||
|
|
|
|||
|
|
|
|||
|
|
def calc_drag_distance_from_bytes(bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12) -> dict:
|
|||
|
|
"""计算拼图目标位移(基于背景图 + 拼图块图)。"""
|
|||
|
|
bg = _to_rgb_array(bg_bytes)
|
|||
|
|
piece_rgba = _to_rgba_array(piece_bytes)
|
|||
|
|
|
|||
|
|
bh, bw = bg.shape[:2]
|
|||
|
|
ph, _ = piece_rgba.shape[:2]
|
|||
|
|
if bh != ph:
|
|||
|
|
raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}")
|
|||
|
|
|
|||
|
|
alpha = piece_rgba[:, :, 3]
|
|||
|
|
x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold)
|
|||
|
|
|
|||
|
|
piece_crop = piece_rgba[y0:y1, x0:x1, :3]
|
|||
|
|
mask = alpha[y0:y1, x0:x1] > alpha_threshold
|
|||
|
|
ys, xs = np.where(mask)
|
|||
|
|
piece_pixels = piece_crop[ys, xs]
|
|||
|
|
|
|||
|
|
patch_h, patch_w = piece_crop.shape[:2]
|
|||
|
|
if patch_w > bw or patch_h > bh:
|
|||
|
|
raise ValueError("拼图块裁剪尺寸超过背景图")
|
|||
|
|
|
|||
|
|
best_x = 0
|
|||
|
|
best_score = float("inf")
|
|||
|
|
second_best = float("inf")
|
|||
|
|
max_x = bw - patch_w
|
|||
|
|
|
|||
|
|
for x in range(max_x + 1):
|
|||
|
|
patch_pixels = bg[y0 + ys, x + xs]
|
|||
|
|
score = float(np.abs(patch_pixels - piece_pixels).mean())
|
|||
|
|
if score < best_score:
|
|||
|
|
second_best = best_score
|
|||
|
|
best_score = score
|
|||
|
|
best_x = x
|
|||
|
|
elif score < second_best:
|
|||
|
|
second_best = score
|
|||
|
|
|
|||
|
|
drag_distance = best_x - x0
|
|||
|
|
confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf")
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"target_x": int(best_x),
|
|||
|
|
"piece_bbox_x0": int(x0),
|
|||
|
|
"piece_bbox_y0": int(y0),
|
|||
|
|
"piece_bbox_w": int(patch_w),
|
|||
|
|
"piece_bbox_h": int(patch_h),
|
|||
|
|
"bg_width": int(bw),
|
|||
|
|
"bg_height": int(bh),
|
|||
|
|
"drag_distance": int(drag_distance),
|
|||
|
|
"best_score": best_score,
|
|||
|
|
"second_best": second_best,
|
|||
|
|
"confidence_ratio": confidence_ratio,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_data_url(data_url: str) -> bytes:
|
|||
|
|
if not data_url.startswith("data:image"):
|
|||
|
|
raise ValueError("图片不是 data:image URL")
|
|||
|
|
_, data = data_url.split(",", 1)
|
|||
|
|
return base64.b64decode(data)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def style_px(style_text: str, key: str, default: float) -> float:
|
|||
|
|
if not style_text:
|
|||
|
|
return default
|
|||
|
|
m = re.search(rf"{re.escape(key)}\s*:\s*([0-9.]+)px", style_text)
|
|||
|
|
if not m:
|
|||
|
|
return default
|
|||
|
|
return float(m.group(1))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def click_safe(ele) -> None:
|
|||
|
|
try:
|
|||
|
|
ele.click()
|
|||
|
|
return
|
|||
|
|
except Exception:
|
|||
|
|
pass
|
|||
|
|
ele.click(by_js=True)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _ease_out_quad(t: float) -> float:
|
|||
|
|
return t * (2 - t)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _ease_out_cubic(t: float) -> float:
|
|||
|
|
return 1 - (1 - t) ** 3
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _ease_out_bounce(t: float) -> float:
|
|||
|
|
if t < 1 / 2.75:
|
|||
|
|
return 7.5625 * t * t
|
|||
|
|
elif t < 2 / 2.75:
|
|||
|
|
t -= 1.5 / 2.75
|
|||
|
|
return 7.5625 * t * t + 0.75
|
|||
|
|
elif t < 2.5 / 2.75:
|
|||
|
|
t -= 2.25 / 2.75
|
|||
|
|
return 7.5625 * t * t + 0.9375
|
|||
|
|
else:
|
|||
|
|
t -= 2.625 / 2.75
|
|||
|
|
return 7.5625 * t * t + 0.984375
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_human_track(distance: int, num_steps: int = 0) -> list[dict]:
|
|||
|
|
"""生成仿人轨迹列表,每项 {'dx': int, 'dy': int, 'dt': float(秒)}。
|
|||
|
|
|
|||
|
|
包含:加速-匀速-减速-过冲-回弹 五阶段。
|
|||
|
|
"""
|
|||
|
|
if distance == 0:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
dist = abs(distance)
|
|||
|
|
sign = 1 if distance > 0 else -1
|
|||
|
|
|
|||
|
|
if num_steps <= 0:
|
|||
|
|
num_steps = max(12, int(dist * random.uniform(0.25, 0.4)))
|
|||
|
|
|
|||
|
|
overshoot = random.randint(max(2, int(dist * 0.03)), max(3, int(dist * 0.08)))
|
|||
|
|
total = dist + overshoot # 先超过,再回弹
|
|||
|
|
|
|||
|
|
easing = random.choice([_ease_out_quad, _ease_out_cubic])
|
|||
|
|
|
|||
|
|
# 正向阶段
|
|||
|
|
raw_positions: list[float] = []
|
|||
|
|
for i in range(1, num_steps + 1):
|
|||
|
|
t = i / num_steps
|
|||
|
|
raw_positions.append(easing(t) * total)
|
|||
|
|
|
|||
|
|
# 回弹阶段 (2~4 步)
|
|||
|
|
bounce_steps = random.randint(2, 4)
|
|||
|
|
for j in range(1, bounce_steps + 1):
|
|||
|
|
t = j / bounce_steps
|
|||
|
|
raw_positions.append(total - _ease_out_bounce(t) * overshoot)
|
|||
|
|
|
|||
|
|
track: list[dict] = []
|
|||
|
|
prev_x = 0.0
|
|||
|
|
for pos in raw_positions:
|
|||
|
|
dx = round(pos - prev_x)
|
|||
|
|
if dx == 0 and random.random() < 0.3:
|
|||
|
|
continue
|
|||
|
|
prev_x += dx
|
|||
|
|
dy = random.choice([-1, 0, 0, 0, 1])
|
|||
|
|
# 前半段快、后半段慢(加快整体速度)
|
|||
|
|
dt = random.uniform(0.005, 0.012) if prev_x < dist * 0.6 else random.uniform(0.008, 0.025)
|
|||
|
|
if random.random() < 0.03:
|
|||
|
|
dt += random.uniform(0.02, 0.06)
|
|||
|
|
track.append({"dx": sign * dx, "dy": dy, "dt": dt})
|
|||
|
|
|
|||
|
|
# 最终位置校正
|
|||
|
|
actual = sum(s["dx"] for s in track)
|
|||
|
|
diff = distance - actual
|
|||
|
|
if diff != 0:
|
|||
|
|
track.append({"dx": diff, "dy": 0, "dt": random.uniform(0.01, 0.03)})
|
|||
|
|
|
|||
|
|
return track
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None:
|
|||
|
|
"""通过 CDP Input.dispatchMouseEvent 发送鼠标事件。"""
|
|||
|
|
page.run_cdp(
|
|||
|
|
"Input.dispatchMouseEvent",
|
|||
|
|
type=event_type,
|
|||
|
|
x=x,
|
|||
|
|
y=y,
|
|||
|
|
button=button,
|
|||
|
|
clickCount=1 if event_type == "mousePressed" else 0,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _get_element_center(page, ele) -> tuple[int, int]:
|
|||
|
|
"""获取元素在视口中的中心坐标。"""
|
|||
|
|
rect = page.run_js(
|
|||
|
|
"""const r = arguments[0].getBoundingClientRect();
|
|||
|
|
return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}""",
|
|||
|
|
ele,
|
|||
|
|
)
|
|||
|
|
if rect and isinstance(rect, dict):
|
|||
|
|
return int(rect["x"]), int(rect["y"])
|
|||
|
|
# 回退:通过元素属性
|
|||
|
|
loc = ele.rect.midpoint
|
|||
|
|
return int(loc[0]), int(loc[1])
|
|||
|
|
|
|||
|
|
|
|||
|
|
def drag_slider(page, slider_ele, distance: int) -> None:
|
|||
|
|
"""用 CDP 级鼠标事件完成拖拽,模拟真人操作轨迹。"""
|
|||
|
|
cx, cy = _get_element_center(page, slider_ele)
|
|||
|
|
|
|||
|
|
# 1. 鼠标移到滑块中心
|
|||
|
|
_dispatch_mouse(page, "mouseMoved", cx, cy)
|
|||
|
|
time.sleep(random.uniform(0.03, 0.08))
|
|||
|
|
|
|||
|
|
# 2. 按下
|
|||
|
|
_dispatch_mouse(page, "mousePressed", cx, cy)
|
|||
|
|
time.sleep(random.uniform(0.02, 0.06))
|
|||
|
|
|
|||
|
|
# 3. 沿轨迹移动
|
|||
|
|
cur_x, cur_y = cx, cy
|
|||
|
|
track = build_human_track(distance)
|
|||
|
|
for step in track:
|
|||
|
|
cur_x += step["dx"]
|
|||
|
|
cur_y += step["dy"]
|
|||
|
|
_dispatch_mouse(page, "mouseMoved", cur_x, cur_y)
|
|||
|
|
time.sleep(step["dt"])
|
|||
|
|
|
|||
|
|
# 4. 到达终点后短暂停留
|
|||
|
|
time.sleep(random.uniform(0.02, 0.06))
|
|||
|
|
|
|||
|
|
# 5. 释放
|
|||
|
|
_dispatch_mouse(page, "mouseReleased", cur_x, cur_y)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def find_first(page, selectors: list[str], timeout: float = 5):
|
|||
|
|
for sel in selectors:
|
|||
|
|
try:
|
|||
|
|
ele = page.ele(sel, timeout=timeout)
|
|||
|
|
if ele:
|
|||
|
|
return ele
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str:
|
|||
|
|
"""轮询等待 img 元素的 src 变为有效 data:image URL(含非空 base64 数据)。"""
|
|||
|
|
deadline = time.time() + timeout
|
|||
|
|
while time.time() < deadline:
|
|||
|
|
src = img_ele.attr("src") or ""
|
|||
|
|
if src.startswith("data:image"):
|
|||
|
|
_prefix, _, b64 = src.partition(",")
|
|||
|
|
if b64.strip():
|
|||
|
|
return src
|
|||
|
|
time.sleep(interval)
|
|||
|
|
raise RuntimeError(f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {(img_ele.attr('src') or '')[:80]}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def save_debug(debug_dir: Path, bg_bytes: bytes, piece_bytes: bytes) -> tuple[Path, Path]:
|
|||
|
|
debug_dir.mkdir(parents=True, exist_ok=True)
|
|||
|
|
bg_path = debug_dir / "captcha_bg.png"
|
|||
|
|
piece_path = debug_dir / "captcha_piece.png"
|
|||
|
|
bg_path.write_bytes(bg_bytes)
|
|||
|
|
piece_path.write_bytes(piece_bytes)
|
|||
|
|
return bg_path, piece_path
|
|||
|
|
|
|||
|
|
|
|||
|
|
def run(args: argparse.Namespace) -> None:
|
|||
|
|
from DrissionPage import ChromiumOptions, ChromiumPage
|
|||
|
|
|
|||
|
|
t_start = time.perf_counter()
|
|||
|
|
|
|||
|
|
if args.port:
|
|||
|
|
co = ChromiumOptions().set_local_port(port=args.port)
|
|||
|
|
page = ChromiumPage(addr_or_opts=co)
|
|||
|
|
else:
|
|||
|
|
page = ChromiumPage()
|
|||
|
|
|
|||
|
|
page.get(args.url)
|
|||
|
|
time.sleep(args.wait_page)
|
|||
|
|
|
|||
|
|
# 首页先勾选协议 checkbox(id=color-input-red 的复选框),再点「立即订购」
|
|||
|
|
agree_checkbox = find_first(page, [
|
|||
|
|
"css:#color-input-red",
|
|||
|
|
"css:input[name='color-input-red']",
|
|||
|
|
'x://input[@id="color-input-red"]',
|
|||
|
|
"css:input.right-box[type='checkbox']",
|
|||
|
|
], timeout=5)
|
|||
|
|
if agree_checkbox:
|
|||
|
|
click_safe(agree_checkbox)
|
|||
|
|
print("已勾选协议复选框")
|
|||
|
|
time.sleep(0.4) # 勾选后等待
|
|||
|
|
|
|||
|
|
# 立即订购:点击 div.paybg 即为立即订购
|
|||
|
|
order_btn = None
|
|||
|
|
for attempt in range(4):
|
|||
|
|
order_btn = find_first(page, [
|
|||
|
|
"css:div.paybg",
|
|||
|
|
"css:.paybg",
|
|||
|
|
'x://button[contains(.,"立即订购")]',
|
|||
|
|
'x://a[contains(.,"立即订购")]',
|
|||
|
|
'x://span[contains(.,"立即订购")]',
|
|||
|
|
'x://div[contains(.,"立即订购")]',
|
|||
|
|
'x://*[contains(text(),"立即订购")]',
|
|||
|
|
'x://*[contains(.,"立即订购")]',
|
|||
|
|
"css:.btn-order",
|
|||
|
|
"css:.order-btn",
|
|||
|
|
"css:button.btn-primary",
|
|||
|
|
"css:button.btn",
|
|||
|
|
"css:a.btn",
|
|||
|
|
], timeout=1)
|
|||
|
|
if order_btn:
|
|||
|
|
break
|
|||
|
|
time.sleep(0.25)
|
|||
|
|
if order_btn:
|
|||
|
|
try:
|
|||
|
|
order_btn.run_js("this.scrollIntoView({block:'center'})")
|
|||
|
|
time.sleep(0.05)
|
|||
|
|
except Exception:
|
|||
|
|
pass
|
|||
|
|
click_safe(order_btn)
|
|||
|
|
print("已点击立即订购")
|
|||
|
|
time.sleep(0.4)
|
|||
|
|
else:
|
|||
|
|
# 兜底1:若页面有 jQuery,用 :contains 查找
|
|||
|
|
jq_clicked = False
|
|||
|
|
try:
|
|||
|
|
jq_clicked = page.run_js("""
|
|||
|
|
if (typeof $ !== 'undefined') {
|
|||
|
|
var el = $('button, a, span, div').filter(function(){ return $(this).text().indexOf('立即订购')>=0; }).first();
|
|||
|
|
if (el.length) { el[0].scrollIntoView({block:'center'}); el[0].click(); return true; }
|
|||
|
|
el = $('*').filter(function(){ return $(this).text().trim()==='立即订购'; }).first();
|
|||
|
|
if (el.length) { el[0].scrollIntoView({block:'center'}); el[0].click(); return true; }
|
|||
|
|
}
|
|||
|
|
return false;
|
|||
|
|
""")
|
|||
|
|
if jq_clicked:
|
|||
|
|
print("已通过 jQuery 点击立即订购")
|
|||
|
|
time.sleep(0.4)
|
|||
|
|
except Exception:
|
|||
|
|
pass
|
|||
|
|
if not jq_clicked:
|
|||
|
|
clicked = page.run_js("""
|
|||
|
|
var nodes = document.querySelectorAll('button, a, span, div, input[type=button], input[type=submit]');
|
|||
|
|
for (var i = 0; i < nodes.length; i++) {
|
|||
|
|
var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
|
|||
|
|
if (t === '立即订购' || (t.indexOf('立即订购') >= 0 && t.length < 20)) {
|
|||
|
|
var el = nodes[i];
|
|||
|
|
if (el.offsetParent !== null || el.tagName === 'BODY') {
|
|||
|
|
el.scrollIntoView({block: 'center'});
|
|||
|
|
el.dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
for (var i = 0; i < nodes.length; i++) {
|
|||
|
|
var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
|
|||
|
|
if (t.indexOf('立即订购') >= 0) {
|
|||
|
|
nodes[i].scrollIntoView({block: 'center'});
|
|||
|
|
nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
return false;
|
|||
|
|
""")
|
|||
|
|
if clicked:
|
|||
|
|
print("已通过 JS 点击立即订购")
|
|||
|
|
time.sleep(0.4)
|
|||
|
|
elif not clicked:
|
|||
|
|
# 尝试在 iframe 内点击
|
|||
|
|
in_iframe = page.run_js("""
|
|||
|
|
var iframes = document.querySelectorAll('iframe');
|
|||
|
|
for (var i = 0; i < iframes.length; i++) {
|
|||
|
|
try {
|
|||
|
|
var doc = iframes[i].contentDocument || iframes[i].contentWindow.document;
|
|||
|
|
var all = doc.querySelectorAll('*');
|
|||
|
|
for (var j = 0; j < all.length; j++) {
|
|||
|
|
var t = (all[j].innerText || all[j].textContent || '').trim();
|
|||
|
|
if (t.indexOf('立即订购') >= 0) {
|
|||
|
|
all[j].scrollIntoView({block:'center'});
|
|||
|
|
all[j].dispatchEvent(new MouseEvent('click',{bubbles:true,cancelable:true,view:window}));
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
} catch(e) {}
|
|||
|
|
}
|
|||
|
|
return false;
|
|||
|
|
""")
|
|||
|
|
if in_iframe:
|
|||
|
|
print("已在 iframe 内点击立即订购")
|
|||
|
|
time.sleep(0.4)
|
|||
|
|
else:
|
|||
|
|
# 调试:输出包含「立即订购」的元素信息
|
|||
|
|
try:
|
|||
|
|
info = page.run_js("""
|
|||
|
|
var out=[], all=document.querySelectorAll('*');
|
|||
|
|
for(var i=0;i<all.length;i++){
|
|||
|
|
var t=(all[i].innerText||all[i].textContent||'').trim();
|
|||
|
|
if(t.indexOf('立即订购')>=0)
|
|||
|
|
out.push(all[i].tagName+(all[i].id?'#'+all[i].id:'')+(all[i].className?'.'+all[i].className.split(' ')[0]:''));
|
|||
|
|
}
|
|||
|
|
return out.slice(0,5).join(', ') || '无';
|
|||
|
|
""")
|
|||
|
|
print(f"调试: 包含「立即订购」的元素(前5个): {info}")
|
|||
|
|
except Exception:
|
|||
|
|
pass
|
|||
|
|
print("警告: 未找到「立即订购」按钮,尝试继续...")
|
|||
|
|
|
|||
|
|
phone_input = find_first(page, [
|
|||
|
|
'x://input[@placeholder="请输入手机号码"]',
|
|||
|
|
"css:input.inp-txt",
|
|||
|
|
], timeout=8)
|
|||
|
|
if not phone_input:
|
|||
|
|
raise RuntimeError("未找到手机号输入框")
|
|||
|
|
|
|||
|
|
phone_input.input(args.phone, clear=True)
|
|||
|
|
print(f"已输入手机号: {args.phone}")
|
|||
|
|
|
|||
|
|
if not args.skip_agree:
|
|||
|
|
agree = find_first(page, [
|
|||
|
|
"css:i.ico-checkbox",
|
|||
|
|
'x://i[contains(@class,"ico-checkbox")]',
|
|||
|
|
], timeout=2)
|
|||
|
|
if agree:
|
|||
|
|
try:
|
|||
|
|
click_safe(agree)
|
|||
|
|
print("已点击同意勾选")
|
|||
|
|
except Exception:
|
|||
|
|
print("同意勾选点击失败,继续执行")
|
|||
|
|
|
|||
|
|
send_btn = find_first(page, [
|
|||
|
|
"css:button.btn-code",
|
|||
|
|
'x://button[contains(text(),"获取验证码")]',
|
|||
|
|
], timeout=8)
|
|||
|
|
if not send_btn:
|
|||
|
|
raise RuntimeError("未找到“获取验证码”按钮")
|
|||
|
|
|
|||
|
|
click_safe(send_btn)
|
|||
|
|
print("已点击获取验证码,等待滑块弹窗")
|
|||
|
|
|
|||
|
|
# 等待验证码弹窗出现
|
|||
|
|
verify_box = find_first(page, [
|
|||
|
|
"css:.verifybox",
|
|||
|
|
"css:.verify-bar-area",
|
|||
|
|
], timeout=6)
|
|||
|
|
if not verify_box:
|
|||
|
|
raise RuntimeError("未检测到滑块验证码弹窗")
|
|||
|
|
|
|||
|
|
bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
|
|||
|
|
piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
|
|||
|
|
slider = find_first(page, ["css:.verify-move-block"], timeout=5)
|
|||
|
|
bar = find_first(page, ["css:.verify-bar-area"], timeout=5)
|
|||
|
|
|
|||
|
|
if not bg_img or not piece_img or not slider or not bar:
|
|||
|
|
raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")
|
|||
|
|
|
|||
|
|
bg_src = wait_for_data_src(bg_img, timeout=10)
|
|||
|
|
piece_src = wait_for_data_src(piece_img, timeout=10)
|
|||
|
|
|
|||
|
|
bg_bytes = parse_data_url(bg_src)
|
|||
|
|
piece_bytes = parse_data_url(piece_src)
|
|||
|
|
|
|||
|
|
if len(bg_bytes) < 100 or len(piece_bytes) < 100:
|
|||
|
|
raise RuntimeError(f"验证码图片数据异常: bg={len(bg_bytes)}B, piece={len(piece_bytes)}B")
|
|||
|
|
|
|||
|
|
if args.debug_dir:
|
|||
|
|
bg_path, piece_path = save_debug(Path(args.debug_dir), bg_bytes, piece_bytes)
|
|||
|
|
print(f"已保存验证码图片: {bg_path} | {piece_path}")
|
|||
|
|
|
|||
|
|
match = calc_drag_distance_from_bytes(bg_bytes, piece_bytes, alpha_threshold=args.alpha_threshold)
|
|||
|
|
|
|||
|
|
# 用渲染宽度做像素→屏幕映射
|
|||
|
|
bg_display_w = page.run_js(
|
|||
|
|
"""const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;""",
|
|||
|
|
bg_img,
|
|||
|
|
)
|
|||
|
|
if not bg_display_w or bg_display_w <= 0:
|
|||
|
|
bg_display_w = match["bg_width"]
|
|||
|
|
scale = float(bg_display_w) / max(1, match["bg_width"])
|
|||
|
|
move_distance = int(round(match["drag_distance"] * scale)) + int(args.distance_adjust)
|
|||
|
|
|
|||
|
|
print(
|
|||
|
|
"滑块匹配结果: "
|
|||
|
|
f"bg_width={match['bg_width']}, bg_display_w={bg_display_w}, "
|
|||
|
|
f"target_x={match['target_x']}, drag_distance={match['drag_distance']}, "
|
|||
|
|
f"scale={scale:.4f}, move_distance={move_distance}, "
|
|||
|
|
f"confidence={match['confidence_ratio']:.4f}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
drag_slider(page, slider, move_distance)
|
|||
|
|
time.sleep(args.wait_result)
|
|||
|
|
|
|||
|
|
elapsed = time.perf_counter() - t_start
|
|||
|
|
print(f"总耗时: {elapsed:.2f} 秒(程序开始 → 滑块滑动完成)")
|
|||
|
|
|
|||
|
|
# 尝试判断是否通过:遮罩是否消失
|
|||
|
|
still_visible = page.run_js(
|
|||
|
|
"""
|
|||
|
|
const m = document.querySelector('.mask');
|
|||
|
|
if (!m) return false;
|
|||
|
|
const s = window.getComputedStyle(m);
|
|||
|
|
return s.display !== 'none' && s.visibility !== 'hidden' && s.opacity !== '0';
|
|||
|
|
"""
|
|||
|
|
)
|
|||
|
|
if still_visible:
|
|||
|
|
print("拖动已执行,但验证码弹窗仍在,可能需要微调 --distance-adjust")
|
|||
|
|
else:
|
|||
|
|
print("滑块拖动完成,验证码弹窗已关闭(疑似验证通过)")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_parser() -> argparse.ArgumentParser:
|
|||
|
|
root = Path(__file__).resolve().parent
|
|||
|
|
p = argparse.ArgumentParser(description="天翼订购页 1.html 自动滑块脚本")
|
|||
|
|
p.add_argument("--url", default="http://yscnb.com/tyyp/1.html", help="目标页面 URL")
|
|||
|
|
p.add_argument("--phone", required=True, help="手机号")
|
|||
|
|
p.add_argument("--port", type=int, default=0, help="连接已有浏览器端口(可选)")
|
|||
|
|
p.add_argument("--skip-agree", action="store_true", help="跳过勾选“同意办理”")
|
|||
|
|
p.add_argument("--alpha-threshold", type=int, default=12, help="拼图 alpha 透明阈值")
|
|||
|
|
p.add_argument("--distance-adjust", type=int, default=0, help="拖动距离微调像素")
|
|||
|
|
p.add_argument("--wait-page", type=float, default=0.3, help="打开页面后等待秒数")
|
|||
|
|
p.add_argument("--wait-result", type=float, default=0.5, help="拖动后等待结果秒数")
|
|||
|
|
p.add_argument("--debug-dir", default=str(root / "captcha_debug"), help="验证码图片输出目录")
|
|||
|
|
return p
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main() -> None:
|
|||
|
|
args = build_parser().parse_args()
|
|||
|
|
run(args)
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|