优化前改动

This commit is contained in:
ddrwode
2026-02-06 11:12:43 +08:00
parent 0a7f206ad2
commit 2f9d589b33
5 changed files with 1345 additions and 0 deletions

View File

@@ -0,0 +1,94 @@
# 保守模式参数优化
这个目录包含两部分:
- 实盘脚本:`四分之一五分钟反手条件充足_保守模式.py`
- 参数优化:`optimize_params.py` + `backtest_engine.py`
`current_params.json` 是优化结果文件,实盘脚本启动时会自动读取并覆盖默认参数。
## 1. 使用 CSV 做 30 天参数优化(推荐)
CSV 至少包含:
- `id`(秒级时间戳)
- `open`
- `high`
- `low`
- `close`
运行示例:
```bash
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \
--data-file "/Users/ddrwode/code/lm_code/bitmart/数据/your_5m_30days.csv" \
--days 30 \
--train-days 20 \
--valid-days 10 \
--n-trials 300
```
优化完成后会写入:
- `/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/current_params.json`
## 2. 不提供 CSV直接从 API 自动抓取 30 天数据并优化
最简单一条命令(会自动抓取并计算):
```bash
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \
--days 30 \
--step 5 \
--n-trials 300
```
脚本会按以下顺序找凭证:
- 命令行 `--api-key/--secret-key`
- 环境变量 `BITMART_API_KEY/BITMART_SECRET_KEY`
- 保守模式脚本里的 `self.api_key/self.secret_key`
抓取到的K线会自动保存为
- `/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/auto_ethusdt_5m_30d.csv`
你也可以显式指定保存位置:
```bash
BITMART_API_KEY="xxx" BITMART_SECRET_KEY="xxx" \
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \
--days 30 \
--step 5 \
--n-trials 300 \
--save-data-file "/Users/ddrwode/code/lm_code/bitmart/数据/eth_5m_30days.csv"
```
## 3. 运行保守模式实盘脚本
```bash
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一五分钟反手条件充足_保守模式.py"
```
如果你想加载其他参数文件:
```bash
BITMART_PARAMS_PATH="/absolute/path/to/current_params.json" \
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一五分钟反手条件充足_保守模式.py"
```
## 4. 费用模型说明
优化器会按下面公式计入手续费返佣:
- `effective_fee_rate = raw_fee_rate * (1 - rebate_ratio)`
默认:
- `raw_fee_rate = 0.0006`
- `rebate_ratio = 0.90`
- `effective_fee_rate = 0.00006`
可通过命令行改:
- `--raw-fee-rate`
- `--rebate-ratio`
## 5. 重要提示
- 回测撮合属于简化模型,不等于实盘撮合。
- 参数应周期性重训(例如每天或每周)。
- 若出现交易次数过低,适当降低 `open_breakout_buffer_pct` 或冷却时间。

View File

@@ -0,0 +1,653 @@
from __future__ import annotations
import csv
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean
from typing import Any, Dict, List, Optional, Sequence, Tuple
DEFAULT_PARAMS: Dict[str, Any] = {
"leverage": "20",
"open_type": "cross",
"default_order_size": 10,
"take_profit_usd": 3.0,
"stop_loss_usd": -2.0,
"trailing_activation_usd": 1.2,
"trailing_distance_usd": 0.6,
"break_even_activation_usd": 1.0,
"break_even_floor_usd": 0.2,
"open_breakout_buffer_pct": 0.03,
"enable_shadow_reverse": False,
"shadow_reverse_threshold_pct": 0.15,
"open_cooldown_seconds": 300,
"reverse_cooldown_seconds": 300,
"reverse_min_move_pct": 0.15,
# Backtest-only calibration parameters (not used by live runtime script)
"order_notional_usd": 100.0,
"pnl_per_usd_move": 1.0,
"slippage_pct": 0.01,
}
def _as_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except Exception:
return default
def _as_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
return False
def _parse_timestamp(value: Any) -> int:
text = str(value).strip()
if not text:
raise ValueError("empty timestamp")
if text.isdigit() or (text.startswith("-") and text[1:].isdigit()):
ts = int(text)
if ts > 10**12:
ts //= 1000
return ts
try:
ts = int(float(text))
if ts > 10**12:
ts //= 1000
return ts
except ValueError:
pass
dt_text = text.replace("Z", "+00:00")
dt = datetime.fromisoformat(dt_text)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return int(dt.timestamp())
def load_klines_from_csv(csv_path: str | Path, last_n_days: Optional[int] = 30) -> List[Dict[str, float]]:
"""
Load kline data from CSV. Supports these field names (case-insensitive):
- timestamp: id / timestamp / time / ts / datetime / date
- open: open / open_price
- high: high / high_price
- low: low / low_price
- close: close / close_price
- volume (optional): volume / vol
"""
path = Path(csv_path)
if not path.exists():
raise FileNotFoundError(f"CSV not found: {path}")
records: Dict[int, Dict[str, float]] = {}
with path.open("r", encoding="utf-8") as f:
reader = csv.DictReader(f)
if not reader.fieldnames:
raise ValueError(f"CSV has no header: {path}")
field_map = {name.strip().lower(): name for name in reader.fieldnames if name}
def find_key(candidates: Sequence[str]) -> Optional[str]:
for candidate in candidates:
if candidate in field_map:
return field_map[candidate]
return None
ts_key = find_key(("id", "timestamp", "time", "ts", "datetime", "date"))
open_key = find_key(("open", "open_price"))
high_key = find_key(("high", "high_price"))
low_key = find_key(("low", "low_price"))
close_key = find_key(("close", "close_price"))
volume_key = find_key(("volume", "vol"))
if not ts_key or not open_key or not high_key or not low_key or not close_key:
raise ValueError(
"CSV is missing required columns. Need timestamp/open/high/low/close (or aliases)."
)
for row in reader:
try:
ts = _parse_timestamp(row.get(ts_key, ""))
kline = {
"id": ts,
"open": _as_float(row.get(open_key), 0.0),
"high": _as_float(row.get(high_key), 0.0),
"low": _as_float(row.get(low_key), 0.0),
"close": _as_float(row.get(close_key), 0.0),
}
if volume_key and row.get(volume_key) not in (None, ""):
kline["volume"] = _as_float(row.get(volume_key), 0.0)
records[ts] = kline
except Exception:
continue
klines = sorted(records.values(), key=lambda x: x["id"])
if last_n_days is not None and klines:
cutoff = klines[-1]["id"] - int(last_n_days * 24 * 3600)
klines = [k for k in klines if k["id"] >= cutoff]
return klines
def save_klines_to_csv(klines: Sequence[Dict[str, float]], output_path: str | Path) -> Path:
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(["id", "open", "high", "low", "close", "volume"])
for k in klines:
writer.writerow([
int(k["id"]),
k["open"],
k["high"],
k["low"],
k["close"],
k.get("volume", ""),
])
return path
def fetch_klines_from_api(
contract_api: Any,
contract_symbol: str = "ETHUSDT",
step: int = 5,
days: int = 30,
batch_hours: int = 12,
sleep_seconds: float = 0.05,
) -> List[Dict[str, float]]:
"""Fetch recent kline data from Bitmart APIContract-compatible client."""
end_ts = int(time.time())
start_ts = end_ts - days * 24 * 3600
cursor = start_ts
all_rows: Dict[int, Dict[str, float]] = {}
while cursor < end_ts:
batch_end = min(cursor + batch_hours * 3600, end_ts)
response = contract_api.get_kline(
contract_symbol=contract_symbol,
step=step,
start_time=cursor,
end_time=batch_end,
)[0]
if response.get("code") != 1000:
cursor = batch_end + 1
continue
for item in response.get("data", []):
try:
ts = int(item["timestamp"])
all_rows[ts] = {
"id": ts,
"open": float(item["open_price"]),
"high": float(item["high_price"]),
"low": float(item["low_price"]),
"close": float(item["close_price"]),
"volume": float(item.get("volume", 0) or 0),
}
except Exception:
continue
cursor = batch_end + 1
if sleep_seconds > 0:
time.sleep(sleep_seconds)
return sorted(all_rows.values(), key=lambda x: x["id"])
def split_rolling_windows(
klines: Sequence[Dict[str, float]],
train_days: int = 20,
valid_days: int = 10,
step_days: int = 5,
) -> List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]]:
"""Build rolling (train, valid) windows by timestamp ranges."""
if not klines:
return []
train_secs = train_days * 24 * 3600
valid_secs = valid_days * 24 * 3600
step_secs = max(1, step_days) * 24 * 3600
first_ts = int(klines[0]["id"])
last_ts = int(klines[-1]["id"])
required_span = train_secs + valid_secs
windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]] = []
def slice_range(start_ts: int, end_ts: int) -> List[Dict[str, float]]:
return [k for k in klines if start_ts <= int(k["id"]) < end_ts]
cursor = first_ts
while cursor + required_span <= last_ts:
train_start = cursor
train_end = train_start + train_secs
valid_end = train_end + valid_secs
train_part = slice_range(train_start, train_end)
valid_part = slice_range(train_end, valid_end)
if len(train_part) >= 50 and len(valid_part) >= 30:
windows.append((train_part, valid_part))
cursor += step_secs
if not windows and (last_ts - first_ts) >= required_span:
valid_end = last_ts
valid_start = valid_end - valid_secs
train_start = valid_start - train_secs
train_part = slice_range(train_start, valid_start)
valid_part = slice_range(valid_start, valid_end)
if len(train_part) >= 50 and len(valid_part) >= 30:
windows.append((train_part, valid_part))
return windows
@dataclass
class BacktestResult:
net_pnl: float
gross_pnl: float
total_fees: float
trades: int
win_rate: float
max_drawdown: float
loss_days: int
start_ts: int
end_ts: int
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class ConservativeBacktester:
"""Backtester for the conservative quarter-breakout + reverse strategy."""
def __init__(
self,
params: Optional[Dict[str, Any]] = None,
raw_fee_rate: float = 0.0006,
rebate_ratio: float = 0.90,
) -> None:
self.params: Dict[str, Any] = dict(DEFAULT_PARAMS)
if params:
self.params.update(params)
self.raw_fee_rate = float(raw_fee_rate)
self.rebate_ratio = float(rebate_ratio)
self.effective_fee_rate = self.raw_fee_rate * (1.0 - self.rebate_ratio)
size_scale = max(_as_float(self.params.get("default_order_size", 10), 10.0) / 10.0, 0.05)
self.order_notional_usd = _as_float(self.params.get("order_notional_usd", 100.0), 100.0) * size_scale
self.pnl_per_usd_move = _as_float(self.params.get("pnl_per_usd_move", 1.0), 1.0) * size_scale
self.slippage_pct = _as_float(self.params.get("slippage_pct", 0.01), 0.01)
@staticmethod
def _entity_size(kline: Dict[str, float]) -> float:
return abs(kline["close"] - kline["open"])
@staticmethod
def _entity_edges(kline: Dict[str, float]) -> Tuple[float, float]:
upper = max(kline["open"], kline["close"])
lower = min(kline["open"], kline["close"])
return upper, lower
@staticmethod
def _upper_shadow_pct(kline: Dict[str, float]) -> float:
body_top = max(kline["open"], kline["close"])
if body_top <= 0:
return 0.0
return (kline["high"] - body_top) / body_top * 100.0
@staticmethod
def _lower_shadow_pct(kline: Dict[str, float]) -> float:
body_bottom = min(kline["open"], kline["close"])
if body_bottom <= 0:
return 0.0
return (body_bottom - kline["low"]) / body_bottom * 100.0
def _calc_fee(self) -> float:
return self.order_notional_usd * self.effective_fee_rate
def _apply_slippage(self, price: float, is_buy: bool) -> float:
if self.slippage_pct <= 0:
return price
shift = self.slippage_pct / 100.0
return price * (1.0 + shift) if is_buy else price * (1.0 - shift)
def _calc_levels(self, prev_kline: Dict[str, float], current_kline: Dict[str, float]) -> Optional[Dict[str, float]]:
prev_entity = self._entity_size(prev_kline)
if prev_entity < 0.1:
return None
prev_upper, prev_lower = self._entity_edges(prev_kline)
prev_bull_for_calc = prev_kline["close"] > prev_kline["open"]
prev_bear_for_calc = prev_kline["close"] < prev_kline["open"]
current_open_above_prev_close = current_kline["open"] > prev_kline["close"]
current_open_below_prev_close = current_kline["open"] < prev_kline["close"]
use_current_open_as_base = (
(prev_bull_for_calc and current_open_above_prev_close)
or (prev_bear_for_calc and current_open_below_prev_close)
)
if use_current_open_as_base:
base = current_kline["open"]
long_trigger = base + prev_entity / 4.0
short_trigger = base - prev_entity / 4.0
long_breakout = base + prev_entity / 4.0
short_breakout = base - prev_entity / 4.0
else:
long_trigger = prev_lower + prev_entity / 4.0
short_trigger = prev_upper - prev_entity / 4.0
long_breakout = prev_upper + prev_entity / 4.0
short_breakout = prev_lower - prev_entity / 4.0
open_buffer_pct = _as_float(self.params.get("open_breakout_buffer_pct", 0.03), 0.03)
long_entry_price = long_breakout * (1.0 + open_buffer_pct / 100.0)
short_entry_price = short_breakout * (1.0 - open_buffer_pct / 100.0)
prev_bearish = prev_kline["close"] < prev_kline["open"]
current_bullish = current_kline["close"] > current_kline["open"]
skip_short_by_upper_third = prev_bearish and current_bullish
prev_bullish = prev_kline["close"] > prev_kline["open"]
current_bearish = current_kline["close"] < current_kline["open"]
skip_long_by_lower_third = prev_bullish and current_bearish
return {
"prev_entity_upper": prev_upper,
"prev_entity_lower": prev_lower,
"long_trigger": long_trigger,
"short_trigger": short_trigger,
"long_entry_price": long_entry_price,
"short_entry_price": short_entry_price,
"skip_short_by_upper_third": float(skip_short_by_upper_third),
"skip_long_by_lower_third": float(skip_long_by_lower_third),
"upper_shadow_pct": self._upper_shadow_pct(prev_kline),
"lower_shadow_pct": self._lower_shadow_pct(prev_kline),
}
def run(self, klines: Sequence[Dict[str, float]]) -> BacktestResult:
if len(klines) < 3:
start_ts = int(klines[0]["id"]) if klines else 0
end_ts = int(klines[-1]["id"]) if klines else 0
return BacktestResult(
net_pnl=0.0,
gross_pnl=0.0,
total_fees=0.0,
trades=0,
win_rate=0.0,
max_drawdown=0.0,
loss_days=0,
start_ts=start_ts,
end_ts=end_ts,
)
open_cooldown = int(_as_float(self.params.get("open_cooldown_seconds", 300), 300))
reverse_cooldown = int(_as_float(self.params.get("reverse_cooldown_seconds", 300), 300))
reverse_min_move_pct = _as_float(self.params.get("reverse_min_move_pct", 0.15), 0.15)
take_profit = _as_float(self.params.get("take_profit_usd", 3.0), 3.0)
stop_loss = _as_float(self.params.get("stop_loss_usd", -2.0), -2.0)
trailing_activation = _as_float(self.params.get("trailing_activation_usd", 1.2), 1.2)
trailing_distance = _as_float(self.params.get("trailing_distance_usd", 0.6), 0.6)
be_activation = _as_float(self.params.get("break_even_activation_usd", 1.0), 1.0)
be_floor = _as_float(self.params.get("break_even_floor_usd", 0.2), 0.2)
enable_shadow_reverse = _as_bool(self.params.get("enable_shadow_reverse", False))
shadow_reverse_threshold = _as_float(self.params.get("shadow_reverse_threshold_pct", 0.15), 0.15)
position = 0 # 1 long, -1 short, 0 flat
entry_price = 0.0
entry_ts = 0
current_trade_open_fee = 0.0
last_open_ts: Optional[int] = None
last_open_kline_id: Optional[int] = None
last_reverse_ts: Optional[int] = None
max_unrealized_pnl_seen: Optional[float] = None
gross_pnl = 0.0
total_fees = 0.0
equity = 0.0
peak_equity = 0.0
max_drawdown = 0.0
trades = 0
wins = 0
daily_pnl: Dict[str, float] = {}
def mark_to_market(mark_price: float) -> float:
if position == 0:
return 0.0
return (mark_price - entry_price) * position * self.pnl_per_usd_move
def update_drawdown(mark_price: float) -> None:
nonlocal peak_equity, max_drawdown
mtm = mark_to_market(mark_price)
equity_mtm = equity + mtm
if equity_mtm > peak_equity:
peak_equity = equity_mtm
drawdown = peak_equity - equity_mtm
if drawdown > max_drawdown:
max_drawdown = drawdown
def can_open(current_kline_id: int) -> bool:
if last_open_kline_id is not None and current_kline_id == last_open_kline_id:
return False
if last_open_ts is not None and current_kline_id - last_open_ts < open_cooldown:
return False
return True
def can_reverse(current_price: float, trigger_price: float, now_ts: int) -> bool:
if last_reverse_ts is not None and now_ts - last_reverse_ts < reverse_cooldown:
return False
if trigger_price > 0:
move_pct = abs(current_price - trigger_price) / trigger_price * 100.0
if move_pct < reverse_min_move_pct:
return False
return True
def open_position(direction: int, raw_price: float, now_ts: int) -> None:
nonlocal position, entry_price, entry_ts
nonlocal last_open_ts, last_open_kline_id
nonlocal total_fees, equity, current_trade_open_fee
nonlocal max_unrealized_pnl_seen
is_buy = direction == 1
fill_price = self._apply_slippage(raw_price, is_buy=is_buy)
fee = self._calc_fee()
position = direction
entry_price = fill_price
entry_ts = now_ts
last_open_ts = now_ts
last_open_kline_id = now_ts
max_unrealized_pnl_seen = 0.0
total_fees += fee
equity -= fee
current_trade_open_fee = fee
def close_position(raw_price: float, now_ts: int) -> float:
nonlocal position, entry_price, entry_ts
nonlocal total_fees, equity, gross_pnl, trades, wins
nonlocal current_trade_open_fee, max_unrealized_pnl_seen
if position == 0:
return 0.0
is_buy = position == -1
fill_price = self._apply_slippage(raw_price, is_buy=is_buy)
realized = (fill_price - entry_price) * position * self.pnl_per_usd_move
close_fee = self._calc_fee()
trade_net = realized - close_fee - current_trade_open_fee
gross_pnl += realized
total_fees += close_fee
equity += realized - close_fee
day_key = datetime.utcfromtimestamp(now_ts).date().isoformat()
daily_pnl[day_key] = daily_pnl.get(day_key, 0.0) + trade_net
trades += 1
if trade_net > 0:
wins += 1
position = 0
entry_price = 0.0
entry_ts = 0
current_trade_open_fee = 0.0
max_unrealized_pnl_seen = None
return trade_net
for idx in range(1, len(klines)):
prev_kline = klines[idx - 1]
current_kline = klines[idx]
current_ts = int(current_kline["id"])
levels = self._calc_levels(prev_kline, current_kline)
if not levels:
update_drawdown(current_kline["close"])
continue
skip_short = bool(levels["skip_short_by_upper_third"])
skip_long = bool(levels["skip_long_by_lower_third"])
# Risk controls come first, matching live behavior.
if position != 0:
pnl_close = mark_to_market(current_kline["close"])
if max_unrealized_pnl_seen is None:
max_unrealized_pnl_seen = pnl_close
else:
max_unrealized_pnl_seen = max(max_unrealized_pnl_seen, pnl_close)
should_close = False
if pnl_close <= stop_loss:
should_close = True
elif max_unrealized_pnl_seen >= be_activation and pnl_close <= be_floor:
should_close = True
elif max_unrealized_pnl_seen >= trailing_activation and pnl_close < max_unrealized_pnl_seen - trailing_distance:
should_close = True
elif pnl_close >= take_profit:
should_close = True
if should_close:
close_position(current_kline["close"], current_ts)
update_drawdown(current_kline["close"])
continue
signal_type: Optional[str] = None
signal_price = 0.0
if position == 0:
long_hit = current_kline["high"] >= levels["long_entry_price"] and not skip_long
short_hit = current_kline["low"] <= levels["short_entry_price"] and not skip_short
# Conservative tie-break: if both sides touched in one bar, skip.
if long_hit and not short_hit:
signal_type = "long"
signal_price = levels["long_entry_price"]
elif short_hit and not long_hit:
signal_type = "short"
signal_price = levels["short_entry_price"]
if signal_type and can_open(current_ts):
open_position(1 if signal_type == "long" else -1, signal_price, current_ts)
elif position == 1:
reverse_hit = current_kline["low"] <= levels["short_trigger"] and not skip_short
shadow_hit = (
enable_shadow_reverse
and levels["upper_shadow_pct"] > shadow_reverse_threshold
and current_kline["low"] <= levels["prev_entity_lower"]
)
if reverse_hit or shadow_hit:
trigger = levels["short_trigger"] if reverse_hit else levels["prev_entity_lower"]
if can_reverse(current_kline["close"], trigger, current_ts):
close_position(trigger, current_ts)
open_position(-1, trigger, current_ts)
last_reverse_ts = current_ts
elif position == -1:
reverse_hit = current_kline["high"] >= levels["long_trigger"] and not skip_long
shadow_hit = (
enable_shadow_reverse
and levels["lower_shadow_pct"] > shadow_reverse_threshold
and current_kline["high"] >= levels["prev_entity_upper"]
)
if reverse_hit or shadow_hit:
trigger = levels["long_trigger"] if reverse_hit else levels["prev_entity_upper"]
if can_reverse(current_kline["close"], trigger, current_ts):
close_position(trigger, current_ts)
open_position(1, trigger, current_ts)
last_reverse_ts = current_ts
update_drawdown(current_kline["close"])
if position != 0:
last_bar = klines[-1]
close_position(last_bar["close"], int(last_bar["id"]))
update_drawdown(last_bar["close"])
win_rate = (wins / trades * 100.0) if trades > 0 else 0.0
loss_days = sum(1 for pnl in daily_pnl.values() if pnl < 0)
return BacktestResult(
net_pnl=equity,
gross_pnl=gross_pnl,
total_fees=total_fees,
trades=trades,
win_rate=win_rate,
max_drawdown=max_drawdown,
loss_days=loss_days,
start_ts=int(klines[0]["id"]),
end_ts=int(klines[-1]["id"]),
)
def score_result(
result: BacktestResult,
drawdown_weight: float = 1.4,
loss_day_weight: float = 0.8,
min_trades: int = 6,
undertrade_penalty: float = 0.5,
) -> float:
score = result.net_pnl - drawdown_weight * result.max_drawdown - loss_day_weight * result.loss_days
if result.trades < min_trades:
score -= (min_trades - result.trades) * undertrade_penalty
return score
def aggregate_results(results: Sequence[BacktestResult]) -> Dict[str, float]:
if not results:
return {
"net_pnl": 0.0,
"gross_pnl": 0.0,
"total_fees": 0.0,
"trades": 0.0,
"win_rate": 0.0,
"max_drawdown": 0.0,
"loss_days": 0.0,
}
return {
"net_pnl": mean([r.net_pnl for r in results]),
"gross_pnl": mean([r.gross_pnl for r in results]),
"total_fees": mean([r.total_fees for r in results]),
"trades": mean([r.trades for r in results]),
"win_rate": mean([r.win_rate for r in results]),
"max_drawdown": mean([r.max_drawdown for r in results]),
"loss_days": mean([r.loss_days for r in results]),
}

View File

@@ -0,0 +1,36 @@
{
"updated_at": "2026-02-06T00:00:00+00:00",
"source": {
"type": "manual_default"
},
"optimization": {
"method": "manual_default",
"n_trials": 0,
"windows": 0
},
"fee_model": {
"raw_fee_rate": 0.0006,
"rebate_ratio": 0.9,
"effective_fee_rate": 0.00006
},
"score": 0,
"params": {
"leverage": "20",
"take_profit_usd": 3.0,
"stop_loss_usd": -2.0,
"trailing_activation_usd": 1.2,
"trailing_distance_usd": 0.6,
"break_even_activation_usd": 1.0,
"break_even_floor_usd": 0.2,
"default_order_size": 10,
"open_breakout_buffer_pct": 0.03,
"enable_shadow_reverse": false,
"shadow_reverse_threshold_pct": 0.15,
"open_cooldown_seconds": 300,
"reverse_cooldown_seconds": 300,
"reverse_min_move_pct": 0.15,
"order_notional_usd": 100.0,
"pnl_per_usd_move": 1.0,
"slippage_pct": 0.01
}
}

View File

@@ -0,0 +1,492 @@
from __future__ import annotations
import argparse
import json
import os
import random
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean, pstdev
from typing import Any, Dict, List, Optional, Tuple
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from backtest_engine import (
DEFAULT_PARAMS,
ConservativeBacktester,
aggregate_results,
fetch_klines_from_api,
load_klines_from_csv,
save_klines_to_csv,
score_result,
split_rolling_windows,
)
STRATEGY_FILENAME = "四分之一五分钟反手条件充足_保守模式.py"
def _as_float(value: Any, default: float) -> float:
try:
return float(value)
except Exception:
return default
def normalize_params(params: Dict[str, Any]) -> Dict[str, Any]:
p = dict(params)
p["take_profit_usd"] = round(max(_as_float(p.get("take_profit_usd", 3.0), 3.0), 0.5), 4)
p["stop_loss_usd"] = round(min(_as_float(p.get("stop_loss_usd", -2.0), -2.0), -0.1), 4)
p["trailing_activation_usd"] = round(max(_as_float(p.get("trailing_activation_usd", 1.2), 1.2), 0.2), 4)
p["trailing_distance_usd"] = round(max(_as_float(p.get("trailing_distance_usd", 0.6), 0.6), 0.1), 4)
p["break_even_activation_usd"] = round(max(_as_float(p.get("break_even_activation_usd", 1.0), 1.0), 0.2), 4)
p["break_even_floor_usd"] = round(max(_as_float(p.get("break_even_floor_usd", 0.2), 0.2), 0.0), 4)
p["open_breakout_buffer_pct"] = round(max(_as_float(p.get("open_breakout_buffer_pct", 0.03), 0.03), 0.0), 4)
p["shadow_reverse_threshold_pct"] = round(max(_as_float(p.get("shadow_reverse_threshold_pct", 0.15), 0.15), 0.01), 4)
p["reverse_min_move_pct"] = round(max(_as_float(p.get("reverse_min_move_pct", 0.15), 0.15), 0.01), 4)
p["open_cooldown_seconds"] = int(max(_as_float(p.get("open_cooldown_seconds", 300), 300), 1))
p["reverse_cooldown_seconds"] = int(max(_as_float(p.get("reverse_cooldown_seconds", 300), 300), 1))
p["default_order_size"] = int(max(_as_float(p.get("default_order_size", 10), 10), 1))
p["order_notional_usd"] = round(max(_as_float(p.get("order_notional_usd", 100.0), 100.0), 5.0), 4)
p["pnl_per_usd_move"] = round(max(_as_float(p.get("pnl_per_usd_move", 1.0), 1.0), 0.01), 4)
p["slippage_pct"] = round(max(_as_float(p.get("slippage_pct", 0.01), 0.01), 0.0), 5)
p["enable_shadow_reverse"] = bool(p.get("enable_shadow_reverse", False))
# Keep relationships sane.
if p["trailing_activation_usd"] <= p["break_even_activation_usd"]:
p["trailing_activation_usd"] = round(p["break_even_activation_usd"] + 0.2, 4)
if p["take_profit_usd"] <= p["trailing_activation_usd"]:
p["take_profit_usd"] = round(p["trailing_activation_usd"] + 0.4, 4)
max_floor = p["break_even_activation_usd"] * 0.9
if p["break_even_floor_usd"] > max_floor:
p["break_even_floor_usd"] = round(max_floor, 4)
if p["trailing_distance_usd"] >= p["trailing_activation_usd"]:
p["trailing_distance_usd"] = round(max(0.1, p["trailing_activation_usd"] * 0.6), 4)
return p
def load_credentials_from_strategy_file(strategy_path: Path) -> Dict[str, str]:
"""
从保守模式策略文件里提取 self.api_key/self.secret_key/self.memo作为自动抓取兜底。
"""
if not strategy_path.exists():
return {}
try:
content = strategy_path.read_text(encoding="utf-8")
except Exception:
return {}
def extract(name: str) -> str:
pattern = rf'self\.{name}\s*=\s*["\']([^"\']+)["\']'
m = re.search(pattern, content)
return m.group(1).strip() if m else ""
api_key = extract("api_key")
secret_key = extract("secret_key")
memo = extract("memo")
result: Dict[str, str] = {}
if api_key:
result["api_key"] = api_key
if secret_key:
result["secret_key"] = secret_key
if memo:
result["memo"] = memo
return result
def sample_random_params(rng: random.Random) -> Dict[str, Any]:
params = dict(DEFAULT_PARAMS)
params.update(
{
"take_profit_usd": rng.uniform(1.2, 6.0),
"stop_loss_usd": -rng.uniform(0.8, 4.0),
"trailing_activation_usd": rng.uniform(0.5, 3.0),
"trailing_distance_usd": rng.uniform(0.2, 1.8),
"break_even_activation_usd": rng.uniform(0.3, 2.2),
"break_even_floor_usd": rng.uniform(0.0, 1.0),
"open_breakout_buffer_pct": rng.uniform(0.0, 0.25),
"reverse_min_move_pct": rng.uniform(0.05, 0.5),
"open_cooldown_seconds": rng.choice([60, 120, 180, 240, 300, 360, 420, 480, 600, 900]),
"reverse_cooldown_seconds": rng.choice([60, 120, 180, 240, 300, 360, 420, 480, 600, 900]),
"enable_shadow_reverse": rng.random() < 0.15,
"shadow_reverse_threshold_pct": rng.uniform(0.08, 0.5),
"slippage_pct": rng.uniform(0.005, 0.05),
}
)
return normalize_params(params)
def evaluate_candidate(
params: Dict[str, Any],
windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]],
raw_fee_rate: float,
rebate_ratio: float,
drawdown_weight: float,
loss_day_weight: float,
min_trades: int,
undertrade_penalty: float,
valid_score_weight: float,
drawdown_guard: float,
stability_weight: float,
) -> Tuple[float, Dict[str, float], Dict[str, float]]:
train_results = []
valid_results = []
window_scores = []
for train_data, valid_data in windows:
backtester = ConservativeBacktester(params=params, raw_fee_rate=raw_fee_rate, rebate_ratio=rebate_ratio)
train_res = backtester.run(train_data)
valid_res = backtester.run(valid_data)
train_results.append(train_res)
valid_results.append(valid_res)
train_score = score_result(
train_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
valid_score = score_result(
valid_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
combined = valid_score_weight * valid_score + (1.0 - valid_score_weight) * train_score
# Hard guard against large validation drawdown.
if valid_res.max_drawdown > drawdown_guard:
combined -= (valid_res.max_drawdown - drawdown_guard) * 2.0
window_scores.append(combined)
mean_score = mean(window_scores) if window_scores else -1e9
if len(valid_results) > 1:
stability_penalty = pstdev([r.net_pnl for r in valid_results]) * stability_weight
mean_score -= stability_penalty
train_agg = aggregate_results(train_results)
valid_agg = aggregate_results(valid_results)
return mean_score, train_agg, valid_agg
def optimize_with_optuna(
windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]],
args: argparse.Namespace,
) -> Dict[str, Any]:
import optuna # type: ignore
sampler = optuna.samplers.TPESampler(seed=args.seed)
study = optuna.create_study(direction="maximize", sampler=sampler)
best: Dict[str, Any] = {}
def objective(trial: Any) -> float:
params = dict(DEFAULT_PARAMS)
params.update(
{
"take_profit_usd": trial.suggest_float("take_profit_usd", 1.2, 6.0),
"stop_loss_usd": -trial.suggest_float("stop_loss_abs", 0.8, 4.0),
"trailing_activation_usd": trial.suggest_float("trailing_activation_usd", 0.5, 3.0),
"trailing_distance_usd": trial.suggest_float("trailing_distance_usd", 0.2, 1.8),
"break_even_activation_usd": trial.suggest_float("break_even_activation_usd", 0.3, 2.2),
"break_even_floor_usd": trial.suggest_float("break_even_floor_usd", 0.0, 1.0),
"open_breakout_buffer_pct": trial.suggest_float("open_breakout_buffer_pct", 0.0, 0.25),
"reverse_min_move_pct": trial.suggest_float("reverse_min_move_pct", 0.05, 0.5),
"open_cooldown_seconds": trial.suggest_int("open_cooldown_seconds", 60, 900, step=60),
"reverse_cooldown_seconds": trial.suggest_int("reverse_cooldown_seconds", 60, 900, step=60),
"enable_shadow_reverse": trial.suggest_categorical("enable_shadow_reverse", [False, True]),
"shadow_reverse_threshold_pct": trial.suggest_float("shadow_reverse_threshold_pct", 0.08, 0.5),
"slippage_pct": trial.suggest_float("slippage_pct", 0.005, 0.05),
}
)
params = normalize_params(params)
score, train_agg, valid_agg = evaluate_candidate(
params=params,
windows=windows,
raw_fee_rate=args.raw_fee_rate,
rebate_ratio=args.rebate_ratio,
drawdown_weight=args.drawdown_weight,
loss_day_weight=args.loss_day_weight,
min_trades=args.min_trades,
undertrade_penalty=args.undertrade_penalty,
valid_score_weight=args.valid_score_weight,
drawdown_guard=args.drawdown_guard,
stability_weight=args.stability_weight,
)
trial.set_user_attr("train_agg", train_agg)
trial.set_user_attr("valid_agg", valid_agg)
nonlocal best
if not best or score > best["score"]:
best = {
"score": score,
"params": params,
"train_agg": train_agg,
"valid_agg": valid_agg,
}
return score
study.optimize(objective, n_trials=args.n_trials, show_progress_bar=False)
if best:
return best
trial = study.best_trial
return {
"score": trial.value,
"params": normalize_params(dict(DEFAULT_PARAMS)),
"train_agg": trial.user_attrs.get("train_agg", {}),
"valid_agg": trial.user_attrs.get("valid_agg", {}),
}
def optimize_with_random(
windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]],
args: argparse.Namespace,
) -> Dict[str, Any]:
rng = random.Random(args.seed)
best: Optional[Dict[str, Any]] = None
for i in range(args.n_trials):
params = sample_random_params(rng)
score, train_agg, valid_agg = evaluate_candidate(
params=params,
windows=windows,
raw_fee_rate=args.raw_fee_rate,
rebate_ratio=args.rebate_ratio,
drawdown_weight=args.drawdown_weight,
loss_day_weight=args.loss_day_weight,
min_trades=args.min_trades,
undertrade_penalty=args.undertrade_penalty,
valid_score_weight=args.valid_score_weight,
drawdown_guard=args.drawdown_guard,
stability_weight=args.stability_weight,
)
if best is None or score > best["score"]:
best = {
"score": score,
"params": params,
"train_agg": train_agg,
"valid_agg": valid_agg,
"trial": i + 1,
}
if (i + 1) % 20 == 0:
print(f"[random] trial {i + 1}/{args.n_trials}, best_score={best['score']:.6f}")
if best is None:
raise RuntimeError("random optimization failed: no trial executed")
return best
def load_klines(args: argparse.Namespace) -> Tuple[List[Dict[str, float]], Dict[str, Any]]:
if args.data_file:
csv_path = Path(args.data_file).expanduser()
klines = load_klines_from_csv(csv_path, last_n_days=args.days)
source_meta = {"type": "csv", "path": str(csv_path)}
return klines, source_meta
api_key = args.api_key or os.getenv("BITMART_API_KEY")
secret_key = args.secret_key or os.getenv("BITMART_SECRET_KEY")
memo = args.memo or os.getenv("BITMART_MEMO", "参数优化")
credential_source = "args_or_env"
if not api_key or not secret_key:
strategy_path = Path(__file__).with_name(STRATEGY_FILENAME)
extracted = load_credentials_from_strategy_file(strategy_path)
if extracted:
api_key = api_key or extracted.get("api_key", "")
secret_key = secret_key or extracted.get("secret_key", "")
memo = memo or extracted.get("memo", "参数优化")
credential_source = f"strategy_file:{strategy_path.name}"
if not api_key or not secret_key:
raise ValueError(
"No --data-file provided and API credentials missing. "
"Provide --api-key/--secret-key or env BITMART_API_KEY/BITMART_SECRET_KEY. "
f"Fallback strategy file attempted: {STRATEGY_FILENAME}"
)
from bitmart.api_contract import APIContract
print(f"Using API credentials source: {credential_source}")
client = APIContract(api_key, secret_key, memo, timeout=(5, 15))
klines = fetch_klines_from_api(
contract_api=client,
contract_symbol=args.symbol,
step=args.step,
days=args.days,
batch_hours=args.batch_hours,
sleep_seconds=args.api_sleep,
)
source_meta = {
"type": "api",
"symbol": args.symbol,
"step": args.step,
"days": args.days,
"credential_source": credential_source,
}
save_path = args.save_data_file
if not save_path:
save_path = str(Path(__file__).with_name(f"auto_{args.symbol.lower()}_{args.step}m_{args.days}d.csv"))
if save_path:
output_path = save_klines_to_csv(klines, save_path)
source_meta["saved_csv"] = str(output_path)
print(f"Fetched klines saved to: {output_path}")
return klines, source_meta
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Optimize conservative strategy params using last N days of kline data.")
parser.add_argument("--data-file", type=str, default="", help="CSV path for kline data (recommended).")
parser.add_argument("--save-data-file", type=str, default="", help="When fetching from API, also save CSV here.")
parser.add_argument("--days", type=int, default=30, help="Use recent N days of data.")
parser.add_argument("--symbol", type=str, default="ETHUSDT", help="Bitmart contract symbol for API mode.")
parser.add_argument("--step", type=int, default=5, help="Kline step in minutes for API mode.")
parser.add_argument("--batch-hours", type=int, default=12, help="API fetch window size in hours.")
parser.add_argument("--api-sleep", type=float, default=0.05, help="Sleep seconds between API windows.")
parser.add_argument("--api-key", type=str, default="", help="Bitmart API key (optional if using env).")
parser.add_argument("--secret-key", type=str, default="", help="Bitmart secret key (optional if using env).")
parser.add_argument("--memo", type=str, default="", help="Bitmart memo (optional).")
parser.add_argument("--train-days", type=int, default=20, help="Rolling train window in days.")
parser.add_argument("--valid-days", type=int, default=10, help="Rolling validation window in days.")
parser.add_argument("--window-step-days", type=int, default=5, help="Rolling window step in days.")
parser.add_argument("--method", type=str, default="auto", choices=["auto", "optuna", "random"], help="Search method.")
parser.add_argument("--n-trials", type=int, default=300, help="Number of optimization trials.")
parser.add_argument("--seed", type=int, default=42, help="Random seed.")
parser.add_argument("--raw-fee-rate", type=float, default=0.0006, help="Raw taker fee rate (e.g. 0.0006).")
parser.add_argument("--rebate-ratio", type=float, default=0.90, help="Fee rebate ratio (e.g. 0.90 means 90%% rebate).")
parser.add_argument("--drawdown-weight", type=float, default=1.4, help="Penalty weight for max drawdown.")
parser.add_argument("--loss-day-weight", type=float, default=0.8, help="Penalty weight for losing days.")
parser.add_argument("--min-trades", type=int, default=6, help="Minimum trades per window before penalty.")
parser.add_argument("--undertrade-penalty", type=float, default=0.5, help="Penalty per missing trade.")
parser.add_argument("--valid-score-weight", type=float, default=0.8, help="Weight of validation score in combined score.")
parser.add_argument("--drawdown-guard", type=float, default=10.0, help="Extra hard penalty beyond this validation drawdown.")
parser.add_argument("--stability-weight", type=float, default=0.3, help="Penalty for net-pnl variance across windows.")
parser.add_argument(
"--output",
type=str,
default=str(Path(__file__).with_name("current_params.json")),
help="Output JSON path used by live conservative script.",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
klines, source_meta = load_klines(args)
if len(klines) < 200:
raise RuntimeError(f"Not enough klines for optimization: {len(klines)}")
windows = split_rolling_windows(
klines,
train_days=args.train_days,
valid_days=args.valid_days,
step_days=args.window_step_days,
)
if not windows:
raise RuntimeError(
"Could not build rolling windows. Increase data days or reduce train/valid window sizes."
)
print(f"Loaded klines: {len(klines)}, rolling windows: {len(windows)}")
use_method = args.method
if use_method == "auto":
try:
import optuna # noqa: F401
use_method = "optuna"
except Exception:
use_method = "random"
if use_method == "optuna":
best = optimize_with_optuna(windows, args)
else:
best = optimize_with_random(windows, args)
best_params = normalize_params(best["params"])
result_payload = {
"updated_at": datetime.now(timezone.utc).isoformat(),
"source": {
**source_meta,
"bars": len(klines),
"start_ts": int(klines[0]["id"]),
"end_ts": int(klines[-1]["id"]),
},
"optimization": {
"method": use_method,
"n_trials": args.n_trials,
"train_days": args.train_days,
"valid_days": args.valid_days,
"window_step_days": args.window_step_days,
"windows": len(windows),
"seed": args.seed,
},
"fee_model": {
"raw_fee_rate": args.raw_fee_rate,
"rebate_ratio": args.rebate_ratio,
"effective_fee_rate": args.raw_fee_rate * (1 - args.rebate_ratio),
},
"score": best["score"],
"train_metrics_avg": best.get("train_agg", {}),
"valid_metrics_avg": best.get("valid_agg", {}),
"params": best_params,
}
output_path = Path(args.output).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as f:
json.dump(result_payload, f, ensure_ascii=False, indent=2)
print("Optimization done.")
print(f"Best score: {best['score']:.6f}")
print(f"Saved params to: {output_path}")
print("Best params:")
for k in sorted(best_params):
print(f" {k}: {best_params[k]}")
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,12 @@
import json
import os
import sys
import time
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from tqdm import tqdm
from loguru import logger
@@ -64,6 +72,68 @@ class BitmartFuturesTransactionConservative:
self.prev_entity = None # 上一根K线实体大小
self.current_open = None # 当前K线开盘价
# 启动时尝试读取动态参数(可由优化脚本自动生成)
self.load_runtime_params()
def load_runtime_params(self):
"""
current_params.json 或环境变量 BITMART_PARAMS_PATH 指向的文件加载参数
文件格式支持两种
1) {"params": {...}}
2) {...} 直接是参数字典
"""
params_path_env = os.getenv("BITMART_PARAMS_PATH")
params_path = Path(params_path_env).expanduser() if params_path_env else Path(__file__).with_name("current_params.json")
if not params_path.exists():
logger.info(f"未找到动态参数文件,使用内置保守参数: {params_path}")
return
allowed_keys = {
"leverage",
"take_profit_usd",
"stop_loss_usd",
"trailing_activation_usd",
"trailing_distance_usd",
"break_even_activation_usd",
"break_even_floor_usd",
"default_order_size",
"open_breakout_buffer_pct",
"enable_shadow_reverse",
"shadow_reverse_threshold_pct",
"open_cooldown_seconds",
"reverse_cooldown_seconds",
"reverse_min_move_pct",
}
try:
with params_path.open("r", encoding="utf-8") as f:
loaded = json.load(f)
params = loaded.get("params", loaded)
if not isinstance(params, dict):
logger.warning(f"参数文件格式不正确,忽略: {params_path}")
return
for key, value in params.items():
if key not in allowed_keys:
continue
if key == "leverage":
setattr(self, key, str(value))
else:
setattr(self, key, value)
logger.success(f"已加载动态参数文件: {params_path}")
logger.info(
"动态参数生效: "
f"TP={self.take_profit_usd}, SL={self.stop_loss_usd}, "
f"TrailAct={self.trailing_activation_usd}, TrailDist={self.trailing_distance_usd}, "
f"BEAct={self.break_even_activation_usd}, BEFloor={self.break_even_floor_usd}, "
f"BreakoutBuf={self.open_breakout_buffer_pct}%, "
f"OpenCD={self.open_cooldown_seconds}s, ReverseCD={self.reverse_cooldown_seconds}s, "
f"ReverseMove={self.reverse_min_move_pct}%"
)
except Exception as e:
logger.error(f"加载动态参数文件失败: {e} | path={params_path}")
def get_klines(self):
"""获取最近2根K线当前K线和上一根K线"""
try: