日志展示优化

This commit is contained in:
ddrwode
2026-02-10 11:31:12 +08:00
parent d94de1bf64
commit 3aade1ab00
3 changed files with 1191 additions and 21 deletions

View File

@@ -0,0 +1,57 @@
{
"apply_live": false,
"selection_metric": "robust_score",
"robust_score": -42.24269838750459,
"consistency_score": 0.6069453441126542,
"overfit_gap": 40.47267326756849,
"train_risk_adj": -71.72112718512093,
"valid_risk_adj": -31.248453917552446,
"split": {
"train_ratio": 0.7,
"split_gap_bars": 20
},
"train_metrics": {
"score": -67.1828968000868,
"total_return_pct": -44.49174487491609,
"max_drawdown_pct": 45.382303850341415,
"win_rate_pct": 22.328767123287673,
"profit_factor": 0.4784084680856937,
"trades": 730,
"wins": 163
},
"valid_metrics": {
"score": -29.273526277306964,
"total_return_pct": -19.398888076079544,
"max_drawdown_pct": 19.749276402454836,
"win_rate_pct": 24.842767295597483,
"profit_factor": 0.5745025296635685,
"trades": 318,
"wins": 79
},
"full_metrics": {
"score": -82.88705309316238,
"total_return_pct": -55.23085815685731,
"max_drawdown_pct": 55.31238987261013,
"win_rate_pct": 23.15689981096408,
"profit_factor": 0.5119705777123974,
"trades": 1058,
"wins": 245
},
"params_for_trade_py": {
"min_prev_entity_pct": 0.1,
"breakout_buffer_pct": 0.03,
"shadow_threshold_pct": 0.15,
"stop_loss_pct": 0.35,
"take_profit_pct": 0.8,
"trailing_start_pct": 0.5,
"trailing_backoff_pct": 0.25,
"use_atr_dynamic_threshold": true,
"atr_length": 10,
"breakout_buffer_atr_mult": 0.08,
"shadow_threshold_atr_mult": 0.14,
"stop_loss_atr_mult": 0.5,
"take_profit_atr_mult": 0.8,
"trailing_start_atr_mult": 0.45,
"trailing_backoff_atr_mult": 0.2
}
}

View File

@@ -0,0 +1,757 @@
import argparse
import csv
import itertools
import json
from dataclasses import asdict, dataclass
from pathlib import Path
@dataclass
class Bar:
ts: int
open: float
high: float
low: float
close: float
@dataclass
class StrategyParams:
min_prev_entity_pct: float = 0.1
breakout_buffer_pct: float = 0.03
shadow_threshold_pct: float = 0.15
stop_loss_pct: float = 0.35
take_profit_pct: float = 0.8
trailing_start_pct: float = 0.5
trailing_backoff_pct: float = 0.25
use_atr_dynamic_threshold: bool = True
atr_length: int = 14
breakout_buffer_atr_mult: float = 0.12
shadow_threshold_atr_mult: float = 0.18
stop_loss_atr_mult: float = 0.45
take_profit_atr_mult: float = 0.95
trailing_start_atr_mult: float = 0.6
trailing_backoff_atr_mult: float = 0.3
@dataclass
class BacktestResult:
score: float
total_return_pct: float
max_drawdown_pct: float
win_rate_pct: float
profit_factor: float
trades: int
wins: int
params: StrategyParams
@dataclass
class RobustEvalResult:
robust_score: float
consistency_score: float
overfit_gap: float
train_risk_adj: float
valid_risk_adj: float
train: BacktestResult
valid: BacktestResult
full: BacktestResult
params: StrategyParams
def load_csv_bars(path: Path) -> list[Bar]:
bars: list[Bar] = []
with path.open("r", encoding="utf-8") as f:
reader = csv.DictReader(f)
required = {"id", "open", "high", "low", "close"}
if not required.issubset(reader.fieldnames or set()):
raise ValueError(f"CSV missing columns: {required}")
for row in reader:
try:
bars.append(
Bar(
ts=int(float(row["id"])),
open=float(row["open"]),
high=float(row["high"]),
low=float(row["low"]),
close=float(row["close"]),
)
)
except (ValueError, TypeError):
continue
bars.sort(key=lambda x: x.ts)
return bars
def resample_to_minutes(bars: list[Bar], minutes: int) -> list[Bar]:
if not bars:
return []
bucket_sec = minutes * 60
grouped: list[Bar] = []
cur_bucket = None
cur_open = cur_high = cur_low = cur_close = None
for bar in bars:
b = bar.ts // bucket_sec
if cur_bucket is None or b != cur_bucket:
if cur_bucket is not None:
grouped.append(
Bar(
ts=cur_bucket * bucket_sec,
open=cur_open,
high=cur_high,
low=cur_low,
close=cur_close,
)
)
cur_bucket = b
cur_open = bar.open
cur_high = bar.high
cur_low = bar.low
cur_close = bar.close
else:
cur_high = max(cur_high, bar.high)
cur_low = min(cur_low, bar.low)
cur_close = bar.close
if cur_bucket is not None:
grouped.append(
Bar(
ts=cur_bucket * bucket_sec,
open=cur_open,
high=cur_high,
low=cur_low,
close=cur_close,
)
)
return grouped
def compute_atr_series(bars: list[Bar], length: int) -> list[float | None]:
atr: list[float | None] = [None] * len(bars)
if len(bars) < length + 1:
return atr
tr_list: list[float] = [0.0] * len(bars)
for i in range(1, len(bars)):
high = bars[i].high
low = bars[i].low
prev_close = bars[i - 1].close
tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
tr_list[i] = tr
for i in range(length, len(bars)):
window = tr_list[i - length + 1:i + 1]
atr[i] = sum(window) / length
return atr
def resolve_dynamic_distance(
base_price: float,
fixed_pct: float,
atr_value: float | None,
atr_mult: float,
use_atr_dynamic_threshold: bool,
) -> float:
fixed_distance = base_price * fixed_pct / 100
if use_atr_dynamic_threshold and atr_value and atr_value > 0:
return max(fixed_distance, atr_value * atr_mult)
return fixed_distance
def kline_entity_abs(bar: Bar) -> float:
return abs(bar.close - bar.open)
def kline_entity_edges(bar: Bar) -> tuple[float, float]:
return max(bar.open, bar.close), min(bar.open, bar.close)
def upper_shadow_abs(bar: Bar) -> float:
return max(0.0, bar.high - max(bar.open, bar.close))
def lower_shadow_abs(bar: Bar) -> float:
return max(0.0, min(bar.open, bar.close) - bar.low)
def close_position(
equity: float,
side: int,
entry_price: float,
exit_price: float,
fee_rate: float,
) -> tuple[float, float]:
net_ret = side * (exit_price - entry_price) / entry_price - 2 * fee_rate
equity *= (1 + net_ret)
return equity, net_ret
def backtest_strategy(
bars: list[Bar],
params: StrategyParams,
fee_rate: float = 0.0004,
min_trades: int = 20,
) -> BacktestResult:
atr_series = compute_atr_series(bars, params.atr_length)
position = 0
entry_price = None
max_favorable_price = None
min_favorable_price = None
equity = 1.0
peak_equity = 1.0
max_drawdown = 0.0
trades = 0
wins = 0
gross_profit = 0.0
gross_loss = 0.0
for i in range(1, len(bars)):
current = bars[i]
current_price = current.close
atr_value = atr_series[i - 1]
prev_idx = None
for j in range(i - 1, -1, -1):
prev_bar = bars[j]
entity = kline_entity_abs(prev_bar)
entity_pct = (entity / prev_bar.open * 100) if prev_bar.open else 0
if entity_pct > params.min_prev_entity_pct:
prev_idx = j
break
if prev_idx is None:
continue
prev = bars[prev_idx]
prev_entity = kline_entity_abs(prev)
prev_entity_upper, prev_entity_lower = kline_entity_edges(prev)
prev_is_bullish_for_calc = prev.close > prev.open
prev_is_bearish_for_calc = prev.close < prev.open
current_open_above_prev_close = current.open > prev.close
current_open_below_prev_close = current.open < prev.close
use_current_open_as_base = (
(prev_is_bullish_for_calc and current_open_above_prev_close)
or (prev_is_bearish_for_calc and current_open_below_prev_close)
)
if use_current_open_as_base:
calc_lower = current.open
calc_upper = current.open
long_trigger = calc_lower + prev_entity / 3
short_trigger = calc_upper - prev_entity / 3
long_breakout = calc_upper + prev_entity / 3
short_breakout = calc_lower - prev_entity / 3
else:
long_trigger = prev_entity_lower + prev_entity / 3
short_trigger = prev_entity_upper - prev_entity / 3
long_breakout = prev_entity_upper + prev_entity / 3
short_breakout = prev_entity_lower - prev_entity / 3
breakout_buffer = max(
prev_entity * 0.1,
current_price * params.breakout_buffer_pct / 100,
(atr_value * params.breakout_buffer_atr_mult)
if (params.use_atr_dynamic_threshold and atr_value and atr_value > 0)
else 0,
)
long_breakout_effective = long_breakout + breakout_buffer
short_breakout_effective = short_breakout - breakout_buffer
prev_is_bearish = prev.close < prev.open
current_is_bullish = current.close > current.open
skip_short_by_upper_third = prev_is_bearish and current_is_bullish
prev_is_bullish = prev.close > prev.open
current_is_bearish = current.close < current.open
skip_long_by_lower_third = prev_is_bullish and current_is_bearish
if position != 0 and entry_price is not None:
sl_distance = resolve_dynamic_distance(
base_price=entry_price,
fixed_pct=params.stop_loss_pct,
atr_value=atr_value,
atr_mult=params.stop_loss_atr_mult,
use_atr_dynamic_threshold=params.use_atr_dynamic_threshold,
)
tp_distance = resolve_dynamic_distance(
base_price=entry_price,
fixed_pct=params.take_profit_pct,
atr_value=atr_value,
atr_mult=params.take_profit_atr_mult,
use_atr_dynamic_threshold=params.use_atr_dynamic_threshold,
)
trail_start_distance = resolve_dynamic_distance(
base_price=entry_price,
fixed_pct=params.trailing_start_pct,
atr_value=atr_value,
atr_mult=params.trailing_start_atr_mult,
use_atr_dynamic_threshold=params.use_atr_dynamic_threshold,
)
trail_backoff_distance = resolve_dynamic_distance(
base_price=entry_price,
fixed_pct=params.trailing_backoff_pct,
atr_value=atr_value,
atr_mult=params.trailing_backoff_atr_mult,
use_atr_dynamic_threshold=params.use_atr_dynamic_threshold,
)
should_close = False
if position == 1:
max_favorable_price = max(max_favorable_price or entry_price, current_price)
profit_distance = current_price - entry_price
loss_distance = entry_price - current_price
if loss_distance >= sl_distance:
should_close = True
elif profit_distance >= tp_distance:
should_close = True
elif (
profit_distance >= trail_start_distance
and (max_favorable_price - current_price) >= trail_backoff_distance
):
should_close = True
else:
min_favorable_price = min(min_favorable_price or entry_price, current_price)
profit_distance = entry_price - current_price
loss_distance = current_price - entry_price
if loss_distance >= sl_distance:
should_close = True
elif profit_distance >= tp_distance:
should_close = True
elif (
profit_distance >= trail_start_distance
and (current_price - min_favorable_price) >= trail_backoff_distance
):
should_close = True
if should_close:
equity, net_ret = close_position(
equity=equity,
side=position,
entry_price=entry_price,
exit_price=current_price,
fee_rate=fee_rate,
)
trades += 1
if net_ret > 0:
wins += 1
gross_profit += net_ret
else:
gross_loss += net_ret
position = 0
entry_price = None
max_favorable_price = None
min_favorable_price = None
peak_equity = max(peak_equity, equity)
max_drawdown = max(max_drawdown, (peak_equity - equity) / peak_equity)
continue
signal = None
if position == 0:
if current_price >= long_breakout_effective and not skip_long_by_lower_third:
signal = "long"
elif current_price <= short_breakout_effective and not skip_short_by_upper_third:
signal = "short"
elif position == 1:
if current_price <= short_trigger and not skip_short_by_upper_third:
signal = "reverse_short"
else:
upper_abs = upper_shadow_abs(prev)
upper_thr = resolve_dynamic_distance(
base_price=max(prev.open, prev.close),
fixed_pct=params.shadow_threshold_pct,
atr_value=atr_value,
atr_mult=params.shadow_threshold_atr_mult,
use_atr_dynamic_threshold=params.use_atr_dynamic_threshold,
)
if upper_abs > upper_thr and current_price <= prev_entity_lower:
signal = "reverse_short"
elif position == -1:
if current_price >= long_trigger and not skip_long_by_lower_third:
signal = "reverse_long"
else:
lower_abs = lower_shadow_abs(prev)
lower_thr = resolve_dynamic_distance(
base_price=min(prev.open, prev.close),
fixed_pct=params.shadow_threshold_pct,
atr_value=atr_value,
atr_mult=params.shadow_threshold_atr_mult,
use_atr_dynamic_threshold=params.use_atr_dynamic_threshold,
)
if lower_abs > lower_thr and current_price >= prev_entity_upper:
signal = "reverse_long"
if signal == "long":
position = 1
entry_price = current_price
max_favorable_price = current_price
min_favorable_price = None
elif signal == "short":
position = -1
entry_price = current_price
min_favorable_price = current_price
max_favorable_price = None
elif signal == "reverse_long":
equity, net_ret = close_position(
equity=equity,
side=position,
entry_price=entry_price,
exit_price=current_price,
fee_rate=fee_rate,
)
trades += 1
if net_ret > 0:
wins += 1
gross_profit += net_ret
else:
gross_loss += net_ret
position = 1
entry_price = current_price
max_favorable_price = current_price
min_favorable_price = None
peak_equity = max(peak_equity, equity)
max_drawdown = max(max_drawdown, (peak_equity - equity) / peak_equity)
elif signal == "reverse_short":
equity, net_ret = close_position(
equity=equity,
side=position,
entry_price=entry_price,
exit_price=current_price,
fee_rate=fee_rate,
)
trades += 1
if net_ret > 0:
wins += 1
gross_profit += net_ret
else:
gross_loss += net_ret
position = -1
entry_price = current_price
min_favorable_price = current_price
max_favorable_price = None
peak_equity = max(peak_equity, equity)
max_drawdown = max(max_drawdown, (peak_equity - equity) / peak_equity)
if position != 0 and entry_price is not None:
equity, net_ret = close_position(
equity=equity,
side=position,
entry_price=entry_price,
exit_price=bars[-1].close,
fee_rate=fee_rate,
)
trades += 1
if net_ret > 0:
wins += 1
gross_profit += net_ret
else:
gross_loss += net_ret
peak_equity = max(peak_equity, equity)
max_drawdown = max(max_drawdown, (peak_equity - equity) / peak_equity)
total_return_pct = (equity - 1) * 100
win_rate_pct = (wins / trades * 100) if trades else 0.0
loss_abs = abs(gross_loss)
profit_factor = (gross_profit / loss_abs) if loss_abs > 1e-12 else 999.0
score = total_return_pct - max_drawdown * 100 * 0.5
if trades < min_trades:
score -= (min_trades - trades) * 0.8
return BacktestResult(
score=score,
total_return_pct=total_return_pct,
max_drawdown_pct=max_drawdown * 100,
win_rate_pct=win_rate_pct,
profit_factor=profit_factor,
trades=trades,
wins=wins,
params=params,
)
def split_train_valid(
bars: list[Bar],
train_ratio: float = 0.7,
gap_bars: int = 0,
) -> tuple[list[Bar], list[Bar]]:
"""
时间序列分离:前段训练、后段验证。
gap_bars 用于在训练与验证之间留空,降低相邻样本泄漏。
"""
if not bars:
return [], []
train_ratio = min(max(train_ratio, 0.5), 0.9)
split_idx = int(len(bars) * train_ratio)
split_idx = max(1, min(split_idx, len(bars) - 1))
valid_start = min(len(bars), split_idx + max(0, gap_bars))
train_bars = bars[:split_idx]
valid_bars = bars[valid_start:]
return train_bars, valid_bars
def risk_adjusted_return(result: BacktestResult) -> float:
"""简单风险调整收益:收益 - 0.6 * 回撤。"""
return result.total_return_pct - 0.6 * result.max_drawdown_pct
def compute_robust_score(
train_result: BacktestResult,
valid_result: BacktestResult,
min_train_trades: int = 20,
min_valid_trades: int = 10,
) -> tuple[float, float, float, float, float]:
"""
稳健性分数(越高越稳健):
- 以验证集风险调整收益为主
- 奖励训练/验证一致性
- 惩罚过拟合(训练好、验证差)
- 惩罚验证成交次数过少
"""
train_ra = risk_adjusted_return(train_result)
valid_ra = risk_adjusted_return(valid_result)
overfit_gap = abs(train_ra - valid_ra)
denom = abs(train_ra) + abs(valid_ra) + 1e-9
consistency = max(0.0, 1.0 - overfit_gap / denom)
train_trade_penalty = max(0, min_train_trades - train_result.trades) * 0.4
valid_trade_penalty = max(0, min_valid_trades - valid_result.trades) * 1.2
pf_bonus = min(valid_result.profit_factor, 3.0) * 2.0
win_bonus = max(0.0, (valid_result.win_rate_pct - 50.0) * 0.08)
direction_penalty = 0.0
if train_ra > 0 and valid_ra <= 0:
direction_penalty += 12.0
if train_result.total_return_pct > 0 and valid_result.total_return_pct < 0:
direction_penalty += 8.0
robust_score = (
0.75 * valid_ra
+ 0.25 * train_ra
+ 10.0 * consistency
+ pf_bonus
+ win_bonus
- 0.2 * overfit_gap
- train_trade_penalty
- valid_trade_penalty
- direction_penalty
)
return robust_score, consistency, overfit_gap, train_ra, valid_ra
def evaluate_param_set(
train_bars: list[Bar],
valid_bars: list[Bar],
full_bars: list[Bar],
params: StrategyParams,
fee_rate: float,
min_train_trades: int,
min_valid_trades: int,
) -> RobustEvalResult:
train_result = backtest_strategy(
bars=train_bars,
params=params,
fee_rate=fee_rate,
min_trades=0,
)
valid_result = backtest_strategy(
bars=valid_bars,
params=params,
fee_rate=fee_rate,
min_trades=0,
)
full_result = backtest_strategy(
bars=full_bars,
params=params,
fee_rate=fee_rate,
min_trades=0,
)
robust_score, consistency, overfit_gap, train_ra, valid_ra = compute_robust_score(
train_result=train_result,
valid_result=valid_result,
min_train_trades=min_train_trades,
min_valid_trades=min_valid_trades,
)
return RobustEvalResult(
robust_score=robust_score,
consistency_score=consistency,
overfit_gap=overfit_gap,
train_risk_adj=train_ra,
valid_risk_adj=valid_ra,
train=train_result,
valid=valid_result,
full=full_result,
params=params,
)
def quick_grid() -> dict[str, list[float | int]]:
return {
"atr_length": [10, 14, 20],
"breakout_buffer_atr_mult": [0.08, 0.12],
"shadow_threshold_atr_mult": [0.14, 0.2],
"stop_loss_atr_mult": [0.35, 0.5],
"take_profit_atr_mult": [0.8, 1.0],
"trailing_start_atr_mult": [0.45, 0.65],
"trailing_backoff_atr_mult": [0.2, 0.3],
}
def full_grid() -> dict[str, list[float | int]]:
return {
"atr_length": [10, 14, 20],
"breakout_buffer_atr_mult": [0.08, 0.12, 0.16],
"shadow_threshold_atr_mult": [0.12, 0.18, 0.24],
"stop_loss_atr_mult": [0.35, 0.5, 0.65],
"take_profit_atr_mult": [0.8, 1.0, 1.2],
"trailing_start_atr_mult": [0.45, 0.65, 0.85],
"trailing_backoff_atr_mult": [0.2, 0.3, 0.4],
}
def iter_param_combos(base: StrategyParams, grid: dict[str, list[float | int]]):
keys = list(grid.keys())
values = [grid[k] for k in keys]
for combo in itertools.product(*values):
data = asdict(base)
for k, v in zip(keys, combo):
data[k] = v
yield StrategyParams(**data)
def result_metrics_dict(result: BacktestResult) -> dict[str, float | int]:
return {
"score": result.score,
"total_return_pct": result.total_return_pct,
"max_drawdown_pct": result.max_drawdown_pct,
"win_rate_pct": result.win_rate_pct,
"profit_factor": result.profit_factor,
"trades": result.trades,
"wins": result.wins,
}
def save_best_params(
best: RobustEvalResult,
out_path: Path,
train_ratio: float,
split_gap_bars: int,
):
payload = {
"apply_live": False,
"selection_metric": "robust_score",
"robust_score": best.robust_score,
"consistency_score": best.consistency_score,
"overfit_gap": best.overfit_gap,
"train_risk_adj": best.train_risk_adj,
"valid_risk_adj": best.valid_risk_adj,
"split": {
"train_ratio": train_ratio,
"split_gap_bars": split_gap_bars,
},
"train_metrics": result_metrics_dict(best.train),
"valid_metrics": result_metrics_dict(best.valid),
"full_metrics": result_metrics_dict(best.full),
"params_for_trade_py": asdict(best.params),
}
out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def main():
parser = argparse.ArgumentParser(description="ATR dynamic parameter optimizer for bitmart strategy")
parser.add_argument(
"--csv",
default=str(Path(__file__).resolve().parent / "数据" / "kline_1.csv"),
help="path to csv with id/open/high/low/close",
)
parser.add_argument("--interval-min", type=int, default=5, help="resample interval minutes")
parser.add_argument("--mode", choices=["quick", "full"], default="quick", help="grid size")
parser.add_argument("--limit-bars", type=int, default=30000, help="use latest N bars after resample")
parser.add_argument("--train-ratio", type=float, default=0.7, help="train split ratio in time order")
parser.add_argument("--split-gap-bars", type=int, default=0, help="gap bars between train and valid")
parser.add_argument("--min-trades", type=int, default=20, help="deprecated: kept for compatibility")
parser.add_argument("--min-train-trades", type=int, default=20, help="min train trades for robustness")
parser.add_argument("--min-valid-trades", type=int, default=10, help="min valid trades for robustness")
parser.add_argument("--fee-rate", type=float, default=0.0004, help="round-trip half fee per side")
parser.add_argument("--top-n", type=int, default=10, help="print top N results")
parser.add_argument(
"--out-json",
default=str(Path(__file__).resolve().parent / "atr_best_params.json"),
help="best result json output path",
)
args = parser.parse_args()
csv_path = Path(args.csv).resolve()
if not csv_path.exists():
raise FileNotFoundError(f"CSV not found: {csv_path}")
bars = load_csv_bars(csv_path)
bars = resample_to_minutes(bars, args.interval_min)
if args.limit_bars and args.limit_bars > 0:
bars = bars[-args.limit_bars:]
if len(bars) < 400:
raise ValueError("not enough bars for optimization")
train_bars, valid_bars = split_train_valid(
bars=bars,
train_ratio=args.train_ratio,
gap_bars=args.split_gap_bars,
)
if len(train_bars) < 200 or len(valid_bars) < 100:
raise ValueError(
f"insufficient split bars: train={len(train_bars)} valid={len(valid_bars)}; "
"consider increasing --limit-bars or adjusting --train-ratio"
)
base = StrategyParams()
grid = quick_grid() if args.mode == "quick" else full_grid()
combos = list(iter_param_combos(base, grid))
print(
f"bars={len(bars)} train={len(train_bars)} valid={len(valid_bars)} "
f"| combos={len(combos)} | mode={args.mode} | train_ratio={args.train_ratio:.2f} gap={args.split_gap_bars}"
)
results: list[RobustEvalResult] = []
for idx, params in enumerate(combos, 1):
result = evaluate_param_set(
train_bars=train_bars,
valid_bars=valid_bars,
full_bars=bars,
params=params,
fee_rate=args.fee_rate,
min_train_trades=args.min_train_trades,
min_valid_trades=args.min_valid_trades,
)
results.append(result)
if idx % 50 == 0 or idx == len(combos):
print(f"progress {idx}/{len(combos)}")
results.sort(key=lambda x: x.robust_score, reverse=True)
top_n = max(1, args.top_n)
top = results[:top_n]
for i, r in enumerate(top, 1):
p = r.params
print(
f"[{i}] robust={r.robust_score:.2f} consistency={r.consistency_score:.3f} gap={r.overfit_gap:.2f} | "
f"train(ret={r.train.total_return_pct:.2f}% dd={r.train.max_drawdown_pct:.2f}% trades={r.train.trades}) | "
f"valid(ret={r.valid.total_return_pct:.2f}% dd={r.valid.max_drawdown_pct:.2f}% trades={r.valid.trades}) | "
f"full(ret={r.full.total_return_pct:.2f}% dd={r.full.max_drawdown_pct:.2f}% trades={r.full.trades}) | "
f"atr={p.atr_length} brk={p.breakout_buffer_atr_mult:.2f} shadow={p.shadow_threshold_atr_mult:.2f} "
f"sl={p.stop_loss_atr_mult:.2f} tp={p.take_profit_atr_mult:.2f} "
f"ts={p.trailing_start_atr_mult:.2f} tb={p.trailing_backoff_atr_mult:.2f}"
)
out_path = Path(args.out_json).resolve()
save_best_params(
top[0],
out_path,
train_ratio=args.train_ratio,
split_gap_bars=args.split_gap_bars,
)
print(f"saved best params -> {out_path}")
print("note: set apply_live=true in json when you want 交易.py to auto-load it")
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,6 @@
import time
import json
from pathlib import Path
from tqdm import tqdm
from loguru import logger
@@ -9,6 +11,24 @@ from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
PRECOMPUTED_TRADE_PARAMS = {
# 你已经算好参数时,把 enabled 改成 True并在下面填入你的值。
# 直接运行本文件即可生效,不需要任何命令行参数。
"enabled": False,
"params": {
# 示例(按需修改):
# "use_atr_dynamic_threshold": True,
# "atr_length": 14,
# "breakout_buffer_atr_mult": 0.12,
# "shadow_threshold_atr_mult": 0.18,
# "stop_loss_atr_mult": 0.45,
# "take_profit_atr_mult": 0.95,
# "trailing_start_atr_mult": 0.6,
# "trailing_backoff_atr_mult": 0.3,
}
}
class BitmartFuturesTransaction:
def __init__(self, bit_id):
@@ -53,6 +73,34 @@ class BitmartFuturesTransaction:
self.prev_entity = None # 上一根K线实体大小
self.current_open = None # 当前K线开盘价
# 策略优化参数
self.min_prev_entity_pct = 0.1 # 上一根K线实体最小百分比%
self.breakout_buffer_pct = 0.03 # 突破缓冲百分比(%),过滤假突破
self.shadow_threshold_pct = 0.15 # 上下影线触发反手的最小阈值(%
# ATR 动态阈值(启用后:动态值与固定值取较大者)
self.use_atr_dynamic_threshold = True
self.atr_length = 14
self.current_atr = None
self.breakout_buffer_atr_mult = 0.12
self.shadow_threshold_atr_mult = 0.18
# 风控参数
self.stop_loss_pct = 0.35 # 固定止损(%
self.take_profit_pct = 0.8 # 固定止盈(%
self.trailing_start_pct = 0.5 # 浮盈达到该值后启动移动止盈(%
self.trailing_backoff_pct = 0.25 # 从最优价回撤达到该值触发移动止盈(%
self.stop_loss_atr_mult = 0.45
self.take_profit_atr_mult = 0.95
self.trailing_start_atr_mult = 0.6
self.trailing_backoff_atr_mult = 0.3
self.max_favorable_price = None # 多仓期间的最优价格
self.min_favorable_price = None # 空仓期间的最优价格
self.optimized_params_file = Path(__file__).resolve().parent / "atr_best_params.json"
self.apply_precomputed_params()
self.load_optimized_params()
def get_klines(self):
"""获取最近2根K线当前K线和上一根K线"""
try:
@@ -131,6 +179,7 @@ class BitmartFuturesTransaction:
self.open_avg_price = None
self.current_amount = None
self.unrealized_pnl = None
self.reset_trailing_state()
return True
pos = positions[0]
self.start = 1 if pos['position_type'] == 1 else -1
@@ -236,6 +285,252 @@ class BitmartFuturesTransaction:
else:
logger.info(text)
def load_optimized_params(self):
"""从本地优化结果文件加载参数(可选)。"""
try:
if PRECOMPUTED_TRADE_PARAMS.get("enabled"):
logger.info("已启用 PRECOMPUTED_TRADE_PARAMS跳过 atr_best_params.json 加载")
return
if not self.optimized_params_file.exists():
return
data = json.loads(self.optimized_params_file.read_text(encoding="utf-8"))
if data.get("apply_live") is not True:
logger.info(f"检测到优化参数文件但 apply_live != true跳过加载: {self.optimized_params_file}")
return
params = data.get("params_for_trade_py", data)
allow_keys = {
"min_prev_entity_pct",
"breakout_buffer_pct",
"shadow_threshold_pct",
"stop_loss_pct",
"take_profit_pct",
"trailing_start_pct",
"trailing_backoff_pct",
"use_atr_dynamic_threshold",
"atr_length",
"breakout_buffer_atr_mult",
"shadow_threshold_atr_mult",
"stop_loss_atr_mult",
"take_profit_atr_mult",
"trailing_start_atr_mult",
"trailing_backoff_atr_mult",
}
applied = []
for key, val in params.items():
if key in allow_keys and hasattr(self, key):
setattr(self, key, val)
applied.append(key)
if applied:
logger.info(f"已加载优化参数文件: {self.optimized_params_file},字段数: {len(applied)}")
except Exception as e:
logger.warning(f"加载优化参数文件失败,将继续使用默认参数: {e}")
def apply_precomputed_params(self):
"""
直接应用本文件内预计算参数(不依赖命令)。
当 PRECOMPUTED_TRADE_PARAMS.enabled=True 时生效。
"""
try:
conf = PRECOMPUTED_TRADE_PARAMS or {}
if conf.get("enabled") is not True:
return
params = conf.get("params") or {}
if not isinstance(params, dict):
logger.warning("PRECOMPUTED_TRADE_PARAMS.params 不是字典,忽略")
return
allow_keys = {
"min_prev_entity_pct",
"breakout_buffer_pct",
"shadow_threshold_pct",
"stop_loss_pct",
"take_profit_pct",
"trailing_start_pct",
"trailing_backoff_pct",
"use_atr_dynamic_threshold",
"atr_length",
"breakout_buffer_atr_mult",
"shadow_threshold_atr_mult",
"stop_loss_atr_mult",
"take_profit_atr_mult",
"trailing_start_atr_mult",
"trailing_backoff_atr_mult",
}
applied = []
for key, val in params.items():
if key in allow_keys and hasattr(self, key):
setattr(self, key, val)
applied.append(key)
logger.info(f"已应用文件内预计算参数,字段数: {len(applied)}")
except Exception as e:
logger.warning(f"应用文件内预计算参数失败,将继续使用默认参数: {e}")
def compute_atr(self, klines, length=None):
"""
计算 ATRSMA 版本)
klines 需按时间升序,且每根含 high/low/close
"""
length = self.atr_length if length is None else length
if not klines or len(klines) < length + 1:
return None
tr_list = []
for i in range(1, len(klines)):
high = klines[i]['high']
low = klines[i]['low']
prev_close = klines[i - 1]['close']
tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
tr_list.append(tr)
if len(tr_list) < length:
return None
return sum(tr_list[-length:]) / length
def resolve_dynamic_distance(self, base_price, fixed_pct, atr_value, atr_mult):
"""
把阈值统一换算为价格距离:
- 固定阈值: base_price * fixed_pct
- ATR 阈值: atr_value * atr_mult
启用 ATR 后取较大者,避免在高波动时阈值过窄。
"""
fixed_distance = base_price * fixed_pct / 100
if self.use_atr_dynamic_threshold and atr_value and atr_value > 0:
return max(fixed_distance, atr_value * atr_mult)
return fixed_distance
def get_upper_shadow_abs(self, kline):
"""上影线绝对长度"""
return max(0.0, kline['high'] - max(kline['open'], kline['close']))
def get_lower_shadow_abs(self, kline):
"""下影线绝对长度"""
return max(0.0, min(kline['open'], kline['close']) - kline['low'])
def get_shadow_threshold_distance(self, kline, atr_value, side):
"""
影线触发阈值(价格距离)
side: 'upper' / 'lower'
"""
if side == 'upper':
base_price = max(kline['open'], kline['close'])
else:
base_price = min(kline['open'], kline['close'])
return self.resolve_dynamic_distance(
base_price=base_price,
fixed_pct=self.shadow_threshold_pct,
atr_value=atr_value,
atr_mult=self.shadow_threshold_atr_mult
)
def reset_trailing_state(self):
"""重置移动止盈状态"""
self.max_favorable_price = None
self.min_favorable_price = None
def set_trailing_anchor_by_position(self):
"""根据当前持仓初始化移动止盈锚点"""
if self.start == 1 and self.open_avg_price:
self.max_favorable_price = self.open_avg_price
self.min_favorable_price = None
elif self.start == -1 and self.open_avg_price:
self.min_favorable_price = self.open_avg_price
self.max_favorable_price = None
else:
self.reset_trailing_state()
def check_risk_exit(self, current_price, atr_value=None):
"""
风控退出检查:
1) 固定止损
2) 固定止盈
3) 移动止盈
返回: (reason, detail) / None
"""
if self.start == 0 or not self.open_avg_price:
self.reset_trailing_state()
return None
sl_distance = self.resolve_dynamic_distance(
base_price=self.open_avg_price,
fixed_pct=self.stop_loss_pct,
atr_value=atr_value,
atr_mult=self.stop_loss_atr_mult
)
tp_distance = self.resolve_dynamic_distance(
base_price=self.open_avg_price,
fixed_pct=self.take_profit_pct,
atr_value=atr_value,
atr_mult=self.take_profit_atr_mult
)
trail_start_distance = self.resolve_dynamic_distance(
base_price=self.open_avg_price,
fixed_pct=self.trailing_start_pct,
atr_value=atr_value,
atr_mult=self.trailing_start_atr_mult
)
trail_backoff_distance = self.resolve_dynamic_distance(
base_price=self.open_avg_price,
fixed_pct=self.trailing_backoff_pct,
atr_value=atr_value,
atr_mult=self.trailing_backoff_atr_mult
)
# 多仓
if self.start == 1:
profit_distance = current_price - self.open_avg_price
loss_distance = self.open_avg_price - current_price
if self.max_favorable_price is None:
self.max_favorable_price = max(self.open_avg_price, current_price)
else:
self.max_favorable_price = max(self.max_favorable_price, current_price)
if loss_distance >= sl_distance:
return (
'stop_loss',
f"多仓回撤 {loss_distance:.3f} >= 止损阈值 {sl_distance:.3f}"
)
if profit_distance >= tp_distance:
return (
'take_profit',
f"多仓盈利 {profit_distance:.3f} >= 止盈阈值 {tp_distance:.3f}"
)
if profit_distance >= trail_start_distance and self.max_favorable_price:
retrace_distance = self.max_favorable_price - current_price
if retrace_distance >= trail_backoff_distance:
return (
'trailing_stop',
f"多仓回撤 {retrace_distance:.3f} >= 移动回撤阈值 {trail_backoff_distance:.3f}"
)
# 空仓
elif self.start == -1:
profit_distance = self.open_avg_price - current_price
loss_distance = current_price - self.open_avg_price
if self.min_favorable_price is None:
self.min_favorable_price = min(self.open_avg_price, current_price)
else:
self.min_favorable_price = min(self.min_favorable_price, current_price)
if loss_distance >= sl_distance:
return (
'stop_loss',
f"空仓回撤 {loss_distance:.3f} >= 止损阈值 {sl_distance:.3f}"
)
if profit_distance >= tp_distance:
return (
'take_profit',
f"空仓盈利 {profit_distance:.3f} >= 止盈阈值 {tp_distance:.3f}"
)
if profit_distance >= trail_start_distance and self.min_favorable_price:
retrace_distance = current_price - self.min_favorable_price
if retrace_distance >= trail_backoff_distance:
return (
'trailing_stop',
f"空仓回撤 {retrace_distance:.3f} >= 移动回撤阈值 {trail_backoff_distance:.3f}"
)
return None
def calculate_entity(self, kline):
"""计算K线实体大小绝对值"""
return abs(kline['close'] - kline['open'])
@@ -265,7 +560,7 @@ class BitmartFuturesTransaction:
'lower': min(kline['open'], kline['close']) # 实体下边
}
def check_signal(self, current_price, prev_kline, current_kline):
def check_signal(self, current_price, prev_kline, current_kline, atr_value=None):
"""
检查交易信号
返回: ('long', trigger_price) / ('short', trigger_price) / None
@@ -302,6 +597,15 @@ class BitmartFuturesTransaction:
long_breakout = prev_entity_upper + prev_entity / 3 # 做多突破价 = 实体上边 + 实体/3
short_breakout = prev_entity_lower - prev_entity / 3 # 做空突破价 = 实体下边 - 实体/3
# 突破缓冲:实体比例 + 固定百分比 + ATR 动态阈值,三者取最大值
breakout_buffer = max(
prev_entity * 0.1,
current_price * self.breakout_buffer_pct / 100,
(atr_value * self.breakout_buffer_atr_mult) if (self.use_atr_dynamic_threshold and atr_value and atr_value > 0) else 0
)
long_breakout_effective = long_breakout + breakout_buffer
short_breakout_effective = short_breakout - breakout_buffer
# 上一根阴线 + 当前阳线做多形态不按上一根K线上三分之一做空
prev_is_bearish = prev_kline['close'] < prev_kline['open']
current_is_bullish = current_kline['close'] > current_kline['open']
@@ -320,6 +624,16 @@ class BitmartFuturesTransaction:
logger.info(f"上一根实体上边: {prev_entity_upper:.2f}, 下边: {prev_entity_lower:.2f}")
logger.info(f"做多触发价(下1/3): {long_trigger:.2f}, 做空触发价(上1/3): {short_trigger:.2f}")
logger.info(f"突破做多价(上1/3外): {long_breakout:.2f}, 突破做空价(下1/3外): {short_breakout:.2f}")
logger.info(
f"突破缓冲: {breakout_buffer:.4f} ({self.breakout_buffer_pct:.3f}%)"
f"有效做多突破价: {long_breakout_effective:.2f},有效做空突破价: {short_breakout_effective:.2f}"
)
if atr_value:
logger.info(
f"ATR({self.atr_length})={atr_value:.4f}, "
f"突破ATR倍数={self.breakout_buffer_atr_mult:.3f}, "
f"影线ATR倍数={self.shadow_threshold_atr_mult:.3f}"
)
if skip_short_by_upper_third:
logger.info("上一根阴线+当前阳线(做多形态),不按上三分之一做空")
if skip_long_by_lower_third:
@@ -327,12 +641,16 @@ class BitmartFuturesTransaction:
# 无持仓时检查开仓信号
if self.start == 0:
if current_price >= long_breakout and not skip_long_by_lower_third:
logger.info(f"触发做多信号!价格 {current_price:.2f} >= 突破价(上1/3外) {long_breakout:.2f}")
return ('long', long_breakout)
elif current_price <= short_breakout and not skip_short_by_upper_third:
logger.info(f"触发做空信号!价格 {current_price:.2f} <= 突破价(下1/3外) {short_breakout:.2f}")
return ('short', short_breakout)
if current_price >= long_breakout_effective and not skip_long_by_lower_third:
logger.info(
f"触发做多信号!价格 {current_price:.2f} >= 有效突破价(上1/3外+缓冲) {long_breakout_effective:.2f}"
)
return ('long', long_breakout_effective)
elif current_price <= short_breakout_effective and not skip_short_by_upper_third:
logger.info(
f"触发做空信号!价格 {current_price:.2f} <= 有效突破价(下1/3外-缓冲) {short_breakout_effective:.2f}"
)
return ('short', short_breakout_effective)
# 持仓时检查反手信号
elif self.start == 1: # 持多仓
@@ -341,10 +659,13 @@ class BitmartFuturesTransaction:
logger.info(f"持多反手做空!价格 {current_price:.2f} <= 触发价(上1/3) {short_trigger:.2f}")
return ('reverse_short', short_trigger)
# 反手条件2: 上一根K线上线涨幅>0.01%,当前跌到上一根实体下边
# 反手条件2: 上一根K线上线涨幅超过阈值,当前跌到上一根实体下边
upper_shadow_pct = self.calculate_upper_shadow(prev_kline)
if upper_shadow_pct > 0.01 and current_price <= prev_entity_lower:
logger.info(f"持多反手做空!上阴线涨幅 {upper_shadow_pct:.4f}% > 0.01%"
upper_shadow_abs = self.get_upper_shadow_abs(prev_kline)
upper_shadow_threshold = self.get_shadow_threshold_distance(prev_kline, atr_value=atr_value, side='upper')
if upper_shadow_abs > upper_shadow_threshold and current_price <= prev_entity_lower:
logger.info(
f"持多反手做空!上阴线涨幅 {upper_shadow_pct:.4f}%,上影线长度 {upper_shadow_abs:.4f} > 阈值 {upper_shadow_threshold:.4f}"
f"价格 {current_price:.2f} <= 实体下边 {prev_entity_lower:.2f}")
return ('reverse_short', prev_entity_lower)
@@ -354,10 +675,13 @@ class BitmartFuturesTransaction:
logger.info(f"持空反手做多!价格 {current_price:.2f} >= 触发价(下1/3) {long_trigger:.2f}")
return ('reverse_long', long_trigger)
# 反手条件2: 上一根K线下线跌幅>0.01%,当前涨到上一根实体上边
# 反手条件2: 上一根K线下线跌幅超过阈值,当前涨到上一根实体上边
lower_shadow_pct = self.calculate_lower_shadow(prev_kline)
if lower_shadow_pct > 0.01 and current_price >= prev_entity_upper:
logger.info(f"持空反手做多!下阴线跌幅 {lower_shadow_pct:.4f}% > 0.01%"
lower_shadow_abs = self.get_lower_shadow_abs(prev_kline)
lower_shadow_threshold = self.get_shadow_threshold_distance(prev_kline, atr_value=atr_value, side='lower')
if lower_shadow_abs > lower_shadow_threshold and current_price >= prev_entity_upper:
logger.info(
f"持空反手做多!下阴线跌幅 {lower_shadow_pct:.4f}%,下影线长度 {lower_shadow_abs:.4f} > 阈值 {lower_shadow_threshold:.4f}"
f"价格 {current_price:.2f} >= 实体上边 {prev_entity_upper:.2f}")
return ('reverse_long', prev_entity_upper)
@@ -446,6 +770,7 @@ class BitmartFuturesTransaction:
if self.verify_position_direction(1):
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
self.set_trailing_anchor_by_position()
logger.success("开多成功")
return True
else:
@@ -470,6 +795,7 @@ class BitmartFuturesTransaction:
if self.verify_position_direction(-1):
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
self.set_trailing_anchor_by_position()
logger.success("开空成功")
return True
else:
@@ -496,6 +822,7 @@ class BitmartFuturesTransaction:
if self.verify_position_direction(1):
logger.success("反手做多成功")
self.last_reverse_time = time.time()
self.set_trailing_anchor_by_position()
time.sleep(20)
return True
else:
@@ -521,6 +848,7 @@ class BitmartFuturesTransaction:
if self.verify_position_direction(-1):
logger.success("反手做空成功")
self.last_reverse_time = time.time()
self.set_trailing_anchor_by_position()
time.sleep(20)
return True
else:
@@ -570,16 +898,20 @@ class BitmartFuturesTransaction:
continue
current_kline = formatted[-1]
# ATR 使用已完成K线优先排除当前进行中的K线
atr_source = formatted[:-1] if len(formatted) > self.atr_length + 1 else formatted
atr_value = self.compute_atr(atr_source, self.atr_length)
self.current_atr = atr_value
prev_kline = None
for i in range(len(formatted) - 2, -1, -1):
k = formatted[i]
entity = abs(k['close'] - k['open'])
entity_pct = entity / k['open'] * 100 if k['open'] else 0
if entity_pct > 0.1:
if entity_pct > self.min_prev_entity_pct:
prev_kline = k
break
if prev_kline is None:
logger.info("没有实体>0.1%的上一根K线跳过信号检测")
logger.info(f"没有实体>{self.min_prev_entity_pct:.3f}%的上一根K线跳过信号检测")
time.sleep(0.1)
continue
@@ -603,28 +935,52 @@ class BitmartFuturesTransaction:
continue
logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)")
logger.debug(f"当前 ATR({self.atr_length}): {f'{atr_value:.4f}' if atr_value else 'N/A'}")
# 4. 检查信号
signal = self.check_signal(current_price, prev_kline, current_kline)
# 4. 持仓中优先检查风控退出(止损/止盈/移动止盈)
risk_exit = self.check_risk_exit(current_price, atr_value=atr_value)
if risk_exit:
reason, detail = risk_exit
logger.warning(f"触发风控退出: {reason}{detail}")
self.平仓()
time.sleep(1)
if self.verify_no_position(max_retries=8, retry_interval=1):
self.last_open_time = time.time()
self.last_open_kline_id = current_kline_time
self.reset_trailing_state()
logger.success("风控平仓成功")
page_start = True
if self.page:
try:
self.page.close()
time.sleep(5)
except Exception:
pass
continue
logger.error("风控平仓后仍有持仓,等待下一轮重试")
continue
# 5. 反手过滤:冷却时间
# 5. 检查信号
signal = self.check_signal(current_price, prev_kline, current_kline, atr_value=atr_value)
# 6. 反手过滤:冷却时间
if signal and signal[0].startswith('reverse_'):
if not self.can_reverse(current_price, signal[1]):
signal = None
# 5.5 开仓频率过滤:同一根 K 线只开一次 + 开仓冷却
# 6.5 开仓频率过滤:同一根 K 线只开一次 + 开仓冷却
if signal and signal[0] in ('long', 'short'):
if not self.can_open(current_kline_time):
signal = None
else:
self._current_kline_id_for_open = current_kline_time # 供 execute_trade 成功后记录
# 5.6 当前 K 线已反手过则本 K 线内不再操作仓位
# 6.6 当前 K 线已反手过则本 K 线内不再操作仓位
if signal and self.last_reverse_kline_id == current_kline_time:
logger.info(f"本 K 线({current_kline_time})已反手过,本 K 线内不再操作仓位")
signal = None
# 6. 有信号则执行交易
# 7. 有信号则执行交易
if signal:
trade_success = self.execute_trade(signal)
if trade_success: