优化阴线回调止盈

This commit is contained in:
ddrwode
2026-02-06 15:40:47 +08:00
parent 51458e2649
commit b979f8ff42
7 changed files with 752 additions and 4112 deletions

View File

@@ -1,186 +0,0 @@
# 保守模式参数优化
这个目录包含两部分:
- 实盘脚本:`四分之一五分钟反手条件充足_保守模式.py`
- 参数优化:`optimize_params.py` + `backtest_engine.py`
`current_params.json` 是优化结果文件,实盘脚本启动时会自动读取并覆盖默认参数。
## 1. 使用 CSV 做 30 天参数优化(推荐)
CSV 至少包含:
- `id`(秒级时间戳)
- `open`
- `high`
- `low`
- `close`
运行示例:
```bash
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \
--data-file "/Users/ddrwode/code/lm_code/bitmart/数据/your_5m_30days.csv" \
--days 30 \
--train-days 20 \
--valid-days 10 \
--n-trials 300
```
优化完成后会写入:
- `/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/current_params.json`
如果你想做更稳健的参数(推荐实盘前使用):
```bash
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \
--data-file "/Users/ddrwode/code/lm_code/bitmart/数据/your_5m_90days.csv" \
--days 90 \
--train-days 45 \
--valid-days 15 \
--window-step-days 7 \
--method optuna \
--n-trials 1200 \
--valid-score-weight 0.9 \
--drawdown-guard 8 \
--stability-weight 0.6 \
--stress-slippage-multipliers "0.8,1.0,1.2,1.4" \
--stress-fee-multipliers "1.0,1.15,1.3"
```
## 2. 不提供 CSV直接从 API 自动抓取 30 天数据并优化
最简单一条命令(会自动抓取并计算):
```bash
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \
--days 30 \
--step 5 \
--n-trials 300
```
脚本会按以下顺序找凭证:
- 命令行 `--api-key/--secret-key`
- 环境变量 `BITMART_API_KEY/BITMART_SECRET_KEY`
- 保守模式脚本里的 `self.api_key/self.secret_key`
抓取到的K线会自动保存为
- `/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/auto_ethusdt_5m_30d.csv`
你也可以显式指定保存位置:
```bash
BITMART_API_KEY="xxx" BITMART_SECRET_KEY="xxx" \
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/optimize_params.py" \
--days 30 \
--step 5 \
--n-trials 300 \
--save-data-file "/Users/ddrwode/code/lm_code/bitmart/数据/eth_5m_30days.csv"
```
## 3. 运行保守模式实盘脚本
```bash
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一五分钟反手条件充足_保守模式.py"
```
如果你想加载其他参数文件:
```bash
BITMART_PARAMS_PATH="/absolute/path/to/current_params.json" \
python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一五分钟反手条件充足_保守模式.py"
```
注意:
- 实盘下单数量现在会直接使用 `default_order_size`(不再固定写死 25与回测/优化口径一致。
- `current_params.json` 新增支持趋势过滤和 AI 门控参数(见第 7 节)。
### 启动时自动优化与加速(不减少计算数据)
数据量、试验数、窗口数、压力场景均与配置一致,加速仅通过**代码并行**实现:
- **并行试验**Optuna`BITMART_AUTO_OPT_N_JOBS=4` 或 优化脚本 `--n-jobs 4`,多组试验同时跑。
- **并行压力场景**:每个窗口内多组 slip×fee 同时回测。环境变量 `BITMART_AUTO_OPT_STRESS_WORKERS=4` 或 优化脚本 `--n-stress-workers 4`
- **跳过近期已优化**`BITMART_AUTO_OPT_SKIP_IF_FRESH_HOURS=24` 表示若 `current_params.json` 24 小时内已更新则跳过本次优化,直接加载。
单独运行优化脚本时:
- `--n-jobs 4`Optuna 并行 4 个试验。
- `--n-stress-workers 4`:每窗口 4 个进程并行跑压力场景(不减少 slip/fee 组数)。
## 4. 实时价格WebSocket
保守模式脚本已支持:
- WebSocket 实时价优先
- 自动重连
- 超时自动回退 API 价格
默认 WebSocket 订阅:
- `wss://openapi-ws-v2.bitmart.com/api?protocol=1.1`
- topic: `futures/bookticker:ETHUSDT`(实时推送,默认更快)
如果你本机没有 WebSocket 依赖,会自动回退 API。安装方式
```bash
pip3 install websocket-client
```
可选环境变量:
- `BITMART_WS_ENABLED=0`:禁用 WebSocket
- `BITMART_WS_URL=...`:自定义 WS 地址
- `BITMART_WS_TOPIC=...`:自定义订阅 topic
- `BITMART_WS_PRICE_TTL=2.0`:价格新鲜度阈值(秒)
## 5. 动态止盈止损
保守模式已支持按近期波动率自动调整:
- `take_profit_usd`
- `stop_loss_usd`
- `trailing_activation_usd`
- `trailing_distance_usd`
- `break_even_activation_usd`
- `break_even_floor_usd`
触发方式:
- 每进入一根新的 5 分钟 K 线,自动计算近 N 根K线平均振幅trimmed mean
- 用当前波动率和目标波动率比值计算缩放系数,再更新上述参数
可选环境变量:
- `BITMART_DYNAMIC_RISK_ENABLED=1`:开启/关闭动态风控
- `BITMART_DYNAMIC_RISK_WINDOW_BARS=36`波动率窗口5m K线根数
- `BITMART_DYNAMIC_VOL_TARGET_PCT=0.25`:目标波动率(%
- `BITMART_DYNAMIC_SCALE_MIN=0.7`:最小缩放倍数
- `BITMART_DYNAMIC_SCALE_MAX=1.8`:最大缩放倍数
这些参数也可写到 `current_params.json``params` 中。
## 6. 费用模型说明
优化器会按下面公式计入手续费返佣:
- `effective_fee_rate = raw_fee_rate * (1 - rebate_ratio)`
默认:
- `raw_fee_rate = 0.0006`
- `rebate_ratio = 0.90`
- `effective_fee_rate = 0.00006`
可通过命令行改:
- `--raw-fee-rate`
- `--rebate-ratio`
- `--stress-slippage-multipliers`
- `--stress-fee-multipliers`
## 7. 趋势过滤 + AI 门控(新)
实盘与回测都支持以下参数(可由优化器自动搜索):
- 趋势过滤:`enable_trend_filter``trend_ema_fast``trend_ema_slow``trend_strength_threshold_pct``trend_slope_lookback``trend_slope_threshold_pct`
- AI 门控:`enable_ai_filter``ai_min_confidence``ai_bias``ai_w_trend_align``ai_w_breakout_strength``ai_w_entity_strength``ai_w_shadow_balance``ai_w_volatility_fit`
说明:
- 趋势过滤只影响新开仓,不阻断风控平仓和反手。
- AI 门控是轻量打分器logistic 风格),用于 `trade/skip`,不是黑盒大模型。
## 8. 重要提示
- 回测撮合属于简化模型,不等于实盘撮合。
- 参数应周期性重训(例如每天或每周)。
- 若出现交易次数过低,适当降低 `open_breakout_buffer_pct` 或冷却时间。

View File

@@ -1,789 +0,0 @@
from __future__ import annotations
import csv
import math
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean
from typing import Any, Dict, List, Optional, Sequence, Tuple
DEFAULT_PARAMS: Dict[str, Any] = {
"leverage": "20",
"open_type": "cross",
"default_order_size": 10,
"take_profit_usd": 3.0,
"stop_loss_usd": -2.0,
"trailing_activation_usd": 1.2,
"trailing_distance_usd": 0.6,
"break_even_activation_usd": 1.0,
"break_even_floor_usd": 0.2,
"open_breakout_buffer_pct": 0.03,
"enable_shadow_reverse": False,
"shadow_reverse_threshold_pct": 0.15,
"enable_trend_filter": True,
"trend_ema_fast": 21,
"trend_ema_slow": 55,
"trend_strength_threshold_pct": 0.02,
"trend_slope_lookback": 3,
"trend_slope_threshold_pct": 0.005,
"enable_ai_filter": True,
"ai_min_confidence": 0.58,
"ai_bias": -0.1,
"ai_w_trend_align": 1.1,
"ai_w_breakout_strength": 0.8,
"ai_w_entity_strength": 0.55,
"ai_w_shadow_balance": 0.35,
"ai_w_volatility_fit": 0.45,
"dynamic_vol_target_pct": 0.25,
"open_cooldown_seconds": 300,
"reverse_cooldown_seconds": 300,
"reverse_min_move_pct": 0.15,
# Backtest-only calibration parameters (not used by live runtime script)
"order_notional_usd": 100.0,
"pnl_per_usd_move": 1.0,
"slippage_pct": 0.01,
}
def _as_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except Exception:
return default
def _as_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
return False
def _parse_timestamp(value: Any) -> int:
text = str(value).strip()
if not text:
raise ValueError("empty timestamp")
if text.isdigit() or (text.startswith("-") and text[1:].isdigit()):
ts = int(text)
if ts > 10**12:
ts //= 1000
return ts
try:
ts = int(float(text))
if ts > 10**12:
ts //= 1000
return ts
except ValueError:
pass
dt_text = text.replace("Z", "+00:00")
dt = datetime.fromisoformat(dt_text)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return int(dt.timestamp())
def load_klines_from_csv(csv_path: str | Path, last_n_days: Optional[int] = 30) -> List[Dict[str, float]]:
"""
Load kline data from CSV. Supports these field names (case-insensitive):
- timestamp: id / timestamp / time / ts / datetime / date
- open: open / open_price
- high: high / high_price
- low: low / low_price
- close: close / close_price
- volume (optional): volume / vol
"""
path = Path(csv_path)
if not path.exists():
raise FileNotFoundError(f"CSV not found: {path}")
records: Dict[int, Dict[str, float]] = {}
with path.open("r", encoding="utf-8") as f:
reader = csv.DictReader(f)
if not reader.fieldnames:
raise ValueError(f"CSV has no header: {path}")
field_map = {name.strip().lower(): name for name in reader.fieldnames if name}
def find_key(candidates: Sequence[str]) -> Optional[str]:
for candidate in candidates:
if candidate in field_map:
return field_map[candidate]
return None
ts_key = find_key(("id", "timestamp", "time", "ts", "datetime", "date"))
open_key = find_key(("open", "open_price"))
high_key = find_key(("high", "high_price"))
low_key = find_key(("low", "low_price"))
close_key = find_key(("close", "close_price"))
volume_key = find_key(("volume", "vol"))
if not ts_key or not open_key or not high_key or not low_key or not close_key:
raise ValueError(
"CSV is missing required columns. Need timestamp/open/high/low/close (or aliases)."
)
for row in reader:
try:
ts = _parse_timestamp(row.get(ts_key, ""))
kline = {
"id": ts,
"open": _as_float(row.get(open_key), 0.0),
"high": _as_float(row.get(high_key), 0.0),
"low": _as_float(row.get(low_key), 0.0),
"close": _as_float(row.get(close_key), 0.0),
}
if volume_key and row.get(volume_key) not in (None, ""):
kline["volume"] = _as_float(row.get(volume_key), 0.0)
records[ts] = kline
except Exception:
continue
klines = sorted(records.values(), key=lambda x: x["id"])
if last_n_days is not None and klines:
cutoff = klines[-1]["id"] - int(last_n_days * 24 * 3600)
klines = [k for k in klines if k["id"] >= cutoff]
return klines
def save_klines_to_csv(klines: Sequence[Dict[str, float]], output_path: str | Path) -> Path:
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(["id", "open", "high", "low", "close", "volume"])
for k in klines:
writer.writerow([
int(k["id"]),
k["open"],
k["high"],
k["low"],
k["close"],
k.get("volume", ""),
])
return path
def fetch_klines_from_api(
contract_api: Any,
contract_symbol: str = "ETHUSDT",
step: int = 5,
days: int = 30,
batch_hours: int = 12,
sleep_seconds: float = 0.05,
) -> List[Dict[str, float]]:
"""Fetch recent kline data from Bitmart APIContract-compatible client."""
end_ts = int(time.time())
start_ts = end_ts - days * 24 * 3600
cursor = start_ts
all_rows: Dict[int, Dict[str, float]] = {}
while cursor < end_ts:
batch_end = min(cursor + batch_hours * 3600, end_ts)
response = contract_api.get_kline(
contract_symbol=contract_symbol,
step=step,
start_time=cursor,
end_time=batch_end,
)[0]
if response.get("code") != 1000:
cursor = batch_end + 1
continue
for item in response.get("data", []):
try:
ts = int(item["timestamp"])
all_rows[ts] = {
"id": ts,
"open": float(item["open_price"]),
"high": float(item["high_price"]),
"low": float(item["low_price"]),
"close": float(item["close_price"]),
"volume": float(item.get("volume", 0) or 0),
}
except Exception:
continue
cursor = batch_end + 1
if sleep_seconds > 0:
time.sleep(sleep_seconds)
return sorted(all_rows.values(), key=lambda x: x["id"])
def split_rolling_windows(
klines: Sequence[Dict[str, float]],
train_days: int = 20,
valid_days: int = 10,
step_days: int = 5,
) -> List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]]:
"""Build rolling (train, valid) windows by timestamp ranges."""
if not klines:
return []
train_secs = train_days * 24 * 3600
valid_secs = valid_days * 24 * 3600
step_secs = max(1, step_days) * 24 * 3600
first_ts = int(klines[0]["id"])
last_ts = int(klines[-1]["id"])
required_span = train_secs + valid_secs
windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]] = []
def slice_range(start_ts: int, end_ts: int) -> List[Dict[str, float]]:
return [k for k in klines if start_ts <= int(k["id"]) < end_ts]
cursor = first_ts
while cursor + required_span <= last_ts:
train_start = cursor
train_end = train_start + train_secs
valid_end = train_end + valid_secs
train_part = slice_range(train_start, train_end)
valid_part = slice_range(train_end, valid_end)
if len(train_part) >= 50 and len(valid_part) >= 30:
windows.append((train_part, valid_part))
cursor += step_secs
if not windows and (last_ts - first_ts) >= required_span:
valid_end = last_ts
valid_start = valid_end - valid_secs
train_start = valid_start - train_secs
train_part = slice_range(train_start, valid_start)
valid_part = slice_range(valid_start, valid_end)
if len(train_part) >= 50 and len(valid_part) >= 30:
windows.append((train_part, valid_part))
return windows
@dataclass
class BacktestResult:
net_pnl: float
gross_pnl: float
total_fees: float
trades: int
win_rate: float
max_drawdown: float
loss_days: int
start_ts: int
end_ts: int
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class ConservativeBacktester:
"""Backtester for the conservative quarter-breakout + reverse strategy."""
def __init__(
self,
params: Optional[Dict[str, Any]] = None,
raw_fee_rate: float = 0.0006,
rebate_ratio: float = 0.90,
) -> None:
self.params: Dict[str, Any] = dict(DEFAULT_PARAMS)
if params:
self.params.update(params)
self.raw_fee_rate = float(raw_fee_rate)
self.rebate_ratio = float(rebate_ratio)
self.effective_fee_rate = self.raw_fee_rate * (1.0 - self.rebate_ratio)
size_scale = max(_as_float(self.params.get("default_order_size", 10), 10.0) / 10.0, 0.05)
self.order_notional_usd = _as_float(self.params.get("order_notional_usd", 100.0), 100.0) * size_scale
self.pnl_per_usd_move = _as_float(self.params.get("pnl_per_usd_move", 1.0), 1.0) * size_scale
self.slippage_pct = _as_float(self.params.get("slippage_pct", 0.01), 0.01)
@staticmethod
def _entity_size(kline: Dict[str, float]) -> float:
return abs(kline["close"] - kline["open"])
@staticmethod
def _entity_edges(kline: Dict[str, float]) -> Tuple[float, float]:
upper = max(kline["open"], kline["close"])
lower = min(kline["open"], kline["close"])
return upper, lower
@staticmethod
def _upper_shadow_pct(kline: Dict[str, float]) -> float:
body_top = max(kline["open"], kline["close"])
if body_top <= 0:
return 0.0
return (kline["high"] - body_top) / body_top * 100.0
@staticmethod
def _lower_shadow_pct(kline: Dict[str, float]) -> float:
body_bottom = min(kline["open"], kline["close"])
if body_bottom <= 0:
return 0.0
return (body_bottom - kline["low"]) / body_bottom * 100.0
def _calc_fee(self) -> float:
return self.order_notional_usd * self.effective_fee_rate
def _apply_slippage(self, price: float, is_buy: bool) -> float:
if self.slippage_pct <= 0:
return price
shift = self.slippage_pct / 100.0
return price * (1.0 + shift) if is_buy else price * (1.0 - shift)
@staticmethod
def _clip(value: float, low: float, high: float) -> float:
return max(low, min(high, value))
def _signal_confidence(
self,
side: str,
current_price: float,
entry_price: float,
prev_kline: Dict[str, float],
levels: Dict[str, float],
trend_bias: str,
trend_strength_pct: float,
target_volatility_pct: float,
) -> float:
prev_entity = self._entity_size(prev_kline)
prev_close = max(float(prev_kline.get("close", 0.0)), 0.000001)
entity_pct = prev_entity / prev_close * 100.0
upper_shadow_pct = levels["upper_shadow_pct"]
lower_shadow_pct = levels["lower_shadow_pct"]
if entry_price <= 0:
breakout_move_pct = 0.0
elif side == "long":
breakout_move_pct = max(0.0, (current_price - entry_price) / entry_price * 100.0)
else:
breakout_move_pct = max(0.0, (entry_price - current_price) / entry_price * 100.0)
if side == "long":
trend_align = 1.0 if trend_bias == "bull" else (-1.0 if trend_bias == "bear" else 0.0)
shadow_balance = lower_shadow_pct - upper_shadow_pct
else:
trend_align = 1.0 if trend_bias == "bear" else (-1.0 if trend_bias == "bull" else 0.0)
shadow_balance = upper_shadow_pct - lower_shadow_pct
trend_strength_threshold = max(_as_float(self.params.get("trend_strength_threshold_pct", 0.02), 0.02), 0.01)
trend_strength_scale = self._clip(abs(trend_strength_pct) / trend_strength_threshold, 0.0, 2.5)
trend_component = trend_align * trend_strength_scale
breakout_strength = self._clip(breakout_move_pct / 0.25, 0.0, 2.5)
entity_strength = self._clip(entity_pct / 0.50, 0.0, 2.5)
shadow_component = self._clip(shadow_balance / 0.30, -2.5, 2.5)
current_vol = 0.0
current_close = float(current_price)
if current_close > 0:
current_vol = abs(float(prev_kline["high"]) - float(prev_kline["low"])) / current_close * 100.0
target_vol = max(float(target_volatility_pct), 0.05)
vol_fit = 1.0 - abs(current_vol - target_vol) / (target_vol * 2.0)
vol_fit = self._clip(vol_fit, -1.5, 1.5)
score = (
_as_float(self.params.get("ai_bias", -0.1), -0.1)
+ _as_float(self.params.get("ai_w_trend_align", 1.1), 1.1) * trend_component
+ _as_float(self.params.get("ai_w_breakout_strength", 0.8), 0.8) * breakout_strength
+ _as_float(self.params.get("ai_w_entity_strength", 0.55), 0.55) * entity_strength
+ _as_float(self.params.get("ai_w_shadow_balance", 0.35), 0.35) * shadow_component
+ _as_float(self.params.get("ai_w_volatility_fit", 0.45), 0.45) * vol_fit
)
score = self._clip(score, -60.0, 60.0)
return 1.0 / (1.0 + math.exp(-score))
def _calc_levels(self, prev_kline: Dict[str, float], current_kline: Dict[str, float]) -> Optional[Dict[str, float]]:
prev_entity = self._entity_size(prev_kline)
if prev_entity < 0.1:
return None
prev_upper, prev_lower = self._entity_edges(prev_kline)
prev_bull_for_calc = prev_kline["close"] > prev_kline["open"]
prev_bear_for_calc = prev_kline["close"] < prev_kline["open"]
current_open_above_prev_close = current_kline["open"] > prev_kline["close"]
current_open_below_prev_close = current_kline["open"] < prev_kline["close"]
use_current_open_as_base = (
(prev_bull_for_calc and current_open_above_prev_close)
or (prev_bear_for_calc and current_open_below_prev_close)
)
if use_current_open_as_base:
base = current_kline["open"]
long_trigger = base + prev_entity / 4.0
short_trigger = base - prev_entity / 4.0
long_breakout = base + prev_entity / 4.0
short_breakout = base - prev_entity / 4.0
else:
long_trigger = prev_lower + prev_entity / 4.0
short_trigger = prev_upper - prev_entity / 4.0
long_breakout = prev_upper + prev_entity / 4.0
short_breakout = prev_lower - prev_entity / 4.0
open_buffer_pct = _as_float(self.params.get("open_breakout_buffer_pct", 0.03), 0.03)
long_entry_price = long_breakout * (1.0 + open_buffer_pct / 100.0)
short_entry_price = short_breakout * (1.0 - open_buffer_pct / 100.0)
prev_bearish = prev_kline["close"] < prev_kline["open"]
current_bullish = current_kline["close"] > current_kline["open"]
skip_short_by_upper_third = prev_bearish and current_bullish
prev_bullish = prev_kline["close"] > prev_kline["open"]
current_bearish = current_kline["close"] < current_kline["open"]
skip_long_by_lower_third = prev_bullish and current_bearish
return {
"prev_entity_upper": prev_upper,
"prev_entity_lower": prev_lower,
"long_trigger": long_trigger,
"short_trigger": short_trigger,
"long_entry_price": long_entry_price,
"short_entry_price": short_entry_price,
"skip_short_by_upper_third": float(skip_short_by_upper_third),
"skip_long_by_lower_third": float(skip_long_by_lower_third),
"upper_shadow_pct": self._upper_shadow_pct(prev_kline),
"lower_shadow_pct": self._lower_shadow_pct(prev_kline),
}
def run(self, klines: Sequence[Dict[str, float]]) -> BacktestResult:
if len(klines) < 3:
start_ts = int(klines[0]["id"]) if klines else 0
end_ts = int(klines[-1]["id"]) if klines else 0
return BacktestResult(
net_pnl=0.0,
gross_pnl=0.0,
total_fees=0.0,
trades=0,
win_rate=0.0,
max_drawdown=0.0,
loss_days=0,
start_ts=start_ts,
end_ts=end_ts,
)
open_cooldown = int(_as_float(self.params.get("open_cooldown_seconds", 300), 300))
reverse_cooldown = int(_as_float(self.params.get("reverse_cooldown_seconds", 300), 300))
reverse_min_move_pct = _as_float(self.params.get("reverse_min_move_pct", 0.15), 0.15)
take_profit = _as_float(self.params.get("take_profit_usd", 3.0), 3.0)
stop_loss = _as_float(self.params.get("stop_loss_usd", -2.0), -2.0)
trailing_activation = _as_float(self.params.get("trailing_activation_usd", 1.2), 1.2)
trailing_distance = _as_float(self.params.get("trailing_distance_usd", 0.6), 0.6)
be_activation = _as_float(self.params.get("break_even_activation_usd", 1.0), 1.0)
be_floor = _as_float(self.params.get("break_even_floor_usd", 0.2), 0.2)
enable_shadow_reverse = _as_bool(self.params.get("enable_shadow_reverse", False))
shadow_reverse_threshold = _as_float(self.params.get("shadow_reverse_threshold_pct", 0.15), 0.15)
enable_trend_filter = _as_bool(self.params.get("enable_trend_filter", True))
trend_ema_fast = int(max(_as_float(self.params.get("trend_ema_fast", 21), 21), 2))
trend_ema_slow = int(max(_as_float(self.params.get("trend_ema_slow", 55), 55), trend_ema_fast + 1))
trend_strength_threshold = _as_float(self.params.get("trend_strength_threshold_pct", 0.02), 0.02)
trend_slope_lookback = int(max(_as_float(self.params.get("trend_slope_lookback", 3), 3), 1))
trend_slope_threshold = _as_float(self.params.get("trend_slope_threshold_pct", 0.005), 0.005)
enable_ai_filter = _as_bool(self.params.get("enable_ai_filter", True))
ai_min_confidence = _as_float(self.params.get("ai_min_confidence", 0.58), 0.58)
target_volatility_pct = _as_float(self.params.get("dynamic_vol_target_pct", 0.25), 0.25)
position = 0 # 1 long, -1 short, 0 flat
entry_price = 0.0
entry_ts = 0
current_trade_open_fee = 0.0
last_open_ts: Optional[int] = None
last_open_kline_id: Optional[int] = None
last_reverse_ts: Optional[int] = None
max_unrealized_pnl_seen: Optional[float] = None
gross_pnl = 0.0
total_fees = 0.0
equity = 0.0
peak_equity = 0.0
max_drawdown = 0.0
trades = 0
wins = 0
daily_pnl: Dict[str, float] = {}
ema_fast = float(klines[0]["close"])
ema_slow = float(klines[0]["close"])
alpha_fast = 2.0 / (trend_ema_fast + 1.0)
alpha_slow = 2.0 / (trend_ema_slow + 1.0)
slow_history: List[float] = [ema_slow]
def mark_to_market(mark_price: float) -> float:
if position == 0:
return 0.0
return (mark_price - entry_price) * position * self.pnl_per_usd_move
def update_drawdown(mark_price: float) -> None:
nonlocal peak_equity, max_drawdown
mtm = mark_to_market(mark_price)
equity_mtm = equity + mtm
if equity_mtm > peak_equity:
peak_equity = equity_mtm
drawdown = peak_equity - equity_mtm
if drawdown > max_drawdown:
max_drawdown = drawdown
def can_open(current_kline_id: int) -> bool:
if last_open_kline_id is not None and current_kline_id == last_open_kline_id:
return False
if last_open_ts is not None and current_kline_id - last_open_ts < open_cooldown:
return False
return True
def can_reverse(current_price: float, trigger_price: float, now_ts: int) -> bool:
if last_reverse_ts is not None and now_ts - last_reverse_ts < reverse_cooldown:
return False
if trigger_price > 0:
move_pct = abs(current_price - trigger_price) / trigger_price * 100.0
if move_pct < reverse_min_move_pct:
return False
return True
def open_position(direction: int, raw_price: float, now_ts: int) -> None:
nonlocal position, entry_price, entry_ts
nonlocal last_open_ts, last_open_kline_id
nonlocal total_fees, equity, current_trade_open_fee
nonlocal max_unrealized_pnl_seen
is_buy = direction == 1
fill_price = self._apply_slippage(raw_price, is_buy=is_buy)
fee = self._calc_fee()
position = direction
entry_price = fill_price
entry_ts = now_ts
last_open_ts = now_ts
last_open_kline_id = now_ts
max_unrealized_pnl_seen = 0.0
total_fees += fee
equity -= fee
current_trade_open_fee = fee
def close_position(raw_price: float, now_ts: int) -> float:
nonlocal position, entry_price, entry_ts
nonlocal total_fees, equity, gross_pnl, trades, wins
nonlocal current_trade_open_fee, max_unrealized_pnl_seen
if position == 0:
return 0.0
is_buy = position == -1
fill_price = self._apply_slippage(raw_price, is_buy=is_buy)
realized = (fill_price - entry_price) * position * self.pnl_per_usd_move
close_fee = self._calc_fee()
trade_net = realized - close_fee - current_trade_open_fee
gross_pnl += realized
total_fees += close_fee
equity += realized - close_fee
day_key = datetime.utcfromtimestamp(now_ts).date().isoformat()
daily_pnl[day_key] = daily_pnl.get(day_key, 0.0) + trade_net
trades += 1
if trade_net > 0:
wins += 1
position = 0
entry_price = 0.0
entry_ts = 0
current_trade_open_fee = 0.0
max_unrealized_pnl_seen = None
return trade_net
for idx in range(1, len(klines)):
prev_kline = klines[idx - 1]
current_kline = klines[idx]
current_ts = int(current_kline["id"])
close_price = float(current_kline["close"])
ema_fast = ema_fast + alpha_fast * (close_price - ema_fast)
ema_slow = ema_slow + alpha_slow * (close_price - ema_slow)
slow_history.append(ema_slow)
max_slow_history = trend_slope_lookback + 5
if len(slow_history) > max_slow_history:
slow_history = slow_history[-max_slow_history:]
trend_strength_pct = 0.0
trend_slope_pct = 0.0
trend_bias = "neutral"
if ema_slow > 0:
trend_strength_pct = (ema_fast - ema_slow) / ema_slow * 100.0
if len(slow_history) > trend_slope_lookback:
ref_slow = slow_history[-1 - trend_slope_lookback]
if ref_slow > 0:
trend_slope_pct = (ema_slow - ref_slow) / ref_slow * 100.0
if trend_strength_pct >= trend_strength_threshold and trend_slope_pct >= trend_slope_threshold:
trend_bias = "bull"
elif trend_strength_pct <= -trend_strength_threshold and trend_slope_pct <= -trend_slope_threshold:
trend_bias = "bear"
levels = self._calc_levels(prev_kline, current_kline)
if not levels:
update_drawdown(current_kline["close"])
continue
skip_short = bool(levels["skip_short_by_upper_third"])
skip_long = bool(levels["skip_long_by_lower_third"])
# Risk controls come first, matching live behavior.
if position != 0:
pnl_close = mark_to_market(current_kline["close"])
if max_unrealized_pnl_seen is None:
max_unrealized_pnl_seen = pnl_close
else:
max_unrealized_pnl_seen = max(max_unrealized_pnl_seen, pnl_close)
should_close = False
if pnl_close <= stop_loss:
should_close = True
elif max_unrealized_pnl_seen >= be_activation and pnl_close <= be_floor:
should_close = True
elif max_unrealized_pnl_seen >= trailing_activation and pnl_close < max_unrealized_pnl_seen - trailing_distance:
should_close = True
elif pnl_close >= take_profit:
should_close = True
if should_close:
close_position(current_kline["close"], current_ts)
update_drawdown(current_kline["close"])
continue
signal_type: Optional[str] = None
signal_price = 0.0
if position == 0:
long_hit = current_kline["high"] >= levels["long_entry_price"] and not skip_long
short_hit = current_kline["low"] <= levels["short_entry_price"] and not skip_short
# Conservative tie-break: if both sides touched in one bar, skip.
if long_hit and not short_hit:
signal_type = "long"
signal_price = levels["long_entry_price"]
elif short_hit and not long_hit:
signal_type = "short"
signal_price = levels["short_entry_price"]
if signal_type and can_open(current_ts):
allow_signal = True
if enable_trend_filter:
if signal_type == "long" and trend_bias == "bear":
allow_signal = False
elif signal_type == "short" and trend_bias == "bull":
allow_signal = False
if allow_signal and enable_ai_filter:
confidence = self._signal_confidence(
side=signal_type,
current_price=close_price,
entry_price=signal_price,
prev_kline=prev_kline,
levels=levels,
trend_bias=trend_bias,
trend_strength_pct=trend_strength_pct,
target_volatility_pct=target_volatility_pct,
)
if confidence < ai_min_confidence:
allow_signal = False
if allow_signal:
open_position(1 if signal_type == "long" else -1, signal_price, current_ts)
elif position == 1:
reverse_hit = current_kline["low"] <= levels["short_trigger"] and not skip_short
shadow_hit = (
enable_shadow_reverse
and levels["upper_shadow_pct"] > shadow_reverse_threshold
and current_kline["low"] <= levels["prev_entity_lower"]
)
if reverse_hit or shadow_hit:
trigger = levels["short_trigger"] if reverse_hit else levels["prev_entity_lower"]
if can_reverse(current_kline["close"], trigger, current_ts):
close_position(trigger, current_ts)
open_position(-1, trigger, current_ts)
last_reverse_ts = current_ts
elif position == -1:
reverse_hit = current_kline["high"] >= levels["long_trigger"] and not skip_long
shadow_hit = (
enable_shadow_reverse
and levels["lower_shadow_pct"] > shadow_reverse_threshold
and current_kline["high"] >= levels["prev_entity_upper"]
)
if reverse_hit or shadow_hit:
trigger = levels["long_trigger"] if reverse_hit else levels["prev_entity_upper"]
if can_reverse(current_kline["close"], trigger, current_ts):
close_position(trigger, current_ts)
open_position(1, trigger, current_ts)
last_reverse_ts = current_ts
update_drawdown(current_kline["close"])
if position != 0:
last_bar = klines[-1]
close_position(last_bar["close"], int(last_bar["id"]))
update_drawdown(last_bar["close"])
win_rate = (wins / trades * 100.0) if trades > 0 else 0.0
loss_days = sum(1 for pnl in daily_pnl.values() if pnl < 0)
return BacktestResult(
net_pnl=equity,
gross_pnl=gross_pnl,
total_fees=total_fees,
trades=trades,
win_rate=win_rate,
max_drawdown=max_drawdown,
loss_days=loss_days,
start_ts=int(klines[0]["id"]),
end_ts=int(klines[-1]["id"]),
)
def score_result(
result: BacktestResult,
drawdown_weight: float = 1.4,
loss_day_weight: float = 0.8,
min_trades: int = 6,
undertrade_penalty: float = 0.5,
) -> float:
score = result.net_pnl - drawdown_weight * result.max_drawdown - loss_day_weight * result.loss_days
if result.trades < min_trades:
score -= (min_trades - result.trades) * undertrade_penalty
return score
def aggregate_results(results: Sequence[BacktestResult]) -> Dict[str, float]:
if not results:
return {
"net_pnl": 0.0,
"gross_pnl": 0.0,
"total_fees": 0.0,
"trades": 0.0,
"win_rate": 0.0,
"max_drawdown": 0.0,
"loss_days": 0.0,
}
return {
"net_pnl": mean([r.net_pnl for r in results]),
"gross_pnl": mean([r.gross_pnl for r in results]),
"total_fees": mean([r.total_fees for r in results]),
"trades": mean([r.trades for r in results]),
"win_rate": mean([r.win_rate for r in results]),
"max_drawdown": mean([r.max_drawdown for r in results]),
"loss_days": mean([r.loss_days for r in results]),
}

View File

@@ -1,41 +0,0 @@
{
"updated_at": "2026-02-06T00:00:00+00:00",
"source": {
"type": "manual_default"
},
"optimization": {
"method": "manual_default",
"n_trials": 0,
"windows": 0
},
"fee_model": {
"raw_fee_rate": 0.0006,
"rebate_ratio": 0.9,
"effective_fee_rate": 0.00006
},
"score": 0,
"params": {
"leverage": "20",
"take_profit_usd": 3.0,
"stop_loss_usd": -2.0,
"trailing_activation_usd": 1.2,
"trailing_distance_usd": 0.6,
"break_even_activation_usd": 1.0,
"break_even_floor_usd": 0.2,
"default_order_size": 10,
"open_breakout_buffer_pct": 0.03,
"enable_shadow_reverse": false,
"shadow_reverse_threshold_pct": 0.15,
"open_cooldown_seconds": 300,
"reverse_cooldown_seconds": 300,
"reverse_min_move_pct": 0.15,
"dynamic_risk_enabled": true,
"dynamic_risk_window_bars": 36,
"dynamic_vol_target_pct": 0.25,
"dynamic_scale_min": 0.7,
"dynamic_scale_max": 1.8,
"order_notional_usd": 100.0,
"pnl_per_usd_move": 1.0,
"slippage_pct": 0.01
}
}

View File

@@ -1,746 +0,0 @@
from __future__ import annotations
import argparse
import json
import os
import random
import re
import sys
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean, pstdev
from typing import Any, Dict, List, Optional, Tuple
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from backtest_engine import (
DEFAULT_PARAMS,
ConservativeBacktester,
aggregate_results,
fetch_klines_from_api,
load_klines_from_csv,
save_klines_to_csv,
score_result,
split_rolling_windows,
)
STRATEGY_FILENAME = "四分之一五分钟反手条件充足_保守模式.py"
def _as_float(value: Any, default: float) -> float:
try:
return float(value)
except Exception:
return default
def _as_bool(value: Any, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
return default
def _clamp(value: float, low: float, high: float) -> float:
return max(low, min(high, value))
def _parse_float_list(text: str, default: List[float]) -> List[float]:
if not text:
return list(default)
values: List[float] = []
for part in text.split(","):
raw = part.strip()
if not raw:
continue
try:
values.append(float(raw))
except Exception:
continue
return values or list(default)
def normalize_params(params: Dict[str, Any]) -> Dict[str, Any]:
p = dict(params)
p["take_profit_usd"] = round(max(_as_float(p.get("take_profit_usd", 3.0), 3.0), 0.5), 4)
p["stop_loss_usd"] = round(min(_as_float(p.get("stop_loss_usd", -2.0), -2.0), -0.1), 4)
p["trailing_activation_usd"] = round(max(_as_float(p.get("trailing_activation_usd", 1.2), 1.2), 0.2), 4)
p["trailing_distance_usd"] = round(max(_as_float(p.get("trailing_distance_usd", 0.6), 0.6), 0.1), 4)
p["break_even_activation_usd"] = round(max(_as_float(p.get("break_even_activation_usd", 1.0), 1.0), 0.2), 4)
p["break_even_floor_usd"] = round(max(_as_float(p.get("break_even_floor_usd", 0.2), 0.2), 0.0), 4)
p["open_breakout_buffer_pct"] = round(max(_as_float(p.get("open_breakout_buffer_pct", 0.03), 0.03), 0.0), 4)
p["shadow_reverse_threshold_pct"] = round(max(_as_float(p.get("shadow_reverse_threshold_pct", 0.15), 0.15), 0.01), 4)
p["reverse_min_move_pct"] = round(max(_as_float(p.get("reverse_min_move_pct", 0.15), 0.15), 0.01), 4)
p["open_cooldown_seconds"] = int(max(_as_float(p.get("open_cooldown_seconds", 300), 300), 1))
p["reverse_cooldown_seconds"] = int(max(_as_float(p.get("reverse_cooldown_seconds", 300), 300), 1))
p["default_order_size"] = int(max(_as_float(p.get("default_order_size", 10), 10), 1))
p["order_notional_usd"] = round(max(_as_float(p.get("order_notional_usd", 100.0), 100.0), 5.0), 4)
p["pnl_per_usd_move"] = round(max(_as_float(p.get("pnl_per_usd_move", 1.0), 1.0), 0.01), 4)
p["slippage_pct"] = round(max(_as_float(p.get("slippage_pct", 0.01), 0.01), 0.0), 5)
p["dynamic_vol_target_pct"] = round(_clamp(_as_float(p.get("dynamic_vol_target_pct", 0.25), 0.25), 0.05, 1.5), 6)
p["enable_shadow_reverse"] = _as_bool(p.get("enable_shadow_reverse", False), False)
p["enable_trend_filter"] = _as_bool(p.get("enable_trend_filter", True), True)
p["trend_ema_fast"] = int(max(_as_float(p.get("trend_ema_fast", 21), 21), 2))
p["trend_ema_slow"] = int(max(_as_float(p.get("trend_ema_slow", 55), 55), p["trend_ema_fast"] + 1))
p["trend_strength_threshold_pct"] = round(_clamp(_as_float(p.get("trend_strength_threshold_pct", 0.02), 0.02), 0.0, 0.2), 6)
p["trend_slope_lookback"] = int(max(_as_float(p.get("trend_slope_lookback", 3), 3), 1))
p["trend_slope_threshold_pct"] = round(_clamp(_as_float(p.get("trend_slope_threshold_pct", 0.005), 0.005), 0.0, 0.1), 6)
p["enable_ai_filter"] = _as_bool(p.get("enable_ai_filter", True), True)
p["ai_min_confidence"] = round(_clamp(_as_float(p.get("ai_min_confidence", 0.58), 0.58), 0.5, 0.95), 6)
p["ai_bias"] = round(_clamp(_as_float(p.get("ai_bias", -0.1), -0.1), -3.0, 3.0), 6)
p["ai_w_trend_align"] = round(_clamp(_as_float(p.get("ai_w_trend_align", 1.1), 1.1), 0.0, 3.0), 6)
p["ai_w_breakout_strength"] = round(_clamp(_as_float(p.get("ai_w_breakout_strength", 0.8), 0.8), 0.0, 3.0), 6)
p["ai_w_entity_strength"] = round(_clamp(_as_float(p.get("ai_w_entity_strength", 0.55), 0.55), 0.0, 3.0), 6)
p["ai_w_shadow_balance"] = round(_clamp(_as_float(p.get("ai_w_shadow_balance", 0.35), 0.35), 0.0, 3.0), 6)
p["ai_w_volatility_fit"] = round(_clamp(_as_float(p.get("ai_w_volatility_fit", 0.45), 0.45), 0.0, 3.0), 6)
# Keep relationships sane.
if p["trailing_activation_usd"] <= p["break_even_activation_usd"]:
p["trailing_activation_usd"] = round(p["break_even_activation_usd"] + 0.2, 4)
if p["take_profit_usd"] <= p["trailing_activation_usd"]:
p["take_profit_usd"] = round(p["trailing_activation_usd"] + 0.4, 4)
max_floor = p["break_even_activation_usd"] * 0.9
if p["break_even_floor_usd"] > max_floor:
p["break_even_floor_usd"] = round(max_floor, 4)
if p["trailing_distance_usd"] >= p["trailing_activation_usd"]:
p["trailing_distance_usd"] = round(max(0.1, p["trailing_activation_usd"] * 0.6), 4)
return p
def load_credentials_from_strategy_file(strategy_path: Path) -> Dict[str, str]:
"""
从保守模式策略文件里提取 self.api_key/self.secret_key/self.memo作为自动抓取兜底。
"""
if not strategy_path.exists():
return {}
try:
content = strategy_path.read_text(encoding="utf-8")
except Exception:
return {}
def extract(name: str) -> str:
pattern = rf'self\.{name}\s*=\s*["\']([^"\']+)["\']'
m = re.search(pattern, content)
return m.group(1).strip() if m else ""
api_key = extract("api_key")
secret_key = extract("secret_key")
memo = extract("memo")
result: Dict[str, str] = {}
if api_key:
result["api_key"] = api_key
if secret_key:
result["secret_key"] = secret_key
if memo:
result["memo"] = memo
return result
def _run_one_stress_scenario(
task: Tuple[
Dict[str, Any],
float,
float,
List[Dict[str, float]],
List[Dict[str, float]],
float,
float,
float,
float,
int,
float,
]
) -> Tuple[float, float, float, Any, Any]:
"""
单压力场景回测(供多进程调用,不减少计算量)。
返回: (train_score, valid_score, valid_drawdown, base_train_res, base_valid_res)
base_* 仅当 slip≈1 且 fee≈1 时非 None。
"""
(
params,
slip_mult,
fee_mult,
train_data,
valid_data,
raw_fee_rate,
rebate_ratio,
drawdown_weight,
loss_day_weight,
min_trades,
undertrade_penalty,
) = task
scenario_params = dict(params)
scenario_params["slippage_pct"] = max(
0.0,
_as_float(params.get("slippage_pct", 0.01), 0.01) * float(slip_mult),
)
scenario_params = normalize_params(scenario_params)
scenario_raw_fee_rate = max(0.0, raw_fee_rate * float(fee_mult))
backtester = ConservativeBacktester(
params=scenario_params,
raw_fee_rate=scenario_raw_fee_rate,
rebate_ratio=rebate_ratio,
)
train_res = backtester.run(train_data)
valid_res = backtester.run(valid_data)
train_score = score_result(
train_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
valid_score = score_result(
valid_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
is_base = abs(slip_mult - 1.0) < 1e-9 and abs(fee_mult - 1.0) < 1e-9
return (
train_score,
valid_score,
valid_res.max_drawdown,
train_res if is_base else None,
valid_res if is_base else None,
)
def sample_random_params(rng: random.Random) -> Dict[str, Any]:
params = dict(DEFAULT_PARAMS)
params.update(
{
"take_profit_usd": rng.uniform(1.2, 6.0),
"stop_loss_usd": -rng.uniform(0.8, 4.0),
"trailing_activation_usd": rng.uniform(0.5, 3.0),
"trailing_distance_usd": rng.uniform(0.2, 1.8),
"break_even_activation_usd": rng.uniform(0.3, 2.2),
"break_even_floor_usd": rng.uniform(0.0, 1.0),
"open_breakout_buffer_pct": rng.uniform(0.0, 0.25),
"reverse_min_move_pct": rng.uniform(0.05, 0.5),
"open_cooldown_seconds": rng.choice([60, 120, 180, 240, 300, 360, 420, 480, 600, 900]),
"reverse_cooldown_seconds": rng.choice([60, 120, 180, 240, 300, 360, 420, 480, 600, 900]),
"enable_shadow_reverse": rng.random() < 0.15,
"shadow_reverse_threshold_pct": rng.uniform(0.08, 0.5),
"enable_trend_filter": rng.random() < 0.90,
"trend_ema_fast": rng.randint(8, 34),
"trend_ema_slow": rng.randint(35, 120),
"trend_strength_threshold_pct": rng.uniform(0.0, 0.08),
"trend_slope_lookback": rng.randint(1, 8),
"trend_slope_threshold_pct": rng.uniform(0.0, 0.03),
"enable_ai_filter": rng.random() < 0.90,
"ai_min_confidence": rng.uniform(0.50, 0.85),
"ai_bias": rng.uniform(-1.8, 1.0),
"ai_w_trend_align": rng.uniform(0.1, 2.5),
"ai_w_breakout_strength": rng.uniform(0.1, 2.5),
"ai_w_entity_strength": rng.uniform(0.1, 2.5),
"ai_w_shadow_balance": rng.uniform(0.0, 2.2),
"ai_w_volatility_fit": rng.uniform(0.0, 2.2),
"dynamic_vol_target_pct": rng.uniform(0.10, 0.80),
"slippage_pct": rng.uniform(0.005, 0.05),
}
)
return normalize_params(params)
def evaluate_candidate(
params: Dict[str, Any],
windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]],
raw_fee_rate: float,
rebate_ratio: float,
drawdown_weight: float,
loss_day_weight: float,
min_trades: int,
undertrade_penalty: float,
valid_score_weight: float,
drawdown_guard: float,
stability_weight: float,
stress_slippage_multipliers: List[float],
stress_fee_multipliers: List[float],
n_stress_workers: int = 0,
) -> Tuple[float, Dict[str, float], Dict[str, float]]:
train_results = []
valid_results = []
window_scores = []
num_stress = len(stress_slippage_multipliers) * len(stress_fee_multipliers)
use_parallel = n_stress_workers > 0 and num_stress > 1
for train_data, valid_data in windows:
scenario_train_scores: List[float] = []
scenario_valid_scores: List[float] = []
scenario_valid_drawdowns: List[float] = []
base_train_res = None
base_valid_res = None
if use_parallel:
tasks = [
(
params,
slip_mult,
fee_mult,
train_data,
valid_data,
raw_fee_rate,
rebate_ratio,
drawdown_weight,
loss_day_weight,
min_trades,
undertrade_penalty,
)
for slip_mult in stress_slippage_multipliers
for fee_mult in stress_fee_multipliers
]
max_workers = min(n_stress_workers, len(tasks))
with ProcessPoolExecutor(max_workers=max_workers) as executor:
for res in executor.map(_run_one_stress_scenario, tasks):
train_score, valid_score, valid_dd, tr, vr = res
scenario_train_scores.append(train_score)
scenario_valid_scores.append(valid_score)
scenario_valid_drawdowns.append(valid_dd)
if tr is not None and vr is not None:
base_train_res, base_valid_res = tr, vr
else:
for slip_mult in stress_slippage_multipliers:
for fee_mult in stress_fee_multipliers:
scenario_params = dict(params)
scenario_params["slippage_pct"] = max(
0.0,
_as_float(params.get("slippage_pct", 0.01), 0.01) * float(slip_mult),
)
scenario_params = normalize_params(scenario_params)
scenario_raw_fee_rate = max(0.0, raw_fee_rate * float(fee_mult))
backtester = ConservativeBacktester(
params=scenario_params,
raw_fee_rate=scenario_raw_fee_rate,
rebate_ratio=rebate_ratio,
)
train_res = backtester.run(train_data)
valid_res = backtester.run(valid_data)
train_score = score_result(
train_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
valid_score = score_result(
valid_res,
drawdown_weight=drawdown_weight,
loss_day_weight=loss_day_weight,
min_trades=min_trades,
undertrade_penalty=undertrade_penalty,
)
scenario_train_scores.append(train_score)
scenario_valid_scores.append(valid_score)
scenario_valid_drawdowns.append(valid_res.max_drawdown)
if base_train_res is None and abs(slip_mult - 1.0) < 1e-9 and abs(fee_mult - 1.0) < 1e-9:
base_train_res = train_res
base_valid_res = valid_res
if base_train_res is None:
base_train_res = train_res
base_valid_res = valid_res
if base_train_res is None or base_valid_res is None:
fallback_backtester = ConservativeBacktester(
params=params,
raw_fee_rate=raw_fee_rate,
rebate_ratio=rebate_ratio,
)
base_train_res = fallback_backtester.run(train_data)
base_valid_res = fallback_backtester.run(valid_data)
train_results.append(base_train_res)
valid_results.append(base_valid_res)
train_score_mean = mean(scenario_train_scores) if scenario_train_scores else -1e9
valid_score_mean = mean(scenario_valid_scores) if scenario_valid_scores else -1e9
valid_score_min = min(scenario_valid_scores) if scenario_valid_scores else -1e9
robustness_penalty = max(0.0, valid_score_mean - valid_score_min) * 0.35
combined = valid_score_weight * valid_score_mean + (1.0 - valid_score_weight) * train_score_mean - robustness_penalty
valid_drawdown_max = max(scenario_valid_drawdowns) if scenario_valid_drawdowns else 0.0
if valid_drawdown_max > drawdown_guard:
combined -= (valid_drawdown_max - drawdown_guard) * 2.0
window_scores.append(combined)
mean_score = mean(window_scores) if window_scores else -1e9
if len(valid_results) > 1:
stability_penalty = pstdev([r.net_pnl for r in valid_results]) * stability_weight
mean_score -= stability_penalty
train_agg = aggregate_results(train_results)
valid_agg = aggregate_results(valid_results)
return mean_score, train_agg, valid_agg
def optimize_with_optuna(
windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]],
args: argparse.Namespace,
) -> Dict[str, Any]:
import optuna # type: ignore
sampler = optuna.samplers.TPESampler(seed=args.seed)
study = optuna.create_study(direction="maximize", sampler=sampler)
best: Dict[str, Any] = {}
def objective(trial: Any) -> float:
params = dict(DEFAULT_PARAMS)
params.update(
{
"take_profit_usd": trial.suggest_float("take_profit_usd", 1.2, 6.0),
"stop_loss_usd": -trial.suggest_float("stop_loss_abs", 0.8, 4.0),
"trailing_activation_usd": trial.suggest_float("trailing_activation_usd", 0.5, 3.0),
"trailing_distance_usd": trial.suggest_float("trailing_distance_usd", 0.2, 1.8),
"break_even_activation_usd": trial.suggest_float("break_even_activation_usd", 0.3, 2.2),
"break_even_floor_usd": trial.suggest_float("break_even_floor_usd", 0.0, 1.0),
"open_breakout_buffer_pct": trial.suggest_float("open_breakout_buffer_pct", 0.0, 0.25),
"reverse_min_move_pct": trial.suggest_float("reverse_min_move_pct", 0.05, 0.5),
"open_cooldown_seconds": trial.suggest_int("open_cooldown_seconds", 60, 900, step=60),
"reverse_cooldown_seconds": trial.suggest_int("reverse_cooldown_seconds", 60, 900, step=60),
"enable_shadow_reverse": trial.suggest_categorical("enable_shadow_reverse", [False, True]),
"shadow_reverse_threshold_pct": trial.suggest_float("shadow_reverse_threshold_pct", 0.08, 0.5),
"enable_trend_filter": trial.suggest_categorical("enable_trend_filter", [False, True]),
"trend_ema_fast": trial.suggest_int("trend_ema_fast", 8, 34),
"trend_ema_slow": trial.suggest_int("trend_ema_slow", 35, 120),
"trend_strength_threshold_pct": trial.suggest_float("trend_strength_threshold_pct", 0.0, 0.08),
"trend_slope_lookback": trial.suggest_int("trend_slope_lookback", 1, 8),
"trend_slope_threshold_pct": trial.suggest_float("trend_slope_threshold_pct", 0.0, 0.03),
"enable_ai_filter": trial.suggest_categorical("enable_ai_filter", [False, True]),
"ai_min_confidence": trial.suggest_float("ai_min_confidence", 0.50, 0.85),
"ai_bias": trial.suggest_float("ai_bias", -1.8, 1.0),
"ai_w_trend_align": trial.suggest_float("ai_w_trend_align", 0.1, 2.5),
"ai_w_breakout_strength": trial.suggest_float("ai_w_breakout_strength", 0.1, 2.5),
"ai_w_entity_strength": trial.suggest_float("ai_w_entity_strength", 0.1, 2.5),
"ai_w_shadow_balance": trial.suggest_float("ai_w_shadow_balance", 0.0, 2.2),
"ai_w_volatility_fit": trial.suggest_float("ai_w_volatility_fit", 0.0, 2.2),
"dynamic_vol_target_pct": trial.suggest_float("dynamic_vol_target_pct", 0.10, 0.80),
"slippage_pct": trial.suggest_float("slippage_pct", 0.005, 0.05),
}
)
params = normalize_params(params)
score, train_agg, valid_agg = evaluate_candidate(
params=params,
windows=windows,
raw_fee_rate=args.raw_fee_rate,
rebate_ratio=args.rebate_ratio,
drawdown_weight=args.drawdown_weight,
loss_day_weight=args.loss_day_weight,
min_trades=args.min_trades,
undertrade_penalty=args.undertrade_penalty,
valid_score_weight=args.valid_score_weight,
drawdown_guard=args.drawdown_guard,
stability_weight=args.stability_weight,
stress_slippage_multipliers=args.stress_slippage_multipliers,
stress_fee_multipliers=args.stress_fee_multipliers,
n_stress_workers=getattr(args, "n_stress_workers", 0),
)
trial.set_user_attr("train_agg", train_agg)
trial.set_user_attr("valid_agg", valid_agg)
nonlocal best
if not best or score > best["score"]:
best = {
"score": score,
"params": params,
"train_agg": train_agg,
"valid_agg": valid_agg,
}
return score
n_jobs = max(1, int(getattr(args, "n_jobs", 1)))
study.optimize(objective, n_trials=args.n_trials, n_jobs=n_jobs, show_progress_bar=(n_jobs == 1))
if best:
return best
trial = study.best_trial
return {
"score": trial.value,
"params": normalize_params(dict(DEFAULT_PARAMS)),
"train_agg": trial.user_attrs.get("train_agg", {}),
"valid_agg": trial.user_attrs.get("valid_agg", {}),
}
def optimize_with_random(
windows: List[Tuple[List[Dict[str, float]], List[Dict[str, float]]]],
args: argparse.Namespace,
) -> Dict[str, Any]:
rng = random.Random(args.seed)
best: Optional[Dict[str, Any]] = None
for i in range(args.n_trials):
params = sample_random_params(rng)
score, train_agg, valid_agg = evaluate_candidate(
params=params,
windows=windows,
raw_fee_rate=args.raw_fee_rate,
rebate_ratio=args.rebate_ratio,
drawdown_weight=args.drawdown_weight,
loss_day_weight=args.loss_day_weight,
min_trades=args.min_trades,
undertrade_penalty=args.undertrade_penalty,
valid_score_weight=args.valid_score_weight,
drawdown_guard=args.drawdown_guard,
stability_weight=args.stability_weight,
stress_slippage_multipliers=args.stress_slippage_multipliers,
stress_fee_multipliers=args.stress_fee_multipliers,
n_stress_workers=getattr(args, "n_stress_workers", 0),
)
if best is None or score > best["score"]:
best = {
"score": score,
"params": params,
"train_agg": train_agg,
"valid_agg": valid_agg,
"trial": i + 1,
}
if (i + 1) % 20 == 0:
print(f"[random] trial {i + 1}/{args.n_trials}, best_score={best['score']:.6f}")
if best is None:
raise RuntimeError("random optimization failed: no trial executed")
return best
def load_klines(args: argparse.Namespace) -> Tuple[List[Dict[str, float]], Dict[str, Any]]:
if args.data_file:
csv_path = Path(args.data_file).expanduser()
klines = load_klines_from_csv(csv_path, last_n_days=args.days)
source_meta = {"type": "csv", "path": str(csv_path)}
return klines, source_meta
api_key = args.api_key or os.getenv("BITMART_API_KEY")
secret_key = args.secret_key or os.getenv("BITMART_SECRET_KEY")
memo = args.memo or os.getenv("BITMART_MEMO", "参数优化")
credential_source = "args_or_env"
if not api_key or not secret_key:
strategy_path = Path(__file__).with_name(STRATEGY_FILENAME)
extracted = load_credentials_from_strategy_file(strategy_path)
if extracted:
api_key = api_key or extracted.get("api_key", "")
secret_key = secret_key or extracted.get("secret_key", "")
memo = memo or extracted.get("memo", "参数优化")
credential_source = f"strategy_file:{strategy_path.name}"
if not api_key or not secret_key:
raise ValueError(
"No --data-file provided and API credentials missing. "
"Provide --api-key/--secret-key or env BITMART_API_KEY/BITMART_SECRET_KEY. "
f"Fallback strategy file attempted: {STRATEGY_FILENAME}"
)
from bitmart.api_contract import APIContract
print(f"Using API credentials source: {credential_source}")
client = APIContract(api_key, secret_key, memo, timeout=(5, 15))
klines = fetch_klines_from_api(
contract_api=client,
contract_symbol=args.symbol,
step=args.step,
days=args.days,
batch_hours=args.batch_hours,
sleep_seconds=args.api_sleep,
)
source_meta = {
"type": "api",
"symbol": args.symbol,
"step": args.step,
"days": args.days,
"credential_source": credential_source,
}
save_path = args.save_data_file
if not save_path:
save_path = str(Path(__file__).with_name(f"auto_{args.symbol.lower()}_{args.step}m_{args.days}d.csv"))
if save_path:
output_path = save_klines_to_csv(klines, save_path)
source_meta["saved_csv"] = str(output_path)
print(f"Fetched klines saved to: {output_path}")
return klines, source_meta
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Optimize conservative strategy params using last N days of kline data.")
parser.add_argument("--data-file", type=str, default="", help="CSV path for kline data (recommended).")
parser.add_argument("--save-data-file", type=str, default="", help="When fetching from API, also save CSV here.")
parser.add_argument("--days", type=int, default=30, help="Use recent N days of data.")
parser.add_argument("--symbol", type=str, default="ETHUSDT", help="Bitmart contract symbol for API mode.")
parser.add_argument("--step", type=int, default=5, help="Kline step in minutes for API mode.")
parser.add_argument("--batch-hours", type=int, default=12, help="API fetch window size in hours.")
parser.add_argument("--api-sleep", type=float, default=0.05, help="Sleep seconds between API windows.")
parser.add_argument("--api-key", type=str, default="", help="Bitmart API key (optional if using env).")
parser.add_argument("--secret-key", type=str, default="", help="Bitmart secret key (optional if using env).")
parser.add_argument("--memo", type=str, default="", help="Bitmart memo (optional).")
parser.add_argument("--train-days", type=int, default=20, help="Rolling train window in days.")
parser.add_argument("--valid-days", type=int, default=10, help="Rolling validation window in days.")
parser.add_argument("--window-step-days", type=int, default=5, help="Rolling window step in days.")
parser.add_argument("--method", type=str, default="auto", choices=["auto", "optuna", "random"], help="Search method.")
parser.add_argument("--n-trials", type=int, default=300, help="Number of optimization trials.")
parser.add_argument("--n-jobs", type=int, default=1, help="Parallel trials (Optuna only). 1=sequential, 2+=parallel.")
parser.add_argument("--n-stress-workers", type=int, default=0, help="Parallel stress scenarios per window (0=sequential). e.g. 4 runs slip×fee in parallel.")
parser.add_argument("--max-windows", type=int, default=0, help="Cap rolling windows (0=no cap). Speeds up when set to 2-3.")
parser.add_argument("--seed", type=int, default=42, help="Random seed.")
parser.add_argument("--raw-fee-rate", type=float, default=0.0006, help="Raw taker fee rate (e.g. 0.0006).")
parser.add_argument("--rebate-ratio", type=float, default=0.90, help="Fee rebate ratio (e.g. 0.90 means 90%% rebate).")
parser.add_argument("--drawdown-weight", type=float, default=1.4, help="Penalty weight for max drawdown.")
parser.add_argument("--loss-day-weight", type=float, default=0.8, help="Penalty weight for losing days.")
parser.add_argument("--min-trades", type=int, default=6, help="Minimum trades per window before penalty.")
parser.add_argument("--undertrade-penalty", type=float, default=0.5, help="Penalty per missing trade.")
parser.add_argument("--valid-score-weight", type=float, default=0.8, help="Weight of validation score in combined score.")
parser.add_argument("--drawdown-guard", type=float, default=10.0, help="Extra hard penalty beyond this validation drawdown.")
parser.add_argument("--stability-weight", type=float, default=0.3, help="Penalty for net-pnl variance across windows.")
parser.add_argument(
"--stress-slippage-multipliers",
type=str,
default="0.8,1.0,1.2",
help="Comma-separated slippage multipliers used in robust scoring.",
)
parser.add_argument(
"--stress-fee-multipliers",
type=str,
default="1.0,1.15",
help="Comma-separated raw-fee multipliers used in robust scoring.",
)
parser.add_argument(
"--output",
type=str,
default=str(Path(__file__).with_name("current_params.json")),
help="Output JSON path used by live conservative script.",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
args.stress_slippage_multipliers = _parse_float_list(args.stress_slippage_multipliers, [0.8, 1.0, 1.2])
args.stress_fee_multipliers = _parse_float_list(args.stress_fee_multipliers, [1.0, 1.15])
klines, source_meta = load_klines(args)
if len(klines) < 200:
raise RuntimeError(f"Not enough klines for optimization: {len(klines)}")
windows = split_rolling_windows(
klines,
train_days=args.train_days,
valid_days=args.valid_days,
step_days=args.window_step_days,
)
if not windows:
raise RuntimeError(
"Could not build rolling windows. Increase data days or reduce train/valid window sizes."
)
max_windows = max(0, int(getattr(args, "max_windows", 0)))
if max_windows > 0 and len(windows) > max_windows:
windows = windows[:max_windows]
print(f"Capped rolling windows to {max_windows} (--max-windows)")
print(f"Loaded klines: {len(klines)}, rolling windows: {len(windows)}")
use_method = args.method
if use_method == "auto":
try:
import optuna # noqa: F401
use_method = "optuna"
except Exception:
use_method = "random"
if use_method == "optuna":
best = optimize_with_optuna(windows, args)
else:
best = optimize_with_random(windows, args)
best_params = normalize_params(best["params"])
result_payload = {
"updated_at": datetime.now(timezone.utc).isoformat(),
"source": {
**source_meta,
"bars": len(klines),
"start_ts": int(klines[0]["id"]),
"end_ts": int(klines[-1]["id"]),
},
"optimization": {
"method": use_method,
"n_trials": args.n_trials,
"train_days": args.train_days,
"valid_days": args.valid_days,
"window_step_days": args.window_step_days,
"windows": len(windows),
"seed": args.seed,
"stress_slippage_multipliers": args.stress_slippage_multipliers,
"stress_fee_multipliers": args.stress_fee_multipliers,
},
"fee_model": {
"raw_fee_rate": args.raw_fee_rate,
"rebate_ratio": args.rebate_ratio,
"effective_fee_rate": args.raw_fee_rate * (1 - args.rebate_ratio),
},
"score": best["score"],
"train_metrics_avg": best.get("train_agg", {}),
"valid_metrics_avg": best.get("valid_agg", {}),
"params": best_params,
}
output_path = Path(args.output).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as f:
json.dump(result_payload, f, ensure_ascii=False, indent=2)
print("Optimization done.")
print(f"Best score: {best['score']:.6f}")
print(f"Saved params to: {output_path}")
print("Best params:")
for k in sorted(best_params):
print(f" {k}: {best_params[k]}")
if __name__ == "__main__":
main()

View File

@@ -1,604 +0,0 @@
import random
import time
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
class BitmartFuturesTransaction:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=30, desc="等待K线", ncols=80)
self.last_kline_time = None # 上一次处理的K线时间戳用于判断是否是新K线
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
# 策略相关变量
self.prev_kline = None # 上一根K线
self.current_kline = None # 当前K线
self.prev_entity = None # 上一根K线实体大小
self.current_open = None # 当前K线开盘价
# 反手信号控制记录当前K线是否已执行过反手每根K线只执行一次
self.reverse_executed_kline_id = None # 已执行反手的K线时间戳
def get_klines(self):
"""获取最近2根K线当前K线和上一根K线"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新的K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=5, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
# 返回最近2根K线倒数第二根上一根和最后一根当前
if len(formatted) >= 2:
return formatted[-2], formatted[-1]
return None, None
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(text="获取K线异常", error=True)
return None, None
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 1, # 取最近1小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = float(positions[0]['open_avg_price'])
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
# ele.scroll.to_see(center=True)
# time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
"""平仓操作"""
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价做多或者做空1是做多-1是做空
limitPriceShortOrder 限价做多或者做空
"""
if marketPriceLongOrder == -1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, text, error=False):
"""日志通知"""
if error:
logger.error(text)
else:
logger.info(text)
def calculate_entity(self, kline):
"""计算K线实体大小绝对值"""
return abs(kline['close'] - kline['open'])
def calculate_upper_shadow(self, kline):
"""计算上影线涨幅百分比"""
# 上影线 = (最高价 - max(开盘价, 收盘价)) / max(开盘价, 收盘价)
body_top = max(kline['open'], kline['close'])
if body_top == 0:
return 0
return (kline['high'] - body_top) / body_top * 100
def calculate_lower_shadow(self, kline):
"""计算下影线跌幅百分比"""
# 下影线 = (min(开盘价, 收盘价) - 最低价) / min(开盘价, 收盘价)
body_bottom = min(kline['open'], kline['close'])
if body_bottom == 0:
return 0
return (body_bottom - kline['low']) / body_bottom * 100
def get_entity_edge(self, kline):
"""获取K线实体边收盘价或开盘价取决于是阳线还是阴线"""
# 阳线(收盘>开盘):实体上边=收盘价,实体下边=开盘价
# 阴线(收盘<开盘):实体上边=开盘价,实体下边=收盘价
return {
'upper': max(kline['open'], kline['close']), # 实体上边
'lower': min(kline['open'], kline['close']) # 实体下边
}
def check_signal(self, current_price, prev_kline, current_kline):
"""
检查交易信号
返回: ('long', trigger_price) / ('short', trigger_price) / None
"""
# 计算上一根K线实体
prev_entity = self.calculate_entity(prev_kline)
# 实体过小不交易(实体 < 0.1
if prev_entity < 0.1:
logger.info(f"上一根K线实体过小: {prev_entity:.4f},跳过信号检测")
return None
# 获取上一根K线的实体上下边
prev_entity_edge = self.get_entity_edge(prev_kline)
prev_entity_upper = prev_entity_edge['upper'] # 实体上边
prev_entity_lower = prev_entity_edge['lower'] # 实体下边
# 计算触发价基于上一根K线实体位置
long_trigger = prev_entity_lower + prev_entity / 4 # 做多触发价 = 实体下边 + 实体/4下四分之一处
short_trigger = prev_entity_upper - prev_entity / 4 # 做空触发价 = 实体上边 - 实体/4上四分之一处
# 上一根阴线 + 当前阳线做多形态不按上一根K线上三分之一做空
prev_is_bearish = prev_kline['close'] < prev_kline['open']
current_is_bullish = current_kline['close'] > current_kline['open']
skip_short_by_upper_third = prev_is_bearish and current_is_bullish
# 上一根阳线 + 当前阴线做空形态不按上一根K线下三分之一做多
prev_is_bullish = prev_kline['close'] > prev_kline['open']
current_is_bearish = current_kline['close'] < current_kline['open']
skip_long_by_lower_third = prev_is_bullish and current_is_bearish
logger.info(f"当前价格: {current_price:.2f}, 上一根实体: {prev_entity:.4f}")
logger.info(f"上一根实体上边: {prev_entity_upper:.2f}, 下边: {prev_entity_lower:.2f}")
logger.info(f"做多触发价(下1/4): {long_trigger:.2f}, 做空触发价(上1/4): {short_trigger:.2f}")
if skip_short_by_upper_third:
logger.info("上一根阴线+当前阳线(做多形态),不按上四分之一做空")
if skip_long_by_lower_third:
logger.info("上一根阳线+当前阴线(做空形态),不按下四分之一做多")
# 无持仓时检查开仓信号(保留原有开仓逻辑)
if self.start == 0:
if current_price >= long_trigger and not skip_long_by_lower_third:
logger.info(f"触发做多信号!价格 {current_price:.2f} >= 触发价(下1/4) {long_trigger:.2f}")
return ('long', long_trigger)
elif current_price <= short_trigger and not skip_short_by_upper_third:
logger.info(f"触发做空信号!价格 {current_price:.2f} <= 触发价(上1/4) {short_trigger:.2f}")
return ('short', short_trigger)
# ========== 止盈/止损逻辑(四分之一触发价)==========
# 计算基于当前K线开盘价的止盈止损触发价
current_open = current_kline['open']
stop_long_trigger = current_open + prev_entity / 4 # 止盈做多触发价 = 当前开盘价 + 上一根实体/4
stop_short_trigger = current_open - prev_entity / 4 # 止损做空触发价 = 当前开盘价 - 上一根实体/4
logger.info(f"止盈止损触发价: 做多止盈={stop_long_trigger:.2f}, 做空止损={stop_short_trigger:.2f}")
# 持多仓时检查止损信号(只平仓,不反手)
if self.start == 1:
if current_price <= stop_short_trigger and not skip_short_by_upper_third:
logger.info(f"【止损平仓】持多仓, 价格 {current_price:.2f} <= 止损价 {stop_short_trigger:.2f}")
return ('close_long', stop_short_trigger)
# 持空仓时检查止盈信号(只平仓,不反手)
elif self.start == -1:
if current_price >= stop_long_trigger and not skip_long_by_lower_third:
logger.info(f"【止盈平仓】持空仓, 价格 {current_price:.2f} >= 止盈价 {stop_long_trigger:.2f}")
return ('close_short', stop_long_trigger)
# ========== 反手逻辑(影线条件)==========
# 检查当前K线是否已执行过反手每根K线只执行一次
current_kline_id = current_kline['id']
if self.reverse_executed_kline_id == current_kline_id:
logger.debug(f"当前K线 {current_kline_id} 已执行过反手,跳过反手检测")
return None
# 持多仓时检查反手做空信号
if self.start == 1:
# 反手条件: 上一根阳线 + 下影线>=0.1% + 当前先涨后跌到上一根最低价
if prev_is_bullish: # 上一根是阳线
lower_shadow_pct = self.calculate_lower_shadow(prev_kline)
current_went_up_first = current_kline['high'] > current_kline['open']
logger.debug(f"反手做空检测: 上一根阳线, 下影线跌幅={lower_shadow_pct:.4f}%, "
f"当前K线先涨过={current_went_up_first}, "
f"当前价格={current_price:.2f}, 上一根最低价={prev_kline['low']:.2f}")
if lower_shadow_pct >= 0.1 and current_went_up_first and current_price <= prev_kline['low']:
logger.info(f"【反手做空】上一根阳线, 下影线跌幅 {lower_shadow_pct:.4f}% >= 0.1%, "
f"当前K线先涨过, 价格 {current_price:.2f} <= 上一根最低价 {prev_kline['low']:.2f}")
return ('reverse_short', prev_kline['low'])
# 持空仓时检查反手做多信号
elif self.start == -1:
# 反手条件: 上一根阴线 + 上影线>=0.1% + 当前先跌后涨到上一根最高价
if prev_is_bearish: # 上一根是阴线
upper_shadow_pct = self.calculate_upper_shadow(prev_kline)
current_went_down_first = current_kline['low'] < current_kline['open']
logger.debug(f"反手做多检测: 上一根阴线, 上影线涨幅={upper_shadow_pct:.4f}%, "
f"当前K线先跌过={current_went_down_first}, "
f"当前价格={current_price:.2f}, 上一根最高价={prev_kline['high']:.2f}")
if upper_shadow_pct >= 0.1 and current_went_down_first and current_price >= prev_kline['high']:
logger.info(f"【反手做多】上一根阴线, 上影线涨幅 {upper_shadow_pct:.4f}% >= 0.1%, "
f"当前K线先跌过, 价格 {current_price:.2f} >= 上一根最高价 {prev_kline['high']:.2f}")
return ('reverse_long', prev_kline['high'])
return None
def verify_no_position(self, max_retries=5, retry_interval=3):
"""
验证当前无持仓
返回: True 表示无持仓可以开仓False 表示有持仓不能开仓
"""
for i in range(max_retries):
if self.get_position_status():
if self.start == 0:
logger.info(f"确认无持仓,可以开仓")
return True
else:
logger.warning(
f"仍有持仓 (方向: {self.start}),等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
else:
logger.warning(f"查询持仓状态失败,等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
logger.error(f"经过 {max_retries} 次重试仍有持仓或查询失败,放弃开仓")
return False
def verify_position_direction(self, expected_direction):
"""
验证当前持仓方向是否与预期一致
expected_direction: 1 多仓, -1 空仓
返回: True 表示持仓方向正确False 表示不正确
"""
if self.get_position_status():
if self.start == expected_direction:
logger.info(f"持仓方向验证成功: {self.start}")
return True
else:
logger.warning(f"持仓方向不符: 期望 {expected_direction}, 实际 {self.start}")
return False
else:
logger.error("查询持仓状态失败")
return False
def execute_trade(self, signal, current_kline_id, size=1):
"""执行交易"""
signal_type, trigger_price = signal
size = 25
if signal_type == 'long':
# 开多前先确认无持仓
logger.info(f"准备开多,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开多前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(f"确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(1):
logger.success("开多成功")
return True
else:
logger.error("开多后持仓验证失败")
return False
elif signal_type == 'short':
# 开空前先确认无持仓
logger.info(f"准备开空,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开空前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(f"确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(-1):
logger.success("开空成功")
return True
else:
logger.error("开空后持仓验证失败")
return False
elif signal_type == 'close_long':
# 止损平多仓(只平仓,不反手)
logger.info(f"执行止损平多仓,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(3) # 等待订单执行
# 验证平仓是否成功
if self.get_position_status() and self.start == 0:
logger.success("止损平多仓成功")
time.sleep(20) # 额外等待避免频繁交易
return True
else:
logger.error("止损平多仓后验证失败")
return False
elif signal_type == 'close_short':
# 止盈平空仓(只平仓,不反手)
logger.info(f"执行止盈平空仓,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(3) # 等待订单执行
# 验证平仓是否成功
if self.get_position_status() and self.start == 0:
logger.success("止盈平空仓成功")
time.sleep(20) # 额外等待避免频繁交易
return True
else:
logger.error("止盈平空仓后验证失败")
return False
elif signal_type == 'reverse_long':
# 平空 + 开多(反手做多)- 优化:平仓后立即开仓
logger.info(f"执行反手做多,触发价: {trigger_price:.2f}")
self.平仓()
# time.sleep(1) # 等待1秒让平仓订单提交并更新UI
# 立即执行开多,不等待平仓验证完成(市价单通常毫秒级成交)
logger.info("平仓已提交,立即执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(1):
logger.success("反手做多成功")
# 记录当前K线已执行反手
self.reverse_executed_kline_id = current_kline_id
time.sleep(20) # 额外等待避免频繁交易
return True
else:
logger.error("反手做多后持仓验证失败")
return False
elif signal_type == 'reverse_short':
# 平多 + 开空(反手做空)- 优化:平仓后立即开仓
logger.info(f"执行反手做空,触发价: {trigger_price:.2f}")
self.平仓()
# time.sleep(1) # 等待1秒让平仓订单提交并更新UI
# 立即执行开空,不等待平仓验证完成(市价单通常毫秒级成交)
logger.info("平仓已提交,立即执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(-1):
logger.success("反手做空成功")
# 记录当前K线已执行反手
self.reverse_executed_kline_id = current_kline_id
time.sleep(20) # 额外等待避免频繁交易
return True
else:
logger.error("反手做空后持仓验证失败")
return False
return False
def action(self):
"""主循环"""
logger.info("开始运行四分之一策略交易(新反手逻辑)...")
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
page_start = True
while True:
if page_start:
# 打开浏览器
for i in range(5):
if self.openBrowser():
logger.info("浏览器打开成功")
break
else:
self.ding("打开浏览器失败!", error=True)
return
# 进入交易页面
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
size = 25
self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
page_start = False
try:
# 1. 获取K线数据当前K线和上一根K线
prev_kline, current_kline = self.get_klines()
if not prev_kline or not current_kline:
logger.warning("获取K线失败等待重试...")
time.sleep(5)
continue
# 2. 获取当前价格
current_price = self.get_current_price()
if not current_price:
logger.warning("获取价格失败,等待重试...")
time.sleep(2)
continue
# 3. 每次循环都通过SDK获取真实持仓状态避免状态不同步导致双向持仓
if not self.get_position_status():
logger.warning("获取持仓状态失败,等待重试...")
time.sleep(2)
continue
logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)")
# 4. 检查信号
signal = self.check_signal(current_price, prev_kline, current_kline)
# 5. 有信号则执行交易
if signal:
trade_success = self.execute_trade(signal, current_kline['id'], size=1)
if trade_success:
logger.success(f"交易执行完成: {signal[0]}, 当前持仓状态: {self.start}")
page_start = True
else:
logger.warning(f"交易执行失败或被阻止: {signal[0]}")
# 6. 短暂等待后继续循环同一根K线遇到信号就操作
time.sleep(0.5)
if page_start:
self.page.close()
time.sleep(25)
page_start = True
except KeyboardInterrupt:
logger.info("用户中断,程序退出")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(5)
if __name__ == '__main__':
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()

View File

@@ -0,0 +1,752 @@
import time
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
class BitmartFuturesTransaction:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.pbar = tqdm(total=30, desc="等待K线", ncols=80) # 可选:用于长时间等待时展示进度
self.last_kline_time = None # 上一次处理的K线时间戳用于判断是否是新K线
# 反手频率控制
self.reverse_cooldown_seconds = 1.5 * 60 # 反手冷却时间(秒)
self.reverse_min_move_pct = 0.05 # 反手最小价差过滤(百分比)
self.last_reverse_time = None # 上次反手时间
# 开仓频率控制
self.open_cooldown_seconds = 60 # 开仓冷却时间(秒),两次开仓至少间隔此时长
self.last_open_time = None # 上次开仓时间
self.last_open_kline_id = None # 上次开仓所在 K 线 id同一根 K 线只允许开仓一次
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0 # 未使用;若启用则可为每次开仓占可用余额的百分比
self.take_profit_usd = 5 # 仓位盈利达到此金额(美元)时平仓止盈
self.stop_loss_usd = -3 # 固定止损:亏损达到 3 美元平仓
self.trailing_activation_usd = 2 # 盈利达到此金额后启动移动止损
self.trailing_distance_usd = 1.5 # 从最高盈利回撤此金额则平仓
self.max_unrealized_pnl_seen = None # 持仓期间见过的最大盈利(用于移动止损)
# 当前K线从极值回落平仓
# 模式: 'fixed'=固定点数(drop_from_high_to_close)'pct_retrace'=按本K线涨幅比例动态算回撤
self.drop_from_high_mode = 'pct_retrace' # 'fixed' | 'pct_retrace'
self.drop_from_high_to_close = 2 # fixed 模式下:回落/反弹超过此价格点数则平仓0 表示关闭
# pct_retrace 模式本K线涨幅 = (最高-开盘)/开盘*100允许回撤% = 涨幅% * retrace_ratio从最高点回撤超过则平仓
self.retrace_ratio = 0.5 # 回撤系数,如 0.5 表示允许回撤涨幅的 50%(类似斐波那契 50% 回撤)
self.min_rise_pct_to_activate = 0.02 # 至少涨/跌这么多才启用动态回撤,避免噪音
self.min_drop_pct_from_high = 0.03 # 至少从最高点回撤这么多%才平仓(保底,避免过于敏感)
self._candle_high_seen = None # 当前K线内见过的最高价多头用
self._candle_low_seen = None # 当前K线内见过的最低价空头用
self._candle_id_for_high_low = None # 记录高低对应的K线 id换线则重置
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
self.default_order_size = 25 # 开仓/反手张数,统一在此修改
# 策略相关变量
self.prev_kline = None # 上一根K线
self.current_kline = None # 当前K线
self.prev_entity = None # 上一根K线实体大小
self.current_open = None # 当前K线开盘价
def get_klines(self):
"""获取最近2根K线当前K线和上一根K线"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新的K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=5, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
# 返回最近2根K线倒数第二根上一根和最后一根当前
if len(formatted) >= 2:
return formatted[-2], formatted[-1]
return None, None
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(text="获取K线异常", error=True)
return None, None
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 1, # 取最近1小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.unrealized_pnl = None
return True
pos = positions[0]
self.start = 1 if pos['position_type'] == 1 else -1
self.open_avg_price = float(pos['open_avg_price'])
self.current_amount = float(pos['current_amount'])
self.position_cross = pos["position_cross"]
# 直接从API获取未实现盈亏Bitmart返回的是 unrealized_value 字段)
self.unrealized_pnl = float(pos.get('unrealized_value', 0))
logger.debug(f"持仓详情: 方向={self.start}, 开仓均价={self.open_avg_price}, "
f"持仓量={self.current_amount}, 未实现盈亏={self.unrealized_pnl:.2f}")
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def get_unrealized_pnl_usd(self):
"""
获取当前持仓未实现盈亏美元直接使用API返回值
"""
if self.start == 0 or self.unrealized_pnl is None:
return None
return self.unrealized_pnl
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
# ele.scroll.to_see(center=True)
# time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
"""平仓操作"""
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价做多或者做空1是做多-1是做空
limitPriceShortOrder 限价做多或者做空
"""
if marketPriceLongOrder == -1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, text, error=False):
"""日志通知"""
if error:
logger.error(text)
else:
logger.info(text)
def calculate_entity(self, kline):
"""计算K线实体大小绝对值"""
return abs(kline['close'] - kline['open'])
def calculate_upper_shadow(self, kline):
"""计算上阴线(上影线)涨幅百分比"""
# 上阴线 = (最高价 - max(开盘价, 收盘价)) / max(开盘价, 收盘价)
body_top = max(kline['open'], kline['close'])
if body_top == 0:
return 0
return (kline['high'] - body_top) / body_top * 100
def calculate_lower_shadow(self, kline):
"""计算下阴线(下影线)跌幅百分比"""
# 下阴线 = (min(开盘价, 收盘价) - 最低价) / min(开盘价, 收盘价)
body_bottom = min(kline['open'], kline['close'])
if body_bottom == 0:
return 0
return (body_bottom - kline['low']) / body_bottom * 100
def get_entity_edge(self, kline):
"""获取K线实体边收盘价或开盘价取决于是阳线还是阴线"""
# 阳线(收盘>开盘):实体上边=收盘价,实体下边=开盘价
# 阴线(收盘<开盘):实体上边=开盘价,实体下边=收盘价
return {
'upper': max(kline['open'], kline['close']), # 实体上边
'lower': min(kline['open'], kline['close']) # 实体下边
}
def check_signal(self, current_price, prev_kline, current_kline):
"""
检查交易信号
返回: ('long', trigger_price) / ('short', trigger_price) / None
"""
# 计算上一根K线实体
prev_entity = self.calculate_entity(prev_kline)
# 实体过小不交易(实体 < 0.1
if prev_entity < 0.1:
logger.info(f"上一根K线实体过小: {prev_entity:.4f},跳过信号检测")
return None
# 获取上一根K线的实体上下边
prev_entity_edge = self.get_entity_edge(prev_kline)
prev_entity_upper = prev_entity_edge['upper'] # 实体上边
prev_entity_lower = prev_entity_edge['lower'] # 实体下边
# 优化:以下两种情况以当前这根的开盘价作为计算基准
# 1) 上一根阳线 且 当前开盘价 > 上一根收盘价(跳空高开)
# 2) 上一根阴线 且 当前开盘价 < 上一根收盘价(跳空低开)
prev_is_bullish_for_calc = prev_kline['close'] > prev_kline['open']
prev_is_bearish_for_calc = prev_kline['close'] < prev_kline['open']
current_open_above_prev_close = current_kline['open'] > prev_kline['close']
current_open_below_prev_close = current_kline['open'] < prev_kline['close']
use_current_open_as_base = (prev_is_bullish_for_calc and current_open_above_prev_close) or (prev_is_bearish_for_calc and current_open_below_prev_close)
if use_current_open_as_base:
# 以当前K线开盘价为基准计算跳空时用当前开盘价参与计算
calc_lower = current_kline['open']
calc_upper = current_kline['open'] # 同一基准,上下四分之一对称
long_trigger = calc_lower + prev_entity / 4
short_trigger = calc_upper - prev_entity / 4
long_breakout = calc_upper + prev_entity / 4
short_breakout = calc_lower - prev_entity / 4
else:
# 原有计算方式
long_trigger = prev_entity_lower + prev_entity / 4 # 做多触发价 = 实体下边 + 实体/4下四分之一处
short_trigger = prev_entity_upper - prev_entity / 4 # 做空触发价 = 实体上边 - 实体/4上四分之一处
long_breakout = prev_entity_upper + prev_entity / 4 # 做多突破价 = 实体上边 + 实体/4
short_breakout = prev_entity_lower - prev_entity / 4 # 做空突破价 = 实体下边 - 实体/4
# 上一根阴线 + 当前阳线做多形态不按上一根K线上三分之一做空
prev_is_bearish = prev_kline['close'] < prev_kline['open']
current_is_bullish = current_kline['close'] > current_kline['open']
skip_short_by_upper_third = prev_is_bearish and current_is_bullish
# 上一根阳线 + 当前阴线做空形态不按上一根K线下三分之一做多
prev_is_bullish = prev_kline['close'] > prev_kline['open']
current_is_bearish = current_kline['close'] < current_kline['open']
skip_long_by_lower_third = prev_is_bullish and current_is_bearish
if use_current_open_as_base:
if prev_is_bullish_for_calc and current_open_above_prev_close:
logger.info(f"上一根阳线且当前开盘价({current_kline['open']:.2f})>上一根收盘价({prev_kline['close']:.2f}),以当前开盘价为基准计算")
else:
logger.info(f"上一根阴线且当前开盘价({current_kline['open']:.2f})<上一根收盘价({prev_kline['close']:.2f}),以当前开盘价为基准计算")
logger.info(f"当前价格: {current_price:.2f}, 上一根实体: {prev_entity:.4f}")
logger.info(f"上一根实体上边: {prev_entity_upper:.2f}, 下边: {prev_entity_lower:.2f}")
logger.info(f"做多触发价(下1/4): {long_trigger:.2f}, 做空触发价(上1/4): {short_trigger:.2f}")
logger.info(f"突破做多价(上1/4外): {long_breakout:.2f}, 突破做空价(下1/4外): {short_breakout:.2f}")
if skip_short_by_upper_third:
logger.info("上一根阴线+当前阳线(做多形态),不按上四分之一做空")
if skip_long_by_lower_third:
logger.info("上一根阳线+当前阴线(做空形态),不按下四分之一做多")
# 无持仓时检查开仓信号
if self.start == 0:
if current_price >= long_breakout and not skip_long_by_lower_third:
logger.info(f"触发做多信号!价格 {current_price:.2f} >= 突破价(上1/4外) {long_breakout:.2f}")
return ('long', long_breakout)
elif current_price <= short_breakout and not skip_short_by_upper_third:
logger.info(f"触发做空信号!价格 {current_price:.2f} <= 突破价(下1/4外) {short_breakout:.2f}")
return ('short', short_breakout)
# 持仓时检查反手信号
elif self.start == 1: # 持多仓
# 反手条件1: 价格跌到上一根K线的上三分之一处做空触发价上一根阴线+当前阳线做多时跳过
if current_price <= short_trigger and not skip_short_by_upper_third:
logger.info(f"持多反手做空!价格 {current_price:.2f} <= 触发价(上1/4) {short_trigger:.2f}")
return ('reverse_short', short_trigger)
# 反手条件2: 上一根K线上阴线涨幅>0.01%,当前跌到上一根实体下边
upper_shadow_pct = self.calculate_upper_shadow(prev_kline)
if upper_shadow_pct > 0.01 and current_price <= prev_entity_lower:
logger.info(f"持多反手做空!上阴线涨幅 {upper_shadow_pct:.4f}% > 0.01%"
f"价格 {current_price:.2f} <= 实体下边 {prev_entity_lower:.2f}")
return ('reverse_short', prev_entity_lower)
elif self.start == -1: # 持空仓
# 反手条件1: 价格涨到上一根K线的下三分之一处做多触发价上一根阳线+当前阴线做空时跳过
if current_price >= long_trigger and not skip_long_by_lower_third:
logger.info(f"持空反手做多!价格 {current_price:.2f} >= 触发价(下1/4) {long_trigger:.2f}")
return ('reverse_long', long_trigger)
# 反手条件2: 上一根K线下阴线跌幅>0.01%,当前涨到上一根实体上边
lower_shadow_pct = self.calculate_lower_shadow(prev_kline)
if lower_shadow_pct > 0.01 and current_price >= prev_entity_upper:
logger.info(f"持空反手做多!下阴线跌幅 {lower_shadow_pct:.4f}% > 0.01%"
f"价格 {current_price:.2f} >= 实体上边 {prev_entity_upper:.2f}")
return ('reverse_long', prev_entity_upper)
return None
def can_open(self, current_kline_id):
"""开仓前过滤:同一根 K 线只开一次 + 开仓冷却时间。仅用于 long/short 新开仓。"""
now = time.time()
if self.last_open_kline_id is not None and self.last_open_kline_id == current_kline_id:
logger.info(f"开仓频率控制:本 K 线({current_kline_id})已开过仓,跳过")
return False
if self.last_open_time is not None and now - self.last_open_time < self.open_cooldown_seconds:
remain = self.open_cooldown_seconds - (now - self.last_open_time)
logger.info(f"开仓冷却中,剩余 {remain:.0f}")
return False
return True
def can_reverse(self, current_price, trigger_price):
"""反手前过滤:冷却时间 + 最小价差"""
now = time.time()
if self.last_reverse_time and now - self.last_reverse_time < self.reverse_cooldown_seconds:
remain = self.reverse_cooldown_seconds - (now - self.last_reverse_time)
logger.info(f"反手冷却中,剩余 {remain:.0f}")
return False
if trigger_price and trigger_price > 0:
move_pct = abs(current_price - trigger_price) / trigger_price * 100
if move_pct < self.reverse_min_move_pct:
logger.info(f"反手价差不足: {move_pct:.4f}% < {self.reverse_min_move_pct}%")
return False
return True
def verify_no_position(self, max_retries=5, retry_interval=3):
"""
验证当前无持仓
返回: True 表示无持仓可以开仓False 表示有持仓不能开仓
"""
for i in range(max_retries):
if self.get_position_status():
if self.start == 0:
logger.info(f"确认无持仓,可以开仓")
return True
else:
logger.warning(
f"仍有持仓 (方向: {self.start}),等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
else:
logger.warning(f"查询持仓状态失败,等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
logger.error(f"经过 {max_retries} 次重试仍有持仓或查询失败,放弃开仓")
return False
def verify_position_direction(self, expected_direction):
"""
验证当前持仓方向是否与预期一致
expected_direction: 1 多仓, -1 空仓
返回: True 表示持仓方向正确False 表示不正确
"""
if self.get_position_status():
if self.start == expected_direction:
logger.info(f"持仓方向验证成功: {self.start}")
return True
else:
logger.warning(f"持仓方向不符: 期望 {expected_direction}, 实际 {self.start}")
return False
else:
logger.error("查询持仓状态失败")
return False
def execute_trade(self, signal, size=None):
"""执行交易。size 不传或为 None 时使用 default_order_size。"""
signal_type, trigger_price = signal
size = self.default_order_size if size is None else size
if signal_type == 'long':
# 开多前先确认无持仓
logger.info(f"准备开多,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开多前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(f"确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(1):
self.max_unrealized_pnl_seen = None # 新仓位重置移动止损记录
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
logger.success("开多成功")
return True
else:
logger.error("开多后持仓验证失败")
return False
elif signal_type == 'short':
# 开空前先确认无持仓
logger.info(f"准备开空,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开空前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(f"确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(-1):
self.max_unrealized_pnl_seen = None # 新仓位重置移动止损记录
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
logger.success("开空成功")
return True
else:
logger.error("开空后持仓验证失败")
return False
elif signal_type == 'reverse_long':
# 平空 + 开多(反手做多):先平仓,确认无仓后再开多,避免双向持仓
logger.info(f"执行反手做多,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1) # 给交易所处理平仓的时间
# 轮询确认已无持仓再开多(最多等约 10 秒)
for _ in range(10):
if self.get_position_status() and self.start == 0:
break
time.sleep(1)
if self.start != 0:
logger.warning("反手做多:平仓后仍有持仓,放弃本次开多")
return False
logger.info("已确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3)
if self.verify_position_direction(1):
self.max_unrealized_pnl_seen = None
logger.success("反手做多成功")
self.last_reverse_time = time.time()
time.sleep(20)
return True
else:
logger.error("反手做多后持仓验证失败")
return False
elif signal_type == 'reverse_short':
# 平多 + 开空(反手做空):先平仓,确认无仓后再开空
logger.info(f"执行反手做空,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.start == 0:
break
time.sleep(1)
if self.start != 0:
logger.warning("反手做空:平仓后仍有持仓,放弃本次开空")
return False
logger.info("已确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3)
if self.verify_position_direction(-1):
self.max_unrealized_pnl_seen = None
logger.success("反手做空成功")
self.last_reverse_time = time.time()
time.sleep(20)
return True
else:
logger.error("反手做空后持仓验证失败")
return False
return False
def action(self):
"""主循环"""
logger.info("开始运行四分之一策略交易...")
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
page_start = True
while True:
if page_start:
# 打开浏览器
for i in range(5):
if self.openBrowser():
logger.info("浏览器打开成功")
break
else:
self.ding("打开浏览器失败!", error=True)
return
# 进入交易页面
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(vals=25, clear=True)
page_start = False
try:
# 1. 获取K线数据当前K线和上一根K线
prev_kline, current_kline = self.get_klines()
if not prev_kline or not current_kline:
logger.warning("获取K线失败等待重试...")
time.sleep(5)
continue
# 记录进入新的K线
current_kline_time = current_kline['id']
if self.last_kline_time != current_kline_time:
self.last_kline_time = current_kline_time
logger.info(f"进入新K线: {current_kline_time}")
# 2. 获取当前价格
current_price = self.get_current_price()
if not current_price:
logger.warning("获取价格失败,等待重试...")
time.sleep(2)
continue
# 3. 每次循环都通过SDK获取真实持仓状态避免状态不同步导致双向持仓
if not self.get_position_status():
logger.warning("获取持仓状态失败,等待重试...")
time.sleep(2)
continue
logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)")
# 3.5 止损/止盈/移动止损 + 当前K线从极值回落平仓
if self.start != 0:
# 当前K线从最高/最低点回落平仓换线重置跟踪有持仓时更新本K线内最高/最低价并检查
if self._candle_id_for_high_low != current_kline_time:
self._candle_high_seen = None
self._candle_low_seen = None
self._candle_id_for_high_low = current_kline_time
use_fixed = self.drop_from_high_mode == 'fixed' and self.drop_from_high_to_close and self.drop_from_high_to_close > 0
use_pct = self.drop_from_high_mode == 'pct_retrace'
if use_fixed or use_pct:
if self.start == 1: # 多头跟踪当前K线最高价从最高点回落超过阈值则平仓
self._candle_high_seen = max(self._candle_high_seen or 0, current_price)
do_close = False
if use_fixed and self._candle_high_seen and current_price <= self._candle_high_seen - self.drop_from_high_to_close:
do_close = True
reason = f"固定回落 {self.drop_from_high_to_close}"
elif use_pct and self._candle_high_seen and current_kline.get('open'):
candle_open = float(current_kline['open'])
rise_pct = (self._candle_high_seen - candle_open) / candle_open * 100 if candle_open > 0 else 0
if rise_pct >= self.min_rise_pct_to_activate:
drop_trigger_pct = max(self.min_drop_pct_from_high, rise_pct * self.retrace_ratio)
drop_pct = (self._candle_high_seen - current_price) / self._candle_high_seen * 100 if self._candle_high_seen else 0
if drop_pct >= drop_trigger_pct:
do_close = True
reason = f"涨幅 {rise_pct:.3f}% → 允许回撤 {drop_trigger_pct:.3f}%,实际回撤 {drop_pct:.3f}%"
else:
pass # 涨幅不足,不启用动态回撤
if do_close:
logger.info(f"当前K线从最高点回落平仓最高 {self._candle_high_seen:.2f},当前 {current_price:.2f}{reason}")
self.平仓()
self.max_unrealized_pnl_seen = None
self._candle_high_seen = None
time.sleep(3)
continue
elif self.start == -1: # 空头跟踪当前K线最低价从最低点反弹超过阈值则平仓
self._candle_low_seen = min(self._candle_low_seen or float('inf'), current_price)
do_close = False
if use_fixed and self._candle_low_seen and current_price >= self._candle_low_seen + self.drop_from_high_to_close:
do_close = True
reason = f"固定反弹 {self.drop_from_high_to_close}"
elif use_pct and self._candle_low_seen and current_kline.get('open'):
candle_open = float(current_kline['open'])
rise_pct = (candle_open - self._candle_low_seen) / candle_open * 100 if candle_open > 0 else 0 # 对空头是“跌幅”
if rise_pct >= self.min_rise_pct_to_activate:
drop_trigger_pct = max(self.min_drop_pct_from_high, rise_pct * self.retrace_ratio)
bounce_pct = (current_price - self._candle_low_seen) / self._candle_low_seen * 100 if self._candle_low_seen else 0
if bounce_pct >= drop_trigger_pct:
do_close = True
reason = f"跌幅 {rise_pct:.3f}% → 允许反弹 {drop_trigger_pct:.3f}%,实际反弹 {bounce_pct:.3f}%"
if do_close:
logger.info(f"当前K线从最低点反弹平仓最低 {self._candle_low_seen:.2f},当前 {current_price:.2f}{reason}")
self.平仓()
self.max_unrealized_pnl_seen = None
self._candle_low_seen = None
time.sleep(3)
continue
pnl_usd = self.get_unrealized_pnl_usd()
if pnl_usd is not None:
# 固定止损:亏损达到 3 美元平仓
if pnl_usd <= self.stop_loss_usd:
logger.info(f"仓位亏损 {pnl_usd:.2f} 美元 <= 止损 {self.stop_loss_usd} 美元,执行止损平仓")
self.平仓()
self.max_unrealized_pnl_seen = None
time.sleep(3)
continue
# 更新持仓期间最大盈利(用于移动止损)
if self.max_unrealized_pnl_seen is None:
self.max_unrealized_pnl_seen = pnl_usd
else:
self.max_unrealized_pnl_seen = max(self.max_unrealized_pnl_seen, pnl_usd)
# 移动止损:盈利曾达到 activation 后,从最高盈利回撤 trailing_distance 则平仓
if self.max_unrealized_pnl_seen >= self.trailing_activation_usd:
if pnl_usd < self.max_unrealized_pnl_seen - self.trailing_distance_usd:
logger.info(f"移动止损:当前盈利 {pnl_usd:.2f} 从最高 {self.max_unrealized_pnl_seen:.2f} 回撤 >= {self.trailing_distance_usd} 美元,平仓")
self.平仓()
self.max_unrealized_pnl_seen = None
time.sleep(3)
continue
# 止盈:盈利达到 take_profit_usd 平仓
if pnl_usd >= self.take_profit_usd:
logger.info(f"仓位盈利 {pnl_usd:.2f} 美元 >= {self.take_profit_usd} 美元,执行止盈平仓")
self.平仓()
self.max_unrealized_pnl_seen = None
time.sleep(3)
continue
# 4. 检查信号
signal = self.check_signal(current_price, prev_kline, current_kline)
# 5. 反手过滤:冷却时间 + 最小价差
if signal and signal[0].startswith('reverse_'):
if not self.can_reverse(current_price, signal[1]):
signal = None
# 5.5 开仓频率过滤:同一根 K 线只开一次 + 开仓冷却
if signal and signal[0] in ('long', 'short'):
if not self.can_open(current_kline_time):
signal = None
else:
self._current_kline_id_for_open = current_kline_time # 供 execute_trade 成功后记录
# 6. 有信号则执行交易
if signal:
trade_success = self.execute_trade(signal)
if trade_success:
logger.success(f"交易执行完成: {signal[0]}, 当前持仓状态: {self.start}")
page_start = True
else:
logger.warning(f"交易执行失败或被阻止: {signal[0]}")
# 短暂等待后继续循环同一根K线遇到信号就操作
time.sleep(0.1)
if page_start:
self.page.close()
time.sleep(5)
except KeyboardInterrupt:
logger.info("用户中断,程序退出")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(5)
if __name__ == '__main__':
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()