import json import os import threading import cv2 import time import socket import subprocess import numpy as np import requests from PIL import Image from faker import Faker lock = threading.Lock() def get_device_ids(): try: result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, check=True) output = result.stdout lines = output.splitlines() devices = [] for line in lines[1:]: if line.strip(): device_id = line.split()[0] devices.append(device_id) return devices except subprocess.CalledProcessError as e: print(f"命令执行失败: {e}") return [] def get_nox_list(): command = r'powershell -Command "& \"C:\Program Files (x86)\Nox\bin\NoxConsole.exe\" list"' result = subprocess.run(command, shell=True, capture_output=True, text=True) output = result.stdout.strip() lines = output.split('\n') output_dict = {} for line in lines: parts = line.split(',') if len(parts) >= 3: key = parts[0].strip() value = { 'NoxID': parts[1].strip(), '名称': parts[2].strip() } output_dict[key] = value return output_dict def create_simulator(n): command = [ r'C:\Program Files (x86)\Nox\bin\NoxConsole.exe', 'add', f'-name:{n}', '-systemtype:12' ] subprocess.run(command, capture_output=True, text=True) def updata_resolving_power(n): command = [ r'C:\Program Files (x86)\Nox\bin\NoxConsole.exe', 'modify', f'-name:{n}', '-resolution:540,960,240' ] subprocess.run(command, capture_output=True, text=True) def start_simulator(n): command = [ r'D:\Program Files\Nox\bin\NoxConsole.exe', 'launch', f'-index:{n}', ] subprocess.run(command, capture_output=True, text=True) def clone_simulator(n): command = [ r'D:\Program Files\Nox\bin\NoxConsole.exe', 'quit', f'-index:{n}', ] subprocess.run(command, capture_output=True, text=True) def get_element(d, type1, grammar, timeout=5): if type1 == "xpath": end_time = time.time() + timeout element = "" while time.time() < end_time: try: element = d.xpath(grammar) if element.exists: return True, element except Exception as e: pass time.sleep(0.1) return False, element elif type1 == "text": ele = d(text=grammar) return ele.exists(timeout=timeout), ele def start_nox(index=None): command = ["NoxConsole.exe", "launch"] if index is not None: command.append(f"-index:{index}") result = subprocess.run(command, capture_output=True, text=True) def exit_simulator(index=None): command = ["NoxConsole.exe", "quit"] if index is not None: command.append(f"-index:{index}") result = subprocess.run(command, capture_output=True, text=True) def get_host_ip(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ip = None try: s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip def preprocess_image(image): alpha = 1.2 beta = 30 adjusted = cv2.convertScaleAbs(image, alpha=alpha, beta=beta) return adjusted def match_image(d, template_path: str, threshold: float = 0.8, retry_times: int = 3, wait_time: int = 1): for attempt in range(retry_times): screenshot_image = d.screenshot(format="opencv") screenshot_image = preprocess_image(screenshot_image) template_image = cv2.imread(template_path, cv2.IMREAD_COLOR) if template_image is None: return None template_image = preprocess_image(template_image) result = cv2.matchTemplate(screenshot_image, template_image, cv2.TM_CCOEFF_NORMED) min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) if max_val < threshold: if attempt < retry_times - 1: time.sleep(wait_time) else: return None else: h, w = template_image.shape[:2] center_x = int(max_loc[0] + w / 2) center_y = int(max_loc[1] + h / 2) return [(center_x, center_y)] return None def screenshot_and_crop(d, crop_area, save_path='wallet_connect.png'): screenshot = d.screenshot() cropped_img = screenshot.crop(crop_area) cropped_img.save(save_path) print(f'Cropped screenshot saved to {save_path}') def generate_fake_info(): fake = Faker('en_US') info = {} full_name = fake.name() first_name, last_name = full_name.split(' ', 1) info['first_name'] = first_name info['last_name'] = last_name birthdate = fake.date_of_birth(minimum_age=18, maximum_age=80) info['birth_year'] = birthdate.year info['birth_month'] = birthdate.month info['birth_day'] = birthdate.day return info def get_ldplayer_list(): command = [r"ldconsole", "list2"] result = subprocess.run(command, capture_output=True, text=True, shell=True) output = result.stdout.strip() lines = output.split('\n') ldplayer_list = [] for line in lines: parts = line.split(',') if len(parts) == 10: ldplayer_info = { "ID": int(parts[0]), "Name": parts[1], "Param1": int(parts[2]), "Param2": int(parts[3]), "Param3": int(parts[4]), "Param4": int(parts[5]), "Param5": int(parts[6]), "Width": int(parts[7]), "Height": int(parts[8]), "DPI": int(parts[9]), } ldplayer_list.append(ldplayer_info) return ldplayer_list def swipe_up(d, start_y1, end_y1, duration=0.1): width, height = d.window_size() start_x = width / 2 start_y = height * start_y1 end_x = width / 2 end_y = height * end_y1 d.swipe(start_x, start_y, end_x, end_y, duration) def close_ldplayer(simulator_id): command = f"dnconsole quit --index {simulator_id}" result = subprocess.run(command, capture_output=True, text=True, shell=True) def start_ldplayer(simulator_id): command = f"dnconsole launch --index {simulator_id}" with lock: result = subprocess.run(command, capture_output=True, text=True, shell=True) ld_start = 0 for i in range(50): for i in get_ldplayer_list(): if i['ID'] == simulator_id and i["Param3"] == 1: ld_start = 1 break if ld_start == 1: break time.sleep(1) return ld_start def create_ldplayer(name): command = f"dnconsole add --name {name}" with lock: result = subprocess.run(command, capture_output=True, text=True, shell=True) def rename_ldplayer(index, name): command = f"dnconsole rename --index {index} --title {name}" result = subprocess.run(command, capture_output=True, text=True, shell=True) def mumu_get_list_all(): command = ["MuMuManager.exe", "info", "-v", "all"] result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8') if result.returncode != 0: print(f"Command failed with error: {result.stderr}") return None try: info = json.loads(result.stdout) return [info] except json.JSONDecodeError as e: print(f"Failed to parse JSON: {e}") return None def mumu_create(id): command = ["MuMuManager.exe", "create", "-v", f'{id}'] result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8') def mumu_start(id): command = ["MuMuManager.exe", "control", "-v", f'{id}', "launch"] result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8') def mumu_quit(id): command = ["MuMuManager.exe", "control", "-v", f'{id}', "shutdown"] result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8') def mumu_rename(id, name): command = ["MuMuManager.exe", "rename", "-v", f'{id}', "-n", f"{name}"] result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8') def get_code(phone, device_mode=None): url = f"http://192.168.50.175:9000/api/check_phone?phone={phone}" res = requests.get(url) return res.json()["data"]["verification_code"] country_code_map = { "1": "US", "44": "GB", "27": "ZA", "20": "ZA", "234": "NG", "86": "CN", "63": "PH", "62": "PH", "972": "PH", } def split_phone_number(phone_number): if phone_number.startswith('+'): phone_number = phone_number[1:] for prefix in country_code_map: if phone_number.startswith(prefix): country_code = prefix national_number = phone_number[len(prefix):] return country_code, national_number return None, None def _read_and_scale_template(template_path: str, scale_ratio: float) -> np.ndarray: template = cv2.imdecode(np.fromfile(template_path, dtype=np.uint8), cv2.IMREAD_COLOR) if template is None: raise FileNotFoundError(f"模板图片不存在: {template_path}") if scale_ratio != 1.0: h, w = template.shape[:2] template = cv2.resize( template, (int(w * scale_ratio), int(h * scale_ratio)), interpolation=cv2.INTER_AREA ) return template def find_img(d, template_path: str, threshold: float = 0.8, retry: int = 1, timeout=5, scale_ratio: float = 1.0): template = _read_and_scale_template(template_path, scale_ratio) for _ in range(retry): try: screenshot = d.screenshot(format='opencv') match_result = cv2.matchTemplate(screenshot, template, cv2.TM_CCOEFF_NORMED) min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(match_result) if max_val >= threshold: h, w = template.shape[:2] x = max_loc[0] + w // 2 y = max_loc[1] + h // 2 return (x, y) except Exception as e: print(f"图像识别异常: {str(e)}") time.sleep(timeout) return False def adb_find_img(d, template_path: str, threshold: float = 0.8, retry: int = 1, timeout=5, scale_ratio: float = 1.0, error_type=0): template = _read_and_scale_template(template_path, scale_ratio) remote_path = '/sdcard/screenshot.png' for _ in range(retry): try: d.shell(['screencap', '-p', remote_path]) d.pull(remote_path, 'temp_screenshot.png') screenshot_pil = Image.open('temp_screenshot.png') screenshot = np.array(screenshot_pil) screenshot = cv2.cvtColor(screenshot, cv2.COLOR_RGB2BGR) match_result = cv2.matchTemplate(screenshot, template, cv2.TM_CCOEFF_NORMED) min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(match_result) if max_val >= threshold: h, w = template.shape[:2] x = max_loc[0] + w // 2 y = max_loc[1] + h // 2 if os.path.exists('temp_screenshot.png'): os.remove('temp_screenshot.png') return (x, y) except Exception as e: print(f"图像识别异常: {str(e)}") if error_type: raise "" if os.path.exists('temp_screenshot.png'): os.remove('temp_screenshot.png') time.sleep(timeout) return False def u2_adb_screenshot_and_crop(d, crop_area, save_path='wallet_connect.png'): remote_path = '/sdcard/screenshot.png' d.shell(['screencap', '-p', remote_path]) d.pull(remote_path, 'temp_screenshot.png') try: screenshot = Image.open('temp_screenshot.png') cropped_img = screenshot.crop(crop_area) cropped_img.save(save_path) print(f'Cropped screenshot saved to {save_path}') except Exception as e: print(f"裁剪图片时出错: {e}") finally: if os.path.exists('temp_screenshot.png'): os.remove('temp_screenshot.png') def get_page_user(phone, url1): url = "http://192.168.50.97:9999/get_user_data" json_data = { "phone": phone, "url": url1 } res = requests.post(url, json=json_data) return res.json() def send_command_to_server(server_host, server_port, command): client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: client_socket.connect((server_host, server_port)) client_socket.sendall(command.encode('utf-8')) if command.lower() == 'exit': return None data_length = int(client_socket.recv(10).decode('utf-8')) received_data = b'' while len(received_data) < data_length: chunk = client_socket.recv(data_length - len(received_data)) if not chunk: break received_data += chunk output = received_data.decode('utf-8') return output except Exception as e: print(f"发生错误: {e}") return None finally: client_socket.close() # 打包拉取 def pull_simulator_backup( d, # U2对象 phone, # 电话号码 package_name="org.telegram.messenger.web", # 包名 local_backup=r"D:\backups" ): # 判断并创建目录 if not os.path.exists(local_backup): try: os.makedirs(local_backup) print(f"目录已创建:{local_backup}") except Exception as e: print(f"创建目录失败:{e}") else: print(f"目录已存在:{local_backup}") # 打包 extract_cmd = ( "su -c '" "cd /data/data/ && " f"tar -cf /data/local/tmp/{phone}.tar {package_name}" "'" ) d.shell(extract_cmd, ) # 拉取 d.pull(f"/data/local/tmp/{phone}.tar", f"{local_backup}/{phone}.tar") # 删除临时文件 d.shell(f"su -c 'rm /data/local/tmp/{phone}.tar'") # 传入解压移动 def deploy_and_extract_to_android( d, # U2对象 phone, # 电话号码 package_name="org.telegram.messenger.web", # 包名 local_backup=r"D:\backups" ): # # 获取包名 # package_name = None # # 通过adb shell命令获取 # all_packages = d.shell('pm list packages').output # # 格式化输出(去掉"package:"前缀) # clean_packages = [pkg.split(':')[1] for pkg in all_packages.splitlines()] # for i in clean_packages: # if "telegram" in i: # package_name = i # break # 将本地备份文件上传到模拟器 d.push(f"{local_backup}/{phone}.tar", "/data/local/tmp/telegram_backup.tar") # 解压 extract_cmd = ( "su -c '" "mkdir -p /data/local/tmp && " "tar -xf /data/local/tmp/telegram_backup.tar -C /data/local/tmp" "'" ) d.shell(extract_cmd, ) # 获取解压后的目录列表 path = None list_cmd = "su -c 'ls -d /data/local/tmp/*/'" result = d.shell(list_cmd) dirs = result.output.strip().split('\n') for i in dirs: if "telegram" in i: path = i # 复制文件 copy_cmd = f"su -c 'cp -r {path}* /data/data/{package_name}/'" d.shell(copy_cmd) # # 确保目标目录权限正确 # chmod_cmd = "su -c 'chmod -R 755 /data/data/org.telegram.messenger'" # d.shell(chmod_cmd) # 可选:删除临时文件 cleanup_cmd = f"su -c 'rm -rf {path.rstrip('/')}'" d.shell(cleanup_cmd) cleanup_cmd = f"su -c 'rm -rf /data/data/{package_name}/app_webview'" d.shell(cleanup_cmd)