Files
codex_jxs_code/bb_backtest_march_2026.py

633 lines
25 KiB
Python
Raw Normal View History

2026-03-04 16:48:24 +08:00
"""
布林带延迟反转策略回测 - 2026年3月
策略规则
1. 5分钟K线BB(10, 2.5)
2. 空仓触上轨开空触下轨开多
3. 同向加仓最多1次保证金递增(1%->2%)
4. 延迟反转触轨不立刻平仓记录价格回调到该价再平仓+反向开仓
5. 中轨平半+回开仓价全平
6. 止损亏损达保证金50%
"""
import pandas as pd
from pathlib import Path
from peewee import *
from loguru import logger
# 数据库配置
DB_PATH = Path(__file__).parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
class BitMartETH5M(Model):
"""5分钟K线模型"""
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_5m'
class BitMartETH1M(Model):
"""1分钟K线模型"""
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1m'
class BollingerBandBacktest:
"""布林带延迟反转策略回测"""
def __init__(self):
# 策略参数
self.bb_period = 10
self.bb_std = 2.5
self.initial_capital = 100 # 初始本金100U
self.leverage = 50 # 50倍杠杆
self.fee_rate = 0.0005 # 万五手续费
self.rebate_rate = 0.9 # 90%返佣
self.margin_ratio_1 = 0.01 # 首次开仓保证金比例1%
self.margin_ratio_2 = 0.02 # 加仓保证金比例2%
self.stop_loss_ratio = 0.5 # 止损比例50%
self.entry_slippage = 0.0002 # 开仓滑点2bps
# 账户状态
self.capital = self.initial_capital
self.position = 0 # 持仓量(正=多,负=空)
self.position_count = 0 # 持仓次数0=空仓1=首次2=加仓)
self.entry_price = 0 # 开仓均价
self.total_margin = 0 # 总保证金
# 延迟反转状态
self.delay_reverse_price = None # 记录触碰轨道时的轨道价格
self.delay_reverse_type = None # 'long_to_short' 或 'short_to_long'
self.delay_reverse_kline_index = None # 触发延迟反转的K线索引
# 中轨平仓记录
self.mid_closed_half = False # 是否已平50%
# 1分钟K线缓存key=5分钟起始时间戳, value=该5分钟内的1m列表
self.one_minute_by_5m = {}
# 交易记录
self.trades = []
self.daily_pnl = []
def calculate_bollinger_bands(self, df):
"""计算布林带"""
df['sma'] = df['close'].rolling(window=self.bb_period).mean()
df['std'] = df['close'].rolling(window=self.bb_period).std()
df['upper'] = df['sma'] + self.bb_std * df['std']
df['lower'] = df['sma'] - self.bb_std * df['std']
df['middle'] = df['sma']
return df
def get_net_fee(self, fee):
"""计算扣除返佣后的净手续费"""
return fee * (1 - self.rebate_rate)
def apply_entry_slippage(self, price, direction):
"""按方向施加不利滑点"""
if direction == 'long':
return price * (1 + self.entry_slippage)
return price * (1 - self.entry_slippage)
def load_one_minute_cache(self, start_ts, end_ts):
"""加载并缓存1分钟K线用于5分钟内走势确认"""
query = BitMartETH1M.select().where(
(BitMartETH1M.id >= start_ts) & (BitMartETH1M.id <= end_ts)
).order_by(BitMartETH1M.id)
bucket = {}
for row in query:
five_min_start = int(row.id - (row.id % 300000))
bucket.setdefault(five_min_start, []).append({
'timestamp': int(row.id),
'open': float(row.open) if row.open is not None else None,
'high': float(row.high) if row.high is not None else None,
'low': float(row.low) if row.low is not None else None,
'close': float(row.close) if row.close is not None else None,
})
self.one_minute_by_5m = bucket
total_rows = sum(len(v) for v in bucket.values())
logger.info(f"加载1分钟数据: {total_rows} 根 | 5分钟桶: {len(bucket)}")
def should_close_half_by_1m_trend(self, five_min_ts, middle_price, side):
"""
1分钟顺序确认中轨平半
- 多仓先到中轨上方再回踩中轨
- 空仓先到中轨下方再反抽中轨
"""
minute_rows = self.one_minute_by_5m.get(five_min_ts, [])
if not minute_rows:
return False, None
seen_above_middle = False
seen_below_middle = False
for minute in minute_rows:
m_open = minute['open']
m_high = minute['high']
m_low = minute['low']
m_close = minute['close']
if None in (m_open, m_high, m_low, m_close):
continue
minute_time = pd.to_datetime(minute['timestamp'], unit='ms').strftime('%H:%M')
if side == 'long':
# 当前1m开在中轨上方且回踩到中轨视为有效回踩
if m_open >= middle_price and m_low <= middle_price:
return True, f"1m({minute_time})回踩中轨"
# 已经到过中轨上方后,再次触及中轨
if seen_above_middle and m_low <= middle_price:
return True, f"1m({minute_time})回踩中轨"
if m_high >= middle_price:
seen_above_middle = True
else: # short
# 当前1m开在中轨下方且反抽到中轨视为有效反抽
if m_open <= middle_price and m_high >= middle_price:
return True, f"1m({minute_time})反抽中轨"
# 已经到过中轨下方后,再次触及中轨
if seen_below_middle and m_high >= middle_price:
return True, f"1m({minute_time})反抽中轨"
if m_low <= middle_price:
seen_below_middle = True
return False, None
def get_touch_entry_price(self, five_min_ts, direction, touch_price):
"""
触轨开仓的保守成交价
- 在该5分钟内找到首个触轨1m
- 用该1m收盘价作为基准
- 为避免过于理想不允许优于触轨价再叠加不利滑点
"""
minute_rows = self.one_minute_by_5m.get(five_min_ts, [])
if not minute_rows:
return self.apply_entry_slippage(touch_price, direction)
trigger_close = None
for minute in minute_rows:
m_high = minute['high']
m_low = minute['low']
m_close = minute['close']
if None in (m_high, m_low, m_close):
continue
if direction == 'short' and m_high >= touch_price:
trigger_close = m_close
break
if direction == 'long' and m_low <= touch_price:
trigger_close = m_close
break
if trigger_close is None:
base_price = touch_price
elif direction == 'long':
base_price = max(touch_price, trigger_close)
else:
base_price = min(touch_price, trigger_close)
return self.apply_entry_slippage(base_price, direction)
def clear_delay_reversal(self):
"""清理延迟反转状态"""
self.delay_reverse_price = None
self.delay_reverse_type = None
self.delay_reverse_kline_index = None
def mark_delay_reversal(self, reverse_type, trigger_price, kline_index, timestamp):
"""记录延迟反转触发信息"""
self.delay_reverse_type = reverse_type
self.delay_reverse_price = trigger_price
self.delay_reverse_kline_index = kline_index
if reverse_type == 'long_to_short':
logger.info(f"[{timestamp}] 多仓触上轨 @ {trigger_price:.2f},进入延迟反转")
else:
logger.info(f"[{timestamp}] 空仓触下轨 @ {trigger_price:.2f},进入延迟反转")
def reverse_position(self, price, new_direction, timestamp, reason):
"""全平后反向开仓"""
if self.position == 0:
self.clear_delay_reversal()
return False
close_side = "" if self.position > 0 else ""
open_side = "" if new_direction == "long" else ""
self.close_position(price, 1.0, timestamp, f"{reason}-平{close_side}")
open_price = self.apply_entry_slippage(price, new_direction)
self.open_position(open_price, new_direction, timestamp, f"{reason}-开{open_side}")
self.mid_closed_half = False
self.clear_delay_reversal()
return True
def check_and_execute_delay_reversal(self, i, row, prev_row, timestamp):
"""执行延迟反转三段判断逻辑"""
if self.position == 0 or self.delay_reverse_price is None or self.delay_reverse_kline_index is None:
return False
offset = i - self.delay_reverse_kline_index
high = row['high']
low = row['low']
if self.delay_reverse_type == 'long_to_short':
# 情况1/2触上轨后同K或下一K回调到记录上轨价
if offset <= 1 and low <= self.delay_reverse_price:
tag = "同K回调确认" if offset == 0 else "次K回调确认"
return self.reverse_position(self.delay_reverse_price, 'short', timestamp, f"延迟反转-{tag}")
# 情况3持续等待动态追踪上一根K线条件
if offset >= 2 and prev_row is not None:
prev_touch_upper = prev_row['high'] >= prev_row['upper']
if prev_touch_upper:
ref_price = prev_row['upper']
if low <= ref_price:
return self.reverse_position(ref_price, 'short', timestamp, "延迟反转-上一根触上轨后回调确认")
else:
prev_body_low = min(prev_row['open'], prev_row['close'])
if low <= prev_body_low:
return self.reverse_position(prev_body_low, 'short', timestamp, "延迟反转-跌破上一根实体确认")
elif self.delay_reverse_type == 'short_to_long':
# 情况1/2触下轨后同K或下一K反弹到记录下轨价
if offset <= 1 and high >= self.delay_reverse_price:
tag = "同K反弹确认" if offset == 0 else "次K反弹确认"
return self.reverse_position(self.delay_reverse_price, 'long', timestamp, f"延迟反转-{tag}")
# 情况3持续等待动态追踪上一根K线条件
if offset >= 2 and prev_row is not None:
prev_touch_lower = prev_row['low'] <= prev_row['lower']
if prev_touch_lower:
ref_price = prev_row['lower']
if high >= ref_price:
return self.reverse_position(ref_price, 'long', timestamp, "延迟反转-上一根触下轨后反弹确认")
else:
prev_body_high = max(prev_row['open'], prev_row['close'])
if high >= prev_body_high:
return self.reverse_position(prev_body_high, 'long', timestamp, "延迟反转-突破上一根实体确认")
return False
def open_position(self, price, direction, timestamp, reason):
"""开仓或加仓"""
if self.position_count not in (0, 1):
return False
if self.position_count == 1:
current_direction = 'long' if self.position > 0 else 'short'
if direction != current_direction:
logger.warning("加仓方向不一致,跳过")
return False
margin_ratio = self.margin_ratio_1 if self.position_count == 0 else self.margin_ratio_2
margin = self.capital * margin_ratio
if margin <= 0:
logger.warning(f"[{timestamp}] 资金不足,无法开仓 | 可用资金: {self.capital:.4f}U")
return False
position_size = margin * self.leverage / price
fee = position_size * price * self.fee_rate
net_fee = self.get_net_fee(fee)
required = margin + net_fee
if self.capital < required:
logger.warning(
f"[{timestamp}] 可用资金不足,无法开仓 | 需要: {required:.4f}U | 可用: {self.capital:.4f}U"
)
return False
# 冻结保证金并扣除手续费
self.capital -= required
if self.position_count == 0:
self.position = position_size if direction == 'long' else -position_size
self.entry_price = price
self.total_margin = margin
self.position_count = 1
action = f'{direction}'
else:
old_size = abs(self.position)
new_size = old_size + position_size
old_value = old_size * self.entry_price
new_value = position_size * price
self.entry_price = (old_value + new_value) / new_size
self.position = new_size if direction == 'long' else -new_size
self.total_margin += margin
self.position_count = 2
action = f'{direction}'
self.mid_closed_half = False
self.trades.append({
'timestamp': timestamp,
'action': action,
'price': price,
'size': position_size,
'margin': margin,
'fee': net_fee,
'capital': self.capital,
'reason': reason
})
logger.info(
f"[{timestamp}] {action} @ {price:.2f} | 仓位: {position_size:.4f} | "
f"保证金: {margin:.4f}U | 手续费: {net_fee:.4f}U | 可用资金: {self.capital:.4f}U | {reason}"
)
return True
def close_position(self, price, ratio, timestamp, reason):
"""平仓"""
if self.position == 0:
return False
ratio = min(max(ratio, 0.0), 1.0)
if ratio == 0:
return False
close_size = abs(self.position) * ratio
if self.position > 0:
pnl = close_size * (price - self.entry_price)
else:
pnl = close_size * (self.entry_price - price)
fee = close_size * price * self.fee_rate
net_fee = self.get_net_fee(fee)
released_margin = self.total_margin * ratio
self.capital += released_margin + pnl - net_fee
if ratio >= 0.999:
self.position = 0
self.position_count = 0
self.total_margin = 0
self.entry_price = 0
self.mid_closed_half = False
self.clear_delay_reversal()
else:
self.position *= (1 - ratio)
self.total_margin *= (1 - ratio)
if abs(self.position) < 1e-12:
self.position = 0
self.position_count = 0
self.total_margin = 0
self.entry_price = 0
self.mid_closed_half = False
self.clear_delay_reversal()
self.trades.append({
'timestamp': timestamp,
'action': f'平仓{int(ratio*100)}%',
'price': price,
'size': close_size,
'pnl': pnl,
'fee': net_fee,
'capital': self.capital,
'reason': reason
})
logger.info(f"[{timestamp}] 平仓{int(ratio*100)}% @ {price:.2f} | "
f"盈亏: {pnl:.4f}U | 手续费: {net_fee:.4f}U | 可用资金: {self.capital:.4f}U | {reason}")
return True
def check_stop_loss(self, high, low, timestamp):
"""检查止损"""
if self.position == 0:
return False
stop_price = low if self.position > 0 else high
if self.position > 0:
unrealized_pnl = abs(self.position) * (stop_price - self.entry_price)
else:
unrealized_pnl = abs(self.position) * (self.entry_price - stop_price)
# 止损条件亏损达到保证金的50%
if unrealized_pnl <= -self.total_margin * self.stop_loss_ratio:
logger.warning(f"触发止损!浮亏: {unrealized_pnl:.2f}U | 保证金: {self.total_margin:.2f}U")
self.close_position(stop_price, 1.0, timestamp, "止损")
return True
return False
def run_backtest(self, start_date, end_date):
"""运行回测"""
logger.info(f"{'='*80}")
logger.info(f"开始回测: {start_date} ~ {end_date}")
logger.info(f"初始资金: {self.initial_capital}U | 杠杆: {self.leverage}x | BB({self.bb_period}, {self.bb_std})")
logger.info(f"{'='*80}")
# 从数据库加载数据
start_ts = int(pd.Timestamp(start_date).timestamp() * 1000)
end_ts = int(pd.Timestamp(end_date).timestamp() * 1000)
query = BitMartETH5M.select().where(
(BitMartETH5M.id >= start_ts) & (BitMartETH5M.id <= end_ts)
).order_by(BitMartETH5M.id)
data = []
for row in query:
data.append({
'timestamp': row.id,
'open': row.open,
'high': row.high,
'low': row.low,
'close': row.close
})
if not data:
logger.error("没有找到数据!")
return None
df = pd.DataFrame(data)
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
logger.info(f"加载数据: {len(df)} 根K线")
logger.info(f"时间范围: {df['datetime'].min()} ~ {df['datetime'].max()}")
# 加载对应时间范围的1分钟数据用于中轨平半确认
one_min_start = int(df['timestamp'].min())
one_min_end = int(df['timestamp'].max()) + 300000
self.load_one_minute_cache(one_min_start, one_min_end)
# 计算布林带
df = self.calculate_bollinger_bands(df)
# 逐根K线回测
for i in range(self.bb_period, len(df)):
row = df.iloc[i]
prev_row = df.iloc[i-1] if i > 0 else None
timestamp = row['datetime'].strftime('%Y-%m-%d %H:%M')
five_min_ts = int(row['timestamp'])
high = row['high']
low = row['low']
upper = row['upper']
lower = row['lower']
middle = row['middle']
# 检查止损
if self.check_stop_loss(high, low, timestamp):
continue
# 已处于延迟反转状态时,先执行确认逻辑
if self.delay_reverse_price is not None:
if self.check_and_execute_delay_reversal(i, row, prev_row, timestamp):
continue
# === 中轨平仓逻辑 ===
if self.position != 0:
if self.position > 0: # 多仓
# 1m顺序确认先上方再回踩中轨平50%
if not self.mid_closed_half and low <= middle <= high:
should_close_half, reason_1m = self.should_close_half_by_1m_trend(
five_min_ts, middle, 'long'
)
if should_close_half:
self.close_position(middle, 0.5, timestamp, f"触中轨平50%-{reason_1m}")
self.mid_closed_half = True
# 继续回落到开仓均价全平
if self.position != 0 and self.mid_closed_half and low <= self.entry_price:
self.close_position(self.entry_price, 1.0, timestamp, "回开仓价全平")
else: # 空仓
# 1m顺序确认先下方再反抽中轨平50%
if not self.mid_closed_half and low <= middle <= high:
should_close_half, reason_1m = self.should_close_half_by_1m_trend(
five_min_ts, middle, 'short'
)
if should_close_half:
self.close_position(middle, 0.5, timestamp, f"触中轨平50%-{reason_1m}")
self.mid_closed_half = True
# 继续反弹到开仓均价全平
if self.position != 0 and self.mid_closed_half and high >= self.entry_price:
self.close_position(self.entry_price, 1.0, timestamp, "回开仓价全平")
# === 开仓与加仓逻辑 ===
if self.position == 0: # 空仓
self.clear_delay_reversal()
# 触上轨开空
if high >= upper:
entry_price = self.get_touch_entry_price(five_min_ts, 'short', upper)
self.open_position(entry_price, 'short', timestamp, "触上轨开空")
# 触下轨开多
elif low <= lower:
entry_price = self.get_touch_entry_price(five_min_ts, 'long', lower)
self.open_position(entry_price, 'long', timestamp, "触下轨开多")
continue
# 有持仓:先检查是否触发延迟反转(核心)
if self.position > 0 and high >= upper:
self.mark_delay_reversal('long_to_short', upper, i, timestamp)
if self.check_and_execute_delay_reversal(i, row, prev_row, timestamp):
continue
elif self.position < 0 and low <= lower:
self.mark_delay_reversal('short_to_long', lower, i, timestamp)
if self.check_and_execute_delay_reversal(i, row, prev_row, timestamp):
continue
# 进入延迟反转等待后,不再执行加仓
if self.delay_reverse_price is not None:
continue
# 同向加仓最多1次
if self.position_count == 1:
if self.position > 0 and low <= lower:
entry_price = self.get_touch_entry_price(five_min_ts, 'long', lower)
self.open_position(entry_price, 'long', timestamp, "触下轨加多")
elif self.position < 0 and high >= upper:
entry_price = self.get_touch_entry_price(five_min_ts, 'short', upper)
self.open_position(entry_price, 'short', timestamp, "触上轨加空")
# 最后平仓
if self.position != 0:
final_price = df.iloc[-1]['close']
final_time = df.iloc[-1]['datetime'].strftime('%Y-%m-%d %H:%M')
self.close_position(final_price, 1.0, final_time, "回测结束平仓")
# 生成报告
return self.generate_report(df)
def generate_report(self, df):
"""生成回测报告"""
logger.info(f"\n{'='*80}")
logger.info("回测报告")
logger.info(f"{'='*80}")
# 基本统计
total_trades = len([t for t in self.trades if '' in t['action']])
win_trades = len([t for t in self.trades if '' in t['action'] and t.get('pnl', 0) > 0])
loss_trades = len([t for t in self.trades if '' in t['action'] and t.get('pnl', 0) < 0])
total_pnl = sum([t.get('pnl', 0) for t in self.trades])
total_fee = sum([t.get('fee', 0) for t in self.trades])
final_capital = self.capital
roi = (final_capital - self.initial_capital) / self.initial_capital * 100
logger.info(f"初始资金: {self.initial_capital:.2f}U")
logger.info(f"最终资金: {final_capital:.2f}U")
logger.info(f"总盈亏: {total_pnl:.2f}U")
logger.info(f"总手续费: {total_fee:.2f}U")
logger.info(f"净收益: {final_capital - self.initial_capital:.2f}U")
logger.info(f"收益率: {roi:.2f}%")
logger.info(f"总交易次数: {total_trades}")
logger.info(f"盈利次数: {win_trades}")
logger.info(f"亏损次数: {loss_trades}")
if win_trades + loss_trades > 0:
logger.info(f"胜率: {win_trades/(win_trades+loss_trades)*100:.2f}%")
# 保存交易记录
trades_df = pd.DataFrame(self.trades)
output_file = 'bb_backtest_march_2026_trades.csv'
trades_df.to_csv(output_file, index=False, encoding='utf-8-sig')
logger.info(f"\n交易记录已保存到: {output_file}")
return {
'initial_capital': self.initial_capital,
'final_capital': final_capital,
'total_pnl': total_pnl,
'total_fee': total_fee,
'roi': roi,
'total_trades': total_trades,
'win_trades': win_trades,
'loss_trades': loss_trades,
'trades': self.trades
}
if __name__ == '__main__':
# 连接数据库
db.connect(reuse_if_open=True)
try:
# 创建回测实例
backtest = BollingerBandBacktest()
# 运行回测2026年3月
result = backtest.run_backtest('2026-03-01', '2026-03-31')
if result:
logger.success(f"\n回测完成!最终收益率: {result['roi']:.2f}%")
finally:
db.close()