From 2f9d589b33b16b76c1b0c9794888a2d2d429aaf6 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Fri, 6 Feb 2026 11:12:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=89=8D=E6=94=B9=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bitmart/保守模式参数优化/README.md | 94 +++ bitmart/保守模式参数优化/backtest_engine.py | 653 ++++++++++++++++++ bitmart/保守模式参数优化/current_params.json | 36 + bitmart/保守模式参数优化/optimize_params.py | 492 +++++++++++++ .../四分之一,五分钟,反手条件充足_保守模式.py | 70 ++ 5 files changed, 1345 insertions(+) create mode 100644 bitmart/保守模式参数优化/README.md create mode 100644 bitmart/保守模式参数优化/backtest_engine.py create mode 100644 bitmart/保守模式参数优化/current_params.json create mode 100644 bitmart/保守模式参数优化/optimize_params.py rename bitmart/{ => 保守模式参数优化}/四分之一,五分钟,反手条件充足_保守模式.py (92%) diff --git a/bitmart/保守模式参数优化/README.md b/bitmart/保守模式参数优化/README.md new file mode 100644 index 0000000..b8828f0 --- /dev/null +++ b/bitmart/保守模式参数优化/README.md @@ -0,0 +1,94 @@ +# 保守模式参数优化 + +这个目录包含两部分: +- 实盘脚本:`四分之一,五分钟,反手条件充足_保守模式.py` +- 参数优化:`optimize_params.py` + `backtest_engine.py` + +`current_params.json` 是优化结果文件,实盘脚本启动时会自动读取并覆盖默认参数。 + +## 1. 使用 CSV 做 30 天参数优化(推荐) + +CSV 至少包含: +- `id`(秒级时间戳) +- `open` +- `high` +- `low` +- `close` + +运行示例: + +```bash +python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \ + --data-file "/Users/ddrwode/code/lm_code/bitmart/数据/your_5m_30days.csv" \ + --days 30 \ + --train-days 20 \ + --valid-days 10 \ + --n-trials 300 +``` + +优化完成后会写入: +- `/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/current_params.json` + +## 2. 不提供 CSV,直接从 API 自动抓取 30 天数据并优化 + +最简单一条命令(会自动抓取并计算): + +```bash +python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \ + --days 30 \ + --step 5 \ + --n-trials 300 +``` + +脚本会按以下顺序找凭证: +- 命令行 `--api-key/--secret-key` +- 环境变量 `BITMART_API_KEY/BITMART_SECRET_KEY` +- 保守模式脚本里的 `self.api_key/self.secret_key` + +抓取到的K线会自动保存为: +- `/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/auto_ethusdt_5m_30d.csv` + +你也可以显式指定保存位置: + +```bash +BITMART_API_KEY="xxx" BITMART_SECRET_KEY="xxx" \ +python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \ + --days 30 \ + --step 5 \ + --n-trials 300 \ + --save-data-file "/Users/ddrwode/code/lm_code/bitmart/数据/eth_5m_30days.csv" +``` + +## 3. 运行保守模式实盘脚本 + +```bash +python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py" +``` + +如果你想加载其他参数文件: + +```bash +BITMART_PARAMS_PATH="/absolute/path/to/current_params.json" \ +python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py" +``` + +## 4. 费用模型说明 + +优化器会按下面公式计入手续费返佣: + +- `effective_fee_rate = raw_fee_rate * (1 - rebate_ratio)` + +默认: +- `raw_fee_rate = 0.0006` +- `rebate_ratio = 0.90` +- `effective_fee_rate = 0.00006` + +可通过命令行改: +- `--raw-fee-rate` +- `--rebate-ratio` + +## 5. 重要提示 + +- 回测撮合属于简化模型,不等于实盘撮合。 +- 参数应周期性重训(例如每天或每周)。 +- 若出现交易次数过低,适当降低 `open_breakout_buffer_pct` 或冷却时间。 diff --git a/bitmart/保守模式参数优化/backtest_engine.py b/bitmart/保守模式参数优化/backtest_engine.py new file mode 100644 index 0000000..a84fc80 --- /dev/null +++ b/bitmart/保守模式参数优化/backtest_engine.py @@ -0,0 +1,653 @@ +from __future__ import annotations + +import csv +import time +from dataclasses import asdict, dataclass +from datetime import datetime, timezone +from pathlib import Path +from statistics import mean +from typing import Any, Dict, List, Optional, Sequence, Tuple + + +DEFAULT_PARAMS: Dict[str, Any] = { + "leverage": "20", + "open_type": "cross", + "default_order_size": 10, + "take_profit_usd": 3.0, + "stop_loss_usd": -2.0, + "trailing_activation_usd": 1.2, + "trailing_distance_usd": 0.6, + "break_even_activation_usd": 1.0, + "break_even_floor_usd": 0.2, + "open_breakout_buffer_pct": 0.03, + "enable_shadow_reverse": False, + "shadow_reverse_threshold_pct": 0.15, + "open_cooldown_seconds": 300, + "reverse_cooldown_seconds": 300, + "reverse_min_move_pct": 0.15, + # Backtest-only calibration parameters (not used by live runtime script) + "order_notional_usd": 100.0, + "pnl_per_usd_move": 1.0, + "slippage_pct": 0.01, +} + + +def _as_float(value: Any, default: float = 0.0) -> float: + try: + return float(value) + except Exception: + return default + + +def _as_bool(value: Any) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + return value.strip().lower() in {"1", "true", "yes", "y", "on"} + return False + + +def _parse_timestamp(value: Any) -> int: + text = str(value).strip() + if not text: + raise ValueError("empty timestamp") + + if text.isdigit() or (text.startswith("-") and text[1:].isdigit()): + ts = int(text) + if ts > 10**12: + ts //= 1000 + return ts + + try: + ts = int(float(text)) + if ts > 10**12: + ts //= 1000 + return ts + except ValueError: + pass + + dt_text = text.replace("Z", "+00:00") + dt = datetime.fromisoformat(dt_text) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return int(dt.timestamp()) + + +def load_klines_from_csv(csv_path: str | Path, last_n_days: Optional[int] = 30) -> List[Dict[str, float]]: + """ + Load kline data from CSV. Supports these field names (case-insensitive): + - timestamp: id / timestamp / time / ts / datetime / date + - open: open / open_price + - high: high / high_price + - low: low / low_price + - close: close / close_price + - volume (optional): volume / vol + """ + path = Path(csv_path) + if not path.exists(): + raise FileNotFoundError(f"CSV not found: {path}") + + records: Dict[int, Dict[str, float]] = {} + with path.open("r", encoding="utf-8") as f: + reader = csv.DictReader(f) + if not reader.fieldnames: + raise ValueError(f"CSV has no header: {path}") + + field_map = {name.strip().lower(): name for name in reader.fieldnames if name} + + def find_key(candidates: Sequence[str]) -> Optional[str]: + for candidate in candidates: + if candidate in field_map: + return field_map[candidate] + return None + + ts_key = find_key(("id", "timestamp", "time", "ts", "datetime", "date")) + open_key = find_key(("open", "open_price")) + high_key = find_key(("high", "high_price")) + low_key = find_key(("low", "low_price")) + close_key = find_key(("close", "close_price")) + volume_key = find_key(("volume", "vol")) + + if not ts_key or not open_key or not high_key or not low_key or not close_key: + raise ValueError( + "CSV is missing required columns. Need timestamp/open/high/low/close (or aliases)." + ) + + for row in reader: + try: + ts = _parse_timestamp(row.get(ts_key, "")) + kline = { + "id": ts, + "open": _as_float(row.get(open_key), 0.0), + "high": _as_float(row.get(high_key), 0.0), + "low": _as_float(row.get(low_key), 0.0), + "close": _as_float(row.get(close_key), 0.0), + } + if volume_key and row.get(volume_key) not in (None, ""): + kline["volume"] = _as_float(row.get(volume_key), 0.0) + records[ts] = kline + except Exception: + continue + + klines = sorted(records.values(), key=lambda x: x["id"]) + if last_n_days is not None and klines: + cutoff = klines[-1]["id"] - int(last_n_days * 24 * 3600) + klines = [k for k in klines if k["id"] >= cutoff] + return klines + + +def save_klines_to_csv(klines: Sequence[Dict[str, float]], output_path: str | Path) -> Path: + path = Path(output_path) + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8", newline="") as f: + writer = csv.writer(f) + writer.writerow(["id", "open", "high", "low", "close", "volume"]) + for k in klines: + writer.writerow([ + int(k["id"]), + k["open"], + k["high"], + k["low"], + k["close"], + k.get("volume", ""), + ]) + return path + + +def fetch_klines_from_api( + contract_api: Any, + contract_symbol: str = "ETHUSDT", + step: int = 5, + days: int = 30, + batch_hours: int = 12, + sleep_seconds: float = 0.05, +) -> List[Dict[str, float]]: + """Fetch recent kline data from Bitmart APIContract-compatible client.""" + end_ts = int(time.time()) + start_ts = end_ts - days * 24 * 3600 + cursor = start_ts + all_rows: Dict[int, Dict[str, float]] = {} + + while cursor < end_ts: + batch_end = min(cursor + batch_hours * 3600, end_ts) + response = contract_api.get_kline( + contract_symbol=contract_symbol, + step=step, + start_time=cursor, + end_time=batch_end, + )[0] + if response.get("code") != 1000: + cursor = batch_end + 1 + continue + + for item in response.get("data", []): + try: + ts = int(item["timestamp"]) + all_rows[ts] = { + "id": ts, + "open": float(item["open_price"]), + "high": float(item["high_price"]), + "low": float(item["low_price"]), + "close": float(item["close_price"]), + "volume": float(item.get("volume", 0) or 0), + } + except Exception: + continue + + cursor = batch_end + 1 + if sleep_seconds > 0: + time.sleep(sleep_seconds) + + return sorted(all_rows.values(), key=lambda x: x["id"]) + + +def split_rolling_windows( + klines: Sequence[Dict[str, float]], + train_days: int = 20, + valid_days: int = 10, + step_days: int = 5, +) -> List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]]: + """Build rolling (train, valid) windows by timestamp ranges.""" + if not klines: + return [] + + train_secs = train_days * 24 * 3600 + valid_secs = valid_days * 24 * 3600 + step_secs = max(1, step_days) * 24 * 3600 + + first_ts = int(klines[0]["id"]) + last_ts = int(klines[-1]["id"]) + required_span = train_secs + valid_secs + + windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]] = [] + + def slice_range(start_ts: int, end_ts: int) -> List[Dict[str, float]]: + return [k for k in klines if start_ts <= int(k["id"]) < end_ts] + + cursor = first_ts + while cursor + required_span <= last_ts: + train_start = cursor + train_end = train_start + train_secs + valid_end = train_end + valid_secs + + train_part = slice_range(train_start, train_end) + valid_part = slice_range(train_end, valid_end) + if len(train_part) >= 50 and len(valid_part) >= 30: + windows.append((train_part, valid_part)) + + cursor += step_secs + + if not windows and (last_ts - first_ts) >= required_span: + valid_end = last_ts + valid_start = valid_end - valid_secs + train_start = valid_start - train_secs + train_part = slice_range(train_start, valid_start) + valid_part = slice_range(valid_start, valid_end) + if len(train_part) >= 50 and len(valid_part) >= 30: + windows.append((train_part, valid_part)) + + return windows + + +@dataclass +class BacktestResult: + net_pnl: float + gross_pnl: float + total_fees: float + trades: int + win_rate: float + max_drawdown: float + loss_days: int + start_ts: int + end_ts: int + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +class ConservativeBacktester: + """Backtester for the conservative quarter-breakout + reverse strategy.""" + + def __init__( + self, + params: Optional[Dict[str, Any]] = None, + raw_fee_rate: float = 0.0006, + rebate_ratio: float = 0.90, + ) -> None: + self.params: Dict[str, Any] = dict(DEFAULT_PARAMS) + if params: + self.params.update(params) + + self.raw_fee_rate = float(raw_fee_rate) + self.rebate_ratio = float(rebate_ratio) + self.effective_fee_rate = self.raw_fee_rate * (1.0 - self.rebate_ratio) + + size_scale = max(_as_float(self.params.get("default_order_size", 10), 10.0) / 10.0, 0.05) + self.order_notional_usd = _as_float(self.params.get("order_notional_usd", 100.0), 100.0) * size_scale + self.pnl_per_usd_move = _as_float(self.params.get("pnl_per_usd_move", 1.0), 1.0) * size_scale + self.slippage_pct = _as_float(self.params.get("slippage_pct", 0.01), 0.01) + + @staticmethod + def _entity_size(kline: Dict[str, float]) -> float: + return abs(kline["close"] - kline["open"]) + + @staticmethod + def _entity_edges(kline: Dict[str, float]) -> Tuple[float, float]: + upper = max(kline["open"], kline["close"]) + lower = min(kline["open"], kline["close"]) + return upper, lower + + @staticmethod + def _upper_shadow_pct(kline: Dict[str, float]) -> float: + body_top = max(kline["open"], kline["close"]) + if body_top <= 0: + return 0.0 + return (kline["high"] - body_top) / body_top * 100.0 + + @staticmethod + def _lower_shadow_pct(kline: Dict[str, float]) -> float: + body_bottom = min(kline["open"], kline["close"]) + if body_bottom <= 0: + return 0.0 + return (body_bottom - kline["low"]) / body_bottom * 100.0 + + def _calc_fee(self) -> float: + return self.order_notional_usd * self.effective_fee_rate + + def _apply_slippage(self, price: float, is_buy: bool) -> float: + if self.slippage_pct <= 0: + return price + shift = self.slippage_pct / 100.0 + return price * (1.0 + shift) if is_buy else price * (1.0 - shift) + + def _calc_levels(self, prev_kline: Dict[str, float], current_kline: Dict[str, float]) -> Optional[Dict[str, float]]: + prev_entity = self._entity_size(prev_kline) + if prev_entity < 0.1: + return None + + prev_upper, prev_lower = self._entity_edges(prev_kline) + + prev_bull_for_calc = prev_kline["close"] > prev_kline["open"] + prev_bear_for_calc = prev_kline["close"] < prev_kline["open"] + current_open_above_prev_close = current_kline["open"] > prev_kline["close"] + current_open_below_prev_close = current_kline["open"] < prev_kline["close"] + use_current_open_as_base = ( + (prev_bull_for_calc and current_open_above_prev_close) + or (prev_bear_for_calc and current_open_below_prev_close) + ) + + if use_current_open_as_base: + base = current_kline["open"] + long_trigger = base + prev_entity / 4.0 + short_trigger = base - prev_entity / 4.0 + long_breakout = base + prev_entity / 4.0 + short_breakout = base - prev_entity / 4.0 + else: + long_trigger = prev_lower + prev_entity / 4.0 + short_trigger = prev_upper - prev_entity / 4.0 + long_breakout = prev_upper + prev_entity / 4.0 + short_breakout = prev_lower - prev_entity / 4.0 + + open_buffer_pct = _as_float(self.params.get("open_breakout_buffer_pct", 0.03), 0.03) + long_entry_price = long_breakout * (1.0 + open_buffer_pct / 100.0) + short_entry_price = short_breakout * (1.0 - open_buffer_pct / 100.0) + + prev_bearish = prev_kline["close"] < prev_kline["open"] + current_bullish = current_kline["close"] > current_kline["open"] + skip_short_by_upper_third = prev_bearish and current_bullish + + prev_bullish = prev_kline["close"] > prev_kline["open"] + current_bearish = current_kline["close"] < current_kline["open"] + skip_long_by_lower_third = prev_bullish and current_bearish + + return { + "prev_entity_upper": prev_upper, + "prev_entity_lower": prev_lower, + "long_trigger": long_trigger, + "short_trigger": short_trigger, + "long_entry_price": long_entry_price, + "short_entry_price": short_entry_price, + "skip_short_by_upper_third": float(skip_short_by_upper_third), + "skip_long_by_lower_third": float(skip_long_by_lower_third), + "upper_shadow_pct": self._upper_shadow_pct(prev_kline), + "lower_shadow_pct": self._lower_shadow_pct(prev_kline), + } + + def run(self, klines: Sequence[Dict[str, float]]) -> BacktestResult: + if len(klines) < 3: + start_ts = int(klines[0]["id"]) if klines else 0 + end_ts = int(klines[-1]["id"]) if klines else 0 + return BacktestResult( + net_pnl=0.0, + gross_pnl=0.0, + total_fees=0.0, + trades=0, + win_rate=0.0, + max_drawdown=0.0, + loss_days=0, + start_ts=start_ts, + end_ts=end_ts, + ) + + open_cooldown = int(_as_float(self.params.get("open_cooldown_seconds", 300), 300)) + reverse_cooldown = int(_as_float(self.params.get("reverse_cooldown_seconds", 300), 300)) + reverse_min_move_pct = _as_float(self.params.get("reverse_min_move_pct", 0.15), 0.15) + + take_profit = _as_float(self.params.get("take_profit_usd", 3.0), 3.0) + stop_loss = _as_float(self.params.get("stop_loss_usd", -2.0), -2.0) + trailing_activation = _as_float(self.params.get("trailing_activation_usd", 1.2), 1.2) + trailing_distance = _as_float(self.params.get("trailing_distance_usd", 0.6), 0.6) + be_activation = _as_float(self.params.get("break_even_activation_usd", 1.0), 1.0) + be_floor = _as_float(self.params.get("break_even_floor_usd", 0.2), 0.2) + + enable_shadow_reverse = _as_bool(self.params.get("enable_shadow_reverse", False)) + shadow_reverse_threshold = _as_float(self.params.get("shadow_reverse_threshold_pct", 0.15), 0.15) + + position = 0 # 1 long, -1 short, 0 flat + entry_price = 0.0 + entry_ts = 0 + current_trade_open_fee = 0.0 + + last_open_ts: Optional[int] = None + last_open_kline_id: Optional[int] = None + last_reverse_ts: Optional[int] = None + + max_unrealized_pnl_seen: Optional[float] = None + + gross_pnl = 0.0 + total_fees = 0.0 + equity = 0.0 + + peak_equity = 0.0 + max_drawdown = 0.0 + + trades = 0 + wins = 0 + daily_pnl: Dict[str, float] = {} + + def mark_to_market(mark_price: float) -> float: + if position == 0: + return 0.0 + return (mark_price - entry_price) * position * self.pnl_per_usd_move + + def update_drawdown(mark_price: float) -> None: + nonlocal peak_equity, max_drawdown + mtm = mark_to_market(mark_price) + equity_mtm = equity + mtm + if equity_mtm > peak_equity: + peak_equity = equity_mtm + drawdown = peak_equity - equity_mtm + if drawdown > max_drawdown: + max_drawdown = drawdown + + def can_open(current_kline_id: int) -> bool: + if last_open_kline_id is not None and current_kline_id == last_open_kline_id: + return False + if last_open_ts is not None and current_kline_id - last_open_ts < open_cooldown: + return False + return True + + def can_reverse(current_price: float, trigger_price: float, now_ts: int) -> bool: + if last_reverse_ts is not None and now_ts - last_reverse_ts < reverse_cooldown: + return False + if trigger_price > 0: + move_pct = abs(current_price - trigger_price) / trigger_price * 100.0 + if move_pct < reverse_min_move_pct: + return False + return True + + def open_position(direction: int, raw_price: float, now_ts: int) -> None: + nonlocal position, entry_price, entry_ts + nonlocal last_open_ts, last_open_kline_id + nonlocal total_fees, equity, current_trade_open_fee + nonlocal max_unrealized_pnl_seen + + is_buy = direction == 1 + fill_price = self._apply_slippage(raw_price, is_buy=is_buy) + fee = self._calc_fee() + + position = direction + entry_price = fill_price + entry_ts = now_ts + last_open_ts = now_ts + last_open_kline_id = now_ts + max_unrealized_pnl_seen = 0.0 + + total_fees += fee + equity -= fee + current_trade_open_fee = fee + + def close_position(raw_price: float, now_ts: int) -> float: + nonlocal position, entry_price, entry_ts + nonlocal total_fees, equity, gross_pnl, trades, wins + nonlocal current_trade_open_fee, max_unrealized_pnl_seen + + if position == 0: + return 0.0 + + is_buy = position == -1 + fill_price = self._apply_slippage(raw_price, is_buy=is_buy) + + realized = (fill_price - entry_price) * position * self.pnl_per_usd_move + close_fee = self._calc_fee() + trade_net = realized - close_fee - current_trade_open_fee + + gross_pnl += realized + total_fees += close_fee + equity += realized - close_fee + + day_key = datetime.utcfromtimestamp(now_ts).date().isoformat() + daily_pnl[day_key] = daily_pnl.get(day_key, 0.0) + trade_net + + trades += 1 + if trade_net > 0: + wins += 1 + + position = 0 + entry_price = 0.0 + entry_ts = 0 + current_trade_open_fee = 0.0 + max_unrealized_pnl_seen = None + return trade_net + + for idx in range(1, len(klines)): + prev_kline = klines[idx - 1] + current_kline = klines[idx] + current_ts = int(current_kline["id"]) + + levels = self._calc_levels(prev_kline, current_kline) + if not levels: + update_drawdown(current_kline["close"]) + continue + + skip_short = bool(levels["skip_short_by_upper_third"]) + skip_long = bool(levels["skip_long_by_lower_third"]) + + # Risk controls come first, matching live behavior. + if position != 0: + pnl_close = mark_to_market(current_kline["close"]) + if max_unrealized_pnl_seen is None: + max_unrealized_pnl_seen = pnl_close + else: + max_unrealized_pnl_seen = max(max_unrealized_pnl_seen, pnl_close) + + should_close = False + if pnl_close <= stop_loss: + should_close = True + elif max_unrealized_pnl_seen >= be_activation and pnl_close <= be_floor: + should_close = True + elif max_unrealized_pnl_seen >= trailing_activation and pnl_close < max_unrealized_pnl_seen - trailing_distance: + should_close = True + elif pnl_close >= take_profit: + should_close = True + + if should_close: + close_position(current_kline["close"], current_ts) + update_drawdown(current_kline["close"]) + continue + + signal_type: Optional[str] = None + signal_price = 0.0 + + if position == 0: + long_hit = current_kline["high"] >= levels["long_entry_price"] and not skip_long + short_hit = current_kline["low"] <= levels["short_entry_price"] and not skip_short + + # Conservative tie-break: if both sides touched in one bar, skip. + if long_hit and not short_hit: + signal_type = "long" + signal_price = levels["long_entry_price"] + elif short_hit and not long_hit: + signal_type = "short" + signal_price = levels["short_entry_price"] + + if signal_type and can_open(current_ts): + open_position(1 if signal_type == "long" else -1, signal_price, current_ts) + + elif position == 1: + reverse_hit = current_kline["low"] <= levels["short_trigger"] and not skip_short + shadow_hit = ( + enable_shadow_reverse + and levels["upper_shadow_pct"] > shadow_reverse_threshold + and current_kline["low"] <= levels["prev_entity_lower"] + ) + if reverse_hit or shadow_hit: + trigger = levels["short_trigger"] if reverse_hit else levels["prev_entity_lower"] + if can_reverse(current_kline["close"], trigger, current_ts): + close_position(trigger, current_ts) + open_position(-1, trigger, current_ts) + last_reverse_ts = current_ts + + elif position == -1: + reverse_hit = current_kline["high"] >= levels["long_trigger"] and not skip_long + shadow_hit = ( + enable_shadow_reverse + and levels["lower_shadow_pct"] > shadow_reverse_threshold + and current_kline["high"] >= levels["prev_entity_upper"] + ) + if reverse_hit or shadow_hit: + trigger = levels["long_trigger"] if reverse_hit else levels["prev_entity_upper"] + if can_reverse(current_kline["close"], trigger, current_ts): + close_position(trigger, current_ts) + open_position(1, trigger, current_ts) + last_reverse_ts = current_ts + + update_drawdown(current_kline["close"]) + + if position != 0: + last_bar = klines[-1] + close_position(last_bar["close"], int(last_bar["id"])) + update_drawdown(last_bar["close"]) + + win_rate = (wins / trades * 100.0) if trades > 0 else 0.0 + loss_days = sum(1 for pnl in daily_pnl.values() if pnl < 0) + + return BacktestResult( + net_pnl=equity, + gross_pnl=gross_pnl, + total_fees=total_fees, + trades=trades, + win_rate=win_rate, + max_drawdown=max_drawdown, + loss_days=loss_days, + start_ts=int(klines[0]["id"]), + end_ts=int(klines[-1]["id"]), + ) + + +def score_result( + result: BacktestResult, + drawdown_weight: float = 1.4, + loss_day_weight: float = 0.8, + min_trades: int = 6, + undertrade_penalty: float = 0.5, +) -> float: + score = result.net_pnl - drawdown_weight * result.max_drawdown - loss_day_weight * result.loss_days + if result.trades < min_trades: + score -= (min_trades - result.trades) * undertrade_penalty + return score + + +def aggregate_results(results: Sequence[BacktestResult]) -> Dict[str, float]: + if not results: + return { + "net_pnl": 0.0, + "gross_pnl": 0.0, + "total_fees": 0.0, + "trades": 0.0, + "win_rate": 0.0, + "max_drawdown": 0.0, + "loss_days": 0.0, + } + + return { + "net_pnl": mean([r.net_pnl for r in results]), + "gross_pnl": mean([r.gross_pnl for r in results]), + "total_fees": mean([r.total_fees for r in results]), + "trades": mean([r.trades for r in results]), + "win_rate": mean([r.win_rate for r in results]), + "max_drawdown": mean([r.max_drawdown for r in results]), + "loss_days": mean([r.loss_days for r in results]), + } diff --git a/bitmart/保守模式参数优化/current_params.json b/bitmart/保守模式参数优化/current_params.json new file mode 100644 index 0000000..30ae203 --- /dev/null +++ b/bitmart/保守模式参数优化/current_params.json @@ -0,0 +1,36 @@ +{ + "updated_at": "2026-02-06T00:00:00+00:00", + "source": { + "type": "manual_default" + }, + "optimization": { + "method": "manual_default", + "n_trials": 0, + "windows": 0 + }, + "fee_model": { + "raw_fee_rate": 0.0006, + "rebate_ratio": 0.9, + "effective_fee_rate": 0.00006 + }, + "score": 0, + "params": { + "leverage": "20", + "take_profit_usd": 3.0, + "stop_loss_usd": -2.0, + "trailing_activation_usd": 1.2, + "trailing_distance_usd": 0.6, + "break_even_activation_usd": 1.0, + "break_even_floor_usd": 0.2, + "default_order_size": 10, + "open_breakout_buffer_pct": 0.03, + "enable_shadow_reverse": false, + "shadow_reverse_threshold_pct": 0.15, + "open_cooldown_seconds": 300, + "reverse_cooldown_seconds": 300, + "reverse_min_move_pct": 0.15, + "order_notional_usd": 100.0, + "pnl_per_usd_move": 1.0, + "slippage_pct": 0.01 + } +} diff --git a/bitmart/保守模式参数优化/optimize_params.py b/bitmart/保守模式参数优化/optimize_params.py new file mode 100644 index 0000000..c6ee50c --- /dev/null +++ b/bitmart/保守模式参数优化/optimize_params.py @@ -0,0 +1,492 @@ +from __future__ import annotations + +import argparse +import json +import os +import random +import re +import sys +from datetime import datetime, timezone +from pathlib import Path +from statistics import mean, pstdev +from typing import Any, Dict, List, Optional, Tuple + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from backtest_engine import ( + DEFAULT_PARAMS, + ConservativeBacktester, + aggregate_results, + fetch_klines_from_api, + load_klines_from_csv, + save_klines_to_csv, + score_result, + split_rolling_windows, +) + + +STRATEGY_FILENAME = "四分之一,五分钟,反手条件充足_保守模式.py" + + +def _as_float(value: Any, default: float) -> float: + try: + return float(value) + except Exception: + return default + + +def normalize_params(params: Dict[str, Any]) -> Dict[str, Any]: + p = dict(params) + + p["take_profit_usd"] = round(max(_as_float(p.get("take_profit_usd", 3.0), 3.0), 0.5), 4) + p["stop_loss_usd"] = round(min(_as_float(p.get("stop_loss_usd", -2.0), -2.0), -0.1), 4) + + p["trailing_activation_usd"] = round(max(_as_float(p.get("trailing_activation_usd", 1.2), 1.2), 0.2), 4) + p["trailing_distance_usd"] = round(max(_as_float(p.get("trailing_distance_usd", 0.6), 0.6), 0.1), 4) + + p["break_even_activation_usd"] = round(max(_as_float(p.get("break_even_activation_usd", 1.0), 1.0), 0.2), 4) + p["break_even_floor_usd"] = round(max(_as_float(p.get("break_even_floor_usd", 0.2), 0.2), 0.0), 4) + + p["open_breakout_buffer_pct"] = round(max(_as_float(p.get("open_breakout_buffer_pct", 0.03), 0.03), 0.0), 4) + p["shadow_reverse_threshold_pct"] = round(max(_as_float(p.get("shadow_reverse_threshold_pct", 0.15), 0.15), 0.01), 4) + p["reverse_min_move_pct"] = round(max(_as_float(p.get("reverse_min_move_pct", 0.15), 0.15), 0.01), 4) + + p["open_cooldown_seconds"] = int(max(_as_float(p.get("open_cooldown_seconds", 300), 300), 1)) + p["reverse_cooldown_seconds"] = int(max(_as_float(p.get("reverse_cooldown_seconds", 300), 300), 1)) + + p["default_order_size"] = int(max(_as_float(p.get("default_order_size", 10), 10), 1)) + p["order_notional_usd"] = round(max(_as_float(p.get("order_notional_usd", 100.0), 100.0), 5.0), 4) + p["pnl_per_usd_move"] = round(max(_as_float(p.get("pnl_per_usd_move", 1.0), 1.0), 0.01), 4) + p["slippage_pct"] = round(max(_as_float(p.get("slippage_pct", 0.01), 0.01), 0.0), 5) + + p["enable_shadow_reverse"] = bool(p.get("enable_shadow_reverse", False)) + + # Keep relationships sane. + if p["trailing_activation_usd"] <= p["break_even_activation_usd"]: + p["trailing_activation_usd"] = round(p["break_even_activation_usd"] + 0.2, 4) + + if p["take_profit_usd"] <= p["trailing_activation_usd"]: + p["take_profit_usd"] = round(p["trailing_activation_usd"] + 0.4, 4) + + max_floor = p["break_even_activation_usd"] * 0.9 + if p["break_even_floor_usd"] > max_floor: + p["break_even_floor_usd"] = round(max_floor, 4) + + if p["trailing_distance_usd"] >= p["trailing_activation_usd"]: + p["trailing_distance_usd"] = round(max(0.1, p["trailing_activation_usd"] * 0.6), 4) + + return p + + +def load_credentials_from_strategy_file(strategy_path: Path) -> Dict[str, str]: + """ + 从保守模式策略文件里提取 self.api_key/self.secret_key/self.memo,作为自动抓取兜底。 + """ + if not strategy_path.exists(): + return {} + + try: + content = strategy_path.read_text(encoding="utf-8") + except Exception: + return {} + + def extract(name: str) -> str: + pattern = rf'self\.{name}\s*=\s*["\']([^"\']+)["\']' + m = re.search(pattern, content) + return m.group(1).strip() if m else "" + + api_key = extract("api_key") + secret_key = extract("secret_key") + memo = extract("memo") + + result: Dict[str, str] = {} + if api_key: + result["api_key"] = api_key + if secret_key: + result["secret_key"] = secret_key + if memo: + result["memo"] = memo + return result + + +def sample_random_params(rng: random.Random) -> Dict[str, Any]: + params = dict(DEFAULT_PARAMS) + params.update( + { + "take_profit_usd": rng.uniform(1.2, 6.0), + "stop_loss_usd": -rng.uniform(0.8, 4.0), + "trailing_activation_usd": rng.uniform(0.5, 3.0), + "trailing_distance_usd": rng.uniform(0.2, 1.8), + "break_even_activation_usd": rng.uniform(0.3, 2.2), + "break_even_floor_usd": rng.uniform(0.0, 1.0), + "open_breakout_buffer_pct": rng.uniform(0.0, 0.25), + "reverse_min_move_pct": rng.uniform(0.05, 0.5), + "open_cooldown_seconds": rng.choice([60, 120, 180, 240, 300, 360, 420, 480, 600, 900]), + "reverse_cooldown_seconds": rng.choice([60, 120, 180, 240, 300, 360, 420, 480, 600, 900]), + "enable_shadow_reverse": rng.random() < 0.15, + "shadow_reverse_threshold_pct": rng.uniform(0.08, 0.5), + "slippage_pct": rng.uniform(0.005, 0.05), + } + ) + return normalize_params(params) + + +def evaluate_candidate( + params: Dict[str, Any], + windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]], + raw_fee_rate: float, + rebate_ratio: float, + drawdown_weight: float, + loss_day_weight: float, + min_trades: int, + undertrade_penalty: float, + valid_score_weight: float, + drawdown_guard: float, + stability_weight: float, +) -> Tuple[float, Dict[str, float], Dict[str, float]]: + train_results = [] + valid_results = [] + window_scores = [] + + for train_data, valid_data in windows: + backtester = ConservativeBacktester(params=params, raw_fee_rate=raw_fee_rate, rebate_ratio=rebate_ratio) + + train_res = backtester.run(train_data) + valid_res = backtester.run(valid_data) + + train_results.append(train_res) + valid_results.append(valid_res) + + train_score = score_result( + train_res, + drawdown_weight=drawdown_weight, + loss_day_weight=loss_day_weight, + min_trades=min_trades, + undertrade_penalty=undertrade_penalty, + ) + valid_score = score_result( + valid_res, + drawdown_weight=drawdown_weight, + loss_day_weight=loss_day_weight, + min_trades=min_trades, + undertrade_penalty=undertrade_penalty, + ) + + combined = valid_score_weight * valid_score + (1.0 - valid_score_weight) * train_score + + # Hard guard against large validation drawdown. + if valid_res.max_drawdown > drawdown_guard: + combined -= (valid_res.max_drawdown - drawdown_guard) * 2.0 + + window_scores.append(combined) + + mean_score = mean(window_scores) if window_scores else -1e9 + + if len(valid_results) > 1: + stability_penalty = pstdev([r.net_pnl for r in valid_results]) * stability_weight + mean_score -= stability_penalty + + train_agg = aggregate_results(train_results) + valid_agg = aggregate_results(valid_results) + return mean_score, train_agg, valid_agg + + +def optimize_with_optuna( + windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]], + args: argparse.Namespace, +) -> Dict[str, Any]: + import optuna # type: ignore + + sampler = optuna.samplers.TPESampler(seed=args.seed) + study = optuna.create_study(direction="maximize", sampler=sampler) + + best: Dict[str, Any] = {} + + def objective(trial: Any) -> float: + params = dict(DEFAULT_PARAMS) + params.update( + { + "take_profit_usd": trial.suggest_float("take_profit_usd", 1.2, 6.0), + "stop_loss_usd": -trial.suggest_float("stop_loss_abs", 0.8, 4.0), + "trailing_activation_usd": trial.suggest_float("trailing_activation_usd", 0.5, 3.0), + "trailing_distance_usd": trial.suggest_float("trailing_distance_usd", 0.2, 1.8), + "break_even_activation_usd": trial.suggest_float("break_even_activation_usd", 0.3, 2.2), + "break_even_floor_usd": trial.suggest_float("break_even_floor_usd", 0.0, 1.0), + "open_breakout_buffer_pct": trial.suggest_float("open_breakout_buffer_pct", 0.0, 0.25), + "reverse_min_move_pct": trial.suggest_float("reverse_min_move_pct", 0.05, 0.5), + "open_cooldown_seconds": trial.suggest_int("open_cooldown_seconds", 60, 900, step=60), + "reverse_cooldown_seconds": trial.suggest_int("reverse_cooldown_seconds", 60, 900, step=60), + "enable_shadow_reverse": trial.suggest_categorical("enable_shadow_reverse", [False, True]), + "shadow_reverse_threshold_pct": trial.suggest_float("shadow_reverse_threshold_pct", 0.08, 0.5), + "slippage_pct": trial.suggest_float("slippage_pct", 0.005, 0.05), + } + ) + params = normalize_params(params) + + score, train_agg, valid_agg = evaluate_candidate( + params=params, + windows=windows, + raw_fee_rate=args.raw_fee_rate, + rebate_ratio=args.rebate_ratio, + drawdown_weight=args.drawdown_weight, + loss_day_weight=args.loss_day_weight, + min_trades=args.min_trades, + undertrade_penalty=args.undertrade_penalty, + valid_score_weight=args.valid_score_weight, + drawdown_guard=args.drawdown_guard, + stability_weight=args.stability_weight, + ) + + trial.set_user_attr("train_agg", train_agg) + trial.set_user_attr("valid_agg", valid_agg) + + nonlocal best + if not best or score > best["score"]: + best = { + "score": score, + "params": params, + "train_agg": train_agg, + "valid_agg": valid_agg, + } + return score + + study.optimize(objective, n_trials=args.n_trials, show_progress_bar=False) + if best: + return best + + trial = study.best_trial + return { + "score": trial.value, + "params": normalize_params(dict(DEFAULT_PARAMS)), + "train_agg": trial.user_attrs.get("train_agg", {}), + "valid_agg": trial.user_attrs.get("valid_agg", {}), + } + + +def optimize_with_random( + windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]], + args: argparse.Namespace, +) -> Dict[str, Any]: + rng = random.Random(args.seed) + best: Optional[Dict[str, Any]] = None + + for i in range(args.n_trials): + params = sample_random_params(rng) + score, train_agg, valid_agg = evaluate_candidate( + params=params, + windows=windows, + raw_fee_rate=args.raw_fee_rate, + rebate_ratio=args.rebate_ratio, + drawdown_weight=args.drawdown_weight, + loss_day_weight=args.loss_day_weight, + min_trades=args.min_trades, + undertrade_penalty=args.undertrade_penalty, + valid_score_weight=args.valid_score_weight, + drawdown_guard=args.drawdown_guard, + stability_weight=args.stability_weight, + ) + + if best is None or score > best["score"]: + best = { + "score": score, + "params": params, + "train_agg": train_agg, + "valid_agg": valid_agg, + "trial": i + 1, + } + + if (i + 1) % 20 == 0: + print(f"[random] trial {i + 1}/{args.n_trials}, best_score={best['score']:.6f}") + + if best is None: + raise RuntimeError("random optimization failed: no trial executed") + return best + + +def load_klines(args: argparse.Namespace) -> Tuple[List[Dict[str, float]], Dict[str, Any]]: + if args.data_file: + csv_path = Path(args.data_file).expanduser() + klines = load_klines_from_csv(csv_path, last_n_days=args.days) + source_meta = {"type": "csv", "path": str(csv_path)} + return klines, source_meta + + api_key = args.api_key or os.getenv("BITMART_API_KEY") + secret_key = args.secret_key or os.getenv("BITMART_SECRET_KEY") + memo = args.memo or os.getenv("BITMART_MEMO", "参数优化") + credential_source = "args_or_env" + + if not api_key or not secret_key: + strategy_path = Path(__file__).with_name(STRATEGY_FILENAME) + extracted = load_credentials_from_strategy_file(strategy_path) + if extracted: + api_key = api_key or extracted.get("api_key", "") + secret_key = secret_key or extracted.get("secret_key", "") + memo = memo or extracted.get("memo", "参数优化") + credential_source = f"strategy_file:{strategy_path.name}" + + if not api_key or not secret_key: + raise ValueError( + "No --data-file provided and API credentials missing. " + "Provide --api-key/--secret-key or env BITMART_API_KEY/BITMART_SECRET_KEY. " + f"Fallback strategy file attempted: {STRATEGY_FILENAME}" + ) + + from bitmart.api_contract import APIContract + + print(f"Using API credentials source: {credential_source}") + + client = APIContract(api_key, secret_key, memo, timeout=(5, 15)) + klines = fetch_klines_from_api( + contract_api=client, + contract_symbol=args.symbol, + step=args.step, + days=args.days, + batch_hours=args.batch_hours, + sleep_seconds=args.api_sleep, + ) + + source_meta = { + "type": "api", + "symbol": args.symbol, + "step": args.step, + "days": args.days, + "credential_source": credential_source, + } + + save_path = args.save_data_file + if not save_path: + save_path = str(Path(__file__).with_name(f"auto_{args.symbol.lower()}_{args.step}m_{args.days}d.csv")) + + if save_path: + output_path = save_klines_to_csv(klines, save_path) + source_meta["saved_csv"] = str(output_path) + print(f"Fetched klines saved to: {output_path}") + + return klines, source_meta + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Optimize conservative strategy params using last N days of kline data.") + + parser.add_argument("--data-file", type=str, default="", help="CSV path for kline data (recommended).") + parser.add_argument("--save-data-file", type=str, default="", help="When fetching from API, also save CSV here.") + parser.add_argument("--days", type=int, default=30, help="Use recent N days of data.") + parser.add_argument("--symbol", type=str, default="ETHUSDT", help="Bitmart contract symbol for API mode.") + parser.add_argument("--step", type=int, default=5, help="Kline step in minutes for API mode.") + parser.add_argument("--batch-hours", type=int, default=12, help="API fetch window size in hours.") + parser.add_argument("--api-sleep", type=float, default=0.05, help="Sleep seconds between API windows.") + + parser.add_argument("--api-key", type=str, default="", help="Bitmart API key (optional if using env).") + parser.add_argument("--secret-key", type=str, default="", help="Bitmart secret key (optional if using env).") + parser.add_argument("--memo", type=str, default="", help="Bitmart memo (optional).") + + parser.add_argument("--train-days", type=int, default=20, help="Rolling train window in days.") + parser.add_argument("--valid-days", type=int, default=10, help="Rolling validation window in days.") + parser.add_argument("--window-step-days", type=int, default=5, help="Rolling window step in days.") + + parser.add_argument("--method", type=str, default="auto", choices=["auto", "optuna", "random"], help="Search method.") + parser.add_argument("--n-trials", type=int, default=300, help="Number of optimization trials.") + parser.add_argument("--seed", type=int, default=42, help="Random seed.") + + parser.add_argument("--raw-fee-rate", type=float, default=0.0006, help="Raw taker fee rate (e.g. 0.0006).") + parser.add_argument("--rebate-ratio", type=float, default=0.90, help="Fee rebate ratio (e.g. 0.90 means 90%% rebate).") + + parser.add_argument("--drawdown-weight", type=float, default=1.4, help="Penalty weight for max drawdown.") + parser.add_argument("--loss-day-weight", type=float, default=0.8, help="Penalty weight for losing days.") + parser.add_argument("--min-trades", type=int, default=6, help="Minimum trades per window before penalty.") + parser.add_argument("--undertrade-penalty", type=float, default=0.5, help="Penalty per missing trade.") + parser.add_argument("--valid-score-weight", type=float, default=0.8, help="Weight of validation score in combined score.") + parser.add_argument("--drawdown-guard", type=float, default=10.0, help="Extra hard penalty beyond this validation drawdown.") + parser.add_argument("--stability-weight", type=float, default=0.3, help="Penalty for net-pnl variance across windows.") + + parser.add_argument( + "--output", + type=str, + default=str(Path(__file__).with_name("current_params.json")), + help="Output JSON path used by live conservative script.", + ) + + return parser.parse_args() + + +def main() -> None: + args = parse_args() + + klines, source_meta = load_klines(args) + if len(klines) < 200: + raise RuntimeError(f"Not enough klines for optimization: {len(klines)}") + + windows = split_rolling_windows( + klines, + train_days=args.train_days, + valid_days=args.valid_days, + step_days=args.window_step_days, + ) + if not windows: + raise RuntimeError( + "Could not build rolling windows. Increase data days or reduce train/valid window sizes." + ) + + print(f"Loaded klines: {len(klines)}, rolling windows: {len(windows)}") + + use_method = args.method + if use_method == "auto": + try: + import optuna # noqa: F401 + + use_method = "optuna" + except Exception: + use_method = "random" + + if use_method == "optuna": + best = optimize_with_optuna(windows, args) + else: + best = optimize_with_random(windows, args) + + best_params = normalize_params(best["params"]) + + result_payload = { + "updated_at": datetime.now(timezone.utc).isoformat(), + "source": { + **source_meta, + "bars": len(klines), + "start_ts": int(klines[0]["id"]), + "end_ts": int(klines[-1]["id"]), + }, + "optimization": { + "method": use_method, + "n_trials": args.n_trials, + "train_days": args.train_days, + "valid_days": args.valid_days, + "window_step_days": args.window_step_days, + "windows": len(windows), + "seed": args.seed, + }, + "fee_model": { + "raw_fee_rate": args.raw_fee_rate, + "rebate_ratio": args.rebate_ratio, + "effective_fee_rate": args.raw_fee_rate * (1 - args.rebate_ratio), + }, + "score": best["score"], + "train_metrics_avg": best.get("train_agg", {}), + "valid_metrics_avg": best.get("valid_agg", {}), + "params": best_params, + } + + output_path = Path(args.output).expanduser() + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", encoding="utf-8") as f: + json.dump(result_payload, f, ensure_ascii=False, indent=2) + + print("Optimization done.") + print(f"Best score: {best['score']:.6f}") + print(f"Saved params to: {output_path}") + print("Best params:") + for k in sorted(best_params): + print(f" {k}: {best_params[k]}") + + +if __name__ == "__main__": + main() diff --git a/bitmart/四分之一,五分钟,反手条件充足_保守模式.py b/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py similarity index 92% rename from bitmart/四分之一,五分钟,反手条件充足_保守模式.py rename to bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py index f9250c9..f2760e8 100644 --- a/bitmart/四分之一,五分钟,反手条件充足_保守模式.py +++ b/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py @@ -1,4 +1,12 @@ +import json +import os +import sys import time +from pathlib import Path + +PROJECT_ROOT = Path(__file__).resolve().parents[2] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) from tqdm import tqdm from loguru import logger @@ -64,6 +72,68 @@ class BitmartFuturesTransactionConservative: self.prev_entity = None # 上一根K线实体大小 self.current_open = None # 当前K线开盘价 + # 启动时尝试读取动态参数(可由优化脚本自动生成) + self.load_runtime_params() + + def load_runtime_params(self): + """ + 从 current_params.json 或环境变量 BITMART_PARAMS_PATH 指向的文件加载参数。 + 文件格式支持两种: + 1) {"params": {...}} + 2) {...} 直接是参数字典 + """ + params_path_env = os.getenv("BITMART_PARAMS_PATH") + params_path = Path(params_path_env).expanduser() if params_path_env else Path(__file__).with_name("current_params.json") + if not params_path.exists(): + logger.info(f"未找到动态参数文件,使用内置保守参数: {params_path}") + return + + allowed_keys = { + "leverage", + "take_profit_usd", + "stop_loss_usd", + "trailing_activation_usd", + "trailing_distance_usd", + "break_even_activation_usd", + "break_even_floor_usd", + "default_order_size", + "open_breakout_buffer_pct", + "enable_shadow_reverse", + "shadow_reverse_threshold_pct", + "open_cooldown_seconds", + "reverse_cooldown_seconds", + "reverse_min_move_pct", + } + + try: + with params_path.open("r", encoding="utf-8") as f: + loaded = json.load(f) + params = loaded.get("params", loaded) + if not isinstance(params, dict): + logger.warning(f"参数文件格式不正确,忽略: {params_path}") + return + + for key, value in params.items(): + if key not in allowed_keys: + continue + if key == "leverage": + setattr(self, key, str(value)) + else: + setattr(self, key, value) + + logger.success(f"已加载动态参数文件: {params_path}") + logger.info( + "动态参数生效: " + f"TP={self.take_profit_usd}, SL={self.stop_loss_usd}, " + f"TrailAct={self.trailing_activation_usd}, TrailDist={self.trailing_distance_usd}, " + f"BEAct={self.break_even_activation_usd}, BEFloor={self.break_even_floor_usd}, " + f"BreakoutBuf={self.open_breakout_buffer_pct}%, " + f"OpenCD={self.open_cooldown_seconds}s, ReverseCD={self.reverse_cooldown_seconds}s, " + f"ReverseMove={self.reverse_min_move_pct}%" + ) + except Exception as e: + logger.error(f"加载动态参数文件失败: {e} | path={params_path}") + def get_klines(self): """获取最近2根K线(当前K线和上一根K线)""" try: