581 lines
21 KiB
Python
581 lines
21 KiB
Python
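"""
Bollinger Band delayed-reversal strategy backtest - 2025 multi-timeframe test.

Compares returns across the 1m, 3m, 5m and 15m timeframes.

Strategy rules:
1. BB(10, 2.5)
2. When flat: touching the upper band opens a short, touching the lower band opens a long
3. At most one same-direction add-on; each entry uses 1% of capital as margin
4. Delayed reversal: a band touch does not close the position immediately; the
   price is recorded, and once price pulls back to it the position is closed
   and reversed
5. Close half at the middle band, then close everything when price returns to
   the entry price

Backtest parameters:
- Capital: 100 U
- Leverage: 100x
- Isolated-margin mode
- Entry margin: 1%
- Fee: 0.05% (5 bps)
- Rebate: 90%, credited at 08:00 the following morning
"""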
|
||
|
||
import pandas as pd
|
||
from pathlib import Path
|
||
from peewee import *
|
||
from loguru import logger
|
||
import numpy as np
|
||
|
||
# Database configuration: the SQLite file lives in the "models" directory
# next to this script, and every K-line model below binds to this handle.
DB_PATH = Path(__file__).parent.joinpath('models', 'database.db')
db = SqliteDatabase(str(DB_PATH))
|
||
|
||
|
||
class BitMartETH1M(Model):
    """ORM model for 1-minute K-line rows stored in ``bitmart_eth_1m``."""

    # Primary key; the backtest filters it against millisecond epoch bounds.
    id = BigIntegerField(primary_key=True)

    # OHLC prices; every column is nullable.
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        table_name = 'bitmart_eth_1m'
        database = db
|
||
|
||
|
||
class BitMartETH3M(Model):
    """ORM model for 3-minute K-line rows stored in ``bitmart_eth_3m``."""

    # Primary key; the backtest filters it against millisecond epoch bounds.
    id = BigIntegerField(primary_key=True)

    # OHLC prices; every column is nullable.
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        table_name = 'bitmart_eth_3m'
        database = db
|
||
|
||
|
||
class BitMartETH5M(Model):
    """ORM model for 5-minute K-line rows stored in ``bitmart_eth_5m``."""

    # Primary key; the backtest filters it against millisecond epoch bounds.
    id = BigIntegerField(primary_key=True)

    # OHLC prices; every column is nullable.
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        table_name = 'bitmart_eth_5m'
        database = db
|
||
|
||
|
||
class BitMartETH15M(Model):
    """ORM model for 15-minute K-line rows stored in ``bitmart_eth_15m``."""

    # Primary key; the backtest filters it against millisecond epoch bounds.
    id = BigIntegerField(primary_key=True)

    # OHLC prices; every column is nullable.
    open = FloatField(null=True)
    high = FloatField(null=True)
    low = FloatField(null=True)
    close = FloatField(null=True)

    class Meta:
        table_name = 'bitmart_eth_15m'
        database = db
|
||
|
||
|
||
class BBDelayReversalBacktest:
    """Backtest of the Bollinger-Band delayed-reversal strategy.

    The instance holds the strategy configuration (band period/width,
    leverage, fee and rebate rates) together with the mutable account state
    (capital, open position, pending rebates, trade log).  ``run_backtest``
    resets the account state, loads K-lines for ``timeframe`` from the
    database, replays them bar by bar and returns the summary produced by
    ``generate_report``.
    """

    def __init__(self, timeframe='1m'):
        """Create a backtest for one timeframe ('1m', '3m', '5m' or '15m')."""
        # Strategy parameters
        self.timeframe = timeframe
        self.bb_period = 10
        self.bb_std = 2.5
        self.initial_capital = 100     # starting capital, in U
        self.leverage = 100            # 100x leverage
        self.fee_rate = 0.0005         # 0.05% fee on traded notional
        self.rebate_rate = 0.9         # 90% of each fee is rebated
        self.margin_ratio = 0.01       # each entry uses 1% of capital as margin
        self.rebate_credit_hour = 8    # rebates land at 08:00 the next day

        self._reset_state()

    def _reset_state(self):
        """Reset account, delayed-reversal and bookkeeping state for a fresh run."""
        # Account state
        self.capital = self.initial_capital
        self.position = 0          # signed size: positive = long, negative = short
        self.position_count = 0    # 0 = flat, 1 = initial entry, 2 = added once
        self.entry_price = 0       # average entry price
        self.total_margin = 0      # margin currently locked in the position

        # Delayed-reversal state
        self.delay_reverse_price = None
        self.delay_reverse_type = None
        self.delay_reverse_kline_index = None

        # Whether half of the position was already closed at the middle band
        self.mid_closed_half = False

        # Trade log and rebate bookkeeping
        self.trades = []
        self.pending_rebates = []
        self.total_rebate_credited = 0.0

    def calculate_bollinger_bands(self, df):
        """Add ``sma``/``std``/``upper``/``lower``/``middle`` columns to ``df``.

        The three band columns are shifted right by one bar so that the bands
        seen on bar ``i`` are built only from closes up to bar ``i - 1``
        (the current bar's close is never used).  ``std`` uses pandas'
        default ``Rolling.std`` settings.  ``df`` is modified in place and
        also returned.
        """
        window = df['close'].rolling(window=self.bb_period)
        df['sma'] = window.mean()
        df['std'] = window.std()
        band_width = self.bb_std * df['std']
        df['upper'] = (df['sma'] + band_width).shift(1)
        df['lower'] = (df['sma'] - band_width).shift(1)
        df['middle'] = df['sma'].shift(1)
        return df

    def schedule_rebate(self, fee, timestamp):
        """Queue the rebate for ``fee``, due at 08:00 Asia/Shanghai on the next day.

        ``timestamp`` is anything ``pd.Timestamp`` accepts.  Naive values are
        interpreted as UTC; tz-aware values are converted to UTC.  Nothing is
        queued when the rebate is not a positive number.
        """
        rebate = fee * self.rebate_rate
        # ``not rebate > 0`` also filters NaN, which ``rebate <= 0`` lets through.
        if not rebate > 0:
            return

        trade_utc = pd.Timestamp(timestamp)
        # pd.Timestamp(aware_value, tz=...) raises, so normalise explicitly.
        if trade_utc.tzinfo is None:
            trade_utc = trade_utc.tz_localize('UTC')
        else:
            trade_utc = trade_utc.tz_convert('UTC')

        trade_local = trade_utc.tz_convert('Asia/Shanghai')
        next_local_midnight = (trade_local + pd.Timedelta(days=1)).normalize()
        credit_local = next_local_midnight + pd.Timedelta(hours=self.rebate_credit_hour)
        # Stored naive (UTC) so it compares against the naive K-line datetimes.
        credit_utc = credit_local.tz_convert('UTC').tz_localize(None)

        self.pending_rebates.append({
            'credit_time': credit_utc,
            'amount': rebate,
            'trade_time': trade_utc.tz_localize(None),
        })

    def apply_pending_rebates(self, current_time):
        """Credit every queued rebate whose ``credit_time`` is <= ``current_time``."""
        if not self.pending_rebates:
            return

        remaining = []
        for item in self.pending_rebates:
            if item['credit_time'] <= current_time:
                amount = item['amount']
                self.capital += amount
                self.total_rebate_credited += amount
            else:
                remaining.append(item)

        self.pending_rebates = remaining

    def clear_delay_reversal(self):
        """Forget any recorded delayed-reversal trigger."""
        self.delay_reverse_price = None
        self.delay_reverse_type = None
        self.delay_reverse_kline_index = None

    def mark_delay_reversal(self, reverse_type, trigger_price, kline_index):
        """Record a delayed-reversal trigger.

        ``reverse_type`` is 'long_to_short' or 'short_to_long',
        ``trigger_price`` the band price that was touched and ``kline_index``
        the index of the bar on which it happened.
        """
        self.delay_reverse_type = reverse_type
        self.delay_reverse_price = trigger_price
        self.delay_reverse_kline_index = kline_index

    def check_delay_reversal_signal(self, i, row, prev_row):
        """Check whether a recorded delayed reversal is confirmed on bar ``i``.

        ``row`` and ``prev_row`` are mappings with 'open'/'high'/'low'/'close'
        and band keys.  Returns ``None`` when nothing is confirmed, otherwise
        a tuple ``(new_direction, fill_price, reason)``.
        """
        if (
            self.position == 0
            or self.delay_reverse_price is None
            or self.delay_reverse_kline_index is None
        ):
            return None

        # Number of bars elapsed since the trigger bar; the trigger bar itself
        # (offset 0) can never confirm.
        offset = i - self.delay_reverse_kline_index
        if offset <= 0:
            return None

        high = row['high']
        low = row['low']

        if self.delay_reverse_type == 'long_to_short':
            # Long -> short: confirmed by a pullback.
            if offset == 1 and low <= self.delay_reverse_price:
                return 'short', self.delay_reverse_price, "次K回调确认"

            if offset >= 2 and prev_row is not None:
                prev_upper = prev_row['upper']
                prev_touch_upper = pd.notna(prev_upper) and prev_row['high'] >= prev_upper
                if prev_touch_upper:
                    # Previous bar touched its upper band: fill at that band.
                    if low <= prev_upper:
                        return 'short', prev_upper, "上一根触上轨后回调确认"
                else:
                    # Otherwise require a break below the previous bar's body.
                    prev_body_low = min(prev_row['open'], prev_row['close'])
                    if low <= prev_body_low:
                        return 'short', prev_body_low, "跌破上一根实体确认"

        elif self.delay_reverse_type == 'short_to_long':
            # Short -> long: confirmed by a rebound.
            if offset == 1 and high >= self.delay_reverse_price:
                return 'long', self.delay_reverse_price, "次K反弹确认"

            if offset >= 2 and prev_row is not None:
                prev_lower = prev_row['lower']
                prev_touch_lower = pd.notna(prev_lower) and prev_row['low'] <= prev_lower
                if prev_touch_lower:
                    # Previous bar touched its lower band: fill at that band.
                    if high >= prev_lower:
                        return 'long', prev_lower, "上一根触下轨后反弹确认"
                else:
                    # Otherwise require a break above the previous bar's body.
                    prev_body_high = max(prev_row['open'], prev_row['close'])
                    if high >= prev_body_high:
                        return 'long', prev_body_high, "突破上一根实体确认"

        return None

    def open_position(self, price, direction, timestamp, reason):
        """Open a position, or add once to an existing same-direction position.

        Margin is ``capital * margin_ratio``; the fee is charged on the
        leveraged notional and its rebate is queued.  Returns ``True`` when a
        trade was booked, ``False`` when the request was rejected (already
        added once, opposite direction, unusable price, or not enough
        capital).

        Raises:
            ValueError: if ``direction`` is neither 'long' nor 'short'.
        """
        if direction not in ('long', 'short'):
            # Previously any other string was silently treated as a short.
            raise ValueError(f"direction must be 'long' or 'short', got {direction!r}")

        if self.position_count not in (0, 1):
            return False

        if self.position_count == 1:
            current_direction = 'long' if self.position > 0 else 'short'
            if direction != current_direction:
                return False

        # A missing or non-positive price would divide by zero / poison capital.
        if pd.isna(price) or price <= 0:
            return False

        margin = self.capital * self.margin_ratio
        if margin <= 0:
            return False

        position_size = margin * self.leverage / price
        fee = position_size * price * self.fee_rate
        required = margin + fee
        if self.capital < required:
            return False

        self.capital -= required
        self.schedule_rebate(fee, timestamp)

        if self.position_count == 0:
            # Fresh entry
            self.position = position_size if direction == 'long' else -position_size
            self.entry_price = price
            self.total_margin = margin
            self.position_count = 1
            action = f'开{direction}'
        else:
            # Add-on: re-average the entry price by size.
            old_size = abs(self.position)
            new_size = old_size + position_size
            old_value = old_size * self.entry_price
            new_value = position_size * price
            self.entry_price = (old_value + new_value) / new_size
            self.position = new_size if direction == 'long' else -new_size
            self.total_margin += margin
            self.position_count = 2
            action = f'加{direction}'

        self.mid_closed_half = False

        self.trades.append({
            'timestamp': timestamp,
            'action': action,
            'price': price,
            'size': position_size,
            'margin': margin,
            'fee': fee,
            'capital': self.capital,
            'reason': reason
        })

        return True

    def close_position(self, price, ratio, timestamp, reason):
        """Close ``ratio`` (clamped to [0, 1]) of the open position at ``price``.

        Releases the matching share of margin, books PnL and fee, and queues
        the fee rebate.  A ratio of 0.999 or more is treated as a full close
        and resets the position and delayed-reversal state.  Returns ``False``
        when there is no position or the ratio clamps to 0, else ``True``.
        """
        if self.position == 0:
            return False

        ratio = min(max(ratio, 0.0), 1.0)
        if ratio == 0:
            return False

        close_size = abs(self.position) * ratio
        if self.position > 0:
            pnl = close_size * (price - self.entry_price)
        else:
            pnl = close_size * (self.entry_price - price)

        fee = close_size * price * self.fee_rate

        released_margin = self.total_margin * ratio
        self.capital += released_margin + pnl - fee
        self.schedule_rebate(fee, timestamp)

        if ratio >= 0.999:
            self.position = 0
            self.position_count = 0
            self.total_margin = 0
            self.entry_price = 0
            self.mid_closed_half = False
            self.clear_delay_reversal()
        else:
            self.position *= (1 - ratio)
            self.total_margin *= (1 - ratio)

        self.trades.append({
            'timestamp': timestamp,
            # round() rather than int(): 0.29 * 100 is 28.999... in floating point.
            'action': f'平仓{int(round(ratio * 100))}%',
            'price': price,
            'size': close_size,
            'pnl': pnl,
            'fee': fee,
            'capital': self.capital,
            'reason': reason
        })

        return True

    def _load_klines(self, start_date, end_date):
        """Load K-lines for ``self.timeframe`` as a DataFrame, or ``None`` on failure.

        A date-only ``end_date`` string (10 characters or fewer) is extended to
        the last millisecond of that day.  Row ids are compared against
        millisecond epoch bounds.
        """
        start_dt = pd.Timestamp(start_date)
        end_dt = pd.Timestamp(end_date)
        if isinstance(end_date, str) and len(end_date) <= 10:
            end_dt = end_dt + pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)

        start_ts = int(start_dt.timestamp() * 1000)
        end_ts = int(end_dt.timestamp() * 1000)

        # Pick the table that matches the timeframe.
        model_mapping = {
            '1m': BitMartETH1M,
            '3m': BitMartETH3M,
            '5m': BitMartETH5M,
            '15m': BitMartETH15M,
        }
        kline_model = model_mapping.get(self.timeframe)
        if kline_model is None:
            logger.error(f"不支持的周期: {self.timeframe}")
            return None

        query = kline_model.select().where(
            (kline_model.id >= start_ts) & (kline_model.id <= end_ts)
        ).order_by(kline_model.id)

        data = [
            {
                'timestamp': record.id,
                'open': record.open,
                'high': record.high,
                'low': record.low,
                'close': record.close
            }
            for record in query
        ]

        if not data:
            logger.error("没有找到数据!")
            return None

        df = pd.DataFrame(data)
        df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df

    def _process_bar(self, i, row, prev_row):
        """Apply the strategy rules to bar ``i``; at most one rule group acts per bar."""
        signal_dt = row['datetime']
        signal_ts = signal_dt.strftime('%Y-%m-%d %H:%M')

        high = row['high']
        low = row['low']
        close = row['close']
        upper = row['upper']
        lower = row['lower']
        middle = row['middle']

        # Credit rebates that are due by this bar.
        self.apply_pending_rebates(signal_dt)

        if pd.isna(upper) or pd.isna(lower) or pd.isna(middle):
            return
        # Price columns are nullable in the database; a bar with missing
        # prices cannot be traded and must not reach the account arithmetic.
        if pd.isna(high) or pd.isna(low) or pd.isna(close):
            return

        # 1) Delayed-reversal confirmation: close everything and flip.
        if self.delay_reverse_price is not None:
            reversal_signal = self.check_delay_reversal_signal(i, row, prev_row)
            if reversal_signal is not None and self.position != 0:
                new_direction, reversal_price, reason = reversal_signal
                self.close_position(reversal_price, 1.0, signal_ts, f"{reason}-平仓")
                self.open_position(reversal_price, new_direction, signal_ts, f"{reason}-开仓")
                self.mid_closed_half = False
                self.clear_delay_reversal()
                return

        # 2) Middle-band management of an open position.
        if self.position != 0:
            if self.position > 0:
                back_at_entry = low <= self.entry_price
                flip_direction, flip_reason = 'short', "回开仓价反手开空"
            else:
                back_at_entry = high >= self.entry_price
                flip_direction, flip_reason = 'long', "回开仓价反手开多"

            # After the half close, a return to the entry price closes the
            # rest and flips; both legs are filled at the bar's close.
            if self.mid_closed_half and back_at_entry:
                self.close_position(close, 1.0, signal_ts, "回开仓价全平")
                self.open_position(close, flip_direction, signal_ts, flip_reason)
                self.mid_closed_half = False
                return

            # First touch of the middle band closes half, at the bar's close.
            if not self.mid_closed_half and low <= middle <= high:
                self.close_position(close, 0.5, signal_ts, "触中轨平50%")
                self.mid_closed_half = True
                return

        # 3) Flat: enter on a band touch, filled at the band price.
        if self.position == 0:
            self.clear_delay_reversal()
            if high >= upper:
                self.open_position(upper, 'short', signal_ts, "触上轨开空")
            elif low <= lower:
                self.open_position(lower, 'long', signal_ts, "触下轨开多")
            return

        # 4) Opposite band touched while in a position: record the trigger
        #    instead of reversing immediately.
        if self.position > 0 and high >= upper:
            self.mark_delay_reversal('long_to_short', upper, i)
            return
        if self.position < 0 and low <= lower:
            self.mark_delay_reversal('short_to_long', lower, i)
            return

        # 5) Same-side band touched again: add once, unless a reversal is pending.
        if self.delay_reverse_price is None and self.position_count == 1:
            if self.position > 0 and low <= lower:
                self.open_position(lower, 'long', signal_ts, "触下轨加多")
            elif self.position < 0 and high >= upper:
                self.open_position(upper, 'short', signal_ts, "触上轨加空")

    def run_backtest(self, start_date, end_date):
        """Run the backtest over ``[start_date, end_date]``.

        Returns the report dict from ``generate_report``, or ``None`` when the
        timeframe is unsupported, no rows are found, or there are too few rows
        to form the bands.
        """
        self._reset_state()

        logger.info(f"{'='*80}")
        logger.info(f"开始回测: {start_date} ~ {end_date} | 周期: {self.timeframe}")
        logger.info(f"初始资金: {self.initial_capital}U | 杠杆: {self.leverage}x | BB({self.bb_period}, {self.bb_std})")
        logger.info(f"{'='*80}")

        df = self._load_klines(start_date, end_date)
        if df is None:
            return None

        logger.info(f"加载数据: {len(df)} 根K线")
        logger.info(f"时间范围: {df['datetime'].min()} ~ {df['datetime'].max()}")

        df = self.calculate_bollinger_bands(df)

        if len(df) <= self.bb_period + 1:
            logger.error("数据不足,无法执行回测")
            return None

        # Replay bar by bar, starting at the first bar that has shifted bands.
        for i in range(self.bb_period, len(df)):
            prev_row = df.iloc[i - 1] if i > 0 else None
            self._process_bar(i, df.iloc[i], prev_row)

        last_bar = df.iloc[-1]

        # Credit rebates due by the last bar.
        self.apply_pending_rebates(last_bar['datetime'])

        # Force-close whatever is still open at the last close.
        if self.position != 0:
            final_price = last_bar['close']
            final_time = last_bar['datetime'].strftime('%Y-%m-%d %H:%M')
            self.close_position(final_price, 1.0, final_time, "回测结束平仓")

        return self.generate_report(df)

    def generate_report(self, df):
        """Log the summary, write the trade log to CSV and return the statistics.

        ``df`` is accepted for interface compatibility and is not read.  The
        CSV is written under ``backtest_outputs/trades`` next to this file.
        Wins and losses are counted per closing trade, partial closes included.
        """
        logger.info(f"\n{'='*80}")
        logger.info(f"回测报告 - {self.timeframe}")
        logger.info(f"{'='*80}")

        # Basic statistics
        total_trades = sum(1 for t in self.trades if '开' in t['action'])
        closing_trades = [t for t in self.trades if '平' in t['action']]
        win_trades = sum(1 for t in closing_trades if t.get('pnl', 0) > 0)
        loss_trades = sum(1 for t in closing_trades if t.get('pnl', 0) < 0)
        decided_trades = win_trades + loss_trades
        win_rate = win_trades / decided_trades * 100 if decided_trades > 0 else 0

        total_pnl = sum(t.get('pnl', 0) for t in self.trades)
        total_fee = sum(t.get('fee', 0) for t in self.trades)
        pending_rebate = sum(x['amount'] for x in self.pending_rebates)
        realized_net_fee = total_fee - self.total_rebate_credited

        final_capital = self.capital
        roi = (final_capital - self.initial_capital) / self.initial_capital * 100

        logger.info(f"初始资金: {self.initial_capital:.2f}U")
        logger.info(f"最终资金: {final_capital:.2f}U")
        logger.info(f"总盈亏: {total_pnl:.2f}U")
        logger.info(f"总手续费: {total_fee:.2f}U")
        logger.info(f"返佣已到账: {self.total_rebate_credited:.2f}U")
        logger.info(f"返佣待到账: {pending_rebate:.2f}U")
        logger.info(f"已实现净手续费: {realized_net_fee:.2f}U")
        logger.info(f"净收益: {final_capital - self.initial_capital:.2f}U")
        logger.info(f"收益率: {roi:.2f}%")
        logger.info(f"总交易次数: {total_trades}")
        logger.info(f"盈利次数: {win_trades}")
        logger.info(f"亏损次数: {loss_trades}")
        if decided_trades > 0:
            logger.info(f"胜率: {win_rate:.2f}%")

        # Persist the trade log
        trades_df = pd.DataFrame(self.trades)
        output_dir = Path(__file__).parent / 'backtest_outputs' / 'trades'
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / f'bb_backtest_2025_{self.timeframe}_trades.csv'
        trades_df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"\n交易记录已保存到: {output_file}")

        return {
            'timeframe': self.timeframe,
            'initial_capital': self.initial_capital,
            'final_capital': final_capital,
            'total_pnl': total_pnl,
            'total_fee': total_fee,
            'total_rebate_credited': self.total_rebate_credited,
            'pending_rebate': pending_rebate,
            'realized_net_fee': realized_net_fee,
            'roi': roi,
            'total_trades': total_trades,
            'win_trades': win_trades,
            'loss_trades': loss_trades,
            'win_rate': win_rate,
            'trades_file': str(output_file)
        }
|
||
|
||
|
||
if __name__ == '__main__':
    # Open the shared database connection (reused if it is already open).
    db.connect(reuse_if_open=True)

    banner = "=" * 80

    try:
        results = []

        # Run the same 2025 backtest on every supported timeframe.
        for tf in ('1m', '3m', '5m', '15m'):
            logger.info("\n" + banner)
            logger.info(f"测试 {tf} 周期")
            logger.info(banner)
            outcome = BBDelayReversalBacktest(timeframe=tf).run_backtest('2025-01-01', '2025-12-31')
            if outcome:
                results.append(outcome)

        # Side-by-side summary of the runs that produced a report.
        if results:
            logger.info("\n" + banner)
            logger.info("回测对比总结")
            logger.info(banner)

            for summary in results:
                logger.info(f"\n【{summary['timeframe']}周期】")
                logger.info(f"  收益率: {summary['roi']:.2f}%")
                logger.info(f"  最终资金: {summary['final_capital']:.2f}U")
                logger.info(f"  总交易次数: {summary['total_trades']}")
                logger.info(f"  胜率: {summary['win_rate']:.2f}%")
                logger.info(f"  净手续费: {summary['realized_net_fee']:.2f}U")

    finally:
        # Always release the connection, even if a run fails.
        db.close()
|