haha
This commit is contained in:
338
test1.py
338
test1.py
@@ -18,7 +18,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from typing import Any
|
||||
import json
|
||||
from typing import Any, Optional
|
||||
import base64
|
||||
import random
|
||||
import time
|
||||
@@ -54,12 +55,23 @@ def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int,
|
||||
return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1
|
||||
|
||||
|
||||
def _to_gray(rgb: np.ndarray) -> np.ndarray:
|
||||
return (
|
||||
rgb[:, :, 0] * 0.299 + rgb[:, :, 1] * 0.587 + rgb[:, :, 2] * 0.114
|
||||
).astype(np.float32)
|
||||
|
||||
|
||||
def _grad_x(gray: np.ndarray) -> np.ndarray:
|
||||
pad = np.pad(gray, ((0, 0), (1, 1)), mode="edge")
|
||||
return np.abs(pad[:, 2:] - pad[:, :-2]) * 0.5
|
||||
|
||||
|
||||
def calc_drag_distance_from_bytes(
|
||||
bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12
|
||||
) -> dict:
|
||||
"""计算拼图目标位移(基于背景图 + 拼图块图)。"""
|
||||
bg = _to_rgb_array(bg_bytes)
|
||||
piece_rgba = _to_rgba_array(piece_bytes)
|
||||
"""计算拼图目标位移,输出多个候选位移提高滑块命中率。"""
|
||||
bg = _to_rgb_array(bg_bytes).astype(np.float32)
|
||||
piece_rgba = _to_rgba_array(piece_bytes).astype(np.float32)
|
||||
|
||||
bh, bw = bg.shape[:2]
|
||||
ph, _ = piece_rgba.shape[:2]
|
||||
@@ -70,32 +82,55 @@ def calc_drag_distance_from_bytes(
|
||||
x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold)
|
||||
|
||||
piece_crop = piece_rgba[y0:y1, x0:x1, :3]
|
||||
mask = alpha[y0:y1, x0:x1] > alpha_threshold
|
||||
alpha_crop = alpha[y0:y1, x0:x1]
|
||||
mask = alpha_crop > alpha_threshold
|
||||
ys, xs = np.where(mask)
|
||||
piece_pixels = piece_crop[ys, xs]
|
||||
if len(xs) < 10:
|
||||
raise ValueError("拼图有效像素过少,无法稳定匹配")
|
||||
|
||||
piece_gray = _to_gray(piece_crop)
|
||||
bg_gray = _to_gray(bg)
|
||||
piece_grad = _grad_x(piece_gray)
|
||||
bg_grad = _grad_x(bg_gray)
|
||||
|
||||
piece_gray_pixels = piece_gray[ys, xs]
|
||||
piece_grad_pixels = piece_grad[ys, xs]
|
||||
weights = np.clip(alpha_crop[ys, xs].astype(np.float32), 1.0, 255.0)
|
||||
weights = weights / float(weights.sum())
|
||||
|
||||
patch_h, patch_w = piece_crop.shape[:2]
|
||||
if patch_w > bw or patch_h > bh:
|
||||
raise ValueError("拼图块裁剪尺寸超过背景图")
|
||||
|
||||
best_x = 0
|
||||
best_score = float("inf")
|
||||
second_best = float("inf")
|
||||
max_x = bw - patch_w
|
||||
scores: list[tuple[float, int, float, float]] = []
|
||||
|
||||
for x in range(max_x + 1):
|
||||
patch_pixels = bg[y0 + ys, x + xs]
|
||||
score = float(np.abs(patch_pixels - piece_pixels).mean())
|
||||
if score < best_score:
|
||||
second_best = best_score
|
||||
best_score = score
|
||||
best_x = x
|
||||
elif score < second_best:
|
||||
second_best = score
|
||||
patch_gray_pixels = bg_gray[y0 + ys, x + xs]
|
||||
patch_grad_pixels = bg_grad[y0 + ys, x + xs]
|
||||
color_score = float((np.abs(patch_gray_pixels - piece_gray_pixels) * weights).sum())
|
||||
grad_score = float((np.abs(patch_grad_pixels - piece_grad_pixels) * weights).sum())
|
||||
# 颜色 + 边缘加权,较纯 RGB 差值在压缩噪声下更稳定
|
||||
score = color_score * 0.72 + grad_score * 0.28
|
||||
scores.append((score, x, color_score, grad_score))
|
||||
|
||||
scores.sort(key=lambda item: item[0])
|
||||
best_score, best_x, best_color_score, best_grad_score = scores[0]
|
||||
second_best = scores[1][0] if len(scores) > 1 else float("inf")
|
||||
|
||||
drag_distance = best_x - x0
|
||||
confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf")
|
||||
|
||||
candidate_xs: list[int] = []
|
||||
for _score, x, _color, _grad in scores:
|
||||
if any(abs(x - picked) <= 1 for picked in candidate_xs):
|
||||
continue
|
||||
candidate_xs.append(int(x))
|
||||
if len(candidate_xs) >= 5:
|
||||
break
|
||||
|
||||
candidate_distances = [int(x - x0) for x in candidate_xs]
|
||||
|
||||
return {
|
||||
"target_x": int(best_x),
|
||||
"piece_bbox_x0": int(x0),
|
||||
@@ -106,8 +141,12 @@ def calc_drag_distance_from_bytes(
|
||||
"bg_height": int(bh),
|
||||
"drag_distance": int(drag_distance),
|
||||
"best_score": best_score,
|
||||
"best_color_score": best_color_score,
|
||||
"best_grad_score": best_grad_score,
|
||||
"second_best": second_best,
|
||||
"confidence_ratio": confidence_ratio,
|
||||
"candidate_target_xs": candidate_xs,
|
||||
"candidate_drag_distances": candidate_distances,
|
||||
}
|
||||
|
||||
|
||||
@@ -155,6 +194,212 @@ def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> st
|
||||
)
|
||||
|
||||
|
||||
def _is_element_visible(page, selector: str) -> bool:
|
||||
try:
|
||||
visible = page.run_js(
|
||||
"""
|
||||
const el = document.querySelector(arguments[0]);
|
||||
if (!el) return false;
|
||||
const st = window.getComputedStyle(el);
|
||||
if (st.display === 'none' || st.visibility === 'hidden' || st.opacity === '0') return false;
|
||||
const r = el.getBoundingClientRect();
|
||||
return r.width > 0 && r.height > 0;
|
||||
""",
|
||||
selector,
|
||||
)
|
||||
return bool(visible)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _read_slider_toast_text(page) -> str:
|
||||
try:
|
||||
text = page.run_js(
|
||||
"""
|
||||
const node = document.querySelector('.van-toast--text .van-toast__text')
|
||||
|| document.querySelector('.van-toast__text');
|
||||
if (!node) return '';
|
||||
const popup = node.closest('.van-toast') || node;
|
||||
const st = window.getComputedStyle(popup);
|
||||
const visible = st.display !== 'none' && st.visibility !== 'hidden' && st.opacity !== '0';
|
||||
if (!visible) return '';
|
||||
return (node.innerText || node.textContent || '').trim();
|
||||
"""
|
||||
)
|
||||
return (text or "").strip()
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def _get_slider_layout_metrics(page, bg_img, slider_ele) -> dict:
|
||||
try:
|
||||
metrics = page.run_js(
|
||||
"""
|
||||
const bg = arguments[0];
|
||||
const slider = arguments[1];
|
||||
const bar = document.querySelector('.verify-bar-area');
|
||||
const sub = slider ? slider.querySelector('.verify-sub-block') : null;
|
||||
const bgR = bg ? bg.getBoundingClientRect() : {width: 0};
|
||||
const sliderR = slider ? slider.getBoundingClientRect() : {width: 0};
|
||||
const barR = bar ? bar.getBoundingClientRect() : {width: 0};
|
||||
const subR = sub ? sub.getBoundingClientRect() : {width: 0};
|
||||
return {
|
||||
bg_display_w: Number(bgR.width || 0),
|
||||
slider_w: Number(sliderR.width || 0),
|
||||
bar_w: Number(barR.width || 0),
|
||||
sub_w: Number(subR.width || 0),
|
||||
};
|
||||
""",
|
||||
bg_img,
|
||||
slider_ele,
|
||||
)
|
||||
except Exception:
|
||||
metrics = None
|
||||
|
||||
if not isinstance(metrics, dict):
|
||||
metrics = {}
|
||||
|
||||
bar_w = float(metrics.get("bar_w", 0) or 0)
|
||||
slider_w = float(metrics.get("slider_w", 0) or 0)
|
||||
track_max = int(round(max(0.0, bar_w - slider_w))) if bar_w > 0 and slider_w > 0 else -1
|
||||
|
||||
return {
|
||||
"bg_display_w": float(metrics.get("bg_display_w", 0) or 0),
|
||||
"slider_w": slider_w,
|
||||
"bar_w": bar_w,
|
||||
"sub_w": float(metrics.get("sub_w", 0) or 0),
|
||||
"track_max": track_max,
|
||||
}
|
||||
|
||||
|
||||
def _build_move_distance_candidates(
|
||||
match: dict,
|
||||
scale: float,
|
||||
distance_adjust: int,
|
||||
track_max: int,
|
||||
) -> list[int]:
|
||||
base_distances = match.get("candidate_drag_distances") or [int(match["drag_distance"])]
|
||||
micro_offsets = (0, -1, 1, -2, 2, -3, 3)
|
||||
|
||||
candidates: list[int] = []
|
||||
seen: set[int] = set()
|
||||
|
||||
for base in base_distances[:4]:
|
||||
scaled = int(round(int(base) * scale)) + int(distance_adjust)
|
||||
for offset in micro_offsets:
|
||||
val = scaled + offset
|
||||
if track_max >= 0:
|
||||
val = max(0, min(track_max, val))
|
||||
if val not in seen:
|
||||
seen.add(val)
|
||||
candidates.append(val)
|
||||
|
||||
if not candidates:
|
||||
fallback = int(round(int(match["drag_distance"]) * scale)) + int(distance_adjust)
|
||||
if track_max >= 0:
|
||||
fallback = max(0, min(track_max, fallback))
|
||||
candidates = [fallback]
|
||||
|
||||
return candidates
|
||||
|
||||
|
||||
def _normalize_listen_packet(packet):
|
||||
if packet is False:
|
||||
return None
|
||||
if isinstance(packet, list):
|
||||
return packet[0] if packet else None
|
||||
return packet
|
||||
|
||||
|
||||
def _coerce_json_body(body: Any) -> Any:
|
||||
if not isinstance(body, str):
|
||||
return body
|
||||
raw = body.strip()
|
||||
if not raw:
|
||||
return body
|
||||
if (raw.startswith("{") and raw.endswith("}")) or (raw.startswith("[") and raw.endswith("]")):
|
||||
try:
|
||||
return json.loads(raw)
|
||||
except Exception:
|
||||
return body
|
||||
return body
|
||||
|
||||
|
||||
def _code_indicates_success(value: Any) -> Optional[bool]:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, bool):
|
||||
return value
|
||||
if isinstance(value, (int, float)):
|
||||
return int(value) in (0, 1, 200)
|
||||
|
||||
text = str(value).strip()
|
||||
if not text:
|
||||
return None
|
||||
lower = text.lower()
|
||||
if lower in {"0", "00", "000", "0000", "1", "200", "ok", "success", "true"}:
|
||||
return True
|
||||
if lower in {"false", "fail", "failed", "error", "err"}:
|
||||
return False
|
||||
if lower.lstrip("-").isdigit():
|
||||
return int(lower) in (0, 1, 200)
|
||||
return False
|
||||
|
||||
|
||||
def _assert_interface_success(body: Any) -> None:
|
||||
data = _coerce_json_body(body)
|
||||
if not isinstance(data, dict):
|
||||
return
|
||||
|
||||
fail_reasons: list[str] = []
|
||||
|
||||
for key in ("success", "ok"):
|
||||
if key in data and not bool(data[key]):
|
||||
fail_reasons.append(f"{key}={data[key]}")
|
||||
|
||||
for key in ("code", "retCode", "resultCode", "statusCode", "errno"):
|
||||
if key not in data:
|
||||
continue
|
||||
code_ok = _code_indicates_success(data[key])
|
||||
if code_ok is False:
|
||||
fail_reasons.append(f"{key}={data[key]}")
|
||||
|
||||
message = ""
|
||||
for key in ("msg", "message", "error", "errorMsg", "detail", "retMsg"):
|
||||
value = data.get(key)
|
||||
if isinstance(value, str) and value.strip():
|
||||
message = value.strip()
|
||||
break
|
||||
|
||||
if message:
|
||||
lowered = message.lower()
|
||||
fail_words = ("失败", "错误", "无效", "fail", "error", "invalid", "请重新", "未通过")
|
||||
if any(word in lowered for word in fail_words):
|
||||
fail_reasons.append(f"msg={message}")
|
||||
|
||||
if fail_reasons:
|
||||
preview = str(data)
|
||||
if len(preview) > 300:
|
||||
preview = preview[:300] + "..."
|
||||
raise RuntimeError(f"接口返回失败: {', '.join(fail_reasons)}; body={preview}")
|
||||
|
||||
|
||||
def _wait_packet_or_feedback(page, timeout: float) -> tuple[Any, str]:
|
||||
deadline = time.time() + timeout
|
||||
last_toast = ""
|
||||
while time.time() < deadline:
|
||||
wait_span = min(0.35, max(0.05, deadline - time.time()))
|
||||
packet = _normalize_listen_packet(page.listen.wait(timeout=wait_span, fit_count=False))
|
||||
if packet is not None:
|
||||
return packet, last_toast
|
||||
|
||||
toast = _read_slider_toast_text(page)
|
||||
if toast:
|
||||
last_toast = toast
|
||||
|
||||
return None, last_toast
|
||||
|
||||
|
||||
# ---------- 滑块拖动(仿人轨迹) ----------
|
||||
|
||||
def _ease_out_quad(t: float) -> float:
|
||||
@@ -428,28 +673,51 @@ def submit_phone(
|
||||
match = calc_drag_distance_from_bytes(
|
||||
bg_bytes, piece_bytes, alpha_threshold=alpha_threshold
|
||||
)
|
||||
bg_display_w = page.run_js(
|
||||
"const el = arguments[0]; const r = el.getBoundingClientRect(); return r.width;",
|
||||
bg_img,
|
||||
)
|
||||
if not bg_display_w or bg_display_w <= 0:
|
||||
bg_display_w = match["bg_width"]
|
||||
layout = _get_slider_layout_metrics(page, bg_img, slider)
|
||||
bg_display_w = layout["bg_display_w"] if layout["bg_display_w"] > 0 else match["bg_width"]
|
||||
scale = float(bg_display_w) / max(1, match["bg_width"])
|
||||
move_distance = int(round(match["drag_distance"] * scale)) + int(distance_adjust)
|
||||
move_distances = _build_move_distance_candidates(
|
||||
match=match,
|
||||
scale=scale,
|
||||
distance_adjust=int(distance_adjust),
|
||||
track_max=int(layout["track_max"]),
|
||||
)
|
||||
|
||||
# 6. 执行滑块并等待 getYanZhenMa 响应
|
||||
drag_slider(page, slider, move_distance)
|
||||
time.sleep(0.5)
|
||||
# 6. 多候选位移拖动并等待 getYanZhenMa 响应
|
||||
deadline = time.time() + 15.0
|
||||
last_reason = ""
|
||||
|
||||
packet = page.listen.wait(timeout=15, fit_count=False)
|
||||
if packet is False:
|
||||
raise RuntimeError(f"滑块拖动后 15s 内未收到 {GET_YAN_ZHEN_MA_URL} 响应")
|
||||
if isinstance(packet, list):
|
||||
packet = packet[0] if packet else None
|
||||
if packet is None:
|
||||
raise RuntimeError(f"未捕获到 {GET_YAN_ZHEN_MA_URL} 数据包")
|
||||
for idx, move_distance in enumerate(move_distances, start=1):
|
||||
slider = find_first(page, ["css:.verify-move-block"], timeout=2)
|
||||
if not slider:
|
||||
break
|
||||
|
||||
return packet.response.body
|
||||
drag_slider(page, slider, move_distance)
|
||||
time.sleep(0.2)
|
||||
|
||||
remaining = max(0.1, deadline - time.time())
|
||||
packet, toast_text = _wait_packet_or_feedback(page, timeout=min(4.0, remaining))
|
||||
if packet is not None:
|
||||
response = getattr(packet, "response", None)
|
||||
if response is None:
|
||||
raise RuntimeError(f"捕获到 {GET_YAN_ZHEN_MA_URL} 数据包但无 response 字段")
|
||||
body = _coerce_json_body(response.body)
|
||||
_assert_interface_success(body)
|
||||
return body
|
||||
|
||||
if toast_text:
|
||||
last_reason = f"滑块验证失败:{toast_text}"
|
||||
elif _is_element_visible(page, ".verify-bar-area"):
|
||||
last_reason = f"第{idx}次拖动未触发 {GET_YAN_ZHEN_MA_URL}"
|
||||
else:
|
||||
last_reason = "验证码弹窗已关闭但未捕获接口响应"
|
||||
|
||||
if time.time() >= deadline:
|
||||
break
|
||||
|
||||
raise RuntimeError(
|
||||
f"滑块未通过或接口未返回成功结果: {last_reason or '未知原因'}"
|
||||
)
|
||||
|
||||
|
||||
def submit_code(page, code: str) -> dict:
|
||||
|
||||
Reference in New Issue
Block a user