加入精准回测数据

This commit is contained in:
27942
2026-02-01 22:16:05 +08:00
parent b5d4f6dfdc
commit f25c725eda
2 changed files with 51076 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,554 @@
"""
BitMart 三分之一回归策略回测(精准版)
使用5分钟K线周期计算触发价格1分钟K线判断触发顺序
========== 策略规则 ==========
1. 触发价格计算基于有效的前一根K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3
2. 信号触发条件:
- 当前K线最高价 >= 做多触发价格 → 做多信号
- 当前K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根5分钟K线内只交易一次
4. 精准判断使用1分钟K线
- 当一根5分钟K线同时触及做多和做空价格时
- 使用该5分钟K线对应的5根1分钟K线来判断哪个方向先被触发
- 这样可以更精准地还原真实交易场景
"""
import datetime
import calendar
from pathlib import Path
from typing import List, Dict, Optional
from loguru import logger
from peewee import *
# 数据库配置
DB_PATH = Path(__file__).parent.parent / 'models' / 'database.db'
db = SqliteDatabase(str(DB_PATH))
# ========================= 数据库模型 =========================
class BitMartETH1m(Model):
"""1分钟K线模型"""
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_1m'
class BitMartETH5m(Model):
"""5分钟K线模型"""
id = BigIntegerField(primary_key=True) # 时间戳(毫秒级)
open = FloatField(null=True)
high = FloatField(null=True)
low = FloatField(null=True)
close = FloatField(null=True)
class Meta:
database = db
table_name = 'bitmart_eth_5m'
# 连接数据库
db.connect(reuse_if_open=True)
# ========================= 工具函数 =========================
def is_bullish(c):
"""判断阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(c):
"""判断阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""计算K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data, current_idx, min_body_size=0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
body_size = get_body_size(prev)
if body_size >= min_body_size:
return i, prev
return None, None
def get_one_third_levels(prev):
"""
计算前一根K线实体的 1/3 双向触发价格
返回:(做多触发价格, 做空触发价格)
基于收盘价计算(无论阴线阳线):
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3实体
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3实体
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001: # 十字星,忽略
return None, None
# 基于收盘价的双向触发价格
long_trigger = p_close + body / 3
short_trigger = p_close - body / 3
return long_trigger, short_trigger
def get_5m_data_by_date(date_str: str) -> List[Dict]:
"""按天获取5分钟K线数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = BitMartETH5m.select().where(
BitMartETH5m.id.between(start_ts, end_ts)
).order_by(BitMartETH5m.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
return data
def get_1m_data_by_range(start_ts: int, end_ts: int) -> List[Dict]:
"""
获取指定时间范围内的1分钟K线数据
:param start_ts: 开始时间戳(毫秒)
:param end_ts: 结束时间戳(毫秒)
:return: 1分钟K线数据列表
"""
query = BitMartETH1m.select().where(
BitMartETH1m.id.between(start_ts, end_ts - 1)
).order_by(BitMartETH1m.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
return data
def get_1m_data_for_5m_bar(bar_5m: Dict) -> List[Dict]:
"""
获取5分钟K线对应的5根1分钟K线
:param bar_5m: 5分钟K线数据
:return: 对应的1分钟K线数据列表最多5根
"""
start_ts = bar_5m['id']
end_ts = start_ts + 5 * 60 * 1000 # 5分钟后
return get_1m_data_by_range(start_ts, end_ts)
def determine_trigger_order_by_1m(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float
) -> str:
"""
使用1分钟K线精确判断在5分钟周期内是先触发做多还是做空
:param bars_1m: 5根1分钟K线数据
:param long_trigger: 做多触发价格
:param short_trigger: 做空触发价格
:return: 'long', 'short', 或 None
"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar['high'])
low = float(bar['low'])
open_price = float(bar['open'])
long_triggered = high >= long_trigger
short_triggered = low <= short_trigger
# 如果只触发了一个方向
if long_triggered and not short_triggered:
return 'long'
if short_triggered and not long_triggered:
return 'short'
# 如果两个方向都触发了在同一根1分钟K线内
if long_triggered and short_triggered:
# 根据开盘价判断:
# 如果开盘价更接近做空触发价,说明先往下走,先触发做空
# 如果开盘价更接近做多触发价,说明先往上走,先触发做多
dist_to_long = abs(long_trigger - open_price)
dist_to_short = abs(short_trigger - open_price)
if dist_to_short < dist_to_long:
return 'short'
else:
return 'long'
return None
def check_trigger_with_1m(
all_data_5m: List[Dict],
current_idx: int,
min_body_size: float = 0.1
) -> tuple:
"""
检查当前5分钟K线是否触发了交易信号
如果同时触发两个方向使用1分钟K线精确判断顺序
返回:(方向, 触发价格, 有效前一根K线索引, 1分钟数据是否使用)
"""
if current_idx <= 0:
return None, None, None, False
curr = all_data_5m[current_idx]
# 查找实体>=min_body_size的前一根K线
valid_prev_idx, prev = find_valid_prev_bar(all_data_5m, current_idx, min_body_size)
if prev is None:
return None, None, None, False
long_trigger, short_trigger = get_one_third_levels(prev)
if long_trigger is None:
return None, None, None, False
c_high = float(curr['high'])
c_low = float(curr['low'])
# 检测是否触发
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
# 如果两个方向都触发使用1分钟K线精确判断
if long_triggered and short_triggered:
bars_1m = get_1m_data_for_5m_bar(curr)
if bars_1m:
direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger)
if direction:
trigger_price = long_trigger if direction == 'long' else short_trigger
return direction, trigger_price, valid_prev_idx, True
# 如果没有1分钟数据使用开盘价距离判断
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx, False
else:
return 'long', long_trigger, valid_prev_idx, False
if short_triggered:
return 'short', short_trigger, valid_prev_idx, False
if long_triggered:
return 'long', long_trigger, valid_prev_idx, False
return None, None, None, False
# ========================= 回测逻辑 =========================
def backtest_one_third_strategy(dates: List[str], min_body_size: float = 0.1):
"""
三分之一策略回测(精准版)
:param dates: 日期列表
:param min_body_size: 最小实体大小
:return: (trades, stats)
"""
# 获取所有5分钟K线数据
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_5m_data_by_date(d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条5分钟K线数据")
if not all_data:
logger.warning("未获取到任何数据,请检查数据库")
return [], {'long': {'count': 0, 'wins': 0, 'total_profit': 0.0},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0}}
# 按时间戳排序
all_data.sort(key=lambda x: x['id'])
# 验证排序结果
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据范围:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
trades: List[Dict] = []
current_position: Optional[Dict] = None
# 统计使用1分钟数据精准判断的次数
precise_count = 0
fallback_count = 0
idx = 1
while idx < len(all_data):
curr = all_data[idx]
# 检测信号使用1分钟K线精准判断
direction, trigger_price, valid_prev_idx, used_1m = check_trigger_with_1m(
all_data, idx, min_body_size
)
if used_1m:
precise_count += 1
elif direction:
fallback_count += 1
# 无持仓 -> 开仓
if current_position is None:
if direction:
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar_idx': idx
}
stats[direction]['count'] += 1
time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
logger.debug(f"[{time_str}] 开仓{direction} @ {trigger_price:.2f}")
idx += 1
continue
# 有持仓 -> 检查是否需要反向
pos_dir = current_position['direction']
if direction and direction != pos_dir:
# 反向信号,平仓并反手
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
# 反手开仓
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar_idx': idx
}
stats[direction]['count'] += 1
time_str = datetime.datetime.fromtimestamp(curr['id'] / 1000).strftime('%Y-%m-%d %H:%M')
logger.debug(f"[{time_str}] 平{pos_dir}反手{direction} @ {trigger_price:.2f} 盈亏={diff:.2f}")
idx += 1
# 尾仓处理最后一根K线收盘价平仓
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"回测完成使用1分钟精准判断 {precise_count} 次,使用开盘价距离判断 {fallback_count}")
return trades, stats
# ========================= 运行回测 =========================
if __name__ == '__main__':
# ==================== 配置参数 ====================
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
MIN_BODY_SIZE = 0.1 # 最小实体大小
# ==================== 生成查询日期列表 ====================
dates = []
try:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
if start_dt > end_dt:
logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}")
exit(1)
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"回测日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
except ValueError as e:
logger.error(f"日期格式错误:{e}")
exit(1)
# ==================== 执行回测 ====================
trades, stats = backtest_one_third_strategy(dates, MIN_BODY_SIZE)
# ==================== 输出交易详情 ====================
logger.info("===== 每笔交易详情 =====")
# 参数设定
contract_size = 10000 # 合约规模
open_fee_fixed = 5 # 固定开仓手续费
close_fee_rate = 0.0005 # 平仓手续费率
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_price = t['exit']
direction = t['direction']
# 原始价差
point_diff = t['diff']
# 金额盈利
money_profit = point_diff / entry * contract_size
# 手续费
fee = open_fee_fixed + (contract_size / entry * exit_price * close_fee_rate)
# 净利润
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction} "
f"入={entry:.2f} 出={exit_price:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
# ==================== 汇总统计 ====================
total_net_profit = total_money_profit - total_fee
print(f"\n{'='*60}")
print(f"【BitMart 三分之一策略回测结果5分钟K线 + 1分钟精准判断")
print(f"{'='*60}")
print(f"回测周期:{START_DATE}{END_DATE}")
print(f"最小实体要求:{MIN_BODY_SIZE}")
print(f"{'='*60}")
print(f"总交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}")
print(f"{'='*60}")
print("\n===== 方向统计 =====")
for k, v in stats.items():
name = v['name']
count = v['count']
wins = v['wins']
total_p = v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
# 保存交易记录到CSV
if trades:
import csv
csv_path = Path(__file__).parent / 'backtest_one_third_trades.csv'
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[
'entry_time', 'exit_time', 'direction', 'entry', 'exit',
'point_diff', 'raw_profit', 'fee', 'net_profit'
])
writer.writeheader()
for t in trades:
writer.writerow({
'entry_time': t['entry_time'],
'exit_time': t['exit_time'],
'direction': t['direction'],
'entry': t['entry'],
'exit': t['exit'],
'point_diff': t['point_diff'],
'raw_profit': t['raw_profit'],
'fee': t['fee'],
'net_profit': t['net_profit']
})
print(f"\n交易记录已保存到:{csv_path}")