import asyncio import re import random import time from loguru import logger from urllib.parse import unquote from telethon import TelegramClient from telethon.errors import PhoneNumberBannedError from telethon.tl.functions.channels import JoinChannelRequest from telethon.tl.types import InputBotAppShortName, InputPeerUser from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, RequestWebViewRequest from models.tg_phone_devices import TgPhoneDevices1 # ================== 服务层 ================== class TelegramService: """Telegram客户端服务类,封装所有Telegram相关操作""" def __init__(self, server_config): """初始化服务实例 Args: server_config: 服务器配置对象 """ self.server_config = server_config self.client = None # Telegram客户端实例 async def __aenter__(self): """异步上下文管理器入口,用于连接客户端""" try: self.client = await self._create_client() # 创建客户端实例 try: await self.client.connect() # 建立连接 except Exception as e: error_msg = str(e) logger.error(f"账号 {self.server_config.phone} 连接失败: {error_msg}") # 处理连接失败错误 if "Connection to Telegram failed 5 time(s)" in error_msg: logger.error(f"账号 {self.server_config.phone} 代理失效") if hasattr(self.server_config, 'is_valid_session'): if hasattr(self.server_config, 'save'): self.server_config.save(only=[TgPhoneDevices1.is_valid_session]) # 重新抛出异常,而不是返回False raise return self except Exception as e: logger.error(f"账号 {self.server_config['session']} 客户端连接失败: {str(e)}") raise async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口,用于断开连接""" try: if self.client and self.client.is_connected(): await self.client.disconnect() # 断开连接 except Exception as e: logger.error(f"客户端断开连接失败: {str(e)}") async def _create_client(self): """创建Telegram客户端实例 Returns: TelegramClient: 配置好的Telegram客户端实例 """ # 代理配置 proxy = { 'proxy_type': self.server_config["proxy_type"], # 代理类型 'addr': self.server_config["addr"], # 代理地址 'port': self.server_config["port"], # 代理端口 'username': self.server_config["username"] if self.server_config["username"] else "", # 代理用户名 'password': self.server_config["password"] if self.server_config["password"] else "" # 代理密码 } # 创建并返回Telegram客户端实例 return TelegramClient( session=self.server_config["session"], # 会话文件路径 api_id=self.server_config["api_id"], # API ID api_hash=self.server_config["api_hash"], # API Hash device_model=self.server_config["device_model"], # 设备型号 system_version=self.server_config["system_version"], # 系统版本 app_version=self.server_config["app_version"], # 应用版本 system_lang_code=self.server_config["system_lang_code"], # 系统语言代码 lang_code=self.server_config["lang_code"], # 应用语言代码 proxy=proxy, # 仅当配置了代理地址时使用代理 auto_reconnect=False, ) @staticmethod def extract_verification_code(text): """从文本中提取5位数字验证码 Args: text: 包含验证码的文本 Returns: str: 提取到的验证码,如果提取失败返回None """ try: matches = re.findall(r'\d{5}', text) # 使用单词边界匹配5位数字 return matches[0] if matches else None except (IndexError, TypeError, AttributeError) as e: try: matches = re.findall(r'\d{6}', text) # 使用单词边界匹配5位数字 return matches[0] if matches else None except (IndexError, TypeError, AttributeError) as e: logger.warning(f"验证码提取失败: {str(e)}") async def process_verification(self): """处理验证流程,获取验证码 Returns: str: 验证码,如果获取失败返回None """ try: messages = await self.client.get_messages(777000, limit=1) # 获取来自Telegram的最新消息 if messages and messages[0].text: return self.extract_verification_code(messages[0].text) # 提取验证码 return None except Exception as e: logger.error(f"验证流程处理失败: {str(e)}") return None async def click_confirmation(self): """点击确认按钮 Returns: bool: 是否成功点击确认按钮 """ try: messages = await self.client.get_messages(777000, limit=5) # 获取最多5条消息 for msg in messages: if hasattr(msg, 'buttons') and msg.buttons: # 检查消息是否有按钮 for row in msg.buttons: # 遍历按钮行 for btn in row: # 遍历每个按钮 if hasattr(btn, 'text') and 'confirm' in btn.text.lower(): # 查找确认按钮 await btn.click() # 点击按钮 return True return False except PhoneNumberBannedError: logger.warning("点击确认时发现手机号被封禁") return False except Exception as e: logger.error(f"点击确认失败: {str(e)}") return False async def check_bot_name(self, bot_username): """检查是否已经打开过指定bot Args: bot_username: bot的用户名 Returns: bool: 是否已经打开过该bot """ try: async for dialog in self.client.iter_dialogs(): try: for i in dialog.entity.usernames: if i.username == bot_username: logger.info(f'已与机器人交互: @{bot_username}') return True except: pass if dialog.is_user and dialog.entity.username == bot_username: logger.info(f'已与机器人交互: @{bot_username}') return True logger.info(f'尚未与机器人交互: @{bot_username}') return False except Exception as e: logger.error(f"检查bot名称失败: {str(e)}") return False async def join_bot(self, bot_name, invite_code=""): """加入指定的bot Args: bot_name: bot的用户名 invite_code: 邀请码(可选) Returns: bool: 是否成功加入bot """ try: bot = await self.client.get_entity(bot_name) # 获取bot实体 if invite_code: # 使用邀请码启动bot await self.client(StartBotRequest( bot=bot_name, peer=bot_name, start_param=invite_code )) else: # 直接发送/start命令 await self.client.send_message(bot, '/start') return True except Exception as e: logger.error(f"加入bot失败: {str(e)}") return False async def get_bot_token(self, bot_name, invite_code=None, platform=None, start_params=None, url=None): """获取bot的webview token Args: bot_name: bot的用户名 invite_code: 邀请码(可选) platform: 平台类型(可选) start_params: 启动参数(可选) url: 自定义URL(可选) Returns: str: 获取到的token Raises: Exception: 获取token失败时抛出异常 """ platform = platform or random.choice(['web', 'tdesktop']) # 默认随机选择平台 try: if start_params: me = await self.client.get_me() logger.info(f'账号 {8617882004254} -- {me.first_name} 登录成功!') # 使用AppWebView方式获取token res = await self.client(RequestAppWebViewRequest( 'me', InputBotAppShortName(await self.client.get_input_entity(bot_name), start_params), platform="web", start_param=invite_code if invite_code else None )) else: # 使用普通WebView方式获取token bot = await self.client.get_entity(bot_name) peer = InputPeerUser(bot.id, bot.access_hash) res = await self.client( RequestWebViewRequest( peer=peer, bot=bot, platform=platform, from_bot_menu=True, url=url, ) ) if not res or not res.url: raise ValueError("从Telegram获取的响应或URL为空") # 从URL中解析token parts = res.url.split('tgWebAppData=') if len(parts) < 2: raise ValueError("URL格式无效 - 缺少tgWebAppData参数") token_part = parts[1].split('&tgWebAppVersion')[0] return unquote(token_part) # URL解码 except Exception as e: logger.error(f"电话号码:{self.server_config.phone},Token生成失败: {str(e)}") return str(e) async def send_click_bot_message(self, bot_name, datas): """ datas = [ {"send_message": [], "click_button": [""], }, ] """ bot = await self.client.get_entity(bot_name) for data in datas: if data['send_message']: for message in data['send_message']: await self.client.send_message(bot, message) logger.info(f'电话号码:{self.server_config.phone},发生消息:{message}成功!!!') time.sleep(1) break if data['click_button']: for button_name in data["click_button"]: messages = await self.client.get_messages(bot, limit=5) if not messages: print("未找到验证消息") return start = 0 # 遍历消息,寻找包含按钮的验证消息 for message in messages: if message.buttons: for row in message.buttons: for button in row: if button_name in button.text.lower(): try: await button.click() logger.info(f'电话号码:{self.server_config.phone},点击{button_name}成功!!!') start = 1 except: pass if start == 1: break if start == 1: break if start == 1: break async def get_bot_message(self, bot_name): bot = await self.client.get_entity(bot_name) messages = await self.client.get_messages(bot, limit=1) # # 遍历消息,寻找包含按钮的验证消息 for message in messages: return message.message else: return "" async def join_channe(self, bot_name): for i in bot_name: try: await self.client(JoinChannelRequest(i)) except Exception as e: return False return True async def main(): # 查询需要检查的设备 devices = TgPhoneDevices1.select().where( # TgPhoneDevices1.device_start.is_null(), TgPhoneDevices1.is_valid_session == 0, ) for device in devices: device1 = { 'proxy_type': "http", 'addr': "192.168.1.79", 'port': device.port, 'username': "", 'password': "", 'session': fr"sessions\{device.phone}", 'api_id': 2040, 'api_hash': "b18441a1ff607e10a989891a5462e627", 'device_model': device.device_model, 'system_version': device.system_version, 'app_version': device.app_version, 'system_lang_code': device.system_lang_code, 'lang_code': device.lang_code, # 'auto_reconnect': "http", } async with TelegramService(device1) as service: if not await service.client.is_user_authorized(): logger.info("失败!!!") continue device.is_valid_session = 1 device.save() # # 操作session对象 # if await service.join_channe( # bot_name=["WebseaMalaysia"], # ): # logger.info("加入群成功!!!") # else: # logger.info("加入群失败!!!") if __name__ == '__main__': asyncio.run(main())