Files
fws_code/tyyp_app/api.py
ddrwode ee32d27b7f haha
2026-02-27 17:05:29 +08:00

88 lines
2.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from fastapi import FastAPI, HTTPException
from tyyp_app.config import FLOW_CLEANUP_INTERVAL_SECONDS, PROXY_FETCH_HEADERS, SESSION_IDLE_SECONDS
from tyyp_app.schemas import (
ApiSubmitCodeRequest,
ApiSubmitCodeResponse,
ApiSubmitPhoneRequest,
ApiSubmitPhoneResponse,
)
from tyyp_app.services import FlowService, FlowSessionStore, ProxyProvider
def create_app() -> FastAPI:
app = FastAPI(
title="test1 自动化 API",
description="TgeBrowser + DrissionPage 天翼页面自动化接口",
)
session_store = FlowSessionStore(
idle_seconds=SESSION_IDLE_SECONDS,
cleanup_interval=FLOW_CLEANUP_INTERVAL_SECONDS,
)
flow_service = FlowService(
session_store=session_store,
proxy_provider=ProxyProvider(headers=PROXY_FETCH_HEADERS, timeout=15),
)
app.state.session_store = session_store
app.state.flow_service = flow_service
@app.on_event("startup")
def _api_startup() -> None:
session_store.start_cleanup_worker()
@app.post("/api/submit_phone", response_model=ApiSubmitPhoneResponse)
def api_submit_phone(req: ApiSubmitPhoneRequest):
try:
flow, packet_body = flow_service.submit_phone(
phone=req.phone,
url=req.url,
proxy_api_url=req.proxy_api_url,
)
return ApiSubmitPhoneResponse(
success=True,
flow_id=flow["flow_id"],
data=packet_body,
phone=flow["phone"],
url=flow["url"],
created_at=flow["created_at_iso"],
proxy=flow["proxy"],
ua=flow["ua"],
)
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/submit_code", response_model=ApiSubmitCodeResponse)
def api_submit_code(req: ApiSubmitCodeRequest):
try:
packet_body = flow_service.submit_code(flow_id=req.flow_id, code=req.code)
return ApiSubmitCodeResponse(success=True, flow_id=req.flow_id, data=packet_body)
except KeyError as exc:
raise HTTPException(status_code=404, detail=str(exc))
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
@app.get("/api/flow/{flow_id}")
def api_get_flow(flow_id: str):
meta = flow_service.get_flow_meta(flow_id)
if not meta:
raise HTTPException(status_code=404, detail=f"流程不存在或已过期: {flow_id}")
return {"success": True, **meta}
@app.delete("/api/flow/{flow_id}")
def api_close_flow(flow_id: str):
closed = flow_service.close_flow(flow_id)
if not closed:
return {"success": False, "message": "流程不存在"}
return {"success": True, "message": "流程已关闭"}
return app
app = create_app()