510 lines
16 KiB
Python
510 lines
16 KiB
Python
import json
|
|
import os
|
|
import threading
|
|
import cv2
|
|
import time
|
|
import socket
|
|
import subprocess
|
|
import numpy as np
|
|
import requests
|
|
from PIL import Image
|
|
from faker import Faker
|
|
|
|
# Module-wide lock. Within this file it is held while the `dnconsole launch`
# and `dnconsole add` commands are issued, so those calls never overlap
# when several threads drive emulators at the same time.
lock = threading.Lock()
|
|
|
|
|
|
def get_device_ids():
    """Return the device ids printed by ``adb devices``.

    The first output line (the header) is skipped, blank lines are ignored
    and the first whitespace-separated token of every remaining line is
    taken as the device id. Devices are returned regardless of the state
    column adb prints next to them.

    Returns:
        list[str]: Device ids in the order adb printed them. An empty list
        is returned when adb exits with a non-zero status or when the adb
        executable cannot be started at all.
    """
    try:
        result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers "adb is not installed / not on PATH"; treat it the
        # same way as a failing adb call instead of crashing the caller.
        print(f"命令执行失败: {e}")
        return []

    return [
        line.split()[0]
        for line in result.stdout.splitlines()[1:]
        if line.strip()
    ]
|
|
|
|
|
|
def get_nox_list():
    """List the Nox emulator instances reported by ``NoxConsole.exe list``.

    The console is invoked through PowerShell and its stdout is parsed as
    comma-separated rows. Rows with fewer than three fields are skipped.

    Returns:
        dict: Maps the first field of each row to a dict holding the second
        field under ``'NoxID'`` and the third under ``'名称'`` (all stripped).
        Later rows overwrite earlier rows that share the same first field.
    """
    command = r'powershell -Command "& \"C:\Program Files (x86)\Nox\bin\NoxConsole.exe\" list"'
    completed = subprocess.run(command, shell=True, capture_output=True, text=True)

    emulators = {}
    for row in completed.stdout.strip().split('\n'):
        fields = row.split(',')
        if len(fields) < 3:
            continue
        emulators[fields[0].strip()] = {
            'NoxID': fields[1].strip(),
            '名称': fields[2].strip(),
        }
    return emulators
|
|
|
|
|
|
def create_simulator(n):
    """Ask NoxConsole to add a new emulator instance named ``n``.

    The console is called with ``add -name:<n> -systemtype:12``; its output
    is captured and discarded and its exit status is not checked.
    NOTE(review): the meaning of system type 12 is not visible here.

    Args:
        n: Name for the new instance (interpolated into ``-name:``).
    """
    nox_console = r'C:\Program Files (x86)\Nox\bin\NoxConsole.exe'
    subprocess.run(
        [nox_console, 'add', f'-name:{n}', '-systemtype:12'],
        capture_output=True,
        text=True,
    )
|
|
|
|
|
|
def updata_resolving_power(n):
    """Set the resolution of the Nox instance named ``n``.

    Calls NoxConsole with ``modify -name:<n> -resolution:540,960,240``
    (presumably width, height, dpi -- confirm against the NoxConsole docs).
    Output is captured and discarded; the exit status is not checked.

    Args:
        n: Name of the instance to modify.
    """
    nox_console = r'C:\Program Files (x86)\Nox\bin\NoxConsole.exe'
    subprocess.run(
        [nox_console, 'modify', f'-name:{n}', '-resolution:540,960,240'],
        capture_output=True,
        text=True,
    )
|
|
|
|
|
|
def start_simulator(n):
    """Launch the Nox instance with index ``n`` via ``NoxConsole launch``.

    Output is captured and discarded; the exit status is not checked.
    NOTE(review): this function uses the Nox install on drive D: while
    create_simulator / updata_resolving_power use the one on drive C: --
    confirm that the difference is intentional.

    Args:
        n: Instance index passed as ``-index:<n>``.
    """
    nox_console = r'D:\Program Files\Nox\bin\NoxConsole.exe'
    subprocess.run(
        [nox_console, 'launch', f'-index:{n}'],
        capture_output=True,
        text=True,
    )
|
|
|
|
|
|
def clone_simulator(n):
    """Send ``NoxConsole quit`` for the Nox instance with index ``n``.

    NOTE(review): despite its name this function does not clone anything;
    the command it issues is ``quit``. Output is captured and discarded and
    the exit status is not checked.

    Args:
        n: Instance index passed as ``-index:<n>``.
    """
    nox_console = r'D:\Program Files\Nox\bin\NoxConsole.exe'
    subprocess.run(
        [nox_console, 'quit', f'-index:{n}'],
        capture_output=True,
        text=True,
    )
|
|
|
|
|
|
def get_element(d, type1, grammar, timeout=5):
    """Wait for a UI element and report whether it showed up.

    Args:
        d: Device object (presumably a uiautomator2 device). For ``"xpath"``
            lookups it must offer ``d.xpath(grammar)`` returning an object
            with an ``exists`` attribute; for ``"text"`` lookups it is
            called as ``d(text=grammar)`` and the result must offer
            ``exists(timeout=...)``.
        type1: Lookup strategy, ``"xpath"`` or ``"text"``.
        grammar: The XPath expression or the text to look for.
        timeout: Maximum number of seconds to wait.

    Returns:
        tuple: ``(found, element)``. For ``"xpath"`` the selector is polled
        every 0.1 s until it exists or the timeout elapses; on timeout
        ``element`` is the last selector obtained, or ``""`` when none was
        obtained. For ``"text"`` the result of ``exists(timeout=timeout)``
        is returned together with the selector. Any other ``type1`` yields
        ``(False, None)`` so that callers can always unpack two values.
    """
    if type1 == "xpath":
        # A monotonic clock keeps the deadline correct even if the system
        # time is adjusted while waiting.
        deadline = time.monotonic() + timeout
        element = ""
        while time.monotonic() < deadline:
            try:
                element = d.xpath(grammar)
                if element.exists:
                    return True, element
            except Exception:
                # Best-effort polling: a failing lookup is treated as
                # "not there yet" and retried until the deadline.
                pass
            time.sleep(0.1)
        return False, element

    if type1 == "text":
        element = d(text=grammar)
        return element.exists(timeout=timeout), element

    # Unknown strategy: previously fell through and returned a bare None,
    # which broke `found, element = get_element(...)` at the call site.
    return False, None
|
|
|
|
|
|
def start_nox(index=None):
    """Run ``NoxConsole.exe launch``, optionally for one instance index.

    ``NoxConsole.exe`` is referenced by bare name, so it has to be
    resolvable by the operating system. Output is captured and discarded.

    Args:
        index: When not ``None``, appended as ``-index:<index>``.
    """
    launch_args = ["NoxConsole.exe", "launch"]
    if index is not None:
        launch_args += [f"-index:{index}"]
    subprocess.run(launch_args, capture_output=True, text=True)
|
|
|
|
|
|
def exit_simulator(index=None):
    """Run ``NoxConsole.exe quit``, optionally for one instance index.

    ``NoxConsole.exe`` is referenced by bare name, so it has to be
    resolvable by the operating system. Output is captured and discarded.

    Args:
        index: When not ``None``, appended as ``-index:<index>``.
    """
    quit_args = ["NoxConsole.exe", "quit"]
    if index is not None:
        quit_args += [f"-index:{index}"]
    subprocess.run(quit_args, capture_output=True, text=True)
|
|
|
|
|
|
def get_host_ip():
    """Return the local IP address used for outbound traffic.

    A UDP socket is connected to ``8.8.8.8:80`` and the local address the
    operating system bound for that socket is returned. The socket is
    always closed; an error raised by ``connect`` propagates to the caller.

    Returns:
        str: The local address reported by ``getsockname()``.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect(('8.8.8.8', 80))
        return probe.getsockname()[0]
|
|
|
|
|
|
def preprocess_image(image):
    """Return ``image`` scaled by 1.2 and offset by 30 via ``cv2.convertScaleAbs``.

    Args:
        image: Image array accepted by OpenCV.

    Returns:
        The adjusted image produced by ``cv2.convertScaleAbs``.
    """
    gain, offset = 1.2, 30
    return cv2.convertScaleAbs(image, alpha=gain, beta=offset)
|
|
|
|
|
|
def match_image(d, template_path: str, threshold: float = 0.8, retry_times: int = 3, wait_time: int = 1):
    """Locate a template image on the device screen.

    The template is read and preprocessed once; every attempt then takes a
    fresh screenshot, preprocesses it the same way and runs normalised
    cross-correlation template matching (``cv2.TM_CCOEFF_NORMED``).

    Args:
        d: Device object offering ``screenshot(format="opencv")``
            (presumably a uiautomator2 device).
        template_path: Path of the template image, read with ``cv2.imread``.
        threshold: Minimum best-match score that counts as a hit.
        retry_times: Number of screenshots to try.
        wait_time: Seconds to sleep between two attempts.

    Returns:
        list | None: ``[(center_x, center_y)]`` with the centre of the best
        match on the first attempt that reaches ``threshold``; ``None`` when
        the template cannot be read or no attempt reaches the threshold.
    """
    # The template does not change between attempts, so load it only once
    # instead of re-reading it from disk on every retry.
    template_image = cv2.imread(template_path, cv2.IMREAD_COLOR)
    if template_image is None:
        return None
    template_image = preprocess_image(template_image)
    h, w = template_image.shape[:2]

    for attempt in range(retry_times):
        screenshot_image = preprocess_image(d.screenshot(format="opencv"))
        result = cv2.matchTemplate(screenshot_image, template_image, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, max_loc = cv2.minMaxLoc(result)

        if max_val >= threshold:
            center_x = int(max_loc[0] + w / 2)
            center_y = int(max_loc[1] + h / 2)
            return [(center_x, center_y)]

        # No sleep after the final attempt; just fall out of the loop.
        if attempt < retry_times - 1:
            time.sleep(wait_time)

    return None
|
|
|
|
|
|
def screenshot_and_crop(d, crop_area, save_path='wallet_connect.png'):
    """Take a screenshot, crop it and save the cropped region.

    Args:
        d: Device object whose ``screenshot()`` returns an image offering
            ``crop`` and ``save`` (presumably a PIL image -- confirm).
        crop_area: Passed unchanged to the image's ``crop`` method.
        save_path: Destination file for the cropped image.
    """
    region = d.screenshot().crop(crop_area)
    region.save(save_path)
    print(f'Cropped screenshot saved to {save_path}')
|
|
|
|
|
|
def generate_fake_info():
    """Generate a fake en_US identity with a name and a birth date.

    The first and last name are requested from Faker separately. Splitting
    ``fake.name()`` on the first space is unreliable because generated full
    names may carry a prefix or suffix (e.g. "Dr." / "MD"), which would end
    up in the wrong field.

    Returns:
        dict: Keys ``first_name``, ``last_name`` (str) and ``birth_year``,
        ``birth_month``, ``birth_day`` (int). The birth date corresponds to
        an age between 18 and 80.
    """
    fake = Faker('en_US')
    birthdate = fake.date_of_birth(minimum_age=18, maximum_age=80)
    return {
        'first_name': fake.first_name(),
        'last_name': fake.last_name(),
        'birth_year': birthdate.year,
        'birth_month': birthdate.month,
        'birth_day': birthdate.day,
    }
|
|
|
|
|
|
def get_ldplayer_list():
    """Parse the output of ``ldconsole list2`` into a list of dicts.

    Every stdout row is split on commas; only rows with exactly ten fields
    are kept. All fields except the second are converted with ``int`` (a
    non-numeric value raises ``ValueError``).

    Returns:
        list[dict]: One dict per row with the keys ``ID``, ``Name``,
        ``Param1`` .. ``Param5``, ``Width``, ``Height`` and ``DPI``, in the
        column order of the console output.
        NOTE(review): elsewhere in this file ``Param3 == 1`` is treated as
        "instance has started"; the meaning of the other ``Param`` columns
        is not visible here.
    """
    field_names = (
        "ID", "Name",
        "Param1", "Param2", "Param3", "Param4", "Param5",
        "Width", "Height", "DPI",
    )
    completed = subprocess.run([r"ldconsole", "list2"], capture_output=True, text=True, shell=True)

    instances = []
    for row in completed.stdout.strip().split('\n'):
        columns = row.split(',')
        if len(columns) != 10:
            continue
        instances.append({
            key: (raw if key == "Name" else int(raw))
            for key, raw in zip(field_names, columns)
        })
    return instances
|
|
|
|
|
|
def swipe_up(d, start_y1, end_y1, duration=0.1):
    """Swipe vertically along the horizontal centre of the screen.

    Args:
        d: Device object offering ``window_size()`` and
            ``swipe(x1, y1, x2, y2, duration)``.
        start_y1: Start position as a fraction of the screen height.
        end_y1: End position as a fraction of the screen height.
        duration: Passed unchanged as the last argument of ``d.swipe``.
    """
    width, height = d.window_size()
    center_x = width / 2
    d.swipe(center_x, height * start_y1, center_x, height * end_y1, duration)
|
|
|
|
|
|
def close_ldplayer(simulator_id):
    """Run ``dnconsole quit --index <simulator_id>`` through the shell.

    Output is captured and discarded; the exit status is not checked.

    Args:
        simulator_id: Instance index, interpolated unquoted into the
            command line.
    """
    subprocess.run(
        f"dnconsole quit --index {simulator_id}",
        capture_output=True,
        text=True,
        shell=True,
    )
|
|
|
|
|
|
def start_ldplayer(simulator_id):
    """Launch an LDPlayer instance and wait until it reports as started.

    The launch command is issued while holding the module lock. Afterwards
    the instance list is polled up to 50 times, one second apart, until an
    entry whose ``ID`` equals ``simulator_id`` has ``Param3 == 1``.
    NOTE(review): ``ID`` values in the list are ints, so ``simulator_id``
    must be an int for the comparison to ever succeed.

    Args:
        simulator_id: Index of the instance to launch.

    Returns:
        int: ``1`` once the instance is seen as started, ``0`` if that never
        happens within the polling window.
    """
    launch_cmd = f"dnconsole launch --index {simulator_id}"
    with lock:
        subprocess.run(launch_cmd, capture_output=True, text=True, shell=True)

    for _ in range(50):
        started = any(
            entry['ID'] == simulator_id and entry["Param3"] == 1
            for entry in get_ldplayer_list()
        )
        if started:
            return 1
        time.sleep(1)
    return 0
|
|
|
|
|
|
def create_ldplayer(name):
    """Run ``dnconsole add --name <name>`` while holding the module lock.

    Output is captured and discarded; the exit status is not checked.
    NOTE(review): ``name`` is interpolated unquoted into a shell command
    line, so a name containing spaces will not arrive as one argument.

    Args:
        name: Name for the new LDPlayer instance.
    """
    add_cmd = f"dnconsole add --name {name}"
    with lock:
        subprocess.run(add_cmd, capture_output=True, text=True, shell=True)
|
|
|
|
|
|
def rename_ldplayer(index, name):
    """Run ``dnconsole rename --index <index> --title <name>`` via the shell.

    Output is captured and discarded; the exit status is not checked.
    NOTE(review): both values are interpolated unquoted into a shell
    command line, so a title containing spaces will not arrive as one
    argument.

    Args:
        index: Index of the instance to rename.
        name: New title for the instance.
    """
    subprocess.run(
        f"dnconsole rename --index {index} --title {name}",
        capture_output=True,
        text=True,
        shell=True,
    )
|
|
|
|
|
|
def mumu_get_list_all():
    """Query ``MuMuManager.exe info -v all`` and return the decoded JSON.

    Returns:
        list | None: A one-element list holding the object decoded from the
        command's stdout. ``None`` (after printing a message) when the
        command exits with a non-zero status or stdout is not valid JSON.
    """
    completed = subprocess.run(
        ["MuMuManager.exe", "info", "-v", "all"],
        capture_output=True,
        text=True,
        encoding='utf-8',
    )
    if completed.returncode != 0:
        print(f"Command failed with error: {completed.stderr}")
        return None

    try:
        decoded = json.loads(completed.stdout)
    except json.JSONDecodeError as e:
        print(f"Failed to parse JSON: {e}")
        return None
    return [decoded]
|
|
|
|
|
|
def mumu_create(id):
    """Run ``MuMuManager.exe create -v <id>``.

    Output is captured (decoded as UTF-8) and discarded; the exit status is
    not checked.

    Args:
        id: Value passed after ``-v``.
    """
    subprocess.run(
        ["MuMuManager.exe", "create", "-v", f'{id}'],
        capture_output=True,
        text=True,
        encoding='utf-8',
    )
|
|
|
|
|
|
def mumu_start(id):
    """Run ``MuMuManager.exe control -v <id> launch``.

    Output is captured (decoded as UTF-8) and discarded; the exit status is
    not checked.

    Args:
        id: Value passed after ``-v``.
    """
    subprocess.run(
        ["MuMuManager.exe", "control", "-v", f'{id}', "launch"],
        capture_output=True,
        text=True,
        encoding='utf-8',
    )
|
|
|
|
|
|
def mumu_quit(id):
    """Run ``MuMuManager.exe control -v <id> shutdown``.

    Output is captured (decoded as UTF-8) and discarded; the exit status is
    not checked.

    Args:
        id: Value passed after ``-v``.
    """
    subprocess.run(
        ["MuMuManager.exe", "control", "-v", f'{id}', "shutdown"],
        capture_output=True,
        text=True,
        encoding='utf-8',
    )
|
|
|
|
|
|
def mumu_rename(id, name):
    """Run ``MuMuManager.exe rename -v <id> -n <name>``.

    Output is captured (decoded as UTF-8) and discarded; the exit status is
    not checked.

    Args:
        id: Value passed after ``-v``.
        name: Value passed after ``-n``.
    """
    subprocess.run(
        ["MuMuManager.exe", "rename", "-v", f'{id}', "-n", f"{name}"],
        capture_output=True,
        text=True,
        encoding='utf-8',
    )
|
|
|
|
|
|
def get_code(phone, device_mode=None):
    """Fetch the verification code for ``phone`` from the internal API.

    Performs a GET on the hard-coded ``check_phone`` endpoint and returns
    ``["data"]["verification_code"]`` from the JSON response.
    NOTE(review): the request has no timeout and ``phone`` is placed in the
    query string without URL-encoding -- confirm that this is acceptable.

    Args:
        phone: Phone number placed in the ``phone`` query parameter.
        device_mode: Unused by this function.

    Returns:
        The ``verification_code`` value from the response body.
    """
    response = requests.get(f"http://192.168.50.175:9000/api/check_phone?phone={phone}")
    payload = response.json()
    return payload["data"]["verification_code"]
|
|
|
|
|
|
# Maps an international dialling prefix (digits only, no leading "+") to a
# two-letter region code. split_phone_number() walks the keys in insertion
# order and uses the first key the number starts with.
# NOTE(review): "20", "62" and "972" are the ITU prefixes of Egypt,
# Indonesia and Israel, yet they are mapped to "ZA" / "PH" here -- confirm
# whether these are deliberate fallbacks or copy/paste mistakes.
country_code_map = {
    "1": "US",
    "44": "GB",
    "27": "ZA",
    "20": "ZA",
    "234": "NG",
    "86": "CN",
    "63": "PH",
    "62": "PH",
    "972": "PH",
}
|
|
|
|
|
|
def split_phone_number(phone_number):
    """Split a phone number into dialling prefix and national number.

    One leading ``+`` is dropped, then the keys of ``country_code_map`` are
    tried in their insertion order and the first key the number starts with
    is used.

    Args:
        phone_number: Phone number string, with or without a leading ``+``.

    Returns:
        tuple: ``(country_code, national_number)`` as strings, or
        ``(None, None)`` when no known prefix matches.
    """
    digits = phone_number[1:] if phone_number.startswith('+') else phone_number
    matched = next(
        (code for code in country_code_map if digits.startswith(code)),
        None,
    )
    if matched is None:
        return None, None
    return matched, digits[len(matched):]
|
|
|
|
|
|
def _read_and_scale_template(template_path: str, scale_ratio: float) -> np.ndarray:
    """Load a template image as a colour array and optionally rescale it.

    The file is read as raw bytes with ``np.fromfile`` and decoded with
    ``cv2.imdecode`` rather than opened by OpenCV directly.

    Args:
        template_path: Path of the template image.
        scale_ratio: Factor applied to width and height; ``1.0`` leaves the
            image untouched.

    Returns:
        The decoded (and, if requested, resized) image array.

    Raises:
        FileNotFoundError: If the bytes cannot be decoded into an image.
    """
    raw_bytes = np.fromfile(template_path, dtype=np.uint8)
    template = cv2.imdecode(raw_bytes, cv2.IMREAD_COLOR)
    if template is None:
        raise FileNotFoundError(f"模板图片不存在: {template_path}")

    if scale_ratio == 1.0:
        return template

    height, width = template.shape[:2]
    scaled_size = (int(width * scale_ratio), int(height * scale_ratio))
    return cv2.resize(template, scaled_size, interpolation=cv2.INTER_AREA)
|
|
|
|
|
|
def find_img(d, template_path: str, threshold: float = 0.8, retry: int = 1, timeout=5, scale_ratio: float = 1.0):
    """Search the device screen for a template image.

    The template is loaded (and scaled) once. Each attempt takes a
    screenshot and runs ``cv2.TM_CCOEFF_NORMED`` template matching. An
    exception raised during an attempt is printed and counts as a miss.
    After every unsuccessful attempt -- including the last one -- the
    function sleeps for ``timeout`` seconds.

    Args:
        d: Device object offering ``screenshot(format='opencv')``.
        template_path: Path of the template image.
        threshold: Minimum best-match score that counts as a hit.
        retry: Number of attempts.
        timeout: Seconds to sleep after each unsuccessful attempt.
        scale_ratio: Scale factor applied to the template before matching.

    Returns:
        tuple | bool: ``(x, y)`` of the centre of the best match, or
        ``False`` when no attempt reaches the threshold.
    """
    template = _read_and_scale_template(template_path, scale_ratio)

    for _attempt in range(retry):
        try:
            frame = d.screenshot(format='opencv')
            scores = cv2.matchTemplate(frame, template, cv2.TM_CCOEFF_NORMED)
            _, best_score, _, best_loc = cv2.minMaxLoc(scores)
            if best_score >= threshold:
                tpl_h, tpl_w = template.shape[:2]
                return (best_loc[0] + tpl_w // 2, best_loc[1] + tpl_h // 2)
        except Exception as e:
            print(f"图像识别异常: {str(e)}")
        time.sleep(timeout)

    return False
|
|
|
|
|
|
def adb_find_img(d, template_path: str, threshold: float = 0.8, retry: int = 1, timeout=5, scale_ratio: float = 1.0,
                 error_type=0):
    """Search the device screen for a template using an adb screenshot.

    Each attempt runs ``screencap`` on the device, pulls the PNG to
    ``temp_screenshot.png`` in the working directory, converts it from RGB
    to BGR and runs ``cv2.TM_CCOEFF_NORMED`` template matching. The local
    temp file is removed after every attempt, whatever its outcome. After
    every unsuccessful attempt the function sleeps for ``timeout`` seconds.
    NOTE(review): the fixed temp file name is shared by all callers, so
    concurrent calls from several threads can overwrite each other's
    screenshot.

    Args:
        d: Device object offering ``shell(cmd)`` and ``pull(src, dst)``.
        template_path: Path of the template image.
        threshold: Minimum best-match score that counts as a hit.
        retry: Number of attempts.
        timeout: Seconds to sleep after each unsuccessful attempt.
        scale_ratio: Scale factor applied to the template before matching.
        error_type: When truthy, an exception raised during an attempt is
            printed and then re-raised; when falsy it is printed and the
            attempt counts as a miss.

    Returns:
        tuple | bool: ``(x, y)`` of the centre of the best match, or
        ``False`` when no attempt reaches the threshold.

    Raises:
        Exception: The original error of a failed attempt, only when
            ``error_type`` is truthy.
    """
    template = _read_and_scale_template(template_path, scale_ratio)
    remote_path = '/sdcard/screenshot.png'
    local_path = 'temp_screenshot.png'

    for _ in range(retry):
        try:
            d.shell(['screencap', '-p', remote_path])
            d.pull(remote_path, local_path)
            screenshot = cv2.cvtColor(np.array(Image.open(local_path)), cv2.COLOR_RGB2BGR)

            match_result = cv2.matchTemplate(screenshot, template, cv2.TM_CCOEFF_NORMED)
            _, max_val, _, max_loc = cv2.minMaxLoc(match_result)
            if max_val >= threshold:
                h, w = template.shape[:2]
                return (max_loc[0] + w // 2, max_loc[1] + h // 2)
        except Exception as e:
            print(f"图像识别异常: {str(e)}")
            if error_type:
                # Re-raise the real error. The former `raise ""` was itself
                # a TypeError ("exceptions must derive from BaseException")
                # and hid the actual cause from the caller.
                raise
        finally:
            # Runs on hit, miss and error alike, so the temp file never
            # lingers -- including on the re-raise path above.
            if os.path.exists(local_path):
                os.remove(local_path)
        time.sleep(timeout)

    return False
|
|
|
|
|
|
def u2_adb_screenshot_and_crop(d, crop_area, save_path='wallet_connect.png'):
    """Capture the screen through ``screencap``, crop it and save the crop.

    The screenshot is written to ``/sdcard/screenshot.png`` on the device
    and pulled to ``temp_screenshot.png`` in the working directory. Errors
    while opening, cropping or saving are printed and swallowed; errors
    from ``shell`` / ``pull`` propagate. The local temp file is always
    removed once the pull has succeeded.

    Args:
        d: Device object offering ``shell(cmd)`` and ``pull(src, dst)``.
        crop_area: Passed unchanged to the image's ``crop`` method.
        save_path: Destination file for the cropped image.
    """
    device_file = '/sdcard/screenshot.png'
    local_file = 'temp_screenshot.png'

    d.shell(['screencap', '-p', device_file])
    d.pull(device_file, local_file)

    try:
        full_image = Image.open(local_file)
        full_image.crop(crop_area).save(save_path)
        print(f'Cropped screenshot saved to {save_path}')
    except Exception as e:
        print(f"裁剪图片时出错: {e}")
    finally:
        if os.path.exists(local_file):
            os.remove(local_file)
|
|
|
|
|
|
def get_page_user(phone, url1):
    """POST ``phone`` and ``url1`` to the internal ``get_user_data`` service.

    NOTE(review): the request has no timeout and the HTTP status is not
    checked before the body is decoded.

    Args:
        phone: Sent as the ``phone`` field of the JSON body.
        url1: Sent as the ``url`` field of the JSON body.

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = "http://192.168.50.97:9999/get_user_data"
    response = requests.post(endpoint, json={"phone": phone, "url": url1})
    return response.json()
|
|
|
|
|
|
def send_command_to_server(server_host, server_port, command):
    """Send a command over TCP and return the server's textual reply.

    Protocol as implemented here: the command is sent UTF-8 encoded. Unless
    the command is ``exit`` (case-insensitive), the reply consists of a
    10-byte header that ``int()`` can parse as the payload length, followed
    by that many payload bytes, which are decoded as UTF-8.

    Args:
        server_host: Host name or address of the server.
        server_port: TCP port of the server.
        command: Command string to send.

    Returns:
        str | None: The decoded payload (possibly truncated if the server
        closes the connection early). ``None`` for the ``exit`` command or
        when any error occurs; errors are printed, not raised.
    """
    header_size = 10

    def _recv_exact(sock, size):
        """Read until ``size`` bytes have arrived or the peer closes."""
        buffer = bytearray()
        while len(buffer) < size:
            chunk = sock.recv(size - len(buffer))
            if not chunk:
                break
            buffer.extend(chunk)
        return bytes(buffer)

    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client_socket.connect((server_host, server_port))
        client_socket.sendall(command.encode('utf-8'))
        if command.lower() == 'exit':
            return None

        # A single recv(10) may legally return fewer than 10 bytes, which
        # used to yield a wrong length and a payload polluted with header
        # bytes. Read the whole fixed-size header before parsing it.
        data_length = int(_recv_exact(client_socket, header_size).decode('utf-8'))
        return _recv_exact(client_socket, data_length).decode('utf-8')
    except Exception as e:
        print(f"发生错误: {e}")
        return None
    finally:
        client_socket.close()
|
|
|
|
|
|
# Pack the app data on the device and pull the archive to the host
def pull_simulator_backup(
        d,  # U2 object (presumably a uiautomator2 device)
        phone,  # phone number, used as the archive file name
        package_name="org.telegram.messenger.web",  # package name
        local_backup=r"D:\backups"
):
    """Archive an app's data directory on the device and pull it locally.

    Steps: make sure ``local_backup`` exists (a failure to create it is
    printed, not raised), tar ``/data/data/<package_name>`` as root into
    ``/data/local/tmp/<phone>.tar``, pull that file to
    ``<local_backup>/<phone>.tar`` and delete the archive on the device.

    Args:
        d: Device object offering ``shell(cmd)`` and ``pull(src, dst)``.
        phone: Used as the base name of the tar archive.
        package_name: Directory below ``/data/data/`` to archive.
        local_backup: Local directory that receives the archive.
    """
    # Make sure the local target directory exists
    if os.path.exists(local_backup):
        print(f"目录已存在:{local_backup}")
    else:
        try:
            os.makedirs(local_backup)
            print(f"目录已创建:{local_backup}")
        except Exception as e:
            print(f"创建目录失败:{e}")

    remote_tar = f"/data/local/tmp/{phone}.tar"

    # Pack
    d.shell(f"su -c 'cd /data/data/ && tar -cf {remote_tar} {package_name}'")

    # Pull
    d.pull(remote_tar, f"{local_backup}/{phone}.tar")

    # Remove the temporary archive from the device
    d.shell(f"su -c 'rm {remote_tar}'")
|
|
|
|
|
|
# Push a backup archive to the device, unpack it and move it into place
def deploy_and_extract_to_android(
        d,  # U2 object (presumably a uiautomator2 device)
        phone,  # phone number, used as the archive file name
        package_name="org.telegram.messenger.web",  # package name
        local_backup=r"D:\backups"
):
    """Restore an app data backup onto the device.

    Steps: push ``<local_backup>/<phone>.tar`` to
    ``/data/local/tmp/telegram_backup.tar``, extract it as root into
    ``/data/local/tmp``, pick the extracted directory whose path contains
    ``"telegram"`` (the last match wins), copy its contents into
    ``/data/data/<package_name>/``, delete the extracted directory and
    finally delete ``/data/data/<package_name>/app_webview``.
    NOTE(review): the directory is located by the literal substring
    "telegram", independent of ``package_name``; the pushed tar file itself
    is left on the device.

    Args:
        d: Device object offering ``push(src, dst)`` and ``shell(cmd)``,
            where the shell result exposes an ``output`` string.
        phone: Base name of the local tar archive.
        package_name: Target directory name below ``/data/data/``.
        local_backup: Local directory that holds the archive.

    Raises:
        FileNotFoundError: If no extracted directory containing
            "telegram" is found. Nothing is copied or deleted in that case.
    """
    # Upload the local backup archive to the emulator
    d.push(f"{local_backup}/{phone}.tar", "/data/local/tmp/telegram_backup.tar")

    # Extract
    extract_cmd = (
        "su -c '"
        "mkdir -p /data/local/tmp && "
        "tar -xf /data/local/tmp/telegram_backup.tar -C /data/local/tmp"
        "'"
    )
    d.shell(extract_cmd)

    # Find the extracted directory
    result = d.shell("su -c 'ls -d /data/local/tmp/*/'")
    path = None
    for entry in result.output.strip().split('\n'):
        # Strip each entry so stray "\r" / spaces never leak into the
        # shell commands built below.
        entry = entry.strip()
        if "telegram" in entry:
            path = entry

    if path is None:
        # Previously execution continued with path=None: a bogus
        # "cp -r None* ..." was run as root and the function then died
        # with AttributeError. Fail early and explicitly instead.
        raise FileNotFoundError(
            "no extracted directory containing 'telegram' found in /data/local/tmp"
        )

    # Copy the files into the app's data directory
    d.shell(f"su -c 'cp -r {path}* /data/data/{package_name}/'")

    # Remove the extracted temporary directory
    d.shell(f"su -c 'rm -rf {path.rstrip('/')}'")

    # Remove the restored app_webview directory
    d.shell(f"su -c 'rm -rf /data/data/{package_name}/app_webview'")
|