rgfewfger

This commit is contained in:
27942
2025-12-08 16:20:22 +08:00
parent 0224785c8d
commit 1a287611e6
2 changed files with 212 additions and 97 deletions

31
test.py
View File

@@ -1,3 +1,28 @@
print(abs(100 - 10))
https://letsexchange.io/
# 3923 3938 16 3932 3938 6
class MyClass:
def __init__(self):
# 启动时自动执行的方法
self.start()
def start(self):
print("启动方法已执行")
def my_method(self):
print("指定的方法正在执行")
print(bfghsgrg)
# 模拟可能出现的错误
# 下面这行代码会触发一个异常,你可以根据需要注释掉它来测试正常情况
# result = 1 / 0
def end(self):
print("结束方法已执行")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end()
# 使用 with 语句来调用这个类和方法
with MyClass() as obj:
obj.my_method()

View File

@@ -22,6 +22,70 @@ class Hub_Web:
self.page = None
self.x_tab = None
self.start() # 执行类时,就启动的方法
def start(self):
self.create_Browser() # 创建浏览器
if self.get_page():
logger.info(f"推特名字:{self.x_info.user_name},浏览器打开成功")
else:
logger.error(f"推特名字:{self.x_info.user_name},浏览器打开失败")
return
if self.login_x_main():
self.xstart_info.start = 1
self.xstart_info.save()
logger.success(f"推特名字:{self.x_info.user_name}登录x成功")
# time.sleep(25)
#
# for i in self.x_tab.cookies():
# if i["name"] == "auth_token":
# self.xstart_info.cookie = i["value"]
# self.xstart_info.save()
else:
logger.error(f"推特名字:{self.x_info.user_name}登录x失败")
self.xstart_info.start = 0
self.xstart_info.save()
def end(self):
self.page.quit()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end()
def create_Browser(self):
if not self.xstart_info.bit_id:
if not self.xstart_info.ip_id:
self.ips_info = Ips.select().where(Ips.country == "法国").order_by(fn.Rand()).get()
self.xstart_info.ip_id = self.ips_info.id
self.xstart_info.save()
logger.info("没有浏览器,创建中。。。")
fz_datas = get_group_lists()
bit_id = createBrowser(
name=self.x_info.user_name,
groupId=fz_datas['推特'],
host=self.ips_info.host,
port=int(self.ips_info.port),
proxyUserName=self.ips_info.username,
proxyPassword=self.ips_info.password,
)
self.xstart_info.bit_id = bit_id
self.xstart_info.save()
def x_testing(self):
user_name = self.x_tab.ele(
'x://*[@id="react-root"]/div/div/div[2]/header/div/div/div/div[2]/div/button/div[2]/div/div[2]/div/div/div/span',
@@ -102,11 +166,38 @@ class Hub_Web:
self.page.set.window.max()
time.sleep(5)
for _, i in enumerate(self.page.get_tabs()):
if _ == 0:
continue
i.close()
return True
except:
pass
return False
def get_x_start(self):
if self.login_x_main():
self.xstart_info.start = 1
self.xstart_info.save()
logger.success(f"推特名字:{self.x_info.user_name}登录x成功")
# time.sleep(25)
#
# for i in self.x_tab.cookies():
# if i["name"] == "auth_token":
# self.xstart_info.cookie = i["value"]
# self.xstart_info.save()
else:
logger.error(f"推特名字:{self.x_info.user_name}登录x失败")
self.xstart_info.start = 0
self.xstart_info.save()
def login_x_main(self):
self.x_tab = self.page.new_tab('https://x.com/login')
@@ -431,112 +522,111 @@ class Hub_Web:
else:
return False
def action(self):
# def action(self):
self.ips_info = Ips.select().where(Ips.country == "法国").order_by(fn.Rand()).get()
if not self.xstart_info.bit_id:
self.xstart_info.ip_id = self.ips_info.id
self.xstart_info.save()
# if not self.xstart_info.bit_id:
# self.xstart_info.ip_id = self.ips_info.id
# self.xstart_info.save()
#
# logger.info("没有浏览器,创建中。。。")
#
# fz_datas = get_group_lists()
#
# bit_id = createBrowser(
# name=self.x_info.user_name,
# groupId=fz_datas['推特'],
# host=self.ips_info.host,
# port=int(self.ips_info.port),
# proxyUserName=self.ips_info.username,
# proxyPassword=self.ips_info.password,
# )
#
# self.xstart_info.bit_id = bit_id
# self.xstart_info.save()
logger.info("没有浏览器,创建中。。。")
# self.get_page()
fz_datas = get_group_lists()
# if self.get_page():
# logger.info(f"推特名字:{self.x_info.user_name},浏览器打开成功")
# else:
# logger.error(f"推特名字:{self.x_info.user_name},浏览器打开失败")
# return
#
# time.sleep(5)
bit_id = createBrowser(
name=self.x_info.user_name,
groupId=fz_datas['推特'],
host=self.ips_info.host,
port=int(self.ips_info.port),
proxyUserName=self.ips_info.username,
proxyPassword=self.ips_info.password,
)
# for _, i in enumerate(self.page.get_tabs()):
# if _ == 0:
# continue
#
# i.close()
self.xstart_info.bit_id = bit_id
self.xstart_info.save()
# self.get_name()
# self.get_page()
# if self.login_x_main():
# self.xstart_info.start = 1
# self.xstart_info.save()
#
# logger.success(f"推特名字:{self.x_info.user_name}登录x成功")
#
# # time.sleep(25)
# #
# # for i in self.x_tab.cookies():
# # if i["name"] == "auth_token":
# # self.xstart_info.cookie = i["value"]
# # self.xstart_info.save()
#
# else:
# logger.error(f"推特名字:{self.x_info.user_name}登录x失败")
# self.xstart_info.start = 0
# self.xstart_info.save()
if self.get_page():
logger.info(f"推特名字:{self.x_info.user_name},浏览器打开成功")
else:
logger.error(f"推特名字:{self.x_info.user_name},浏览器打开失败")
return
# # 发推
# try:
# # self.x_tab.get("https://x.com/home")
# time.sleep(random.randint(3, 15))
#
# text = "Websea顶级渠道注册就可享受websea合约85%返佣,量大可谈,欢迎代理咨询。 TG飞机http://t.me/webseadds"
#
# self.x_tab.actions.click(
# 'x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[1]/div/div/div/div/div/div/div/div/div/div/div/div[1]/div/div/div/div/div/div[2]/div/div/div/div').input(
# text)
#
# time.sleep(random.randint(3, 15))
# self.x_tab.actions.click(
# on_ele='x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]/div/div/div/button')
# time.sleep(5)
# except:
# pass
time.sleep(5)
# # 获取推文链接
# self.get_urls()
for _, i in enumerate(self.page.get_tabs()):
if _ == 0:
continue
# # 浏览推文
# try:
# self.to_li()
# except:
# pass
i.close()
# # 养号流程
# try:
# self.account_nurturing()
# except Exception as e:
# print(e)
self.get_name()
# 点击关注
# tab = self.page.new_tab(url="https://x.com/CryptoStart_App")
# time.sleep(10)
# ele = tab.ele('x://span[text()="Follow"]', timeout=0.5)
# if ele:
# tab.actions.click(on_ele=ele)
# time.sleep(10)
#
# for i in range(random.randint(1, 10)):
# tab.actions.scroll(delta_y=random.randint(400, 800))
# time.sleep(random.randint(3, 10))
if self.login_x_main():
self.xstart_info.start = 1
self.xstart_info.save()
logger.success(f"推特名字:{self.x_info.user_name}登录x成功")
# time.sleep(25)
#
# for i in self.x_tab.cookies():
# if i["name"] == "auth_token":
# self.xstart_info.cookie = i["value"]
# self.xstart_info.save()
else:
logger.error(f"推特名字:{self.x_info.user_name}登录x失败")
self.xstart_info.start = 0
self.xstart_info.save()
# # 发推
# try:
# # self.x_tab.get("https://x.com/home")
# time.sleep(random.randint(3, 15))
#
# text = "Websea顶级渠道注册就可享受websea合约85%返佣,量大可谈,欢迎代理咨询。 TG飞机http://t.me/webseadds"
#
# self.x_tab.actions.click(
# 'x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[1]/div/div/div/div/div/div/div/div/div/div/div/div[1]/div/div/div/div/div/div[2]/div/div/div/div').input(
# text)
#
# time.sleep(random.randint(3, 15))
# self.x_tab.actions.click(
# on_ele='x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]/div/div/div/button')
# time.sleep(5)
# except:
# pass
# # 获取推文链接
# self.get_urls()
# # 浏览推文
# try:
# self.to_li()
# except:
# pass
# 养号流程
try:
self.account_nurturing()
except Exception as e:
print(e)
# 点击关注
# tab = self.page.new_tab(url="https://x.com/CryptoStart_App")
# time.sleep(10)
# ele = tab.ele('x://span[text()="Follow"]', timeout=0.5)
# if ele:
# tab.actions.click(on_ele=ele)
# time.sleep(10)
#
# for i in range(random.randint(1, 10)):
# tab.actions.scroll(delta_y=random.randint(400, 800))
# time.sleep(random.randint(3, 10))
self.page.quit()
# self.page.quit()
if __name__ == '__main__':
@@ -581,7 +671,7 @@ if __name__ == '__main__':
hun_web = Hub_Web(x_info=x_token_info, xstart_info=xstart_info)
executor.submit(hun_web.action)
executor.submit(hun_web.account_nurturing)
# time.sleep(random.randint(15, 60))
time.sleep(delay_between_start)