"""
Hierarchical parameter search for the Bollinger-band midline strategy (2020-2025).

Notes:
- Full-range coverage: period 1~1000, std 0.5~1000.
- Hierarchical search: coarse scan over the whole range first, then refine
  around the best candidates, finally refining down to a std step of 0.5.
- When the 1m touch-direction filter (rose first / fell first) is enabled,
  the touch direction is computed once per period and reused for every std
  value to speed things up.
"""

from __future__ import annotations

import argparse
import math
import os
import tempfile
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

import numpy as np
import pandas as pd

from strategy.bb_midline_backtest import BBMidlineConfig, run_bb_midline_backtest
from strategy.data_loader import get_1m_touch_direction, load_klines
from strategy.indicators import bollinger

# Per-process state. Each worker fills these in via _init_worker so the
# (large) frames are loaded once per process instead of once per task.
G_DF: pd.DataFrame | None = None
G_DF_1M: pd.DataFrame | None = None
G_USE_1M: bool = True
G_STEP_MIN: int = 5

# Starting equity used by every backtest launched from this script.
_INITIAL_CAPITAL: float = 200.0

# Local refinement stages, run in order after the global coarse scan:
# (label, number of top seeds, p_window, p_step, s_window, s_step)
_LOCAL_STAGES: tuple[tuple[str, int, int, int, float, float], ...] = (
    ("stage2-local", 6, 25, 5, 50, 10),
    ("stage3-fine", 4, 8, 1, 10, 1),
    ("stage4-final", 2, 3, 1, 4, 0.5),
)


def frange(start: float, end: float, step: float) -> list[float]:
    """Return ``start, start+step, ...`` up to and including ``end``.

    Values are rounded to 6 decimals. A tolerance of 1e-9 is applied to the
    upper bound so that an endpoint hit "almost exactly" is still included.

    Raises:
        ValueError: if ``step`` is not strictly positive (the sequence would
            never reach ``end``).
    """
    if step <= 0:
        raise ValueError(f"step must be positive, got {step!r}")
    first = float(start)
    out: list[float] = []
    idx = 0
    x = first
    while x <= end + 1e-9:
        out.append(round(x, 6))
        idx += 1
        # Multiply instead of accumulating so rounding error does not build up.
        x = first + idx * step
    return out


def _clip_periods(values: list[float]) -> list[int]:
    """Round to int, clamp into [1, 1000], de-duplicate and sort."""
    return sorted({max(1, min(1000, int(round(v)))) for v in values})


def _clip_stds(values: list[float]) -> list[float]:
    """Clamp into [0.5, 1000.0], round to 2 decimals, de-duplicate and sort."""
    return sorted({round(max(0.5, min(1000.0, v)), 2) for v in values})


def build_grid(
    p_start: float,
    p_end: float,
    p_step: float,
    s_start: float,
    s_end: float,
    s_step: float,
) -> list[tuple[int, float]]:
    """Build the sorted Cartesian grid of ``(period, std)`` pairs.

    Periods are clamped to [1, 1000] and stds to [0.5, 1000.0]. The upper
    bounds (period 1000 and std 1000.0) are always added to the grid, even
    when the stepping does not land on them.
    """
    periods = _clip_periods(frange(p_start, p_end, p_step))
    stds = _clip_stds(frange(s_start, s_end, s_step))
    if 1000 not in periods:
        periods.append(1000)
    if 1000.0 not in stds:
        stds.append(1000.0)
    return sorted({(p, s) for p in periods for s in stds})


def build_local_grid(
    centers: pd.DataFrame,
    p_window: int,
    p_step: int,
    s_window: float,
    s_step: float,
) -> list[tuple[int, float]]:
    """Build a refinement grid around each row of ``centers``.

    Args:
        centers: frame with ``period`` and ``std`` columns; one local grid is
            generated per row and all of them are merged.
        p_window: half-width of the period window around each center.
        p_step: period step inside the window.
        s_window: half-width of the std window around each center.
        s_step: std step inside the window.

    Returns:
        Sorted, de-duplicated ``(period, std)`` pairs, clamped to
        period [1, 1000] and std [0.5, 1000.0]. Empty when ``centers`` has
        no rows.
    """
    out: set[tuple[int, float]] = set()
    for _, row in centers.iterrows():
        p0 = int(row["period"])
        s0 = float(row["std"])
        p_min = max(1, p0 - p_window)
        p_max = min(1000, p0 + p_window)
        s_min = max(0.5, s0 - s_window)
        s_max = min(1000.0, s0 + s_window)
        periods = _clip_periods(frange(p_min, p_max, p_step))
        stds = _clip_stds(frange(s_min, s_max, s_step))
        out.update((p, s) for p in periods for s in stds)
    return sorted(out)


def score_row(ret_pct: float, sharpe: float, dd_pct: float, n_trades: int) -> float:
    """Return the "stable" ranking score for one parameter combination.

    Favors stable returns: return and Sharpe add to the score, while drawdown
    and a very low trade count (fewer than 200) are penalized.
    """
    sparse_penalty = -5.0 if n_trades < 200 else 0.0
    return ret_pct + sharpe * 12.0 - dd_pct * 0.8 + sparse_penalty


def _build_config(period: int, std: float, use_1m: bool, step_min: int) -> BBMidlineConfig:
    """Build the backtest config shared by the grid search and the final re-run.

    Keeping this in one place guarantees the final yearly report is produced
    with exactly the settings that were used to rank the parameters.
    """
    return BBMidlineConfig(
        bb_period=period,
        bb_std=float(std),
        initial_capital=_INITIAL_CAPITAL,
        margin_pct=0.01,
        leverage=100.0,
        cross_margin=True,
        fee_rate=0.0005,
        rebate_pct=0.90,
        rebate_hour_utc=0,
        fill_at_close=True,
        use_1m_touch_filter=use_1m,
        kline_step_min=step_min,
    )


def _init_worker(df_path: str, df_1m_path: str | None, use_1m: bool, step_min: int):
    """Process-pool initializer: load the pickled frames into module globals.

    NOTE: ``pd.read_pickle`` must only be given trusted files. Here the paths
    are temp files written by ``main`` in the parent process.
    """
    global G_DF, G_DF_1M, G_USE_1M, G_STEP_MIN
    G_DF = pd.read_pickle(df_path)
    G_DF_1M = pd.read_pickle(df_1m_path) if (use_1m and df_1m_path) else None
    G_USE_1M = bool(use_1m)
    G_STEP_MIN = int(step_min)


def _eval_period_task(args: tuple[int, list[float]]) -> list[dict]:
    """Backtest every std in ``std_list`` for a single period.

    Args:
        args: ``(period, std_list)``.

    Returns:
        One result dict per std with the keys ``period``, ``std``,
        ``final_eq``, ``ret_pct``, ``n_trades``, ``win_rate``, ``sharpe``,
        ``max_dd_u``, ``max_dd_pct``, ``stable_score`` and ``use_1m_filter``.
    """
    period, std_list = args
    assert G_DF is not None

    # The touch direction is derived from the first value returned by
    # bollinger(close, period, 1.0) and does not involve the std being
    # searched, so it is computed once per period and reused below.
    arr_touch_dir = None
    if G_USE_1M and G_DF_1M is not None:
        close = G_DF["close"].astype(float)
        bb_mid, _, _, _ = bollinger(close, period, 1.0)
        arr_touch_dir = get_1m_touch_direction(
            G_DF, G_DF_1M, bb_mid.values, kline_step_min=G_STEP_MIN
        )

    rows: list[dict] = []
    for std in std_list:
        cfg = _build_config(period, std, G_USE_1M, G_STEP_MIN)
        result = run_bb_midline_backtest(
            G_DF,
            cfg,
            df_1m=G_DF_1M if G_USE_1M else None,
            arr_touch_dir_override=arr_touch_dir,
        )

        eq = result.equity_curve["equity"].dropna()
        if len(eq) == 0:
            # No equity points at all: record it as a total loss.
            final_eq = 0.0
            ret_pct = -100.0
            dd_u = -cfg.initial_capital
            dd_pct = 100.0
        else:
            eq_f = eq.astype(float)
            final_eq = float(eq.iloc[-1])
            ret_pct = (final_eq - cfg.initial_capital) / cfg.initial_capital * 100.0
            # Largest drop below the running peak, in account currency (<= 0).
            dd_u = float((eq_f - eq_f.cummax()).min())
            # Drawdown is expressed relative to the initial capital.
            dd_pct = abs(dd_u) / cfg.initial_capital * 100.0

        n_trades = len(result.trades)
        win_rate = (
            sum(1 for t in result.trades if t.net_pnl > 0) / n_trades * 100.0
            if n_trades > 0
            else 0.0
        )

        pnl = result.daily_stats["pnl"].astype(float)
        pnl_std = pnl.std()
        # A NaN or zero std (too few days / flat pnl) yields a Sharpe of 0.
        sharpe = float(pnl.mean() / pnl_std) * math.sqrt(365.0) if pnl_std > 0 else 0.0

        stable_score = score_row(ret_pct, sharpe, dd_pct, n_trades)
        rows.append(
            {
                "period": period,
                "std": round(float(std), 2),
                "final_eq": final_eq,
                "ret_pct": ret_pct,
                "n_trades": n_trades,
                "win_rate": win_rate,
                "sharpe": sharpe,
                "max_dd_u": dd_u,
                "max_dd_pct": dd_pct,
                "stable_score": stable_score,
                "use_1m_filter": int(G_USE_1M),
            }
        )
    return rows


def evaluate_grid(
    params: list[tuple[int, float]],
    *,
    workers: int,
    df_path: str,
    df_1m_path: str | None,
    use_1m: bool,
    step_min: int,
    label: str,
) -> pd.DataFrame:
    """Evaluate ``params`` in a process pool, one task per distinct period.

    Args:
        params: ``(period, std)`` pairs to backtest.
        workers: size of the process pool.
        df_path: pickle path of the main kline frame.
        df_1m_path: pickle path of the 1m frame, or ``None``.
        use_1m: whether the 1m touch-direction filter is enabled.
        step_min: main kline step in minutes.
        label: prefix used in progress output.

    Returns:
        One row per evaluated combination (see ``_eval_period_task``), or an
        empty frame when ``params`` is empty. Any exception raised inside a
        worker is re-raised here.
    """
    # Group stds by period so each task can reuse its per-period work.
    by_period: dict[int, set[float]] = defaultdict(set)
    for p, s in params:
        by_period[int(p)].add(round(float(s), 2))

    tasks = [(p, sorted(stds)) for p, stds in sorted(by_period.items())]
    total_periods = len(tasks)
    total_combos = sum(len(stds) for _, stds in tasks)
    if total_combos == 0:
        return pd.DataFrame()

    print(f"[{label}] period组数={total_periods}, 参数组合={total_combos}, workers={workers}")
    start = time.time()
    rows: list[dict] = []
    done_periods = 0
    done_combos = 0
    # Report roughly every 10% of the periods, and always on the last one.
    report_every = max(1, total_periods // 10)

    with ProcessPoolExecutor(
        max_workers=workers,
        initializer=_init_worker,
        initargs=(df_path, df_1m_path, use_1m, step_min),
    ) as ex:
        future_map = {ex.submit(_eval_period_task, task): task for task in tasks}
        for fut in as_completed(future_map):
            _, stds = future_map[fut]
            rows.extend(fut.result())
            done_periods += 1
            done_combos += len(stds)
            if done_periods == total_periods or done_periods % report_every == 0:
                elapsed = time.time() - start
                print(
                    f"[{label}] 进度 {done_combos}/{total_combos} 组合 "
                    f"({done_periods}/{total_periods} periods), {elapsed:.0f}s"
                )

    df = pd.DataFrame(rows)
    print(f"[{label}] 完成, 用时 {time.time() - start:.1f}s")
    return df


def summarize_yearly(
    eq: pd.Series,
    initial_capital: float = 200.0,
    *,
    start_year: int = 2020,
    end_year: int = 2025,
) -> pd.DataFrame:
    """Summarize year-end equity and year-over-year return.

    Args:
        eq: equity series; its index must be convertible by ``pd.to_datetime``.
        initial_capital: baseline equity for the first reported year.
        start_year: first calendar year to report (inclusive).
        end_year: last calendar year to report (inclusive).

    Returns:
        Frame with columns ``year``, ``year_end_equity`` and
        ``year_return_pct``. Years without data are skipped; each return is
        measured against the previous reported year-end equity.
    """
    s = eq.dropna().copy()
    s.index = pd.to_datetime(s.index)
    out_rows: list[dict] = []
    prev = initial_capital
    for year in range(start_year, end_year + 1):
        sub = s[s.index.year == year]
        if len(sub) == 0:
            continue
        ye = float(sub.iloc[-1])
        # Avoid dividing by a wiped-out (non-positive) previous equity.
        ret = (ye - prev) / prev * 100.0 if prev > 0 else 0.0
        out_rows.append({"year": year, "year_end_equity": ye, "year_return_pct": ret})
        prev = ye
    return pd.DataFrame(out_rows)


def _dump_temp_pickle(frame: pd.DataFrame) -> str:
    """Pickle ``frame`` into a fresh temp file and return its path.

    The descriptor from ``mkstemp`` is closed before pandas writes to the
    path, so the file is never opened twice at once. If writing fails, the
    temp file is removed before the error propagates.
    """
    fd, path = tempfile.mkstemp(suffix=".pkl")
    os.close(fd)
    try:
        frame.to_pickle(path)
    except BaseException:
        Path(path).unlink(missing_ok=True)
        raise
    return path


def _top_seeds(parts: list[pd.DataFrame], k: int) -> pd.DataFrame:
    """Return the ``k`` best rows by ``stable_score`` across all ``parts``."""
    if not parts:
        return pd.DataFrame(columns=["period", "std"])
    pool = pd.concat(parts, ignore_index=True)
    return pool.sort_values("stable_score", ascending=False).head(k)


def _run_stage(
    grid: list[tuple[int, float]],
    evaluated: set[tuple[int, float]],
    all_parts: list[pd.DataFrame],
    **eval_kwargs,
) -> None:
    """Evaluate the not-yet-seen part of ``grid`` and record the results.

    ``evaluated`` and ``all_parts`` are updated in place; ``eval_kwargs`` is
    forwarded to ``evaluate_grid``.
    """
    pending = [combo for combo in grid if combo not in evaluated]
    part = evaluate_grid(pending, **eval_kwargs)
    if part.empty:
        return
    all_parts.append(part)
    evaluated.update(
        zip(part["period"].astype(int).tolist(), part["std"].astype(float).tolist())
    )


def main():
    """CLI entry point: run the staged search and write the result CSVs."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--period", default="5m", choices=["5m", "15m", "30m"])
    parser.add_argument("--start", default="2020-01-01")
    parser.add_argument("--end", default="2026-01-01")
    parser.add_argument("-j", "--workers", type=int, default=max(1, (os.cpu_count() or 4) - 1))
    parser.add_argument("--no-1m", action="store_true", help="禁用 1m 方向过滤")
    args = parser.parse_args()
    if args.workers < 1:
        # ProcessPoolExecutor rejects non-positive pool sizes; fail early.
        parser.error("--workers must be >= 1")

    use_1m = not args.no_1m
    step_min = int(args.period.replace("m", ""))

    out_dir = Path(__file__).resolve().parent / "strategy" / "results"
    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"加载数据: {args.period} {args.start}~{args.end}")
    t0 = time.time()
    df = load_klines(args.period, args.start, args.end)
    df_1m = load_klines("1m", args.start, args.end) if use_1m else None
    print(
        f" {args.period}: {len(df):,} 条"
        + (f", 1m: {len(df_1m):,} 条" if df_1m is not None else "")
        + f", {time.time()-t0:.1f}s\n"
    )

    df_path: str | None = None
    df_1m_path: str | None = None
    try:
        # Workers load the frames from these pickles in _init_worker. Creating
        # them inside the try guarantees cleanup even if the second dump fails.
        df_path = _dump_temp_pickle(df)
        if df_1m is not None:
            df_1m_path = _dump_temp_pickle(df_1m)

        evaluated: set[tuple[int, float]] = set()
        all_parts: list[pd.DataFrame] = []
        eval_kwargs = dict(
            workers=args.workers,
            df_path=df_path,
            df_1m_path=df_1m_path,
            use_1m=use_1m,
            step_min=step_min,
        )

        # Stage 1: coarse scan over the whole range.
        _run_stage(
            build_grid(1, 1000, 50, 0.5, 1000, 50),
            evaluated,
            all_parts,
            label="stage1-global",
            **eval_kwargs,
        )

        # Stages 2-4: progressively finer grids around the best candidates
        # found so far (the last stage uses a std step of 0.5).
        for label, top_k, p_window, p_step, s_window, s_step in _LOCAL_STAGES:
            seeds = _top_seeds(all_parts, top_k)
            local_grid = build_local_grid(
                seeds,
                p_window=p_window,
                p_step=p_step,
                s_window=s_window,
                s_step=s_step,
            )
            _run_stage(local_grid, evaluated, all_parts, label=label, **eval_kwargs)

        if not all_parts:
            raise RuntimeError("未得到任何评估结果")

        all_df = pd.concat(all_parts, ignore_index=True)
        all_df = all_df.drop_duplicates(subset=["period", "std"], keep="last")

        best_stable = all_df.sort_values("stable_score", ascending=False).iloc[0]
        best_return = all_df.sort_values("ret_pct", ascending=False).iloc[0]

        # Re-run the best "stable" parameters once more to export yearly returns.
        cfg = _build_config(
            int(best_stable["period"]), float(best_stable["std"]), use_1m, step_min
        )
        final_res = run_bb_midline_backtest(df, cfg, df_1m=df_1m if use_1m else None)
        final_eq = final_res.equity_curve["equity"].dropna()
        yearly = summarize_yearly(final_eq, initial_capital=_INITIAL_CAPITAL)

        stamp = time.strftime("%Y%m%d_%H%M%S")
        all_path = out_dir / f"bb_midline_hier_search_{args.period}_{stamp}.csv"
        yearly_path = out_dir / f"bb_midline_hier_search_{args.period}_{stamp}_yearly.csv"
        all_df.sort_values("stable_score", ascending=False).to_csv(all_path, index=False)
        yearly.to_csv(yearly_path, index=False)

        print("\n" + "=" * 96)
        print("分层搜索完成")
        print(
            f"最佳稳定参数: period={int(best_stable['period'])}, std={float(best_stable['std']):.2f} | "
            f"final={best_stable['final_eq']:.4f}U | ret={best_stable['ret_pct']:+.2f}% | "
            f"dd={best_stable['max_dd_pct']:.2f}% | sharpe={best_stable['sharpe']:.3f} | "
            f"trades={int(best_stable['n_trades'])}"
        )
        print(
            f"最高收益参数: period={int(best_return['period'])}, std={float(best_return['std']):.2f} | "
            f"final={best_return['final_eq']:.4f}U | ret={best_return['ret_pct']:+.2f}% | "
            f"dd={best_return['max_dd_pct']:.2f}% | sharpe={best_return['sharpe']:.3f} | "
            f"trades={int(best_return['n_trades'])}"
        )
        print(f"结果文件: {all_path}")
        print(f"逐年文件: {yearly_path}")
        print("=" * 96)
    finally:
        for tmp_path in (df_path, df_1m_path):
            if tmp_path:
                Path(tmp_path).unlink(missing_ok=True)


if __name__ == "__main__":
    main()