"""Bollinger Band delayed-reversal strategy - live trading.

Based on the backtest strategy ``bb_backtest_march_2026.py``. Market data and
account state are queried through the framework's BitMart API client; orders
are placed by driving the exchange web page through browser automation.
"""
import time
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage, ChromiumOptions
from bitmart.api_contract import APIContract


class BBDelayReversalConfig:
    """Static configuration for the strategy."""

    # Browser profile id (taken from the framework).
    BIT_ID = "f2320f57e24c45529a009e1541e25961"

    # API credentials (taken from the framework).
    # SECURITY NOTE(review): live credentials are committed in source. Rotate
    # them and load them from the environment or a secret store instead.
    API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
    SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
    MEMO = "合约交易"

    # Contract being traded and the web page used to trade it.
    CONTRACT_SYMBOL = "ETHUSDT"
    TRADE_URL = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"

    # Bollinger Band parameters: window length and band width in std-devs.
    BB_PERIOD = 10
    BB_STD = 2.5

    # Position sizing (taken from the framework).
    LEVERAGE = "50"
    OPEN_TYPE = "isolated"  # isolated-margin mode
    MARGIN_PCT = 0.01  # fraction of the available balance used per order

    # Runtime parameters.
    POLL_INTERVAL = 5  # seconds slept between polls
    KLINE_STEP = 5  # passed as ``step`` to get_kline (5-minute K-lines)
    KLINE_HOURS = 2  # how many hours of K-lines are requested per poll


class BBDelayReversalTrader:
    """Live trader implementing the Bollinger Band delayed-reversal strategy."""

    def __init__(self, bit_id: str | None = None):
        """Create the API client, reset all state and register a log sink.

        Args:
            bit_id: Optional browser profile id overriding
                ``BBDelayReversalConfig.BIT_ID`` for this instance.
        """
        self.cfg = BBDelayReversalConfig()
        if bit_id:
            self.cfg.BIT_ID = bit_id

        # API client (uses the framework configuration).
        self.contractAPI = APIContract(
            self.cfg.API_KEY,
            self.cfg.SECRET_KEY,
            self.cfg.MEMO,
            timeout=(5, 15)
        )

        # Browser state.
        self.page: ChromiumPage | None = None
        self.page_start = True
        self.last_page_open_time = 0.0
        self.PAGE_REFRESH_INTERVAL = 1800  # seconds between browser reopen cycles

        # Position state.
        self.position = 0  # -1 short, 0 flat, 1 long
        self.position_count = 0  # 0 flat, 1 first entry, 2 after scale-in
        self.entry_price = 0
        self.current_amount = 0
        self.total_margin = 0

        # Delayed-reversal state.
        self.delay_reverse_price = None
        self.delay_reverse_type = None  # 'long_to_short' or 'short_to_long'
        self.delay_reverse_kline_id = None  # K-line sequence number at mark time

        # Set once half of the position has been closed at the middle band.
        self.mid_closed_half = False

        # Trade control.
        self.last_trade_time = 0.0
        self.last_kline_id = None

        # K-line sequence tracking: a counter that grows by one per *new*
        # K-line, so offsets used by the delayed-reversal check count K-lines
        # rather than polls.
        self._kline_seq = 0
        self._last_seen_kline_id = None

        # Logging: one file per day, next to this script.
        self.log_dir = Path(__file__).resolve().parent
        logger.add(
            self.log_dir / "bb_delay_trade_{time:YYYY-MM-DD}.log",
            rotation="1 day",
            retention="30 days",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
        )

    # ========== API queries ==========

    def get_klines(self) -> list | None:
        """Fetch recent K-lines for the configured contract.

        Returns:
            A list of dicts with keys ``id`` (int timestamp), ``open``,
            ``high``, ``low`` and ``close`` (floats), sorted by ``id``
            ascending, or ``None`` when the request fails.
        """
        try:
            end_time = int(time.time())
            start_time = end_time - 3600 * self.cfg.KLINE_HOURS
            response = self.contractAPI.get_kline(
                contract_symbol=self.cfg.CONTRACT_SYMBOL,
                step=self.cfg.KLINE_STEP,
                start_time=start_time,
                end_time=end_time
            )[0]
            # Same success check the other API helpers in this class use.
            if response['code'] != 1000:
                logger.error(f"获取K线异常: {response}")
                return None

            formatted = [
                {
                    'id': int(k["timestamp"]),
                    'open': float(k["open_price"]),
                    'high': float(k["high_price"]),
                    'low': float(k["low_price"]),
                    'close': float(k["close_price"])
                }
                for k in response["data"]
            ]
            return sorted(formatted, key=lambda item: item['id'])
        except Exception as e:
            logger.error(f"获取K线异常: {e}")
            return None

    def get_current_price(self) -> float | None:
        """Return the close of the latest ``step=1`` K-line, or ``None`` on failure."""
        try:
            end_time = int(time.time())
            response = self.contractAPI.get_kline(
                contract_symbol=self.cfg.CONTRACT_SYMBOL,
                step=1,
                start_time=end_time - 3600 * 3,
                end_time=end_time
            )[0]
            if response['code'] != 1000:
                return None
            return float(response['data'][-1]["close_price"])
        except Exception as e:
            logger.error(f"获取价格异常: {e}")
            return None

    def get_balance(self) -> float | None:
        """Return the available balance, or ``None`` when it cannot be read.

        The API payload may be either a single dict or a list of per-currency
        dicts; in the list case the ``USDT`` entry is used.
        """
        try:
            response = self.contractAPI.get_assets_detail()[0]
            if response['code'] != 1000:
                return None

            data = response['data']
            if isinstance(data, dict):
                return float(data.get('available_balance', 0))
            if isinstance(data, list):
                for asset in data:
                    if asset.get('currency') == 'USDT':
                        return float(asset.get('available_balance', 0))
            return None
        except Exception as e:
            logger.error(f"余额查询异常: {e}")
            return None

    def get_position_status(self) -> bool:
        """Refresh position fields from the exchange.

        Updates ``position``, ``entry_price`` and ``current_amount`` from the
        first returned position; when no position is returned the position
        fields (including ``position_count``) are reset to flat.

        Returns:
            ``True`` when the query succeeded, ``False`` otherwise (state is
            left untouched on failure).
        """
        try:
            response = self.contractAPI.get_position(
                contract_symbol=self.cfg.CONTRACT_SYMBOL
            )[0]
            if response['code'] != 1000:
                return False

            positions = response['data']
            if not positions:
                self.position = 0
                self.position_count = 0
                self.entry_price = 0
                self.current_amount = 0
                return True

            pos = positions[0]
            # position_type == 1 is treated as long; anything else as short.
            self.position = 1 if pos['position_type'] == 1 else -1
            self.entry_price = float(pos['open_avg_price'])
            self.current_amount = float(pos['current_amount'])
            logger.debug(f"持仓: {'多' if self.position > 0 else '空'} | "
                         f"价格={self.entry_price:.2f} | 数量={self.current_amount:.4f}")
            return True
        except Exception as e:
            logger.error(f"持仓查询异常: {e}")
            return False

    def set_leverage(self) -> bool:
        """Submit the configured leverage and margin mode; return success."""
        try:
            response = self.contractAPI.post_submit_leverage(
                contract_symbol=self.cfg.CONTRACT_SYMBOL,
                leverage=self.cfg.LEVERAGE,
                open_type=self.cfg.OPEN_TYPE
            )[0]
            if response['code'] == 1000:
                logger.success(f"{self.cfg.OPEN_TYPE}模式 + {self.cfg.LEVERAGE}x 杠杆设置成功")
                return True
            logger.error(f"杠杆设置失败: {response}")
            return False
        except Exception as e:
            logger.error(f"设置杠杆异常: {e}")
            return False

    # ========== Bollinger Band ==========

    def calc_bollinger(self, closes: list):
        """Compute Bollinger Bands over the last ``BB_PERIOD`` closes.

        Args:
            closes: Close prices, oldest first.

        Returns:
            ``(mid, upper, lower)`` using the population standard deviation
            (``ddof=0``), or ``None`` when fewer than ``BB_PERIOD`` closes are
            supplied.
        """
        period = self.cfg.BB_PERIOD
        if len(closes) < period:
            return None

        window = np.array(closes[-period:], dtype=float)
        mid = window.mean()
        band = self.cfg.BB_STD * window.std(ddof=0)
        return mid, mid + band, mid - band

    # ========== Browser automation ==========

    def open_browser(self) -> bool:
        """Open the configured browser profile and attach a page to it.

        Returns:
            ``True`` on success; ``False`` (after logging) on any error.
        """
        try:
            bit_port = openBrowser(id=self.cfg.BIT_ID)
            co = ChromiumOptions()
            co.set_local_port(port=bit_port)
            self.page = ChromiumPage(addr_or_opts=co)
            self.last_page_open_time = time.time()
            return True
        except Exception as e:
            # Was a bare ``except`` that hid the cause and swallowed Ctrl-C.
            logger.error(f"打开浏览器异常: {e}")
            return False

    def click_safe(self, xpath, sleep=0.5) -> bool:
        """Scroll an element into view and click it without raising.

        Args:
            xpath: Locator string passed to ``page.ele``.
            sleep: Seconds to wait between scrolling and clicking.

        Returns:
            ``True`` when the element was found and clicked, else ``False``.
        """
        try:
            ele = self.page.ele(xpath)
            if not ele:
                return False
            ele.scroll.to_see(center=True)
            time.sleep(sleep)
            ele.click()
            return True
        except Exception as e:
            logger.debug(f"点击失败 {xpath}: {e}")
            return False

    def browser_open_position(self, direction: str, usdt_amount: float) -> bool:
        """Place a market order through the web page.

        Args:
            direction: ``'long'`` to buy; any other value sells/shorts.
            usdt_amount: Value typed into the order size field.

        Returns:
            ``True`` only when every click was performed; ``False`` when a
            click failed or an exception occurred. Callers should still
            confirm the result via ``get_position_status``.
        """
        try:
            logger.info(f"浏览器操作: 开{'多' if direction == 'long' else '空'} {usdt_amount}U")

            if direction == 'long':
                submit_xpath = 'x://span[normalize-space(text()) ="买入/做多"]'
            else:
                submit_xpath = 'x://span[normalize-space(text()) ="卖出/做空"]'

            # Same click sequence for both sides; only the submit button differs.
            market_ok = self.click_safe('x://button[normalize-space(text()) ="市价"]')
            self.page.ele('x://*[@id="size_0"]').input(usdt_amount)
            submit_ok = self.click_safe(submit_xpath)

            time.sleep(2)
            if not (market_ok and submit_ok):
                logger.warning("浏览器开仓: 部分点击未成功")
                return False
            return True
        except Exception as e:
            logger.error(f"浏览器开仓失败: {e}")
            return False

    def browser_close_position(self, ratio: float = 1.0) -> bool:
        """Close all or part of the position through the web page.

        Args:
            ratio: ``>= 0.99`` triggers the full-close click; anything lower
                runs the partial-close click sequence (the logged percentage
                is derived from ``ratio``, the clicked control is fixed).

        Returns:
            ``True`` only when every click was performed; ``False`` when a
            click failed or an exception occurred.
        """
        try:
            logger.info(f"浏览器操作: 平仓{int(ratio*100)}%")

            if ratio >= 0.99:
                # Full close.
                results = [self.click_safe('x://span[normalize-space(text()) ="市价"]')]
            else:
                # Partial close: switch to the close tab, pick the size
                # shortcut, then submit on the side matching the position.
                if self.position > 0:
                    submit_xpath = 'x://span[normalize-space(text()) ="卖出/平多"]'
                else:
                    submit_xpath = 'x://span[normalize-space(text()) ="买入/平空"]'
                results = [
                    self.click_safe('x://button[normalize-space(text()) ="平仓"]'),
                    self.click_safe('x://*[@id="futureTradeForm"]/div[5]/div[3]/div[3]/span[3]'),
                    self.click_safe(submit_xpath),
                ]

            time.sleep(2)
            if not all(results):
                logger.warning("浏览器平仓: 部分点击未成功")
                return False
            return True
        except Exception as e:
            logger.error(f"浏览器平仓失败: {e}")
            return False

    # ========== Delayed-reversal logic ==========

    def mark_delay_reversal(self, reverse_type: str, trigger_price: float, kline_id: int):
        """Record a pending delayed reversal.

        Args:
            reverse_type: ``'long_to_short'`` or ``'short_to_long'``.
            trigger_price: Price level recorded for the next-K-line check.
            kline_id: K-line sequence number at which the mark was made.
        """
        self.delay_reverse_type = reverse_type
        self.delay_reverse_price = trigger_price
        self.delay_reverse_kline_id = kline_id
        logger.info(f"触发延迟反转: {reverse_type} @ {trigger_price:.2f}")

    def clear_delay_reversal(self):
        """Drop any pending delayed reversal."""
        self.delay_reverse_price = None
        self.delay_reverse_type = None
        self.delay_reverse_kline_id = None

    def check_delay_reversal(self, current_kline, prev_kline, kline_index) -> tuple | None:
        """Check whether a pending delayed reversal is confirmed.

        Args:
            current_kline: Dict with at least ``high`` and ``low``.
            prev_kline: Dict with ``open`` and ``close`` of the previous
                K-line, or a falsy value when unavailable.
            kline_index: Sequence number of ``current_kline``, in the same
                numbering used when the reversal was marked.

        Returns:
            ``(direction, price, reason)`` when confirmed, otherwise ``None``.
            On the first K-line after the mark the recorded price must be
            revisited; from the second K-line on, the previous K-line's body
            edge must be broken.
        """
        if self.position == 0 or self.delay_reverse_price is None:
            return None
        if self.delay_reverse_kline_id is None:
            return None

        offset = kline_index - self.delay_reverse_kline_id
        if offset <= 0:
            return None

        high = current_kline['high']
        low = current_kline['low']

        if self.delay_reverse_type == 'long_to_short':
            # Long -> short: price pulls back to the recorded level.
            if offset == 1 and low <= self.delay_reverse_price:
                return 'short', self.delay_reverse_price, "次K回调确认"
            if offset >= 2 and prev_kline:
                prev_body_low = min(prev_kline['open'], prev_kline['close'])
                if low <= prev_body_low:
                    return 'short', prev_body_low, "跌破上一根实体"
        elif self.delay_reverse_type == 'short_to_long':
            # Short -> long: price bounces to the recorded level.
            if offset == 1 and high >= self.delay_reverse_price:
                return 'long', self.delay_reverse_price, "次K反弹确认"
            if offset >= 2 and prev_kline:
                prev_body_high = max(prev_kline['open'], prev_kline['close'])
                if high >= prev_body_high:
                    return 'long', prev_body_high, "突破上一根实体"

        return None

    # ========== Main-loop helpers ==========

    def _advance_kline_seq(self, klines: list) -> int:
        """Advance and return the K-line sequence number for ``klines[-1]``.

        The counter grows by the number of K-lines newer than the last one
        seen, so it stays constant across polls of the same K-line and still
        counts K-lines that were missed between polls.
        """
        if self._last_seen_kline_id is not None:
            self._kline_seq += sum(
                1 for k in klines if k['id'] > self._last_seen_kline_id
            )
        self._last_seen_kline_id = klines[-1]['id']
        return self._kline_seq

    def _ensure_browser(self) -> bool:
        """Open the browser (up to 5 attempts) and load the trading page."""
        for _ in range(5):
            if self.open_browser():
                logger.info("浏览器打开成功")
                break
            time.sleep(1)  # brief pause instead of hammering the launcher
        else:
            logger.error("打开浏览器失败")
            return False

        self.page.get(self.cfg.TRADE_URL)
        time.sleep(2)
        self.click_safe('x://button[normalize-space(text()) ="市价"]')
        return True

    def _open_and_sync(self, direction: str, usdt_amount: float, position_count: int):
        """Open/scale in, re-query the position, and record the entry count.

        ``position_count`` is stored only when the exchange reports a position
        on the requested side afterwards.
        """
        self.browser_open_position(direction, usdt_amount)
        time.sleep(2)
        self.get_position_status()
        expected = 1 if direction == 'long' else -1
        if self.position == expected:
            self.position_count = position_count

    def _close_and_reverse(self, new_direction: str) -> bool:
        """Fully close the position and, once flat, open the opposite side.

        Returns:
            ``True`` when the account was flat after the close attempt
            (whether or not the re-entry then succeeded), else ``False``.
        """
        self.browser_close_position(1.0)
        time.sleep(2)
        self.get_position_status()
        if self.position != 0:
            return False

        balance = self.get_balance()
        if balance:
            usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2)
            self.browser_open_position(new_direction, usdt_amount)
            time.sleep(2)
            self.get_position_status()
            if self.position != 0:
                self.position_count = 1
                self.mid_closed_half = False
        return True

    # ========== Main loop ==========

    def run(self):
        """Run the strategy loop until interrupted or the browser cannot open.

        Each poll: keep the browser alive, fetch K-lines, compute the bands on
        closed K-lines, sync the position, then (at most once per K-line)
        act on delayed-reversal confirmation, middle-band exits, fresh
        entries, reversal marking, or scale-ins.
        """
        logger.info("=" * 60)
        logger.info("布林带延迟反转策略启动")
        logger.info(f"BB({self.cfg.BB_PERIOD}, {self.cfg.BB_STD}) | "
                    f"{self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
        logger.info("=" * 60)

        # Leverage must be set before trading.
        if not self.set_leverage():
            logger.error("杠杆设置失败,退出")
            return

        # Initial position snapshot.
        if not self.get_position_status():
            logger.error("初始持仓查询失败,退出")
            return

        logger.info(f"初始持仓: {self.position}")

        page_start = True

        while True:
            try:
                # ===== Browser management =====
                if page_start:
                    if not self._ensure_browser():
                        return
                    page_start = False

                # Periodically reopen the browser.
                if time.time() - self.last_page_open_time >= self.PAGE_REFRESH_INTERVAL:
                    logger.info("浏览器刷新")
                    try:
                        self.page.close()
                    except Exception as e:
                        # Best effort: the page may already be gone.
                        logger.debug(f"关闭页面异常: {e}")
                    page_start = True
                    time.sleep(3)
                    continue

                # ===== K-lines =====
                klines = self.get_klines()
                if not klines or len(klines) < self.cfg.BB_PERIOD + 1:
                    logger.warning(f"K线数据不足: {len(klines) if klines else 0}")
                    time.sleep(self.cfg.POLL_INTERVAL)
                    continue

                # Bands are computed on closed K-lines only; the last element
                # is treated as the K-line still in progress.
                closed_klines = klines[:-1]
                current_kline = klines[-1]
                # The previous K-line is the last closed one. Using a per-poll
                # history here would compare the current K-line with an older
                # snapshot of itself.
                prev_kline = closed_klines[-1]
                kline_seq = self._advance_kline_seq(klines)

                bb = self.calc_bollinger([k['close'] for k in closed_klines])
                if bb is None:
                    time.sleep(self.cfg.POLL_INTERVAL)
                    continue
                bb_mid, bb_upper, bb_lower = bb

                # Current price (used for logging).
                current_price = self.get_current_price()
                if current_price is None:
                    time.sleep(self.cfg.POLL_INTERVAL)
                    continue

                cur_high = current_kline['high']
                cur_low = current_kline['low']
                kline_id = current_kline['id']

                # Band-touch flags.
                touched_upper = cur_high >= bb_upper
                touched_lower = cur_low <= bb_lower
                touched_middle = cur_low <= bb_mid <= cur_high

                logger.info(
                    f"价格={current_price:.2f} | "
                    f"BB: {bb_lower:.2f}/{bb_mid:.2f}/{bb_upper:.2f} | "
                    f"触上={touched_upper} 触下={touched_lower} 触中={touched_middle} | "
                    f"仓位={self.position}"
                )

                # Sync position with the exchange.
                if not self.get_position_status():
                    time.sleep(self.cfg.POLL_INTERVAL)
                    continue

                # Act at most once per K-line.
                if kline_id == self.last_kline_id:
                    time.sleep(self.cfg.POLL_INTERVAL)
                    continue

                # ===== Delayed-reversal confirmation =====
                if self.delay_reverse_price is not None:
                    reversal = self.check_delay_reversal(
                        current_kline, prev_kline, kline_seq
                    )
                    if reversal:
                        new_direction, reversal_price, reason = reversal
                        logger.info(f"延迟反转确认: {reason} @ {reversal_price:.2f}")

                        # Only forget the pending reversal once the old
                        # position is actually closed.
                        if self._close_and_reverse(new_direction):
                            self.clear_delay_reversal()

                        self.last_kline_id = kline_id
                        continue

                # ===== Middle-band exit =====
                if self.position != 0 and touched_middle:
                    if not self.mid_closed_half:
                        logger.info("触中轨,平50%")
                        self.browser_close_position(0.5)
                        time.sleep(2)
                        self.mid_closed_half = True
                        self.last_kline_id = kline_id
                        continue

                    # Half already closed: if price returns to the entry,
                    # close the rest and reverse.
                    if (self.position > 0 and cur_low <= self.entry_price) or \
                       (self.position < 0 and cur_high >= self.entry_price):
                        logger.info("回到开仓价,全平+反手")
                        new_direction = 'short' if self.position > 0 else 'long'
                        self._close_and_reverse(new_direction)
                        self.last_kline_id = kline_id
                        continue

                # ===== Entries, reversal marking and scale-ins =====
                if self.position == 0:
                    self.clear_delay_reversal()
                    # A fresh entry must not inherit the half-closed flag of a
                    # previous position.
                    self.mid_closed_half = False

                    balance = self.get_balance()
                    if not balance:
                        time.sleep(self.cfg.POLL_INTERVAL)
                        continue

                    usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2)

                    if touched_upper:
                        logger.info(f"空仓触上轨,开空 {usdt_amount}U")
                        self._open_and_sync('short', usdt_amount, 1)
                        self.last_kline_id = kline_id
                    elif touched_lower:
                        logger.info(f"空仓触下轨,开多 {usdt_amount}U")
                        self._open_and_sync('long', usdt_amount, 1)
                        self.last_kline_id = kline_id

                # ===== Delayed-reversal trigger =====
                elif self.position > 0 and touched_upper:
                    logger.info("多仓触上轨,标记延迟反转")
                    self.mark_delay_reversal('long_to_short', bb_upper, kline_seq)
                    self.last_kline_id = kline_id

                elif self.position < 0 and touched_lower:
                    logger.info("空仓触下轨,标记延迟反转")
                    self.mark_delay_reversal('short_to_long', bb_lower, kline_seq)
                    self.last_kline_id = kline_id

                # ===== Scale-in =====
                elif self.position_count == 1 and self.delay_reverse_price is None:
                    balance = self.get_balance()
                    if balance:
                        usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2)

                        if self.position > 0 and touched_lower:
                            logger.info(f"多仓触下轨,加仓 {usdt_amount}U")
                            self._open_and_sync('long', usdt_amount, 2)
                            self.last_kline_id = kline_id
                        elif self.position < 0 and touched_upper:
                            logger.info(f"空仓触上轨,加仓 {usdt_amount}U")
                            self._open_and_sync('short', usdt_amount, 2)
                            self.last_kline_id = kline_id

                time.sleep(self.cfg.POLL_INTERVAL)

            except KeyboardInterrupt:
                logger.info("用户中断")
                break
            except Exception as e:
                logger.error(f"主循环异常: {e}")
                # Reopen the browser on the next iteration.
                page_start = True
                time.sleep(10)


if __name__ == "__main__":
    # Use the framework's browser profile id.
    trader = BBDelayReversalTrader(bit_id="f2320f57e24c45529a009e1541e25961")
    trader.run()