"""Sign browser profiles into Telegram Web, one account per worker task."""
import time
from concurrent.futures import ThreadPoolExecutor, wait

from models.tg_models import TelegramAccount
from process.tg_process import TG
from process.tools import get_host_ip
from loguru import logger
from DrissionPage import ChromiumPage, ChromiumOptions
from 比特.bit_tools import openBrowser

# NOTE(review): this password is hard-coded and typed into the
# ``sign-in-password`` field for every account. Consider loading it from
# configuration or a secret store instead of keeping it in source control.
SIGN_IN_PASSWORD = "Qasdasd123.0"


class BitProcess(object):
    """Drive one browser profile through the Telegram Web sign-in flow.

    Attributes:
        bit_info: Account record; this class reads ``bit_id`` and ``ld_name``
            from it and writes ``bit_is_logged_in_telegram`` before calling
            ``save()``.
        tg: ``TG`` helper created by :meth:`start_ld`, or ``None`` before that.
        page: ``ChromiumPage`` attached by :meth:`get_page`, or ``None``.
    """

    def __init__(self, bit_info):
        self.bit_info = bit_info
        self.tg = None
        self.page = None

    def get_page(self):
        """Open the profile's browser and attach a ``ChromiumPage`` to it.

        Returns:
            bool: ``True`` when ``self.page`` was set, ``False`` on any error.
        """
        try:
            # The "http" entry is split on ":" and the second piece is used as
            # the local debugging port.
            address = openBrowser(id=self.bit_info.bit_id)["data"]["http"]
            port = address.split(":")
            co = ChromiumOptions().set_local_port(port[1])
            self.page = ChromiumPage(co)
        except Exception:
            # Keep the boolean contract, but leave a trace of what went wrong.
            logger.exception("Failed to open or attach to the browser")
            return False
        return True

    def start_ld(self):
        """Create the ``TG`` helper and report whether it started.

        Returns:
            bool: ``True`` when ``self.tg.get_code()`` is falsy, else ``False``.
        """
        self.tg = TG(type1=1, tg_phone=self.bit_info, )
        # NOTE(review): success is reported when get_code() is falsy; confirm
        # this matches the contract of TG.get_code().
        return not self.tg.get_code()

    def get_is_logged_in_telegram(self):
        """Probe the page for a signed-in session and persist the result.

        Clicks the first button of the left header, looks up the "Settings"
        button, then sets ``bit_is_logged_in_telegram = 1`` and saves the
        record.

        Returns:
            bool: ``True`` when every step ran without raising, else ``False``.
        """
        try:
            self.page.ele('x://*[@id="LeftMainHeader"]/div[1]/button', timeout=50).click()
            # NOTE(review): the result of this lookup is not inspected, so only
            # the click above can make the probe fail - confirm that is intended.
            self.page.ele('x://button[text()="Settings"]', timeout=50)
            self.bit_info.bit_is_logged_in_telegram = 1
            self.bit_info.save()
        except Exception as error:
            # Failing here is the normal "not logged in" outcome.
            logger.debug(f"Logged-in probe failed: {error!r}")
            return False
        return True

    def _close_extra_tabs(self):
        """Close every tab except the first one; failures are only logged."""
        try:
            for index, tab in enumerate(self.page.get_tabs()):
                if index:
                    tab.close()
        except Exception as error:
            # Best effort: leftover tabs must not abort the sign-in.
            logger.warning(f"Could not close extra tabs: {error!r}")

    def _sign_in(self):
        """Fill in the phone number, the received code and the password."""
        self.page.ele("x://button[text()='Log in by phone Number']", timeout=50).click()
        time.sleep(10)
        self.page.ele('x://*[@id="sign-in-phone-number"]').input(clear=True, vals=self.bit_info.ld_name)
        time.sleep(5)
        self.page.ele("x://button[text()='Next']", timeout=50).click()
        time.sleep(10)
        # The last item returned by get_code1() is typed in as the login code.
        self.page.ele('x://*[@id="sign-in-code"]').input(self.tg.get_code1()[-1])
        self.tg.close_ldplayer()
        time.sleep(10)
        self.page.ele('x://*[@id="sign-in-password"]').input(SIGN_IN_PASSWORD)
        time.sleep(10)
        self.page.ele("x://button[text()='Next']", timeout=50).click()

    def action(self):
        """Run the whole flow: open the browser, start ``TG``, sign in.

        Returns early when the browser cannot be opened or when the session is
        already logged in. Errors raised by the sign-in steps propagate to the
        caller (the executor future when run from ``main``).
        """
        if self.get_page():
            logger.info(f"当前电话号码:{self.bit_info.ld_name},打开浏览器成功!!!")
        else:
            logger.error(f"当前电话号码:{self.bit_info.ld_name},打开浏览器失败!!!")
            return

        self._close_extra_tabs()

        # NOTE(review): the flow continues even when this step reports
        # failure, as the original did - confirm that is intended.
        if self.start_ld():
            logger.info(f"当前电话号码:{self.bit_info.ld_name},打开模拟器成功!!!")
        else:
            logger.error(f"当前电话号码:{self.bit_info.ld_name},打开模拟器失败!!!")

        self.page.get('https://web.telegram.org/a/')
        if self.get_is_logged_in_telegram():
            logger.info(f"当前电话号码:{self.bit_info.ld_name},登录成功!!!")
            # Already signed in: skip the sign-in form instead of going on to
            # look for and click its button.
            return

        self._sign_in()
        if self.get_is_logged_in_telegram():
            logger.info(f"当前电话号码:{self.bit_info.ld_name},登录成功!!!")


def main():
    """Repeatedly run the sign-in flow for every account on this host."""
    tg_phone_datas = TelegramAccount().select().where(
        TelegramAccount.server_id == get_host_ip(),
        TelegramAccount.is_logged_in_telegram == 1,
    )
    # Number of accounts processed concurrently.
    max_threads = 1
    # Delay between two task submissions, in seconds.
    delay_between_start = 15

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        while True:
            futures = []
            for account in tg_phone_datas:
                try:
                    worker = BitProcess(bit_info=account)
                except Exception:
                    logger.exception("Could not create a worker for an account")
                    continue
                # Hand the task to the thread pool.
                futures.append(executor.submit(worker.action))
                # Stagger task start-up.
                time.sleep(delay_between_start)

            # Block until every submitted task has finished, then surface any
            # error a task raised so it is not silently lost.
            wait(futures)
            for future in futures:
                error = future.exception()
                if error is not None:
                    logger.error(f"Login task failed: {error!r}")

            # Pause before starting the next round.
            time.sleep(600)


if __name__ == '__main__':
    main()