Files
codex_jxs_code/bb_backtest_march_2026.py
2026-03-04 18:02:02 +08:00

721 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
布林带延迟反转策略回测 - 2026年3月
策略规则:
1. 5分钟K线BB(10, 2.5)
2. 空仓触上轨开空,触下轨开多
3. 同向加仓最多1次保证金递增(1%->2%)
4. 延迟反转:触轨不立刻平仓,记录价格,回调到该价再平仓+反向开仓
5. 中轨平半+回开仓价全平
6. 止损亏损达保证金50%
"""
import pandas as pd
from pathlib import Path
from peewee import *
from loguru import logger
# Database configuration: the SQLite file lives in a `models` directory
# next to this script.
DB_PATH = Path(__file__).parent.joinpath('models', 'database.db')
db = SqliteDatabase(str(DB_PATH))
class BitMartETH5M(Model):
    """ORM model for 5-minute candles stored in table `bitmart_eth_5m`.

    The primary key `id` is used elsewhere in this file as an
    epoch-millisecond timestamp. All price columns are nullable.
    """

    id = BigIntegerField(primary_key=True)
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        # Bind the model to the module-level SQLite handle.
        database = db
        table_name = "bitmart_eth_5m"
class BitMartETH1M(Model):
    """ORM model for 1-minute candles stored in table `bitmart_eth_1m`.

    The primary key `id` is used elsewhere in this file as an
    epoch-millisecond timestamp. All price columns are nullable.
    """

    id = BigIntegerField(primary_key=True)
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        # Bind the model to the module-level SQLite handle.
        database = db
        table_name = "bitmart_eth_1m"
class BollingerBandBacktest:
    """Backtest of a Bollinger-band "delayed reversal" strategy.

    The account is simulated bar by bar on 5-minute candles; 1-minute
    candles are used to confirm middle-band exits and to pick conservative
    entry prices. All state (capital, position, pending rebates, trade log)
    lives on the instance and is reset at the start of `run_backtest`.
    """

    def __init__(self):
        # --- Strategy parameters ---
        self.bb_period = 10
        self.bb_std = 2.5
        self.initial_capital = 100  # starting capital: 100 U
        self.leverage = 50  # 50x leverage
        self.fee_rate = 0.0005  # fee of 5 per 10,000 of notional
        self.rebate_rate = 0.9  # 90% of each fee is rebated
        self.margin_ratio_1 = 0.01  # margin fraction for the first entry (1%)
        self.margin_ratio_2 = 0.01  # margin fraction for the add-on entry (1%)
        self.stop_loss_ratio = 0.5  # stop out when loss reaches 50% of margin
        self.entry_slippage = 0.0002  # 2 bps adverse slippage on entries
        self.rebate_credit_hour = 8  # rebates land at 08:00 Shanghai time next day

        # --- Account state ---
        self.capital = self.initial_capital
        self.position = 0  # signed size (positive = long, negative = short)
        self.position_count = 0  # 0 = flat, 1 = first entry, 2 = after add-on
        self.entry_price = 0  # average entry price
        self.total_margin = 0  # margin currently locked

        # --- Delayed-reversal state ---
        self.delay_reverse_price = None  # band price recorded when the band was touched
        self.delay_reverse_type = None  # 'long_to_short' or 'short_to_long'
        self.delay_reverse_kline_index = None  # bar index that armed the reversal

        # Whether half of the position was already closed at the middle band.
        self.mid_closed_half = False

        # 1-minute candle cache: key = 5-minute bucket start (ms),
        # value = list of 1-minute candle dicts inside that bucket.
        self.one_minute_by_5m = {}

        # --- Bookkeeping ---
        self.trades = []
        self.daily_pnl = []
        self.pending_rebates = []  # rebates waiting for their credit time
        self.total_rebate_credited = 0.0
        self.current_run_label = "default"

    def calculate_bollinger_bands(self, df):
        """Add `sma`, `std`, `upper`, `lower` and `middle` columns to *df*.

        The frame is modified in place and also returned. The standard
        deviation comes from pandas' rolling `std()` with its default
        settings (sample standard deviation).
        """
        window = df['close'].rolling(window=self.bb_period)
        df['sma'] = window.mean()
        df['std'] = window.std()
        band_width = self.bb_std * df['std']
        df['upper'] = df['sma'] + band_width
        df['lower'] = df['sma'] - band_width
        df['middle'] = df['sma']
        return df

    def get_rebate_amount(self, fee):
        """Return the rebate owed for a given fee."""
        return fee * self.rebate_rate

    def schedule_rebate(self, fee, timestamp):
        """Queue the rebate for *fee*, credited at 08:00 Shanghai time the next day.

        *timestamp* is interpreted as UTC. Nothing is queued when the rebate
        is not positive.
        """
        rebate = self.get_rebate_amount(fee)
        if rebate <= 0:
            return
        trade_utc = pd.Timestamp(timestamp, tz='UTC')
        trade_local = trade_utc.tz_convert('Asia/Shanghai')
        # Midnight of the following local day, then shift to the credit hour.
        next_local_midnight = (trade_local + pd.Timedelta(days=1)).normalize()
        credit_local = next_local_midnight + pd.Timedelta(hours=self.rebate_credit_hour)
        self.pending_rebates.append({
            # Stored as naive UTC so it compares directly with the bar datetimes.
            'credit_time': credit_local.tz_convert('UTC').tz_localize(None),
            'amount': rebate,
            'trade_time': trade_utc.tz_localize(None),
        })

    def apply_pending_rebates(self, current_time):
        """Credit every queued rebate whose credit time is at or before *current_time*."""
        if not self.pending_rebates:
            return
        now_label = current_time.strftime('%Y-%m-%d %H:%M')
        still_pending = []
        for item in self.pending_rebates:
            if item['credit_time'] > current_time:
                still_pending.append(item)
                continue
            amount = item['amount']
            self.capital += amount
            self.total_rebate_credited += amount
            self.trades.append({
                'timestamp': now_label,
                'action': '返佣到账',
                'price': None,
                'size': None,
                'rebate': amount,
                'capital': self.capital,
                'reason': f"次日08:00返佣到账({item['trade_time'].strftime('%Y-%m-%d %H:%M')}手续费)"
            })
            logger.info(
                f"[{now_label}] 返佣到账: {amount:.4f}U | 可用资金: {self.capital:.4f}U"
            )
        self.pending_rebates = still_pending

    def apply_entry_slippage(self, price, direction):
        """Return *price* worsened by the entry slippage.

        Longs pay more; any other direction is treated as a short and
        receives less.
        """
        if direction == 'long':
            return price * (1 + self.entry_slippage)
        return price * (1 - self.entry_slippage)

    @staticmethod
    def _float_or_none(value):
        """Convert a nullable DB value to float, keeping None as None."""
        return None if value is None else float(value)

    def load_one_minute_cache(self, start_ts, end_ts):
        """Load 1-minute candles in [start_ts, end_ts] and bucket them by 5-minute start."""
        query = BitMartETH1M.select().where(
            (BitMartETH1M.id >= start_ts) & (BitMartETH1M.id <= end_ts)
        ).order_by(BitMartETH1M.id)
        bucket = {}
        for row in query:
            # Floor the millisecond timestamp to its 5-minute (300 000 ms) bucket.
            five_min_start = int(row.id - (row.id % 300000))
            bucket.setdefault(five_min_start, []).append({
                'timestamp': int(row.id),
                'open': self._float_or_none(row.open),
                'high': self._float_or_none(row.high),
                'low': self._float_or_none(row.low),
                'close': self._float_or_none(row.close),
            })
        self.one_minute_by_5m = bucket
        total_rows = sum(len(v) for v in bucket.values())
        logger.info(f"加载1分钟数据: {total_rows} 根 | 5分钟桶: {len(bucket)}")

    def should_close_half_by_1m_trend(self, five_min_ts, middle_price, side):
        """Confirm a middle-band half-close using the order of 1-minute candles.

        - Long: price must have been at/above the middle band and then dip
          back to it.
        - Otherwise (short): price must have been at/below the middle band
          and then bounce back up to it.

        Returns:
            (True, reason) on confirmation, else (False, None). Also
            (False, None) when no 1-minute data exists for the bucket.
        """
        minute_rows = self.one_minute_by_5m.get(five_min_ts, [])
        if not minute_rows:
            return False, None

        is_long = side == 'long'
        label = "回踩中轨" if is_long else "反抽中轨"
        seen_far_side = False  # price already reached the far side of the band
        for minute in minute_rows:
            m_open = minute['open']
            m_high = minute['high']
            m_low = minute['low']
            m_close = minute['close']
            if None in (m_open, m_high, m_low, m_close):
                continue
            if is_long:
                opens_far_side = m_open >= middle_price
                touches_middle = m_low <= middle_price
                reaches_far_side = m_high >= middle_price
            else:
                opens_far_side = m_open <= middle_price
                touches_middle = m_high >= middle_price
                reaches_far_side = m_low <= middle_price
            # Confirmed when this candle opened on the far side, or an earlier
            # candle already reached it, and this candle touches the band.
            if touches_middle and (opens_far_side or seen_far_side):
                minute_time = pd.to_datetime(minute['timestamp'], unit='ms').strftime('%H:%M')
                return True, f"1m({minute_time}){label}"
            if reaches_far_side:
                seen_far_side = True
        return False, None

    def get_touch_entry_price(self, five_min_ts, direction, touch_price):
        """Return a conservative fill price for a band-touch entry.

        The first 1-minute candle in the bucket that touches *touch_price*
        provides its close as the base price. The base is never allowed to be
        better than the touch price, and adverse slippage is applied on top.
        Without 1-minute data (or without a touching candle) the touch price
        itself is the base.
        """
        minute_rows = self.one_minute_by_5m.get(five_min_ts, [])
        if not minute_rows:
            return self.apply_entry_slippage(touch_price, direction)

        trigger_close = None
        for minute in minute_rows:
            m_high = minute['high']
            m_low = minute['low']
            m_close = minute['close']
            if None in (m_high, m_low, m_close):
                continue
            touched = (
                (direction == 'short' and m_high >= touch_price)
                or (direction == 'long' and m_low <= touch_price)
            )
            if touched:
                trigger_close = m_close
                break

        if trigger_close is None:
            base_price = touch_price
        elif direction == 'long':
            base_price = max(touch_price, trigger_close)
        else:
            base_price = min(touch_price, trigger_close)
        return self.apply_entry_slippage(base_price, direction)

    def clear_delay_reversal(self):
        """Forget any armed delayed reversal."""
        self.delay_reverse_price = None
        self.delay_reverse_type = None
        self.delay_reverse_kline_index = None

    def mark_delay_reversal(self, reverse_type, trigger_price, kline_index, timestamp):
        """Arm a delayed reversal and log which band was touched."""
        self.delay_reverse_type = reverse_type
        self.delay_reverse_price = trigger_price
        self.delay_reverse_kline_index = kline_index
        if reverse_type == 'long_to_short':
            logger.info(f"[{timestamp}] 多仓触上轨 @ {trigger_price:.2f},进入延迟反转")
        else:
            logger.info(f"[{timestamp}] 空仓触下轨 @ {trigger_price:.2f},进入延迟反转")

    def reverse_position(self, price, new_direction, timestamp, reason):
        """Close the whole position at *price* and open the opposite side.

        Returns False (after clearing the reversal state) when flat, True
        otherwise. The return value does not reflect whether the re-entry was
        actually accepted by `open_position`.
        """
        if self.position == 0:
            self.clear_delay_reversal()
            return False
        # Side labels appended to the reason. In the original both branches
        # were empty strings, so the suffix never identified the side.
        close_side = "多" if self.position > 0 else "空"
        open_side = "多" if new_direction == "long" else "空"
        self.close_position(price, 1.0, timestamp, f"{reason}-平{close_side}")
        open_price = self.apply_entry_slippage(price, new_direction)
        self.open_position(open_price, new_direction, timestamp, f"{reason}-开{open_side}")
        self.mid_closed_half = False
        self.clear_delay_reversal()
        return True

    def check_and_execute_delay_reversal(self, i, row, prev_row, timestamp):
        """Run the three-stage delayed-reversal check for bar *i*.

        Stages, measured from the bar that armed the reversal:
        - same bar / next bar: reverse once price comes back to the recorded
          band price;
        - later bars: track the previous bar. If it touched the band, reverse
          on a pullback to that bar's band; otherwise reverse when price
          breaks the previous bar's body.

        Returns the result of `reverse_position` when a reversal fires, else
        False.
        """
        if self.position == 0 or self.delay_reverse_price is None or self.delay_reverse_kline_index is None:
            return False
        offset = i - self.delay_reverse_kline_index
        high = row['high']
        low = row['low']

        if self.delay_reverse_type == 'long_to_short':
            # Stage 1/2: pullback to the recorded upper-band price.
            if offset <= 1 and low <= self.delay_reverse_price:
                tag = "同K回调确认" if offset == 0 else "次K回调确认"
                return self.reverse_position(self.delay_reverse_price, 'short', timestamp, f"延迟反转-{tag}")
            # Stage 3: keep waiting, keyed off the previous bar.
            if offset >= 2 and prev_row is not None:
                prev_touch_upper = prev_row['high'] >= prev_row['upper']
                if prev_touch_upper:
                    ref_price = prev_row['upper']
                    if low <= ref_price:
                        return self.reverse_position(ref_price, 'short', timestamp, "延迟反转-上一根触上轨后回调确认")
                else:
                    prev_body_low = min(prev_row['open'], prev_row['close'])
                    if low <= prev_body_low:
                        return self.reverse_position(prev_body_low, 'short', timestamp, "延迟反转-跌破上一根实体确认")
        elif self.delay_reverse_type == 'short_to_long':
            # Stage 1/2: bounce to the recorded lower-band price.
            if offset <= 1 and high >= self.delay_reverse_price:
                tag = "同K反弹确认" if offset == 0 else "次K反弹确认"
                return self.reverse_position(self.delay_reverse_price, 'long', timestamp, f"延迟反转-{tag}")
            # Stage 3: keep waiting, keyed off the previous bar.
            if offset >= 2 and prev_row is not None:
                prev_touch_lower = prev_row['low'] <= prev_row['lower']
                if prev_touch_lower:
                    ref_price = prev_row['lower']
                    if high >= ref_price:
                        return self.reverse_position(ref_price, 'long', timestamp, "延迟反转-上一根触下轨后反弹确认")
                else:
                    prev_body_high = max(prev_row['open'], prev_row['close'])
                    if high >= prev_body_high:
                        return self.reverse_position(prev_body_high, 'long', timestamp, "延迟反转-突破上一根实体确认")
        return False

    def open_position(self, price, direction, timestamp, reason):
        """Open a position, or add once to an existing same-direction one.

        Margin is a fraction of the currently available capital; margin plus
        the full fee is deducted up front and the rebate is queued.

        Returns:
            True when the order was booked; False when already added once,
            when the add-on direction mismatches, or when funds are
            insufficient.
        """
        if self.position_count not in (0, 1):
            return False
        is_add_on = self.position_count == 1
        if is_add_on:
            current_direction = 'long' if self.position > 0 else 'short'
            if direction != current_direction:
                logger.warning("加仓方向不一致,跳过")
                return False

        margin_ratio = self.margin_ratio_2 if is_add_on else self.margin_ratio_1
        margin = self.capital * margin_ratio
        if margin <= 0:
            logger.warning(f"[{timestamp}] 资金不足,无法开仓 | 可用资金: {self.capital:.4f}U")
            return False
        position_size = margin * self.leverage / price
        fee = position_size * price * self.fee_rate
        required = margin + fee
        if self.capital < required:
            logger.warning(
                f"[{timestamp}] 可用资金不足,无法开仓 | 需要: {required:.4f}U | 可用: {self.capital:.4f}U"
            )
            return False

        # Lock the margin and pay the fee.
        self.capital -= required
        self.schedule_rebate(fee, timestamp)

        sign = 1 if direction == 'long' else -1
        if not is_add_on:
            self.position = sign * position_size
            self.entry_price = price
            self.total_margin = margin
            self.position_count = 1
            action = f'{direction}'
        else:
            old_size = abs(self.position)
            new_size = old_size + position_size
            # Size-weighted average of the old and new entry prices.
            self.entry_price = (old_size * self.entry_price + position_size * price) / new_size
            self.position = sign * new_size
            self.total_margin += margin
            self.position_count = 2
            # NOTE(review): same action label as a first entry; add-ons are
            # only distinguishable through `reason`.
            action = f'{direction}'
        self.mid_closed_half = False

        rebate = self.get_rebate_amount(fee)
        self.trades.append({
            'timestamp': timestamp,
            'action': action,
            'price': price,
            'size': position_size,
            'margin': margin,
            'fee': fee,
            'rebate': rebate,
            'capital': self.capital,
            'reason': reason
        })
        logger.info(
            f"[{timestamp}] {action} @ {price:.2f} | 仓位: {position_size:.4f} | "
            f"保证金: {margin:.4f}U | 手续费: {fee:.4f}U | 返佣待到账: {rebate:.4f}U | "
            f"可用资金: {self.capital:.4f}U | {reason}"
        )
        return True

    def _reset_position_state(self):
        """Put the account into the flat state and drop any armed reversal."""
        self.position = 0
        self.position_count = 0
        self.total_margin = 0
        self.entry_price = 0
        self.mid_closed_half = False
        self.clear_delay_reversal()

    def close_position(self, price, ratio, timestamp, reason):
        """Close *ratio* (clamped to [0, 1]) of the position at *price*.

        Releases the matching share of margin, books PnL minus fee into
        capital and queues the rebate. A ratio of at least 0.999 is treated
        as a full close.

        Returns:
            False when flat or when the clamped ratio is 0, True otherwise.
        """
        if self.position == 0:
            return False
        ratio = min(max(ratio, 0.0), 1.0)
        if ratio == 0:
            return False

        close_size = abs(self.position) * ratio
        if self.position > 0:
            pnl = close_size * (price - self.entry_price)
        else:
            pnl = close_size * (self.entry_price - price)
        fee = close_size * price * self.fee_rate
        released_margin = self.total_margin * ratio
        self.capital += released_margin + pnl - fee
        self.schedule_rebate(fee, timestamp)

        if ratio >= 0.999:
            self._reset_position_state()
        else:
            remaining = 1 - ratio
            self.position *= remaining
            self.total_margin *= remaining
            # Treat floating-point dust as flat.
            if abs(self.position) < 1e-12:
                self._reset_position_state()

        # round() instead of int(): int() truncates, e.g. 0.29 * 100 -> 28.
        percent = round(ratio * 100)
        rebate = self.get_rebate_amount(fee)
        self.trades.append({
            'timestamp': timestamp,
            'action': f'平仓{percent}%',
            'price': price,
            'size': close_size,
            'pnl': pnl,
            'fee': fee,
            'rebate': rebate,
            'capital': self.capital,
            'reason': reason
        })
        logger.info(f"[{timestamp}] 平仓{percent}% @ {price:.2f} | "
                    f"盈亏: {pnl:.4f}U | 手续费: {fee:.4f}U | 返佣待到账: {rebate:.4f}U | "
                    f"可用资金: {self.capital:.4f}U | {reason}")
        return True

    def check_stop_loss(self, high, low, timestamp):
        """Close everything when the bar's worst price breaches the stop.

        The worst price is the bar low for longs and the bar high for shorts;
        the position is closed at that same price.

        Returns:
            True when the stop fired, else False.
        """
        if self.position == 0:
            return False
        if self.position > 0:
            stop_price = low
            unrealized_pnl = abs(self.position) * (stop_price - self.entry_price)
        else:
            stop_price = high
            unrealized_pnl = abs(self.position) * (self.entry_price - stop_price)
        # Stop condition: loss reaches the configured share of locked margin.
        if unrealized_pnl <= -self.total_margin * self.stop_loss_ratio:
            logger.warning(f"触发止损!浮亏: {unrealized_pnl:.2f}U | 保证金: {self.total_margin:.2f}U")
            self.close_position(stop_price, 1.0, timestamp, "止损")
            return True
        return False

    def _load_five_minute_frame(self, start_ts, end_ts):
        """Load 5-minute candles in [start_ts, end_ts] into a DataFrame (None if empty)."""
        query = BitMartETH5M.select().where(
            (BitMartETH5M.id >= start_ts) & (BitMartETH5M.id <= end_ts)
        ).order_by(BitMartETH5M.id)
        records = [
            {
                'timestamp': row.id,
                'open': row.open,
                'high': row.high,
                'low': row.low,
                'close': row.close
            }
            for row in query
        ]
        if not records:
            return None
        frame = pd.DataFrame(records)
        frame['datetime'] = pd.to_datetime(frame['timestamp'], unit='ms')
        return frame

    def _process_middle_band_exit(self, five_min_ts, high, low, middle, timestamp):
        """Apply the middle-band exit rules to the open position for one bar.

        First closes 50% at the middle band when the bar spans it and the
        1-minute sequence confirms. Then, once half is closed, closes the
        remainder at the entry price if the bar reaches it.
        """
        if self.position == 0:
            return
        is_long = self.position > 0
        if not self.mid_closed_half and low <= middle <= high:
            should_close_half, reason_1m = self.should_close_half_by_1m_trend(
                five_min_ts, middle, 'long' if is_long else 'short'
            )
            if should_close_half:
                self.close_position(middle, 0.5, timestamp, f"触中轨平50%-{reason_1m}")
                self.mid_closed_half = True
        if self.position == 0 or not self.mid_closed_half:
            return
        back_at_entry = low <= self.entry_price if is_long else high >= self.entry_price
        if back_at_entry:
            self.close_position(self.entry_price, 1.0, timestamp, "回开仓价全平")

    def run_backtest(self, start_date, end_date):
        """Run the backtest over [start_date, end_date] and return the report dict.

        A string *end_date* of at most 10 characters is extended to the last
        millisecond of that day. Returns None when no 5-minute data is found.
        """
        # Reset state so the same instance can be reused for several runs.
        self.capital = self.initial_capital
        self.position = 0
        self.position_count = 0
        self.entry_price = 0
        self.total_margin = 0
        self.mid_closed_half = False
        self.trades = []
        self.daily_pnl = []
        self.pending_rebates = []
        self.total_rebate_credited = 0.0
        self.clear_delay_reversal()

        logger.info(f"{'='*80}")
        logger.info(f"开始回测: {start_date} ~ {end_date}")
        logger.info(f"初始资金: {self.initial_capital}U | 杠杆: {self.leverage}x | BB({self.bb_period}, {self.bb_std})")
        logger.info(f"{'='*80}")

        # Resolve the query window in epoch milliseconds.
        start_dt = pd.Timestamp(start_date)
        end_dt = pd.Timestamp(end_date)
        if isinstance(end_date, str) and len(end_date) <= 10:
            end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)
        self.current_run_label = f"{start_dt.strftime('%Y%m%d')}_{end_dt.strftime('%Y%m%d')}"
        start_ts = int(start_dt.timestamp() * 1000)
        end_ts = int(end_dt.timestamp() * 1000)

        df = self._load_five_minute_frame(start_ts, end_ts)
        if df is None:
            logger.error("没有找到数据!")
            return None
        logger.info(f"加载数据: {len(df)} 根K线")
        logger.info(f"时间范围: {df['datetime'].min()} ~ {df['datetime'].max()}")

        # 1-minute data for the same span (plus one bucket) feeds the
        # middle-band confirmation and the touch-entry pricing.
        one_min_start = int(df['timestamp'].min())
        one_min_end = int(df['timestamp'].max()) + 300000
        self.load_one_minute_cache(one_min_start, one_min_end)

        df = self.calculate_bollinger_bands(df)

        # Walk the bars one by one, starting at index bb_period.
        for i in range(self.bb_period, len(df)):
            row = df.iloc[i]
            prev_row = df.iloc[i - 1]
            timestamp = row['datetime'].strftime('%Y-%m-%d %H:%M')
            five_min_ts = int(row['timestamp'])
            high = row['high']
            low = row['low']
            upper = row['upper']
            lower = row['lower']
            middle = row['middle']

            # Credit rebates that are due by this bar.
            self.apply_pending_rebates(row['datetime'])

            # Stop loss takes priority over everything else.
            if self.check_stop_loss(high, low, timestamp):
                continue

            # An already-armed delayed reversal is checked next.
            if self.delay_reverse_price is not None:
                if self.check_and_execute_delay_reversal(i, row, prev_row, timestamp):
                    continue

            # Middle-band half close / back-to-entry full close.
            self._process_middle_band_exit(five_min_ts, high, low, middle, timestamp)

            # Flat: look for a fresh band-touch entry, then move on.
            if self.position == 0:
                self.clear_delay_reversal()
                if high >= upper:
                    entry_price = self.get_touch_entry_price(five_min_ts, 'short', upper)
                    self.open_position(entry_price, 'short', timestamp, "触上轨开空")
                elif low <= lower:
                    entry_price = self.get_touch_entry_price(five_min_ts, 'long', lower)
                    self.open_position(entry_price, 'long', timestamp, "触下轨开多")
                continue

            # In a position: touching the opposite band arms (or re-arms) the
            # delayed reversal, which may fire on this same bar.
            if self.position > 0 and high >= upper:
                self.mark_delay_reversal('long_to_short', upper, i, timestamp)
                if self.check_and_execute_delay_reversal(i, row, prev_row, timestamp):
                    continue
            elif self.position < 0 and low <= lower:
                self.mark_delay_reversal('short_to_long', lower, i, timestamp)
                if self.check_and_execute_delay_reversal(i, row, prev_row, timestamp):
                    continue

            # While waiting for a reversal, no add-ons are placed.
            if self.delay_reverse_price is not None:
                continue

            # Same-direction add-on, at most once.
            if self.position_count == 1:
                if self.position > 0 and low <= lower:
                    entry_price = self.get_touch_entry_price(five_min_ts, 'long', lower)
                    self.open_position(entry_price, 'long', timestamp, "触下轨加多")
                elif self.position < 0 and high >= upper:
                    entry_price = self.get_touch_entry_price(five_min_ts, 'short', upper)
                    self.open_position(entry_price, 'short', timestamp, "触上轨加空")

        # Flatten whatever is left at the final close.
        if self.position != 0:
            last_bar = df.iloc[-1]
            final_price = last_bar['close']
            final_time = last_bar['datetime'].strftime('%Y-%m-%d %H:%M')
            self.close_position(final_price, 1.0, final_time, "回测结束平仓")

        return self.generate_report(df)

    @staticmethod
    def _is_close_record(record):
        """Return True when a trade-log record was written by `close_position`."""
        return str(record.get('action', '')).startswith('平仓')

    def generate_report(self, df):
        """Log summary statistics, write the trade log to CSV and return the stats.

        *df* is accepted for interface compatibility and is not used.
        Trade counts cover closing records only. The original filter
        (`'' in action`) matched every record, so opens and rebate credits
        were counted as trades.
        """
        logger.info(f"\n{'='*80}")
        logger.info("回测报告")
        logger.info(f"{'='*80}")

        # Basic statistics.
        close_records = [t for t in self.trades if self._is_close_record(t)]
        total_trades = len(close_records)
        win_trades = sum(1 for t in close_records if t.get('pnl', 0) > 0)
        loss_trades = sum(1 for t in close_records if t.get('pnl', 0) < 0)
        total_pnl = sum(t.get('pnl', 0) for t in self.trades)
        total_fee = sum(t.get('fee', 0) for t in self.trades)
        total_rebate_expected = sum(t.get('rebate', 0) for t in self.trades if t.get('fee', 0) > 0)
        pending_rebate = sum(x['amount'] for x in self.pending_rebates)
        realized_net_fee = total_fee - self.total_rebate_credited
        final_capital = self.capital
        roi = (final_capital - self.initial_capital) / self.initial_capital * 100

        logger.info(f"初始资金: {self.initial_capital:.2f}U")
        logger.info(f"最终资金: {final_capital:.2f}U")
        logger.info(f"总盈亏: {total_pnl:.2f}U")
        logger.info(f"总手续费(开平全额): {total_fee:.2f}U")
        logger.info(f"返佣应返总额: {total_rebate_expected:.2f}U")
        logger.info(f"返佣已到账: {self.total_rebate_credited:.2f}U")
        logger.info(f"返佣待到账: {pending_rebate:.2f}U")
        logger.info(f"已实现净手续费: {realized_net_fee:.2f}U")
        logger.info(f"净收益: {final_capital - self.initial_capital:.2f}U")
        logger.info(f"收益率: {roi:.2f}%")
        logger.info(f"总交易次数: {total_trades}")
        logger.info(f"盈利次数: {win_trades}")
        logger.info(f"亏损次数: {loss_trades}")
        decided = win_trades + loss_trades
        if decided > 0:
            logger.info(f"胜率: {win_trades/decided*100:.2f}%")

        # Persist the trade log.
        trades_df = pd.DataFrame(self.trades)
        output_file = f'bb_backtest_{self.current_run_label}_trades.csv'
        trades_df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"\n交易记录已保存到: {output_file}")

        return {
            'initial_capital': self.initial_capital,
            'final_capital': final_capital,
            'total_pnl': total_pnl,
            'total_fee': total_fee,
            'total_rebate_expected': total_rebate_expected,
            'total_rebate_credited': self.total_rebate_credited,
            'pending_rebate': pending_rebate,
            'realized_net_fee': realized_net_fee,
            'roi': roi,
            'total_trades': total_trades,
            'win_trades': win_trades,
            'loss_trades': loss_trades,
            'trades_file': output_file,
            'trades': self.trades
        }
if __name__ == '__main__':
    # Open the database connection (reusing an already-open one).
    db.connect(reuse_if_open=True)
    try:
        # Backtest the period 2026-03-01 through 2026-03-31.
        result = BollingerBandBacktest().run_backtest('2026-03-01', '2026-03-31')
        if result:
            logger.success(f"\n回测完成!最终收益率: {result['roi']:.2f}%")
    finally:
        # Close the connection even if the backtest raised.
        db.close()