import re import random import time from loguru import logger from urllib.parse import unquote from flask import Flask, request, jsonify, Blueprint from telethon import TelegramClient from telethon.errors import PhoneNumberBannedError from telethon.tl.functions.channels import JoinChannelRequest from telethon.tl.types import InputBotAppShortName, InputPeerUser from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, RequestWebViewRequest from models.tg_phone_devices import TgPhoneDevices # ================== 配置层 ================== class Config: """应用配置类,包含所有全局配置参数""" SECRET_KEY = 'your-secret-key' # Flask应用密钥 TG_SESSION_PATH = r"C:\sessions" # Telegram会话文件存储路径 PROXY_CONFIG = { 'proxy_type': 'http', # 或 'socks4',具体看你的代理类型 'addr': '192.168.50.220', # 代理服务器地址 'port': 10811, # 代理服务器端口 'username': '', # 如果有用户名,填写 'password': '' # 如果有密码,填写 } # ================== 服务层 ================== class TelegramService: """Telegram客户端服务类,封装所有Telegram相关操作""" def __init__(self, server_config): """初始化服务实例 Args: server_config: 服务器配置对象 """ self.server_config = server_config self.client = None # Telegram客户端实例 async def __aenter__(self): """异步上下文管理器入口,用于连接客户端""" try: self.client = await self._create_client() # 创建客户端实例 try: await self.client.connect() # 建立连接 except: return False return self except Exception as e: logger.error(f"电话号码:{self.server_config.phone},客户端连接失败: {str(e)}") if str(e) == "Connection to Telegram failed 5 time(s)": logger.error(f"电话号码{self.server_config.phone},代理失效!!!") self.server_config.is_valid_session = 4 self.server_config.save(only=[TgPhoneDevices.is_valid_session]) raise async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口,用于断开连接""" try: if self.client and self.client.is_connected(): await self.client.disconnect() # 断开连接 except Exception as e: logger.error(f"客户端断开连接失败: {str(e)}") async def _create_client(self): """创建Telegram客户端实例 Returns: TelegramClient: 配置好的Telegram客户端实例 """ # 代理配置 proxy = { 'proxy_type': self.server_config.proxy_type, # 代理类型 'addr': "192.168.1.20", # 代理地址 'port': int(self.server_config.port), # 代理端口 'username': self.server_config.user if self.server_config.user else "", # 代理用户名 'password': self.server_config.pwd if self.server_config.pwd else "" # 代理密码 } # 创建并返回Telegram客户端实例 return TelegramClient( session=fr"sessions\{self.server_config.phone}", # 会话文件路径 api_id=self.server_config.api_id, # API ID api_hash=self.server_config.api_hash, # API Hash device_model=self.server_config.device_model, # 设备型号 system_version=self.server_config.system_version, # 系统版本 app_version=self.server_config.app_version, # 应用版本 system_lang_code=self.server_config.system_lang_code, # 系统语言代码 lang_code=self.server_config.lang_code, # 应用语言代码 proxy=proxy, # 仅当配置了代理地址时使用代理 auto_reconnect=False, ) @staticmethod def extract_verification_code(text): """从文本中提取5位数字验证码 Args: text: 包含验证码的文本 Returns: str: 提取到的验证码,如果提取失败返回None """ try: matches = re.findall(r'\d{5}', text) # 使用单词边界匹配5位数字 return matches[0] if matches else None except (IndexError, TypeError, AttributeError) as e: logger.warning(f"验证码提取失败: {str(e)}") return None async def process_verification(self): """处理验证流程,获取验证码 Returns: str: 验证码,如果获取失败返回None """ try: messages = await self.client.get_messages(777000, limit=1) # 获取来自Telegram的最新消息 if messages and messages[0].text: return self.extract_verification_code(messages[0].text) # 提取验证码 return None except Exception as e: logger.error(f"验证流程处理失败: {str(e)}") return None async def click_confirmation(self): """点击确认按钮 Returns: bool: 是否成功点击确认按钮 """ try: messages = await self.client.get_messages(777000, limit=5) # 获取最多5条消息 for msg in messages: if hasattr(msg, 'buttons') and msg.buttons: # 检查消息是否有按钮 for row in msg.buttons: # 遍历按钮行 for btn in row: # 遍历每个按钮 if hasattr(btn, 'text') and 'confirm' in btn.text.lower(): # 查找确认按钮 await btn.click() # 点击按钮 return True return False except PhoneNumberBannedError: logger.warning("点击确认时发现手机号被封禁") return False except Exception as e: logger.error(f"点击确认失败: {str(e)}") return False async def check_bot_name(self, bot_username): """检查是否已经打开过指定bot Args: bot_username: bot的用户名 Returns: bool: 是否已经打开过该bot """ try: async for dialog in self.client.iter_dialogs(): try: for i in dialog.entity.usernames: if i.username == bot_username: logger.info(f'已与机器人交互: @{bot_username}') return True except: pass if dialog.is_user and dialog.entity.username == bot_username: logger.info(f'已与机器人交互: @{bot_username}') return True logger.info(f'尚未与机器人交互: @{bot_username}') return False except Exception as e: logger.error(f"检查bot名称失败: {str(e)}") return False async def join_bot(self, bot_name, invite_code=""): """加入指定的bot Args: bot_name: bot的用户名 invite_code: 邀请码(可选) Returns: bool: 是否成功加入bot """ try: bot = await self.client.get_entity(bot_name) # 获取bot实体 if invite_code: # 使用邀请码启动bot await self.client(StartBotRequest( bot=bot_name, peer=bot_name, start_param=invite_code )) else: # 直接发送/start命令 await self.client.send_message(bot, '/start') return True except Exception as e: logger.error(f"加入bot失败: {str(e)}") return False async def get_bot_token(self, bot_name, invite_code=None, platform=None, start_params=None, url=None): """获取bot的webview token Args: bot_name: bot的用户名 invite_code: 邀请码(可选) platform: 平台类型(可选) start_params: 启动参数(可选) url: 自定义URL(可选) Returns: str: 获取到的token Raises: Exception: 获取token失败时抛出异常 """ platform = platform or random.choice(['web', 'tdesktop']) # 默认随机选择平台 try: if start_params: me = await self.client.get_me() logger.info(f'账号 {8617882004254} -- {me.first_name} 登录成功!') # 使用AppWebView方式获取token res = await self.client(RequestAppWebViewRequest( 'me', InputBotAppShortName(await self.client.get_input_entity(bot_name), start_params), platform="web", start_param=invite_code if invite_code else None )) else: # 使用普通WebView方式获取token bot = await self.client.get_entity(bot_name) peer = InputPeerUser(bot.id, bot.access_hash) res = await self.client( RequestWebViewRequest( peer=peer, bot=bot, platform=platform, from_bot_menu=True, url=url, ) ) if not res or not res.url: raise ValueError("从Telegram获取的响应或URL为空") # 从URL中解析token parts = res.url.split('tgWebAppData=') if len(parts) < 2: raise ValueError("URL格式无效 - 缺少tgWebAppData参数") token_part = parts[1].split('&tgWebAppVersion')[0] return unquote(token_part) # URL解码 except Exception as e: logger.error(f"电话号码:{self.server_config.phone},Token生成失败: {str(e)}") return str(e) async def send_click_bot_message(self, bot_name, datas): """ datas = [ {"send_message": [], "click_button": [""], }, ] """ bot = await self.client.get_entity(bot_name) for data in datas: if data['send_message']: for message in data['send_message']: await self.client.send_message(bot, message) logger.info(f'电话号码:{self.server_config.phone},发生消息:{message}成功!!!') time.sleep(1) break if data['click_button']: for button_name in data["click_button"]: messages = await self.client.get_messages(bot, limit=5) if not messages: print("未找到验证消息") return start = 0 # 遍历消息,寻找包含按钮的验证消息 for message in messages: if message.buttons: for row in message.buttons: for button in row: if button_name in button.text.lower(): try: await button.click() logger.info(f'电话号码:{self.server_config.phone},点击{button_name}成功!!!') start = 1 except: pass if start == 1: break if start == 1: break if start == 1: break async def get_bot_message(self, bot_name): bot = await self.client.get_entity(bot_name) messages = await self.client.get_messages(bot, limit=1) # # 遍历消息,寻找包含按钮的验证消息 for message in messages: return message.message else: return "" async def join_channe(self, bot_name): for i in bot_name: try: await self.client(JoinChannelRequest(i)) except Exception as e: return False return True # ================== 响应格式层 ================== class ApiResponse: """统一API响应格式类""" @staticmethod def success(data=None, message="操作成功", code=200): """成功响应 Args: data: 返回的数据 message: 成功消息 code: HTTP状态码 Returns: tuple: (json响应, HTTP状态码) """ response = { "code": code, "message": message, "data": data } return jsonify(response), code @staticmethod def error(message="请求失败", code=500, errors=None, details=None): """错误响应 Args: message: 错误消息 code: HTTP状态码 errors: 错误列表 details: 错误详情(调试用) Returns: tuple: (json响应, HTTP状态码) """ response = { "code": code, "message": message, "errors": errors or [], "details": details # 仅在调试模式下显示 } return jsonify(response), code # ================== 路由层 ================== app = Flask(__name__) api_bp = Blueprint('api', __name__, url_prefix='/api') # 创建API蓝图 # ================== 错误处理器 ================== @app.errorhandler(404) def handle_not_found(e): """404错误处理器""" return ApiResponse.error("资源未找到", code=404) @app.errorhandler(405) def handle_method_not_allowed(e): """405错误处理器""" return ApiResponse.error("方法不允许", code=405) @app.errorhandler(400) def handle_bad_request(e): """400错误处理器,特别处理JSON解析错误""" if "Failed to decode JSON object" in str(e): return ApiResponse.error( "无效的JSON格式 - 属性名必须使用双引号", code=400 ) return ApiResponse.error(str(e), code=400) @app.errorhandler(Exception) def handle_general_exception(e): """全局异常处理器""" logger.error(f"未处理的异常: {str(e)}") return ApiResponse.error( "服务器内部错误", code=500, details=str(e) if app.debug else None ) # ================== 业务逻辑层 ================== @api_bp.route('/check_phone', methods=['GET']) async def check_phone(): """检查手机号状态接口 返回: JSON响应: 包含验证码或错误信息 """ phone_num = request.args.get('phone') if not phone_num or not phone_num.strip(): return ApiResponse.error("必须提供手机号参数", code=400) try: device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == phone_num) if not device: return ApiResponse.error("设备未注册", code=404) async with TelegramService(device) as service: code = await service.process_verification() if code: return ApiResponse.success( {"verification_code": code}, "验证码获取成功" ) return ApiResponse.error("验证码解析失败", code=422) except PhoneNumberBannedError: logger.error(f"被封禁的账号: {phone_num}") return ApiResponse.error("账号已被封禁", code=403) except Exception as e: logger.error(f"服务异常: {str(e)}") return ApiResponse.error( "服务器内部错误", code=500, details=str(e) if app.debug else None ) @api_bp.route('/click_confirm', methods=['GET']) async def click_confirm(): """点击确认按钮接口 返回: JSON响应: 操作结果或错误信息 """ phone_num = request.args.get('phone') if not phone_num or not phone_num.strip(): return ApiResponse.error("必须提供手机号参数", code=400) try: device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == phone_num) if not device: return ApiResponse.error("设备未注册", code=404) async with TelegramService(device) as service: result = await service.click_confirmation() if result: return ApiResponse.success(None, "确认操作成功") return ApiResponse.error("未找到确认按钮", code=404) except PhoneNumberBannedError: return ApiResponse.error("账号已被封禁", code=403) except Exception as e: logger.error(f"确认异常: {str(e)}") return ApiResponse.error( "服务器内部错误", code=500, details=str(e) if app.debug else None ) @api_bp.route('/get_token', methods=['POST']) async def get_token(): """获取bot token接口 返回: JSON响应: 包含token或错误信息 """ # 检查请求是否为JSON格式 if not request.is_json: return ApiResponse.error("请求必须是JSON格式", code=400) try: params = request.get_json() # 解析JSON参数 if not params: return ApiResponse.error("空的JSON负载", code=400) except Exception as e: logger.error(f"JSON解析失败: {str(e)}") return ApiResponse.error("无效的JSON格式", code=400) # 验证必需参数 missing = [p for p in ['phone', 'bot_name'] if not params.get(p)] if missing: return ApiResponse.error( f"缺少必需参数: {', '.join(missing)}", code=400 ) device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == params['phone']) if not device: return ApiResponse.error("设备未注册", code=404) # elif device.is_valid_session != 1: # return ApiResponse.error("账号有问题", code=505) try: async with TelegramService(device) as service: if not service: return ApiResponse.error("失败!!!", code=500) if not await service.client.is_user_authorized(): device.is_valid_session = 0 device.save(only=[TgPhoneDevices.is_valid_session]) return ApiResponse.error("授权失败!!!", code=500) # 检查并加入bot if not await service.check_bot_name(bot_username=params["bot_name"]): success = await service.join_bot( bot_name=params["bot_name"], invite_code=params.get("invite_code", "") ) if not success: return ApiResponse.error("加入bot失败", code=500) # 获取token token = await service.get_bot_token( bot_name=params['bot_name'], invite_code=params.get('invite_code'), platform=params.get('platform'), start_params=params.get('start_params'), url=params.get('url') ) return ApiResponse.success(token, "Token获取成功") except PhoneNumberBannedError: device.is_valid_session = -1 device.save(only=[TgPhoneDevices.is_valid_session]) return ApiResponse.error("账号已被封禁", code=403) except Exception as e: logger.error(f"Token生成失败: {str(e)}") return ApiResponse.error( "Token生成失败", code=500, details=str(e) if app.debug else None ) @api_bp.route('/send_click', methods=['POST']) async def send_click(): """ 与bot交互,点击或发生消息 datas = [ {"message": [], "click_button":[""],} ] """ try: params = request.get_json() # 解析JSON参数 if not params: return ApiResponse.error("空的JSON负载", code=400) except Exception as e: logger.error(f"JSON解析失败: {str(e)}") return ApiResponse.error("无效的JSON格式", code=400) # 验证必需参数 missing = [p for p in ['phone', 'bot_name'] if not params.get(p)] if missing: return ApiResponse.error( f"缺少必需参数: {', '.join(missing)}", code=400 ) device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == params['phone']) if not device: return ApiResponse.error("设备未注册", code=404) try: async with TelegramService(device) as service: if not await service.client.is_user_authorized(): device.is_valid_session = 0 device.save(only=[TgPhoneDevices.is_valid_session]) return ApiResponse.error("授权失败!!!", code=500) # 检查并加入bot if not await service.check_bot_name(bot_username=params["bot_name"]): success = await service.join_bot( bot_name=params["bot_name"], invite_code=params.get("invite_code", "") ) if not success: return ApiResponse.error("加入bot失败", code=500) # 操作session对象 token = await service.send_click_bot_message( bot_name=params['bot_name'], datas=params["datas"] ) return ApiResponse.success(token, "点击和发生成功!!!") except PhoneNumberBannedError: device.is_valid_session = -1 device.save(only=[TgPhoneDevices.is_valid_session]) return ApiResponse.error("账号已被封禁", code=403) except Exception as e: logger.error(f"点击和发生失败: {str(e)}") return ApiResponse.error( "点击和发生失败", code=500, details=str(e) if app.debug else None ) @api_bp.route('/get_bot_message', methods=['POST']) async def get_bot_message(): try: params = request.get_json() # 解析JSON参数 if not params: return ApiResponse.error("空的JSON负载", code=400) except Exception as e: logger.error(f"JSON解析失败: {str(e)}") return ApiResponse.error("无效的JSON格式", code=400) # 验证必需参数 missing = [p for p in ['phone', 'bot_name'] if not params.get(p)] if missing: return ApiResponse.error( f"缺少必需参数: {', '.join(missing)}", code=400 ) device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == params['phone']) if not device: return ApiResponse.error("设备未注册", code=404) elif device.is_valid_session != 1: return ApiResponse.error("账号有问题", code=505) try: async with TelegramService(device) as service: if not await service.client.is_user_authorized(): device.is_valid_session = 0 device.save(only=[TgPhoneDevices.is_valid_session]) return ApiResponse.error("授权失败!!!", code=500) # 检查并加入bot if not await service.check_bot_name(bot_username=params["bot_name"]): success = await service.join_bot( bot_name=params["bot_name"], invite_code=params.get("invite_code", "") ) if not success: return ApiResponse.error("加入bot失败", code=500) # 操作session对象 data = await service.get_bot_message( bot_name=params['bot_name'], ) return ApiResponse.success(data, "获取token成功!!!") except PhoneNumberBannedError: device.is_valid_session = -1 device.save(only=[TgPhoneDevices.is_valid_session]) return ApiResponse.error("账号已被封禁", code=403) except Exception as e: logger.error(f"Token生成失败: {str(e)}") return ApiResponse.error( "Token生成失败", code=500, details=str(e) if app.debug else None ) @api_bp.route('/join_channe', methods=['POST']) async def join_channe(): try: params = request.get_json() # 解析JSON参数 if not params: return ApiResponse.error("空的JSON负载", code=400) except Exception as e: logger.error(f"JSON解析失败: {str(e)}") return ApiResponse.error("无效的JSON格式", code=400) # 验证必需参数 missing = [p for p in ['phone', 'bot_name'] if not params.get(p)] if missing: return ApiResponse.error( f"缺少必需参数: {', '.join(missing)}", code=400 ) device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == params['phone']) if not device: return ApiResponse.error("设备未注册", code=404) elif device.is_valid_session != 1: return ApiResponse.error("账号有问题", code=505) try: async with TelegramService(device) as service: if not await service.client.is_user_authorized(): device.is_valid_session = 0 device.save(only=[TgPhoneDevices.is_valid_session]) return ApiResponse.error("授权失败!!!", code=500) # 操作session对象 if await service.join_channe( bot_name=params['bot_name'], ): return ApiResponse.success("加入群成功!!!") else: return ApiResponse.error("加入群失败!!!") except PhoneNumberBannedError: device.is_valid_session = -1 device.save(only=[TgPhoneDevices.is_valid_session]) return ApiResponse.error("账号已被封禁", code=403) except Exception as e: logger.error(f"访问失败: {str(e)}") return ApiResponse.error( "访问失败!!!", code=500, details=str(e) if app.debug else None ) # ================== 应用初始化 ================== def initialize_app(): """初始化Flask应用""" app.config.from_object(Config) # 加载配置 app.register_blueprint(api_bp) # 注册蓝图 # 配置JSON处理 app.config['JSON_AS_ASCII'] = False # 支持Unicode app.config['JSON_SORT_KEYS'] = False # 保持字典顺序 app.config['JSONIFY_PRETTYPRINT_REGULAR'] = True # 调试模式下美化输出 if __name__ == '__main__': initialize_app() # 初始化应用 app.run( host='0.0.0.0', # 监听所有网络接口 port=9001, # 使用9000端口 debug=True, # 调试模式 threaded=True # 使用多线程处理请求 )