Files
codex_jxs_code/bb_delay_reversal_trade.py
2026-03-05 15:44:55 +08:00

666 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
布林带延迟反转策略 - 实盘交易
基于回测策略 bb_backtest_march_2026.py
使用框架的API查询 + 浏览器自动化交易
"""
import time
import numpy as np
from datetime import datetime, timezone
from pathlib import Path
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage, ChromiumOptions
from bitmart.api_contract import APIContract
class BBDelayReversalConfig:
"""策略配置"""
# 浏览器ID从框架获取
BIT_ID = "f2320f57e24c45529a009e1541e25961"
# API凭证从框架获取
API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
MEMO = "合约交易"
# 合约
CONTRACT_SYMBOL = "ETHUSDT"
TRADE_URL = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"
# 布林带参数
BB_PERIOD = 10
BB_STD = 2.5
# 仓位管理(从框架获取)
LEVERAGE = "50"
OPEN_TYPE = "isolated" # 逐仓模式
MARGIN_PCT = 0.01 # 首次开仓1%
# 运行参数
POLL_INTERVAL = 5
KLINE_STEP = 5
KLINE_HOURS = 2
class BBDelayReversalTrader:
"""布林带延迟反转交易器"""
def __init__(self, bit_id: str = None):
self.cfg = BBDelayReversalConfig()
if bit_id:
self.cfg.BIT_ID = bit_id
# API使用框架配置
self.contractAPI = APIContract(
self.cfg.API_KEY,
self.cfg.SECRET_KEY,
self.cfg.MEMO,
timeout=(5, 15)
)
# 浏览器
self.page: ChromiumPage | None = None
self.page_start = True
self.last_page_open_time = 0.0
self.PAGE_REFRESH_INTERVAL = 1800
# 持仓状态
self.position = 0 # -1空, 0无, 1多
self.position_count = 0 # 0空仓, 1首次, 2加仓
self.entry_price = 0
self.current_amount = 0
self.total_margin = 0
# 延迟反转状态
self.delay_reverse_price = None
self.delay_reverse_type = None # 'long_to_short' 或 'short_to_long'
self.delay_reverse_kline_id = None
# 中轨平仓
self.mid_closed_half = False
# 交易控制
self.last_trade_time = 0.0
self.last_kline_id = None
# 日志
self.log_dir = Path(__file__).resolve().parent
logger.add(
self.log_dir / "bb_delay_trade_{time:YYYY-MM-DD}.log",
rotation="1 day", retention="30 days",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
# ========== API查询方法 ==========
def get_klines(self) -> list | None:
"""获取5分钟K线使用框架方法"""
try:
end_time = int(time.time())
start_time = end_time - 3600 * self.cfg.KLINE_HOURS
response = self.contractAPI.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=self.cfg.KLINE_STEP,
start_time=start_time,
end_time=end_time
)[0]["data"]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
logger.error(f"获取K线异常: {e}")
return None
def get_current_price(self) -> float | None:
"""获取当前价格(使用框架方法)"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=1,
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_balance(self) -> float | None:
"""获取可用余额(使用框架方法)"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self) -> bool:
"""查询持仓状态(使用框架方法)"""
try:
response = self.contractAPI.get_position(contract_symbol=self.cfg.CONTRACT_SYMBOL)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.position = 0
self.position_count = 0
self.entry_price = 0
self.current_amount = 0
return True
pos = positions[0]
self.position = 1 if pos['position_type'] == 1 else -1
self.entry_price = float(pos['open_avg_price'])
self.current_amount = float(pos['current_amount'])
logger.debug(f"持仓: {'' if self.position > 0 else ''} | "
f"价格={self.entry_price:.2f} | 数量={self.current_amount:.4f}")
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self) -> bool:
"""设置杠杆(使用框架方法)"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
leverage=self.cfg.LEVERAGE,
open_type=self.cfg.OPEN_TYPE
)[0]
if response['code'] == 1000:
logger.success(f"{self.cfg.OPEN_TYPE}模式 + {self.cfg.LEVERAGE}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ========== 布林带计算 ==========
def calc_bollinger(self, closes: list):
"""计算布林带"""
if len(closes) < self.cfg.BB_PERIOD:
return None
arr = np.array(closes[-self.cfg.BB_PERIOD:], dtype=float)
mid = arr.mean()
std = arr.std(ddof=0)
upper = mid + self.cfg.BB_STD * std
lower = mid - self.cfg.BB_STD * std
return mid, upper, lower
# ========== 浏览器自动化 ==========
def open_browser(self) -> bool:
"""打开浏览器(使用框架方法)"""
try:
bit_port = openBrowser(id=self.cfg.BIT_ID)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.last_page_open_time = time.time()
return True
except:
return False
def click_safe(self, xpath, sleep=0.5) -> bool:
"""安全点击(使用框架方法)"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click()
return True
except:
return False
def browser_open_position(self, direction: str, usdt_amount: float) -> bool:
"""浏览器开仓(使用框架的开单方法)"""
try:
logger.info(f"浏览器操作: 开{'' if direction == 'long' else ''} {usdt_amount}U")
# 使用框架的开单方法
if direction == 'long':
# 市价做多
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(usdt_amount)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
else:
# 市价做空
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(usdt_amount)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
time.sleep(2)
return True
except Exception as e:
logger.error(f"浏览器开仓失败: {e}")
return False
def browser_close_position(self, ratio: float = 1.0) -> bool:
"""浏览器平仓(使用框架方法)"""
try:
logger.info(f"浏览器操作: 平仓{int(ratio*100)}%")
if ratio >= 0.99:
# 全平(使用框架的全平仓方法)
self.click_safe('x://span[normalize-space(text()) ="市价"]')
else:
# 平半(使用框架的平一半方法)
self.click_safe('x://button[normalize-space(text()) ="平仓"]')
self.click_safe('x://*[@id="futureTradeForm"]/div[5]/div[3]/div[3]/span[3]')
if self.position > 0:
# 平一半多仓
self.click_safe('x://span[normalize-space(text()) ="卖出/平多"]')
else:
# 平一半空仓
self.click_safe('x://span[normalize-space(text()) ="买入/平空"]')
time.sleep(2)
return True
except Exception as e:
logger.error(f"浏览器平仓失败: {e}")
return False
# ========== 延迟反转逻辑 ==========
def mark_delay_reversal(self, reverse_type: str, trigger_price: float, kline_id: int):
"""标记延迟反转"""
self.delay_reverse_type = reverse_type
self.delay_reverse_price = trigger_price
self.delay_reverse_kline_id = kline_id
logger.warning(f"⚠️ 延迟反转触发: {reverse_type} @ {trigger_price:.2f} | K线ID: {kline_id}")
def clear_delay_reversal(self):
"""清除延迟反转状态"""
self.delay_reverse_price = None
self.delay_reverse_type = None
self.delay_reverse_kline_id = None
def check_delay_reversal(self, current_kline, prev_kline, prev_upper, prev_lower) -> tuple | None:
"""
检查延迟反转确认
根据回测代码的逻辑:
1. 次K确认触轨后的下一根K线回调/反弹到记录价格
2. 持续追踪追踪上一根K线的触轨情况或实体
"""
if self.position == 0 or self.delay_reverse_price is None:
return None
if self.delay_reverse_kline_id is None:
return None
current_kline_id = current_kline['id']
trigger_kline_id = self.delay_reverse_kline_id
# 计算K线偏移5分钟 = 300000ms
offset = (current_kline_id - trigger_kline_id) // 300000
if offset <= 0:
return None
high = current_kline['high']
low = current_kline['low']
logger.debug(f"延迟反转检查: offset={offset} | 当前K线 H={high:.2f} L={low:.2f}")
if self.delay_reverse_type == 'long_to_short':
# 多转空: 需要价格回调
if offset == 1:
# 次K确认回调到记录的上轨价格
if low <= self.delay_reverse_price:
logger.success(f"✓ 延迟反转确认(次K): 回调到 {self.delay_reverse_price:.2f}")
return 'short', self.delay_reverse_price, "次K回调确认"
elif offset >= 2 and prev_kline and prev_upper:
# 持续追踪上一根K线
prev_high = prev_kline['high']
prev_touch_upper = prev_high >= prev_upper
if prev_touch_upper:
# 上一根触上轨当前K回调到上一根上轨价
if low <= prev_upper:
logger.success(f"✓ 延迟反转确认(追踪): 上一根触上轨后回调到 {prev_upper:.2f}")
return 'short', prev_upper, "上一根触上轨后回调确认"
else:
# 上一根未触轨,跌破上一根实体
prev_body_low = min(prev_kline['open'], prev_kline['close'])
if low <= prev_body_low:
logger.success(f"✓ 延迟反转确认(追踪): 跌破上一根实体 {prev_body_low:.2f}")
return 'short', prev_body_low, "跌破上一根实体确认"
elif self.delay_reverse_type == 'short_to_long':
# 空转多: 需要价格反弹
if offset == 1:
# 次K确认反弹到记录的下轨价格
if high >= self.delay_reverse_price:
logger.success(f"✓ 延迟反转确认(次K): 反弹到 {self.delay_reverse_price:.2f}")
return 'long', self.delay_reverse_price, "次K反弹确认"
elif offset >= 2 and prev_kline and prev_lower:
# 持续追踪上一根K线
prev_low = prev_kline['low']
prev_touch_lower = prev_low <= prev_lower
if prev_touch_lower:
# 上一根触下轨当前K反弹到上一根下轨价
if high >= prev_lower:
logger.success(f"✓ 延迟反转确认(追踪): 上一根触下轨后反弹到 {prev_lower:.2f}")
return 'long', prev_lower, "上一根触下轨后反弹确认"
else:
# 上一根未触轨,突破上一根实体
prev_body_high = max(prev_kline['open'], prev_kline['close'])
if high >= prev_body_high:
logger.success(f"✓ 延迟反转确认(追踪): 突破上一根实体 {prev_body_high:.2f}")
return 'long', prev_body_high, "突破上一根实体确认"
return None
# ========== 主循环 ==========
def run(self):
"""策略主循环"""
logger.info("=" * 60)
logger.info(f"布林带延迟反转策略启动")
logger.info(f"BB({self.cfg.BB_PERIOD}, {self.cfg.BB_STD}) | "
f"{self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
logger.info("=" * 60)
# 设置杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,退出")
return
# 初始持仓
if not self.get_position_status():
logger.error("初始持仓查询失败,退出")
return
logger.info(f"初始持仓: {self.position}")
page_start = True
kline_history = [] # 保存历史K线
prev_bb_upper = None # 保存上一根K线的上轨
prev_bb_lower = None # 保存上一根K线的下轨
while True:
try:
# ===== 浏览器管理 =====
if page_start:
for i in range(5):
if self.open_browser():
logger.info("浏览器打开成功")
break
else:
logger.error("打开浏览器失败")
return
self.page.get(self.cfg.TRADE_URL)
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
page_start = False
# 定期刷新浏览器
if time.time() - self.last_page_open_time >= self.PAGE_REFRESH_INTERVAL:
logger.info("浏览器刷新")
try:
self.page.close()
except:
pass
page_start = True
time.sleep(3)
continue
# ===== 获取K线 =====
klines = self.get_klines()
if not klines or len(klines) < self.cfg.BB_PERIOD + 1:
logger.warning(f"K线数据不足: {len(klines) if klines else 0}")
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 使用已收盘K线计算BB
closed_klines = klines[:-1]
current_kline = klines[-1]
if len(closed_klines) < self.cfg.BB_PERIOD:
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 计算布林带
closes = [k['close'] for k in closed_klines]
bb = self.calc_bollinger(closes)
if bb is None:
time.sleep(self.cfg.POLL_INTERVAL)
continue
bb_mid, bb_upper, bb_lower = bb
# 获取当前价格
current_price = self.get_current_price()
if current_price is None:
time.sleep(self.cfg.POLL_INTERVAL)
continue
cur_high = current_kline['high']
cur_low = current_kline['low']
kline_id = current_kline['id']
# 触轨判断
touched_upper = cur_high >= bb_upper
touched_lower = cur_low <= bb_lower
touched_middle = cur_low <= bb_mid <= cur_high
# 延迟反转状态显示
delay_status = ""
if self.delay_reverse_price is not None:
delay_status = f" | 🔄延迟反转中: {self.delay_reverse_type} @ {self.delay_reverse_price:.2f}"
logger.info(
f"价格={current_price:.2f} | "
f"BB: {bb_lower:.2f}/{bb_mid:.2f}/{bb_upper:.2f} | "
f"触上={touched_upper} 触下={touched_lower} 触中={touched_middle} | "
f"仓位={self.position}{delay_status}"
)
# 同步持仓
if not self.get_position_status():
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 避免同一K线重复触发
if kline_id == self.last_kline_id:
time.sleep(self.cfg.POLL_INTERVAL)
continue
# ===== 延迟反转确认(优先级最高)=====
if self.delay_reverse_price is not None:
prev_kline = kline_history[-1] if len(kline_history) > 0 else None
reversal = self.check_delay_reversal(
current_kline, prev_kline, prev_bb_upper, prev_bb_lower
)
if reversal:
new_direction, reversal_price, reason = reversal
logger.warning(f"🔄 延迟反转确认: {reason} @ {reversal_price:.2f}")
# 平仓
logger.info("执行平仓...")
self.browser_close_position(1.0)
time.sleep(3)
self.get_position_status()
if self.position == 0:
# 反向开仓
logger.info(f"执行反向开{'' if new_direction == 'long' else ''}...")
balance = self.get_balance()
if balance:
usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2)
self.browser_open_position(new_direction, usdt_amount)
time.sleep(3)
self.get_position_status()
if self.position != 0:
self.position_count = 1
self.mid_closed_half = False
self.clear_delay_reversal()
self.last_kline_id = kline_id
logger.success(f"✓ 延迟反转完成!")
else:
logger.error(f"平仓后仍有持仓: {self.position}")
# 更新历史
kline_history.append(current_kline)
prev_bb_upper = bb_upper
prev_bb_lower = bb_lower
continue
# ===== 中轨平仓 =====
if self.position != 0 and touched_middle:
if not self.mid_closed_half:
logger.info("触中轨平50%")
self.browser_close_position(0.5)
time.sleep(2)
self.mid_closed_half = True
self.last_kline_id = kline_id
continue
elif self.mid_closed_half:
# 回到开仓价全平+反手
if (self.position > 0 and cur_low <= self.entry_price) or \
(self.position < 0 and cur_high >= self.entry_price):
logger.info("回到开仓价,全平+反手")
old_direction = 'long' if self.position > 0 else 'short'
new_direction = 'short' if old_direction == 'long' else 'long'
self.browser_close_position(1.0)
time.sleep(2)
self.get_position_status()
if self.position == 0:
balance = self.get_balance()
if balance:
usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2)
self.browser_open_position(new_direction, usdt_amount)
time.sleep(2)
self.get_position_status()
if self.position != 0:
self.position_count = 1
self.mid_closed_half = False
self.last_kline_id = kline_id
continue
# ===== 开仓与加仓 =====
if self.position == 0:
self.clear_delay_reversal()
balance = self.get_balance()
if not balance:
time.sleep(self.cfg.POLL_INTERVAL)
continue
usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2)
if touched_upper:
logger.info(f"空仓触上轨,开空 {usdt_amount}U")
self.browser_open_position('short', usdt_amount)
time.sleep(2)
self.get_position_status()
if self.position == -1:
self.position_count = 1
self.last_kline_id = kline_id
elif touched_lower:
logger.info(f"空仓触下轨,开多 {usdt_amount}U")
self.browser_open_position('long', usdt_amount)
time.sleep(2)
self.get_position_status()
if self.position == 1:
self.position_count = 1
self.last_kline_id = kline_id
# ===== 延迟反转触发 =====
elif self.position > 0 and touched_upper:
logger.warning("⚠️ 多仓触上轨,标记延迟反转")
self.mark_delay_reversal('long_to_short', bb_upper, kline_id)
self.last_kline_id = kline_id
elif self.position < 0 and touched_lower:
logger.warning("⚠️ 空仓触下轨,标记延迟反转")
self.mark_delay_reversal('short_to_long', bb_lower, kline_id)
self.last_kline_id = kline_id
# ===== 加仓 =====
elif self.position_count == 1 and self.delay_reverse_price is None:
balance = self.get_balance()
if balance:
usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2)
if self.position > 0 and touched_lower:
logger.info(f"多仓触下轨,加仓 {usdt_amount}U")
self.browser_open_position('long', usdt_amount)
time.sleep(2)
self.get_position_status()
if self.position == 1:
self.position_count = 2
self.last_kline_id = kline_id
elif self.position < 0 and touched_upper:
logger.info(f"空仓触上轨,加仓 {usdt_amount}U")
self.browser_open_position('short', usdt_amount)
time.sleep(2)
self.get_position_status()
if self.position == -1:
self.position_count = 2
self.last_kline_id = kline_id
# 更新K线历史和布林带历史
kline_history.append(current_kline)
if len(kline_history) > 100:
kline_history = kline_history[-100:]
prev_bb_upper = bb_upper
prev_bb_lower = bb_lower
time.sleep(self.cfg.POLL_INTERVAL)
except KeyboardInterrupt:
logger.info("用户中断")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
page_start = True
time.sleep(10)
if __name__ == "__main__":
# 使用框架的浏览器ID
trader = BBDelayReversalTrader(bit_id="f2320f57e24c45529a009e1541e25961")
trader.run()