This commit is contained in:
ddrwode
2026-02-27 17:05:29 +08:00
parent 71df2f694b
commit ee32d27b7f
8 changed files with 1577 additions and 1415 deletions

1461
test1.py

File diff suppressed because it is too large Load Diff

14
tyyp_app/__init__.py Normal file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from tyyp_app.api import app, create_app
from tyyp_app.automation import input_code, submit_code, submit_phone
__all__ = [
"app",
"create_app",
"submit_phone",
"submit_code",
"input_code",
]

87
tyyp_app/api.py Normal file
View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from fastapi import FastAPI, HTTPException
from tyyp_app.config import FLOW_CLEANUP_INTERVAL_SECONDS, PROXY_FETCH_HEADERS, SESSION_IDLE_SECONDS
from tyyp_app.schemas import (
ApiSubmitCodeRequest,
ApiSubmitCodeResponse,
ApiSubmitPhoneRequest,
ApiSubmitPhoneResponse,
)
from tyyp_app.services import FlowService, FlowSessionStore, ProxyProvider
def create_app() -> FastAPI:
app = FastAPI(
title="test1 自动化 API",
description="TgeBrowser + DrissionPage 天翼页面自动化接口",
)
session_store = FlowSessionStore(
idle_seconds=SESSION_IDLE_SECONDS,
cleanup_interval=FLOW_CLEANUP_INTERVAL_SECONDS,
)
flow_service = FlowService(
session_store=session_store,
proxy_provider=ProxyProvider(headers=PROXY_FETCH_HEADERS, timeout=15),
)
app.state.session_store = session_store
app.state.flow_service = flow_service
@app.on_event("startup")
def _api_startup() -> None:
session_store.start_cleanup_worker()
@app.post("/api/submit_phone", response_model=ApiSubmitPhoneResponse)
def api_submit_phone(req: ApiSubmitPhoneRequest):
try:
flow, packet_body = flow_service.submit_phone(
phone=req.phone,
url=req.url,
proxy_api_url=req.proxy_api_url,
)
return ApiSubmitPhoneResponse(
success=True,
flow_id=flow["flow_id"],
data=packet_body,
phone=flow["phone"],
url=flow["url"],
created_at=flow["created_at_iso"],
proxy=flow["proxy"],
ua=flow["ua"],
)
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/submit_code", response_model=ApiSubmitCodeResponse)
def api_submit_code(req: ApiSubmitCodeRequest):
try:
packet_body = flow_service.submit_code(flow_id=req.flow_id, code=req.code)
return ApiSubmitCodeResponse(success=True, flow_id=req.flow_id, data=packet_body)
except KeyError as exc:
raise HTTPException(status_code=404, detail=str(exc))
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
@app.get("/api/flow/{flow_id}")
def api_get_flow(flow_id: str):
meta = flow_service.get_flow_meta(flow_id)
if not meta:
raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {flow_id}")
return {"success": True, **meta}
@app.delete("/api/flow/{flow_id}")
def api_close_flow(flow_id: str):
closed = flow_service.close_flow(flow_id)
if not closed:
return {"success": False, "message": "流程不存在"}
return {"success": True, "message": "流程已关闭"}
return app
app = create_app()

1017
tyyp_app/automation.py Normal file

File diff suppressed because it is too large Load Diff

49
tyyp_app/cli.py Normal file
View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
from DrissionPage import ChromiumOptions, ChromiumPage
from tyyp_app.api import app
from tyyp_app.automation import input_code, submit_phone
from tyyp_app.config import DEFAULT_PHONE, DEFAULT_TARGET_URL
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="天翼云盘订购页自动化")
parser.add_argument("--api", action="store_true", help="以 FastAPI 服务模式启动")
parser.add_argument("--host", default="0.0.0.0", help="API 监听地址(--api 模式)")
parser.add_argument("--api-port", type=int, default=8000, help="API 监听端口(--api 模式)")
parser.add_argument("--phone", default=DEFAULT_PHONE, help="手机号码(默认用代码中的 DEFAULT_PHONE")
parser.add_argument("--url", default=DEFAULT_TARGET_URL, help="目标页面 URL")
parser.add_argument("--port", type=int, default=0, help="连接已有浏览器端口0 表示新建")
return parser
def run_once(phone: str, url: str, port: int = 0) -> None:
if port:
co = ChromiumOptions().set_local_port(port=port)
page = ChromiumPage(addr_or_opts=co)
else:
page = ChromiumPage()
print(f"打开页面: {url},手机号: {phone}")
body = submit_phone(page=page, phone=phone, url=url)
print("hn_userEquitys/getYanZhenMa/v2 响应:")
print(body)
input_code(page=page, code=123123)
def main() -> None:
args = build_parser().parse_args()
if args.api:
import uvicorn
uvicorn.run(app, host=args.host, port=args.api_port)
return
run_once(phone=args.phone, url=args.url, port=args.port)

38
tyyp_app/config.py Normal file
View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from pathlib import Path
# 项目根目录fws_code
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_PHONE = "17375712810"
DEFAULT_TARGET_URL = "http://yscnb.com/tyyp/"
GET_YAN_ZHEN_MA_URL = "hn_userEquitys/getYanZhenMa/v2"
ORDER_PACKET_URL = "hn_userEquitys/common/order"
PROXY_SOURCE_URL = "http://47.109.106.79:7002/ProxIpServiceTxt"
SESSION_IDLE_SECONDS = 180
FLOW_CLEANUP_INTERVAL_SECONDS = 15
MOBILE_UA_POOL = [
"Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 13; M2012K11AC) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 12; 2201123C) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 13; SM-S9180) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 12; V2227A) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (iPhone; CPU iPhone OS 18_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Mobile/15E148 Safari/604.1",
]
PROXY_FETCH_HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Cache-Control": "no-cache",
"DNT": "1",
"Pragma": "no-cache",
"Proxy-Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36 Edg/145.0.0.0",
}

37
tyyp_app/schemas.py Normal file
View File

@@ -0,0 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, Field
from tyyp_app.config import DEFAULT_TARGET_URL, PROXY_SOURCE_URL
class ApiSubmitPhoneRequest(BaseModel):
phone: str = Field(..., description="手机号码")
url: str = Field(DEFAULT_TARGET_URL, description="目标页面 URL")
proxy_api_url: str = Field(PROXY_SOURCE_URL, description="代理来源接口(返回 ip:port 文本)")
class ApiSubmitPhoneResponse(BaseModel):
success: bool = True
flow_id: str = Field(..., description="流程唯一标识符")
data: Any = Field(..., description="滑块流程抓包响应体getYanZhenMa")
phone: str = ""
url: str = ""
created_at: str = ""
proxy: str = ""
ua: str = ""
class ApiSubmitCodeRequest(BaseModel):
flow_id: str = Field(..., description="submit_phone 返回的流程唯一标识符")
code: str = Field(..., description="短信验证码")
class ApiSubmitCodeResponse(BaseModel):
success: bool = True
flow_id: str = ""
data: Any = Field(..., description="验证码提交后的抓包响应体")

289
tyyp_app/services.py Normal file
View File

@@ -0,0 +1,289 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import random
import re
import threading
import time
import uuid
from datetime import datetime
from typing import Any, Optional
import requests
from DrissionPage import ChromiumOptions, ChromiumPage
from tgebrowser_client import TgeBrowserClient
from tyyp_app.automation import input_code, submit_phone
from tyyp_app.config import (
FLOW_CLEANUP_INTERVAL_SECONDS,
MOBILE_UA_POOL,
SESSION_IDLE_SECONDS,
)
class ProxyProvider:
def __init__(self, headers: dict[str, str], timeout: float = 15):
self._headers = headers
self._timeout = timeout
def fetch_proxy_text(self, proxy_api_url: str) -> str:
resp = requests.get(proxy_api_url, headers=self._headers, timeout=self._timeout)
resp.raise_for_status()
text = (resp.text or "").strip()
if not text:
raise RuntimeError("代理接口返回为空")
first_line = text.splitlines()[0].strip()
if not first_line:
raise RuntimeError("代理接口返回格式异常:首行为空")
return first_line
@staticmethod
def parse_proxy_addr(proxy_text: str) -> tuple[str, int]:
match = re.match(r"^\s*((?:\d{1,3}\.){3}\d{1,3}):(\d{1,5})\s*$", proxy_text)
if not match:
raise RuntimeError(f"代理格式不合法(期望 ip:port: {proxy_text}")
host = match.group(1)
port = int(match.group(2))
if not (1 <= port <= 65535):
raise RuntimeError(f"代理端口不合法: {port}")
return host, port
class FlowSessionStore:
def __init__(
self,
idle_seconds: int = SESSION_IDLE_SECONDS,
cleanup_interval: int = FLOW_CLEANUP_INTERVAL_SECONDS,
):
self._idle_seconds = idle_seconds
self._cleanup_interval = cleanup_interval
self._sessions: dict[str, dict[str, Any]] = {}
self._lock = threading.Lock()
self._started = False
def start_cleanup_worker(self) -> None:
with self._lock:
if self._started:
return
thread = threading.Thread(target=self._cleanup_loop, name="flow_cleanup_worker", daemon=True)
thread.start()
self._started = True
def put(self, flow_id: str, flow: dict[str, Any]) -> None:
with self._lock:
self._sessions[flow_id] = flow
def get(self, flow_id: str) -> Optional[dict[str, Any]]:
with self._lock:
return self._sessions.get(flow_id)
def mark_busy(self, flow_id: str, busy: bool) -> Optional[dict[str, Any]]:
with self._lock:
flow = self._sessions.get(flow_id)
if not flow:
return None
flow["busy"] = busy
flow["last_active_ts"] = time.time()
return flow
def touch(self, flow_id: str) -> None:
with self._lock:
flow = self._sessions.get(flow_id)
if flow:
flow["last_active_ts"] = time.time()
def pop(self, flow_id: str) -> Optional[dict[str, Any]]:
with self._lock:
return self._sessions.pop(flow_id, None)
def _cleanup_loop(self) -> None:
while True:
expired: list[dict[str, Any]] = []
now_ts = time.time()
with self._lock:
for flow_id, flow in list(self._sessions.items()):
if flow.get("busy"):
continue
last_active_ts = float(flow.get("last_active_ts", now_ts))
if now_ts - last_active_ts > self._idle_seconds:
expired.append(self._sessions.pop(flow_id))
for flow in expired:
FlowService.close_flow_resources(flow)
time.sleep(self._cleanup_interval)
class FlowService:
def __init__(self, session_store: FlowSessionStore, proxy_provider: ProxyProvider):
self._store = session_store
self._proxy_provider = proxy_provider
@staticmethod
def _choose_mobile_ua() -> str:
return random.choice(MOBILE_UA_POOL)
@staticmethod
def _connect_page_from_start_data(start_data: dict[str, Any]) -> ChromiumPage:
port = start_data.get("port")
ws = start_data.get("ws")
if port:
co = ChromiumOptions().set_local_port(port=int(port))
return ChromiumPage(addr_or_opts=co)
if ws:
return ChromiumPage(addr_or_opts=ws)
raise RuntimeError("TgeBrowser 未返回 port 或 ws")
@staticmethod
def _create_tgebrowser_browser(
client: TgeBrowserClient,
browser_name: str,
start_page_url: str,
proxy_host: str,
proxy_port: int,
mobile_ua: str,
) -> dict[str, Any]:
proxy_candidates = [
{"protocol": "http", "host": proxy_host, "port": proxy_port},
{"protocol": "http", "host": proxy_host, "port": str(proxy_port)},
{"protocol": "http", "server": f"{proxy_host}:{proxy_port}"},
]
last_error: Optional[Exception] = None
for proxy_conf in proxy_candidates:
try:
return client.create_browser(
browser_name=browser_name,
start_page_url=start_page_url,
proxy=proxy_conf,
fingerprint={
"os": "Android",
"platformVersion": 12,
"userAgent": mobile_ua,
},
)
except Exception as exc:
last_error = exc
continue
raise RuntimeError(f"创建浏览器失败(代理配置尝试均失败): {last_error}")
@staticmethod
def _apply_mobile_ua(page: ChromiumPage, ua: str) -> None:
try:
page.run_cdp("Network.enable")
except Exception:
pass
try:
page.run_cdp("Network.setUserAgentOverride", userAgent=ua)
except Exception:
pass
@staticmethod
def close_flow_resources(flow: dict[str, Any]) -> None:
page = flow.get("page")
client = flow.get("client")
env_id = flow.get("env_id")
try:
if page:
page.quit()
except Exception:
pass
try:
if client and env_id is not None:
client.stop_browser(env_id=env_id)
except Exception:
pass
def submit_phone(self, phone: str, url: str, proxy_api_url: str) -> tuple[dict[str, Any], Any]:
client: Optional[TgeBrowserClient] = None
page: Optional[ChromiumPage] = None
env_id: Optional[int] = None
proxy_text = ""
mobile_ua = ""
try:
proxy_text = self._proxy_provider.fetch_proxy_text(proxy_api_url)
proxy_host, proxy_port = self._proxy_provider.parse_proxy_addr(proxy_text)
mobile_ua = self._choose_mobile_ua()
client = TgeBrowserClient()
phone_tail = phone[-4:] if len(phone) >= 4 else phone
browser_name = f"tyyp_{phone_tail}_{int(time.time())}"
create_data = self._create_tgebrowser_browser(
client=client,
browser_name=browser_name,
start_page_url=url,
proxy_host=proxy_host,
proxy_port=proxy_port,
mobile_ua=mobile_ua,
)
env_id = create_data.get("envId")
if env_id is None:
raise RuntimeError("创建浏览器失败:未返回 envId")
start_data = client.start_browser(env_id=env_id)
page = self._connect_page_from_start_data(start_data)
self._apply_mobile_ua(page, mobile_ua)
packet_body = submit_phone(page=page, phone=phone, url=url)
flow_id = str(uuid.uuid4())
created = datetime.now()
flow = {
"flow_id": flow_id,
"page": page,
"client": client,
"env_id": env_id,
"phone": phone,
"url": url,
"proxy": proxy_text,
"ua": mobile_ua,
"created_at": created,
"created_at_iso": created.isoformat(timespec="seconds"),
"last_active_ts": time.time(),
"busy": False,
}
self._store.put(flow_id, flow)
return flow, packet_body
except Exception:
self.close_flow_resources({"page": page, "client": client, "env_id": env_id})
raise
def submit_code(self, flow_id: str, code: str) -> Any:
flow = self._store.mark_busy(flow_id, True)
if not flow:
raise KeyError(f"流程不存在或已过期: {flow_id}")
page = flow.get("page")
if not page:
self._store.mark_busy(flow_id, False)
raise RuntimeError("流程页面对象缺失")
try:
return input_code(page=page, code=code)
finally:
self._store.mark_busy(flow_id, False)
self._store.touch(flow_id)
def get_flow_meta(self, flow_id: str) -> Optional[dict[str, Any]]:
flow = self._store.get(flow_id)
if not flow:
return None
return {
"flow_id": flow_id,
"phone": flow.get("phone"),
"url": flow.get("url"),
"proxy": flow.get("proxy"),
"ua": flow.get("ua"),
"created_at": flow.get("created_at_iso"),
"last_active_ts": flow.get("last_active_ts"),
"busy": flow.get("busy", False),
}
def close_flow(self, flow_id: str) -> bool:
flow = self._store.pop(flow_id)
if not flow:
return False
self.close_flow_resources(flow)
return True