"""Batch-login Telegram accounts whose verification codes arrive on emulators.

For every ``TelegramAccount`` row selected in the ``__main__`` section the
script asks the emulator HTTP service (port 8080 on ``server_ip``) to start
the emulator named ``ld_name``, signs in with Telethon, fetches the
verification code from the same service, records the outcome on the matching
``TgPhoneDevices`` row, and finally asks the service to close the emulator.
"""
import asyncio
import os
import socket
import time
from concurrent.futures import ThreadPoolExecutor

import requests
from loguru import logger
from opentele.api import API
from telethon import TelegramClient
from telethon.errors import (
    PhoneCodeInvalidError,
    PhoneNumberBannedError,
    RPCError,
    SessionPasswordNeededError,
)

from job_models.tg_models import TelegramAccount
from tootls import split_phone_number, get_code
from models.tg_phone_devices import TgPhoneDevices

# Host of the emulator control service. Defined at module level (it used to be
# created only under ``__main__``) so the functions below also work on import.
server_ip = "192.168.50.97"

# Timeout in seconds for calls to the emulator service; the original code only
# applied it to the "start" call and let the other calls wait forever.
EMULATOR_TIMEOUT = 500

# Result codes returned by ``login_telegram``.
LOGIN_FAILED = 0
LOGIN_OK = 1
LOGIN_CODE_INVALID = 3
LOGIN_BANNED = 4

# NOTE(review): hard-coded fallback 2FA password taken from the original code;
# it should live in configuration or a secret store, not in the source.
DEFAULT_2FA_PASSWORD = "jiedata123"


def get_host_ip():
    """Return the local IP address used for outbound traffic.

    A UDP socket is "connected" to 8.8.8.8:80 (114.114.114.114 is another DNS
    address that would work) and the local end of that socket is read back.

    Returns:
        The local address as a string, or ``None`` when the lookup raises
        ``OSError``.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            return s.getsockname()[0]
    except OSError as e:
        logger.warning(f"get_host_ip failed: {e}")
        return None


async def get_device_info():
    """Generate Telegram Desktop (Windows) device parameters with opentele.

    Returns:
        A dict with the keys ``api_id``, ``api_hash``, ``device_model``,
        ``system_version``, ``app_version``, ``lang_code`` and
        ``system_lang_code``.
    """
    new_api = API.TelegramDesktop.Generate(system="windows")
    return {
        "api_id": new_api.api_id,
        "api_hash": new_api.api_hash,
        "device_model": new_api.device_model,
        "system_version": new_api.system_version,
        "app_version": new_api.app_version,
        "lang_code": new_api.lang_code,
        "system_lang_code": new_api.system_lang_code,
    }


async def create_telegram_client(server):
    """Create a (not yet connected) Telethon client for ``server``.

    Args:
        server: Device record providing ``phone``, ``api_id``, ``api_hash``
            and the device/language fields passed to ``TelegramClient``.

    Returns:
        A ``TelegramClient`` whose session is stored under ``C:\\sessions``
        and named after the phone number. The caller must ``connect()`` it.
    """
    return TelegramClient(
        fr"C:\sessions\{server.phone}",
        api_id=server.api_id,
        api_hash=server.api_hash,
        device_model=server.device_model,
        system_version=server.system_version,
        app_version=server.app_version,
        system_lang_code=server.system_lang_code,
        lang_code=server.lang_code,
    )


def _emulator_request(action, ld_name, timeout=EMULATOR_TIMEOUT):
    """Send a GET request for ``action`` (start/get_code/close) to the emulator service."""
    # NOTE: ``requests`` is blocking; each job runs its own event loop in its
    # own thread (see ``main1``), so this only stalls the calling job.
    return requests.get(
        url=f"http://{server_ip}:8080/{action}?ld_name={ld_name}",
        timeout=timeout,
    )


def _normalize_session_flag(value):
    """Return ``value`` as an int when possible, otherwise unchanged.

    ``is_valid_session`` has been written both as ints and as strings, and a
    string such as ``"0"`` is truthy; normalising avoids misreading it.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        return value


async def login_telegram(client, tg_phone_data, server):
    """Run the code (and optional 2FA) sign-in flow on a connected client.

    Args:
        client: Connected ``TelegramClient``.
        tg_phone_data: Account record; ``ld_name`` identifies the emulator
            that receives the verification code.
        server: Device record; ``phone`` is the number to sign in, ``to_code``
            an optional 2FA password. On a 2FA login the password used is
            stored in ``server.code`` and saved.

    Returns:
        ``1`` on success, ``3`` when the verification code is invalid, ``4``
        when the phone number is banned, and ``0`` for any other failure
        (including a non-200 answer from the emulator service, which used to
        fall through and return ``None``).
    """
    try:
        await client.send_code_request(server.phone)
        logger.info(f"账号:{server.phone},发送验证码成功》》》")
        # Give the code time to arrive on the emulator without blocking the
        # event loop (the original used time.sleep here).
        await asyncio.sleep(5)
        res = _emulator_request("get_code", tg_phone_data.ld_name)
        if res.status_code != 200:
            logger.error(
                f"账号:{server.phone},获取模拟器验证码失败,状态码:{res.status_code}"
            )
            return LOGIN_FAILED
        logger.info(f"账号:{server.phone},获取模拟器验证码成功》》》")
        try:
            await client.sign_in(phone=server.phone, code=res.json()["codes"])
            logger.info(f"账号:{server.phone},输入验证码成功》》》")
        except PhoneCodeInvalidError:
            logger.error(f"账号 {server.phone},验证码无效,请重试")
            return LOGIN_CODE_INVALID
        except SessionPasswordNeededError:
            # The account has 2FA enabled: use its own password when known,
            # otherwise the default one.
            password = server.to_code or DEFAULT_2FA_PASSWORD
            await client.sign_in(password=password)
            logger.info(f"账号:{server.phone},输入2fa成功》》》")
            server.code = password
            server.save()
        return LOGIN_OK
    except PhoneNumberBannedError:
        logger.error(f"账号 {server.phone},已被封禁!!!")
        return LOGIN_BANNED
    except Exception as e:  # boundary handler; also covers RPCError
        logger.error(f'账号:{server.phone},登录失败,错误信息:{e}')
        return LOGIN_FAILED


async def handle_login_failure(server, tg_phone_data=None):
    """Delete the stale session file and retry the login once.

    Args:
        server: Device record of the account to retry.
        tg_phone_data: Account record needed to fetch the verification code.
            The original function read this name from a module global that
            only existed when the file ran as a script; it is now an explicit
            (optional, for backward compatibility) parameter. Without it the
            retry is skipped.
    """
    await asyncio.sleep(5)
    session_file = fr"C:\sessions\{server.phone}.session"
    try:
        os.remove(session_file)
    except FileNotFoundError:
        pass
    else:
        logger.info(f"文件 {session_file} 已删除")

    # The original re-assigned every field to itself before saving (a no-op);
    # only the save is kept.
    server.save()

    if tg_phone_data is None:
        logger.error(f"账号 {server.phone},缺少 tg_phone_data,无法重新登录")
        return

    # Re-create the client and try to log in again. The client must be
    # connected first, otherwise send_code_request can never succeed.
    client = await create_telegram_client(server)
    try:
        await client.connect()
        login_start = await login_telegram(client, tg_phone_data, server)
        if login_start == LOGIN_OK:
            me = await client.get_me()
            logger.info(f'账号 {server.phone} -- {me.first_name} 登录成功!')
            # Ints, consistent with ``main`` (the strings "1"/"0" used before
            # are both truthy).
            server.is_valid_session = 1
            server.save()
        elif login_start == LOGIN_FAILED:
            logger.error("Telegram 登录失败")
            server.is_valid_session = 0
            server.save()
        elif login_start == LOGIN_BANNED:
            server.is_valid_session = -1
            server.save()
    finally:
        if client.is_connected():
            await client.disconnect()


def _close_emulator(index, tg_phone_data, server):
    """Ask the emulator service to close the emulator; never raises on HTTP errors."""
    try:
        res = _emulator_request("close", tg_phone_data.ld_name)
    except requests.RequestException as e:
        logger.error(f"第{index},账号 {server.phone},关闭模拟器失败:{e}")
        return
    if res.status_code == 200:
        logger.info(f"第{index},账号 {server.phone},关闭模拟器成功!!!")


async def main(_, tg_phone_data):
    """Log one account in and record the outcome.

    Args:
        _: Index of the account in the batch; used only in log messages.
        tg_phone_data: Account record providing ``one_phone_number`` (area
            code), ``telephone`` and ``ld_name`` (emulator name).
    """
    index = _
    area_code = tg_phone_data.one_phone_number
    phone_number = tg_phone_data.telephone
    server, _created = TgPhoneDevices().get_or_create(
        phone=area_code + phone_number,
    )

    session_flag = _normalize_session_flag(server.is_valid_session)
    if session_flag == -1:
        # Banned accounts are stored as -1, which is truthy and used to be
        # reported as "logged in".
        logger.info(f"第{index},账号 {server.phone},已被封禁,跳过")
        return
    if session_flag:
        logger.info(f"第{index},账号 {server.phone},登录成功!!!")
        return
    if session_flag is None:
        # First time this number is seen: attach freshly generated device
        # parameters to the record.
        device_info = await get_device_info()
        server.area_code = area_code
        server.phone_number = phone_number
        server.api_id = device_info["api_id"]
        server.api_hash = device_info["api_hash"]
        server.device_model = device_info["device_model"]
        server.system_version = device_info["system_version"]
        server.app_version = device_info["app_version"]
        server.system_lang_code = device_info["system_lang_code"]
        server.lang_code = device_info["lang_code"]
        server.phone = area_code + phone_number
        server.save()

    logger.info(f"第{index},账号:{server.phone},开始登录。。。")
    res = _emulator_request("start", tg_phone_data.ld_name)
    if res.status_code != 200:
        logger.info(f"第{index},账号:{server.phone},启动模拟器失败。。。")
        return
    logger.info(f"第{index},账号:{server.phone},启动模拟器成功。。。")

    client = await create_telegram_client(server)
    try:
        await client.connect()
        if await client.is_user_authorized():
            me = await client.get_me()
            logger.info(f'第{index},账号:{server.phone} -- {me.first_name} 登录成功!')
            return

        login_start = await login_telegram(client, tg_phone_data, server)
        if login_start == LOGIN_OK:
            me = await client.get_me()
            logger.info(f'第{index},账号 {server.phone} -- {me.first_name} 登录成功!')
            server.is_valid_session = 1
            server.server_ip = server_ip
            server.save()
        elif login_start == LOGIN_FAILED:
            logger.error(f"第{index},账号 {server.phone},Telegram 登录失败")
            server.is_valid_session = 0
            server.save()
        elif login_start == LOGIN_CODE_INVALID:
            # Disconnect first so the session file is released before
            # handle_login_failure deletes it.
            await client.disconnect()
            await handle_login_failure(server, tg_phone_data)
        elif login_start == LOGIN_BANNED:
            server.is_valid_session = -1
            server.save()
    finally:
        # Always release the client and the emulator, including on the
        # "already authorized" early return and on unexpected errors.
        if client.is_connected():
            await client.disconnect()
        _close_emulator(index, tg_phone_data, server)


def main1(_, tg_phone_data):
    """Thread entry point: run ``main`` for one account in its own event loop."""
    asyncio.run(main(_, tg_phone_data))


def _run_batch(accounts, max_threads):
    """Run ``main1`` for every account on a pool of ``max_threads`` threads."""
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Pass the callable and its arguments separately; calling main1(...)
        # inside submit() ran the job in the main thread and submitted None.
        futures = [
            executor.submit(main1, index, account)
            for index, account in enumerate(accounts)
        ]
        for index, future in enumerate(futures):
            error = future.exception()
            if error is not None:
                logger.error(f"第{index},任务异常:{error}")


if __name__ == '__main__':
    tg_phone_datas = TelegramAccount().select().where(
        TelegramAccount.server_id == server_ip,
        TelegramAccount.is_logged_in_telegram == 1,
    )
    # Number of accounts processed concurrently. The original also declared a
    # 15 s delay between thread starts, but the sleep was commented out; no
    # stagger is applied here either.
    max_threads = 3
    _run_batch(tg_phone_datas, max_threads)