diff --git a/models/database.db b/models/database.db index 0ab4dde..ceb5482 100644 Binary files a/models/database.db and b/models/database.db differ diff --git a/weex/stock_data.xlsx b/weex/stock_data.xlsx deleted file mode 100644 index b08fa95..0000000 Binary files a/weex/stock_data.xlsx and /dev/null differ diff --git a/weex/读取数据库分析数据2.0-优化信号版.py b/weex/读取数据库分析数据2.0-优化信号版.py index ee54563..e4f0ac9 100644 --- a/weex/读取数据库分析数据2.0-优化信号版.py +++ b/weex/读取数据库分析数据2.0-优化信号版.py @@ -1,12 +1,15 @@ """ -量化交易回测系统 - 优化版(v2.1) +量化交易回测系统 - 优化版 功能:基于包住形态的交易信号识别和回测分析 作者:量化交易团队 +版本:2.0 """ import datetime +from typing import List, Dict, Tuple, Optional, Any from dataclasses import dataclass from loguru import logger +from peewee import fn from models.weex import Weex15, Weex1 @@ -17,27 +20,65 @@ from models.weex import Weex15, Weex1 @dataclass class BacktestConfig: """回测配置类""" + # 交易参数 take_profit: float = 8.0 # 止盈点数 stop_loss: float = -1.0 # 止损点数 contract_size: float = 10000 # 合约规模 open_fee: float = 5.0 # 开仓手续费 close_fee_rate: float = 0.0005 # 平仓手续费率 + # 回测日期范围 start_date: str = "2025-7-1" end_date: str = "2025-7-31" + # 信号参数 enable_bear_bull_engulf: bool = True # 涨包跌信号 enable_bull_bear_engulf: bool = True # 跌包涨信号 def __post_init__(self): + """验证配置参数""" if self.take_profit <= 0: raise ValueError("止盈点数必须大于0") if self.stop_loss >= 0: raise ValueError("止损点数必须小于0") +@dataclass +class TradeRecord: + """交易记录类""" + entry_time: datetime.datetime + exit_time: datetime.datetime + signal_type: str + direction: str + entry_price: float + exit_price: float + profit_loss: float + profit_amount: float + total_fee: float + net_profit: float + + +@dataclass +class SignalStats: + """信号统计类""" + signal_name: str + count: int = 0 + wins: int = 0 + total_profit: float = 0.0 + + @property + def win_rate(self) -> float: + """胜率计算""" + return (self.wins / self.count * 100) if self.count > 0 else 0.0 + + @property + def avg_profit(self) -> float: + """平均盈利""" + return self.total_profit / self.count if self.count > 0 else 0.0 + + # =============================================================== -# 📊 数据模块 +# 📊 数据获取模块 # =============================================================== def get_data_by_date(model, date_str): @@ -72,10 +113,12 @@ def get_future_data_1min(start_ts, end_ts): # =============================================================== -# 📈 信号模块 +# 📈 信号判定模块 # =============================================================== def is_bullish(c): return float(c['open']) < float(c['close']) + + def is_bearish(c): return float(c['open']) > float(c['close']) @@ -96,33 +139,44 @@ def check_signal(prev, curr): # =============================================================== -# 💹 模拟模块(1分钟级止盈止损) +# 💹 回测模拟模块(使用 1 分钟数据) # =============================================================== def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl=-1): - """用 1 分钟数据进行精细化止盈止损模拟""" + """ + 用 1 分钟数据进行精细化止盈止损模拟 + entry_time: 当前信号的 entry candle id(毫秒时间戳) + next_15min_time: 下一个15min时间戳,用于界定止盈止损分析范围 + + direction:信号类型 + entry_price:开仓价格 + entry_time:开仓时间 + next_15min_time:15分钟未来行情 + + """ + # 查 15 分钟之间的 1 分钟数据 future_candles = get_future_data_1min(entry_time, next_15min_time) if not future_candles: return None, 0, None - tp_price = entry_price + tp if direction == "long" else entry_price - tp - sl_price = entry_price + sl if direction == "long" else entry_price - sl + tp_price = entry_price + tp if direction == "long" else entry_price - tp # 止盈价位 + sl_price = entry_price + sl if direction == "long" else entry_price - sl # 止损价位 for candle in future_candles: open_p, high, low = map(float, (candle['open'], candle['high'], candle['low'])) - if direction == "long": - if open_p >= tp_price: # 跳空止盈 + if direction == "long": # long + if open_p >= tp_price: # 开盘跳空止盈 涨信号, return open_p, open_p - entry_price, candle['id'] - if open_p <= sl_price: # 跳空止损 + if open_p <= sl_price: # 开盘跳空止损 return open_p, open_p - entry_price, candle['id'] if high >= tp_price: return tp_price, tp, candle['id'] if low <= sl_price: return sl_price, sl, candle['id'] - else: # short - if open_p <= tp_price: + else: # short 跌信号 + if open_p <= tp_price: # return open_p, entry_price - open_p, candle['id'] if open_p >= sl_price: return open_p, entry_price - open_p, candle['id'] @@ -139,13 +193,22 @@ def simulate_trade(direction, entry_price, entry_time, next_15min_time, tp=8, sl # =============================================================== -# 📊 回测主流程 +# 📊 主回测流程 # =============================================================== def backtest(dates, tp, sl): + """ + datas:日期的列表 + + :param dates: + :param tp: + :param sl: + :return: + """ + all_data = [] for date_str in dates: - all_data.extend(get_data_by_date(Weex15, date_str)) + all_data.extend(get_data_by_date(Weex15, date_str)) # 获取每天的数据,15分钟k线数据 all_data.sort(key=lambda x: x['id']) @@ -155,52 +218,31 @@ def backtest(dates, tp, sl): } trades = [] - idx = 1 - open_position = None - while idx < len(all_data) - 1: - prev, curr = all_data[idx - 1], all_data[idx] - entry_candle = all_data[idx + 1] + for idx in range(1, len(all_data) - 1): + prev, curr = all_data[idx - 1], all_data[idx] # 前一笔,当前一笔 + entry_candle = all_data[idx + 1] # 下一笔开仓k线 + direction, signal = check_signal(prev, curr) - - # === 检查信号 === if not direction: - idx += 1 continue - # === 当前有持仓 === - if open_position: - if direction == open_position['direction']: - # 同方向信号:忽略 - idx += 1 - continue - else: - # 反方向信号:立即平仓 - exit_price = float(entry_candle['open']) - diff = (exit_price - open_position['entry_price']) if open_position['direction'] == 'long' else ( - open_position['entry_price'] - exit_price) - trades.append({ - "entry_time": datetime.datetime.fromtimestamp(open_position['entry_time'] / 1000), - "exit_time": datetime.datetime.fromtimestamp(entry_candle['id'] / 1000), - "signal": "反向平仓", - "direction": "平仓", - "entry": open_position['entry_price'], - "exit": exit_price, - "diff": diff - }) - open_position = None # 平仓后可立即反手 - - # === 开新仓 === + # 下一个 15 分钟K线的时间范围 next_15min_time = all_data[idx + 50]['id'] if idx + 50 < len(all_data) else all_data[-1]['id'] - entry_price = float(entry_candle['open']) + + entry_price = float(entry_candle['open']) # 开仓价格 exit_price, diff, exit_time = simulate_trade( - direction, entry_price, entry_candle['id'], next_15min_time, tp=tp, sl=sl + direction, + entry_price, + entry_candle['id'], + next_15min_time, + tp=tp, + sl=sl ) + if exit_price is None: - idx += 1 continue - # 记录统计 stats[signal]['count'] += 1 stats[signal]['total_profit'] += diff if diff > 0: @@ -216,25 +258,116 @@ def backtest(dates, tp, sl): "diff": diff }) - # === 跳过到平仓时间点 === - # 找到 exit_time 对应的 candle 索引,防止未平仓时重复触发信号 - while idx < len(all_data) - 1 and all_data[idx]['id'] < exit_time: - idx += 1 + return trades, stats - open_position = None # 已平仓 - idx += 1 + +def backtest_single_position(dates, tp, sl): + """单笔持仓回测,处理同向/反向信号""" + all_data = [] + for date_str in dates: + all_data.extend(get_data_by_date(Weex15, date_str)) + all_data.sort(key=lambda x: x['id']) + + stats = { + "bear_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "涨包跌"}, + "bull_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0, "name": "跌包涨"}, + } + + trades = [] + current_position = None # 当前持仓信息 + + for idx in range(1, len(all_data) - 1): + prev, curr = all_data[idx - 1], all_data[idx] + entry_candle = all_data[idx + 1] + + direction, signal = check_signal(prev, curr) + if not direction: + continue + + # 下一个 15 分钟K线的时间范围 + next_15min_time = all_data[idx + 50]['id'] if idx + 50 < len(all_data) else all_data[-1]['id'] + entry_price = float(entry_candle['open']) + + # 有持仓 + if current_position: + # 同向信号 -> 跳过 + if current_position['direction'] == direction: + continue + # 反向信号 -> 先平掉当前持仓,再开新仓 + else: + # 先按当前位置止盈止损平仓 + exit_price, diff, exit_time = simulate_trade( + current_position['direction'], + current_position['entry_price'], + current_position['entry_time'], + entry_candle['id'], + tp=tp, + sl=sl + ) + if exit_price is not None: + trades.append({ + "entry_time": datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + "exit_time": datetime.datetime.fromtimestamp(exit_time / 1000), + "signal": current_position['signal'], + "direction": "做多" if current_position['direction'] == "long" else "做空", + "entry": current_position['entry_price'], + "exit": exit_price, + "diff": diff + }) + # 更新统计 + stats_key = 'bear_bull_engulf' if current_position['signal'] == '涨包跌' else 'bull_bear_engulf' + stats[stats_key]['count'] += 1 + stats[stats_key]['total_profit'] += diff + if diff > 0: + stats[stats_key]['wins'] += 1 + + current_position = None # 清空持仓 + + # 开新仓 + current_position = { + "direction": direction, + "signal": stats[signal]['name'], + "entry_price": entry_price, + "entry_time": entry_candle['id'] + } + + # 最后一笔持仓如果未平仓,用最后收盘价平掉 + if current_position: + exit_price, diff, exit_time = simulate_trade( + current_position['direction'], + current_position['entry_price'], + current_position['entry_time'], + all_data[-1]['id'], + tp=tp, + sl=sl + ) + if exit_price is not None: + trades.append({ + "entry_time": datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + "exit_time": datetime.datetime.fromtimestamp(exit_time / 1000), + "signal": current_position['signal'], + "direction": "做多" if current_position['direction'] == "long" else "做空", + "entry": current_position['entry_price'], + "exit": exit_price, + "diff": diff + }) + stats_key = 'bear_bull_engulf' if current_position['signal'] == '涨包跌' else 'bull_bear_engulf' + stats[stats_key]['count'] += 1 + stats[stats_key]['total_profit'] += diff + if diff > 0: + stats[stats_key]['wins'] += 1 return trades, stats # =============================================================== -# 🚀 主入口 +# 🚀 启动主流程 # =============================================================== if __name__ == '__main__': dates = [f"2025-9-{i}" for i in range(1, 31)] - trades, stats = backtest(dates, tp=50, sl=-10) + trades, stats = backtest_single_position(dates, tp=10000, sl=-10) logger.info("===== 每笔交易详情 =====") for t in trades: @@ -246,13 +379,32 @@ if __name__ == '__main__': total_profit = sum(t['diff'] / t['entry'] * 10000 for t in trades) total_fee = sum(5 + 10000 / t['entry'] * t['exit'] * 0.0005 for t in trades) + # print(f"止盈:{i1}, 止损:{i}") print(f"\n一共交易笔数:{len(trades)}") - print(f"总盈利:{total_profit:.2f}") - print(f"总手续费:{total_fee:.2f}") + print(f"一共盈利:{total_profit:.2f}") + print(f"一共手续费:{total_fee:.2f}") print(f"净利润:{total_profit - total_fee:.2f}") print("\n===== 信号统计 =====") - for k, v in stats.items(): - win_rate = (v['wins'] / v['count'] * 100) if v['count'] > 0 else 0 - print(f"{v['name']} ({k}) - 信号数: {v['count']} | 胜率: {win_rate:.2f}% | 总盈利: {v['total_profit']:.2f}") + # =============================================================================================================================== + + # for i in range(1, 16): + # for i1 in range(1, 51): + # trades, stats = backtest_single_position(dates, tp=i1, sl=-i) + # + # total_profit = sum(t['diff'] / t['entry'] * 10000 for t in trades) + # total_fee = sum(5 + 10000 / t['entry'] * t['exit'] * 0.0005 for t in trades) + # + # if total_profit > total_fee * 0.1: + # print("\n===== 信号统计 =====") + # print(f"止盈:{i1}, 止损:{i}") + # print(f"\n一共交易笔数:{len(trades)}") + # print(f"一共盈利:{total_profit:.2f}") + # print(f"一共手续费:{total_fee:.2f}") + # print(f"净利润:{total_profit - total_fee * 0.1}") + +# 需要优化,目前有两种情况,第一种,同向,不如说上一单开单是涨,上一单还没有结束,当前信号来了,就不开单,等上一单到了上一单的止损位或者止盈位在平仓 +# 第二种,方向,上一单是涨,上一单还没有结束,当前信号来了,是跌,然后就按照现在这个信号要开仓的位置,平掉上一单,然后开一单方向的, +# 一笔中可能有好几次信号,都按照上面的规则去判断,要保证同一时间,只会有一笔持仓, +# 打印每笔的交易详细,如果一笔中同向,输入为一条交易记录 diff --git a/weex/读取数据库分析数据2.0-优化信号版2.0.py b/weex/读取数据库分析数据2.0-优化信号版2.0.py new file mode 100644 index 0000000..245133f --- /dev/null +++ b/weex/读取数据库分析数据2.0-优化信号版2.0.py @@ -0,0 +1,793 @@ +""" +量化交易回测系统 - 优化修正版 +功能:基于包住形态的交易信号识别和回测分析 +作者:量化交易团队 +版本:2.1 (修正版) +""" + +import datetime +import os +from pathlib import Path +from typing import List, Dict, Tuple, Optional, Any +import pandas as pd +import plotly.graph_objects as go +from typing import List, Dict, Tuple, Optional, Any +from dataclasses import dataclass +from loguru import logger +from peewee import fn +from models.weex import Weex15, Weex1 + + +# =============================================================== +# 📊 配置管理类 +# =============================================================== + +@dataclass +class BacktestConfig: + """回测配置类""" + # 交易参数 + take_profit: float = 8.0 # 止盈点数 + stop_loss: float = -1.0 # 止损点数 + contract_size: float = 10000 # 合约规模 + open_fee: float = 5.0 # 开仓手续费 + close_fee_rate: float = 0.0005 # 平仓手续费率 + slippage_rate: float = 0.0001 # 滑点率 0.01% + + # 回测日期范围 + start_date: str = "2025-7-1" + end_date: str = "2025-7-31" + + # 信号参数 + enable_bear_bull_engulf: bool = True # 涨包跌信号 + enable_bull_bear_engulf: bool = True # 跌包涨信号 + + def __post_init__(self): + """验证配置参数""" + if self.take_profit <= 0: + raise ValueError("止盈点数必须大于0") + if self.stop_loss >= 0: + raise ValueError("止损点数必须小于0") + if self.slippage_rate < 0 or self.slippage_rate > 0.01: + raise ValueError("滑点率应在0-1%之间") + + +@dataclass +class TradeRecord: + """交易记录类""" + entry_time: datetime.datetime + exit_time: datetime.datetime + signal_type: str + direction: str + entry_price: float + exit_price: float + profit_loss: float + profit_amount: float + total_fee: float + net_profit: float + slippage_cost: float = 0.0 + + +@dataclass +class SignalStats: + """信号统计类""" + signal_name: str + count: int = 0 + wins: int = 0 + total_profit: float = 0.0 + total_fee: float = 0.0 + total_slippage: float = 0.0 + + @property + def win_rate(self) -> float: + """胜率计算""" + return (self.wins / self.count * 100) if self.count > 0 else 0.0 + + @property + def avg_profit(self) -> float: + """平均盈利""" + return self.total_profit / self.count if self.count > 0 else 0.0 + + @property + def net_profit(self) -> float: + """净利润(扣除手续费和滑点)""" + return self.total_profit - self.total_fee - self.total_slippage + + +@dataclass +class PositionState: + direction: Optional[str] = None # "long" | "short" + entry_price: Optional[float] = None + entry_time: Optional[int] = None # ms + last_checked_time: Optional[int] = None # ms + + +# =============================================================== +# 📊 数据获取模块 +# =============================================================== + +def get_data_by_date(model, date_str): + """按天获取指定表的数据""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + logger.error("日期格式不正确,请使用 YYYY-MM-DD 格式。") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + try: + query = (model + .select() + .where(model.id.between(start_ts, end_ts)) + .order_by(model.id.asc())) + + data = [] + for i in query: + # 验证数据完整性 + if all(hasattr(i, attr) for attr in ['open', 'high', 'low', 'close']): + data.append({ + 'id': i.id, + 'open': float(i.open), + 'high': float(i.high), + 'low': float(i.low), + 'close': float(i.close) + }) + + logger.info(f"获取到 {len(data)} 条 {date_str} 的数据") + return data + + except Exception as e: + logger.error(f"获取数据失败 {date_str}: {e}") + return [] + + +def get_future_data_1min(start_ts, end_ts): + """获取指定时间范围内的 1 分钟数据""" + try: + query = (Weex1 + .select() + .where(Weex1.id.between(start_ts, end_ts)) + .order_by(Weex1.id.asc())) + + data = [] + for i in query: + if all(hasattr(i, attr) for attr in ['open', 'high', 'low', 'close']): + data.append({ + 'id': i.id, + 'open': float(i.open), + 'high': float(i.high), + 'low': float(i.low), + 'close': float(i.close) + }) + + return data + + except Exception as e: + logger.error(f"获取1分钟数据失败: {e}") + return [] + + +def get_1min_window(center_ts_ms: int, minutes_before: int = 30, minutes_after: int = 60): + """基于中心时间,获取前后窗口的一分钟K线数据。 + 返回按时间升序的列表[{id, open, high, low, close}]。 + """ + try: + start_ts = center_ts_ms - minutes_before * 60 * 1000 + end_ts = center_ts_ms + minutes_after * 60 * 1000 + return get_future_data_1min(start_ts, end_ts) + except Exception as e: + logger.error(f"获取一分钟窗口数据失败: {e}") + return [] + + +# =============================================================== +# 📈 信号判定模块(修正版) +# =============================================================== + +def is_bullish(candle): + """判断是否为阳线""" + return float(candle['open']) < float(candle['close']) + + +def is_bearish(candle): + """判断是否为阴线""" + return float(candle['open']) > float(candle['close']) + + +def check_signal(prev, curr): + """ + 判断是否出现包住形态(修正版) + + 包住形态定义: + - 看涨包住:前一根阴线,当前阳线完全包含前一根阴线的实体 + - 看跌包住:前一根阳线,当前阴线完全包含前一根阳线的实体 + """ + try: + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + # 确保数据有效 + if not all(isinstance(x, (int, float)) and x > 0 for x in [p_open, p_close, c_open, c_close]): + return None, None + + # 看涨包住:前一根是阴线,当前是阳线,且当前阳线完全包住前一根阴线 + if (is_bearish(prev) and is_bullish(curr) and + c_open <= p_close and c_close >= p_open): + logger.debug(f"发现看涨包住信号: 前阴线({p_open:.2f}-{p_close:.2f}) 当前阳线({c_open:.2f}-{c_close:.2f})") + return "long", "bear_bull_engulf" + + # 看跌包住:前一根是阳线,当前是阴线,且当前阴线完全包住前一根阳线 + if (is_bullish(prev) and is_bearish(curr) and + c_open >= p_close and c_close <= p_open): + logger.debug(f"发现看跌包住信号: 前阳线({p_open:.2f}-{p_close:.2f}) 当前阴线({c_open:.2f}-{c_close:.2f})") + return "short", "bull_bear_engulf" + + return None, None + + except Exception as e: + logger.error(f"信号判断出错: {e}") + return None, None + + +# =============================================================== +# 💹 回测模拟模块(修正版) +# =============================================================== + +def simulate_trade(direction, entry_price, entry_time, next_15min_time, config: BacktestConfig): + """ + 用 1 分钟数据进行精细化止盈止损模拟(修正版) + + Args: + direction: 交易方向 ("long" 或 "short") + entry_price: 开仓价格 + entry_time: 开仓时间(毫秒时间戳) + next_15min_time: 下一个15分钟K线时间戳 + config: 回测配置 + + Returns: + (exit_price, profit_loss_points, exit_time, slippage_cost) + """ + try: + # 获取未来1分钟数据 + future_candles = get_future_data_1min(entry_time, next_15min_time) + if not future_candles: + logger.warning(f"未获取到1分钟数据: {entry_time} - {next_15min_time}") + return None, 0, None, 0 + + # 计算止盈止损价格 + if direction == "long": + tp_price = entry_price + config.take_profit + sl_price = entry_price + config.stop_loss + else: # short + tp_price = entry_price - config.take_profit + sl_price = entry_price - config.stop_loss + + slippage_cost = 0.0 + + for candle in future_candles: + open_p, high, low, close = map(float, (candle['open'], candle['high'], candle['low'], candle['close'])) + + if direction == "long": + # 检查止损 + if low <= sl_price: + exit_price = sl_price - (sl_price * config.slippage_rate) + slippage_cost = sl_price * config.slippage_rate + profit_loss = config.stop_loss - slippage_cost + logger.debug(f"多头止损触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 亏损{profit_loss:.2f}") + return exit_price, profit_loss, candle['id'], slippage_cost + + # 检查止盈 + if high >= tp_price: + exit_price = tp_price - (tp_price * config.slippage_rate) + slippage_cost = tp_price * config.slippage_rate + profit_loss = config.take_profit - slippage_cost + logger.debug(f"多头止盈触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈利{profit_loss:.2f}") + return exit_price, profit_loss, candle['id'], slippage_cost + + else: # short + # 检查止损 + if high >= sl_price: + exit_price = sl_price + (sl_price * config.slippage_rate) + slippage_cost = sl_price * config.slippage_rate + profit_loss = config.stop_loss - slippage_cost + logger.debug(f"空头止损触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 亏损{profit_loss:.2f}") + return exit_price, profit_loss, candle['id'], slippage_cost + + # 检查止盈 + if low <= tp_price: + exit_price = tp_price + (tp_price * config.slippage_rate) + slippage_cost = tp_price * config.slippage_rate + profit_loss = config.take_profit - slippage_cost + logger.debug(f"空头止盈触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈利{profit_loss:.2f}") + return exit_price, profit_loss, candle['id'], slippage_cost + + # 未触发止盈止损,用最后一根收盘价平仓 + final_candle = future_candles[-1] + final_price = float(final_candle['close']) + + if direction == "long": + exit_price = final_price - (final_price * config.slippage_rate) + profit_loss = (exit_price - entry_price) + else: + exit_price = final_price + (final_price * config.slippage_rate) + profit_loss = (entry_price - exit_price) + + slippage_cost = final_price * config.slippage_rate + logger.debug(f"时间到期平仓: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈亏{profit_loss:.2f}") + + return exit_price, profit_loss, final_candle['id'], slippage_cost + + except Exception as e: + logger.error(f"交易模拟出错: {e}") + return None, 0, None, 0 + + +def simulate_until(direction, entry_price, entry_time, end_time, config: BacktestConfig): + """ + 从entry_time开始向后检查直到end_time(不跨越end_time), + 返回在此区间内是否触发TP/SL以及对应的退出信息。 + 若未触发,返回(None, 0, None, 0)。 + """ + try: + candles = get_future_data_1min(entry_time, end_time) + if not candles: + return None, 0, None, 0 + + if direction == "long": + tp_price = entry_price + config.take_profit + sl_price = entry_price + config.stop_loss + else: + tp_price = entry_price - config.take_profit + sl_price = entry_price - config.stop_loss + + for candle in candles: + open_p, high, low = map(float, (candle['open'], candle['high'], candle['low'])) + if direction == "long": + if low <= sl_price: + exit_price = sl_price - (sl_price * config.slippage_rate) + slippage_cost = sl_price * config.slippage_rate + return exit_price, config.stop_loss - slippage_cost, candle['id'], slippage_cost + if high >= tp_price: + exit_price = tp_price - (tp_price * config.slippage_rate) + slippage_cost = tp_price * config.slippage_rate + return exit_price, config.take_profit - slippage_cost, candle['id'], slippage_cost + else: + if high >= sl_price: + exit_price = sl_price + (sl_price * config.slippage_rate) + slippage_cost = sl_price * config.slippage_rate + return exit_price, config.stop_loss - slippage_cost, candle['id'], slippage_cost + if low <= tp_price: + exit_price = tp_price + (tp_price * config.slippage_rate) + slippage_cost = tp_price * config.slippage_rate + return exit_price, config.take_profit - slippage_cost, candle['id'], slippage_cost + return None, 0, None, 0 + except Exception as e: + logger.error(f"分段交易模拟出错: {e}") + return None, 0, None, 0 + + +def calculate_fees(entry_price, exit_price, config: BacktestConfig): + """计算手续费""" + open_fee = config.open_fee + close_fee = config.contract_size * config.close_fee_rate + return open_fee + close_fee + + +# =============================================================== +# 📈 可视化:一分钟K线与开仓位置 +# =============================================================== + +def _to_datetime(ms: int) -> datetime.datetime: + return datetime.datetime.fromtimestamp(ms / 1000) + + +def visualize_trade_1min(trade: 'TradeRecord', *, + minutes_before: int = 30, + minutes_after: int = 60, + take_profit_points: Optional[float] = None, + stop_loss_points: Optional[float] = None, + output_dir: str = "charts_1m") -> Optional[str]: + """生成指定交易周边的一分钟K线图(Plotly HTML)。 + + Args: + trade: 回测产生的交易记录 + minutes_before: 开仓前取多少分钟的数据 + minutes_after: 开仓后取多少分钟的数据 + take_profit_points: 可选,绘制入场±止盈线(点) + stop_loss_points: 可选,绘制入场±止损线(点) + output_dir: 输出目录 + + Returns: + 生成的HTML路径,失败返回None。 + """ + try: + entry_ms = int(trade.entry_time.timestamp() * 1000) + data = get_1min_window(entry_ms, minutes_before, minutes_after) + if not data: + logger.warning("一分钟数据为空,跳过可视化") + return None + + # 构造DataFrame以便排序与渲染 + df = pd.DataFrame(data) + df = df.sort_values('id').reset_index(drop=True) + df['time'] = df['id'].apply(lambda x: _to_datetime(int(x))) + df['open'] = df['open'].astype(float) + df['high'] = df['high'].astype(float) + df['low'] = df['low'].astype(float) + df['close'] = df['close'].astype(float) + + fig = go.Figure() + fig.add_trace(go.Candlestick( + x=df['time'], + open=df['open'], high=df['high'], low=df['low'], close=df['close'], + name="1分钟K线" + )) + + # 将入场/出场时间吸附到最近的一分钟K线时间(提高对齐准确度) + def _snap_time(target_dt: datetime.datetime, tolerance_ms: int = 90_000): + target_ms = int(target_dt.timestamp() * 1000) + diffs = (df['id'] - target_ms).abs() + idx = int(diffs.idxmin()) + if abs(int(df.at[idx, 'id']) - target_ms) <= tolerance_ms: + return df.at[idx, 'time'] + return target_dt + + snapped_entry_time = _snap_time(trade.entry_time) + snapped_exit_time = _snap_time(trade.exit_time) + + # 标注入场、出场 + entry_y = float(trade.entry_price) + exit_y = float(trade.exit_price) + fig.add_trace(go.Scatter( + x=[snapped_entry_time], + y=[entry_y], + mode="markers+text", + name="入场", + text=["入场"], + textposition="top center", + marker=dict(color="#2ecc71", size=10, symbol="triangle-up") + )) + fig.add_trace(go.Scatter( + x=[snapped_exit_time], + y=[exit_y], + mode="markers+text", + name="出场", + text=["出场"], + textposition="bottom center", + marker=dict(color="#e74c3c", size=10, symbol="x") + )) + + # 可选:止盈止损参考线 + shapes = [] + annotations = [] + if take_profit_points is not None: + tp_price = entry_y + take_profit_points if trade.direction == "做多" else entry_y - take_profit_points + shapes.append(dict(type="line", xref="x", yref="y", + x0=df['time'].min(), x1=df['time'].max(), y0=tp_price, y1=tp_price, + line=dict(color="rgba(46, 204, 113, 0.5)", width=1, dash="dash"))) + annotations.append(dict(xref="x", yref="y", x=df['time'].min(), y=tp_price, + text="TP", showarrow=False, font=dict(color="#2ecc71"))) + if stop_loss_points is not None: + sl_price = entry_y + stop_loss_points if trade.direction == "做多" else entry_y - stop_loss_points + shapes.append(dict(type="line", xref="x", yref="y", + x0=df['time'].min(), x1=df['time'].max(), y0=sl_price, y1=sl_price, + line=dict(color="rgba(231, 76, 60, 0.5)", width=1, dash="dot"))) + annotations.append(dict(xref="x", yref="y", x=df['time'].min(), y=sl_price, + text="SL", showarrow=False, font=dict(color="#e74c3c"))) + + title = ( + f"{trade.entry_time.strftime('%Y-%m-%d %H:%M')} 开仓 - {trade.direction}({trade.signal_type}) " + f"入场={trade.entry_price:.2f} 出场={trade.exit_price:.2f} 盈亏={trade.profit_loss:.2f}点" + ) + fig.update_layout( + title=title, + xaxis_title="时间", + yaxis_title="价格", + xaxis=dict(rangeslider=dict(visible=False)), + shapes=shapes, + annotations=annotations, + hovermode="x unified" + ) + + Path(output_dir).mkdir(parents=True, exist_ok=True) + fname = f"trade_{trade.entry_time.strftime('%Y%m%d_%H%M%S')}.html" + out_path = os.path.join(output_dir, fname) + fig.write_html(out_path, include_plotlyjs="cdn", auto_open=False) + logger.info(f"一分钟K线可视化已生成: {out_path}") + return out_path + except Exception as e: + logger.error(f"生成一分钟K线图失败: {e}") + return None + + +# =============================================================== +# 📊 主回测流程(修正版) +# =============================================================== + +def backtest(dates, config: BacktestConfig): + """ + 主回测函数(修正版) + + Args: + dates: 日期列表 + config: 回测配置 + + Returns: + (trades, stats) + """ + logger.info(f"开始回测,日期范围: {dates[0]} 到 {dates[-1]}") + + # 获取所有15分钟K线数据 + all_data = [] + for date_str in dates: + daily_data = get_data_by_date(Weex15, date_str) + if daily_data: + all_data.extend(daily_data) + else: + logger.warning(f"日期 {date_str} 没有数据") + + if not all_data: + logger.error("没有获取到任何数据") + return [], {} + + all_data.sort(key=lambda x: x['id']) + logger.info(f"总共获取到 {len(all_data)} 条15分钟K线数据") + + # 初始化统计 + stats = { + "bear_bull_engulf": SignalStats(signal_name="看涨包住"), + "bull_bear_engulf": SignalStats(signal_name="看跌包住"), + } + + trades = [] + total_trades = 0 + + # 主回测循环(加入持仓管理:同向不加仓,反向平旧开新) + position = PositionState() + for idx in range(1, len(all_data) - 1): + try: + prev, curr = all_data[idx - 1], all_data[idx] + + # 检查信号 + direction, signal = check_signal(prev, curr) + if not direction: + continue + + # 检查信号是否启用 + if signal == "bear_bull_engulf" and not config.enable_bear_bull_engulf: + continue + if signal == "bull_bear_engulf" and not config.enable_bull_bear_engulf: + continue + + # 当前K线时间与价格 + current_time = curr['id'] + current_close = float(curr['close']) + + # 若有持仓,先滚动检查从上次检查时间到当前时间是否触发TP/SL + if position.direction is not None: + check_from = position.last_checked_time or position.entry_time + if current_time > check_from: + e_price, pl_pts, e_time, slip = simulate_until( + position.direction, position.entry_price, check_from, current_time, config + ) + if e_price is not None: + # 生成平仓记录(由持仓信号驱动,不计入当前信号统计) + total_fee = calculate_fees(position.entry_price, e_price, config) + profit_amount = pl_pts * config.contract_size + trade = TradeRecord( + entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000), + exit_time=datetime.datetime.fromtimestamp(e_time / 1000), + signal_type="持仓止盈/止损", + direction="做多" if position.direction == "long" else "做空", + entry_price=position.entry_price, + exit_price=e_price, + profit_loss=pl_pts, + profit_amount=profit_amount, + total_fee=total_fee, + net_profit=profit_amount - total_fee, + slippage_cost=slip * config.contract_size + ) + trades.append(trade) + total_trades += 1 + # 清空持仓 + position = PositionState() + else: + position.last_checked_time = current_time + + # 根据信号与持仓关系决定是否开/平仓 + if direction: + if position.direction is None: + # 无持仓 -> 开仓 + position = PositionState(direction=direction, entry_price=current_close, entry_time=current_time, last_checked_time=current_time) + else: + if position.direction == direction: + # 同向信号,不加仓,保持原持仓 + pass + else: + # 反向信号:先以当前价立即平旧仓,再开新仓 + e_price = current_close + if position.direction == "long": + pl_pts = e_price - position.entry_price + else: + pl_pts = position.entry_price - e_price + total_fee = calculate_fees(position.entry_price, e_price, config) + profit_amount = pl_pts * config.contract_size + trade = TradeRecord( + entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000), + exit_time=datetime.datetime.fromtimestamp(current_time / 1000), + signal_type="反向信号平仓", + direction="做多" if position.direction == "long" else "做空", + entry_price=position.entry_price, + exit_price=e_price, + profit_loss=pl_pts, + profit_amount=profit_amount, + total_fee=total_fee, + net_profit=profit_amount - total_fee, + slippage_cost=0.0 + ) + trades.append(trade) + total_trades += 1 + # 开新仓 + position = PositionState(direction=direction, entry_price=current_close, entry_time=current_time, last_checked_time=current_time) + + except Exception as e: + logger.error(f"处理第 {idx} 条数据时出错: {e}") + continue + + # 循环结束后,如仍有持仓,按最后一根收盘价平仓 + if position.direction is not None: + final = all_data[-1] + final_time = final['id'] + final_price = float(final['close']) + if position.direction == "long": + pl_pts = final_price - position.entry_price + else: + pl_pts = position.entry_price - final_price + total_fee = calculate_fees(position.entry_price, final_price, config) + profit_amount = pl_pts * config.contract_size + trade = TradeRecord( + entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000), + exit_time=datetime.datetime.fromtimestamp(final_time / 1000), + signal_type="时间到期平仓", + direction="做多" if position.direction == "long" else "做空", + entry_price=position.entry_price, + exit_price=final_price, + profit_loss=pl_pts, + profit_amount=profit_amount, + total_fee=total_fee, + net_profit=profit_amount - total_fee, + slippage_cost=0.0 + ) + trades.append(trade) + total_trades += 1 + position = PositionState() + + logger.info(f"回测完成,总共 {total_trades} 笔交易") + return trades, stats + + +# =============================================================== +# 📊 结果分析模块 +# =============================================================== + +def analyze_results(trades, stats): + """分析回测结果""" + if not trades: + logger.warning("没有交易记录") + return + + total_trades = len(trades) + total_profit = sum(t.profit_amount for t in trades) + total_fee = sum(t.total_fee for t in trades) + total_slippage = sum(t.slippage_cost for t in trades) + net_profit = total_profit - total_fee - total_slippage + + wins = sum(1 for t in trades if t.profit_loss > 0) + losses = total_trades - wins + win_rate = (wins / total_trades * 100) if total_trades > 0 else 0 + + avg_profit = total_profit / total_trades if total_trades > 0 else 0 + avg_fee = total_fee / total_trades if total_trades > 0 else 0 + + logger.info("=" * 50) + logger.info("📊 回测结果汇总") + logger.info("=" * 50) + logger.info(f"总交易次数: {total_trades}") + logger.info(f"盈利次数: {wins}") + logger.info(f"亏损次数: {losses}") + logger.info(f"胜率: {win_rate:.2f}%") + logger.info(f"总盈亏: {total_profit:.2f}") + logger.info(f"总手续费: {total_fee:.2f}") + logger.info(f"总滑点成本: {total_slippage:.2f}") + logger.info(f"净利润: {net_profit:.2f}") + logger.info(f"平均每笔盈亏: {avg_profit:.2f}") + logger.info(f"平均每笔手续费: {avg_fee:.2f}") + + # 按信号类型分析 + logger.info("\n" + "=" * 30) + logger.info("📈 信号类型分析") + logger.info("=" * 30) + + for signal_key, signal_stat in stats.items(): + if signal_stat.count > 0: + logger.info(f"\n{signal_stat.signal_name}:") + logger.info(f" 信号次数: {signal_stat.count}") + logger.info(f" 胜率: {signal_stat.win_rate:.2f}%") + logger.info(f" 总盈亏: {signal_stat.total_profit:.2f}") + logger.info(f" 总手续费: {signal_stat.total_fee:.2f}") + logger.info(f" 总滑点: {signal_stat.total_slippage:.2f}") + logger.info(f" 净利润: {signal_stat.net_profit:.2f}") + logger.info(f" 平均盈亏: {signal_stat.avg_profit:.2f}") + + +# =============================================================== +# 🚀 启动主流程 +# =============================================================== + +if __name__ == '__main__': + # 配置日志 + logger.add("backtest.log", rotation="1 day", retention="7 days") + + # 创建回测配置 + config = BacktestConfig( + take_profit=10.0, # 止盈10点 + stop_loss=-1.0, # 止损1点 + contract_size=10000, # 合约规模 + open_fee=5.0, # 开仓手续费 + close_fee_rate=0.0005, # 平仓手续费率 + slippage_rate=0.0001, # 滑点率0.01% + start_date="2025-9-1", + end_date="2025-9-30", + enable_bear_bull_engulf=True, + enable_bull_bear_engulf=True + ) + + # 生成日期列表 + dates = [f"2025-9-{i}" for i in range(1, 31)] + + try: + # 执行回测 + trades, stats = backtest(dates, config) + + # 输出详细交易记录 + logger.info("\n" + "=" * 80) + logger.info("📋 详细交易记录") + logger.info("=" * 80) + + for i, trade in enumerate(trades, 1): + logger.info( + f"{i:3d}. {trade.entry_time.strftime('%m-%d %H:%M')} " + f"{trade.direction}({trade.signal_type}) " + f"入场={trade.entry_price:.2f} 出场={trade.exit_price:.2f} " + f"出场时间={trade.exit_time.strftime('%m-%d %H:%M')} " + f"盈亏={trade.profit_loss:.2f}点 金额={trade.profit_amount:.2f} " + f"手续费={trade.total_fee:.2f} 滑点={trade.slippage_cost:.2f} " + f"净利润={trade.net_profit:.2f}" + ) + + # 分析结果 + analyze_results(trades, stats) + + logger.info("\n✅ 回测完成!") + + except Exception as e: + logger.error(f"回测执行失败: {e}") + raise + + # ============== 生成一分钟K线可视化(前10笔) ============== + try: + to_show = trades[:10] + for t in to_show: + visualize_trade_1min( + t, + minutes_before=30, + minutes_after=90, + take_profit_points=config.take_profit, + stop_loss_points=config.stop_loss + ) + logger.info("已为前10笔交易生成一分钟K线图(charts_1m 目录)") + except Exception as e: + logger.error(f"生成一分钟K线图时出错: {e}") + + diff --git a/weex/读取数据库分析数据2.0.py b/weex/读取数据库分析数据2.0.py index 3e0b1b3..7379d47 100644 --- a/weex/读取数据库分析数据2.0.py +++ b/weex/读取数据库分析数据2.0.py @@ -304,3 +304,9 @@ if __name__ == '__main__': # win_rate = (v['wins'] / v['count'] * 100) if v['count'] > 0 else 0 # print( # f"{v['name']} ({k}) - 信号数: {v['count']} | 胜率: {win_rate:.2f}% | 总盈利: {v['total_profit']:.2f}") + + +# 需要优化,目前有两种情况,第一种,同向,不如说上一单开单是涨,上一单还没有结束,当前信号来了,就不开单,等上一单到了上一单的止损位或者止盈位在平仓 +# 第二种,方向,上一单是涨,上一单还没有结束,当前信号来了,是跌,然后就按照现在这个信号要开仓的位置,平掉上一单,然后开一单方向的, +# 一笔中可能有好几次信号,都按照上面的规则去判断,要保证同一时间,只会有一笔持仓, +# 打印每笔的交易详细,如果一笔中同向,输入为一条交易记录,一条加以记录能够直观的看出中间有多少笔