This commit is contained in:
27942
2025-10-16 17:25:41 +08:00
parent 144ae64981
commit acf3af55fa
5 changed files with 1013 additions and 62 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -1,12 +1,15 @@
"""
量化交易回测系统 - 优化版v2.1
量化交易回测系统 - 优化版
功能:基于包住形态的交易信号识别和回测分析
作者:量化交易团队
版本2.0
"""
import datetime
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass
from loguru import logger
from peewee import fn
from models.weex import Weex15, Weex1
@@ -17,27 +20,65 @@ from models.weex import Weex15, Weex1
@dataclass
class BacktestConfig:
"""回测配置类"""
# 交易参数
take_profit: float = 8.0 # 止盈点数
stop_loss: float = -1.0 # 止损点数
contract_size: float = 10000 # 合约规模
open_fee: float = 5.0 # 开仓手续费
close_fee_rate: float = 0.0005 # 平仓手续费率
# 回测日期范围
start_date: str = "2025-7-1"
end_date: str = "2025-7-31"
# 信号参数
enable_bear_bull_engulf: bool = True # 涨包跌信号
enable_bull_bear_engulf: bool = True # 跌包涨信号
def __post_init__(self):
"""验证配置参数"""
if self.take_profit <= 0:
raise ValueError("止盈点数必须大于0")
if self.stop_loss >= 0:
raise ValueError("止损点数必须小于0")
@dataclass
class TradeRecord:
"""交易记录类"""
entry_time: datetime.datetime
exit_time: datetime.datetime
signal_type: str
direction: str
entry_price: float
exit_price: float
profit_loss: float
profit_amount: float
total_fee: float
net_profit: float
@dataclass
class SignalStats:
"""信号统计类"""
signal_name: str
count: int = 0
wins: int = 0
total_profit: float = 0.0
@property
def win_rate(self) -> float:
"""胜率计算"""
return (self.wins / self.count * 100) if self.count > 0 else 0.0
@property
def avg_profit(self) -> float:
"""平均盈利"""
return self.total_profit / self.count if self.count > 0 else 0.0
# ===============================================================
# 📊 数据模块
# 📊 数据获取模块
# ===============================================================
def get_data_by_date(model, date_str):
@@ -72,10 +113,12 @@ def get_future_data_1min(start_ts, end_ts):
# ===============================================================
# 📈 信号模块
# 📈 信号判定模块
# ===============================================================
def is_bullish(c): return float(c['open']) < float(c['close'])
def is_bearish(c): return float(c['open']) > float(c['close'])
@@ -96,33 +139,44 @@ def check_signal(prev, curr):
# ===============================================================
# 💹 模拟模块(1分钟级止盈止损
# 💹 回测模拟模块(使用 1 分钟数据
# ===============================================================
def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl=-1):
"""用 1 分钟数据进行精细化止盈止损模拟"""
"""
用 1 分钟数据进行精细化止盈止损模拟
entry_time: 当前信号的 entry candle id毫秒时间戳
next_15min_time: 下一个15min时间戳用于界定止盈止损分析范围
direction信号类型
entry_price开仓价格
entry_time开仓时间
next_15min_time15分钟未来行情
"""
# 查 15 分钟之间的 1 分钟数据
future_candles = get_future_data_1min(entry_time, next_15min_time)
if not future_candles:
return None, 0, None
tp_price = entry_price + tp if direction == "long" else entry_price - tp
sl_price = entry_price + sl if direction == "long" else entry_price - sl
tp_price = entry_price + tp if direction == "long" else entry_price - tp # 止盈价位
sl_price = entry_price + sl if direction == "long" else entry_price - sl # 止损价位
for candle in future_candles:
open_p, high, low = map(float, (candle['open'], candle['high'], candle['low']))
if direction == "long":
if open_p >= tp_price: # 跳空止盈
if direction == "long": # long
if open_p >= tp_price: # 开盘跳空止盈 涨信号,
return open_p, open_p - entry_price, candle['id']
if open_p <= sl_price: # 跳空止损
if open_p <= sl_price: # 开盘跳空止损
return open_p, open_p - entry_price, candle['id']
if high >= tp_price:
return tp_price, tp, candle['id']
if low <= sl_price:
return sl_price, sl, candle['id']
else: # short
if open_p <= tp_price:
else: # short 跌信号
if open_p <= tp_price: #
return open_p, entry_price - open_p, candle['id']
if open_p >= sl_price:
return open_p, entry_price - open_p, candle['id']
@@ -139,13 +193,22 @@ def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl
# ===============================================================
# 📊 回测流程
# 📊 回测流程
# ===============================================================
def backtest(dates, tp, sl):
"""
datas日期的列表
:param dates:
:param tp:
:param sl:
:return:
"""
all_data = []
for date_str in dates:
all_data.extend(get_data_by_date(Weex15, date_str))
all_data.extend(get_data_by_date(Weex15, date_str)) # 获取每天的数据15分钟k线数据
all_data.sort(key=lambda x: x['id'])
@@ -155,52 +218,31 @@ def backtest(dates, tp, sl):
}
trades = []
idx = 1
open_position = None
while idx < len(all_data) - 1:
prev, curr = all_data[idx - 1], all_data[idx]
entry_candle = all_data[idx + 1]
for idx in range(1, len(all_data) - 1):
prev, curr = all_data[idx - 1], all_data[idx] # 前一笔,当前一笔
entry_candle = all_data[idx + 1] # 下一笔开仓k线
direction, signal = check_signal(prev, curr)
# === 检查信号 ===
if not direction:
idx += 1
continue
# === 当前有持仓 ===
if open_position:
if direction == open_position['direction']:
# 同方向信号:忽略
idx += 1
continue
else:
# 反方向信号:立即平仓
exit_price = float(entry_candle['open'])
diff = (exit_price - open_position['entry_price']) if open_position['direction'] == 'long' else (
open_position['entry_price'] - exit_price)
trades.append({
"entry_time": datetime.datetime.fromtimestamp(open_position['entry_time'] / 1000),
"exit_time": datetime.datetime.fromtimestamp(entry_candle['id'] / 1000),
"signal": "反向平仓",
"direction": "平仓",
"entry": open_position['entry_price'],
"exit": exit_price,
"diff": diff
})
open_position = None # 平仓后可立即反手
# === 开新仓 ===
# 下一个 15 分钟K线的时间范围
next_15min_time = all_data[idx + 50]['id'] if idx + 50 < len(all_data) else all_data[-1]['id']
entry_price = float(entry_candle['open'])
entry_price = float(entry_candle['open']) # 开仓价格
exit_price, diff, exit_time = simulate_trade(
direction, entry_price, entry_candle['id'], next_15min_time, tp=tp, sl=sl
direction,
entry_price,
entry_candle['id'],
next_15min_time,
tp=tp,
sl=sl
)
if exit_price is None:
idx += 1
continue
# 记录统计
stats[signal]['count'] += 1
stats[signal]['total_profit'] += diff
if diff > 0:
@@ -216,25 +258,116 @@ def backtest(dates, tp, sl):
"diff": diff
})
# === 跳过到平仓时间点 ===
# 找到 exit_time 对应的 candle 索引,防止未平仓时重复触发信号
while idx < len(all_data) - 1 and all_data[idx]['id'] < exit_time:
idx += 1
return trades, stats
open_position = None # 已平仓
idx += 1
def backtest_single_position(dates, tp, sl):
"""单笔持仓回测,处理同向/反向信号"""
all_data = []
for date_str in dates:
all_data.extend(get_data_by_date(Weex15, date_str))
all_data.sort(key=lambda x: x['id'])
stats = {
"bear_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包跌"},
"bull_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包涨"},
}
trades = []
current_position = None # 当前持仓信息
for idx in range(1, len(all_data) - 1):
prev, curr = all_data[idx - 1], all_data[idx]
entry_candle = all_data[idx + 1]
direction, signal = check_signal(prev, curr)
if not direction:
continue
# 下一个 15 分钟K线的时间范围
next_15min_time = all_data[idx + 50]['id'] if idx + 50 < len(all_data) else all_data[-1]['id']
entry_price = float(entry_candle['open'])
# 有持仓
if current_position:
# 同向信号 -> 跳过
if current_position['direction'] == direction:
continue
# 反向信号 -> 先平掉当前持仓,再开新仓
else:
# 先按当前位置止盈止损平仓
exit_price, diff, exit_time = simulate_trade(
current_position['direction'],
current_position['entry_price'],
current_position['entry_time'],
entry_candle['id'],
tp=tp,
sl=sl
)
if exit_price is not None:
trades.append({
"entry_time": datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
"exit_time": datetime.datetime.fromtimestamp(exit_time / 1000),
"signal": current_position['signal'],
"direction": "做多" if current_position['direction'] == "long" else "做空",
"entry": current_position['entry_price'],
"exit": exit_price,
"diff": diff
})
# 更新统计
stats_key = 'bear_bull_engulf' if current_position['signal'] == '涨包跌' else 'bull_bear_engulf'
stats[stats_key]['count'] += 1
stats[stats_key]['total_profit'] += diff
if diff > 0:
stats[stats_key]['wins'] += 1
current_position = None # 清空持仓
# 开新仓
current_position = {
"direction": direction,
"signal": stats[signal]['name'],
"entry_price": entry_price,
"entry_time": entry_candle['id']
}
# 最后一笔持仓如果未平仓,用最后收盘价平掉
if current_position:
exit_price, diff, exit_time = simulate_trade(
current_position['direction'],
current_position['entry_price'],
current_position['entry_time'],
all_data[-1]['id'],
tp=tp,
sl=sl
)
if exit_price is not None:
trades.append({
"entry_time": datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000),
"exit_time": datetime.datetime.fromtimestamp(exit_time / 1000),
"signal": current_position['signal'],
"direction": "做多" if current_position['direction'] == "long" else "做空",
"entry": current_position['entry_price'],
"exit": exit_price,
"diff": diff
})
stats_key = 'bear_bull_engulf' if current_position['signal'] == '涨包跌' else 'bull_bear_engulf'
stats[stats_key]['count'] += 1
stats[stats_key]['total_profit'] += diff
if diff > 0:
stats[stats_key]['wins'] += 1
return trades, stats
# ===============================================================
# 🚀 主入口
# 🚀 启动主流程
# ===============================================================
if __name__ == '__main__':
dates = [f"2025-9-{i}" for i in range(1, 31)]
trades, stats = backtest(dates, tp=50, sl=-10)
trades, stats = backtest_single_position(dates, tp=10000, sl=-10)
logger.info("===== 每笔交易详情 =====")
for t in trades:
@@ -246,13 +379,32 @@ if __name__ == '__main__':
total_profit = sum(t['diff'] / t['entry'] * 10000 for t in trades)
total_fee = sum(5 + 10000 / t['entry'] * t['exit'] * 0.0005 for t in trades)
# print(f"止盈:{i1}, 止损:{i}")
print(f"\n一共交易笔数:{len(trades)}")
print(f"盈利:{total_profit:.2f}")
print(f"手续费:{total_fee:.2f}")
print(f"一共盈利:{total_profit:.2f}")
print(f"一共手续费:{total_fee:.2f}")
print(f"净利润:{total_profit - total_fee:.2f}")
print("\n===== 信号统计 =====")
for k, v in stats.items():
win_rate = (v['wins'] / v['count'] * 100) if v['count'] > 0 else 0
print(f"{v['name']} ({k}) - 信号数: {v['count']} | 胜率: {win_rate:.2f}% | 总盈利: {v['total_profit']:.2f}")
# ===============================================================================================================================
# for i in range(1, 16):
# for i1 in range(1, 51):
# trades, stats = backtest_single_position(dates, tp=i1, sl=-i)
#
# total_profit = sum(t['diff'] / t['entry'] * 10000 for t in trades)
# total_fee = sum(5 + 10000 / t['entry'] * t['exit'] * 0.0005 for t in trades)
#
# if total_profit > total_fee * 0.1:
# print("\n===== 信号统计 =====")
# print(f"止盈:{i1}, 止损:{i}")
# print(f"\n一共交易笔数{len(trades)}")
# print(f"一共盈利:{total_profit:.2f}")
# print(f"一共手续费:{total_fee:.2f}")
# print(f"净利润:{total_profit - total_fee * 0.1}")
# 需要优化,目前有两种情况,第一种,同向,不如说上一单开单是涨,上一单还没有结束,当前信号来了,就不开单,等上一单到了上一单的止损位或者止盈位在平仓
# 第二种,方向,上一单是涨,上一单还没有结束,当前信号来了,是跌,然后就按照现在这个信号要开仓的位置,平掉上一单,然后开一单方向的,
# 一笔中可能有好几次信号,都按照上面的规则去判断,要保证同一时间,只会有一笔持仓,
# 打印每笔的交易详细,如果一笔中同向,输入为一条交易记录

View File

@@ -0,0 +1,793 @@
"""
量化交易回测系统 - 优化修正版
功能:基于包住形态的交易信号识别和回测分析
作者:量化交易团队
版本2.1 (修正版)
"""
import datetime
import os
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
import pandas as pd
import plotly.graph_objects as go
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass
from loguru import logger
from peewee import fn
from models.weex import Weex15, Weex1
# ===============================================================
# 📊 配置管理类
# ===============================================================
@dataclass
class BacktestConfig:
"""回测配置类"""
# 交易参数
take_profit: float = 8.0 # 止盈点数
stop_loss: float = -1.0 # 止损点数
contract_size: float = 10000 # 合约规模
open_fee: float = 5.0 # 开仓手续费
close_fee_rate: float = 0.0005 # 平仓手续费率
slippage_rate: float = 0.0001 # 滑点率 0.01%
# 回测日期范围
start_date: str = "2025-7-1"
end_date: str = "2025-7-31"
# 信号参数
enable_bear_bull_engulf: bool = True # 涨包跌信号
enable_bull_bear_engulf: bool = True # 跌包涨信号
def __post_init__(self):
"""验证配置参数"""
if self.take_profit <= 0:
raise ValueError("止盈点数必须大于0")
if self.stop_loss >= 0:
raise ValueError("止损点数必须小于0")
if self.slippage_rate < 0 or self.slippage_rate > 0.01:
raise ValueError("滑点率应在0-1%之间")
@dataclass
class TradeRecord:
"""交易记录类"""
entry_time: datetime.datetime
exit_time: datetime.datetime
signal_type: str
direction: str
entry_price: float
exit_price: float
profit_loss: float
profit_amount: float
total_fee: float
net_profit: float
slippage_cost: float = 0.0
@dataclass
class SignalStats:
"""信号统计类"""
signal_name: str
count: int = 0
wins: int = 0
total_profit: float = 0.0
total_fee: float = 0.0
total_slippage: float = 0.0
@property
def win_rate(self) -> float:
"""胜率计算"""
return (self.wins / self.count * 100) if self.count > 0 else 0.0
@property
def avg_profit(self) -> float:
"""平均盈利"""
return self.total_profit / self.count if self.count > 0 else 0.0
@property
def net_profit(self) -> float:
"""净利润(扣除手续费和滑点)"""
return self.total_profit - self.total_fee - self.total_slippage
@dataclass
class PositionState:
direction: Optional[str] = None # "long" | "short"
entry_price: Optional[float] = None
entry_time: Optional[int] = None # ms
last_checked_time: Optional[int] = None # ms
# ===============================================================
# 📊 数据获取模块
# ===============================================================
def get_data_by_date(model, date_str):
"""按天获取指定表的数据"""
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。")
return []
start_ts = int(target_date.timestamp() * 1000)
end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1
try:
query = (model
.select()
.where(model.id.between(start_ts, end_ts))
.order_by(model.id.asc()))
data = []
for i in query:
# 验证数据完整性
if all(hasattr(i, attr) for attr in ['open', 'high', 'low', 'close']):
data.append({
'id': i.id,
'open': float(i.open),
'high': float(i.high),
'low': float(i.low),
'close': float(i.close)
})
logger.info(f"获取到 {len(data)}{date_str} 的数据")
return data
except Exception as e:
logger.error(f"获取数据失败 {date_str}: {e}")
return []
def get_future_data_1min(start_ts, end_ts):
"""获取指定时间范围内的 1 分钟数据"""
try:
query = (Weex1
.select()
.where(Weex1.id.between(start_ts, end_ts))
.order_by(Weex1.id.asc()))
data = []
for i in query:
if all(hasattr(i, attr) for attr in ['open', 'high', 'low', 'close']):
data.append({
'id': i.id,
'open': float(i.open),
'high': float(i.high),
'low': float(i.low),
'close': float(i.close)
})
return data
except Exception as e:
logger.error(f"获取1分钟数据失败: {e}")
return []
def get_1min_window(center_ts_ms: int, minutes_before: int = 30, minutes_after: int = 60):
"""基于中心时间获取前后窗口的一分钟K线数据。
返回按时间升序的列表[{id, open, high, low, close}]。
"""
try:
start_ts = center_ts_ms - minutes_before * 60 * 1000
end_ts = center_ts_ms + minutes_after * 60 * 1000
return get_future_data_1min(start_ts, end_ts)
except Exception as e:
logger.error(f"获取一分钟窗口数据失败: {e}")
return []
# ===============================================================
# 📈 信号判定模块(修正版)
# ===============================================================
def is_bullish(candle):
"""判断是否为阳线"""
return float(candle['open']) < float(candle['close'])
def is_bearish(candle):
"""判断是否为阴线"""
return float(candle['open']) > float(candle['close'])
def check_signal(prev, curr):
"""
判断是否出现包住形态(修正版)
包住形态定义:
- 看涨包住:前一根阴线,当前阳线完全包含前一根阴线的实体
- 看跌包住:前一根阳线,当前阴线完全包含前一根阳线的实体
"""
try:
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
# 确保数据有效
if not all(isinstance(x, (int, float)) and x > 0 for x in [p_open, p_close, c_open, c_close]):
return None, None
# 看涨包住:前一根是阴线,当前是阳线,且当前阳线完全包住前一根阴线
if (is_bearish(prev) and is_bullish(curr) and
c_open <= p_close and c_close >= p_open):
logger.debug(f"发现看涨包住信号: 前阴线({p_open:.2f}-{p_close:.2f}) 当前阳线({c_open:.2f}-{c_close:.2f})")
return "long", "bear_bull_engulf"
# 看跌包住:前一根是阳线,当前是阴线,且当前阴线完全包住前一根阳线
if (is_bullish(prev) and is_bearish(curr) and
c_open >= p_close and c_close <= p_open):
logger.debug(f"发现看跌包住信号: 前阳线({p_open:.2f}-{p_close:.2f}) 当前阴线({c_open:.2f}-{c_close:.2f})")
return "short", "bull_bear_engulf"
return None, None
except Exception as e:
logger.error(f"信号判断出错: {e}")
return None, None
# ===============================================================
# 💹 回测模拟模块(修正版)
# ===============================================================
def simulate_trade(direction, entry_price, entry_time, next_15min_time, config: BacktestConfig):
"""
用 1 分钟数据进行精细化止盈止损模拟(修正版)
Args:
direction: 交易方向 ("long""short")
entry_price: 开仓价格
entry_time: 开仓时间(毫秒时间戳)
next_15min_time: 下一个15分钟K线时间戳
config: 回测配置
Returns:
(exit_price, profit_loss_points, exit_time, slippage_cost)
"""
try:
# 获取未来1分钟数据
future_candles = get_future_data_1min(entry_time, next_15min_time)
if not future_candles:
logger.warning(f"未获取到1分钟数据: {entry_time} - {next_15min_time}")
return None, 0, None, 0
# 计算止盈止损价格
if direction == "long":
tp_price = entry_price + config.take_profit
sl_price = entry_price + config.stop_loss
else: # short
tp_price = entry_price - config.take_profit
sl_price = entry_price - config.stop_loss
slippage_cost = 0.0
for candle in future_candles:
open_p, high, low, close = map(float, (candle['open'], candle['high'], candle['low'], candle['close']))
if direction == "long":
# 检查止损
if low <= sl_price:
exit_price = sl_price - (sl_price * config.slippage_rate)
slippage_cost = sl_price * config.slippage_rate
profit_loss = config.stop_loss - slippage_cost
logger.debug(f"多头止损触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 亏损{profit_loss:.2f}")
return exit_price, profit_loss, candle['id'], slippage_cost
# 检查止盈
if high >= tp_price:
exit_price = tp_price - (tp_price * config.slippage_rate)
slippage_cost = tp_price * config.slippage_rate
profit_loss = config.take_profit - slippage_cost
logger.debug(f"多头止盈触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈利{profit_loss:.2f}")
return exit_price, profit_loss, candle['id'], slippage_cost
else: # short
# 检查止损
if high >= sl_price:
exit_price = sl_price + (sl_price * config.slippage_rate)
slippage_cost = sl_price * config.slippage_rate
profit_loss = config.stop_loss - slippage_cost
logger.debug(f"空头止损触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 亏损{profit_loss:.2f}")
return exit_price, profit_loss, candle['id'], slippage_cost
# 检查止盈
if low <= tp_price:
exit_price = tp_price + (tp_price * config.slippage_rate)
slippage_cost = tp_price * config.slippage_rate
profit_loss = config.take_profit - slippage_cost
logger.debug(f"空头止盈触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈利{profit_loss:.2f}")
return exit_price, profit_loss, candle['id'], slippage_cost
# 未触发止盈止损,用最后一根收盘价平仓
final_candle = future_candles[-1]
final_price = float(final_candle['close'])
if direction == "long":
exit_price = final_price - (final_price * config.slippage_rate)
profit_loss = (exit_price - entry_price)
else:
exit_price = final_price + (final_price * config.slippage_rate)
profit_loss = (entry_price - exit_price)
slippage_cost = final_price * config.slippage_rate
logger.debug(f"时间到期平仓: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈亏{profit_loss:.2f}")
return exit_price, profit_loss, final_candle['id'], slippage_cost
except Exception as e:
logger.error(f"交易模拟出错: {e}")
return None, 0, None, 0
def simulate_until(direction, entry_price, entry_time, end_time, config: BacktestConfig):
"""
从entry_time开始向后检查直到end_time不跨越end_time
返回在此区间内是否触发TP/SL以及对应的退出信息。
若未触发,返回(None, 0, None, 0)。
"""
try:
candles = get_future_data_1min(entry_time, end_time)
if not candles:
return None, 0, None, 0
if direction == "long":
tp_price = entry_price + config.take_profit
sl_price = entry_price + config.stop_loss
else:
tp_price = entry_price - config.take_profit
sl_price = entry_price - config.stop_loss
for candle in candles:
open_p, high, low = map(float, (candle['open'], candle['high'], candle['low']))
if direction == "long":
if low <= sl_price:
exit_price = sl_price - (sl_price * config.slippage_rate)
slippage_cost = sl_price * config.slippage_rate
return exit_price, config.stop_loss - slippage_cost, candle['id'], slippage_cost
if high >= tp_price:
exit_price = tp_price - (tp_price * config.slippage_rate)
slippage_cost = tp_price * config.slippage_rate
return exit_price, config.take_profit - slippage_cost, candle['id'], slippage_cost
else:
if high >= sl_price:
exit_price = sl_price + (sl_price * config.slippage_rate)
slippage_cost = sl_price * config.slippage_rate
return exit_price, config.stop_loss - slippage_cost, candle['id'], slippage_cost
if low <= tp_price:
exit_price = tp_price + (tp_price * config.slippage_rate)
slippage_cost = tp_price * config.slippage_rate
return exit_price, config.take_profit - slippage_cost, candle['id'], slippage_cost
return None, 0, None, 0
except Exception as e:
logger.error(f"分段交易模拟出错: {e}")
return None, 0, None, 0
def calculate_fees(entry_price, exit_price, config: BacktestConfig):
"""计算手续费"""
open_fee = config.open_fee
close_fee = config.contract_size * config.close_fee_rate
return open_fee + close_fee
# ===============================================================
# 📈 可视化一分钟K线与开仓位置
# ===============================================================
def _to_datetime(ms: int) -> datetime.datetime:
return datetime.datetime.fromtimestamp(ms / 1000)
def visualize_trade_1min(trade: 'TradeRecord', *,
minutes_before: int = 30,
minutes_after: int = 60,
take_profit_points: Optional[float] = None,
stop_loss_points: Optional[float] = None,
output_dir: str = "charts_1m") -> Optional[str]:
"""生成指定交易周边的一分钟K线图Plotly HTML
Args:
trade: 回测产生的交易记录
minutes_before: 开仓前取多少分钟的数据
minutes_after: 开仓后取多少分钟的数据
take_profit_points: 可选,绘制入场±止盈线(点)
stop_loss_points: 可选,绘制入场±止损线(点)
output_dir: 输出目录
Returns:
生成的HTML路径失败返回None。
"""
try:
entry_ms = int(trade.entry_time.timestamp() * 1000)
data = get_1min_window(entry_ms, minutes_before, minutes_after)
if not data:
logger.warning("一分钟数据为空,跳过可视化")
return None
# 构造DataFrame以便排序与渲染
df = pd.DataFrame(data)
df = df.sort_values('id').reset_index(drop=True)
df['time'] = df['id'].apply(lambda x: _to_datetime(int(x)))
df['open'] = df['open'].astype(float)
df['high'] = df['high'].astype(float)
df['low'] = df['low'].astype(float)
df['close'] = df['close'].astype(float)
fig = go.Figure()
fig.add_trace(go.Candlestick(
x=df['time'],
open=df['open'], high=df['high'], low=df['low'], close=df['close'],
name="1分钟K线"
))
# 将入场/出场时间吸附到最近的一分钟K线时间提高对齐准确度
def _snap_time(target_dt: datetime.datetime, tolerance_ms: int = 90_000):
target_ms = int(target_dt.timestamp() * 1000)
diffs = (df['id'] - target_ms).abs()
idx = int(diffs.idxmin())
if abs(int(df.at[idx, 'id']) - target_ms) <= tolerance_ms:
return df.at[idx, 'time']
return target_dt
snapped_entry_time = _snap_time(trade.entry_time)
snapped_exit_time = _snap_time(trade.exit_time)
# 标注入场、出场
entry_y = float(trade.entry_price)
exit_y = float(trade.exit_price)
fig.add_trace(go.Scatter(
x=[snapped_entry_time],
y=[entry_y],
mode="markers+text",
name="入场",
text=["入场"],
textposition="top center",
marker=dict(color="#2ecc71", size=10, symbol="triangle-up")
))
fig.add_trace(go.Scatter(
x=[snapped_exit_time],
y=[exit_y],
mode="markers+text",
name="出场",
text=["出场"],
textposition="bottom center",
marker=dict(color="#e74c3c", size=10, symbol="x")
))
# 可选:止盈止损参考线
shapes = []
annotations = []
if take_profit_points is not None:
tp_price = entry_y + take_profit_points if trade.direction == "做多" else entry_y - take_profit_points
shapes.append(dict(type="line", xref="x", yref="y",
x0=df['time'].min(), x1=df['time'].max(), y0=tp_price, y1=tp_price,
line=dict(color="rgba(46, 204, 113, 0.5)", width=1, dash="dash")))
annotations.append(dict(xref="x", yref="y", x=df['time'].min(), y=tp_price,
text="TP", showarrow=False, font=dict(color="#2ecc71")))
if stop_loss_points is not None:
sl_price = entry_y + stop_loss_points if trade.direction == "做多" else entry_y - stop_loss_points
shapes.append(dict(type="line", xref="x", yref="y",
x0=df['time'].min(), x1=df['time'].max(), y0=sl_price, y1=sl_price,
line=dict(color="rgba(231, 76, 60, 0.5)", width=1, dash="dot")))
annotations.append(dict(xref="x", yref="y", x=df['time'].min(), y=sl_price,
text="SL", showarrow=False, font=dict(color="#e74c3c")))
title = (
f"{trade.entry_time.strftime('%Y-%m-%d %H:%M')} 开仓 - {trade.direction}({trade.signal_type}) "
f"入场={trade.entry_price:.2f} 出场={trade.exit_price:.2f} 盈亏={trade.profit_loss:.2f}"
)
fig.update_layout(
title=title,
xaxis_title="时间",
yaxis_title="价格",
xaxis=dict(rangeslider=dict(visible=False)),
shapes=shapes,
annotations=annotations,
hovermode="x unified"
)
Path(output_dir).mkdir(parents=True, exist_ok=True)
fname = f"trade_{trade.entry_time.strftime('%Y%m%d_%H%M%S')}.html"
out_path = os.path.join(output_dir, fname)
fig.write_html(out_path, include_plotlyjs="cdn", auto_open=False)
logger.info(f"一分钟K线可视化已生成: {out_path}")
return out_path
except Exception as e:
logger.error(f"生成一分钟K线图失败: {e}")
return None
# ===============================================================
# 📊 主回测流程(修正版)
# ===============================================================
def backtest(dates, config: BacktestConfig):
"""
主回测函数(修正版)
Args:
dates: 日期列表
config: 回测配置
Returns:
(trades, stats)
"""
logger.info(f"开始回测,日期范围: {dates[0]}{dates[-1]}")
# 获取所有15分钟K线数据
all_data = []
for date_str in dates:
daily_data = get_data_by_date(Weex15, date_str)
if daily_data:
all_data.extend(daily_data)
else:
logger.warning(f"日期 {date_str} 没有数据")
if not all_data:
logger.error("没有获取到任何数据")
return [], {}
all_data.sort(key=lambda x: x['id'])
logger.info(f"总共获取到 {len(all_data)} 条15分钟K线数据")
# 初始化统计
stats = {
"bear_bull_engulf": SignalStats(signal_name="看涨包住"),
"bull_bear_engulf": SignalStats(signal_name="看跌包住"),
}
trades = []
total_trades = 0
# 主回测循环(加入持仓管理:同向不加仓,反向平旧开新)
position = PositionState()
for idx in range(1, len(all_data) - 1):
try:
prev, curr = all_data[idx - 1], all_data[idx]
# 检查信号
direction, signal = check_signal(prev, curr)
if not direction:
continue
# 检查信号是否启用
if signal == "bear_bull_engulf" and not config.enable_bear_bull_engulf:
continue
if signal == "bull_bear_engulf" and not config.enable_bull_bear_engulf:
continue
# 当前K线时间与价格
current_time = curr['id']
current_close = float(curr['close'])
# 若有持仓先滚动检查从上次检查时间到当前时间是否触发TP/SL
if position.direction is not None:
check_from = position.last_checked_time or position.entry_time
if current_time > check_from:
e_price, pl_pts, e_time, slip = simulate_until(
position.direction, position.entry_price, check_from, current_time, config
)
if e_price is not None:
# 生成平仓记录(由持仓信号驱动,不计入当前信号统计)
total_fee = calculate_fees(position.entry_price, e_price, config)
profit_amount = pl_pts * config.contract_size
trade = TradeRecord(
entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000),
exit_time=datetime.datetime.fromtimestamp(e_time / 1000),
signal_type="持仓止盈/止损",
direction="做多" if position.direction == "long" else "做空",
entry_price=position.entry_price,
exit_price=e_price,
profit_loss=pl_pts,
profit_amount=profit_amount,
total_fee=total_fee,
net_profit=profit_amount - total_fee,
slippage_cost=slip * config.contract_size
)
trades.append(trade)
total_trades += 1
# 清空持仓
position = PositionState()
else:
position.last_checked_time = current_time
# 根据信号与持仓关系决定是否开/平仓
if direction:
if position.direction is None:
# 无持仓 -> 开仓
position = PositionState(direction=direction, entry_price=current_close, entry_time=current_time, last_checked_time=current_time)
else:
if position.direction == direction:
# 同向信号,不加仓,保持原持仓
pass
else:
# 反向信号:先以当前价立即平旧仓,再开新仓
e_price = current_close
if position.direction == "long":
pl_pts = e_price - position.entry_price
else:
pl_pts = position.entry_price - e_price
total_fee = calculate_fees(position.entry_price, e_price, config)
profit_amount = pl_pts * config.contract_size
trade = TradeRecord(
entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000),
exit_time=datetime.datetime.fromtimestamp(current_time / 1000),
signal_type="反向信号平仓",
direction="做多" if position.direction == "long" else "做空",
entry_price=position.entry_price,
exit_price=e_price,
profit_loss=pl_pts,
profit_amount=profit_amount,
total_fee=total_fee,
net_profit=profit_amount - total_fee,
slippage_cost=0.0
)
trades.append(trade)
total_trades += 1
# 开新仓
position = PositionState(direction=direction, entry_price=current_close, entry_time=current_time, last_checked_time=current_time)
except Exception as e:
logger.error(f"处理第 {idx} 条数据时出错: {e}")
continue
# 循环结束后,如仍有持仓,按最后一根收盘价平仓
if position.direction is not None:
final = all_data[-1]
final_time = final['id']
final_price = float(final['close'])
if position.direction == "long":
pl_pts = final_price - position.entry_price
else:
pl_pts = position.entry_price - final_price
total_fee = calculate_fees(position.entry_price, final_price, config)
profit_amount = pl_pts * config.contract_size
trade = TradeRecord(
entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000),
exit_time=datetime.datetime.fromtimestamp(final_time / 1000),
signal_type="时间到期平仓",
direction="做多" if position.direction == "long" else "做空",
entry_price=position.entry_price,
exit_price=final_price,
profit_loss=pl_pts,
profit_amount=profit_amount,
total_fee=total_fee,
net_profit=profit_amount - total_fee,
slippage_cost=0.0
)
trades.append(trade)
total_trades += 1
position = PositionState()
logger.info(f"回测完成,总共 {total_trades} 笔交易")
return trades, stats
# ===============================================================
# 📊 结果分析模块
# ===============================================================
def analyze_results(trades, stats):
"""分析回测结果"""
if not trades:
logger.warning("没有交易记录")
return
total_trades = len(trades)
total_profit = sum(t.profit_amount for t in trades)
total_fee = sum(t.total_fee for t in trades)
total_slippage = sum(t.slippage_cost for t in trades)
net_profit = total_profit - total_fee - total_slippage
wins = sum(1 for t in trades if t.profit_loss > 0)
losses = total_trades - wins
win_rate = (wins / total_trades * 100) if total_trades > 0 else 0
avg_profit = total_profit / total_trades if total_trades > 0 else 0
avg_fee = total_fee / total_trades if total_trades > 0 else 0
logger.info("=" * 50)
logger.info("📊 回测结果汇总")
logger.info("=" * 50)
logger.info(f"总交易次数: {total_trades}")
logger.info(f"盈利次数: {wins}")
logger.info(f"亏损次数: {losses}")
logger.info(f"胜率: {win_rate:.2f}%")
logger.info(f"总盈亏: {total_profit:.2f}")
logger.info(f"总手续费: {total_fee:.2f}")
logger.info(f"总滑点成本: {total_slippage:.2f}")
logger.info(f"净利润: {net_profit:.2f}")
logger.info(f"平均每笔盈亏: {avg_profit:.2f}")
logger.info(f"平均每笔手续费: {avg_fee:.2f}")
# 按信号类型分析
logger.info("\n" + "=" * 30)
logger.info("📈 信号类型分析")
logger.info("=" * 30)
for signal_key, signal_stat in stats.items():
if signal_stat.count > 0:
logger.info(f"\n{signal_stat.signal_name}:")
logger.info(f" 信号次数: {signal_stat.count}")
logger.info(f" 胜率: {signal_stat.win_rate:.2f}%")
logger.info(f" 总盈亏: {signal_stat.total_profit:.2f}")
logger.info(f" 总手续费: {signal_stat.total_fee:.2f}")
logger.info(f" 总滑点: {signal_stat.total_slippage:.2f}")
logger.info(f" 净利润: {signal_stat.net_profit:.2f}")
logger.info(f" 平均盈亏: {signal_stat.avg_profit:.2f}")
# ===============================================================
# 🚀 启动主流程
# ===============================================================
if __name__ == '__main__':
# 配置日志
logger.add("backtest.log", rotation="1 day", retention="7 days")
# 创建回测配置
config = BacktestConfig(
take_profit=10.0, # 止盈10点
stop_loss=-1.0, # 止损1点
contract_size=10000, # 合约规模
open_fee=5.0, # 开仓手续费
close_fee_rate=0.0005, # 平仓手续费率
slippage_rate=0.0001, # 滑点率0.01%
start_date="2025-9-1",
end_date="2025-9-30",
enable_bear_bull_engulf=True,
enable_bull_bear_engulf=True
)
# 生成日期列表
dates = [f"2025-9-{i}" for i in range(1, 31)]
try:
# 执行回测
trades, stats = backtest(dates, config)
# 输出详细交易记录
logger.info("\n" + "=" * 80)
logger.info("📋 详细交易记录")
logger.info("=" * 80)
for i, trade in enumerate(trades, 1):
logger.info(
f"{i:3d}. {trade.entry_time.strftime('%m-%d %H:%M')} "
f"{trade.direction}({trade.signal_type}) "
f"入场={trade.entry_price:.2f} 出场={trade.exit_price:.2f} "
f"出场时间={trade.exit_time.strftime('%m-%d %H:%M')} "
f"盈亏={trade.profit_loss:.2f}点 金额={trade.profit_amount:.2f} "
f"手续费={trade.total_fee:.2f} 滑点={trade.slippage_cost:.2f} "
f"净利润={trade.net_profit:.2f}"
)
# 分析结果
analyze_results(trades, stats)
logger.info("\n✅ 回测完成!")
except Exception as e:
logger.error(f"回测执行失败: {e}")
raise
# ============== 生成一分钟K线可视化前10笔 ==============
try:
to_show = trades[:10]
for t in to_show:
visualize_trade_1min(
t,
minutes_before=30,
minutes_after=90,
take_profit_points=config.take_profit,
stop_loss_points=config.stop_loss
)
logger.info("已为前10笔交易生成一分钟K线图charts_1m 目录)")
except Exception as e:
logger.error(f"生成一分钟K线图时出错: {e}")

View File

@@ -304,3 +304,9 @@ if __name__ == '__main__':
# win_rate = (v['wins'] / v['count'] * 100) if v['count'] > 0 else 0
# print(
# f"{v['name']} ({k}) - 信号数: {v['count']} | 胜率: {win_rate:.2f}% | 总盈利: {v['total_profit']:.2f}")
# 需要优化,目前有两种情况,第一种,同向,不如说上一单开单是涨,上一单还没有结束,当前信号来了,就不开单,等上一单到了上一单的止损位或者止盈位在平仓
# 第二种,方向,上一单是涨,上一单还没有结束,当前信号来了,是跌,然后就按照现在这个信号要开仓的位置,平掉上一单,然后开一单方向的,
# 一笔中可能有好几次信号,都按照上面的规则去判断,要保证同一时间,只会有一笔持仓,
# 打印每笔的交易详细,如果一笔中同向,输入为一条交易记录,一条加以记录能够直观的看出中间有多少笔