diff --git a/bitmart/保守模式参数优化/README.md b/bitmart/保守模式参数优化/README.md index 95d6f39..36920bb 100644 --- a/bitmart/保守模式参数优化/README.md +++ b/bitmart/保守模式参数优化/README.md @@ -29,6 +29,24 @@ python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_p 优化完成后会写入: - `/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/current_params.json` +如果你想做更稳健的参数(推荐实盘前使用): + +```bash +python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \ + --data-file "/Users/ddrwode/code/lm_code/bitmart/数据/your_5m_90days.csv" \ + --days 90 \ + --train-days 45 \ + --valid-days 15 \ + --window-step-days 7 \ + --method optuna \ + --n-trials 1200 \ + --valid-score-weight 0.9 \ + --drawdown-guard 8 \ + --stability-weight 0.6 \ + --stress-slippage-multipliers "0.8,1.0,1.2,1.4" \ + --stress-fee-multipliers "1.0,1.15,1.3" +``` + ## 2. 不提供 CSV,直接从 API 自动抓取 30 天数据并优化 最简单一条命令(会自动抓取并计算): @@ -72,6 +90,10 @@ BITMART_PARAMS_PATH="/absolute/path/to/current_params.json" \ python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py" ``` +注意: +- 实盘下单数量现在会直接使用 `default_order_size`(不再固定写死 25),与回测/优化口径一致。 +- `current_params.json` 新增支持趋势过滤和 AI 门控参数(见第 7 节)。 + ## 4. 实时价格(WebSocket) 保守模式脚本已支持: @@ -132,8 +154,20 @@ pip3 install websocket-client 可通过命令行改: - `--raw-fee-rate` - `--rebate-ratio` +- `--stress-slippage-multipliers` +- `--stress-fee-multipliers` -## 7. 重要提示 +## 7. 趋势过滤 + AI 门控(新) + +实盘与回测都支持以下参数(可由优化器自动搜索): +- 趋势过滤:`enable_trend_filter`、`trend_ema_fast`、`trend_ema_slow`、`trend_strength_threshold_pct`、`trend_slope_lookback`、`trend_slope_threshold_pct` +- AI 门控:`enable_ai_filter`、`ai_min_confidence`、`ai_bias`、`ai_w_trend_align`、`ai_w_breakout_strength`、`ai_w_entity_strength`、`ai_w_shadow_balance`、`ai_w_volatility_fit` + +说明: +- 趋势过滤只影响新开仓,不阻断风控平仓和反手。 +- AI 门控是轻量打分器(logistic 风格),用于 `trade/skip`,不是黑盒大模型。 + +## 8. 重要提示 - 回测撮合属于简化模型,不等于实盘撮合。 - 参数应周期性重训(例如每天或每周)。 diff --git a/bitmart/保守模式参数优化/backtest_engine.py b/bitmart/保守模式参数优化/backtest_engine.py index a84fc80..65ec8c9 100644 --- a/bitmart/保守模式参数优化/backtest_engine.py +++ b/bitmart/保守模式参数优化/backtest_engine.py @@ -1,6 +1,7 @@ from __future__ import annotations import csv +import math import time from dataclasses import asdict, dataclass from datetime import datetime, timezone @@ -22,6 +23,21 @@ DEFAULT_PARAMS: Dict[str, Any] = { "open_breakout_buffer_pct": 0.03, "enable_shadow_reverse": False, "shadow_reverse_threshold_pct": 0.15, + "enable_trend_filter": True, + "trend_ema_fast": 21, + "trend_ema_slow": 55, + "trend_strength_threshold_pct": 0.02, + "trend_slope_lookback": 3, + "trend_slope_threshold_pct": 0.005, + "enable_ai_filter": True, + "ai_min_confidence": 0.58, + "ai_bias": -0.1, + "ai_w_trend_align": 1.1, + "ai_w_breakout_strength": 0.8, + "ai_w_entity_strength": 0.55, + "ai_w_shadow_balance": 0.35, + "ai_w_volatility_fit": 0.45, + "dynamic_vol_target_pct": 0.25, "open_cooldown_seconds": 300, "reverse_cooldown_seconds": 300, "reverse_min_move_pct": 0.15, @@ -322,6 +338,69 @@ class ConservativeBacktester: shift = self.slippage_pct / 100.0 return price * (1.0 + shift) if is_buy else price * (1.0 - shift) + @staticmethod + def _clip(value: float, low: float, high: float) -> float: + return max(low, min(high, value)) + + def _signal_confidence( + self, + side: str, + current_price: float, + entry_price: float, + prev_kline: Dict[str, float], + levels: Dict[str, float], + trend_bias: str, + trend_strength_pct: float, + target_volatility_pct: float, + ) -> float: + prev_entity = self._entity_size(prev_kline) + prev_close = max(float(prev_kline.get("close", 0.0)), 0.000001) + entity_pct = prev_entity / prev_close * 100.0 + + upper_shadow_pct = levels["upper_shadow_pct"] + lower_shadow_pct = levels["lower_shadow_pct"] + + if entry_price <= 0: + breakout_move_pct = 0.0 + elif side == "long": + breakout_move_pct = max(0.0, (current_price - entry_price) / entry_price * 100.0) + else: + breakout_move_pct = max(0.0, (entry_price - current_price) / entry_price * 100.0) + + if side == "long": + trend_align = 1.0 if trend_bias == "bull" else (-1.0 if trend_bias == "bear" else 0.0) + shadow_balance = lower_shadow_pct - upper_shadow_pct + else: + trend_align = 1.0 if trend_bias == "bear" else (-1.0 if trend_bias == "bull" else 0.0) + shadow_balance = upper_shadow_pct - lower_shadow_pct + + trend_strength_threshold = max(_as_float(self.params.get("trend_strength_threshold_pct", 0.02), 0.02), 0.01) + trend_strength_scale = self._clip(abs(trend_strength_pct) / trend_strength_threshold, 0.0, 2.5) + trend_component = trend_align * trend_strength_scale + breakout_strength = self._clip(breakout_move_pct / 0.25, 0.0, 2.5) + entity_strength = self._clip(entity_pct / 0.50, 0.0, 2.5) + shadow_component = self._clip(shadow_balance / 0.30, -2.5, 2.5) + + current_vol = 0.0 + current_close = float(current_price) + if current_close > 0: + current_vol = abs(float(prev_kline["high"]) - float(prev_kline["low"])) / current_close * 100.0 + + target_vol = max(float(target_volatility_pct), 0.05) + vol_fit = 1.0 - abs(current_vol - target_vol) / (target_vol * 2.0) + vol_fit = self._clip(vol_fit, -1.5, 1.5) + + score = ( + _as_float(self.params.get("ai_bias", -0.1), -0.1) + + _as_float(self.params.get("ai_w_trend_align", 1.1), 1.1) * trend_component + + _as_float(self.params.get("ai_w_breakout_strength", 0.8), 0.8) * breakout_strength + + _as_float(self.params.get("ai_w_entity_strength", 0.55), 0.55) * entity_strength + + _as_float(self.params.get("ai_w_shadow_balance", 0.35), 0.35) * shadow_component + + _as_float(self.params.get("ai_w_volatility_fit", 0.45), 0.45) * vol_fit + ) + score = self._clip(score, -60.0, 60.0) + return 1.0 / (1.0 + math.exp(-score)) + def _calc_levels(self, prev_kline: Dict[str, float], current_kline: Dict[str, float]) -> Optional[Dict[str, float]]: prev_entity = self._entity_size(prev_kline) if prev_entity < 0.1: @@ -404,6 +483,15 @@ class ConservativeBacktester: enable_shadow_reverse = _as_bool(self.params.get("enable_shadow_reverse", False)) shadow_reverse_threshold = _as_float(self.params.get("shadow_reverse_threshold_pct", 0.15), 0.15) + enable_trend_filter = _as_bool(self.params.get("enable_trend_filter", True)) + trend_ema_fast = int(max(_as_float(self.params.get("trend_ema_fast", 21), 21), 2)) + trend_ema_slow = int(max(_as_float(self.params.get("trend_ema_slow", 55), 55), trend_ema_fast + 1)) + trend_strength_threshold = _as_float(self.params.get("trend_strength_threshold_pct", 0.02), 0.02) + trend_slope_lookback = int(max(_as_float(self.params.get("trend_slope_lookback", 3), 3), 1)) + trend_slope_threshold = _as_float(self.params.get("trend_slope_threshold_pct", 0.005), 0.005) + enable_ai_filter = _as_bool(self.params.get("enable_ai_filter", True)) + ai_min_confidence = _as_float(self.params.get("ai_min_confidence", 0.58), 0.58) + target_volatility_pct = _as_float(self.params.get("dynamic_vol_target_pct", 0.25), 0.25) position = 0 # 1 long, -1 short, 0 flat entry_price = 0.0 @@ -426,6 +514,11 @@ class ConservativeBacktester: trades = 0 wins = 0 daily_pnl: Dict[str, float] = {} + ema_fast = float(klines[0]["close"]) + ema_slow = float(klines[0]["close"]) + alpha_fast = 2.0 / (trend_ema_fast + 1.0) + alpha_slow = 2.0 / (trend_ema_slow + 1.0) + slow_history: List[float] = [ema_slow] def mark_to_market(mark_price: float) -> float: if position == 0: @@ -516,6 +609,27 @@ class ConservativeBacktester: prev_kline = klines[idx - 1] current_kline = klines[idx] current_ts = int(current_kline["id"]) + close_price = float(current_kline["close"]) + ema_fast = ema_fast + alpha_fast * (close_price - ema_fast) + ema_slow = ema_slow + alpha_slow * (close_price - ema_slow) + slow_history.append(ema_slow) + max_slow_history = trend_slope_lookback + 5 + if len(slow_history) > max_slow_history: + slow_history = slow_history[-max_slow_history:] + + trend_strength_pct = 0.0 + trend_slope_pct = 0.0 + trend_bias = "neutral" + if ema_slow > 0: + trend_strength_pct = (ema_fast - ema_slow) / ema_slow * 100.0 + if len(slow_history) > trend_slope_lookback: + ref_slow = slow_history[-1 - trend_slope_lookback] + if ref_slow > 0: + trend_slope_pct = (ema_slow - ref_slow) / ref_slow * 100.0 + if trend_strength_pct >= trend_strength_threshold and trend_slope_pct >= trend_slope_threshold: + trend_bias = "bull" + elif trend_strength_pct <= -trend_strength_threshold and trend_slope_pct <= -trend_slope_threshold: + trend_bias = "bear" levels = self._calc_levels(prev_kline, current_kline) if not levels: @@ -564,7 +678,29 @@ class ConservativeBacktester: signal_price = levels["short_entry_price"] if signal_type and can_open(current_ts): - open_position(1 if signal_type == "long" else -1, signal_price, current_ts) + allow_signal = True + if enable_trend_filter: + if signal_type == "long" and trend_bias == "bear": + allow_signal = False + elif signal_type == "short" and trend_bias == "bull": + allow_signal = False + + if allow_signal and enable_ai_filter: + confidence = self._signal_confidence( + side=signal_type, + current_price=close_price, + entry_price=signal_price, + prev_kline=prev_kline, + levels=levels, + trend_bias=trend_bias, + trend_strength_pct=trend_strength_pct, + target_volatility_pct=target_volatility_pct, + ) + if confidence < ai_min_confidence: + allow_signal = False + + if allow_signal: + open_position(1 if signal_type == "long" else -1, signal_price, current_ts) elif position == 1: reverse_hit = current_kline["low"] <= levels["short_trigger"] and not skip_short diff --git a/bitmart/保守模式参数优化/optimize_params.py b/bitmart/保守模式参数优化/optimize_params.py index c6ee50c..7a924eb 100644 --- a/bitmart/保守模式参数优化/optimize_params.py +++ b/bitmart/保守模式参数优化/optimize_params.py @@ -37,6 +37,35 @@ def _as_float(value: Any, default: float) -> float: return default +def _as_bool(value: Any, default: bool = False) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + return value.strip().lower() in {"1", "true", "yes", "y", "on"} + return default + + +def _clamp(value: float, low: float, high: float) -> float: + return max(low, min(high, value)) + + +def _parse_float_list(text: str, default: List[float]) -> List[float]: + if not text: + return list(default) + values: List[float] = [] + for part in text.split(","): + raw = part.strip() + if not raw: + continue + try: + values.append(float(raw)) + except Exception: + continue + return values or list(default) + + def normalize_params(params: Dict[str, Any]) -> Dict[str, Any]: p = dict(params) @@ -60,8 +89,24 @@ def normalize_params(params: Dict[str, Any]) -> Dict[str, Any]: p["order_notional_usd"] = round(max(_as_float(p.get("order_notional_usd", 100.0), 100.0), 5.0), 4) p["pnl_per_usd_move"] = round(max(_as_float(p.get("pnl_per_usd_move", 1.0), 1.0), 0.01), 4) p["slippage_pct"] = round(max(_as_float(p.get("slippage_pct", 0.01), 0.01), 0.0), 5) + p["dynamic_vol_target_pct"] = round(_clamp(_as_float(p.get("dynamic_vol_target_pct", 0.25), 0.25), 0.05, 1.5), 6) - p["enable_shadow_reverse"] = bool(p.get("enable_shadow_reverse", False)) + p["enable_shadow_reverse"] = _as_bool(p.get("enable_shadow_reverse", False), False) + p["enable_trend_filter"] = _as_bool(p.get("enable_trend_filter", True), True) + p["trend_ema_fast"] = int(max(_as_float(p.get("trend_ema_fast", 21), 21), 2)) + p["trend_ema_slow"] = int(max(_as_float(p.get("trend_ema_slow", 55), 55), p["trend_ema_fast"] + 1)) + p["trend_strength_threshold_pct"] = round(_clamp(_as_float(p.get("trend_strength_threshold_pct", 0.02), 0.02), 0.0, 0.2), 6) + p["trend_slope_lookback"] = int(max(_as_float(p.get("trend_slope_lookback", 3), 3), 1)) + p["trend_slope_threshold_pct"] = round(_clamp(_as_float(p.get("trend_slope_threshold_pct", 0.005), 0.005), 0.0, 0.1), 6) + + p["enable_ai_filter"] = _as_bool(p.get("enable_ai_filter", True), True) + p["ai_min_confidence"] = round(_clamp(_as_float(p.get("ai_min_confidence", 0.58), 0.58), 0.5, 0.95), 6) + p["ai_bias"] = round(_clamp(_as_float(p.get("ai_bias", -0.1), -0.1), -3.0, 3.0), 6) + p["ai_w_trend_align"] = round(_clamp(_as_float(p.get("ai_w_trend_align", 1.1), 1.1), 0.0, 3.0), 6) + p["ai_w_breakout_strength"] = round(_clamp(_as_float(p.get("ai_w_breakout_strength", 0.8), 0.8), 0.0, 3.0), 6) + p["ai_w_entity_strength"] = round(_clamp(_as_float(p.get("ai_w_entity_strength", 0.55), 0.55), 0.0, 3.0), 6) + p["ai_w_shadow_balance"] = round(_clamp(_as_float(p.get("ai_w_shadow_balance", 0.35), 0.35), 0.0, 3.0), 6) + p["ai_w_volatility_fit"] = round(_clamp(_as_float(p.get("ai_w_volatility_fit", 0.45), 0.45), 0.0, 3.0), 6) # Keep relationships sane. if p["trailing_activation_usd"] <= p["break_even_activation_usd"]: @@ -127,6 +172,21 @@ def sample_random_params(rng: random.Random) -> Dict[str, Any]: "reverse_cooldown_seconds": rng.choice([60, 120, 180, 240, 300, 360, 420, 480, 600, 900]), "enable_shadow_reverse": rng.random() < 0.15, "shadow_reverse_threshold_pct": rng.uniform(0.08, 0.5), + "enable_trend_filter": rng.random() < 0.90, + "trend_ema_fast": rng.randint(8, 34), + "trend_ema_slow": rng.randint(35, 120), + "trend_strength_threshold_pct": rng.uniform(0.0, 0.08), + "trend_slope_lookback": rng.randint(1, 8), + "trend_slope_threshold_pct": rng.uniform(0.0, 0.03), + "enable_ai_filter": rng.random() < 0.90, + "ai_min_confidence": rng.uniform(0.50, 0.85), + "ai_bias": rng.uniform(-1.8, 1.0), + "ai_w_trend_align": rng.uniform(0.1, 2.5), + "ai_w_breakout_strength": rng.uniform(0.1, 2.5), + "ai_w_entity_strength": rng.uniform(0.1, 2.5), + "ai_w_shadow_balance": rng.uniform(0.0, 2.2), + "ai_w_volatility_fit": rng.uniform(0.0, 2.2), + "dynamic_vol_target_pct": rng.uniform(0.10, 0.80), "slippage_pct": rng.uniform(0.005, 0.05), } ) @@ -145,40 +205,86 @@ def evaluate_candidate( valid_score_weight: float, drawdown_guard: float, stability_weight: float, + stress_slippage_multipliers: List[float], + stress_fee_multipliers: List[float], ) -> Tuple[float, Dict[str, float], Dict[str, float]]: train_results = [] valid_results = [] window_scores = [] for train_data, valid_data in windows: - backtester = ConservativeBacktester(params=params, raw_fee_rate=raw_fee_rate, rebate_ratio=rebate_ratio) + scenario_train_scores: List[float] = [] + scenario_valid_scores: List[float] = [] + scenario_valid_drawdowns: List[float] = [] + base_train_res = None + base_valid_res = None - train_res = backtester.run(train_data) - valid_res = backtester.run(valid_data) + for slip_mult in stress_slippage_multipliers: + for fee_mult in stress_fee_multipliers: + scenario_params = dict(params) + scenario_params["slippage_pct"] = max( + 0.0, + _as_float(params.get("slippage_pct", 0.01), 0.01) * float(slip_mult), + ) + scenario_params = normalize_params(scenario_params) + scenario_raw_fee_rate = max(0.0, raw_fee_rate * float(fee_mult)) + backtester = ConservativeBacktester( + params=scenario_params, + raw_fee_rate=scenario_raw_fee_rate, + rebate_ratio=rebate_ratio, + ) - train_results.append(train_res) - valid_results.append(valid_res) + train_res = backtester.run(train_data) + valid_res = backtester.run(valid_data) - train_score = score_result( - train_res, - drawdown_weight=drawdown_weight, - loss_day_weight=loss_day_weight, - min_trades=min_trades, - undertrade_penalty=undertrade_penalty, - ) - valid_score = score_result( - valid_res, - drawdown_weight=drawdown_weight, - loss_day_weight=loss_day_weight, - min_trades=min_trades, - undertrade_penalty=undertrade_penalty, - ) + train_score = score_result( + train_res, + drawdown_weight=drawdown_weight, + loss_day_weight=loss_day_weight, + min_trades=min_trades, + undertrade_penalty=undertrade_penalty, + ) + valid_score = score_result( + valid_res, + drawdown_weight=drawdown_weight, + loss_day_weight=loss_day_weight, + min_trades=min_trades, + undertrade_penalty=undertrade_penalty, + ) - combined = valid_score_weight * valid_score + (1.0 - valid_score_weight) * train_score + scenario_train_scores.append(train_score) + scenario_valid_scores.append(valid_score) + scenario_valid_drawdowns.append(valid_res.max_drawdown) - # Hard guard against large validation drawdown. - if valid_res.max_drawdown > drawdown_guard: - combined -= (valid_res.max_drawdown - drawdown_guard) * 2.0 + if base_train_res is None and abs(slip_mult - 1.0) < 1e-9 and abs(fee_mult - 1.0) < 1e-9: + base_train_res = train_res + base_valid_res = valid_res + + if base_train_res is None: + base_train_res = train_res + base_valid_res = valid_res + + if base_train_res is None or base_valid_res is None: + fallback_backtester = ConservativeBacktester( + params=params, + raw_fee_rate=raw_fee_rate, + rebate_ratio=rebate_ratio, + ) + base_train_res = fallback_backtester.run(train_data) + base_valid_res = fallback_backtester.run(valid_data) + + train_results.append(base_train_res) + valid_results.append(base_valid_res) + + train_score_mean = mean(scenario_train_scores) if scenario_train_scores else -1e9 + valid_score_mean = mean(scenario_valid_scores) if scenario_valid_scores else -1e9 + valid_score_min = min(scenario_valid_scores) if scenario_valid_scores else -1e9 + robustness_penalty = max(0.0, valid_score_mean - valid_score_min) * 0.35 + combined = valid_score_weight * valid_score_mean + (1.0 - valid_score_weight) * train_score_mean - robustness_penalty + + valid_drawdown_max = max(scenario_valid_drawdowns) if scenario_valid_drawdowns else 0.0 + if valid_drawdown_max > drawdown_guard: + combined -= (valid_drawdown_max - drawdown_guard) * 2.0 window_scores.append(combined) @@ -220,6 +326,21 @@ def optimize_with_optuna( "reverse_cooldown_seconds": trial.suggest_int("reverse_cooldown_seconds", 60, 900, step=60), "enable_shadow_reverse": trial.suggest_categorical("enable_shadow_reverse", [False, True]), "shadow_reverse_threshold_pct": trial.suggest_float("shadow_reverse_threshold_pct", 0.08, 0.5), + "enable_trend_filter": trial.suggest_categorical("enable_trend_filter", [False, True]), + "trend_ema_fast": trial.suggest_int("trend_ema_fast", 8, 34), + "trend_ema_slow": trial.suggest_int("trend_ema_slow", 35, 120), + "trend_strength_threshold_pct": trial.suggest_float("trend_strength_threshold_pct", 0.0, 0.08), + "trend_slope_lookback": trial.suggest_int("trend_slope_lookback", 1, 8), + "trend_slope_threshold_pct": trial.suggest_float("trend_slope_threshold_pct", 0.0, 0.03), + "enable_ai_filter": trial.suggest_categorical("enable_ai_filter", [False, True]), + "ai_min_confidence": trial.suggest_float("ai_min_confidence", 0.50, 0.85), + "ai_bias": trial.suggest_float("ai_bias", -1.8, 1.0), + "ai_w_trend_align": trial.suggest_float("ai_w_trend_align", 0.1, 2.5), + "ai_w_breakout_strength": trial.suggest_float("ai_w_breakout_strength", 0.1, 2.5), + "ai_w_entity_strength": trial.suggest_float("ai_w_entity_strength", 0.1, 2.5), + "ai_w_shadow_balance": trial.suggest_float("ai_w_shadow_balance", 0.0, 2.2), + "ai_w_volatility_fit": trial.suggest_float("ai_w_volatility_fit", 0.0, 2.2), + "dynamic_vol_target_pct": trial.suggest_float("dynamic_vol_target_pct", 0.10, 0.80), "slippage_pct": trial.suggest_float("slippage_pct", 0.005, 0.05), } ) @@ -237,6 +358,8 @@ def optimize_with_optuna( valid_score_weight=args.valid_score_weight, drawdown_guard=args.drawdown_guard, stability_weight=args.stability_weight, + stress_slippage_multipliers=args.stress_slippage_multipliers, + stress_fee_multipliers=args.stress_fee_multipliers, ) trial.set_user_attr("train_agg", train_agg) @@ -286,6 +409,8 @@ def optimize_with_random( valid_score_weight=args.valid_score_weight, drawdown_guard=args.drawdown_guard, stability_weight=args.stability_weight, + stress_slippage_multipliers=args.stress_slippage_multipliers, + stress_fee_multipliers=args.stress_fee_multipliers, ) if best is None or score > best["score"]: @@ -400,6 +525,18 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--valid-score-weight", type=float, default=0.8, help="Weight of validation score in combined score.") parser.add_argument("--drawdown-guard", type=float, default=10.0, help="Extra hard penalty beyond this validation drawdown.") parser.add_argument("--stability-weight", type=float, default=0.3, help="Penalty for net-pnl variance across windows.") + parser.add_argument( + "--stress-slippage-multipliers", + type=str, + default="0.8,1.0,1.2", + help="Comma-separated slippage multipliers used in robust scoring.", + ) + parser.add_argument( + "--stress-fee-multipliers", + type=str, + default="1.0,1.15", + help="Comma-separated raw-fee multipliers used in robust scoring.", + ) parser.add_argument( "--output", @@ -413,6 +550,8 @@ def parse_args() -> argparse.Namespace: def main() -> None: args = parse_args() + args.stress_slippage_multipliers = _parse_float_list(args.stress_slippage_multipliers, [0.8, 1.0, 1.2]) + args.stress_fee_multipliers = _parse_float_list(args.stress_fee_multipliers, [1.0, 1.15]) klines, source_meta = load_klines(args) if len(klines) < 200: @@ -463,6 +602,8 @@ def main() -> None: "window_step_days": args.window_step_days, "windows": len(windows), "seed": args.seed, + "stress_slippage_multipliers": args.stress_slippage_multipliers, + "stress_fee_multipliers": args.stress_fee_multipliers, }, "fee_model": { "raw_fee_rate": args.raw_fee_rate, diff --git a/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py b/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py index 5eb30b0..cc0b47d 100644 --- a/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py +++ b/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py @@ -1,5 +1,7 @@ import json +import math import os +import subprocess import sys import threading import time @@ -71,6 +73,26 @@ class BitmartFuturesTransactionConservative: self.open_breakout_buffer_pct = 0.03 # 开仓突破附加缓冲(百分比),减少假突破 self.enable_shadow_reverse = False # 保守模式默认关闭影线反手 self.shadow_reverse_threshold_pct = 0.15 # 影线反手阈值(百分比) + # 趋势过滤:仅限制新开仓,不影响持仓风控与反手 + self.enable_trend_filter = True + self.trend_ema_fast = 21 + self.trend_ema_slow = 55 + self.trend_strength_threshold_pct = 0.02 + self.trend_slope_lookback = 3 + self.trend_slope_threshold_pct = 0.005 + # 轻量 AI 门控:对候选开仓信号打分,低置信度跳过 + self.enable_ai_filter = True + self.ai_min_confidence = 0.58 + self.ai_bias = -0.1 + self.ai_w_trend_align = 1.1 + self.ai_w_breakout_strength = 0.8 + self.ai_w_entity_strength = 0.55 + self.ai_w_shadow_balance = 0.35 + self.ai_w_volatility_fit = 0.45 + self.last_trend_bias = "neutral" + self.last_trend_strength_pct = 0.0 + self.last_trend_slope_pct = 0.0 + self.recent_klines_cache = [] # 策略相关变量 self.prev_kline = None # 上一根K线 @@ -102,10 +124,36 @@ class BitmartFuturesTransactionConservative: self.last_dynamic_risk_kline_id = None self.base_risk_params = {} + # 启动前自动参数优化(按你的命令默认开启) + self.auto_optimize_before_trade = os.getenv( + "BITMART_AUTO_OPTIMIZE_BEFORE_TRADE", "1" + ).strip().lower() not in {"0", "false", "off", "no"} + self.auto_optimize_require_success = os.getenv( + "BITMART_AUTO_OPTIMIZE_REQUIRE_SUCCESS", "1" + ).strip().lower() not in {"0", "false", "off", "no"} + self.auto_optimize_days = int(os.getenv("BITMART_AUTO_OPT_DAYS", "90")) + self.auto_optimize_train_days = int(os.getenv("BITMART_AUTO_OPT_TRAIN_DAYS", "45")) + self.auto_optimize_valid_days = int(os.getenv("BITMART_AUTO_OPT_VALID_DAYS", "15")) + self.auto_optimize_window_step_days = int(os.getenv("BITMART_AUTO_OPT_WINDOW_STEP_DAYS", "7")) + self.auto_optimize_method = os.getenv("BITMART_AUTO_OPT_METHOD", "optuna") + self.auto_optimize_n_trials = int(os.getenv("BITMART_AUTO_OPT_N_TRIALS", "1200")) + self.auto_optimize_valid_score_weight = float(os.getenv("BITMART_AUTO_OPT_VALID_SCORE_WEIGHT", "0.9")) + self.auto_optimize_drawdown_guard = float(os.getenv("BITMART_AUTO_OPT_DRAWDOWN_GUARD", "8")) + self.auto_optimize_stability_weight = float(os.getenv("BITMART_AUTO_OPT_STABILITY_WEIGHT", "0.6")) + self.auto_optimize_stress_slippage = os.getenv("BITMART_AUTO_OPT_STRESS_SLIPPAGE", "0.8,1.0,1.2,1.4") + self.auto_optimize_stress_fee = os.getenv("BITMART_AUTO_OPT_STRESS_FEE", "1.0,1.15,1.3") + self.auto_optimize_data_file = os.getenv("BITMART_AUTO_OPT_DATA_FILE", "").strip() + self.auto_optimize_script_path = Path(__file__).with_name("optimize_params.py") + self.auto_optimized_this_run = False + # 启动时尝试读取动态参数(可由优化脚本自动生成) self.load_runtime_params() self.capture_base_risk_params() + def get_runtime_params_path(self): + params_path_env = os.getenv("BITMART_PARAMS_PATH") + return Path(params_path_env).expanduser() if params_path_env else Path(__file__).with_name("current_params.json") + def load_runtime_params(self): """ 从 current_params.json 或环境变量 BITMART_PARAMS_PATH 指向的文件加载参数。 @@ -113,8 +161,7 @@ class BitmartFuturesTransactionConservative: 1) {"params": {...}} 2) {...} 直接是参数字典 """ - params_path_env = os.getenv("BITMART_PARAMS_PATH") - params_path = Path(params_path_env).expanduser() if params_path_env else Path(__file__).with_name("current_params.json") + params_path = self.get_runtime_params_path() if not params_path.exists(): logger.info(f"未找到动态参数文件,使用内置保守参数: {params_path}") return @@ -131,6 +178,20 @@ class BitmartFuturesTransactionConservative: "open_breakout_buffer_pct", "enable_shadow_reverse", "shadow_reverse_threshold_pct", + "enable_trend_filter", + "trend_ema_fast", + "trend_ema_slow", + "trend_strength_threshold_pct", + "trend_slope_lookback", + "trend_slope_threshold_pct", + "enable_ai_filter", + "ai_min_confidence", + "ai_bias", + "ai_w_trend_align", + "ai_w_breakout_strength", + "ai_w_entity_strength", + "ai_w_shadow_balance", + "ai_w_volatility_fit", "open_cooldown_seconds", "reverse_cooldown_seconds", "reverse_min_move_pct", @@ -154,7 +215,7 @@ class BitmartFuturesTransactionConservative: continue if key == "leverage": setattr(self, key, str(value)) - elif key == "dynamic_risk_enabled": + elif key in {"dynamic_risk_enabled", "enable_shadow_reverse", "enable_trend_filter", "enable_ai_filter"}: setattr(self, key, self._to_bool(value)) else: setattr(self, key, value) @@ -167,11 +228,108 @@ class BitmartFuturesTransactionConservative: f"BEAct={self.break_even_activation_usd}, BEFloor={self.break_even_floor_usd}, " f"BreakoutBuf={self.open_breakout_buffer_pct}%, " f"OpenCD={self.open_cooldown_seconds}s, ReverseCD={self.reverse_cooldown_seconds}s, " - f"ReverseMove={self.reverse_min_move_pct}%" + f"ReverseMove={self.reverse_min_move_pct}%, " + f"TrendFilter={self.enable_trend_filter}, AIFilter={self.enable_ai_filter}" ) except Exception as e: logger.error(f"加载动态参数文件失败: {e} | path={params_path}") + def run_auto_optimization_before_trade(self): + """ + 启动前自动运行 optimize_params.py,完成后重新加载参数。 + """ + if not self.auto_optimize_before_trade: + logger.info("启动前自动优化已禁用(BITMART_AUTO_OPTIMIZE_BEFORE_TRADE=0)") + return True + if self.auto_optimized_this_run: + return True + + script_path = self.auto_optimize_script_path + if not script_path.exists(): + msg = f"未找到参数优化脚本: {script_path}" + if self.auto_optimize_require_success: + logger.error(msg) + return False + logger.warning(f"{msg},继续使用当前参数") + return True + + output_path = self.get_runtime_params_path() + output_path.parent.mkdir(parents=True, exist_ok=True) + + cmd = [ + sys.executable, + str(script_path), + "--days", + str(self.auto_optimize_days), + "--symbol", + self.contract_symbol, + "--step", + "5", + "--train-days", + str(self.auto_optimize_train_days), + "--valid-days", + str(self.auto_optimize_valid_days), + "--window-step-days", + str(self.auto_optimize_window_step_days), + "--method", + str(self.auto_optimize_method), + "--n-trials", + str(self.auto_optimize_n_trials), + "--valid-score-weight", + str(self.auto_optimize_valid_score_weight), + "--drawdown-guard", + str(self.auto_optimize_drawdown_guard), + "--stability-weight", + str(self.auto_optimize_stability_weight), + "--stress-slippage-multipliers", + str(self.auto_optimize_stress_slippage), + "--stress-fee-multipliers", + str(self.auto_optimize_stress_fee), + "--output", + str(output_path), + ] + if self.auto_optimize_data_file: + cmd.extend(["--data-file", self.auto_optimize_data_file]) + + logger.info("启动前自动参数优化开始(可能耗时较长)") + logger.info(f"优化输出参数文件: {output_path}") + logger.info("优化命令: " + " ".join(cmd)) + + try: + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + if process.stdout is not None: + for line in process.stdout: + text = line.strip() + if text: + logger.info(f"[参数优化] {text}") + return_code = process.wait() + except Exception as e: + if self.auto_optimize_require_success: + logger.error(f"启动前自动优化异常: {e}") + return False + logger.warning(f"启动前自动优化异常: {e},继续使用当前参数") + return True + + if return_code != 0: + if self.auto_optimize_require_success: + logger.error(f"启动前自动优化失败,退出码: {return_code}") + return False + logger.warning(f"启动前自动优化失败(退出码={return_code}),继续使用当前参数") + return True + + self.auto_optimized_this_run = True + self.load_runtime_params() + self.capture_base_risk_params() + self.last_dynamic_risk_kline_id = None + logger.success("启动前自动优化完成,已加载最新参数") + return True + def capture_base_risk_params(self): """记录参数基线,动态风控在此基线之上按波动率缩放。""" self.base_risk_params = { @@ -295,6 +453,172 @@ class BitmartFuturesTransactionConservative: return value.strip().lower() in {"1", "true", "yes", "y", "on"} return False + def _clip(self, value, low, high): + return max(low, min(high, value)) + + def _ema_series(self, closes, period): + period = max(1, int(period)) + if not closes: + return [] + alpha = 2.0 / (period + 1.0) + ema_values = [float(closes[0])] + ema = float(closes[0]) + for close in closes[1:]: + ema = ema + alpha * (float(close) - ema) + ema_values.append(ema) + return ema_values + + def infer_trend_state(self): + """ + 根据缓存K线计算趋势状态。 + 返回: (bias, strength_pct, slope_pct) + bias: bull / bear / neutral + """ + klines = self.recent_klines_cache or [] + min_len = max(int(self.trend_ema_slow) + 2, int(self.trend_slope_lookback) + 3) + if len(klines) < min_len: + return "neutral", 0.0, 0.0 + + closes = [float(k["close"]) for k in klines if k.get("close") is not None] + if len(closes) < min_len: + return "neutral", 0.0, 0.0 + + fast_series = self._ema_series(closes, self.trend_ema_fast) + slow_series = self._ema_series(closes, self.trend_ema_slow) + if not fast_series or not slow_series: + return "neutral", 0.0, 0.0 + + fast = fast_series[-1] + slow = slow_series[-1] + if slow <= 0: + return "neutral", 0.0, 0.0 + + lookback = max(1, int(self.trend_slope_lookback)) + ref_idx = max(0, len(slow_series) - 1 - lookback) + slow_ref = slow_series[ref_idx] + slope_pct = (slow - slow_ref) / slow_ref * 100 if slow_ref > 0 else 0.0 + strength_pct = (fast - slow) / slow * 100 + + strength_th = max(0.0, float(self.trend_strength_threshold_pct)) + slope_th = max(0.0, float(self.trend_slope_threshold_pct)) + bias = "neutral" + if strength_pct >= strength_th and slope_pct >= slope_th: + bias = "bull" + elif strength_pct <= -strength_th and slope_pct <= -slope_th: + bias = "bear" + + return bias, strength_pct, slope_pct + + def get_recent_volatility_pct_from_cache(self, bars=20): + klines = self.recent_klines_cache or [] + bars = max(8, int(bars)) + if len(klines) < bars: + return None + + ranges = [] + for k in klines[-bars:]: + close = float(k["close"]) + if close <= 0: + continue + ranges.append((float(k["high"]) - float(k["low"])) / close * 100) + if not ranges: + return None + return sum(ranges) / len(ranges) + + def estimate_signal_confidence( + self, + side, + current_price, + entry_price, + prev_kline, + trend_bias, + trend_strength_pct, + ): + prev_entity = self.calculate_entity(prev_kline) + prev_close = max(float(prev_kline.get("close") or 0.0), 0.000001) + entity_pct = prev_entity / prev_close * 100.0 + upper_shadow_pct = self.calculate_upper_shadow(prev_kline) + lower_shadow_pct = self.calculate_lower_shadow(prev_kline) + + if entry_price <= 0: + breakout_move_pct = 0.0 + elif side == "long": + breakout_move_pct = max(0.0, (current_price - entry_price) / entry_price * 100.0) + else: + breakout_move_pct = max(0.0, (entry_price - current_price) / entry_price * 100.0) + + if side == "long": + trend_align = 1.0 if trend_bias == "bull" else (-1.0 if trend_bias == "bear" else 0.0) + shadow_balance = lower_shadow_pct - upper_shadow_pct + else: + trend_align = 1.0 if trend_bias == "bear" else (-1.0 if trend_bias == "bull" else 0.0) + shadow_balance = upper_shadow_pct - lower_shadow_pct + + trend_strength_scale = self._clip( + abs(trend_strength_pct) / max(float(self.trend_strength_threshold_pct), 0.01), + 0.0, + 2.5, + ) + trend_component = trend_align * trend_strength_scale + breakout_strength = self._clip(breakout_move_pct / 0.25, 0.0, 2.5) + entity_strength = self._clip(entity_pct / 0.50, 0.0, 2.5) + shadow_component = self._clip(shadow_balance / 0.30, -2.5, 2.5) + + vol_pct = self.get_recent_volatility_pct_from_cache(20) + target_vol = max(float(self.dynamic_vol_target_pct), 0.05) + if vol_pct is None: + vol_fit = 0.0 + else: + vol_fit = 1.0 - abs(vol_pct - target_vol) / (target_vol * 2.0) + vol_fit = self._clip(vol_fit, -1.5, 1.5) + + score = ( + float(self.ai_bias) + + float(self.ai_w_trend_align) * trend_component + + float(self.ai_w_breakout_strength) * breakout_strength + + float(self.ai_w_entity_strength) * entity_strength + + float(self.ai_w_shadow_balance) * shadow_component + + float(self.ai_w_volatility_fit) * vol_fit + ) + score = self._clip(score, -60.0, 60.0) + confidence = 1.0 / (1.0 + math.exp(-score)) + return confidence + + def should_take_open_signal( + self, + side, + current_price, + entry_price, + prev_kline, + trend_bias, + trend_strength_pct, + ): + if self.enable_trend_filter: + if side == "long" and trend_bias == "bear": + logger.info("趋势过滤拒绝做多:当前趋势偏空") + return False + if side == "short" and trend_bias == "bull": + logger.info("趋势过滤拒绝做空:当前趋势偏多") + return False + + if self.enable_ai_filter: + confidence = self.estimate_signal_confidence( + side=side, + current_price=current_price, + entry_price=entry_price, + prev_kline=prev_kline, + trend_bias=trend_bias, + trend_strength_pct=trend_strength_pct, + ) + logger.info( + f"AI信号打分: side={side}, confidence={confidence:.3f}, threshold={self.ai_min_confidence:.3f}" + ) + if confidence < float(self.ai_min_confidence): + logger.info("AI门控拒绝开仓:置信度不足") + return False + + return True + def _extract_price_from_ws_payload(self, payload): """ 尽量从 WS 消息中提取价格,兼容不同字段结构。 @@ -432,11 +756,15 @@ class BitmartFuturesTransactionConservative: """获取最近2根K线(当前K线和上一根K线)""" try: end_time = int(time.time()) + window_hours = max( + 3, + int((max(int(self.dynamic_risk_window_bars), int(self.trend_ema_slow) + int(self.trend_slope_lookback) + 30) * 5) / 60) + 1, + ) # 获取足够多的条目确保有最新的K线 response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=5, # 5分钟 - start_time=end_time - 3600 * 3, # 取最近3小时 + start_time=end_time - 3600 * window_hours, end_time=end_time )[0]["data"] @@ -451,6 +779,7 @@ class BitmartFuturesTransactionConservative: 'close': float(k["close_price"]) }) formatted.sort(key=lambda x: x['id']) + self.recent_klines_cache = formatted # 返回最近2根K线:倒数第二根(上一根)和最后一根(当前) if len(formatted) >= 2: @@ -581,31 +910,49 @@ class BitmartFuturesTransactionConservative: """平仓操作""" self.click_safe('x://span[normalize-space(text()) ="市价"]') + def set_order_size(self, size): + """设置下单数量,确保实盘与参数一致。""" + try: + qty = float(size if size is not None else self.default_order_size) + if qty <= 0: + qty = float(self.default_order_size) + qty_text = str(int(qty)) if float(qty).is_integer() else f"{qty:.4f}".rstrip("0").rstrip(".") + ele = self.page.ele('x://*[@id="size_0"]') + if not ele: + logger.warning("未找到数量输入框,无法设置下单数量") + return False + ele.input(vals=qty_text, clear=True) + return True + except Exception as e: + logger.warning(f"设置下单数量失败: {e}") + return False + def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None): """ marketPriceLongOrder 市价做多或者做空,1是做多,-1是做空 limitPriceShortOrder 限价做多或者做空 """ + size = self.default_order_size if size is None else size if marketPriceLongOrder == -1: - # self.click_safe('x://button[normalize-space(text()) ="市价"]') - # self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.set_order_size(size) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') elif marketPriceLongOrder == 1: - # self.click_safe('x://button[normalize-space(text()) ="市价"]') - # self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.set_order_size(size) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') if limitPriceShortOrder == -1: self.click_safe('x://button[normalize-space(text()) ="限价"]') self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True) time.sleep(1) - self.page.ele('x://*[@id="size_0"]').input(1) + self.set_order_size(size) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') elif limitPriceShortOrder == 1: self.click_safe('x://button[normalize-space(text()) ="限价"]') self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True) time.sleep(1) - self.page.ele('x://*[@id="size_0"]').input(1) + self.set_order_size(size) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') def ding(self, text, error=False): @@ -716,14 +1063,38 @@ class BitmartFuturesTransactionConservative: if skip_long_by_lower_third: logger.info("上一根阳线+当前阴线(做空形态),不按下四分之一做多") + trend_bias, trend_strength_pct, trend_slope_pct = self.infer_trend_state() + self.last_trend_bias = trend_bias + self.last_trend_strength_pct = trend_strength_pct + self.last_trend_slope_pct = trend_slope_pct + logger.info( + f"趋势状态: bias={trend_bias}, strength={trend_strength_pct:.4f}%, slope={trend_slope_pct:.4f}%" + ) + # 无持仓时检查开仓信号 if self.start == 0: if current_price >= long_entry_price and not skip_long_by_lower_third: - logger.info(f"触发做多信号!价格 {current_price:.2f} >= 保守开仓价 {long_entry_price:.2f}") - return ('long', long_entry_price) + if self.should_take_open_signal( + side="long", + current_price=current_price, + entry_price=long_entry_price, + prev_kline=prev_kline, + trend_bias=trend_bias, + trend_strength_pct=trend_strength_pct, + ): + logger.info(f"触发做多信号!价格 {current_price:.2f} >= 保守开仓价 {long_entry_price:.2f}") + return ('long', long_entry_price) elif current_price <= short_entry_price and not skip_short_by_upper_third: - logger.info(f"触发做空信号!价格 {current_price:.2f} <= 保守开仓价 {short_entry_price:.2f}") - return ('short', short_entry_price) + if self.should_take_open_signal( + side="short", + current_price=current_price, + entry_price=short_entry_price, + prev_kline=prev_kline, + trend_bias=trend_bias, + trend_strength_pct=trend_strength_pct, + ): + logger.info(f"触发做空信号!价格 {current_price:.2f} <= 保守开仓价 {short_entry_price:.2f}") + return ('short', short_entry_price) # 持仓时检查反手信号 elif self.start == 1: # 持多仓 @@ -941,6 +1312,11 @@ class BitmartFuturesTransactionConservative: logger.info("开始运行四分之一策略交易(保守模式)...") + # 启动前先自动优化参数,再进入交易 + if not self.run_auto_optimization_before_trade(): + logger.error("自动参数优化失败,策略停止启动") + return + # 启动时设置全仓高杠杆 if not self.set_leverage(): logger.error("杠杆设置失败,程序继续运行但可能下单失败") @@ -969,8 +1345,7 @@ class BitmartFuturesTransactionConservative: # 进入交易页面 self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") self.click_safe('x://button[normalize-space(text()) ="市价"]') - - self.page.ele('x://*[@id="size_0"]').input(vals=25, clear=True) + self.set_order_size(self.default_order_size) page_start = False