322 lines
11 KiB
Python
322 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
云翼云盘 API:基于 云翼云盘.py 的 FastAPI 接口
|
||
|
||
- POST /api/send_code 传入手机号、url,创建 TgeBrowser 指纹浏览器,执行滑块发送验证码,返回抓包内容 + flow_id
|
||
- POST /api/submit_code 传入 flow_id、验证码,执行 input_code,返回抓包内容
|
||
|
||
浏览器环境:使用 TgeBrowser 指纹浏览器、随机手机 UA、HTTP 代理(从代理接口获取)
|
||
流程超时:若 3 分钟内无调用,自动关闭浏览器
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import random
|
||
import threading
|
||
import time
|
||
import uuid
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
from typing import Any, Optional
|
||
|
||
import requests
|
||
from fastapi import FastAPI, HTTPException
|
||
from pydantic import BaseModel, Field
|
||
|
||
# Endpoint of the proxy service queried by _fetch_proxy().
PROXY_API_URL = "http://47.109.106.79:7002/ProxIpServiceTxt"

# Browser-like headers sent with the request to the proxy service.
PROXY_HEADERS = {
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/avif,image/webp,image/apng,*/*;q=0.8"
    ),
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "Cache-Control": "no-cache",
    "DNT": "1",
    "Pragma": "no-cache",
    "Proxy-Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0"
    ),
}

# Pool of mobile User-Agent strings; one is chosen at random per browser.
MOBILE_UAS = [
    "Mozilla/5.0 (Linux; Android 13; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 12; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 13; 2201123C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 11; M2102J2SC) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6 Mobile/15E148 Safari/604.1",
]

# Idle time, in seconds, after which a flow's browser is closed (3 minutes).
FLOW_IDLE_TIMEOUT = 3 * 60
|
||
|
||
|
||
def _fetch_proxy() -> tuple[str, int]:
    """Fetch one proxy address from the proxy service.

    The response body is parsed as plain-text ``host:port``. Only the first
    non-blank line is used, so a body that lists several addresses (one per
    line) still yields a usable proxy instead of failing on the port parse.

    Returns:
        A ``(host, port)`` tuple.

    Raises:
        requests.RequestException: The request failed or the service
            answered with an HTTP error status.
        RuntimeError: The body is empty, has no ``host:port`` shape, or the
            port is not an integer in the range 1-65535.
    """
    # verify=False is kept from the original call; it has no effect while
    # PROXY_API_URL is plain http.
    resp = requests.get(PROXY_API_URL, headers=PROXY_HEADERS, timeout=10, verify=False)
    resp.raise_for_status()

    # First non-blank line of the body, or "" when the body is empty.
    text = next(
        (line.strip() for line in (resp.text or "").splitlines() if line.strip()),
        "",
    )

    host, sep, port_str = text.partition(":")
    host = host.strip()
    if not sep or not host:
        raise RuntimeError(f"代理接口返回格式异常: {text[:100]}")

    try:
        port = int(port_str.strip())
    except ValueError as exc:
        # Report a malformed port the same way as a malformed body, keeping
        # the original parse error as the cause.
        raise RuntimeError(f"代理接口返回格式异常: {text[:100]}") from exc

    if not 0 < port < 65536:
        raise RuntimeError(f"代理接口返回格式异常: {text[:100]}")

    return host, port
|
||
|
||
|
||
def _random_mobile_ua() -> str:
    """Pick one User-Agent string at random from ``MOBILE_UAS``."""
    chosen = random.choice(MOBILE_UAS)
    return chosen
|
||
|
||
|
||
@dataclass
class FlowRecord:
    """State kept for one flow between the send_code and submit_code calls."""

    # Request identity.
    flow_id: str  # uuid4 hex string handed back to the API client
    url: str  # target page URL the browser was opened on
    phone: str  # phone number submitted for this flow

    # Timing.
    created_at: datetime  # wall-clock creation time, reported to the client
    last_activity: float  # time.time() stamp used by the idle cleanup

    # Browser handles.
    page: Any  # page object passed to submit_phone / input_code
    env_id: int  # TgeBrowser environment id, needed to stop the browser
    client: Any  # TgeBrowser client that owns env_id

    # Body captured by submit_phone; None until one is stored.
    capture_data: Any = None
|
||
|
||
|
||
# Live flows keyed by flow_id. The dict is read and modified under _flows_lock.
_flows: dict[str, FlowRecord] = {}
_flows_lock = threading.Lock()

# Worker pool on which the API handlers run the blocking browser automation.
_executor = ThreadPoolExecutor(max_workers=8)
|
||
|
||
|
||
def _create_browser_with_proxy(
    proxy_host: str,
    proxy_port: int,
    user_agent: str,
    start_url: str,
) -> tuple[Any, int, Any]:
    """Create and start a TgeBrowser environment and attach DrissionPage to it.

    Args:
        proxy_host: Host of the HTTP proxy the browser should use.
        proxy_port: Port of that proxy.
        user_agent: User-Agent string placed in the browser fingerprint.
        start_url: URL passed to TgeBrowser as the start page.

    Returns:
        ``(page, env_id, client)``: the attached ``ChromiumPage``, the
        TgeBrowser environment id, and the client that owns it.

    Raises:
        RuntimeError: TgeBrowser returned no ``envId``, or the debugging port
            could not be determined from the start response.
        Exception: Any error raised while starting or attaching is re-raised
            after a best-effort stop of the environment.
    """
    from tgebrowser_client import TgeBrowserClient
    from DrissionPage import ChromiumOptions, ChromiumPage

    client = TgeBrowserClient()
    proxy = {
        "protocol": "http",
        "host": proxy_host,
        "port": proxy_port,
    }
    # NOTE(review): "os" is fixed to Android although MOBILE_UAS also holds
    # iPhone user agents; confirm whether TgeBrowser needs these to agree.
    fingerprint = {
        "os": "Android",
        "platformVersion": 12,
        "userAgent": user_agent,
        # Presumably these three settings disable image loading; verify
        # against the TgeBrowser documentation.
        "blockLargeImages": True,
        "maxImageKB": 0,
        "startupParams": "--blink-settings=imagesEnabled=false",
    }
    create_data = client.create_browser(
        browser_name=f"yunyiyunpan_{uuid.uuid4().hex[:8]}",
        start_page_url=start_url,
        proxy=proxy,
        fingerprint=fingerprint,
    )
    env_id = create_data.get("envId")
    if env_id is None:
        raise RuntimeError("TgeBrowser 创建失败:未返回 envId")

    # From here on an environment exists. If anything below fails, the caller
    # never receives env_id, so stop the environment here to avoid leaking it.
    try:
        start_data = client.start_browser(env_id=env_id)
        start_data["envId"] = env_id  # included in the error message below

        raw_port = start_data.get("port")
        if raw_port is None:
            # Fall back to the "ws" value: take the text after the last ':'
            # up to the next '/'.
            ws = start_data.get("ws") or ""
            raw_port = ws.split(":")[-1].split("/")[0]
        try:
            port = int(raw_port)
        except (TypeError, ValueError) as exc:
            raise RuntimeError(f"无法解析调试端口: {start_data}") from exc

        co = ChromiumOptions().set_local_port(port=port)
        page = ChromiumPage(addr_or_opts=co)
    except Exception:
        try:
            client.stop_browser(env_id=env_id)
        except Exception:
            # Best effort: a failed stop must not hide the original error.
            pass
        raise

    return page, env_id, client
|
||
|
||
|
||
def _run_send_code(phone: str, url: str) -> tuple[str, Any, FlowRecord]:
    """Open a browser, submit the phone number and register the flow.

    Intended to run on a worker thread. It loads ``submit_phone`` from the
    sibling file ``云翼云盘.py``, fetches a proxy, creates a browser with a
    random mobile User-Agent, calls ``submit_phone`` and stores the resulting
    flow in ``_flows``.

    Args:
        phone: Phone number passed to ``submit_phone``.
        url: Target page URL, used both as the browser start page and as the
            ``url`` argument of ``submit_phone``.

    Returns:
        ``(flow_id, capture_data, record)``, where ``capture_data`` is the
        value returned by ``submit_phone`` and ``record`` is the registered
        ``FlowRecord``.

    Raises:
        Exception: Any error from module loading, proxy retrieval, browser
            creation or ``submit_phone``. If ``submit_phone`` fails, the
            browser is stopped before the error propagates.
    """
    import importlib.util
    from pathlib import Path

    # The automation module has a non-ASCII file name, so it is loaded by
    # path rather than with a regular import statement.
    mod_path = Path(__file__).resolve().parent / "云翼云盘.py"
    spec = importlib.util.spec_from_file_location("yunyiyunpan", mod_path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    submit_phone = mod.submit_phone

    proxy_host, proxy_port = _fetch_proxy()
    ua = _random_mobile_ua()
    page, env_id, client = _create_browser_with_proxy(proxy_host, proxy_port, ua, url)

    try:
        body = submit_phone(page=page, phone=phone, url=url)
    except Exception:
        # The flow is not in _flows yet, so the idle cleanup could never find
        # this browser. Stop it here or it would stay open indefinitely.
        try:
            client.stop_browser(env_id=env_id)
        except Exception:
            # Best effort: a failed stop must not hide the original error.
            pass
        raise

    flow_id = uuid.uuid4().hex
    record = FlowRecord(
        flow_id=flow_id,
        url=url,
        phone=phone,
        created_at=datetime.now(),
        last_activity=time.time(),
        page=page,
        env_id=env_id,
        client=client,
        capture_data=body,
    )
    with _flows_lock:
        _flows[flow_id] = record
    return flow_id, body, record
|
||
|
||
|
||
def _run_input_code(flow_id: str, code: str) -> Any:
    """Enter the SMS code in the browser that belongs to ``flow_id``.

    Intended to run on a worker thread. The flow is looked up before the
    automation module is loaded, so an unknown ``flow_id`` fails immediately
    and a module-load problem cannot mask the "flow not found" error.

    Args:
        flow_id: Identifier returned by the send_code step.
        code: Verification code; converted with ``str()`` before use.

    Returns:
        Whatever ``input_code`` from ``云翼云盘.py`` returns.

    Raises:
        ValueError: No flow with this id is registered.
        Exception: Any error from module loading or from ``input_code``.
    """
    with _flows_lock:
        record = _flows.get(flow_id)
        if record is not None:
            # Refresh under the same lock the idle cleanup reads it under.
            record.last_activity = time.time()
    if record is None:
        raise ValueError(f"流程不存在或已过期: {flow_id}")

    import importlib.util
    from pathlib import Path

    # The automation module has a non-ASCII file name, so it is loaded by
    # path rather than with a regular import statement.
    mod_path = Path(__file__).resolve().parent / "云翼云盘.py"
    spec = importlib.util.spec_from_file_location("yunyiyunpan", mod_path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    input_code = mod.input_code

    return input_code(page=record.page, code=str(code))
|
||
|
||
|
||
def _close_flow(flow_id: str) -> None:
    """Unregister a flow and stop its browser.

    Does nothing when ``flow_id`` is not registered. Stopping the browser is
    best effort: a failure is logged as a warning and never raised.

    Args:
        flow_id: Identifier of the flow to close.
    """
    import logging

    with _flows_lock:
        record = _flows.pop(flow_id, None)
    if record is None:
        return

    try:
        record.client.stop_browser(env_id=record.env_id)
    except Exception:
        # Still best effort, but leave a trace instead of failing silently.
        logging.getLogger(__name__).warning(
            "failed to stop browser env_id=%s for flow %s",
            record.env_id,
            flow_id,
            exc_info=True,
        )
|
||
|
||
|
||
def _cleanup_idle_flows() -> None:
    """Unregister every idle flow and stop its browser.

    A flow is idle when ``FLOW_IDLE_TIMEOUT`` seconds or more have passed
    since its ``last_activity``. Expired records are removed from ``_flows``
    under the lock; the browsers are stopped after the lock is released so a
    slow stop does not block other requests.

    The earlier version first popped all expired records without keeping
    their client or env_id, so its second pass found nothing left to stop and
    idle browsers were never closed. Here the records are captured before
    removal.

    Stopping is best effort: failures are logged as warnings and never
    raised.
    """
    import logging

    now = time.time()
    with _flows_lock:
        expired = {
            fid: rec
            for fid, rec in _flows.items()
            if now - rec.last_activity >= FLOW_IDLE_TIMEOUT
        }
        for fid in expired:
            del _flows[fid]

    for fid, rec in expired.items():
        try:
            rec.client.stop_browser(env_id=rec.env_id)
        except Exception:
            logging.getLogger(__name__).warning(
                "failed to stop idle browser env_id=%s for flow %s",
                rec.env_id,
                fid,
                exc_info=True,
            )
|
||
|
||
|
||
# ---------- FastAPI ----------
|
||
|
||
# Application object; the route and startup handlers below are attached to it.
app = FastAPI(
    title="云翼云盘 API",
    description="天翼云盘订购页自动化接口(TgeBrowser + 代理)",
)
|
||
|
||
|
||
class SendCodeRequest(BaseModel):
    # Request body of POST /api/send_code.

    # Phone number to submit (required).
    phone: str = Field(
        ...,
        description="手机号码",
    )
    # Target page URL; falls back to the default page when omitted.
    url: str = Field(
        "http://yscnb.com/tyyp/",
        description="目标页面 URL(哪个界面注入)",
    )
|
||
|
||
|
||
class SendCodeResponse(BaseModel):
    # Response body of POST /api/send_code.

    success: bool = True
    # Identifier the client must send back to /api/submit_code.
    flow_id: str = Field(
        ...,
        description="流程唯一标识符",
    )
    # Value returned by submit_phone for this flow.
    capture_data: Any = Field(
        ...,
        description="getYanZhenMa/v2 抓包内容",
    )
    url: str = Field(
        ...,
        description="当前流程的 URL",
    )
    phone: str = Field(
        ...,
        description="手机号",
    )
    # ISO-8601 string produced from the record's created_at.
    created_at: str = Field(
        ...,
        description="创建时间",
    )
|
||
|
||
|
||
class SubmitCodeRequest(BaseModel):
    # Request body of POST /api/submit_code.

    # Identifier returned by /api/send_code.
    flow_id: str = Field(
        ...,
        description="第一个接口返回的唯一标识符",
    )
    # SMS verification code to enter.
    code: str = Field(
        ...,
        description="短信验证码",
    )
|
||
|
||
|
||
class SubmitCodeResponse(BaseModel):
    # Response body of POST /api/submit_code.

    success: bool = True
    # Value returned by input_code for this flow.
    capture_data: Any = Field(
        ...,
        description="hn_userEquitys/common/order 抓包内容",
    )
|
||
|
||
|
||
@app.on_event("startup")
def start_cleanup_task():
    """Start the daemon thread that sweeps idle flows once a minute."""

    def _sweep_forever():
        # Sleep first, then sweep, so the first sweep runs 60 s after startup.
        while True:
            time.sleep(60)
            try:
                _cleanup_idle_flows()
            except Exception:
                # Swallow everything so one failed sweep cannot end the loop.
                pass

    sweeper = threading.Thread(target=_sweep_forever, daemon=True)
    sweeper.start()
|
||
|
||
|
||
@app.post("/api/send_code", response_model=SendCodeResponse)
def api_send_code(req: SendCodeRequest):
    """
    提交手机号并发送验证码:使用 TgeBrowser 指纹浏览器(随机手机 UA + HTTP 代理),
    打开页面,执行滑块,点击获取验证码,返回抓包内容和 flow_id。
    浏览器保持打开,若 3 分钟内未调用 submit_code 将自动关闭。
    """
    # The docstring above is left exactly as written. In English: submit the
    # phone number and send the verification code using a TgeBrowser
    # fingerprint browser (random mobile UA + HTTP proxy); open the page, run
    # the slider, click "get code", and return the captured data with a
    # flow_id. The browser stays open and is closed automatically if
    # submit_code is not called within 3 minutes.
    #
    # The work runs on the shared executor with a 120 s wait. Every failure
    # is reported as HTTP 500.
    future = None
    try:
        future = _executor.submit(_run_send_code, req.phone, req.url)
        flow_id, capture_data, record = future.result(timeout=120)
        return SendCodeResponse(
            success=True,
            flow_id=flow_id,
            capture_data=capture_data,
            url=record.url,
            phone=record.phone,
            created_at=record.created_at.isoformat(),
        )
    except Exception as e:
        if future is not None:
            # Drops the job if it is still queued, so no browser is launched
            # for a request that already failed. No-op once it is running.
            future.cancel()
        # A result() timeout carries no message; fall back to the exception
        # type name so the client never receives an empty detail.
        detail = str(e) or type(e).__name__
        raise HTTPException(status_code=500, detail=detail) from e
|
||
|
||
|
||
@app.post("/api/submit_code", response_model=SubmitCodeResponse)
def api_submit_code(req: SubmitCodeRequest):
    """
    提交验证码:根据 flow_id 找到对应浏览器,运行 input_code 输入验证码并提交,
    返回 hn_userEquitys/common/order 抓包内容。
    """
    # The docstring above is left exactly as written. In English: submit the
    # verification code by locating the browser for flow_id, running
    # input_code to enter and submit the code, and returning the captured
    # hn_userEquitys/common/order data.
    #
    # Idle flows are swept first, so an expired flow_id is answered with 404
    # instead of reusing a browser that is due to be closed. A ValueError
    # maps to 404 and every other failure to 500.
    future = None
    try:
        _cleanup_idle_flows()
        future = _executor.submit(_run_input_code, req.flow_id, req.code)
        capture_data = future.result(timeout=30)
        return SubmitCodeResponse(success=True, capture_data=capture_data)
    except ValueError as e:
        # NOTE(review): a ValueError raised inside input_code itself is also
        # reported as 404 here; confirm whether that is intended.
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        if future is not None:
            # Drops the job if it is still queued. No-op once it is running.
            future.cancel()
        # A result() timeout carries no message; fall back to the exception
        # type name so the client never receives an empty detail.
        detail = str(e) or type(e).__name__
        raise HTTPException(status_code=500, detail=detail) from e
|
||
|
||
|
||
if __name__ == "__main__":
    # Run the API with uvicorn on all interfaces, port 8002, when this file
    # is executed as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8002)
|