"""Bollinger Band delayed-reversal strategy backtest.

Strategy rules:
1. 5-minute candles, BB(10, 2.5).
2. When flat: touching the upper band opens a short, touching the lower
   band opens a long.
3. At most one same-direction add-on per position.
   NOTE(review): the original header described the margin as stepping up
   "1% -> 2%", but ``margin_ratio_2`` is 0.01 (1%) -- confirm which one is
   intended.
4. Delayed reversal: touching the opposite band does not close the position
   immediately; the band price is recorded and the position is closed and
   reversed once price pulls back to a confirmation level.
5. Close half at the middle band; close everything (and reverse) when price
   returns to the entry price afterwards.
6. Stop loss: unrealized loss reaches 50% of the posted margin.

NOTE(review): the original header said "March 2026", while the ``__main__``
section runs the whole of 2026.
"""

from pathlib import Path

import pandas as pd
from loguru import logger
from peewee import BigIntegerField, FloatField, Model, SqliteDatabase

# Database configuration
DB_PATH = Path(__file__).parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))


class BitMartETH5M(Model):
    """5-minute candle model (primary key ``id`` is used as a ms timestamp)."""

    id = BigIntegerField(primary_key=True)
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        database = db
        table_name = 'bitmart_eth_5m'


class BitMartETH1M(Model):
    """1-minute candle model (primary key ``id`` is used as a ms timestamp)."""

    id = BigIntegerField(primary_key=True)
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        database = db
        table_name = 'bitmart_eth_1m'


class BollingerBandBacktest:
    """Backtest engine for the Bollinger Band delayed-reversal strategy."""

    def __init__(self):
        # Strategy parameters
        self.bb_period = 10
        self.bb_std = 2.5
        self.initial_capital = 100  # initial capital: 100 U
        self.leverage = 50  # 50x leverage
        self.fee_rate = 0.0005  # fee: 5 bps of notional
        self.rebate_rate = 0.9  # 90% of the fee is rebated
        self.margin_ratio_1 = 0.01  # margin ratio of the first entry: 1%
        self.margin_ratio_2 = 0.01  # margin ratio of the add-on entry: 1%
        self.stop_loss_ratio = 0.5  # stop loss at 50% of posted margin
        self.entry_slippage = 0.0002  # entry slippage (2 bps)
        self.rebate_credit_hour = 8  # rebates land at 08:00 next day (Shanghai)

        # Account state
        self.capital = self.initial_capital
        self.position = 0  # signed size (positive = long, negative = short)
        self.position_count = 0  # 0 = flat, 1 = first entry, 2 = added once
        self.entry_price = 0  # average entry price
        self.total_margin = 0  # total margin posted

        # Delayed-reversal state
        self.delay_reverse_price = None  # band price recorded at the touch
        self.delay_reverse_type = None  # 'long_to_short' or 'short_to_long'
        self.delay_reverse_kline_index = None  # index of the touching candle

        # Middle-band bookkeeping
        self.mid_closed_half = False  # whether 50% was already closed

        # 1-minute candle cache: key = 5-minute bucket start (ms),
        # value = list of 1m candles inside that bucket.
        self.one_minute_by_5m = {}

        # Trade records
        self.trades = []
        self.daily_pnl = []
        self.pending_rebates = []  # queue of rebates not yet credited
        self.total_rebate_credited = 0.0
        self.current_run_label = "default"

    def calculate_bollinger_bands(self, df):
        """Add Bollinger Band columns to ``df`` and return it.

        The bands are shifted right by one candle so that the values on a
        row never use that row's own close.

        Args:
            df: DataFrame with a ``close`` column; modified in place.

        Returns:
            The same DataFrame with ``sma``, ``std``, ``upper``, ``lower``
            and ``middle`` columns added.
        """
        df['sma'] = df['close'].rolling(window=self.bb_period).mean()
        df['std'] = df['close'].rolling(window=self.bb_period).std()
        df['upper'] = (df['sma'] + self.bb_std * df['std']).shift(1)
        df['lower'] = (df['sma'] - self.bb_std * df['std']).shift(1)
        df['middle'] = df['sma'].shift(1)
        return df

    def get_rebate_amount(self, fee):
        """Return the rebate earned on ``fee``."""
        return fee * self.rebate_rate

    def schedule_rebate(self, fee, timestamp):
        """Queue the rebate for ``fee``, credited at 08:00 next day (Shanghai).

        Args:
            fee: Fee paid on the trade.
            timestamp: Trade time, interpreted as UTC.
        """
        rebate = self.get_rebate_amount(fee)
        if rebate <= 0:
            return

        trade_utc = pd.Timestamp(timestamp, tz='UTC')
        trade_local = trade_utc.tz_convert('Asia/Shanghai')
        # Midnight of the next local day plus the configured credit hour.
        credit_local = (
            (trade_local + pd.Timedelta(days=1)).normalize()
            + pd.Timedelta(hours=self.rebate_credit_hour)
        )
        # Stored tz-naive in UTC so it compares with the candle datetimes.
        credit_utc = credit_local.tz_convert('UTC').tz_localize(None)
        self.pending_rebates.append({
            'credit_time': credit_utc,
            'amount': rebate,
            'trade_time': trade_utc.tz_localize(None),
        })

    def apply_pending_rebates(self, current_time):
        """Credit every queued rebate whose credit time is <= ``current_time``.

        Credited rebates are added to the capital, recorded in ``trades`` and
        removed from the pending queue.
        """
        if not self.pending_rebates:
            return

        remaining = []
        for item in self.pending_rebates:
            if item['credit_time'] <= current_time:
                amount = item['amount']
                self.capital += amount
                self.total_rebate_credited += amount
                self.trades.append({
                    'timestamp': current_time.strftime('%Y-%m-%d %H:%M'),
                    'action': '返佣到账',
                    'price': None,
                    'size': None,
                    'rebate': amount,
                    'capital': self.capital,
                    'reason': f"次日08:00返佣到账({item['trade_time'].strftime('%Y-%m-%d %H:%M')}手续费)"
                })
                logger.info(
                    f"[{current_time.strftime('%Y-%m-%d %H:%M')}] 返佣到账: {amount:.4f}U | 可用资金: {self.capital:.4f}U"
                )
            else:
                remaining.append(item)
        self.pending_rebates = remaining

    def apply_entry_slippage(self, price, direction):
        """Return ``price`` worsened by the entry slippage.

        Longs pay more; any other direction is treated as a short and
        receives less.
        """
        if direction == 'long':
            return price * (1 + self.entry_slippage)
        return price * (1 - self.entry_slippage)

    def load_one_minute_cache(self, start_ts, end_ts):
        """Load 1-minute candles in ``[start_ts, end_ts]`` (ms) into the cache.

        Candles are grouped by the start of the 5-minute bucket they fall in
        and stored in ``self.one_minute_by_5m``.
        """
        query = BitMartETH1M.select().where(
            (BitMartETH1M.id >= start_ts) & (BitMartETH1M.id <= end_ts)
        ).order_by(BitMartETH1M.id)

        bucket = {}
        for row in query:
            # 300000 ms = 5 minutes: floor the timestamp to its bucket start.
            five_min_start = int(row.id - (row.id % 300000))
            bucket.setdefault(five_min_start, []).append({
                'timestamp': int(row.id),
                'open': float(row.open) if row.open is not None else None,
                'high': float(row.high) if row.high is not None else None,
                'low': float(row.low) if row.low is not None else None,
                'close': float(row.close) if row.close is not None else None,
            })

        self.one_minute_by_5m = bucket
        total_rows = sum(len(v) for v in bucket.values())
        logger.info(f"加载1分钟数据: {total_rows} 根 | 5分钟桶: {len(bucket)}")

    def should_close_half_by_1m_trend(self, five_min_ts, middle_price, side):
        """Confirm the middle-band half close by walking the 1m candles in order.

        - Long: price must first be at/above the middle band, then dip back
          to it.
        - Short: price must first be at/below the middle band, then bounce
          back to it.

        Args:
            five_min_ts: Start (ms) of the 5-minute bucket to inspect.
            middle_price: Middle band price.
            side: ``'long'``; any other value is handled as a short.

        Returns:
            ``(True, reason)`` when confirmed, otherwise ``(False, None)``
            (including when no 1m data is cached for the bucket).
        """
        minute_rows = self.one_minute_by_5m.get(five_min_ts, [])
        if not minute_rows:
            return False, None

        seen_above_middle = False
        seen_below_middle = False
        for minute in minute_rows:
            m_open = minute['open']
            m_high = minute['high']
            m_low = minute['low']
            m_close = minute['close']
            if None in (m_open, m_high, m_low, m_close):
                continue

            minute_time = pd.to_datetime(minute['timestamp'], unit='ms').strftime('%H:%M')
            if side == 'long':
                # This 1m candle opened at/above the middle band and dipped
                # to it: counts as a valid retest.
                if m_open >= middle_price and m_low <= middle_price:
                    return True, f"1m({minute_time})回踩中轨"
                # Price was already above the middle band and touches it again.
                if seen_above_middle and m_low <= middle_price:
                    return True, f"1m({minute_time})回踩中轨"
                if m_high >= middle_price:
                    seen_above_middle = True
            else:  # short
                # This 1m candle opened at/below the middle band and bounced
                # to it: counts as a valid retest.
                if m_open <= middle_price and m_high >= middle_price:
                    return True, f"1m({minute_time})反抽中轨"
                # Price was already below the middle band and touches it again.
                if seen_below_middle and m_high >= middle_price:
                    return True, f"1m({minute_time})反抽中轨"
                if m_low <= middle_price:
                    seen_below_middle = True

        return False, None

    def get_touch_entry_price(self, five_min_ts, direction, touch_price):
        """Return a conservative fill price for a band-touch entry.

        - Find the first 1m candle in the 5-minute bucket that touches the
          band and use its close as the base price.
        - To avoid an over-optimistic fill, the base is never better than the
          band price; adverse slippage is then applied on top.
        - With no cached 1m data, the band price plus slippage is used.
        """
        minute_rows = self.one_minute_by_5m.get(five_min_ts, [])
        if not minute_rows:
            return self.apply_entry_slippage(touch_price, direction)

        trigger_close = None
        for minute in minute_rows:
            m_high = minute['high']
            m_low = minute['low']
            m_close = minute['close']
            if None in (m_high, m_low, m_close):
                continue
            if direction == 'short' and m_high >= touch_price:
                trigger_close = m_close
                break
            if direction == 'long' and m_low <= touch_price:
                trigger_close = m_close
                break

        if trigger_close is None:
            base_price = touch_price
        elif direction == 'long':
            # A long may not be filled below (better than) the band price.
            base_price = max(touch_price, trigger_close)
        else:
            # A short may not be filled above (better than) the band price.
            base_price = min(touch_price, trigger_close)

        return self.apply_entry_slippage(base_price, direction)

    def clear_delay_reversal(self):
        """Reset the delayed-reversal state."""
        self.delay_reverse_price = None
        self.delay_reverse_type = None
        self.delay_reverse_kline_index = None

    def mark_delay_reversal(self, reverse_type, trigger_price, kline_index, timestamp):
        """Record that a delayed reversal has been armed.

        Args:
            reverse_type: ``'long_to_short'`` or ``'short_to_long'``.
            trigger_price: Band price at the touch.
            kline_index: Index of the touching candle.
            timestamp: Display timestamp used in the log line.
        """
        self.delay_reverse_type = reverse_type
        self.delay_reverse_price = trigger_price
        self.delay_reverse_kline_index = kline_index
        if reverse_type == 'long_to_short':
            logger.info(f"[{timestamp}] 多仓触上轨 @ {trigger_price:.2f},进入延迟反转")
        else:
            logger.info(f"[{timestamp}] 空仓触下轨 @ {trigger_price:.2f},进入延迟反转")

    def reverse_position(self, price, new_direction, timestamp, reason):
        """Close the whole position at ``price`` and open the opposite side.

        Returns:
            ``False`` when there is no position (the delayed-reversal state
            is cleared), otherwise ``True``.
        """
        if self.position == 0:
            self.clear_delay_reversal()
            return False

        close_side = "多" if self.position > 0 else "空"
        open_side = "多" if new_direction == "long" else "空"

        self.close_position(price, 1.0, timestamp, f"{reason}-平{close_side}")
        open_price = self.apply_entry_slippage(price, new_direction)
        self.open_position(open_price, new_direction, timestamp, f"{reason}-开{open_side}")
        self.mid_closed_half = False
        self.clear_delay_reversal()
        return True

    def check_delay_reversal_signal(self, i, row, prev_row):
        """Check whether the armed delayed reversal is confirmed on candle ``i``.

        Only reports the signal; nothing is executed here.

        Args:
            i: Index of the current candle.
            row: Current candle (needs ``high`` and ``low``).
            prev_row: Previous candle, or ``None``.

        Returns:
            ``None`` when nothing is confirmed, otherwise a tuple
            ``(new_direction, reason, price)``.
        """
        if (
            self.position == 0
            or self.delay_reverse_price is None
            or self.delay_reverse_kline_index is None
        ):
            return None

        offset = i - self.delay_reverse_kline_index
        # Never confirm on the touching candle itself; earliest is the next one.
        if offset <= 0:
            return None

        high = row['high']
        low = row['low']

        if self.delay_reverse_type == 'long_to_short':
            # Case 1: the candle right after the touch pulls back to the
            # recorded upper-band price.
            if offset == 1 and low <= self.delay_reverse_price:
                return 'short', "延迟反转-次K回调确认", self.delay_reverse_price

            # Case 2: still waiting; the condition tracks the previous candle.
            if offset >= 2 and prev_row is not None:
                prev_upper = prev_row['upper']
                prev_touch_upper = pd.notna(prev_upper) and prev_row['high'] >= prev_upper

                if prev_touch_upper:
                    if low <= prev_upper:
                        return 'short', "延迟反转-上一根触上轨后回调确认", prev_upper
                else:
                    prev_body_low = min(prev_row['open'], prev_row['close'])
                    if low <= prev_body_low:
                        return 'short', "延迟反转-跌破上一根实体确认", prev_body_low

        elif self.delay_reverse_type == 'short_to_long':
            # Case 1: the candle right after the touch bounces to the
            # recorded lower-band price.
            if offset == 1 and high >= self.delay_reverse_price:
                return 'long', "延迟反转-次K反弹确认", self.delay_reverse_price

            # Case 2: still waiting; the condition tracks the previous candle.
            if offset >= 2 and prev_row is not None:
                prev_lower = prev_row['lower']
                prev_touch_lower = pd.notna(prev_lower) and prev_row['low'] <= prev_lower

                if prev_touch_lower:
                    if high >= prev_lower:
                        return 'long', "延迟反转-上一根触下轨后反弹确认", prev_lower
                else:
                    prev_body_high = max(prev_row['open'], prev_row['close'])
                    if high >= prev_body_high:
                        return 'long', "延迟反转-突破上一根实体确认", prev_body_high

        return None

    def open_position(self, price, direction, timestamp, reason):
        """Open a new position or add once to the existing one.

        Margin is a fraction of the currently available capital; margin plus
        the opening fee is deducted from the capital and the fee rebate is
        queued.

        Args:
            price: Fill price; must be a positive, non-NaN number.
            direction: ``'long'``; any other value is handled as a short.
            timestamp: Trade time string (used for records and rebates).
            reason: Free-text reason stored with the trade.

        Returns:
            ``True`` when the trade was booked, ``False`` when it was skipped
            (already added once, direction mismatch, invalid price or
            insufficient capital).
        """
        if self.position_count not in (0, 1):
            return False

        if self.position_count == 1:
            current_direction = 'long' if self.position > 0 else 'short'
            if direction != current_direction:
                logger.warning("加仓方向不一致,跳过")
                return False

        # A NaN price would slip through the capital check below (every
        # comparison with NaN is False) and poison the capital; a zero price
        # would divide by zero.
        if price is None or pd.isna(price) or price <= 0:
            logger.warning(f"[{timestamp}] 价格无效,无法开仓 | 价格: {price}")
            return False

        margin_ratio = self.margin_ratio_1 if self.position_count == 0 else self.margin_ratio_2
        margin = self.capital * margin_ratio
        if margin <= 0:
            logger.warning(f"[{timestamp}] 资金不足,无法开仓 | 可用资金: {self.capital:.4f}U")
            return False

        position_size = margin * self.leverage / price
        fee = position_size * price * self.fee_rate
        required = margin + fee
        if self.capital < required:
            logger.warning(
                f"[{timestamp}] 可用资金不足,无法开仓 | 需要: {required:.4f}U | 可用: {self.capital:.4f}U"
            )
            return False

        # Freeze the margin and pay the fee.
        self.capital -= required
        self.schedule_rebate(fee, timestamp)

        if self.position_count == 0:
            self.position = position_size if direction == 'long' else -position_size
            self.entry_price = price
            self.total_margin = margin
            self.position_count = 1
            action = f'开{direction}'
        else:
            # Add-on: the entry price becomes the size-weighted average.
            old_size = abs(self.position)
            new_size = old_size + position_size
            old_value = old_size * self.entry_price
            new_value = position_size * price
            self.entry_price = (old_value + new_value) / new_size
            self.position = new_size if direction == 'long' else -new_size
            self.total_margin += margin
            self.position_count = 2
            action = f'加{direction}'

        self.mid_closed_half = False

        self.trades.append({
            'timestamp': timestamp,
            'action': action,
            'price': price,
            'size': position_size,
            'margin': margin,
            'fee': fee,
            'rebate': self.get_rebate_amount(fee),
            'capital': self.capital,
            'reason': reason
        })

        logger.info(
            f"[{timestamp}] {action} @ {price:.2f} | 仓位: {position_size:.4f} | "
            f"保证金: {margin:.4f}U | 手续费: {fee:.4f}U | 返佣待到账: {self.get_rebate_amount(fee):.4f}U | "
            f"可用资金: {self.capital:.4f}U | {reason}"
        )
        return True

    def close_position(self, price, ratio, timestamp, reason):
        """Close ``ratio`` of the current position at ``price``.

        The released margin plus PnL minus the closing fee is returned to the
        capital and the fee rebate is queued.

        Args:
            price: Fill price; must be a positive, non-NaN number.
            ratio: Fraction to close, clamped to ``[0, 1]``; >= 0.999 is
                treated as a full close.
            timestamp: Trade time string (used for records and rebates).
            reason: Free-text reason stored with the trade.

        Returns:
            ``True`` when something was closed, ``False`` when flat, when the
            clamped ratio is 0 or when the price is invalid.
        """
        if self.position == 0:
            return False

        ratio = min(max(ratio, 0.0), 1.0)
        if ratio == 0:
            return False

        # A NaN price would turn the PnL, and with it the capital, into NaN.
        if price is None or pd.isna(price) or price <= 0:
            logger.warning(f"[{timestamp}] 价格无效,无法平仓 | 价格: {price}")
            return False

        close_size = abs(self.position) * ratio
        if self.position > 0:
            pnl = close_size * (price - self.entry_price)
        else:
            pnl = close_size * (self.entry_price - price)

        fee = close_size * price * self.fee_rate
        released_margin = self.total_margin * ratio
        self.capital += released_margin + pnl - fee
        self.schedule_rebate(fee, timestamp)

        if ratio >= 0.999:
            self.position = 0
            self.position_count = 0
            self.total_margin = 0
            self.entry_price = 0
            self.mid_closed_half = False
            self.clear_delay_reversal()
        else:
            self.position *= (1 - ratio)
            self.total_margin *= (1 - ratio)
            # Treat a vanishing remainder as flat.
            if abs(self.position) < 1e-12:
                self.position = 0
                self.position_count = 0
                self.total_margin = 0
                self.entry_price = 0
                self.mid_closed_half = False
                self.clear_delay_reversal()

        self.trades.append({
            'timestamp': timestamp,
            'action': f'平仓{int(ratio*100)}%',
            'price': price,
            'size': close_size,
            'pnl': pnl,
            'fee': fee,
            'rebate': self.get_rebate_amount(fee),
            'capital': self.capital,
            'reason': reason
        })

        logger.info(f"[{timestamp}] 平仓{int(ratio*100)}% @ {price:.2f} | "
                    f"盈亏: {pnl:.4f}U | 手续费: {fee:.4f}U | 返佣待到账: {self.get_rebate_amount(fee):.4f}U | "
                    f"可用资金: {self.capital:.4f}U | {reason}")
        return True

    def check_stop_loss(self, high, low):
        """Return whether the candle's adverse extreme hits the stop loss.

        Longs are evaluated at ``low`` and shorts at ``high``; the stop fires
        when the unrealized loss reaches ``stop_loss_ratio`` of the margin.
        """
        if self.position == 0:
            return False

        stop_price = low if self.position > 0 else high
        if self.position > 0:
            unrealized_pnl = abs(self.position) * (stop_price - self.entry_price)
        else:
            unrealized_pnl = abs(self.position) * (self.entry_price - stop_price)

        return unrealized_pnl <= -self.total_margin * self.stop_loss_ratio

    def run_backtest(self, start_date, end_date):
        """Run the backtest over ``[start_date, end_date]``.

        Band-touch entries and add-ons are booked on the signal candle;
        confirmed delayed reversals are booked on the signal candle at the
        confirmation price; stop losses and middle-band / back-to-entry
        exits are booked at the next candle's open.

        Args:
            start_date: Anything ``pd.Timestamp`` accepts.
            end_date: Anything ``pd.Timestamp`` accepts; a string of at most
                10 characters (a bare date) is extended to the end of that
                day.

        Returns:
            The report dict from ``generate_report``, or ``None`` when no
            data was found or there are too few candles.
        """
        # Reset state so the same instance can be reused for several runs.
        self.capital = self.initial_capital
        self.position = 0
        self.position_count = 0
        self.entry_price = 0
        self.total_margin = 0
        self.mid_closed_half = False
        self.trades = []
        self.daily_pnl = []
        self.pending_rebates = []
        self.total_rebate_credited = 0.0
        self.clear_delay_reversal()

        logger.info(f"{'='*80}")
        logger.info(f"开始回测: {start_date} ~ {end_date}")
        logger.info(f"初始资金: {self.initial_capital}U | 杠杆: {self.leverage}x | BB({self.bb_period}, {self.bb_std})")
        logger.info(f"{'='*80}")

        # Load the candles from the database.
        start_dt = pd.Timestamp(start_date)
        end_dt = pd.Timestamp(end_date)
        if isinstance(end_date, str) and len(end_date) <= 10:
            end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)

        self.current_run_label = f"{start_dt.strftime('%Y%m%d')}_{end_dt.strftime('%Y%m%d')}"
        start_ts = int(start_dt.timestamp() * 1000)
        end_ts = int(end_dt.timestamp() * 1000)

        query = BitMartETH5M.select().where(
            (BitMartETH5M.id >= start_ts) & (BitMartETH5M.id <= end_ts)
        ).order_by(BitMartETH5M.id)

        data = []
        for row in query:
            data.append({
                'timestamp': row.id,
                'open': row.open,
                'high': row.high,
                'low': row.low,
                'close': row.close
            })

        if not data:
            logger.error("没有找到数据!")
            return None

        df = pd.DataFrame(data)
        df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')

        logger.info(f"加载数据: {len(df)} 根K线")
        logger.info(f"时间范围: {df['datetime'].min()} ~ {df['datetime'].max()}")

        # Compute the Bollinger Bands.
        df = self.calculate_bollinger_bands(df)
        if len(df) <= self.bb_period + 1:
            logger.error("数据不足,无法执行回测")
            return None

        # NOTE(review): this method never calls load_one_minute_cache, so
        # unless the caller loaded the 1m cache beforehand,
        # get_touch_entry_price falls back to band price plus slippage.

        # Walk the candles one by one. The last candle is never a signal
        # candle because exits need a following candle to execute on.
        for i in range(self.bb_period, len(df) - 1):
            row = df.iloc[i]
            prev_row = df.iloc[i-1] if i > 0 else None
            next_row = df.iloc[i + 1]
            signal_dt = row['datetime']
            signal_ts = signal_dt.strftime('%Y-%m-%d %H:%M')
            five_min_ts = int(row['timestamp'])
            execute_ts = next_row['datetime'].strftime('%Y-%m-%d %H:%M')
            next_open = float(next_row['open']) if pd.notna(next_row['open']) else None

            high = row['high']
            low = row['low']
            upper = row['upper']
            lower = row['lower']
            middle = row['middle']

            # First credit the rebates due by this candle's time.
            self.apply_pending_rebates(signal_dt)

            if pd.isna(upper) or pd.isna(lower) or pd.isna(middle):
                continue
            if next_open is None:
                continue

            # Stop loss: confirmed on this candle, executed at the next open.
            if self.check_stop_loss(high, low):
                logger.warning(f"[{signal_ts}] 触发止损信号,下一K开盘执行")
                self.close_position(next_open, 1.0, execute_ts, f"止损-收盘确认({signal_ts})")
                continue

            # A delayed reversal is armed: check its confirmation first.
            if self.delay_reverse_price is not None:
                reversal_signal = self.check_delay_reversal_signal(i, row, prev_row)
                if reversal_signal is not None and self.position != 0:
                    new_direction, reason, reversal_price = reversal_signal
                    if reversal_price is None or pd.isna(reversal_price):
                        reversal_price = float(row['close'])
                    close_side = "多" if self.position > 0 else "空"
                    open_side = "多" if new_direction == 'long' else "空"
                    self.close_position(
                        reversal_price,
                        1.0,
                        signal_ts,
                        f"{reason}-当K确认({signal_ts})-平{close_side}"
                    )
                    open_price = self.apply_entry_slippage(reversal_price, new_direction)
                    self.open_position(
                        open_price,
                        new_direction,
                        signal_ts,
                        f"{reason}-当K确认({signal_ts})-开{open_side}"
                    )
                    self.mid_closed_half = False
                    self.clear_delay_reversal()
                    continue

            # === Middle-band exit logic ===
            if self.position != 0:
                had_mid_closed_half = self.mid_closed_half
                if self.position > 0:  # long
                    # Back at the entry price: close all and reverse (only
                    # after half was already closed at the middle band).
                    if had_mid_closed_half and low <= self.entry_price:
                        self.close_position(next_open, 1.0, execute_ts, f"回开仓价全平-收盘确认({signal_ts})")
                        entry_price = self.apply_entry_slippage(next_open, 'short')
                        self.open_position(entry_price, 'short', execute_ts, f"回开仓价反手开空-收盘确认({signal_ts})")
                        self.mid_closed_half = False
                        continue
                    # Middle band touched: close half.
                    if not had_mid_closed_half and low <= middle <= high:
                        self.close_position(next_open, 0.5, execute_ts, f"触中轨平50%-收盘确认({signal_ts})")
                        self.mid_closed_half = True
                        continue
                else:  # short
                    # Back at the entry price: close all and reverse (only
                    # after half was already closed at the middle band).
                    if had_mid_closed_half and high >= self.entry_price:
                        self.close_position(next_open, 1.0, execute_ts, f"回开仓价全平-收盘确认({signal_ts})")
                        entry_price = self.apply_entry_slippage(next_open, 'long')
                        self.open_position(entry_price, 'long', execute_ts, f"回开仓价反手开多-收盘确认({signal_ts})")
                        self.mid_closed_half = False
                        continue
                    # Middle band touched: close half.
                    if not had_mid_closed_half and low <= middle <= high:
                        self.close_position(next_open, 0.5, execute_ts, f"触中轨平50%-收盘确认({signal_ts})")
                        self.mid_closed_half = True
                        continue

            # === Entry and add-on logic ===
            if self.position == 0:  # flat
                self.clear_delay_reversal()
                # Upper band touched: open short.
                if high >= upper:
                    entry_price = self.get_touch_entry_price(five_min_ts, 'short', upper)
                    self.open_position(entry_price, 'short', signal_ts, f"触上轨开空-当K触发({signal_ts})")
                # Lower band touched: open long.
                elif low <= lower:
                    entry_price = self.get_touch_entry_price(five_min_ts, 'long', lower)
                    self.open_position(entry_price, 'long', signal_ts, f"触下轨开多-当K触发({signal_ts})")
                continue

            # Holding a position: arm the delayed reversal first (core rule).
            if self.position > 0 and high >= upper:
                self.mark_delay_reversal('long_to_short', upper, i, signal_ts)
                continue
            elif self.position < 0 and low <= lower:
                self.mark_delay_reversal('short_to_long', lower, i, signal_ts)
                continue

            # While waiting for a delayed reversal, no add-ons are made.
            if self.delay_reverse_price is not None:
                continue

            # At most one same-direction add-on.
            if self.position_count == 1:
                if self.position > 0 and low <= lower:
                    entry_price = self.get_touch_entry_price(five_min_ts, 'long', lower)
                    self.open_position(entry_price, 'long', signal_ts, f"触下轨加多-当K触发({signal_ts})")
                elif self.position < 0 and high >= upper:
                    entry_price = self.get_touch_entry_price(five_min_ts, 'short', upper)
                    self.open_position(entry_price, 'short', signal_ts, f"触上轨加空-当K触发({signal_ts})")

        # Credit rebates once more at the end of the data.
        self.apply_pending_rebates(df.iloc[-1]['datetime'])

        # Liquidate whatever is still open. The last candle's close may be
        # NULL in the database, so use the most recent valid close.
        if self.position != 0:
            valid_closes = df['close'].dropna()
            if valid_closes.empty:
                logger.error("没有有效收盘价,无法执行回测结束平仓")
            else:
                final_price = valid_closes.iloc[-1]
                final_time = df.iloc[-1]['datetime'].strftime('%Y-%m-%d %H:%M')
                self.close_position(final_price, 1.0, final_time, "回测结束平仓")

        # Build the report.
        return self.generate_report(df)

    def generate_report(self, df):
        """Log the summary, write the trades CSV and return the report dict.

        Args:
            df: Candle DataFrame of the run (not read by this method).

        Returns:
            Dict with capital, PnL, fee/rebate totals, trade counts, the CSV
            path (``trades_file``) and the raw ``trades`` list.
        """
        logger.info(f"\n{'='*80}")
        logger.info("回测报告")
        logger.info(f"{'='*80}")

        # Basic statistics. Entries/add-ons carry '开'/'加' in their action,
        # closes carry '平'; every (partial) close counts as its own result.
        total_trades = sum(1 for t in self.trades if '开' in t['action'])
        win_trades = sum(
            1 for t in self.trades if '平' in t['action'] and t.get('pnl', 0) > 0
        )
        loss_trades = sum(
            1 for t in self.trades if '平' in t['action'] and t.get('pnl', 0) < 0
        )

        total_pnl = sum(t.get('pnl', 0) for t in self.trades)
        total_fee = sum(t.get('fee', 0) for t in self.trades)
        # Only rows that paid a fee, so credited-rebate rows are not counted twice.
        total_rebate_expected = sum(
            t.get('rebate', 0) for t in self.trades if t.get('fee', 0) > 0
        )
        pending_rebate = sum(x['amount'] for x in self.pending_rebates)
        realized_net_fee = total_fee - self.total_rebate_credited

        final_capital = self.capital
        roi = (final_capital - self.initial_capital) / self.initial_capital * 100

        logger.info(f"初始资金: {self.initial_capital:.2f}U")
        logger.info(f"最终资金: {final_capital:.2f}U")
        logger.info(f"总盈亏: {total_pnl:.2f}U")
        logger.info(f"总手续费(开平全额): {total_fee:.2f}U")
        logger.info(f"返佣应返总额: {total_rebate_expected:.2f}U")
        logger.info(f"返佣已到账: {self.total_rebate_credited:.2f}U")
        logger.info(f"返佣待到账: {pending_rebate:.2f}U")
        logger.info(f"已实现净手续费: {realized_net_fee:.2f}U")
        logger.info(f"净收益: {final_capital - self.initial_capital:.2f}U")
        logger.info(f"收益率: {roi:.2f}%")
        logger.info(f"总交易次数: {total_trades}")
        logger.info(f"盈利次数: {win_trades}")
        logger.info(f"亏损次数: {loss_trades}")
        if win_trades + loss_trades > 0:
            logger.info(f"胜率: {win_trades/(win_trades+loss_trades)*100:.2f}%")

        # Save the trade records.
        trades_df = pd.DataFrame(self.trades)
        output_dir = Path(__file__).parent / 'backtest_outputs' / 'trades'
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / f'bb_backtest_{self.current_run_label}_trades.csv'
        trades_df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"\n交易记录已保存到: {output_file}")

        return {
            'initial_capital': self.initial_capital,
            'final_capital': final_capital,
            'total_pnl': total_pnl,
            'total_fee': total_fee,
            'total_rebate_expected': total_rebate_expected,
            'total_rebate_credited': self.total_rebate_credited,
            'pending_rebate': pending_rebate,
            'realized_net_fee': realized_net_fee,
            'roi': roi,
            'total_trades': total_trades,
            'win_trades': win_trades,
            'loss_trades': loss_trades,
            'trades_file': str(output_file),
            'trades': self.trades
        }


if __name__ == '__main__':
    # Connect to the database.
    db.connect(reuse_if_open=True)

    try:
        # Create the backtest instance.
        backtest = BollingerBandBacktest()

        # Run the backtest over the whole of 2026.
        result = backtest.run_backtest('2026-01-01', '2026-12-31')

        if result:
            logger.success(f"\n回测完成!最终收益率: {result['roi']:.2f}%")

    finally:
        db.close()