代码结构优化

This commit is contained in:
ddrwode
2026-02-02 17:03:49 +08:00
parent a938495d40
commit 71c025b627
2 changed files with 836 additions and 0 deletions

View File

@@ -0,0 +1,418 @@
"""
量化交易回测系统 - 三分之一策略5分钟K线 + 1分钟精准判断
========== 策略规则(与 交易/bitmart-三分之一策略交易.py 一致)==========
1. 触发价格计算基于有效的前一根5分钟K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/3从收盘价往上涨1/3
- 做空触发价格 = 收盘价 - 实体/3从收盘价往下跌1/3
2. 信号触发条件:
- 当前5分钟K线最高价 >= 做多触发价格 → 做多信号
- 当前5分钟K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根5分钟K线内只交易一次
4. 精准判断使用1分钟K线
- 当一根5分钟K线同时触及做多和做空价格时
- 使用该5分钟K线对应的5根1分钟K线来判断哪个方向先被触发
- 使回测更贴近真实成交顺序
"""
import datetime
import calendar
from typing import List, Dict, Optional
from loguru import logger
from models.bitmart_klines import BitMartETH5M, BitMartETH1M
# ========================= 工具函数 =========================
def is_bullish(c):
"""阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(c):
"""阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if get_body_size(prev) >= min_body_size:
return i, prev
return None, None
def get_one_third_levels(prev: Dict):
"""
计算前一根K线实体的 1/3 双向触发价格
返回:(做多触发价格, 做空触发价格)
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001:
return None, None
long_trigger = p_close + body / 3
short_trigger = p_close - body / 3
return long_trigger, short_trigger
def get_1m_data_by_range(start_ts_ms: int, end_ts_ms: int) -> List[Dict]:
"""
获取指定时间范围内的1分钟K线数据毫秒时间戳
"""
query = BitMartETH1M.select().where(
BitMartETH1M.id.between(start_ts_ms, end_ts_ms - 1)
).order_by(BitMartETH1M.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
def get_1m_data_for_5m_bar(bar_5m: Dict) -> List[Dict]:
"""获取一根5分钟K线对应的5根1分钟K线"""
start_ts = bar_5m['id']
end_ts = start_ts + 5 * 60 * 1000
return get_1m_data_by_range(start_ts, end_ts)
def determine_trigger_order_by_1m(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float
) -> Optional[str]:
"""
使用1分钟K线判断在一根5分钟周期内先触发做多还是做空。
按时间顺序遍历每根1分钟K线先触及哪个方向则返回该方向
若同一根1分钟K线内两个方向都触及用开盘价距离判断。
"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar['high'])
low = float(bar['low'])
open_price = float(bar['open'])
long_triggered = high >= long_trigger
short_triggered = low <= short_trigger
if long_triggered and not short_triggered:
return 'long'
if short_triggered and not long_triggered:
return 'short'
if long_triggered and short_triggered:
dist_to_long = abs(long_trigger - open_price)
dist_to_short = abs(short_trigger - open_price)
return 'short' if dist_to_short <= dist_to_long else 'long'
return None
def check_trigger_with_1m(
all_data_5m: List[Dict],
current_idx: int,
min_body_size: float = 0.1
) -> tuple:
"""
检查当前5分钟K线是否触发交易信号。
若同时触发多空则用该5分钟内的1分钟K线判断先后顺序。
返回:(方向, 触发价格, 有效前一根K线索引, 是否使用了1分钟精准判断, 是否双触)
"""
if current_idx <= 0:
return None, None, None, False, False
curr = all_data_5m[current_idx]
valid_prev_idx, prev = find_valid_prev_bar(all_data_5m, current_idx, min_body_size)
if prev is None:
return None, None, None, False, False
long_trigger, short_trigger = get_one_third_levels(prev)
if long_trigger is None:
return None, None, None, False, False
c_high = float(curr['high'])
c_low = float(curr['low'])
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
both_triggered = long_triggered and short_triggered
if both_triggered:
bars_1m = get_1m_data_for_5m_bar(curr)
if bars_1m:
direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger)
if direction:
trigger_price = long_trigger if direction == 'long' else short_trigger
return direction, trigger_price, valid_prev_idx, True, True
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx, False, True
return 'long', long_trigger, valid_prev_idx, False, True
if short_triggered:
return 'short', short_trigger, valid_prev_idx, False, False
if long_triggered:
return 'long', long_trigger, valid_prev_idx, False, False
return None, None, None, False, False
def get_data_by_date(model, date_str: str) -> List[Dict]:
"""
按天获取指定表的K线数据毫秒时间戳
返回格式id(ms), open, high, low, close已按 id 升序。
"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_one_third_precise(dates: List[str], min_body_size: float = 0.1):
"""
三分之一策略回测5分钟K线 + 1分钟精准判断
风格与 回测数据-30分钟版 一致:按日期拉取、合并排序、统计与输出。
"""
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_data_by_date(BitMartETH5M, d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条5分钟K线数据")
if not all_data:
logger.warning("未获取到任何数据,请检查:")
logger.warning("1. 数据库连接与 bitmart_eth_5m / bitmart_eth_1m 表是否存在")
logger.warning("2. 是否已用 抓取多周期K线.py 抓取过 1 分钟和 5 分钟数据")
return [], {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}, {'precise_1m_count': 0, 'fallback_count': 0}
all_data.sort(key=lambda x: x['id'])
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
extra = {'precise_1m_count': 0, 'fallback_count': 0}
trades: List[Dict] = []
current_position: Optional[Dict] = None
last_trade_bar: Optional[int] = None
for idx in range(1, len(all_data)):
curr = all_data[idx]
direction, trigger_price, valid_prev_idx, used_1m, both_triggered = check_trigger_with_1m(all_data, idx, min_body_size)
valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None
if used_1m:
extra['precise_1m_count'] += 1
elif both_triggered and direction:
extra['fallback_count'] += 1
if direction is None:
continue
if last_trade_bar == idx:
continue
if current_position is None:
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx,
}
stats[direction]['count'] += 1
last_trade_bar = idx
continue
pos_dir = current_position['direction']
if direction == pos_dir:
continue
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': curr['id'],
'signal': '三分之一',
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx,
}
stats[direction]['count'] += 1
last_trade_bar = idx
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': last['id'],
'signal': '三分之一',
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"回测完成使用1分钟精准判断 {extra['precise_1m_count']} 次,使用开盘价距离判断 {extra['fallback_count']}")
return trades, stats, extra
# ========================= 运行示例(与 回测数据-30分钟版 风格一致)=========================
if __name__ == '__main__':
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
START_YEAR = None
START_MONTH = None
END_YEAR = None
END_MONTH = None
dates = []
if START_DATE and END_DATE:
try:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
if start_dt > end_dt:
logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}")
exit(1)
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"使用指定日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
except ValueError as e:
logger.error(f"日期格式错误:{e},请使用 YYYY-MM-DD 格式")
exit(1)
elif START_YEAR and END_YEAR:
start_m = START_MONTH if START_MONTH else 1
end_m = END_MONTH if END_MONTH else 12
for year in range(START_YEAR, END_YEAR + 1):
month_start = start_m if year == START_YEAR else 1
month_end = end_m if year == END_YEAR else 12
for month in range(month_start, month_end + 1):
days_in_month = calendar.monthrange(year, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"{year}-{month:02d}-{day:02d}")
logger.info(f"使用年份月份范围:{START_YEAR}{start_m}月 到 {END_YEAR}{end_m}月,共 {len(dates)}")
else:
logger.warning("未指定日期范围使用默认2025年1-12月")
for month in range(1, 13):
days_in_month = calendar.monthrange(2025, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"2025-{month:02d}-{day:02d}")
if not dates:
logger.error("未生成任何查询日期,请检查配置参数")
exit(1)
trades, stats, extra = backtest_one_third_precise(dates, min_body_size=0.1)
logger.info("===== 每笔交易详情 =====")
contract_size = 10000
open_fee_fixed = 5
close_fee_rate = 0.0005
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_p = t['exit']
direction = t['direction']
point_diff = (exit_p - entry) if direction == '做多' else (entry - exit_p)
money_profit = point_diff / entry * contract_size
fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate)
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction}({t['signal']}) "
f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
total_net_profit = total_money_profit - total_fee
print("\n【BitMart 三分之一策略回测结果5分钟K线 + 1分钟精准判断")
print(f"一共交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}\n")
print("===== 信号统计 =====")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
print(f"使用1分钟K线精准判断双触次数: {extra['precise_1m_count']}")
print(f"使用开盘价距离判断次数: {extra['fallback_count']}")

View File

@@ -0,0 +1,418 @@
"""
量化交易回测系统 - 五分之一策略3分钟K线 + 1分钟精准判断
========== 策略规则 ==========
1. 触发价格计算基于有效的前一根3分钟K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5
- 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5
2. 信号触发条件:
- 当前3分钟K线最高价 >= 做多触发价格 → 做多信号
- 当前3分钟K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根3分钟K线内只交易一次
4. 精准判断使用1分钟K线
- 当一根3分钟K线同时触及做多和做空价格时
- 使用该3分钟K线对应的3根1分钟K线来判断哪个方向先被触发
- 使回测更贴近真实成交顺序
"""
import datetime
import calendar
from typing import List, Dict, Optional
from loguru import logger
from models.bitmart_klines import BitMartETH3M, BitMartETH1M
# ========================= 工具函数 =========================
def is_bullish(c):
"""阳线"""
return float(c['close']) > float(c['open'])
def is_bearish(c):
"""阴线"""
return float(c['close']) < float(c['open'])
def get_body_size(candle):
"""K线实体大小绝对值"""
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1):
"""
从当前索引往前查找,直到找到实体>=min_body_size的K线
返回:(有效K线的索引, K线数据) 或 (None, None)
"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if get_body_size(prev) >= min_body_size:
return i, prev
return None, None
def get_one_fifth_levels(prev: Dict):
"""
计算前一根K线实体的 1/5 双向触发价格
返回:(做多触发价格, 做空触发价格)
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001:
return None, None
long_trigger = p_close + body / 5
short_trigger = p_close - body / 5
return long_trigger, short_trigger
def get_1m_data_by_range(start_ts_ms: int, end_ts_ms: int) -> List[Dict]:
"""
获取指定时间范围内的1分钟K线数据毫秒时间戳
"""
query = BitMartETH1M.select().where(
BitMartETH1M.id.between(start_ts_ms, end_ts_ms - 1)
).order_by(BitMartETH1M.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
def get_1m_data_for_3m_bar(bar_3m: Dict) -> List[Dict]:
"""获取一根3分钟K线对应的3根1分钟K线"""
start_ts = bar_3m['id']
end_ts = start_ts + 3 * 60 * 1000
return get_1m_data_by_range(start_ts, end_ts)
def determine_trigger_order_by_1m(
bars_1m: List[Dict],
long_trigger: float,
short_trigger: float
) -> Optional[str]:
"""
使用1分钟K线判断在一根3分钟周期内先触发做多还是做空。
按时间顺序遍历每根1分钟K线先触及哪个方向则返回该方向
若同一根1分钟K线内两个方向都触及用开盘价距离判断。
"""
if not bars_1m:
return None
for bar in bars_1m:
high = float(bar['high'])
low = float(bar['low'])
open_price = float(bar['open'])
long_triggered = high >= long_trigger
short_triggered = low <= short_trigger
if long_triggered and not short_triggered:
return 'long'
if short_triggered and not long_triggered:
return 'short'
if long_triggered and short_triggered:
dist_to_long = abs(long_trigger - open_price)
dist_to_short = abs(short_trigger - open_price)
return 'short' if dist_to_short <= dist_to_long else 'long'
return None
def check_trigger_with_1m(
all_data_3m: List[Dict],
current_idx: int,
min_body_size: float = 0.1
) -> tuple:
"""
检查当前3分钟K线是否触发交易信号。
若同时触发多空则用该3分钟内的1分钟K线判断先后顺序。
返回:(方向, 触发价格, 有效前一根K线索引, 是否使用了1分钟精准判断, 是否双触)
"""
if current_idx <= 0:
return None, None, None, False, False
curr = all_data_3m[current_idx]
valid_prev_idx, prev = find_valid_prev_bar(all_data_3m, current_idx, min_body_size)
if prev is None:
return None, None, None, False, False
long_trigger, short_trigger = get_one_fifth_levels(prev)
if long_trigger is None:
return None, None, None, False, False
c_high = float(curr['high'])
c_low = float(curr['low'])
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
both_triggered = long_triggered and short_triggered
if both_triggered:
bars_1m = get_1m_data_for_3m_bar(curr)
if bars_1m:
direction = determine_trigger_order_by_1m(bars_1m, long_trigger, short_trigger)
if direction:
trigger_price = long_trigger if direction == 'long' else short_trigger
return direction, trigger_price, valid_prev_idx, True, True
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
return 'short', short_trigger, valid_prev_idx, False, True
return 'long', long_trigger, valid_prev_idx, False, True
if short_triggered:
return 'short', short_trigger, valid_prev_idx, False, False
if long_triggered:
return 'long', long_trigger, valid_prev_idx, False, False
return None, None, None, False, False
def get_data_by_date(model, date_str: str) -> List[Dict]:
"""
按天获取指定表的K线数据毫秒时间戳
返回格式id(ms), open, high, low, close已按 id 升序。
"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc())
data = [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query]
if data:
data.sort(key=lambda x: x['id'])
return data
# ========================= 回测逻辑 =========================
def backtest_one_fifth_precise(dates: List[str], min_body_size: float = 0.1):
"""
五分之一策略回测3分钟K线 + 1分钟精准判断
风格与 回测数据-30分钟版 一致:按日期拉取、合并排序、统计与输出。
"""
all_data: List[Dict] = []
total_queried = 0
for d in dates:
day_data = get_data_by_date(BitMartETH3M, d)
all_data.extend(day_data)
if day_data:
total_queried += len(day_data)
logger.info(f"总共查询了 {len(dates)} 天,获取到 {total_queried} 条3分钟K线数据")
if not all_data:
logger.warning("未获取到任何数据,请检查:")
logger.warning("1. 数据库连接与 bitmart_eth_3m / bitmart_eth_1m 表是否存在")
logger.warning("2. 是否已用 抓取多周期K线.py 抓取过 1 分钟和 3 分钟数据")
return [], {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}, {'precise_1m_count': 0, 'fallback_count': 0}
all_data.sort(key=lambda x: x['id'])
if len(all_data) > 1:
first_ts = all_data[0]['id']
last_ts = all_data[-1]['id']
first_time = datetime.datetime.fromtimestamp(first_ts / 1000)
last_time = datetime.datetime.fromtimestamp(last_ts / 1000)
logger.info(f"数据已按时间排序:{first_time.strftime('%Y-%m-%d %H:%M:%S')}{last_time.strftime('%Y-%m-%d %H:%M:%S')}")
stats = {
'long': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做多'},
'short': {'count': 0, 'wins': 0, 'total_profit': 0.0, 'name': '做空'},
}
extra = {'precise_1m_count': 0, 'fallback_count': 0}
trades: List[Dict] = []
current_position: Optional[Dict] = None
last_trade_bar: Optional[int] = None
for idx in range(1, len(all_data)):
curr = all_data[idx]
direction, trigger_price, valid_prev_idx, used_1m, both_triggered = check_trigger_with_1m(all_data, idx, min_body_size)
valid_prev = all_data[valid_prev_idx] if valid_prev_idx is not None else None
if used_1m:
extra['precise_1m_count'] += 1
elif both_triggered and direction:
extra['fallback_count'] += 1
if direction is None:
continue
if last_trade_bar == idx:
continue
if current_position is None:
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx,
}
stats[direction]['count'] += 1
last_trade_bar = idx
continue
pos_dir = current_position['direction']
if direction == pos_dir:
continue
exit_price = trigger_price
if pos_dir == 'long':
diff = exit_price - current_position['entry_price']
else:
diff = current_position['entry_price'] - exit_price
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(curr['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': curr['id'],
'signal': '五分之一',
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
current_position = {
'direction': direction,
'entry_price': trigger_price,
'entry_time': curr['id'],
'entry_bar': idx,
}
stats[direction]['count'] += 1
last_trade_bar = idx
if current_position:
last = all_data[-1]
exit_price = float(last['close'])
pos_dir = current_position['direction']
diff = (exit_price - current_position['entry_price']) if pos_dir == 'long' else (
current_position['entry_price'] - exit_price)
trades.append({
'entry_time': datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
'exit_time': datetime.datetime.fromtimestamp(last['id'] / 1000),
'entry_time_ms': current_position['entry_time'],
'exit_time_ms': last['id'],
'signal': '五分之一',
'direction': '做多' if pos_dir == 'long' else '做空',
'entry': current_position['entry_price'],
'exit': exit_price,
'diff': diff,
})
stats[pos_dir]['total_profit'] += diff
if diff > 0:
stats[pos_dir]['wins'] += 1
logger.info(f"回测完成使用1分钟精准判断 {extra['precise_1m_count']} 次,使用开盘价距离判断 {extra['fallback_count']}")
return trades, stats, extra
# ========================= 运行示例(与 回测数据-30分钟版 风格一致)=========================
if __name__ == '__main__':
START_DATE = "2025-01-01"
END_DATE = "2025-12-31"
START_YEAR = None
START_MONTH = None
END_YEAR = None
END_MONTH = None
dates = []
if START_DATE and END_DATE:
try:
start_dt = datetime.datetime.strptime(START_DATE, '%Y-%m-%d')
end_dt = datetime.datetime.strptime(END_DATE, '%Y-%m-%d')
if start_dt > end_dt:
logger.error(f"开始日期 {START_DATE} 不能晚于结束日期 {END_DATE}")
exit(1)
current_dt = start_dt
while current_dt <= end_dt:
dates.append(current_dt.strftime('%Y-%m-%d'))
current_dt += datetime.timedelta(days=1)
logger.info(f"使用指定日期范围:{START_DATE}{END_DATE},共 {len(dates)}")
except ValueError as e:
logger.error(f"日期格式错误:{e},请使用 YYYY-MM-DD 格式")
exit(1)
elif START_YEAR and END_YEAR:
start_m = START_MONTH if START_MONTH else 1
end_m = END_MONTH if END_MONTH else 12
for year in range(START_YEAR, END_YEAR + 1):
month_start = start_m if year == START_YEAR else 1
month_end = end_m if year == END_YEAR else 12
for month in range(month_start, month_end + 1):
days_in_month = calendar.monthrange(year, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"{year}-{month:02d}-{day:02d}")
logger.info(f"使用年份月份范围:{START_YEAR}{start_m}月 到 {END_YEAR}{end_m}月,共 {len(dates)}")
else:
logger.warning("未指定日期范围使用默认2025年1-12月")
for month in range(1, 13):
days_in_month = calendar.monthrange(2025, month)[1]
for day in range(1, days_in_month + 1):
dates.append(f"2025-{month:02d}-{day:02d}")
if not dates:
logger.error("未生成任何查询日期,请检查配置参数")
exit(1)
trades, stats, extra = backtest_one_fifth_precise(dates, min_body_size=0.1)
logger.info("===== 每笔交易详情 =====")
contract_size = 10000
open_fee_fixed = 5
close_fee_rate = 0.0005
total_points_profit = 0
total_money_profit = 0
total_fee = 0
for t in trades:
entry = t['entry']
exit_p = t['exit']
direction = t['direction']
point_diff = (exit_p - entry) if direction == '做多' else (entry - exit_p)
money_profit = point_diff / entry * contract_size
fee = open_fee_fixed + (contract_size / entry * exit_p * close_fee_rate)
net_profit = money_profit - fee
t.update({
'point_diff': point_diff,
'raw_profit': money_profit,
'fee': fee,
'net_profit': net_profit
})
total_points_profit += point_diff
total_money_profit += money_profit
total_fee += fee
logger.info(
f"{t['entry_time']} {direction}({t['signal']}) "
f"入={entry:.2f} 出={exit_p:.2f} 差价={point_diff:.2f} "
f"原始盈利={money_profit:.2f} 手续费={fee:.2f} 净利润={net_profit:.2f} {t['exit_time']}"
)
total_net_profit = total_money_profit - total_fee
print("\n【BitMart 五分之一策略回测结果3分钟K线 + 1分钟精准判断")
print(f"一共交易笔数:{len(trades)}")
print(f"总点差:{total_points_profit:.2f}")
print(f"总原始盈利(未扣费):{total_money_profit:.2f}")
print(f"总手续费:{total_fee:.2f}")
print(f"总净利润:{total_net_profit:.2f}\n")
print("===== 信号统计 =====")
for k, v in stats.items():
name, count, wins, total_p = v['name'], v['count'], v['wins'], v['total_profit']
win_rate = (wins / count * 100) if count > 0 else 0.0
avg_p = (total_p / count) if count > 0 else 0.0
print(f"{name}: 次数={count} 胜率={win_rate:.2f}% 总价差={total_p:.2f} 平均价差={avg_p:.2f}")
print(f"使用1分钟K线精准判断双触次数: {extra['precise_1m_count']}")
print(f"使用开盘价距离判断次数: {extra['fallback_count']}")