diff --git a/test1.py b/test1.py index 18a0b5b..0f238b1 100644 --- a/test1.py +++ b/test1.py @@ -24,9 +24,11 @@ import base64 import random import time from io import BytesIO +from datetime import datetime +from pathlib import Path import numpy as np -from PIL import Image +from PIL import Image, ImageDraw from DrissionPage import ChromiumPage, ChromiumOptions @@ -400,6 +402,117 @@ def _wait_packet_or_feedback(page, timeout: float) -> tuple[Any, str]: return None, last_toast +def _to_jsonable(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, dict): + return {str(k): _to_jsonable(v) for k, v in value.items()} + if isinstance(value, (list, tuple, set)): + return [_to_jsonable(v) for v in value] + return str(value) + + +def _analyze_slider_failure(match: dict, layout: dict, attempts: list[dict], last_reason: str) -> dict: + hypotheses: list[str] = [] + confidence_ratio = float(match.get("confidence_ratio", 0) or 0) + + if confidence_ratio < 1.03: + hypotheses.append("拼图匹配置信度偏低,背景与缺口特征区分不明显,易出现位移误差。") + + toast_texts = [str(a.get("toast") or "") for a in attempts if a.get("toast")] + if any("验证码错误" in t for t in toast_texts): + hypotheses.append("前端返回“验证码错误”,通常是最终落点偏差或拖动轨迹被风控识别。") + + interface_errors = [str(a.get("interface_error") or "") for a in attempts if a.get("interface_error")] + if interface_errors: + hypotheses.append("接口已返回失败状态,说明滑块校验请求已发出但服务端未判定通过。") + + packet_count = sum(1 for a in attempts if a.get("packet_received")) + if attempts and packet_count == 0: + hypotheses.append("多次拖动均未捕获到目标接口,请检查接口监听地址是否变化。") + + if attempts and bool(attempts[-1].get("verify_bar_visible_after")): + hypotheses.append("验证码弹窗仍可见,当前 challenge 未完成。") + + boundary_hits = sum(1 for a in attempts if a.get("boundary_hit")) + if boundary_hits > 0: + hypotheses.append("拖动距离命中轨道边界,存在位移被截断风险。") + + if not hypotheses and last_reason: + hypotheses.append(last_reason) + if not hypotheses: + hypotheses.append("未识别出单一原因,建议查看保存的标注图和报告。") + + return { + "summary": hypotheses[0], + "hypotheses": hypotheses, + "metrics": { + "confidence_ratio": confidence_ratio, + "best_score": match.get("best_score"), + "second_best": match.get("second_best"), + "attempt_count": len(attempts), + "packet_count": packet_count, + "track_max": layout.get("track_max"), + }, + } + + +def _save_failure_artifacts( + page, + bg_bytes: bytes, + piece_bytes: bytes, + match: dict, + layout: dict, + move_distances: list[int], + attempts: list[dict], + last_reason: str, + analysis: dict, +) -> Path: + root = Path(__file__).resolve().parent / "captcha_failures" + case_dir = root / datetime.now().strftime("%Y%m%d_%H%M%S_%f") + case_dir.mkdir(parents=True, exist_ok=True) + + (case_dir / "bg.png").write_bytes(bg_bytes) + (case_dir / "piece.png").write_bytes(piece_bytes) + + try: + page.get_screenshot(path=str(case_dir / "page.png")) + except Exception: + pass + + try: + bg_img = Image.open(BytesIO(bg_bytes)).convert("RGB") + draw = ImageDraw.Draw(bg_img) + y0 = int(match.get("piece_bbox_y0", 0) or 0) + h = int(match.get("piece_bbox_h", 0) or 0) + w = int(match.get("piece_bbox_w", 0) or 0) + candidate_xs = match.get("candidate_target_xs") or [match.get("target_x", 0)] + colors = [(255, 0, 0), (255, 140, 0), (0, 128, 255), (0, 180, 80), (160, 80, 255)] + for idx, x in enumerate(candidate_xs[:5], start=1): + color = colors[(idx - 1) % len(colors)] + x = int(x) + draw.rectangle([x, y0, x + w, y0 + h], outline=color, width=2 if idx == 1 else 1) + draw.text((x + 2, max(0, y0 - 14)), f"#{idx}", fill=color) + bg_img.save(case_dir / "bg_overlay.png") + except Exception: + pass + + report = { + "created_at": datetime.now().isoformat(timespec="seconds"), + "failure_reason": last_reason, + "analysis": analysis, + "match": _to_jsonable(match), + "layout": _to_jsonable(layout), + "move_distances": move_distances, + "attempts": _to_jsonable(attempts), + } + (case_dir / "report.json").write_text( + json.dumps(report, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + return case_dir + + # ---------- 滑块拖动(仿人轨迹) ---------- def _ease_out_quad(t: float) -> float: @@ -676,20 +789,32 @@ def submit_phone( layout = _get_slider_layout_metrics(page, bg_img, slider) bg_display_w = layout["bg_display_w"] if layout["bg_display_w"] > 0 else match["bg_width"] scale = float(bg_display_w) / max(1, match["bg_width"]) + track_max = int(layout["track_max"]) move_distances = _build_move_distance_candidates( match=match, scale=scale, distance_adjust=int(distance_adjust), - track_max=int(layout["track_max"]), + track_max=track_max, ) # 6. 多候选位移拖动并等待 getYanZhenMa 响应 deadline = time.time() + 15.0 last_reason = "" + attempts: list[dict[str, Any]] = [] for idx, move_distance in enumerate(move_distances, start=1): + attempt: dict[str, Any] = { + "index": idx, + "move_distance": int(move_distance), + "boundary_hit": bool(track_max >= 0 and move_distance in (0, track_max)), + "packet_received": False, + "toast": "", + "interface_error": "", + } slider = find_first(page, ["css:.verify-move-block"], timeout=2) if not slider: + attempt["interface_error"] = "未找到滑块元素" + attempts.append(attempt) break drag_slider(page, slider, move_distance) @@ -697,13 +822,32 @@ def submit_phone( remaining = max(0.1, deadline - time.time()) packet, toast_text = _wait_packet_or_feedback(page, timeout=min(4.0, remaining)) + attempt["toast"] = toast_text or "" if packet is not None: + attempt["packet_received"] = True response = getattr(packet, "response", None) if response is None: - raise RuntimeError(f"捕获到 {GET_YAN_ZHEN_MA_URL} 数据包但无 response 字段") + attempt["interface_error"] = f"捕获到 {GET_YAN_ZHEN_MA_URL} 数据包但无 response 字段" + last_reason = attempt["interface_error"] + attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") + attempts.append(attempt) + if not attempt["verify_bar_visible_after"]: + break + continue + body = _coerce_json_body(response.body) - _assert_interface_success(body) - return body + attempt["response_body"] = _to_jsonable(body) + try: + _assert_interface_success(body) + return body + except Exception as e: + attempt["interface_error"] = str(e) + last_reason = attempt["interface_error"] + attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") + attempts.append(attempt) + if not attempt["verify_bar_visible_after"]: + break + continue if toast_text: last_reason = f"滑块验证失败:{toast_text}" @@ -711,12 +855,40 @@ def submit_phone( last_reason = f"第{idx}次拖动未触发 {GET_YAN_ZHEN_MA_URL}" else: last_reason = "验证码弹窗已关闭但未捕获接口响应" + attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") + attempt["interface_error"] = last_reason if not attempt["interface_error"] else attempt["interface_error"] + attempts.append(attempt) if time.time() >= deadline: break + analysis = _analyze_slider_failure( + match=match, + layout=layout, + attempts=attempts, + last_reason=last_reason, + ) + save_info = "" + try: + case_dir = _save_failure_artifacts( + page=page, + bg_bytes=bg_bytes, + piece_bytes=piece_bytes, + match=match, + layout=layout, + move_distances=move_distances, + attempts=attempts, + last_reason=last_reason, + analysis=analysis, + ) + save_info = f"; 失败样本目录: {case_dir}" + except Exception as e: + save_info = f"; 失败样本保存失败: {e}" + raise RuntimeError( f"滑块未通过或接口未返回成功结果: {last_reason or '未知原因'}" + f"; 分析: {analysis.get('summary', '无')}" + f"{save_info}" )