diff --git a/weex/1.html b/weex/1.html new file mode 100644 index 0000000..1d9a420 --- /dev/null +++ b/weex/1.html @@ -0,0 +1,212 @@ + + + + + + Stock Details + + + +
+ + +
+ +
+
Symbol Info
+
Advanced Chart
+
Company Profile
+
Fundamental Data
+
Technical Analysis
+
Top Stories
+
+ + + +

+ Charts and financial information provided by TradingView, a popular + charting & trading platform. Check out even more + advanced features + or grab charts for + your website. +

+
+
+ + + + \ No newline at end of file diff --git a/weex/backtest_trades.html b/weex/backtest_trades.html new file mode 100644 index 0000000..23a4ad6 --- /dev/null +++ b/weex/backtest_trades.html @@ -0,0 +1,7 @@ + + + +
+
+ + \ No newline at end of file diff --git a/weex/读取数据库分析数据2.0-优化信号版2.0.py b/weex/读取数据库分析数据2.0-优化信号版2.0.py index 245133f..5871f02 100644 --- a/weex/读取数据库分析数据2.0-优化信号版2.0.py +++ b/weex/读取数据库分析数据2.0-优化信号版2.0.py @@ -1,21 +1,25 @@ """ -量化交易回测系统 - 优化修正版 +量化交易回测系统 - 优化版 功能:基于包住形态的交易信号识别和回测分析 作者:量化交易团队 -版本:2.1 (修正版) +版本:2.1(修复15分钟窗口、统计键名、费用口径、边界处理) """ import datetime -import os -from pathlib import Path -from typing import List, Dict, Tuple, Optional, Any -import pandas as pd -import plotly.graph_objects as go from typing import List, Dict, Tuple, Optional, Any from dataclasses import dataclass from loguru import logger -from peewee import fn from models.weex import Weex15, Weex1 +import os + +try: + # 可视化依赖(若未安装,将在运行时给出提示) + import plotly.graph_objects as go + from plotly.subplots import make_subplots + _PLOTLY_AVAILABLE = True +except Exception as _e: + _PLOTLY_AVAILABLE = False + _PLOTLY_IMPORT_ERROR = _e # =============================================================== @@ -25,21 +29,20 @@ from models.weex import Weex15, Weex1 @dataclass class BacktestConfig: """回测配置类""" - # 交易参数 - take_profit: float = 8.0 # 止盈点数 - stop_loss: float = -1.0 # 止损点数 - contract_size: float = 10000 # 合约规模 - open_fee: float = 5.0 # 开仓手续费 - close_fee_rate: float = 0.0005 # 平仓手续费率 - slippage_rate: float = 0.0001 # 滑点率 0.01% + # 交易参数(注意:tp/sl单位与K线报价同单位,diff = 价格差值) + take_profit: float = 8.0 # 止盈点数(价格差) + stop_loss: float = -1.0 # 止损点数(价格差,负数) + contract_size: float = 10000 # 合约规模(每1点价格差对应的盈亏金额 = diff * contract_size) + open_fee: float = 5.0 # 开仓固定手续费(金额) + close_fee_rate: float = 0.0005 # 平仓按成交额比例的手续费(金额 = rate * exit_price * contract_size) - # 回测日期范围 - start_date: str = "2025-7-1" - end_date: str = "2025-7-31" + # 回测日期范围(仅示例:若不传dates则使用) + start_date: str = "2025-07-01" + end_date: str = "2025-07-31" - # 信号参数 - enable_bear_bull_engulf: bool = True # 涨包跌信号 - enable_bull_bear_engulf: bool = True # 跌包涨信号 + # 信号参数(保留扩展位) + enable_bear_bull_engulf: bool = True # 涨包跌信号(前跌后涨包住 -> 做多) + enable_bull_bear_engulf: bool = True # 跌包涨信号(前涨后跌包住 -> 做空) def __post_init__(self): """验证配置参数""" @@ -47,66 +50,14 @@ class BacktestConfig: raise ValueError("止盈点数必须大于0") if self.stop_loss >= 0: raise ValueError("止损点数必须小于0") - if self.slippage_rate < 0 or self.slippage_rate > 0.01: - raise ValueError("滑点率应在0-1%之间") - - -@dataclass -class TradeRecord: - """交易记录类""" - entry_time: datetime.datetime - exit_time: datetime.datetime - signal_type: str - direction: str - entry_price: float - exit_price: float - profit_loss: float - profit_amount: float - total_fee: float - net_profit: float - slippage_cost: float = 0.0 - - -@dataclass -class SignalStats: - """信号统计类""" - signal_name: str - count: int = 0 - wins: int = 0 - total_profit: float = 0.0 - total_fee: float = 0.0 - total_slippage: float = 0.0 - - @property - def win_rate(self) -> float: - """胜率计算""" - return (self.wins / self.count * 100) if self.count > 0 else 0.0 - - @property - def avg_profit(self) -> float: - """平均盈利""" - return self.total_profit / self.count if self.count > 0 else 0.0 - - @property - def net_profit(self) -> float: - """净利润(扣除手续费和滑点)""" - return self.total_profit - self.total_fee - self.total_slippage - - -@dataclass -class PositionState: - direction: Optional[str] = None # "long" | "short" - entry_price: Optional[float] = None - entry_time: Optional[int] = None # ms - last_checked_time: Optional[int] = None # ms # =============================================================== # 📊 数据获取模块 # =============================================================== -def get_data_by_date(model, date_str): - """按天获取指定表的数据""" +def get_data_by_date(model, date_str: str) -> List[Dict[str, Any]]: + """按天获取指定表的数据;要求 model.id 为毫秒时间戳""" try: target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') except ValueError: @@ -116,609 +67,547 @@ def get_data_by_date(model, date_str): start_ts = int(target_date.timestamp() * 1000) end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 - try: - query = (model - .select() - .where(model.id.between(start_ts, end_ts)) - .order_by(model.id.asc())) + query = (model + .select() + .where(model.id.between(start_ts, end_ts)) + .order_by(model.id.asc())) - data = [] - for i in query: - # 验证数据完整性 - if all(hasattr(i, attr) for attr in ['open', 'high', 'low', 'close']): - data.append({ - 'id': i.id, - 'open': float(i.open), - 'high': float(i.high), - 'low': float(i.low), - 'close': float(i.close) - }) + return [ + {'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} + for i in query + ] - logger.info(f"获取到 {len(data)} 条 {date_str} 的数据") - return data - except Exception as e: - logger.error(f"获取数据失败 {date_str}: {e}") - return [] - - -def get_future_data_1min(start_ts, end_ts): - """获取指定时间范围内的 1 分钟数据""" - try: - query = (Weex1 - .select() - .where(Weex1.id.between(start_ts, end_ts)) - .order_by(Weex1.id.asc())) - - data = [] - for i in query: - if all(hasattr(i, attr) for attr in ['open', 'high', 'low', 'close']): - data.append({ - 'id': i.id, - 'open': float(i.open), - 'high': float(i.high), - 'low': float(i.low), - 'close': float(i.close) - }) - - return data - - except Exception as e: - logger.error(f"获取1分钟数据失败: {e}") - return [] - - -def get_1min_window(center_ts_ms: int, minutes_before: int = 30, minutes_after: int = 60): - """基于中心时间,获取前后窗口的一分钟K线数据。 - 返回按时间升序的列表[{id, open, high, low, close}]。 - """ - try: - start_ts = center_ts_ms - minutes_before * 60 * 1000 - end_ts = center_ts_ms + minutes_after * 60 * 1000 - return get_future_data_1min(start_ts, end_ts) - except Exception as e: - logger.error(f"获取一分钟窗口数据失败: {e}") +def get_future_data_1min(start_ts: int, end_ts: int) -> List[Dict[str, Any]]: + """获取指定时间范围内的 1 分钟数据(闭区间)""" + if end_ts < start_ts: return [] + query = (Weex1 + .select() + .where(Weex1.id.between(start_ts, end_ts)) + .order_by(Weex1.id.asc())) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] # =============================================================== -# 📈 信号判定模块(修正版) +# 📈 信号判定模块 # =============================================================== -def is_bullish(candle): - """判断是否为阳线""" - return float(candle['open']) < float(candle['close']) +def is_bullish(c) -> bool: + return float(c['open']) < float(c['close']) -def is_bearish(candle): - """判断是否为阴线""" - return float(candle['open']) > float(candle['close']) +def is_bearish(c) -> bool: + return float(c['open']) > float(c['close']) -def check_signal(prev, curr): +def check_signal(prev: Dict[str, Any], curr: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]: """ - 判断是否出现包住形态(修正版) - - 包住形态定义: - - 看涨包住:前一根阴线,当前阳线完全包含前一根阴线的实体 - - 看跌包住:前一根阳线,当前阴线完全包含前一根阳线的实体 + 判断是否出现包住形态 + 返回 (方向direction, 信号键signal_key) + signal_key: 'bear_bull_engulf' 表示 涨包跌(前跌后涨包住 -> 做多) + 'bull_bear_engulf' 表示 跌包涨(前涨后跌包住 -> 做空) """ - try: - p_open, p_close = float(prev['open']), float(prev['close']) - c_open, c_close = float(curr['open']), float(curr['close']) + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) - # 确保数据有效 - if not all(isinstance(x, (int, float)) and x > 0 for x in [p_open, p_close, c_open, c_close]): - return None, None + # 前跌后涨包住 -> 做多 + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" - # 看涨包住:前一根是阴线,当前是阳线,且当前阳线完全包住前一根阴线 - if (is_bearish(prev) and is_bullish(curr) and - c_open <= p_close and c_close >= p_open): - logger.debug(f"发现看涨包住信号: 前阴线({p_open:.2f}-{p_close:.2f}) 当前阳线({c_open:.2f}-{c_close:.2f})") - return "long", "bear_bull_engulf" + # 前涨后跌包住 -> 做空 + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" - # 看跌包住:前一根是阳线,当前是阴线,且当前阴线完全包住前一根阳线 - if (is_bullish(prev) and is_bearish(curr) and - c_open >= p_close and c_close <= p_open): - logger.debug(f"发现看跌包住信号: 前阳线({p_open:.2f}-{p_close:.2f}) 当前阴线({c_open:.2f}-{c_close:.2f})") - return "short", "bull_bear_engulf" - - return None, None - - except Exception as e: - logger.error(f"信号判断出错: {e}") - return None, None + return None, None # =============================================================== -# 💹 回测模拟模块(修正版) +# 💹 回测模拟模块(使用 1 分钟数据) # =============================================================== -def simulate_trade(direction, entry_price, entry_time, next_15min_time, config: BacktestConfig): +def simulate_trade( + direction: str, + entry_price: float, + start_ts: int, + end_ts: int, + tp: float, + sl: float +) -> Tuple[Optional[float], float, Optional[int]]: """ - 用 1 分钟数据进行精细化止盈止损模拟(修正版) + 用 1 分钟数据进行精细化止盈止损模拟 + - direction: "long" / "short" + - entry_price: 开仓价格 + - start_ts: 开始时间(毫秒时间戳,包含) + - end_ts: 结束时间(毫秒时间戳,包含) + - tp: 止盈点(价格差,正数) + - sl: 止损点(价格差,负数) - Args: - direction: 交易方向 ("long" 或 "short") - entry_price: 开仓价格 - entry_time: 开仓时间(毫秒时间戳) - next_15min_time: 下一个15分钟K线时间戳 - config: 回测配置 - - Returns: - (exit_price, profit_loss_points, exit_time, slippage_cost) + 返回 (exit_price, diff, exit_time) + - exit_price: 平仓价格,None 表示区间内无数据 + - diff: 价格差 = 对多头 (exit - entry),对空头 (entry - exit) + - exit_time: 平仓发生时间戳 """ - try: - # 获取未来1分钟数据 - future_candles = get_future_data_1min(entry_time, next_15min_time) - if not future_candles: - logger.warning(f"未获取到1分钟数据: {entry_time} - {next_15min_time}") - return None, 0, None, 0 + future_candles = get_future_data_1min(start_ts, end_ts) + if not future_candles: + logger.warning(f"simulate_trade 无分钟数据: [{start_ts}, {end_ts}],跳过该段模拟。") + return None, 0.0, None - # 计算止盈止损价格 - if direction == "long": - tp_price = entry_price + config.take_profit - sl_price = entry_price + config.stop_loss - else: # short - tp_price = entry_price - config.take_profit - sl_price = entry_price - config.stop_loss + if direction == "long": + tp_price = entry_price + tp + sl_price = entry_price + sl # sl为负 + else: + tp_price = entry_price - tp + sl_price = entry_price - sl # 注意:sl为负,因此 entry - sl = entry - (-x) = entry + x - slippage_cost = 0.0 - - for candle in future_candles: - open_p, high, low, close = map(float, (candle['open'], candle['high'], candle['low'], candle['close'])) - - if direction == "long": - # 检查止损 - if low <= sl_price: - exit_price = sl_price - (sl_price * config.slippage_rate) - slippage_cost = sl_price * config.slippage_rate - profit_loss = config.stop_loss - slippage_cost - logger.debug(f"多头止损触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 亏损{profit_loss:.2f}") - return exit_price, profit_loss, candle['id'], slippage_cost - - # 检查止盈 - if high >= tp_price: - exit_price = tp_price - (tp_price * config.slippage_rate) - slippage_cost = tp_price * config.slippage_rate - profit_loss = config.take_profit - slippage_cost - logger.debug(f"多头止盈触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈利{profit_loss:.2f}") - return exit_price, profit_loss, candle['id'], slippage_cost - - else: # short - # 检查止损 - if high >= sl_price: - exit_price = sl_price + (sl_price * config.slippage_rate) - slippage_cost = sl_price * config.slippage_rate - profit_loss = config.stop_loss - slippage_cost - logger.debug(f"空头止损触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 亏损{profit_loss:.2f}") - return exit_price, profit_loss, candle['id'], slippage_cost - - # 检查止盈 - if low <= tp_price: - exit_price = tp_price + (tp_price * config.slippage_rate) - slippage_cost = tp_price * config.slippage_rate - profit_loss = config.take_profit - slippage_cost - logger.debug(f"空头止盈触发: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈利{profit_loss:.2f}") - return exit_price, profit_loss, candle['id'], slippage_cost - - # 未触发止盈止损,用最后一根收盘价平仓 - final_candle = future_candles[-1] - final_price = float(final_candle['close']) + for candle in future_candles: + open_p = float(candle['open']) + high = float(candle['high']) + low = float(candle['low']) if direction == "long": - exit_price = final_price - (final_price * config.slippage_rate) - profit_loss = (exit_price - entry_price) + # 开盘跳空优先 + if open_p >= tp_price: + return open_p, open_p - entry_price, candle['id'] + if open_p <= sl_price: + return open_p, open_p - entry_price, candle['id'] + # 盘中触发 + if high >= tp_price: + return tp_price, tp, candle['id'] + if low <= sl_price: + return sl_price, sl, candle['id'] else: - exit_price = final_price + (final_price * config.slippage_rate) - profit_loss = (entry_price - exit_price) + # 空头 + if open_p <= tp_price: + return open_p, entry_price - open_p, candle['id'] + if open_p >= sl_price: + return open_p, entry_price - open_p, candle['id'] + if low <= tp_price: + return tp_price, tp, candle['id'] + if high >= sl_price: + return sl_price, sl, candle['id'] - slippage_cost = final_price * config.slippage_rate - logger.debug(f"时间到期平仓: 入场{entry_price:.2f} 出场{exit_price:.2f} 盈亏{profit_loss:.2f}") - - return exit_price, profit_loss, final_candle['id'], slippage_cost - - except Exception as e: - logger.error(f"交易模拟出错: {e}") - return None, 0, None, 0 - - -def simulate_until(direction, entry_price, entry_time, end_time, config: BacktestConfig): - """ - 从entry_time开始向后检查直到end_time(不跨越end_time), - 返回在此区间内是否触发TP/SL以及对应的退出信息。 - 若未触发,返回(None, 0, None, 0)。 - """ - try: - candles = get_future_data_1min(entry_time, end_time) - if not candles: - return None, 0, None, 0 - - if direction == "long": - tp_price = entry_price + config.take_profit - sl_price = entry_price + config.stop_loss - else: - tp_price = entry_price - config.take_profit - sl_price = entry_price - config.stop_loss - - for candle in candles: - open_p, high, low = map(float, (candle['open'], candle['high'], candle['low'])) - if direction == "long": - if low <= sl_price: - exit_price = sl_price - (sl_price * config.slippage_rate) - slippage_cost = sl_price * config.slippage_rate - return exit_price, config.stop_loss - slippage_cost, candle['id'], slippage_cost - if high >= tp_price: - exit_price = tp_price - (tp_price * config.slippage_rate) - slippage_cost = tp_price * config.slippage_rate - return exit_price, config.take_profit - slippage_cost, candle['id'], slippage_cost - else: - if high >= sl_price: - exit_price = sl_price + (sl_price * config.slippage_rate) - slippage_cost = sl_price * config.slippage_rate - return exit_price, config.stop_loss - slippage_cost, candle['id'], slippage_cost - if low <= tp_price: - exit_price = tp_price + (tp_price * config.slippage_rate) - slippage_cost = tp_price * config.slippage_rate - return exit_price, config.take_profit - slippage_cost, candle['id'], slippage_cost - return None, 0, None, 0 - except Exception as e: - logger.error(f"分段交易模拟出错: {e}") - return None, 0, None, 0 - - -def calculate_fees(entry_price, exit_price, config: BacktestConfig): - """计算手续费""" - open_fee = config.open_fee - close_fee = config.contract_size * config.close_fee_rate - return open_fee + close_fee + # 未触发止盈止损,用最后一根收盘价平仓 + final = future_candles[-1] + final_price = float(final['close']) + diff = (final_price - entry_price) if direction == "long" else (entry_price - final_price) + return final_price, diff, final['id'] # =============================================================== -# 📈 可视化:一分钟K线与开仓位置 +# 📊 主回测流程(逐笔模式:每次信号只在下一根15m内评估TP/SL) # =============================================================== -def _to_datetime(ms: int) -> datetime.datetime: - return datetime.datetime.fromtimestamp(ms / 1000) - - -def visualize_trade_1min(trade: 'TradeRecord', *, - minutes_before: int = 30, - minutes_after: int = 60, - take_profit_points: Optional[float] = None, - stop_loss_points: Optional[float] = None, - output_dir: str = "charts_1m") -> Optional[str]: - """生成指定交易周边的一分钟K线图(Plotly HTML)。 - - Args: - trade: 回测产生的交易记录 - minutes_before: 开仓前取多少分钟的数据 - minutes_after: 开仓后取多少分钟的数据 - take_profit_points: 可选,绘制入场±止盈线(点) - stop_loss_points: 可选,绘制入场±止损线(点) - output_dir: 输出目录 - - Returns: - 生成的HTML路径,失败返回None。 +def backtest( + dates: List[str], + tp: float, + sl: float, + config: BacktestConfig = BacktestConfig() +) -> Tuple[List[Dict[str, Any]], Dict[str, Dict[str, Any]]]: """ - try: - entry_ms = int(trade.entry_time.timestamp() * 1000) - data = get_1min_window(entry_ms, minutes_before, minutes_after) - if not data: - logger.warning("一分钟数据为空,跳过可视化") - return None - - # 构造DataFrame以便排序与渲染 - df = pd.DataFrame(data) - df = df.sort_values('id').reset_index(drop=True) - df['time'] = df['id'].apply(lambda x: _to_datetime(int(x))) - df['open'] = df['open'].astype(float) - df['high'] = df['high'].astype(float) - df['low'] = df['low'].astype(float) - df['close'] = df['close'].astype(float) - - fig = go.Figure() - fig.add_trace(go.Candlestick( - x=df['time'], - open=df['open'], high=df['high'], low=df['low'], close=df['close'], - name="1分钟K线" - )) - - # 将入场/出场时间吸附到最近的一分钟K线时间(提高对齐准确度) - def _snap_time(target_dt: datetime.datetime, tolerance_ms: int = 90_000): - target_ms = int(target_dt.timestamp() * 1000) - diffs = (df['id'] - target_ms).abs() - idx = int(diffs.idxmin()) - if abs(int(df.at[idx, 'id']) - target_ms) <= tolerance_ms: - return df.at[idx, 'time'] - return target_dt - - snapped_entry_time = _snap_time(trade.entry_time) - snapped_exit_time = _snap_time(trade.exit_time) - - # 标注入场、出场 - entry_y = float(trade.entry_price) - exit_y = float(trade.exit_price) - fig.add_trace(go.Scatter( - x=[snapped_entry_time], - y=[entry_y], - mode="markers+text", - name="入场", - text=["入场"], - textposition="top center", - marker=dict(color="#2ecc71", size=10, symbol="triangle-up") - )) - fig.add_trace(go.Scatter( - x=[snapped_exit_time], - y=[exit_y], - mode="markers+text", - name="出场", - text=["出场"], - textposition="bottom center", - marker=dict(color="#e74c3c", size=10, symbol="x") - )) - - # 可选:止盈止损参考线 - shapes = [] - annotations = [] - if take_profit_points is not None: - tp_price = entry_y + take_profit_points if trade.direction == "做多" else entry_y - take_profit_points - shapes.append(dict(type="line", xref="x", yref="y", - x0=df['time'].min(), x1=df['time'].max(), y0=tp_price, y1=tp_price, - line=dict(color="rgba(46, 204, 113, 0.5)", width=1, dash="dash"))) - annotations.append(dict(xref="x", yref="y", x=df['time'].min(), y=tp_price, - text="TP", showarrow=False, font=dict(color="#2ecc71"))) - if stop_loss_points is not None: - sl_price = entry_y + stop_loss_points if trade.direction == "做多" else entry_y - stop_loss_points - shapes.append(dict(type="line", xref="x", yref="y", - x0=df['time'].min(), x1=df['time'].max(), y0=sl_price, y1=sl_price, - line=dict(color="rgba(231, 76, 60, 0.5)", width=1, dash="dot"))) - annotations.append(dict(xref="x", yref="y", x=df['time'].min(), y=sl_price, - text="SL", showarrow=False, font=dict(color="#e74c3c"))) - - title = ( - f"{trade.entry_time.strftime('%Y-%m-%d %H:%M')} 开仓 - {trade.direction}({trade.signal_type}) " - f"入场={trade.entry_price:.2f} 出场={trade.exit_price:.2f} 盈亏={trade.profit_loss:.2f}点" - ) - fig.update_layout( - title=title, - xaxis_title="时间", - yaxis_title="价格", - xaxis=dict(rangeslider=dict(visible=False)), - shapes=shapes, - annotations=annotations, - hovermode="x unified" - ) - - Path(output_dir).mkdir(parents=True, exist_ok=True) - fname = f"trade_{trade.entry_time.strftime('%Y%m%d_%H%M%S')}.html" - out_path = os.path.join(output_dir, fname) - fig.write_html(out_path, include_plotlyjs="cdn", auto_open=False) - logger.info(f"一分钟K线可视化已生成: {out_path}") - return out_path - except Exception as e: - logger.error(f"生成一分钟K线图失败: {e}") - return None - - -# =============================================================== -# 📊 主回测流程(修正版) -# =============================================================== - -def backtest(dates, config: BacktestConfig): + 逐笔回测: + - 形态出现在 idx(prev=idx-1, curr=idx),在 idx+1(下一根15mK)开盘入场 + - 仅在该入场K线的周期内(约15分钟)用1分钟数据评估TP/SL """ - 主回测函数(修正版) - - Args: - dates: 日期列表 - config: 回测配置 - - Returns: - (trades, stats) - """ - logger.info(f"开始回测,日期范围: {dates[0]} 到 {dates[-1]}") - - # 获取所有15分钟K线数据 - all_data = [] + all_data: List[Dict[str, Any]] = [] for date_str in dates: - daily_data = get_data_by_date(Weex15, date_str) - if daily_data: - all_data.extend(daily_data) - else: - logger.warning(f"日期 {date_str} 没有数据") - - if not all_data: - logger.error("没有获取到任何数据") - return [], {} - + all_data.extend(get_data_by_date(Weex15, date_str)) all_data.sort(key=lambda x: x['id']) - logger.info(f"总共获取到 {len(all_data)} 条15分钟K线数据") - # 初始化统计 - stats = { - "bear_bull_engulf": SignalStats(signal_name="看涨包住"), - "bull_bear_engulf": SignalStats(signal_name="看跌包住"), + stats: Dict[str, Dict[str, Any]] = { + "bear_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0.0, "name": "涨包跌"}, + "bull_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0.0, "name": "跌包涨"}, } - trades = [] - total_trades = 0 + trades: List[Dict[str, Any]] = [] - # 主回测循环(加入持仓管理:同向不加仓,反向平旧开新) - position = PositionState() - for idx in range(1, len(all_data) - 1): - try: - prev, curr = all_data[idx - 1], all_data[idx] + # 至少需要 prev(0), curr(1), entry(idx+1=2), next-of-entry(idx+2=3) + for idx in range(1, len(all_data) - 2): + prev, curr = all_data[idx - 1], all_data[idx] + entry_candle = all_data[idx + 1] + next_of_entry = all_data[idx + 2] # 用于界定入场K的结束边界 - # 检查信号 - direction, signal = check_signal(prev, curr) - if not direction: - continue - - # 检查信号是否启用 - if signal == "bear_bull_engulf" and not config.enable_bear_bull_engulf: - continue - if signal == "bull_bear_engulf" and not config.enable_bull_bear_engulf: - continue - - # 当前K线时间与价格 - current_time = curr['id'] - current_close = float(curr['close']) - - # 若有持仓,先滚动检查从上次检查时间到当前时间是否触发TP/SL - if position.direction is not None: - check_from = position.last_checked_time or position.entry_time - if current_time > check_from: - e_price, pl_pts, e_time, slip = simulate_until( - position.direction, position.entry_price, check_from, current_time, config - ) - if e_price is not None: - # 生成平仓记录(由持仓信号驱动,不计入当前信号统计) - total_fee = calculate_fees(position.entry_price, e_price, config) - profit_amount = pl_pts * config.contract_size - trade = TradeRecord( - entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000), - exit_time=datetime.datetime.fromtimestamp(e_time / 1000), - signal_type="持仓止盈/止损", - direction="做多" if position.direction == "long" else "做空", - entry_price=position.entry_price, - exit_price=e_price, - profit_loss=pl_pts, - profit_amount=profit_amount, - total_fee=total_fee, - net_profit=profit_amount - total_fee, - slippage_cost=slip * config.contract_size - ) - trades.append(trade) - total_trades += 1 - # 清空持仓 - position = PositionState() - else: - position.last_checked_time = current_time - - # 根据信号与持仓关系决定是否开/平仓 - if direction: - if position.direction is None: - # 无持仓 -> 开仓 - position = PositionState(direction=direction, entry_price=current_close, entry_time=current_time, last_checked_time=current_time) - else: - if position.direction == direction: - # 同向信号,不加仓,保持原持仓 - pass - else: - # 反向信号:先以当前价立即平旧仓,再开新仓 - e_price = current_close - if position.direction == "long": - pl_pts = e_price - position.entry_price - else: - pl_pts = position.entry_price - e_price - total_fee = calculate_fees(position.entry_price, e_price, config) - profit_amount = pl_pts * config.contract_size - trade = TradeRecord( - entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000), - exit_time=datetime.datetime.fromtimestamp(current_time / 1000), - signal_type="反向信号平仓", - direction="做多" if position.direction == "long" else "做空", - entry_price=position.entry_price, - exit_price=e_price, - profit_loss=pl_pts, - profit_amount=profit_amount, - total_fee=total_fee, - net_profit=profit_amount - total_fee, - slippage_cost=0.0 - ) - trades.append(trade) - total_trades += 1 - # 开新仓 - position = PositionState(direction=direction, entry_price=current_close, entry_time=current_time, last_checked_time=current_time) - - except Exception as e: - logger.error(f"处理第 {idx} 条数据时出错: {e}") + direction, signal_key = check_signal(prev, curr) + if not direction or signal_key is None: continue - # 循环结束后,如仍有持仓,按最后一根收盘价平仓 - if position.direction is not None: - final = all_data[-1] - final_time = final['id'] - final_price = float(final['close']) - if position.direction == "long": - pl_pts = final_price - position.entry_price - else: - pl_pts = position.entry_price - final_price - total_fee = calculate_fees(position.entry_price, final_price, config) - profit_amount = pl_pts * config.contract_size - trade = TradeRecord( - entry_time=datetime.datetime.fromtimestamp(position.entry_time / 1000), - exit_time=datetime.datetime.fromtimestamp(final_time / 1000), - signal_type="时间到期平仓", - direction="做多" if position.direction == "long" else "做空", - entry_price=position.entry_price, - exit_price=final_price, - profit_loss=pl_pts, - profit_amount=profit_amount, - total_fee=total_fee, - net_profit=profit_amount - total_fee, - slippage_cost=0.0 - ) - trades.append(trade) - total_trades += 1 - position = PositionState() + if (signal_key == "bear_bull_engulf" and not config.enable_bear_bull_engulf) or \ + (signal_key == "bull_bear_engulf" and not config.enable_bull_bear_engulf): + continue + + entry_price = float(entry_candle['open']) + entry_ts = entry_candle['id'] + # 只在入场这根15m内评估:结束边界 = 下一根15m开始时间 - 1毫秒 + end_ts = next_of_entry['id'] - 1 + + exit_price, diff, exit_time = simulate_trade( + direction=direction, + entry_price=entry_price, + start_ts=entry_ts, + end_ts=end_ts, + tp=tp, + sl=sl + ) + + if exit_price is None: + # 无分钟数据,跳过统计 + continue + + # 盈亏金额与费用 + profit_amount = diff * config.contract_size + total_fee = config.open_fee + config.close_fee_rate * exit_price * config.contract_size + net_profit = profit_amount - total_fee + + stats[signal_key]['count'] += 1 + stats[signal_key]['total_profit'] += net_profit + if net_profit > 0: + stats[signal_key]['wins'] += 1 + + trades.append({ + "entry_time": datetime.datetime.fromtimestamp(entry_ts / 1000), + "exit_time": datetime.datetime.fromtimestamp(exit_time / 1000), + "signal_key": signal_key, + "signal": stats[signal_key]['name'], + "direction": "做多" if direction == "long" else "做空", + "entry": entry_price, + "exit": exit_price, + "diff": diff, + "profit_amount": profit_amount, + "fee": total_fee, + "net_profit": net_profit, + }) - logger.info(f"回测完成,总共 {total_trades} 笔交易") return trades, stats # =============================================================== -# 📊 结果分析模块 +# 📊 主回测流程(单笔持仓模式:同向忽略,反向先平后开,始终仅一笔持仓) # =============================================================== -def analyze_results(trades, stats): - """分析回测结果""" - if not trades: - logger.warning("没有交易记录") - return +def backtest_single_position( + dates: List[str], + tp: float, + sl: float, + config: BacktestConfig = BacktestConfig() +) -> Tuple[List[Dict[str, Any]], Dict[str, Dict[str, Any]]]: + """ + 单笔持仓回测: + - 同向信号:忽略(不加仓) + - 反向信号:先用分钟数据从当前持仓entry_time模拟到“新信号的入场时刻-1毫秒”完成平仓,再按新信号在其入场K开仓 + - 始终保证同一时间只有一笔持仓 + - 如果一直没有反向信号出现,最后用全部数据的最后时刻完成平仓 + """ + all_data: List[Dict[str, Any]] = [] + for date_str in dates: + all_data.extend(get_data_by_date(Weex15, date_str)) + all_data.sort(key=lambda x: x['id']) - total_trades = len(trades) - total_profit = sum(t.profit_amount for t in trades) - total_fee = sum(t.total_fee for t in trades) - total_slippage = sum(t.slippage_cost for t in trades) - net_profit = total_profit - total_fee - total_slippage + stats: Dict[str, Dict[str, Any]] = { + "bear_bull_engulf": {"count": 0, "wins": 0, "total_profit": 0.0, "name": "涨包跌"}, + "bull_bear_engulf": {"count": 0, "wins": 0, "total_profit": 0.0, "name": "跌包涨"}, + } - wins = sum(1 for t in trades if t.profit_loss > 0) - losses = total_trades - wins - win_rate = (wins / total_trades * 100) if total_trades > 0 else 0 + trades: List[Dict[str, Any]] = [] + current_position: Optional[Dict[str, Any]] = None - avg_profit = total_profit / total_trades if total_trades > 0 else 0 - avg_fee = total_fee / total_trades if total_trades > 0 else 0 + # 为了在反向信号到来时平仓,至少需要 prev(0), curr(1), entry(idx+1=2) + for idx in range(1, len(all_data) - 1): + prev, curr = all_data[idx - 1], all_data[idx] + entry_candle = all_data[idx + 1] # 新信号的入场K + direction, signal_key = check_signal(prev, curr) + if not direction or signal_key is None: + continue - logger.info("=" * 50) - logger.info("📊 回测结果汇总") - logger.info("=" * 50) - logger.info(f"总交易次数: {total_trades}") - logger.info(f"盈利次数: {wins}") - logger.info(f"亏损次数: {losses}") - logger.info(f"胜率: {win_rate:.2f}%") - logger.info(f"总盈亏: {total_profit:.2f}") - logger.info(f"总手续费: {total_fee:.2f}") - logger.info(f"总滑点成本: {total_slippage:.2f}") - logger.info(f"净利润: {net_profit:.2f}") - logger.info(f"平均每笔盈亏: {avg_profit:.2f}") - logger.info(f"平均每笔手续费: {avg_fee:.2f}") + if (signal_key == "bear_bull_engulf" and not config.enable_bear_bull_engulf) or \ + (signal_key == "bull_bear_engulf" and not config.enable_bull_bear_engulf): + continue - # 按信号类型分析 - logger.info("\n" + "=" * 30) - logger.info("📈 信号类型分析") - logger.info("=" * 30) + entry_price_new = float(entry_candle['open']) + entry_ts_new = entry_candle['id'] - for signal_key, signal_stat in stats.items(): - if signal_stat.count > 0: - logger.info(f"\n{signal_stat.signal_name}:") - logger.info(f" 信号次数: {signal_stat.count}") - logger.info(f" 胜率: {signal_stat.win_rate:.2f}%") - logger.info(f" 总盈亏: {signal_stat.total_profit:.2f}") - logger.info(f" 总手续费: {signal_stat.total_fee:.2f}") - logger.info(f" 总滑点: {signal_stat.total_slippage:.2f}") - logger.info(f" 净利润: {signal_stat.net_profit:.2f}") - logger.info(f" 平均盈亏: {signal_stat.avg_profit:.2f}") + # 若已有持仓 + if current_position is not None: + # 同向 -> 忽略 + if current_position['direction'] == direction: + continue + # 反向 -> 先平旧仓(分钟模拟:从旧entry_time -> 新入场时刻-1毫秒) + exit_price, diff, exit_time = simulate_trade( + direction=current_position['direction'], + entry_price=current_position['entry_price'], + start_ts=current_position['entry_time'], + end_ts=entry_ts_new - 1, + tp=tp, + sl=sl + ) + if exit_price is not None: + profit_amount = diff * config.contract_size + total_fee = config.open_fee + config.close_fee_rate * exit_price * config.contract_size + net_profit = profit_amount - total_fee + + trades.append({ + "entry_time": datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + "exit_time": datetime.datetime.fromtimestamp(exit_time / 1000), + "signal_key": current_position['signal_key'], + "signal": current_position['signal_name'], + "direction": "做多" if current_position['direction'] == "long" else "做空", + "entry": current_position['entry_price'], + "exit": exit_price, + "diff": diff, + "profit_amount": profit_amount, + "fee": total_fee, + "net_profit": net_profit, + }) + + stats[current_position['signal_key']]['count'] += 1 + stats[current_position['signal_key']]['total_profit'] += net_profit + if net_profit > 0: + stats[current_position['signal_key']]['wins'] += 1 + + # 清空持仓,再开新仓 + current_position = None + + # 开新仓(在新信号的入场K开盘价) + current_position = { + "direction": direction, + "signal_key": signal_key, + "signal_name": stats[signal_key]['name'], + "entry_price": entry_price_new, + "entry_time": entry_ts_new + } + + # 数据末尾:若仍有持仓,用最后时间收盘价平仓(分钟模拟:从entry_time -> 全部数据最后时刻) + if current_position is not None and len(all_data) > 0: + final_ts = all_data[-1]['id'] + exit_price, diff, exit_time = simulate_trade( + direction=current_position['direction'], + entry_price=current_position['entry_price'], + start_ts=current_position['entry_time'], + end_ts=final_ts, + tp=tp, + sl=sl + ) + if exit_price is not None: + profit_amount = diff * config.contract_size + total_fee = config.open_fee + config.close_fee_rate * exit_price * config.contract_size + net_profit = profit_amount - total_fee + + trades.append({ + "entry_time": datetime.datetime.fromtimestamp(current_position['entry_time'] / 1000), + "exit_time": datetime.datetime.fromtimestamp(exit_time / 1000), + "signal_key": current_position['signal_key'], + "signal": current_position['signal_name'], + "direction": "做多" if current_position['direction'] == "long" else "做空", + "entry": current_position['entry_price'], + "exit": exit_price, + "diff": diff, + "profit_amount": profit_amount, + "fee": total_fee, + "net_profit": net_profit, + }) + + stats[current_position['signal_key']]['count'] += 1 + stats[current_position['signal_key']]['total_profit'] += net_profit + if net_profit > 0: + stats[current_position['signal_key']]['wins'] += 1 + + return trades, stats + + +# =============================================================== +# 🔧 辅助:日期范围生成 +# =============================================================== + +def gen_dates(start_date: str, end_date: str) -> List[str]: + """生成 [start_date, end_date] 闭区间的日期字符串列表,格式 YYYY-MM-DD""" + ds = datetime.datetime.strptime(start_date, "%Y-%m-%d") + de = datetime.datetime.strptime(end_date, "%Y-%m-%d") + res = [] + d = ds + while d <= de: + res.append(d.strftime("%Y-%m-%d")) + d += datetime.timedelta(days=1) + return res + + +# =============================================================== +# 📈 可视化:K线 + 交易点位(交互式HTML) +# =============================================================== + +def _collect_15m_candles(dates: List[str]) -> List[Dict[str, Any]]: + """收集一段日期内的15分钟K线,升序。""" + candles: List[Dict[str, Any]] = [] + for date_str in dates: + candles.extend(get_data_by_date(Weex15, date_str)) + candles.sort(key=lambda x: x['id']) + return candles + + +def plot_trades_candlestick( + dates: List[str], + trades: List[Dict[str, Any]], + output_html: str = "backtest_trades.html", + title: str = "回测交易可视化" +) -> Optional[str]: + """ + 绘制15m K线,叠加每笔交易的入场/出场标记与连线。 + - 绿色:盈利交易;红色:亏损交易 + - 做多入场标记:triangle-up;做空入场标记:triangle-down;出场标记:x + 返回生成的HTML路径,若缺少依赖则返回None。 + """ + if not _PLOTLY_AVAILABLE: + logger.error(f"缺少可视化依赖:plotly。请先安装:pip install plotly\n原始错误:{_PLOTLY_IMPORT_ERROR}") + return None + + candles = _collect_15m_candles(dates) + if not candles: + logger.warning("无15分钟K线数据,跳过可视化。") + return None + + ts = [c['id'] for c in candles] + opens = [float(c['open']) for c in candles] + highs = [float(c['high']) for c in candles] + lows = [float(c['low']) for c in candles] + closes = [float(c['close']) for c in candles] + + fig = go.Figure() + fig.add_trace(go.Candlestick( + x=[datetime.datetime.fromtimestamp(t/1000) for t in ts], + open=opens, high=highs, low=lows, close=closes, + name="15m K线", + increasing_line_color="#26a69a", + decreasing_line_color="#ef5350", + showlegend=True + )) + + # 预计算紧凑价格范围(5%-95%分位) + try: + all_prices = [*lows, *highs] + all_prices.sort() + n = len(all_prices) + if n >= 20: + i5 = max(0, int(n * 0.05)) + i95 = min(n - 1, int(n * 0.95)) + compact_low = all_prices[i5] + compact_high = all_prices[i95] + pad = (compact_high - compact_low) * 0.02 + init_y_range = [compact_low - pad, compact_high + pad] if compact_high > compact_low else None + else: + init_y_range = None + except Exception: + init_y_range = None + + # 叠加交易点位 + for trade in trades: + entry_time: datetime.datetime = trade['entry_time'] + exit_time: datetime.datetime = trade['exit_time'] + entry_px = float(trade['entry']) + exit_px = float(trade['exit']) + direction_text = str(trade['direction']) # "做多"/"做空" + signal_name = str(trade.get('signal', '')) + net_profit = float(trade.get('net_profit', trade.get('diff', 0.0))) + + color = "#2ecc71" if net_profit >= 0 else "#e74c3c" + entry_marker = "triangle-up" if direction_text == "做多" else "triangle-down" + + # 入场标记 + fig.add_trace(go.Scatter( + x=[entry_time], y=[entry_px], + mode="markers", + marker=dict(symbol=entry_marker, size=12, color=color, line=dict(width=1, color="#ffffff")), + name="入场", + hovertemplate=( + "信号:%{customdata[0]}
方向:%{customdata[1]}
入场:%{y:.2f}
时间:%{x}入场" + ), + customdata=[[signal_name, direction_text]], + showlegend=False + )) + + # 出场标记 + fig.add_trace(go.Scatter( + x=[exit_time], y=[exit_px], + mode="markers", + marker=dict(symbol="x", size=10, color=color), + name="出场", + hovertemplate=( + "出场:%{y:.2f}
时间:%{x}
净利润:%{customdata[0]:.2f}出场" + ), + customdata=[[net_profit]], + showlegend=False + )) + + # 连接线(入场->出场) + fig.add_trace(go.Scatter( + x=[entry_time, exit_time], y=[entry_px, exit_px], + mode="lines", + line=dict(color=color, width=2), + name="交易", + hoverinfo="skip", + showlegend=False + )) + + # 全局布局与交互 + fig.update_layout( + title=title, + template="plotly_dark", + xaxis_title="时间", + yaxis_title="价格", + hovermode="x unified", + margin=dict(l=60, r=30, t=60, b=40), + dragmode="zoom", + legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), + xaxis=dict( + rangeslider=dict(visible=True), + rangeselector=dict( + buttons=[ + dict(count=1, label="1h", step="hour", stepmode="backward"), + dict(count=4, label="4h", step="hour", stepmode="backward"), + dict(count=1, label="1d", step="day", stepmode="backward"), + dict(count=7, label="1w", step="day", stepmode="backward"), + dict(step="all", label="ALL") + ] + ) + ), + yaxis=dict( + fixedrange=False, + autorange=False if init_y_range else True, + range=init_y_range + ), + uirevision="keep" # 保持用户当前视图设置 + ) + + # 优化交互:十字光标、缩放细节 + fig.update_xaxes(showspikes=True, spikemode="across", spikesnap="cursor", showline=True) + fig.update_yaxes(showspikes=True, spikemode="across", spikesnap="cursor", showline=True) + + # 输出HTML + output_html = os.path.abspath(output_html) + fig.write_html( + output_html, + include_plotlyjs='cdn', + auto_open=False, + full_html=True, + config={ + "scrollZoom": True, # 鼠标滚轮缩放 + "displaylogo": False, # 隐藏Plotly Logo + "doubleClick": "autosize", # 双击自适应 + "modeBarButtonsToAdd": [ # 常用交互按钮 + "toggleSpikelines", + "zoom2d", + "pan2d", + "zoomIn2d", + "zoomOut2d", + "zoomInY", + "zoomOutY", + "autoScale2d", + "resetScale2d", + "drawline", + "drawopenpath", + "eraseshape" + ] + } + ) + logger.info(f"已生成可视化HTML:{output_html}") + return output_html # =============================================================== @@ -726,68 +615,56 @@ def analyze_results(trades, stats): # =============================================================== if __name__ == '__main__': - # 配置日志 - logger.add("backtest.log", rotation="1 day", retention="7 days") - - # 创建回测配置 + # 你可以根据需要修改此配置 config = BacktestConfig( - take_profit=10.0, # 止盈10点 - stop_loss=-1.0, # 止损1点 - contract_size=10000, # 合约规模 - open_fee=5.0, # 开仓手续费 - close_fee_rate=0.0005, # 平仓手续费率 - slippage_rate=0.0001, # 滑点率0.01% - start_date="2025-9-1", - end_date="2025-9-30", + take_profit=10.0, + stop_loss=-5.0, + contract_size=10000, + open_fee=5.0, + close_fee_rate=0.0005, + start_date="2025-09-01", + end_date="2025-09-30", enable_bear_bull_engulf=True, enable_bull_bear_engulf=True ) - # 生成日期列表 - dates = [f"2025-9-{i}" for i in range(1, 31)] + # 生成日期列表(也可以自行传入 dates) + dates = gen_dates(config.start_date, config.end_date) - try: - # 执行回测 - trades, stats = backtest(dates, config) + # 运行“单笔持仓”模式(同向忽略,反向先平后开) + trades, stats = backtest_single_position(dates, tp=config.take_profit, sl=config.stop_loss, config=config) - # 输出详细交易记录 - logger.info("\n" + "=" * 80) - logger.info("📋 详细交易记录") - logger.info("=" * 80) + logger.info("===== 每笔交易详情 =====") + for t in trades: + logger.info( + f"{t['entry_time']} {t['direction']}({t['signal']}) " + f"入场={t['entry']:.2f} 出场={t['exit']:.2f} 出场时间={t['exit_time']} " + f"差价={t['diff']:.2f} 盈亏金额={t['profit_amount']:.2f} 手续费={t['fee']:.2f} 净利润={t['net_profit']:.2f}" + ) - for i, trade in enumerate(trades, 1): - logger.info( - f"{i:3d}. {trade.entry_time.strftime('%m-%d %H:%M')} " - f"{trade.direction}({trade.signal_type}) " - f"入场={trade.entry_price:.2f} 出场={trade.exit_price:.2f} " - f"出场时间={trade.exit_time.strftime('%m-%d %H:%M')} " - f"盈亏={trade.profit_loss:.2f}点 金额={trade.profit_amount:.2f} " - f"手续费={trade.total_fee:.2f} 滑点={trade.slippage_cost:.2f} " - f"净利润={trade.net_profit:.2f}" - ) + total_trades = len(trades) + total_profit_amount = sum(t['profit_amount'] for t in trades) + total_fee = sum(t['fee'] for t in trades) + total_net = sum(t['net_profit'] for t in trades) - # 分析结果 - analyze_results(trades, stats) - - logger.info("\n✅ 回测完成!") - - except Exception as e: - logger.error(f"回测执行失败: {e}") - raise - - # ============== 生成一分钟K线可视化(前10笔) ============== - try: - to_show = trades[:10] - for t in to_show: - visualize_trade_1min( - t, - minutes_before=30, - minutes_after=90, - take_profit_points=config.take_profit, - stop_loss_points=config.stop_loss - ) - logger.info("已为前10笔交易生成一分钟K线图(charts_1m 目录)") - except Exception as e: - logger.error(f"生成一分钟K线图时出错: {e}") + print(f"\n一共交易笔数:{total_trades}") + print(f"总毛盈利金额:{total_profit_amount:.2f}") + print(f"总手续费:{total_fee:.2f}") + print(f"总净利润:{total_net:.2f}") + print("\n===== 信号统计(基于净利润) =====") + for k, v in stats.items(): + name = v['name'] + count = v['count'] + wins = v['wins'] + win_rate = (wins / count * 100) if count > 0 else 0.0 + total_profit = v['total_profit'] # 这里已累计为“净利润金额” + print(f"{name}:次数={count} 胜率={win_rate:.2f}% 净利润合计={total_profit:.2f}") + # 若需要“逐笔模式(仅入场当根15m内评估TP/SL)”,可改为: + # trades2, stats2 = backtest(dates, tp=config.take_profit, sl=config.stop_loss, config=config) + # ... 输出逻辑同上 + # 生成交互式可视化 + html_path = plot_trades_candlestick(dates, trades, output_html="backtest_trades.html") + if html_path: + print(f"\n可视化文件已生成:{html_path}\n请用浏览器打开进行交互分析。") \ No newline at end of file