rgfewfger

This commit is contained in:
27942
2025-12-11 16:54:34 +08:00
parent 9f99a116c0
commit 3245574087

View File

@@ -1,5 +1,6 @@
import re
import time
import asyncio
import datetime
from tqdm import *
@@ -46,24 +47,24 @@ class WeexTransaction:
self.session = requests.Session() # 接口请求对象
def send_dingtalk_message(self, message_content):
result = asyncio.run(self.to_do_tg(message_content))
return result
pass
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
# url = "http://8.137.99.82:9005/api/send_click?token=fegergauiernguie&phone=8613661496481"
#
# res = requests.post(
# url=url,
# json={
# "phone": "8613661496481",
# "bot_name": "ergggreef",
# "datas": [
# {"send_message": [message_content], "click_button": [""], },
# ]
#
# }
# )
#
# print(res.json())
try:
response = requests.post(
f"{self.tge_url}/api/browser/start",
json={"envId": self.tge_id},
headers=self.tge_headers
)
self.tge_port = response.json()["data"]["port"]
return True
except:
return False
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
@@ -197,19 +198,22 @@ class WeexTransaction:
if self.direction == "long" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.mn_tab.ele('x://*[contains(text(), "买入开多")]').click()
self.start = 1
elif self.direction == "short" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.mn_tab.ele('x://*[contains(text(), "卖出开空")]').click()
self.start = -1
elif self.direction == "long" and self.start == -1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
time.sleep(3)
@@ -218,30 +222,14 @@ class WeexTransaction:
elif self.direction == "short" and self.start == 1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
time.sleep(3)
self.mn_tab.ele('x://*[contains(text(), "卖出开空")]').click()
self.start = -1
def get_text(self, target_text):
# 去除目标文本中的空白字符
cleaned_target_text = re.sub(r'\s', '', target_text)
# 创建 BeautifulSoup 对象
soup = BeautifulSoup(self.mn_tab.html, 'html.parser')
# 遍历所有标签的文本内容
for tag in soup.find_all():
tag_text = tag.get_text()
# 去除标签文本中的空白字符
cleaned_tag_text = re.sub(r'\s', '', tag_text)
if cleaned_target_text in cleaned_tag_text:
return True
else:
return False
def get_now_time(self):
# 获取当前时间戳
current_timestamp = time.time()
@@ -374,7 +362,7 @@ class WeexTransaction:
self.pbar.refresh()
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, ]: # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
time.sleep(10)
continue
@@ -403,7 +391,8 @@ class WeexTransaction:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
@@ -411,7 +400,7 @@ class WeexTransaction:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
@@ -424,12 +413,12 @@ class WeexTransaction:
self.to_do_page()
except Exception as e:
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()}{e}")
message_content=f"weex{datetime.datetime.now()}{e}")
self.pbar.reset() # 重置进度条
self.send_dingtalk_message(
message_content=
f"{datetime.datetime.now()}"
f"weex{datetime.datetime.now()}"
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)