#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations from fastapi import FastAPI, HTTPException from tyyp_app.config import FLOW_CLEANUP_INTERVAL_SECONDS, PROXY_FETCH_HEADERS, SESSION_IDLE_SECONDS from tyyp_app.schemas import ( ApiSubmitCodeRequest, ApiSubmitCodeResponse, ApiSubmitPhoneRequest, ApiSubmitPhoneResponse, ) from tyyp_app.services import FlowService, FlowSessionStore, ProxyProvider def create_app() -> FastAPI: app = FastAPI( title="test1 自动化 API", description="TgeBrowser + DrissionPage 天翼页面自动化接口", ) session_store = FlowSessionStore( idle_seconds=SESSION_IDLE_SECONDS, cleanup_interval=FLOW_CLEANUP_INTERVAL_SECONDS, ) flow_service = FlowService( session_store=session_store, proxy_provider=ProxyProvider(headers=PROXY_FETCH_HEADERS, timeout=15), ) app.state.session_store = session_store app.state.flow_service = flow_service @app.on_event("startup") def _api_startup() -> None: session_store.start_cleanup_worker() @app.post("/api/submit_phone", response_model=ApiSubmitPhoneResponse) def api_submit_phone(req: ApiSubmitPhoneRequest): try: flow, packet_body = flow_service.submit_phone( phone=req.phone, url=req.url, proxy_api_url=req.proxy_api_url, ) return ApiSubmitPhoneResponse( success=True, flow_id=flow["flow_id"], data=packet_body, phone=flow["phone"], url=flow["url"], created_at=flow["created_at_iso"], proxy=flow["proxy"], ua=flow["ua"], ) except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) @app.post("/api/submit_code", response_model=ApiSubmitCodeResponse) def api_submit_code(req: ApiSubmitCodeRequest): try: packet_body = flow_service.submit_code(flow_id=req.flow_id, code=req.code) return ApiSubmitCodeResponse(success=True, flow_id=req.flow_id, data=packet_body) except KeyError as exc: raise HTTPException(status_code=404, detail=str(exc)) except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) @app.get("/api/flow/{flow_id}") def api_get_flow(flow_id: str): meta = flow_service.get_flow_meta(flow_id) if not meta: raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {flow_id}") return {"success": True, **meta} @app.delete("/api/flow/{flow_id}") def api_close_flow(flow_id: str): closed = flow_service.close_flow(flow_id) if not closed: return {"success": False, "message": "流程不存在"} return {"success": True, "message": "流程已关闭"} return app app = create_app()