Files
fws_code/tyyp_1html_dp.py
2026-02-26 01:32:11 +08:00

550 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
站点流程自动化DrissionPage
1) 打开 http://yscnb.com/tyyp/1.html
2) 输入手机号
3) 点击“获取验证码”
4) 解析弹窗中的拼图验证码并执行拖动
说明:仅实现到“滑块拖动”步骤,不会自动填写短信验证码。
"""
from __future__ import annotations
import argparse
import base64
import random
import re
import time
from io import BytesIO
from pathlib import Path
import numpy as np
from PIL import Image
def _to_rgba_array(image_bytes: bytes) -> np.ndarray:
    """Decode encoded image bytes into an RGBA pixel array of dtype int16."""
    decoded = Image.open(BytesIO(image_bytes))
    rgba_image = decoded.convert("RGBA")
    return np.array(rgba_image, dtype=np.int16)
def _to_rgb_array(image_bytes: bytes) -> np.ndarray:
    """Decode encoded image bytes into an RGB pixel array of dtype int16."""
    decoded = Image.open(BytesIO(image_bytes))
    rgb_image = decoded.convert("RGB")
    return np.array(rgb_image, dtype=np.int16)
def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]:
mask = alpha > threshold
if not mask.any():
raise ValueError("拼图块 alpha 全透明,无法匹配")
ys, xs = np.where(mask)
return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1
def calc_drag_distance_from_bytes(bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12) -> dict:
    """Find the horizontal position where the puzzle piece best matches the background.

    The piece image is cropped to the bounding box of its non-transparent
    pixels; that crop is slid across the background at the piece's own row
    offset, and each x position is scored by the mean absolute difference
    over the non-transparent pixels (lower is better).

    Returns:
        A dict with the best x position (``target_x``), the piece bounding
        box (``piece_bbox_*``), the background size (``bg_width``,
        ``bg_height``), ``drag_distance`` (``target_x`` minus the bbox left
        edge), the best and second-best scores, and their ratio
        (``confidence_ratio``; ``inf`` when the best score is 0).

    Raises:
        ValueError: if the two images differ in height, if the piece is
            fully transparent, or if the cropped piece is larger than the
            background.
    """
    bg = _to_rgb_array(bg_bytes)
    piece_rgba = _to_rgba_array(piece_bytes)

    bh, bw = bg.shape[:2]
    ph = piece_rgba.shape[0]
    if bh != ph:
        raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}")

    alpha = piece_rgba[:, :, 3]
    x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold)

    # Keep only the visible pixels of the piece, indexed relative to the crop.
    piece_crop = piece_rgba[y0:y1, x0:x1, :3]
    ys, xs = np.where(alpha[y0:y1, x0:x1] > alpha_threshold)
    piece_pixels = piece_crop[ys, xs]

    patch_h, patch_w = piece_crop.shape[:2]
    if patch_w > bw or patch_h > bh:
        raise ValueError("拼图块裁剪尺寸超过背景图")

    best_x = 0
    best_score = second_best = float("inf")
    bg_rows = y0 + ys  # row indices are the same for every candidate x
    for candidate_x in range(bw - patch_w + 1):
        window = bg[bg_rows, candidate_x + xs]
        score = float(np.abs(window - piece_pixels).mean())
        if score < best_score:
            # The previous best becomes the runner-up.
            best_score, second_best, best_x = score, best_score, candidate_x
        elif score < second_best:
            second_best = score

    if best_score > 0:
        confidence_ratio = second_best / best_score
    else:
        confidence_ratio = float("inf")

    return {
        "target_x": int(best_x),
        "piece_bbox_x0": int(x0),
        "piece_bbox_y0": int(y0),
        "piece_bbox_w": int(patch_w),
        "piece_bbox_h": int(patch_h),
        "bg_width": int(bw),
        "bg_height": int(bh),
        "drag_distance": int(best_x - x0),
        "best_score": best_score,
        "second_best": second_best,
        "confidence_ratio": confidence_ratio,
    }
def parse_data_url(data_url: str) -> bytes:
    """Decode the base64 payload of a ``data:image...`` URL into raw bytes.

    Args:
        data_url: a string of the form ``data:image/<type>;base64,<payload>``.

    Returns:
        The decoded payload bytes.

    Raises:
        ValueError: if the string does not start with ``data:image``, has no
            comma separating the header from the payload, or the payload is
            incorrectly padded base64 (``binascii.Error`` is a ``ValueError``
            subclass).
    """
    if not data_url.startswith("data:image"):
        raise ValueError("图片不是 data:image URL")
    # partition() never raises; a missing comma is reported explicitly instead
    # of surfacing as a cryptic tuple-unpacking error.
    _header, separator, payload = data_url.partition(",")
    if not separator:
        raise ValueError("data:image URL 缺少逗号分隔的数据段")
    return base64.b64decode(payload)
def style_px(style_text: str, key: str, default: float) -> float:
    """Extract the pixel value of CSS property *key* from an inline style string.

    Args:
        style_text: inline style text such as ``"left: 12px; top: 3px"``.
        key: the exact CSS property name to look up.
        default: value returned when the style is empty or the property is
            not present with a ``px`` value.

    Returns:
        The numeric value (may be negative) as a float, or *default*.
    """
    if not style_text:
        return default
    # (?<![\w-]) keeps e.g. "left" from matching inside "margin-left".
    # The number group accepts an optional sign and a single decimal point so
    # float() can never be handed something like "1.2.3".
    pattern = rf"(?<![\w-]){re.escape(key)}\s*:\s*(-?(?:\d+(?:\.\d*)?|\.\d+))px"
    found = re.search(pattern, style_text)
    if found is None:
        return default
    return float(found.group(1))
def click_safe(ele) -> None:
    """Click *ele* normally; if that raises, retry once with a JS click.

    Any exception from the first attempt is swallowed. An exception from the
    JS fallback propagates to the caller.
    """
    native_click_ok = True
    try:
        ele.click()
    except Exception:
        native_click_ok = False
    if not native_click_ok:
        ele.click(by_js=True)
def _ease_out_quad(t: float) -> float:
return t * (2 - t)
def _ease_out_cubic(t: float) -> float:
return 1 - (1 - t) ** 3
def _ease_out_bounce(t: float) -> float:
if t < 1 / 2.75:
return 7.5625 * t * t
elif t < 2 / 2.75:
t -= 1.5 / 2.75
return 7.5625 * t * t + 0.75
elif t < 2.5 / 2.75:
t -= 2.25 / 2.75
return 7.5625 * t * t + 0.9375
else:
t -= 2.625 / 2.75
return 7.5625 * t * t + 0.984375
def build_human_track(distance: int, num_steps: int = 0) -> list[dict]:
    """Build a randomized list of drag steps whose dx values sum to *distance*.

    Each step is ``{'dx': int, 'dy': int, 'dt': float}`` where ``dt`` is a
    delay in seconds. The path eases out past the target by a small random
    overshoot and then returns to it over 2-4 steps.

    Args:
        distance: signed horizontal displacement in pixels; 0 yields ``[]``.
        num_steps: number of forward steps; a value <= 0 picks a random count
            based on the distance (at least 12).
    """
    if distance == 0:
        return []

    magnitude = abs(distance)
    direction = -1 if distance < 0 else 1
    if num_steps <= 0:
        num_steps = max(12, int(magnitude * random.uniform(0.25, 0.4)))

    overshoot_low = max(2, int(magnitude * 0.03))
    overshoot_high = max(3, int(magnitude * 0.08))
    overshoot = random.randint(overshoot_low, overshoot_high)
    peak = magnitude + overshoot  # go past the target first, then come back
    easing = random.choice([_ease_out_quad, _ease_out_cubic])

    # Forward phase: eased absolute positions from 0 up to the overshoot peak.
    positions: list[float] = [
        easing(index / num_steps) * peak for index in range(1, num_steps + 1)
    ]
    # Return phase (2-4 steps): from the peak back to the target.
    bounce_steps = random.randint(2, 4)
    positions.extend(
        peak - _ease_out_bounce(index / bounce_steps) * overshoot
        for index in range(1, bounce_steps + 1)
    )

    track: list[dict] = []
    travelled = 0.0
    for target in positions:
        dx = round(target - travelled)
        # Randomly drop some zero-movement steps.
        if dx == 0 and random.random() < 0.3:
            continue
        travelled += dx
        dy = random.choice([-1, 0, 0, 0, 1])
        # Shorter delays during the first 60% of the distance, longer afterwards.
        if travelled < magnitude * 0.6:
            dt = random.uniform(0.005, 0.012)
        else:
            dt = random.uniform(0.008, 0.025)
        if random.random() < 0.03:
            dt += random.uniform(0.02, 0.06)
        track.append({"dx": direction * dx, "dy": dy, "dt": dt})

    # Final correction so the dx values sum exactly to the requested distance.
    residual = distance - sum(step["dx"] for step in track)
    if residual != 0:
        track.append({"dx": residual, "dy": 0, "dt": random.uniform(0.01, 0.03)})
    return track
def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None:
"""通过 CDP Input.dispatchMouseEvent 发送鼠标事件。"""
page.run_cdp(
"Input.dispatchMouseEvent",
type=event_type,
x=x,
y=y,
button=button,
clickCount=1 if event_type == "mousePressed" else 0,
)
def _get_element_center(page, ele) -> tuple[int, int]:
"""获取元素在视口中的中心坐标。"""
rect = page.run_js(
"""const r = arguments[0].getBoundingClientRect();
return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}""",
ele,
)
if rect and isinstance(rect, dict):
return int(rect["x"]), int(rect["y"])
# 回退:通过元素属性
loc = ele.rect.midpoint
return int(loc[0]), int(loc[1])
def drag_slider(page, slider_ele, distance: int) -> None:
    """Drag *slider_ele* horizontally by *distance* pixels using CDP mouse events.

    Sequence: move to the element center, press, follow the track produced by
    ``build_human_track``, pause briefly, release at the final position.
    """
    start_x, start_y = _get_element_center(page, slider_ele)

    # 1. Move the pointer onto the slider center.
    _dispatch_mouse(page, "mouseMoved", start_x, start_y)
    time.sleep(random.uniform(0.03, 0.08))

    # 2. Press the button.
    _dispatch_mouse(page, "mousePressed", start_x, start_y)
    time.sleep(random.uniform(0.02, 0.06))

    # 3. Follow the generated track, accumulating the relative offsets.
    pointer_x, pointer_y = start_x, start_y
    for move in build_human_track(distance):
        pointer_x += move["dx"]
        pointer_y += move["dy"]
        _dispatch_mouse(page, "mouseMoved", pointer_x, pointer_y)
        time.sleep(move["dt"])

    # 4. Short pause at the end position.
    time.sleep(random.uniform(0.02, 0.06))

    # 5. Release where the pointer ended up.
    _dispatch_mouse(page, "mouseReleased", pointer_x, pointer_y)
def find_first(page, selectors: list[str], timeout: float = 5):
    """Return the first truthy element found by trying *selectors* in order.

    Each selector is looked up with ``page.ele(selector, timeout=timeout)``;
    a selector whose lookup raises is skipped. Returns ``None`` when no
    selector yields a truthy element.
    """
    for selector in selectors:
        try:
            candidate = page.ele(selector, timeout=timeout)
            matched = bool(candidate)
        except Exception:
            matched = False
        if matched:
            return candidate
    return None
def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str:
    """Poll an img element until its ``src`` is a data:image URL with a non-empty payload.

    Args:
        img_ele: element exposing ``attr("src")``.
        timeout: maximum time to wait, in seconds. The attribute is always
            checked at least once, even when *timeout* is 0 or negative.
        interval: delay between polls, in seconds.

    Returns:
        The ``src`` value once it starts with ``data:image`` and has
        non-blank text after the first comma.

    Raises:
        RuntimeError: if no valid ``src`` appears before the timeout; the
            message includes the first 80 characters of the last value seen.
    """
    # monotonic() is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout
    while True:
        src = img_ele.attr("src") or ""
        if src.startswith("data:image"):
            _prefix, _, b64 = src.partition(",")
            if b64.strip():
                return src
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(interval, remaining)))
    raise RuntimeError(f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {src[:80]}")
def save_debug(debug_dir: Path, bg_bytes: bytes, piece_bytes: bytes) -> tuple[Path, Path]:
    """Write both captcha images into *debug_dir* and return their paths.

    The directory (including parents) is created if missing. The background
    is stored as ``captcha_bg.png`` and the piece as ``captcha_piece.png``;
    the returned tuple is ``(bg_path, piece_path)``.
    """
    debug_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for filename, payload in (
        ("captcha_bg.png", bg_bytes),
        ("captcha_piece.png", piece_bytes),
    ):
        target = debug_dir / filename
        target.write_bytes(payload)
        written.append(target)
    return written[0], written[1]
def run(args: argparse.Namespace) -> None:
    """Drive the whole page flow once, up to and including the slider drag.

    Reads from *args*: ``port``, ``url``, ``wait_page``, ``phone``,
    ``skip_agree``, ``debug_dir``, ``alpha_threshold``, ``distance_adjust``
    and ``wait_result``.

    Steps: open the page, tick the agreement checkbox, click the
    "立即订购" button (with several JS fallbacks), type the phone number,
    click "获取验证码", read the captcha images from their data: URLs,
    compute the drag distance, drag the slider, and finally report whether a
    ``.mask`` element is still visible.

    Raises:
        RuntimeError: if the phone input, the send-code button, the captcha
            popup, or any captcha element cannot be found, or if the decoded
            captcha images are shorter than 100 bytes.
    """
    # Imported lazily so the rest of the module can be imported without DrissionPage.
    from DrissionPage import ChromiumOptions, ChromiumPage
    t_start = time.perf_counter()
    if args.port:
        # Attach to an already running browser on the given debugging port.
        co = ChromiumOptions().set_local_port(port=args.port)
        page = ChromiumPage(addr_or_opts=co)
    else:
        page = ChromiumPage()
    page.get(args.url)
    time.sleep(args.wait_page)
    # On the landing page, first tick the agreement checkbox
    # (id=color-input-red), then click the "立即订购" (order now) button.
    agree_checkbox = find_first(page, [
        "css:#color-input-red",
        "css:input[name='color-input-red']",
        'x://input[@id="color-input-red"]',
        "css:input.right-box[type='checkbox']",
    ], timeout=5)
    if agree_checkbox:
        click_safe(agree_checkbox)
        print("已勾选协议复选框")
        time.sleep(0.4)  # wait after ticking the checkbox
    # Order button: div.paybg is the "立即订购" button; the remaining
    # selectors are progressively looser alternatives.
    order_btn = None
    for attempt in range(4):
        order_btn = find_first(page, [
            "css:div.paybg",
            "css:.paybg",
            'x://button[contains(.,"立即订购")]',
            'x://a[contains(.,"立即订购")]',
            'x://span[contains(.,"立即订购")]',
            'x://div[contains(.,"立即订购")]',
            'x://*[contains(text(),"立即订购")]',
            'x://*[contains(.,"立即订购")]',
            "css:.btn-order",
            "css:.order-btn",
            "css:button.btn-primary",
            "css:button.btn",
            "css:a.btn",
        ], timeout=1)
        if order_btn:
            break
        time.sleep(0.25)
    if order_btn:
        try:
            order_btn.run_js("this.scrollIntoView({block:'center'})")
            time.sleep(0.05)
        except Exception:
            # Scrolling is best-effort; the click below is still attempted.
            pass
        click_safe(order_btn)
        print("已点击立即订购")
        time.sleep(0.4)
    else:
        # Fallback 1: if the page exposes jQuery ($), find the button by its text.
        jq_clicked = False
        try:
            jq_clicked = page.run_js("""
if (typeof $ !== 'undefined') {
var el = $('button, a, span, div').filter(function(){ return $(this).text().indexOf('立即订购')>=0; }).first();
if (el.length) { el[0].scrollIntoView({block:'center'}); el[0].click(); return true; }
el = $('*').filter(function(){ return $(this).text().trim()==='立即订购'; }).first();
if (el.length) { el[0].scrollIntoView({block:'center'}); el[0].click(); return true; }
}
return false;
""")
            if jq_clicked:
                print("已通过 jQuery 点击立即订购")
                time.sleep(0.4)
        except Exception:
            pass
        if not jq_clicked:
            # Fallback 2: plain DOM scan. The first pass prefers an exact or
            # short text match on an element with a non-null offsetParent;
            # the second pass accepts any element containing the text.
            clicked = page.run_js("""
var nodes = document.querySelectorAll('button, a, span, div, input[type=button], input[type=submit]');
for (var i = 0; i < nodes.length; i++) {
var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
if (t === '立即订购' || (t.indexOf('立即订购') >= 0 && t.length < 20)) {
var el = nodes[i];
if (el.offsetParent !== null || el.tagName === 'BODY') {
el.scrollIntoView({block: 'center'});
el.dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
return true;
}
}
}
for (var i = 0; i < nodes.length; i++) {
var t = (nodes[i].innerText || nodes[i].textContent || '').trim();
if (t.indexOf('立即订购') >= 0) {
nodes[i].scrollIntoView({block: 'center'});
nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window}));
return true;
}
}
return false;
""")
            if clicked:
                print("已通过 JS 点击立即订购")
                time.sleep(0.4)
            elif not clicked:
                # Fallback 3: try to click inside iframes whose document is accessible.
                in_iframe = page.run_js("""
var iframes = document.querySelectorAll('iframe');
for (var i = 0; i < iframes.length; i++) {
try {
var doc = iframes[i].contentDocument || iframes[i].contentWindow.document;
var all = doc.querySelectorAll('*');
for (var j = 0; j < all.length; j++) {
var t = (all[j].innerText || all[j].textContent || '').trim();
if (t.indexOf('立即订购') >= 0) {
all[j].scrollIntoView({block:'center'});
all[j].dispatchEvent(new MouseEvent('click',{bubbles:true,cancelable:true,view:window}));
return true;
}
}
} catch(e) {}
}
return false;
""")
                if in_iframe:
                    print("已在 iframe 内点击立即订购")
                    time.sleep(0.4)
                else:
                    # Debug aid: print up to five elements whose text contains "立即订购".
                    try:
                        info = page.run_js("""
var out=[], all=document.querySelectorAll('*');
for(var i=0;i<all.length;i++){
var t=(all[i].innerText||all[i].textContent||'').trim();
if(t.indexOf('立即订购')>=0)
out.push(all[i].tagName+(all[i].id?'#'+all[i].id:'')+(all[i].className?'.'+all[i].className.split(' ')[0]:''));
}
return out.slice(0,5).join(', ') || '';
""")
                        print(f"调试: 包含「立即订购」的元素(前5个): {info}")
                    except Exception:
                        pass
                    # Not fatal: the flow continues and fails later if the
                    # phone input never appears.
                    print("警告: 未找到「立即订购」按钮,尝试继续...")
    phone_input = find_first(page, [
        'x://input[@placeholder="请输入手机号码"]',
        "css:input.inp-txt",
    ], timeout=8)
    if not phone_input:
        raise RuntimeError("未找到手机号输入框")
    phone_input.input(args.phone, clear=True)
    print(f"已输入手机号: {args.phone}")
    if not args.skip_agree:
        # Optional consent checkbox next to the phone form.
        agree = find_first(page, [
            "css:i.ico-checkbox",
            'x://i[contains(@class,"ico-checkbox")]',
        ], timeout=2)
        if agree:
            try:
                click_safe(agree)
                print("已点击同意勾选")
            except Exception:
                print("同意勾选点击失败,继续执行")
    send_btn = find_first(page, [
        "css:button.btn-code",
        'x://button[contains(text(),"获取验证码")]',
    ], timeout=8)
    if not send_btn:
        raise RuntimeError("未找到“获取验证码”按钮")
    click_safe(send_btn)
    print("已点击获取验证码,等待滑块弹窗")
    # Wait for the captcha popup to appear.
    verify_box = find_first(page, [
        "css:.verifybox",
        "css:.verify-bar-area",
    ], timeout=6)
    if not verify_box:
        raise RuntimeError("未检测到滑块验证码弹窗")
    bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5)
    piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5)
    slider = find_first(page, ["css:.verify-move-block"], timeout=5)
    # `bar` is only checked for presence below; it is not used afterwards.
    bar = find_first(page, ["css:.verify-bar-area"], timeout=5)
    if not bg_img or not piece_img or not slider or not bar:
        raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)")
    bg_src = wait_for_data_src(bg_img, timeout=10)
    piece_src = wait_for_data_src(piece_img, timeout=10)
    bg_bytes = parse_data_url(bg_src)
    piece_bytes = parse_data_url(piece_src)
    # Sanity check: payloads under 100 bytes are treated as invalid images.
    if len(bg_bytes) < 100 or len(piece_bytes) < 100:
        raise RuntimeError(f"验证码图片数据异常: bg={len(bg_bytes)}B, piece={len(piece_bytes)}B")
    if args.debug_dir:
        bg_path, piece_path = save_debug(Path(args.debug_dir), bg_bytes, piece_bytes)
        print(f"已保存验证码图片: {bg_path} | {piece_path}")
    match = calc_drag_distance_from_bytes(bg_bytes, piece_bytes, alpha_threshold=args.alpha_threshold)
    # Map image pixels to on-screen pixels using the rendered width of the background.
    bg_display_w = page.run_js(
        """const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;""",
        bg_img,
    )
    if not bg_display_w or bg_display_w <= 0:
        # No usable rendered width: fall back to a 1:1 scale.
        bg_display_w = match["bg_width"]
    scale = float(bg_display_w) / max(1, match["bg_width"])
    move_distance = int(round(match["drag_distance"] * scale)) + int(args.distance_adjust)
    print(
        "滑块匹配结果: "
        f"bg_width={match['bg_width']}, bg_display_w={bg_display_w}, "
        f"target_x={match['target_x']}, drag_distance={match['drag_distance']}, "
        f"scale={scale:.4f}, move_distance={move_distance}, "
        f"confidence={match['confidence_ratio']:.4f}"
    )
    drag_slider(page, slider, move_distance)
    time.sleep(args.wait_result)
    elapsed = time.perf_counter() - t_start
    print(f"总耗时: {elapsed:.2f} 秒(程序开始 → 滑块滑动完成)")
    # Heuristic pass check: is the `.mask` overlay still visible?
    still_visible = page.run_js(
        """
const m = document.querySelector('.mask');
if (!m) return false;
const s = window.getComputedStyle(m);
return s.display !== 'none' && s.visibility !== 'hidden' && s.opacity !== '0';
"""
    )
    if still_visible:
        print("拖动已执行,但验证码弹窗仍在,可能需要微调 --distance-adjust")
    else:
        print("滑块拖动完成,验证码弹窗已关闭(疑似验证通过)")
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script.

    ``--phone`` is the only required option. ``--debug-dir`` defaults to a
    ``captcha_debug`` directory next to this file.
    """
    script_dir = Path(__file__).resolve().parent
    parser = argparse.ArgumentParser(description="天翼订购页 1.html 自动滑块脚本")
    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = [
        ("--url", {"default": "http://yscnb.com/tyyp/1.html", "help": "目标页面 URL"}),
        ("--phone", {"required": True, "help": "手机号"}),
        ("--port", {"type": int, "default": 0, "help": "连接已有浏览器端口(可选)"}),
        ("--skip-agree", {"action": "store_true", "help": "跳过勾选“同意办理”"}),
        ("--alpha-threshold", {"type": int, "default": 12, "help": "拼图 alpha 透明阈值"}),
        ("--distance-adjust", {"type": int, "default": 0, "help": "拖动距离微调像素"}),
        ("--wait-page", {"type": float, "default": 0.3, "help": "打开页面后等待秒数"}),
        ("--wait-result", {"type": float, "default": 0.5, "help": "拖动后等待结果秒数"}),
        ("--debug-dir", {"default": str(script_dir / "captcha_debug"), "help": "验证码图片输出目录"}),
    ]
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser
def main() -> None:
    """Parse the command line and execute the automation flow."""
    run(build_parser().parse_args())
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()