Files
lm_code/推特/main.py
2026-01-15 15:06:37 +08:00

797 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import time
import pyotp
import random
from concurrent.futures import ThreadPoolExecutor
from peewee import fn
from loguru import logger
from DrissionPage import ChromiumOptions, ChromiumPage
from bit_tools import openBrowser, createBrowser, get_group_lists, get_browser_lists_Browser
from db_init import ensure_tables, sync_xstart_from_bitbrowser
from models.ips import Ips
from models.xstart import Xstart
from models.xtoken import XToken
class Hub_Web:
def __init__(self, xstart_info, x_info=None):
self.xstart_info = xstart_info
self.x_info = x_info
self.page = None
self.x_tab = None
self.start() # 执行类时,就启动的方法
def start(self):
self.create_Browser() # 创建浏览器
if self.get_page():
logger.info(f",浏览器打开成功")
else:
logger.error(f",浏览器打开失败")
return
if self.login_x_main():
self.xstart_info.start = 1
self.xstart_info.save()
logger.success(f"登录x成功")
# time.sleep(25)
#
# for i in self.x_tab.cookies():
# if i["name"] == "auth_token":
# self.xstart_info.cookie = i["value"]
# self.xstart_info.save()
else:
logger.error(f"登录x失败")
self.xstart_info.start = 0
self.xstart_info.save()
def create_Browser(self):
if not self.xstart_info.bit_id:
if not self.xstart_info.ip_id:
self.ips_info = Ips.select().where(Ips.country == "法国").order_by(fn.Rand()).get()
self.xstart_info.ip_id = self.ips_info.id
self.xstart_info.save()
logger.info("没有浏览器,创建中。。。")
fz_datas = get_group_lists()
bit_id = createBrowser(
# name=self.x_info.user_name,
proxyType="http",
groupId=fz_datas['推特'],
host="104.168.59.92",
port=int(random.randint(20001, 25000)),
# proxyUserName=self.ips_info.username,
# proxyPassword=self.ips_info.password,
)
self.xstart_info.bit_id = bit_id
self.xstart_info.save()
def x_testing(self):
user_name = self.x_tab.ele(
'x://*[@id="react-root"]/div/div/div[2]/header/div/div/div/div[2]/div/button/div[2]/div/div[2]/div/div/div/span',
timeout=10,
)
if not user_name:
user_name = self.x_tab.ele(
'x://*[@id="react-root"]/div/div/div[2]/header/div/div/div/div[2]/div/button/div[2]/div/div/div/div[1]/span/span',
timeout=10,
)
return True
# if user_name and self.x_info.user_name.lower() == user_name.text.split("@")[-1].lower():
# return True
# else:
# return False
# self.x_tab.refresh()
# time.sleep(5)
#
# for i in self.x_tab.cookies():
# if i.get("name") == "auth_token":
# self.x_info.token = i.get("value")
# self.x_info.save()
#
# return True
# else:
# return False
def login_x(self):
try:
self.x_tab = self.page.new_tab('https://x.com/login')
time.sleep(5)
self.x_tab.ele('x://input[@autocomplete="username"]').input(self.x_info.user_name)
self.x_tab.ele('x:(//button[@role="button"])[3]').click()
self.x_tab.ele('x://input[@name="password"]').input(self.x_info.password)
self.x_tab.ele('x://button[@data-testid="LoginForm_Login_Button"]').click()
# 创建一个 TOTP 对象
totp = pyotp.TOTP(self.x_info.two_fa)
self.x_tab.ele('x://input[@data-testid="ocfEnterTextTextInput"]').input(totp.now())
self.x_tab.ele('x://button[@data-testid="ocfEnterTextNextButton"]').click()
time.sleep(5)
self.x_tab.ele('x://button[@data-testid="OAuth_Consent_Button"]').click()
if self.x_testing():
self.xstart_info.start = 1
self.xstart_info.save()
return True
else:
return False
except:
pass
return False
def get_page_tab(self, url):
for i in range(3):
try:
tab = self.page.get_tab(url=url)
return tab
except:
time.sleep(1)
def get_page(self, ):
try:
bit_port = openBrowser(id=self.xstart_info.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
time.sleep(5)
for _, i in enumerate(self.page.get_tabs()):
if _ == 0:
continue
i.close()
return True
except:
pass
return False
def get_x_start(self):
if self.login_x_main():
self.xstart_info.start = 1
self.xstart_info.save()
logger.success(f"登录x成功")
# time.sleep(25)
#
# for i in self.x_tab.cookies():
# if i["name"] == "auth_token":
# self.xstart_info.cookie = i["value"]
# self.xstart_info.save()
else:
logger.error(f"登录x失败")
self.xstart_info.start = 0
self.xstart_info.save()
def login_x_main(self):
self.x_tab = self.page.new_tab('https://x.com/login')
if self.x_testing():
return True
else:
logger.info(f"开始登录x。。。")
if self.login_x():
return True
else:
return False
def get_txt(self):
with open('推文.txt', 'r', encoding='utf-8') as f:
content = f.read()
tasks = [] # 创建任务列表
lines = content.split('\n')
print(lines)
# 随机选择一个段落
random_paragraph = random.choice(lines)
return random_paragraph
def get_soon_txt(self):
with open('soon推文.txt', 'r', encoding='utf-8') as f:
content = f.read()
tasks = [] # 创建任务列表
lines = content.split('\n\n')
print(lines)
# 随机选择一个段落
random_paragraph = random.choice(lines)
return random_paragraph
def tweet_about_sui_chain(self):
self.x_tab.ele('x://div[@class="public-DraftStyleDefault-block public-DraftStyleDefault-ltr"]').input(
self.get_txt())
time.sleep(5)
self.x_tab.ele(
'x://span[@class="css-1jxf684 r-dnmrzs r-1udh08x r-1udbk01 r-3s2u2q r-bcqeeo r-1ttztb7 r-qvutc0 r-poiln3 r-a023e6 r-rjixqe"]//span[@class="css-1jxf684 r-bcqeeo r-1ttztb7 r-qvutc0 r-poiln3"]').click()
if self.hub_web_info.x_num:
self.hub_web_info.x_num += 1
else:
self.hub_web_info.x_num = 1
self.hub_web_info.save(only=[HubWeb.x_num])
def soon_action(self):
for i in range(3):
okx_tab = self.get_page_tab(url="mcohilncbfahbmgdjkbpemcciiolgcge")
if okx_tab:
okx_tab.close()
okx_wallet_tab = self.page.new_tab(
url="chrome-extension://mcohilncbfahbmgdjkbpemcciiolgcge/popup.html#/initialize")
time.sleep(5)
if "initialize" in okx_wallet_tab.url:
okx_wallet_tab.ele('x:(//button[@data-testid="okd-button"])[last()]').click()
okx_wallet_tab.ele(
'x://div[@class="_wallet-list__item_1kyzd_4 _wallet-list__item__hover_1kyzd_8 _wallet-list__cell_1kyzd_26 _listCell_q2vqq_29"]').click()
time.sleep(5)
okx_wallet_tab = self.get_page_tab(url="mcohilncbfahbmgdjkbpemcciiolgcge")
soon_wallet_info = CjjSoo.get_or_none(
CjjSoo.id == self.hub_web_info.soon_wallet_id
)
for ele, text in zip(okx_wallet_tab.eles('x://input[@class="mnemonic-words-inputs__container__input"]'),
soon_wallet_info.mnemonic.split(
" ")):
ele.input(text)
okx_wallet_tab.ele('x://button[@data-testid="okd-button"]').click()
okx_wallet_tab.ele('x:(//div[@class="_item-content_1cywj_42"])[last()]').click()
okx_wallet_tab.ele('x://span[@class="btn-content"]').click()
okx_wallet_tab.ele('x://input[@data-testid="okd-input"]').input("12345678")
okx_wallet_tab.ele('x:(//input[@data-testid="okd-input"])[last()]').input("12345678")
okx_wallet_tab.ele('x:(//span[@class="btn-content"])[last()]').click()
time.sleep(10)
okx_wallet_tab.ele('x:(//span[@class="btn-content"])[last()]').click()
if "unlock" in okx_wallet_tab.url:
okx_wallet_tab.ele('x://input[@data-testid="okd-input"]').input("12345678")
okx_wallet_tab.ele('x://button[@data-testid="okd-button"]').click()
time.sleep(5)
okx_wallet_tab.close()
soon_tab = self.page.new_tab(url="https://redpill.soo.network/ ")
sign_ele = soon_tab.ele('x://button[text()="Sign"]', timeout=5)
if sign_ele:
sign_ele.click()
time.sleep(5)
okx_wallet_tab = self.page.get_tab(url="mcohilncbfahbmgdjkbpemcciiolgcge")
okx_wallet_tab.ele('x:(//button[@data-testid="okd-button"])[last()]').click()
time.sleep(10)
sign_ele = soon_tab.ele('x://button[text()="Sign In"]', timeout=5)
if sign_ele:
sign_ele.click()
soon_tab.ele('x://button[contains(text(), "OKX")]').click()
time.sleep(5)
okx_wallet_tab = self.page.get_tab(url="mcohilncbfahbmgdjkbpemcciiolgcge")
okx_wallet_tab.ele('x:(//button[@data-testid="okd-button"])[last()]').click()
time.sleep(5)
sign_ele = soon_tab.ele('x://button[text()="Sign"]', timeout=5)
if sign_ele:
sign_ele.click()
time.sleep(5)
okx_wallet_tab = self.page.get_tab(url="mcohilncbfahbmgdjkbpemcciiolgcge")
okx_wallet_tab.ele('x:(//button[@data-testid="okd-button"])[last()]').click()
time.sleep(10)
soon_ele = soon_tab.ele('x://div[@class="group relative cursor-pointer"]', timeout=3)
if soon_ele:
soon_ele.click()
time.sleep(3)
soon_ele = soon_tab.ele('x://button[text()="Yes"]', timeout=3)
if soon_ele:
soon_ele.click()
time.sleep(3)
soon_ele = soon_tab.ele('x://button[text()="Confirm"]', timeout=3)
if soon_ele:
soon_ele.click()
time.sleep(3)
soon_ele = soon_tab.ele('x://button[text()="Connect X to Start My Journey"]', timeout=3)
if soon_ele:
soon_ele.click()
time.sleep(5)
soon_ele = soon_tab.ele('x://button[@data-testid="OAuth_Consent_Button"]', timeout=15)
if soon_ele:
soon_ele.click()
# -------------------------------------------------------------------------------------
x_tab = self.page.new_tab(url="https://x.com/intent/post?text=.%20%40soon_svm&hashtags=SOONISTHEREDPILL")
x_tab = self.page.get_tab(url="SOONISTHEREDPILL")
x_tab.ele('x://div[@class="false draftjs-styles_0 "]').input(self.get_soon_txt())
time.sleep(5)
x_tab.ele('x://button[@data-testid="tweetButton"]').click()
time.sleep(5)
x_tab.ele('x://button[@data-testid="tweetButton"]').click()
time.sleep(15)
def get_urls(self):
tab = self.page.new_tab()
tab.listen.start("https://x.com/i/api/graphql")
tab.get(url=f"https://x.com/{self.x_info.user_name}")
url_id = ""
try:
res = tab.listen.wait(timeout=25) # 等待并获取一个数据包
for i in res.response.body["data"]['user']['result']["timeline"]['timeline']["instructions"][1][
"entries"]:
if "tweet" in i["entryId"]:
new_string = i["entryId"].replace("tweet-", "")
url_id += new_string + ";"
except:
pass
self.xstart_info.url_id = url_id
self.xstart_info.save()
def to_li(self):
xtoken_info = XToken.select().where(XToken.start == 1).order_by(fn.Rand()).get()
xstart_info = Xstart.get_or_none(
x_id=xtoken_info.id,
)
result = xstart_info.url_id.split(';')
# 去除可能存在的空字符串元素
result = [item for item in result if item]
for i in result:
tab = self.page.new_tab(url=f"https://x.com/{xtoken_info.user_name}/status/{i}")
time.sleep(random.randint(1, 10))
if random.randint(1, 11) > 5:
# tab.ele(
# 'x:/html/body/div[1]/div/div/div[2]/main/div/div/div/div/div/section/div/div/div[1]/div/div/article/div/div/div[3]/div[5]/div/div/div[2]/button').click()
tab.actions.click(
'x:/html/body/div[1]/div/div/div[2]/main/div/div/div/div/div/section/div/div/div[1]/div/div/article/div/div/div[3]/div[5]/div/div/div[2]/button')
time.sleep(random.randint(1, 5))
# tab.ele(
# 'x:/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div[2]/div/div[3]/div/div/div/div').click()
tab.actions.click(
'x:/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div[2]/div/div[3]/div/div/div/div')
time.sleep(random.randint(1, 10))
if random.randint(1, 11) > 5:
# tab.ele(
# 'x:/html/body/div[1]/div/div/div[2]/main/div/div/div/div/div/section/div/div/div[1]/div/div/article/div/div/div[3]/div[5]/div/div/div[3]/button').click()
tab.actions.click(
'x:/html/body/div[1]/div/div/div[2]/main/div/div/div/div/div/section/div/div/div[1]/div/div/article/div/div/div[3]/div[5]/div/div/div[3]/button')
time.sleep(random.randint(1, 10))
def account_nurturing(self):
# self.start()
titles = ["Bianca", "币安", "OKX", "bitget", "bybit", "mexc"]
random_element = random.choice(titles)
self.page.get(url=f"https://x.com/search?q={random_element}")
names = []
for i1 in range(random.randint(10, 20)):
eles = self.page.eles(
'x:/html/body/div[1]/div/div/div[2]/main/div/div/div/div[1]/div/div[3]/section/div/div/div', timeout=5)
i = random.choice(eles)
ele = i.ele(
'x:./div/div/article/div/div/div[2]/div[2]/div[1]/div/div[1]/div/div/div[1]/div/a/div/div[1]/span/span',
timeout=0.5)
if not ele:
continue
if ele.text in names:
continue
names.append(ele.text)
self.page.actions.scroll(on_ele=ele)
time.sleep(1)
self.page.actions.click(on_ele=ele)
# 关注
if random.randint(1, 11) > 5:
ele = self.page.ele('x://span[text()="Follow"]', timeout=0.5)
if ele:
self.page.actions.click(on_ele=ele)
for i in range(random.randint(1, 10)):
eles = self.page.eles(
'x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div[1]/div/div[3]/div/div/section/div/div/div',
timeout=5)
i = random.choice(eles)
ele = i.ele('x:.//div/div/article', timeout=5)
if ele:
self.page.actions.scroll(on_ele=ele)
time.sleep(1)
self.page.actions.click(on_ele=ele)
time.sleep(random.randint(1, 10))
self.page.actions.scroll(
on_ele='x:/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div/div/div[2]/div[2]/div/div[2]/section/div/div/div[1]/div/div/article/div/div/div[3]/div[5]/div/div/div[3]/button/div/div[1]')
time.sleep(random.randint(1, 5))
if random.randint(1, 11) > 5:
self.page.actions.click(
'x:/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div/div/div[2]/div[2]/div/div[2]/section/div/div/div[1]/div/div/article/div/div/div[3]/div[5]/div/div/div[2]/button')
time.sleep(random.randint(1, 5))
self.page.actions.click(
'x:/html/body/div[1]/div/div/div[1]/div[3]/div/div/div/div[2]/div/div[3]/div/div/div/div/div[2]/div')
time.sleep(random.randint(1, 10))
if random.randint(1, 11) > 5:
self.page.actions.click(
'x:/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div/div/div[2]/div[2]/div/div[2]/section/div/div/div[1]/div/div/article/div/div/div[3]/div[5]/div/div/div[3]/button/div/div[1]')
time.sleep(random.randint(1, 10))
self.page.back()
time.sleep(random.randint(1, 10))
self.page.scroll(random.randint(500, 1000))
self.page.back()
time.sleep(random.randint(1, 10))
self.page.scroll(random.randint(500, 1000))
if random.randint(1, 11) > 5:
self.FollowTwitterAccount()
def get_name(self):
self.x_tab = self.page.new_tab('https://x.com/login')
user_name = self.x_tab.ele(
'x://*[@id="react-root"]/div/div/div[2]/header/div/div/div/div[2]/div/button/div[2]/div/div[2]/div/div/div/span',
timeout=10,
)
if not user_name:
user_name = self.x_tab.ele(
'x://*[@id="react-root"]/div/div/div[2]/header/div/div/div/div[2]/div/button/div[2]/div/div/div/div[1]/span/span',
timeout=10,
)
if user_name:
for i in XToken.select():
if i.user_name.lower() == user_name.text.split("@")[-1].lower():
self.xstart_info.x_id = i.id
self.xstart_info.save()
return True
else:
return False
def to_do_tui(self):
self.start()
# 发推
try:
# self.x_tab.get("https://x.com/home")
time.sleep(random.randint(3, 15))
# text = "Websea顶级渠道注册就可享受websea合约85%返佣,量大可谈,欢迎代理咨询。 TG飞机http://t.me/webseadds"
text = self.get_txt()
text = """
💡 Websea Futures Insurance 2.0 FAQ
What is "Referral-Based Rate Reduction"?
🤝 For every friend you successfully invite, your insurance fee rate drops by 1%, up to a maximum of 5%!
A friend counts as a successful boost once they register and reach a total insurance premiums of ≥ 10 USDT within 7 days.
Your friend also receives 1 reward node after completing registration.
Winwin benefits — dont miss out! 🚀
"""
self.xstart_info.url_id = text
self.xstart_info.save()
self.x_tab.actions.click(
'x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]/div/div/nav/div/div[2]/div/div[1]/div/button').input(
[r"E:\新建文件夹\6194986263283174596.jpg"])
self.x_tab.actions.click(
'x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[1]/div/div/div/div/div/div/div/div/div/div/div/div[1]/div/div/div/div/div/div[2]/div/div/div/div').input(
text)
time.sleep(random.randint(3, 15))
for i in range(3):
self.x_tab.actions.click(
on_ele='x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]/div/div/div/button')
time.sleep(1)
time.sleep(5)
except Exception as e:
print(e)
def end(self):
self.page.quit()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end()
def FollowTwitterAccount(self):
# 点击关注
# tab = self.page.new_tab(url="https://x.com/CryptoStart_App")
tab = self.page.new_tab(url="https://x.com/Websea_MY")
time.sleep(10)
# ele = tab.ele('x://span[text()="Follow"]', timeout=0.5)
# if ele:
# tab.actions.click(on_ele=ele)
# time.sleep(10)
tab.actions.click(
on_ele='x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div[1]/div/div[3]/div/div/div[1]/div[1]/div[1]/div[2]/div/div[1]/button/div/span/span')
for i in range(random.randint(1, 10)):
tab.actions.scroll(delta_y=random.randint(400, 800))
time.sleep(random.randint(3, 10))
def 回复(self):
urls = [
"https://x.com/Websea_MY/status/2004081342364123146",
"https://x.com/Websea_MY/status/2003669526492405904",
"https://x.com/Websea_MY/status/2001119115575222779",
"https://x.com/Websea_MY/status/2001180999443734677"
]
tests = [
"O seguro de futuros da Websea ultrapassou oficialmente a marca de 400 rodadas de airdrops! 🚀 Contacte TGwebseatom 85% de reembolso",
"Websea #FuturesInsurance официально преодолела отметку в 400 раундов аирдропов! 🚀Свяжитесь с TGwebseatom 85% кэшбэк",
"¡Websea #FuturesInsurance ha superado oficialmente las 400 rondas de airdrops! 🚀Póngase en contacto con TGwebseatom 85 % de reembolso",
"Websea #FuturesInsurance has officially surpassed 400 rounds of airdrops! 🚀 Contact TGwebseatom 85% cashback",
"Websea #FuturesInsurance đã chính thức vượt qua 400 đợt airdrop! 🚀Liên hệ TGwebseatom 85% hoàn tiền",
]
tab = self.page.new_tab(random.choice(urls))
tab.actions.click(
'x:/html/body/div[1]/div/div/div[2]/main/div/div/div/div[1]/div/section/div/div/div[1]/div/div/article/div/div/div[3]/div[5]/div/div/div[2]/button')
time.sleep(random.randint(1, 5))
tab.actions.click(
'x:/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div[2]/div/div[3]/div/div/div/a[1]')
time.sleep(random.randint(1, 5))
# tab.actions.type(random.choice(tests))
tab.ele(
'x:/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div/div/div[2]/div[2]/div/div/div/div[3]/div[2]/div[1]/div/div/div/div[1]/div[2]/div/div/div/div/div/div/div/div/div/div/div[1]/div/div[1]/div/div/div/div/div/div[2]/div/div/div/div').input(
random.choice(tests))
time.sleep(random.randint(1, 5))
for i in range(3):
try:
tab.actions.click(
'x:/html/body/div[1]/div/div/div[1]/div[2]/div/div/div/div/div/div[2]/div[2]/div/div/div/div[3]/div[2]/div[1]/div/div/div/div[2]/div[2]/div/div/div/button/div/span/span')
except:
pass
time.sleep(random.randint(1, 5))
time.sleep(random.randint(5, 15))
# def action(self):
# if not self.xstart_info.bit_id:
# self.xstart_info.ip_id = self.ips_info.id
# self.xstart_info.save()
#
# logger.info("没有浏览器,创建中。。。")
#
# fz_datas = get_group_lists()
#
# bit_id = createBrowser(
# name=self.x_info.user_name,
# groupId=fz_datas['推特'],
# host=self.ips_info.host,
# port=int(self.ips_info.port),
# proxyUserName=self.ips_info.username,
# proxyPassword=self.ips_info.password,
# )
#
# self.xstart_info.bit_id = bit_id
# self.xstart_info.save()
# self.get_page()
# if self.get_page():
# logger.info(f"推特名字:{self.x_info.user_name},浏览器打开成功")
# else:
# logger.error(f"推特名字:{self.x_info.user_name},浏览器打开失败")
# return
#
# time.sleep(5)
# for _, i in enumerate(self.page.get_tabs()):
# if _ == 0:
# continue
#
# i.close()
# self.get_name()
# if self.login_x_main():
# self.xstart_info.start = 1
# self.xstart_info.save()
#
# logger.success(f"推特名字:{self.x_info.user_name}登录x成功")
#
# # time.sleep(25)
# #
# # for i in self.x_tab.cookies():
# # if i["name"] == "auth_token":
# # self.xstart_info.cookie = i["value"]
# # self.xstart_info.save()
#
# else:
# logger.error(f"推特名字:{self.x_info.user_name}登录x失败")
# self.xstart_info.start = 0
# self.xstart_info.save()
# # 发推
# try:
# # self.x_tab.get("https://x.com/home")
# time.sleep(random.randint(3, 15))
#
# text = "Websea顶级渠道注册就可享受websea合约85%返佣,量大可谈,欢迎代理咨询。 TG飞机http://t.me/webseadds"
#
# self.x_tab.actions.click(
# 'x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[1]/div/div/div/div/div/div/div/div/div/div/div/div[1]/div/div/div/div/div/div[2]/div/div/div/div').input(
# text)
#
# time.sleep(random.randint(3, 15))
# self.x_tab.actions.click(
# on_ele='x://*[@id="react-root"]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]/div/div/div/button')
# time.sleep(5)
# except:
# pass
# # 获取推文链接
# self.get_urls()
# # 浏览推文
# try:
# self.to_li()
# except:
# pass
# # 养号流程
# try:
# self.account_nurturing()
# except Exception as e:
# print(e)
# 点击关注
# tab = self.page.new_tab(url="https://x.com/CryptoStart_App")
# time.sleep(10)
# ele = tab.ele('x://span[text()="Follow"]', timeout=0.5)
# if ele:
# tab.actions.click(on_ele=ele)
# time.sleep(10)
#
# for i in range(random.randint(1, 10)):
# tab.actions.scroll(delta_y=random.randint(400, 800))
# time.sleep(random.randint(3, 10))
# self.page.quit()
def run_work(x_token_info, xstart_info):
# hun_web = Hub_Web(x_info=x_token_info, xstart_info=xstart_info)
# executor.submit(hun_web.to_do_tui)
with Hub_Web(x_info=None, xstart_info=xstart_info) as hub:
try:
hub.account_nurturing() # 养号
except:
pass
# hub.to_do_tui() # 发推
hub.回复() # 发推
# hub.FollowTwitterAccount() # 关注
if __name__ == '__main__':
# ensure_tables()
# sync_xstart_from_bitbrowser(group_name="推特", max_pages=10)
#
# fz_datas = get_group_lists()
#
# for bit_data in get_browser_lists_Browser(id=fz_datas['推特']):
# print(bit_data)
# xstart_info, start = Xstart.get_or_create(
# bit_id=bit_data["id"],
# )
# if xstart_info.x_id:
# # continue
#
# hun_web = Hub_Web(xstart_info=xstart_info)
#
# # hun_web.action()
#
# threading.Thread(target=hun_web.action).start()
# time.sleep(random.randint(15, 60))
# 同时运行
max_threads = 5
delay_between_start = 30 # 每次启动线程之间的延迟时间(秒)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
# for x_token_info in XToken.select():
# 查询数据并转换为列表
random_x_infos = list(Xstart.select())
# 直接对原列表进行打乱操作
random.shuffle(random_x_infos)
# 遍历打乱顺序后的列表
for x_token_info in random_x_infos:
# if xstart_info.start:
# continue
executor.submit(run_work, x_token_info, x_token_info)
# time.sleep(random.randint(60, 150))
time.sleep(delay_between_start)