390 lines
13 KiB
Python
390 lines
13 KiB
Python
import json
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
|
||
from process.tools import get_ldplayer_list, close_ldplayer, start_ldplayer, get_element, get_host_ip, \
|
||
split_phone_number, rename_ldplayer, pull_simulator_backup
|
||
|
||
from loguru import logger
|
||
import uiautomator2 as u2
|
||
|
||
from models.tg_models import TelegramAccount
|
||
|
||
import socket
|
||
import os
|
||
import sys
|
||
import struct
|
||
|
||
|
||
def send_file(file_path, server_ip='localhost', port=5001):
    """
    Send one local file to a TCP server.

    Wire format, in order: the byte length of the encoded file name packed
    with ``struct.pack('I', ...)`` (native byte order), the encoded file
    name itself, then the raw file content in 4 KiB chunks until EOF. The
    connection is closed when the transfer ends.

    :param file_path: path of the local file to send (any extension)
    :param server_ip: server address, defaults to ``localhost``
    :param port: server TCP port, defaults to 5001
    :return: True when the whole file was written to the socket; False when
        ``file_path`` is not a regular file or any error occurred
    """
    # A directory passes os.path.exists() but cannot be opened for reading,
    # so require a regular file before touching the network.
    if not os.path.isfile(file_path):
        print(f"[Error] File not found: {file_path}")
        return False

    file_name = os.path.basename(file_path)
    try:
        # The context manager closes the socket on every exit path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_sock:
            client_sock.settimeout(10)  # applies to connect and to each send
            client_sock.connect((server_ip, port))

            # Header: name length first, then the name.
            # NOTE(review): 'I' uses native byte order; the receiving side
            # must unpack with the same format - confirm before changing.
            file_name_encoded = file_name.encode()
            client_sock.sendall(struct.pack('I', len(file_name_encoded)))
            client_sock.sendall(file_name_encoded)

            # Payload: stream the file in 4 KiB chunks.
            print(f"[Transfer] Sending: {file_name} -> {server_ip}")
            sent_bytes = 0
            with open(file_path, 'rb') as f:
                while chunk := f.read(4096):
                    client_sock.sendall(chunk)
                    sent_bytes += len(chunk)

            print(f"[Success] Sent {sent_bytes} bytes")
            return True

    except socket.error as e:
        # socket.error is an alias of OSError, so local file I/O errors are
        # reported through this branch as well.
        print(f"[Error] Network failure: {str(e)}")
        return False
    except Exception as e:
        print(f"[Error] Unexpected error: {str(e)}")
        return False
|
||
|
||
|
||
def decorator(func):
    """
    Wrap a method so that it runs against a started emulator.

    The wrapper calls ``self.start_d()`` and returns ``None`` immediately
    when that reports failure. Otherwise it runs ``func``, logs and swallows
    any ``Exception`` it raises, closes the emulator identified by
    ``self.tg_phone_info.index_id`` and returns ``func``'s result (``None``
    when ``func`` raised).

    :param func: method taking ``self`` as first argument
    :return: the wrapping function, carrying ``func``'s name and docstring
    """
    import functools  # local import so this block does not depend on file-level imports

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # Start the emulator; without a device connection there is nothing to run.
        if not self.start_d():
            return None

        result = None
        try:
            result = func(self, *args, **kwargs)
        except Exception as e:
            logger.error(f"电话号码:{self.tg_phone_info.index_id}:{e}!!!")
        finally:
            # Always shut the emulator down, even if logging itself fails or
            # a non-Exception (e.g. KeyboardInterrupt) propagates.
            close_ldplayer(simulator_id=self.tg_phone_info.index_id)

        return result

    return wrapper
|
||
|
||
|
||
class Ld:
    """
    Drive one LDPlayer emulator instance that hosts a Telegram client.

    ``tg_phone_info`` is the account record for the emulator. The methods
    read ``index_id`` and ``ld_name`` from it and write
    ``is_logged_in_telegram``, ``phone_type``, ``one_phone_number``,
    ``telephone`` and ``ld_name`` back through ``save()``.

    Values written to ``is_logged_in_telegram`` by this class:
    ``-1`` device connection failed, ``0`` Telegram icon missing or the
    "Start Messaging" screen is shown, ``1`` phone number read from the
    app, ``3`` the "account is frozen" banner was found.
    """

    def __init__(self, tg_phone_info):
        self.tg_phone_info = tg_phone_info
        self.d = None  # uiautomator2 device, set by start_d()

        # adb serial of the instance: port 5554 plus two per emulator index.
        self.id = "emulator-" + str(5554 + (int(self.tg_phone_info.index_id) * 2))

    def start_d(self):
        """
        Start the emulator and attach uiautomator2 to it.

        Up to 3 start attempts are made. After each start the connection is
        probed up to 3 times, 15 seconds apart. When all probes of an
        attempt fail, the failure is logged, the account is saved with
        ``is_logged_in_telegram = -1``, the emulator is closed and the next
        attempt begins after 10 seconds.

        :return: 1 when connected (``self.d`` is set), 0 otherwise
        """
        for _attempt in range(3):
            start_ldplayer(simulator_id=self.tg_phone_info.index_id)

            for _probe in range(3):
                time.sleep(15)  # give the emulator time to boot
                try:
                    self.d = u2.connect(self.id)
                    return 1
                except Exception as e:
                    # Best effort: the device may simply not be up yet.
                    logger.debug(f"{self.id}: connect failed: {e}")

            logger.error(f"电话号码:{self.tg_phone_info.ld_name}:连接失败!!!")
            self.tg_phone_info.is_logged_in_telegram = -1
            self.tg_phone_info.save()

            close_ldplayer(simulator_id=self.tg_phone_info.index_id, )

            time.sleep(10)

        return 0

    def _click_if_present(self, text, timeout):
        """Click the element whose text equals *text* if it shows up within *timeout*; return the found flag."""
        found, ele = get_element(d=self.d, type1="xpath", grammar=f'//*[@text="{text}"]', timeout=timeout)
        if found:
            ele.click()
        return found

    # Login-state check that also clicks through the dialogs shown when Telegram opens.
    def tg_testing(self):
        """
        Dismiss start-up dialogs and detect whether Telegram is logged out.

        Runs up to 8 rounds. Each round clicks any of the known dialog
        buttons that are present and then looks for the "Start Messaging"
        welcome screen. Finding it saves the account with
        ``is_logged_in_telegram = 0``.

        :return: True when the welcome screen was found (not logged in),
            False when it never appeared in 8 rounds
        """
        logger.info(f"电话号码:{self.tg_phone_info.ld_name}:账号检测中》》》")

        for _ in range(8):
            self._click_if_present("Continue", 0.5)
            # The permission prompt is checked twice per round.
            self._click_if_present("允许", 3)
            self._click_if_present("允许", 3)
            self._click_if_present("OK", 0.5)
            self._click_if_present("Remind me later", 0.5)

            if self._click_if_present("Telegram", 0.5):
                time.sleep(5)
                self._click_if_present("始终", 0.5)

            # Welcome screen visible means the session is gone.
            found, _ele = get_element(self.d, type1="xpath", grammar='//*[@text="Start Messaging"]',
                                      timeout=0.5)
            if found:
                self.tg_phone_info.is_logged_in_telegram = 0
                self.tg_phone_info.save()

                logger.info(f"电话号码:{self.tg_phone_info.ld_name}:检测完成:未登录!!!")
                return True

        logger.warning(f"电话号码:{self.tg_phone_info.ld_name}:检测完成,已登录!!!")
        return False

    def _open_and_verify_telegram(self):
        """
        Open Telegram from the launcher and confirm the session is alive.

        Saves the account with ``is_logged_in_telegram = 0`` when the
        launcher icon is not found within 5 seconds.

        :return: True when Telegram opened and ``tg_testing()`` reported a
            logged-in session, False otherwise
        """
        found, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5)
        if not found:
            self.tg_phone_info.is_logged_in_telegram = 0
            self.tg_phone_info.save()
            return False
        ele.click()

        if self.tg_testing():
            logger.error(f"雷电名称:{self.tg_phone_info.ld_name},掉了!!!")
            return False

        logger.info(f"雷电名称:{self.tg_phone_info.ld_name},检测完成!!!")
        return True

    @decorator
    def action(self):
        """
        Read the account's phone number from Telegram and record its state.

        After verifying the session, opens the navigation menu, reads the
        first element whose text contains "+", strips formatting characters
        and splits the number with ``split_phone_number``. The emulator is
        renamed to the number and the account is saved with
        ``is_logged_in_telegram = 1`` and ``phone_type = 1``. Finally the
        frozen-account banner is looked up; when present both fields are
        saved as 3.
        """
        if not self._open_and_verify_telegram():
            return

        found, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="Open navigation menu"]',
                                 timeout=0.5)
        if found:
            ele.click()

        # Keep only the digits' surroundings out: drop "-", " ", "(", ")" and "+".
        raw_number = self.d.xpath('//*[contains(@text, "+")]').get_text()
        phones = raw_number.translate(str.maketrans("", "", "- ()+"))

        one_phone_number, telephone = split_phone_number(phone_number=phones)

        # Rename the emulator after the phone number.
        new_name = f"{one_phone_number}{telephone}"
        rename_ldplayer(index=self.tg_phone_info.index_id, name=new_name)

        self.tg_phone_info.ld_name = new_name
        self.tg_phone_info.one_phone_number = one_phone_number
        self.tg_phone_info.telephone = telephone
        self.tg_phone_info.is_logged_in_telegram = 1
        self.tg_phone_info.phone_type = 1
        self.tg_phone_info.save()

        # Leave the menu with the back key, then tap the first ViewGroup.
        self.d.press("back")
        time.sleep(3)
        self.d.xpath('(//android.view.ViewGroup)[1]').click()
        time.sleep(3)

        found, _ele = get_element(
            self.d,
            type1="xpath",
            grammar='//*[@text="Your account is frozen\nTap to view details"]',
            timeout=5
        )
        if found:
            self.tg_phone_info.is_logged_in_telegram = 3
            self.tg_phone_info.phone_type = 3
            self.tg_phone_info.save()

    @decorator
    def bf(self, path=r"E:\backups", server_ip="192.168.50.122"):
        """
        Back up the Telegram data of this emulator and upload the archive.

        After verifying the session, finds the first installed package whose
        name contains "telegram", stops it, pulls a backup through
        ``pull_simulator_backup`` and sends ``<ld_name>.tar`` from *path*
        with ``send_file``.

        :param path: local directory handed to ``pull_simulator_backup`` and
            searched for the resulting archive; the default is the
            directory named in the script's disabled backup mode
        :param server_ip: address of the receiving server
        """
        if not self._open_and_verify_telegram():
            return

        # List installed packages through the device shell and drop the
        # "package:" prefix; lines without that prefix are ignored.
        all_packages = self.d.shell('pm list packages').output
        clean_packages = [
            line.partition(':')[2].strip()
            for line in all_packages.splitlines()
            if line.startswith('package:')
        ]
        print("纯净包名列表:", clean_packages)

        package_name = next((pkg for pkg in clean_packages if "telegram" in pkg), None)
        if package_name is None:
            logger.error(f"雷电名称:{self.tg_phone_info.ld_name}:telegram package not found")
            return

        # Stop the app before its data is copied.
        self.d.app_stop(package_name)
        time.sleep(5)

        pull_simulator_backup(
            d=self.d,
            phone=self.tg_phone_info.ld_name,
            local_backup=path
        )

        time.sleep(10)

        # NOTE(review): assumes pull_simulator_backup writes <phone>.tar into path - confirm.
        archive = os.path.join(path, f"{self.tg_phone_info.ld_name}.tar")
        send_file(file_path=archive, server_ip=server_ip)

    @decorator
    def get_sb(self, ):
        """Print the shell response for the device's ``ro.product.model`` property."""
        # d.shell() already executes on the device, so only the device-side
        # command is passed (no host "adb -s <serial> shell" prefix).
        print(self.d.shell("getprop ro.product.model"))
|
||
|
||
|
||
# adb -s 127.0.0.1:5555 shell getprop ro.product.model
|
||
|
||
|
||
def get_sb(tg_phone_info, directory, filename, ):
    """
    Copy the device identity from an emulator config file onto an account.

    Loads ``directory/filename`` as UTF-8 JSON, reads the
    ``propertySettings.phoneManufacturer`` and ``propertySettings.phoneModel``
    keys (each falling back to ``'N/A'``), stores them joined by one space
    in ``tg_phone_info.manufacturer``, calls ``tg_phone_info.save()`` and
    prints the result.

    :param tg_phone_info: record with a ``manufacturer`` attribute and ``save()``
    :param directory: folder holding the config file
    :param filename: config file name
    """
    config_path = os.path.join(directory, filename)
    with open(config_path, 'r', encoding='utf-8') as fh:
        settings = json.load(fh)

    # Pull both properties in a fixed order: manufacturer, then model.
    wanted_keys = ('propertySettings.phoneManufacturer', 'propertySettings.phoneModel')
    maker, model = (settings.get(key, 'N/A') for key in wanted_keys)
    device_label = f"{maker} {model}"

    tg_phone_info.manufacturer = device_label
    tg_phone_info.save()

    print(f"文件名中的数字: {filename}, 设备信息: {device_label}")
|
||
|
||
|
||
if __name__ == '__main__':
    # Batch entry point: for every emulator known to LDPlayer, look up its
    # account row for this host and record the device identity from the
    # emulator's config file.

    directory = r"E:\leidian\LDPlayer9\vms\config"

    ld_ids = [player["ID"] for player in get_ldplayer_list()]

    # Concurrency settings used by the emulator-driven modes listed below.
    max_threads = 15
    delay_between_start = 15  # seconds to wait between thread starts

    # Resolved once; every lookup in the loop filters on the same host.
    host_ip = get_host_ip()

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        for ld_id in ld_ids:
            tg_phone_info = TelegramAccount.get_or_none(
                TelegramAccount.index_id == ld_id,
                TelegramAccount.server_id == host_ip,
            )

            # get_or_none() yields None when no row matches; skip those as
            # well as rows that have no emulator name yet.
            if tg_phone_info is None or not tg_phone_info.ld_name:
                continue

            get_sb(tg_phone_info=tg_phone_info, directory=directory, filename=f"leidian{ld_id}.config", )

            # ---- Disabled alternative modes, kept for reference ----
            #
            # Run the account check on every emulator:
            # ld = Ld(tg_phone_info, )
            # executor.submit(ld.action)
            # time.sleep(delay_between_start)
            #
            # Run the account check only on emulators still named "NoneNone":
            # for i in get_ldplayer_list():
            #     if ld_id == i["ID"] and i["Name"] == "NoneNone":
            #         ld = Ld(tg_phone_info, )
            #         executor.submit(ld.action)
            #         time.sleep(delay_between_start)
            #
            # Re-run the account check only where the connection failed (-1):
            # if tg_phone_info.is_logged_in_telegram == -1:
            #     ld = Ld(tg_phone_info, )
            #     executor.submit(ld.action)
            #     time.sleep(delay_between_start)
            #
            # Backup mode: resend an archive that already exists under the
            # backup directory, otherwise create it through Ld.bf:
            # from pathlib import Path
            #
            # path = r"E:\backups"
            # directory = Path(path)
            #
            # start = 0
            # file_list = [str(file) for file in directory.rglob('*') if file.is_file()]
            # for file in file_list:
            #     if tg_phone_info.ld_name in file:
            #         start = 1
            #         break
            #
            # if start:
            #     send_file(file_path=fr"{path}\{tg_phone_info.ld_name}.tar", server_ip="192.168.50.122")
            #     continue
            #
            # ld = Ld(tg_phone_info, )
            # executor.submit(ld.bf)
|