Files
lm_code/me/bitmart-三分之一策略交易.py
ddrwode c85f370e32 hahaa
2026-01-30 14:51:51 +08:00

763 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
BitMart 三分之一回归策略交易(双向触发版)
使用5分钟K线周期实时监测
策略规则:
1. 触发价格计算基于有效的前一根K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3
2. 信号触发条件:
- 当前K线最高价 >= 做多触发价格 → 做多信号
- 当前K线最低价 <= 做空触发价格 → 做空信号
- 方向以当前收盘相对前一根收盘为准:当前收盘在前一根之上偏多(只开多/不开空),之下偏空(只开空/不开多);双触时也按此选方向,避免先探后拉/先拉后砸误开反手。
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根K线内若先开空后价格回调到「做多触发价」开仓价+上一根实体/3则平空开多反之先开多后跌到做空触发价则平多开空。同一根K线内仅允许一次反手防止频繁来回开仓。
示例1阳线
前一根K线开盘3000收盘3100阳线实体=100
- 做多触发价格 = 3100 + 33 = 3133继续上涨做多
- 做空触发价格 = 3100 - 33 = 3067回调做空←当前跌到这里就做空
示例2阴线
前一根K线开盘3100收盘3000阴线实体=100
- 做多触发价格 = 3000 + 33 = 3033反弹做多
- 做空触发价格 = 3000 - 33 = 2967继续下跌做空
"""
import time
import datetime
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage,ChromiumOptions
from bitmart.api_contract import APIContract
from 交易.tools import send_dingtalk_message
class BitmartOneThirdStrategy:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "6104088c65a68d7e53df5d9395b67d78e555293a"
self.secret_key = "a8b14312330d8e6b9b09acfd972b34e32022fdfa9f2b06f0a0a31723b873fd01"
self.memo = "me"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=15, desc="等待K线", ncols=80) # 5分钟周期
self.last_kline_time = None
self.leverage = "40" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
# 三分之一策略参数
self.min_body_size = 0.1 # 最小实体大小
self.kline_step = 15 # K线周期5分钟
self.kline_count = 20 # 获取的K线数量用于向前查找有效K线
# 实时监测参数
self.check_interval = 3 # 检测间隔(秒)
self.last_trigger_kline_id = None # 记录上次触发信号的K线ID避免同一K线重复触发
self.last_trigger_direction = None # 记录上次触发的方向
self.last_trade_kline_id = None # 记录上次实际交易的K线ID防止同一K线内频繁反手
# ========================= 三分之一策略核心函数 =========================
def is_bullish(self, c):
"""判断阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(self, c):
"""判断阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(self, candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = self.get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_third_levels(self, prev):
"""
计算前一根K线实体的 1/3 双向触发价格
返回:(做多触发价格, 做空触发价格)
基于收盘价计算(无论阴线阳线):
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3实体
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3实体
示例:
阳线 open=3000, close=3100, 实体=100
- 做多触发 = 3100 + 33 = 3133继续涨
- 做空触发 = 3100 - 33 = 3067回调
阴线 open=3100, close=3000, 实体=100
- 做多触发 = 3000 + 33 = 3033反弹
- 做空触发 = 3000 - 33 = 2967继续跌
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001: # 十字星,忽略
return None, None
# 基于收盘价的双向触发价格
long_trigger = p_close + body / 3 # 从收盘价往上涨1/3触发做多
short_trigger = p_close - body / 3 # 从收盘价往下跌1/3触发做空
return long_trigger, short_trigger
def check_trigger(self, all_data, current_idx):
"""
检查当前K线是否触发了交易信号双向检测
返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None)
规则:
- 当前K线高点 >= 做多触发价格 → 做多信号
- 当前K线低点 <= 做空触发价格 → 做空信号
"""
if current_idx <= 0:
return None, None, None
curr = all_data[current_idx]
# 查找实体>=min_body_size的前一根K线
valid_prev_idx, prev = self.find_valid_prev_bar(all_data, current_idx, self.min_body_size)
if prev is None:
return None, None, None
long_trigger, short_trigger = self.get_one_third_levels(prev)
if long_trigger is None:
return None, None, None
# 使用影线部分high/low来判断
c_high = float(curr['high'])
c_low = float(curr['low'])
c_close = float(curr['close'])
p_close = float(prev['close'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 当前价格相对前一根收盘的方向:已在前一根之上偏多,之下偏空
if long_triggered and short_triggered:
# 两个方向都触达时,以当前收盘相对前一根收盘为准,不按“谁先触达”
if c_close > p_close:
return 'long', long_trigger, valid_prev_idx
elif c_close < p_close:
return 'short', short_trigger, valid_prev_idx
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx
return 'long', long_trigger, valid_prev_idx
if short_triggered:
# 仅触达做空价:若当前收盘已在前一根之上,说明已拉回偏多,不开空
if c_close > p_close:
return None, None, None
return 'short', short_trigger, valid_prev_idx
if long_triggered:
# 仅触达做多价:若当前收盘已在前一根之下,说明已回落偏空,不开多
if c_close < p_close:
return None, None, None
return 'long', long_trigger, valid_prev_idx
return None, None, None
def check_realtime_trigger(self, kline_data):
"""
实时检测当前K线是否触发信号双向检测
基于已收盘的K线计算触发价格用当前正在形成的K线判断
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
"""
if len(kline_data) < 2:
return None, None, None, None
# 当前正在形成的K线最后一根未收盘
curr = kline_data[-1]
curr_kline_id = curr['id']
# 从倒数第二根开始往前找有效K线已收盘的K线
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size)
if prev is None:
return None, None, None, None
long_trigger, short_trigger = self.get_one_third_levels(prev)
if long_trigger is None:
return None, None, None, None
# 使用当前K线的实时高低点来判断
c_high = float(curr['high'])
c_low = float(curr['low'])
c_close = float(curr['close'])
p_close = float(prev['close'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 确定触发方向:当前价格已在前一根之上偏多、之下偏空时,以当前方向为准
direction = None
trigger_price = None
if long_triggered and short_triggered:
# 两个方向都触达时,以当前收盘相对前一根收盘为准(不按谁先触达)
if c_close > p_close:
direction = 'long'
trigger_price = long_trigger
elif c_close < p_close:
direction = 'short'
trigger_price = short_trigger
else:
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
direction = 'short'
trigger_price = short_trigger
else:
direction = 'long'
trigger_price = long_trigger
elif short_triggered:
# 仅触达做空价:若当前收盘已在前一根之上,说明已拉回偏多,不开空
if c_close <= p_close:
direction = 'short'
trigger_price = short_trigger
elif long_triggered:
# 仅触达做多价:若当前收盘已在前一根之下,说明已回落偏空,不开多
if c_close >= p_close:
direction = 'long'
trigger_price = long_trigger
if direction is None:
return None, None, None, None
# 检查是否在同一根K线内已经触发过相同方向
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
return None, None, None, None
return direction, trigger_price, prev, curr
# ========================= BitMart API 函数 =========================
def get_klines(self):
"""获取最近N根5分钟K线"""
try:
end_time = int(time.time())
# 获取足够多的K线用于向前查找有效K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=self.kline_step, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
error_msg = str(e)
# 检查是否是429限流错误
if "429" in error_msg or "too many requests" in error_msg.lower():
logger.warning(f"API限流等待60秒后重试: {e}")
time.sleep(60)
else:
logger.error(f"获取K线异常: {e}")
self.ding(msg="获取K线异常", error=True)
return None
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ========================= 浏览器自动化函数 =========================
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def close_extra_tabs_in_browser(self):
"""关闭多余 tab"""
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
"""市价平仓"""
logger.info("执行平仓操作...")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
time.sleep(0.5)
self.ding(msg="执行平仓操作")
def 开单(self, marketPriceLongOrder=0, size=None):
"""
市价开单
marketPriceLongOrder: 1 做多, -1 做空
"""
if size is None or size <= 0:
logger.warning("开单金额无效")
return False
direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
logger.info(f"执行{direction_str}操作,金额: {size}")
size = 50
try:
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.ding(msg=f"执行{direction_str}操作,金额: {size}")
return True
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def ding(self, msg, error=False):
"""统一消息格式"""
prefix = "❌三分之一策略:" if error else "🔔三分之一策略:"
if error:
logger.error(msg)
for i in range(10):
send_dingtalk_message(f"{prefix}{msg}")
else:
logger.info(msg)
send_dingtalk_message(f"{prefix}{msg}")
# ========================= 时间计算函数 =========================
def get_now_time(self):
"""获取当前5分钟整点时间戳"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的5分钟整点
minute = current_datetime.minute
target_minute = (minute // 5) * 5 # 向下取整到5分钟
target_datetime = current_datetime.replace(minute=target_minute, second=0, microsecond=0)
return int(target_datetime.timestamp())
def get_time_to_next_5min(self):
"""获取距离下一个5分钟的秒数"""
current_timestamp = time.time()
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
minute = current_datetime.minute
next_5min = ((minute // 5) + 1) * 5
if next_5min >= 60:
next_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)
else:
next_datetime = current_datetime.replace(minute=next_5min, second=0, microsecond=0)
return (next_datetime - current_datetime).total_seconds()
# ========================= 主运行函数 =========================
def action(self):
"""主运行逻辑 - 实时监测版本"""
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
# 1. 打开浏览器
if not self.openBrowser():
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
else:
logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
logger.info(f"开始实时监测,检测间隔: {self.check_interval}")
# 用于定时发送持仓信息每5分钟发一次
last_report_time = 0
report_interval = 300 # 5分钟报告一次持仓
while True:
# 1. 打开浏览器
for i in range(5):
if self.openBrowser():
break
time.sleep(5)
else:
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
else:
logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
try:
# 获取K线数据
kline_data = self.get_klines()
if not kline_data:
logger.warning("获取K线数据失败等待重试...")
time.sleep(self.check_interval)
continue
if len(kline_data) < 3:
logger.warning("K线数据不足")
time.sleep(self.check_interval)
continue
# 获取当前K线信息用于日志
curr = kline_data[-1]
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
# ========== 实时信号检测 ==========
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
if direction:
curr_kline_id = curr_kline['id']
# 获取持仓状态先获取用于判断是否允许同一K线内反手
if not self.get_position_status():
logger.warning("获取仓位信息失败")
time.sleep(self.check_interval)
continue
# 检查是否在同一K线内已经交易过防止频繁反手
# 例外同一根K线内仅允许一次反手
# - 先开空 -> 价格回调到做多触发价 -> 允许平空开多
# - 先开多 -> 价格跌到做空触发价 -> 允许平多开空
if self.last_trade_kline_id == curr_kline_id:
allow_reverse = (self.start == -1 and direction == "long") or (self.start == 1 and direction == "short")
if not allow_reverse:
logger.debug(f"同一K线内已交易且非反手场景跳过本次{direction}信号")
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
action_desc = "平空开多" if (self.start == -1 and direction == "long") else "平多开空"
logger.info(f"同一K线内回调/反弹触发反手:当前持仓{'' if self.start == -1 else ''},信号{direction} -> {action_desc}")
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev)
# 检查信号与持仓是否同向(避免重复日志)
if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1):
# 信号与持仓同向,静默忽略
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
# 开仓原因(用于日志:多久、为什么开仓)
is_same_kline_reverse = (self.last_trade_kline_id == curr_kline_id)
if is_same_kline_reverse and direction == "long":
open_reason = f"同一根K线内价格回调到做多触发价 {trigger_price:.2f}(前一根[{prev_time}]{prev_type} 实体={prev_body:.2f}),平空反手开多"
elif is_same_kline_reverse and direction == "short":
open_reason = f"同一根K线内价格跌到做空触发价 {trigger_price:.2f}(前一根[{prev_time}]{prev_type} 实体={prev_body:.2f}),平多反手开空"
elif direction == "long":
open_reason = f"当前K线最高价触达做多触发价 {trigger_price:.2f}(前一根[{prev_time}]{prev_type} 实体={prev_body:.2f} 收盘={valid_prev['close']:.2f}"
else:
open_reason = f"当前K线最低价触达做空触发价 {trigger_price:.2f}(前一根[{prev_time}]{prev_type} 实体={prev_body:.2f} 收盘={valid_prev['close']:.2f}"
open_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"{'=' * 50}")
logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}")
logger.info(
f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}")
logger.info(
f" 当前K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}")
logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)")
logger.info(f" 开仓时间: {open_time_str}")
logger.info(f" 开仓原因: {open_reason}")
# ========== 执行交易逻辑 ==========
balance = self.get_available_balance()
if balance is None:
balance = 0
trade_size = balance * self.risk_percent
executed = False
if direction == "long":
if self.start == -1: # 当前空仓,平空开多
logger.info(f"📈 平空仓,反手开多 | {open_time_str} | {open_reason}")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif self.start == 0: # 当前无仓,直接开多
logger.info(f"📈 无仓位,开多 | {open_time_str} | {open_reason}")
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif direction == "short":
if self.start == 1: # 当前多仓,平多开空
logger.info(f"📉 平多仓,反手开空 | {open_time_str} | {open_reason}")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
elif self.start == 0: # 当前无仓,直接开空
logger.info(f"📉 无仓位,开空 | {open_time_str} | {open_reason}")
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
# 记录本次触发
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
if executed:
# 记录交易K线防止同一K线内频繁反手
self.last_trade_kline_id = curr_kline_id
# 发送开仓时间与原因到钉钉
self.ding(msg=f"开仓时间: {open_time_str}\n开仓原因: {open_reason}")
# 交易后立即发送持仓信息
self.get_position_status()
self._send_position_message(curr_kline)
last_report_time = time.time()
logger.info(f"{'=' * 50}")
else:
# 没有信号时,显示实时价格
logger.debug(
f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
# ========== 定时发送持仓信息 ==========
current_time = time.time()
if current_time - last_report_time >= report_interval:
if self.get_position_status():
self._send_position_message(kline_data[-1])
last_report_time = current_time
# 等待下次检测
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval)
time.sleep(15)
self.page.close()
time.sleep(15)
def _send_position_message(self, latest_kline):
"""发送持仓信息到钉钉"""
current_price = float(latest_kline["close"])
balance = self.get_available_balance()
self.balance = balance if balance is not None else 0.0
if self.start != 0:
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
current_amount = float(self.current_amount) if self.current_amount else 0.0
position_cross = float(self.position_cross) if hasattr(self,
'position_cross') and self.position_cross else 0.0
# 计算浮动盈亏
if self.start == 1: # 多头
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
else: # 空头
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
# 计算收益率
if open_avg_price > 0:
if self.start == 1:
pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
else:
pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000
rate_str = f" ({pnl_rate:+.2f}%)"
else:
rate_str = ""
direction_str = "" if self.start == -1 else ""
pnl_str = f"{unrealized_pnl:+.2f} USDT"
msg = (
f"【三分之一策略 {self.contract_symbol} 5分钟】\n"
f"当前方向:{direction_str}\n"
f"当前现价:{current_price:.2f} USDT\n"
f"开仓均价:{open_avg_price:.2f} USDT\n"
f"持仓量(eth){float(current_amount) / 1000} eth\n"
f"持仓量(usdt){position_cross} usdt\n"
f"浮动盈亏:{pnl_str}{rate_str}\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
else:
msg = (
f"【三分之一策略 {self.contract_symbol} 5分钟】\n"
f"当前方向:无\n"
f"当前现价:{current_price:.2f} USDT\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
self.ding(msg=msg)
if __name__ == '__main__':
# 启动三分之一策略交易
BitmartOneThirdStrategy(bit_id="62f9107d0c674925972084e282df55b3").action()