418 lines
15 KiB
Python
418 lines
15 KiB
Python
import re
|
||
import json
|
||
import hmac
|
||
import time
|
||
import base64
|
||
import hashlib
|
||
import datetime
|
||
import requests
|
||
|
||
from tqdm import *
|
||
from loguru import *
|
||
from DrissionPage import *
|
||
from bs4 import BeautifulSoup
|
||
|
||
|
||
def is_bullish(c): # 阳线
|
||
return float(c['close']) > float(c['open'])
|
||
|
||
|
||
def is_bearish(c): # 阴线
|
||
return float(c['close']) < float(c['open'])
|
||
|
||
|
||
class WeexTransaction:
|
||
def __init__(self, tge_id):
|
||
self.tge_port = None # tge浏览器使用端口
|
||
self.tge_id = tge_id # tge id
|
||
self.tge_url = "http://127.0.0.1:50326" # tge本地服务url
|
||
self.tge_headers = {
|
||
"Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
# 替换为你自己的钉钉机器人 Webhook 地址
|
||
self.webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125"
|
||
# 替换为你自己的钉钉机器人秘钥
|
||
self.secret = "SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998"
|
||
|
||
self.page = None # 浏览器对象
|
||
|
||
self.start = 0 # 持仓状态 -1:做空,0:维持仓,1:做多
|
||
self.kline_1 = None # 0:跌,1:涨
|
||
self.kline_2 = None # 0:跌,1:涨
|
||
|
||
self.direction = None # 信号类型
|
||
|
||
self.pbar = None # 进度条对象
|
||
|
||
def get_signature(self, timestamp):
|
||
# 将时间戳和密钥拼接
|
||
string_to_sign = f'{timestamp}\n{self.secret}'
|
||
string_to_sign = string_to_sign.encode('utf-8')
|
||
# 使用 HMAC-SHA256 算法进行签名
|
||
hmac_code = hmac.new(self.secret.encode('utf-8'), string_to_sign, digestmod=hashlib.sha256).digest()
|
||
# 对签名结果进行 Base64 编码
|
||
sign = base64.b64encode(hmac_code).decode('utf-8')
|
||
return sign
|
||
|
||
# def send_dingtalk_message(self, message_content):
|
||
# # 获取当前时间戳(毫秒)
|
||
# timestamp = str(round(time.time() * 1000))
|
||
# # 生成签名
|
||
# sign = self.get_signature(timestamp, )
|
||
# # 拼接带有签名信息的完整 Webhook URL
|
||
# full_url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
|
||
#
|
||
# # 定义消息内容
|
||
# message = {
|
||
# "msgtype": "text",
|
||
# "text": {
|
||
# "content": message_content
|
||
# }
|
||
# }
|
||
#
|
||
# # 设置请求头
|
||
# headers = {
|
||
# "Content-Type": "application/json"
|
||
# }
|
||
#
|
||
# try:
|
||
# # 发送 POST 请求
|
||
# response = requests.post(full_url, headers=headers, data=json.dumps(message))
|
||
#
|
||
# except requests.RequestException as e:
|
||
# print(f"请求发生错误: {e}")
|
||
|
||
def send_dingtalk_message(self, message_content):
|
||
|
||
pass
|
||
|
||
# url = "http://8.137.99.82:9005/api/send_click?token=fegergauiernguie&phone=8613661496481"
|
||
#
|
||
# res = requests.post(
|
||
# url=url,
|
||
# json={
|
||
# "phone": "8613661496481",
|
||
# "bot_name": "ergggreef",
|
||
# "datas": [
|
||
# {"send_message": [message_content], "click_button": [""], },
|
||
# ]
|
||
#
|
||
# }
|
||
# )
|
||
#
|
||
# print(res.json())
|
||
|
||
def openBrowser(self, ): # 直接指定ID打开窗口,也可以使用 createBrowser 方法返回的ID
|
||
|
||
try:
|
||
|
||
response = requests.post(
|
||
f"{self.tge_url}/api/browser/start",
|
||
json={"envId": self.tge_id},
|
||
headers=self.tge_headers
|
||
)
|
||
|
||
self.tge_port = response.json()["data"]["port"]
|
||
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
def take_over_browser(self):
|
||
try:
|
||
co = ChromiumOptions()
|
||
co.set_local_port(self.tge_port)
|
||
|
||
self.page = ChromiumPage(addr_or_opts=co)
|
||
|
||
self.page.set.window.max()
|
||
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
def is_bullish(self, c): # 阳线
|
||
return float(c['close']) > float(c['open'])
|
||
|
||
def is_bearish(self, c): # 阴线
|
||
return float(c['close']) < float(c['open'])
|
||
|
||
def check_signal(self, prev, curr):
|
||
"""
|
||
包住形态信号判定(仅15分钟K线):
|
||
- 前跌后涨包住 -> 做多
|
||
- 前涨后跌包住 -> 做空
|
||
"""
|
||
p_open, p_close = float(prev['open']), float(prev['close'])
|
||
c_open, c_close = float(curr['open']), float(curr['close'])
|
||
|
||
# 前跌后涨包住 -> 做多
|
||
if is_bullish(curr) and is_bearish(prev) and int(c_open) <= int(p_close) and int(c_close) >= int(p_open):
|
||
return "long", "bear_bull_engulf"
|
||
|
||
# 前涨后跌包住 -> 做空
|
||
if is_bearish(curr) and is_bullish(prev) and int(c_open) >= int(p_close) and int(c_close) <= int(p_open):
|
||
return "short", "bull_bear_engulf"
|
||
|
||
return None, None
|
||
|
||
def get_price(self):
|
||
|
||
for i in range(3):
|
||
try:
|
||
logger.info(f"获取最新数据:{i + 1}次。。。")
|
||
self.mn_tab.get(url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
|
||
res = self.mn_tab.listen.wait(timeout=25) # 等待并获取一个数据包
|
||
|
||
datas = []
|
||
if res:
|
||
|
||
for data in res.response.body["data"]:
|
||
insert_data = {
|
||
'id': int(data["timestamp"]),
|
||
'open': float(data["open"]),
|
||
'high': float(data["high"]),
|
||
'low': float(data["low"]),
|
||
'close': float(data["close"])
|
||
}
|
||
|
||
datas.append(insert_data)
|
||
|
||
return datas
|
||
except:
|
||
pass
|
||
|
||
return False
|
||
|
||
def remove_tags_and_spaces(self, html):
|
||
# 创建 BeautifulSoup 对象
|
||
soup = BeautifulSoup(html, 'html.parser')
|
||
# 获取去除标签后的文本
|
||
text = soup.get_text()
|
||
# 去除所有空格
|
||
text = text.replace(" ", "")
|
||
return text
|
||
|
||
def to_do_page(self):
|
||
# self.page.get("https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT")
|
||
|
||
self.mn_tab.ele('x://*[contains(text(), "市价")]', timeout=15).click()
|
||
time.sleep(1)
|
||
|
||
self.mn_tab.ele('x://*[@id="amountInput"]').input(float(self.get_num()) / 100)
|
||
time.sleep(1)
|
||
|
||
if self.direction == "long" and not self.start:
|
||
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开多")
|
||
self.send_dingtalk_message(
|
||
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开多")
|
||
self.mn_tab.ele('x://*[contains(text(), "买入/做多")]').click()
|
||
self.start = 1
|
||
elif self.direction == "short" and not self.start:
|
||
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开空")
|
||
self.send_dingtalk_message(
|
||
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},开空")
|
||
self.mn_tab.ele('x://*[contains(text(), "卖出/做空")]').click()
|
||
self.start = -1
|
||
elif self.direction == "long" and self.start == -1:
|
||
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平空做多")
|
||
self.send_dingtalk_message(
|
||
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平空做多")
|
||
self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True)
|
||
self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click()
|
||
time.sleep(3)
|
||
self.mn_tab.ele('x://*[contains(text(), "买入/做多")]').click()
|
||
self.start = 1
|
||
elif self.direction == "short" and self.start == 1:
|
||
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平多做空")
|
||
self.send_dingtalk_message(
|
||
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},反手平多做空")
|
||
self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True)
|
||
self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click()
|
||
time.sleep(3)
|
||
self.mn_tab.ele('x://*[contains(text(), "卖出/做空")]').click()
|
||
self.start = -1
|
||
|
||
def get_text(self, target_text):
|
||
# 去除目标文本中的空白字符
|
||
cleaned_target_text = re.sub(r'\s', '', target_text)
|
||
|
||
# 创建 BeautifulSoup 对象
|
||
soup = BeautifulSoup(self.mn_tab.html, 'html.parser')
|
||
|
||
# 遍历所有标签的文本内容
|
||
for tag in soup.find_all():
|
||
tag_text = tag.get_text()
|
||
# 去除标签文本中的空白字符
|
||
cleaned_tag_text = re.sub(r'\s', '', tag_text)
|
||
if cleaned_target_text in cleaned_tag_text:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
def get_now_time(self):
|
||
# 获取当前时间戳
|
||
current_timestamp = time.time()
|
||
# 将当前时间戳转换为 datetime 对象
|
||
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
|
||
|
||
# 计算距离当前时间最近的整点或 30 分时刻
|
||
if current_datetime.minute < 30:
|
||
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
|
||
else:
|
||
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
|
||
|
||
# 将目标 datetime 对象转换为时间戳
|
||
target_timestamp = target_datetime.timestamp()
|
||
|
||
return int(target_timestamp)
|
||
|
||
def close_extra_tabs_in_browser(self):
|
||
|
||
try:
|
||
for _, i in enumerate(self.page.get_tabs()):
|
||
if _ == 0:
|
||
continue
|
||
|
||
i.close()
|
||
|
||
return True
|
||
except:
|
||
pass
|
||
|
||
return False
|
||
|
||
def get_num(self):
|
||
|
||
num_tab = self.page.new_tab()
|
||
num_tab.listen.start("capi.websea.com/webApi/asset/account")
|
||
|
||
for i in range(3):
|
||
try:
|
||
logger.info(f"获取最新数据:{i + 1}次。。。")
|
||
num_tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT")
|
||
res = num_tab.listen.wait(timeout=15) # 等待并获取一个数据包
|
||
|
||
if res:
|
||
num_tab.close()
|
||
return res.response.body["result"]["availableUsd"]
|
||
except:
|
||
pass
|
||
|
||
num_tab.close()
|
||
|
||
return False
|
||
|
||
def action(self):
|
||
# 获取比特端口
|
||
if self.openBrowser():
|
||
logger.info("获取打开比特成功,成功获取端口!!!")
|
||
else:
|
||
logger.error("打开比特失败!!!")
|
||
return
|
||
|
||
# 接管浏览器
|
||
if self.take_over_browser():
|
||
logger.info("接管比特浏览器成功!!!")
|
||
else:
|
||
logger.error("接管浏览器失败!!!")
|
||
return
|
||
|
||
if self.close_extra_tabs_in_browser():
|
||
logger.info('关闭多余标签页成功!!!')
|
||
else:
|
||
logger.info('关闭多余标签页失败!!!')
|
||
|
||
self.mn_tab = self.page.new_tab()
|
||
self.mn_tab.listen.start("contract-v2.bitmart.com/v1/ifcontract/quote/kline")
|
||
logger.success("浏览器开启抓包模式。。。")
|
||
|
||
self.mn_tab.get(url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") # 打开网页
|
||
|
||
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc:进度条说明,ncols:长度
|
||
|
||
while True:
|
||
# 获取当前时间
|
||
current_time = time.localtime()
|
||
current_minute = current_time.tm_min
|
||
|
||
if current_minute < 30:
|
||
self.pbar.n = current_minute
|
||
self.pbar.refresh()
|
||
else:
|
||
self.pbar.n = current_minute - 30
|
||
self.pbar.refresh()
|
||
|
||
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, ]: # 判断是否是 新的30分钟了
|
||
# if current_minute not in range(60): # 判断是否是 新的30分钟了
|
||
time.sleep(10)
|
||
continue
|
||
|
||
new_price_datas = self.get_price()
|
||
if new_price_datas:
|
||
logger.success("获取最新交易价格成功!!!")
|
||
else:
|
||
logger.info("获取最新价格有问题!!!")
|
||
continue
|
||
|
||
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
|
||
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
|
||
|
||
# 判断抓取的数据是否正确
|
||
if self.get_now_time() != self.kline_3["id"]:
|
||
continue
|
||
|
||
time.sleep(15)
|
||
|
||
if self.get_text(target_text="全部平仓ETHUSDT永续多全仓100X持仓量"):
|
||
self.start = 1
|
||
elif self.get_text(target_text="全部平仓ETHUSDT永续空全仓100X持仓量"):
|
||
self.start = -1
|
||
else:
|
||
self.start = 0
|
||
|
||
if self.start == 1:
|
||
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
|
||
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平多")
|
||
self.send_dingtalk_message(
|
||
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平多")
|
||
self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True)
|
||
self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click()
|
||
self.start = 0
|
||
elif self.start == -1:
|
||
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
|
||
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平空")
|
||
self.send_dingtalk_message(
|
||
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1},{self.kline_2},平空")
|
||
|
||
self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True)
|
||
self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click()
|
||
self.start = 0
|
||
|
||
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
|
||
|
||
if self.direction:
|
||
try:
|
||
self.to_do_page()
|
||
except Exception as e:
|
||
self.send_dingtalk_message(
|
||
message_content=f"{datetime.datetime.now()},{e}")
|
||
|
||
self.pbar.reset() # 重置进度条
|
||
self.send_dingtalk_message(
|
||
message_content=
|
||
f"{datetime.datetime.now()},"
|
||
f"目前有持仓:{"无" if self.start == 0 else ("多" if self.start == 1 else "空")},"
|
||
f"当前信号:{"无" if not self.direction else ("多" if self.direction == "long" else "空")}"
|
||
)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
WeexTransaction(
|
||
tge_id=196495,
|
||
).action()
|
||
|
||
# //*[contains(text(), '特定文本')]
|