Files
fws_code/api_yunyiyunpan.py
ddrwode a159ea31cc haha
2026-02-27 18:34:17 +08:00

322 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
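"""
Yunyi Cloud Drive (云翼云盘) API: FastAPI endpoints built on top of 云翼云盘.py.

- POST /api/send_code: takes a phone number and a URL, creates a TgeBrowser
  fingerprint browser, performs the slider step and sends the verification
  code; returns the captured response together with a flow_id.
- POST /api/submit_code: takes a flow_id and the verification code, runs
  input_code and returns the captured response.

Browser environment: TgeBrowser fingerprint browser, a random mobile UA and an
HTTP proxy obtained from the proxy endpoint.
Flow timeout: the browser is closed automatically when a flow receives no call
for 3 minutes.
"""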
from __future__ import annotations
import random
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
# Address of the proxy-provider endpoint (queried by _fetch_proxy).
PROXY_API_URL = "http://47.109.106.79:7002/ProxIpServiceTxt"

# Browser-like request headers sent along with the proxy-endpoint request.
PROXY_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "Cache-Control": "no-cache",
    "DNT": "1",
    "Pragma": "no-cache",
    "Proxy-Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0",
}

# Pool of mobile User-Agent strings; one is picked at random per browser.
MOBILE_UAS = [
    "Mozilla/5.0 (Linux; Android 13; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 12; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 13; 2201123C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 11; M2102J2SC) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6 Mobile/15E148 Safari/604.1",
]

# Idle timeout of a flow, in seconds (3 minutes).
FLOW_IDLE_TIMEOUT = 3 * 60
def _fetch_proxy() -> tuple[str, int]:
    """Fetch one proxy from the proxy endpoint and return it as ``(host, port)``.

    Sends a GET to ``PROXY_API_URL`` with ``PROXY_HEADERS`` and parses the
    plain-text body as ``host:port``. If the body contains several lines,
    only the first non-blank line is used.

    Returns:
        A ``(host, port)`` tuple with surrounding whitespace removed.

    Raises:
        requests.RequestException: the request failed, timed out, or the
            endpoint answered with an error status.
        RuntimeError: the body is not a usable ``host:port`` pair (missing
            colon, empty host, non-numeric or out-of-range port).
    """
    resp = requests.get(PROXY_API_URL, headers=PROXY_HEADERS, timeout=10, verify=False)
    resp.raise_for_status()
    text = (resp.text or "").strip()

    # Tolerate a multi-line answer by parsing the first non-blank line only.
    first_line = next((ln.strip() for ln in text.splitlines() if ln.strip()), "")
    if ":" not in first_line:
        raise RuntimeError(f"代理接口返回格式异常: {text[:100]}")

    host, _, port_str = first_line.partition(":")
    host = host.strip()
    try:
        port = int(port_str.strip())
    except ValueError as err:
        # Report a malformed port the same way as any other malformed body
        # instead of leaking a bare ValueError to the caller.
        raise RuntimeError(f"代理接口返回格式异常: {text[:100]}") from err
    if not host or not 0 < port < 65536:
        raise RuntimeError(f"代理接口返回格式异常: {text[:100]}")
    return host, port
def _random_mobile_ua() -> str:
    """Return one User-Agent string chosen at random from ``MOBILE_UAS``."""
    chosen = random.choice(MOBILE_UAS)
    return chosen
@dataclass
class FlowRecord:
    """State kept for one verification flow between the two API calls."""

    # Identifier under which the flow is stored in the flow registry.
    flow_id: str
    # Target page URL the flow was started with.
    url: str
    # Phone number the flow was started with.
    phone: str
    # Wall-clock creation time of the record.
    created_at: datetime
    # time.time() stamp of the latest use; compared against the idle timeout.
    last_activity: float
    # Page object driving the browser (a DrissionPage ChromiumPage in this file).
    page: Any
    # TgeBrowser environment id, needed to stop the browser later.
    env_id: int
    # TgeBrowser client that created and can stop the environment.
    client: Any
    # Captured response obtained while sending the code, if any.
    capture_data: Any = None
# Registry of live flows keyed by flow_id; accessed under _flows_lock throughout this file.
_flows: dict[str, FlowRecord] = {}
# Guards reads and writes of _flows.
_flows_lock = threading.Lock()
# Worker pool (at most 8 threads) on which the API handlers run the browser automation.
_executor = ThreadPoolExecutor(max_workers=8)
def _create_browser_with_proxy(proxy_host: str, proxy_port: int, user_agent: str, start_url: str) -> tuple[Any, int, Any]:
    """Create and start a TgeBrowser environment with the given proxy and UA.

    The environment is created with an HTTP proxy, an Android fingerprint
    carrying ``user_agent`` and image loading switched off, then started and
    attached to through its local debugging port.

    Args:
        proxy_host: Host of the HTTP proxy.
        proxy_port: Port of the HTTP proxy.
        user_agent: User-Agent string placed in the fingerprint.
        start_url: URL passed to TgeBrowser as the start page.

    Returns:
        ``(page, env_id, client)``: the DrissionPage page attached to the
        browser, the TgeBrowser environment id, and the client that owns it.

    Raises:
        RuntimeError: TgeBrowser returned no ``envId``, or no debugging port
            could be derived from the start response.
    """
    from tgebrowser_client import TgeBrowserClient
    from DrissionPage import ChromiumOptions, ChromiumPage

    client = TgeBrowserClient()
    proxy_settings = {"protocol": "http", "host": proxy_host, "port": proxy_port}
    fingerprint_settings = {
        "os": "Android",
        "platformVersion": 12,
        "userAgent": user_agent,
        "blockLargeImages": True,
        "maxImageKB": 0,
        "startupParams": "--blink-settings=imagesEnabled=false",
    }
    # A short random suffix keeps environment names distinct between flows.
    env_name = f"yunyiyunpan_{uuid.uuid4().hex[:8]}"
    create_data = client.create_browser(
        browser_name=env_name,
        start_page_url=start_url,
        proxy=proxy_settings,
        fingerprint=fingerprint_settings,
    )

    env_id = create_data.get("envId")
    if env_id is None:
        raise RuntimeError("TgeBrowser 创建失败:未返回 envId")

    start_data = client.start_browser(env_id=env_id)
    start_data["envId"] = env_id

    debug_port = start_data.get("port")
    if debug_port is None:
        # No explicit port: take the text between the last ":" of the "ws"
        # value and the following "/" as the port.
        ws_url = start_data.get("ws") or ""
        candidate = ws_url.rsplit(":", 1)[-1].partition("/")[0]
        try:
            debug_port = int(candidate)
        except ValueError:
            raise RuntimeError(f"无法解析调试端口: {start_data}")
    if isinstance(debug_port, str):
        debug_port = int(debug_port)

    options = ChromiumOptions().set_local_port(port=int(debug_port))
    page = ChromiumPage(addr_or_opts=options)
    return page, env_id, client
def _run_send_code(phone: str, url: str) -> tuple[str, Any, FlowRecord]:
    """Start a new flow: fetch a proxy, open a browser and run ``submit_phone``.

    Intended to run on a worker thread. ``submit_phone`` is loaded from the
    sibling file ``云翼云盘.py``; on success the browser stays open and the
    flow is registered in ``_flows`` so a later call can continue it.

    Args:
        phone: Phone number handed to ``submit_phone``.
        url: Target page URL, used both as browser start page and as the
            ``url`` argument of ``submit_phone``.

    Returns:
        ``(flow_id, capture_data, record)`` where ``capture_data`` is whatever
        ``submit_phone`` returned and ``record`` is the registered flow.

    Raises:
        Exception: anything raised while loading the helper module, fetching
            the proxy, creating the browser or running ``submit_phone``.
    """
    import importlib.util
    from pathlib import Path

    mod_path = Path(__file__).resolve().parent / "云翼云盘.py"
    spec = importlib.util.spec_from_file_location("yunyiyunpan", mod_path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    submit_phone = mod.submit_phone

    proxy_host, proxy_port = _fetch_proxy()
    ua = _random_mobile_ua()
    page, env_id, client = _create_browser_with_proxy(proxy_host, proxy_port, ua, url)

    try:
        body = submit_phone(page=page, phone=phone, url=url)
    except Exception:
        # The flow is not registered yet, so the idle cleanup could never
        # find this browser; stop it here before propagating the error.
        try:
            client.stop_browser(env_id=env_id)
        except Exception:
            pass  # best effort: the original error is the one worth reporting
        raise

    flow_id = uuid.uuid4().hex
    record = FlowRecord(
        flow_id=flow_id,
        url=url,
        phone=phone,
        created_at=datetime.now(),
        last_activity=time.time(),
        page=page,
        env_id=env_id,
        client=client,
        capture_data=body,
    )
    with _flows_lock:
        _flows[flow_id] = record
    return flow_id, body, record
def _run_input_code(flow_id: str, code: str) -> Any:
    """Continue an existing flow by running ``input_code`` in its browser.

    Intended to run on a worker thread. The flow is looked up first, so an
    unknown ``flow_id`` is rejected before the helper module ``云翼云盘.py``
    is loaded and executed.

    Args:
        flow_id: Identifier of a flow registered in ``_flows``.
        code: Verification code; passed to ``input_code`` as a string.

    Returns:
        Whatever ``input_code`` returns (the captured response).

    Raises:
        ValueError: no flow is registered under ``flow_id``.
        Exception: anything raised while loading the helper module or by
            ``input_code`` itself.
    """
    import importlib.util
    from pathlib import Path

    with _flows_lock:
        record = _flows.get(flow_id)
        if record is None:
            raise ValueError(f"流程不存在或已过期: {flow_id}")
        # Mark the flow as active so the idle cleanup leaves it alone.
        record.last_activity = time.time()

    mod_path = Path(__file__).resolve().parent / "云翼云盘.py"
    spec = importlib.util.spec_from_file_location("yunyiyunpan", mod_path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    input_code = mod.input_code

    return input_code(page=record.page, code=str(code))
def _close_flow(flow_id: str) -> None:
    """Unregister the flow ``flow_id`` and stop its browser (best effort).

    Does nothing when no flow is registered under ``flow_id``. Errors raised
    while stopping the browser are ignored.
    """
    with _flows_lock:
        flow = _flows.pop(flow_id, None)
    if flow is not None:
        try:
            flow.client.stop_browser(env_id=flow.env_id)
        except Exception:
            # Deliberately ignored: the flow is already unregistered.
            pass
def _cleanup_idle_flows() -> None:
    """Unregister every idle flow and stop its browser.

    A flow is idle when ``FLOW_IDLE_TIMEOUT`` seconds or more have passed
    since its ``last_activity``. Expired records are removed from ``_flows``
    while holding the lock; their browsers are stopped afterwards, outside
    the lock. A failure to stop one browser is ignored and does not prevent
    the others from being stopped.
    """
    now = time.time()
    with _flows_lock:
        expired_ids = [
            fid
            for fid, rec in _flows.items()
            if now - rec.last_activity >= FLOW_IDLE_TIMEOUT
        ]
        # Pop and keep the records in one step: the client and env_id are
        # needed after removal to actually stop the browsers.
        expired = [_flows.pop(fid) for fid in expired_ids]

    for rec in expired:
        try:
            rec.client.stop_browser(env_id=rec.env_id)
        except Exception:
            pass  # best effort: keep stopping the remaining browsers
# ---------- FastAPI ----------
# Application object on which the route handlers and the startup hook in this file are registered.
app = FastAPI(title="云翼云盘 API", description="天翼云盘订购页自动化接口TgeBrowser + 代理)")
# Request body of POST /api/send_code.
class SendCodeRequest(BaseModel):
    # Phone number (required).
    phone: str = Field(
        ...,
        description="手机号码",
    )
    # Target page URL; falls back to the default below when omitted.
    url: str = Field(
        "http://yscnb.com/tyyp/",
        description="目标页面 URL哪个界面注入",
    )
# Response body of POST /api/send_code.
class SendCodeResponse(BaseModel):
    success: bool = True
    # Identifier to pass to /api/submit_code.
    flow_id: str = Field(
        ...,
        description="流程唯一标识符",
    )
    # Captured response produced while sending the code.
    capture_data: Any = Field(
        ...,
        description="getYanZhenMa/v2 抓包内容",
    )
    # URL the flow was started with.
    url: str = Field(
        ...,
        description="当前流程的 URL",
    )
    # Phone number the flow was started with.
    phone: str = Field(
        ...,
        description="手机号",
    )
    # Creation time of the flow, as a string.
    created_at: str = Field(
        ...,
        description="创建时间",
    )
# Request body of POST /api/submit_code.
class SubmitCodeRequest(BaseModel):
    # Identifier returned by /api/send_code.
    flow_id: str = Field(
        ...,
        description="第一个接口返回的唯一标识符",
    )
    # SMS verification code.
    code: str = Field(
        ...,
        description="短信验证码",
    )
# Response body of POST /api/submit_code.
class SubmitCodeResponse(BaseModel):
    success: bool = True
    # Captured response produced while submitting the code.
    capture_data: Any = Field(
        ...,
        description="hn_userEquitys/common/order 抓包内容",
    )
@app.on_event("startup")
def start_cleanup_task():
    """Start the background thread that closes idle flows.

    Registered as a startup hook. The thread is a daemon, sleeps 60 seconds
    between passes and ignores any error raised by a pass so the loop keeps
    running.
    """

    def _loop():
        while True:
            time.sleep(60)
            try:
                _cleanup_idle_flows()
            except Exception:
                # One failed pass must not end the loop.
                pass

    threading.Thread(target=_loop, daemon=True).start()
@app.post("/api/send_code", response_model=SendCodeResponse)
def api_send_code(req: SendCodeRequest):
    """
    Submit a phone number and send the verification code.

    A TgeBrowser fingerprint browser (random mobile UA + HTTP proxy) opens the
    page, performs the slider step and requests the code. The response carries
    the captured data and a flow_id. The browser stays open and is closed
    automatically if submit_code is not called within 3 minutes.

    Any failure, including the work not finishing within 120 seconds, is
    reported as HTTP 500 with a non-empty detail.
    """
    # Imported here so the handler does not rely on a new module-level import.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    try:
        future = _executor.submit(_run_send_code, req.phone, req.url)
        try:
            flow_id, capture_data, record = future.result(timeout=120)
        except FutureTimeoutError as e:
            # str() of a bare timeout error is empty, so supply a message.
            # cancel() only has an effect if the task has not started yet.
            future.cancel()
            raise HTTPException(
                status_code=500,
                detail=str(e) or "发送验证码超时:120 秒内未完成",
            ) from e
        return SendCodeResponse(
            success=True,
            flow_id=flow_id,
            capture_data=capture_data,
            url=record.url,
            phone=record.phone,
            created_at=record.created_at.isoformat(),
        )
    except HTTPException:
        raise
    except Exception as e:
        # Fall back to repr() so the client never receives an empty detail.
        raise HTTPException(status_code=500, detail=str(e) or repr(e)) from e
@app.post("/api/submit_code", response_model=SubmitCodeResponse)
def api_submit_code(req: SubmitCodeRequest):
    """
    Submit the verification code.

    The browser belonging to flow_id is looked up and input_code is run in it
    to enter and submit the code; the captured response is returned.

    An unknown or expired flow_id is reported as HTTP 404. Any other failure,
    including the work not finishing within 30 seconds, is reported as
    HTTP 500 with a non-empty detail.
    """
    # Imported here so the handler does not rely on a new module-level import.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    try:
        # Drop flows that are already idle before looking this one up.
        _cleanup_idle_flows()
        future = _executor.submit(_run_input_code, req.flow_id, req.code)
        try:
            capture_data = future.result(timeout=30)
        except FutureTimeoutError as e:
            # str() of a bare timeout error is empty, so supply a message.
            # cancel() only has an effect if the task has not started yet.
            future.cancel()
            raise HTTPException(
                status_code=500,
                detail=str(e) or "提交验证码超时:30 秒内未完成",
            ) from e
        return SubmitCodeResponse(success=True, capture_data=capture_data)
    except HTTPException:
        raise
    except ValueError as e:
        # NOTE(review): a ValueError raised inside input_code itself would
        # also be reported as 404 here — confirm whether that is intended.
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        # Fall back to repr() so the client never receives an empty detail.
        raise HTTPException(status_code=500, detail=str(e) or repr(e)) from e
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8002.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8002)