Files
to_session/process/mumu_process.py
Administrator a0720d80dc fefdwef
2025-11-12 12:54:37 +08:00

751 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import threading
from concurrent.futures import ThreadPoolExecutor
from loguru import logger
import uiautomator2 as u2
from rich_progress import Haha
from mini_models.tg_phone_devices import TgPhoneDevices
from models.blum_tx import BlumTx
from models.device_info import DeviceInfo
from models.project_ip import ProjectType
from models.teleqram_account_mumu import TeleqramAccountMumu
from models.tg_models import TelegramAccount
from process.mumu_tools import mumu_quit, mumu_start, mumu_rename, mumu_create, mumu_get_list_all
from process.tools import get_host_ip, get_element, get_code, swipe_up, match_image, find_img, split_phone_number
lock = threading.Lock()
class MuTG:
def __init__(self, tg_phone, progress):
self.dev_info = None
self.tg_phone = tg_phone
self.progress = progress
self.d = None
self.index_id = None
def log_tg(self):
time.sleep(1)
type1, ele = get_element(self.d, type1="text", grammar='Start Messaging', timeout=5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="text", grammar='Continue', timeout=5)
if type1: # 点开过
ele.click()
type1, ele = get_element(self.d, type1="text", grammar='允许', timeout=5)
if type1: # 点开过
ele.click()
self.d.xpath('//*[@content-desc="Country code"]').set_text(self.tg_phone.one_phone_number)
self.d.xpath('//*[@content-desc="Phone number"]').set_text(self.tg_phone.telephone)
self.d.xpath('//*[@content-desc="Done"]').click()
self.d(text="Yes").click()
type1, ele = get_element(self.d, type1="text", grammar='Continue', timeout=5)
if type1: # 点开过
ele.click()
type1, ele = get_element(self.d, type1="text", grammar='允许', timeout=5)
if type1: # 点开过
ele.click()
# 获取验证码,密码
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Check your Telegram messages"]',
timeout=100)
time.sleep(15)
code = get_code(phone=self.tg_phone.ld_name, ) # 连接获取2fa和验证码
# 输入验证码,密码
self.d.send_keys(code) # 输入密码
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Your password"]',
timeout=100)
self.d.send_keys("Qasdasd123.0") # 输入2fa密码
time.sleep(5)
self.d.xpath('//*[@content-desc="Done"]').click()
# 使用链接打开tg的检测
def tg_testing(self):
self.progress.info(f"电话号码:{self.tg_phone.ld_name}:账号检测中》》》")
# 判断需要tg打开链接
for i in range(8):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="OK"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Remind me later"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=0.5)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=0.5)
if type1: ele.click()
# 判断tg是否掉了
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Start Messaging"]',
timeout=0.5) # 等待tg打开
if type1:
self.tg_phone.index_id = self.index_id
self.tg_phone.phone_type = 0
self.tg_phone.is_logged_in_telegram = 0
self.tg_phone.save()
self.progress.error(f"电话号码:{self.tg_phone.ld_name}:检测完成:未登录!!!")
return True
self.tg_phone.index_id = self.index_id
self.tg_phone.phone_type = 1
self.tg_phone.is_logged_in_telegram = 1
self.tg_phone.save()
self.progress.info(f"电话号码:{self.tg_phone.ld_name}:检测完成,已登录!!!")
return False
def metamask_add(self):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="MetaMask"]', timeout=5)
if type1:
self.d.app_uninstall("io.metamask")
self.d.app_install("../data/metamask.apk")
time.sleep(3)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@content-desc="开始使用"]', timeout=50)
if type1:
ele.click()
time.sleep(3)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="创建新钱包"]', timeout=10)
if type1:
ele.click()
time.sleep(3)
self.d.xpath(
'//*[@text="我们将使用此数据来了解您如何与我们的营销通信交互。我们可能会分享相关资讯(例如产品特点)。"]').click()
swipe_up(d=self.d, start_y1=0.8, end_y1=0.2)
time.sleep(3)
self.d.xpath('//*[@text="我同意"]').click()
time.sleep(3)
self.d.xpath('//*[@text="我同意适用于我使用MetaMask及其所有功能的使用条款"]').click()
time.sleep(3)
self.d.xpath('//com.horcrux.svg.l').click()
time.sleep(3)
self.d.xpath('//*[@text="接受"]').click()
time.sleep(3)
rect = self.d.xpath('//*[@text="新密码"]').bounds
self.d.click(rect[2] - rect[0], rect[3] + 90)
self.d.send_keys('123456123456')
time.sleep(1)
rect = self.d.xpath('//*[@text="确认密码"]').bounds
self.d.click(rect[2] - rect[0], rect[3] + 90)
self.d.send_keys('123456123456')
time.sleep(5)
self.d.xpath('//*[@resource-id="i-understand-text"]').click()
time.sleep(1)
self.d.xpath('//*[@content-desc="submit-button"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@content-desc="稍后提醒我"]', timeout=50)
if type1:
time.sleep(3)
ele.click()
time.sleep(3)
self.d.xpath('//*[@text="我了解丢失私钥助记词后将无法访问钱包。"]').click()
time.sleep(3)
self.d.xpath('//*[@text="跳过"]').click()
time.sleep(3)
self.d.xpath('//*[@text="已完成"]').click()
time.sleep(10)
self.d.xpath('//*[@text="不,谢谢"]').click()
time.sleep(3)
self.d.xpath('//*[@text="不,谢谢"]').click()
time.sleep(3)
self.d.xpath('//*[@text="稍后提醒我"]').click()
time.sleep(3)
self.d.xpath('//*[@text="我了解丢失私钥助记词后将无法访问钱包。"]').click()
time.sleep(3)
self.d.xpath('//*[@text="跳过"]').click()
time.sleep(3)
x_Y = match_image(d=self.d, template_path="images/metamask_add.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(3)
self.d.xpath('//*[@content-desc="添加账户或硬件钱包"]').click()
time.sleep(3)
self.d.xpath('//*[@text="导入账户"]').click()
time.sleep(3)
self.d.click(401, 933)
time.sleep(3)
self.d.send_keys(self.tg_koni_info.address1[2:])
time.sleep(3)
swipe_up(d=self.d, start_y1=0.8, end_y1=0.5, duration=0.1)
time.sleep(3)
self.d.xpath('//*[@text="导入"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="账户导入成功!"]', timeout=50)
time.sleep(3)
self.d.xpath('//*[@text=""]').click()
time.sleep(3)
self.d.xpath('//*[@text="Account 2"]').click()
time.sleep(3)
# self.d.xpath('//*[@resource-id="tab-bar-item-Browser"]').click()
# time.sleep(3)
# self.d.xpath('//*[@text="portfolio.metamask.io"]').click()
# time.sleep(3)
#
# # 模拟按键删除
# for _ in range(500):
# self.d.press("del")
#
# self.d.send_keys("https://chainlist.org/chain/1516")
# time.sleep(3)
# self.d.press("enter")
# time.sleep(20)
# swipe_up(d=self.d, start_y1=0.8, end_y1=0.6, duration=0.1)
# time.sleep(3)
# x_Y = match_image(d=self.d, template_path="images/metamask_ip_add.png", retry_times=5, threshold=0.8)
# if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
# time.sleep(3)
# self.d.xpath('//*[@content-desc="连接"]').click()
# time.sleep(3)
# self.d.xpath('//*[@content-desc="确认"]').click()
# time.sleep(3)
# self.d.xpath('//*[@text="切换网络"]').click()
# time.sleep(3)
# self.d.xpath('//*[@text="明白了"]').click()
x_Y = match_image(d=self.d, template_path="images/metamask_lian_add.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(3)
self.d.xpath('//*[@resource-id="tab-bar-item-Actions"]').click()
time.sleep(3)
self.d.xpath('//*[@text="网络名称(可选)"]').set_text("Story Odyssey Testnet")
time.sleep(3)
self.d.xpath('//*[@resource-id="button-menu-select-test-id"]').click()
time.sleep(3)
self.d.xpath('//*[@resource-id="button-menu-select-test-id"]').click()
time.sleep(3)
self.d.xpath('(//*[@text="添加 RPC远程过程调用URL"])[2]').click()
time.sleep(3)
self.d.xpath('//*[@text="新 RPC 网络"]').set_text("https://story-testnet-evm.itrocket.net")
time.sleep(3)
self.d.xpath('(//*[@text="添加 RPC远程过程调用URL"])[2]').click()
time.sleep(3)
self.d.xpath('//*[@text="链 ID可选"]').set_text("1516")
time.sleep(3)
self.d.xpath('(//*[@text="符号"])[2]').set_text("IP")
time.sleep(3)
self.d.xpath('//*[@text="保存"]').click()
time.sleep(3)
self.d.xpath('//*[@text="保存"]').click()
time.sleep(3)
x_Y = match_image(d=self.d, template_path="images/metamask_lian_add.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(3)
swipe_up(d=self.d, start_y1=0.8, end_y1=0.2)
time.sleep(3)
self.d.xpath('//*[@text="Story Odyssey Testnet"]').click()
time.sleep(3)
self.d.xpath('//*[@text="明白了"]').click()
time.sleep(10)
def start_mumu(self):
with lock:
try:
mumu_lists = mumu_get_list_all()
if mumu_lists:
for mumu_info in mumu_lists:
if mumu_info["name"] == self.tg_phone.ld_name:
self.index_id = mumu_info["index"]
self.progress.info(f"当前电话号码:{self.tg_phone.ld_name},找到模拟器名字!!!")
break
else:
self.progress.error(f"当前电话号码:{self.tg_phone.ld_name},没有找到模拟器名字!!!")
self.index_id = mumu_create()
time.sleep(5)
mumu_rename(id=self.index_id, name=self.tg_phone.ld_name)
time.sleep(10)
d_type = 0
# 连接模拟器
for i in range(3):
if d_type:
break
for i in range(3):
self.port = mumu_start(id=self.index_id)
time.sleep(15)
try:
self.d = u2.connect(f"127.0.0.1:{self.port}")
d_type = 1
break
except Exception as e:
pass
else:
mumu_quit(id=self.index_id)
return True
except:
pass
return False
def koni_wallet(self):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Got it"]', timeout=10)
if type1:
time.sleep(3)
ele.click()
time.sleep(3)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Connect now"]', timeout=10)
if type1:
time.sleep(3)
ele.click()
time.sleep(3)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Connect"]', timeout=10)
if type1:
time.sleep(3)
ele.click()
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/koni_mate.png", retry_times=50, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Retry"]', timeout=20)
if type1:
time.sleep(3)
ele.click()
time.sleep(10)
rect = self.d.xpath('//*[@text="密码"]').bounds
self.d.click(383, 727)
self.d.send_keys('123456123456')
self.d.xpath('//*[@text="登录"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="连接"]', timeout=50)
if type1:
time.sleep(3)
ele.click()
time.sleep(5)
self.d.xpath('//*[@text="Continue"]').click()
time.sleep(5)
self.d.xpath('//*[@text="签名"]').click()
time.sleep(5)
self.d.xpath('//*[@text="Confirm"]').click()
def open_project(self, url):
# 打开小程序
self.d.open_url(url)
# 判断需要tg打开链接
for i in range(5):
type1, ele = get_element(d=self.d, type1='xpath', grammar='//*[@text="Start"]', timeout=1)
if type1:
ele.click()
time.sleep(3)
break
try:
type1, ele = get_element(d=self.d, type1='xpath', grammar="//*[contains(@text, 'Telegram')]", timeout=1)
if type1:
ele.click()
time.sleep(3)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=1)
if type1: ele.click()
except Exception as e:
pass
def action(self):
try:
if self.start_mumu():
self.progress.info(f"当前电话号码:{self.tg_phone.ld_name},启动成功!!!")
else:
self.progress.error(f"当前电话号码:{self.tg_phone.ld_name},启动失败!!!")
raise False
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5) # 等待tg打开
if type1:
ele.click()
else:
time.sleep(1)
self.d.app_install('../data/Telegram.apk')
if self.tg_testing():
self.log_tg()
time.sleep(10)
self.tg_phone.is_logged_in_telegram = 1
self.tg_phone.save()
except Exception as e:
print(e)
finally:
mumu_quit(id=self.index_id)
self.progress.update()
def tonkeeper_add(self):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Tonkeeper"]', timeout=10)
if type1:
self.d.app_uninstall('com.ton_keeper')
time.sleep(3)
self.d.app_install('../data/Tonkeeper.apk')
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="导入现有钱包"]', timeout=10)
if type1:
time.sleep(3)
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="使用24个秘密恢复词导入钱包"]', timeout=10)
if type1:
time.sleep(3)
ele.click()
time.sleep(3)
self.d.xpath('//*[@resource-id="com.ton_keeper:id/word_1"]').click()
time.sleep(3)
# 输入助记词
words = self.blum_tx_info.text.replace(" ", "\n")
self.d.send_keys(words)
time.sleep(3)
self.d.xpath('//*[@text="继续"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="继续"]', timeout=10)
if type1:
type1, ele1 = get_element(d=self.d, type1="xpath", grammar='//*[@resource-id="com.ton_keeper:id/selected"]',
timeout=5)
if type1:
ele1.click()
time.sleep(3)
ele.click()
time.sleep(3)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
self.d.xpath('//*[@text="之后"]').click()
time.sleep(3)
self.d.xpath('//*[@text="继续"]').click()
time.sleep(10)
res = self.d.xpath('//*[contains(@text, "您的地址")]').get_text().split('')[-1].split()[0]
if res != self.blum_tx_info.address[-4:]:
self.d.xpath('//*[@resource-id="com.ton_keeper:id/settings"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="钱包 v4R2"]', timeout=10)
if type1:
time.sleep(3)
ele.click()
time.sleep(3)
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
res = self.d.xpath('//*[contains(@text, "您的地址")]').get_text().split('')[-1]
logger.warning(f"当前模拟器名字:{self.tg_phone.ld_name},助记词后四位:{res}")
if res == self.blum_tx_info.address[-4:]:
return True
else:
return True
return False
def blum_tx(self):
try:
if self.start_mumu():
logger.info(f"当前电话号码:{self.tg_phone.ld_name},启动成功!!!")
else:
logger.error(f"当前电话号码:{self.tg_phone.ld_name},启动失败!!!")
raise False
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
# 按下桌面按键
self.d.press("home")
self.d.xpath('//*[@text="Telegram"]').click()
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
mumu_quit(id=self.index_id)
return
self.d.xpath('//*[@content-desc="Search"]').set_text('blum')
self.d.xpath('//*[contains(@text, "Blum, Verified")]').click()
# self.d.xpath('//*[@text="Blum, Verified\n, 12 994 072 users"]').click()
self.d.xpath('//*[@content-desc="Bot menu"]').click()
# //*[@text="Start"]
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Start"]',
timeout=10) # 等待tg打开
if type1:
time.sleep(3)
ele.click()
# 重新连接钱包 --------------------------------------------------------------------------------------------
x_Y = find_img(d=self.d, template_path="images/blum_wallet.png", retry=500, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/blum_wallet1.png", retry=10, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/blum_wallet2.png", retry=10, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/blum_wallet3.png", retry=10, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/blum_wallet4.png", retry=10, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/tonkeeper__.png", retry=100, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
try:
time.sleep(3)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
except:
pass
x_Y = find_img(d=self.d, template_path="images/tonkeeper_x.png", retry=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/blum_wallet5.png", retry=5, threshold=0.9)
if x_Y:
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Memepad"]',
timeout=500) # 等待tg打开
if type1:
time.sleep(3)
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Get Started"]',
timeout=10) # 等待tg打开
if type1: ele.click()
x_Y = find_img(d=self.d, template_path="images/blum_memepad.png", retry=500, threshold=0.8)
if x_Y: self.d.click(195, 1206)
x_Y = find_img(d=self.d, template_path="images/blum_buy.png", retry=10, threshold=0.9)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/blum_max.png", retry=10, threshold=0.9)
if x_Y:
self.d.send_keys(str(self.blum_tx_info.balance - 0.5))
for i in range(5):
x_Y = find_img(d=self.d, template_path="images/blum_buy1.png", retry=3, threshold=0.9)
if x_Y: self.d.click(x_Y[0], x_Y[1])
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="滑动确认"]',
timeout=5) # 等待tg打开
if type1:
time.sleep(15)
self.d.swipe(91, 1458, 819, 1456, duration=1)
break
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
time.sleep(30)
self.blum_tx_info.buy_start = 1
self.blum_tx_info.save()
x_Y = find_img(d=self.d, template_path="images/tonkeeper_x.png", retry=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
else:
self.d.click(150, 1154)
time.sleep(5)
# 卖币 ------------------------------------------------------------------------------------------------------
# x_Y = match_image(d=self.d, template_path="images/blum_wallet.png", retry_times=500, threshold=0.8)
# if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
# time.sleep(15)
# x_Y = match_image(d=self.d, template_path="images/blum_wallet5.png", retry_times=20, threshold=0.8)
# if not x_Y:
# self.d.click(150, 1154)
# else:
# return
for i in range(3):
x_Y = find_img(d=self.d, template_path="images/blum_sell.png", retry=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/blum_max.png", retry=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
x_Y = find_img(d=self.d, template_path="images/blum_sell1.png", retry=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0], x_Y[1])
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="滑动确认"]',
timeout=5) # 等待tg打开
if type1:
time.sleep(15)
self.d.swipe(91, 1458, 819, 1456, duration=1)
break
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
time.sleep(30)
self.blum_tx_info.tx_start = 1
self.blum_tx_info.sell_start = 1
self.blum_tx_info.save()
except Exception as e:
print(e)
finally:
mumu_quit(id=self.index_id)
if __name__ == '__main__':
# pro_info = ProjectType.get_or_none(
# ProjectType.two_server_ip == get_host_ip(),
# )
#
# # 启动类型 0:id,1:名字
# action_type = 1
#
# tg_phone_datas = TelegramAccount().select().where(
# TelegramAccount.server_id == pro_info.server_ip,
# TelegramAccount.is_logged_in_telegram == 1,
# )
#
# # 同时运行
# max_threads = 1
# delay_between_start = 0.5 # 每次启动线程之间的延迟时间(秒)
#
# with ThreadPoolExecutor(max_workers=max_threads) as executor:
#
# while True:
# futures = []
# for i in tg_phone_datas:
#
# try:
# tg = MuTG(tg_phone=i, )
# except Exception as e:
# continue
#
# # 提交线程任务到线程池
# future = executor.submit(tg.blum_tx)
# futures.append(future)
#
# # 线程启动间隔
# time.sleep(delay_between_start)
#
# # 等待所有提交的任务完成
# done = False
# while not done:
# done = True
# for future in futures:
# if not future.done():
# done = False
# time.sleep(60) # 可以根据需要调整休眠时间
# break
#
# time.sleep(600)
# 启动类型 0:id,1:名字
action_type = 1
tg_phone_datas = TgPhoneDevices().select().where(
TgPhoneDevices.is_valid_session == 1,
).limit(400).offset(400)
# 同时运行
max_threads = 15
delay_between_start = 15 # 每次启动线程之间的延迟时间(秒)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
while True:
futures = []
with Haha(
total=len(tg_phone_datas),
desc="mumu模拟器流程",
refresh_interval=50
) as progress:
for i in tg_phone_datas:
one_phone_number, telephone = split_phone_number(phone_number=i.phone)
tg_info, type1 = TeleqramAccountMumu.get_or_create(
server_id=get_host_ip(),
ld_name=i.phone,
one_phone_number=one_phone_number,
telephone=telephone,
)
try:
tg = MuTG(tg_phone=tg_info, progress=progress)
except Exception as e:
continue
# 提交线程任务到线程池
future = executor.submit(tg.action)
futures.append(future)
# 线程启动间隔
time.sleep(delay_between_start)
# 等待所有提交的任务完成
done = False
while not done:
done = True
for future in futures:
if not future.done():
done = False
time.sleep(60) # 可以根据需要调整休眠时间
break
time.sleep(600)