Files
tg_code/接口/session优化.py
Administrator 37e9d4038c gfregfregfr
2025-11-12 12:50:39 +08:00

735 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import re
import random
import time
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
from dataclasses import dataclass
from enum import Enum
from loguru import logger
from urllib.parse import unquote
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from telethon import TelegramClient
from telethon.errors import PhoneNumberBannedError
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.tl.types import InputBotAppShortName, InputPeerUser
from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, RequestWebViewRequest
from datetime import datetime
import pymysql
from peewee import (
Model,
AutoField,
CharField,
IntegerField,
DateTimeField,
)
from peewee import MySQLDatabase
# 安装 pymysql 作为 MySQLdb
pymysql.install_as_MySQLdb()
# 数据库配置
db_config = {
'database': 'hx',
'user': 'hx',
'password': 'haxi@123456',
'host': '192.168.11.30',
'port': 3306,
}
# 全局数据库实例
db = MySQLDatabase(
db_config['database'],
user=db_config['user'],
password=db_config['password'],
host=db_config['host'],
port=db_config['port'],
)
class TgPhoneDevices(Model):
id = AutoField() # 自动递增的主键
area_code = CharField(null=True, max_length=255, help_text='电话号码区号') # 区号
phone_number = CharField(null=True, max_length=255, help_text='电话号码') # 电话号码
device_model = CharField(null=True, max_length=255, help_text='设备型号') # 设备型号
system_version = CharField(null=True, max_length=255, help_text='系统版本') # 系统版本
app_version = CharField(null=True, max_length=255, help_text='应用版本') # 应用版本
lang_code = CharField(null=True, max_length=255, help_text='语言代码') # 语言代码
system_lang_code = CharField(null=True, max_length=255, help_text='系统语言代码') # 系统语言代码
is_valid_session = IntegerField(null=True, help_text='Session 状态') # Session 状态
api_id = IntegerField(null=True, help_text='API ID') # API ID
kick_status = IntegerField(null=True, help_text='API ID') # API ID
api_hash = CharField(null=True, max_length=255, help_text='API Hash') # API Hash
phone = CharField(null=True, max_length=255, help_text='完整电话号码') # 电话号码
create_time = DateTimeField(default=datetime.now, help_text='记录创建时间')
code = CharField(null=True, ) # 电话号码
to_code = CharField(null=True, ) # 电话号码
server_ip = CharField(null=True, ) # 电话号码
proxy_type = CharField(null=True, ) # 电话号码
addr = CharField(null=True, ) # 电话号码
port = CharField(null=True, ) # 电话号码
user = CharField(null=True, ) # 电话号码
pwd = CharField(null=True, ) # 电话号码
device_start = IntegerField(null=True, ) # API ID
class Meta:
database = db # 指定数据库
table_name = 'tg_phone_devices' # 指定表名称
# ================== 配置层 ==================
class Config:
"""应用配置类"""
SECRET_KEY = 'your-secret-key'
TG_SESSION_PATH = r"C:\sessions"
PROXY_CONFIG = {
'proxy_type': 'http',
'addr': '192.168.50.220',
'port': 10811,
'username': '',
'password': ''
}
MAX_CONCURRENT_SESSIONS = 1
# ================== 数据模型层 ==================
class SessionStatus(Enum):
"""会话状态枚举"""
IDLE = "idle"
BUSY = "busy"
ERROR = "error"
@dataclass
class SessionInfo:
"""会话信息数据类"""
phone: str
status: SessionStatus
lock: asyncio.Lock
last_used: float
class VerificationRequest(BaseModel):
"""验证码请求模型"""
phone: str = Field(..., description="手机号码")
class TokenRequest(BaseModel):
"""Token请求模型"""
phone: str = Field(..., description="手机号码")
bot_name: str = Field(..., description="Bot用户名")
invite_code: Optional[str] = Field(None, description="邀请码")
platform: Optional[str] = Field(None, description="平台类型")
start_params: Optional[str] = Field(None, description="启动参数")
url: Optional[str] = Field(None, description="自定义URL")
class BotInteractionRequest(BaseModel):
"""Bot交互请求模型"""
phone: str = Field(..., description="手机号码")
bot_name: str = Field(..., description="Bot用户名")
invite_code: Optional[str] = Field(None, description="邀请码")
datas: List[Dict[str, Any]] = Field(..., description="交互数据")
class BotMessageRequest(BaseModel):
"""Bot消息请求模型"""
phone: str = Field(..., description="手机号码")
bot_name: str = Field(..., description="Bot用户名")
invite_code: Optional[str] = Field(None, description="邀请码")
class JoinChannelRequestModel(BaseModel):
"""加入频道请求模型"""
phone: str = Field(..., description="手机号码")
channel_names: List[str] = Field(..., description="频道名称列表")
class ApiResponse(BaseModel):
"""统一API响应模型"""
code: int = Field(200, description="状态码")
message: str = Field("操作成功", description="响应消息")
data: Optional[Any] = Field(None, description="响应数据")
errors: Optional[List[str]] = Field(None, description="错误列表")
# ================== 会话管理器 ==================
class SessionManager:
"""会话管理器,控制同一号码的并发访问"""
def __init__(self):
self._sessions: Dict[str, SessionInfo] = {}
self._cleanup_task: Optional[asyncio.Task] = None
async def get_session(self, phone: str) -> SessionInfo:
"""获取或创建会话信息"""
if phone not in self._sessions:
self._sessions[phone] = SessionInfo(
phone=phone,
status=SessionStatus.IDLE,
lock=asyncio.Lock(),
last_used=time.time()
)
return self._sessions[phone]
async def acquire_session(self, phone: str, timeout: float = 30.0) -> bool:
"""获取会话锁"""
session = await self.get_session(phone)
try:
await asyncio.wait_for(session.lock.acquire(), timeout=timeout)
session.status = SessionStatus.BUSY
session.last_used = time.time()
return True
except asyncio.TimeoutError:
logger.warning(f"获取会话锁超时: {phone}")
return False
def release_session(self, phone: str):
"""释放会话锁"""
if phone in self._sessions:
session = self._sessions[phone]
session.status = SessionStatus.IDLE
session.last_used = time.time()
if session.lock.locked():
session.lock.release()
async def cleanup_expired_sessions(self):
"""清理过期会话"""
current_time = time.time()
expired_phones = []
for phone, session in self._sessions.items():
if current_time - session.last_used > 300: # 5分钟过期
expired_phones.append(phone)
for phone in expired_phones:
self.release_session(phone)
del self._sessions[phone]
logger.info(f"清理过期会话: {phone}")
async def start_cleanup_task(self):
"""启动清理任务"""
while True:
await asyncio.sleep(60) # 每分钟清理一次
await self.cleanup_expired_sessions()
# ================== 服务层 ==================
class TelegramService:
"""Telegram客户端服务类"""
def __init__(self, server_config):
self.server_config = server_config
self.client: Optional[TelegramClient] = None
async def __aenter__(self):
"""异步上下文管理器入口"""
try:
self.client = await self._create_client()
await self.client.connect()
if not await self.client.is_user_authorized():
self.server_config.is_valid_session = 0
self.server_config.save(only=[TgPhoneDevices.is_valid_session])
raise Exception("用户未授权")
return self
except PhoneNumberBannedError:
logger.error(f"电话号码{self.server_config.phone}被封禁")
self.server_config.is_valid_session = -1
self.server_config.save(only=[TgPhoneDevices.is_valid_session])
raise
except Exception as e:
logger.error(f"客户端连接失败: {str(e)}")
if "Connection to Telegram failed" in str(e):
logger.error(f"电话号码{self.server_config.phone}代理失效")
self.server_config.is_valid_session = 4
self.server_config.save(only=[TgPhoneDevices.is_valid_session])
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
try:
if self.client and self.client.is_connected():
await self.client.disconnect()
except Exception as e:
logger.error(f"客户端断开连接失败: {str(e)}")
async def _create_client(self) -> TelegramClient:
"""创建Telegram客户端实例"""
proxy = {
'proxy_type': self.server_config.proxy_type,
'addr': self.server_config.addr,
'port': int(self.server_config.port),
'username': self.server_config.user or "",
'password': self.server_config.pwd or ""
}
return TelegramClient(
session=fr"{Config.TG_SESSION_PATH}\{self.server_config.phone}",
api_id=self.server_config.api_id,
api_hash=self.server_config.api_hash,
device_model=self.server_config.device_model,
system_version=self.server_config.system_version,
app_version=self.server_config.app_version,
system_lang_code=self.server_config.system_lang_code,
lang_code=self.server_config.lang_code,
proxy=proxy,
auto_reconnect=False,
)
@staticmethod
def extract_verification_code(text: str) -> Optional[str]:
"""从文本中提取5位数字验证码"""
try:
matches = re.findall(r'\b\d{5}\b', text)
return matches[0] if matches else None
except (IndexError, TypeError, AttributeError) as e:
logger.warning(f"验证码提取失败: {str(e)}")
return None
async def process_verification(self) -> Optional[str]:
"""处理验证流程,获取验证码"""
try:
messages = await self.client.get_messages(777000, limit=1)
if messages and messages[0].text:
return self.extract_verification_code(messages[0].text)
return None
except Exception as e:
logger.error(f"验证流程处理失败: {str(e)}")
return None
async def click_confirmation(self) -> bool:
"""点击确认按钮"""
try:
messages = await self.client.get_messages(777000, limit=5)
for msg in messages:
if hasattr(msg, 'buttons') and msg.buttons:
for row in msg.buttons:
for btn in row:
if hasattr(btn, 'text') and 'confirm' in btn.text.lower():
await btn.click()
return True
return False
except PhoneNumberBannedError:
logger.warning("点击确认时发现手机号被封禁")
return False
except Exception as e:
logger.error(f"点击确认失败: {str(e)}")
return False
async def check_bot_name(self, bot_username: str) -> bool:
"""检查是否已经打开过指定bot"""
try:
async for dialog in self.client.iter_dialogs():
try:
if hasattr(dialog.entity, 'usernames'):
for username in dialog.entity.usernames:
if username.username == bot_username:
logger.info(f'已与机器人交互: @{bot_username}')
return True
except:
pass
if dialog.is_user and dialog.entity.username == bot_username:
logger.info(f'已与机器人交互: @{bot_username}')
return True
logger.info(f'尚未与机器人交互: @{bot_username}')
return False
except Exception as e:
logger.error(f"检查bot名称失败: {str(e)}")
return False
async def join_bot(self, bot_name: str, invite_code: str = "") -> bool:
"""加入指定的bot"""
try:
bot = await self.client.get_entity(bot_name)
if invite_code:
await self.client(StartBotRequest(
bot=bot_name,
peer=bot_name,
start_param=invite_code
))
else:
await self.client.send_message(bot, '/start')
return True
except Exception as e:
logger.error(f"加入bot失败: {str(e)}")
return False
async def get_bot_token(self, bot_name: str, invite_code: Optional[str] = None,
platform: Optional[str] = None, start_params: Optional[str] = None,
url: Optional[str] = None) -> str:
"""获取bot的webview token"""
platform = platform or random.choice(['web', 'tdesktop'])
try:
if start_params:
me = await self.client.get_me()
logger.info(f'账号 {self.server_config.phone} -- {me.first_name} 登录成功!')
res = await self.client(RequestAppWebViewRequest(
'me',
InputBotAppShortName(await self.client.get_input_entity(bot_name), start_params),
platform="web",
start_param=invite_code
))
else:
bot = await self.client.get_entity(bot_name)
peer = InputPeerUser(bot.id, bot.access_hash)
res = await self.client(
RequestWebViewRequest(
peer=peer,
bot=bot,
platform=platform,
from_bot_menu=True,
url=url,
)
)
if not res or not res.url:
raise ValueError("从Telegram获取的响应或URL为空")
parts = res.url.split('tgWebAppData=')
if len(parts) < 2:
raise ValueError("URL格式无效 - 缺少tgWebAppData参数")
token_part = parts[1].split('&tgWebAppVersion')[0]
return unquote(token_part)
except Exception as e:
logger.error(f"电话号码:{self.server_config.phone}Token生成失败: {str(e)}")
raise
async def send_click_bot_message(self, bot_name: str, datas: List[Dict[str, Any]]) -> None:
"""与bot交互发送消息或点击按钮"""
bot = await self.client.get_entity(bot_name)
for data in datas:
if data.get('send_message'):
for message in data['send_message']:
await self.client.send_message(bot, message)
logger.info(f'电话号码:{self.server_config.phone},发送消息:{message}成功!')
await asyncio.sleep(1)
break
if data.get('click_button'):
for button_name in data["click_button"]:
messages = await self.client.get_messages(bot, limit=5)
if not messages:
logger.warning("未找到验证消息")
continue
for message in messages:
if message.buttons:
for row in message.buttons:
for button in row:
if button_name.lower() in button.text.lower():
try:
await button.click()
logger.info(f'电话号码:{self.server_config.phone},点击{button_name}成功!')
break
except Exception as e:
logger.error(f"点击按钮失败: {str(e)}")
async def get_bot_message(self, bot_name: str) -> str:
"""获取bot最新消息"""
try:
bot = await self.client.get_entity(bot_name)
messages = await self.client.get_messages(bot, limit=1)
for message in messages:
return message.message
return ""
except Exception as e:
logger.error(f"获取bot消息失败: {str(e)}")
return ""
async def join_channels(self, channel_names: List[str]) -> bool:
"""加入多个频道"""
try:
for channel_name in channel_names:
await self.client(JoinChannelRequest(channel_name))
logger.info(f"成功加入频道: {channel_name}")
return True
except Exception as e:
logger.error(f"加入频道失败: {str(e)}")
return False
# ================== 依赖注入 ==================
session_manager = SessionManager()
async def get_device(phone: str) -> TgPhoneDevices:
"""获取设备信息的依赖函数"""
device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == phone)
if not device:
raise HTTPException(status_code=404, detail="设备未注册")
return device
async def acquire_session_lock(phone: str) -> bool:
"""获取会话锁的依赖函数"""
if not await session_manager.acquire_session(phone):
raise HTTPException(status_code=429, detail="该号码正在被其他请求使用,请稍后重试")
return True
# ================== FastAPI应用 ==================
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
# 启动时
session_manager._cleanup_task = asyncio.create_task(session_manager.start_cleanup_task())
logger.info("应用启动完成")
yield
# 关闭时
if session_manager._cleanup_task:
session_manager._cleanup_task.cancel()
try:
await session_manager._cleanup_task
except asyncio.CancelledError:
pass
logger.info("应用关闭完成")
app = FastAPI(
title="Telegram API Service",
description="Telegram机器人交互API服务",
version="2.0.0",
lifespan=lifespan
)
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ================== 路由层 ==================
@app.get("/api/check_phone", response_model=ApiResponse)
async def check_phone(
phone: str,
device: TgPhoneDevices = Depends(get_device),
_: bool = Depends(acquire_session_lock)
):
"""检查手机号状态接口"""
try:
async with TelegramService(device) as service:
code = await service.process_verification()
if code:
return ApiResponse(
code=200,
message="验证码获取成功",
data={"verification_code": code}
)
return ApiResponse(
code=422,
message="验证码解析失败"
)
except PhoneNumberBannedError:
raise HTTPException(status_code=403, detail="账号已被封禁")
except Exception as e:
logger.error(f"服务异常: {str(e)}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
session_manager.release_session(phone)
@app.get("/api/click_confirm", response_model=ApiResponse)
async def click_confirm(
phone: str,
device: TgPhoneDevices = Depends(get_device),
_: bool = Depends(acquire_session_lock)
):
"""点击确认按钮接口"""
try:
async with TelegramService(device) as service:
result = await service.click_confirmation()
if result:
return ApiResponse(
code=200,
message="确认操作成功"
)
return ApiResponse(
code=404,
message="未找到确认按钮"
)
except PhoneNumberBannedError:
raise HTTPException(status_code=403, detail="账号已被封禁")
except Exception as e:
logger.error(f"确认异常: {str(e)}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
session_manager.release_session(phone)
@app.post("/api/get_token", response_model=ApiResponse)
async def get_token(
request: TokenRequest,
):
"""获取bot token接口"""
try:
device = await get_device(request.phone)
await acquire_session_lock(request.phone)
async with TelegramService(device) as service:
# 检查并加入bot
if not await service.check_bot_name(bot_username=request.bot_name):
success = await service.join_bot(
bot_name=request.bot_name,
invite_code=request.invite_code or ""
)
if not success:
raise HTTPException(status_code=500, detail="加入bot失败")
# 获取token
token = await service.get_bot_token(
bot_name=request.bot_name,
invite_code=request.invite_code,
platform=request.platform,
start_params=request.start_params,
url=request.url
)
return ApiResponse(
code=200,
message="Token获取成功",
data=token
)
except PhoneNumberBannedError:
raise HTTPException(status_code=403, detail="账号已被封禁")
except Exception as e:
logger.error(f"Token生成失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"Token生成失败: {str(e)}")
finally:
session_manager.release_session(request.phone)
@app.post("/api/send_click", response_model=ApiResponse)
async def send_click(
request: BotInteractionRequest,
):
"""与bot交互发送消息或点击按钮"""
try:
device = await get_device(request.phone)
await acquire_session_lock(request.phone)
async with TelegramService(device) as service:
# 检查并加入bot
if not await service.check_bot_name(bot_username=request.bot_name):
success = await service.join_bot(
bot_name=request.bot_name,
invite_code=request.invite_code or ""
)
if not success:
raise HTTPException(status_code=500, detail="加入bot失败")
# 执行交互操作
await service.send_click_bot_message(
bot_name=request.bot_name,
datas=request.datas
)
return ApiResponse(
code=200,
message="交互操作成功"
)
except PhoneNumberBannedError:
raise HTTPException(status_code=403, detail="账号已被封禁")
except Exception as e:
logger.error(f"交互操作失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"交互操作失败: {str(e)}")
finally:
session_manager.release_session(request.phone)
@app.post("/api/get_bot_message", response_model=ApiResponse)
async def get_bot_message(
request: BotMessageRequest,
):
"""获取bot最新消息"""
try:
device = await get_device(request.phone)
await acquire_session_lock(request.phone)
async with TelegramService(device) as service:
# 检查并加入bot
if not await service.check_bot_name(bot_username=request.bot_name):
success = await service.join_bot(
bot_name=request.bot_name,
invite_code=request.invite_code or ""
)
if not success:
raise HTTPException(status_code=500, detail="加入bot失败")
# 获取消息
message = await service.get_bot_message(bot_name=request.bot_name)
return ApiResponse(
code=200,
message="获取消息成功",
data=message
)
except PhoneNumberBannedError:
raise HTTPException(status_code=403, detail="账号已被封禁")
except Exception as e:
logger.error(f"获取消息失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取消息失败: {str(e)}")
finally:
session_manager.release_session(request.phone)
@app.post("/api/join_channels", response_model=ApiResponse)
async def join_channels(
request: JoinChannelRequestModel,
):
"""加入多个频道"""
try:
device = await get_device(request.phone)
await acquire_session_lock(request.phone)
async with TelegramService(device) as service:
success = await service.join_channels(request.channel_names)
if success:
return ApiResponse(
code=200,
message="加入频道成功"
)
else:
return ApiResponse(
code=500,
message="加入频道失败"
)
except PhoneNumberBannedError:
raise HTTPException(status_code=403, detail="账号已被封禁")
except Exception as e:
logger.error(f"加入频道失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"加入频道失败: {str(e)}")
finally:
session_manager.release_session(request.phone)
# ================== 健康检查 ==================
@app.get("/health")
async def health_check():
"""健康检查接口"""
return {"status": "healthy", "timestamp": time.time()}
if __name__ == '__main__':
import uvicorn
uvicorn.run(
"app:app",
host="0.0.0.0",
port=9000,
reload=True,
log_level="info"
)