"""Automate Bitget login and read-only API-key creation through a fingerprint browser.

Run as a script, the module walks over every ``Bitgit`` row whose ``start``
column is NULL, opens the browser profile bound to that row, logs in
(best effort) and drives the "create API key" dialog.
"""
import random
import re
import string
import time
from concurrent.futures import ThreadPoolExecutor

import pyotp
from DrissionPage._configs.chromium_options import ChromiumOptions
from DrissionPage._pages.chromium_page import ChromiumPage
from loguru import logger

from hub_web.hub_tools import get_page
from models.bitgit import Bitgit
from models.ips import Ips
from tools123123.email123 import get_verify_code
from 比特.bit_tools import openBrowser

# Matches integers, decimals and scientific-notation numbers.
_NUMBER_PATTERN = re.compile(r'[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?')

# Size of the worker pool used by main().
MAX_THREADS = 1
# Delay between submitting two consecutive accounts, in seconds.
DELAY_BETWEEN_STARTS = 10


def extract_numbers(text: str):
    """Return the first number found in *text*.

    The match is returned as ``float`` when it contains a decimal point or an
    exponent, otherwise as ``int``.

    Raises:
        IndexError: if *text* contains no number (same exception type the
            previous list-indexing implementation raised).
    """
    match = _NUMBER_PATTERN.search(text)
    if match is None:
        raise IndexError(f"no number found in {text!r}")
    token = match.group(0)
    if '.' in token or 'e' in token.lower():
        return float(token)
    return int(token)


class BitGit():
    """Drives one Bitget account inside its dedicated browser profile."""

    # NOTE(review): credentials are hard-coded in source; they should come
    # from configuration or a secret store. Values are kept unchanged here.
    LOGIN_PASSWORD = "Hxsd123456@.."
    API_PASSPHRASE = "hx123456"

    def __init__(self, bitgit_inf):
        """Store the account row and look up the IP row it refers to.

        Args:
            bitgit_inf: account record; this class reads its ``ip_id``,
                ``bit_id``, ``email`` and ``two_fa`` attributes.
        """
        self.bitgit_info = bitgit_inf
        # May be None when no Ips row matches (get_or_none).
        self.ips_info = Ips.get_or_none(Ips.id == self.bitgit_info.ip_id)
        self.page = None
        # Set by action() once the Bitget tab has been opened.
        self.bitgit_tab = None

    def totp_now(self):
        """Return the current TOTP code for the account's ``two_fa`` secret."""
        return pyotp.totp.TOTP(self.bitgit_info.two_fa).now()

    def get_user_data(self, bitgit_info):
        """Scrape deposit address, UID and user name and save them on *bitgit_info*.

        Requires ``self.bitgit_tab`` to be an open, logged-in tab. The fixed
        sleeps give the page time to render between interactions.

        Args:
            bitgit_info: record that receives ``user_name``, ``uuid`` and
                ``address`` and is then persisted with ``save()``.
        """
        tab = self.bitgit_tab
        tab.get("https://www.bitget.com/zh-CN/asset/recharge")
        time.sleep(5)
        tab.ele('x://span[text()="SOL"]').click()
        time.sleep(3)
        tab.ele('x://*[@id="rc_select_2"]').click()
        time.sleep(3)
        tab.ele('x://div[text()="Solana"]').click()
        time.sleep(5)
        tab.refresh(ignore_cache=True)
        time.sleep(5)
        address = tab.ele(
            'x://*[@id="root"]/div/div[2]/div[2]/div[2]/div[1]/div[2]/div[2]/div[1]/div/div/div[2]/div[2]/div/span[2]/div/span/div/div').text

        # Open the account menu in the header; UID and user name are read from it.
        tab.ele('x://*[@id="MicroHeaderFrame"]/div[2]/div[3]/div[8]').click()
        time.sleep(3)
        uid_text = tab.ele(
            'x://div[@class="not-jump mt-2px flex items-center text-thirdText text-12px cursor-pointer leading-18px"]').text
        user_name = tab.ele(
            'x://*[@id="MicroHeaderFrame"]/div[2]/div[3]/div[8]/div[2]/div/div/div[1]/div[1]/div/div[2]/div[2]/div[1]').text
        print(uid_text, user_name)

        bitgit_info.user_name = user_name
        # The UID element's text is reduced to its first numeric token.
        bitgit_info.uuid = extract_numbers(uid_text)
        bitgit_info.address = address
        bitgit_info.save()

    def get_page(self, ):
        """Open the account's browser profile and attach a ``ChromiumPage`` to it.

        Returns:
            bool: True when the browser was opened, attached and maximised;
            False when any of those steps raised.
        """
        try:
            bit_port = openBrowser(id=self.bitgit_info.bit_id)
            options = ChromiumOptions()
            options.set_local_port(port=bit_port)
            self.page = ChromiumPage(addr_or_opts=options)
            self.page.set.window.max()
        except Exception as exc:
            # Still reported to the caller as False, but no longer silent.
            logger.warning("Failed to open browser: {!r}", exc)
            return False
        return True

    def _enter_code(self, input_prefix, code_supplier):
        """Type a verification code into per-character inputs, best effort.

        Args:
            input_prefix: ``data-testid`` prefix of the inputs; the index of
                each character is appended to it.
            code_supplier: zero-argument callable returning the code. It is
                only called when the first input is present on the page.
        """
        try:
            first_box = self.bitgit_tab.ele(
                f'x://input[@data-testid="{input_prefix}0"]', timeout=10)
            if not first_box:
                return
            for index, char in enumerate(code_supplier()):
                self.bitgit_tab.ele(
                    f'x://input[@data-testid="{input_prefix}{index}"]').input(char)
                # Pause after every character (and therefore after the last one).
                time.sleep(1)
        except Exception as exc:
            # This 2FA step is optional, so a failure must not abort the flow.
            logger.warning("Could not enter code for {}: {!r}", input_prefix, exc)

    def _enter_email_code(self):
        """Fetch the e-mailed verification code and type it in, if requested."""
        self._enter_code(
            "2FAEmailCodeInput",
            lambda: get_verify_code(self.bitgit_info.email, 0)[0],
        )

    def _enter_google_code(self):
        """Type the current TOTP code, if the authenticator inputs are shown."""
        self._enter_code("2FAGoogleCodeInput", self.totp_now)

    def _login(self):
        """Log in through the web form, best effort.

        Any failure (for example the login link being absent) is logged and
        swallowed so that the API-key flow is still attempted afterwards.
        """
        tab = self.bitgit_tab
        try:
            uk_notice = tab.ele('x://span[text()="I am not a UK user"]', timeout=10)
            if uk_notice:
                uk_notice.click()
            tab.ele('x://a[text()="登录"]').click()
            time.sleep(10)

            account_box = tab.ele('x://input[@placeholder="手机/邮箱"]', timeout=10)
            if account_box:
                account_box.input(vals=self.bitgit_info.email, clear=True)
            tab.ele('x://span[normalize-space(text())="验证"]').click()

            # NOTE(review): the very long timeout presumably leaves room for a
            # manual verification step before the password field appears - confirm.
            password_box = tab.ele('x://input[@placeholder="密码"]', timeout=9999)
            if password_box:
                password_box.input(self.LOGIN_PASSWORD)
            tab.ele('x://span[normalize-space(text())="立即登录"]').click()

            continue_button = tab.ele('x://span[normalize-space(text())="继续"]', timeout=10)
            if continue_button:
                continue_button.click()
            time.sleep(5)

            self._enter_email_code()
            self._enter_google_code()
        except Exception as exc:
            logger.warning("Login flow did not complete: {!r}", exc)

    def _create_api_key(self):
        """Fill in and submit the "create API key" dialog.

        NOTE: the original file carried a large commented-out section after
        this step (reading the generated API key / secret key back from the
        page, and collecting deposit addresses for the main account and its
        sub-accounts via ``get_user_data``). It was dead code referencing a
        name that is not imported and has been dropped; none of it runs here.
        """
        tab = self.bitgit_tab
        tab.get(url="https://www.bitget.com/zh-CN/account/newapi")
        agree_button = tab.ele('x://span[normalize-space(text())="同意"]', timeout=10)
        if agree_button:
            agree_button.click()
        time.sleep(5)

        tab.ele('x://button[@data-testid="NewApiCreateApiButton"]').click()
        time.sleep(1)
        tab.ele('x://h3[text()="系统生成的 API 密钥"]').click()
        time.sleep(1)

        # Remark: eight random lowercase letters.
        remark = ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
        tab.ele('x://input[@data-testid="NewApiCreateDialogRemarkInput"]').input(remark)
        time.sleep(1)

        # Value typed into the dialog's "ApiCode" input.
        tab.ele('x://input[@data-testid="NewApiCreateDialogApiCodeInput"]').input(self.API_PASSPHRASE)
        time.sleep(1)

        # Select the read-only permission option, then tick the checkbox.
        tab.ele('x://span[normalize-space(text())="只读权限"]', timeout=10).click()
        time.sleep(1)
        tab.ele('x://span[@class="bit-checkbox__inner"]').click()
        time.sleep(1)

        # IP address field: filled with the IP stored on the account's Ips row.
        tab.ele('x://textarea[@placeholder="单个地址直接填写,多个 IP 地址用半角逗号分离"]').input(
            self.ips_info.ip)
        time.sleep(1)

        tab.ele('x://span[normalize-space(text())="下一步"]', timeout=10).click()
        time.sleep(5)

        # Request the e-mail code, wait for delivery, then pass both 2FA steps.
        tab.ele('x://button[@data-testid="2FAEmailCodeSendButton"]').click()
        time.sleep(10)
        self._enter_email_code()
        self._enter_google_code()

        tab.ele('x://span[text()="确定"]').click()
        time.sleep(50)

    def action(self):
        """Run the whole flow for this account: open browser, log in, create key.

        Errors are logged rather than raised, and the browser is always closed
        at the end if it was opened.
        """
        if self.ips_info is None:
            # The API-key dialog needs the whitelisted IP; fail before opening
            # the browser instead of crashing half-way through the dialog.
            logger.error("No Ips row found for ip_id={}", self.bitgit_info.ip_id)
            return

        try:
            if self.get_page():
                logger.info("浏览器打开成功")
            else:
                logger.error("浏览器打开失败")
                return

            # Close every tab except the first one.
            for index, tab in enumerate(self.page.get_tabs()):
                if index == 0:
                    continue
                tab.close()

            self.bitgit_tab = self.page.new_tab("https://www.bitget.com/zh-CN/")
            self._login()
            self._create_api_key()
            time.sleep(10)
        except Exception:
            logger.exception("Bitget automation failed")
        finally:
            # self.page stays None when the browser never opened; calling
            # quit() on it used to raise AttributeError from this finally.
            if self.page is not None:
                self.page.quit()


def main():
    """Process every account whose ``start`` column is NULL, one submit at a time."""
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        for bitgit_inf in Bitgit.select().where(Bitgit.start.is_null()):
            bitgit_web = BitGit(bitgit_inf)
            executor.submit(bitgit_web.action)
            # Stagger the start of consecutive accounts.
            time.sleep(DELAY_BETWEEN_STARTS)


if __name__ == '__main__':
    main()