99 lines
3.1 KiB
Python
99 lines
3.1 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
天翼订购页 1.html 接口:
|
||
- POST /api/submit_phone 传电话号码,完成滑块后返回 hn_userEquitys/getYanZhenMa/v2 响应
|
||
- POST /api/submit_code 传验证码,点击确认订购
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import threading
|
||
from typing import Any, Optional
|
||
|
||
from fastapi import FastAPI, HTTPException
|
||
from pydantic import BaseModel, Field
|
||
|
||
from tyyp_service import submit_phone as do_submit_phone, submit_code as do_submit_code
|
||
|
||
app = FastAPI(title="天翼订购页 API", description="1.html 手机号与验证码提交流程")
|
||
|
||
# 全局页面实例,用于 submit_phone 与 submit_code 之间复用
|
||
_page: Optional[Any] = None
|
||
_page_lock = threading.Lock()
|
||
|
||
|
||
def _get_or_create_page():
|
||
global _page
|
||
with _page_lock:
|
||
if _page is None:
|
||
from DrissionPage import ChromiumPage
|
||
_page = ChromiumPage()
|
||
return _page
|
||
|
||
|
||
class SubmitPhoneRequest(BaseModel):
|
||
phone: str = Field(..., description="手机号码")
|
||
port: int = Field(0, description="连接已有浏览器端口,0 表示新建")
|
||
url: str = Field("http://yscnb.com/tyyp/1.html", description="目标页面 URL")
|
||
|
||
|
||
class SubmitPhoneResponse(BaseModel):
|
||
success: bool = True
|
||
data: Any = Field(..., description="hn_userEquitys/getYanZhenMa/v2 接口返回的响应体")
|
||
|
||
|
||
class SubmitCodeRequest(BaseModel):
|
||
code: str = Field(..., description="短信验证码")
|
||
|
||
|
||
class SubmitCodeResponse(BaseModel):
|
||
success: bool = True
|
||
message: str = ""
|
||
|
||
|
||
@app.post("/api/submit_phone", response_model=SubmitPhoneResponse)
|
||
def api_submit_phone(req: SubmitPhoneRequest):
|
||
"""
|
||
提交手机号:填写手机号、点击获取验证码、执行滑块验证,
|
||
监听 hn_userEquitys/getYanZhenMa/v2,将其响应体原样返回。
|
||
"""
|
||
global _page
|
||
try:
|
||
from DrissionPage import ChromiumOptions, ChromiumPage
|
||
if req.port:
|
||
co = ChromiumOptions().set_local_port(port=req.port)
|
||
page = ChromiumPage(addr_or_opts=co)
|
||
with _page_lock:
|
||
_page = page # 供 submit_code 复用
|
||
else:
|
||
page = _get_or_create_page()
|
||
|
||
body = do_submit_phone(
|
||
page=page,
|
||
phone=req.phone,
|
||
url=req.url,
|
||
port=req.port,
|
||
)
|
||
return SubmitPhoneResponse(success=True, data=body)
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@app.post("/api/submit_code", response_model=SubmitCodeResponse)
|
||
def api_submit_code(req: SubmitCodeRequest):
|
||
"""
|
||
提交验证码:在已打开并处于验证码表单的页面上,
|
||
填写验证码并点击确认订购按钮(img.btn-buy)。
|
||
"""
|
||
try:
|
||
page = _get_or_create_page()
|
||
result = do_submit_code(page=page, code=req.code)
|
||
return SubmitCodeResponse(success=True, message=result.get("message", "已点击确认订购"))
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import uvicorn
|
||
uvicorn.run(app, host="0.0.0.0", port=8000)
|