321 lines
12 KiB
Python
321 lines
12 KiB
Python
import random
|
|
import re
|
|
import string
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
import pyotp
|
|
from DrissionPage._configs.chromium_options import ChromiumOptions
|
|
from DrissionPage._pages.chromium_page import ChromiumPage
|
|
from loguru import logger
|
|
|
|
from hub_web.hub_tools import get_page
|
|
from models.bitgit import Bitgit
|
|
|
|
from models.ips import Ips
|
|
from tools123123.email123 import get_verify_code
|
|
|
|
from 比特.bit_tools import openBrowser
|
|
|
|
|
|
def extract_numbers(text):
|
|
# 正则表达式模式,用于匹配整数、浮点数和科学计数法表示的数字
|
|
pattern = r'[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?'
|
|
|
|
# 使用 findall 方法查找所有匹配的数值
|
|
numbers = re.findall(pattern, text)
|
|
|
|
# 将匹配到的字符串转换为相应的数值类型
|
|
numbers = [float(num) if '.' in num or 'e' in num.lower() else int(num) for num in numbers]
|
|
|
|
return numbers[0]
|
|
|
|
|
|
class BitGit():
|
|
def __init__(self, bitgit_inf):
|
|
|
|
self.bitgit_info = bitgit_inf
|
|
|
|
self.ips_info = Ips.get_or_none(Ips.id == self.bitgit_info.ip_id)
|
|
|
|
self.page = None
|
|
|
|
def totp_now(self):
|
|
|
|
code = pyotp.totp.TOTP(self.bitgit_info.two_fa).now()
|
|
return code
|
|
|
|
def get_user_data(self, bitgit_info):
|
|
self.bitgit_tab.get("https://www.bitget.com/zh-CN/asset/recharge")
|
|
time.sleep(5)
|
|
self.bitgit_tab.ele('x://span[text()="SOL"]').click()
|
|
time.sleep(3)
|
|
self.bitgit_tab.ele('x://*[@id="rc_select_2"]').click()
|
|
time.sleep(3)
|
|
self.bitgit_tab.ele('x://div[text()="Solana"]').click()
|
|
|
|
time.sleep(5)
|
|
self.bitgit_tab.refresh(ignore_cache=True)
|
|
time.sleep(5)
|
|
|
|
address = self.bitgit_tab.ele(
|
|
'x://*[@id="root"]/div/div[2]/div[2]/div[2]/div[1]/div[2]/div[2]/div[1]/div/div/div[2]/div[2]/div/span[2]/div/span/div/div').text
|
|
|
|
self.bitgit_tab.ele(
|
|
'x://*[@id="MicroHeaderFrame"]/div[2]/div[3]/div[8]').click()
|
|
time.sleep(3)
|
|
uuid = self.bitgit_tab.ele(
|
|
'x://div[@class="not-jump mt-2px flex items-center text-thirdText text-12px cursor-pointer leading-18px"]').text
|
|
user_name = self.bitgit_tab.ele(
|
|
'x://*[@id="MicroHeaderFrame"]/div[2]/div[3]/div[8]/div[2]/div/div/div[1]/div[1]/div/div[2]/div[2]/div[1]').text
|
|
|
|
print(uuid, user_name)
|
|
|
|
bitgit_info.user_name = user_name
|
|
bitgit_info.uuid = extract_numbers(uuid)
|
|
bitgit_info.address = address
|
|
bitgit_info.save()
|
|
|
|
|
|
def get_page(self, ):
|
|
|
|
try:
|
|
bit_port = openBrowser(id=self.bitgit_info.bit_id)
|
|
|
|
co = ChromiumOptions()
|
|
co.set_local_port(port=bit_port)
|
|
|
|
self.page = ChromiumPage(addr_or_opts=co)
|
|
|
|
self.page.set.window.max()
|
|
|
|
return True
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
|
|
def action(self):
|
|
|
|
try:
|
|
|
|
if self.get_page():
|
|
logger.info(f"浏览器打开成功")
|
|
else:
|
|
logger.error(f"浏览器打开失败")
|
|
return
|
|
|
|
for _, i in enumerate(self.page.get_tabs()):
|
|
if _ == 0:
|
|
continue
|
|
|
|
i.close()
|
|
|
|
self.bitgit_tab = self.page.new_tab("https://www.bitget.com/zh-CN/")
|
|
|
|
try:
|
|
|
|
a = self.bitgit_tab.ele('x://span[text()="I am not a UK user"]', timeout=10)
|
|
if a:
|
|
a.click()
|
|
|
|
self.bitgit_tab.ele('x://a[text()="登录"]').click()
|
|
time.sleep(10)
|
|
input_ele = self.bitgit_tab.ele('x://input[@placeholder="手机/邮箱"]', timeout=10)
|
|
if input_ele:
|
|
input_ele.input(vals=self.bitgit_info.email, clear=True)
|
|
|
|
self.bitgit_tab.ele('x://span[normalize-space(text())="验证"]').click()
|
|
|
|
input_ele = self.bitgit_tab.ele('x://input[@placeholder="密码"]', timeout=9999)
|
|
if input_ele:
|
|
input_ele.input("Hxsd123456@..")
|
|
|
|
self.bitgit_tab.ele('x://span[normalize-space(text())="立即登录"]').click()
|
|
button_ele = self.bitgit_tab.ele('x://span[normalize-space(text())="继续"]', timeout=10)
|
|
if button_ele:
|
|
button_ele.click()
|
|
|
|
time.sleep(5)
|
|
|
|
# 输入邮箱的验证码
|
|
try:
|
|
if self.bitgit_tab.ele(f'x://input[@data-testid="2FAEmailCodeInput0"]', timeout=10):
|
|
code = get_verify_code(self.bitgit_info.email, 0)[0]
|
|
for _, i in enumerate(code):
|
|
self.bitgit_tab.ele(f'x://input[@data-testid="2FAEmailCodeInput{_}"]').input(i)
|
|
time.sleep(1)
|
|
except:
|
|
pass
|
|
|
|
# 输入谷歌验证器的验证码
|
|
try:
|
|
if self.bitgit_tab.ele(f'x://input[@data-testid="2FAGoogleCodeInput0"]', timeout=10):
|
|
for _, i in enumerate(self.totp_now()):
|
|
self.bitgit_tab.ele(f'x://input[@data-testid="2FAGoogleCodeInput{_}"]').input(i)
|
|
time.sleep(1)
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
# 获取key----------------------------------------------------------------------------------------------------
|
|
self.bitgit_tab.get(url="https://www.bitget.com/zh-CN/account/newapi")
|
|
span_ele = self.bitgit_tab.ele('x://span[normalize-space(text())="同意"]', timeout=10)
|
|
if span_ele:
|
|
span_ele.click()
|
|
|
|
time.sleep(5)
|
|
self.bitgit_tab.ele('x://button[@data-testid="NewApiCreateApiButton"]').click()
|
|
time.sleep(1)
|
|
self.bitgit_tab.ele('x://h3[text()="系统生成的 API 密钥"]').click()
|
|
time.sleep(1)
|
|
# 获取所有小写字母
|
|
letters = string.ascii_lowercase
|
|
self.bitgit_tab.ele('x://input[@data-testid="NewApiCreateDialogRemarkInput"]').input(
|
|
''.join(random.choice(letters) for i in range(8))) # 输入备注名
|
|
time.sleep(1)
|
|
self.bitgit_tab.ele('x://input[@data-testid="NewApiCreateDialogApiCodeInput"]').input("hx123456") # 输入备注名
|
|
time.sleep(1)
|
|
self.bitgit_tab.ele('x://span[normalize-space(text())="只读权限"]', timeout=10).click()
|
|
time.sleep(1)
|
|
self.bitgit_tab.ele('x://span[@class="bit-checkbox__inner"]').click()
|
|
time.sleep(1)
|
|
self.bitgit_tab.ele('x://textarea[@placeholder="单个地址直接填写,多个 IP 地址用半角逗号分离"]').input(
|
|
self.ips_info.ip) # 输入备注名
|
|
time.sleep(1)
|
|
self.bitgit_tab.ele('x://span[normalize-space(text())="下一步"]', timeout=10).click()
|
|
|
|
time.sleep(5)
|
|
self.bitgit_tab.ele('x://button[@data-testid="2FAEmailCodeSendButton"]').click()
|
|
time.sleep(10)
|
|
|
|
# 输入邮箱的验证码
|
|
try:
|
|
if self.bitgit_tab.ele(f'x://input[@data-testid="2FAEmailCodeInput0"]', timeout=10):
|
|
code = get_verify_code(self.bitgit_info.email, 0)[0]
|
|
for _, i in enumerate(code):
|
|
self.bitgit_tab.ele(f'x://input[@data-testid="2FAEmailCodeInput{_}"]').input(i)
|
|
time.sleep(1)
|
|
except:
|
|
pass
|
|
|
|
# 输入谷歌验证器的验证码
|
|
try:
|
|
if self.bitgit_tab.ele(f'x://input[@data-testid="2FAGoogleCodeInput0"]', timeout=10):
|
|
for _, i in enumerate(self.totp_now()):
|
|
self.bitgit_tab.ele(f'x://input[@data-testid="2FAGoogleCodeInput{_}"]').input(i)
|
|
time.sleep(1)
|
|
except:
|
|
pass
|
|
|
|
self.bitgit_tab.ele('x://span[text()="确定"]').click()
|
|
|
|
time.sleep(50)
|
|
|
|
# apikey = self.bitgit_tab.ele(
|
|
# 'x://*[@id="__layout"]/div/div[3]/div/div[2]/div/div/div[7]/div/div[2]/div/div/div[1]/div[1]/div/div').text
|
|
# SecretKey = self.bitgit_tab.ele(
|
|
# 'x://*[@id="__layout"]/div/div[3]/div/div[2]/div/div/div[7]/div/div[2]/div/div/div[1]/div[2]/div/div').text
|
|
#
|
|
# print(apikey)
|
|
# print(SecretKey)
|
|
#
|
|
# self.bitgit_info.apikey = apikey
|
|
# self.bitgit_info.SecretKey = SecretKey
|
|
#
|
|
# self.bitgit_info.save()
|
|
#
|
|
# self.bitgit_tab.ele('x://span[normalize-space(text())="确定"]', timeout=10).click()
|
|
|
|
# # --------------------------------------------------------------------------------------------------------------
|
|
# self.bitgit_tab.get("https://www.bitget.com/zh-CN/asset/recharge?coinId=122&chainCoinId=3000")
|
|
# time.sleep(15)
|
|
#
|
|
# ele = self.bitgit_tab.ele('x://*[@id="react-joyride-step-0"]/div/div/div/div/div[2]/div[2]/span[1]',
|
|
# timeout=10)
|
|
# if ele:
|
|
# ele.click()
|
|
#
|
|
# # 切到主号
|
|
# self.bitgit_tab.ele(
|
|
# 'x://*[@id="MicroHeaderFrame"]/div[2]/div[3]/div[8]').click()
|
|
# time.sleep(5)
|
|
# self.bitgit_tab.ele('x://div[@class="flex gap-10px self-end"]').click()
|
|
# time.sleep(5)
|
|
# self.bitgit_tab.ele('x://*[@id="pane-first"]/div[2]/div[1]').click()
|
|
# time.sleep(5)
|
|
#
|
|
# # 获取主号的数据
|
|
# self.bitgit_tab.get("https://www.bitget.com/zh-CN/asset/recharge?coinId=122&chainCoinId=3000")
|
|
#
|
|
# bitgit_info, type1 = BitgitAddress.get_or_create(
|
|
# email=self.bitgit_info.email,
|
|
# user_name1=None,
|
|
# )
|
|
#
|
|
# self.get_user_data(bitgit_info)
|
|
#
|
|
# # 点击切换转户
|
|
# time.sleep(3)
|
|
# self.bitgit_tab.ele('x://div[@class="flex gap-10px self-end"]').click()
|
|
# time.sleep(3)
|
|
#
|
|
# # 滚动
|
|
# for i in range(3):
|
|
# self.bitgit_tab.actions.move_to('x://li[@class="list-item account-line"][last()]').scroll(
|
|
# delta_y=99)
|
|
# time.sleep(1)
|
|
#
|
|
# mini_user = self.bitgit_tab.eles('x://ul[@class="list"]/li')
|
|
#
|
|
# for i in range(20):
|
|
# bitgit_info, type1 = BitgitAddress.get_or_create(
|
|
# email=self.bitgit_info.email,
|
|
# user_name1=mini_user[i].ele('x:./div[2]/div[1]').text
|
|
# )
|
|
#
|
|
# if bitgit_info.address:
|
|
# continue
|
|
#
|
|
# mini_user[i].click()
|
|
#
|
|
# time.sleep(5)
|
|
# self.get_user_data(bitgit_info=bitgit_info)
|
|
#
|
|
# self.bitgit_tab.ele('x://div[@class="flex gap-10px self-end"]').click()
|
|
# time.sleep(5)
|
|
#
|
|
# # 滚动
|
|
# for i in range(3):
|
|
# self.bitgit_tab.actions.move_to('x://li[@class="list-item account-line"][last()]').scroll(
|
|
# delta_y=99)
|
|
# time.sleep(1)
|
|
#
|
|
# mini_user = self.bitgit_tab.eles('x://ul[@class="list"]/li')
|
|
#
|
|
# time.sleep(5)
|
|
|
|
time.sleep(10)
|
|
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
finally:
|
|
self.page.quit()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
max_threads = 1
|
|
|
|
delay_between_start = 10 # 每次启动线程之间的延迟时间(秒)
|
|
|
|
with ThreadPoolExecutor(max_workers=max_threads) as executor:
|
|
for bitgit_inf in Bitgit.select().where(Bitgit.start.is_null()):
|
|
bitgit_web = BitGit(bitgit_inf)
|
|
# bitgit_web.action()
|
|
|
|
executor.submit(bitgit_web.action)
|
|
|
|
time.sleep(delay_between_start)
|