Files
lm_code/交易/weex_交易.py
2025-12-19 09:46:11 +08:00

536 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import asyncio
import datetime
from tqdm import *
from loguru import *
from DrissionPage import *
import requests
from telethon import TelegramClient
from 交易.tools import send_dingtalk_message
def is_bullish(c):
    """Return True when candle ``c`` closed above its open (bullish candle).

    ``c`` is a mapping with ``'close'`` and ``'open'`` entries; both values
    are converted with ``float`` before being compared. A candle whose close
    equals its open is not bullish.
    """
    close_price = float(c['close'])
    open_price = float(c['open'])
    return close_price > open_price
def is_bearish(c):
    """Return True when candle ``c`` closed below its open (bearish candle).

    ``c`` is a mapping with ``'close'`` and ``'open'`` entries; both values
    are converted with ``float`` before being compared. A candle whose close
    equals its open is not bearish.
    """
    close_price = float(c['close'])
    open_price = float(c['open'])
    return close_price < open_price
class WeexTransaction:
    """Browser-driven WEEX ETH/USDT futures bot.

    Holds the browser handle, the HTTP session, the three most recent candles
    and the current position state used by the methods of this class.
    """

    def __init__(self, tge_id):
        """Initialise bot state for the browser profile ``tge_id``.

        Args:
            tge_id: Profile/environment id sent as ``envId`` to the local
                browser-manager service when the browser is started.
        """
        self.tge_port = None  # debugging port of the started browser profile
        self.tge_id = tge_id  # browser profile id
        self.tge_url = "http://127.0.0.1:50326"  # local browser-manager service URL
        # NOTE(review): credentials below are hard-coded in source. They are
        # kept unchanged to preserve behaviour, but should be rotated and
        # loaded from configuration or the environment instead.
        self.tge_headers = {
            "Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
            "Content-Type": "application/json"
        }
        # DingTalk robot webhook address (replace with your own).
        self.webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125"
        # DingTalk robot signing secret (replace with your own).
        self.secret = "SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998"
        self.page = None  # browser page object
        self.start = 0  # position state: -1 short, 0 flat, 1 long
        # Three most recent candles, oldest first.
        self.kline_1 = self.kline_2 = self.kline_3 = None
        self.direction = None  # signal direction: "long", "short" or None
        self.pbar = None  # progress bar object
        self.session = requests.Session()  # HTTP session for API requests
        self.headers = None
        # Attributes assigned later by other methods; declared here so they
        # always exist on the instance.
        self.datas = None  # fill record stored by get_position_status
        self.time_start = None  # last half-hour slot handled by action
def get_now_time1(self):
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now_struct = time.localtime(time.time())
    return time.strftime("%Y-%m-%d %H:%M:%S", now_struct)
def send_dingtalk_message(self, message_content, type=1):
    """Forward ``message_content`` to the module-level ``send_dingtalk_message``.

    Args:
        message_content: Text to send; a prefix is prepended to it.
        type: Truthy sends the message once with the "🔔weex" prefix; falsy
            sends it 15 times with the "❌weex" prefix.
    """
    if type:
        prefix, repeats = "🔔weex", 1
    else:
        prefix, repeats = "❌weex", 15
    for _ in range(repeats):
        # Inside the method body this name resolves to the imported
        # module-level function, not to this method.
        send_dingtalk_message(message_content=prefix + message_content)
def openBrowser(self):
    """Ask the local browser-manager service to start profile ``self.tge_id``.

    POSTs ``{"envId": self.tge_id}`` to ``{self.tge_url}/api/browser/start``
    and stores ``data.port`` from the JSON reply in ``self.tge_port``.

    Returns:
        bool: True when the port was obtained, False when the request or the
        parsing of its reply failed.
    """
    try:
        response = requests.post(
            f"{self.tge_url}/api/browser/start",
            json={"envId": self.tge_id},
            headers=self.tge_headers,
        )
        self.tge_port = response.json()["data"]["port"]
    except Exception as e:
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
        # SystemExit still stop the program; the reason is logged.
        logger.error(f"打开比特浏览器出错:{e}")
        return False
    return True
def take_over_browser(self):
    """Attach to the browser listening on ``self.tge_port`` and maximise it.

    On success the page object is stored in ``self.page``.

    Returns:
        bool: True when the browser was attached, False on any error.
    """
    try:
        co = ChromiumOptions()
        co.set_local_port(self.tge_port)
        self.page = ChromiumPage(addr_or_opts=co)
        self.page.set.window.max()
    except Exception as e:
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
        # SystemExit still stop the program; the reason is logged.
        logger.error(f"接管浏览器出错:{e}")
        return False
    return True
def is_bullish(self, c):
    """Return True when candle ``c`` closed above its open (bullish candle).

    Does not read instance state. ``c['close']`` and ``c['open']`` are
    converted with ``float`` before being compared.
    """
    closing, opening = float(c['close']), float(c['open'])
    return opening < closing
def is_bearish(self, c):
    """Return True when candle ``c`` closed below its open (bearish candle).

    Does not read instance state. ``c['close']`` and ``c['open']`` are
    converted with ``float`` before being compared.
    """
    closing, opening = float(c['close']), float(c['open'])
    return opening > closing
def check_signal(self, prev, curr):
    """Detect an engulfing pattern between two consecutive candles.

    - ``prev`` bearish and ``curr`` bullish, with ``curr`` wrapping the body
      of ``prev`` -> ``("long", "bear_bull_engulf")``
    - ``prev`` bullish and ``curr`` bearish, with ``curr`` wrapping the body
      of ``prev`` -> ``("short", "bull_bear_engulf")``
    - anything else -> ``(None, None)``

    The "wrapping" comparison is made on prices truncated with ``int``, so
    differences below one price unit are ignored.
    NOTE(review): the truncation looks like an intentional tolerance - confirm.

    Args:
        prev: Earlier candle, a mapping with ``'open'`` and ``'close'``.
        curr: Later candle, same shape.
    """
    prev_open, prev_close = float(prev['open']), float(prev['close'])
    curr_open, curr_close = float(curr['open']), float(curr['close'])

    # Bearish candle followed by a bullish one that engulfs it -> go long.
    if curr_close > curr_open and prev_close < prev_open:
        if int(curr_open) <= int(prev_close) and int(curr_close) >= int(prev_open):
            return "long", "bear_bull_engulf"

    # Bullish candle followed by a bearish one that engulfs it -> go short.
    if curr_close < curr_open and prev_close > prev_open:
        if int(curr_open) >= int(prev_close) and int(curr_close) <= int(prev_open):
            return "short", "bull_bear_engulf"

    return None, None
def get_price(self):
    """Fetch up to 300 thirty-minute ETH/USDT candles from the quote API.

    Makes at most three attempts, sleeping one second after a failed one.
    Every attempt builds its own list, so a response that fails part-way
    through parsing can neither leak partial rows into the result nor be
    duplicated by the next attempt.

    Returns:
        list[dict]: One dict per candle with keys ``id`` (int), ``open``,
        ``high``, ``low`` and ``close`` (floats), in the order the API
        returned them; an empty list when all attempts failed.
    """
    params = {
        'contractId': '10000002',
        'productCode': 'cmt_ethusdt',
        'priceType': 'LAST_PRICE',
        'klineType': 'MINUTE_30',
        'limit': '300',
        'timeZone': 'string',
        'languageType': '1',
        'sign': 'SIGN',
    }
    for attempt in range(3):
        logger.info(f"获取最新数据:{attempt + 1}次。。。")
        try:
            response = self.session.get(
                'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2',
                params=params,
            )
            # Row layout used here: [close, high, low, open, id, ...].
            return [
                {
                    'id': int(row[4]),
                    'open': float(row[3]),
                    'high': float(row[1]),
                    'low': float(row[2]),
                    'close': float(row[0]),
                }
                for row in response.json()["data"]["dataList"]
            ]
        except Exception as e:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C
            # still stops the bot.
            logger.warning(f"获取K线数据出错:{e}")
            time.sleep(1)
    return []
def to_do_page(self):
    """Place the order for ``self.direction`` through the trading page.

    Selects the market-order tab, reads the available balance via
    ``get_num`` and types one hundredth of it into the quantity box. Then,
    depending on ``self.direction`` and ``self.start``:

    - flat + "long"   -> click open-long, ``self.start = 1``
    - flat + "short"  -> click open-short, ``self.start = -1``
    - short + "long"  -> flash-close, then open-long, ``self.start = 1``
    - long + "short"  -> flash-close, then open-short, ``self.start = -1``

    Any other combination leaves the position untouched. When the balance
    cannot be read an alert is sent and the method returns early.
    """
    page = self.page
    open_long_btn = 'x://*[contains(text(), "买入开多")]'
    open_short_btn = 'x://*[contains(text(), "卖出开空")]'
    flash_close_btn = 'x://*[contains(text(), "闪电平仓")]'

    def flash_close():
        # Scroll the flash-close control into view, click it, then give the
        # exchange a moment before the reverse order is placed.
        page.ele(flash_close_btn).scroll.to_see(center=True)
        time.sleep(1)
        page.ele(flash_close_btn).click()
        time.sleep(3)

    # self.page.get("https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT")
    page.ele('x:(//button[normalize-space(text()) = "市价"])').click()
    time.sleep(1)

    num = self.get_num()
    if not num:
        logger.error("获取可用余额失败!!!")
        self.send_dingtalk_message("获取可用余额失败!!!", type=0)
        return
    logger.info("获取可用余额成功!!!")

    amount = float(num) / 100
    page.ele('x://input[@placeholder="请输入数量"]').input(amount)
    time.sleep(1)

    if self.direction == "long":
        if not self.start:
            logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
            self.send_dingtalk_message(f"信号:{self.direction},开多,开仓金额:{amount}")
            page.ele(open_long_btn).click()
            self.start = 1
        elif self.start == -1:
            logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
            self.send_dingtalk_message(f"信号:{self.direction},反手平空做多,开仓金额:{amount}")
            flash_close()
            page.ele(open_long_btn).click()
            self.start = 1
    elif self.direction == "short":
        if not self.start:
            logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
            self.send_dingtalk_message(f"信号:{self.direction},开空,开仓金额:{amount}")
            page.ele(open_short_btn).click()
            self.start = -1
        elif self.start == 1:
            logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
            self.send_dingtalk_message(f"信号:{self.direction},反手平多做空,开仓金额:{amount}")
            flash_close()
            page.ele(open_short_btn).click()
            self.start = -1
def get_now_time(self):
    """Return the start of the current half-hour slot as epoch milliseconds.

    The local time is floored to minute 0 (when the minute is below 30) or
    minute 30 (otherwise), with seconds and microseconds cleared.
    """
    now = datetime.datetime.fromtimestamp(time.time())
    slot_minute = 30 if now.minute >= 30 else 0
    slot_start = now.replace(minute=slot_minute, second=0, microsecond=0)
    return int(slot_start.timestamp()) * 1000
def close_extra_tabs_in_browser(self):
    """Close every tab returned by ``self.page.get_tabs()`` except the first.

    Returns:
        bool: True when all extra tabs were closed, False when listing or
        closing a tab raised.
    """
    try:
        for index, tab in enumerate(self.page.get_tabs()):
            if index == 0:
                continue
            tab.close()
        return True
    except Exception as e:
        # Best effort: report the reason instead of silently swallowing it,
        # and let Ctrl-C / SystemExit propagate.
        logger.warning(f"关闭多余标签页出错:{e}")
    return False
def get_num(self):
    """Read the available balance of the first entry in the contract balance list.

    POSTs to the assets endpoint through ``self.session`` (at most three
    attempts, one second apart) and returns
    ``data.newContract.balanceList[0].available`` from the JSON reply.

    Returns:
        The ``available`` value exactly as the API supplied it, or ``False``
        when all attempts failed.
    """
    for _ in range(3):
        try:
            response = self.session.post(
                'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new',
            )
            return response.json()["data"]["newContract"]["balanceList"][0]["available"]
        except Exception as e:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C
            # still stops the bot; the reason is logged before retrying.
            logger.warning(f"获取可用余额出错:{e}")
            time.sleep(1)
    return False
def get_token(self):
    """Capture request headers carrying ``U-TOKEN`` and install them on the session.

    Opens a new tab, listens for requests matching
    ``/user/security/getLanguageType`` and loads the futures page up to three
    times. When a captured request has a ``U-TOKEN`` header, its headers are
    applied to ``self.session``. The tab is always closed, including when
    loading the page raises.

    Returns:
        bool: True when a token was captured, False after three failed tries.
    """
    tab = self.page.new_tab()
    try:
        tab.listen.start("/user/security/getLanguageType")
        for _ in range(3):
            tab.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT")
            try:
                res = tab.listen.wait(timeout=5)
                if res.request.headers.get("U-TOKEN"):
                    # NOTE(review): ``self.headers`` is never assigned in the
                    # visible code, so the first branch always runs and the
                    # whole header set is replaced on every call - confirm
                    # whether the ``else`` branch was meant to be reachable.
                    if not self.headers:
                        self.session.headers = res.request.headers
                    else:
                        self.session.headers["U-TOKEN"] = res.request.headers["U-TOKEN"]
                    return True
            except Exception as e:
                # Also reached when nothing was captured and ``res`` has no
                # ``request`` attribute.
                logger.warning(f"获取token出错:{e}")
                time.sleep(1)
        return False
    finally:
        tab.close()
def get_position_status(self):
    """Derive the position state from the first fill in the fill-history reply.

    POSTs to the fill-history endpoint (at most three attempts, one second
    apart), takes the first entry of ``data.dataList`` and maps its
    ``legacyOrderDirection`` to ``self.start``: ``OPEN_LONG`` -> 1,
    ``OPEN_SHORT`` -> -1, anything else -> 0. That entry is stored in
    ``self.datas``.
    NOTE(review): presumably the first entry is the most recent fill -
    verify against the API.

    Returns:
        bool: True when the state was updated, False when all attempts
        failed (an empty ``dataList`` counts as a failure).
    """
    json_data = {
        'filterContractIdList': [
            10000002,
        ],
        'limit': 100,
        'languageType': 0,
        'sign': 'SIGN',
        'timeZone': 'string',
    }
    for _ in range(3):
        try:
            response = self.session.post(
                'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderFillTransactionPage',
                json=json_data,
            )
            fills = list(response.json()["data"]["dataList"])
            first_fill = fills[0]
            order_direction = first_fill["legacyOrderDirection"]
            if order_direction == "OPEN_LONG":
                self.start = 1
            elif order_direction == "OPEN_SHORT":
                self.start = -1
            else:
                self.start = 0
            self.datas = first_fill
            return True
        except Exception as e:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C
            # still stops the bot; the reason is logged before retrying.
            logger.warning(f"获取仓位信息出错:{e}")
            time.sleep(1)
    return False
def action(self):
    """Run the bot: attach to the browser, then loop forever.

    Setup: start the browser profile, attach to it, close extra tabs, open
    the ETH-USDT futures page and create the progress bar. When the profile
    cannot be started or attached, an error is logged and the method returns.

    Each half-hour slot (see ``get_now_time``) is handled once:
      1. refresh the auth token and fetch candles;
      2. keep the last three candles (sorted by ``id``) as ``kline_1``,
         ``kline_2`` and ``kline_3``, and require ``kline_3`` to belong to
         the current slot;
      3. read the position state and flash-close the position when
         ``kline_1`` and ``kline_2`` both moved against it;
      4. evaluate ``check_signal(kline_1, kline_2)`` and trade through
         ``to_do_page`` when a signal is present;
      5. re-read the position state and send a status report.

    While the current slot has already been handled the loop sleeps one
    second per iteration instead of spinning. The loop never exits by itself.
    """
    # Start the browser profile and learn its port.
    if self.openBrowser():
        logger.info("获取打开比特成功,成功获取端口!!!")
    else:
        logger.error("打开比特失败!!!")
        return
    # Attach to the running browser.
    if self.take_over_browser():
        logger.info("接管比特浏览器成功!!!")
    else:
        logger.error("接管浏览器失败!!!")
        return
    if self.close_extra_tabs_in_browser():
        logger.info('关闭多余标签页成功!!!')
    else:
        logger.info('关闭多余标签页失败!!!')
    self.page.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT")  # open the trading page
    self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)
    self.time_start = None  # slot already handled; avoids repeated messages within one slot
    flash_close_xpath = 'x://*[contains(text(), "闪电平仓")]'
    while True:
        # Progress bar shows the minutes elapsed inside the current slot.
        current_minute = time.localtime().tm_min
        self.pbar.n = current_minute % 30
        self.pbar.refresh()
        # Latest candle already belongs to this slot: nothing to do yet.
        # Sleep so the wait does not busy-spin a CPU core.
        if self.kline_3 and self.get_now_time() == self.kline_3["id"]:
            time.sleep(1)
            continue
        if self.get_token():
            logger.info("获取token成功!!!")
        else:
            logger.info("获取token失败")
            self.send_dingtalk_message(message_content=f"获取token失败", type=0)
        new_price_datas = self.get_price()
        # Three candles are needed below; fewer is treated like a failed fetch
        # instead of crashing on the unpacking.
        if not new_price_datas or len(new_price_datas) < 3:
            logger.info("获取最新价格有问题!!!")
            self.send_dingtalk_message(message_content=f"获取价格有问题!!!", type=0)
            continue
        new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
        self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
        # The newest candle must be the one of the current slot.
        if self.get_now_time() != self.kline_3["id"]:
            continue
        if self.time_start == self.get_now_time():
            continue
        self.time_start = self.get_now_time()
        self.page.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT")  # reload the trading page
        if not self._refresh_position_or_alert():
            continue
        # Stop-out: close the position when both previous candles went against it.
        try:
            if self.start == 1 and is_bearish(self.kline_1) and is_bearish(self.kline_2):
                logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
                self.send_dingtalk_message(
                    message_content=f"第一根信号:{self.kline_1}{self.kline_2},平多")
                self.page.ele(flash_close_xpath).scroll.to_see(center=True)
                time.sleep(1)
                self.page.ele(flash_close_xpath).click()
                self.start = 0
            elif self.start == -1 and is_bullish(self.kline_1) and is_bullish(self.kline_2):
                logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
                self.send_dingtalk_message(
                    message_content=f"第一根信号:{self.kline_1}{self.kline_2},平空")
                self.page.ele(flash_close_xpath).scroll.to_see(center=True)
                time.sleep(1)
                self.page.ele(flash_close_xpath).click()
                self.start = 0
        except Exception as e:
            # ``Exception`` rather than a bare ``except`` so that Ctrl-C
            # still stops the bot.
            logger.error(f"止损平仓出错:{e}")
            self.send_dingtalk_message(message_content=f"止损平仓出错!!!", type=0)
            continue
        # Entry / reversal signal from the two completed candles.
        self.direction, _signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2)
        if self.direction:
            try:
                self.to_do_page()
            except Exception as e:
                self.send_dingtalk_message(message_content=f"购买操作失败,{e}", type=0)
                continue
        self.pbar.reset()  # restart the progress bar
        if not self._refresh_position_or_alert():
            continue
        num = self.get_num()
        self.send_dingtalk_message(message_content=self._build_status_message(num))


def _refresh_position_or_alert(self):
    """Refresh the position state; log the outcome and alert on failure. Returns the success flag."""
    if self.get_position_status():
        logger.info("获取仓位信息成功!!!")
        return True
    logger.info("获取仓位信息失败!!!")
    self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0)
    return False


def _build_status_message(self, num):
    """Build the position report text from ``self.datas``, ``self.kline_3`` and ``num``.

    When a position is open, ``self.start`` is re-derived from the fill's
    ``positionSide`` as a side effect.

    Args:
        num: Available balance as returned by ``get_num``; ``False`` is
            rendered as 0.00 because it is passed through ``float``.
    """
    current_price = float(self.kline_3["close"])
    if not self.start:
        return (
            "**【WEEX ETHUSDT 永续持仓监控】**\n\n"
            f"**持仓方向**:无\n"
            f"**当前现价**{current_price:.2f} USDT\n"
            f"**账户可用余额**{float(num):.2f} USDT"
        )

    fill_size = float(self.datas['fillSize'])  # filled quantity
    fill_value = float(self.datas['fillValue'])  # filled notional value
    open_avg_price = fill_value / fill_size  # average price of this fill
    position_side = self.datas['positionSide']
    # NOTE(review): all three labels are empty strings in this copy of the
    # file; they may have lost characters in transit - confirm.
    if position_side == 'SHORT':
        direction = ""
        self.start = -1
    elif position_side == 'LONG':
        direction = ""
        self.start = 1
    else:
        direction = ""
        self.start = 0

    # The position size is taken to be the size of this single fill.
    current_amount = fill_size
    if self.start == 1:
        unrealized_pnl = current_amount * (current_price - open_avg_price)
    elif self.start == -1:
        unrealized_pnl = current_amount * (open_avg_price - current_price)
    else:
        unrealized_pnl = 0.0

    if self.start != 0 and open_avg_price > 0:
        # NOTE(review): the factor is 10000 although the value is shown with
        # a percent sign; presumably percent times 100x leverage - confirm.
        if self.start == 1:
            pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
        else:
            pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000
        rate_str = f" ({pnl_rate:+.2f}%)"
    else:
        rate_str = ""
    pnl_str = f"{unrealized_pnl:+.2f} USDT"
    current_value = current_amount * current_price
    amount_display = f"{current_amount:.3f} ETH"
    return (
        "**【WEEX ETHUSDT 永续持仓监控】**\n\n"
        f"**持仓方向**{direction}\n"
        f"**当前现价**{current_price:.2f} USDT\n"
        f"**开仓均价**{open_avg_price:.2f} USDT\n"
        f"**持仓数量(eth)**{amount_display} eth\n"
        f"**持仓数量(usdt)**{fill_value / 100:.2f} usdt\n"
        f"**名义价值**{current_value:.2f} USDT\n"
        f"**浮动盈亏**{pnl_str}{rate_str}\n"
        f"**账户可用余额**{float(num):.2f} USDT"
    )
if __name__ == '__main__':
    # Script entry point: run the bot for one fixed browser profile id.
    bot = WeexTransaction(tge_id=146473)
    bot.action()