From 0ec23b28ebec8aa31e1dab6cdd7240ea1711a075 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Sat, 28 Feb 2026 16:43:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=93=88=E5=93=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- run_bb_full_grid_search.py | 283 ++++++++++++++++++++----------------- test.py | 39 ++++- 2 files changed, 191 insertions(+), 131 deletions(-) diff --git a/run_bb_full_grid_search.py b/run_bb_full_grid_search.py index e4e16bd..d7fd280 100644 --- a/run_bb_full_grid_search.py +++ b/run_bb_full_grid_search.py @@ -18,6 +18,7 @@ from __future__ import annotations import argparse import hashlib +import heapq import io import json import math @@ -25,7 +26,6 @@ import os import sys import tempfile import time -from collections import defaultdict from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path @@ -145,8 +145,9 @@ def _init_worker(df_path: str, df_1m_path: str | None, use_1m: bool, step_min: i G_STEP_MIN = int(step_min) -def _eval_period_task(args: tuple[int, list[float]]) -> list[dict]: - period, std_list = args +def _eval_single_task(args: tuple[int, float]) -> dict: + """单个 (period, std) 组合的回测任务""" + period, std = args assert G_DF is not None arr_touch_dir = None @@ -155,64 +156,96 @@ def _eval_period_task(args: tuple[int, list[float]]) -> list[dict]: bb_mid, _, _, _ = bollinger(close, period, 1.0) arr_touch_dir = get_1m_touch_direction(G_DF, G_DF_1M, bb_mid.values, kline_step_min=G_STEP_MIN) - rows: list[dict] = [] - for std in std_list: - cfg = BBMidlineConfig( - bb_period=period, - bb_std=float(std), - initial_capital=200.0, - margin_pct=0.01, - leverage=100.0, - cross_margin=True, - fee_rate=0.0005, - rebate_pct=0.90, - rebate_hour_utc=0, - fill_at_close=True, - use_1m_touch_filter=G_USE_1M, - kline_step_min=G_STEP_MIN, - ) - result = run_bb_midline_backtest( - G_DF, - cfg, - df_1m=G_DF_1M if G_USE_1M else None, - arr_touch_dir_override=arr_touch_dir, - ) + cfg = BBMidlineConfig( + bb_period=period, + bb_std=float(std), + initial_capital=200.0, + margin_pct=0.01, + leverage=100.0, + cross_margin=True, + fee_rate=0.0005, + rebate_pct=0.90, + rebate_hour_utc=0, + fill_at_close=True, + use_1m_touch_filter=G_USE_1M, + kline_step_min=G_STEP_MIN, + ) + result = run_bb_midline_backtest( + G_DF, + cfg, + df_1m=G_DF_1M if G_USE_1M else None, + arr_touch_dir_override=arr_touch_dir, + ) - eq = result.equity_curve["equity"].dropna() - if len(eq) == 0: - final_eq = 0.0 - ret_pct = -100.0 - dd_u = -200.0 - dd_pct = 100.0 - else: - final_eq = float(eq.iloc[-1]) - ret_pct = (final_eq - cfg.initial_capital) / cfg.initial_capital * 100.0 - dd_u = float((eq.astype(float) - eq.astype(float).cummax()).min()) - dd_pct = abs(dd_u) / cfg.initial_capital * 100.0 + eq = result.equity_curve["equity"].dropna() + if len(eq) == 0: + final_eq = 0.0 + ret_pct = -100.0 + dd_u = -200.0 + dd_pct = 100.0 + else: + final_eq = float(eq.iloc[-1]) + ret_pct = (final_eq - cfg.initial_capital) / cfg.initial_capital * 100.0 + dd_u = float((eq.astype(float) - eq.astype(float).cummax()).min()) + dd_pct = abs(dd_u) / cfg.initial_capital * 100.0 - n_trades = len(result.trades) - win_rate = ( - sum(1 for t in result.trades if t.net_pnl > 0) / n_trades * 100.0 - if n_trades > 0 - else 0.0 - ) - pnl = result.daily_stats["pnl"].astype(float) - sharpe = float(pnl.mean() / pnl.std()) * math.sqrt(365.0) if pnl.std() > 0 else 0.0 - stable_score = score_stable(ret_pct, sharpe, dd_pct, n_trades) + n_trades = len(result.trades) + win_rate = ( + sum(1 for t in result.trades if t.net_pnl > 0) / n_trades * 100.0 + if n_trades > 0 + else 0.0 + ) + pnl = result.daily_stats["pnl"].astype(float) + sharpe = float(pnl.mean() / pnl.std()) * math.sqrt(365.0) if pnl.std() > 0 else 0.0 + stable_score = score_stable(ret_pct, sharpe, dd_pct, n_trades) - rows.append({ - "period": period, - "std": round(float(std), 2), - "final_eq": final_eq, - "ret_pct": ret_pct, - "n_trades": n_trades, - "win_rate": win_rate, - "sharpe": sharpe, - "max_dd_u": dd_u, - "max_dd_pct": dd_pct, - "stable_score": stable_score, - }) - return rows + return { + "period": period, + "std": round(float(std), 2), + "final_eq": final_eq, + "ret_pct": ret_pct, + "n_trades": n_trades, + "win_rate": win_rate, + "sharpe": sharpe, + "max_dd_u": dd_u, + "max_dd_pct": dd_pct, + "stable_score": stable_score, + } + + +def _eval_period_task(args: tuple[int, list[float]]) -> list[dict]: + """兼容旧接口:一个 period 组的批量回测""" + period, std_list = args + return [_eval_single_task((period, s)) for s in std_list] + + + +def _format_eta(seconds: float) -> str: + """格式化剩余时间""" + if seconds < 60: + return f"{seconds:.0f}s" + elif seconds < 3600: + return f"{seconds / 60:.1f}min" + else: + h = int(seconds // 3600) + m = int((seconds % 3600) // 60) + return f"{h}h{m:02d}m" + + +def _print_top_n(rows: list[dict], n: int = 10, label: str = "当前 Top 10") -> None: + """打印当前 Top N 排行榜""" + if not rows: + return + sorted_rows = sorted(rows, key=lambda r: r["stable_score"], reverse=True)[:n] + print(f"\n{'─' * 90}", flush=True) + print(f" 📊 {label} (按稳定性评分排序)", flush=True) + print(f" {'排名':>4s} {'period':>6s} {'std':>7s} {'收益%':>9s} {'回撤%':>7s} {'夏普':>7s} {'交易数':>6s} {'评分':>8s}", flush=True) + print(f" {'─' * 82}", flush=True) + for i, r in enumerate(sorted_rows, 1): + print(f" {i:4d} {int(r['period']):6d} {r['std']:7.2f} {r['ret_pct']:+9.2f} " + f"{r['max_dd_pct']:7.2f} {r['sharpe']:7.3f} {int(r['n_trades']):6d} " + f"{r['stable_score']:8.1f}", flush=True) + print(f"{'─' * 90}\n", flush=True) def run_grid_search( @@ -229,100 +262,94 @@ def run_grid_search( meta: dict | None = None, checkpoint_interval: int = 5, ) -> pd.DataFrame: - by_period: dict[int, set[float]] = defaultdict(set) - for p, s in params: - by_period[int(p)].add(round(float(s), 2)) - - tasks = [(p, sorted(stds)) for p, stds in sorted(by_period.items())] - total_periods = len(tasks) - total_combos = sum(len(stds) for _, stds in tasks) + total_combos = len(params) rows: list[dict] = list(existing_rows) if existing_rows else [] - print(f"待运行: {total_combos} 组合 ({total_periods} period组), workers={workers}" + ( + print(f"待运行: {total_combos} 组合, workers={workers}" + ( f", 断点续跑 (已有 {len(rows)} 条)" if rows else "" )) - start = time.time() - last_save = 0 + t_start = time.time() + done_combos = 0 + last_save_count = 0 + last_top_time = t_start + # checkpoint 按组合数保存,默认每 500 个组合保存一次 + ckpt_combo_interval = max(100, checkpoint_interval * 50) + # Top N 排行榜刷新间隔(秒) + top_n_interval = 30.0 - def maybe_save(n_done_periods: int): - nonlocal last_save - if ckpt_path and meta_path and meta and n_done_periods > 0: - if n_done_periods - last_save >= checkpoint_interval: + def maybe_save(): + nonlocal last_save_count + if ckpt_path and meta_path and meta and done_combos > 0: + if done_combos - last_save_count >= ckpt_combo_interval: save_checkpoint(ckpt_path, meta_path, meta, rows) - last_save = n_done_periods + last_save_count = done_combos + + def maybe_print_top(): + nonlocal last_top_time + now = time.time() + if now - last_top_time >= top_n_interval: + _print_top_n(rows) + last_top_time = now + + def on_result(row: dict): + nonlocal done_combos + rows.append(row) + done_combos += 1 + elapsed = time.time() - t_start + speed = done_combos / elapsed if elapsed > 0 else 0 + remaining = total_combos - done_combos + eta = remaining / speed if speed > 0 else 0 + pct = done_combos / total_combos * 100 + + # 每个结果都实时打印 + print(f"✓ [{done_combos:>7d}/{total_combos} {pct:5.1f}% ETA {_format_eta(eta)}] " + f"p={int(row['period']):4d} s={row['std']:7.2f} | " + f"收益:{row['ret_pct']:+8.2f}% 回撤:{row['max_dd_pct']:6.2f}% " + f"夏普:{row['sharpe']:7.3f} 交易:{int(row['n_trades']):5d} " + f"评分:{row['stable_score']:8.1f}", flush=True) + + maybe_save() + maybe_print_top() if workers <= 1: _init_worker(df_path, df_1m_path, use_1m, step_min) - done_periods = 0 - done_combos = 0 - for task in tasks: - res = _eval_period_task(task) - period = task[0] - # 打印该period的所有结果 - for row in res: - print(f"✓ period={int(row['period']):4d}, std={float(row['std']):7.2f} | " - f"收益: {row['ret_pct']:+7.2f}% | 回撤: {row['max_dd_pct']:6.2f}% | " - f"夏普: {row['sharpe']:7.3f} | 交易: {int(row['n_trades']):6d} | " - f"评分: {row['stable_score']:7.1f}", flush=True) - rows.extend(res) - done_periods += 1 - done_combos += len(task[1]) - maybe_save(done_periods) - if done_periods % max(1, total_periods // 20) == 0 or done_periods == total_periods: - elapsed = time.time() - start - print(f"进度 {done_combos}/{total_combos} ({done_periods}/{total_periods} periods), 用时 {elapsed:.0f}s", flush=True) + for p, s in params: + row = _eval_single_task((p, s)) + on_result(row) else: - done_periods = 0 - done_combos = 0 + # 多进程:逐个 (period, std) 提交,实时返回 + # 为避免提交 200 万个 future 占用过多内存,分批提交 + batch_size = workers * 20 # 每批提交的任务数 try: with ProcessPoolExecutor( max_workers=workers, initializer=_init_worker, initargs=(df_path, df_1m_path, use_1m, step_min), ) as ex: - future_map = {ex.submit(_eval_period_task, task): task for task in tasks} - for fut in as_completed(future_map): - period, stds = future_map[fut] - res = fut.result() - # 打印该period的所有结果 - for row in res: - print(f"✓ period={int(row['period']):4d}, std={float(row['std']):7.2f} | " - f"收益: {row['ret_pct']:+7.2f}% | 回撤: {row['max_dd_pct']:6.2f}% | " - f"夏普: {row['sharpe']:7.3f} | 交易: {int(row['n_trades']):6d} | " - f"评分: {row['stable_score']:7.1f}", flush=True) - rows.extend(res) - done_periods += 1 - done_combos += len(stds) - maybe_save(done_periods) - if done_periods % max(1, total_periods // 20) == 0 or done_periods == total_periods: - elapsed = time.time() - start - print(f"进度 {done_combos}/{total_combos} ({done_periods}/{total_periods} periods), 用时 {elapsed:.0f}s", flush=True) + idx = 0 + while idx < total_combos: + batch_end = min(idx + batch_size, total_combos) + batch = params[idx:batch_end] + future_map = {ex.submit(_eval_single_task, (p, s)): (p, s) for p, s in batch} + for fut in as_completed(future_map): + row = fut.result() + on_result(row) + idx = batch_end except (PermissionError, OSError) as e: print(f"多进程不可用 ({e}),改用单进程...") _init_worker(df_path, df_1m_path, use_1m, step_min) - done_periods = 0 - done_combos = 0 - for task in tasks: - res = _eval_period_task(task) - # 打印该period的所有结果 - for row in res: - print(f"✓ period={int(row['period']):4d}, std={float(row['std']):7.2f} | " - f"收益: {row['ret_pct']:+7.2f}% | 回撤: {row['max_dd_pct']:6.2f}% | " - f"夏普: {row['sharpe']:7.3f} | 交易: {int(row['n_trades']):6d} | " - f"评分: {row['stable_score']:7.1f}", flush=True) - rows.extend(res) - done_periods += 1 - done_combos += len(task[1]) - maybe_save(done_periods) - if done_periods % max(1, total_periods // 20) == 0 or done_periods == total_periods: - elapsed = time.time() - start - print(f"进度 {done_combos}/{total_combos} ({done_periods}/{total_periods} periods), 用时 {elapsed:.0f}s", flush=True) + for p, s in params[done_combos:]: + row = _eval_single_task((p, s)) + on_result(row) if ckpt_path and meta_path and meta and rows: save_checkpoint(ckpt_path, meta_path, meta, rows) + # 最终排行榜 + _print_top_n(rows, n=20, label="最终 Top 20") + df = pd.DataFrame(rows) - print(f"完成, 总用时 {time.time() - start:.1f}s") + print(f"完成, 总用时 {time.time() - t_start:.1f}s, 平均 {total_combos / (time.time() - t_start):.1f} 组合/秒") return df diff --git a/test.py b/test.py index 084d46d..76c247d 100644 --- a/test.py +++ b/test.py @@ -1,5 +1,38 @@ +import subprocess +import time from wxautox4 import WeChat -wx = WeChat() -wx.ChatWith('Rainbow') # 切换到目标聊天 -# 再通过 UIAutomation 定位并点击语音/视频通话按钮 \ No newline at end of file +def make_wechat_call(contact_name, retry_times=3): + """ + 可靠的微信拨电话方法 + + Args: + contact_name: 联系人名称 + retry_times: 重试次数 + """ + for attempt in range(retry_times): + try: + # 打开微信应用 + subprocess.Popen(['/Applications/WeChat.app/Contents/MacOS/WeChat']) + time.sleep(2) # 等待微信启动 + + # 初始化 + wx = WeChat() + time.sleep(1) + + # 切换聊天 + wx.ChatWith(contact_name) + time.sleep(1) + + print(f"✓ 已切换到 {contact_name} 聊天") + return True + + except Exception as e: + print(f"⚠ 第 {attempt+1} 次尝试失败: {e}") + if attempt < retry_times - 1: + time.sleep(1) + + return False + +# 使用 +make_wechat_call('Rainbow') \ No newline at end of file