Files
to_session/process/tg_process.py
Administrator a0720d80dc fefdwef
2025-11-12 12:54:37 +08:00

2110 lines
76 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import re
import sys
import os
from models.blum_tx import BlumTx
from models.tg_birds import TgBbrids
from models.tg_paws import TgPaws
from models.tg_phone_devices import TgPhoneDevices
from models.tg_xpin import TgXpin
from models.tg_zoo import TgZoo
from models.wallet_okx import WalletAddressOkx
from ton.tools import get_jettons
# from process.tg_hi_ping import TGHIPIN
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 获取 'models' 模块所在的目录,假设它在和 'process' 同级的目录中
models_dir = os.path.dirname(current_dir) # 'process' 目录的父目录
# 将 'models' 模块所在的目录添加到 sys.path
sys.path.append(models_dir)
import time
import uiautomator2 as u2
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from models.tg_blum_hz import TgBlumHz
from models.tg_models import TelegramAccount
from process.tools import start_ldplayer, close_ldplayer, get_element, get_host_ip, get_ldplayer_list, match_image, \
generate_fake_info, create_ldplayer, get_code, find_img
import threading
lock = threading.Lock()
def get_x_y(coordinate, x, y):
# 随机生成 0-50 之间的数值
random_value = random.randint(x, y)
# 随机决定是加还是减
if random.choice([True, False]):
new_coordinate = coordinate + random_value
else:
new_coordinate = coordinate - random_value
return new_coordinate
def swipe_up(d, start_y1, end_y1, duration=0.1, ):
"""
向上滑动屏幕的函数。
:param d: uiautomator2 的设备实例
:param duration: 滑动持续时间(秒)
"""
# 获取屏幕的宽度和高度
width, height = d.window_size()
# 定义滑动的起始和结束位置
start_x = width / 2
start_y = height * start_y1 # 从屏幕下方 80% 的位置开始滑动
end_x = width / 2
end_y = height * end_y1 # 滑动到屏幕上方 20% 的位置
# 执行滑动操作
d.swipe(start_x, start_y, end_x, end_y, duration)
def get_code_paw(url):
import requests
from lxml import html
try:
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"dnt": "1",
"priority": "u=0, i",
"sec-ch-ua": "\"Not/A)Brand\";v=\"8\", \"Chromium\";v=\"126\", \"Microsoft Edge\";v=\"126\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0"
}
response = requests.get(url, headers=headers)
# 将字符串形式的HTML转换为lxml可以解析的HTML树
root = html.fromstring(response.content)
# 使用XPath查找ID为'test'的div元素
code = root.xpath('//*[@id="code"]')[0].text
paw = root.xpath('//*[@id="password"]')[0].text
return code, paw
except Exception as e:
raise '电话号码失效'
# 装饰器封住
def decorator(func):
def wrapper(self, *args, **kwargs):
result = None
# 这里可以在函数执行前增加逻辑
# 启动模拟器
if not self.start_d():
return
try:
result = func(self, *args, **kwargs)
except Exception as e:
print(e)
# 这里可以在函数执行后增加逻辑
close_ldplayer(simulator_id=self.index_id)
return result
return wrapper
class TG:
def __init__(self, tg_phone, dev_info=None, type1=0, ):
self.d = None # u2原生对象
self.index_id = None # 雷电模拟器的id
self.wallet_okx = None # 钱包对象
self.type1 = type1 # 雷电启动类型 0index1name
self.tg_phone = tg_phone # 电话表的信息
self.dev_info = dev_info # session表对象
if type1 == 1: # 名字启动的逻辑
# 查询出id
for i in get_ldplayer_list():
if self.tg_phone.ld_name == i["Name"]:
self.index_id = i["ID"]
break
self.id = "emulator-" + str(5554 + (int(self.index_id) * 2))
elif type1 == 0: # id启动的逻辑
self.id = "emulator-" + str(5554 + (int(self.tg_phone.index_id) * 2))
self.index_id = self.tg_phone.index_id
def open_vpn(self, ):
type1, ele = get_element(self.d, type1="text", grammar='Clash', timeout=1)
if type1: # 点开过
ele.click()
else:
apk_path = '../data/clash.apk'
self.d.app_install(apk_path)
time.sleep(5)
self.d.xpath('//*[@text="配置"]').click()
time.sleep(5)
self.d.xpath('//*[@content-desc="新建"]').click()
time.sleep(5)
self.d.xpath('//*[@text="URL"]').click()
time.sleep(5)
self.d.xpath('//*[@text="仅接受 http(s) 和 content 类型"]').click()
time.sleep(5)
self.d.send_keys(
'https://api-huacloud.dev/sub?target=clash&insert=true&emoji=true&udp=true&clash.doh=true&new_name=true&filename=Flower_SS&url=https%3A%2F%2Fapi.xmancdn.com%2Fosubscribe.php%3Fsid%3D119913%26token%3DJPyA80MQ0pi7%26sip002%3D1')
time.sleep(5)
self.d.xpath('//*[@text="确认"]').click()
time.sleep(5)
self.d.xpath('//*[@resource-id="com.github.kr328.clash.foss:id/action_layout"]').click()
time.sleep(15)
self.d.xpath('//*[@text="新配置"]').click()
time.sleep(5)
self.d.shell('input keyevent 4')
time.sleep(5)
type1, ele = get_element(self.d, type1="text", grammar='已停止', timeout=5)
if type1: # 点开过+
ele.click()
type1, ele = get_element(self.d, type1="text", grammar='确定', timeout=5)
if type1: # 点开过+
ele.click()
# 使用 shell 方法执行 adb 命令,模拟按下主页按钮
self.d.shell('input keyevent 3')
def open_vpn1(self):
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Hiddify"]', timeout=1)
if type1: # 点开过
ele.click()
else:
time.sleep(5)
self.d.app_install('../data/Hiddify-Android-universal.apk')
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="开始"]', timeout=10)
if type1: # 点开过
ele.click()
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="新的配置文件"]', timeout=1)
if type1: # 点开过
ele.click()
time.sleep(5)
self.d.xpath('//*[@content-desc="从剪贴板添加"]').set_text(
"https://api-huacloud.dev/sub?target=clash&insert=true&emoji=true&udp=true&clash.doh=true&new_name=true&filename=Flower_SS&url=https%3A%2F%2Fapi.xmancdn.com%2Fosubscribe.php%3Fsid%3D147271%26token%3Du8lHQf7fIWL5%26sip002%3D1")
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@content-desc="点击连接"]', timeout=10)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="确定"]', timeout=1)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@content-desc="已连接"]', timeout=50)
if type1:
time.sleep(5)
def open_vpn2(self):
# 判断是否桌面是否有软件
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@content-desc="FlClash"]', timeout=5)
if type1: # 有就删除,再安装
# self.d.app_uninstall("com.follow.clash")
# time.sleep(5)
# self.d.app_install('../data/FlClash.apk')
ele.click()
else: # 没有安装
self.d.app_install('../data/FlClash.apk')
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@content-desc="同意"]', timeout=5)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@content-desc="代理\n第 2 个标签,共 4 个"]',
timeout=5)
if not type1:
time.sleep(5)
self.d.xpath('//*[@content-desc="配置\n第 2 个标签,共 3 个"]').click()
time.sleep(5)
self.d.xpath('(//android.widget.Button)[last()]').click()
time.sleep(5)
self.d.xpath('//*[@content-desc="URL\n通过URL获取配置文件"]').click()
time.sleep(5)
self.d.xpath('(//android.widget.EditText)[last()]').click()
time.sleep(1)
self.d.send_keys(
"https://api-huacloud.dev/sub?target=clash&insert=true&emoji=true&udp=true&clash.doh=true&new_name=true&filename=Flower_SS&url=https%3A%2F%2Fapi.xmancdn.com%2Fosubscribe.php%3Fsid%3D147271%26token%3Du8lHQf7fIWL5%26sip002%3D1")
time.sleep(5)
self.d.xpath('//*[@content-desc="提交"]').click()
time.sleep(10)
time.sleep(5)
self.d.xpath('//*[@content-desc="代理\n第 2 个标签,共 4 个"]').click()
time.sleep(5)
self.d.xpath('//android.widget.Button[2]').click()
time.sleep(5)
self.d.xpath('//*[@content-desc="延迟"]').click()
time.sleep(5)
self.d.press('back')
time.sleep(5)
self.d.xpath('(//android.widget.Button)[last()]').click()
time.sleep(15)
self.d.click(309, 1291)
time.sleep(5)
self.d.xpath('//*[@content-desc="仪表盘\n第 1 个标签,共 4 个"]').click()
time.sleep(5)
self.d.xpath('//*[@content-desc="00:00:00"]').click()
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="确定"]', timeout=5)
if type1:
ele.click()
time.sleep(10)
# 模拟按下 HOME 键,即返回桌面
self.d.press("home")
time.sleep(5)
def kick_out_device(self):
self.d.xpath('//*[@text="Telegram"]').click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=5)
if type1: # 点开过+
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=5)
if type1: # 点开过+
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=5)
if type1: # 点开过+
ele.click()
self.d.xpath('//*[@content-desc="Open navigation menu"]').click()
self.d.xpath('//*[@text="Settings"]').click()
time.sleep(2)
swipe_up(d=self.d, start_y1=0.8, end_y1=0.3, duration=0.5)
self.d.xpath('//*[@text="Devices"]').click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Terminate All Other Sessions"]', timeout=5)
if type1: # 点开过+
ele.click()
self.d.xpath('//*[@text="Terminate"]').click()
@decorator
def up_name(self):
self.d.xpath('//*[@text="Telegram"]').click()
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
time.sleep(5)
self.d.xpath('//*[@content-desc="Open navigation menu"]').click()
time.sleep(5)
self.d.xpath('//*[@text="Settings"]').click()
time.sleep(5)
self.d.xpath('//*[@content-desc="More options"]').click()
time.sleep(5)
self.d.xpath('//*[@text="Edit Info"]').click()
time.sleep(5)
rect = self.d.xpath('//*[@text="Your name"]').bounds
self.d.long_click(rect[2] - rect[0], rect[3] + 90)
time.sleep(5)
self.d.xpath('//*[@text="全选"]').click()
time.sleep(5)
self.d.click(rect[2] - rect[0], rect[3] + 90)
time.sleep(5)
for i in range(35):
self.d.press("delete")
time.sleep(5)
user_info = generate_fake_info()
self.d.send_keys(f"🐦 SUI {user_info['first_name']}")
time.sleep(5)
# self.d.xpath(
# '/hierarchy/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.FrameLayout[2]/android.widget.FrameLayout[1]/android.widget.LinearLayout[1]/android.widget.ImageButton[1]/android.widget.ImageView[1]').click()
self.d.click(1003, 155)
time.sleep(15)
tg_brids_info = TgBbrids.get_or_none(TgBbrids.mobile == self.tg_phone.ld_name)
tg_brids_info.modify_nickname = 1
tg_brids_info.baned = 1
tg_brids_info.save()
def log_tg(self):
try:
# 查询模拟器是否存在
for i in get_ldplayer_list():
if self.tg_phone.ld_name == i["Name"]:
self.index_id = i["ID"]
break
else: # 没有查询到,创建模拟器
create_ldplayer(name=self.tg_phone.ld_name)
time.sleep(10)
# 查询出id
for i in get_ldplayer_list():
if self.tg_phone.ld_name == i["Name"]:
self.index_id = i["ID"]
break
if self.index_id:
self.id = "emulator-" + str(5554 + (int(self.index_id) * 2))
logger.info(f"账号:{self.tg_phone.ld_name}查询到模拟器id启动")
else:
logger.info(f"账号:{self.tg_phone.ld_name},没有查询到,退出!!!")
return
self.start_d()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=5) # 等待tg打开
if type1:
ele.click()
else:
time.sleep(1)
self.d.app_install('../data/Telegram.apk')
# 链接打开后的tg检测检测
if self.tg_testing():
logger.info(f"账号:{self.tg_phone.ld_name},开始登录。。。")
else:
logger.info(f"账号:{self.tg_phone.ld_name},登录成功。。。")
self.tg_phone.phone_type = 1
self.tg_phone.is_logged_in_telegram = 1
self.tg_phone.server_id = get_host_ip()
self.tg_phone.save()
close_ldplayer(simulator_id=self.index_id)
return
time.sleep(1)
type1, ele = get_element(self.d, type1="text", grammar='Start Messaging', timeout=5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="text", grammar='Continue', timeout=5)
if type1: # 点开过
ele.click()
type1, ele = get_element(self.d, type1="text", grammar='允许', timeout=5)
if type1: # 点开过
ele.click()
self.d.xpath('//*[@content-desc="Country code"]').set_text(self.tg_phone.one_phone_number)
self.d.xpath('//*[@content-desc="Phone number"]').set_text(self.tg_phone.telephone)
self.d.xpath('//*[@content-desc="Done"]').click()
self.d(text="Yes").click()
type1, ele = get_element(self.d, type1="text", grammar='Continue', timeout=5)
if type1: # 点开过
ele.click()
type1, ele = get_element(self.d, type1="text", grammar='允许', timeout=5)
if type1: # 点开过
ele.click()
# 获取验证码,密码
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Check your Telegram messages"]',
timeout=100)
time.sleep(10)
code = get_code(phone=self.tg_phone.ld_name, )
# 输入验证码,密码
# self.d.send_keys(code)
self.d.shell(f'input text "{code}"')
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Your password"]',
timeout=100)
time.sleep(3)
self.d.send_keys("Qasdasd123.0")
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@content-desc="Done"]', timeout=5)
if type1: # 点开过
ele.click()
self.tg_phone.is_logged_in_telegram = 1
self.tg_phone.server_id = get_host_ip()
self.tg_phone.save()
time.sleep(25)
except:
pass
finally:
close_ldplayer(simulator_id=self.index_id)
# 使用链接打开tg的检测
def tg_testing(self):
logger.info(f"电话号码:{self.tg_phone.ld_name}:账号检测中》》》")
# 判断需要tg打开链接
for i in range(8):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=3)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="OK"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Remind me later"]', timeout=0.5)
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Telegram"]', timeout=0.5)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=0.5)
if type1: ele.click()
# 判断tg是否掉了
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Start Messaging"]',
timeout=0.5) # 等待tg打开
if type1:
self.tg_phone.is_logged_in_telegram = 0
self.tg_phone.save()
logger.info(f"电话号码:{self.tg_phone.ld_name}:检测完成:未登录!!!")
return True
logger.warning(f"电话号码:{self.tg_phone.ld_name}:检测完成,已登录!!!")
return False
@decorator
def bind_wallet(self, ):
# 使用url打开tg小程序
self.d.open_url('https://t.me/wallet/start')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
else:
print("检查完成,账号正常!!!")
# 第一次打开钱包的按钮
type1, ele = get_element(d=self.d, type1="xpath",
grammar='//*[@text="I agree to the Terms of Use"]/android.view.View',
timeout=5)
if type1:
ele.click()
time.sleep(5)
self.d.xpath('//*[@text="Continue"]').click()
# 已经加入过的逻辑
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="TON Space"]', timeout=5)
if type1:
ele.click()
x_y = match_image(d=self.d, template_path='wallet_int_ton_set_up.png', retry_times=10)
if x_y:
self.d.click(x_y[0][0], x_y[0][1])
swipe_up(d=self.d, start_y1=0.8, end_y1=0.5, duration=0.1)
self.d.xpath('//*[@text="Remove TON Space"]').click()
time.sleep(5)
self.d.xpath('//*[@text="Remove TON Space"]').click()
time.sleep(5)
self.d.xpath('//*[@text="I understand that this TON Space will be removed from this device"]').click()
time.sleep(5)
self.d.xpath('//*[@text="I have safely stored the Secret Recovery Phrase for TON Space"]').click()
time.sleep(5)
self.d.xpath('(//*[@text="Remove TON Space"])[last()]').click()
else:
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="让我们开始吧"]', timeout=5)
if type1:
ele.click()
x_y = match_image(d=self.d, template_path='wallet_set_up.png', retry_times=10)
if x_y:
self.d.click(x_y[0][0], x_y[0][1])
# 修改语言
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="钱包语言中文 (简体)"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
self.d.xpath('//*[@text="English"]').click()
time.sleep(5)
self.d.xpath('//*[@content-desc="Go back"]').click()
# 打开ton开关
x_y = match_image(d=self.d, template_path='wallet_ton_start.png', retry_times=10)
if x_y:
self.d.click(x_y[0][0], x_y[0][1])
time.sleep(5)
self.d.xpath('//*[@content-desc="Go back"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="TON Space"]', timeout=5)
if type1:
ele.click()
swipe_up(d=self.d, start_y1=0.8, end_y1=0.5, duration=0.1)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Add Existing Wallet"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
self.d.xpath('//*[@text="Import Manually"]').click()
test = 'birth comic stage error expose certain gossip unfold wagon piece mushroom chronic meat knock cart fix shine lunch future hammer gauge very awake ecology'
self.d.send_keys(test.replace(" ", "\n"))
self.d.xpath('//*[@text="Next"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="View Wallet"]', timeout=500)
if type1:
ele.click()
def open_project(self, url):
# 打开小程序
self.d.open_url(url)
# 判断需要tg打开链接
for i in range(5):
type1, ele = get_element(d=self.d, type1='xpath', grammar='//*[@text="Start"]', timeout=1)
if type1:
ele.click()
time.sleep(5)
break
try:
type1, ele = get_element(d=self.d, type1='xpath', grammar="//*[contains(@text, 'Telegram')]", timeout=1)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=1)
if type1: ele.click()
except Exception as e:
pass
def get_code(self):
# 启动模拟器
if not self.start_d():
return
self.d.xpath('//*[@text="Telegram"]').click()
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return True
for i in range(10):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=0.1)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=0.1)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=0.1)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="OK"]', timeout=0.1)
if type1:
ele.click()
self.d.xpath('//*[@content-desc="Search"]').set_text('telegram')
time.sleep(5)
try:
self.d.xpath('//*[@text="Telegram, Verified\n, support"]').click()
except Exception as e:
self.d.xpath('//*[@text="Telegram Notifications, Verified\n, support"]').click()
for i in range(5):
swipe_up(d=self.d, start_y1=0.8, end_y1=0.3, duration=0.1)
time.sleep(5)
def get_code1(self):
self.d = u2.connect(self.id)
# # 链接打开后的tg检测检测
# if self.tg_testing():
# print("账号掉了!!!")
# return True
for i in range(15):
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Remind me later"]', timeout=0.5)
if type1:
ele.click()
for i in range(15):
swipe_up(d=self.d, start_y1=0.8, end_y1=0.3, duration=0.1)
time.sleep(1)
codes = []
for i in self.d.xpath('//android.view.ViewGroup').all():
matches = re.findall(r'\d{5}', i.text)
if matches:
codes.append(matches[0])
return codes
def close_ldplayer(self):
# 这里可以在函数执行后增加逻辑
close_ldplayer(simulator_id=self.index_id)
def start_d(self):
print(self.index_id, )
d_type = 0
# 连接模拟器
for i in range(3):
if d_type:
break
start_ldplayer(simulator_id=self.index_id)
for i in range(3):
time.sleep(15)
try:
self.d = u2.connect(self.id)
d_type = 1
break
except Exception as e:
pass
else:
close_ldplayer(simulator_id=self.index_id, )
time.sleep(10)
return d_type
def action(self):
print(self.index_id, )
d_type = 0
# 连接模拟器
for i in range(3):
if d_type:
break
start_ldplayer(simulator_id=self.index_id)
for i in range(3):
time.sleep(15)
try:
self.d = u2.connect(self.id)
d_type = 1
break
except Exception as e:
pass
else:
close_ldplayer(simulator_id=self.index_id, )
time.sleep(10)
# tg逻辑
# self.tg_action()
code = self.get_code() # 获取验证码
# 启动仓鼠
# self.hk_action()
close_ldplayer(simulator_id=self.index_id)
return code
@decorator
def action1(self):
tg_blum = TgBlumHz.get_or_none(TgBlumHz.tg_id == self.tg_phone.id)
if not tg_blum:
tg_blum = TgBlumHz.create(
tg_id=self.tg_phone.id,
phone=self.tg_phone.ld_name,
)
elif tg_blum.start == 1:
return
self.open_vpn2()
self.open_project(url='https://t.me/ton_society_bot')
time.sleep(5)
type1, ele = get_element(d=self.d, type1='xpath', grammar='//*[@content-desc="Bot menu"]', timeout=50)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1='xpath', grammar='//*[@text="Start"]', timeout=10)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1='xpath', grammar='//*[@text="Normie Airdrop"]', timeout=50)
if type1:
print(1)
tg_blum.start = 1
else:
tg_blum.start = 1
tg_blum.save()
@decorator
def pell_action(self):
try:
self.open_vpn2() # 打开vpn
self.open_project(url='https://t.me/AllianceMicroNodeBot/AMN?startapp=2P7FSM')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
time.sleep(25)
except Exception as e:
print(e)
def tonkeeper_add(self):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Tonkeeper"]', timeout=10)
if type1:
self.d.app_uninstall('com.ton_keeper')
time.sleep(5)
from pathlib import Path
# 获取当前脚本所在的文件夹路径
current_folder_path = Path(__file__).parent.resolve()
# 上一个路径
grandparent_folder_path = current_folder_path.parent
# 获取当前脚本所在文件夹下 data 文件夹的路径
data_folder_path = grandparent_folder_path / 'data'
with lock:
os.system(f"adb -s {self.d.serial} install -t {data_folder_path}/Tonkeeper.apk") # -s 指定设备,-t 允许测试包
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Tonkeeper"]', timeout=10)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="导入现有钱包"]', timeout=10)
if type1:
time.sleep(5)
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="使用24个秘密恢复词导入钱包"]', timeout=10)
if type1:
time.sleep(5)
ele.click()
time.sleep(5)
self.d.xpath('//*[@resource-id="com.ton_keeper:id/word_1"]').click()
time.sleep(5)
# 输入助记词
words = self.wallet_okx.words.replace(" ", "\n")
self.d.send_keys(words)
time.sleep(5)
self.d.xpath('//*[@text="继续"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="继续"]', timeout=5)
if type1:
time.sleep(5)
self.d.xpath('//*[@resource-id="com.ton_keeper:id/selected"]').click()
time.sleep(5)
ele.click()
time.sleep(5)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
self.d.xpath('//*[@text="之后"]').click()
time.sleep(5)
self.d.xpath('//*[@text="继续"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@resource-id="com.ton_keeper:id/stories_close"]',
timeout=15)
if type1:
ele.click()
time.sleep(5)
res = self.d.xpath('//*[contains(@text, "您的地址")]').get_text().split('')[-1].split()[0]
if res != self.wallet_okx.v4r2_address[-4:]:
self.d.xpath('//*[@resource-id="com.ton_keeper:id/settings"]').click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="钱包 v4R2"]', timeout=25)
if type1:
time.sleep(5)
ele.click()
time.sleep(5)
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
res = self.d.xpath('//*[contains(@text, "您的地址")]').get_text().split('')[-1]
logger.warning(f"当前模拟器名字:{self.tg_phone.ld_name},助记词后四位:{res}")
if res == self.wallet_okx.v4r2_address[-4:]:
return True
else:
return True
return False
@decorator
def paws_wallet(self):
self.tg_paws_info = TgPaws.get_or_none(TgPaws.phone == self.tg_phone.ld_name)
if self.tg_paws_info:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name},项目表中查询到》》》")
else:
logger.error(f"当前模拟器名字:{self.tg_phone.ld_name},项目表中没有查询到!!!")
return
self.wallet_okx = WalletAddressOkx.get_or_none(
WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "paws-不用跑"
)
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.open_project(url='https://t.me/PAWSOG_bot/PAWS?startapp=l87URKRF')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
try:
self.d.xpath(f'//*[contains(@text, "{self.wallet_okx.v4r2_address[-4:]}")]')
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name},钱包成功!!!")
return
except:
logger.error(f"当前模拟器名字:{self.tg_phone.ld_name},钱包没有绑或者是绑定失败!!!")
self.tg_paws_info.wallet_start = 3
self.tg_paws_info.save()
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Check the footprint map"]', timeout=20)
if type1:
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/paws_ton1.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/paws_wallet.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
self.d.xpath('//*[@text="连接钱包"]').click()
self.tg_paws_info.wallet_start = 1
self.tg_paws_info.save()
time.sleep(20)
@decorator
def paws_wallet_start(self):
self.wallet_okx = WalletAddressOkx.get_or_none(
WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "paws-不用跑"
)
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.open_project(url='https://t.me/PAWSOG_bot/PAWS?startapp=l87URKRF')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
try:
self.d.xpath(f'//*[contains(@text, "{self.wallet_okx.v4r2_address[-4:]}")]')
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name},钱包成功!!!")
return
except:
logger.error(f"当前模拟器名字:{self.tg_phone.ld_name},钱包没有绑或者是绑定失败!!!")
self.tg_paws_info.wallet_start = 3
self.tg_paws_info.save()
@decorator
def set_time_wallet(self):
self.wallet_okx = WalletAddressOkx.get_or_none(
WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "time"
)
if self.wallet_okx.stast:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name},钱包已经绑定!!!")
return
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.d.open_url(url='https://t.me/TimeFarmCryptoBot?start=1ll25QcyYLilP5NQF')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
type1, ele = get_element(d=self.d, type1='xpath', grammar='(//*[@text="Open App"])[last()]', timeout=10)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(d=self.d, type1='xpath', grammar='//*[@text="Start"]', timeout=5)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Next"]', timeout=25)
if type1:
ele.click()
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/time_start.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Done"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
self.d.xpath('//*[@text="Wallet"]').click()
time.sleep(5)
self.d.xpath('//*[@text="ton Connect your TON wallet >"]').click()
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/time_tonkeeper.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Tonkeeper"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
# self.d.xpath('//*[@text="连接钱包"]').click()
x_Y = match_image(d=self.d, template_path="images/tonkeeper__.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
try:
time.sleep(5)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
except:
pass
self.wallet_okx.stast = 1
self.wallet_okx.save()
time.sleep(20)
@decorator
def alliance_wallet(self):
self.wallet_okx = WalletAddressOkx.get_or_none(
WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "alliance"
)
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.open_project(url='https://t.me/AllianceMicroNodeBot/AMN?startapp=32HSHC')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
self.d.xpath('//*[@text="Quest"]').click()
time.sleep(5)
self.d.xpath('//*[@text="Mission"]').click()
time.sleep(5)
swipe_up(d=self.d, start_y1=0.8, end_y1=0.2)
time.sleep(5)
self.d.xpath('//*[@text="Bind Ton Wallet"]').click()
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/capybara_tonkeeper.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
# self.d.xpath('//*[@text="连接钱包"]').click()
x_Y = match_image(d=self.d, template_path="images/tonkeeper__.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
try:
time.sleep(5)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
except:
pass
self.wallet_okx.stast = 1
self.wallet_okx.save()
time.sleep(20)
@decorator
def capybara_wallet(self):
self.wallet_okx = WalletAddressOkx.get_or_none(
WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "capybara"
)
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.open_project(url='https://t.me/the_capybara_meme_bot/start?startapp=477af0fa174bdccabb09662124ec5834')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
# https://t.me/the_capybara_meme_bot/start?startapp=477af0fa174bdccabb09662124ec5834
self.d.xpath('//*[@text="Home"]').click()
time.sleep(5)
self.d.xpath('//android.view.View/*[@text="Connect Wallet"]').click()
time.sleep(5)
self.d.xpath('//*[@text="TON Wallet"]').click()
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/capybara_tonkeeper.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
# self.d.xpath('//*[@text="连接钱包"]').click()
x_Y = match_image(d=self.d, template_path="images/tonkeeper__.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
try:
time.sleep(5)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
except:
pass
self.wallet_okx.stast = 1
self.wallet_okx.save()
time.sleep(20)
@decorator
def start_bot(self):
self.open_project(url="https://t.me/terminalgame_bot/terminalgame?startapp=B3GU504V&startApp=B3GU504V")
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
self.open_project(url="https://t.me/terminalgame_bot/terminalgame?startapp=B3GU504V&startApp=B3GU504V")
time.sleep(10)
for i in range(10):
self.d.click(441, 1466)
time.sleep(5)
@decorator
def paws_top(self):
self.open_project(url="https://t.me/PAWSOG_bot/PAWS?startapp=7TeJz714")
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
self.open_project(url="https://t.me/PAWSOG_bot/PAWS?startapp=7TeJz714")
time.sleep(5)
self.d.xpath('//*[@text="Earn"]').click()
time.sleep(5)
self.d.click(433, 658)
import random
# 定义两个坐标
x1, y1 = 735, 780
x2, y2 = 812, 814
# 计算坐标差值
dx = x2 - x1
dy = y2 - y1
# 生成一个随机的权重,范围在 [0, 1]
random_weight = random.random()
# 计算随机位置
random_x = x1 + dx * random_weight
random_y = y1 + dy * random_weight
# 将坐标转换为整数
random_x = int(round(random_x))
random_y = int(round(random_y))
time.sleep(25)
self.d.click(random_x, random_y)
time.sleep(25)
# 定义两个坐标
x1, y1 = 742, 1200
x2, y2 = 780, 1245
# 计算坐标差值
dx = x2 - x1
dy = y2 - y1
# 生成一个随机的权重,范围在 [0, 1]
random_weight = random.random()
# 计算随机位置
random_x = x1 + dx * random_weight
random_y = y1 + dy * random_weight
# 将坐标转换为整数
random_x = int(round(random_x))
random_y = int(round(random_y))
self.d.click(random_x, random_y)
time.sleep(25)
# self.open_project(url="https://t.me/PAWSOG_bot/PAWS?startapp=7TeJz714")
#
# # 链接打开后的tg检测检测
# if self.tg_testing():
# print("账号掉了!!!")
# return
#
# self.d.xpath('//*[@text="Earn"]').click()
# time.sleep(5)
# for i in range(5):
# self.open_project(url="https://t.me/PAWSOG_bot/PAWS?startapp=7TeJz714")
#
# if i == 0:
# type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Earn"]', timeout=50)
# if type1:
# time.sleep(10)
# ele.click()
# time.sleep(5)
#
# x_Y = match_image(d=self.d, template_path="images/paws_task_2.png", retry_times=5, threshold=0.8)
# if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
#
# time.sleep(5)
# x_Y = match_image(d=self.d, template_path="images/paws_start.png", retry_times=5, threshold=0.8)
# if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
# time.sleep(5)
#
# for i in range(5):
# #
# # # 模拟按返回键
# # d.press("back")
#
# type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Check"]', timeout=5)
# if type1:
# time.sleep(5)
# ele.click()
# type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Claim"]', timeout=5)
# if type1:
# time.sleep(5)
# ele.click()
# time.sleep(5)
@decorator
def sq(self):
self.d.xpath('//*[@text="Telegram"]').click()
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return True
self.d.xpath('//*[@content-desc="Search"]').set_text('telegram')
self.d.xpath('//*[@text="Telegram, Verified\n, support"]').click()
self.d.xpath('//*[@text="Confirm"]').click()
time.sleep(5)
@decorator
def paws_satrt(self):
self.d.app_uninstall("com.okinc.okex.gp")
self.open_project(url="https://t.me/PAWSOG_bot/PAWS?startapp=7TeJz714")
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return True
logger.info(f"账号ID{self.tg_phone.id},检测完成!!!")
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Earn"]', timeout=50)
if type1:
time.sleep(5)
ele.click()
try:
x_Y = match_image(d=self.d, template_path="images/paws_task_claim.png", retry_times=5, threshold=0.8)
x_start = get_x_y(coordinate=x_Y[0][0], x=0, y=25)
y_start = get_x_y(coordinate=x_Y[0][1], x=0, y=10)
if x_Y: self.d.click(x_start, y_start)
except:
pass
try:
x_Y = match_image(d=self.d, template_path="images/paws_task_start1.png", retry_times=5, threshold=0.8)
x_start = get_x_y(coordinate=x_Y[0][0], x=0, y=25)
y_start = get_x_y(coordinate=x_Y[0][1], x=0, y=10)
if x_Y: self.d.click(x_start, y_start)
time.sleep(5)
# self.d.xpath('//*[@content-desc="Web tabs PAWS"]').click()
x_Y = match_image(d=self.d, template_path="images/paws_task_claim.png", retry_times=5, threshold=0.8)
x_start = get_x_y(coordinate=x_Y[0][0], x=0, y=25)
y_start = get_x_y(coordinate=x_Y[0][1], x=0, y=10)
if x_Y: self.d.click(x_start, y_start)
except:
pass
# try:
# x_Y = match_image(d=self.d, template_path="images/paws_task_start.png", retry_times=5, threshold=0.8)
# x_start = get_x_y(coordinate=x_Y[0][0], x=0, y=25)
# y_start = get_x_y(coordinate=x_Y[0][1], x=0, y=10)
# if x_Y: self.d.click(x_start, y_start)
# except:
# pass
#
# try:
# x_Y = match_image(d=self.d, template_path="images/paws_task_claim.png", retry_times=5, threshold=0.8)
# x_start = get_x_y(coordinate=x_Y[0][0], x=0, y=25)
# y_start = get_x_y(coordinate=x_Y[0][1], x=0, y=10)
# if x_Y: self.d.click(x_start, y_start)
# except:
# pass
#
# time.sleep(10)
# x_Y = match_image(d=self.d, template_path="images/paws_task_partners.png", retry_times=5, threshold=0.8)
# if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
# x_Y = match_image(d=self.d, template_path="images/paws_task_start1.png", retry_times=5, threshold=0.8)
# x_start = get_x_y(coordinate=x_Y[0][0], x=0, y=25)
# y_start = get_x_y(coordinate=x_Y[0][1], x=0, y=10)
# if x_Y: self.d.click(x_start, y_start)
# time.sleep(5)
# self.d.xpath('//*[@text="Start"]').click()
# time.sleep(5)
# try:
# self.d.xpath('//*[@text="Continue"]').click()
# self.d.xpath('//*[@text="Claim 50 crystal"]').click()
# self.d.xpath('//*[@text="Get points"]').click()
# self.d.xpath('//*[@text="Yes, continue"]').click()
# except:
# pass
#
# time.sleep(25)
# self.d.xpath('//*[@content-desc="Go back"]').click()
# x_Y = match_image(d=self.d, template_path="images/paws_task_check.png", retry_times=5, threshold=0.8)
# x_start = get_x_y(coordinate=x_Y[0][0], x=0, y=25)
# y_start = get_x_y(coordinate=x_Y[0][1], x=0, y=10)
# if x_Y: self.d.click(x_start, y_start)
# time.sleep(10)
# x_Y = match_image(d=self.d, template_path="images/paws_task_claim.png", retry_times=5, threshold=0.8)
# x_start = get_x_y(coordinate=x_Y[0][0], x=0, y=25)
# y_start = get_x_y(coordinate=x_Y[0][1], x=0, y=10)
# if x_Y: self.d.click(x_start, y_start)
time.sleep(25)
@decorator
def zoo_wallet(self):
self.wallet_okx = WalletAddressOkx.get_or_none(
# WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "blum"
)
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.open_project(url='https://t.me/zoo_story_bot/game?startapp=ref5660836163')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
for i in range(15):
# //*[@text="下一步"]
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="下一步"]', timeout=1)
if type1:
time.sleep(5)
ele.click()
time.sleep(1)
x_Y = match_image(d=self.d, template_path="images/zoo_1.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(1)
x_Y = match_image(d=self.d, template_path="images/zoo_2.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
self.d.xpath('(//android.widget.Button)[last()]').click()
self.d.xpath('//*[@text="Airdrop"]').click()
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/zoo_wallet.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
x_Y = match_image(d=self.d, template_path="images/zoo_wallet_1.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/zoo_tonkeeper.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
self.d.xpath('//*[@text="连接钱包"]').click()
time.sleep(10)
self.tg_zoo_info = TgZoo.get_or_none(TgZoo.phone == self.tg_phone.ld_name)
if self.tg_zoo_info:
self.tg_zoo_info.wallet_id = self.wallet_okx.id
self.tg_zoo_info.save(only=[TgZoo.wallet_id])
@decorator
def tg_confirm(self):
try:
self.d.xpath('//*[@text="Telegram"]').click()
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
else:
print("检查完成,账号正常!!!")
for i in range(10):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=0.1)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=0.1)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="允许"]', timeout=0.1)
if type1:
ele.click()
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="OK"]', timeout=0.1)
if type1:
ele.click()
self.d.xpath('//*[@content-desc="Search"]').set_text('telegram')
time.sleep(5)
try:
self.d.xpath('//*[@text="Telegram, Verified\n, support"]').click()
except Exception as e:
self.d.xpath('//*[@text="Telegram Notifications, Verified\n, support"]').click()
for i in range(5):
swipe_up(d=self.d, start_y1=0.8, end_y1=0.3, duration=0.1)
time.sleep(5)
for i in range(5):
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Confirm"]', timeout=0.1)
if type1:
ele.click()
time.sleep(5)
time.sleep(5)
return True
except Exception as e:
print(e)
return False
@decorator
def sen_bot(self):
self.d.open_url(url="https://t.me/bandit_quest_verification_bot")
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="开始"]', timeout=50)
if type1: # 有就删除,再安装
self.d.app_uninstall("com.okinc.okex.gp")
self.d.send_keys('/connect <YOUR_WALLET_ADDRESS>')
self.d.xpath('//*[@content-desc="发送"]').click()
@decorator
def blum_tx1(self, blum_type):
# if self.tonkeeper_add():
# logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
# else:
# logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
# return
#
# self.open_project(url='https://t.me/blum/app')
#
# # 链接打开后的tg检测检测
# if self.tg_testing():
# print("账号掉了!!!")
# return
#
# self.d.xpath('//*[@text="Wallet"]').click()
# self.d.xpath('//*[@text="Arrow down"]').click()
# self.d.xpath('//*[@text="Disconnect"]').click()
# self.d.xpath('//*[@text="Connect wallet"]').click()
# self.d.xpath('//*[@text="Tonkeeper"]').click()
#
# time.sleep(5)
# # self.d.xpath('//*[@text="连接钱包"]').click()
# x_Y = match_image(d=self.d, template_path="images/tonkeeper__.png", retry_times=5, threshold=0.8)
# if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
#
# try:
# time.sleep(5)
# for i in range(2):
# for i in range(4):
# self.d.xpath('//*[@text="1"]').click()
# time.sleep(1)
# except:
# pass
self.d.open_url("https://t.me/BlumCryptoTradingBot")
time.sleep(30)
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Remind me later"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
if blum_type:
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="START"]', timeout=5) # 等待tg打开
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='(//*[@text="AGREE & CONTINUE"])[last()]',
timeout=5) # 等待tg打开
if type1:
ele.click()
time.sleep(5)
else:
type1, ele = get_element(self.d, type1="xpath", grammar='(//*[@text="同意"])[last()]',
timeout=1) # 等待tg打开
if type1:
ele.click()
time.sleep(5)
self.d.xpath('//*[@text="Message"]').set_text('/start')
time.sleep(5)
x_y = find_img(d=self.d, timeout=1, retry=10, template_path="images/tg_send.png")
if x_y:
self.d.click(x_y[0], x_y[1])
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text=".. BUY • .. SELL"]',
timeout=5) # 等待tg打开
if not type1:
self.d.xpath('//*[@text="Settings"]').click()
time.sleep(5)
type1, ele = get_element(self.d, type1="xpath", grammar='(//*[@text=".. Language"])[last()]',
timeout=1) # 等待tg打开
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='(//*[@text=".. 语言"])[last()]',
timeout=1) # 等待tg打开
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='(//*[@text="语言"])[last()]',
timeout=1) # 等待tg打开
if type1:
ele.click()
type1, ele = get_element(self.d, type1="xpath", grammar='(//*[@text="Language"])[last()]',
timeout=1) # 等待tg打开
if type1:
ele.click()
time.sleep(5)
self.d.xpath('//*[@text="English"]').click()
time.sleep(5)
self.d.xpath('//*[@text="Message"]').set_text('/start')
time.sleep(5)
x_y = find_img(d=self.d, timeout=1, retry=10, template_path="images/tg_send.png")
if x_y:
self.d.click(x_y[0], x_y[1])
time.sleep(5)
self.d.xpath('//*[@text="Wallet"]').click()
time.sleep(5)
self.d.xpath('//*[@text="TON wallet"]').click()
time.sleep(5)
self.d.xpath('(//*[@text="Import wallet"])[last()]').click()
time.sleep(5)
self.d.xpath('//*[@text="CONFIRM"]').click()
time.sleep(5)
self.d.send_keys(self.blum_tx.text)
time.sleep(5)
x_y = find_img(d=self.d, timeout=1, retry=10, template_path="images/tg_send.png")
if x_y:
self.d.click(x_y[0], x_y[1])
time.sleep(5)
self.d.xpath('//*[@text="V4R2"]').click()
time.sleep(5)
self.d.xpath('//*[@text="Message"]').set_text('/start')
time.sleep(5)
x_y = find_img(d=self.d, timeout=1, retry=10, template_path="images/tg_send.png")
if x_y:
self.d.click(x_y[0], x_y[1])
time.sleep(5)
self.d.xpath('//*[@text=".. BUY • .. SELL"]').click()
time.sleep(5)
self.d.xpath('//*[@text=".. Token lists"]').click()
time.sleep(5)
self.d.xpath('//*[@text="❤️‍.. Blum hot"]').click()
time.sleep(5)
self.d.xpath('//*[@text="1⃣ FPIBANK"]').click()
time.sleep(5)
if "3%" not in self.d.xpath('//*[contains(@text, "Slippage")]').get_text():
time.sleep(5)
self.d.xpath('//*[contains(@text, "Slippage")]').click()
time.sleep(5)
self.d.xpath('//*[@text="Message"]').set_text('3')
time.sleep(5)
x_y = find_img(d=self.d, timeout=1, retry=10, template_path="images/tg_send.png")
if x_y:
self.d.click(x_y[0], x_y[1])
time.sleep(5)
if "10 TON" not in self.d.xpath('//*[contains(@text, "Buy amount")]').get_text():
time.sleep(5)
self.d.xpath('(//*[contains(@text, "Buy amount")])[last()]').click()
time.sleep(5)
random_num = round(random.uniform(0.1, 0.2), 2)
self.d.xpath('//*[@text="Message"]').set_text(str(random_num))
time.sleep(5)
x_y = find_img(d=self.d, timeout=1, retry=10, template_path="images/tg_send.png")
if x_y:
self.d.click(x_y[0], x_y[1])
time.sleep(5)
self.d.xpath('//*[@text="BUY"]').click()
for i in range(50):
if self.process_item():
time.sleep(5)
break
time.sleep(5)
self.blum_sell()
if self.blum_tx.tx_num:
self.blum_tx.tx_num += 1
else:
self.blum_tx.tx_num = 1
self.blum_tx.tx_start = 1
self.blum_tx.save()
def process_item(self, ):
data = get_jettons(address=self.blum_tx.address)
if not data:
return
if data:
data = [item for item in data if
item.get("name") != "Locked TON" and item.get("name") != "TON Activity Reward"]
has_non_zero = any(item['balance'] != 0 for item in data)
if has_non_zero:
return True
return False
def blum_sell(self):
# self.d.open_url("https://t.me/BlumCryptoTradingBot")
#
# time.sleep(25)
# type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Remind me later"]', timeout=5)
# if type1: ele.click()
# time.sleep(5)
self.d.xpath('//*[@text="Message"]').set_text('/start')
time.sleep(5)
x_y = find_img(d=self.d, timeout=1, retry=10, template_path="images/tg_send.png")
if x_y:
self.d.click(x_y[0], x_y[1])
time.sleep(5)
self.d.xpath('(//*[@text=".. Positions"])[last()]').click()
time.sleep(5)
self.d.xpath('(//*[@text="FPIBANK"])[last()]').click()
time.sleep(5)
if "3%" not in self.d.xpath('//*[contains(@text, "Slippage")]').get_text():
time.sleep(5)
self.d.xpath('(//*[contains(@text, "Slippage")])[last()]').click()
time.sleep(5)
self.d.xpath('//*[@text="Message"]').set_text('3')
time.sleep(5)
x_y = find_img(d=self.d, timeout=1, retry=10, template_path="images/tg_send.png")
if x_y:
self.d.click(x_y[0], x_y[1])
time.sleep(5)
self.d.xpath('(//*[@text="SELL"])[last()]').click()
time.sleep(25)
def blum_action(self):
self.blum_tx = BlumTx.get_or_none(
BlumTx.phone == self.tg_phone.ld_name,
)
logger.info(f'电话号码:{self.tg_phone.ld_name},开始查询地址是否有代币!!!')
if self.process_item():
logger.info(f'电话号码:{self.tg_phone.ld_name},地址有代币!!!')
self.blum_tx1(blum_type=0)
else:
logger.info(f'电话号码:{self.tg_phone.ld_name},地址没有代币!!!')
self.blum_tx1(blum_type=1)
# if not self.blum_tx.tx_start:
# self.blum_tx1(blum_type=1)
# else:
# logger.info(f'电话号码:{self.tg_phone.ld_name},已做过交易跳过!!!')
return
@decorator
def game_wallet(self):
self.wallet_okx = WalletAddressOkx.get_or_none(
WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "gameness"
)
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.open_project(url='https://t.me/GamenessBot/app?startapp=ref_qBSlye')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
for i in range(3):
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="Continue"]', timeout=3) # 等待tg打开
if type1:
ele.click()
time.sleep(3)
self.d.xpath('//*[@text="Wallet"]').click()
time.sleep(3)
self.d.xpath('//*[@text="Connect Wallet"]').click()
x_Y = match_image(d=self.d, template_path="images/game_tonkeepper.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Tonkeeper"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
time.sleep(5)
# self.d.xpath('//*[@text="连接钱包"]').click()
x_Y = match_image(d=self.d, template_path="images/tonkeeper__.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
try:
time.sleep(5)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
except:
pass
self.wallet_okx.stast = 1
self.wallet_okx.save()
time.sleep(20)
@decorator
def goat_wallet(self):
self.wallet_okx = WalletAddressOkx.get_or_none(
WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "goatgamingbot"
)
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.open_project(url='https://t.me/goatgamingbot/goatgaming?startapp=ref_R3LBGZCSF')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
time.sleep(60)
x_Y = match_image(d=self.d, template_path="images/goat_task1.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/goat_x.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="S3 SNAPSHOT SOON"]', timeout=5)
if type1: # 点开过+
self.d.xpath('(//android.widget.Button)[5]').click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="WIN US$1,000+ ON JACKPOTS!"]', timeout=5)
if type1: # 点开过+
self.d.xpath('(//android.widget.Button)[5]').click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="CHECK OUT REFERRALS"]', timeout=5)
if type1: # 点开过+
self.d.xpath('(//android.widget.Button)[5]').click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="BOOST $GG TGE ALLOCATION"]', timeout=5)
if type1: # 点开过+
self.d.xpath('(//android.widget.Button)[5]').click()
type1, ele = get_element(self.d, type1="xpath", grammar='//*[@text="BOOST YOUR XP GAIN"]', timeout=5)
if type1: # 点开过+
self.d.xpath('(//android.widget.Button)[5]').click()
time.sleep(10)
self.d.click(843, 196)
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/goat_connect_ton.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/goat_ton.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Tonkeeper"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
time.sleep(5)
# self.d.xpath('//*[@text="连接钱包"]').click()
x_Y = match_image(d=self.d, template_path="images/tonkeeper__.png", retry_times=5, threshold=0.8)
if x_Y:
self.d.click(x_Y[0][0], x_Y[0][1])
try:
time.sleep(5)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
except:
pass
self.wallet_okx.stast = 1
self.wallet_okx.save()
time.sleep(20)
@decorator
def ducks_wallet(self):
self.wallet_okx = WalletAddressOkx.get_or_none(
# WalletAddressOkx.tg_id == self.tg_phone.id,
WalletAddressOkx.ld_name == self.tg_phone.ld_name,
WalletAddressOkx.project_name == "ducks"
)
if self.tonkeeper_add():
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定成功")
else:
logger.info(f"当前模拟器名字:{self.tg_phone.ld_name}tonkeeper绑定失败")
return
self.open_project(url='https://t.me/duckscoop_bot/app?startapp=HowrUJpa40')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
# time.sleep(30)
self.d.xpath('//*[@text="Connect TON Wallet"]').click()
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/goat_ton.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="Tonkeeper"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
type1, ele = get_element(d=self.d, type1="xpath", grammar='//*[@text="始终"]', timeout=5)
if type1:
ele.click()
time.sleep(5)
time.sleep(5)
# self.d.xpath('//*[@text="连接钱包"]').click()
x_Y = match_image(d=self.d, template_path="images/tonkeeper__.png", retry_times=5, threshold=0.8)
if x_Y:
self.d.click(x_Y[0][0], x_Y[0][1])
try:
time.sleep(5)
for i in range(2):
for i in range(4):
self.d.xpath('//*[@text="1"]').click()
time.sleep(1)
except:
pass
self.wallet_okx.stast = 1
self.wallet_okx.save()
time.sleep(20)
@decorator
def xpin_okx_wallet(self):
self.open_project(url='https://t.me/XPIN_Offical_Bot/XPIN?startapp=stl175_1947911285261996032')
# 链接打开后的tg检测检测
if self.tg_testing():
print("账号掉了!!!")
return
self.d.xpath('//*[@text="Connect"]').click()
x_Y = match_image(d=self.d, template_path="images/xpin_okx.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/xpin_okx1.png", retry_times=5, threshold=0.8)
if x_Y: self.d.click(x_Y[0][0], x_Y[0][1])
time.sleep(5)
x_Y = match_image(d=self.d, template_path="images/xpin_okx2.png", retry_times=500, threshold=0.8)
if x_Y:
self.d.click(x_Y[0][0], x_Y[0][1])
self.xpin_info = TgXpin.get_or_none(
TgXpin.phone == self.tg_phone.ld_name,
)
self.xpin_info.wallet_start = 1
self.xpin_info.save()
if __name__ == '__main__':
# 启动类型 0:id,1:名字, 登录账号的时候3
action_type = 1
tg_phone_datas = TelegramAccount().select().where(
TelegramAccount.server_id == get_host_ip(),
TelegramAccount.is_logged_in_telegram == 1,
)
# 同时运行
max_threads = 1
delay_between_start = 15 # 每次启动线程之间的延迟时间(秒)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
while True:
futures = []
for i in tg_phone_datas:
# wallet_okx = WalletAddressOkx.get_or_none(
# # WalletAddressOkx.tg_id == i.id,
# WalletAddressOkx.ld_name == i.ld_name,
# WalletAddressOkx.project_name == "ducks"
# )
#
# if wallet_okx.stast:
# continue
try:
tg = TG(type1=action_type, tg_phone=i)
except:
continue
# 提交线程任务到线程池
future = executor.submit(tg.xpin_okx_wallet)
futures.append(future)
# 线程启动间隔
time.sleep(delay_between_start)
# 等待所有提交的任务完成
done = False
while not done:
done = True
for future in futures:
if not future.done():
done = False
time.sleep(60) # 可以根据需要调整休眠时间
break
time.sleep(300)
# # session登录
#
# # 启动类型 0:id,1:名字, 登录账号的时候3
# action_type = 3
#
# # 同时运行
# max_threads = 10
# delay_between_start = 15 # 每次启动线程之间的延迟时间(秒)
#
# with ThreadPoolExecutor(max_workers=max_threads) as executor:
#
# futures = []
#
# for tg_phone_info in TgPhoneDevices.select().where(
# (TgPhoneDevices.is_valid_session == 1) &
# ((TgPhoneDevices.code == "jiedata123123") |
# (TgPhoneDevices.code == "jiedata123"))
# ):
# print(tg_phone_info.phone)
# tg_info = TelegramAccount.get_or_none(
# TelegramAccount.ld_name == tg_phone_info.phone
# )
#
# # if tg_info.server_id:
# # logger.info(f"账号:{tg_phone_info.phone},已登录过,跳过!!!")
# # continue
#
# # 查询或创建,电话号码
# tg_phone, tyoe1 = TelegramAccount.get_or_create(
# one_phone_number=tg_phone_info.area_code,
# telephone=tg_phone_info.phone_number,
# ld_name=tg_phone_info.phone,
# server_id=get_host_ip()
# )
#
# if tg_phone.phone_type:
# logger.info(f"账号:{tg_phone.ld_name},登录成功!!!")
#
# continue
#
# try:
# tg = TG(type1=action_type, tg_phone=tg_phone, dev_info=tg_phone_info)
# # 提交线程任务到线程池
# future = executor.submit(tg.log_tg)
# futures.append(future)
#
# # 线程启动间隔
# time.sleep(delay_between_start)
# except Exception as e:
# continue