import asyncio import os.path import random import threading import time import traceback from asyncio.exceptions import IncompleteReadError from urllib.parse import unquote from loguru import logger from telethon import TelegramClient from telethon.errors import PhoneNumberBannedError from telethon.tl.functions.account import GetAuthorizationsRequest from telethon.tl.functions.messages import RequestAppWebViewRequest from telethon.tl.types import InputBotAppShortName from models.tg_phone_devices import TgPhoneDevices async def create_telegram_client(server_type): """ 创建并配置 Telegram 客户端 """ proxy = { 'proxy_type': server_type.proxy_type, # 或 'socks4',具体看你的代理类型 'addr': server_type.addr, # 代理服务器地址 'port': int(server_type.port), # 代理服务器端口 'username': server_type.user if server_type.user else "", # 如果有用户名,填写 'password': server_type.pwd if server_type.pwd else "" # 如果有密码,填写 } try: client = TelegramClient( fr"C:\sessions\{server_type.phone}", api_id=server_type.api_id, api_hash=server_type.api_hash, system_lang_code=server_type.system_lang_code, lang_code=server_type.lang_code, device_model=server_type.device_model, system_version=server_type.system_version, app_version=server_type.app_version, proxy=proxy, auto_reconnect=False, ) return client except Exception as e: print(e) traceback.print_stack() logger.error(f"当前电话号码:{server_type.phone},有问题!!!") async def login_and_validate(server): client = await create_telegram_client(server) try: try: await client.connect() except Exception as e: server.is_valid_session = 4 server.save(only=[TgPhoneDevices.is_valid_session]) return if await client.is_user_authorized(): me = await client.get_me() logger.info(f'账号 {me.phone} -- {me.first_name} 登录成功!!!') server.is_valid_session = 1 time.sleep(random.randint(3, 8)) # 获取设备数 authorizations = await client(GetAuthorizationsRequest()) server.device_start = len(authorizations.authorizations) time.sleep(random.randint(3, 8)) # 判断是否被冻结了 bot_username = "BlumCryptoBot" start = 0 async for dialog in client.iter_dialogs(): try: for i in dialog.entity.usernames: if i.username == bot_username: logger.info(f'账号 {server.phone},已与机器人交互: @{bot_username}') start = 1 break if start: break except: pass if dialog.is_user and dialog.entity.username == bot_username: logger.info(f'账号 {server.phone},已与机器人交互: @{bot_username}') start = 1 break try: if not start: logger.info(f'账号 {server.phone},尚未与机器人交互: @{bot_username}') bot = await client.get_entity(bot_username) # 获取bot实体 await client.send_message(bot, '/start') invite_code = "ref_H21tvNwu3n" await client.get_input_entity(bot_username) app_info = await client(RequestAppWebViewRequest( 'me', InputBotAppShortName(await client.get_input_entity(bot_username), "app"), platform="web", start_param=invite_code if invite_code else None )) tgData = unquote(app_info.url.split('tgWebAppData=')[1].split('&tgWebAppVersion')[0]) print(tgData) except Exception as e: print(e) server.is_valid_session = 3 else: logger.error(f'账号 {server.phone} 授权失败!!!') server.is_valid_session = 0 except PhoneNumberBannedError: logger.error(f"账号 {server.phone},账号以死!!!") server.is_valid_session = -1 server.save(only=[TgPhoneDevices.is_valid_session, TgPhoneDevices.device_start]) logger.success('数据已更新') client.disconnect() async def start_task(server): try: status = check_exist_session_file(server.phone) if not status: server.is_valid_session = -1 server.save(only=[TgPhoneDevices.is_valid_session]) return await login_and_validate(server) except Exception as e: a = f"1{e}" if a == "1Connection to Telegram failed 5 time(s)": server.is_valid_session = 4 server.save(only=[TgPhoneDevices.is_valid_session]) logger.error(f"当前电话号码:{server.phone},有问题!!!") def check_exist_session_file(phone): path = fr"C:\sessions\{phone}.session" is_exist_session_file = os.path.isfile(path) if not is_exist_session_file: logger.error(f'未找到session文件:{path}') return False return True def main1(server): asyncio.run(start_task(server=server)) def main(): devices = TgPhoneDevices.select().where( TgPhoneDevices.is_valid_session.is_null(False), # TgPhoneDevices.phone.is_null(False) # TgPhoneDevices.device_start.is_null() # TgPhoneDevices.kick_status.is_null() ) logger.info(f'本轮需要检查:{len(devices)}') for _, server in enumerate(devices): logger.info(f'正在检测,第{_},账号:{server.phone}') # main1(server=server) threading.Thread(target=main1, args=(server,)).start() time.sleep(0.3) if __name__ == '__main__': main()