From 1a287611e654ca9b41f3ba6642e585bfc80c2c95 Mon Sep 17 00:00:00 2001 From: 27942 Date: Mon, 8 Dec 2025 16:20:22 +0800 Subject: [PATCH] rgfewfger --- test.py | 31 +++++- 推特/main.py | 278 ++++++++++++++++++++++++++++++++++----------------- 2 files changed, 212 insertions(+), 97 deletions(-) diff --git a/test.py b/test.py index 4c19206..22c63be 100644 --- a/test.py +++ b/test.py @@ -1,3 +1,28 @@ -print(abs(100 - 10)) -https://letsexchange.io/ -# 3923 3938 16 3932 3938 6 \ No newline at end of file +class MyClass: + def __init__(self): + # 启动时自动执行的方法 + self.start() + + def start(self): + print("启动方法已执行") + + def my_method(self): + print("指定的方法正在执行") + print(bfghsgrg) + # 模拟可能出现的错误 + # 下面这行代码会触发一个异常,你可以根据需要注释掉它来测试正常情况 + # result = 1 / 0 + + def end(self): + print("结束方法已执行") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.end() + + +# 使用 with 语句来调用这个类和方法 +with MyClass() as obj: + obj.my_method() \ No newline at end of file diff --git a/推特/main.py b/推特/main.py index 6777a9b..1a12704 100644 --- a/推特/main.py +++ b/推特/main.py @@ -22,6 +22,70 @@ class Hub_Web: self.page = None self.x_tab = None + self.start() # 执行类时,就启动的方法 + + def start(self): + + self.create_Browser() # 创建浏览器 + + if self.get_page(): + logger.info(f"推特名字:{self.x_info.user_name},浏览器打开成功") + else: + logger.error(f"推特名字:{self.x_info.user_name},浏览器打开失败") + return + + if self.login_x_main(): + self.xstart_info.start = 1 + self.xstart_info.save() + + logger.success(f"推特名字:{self.x_info.user_name},登录x成功!!!") + + # time.sleep(25) + # + # for i in self.x_tab.cookies(): + # if i["name"] == "auth_token": + # self.xstart_info.cookie = i["value"] + # self.xstart_info.save() + + else: + logger.error(f"推特名字:{self.x_info.user_name},登录x失败!!!") + self.xstart_info.start = 0 + self.xstart_info.save() + + def end(self): + self.page.quit() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.end() + + def create_Browser(self): + if not self.xstart_info.bit_id: + + if not self.xstart_info.ip_id: + self.ips_info = Ips.select().where(Ips.country == "法国").order_by(fn.Rand()).get() + + self.xstart_info.ip_id = self.ips_info.id + self.xstart_info.save() + + logger.info("没有浏览器,创建中。。。") + + fz_datas = get_group_lists() + + bit_id = createBrowser( + name=self.x_info.user_name, + groupId=fz_datas['推特'], + host=self.ips_info.host, + port=int(self.ips_info.port), + proxyUserName=self.ips_info.username, + proxyPassword=self.ips_info.password, + ) + + self.xstart_info.bit_id = bit_id + self.xstart_info.save() + def x_testing(self): user_name = self.x_tab.ele( 'x://*[@id="react-root"]/div/div/div[2]/header/div/div/div/div[2]/div/button/div[2]/div/div[2]/div/div/div/span', @@ -102,11 +166,38 @@ class Hub_Web: self.page.set.window.max() + time.sleep(5) + + for _, i in enumerate(self.page.get_tabs()): + if _ == 0: + continue + + i.close() + return True except: pass return False + def get_x_start(self): + if self.login_x_main(): + self.xstart_info.start = 1 + self.xstart_info.save() + + logger.success(f"推特名字:{self.x_info.user_name},登录x成功!!!") + + # time.sleep(25) + # + # for i in self.x_tab.cookies(): + # if i["name"] == "auth_token": + # self.xstart_info.cookie = i["value"] + # self.xstart_info.save() + + else: + logger.error(f"推特名字:{self.x_info.user_name},登录x失败!!!") + self.xstart_info.start = 0 + self.xstart_info.save() + def login_x_main(self): self.x_tab = self.page.new_tab('https://x.com/login') @@ -431,112 +522,111 @@ class Hub_Web: else: return False - def action(self): + # def action(self): - self.ips_info = Ips.select().where(Ips.country == "法国").order_by(fn.Rand()).get() - if not self.xstart_info.bit_id: - self.xstart_info.ip_id = self.ips_info.id - self.xstart_info.save() +# if not self.xstart_info.bit_id: +# self.xstart_info.ip_id = self.ips_info.id +# self.xstart_info.save() +# +# logger.info("没有浏览器,创建中。。。") +# +# fz_datas = get_group_lists() +# +# bit_id = createBrowser( +# name=self.x_info.user_name, +# groupId=fz_datas['推特'], +# host=self.ips_info.host, +# port=int(self.ips_info.port), +# proxyUserName=self.ips_info.username, +# proxyPassword=self.ips_info.password, +# ) +# +# self.xstart_info.bit_id = bit_id +# self.xstart_info.save() - logger.info("没有浏览器,创建中。。。") +# self.get_page() - fz_datas = get_group_lists() +# if self.get_page(): +# logger.info(f"推特名字:{self.x_info.user_name},浏览器打开成功") +# else: +# logger.error(f"推特名字:{self.x_info.user_name},浏览器打开失败") +# return +# +# time.sleep(5) - bit_id = createBrowser( - name=self.x_info.user_name, - groupId=fz_datas['推特'], - host=self.ips_info.host, - port=int(self.ips_info.port), - proxyUserName=self.ips_info.username, - proxyPassword=self.ips_info.password, - ) +# for _, i in enumerate(self.page.get_tabs()): +# if _ == 0: +# continue +# +# i.close() - self.xstart_info.bit_id = bit_id - self.xstart_info.save() +# self.get_name() - # self.get_page() +# if self.login_x_main(): +# self.xstart_info.start = 1 +# self.xstart_info.save() +# +# logger.success(f"推特名字:{self.x_info.user_name},登录x成功!!!") +# +# # time.sleep(25) +# # +# # for i in self.x_tab.cookies(): +# # if i["name"] == "auth_token": +# # self.xstart_info.cookie = i["value"] +# # self.xstart_info.save() +# +# else: +# logger.error(f"推特名字:{self.x_info.user_name},登录x失败!!!") +# self.xstart_info.start = 0 +# self.xstart_info.save() - if self.get_page(): - logger.info(f"推特名字:{self.x_info.user_name},浏览器打开成功") - else: - logger.error(f"推特名字:{self.x_info.user_name},浏览器打开失败") - return +# # 发推 +# try: +# # self.x_tab.get("https://x.com/home") +# time.sleep(random.randint(3, 15)) +# +# text = "Websea顶级渠道,注册就可享受websea合约85%返佣,量大可谈,欢迎代理咨询。 TG(飞机):http://t.me/webseadds" +# +# self.x_tab.actions.click( +# 'x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[1]/div/div/div/div/div/div/div/div/div/div/div/div[1]/div/div/div/div/div/div[2]/div/div/div/div').input( +# text) +# +# time.sleep(random.randint(3, 15)) +# self.x_tab.actions.click( +# on_ele='x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]/div/div/div/button') +# time.sleep(5) +# except: +# pass - time.sleep(5) +# # 获取推文链接 +# self.get_urls() - for _, i in enumerate(self.page.get_tabs()): - if _ == 0: - continue +# # 浏览推文 +# try: +# self.to_li() +# except: +# pass - i.close() +# # 养号流程 +# try: +# self.account_nurturing() +# except Exception as e: +# print(e) - self.get_name() +# 点击关注 +# tab = self.page.new_tab(url="https://x.com/CryptoStart_App") +# time.sleep(10) +# ele = tab.ele('x://span[text()="Follow"]', timeout=0.5) +# if ele: +# tab.actions.click(on_ele=ele) +# time.sleep(10) +# +# for i in range(random.randint(1, 10)): +# tab.actions.scroll(delta_y=random.randint(400, 800)) +# time.sleep(random.randint(3, 10)) - if self.login_x_main(): - self.xstart_info.start = 1 - self.xstart_info.save() - - logger.success(f"推特名字:{self.x_info.user_name},登录x成功!!!") - - # time.sleep(25) - # - # for i in self.x_tab.cookies(): - # if i["name"] == "auth_token": - # self.xstart_info.cookie = i["value"] - # self.xstart_info.save() - - else: - logger.error(f"推特名字:{self.x_info.user_name},登录x失败!!!") - self.xstart_info.start = 0 - self.xstart_info.save() - - # # 发推 - # try: - # # self.x_tab.get("https://x.com/home") - # time.sleep(random.randint(3, 15)) - # - # text = "Websea顶级渠道,注册就可享受websea合约85%返佣,量大可谈,欢迎代理咨询。 TG(飞机):http://t.me/webseadds" - # - # self.x_tab.actions.click( - # 'x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[1]/div/div/div/div/div/div/div/div/div/div/div/div[1]/div/div/div/div/div/div[2]/div/div/div/div').input( - # text) - # - # time.sleep(random.randint(3, 15)) - # self.x_tab.actions.click( - # on_ele='x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]/div/div/div/button') - # time.sleep(5) - # except: - # pass - - # # 获取推文链接 - # self.get_urls() - - # # 浏览推文 - # try: - # self.to_li() - # except: - # pass - - # 养号流程 - try: - self.account_nurturing() - except Exception as e: - print(e) - - # 点击关注 - # tab = self.page.new_tab(url="https://x.com/CryptoStart_App") - # time.sleep(10) - # ele = tab.ele('x://span[text()="Follow"]', timeout=0.5) - # if ele: - # tab.actions.click(on_ele=ele) - # time.sleep(10) - # - # for i in range(random.randint(1, 10)): - # tab.actions.scroll(delta_y=random.randint(400, 800)) - # time.sleep(random.randint(3, 10)) - - self.page.quit() +# self.page.quit() if __name__ == '__main__': @@ -581,7 +671,7 @@ if __name__ == '__main__': hun_web = Hub_Web(x_info=x_token_info, xstart_info=xstart_info) - executor.submit(hun_web.action) + executor.submit(hun_web.account_nurturing) # time.sleep(random.randint(15, 60)) time.sleep(delay_between_start)