Files
to_session/process/tools.py

510 lines
16 KiB
Python
Raw Normal View History

2025-11-12 12:54:37 +08:00
import json
import os
import threading
import cv2
import time
import socket
import subprocess
import numpy as np
import requests
from PIL import Image
from faker import Faker
lock = threading.Lock()
def get_device_ids():
try:
result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, check=True)
output = result.stdout
lines = output.splitlines()
devices = []
for line in lines[1:]:
if line.strip():
device_id = line.split()[0]
devices.append(device_id)
return devices
except subprocess.CalledProcessError as e:
print(f"命令执行失败: {e}")
return []
def get_nox_list():
command = r'powershell -Command "& \"C:\Program Files (x86)\Nox\bin\NoxConsole.exe\" list"'
result = subprocess.run(command, shell=True, capture_output=True, text=True)
output = result.stdout.strip()
lines = output.split('\n')
output_dict = {}
for line in lines:
parts = line.split(',')
if len(parts) >= 3:
key = parts[0].strip()
value = {
'NoxID': parts[1].strip(),
'名称': parts[2].strip()
}
output_dict[key] = value
return output_dict
def create_simulator(n):
command = [
r'C:\Program Files (x86)\Nox\bin\NoxConsole.exe',
'add',
f'-name:{n}',
'-systemtype:12'
]
subprocess.run(command, capture_output=True, text=True)
def updata_resolving_power(n):
command = [
r'C:\Program Files (x86)\Nox\bin\NoxConsole.exe',
'modify',
f'-name:{n}',
'-resolution:540,960,240'
]
subprocess.run(command, capture_output=True, text=True)
def start_simulator(n):
command = [
r'D:\Program Files\Nox\bin\NoxConsole.exe',
'launch',
f'-index:{n}',
]
subprocess.run(command, capture_output=True, text=True)
def clone_simulator(n):
command = [
r'D:\Program Files\Nox\bin\NoxConsole.exe',
'quit',
f'-index:{n}',
]
subprocess.run(command, capture_output=True, text=True)
def get_element(d, type1, grammar, timeout=5):
if type1 == "xpath":
end_time = time.time() + timeout
element = ""
while time.time() < end_time:
try:
element = d.xpath(grammar)
if element.exists:
return True, element
except Exception as e:
pass
time.sleep(0.1)
return False, element
elif type1 == "text":
ele = d(text=grammar)
return ele.exists(timeout=timeout), ele
def start_nox(index=None):
command = ["NoxConsole.exe", "launch"]
if index is not None:
command.append(f"-index:{index}")
result = subprocess.run(command, capture_output=True, text=True)
def exit_simulator(index=None):
command = ["NoxConsole.exe", "quit"]
if index is not None:
command.append(f"-index:{index}")
result = subprocess.run(command, capture_output=True, text=True)
def get_host_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ip = None
try:
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
def preprocess_image(image):
alpha = 1.2
beta = 30
adjusted = cv2.convertScaleAbs(image, alpha=alpha, beta=beta)
return adjusted
def match_image(d, template_path: str, threshold: float = 0.8, retry_times: int = 3, wait_time: int = 1):
for attempt in range(retry_times):
screenshot_image = d.screenshot(format="opencv")
screenshot_image = preprocess_image(screenshot_image)
template_image = cv2.imread(template_path, cv2.IMREAD_COLOR)
if template_image is None:
return None
template_image = preprocess_image(template_image)
result = cv2.matchTemplate(screenshot_image, template_image, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
if max_val < threshold:
if attempt < retry_times - 1:
time.sleep(wait_time)
else:
return None
else:
h, w = template_image.shape[:2]
center_x = int(max_loc[0] + w / 2)
center_y = int(max_loc[1] + h / 2)
return [(center_x, center_y)]
return None
def screenshot_and_crop(d, crop_area, save_path='wallet_connect.png'):
screenshot = d.screenshot()
cropped_img = screenshot.crop(crop_area)
cropped_img.save(save_path)
print(f'Cropped screenshot saved to {save_path}')
def generate_fake_info():
fake = Faker('en_US')
info = {}
full_name = fake.name()
first_name, last_name = full_name.split(' ', 1)
info['first_name'] = first_name
info['last_name'] = last_name
birthdate = fake.date_of_birth(minimum_age=18, maximum_age=80)
info['birth_year'] = birthdate.year
info['birth_month'] = birthdate.month
info['birth_day'] = birthdate.day
return info
def get_ldplayer_list():
command = [r"ldconsole", "list2"]
result = subprocess.run(command, capture_output=True, text=True, shell=True)
output = result.stdout.strip()
lines = output.split('\n')
ldplayer_list = []
for line in lines:
parts = line.split(',')
if len(parts) == 10:
ldplayer_info = {
"ID": int(parts[0]),
"Name": parts[1],
"Param1": int(parts[2]),
"Param2": int(parts[3]),
"Param3": int(parts[4]),
"Param4": int(parts[5]),
"Param5": int(parts[6]),
"Width": int(parts[7]),
"Height": int(parts[8]),
"DPI": int(parts[9]),
}
ldplayer_list.append(ldplayer_info)
return ldplayer_list
def swipe_up(d, start_y1, end_y1, duration=0.1):
width, height = d.window_size()
start_x = width / 2
start_y = height * start_y1
end_x = width / 2
end_y = height * end_y1
d.swipe(start_x, start_y, end_x, end_y, duration)
def close_ldplayer(simulator_id):
command = f"dnconsole quit --index {simulator_id}"
result = subprocess.run(command, capture_output=True, text=True, shell=True)
def start_ldplayer(simulator_id):
command = f"dnconsole launch --index {simulator_id}"
with lock:
result = subprocess.run(command, capture_output=True, text=True, shell=True)
ld_start = 0
for i in range(50):
for i in get_ldplayer_list():
if i['ID'] == simulator_id and i["Param3"] == 1:
ld_start = 1
break
if ld_start == 1:
break
time.sleep(1)
return ld_start
def create_ldplayer(name):
command = f"dnconsole add --name {name}"
with lock:
result = subprocess.run(command, capture_output=True, text=True, shell=True)
def rename_ldplayer(index, name):
command = f"dnconsole rename --index {index} --title {name}"
result = subprocess.run(command, capture_output=True, text=True, shell=True)
def mumu_get_list_all():
command = ["MuMuManager.exe", "info", "-v", "all"]
result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
if result.returncode != 0:
print(f"Command failed with error: {result.stderr}")
return None
try:
info = json.loads(result.stdout)
return [info]
except json.JSONDecodeError as e:
print(f"Failed to parse JSON: {e}")
return None
def mumu_create(id):
command = ["MuMuManager.exe", "create", "-v", f'{id}']
result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
def mumu_start(id):
command = ["MuMuManager.exe", "control", "-v", f'{id}', "launch"]
result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
def mumu_quit(id):
command = ["MuMuManager.exe", "control", "-v", f'{id}', "shutdown"]
result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
def mumu_rename(id, name):
command = ["MuMuManager.exe", "rename", "-v", f'{id}', "-n", f"{name}"]
result = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
def get_code(phone, device_mode=None):
url = f"http://192.168.50.175:9000/api/check_phone?phone={phone}"
res = requests.get(url)
return res.json()["data"]["verification_code"]
country_code_map = {
"1": "US",
"44": "GB",
"27": "ZA",
"20": "ZA",
"234": "NG",
"86": "CN",
"63": "PH",
"62": "PH",
"972": "PH",
}
def split_phone_number(phone_number):
if phone_number.startswith('+'):
phone_number = phone_number[1:]
for prefix in country_code_map:
if phone_number.startswith(prefix):
country_code = prefix
national_number = phone_number[len(prefix):]
return country_code, national_number
return None, None
def _read_and_scale_template(template_path: str, scale_ratio: float) -> np.ndarray:
template = cv2.imdecode(np.fromfile(template_path, dtype=np.uint8), cv2.IMREAD_COLOR)
if template is None:
raise FileNotFoundError(f"模板图片不存在: {template_path}")
if scale_ratio != 1.0:
h, w = template.shape[:2]
template = cv2.resize(
template,
(int(w * scale_ratio), int(h * scale_ratio)),
interpolation=cv2.INTER_AREA
)
return template
def find_img(d, template_path: str, threshold: float = 0.8, retry: int = 1, timeout=5, scale_ratio: float = 1.0):
template = _read_and_scale_template(template_path, scale_ratio)
for _ in range(retry):
try:
screenshot = d.screenshot(format='opencv')
match_result = cv2.matchTemplate(screenshot, template, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(match_result)
if max_val >= threshold:
h, w = template.shape[:2]
x = max_loc[0] + w // 2
y = max_loc[1] + h // 2
return (x, y)
except Exception as e:
print(f"图像识别异常: {str(e)}")
time.sleep(timeout)
return False
def adb_find_img(d, template_path: str, threshold: float = 0.8, retry: int = 1, timeout=5, scale_ratio: float = 1.0,
error_type=0):
template = _read_and_scale_template(template_path, scale_ratio)
remote_path = '/sdcard/screenshot.png'
for _ in range(retry):
try:
d.shell(['screencap', '-p', remote_path])
d.pull(remote_path, 'temp_screenshot.png')
screenshot_pil = Image.open('temp_screenshot.png')
screenshot = np.array(screenshot_pil)
screenshot = cv2.cvtColor(screenshot, cv2.COLOR_RGB2BGR)
match_result = cv2.matchTemplate(screenshot, template, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(match_result)
if max_val >= threshold:
h, w = template.shape[:2]
x = max_loc[0] + w // 2
y = max_loc[1] + h // 2
if os.path.exists('temp_screenshot.png'):
os.remove('temp_screenshot.png')
return (x, y)
except Exception as e:
print(f"图像识别异常: {str(e)}")
if error_type:
raise ""
if os.path.exists('temp_screenshot.png'):
os.remove('temp_screenshot.png')
time.sleep(timeout)
return False
def u2_adb_screenshot_and_crop(d, crop_area, save_path='wallet_connect.png'):
remote_path = '/sdcard/screenshot.png'
d.shell(['screencap', '-p', remote_path])
d.pull(remote_path, 'temp_screenshot.png')
try:
screenshot = Image.open('temp_screenshot.png')
cropped_img = screenshot.crop(crop_area)
cropped_img.save(save_path)
print(f'Cropped screenshot saved to {save_path}')
except Exception as e:
print(f"裁剪图片时出错: {e}")
finally:
if os.path.exists('temp_screenshot.png'):
os.remove('temp_screenshot.png')
def get_page_user(phone, url1):
url = "http://192.168.50.97:9999/get_user_data"
json_data = {
"phone": phone,
"url": url1
}
res = requests.post(url, json=json_data)
return res.json()
def send_command_to_server(server_host, server_port, command):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
client_socket.connect((server_host, server_port))
client_socket.sendall(command.encode('utf-8'))
if command.lower() == 'exit':
return None
data_length = int(client_socket.recv(10).decode('utf-8'))
received_data = b''
while len(received_data) < data_length:
chunk = client_socket.recv(data_length - len(received_data))
if not chunk:
break
received_data += chunk
output = received_data.decode('utf-8')
return output
except Exception as e:
print(f"发生错误: {e}")
return None
finally:
client_socket.close()
# 打包拉取
def pull_simulator_backup(
d, # U2对象
phone, # 电话号码
package_name="org.telegram.messenger.web", # 包名
local_backup=r"D:\backups"
):
# 判断并创建目录
if not os.path.exists(local_backup):
try:
os.makedirs(local_backup)
print(f"目录已创建:{local_backup}")
except Exception as e:
print(f"创建目录失败:{e}")
else:
print(f"目录已存在:{local_backup}")
# 打包
extract_cmd = (
"su -c '"
"cd /data/data/ && "
f"tar -cf /data/local/tmp/{phone}.tar {package_name}"
"'"
)
d.shell(extract_cmd, )
# 拉取
d.pull(f"/data/local/tmp/{phone}.tar", f"{local_backup}/{phone}.tar")
# 删除临时文件
d.shell(f"su -c 'rm /data/local/tmp/{phone}.tar'")
# 传入解压移动
def deploy_and_extract_to_android(
d, # U2对象
phone, # 电话号码
package_name="org.telegram.messenger.web", # 包名
local_backup=r"D:\backups"
):
# # 获取包名
# package_name = None
# # 通过adb shell命令获取
# all_packages = d.shell('pm list packages').output
# # 格式化输出(去掉"package:"前缀)
# clean_packages = [pkg.split(':')[1] for pkg in all_packages.splitlines()]
# for i in clean_packages:
# if "telegram" in i:
# package_name = i
# break
# 将本地备份文件上传到模拟器
d.push(f"{local_backup}/{phone}.tar", "/data/local/tmp/telegram_backup.tar")
# 解压
extract_cmd = (
"su -c '"
"mkdir -p /data/local/tmp && "
"tar -xf /data/local/tmp/telegram_backup.tar -C /data/local/tmp"
"'"
)
d.shell(extract_cmd, )
# 获取解压后的目录列表
path = None
list_cmd = "su -c 'ls -d /data/local/tmp/*/'"
result = d.shell(list_cmd)
dirs = result.output.strip().split('\n')
for i in dirs:
if "telegram" in i:
path = i
# 复制文件
copy_cmd = f"su -c 'cp -r {path}* /data/data/{package_name}/'"
d.shell(copy_cmd)
# # 确保目标目录权限正确
# chmod_cmd = "su -c 'chmod -R 755 /data/data/org.telegram.messenger'"
# d.shell(chmod_cmd)
# 可选:删除临时文件
cleanup_cmd = f"su -c 'rm -rf {path.rstrip('/')}'"
d.shell(cleanup_cmd)
cleanup_cmd = f"su -c 'rm -rf /data/data/{package_name}/app_webview'"
d.shell(cleanup_cmd)