Files
codex_jxs_code/bb_backtest_2025_multi_timeframe.py
2026-03-05 17:46:34 +08:00

581 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
布林带延迟反转策略回测 - 2025年多周期测试
测试1分钟和15分钟周期的收益对比
策略规则:
1. BB(10, 2.5)
2. 空仓触上轨开空,触下轨开多
3. 同向加仓最多1次,保证金1%
4. 延迟反转:触轨不立刻平仓,记录价格,回调到该价再平仓+反向开仓
5. 中轨平半+回开仓价全平
回测参数:
- 本金: 100U
- 杠杆: 100x
- 逐仓模式
- 开仓保证金: 1%
- 手续费: 0.05% (万五)
- 返佣: 90%,次日早上8点到账
"""
import pandas as pd
from pathlib import Path
from peewee import *
from loguru import logger
import numpy as np
# Database configuration: the SQLite file is models/database.db, resolved
# relative to the directory that contains this script.
DB_PATH = Path(__file__).parent.joinpath('models', 'database.db')
db = SqliteDatabase(str(DB_PATH))
class BitMartETH1M(Model):
    """1-minute K-line (candlestick) model backed by table `bitmart_eth_1m`."""
    # Primary key; run_backtest filters on it with millisecond epoch values
    # and converts it with pd.to_datetime(..., unit='ms').
    id = BigIntegerField(primary_key=True)
    # OHLC prices; every column is nullable.
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        database = db
        table_name = 'bitmart_eth_1m'
class BitMartETH3M(Model):
    """3-minute K-line (candlestick) model backed by table `bitmart_eth_3m`."""
    # Primary key; run_backtest filters on it with millisecond epoch values
    # and converts it with pd.to_datetime(..., unit='ms').
    id = BigIntegerField(primary_key=True)
    # OHLC prices; every column is nullable.
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        database = db
        table_name = 'bitmart_eth_3m'
class BitMartETH5M(Model):
    """5-minute K-line (candlestick) model backed by table `bitmart_eth_5m`."""
    # Primary key; run_backtest filters on it with millisecond epoch values
    # and converts it with pd.to_datetime(..., unit='ms').
    id = BigIntegerField(primary_key=True)
    # OHLC prices; every column is nullable.
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        database = db
        table_name = 'bitmart_eth_5m'
class BitMartETH15M(Model):
    """15-minute K-line (candlestick) model backed by table `bitmart_eth_15m`."""
    # Primary key; run_backtest filters on it with millisecond epoch values
    # and converts it with pd.to_datetime(..., unit='ms').
    id = BigIntegerField(primary_key=True)
    # OHLC prices; every column is nullable.
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        database = db
        table_name = 'bitmart_eth_15m'
class BBDelayReversalBacktest:
    """Backtest of the Bollinger-band "delayed reversal" strategy.

    Rules implemented by this class (see ``_process_bar``):

    * Flat: open short when a bar touches the upper band, long when it
      touches the lower band.
    * In a position: one same-direction add is allowed (``position_count``
      goes 1 -> 2).
    * Touching the opposite band does not flip immediately; the touch is
      recorded and the flip happens once a later bar confirms the pull-back
      (``check_delay_reversal_signal``).
    * Touching the middle band closes half; returning to the entry price
      afterwards closes the rest and opens the opposite side.

    Fees are charged on every fill and a fraction (``rebate_rate``) is
    credited back at ``rebate_credit_hour`` Asia/Shanghai time on the day
    after the trade.
    """

    # Columns a bar dict handed to the per-bar logic carries.
    _BAR_COLUMNS = ('datetime', 'open', 'high', 'low', 'close',
                    'upper', 'lower', 'middle')

    def __init__(self, timeframe='1m'):
        """Create a backtest for one K-line timeframe ('1m', '3m', '5m', '15m')."""
        # Strategy parameters
        self.timeframe = timeframe
        self.bb_period = 10
        self.bb_std = 2.5
        self.initial_capital = 100      # starting balance, in U
        self.leverage = 100             # 100x leverage
        self.fee_rate = 0.0005          # 0.05% fee per fill
        self.rebate_rate = 0.9          # 90% of the fee is rebated
        self.margin_ratio = 0.01        # each entry posts 1% of capital as margin
        self.rebate_credit_hour = 8     # rebate lands at 08:00 the next day
        # Account / position / bookkeeping state
        self._reset_state()

    def _reset_state(self):
        """Reset account, position and bookkeeping state for a fresh run."""
        self.capital = self.initial_capital
        self.position = 0               # signed size: positive = long, negative = short
        self.position_count = 0         # 0 = flat, 1 = first entry, 2 = added once
        self.entry_price = 0            # average entry price
        self.total_margin = 0           # margin currently locked in the position
        self.mid_closed_half = False    # True once half was closed at the middle band
        self.trades = []
        self.pending_rebates = []
        self.total_rebate_credited = 0.0
        self.clear_delay_reversal()

    def calculate_bollinger_bands(self, df):
        """Add ``sma``/``std``/``upper``/``lower``/``middle`` columns to *df*.

        The bands are shifted forward by one bar so that the values seen on
        bar ``i`` are built only from closes up to bar ``i - 1``. *df* is
        modified in place and also returned.
        """
        rolling_close = df['close'].rolling(window=self.bb_period)
        df['sma'] = rolling_close.mean()
        df['std'] = rolling_close.std()
        band_width = self.bb_std * df['std']
        df['upper'] = (df['sma'] + band_width).shift(1)
        df['lower'] = (df['sma'] - band_width).shift(1)
        df['middle'] = df['sma'].shift(1)
        return df

    def schedule_rebate(self, fee, timestamp):
        """Queue the rebate for *fee*, payable next day at 08:00 Asia/Shanghai.

        *timestamp* is the trade time. Naive values are interpreted as UTC;
        timezone-aware values are converted to UTC (previously they raised).
        Nothing is queued when the rebate amount is not positive.
        """
        rebate = fee * self.rebate_rate
        if rebate <= 0:
            return
        trade_time = pd.Timestamp(timestamp)
        if trade_time.tzinfo is None:
            trade_utc = trade_time.tz_localize('UTC')
        else:
            trade_utc = trade_time.tz_convert('UTC')
        trade_local = trade_utc.tz_convert('Asia/Shanghai')
        next_midnight_local = (trade_local + pd.Timedelta(days=1)).normalize()
        credit_local = next_midnight_local + pd.Timedelta(hours=self.rebate_credit_hour)
        self.pending_rebates.append({
            # Stored as naive UTC so it compares directly with bar datetimes.
            'credit_time': credit_local.tz_convert('UTC').tz_localize(None),
            'amount': rebate,
            'trade_time': trade_utc.tz_localize(None),
        })

    def apply_pending_rebates(self, current_time):
        """Credit every queued rebate whose credit time is <= *current_time*."""
        if not self.pending_rebates:
            return
        still_pending = []
        for item in self.pending_rebates:
            if item['credit_time'] <= current_time:
                self.capital += item['amount']
                self.total_rebate_credited += item['amount']
            else:
                still_pending.append(item)
        self.pending_rebates = still_pending

    def clear_delay_reversal(self):
        """Forget any recorded delayed-reversal trigger."""
        self.delay_reverse_price = None
        self.delay_reverse_type = None
        self.delay_reverse_kline_index = None

    def mark_delay_reversal(self, reverse_type, trigger_price, kline_index):
        """Record a delayed-reversal trigger.

        *reverse_type* is 'long_to_short' or 'short_to_long', *trigger_price*
        the band price that was touched and *kline_index* the bar position of
        the touch.
        """
        self.delay_reverse_type = reverse_type
        self.delay_reverse_price = trigger_price
        self.delay_reverse_kline_index = kline_index

    def check_delay_reversal_signal(self, i, row, prev_row):
        """Return the confirmed reversal for bar *i*, or ``None``.

        *row* / *prev_row* are mappings with 'open', 'high', 'low', 'close',
        'upper' and 'lower' keys (*prev_row* may be ``None``). The result is a
        ``(direction, price, reason)`` tuple where *direction* is the side to
        flip into and *price* the fill price to use.
        """
        if (self.position == 0
                or self.delay_reverse_price is None
                or self.delay_reverse_kline_index is None):
            return None
        offset = i - self.delay_reverse_kline_index
        if offset <= 0:
            # The trigger bar itself can never confirm.
            return None
        high = row['high']
        low = row['low']

        if self.delay_reverse_type == 'long_to_short':
            # Long -> short: wait for a pull-back.
            if offset == 1 and low <= self.delay_reverse_price:
                return 'short', self.delay_reverse_price, "次K回调确认"
            if offset >= 2 and prev_row is not None:
                prev_upper = prev_row['upper']
                prev_touched_upper = pd.notna(prev_upper) and prev_row['high'] >= prev_upper
                if prev_touched_upper:
                    if low <= prev_upper:
                        return 'short', prev_upper, "上一根触上轨后回调确认"
                else:
                    prev_body_low = min(prev_row['open'], prev_row['close'])
                    if low <= prev_body_low:
                        return 'short', prev_body_low, "跌破上一根实体确认"
        elif self.delay_reverse_type == 'short_to_long':
            # Short -> long: wait for a bounce.
            if offset == 1 and high >= self.delay_reverse_price:
                return 'long', self.delay_reverse_price, "次K反弹确认"
            if offset >= 2 and prev_row is not None:
                prev_lower = prev_row['lower']
                prev_touched_lower = pd.notna(prev_lower) and prev_row['low'] <= prev_lower
                if prev_touched_lower:
                    if high >= prev_lower:
                        return 'long', prev_lower, "上一根触下轨后反弹确认"
                else:
                    prev_body_high = max(prev_row['open'], prev_row['close'])
                    if high >= prev_body_high:
                        return 'long', prev_body_high, "突破上一根实体确认"
        return None

    def open_position(self, price, direction, timestamp, reason):
        """Open a position, or add once to an existing same-direction one.

        Returns ``True`` when the fill happened, ``False`` when it was
        rejected (already added once, opposite direction to the current
        position, or not enough capital for margin plus fee).
        """
        if self.position_count not in (0, 1):
            return False
        if self.position_count == 1:
            current_direction = 'long' if self.position > 0 else 'short'
            if direction != current_direction:
                return False
        margin = self.capital * self.margin_ratio
        if margin <= 0:
            return False
        position_size = margin * self.leverage / price
        fee = position_size * price * self.fee_rate
        required = margin + fee
        if self.capital < required:
            return False

        self.capital -= required
        self.schedule_rebate(fee, timestamp)
        sign = 1 if direction == 'long' else -1
        if self.position_count == 0:
            self.position = sign * position_size
            self.entry_price = price
            self.total_margin = margin
            self.position_count = 1
        else:
            # Add: re-average the entry price by size.
            old_size = abs(self.position)
            new_size = old_size + position_size
            self.entry_price = (old_size * self.entry_price + position_size * price) / new_size
            self.position = sign * new_size
            self.total_margin += margin
            self.position_count = 2
        self.mid_closed_half = False
        self.trades.append({
            'timestamp': timestamp,
            # The original expression was a ternary whose two branches were
            # identical; the emitted value is simply the direction.
            'action': f'{direction}',
            'price': price,
            'size': position_size,
            'margin': margin,
            'fee': fee,
            'capital': self.capital,
            'reason': reason
        })
        return True

    def close_position(self, price, ratio, timestamp, reason):
        """Close *ratio* (clamped to 0..1) of the position at *price*.

        Returns ``False`` when flat or when the clamped ratio is 0, ``True``
        otherwise. A ratio >= 0.999 is treated as a full close and also
        clears the delayed-reversal state.
        """
        if self.position == 0:
            return False
        ratio = min(max(ratio, 0.0), 1.0)
        if ratio == 0:
            return False
        close_size = abs(self.position) * ratio
        if self.position > 0:
            pnl = close_size * (price - self.entry_price)
        else:
            pnl = close_size * (self.entry_price - price)
        fee = close_size * price * self.fee_rate
        released_margin = self.total_margin * ratio
        # NOTE(review): the loss is not capped at the released margin and no
        # liquidation is simulated, although the file header describes
        # isolated-margin mode -- confirm this is intended.
        self.capital += released_margin + pnl - fee
        self.schedule_rebate(fee, timestamp)
        if ratio >= 0.999:
            self.position = 0
            self.position_count = 0
            self.total_margin = 0
            self.entry_price = 0
            self.mid_closed_half = False
            self.clear_delay_reversal()
        else:
            self.position *= (1 - ratio)
            self.total_margin *= (1 - ratio)
        self.trades.append({
            'timestamp': timestamp,
            'action': f'平仓{int(ratio*100)}%',
            'price': price,
            'size': close_size,
            'pnl': pnl,
            'fee': fee,
            'capital': self.capital,
            'reason': reason
        })
        return True

    def _load_klines(self, start_date, end_date):
        """Load bars for ``self.timeframe`` between the two dates (inclusive).

        A date-only *end_date* string (<= 10 characters) is extended to the
        last millisecond of that day. Returns a DataFrame with 'timestamp',
        OHLC and 'datetime' columns, or ``None`` (after logging an error)
        when the timeframe is unsupported or no rows match.
        """
        start_dt = pd.Timestamp(start_date)
        end_dt = pd.Timestamp(end_date)
        if isinstance(end_date, str) and len(end_date) <= 10:
            end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)
        start_ts = int(start_dt.timestamp() * 1000)
        end_ts = int(end_dt.timestamp() * 1000)

        # Pick the table that matches the timeframe.
        model_mapping = {
            '1m': BitMartETH1M,
            '3m': BitMartETH3M,
            '5m': BitMartETH5M,
            '15m': BitMartETH15M,
        }
        kline_model = model_mapping.get(self.timeframe)
        if kline_model is None:
            logger.error(f"不支持的周期: {self.timeframe}")
            return None
        query = kline_model.select().where(
            (kline_model.id >= start_ts) & (kline_model.id <= end_ts)
        ).order_by(kline_model.id)
        data = [
            {
                'timestamp': record.id,
                'open': record.open,
                'high': record.high,
                'low': record.low,
                'close': record.close,
            }
            for record in query
        ]
        if not data:
            logger.error("没有找到数据!")
            return None
        df = pd.DataFrame(data)
        df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df

    def _process_bar(self, i, row, prev_row):
        """Apply the strategy rules to bar *i*; at most one rule fires per bar."""
        signal_dt = row['datetime']
        signal_ts = signal_dt.strftime('%Y-%m-%d %H:%M')
        high, low, close = row['high'], row['low'], row['close']
        upper, lower, middle = row['upper'], row['lower'], row['middle']

        # Rebates are credited even on bars whose bands are not available yet.
        self.apply_pending_rebates(signal_dt)
        if pd.isna(upper) or pd.isna(lower) or pd.isna(middle):
            return

        # 1. Delayed-reversal confirmation: close everything and flip.
        if self.delay_reverse_price is not None:
            reversal_signal = self.check_delay_reversal_signal(i, row, prev_row)
            if reversal_signal is not None and self.position != 0:
                new_direction, reversal_price, reason = reversal_signal
                self.close_position(reversal_price, 1.0, signal_ts, f"{reason}-平仓")
                self.open_position(reversal_price, new_direction, signal_ts, f"{reason}-开仓")
                self.mid_closed_half = False
                self.clear_delay_reversal()
                return

        # 2. Middle-band handling while in a position.
        if self.position != 0:
            if self.position > 0:
                back_at_entry = low <= self.entry_price
                flip_direction, flip_reason = 'short', "回开仓价反手开空"
            else:
                back_at_entry = high >= self.entry_price
                flip_direction, flip_reason = 'long', "回开仓价反手开多"
            if self.mid_closed_half:
                # Half already closed: price back at entry closes the rest and flips.
                if back_at_entry:
                    self.close_position(close, 1.0, signal_ts, "回开仓价全平")
                    self.open_position(close, flip_direction, signal_ts, flip_reason)
                    self.mid_closed_half = False
                    return
            elif low <= middle <= high:
                # First touch of the middle band closes half.
                self.close_position(close, 0.5, signal_ts, "触中轨平50%")
                self.mid_closed_half = True
                return

        # 3. Flat: open on a band touch (upper band takes precedence).
        if self.position == 0:
            self.clear_delay_reversal()
            if high >= upper:
                self.open_position(upper, 'short', signal_ts, "触上轨开空")
            elif low <= lower:
                self.open_position(lower, 'long', signal_ts, "触下轨开多")
            return

        # 4. Opposite-band touch: record it, the flip waits for confirmation.
        if self.position > 0 and high >= upper:
            self.mark_delay_reversal('long_to_short', upper, i)
            return
        if self.position < 0 and low <= lower:
            self.mark_delay_reversal('short_to_long', lower, i)
            return

        # 5. Same-side band touch: add once, unless a reversal is pending.
        if self.delay_reverse_price is None and self.position_count == 1:
            if self.position > 0 and low <= lower:
                self.open_position(lower, 'long', signal_ts, "触下轨加多")
            elif self.position < 0 and high >= upper:
                self.open_position(upper, 'short', signal_ts, "触上轨加空")

    def _simulate(self, df):
        """Walk *df* (with band columns) bar by bar, then flatten at the end.

        Bars before index ``bb_period`` are skipped. Rows are streamed with
        ``itertuples`` instead of ``df.iloc[i]`` so that no per-bar Series is
        built.
        """
        columns = list(self._BAR_COLUMNS)
        prev_row = None
        for i, values in enumerate(df[columns].itertuples(index=False, name=None)):
            row = dict(zip(columns, values))
            if i >= self.bb_period:
                self._process_bar(i, row, prev_row)
            prev_row = row

        last_row = prev_row
        if last_row is None:
            return
        # Credit whatever is due by the last bar, then close any open position.
        self.apply_pending_rebates(last_row['datetime'])
        if self.position != 0:
            final_time = last_row['datetime'].strftime('%Y-%m-%d %H:%M')
            self.close_position(last_row['close'], 1.0, final_time, "回测结束平仓")

    def run_backtest(self, start_date, end_date):
        """Run the backtest over ``[start_date, end_date]``.

        Returns the report dict produced by ``generate_report``, or ``None``
        when the timeframe is unsupported, no data is found, or there are not
        more than ``bb_period + 1`` bars.
        """
        self._reset_state()
        logger.info(f"{'='*80}")
        logger.info(f"开始回测: {start_date} ~ {end_date} | 周期: {self.timeframe}")
        logger.info(f"初始资金: {self.initial_capital}U | 杠杆: {self.leverage}x | BB({self.bb_period}, {self.bb_std})")
        logger.info(f"{'='*80}")

        df = self._load_klines(start_date, end_date)
        if df is None:
            return None
        logger.info(f"加载数据: {len(df)} 根K线")
        logger.info(f"时间范围: {df['datetime'].min()} ~ {df['datetime'].max()}")

        df = self.calculate_bollinger_bands(df)
        if len(df) <= self.bb_period + 1:
            logger.error("数据不足,无法执行回测")
            return None

        self._simulate(df)
        return self.generate_report(df)

    def _compute_stats(self):
        """Summarise ``self.trades`` and the account into the report fields.

        A "trade" is a closing fill, i.e. a record that carries ``pnl``.
        The previous filter (``'' in action``) matched every record, so
        opening fills were counted in ``total_trades`` too.
        """
        closes = [t for t in self.trades if 'pnl' in t]
        win_trades = sum(1 for t in closes if t['pnl'] > 0)
        loss_trades = sum(1 for t in closes if t['pnl'] < 0)
        decided = win_trades + loss_trades
        total_fee = sum(t.get('fee', 0) for t in self.trades)
        final_capital = self.capital
        return {
            'timeframe': self.timeframe,
            'initial_capital': self.initial_capital,
            'final_capital': final_capital,
            'total_pnl': sum(t['pnl'] for t in closes),
            'total_fee': total_fee,
            'total_rebate_credited': self.total_rebate_credited,
            'pending_rebate': sum(x['amount'] for x in self.pending_rebates),
            'realized_net_fee': total_fee - self.total_rebate_credited,
            'roi': (final_capital - self.initial_capital) / self.initial_capital * 100,
            'total_trades': len(closes),
            'win_trades': win_trades,
            'loss_trades': loss_trades,
            'win_rate': win_trades / decided * 100 if decided > 0 else 0,
        }

    def generate_report(self, df):
        """Log the summary, write the trade log to CSV and return the report.

        *df* is accepted for interface compatibility and is not used. The CSV
        goes to ``backtest_outputs/trades`` next to this file; its path is
        returned under the 'trades_file' key.
        """
        stats = self._compute_stats()
        logger.info(f"\n{'='*80}")
        logger.info(f"回测报告 - {self.timeframe}")
        logger.info(f"{'='*80}")
        logger.info(f"初始资金: {self.initial_capital:.2f}U")
        logger.info(f"最终资金: {stats['final_capital']:.2f}U")
        logger.info(f"总盈亏: {stats['total_pnl']:.2f}U")
        logger.info(f"总手续费: {stats['total_fee']:.2f}U")
        logger.info(f"返佣已到账: {self.total_rebate_credited:.2f}U")
        logger.info(f"返佣待到账: {stats['pending_rebate']:.2f}U")
        logger.info(f"已实现净手续费: {stats['realized_net_fee']:.2f}U")
        logger.info(f"净收益: {stats['final_capital'] - self.initial_capital:.2f}U")
        logger.info(f"收益率: {stats['roi']:.2f}%")
        logger.info(f"总交易次数: {stats['total_trades']}")
        logger.info(f"盈利次数: {stats['win_trades']}")
        logger.info(f"亏损次数: {stats['loss_trades']}")
        if stats['win_trades'] + stats['loss_trades'] > 0:
            logger.info(f"胜率: {stats['win_rate']:.2f}%")

        # Persist the trade log.
        trades_df = pd.DataFrame(self.trades)
        output_dir = Path(__file__).parent / 'backtest_outputs' / 'trades'
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / f'bb_backtest_2025_{self.timeframe}_trades.csv'
        trades_df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"\n交易记录已保存到: {output_file}")

        stats['trades_file'] = str(output_file)
        return stats
def main():
    """Backtest 2025 on every supported timeframe and log a comparison.

    The database connection is opened up front and always closed, even when
    a backtest raises.
    """
    db.connect(reuse_if_open=True)
    try:
        results = []
        for tf in ['1m', '3m', '5m', '15m']:
            logger.info("\n" + "="*80)
            logger.info(f"测试 {tf} 周期")
            logger.info("="*80)
            backtest = BBDelayReversalBacktest(timeframe=tf)
            result = backtest.run_backtest('2025-01-01', '2025-12-31')
            # run_backtest returns None when a timeframe cannot be tested.
            if result:
                results.append(result)

        # Side-by-side summary of the timeframes that produced a report.
        if results:
            logger.info("\n" + "="*80)
            logger.info("回测对比总结")
            logger.info("="*80)
            for result in results:
                # The opening bracket was missing, leaving an unmatched "】".
                logger.info(f"\n【{result['timeframe']}周期】")
                logger.info(f" 收益率: {result['roi']:.2f}%")
                logger.info(f" 最终资金: {result['final_capital']:.2f}U")
                logger.info(f" 总交易次数: {result['total_trades']}")
                logger.info(f" 胜率: {result['win_rate']:.2f}%")
                logger.info(f" 净手续费: {result['realized_net_fee']:.2f}U")
    finally:
        db.close()


if __name__ == '__main__':
    main()