#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Proxy lookup, browser-backed flow sessions and idle-session cleanup."""

from __future__ import annotations

import logging
import random
import re
import threading
import time
import uuid
from datetime import datetime
from typing import Any, Optional

import requests
from DrissionPage import ChromiumOptions, ChromiumPage
from tgebrowser_client import TgeBrowserClient

from tyyp_app.automation import input_code, submit_phone
from tyyp_app.config import (
    FLOW_CLEANUP_INTERVAL_SECONDS,
    MOBILE_UA_POOL,
    SESSION_IDLE_SECONDS,
)

logger = logging.getLogger(__name__)

# Dotted-quad IPv4 address followed by ":port"; surrounding whitespace allowed.
_PROXY_ADDR_RE = re.compile(r"^\s*((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\s*$")


class ProxyProvider:
    """Fetch a proxy address from an HTTP API and parse it as ``ip:port``."""

    def __init__(self, headers: dict[str, str], timeout: float = 15):
        """Store the request headers and timeout used for every API call."""
        self._headers = headers
        self._timeout = timeout

    def fetch_proxy_text(self, proxy_api_url: str) -> str:
        """Return the first line of the proxy API response, stripped.

        Args:
            proxy_api_url: URL queried with a plain GET request.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            RuntimeError: If the response body or its first line is empty.
        """
        resp = requests.get(
            proxy_api_url, headers=self._headers, timeout=self._timeout
        )
        resp.raise_for_status()
        text = (resp.text or "").strip()
        if not text:
            raise RuntimeError("代理接口返回为空")
        first_line = text.splitlines()[0].strip()
        if not first_line:
            raise RuntimeError("代理接口返回格式异常:首行为空")
        return first_line

    @staticmethod
    def parse_proxy_addr(proxy_text: str) -> tuple[str, int]:
        """Split ``"ip:port"`` into ``(host, port)``.

        Args:
            proxy_text: Text such as ``"1.2.3.4:8080"``; leading and trailing
                whitespace is tolerated.

        Returns:
            The host exactly as written and the port as an ``int``.

        Raises:
            RuntimeError: If the text is not ``ip:port``, an octet is greater
                than 255, or the port is outside 1..65535.
        """
        match = _PROXY_ADDR_RE.match(proxy_text)
        if not match:
            raise RuntimeError(f"代理格式不合法(期望 ip:port): {proxy_text}")
        host = match.group(1)
        # The pattern only limits each octet to three digits, so values such
        # as 999 must be rejected explicitly.
        if any(int(octet) > 255 for octet in host.split(".")):
            raise RuntimeError(f"代理 IP 不合法: {host}")
        port = int(match.group(2))
        if not (1 <= port <= 65535):
            raise RuntimeError(f"代理端口不合法: {port}")
        return host, port


class FlowSessionStore:
    """Lock-protected in-memory map of ``flow_id`` to flow dict.

    A background worker (see ``start_cleanup_worker``) evicts flows that are
    not marked busy and have been idle for longer than ``idle_seconds``.
    """

    def __init__(
        self,
        idle_seconds: int = SESSION_IDLE_SECONDS,
        cleanup_interval: int = FLOW_CLEANUP_INTERVAL_SECONDS,
    ):
        """Create an empty store.

        Args:
            idle_seconds: Idle time after which a non-busy flow is evicted.
            cleanup_interval: Seconds the worker sleeps between passes.
        """
        self._idle_seconds = idle_seconds
        self._cleanup_interval = cleanup_interval
        self._sessions: dict[str, dict[str, Any]] = {}
        self._lock = threading.Lock()
        self._started = False

    def start_cleanup_worker(self) -> None:
        """Start the daemon cleanup thread; later calls are no-ops."""
        with self._lock:
            if self._started:
                return
            thread = threading.Thread(
                target=self._cleanup_loop,
                name="flow_cleanup_worker",
                daemon=True,
            )
            thread.start()
            self._started = True

    def put(self, flow_id: str, flow: dict[str, Any]) -> None:
        """Store ``flow`` under ``flow_id``, replacing any existing entry."""
        with self._lock:
            self._sessions[flow_id] = flow

    def get(self, flow_id: str) -> Optional[dict[str, Any]]:
        """Return the stored flow dict (not a copy), or ``None``."""
        with self._lock:
            return self._sessions.get(flow_id)

    def mark_busy(self, flow_id: str, busy: bool) -> Optional[dict[str, Any]]:
        """Set the flow's ``busy`` flag and refresh its activity timestamp.

        Returns:
            The flow dict, or ``None`` when the flow is missing (or empty).
        """
        with self._lock:
            flow = self._sessions.get(flow_id)
            if not flow:
                return None
            flow["busy"] = busy
            flow["last_active_ts"] = time.time()
            return flow

    def touch(self, flow_id: str) -> None:
        """Refresh the flow's activity timestamp if the flow exists."""
        with self._lock:
            flow = self._sessions.get(flow_id)
            if flow:
                flow["last_active_ts"] = time.time()

    def pop(self, flow_id: str) -> Optional[dict[str, Any]]:
        """Remove and return the flow, or ``None`` if it is not stored."""
        with self._lock:
            return self._sessions.pop(flow_id, None)

    def _pop_expired(self, now_ts: float) -> list[dict[str, Any]]:
        """Remove and return every non-busy flow idle longer than the limit."""
        expired: list[dict[str, Any]] = []
        with self._lock:
            # Iterate over a snapshot because entries are popped in the loop.
            for flow_id, flow in list(self._sessions.items()):
                if flow.get("busy"):
                    continue
                try:
                    last_active_ts = float(flow.get("last_active_ts", now_ts))
                except (TypeError, ValueError):
                    # Unusable timestamp: restart the idle clock so the flow
                    # still ages out instead of failing every pass.
                    flow["last_active_ts"] = now_ts
                    continue
                if now_ts - last_active_ts > self._idle_seconds:
                    expired.append(self._sessions.pop(flow_id))
        return expired

    def _cleanup_loop(self) -> None:
        """Evict idle flows forever, sleeping ``cleanup_interval`` per pass."""
        while True:
            # An escaping exception would end this thread for good, because
            # ``_started`` stays True and the worker is never restarted.
            try:
                # Resources are released outside the lock so slow browser
                # shutdowns do not block other store operations.
                for flow in self._pop_expired(time.time()):
                    FlowService.close_flow_resources(flow)
            except Exception:
                logger.exception("flow cleanup pass failed")
            time.sleep(self._cleanup_interval)


class FlowService:
    """Drive the phone/code flow in a TgeBrowser-managed browser session."""

    def __init__(
        self, session_store: FlowSessionStore, proxy_provider: ProxyProvider
    ):
        """Keep the session store and proxy provider this service uses."""
        self._store = session_store
        self._proxy_provider = proxy_provider

    @staticmethod
    def _choose_mobile_ua() -> str:
        """Return a random user-agent string from ``MOBILE_UA_POOL``."""
        return random.choice(MOBILE_UA_POOL)

    @staticmethod
    def _infer_fingerprint_os(mobile_ua: str) -> str:
        """Return ``"iOS"`` for iPhone/iPad user agents, else ``"Android"``."""
        ua = (mobile_ua or "").lower()
        # "cpu iphone os" always contains "iphone", so two tokens suffice.
        if any(token in ua for token in ("iphone", "ipad")):
            return "iOS"
        return "Android"

    @staticmethod
    def _connect_page_from_start_data(start_data: dict[str, Any]) -> ChromiumPage:
        """Attach a ``ChromiumPage`` using ``start_data``.

        ``"port"`` is preferred; ``"ws"`` is used when no port is present.

        Raises:
            RuntimeError: If neither ``"port"`` nor ``"ws"`` is set.
        """
        port = start_data.get("port")
        ws = start_data.get("ws")
        if port:
            co = ChromiumOptions().set_local_port(port=int(port))
            return ChromiumPage(addr_or_opts=co)
        if ws:
            return ChromiumPage(addr_or_opts=ws)
        raise RuntimeError("TgeBrowser 未返回 port 或 ws")

    @staticmethod
    def _create_tgebrowser_browser(
        client: TgeBrowserClient,
        browser_name: str,
        start_page_url: str,
        proxy_host: str,
        proxy_port: int,
        mobile_ua: str,
    ) -> dict[str, Any]:
        """Create a browser, retrying with fallback proxy/fingerprint configs.

        Each proxy form (port as ``int``, then as ``str``) is combined with
        each fingerprint (OS plus UA, OS only, plain Android) until
        ``client.create_browser`` succeeds.
        NOTE(review): the fallbacks presumably cover differing API
        expectations -- confirm against the TgeBrowser client.

        Returns:
            Whatever ``client.create_browser`` returned on the first success.

        Raises:
            RuntimeError: If every combination fails; the message carries the
                last four errors and the last exception is chained as cause.
        """
        fingerprint_os = FlowService._infer_fingerprint_os(mobile_ua)
        proxy_candidates = [
            {"protocol": "http", "host": proxy_host, "port": proxy_port},
            {"protocol": "http", "host": proxy_host, "port": str(proxy_port)},
        ]
        fingerprint_candidates = [
            {"os": fingerprint_os, "userAgent": mobile_ua},
            {"os": fingerprint_os},
            {"os": "Android"},
        ]

        last_error: Optional[Exception] = None
        errors: list[str] = []
        for proxy_conf in proxy_candidates:
            for fingerprint_conf in fingerprint_candidates:
                try:
                    return client.create_browser(
                        browser_name=browser_name,
                        start_page_url=start_page_url,
                        proxy=proxy_conf,
                        fingerprint=fingerprint_conf,
                    )
                except Exception as exc:
                    last_error = exc
                    errors.append(
                        f"proxy={proxy_conf}, fingerprint={fingerprint_conf}, error={exc}"
                    )

        detail = "; ".join(errors[-4:]) if errors else str(last_error)
        raise RuntimeError(
            f"创建浏览器失败(代理/指纹配置尝试均失败): {detail}"
        ) from last_error

    @staticmethod
    def _apply_mobile_ua(page: ChromiumPage, ua: str) -> None:
        """Best-effort user-agent override via CDP; failures are only logged."""
        try:
            page.run_cdp("Network.enable")
        except Exception:
            logger.debug("Network.enable failed", exc_info=True)
        try:
            page.run_cdp("Network.setUserAgentOverride", userAgent=ua)
        except Exception:
            logger.warning("Network.setUserAgentOverride failed", exc_info=True)

    @staticmethod
    def close_flow_resources(flow: dict[str, Any]) -> None:
        """Quit the flow's page and stop its browser; never raises.

        Args:
            flow: Dict that may hold ``"page"``, ``"client"`` and ``"env_id"``.
                Missing or ``None`` entries are skipped.
        """
        page = flow.get("page")
        client = flow.get("client")
        env_id = flow.get("env_id")

        # The two steps are guarded separately so that a failing page.quit()
        # does not prevent the browser from being stopped.
        if page is not None:
            try:
                page.quit()
            except Exception:
                logger.warning("page.quit() failed during cleanup", exc_info=True)
        if client is not None and env_id is not None:
            try:
                client.stop_browser(env_id=env_id)
            except Exception:
                logger.warning(
                    "stop_browser failed for env_id=%s", env_id, exc_info=True
                )

    def submit_phone(
        self, phone: str, url: str, proxy_api_url: str
    ) -> tuple[dict[str, Any], Any]:
        """Open a proxied mobile browser, submit ``phone`` and store the flow.

        Args:
            phone: Phone number passed to the automation step.
            url: Page the browser starts on and the automation targets.
            proxy_api_url: API queried for the ``ip:port`` proxy to use.

        Returns:
            ``(flow, packet_body)``: the stored flow dict and whatever the
            automation ``submit_phone`` returned.

        Raises:
            Exception: Any failure is re-raised after the partially created
                page/browser has been released.
        """
        client: Optional[TgeBrowserClient] = None
        page: Optional[ChromiumPage] = None
        env_id: Optional[int] = None
        proxy_text = ""
        mobile_ua = ""
        try:
            proxy_text = self._proxy_provider.fetch_proxy_text(proxy_api_url)
            proxy_host, proxy_port = self._proxy_provider.parse_proxy_addr(
                proxy_text
            )
            mobile_ua = self._choose_mobile_ua()

            client = TgeBrowserClient()
            phone_tail = phone[-4:] if len(phone) >= 4 else phone
            browser_name = f"tyyp_{phone_tail}_{int(time.time())}"
            create_data = self._create_tgebrowser_browser(
                client=client,
                browser_name=browser_name,
                start_page_url=url,
                proxy_host=proxy_host,
                proxy_port=proxy_port,
                mobile_ua=mobile_ua,
            )
            env_id = create_data.get("envId")
            if env_id is None:
                raise RuntimeError("创建浏览器失败:未返回 envId")

            start_data = client.start_browser(env_id=env_id)
            page = self._connect_page_from_start_data(start_data)
            self._apply_mobile_ua(page, mobile_ua)

            # Inside a method the bare name resolves to the module-level
            # automation function, not to this method.
            packet_body = submit_phone(page=page, phone=phone, url=url)

            flow_id = str(uuid.uuid4())
            created = datetime.now()
            flow = {
                "flow_id": flow_id,
                "page": page,
                "client": client,
                "env_id": env_id,
                "phone": phone,
                "url": url,
                "proxy": proxy_text,
                "ua": mobile_ua,
                "created_at": created,
                "created_at_iso": created.isoformat(timespec="seconds"),
                "last_active_ts": time.time(),
                "busy": False,
            }
            self._store.put(flow_id, flow)
            return flow, packet_body
        except Exception:
            # Release whatever was created before the failure, then re-raise.
            self.close_flow_resources(
                {"page": page, "client": client, "env_id": env_id}
            )
            raise

    def submit_code(self, flow_id: str, code: str) -> Any:
        """Enter ``code`` on the flow's page and return the automation result.

        The flow is marked busy for the duration so the cleanup worker does
        not evict it mid-operation.

        Raises:
            KeyError: If the flow does not exist (or has been evicted).
            RuntimeError: If the flow has no page object.
        """
        flow = self._store.mark_busy(flow_id, True)
        if not flow:
            raise KeyError(f"流程不存在或已过期: {flow_id}")
        try:
            page = flow.get("page")
            if page is None:
                raise RuntimeError("流程页面对象缺失")
            return input_code(page=page, code=code)
        finally:
            # mark_busy also refreshes last_active_ts, so no separate touch()
            # is needed.
            self._store.mark_busy(flow_id, False)

    def get_flow_meta(self, flow_id: str) -> Optional[dict[str, Any]]:
        """Return the flow's plain metadata, or ``None`` if it is unknown.

        The page and client objects are deliberately left out of the result.
        """
        flow = self._store.get(flow_id)
        if not flow:
            return None
        return {
            "flow_id": flow_id,
            "phone": flow.get("phone"),
            "url": flow.get("url"),
            "proxy": flow.get("proxy"),
            "ua": flow.get("ua"),
            "created_at": flow.get("created_at_iso"),
            "last_active_ts": flow.get("last_active_ts"),
            "busy": flow.get("busy", False),
        }

    def close_flow(self, flow_id: str) -> bool:
        """Remove the flow and release its resources.

        Returns:
            ``True`` if a flow was removed, ``False`` if none was stored.
        """
        flow = self._store.pop(flow_id)
        if not flow:
            return False
        self.close_flow_resources(flow)
        return True