From ee32d27b7faf188cc85cad19ec61c220d8cbb6a5 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Fri, 27 Feb 2026 17:05:29 +0800 Subject: [PATCH] haha --- test1.py | 1461 ++-------------------------------------- tyyp_app/__init__.py | 14 + tyyp_app/api.py | 87 +++ tyyp_app/automation.py | 1017 ++++++++++++++++++++++++++++ tyyp_app/cli.py | 49 ++ tyyp_app/config.py | 38 ++ tyyp_app/schemas.py | 37 + tyyp_app/services.py | 289 ++++++++ 8 files changed, 1577 insertions(+), 1415 deletions(-) create mode 100644 tyyp_app/__init__.py create mode 100644 tyyp_app/api.py create mode 100644 tyyp_app/automation.py create mode 100644 tyyp_app/cli.py create mode 100644 tyyp_app/config.py create mode 100644 tyyp_app/schemas.py create mode 100644 tyyp_app/services.py diff --git a/test1.py b/test1.py index b32f6d2..55b8afb 100644 --- a/test1.py +++ b/test1.py @@ -1,1425 +1,56 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -""" -天翼云盘订购页 http://yscnb.com/tyyp/ 自动化脚本(全部代码自包含) - -自动化流程: -1. 打开页面 -2. 勾选协议 -3. 点击「立即订购」 -4. 输入手机号 -5. 点击「获取验证码」 -6. 解析拼图验证码并执行滑块拖动 -7. 监听 hn_userEquitys/getYanZhenMa/v2 响应并输出 - -用法:python test1.py [--phone 手机号] - 调试时可在代码中修改 DEFAULT_PHONE -""" +"""兼容入口:保留 test1.py 的运行方式,实际实现位于 tyyp_app 包。""" from __future__ import annotations -import argparse -import json -import re -import threading -import uuid -from typing import Any, Optional -import base64 -import random -import time -from io import BytesIO -from datetime import datetime -from pathlib import Path +from tyyp_app.api import app +from tyyp_app.automation import input_code, submit_code, submit_phone +from tyyp_app.cli import main +from tyyp_app.config import ( + DEFAULT_PHONE, + DEFAULT_TARGET_URL, + FLOW_CLEANUP_INTERVAL_SECONDS, + GET_YAN_ZHEN_MA_URL, + MOBILE_UA_POOL, + ORDER_PACKET_URL, + PROJECT_ROOT, + PROXY_FETCH_HEADERS, + PROXY_SOURCE_URL, + SESSION_IDLE_SECONDS, +) +from tyyp_app.schemas import ( + ApiSubmitCodeRequest, + ApiSubmitCodeResponse, + ApiSubmitPhoneRequest, + ApiSubmitPhoneResponse, +) +from tyyp_app.services import FlowService, FlowSessionStore, ProxyProvider -import numpy as np -from PIL import Image, ImageDraw -import requests - -from DrissionPage import ChromiumPage, ChromiumOptions -from fastapi import FastAPI, HTTPException -from pydantic import BaseModel, Field - -from tgebrowser_client import TgeBrowserClient - -# ========== 调试用:在代码中直接指定手机号 ========== -DEFAULT_PHONE = "17375712810" - -# 需要监听的 URL 特征(滑块通过后前端会请求此接口) -GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2" -ORDER_PACKET_URL = "hn_userEquitys/common/order" -DEFAULT_TARGET_URL = "http://yscnb.com/tyyp/" -PROXY_SOURCE_URL = "http://47.109.106.79:7002/ProxIpServiceTxt" -SESSION_IDLE_SECONDS = 180 - -MOBILE_UA_POOL = [ - "Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", - "Mozilla/5.0 (Linux; Android 13; M2012K11AC) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Mobile Safari/537.36", - "Mozilla/5.0 (Linux; Android 12; 2201123C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", - "Mozilla/5.0 (Linux; Android 13; SM-S9180) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", - "Mozilla/5.0 (Linux; Android 12; V2227A) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", - "Mozilla/5.0 (iPhone; CPU iPhone OS 18_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Mobile/15E148 Safari/604.1", +__all__ = [ + "app", + "main", + "submit_phone", + "submit_code", + "input_code", + "ApiSubmitPhoneRequest", + "ApiSubmitPhoneResponse", + "ApiSubmitCodeRequest", + "ApiSubmitCodeResponse", + "FlowService", + "FlowSessionStore", + "ProxyProvider", + "DEFAULT_PHONE", + "DEFAULT_TARGET_URL", + "GET_YAN_ZHEN_MA_URL", + "ORDER_PACKET_URL", + "PROXY_SOURCE_URL", + "SESSION_IDLE_SECONDS", + "FLOW_CLEANUP_INTERVAL_SECONDS", + "MOBILE_UA_POOL", + "PROXY_FETCH_HEADERS", + "PROJECT_ROOT", ] -PROXY_FETCH_HEADERS = { - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", - "Cache-Control": "no-cache", - "DNT": "1", - "Pragma": "no-cache", - "Proxy-Connection": "keep-alive", - "Upgrade-Insecure-Requests": "1", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0", -} - - -# ---------- 拼图验证码计算 ---------- - -def _to_rgba_array(image_bytes: bytes) -> np.ndarray: - return np.array(Image.open(BytesIO(image_bytes)).convert("RGBA"), dtype=np.int16) - - -def _to_rgb_array(image_bytes: bytes) -> np.ndarray: - return np.array(Image.open(BytesIO(image_bytes)).convert("RGB"), dtype=np.int16) - - -def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]: - mask = alpha > threshold - if not mask.any(): - raise ValueError("拼图块 alpha 全透明,无法匹配") - ys, xs = np.where(mask) - return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1 - - -def _to_gray(rgb: np.ndarray) -> np.ndarray: - return ( - rgb[:, :, 0] * 0.299 + rgb[:, :, 1] * 0.587 + rgb[:, :, 2] * 0.114 - ).astype(np.float32) - - -def _grad_x(gray: np.ndarray) -> np.ndarray: - pad = np.pad(gray, ((0, 0), (1, 1)), mode="edge") - return np.abs(pad[:, 2:] - pad[:, :-2]) * 0.5 - - -def calc_drag_distance_from_bytes( - bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12 -) -> dict: - """计算拼图目标位移,输出多个候选位移提高滑块命中率。""" - bg = _to_rgb_array(bg_bytes).astype(np.float32) - piece_rgba = _to_rgba_array(piece_bytes).astype(np.float32) - - bh, bw = bg.shape[:2] - ph, _ = piece_rgba.shape[:2] - if bh != ph: - raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}") - - alpha = piece_rgba[:, :, 3] - x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold) - - piece_crop = piece_rgba[y0:y1, x0:x1, :3] - alpha_crop = alpha[y0:y1, x0:x1] - mask = alpha_crop > alpha_threshold - ys, xs = np.where(mask) - if len(xs) < 10: - raise ValueError("拼图有效像素过少,无法稳定匹配") - - piece_gray = _to_gray(piece_crop) - bg_gray = _to_gray(bg) - piece_grad = _grad_x(piece_gray) - bg_grad = _grad_x(bg_gray) - - piece_gray_pixels = piece_gray[ys, xs] - piece_grad_pixels = piece_grad[ys, xs] - weights = np.clip(alpha_crop[ys, xs].astype(np.float32), 1.0, 255.0) - weights = weights / float(weights.sum()) - - patch_h, patch_w = piece_crop.shape[:2] - if patch_w > bw or patch_h > bh: - raise ValueError("拼图块裁剪尺寸超过背景图") - - max_x = bw - patch_w - scores: list[tuple[float, int, float, float]] = [] - - for x in range(max_x + 1): - patch_gray_pixels = bg_gray[y0 + ys, x + xs] - patch_grad_pixels = bg_grad[y0 + ys, x + xs] - color_score = float((np.abs(patch_gray_pixels - piece_gray_pixels) * weights).sum()) - grad_score = float((np.abs(patch_grad_pixels - piece_grad_pixels) * weights).sum()) - # 颜色 + 边缘加权,较纯 RGB 差值在压缩噪声下更稳定 - score = color_score * 0.72 + grad_score * 0.28 - scores.append((score, x, color_score, grad_score)) - - scores.sort(key=lambda item: item[0]) - best_score, best_x, best_color_score, best_grad_score = scores[0] - second_best = scores[1][0] if len(scores) > 1 else float("inf") - - drag_distance = best_x - x0 - confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf") - - candidate_xs: list[int] = [] - for _score, x, _color, _grad in scores: - if any(abs(x - picked) <= 1 for picked in candidate_xs): - continue - candidate_xs.append(int(x)) - if len(candidate_xs) >= 5: - break - - candidate_distances = [int(x - x0) for x in candidate_xs] - - return { - "target_x": int(best_x), - "piece_bbox_x0": int(x0), - "piece_bbox_y0": int(y0), - "piece_bbox_w": int(patch_w), - "piece_bbox_h": int(patch_h), - "bg_width": int(bw), - "bg_height": int(bh), - "drag_distance": int(drag_distance), - "best_score": best_score, - "best_color_score": best_color_score, - "best_grad_score": best_grad_score, - "second_best": second_best, - "confidence_ratio": confidence_ratio, - "candidate_target_xs": candidate_xs, - "candidate_drag_distances": candidate_distances, - } - - -def parse_data_url(data_url: str) -> bytes: - if not data_url.startswith("data:image"): - raise ValueError("图片不是 data:image URL") - _, data = data_url.split(",", 1) - return base64.b64decode(data) - - -# ---------- 元素查找与点击 ---------- - -def click_safe(ele) -> None: - try: - ele.click() - return - except Exception: - pass - ele.click(by_js=True) - - -def find_first(page, selectors: list[str], timeout: float = 5): - for sel in selectors: - try: - ele = page.ele(sel, timeout=timeout) - if ele: - return ele - except Exception: - continue - return None - - -def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str: - """轮询等待 img 元素的 src 变为有效 data:image URL。""" - deadline = time.time() + timeout - while time.time() < deadline: - src = img_ele.attr("src") or "" - if src.startswith("data:image"): - _prefix, _, b64 = src.partition(",") - if b64.strip(): - return src - time.sleep(interval) - raise RuntimeError( - f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {(img_ele.attr('src') or '')[:80]}" - ) - - -def _is_element_visible(page, selector: str) -> bool: - try: - visible = page.run_js( - """ - const el = document.querySelector(arguments[0]); - if (!el) return false; - const st = window.getComputedStyle(el); - if (st.display === 'none' || st.visibility === 'hidden' || st.opacity === '0') return false; - const r = el.getBoundingClientRect(); - return r.width > 0 && r.height > 0; - """, - selector, - ) - return bool(visible) - except Exception: - return False - - -def _read_slider_toast_text(page) -> str: - try: - text = page.run_js( - """ - const node = document.querySelector('.van-toast--text .van-toast__text') - || document.querySelector('.van-toast__text'); - if (!node) return ''; - const popup = node.closest('.van-toast') || node; - const st = window.getComputedStyle(popup); - const visible = st.display !== 'none' && st.visibility !== 'hidden' && st.opacity !== '0'; - if (!visible) return ''; - return (node.innerText || node.textContent || '').trim(); - """ - ) - return (text or "").strip() - except Exception: - return "" - - -def _get_slider_layout_metrics(page, bg_img, slider_ele) -> dict: - try: - metrics = page.run_js( - """ - const bg = arguments[0]; - const slider = arguments[1]; - const bar = document.querySelector('.verify-bar-area'); - const sub = slider ? slider.querySelector('.verify-sub-block') : null; - const bgR = bg ? bg.getBoundingClientRect() : {width: 0}; - const sliderR = slider ? slider.getBoundingClientRect() : {width: 0}; - const barR = bar ? bar.getBoundingClientRect() : {width: 0}; - const subR = sub ? sub.getBoundingClientRect() : {width: 0}; - return { - bg_display_w: Number(bgR.width || 0), - slider_w: Number(sliderR.width || 0), - bar_w: Number(barR.width || 0), - sub_w: Number(subR.width || 0), - }; - """, - bg_img, - slider_ele, - ) - except Exception: - metrics = None - - if not isinstance(metrics, dict): - metrics = {} - - bar_w = float(metrics.get("bar_w", 0) or 0) - slider_w = float(metrics.get("slider_w", 0) or 0) - track_max = int(round(max(0.0, bar_w - slider_w))) if bar_w > 0 and slider_w > 0 else -1 - - return { - "bg_display_w": float(metrics.get("bg_display_w", 0) or 0), - "slider_w": slider_w, - "bar_w": bar_w, - "sub_w": float(metrics.get("sub_w", 0) or 0), - "track_max": track_max, - } - - -def _build_move_distance_candidates( - match: dict, - scale: float, - distance_adjust: int, - track_max: int, -) -> list[int]: - base_distances = match.get("candidate_drag_distances") or [int(match["drag_distance"])] - micro_offsets = (0, -1, 1, -2, 2, -3, 3) - - candidates: list[int] = [] - seen: set[int] = set() - - for base in base_distances[:4]: - scaled = int(round(int(base) * scale)) + int(distance_adjust) - for offset in micro_offsets: - val = scaled + offset - if track_max >= 0: - val = max(0, min(track_max, val)) - if val not in seen: - seen.add(val) - candidates.append(val) - - if not candidates: - fallback = int(round(int(match["drag_distance"]) * scale)) + int(distance_adjust) - if track_max >= 0: - fallback = max(0, min(track_max, fallback)) - candidates = [fallback] - - return candidates - - -def _normalize_listen_packet(packet): - if packet is False: - return None - if isinstance(packet, list): - return packet[0] if packet else None - return packet - - -def _coerce_json_body(body: Any) -> Any: - if not isinstance(body, str): - return body - raw = body.strip() - if not raw: - return body - if (raw.startswith("{") and raw.endswith("}")) or (raw.startswith("[") and raw.endswith("]")): - try: - return json.loads(raw) - except Exception: - return body - return body - - -def _code_indicates_success(value: Any) -> Optional[bool]: - if value is None: - return None - if isinstance(value, bool): - return value - if isinstance(value, (int, float)): - return int(value) in (0, 1, 200) - - text = str(value).strip() - if not text: - return None - lower = text.lower() - if lower in {"0", "00", "000", "0000", "1", "200", "ok", "success", "true"}: - return True - if lower in {"false", "fail", "failed", "error", "err"}: - return False - if lower.lstrip("-").isdigit(): - return int(lower) in (0, 1, 200) - return False - - -def _to_int_or_none(value: Any) -> Optional[int]: - try: - if value is None: - return None - if isinstance(value, bool): - return int(value) - if isinstance(value, (int, float)): - return int(value) - text = str(value).strip() - if not text: - return None - if text.lstrip("-").isdigit(): - return int(text) - return None - except Exception: - return None - - -def _extract_interface_message(data: dict) -> str: - for key in ("msg", "message", "error", "errorMsg", "detail", "retMsg"): - value = data.get(key) - if isinstance(value, str) and value.strip(): - return value.strip() - return "" - - -def _is_sms_already_sent_response(data: dict, message: str) -> bool: - code_keys = ("code", "retCode", "resultCode", "statusCode", "errno") - code_val = None - for key in code_keys: - if key in data: - code_val = _to_int_or_none(data.get(key)) - if code_val is not None: - break - if code_val != 1001: - return False - - text = (message or "").strip() - if not text: - return False - # 这类文案通常表示验证码已下发或短信频控,不应判定为滑块失败。 - hints = ("验证码已发送", "短信验证码已发送", "请稍后重试", "请稍等") - return any(h in text for h in hints) - - -def _assert_interface_success(body: Any) -> None: - data = _coerce_json_body(body) - if not isinstance(data, dict): - return - - fail_reasons: list[str] = [] - message = _extract_interface_message(data) - is_soft_success = _is_sms_already_sent_response(data, message) - - for key in ("success", "ok"): - if key in data and not bool(data[key]): - fail_reasons.append(f"{key}={data[key]}") - - for key in ("code", "retCode", "resultCode", "statusCode", "errno"): - if key not in data: - continue - code_ok = _code_indicates_success(data[key]) - if code_ok is False: - if is_soft_success: - continue - fail_reasons.append(f"{key}={data[key]}") - - if message: - lowered = message.lower() - fail_words = ("失败", "错误", "无效", "fail", "error", "invalid", "请重新", "未通过") - if any(word in lowered for word in fail_words): - if is_soft_success: - return - fail_reasons.append(f"msg={message}") - - if fail_reasons: - preview = str(data) - if len(preview) > 300: - preview = preview[:300] + "..." - raise RuntimeError(f"接口返回失败: {', '.join(fail_reasons)}; body={preview}") - - -def _wait_packet_or_feedback(page, timeout: float) -> tuple[Any, str]: - deadline = time.time() + timeout - last_toast = "" - while time.time() < deadline: - wait_span = min(0.35, max(0.05, deadline - time.time())) - packet = _normalize_listen_packet(page.listen.wait(timeout=wait_span, fit_count=False)) - if packet is not None: - return packet, last_toast - - toast = _read_slider_toast_text(page) - if toast: - last_toast = toast - - return None, last_toast - - -def _to_jsonable(value: Any) -> Any: - if value is None or isinstance(value, (str, int, float, bool)): - return value - if isinstance(value, dict): - return {str(k): _to_jsonable(v) for k, v in value.items()} - if isinstance(value, (list, tuple, set)): - return [_to_jsonable(v) for v in value] - return str(value) - - -def _analyze_slider_failure(match: dict, layout: dict, attempts: list[dict], last_reason: str) -> dict: - hypotheses: list[str] = [] - confidence_ratio = float(match.get("confidence_ratio", 0) or 0) - - if confidence_ratio < 1.03: - hypotheses.append("拼图匹配置信度偏低,背景与缺口特征区分不明显,易出现位移误差。") - - toast_texts = [str(a.get("toast") or "") for a in attempts if a.get("toast")] - if any("验证码错误" in t for t in toast_texts): - hypotheses.append("前端返回“验证码错误”,通常是最终落点偏差或拖动轨迹被风控识别。") - - interface_errors = [str(a.get("interface_error") or "") for a in attempts if a.get("interface_error")] - if interface_errors: - hypotheses.append("接口已返回失败状态,说明滑块校验请求已发出但服务端未判定通过。") - - packet_count = sum(1 for a in attempts if a.get("packet_received")) - if attempts and packet_count == 0: - hypotheses.append("多次拖动均未捕获到目标接口,请检查接口监听地址是否变化。") - - if attempts and bool(attempts[-1].get("verify_bar_visible_after")): - hypotheses.append("验证码弹窗仍可见,当前 challenge 未完成。") - - boundary_hits = sum(1 for a in attempts if a.get("boundary_hit")) - if boundary_hits > 0: - hypotheses.append("拖动距离命中轨道边界,存在位移被截断风险。") - - if not hypotheses and last_reason: - hypotheses.append(last_reason) - if not hypotheses: - hypotheses.append("未识别出单一原因,建议查看保存的标注图和报告。") - - return { - "summary": hypotheses[0], - "hypotheses": hypotheses, - "metrics": { - "confidence_ratio": confidence_ratio, - "best_score": match.get("best_score"), - "second_best": match.get("second_best"), - "attempt_count": len(attempts), - "packet_count": packet_count, - "track_max": layout.get("track_max"), - }, - } - - -def _save_failure_artifacts( - page, - bg_bytes: bytes, - piece_bytes: bytes, - match: dict, - layout: dict, - move_distances: list[int], - attempts: list[dict], - last_reason: str, - analysis: dict, -) -> Path: - root = Path(__file__).resolve().parent / "captcha_failures" - case_dir = root / datetime.now().strftime("%Y%m%d_%H%M%S_%f") - case_dir.mkdir(parents=True, exist_ok=True) - - (case_dir / "bg.png").write_bytes(bg_bytes) - (case_dir / "piece.png").write_bytes(piece_bytes) - - try: - page.get_screenshot(path=str(case_dir / "page.png")) - except Exception: - pass - - try: - bg_img = Image.open(BytesIO(bg_bytes)).convert("RGB") - draw = ImageDraw.Draw(bg_img) - y0 = int(match.get("piece_bbox_y0", 0) or 0) - h = int(match.get("piece_bbox_h", 0) or 0) - w = int(match.get("piece_bbox_w", 0) or 0) - candidate_xs = match.get("candidate_target_xs") or [match.get("target_x", 0)] - colors = [(255, 0, 0), (255, 140, 0), (0, 128, 255), (0, 180, 80), (160, 80, 255)] - for idx, x in enumerate(candidate_xs[:5], start=1): - color = colors[(idx - 1) % len(colors)] - x = int(x) - draw.rectangle([x, y0, x + w, y0 + h], outline=color, width=2 if idx == 1 else 1) - draw.text((x + 2, max(0, y0 - 14)), f"#{idx}", fill=color) - bg_img.save(case_dir / "bg_overlay.png") - except Exception: - pass - - report = { - "created_at": datetime.now().isoformat(timespec="seconds"), - "failure_reason": last_reason, - "analysis": analysis, - "match": _to_jsonable(match), - "layout": _to_jsonable(layout), - "move_distances": move_distances, - "attempts": _to_jsonable(attempts), - } - (case_dir / "report.json").write_text( - json.dumps(report, ensure_ascii=False, indent=2), - encoding="utf-8", - ) - return case_dir - - -# ---------- 滑块拖动(仿人轨迹) ---------- - -def _ease_out_quad(t: float) -> float: - return t * (2 - t) - - -def _ease_out_cubic(t: float) -> float: - return 1 - (1 - t) ** 3 - - -def _ease_out_bounce(t: float) -> float: - if t < 1 / 2.75: - return 7.5625 * t * t - elif t < 2 / 2.75: - t -= 1.5 / 2.75 - return 7.5625 * t * t + 0.75 - elif t < 2.5 / 2.75: - t -= 2.25 / 2.75 - return 7.5625 * t * t + 0.9375 - else: - t -= 2.625 / 2.75 - return 7.5625 * t * t + 0.984375 - - -def build_human_track(distance: int, num_steps: int = 0) -> list[dict]: - """生成仿人轨迹列表:加速-匀速-减速-过冲-回弹。""" - if distance == 0: - return [] - - dist = abs(distance) - sign = 1 if distance > 0 else -1 - - if num_steps <= 0: - num_steps = max(12, int(dist * random.uniform(0.25, 0.4))) - - overshoot = random.randint(max(2, int(dist * 0.03)), max(3, int(dist * 0.08))) - total = dist + overshoot - - easing = random.choice([_ease_out_quad, _ease_out_cubic]) - - raw_positions: list[float] = [] - for i in range(1, num_steps + 1): - t = i / num_steps - raw_positions.append(easing(t) * total) - - bounce_steps = random.randint(2, 4) - for j in range(1, bounce_steps + 1): - t = j / bounce_steps - raw_positions.append(total - _ease_out_bounce(t) * overshoot) - - track: list[dict] = [] - prev_x = 0.0 - for pos in raw_positions: - dx = round(pos - prev_x) - if dx == 0 and random.random() < 0.3: - continue - prev_x += dx - dy = random.choice([-1, 0, 0, 0, 1]) - dt = ( - random.uniform(0.005, 0.012) - if prev_x < dist * 0.6 - else random.uniform(0.008, 0.025) - ) - if random.random() < 0.03: - dt += random.uniform(0.02, 0.06) - track.append({"dx": sign * dx, "dy": dy, "dt": dt}) - - actual = sum(s["dx"] for s in track) - diff = distance - actual - if diff != 0: - track.append({"dx": diff, "dy": 0, "dt": random.uniform(0.01, 0.03)}) - - return track - - -def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None: - page.run_cdp( - "Input.dispatchMouseEvent", - type=event_type, - x=x, - y=y, - button=button, - clickCount=1 if event_type == "mousePressed" else 0, - ) - - -def _get_element_center(page, ele) -> tuple[int, int]: - rect = page.run_js( - """const r = arguments[0].getBoundingClientRect(); - return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}""", - ele, - ) - if rect and isinstance(rect, dict): - return int(rect["x"]), int(rect["y"]) - loc = ele.rect.midpoint - return int(loc[0]), int(loc[1]) - - -def drag_slider(page, slider_ele, distance: int) -> None: - """用 CDP 级鼠标事件完成拖拽,模拟真人操作。""" - cx, cy = _get_element_center(page, slider_ele) - - _dispatch_mouse(page, "mouseMoved", cx, cy) - time.sleep(random.uniform(0.03, 0.08)) - - _dispatch_mouse(page, "mousePressed", cx, cy) - time.sleep(random.uniform(0.02, 0.06)) - - cur_x, cur_y = cx, cy - track = build_human_track(distance) - for step in track: - cur_x += step["dx"] - cur_y += step["dy"] - _dispatch_mouse(page, "mouseMoved", cur_x, cur_y) - time.sleep(step["dt"]) - - time.sleep(random.uniform(0.02, 0.06)) - _dispatch_mouse(page, "mouseReleased", cur_x, cur_y) - - -# ---------- 核心自动化流程 ---------- - -def submit_phone( - page, - phone: str, - url: str = "http://yscnb.com/tyyp/", - alpha_threshold: int = 12, - distance_adjust: int = 0, - wait_page: float = 0.3, -) -> Any: - """ - 填写手机号、点击获取验证码、执行滑块,返回 getYanZhenMa/v2 接口响应体。 - """ - page.get(url) - time.sleep(wait_page) - - # 1. 勾选协议 - agree_checkbox = find_first( - page, - [ - "css:#color-input-red", - "css:input[name='color-input-red']", - 'x://input[@id="color-input-red"]', - "css:input.right-box[type='checkbox']", - ], - timeout=5, - ) - if agree_checkbox: - click_safe(agree_checkbox) - time.sleep(0.4) - - # 2. 立即订购 - order_btn = None - for attempt in range(4): - order_btn = find_first( - page, - [ - "css:div.paybg", - "css:.paybg", - 'x://button[contains(.,"立即订购")]', - 'x://a[contains(.,"立即订购")]', - 'x://span[contains(.,"立即订购")]', - 'x://div[contains(.,"立即订购")]', - 'x://*[contains(text(),"立即订购")]', - 'x://*[contains(.,"立即订购")]', - "css:.btn-order", - "css:button.btn-primary", - "css:button.btn", - "css:a.btn", - ], - timeout=1, - ) - if order_btn: - break - time.sleep(0.25) - - if order_btn: - try: - order_btn.run_js("this.scrollIntoView({block:'center'})") - time.sleep(0.05) - except Exception: - pass - click_safe(order_btn) - time.sleep(0.4) - else: - try: - page.run_js(""" - var nodes = document.querySelectorAll('button, a, span, div'); - for (var i = 0; i < nodes.length; i++) { - var t = (nodes[i].innerText || nodes[i].textContent || '').trim(); - if (t.indexOf('立即订购') >= 0) { - nodes[i].scrollIntoView({block: 'center'}); - nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window})); - return true; - } - } - return false; - """) - except Exception: - pass - time.sleep(0.4) - - # 3. 输入手机号 - phone_input = find_first( - page, - [ - 'x://*[@placeholder="请输入手机号码"]', - "css:input.inp-txt", - ], - timeout=8, - ) - if not phone_input: - raise RuntimeError("未找到手机号输入框") - - phone_input.input(phone, clear=True) - - agree = find_first( - page, - [ - "css:i.ico-checkbox", - 'x://i[contains(@class,"ico-checkbox")]', - ], - timeout=2, - ) - if agree: - try: - click_safe(agree) - except Exception: - pass - - # 4. 启动监听(必须在点击获取验证码之前) - page.listen.start(GET_YAN_ZHEN_MA_URL) - - send_btn = find_first( - page, - [ - "css:button.btn-code", - 'x://button[contains(text(),"获取验证码")]', - ], - timeout=8, - ) - if not send_btn: - raise RuntimeError("未找到「获取验证码」按钮") - - click_safe(send_btn) - - # 5. 等待滑块弹窗 - verify_box = find_first( - page, - ["css:.verifybox", "css:.verify-bar-area"], - timeout=6, - ) - if not verify_box: - raise RuntimeError("未检测到滑块验证码弹窗") - - bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5) - piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5) - slider = find_first(page, ["css:.verify-move-block"], timeout=5) - - if not bg_img or not piece_img or not slider: - raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)") - - # 6. 每次重试前重新抓取验证码图片并重算位移(不复用旧距离) - deadline = time.time() + 15.0 - last_reason = "" - attempts: list[dict[str, Any]] = [] - max_attempts = 6 - last_bg_bytes = b"" - last_piece_bytes = b"" - last_match: dict[str, Any] = {} - last_layout: dict[str, Any] = {} - last_move_distances: list[int] = [] - - for idx in range(1, max_attempts + 1): - if time.time() >= deadline: - break - - bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=2) - piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=2) - slider = find_first(page, ["css:.verify-move-block"], timeout=2) - - attempt: dict[str, Any] = { - "index": idx, - "move_distance": None, - "boundary_hit": False, - "packet_received": False, - "toast": "", - "interface_error": "", - "verify_bar_visible_after": False, - } - - if not bg_img or not piece_img or not slider: - last_reason = "重试时未找到验证码关键元素(背景图/拼图块/滑块)" - attempt["interface_error"] = last_reason - attempts.append(attempt) - break - - try: - bg_src = wait_for_data_src(bg_img, timeout=4) - piece_src = wait_for_data_src(piece_img, timeout=4) - bg_bytes = parse_data_url(bg_src) - piece_bytes = parse_data_url(piece_src) - except Exception as e: - last_reason = f"重试时读取验证码图片失败: {e}" - attempt["interface_error"] = last_reason - attempts.append(attempt) - continue - - if len(bg_bytes) < 100 or len(piece_bytes) < 100: - last_reason = f"验证码图片数据异常: bg={len(bg_bytes)}B piece={len(piece_bytes)}B" - attempt["interface_error"] = last_reason - attempts.append(attempt) - continue - - try: - match = calc_drag_distance_from_bytes( - bg_bytes, piece_bytes, alpha_threshold=alpha_threshold - ) - except Exception as e: - last_reason = f"重试时位移计算失败: {e}" - attempt["interface_error"] = last_reason - attempts.append(attempt) - continue - - layout = _get_slider_layout_metrics(page, bg_img, slider) - bg_display_w = layout["bg_display_w"] if layout["bg_display_w"] > 0 else match["bg_width"] - scale = float(bg_display_w) / max(1, match["bg_width"]) - track_max = int(layout["track_max"]) - move_distances = _build_move_distance_candidates( - match=match, - scale=scale, - distance_adjust=int(distance_adjust), - track_max=track_max, - ) - pick_index = min(max(0, idx - 1), len(move_distances) - 1) - move_distance = int(move_distances[pick_index]) - - attempt["move_distance"] = move_distance - attempt["boundary_hit"] = bool(track_max >= 0 and move_distance in (0, track_max)) - attempt["confidence_ratio"] = float(match.get("confidence_ratio", 0) or 0) - attempt["candidate_count"] = len(move_distances) - - # 记录最后一次有效计算,用于失败样本保存与分析 - last_bg_bytes = bg_bytes - last_piece_bytes = piece_bytes - last_match = match - last_layout = layout - last_move_distances = move_distances - - drag_slider(page, slider, move_distance) - time.sleep(0.2) - - remaining = max(0.1, deadline - time.time()) - packet, toast_text = _wait_packet_or_feedback(page, timeout=min(4.0, remaining)) - attempt["toast"] = toast_text or "" - if packet is not None: - attempt["packet_received"] = True - response = getattr(packet, "response", None) - if response is None: - attempt["interface_error"] = f"捕获到 {GET_YAN_ZHEN_MA_URL} 数据包但无 response 字段" - last_reason = attempt["interface_error"] - attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") - attempts.append(attempt) - if not attempt["verify_bar_visible_after"]: - break - continue - - body = _coerce_json_body(response.body) - attempt["response_body"] = _to_jsonable(body) - try: - _assert_interface_success(body) - return body - except Exception as e: - attempt["interface_error"] = str(e) - last_reason = attempt["interface_error"] - attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") - attempts.append(attempt) - if not attempt["verify_bar_visible_after"]: - break - continue - - if toast_text: - last_reason = f"滑块验证失败:{toast_text}" - elif _is_element_visible(page, ".verify-bar-area"): - last_reason = f"第{idx}次拖动未触发 {GET_YAN_ZHEN_MA_URL}" - else: - last_reason = "验证码弹窗已关闭但未捕获接口响应" - attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") - attempt["interface_error"] = last_reason if not attempt["interface_error"] else attempt["interface_error"] - attempts.append(attempt) - - analysis = _analyze_slider_failure( - match=last_match, - layout=last_layout, - attempts=attempts, - last_reason=last_reason, - ) - save_info = "" - try: - case_dir = _save_failure_artifacts( - page=page, - bg_bytes=last_bg_bytes, - piece_bytes=last_piece_bytes, - match=last_match, - layout=last_layout, - move_distances=last_move_distances, - attempts=attempts, - last_reason=last_reason, - analysis=analysis, - ) - save_info = f"; 失败样本目录: {case_dir}" - except Exception as e: - save_info = f"; 失败样本保存失败: {e}" - - raise RuntimeError( - f"滑块未通过或接口未返回成功结果: {last_reason or '未知原因'}" - f"; 分析: {analysis.get('summary', '无')}" - f"{save_info}" - ) - - -def submit_code(page, code: str) -> dict: - """填写短信验证码,点击确认订购按钮。""" - code_input = find_first( - page, - [ - 'x://input[@placeholder*="验证码"]', - 'x://input[@placeholder*="短信"]', - "css:input.inp-txt[type='text']", - "css:input[type='tel']", - "css:.code-input", - "css:input.verify-input", - ], - timeout=8, - ) - if not code_input: - raise RuntimeError("未找到验证码输入框") - - code_input.input(code, clear=True) - time.sleep(0.2) - - confirm_btn = find_first( - page, - [ - "css:img.btn-buy", - "css:.btn-buy", - 'x://img[contains(@class,"btn-buy")]', - 'x://*[contains(@class,"btn-buy")]', - ], - timeout=5, - ) - if not confirm_btn: - raise RuntimeError("未找到确认订购按钮(img.btn-buy)") - - try: - confirm_btn.run_js("this.scrollIntoView({block:'center'})") - time.sleep(0.05) - except Exception: - pass - - click_safe(confirm_btn) - return {"success": True, "message": "已点击确认订购"} - - -# ---------- 入口 ---------- - -def main() -> None: - parser = argparse.ArgumentParser(description="天翼云盘订购页自动化") - parser.add_argument("--api", action="store_true", help="以 FastAPI 服务模式启动") - parser.add_argument("--host", default="0.0.0.0", help="API 监听地址(--api 模式)") - parser.add_argument("--api-port", type=int, default=8000, help="API 监听端口(--api 模式)") - parser.add_argument( - "--phone", default=DEFAULT_PHONE, help="手机号码(默认用代码中的 DEFAULT_PHONE)" - ) - parser.add_argument("--url", default=DEFAULT_TARGET_URL, help="目标页面 URL") - parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口,0 表示新建") - args = parser.parse_args() - - if args.api: - import uvicorn - uvicorn.run(app, host=args.host, port=args.api_port) - return - - if args.port: - co = ChromiumOptions().set_local_port(port=args.port) - page = ChromiumPage(addr_or_opts=co) - else: - page = ChromiumPage() - - print(f"打开页面: {args.url},手机号: {args.phone}") - body = submit_phone( - page=page, - phone=args.phone, - url=args.url, - ) - print("hn_userEquitys/getYanZhenMa/v2 响应:") - print(body) - - input_code(page=page, code=123123) - - -def input_code(page, code): - page.ele('x://input[@placeholder="请输入验证码"]').input(code, clear=True) - time.sleep(0.5) - - page.listen.start(ORDER_PACKET_URL) - page.ele('x://*[@id="app"]/div/img').click(by_js=True) - time.sleep(0.5) - page.ele('x://*[@id="app"]/div/div[7]/div/div[3]/button[2]').click(by_js=True) - res = _normalize_listen_packet(page.listen.wait(timeout=15, fit_count=False)) - if res is None: - raise RuntimeError(f"提交验证码后未捕获到 {ORDER_PACKET_URL} 抓包数据") - response = getattr(res, "response", None) - if response is None: - raise RuntimeError("提交验证码后抓包缺少 response 字段") - return _coerce_json_body(response.body) - - -# ---------- FastAPI 接口 ---------- - -class ApiSubmitPhoneRequest(BaseModel): - phone: str = Field(..., description="手机号码") - url: str = Field(DEFAULT_TARGET_URL, description="目标页面 URL") - proxy_api_url: str = Field(PROXY_SOURCE_URL, description="代理来源接口(返回 ip:port 文本)") - - -class ApiSubmitPhoneResponse(BaseModel): - success: bool = True - flow_id: str = Field(..., description="流程唯一标识符") - data: Any = Field(..., description="滑块流程抓包响应体(getYanZhenMa)") - phone: str = "" - url: str = "" - created_at: str = "" - proxy: str = "" - ua: str = "" - - -class ApiSubmitCodeRequest(BaseModel): - flow_id: str = Field(..., description="submit_phone 返回的流程唯一标识符") - code: str = Field(..., description="短信验证码") - - -class ApiSubmitCodeResponse(BaseModel): - success: bool = True - flow_id: str = "" - data: Any = Field(..., description="验证码提交后的抓包响应体") - - -app = FastAPI(title="test1 自动化 API", description="TgeBrowser + DrissionPage 天翼页面自动化接口") - -_flow_sessions: dict[str, dict[str, Any]] = {} -_flow_lock = threading.Lock() -_cleanup_thread_started = False - - -def _choose_mobile_ua() -> str: - return random.choice(MOBILE_UA_POOL) - - -def _fetch_proxy_text(proxy_api_url: str) -> str: - resp = requests.get( - proxy_api_url, - headers=PROXY_FETCH_HEADERS, - timeout=15, - ) - resp.raise_for_status() - text = (resp.text or "").strip() - if not text: - raise RuntimeError("代理接口返回为空") - first_line = text.splitlines()[0].strip() - if not first_line: - raise RuntimeError("代理接口返回格式异常:首行为空") - return first_line - - -def _parse_proxy_addr(proxy_text: str) -> tuple[str, int]: - m = re.match(r"^\s*((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\s*$", proxy_text) - if not m: - raise RuntimeError(f"代理格式不合法(期望 ip:port): {proxy_text}") - host = m.group(1) - port = int(m.group(2)) - if not (1 <= port <= 65535): - raise RuntimeError(f"代理端口不合法: {port}") - return host, port - - -def _connect_page_from_start_data(start_data: dict) -> ChromiumPage: - port = start_data.get("port") - ws = start_data.get("ws") - if port: - co = ChromiumOptions().set_local_port(port=int(port)) - return ChromiumPage(addr_or_opts=co) - if ws: - return ChromiumPage(addr_or_opts=ws) - raise RuntimeError("TgeBrowser 未返回 port 或 ws") - - -def _create_tgebrowser_browser( - client: TgeBrowserClient, - browser_name: str, - start_page_url: str, - proxy_host: str, - proxy_port: int, - mobile_ua: str, -) -> dict: - proxy_candidates = [ - {"protocol": "http", "host": proxy_host, "port": proxy_port}, - {"protocol": "http", "host": proxy_host, "port": str(proxy_port)}, - {"protocol": "http", "server": f"{proxy_host}:{proxy_port}"}, - ] - last_error: Optional[Exception] = None - for proxy_conf in proxy_candidates: - try: - return client.create_browser( - browser_name=browser_name, - start_page_url=start_page_url, - proxy=proxy_conf, - fingerprint={ - "os": "Android", - "platformVersion": 12, - "userAgent": mobile_ua, - }, - ) - except Exception as e: - last_error = e - continue - raise RuntimeError(f"创建浏览器失败(代理配置尝试均失败): {last_error}") - - -def _apply_mobile_ua(page: ChromiumPage, ua: str) -> None: - try: - page.run_cdp("Network.enable") - except Exception: - pass - try: - page.run_cdp("Network.setUserAgentOverride", userAgent=ua) - except Exception: - pass - - -def _close_flow_resources(flow: dict[str, Any]) -> None: - page = flow.get("page") - client = flow.get("client") - env_id = flow.get("env_id") - try: - if page: - page.quit() - except Exception: - pass - try: - if client and env_id is not None: - client.stop_browser(env_id=env_id) - except Exception: - pass - - -def _cleanup_expired_flows() -> None: - now_ts = time.time() - expired: list[dict[str, Any]] = [] - with _flow_lock: - for flow_id, flow in list(_flow_sessions.items()): - if flow.get("busy"): - continue - last_active_ts = float(flow.get("last_active_ts", now_ts)) - if now_ts - last_active_ts > SESSION_IDLE_SECONDS: - expired.append(_flow_sessions.pop(flow_id)) - for flow in expired: - _close_flow_resources(flow) - - -def _flow_cleanup_worker() -> None: - while True: - try: - _cleanup_expired_flows() - except Exception: - pass - time.sleep(15) - - -def _ensure_cleanup_thread() -> None: - global _cleanup_thread_started - with _flow_lock: - if _cleanup_thread_started: - return - th = threading.Thread(target=_flow_cleanup_worker, name="flow_cleanup_worker", daemon=True) - th.start() - _cleanup_thread_started = True - - -@app.on_event("startup") -def _api_startup() -> None: - _ensure_cleanup_thread() - - -@app.post("/api/submit_phone", response_model=ApiSubmitPhoneResponse) -def api_submit_phone(req: ApiSubmitPhoneRequest): - _ensure_cleanup_thread() - - client: Optional[TgeBrowserClient] = None - page: Optional[ChromiumPage] = None - env_id: Optional[int] = None - stored = False - - try: - proxy_text = _fetch_proxy_text(req.proxy_api_url) - proxy_host, proxy_port = _parse_proxy_addr(proxy_text) - mobile_ua = _choose_mobile_ua() - - client = TgeBrowserClient() - browser_name = f"tyyp_{req.phone[-4:]}_{int(time.time())}" - create_data = _create_tgebrowser_browser( - client=client, - browser_name=browser_name, - start_page_url=req.url, - proxy_host=proxy_host, - proxy_port=proxy_port, - mobile_ua=mobile_ua, - ) - env_id = create_data.get("envId") - if env_id is None: - raise RuntimeError("创建浏览器失败:未返回 envId") - - start_data = client.start_browser(env_id=env_id) - page = _connect_page_from_start_data(start_data) - _apply_mobile_ua(page, mobile_ua) - - packet_body = submit_phone( - page=page, - phone=req.phone, - url=req.url, - ) - - flow_id = str(uuid.uuid4()) - created = datetime.now() - flow = { - "flow_id": flow_id, - "page": page, - "client": client, - "env_id": env_id, - "phone": req.phone, - "url": req.url, - "proxy": proxy_text, - "ua": mobile_ua, - "created_at": created, - "created_at_iso": created.isoformat(timespec="seconds"), - "created_ts": time.time(), - "last_active_ts": time.time(), - "busy": False, - } - with _flow_lock: - _flow_sessions[flow_id] = flow - stored = True - - return ApiSubmitPhoneResponse( - success=True, - flow_id=flow_id, - data=packet_body, - phone=req.phone, - url=req.url, - created_at=flow["created_at_iso"], - proxy=proxy_text, - ua=mobile_ua, - ) - except Exception as e: - if not stored: - _close_flow_resources({"page": page, "client": client, "env_id": env_id}) - raise HTTPException(status_code=500, detail=str(e)) - - -@app.post("/api/submit_code", response_model=ApiSubmitCodeResponse) -def api_submit_code(req: ApiSubmitCodeRequest): - _ensure_cleanup_thread() - with _flow_lock: - flow = _flow_sessions.get(req.flow_id) - if not flow: - raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {req.flow_id}") - flow["busy"] = True - flow["last_active_ts"] = time.time() - page = flow.get("page") - - if not page: - with _flow_lock: - flow = _flow_sessions.get(req.flow_id) - if flow: - flow["busy"] = False - flow["last_active_ts"] = time.time() - raise HTTPException(status_code=500, detail="流程页面对象缺失") - - try: - packet_body = input_code(page=page, code=req.code) - return ApiSubmitCodeResponse( - success=True, - flow_id=req.flow_id, - data=packet_body, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - finally: - with _flow_lock: - flow = _flow_sessions.get(req.flow_id) - if flow: - flow["busy"] = False - flow["last_active_ts"] = time.time() - - -@app.get("/api/flow/{flow_id}") -def api_get_flow(flow_id: str): - with _flow_lock: - flow = _flow_sessions.get(flow_id) - if not flow: - raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {flow_id}") - return { - "success": True, - "flow_id": flow_id, - "phone": flow.get("phone"), - "url": flow.get("url"), - "proxy": flow.get("proxy"), - "ua": flow.get("ua"), - "created_at": flow.get("created_at_iso"), - "last_active_ts": flow.get("last_active_ts"), - } - - -@app.delete("/api/flow/{flow_id}") -def api_close_flow(flow_id: str): - with _flow_lock: - flow = _flow_sessions.pop(flow_id, None) - if not flow: - return {"success": False, "message": "流程不存在"} - _close_flow_resources(flow) - return {"success": True, "message": "流程已关闭"} if __name__ == "__main__": main() diff --git a/tyyp_app/__init__.py b/tyyp_app/__init__.py new file mode 100644 index 0000000..2a40127 --- /dev/null +++ b/tyyp_app/__init__.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from __future__ import annotations + +from tyyp_app.api import app, create_app +from tyyp_app.automation import input_code, submit_code, submit_phone + +__all__ = [ + "app", + "create_app", + "submit_phone", + "submit_code", + "input_code", +] diff --git a/tyyp_app/api.py b/tyyp_app/api.py new file mode 100644 index 0000000..71616fe --- /dev/null +++ b/tyyp_app/api.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from __future__ import annotations + +from fastapi import FastAPI, HTTPException + +from tyyp_app.config import FLOW_CLEANUP_INTERVAL_SECONDS, PROXY_FETCH_HEADERS, SESSION_IDLE_SECONDS +from tyyp_app.schemas import ( + ApiSubmitCodeRequest, + ApiSubmitCodeResponse, + ApiSubmitPhoneRequest, + ApiSubmitPhoneResponse, +) +from tyyp_app.services import FlowService, FlowSessionStore, ProxyProvider + + +def create_app() -> FastAPI: + app = FastAPI( + title="test1 自动化 API", + description="TgeBrowser + DrissionPage 天翼页面自动化接口", + ) + + session_store = FlowSessionStore( + idle_seconds=SESSION_IDLE_SECONDS, + cleanup_interval=FLOW_CLEANUP_INTERVAL_SECONDS, + ) + flow_service = FlowService( + session_store=session_store, + proxy_provider=ProxyProvider(headers=PROXY_FETCH_HEADERS, timeout=15), + ) + + app.state.session_store = session_store + app.state.flow_service = flow_service + + @app.on_event("startup") + def _api_startup() -> None: + session_store.start_cleanup_worker() + + @app.post("/api/submit_phone", response_model=ApiSubmitPhoneResponse) + def api_submit_phone(req: ApiSubmitPhoneRequest): + try: + flow, packet_body = flow_service.submit_phone( + phone=req.phone, + url=req.url, + proxy_api_url=req.proxy_api_url, + ) + return ApiSubmitPhoneResponse( + success=True, + flow_id=flow["flow_id"], + data=packet_body, + phone=flow["phone"], + url=flow["url"], + created_at=flow["created_at_iso"], + proxy=flow["proxy"], + ua=flow["ua"], + ) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + + @app.post("/api/submit_code", response_model=ApiSubmitCodeResponse) + def api_submit_code(req: ApiSubmitCodeRequest): + try: + packet_body = flow_service.submit_code(flow_id=req.flow_id, code=req.code) + return ApiSubmitCodeResponse(success=True, flow_id=req.flow_id, data=packet_body) + except KeyError as exc: + raise HTTPException(status_code=404, detail=str(exc)) + except Exception as exc: + raise HTTPException(status_code=500, detail=str(exc)) + + @app.get("/api/flow/{flow_id}") + def api_get_flow(flow_id: str): + meta = flow_service.get_flow_meta(flow_id) + if not meta: + raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {flow_id}") + return {"success": True, **meta} + + @app.delete("/api/flow/{flow_id}") + def api_close_flow(flow_id: str): + closed = flow_service.close_flow(flow_id) + if not closed: + return {"success": False, "message": "流程不存在"} + return {"success": True, "message": "流程已关闭"} + + return app + + +app = create_app() diff --git a/tyyp_app/automation.py b/tyyp_app/automation.py new file mode 100644 index 0000000..6c0d282 --- /dev/null +++ b/tyyp_app/automation.py @@ -0,0 +1,1017 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""天翼云盘订购页自动化核心逻辑(滑块识别、拖动、抓包与失败分析)。""" +from __future__ import annotations + +import json +from typing import Any, Optional +import base64 +import random +import time +from io import BytesIO +from datetime import datetime +from pathlib import Path + +import numpy as np +from PIL import Image, ImageDraw + +from tyyp_app.config import GET_YAN_ZHEN_MA_URL, ORDER_PACKET_URL, PROJECT_ROOT + + +# ---------- 拼图验证码计算 ---------- + +def _to_rgba_array(image_bytes: bytes) -> np.ndarray: + return np.array(Image.open(BytesIO(image_bytes)).convert("RGBA"), dtype=np.int16) + + +def _to_rgb_array(image_bytes: bytes) -> np.ndarray: + return np.array(Image.open(BytesIO(image_bytes)).convert("RGB"), dtype=np.int16) + + +def _piece_bbox(alpha: np.ndarray, threshold: int = 12) -> tuple[int, int, int, int]: + mask = alpha > threshold + if not mask.any(): + raise ValueError("拼图块 alpha 全透明,无法匹配") + ys, xs = np.where(mask) + return int(xs.min()), int(ys.min()), int(xs.max()) + 1, int(ys.max()) + 1 + + +def _to_gray(rgb: np.ndarray) -> np.ndarray: + return ( + rgb[:, :, 0] * 0.299 + rgb[:, :, 1] * 0.587 + rgb[:, :, 2] * 0.114 + ).astype(np.float32) + + +def _grad_x(gray: np.ndarray) -> np.ndarray: + pad = np.pad(gray, ((0, 0), (1, 1)), mode="edge") + return np.abs(pad[:, 2:] - pad[:, :-2]) * 0.5 + + +def calc_drag_distance_from_bytes( + bg_bytes: bytes, piece_bytes: bytes, alpha_threshold: int = 12 +) -> dict: + """计算拼图目标位移,输出多个候选位移提高滑块命中率。""" + bg = _to_rgb_array(bg_bytes).astype(np.float32) + piece_rgba = _to_rgba_array(piece_bytes).astype(np.float32) + + bh, bw = bg.shape[:2] + ph, _ = piece_rgba.shape[:2] + if bh != ph: + raise ValueError(f"背景与拼图块高度不一致: {bh} != {ph}") + + alpha = piece_rgba[:, :, 3] + x0, y0, x1, y1 = _piece_bbox(alpha, threshold=alpha_threshold) + + piece_crop = piece_rgba[y0:y1, x0:x1, :3] + alpha_crop = alpha[y0:y1, x0:x1] + mask = alpha_crop > alpha_threshold + ys, xs = np.where(mask) + if len(xs) < 10: + raise ValueError("拼图有效像素过少,无法稳定匹配") + + piece_gray = _to_gray(piece_crop) + bg_gray = _to_gray(bg) + piece_grad = _grad_x(piece_gray) + bg_grad = _grad_x(bg_gray) + + piece_gray_pixels = piece_gray[ys, xs] + piece_grad_pixels = piece_grad[ys, xs] + weights = np.clip(alpha_crop[ys, xs].astype(np.float32), 1.0, 255.0) + weights = weights / float(weights.sum()) + + patch_h, patch_w = piece_crop.shape[:2] + if patch_w > bw or patch_h > bh: + raise ValueError("拼图块裁剪尺寸超过背景图") + + max_x = bw - patch_w + scores: list[tuple[float, int, float, float]] = [] + + for x in range(max_x + 1): + patch_gray_pixels = bg_gray[y0 + ys, x + xs] + patch_grad_pixels = bg_grad[y0 + ys, x + xs] + color_score = float((np.abs(patch_gray_pixels - piece_gray_pixels) * weights).sum()) + grad_score = float((np.abs(patch_grad_pixels - piece_grad_pixels) * weights).sum()) + # 颜色 + 边缘加权,较纯 RGB 差值在压缩噪声下更稳定 + score = color_score * 0.72 + grad_score * 0.28 + scores.append((score, x, color_score, grad_score)) + + scores.sort(key=lambda item: item[0]) + best_score, best_x, best_color_score, best_grad_score = scores[0] + second_best = scores[1][0] if len(scores) > 1 else float("inf") + + drag_distance = best_x - x0 + confidence_ratio = (second_best / best_score) if best_score > 0 else float("inf") + + candidate_xs: list[int] = [] + for _score, x, _color, _grad in scores: + if any(abs(x - picked) <= 1 for picked in candidate_xs): + continue + candidate_xs.append(int(x)) + if len(candidate_xs) >= 5: + break + + candidate_distances = [int(x - x0) for x in candidate_xs] + + return { + "target_x": int(best_x), + "piece_bbox_x0": int(x0), + "piece_bbox_y0": int(y0), + "piece_bbox_w": int(patch_w), + "piece_bbox_h": int(patch_h), + "bg_width": int(bw), + "bg_height": int(bh), + "drag_distance": int(drag_distance), + "best_score": best_score, + "best_color_score": best_color_score, + "best_grad_score": best_grad_score, + "second_best": second_best, + "confidence_ratio": confidence_ratio, + "candidate_target_xs": candidate_xs, + "candidate_drag_distances": candidate_distances, + } + + +def parse_data_url(data_url: str) -> bytes: + if not data_url.startswith("data:image"): + raise ValueError("图片不是 data:image URL") + _, data = data_url.split(",", 1) + return base64.b64decode(data) + + +# ---------- 元素查找与点击 ---------- + +def click_safe(ele) -> None: + try: + ele.click() + return + except Exception: + pass + ele.click(by_js=True) + + +def find_first(page, selectors: list[str], timeout: float = 5): + for sel in selectors: + try: + ele = page.ele(sel, timeout=timeout) + if ele: + return ele + except Exception: + continue + return None + + +def wait_for_data_src(img_ele, timeout: float = 6, interval: float = 0.12) -> str: + """轮询等待 img 元素的 src 变为有效 data:image URL。""" + deadline = time.time() + timeout + while time.time() < deadline: + src = img_ele.attr("src") or "" + if src.startswith("data:image"): + _prefix, _, b64 = src.partition(",") + if b64.strip(): + return src + time.sleep(interval) + raise RuntimeError( + f"等待 data:image src 超时({timeout}s),当前 src 前80字符: {(img_ele.attr('src') or '')[:80]}" + ) + + +def _is_element_visible(page, selector: str) -> bool: + try: + visible = page.run_js( + """ + const el = document.querySelector(arguments[0]); + if (!el) return false; + const st = window.getComputedStyle(el); + if (st.display === 'none' || st.visibility === 'hidden' || st.opacity === '0') return false; + const r = el.getBoundingClientRect(); + return r.width > 0 && r.height > 0; + """, + selector, + ) + return bool(visible) + except Exception: + return False + + +def _read_slider_toast_text(page) -> str: + try: + text = page.run_js( + """ + const node = document.querySelector('.van-toast--text .van-toast__text') + || document.querySelector('.van-toast__text'); + if (!node) return ''; + const popup = node.closest('.van-toast') || node; + const st = window.getComputedStyle(popup); + const visible = st.display !== 'none' && st.visibility !== 'hidden' && st.opacity !== '0'; + if (!visible) return ''; + return (node.innerText || node.textContent || '').trim(); + """ + ) + return (text or "").strip() + except Exception: + return "" + + +def _get_slider_layout_metrics(page, bg_img, slider_ele) -> dict: + try: + metrics = page.run_js( + """ + const bg = arguments[0]; + const slider = arguments[1]; + const bar = document.querySelector('.verify-bar-area'); + const sub = slider ? slider.querySelector('.verify-sub-block') : null; + const bgR = bg ? bg.getBoundingClientRect() : {width: 0}; + const sliderR = slider ? slider.getBoundingClientRect() : {width: 0}; + const barR = bar ? bar.getBoundingClientRect() : {width: 0}; + const subR = sub ? sub.getBoundingClientRect() : {width: 0}; + return { + bg_display_w: Number(bgR.width || 0), + slider_w: Number(sliderR.width || 0), + bar_w: Number(barR.width || 0), + sub_w: Number(subR.width || 0), + }; + """, + bg_img, + slider_ele, + ) + except Exception: + metrics = None + + if not isinstance(metrics, dict): + metrics = {} + + bar_w = float(metrics.get("bar_w", 0) or 0) + slider_w = float(metrics.get("slider_w", 0) or 0) + track_max = int(round(max(0.0, bar_w - slider_w))) if bar_w > 0 and slider_w > 0 else -1 + + return { + "bg_display_w": float(metrics.get("bg_display_w", 0) or 0), + "slider_w": slider_w, + "bar_w": bar_w, + "sub_w": float(metrics.get("sub_w", 0) or 0), + "track_max": track_max, + } + + +def _build_move_distance_candidates( + match: dict, + scale: float, + distance_adjust: int, + track_max: int, +) -> list[int]: + base_distances = match.get("candidate_drag_distances") or [int(match["drag_distance"])] + micro_offsets = (0, -1, 1, -2, 2, -3, 3) + + candidates: list[int] = [] + seen: set[int] = set() + + for base in base_distances[:4]: + scaled = int(round(int(base) * scale)) + int(distance_adjust) + for offset in micro_offsets: + val = scaled + offset + if track_max >= 0: + val = max(0, min(track_max, val)) + if val not in seen: + seen.add(val) + candidates.append(val) + + if not candidates: + fallback = int(round(int(match["drag_distance"]) * scale)) + int(distance_adjust) + if track_max >= 0: + fallback = max(0, min(track_max, fallback)) + candidates = [fallback] + + return candidates + + +def _normalize_listen_packet(packet): + if packet is False: + return None + if isinstance(packet, list): + return packet[0] if packet else None + return packet + + +def _coerce_json_body(body: Any) -> Any: + if not isinstance(body, str): + return body + raw = body.strip() + if not raw: + return body + if (raw.startswith("{") and raw.endswith("}")) or (raw.startswith("[") and raw.endswith("]")): + try: + return json.loads(raw) + except Exception: + return body + return body + + +def _code_indicates_success(value: Any) -> Optional[bool]: + if value is None: + return None + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return int(value) in (0, 1, 200) + + text = str(value).strip() + if not text: + return None + lower = text.lower() + if lower in {"0", "00", "000", "0000", "1", "200", "ok", "success", "true"}: + return True + if lower in {"false", "fail", "failed", "error", "err"}: + return False + if lower.lstrip("-").isdigit(): + return int(lower) in (0, 1, 200) + return False + + +def _to_int_or_none(value: Any) -> Optional[int]: + try: + if value is None: + return None + if isinstance(value, bool): + return int(value) + if isinstance(value, (int, float)): + return int(value) + text = str(value).strip() + if not text: + return None + if text.lstrip("-").isdigit(): + return int(text) + return None + except Exception: + return None + + +def _extract_interface_message(data: dict) -> str: + for key in ("msg", "message", "error", "errorMsg", "detail", "retMsg"): + value = data.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return "" + + +def _is_sms_already_sent_response(data: dict, message: str) -> bool: + code_keys = ("code", "retCode", "resultCode", "statusCode", "errno") + code_val = None + for key in code_keys: + if key in data: + code_val = _to_int_or_none(data.get(key)) + if code_val is not None: + break + if code_val != 1001: + return False + + text = (message or "").strip() + if not text: + return False + # 这类文案通常表示验证码已下发或短信频控,不应判定为滑块失败。 + hints = ("验证码已发送", "短信验证码已发送", "请稍后重试", "请稍等") + return any(h in text for h in hints) + + +def _assert_interface_success(body: Any) -> None: + data = _coerce_json_body(body) + if not isinstance(data, dict): + return + + fail_reasons: list[str] = [] + message = _extract_interface_message(data) + is_soft_success = _is_sms_already_sent_response(data, message) + + for key in ("success", "ok"): + if key in data and not bool(data[key]): + fail_reasons.append(f"{key}={data[key]}") + + for key in ("code", "retCode", "resultCode", "statusCode", "errno"): + if key not in data: + continue + code_ok = _code_indicates_success(data[key]) + if code_ok is False: + if is_soft_success: + continue + fail_reasons.append(f"{key}={data[key]}") + + if message: + lowered = message.lower() + fail_words = ("失败", "错误", "无效", "fail", "error", "invalid", "请重新", "未通过") + if any(word in lowered for word in fail_words): + if is_soft_success: + return + fail_reasons.append(f"msg={message}") + + if fail_reasons: + preview = str(data) + if len(preview) > 300: + preview = preview[:300] + "..." + raise RuntimeError(f"接口返回失败: {', '.join(fail_reasons)}; body={preview}") + + +def _wait_packet_or_feedback(page, timeout: float) -> tuple[Any, str]: + deadline = time.time() + timeout + last_toast = "" + while time.time() < deadline: + wait_span = min(0.35, max(0.05, deadline - time.time())) + packet = _normalize_listen_packet(page.listen.wait(timeout=wait_span, fit_count=False)) + if packet is not None: + return packet, last_toast + + toast = _read_slider_toast_text(page) + if toast: + last_toast = toast + + return None, last_toast + + +def _to_jsonable(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, dict): + return {str(k): _to_jsonable(v) for k, v in value.items()} + if isinstance(value, (list, tuple, set)): + return [_to_jsonable(v) for v in value] + return str(value) + + +def _analyze_slider_failure(match: dict, layout: dict, attempts: list[dict], last_reason: str) -> dict: + hypotheses: list[str] = [] + confidence_ratio = float(match.get("confidence_ratio", 0) or 0) + + if confidence_ratio < 1.03: + hypotheses.append("拼图匹配置信度偏低,背景与缺口特征区分不明显,易出现位移误差。") + + toast_texts = [str(a.get("toast") or "") for a in attempts if a.get("toast")] + if any("验证码错误" in t for t in toast_texts): + hypotheses.append("前端返回“验证码错误”,通常是最终落点偏差或拖动轨迹被风控识别。") + + interface_errors = [str(a.get("interface_error") or "") for a in attempts if a.get("interface_error")] + if interface_errors: + hypotheses.append("接口已返回失败状态,说明滑块校验请求已发出但服务端未判定通过。") + + packet_count = sum(1 for a in attempts if a.get("packet_received")) + if attempts and packet_count == 0: + hypotheses.append("多次拖动均未捕获到目标接口,请检查接口监听地址是否变化。") + + if attempts and bool(attempts[-1].get("verify_bar_visible_after")): + hypotheses.append("验证码弹窗仍可见,当前 challenge 未完成。") + + boundary_hits = sum(1 for a in attempts if a.get("boundary_hit")) + if boundary_hits > 0: + hypotheses.append("拖动距离命中轨道边界,存在位移被截断风险。") + + if not hypotheses and last_reason: + hypotheses.append(last_reason) + if not hypotheses: + hypotheses.append("未识别出单一原因,建议查看保存的标注图和报告。") + + return { + "summary": hypotheses[0], + "hypotheses": hypotheses, + "metrics": { + "confidence_ratio": confidence_ratio, + "best_score": match.get("best_score"), + "second_best": match.get("second_best"), + "attempt_count": len(attempts), + "packet_count": packet_count, + "track_max": layout.get("track_max"), + }, + } + + +def _save_failure_artifacts( + page, + bg_bytes: bytes, + piece_bytes: bytes, + match: dict, + layout: dict, + move_distances: list[int], + attempts: list[dict], + last_reason: str, + analysis: dict, +) -> Path: + root = PROJECT_ROOT / "captcha_failures" + case_dir = root / datetime.now().strftime("%Y%m%d_%H%M%S_%f") + case_dir.mkdir(parents=True, exist_ok=True) + + (case_dir / "bg.png").write_bytes(bg_bytes) + (case_dir / "piece.png").write_bytes(piece_bytes) + + try: + page.get_screenshot(path=str(case_dir / "page.png")) + except Exception: + pass + + try: + bg_img = Image.open(BytesIO(bg_bytes)).convert("RGB") + draw = ImageDraw.Draw(bg_img) + y0 = int(match.get("piece_bbox_y0", 0) or 0) + h = int(match.get("piece_bbox_h", 0) or 0) + w = int(match.get("piece_bbox_w", 0) or 0) + candidate_xs = match.get("candidate_target_xs") or [match.get("target_x", 0)] + colors = [(255, 0, 0), (255, 140, 0), (0, 128, 255), (0, 180, 80), (160, 80, 255)] + for idx, x in enumerate(candidate_xs[:5], start=1): + color = colors[(idx - 1) % len(colors)] + x = int(x) + draw.rectangle([x, y0, x + w, y0 + h], outline=color, width=2 if idx == 1 else 1) + draw.text((x + 2, max(0, y0 - 14)), f"#{idx}", fill=color) + bg_img.save(case_dir / "bg_overlay.png") + except Exception: + pass + + report = { + "created_at": datetime.now().isoformat(timespec="seconds"), + "failure_reason": last_reason, + "analysis": analysis, + "match": _to_jsonable(match), + "layout": _to_jsonable(layout), + "move_distances": move_distances, + "attempts": _to_jsonable(attempts), + } + (case_dir / "report.json").write_text( + json.dumps(report, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + return case_dir + + +# ---------- 滑块拖动(仿人轨迹) ---------- + +def _ease_out_quad(t: float) -> float: + return t * (2 - t) + + +def _ease_out_cubic(t: float) -> float: + return 1 - (1 - t) ** 3 + + +def _ease_out_bounce(t: float) -> float: + if t < 1 / 2.75: + return 7.5625 * t * t + elif t < 2 / 2.75: + t -= 1.5 / 2.75 + return 7.5625 * t * t + 0.75 + elif t < 2.5 / 2.75: + t -= 2.25 / 2.75 + return 7.5625 * t * t + 0.9375 + else: + t -= 2.625 / 2.75 + return 7.5625 * t * t + 0.984375 + + +def build_human_track(distance: int, num_steps: int = 0) -> list[dict]: + """生成仿人轨迹列表:加速-匀速-减速-过冲-回弹。""" + if distance == 0: + return [] + + dist = abs(distance) + sign = 1 if distance > 0 else -1 + + if num_steps <= 0: + num_steps = max(12, int(dist * random.uniform(0.25, 0.4))) + + overshoot = random.randint(max(2, int(dist * 0.03)), max(3, int(dist * 0.08))) + total = dist + overshoot + + easing = random.choice([_ease_out_quad, _ease_out_cubic]) + + raw_positions: list[float] = [] + for i in range(1, num_steps + 1): + t = i / num_steps + raw_positions.append(easing(t) * total) + + bounce_steps = random.randint(2, 4) + for j in range(1, bounce_steps + 1): + t = j / bounce_steps + raw_positions.append(total - _ease_out_bounce(t) * overshoot) + + track: list[dict] = [] + prev_x = 0.0 + for pos in raw_positions: + dx = round(pos - prev_x) + if dx == 0 and random.random() < 0.3: + continue + prev_x += dx + dy = random.choice([-1, 0, 0, 0, 1]) + dt = ( + random.uniform(0.005, 0.012) + if prev_x < dist * 0.6 + else random.uniform(0.008, 0.025) + ) + if random.random() < 0.03: + dt += random.uniform(0.02, 0.06) + track.append({"dx": sign * dx, "dy": dy, "dt": dt}) + + actual = sum(s["dx"] for s in track) + diff = distance - actual + if diff != 0: + track.append({"dx": diff, "dy": 0, "dt": random.uniform(0.01, 0.03)}) + + return track + + +def _dispatch_mouse(page, event_type: str, x: int, y: int, button: str = "left") -> None: + page.run_cdp( + "Input.dispatchMouseEvent", + type=event_type, + x=x, + y=y, + button=button, + clickCount=1 if event_type == "mousePressed" else 0, + ) + + +def _get_element_center(page, ele) -> tuple[int, int]: + rect = page.run_js( + """const r = arguments[0].getBoundingClientRect(); + return {x: Math.round(r.x + r.width/2), y: Math.round(r.y + r.height/2)}""", + ele, + ) + if rect and isinstance(rect, dict): + return int(rect["x"]), int(rect["y"]) + loc = ele.rect.midpoint + return int(loc[0]), int(loc[1]) + + +def drag_slider(page, slider_ele, distance: int) -> None: + """用 CDP 级鼠标事件完成拖拽,模拟真人操作。""" + cx, cy = _get_element_center(page, slider_ele) + + _dispatch_mouse(page, "mouseMoved", cx, cy) + time.sleep(random.uniform(0.03, 0.08)) + + _dispatch_mouse(page, "mousePressed", cx, cy) + time.sleep(random.uniform(0.02, 0.06)) + + cur_x, cur_y = cx, cy + track = build_human_track(distance) + for step in track: + cur_x += step["dx"] + cur_y += step["dy"] + _dispatch_mouse(page, "mouseMoved", cur_x, cur_y) + time.sleep(step["dt"]) + + time.sleep(random.uniform(0.02, 0.06)) + _dispatch_mouse(page, "mouseReleased", cur_x, cur_y) + + +# ---------- 核心自动化流程 ---------- + +def submit_phone( + page, + phone: str, + url: str = "http://yscnb.com/tyyp/", + alpha_threshold: int = 12, + distance_adjust: int = 0, + wait_page: float = 0.3, +) -> Any: + """ + 填写手机号、点击获取验证码、执行滑块,返回 getYanZhenMa/v2 接口响应体。 + """ + page.get(url) + time.sleep(wait_page) + + # 1. 勾选协议 + agree_checkbox = find_first( + page, + [ + "css:#color-input-red", + "css:input[name='color-input-red']", + 'x://input[@id="color-input-red"]', + "css:input.right-box[type='checkbox']", + ], + timeout=5, + ) + if agree_checkbox: + click_safe(agree_checkbox) + time.sleep(0.4) + + # 2. 立即订购 + order_btn = None + for attempt in range(4): + order_btn = find_first( + page, + [ + "css:div.paybg", + "css:.paybg", + 'x://button[contains(.,"立即订购")]', + 'x://a[contains(.,"立即订购")]', + 'x://span[contains(.,"立即订购")]', + 'x://div[contains(.,"立即订购")]', + 'x://*[contains(text(),"立即订购")]', + 'x://*[contains(.,"立即订购")]', + "css:.btn-order", + "css:button.btn-primary", + "css:button.btn", + "css:a.btn", + ], + timeout=1, + ) + if order_btn: + break + time.sleep(0.25) + + if order_btn: + try: + order_btn.run_js("this.scrollIntoView({block:'center'})") + time.sleep(0.05) + except Exception: + pass + click_safe(order_btn) + time.sleep(0.4) + else: + try: + page.run_js(""" + var nodes = document.querySelectorAll('button, a, span, div'); + for (var i = 0; i < nodes.length; i++) { + var t = (nodes[i].innerText || nodes[i].textContent || '').trim(); + if (t.indexOf('立即订购') >= 0) { + nodes[i].scrollIntoView({block: 'center'}); + nodes[i].dispatchEvent(new MouseEvent('click', {bubbles: true, cancelable: true, view: window})); + return true; + } + } + return false; + """) + except Exception: + pass + time.sleep(0.4) + + # 3. 输入手机号 + phone_input = find_first( + page, + [ + 'x://*[@placeholder="请输入手机号码"]', + "css:input.inp-txt", + ], + timeout=8, + ) + if not phone_input: + raise RuntimeError("未找到手机号输入框") + + phone_input.input(phone, clear=True) + + agree = find_first( + page, + [ + "css:i.ico-checkbox", + 'x://i[contains(@class,"ico-checkbox")]', + ], + timeout=2, + ) + if agree: + try: + click_safe(agree) + except Exception: + pass + + # 4. 启动监听(必须在点击获取验证码之前) + page.listen.start(GET_YAN_ZHEN_MA_URL) + + send_btn = find_first( + page, + [ + "css:button.btn-code", + 'x://button[contains(text(),"获取验证码")]', + ], + timeout=8, + ) + if not send_btn: + raise RuntimeError("未找到「获取验证码」按钮") + + click_safe(send_btn) + + # 5. 等待滑块弹窗 + verify_box = find_first( + page, + ["css:.verifybox", "css:.verify-bar-area"], + timeout=6, + ) + if not verify_box: + raise RuntimeError("未检测到滑块验证码弹窗") + + bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=5) + piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=5) + slider = find_first(page, ["css:.verify-move-block"], timeout=5) + + if not bg_img or not piece_img or not slider: + raise RuntimeError("验证码关键元素缺失(背景图/拼图块/滑块)") + + # 6. 每次重试前重新抓取验证码图片并重算位移(不复用旧距离) + deadline = time.time() + 15.0 + last_reason = "" + attempts: list[dict[str, Any]] = [] + max_attempts = 6 + last_bg_bytes = b"" + last_piece_bytes = b"" + last_match: dict[str, Any] = {} + last_layout: dict[str, Any] = {} + last_move_distances: list[int] = [] + + for idx in range(1, max_attempts + 1): + if time.time() >= deadline: + break + + bg_img = find_first(page, ["css:.verify-img-panel img"], timeout=2) + piece_img = find_first(page, ["css:.verify-sub-block img"], timeout=2) + slider = find_first(page, ["css:.verify-move-block"], timeout=2) + + attempt: dict[str, Any] = { + "index": idx, + "move_distance": None, + "boundary_hit": False, + "packet_received": False, + "toast": "", + "interface_error": "", + "verify_bar_visible_after": False, + } + + if not bg_img or not piece_img or not slider: + last_reason = "重试时未找到验证码关键元素(背景图/拼图块/滑块)" + attempt["interface_error"] = last_reason + attempts.append(attempt) + break + + try: + bg_src = wait_for_data_src(bg_img, timeout=4) + piece_src = wait_for_data_src(piece_img, timeout=4) + bg_bytes = parse_data_url(bg_src) + piece_bytes = parse_data_url(piece_src) + except Exception as e: + last_reason = f"重试时读取验证码图片失败: {e}" + attempt["interface_error"] = last_reason + attempts.append(attempt) + continue + + if len(bg_bytes) < 100 or len(piece_bytes) < 100: + last_reason = f"验证码图片数据异常: bg={len(bg_bytes)}B piece={len(piece_bytes)}B" + attempt["interface_error"] = last_reason + attempts.append(attempt) + continue + + try: + match = calc_drag_distance_from_bytes( + bg_bytes, piece_bytes, alpha_threshold=alpha_threshold + ) + except Exception as e: + last_reason = f"重试时位移计算失败: {e}" + attempt["interface_error"] = last_reason + attempts.append(attempt) + continue + + layout = _get_slider_layout_metrics(page, bg_img, slider) + bg_display_w = layout["bg_display_w"] if layout["bg_display_w"] > 0 else match["bg_width"] + scale = float(bg_display_w) / max(1, match["bg_width"]) + track_max = int(layout["track_max"]) + move_distances = _build_move_distance_candidates( + match=match, + scale=scale, + distance_adjust=int(distance_adjust), + track_max=track_max, + ) + pick_index = min(max(0, idx - 1), len(move_distances) - 1) + move_distance = int(move_distances[pick_index]) + + attempt["move_distance"] = move_distance + attempt["boundary_hit"] = bool(track_max >= 0 and move_distance in (0, track_max)) + attempt["confidence_ratio"] = float(match.get("confidence_ratio", 0) or 0) + attempt["candidate_count"] = len(move_distances) + + # 记录最后一次有效计算,用于失败样本保存与分析 + last_bg_bytes = bg_bytes + last_piece_bytes = piece_bytes + last_match = match + last_layout = layout + last_move_distances = move_distances + + drag_slider(page, slider, move_distance) + time.sleep(0.2) + + remaining = max(0.1, deadline - time.time()) + packet, toast_text = _wait_packet_or_feedback(page, timeout=min(4.0, remaining)) + attempt["toast"] = toast_text or "" + if packet is not None: + attempt["packet_received"] = True + response = getattr(packet, "response", None) + if response is None: + attempt["interface_error"] = f"捕获到 {GET_YAN_ZHEN_MA_URL} 数据包但无 response 字段" + last_reason = attempt["interface_error"] + attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") + attempts.append(attempt) + if not attempt["verify_bar_visible_after"]: + break + continue + + body = _coerce_json_body(response.body) + attempt["response_body"] = _to_jsonable(body) + try: + _assert_interface_success(body) + return body + except Exception as e: + attempt["interface_error"] = str(e) + last_reason = attempt["interface_error"] + attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") + attempts.append(attempt) + if not attempt["verify_bar_visible_after"]: + break + continue + + if toast_text: + last_reason = f"滑块验证失败:{toast_text}" + elif _is_element_visible(page, ".verify-bar-area"): + last_reason = f"第{idx}次拖动未触发 {GET_YAN_ZHEN_MA_URL}" + else: + last_reason = "验证码弹窗已关闭但未捕获接口响应" + attempt["verify_bar_visible_after"] = _is_element_visible(page, ".verify-bar-area") + attempt["interface_error"] = last_reason if not attempt["interface_error"] else attempt["interface_error"] + attempts.append(attempt) + + analysis = _analyze_slider_failure( + match=last_match, + layout=last_layout, + attempts=attempts, + last_reason=last_reason, + ) + save_info = "" + try: + case_dir = _save_failure_artifacts( + page=page, + bg_bytes=last_bg_bytes, + piece_bytes=last_piece_bytes, + match=last_match, + layout=last_layout, + move_distances=last_move_distances, + attempts=attempts, + last_reason=last_reason, + analysis=analysis, + ) + save_info = f"; 失败样本目录: {case_dir}" + except Exception as e: + save_info = f"; 失败样本保存失败: {e}" + + raise RuntimeError( + f"滑块未通过或接口未返回成功结果: {last_reason or '未知原因'}" + f"; 分析: {analysis.get('summary', '无')}" + f"{save_info}" + ) + + +def submit_code(page, code: str) -> dict: + """填写短信验证码,点击确认订购按钮。""" + code_input = find_first( + page, + [ + 'x://input[@placeholder*="验证码"]', + 'x://input[@placeholder*="短信"]', + "css:input.inp-txt[type='text']", + "css:input[type='tel']", + "css:.code-input", + "css:input.verify-input", + ], + timeout=8, + ) + if not code_input: + raise RuntimeError("未找到验证码输入框") + + code_input.input(code, clear=True) + time.sleep(0.2) + + confirm_btn = find_first( + page, + [ + "css:img.btn-buy", + "css:.btn-buy", + 'x://img[contains(@class,"btn-buy")]', + 'x://*[contains(@class,"btn-buy")]', + ], + timeout=5, + ) + if not confirm_btn: + raise RuntimeError("未找到确认订购按钮(img.btn-buy)") + + try: + confirm_btn.run_js("this.scrollIntoView({block:'center'})") + time.sleep(0.05) + except Exception: + pass + + click_safe(confirm_btn) + return {"success": True, "message": "已点击确认订购"} + + +def input_code(page, code): + page.ele('x://input[@placeholder="请输入验证码"]').input(code, clear=True) + time.sleep(0.5) + + page.listen.start(ORDER_PACKET_URL) + page.ele('x://*[@id="app"]/div/img').click(by_js=True) + time.sleep(0.5) + page.ele('x://*[@id="app"]/div/div[7]/div/div[3]/button[2]').click(by_js=True) + res = _normalize_listen_packet(page.listen.wait(timeout=15, fit_count=False)) + if res is None: + raise RuntimeError(f"提交验证码后未捕获到 {ORDER_PACKET_URL} 抓包数据") + response = getattr(res, "response", None) + if response is None: + raise RuntimeError("提交验证码后抓包缺少 response 字段") + return _coerce_json_body(response.body) diff --git a/tyyp_app/cli.py b/tyyp_app/cli.py new file mode 100644 index 0000000..ac8339c --- /dev/null +++ b/tyyp_app/cli.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from __future__ import annotations + +import argparse + +from DrissionPage import ChromiumOptions, ChromiumPage + +from tyyp_app.api import app +from tyyp_app.automation import input_code, submit_phone +from tyyp_app.config import DEFAULT_PHONE, DEFAULT_TARGET_URL + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="天翼云盘订购页自动化") + parser.add_argument("--api", action="store_true", help="以 FastAPI 服务模式启动") + parser.add_argument("--host", default="0.0.0.0", help="API 监听地址(--api 模式)") + parser.add_argument("--api-port", type=int, default=8000, help="API 监听端口(--api 模式)") + parser.add_argument("--phone", default=DEFAULT_PHONE, help="手机号码(默认用代码中的 DEFAULT_PHONE)") + parser.add_argument("--url", default=DEFAULT_TARGET_URL, help="目标页面 URL") + parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口,0 表示新建") + return parser + + +def run_once(phone: str, url: str, port: int = 0) -> None: + if port: + co = ChromiumOptions().set_local_port(port=port) + page = ChromiumPage(addr_or_opts=co) + else: + page = ChromiumPage() + + print(f"打开页面: {url},手机号: {phone}") + body = submit_phone(page=page, phone=phone, url=url) + print("hn_userEquitys/getYanZhenMa/v2 响应:") + print(body) + + input_code(page=page, code=123123) + + +def main() -> None: + args = build_parser().parse_args() + + if args.api: + import uvicorn + + uvicorn.run(app, host=args.host, port=args.api_port) + return + + run_once(phone=args.phone, url=args.url, port=args.port) diff --git a/tyyp_app/config.py b/tyyp_app/config.py new file mode 100644 index 0000000..40a0138 --- /dev/null +++ b/tyyp_app/config.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from __future__ import annotations + +from pathlib import Path + +# 项目根目录(fws_code) +PROJECT_ROOT = Path(__file__).resolve().parent.parent + +DEFAULT_PHONE = "17375712810" +DEFAULT_TARGET_URL = "http://yscnb.com/tyyp/" + +GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2" +ORDER_PACKET_URL = "hn_userEquitys/common/order" + +PROXY_SOURCE_URL = "http://47.109.106.79:7002/ProxIpServiceTxt" +SESSION_IDLE_SECONDS = 180 +FLOW_CLEANUP_INTERVAL_SECONDS = 15 + +MOBILE_UA_POOL = [ + "Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 13; M2012K11AC) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 12; 2201123C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 13; SM-S9180) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 12; V2227A) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (iPhone; CPU iPhone OS 18_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Mobile/15E148 Safari/604.1", +] + +PROXY_FETCH_HEADERS = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", + "Cache-Control": "no-cache", + "DNT": "1", + "Pragma": "no-cache", + "Proxy-Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0", +} diff --git a/tyyp_app/schemas.py b/tyyp_app/schemas.py new file mode 100644 index 0000000..6d4e550 --- /dev/null +++ b/tyyp_app/schemas.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + +from tyyp_app.config import DEFAULT_TARGET_URL, PROXY_SOURCE_URL + + +class ApiSubmitPhoneRequest(BaseModel): + phone: str = Field(..., description="手机号码") + url: str = Field(DEFAULT_TARGET_URL, description="目标页面 URL") + proxy_api_url: str = Field(PROXY_SOURCE_URL, description="代理来源接口(返回 ip:port 文本)") + + +class ApiSubmitPhoneResponse(BaseModel): + success: bool = True + flow_id: str = Field(..., description="流程唯一标识符") + data: Any = Field(..., description="滑块流程抓包响应体(getYanZhenMa)") + phone: str = "" + url: str = "" + created_at: str = "" + proxy: str = "" + ua: str = "" + + +class ApiSubmitCodeRequest(BaseModel): + flow_id: str = Field(..., description="submit_phone 返回的流程唯一标识符") + code: str = Field(..., description="短信验证码") + + +class ApiSubmitCodeResponse(BaseModel): + success: bool = True + flow_id: str = "" + data: Any = Field(..., description="验证码提交后的抓包响应体") diff --git a/tyyp_app/services.py b/tyyp_app/services.py new file mode 100644 index 0000000..2661c6e --- /dev/null +++ b/tyyp_app/services.py @@ -0,0 +1,289 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from __future__ import annotations + +import random +import re +import threading +import time +import uuid +from datetime import datetime +from typing import Any, Optional + +import requests +from DrissionPage import ChromiumOptions, ChromiumPage + +from tgebrowser_client import TgeBrowserClient +from tyyp_app.automation import input_code, submit_phone +from tyyp_app.config import ( + FLOW_CLEANUP_INTERVAL_SECONDS, + MOBILE_UA_POOL, + SESSION_IDLE_SECONDS, +) + + +class ProxyProvider: + def __init__(self, headers: dict[str, str], timeout: float = 15): + self._headers = headers + self._timeout = timeout + + def fetch_proxy_text(self, proxy_api_url: str) -> str: + resp = requests.get(proxy_api_url, headers=self._headers, timeout=self._timeout) + resp.raise_for_status() + text = (resp.text or "").strip() + if not text: + raise RuntimeError("代理接口返回为空") + first_line = text.splitlines()[0].strip() + if not first_line: + raise RuntimeError("代理接口返回格式异常:首行为空") + return first_line + + @staticmethod + def parse_proxy_addr(proxy_text: str) -> tuple[str, int]: + match = re.match(r"^\s*((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\s*$", proxy_text) + if not match: + raise RuntimeError(f"代理格式不合法(期望 ip:port): {proxy_text}") + + host = match.group(1) + port = int(match.group(2)) + if not (1 <= port <= 65535): + raise RuntimeError(f"代理端口不合法: {port}") + return host, port + + +class FlowSessionStore: + def __init__( + self, + idle_seconds: int = SESSION_IDLE_SECONDS, + cleanup_interval: int = FLOW_CLEANUP_INTERVAL_SECONDS, + ): + self._idle_seconds = idle_seconds + self._cleanup_interval = cleanup_interval + self._sessions: dict[str, dict[str, Any]] = {} + self._lock = threading.Lock() + self._started = False + + def start_cleanup_worker(self) -> None: + with self._lock: + if self._started: + return + thread = threading.Thread(target=self._cleanup_loop, name="flow_cleanup_worker", daemon=True) + thread.start() + self._started = True + + def put(self, flow_id: str, flow: dict[str, Any]) -> None: + with self._lock: + self._sessions[flow_id] = flow + + def get(self, flow_id: str) -> Optional[dict[str, Any]]: + with self._lock: + return self._sessions.get(flow_id) + + def mark_busy(self, flow_id: str, busy: bool) -> Optional[dict[str, Any]]: + with self._lock: + flow = self._sessions.get(flow_id) + if not flow: + return None + flow["busy"] = busy + flow["last_active_ts"] = time.time() + return flow + + def touch(self, flow_id: str) -> None: + with self._lock: + flow = self._sessions.get(flow_id) + if flow: + flow["last_active_ts"] = time.time() + + def pop(self, flow_id: str) -> Optional[dict[str, Any]]: + with self._lock: + return self._sessions.pop(flow_id, None) + + def _cleanup_loop(self) -> None: + while True: + expired: list[dict[str, Any]] = [] + now_ts = time.time() + with self._lock: + for flow_id, flow in list(self._sessions.items()): + if flow.get("busy"): + continue + last_active_ts = float(flow.get("last_active_ts", now_ts)) + if now_ts - last_active_ts > self._idle_seconds: + expired.append(self._sessions.pop(flow_id)) + for flow in expired: + FlowService.close_flow_resources(flow) + time.sleep(self._cleanup_interval) + + +class FlowService: + def __init__(self, session_store: FlowSessionStore, proxy_provider: ProxyProvider): + self._store = session_store + self._proxy_provider = proxy_provider + + @staticmethod + def _choose_mobile_ua() -> str: + return random.choice(MOBILE_UA_POOL) + + @staticmethod + def _connect_page_from_start_data(start_data: dict[str, Any]) -> ChromiumPage: + port = start_data.get("port") + ws = start_data.get("ws") + if port: + co = ChromiumOptions().set_local_port(port=int(port)) + return ChromiumPage(addr_or_opts=co) + if ws: + return ChromiumPage(addr_or_opts=ws) + raise RuntimeError("TgeBrowser 未返回 port 或 ws") + + @staticmethod + def _create_tgebrowser_browser( + client: TgeBrowserClient, + browser_name: str, + start_page_url: str, + proxy_host: str, + proxy_port: int, + mobile_ua: str, + ) -> dict[str, Any]: + proxy_candidates = [ + {"protocol": "http", "host": proxy_host, "port": proxy_port}, + {"protocol": "http", "host": proxy_host, "port": str(proxy_port)}, + {"protocol": "http", "server": f"{proxy_host}:{proxy_port}"}, + ] + + last_error: Optional[Exception] = None + for proxy_conf in proxy_candidates: + try: + return client.create_browser( + browser_name=browser_name, + start_page_url=start_page_url, + proxy=proxy_conf, + fingerprint={ + "os": "Android", + "platformVersion": 12, + "userAgent": mobile_ua, + }, + ) + except Exception as exc: + last_error = exc + continue + + raise RuntimeError(f"创建浏览器失败(代理配置尝试均失败): {last_error}") + + @staticmethod + def _apply_mobile_ua(page: ChromiumPage, ua: str) -> None: + try: + page.run_cdp("Network.enable") + except Exception: + pass + try: + page.run_cdp("Network.setUserAgentOverride", userAgent=ua) + except Exception: + pass + + @staticmethod + def close_flow_resources(flow: dict[str, Any]) -> None: + page = flow.get("page") + client = flow.get("client") + env_id = flow.get("env_id") + try: + if page: + page.quit() + except Exception: + pass + try: + if client and env_id is not None: + client.stop_browser(env_id=env_id) + except Exception: + pass + + def submit_phone(self, phone: str, url: str, proxy_api_url: str) -> tuple[dict[str, Any], Any]: + client: Optional[TgeBrowserClient] = None + page: Optional[ChromiumPage] = None + env_id: Optional[int] = None + + proxy_text = "" + mobile_ua = "" + try: + proxy_text = self._proxy_provider.fetch_proxy_text(proxy_api_url) + proxy_host, proxy_port = self._proxy_provider.parse_proxy_addr(proxy_text) + mobile_ua = self._choose_mobile_ua() + + client = TgeBrowserClient() + phone_tail = phone[-4:] if len(phone) >= 4 else phone + browser_name = f"tyyp_{phone_tail}_{int(time.time())}" + create_data = self._create_tgebrowser_browser( + client=client, + browser_name=browser_name, + start_page_url=url, + proxy_host=proxy_host, + proxy_port=proxy_port, + mobile_ua=mobile_ua, + ) + env_id = create_data.get("envId") + if env_id is None: + raise RuntimeError("创建浏览器失败:未返回 envId") + + start_data = client.start_browser(env_id=env_id) + page = self._connect_page_from_start_data(start_data) + self._apply_mobile_ua(page, mobile_ua) + packet_body = submit_phone(page=page, phone=phone, url=url) + + flow_id = str(uuid.uuid4()) + created = datetime.now() + flow = { + "flow_id": flow_id, + "page": page, + "client": client, + "env_id": env_id, + "phone": phone, + "url": url, + "proxy": proxy_text, + "ua": mobile_ua, + "created_at": created, + "created_at_iso": created.isoformat(timespec="seconds"), + "last_active_ts": time.time(), + "busy": False, + } + self._store.put(flow_id, flow) + return flow, packet_body + except Exception: + self.close_flow_resources({"page": page, "client": client, "env_id": env_id}) + raise + + def submit_code(self, flow_id: str, code: str) -> Any: + flow = self._store.mark_busy(flow_id, True) + if not flow: + raise KeyError(f"流程不存在或已过期: {flow_id}") + + page = flow.get("page") + if not page: + self._store.mark_busy(flow_id, False) + raise RuntimeError("流程页面对象缺失") + + try: + return input_code(page=page, code=code) + finally: + self._store.mark_busy(flow_id, False) + self._store.touch(flow_id) + + def get_flow_meta(self, flow_id: str) -> Optional[dict[str, Any]]: + flow = self._store.get(flow_id) + if not flow: + return None + + return { + "flow_id": flow_id, + "phone": flow.get("phone"), + "url": flow.get("url"), + "proxy": flow.get("proxy"), + "ua": flow.get("ua"), + "created_at": flow.get("created_at_iso"), + "last_active_ts": flow.get("last_active_ts"), + "busy": flow.get("busy", False), + } + + def close_flow(self, flow_id: str) -> bool: + flow = self._store.pop(flow_id) + if not flow: + return False + self.close_flow_resources(flow) + return True