Files
fws_code/tyyp_app/services.py
ddrwode db0fa81b2e haha
2026-02-27 17:36:31 +08:00

304 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import random
import re
import threading
import time
import uuid
from datetime import datetime
from typing import Any, Optional
import requests
from DrissionPage import ChromiumOptions, ChromiumPage
from tgebrowser_client import TgeBrowserClient
from tyyp_app.automation import input_code, submit_phone
from tyyp_app.config import (
FLOW_CLEANUP_INTERVAL_SECONDS,
MOBILE_UA_POOL,
SESSION_IDLE_SECONDS,
)
class ProxyProvider:
    """Fetch a proxy address from an HTTP API and validate its ``ip:port`` form."""

    # ASCII-only digit classes on purpose: ``\d`` would also accept non-ASCII
    # Unicode digits, which are not valid in a proxy address.
    _ADDR_PATTERN = re.compile(
        r"^\s*((?:[0-9]{1,3}\.){3}[0-9]{1,3}):([0-9]{1,5})\s*$"
    )

    def __init__(self, headers: dict[str, str], timeout: float = 15):
        """Store the request headers and timeout used for every API call.

        Args:
            headers: HTTP headers sent with each proxy API request.
            timeout: Timeout passed to ``requests.get``.
        """
        self._headers = headers
        self._timeout = timeout

    def fetch_proxy_text(self, proxy_api_url: str) -> str:
        """Return the first non-empty line of the proxy API response.

        Args:
            proxy_api_url: URL of the proxy API to query with a GET request.

        Returns:
            The first line of the stripped response body, itself stripped.

        Raises:
            RuntimeError: If the response body is empty or its first line is
                blank. Errors raised by ``requests.get`` and
                ``raise_for_status`` propagate unchanged.
        """
        resp = requests.get(proxy_api_url, headers=self._headers, timeout=self._timeout)
        resp.raise_for_status()
        text = (resp.text or "").strip()
        if not text:
            raise RuntimeError("代理接口返回为空")
        # Only the first line is used; any further lines are ignored.
        first_line = text.splitlines()[0].strip()
        if not first_line:
            raise RuntimeError("代理接口返回格式异常:首行为空")
        return first_line

    @staticmethod
    def parse_proxy_addr(proxy_text: str) -> tuple[str, int]:
        """Split an ``ip:port`` string into a validated ``(host, port)`` pair.

        Surrounding whitespace is tolerated. The host must be a dotted-quad
        IPv4 address whose octets are each in 0-255, and the port must be in
        1-65535.

        Args:
            proxy_text: Text such as ``"1.2.3.4:8080"``.

        Returns:
            The host string and the port as an integer.

        Raises:
            RuntimeError: If the text is not ``ip:port``, an octet exceeds
                255, or the port is out of range.
        """
        match = ProxyProvider._ADDR_PATTERN.match(proxy_text)
        if not match:
            raise RuntimeError(f"代理格式不合法(期望 ip:port: {proxy_text}")
        host = match.group(1)
        # The pattern only limits each octet to three digits, so 256-999 must
        # be rejected explicitly.
        if any(int(octet) > 255 for octet in host.split(".")):
            raise RuntimeError(f"代理格式不合法(期望 ip:port: {proxy_text}")
        port = int(match.group(2))
        if not (1 <= port <= 65535):
            raise RuntimeError(f"代理端口不合法: {port}")
        return host, port
class FlowSessionStore:
    """Lock-protected in-memory registry of flow records with idle expiry.

    Each record is a plain dict keyed by flow id. A record's ``busy`` flag
    and ``last_active_ts`` timestamp decide whether the background worker
    may expire it.
    """

    def __init__(
        self,
        idle_seconds: int = SESSION_IDLE_SECONDS,
        cleanup_interval: int = FLOW_CLEANUP_INTERVAL_SECONDS,
    ):
        """Create an empty store.

        Args:
            idle_seconds: Idle time after which a non-busy flow is expired.
            cleanup_interval: Seconds the worker sleeps between sweeps.
        """
        self._idle_seconds = idle_seconds
        self._cleanup_interval = cleanup_interval
        self._sessions: dict[str, dict[str, Any]] = {}
        self._lock = threading.Lock()
        self._started = False

    def start_cleanup_worker(self) -> None:
        """Start the daemon cleanup thread; later calls do nothing."""
        with self._lock:
            if not self._started:
                worker = threading.Thread(
                    target=self._cleanup_loop,
                    name="flow_cleanup_worker",
                    daemon=True,
                )
                worker.start()
                self._started = True

    def put(self, flow_id: str, flow: dict[str, Any]) -> None:
        """Register ``flow`` under ``flow_id``, replacing any existing entry."""
        with self._lock:
            self._sessions[flow_id] = flow

    def get(self, flow_id: str) -> Optional[dict[str, Any]]:
        """Return the stored record for ``flow_id``, or ``None`` if unknown."""
        with self._lock:
            return self._sessions.get(flow_id)

    def mark_busy(self, flow_id: str, busy: bool) -> Optional[dict[str, Any]]:
        """Set the ``busy`` flag on a flow and refresh its activity timestamp.

        Returns:
            The updated record, or ``None`` when no (non-empty) record exists.
        """
        with self._lock:
            record = self._sessions.get(flow_id)
            if record:
                record["busy"] = busy
                record["last_active_ts"] = time.time()
                return record
            return None

    def touch(self, flow_id: str) -> None:
        """Refresh the activity timestamp of ``flow_id`` if it is stored."""
        with self._lock:
            record = self._sessions.get(flow_id)
            if not record:
                return
            record["last_active_ts"] = time.time()

    def pop(self, flow_id: str) -> Optional[dict[str, Any]]:
        """Remove and return the record for ``flow_id`` (``None`` if absent)."""
        with self._lock:
            return self._sessions.pop(flow_id, None)

    def _pop_expired(self, now_ts: float) -> list[dict[str, Any]]:
        """Remove and return every non-busy flow idle longer than the limit."""
        removed: list[dict[str, Any]] = []
        with self._lock:
            # Iterate over a snapshot of the keys because entries are popped
            # while scanning.
            for flow_id in list(self._sessions):
                record = self._sessions[flow_id]
                # A record with no timestamp is treated as active right now.
                if not record.get("busy") and (
                    now_ts - float(record.get("last_active_ts", now_ts))
                    > self._idle_seconds
                ):
                    removed.append(self._sessions.pop(flow_id))
        return removed

    def _cleanup_loop(self) -> None:
        """Worker body: sweep expired flows forever, sleeping between passes."""
        while True:
            # Resources are closed after the lock has been released.
            for record in self._pop_expired(time.time()):
                FlowService.close_flow_resources(record)
            time.sleep(self._cleanup_interval)
class FlowService:
    """Drive phone/code submission flows through a TgeBrowser-backed page.

    A flow is a dict holding the live page, the browser client, the
    environment id and some metadata; it is kept in the injected session
    store between the phone-submission and code-submission steps.
    """

    def __init__(self, session_store: FlowSessionStore, proxy_provider: ProxyProvider):
        """Keep references to the session store and the proxy provider."""
        self._store = session_store
        self._proxy_provider = proxy_provider

    @staticmethod
    def _choose_mobile_ua() -> str:
        """Return a random user-agent string from ``MOBILE_UA_POOL``."""
        return random.choice(MOBILE_UA_POOL)

    @staticmethod
    def _infer_fingerprint_os(mobile_ua: str) -> str:
        """Map a user-agent string to ``"iOS"`` or ``"Android"``.

        Anything that does not mention iPhone/iPad (including an empty or
        ``None`` value) is treated as Android.
        """
        ua = (mobile_ua or "").lower()
        # "cpu iphone os" always contains "iphone", so two checks suffice.
        if "iphone" in ua or "ipad" in ua:
            return "iOS"
        return "Android"

    @staticmethod
    def _connect_page_from_start_data(start_data: dict[str, Any]) -> ChromiumPage:
        """Attach a ``ChromiumPage`` using the ``port`` or ``ws`` in ``start_data``.

        ``port`` takes precedence over ``ws`` when both are present.

        Raises:
            RuntimeError: If neither ``port`` nor ``ws`` is provided.
        """
        port = start_data.get("port")
        ws = start_data.get("ws")
        if port:
            co = ChromiumOptions().set_local_port(port=int(port))
            return ChromiumPage(addr_or_opts=co)
        if ws:
            return ChromiumPage(addr_or_opts=ws)
        raise RuntimeError("TgeBrowser 未返回 port 或 ws")

    @staticmethod
    def _create_tgebrowser_browser(
        client: TgeBrowserClient,
        browser_name: str,
        start_page_url: str,
        proxy_host: str,
        proxy_port: int,
        mobile_ua: str,
    ) -> dict[str, Any]:
        """Create a browser, retrying across proxy/fingerprint variants.

        Each proxy form (port as int, then as str) is combined with each
        fingerprint form (most to least specific). The first successful
        ``client.create_browser`` result is returned.

        Raises:
            RuntimeError: If every combination fails. The message lists the
                last four attempts and the final underlying exception is
                attached as ``__cause__``.
        """
        fingerprint_os = FlowService._infer_fingerprint_os(mobile_ua)
        proxy_candidates = [
            {"protocol": "http", "host": proxy_host, "port": proxy_port},
            {"protocol": "http", "host": proxy_host, "port": str(proxy_port)},
        ]
        fingerprint_candidates = [
            {"os": fingerprint_os, "userAgent": mobile_ua},
            {"os": fingerprint_os},
            {"os": "Android"},
        ]
        last_error: Optional[Exception] = None
        errors: list[str] = []
        for proxy_conf in proxy_candidates:
            for fingerprint_conf in fingerprint_candidates:
                try:
                    return client.create_browser(
                        browser_name=browser_name,
                        start_page_url=start_page_url,
                        proxy=proxy_conf,
                        fingerprint=fingerprint_conf,
                    )
                except Exception as exc:
                    # Deliberately broad: any failure moves on to the next
                    # combination; details are reported if all of them fail.
                    last_error = exc
                    errors.append(
                        f"proxy={proxy_conf}, fingerprint={fingerprint_conf}, error={exc}"
                    )
        # Both candidate lists are non-empty, so reaching this point means at
        # least one attempt was recorded and last_error is set.
        detail = "; ".join(errors[-4:])
        raise RuntimeError(
            f"创建浏览器失败(代理/指纹配置尝试均失败): {detail}"
        ) from last_error

    @staticmethod
    def _apply_mobile_ua(page: ChromiumPage, ua: str) -> None:
        """Best-effort override of the page user agent via CDP.

        Failures of either CDP call are ignored so that a UA override
        problem never aborts the flow.
        """
        try:
            page.run_cdp("Network.enable")
        except Exception:
            pass
        try:
            page.run_cdp("Network.setUserAgentOverride", userAgent=ua)
        except Exception:
            pass

    @staticmethod
    def close_flow_resources(flow: dict[str, Any]) -> None:
        """Best-effort teardown of a flow's page and browser environment.

        Reads ``page``, ``client`` and ``env_id`` from ``flow``. Each step is
        attempted independently and never raises, so a failure to quit the
        page does not prevent stopping the browser.
        """
        page = flow.get("page")
        client = flow.get("client")
        env_id = flow.get("env_id")
        try:
            if page:
                page.quit()
        except Exception:
            pass
        try:
            # env_id may legitimately be 0, hence the explicit None check.
            if client and env_id is not None:
                client.stop_browser(env_id=env_id)
        except Exception:
            pass

    def submit_phone(self, phone: str, url: str, proxy_api_url: str) -> tuple[dict[str, Any], Any]:
        """Start a new flow: open a proxied browser and submit the phone number.

        Args:
            phone: Phone number passed to the automation step.
            url: Page URL used both as browser start page and automation target.
            proxy_api_url: API endpoint that yields an ``ip:port`` proxy.

        Returns:
            ``(flow, packet_body)`` where ``flow`` is the record stored in the
            session store and ``packet_body`` is whatever the module-level
            ``submit_phone`` automation function returned.

        Raises:
            RuntimeError: If the created browser has no ``envId``. Any other
                exception from the steps involved is re-raised after the
                partially created resources have been released.
        """
        client: Optional[TgeBrowserClient] = None
        page: Optional[ChromiumPage] = None
        env_id: Optional[int] = None
        try:
            proxy_text = self._proxy_provider.fetch_proxy_text(proxy_api_url)
            proxy_host, proxy_port = self._proxy_provider.parse_proxy_addr(proxy_text)
            mobile_ua = self._choose_mobile_ua()
            client = TgeBrowserClient()
            phone_tail = phone[-4:] if len(phone) >= 4 else phone
            browser_name = f"tyyp_{phone_tail}_{int(time.time())}"
            create_data = self._create_tgebrowser_browser(
                client=client,
                browser_name=browser_name,
                start_page_url=url,
                proxy_host=proxy_host,
                proxy_port=proxy_port,
                mobile_ua=mobile_ua,
            )
            env_id = create_data.get("envId")
            if env_id is None:
                raise RuntimeError("创建浏览器失败:未返回 envId")
            start_data = client.start_browser(env_id=env_id)
            page = self._connect_page_from_start_data(start_data)
            self._apply_mobile_ua(page, mobile_ua)
            # Resolves to the imported automation function, not this method.
            packet_body = submit_phone(page=page, phone=phone, url=url)
            flow_id = str(uuid.uuid4())
            created = datetime.now()
            flow = {
                "flow_id": flow_id,
                "page": page,
                "client": client,
                "env_id": env_id,
                "phone": phone,
                "url": url,
                "proxy": proxy_text,
                "ua": mobile_ua,
                "created_at": created,
                "created_at_iso": created.isoformat(timespec="seconds"),
                "last_active_ts": time.time(),
                "busy": False,
            }
            self._store.put(flow_id, flow)
            return flow, packet_body
        except Exception:
            # Release whatever was created before the failure, then re-raise.
            self.close_flow_resources({"page": page, "client": client, "env_id": env_id})
            raise

    def submit_code(self, flow_id: str, code: str) -> Any:
        """Enter the verification code on an existing flow's page.

        The flow is flagged busy for the duration of the call so the cleanup
        worker does not expire it, and is always un-flagged afterwards.

        Returns:
            Whatever the ``input_code`` automation function returns.

        Raises:
            KeyError: If the flow id is unknown or already expired.
            RuntimeError: If the stored flow has no page object.
        """
        flow = self._store.mark_busy(flow_id, True)
        if not flow:
            raise KeyError(f"流程不存在或已过期: {flow_id}")
        page = flow.get("page")
        if not page:
            self._store.mark_busy(flow_id, False)
            raise RuntimeError("流程页面对象缺失")
        try:
            return input_code(page=page, code=code)
        finally:
            self._store.mark_busy(flow_id, False)
            self._store.touch(flow_id)

    def get_flow_meta(self, flow_id: str) -> Optional[dict[str, Any]]:
        """Return a dict of a flow's metadata, or ``None`` if unknown.

        Live objects (page, client) are not included in the result.
        """
        flow = self._store.get(flow_id)
        if not flow:
            return None
        return {
            "flow_id": flow_id,
            "phone": flow.get("phone"),
            "url": flow.get("url"),
            "proxy": flow.get("proxy"),
            "ua": flow.get("ua"),
            "created_at": flow.get("created_at_iso"),
            "last_active_ts": flow.get("last_active_ts"),
            "busy": flow.get("busy", False),
        }

    def close_flow(self, flow_id: str) -> bool:
        """Remove a flow from the store and release its resources.

        Returns:
            ``True`` if a flow was found and closed, ``False`` otherwise.
        """
        flow = self._store.pop(flow_id)
        if not flow:
            return False
        self.close_flow_resources(flow)
        return True