""" 布林带延迟反转策略 - 实盘交易 基于回测策略 bb_backtest_march_2026.py 使用框架的API查询 + 浏览器自动化交易 """ import time import numpy as np from datetime import datetime, timezone from pathlib import Path from loguru import logger from bit_tools import openBrowser from DrissionPage import ChromiumPage, ChromiumOptions from bitmart.api_contract import APIContract class BBDelayReversalConfig: """策略配置""" # 浏览器ID(从框架获取) BIT_ID = "f2320f57e24c45529a009e1541e25961" # API凭证(从框架获取) API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" MEMO = "合约交易" # 合约 CONTRACT_SYMBOL = "ETHUSDT" TRADE_URL = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT" # 布林带参数 BB_PERIOD = 10 BB_STD = 2.5 # 仓位管理(从框架获取) LEVERAGE = "50" OPEN_TYPE = "isolated" # 逐仓模式 MARGIN_PCT = 0.01 # 首次开仓1% # 运行参数 POLL_INTERVAL = 5 KLINE_STEP = 5 KLINE_HOURS = 2 class BBDelayReversalTrader: """布林带延迟反转交易器""" def __init__(self, bit_id: str = None): self.cfg = BBDelayReversalConfig() if bit_id: self.cfg.BIT_ID = bit_id # API(使用框架配置) self.contractAPI = APIContract( self.cfg.API_KEY, self.cfg.SECRET_KEY, self.cfg.MEMO, timeout=(5, 15) ) # 浏览器 self.page: ChromiumPage | None = None self.page_start = True self.last_page_open_time = 0.0 self.PAGE_REFRESH_INTERVAL = 1800 # 持仓状态 self.position = 0 # -1空, 0无, 1多 self.position_count = 0 # 0空仓, 1首次, 2加仓 self.entry_price = 0 self.current_amount = 0 self.total_margin = 0 # 延迟反转状态 self.delay_reverse_price = None self.delay_reverse_type = None # 'long_to_short' 或 'short_to_long' self.delay_reverse_kline_id = None # 中轨平仓 self.mid_closed_half = False # 交易控制 self.last_trade_time = 0.0 self.last_kline_id = None self.cooldown_seconds = 10 # 交易冷却时间 # 日志 self.log_dir = Path(__file__).resolve().parent logger.add( self.log_dir / "bb_delay_trade_{time:YYYY-MM-DD}.log", rotation="1 day", retention="30 days", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}" ) # ========== API查询方法 ========== def get_klines(self) -> list | None: """获取5分钟K线(使用框架方法)""" try: end_time = int(time.time()) start_time = end_time - 3600 * self.cfg.KLINE_HOURS response = self.contractAPI.get_kline( contract_symbol=self.cfg.CONTRACT_SYMBOL, step=self.cfg.KLINE_STEP, start_time=start_time, end_time=end_time )[0]["data"] formatted = [] for k in response: formatted.append({ 'id': int(k["timestamp"]), 'open': float(k["open_price"]), 'high': float(k["high_price"]), 'low': float(k["low_price"]), 'close': float(k["close_price"]) }) formatted.sort(key=lambda x: x['id']) return formatted except Exception as e: logger.error(f"获取K线异常: {e}") return None def get_current_price(self) -> float | None: """获取当前价格(使用框架方法)""" try: end_time = int(time.time()) response = self.contractAPI.get_kline( contract_symbol=self.cfg.CONTRACT_SYMBOL, step=1, start_time=end_time - 3600 * 3, end_time=end_time )[0] if response['code'] == 1000: return float(response['data'][-1]["close_price"]) return None except Exception as e: logger.error(f"获取价格异常: {e}") return None def get_balance(self) -> float | None: """获取可用余额(使用框架方法)""" try: response = self.contractAPI.get_assets_detail()[0] if response['code'] == 1000: data = response['data'] if isinstance(data, dict): return float(data.get('available_balance', 0)) elif isinstance(data, list): for asset in data: if asset.get('currency') == 'USDT': return float(asset.get('available_balance', 0)) return None except Exception as e: logger.error(f"余额查询异常: {e}") return None def get_position_status(self) -> bool: """查询持仓状态(使用框架方法)""" try: response = self.contractAPI.get_position(contract_symbol=self.cfg.CONTRACT_SYMBOL)[0] if response['code'] == 1000: positions = response['data'] if not positions: self.position = 0 self.position_count = 0 self.entry_price = 0 self.current_amount = 0 return True pos = positions[0] self.position = 1 if pos['position_type'] == 1 else -1 self.entry_price = float(pos['open_avg_price']) self.current_amount = float(pos['current_amount']) logger.debug(f"持仓: {'多' if self.position > 0 else '空'} | " f"价格={self.entry_price:.2f} | 数量={self.current_amount:.4f}") return True else: return False except Exception as e: logger.error(f"持仓查询异常: {e}") return False def set_leverage(self) -> bool: """设置杠杆(使用框架方法)""" try: response = self.contractAPI.post_submit_leverage( contract_symbol=self.cfg.CONTRACT_SYMBOL, leverage=self.cfg.LEVERAGE, open_type=self.cfg.OPEN_TYPE )[0] if response['code'] == 1000: logger.success(f"{self.cfg.OPEN_TYPE}模式 + {self.cfg.LEVERAGE}x 杠杆设置成功") return True else: logger.error(f"杠杆设置失败: {response}") return False except Exception as e: logger.error(f"设置杠杆异常: {e}") return False # ========== 布林带计算 ========== def calc_bollinger(self, closes: list): """计算布林带""" if len(closes) < self.cfg.BB_PERIOD: return None arr = np.array(closes[-self.cfg.BB_PERIOD:], dtype=float) mid = arr.mean() std = arr.std(ddof=0) upper = mid + self.cfg.BB_STD * std lower = mid - self.cfg.BB_STD * std return mid, upper, lower # ========== 浏览器自动化 ========== def open_browser(self) -> bool: """打开浏览器(使用框架方法)""" try: bit_port = openBrowser(id=self.cfg.BIT_ID) co = ChromiumOptions() co.set_local_port(port=bit_port) self.page = ChromiumPage(addr_or_opts=co) self.last_page_open_time = time.time() return True except: return False def click_safe(self, xpath, sleep=0.5) -> bool: """安全点击(使用框架方法)""" try: ele = self.page.ele(xpath) if not ele: return False ele.scroll.to_see(center=True) time.sleep(sleep) ele.click() return True except: return False def browser_open_position(self, direction: str, usdt_amount: float) -> bool: """浏览器开仓(使用框架的开单方法)""" try: logger.info(f"浏览器操作: 开{'多' if direction == 'long' else '空'} {usdt_amount}U") # 1. 先点击"开仓"按钮 self.click_safe('x://button[normalize-space(text()) ="开仓"]') time.sleep(0.5) # 2. 点击市价 self.click_safe('x://button[normalize-space(text()) ="市价"]') time.sleep(0.5) # 3. 输入金额 self.page.ele('x://*[@id="size_0"]').input(vals=usdt_amount, clear=True) time.sleep(0.5) # 4. 点击开仓按钮 if direction == 'long': # 市价做多 self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') else: # 市价做空 self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') time.sleep(2) return True except Exception as e: logger.error(f"浏览器开仓失败: {e}") return False def browser_close_position(self, ratio: float = 1.0) -> bool: """浏览器平仓(使用框架方法)""" try: logger.info(f"浏览器操作: 平仓{int(ratio*100)}%") # 1. 先点击"平仓"按钮 self.click_safe('x://button[normalize-space(text()) ="平仓"]') time.sleep(0.5) if ratio >= 0.99: # 2. 全平:点击市价 self.click_safe('x://button[normalize-space(text()) ="市价"]') time.sleep(0.5) # 3. 点击平仓按钮 if self.position > 0: # 平多仓 self.click_safe('x://span[normalize-space(text()) ="卖出/平多"]') else: # 平空仓 self.click_safe('x://span[normalize-space(text()) ="买入/平空"]') else: # 2. 平半:点击50%按钮 self.click_safe('x://*[@id="futureTradeForm"]/div[5]/div[3]/div[3]/span[3]') time.sleep(0.5) # 3. 点击平仓按钮 if self.position > 0: # 平一半多仓 self.click_safe('x://span[normalize-space(text()) ="卖出/平多"]') else: # 平一半空仓 self.click_safe('x://span[normalize-space(text()) ="买入/平空"]') time.sleep(2) return True except Exception as e: logger.error(f"浏览器平仓失败: {e}") return False def can_trade(self) -> bool: """检查是否可以交易(冷却时间)""" now = time.time() if now - self.last_trade_time < self.cooldown_seconds: remain = self.cooldown_seconds - (now - self.last_trade_time) logger.debug(f"交易冷却中,剩余 {remain:.0f}s") return False return True def update_trade_time(self): """更新最后交易时间""" self.last_trade_time = time.time() # ========== 延迟反转逻辑 ========== def mark_delay_reversal(self, reverse_type: str, trigger_price: float, kline_id: int): """标记延迟反转""" self.delay_reverse_type = reverse_type self.delay_reverse_price = trigger_price self.delay_reverse_kline_id = kline_id logger.warning(f"⚠️ 延迟反转触发: {reverse_type} @ {trigger_price:.2f} | K线ID: {kline_id}") def clear_delay_reversal(self): """清除延迟反转状态""" self.delay_reverse_price = None self.delay_reverse_type = None self.delay_reverse_kline_id = None def check_delay_reversal(self, current_kline, prev_kline, prev_upper, prev_lower) -> tuple | None: """ 检查延迟反转确认 根据回测代码的逻辑: 1. 次K确认:触轨后的下一根K线回调/反弹到记录价格 2. 持续追踪:追踪上一根K线的触轨情况或实体 """ if self.position == 0 or self.delay_reverse_price is None: return None if self.delay_reverse_kline_id is None: return None current_kline_id = current_kline['id'] trigger_kline_id = self.delay_reverse_kline_id # 计算K线偏移(5分钟 = 300000ms) offset = (current_kline_id - trigger_kline_id) // 300000 if offset <= 0: return None high = current_kline['high'] low = current_kline['low'] logger.debug(f"延迟反转检查: offset={offset} | 当前K线 H={high:.2f} L={low:.2f}") if self.delay_reverse_type == 'long_to_short': # 多转空: 需要价格回调 if offset == 1: # 次K确认:回调到记录的上轨价格 if low <= self.delay_reverse_price: logger.success(f"✓ 延迟反转确认(次K): 回调到 {self.delay_reverse_price:.2f}") return 'short', self.delay_reverse_price, "次K回调确认" elif offset >= 2 and prev_kline and prev_upper: # 持续追踪上一根K线 prev_high = prev_kline['high'] prev_touch_upper = prev_high >= prev_upper if prev_touch_upper: # 上一根触上轨,当前K回调到上一根上轨价 if low <= prev_upper: logger.success(f"✓ 延迟反转确认(追踪): 上一根触上轨后回调到 {prev_upper:.2f}") return 'short', prev_upper, "上一根触上轨后回调确认" else: # 上一根未触轨,跌破上一根实体 prev_body_low = min(prev_kline['open'], prev_kline['close']) if low <= prev_body_low: logger.success(f"✓ 延迟反转确认(追踪): 跌破上一根实体 {prev_body_low:.2f}") return 'short', prev_body_low, "跌破上一根实体确认" elif self.delay_reverse_type == 'short_to_long': # 空转多: 需要价格反弹 if offset == 1: # 次K确认:反弹到记录的下轨价格 if high >= self.delay_reverse_price: logger.success(f"✓ 延迟反转确认(次K): 反弹到 {self.delay_reverse_price:.2f}") return 'long', self.delay_reverse_price, "次K反弹确认" elif offset >= 2 and prev_kline and prev_lower: # 持续追踪上一根K线 prev_low = prev_kline['low'] prev_touch_lower = prev_low <= prev_lower if prev_touch_lower: # 上一根触下轨,当前K反弹到上一根下轨价 if high >= prev_lower: logger.success(f"✓ 延迟反转确认(追踪): 上一根触下轨后反弹到 {prev_lower:.2f}") return 'long', prev_lower, "上一根触下轨后反弹确认" else: # 上一根未触轨,突破上一根实体 prev_body_high = max(prev_kline['open'], prev_kline['close']) if high >= prev_body_high: logger.success(f"✓ 延迟反转确认(追踪): 突破上一根实体 {prev_body_high:.2f}") return 'long', prev_body_high, "突破上一根实体确认" return None # ========== 主循环 ========== def run(self): """策略主循环""" logger.info("=" * 60) logger.info(f"布林带延迟反转策略启动") logger.info(f"BB({self.cfg.BB_PERIOD}, {self.cfg.BB_STD}) | " f"{self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}") logger.info("=" * 60) # 设置杠杆 if not self.set_leverage(): logger.error("杠杆设置失败,退出") return # 初始持仓 if not self.get_position_status(): logger.error("初始持仓查询失败,退出") return logger.info(f"初始持仓: {self.position}") page_start = True kline_history = [] # 保存历史K线 prev_bb_upper = None # 保存上一根K线的上轨 prev_bb_lower = None # 保存上一根K线的下轨 while True: try: # ===== 浏览器管理 ===== if page_start: for i in range(5): if self.open_browser(): logger.info("浏览器打开成功") break else: logger.error("打开浏览器失败") return self.page.get(self.cfg.TRADE_URL) time.sleep(2) # 默认点击开仓按钮,准备开仓 self.click_safe('x://button[normalize-space(text()) ="开仓"]') time.sleep(0.5) self.click_safe('x://button[normalize-space(text()) ="市价"]') page_start = False # 定期刷新浏览器 if time.time() - self.last_page_open_time >= self.PAGE_REFRESH_INTERVAL: logger.info("浏览器刷新") try: self.page.close() except: pass page_start = True time.sleep(3) continue # ===== 获取K线 ===== klines = self.get_klines() if not klines or len(klines) < self.cfg.BB_PERIOD + 1: logger.warning(f"K线数据不足: {len(klines) if klines else 0}") time.sleep(self.cfg.POLL_INTERVAL) continue # 使用已收盘K线计算BB closed_klines = klines[:-1] current_kline = klines[-1] if len(closed_klines) < self.cfg.BB_PERIOD: time.sleep(self.cfg.POLL_INTERVAL) continue # 计算布林带 closes = [k['close'] for k in closed_klines] bb = self.calc_bollinger(closes) if bb is None: time.sleep(self.cfg.POLL_INTERVAL) continue bb_mid, bb_upper, bb_lower = bb # 获取当前价格 current_price = self.get_current_price() if current_price is None: time.sleep(self.cfg.POLL_INTERVAL) continue cur_high = current_kline['high'] cur_low = current_kline['low'] kline_id = current_kline['id'] # 触轨判断 touched_upper = cur_high >= bb_upper touched_lower = cur_low <= bb_lower touched_middle = cur_low <= bb_mid <= cur_high # 延迟反转状态显示 delay_status = "" if self.delay_reverse_price is not None: delay_status = f" | 🔄延迟反转中: {self.delay_reverse_type} @ {self.delay_reverse_price:.2f}" logger.info( f"价格={current_price:.2f} | " f"BB: {bb_lower:.2f}/{bb_mid:.2f}/{bb_upper:.2f} | " f"触上={touched_upper} 触下={touched_lower} 触中={touched_middle} | " f"仓位={self.position}{delay_status}" ) # 同步持仓 if not self.get_position_status(): time.sleep(self.cfg.POLL_INTERVAL) continue # 避免同一K线重复触发 if kline_id == self.last_kline_id: time.sleep(self.cfg.POLL_INTERVAL) continue # ===== 延迟反转确认(优先级最高)===== if self.delay_reverse_price is not None: prev_kline = kline_history[-1] if len(kline_history) > 0 else None reversal = self.check_delay_reversal( current_kline, prev_kline, prev_bb_upper, prev_bb_lower ) if reversal: new_direction, reversal_price, reason = reversal logger.warning(f"🔄 延迟反转确认: {reason} @ {reversal_price:.2f}") # 平仓 logger.info("执行平仓...") self.browser_close_position(1.0) time.sleep(3) self.get_position_status() if self.position == 0: # 反向开仓 logger.info(f"执行反向开{'多' if new_direction == 'long' else '空'}...") balance = self.get_balance() if balance: usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2) self.browser_open_position(new_direction, usdt_amount) time.sleep(3) self.get_position_status() if self.position != 0: self.position_count = 1 self.mid_closed_half = False self.clear_delay_reversal() self.last_kline_id = kline_id self.update_trade_time() logger.success(f"✓ 延迟反转完成!") else: logger.error(f"平仓后仍有持仓: {self.position}") # 更新历史 kline_history.append(current_kline) prev_bb_upper = bb_upper prev_bb_lower = bb_lower continue # ===== 中轨平仓(延迟反转期间不执行)===== if self.position != 0 and touched_middle and self.delay_reverse_price is None: if not self.mid_closed_half: logger.info("📊 触中轨,平50%") if not self.can_trade(): continue self.browser_close_position(0.5) time.sleep(3) self.get_position_status() self.mid_closed_half = True self.last_kline_id = kline_id self.update_trade_time() logger.success(f"✓ 平半成功") # 更新历史 kline_history.append(current_kline) prev_bb_upper = bb_upper prev_bb_lower = bb_lower continue # ===== 回到开仓价全平+反手(仅在已平半的情况下)===== if self.position != 0 and self.mid_closed_half and self.delay_reverse_price is None: should_close = False if self.position > 0 and cur_low <= self.entry_price: should_close = True elif self.position < 0 and cur_high >= self.entry_price: should_close = True if should_close: logger.info(f"💰 回到开仓价 {self.entry_price:.2f},全平+反手") if not self.can_trade(): continue old_direction = 'long' if self.position > 0 else 'short' new_direction = 'short' if old_direction == 'long' else 'long' self.browser_close_position(1.0) time.sleep(3) self.get_position_status() if self.position == 0: balance = self.get_balance() if balance: usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2) self.browser_open_position(new_direction, usdt_amount) time.sleep(3) self.get_position_status() if self.position != 0: self.position_count = 1 self.mid_closed_half = False self.last_kline_id = kline_id self.update_trade_time() logger.success(f"✓ 反手完成!") # 更新历史 kline_history.append(current_kline) prev_bb_upper = bb_upper prev_bb_lower = bb_lower continue # ===== 开仓(空仓时)===== if self.position == 0: self.clear_delay_reversal() balance = self.get_balance() if not balance: time.sleep(self.cfg.POLL_INTERVAL) continue usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2) if touched_upper: logger.info(f"🔴 空仓触上轨,开空 {usdt_amount}U") if not self.can_trade(): continue self.browser_open_position('short', usdt_amount) time.sleep(3) self.get_position_status() if self.position == -1: self.position_count = 1 self.last_kline_id = kline_id self.update_trade_time() logger.success(f"✓ 开空成功") elif touched_lower: logger.info(f"🟢 空仓触下轨,开多 {usdt_amount}U") if not self.can_trade(): continue self.browser_open_position('long', usdt_amount) time.sleep(3) self.get_position_status() if self.position == 1: self.position_count = 1 self.last_kline_id = kline_id self.update_trade_time() logger.success(f"✓ 开多成功") # ===== 延迟反转触发(有持仓且未在延迟反转中)===== elif self.position > 0 and touched_upper and self.delay_reverse_price is None: logger.warning("⚠️ 多仓触上轨,标记延迟反转") self.mark_delay_reversal('long_to_short', bb_upper, kline_id) self.last_kline_id = kline_id elif self.position < 0 and touched_lower and self.delay_reverse_price is None: logger.warning("⚠️ 空仓触下轨,标记延迟反转") self.mark_delay_reversal('short_to_long', bb_lower, kline_id) self.last_kline_id = kline_id # ===== 加仓(仅首次开仓后,且未在延迟反转中)===== elif self.position_count == 1 and self.delay_reverse_price is None: balance = self.get_balance() if balance: usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2) if self.position > 0 and touched_lower: logger.info(f"➕ 多仓触下轨,加仓 {usdt_amount}U") if not self.can_trade(): continue self.browser_open_position('long', usdt_amount) time.sleep(3) self.get_position_status() if self.position == 1: self.position_count = 2 self.last_kline_id = kline_id self.update_trade_time() logger.success(f"✓ 加仓成功") elif self.position < 0 and touched_upper: logger.info(f"➕ 空仓触上轨,加仓 {usdt_amount}U") if not self.can_trade(): continue self.browser_open_position('short', usdt_amount) time.sleep(3) self.get_position_status() if self.position == -1: self.position_count = 2 self.last_kline_id = kline_id self.update_trade_time() logger.success(f"✓ 加仓成功") # 更新K线历史和布林带历史 kline_history.append(current_kline) if len(kline_history) > 100: kline_history = kline_history[-100:] prev_bb_upper = bb_upper prev_bb_lower = bb_lower time.sleep(self.cfg.POLL_INTERVAL) except KeyboardInterrupt: logger.info("用户中断") break except Exception as e: logger.error(f"主循环异常: {e}") page_start = True time.sleep(10) if __name__ == "__main__": # 使用框架的浏览器ID trader = BBDelayReversalTrader(bit_id="f2320f57e24c45529a009e1541e25961") trader.run()