""" 布林带均线策略 - 全参数组合扫描 (0.5~1000, 0.5~1000) 分层搜索:粗扫 → 精扫,在合理时间内覆盖全参数空间 策略: - 阳线 + 先涨碰到均线(1m判断) → 开多 - 持多: 碰上轨止盈 - 阴线 + 先跌碰到均线(1m判断) → 平多开空 - 持空: 碰下轨止盈 配置: 200U | 1%权益/单 | 万五手续费 | 90%返佣次日8点 | 100x杠杆 | 全仓 """ from __future__ import annotations import os import sys import tempfile import time from collections import defaultdict from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parents[0])) sys.stdout.reconfigure(line_buffering=True) import numpy as np import pandas as pd from strategy.bb_midline_backtest import BBMidlineConfig, run_bb_midline_backtest from strategy.data_loader import get_1m_touch_direction, load_klines from strategy.indicators import bollinger # ─── 全局变量 (多进程 worker 共享) ─── G_DF: pd.DataFrame | None = None G_DF_1M: pd.DataFrame | None = None G_USE_1M: bool = True G_STEP_MIN: int = 5 def _init_worker(df_path: str, df_1m_path: str | None, use_1m: bool, step_min: int): global G_DF, G_DF_1M, G_USE_1M, G_STEP_MIN G_DF = pd.read_pickle(df_path) G_DF_1M = pd.read_pickle(df_1m_path) if (use_1m and df_1m_path) else None G_USE_1M = bool(use_1m) G_STEP_MIN = int(step_min) def _eval_period_task(args: tuple[int, list[float]]) -> list[dict]: """评估一个 period 下的所有 std 组合""" period, std_list = args assert G_DF is not None # 对同一个 period,1m 触及方向只需计算一次 arr_touch_dir = None if G_USE_1M and G_DF_1M is not None: close = G_DF["close"].astype(float) bb_mid, _, _, _ = bollinger(close, period, 1.0) arr_touch_dir = get_1m_touch_direction( G_DF, G_DF_1M, bb_mid.values, kline_step_min=G_STEP_MIN ) rows: list[dict] = [] for std in std_list: cfg = BBMidlineConfig( bb_period=period, bb_std=float(std), initial_capital=200.0, margin_pct=0.01, leverage=100.0, cross_margin=True, fee_rate=0.0005, rebate_pct=0.90, rebate_hour_utc=0, fill_at_close=True, use_1m_touch_filter=G_USE_1M, kline_step_min=G_STEP_MIN, ) result = run_bb_midline_backtest( G_DF, cfg, df_1m=G_DF_1M if G_USE_1M else None, arr_touch_dir_override=arr_touch_dir, ) eq = result.equity_curve["equity"].dropna() if len(eq) == 0: final_eq = 0.0 ret_pct = -100.0 dd_u = -200.0 dd_pct = 100.0 else: final_eq = float(eq.iloc[-1]) ret_pct = (final_eq - cfg.initial_capital) / cfg.initial_capital * 100.0 dd_u = float((eq.astype(float) - eq.astype(float).cummax()).min()) dd_pct = abs(dd_u) / cfg.initial_capital * 100.0 n_trades = len(result.trades) win_rate = ( sum(1 for t in result.trades if t.net_pnl > 0) / n_trades * 100.0 if n_trades > 0 else 0.0 ) pnl = result.daily_stats["pnl"].astype(float) sharpe = ( float(pnl.mean() / pnl.std()) * np.sqrt(365.0) if pnl.std() > 0 else 0.0 ) # 稳定性评分 sparse_penalty = -5.0 if n_trades < 200 else 0.0 score = ret_pct + sharpe * 12.0 - abs(dd_pct) * 0.8 + sparse_penalty rows.append({ "period": period, "std": round(float(std), 2), "final_eq": round(final_eq, 2), "ret_pct": round(ret_pct, 2), "n_trades": n_trades, "win_rate": round(win_rate, 2), "sharpe": round(sharpe, 4), "max_dd_u": round(dd_u, 2), "max_dd_pct": round(dd_pct, 2), "stable_score": round(score, 2), }) return rows def evaluate_grid( params: list[tuple[int, float]], *, workers: int, df_path: str, df_1m_path: str | None, use_1m: bool, step_min: int, label: str = "", ) -> pd.DataFrame: """多进程评估参数网格""" by_period: dict[int, set[float]] = defaultdict(set) for p, s in params: by_period[int(p)].add(round(float(s), 2)) tasks = [(p, sorted(stds)) for p, stds in sorted(by_period.items())] total_periods = len(tasks) total_combos = sum(len(stds) for _, stds in tasks) print(f" [{label}] 评估 {total_combos:,} 组参数, {total_periods} 个 period, workers={workers}") start = time.time() rows: list[dict] = [] done_periods = 0 done_combos = 0 with ProcessPoolExecutor( max_workers=workers, initializer=_init_worker, initargs=(df_path, df_1m_path, use_1m, step_min), ) as ex: future_map = {ex.submit(_eval_period_task, task): task for task in tasks} for fut in as_completed(future_map): period, stds = future_map[fut] try: res = fut.result() rows.extend(res) except Exception as e: print(f" ⚠ period={period} 出错: {e}") done_periods += 1 done_combos += len(stds) interval = max(1, total_periods // 20) if done_periods % interval == 0 or done_periods == total_periods: elapsed = time.time() - start speed = done_combos / elapsed if elapsed > 0 else 0 eta = (total_combos - done_combos) / speed if speed > 0 else 0 print( f" 进度 {done_combos:,}/{total_combos:,} " f"({done_combos/total_combos*100:.1f}%) " f"| {elapsed:.0f}s | ETA {eta:.0f}s" ) df = pd.DataFrame(rows) print(f" [{label}] 完成, 用时 {time.time() - start:.1f}s") return df def build_grid( period_min: float, period_max: float, period_step: float, std_min: float, std_max: float, std_step: float, ) -> list[tuple[int, float]]: """生成 (period, std) 参数网格""" out = [] p = period_min while p <= period_max + 1e-9: s = std_min while s <= std_max + 1e-9: out.append((max(1, int(round(p))), round(s, 2))) s += std_step p += period_step return sorted(set(out)) def main(): import argparse parser = argparse.ArgumentParser(description="布林带均线策略 - 全参数扫描 (分层搜索)") parser.add_argument("-p", "--kline-period", default="5m", choices=["5m", "15m", "30m"]) parser.add_argument("-j", "--workers", type=int, default=max(1, (os.cpu_count() or 4) - 1)) parser.add_argument("--no-1m", action="store_true", help="禁用 1m 触及方向过滤") parser.add_argument("--source", default="bitmart", choices=["bitmart", "binance"]) parser.add_argument("--coarse-only", action="store_true", help="只做粗扫") parser.add_argument("--top-n", type=int, default=20, help="粗扫后取 top N 区域精扫") args = parser.parse_args() use_1m = not args.no_1m step_min = int(args.kline_period.replace("m", "")) out_dir = Path(__file__).resolve().parent / "strategy" / "results" out_dir.mkdir(parents=True, exist_ok=True) # ─── 加载数据 ─── print("=" * 90) print("布林带均线策略 | 全参数扫描 | 2020-2025 | 200U | 1%/单 | 万五 | 90%返佣 | 100x全仓") print("=" * 90) print(f"\n加载 K 线数据 (2020-01-01 ~ 2026-01-01)...") t0 = time.time() try: df = load_klines(args.kline_period, "2020-01-01", "2026-01-01", source=args.source) df_1m = load_klines("1m", "2020-01-01", "2026-01-01", source=args.source) if use_1m else None except Exception as e: alt = "binance" if args.source == "bitmart" else "bitmart" print(f" {args.source} 加载失败 ({e}), 尝试 {alt}...") df = load_klines(args.kline_period, "2020-01-01", "2026-01-01", source=alt) df_1m = load_klines("1m", "2020-01-01", "2026-01-01", source=alt) if use_1m else None args.source = alt print( f" {args.kline_period}: {len(df):,} 条" + (f", 1m: {len(df_1m):,} 条" if df_1m is not None else "") + f" | 数据源: {args.source} ({time.time()-t0:.1f}s)\n" ) # 序列化数据给子进程 with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f_df: df.to_pickle(f_df.name) df_path = f_df.name df_1m_path = None if df_1m is not None: with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f_1m: df_1m.to_pickle(f_1m.name) df_1m_path = f_1m.name try: # ─── 第一阶段:粗扫 ─── # period: 1~1000 步长50, std: 0.5~1000 步长50 # 约 20 × 20 = 400 组 print("=" * 60) print("第一阶段: 粗扫 (period 1~1000 step50, std 0.5~1000 step50)") print("=" * 60) coarse_grid = build_grid(1, 1000, 50, 0.5, 1000, 50) print(f" 参数组合数: {len(coarse_grid):,}") coarse_df = evaluate_grid( coarse_grid, workers=args.workers, df_path=df_path, df_1m_path=df_1m_path, use_1m=use_1m, step_min=step_min, label="粗扫", ) stamp = time.strftime("%Y%m%d_%H%M%S") coarse_csv = out_dir / f"bb_sweep_coarse_{args.kline_period}_{stamp}.csv" coarse_df.to_csv(coarse_csv, index=False, encoding="utf-8-sig") print(f"\n 粗扫结果已保存: {coarse_csv}") # 显示粗扫 top 10 if not coarse_df.empty: top10 = coarse_df.sort_values("stable_score", ascending=False).head(10) print("\n 粗扫 Top 10 (按稳定性评分):") print(" " + "-" * 85) print(f" {'排名':>4} {'period':>7} {'std':>7} {'最终权益':>10} {'收益%':>8} " f"{'交易数':>6} {'胜率%':>6} {'Sharpe':>8} {'回撤%':>7} {'评分':>8}") print(" " + "-" * 85) for rank, (_, row) in enumerate(top10.iterrows(), 1): print( f" {rank:>4} {int(row['period']):>7} {row['std']:>7.1f} " f"{row['final_eq']:>10.2f} {row['ret_pct']:>+8.1f} " f"{int(row['n_trades']):>6} {row['win_rate']:>6.1f} " f"{row['sharpe']:>8.4f} {row['max_dd_pct']:>7.1f} " f"{row['stable_score']:>8.2f}" ) if args.coarse_only or coarse_df.empty: print("\n粗扫完成。") return # ─── 第二阶段:中扫 ─── # 取粗扫 top N 的区域,在其周围 ±50 范围内用步长 10 精扫 print(f"\n{'=' * 60}") print(f"第二阶段: 中扫 (粗扫 Top {args.top_n} 区域, 步长 10)") print("=" * 60) top_coarse = coarse_df.sort_values("stable_score", ascending=False).head(args.top_n) mid_params = set() for _, row in top_coarse.iterrows(): p_center = int(row["period"]) s_center = float(row["std"]) for p in range(max(1, p_center - 50), min(1001, p_center + 51), 10): for s_val in np.arange(max(0.5, s_center - 50), min(1000.5, s_center + 51), 10): mid_params.add((max(1, int(round(p))), round(float(s_val), 2))) mid_grid = sorted(mid_params) print(f" 参数组合数: {len(mid_grid):,}") mid_df = evaluate_grid( mid_grid, workers=args.workers, df_path=df_path, df_1m_path=df_1m_path, use_1m=use_1m, step_min=step_min, label="中扫", ) mid_csv = out_dir / f"bb_sweep_mid_{args.kline_period}_{stamp}.csv" mid_df.to_csv(mid_csv, index=False, encoding="utf-8-sig") print(f"\n 中扫结果已保存: {mid_csv}") # ─── 第三阶段:精扫 ─── # 取中扫 top 10 区域,在其周围 ±10 范围内用步长 1 精扫 print(f"\n{'=' * 60}") print("第三阶段: 精扫 (中扫 Top 10 区域, 步长 1)") print("=" * 60) all_mid = pd.concat([coarse_df, mid_df], ignore_index=True) top_mid = all_mid.sort_values("stable_score", ascending=False).head(10) fine_params = set() for _, row in top_mid.iterrows(): p_center = int(row["period"]) s_center = float(row["std"]) for p in range(max(1, p_center - 10), min(1001, p_center + 11)): for s_val in np.arange(max(0.5, s_center - 10), min(1000.5, s_center + 11), 1.0): fine_params.add((max(1, int(round(p))), round(float(s_val), 2))) fine_grid = sorted(fine_params) print(f" 参数组合数: {len(fine_grid):,}") fine_df = evaluate_grid( fine_grid, workers=args.workers, df_path=df_path, df_1m_path=df_1m_path, use_1m=use_1m, step_min=step_min, label="精扫", ) fine_csv = out_dir / f"bb_sweep_fine_{args.kline_period}_{stamp}.csv" fine_df.to_csv(fine_csv, index=False, encoding="utf-8-sig") print(f"\n 精扫结果已保存: {fine_csv}") # ─── 汇总 ─── all_results = pd.concat([coarse_df, mid_df, fine_df], ignore_index=True) all_results = all_results.drop_duplicates(subset=["period", "std"], keep="last") all_results = all_results.sort_values("stable_score", ascending=False) all_csv = out_dir / f"bb_sweep_all_{args.kline_period}_{stamp}.csv" all_results.to_csv(all_csv, index=False, encoding="utf-8-sig") print(f"\n{'=' * 90}") print("全部扫描完成 | 汇总结果") print("=" * 90) print(f"总计评估: {len(all_results):,} 组参数") print(f"结果文件: {all_csv}\n") # Top 20 top20 = all_results.head(20) print("Top 20 (按稳定性评分):") print("-" * 95) print(f"{'排名':>4} {'period':>7} {'std':>7} {'最终权益':>10} {'收益%':>8} " f"{'交易数':>6} {'胜率%':>6} {'Sharpe':>8} {'回撤%':>7} {'评分':>8}") print("-" * 95) for rank, (_, row) in enumerate(top20.iterrows(), 1): print( f"{rank:>4} {int(row['period']):>7} {row['std']:>7.1f} " f"{row['final_eq']:>10.2f} {row['ret_pct']:>+8.1f} " f"{int(row['n_trades']):>6} {row['win_rate']:>6.1f} " f"{row['sharpe']:>8.4f} {row['max_dd_pct']:>7.1f} " f"{row['stable_score']:>8.2f}" ) # 最佳参数详细回测 best = all_results.iloc[0] print(f"\n{'=' * 90}") print(f"最佳参数: BB({int(best['period'])}, {best['std']})") print(f"最终权益: {best['final_eq']:.2f} U | 收益: {best['ret_pct']:+.2f}%") print(f"交易次数: {int(best['n_trades'])} | 胜率: {best['win_rate']:.1f}%") print(f"Sharpe: {best['sharpe']:.4f} | 最大回撤: {best['max_dd_pct']:.1f}%") print("=" * 90) # 逐年权益 cfg = BBMidlineConfig( bb_period=int(best["period"]), bb_std=float(best["std"]), initial_capital=200.0, margin_pct=0.01, leverage=100.0, cross_margin=True, fee_rate=0.0005, rebate_pct=0.90, rebate_hour_utc=0, fill_at_close=True, use_1m_touch_filter=use_1m, kline_step_min=step_min, ) final_res = run_bb_midline_backtest(df, cfg, df_1m=df_1m if use_1m else None) eq = final_res.equity_curve["equity"].dropna() print("\n逐年权益 (年末):") eq_ts = eq.copy() eq_ts.index = pd.to_datetime(eq_ts.index) prev = 200.0 for y in range(2020, 2026): sub = eq_ts[eq_ts.index.year == y] if len(sub) > 0: ye = float(sub.iloc[-1]) ret = (ye - prev) / prev * 100.0 if prev > 0 else 0.0 print(f" {y}: {ye:.2f} U (当年收益 {ret:+.1f}%)") prev = ye print(f"\n总手续费: {final_res.total_fee:.2f} U") print(f"总返佣: {final_res.total_rebate:.2f} U") print(f"净手续费: {final_res.total_fee - final_res.total_rebate:.2f} U") # 保存最佳参数交易明细 trade_path = out_dir / f"bb_sweep_best_trades_{args.kline_period}_{stamp}.csv" trade_rows = [] for i, t in enumerate(final_res.trades, 1): trade_rows.append({ "序号": i, "方向": "做多" if t.side == "long" else "做空", "开仓时间": t.entry_time, "平仓时间": t.exit_time, "开仓价": round(t.entry_price, 2), "平仓价": round(t.exit_price, 2), "净盈亏": round(t.net_pnl, 4), "平仓原因": t.exit_reason, }) pd.DataFrame(trade_rows).to_csv(trade_path, index=False, encoding="utf-8-sig") print(f"\n最佳参数交易明细: {trade_path}") finally: Path(df_path).unlink(missing_ok=True) if df_1m_path: Path(df_1m_path).unlink(missing_ok=True) if __name__ == "__main__": main()