This commit is contained in:
27942
2025-11-06 15:16:03 +08:00
parent a93cd36058
commit a2a0711e8a
2 changed files with 370 additions and 8 deletions

View File

@@ -1,18 +1,16 @@
import datetime
import re
import time
import hmac
import hashlib
import base64
import time
from bs4 import BeautifulSoup
import requests
import json
import hmac
import time
import base64
import hashlib
import datetime
import requests
from tqdm import *
from loguru import *
from DrissionPage import *
from bs4 import BeautifulSoup
def is_bullish(c): # 阳线

View File

@@ -0,0 +1,364 @@
import re
import json
import hmac
import time
import base64
import hashlib
import datetime
import requests
from tqdm import *
from loguru import *
from DrissionPage import *
from bs4 import BeautifulSoup
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
class WeexTransaction:
def __init__(self, tge_id):
self.tge_port = None # tge浏览器使用端口
self.tge_id = tge_id # tge id
self.tge_url = "http://127.0.0.1:50326" # tge本地服务url
self.tge_headers = {
"Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
"Content-Type": "application/json"
}
# 替换为你自己的钉钉机器人 Webhook 地址
self.webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125"
# 替换为你自己的钉钉机器人秘钥
self.secret = "SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998"
self.page = None # 浏览器对象
self.start = 0 # 持仓状态 -1:做空0维持仓1做多
self.kline_1 = None # 01
self.kline_2 = None # 01
self.direction = None # 信号类型
self.pbar = None # 进度条对象
def get_signature(self, timestamp):
# 将时间戳和密钥拼接
string_to_sign = f'{timestamp}\n{self.secret}'
string_to_sign = string_to_sign.encode('utf-8')
# 使用 HMAC-SHA256 算法进行签名
hmac_code = hmac.new(self.secret.encode('utf-8'), string_to_sign, digestmod=hashlib.sha256).digest()
# 对签名结果进行 Base64 编码
sign = base64.b64encode(hmac_code).decode('utf-8')
return sign
def send_dingtalk_message(self, message_content):
# 获取当前时间戳(毫秒)
timestamp = str(round(time.time() * 1000))
# 生成签名
sign = self.get_signature(timestamp, )
# 拼接带有签名信息的完整 Webhook URL
full_url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
# 定义消息内容
message = {
"msgtype": "text",
"text": {
"content": message_content
}
}
# 设置请求头
headers = {
"Content-Type": "application/json"
}
try:
# 发送 POST 请求
response = requests.post(full_url, headers=headers, data=json.dumps(message))
except requests.RequestException as e:
print(f"请求发生错误: {e}")
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
try:
response = requests.post(
f"{self.tge_url}/api/browser/start",
json={"envId": self.tge_id},
headers=self.tge_headers
)
self.tge_port = response.json()["data"]["port"]
return True
except:
return False
def take_over_browser(self):
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
return True
except:
return False
def is_bullish(self, c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(self, c): # 阴线
return float(c['close']) < float(c['open'])
def check_signal(self, prev, curr):
"""
包住形态信号判定仅15分钟K线
- 前跌后涨包住 -> 做多
- 前涨后跌包住 -> 做空
"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
# 前跌后涨包住 -> 做多
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
return "long", "bear_bull_engulf"
# 前涨后跌包住 -> 做空
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
return "short", "bull_bear_engulf"
return None, None
def get_price(self):
for i in range(3):
try:
logger.info(f"获取最新数据:{i + 1}次。。。")
self.mn_tab.refresh()
res = self.mn_tab.listen.wait(timeout=15) # 等待并获取一个数据包
datas = []
if res:
for data in res.response.body['data']["dataList"]:
insert_data = {
'id': int(data[4]),
'open': float(data[3]),
'high': float(data[1]),
'low': float(data[2]),
'close': float(data[0])
}
datas.append(insert_data)
return datas
except:
pass
return False
def to_do_page(self):
# self.page.get("https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT")
self.mn_tab.ele('x://*[contains(text(), "市价")]', timeout=15).click()
time.sleep(1)
self.mn_tab.ele('x://input[@placeholder="请输入数量"]').input(1600)
time.sleep(1)
if self.direction == "long" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.mn_tab.ele('x://*[contains(text(), "买入开多")]').click()
self.start = 1
elif self.direction == "short" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.mn_tab.ele('x://*[contains(text(), "卖出开空")]').click()
self.start = -1
elif self.direction == "long" and self.start == -1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
time.sleep(3)
self.mn_tab.ele('x://*[contains(text(), "买入开多")]').click()
self.start = 1
elif self.direction == "short" and self.start == 1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
time.sleep(3)
self.mn_tab.ele('x://*[contains(text(), "卖出开空")]').click()
self.start = -1
def get_text(self, target_text):
# 去除目标文本中的空白字符
cleaned_target_text = re.sub(r'\s', '', target_text)
# 创建 BeautifulSoup 对象
soup = BeautifulSoup(self.mn_tab.html, 'html.parser')
# 遍历所有标签的文本内容
for tag in soup.find_all():
tag_text = tag.get_text()
# 去除标签文本中的空白字符
cleaned_tag_text = re.sub(r'\s', '', tag_text)
if cleaned_target_text in cleaned_tag_text:
return True
else:
return False
def get_now_time(self):
# 获取当前时间戳
current_timestamp = time.time()
# 将当前时间戳转换为 datetime 对象
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的整点或 30 分时刻
if current_datetime.minute < 30:
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
else:
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
# 将目标 datetime 对象转换为时间戳
target_timestamp = target_datetime.timestamp()
return int(target_timestamp) * 1000
def close_extra_tabs_in_browser(self):
try:
for _, i in enumerate(self.page.get_tabs()):
if _ == 0:
continue
i.close()
return True
except:
pass
return False
def action(self):
# 获取比特端口
if self.openBrowser():
logger.info("获取打开比特成功,成功获取端口!!!")
else:
logger.error("打开比特失败!!!")
return
# 接管浏览器
if self.take_over_browser():
logger.info("接管比特浏览器成功!!!")
else:
logger.error("接管浏览器失败!!!")
return
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功!!!')
else:
logger.info('关闭多余标签页失败!!!')
self.mn_tab = self.page.new_tab()
self.mn_tab.listen.start("public/quote/v1/getKlineV2")
logger.success("浏览器开启抓包模式。。。")
self.mn_tab.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") # 打开网页
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc进度条说明ncols长度
while True:
# 获取当前时间
current_time = time.localtime()
current_minute = current_time.tm_min
if current_minute < 30:
self.pbar.n = current_minute
self.pbar.refresh()
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, 58]: # 判断是否是 新的30分钟了
time.sleep(10)
continue
new_price_datas = self.get_price()
if new_price_datas:
logger.success("获取最新交易价格成功!!!")
else:
logger.info("获取最新价格有问题!!!")
continue
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
# 判断抓取的数据是否正确
if self.get_now_time() != self.kline_3["id"]:
continue
time.sleep(15)
if self.get_text(target_text="仓位(1)"):
if self.get_text(target_text="ETH/SUSDT多"):
self.start = 1
elif self.get_text(target_text="ETH/SUSDT空"):
self.start = -1
else:
self.start = 0
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
if self.direction:
try:
self.to_do_page()
except Exception as e:
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()}{e}")
self.pbar.reset() # 重置进度条
self.send_dingtalk_message(
message_content=
f"{datetime.datetime.now()}"
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)
if __name__ == '__main__':
WeexTransaction(
tge_id=146473,
).action()
# //*[contains(text(), '特定文本')]