Files
lm_code/test2.py

950 lines
35 KiB
Python
Raw Normal View History

2025-12-19 19:16:41 +08:00
import os
2025-12-23 11:12:32 +08:00
import time
2025-12-19 19:16:41 +08:00
import uuid
2025-12-23 11:12:32 +08:00
import datetime
from dataclasses import dataclass
2025-12-19 19:16:41 +08:00
2025-12-23 11:12:32 +08:00
from tqdm import tqdm
from loguru import logger
2025-12-19 19:16:41 +08:00
2025-12-23 11:12:32 +08:00
from bitmart.api_contract import APIContract
from bitmart.lib.cloud_exceptions import APIException
2025-12-19 19:16:41 +08:00
2025-12-23 11:12:32 +08:00
from 交易.tools import send_dingtalk_message
2025-12-19 19:16:41 +08:00
2025-12-23 11:12:32 +08:00
@dataclass
class StrategyConfig:
    """Tunable parameters for the 1-minute ETH perpetual mean-reversion bot.

    Every field has a default, so ``StrategyConfig()`` yields the stock
    profile (1m bars, ETH perpetual, tuned to keep stop-outs to roughly five
    per day or fewer).  Ratio-style values are fractions of price, e.g.
    ``0.0012`` means 0.12%.
    """

    # ----- Contract -----
    contract_symbol: str = "ETHUSDT"
    open_type: str = "cross"
    leverage: str = "30"  # kept as a string; converted with int() when sizing

    # ----- Candles and indicators -----
    step_min: int = 1          # candle width in minutes
    lookback_min: int = 240    # minutes of history requested per fetch
    ema_len: int = 36
    atr_len: int = 14

    # ----- ADX trend filter -----
    # Goal: while a one-sided trend is running (high ADX), suppress or forbid
    # counter-trend mean-reversion entries so the bot does not keep re-opening
    # against the move and getting stopped out.
    enable_adx_filter: bool = True
    adx_len: int = 14
    adx_threshold: float = 25.0  # 20-30 is the usual range; tune by backtest
    # Filter modes:
    #   "block_countertrend": only forbid entries against the DI direction
    #   "block_all":          forbid every new entry while ADX is high
    adx_mode: str = "block_countertrend"
    # Extra cooldown seconds applied after a recent stop-loss, to damp the
    # "stopped out -> immediately flip" chain.
    cooldown_sec_after_sl_extra: int = 40

    # ----- Volatility baseline (quantile of ATR/price) -----
    vol_baseline_window: int = 60
    vol_baseline_quantile: float = 0.65
    vol_scale_min: float = 0.80
    vol_scale_max: float = 1.60
    # Seconds between baseline recomputations.
    base_ratio_refresh_sec: int = 180

    # ----- Dynamic floors: floor = clamp(min, base_k * base_ratio, max) -----
    # Floors follow the typical volatility so small noise is filtered out and
    # tp/sl adapt to the environment.
    entry_dev_floor_min: float = 0.0012   # 0.12%
    entry_dev_floor_max: float = 0.0030   # 0.30%
    entry_dev_floor_base_k: float = 1.10  # entry_floor = 1.10 * base_ratio
    tp_floor_min: float = 0.0006          # 0.06%
    tp_floor_max: float = 0.0020          # 0.20%
    tp_floor_base_k: float = 0.55         # tp_floor = 0.55 * base_ratio
    sl_floor_min: float = 0.0018          # 0.18%
    sl_floor_max: float = 0.0060          # 0.60%
    sl_floor_base_k: float = 1.35         # sl_floor = 1.35 * base_ratio

    # ----- Dynamic threshold multipliers -----
    entry_k: float = 1.45
    tp_k: float = 0.65
    sl_k: float = 1.05

    # ----- Holding time / cooldown -----
    max_hold_sec: int = 75
    cooldown_sec_after_exit: int = 20

    # ----- Order sizing -----
    risk_percent: float = 0.004
    min_size: int = 1
    max_size: int = 5000

    # ----- Intraday risk limits -----
    daily_loss_limit: float = 0.02
    daily_profit_cap: float = 0.01

    # ----- Dangerous-market filter -----
    atr_ratio_kill: float = 0.0038
    big_body_kill: float = 0.010

    # ----- Polling cadence (seconds) -----
    klines_refresh_sec: int = 10
    tick_refresh_sec: int = 1
    status_notify_sec: int = 60

    # ----- Higher entry threshold in the stopped-out direction -----
    reentry_penalty_mult: float = 1.55
    reentry_penalty_max_sec: int = 180
    reset_band_k: float = 0.45
    reset_band_floor: float = 0.0006

    # ----- Post-stop-loss SL widening, tied to vol_scale at stop time -----
    post_sl_sl_max_sec: int = 90
    post_sl_mult_min: float = 1.02
    post_sl_mult_max: float = 1.16
    post_sl_vol_alpha: float = 0.20
class BitmartFuturesMeanReversionBot:
def __init__(self, cfg: StrategyConfig):
    """Read credentials from the environment, build the API client and reset all state.

    Raises:
        RuntimeError: if BITMART_API_KEY or BITMART_SECRET_KEY is missing/blank.
    """
    self.cfg = cfg

    # Credentials: key and secret are mandatory, the memo has a default.
    env = os.getenv
    self.api_key = env("BITMART_API_KEY", "").strip()
    self.secret_key = env("BITMART_SECRET_KEY", "").strip()
    self.memo = env("BITMART_MEMO", "合约交易").strip()
    if not (self.api_key and self.secret_key):
        raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)")

    self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))

    # Position state: -1 short, 0 flat, 1 long.
    self.pos = 0
    self.entry_price = self.entry_ts = None
    self.last_exit_ts = 0

    # Intraday equity baseline used by the daily kill switch.
    self.day_start_equity = None
    self.trading_enabled = True
    self.day_tag = datetime.date.today()

    # Candle cache and status-notification throttle.
    self._klines_cache = None
    self._klines_cache_ts = 0
    self._last_status_notify_ts = 0

    # Cached volatility baseline; starts at 0.15% until the first refresh.
    self._base_ratio_cached = 0.0015
    self._base_ratio_ts = 0.0

    # Direction (1 long, -1 short, 0 none) and time of the last stop-loss,
    # used to raise the entry threshold in that same direction.
    self.last_sl_dir = 0
    self.last_sl_ts = 0.0

    # State for widening the stop in the stopped-out direction;
    # post_sl_vol_scale remembers vol_scale at the moment of the stop.
    self.post_sl_dir = 0
    self.post_sl_ts = 0.0
    self.post_sl_vol_scale = 1.0

    self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90)
    logger.info(f"初始化完成,基准波动率默认值: {self._base_ratio_cached * 100:.4f}%")
# ----------------- 通用工具 -----------------
def ding(self, msg, error=False):
    """Push ``msg`` to DingTalk; error messages are sent three times.

    Args:
        msg: text to send (a status prefix is prepended).
        error: when true, use the error prefix and repeat the send 3 times.
    """
    if error:
        tag, repeats = "❌bitmart", 3
    else:
        tag, repeats = "🔔bitmart", 1
    for _ in range(repeats):
        send_dingtalk_message(f"{tag}{msg}")
def set_leverage(self) -> bool:
    """Submit the configured leverage / margin mode to the exchange.

    Returns:
        True when the response code is 1000; False on any other code or on
        an exception (both are logged and reported through ``ding``).
    """
    try:
        result = self.contractAPI.post_submit_leverage(
            contract_symbol=self.cfg.contract_symbol,
            leverage=self.cfg.leverage,
            open_type=self.cfg.open_type
        )
        resp = result[0]
        if resp.get("code") != 1000:
            logger.error(f"设置杠杆失败: {resp}")
            self.ding(f"设置杠杆失败: {resp}", error=True)
            return False
        logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x")
        return True
    except Exception as e:
        logger.error(f"设置杠杆异常: {e}")
        self.ding(f"设置杠杆异常: {e}", error=True)
        return False
# ----------------- 行情/指标 -----------------
def get_klines_cached(self):
    """Return candles, refetching only when the cache is older than ``klines_refresh_sec``.

    A failed or empty fetch leaves the previous cache (and its timestamp)
    untouched, so the stale candles are returned and the next call retries.
    """
    now = time.time()
    cached = self._klines_cache
    if cached is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec:
        return cached

    fresh = self.get_klines()
    if fresh:
        self._klines_cache, self._klines_cache_ts = fresh, now
    return self._klines_cache
def get_klines(self):
    """Fetch the last ``lookback_min`` minutes of candles, oldest first.

    Returns:
        A list of dicts with keys ``id`` (int timestamp), ``open``, ``high``,
        ``low``, ``close`` (floats), sorted by ``id``; or None when the
        response code is not 1000 or any exception occurs.
    """
    try:
        now_s = int(time.time())
        since_s = now_s - 60 * self.cfg.lookback_min
        resp = self.contractAPI.get_kline(
            contract_symbol=self.cfg.contract_symbol,
            step=self.cfg.step_min,
            start_time=since_s,
            end_time=now_s
        )[0]
        if resp.get("code") != 1000:
            logger.error(f"获取K线失败: {resp}")
            return None

        bars = [
            {
                "id": int(row["timestamp"]),
                "open": float(row["open_price"]),
                "high": float(row["high_price"]),
                "low": float(row["low_price"]),
                "close": float(row["close_price"]),
            }
            for row in resp.get("data", [])
        ]
        bars.sort(key=lambda bar: bar["id"])
        return bars
    except Exception as e:
        logger.error(f"获取K线异常: {e}")
        self.ding(f"获取K线异常: {e}", error=True)
        return None
def get_last_price(self, fallback_close: float) -> float:
    """Return the freshest available price, or ``fallback_close`` if none is found.

    Two optional client methods are tried in order, each only if the client
    exposes it: ``get_contract_details`` then ``get_ticker``.  Each source is
    isolated, so an exception in the first no longer prevents the second from
    being tried.  The ``data`` payload may be a flat dict of price fields or
    carry a ``symbols`` list of per-contract records (only records whose
    ``symbol`` matches the configured contract are considered).  Values that
    cannot be parsed or are not positive are skipped.

    Args:
        fallback_close: price returned when no source yields a usable value.

    Returns:
        The first usable price as a float.
    """
    symbol = self.cfg.contract_symbol

    def records(payload):
        # Flatten the accepted payload shapes into a list of candidates.
        if isinstance(payload, dict):
            found = [payload]
            nested = payload.get("symbols")
            if isinstance(nested, list):
                found.extend(nested)
            return found
        if isinstance(payload, list):
            return list(payload)
        return []

    def first_price(record, keys):
        # First key holding a parseable, strictly positive number wins.
        for key in keys:
            raw = record.get(key)
            if raw is None:
                continue
            try:
                value = float(raw)
            except (TypeError, ValueError):
                continue
            if value > 0:
                return value
        return None

    sources = (
        ("get_contract_details", ("last_price", "mark_price", "index_price")),
        ("get_ticker", ("last_price", "price", "last", "close")),
    )
    for method_name, keys in sources:
        fetch = getattr(self.contractAPI, method_name, None)
        if fetch is None:
            continue
        try:
            resp = fetch(contract_symbol=symbol)[0]
        except Exception as e:
            # Best effort: a failing source must not block the next one.
            logger.debug(f"获取最新价失败({method_name}): {e}")
            continue
        data = resp.get("data") if isinstance(resp, dict) else None
        for record in records(data):
            if not isinstance(record, dict):
                continue
            if record.get("symbol", symbol) != symbol:
                continue
            price = first_price(record, keys)
            if price is not None:
                return price
    return float(fallback_close)
@staticmethod
def ema(values, n: int) -> float:
k = 2 / (n + 1)
e = values[0]
for v in values[1:]:
e = v * k + e * (1 - k)
return e
@staticmethod
def atr(klines, n: int) -> float:
if len(klines) < n + 1:
return 0.0
trs = []
for i in range(-n, 0):
cur = klines[i]
prev = klines[i - 1]
tr = max(
cur["high"] - cur["low"],
abs(cur["high"] - prev["close"]),
abs(cur["low"] - prev["close"]),
)
trs.append(tr)
return sum(trs) / len(trs)
@staticmethod
def _wilder_smooth(prev: float, val: float, n: int) -> float:
# Wilder smoothing: prev - prev/n + val
return prev - (prev / n) + val
def adx(self, klines, n: int):
    """Compute Wilder-style ADX with its directional indicators.

    Args:
        klines: candles (dicts with ``high``, ``low``, ``close``), oldest first.
        n: smoothing length.

    Returns:
        ``(adx, plus_di, minus_di)`` as floats; ``(0.0, 0.0, 0.0)`` when fewer
        than ``2 * n + 2`` candles are available.
    """
    if len(klines) < (2 * n + 2):
        return 0.0, 0.0, 0.0

    def ratio(num, den):
        return (num / den) if den != 0 else 0.0

    def directional(pdm_sum, mdm_sum, tr_sum):
        # +DI, -DI and DX from the current smoothed sums.
        p_di = 100.0 * ratio(pdm_sum, tr_sum)
        m_di = 100.0 * ratio(mdm_sum, tr_sum)
        dx_val = 100.0 * ratio(abs(p_di - m_di), (p_di + m_di))
        return p_di, m_di, dx_val

    # Per-bar true range and directional movement (needs the previous bar).
    tr_list, pdm_list, mdm_list = [], [], []
    for before, bar in zip(klines, klines[1:]):
        high, low = bar["high"], bar["low"]
        prev_close = before["close"]
        tr_list.append(max(high - low, abs(high - prev_close), abs(low - prev_close)))
        up_move = high - before["high"]
        down_move = before["low"] - low
        pdm_list.append(up_move if (up_move > down_move and up_move > 0) else 0.0)
        mdm_list.append(down_move if (down_move > up_move and down_move > 0) else 0.0)

    # Seed the running sums with the first n bars.
    tr_sum = sum(tr_list[:n])
    pdm_sum = sum(pdm_list[:n])
    mdm_sum = sum(mdm_list[:n])
    plus_di, minus_di, dx = directional(pdm_sum, mdm_sum, tr_sum)
    dx_list = [dx]

    # Roll the sums forward, collecting one DX per remaining bar.
    for tr, pdm, mdm in zip(tr_list[n:], pdm_list[n:], mdm_list[n:]):
        tr_sum = self._wilder_smooth(tr_sum, tr, n)
        pdm_sum = self._wilder_smooth(pdm_sum, pdm, n)
        mdm_sum = self._wilder_smooth(mdm_sum, mdm, n)
        plus_di, minus_di, dx = directional(pdm_sum, mdm_sum, tr_sum)
        dx_list.append(dx)

    if len(dx_list) < (n + 1):
        return 0.0, plus_di, minus_di

    # ADX: mean of the first n DX values, then Wilder smoothing of the rest.
    adx_val = sum(dx_list[:n]) / n
    for later_dx in dx_list[n:]:
        adx_val = (adx_val * (n - 1) + later_dx) / n
    return float(adx_val), float(plus_di), float(minus_di)
def is_danger_market(self, klines, price: float) -> bool:
    """Tell whether the market is too violent for new entries.

    True when the latest candle's body (|close - open| / open) reaches
    ``big_body_kill`` or when ATR / price reaches ``atr_ratio_kill``.
    """
    candle = klines[-1]
    open_px = candle["open"]
    body_ratio = abs(candle["close"] - open_px) / open_px if open_px else 0.0
    if body_ratio >= self.cfg.big_body_kill:
        return True

    atr_value = self.atr(klines, self.cfg.atr_len)
    atr_ratio = (atr_value / price) if price > 0 else 0.0
    return atr_ratio >= self.cfg.atr_ratio_kill
def atr_ratio_baseline(self, klines) -> float:
    """Estimate the typical ATR/price ratio as a quantile of recent samples.

    Every third bar of the last ``vol_baseline_window`` bars is used as a
    sample end point.  For each one, ATR is computed over the ``atr_len``
    bars ending there and divided by that bar's close; samples outside
    (0.01%, 1%) are discarded.  The result is the ``vol_baseline_quantile``
    quantile of the surviving samples.

    Each sample slice holds ``atr_len + 1`` bars because the ATR needs one
    extra bar for the previous close.  (Slicing only ``atr_len`` bars makes
    the ATR helper return 0 for every sample, so the quantile path can never
    be taken.)

    Args:
        klines: candles (dicts with ``high``, ``low``, ``close``), oldest first.

    Returns:
        The baseline ratio.  With fewer than 5 valid samples, falls back to
        the current ATR/price over the last 60 bars.  Returns 0.0 when there
        is too little data or the fallback is not positive.
    """
    atr_len = self.cfg.atr_len
    total = len(klines)
    window = min(self.cfg.vol_baseline_window, total - atr_len - 1)
    if window <= 10:
        logger.warning(f"数据不足计算基准: {len(klines)}根K线")
        return 0.0

    ratios = []
    step = 3
    for offset in range(-window, 0, step):
        end_idx = total + offset
        # atr_len true ranges need atr_len + 1 bars (one extra previous close).
        start_idx = end_idx - (atr_len + 1)
        if start_idx < 0:
            continue
        a = self.atr(klines[start_idx:end_idx], atr_len)
        price = klines[end_idx - 1]["close"]
        if a > 0 and price > 0:
            ratio = a / price
            # Drop implausible samples (below 0.01% or above 1% of price).
            if 0.0001 < ratio < 0.01:
                ratios.append(ratio)

    if len(ratios) < 5:
        # Too few samples for a quantile: use the current ATR ratio instead.
        a = self.atr(klines[-60:], atr_len)
        price = klines[-1]["close"]
        if a > 0 and price > 0:
            baseline = a / price
            logger.debug(f"使用全量数据计算基准: {baseline * 100:.4f}%")
            return baseline
        return 0.0

    ratios.sort()
    idx = min(len(ratios) - 1, max(0, int(self.cfg.vol_baseline_quantile * (len(ratios) - 1))))
    baseline = ratios[idx]
    logger.debug(
        f"基准计算: 样本数={len(ratios)}, 基准={baseline * 100:.4f}%, "
        f"范围=[{ratios[0] * 100:.4f}%, {ratios[-1] * 100:.4f}%]"
    )
    return baseline
def get_base_ratio_cached(self, klines) -> float:
    """Return the cached volatility baseline, recomputing it when stale.

    The baseline is recomputed once ``base_ratio_refresh_sec`` seconds have
    passed since the last refresh.  A computed value above 0.0001 is stored
    as is; otherwise a price-tiered default (keyed on the last close, or
    3000 when there are no candles) is stored instead.
    """
    now = time.time()
    refresh_sec = self.cfg.base_ratio_refresh_sec
    stale = self._base_ratio_cached is None or (now - self._base_ratio_ts) >= refresh_sec
    if not stale:
        return self._base_ratio_cached

    baseline = self.atr_ratio_baseline(klines)
    if baseline > 0.0001:
        self._base_ratio_cached = baseline
        self._base_ratio_ts = now
        logger.info(f"基准波动率更新: {baseline * 100:.4f}%")
        return self._base_ratio_cached

    # No usable baseline: pick a default by price tier (higher price, lower ratio).
    current_price = klines[-1]["close"] if klines else 3000
    tiers = ((4000, 0.0010), (3500, 0.0012), (3000, 0.0015), (2500, 0.0018))
    default_baseline = 0.0020
    for above, tier_ratio in tiers:
        if current_price > above:
            default_baseline = tier_ratio
            break
    self._base_ratio_cached = default_baseline
    self._base_ratio_ts = now
    logger.warning(
        f"使用价格动态默认基准: {default_baseline * 100:.4f}% (价格=${current_price:.0f})"
    )
    return self._base_ratio_cached
@staticmethod
def _clamp(x: float, lo: float, hi: float) -> float:
return max(lo, min(hi, x))
def dynamic_thresholds(self, atr_ratio: float, base_ratio: float):
    """Derive entry / take-profit / stop-loss thresholds from current volatility.

    Steps:
      * ``vol_scale`` = ``atr_ratio / base_ratio`` clamped to
        [``vol_scale_min``, ``vol_scale_max``].
      * each floor = clamp(``base_k * base_ratio``, floor_min, floor_max).
      * each threshold = max(floor, ``k * vol_scale * atr_ratio``).

    Args:
        atr_ratio: current ATR / price; non-positive values are replaced by 0.001.
        base_ratio: baseline ATR / price; values below 0.0005 are replaced by
            ``max(0.001, 1.2 * atr_ratio)``.

    Returns:
        ``(entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor)``.
    """
    cfg = self.cfg

    if atr_ratio <= 0:
        logger.warning(f"ATR比率异常: {atr_ratio}")
        atr_ratio = 0.001
    if base_ratio < 0.0005:
        base_ratio = max(0.001, atr_ratio * 1.2)
        logger.debug(f"基准太小使用调整后的atr_ratio: {base_ratio * 100:.4f}%")

    if base_ratio > 0:
        raw_scale = atr_ratio / base_ratio
        vol_scale = self._clamp(raw_scale, cfg.vol_scale_min, cfg.vol_scale_max)
        # Raw and clamped values are separated so the log line is readable.
        logger.debug(
            f"vol_scale: {raw_scale:.2f} -> {vol_scale:.2f} "
            f"(atr={atr_ratio * 100:.3f}%, base={base_ratio * 100:.3f}%)"
        )
    else:
        # Defensive: after the adjustment above this needs a NaN baseline.
        vol_scale = 1.0
        logger.warning("基准无效使用默认vol_scale=1.0")

    # Floors follow the baseline, bounded by their configured min/max.
    entry_floor = self._clamp(
        cfg.entry_dev_floor_base_k * base_ratio, cfg.entry_dev_floor_min, cfg.entry_dev_floor_max
    )
    tp_floor = self._clamp(cfg.tp_floor_base_k * base_ratio, cfg.tp_floor_min, cfg.tp_floor_max)
    sl_floor = self._clamp(cfg.sl_floor_base_k * base_ratio, cfg.sl_floor_min, cfg.sl_floor_max)

    # Thresholds follow current volatility but never drop below their floor.
    scaled_atr = vol_scale * atr_ratio
    entry_dev_atr_part = cfg.entry_k * scaled_atr
    entry_dev = max(entry_floor, entry_dev_atr_part)
    tp = max(tp_floor, cfg.tp_k * scaled_atr)
    sl = max(sl_floor, cfg.sl_k * scaled_atr)

    logger.info(
        f"动态阈值: atr={atr_ratio * 100:.4f}%, base={base_ratio * 100:.4f}%, "
        f"vol_scale={vol_scale:.2f}, floor={entry_floor * 100:.4f}%, "
        f"atr_part={entry_dev_atr_part * 100:.4f}%, 最终entry_dev={entry_dev * 100:.4f}%"
    )
    return entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor
# ----------------- 账户/仓位 -----------------
def get_assets_available(self) -> float:
    """Return the available balance reported by the exchange, or 0.0.

    ``data`` may be a single dict (its ``available_balance`` is used) or a
    list of assets (the first entry whose ``currency`` is ``"USDT"`` is
    used).  A response code other than 1000, no match, or any exception
    yields 0.0.
    """
    try:
        resp = self.contractAPI.get_assets_detail()[0]
        if resp.get("code") != 1000:
            return 0.0

        data = resp.get("data")
        if isinstance(data, dict):
            return float(data.get("available_balance", 0))
        if isinstance(data, list):
            usdt = next((item for item in data if item.get("currency") == "USDT"), None)
            if usdt is not None:
                return float(usdt.get("available_balance", 0))
        return 0.0
    except Exception as e:
        logger.error(f"余额查询异常: {e}")
        return 0.0
def get_position_status(self) -> bool:
    """Sync ``self.pos`` with the exchange.

    ``self.pos`` becomes 0 when no positions are returned; otherwise it is
    taken from the first returned position: 1 if its ``position_type`` is 1,
    else -1.

    Returns:
        True when the query succeeded (code 1000); False otherwise, in which
        case ``self.pos`` is left unchanged.
    """
    try:
        resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0]
        if resp.get("code") != 1000:
            return False

        positions = resp.get("data", [])
        if positions:
            self.pos = 1 if positions[0]["position_type"] == 1 else -1
        else:
            self.pos = 0
        return True
    except Exception as e:
        logger.error(f"持仓查询异常: {e}")
        self.ding(f"持仓查询异常: {e}", error=True)
        return False
def get_equity_proxy(self) -> float:
    """Return the value used as account equity: currently the available balance."""
    equity = self.get_assets_available()
    return equity
def refresh_daily_baseline(self):
    """On a calendar-day change, clear the equity baseline and re-enable trading."""
    today = datetime.date.today()
    if today == self.day_tag:
        return
    self.day_tag = today
    self.day_start_equity = None
    self.trading_enabled = True
    self.ding(f"新的一天({today}):重置日内风控基准")
def risk_kill_switch(self):
    """Disable trading once the day's PnL breaches the loss limit or profit cap.

    PnL is measured against the first positive equity reading of the day.
    The alert is sent only on the transition from enabled to disabled:
    this method runs on every loop pass, so alerting unconditionally would
    repeat the message for as long as the breach persists (and error
    alerts are sent three times each).
    """
    self.refresh_daily_baseline()
    equity = self.get_equity_proxy()
    if equity <= 0:
        return
    if self.day_start_equity is None:
        self.day_start_equity = equity
        logger.info(f"日内权益基准设定:{equity:.2f} USDT")
        return

    pnl = (equity - self.day_start_equity) / self.day_start_equity
    if pnl <= -self.cfg.daily_loss_limit:
        alert, is_error = f"触发日止损:{pnl * 100:.2f}% -> 停机", True
    elif pnl >= self.cfg.daily_profit_cap:
        alert, is_error = f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机", False
    else:
        return

    was_enabled = self.trading_enabled
    self.trading_enabled = False
    if was_enabled:
        self.ding(alert, error=is_error)
# ----------------- 下单 -----------------
def calculate_size(self, price: float) -> int:
    """Compute the order size in contracts from the available balance.

    Margin is ``risk_percent`` of the balance, multiplied by the leverage
    and divided by the value of one contract, then clipped to
    [``min_size``, ``max_size``].  Returns 0 when the balance is below 10.
    """
    balance = self.get_assets_available()
    if balance < 10:
        return 0

    notional = balance * self.cfg.risk_percent * int(self.cfg.leverage)
    # Carried-over assumption from the original author: 1 contract ~ 0.001 ETH.
    contract_value = price * 0.001
    contracts = int(notional / contract_value)
    return min(self.cfg.max_size, max(self.cfg.min_size, contracts))
def place_market_order(self, side: int, size: int) -> bool:
    """Submit a market order with a fresh client order id.

    Args:
        side: exchange side code, passed through unchanged.
        size: number of contracts; non-positive sizes are rejected locally.

    Returns:
        True when the exchange answers with code 1000, otherwise False
        (failures and exceptions are reported through ``ding``).
    """
    if size <= 0:
        return False

    client_order_id = f"mr_{int(time.time())}_{uuid.uuid4().hex[:8]}"
    try:
        order = dict(
            contract_symbol=self.cfg.contract_symbol,
            client_order_id=client_order_id,
            side=side,
            mode=1,
            type="market",
            leverage=self.cfg.leverage,
            open_type=self.cfg.open_type,
            size=size,
        )
        resp = self.contractAPI.post_submit_order(**order)[0]
        logger.info(f"order_resp: {resp}")
        if resp.get("code") != 1000:
            self.ding(f"下单失败: {resp}", error=True)
            return False
        return True
    except APIException as e:
        logger.error(f"API下单异常: {e}")
        self.ding(f"API下单异常: {e}", error=True)
        return False
    except Exception as e:
        logger.error(f"下单未知异常: {e}")
        self.ding(f"下单未知异常: {e}", error=True)
        return False
def close_position_all(self):
    """Send a market order to flatten the current position.

    Side 3 is used for a long, side 2 for a short, with an oversized
    quantity (999999).  ``self.pos`` is set to 0 only when the order is
    accepted; nothing happens when already flat.
    """
    closing_side = {1: 3, -1: 2}.get(self.pos)
    if closing_side is None:
        return
    if self.place_market_order(closing_side, 999999):
        self.pos = 0
# ----------------- 止损后机制 -----------------
def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool:
"""检查是否需要应用重新入场惩罚(你原逻辑保留)"""
if self.last_sl_dir == 0:
return False
if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec:
self.last_sl_dir = 0
return False
reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev)
if abs(dev) <= reset_band:
self.last_sl_dir = 0
return False
return True
def _post_sl_dynamic_mult(self) -> float:
"""计算止损后SL放宽倍数"""
if self.post_sl_dir == 0:
return 1.0
if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec:
self.post_sl_dir = 0
self.post_sl_vol_scale = 1.0
return 1.0
raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0)
raw = max(1.0, raw) # 不缩小止损,只放宽
return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw))
# ----------------- 交易逻辑 -----------------
def in_cooldown(self) -> bool:
    """Tell whether new entries are still blocked after the last exit.

    The base cooldown is ``cooldown_sec_after_exit``; it is extended by
    ``cooldown_sec_after_sl_extra`` while the last stop-loss is younger
    than ``reentry_penalty_max_sec``.
    """
    cooldown = self.cfg.cooldown_sec_after_exit
    recently_stopped = (
        self.last_sl_ts
        and (time.time() - self.last_sl_ts) < self.cfg.reentry_penalty_max_sec
    )
    if recently_stopped:
        cooldown += max(0, self.cfg.cooldown_sec_after_sl_extra)
    since_exit = time.time() - self.last_exit_ts
    return since_exit < cooldown
def _adx_blocks_entry(self, adx_val: float, plus_di: float, minus_di: float, want_dir: int) -> bool:
"""
ADX 趋势过滤
- want_dir: 1=想开多, -1=想开空
"""
if not self.cfg.enable_adx_filter:
return False
if adx_val < self.cfg.adx_threshold:
return False
if self.cfg.adx_mode == "block_all":
return True
# block_countertrend只禁止逆 DI 方向
# 上升趋势:+DI > -DI => 禁止开空
# 下降趋势:-DI > +DI => 禁止开多
if plus_di > minus_di and want_dir == -1:
return True
if minus_di > plus_di and want_dir == 1:
return True
return False
def maybe_enter(self, price: float, ema_value: float, entry_dev: float,
                adx_val: float, plus_di: float, minus_di: float):
    """Open a mean-reversion position when price deviates enough from the EMA.

    Nothing happens while a position is open, during cooldown, or when the
    computed size is 0.  A long is opened when the deviation is at or below
    ``-entry_dev``, a short when at or above ``+entry_dev``; the threshold
    in the direction of the last stop-loss is multiplied by
    ``reentry_penalty_mult`` while the penalty is active.  The ADX filter
    is consulted only after a signal exists, so the log shows what was
    vetoed.
    """
    if self.pos != 0 or self.in_cooldown():
        return

    dev = (price - ema_value) / ema_value if ema_value else 0.0
    size = self.calculate_size(price)
    if size <= 0:
        return

    penalty_active = self._reentry_penalty_active(dev, entry_dev)
    long_th, short_th = -entry_dev, entry_dev
    if penalty_active:
        if self.last_sl_dir == 1:
            long_th = -entry_dev * self.cfg.reentry_penalty_mult
            logger.info(f"多头止损后惩罚生效: long_th={long_th * 100:.3f}%")
        elif self.last_sl_dir == -1:
            short_th = entry_dev * self.cfg.reentry_penalty_mult
            logger.info(f"空头止损后惩罚生效: short_th={short_th * 100:.3f}%")

    logger.info(
        f"入场检查: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% "
        f"(entry_dev={entry_dev * 100:.3f}%, long_th={long_th * 100:.3f}%, short_th={short_th * 100:.3f}%) "
        f"ADX={adx_val:.2f} +DI={plus_di:.2f} -DI={minus_di:.2f} "
        f"size={size}, penalty={penalty_active}, last_sl_dir={self.last_sl_dir}"
    )

    # Pick the signal first; order side 1 opens a long, side 4 opens a short.
    if dev <= long_th:
        want_dir, order_side = 1, 1
    elif dev >= short_th:
        want_dir, order_side = -1, 4
    else:
        return
    going_long = want_dir == 1

    if self._adx_blocks_entry(adx_val, plus_di, minus_di, want_dir=want_dir):
        if going_long:
            logger.warning(
                f"ADX过滤趋势期禁止逆势开多ADX={adx_val:.2f}, +DI={plus_di:.2f}, -DI={minus_di:.2f}"
            )
        else:
            logger.warning(
                f"ADX过滤趋势期禁止逆势开空ADX={adx_val:.2f}, +DI={plus_di:.2f}, -DI={minus_di:.2f}"
            )
        return

    if not self.place_market_order(order_side, size):
        return

    self.pos = want_dir
    self.entry_price = price
    self.entry_ts = time.time()
    if going_long:
        self.ding(f"✅开多dev={dev * 100:.3f}% size={size} entry={price:.2f}")
    else:
        self.ding(f"✅开空dev={dev * 100:.3f}% size={size} entry={price:.2f}")
def maybe_exit(self, price: float, tp: float, sl: float, vol_scale: float):
    """Close the open position on take-profit, stop-loss or holding timeout.

    Checks run in that order.  The stop-loss distance is ``sl`` multiplied
    by the post-stop-loss widening factor when the position is in the same
    direction as the last stop-out.

    Exit bookkeeping (notification, clearing the entry state, starting the
    cooldown, recording a stop-loss) happens only if the close actually
    went through, i.e. ``close_position_all`` left ``self.pos`` at 0.
    When the close order is rejected the entry state is kept so the exit is
    retried on the next tick.  Clearing it regardless would leave a live
    position that this method skips forever, because it returns early when
    ``entry_price`` is None.

    Args:
        price: current price.
        tp: take-profit distance as a fraction of the entry price.
        sl: stop-loss distance as a fraction of the entry price.
        vol_scale: current volatility scale, remembered on a stop-loss.
    """
    if self.pos == 0 or self.entry_price is None or self.entry_ts is None:
        return

    direction = self.pos
    hold = time.time() - self.entry_ts
    if direction == 1:
        pnl = (price - self.entry_price) / self.entry_price
    else:
        pnl = (self.entry_price - price) / self.entry_price

    sl_mult = 1.0
    if self.post_sl_dir == direction and self.post_sl_dir != 0:
        sl_mult = self._post_sl_dynamic_mult()
    effective_sl = sl * sl_mult

    if pnl >= tp:
        reason = "tp"
    elif pnl <= -effective_sl:
        reason = "sl"
    elif hold >= self.cfg.max_hold_sec:
        reason = "timeout"
    else:
        return

    self.close_position_all()
    if self.pos != 0:
        # Close order was not accepted (already reported by the order code);
        # keep the entry state so the exit is retried on the next tick.
        return

    if reason == "tp":
        self.ding(f"🎯止盈pnl={pnl * 100:.3f}% price={price:.2f} tp={tp * 100:.3f}%")
    elif reason == "sl":
        self.ding(
            f"🛑止损pnl={pnl * 100:.3f}% price={price:.2f} "
            f"sl={sl * 100:.3f}% effective_sl={effective_sl * 100:.3f}%(×{sl_mult:.2f})",
            error=True
        )
        # Remember direction, time and volatility of the stop-out for the
        # re-entry penalty and the post-stop-loss SL widening.
        self.last_sl_dir = direction
        self.last_sl_ts = time.time()
        self.post_sl_dir = direction
        self.post_sl_ts = time.time()
        self.post_sl_vol_scale = float(vol_scale)
    else:
        self.ding(f"⏱超时hold={int(hold)}s pnl={pnl * 100:.3f}% price={price:.2f}")

    self.entry_price, self.entry_ts = None, None
    self.last_exit_ts = time.time()
def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float,
                            atr_ratio: float, base_ratio: float, vol_scale: float,
                            entry_dev: float, tp: float, sl: float,
                            entry_floor: float, tp_floor: float, sl_floor: float,
                            adx_val: float, plus_di: float, minus_di: float):
    """Send a status summary at most once every ``status_notify_sec`` seconds.

    Changes to the message compared with the garbled original:
      * the position label is filled in (long / short / flat), where all
        three alternatives used to be empty strings;
      * the EMA length and EMA value are separated by ``=`` instead of
        being fused into one unreadable number;
      * the reward/risk figure prints ``N/A`` when ``sl`` is 0 instead of
        raising ``ZeroDivisionError``.
    """
    now = time.time()
    if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec:
        return
    self._last_status_notify_ts = now

    # Position labels: 1 -> long, -1 -> short, anything else -> flat.
    direction_str = "多" if self.pos == 1 else ("空" if self.pos == -1 else "无")
    penalty_active = self._reentry_penalty_active(dev, entry_dev)

    # The multiplier is computed as before although the message does not
    # print it: the call also expires an outdated post-stop-loss record.
    sl_mult = 1.0
    if self.pos != 0 and self.post_sl_dir == self.pos:
        sl_mult = self._post_sl_dynamic_mult()

    base_age = int(now - self._base_ratio_ts) if self._base_ratio_ts else -1
    rr_text = f"{tp / sl:.2f}" if sl else "N/A"

    msg = (
        f"【BitMart {self.cfg.contract_symbol}1m均值回归(动态阈值+ADX过滤)】\n"
        f"📊 状态:{direction_str}\n"
        f"💰 现价:{price:.2f} | EMA{self.cfg.ema_len}={ema_value:.2f}\n"
        f"📈 偏离:{dev * 100:.3f}% (入场阈值:±{entry_dev * 100:.3f}%)\n"
        f"🌊 波动率ATR比={atr_ratio * 100:.3f}% | 基准={base_ratio * 100:.3f}% | 缩放={vol_scale:.2f}\n"
        f"🧭 趋势ADX={adx_val:.2f} | +DI={plus_di:.2f} | -DI={minus_di:.2f} "
        f"(阈值={self.cfg.adx_threshold:.1f}, 模式={self.cfg.adx_mode})\n"
        f"🎯 动态Floor入场={entry_floor * 100:.3f}% | 止盈={tp_floor * 100:.3f}% | 止损={sl_floor * 100:.3f}%\n"
        f"💰 止盈/止损:{tp * 100:.3f}% / {sl * 100:.3f}% (盈亏比:{rr_text})\n"
        f"🔄 基准刷新:{self.cfg.base_ratio_refresh_sec}s (已过={base_age}s)\n"
        f"⚠️ 止损同向加门槛:{'开启' if penalty_active else '关闭'} (方向={self.last_sl_dir})\n"
        f"💳 可用余额:{bal:.2f} USDT | 杠杆:{self.cfg.leverage}x\n"
        f"⏱️ 持仓限制:{self.cfg.max_hold_sec}s | 冷却:{self.cfg.cooldown_sec_after_exit}s"
    )
    self.ding(msg)
def action(self):
    """Run the trading loop forever (returns only if setting leverage fails).

    Each pass: refresh the progress bar, load candles, compute EMA / ATR /
    baseline / thresholds / ADX, run the daily kill switch, sync the
    position, then manage exits and, unless the market is flagged
    dangerous, look for entries and send the throttled status message.
    """
    if not self.set_leverage():
        self.ding("杠杆设置失败,停止运行", error=True)
        return

    while True:
        # Progress bar simply mirrors the current second of the minute.
        self.pbar.n = datetime.datetime.now().second
        self.pbar.refresh()

        # 1. Candles (cached); wait until every indicator has enough history.
        klines = self.get_klines_cached()
        if not klines or len(klines) < (max(self.cfg.ema_len + 5, 2 * self.cfg.adx_len + 5)):
            logger.warning("K线数据不足等待...")
            time.sleep(1)
            continue

        # 2. EMA over the last ema_len + 1 closes, live price, deviation.
        recent_closes = [bar["close"] for bar in klines[-(self.cfg.ema_len + 1):]]
        ema_value = self.ema(recent_closes, self.cfg.ema_len)
        price = self.get_last_price(fallback_close=float(klines[-1]["close"]))
        dev = (price - ema_value) / ema_value if ema_value else 0.0

        # 3. Volatility: current ATR ratio and the cached baseline.
        atr_value = self.atr(klines, self.cfg.atr_len)
        atr_ratio = (atr_value / price) if price > 0 else 0.0
        base_ratio = self.get_base_ratio_cached(klines)

        # 4. Dynamic thresholds.
        thresholds = self.dynamic_thresholds(atr_ratio, base_ratio)
        entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor = thresholds

        # 5. ADX trend strength.
        adx_val, plus_di, minus_di = self.adx(klines, self.cfg.adx_len)
        logger.info(f"ADX: {adx_val:.2f} (+DI={plus_di:.2f}, -DI={minus_di:.2f})")

        # 6. Daily risk limits.
        self.risk_kill_switch()

        # 7. Sync position with the exchange; retry shortly on failure.
        if not self.get_position_status():
            time.sleep(1)
            continue

        # 8. Trading halted by the kill switch: flatten and idle.
        if not self.trading_enabled:
            if self.pos != 0:
                self.close_position_all()
            logger.warning("交易被禁用(风控触发),等待...")
            time.sleep(5)
            continue

        # 9./10. Exits are always managed; entries and the status message
        # are skipped while the market is flagged dangerous.
        danger = self.is_danger_market(klines, price)
        if danger:
            logger.warning("危险模式:高波动/大实体K暂停开仓")
        self.maybe_exit(price, tp, sl, vol_scale)
        if not danger:
            self.maybe_enter(price, ema_value, entry_dev, adx_val, plus_di, minus_di)

            # 11. Throttled status notification.
            bal = self.get_assets_available()
            self.notify_status_throttled(
                price, ema_value, dev, bal,
                atr_ratio, base_ratio, vol_scale,
                entry_dev, tp, sl,
                entry_floor, tp_floor, sl_floor,
                adx_val, plus_di, minus_di
            )
        time.sleep(self.cfg.tick_refresh_sec)
if __name__ == "__main__":
    # Credentials are read from environment variables.
    #
    # Windows PowerShell (reopen the terminal afterwards):
    #     setx BITMART_API_KEY "your key"
    #     setx BITMART_SECRET_KEY "your secret"
    #     setx BITMART_MEMO "合约交易"
    #
    # Linux/macOS:
    #     export BITMART_API_KEY="your key"
    #     export BITMART_SECRET_KEY="your secret"
    #     export BITMART_MEMO="合约交易"
    cfg = StrategyConfig()
    bot = BitmartFuturesMeanReversionBot(cfg)

    # Route log output through tqdm so it does not break the progress bar.
    logger.remove()
    logger.add(lambda msg: tqdm.write(msg, end=""), level="INFO")

    try:
        bot.action()
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
        bot.ding("🤖 策略已手动停止")
    except Exception as e:
        # Report the crash, then re-raise so the process exits with a traceback.
        logger.error(f"程序异常退出: {e}")
        bot.ding(f"❌ 策略异常退出: {e}", error=True)
        raise