Files
codex_jxs_code/run_bb_midline_full_grid.py
2026-02-28 13:10:47 +08:00

390 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
布林带均线策略 - 全参数组合扫描 (1-1000, 1-1000)
策略:
- 阳线 + 先涨碰到均线(1m判断) → 开多
- 持多: 碰上轨止盈
- 阴线 + 先跌碰到均线(1m判断) → 平多开空
- 持空: 碰下轨止盈
配置: 200U | 1%权益/单 | 万五手续费 | 90%返佣次日8点 | 100x杠杆 | 全仓
参数遍历: (0.5,0.5)(0.5,1)...(0.5,std_max), (1,0.5)(1,1)...(1,std_max), ...
直至 (period_max, std_max)
"""
from __future__ import annotations
import os
import sys
import tempfile
import time
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[0]))
sys.stdout.reconfigure(line_buffering=True)
import numpy as np
import pandas as pd
from strategy.bb_midline_backtest import BBMidlineConfig, run_bb_midline_backtest
from strategy.data_loader import get_1m_touch_direction, load_klines
from strategy.indicators import bollinger
def build_full_param_grid(
period_min: float = 1.0,
period_max: float = 1000.0,
period_step: float = 1.0,
std_min: float = 1.0,
std_max: float = 1000.0,
std_step: float = 1.0,
) -> list[tuple[int, float]]:
"""生成全量 (period, std) 组合period 取整"""
out = []
p = period_min
while p <= period_max:
s = std_min
while s <= std_max:
out.append((max(1, int(round(p))), round(s, 2)))
s += std_step
p += period_step
return sorted(set(out))
def stable_score(ret_pct: float, sharpe: float, dd_pct: float, n_trades: int) -> float:
"""收益稳定性评分"""
sparse_penalty = -5.0 if n_trades < 200 else 0.0
return ret_pct + sharpe * 12.0 - abs(dd_pct) * 0.8 + sparse_penalty
G_DF: pd.DataFrame | None = None
G_DF_1M: pd.DataFrame | None = None
G_USE_1M: bool = True
G_STEP_MIN: int = 5
def _init_worker(df_path: str, df_1m_path: str | None, use_1m: bool, step_min: int):
global G_DF, G_DF_1M, G_USE_1M, G_STEP_MIN
G_DF = pd.read_pickle(df_path)
G_DF_1M = pd.read_pickle(df_1m_path) if (use_1m and df_1m_path) else None
G_USE_1M = bool(use_1m)
G_STEP_MIN = int(step_min)
def _eval_period_task(args: tuple[int, list[float]]) -> list[dict]:
period, std_list = args
assert G_DF is not None
arr_touch_dir = None
if G_USE_1M and G_DF_1M is not None:
close = G_DF["close"].astype(float)
bb_mid, _, _, _ = bollinger(close, period, 1.0)
arr_touch_dir = get_1m_touch_direction(
G_DF, G_DF_1M, bb_mid.values, kline_step_min=G_STEP_MIN
)
rows: list[dict] = []
for std in std_list:
cfg = BBMidlineConfig(
bb_period=period,
bb_std=float(std),
initial_capital=200.0,
margin_pct=0.01,
leverage=100.0,
cross_margin=True,
fee_rate=0.0005,
rebate_pct=0.90,
rebate_hour_utc=0,
fill_at_close=True,
use_1m_touch_filter=G_USE_1M,
kline_step_min=G_STEP_MIN,
)
result = run_bb_midline_backtest(
G_DF,
cfg,
df_1m=G_DF_1M if G_USE_1M else None,
arr_touch_dir_override=arr_touch_dir,
)
eq = result.equity_curve["equity"].dropna()
if len(eq) == 0:
final_eq = 0.0
ret_pct = -100.0
dd_u = -200.0
dd_pct = 100.0
else:
final_eq = float(eq.iloc[-1])
ret_pct = (final_eq - cfg.initial_capital) / cfg.initial_capital * 100.0
dd_u = float((eq.astype(float) - eq.astype(float).cummax()).min())
dd_pct = abs(dd_u) / cfg.initial_capital * 100.0
n_trades = len(result.trades)
win_rate = (
sum(1 for t in result.trades if t.net_pnl > 0) / n_trades * 100.0
if n_trades > 0
else 0.0
)
pnl = result.daily_stats["pnl"].astype(float)
sharpe = (
float(pnl.mean() / pnl.std()) * np.sqrt(365.0) if pnl.std() > 0 else 0.0
)
score = stable_score(ret_pct, sharpe, dd_pct, n_trades)
rows.append(
{
"period": period,
"std": round(float(std), 2),
"final_eq": final_eq,
"ret_pct": ret_pct,
"n_trades": n_trades,
"win_rate": win_rate,
"sharpe": sharpe,
"max_dd_u": dd_u,
"max_dd_pct": dd_pct,
"stable_score": score,
}
)
return rows
def evaluate_grid(
params: list[tuple[int, float]],
*,
workers: int,
df_path: str,
df_1m_path: str | None,
use_1m: bool,
step_min: int,
) -> pd.DataFrame:
by_period: dict[int, set[float]] = defaultdict(set)
for p, s in params:
by_period[int(p)].add(round(float(s), 2))
tasks = [(p, sorted(stds)) for p, stds in sorted(by_period.items())]
total_periods = len(tasks)
total_combos = sum(len(stds) for _, stds in tasks)
print(f" 评估 {total_combos:,} 组参数, {total_periods} 个 period, workers={workers}")
start = time.time()
rows: list[dict] = []
done_periods = 0
done_combos = 0
with ProcessPoolExecutor(
max_workers=workers,
initializer=_init_worker,
initargs=(df_path, df_1m_path, use_1m, step_min),
) as ex:
future_map = {ex.submit(_eval_period_task, task): task for task in tasks}
for fut in as_completed(future_map):
period, stds = future_map[fut]
res = fut.result()
rows.extend(res)
done_periods += 1
done_combos += len(stds)
if done_periods % max(1, total_periods // 20) == 0 or done_periods == total_periods:
elapsed = time.time() - start
print(f" 进度 {done_combos:,}/{total_combos:,} ({elapsed:.0f}s)")
df = pd.DataFrame(rows)
print(f" 完成, 用时 {time.time() - start:.1f}s")
return df
def main():
import argparse
parser = argparse.ArgumentParser(description="布林带均线策略全参数扫描 (1-1000, 1-1000)")
parser.add_argument(
"--period-min", type=float, default=1.0, help="period 下限"
)
parser.add_argument(
"--period-max", type=float, default=1000.0, help="period 上限"
)
parser.add_argument(
"--period-step", type=float, default=10.0, help="period 步长 (建议10以缩短时间)"
)
parser.add_argument("--std-min", type=float, default=0.5, help="std 下限")
parser.add_argument("--std-max", type=float, default=1000.0, help="std 上限")
parser.add_argument(
"--std-step", type=float, default=1.0, help="std 步长"
)
parser.add_argument(
"-p", "--kline-period", default="5m", choices=["5m", "15m", "30m"]
)
parser.add_argument(
"-j", "--workers", type=int, default=max(1, (os.cpu_count() or 4) - 1)
)
parser.add_argument("--no-1m", action="store_true", help="禁用 1m 触及方向过滤")
parser.add_argument(
"--source",
default="bitmart",
choices=["bitmart", "binance"],
help="数据源",
)
parser.add_argument(
"--quick",
action="store_true",
help="快速模式: period 1-200 step20, std 1-20 step2",
)
args = parser.parse_args()
use_1m = not args.no_1m
step_min = int(args.kline_period.replace("m", ""))
if args.quick:
args.period_min = 1.0
args.period_max = 200.0
args.period_step = 20.0
args.std_min = 0.5
args.std_max = 20.0
args.std_step = 1.0
print(" 快速模式: period 1-200 step20, std 1-20 step2")
out_dir = Path(__file__).resolve().parent / "strategy" / "results"
out_dir.mkdir(parents=True, exist_ok=True)
print("加载 K 线数据 (2020-01-01 ~ 2026-01-01)...")
t0 = time.time()
try:
df = load_klines(args.kline_period, "2020-01-01", "2026-01-01", source=args.source)
df_1m = (
load_klines("1m", "2020-01-01", "2026-01-01", source=args.source)
if use_1m
else None
)
except Exception as e:
alt = "binance" if args.source == "bitmart" else "bitmart"
print(f" {args.source} 加载失败 ({e}), 尝试 {alt}...")
df = load_klines(args.kline_period, "2020-01-01", "2026-01-01", source=alt)
df_1m = (
load_klines("1m", "2020-01-01", "2026-01-01", source=alt)
if use_1m
else None
)
args.source = alt
print(
f" {args.kline_period}: {len(df):,}"
+ (f", 1m: {len(df_1m):,}" if df_1m is not None else "")
+ f" | 数据源: {args.source} ({time.time()-t0:.1f}s)\n"
)
grid = build_full_param_grid(
period_min=args.period_min,
period_max=args.period_max,
period_step=args.period_step,
std_min=args.std_min,
std_max=args.std_max,
std_step=args.std_step,
)
print(f"参数网格: {len(grid):,}")
print(
f" period: {args.period_min}~{args.period_max} step{args.period_step}, "
f"std: {args.std_min}~{args.std_max} step{args.std_step}"
)
with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f_df:
df.to_pickle(f_df.name)
df_path = f_df.name
df_1m_path = None
if df_1m is not None:
with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f_1m:
df_1m.to_pickle(f_1m.name)
df_1m_path = f_1m.name
try:
result_df = evaluate_grid(
grid,
workers=args.workers,
df_path=df_path,
df_1m_path=df_1m_path,
use_1m=use_1m,
step_min=step_min,
)
finally:
Path(df_path).unlink(missing_ok=True)
if df_1m_path:
Path(df_1m_path).unlink(missing_ok=True)
if result_df.empty:
print("无有效结果")
return
best_stable = result_df.sort_values("stable_score", ascending=False).iloc[0]
best_return = result_df.sort_values("ret_pct", ascending=False).iloc[0]
stamp = time.strftime("%Y%m%d_%H%M%S")
csv_path = out_dir / f"bb_midline_full_grid_{args.kline_period}_{stamp}.csv"
result_df.to_csv(csv_path, index=False)
print(f"\n扫描结果已保存: {csv_path}")
print("\n" + "=" * 90)
print("布林带均线策略 | 2020-2025 | 200U | 1%权益/单 | 万五 | 90%返佣次日8点 | 100x全仓")
print("=" * 90)
print(
f"最佳稳定参数: BB({int(best_stable['period'])},{best_stable['std']}) | "
f"权益={best_stable['final_eq']:.1f}U | 收益={best_stable['ret_pct']:+.1f}% | "
f"回撤={best_stable['max_dd_pct']:.1f}% | Sharpe={best_stable['sharpe']:.2f} | "
f"交易={int(best_stable['n_trades'])}"
)
print(
f"最高收益参数: BB({int(best_return['period'])},{best_return['std']}) | "
f"权益={best_return['final_eq']:.1f}U | 收益={best_return['ret_pct']:+.1f}% | "
f"回撤={best_return['max_dd_pct']:.1f}% | Sharpe={best_return['sharpe']:.2f} | "
f"交易={int(best_return['n_trades'])}"
)
print("=" * 90)
cfg = BBMidlineConfig(
bb_period=int(best_stable["period"]),
bb_std=float(best_stable["std"]),
initial_capital=200.0,
margin_pct=0.01,
leverage=100.0,
cross_margin=True,
fee_rate=0.0005,
rebate_pct=0.90,
rebate_hour_utc=0,
fill_at_close=True,
use_1m_touch_filter=use_1m,
kline_step_min=step_min,
)
final_res = run_bb_midline_backtest(
df, cfg, df_1m=df_1m if use_1m else None
)
eq = final_res.equity_curve["equity"].dropna()
print("\n逐年权益 (年末):")
eq_ts = eq.copy()
eq_ts.index = pd.to_datetime(eq_ts.index)
prev = 200.0
for y in range(2020, 2026):
sub = eq_ts[eq_ts.index.year == y]
if len(sub) > 0:
ye = float(sub.iloc[-1])
ret = (ye - prev) / prev * 100.0 if prev > 0 else 0.0
print(f" {y}: {ye:.1f} U (当年收益 {ret:+.1f}%)")
prev = ye
trade_path = out_dir / f"bb_midline_best_trades_{args.kline_period}_{stamp}.csv"
rows = []
for i, t in enumerate(final_res.trades, 1):
rows.append({
"序号": i,
"方向": "做多" if t.side == "long" else "做空",
"开仓时间": t.entry_time,
"平仓时间": t.exit_time,
"开仓价": round(t.entry_price, 2),
"平仓价": round(t.exit_price, 2),
"净盈亏": round(t.net_pnl, 2),
"平仓原因": t.exit_reason,
})
pd.DataFrame(rows).to_csv(trade_path, index=False, encoding="utf-8-sig")
print(f"\n最佳参数交易明细: {trade_path}")
if __name__ == "__main__":
main()