import asyncio import re import random import time from typing import Optional, List, Dict, Any from contextlib import asynccontextmanager from dataclasses import dataclass from enum import Enum from loguru import logger from urllib.parse import unquote from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from telethon import TelegramClient from telethon.errors import PhoneNumberBannedError from telethon.tl.functions.channels import JoinChannelRequest from telethon.tl.types import InputBotAppShortName, InputPeerUser from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, RequestWebViewRequest from datetime import datetime import pymysql from peewee import ( Model, AutoField, CharField, IntegerField, DateTimeField, ) from peewee import MySQLDatabase # 安装 pymysql 作为 MySQLdb pymysql.install_as_MySQLdb() # 数据库配置 db_config = { 'database': 'hx', 'user': 'hx', 'password': 'haxi@123456', 'host': '192.168.11.30', 'port': 3306, } # 全局数据库实例 db = MySQLDatabase( db_config['database'], user=db_config['user'], password=db_config['password'], host=db_config['host'], port=db_config['port'], ) class TgPhoneDevices(Model): id = AutoField() # 自动递增的主键 area_code = CharField(null=True, max_length=255, help_text='电话号码区号') # 区号 phone_number = CharField(null=True, max_length=255, help_text='电话号码') # 电话号码 device_model = CharField(null=True, max_length=255, help_text='设备型号') # 设备型号 system_version = CharField(null=True, max_length=255, help_text='系统版本') # 系统版本 app_version = CharField(null=True, max_length=255, help_text='应用版本') # 应用版本 lang_code = CharField(null=True, max_length=255, help_text='语言代码') # 语言代码 system_lang_code = CharField(null=True, max_length=255, help_text='系统语言代码') # 系统语言代码 is_valid_session = IntegerField(null=True, help_text='Session 状态') # Session 状态 api_id = IntegerField(null=True, help_text='API ID') # API ID kick_status = IntegerField(null=True, help_text='API ID') # API ID api_hash = CharField(null=True, max_length=255, help_text='API Hash') # API Hash phone = CharField(null=True, max_length=255, help_text='完整电话号码') # 电话号码 create_time = DateTimeField(default=datetime.now, help_text='记录创建时间') code = CharField(null=True, ) # 电话号码 to_code = CharField(null=True, ) # 电话号码 server_ip = CharField(null=True, ) # 电话号码 proxy_type = CharField(null=True, ) # 电话号码 addr = CharField(null=True, ) # 电话号码 port = CharField(null=True, ) # 电话号码 user = CharField(null=True, ) # 电话号码 pwd = CharField(null=True, ) # 电话号码 device_start = IntegerField(null=True, ) # API ID class Meta: database = db # 指定数据库 table_name = 'tg_phone_devices' # 指定表名称 # ================== 配置层 ================== class Config: """应用配置类""" SECRET_KEY = 'your-secret-key' TG_SESSION_PATH = r"C:\sessions" PROXY_CONFIG = { 'proxy_type': 'http', 'addr': '192.168.50.220', 'port': 10811, 'username': '', 'password': '' } MAX_CONCURRENT_SESSIONS = 1 # ================== 数据模型层 ================== class SessionStatus(Enum): """会话状态枚举""" IDLE = "idle" BUSY = "busy" ERROR = "error" @dataclass class SessionInfo: """会话信息数据类""" phone: str status: SessionStatus lock: asyncio.Lock last_used: float class VerificationRequest(BaseModel): """验证码请求模型""" phone: str = Field(..., description="手机号码") class TokenRequest(BaseModel): """Token请求模型""" phone: str = Field(..., description="手机号码") bot_name: str = Field(..., description="Bot用户名") invite_code: Optional[str] = Field(None, description="邀请码") platform: Optional[str] = Field(None, description="平台类型") start_params: Optional[str] = Field(None, description="启动参数") url: Optional[str] = Field(None, description="自定义URL") class BotInteractionRequest(BaseModel): """Bot交互请求模型""" phone: str = Field(..., description="手机号码") bot_name: str = Field(..., description="Bot用户名") invite_code: Optional[str] = Field(None, description="邀请码") datas: List[Dict[str, Any]] = Field(..., description="交互数据") class BotMessageRequest(BaseModel): """Bot消息请求模型""" phone: str = Field(..., description="手机号码") bot_name: str = Field(..., description="Bot用户名") invite_code: Optional[str] = Field(None, description="邀请码") class JoinChannelRequestModel(BaseModel): """加入频道请求模型""" phone: str = Field(..., description="手机号码") channel_names: List[str] = Field(..., description="频道名称列表") class ApiResponse(BaseModel): """统一API响应模型""" code: int = Field(200, description="状态码") message: str = Field("操作成功", description="响应消息") data: Optional[Any] = Field(None, description="响应数据") errors: Optional[List[str]] = Field(None, description="错误列表") # ================== 会话管理器 ================== class SessionManager: """会话管理器,控制同一号码的并发访问""" def __init__(self): self._sessions: Dict[str, SessionInfo] = {} self._cleanup_task: Optional[asyncio.Task] = None async def get_session(self, phone: str) -> SessionInfo: """获取或创建会话信息""" if phone not in self._sessions: self._sessions[phone] = SessionInfo( phone=phone, status=SessionStatus.IDLE, lock=asyncio.Lock(), last_used=time.time() ) return self._sessions[phone] async def acquire_session(self, phone: str, timeout: float = 30.0) -> bool: """获取会话锁""" session = await self.get_session(phone) try: await asyncio.wait_for(session.lock.acquire(), timeout=timeout) session.status = SessionStatus.BUSY session.last_used = time.time() return True except asyncio.TimeoutError: logger.warning(f"获取会话锁超时: {phone}") return False def release_session(self, phone: str): """释放会话锁""" if phone in self._sessions: session = self._sessions[phone] session.status = SessionStatus.IDLE session.last_used = time.time() if session.lock.locked(): session.lock.release() async def cleanup_expired_sessions(self): """清理过期会话""" current_time = time.time() expired_phones = [] for phone, session in self._sessions.items(): if current_time - session.last_used > 300: # 5分钟过期 expired_phones.append(phone) for phone in expired_phones: self.release_session(phone) del self._sessions[phone] logger.info(f"清理过期会话: {phone}") async def start_cleanup_task(self): """启动清理任务""" while True: await asyncio.sleep(60) # 每分钟清理一次 await self.cleanup_expired_sessions() # ================== 服务层 ================== class TelegramService: """Telegram客户端服务类""" def __init__(self, server_config): self.server_config = server_config self.client: Optional[TelegramClient] = None async def __aenter__(self): """异步上下文管理器入口""" try: self.client = await self._create_client() await self.client.connect() if not await self.client.is_user_authorized(): self.server_config.is_valid_session = 0 self.server_config.save(only=[TgPhoneDevices.is_valid_session]) raise Exception("用户未授权") return self except PhoneNumberBannedError: logger.error(f"电话号码{self.server_config.phone}被封禁") self.server_config.is_valid_session = -1 self.server_config.save(only=[TgPhoneDevices.is_valid_session]) raise except Exception as e: logger.error(f"客户端连接失败: {str(e)}") if "Connection to Telegram failed" in str(e): logger.error(f"电话号码{self.server_config.phone}代理失效") self.server_config.is_valid_session = 4 self.server_config.save(only=[TgPhoneDevices.is_valid_session]) raise async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" try: if self.client and self.client.is_connected(): await self.client.disconnect() except Exception as e: logger.error(f"客户端断开连接失败: {str(e)}") async def _create_client(self) -> TelegramClient: """创建Telegram客户端实例""" proxy = { 'proxy_type': self.server_config.proxy_type, 'addr': self.server_config.addr, 'port': int(self.server_config.port), 'username': self.server_config.user or "", 'password': self.server_config.pwd or "" } return TelegramClient( session=fr"{Config.TG_SESSION_PATH}\{self.server_config.phone}", api_id=self.server_config.api_id, api_hash=self.server_config.api_hash, device_model=self.server_config.device_model, system_version=self.server_config.system_version, app_version=self.server_config.app_version, system_lang_code=self.server_config.system_lang_code, lang_code=self.server_config.lang_code, proxy=proxy, auto_reconnect=False, ) @staticmethod def extract_verification_code(text: str) -> Optional[str]: """从文本中提取5位数字验证码""" try: matches = re.findall(r'\b\d{5}\b', text) return matches[0] if matches else None except (IndexError, TypeError, AttributeError) as e: logger.warning(f"验证码提取失败: {str(e)}") return None async def process_verification(self) -> Optional[str]: """处理验证流程,获取验证码""" try: messages = await self.client.get_messages(777000, limit=1) if messages and messages[0].text: return self.extract_verification_code(messages[0].text) return None except Exception as e: logger.error(f"验证流程处理失败: {str(e)}") return None async def click_confirmation(self) -> bool: """点击确认按钮""" try: messages = await self.client.get_messages(777000, limit=5) for msg in messages: if hasattr(msg, 'buttons') and msg.buttons: for row in msg.buttons: for btn in row: if hasattr(btn, 'text') and 'confirm' in btn.text.lower(): await btn.click() return True return False except PhoneNumberBannedError: logger.warning("点击确认时发现手机号被封禁") return False except Exception as e: logger.error(f"点击确认失败: {str(e)}") return False async def check_bot_name(self, bot_username: str) -> bool: """检查是否已经打开过指定bot""" try: async for dialog in self.client.iter_dialogs(): try: if hasattr(dialog.entity, 'usernames'): for username in dialog.entity.usernames: if username.username == bot_username: logger.info(f'已与机器人交互: @{bot_username}') return True except: pass if dialog.is_user and dialog.entity.username == bot_username: logger.info(f'已与机器人交互: @{bot_username}') return True logger.info(f'尚未与机器人交互: @{bot_username}') return False except Exception as e: logger.error(f"检查bot名称失败: {str(e)}") return False async def join_bot(self, bot_name: str, invite_code: str = "") -> bool: """加入指定的bot""" try: bot = await self.client.get_entity(bot_name) if invite_code: await self.client(StartBotRequest( bot=bot_name, peer=bot_name, start_param=invite_code )) else: await self.client.send_message(bot, '/start') return True except Exception as e: logger.error(f"加入bot失败: {str(e)}") return False async def get_bot_token(self, bot_name: str, invite_code: Optional[str] = None, platform: Optional[str] = None, start_params: Optional[str] = None, url: Optional[str] = None) -> str: """获取bot的webview token""" platform = platform or random.choice(['web', 'tdesktop']) try: if start_params: me = await self.client.get_me() logger.info(f'账号 {self.server_config.phone} -- {me.first_name} 登录成功!') res = await self.client(RequestAppWebViewRequest( 'me', InputBotAppShortName(await self.client.get_input_entity(bot_name), start_params), platform="web", start_param=invite_code )) else: bot = await self.client.get_entity(bot_name) peer = InputPeerUser(bot.id, bot.access_hash) res = await self.client( RequestWebViewRequest( peer=peer, bot=bot, platform=platform, from_bot_menu=True, url=url, ) ) if not res or not res.url: raise ValueError("从Telegram获取的响应或URL为空") parts = res.url.split('tgWebAppData=') if len(parts) < 2: raise ValueError("URL格式无效 - 缺少tgWebAppData参数") token_part = parts[1].split('&tgWebAppVersion')[0] return unquote(token_part) except Exception as e: logger.error(f"电话号码:{self.server_config.phone},Token生成失败: {str(e)}") raise async def send_click_bot_message(self, bot_name: str, datas: List[Dict[str, Any]]) -> None: """与bot交互,发送消息或点击按钮""" bot = await self.client.get_entity(bot_name) for data in datas: if data.get('send_message'): for message in data['send_message']: await self.client.send_message(bot, message) logger.info(f'电话号码:{self.server_config.phone},发送消息:{message}成功!') await asyncio.sleep(1) break if data.get('click_button'): for button_name in data["click_button"]: messages = await self.client.get_messages(bot, limit=5) if not messages: logger.warning("未找到验证消息") continue for message in messages: if message.buttons: for row in message.buttons: for button in row: if button_name.lower() in button.text.lower(): try: await button.click() logger.info(f'电话号码:{self.server_config.phone},点击{button_name}成功!') break except Exception as e: logger.error(f"点击按钮失败: {str(e)}") async def get_bot_message(self, bot_name: str) -> str: """获取bot最新消息""" try: bot = await self.client.get_entity(bot_name) messages = await self.client.get_messages(bot, limit=1) for message in messages: return message.message return "" except Exception as e: logger.error(f"获取bot消息失败: {str(e)}") return "" async def join_channels(self, channel_names: List[str]) -> bool: """加入多个频道""" try: for channel_name in channel_names: await self.client(JoinChannelRequest(channel_name)) logger.info(f"成功加入频道: {channel_name}") return True except Exception as e: logger.error(f"加入频道失败: {str(e)}") return False # ================== 依赖注入 ================== session_manager = SessionManager() async def get_device(phone: str) -> TgPhoneDevices: """获取设备信息的依赖函数""" device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == phone) if not device: raise HTTPException(status_code=404, detail="设备未注册") return device async def acquire_session_lock(phone: str) -> bool: """获取会话锁的依赖函数""" if not await session_manager.acquire_session(phone): raise HTTPException(status_code=429, detail="该号码正在被其他请求使用,请稍后重试") return True # ================== FastAPI应用 ================== @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" # 启动时 session_manager._cleanup_task = asyncio.create_task(session_manager.start_cleanup_task()) logger.info("应用启动完成") yield # 关闭时 if session_manager._cleanup_task: session_manager._cleanup_task.cancel() try: await session_manager._cleanup_task except asyncio.CancelledError: pass logger.info("应用关闭完成") app = FastAPI( title="Telegram API Service", description="Telegram机器人交互API服务", version="2.0.0", lifespan=lifespan ) # 添加CORS中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ================== 路由层 ================== @app.get("/api/check_phone", response_model=ApiResponse) async def check_phone( phone: str, device: TgPhoneDevices = Depends(get_device), _: bool = Depends(acquire_session_lock) ): """检查手机号状态接口""" try: async with TelegramService(device) as service: code = await service.process_verification() if code: return ApiResponse( code=200, message="验证码获取成功", data={"verification_code": code} ) return ApiResponse( code=422, message="验证码解析失败" ) except PhoneNumberBannedError: raise HTTPException(status_code=403, detail="账号已被封禁") except Exception as e: logger.error(f"服务异常: {str(e)}") raise HTTPException(status_code=500, detail="服务器内部错误") finally: session_manager.release_session(phone) @app.get("/api/click_confirm", response_model=ApiResponse) async def click_confirm( phone: str, device: TgPhoneDevices = Depends(get_device), _: bool = Depends(acquire_session_lock) ): """点击确认按钮接口""" try: async with TelegramService(device) as service: result = await service.click_confirmation() if result: return ApiResponse( code=200, message="确认操作成功" ) return ApiResponse( code=404, message="未找到确认按钮" ) except PhoneNumberBannedError: raise HTTPException(status_code=403, detail="账号已被封禁") except Exception as e: logger.error(f"确认异常: {str(e)}") raise HTTPException(status_code=500, detail="服务器内部错误") finally: session_manager.release_session(phone) @app.post("/api/get_token", response_model=ApiResponse) async def get_token( request: TokenRequest, ): """获取bot token接口""" try: device = await get_device(request.phone) await acquire_session_lock(request.phone) async with TelegramService(device) as service: # 检查并加入bot if not await service.check_bot_name(bot_username=request.bot_name): success = await service.join_bot( bot_name=request.bot_name, invite_code=request.invite_code or "" ) if not success: raise HTTPException(status_code=500, detail="加入bot失败") # 获取token token = await service.get_bot_token( bot_name=request.bot_name, invite_code=request.invite_code, platform=request.platform, start_params=request.start_params, url=request.url ) return ApiResponse( code=200, message="Token获取成功", data=token ) except PhoneNumberBannedError: raise HTTPException(status_code=403, detail="账号已被封禁") except Exception as e: logger.error(f"Token生成失败: {str(e)}") raise HTTPException(status_code=500, detail=f"Token生成失败: {str(e)}") finally: session_manager.release_session(request.phone) @app.post("/api/send_click", response_model=ApiResponse) async def send_click( request: BotInteractionRequest, ): """与bot交互,发送消息或点击按钮""" try: device = await get_device(request.phone) await acquire_session_lock(request.phone) async with TelegramService(device) as service: # 检查并加入bot if not await service.check_bot_name(bot_username=request.bot_name): success = await service.join_bot( bot_name=request.bot_name, invite_code=request.invite_code or "" ) if not success: raise HTTPException(status_code=500, detail="加入bot失败") # 执行交互操作 await service.send_click_bot_message( bot_name=request.bot_name, datas=request.datas ) return ApiResponse( code=200, message="交互操作成功" ) except PhoneNumberBannedError: raise HTTPException(status_code=403, detail="账号已被封禁") except Exception as e: logger.error(f"交互操作失败: {str(e)}") raise HTTPException(status_code=500, detail=f"交互操作失败: {str(e)}") finally: session_manager.release_session(request.phone) @app.post("/api/get_bot_message", response_model=ApiResponse) async def get_bot_message( request: BotMessageRequest, ): """获取bot最新消息""" try: device = await get_device(request.phone) await acquire_session_lock(request.phone) async with TelegramService(device) as service: # 检查并加入bot if not await service.check_bot_name(bot_username=request.bot_name): success = await service.join_bot( bot_name=request.bot_name, invite_code=request.invite_code or "" ) if not success: raise HTTPException(status_code=500, detail="加入bot失败") # 获取消息 message = await service.get_bot_message(bot_name=request.bot_name) return ApiResponse( code=200, message="获取消息成功", data=message ) except PhoneNumberBannedError: raise HTTPException(status_code=403, detail="账号已被封禁") except Exception as e: logger.error(f"获取消息失败: {str(e)}") raise HTTPException(status_code=500, detail=f"获取消息失败: {str(e)}") finally: session_manager.release_session(request.phone) @app.post("/api/join_channels", response_model=ApiResponse) async def join_channels( request: JoinChannelRequestModel, ): """加入多个频道""" try: device = await get_device(request.phone) await acquire_session_lock(request.phone) async with TelegramService(device) as service: success = await service.join_channels(request.channel_names) if success: return ApiResponse( code=200, message="加入频道成功" ) else: return ApiResponse( code=500, message="加入频道失败" ) except PhoneNumberBannedError: raise HTTPException(status_code=403, detail="账号已被封禁") except Exception as e: logger.error(f"加入频道失败: {str(e)}") raise HTTPException(status_code=500, detail=f"加入频道失败: {str(e)}") finally: session_manager.release_session(request.phone) # ================== 健康检查 ================== @app.get("/health") async def health_check(): """健康检查接口""" return {"status": "healthy", "timestamp": time.time()} if __name__ == '__main__': import uvicorn uvicorn.run( "app:app", host="0.0.0.0", port=9000, reload=True, log_level="info" )