#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 云翼云盘 API:基于 云翼云盘.py 的 FastAPI 接口 - POST /api/send_code 传入手机号、url,创建 TgeBrowser 指纹浏览器,执行滑块发送验证码,返回抓包内容 + flow_id - POST /api/submit_code 传入 flow_id、验证码,执行 input_code,返回抓包内容 浏览器环境:使用 TgeBrowser 指纹浏览器、随机手机 UA、HTTP 代理(从代理接口获取) 流程超时:若 3 分钟内无调用,自动关闭浏览器 """ from __future__ import annotations import random import threading import time import uuid from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from datetime import datetime from typing import Any, Optional import requests from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field # 代理接口地址 PROXY_API_URL = "http://47.109.106.79:7002/ProxIpServiceTxt" PROXY_HEADERS = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "Cache-Control": "no-cache", "DNT": "1", "Pragma": "no-cache", "Proxy-Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0", } # 随机手机 UA 池 MOBILE_UAS = [ "Mozilla/5.0 (Linux; Android 13; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1", "Mozilla/5.0 (Linux; Android 12; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1", "Mozilla/5.0 (Linux; Android 13; 2201123C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (Linux; Android 11; M2102J2SC) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Mobile Safari/537.36", "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6 Mobile/15E148 Safari/604.1", ] # 流程空闲超时(秒) FLOW_IDLE_TIMEOUT = 180 def _fetch_proxy() -> tuple[str, int]: """从代理接口获取 ip:port,返回 (host, port)。""" resp = requests.get(PROXY_API_URL, headers=PROXY_HEADERS, timeout=10, verify=False) resp.raise_for_status() text = (resp.text or "").strip() if ":" not in text: raise RuntimeError(f"代理接口返回格式异常: {text[:100]}") host, _, port_str = text.partition(":") port = int(port_str.strip()) return host.strip(), port def _random_mobile_ua() -> str: return random.choice(MOBILE_UAS) @dataclass class FlowRecord: flow_id: str url: str phone: str created_at: datetime last_activity: float page: Any env_id: int client: Any capture_data: Any = None _flows: dict[str, FlowRecord] = {} _flows_lock = threading.Lock() _executor = ThreadPoolExecutor(max_workers=8) def _create_browser_with_proxy(proxy_host: str, proxy_port: int, user_agent: str, start_url: str) -> tuple[Any, int, Any]: """创建带代理和 UA 的 TgeBrowser,返回 (page, env_id, client)。""" from tgebrowser_client import TgeBrowserClient from DrissionPage import ChromiumOptions, ChromiumPage client = TgeBrowserClient() proxy = { "protocol": "http", "host": proxy_host, "port": proxy_port, } fingerprint = { "os": "Android", "platformVersion": 12, "userAgent": user_agent, "blockLargeImages": True, "maxImageKB": 0, "startupParams": "--blink-settings=imagesEnabled=false", } create_data = client.create_browser( browser_name=f"yunyiyunpan_{uuid.uuid4().hex[:8]}", start_page_url=start_url, proxy=proxy, fingerprint=fingerprint, ) env_id = create_data.get("envId") if env_id is None: raise RuntimeError("TgeBrowser 创建失败:未返回 envId") start_data = client.start_browser(env_id=env_id) start_data["envId"] = env_id port = start_data.get("port") if port is None: ws = start_data.get("ws") or "" port_str = ws.split(":")[-1].split("/")[0] try: port = int(port_str) except ValueError: raise RuntimeError(f"无法解析调试端口: {start_data}") if isinstance(port, str): port = int(port) co = ChromiumOptions().set_local_port(port=int(port)) page = ChromiumPage(addr_or_opts=co) return page, env_id, client def _run_send_code(phone: str, url: str) -> tuple[str, Any, FlowRecord]: """在后台线程中执行:获取代理、创建浏览器、执行 submit_phone,返回 (flow_id, capture_data, flow_record)。""" import importlib.util from pathlib import Path mod_path = Path(__file__).resolve().parent / "云翼云盘.py" spec = importlib.util.spec_from_file_location("yunyiyunpan", mod_path) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) submit_phone = mod.submit_phone proxy_host, proxy_port = _fetch_proxy() ua = _random_mobile_ua() page, env_id, client = _create_browser_with_proxy(proxy_host, proxy_port, ua, url) body = submit_phone(page=page, phone=phone, url=url) flow_id = uuid.uuid4().hex now = datetime.now() last_activity = time.time() record = FlowRecord( flow_id=flow_id, url=url, phone=phone, created_at=now, last_activity=last_activity, page=page, env_id=env_id, client=client, capture_data=body, ) with _flows_lock: _flows[flow_id] = record return flow_id, body, record def _run_input_code(flow_id: str, code: str) -> Any: """在后台线程中执行 input_code,返回抓包内容。""" import importlib.util from pathlib import Path mod_path = Path(__file__).resolve().parent / "云翼云盘.py" spec = importlib.util.spec_from_file_location("yunyiyunpan", mod_path) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) input_code = mod.input_code with _flows_lock: record = _flows.get(flow_id) if record is None: raise ValueError(f"流程不存在或已过期: {flow_id}") record.last_activity = time.time() result = input_code(page=record.page, code=str(code)) return result def _close_flow(flow_id: str) -> None: """关闭指定流程的浏览器。""" with _flows_lock: record = _flows.pop(flow_id, None) if record is None: return try: record.client.stop_browser(env_id=record.env_id) except Exception: pass def _cleanup_idle_flows() -> None: """关闭超时的流程。""" now = time.time() to_close = [] with _flows_lock: for fid, rec in list(_flows.items()): if now - rec.last_activity >= FLOW_IDLE_TIMEOUT: to_close.append(fid) for fid in to_close: _flows.pop(fid, None) for fid in to_close: try: rec = None with _flows_lock: pass # 已移除 # 需要从某处拿到 client/env_id,但 record 已移除,所以在 pop 前保存 pass except Exception: pass # 重新实现:在 pop 之前先保存需要 stop 的 client 和 env_id to_stop = [] with _flows_lock: for fid, rec in list(_flows.items()): if now - rec.last_activity >= FLOW_IDLE_TIMEOUT: to_stop.append((rec.client, rec.env_id)) del _flows[fid] for client, env_id in to_stop: try: client.stop_browser(env_id=env_id) except Exception: pass # ---------- FastAPI ---------- app = FastAPI(title="云翼云盘 API", description="天翼云盘订购页自动化接口(TgeBrowser + 代理)") class SendCodeRequest(BaseModel): phone: str = Field(..., description="手机号码") url: str = Field("http://yscnb.com/tyyp/", description="目标页面 URL(哪个界面注入)") class SendCodeResponse(BaseModel): success: bool = True flow_id: str = Field(..., description="流程唯一标识符") capture_data: Any = Field(..., description="getYanZhenMa/v2 抓包内容") url: str = Field(..., description="当前流程的 URL") phone: str = Field(..., description="手机号") created_at: str = Field(..., description="创建时间") class SubmitCodeRequest(BaseModel): flow_id: str = Field(..., description="第一个接口返回的唯一标识符") code: str = Field(..., description="短信验证码") class SubmitCodeResponse(BaseModel): success: bool = True capture_data: Any = Field(..., description="hn_userEquitys/common/order 抓包内容") @app.on_event("startup") def start_cleanup_task(): def _loop(): while True: time.sleep(60) try: _cleanup_idle_flows() except Exception: pass t = threading.Thread(target=_loop, daemon=True) t.start() @app.post("/api/send_code", response_model=SendCodeResponse) def api_send_code(req: SendCodeRequest): """ 提交手机号并发送验证码:使用 TgeBrowser 指纹浏览器(随机手机 UA + HTTP 代理), 打开页面,执行滑块,点击获取验证码,返回抓包内容和 flow_id。 浏览器保持打开,若 3 分钟内未调用 submit_code 将自动关闭。 """ try: future = _executor.submit(_run_send_code, req.phone, req.url) flow_id, capture_data, record = future.result(timeout=120) return SendCodeResponse( success=True, flow_id=flow_id, capture_data=capture_data, url=record.url, phone=record.phone, created_at=record.created_at.isoformat(), ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/submit_code", response_model=SubmitCodeResponse) def api_submit_code(req: SubmitCodeRequest): """ 提交验证码:根据 flow_id 找到对应浏览器,运行 input_code 输入验证码并提交, 返回 hn_userEquitys/common/order 抓包内容。 """ try: _cleanup_idle_flows() future = _executor.submit(_run_input_code, req.flow_id, req.code) capture_data = future.result(timeout=30) return SubmitCodeResponse(success=True, capture_data=capture_data) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8002)