""" 布林带均线策略 - 全参数组合扫描 (1-1000, 1-1000) 策略: - 阳线 + 先涨碰到均线(1m判断) → 开多 - 持多: 碰上轨止盈 - 阴线 + 先跌碰到均线(1m判断) → 平多开空 - 持空: 碰下轨止盈 配置: 200U | 1%权益/单 | 万五手续费 | 90%返佣次日8点 | 100x杠杆 | 全仓 参数遍历: (0.5,0.5)(0.5,1)...(0.5,std_max), (1,0.5)(1,1)...(1,std_max), ... 直至 (period_max, std_max) """ from __future__ import annotations import os import sys import tempfile import time from collections import defaultdict from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parents[0])) sys.stdout.reconfigure(line_buffering=True) import numpy as np import pandas as pd from strategy.bb_midline_backtest import BBMidlineConfig, run_bb_midline_backtest from strategy.data_loader import get_1m_touch_direction, load_klines from strategy.indicators import bollinger def build_full_param_grid( period_min: float = 1.0, period_max: float = 1000.0, period_step: float = 1.0, std_min: float = 1.0, std_max: float = 1000.0, std_step: float = 1.0, ) -> list[tuple[int, float]]: """生成全量 (period, std) 组合,period 取整""" out = [] p = period_min while p <= period_max: s = std_min while s <= std_max: out.append((max(1, int(round(p))), round(s, 2))) s += std_step p += period_step return sorted(set(out)) def stable_score(ret_pct: float, sharpe: float, dd_pct: float, n_trades: int) -> float: """收益稳定性评分""" sparse_penalty = -5.0 if n_trades < 200 else 0.0 return ret_pct + sharpe * 12.0 - abs(dd_pct) * 0.8 + sparse_penalty G_DF: pd.DataFrame | None = None G_DF_1M: pd.DataFrame | None = None G_USE_1M: bool = True G_STEP_MIN: int = 5 def _init_worker(df_path: str, df_1m_path: str | None, use_1m: bool, step_min: int): global G_DF, G_DF_1M, G_USE_1M, G_STEP_MIN G_DF = pd.read_pickle(df_path) G_DF_1M = pd.read_pickle(df_1m_path) if (use_1m and df_1m_path) else None G_USE_1M = bool(use_1m) G_STEP_MIN = int(step_min) def _eval_period_task(args: tuple[int, list[float]]) -> list[dict]: period, std_list = args assert G_DF is not None arr_touch_dir = None if G_USE_1M and G_DF_1M is not None: close = G_DF["close"].astype(float) bb_mid, _, _, _ = bollinger(close, period, 1.0) arr_touch_dir = get_1m_touch_direction( G_DF, G_DF_1M, bb_mid.values, kline_step_min=G_STEP_MIN ) rows: list[dict] = [] for std in std_list: cfg = BBMidlineConfig( bb_period=period, bb_std=float(std), initial_capital=200.0, margin_pct=0.01, leverage=100.0, cross_margin=True, fee_rate=0.0005, rebate_pct=0.90, rebate_hour_utc=0, fill_at_close=True, use_1m_touch_filter=G_USE_1M, kline_step_min=G_STEP_MIN, ) result = run_bb_midline_backtest( G_DF, cfg, df_1m=G_DF_1M if G_USE_1M else None, arr_touch_dir_override=arr_touch_dir, ) eq = result.equity_curve["equity"].dropna() if len(eq) == 0: final_eq = 0.0 ret_pct = -100.0 dd_u = -200.0 dd_pct = 100.0 else: final_eq = float(eq.iloc[-1]) ret_pct = (final_eq - cfg.initial_capital) / cfg.initial_capital * 100.0 dd_u = float((eq.astype(float) - eq.astype(float).cummax()).min()) dd_pct = abs(dd_u) / cfg.initial_capital * 100.0 n_trades = len(result.trades) win_rate = ( sum(1 for t in result.trades if t.net_pnl > 0) / n_trades * 100.0 if n_trades > 0 else 0.0 ) pnl = result.daily_stats["pnl"].astype(float) sharpe = ( float(pnl.mean() / pnl.std()) * np.sqrt(365.0) if pnl.std() > 0 else 0.0 ) score = stable_score(ret_pct, sharpe, dd_pct, n_trades) rows.append( { "period": period, "std": round(float(std), 2), "final_eq": final_eq, "ret_pct": ret_pct, "n_trades": n_trades, "win_rate": win_rate, "sharpe": sharpe, "max_dd_u": dd_u, "max_dd_pct": dd_pct, "stable_score": score, } ) return rows def evaluate_grid( params: list[tuple[int, float]], *, workers: int, df_path: str, df_1m_path: str | None, use_1m: bool, step_min: int, ) -> pd.DataFrame: by_period: dict[int, set[float]] = defaultdict(set) for p, s in params: by_period[int(p)].add(round(float(s), 2)) tasks = [(p, sorted(stds)) for p, stds in sorted(by_period.items())] total_periods = len(tasks) total_combos = sum(len(stds) for _, stds in tasks) print(f" 评估 {total_combos:,} 组参数, {total_periods} 个 period, workers={workers}") start = time.time() rows: list[dict] = [] done_periods = 0 done_combos = 0 with ProcessPoolExecutor( max_workers=workers, initializer=_init_worker, initargs=(df_path, df_1m_path, use_1m, step_min), ) as ex: future_map = {ex.submit(_eval_period_task, task): task for task in tasks} for fut in as_completed(future_map): period, stds = future_map[fut] res = fut.result() rows.extend(res) done_periods += 1 done_combos += len(stds) if done_periods % max(1, total_periods // 20) == 0 or done_periods == total_periods: elapsed = time.time() - start print(f" 进度 {done_combos:,}/{total_combos:,} ({elapsed:.0f}s)") df = pd.DataFrame(rows) print(f" 完成, 用时 {time.time() - start:.1f}s") return df def main(): import argparse parser = argparse.ArgumentParser(description="布林带均线策略全参数扫描 (1-1000, 1-1000)") parser.add_argument( "--period-min", type=float, default=1.0, help="period 下限" ) parser.add_argument( "--period-max", type=float, default=1000.0, help="period 上限" ) parser.add_argument( "--period-step", type=float, default=10.0, help="period 步长 (建议10以缩短时间)" ) parser.add_argument("--std-min", type=float, default=0.5, help="std 下限") parser.add_argument("--std-max", type=float, default=1000.0, help="std 上限") parser.add_argument( "--std-step", type=float, default=1.0, help="std 步长" ) parser.add_argument( "-p", "--kline-period", default="5m", choices=["5m", "15m", "30m"] ) parser.add_argument( "-j", "--workers", type=int, default=max(1, (os.cpu_count() or 4) - 1) ) parser.add_argument("--no-1m", action="store_true", help="禁用 1m 触及方向过滤") parser.add_argument( "--source", default="bitmart", choices=["bitmart", "binance"], help="数据源", ) parser.add_argument( "--quick", action="store_true", help="快速模式: period 1-200 step20, std 1-20 step2", ) args = parser.parse_args() use_1m = not args.no_1m step_min = int(args.kline_period.replace("m", "")) if args.quick: args.period_min = 1.0 args.period_max = 200.0 args.period_step = 20.0 args.std_min = 0.5 args.std_max = 20.0 args.std_step = 1.0 print(" 快速模式: period 1-200 step20, std 1-20 step2") out_dir = Path(__file__).resolve().parent / "strategy" / "results" out_dir.mkdir(parents=True, exist_ok=True) print("加载 K 线数据 (2020-01-01 ~ 2026-01-01)...") t0 = time.time() try: df = load_klines(args.kline_period, "2020-01-01", "2026-01-01", source=args.source) df_1m = ( load_klines("1m", "2020-01-01", "2026-01-01", source=args.source) if use_1m else None ) except Exception as e: alt = "binance" if args.source == "bitmart" else "bitmart" print(f" {args.source} 加载失败 ({e}), 尝试 {alt}...") df = load_klines(args.kline_period, "2020-01-01", "2026-01-01", source=alt) df_1m = ( load_klines("1m", "2020-01-01", "2026-01-01", source=alt) if use_1m else None ) args.source = alt print( f" {args.kline_period}: {len(df):,} 条" + (f", 1m: {len(df_1m):,} 条" if df_1m is not None else "") + f" | 数据源: {args.source} ({time.time()-t0:.1f}s)\n" ) grid = build_full_param_grid( period_min=args.period_min, period_max=args.period_max, period_step=args.period_step, std_min=args.std_min, std_max=args.std_max, std_step=args.std_step, ) print(f"参数网格: {len(grid):,} 组") print( f" period: {args.period_min}~{args.period_max} step{args.period_step}, " f"std: {args.std_min}~{args.std_max} step{args.std_step}" ) with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f_df: df.to_pickle(f_df.name) df_path = f_df.name df_1m_path = None if df_1m is not None: with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f_1m: df_1m.to_pickle(f_1m.name) df_1m_path = f_1m.name try: result_df = evaluate_grid( grid, workers=args.workers, df_path=df_path, df_1m_path=df_1m_path, use_1m=use_1m, step_min=step_min, ) finally: Path(df_path).unlink(missing_ok=True) if df_1m_path: Path(df_1m_path).unlink(missing_ok=True) if result_df.empty: print("无有效结果") return best_stable = result_df.sort_values("stable_score", ascending=False).iloc[0] best_return = result_df.sort_values("ret_pct", ascending=False).iloc[0] stamp = time.strftime("%Y%m%d_%H%M%S") csv_path = out_dir / f"bb_midline_full_grid_{args.kline_period}_{stamp}.csv" result_df.to_csv(csv_path, index=False) print(f"\n扫描结果已保存: {csv_path}") print("\n" + "=" * 90) print("布林带均线策略 | 2020-2025 | 200U | 1%权益/单 | 万五 | 90%返佣次日8点 | 100x全仓") print("=" * 90) print( f"最佳稳定参数: BB({int(best_stable['period'])},{best_stable['std']}) | " f"权益={best_stable['final_eq']:.1f}U | 收益={best_stable['ret_pct']:+.1f}% | " f"回撤={best_stable['max_dd_pct']:.1f}% | Sharpe={best_stable['sharpe']:.2f} | " f"交易={int(best_stable['n_trades'])}" ) print( f"最高收益参数: BB({int(best_return['period'])},{best_return['std']}) | " f"权益={best_return['final_eq']:.1f}U | 收益={best_return['ret_pct']:+.1f}% | " f"回撤={best_return['max_dd_pct']:.1f}% | Sharpe={best_return['sharpe']:.2f} | " f"交易={int(best_return['n_trades'])}" ) print("=" * 90) cfg = BBMidlineConfig( bb_period=int(best_stable["period"]), bb_std=float(best_stable["std"]), initial_capital=200.0, margin_pct=0.01, leverage=100.0, cross_margin=True, fee_rate=0.0005, rebate_pct=0.90, rebate_hour_utc=0, fill_at_close=True, use_1m_touch_filter=use_1m, kline_step_min=step_min, ) final_res = run_bb_midline_backtest( df, cfg, df_1m=df_1m if use_1m else None ) eq = final_res.equity_curve["equity"].dropna() print("\n逐年权益 (年末):") eq_ts = eq.copy() eq_ts.index = pd.to_datetime(eq_ts.index) prev = 200.0 for y in range(2020, 2026): sub = eq_ts[eq_ts.index.year == y] if len(sub) > 0: ye = float(sub.iloc[-1]) ret = (ye - prev) / prev * 100.0 if prev > 0 else 0.0 print(f" {y}: {ye:.1f} U (当年收益 {ret:+.1f}%)") prev = ye trade_path = out_dir / f"bb_midline_best_trades_{args.kline_period}_{stamp}.csv" rows = [] for i, t in enumerate(final_res.trades, 1): rows.append({ "序号": i, "方向": "做多" if t.side == "long" else "做空", "开仓时间": t.entry_time, "平仓时间": t.exit_time, "开仓价": round(t.entry_price, 2), "平仓价": round(t.exit_price, 2), "净盈亏": round(t.net_pnl, 2), "平仓原因": t.exit_reason, }) pd.DataFrame(rows).to_csv(trade_path, index=False, encoding="utf-8-sig") print(f"\n最佳参数交易明细: {trade_path}") if __name__ == "__main__": main()