def _set_session_status(phone, device_model, is_valid, message):
    """Write the session validity flag and status message to both account tables."""
    # NOTE(review): these statements are built by string interpolation. If
    # `phone` / `device_model` can come from an untrusted caller this is an
    # SQL-injection vector; switch to parameterized queries once the `db`
    # helper's parameter API is confirmed.
    for table in ('device_info', 'telegram_account_info'):
        sql_str = (
            f"update {table} set is_valid_session={is_valid}, message='{message}' "
            f"where phone='{phone}' and device_model='{device_model}'"
        )
        db.execute_query(sql_str)


def _append_tg_data(bot_name, phone, tg_data):
    """Append one `phone----tgWebAppData` record to the bot's data file."""
    with open(f'./data/{bot_name}.txt', 'a+', encoding='utf-8') as wf:
        wf.write(f'{phone}----{tg_data}\n')


def _extract_tg_web_app_data(web_view_url):
    """Return the decoded `tgWebAppData` value embedded in a WebView URL."""
    raw_value = web_view_url.split('tgWebAppData=')[1].split('&tgWebAppVersion')[0]
    # NOTE(review): the call is kept exactly as the original wrote it. If
    # `unquote` is `urllib.parse.unquote`, passing `string=True` next to a
    # positional string raises TypeError - confirm which `unquote` is imported.
    return unquote(raw_value, string=True)


async def authTgWebAppDataApi(phone, bot_name, url, device_model, platform, channel_name, start_params, invite_code, is_full):
    """Log in with a stored session and obtain WebApp data for a bot.

    Args:
        phone: Account phone number; also the session file name under
            ``./sessions/``.
        bot_name: Username of the target bot.
        url: WebView URL passed to ``RequestWebViewRequest`` in normal mode.
        device_model: Device model given to the client and used in the
            status-update SQL.
        platform: WebView platform; when falsy, one of ``'web'`` /
            ``'tdesktop'`` is picked at random.
        channel_name: Optional channel to join before requesting data.
        start_params: When truthy (and ``is_full`` is falsy), used as the bot
            app short name for ``RequestAppWebViewRequest``.
        invite_code: Optional start parameter / referral code.
        is_full: When truthy, return the bot's menu-button URL instead of
            requesting a WebView.

    Returns:
        A dict with ``code``, ``phone`` and ``message``; the 404 and success
        results also carry ``tgWebAppData``. Codes: 404 (session file or
        phone record missing), 400 (session not authorized), 200 (success,
        and also the banned / duplicated-auth-key outcomes, distinguished by
        ``message``), 500 (any other error).
    """
    # Function-scope imports keep this block self-contained.
    import asyncio
    import traceback

    # The session file must exist.
    if phone not in session_file_lst:
        return {
            'code': 404,
            'phone': phone,
            'message': 'session file not found',
            'tgWebAppData': ''
        }

    # The phone must be known to at least one of the two tables.
    # NOTE(review): string-built SQL - see _set_session_status.
    sql_str = f"select * from device_info where phone = '{phone}'"
    device_data = list(db.select_data(sql_str))
    sql_str = f"select * from telegram_account_info where phone = '{phone}'"
    device_data.extend(db.select_data(sql_str))
    if not device_data:
        return {
            'code': 404,
            'phone': phone,
            'message': 'phone not found',
            'tgWebAppData': ''
        }

    # Bound before `try` so the cleanup in `finally` is safe even when the
    # client constructor itself raises.
    app = None
    try:
        # Create the client.
        app = TelegramClient(
            f'./sessions/{phone}',
            api_id=api_id,
            api_hash=api_hash,
            device_model=device_model,
            system_version=system_version,
            app_version=app_version,
            system_lang_code='en-US',
            lang_code='en'
        )
        await app.connect()

        if not await app.is_user_authorized():
            # Session is no longer valid.
            _set_session_status(phone, device_model, 0, '失效')
            return {
                'code': 400,
                'phone': phone,
                'message': 'session not valid'
            }

        # Mark the session as healthy.
        _set_session_status(phone, device_model, 1, '正常')

        # Resolve the bot entity.
        bot = await app.get_entity(bot_name)

        # Start the bot if this account has not interacted with it yet.
        if not await check_bot_member(app, bot_name):
            if invite_code:
                # Start the bot with the invite code.
                result = await app(StartBotRequest(
                    bot=bot_name,
                    peer=bot_name,
                    start_param=invite_code
                ))
            else:
                # Plain /start command.
                result = await app.send_message(bot, '/start')
            logger.info(f'已与机器人开始交互:{result}')

        # Pick a platform at random when none was given.
        if not platform:
            platform = random.choice(['web', 'tdesktop'])
        logger.info(platform)

        # Join the channel, if one was specified.
        if channel_name:
            if not await check_member_by_name(app, channel_name):
                channel = await app.get_entity(channel_name)
                await app(JoinChannelRequest(channel))
                logger.info(f'{phone} join channel {channel_name}')

        if is_full:
            # Full mode: read the menu-button URL from the bot's full info.
            await app.send_message(bot, '/start')
            # Non-blocking pause; time.sleep here would stall the event loop.
            await asyncio.sleep(random.randint(3, 4))
            full_info = await app(GetFullUserRequest(bot))
            tgData = full_info.full_user.bot_info.menu_button.url
        elif start_params:
            # Request the WebApp through its short name.
            app_info = await app(RequestAppWebViewRequest(
                'me',
                InputBotAppShortName(await app.get_input_entity(bot_name), start_params),
                platform=platform,
                start_param=invite_code if invite_code else None
            ))
            logger.debug(f'{phone}----{app_info.url}')
            tgData = _extract_tg_web_app_data(app_info.url)
        else:
            # Normal mode: request the WebView from the bot menu.
            peer = InputPeerUser(bot.id, bot.access_hash)
            result = await app(RequestWebViewRequest(
                peer=peer,
                bot=bot,
                platform=platform,
                from_bot_menu=True,
                url=url
            ))
            logger.info(f'{phone}----{result.url}')
            tgData = _extract_tg_web_app_data(result.url)

        logger.success(tgData)
        # Persist the result to the bot's data file.
        _append_tg_data(bot_name, phone, tgData)
        return {
            'code': 200,
            'phone': phone,
            'message': 'success',
            'tgWebAppData': tgData
        }
    except PhoneNumberBannedError:
        # The phone number is banned.
        _set_session_status(phone, device_model, 0, '封禁')
        return {
            'code': 200,
            'phone': phone,
            'message': '封禁'
        }
    except AuthKeyDuplicatedError:
        # The auth key was used from two places; the session is unusable.
        _set_session_status(phone, device_model, 0, '失效')
        return {
            'code': 200,
            'phone': phone,
            'message': 'session 失效'
        }
    except Exception:
        # Any other failure: log the real traceback text (print_exc() returns
        # None, so the original logged nothing useful).
        logger.error(traceback.format_exc())
        return {
            'code': 500,
            'phone': phone,
            'message': '异常,联系开发者排查'
        }
    finally:
        # Single cleanup point for every exit path.
        if app is not None:
            await app.disconnect()