""" 布林带延迟反转策略回测 - 2025年多周期测试 测试1分钟和15分钟周期的收益对比 策略规则: 1. BB(10, 2.5) 2. 空仓触上轨开空,触下轨开多 3. 同向加仓最多1次,保证金1% 4. 延迟反转:触轨不立刻平仓,记录价格,回调到该价再平仓+反向开仓 5. 中轨平半+回开仓价全平 回测参数: - 本金: 100U - 杠杆: 100x - 逐仓模式 - 开仓保证金: 1% - 手续费: 0.05% (万五) - 返佣: 90%,次日早上8点到账 """ import pandas as pd from pathlib import Path from peewee import * from loguru import logger import numpy as np # 数据库配置 DB_PATH = Path(__file__).parent / 'models' / 'database.db' db = SqliteDatabase(str(DB_PATH)) class BitMartETH1M(Model): """1分钟K线模型""" id = BigIntegerField(primary_key=True) open = FloatField(null=True) high = FloatField(null=True) low = FloatField(null=True) close = FloatField(null=True) class Meta: database = db table_name = 'bitmart_eth_1m' class BitMartETH3M(Model): """3分钟K线模型""" id = BigIntegerField(primary_key=True) open = FloatField(null=True) high = FloatField(null=True) low = FloatField(null=True) close = FloatField(null=True) class Meta: database = db table_name = 'bitmart_eth_3m' class BitMartETH5M(Model): """5分钟K线模型""" id = BigIntegerField(primary_key=True) open = FloatField(null=True) high = FloatField(null=True) low = FloatField(null=True) close = FloatField(null=True) class Meta: database = db table_name = 'bitmart_eth_5m' class BitMartETH15M(Model): """15分钟K线模型""" id = BigIntegerField(primary_key=True) open = FloatField(null=True) high = FloatField(null=True) low = FloatField(null=True) close = FloatField(null=True) class Meta: database = db table_name = 'bitmart_eth_15m' class BBDelayReversalBacktest: """布林带延迟反转策略回测""" def __init__(self, timeframe='1m'): # 策略参数 self.timeframe = timeframe self.bb_period = 10 self.bb_std = 2.5 self.initial_capital = 100 # 初始本金100U self.leverage = 100 # 100倍杠杆 self.fee_rate = 0.0005 # 万五手续费 self.rebate_rate = 0.9 # 90%返佣 self.margin_ratio = 0.01 # 开仓保证金1% self.rebate_credit_hour = 8 # 次日早上8点返佣到账 # 账户状态 self.capital = self.initial_capital self.position = 0 # 持仓量(正=多,负=空) self.position_count = 0 # 持仓次数(0=空仓,1=首次,2=加仓) self.entry_price = 0 # 开仓均价 self.total_margin = 0 # 总保证金 # 延迟反转状态 self.delay_reverse_price = None self.delay_reverse_type = None self.delay_reverse_kline_index = None # 中轨平仓记录 self.mid_closed_half = False # 交易记录 self.trades = [] self.pending_rebates = [] self.total_rebate_credited = 0.0 def calculate_bollinger_bands(self, df): """计算布林带(整体右移1根,避免使用当前K收盘价)""" df['sma'] = df['close'].rolling(window=self.bb_period).mean() df['std'] = df['close'].rolling(window=self.bb_period).std() df['upper'] = (df['sma'] + self.bb_std * df['std']).shift(1) df['lower'] = (df['sma'] - self.bb_std * df['std']).shift(1) df['middle'] = df['sma'].shift(1) return df def schedule_rebate(self, fee, timestamp): """登记返佣到账时间(次日08:00,上海时间)""" rebate = fee * self.rebate_rate if rebate <= 0: return trade_utc = pd.Timestamp(timestamp, tz='UTC') trade_local = trade_utc.tz_convert('Asia/Shanghai') credit_local = (trade_local + pd.Timedelta(days=1)).normalize() + pd.Timedelta(hours=self.rebate_credit_hour) credit_utc = credit_local.tz_convert('UTC').tz_localize(None) self.pending_rebates.append({ 'credit_time': credit_utc, 'amount': rebate, 'trade_time': trade_utc.tz_localize(None), }) def apply_pending_rebates(self, current_time): """处理当前时刻前应到账的返佣""" if not self.pending_rebates: return remaining = [] for item in self.pending_rebates: if item['credit_time'] <= current_time: amount = item['amount'] self.capital += amount self.total_rebate_credited += amount else: remaining.append(item) self.pending_rebates = remaining def clear_delay_reversal(self): """清理延迟反转状态""" self.delay_reverse_price = None self.delay_reverse_type = None self.delay_reverse_kline_index = None def mark_delay_reversal(self, reverse_type, trigger_price, kline_index): """记录延迟反转触发信息""" self.delay_reverse_type = reverse_type self.delay_reverse_price = trigger_price self.delay_reverse_kline_index = kline_index def check_delay_reversal_signal(self, i, row, prev_row): """检查延迟反转是否成立""" if self.position == 0 or self.delay_reverse_price is None or self.delay_reverse_kline_index is None: return None offset = i - self.delay_reverse_kline_index if offset <= 0: return None high = row['high'] low = row['low'] if self.delay_reverse_type == 'long_to_short': # 多转空:回调确认 if offset == 1 and low <= self.delay_reverse_price: return 'short', self.delay_reverse_price, "次K回调确认" if offset >= 2 and prev_row is not None: prev_upper = prev_row['upper'] prev_touch_upper = pd.notna(prev_upper) and prev_row['high'] >= prev_upper if prev_touch_upper: if low <= prev_upper: return 'short', prev_upper, "上一根触上轨后回调确认" else: prev_body_low = min(prev_row['open'], prev_row['close']) if low <= prev_body_low: return 'short', prev_body_low, "跌破上一根实体确认" elif self.delay_reverse_type == 'short_to_long': # 空转多:反弹确认 if offset == 1 and high >= self.delay_reverse_price: return 'long', self.delay_reverse_price, "次K反弹确认" if offset >= 2 and prev_row is not None: prev_lower = prev_row['lower'] prev_touch_lower = pd.notna(prev_lower) and prev_row['low'] <= prev_lower if prev_touch_lower: if high >= prev_lower: return 'long', prev_lower, "上一根触下轨后反弹确认" else: prev_body_high = max(prev_row['open'], prev_row['close']) if high >= prev_body_high: return 'long', prev_body_high, "突破上一根实体确认" return None def open_position(self, price, direction, timestamp, reason): """开仓或加仓""" if self.position_count not in (0, 1): return False if self.position_count == 1: current_direction = 'long' if self.position > 0 else 'short' if direction != current_direction: return False margin = self.capital * self.margin_ratio if margin <= 0: return False position_size = margin * self.leverage / price fee = position_size * price * self.fee_rate required = margin + fee if self.capital < required: return False self.capital -= required self.schedule_rebate(fee, timestamp) if self.position_count == 0: self.position = position_size if direction == 'long' else -position_size self.entry_price = price self.total_margin = margin self.position_count = 1 else: old_size = abs(self.position) new_size = old_size + position_size old_value = old_size * self.entry_price new_value = position_size * price self.entry_price = (old_value + new_value) / new_size self.position = new_size if direction == 'long' else -new_size self.total_margin += margin self.position_count = 2 self.mid_closed_half = False self.trades.append({ 'timestamp': timestamp, 'action': f'开{direction}' if self.position_count == 1 else f'加{direction}', 'price': price, 'size': position_size, 'margin': margin, 'fee': fee, 'capital': self.capital, 'reason': reason }) return True def close_position(self, price, ratio, timestamp, reason): """平仓""" if self.position == 0: return False ratio = min(max(ratio, 0.0), 1.0) if ratio == 0: return False close_size = abs(self.position) * ratio if self.position > 0: pnl = close_size * (price - self.entry_price) else: pnl = close_size * (self.entry_price - price) fee = close_size * price * self.fee_rate released_margin = self.total_margin * ratio self.capital += released_margin + pnl - fee self.schedule_rebate(fee, timestamp) if ratio >= 0.999: self.position = 0 self.position_count = 0 self.total_margin = 0 self.entry_price = 0 self.mid_closed_half = False self.clear_delay_reversal() else: self.position *= (1 - ratio) self.total_margin *= (1 - ratio) self.trades.append({ 'timestamp': timestamp, 'action': f'平仓{int(ratio*100)}%', 'price': price, 'size': close_size, 'pnl': pnl, 'fee': fee, 'capital': self.capital, 'reason': reason }) return True def run_backtest(self, start_date, end_date): """运行回测""" # 重置状态 self.capital = self.initial_capital self.position = 0 self.position_count = 0 self.entry_price = 0 self.total_margin = 0 self.mid_closed_half = False self.trades = [] self.pending_rebates = [] self.total_rebate_credited = 0.0 self.clear_delay_reversal() logger.info(f"{'='*80}") logger.info(f"开始回测: {start_date} ~ {end_date} | 周期: {self.timeframe}") logger.info(f"初始资金: {self.initial_capital}U | 杠杆: {self.leverage}x | BB({self.bb_period}, {self.bb_std})") logger.info(f"{'='*80}") # 从数据库加载数据 start_dt = pd.Timestamp(start_date) end_dt = pd.Timestamp(end_date) if isinstance(end_date, str) and len(end_date) <= 10: end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1) start_ts = int(start_dt.timestamp() * 1000) end_ts = int(end_dt.timestamp() * 1000) # 根据周期选择数据表 model_mapping = { '1m': BitMartETH1M, '3m': BitMartETH3M, '5m': BitMartETH5M, '15m': BitMartETH15M, } Model = model_mapping.get(self.timeframe) if Model is None: logger.error(f"不支持的周期: {self.timeframe}") return None query = Model.select().where( (Model.id >= start_ts) & (Model.id <= end_ts) ).order_by(Model.id) data = [] for row in query: data.append({ 'timestamp': row.id, 'open': row.open, 'high': row.high, 'low': row.low, 'close': row.close }) if not data: logger.error("没有找到数据!") return None df = pd.DataFrame(data) df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms') logger.info(f"加载数据: {len(df)} 根K线") logger.info(f"时间范围: {df['datetime'].min()} ~ {df['datetime'].max()}") # 计算布林带 df = self.calculate_bollinger_bands(df) if len(df) <= self.bb_period + 1: logger.error("数据不足,无法执行回测") return None # 逐根K线回测 for i in range(self.bb_period, len(df)): row = df.iloc[i] prev_row = df.iloc[i-1] if i > 0 else None signal_dt = row['datetime'] signal_ts = signal_dt.strftime('%Y-%m-%d %H:%M') high = row['high'] low = row['low'] close = row['close'] upper = row['upper'] lower = row['lower'] middle = row['middle'] # 处理返佣到账 self.apply_pending_rebates(signal_dt) if pd.isna(upper) or pd.isna(lower) or pd.isna(middle): continue # 检查延迟反转确认 if self.delay_reverse_price is not None: reversal_signal = self.check_delay_reversal_signal(i, row, prev_row) if reversal_signal is not None and self.position != 0: new_direction, reversal_price, reason = reversal_signal self.close_position(reversal_price, 1.0, signal_ts, f"{reason}-平仓") self.open_position(reversal_price, new_direction, signal_ts, f"{reason}-开仓") self.mid_closed_half = False self.clear_delay_reversal() continue # 中轨平仓逻辑 if self.position != 0: if self.position > 0: # 回到开仓价全平+反手 if self.mid_closed_half and low <= self.entry_price: self.close_position(close, 1.0, signal_ts, "回开仓价全平") self.open_position(close, 'short', signal_ts, "回开仓价反手开空") self.mid_closed_half = False continue # 触中轨平半 if not self.mid_closed_half and low <= middle <= high: self.close_position(close, 0.5, signal_ts, "触中轨平50%") self.mid_closed_half = True continue else: # 空仓 # 回到开仓价全平+反手 if self.mid_closed_half and high >= self.entry_price: self.close_position(close, 1.0, signal_ts, "回开仓价全平") self.open_position(close, 'long', signal_ts, "回开仓价反手开多") self.mid_closed_half = False continue # 触中轨平半 if not self.mid_closed_half and low <= middle <= high: self.close_position(close, 0.5, signal_ts, "触中轨平50%") self.mid_closed_half = True continue # 开仓与加仓逻辑 if self.position == 0: self.clear_delay_reversal() # 触上轨开空 if high >= upper: self.open_position(upper, 'short', signal_ts, "触上轨开空") # 触下轨开多 elif low <= lower: self.open_position(lower, 'long', signal_ts, "触下轨开多") continue # 延迟反转触发 if self.position > 0 and high >= upper: self.mark_delay_reversal('long_to_short', upper, i) continue elif self.position < 0 and low <= lower: self.mark_delay_reversal('short_to_long', lower, i) continue # 加仓 if self.delay_reverse_price is None and self.position_count == 1: if self.position > 0 and low <= lower: self.open_position(lower, 'long', signal_ts, "触下轨加多") elif self.position < 0 and high >= upper: self.open_position(upper, 'short', signal_ts, "触上轨加空") # 回测末尾处理返佣 self.apply_pending_rebates(df.iloc[-1]['datetime']) # 最后平仓 if self.position != 0: final_price = df.iloc[-1]['close'] final_time = df.iloc[-1]['datetime'].strftime('%Y-%m-%d %H:%M') self.close_position(final_price, 1.0, final_time, "回测结束平仓") # 生成报告 return self.generate_report(df) def generate_report(self, df): """生成回测报告""" logger.info(f"\n{'='*80}") logger.info(f"回测报告 - {self.timeframe}") logger.info(f"{'='*80}") # 基本统计 total_trades = len([t for t in self.trades if '开' in t['action']]) win_trades = len([t for t in self.trades if '平' in t['action'] and t.get('pnl', 0) > 0]) loss_trades = len([t for t in self.trades if '平' in t['action'] and t.get('pnl', 0) < 0]) total_pnl = sum([t.get('pnl', 0) for t in self.trades]) total_fee = sum([t.get('fee', 0) for t in self.trades]) pending_rebate = sum([x['amount'] for x in self.pending_rebates]) realized_net_fee = total_fee - self.total_rebate_credited final_capital = self.capital roi = (final_capital - self.initial_capital) / self.initial_capital * 100 logger.info(f"初始资金: {self.initial_capital:.2f}U") logger.info(f"最终资金: {final_capital:.2f}U") logger.info(f"总盈亏: {total_pnl:.2f}U") logger.info(f"总手续费: {total_fee:.2f}U") logger.info(f"返佣已到账: {self.total_rebate_credited:.2f}U") logger.info(f"返佣待到账: {pending_rebate:.2f}U") logger.info(f"已实现净手续费: {realized_net_fee:.2f}U") logger.info(f"净收益: {final_capital - self.initial_capital:.2f}U") logger.info(f"收益率: {roi:.2f}%") logger.info(f"总交易次数: {total_trades}") logger.info(f"盈利次数: {win_trades}") logger.info(f"亏损次数: {loss_trades}") if win_trades + loss_trades > 0: logger.info(f"胜率: {win_trades/(win_trades+loss_trades)*100:.2f}%") # 保存交易记录 trades_df = pd.DataFrame(self.trades) output_dir = Path(__file__).parent / 'backtest_outputs' / 'trades' output_dir.mkdir(parents=True, exist_ok=True) output_file = output_dir / f'bb_backtest_2025_{self.timeframe}_trades.csv' trades_df.to_csv(output_file, index=False, encoding='utf-8-sig') logger.info(f"\n交易记录已保存到: {output_file}") return { 'timeframe': self.timeframe, 'initial_capital': self.initial_capital, 'final_capital': final_capital, 'total_pnl': total_pnl, 'total_fee': total_fee, 'total_rebate_credited': self.total_rebate_credited, 'pending_rebate': pending_rebate, 'realized_net_fee': realized_net_fee, 'roi': roi, 'total_trades': total_trades, 'win_trades': win_trades, 'loss_trades': loss_trades, 'win_rate': win_trades/(win_trades+loss_trades)*100 if (win_trades+loss_trades) > 0 else 0, 'trades_file': str(output_file) } if __name__ == '__main__': # 连接数据库 db.connect(reuse_if_open=True) try: results = [] for tf in ['1m', '3m', '5m', '15m']: logger.info("\n" + "="*80) logger.info(f"测试 {tf} 周期") logger.info("="*80) backtest = BBDelayReversalBacktest(timeframe=tf) result = backtest.run_backtest('2025-01-01', '2025-12-31') if result: results.append(result) # 对比总结 if len(results) > 0: logger.info("\n" + "="*80) logger.info("回测对比总结") logger.info("="*80) for result in results: logger.info(f"\n【{result['timeframe']}周期】") logger.info(f" 收益率: {result['roi']:.2f}%") logger.info(f" 最终资金: {result['final_capital']:.2f}U") logger.info(f" 总交易次数: {result['total_trades']}") logger.info(f" 胜率: {result['win_rate']:.2f}%") logger.info(f" 净手续费: {result['realized_net_fee']:.2f}U") finally: db.close()