优化前改动

This commit is contained in:
ddrwode
2026-02-06 14:07:19 +08:00
parent 275b1319f8
commit 649858761e
4 changed files with 729 additions and 43 deletions

View File

@@ -29,6 +29,24 @@ python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_p
优化完成后会写入:
- `/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/current_params.json`
如果你想做更稳健的参数(推荐实盘前使用):
```bash
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \
--data-file "/Users/ddrwode/code/lm_code/bitmart/数据/your_5m_90days.csv" \
--days 90 \
--train-days 45 \
--valid-days 15 \
--window-step-days 7 \
--method optuna \
--n-trials 1200 \
--valid-score-weight 0.9 \
--drawdown-guard 8 \
--stability-weight 0.6 \
--stress-slippage-multipliers "0.8,1.0,1.2,1.4" \
--stress-fee-multipliers "1.0,1.15,1.3"
```
## 2. 不提供 CSV直接从 API 自动抓取 30 天数据并优化
最简单一条命令(会自动抓取并计算):
@@ -72,6 +90,10 @@ BITMART_PARAMS_PATH="/absolute/path/to/current_params.json" \
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一五分钟反手条件充足_保守模式.py"
```
注意:
- 实盘下单数量现在会直接使用 `default_order_size`(不再固定写死 25与回测/优化口径一致。
- `current_params.json` 新增支持趋势过滤和 AI 门控参数(见第 7 节)。
## 4. 实时价格WebSocket
保守模式脚本已支持:
@@ -132,8 +154,20 @@ pip3 install websocket-client
可通过命令行改:
- `--raw-fee-rate`
- `--rebate-ratio`
- `--stress-slippage-multipliers`
- `--stress-fee-multipliers`
## 7. 重要提示
## 7. 趋势过滤 + AI 门控(新)
实盘与回测都支持以下参数(可由优化器自动搜索):
- 趋势过滤:`enable_trend_filter``trend_ema_fast``trend_ema_slow``trend_strength_threshold_pct``trend_slope_lookback``trend_slope_threshold_pct`
- AI 门控:`enable_ai_filter``ai_min_confidence``ai_bias``ai_w_trend_align``ai_w_breakout_strength``ai_w_entity_strength``ai_w_shadow_balance``ai_w_volatility_fit`
说明:
- 趋势过滤只影响新开仓,不阻断风控平仓和反手。
- AI 门控是轻量打分器logistic 风格),用于 `trade/skip`,不是黑盒大模型。
## 8. 重要提示
- 回测撮合属于简化模型,不等于实盘撮合。
- 参数应周期性重训(例如每天或每周)。

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import csv
import math
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
@@ -22,6 +23,21 @@ DEFAULT_PARAMS: Dict[str, Any] = {
"open_breakout_buffer_pct": 0.03,
"enable_shadow_reverse": False,
"shadow_reverse_threshold_pct": 0.15,
"enable_trend_filter": True,
"trend_ema_fast": 21,
"trend_ema_slow": 55,
"trend_strength_threshold_pct": 0.02,
"trend_slope_lookback": 3,
"trend_slope_threshold_pct": 0.005,
"enable_ai_filter": True,
"ai_min_confidence": 0.58,
"ai_bias": -0.1,
"ai_w_trend_align": 1.1,
"ai_w_breakout_strength": 0.8,
"ai_w_entity_strength": 0.55,
"ai_w_shadow_balance": 0.35,
"ai_w_volatility_fit": 0.45,
"dynamic_vol_target_pct": 0.25,
"open_cooldown_seconds": 300,
"reverse_cooldown_seconds": 300,
"reverse_min_move_pct": 0.15,
@@ -322,6 +338,69 @@ class ConservativeBacktester:
shift = self.slippage_pct / 100.0
return price * (1.0 + shift) if is_buy else price * (1.0 - shift)
@staticmethod
def _clip(value: float, low: float, high: float) -> float:
return max(low, min(high, value))
def _signal_confidence(
self,
side: str,
current_price: float,
entry_price: float,
prev_kline: Dict[str, float],
levels: Dict[str, float],
trend_bias: str,
trend_strength_pct: float,
target_volatility_pct: float,
) -> float:
prev_entity = self._entity_size(prev_kline)
prev_close = max(float(prev_kline.get("close", 0.0)), 0.000001)
entity_pct = prev_entity / prev_close * 100.0
upper_shadow_pct = levels["upper_shadow_pct"]
lower_shadow_pct = levels["lower_shadow_pct"]
if entry_price <= 0:
breakout_move_pct = 0.0
elif side == "long":
breakout_move_pct = max(0.0, (current_price - entry_price) / entry_price * 100.0)
else:
breakout_move_pct = max(0.0, (entry_price - current_price) / entry_price * 100.0)
if side == "long":
trend_align = 1.0 if trend_bias == "bull" else (-1.0 if trend_bias == "bear" else 0.0)
shadow_balance = lower_shadow_pct - upper_shadow_pct
else:
trend_align = 1.0 if trend_bias == "bear" else (-1.0 if trend_bias == "bull" else 0.0)
shadow_balance = upper_shadow_pct - lower_shadow_pct
trend_strength_threshold = max(_as_float(self.params.get("trend_strength_threshold_pct", 0.02), 0.02), 0.01)
trend_strength_scale = self._clip(abs(trend_strength_pct) / trend_strength_threshold, 0.0, 2.5)
trend_component = trend_align * trend_strength_scale
breakout_strength = self._clip(breakout_move_pct / 0.25, 0.0, 2.5)
entity_strength = self._clip(entity_pct / 0.50, 0.0, 2.5)
shadow_component = self._clip(shadow_balance / 0.30, -2.5, 2.5)
current_vol = 0.0
current_close = float(current_price)
if current_close > 0:
current_vol = abs(float(prev_kline["high"]) - float(prev_kline["low"])) / current_close * 100.0
target_vol = max(float(target_volatility_pct), 0.05)
vol_fit = 1.0 - abs(current_vol - target_vol) / (target_vol * 2.0)
vol_fit = self._clip(vol_fit, -1.5, 1.5)
score = (
_as_float(self.params.get("ai_bias", -0.1), -0.1)
+ _as_float(self.params.get("ai_w_trend_align", 1.1), 1.1) * trend_component
+ _as_float(self.params.get("ai_w_breakout_strength", 0.8), 0.8) * breakout_strength
+ _as_float(self.params.get("ai_w_entity_strength", 0.55), 0.55) * entity_strength
+ _as_float(self.params.get("ai_w_shadow_balance", 0.35), 0.35) * shadow_component
+ _as_float(self.params.get("ai_w_volatility_fit", 0.45), 0.45) * vol_fit
)
score = self._clip(score, -60.0, 60.0)
return 1.0 / (1.0 + math.exp(-score))
def _calc_levels(self, prev_kline: Dict[str, float], current_kline: Dict[str, float]) -> Optional[Dict[str, float]]:
prev_entity = self._entity_size(prev_kline)
if prev_entity < 0.1:
@@ -404,6 +483,15 @@ class ConservativeBacktester:
enable_shadow_reverse = _as_bool(self.params.get("enable_shadow_reverse", False))
shadow_reverse_threshold = _as_float(self.params.get("shadow_reverse_threshold_pct", 0.15), 0.15)
enable_trend_filter = _as_bool(self.params.get("enable_trend_filter", True))
trend_ema_fast = int(max(_as_float(self.params.get("trend_ema_fast", 21), 21), 2))
trend_ema_slow = int(max(_as_float(self.params.get("trend_ema_slow", 55), 55), trend_ema_fast + 1))
trend_strength_threshold = _as_float(self.params.get("trend_strength_threshold_pct", 0.02), 0.02)
trend_slope_lookback = int(max(_as_float(self.params.get("trend_slope_lookback", 3), 3), 1))
trend_slope_threshold = _as_float(self.params.get("trend_slope_threshold_pct", 0.005), 0.005)
enable_ai_filter = _as_bool(self.params.get("enable_ai_filter", True))
ai_min_confidence = _as_float(self.params.get("ai_min_confidence", 0.58), 0.58)
target_volatility_pct = _as_float(self.params.get("dynamic_vol_target_pct", 0.25), 0.25)
position = 0 # 1 long, -1 short, 0 flat
entry_price = 0.0
@@ -426,6 +514,11 @@ class ConservativeBacktester:
trades = 0
wins = 0
daily_pnl: Dict[str, float] = {}
ema_fast = float(klines[0]["close"])
ema_slow = float(klines[0]["close"])
alpha_fast = 2.0 / (trend_ema_fast + 1.0)
alpha_slow = 2.0 / (trend_ema_slow + 1.0)
slow_history: List[float] = [ema_slow]
def mark_to_market(mark_price: float) -> float:
if position == 0:
@@ -516,6 +609,27 @@ class ConservativeBacktester:
prev_kline = klines[idx - 1]
current_kline = klines[idx]
current_ts = int(current_kline["id"])
close_price = float(current_kline["close"])
ema_fast = ema_fast + alpha_fast * (close_price - ema_fast)
ema_slow = ema_slow + alpha_slow * (close_price - ema_slow)
slow_history.append(ema_slow)
max_slow_history = trend_slope_lookback + 5
if len(slow_history) > max_slow_history:
slow_history = slow_history[-max_slow_history:]
trend_strength_pct = 0.0
trend_slope_pct = 0.0
trend_bias = "neutral"
if ema_slow > 0:
trend_strength_pct = (ema_fast - ema_slow) / ema_slow * 100.0
if len(slow_history) > trend_slope_lookback:
ref_slow = slow_history[-1 - trend_slope_lookback]
if ref_slow > 0:
trend_slope_pct = (ema_slow - ref_slow) / ref_slow * 100.0
if trend_strength_pct >= trend_strength_threshold and trend_slope_pct >= trend_slope_threshold:
trend_bias = "bull"
elif trend_strength_pct <= -trend_strength_threshold and trend_slope_pct <= -trend_slope_threshold:
trend_bias = "bear"
levels = self._calc_levels(prev_kline, current_kline)
if not levels:
@@ -564,7 +678,29 @@ class ConservativeBacktester:
signal_price = levels["short_entry_price"]
if signal_type and can_open(current_ts):
open_position(1 if signal_type == "long" else -1, signal_price, current_ts)
allow_signal = True
if enable_trend_filter:
if signal_type == "long" and trend_bias == "bear":
allow_signal = False
elif signal_type == "short" and trend_bias == "bull":
allow_signal = False
if allow_signal and enable_ai_filter:
confidence = self._signal_confidence(
side=signal_type,
current_price=close_price,
entry_price=signal_price,
prev_kline=prev_kline,
levels=levels,
trend_bias=trend_bias,
trend_strength_pct=trend_strength_pct,
target_volatility_pct=target_volatility_pct,
)
if confidence < ai_min_confidence:
allow_signal = False
if allow_signal:
open_position(1 if signal_type == "long" else -1, signal_price, current_ts)
elif position == 1:
reverse_hit = current_kline["low"] <= levels["short_trigger"] and not skip_short

View File

@@ -37,6 +37,35 @@ def _as_float(value: Any, default: float) -> float:
return default
def _as_bool(value: Any, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
return default
def _clamp(value: float, low: float, high: float) -> float:
return max(low, min(high, value))
def _parse_float_list(text: str, default: List[float]) -> List[float]:
if not text:
return list(default)
values: List[float] = []
for part in text.split(","):
raw = part.strip()
if not raw:
continue
try:
values.append(float(raw))
except Exception:
continue
return values or list(default)
def normalize_params(params: Dict[str, Any]) -> Dict[str, Any]:
p = dict(params)
@@ -60,8 +89,24 @@ def normalize_params(params: Dict[str, Any]) -> Dict[str, Any]:
p["order_notional_usd"] = round(max(_as_float(p.get("order_notional_usd", 100.0), 100.0), 5.0), 4)
p["pnl_per_usd_move"] = round(max(_as_float(p.get("pnl_per_usd_move", 1.0), 1.0), 0.01), 4)
p["slippage_pct"] = round(max(_as_float(p.get("slippage_pct", 0.01), 0.01), 0.0), 5)
p["dynamic_vol_target_pct"] = round(_clamp(_as_float(p.get("dynamic_vol_target_pct", 0.25), 0.25), 0.05, 1.5), 6)
p["enable_shadow_reverse"] = bool(p.get("enable_shadow_reverse", False))
p["enable_shadow_reverse"] = _as_bool(p.get("enable_shadow_reverse", False), False)
p["enable_trend_filter"] = _as_bool(p.get("enable_trend_filter", True), True)
p["trend_ema_fast"] = int(max(_as_float(p.get("trend_ema_fast", 21), 21), 2))
p["trend_ema_slow"] = int(max(_as_float(p.get("trend_ema_slow", 55), 55), p["trend_ema_fast"] + 1))
p["trend_strength_threshold_pct"] = round(_clamp(_as_float(p.get("trend_strength_threshold_pct", 0.02), 0.02), 0.0, 0.2), 6)
p["trend_slope_lookback"] = int(max(_as_float(p.get("trend_slope_lookback", 3), 3), 1))
p["trend_slope_threshold_pct"] = round(_clamp(_as_float(p.get("trend_slope_threshold_pct", 0.005), 0.005), 0.0, 0.1), 6)
p["enable_ai_filter"] = _as_bool(p.get("enable_ai_filter", True), True)
p["ai_min_confidence"] = round(_clamp(_as_float(p.get("ai_min_confidence", 0.58), 0.58), 0.5, 0.95), 6)
p["ai_bias"] = round(_clamp(_as_float(p.get("ai_bias", -0.1), -0.1), -3.0, 3.0), 6)
p["ai_w_trend_align"] = round(_clamp(_as_float(p.get("ai_w_trend_align", 1.1), 1.1), 0.0, 3.0), 6)
p["ai_w_breakout_strength"] = round(_clamp(_as_float(p.get("ai_w_breakout_strength", 0.8), 0.8), 0.0, 3.0), 6)
p["ai_w_entity_strength"] = round(_clamp(_as_float(p.get("ai_w_entity_strength", 0.55), 0.55), 0.0, 3.0), 6)
p["ai_w_shadow_balance"] = round(_clamp(_as_float(p.get("ai_w_shadow_balance", 0.35), 0.35), 0.0, 3.0), 6)
p["ai_w_volatility_fit"] = round(_clamp(_as_float(p.get("ai_w_volatility_fit", 0.45), 0.45), 0.0, 3.0), 6)
# Keep relationships sane.
if p["trailing_activation_usd"] <= p["break_even_activation_usd"]:
@@ -127,6 +172,21 @@ def sample_random_params(rng: random.Random) -> Dict[str, Any]:
"reverse_cooldown_seconds": rng.choice([60, 120, 180, 240, 300, 360, 420, 480, 600, 900]),
"enable_shadow_reverse": rng.random() < 0.15,
"shadow_reverse_threshold_pct": rng.uniform(0.08, 0.5),
"enable_trend_filter": rng.random() < 0.90,
"trend_ema_fast": rng.randint(8, 34),
"trend_ema_slow": rng.randint(35, 120),
"trend_strength_threshold_pct": rng.uniform(0.0, 0.08),
"trend_slope_lookback": rng.randint(1, 8),
"trend_slope_threshold_pct": rng.uniform(0.0, 0.03),
"enable_ai_filter": rng.random() < 0.90,
"ai_min_confidence": rng.uniform(0.50, 0.85),
"ai_bias": rng.uniform(-1.8, 1.0),
"ai_w_trend_align": rng.uniform(0.1, 2.5),
"ai_w_breakout_strength": rng.uniform(0.1, 2.5),
"ai_w_entity_strength": rng.uniform(0.1, 2.5),
"ai_w_shadow_balance": rng.uniform(0.0, 2.2),
"ai_w_volatility_fit": rng.uniform(0.0, 2.2),
"dynamic_vol_target_pct": rng.uniform(0.10, 0.80),
"slippage_pct": rng.uniform(0.005, 0.05),
}
)
@@ -145,40 +205,86 @@ def evaluate_candidate(
valid_score_weight: float,
drawdown_guard: float,
stability_weight: float,
stress_slippage_multipliers: List[float],
stress_fee_multipliers: List[float],
) -> Tuple[float, Dict[str, float], Dict[str, float]]:
train_results = []
valid_results = []
window_scores = []
for train_data, valid_data in windows:
backtester = ConservativeBacktester(params=params, raw_fee_rate=raw_fee_rate, rebate_ratio=rebate_ratio)
scenario_train_scores: List[float] = []
scenario_valid_scores: List[float] = []
scenario_valid_drawdowns: List[float] = []
base_train_res = None
base_valid_res = None
train_res = backtester.run(train_data)
valid_res = backtester.run(valid_data)
for slip_mult in stress_slippage_multipliers:
for fee_mult in stress_fee_multipliers:
scenario_params = dict(params)
scenario_params["slippage_pct"] = max(
0.0,
_as_float(params.get("slippage_pct", 0.01), 0.01) * float(slip_mult),
)
scenario_params = normalize_params(scenario_params)
scenario_raw_fee_rate = max(0.0, raw_fee_rate * float(fee_mult))
backtester = ConservativeBacktester(
params=scenario_params,
raw_fee_rate=scenario_raw_fee_rate,
rebate_ratio=rebate_ratio,
)
train_results.append(train_res)
valid_results.append(valid_res)
train_res = backtester.run(train_data)
valid_res = backtester.run(valid_data)
train_score = score_result(
train_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
valid_score = score_result(
valid_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
train_score = score_result(
train_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
valid_score = score_result(
valid_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
combined = valid_score_weight * valid_score + (1.0 - valid_score_weight) * train_score
scenario_train_scores.append(train_score)
scenario_valid_scores.append(valid_score)
scenario_valid_drawdowns.append(valid_res.max_drawdown)
# Hard guard against large validation drawdown.
if valid_res.max_drawdown > drawdown_guard:
combined -= (valid_res.max_drawdown - drawdown_guard) * 2.0
if base_train_res is None and abs(slip_mult - 1.0) < 1e-9 and abs(fee_mult - 1.0) < 1e-9:
base_train_res = train_res
base_valid_res = valid_res
if base_train_res is None:
base_train_res = train_res
base_valid_res = valid_res
if base_train_res is None or base_valid_res is None:
fallback_backtester = ConservativeBacktester(
params=params,
raw_fee_rate=raw_fee_rate,
rebate_ratio=rebate_ratio,
)
base_train_res = fallback_backtester.run(train_data)
base_valid_res = fallback_backtester.run(valid_data)
train_results.append(base_train_res)
valid_results.append(base_valid_res)
train_score_mean = mean(scenario_train_scores) if scenario_train_scores else -1e9
valid_score_mean = mean(scenario_valid_scores) if scenario_valid_scores else -1e9
valid_score_min = min(scenario_valid_scores) if scenario_valid_scores else -1e9
robustness_penalty = max(0.0, valid_score_mean - valid_score_min) * 0.35
combined = valid_score_weight * valid_score_mean + (1.0 - valid_score_weight) * train_score_mean - robustness_penalty
valid_drawdown_max = max(scenario_valid_drawdowns) if scenario_valid_drawdowns else 0.0
if valid_drawdown_max > drawdown_guard:
combined -= (valid_drawdown_max - drawdown_guard) * 2.0
window_scores.append(combined)
@@ -220,6 +326,21 @@ def optimize_with_optuna(
"reverse_cooldown_seconds": trial.suggest_int("reverse_cooldown_seconds", 60, 900, step=60),
"enable_shadow_reverse": trial.suggest_categorical("enable_shadow_reverse", [False, True]),
"shadow_reverse_threshold_pct": trial.suggest_float("shadow_reverse_threshold_pct", 0.08, 0.5),
"enable_trend_filter": trial.suggest_categorical("enable_trend_filter", [False, True]),
"trend_ema_fast": trial.suggest_int("trend_ema_fast", 8, 34),
"trend_ema_slow": trial.suggest_int("trend_ema_slow", 35, 120),
"trend_strength_threshold_pct": trial.suggest_float("trend_strength_threshold_pct", 0.0, 0.08),
"trend_slope_lookback": trial.suggest_int("trend_slope_lookback", 1, 8),
"trend_slope_threshold_pct": trial.suggest_float("trend_slope_threshold_pct", 0.0, 0.03),
"enable_ai_filter": trial.suggest_categorical("enable_ai_filter", [False, True]),
"ai_min_confidence": trial.suggest_float("ai_min_confidence", 0.50, 0.85),
"ai_bias": trial.suggest_float("ai_bias", -1.8, 1.0),
"ai_w_trend_align": trial.suggest_float("ai_w_trend_align", 0.1, 2.5),
"ai_w_breakout_strength": trial.suggest_float("ai_w_breakout_strength", 0.1, 2.5),
"ai_w_entity_strength": trial.suggest_float("ai_w_entity_strength", 0.1, 2.5),
"ai_w_shadow_balance": trial.suggest_float("ai_w_shadow_balance", 0.0, 2.2),
"ai_w_volatility_fit": trial.suggest_float("ai_w_volatility_fit", 0.0, 2.2),
"dynamic_vol_target_pct": trial.suggest_float("dynamic_vol_target_pct", 0.10, 0.80),
"slippage_pct": trial.suggest_float("slippage_pct", 0.005, 0.05),
}
)
@@ -237,6 +358,8 @@ def optimize_with_optuna(
valid_score_weight=args.valid_score_weight,
drawdown_guard=args.drawdown_guard,
stability_weight=args.stability_weight,
stress_slippage_multipliers=args.stress_slippage_multipliers,
stress_fee_multipliers=args.stress_fee_multipliers,
)
trial.set_user_attr("train_agg", train_agg)
@@ -286,6 +409,8 @@ def optimize_with_random(
valid_score_weight=args.valid_score_weight,
drawdown_guard=args.drawdown_guard,
stability_weight=args.stability_weight,
stress_slippage_multipliers=args.stress_slippage_multipliers,
stress_fee_multipliers=args.stress_fee_multipliers,
)
if best is None or score > best["score"]:
@@ -400,6 +525,18 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--valid-score-weight", type=float, default=0.8, help="Weight of validation score in combined score.")
parser.add_argument("--drawdown-guard", type=float, default=10.0, help="Extra hard penalty beyond this validation drawdown.")
parser.add_argument("--stability-weight", type=float, default=0.3, help="Penalty for net-pnl variance across windows.")
parser.add_argument(
"--stress-slippage-multipliers",
type=str,
default="0.8,1.0,1.2",
help="Comma-separated slippage multipliers used in robust scoring.",
)
parser.add_argument(
"--stress-fee-multipliers",
type=str,
default="1.0,1.15",
help="Comma-separated raw-fee multipliers used in robust scoring.",
)
parser.add_argument(
"--output",
@@ -413,6 +550,8 @@ def parse_args() -> argparse.Namespace:
def main() -> None:
args = parse_args()
args.stress_slippage_multipliers = _parse_float_list(args.stress_slippage_multipliers, [0.8, 1.0, 1.2])
args.stress_fee_multipliers = _parse_float_list(args.stress_fee_multipliers, [1.0, 1.15])
klines, source_meta = load_klines(args)
if len(klines) < 200:
@@ -463,6 +602,8 @@ def main() -> None:
"window_step_days": args.window_step_days,
"windows": len(windows),
"seed": args.seed,
"stress_slippage_multipliers": args.stress_slippage_multipliers,
"stress_fee_multipliers": args.stress_fee_multipliers,
},
"fee_model": {
"raw_fee_rate": args.raw_fee_rate,

View File

@@ -1,5 +1,7 @@
import json
import math
import os
import subprocess
import sys
import threading
import time
@@ -71,6 +73,26 @@ class BitmartFuturesTransactionConservative:
self.open_breakout_buffer_pct = 0.03 # 开仓突破附加缓冲(百分比),减少假突破
self.enable_shadow_reverse = False # 保守模式默认关闭影线反手
self.shadow_reverse_threshold_pct = 0.15 # 影线反手阈值(百分比)
# 趋势过滤:仅限制新开仓,不影响持仓风控与反手
self.enable_trend_filter = True
self.trend_ema_fast = 21
self.trend_ema_slow = 55
self.trend_strength_threshold_pct = 0.02
self.trend_slope_lookback = 3
self.trend_slope_threshold_pct = 0.005
# 轻量 AI 门控:对候选开仓信号打分,低置信度跳过
self.enable_ai_filter = True
self.ai_min_confidence = 0.58
self.ai_bias = -0.1
self.ai_w_trend_align = 1.1
self.ai_w_breakout_strength = 0.8
self.ai_w_entity_strength = 0.55
self.ai_w_shadow_balance = 0.35
self.ai_w_volatility_fit = 0.45
self.last_trend_bias = "neutral"
self.last_trend_strength_pct = 0.0
self.last_trend_slope_pct = 0.0
self.recent_klines_cache = []
# 策略相关变量
self.prev_kline = None # 上一根K线
@@ -102,10 +124,36 @@ class BitmartFuturesTransactionConservative:
self.last_dynamic_risk_kline_id = None
self.base_risk_params = {}
# 启动前自动参数优化(按你的命令默认开启)
self.auto_optimize_before_trade = os.getenv(
"BITMART_AUTO_OPTIMIZE_BEFORE_TRADE", "1"
).strip().lower() not in {"0", "false", "off", "no"}
self.auto_optimize_require_success = os.getenv(
"BITMART_AUTO_OPTIMIZE_REQUIRE_SUCCESS", "1"
).strip().lower() not in {"0", "false", "off", "no"}
self.auto_optimize_days = int(os.getenv("BITMART_AUTO_OPT_DAYS", "90"))
self.auto_optimize_train_days = int(os.getenv("BITMART_AUTO_OPT_TRAIN_DAYS", "45"))
self.auto_optimize_valid_days = int(os.getenv("BITMART_AUTO_OPT_VALID_DAYS", "15"))
self.auto_optimize_window_step_days = int(os.getenv("BITMART_AUTO_OPT_WINDOW_STEP_DAYS", "7"))
self.auto_optimize_method = os.getenv("BITMART_AUTO_OPT_METHOD", "optuna")
self.auto_optimize_n_trials = int(os.getenv("BITMART_AUTO_OPT_N_TRIALS", "1200"))
self.auto_optimize_valid_score_weight = float(os.getenv("BITMART_AUTO_OPT_VALID_SCORE_WEIGHT", "0.9"))
self.auto_optimize_drawdown_guard = float(os.getenv("BITMART_AUTO_OPT_DRAWDOWN_GUARD", "8"))
self.auto_optimize_stability_weight = float(os.getenv("BITMART_AUTO_OPT_STABILITY_WEIGHT", "0.6"))
self.auto_optimize_stress_slippage = os.getenv("BITMART_AUTO_OPT_STRESS_SLIPPAGE", "0.8,1.0,1.2,1.4")
self.auto_optimize_stress_fee = os.getenv("BITMART_AUTO_OPT_STRESS_FEE", "1.0,1.15,1.3")
self.auto_optimize_data_file = os.getenv("BITMART_AUTO_OPT_DATA_FILE", "").strip()
self.auto_optimize_script_path = Path(__file__).with_name("optimize_params.py")
self.auto_optimized_this_run = False
# 启动时尝试读取动态参数(可由优化脚本自动生成)
self.load_runtime_params()
self.capture_base_risk_params()
def get_runtime_params_path(self):
params_path_env = os.getenv("BITMART_PARAMS_PATH")
return Path(params_path_env).expanduser() if params_path_env else Path(__file__).with_name("current_params.json")
def load_runtime_params(self):
"""
从 current_params.json 或环境变量 BITMART_PARAMS_PATH 指向的文件加载参数。
@@ -113,8 +161,7 @@ class BitmartFuturesTransactionConservative:
1) {"params": {...}}
2) {...} 直接是参数字典
"""
params_path_env = os.getenv("BITMART_PARAMS_PATH")
params_path = Path(params_path_env).expanduser() if params_path_env else Path(__file__).with_name("current_params.json")
params_path = self.get_runtime_params_path()
if not params_path.exists():
logger.info(f"未找到动态参数文件,使用内置保守参数: {params_path}")
return
@@ -131,6 +178,20 @@ class BitmartFuturesTransactionConservative:
"open_breakout_buffer_pct",
"enable_shadow_reverse",
"shadow_reverse_threshold_pct",
"enable_trend_filter",
"trend_ema_fast",
"trend_ema_slow",
"trend_strength_threshold_pct",
"trend_slope_lookback",
"trend_slope_threshold_pct",
"enable_ai_filter",
"ai_min_confidence",
"ai_bias",
"ai_w_trend_align",
"ai_w_breakout_strength",
"ai_w_entity_strength",
"ai_w_shadow_balance",
"ai_w_volatility_fit",
"open_cooldown_seconds",
"reverse_cooldown_seconds",
"reverse_min_move_pct",
@@ -154,7 +215,7 @@ class BitmartFuturesTransactionConservative:
continue
if key == "leverage":
setattr(self, key, str(value))
elif key == "dynamic_risk_enabled":
elif key in {"dynamic_risk_enabled", "enable_shadow_reverse", "enable_trend_filter", "enable_ai_filter"}:
setattr(self, key, self._to_bool(value))
else:
setattr(self, key, value)
@@ -167,11 +228,108 @@ class BitmartFuturesTransactionConservative:
f"BEAct={self.break_even_activation_usd}, BEFloor={self.break_even_floor_usd}, "
f"BreakoutBuf={self.open_breakout_buffer_pct}%, "
f"OpenCD={self.open_cooldown_seconds}s, ReverseCD={self.reverse_cooldown_seconds}s, "
f"ReverseMove={self.reverse_min_move_pct}%"
f"ReverseMove={self.reverse_min_move_pct}%, "
f"TrendFilter={self.enable_trend_filter}, AIFilter={self.enable_ai_filter}"
)
except Exception as e:
logger.error(f"加载动态参数文件失败: {e} | path={params_path}")
def run_auto_optimization_before_trade(self):
"""
启动前自动运行 optimize_params.py完成后重新加载参数。
"""
if not self.auto_optimize_before_trade:
logger.info("启动前自动优化已禁用BITMART_AUTO_OPTIMIZE_BEFORE_TRADE=0")
return True
if self.auto_optimized_this_run:
return True
script_path = self.auto_optimize_script_path
if not script_path.exists():
msg = f"未找到参数优化脚本: {script_path}"
if self.auto_optimize_require_success:
logger.error(msg)
return False
logger.warning(f"{msg},继续使用当前参数")
return True
output_path = self.get_runtime_params_path()
output_path.parent.mkdir(parents=True, exist_ok=True)
cmd = [
sys.executable,
str(script_path),
"--days",
str(self.auto_optimize_days),
"--symbol",
self.contract_symbol,
"--step",
"5",
"--train-days",
str(self.auto_optimize_train_days),
"--valid-days",
str(self.auto_optimize_valid_days),
"--window-step-days",
str(self.auto_optimize_window_step_days),
"--method",
str(self.auto_optimize_method),
"--n-trials",
str(self.auto_optimize_n_trials),
"--valid-score-weight",
str(self.auto_optimize_valid_score_weight),
"--drawdown-guard",
str(self.auto_optimize_drawdown_guard),
"--stability-weight",
str(self.auto_optimize_stability_weight),
"--stress-slippage-multipliers",
str(self.auto_optimize_stress_slippage),
"--stress-fee-multipliers",
str(self.auto_optimize_stress_fee),
"--output",
str(output_path),
]
if self.auto_optimize_data_file:
cmd.extend(["--data-file", self.auto_optimize_data_file])
logger.info("启动前自动参数优化开始(可能耗时较长)")
logger.info(f"优化输出参数文件: {output_path}")
logger.info("优化命令: " + " ".join(cmd))
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
if process.stdout is not None:
for line in process.stdout:
text = line.strip()
if text:
logger.info(f"[参数优化] {text}")
return_code = process.wait()
except Exception as e:
if self.auto_optimize_require_success:
logger.error(f"启动前自动优化异常: {e}")
return False
logger.warning(f"启动前自动优化异常: {e},继续使用当前参数")
return True
if return_code != 0:
if self.auto_optimize_require_success:
logger.error(f"启动前自动优化失败,退出码: {return_code}")
return False
logger.warning(f"启动前自动优化失败(退出码={return_code}),继续使用当前参数")
return True
self.auto_optimized_this_run = True
self.load_runtime_params()
self.capture_base_risk_params()
self.last_dynamic_risk_kline_id = None
logger.success("启动前自动优化完成,已加载最新参数")
return True
def capture_base_risk_params(self):
"""记录参数基线,动态风控在此基线之上按波动率缩放。"""
self.base_risk_params = {
@@ -295,6 +453,172 @@ class BitmartFuturesTransactionConservative:
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
return False
def _clip(self, value, low, high):
return max(low, min(high, value))
def _ema_series(self, closes, period):
period = max(1, int(period))
if not closes:
return []
alpha = 2.0 / (period + 1.0)
ema_values = [float(closes[0])]
ema = float(closes[0])
for close in closes[1:]:
ema = ema + alpha * (float(close) - ema)
ema_values.append(ema)
return ema_values
def infer_trend_state(self):
"""
根据缓存K线计算趋势状态。
返回: (bias, strength_pct, slope_pct)
bias: bull / bear / neutral
"""
klines = self.recent_klines_cache or []
min_len = max(int(self.trend_ema_slow) + 2, int(self.trend_slope_lookback) + 3)
if len(klines) < min_len:
return "neutral", 0.0, 0.0
closes = [float(k["close"]) for k in klines if k.get("close") is not None]
if len(closes) < min_len:
return "neutral", 0.0, 0.0
fast_series = self._ema_series(closes, self.trend_ema_fast)
slow_series = self._ema_series(closes, self.trend_ema_slow)
if not fast_series or not slow_series:
return "neutral", 0.0, 0.0
fast = fast_series[-1]
slow = slow_series[-1]
if slow <= 0:
return "neutral", 0.0, 0.0
lookback = max(1, int(self.trend_slope_lookback))
ref_idx = max(0, len(slow_series) - 1 - lookback)
slow_ref = slow_series[ref_idx]
slope_pct = (slow - slow_ref) / slow_ref * 100 if slow_ref > 0 else 0.0
strength_pct = (fast - slow) / slow * 100
strength_th = max(0.0, float(self.trend_strength_threshold_pct))
slope_th = max(0.0, float(self.trend_slope_threshold_pct))
bias = "neutral"
if strength_pct >= strength_th and slope_pct >= slope_th:
bias = "bull"
elif strength_pct <= -strength_th and slope_pct <= -slope_th:
bias = "bear"
return bias, strength_pct, slope_pct
def get_recent_volatility_pct_from_cache(self, bars=20):
klines = self.recent_klines_cache or []
bars = max(8, int(bars))
if len(klines) < bars:
return None
ranges = []
for k in klines[-bars:]:
close = float(k["close"])
if close <= 0:
continue
ranges.append((float(k["high"]) - float(k["low"])) / close * 100)
if not ranges:
return None
return sum(ranges) / len(ranges)
def estimate_signal_confidence(
self,
side,
current_price,
entry_price,
prev_kline,
trend_bias,
trend_strength_pct,
):
prev_entity = self.calculate_entity(prev_kline)
prev_close = max(float(prev_kline.get("close") or 0.0), 0.000001)
entity_pct = prev_entity / prev_close * 100.0
upper_shadow_pct = self.calculate_upper_shadow(prev_kline)
lower_shadow_pct = self.calculate_lower_shadow(prev_kline)
if entry_price <= 0:
breakout_move_pct = 0.0
elif side == "long":
breakout_move_pct = max(0.0, (current_price - entry_price) / entry_price * 100.0)
else:
breakout_move_pct = max(0.0, (entry_price - current_price) / entry_price * 100.0)
if side == "long":
trend_align = 1.0 if trend_bias == "bull" else (-1.0 if trend_bias == "bear" else 0.0)
shadow_balance = lower_shadow_pct - upper_shadow_pct
else:
trend_align = 1.0 if trend_bias == "bear" else (-1.0 if trend_bias == "bull" else 0.0)
shadow_balance = upper_shadow_pct - lower_shadow_pct
trend_strength_scale = self._clip(
abs(trend_strength_pct) / max(float(self.trend_strength_threshold_pct), 0.01),
0.0,
2.5,
)
trend_component = trend_align * trend_strength_scale
breakout_strength = self._clip(breakout_move_pct / 0.25, 0.0, 2.5)
entity_strength = self._clip(entity_pct / 0.50, 0.0, 2.5)
shadow_component = self._clip(shadow_balance / 0.30, -2.5, 2.5)
vol_pct = self.get_recent_volatility_pct_from_cache(20)
target_vol = max(float(self.dynamic_vol_target_pct), 0.05)
if vol_pct is None:
vol_fit = 0.0
else:
vol_fit = 1.0 - abs(vol_pct - target_vol) / (target_vol * 2.0)
vol_fit = self._clip(vol_fit, -1.5, 1.5)
score = (
float(self.ai_bias)
+ float(self.ai_w_trend_align) * trend_component
+ float(self.ai_w_breakout_strength) * breakout_strength
+ float(self.ai_w_entity_strength) * entity_strength
+ float(self.ai_w_shadow_balance) * shadow_component
+ float(self.ai_w_volatility_fit) * vol_fit
)
score = self._clip(score, -60.0, 60.0)
confidence = 1.0 / (1.0 + math.exp(-score))
return confidence
def should_take_open_signal(
self,
side,
current_price,
entry_price,
prev_kline,
trend_bias,
trend_strength_pct,
):
if self.enable_trend_filter:
if side == "long" and trend_bias == "bear":
logger.info("趋势过滤拒绝做多:当前趋势偏空")
return False
if side == "short" and trend_bias == "bull":
logger.info("趋势过滤拒绝做空:当前趋势偏多")
return False
if self.enable_ai_filter:
confidence = self.estimate_signal_confidence(
side=side,
current_price=current_price,
entry_price=entry_price,
prev_kline=prev_kline,
trend_bias=trend_bias,
trend_strength_pct=trend_strength_pct,
)
logger.info(
f"AI信号打分: side={side}, confidence={confidence:.3f}, threshold={self.ai_min_confidence:.3f}"
)
if confidence < float(self.ai_min_confidence):
logger.info("AI门控拒绝开仓置信度不足")
return False
return True
def _extract_price_from_ws_payload(self, payload):
"""
尽量从 WS 消息中提取价格,兼容不同字段结构。
@@ -432,11 +756,15 @@ class BitmartFuturesTransactionConservative:
"""获取最近2根K线当前K线和上一根K线"""
try:
end_time = int(time.time())
window_hours = max(
3,
int((max(int(self.dynamic_risk_window_bars), int(self.trend_ema_slow) + int(self.trend_slope_lookback) + 30) * 5) / 60) + 1,
)
# 获取足够多的条目确保有最新的K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=5, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
start_time=end_time - 3600 * window_hours,
end_time=end_time
)[0]["data"]
@@ -451,6 +779,7 @@ class BitmartFuturesTransactionConservative:
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
self.recent_klines_cache = formatted
# 返回最近2根K线倒数第二根上一根和最后一根当前
if len(formatted) >= 2:
@@ -581,31 +910,49 @@ class BitmartFuturesTransactionConservative:
"""平仓操作"""
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def set_order_size(self, size):
"""设置下单数量,确保实盘与参数一致。"""
try:
qty = float(size if size is not None else self.default_order_size)
if qty <= 0:
qty = float(self.default_order_size)
qty_text = str(int(qty)) if float(qty).is_integer() else f"{qty:.4f}".rstrip("0").rstrip(".")
ele = self.page.ele('x://*[@id="size_0"]')
if not ele:
logger.warning("未找到数量输入框,无法设置下单数量")
return False
ele.input(vals=qty_text, clear=True)
return True
except Exception as e:
logger.warning(f"设置下单数量失败: {e}")
return False
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价做多或者做空1是做多-1是做空
limitPriceShortOrder 限价做多或者做空
"""
size = self.default_order_size if size is None else size
if marketPriceLongOrder == -1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.set_order_size(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.set_order_size(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.set_order_size(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.set_order_size(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, text, error=False):
@@ -716,14 +1063,38 @@ class BitmartFuturesTransactionConservative:
if skip_long_by_lower_third:
logger.info("上一根阳线+当前阴线(做空形态),不按下四分之一做多")
trend_bias, trend_strength_pct, trend_slope_pct = self.infer_trend_state()
self.last_trend_bias = trend_bias
self.last_trend_strength_pct = trend_strength_pct
self.last_trend_slope_pct = trend_slope_pct
logger.info(
f"趋势状态: bias={trend_bias}, strength={trend_strength_pct:.4f}%, slope={trend_slope_pct:.4f}%"
)
# 无持仓时检查开仓信号
if self.start == 0:
if current_price >= long_entry_price and not skip_long_by_lower_third:
logger.info(f"触发做多信号!价格 {current_price:.2f} >= 保守开仓价 {long_entry_price:.2f}")
return ('long', long_entry_price)
if self.should_take_open_signal(
side="long",
current_price=current_price,
entry_price=long_entry_price,
prev_kline=prev_kline,
trend_bias=trend_bias,
trend_strength_pct=trend_strength_pct,
):
logger.info(f"触发做多信号!价格 {current_price:.2f} >= 保守开仓价 {long_entry_price:.2f}")
return ('long', long_entry_price)
elif current_price <= short_entry_price and not skip_short_by_upper_third:
logger.info(f"触发做空信号!价格 {current_price:.2f} <= 保守开仓价 {short_entry_price:.2f}")
return ('short', short_entry_price)
if self.should_take_open_signal(
side="short",
current_price=current_price,
entry_price=short_entry_price,
prev_kline=prev_kline,
trend_bias=trend_bias,
trend_strength_pct=trend_strength_pct,
):
logger.info(f"触发做空信号!价格 {current_price:.2f} <= 保守开仓价 {short_entry_price:.2f}")
return ('short', short_entry_price)
# 持仓时检查反手信号
elif self.start == 1: # 持多仓
@@ -941,6 +1312,11 @@ class BitmartFuturesTransactionConservative:
logger.info("开始运行四分之一策略交易(保守模式)...")
# 启动前先自动优化参数,再进入交易
if not self.run_auto_optimization_before_trade():
logger.error("自动参数优化失败,策略停止启动")
return
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
@@ -969,8 +1345,7 @@ class BitmartFuturesTransactionConservative:
# 进入交易页面
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(vals=25, clear=True)
self.set_order_size(self.default_order_size)
page_start = False