Files
codex_jxs_code/bb_backtest_2025_multi_timeframe.py

581 lines
21 KiB
Python
Raw Permalink Normal View History

2026-03-05 17:46:34 +08:00
"""
布林带延迟反转策略回测 - 2025年多周期测试
测试1分钟和15分钟周期的收益对比
策略规则
1. BB(10, 2.5)
2. 空仓触上轨开空触下轨开多
3. 同向加仓最多1次保证金1%
4. 延迟反转触轨不立刻平仓记录价格回调到该价再平仓+反向开仓
5. 中轨平半+回开仓价全平
回测参数
- 本金: 100U
- 杠杆: 100x
- 逐仓模式
- 开仓保证金: 1%
- 手续费: 0.05% (万五)
- 返佣: 90%次日早上8点到账
"""
import pandas as pd
from pathlib import Path
from peewee import *
from loguru import logger
import numpy as np
# 数据库配置
DB_PATH = Path(__file__).parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
class BitMartETH1M(Model):
"""1分钟K线模型"""
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1m'
class BitMartETH3M(Model):
"""3分钟K线模型"""
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_3m'
class BitMartETH5M(Model):
"""5分钟K线模型"""
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_5m'
class BitMartETH15M(Model):
"""15分钟K线模型"""
id = BigIntegerField(primary_key=True)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_15m'
class BBDelayReversalBacktest:
"""布林带延迟反转策略回测"""
def __init__(self, timeframe='1m'):
# 策略参数
self.timeframe = timeframe
self.bb_period = 10
self.bb_std = 2.5
self.initial_capital = 100 # 初始本金100U
self.leverage = 100 # 100倍杠杆
self.fee_rate = 0.0005 # 万五手续费
self.rebate_rate = 0.9 # 90%返佣
self.margin_ratio = 0.01 # 开仓保证金1%
self.rebate_credit_hour = 8 # 次日早上8点返佣到账
# 账户状态
self.capital = self.initial_capital
self.position = 0 # 持仓量(正=多,负=空)
self.position_count = 0 # 持仓次数0=空仓1=首次2=加仓)
self.entry_price = 0 # 开仓均价
self.total_margin = 0 # 总保证金
# 延迟反转状态
self.delay_reverse_price = None
self.delay_reverse_type = None
self.delay_reverse_kline_index = None
# 中轨平仓记录
self.mid_closed_half = False
# 交易记录
self.trades = []
self.pending_rebates = []
self.total_rebate_credited = 0.0
def calculate_bollinger_bands(self, df):
"""计算布林带整体右移1根避免使用当前K收盘价"""
df['sma'] = df['close'].rolling(window=self.bb_period).mean()
df['std'] = df['close'].rolling(window=self.bb_period).std()
df['upper'] = (df['sma'] + self.bb_std * df['std']).shift(1)
df['lower'] = (df['sma'] - self.bb_std * df['std']).shift(1)
df['middle'] = df['sma'].shift(1)
return df
def schedule_rebate(self, fee, timestamp):
"""登记返佣到账时间次日08:00上海时间"""
rebate = fee * self.rebate_rate
if rebate <= 0:
return
trade_utc = pd.Timestamp(timestamp, tz='UTC')
trade_local = trade_utc.tz_convert('Asia/Shanghai')
credit_local = (trade_local + pd.Timedelta(days=1)).normalize() + pd.Timedelta(hours=self.rebate_credit_hour)
credit_utc = credit_local.tz_convert('UTC').tz_localize(None)
self.pending_rebates.append({
'credit_time': credit_utc,
'amount': rebate,
'trade_time': trade_utc.tz_localize(None),
})
def apply_pending_rebates(self, current_time):
"""处理当前时刻前应到账的返佣"""
if not self.pending_rebates:
return
remaining = []
for item in self.pending_rebates:
if item['credit_time'] <= current_time:
amount = item['amount']
self.capital += amount
self.total_rebate_credited += amount
else:
remaining.append(item)
self.pending_rebates = remaining
def clear_delay_reversal(self):
"""清理延迟反转状态"""
self.delay_reverse_price = None
self.delay_reverse_type = None
self.delay_reverse_kline_index = None
def mark_delay_reversal(self, reverse_type, trigger_price, kline_index):
"""记录延迟反转触发信息"""
self.delay_reverse_type = reverse_type
self.delay_reverse_price = trigger_price
self.delay_reverse_kline_index = kline_index
def check_delay_reversal_signal(self, i, row, prev_row):
"""检查延迟反转是否成立"""
if self.position == 0 or self.delay_reverse_price is None or self.delay_reverse_kline_index is None:
return None
offset = i - self.delay_reverse_kline_index
if offset <= 0:
return None
high = row['high']
low = row['low']
if self.delay_reverse_type == 'long_to_short':
# 多转空:回调确认
if offset == 1 and low <= self.delay_reverse_price:
return 'short', self.delay_reverse_price, "次K回调确认"
if offset >= 2 and prev_row is not None:
prev_upper = prev_row['upper']
prev_touch_upper = pd.notna(prev_upper) and prev_row['high'] >= prev_upper
if prev_touch_upper:
if low <= prev_upper:
return 'short', prev_upper, "上一根触上轨后回调确认"
else:
prev_body_low = min(prev_row['open'], prev_row['close'])
if low <= prev_body_low:
return 'short', prev_body_low, "跌破上一根实体确认"
elif self.delay_reverse_type == 'short_to_long':
# 空转多:反弹确认
if offset == 1 and high >= self.delay_reverse_price:
return 'long', self.delay_reverse_price, "次K反弹确认"
if offset >= 2 and prev_row is not None:
prev_lower = prev_row['lower']
prev_touch_lower = pd.notna(prev_lower) and prev_row['low'] <= prev_lower
if prev_touch_lower:
if high >= prev_lower:
return 'long', prev_lower, "上一根触下轨后反弹确认"
else:
prev_body_high = max(prev_row['open'], prev_row['close'])
if high >= prev_body_high:
return 'long', prev_body_high, "突破上一根实体确认"
return None
def open_position(self, price, direction, timestamp, reason):
"""开仓或加仓"""
if self.position_count not in (0, 1):
return False
if self.position_count == 1:
current_direction = 'long' if self.position > 0 else 'short'
if direction != current_direction:
return False
margin = self.capital * self.margin_ratio
if margin <= 0:
return False
position_size = margin * self.leverage / price
fee = position_size * price * self.fee_rate
required = margin + fee
if self.capital < required:
return False
self.capital -= required
self.schedule_rebate(fee, timestamp)
if self.position_count == 0:
self.position = position_size if direction == 'long' else -position_size
self.entry_price = price
self.total_margin = margin
self.position_count = 1
else:
old_size = abs(self.position)
new_size = old_size + position_size
old_value = old_size * self.entry_price
new_value = position_size * price
self.entry_price = (old_value + new_value) / new_size
self.position = new_size if direction == 'long' else -new_size
self.total_margin += margin
self.position_count = 2
self.mid_closed_half = False
self.trades.append({
'timestamp': timestamp,
'action': f'{direction}' if self.position_count == 1 else f'{direction}',
'price': price,
'size': position_size,
'margin': margin,
'fee': fee,
'capital': self.capital,
'reason': reason
})
return True
def close_position(self, price, ratio, timestamp, reason):
"""平仓"""
if self.position == 0:
return False
ratio = min(max(ratio, 0.0), 1.0)
if ratio == 0:
return False
close_size = abs(self.position) * ratio
if self.position > 0:
pnl = close_size * (price - self.entry_price)
else:
pnl = close_size * (self.entry_price - price)
fee = close_size * price * self.fee_rate
released_margin = self.total_margin * ratio
self.capital += released_margin + pnl - fee
self.schedule_rebate(fee, timestamp)
if ratio >= 0.999:
self.position = 0
self.position_count = 0
self.total_margin = 0
self.entry_price = 0
self.mid_closed_half = False
self.clear_delay_reversal()
else:
self.position *= (1 - ratio)
self.total_margin *= (1 - ratio)
self.trades.append({
'timestamp': timestamp,
'action': f'平仓{int(ratio*100)}%',
'price': price,
'size': close_size,
'pnl': pnl,
'fee': fee,
'capital': self.capital,
'reason': reason
})
return True
def run_backtest(self, start_date, end_date):
"""运行回测"""
# 重置状态
self.capital = self.initial_capital
self.position = 0
self.position_count = 0
self.entry_price = 0
self.total_margin = 0
self.mid_closed_half = False
self.trades = []
self.pending_rebates = []
self.total_rebate_credited = 0.0
self.clear_delay_reversal()
logger.info(f"{'='*80}")
logger.info(f"开始回测: {start_date} ~ {end_date} | 周期: {self.timeframe}")
logger.info(f"初始资金: {self.initial_capital}U | 杠杆: {self.leverage}x | BB({self.bb_period}, {self.bb_std})")
logger.info(f"{'='*80}")
# 从数据库加载数据
start_dt = pd.Timestamp(start_date)
end_dt = pd.Timestamp(end_date)
if isinstance(end_date, str) and len(end_date) <= 10:
end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)
start_ts = int(start_dt.timestamp() * 1000)
end_ts = int(end_dt.timestamp() * 1000)
# 根据周期选择数据表
model_mapping = {
'1m': BitMartETH1M,
'3m': BitMartETH3M,
'5m': BitMartETH5M,
'15m': BitMartETH15M,
}
Model = model_mapping.get(self.timeframe)
if Model is None:
logger.error(f"不支持的周期: {self.timeframe}")
return None
query = Model.select().where(
(Model.id >= start_ts) & (Model.id <= end_ts)
).order_by(Model.id)
data = []
for row in query:
data.append({
'timestamp': row.id,
'open': row.open,
'high': row.high,
'low': row.low,
'close': row.close
})
if not data:
logger.error("没有找到数据!")
return None
df = pd.DataFrame(data)
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
logger.info(f"加载数据: {len(df)} 根K线")
logger.info(f"时间范围: {df['datetime'].min()} ~ {df['datetime'].max()}")
# 计算布林带
df = self.calculate_bollinger_bands(df)
if len(df) <= self.bb_period + 1:
logger.error("数据不足,无法执行回测")
return None
# 逐根K线回测
for i in range(self.bb_period, len(df)):
row = df.iloc[i]
prev_row = df.iloc[i-1] if i > 0 else None
signal_dt = row['datetime']
signal_ts = signal_dt.strftime('%Y-%m-%d %H:%M')
high = row['high']
low = row['low']
close = row['close']
upper = row['upper']
lower = row['lower']
middle = row['middle']
# 处理返佣到账
self.apply_pending_rebates(signal_dt)
if pd.isna(upper) or pd.isna(lower) or pd.isna(middle):
continue
# 检查延迟反转确认
if self.delay_reverse_price is not None:
reversal_signal = self.check_delay_reversal_signal(i, row, prev_row)
if reversal_signal is not None and self.position != 0:
new_direction, reversal_price, reason = reversal_signal
self.close_position(reversal_price, 1.0, signal_ts, f"{reason}-平仓")
self.open_position(reversal_price, new_direction, signal_ts, f"{reason}-开仓")
self.mid_closed_half = False
self.clear_delay_reversal()
continue
# 中轨平仓逻辑
if self.position != 0:
if self.position > 0:
# 回到开仓价全平+反手
if self.mid_closed_half and low <= self.entry_price:
self.close_position(close, 1.0, signal_ts, "回开仓价全平")
self.open_position(close, 'short', signal_ts, "回开仓价反手开空")
self.mid_closed_half = False
continue
# 触中轨平半
if not self.mid_closed_half and low <= middle <= high:
self.close_position(close, 0.5, signal_ts, "触中轨平50%")
self.mid_closed_half = True
continue
else: # 空仓
# 回到开仓价全平+反手
if self.mid_closed_half and high >= self.entry_price:
self.close_position(close, 1.0, signal_ts, "回开仓价全平")
self.open_position(close, 'long', signal_ts, "回开仓价反手开多")
self.mid_closed_half = False
continue
# 触中轨平半
if not self.mid_closed_half and low <= middle <= high:
self.close_position(close, 0.5, signal_ts, "触中轨平50%")
self.mid_closed_half = True
continue
# 开仓与加仓逻辑
if self.position == 0:
self.clear_delay_reversal()
# 触上轨开空
if high >= upper:
self.open_position(upper, 'short', signal_ts, "触上轨开空")
# 触下轨开多
elif low <= lower:
self.open_position(lower, 'long', signal_ts, "触下轨开多")
continue
# 延迟反转触发
if self.position > 0 and high >= upper:
self.mark_delay_reversal('long_to_short', upper, i)
continue
elif self.position < 0 and low <= lower:
self.mark_delay_reversal('short_to_long', lower, i)
continue
# 加仓
if self.delay_reverse_price is None and self.position_count == 1:
if self.position > 0 and low <= lower:
self.open_position(lower, 'long', signal_ts, "触下轨加多")
elif self.position < 0 and high >= upper:
self.open_position(upper, 'short', signal_ts, "触上轨加空")
# 回测末尾处理返佣
self.apply_pending_rebates(df.iloc[-1]['datetime'])
# 最后平仓
if self.position != 0:
final_price = df.iloc[-1]['close']
final_time = df.iloc[-1]['datetime'].strftime('%Y-%m-%d %H:%M')
self.close_position(final_price, 1.0, final_time, "回测结束平仓")
# 生成报告
return self.generate_report(df)
def generate_report(self, df):
"""生成回测报告"""
logger.info(f"\n{'='*80}")
logger.info(f"回测报告 - {self.timeframe}")
logger.info(f"{'='*80}")
# 基本统计
total_trades = len([t for t in self.trades if '' in t['action']])
win_trades = len([t for t in self.trades if '' in t['action'] and t.get('pnl', 0) > 0])
loss_trades = len([t for t in self.trades if '' in t['action'] and t.get('pnl', 0) < 0])
total_pnl = sum([t.get('pnl', 0) for t in self.trades])
total_fee = sum([t.get('fee', 0) for t in self.trades])
pending_rebate = sum([x['amount'] for x in self.pending_rebates])
realized_net_fee = total_fee - self.total_rebate_credited
final_capital = self.capital
roi = (final_capital - self.initial_capital) / self.initial_capital * 100
logger.info(f"初始资金: {self.initial_capital:.2f}U")
logger.info(f"最终资金: {final_capital:.2f}U")
logger.info(f"总盈亏: {total_pnl:.2f}U")
logger.info(f"总手续费: {total_fee:.2f}U")
logger.info(f"返佣已到账: {self.total_rebate_credited:.2f}U")
logger.info(f"返佣待到账: {pending_rebate:.2f}U")
logger.info(f"已实现净手续费: {realized_net_fee:.2f}U")
logger.info(f"净收益: {final_capital - self.initial_capital:.2f}U")
logger.info(f"收益率: {roi:.2f}%")
logger.info(f"总交易次数: {total_trades}")
logger.info(f"盈利次数: {win_trades}")
logger.info(f"亏损次数: {loss_trades}")
if win_trades + loss_trades > 0:
logger.info(f"胜率: {win_trades/(win_trades+loss_trades)*100:.2f}%")
# 保存交易记录
trades_df = pd.DataFrame(self.trades)
output_dir = Path(__file__).parent / 'backtest_outputs' / 'trades'
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / f'bb_backtest_2025_{self.timeframe}_trades.csv'
trades_df.to_csv(output_file, index=False, encoding='utf-8-sig')
logger.info(f"\n交易记录已保存到: {output_file}")
return {
'timeframe': self.timeframe,
'initial_capital': self.initial_capital,
'final_capital': final_capital,
'total_pnl': total_pnl,
'total_fee': total_fee,
'total_rebate_credited': self.total_rebate_credited,
'pending_rebate': pending_rebate,
'realized_net_fee': realized_net_fee,
'roi': roi,
'total_trades': total_trades,
'win_trades': win_trades,
'loss_trades': loss_trades,
'win_rate': win_trades/(win_trades+loss_trades)*100 if (win_trades+loss_trades) > 0 else 0,
'trades_file': str(output_file)
}
if __name__ == '__main__':
# 连接数据库
db.connect(reuse_if_open=True)
try:
results = []
for tf in ['1m', '3m', '5m', '15m']:
logger.info("\n" + "="*80)
logger.info(f"测试 {tf} 周期")
logger.info("="*80)
backtest = BBDelayReversalBacktest(timeframe=tf)
result = backtest.run_backtest('2025-01-01', '2025-12-31')
if result:
results.append(result)
# 对比总结
if len(results) > 0:
logger.info("\n" + "="*80)
logger.info("回测对比总结")
logger.info("="*80)
for result in results:
logger.info(f"\n{result['timeframe']}周期】")
logger.info(f" 收益率: {result['roi']:.2f}%")
logger.info(f" 最终资金: {result['final_capital']:.2f}U")
logger.info(f" 总交易次数: {result['total_trades']}")
logger.info(f" 胜率: {result['win_rate']:.2f}%")
logger.info(f" 净手续费: {result['realized_net_fee']:.2f}U")
finally:
db.close()