From 71df2f694b13d006b44d28141794c58c6ad7ad5d Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Fri, 27 Feb 2026 16:17:00 +0800 Subject: [PATCH] haha --- test1.py | 370 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 365 insertions(+), 5 deletions(-) diff --git a/test1.py b/test1.py index c5615e8..b32f6d2 100644 --- a/test1.py +++ b/test1.py @@ -19,6 +19,9 @@ from __future__ import annotations import argparse import json +import re +import threading +import uuid from typing import Any, Optional import base64 import random @@ -29,14 +32,43 @@ from pathlib import Path import numpy as np from PIL import Image, ImageDraw +import requests from DrissionPage import ChromiumPage, ChromiumOptions +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel, Field + +from tgebrowser_client import TgeBrowserClient # ========== 调试用:在代码中直接指定手机号 ========== DEFAULT_PHONE = "17375712810" # 需要监听的 URL 特征(滑块通过后前端会请求此接口) GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2" +ORDER_PACKET_URL = "hn_userEquitys/common/order" +DEFAULT_TARGET_URL = "http://yscnb.com/tyyp/" +PROXY_SOURCE_URL = "http://47.109.106.79:7002/ProxIpServiceTxt" +SESSION_IDLE_SECONDS = 180 + +MOBILE_UA_POOL = [ + "Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 13; M2012K11AC) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 12; 2201123C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 13; SM-S9180) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 12; V2227A) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (iPhone; CPU iPhone OS 18_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Mobile/15E148 Safari/604.1", +] + +PROXY_FETCH_HEADERS = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", + "Cache-Control": "no-cache", + "DNT": "1", + "Pragma": "no-cache", + "Proxy-Connection": "keep-alive", + "Upgrade-Insecure-Requests": "1", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0", +} # ---------- 拼图验证码计算 ---------- @@ -1025,13 +1057,21 @@ def submit_code(page, code: str) -> dict: def main() -> None: parser = argparse.ArgumentParser(description="天翼云盘订购页自动化") + parser.add_argument("--api", action="store_true", help="以 FastAPI 服务模式启动") + parser.add_argument("--host", default="0.0.0.0", help="API 监听地址(--api 模式)") + parser.add_argument("--api-port", type=int, default=8000, help="API 监听端口(--api 模式)") parser.add_argument( "--phone", default=DEFAULT_PHONE, help="手机号码(默认用代码中的 DEFAULT_PHONE)" ) - parser.add_argument("--url", default="http://yscnb.com/tyyp/", help="目标页面 URL") + parser.add_argument("--url", default=DEFAULT_TARGET_URL, help="目标页面 URL") parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口,0 表示新建") args = parser.parse_args() + if args.api: + import uvicorn + uvicorn.run(app, host=args.host, port=args.api_port) + return + if args.port: co = ChromiumOptions().set_local_port(port=args.port) page = ChromiumPage(addr_or_opts=co) @@ -1051,15 +1091,335 @@ def main() -> None: def input_code(page, code): - page.ele('x://input[@placeholder="请输入验证码"]').input(code,clear=True) + page.ele('x://input[@placeholder="请输入验证码"]').input(code, clear=True) time.sleep(0.5) - page.listen.start("hn_userEquitys/common/order") + page.listen.start(ORDER_PACKET_URL) page.ele('x://*[@id="app"]/div/img').click(by_js=True) time.sleep(0.5) page.ele('x://*[@id="app"]/div/div[7]/div/div[3]/button[2]').click(by_js=True) - res = page.listen.wait() - print(res.response.body) + res = _normalize_listen_packet(page.listen.wait(timeout=15, fit_count=False)) + if res is None: + raise RuntimeError(f"提交验证码后未捕获到 {ORDER_PACKET_URL} 抓包数据") + response = getattr(res, "response", None) + if response is None: + raise RuntimeError("提交验证码后抓包缺少 response 字段") + return _coerce_json_body(response.body) + + +# ---------- FastAPI 接口 ---------- + +class ApiSubmitPhoneRequest(BaseModel): + phone: str = Field(..., description="手机号码") + url: str = Field(DEFAULT_TARGET_URL, description="目标页面 URL") + proxy_api_url: str = Field(PROXY_SOURCE_URL, description="代理来源接口(返回 ip:port 文本)") + + +class ApiSubmitPhoneResponse(BaseModel): + success: bool = True + flow_id: str = Field(..., description="流程唯一标识符") + data: Any = Field(..., description="滑块流程抓包响应体(getYanZhenMa)") + phone: str = "" + url: str = "" + created_at: str = "" + proxy: str = "" + ua: str = "" + + +class ApiSubmitCodeRequest(BaseModel): + flow_id: str = Field(..., description="submit_phone 返回的流程唯一标识符") + code: str = Field(..., description="短信验证码") + + +class ApiSubmitCodeResponse(BaseModel): + success: bool = True + flow_id: str = "" + data: Any = Field(..., description="验证码提交后的抓包响应体") + + +app = FastAPI(title="test1 自动化 API", description="TgeBrowser + DrissionPage 天翼页面自动化接口") + +_flow_sessions: dict[str, dict[str, Any]] = {} +_flow_lock = threading.Lock() +_cleanup_thread_started = False + + +def _choose_mobile_ua() -> str: + return random.choice(MOBILE_UA_POOL) + + +def _fetch_proxy_text(proxy_api_url: str) -> str: + resp = requests.get( + proxy_api_url, + headers=PROXY_FETCH_HEADERS, + timeout=15, + ) + resp.raise_for_status() + text = (resp.text or "").strip() + if not text: + raise RuntimeError("代理接口返回为空") + first_line = text.splitlines()[0].strip() + if not first_line: + raise RuntimeError("代理接口返回格式异常:首行为空") + return first_line + + +def _parse_proxy_addr(proxy_text: str) -> tuple[str, int]: + m = re.match(r"^\s*((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\s*$", proxy_text) + if not m: + raise RuntimeError(f"代理格式不合法(期望 ip:port): {proxy_text}") + host = m.group(1) + port = int(m.group(2)) + if not (1 <= port <= 65535): + raise RuntimeError(f"代理端口不合法: {port}") + return host, port + + +def _connect_page_from_start_data(start_data: dict) -> ChromiumPage: + port = start_data.get("port") + ws = start_data.get("ws") + if port: + co = ChromiumOptions().set_local_port(port=int(port)) + return ChromiumPage(addr_or_opts=co) + if ws: + return ChromiumPage(addr_or_opts=ws) + raise RuntimeError("TgeBrowser 未返回 port 或 ws") + + +def _create_tgebrowser_browser( + client: TgeBrowserClient, + browser_name: str, + start_page_url: str, + proxy_host: str, + proxy_port: int, + mobile_ua: str, +) -> dict: + proxy_candidates = [ + {"protocol": "http", "host": proxy_host, "port": proxy_port}, + {"protocol": "http", "host": proxy_host, "port": str(proxy_port)}, + {"protocol": "http", "server": f"{proxy_host}:{proxy_port}"}, + ] + last_error: Optional[Exception] = None + for proxy_conf in proxy_candidates: + try: + return client.create_browser( + browser_name=browser_name, + start_page_url=start_page_url, + proxy=proxy_conf, + fingerprint={ + "os": "Android", + "platformVersion": 12, + "userAgent": mobile_ua, + }, + ) + except Exception as e: + last_error = e + continue + raise RuntimeError(f"创建浏览器失败(代理配置尝试均失败): {last_error}") + + +def _apply_mobile_ua(page: ChromiumPage, ua: str) -> None: + try: + page.run_cdp("Network.enable") + except Exception: + pass + try: + page.run_cdp("Network.setUserAgentOverride", userAgent=ua) + except Exception: + pass + + +def _close_flow_resources(flow: dict[str, Any]) -> None: + page = flow.get("page") + client = flow.get("client") + env_id = flow.get("env_id") + try: + if page: + page.quit() + except Exception: + pass + try: + if client and env_id is not None: + client.stop_browser(env_id=env_id) + except Exception: + pass + + +def _cleanup_expired_flows() -> None: + now_ts = time.time() + expired: list[dict[str, Any]] = [] + with _flow_lock: + for flow_id, flow in list(_flow_sessions.items()): + if flow.get("busy"): + continue + last_active_ts = float(flow.get("last_active_ts", now_ts)) + if now_ts - last_active_ts > SESSION_IDLE_SECONDS: + expired.append(_flow_sessions.pop(flow_id)) + for flow in expired: + _close_flow_resources(flow) + + +def _flow_cleanup_worker() -> None: + while True: + try: + _cleanup_expired_flows() + except Exception: + pass + time.sleep(15) + + +def _ensure_cleanup_thread() -> None: + global _cleanup_thread_started + with _flow_lock: + if _cleanup_thread_started: + return + th = threading.Thread(target=_flow_cleanup_worker, name="flow_cleanup_worker", daemon=True) + th.start() + _cleanup_thread_started = True + + +@app.on_event("startup") +def _api_startup() -> None: + _ensure_cleanup_thread() + + +@app.post("/api/submit_phone", response_model=ApiSubmitPhoneResponse) +def api_submit_phone(req: ApiSubmitPhoneRequest): + _ensure_cleanup_thread() + + client: Optional[TgeBrowserClient] = None + page: Optional[ChromiumPage] = None + env_id: Optional[int] = None + stored = False + + try: + proxy_text = _fetch_proxy_text(req.proxy_api_url) + proxy_host, proxy_port = _parse_proxy_addr(proxy_text) + mobile_ua = _choose_mobile_ua() + + client = TgeBrowserClient() + browser_name = f"tyyp_{req.phone[-4:]}_{int(time.time())}" + create_data = _create_tgebrowser_browser( + client=client, + browser_name=browser_name, + start_page_url=req.url, + proxy_host=proxy_host, + proxy_port=proxy_port, + mobile_ua=mobile_ua, + ) + env_id = create_data.get("envId") + if env_id is None: + raise RuntimeError("创建浏览器失败:未返回 envId") + + start_data = client.start_browser(env_id=env_id) + page = _connect_page_from_start_data(start_data) + _apply_mobile_ua(page, mobile_ua) + + packet_body = submit_phone( + page=page, + phone=req.phone, + url=req.url, + ) + + flow_id = str(uuid.uuid4()) + created = datetime.now() + flow = { + "flow_id": flow_id, + "page": page, + "client": client, + "env_id": env_id, + "phone": req.phone, + "url": req.url, + "proxy": proxy_text, + "ua": mobile_ua, + "created_at": created, + "created_at_iso": created.isoformat(timespec="seconds"), + "created_ts": time.time(), + "last_active_ts": time.time(), + "busy": False, + } + with _flow_lock: + _flow_sessions[flow_id] = flow + stored = True + + return ApiSubmitPhoneResponse( + success=True, + flow_id=flow_id, + data=packet_body, + phone=req.phone, + url=req.url, + created_at=flow["created_at_iso"], + proxy=proxy_text, + ua=mobile_ua, + ) + except Exception as e: + if not stored: + _close_flow_resources({"page": page, "client": client, "env_id": env_id}) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/api/submit_code", response_model=ApiSubmitCodeResponse) +def api_submit_code(req: ApiSubmitCodeRequest): + _ensure_cleanup_thread() + with _flow_lock: + flow = _flow_sessions.get(req.flow_id) + if not flow: + raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {req.flow_id}") + flow["busy"] = True + flow["last_active_ts"] = time.time() + page = flow.get("page") + + if not page: + with _flow_lock: + flow = _flow_sessions.get(req.flow_id) + if flow: + flow["busy"] = False + flow["last_active_ts"] = time.time() + raise HTTPException(status_code=500, detail="流程页面对象缺失") + + try: + packet_body = input_code(page=page, code=req.code) + return ApiSubmitCodeResponse( + success=True, + flow_id=req.flow_id, + data=packet_body, + ) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + finally: + with _flow_lock: + flow = _flow_sessions.get(req.flow_id) + if flow: + flow["busy"] = False + flow["last_active_ts"] = time.time() + + +@app.get("/api/flow/{flow_id}") +def api_get_flow(flow_id: str): + with _flow_lock: + flow = _flow_sessions.get(flow_id) + if not flow: + raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {flow_id}") + return { + "success": True, + "flow_id": flow_id, + "phone": flow.get("phone"), + "url": flow.get("url"), + "proxy": flow.get("proxy"), + "ua": flow.get("ua"), + "created_at": flow.get("created_at_iso"), + "last_active_ts": flow.get("last_active_ts"), + } + + +@app.delete("/api/flow/{flow_id}") +def api_close_flow(flow_id: str): + with _flow_lock: + flow = _flow_sessions.pop(flow_id, None) + if not flow: + return {"success": False, "message": "流程不存在"} + _close_flow_resources(flow) + return {"success": True, "message": "流程已关闭"} if __name__ == "__main__": main()