Files
to_session/process/单机版ld.py

390 lines
13 KiB
Python
Raw Permalink Normal View History

2025-11-12 12:54:37 +08:00
import json
import time
from concurrent.futures import ThreadPoolExecutor
from process.tools import get_ldplayer_list, close_ldplayer, start_ldplayer, get_element, get_host_ip, \
split_phone_number, rename_ldplayer, pull_simulator_backup
from loguru import logger
import uiautomator2 as u2
from models.tg_models import TelegramAccount
import socket
import os
import sys
import struct
def send_file(file_path, server_ip='localhost', port=5001):
"""
发送任意类型文件到服务端
:param file_path: 本地文件路径支持任意后缀名
:param server_ip: 服务端IP地址默认localhost
:param port: 服务端端口默认5555
:return: 传输成功返回True失败返回False
"""
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"[Error] File not found: {file_path}")
return False
file_name = os.path.basename(file_path)
client_sock = None
try:
# 创建Socket连接
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_sock.settimeout(10) # 设置超时时间
client_sock.connect((server_ip, port))
# 发送文件名(支持任意后缀)
file_name_encoded = file_name.encode()
# 先发送文件名长度4字节整数再发送文件名
client_sock.sendall(struct.pack('I', len(file_name_encoded)))
client_sock.sendall(file_name_encoded)
# 分块发送文件内容
print(f"[Transfer] Sending: {file_name} -> {server_ip}")
sent_bytes = 0
with open(file_path, 'rb') as f:
while chunk := f.read(4096): # 4KB分块传输
client_sock.sendall(chunk)
sent_bytes += len(chunk)
print(f"[Success] Sent {sent_bytes} bytes")
return True
except socket.error as e:
print(f"[Error] Network failure: {str(e)}")
return False
except Exception as e:
print(f"[Error] Unexpected error: {str(e)}")
return False
finally:
if client_sock:
client_sock.close()
def decorator(func):
def wrapper(self, *args, **kwargs):
result = None
# 这里可以在函数执行前增加逻辑
# 启动模拟器
if not self.start_d():
return
try:
result = func(self, *args, **kwargs)
except Exception as e:
logger.error(f"电话号码:{self.tg_phone_info.index_id}{e}")
# 这里可以在函数执行后增加逻辑
close_ldplayer(simulator_id=self.tg_phone_info.index_id)
return result
return wrapper
class Ld:
def __init__(self, tg_phone_info):
self.tg_phone_info = tg_phone_info
# for i in get_ldplayer_list():
# if self.tg_phone_info.ld_name == i["Name"]:
# self.index_id = i["ID"]
# break
self.id = "emulator-" + str(5554 + (int(self.tg_phone_info.index_id) * 2))
def start_d(self):
d_type = 0
# 连接模拟器
for i in range(3):
if d_type:
break
start_ldplayer(simulator_id=self.tg_phone_info.index_id)
for i in range(3):
time.sleep(15)
try:
self.d = u2.connect(self.id)
d_type = 1
break
except Exception as e:
pass
else:
logger.error(f"电话号码:{self.tg_phone_info.ld_name}:连接失败!!!")
self.tg_phone_info.is_logged_in_telegram = -1
self.tg_phone_info.save()
close_ldplayer(simulator_id=self.tg_phone_info.index_id, )
time.sleep(10)
return d_type
# 使用链接打开tg的检测
def tg_testing(self):
logger.info(f"电话号码:{self.tg_phone_info.ld_name}:账号检测中》》》")
# 判断需要tg打开链接
for i in range(8):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="OK"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Remind me later"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=0.5)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=0.5)
if type1: ele.click()
# 判断tg是否掉了
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Start Messaging"]',
timeout=0.5) # 等待tg打开
if type1:
self.tg_phone_info.is_logged_in_telegram = 0
self.tg_phone_info.save()
logger.info(f"电话号码:{self.tg_phone_info.ld_name}:检测完成:未登录!!!")
return True
logger.warning(f"电话号码:{self.tg_phone_info.ld_name}:检测完成,已登录!!!")
return False
@decorator
def action(self):
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5) # 等待tg打开
if type1:
ele.click()
else:
self.tg_phone_info.is_logged_in_telegram = 0
self.tg_phone_info.save()
return
if self.tg_testing():
logger.error(f"雷电名称:{self.tg_phone_info.ld_name},掉了!!!")
return
logger.info(f"雷电名称:{self.tg_phone_info.ld_name},检测完成!!!")
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="Open navigation menu"]',
timeout=0.5) # 等待tg打开
if type1:
ele.click()
# for i in range(3):
# type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="Open navigation menu"]',
# timeout=0.5) # 等待tg打开
# if type1:
# ele.click()
#
# time.sleep(1)
#
# type1, ele = get_element(self.d, type1="xpath", grammar='//*[contains(@text, "+")]',
# timeout=0.5) # 等待tg打开
# if type1:
# break
phones = self.d.xpath('//*[contains(@text, "+")]').get_text().replace("-", "").replace(' ',
"").replace(
'(', "").replace(')', "").replace("+", "")
one_phone_number, telephone = split_phone_number(phone_number=phones)
# 修改名字
rename_ldplayer(index=self.tg_phone_info.index_id, name=f"{one_phone_number}{telephone}")
self.tg_phone_info.ld_name = f"{one_phone_number}{telephone}"
self.tg_phone_info.one_phone_number = one_phone_number
self.tg_phone_info.telephone = telephone
self.tg_phone_info.is_logged_in_telegram = 1
self.tg_phone_info.phone_type = 1
self.tg_phone_info.save()
# 模拟按下返回键
self.d.press("back")
time.sleep(3)
self.d.xpath('(//android.view.ViewGroup)[1]').click()
time.sleep(3)
type1, ele = get_element(
self.d,
type1="xpath",
grammar='//*[@text="Your account is frozen\nTap to view details"]',
timeout=5
) # 等待tg打开
if type1:
self.tg_phone_info.is_logged_in_telegram = 3
self.tg_phone_info.phone_type = 3
self.tg_phone_info.save()
@decorator
def bf(self):
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5) # 等待tg打开
if type1:
ele.click()
else:
self.tg_phone_info.is_logged_in_telegram = 0
self.tg_phone_info.save()
return
if self.tg_testing():
logger.error(f"雷电名称:{self.tg_phone_info.ld_name},掉了!!!")
return
logger.info(f"雷电名称:{self.tg_phone_info.ld_name},检测完成!!!")
# 通过adb shell命令获取
all_packages = self.d.shell('pm list packages').output
# 格式化输出(去掉"package:"前缀)
clean_packages = [pkg.split(':')[1] for pkg in all_packages.splitlines()]
print("纯净包名列表:", clean_packages)
package_name = None
for i in clean_packages:
if "telegram" in i:
package_name = i
break
# 停止应用
self.d.app_stop(package_name)
time.sleep(5)
pull_simulator_backup(
d=self.d,
phone=self.tg_phone_info.ld_name,
local_backup=path
)
time.sleep(10)
send_file(file_path=fr"{path}\{self.tg_phone_info.ld_name}.tar", server_ip="192.168.50.122")
@decorator
def get_sb(self, ):
print(self.d.shell(f"adb -s {self.id} shell getprop ro.product.model"))
# adb -s 127.0.0.1:5555 shell getprop ro.product.model
def get_sb(tg_phone_info, directory, filename, ):
# 打开并读取配置文件
with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file:
config_data = json.load(file)
# 提取所需的键值
phone_manufacturer = config_data.get('propertySettings.phoneManufacturer', 'N/A')
phone_model = config_data.get('propertySettings.phoneModel', 'N/A')
# 组合结果
result = f"{phone_manufacturer} {phone_model}"
tg_phone_info.manufacturer = result
tg_phone_info.save()
# 输出结果
print(f"文件名中的数字: {filename}, 设备信息: {result}")
if __name__ == '__main__':
directory = r"E:\leidian\LDPlayer9\vms\config"
ld_ids = []
for i in get_ldplayer_list():
ld_ids.append(i["ID"])
# print(i["ID"])
# 同时运行
max_threads = 15
delay_between_start = 15 # 每次启动线程之间的延迟时间(秒)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
for _, id in enumerate(ld_ids):
tg_phone_info = TelegramAccount.get_or_none(
TelegramAccount.index_id == id,
TelegramAccount.server_id == get_host_ip(),
)
if not tg_phone_info.ld_name:
continue
get_sb(tg_phone_info=tg_phone_info, directory=directory, filename=f"leidian{id}.config", )
# ld = Ld(tg_phone_info, )
# executor.submit(ld.action)
# time.sleep(delay_between_start)
# # for i in get_ldplayer_list():
# # if id == i["ID"] and i["Name"] == "NoneNone":
# # ld = Ld(tg_phone_info, )
# # executor.submit(ld.action)
# # time.sleep(delay_between_start)
#
# if tg_phone_info.is_logged_in_telegram == -1:
# # logger.info(f"雷电名称:{id} 已经存在!!!")
# # continue
#
# ld = Ld(tg_phone_info, )
# executor.submit(ld.action)
#
# time.sleep(delay_between_start)
#
# if tg_phone_info.is_logged_in_telegram == -1:
# ld = Ld(tg_phone_info, )
# executor.submit(ld.action)
#
# time.sleep(delay_between_start)
# from pathlib import Path
#
# path = r"E:\backups"
#
# # 指定目录
# directory = Path(path)
#
# # 获取目录下的所有文件
# start = 0
# file_list = [str(file) for file in directory.rglob('*') if file.is_file()]
# for file in file_list:
# if tg_phone_info.ld_name in file:
# start = 1
# break
#
# if start:
# send_file(file_path=fr"{path}\{tg_phone_info.ld_name}.tar", server_ip="192.168.50.122")
# continue
#
# ld = Ld(tg_phone_info, )
# executor.submit(ld.bf)