This commit is contained in:
ddrwode
2026-02-06 14:43:20 +08:00
parent c58f020e9e
commit 51458e2649
3 changed files with 205 additions and 47 deletions

View File

@@ -94,6 +94,18 @@ python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之
- 实盘下单数量现在会直接使用 `default_order_size`(不再固定写死 25与回测/优化口径一致。
- `current_params.json` 新增支持趋势过滤和 AI 门控参数(见第 7 节)。
### 启动时自动优化与加速(不减少计算数据)
数据量、试验数、窗口数、压力场景均与配置一致,加速仅通过**代码并行**实现:
- **并行试验**Optuna`BITMART_AUTO_OPT_N_JOBS=4` 或 优化脚本 `--n-jobs 4`,多组试验同时跑。
- **并行压力场景**:每个窗口内多组 slip×fee 同时回测。环境变量 `BITMART_AUTO_OPT_STRESS_WORKERS=4` 或 优化脚本 `--n-stress-workers 4`
- **跳过近期已优化**`BITMART_AUTO_OPT_SKIP_IF_FRESH_HOURS=24` 表示若 `current_params.json` 24 小时内已更新则跳过本次优化,直接加载。
单独运行优化脚本时:
- `--n-jobs 4`Optuna 并行 4 个试验。
- `--n-stress-workers 4`:每窗口 4 个进程并行跑压力场景(不减少 slip/fee 组数)。
## 4. 实时价格WebSocket
保守模式脚本已支持:

View File

@@ -6,6 +6,7 @@ import os
import random
import re
import sys
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean, pstdev
@@ -156,6 +157,77 @@ def load_credentials_from_strategy_file(strategy_path: Path) -> Dict[str, str]:
return result
def _run_one_stress_scenario(
task: Tuple[
Dict[str, Any],
float,
float,
List[Dict[str, float]],
List[Dict[str, float]],
float,
float,
float,
float,
int,
float,
]
) -> Tuple[float, float, float, Any, Any]:
"""
单压力场景回测(供多进程调用,不减少计算量)。
返回: (train_score, valid_score, valid_drawdown, base_train_res, base_valid_res)
base_* 仅当 slip≈1 且 fee≈1 时非 None。
"""
(
params,
slip_mult,
fee_mult,
train_data,
valid_data,
raw_fee_rate,
rebate_ratio,
drawdown_weight,
loss_day_weight,
min_trades,
undertrade_penalty,
) = task
scenario_params = dict(params)
scenario_params["slippage_pct"] = max(
0.0,
_as_float(params.get("slippage_pct", 0.01), 0.01) * float(slip_mult),
)
scenario_params = normalize_params(scenario_params)
scenario_raw_fee_rate = max(0.0, raw_fee_rate * float(fee_mult))
backtester = ConservativeBacktester(
params=scenario_params,
raw_fee_rate=scenario_raw_fee_rate,
rebate_ratio=rebate_ratio,
)
train_res = backtester.run(train_data)
valid_res = backtester.run(valid_data)
train_score = score_result(
train_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
valid_score = score_result(
valid_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
is_base = abs(slip_mult - 1.0) < 1e-9 and abs(fee_mult - 1.0) < 1e-9
return (
train_score,
valid_score,
valid_res.max_drawdown,
train_res if is_base else None,
valid_res if is_base else None,
)
def sample_random_params(rng: random.Random) -> Dict[str, Any]:
params = dict(DEFAULT_PARAMS)
params.update(
@@ -207,10 +279,13 @@ def evaluate_candidate(
stability_weight: float,
stress_slippage_multipliers: List[float],
stress_fee_multipliers: List[float],
n_stress_workers: int = 0,
) -> Tuple[float, Dict[str, float], Dict[str, float]]:
train_results = []
valid_results = []
window_scores = []
num_stress = len(stress_slippage_multipliers) * len(stress_fee_multipliers)
use_parallel = n_stress_workers > 0 and num_stress > 1
for train_data, valid_data in windows:
scenario_train_scores: List[float] = []
@@ -219,50 +294,78 @@ def evaluate_candidate(
base_train_res = None
base_valid_res = None
for slip_mult in stress_slippage_multipliers:
for fee_mult in stress_fee_multipliers:
scenario_params = dict(params)
scenario_params["slippage_pct"] = max(
0.0,
_as_float(params.get("slippage_pct", 0.01), 0.01) * float(slip_mult),
)
scenario_params = normalize_params(scenario_params)
scenario_raw_fee_rate = max(0.0, raw_fee_rate * float(fee_mult))
backtester = ConservativeBacktester(
params=scenario_params,
raw_fee_rate=scenario_raw_fee_rate,
rebate_ratio=rebate_ratio,
if use_parallel:
tasks = [
(
params,
slip_mult,
fee_mult,
train_data,
valid_data,
raw_fee_rate,
rebate_ratio,
drawdown_weight,
loss_day_weight,
min_trades,
undertrade_penalty,
)
for slip_mult in stress_slippage_multipliers
for fee_mult in stress_fee_multipliers
]
max_workers = min(n_stress_workers, len(tasks))
with ProcessPoolExecutor(max_workers=max_workers) as executor:
for res in executor.map(_run_one_stress_scenario, tasks):
train_score, valid_score, valid_dd, tr, vr = res
scenario_train_scores.append(train_score)
scenario_valid_scores.append(valid_score)
scenario_valid_drawdowns.append(valid_dd)
if tr is not None and vr is not None:
base_train_res, base_valid_res = tr, vr
else:
for slip_mult in stress_slippage_multipliers:
for fee_mult in stress_fee_multipliers:
scenario_params = dict(params)
scenario_params["slippage_pct"] = max(
0.0,
_as_float(params.get("slippage_pct", 0.01), 0.01) * float(slip_mult),
)
scenario_params = normalize_params(scenario_params)
scenario_raw_fee_rate = max(0.0, raw_fee_rate * float(fee_mult))
backtester = ConservativeBacktester(
params=scenario_params,
raw_fee_rate=scenario_raw_fee_rate,
rebate_ratio=rebate_ratio,
)
train_res = backtester.run(train_data)
valid_res = backtester.run(valid_data)
train_res = backtester.run(train_data)
valid_res = backtester.run(valid_data)
train_score = score_result(
train_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
valid_score = score_result(
valid_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
train_score = score_result(
train_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
valid_score = score_result(
valid_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
scenario_train_scores.append(train_score)
scenario_valid_scores.append(valid_score)
scenario_valid_drawdowns.append(valid_res.max_drawdown)
scenario_train_scores.append(train_score)
scenario_valid_scores.append(valid_score)
scenario_valid_drawdowns.append(valid_res.max_drawdown)
if base_train_res is None and abs(slip_mult - 1.0) < 1e-9 and abs(fee_mult - 1.0) < 1e-9:
base_train_res = train_res
base_valid_res = valid_res
if base_train_res is None and abs(slip_mult - 1.0) < 1e-9 and abs(fee_mult - 1.0) < 1e-9:
base_train_res = train_res
base_valid_res = valid_res
if base_train_res is None:
base_train_res = train_res
base_valid_res = valid_res
if base_train_res is None:
base_train_res = train_res
base_valid_res = valid_res
if base_train_res is None or base_valid_res is None:
fallback_backtester = ConservativeBacktester(
@@ -360,6 +463,7 @@ def optimize_with_optuna(
stability_weight=args.stability_weight,
stress_slippage_multipliers=args.stress_slippage_multipliers,
stress_fee_multipliers=args.stress_fee_multipliers,
n_stress_workers=getattr(args, "n_stress_workers", 0),
)
trial.set_user_attr("train_agg", train_agg)
@@ -375,7 +479,8 @@ def optimize_with_optuna(
}
return score
study.optimize(objective, n_trials=args.n_trials, show_progress_bar=False)
n_jobs = max(1, int(getattr(args, "n_jobs", 1)))
study.optimize(objective, n_trials=args.n_trials, n_jobs=n_jobs, show_progress_bar=(n_jobs == 1))
if best:
return best
@@ -411,6 +516,7 @@ def optimize_with_random(
stability_weight=args.stability_weight,
stress_slippage_multipliers=args.stress_slippage_multipliers,
stress_fee_multipliers=args.stress_fee_multipliers,
n_stress_workers=getattr(args, "n_stress_workers", 0),
)
if best is None or score > best["score"]:
@@ -513,6 +619,9 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--method", type=str, default="auto", choices=["auto", "optuna", "random"], help="Search method.")
parser.add_argument("--n-trials", type=int, default=300, help="Number of optimization trials.")
parser.add_argument("--n-jobs", type=int, default=1, help="Parallel trials (Optuna only). 1=sequential, 2+=parallel.")
parser.add_argument("--n-stress-workers", type=int, default=0, help="Parallel stress scenarios per window (0=sequential). e.g. 4 runs slip×fee in parallel.")
parser.add_argument("--max-windows", type=int, default=0, help="Cap rolling windows (0=no cap). Speeds up when set to 2-3.")
parser.add_argument("--seed", type=int, default=42, help="Random seed.")
parser.add_argument("--raw-fee-rate", type=float, default=0.0006, help="Raw taker fee rate (e.g. 0.0006).")
@@ -568,6 +677,10 @@ def main() -> None:
"Could not build rolling windows. Increase data days or reduce train/valid window sizes."
)
max_windows = max(0, int(getattr(args, "max_windows", 0)))
if max_windows > 0 and len(windows) > max_windows:
windows = windows[:max_windows]
print(f"Capped rolling windows to {max_windows} (--max-windows)")
print(f"Loaded klines: {len(klines)}, rolling windows: {len(windows)}")
use_method = args.method

View File

@@ -148,6 +148,11 @@ class BitmartFuturesTransactionConservative:
self.auto_optimize_data_file = os.getenv("BITMART_AUTO_OPT_DATA_FILE", "").strip()
self.auto_optimize_script_path = Path(__file__).with_name("optimize_params.py")
self.auto_optimized_this_run = False
# 若参数文件在 N 小时内更新过则跳过优化0=不跳过)
self.auto_optimize_skip_if_fresh_hours = float(os.getenv("BITMART_AUTO_OPT_SKIP_IF_FRESH_HOURS", "0"))
# 并行试验数Optuna 有效)、压力场景数(每窗口 slip×fee 并行)
self.auto_optimize_n_jobs = int(os.getenv("BITMART_AUTO_OPT_N_JOBS", "0"))
self.auto_optimize_stress_workers = int(os.getenv("BITMART_AUTO_OPT_STRESS_WORKERS", "0"))
# 方案B智能动态模式
self.enable_smart_mode = os.getenv("BITMART_SMART_MODE_ENABLED", "1").strip().lower() not in {"0", "false", "off", "no"}
@@ -301,6 +306,7 @@ class BitmartFuturesTransactionConservative:
def run_auto_optimization_before_trade(self):
"""
启动前自动运行 optimize_params.py完成后重新加载参数。
支持快速启动(少天数/少试验/单压力场景)与“参数文件若为新则跳过优化”。
"""
if not self.auto_optimize_before_trade:
logger.info("启动前自动优化已禁用BITMART_AUTO_OPTIMIZE_BEFORE_TRADE=0")
@@ -308,6 +314,21 @@ class BitmartFuturesTransactionConservative:
if self.auto_optimized_this_run:
return True
output_path = self.get_runtime_params_path()
if self.auto_optimize_skip_if_fresh_hours > 0 and output_path.exists():
try:
mtime = output_path.stat().st_mtime
age_hours = (time.time() - mtime) / 3600.0
if age_hours <= self.auto_optimize_skip_if_fresh_hours:
logger.info(
f"参数文件 {output_path.name} {age_hours:.1f}h 内已更新跳过本次优化BITMART_AUTO_OPT_SKIP_IF_FRESH_HOURS={self.auto_optimize_skip_if_fresh_hours}"
)
self.load_runtime_params()
self.capture_base_risk_params()
return True
except Exception as e:
logger.debug(f"检查参数文件新鲜度失败: {e}")
script_path = self.auto_optimize_script_path
if not script_path.exists():
msg = f"未找到参数优化脚本: {script_path}"
@@ -317,9 +338,17 @@ class BitmartFuturesTransactionConservative:
logger.warning(f"{msg},继续使用当前参数")
return True
output_path = self.get_runtime_params_path()
output_path.parent.mkdir(parents=True, exist_ok=True)
# 使用完整数据与试验数不缩减加速依赖优化脚本内部并行n_jobs / 压力场景并行)
days = self.auto_optimize_days
train_days = self.auto_optimize_train_days
valid_days = self.auto_optimize_valid_days
window_step_days = self.auto_optimize_window_step_days
n_trials = self.auto_optimize_n_trials
stress_slippage = self.auto_optimize_stress_slippage
stress_fee = self.auto_optimize_stress_fee
method = str(self.auto_optimize_method).strip().lower() or "auto"
if method not in {"auto", "optuna", "random"}:
logger.warning(f"未知优化方法 {method},自动改为 auto")
@@ -338,21 +367,21 @@ class BitmartFuturesTransactionConservative:
sys.executable,
str(script_path),
"--days",
str(self.auto_optimize_days),
str(days),
"--symbol",
self.contract_symbol,
"--step",
"5",
"--train-days",
str(self.auto_optimize_train_days),
str(train_days),
"--valid-days",
str(self.auto_optimize_valid_days),
str(valid_days),
"--window-step-days",
str(self.auto_optimize_window_step_days),
str(window_step_days),
"--method",
method,
"--n-trials",
str(self.auto_optimize_n_trials),
str(n_trials),
"--valid-score-weight",
str(self.auto_optimize_valid_score_weight),
"--drawdown-guard",
@@ -360,12 +389,16 @@ class BitmartFuturesTransactionConservative:
"--stability-weight",
str(self.auto_optimize_stability_weight),
"--stress-slippage-multipliers",
str(self.auto_optimize_stress_slippage),
stress_slippage,
"--stress-fee-multipliers",
str(self.auto_optimize_stress_fee),
stress_fee,
"--output",
str(output_path),
]
if self.auto_optimize_n_jobs > 0:
cmd.extend(["--n-jobs", str(self.auto_optimize_n_jobs)])
if self.auto_optimize_stress_workers > 0:
cmd.extend(["--n-stress-workers", str(self.auto_optimize_stress_workers)])
if self.auto_optimize_data_file:
cmd.extend(["--data-file", self.auto_optimize_data_file])