加入了五分之一交易代码

This commit is contained in:
ddrwode
2026-02-01 23:14:00 +08:00
parent f6e2d1fb7f
commit 15072ae9e4

View File

@@ -0,0 +1,559 @@
"""
BitMart 五分之一回归策略交易(精准版)
使用3分钟K线周期计算触发价格实时监测同根K线内多空都触及时用1分钟K线判断先后
策略规则(与 bitmart/回测-三分之一策略-精准版.py 一致):
1. 触发价格计算基于有效的前一根K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5
- 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5
2. 信号触发条件:
- 当前K线最高价 >= 做多触发价格 → 做多信号
- 当前K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根3分钟K线内只交易一次
4. 精准判断使用1分钟K线
- 当当前3分钟K线同时触及做多和做空价格时拉取该3分钟对应的3根1分钟K线判断哪个方向先被触发
"""
import time
import datetime
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
from 交易.tools import send_dingtalk_message
class BitmartOneFifthStrategy:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=3, desc="等待K线", ncols=80) # 3分钟周期
self.last_kline_time = None
self.leverage = "100"
self.open_type = "cross"
self.risk_percent = 0.01
self.open_avg_price = None
self.current_amount = None
self.bit_id = bit_id
# 五分之一策略参数(与回测一致)
self.min_body_size = 0.1
self.kline_step = 3 # 3分钟K线
self.kline_count = 20
self.check_interval = 3
self.last_trigger_kline_id = None
self.last_trigger_direction = None
self.last_trade_kline_id = None
# ========================= 五分之一策略核心 =========================
def is_bullish(self, c):
return float(c['close']) > float(c['open'])
def is_bearish(self, c):
return float(c['close']) < float(c['open'])
def get_body_size(self, candle):
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1):
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if self.get_body_size(prev) >= min_body_size:
return i, prev
return None, None
def get_one_fifth_levels(self, prev):
"""
计算前一根K线实体的 1/5 双向触发价格
做多触发 = 收盘价 + 实体/5做空触发 = 收盘价 - 实体/5
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001:
return None, None
long_trigger = p_close + body / 5
short_trigger = p_close - body / 5
return long_trigger, short_trigger
def get_1m_bars_for_3m_bar(self, bar_3m):
"""获取当前3分钟K线对应的3根1分钟K线用于同根内多空都触发时判断先后"""
try:
start_ts = int(bar_3m['id'])
end_ts = start_ts + 3 * 60 # 秒
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1,
start_time=start_ts,
end_time=end_ts
)[0]
if response.get('code') != 1000:
return []
data = response.get('data', [])
out = []
for k in data:
out.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
out.sort(key=lambda x: x['id'])
return out
except Exception as e:
logger.warning(f"获取1分钟K线失败: {e}")
return []
def determine_trigger_order_by_1m(self, bars_1m, long_trigger, short_trigger):
"""
用1分钟K线判断在3分钟周期内先触发做多还是做空
返回 'long', 'short' 或 None
"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar['high'])
low = float(bar['low'])
open_price = float(bar['open'])
long_ok = high >= long_trigger
short_ok = low <= short_trigger
if long_ok and not short_ok:
return 'long'
if short_ok and not long_ok:
return 'short'
if long_ok and short_ok:
d_long = abs(long_trigger - open_price)
d_short = abs(short_trigger - open_price)
return 'short' if d_short < d_long else 'long'
return None
def check_realtime_trigger(self, kline_data):
"""
实时检测当前3分钟K线是否触发信号
若多空都触发优先用1分钟K线判断先后否则用开盘价距离
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
"""
if len(kline_data) < 2:
return None, None, None, None
curr = kline_data[-1]
curr_kline_id = curr['id']
valid_prev_idx, prev = self.find_valid_prev_bar(
kline_data, len(kline_data) - 1, self.min_body_size
)
if prev is None:
return None, None, None, None
long_trigger, short_trigger = self.get_one_fifth_levels(prev)
if long_trigger is None:
return None, None, None, None
c_high = float(curr['high'])
c_low = float(curr['low'])
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
direction = None
trigger_price = None
if long_triggered and short_triggered:
bars_1m = self.get_1m_bars_for_3m_bar(curr)
if bars_1m:
direction = self.determine_trigger_order_by_1m(
bars_1m, long_trigger, short_trigger
)
trigger_price = long_trigger if direction == 'long' else short_trigger
if direction is None:
c_open = float(curr['open'])
d_long = abs(long_trigger - c_open)
d_short = abs(short_trigger - c_open)
direction = 'short' if d_short <= d_long else 'long'
trigger_price = long_trigger if direction == 'long' else short_trigger
elif short_triggered:
direction = 'short'
trigger_price = short_trigger
elif long_triggered:
direction = 'long'
trigger_price = long_trigger
if direction is None:
return None, None, None, None
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
return None, None, None, None
return direction, trigger_price, prev, curr
# ========================= BitMart API =========================
def get_klines(self):
"""获取最近3分钟K线"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=self.kline_step,
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]["data"]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
error_msg = str(e)
if "429" in error_msg or "too many requests" in error_msg.lower():
logger.warning(f"API限流等待60秒后重试: {e}")
time.sleep(60)
else:
logger.error(f"获取K线异常: {e}")
self.ding(msg="获取K线异常", error=True)
return None
def get_current_price(self):
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1,
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
if isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ========================= 浏览器 =========================
def openBrowser(self):
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except Exception:
return False
def close_extra_tabs_in_browser(self):
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except Exception:
return False
def click_safe(self, xpath, sleep=0.5):
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click(by_js=True)
return True
except Exception:
return False
def 平仓(self):
logger.info("执行平仓操作...")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
time.sleep(0.5)
self.ding(msg="执行平仓操作")
def 开单(self, marketPriceLongOrder=0, size=None):
if size is None or size <= 0:
logger.warning("开单金额无效")
return False
direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
logger.info(f"执行{direction_str}操作,金额: {size}")
size = 25
try:
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.ding(msg=f"执行{direction_str}操作,金额: {size}")
return True
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def ding(self, msg, error=False):
prefix = "❌五分之一策略:" if error else "🔔五分之一策略:"
if error:
logger.error(msg)
for i in range(10):
send_dingtalk_message(f"{prefix}{msg}")
else:
logger.info(msg)
send_dingtalk_message(f"{prefix}{msg}")
# ========================= 主循环 =========================
def action(self):
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
if not self.openBrowser():
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
else:
logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
logger.info(f"五分之一策略3分钟K线开始实时监测检测间隔: {self.check_interval}")
last_report_time = 0
report_interval = 300
while True:
for i in range(5):
if self.openBrowser():
break
time.sleep(5)
else:
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
try:
kline_data = self.get_klines()
if not kline_data or len(kline_data) < 3:
logger.warning("K线数据不足等待重试...")
time.sleep(self.check_interval)
continue
curr = kline_data[-1]
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
if direction:
curr_kline_id = curr_kline['id']
if self.last_trade_kline_id == curr_kline_id:
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
if not self.get_position_status():
logger.warning("获取仓位信息失败")
time.sleep(self.check_interval)
continue
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev)
if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1):
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
logger.info(f"{'=' * 50}")
logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}")
logger.info(
f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}")
logger.info(
f" 当前3分钟K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}")
logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)")
balance = self.get_available_balance()
trade_size = (balance or 0) * self.risk_percent
executed = False
if direction == "long":
if self.start == -1:
logger.info("📈 平空仓,反手开多")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif self.start == 0:
logger.info("📈 无仓位,开多")
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif direction == "short":
if self.start == 1:
logger.info("📉 平多仓,反手开空")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
elif self.start == 0:
logger.info("📉 无仓位,开空")
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
if executed:
self.last_trade_kline_id = curr_kline_id
self.get_position_status()
self._send_position_message(curr_kline)
last_report_time = time.time()
logger.info(f"{'=' * 50}")
else:
logger.debug(
f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
if time.time() - last_report_time >= report_interval:
if self.get_position_status():
self._send_position_message(kline_data[-1])
last_report_time = time.time()
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval)
time.sleep(15)
self.page.close()
time.sleep(15)
def _send_position_message(self, latest_kline):
current_price = float(latest_kline["close"])
balance = self.get_available_balance()
self.balance = balance if balance is not None else 0.0
if self.start != 0:
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
current_amount = float(self.current_amount) if self.current_amount else 0.0
position_cross = float(self.position_cross) if getattr(self, 'position_cross', None) else 0.0
if self.start == 1:
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
else:
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
if open_avg_price > 0:
if self.start == 1:
pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
else:
pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000
rate_str = f" ({pnl_rate:+.2f}%)"
else:
rate_str = ""
direction_str = "" if self.start == -1 else ""
pnl_str = f"{unrealized_pnl:+.2f} USDT"
msg = (
f"【五分之一策略 {self.contract_symbol} 3分钟】\n"
f"当前方向:{direction_str}\n"
f"当前现价:{current_price:.2f} USDT\n"
f"开仓均价:{open_avg_price:.2f} USDT\n"
f"持仓量(eth){float(current_amount) / 1000} eth\n"
f"持仓量(usdt){position_cross} usdt\n"
f"浮动盈亏:{pnl_str}{rate_str}\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
else:
msg = (
f"【五分之一策略 {self.contract_symbol} 3分钟】\n"
f"当前方向:无\n"
f"当前现价:{current_price:.2f} USDT\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
self.ding(msg=msg)
if __name__ == '__main__':
BitmartOneFifthStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action()