import json import time from concurrent.futures import ThreadPoolExecutor from process.tools import get_ldplayer_list, close_ldplayer, start_ldplayer, get_element, get_host_ip, \ split_phone_number, rename_ldplayer, pull_simulator_backup from loguru import logger import uiautomator2 as u2 from models.tg_models import TelegramAccount import socket import os import sys import struct def send_file(file_path, server_ip='localhost', port=5001): """ 发送任意类型文件到服务端 :param file_path: 本地文件路径(支持任意后缀名) :param server_ip: 服务端IP地址,默认localhost :param port: 服务端端口,默认5555 :return: 传输成功返回True,失败返回False """ # 检查文件是否存在 if not os.path.exists(file_path): print(f"[Error] File not found: {file_path}") return False file_name = os.path.basename(file_path) client_sock = None try: # 创建Socket连接 client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client_sock.settimeout(10) # 设置超时时间 client_sock.connect((server_ip, port)) # 发送文件名(支持任意后缀) file_name_encoded = file_name.encode() # 先发送文件名长度(4字节整数),再发送文件名 client_sock.sendall(struct.pack('I', len(file_name_encoded))) client_sock.sendall(file_name_encoded) # 分块发送文件内容 print(f"[Transfer] Sending: {file_name} -> {server_ip}") sent_bytes = 0 with open(file_path, 'rb') as f: while chunk := f.read(4096): # 4KB分块传输 client_sock.sendall(chunk) sent_bytes += len(chunk) print(f"[Success] Sent {sent_bytes} bytes") return True except socket.error as e: print(f"[Error] Network failure: {str(e)}") return False except Exception as e: print(f"[Error] Unexpected error: {str(e)}") return False finally: if client_sock: client_sock.close() def decorator(func): def wrapper(self, *args, **kwargs): result = None # 这里可以在函数执行前增加逻辑 # 启动模拟器 if not self.start_d(): return try: result = func(self, *args, **kwargs) except Exception as e: logger.error(f"电话号码:{self.tg_phone_info.index_id}:{e}!!!") # 这里可以在函数执行后增加逻辑 close_ldplayer(simulator_id=self.tg_phone_info.index_id) return result return wrapper class Ld: def __init__(self, tg_phone_info): self.tg_phone_info = tg_phone_info # for i in get_ldplayer_list(): # if self.tg_phone_info.ld_name == i["Name"]: # self.index_id = i["ID"] # break self.id = "emulator-" + str(5554 + (int(self.tg_phone_info.index_id) * 2)) def start_d(self): d_type = 0 # 连接模拟器 for i in range(3): if d_type: break start_ldplayer(simulator_id=self.tg_phone_info.index_id) for i in range(3): time.sleep(15) try: self.d = u2.connect(self.id) d_type = 1 break except Exception as e: pass else: logger.error(f"电话号码:{self.tg_phone_info.ld_name}:连接失败!!!") self.tg_phone_info.is_logged_in_telegram = -1 self.tg_phone_info.save() close_ldplayer(simulator_id=self.tg_phone_info.index_id, ) time.sleep(10) return d_type # 使用链接打开tg的检测 def tg_testing(self): logger.info(f"电话号码:{self.tg_phone_info.ld_name}:账号检测中》》》") # 判断需要tg打开链接 for i in range(8): type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=0.5) if type1: ele.click() type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3) if type1: ele.click() type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3) if type1: ele.click() type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="OK"]', timeout=0.5) if type1: ele.click() type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Remind me later"]', timeout=0.5) if type1: ele.click() type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=0.5) if type1: ele.click() time.sleep(5) type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=0.5) if type1: ele.click() # 判断tg是否掉了 type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Start Messaging"]', timeout=0.5) # 等待tg打开 if type1: self.tg_phone_info.is_logged_in_telegram = 0 self.tg_phone_info.save() logger.info(f"电话号码:{self.tg_phone_info.ld_name}:检测完成:未登录!!!") return True logger.warning(f"电话号码:{self.tg_phone_info.ld_name}:检测完成,已登录!!!") return False @decorator def action(self): type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5) # 等待tg打开 if type1: ele.click() else: self.tg_phone_info.is_logged_in_telegram = 0 self.tg_phone_info.save() return if self.tg_testing(): logger.error(f"雷电名称:{self.tg_phone_info.ld_name},掉了!!!") return logger.info(f"雷电名称:{self.tg_phone_info.ld_name},检测完成!!!") type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="Open navigation menu"]', timeout=0.5) # 等待tg打开 if type1: ele.click() # for i in range(3): # type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="Open navigation menu"]', # timeout=0.5) # 等待tg打开 # if type1: # ele.click() # # time.sleep(1) # # type1, ele = get_element(self.d, type1="xpath", grammar='//*[contains(@text, "+")]', # timeout=0.5) # 等待tg打开 # if type1: # break phones = self.d.xpath('//*[contains(@text, "+")]').get_text().replace("-", "").replace(' ', "").replace( '(', "").replace(')', "").replace("+", "") one_phone_number, telephone = split_phone_number(phone_number=phones) # 修改名字 rename_ldplayer(index=self.tg_phone_info.index_id, name=f"{one_phone_number}{telephone}") self.tg_phone_info.ld_name = f"{one_phone_number}{telephone}" self.tg_phone_info.one_phone_number = one_phone_number self.tg_phone_info.telephone = telephone self.tg_phone_info.is_logged_in_telegram = 1 self.tg_phone_info.phone_type = 1 self.tg_phone_info.save() # 模拟按下返回键 self.d.press("back") time.sleep(3) self.d.xpath('(//android.view.ViewGroup)[1]').click() time.sleep(3) type1, ele = get_element( self.d, type1="xpath", grammar='//*[@text="Your account is frozen\nTap to view details"]', timeout=5 ) # 等待tg打开 if type1: self.tg_phone_info.is_logged_in_telegram = 3 self.tg_phone_info.phone_type = 3 self.tg_phone_info.save() @decorator def bf(self): type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5) # 等待tg打开 if type1: ele.click() else: self.tg_phone_info.is_logged_in_telegram = 0 self.tg_phone_info.save() return if self.tg_testing(): logger.error(f"雷电名称:{self.tg_phone_info.ld_name},掉了!!!") return logger.info(f"雷电名称:{self.tg_phone_info.ld_name},检测完成!!!") # 通过adb shell命令获取 all_packages = self.d.shell('pm list packages').output # 格式化输出(去掉"package:"前缀) clean_packages = [pkg.split(':')[1] for pkg in all_packages.splitlines()] print("纯净包名列表:", clean_packages) package_name = None for i in clean_packages: if "telegram" in i: package_name = i break # 停止应用 self.d.app_stop(package_name) time.sleep(5) pull_simulator_backup( d=self.d, phone=self.tg_phone_info.ld_name, local_backup=path ) time.sleep(10) send_file(file_path=fr"{path}\{self.tg_phone_info.ld_name}.tar", server_ip="192.168.50.122") @decorator def get_sb(self, ): print(self.d.shell(f"adb -s {self.id} shell getprop ro.product.model")) # adb -s 127.0.0.1:5555 shell getprop ro.product.model def get_sb(tg_phone_info, directory, filename, ): # 打开并读取配置文件 with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file: config_data = json.load(file) # 提取所需的键值 phone_manufacturer = config_data.get('propertySettings.phoneManufacturer', 'N/A') phone_model = config_data.get('propertySettings.phoneModel', 'N/A') # 组合结果 result = f"{phone_manufacturer} {phone_model}" tg_phone_info.manufacturer = result tg_phone_info.save() # 输出结果 print(f"文件名中的数字: {filename}, 设备信息: {result}") if __name__ == '__main__': directory = r"E:\leidian\LDPlayer9\vms\config" ld_ids = [] for i in get_ldplayer_list(): ld_ids.append(i["ID"]) # print(i["ID"]) # 同时运行 max_threads = 15 delay_between_start = 15 # 每次启动线程之间的延迟时间(秒) with ThreadPoolExecutor(max_workers=max_threads) as executor: for _, id in enumerate(ld_ids): tg_phone_info = TelegramAccount.get_or_none( TelegramAccount.index_id == id, TelegramAccount.server_id == get_host_ip(), ) if not tg_phone_info.ld_name: continue get_sb(tg_phone_info=tg_phone_info, directory=directory, filename=f"leidian{id}.config", ) # ld = Ld(tg_phone_info, ) # executor.submit(ld.action) # time.sleep(delay_between_start) # # for i in get_ldplayer_list(): # # if id == i["ID"] and i["Name"] == "NoneNone": # # ld = Ld(tg_phone_info, ) # # executor.submit(ld.action) # # time.sleep(delay_between_start) # # if tg_phone_info.is_logged_in_telegram == -1: # # logger.info(f"雷电名称:{id} 已经存在!!!") # # continue # # ld = Ld(tg_phone_info, ) # executor.submit(ld.action) # # time.sleep(delay_between_start) # # if tg_phone_info.is_logged_in_telegram == -1: # ld = Ld(tg_phone_info, ) # executor.submit(ld.action) # # time.sleep(delay_between_start) # from pathlib import Path # # path = r"E:\backups" # # # 指定目录 # directory = Path(path) # # # 获取目录下的所有文件 # start = 0 # file_list = [str(file) for file in directory.rglob('*') if file.is_file()] # for file in file_list: # if tg_phone_info.ld_name in file: # start = 1 # break # # if start: # send_file(file_path=fr"{path}\{tg_phone_info.ld_name}.tar", server_ip="192.168.50.122") # continue # # ld = Ld(tg_phone_info, ) # executor.submit(ld.bf)