735 lines
26 KiB
Python
735 lines
26 KiB
Python
import asyncio
|
||
import re
|
||
import random
|
||
import time
|
||
from typing import Optional, List, Dict, Any
|
||
from contextlib import asynccontextmanager
|
||
from dataclasses import dataclass
|
||
from enum import Enum
|
||
|
||
from loguru import logger
|
||
from urllib.parse import unquote
|
||
from fastapi import FastAPI, HTTPException, Depends
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel, Field
|
||
from telethon import TelegramClient
|
||
from telethon.errors import PhoneNumberBannedError
|
||
from telethon.tl.functions.channels import JoinChannelRequest
|
||
from telethon.tl.types import InputBotAppShortName, InputPeerUser
|
||
from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, RequestWebViewRequest
|
||
from datetime import datetime
|
||
import pymysql
|
||
from peewee import (
|
||
Model,
|
||
AutoField,
|
||
CharField,
|
||
IntegerField,
|
||
DateTimeField,
|
||
)
|
||
from peewee import MySQLDatabase
|
||
|
||
import os  # used only for the environment overrides in this setup section

# Install pymysql under the MySQLdb name.
pymysql.install_as_MySQLdb()

# Database connection settings.
# NOTE(review): these credentials used to be hard-coded only. Each value can
# now be overridden through an environment variable; the literals remain as
# backward-compatible fallbacks and should be moved out of source control.
db_config = {
    'database': os.environ.get('HX_DB_NAME', 'hx'),
    'user': os.environ.get('HX_DB_USER', 'hx'),
    'password': os.environ.get('HX_DB_PASSWORD', 'haxi@123456'),
    'host': os.environ.get('HX_DB_HOST', '192.168.11.30'),
    # Environment values are strings, so normalise the port to int.
    'port': int(os.environ.get('HX_DB_PORT', '3306')),
}

# Global database instance bound to the models declared in this module.
db = MySQLDatabase(
    db_config['database'],
    user=db_config['user'],
    password=db_config['password'],
    host=db_config['host'],
    port=db_config['port'],
)
|
||
|
||
|
||
class TgPhoneDevices(Model):
    """Peewee model mapped to the ``tg_phone_devices`` table.

    One row holds the settings this file uses to build a Telegram client for
    a phone number: API credentials, device fingerprint, language codes and
    proxy parameters, plus a session-state flag that the service writes back.
    """

    id = AutoField()  # auto-incrementing primary key

    # Phone number parts.
    area_code = CharField(max_length=255, null=True, help_text='电话号码区号')  # area / country code
    phone_number = CharField(max_length=255, null=True, help_text='电话号码')  # phone number
    # Full phone number; used for lookups, log messages and the session file name.
    phone = CharField(max_length=255, null=True, help_text='完整电话号码')

    # Device fingerprint and language settings passed to the Telegram client.
    device_model = CharField(max_length=255, null=True, help_text='设备型号')  # device model
    system_version = CharField(max_length=255, null=True, help_text='系统版本')  # system version
    app_version = CharField(max_length=255, null=True, help_text='应用版本')  # app version
    lang_code = CharField(max_length=255, null=True, help_text='语言代码')  # language code
    system_lang_code = CharField(max_length=255, null=True, help_text='系统语言代码')  # system language code

    # Session state. This file writes 0 (not authorised), -1 (number banned)
    # and 4 (connection to Telegram failed); other values are not set here.
    is_valid_session = IntegerField(null=True, help_text='Session 状态')

    # Telegram API credentials passed to the client.
    api_id = IntegerField(null=True, help_text='API ID')
    api_hash = CharField(max_length=255, null=True, help_text='API Hash')

    # NOTE(review): help_text 'API ID' looks copy-pasted; the meaning of
    # kick_status is not visible in this file.
    kick_status = IntegerField(null=True, help_text='API ID')

    create_time = DateTimeField(default=datetime.now, help_text='记录创建时间')  # row creation time

    # Not referenced elsewhere in this file; semantics unknown — TODO confirm.
    code = CharField(null=True)
    to_code = CharField(null=True)
    server_ip = CharField(null=True)

    # Proxy settings read when the Telegram client is created.
    proxy_type = CharField(null=True)
    addr = CharField(null=True)
    port = CharField(null=True)  # stored as text, converted with int() on use
    user = CharField(null=True)
    pwd = CharField(null=True)

    # Not referenced elsewhere in this file; semantics unknown — TODO confirm.
    device_start = IntegerField(null=True)

    class Meta:
        database = db  # bind the model to the module-level database
        table_name = 'tg_phone_devices'  # explicit table name
|
||
|
||
|
||
# ================== 配置层 ==================
|
||
class Config:
    """Static application settings, exposed as class attributes."""

    SECRET_KEY = 'your-secret-key'

    # Directory holding the Telethon session files (Windows path).
    TG_SESSION_PATH = r"C:\sessions"

    # Default proxy description. The Telegram client in this file builds its
    # proxy from the device row instead, so this is not read in the visible code.
    PROXY_CONFIG = dict(
        proxy_type='http',
        addr='192.168.50.220',
        port=10811,
        username='',
        password='',
    )

    MAX_CONCURRENT_SESSIONS = 1
|
||
|
||
|
||
# ================== 数据模型层 ==================
|
||
class SessionStatus(Enum):
    """States a per-phone session can be in."""

    IDLE = "idle"    # no request currently holds the session
    BUSY = "busy"    # a request has acquired the session lock
    ERROR = "error"  # defined for completeness; not assigned in this file
|
||
|
||
|
||
@dataclass
class SessionInfo:
    """Bookkeeping record kept for each phone number.

    Attributes:
        phone: Phone number the record belongs to.
        status: Current state of the session.
        lock: Lock that serialises requests for this phone.
        last_used: ``time.time()`` stamp of the latest acquire or release.
    """

    phone: str
    status: SessionStatus
    lock: asyncio.Lock
    last_used: float
|
||
|
||
|
||
class VerificationRequest(BaseModel):
    """Request body carrying only the phone number for a verification lookup."""

    phone: str = Field(default=..., description="手机号码")
|
||
|
||
|
||
class TokenRequest(BaseModel):
    """Request body for the webview-token endpoint.

    ``phone`` and ``bot_name`` are required; the remaining fields are optional
    and default to ``None``.
    """

    phone: str = Field(default=..., description="手机号码")
    bot_name: str = Field(default=..., description="Bot用户名")
    invite_code: Optional[str] = Field(default=None, description="邀请码")
    platform: Optional[str] = Field(default=None, description="平台类型")
    start_params: Optional[str] = Field(default=None, description="启动参数")
    url: Optional[str] = Field(default=None, description="自定义URL")
|
||
|
||
|
||
class BotInteractionRequest(BaseModel):
    """Request body for sending messages to a bot and clicking its buttons.

    Each item of ``datas`` is a dict; the service reads its optional
    ``send_message`` and ``click_button`` keys.
    """

    phone: str = Field(default=..., description="手机号码")
    bot_name: str = Field(default=..., description="Bot用户名")
    invite_code: Optional[str] = Field(default=None, description="邀请码")
    datas: List[Dict[str, Any]] = Field(default=..., description="交互数据")
|
||
|
||
|
||
class BotMessageRequest(BaseModel):
    """Request body for fetching the latest message of a bot conversation."""

    phone: str = Field(default=..., description="手机号码")
    bot_name: str = Field(default=..., description="Bot用户名")
    invite_code: Optional[str] = Field(default=None, description="邀请码")
|
||
|
||
|
||
class JoinChannelRequestModel(BaseModel):
    """Request body listing the channels a phone's account should join."""

    phone: str = Field(default=..., description="手机号码")
    channel_names: List[str] = Field(default=..., description="频道名称列表")
|
||
|
||
|
||
class ApiResponse(BaseModel):
    """Uniform response envelope returned by the API routes.

    ``code`` defaults to 200 and ``message`` to a success text; ``data`` and
    ``errors`` are optional and default to ``None``.
    """

    code: int = Field(default=200, description="状态码")
    message: str = Field(default="操作成功", description="响应消息")
    data: Optional[Any] = Field(default=None, description="响应数据")
    errors: Optional[List[str]] = Field(default=None, description="错误列表")
|
||
|
||
|
||
# ================== 会话管理器 ==================
|
||
class SessionManager:
    """Registry of per-phone sessions that serialises access to each number.

    Every phone gets one ``SessionInfo`` holding an ``asyncio.Lock``. Requests
    acquire that lock before using the phone and release it afterwards; a
    periodic task drops entries that have been idle for too long.
    """

    # Seconds without use after which an idle session entry is dropped.
    SESSION_TTL_SECONDS = 300
    # Seconds between two cleanup passes.
    CLEANUP_INTERVAL_SECONDS = 60

    def __init__(self):
        self._sessions: Dict[str, SessionInfo] = {}
        self._cleanup_task: Optional[asyncio.Task] = None

    async def get_session(self, phone: str) -> SessionInfo:
        """Return the session record for ``phone``, creating it on first use."""
        session = self._sessions.get(phone)
        if session is None:
            session = SessionInfo(
                phone=phone,
                status=SessionStatus.IDLE,
                lock=asyncio.Lock(),
                last_used=time.time(),
            )
            self._sessions[phone] = session
        return session

    async def acquire_session(self, phone: str, timeout: float = 30.0) -> bool:
        """Try to take the lock of ``phone`` within ``timeout`` seconds.

        Returns:
            True once the lock is held (the session is then marked BUSY),
            False if the wait timed out.
        """
        session = await self.get_session(phone)
        try:
            await asyncio.wait_for(session.lock.acquire(), timeout=timeout)
        except asyncio.TimeoutError:
            logger.warning(f"获取会话锁超时: {phone}")
            return False
        session.status = SessionStatus.BUSY
        session.last_used = time.time()
        return True

    def release_session(self, phone: str):
        """Mark the session of ``phone`` idle and release its lock if held.

        Unknown phones are ignored. The lock is released whenever it is
        locked, so callers must only invoke this after a successful
        ``acquire_session`` for the same phone.
        """
        session = self._sessions.get(phone)
        if session is None:
            return
        session.status = SessionStatus.IDLE
        session.last_used = time.time()
        if session.lock.locked():
            session.lock.release()

    async def cleanup_expired_sessions(self):
        """Drop sessions that are idle and unused for ``SESSION_TTL_SECONDS``.

        A session whose lock is currently held is never expired. Releasing
        and deleting it would let a new request create a second lock for the
        same phone while the first request is still running.
        """
        now = time.time()
        # Collect first: the dict must not change size while being iterated.
        expired_phones = [
            phone
            for phone, session in self._sessions.items()
            if now - session.last_used > self.SESSION_TTL_SECONDS
            and not session.lock.locked()
        ]
        for phone in expired_phones:
            del self._sessions[phone]
            logger.info(f"清理过期会话: {phone}")

    async def start_cleanup_task(self):
        """Run ``cleanup_expired_sessions`` periodically until cancelled."""
        while True:
            await asyncio.sleep(self.CLEANUP_INTERVAL_SECONDS)
            await self.cleanup_expired_sessions()
|
||
|
||
|
||
# ================== 服务层 ==================
|
||
class TelegramService:
    """Async context manager wrapping one Telethon client for one device row.

    Entering connects the client and checks that the account is authorised;
    leaving disconnects it. The other methods are helpers built on
    ``self.client`` and are meant to be called inside the ``async with`` block.
    """

    def __init__(self, server_config):
        """Store the device row; the client itself is created on entry.

        Args:
            server_config: Row object providing the phone, API, device,
                language and proxy attributes read by this class. It must
                also support ``save(only=[...])``.
        """
        self.server_config = server_config
        self.client: Optional[TelegramClient] = None

    async def __aenter__(self):
        """Connect and authorise the client, returning ``self``.

        Raises:
            PhoneNumberBannedError: The number is banned (state -1 is saved).
            Exception: Any other connection/authorisation failure; state 0 is
                saved for an unauthorised account and 4 when the error text
                reports a failed connection to Telegram.
        """
        try:
            await self._connect_and_authorize()
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so a client that
            # already connected would otherwise stay connected.
            await self._disconnect_quietly()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect the client; exceptions from the body are not suppressed."""
        await self._disconnect_quietly()

    def _save_session_state(self, state: int) -> None:
        """Persist ``state`` into the row's ``is_valid_session`` column only."""
        self.server_config.is_valid_session = state
        self.server_config.save(only=[TgPhoneDevices.is_valid_session])

    async def _connect_and_authorize(self) -> None:
        """Create and connect the client, recording failure states on the row."""
        try:
            self.client = await self._create_client()
            await self.client.connect()

            if not await self.client.is_user_authorized():
                self._save_session_state(0)
                raise Exception("用户未授权")
        except PhoneNumberBannedError:
            logger.error(f"电话号码{self.server_config.phone}被封禁")
            self._save_session_state(-1)
            raise
        except Exception as e:
            logger.error(f"客户端连接失败: {str(e)}")
            if "Connection to Telegram failed" in str(e):
                logger.error(f"电话号码{self.server_config.phone}代理失效")
                self._save_session_state(4)
            raise

    async def _disconnect_quietly(self) -> None:
        """Disconnect the client if connected; log instead of raising."""
        try:
            if self.client and self.client.is_connected():
                await self.client.disconnect()
        except Exception as e:
            logger.error(f"客户端断开连接失败: {str(e)}")

    async def _create_client(self) -> TelegramClient:
        """Build a ``TelegramClient`` from the device row (no connection yet)."""
        config = self.server_config
        proxy = {
            'proxy_type': config.proxy_type,
            'addr': config.addr,
            'port': int(config.port),  # stored as text on the row
            'username': config.user or "",
            'password': config.pwd or ""
        }

        return TelegramClient(
            session=fr"{Config.TG_SESSION_PATH}\{self.server_config.phone}",
            api_id=config.api_id,
            api_hash=config.api_hash,
            device_model=config.device_model,
            system_version=config.system_version,
            app_version=config.app_version,
            system_lang_code=config.system_lang_code,
            lang_code=config.lang_code,
            proxy=proxy,
            auto_reconnect=False,
        )

    @staticmethod
    def extract_verification_code(text: str) -> Optional[str]:
        """Return the first standalone 5-digit number in ``text``, or None."""
        try:
            matches = re.findall(r'\b\d{5}\b', text)
            return matches[0] if matches else None
        except (IndexError, TypeError, AttributeError) as e:
            logger.warning(f"验证码提取失败: {str(e)}")
            return None

    async def process_verification(self) -> Optional[str]:
        """Read the latest message from chat 777000 and extract a code.

        Returns:
            The 5-digit code, or None when there is no message text, no code
            in it, or fetching the message failed (the error is logged).
        """
        try:
            messages = await self.client.get_messages(777000, limit=1)
            if messages and messages[0].text:
                return self.extract_verification_code(messages[0].text)
            return None
        except Exception as e:
            logger.error(f"验证流程处理失败: {str(e)}")
            return None

    async def click_confirmation(self) -> bool:
        """Click the first button containing "confirm" in chat 777000.

        Looks through the 5 most recent messages. Returns True after a click,
        False when no such button exists or any error occurs.
        """
        try:
            messages = await self.client.get_messages(777000, limit=5)
            for msg in messages:
                if not (hasattr(msg, 'buttons') and msg.buttons):
                    continue
                for row in msg.buttons:
                    for btn in row:
                        if hasattr(btn, 'text') and 'confirm' in btn.text.lower():
                            await btn.click()
                            return True
            return False
        except PhoneNumberBannedError:
            logger.warning("点击确认时发现手机号被封禁")
            return False
        except Exception as e:
            logger.error(f"点击确认失败: {str(e)}")
            return False

    async def check_bot_name(self, bot_username: str) -> bool:
        """Report whether a dialog with ``bot_username`` already exists.

        A dialog matches when one of its entity's ``usernames`` entries, or
        the ``username`` of a user entity, equals ``bot_username`` exactly.
        Returns False if iterating the dialogs fails (the error is logged).
        """
        try:
            async for dialog in self.client.iter_dialogs():
                entity = dialog.entity
                # ``usernames`` may be missing or None; treat both as empty
                # instead of hiding the resulting error behind a bare except.
                extra_usernames = getattr(entity, 'usernames', None) or []
                if any(
                    getattr(item, 'username', None) == bot_username
                    for item in extra_usernames
                ):
                    logger.info(f'已与机器人交互: @{bot_username}')
                    return True
                if dialog.is_user and entity.username == bot_username:
                    logger.info(f'已与机器人交互: @{bot_username}')
                    return True
            logger.info(f'尚未与机器人交互: @{bot_username}')
            return False
        except Exception as e:
            logger.error(f"检查bot名称失败: {str(e)}")
            return False

    async def join_bot(self, bot_name: str, invite_code: str = "") -> bool:
        """Start a conversation with ``bot_name``.

        With an ``invite_code`` a ``StartBotRequest`` carrying it as start
        parameter is sent; otherwise a plain ``/start`` message. Returns
        False (after logging) on any error.
        """
        try:
            bot = await self.client.get_entity(bot_name)
            if invite_code:
                await self.client(StartBotRequest(
                    bot=bot_name,
                    peer=bot_name,
                    start_param=invite_code
                ))
            else:
                await self.client.send_message(bot, '/start')
            return True
        except Exception as e:
            logger.error(f"加入bot失败: {str(e)}")
            return False

    @staticmethod
    def _extract_webapp_data(url: Optional[str]) -> str:
        """Return the URL-decoded ``tgWebAppData`` value of a webview URL.

        Raises:
            ValueError: ``url`` is empty or has no ``tgWebAppData=`` part.
        """
        if not url:
            raise ValueError("从Telegram获取的响应或URL为空")

        parts = url.split('tgWebAppData=')
        if len(parts) < 2:
            raise ValueError("URL格式无效 - 缺少tgWebAppData参数")

        token_part = parts[1].split('&tgWebAppVersion')[0]
        return unquote(token_part)

    async def get_bot_token(self, bot_name: str, invite_code: Optional[str] = None,
                            platform: Optional[str] = None, start_params: Optional[str] = None,
                            url: Optional[str] = None) -> str:
        """Request a webview URL from the bot and return its ``tgWebAppData``.

        Args:
            bot_name: Username of the bot.
            invite_code: Passed as ``start_param`` of the app-webview request.
            platform: Platform for the plain webview request; a random choice
                of ``'web'``/``'tdesktop'`` when omitted.
            start_params: When set, used as the bot app short name and the
                app-webview request is used instead of the plain one.
            url: Custom URL forwarded to the plain webview request.

        Raises:
            ValueError: The response has no usable URL.
            Exception: Anything raised by the Telegram calls (logged, re-raised).
        """
        platform = platform or random.choice(['web', 'tdesktop'])

        try:
            if start_params:
                me = await self.client.get_me()
                logger.info(f'账号 {self.server_config.phone} -- {me.first_name} 登录成功!')

                # NOTE(review): this branch always sends platform="web" and
                # ignores the ``platform`` argument — confirm that is intended.
                res = await self.client(RequestAppWebViewRequest(
                    'me',
                    InputBotAppShortName(await self.client.get_input_entity(bot_name), start_params),
                    platform="web",
                    start_param=invite_code
                ))
            else:
                bot = await self.client.get_entity(bot_name)
                peer = InputPeerUser(bot.id, bot.access_hash)
                res = await self.client(
                    RequestWebViewRequest(
                        peer=peer,
                        bot=bot,
                        platform=platform,
                        from_bot_menu=True,
                        url=url,
                    )
                )

            return self._extract_webapp_data(res.url if res else None)
        except Exception as e:
            logger.error(f"电话号码:{self.server_config.phone},Token生成失败: {str(e)}")
            raise

    async def _click_first_matching_button(self, messages, button_name: str) -> bool:
        """Click the first button whose text contains ``button_name``.

        The match is case-insensitive. A failed click is logged and the search
        continues; the search stops at the first successful click.
        """
        wanted = button_name.lower()
        for message in messages:
            if not message.buttons:
                continue
            for row in message.buttons:
                for button in row:
                    if wanted not in button.text.lower():
                        continue
                    try:
                        await button.click()
                    except Exception as e:
                        logger.error(f"点击按钮失败: {str(e)}")
                        continue
                    logger.info(f'电话号码:{self.server_config.phone},点击{button_name}成功!')
                    return True
        return False

    async def send_click_bot_message(self, bot_name: str, datas: List[Dict[str, Any]]) -> None:
        """Run the interaction steps in ``datas`` against ``bot_name``.

        For each item, ``send_message`` (a list of texts) is handled first,
        then ``click_button`` (a list of button names looked up in the bot's
        5 most recent messages).
        """
        bot = await self.client.get_entity(bot_name)

        for data in datas:
            if data.get('send_message'):
                for message in data['send_message']:
                    await self.client.send_message(bot, message)
                    logger.info(f'电话号码:{self.server_config.phone},发送消息:{message}成功!')
                    await asyncio.sleep(1)
                    # NOTE(review): this break means only the first text of the
                    # list is sent. Kept as in the original — confirm intent.
                    break

            if data.get('click_button'):
                for button_name in data["click_button"]:
                    messages = await self.client.get_messages(bot, limit=5)
                    if not messages:
                        logger.warning("未找到验证消息")
                        continue
                    # Stop at the first successful click so the same button is
                    # not pressed again in later rows or older messages.
                    await self._click_first_matching_button(messages, button_name)

    async def get_bot_message(self, bot_name: str) -> str:
        """Return the text of the bot's latest message, or "" if none/failed."""
        try:
            bot = await self.client.get_entity(bot_name)
            messages = await self.client.get_messages(bot, limit=1)
            return messages[0].message if messages else ""
        except Exception as e:
            logger.error(f"获取bot消息失败: {str(e)}")
            return ""

    async def join_channels(self, channel_names: List[str]) -> bool:
        """Join each channel in order; stop and return False on the first error."""
        try:
            for channel_name in channel_names:
                await self.client(JoinChannelRequest(channel_name))
                logger.info(f"成功加入频道: {channel_name}")
            return True
        except Exception as e:
            logger.error(f"加入频道失败: {str(e)}")
            return False
|
||
|
||
|
||
# ================== 依赖注入 ==================
|
||
# Module-level SessionManager instance used by the dependency helpers,
# the lifespan hook and the route handlers in this file.
session_manager = SessionManager()
|
||
|
||
|
||
async def get_device(phone: str) -> TgPhoneDevices:
    """Look up the device row registered for ``phone``.

    Raises:
        HTTPException: 404 when no row has this phone number.
    """
    # NOTE(review): this looks like a synchronous peewee query inside a
    # coroutine — confirm it is acceptable to block here.
    row = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == phone)
    if row:
        return row
    raise HTTPException(status_code=404, detail="设备未注册")
|
||
|
||
|
||
async def acquire_session_lock(phone: str) -> bool:
    """Acquire the session lock for ``phone`` or fail the request.

    Returns:
        True once the lock is held.

    Raises:
        HTTPException: 429 when the lock could not be acquired in time.
    """
    acquired = await session_manager.acquire_session(phone)
    if acquired:
        return True
    raise HTTPException(status_code=429, detail="该号码正在被其他请求使用,请稍后重试")
|
||
|
||
|
||
# ================== FastAPI应用 ==================
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the session cleanup task on startup and cancel it on shutdown."""
    # Startup: run the periodic cleanup loop in the background.
    cleanup_loop = session_manager.start_cleanup_task()
    session_manager._cleanup_task = asyncio.create_task(cleanup_loop)
    logger.info("应用启动完成")

    yield

    # Shutdown: cancel the loop and wait until the cancellation has landed.
    background_task = session_manager._cleanup_task
    if background_task:
        background_task.cancel()
        try:
            await background_task
        except asyncio.CancelledError:
            pass
    logger.info("应用关闭完成")
|
||
|
||
|
||
# The FastAPI application; ``lifespan`` is passed in as its lifespan hook.
app = FastAPI(
    title="Telegram API Service",
    description="Telegram机器人交互API服务",
    version="2.0.0",
    lifespan=lifespan
)

# Add the CORS middleware.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive policy — confirm this is intended before exposing the API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||
|
||
|
||
# ================== 路由层 ==================
|
||
@app.get("/api/check_phone", response_model=ApiResponse)
async def check_phone(
    phone: str,
    device: TgPhoneDevices = Depends(get_device),
    _: bool = Depends(acquire_session_lock),
):
    """Fetch the latest verification code for ``phone``.

    The device row and the session lock come from the dependencies; the lock
    is released when the handler finishes.

    Returns:
        ``ApiResponse`` with code 200 and the code in ``data``, or code 422
        when no code could be extracted.

    Raises:
        HTTPException: 403 for a banned number, 500 for any other failure.
    """
    try:
        async with TelegramService(device) as telegram:
            verification_code = await telegram.process_verification()
            if not verification_code:
                return ApiResponse(code=422, message="验证码解析失败")
            payload = {"verification_code": verification_code}
            return ApiResponse(code=200, message="验证码获取成功", data=payload)
    except PhoneNumberBannedError:
        raise HTTPException(status_code=403, detail="账号已被封禁")
    except Exception as e:
        logger.error(f"服务异常: {str(e)}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        session_manager.release_session(phone)
|
||
|
||
|
||
@app.get("/api/click_confirm", response_model=ApiResponse)
async def click_confirm(
    phone: str,
    device: TgPhoneDevices = Depends(get_device),
    _: bool = Depends(acquire_session_lock),
):
    """Click the confirmation button for ``phone``.

    The device row and the session lock come from the dependencies; the lock
    is released when the handler finishes.

    Returns:
        ``ApiResponse`` with code 200 after a click, or code 404 when no
        confirmation button was found.

    Raises:
        HTTPException: 403 for a banned number, 500 for any other failure.
    """
    try:
        async with TelegramService(device) as telegram:
            clicked = await telegram.click_confirmation()
            if not clicked:
                return ApiResponse(code=404, message="未找到确认按钮")
            return ApiResponse(code=200, message="确认操作成功")
    except PhoneNumberBannedError:
        raise HTTPException(status_code=403, detail="账号已被封禁")
    except Exception as e:
        logger.error(f"确认异常: {str(e)}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        session_manager.release_session(phone)
|
||
|
||
|
||
@app.post("/api/get_token", response_model=ApiResponse)
async def get_token(
        request: TokenRequest,
):
    """Return the webview token of a bot for the given phone.

    Starts the bot first if the account has no dialog with it yet.

    Returns:
        ``ApiResponse`` with code 200 and the token in ``data``.

    Raises:
        HTTPException: 404 unknown device, 429 phone busy, 403 banned number,
            500 when starting the bot or generating the token fails.
    """
    # Tracks whether THIS request holds the lock, so a 404/429 raised before
    # the acquire never releases a lock owned by another request.
    lock_held = False
    try:
        device = await get_device(request.phone)
        await acquire_session_lock(request.phone)
        lock_held = True

        async with TelegramService(device) as service:
            # Start the bot if there is no dialog with it yet.
            if not await service.check_bot_name(bot_username=request.bot_name):
                success = await service.join_bot(
                    bot_name=request.bot_name,
                    invite_code=request.invite_code or ""
                )
                if not success:
                    raise HTTPException(status_code=500, detail="加入bot失败")

            # Fetch the token.
            token = await service.get_bot_token(
                bot_name=request.bot_name,
                invite_code=request.invite_code,
                platform=request.platform,
                start_params=request.start_params,
                url=request.url
            )

            return ApiResponse(
                code=200,
                message="Token获取成功",
                data=token
            )
    except HTTPException:
        # Keep deliberate HTTP errors (404/429/500) instead of rewrapping
        # them as a generic 500 in the handler below.
        raise
    except PhoneNumberBannedError:
        raise HTTPException(status_code=403, detail="账号已被封禁")
    except Exception as e:
        logger.error(f"Token生成失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Token生成失败: {str(e)}")
    finally:
        if lock_held:
            session_manager.release_session(request.phone)
|
||
|
||
|
||
@app.post("/api/send_click", response_model=ApiResponse)
async def send_click(
        request: BotInteractionRequest,
):
    """Send messages to a bot and/or click its buttons for the given phone.

    Starts the bot first if the account has no dialog with it yet.

    Returns:
        ``ApiResponse`` with code 200 once the interaction steps have run.

    Raises:
        HTTPException: 404 unknown device, 429 phone busy, 403 banned number,
            500 when starting the bot or the interaction fails.
    """
    # Tracks whether THIS request holds the lock, so a 404/429 raised before
    # the acquire never releases a lock owned by another request.
    lock_held = False
    try:
        device = await get_device(request.phone)
        await acquire_session_lock(request.phone)
        lock_held = True

        async with TelegramService(device) as service:
            # Start the bot if there is no dialog with it yet.
            if not await service.check_bot_name(bot_username=request.bot_name):
                success = await service.join_bot(
                    bot_name=request.bot_name,
                    invite_code=request.invite_code or ""
                )
                if not success:
                    raise HTTPException(status_code=500, detail="加入bot失败")

            # Run the interaction steps.
            await service.send_click_bot_message(
                bot_name=request.bot_name,
                datas=request.datas
            )

            return ApiResponse(
                code=200,
                message="交互操作成功"
            )
    except HTTPException:
        # Keep deliberate HTTP errors (404/429/500) instead of rewrapping
        # them as a generic 500 in the handler below.
        raise
    except PhoneNumberBannedError:
        raise HTTPException(status_code=403, detail="账号已被封禁")
    except Exception as e:
        logger.error(f"交互操作失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"交互操作失败: {str(e)}")
    finally:
        if lock_held:
            session_manager.release_session(request.phone)
|
||
|
||
|
||
@app.post("/api/get_bot_message", response_model=ApiResponse)
async def get_bot_message(
        request: BotMessageRequest,
):
    """Return the latest message of a bot conversation for the given phone.

    Starts the bot first if the account has no dialog with it yet.

    Returns:
        ``ApiResponse`` with code 200 and the message text in ``data``.

    Raises:
        HTTPException: 404 unknown device, 429 phone busy, 403 banned number,
            500 when starting the bot or reading the message fails.
    """
    # Tracks whether THIS request holds the lock, so a 404/429 raised before
    # the acquire never releases a lock owned by another request.
    lock_held = False
    try:
        device = await get_device(request.phone)
        await acquire_session_lock(request.phone)
        lock_held = True

        async with TelegramService(device) as service:
            # Start the bot if there is no dialog with it yet.
            if not await service.check_bot_name(bot_username=request.bot_name):
                success = await service.join_bot(
                    bot_name=request.bot_name,
                    invite_code=request.invite_code or ""
                )
                if not success:
                    raise HTTPException(status_code=500, detail="加入bot失败")

            # Read the latest message.
            message = await service.get_bot_message(bot_name=request.bot_name)

            return ApiResponse(
                code=200,
                message="获取消息成功",
                data=message
            )
    except HTTPException:
        # Keep deliberate HTTP errors (404/429/500) instead of rewrapping
        # them as a generic 500 in the handler below.
        raise
    except PhoneNumberBannedError:
        raise HTTPException(status_code=403, detail="账号已被封禁")
    except Exception as e:
        logger.error(f"获取消息失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取消息失败: {str(e)}")
    finally:
        if lock_held:
            session_manager.release_session(request.phone)
|
||
|
||
|
||
@app.post("/api/join_channels", response_model=ApiResponse)
async def join_channels(
        request: JoinChannelRequestModel,
):
    """Join the requested channels with the account of the given phone.

    Returns:
        ``ApiResponse`` with code 200 when every channel was joined, or code
        500 when the service reported a failure.

    Raises:
        HTTPException: 404 unknown device, 429 phone busy, 403 banned number,
            500 for any other failure.
    """
    # Tracks whether THIS request holds the lock, so a 404/429 raised before
    # the acquire never releases a lock owned by another request.
    lock_held = False
    try:
        device = await get_device(request.phone)
        await acquire_session_lock(request.phone)
        lock_held = True

        async with TelegramService(device) as service:
            success = await service.join_channels(request.channel_names)

            if success:
                return ApiResponse(
                    code=200,
                    message="加入频道成功"
                )
            return ApiResponse(
                code=500,
                message="加入频道失败"
            )
    except HTTPException:
        # Keep deliberate HTTP errors (404/429) instead of rewrapping them
        # as a generic 500 in the handler below.
        raise
    except PhoneNumberBannedError:
        raise HTTPException(status_code=403, detail="账号已被封禁")
    except Exception as e:
        logger.error(f"加入频道失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"加入频道失败: {str(e)}")
    finally:
        if lock_held:
            session_manager.release_session(request.phone)
|
||
|
||
|
||
# ================== 健康检查 ==================
|
||
@app.get("/health")
async def health_check():
    """Health probe: report a fixed status and the current Unix time."""
    now = time.time()
    return {"status": "healthy", "timestamp": now}
|
||
|
||
|
||
if __name__ == '__main__':
    import uvicorn

    # Server options for running this module directly as a script.
    server_options = {
        "host": "0.0.0.0",
        "port": 9000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("app:app", **server_options)
|