fewfwe
This commit is contained in:
@@ -1,5 +1,10 @@
|
||||
import time
|
||||
|
||||
import hmac
|
||||
import hashlib
|
||||
import base64
|
||||
import time
|
||||
import requests
|
||||
import json
|
||||
import requests
|
||||
|
||||
from tqdm import *
|
||||
@@ -25,6 +30,11 @@ class WeexTransaction:
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
# 替换为你自己的钉钉机器人 Webhook 地址
|
||||
self.webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125"
|
||||
# 替换为你自己的钉钉机器人秘钥
|
||||
self.secret = "SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998"
|
||||
|
||||
self.page = None # 浏览器对象
|
||||
|
||||
self.start = 0 # 持仓状态 -1:做空,0:维持仓,1:做多
|
||||
@@ -35,6 +45,48 @@ class WeexTransaction:
|
||||
|
||||
self.pbar = None # 进度条对象
|
||||
|
||||
def get_signature(self,timestamp):
|
||||
# 将时间戳和密钥拼接
|
||||
string_to_sign = f'{timestamp}\n{self.secret}'
|
||||
string_to_sign = string_to_sign.encode('utf-8')
|
||||
# 使用 HMAC-SHA256 算法进行签名
|
||||
hmac_code = hmac.new(self.secret.encode('utf-8'), string_to_sign, digestmod=hashlib.sha256).digest()
|
||||
# 对签名结果进行 Base64 编码
|
||||
sign = base64.b64encode(hmac_code).decode('utf-8')
|
||||
return sign
|
||||
|
||||
def send_dingtalk_message(self, message_content):
|
||||
# 获取当前时间戳(毫秒)
|
||||
timestamp = str(round(time.time() * 1000))
|
||||
# 生成签名
|
||||
sign = self.get_signature(timestamp,)
|
||||
# 拼接带有签名信息的完整 Webhook URL
|
||||
full_url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
|
||||
|
||||
# 定义消息内容
|
||||
message = {
|
||||
"msgtype": "text",
|
||||
"text": {
|
||||
"content": message_content
|
||||
}
|
||||
}
|
||||
|
||||
# 设置请求头
|
||||
headers = {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
try:
|
||||
# 发送 POST 请求
|
||||
response = requests.post(full_url, headers=headers, data=json.dumps(message))
|
||||
# 检查响应状态码
|
||||
if response.status_code == 200:
|
||||
print("消息发送成功")
|
||||
else:
|
||||
print(f"消息发送失败,状态码: {response.status_code}, 响应内容: {response.text}")
|
||||
except requests.RequestException as e:
|
||||
print(f"请求发生错误: {e}")
|
||||
|
||||
def openBrowser(self, ): # 直接指定ID打开窗口,也可以使用 createBrowser 方法返回的ID
|
||||
|
||||
try:
|
||||
@@ -128,18 +180,22 @@ class WeexTransaction:
|
||||
|
||||
if self.direction == "long" and not self.start:
|
||||
logger.success(f"第一根信号:{self.kline_1},{self.kline_2},开多")
|
||||
self.send_dingtalk_message(message_content=f"第一根信号:{self.kline_1},{self.kline_2},开多")
|
||||
self.page.ele('x://span[normalize-space()="买入开多"]').click()
|
||||
elif self.direction == "short" and not self.start:
|
||||
logger.success(f"第一根信号:{self.kline_1},{self.kline_2},开空")
|
||||
self.send_dingtalk_message(message_content=f"第一根信号:{self.kline_1},{self.kline_2},开空")
|
||||
self.page.ele('x://span[normalize-space()="卖出开空"]').click()
|
||||
elif self.direction == "long" and self.start == -1:
|
||||
logger.success(f"第一根信号:{self.kline_1},{self.kline_2},反手平空做多")
|
||||
self.send_dingtalk_message(message_content=f"第一根信号:{self.kline_1},{self.kline_2},反手平空做多")
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
|
||||
time.sleep(3)
|
||||
self.page.ele('x://span[normalize-space()="买入开多"]').click()
|
||||
elif self.direction == "short" and self.start == 1:
|
||||
logger.success(f"第一根信号:{self.kline_1},{self.kline_2},反手平多做空")
|
||||
self.send_dingtalk_message(message_content=f"第一根信号:{self.kline_1},{self.kline_2},反手平多做空")
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
|
||||
time.sleep(3)
|
||||
@@ -204,11 +260,13 @@ class WeexTransaction:
|
||||
if self.start == 1:
|
||||
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
|
||||
logger.success(f"第一根信号:{self.kline_1},{self.kline_2},平多")
|
||||
self.send_dingtalk_message(message_content=f"第一根信号:{self.kline_1},{self.kline_2},平多")
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
|
||||
elif self.start == -1:
|
||||
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
|
||||
logger.success(f"第一根信号:{self.kline_1},{self.kline_2},平空")
|
||||
self.send_dingtalk_message(message_content=f"第一根信号:{self.kline_1},{self.kline_2},平空")
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user