Files
codex_jxs_code/bb_backtest_march_2026.py
2026-03-05 12:51:12 +08:00

755 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
布林带延迟反转策略回测 - 2026年3月
策略规则:
1. 5分钟K线BB(10, 2.5)
2. 空仓触上轨开空,触下轨开多
3. 同向加仓最多1次保证金递增(1%->2%)
4. 延迟反转:触轨不立刻平仓,记录价格,回调到该价再平仓+反向开仓
5. 中轨平半+回开仓价全平
6. 止损亏损达保证金50%
"""
import pandas as pd
from pathlib import Path
from peewee import *
from loguru import logger
# 数据库配置
DB_PATH = Path(__file__).parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
class BitMartETH5M(Model):
"""5分钟K线模型"""
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_5m'
class BitMartETH1M(Model):
"""1分钟K线模型"""
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1m'
class BollingerBandBacktest:
"""布林带延迟反转策略回测"""
def __init__(self):
# 策略参数
self.bb_period = 10
self.bb_std = 2.5
self.initial_capital = 100 # 初始本金100U
self.leverage = 50 # 50倍杠杆
self.fee_rate = 0.0005 # 万五手续费
self.rebate_rate = 0.9 # 90%返佣
self.margin_ratio_1 = 0.01 # 首次开仓保证金比例1%
self.margin_ratio_2 = 0.01 # 加仓保证金比例1%
self.stop_loss_ratio = 0.5 # 止损比例50%
self.entry_slippage = 0.0002 # 开仓滑点2bps
self.rebate_credit_hour = 8 # 次日早上8点返佣到账上海时间
# 账户状态
self.capital = self.initial_capital
self.position = 0 # 持仓量(正=多,负=空)
self.position_count = 0 # 持仓次数0=空仓1=首次2=加仓)
self.entry_price = 0 # 开仓均价
self.total_margin = 0 # 总保证金
# 延迟反转状态
self.delay_reverse_price = None # 记录触碰轨道时的轨道价格
self.delay_reverse_type = None # 'long_to_short' 或 'short_to_long'
self.delay_reverse_kline_index = None # 触发延迟反转的K线索引
# 中轨平仓记录
self.mid_closed_half = False # 是否已平50%
# 1分钟K线缓存key=5分钟起始时间戳, value=该5分钟内的1m列表
self.one_minute_by_5m = {}
# 交易记录
self.trades = []
self.daily_pnl = []
self.pending_rebates = [] # 待到账返佣队列
self.total_rebate_credited = 0.0
self.current_run_label = "default"
def calculate_bollinger_bands(self, df):
"""计算布林带整体右移1根避免使用当前K收盘价"""
df['sma'] = df['close'].rolling(window=self.bb_period).mean()
df['std'] = df['close'].rolling(window=self.bb_period).std()
df['upper'] = (df['sma'] + self.bb_std * df['std']).shift(1)
df['lower'] = (df['sma'] - self.bb_std * df['std']).shift(1)
df['middle'] = df['sma'].shift(1)
return df
def get_rebate_amount(self, fee):
"""计算返佣金额"""
return fee * self.rebate_rate
def schedule_rebate(self, fee, timestamp):
"""登记返佣到账时间次日08:00上海时间"""
rebate = self.get_rebate_amount(fee)
if rebate <= 0:
return
trade_utc = pd.Timestamp(timestamp, tz='UTC')
trade_local = trade_utc.tz_convert('Asia/Shanghai')
credit_local = (trade_local + pd.Timedelta(days=1)).normalize() + pd.Timedelta(hours=self.rebate_credit_hour)
credit_utc = credit_local.tz_convert('UTC').tz_localize(None)
self.pending_rebates.append({
'credit_time': credit_utc,
'amount': rebate,
'trade_time': trade_utc.tz_localize(None),
})
def apply_pending_rebates(self, current_time):
"""处理当前时刻前应到账的返佣"""
if not self.pending_rebates:
return
remaining = []
for item in self.pending_rebates:
if item['credit_time'] <= current_time:
amount = item['amount']
self.capital += amount
self.total_rebate_credited += amount
self.trades.append({
'timestamp': current_time.strftime('%Y-%m-%d %H:%M'),
'action': '返佣到账',
'price': None,
'size': None,
'rebate': amount,
'capital': self.capital,
'reason': f"次日08:00返佣到账({item['trade_time'].strftime('%Y-%m-%d %H:%M')}手续费)"
})
logger.info(
f"[{current_time.strftime('%Y-%m-%d %H:%M')}] 返佣到账: {amount:.4f}U | 可用资金: {self.capital:.4f}U"
)
else:
remaining.append(item)
self.pending_rebates = remaining
def apply_entry_slippage(self, price, direction):
"""按方向施加不利滑点"""
if direction == 'long':
return price * (1 + self.entry_slippage)
return price * (1 - self.entry_slippage)
def load_one_minute_cache(self, start_ts, end_ts):
"""加载并缓存1分钟K线用于5分钟内走势确认"""
query = BitMartETH1M.select().where(
(BitMartETH1M.id >= start_ts) & (BitMartETH1M.id <= end_ts)
).order_by(BitMartETH1M.id)
bucket = {}
for row in query:
five_min_start = int(row.id - (row.id % 300000))
bucket.setdefault(five_min_start, []).append({
'timestamp': int(row.id),
'open': float(row.open) if row.open is not None else None,
'high': float(row.high) if row.high is not None else None,
'low': float(row.low) if row.low is not None else None,
'close': float(row.close) if row.close is not None else None,
})
self.one_minute_by_5m = bucket
total_rows = sum(len(v) for v in bucket.values())
logger.info(f"加载1分钟数据: {total_rows} 根 | 5分钟桶: {len(bucket)}")
def should_close_half_by_1m_trend(self, five_min_ts, middle_price, side):
"""
1分钟顺序确认中轨平半
- 多仓:先到中轨上方,再回踩中轨
- 空仓:先到中轨下方,再反抽中轨
"""
minute_rows = self.one_minute_by_5m.get(five_min_ts, [])
if not minute_rows:
return False, None
seen_above_middle = False
seen_below_middle = False
for minute in minute_rows:
m_open = minute['open']
m_high = minute['high']
m_low = minute['low']
m_close = minute['close']
if None in (m_open, m_high, m_low, m_close):
continue
minute_time = pd.to_datetime(minute['timestamp'], unit='ms').strftime('%H:%M')
if side == 'long':
# 当前1m开在中轨上方且回踩到中轨视为有效回踩
if m_open >= middle_price and m_low <= middle_price:
return True, f"1m({minute_time})回踩中轨"
# 已经到过中轨上方后,再次触及中轨
if seen_above_middle and m_low <= middle_price:
return True, f"1m({minute_time})回踩中轨"
if m_high >= middle_price:
seen_above_middle = True
else: # short
# 当前1m开在中轨下方且反抽到中轨视为有效反抽
if m_open <= middle_price and m_high >= middle_price:
return True, f"1m({minute_time})反抽中轨"
# 已经到过中轨下方后,再次触及中轨
if seen_below_middle and m_high >= middle_price:
return True, f"1m({minute_time})反抽中轨"
if m_low <= middle_price:
seen_below_middle = True
return False, None
def get_touch_entry_price(self, five_min_ts, direction, touch_price):
"""
触轨开仓的保守成交价:
- 在该5分钟内找到首个触轨1m
- 用该1m收盘价作为基准
- 为避免“过于理想”,不允许优于触轨价,再叠加不利滑点
"""
minute_rows = self.one_minute_by_5m.get(five_min_ts, [])
if not minute_rows:
return self.apply_entry_slippage(touch_price, direction)
trigger_close = None
for minute in minute_rows:
m_high = minute['high']
m_low = minute['low']
m_close = minute['close']
if None in (m_high, m_low, m_close):
continue
if direction == 'short' and m_high >= touch_price:
trigger_close = m_close
break
if direction == 'long' and m_low <= touch_price:
trigger_close = m_close
break
if trigger_close is None:
base_price = touch_price
elif direction == 'long':
base_price = max(touch_price, trigger_close)
else:
base_price = min(touch_price, trigger_close)
return self.apply_entry_slippage(base_price, direction)
def clear_delay_reversal(self):
"""清理延迟反转状态"""
self.delay_reverse_price = None
self.delay_reverse_type = None
self.delay_reverse_kline_index = None
def mark_delay_reversal(self, reverse_type, trigger_price, kline_index, timestamp):
"""记录延迟反转触发信息"""
self.delay_reverse_type = reverse_type
self.delay_reverse_price = trigger_price
self.delay_reverse_kline_index = kline_index
if reverse_type == 'long_to_short':
logger.info(f"[{timestamp}] 多仓触上轨 @ {trigger_price:.2f},进入延迟反转")
else:
logger.info(f"[{timestamp}] 空仓触下轨 @ {trigger_price:.2f},进入延迟反转")
def reverse_position(self, price, new_direction, timestamp, reason):
"""全平后反向开仓"""
if self.position == 0:
self.clear_delay_reversal()
return False
close_side = "" if self.position > 0 else ""
open_side = "" if new_direction == "long" else ""
self.close_position(price, 1.0, timestamp, f"{reason}-平{close_side}")
open_price = self.apply_entry_slippage(price, new_direction)
self.open_position(open_price, new_direction, timestamp, f"{reason}-开{open_side}")
self.mid_closed_half = False
self.clear_delay_reversal()
return True
def check_delay_reversal_signal(self, i, row, prev_row):
"""检查延迟反转是否在当前收盘K成立仅返回信号不直接执行"""
if self.position == 0 or self.delay_reverse_price is None or self.delay_reverse_kline_index is None:
return None
offset = i - self.delay_reverse_kline_index
# 禁止同K确认最早次K确认
if offset <= 0:
return None
high = row['high']
low = row['low']
if self.delay_reverse_type == 'long_to_short':
# 情况1触上轨后次K回调到记录上轨价
if offset == 1 and low <= self.delay_reverse_price:
return 'short', "延迟反转-次K回调确认", self.delay_reverse_price
# 情况2持续等待动态追踪上一根K线条件
if offset >= 2 and prev_row is not None:
prev_upper = prev_row['upper']
prev_touch_upper = pd.notna(prev_upper) and prev_row['high'] >= prev_upper
if prev_touch_upper:
if low <= prev_upper:
return 'short', "延迟反转-上一根触上轨后回调确认", prev_upper
else:
prev_body_low = min(prev_row['open'], prev_row['close'])
if low <= prev_body_low:
return 'short', "延迟反转-跌破上一根实体确认", prev_body_low
elif self.delay_reverse_type == 'short_to_long':
# 情况1触下轨后次K反弹到记录下轨价
if offset == 1 and high >= self.delay_reverse_price:
return 'long', "延迟反转-次K反弹确认", self.delay_reverse_price
# 情况2持续等待动态追踪上一根K线条件
if offset >= 2 and prev_row is not None:
prev_lower = prev_row['lower']
prev_touch_lower = pd.notna(prev_lower) and prev_row['low'] <= prev_lower
if prev_touch_lower:
if high >= prev_lower:
return 'long', "延迟反转-上一根触下轨后反弹确认", prev_lower
else:
prev_body_high = max(prev_row['open'], prev_row['close'])
if high >= prev_body_high:
return 'long', "延迟反转-突破上一根实体确认", prev_body_high
return None
def open_position(self, price, direction, timestamp, reason):
"""开仓或加仓"""
if self.position_count not in (0, 1):
return False
if self.position_count == 1:
current_direction = 'long' if self.position > 0 else 'short'
if direction != current_direction:
logger.warning("加仓方向不一致,跳过")
return False
margin_ratio = self.margin_ratio_1 if self.position_count == 0 else self.margin_ratio_2
margin = self.capital * margin_ratio
if margin <= 0:
logger.warning(f"[{timestamp}] 资金不足,无法开仓 | 可用资金: {self.capital:.4f}U")
return False
position_size = margin * self.leverage / price
fee = position_size * price * self.fee_rate
required = margin + fee
if self.capital < required:
logger.warning(
f"[{timestamp}] 可用资金不足,无法开仓 | 需要: {required:.4f}U | 可用: {self.capital:.4f}U"
)
return False
# 冻结保证金并扣除手续费
self.capital -= required
self.schedule_rebate(fee, timestamp)
if self.position_count == 0:
self.position = position_size if direction == 'long' else -position_size
self.entry_price = price
self.total_margin = margin
self.position_count = 1
action = f'{direction}'
else:
old_size = abs(self.position)
new_size = old_size + position_size
old_value = old_size * self.entry_price
new_value = position_size * price
self.entry_price = (old_value + new_value) / new_size
self.position = new_size if direction == 'long' else -new_size
self.total_margin += margin
self.position_count = 2
action = f'{direction}'
self.mid_closed_half = False
self.trades.append({
'timestamp': timestamp,
'action': action,
'price': price,
'size': position_size,
'margin': margin,
'fee': fee,
'rebate': self.get_rebate_amount(fee),
'capital': self.capital,
'reason': reason
})
logger.info(
f"[{timestamp}] {action} @ {price:.2f} | 仓位: {position_size:.4f} | "
f"保证金: {margin:.4f}U | 手续费: {fee:.4f}U | 返佣待到账: {self.get_rebate_amount(fee):.4f}U | "
f"可用资金: {self.capital:.4f}U | {reason}"
)
return True
def close_position(self, price, ratio, timestamp, reason):
"""平仓"""
if self.position == 0:
return False
ratio = min(max(ratio, 0.0), 1.0)
if ratio == 0:
return False
close_size = abs(self.position) * ratio
if self.position > 0:
pnl = close_size * (price - self.entry_price)
else:
pnl = close_size * (self.entry_price - price)
fee = close_size * price * self.fee_rate
released_margin = self.total_margin * ratio
self.capital += released_margin + pnl - fee
self.schedule_rebate(fee, timestamp)
if ratio >= 0.999:
self.position = 0
self.position_count = 0
self.total_margin = 0
self.entry_price = 0
self.mid_closed_half = False
self.clear_delay_reversal()
else:
self.position *= (1 - ratio)
self.total_margin *= (1 - ratio)
if abs(self.position) < 1e-12:
self.position = 0
self.position_count = 0
self.total_margin = 0
self.entry_price = 0
self.mid_closed_half = False
self.clear_delay_reversal()
self.trades.append({
'timestamp': timestamp,
'action': f'平仓{int(ratio*100)}%',
'price': price,
'size': close_size,
'pnl': pnl,
'fee': fee,
'rebate': self.get_rebate_amount(fee),
'capital': self.capital,
'reason': reason
})
logger.info(f"[{timestamp}] 平仓{int(ratio*100)}% @ {price:.2f} | "
f"盈亏: {pnl:.4f}U | 手续费: {fee:.4f}U | 返佣待到账: {self.get_rebate_amount(fee):.4f}U | "
f"可用资金: {self.capital:.4f}U | {reason}")
return True
def check_stop_loss(self, high, low):
"""检查当前收盘K是否触发止损信号"""
if self.position == 0:
return False
stop_price = low if self.position > 0 else high
if self.position > 0:
unrealized_pnl = abs(self.position) * (stop_price - self.entry_price)
else:
unrealized_pnl = abs(self.position) * (self.entry_price - stop_price)
return unrealized_pnl <= -self.total_margin * self.stop_loss_ratio
def run_backtest(self, start_date, end_date):
"""运行回测开仓当K触发平仓/止损仍按下一根K开盘执行"""
# 重置状态,支持同一实例重复回测
self.capital = self.initial_capital
self.position = 0
self.position_count = 0
self.entry_price = 0
self.total_margin = 0
self.mid_closed_half = False
self.trades = []
self.daily_pnl = []
self.pending_rebates = []
self.total_rebate_credited = 0.0
self.clear_delay_reversal()
logger.info(f"{'='*80}")
logger.info(f"开始回测: {start_date} ~ {end_date}")
logger.info(f"初始资金: {self.initial_capital}U | 杠杆: {self.leverage}x | BB({self.bb_period}, {self.bb_std})")
logger.info(f"{'='*80}")
# 从数据库加载数据
start_dt = pd.Timestamp(start_date)
end_dt = pd.Timestamp(end_date)
if isinstance(end_date, str) and len(end_date) <= 10:
end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)
self.current_run_label = f"{start_dt.strftime('%Y%m%d')}_{end_dt.strftime('%Y%m%d')}"
start_ts = int(start_dt.timestamp() * 1000)
end_ts = int(end_dt.timestamp() * 1000)
query = BitMartETH5M.select().where(
(BitMartETH5M.id >= start_ts) & (BitMartETH5M.id <= end_ts)
).order_by(BitMartETH5M.id)
data = []
for row in query:
data.append({
'timestamp': row.id,
'open': row.open,
'high': row.high,
'low': row.low,
'close': row.close
})
if not data:
logger.error("没有找到数据!")
return None
df = pd.DataFrame(data)
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
logger.info(f"加载数据: {len(df)} 根K线")
logger.info(f"时间范围: {df['datetime'].min()} ~ {df['datetime'].max()}")
# 计算布林带
df = self.calculate_bollinger_bands(df)
if len(df) <= self.bb_period + 1:
logger.error("数据不足,无法执行回测")
return None
# 逐根K线回测开仓当K触发平仓/止损下一K开盘执行
for i in range(self.bb_period, len(df) - 1):
row = df.iloc[i]
prev_row = df.iloc[i-1] if i > 0 else None
next_row = df.iloc[i + 1]
signal_dt = row['datetime']
signal_ts = signal_dt.strftime('%Y-%m-%d %H:%M')
five_min_ts = int(row['timestamp'])
execute_ts = next_row['datetime'].strftime('%Y-%m-%d %H:%M')
next_open = float(next_row['open']) if pd.notna(next_row['open']) else None
high = row['high']
low = row['low']
upper = row['upper']
lower = row['lower']
middle = row['middle']
# 先处理当前收盘时刻返佣到账
self.apply_pending_rebates(signal_dt)
if pd.isna(upper) or pd.isna(lower) or pd.isna(middle):
continue
if next_open is None:
continue
# 检查止损收盘确认下一K开盘平仓
if self.check_stop_loss(high, low):
logger.warning(f"[{signal_ts}] 触发止损信号下一K开盘执行")
self.close_position(next_open, 1.0, execute_ts, f"止损-收盘确认({signal_ts})")
continue
# 已处于延迟反转状态时,先检查确认逻辑
if self.delay_reverse_price is not None:
reversal_signal = self.check_delay_reversal_signal(i, row, prev_row)
if reversal_signal is not None and self.position != 0:
new_direction, reason, reversal_price = reversal_signal
if reversal_price is None or pd.isna(reversal_price):
reversal_price = float(row['close'])
close_side = "" if self.position > 0 else ""
open_side = "" if new_direction == 'long' else ""
self.close_position(
reversal_price,
1.0,
signal_ts,
f"{reason}-当K确认({signal_ts})-平{close_side}"
)
open_price = self.apply_entry_slippage(reversal_price, new_direction)
self.open_position(
open_price,
new_direction,
signal_ts,
f"{reason}-当K确认({signal_ts})-开{open_side}"
)
self.mid_closed_half = False
self.clear_delay_reversal()
continue
# === 中轨平仓逻辑 ===
if self.position != 0:
had_mid_closed_half = self.mid_closed_half
if self.position > 0: # 多仓
# 回到开仓价全平+反手(仅在此前已平半的前提下触发)
if had_mid_closed_half and low <= self.entry_price:
self.close_position(next_open, 1.0, execute_ts, f"回开仓价全平-收盘确认({signal_ts})")
entry_price = self.apply_entry_slippage(next_open, 'short')
self.open_position(entry_price, 'short', execute_ts, f"回开仓价反手开空-收盘确认({signal_ts})")
self.mid_closed_half = False
continue
# 触中轨平半(收盘确认)
if not had_mid_closed_half and low <= middle <= high:
self.close_position(next_open, 0.5, execute_ts, f"触中轨平50%-收盘确认({signal_ts})")
self.mid_closed_half = True
continue
else: # 空仓
# 回到开仓价全平+反手(仅在此前已平半的前提下触发)
if had_mid_closed_half and high >= self.entry_price:
self.close_position(next_open, 1.0, execute_ts, f"回开仓价全平-收盘确认({signal_ts})")
entry_price = self.apply_entry_slippage(next_open, 'long')
self.open_position(entry_price, 'long', execute_ts, f"回开仓价反手开多-收盘确认({signal_ts})")
self.mid_closed_half = False
continue
# 触中轨平半(收盘确认)
if not had_mid_closed_half and low <= middle <= high:
self.close_position(next_open, 0.5, execute_ts, f"触中轨平50%-收盘确认({signal_ts})")
self.mid_closed_half = True
continue
# === 开仓与加仓逻辑 ===
if self.position == 0: # 空仓
self.clear_delay_reversal()
# 触上轨开空
if high >= upper:
entry_price = self.get_touch_entry_price(five_min_ts, 'short', upper)
self.open_position(entry_price, 'short', signal_ts, f"触上轨开空-当K触发({signal_ts})")
# 触下轨开多
elif low <= lower:
entry_price = self.get_touch_entry_price(five_min_ts, 'long', lower)
self.open_position(entry_price, 'long', signal_ts, f"触下轨开多-当K触发({signal_ts})")
continue
# 有持仓:先检查是否触发延迟反转(核心)
if self.position > 0 and high >= upper:
self.mark_delay_reversal('long_to_short', upper, i, signal_ts)
continue
elif self.position < 0 and low <= lower:
self.mark_delay_reversal('short_to_long', lower, i, signal_ts)
continue
# 进入延迟反转等待后,不再执行加仓
if self.delay_reverse_price is not None:
continue
# 同向加仓最多1次
if self.position_count == 1:
if self.position > 0 and low <= lower:
entry_price = self.get_touch_entry_price(five_min_ts, 'long', lower)
self.open_position(entry_price, 'long', signal_ts, f"触下轨加多-当K触发({signal_ts})")
elif self.position < 0 and high >= upper:
entry_price = self.get_touch_entry_price(five_min_ts, 'short', upper)
self.open_position(entry_price, 'short', signal_ts, f"触上轨加空-当K触发({signal_ts})")
# 回测末尾再处理一次返佣到账
self.apply_pending_rebates(df.iloc[-1]['datetime'])
# 最后平仓
if self.position != 0:
final_price = df.iloc[-1]['close']
final_time = df.iloc[-1]['datetime'].strftime('%Y-%m-%d %H:%M')
self.close_position(final_price, 1.0, final_time, "回测结束平仓")
# 生成报告
return self.generate_report(df)
def generate_report(self, df):
"""生成回测报告"""
logger.info(f"\n{'='*80}")
logger.info("回测报告")
logger.info(f"{'='*80}")
# 基本统计
total_trades = len([t for t in self.trades if '' in t['action']])
win_trades = len([t for t in self.trades if '' in t['action'] and t.get('pnl', 0) > 0])
loss_trades = len([t for t in self.trades if '' in t['action'] and t.get('pnl', 0) < 0])
total_pnl = sum([t.get('pnl', 0) for t in self.trades])
total_fee = sum([t.get('fee', 0) for t in self.trades])
total_rebate_expected = sum([t.get('rebate', 0) for t in self.trades if t.get('fee', 0) > 0])
pending_rebate = sum([x['amount'] for x in self.pending_rebates])
realized_net_fee = total_fee - self.total_rebate_credited
final_capital = self.capital
roi = (final_capital - self.initial_capital) / self.initial_capital * 100
logger.info(f"初始资金: {self.initial_capital:.2f}U")
logger.info(f"最终资金: {final_capital:.2f}U")
logger.info(f"总盈亏: {total_pnl:.2f}U")
logger.info(f"总手续费(开平全额): {total_fee:.2f}U")
logger.info(f"返佣应返总额: {total_rebate_expected:.2f}U")
logger.info(f"返佣已到账: {self.total_rebate_credited:.2f}U")
logger.info(f"返佣待到账: {pending_rebate:.2f}U")
logger.info(f"已实现净手续费: {realized_net_fee:.2f}U")
logger.info(f"净收益: {final_capital - self.initial_capital:.2f}U")
logger.info(f"收益率: {roi:.2f}%")
logger.info(f"总交易次数: {total_trades}")
logger.info(f"盈利次数: {win_trades}")
logger.info(f"亏损次数: {loss_trades}")
if win_trades + loss_trades > 0:
logger.info(f"胜率: {win_trades/(win_trades+loss_trades)*100:.2f}%")
# 保存交易记录
trades_df = pd.DataFrame(self.trades)
output_dir = Path(__file__).parent / 'backtest_outputs' / 'trades'
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / f'bb_backtest_{self.current_run_label}_trades.csv'
trades_df.to_csv(output_file, index=False, encoding='utf-8-sig')
logger.info(f"\n交易记录已保存到: {output_file}")
return {
'initial_capital': self.initial_capital,
'final_capital': final_capital,
'total_pnl': total_pnl,
'total_fee': total_fee,
'total_rebate_expected': total_rebate_expected,
'total_rebate_credited': self.total_rebate_credited,
'pending_rebate': pending_rebate,
'realized_net_fee': realized_net_fee,
'roi': roi,
'total_trades': total_trades,
'win_trades': win_trades,
'loss_trades': loss_trades,
'trades_file': str(output_file),
'trades': self.trades
}
if __name__ == '__main__':
# 连接数据库
db.connect(reuse_if_open=True)
try:
# 创建回测实例
backtest = BollingerBandBacktest()
# 运行回测2026年全年
result = backtest.run_backtest('2026-01-01', '2026-12-31')
if result:
logger.success(f"\n回测完成!最终收益率: {result['roi']:.2f}%")
finally:
db.close()