Files
to_session/process/单机版ld.py
Administrator a0720d80dc fefdwef
2025-11-12 12:54:37 +08:00

390 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
from concurrent.futures import ThreadPoolExecutor
from process.tools import get_ldplayer_list, close_ldplayer, start_ldplayer, get_element, get_host_ip, \
split_phone_number, rename_ldplayer, pull_simulator_backup
from loguru import logger
import uiautomator2 as u2
from models.tg_models import TelegramAccount
import socket
import os
import sys
import struct
def send_file(file_path, server_ip='localhost', port=5001):
"""
发送任意类型文件到服务端
:param file_path: 本地文件路径(支持任意后缀名)
:param server_ip: 服务端IP地址默认localhost
:param port: 服务端端口默认5555
:return: 传输成功返回True失败返回False
"""
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"[Error] File not found: {file_path}")
return False
file_name = os.path.basename(file_path)
client_sock = None
try:
# 创建Socket连接
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_sock.settimeout(10) # 设置超时时间
client_sock.connect((server_ip, port))
# 发送文件名(支持任意后缀)
file_name_encoded = file_name.encode()
# 先发送文件名长度4字节整数再发送文件名
client_sock.sendall(struct.pack('I', len(file_name_encoded)))
client_sock.sendall(file_name_encoded)
# 分块发送文件内容
print(f"[Transfer] Sending: {file_name} -> {server_ip}")
sent_bytes = 0
with open(file_path, 'rb') as f:
while chunk := f.read(4096): # 4KB分块传输
client_sock.sendall(chunk)
sent_bytes += len(chunk)
print(f"[Success] Sent {sent_bytes} bytes")
return True
except socket.error as e:
print(f"[Error] Network failure: {str(e)}")
return False
except Exception as e:
print(f"[Error] Unexpected error: {str(e)}")
return False
finally:
if client_sock:
client_sock.close()
def decorator(func):
def wrapper(self, *args, **kwargs):
result = None
# 这里可以在函数执行前增加逻辑
# 启动模拟器
if not self.start_d():
return
try:
result = func(self, *args, **kwargs)
except Exception as e:
logger.error(f"电话号码:{self.tg_phone_info.index_id}{e}")
# 这里可以在函数执行后增加逻辑
close_ldplayer(simulator_id=self.tg_phone_info.index_id)
return result
return wrapper
class Ld:
def __init__(self, tg_phone_info):
self.tg_phone_info = tg_phone_info
# for i in get_ldplayer_list():
# if self.tg_phone_info.ld_name == i["Name"]:
# self.index_id = i["ID"]
# break
self.id = "emulator-" + str(5554 + (int(self.tg_phone_info.index_id) * 2))
def start_d(self):
d_type = 0
# 连接模拟器
for i in range(3):
if d_type:
break
start_ldplayer(simulator_id=self.tg_phone_info.index_id)
for i in range(3):
time.sleep(15)
try:
self.d = u2.connect(self.id)
d_type = 1
break
except Exception as e:
pass
else:
logger.error(f"电话号码:{self.tg_phone_info.ld_name}:连接失败!!!")
self.tg_phone_info.is_logged_in_telegram = -1
self.tg_phone_info.save()
close_ldplayer(simulator_id=self.tg_phone_info.index_id, )
time.sleep(10)
return d_type
# 使用链接打开tg的检测
def tg_testing(self):
logger.info(f"电话号码:{self.tg_phone_info.ld_name}:账号检测中》》》")
# 判断需要tg打开链接
for i in range(8):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="OK"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Remind me later"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=0.5)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=0.5)
if type1: ele.click()
# 判断tg是否掉了
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Start Messaging"]',
timeout=0.5) # 等待tg打开
if type1:
self.tg_phone_info.is_logged_in_telegram = 0
self.tg_phone_info.save()
logger.info(f"电话号码:{self.tg_phone_info.ld_name}:检测完成:未登录!!!")
return True
logger.warning(f"电话号码:{self.tg_phone_info.ld_name}:检测完成,已登录!!!")
return False
@decorator
def action(self):
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5) # 等待tg打开
if type1:
ele.click()
else:
self.tg_phone_info.is_logged_in_telegram = 0
self.tg_phone_info.save()
return
if self.tg_testing():
logger.error(f"雷电名称:{self.tg_phone_info.ld_name},掉了!!!")
return
logger.info(f"雷电名称:{self.tg_phone_info.ld_name},检测完成!!!")
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="Open navigation menu"]',
timeout=0.5) # 等待tg打开
if type1:
ele.click()
# for i in range(3):
# type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="Open navigation menu"]',
# timeout=0.5) # 等待tg打开
# if type1:
# ele.click()
#
# time.sleep(1)
#
# type1, ele = get_element(self.d, type1="xpath", grammar='//*[contains(@text, "+")]',
# timeout=0.5) # 等待tg打开
# if type1:
# break
phones = self.d.xpath('//*[contains(@text, "+")]').get_text().replace("-", "").replace(' ',
"").replace(
'(', "").replace(')', "").replace("+", "")
one_phone_number, telephone = split_phone_number(phone_number=phones)
# 修改名字
rename_ldplayer(index=self.tg_phone_info.index_id, name=f"{one_phone_number}{telephone}")
self.tg_phone_info.ld_name = f"{one_phone_number}{telephone}"
self.tg_phone_info.one_phone_number = one_phone_number
self.tg_phone_info.telephone = telephone
self.tg_phone_info.is_logged_in_telegram = 1
self.tg_phone_info.phone_type = 1
self.tg_phone_info.save()
# 模拟按下返回键
self.d.press("back")
time.sleep(3)
self.d.xpath('(//android.view.ViewGroup)[1]').click()
time.sleep(3)
type1, ele = get_element(
self.d,
type1="xpath",
grammar='//*[@text="Your account is frozen\nTap to view details"]',
timeout=5
) # 等待tg打开
if type1:
self.tg_phone_info.is_logged_in_telegram = 3
self.tg_phone_info.phone_type = 3
self.tg_phone_info.save()
@decorator
def bf(self):
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5) # 等待tg打开
if type1:
ele.click()
else:
self.tg_phone_info.is_logged_in_telegram = 0
self.tg_phone_info.save()
return
if self.tg_testing():
logger.error(f"雷电名称:{self.tg_phone_info.ld_name},掉了!!!")
return
logger.info(f"雷电名称:{self.tg_phone_info.ld_name},检测完成!!!")
# 通过adb shell命令获取
all_packages = self.d.shell('pm list packages').output
# 格式化输出(去掉"package:"前缀)
clean_packages = [pkg.split(':')[1] for pkg in all_packages.splitlines()]
print("纯净包名列表:", clean_packages)
package_name = None
for i in clean_packages:
if "telegram" in i:
package_name = i
break
# 停止应用
self.d.app_stop(package_name)
time.sleep(5)
pull_simulator_backup(
d=self.d,
phone=self.tg_phone_info.ld_name,
local_backup=path
)
time.sleep(10)
send_file(file_path=fr"{path}\{self.tg_phone_info.ld_name}.tar", server_ip="192.168.50.122")
@decorator
def get_sb(self, ):
print(self.d.shell(f"adb -s {self.id} shell getprop ro.product.model"))
# adb -s 127.0.0.1:5555 shell getprop ro.product.model
def get_sb(tg_phone_info, directory, filename, ):
# 打开并读取配置文件
with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file:
config_data = json.load(file)
# 提取所需的键值
phone_manufacturer = config_data.get('propertySettings.phoneManufacturer', 'N/A')
phone_model = config_data.get('propertySettings.phoneModel', 'N/A')
# 组合结果
result = f"{phone_manufacturer} {phone_model}"
tg_phone_info.manufacturer = result
tg_phone_info.save()
# 输出结果
print(f"文件名中的数字: {filename}, 设备信息: {result}")
if __name__ == '__main__':
directory = r"E:\leidian\LDPlayer9\vms\config"
ld_ids = []
for i in get_ldplayer_list():
ld_ids.append(i["ID"])
# print(i["ID"])
# 同时运行
max_threads = 15
delay_between_start = 15 # 每次启动线程之间的延迟时间(秒)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
for _, id in enumerate(ld_ids):
tg_phone_info = TelegramAccount.get_or_none(
TelegramAccount.index_id == id,
TelegramAccount.server_id == get_host_ip(),
)
if not tg_phone_info.ld_name:
continue
get_sb(tg_phone_info=tg_phone_info, directory=directory, filename=f"leidian{id}.config", )
# ld = Ld(tg_phone_info, )
# executor.submit(ld.action)
# time.sleep(delay_between_start)
# # for i in get_ldplayer_list():
# # if id == i["ID"] and i["Name"] == "NoneNone":
# # ld = Ld(tg_phone_info, )
# # executor.submit(ld.action)
# # time.sleep(delay_between_start)
#
# if tg_phone_info.is_logged_in_telegram == -1:
# # logger.info(f"雷电名称:{id} 已经存在!!!")
# # continue
#
# ld = Ld(tg_phone_info, )
# executor.submit(ld.action)
#
# time.sleep(delay_between_start)
#
# if tg_phone_info.is_logged_in_telegram == -1:
# ld = Ld(tg_phone_info, )
# executor.submit(ld.action)
#
# time.sleep(delay_between_start)
# from pathlib import Path
#
# path = r"E:\backups"
#
# # 指定目录
# directory = Path(path)
#
# # 获取目录下的所有文件
# start = 0
# file_list = [str(file) for file in directory.rglob('*') if file.is_file()]
# for file in file_list:
# if tg_phone_info.ld_name in file:
# start = 1
# break
#
# if start:
# send_file(file_path=fr"{path}\{tg_phone_info.ld_name}.tar", server_ip="192.168.50.122")
# continue
#
# ld = Ld(tg_phone_info, )
# executor.submit(ld.bf)