import time import uuid import datetime import requests from typing import Tuple from tqdm import tqdm from loguru import logger from dataclasses import dataclass from bitmart.api_contract import APIContract from DrissionPage import ChromiumPage, ChromiumOptions from bitmart.lib.cloud_exceptions import APIException from bit_tools import openBrowser from 交易.tools import send_dingtalk_message @dataclass class StrategyConfig: # ============================= # 1m | ETH 永续 | 控止损≤5/日 # ============================= # ===== 合约 ===== contract_symbol: str = "ETHUSDT" open_type: str = "cross" leverage: str = "30" # ===== K线与指标 ===== step_min: int = 1 lookback_min: int = 240 ema_len: int = 36 atr_len: int = 14 # ========================================================= # ✅ 自动阈值:ATR/Price 分位数基准(更稳,不被短时噪声带跑) # ========================================================= vol_baseline_window: int = 60 vol_baseline_quantile: float = 0.65 vol_scale_min: float = 0.80 vol_scale_max: float = 1.60 # ✅ baseline 每 60 秒刷新一次(体感更明显、也省CPU) base_ratio_refresh_sec: int = 60 # ========================================================= # ✅ 动态 floor(方案一) # floor = clamp(min, base_k * base_ratio, max) # 目的:跟着典型波动变,过滤小噪声;tp/sl 也随环境自适应 # ========================================================= # entry_dev_floor 动态 entry_dev_floor_min: float = 0.0012 # 0.12% entry_dev_floor_max: float = 0.0030 # 0.30%(可按你偏好调) entry_dev_floor_base_k: float = 1.10 # entry_floor = 1.10 * base_ratio # tp_floor 动态 tp_floor_min: float = 0.0006 # 0.06% tp_floor_max: float = 0.0020 # 0.20% tp_floor_base_k: float = 0.55 # tp_floor = 0.55 * base_ratio(止盈别太大,1m回归更实际) # sl_floor 动态 sl_floor_min: float = 0.0018 # 0.18% sl_floor_max: float = 0.0060 # 0.60% sl_floor_base_k: float = 1.35 # sl_floor = 1.35 * base_ratio(ETH 1m 插针多,止损下限可更稳) # ========================================================= # ✅ 动态阈值倍率(仍然保留你原来思路) # ========================================================= entry_k: float = 1.45 tp_k: float = 0.65 sl_k: float = 1.05 # ===== 时间/冷却 ===== max_hold_sec: int = 75 cooldown_sec_after_exit: int = 20 # ===== 下单/仓位 ===== risk_percent: float = 0.004 min_size: int = 1 max_size: int = 5000 # ===== 日内风控 ===== daily_loss_limit: float = 0.02 daily_profit_cap: float = 0.01 # ===== 危险模式过滤 ===== atr_ratio_kill: float = 0.0038 big_body_kill: float = 0.010 # ===== 轮询节奏 ===== klines_refresh_sec: int = 10 tick_refresh_sec: int = 1 status_notify_sec: int = 60 # ========================================================= # ✅ 止损后同向入场加门槛(但不禁止同向重入) # ========================================================= reentry_penalty_mult: float = 1.55 reentry_penalty_max_sec: int = 180 reset_band_k: float = 0.45 reset_band_floor: float = 0.0006 # ========================================================= # ✅ 止损后同方向 SL 放宽幅度与"止损时 vol_scale"联动 # ========================================================= post_sl_sl_max_sec: int = 90 post_sl_mult_min: float = 1.02 post_sl_mult_max: float = 1.16 post_sl_vol_alpha: float = 0.20 # ========================================================= # ✅ 正常平仓条件:价格回归到EMA附近时平仓 # ========================================================= normal_exit_threshold: float = 0.0003 # 0.03%,价格回归到EMA附近时平仓 normal_exit_min_profit: float = 0.0002 # 0.02%,正常平仓最小盈利要求 # ========================================================= # ✅ 手续费配置(固定10u,30倍杠杆) # ========================================================= fixed_margin: float = 10.0 # 固定每单10u保证金 platform_fee_rate: float = 0.0005 # 平台手续费:开仓价值的万分之五 rebate_rate: float = 0.90 # 返佣比例:90% class BitmartFuturesMeanReversionBot: def __init__(self, cfg: StrategyConfig, bit_id=None): self.bit_id = bit_id self.page: ChromiumPage | None = None self.cfg = cfg self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" self.memo = "合约交易" if not self.api_key or not self.secret_key: raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)") self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) # 持仓状态: -1 空, 0 无, 1 多 self.pos = 0 self.entry_price = None self.entry_ts = None self.last_exit_ts = 0 # 开仓信息(用于手续费计算) self.entry_margin = None # 开仓保证金(固定10u) self.entry_position_value = None # 开仓时的仓位价值(保证金 * 杠杆) # 日内权益基准 self.day_start_equity = None self.trading_enabled = True self.day_tag = datetime.date.today() # 缓存 self._klines_cache = None self._klines_cache_ts = 0 self._last_status_notify_ts = 0 # ✅ base_ratio 缓存 self._base_ratio_cached = 0.0015 # 初始化默认值 0.15% self._base_ratio_ts = 0.0 # ✅ 止损后"同向入场加门槛"状态 self.last_sl_dir = 0 # 1=多止损,-1=空止损,0=无 self.last_sl_ts = 0.0 # ✅ 止损后"同方向 SL 联动放宽"状态 self.post_sl_dir = 0 self.post_sl_ts = 0.0 self.post_sl_vol_scale = 1.0 # 记录止损时的 vol_scale self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90) logger.info(f"初始化完成,基准波动率默认值: {self._base_ratio_cached * 100:.4f}%") # ----------------- 通用工具 ----------------- def ding(self, msg, error=False): prefix = "❌bitmart:" if error else "🔔bitmart:" if error: for _ in range(1): send_dingtalk_message(f"{prefix}{msg}") else: send_dingtalk_message(f"{prefix}{msg}") def set_leverage(self) -> bool: try: resp = self.contractAPI.post_submit_leverage( contract_symbol=self.cfg.contract_symbol, leverage=self.cfg.leverage, open_type=self.cfg.open_type )[0] if resp.get("code") == 1000: logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x") return True logger.error(f"设置杠杆失败: {resp}") self.ding(f"设置杠杆失败: {resp}", error=True) return False except Exception as e: logger.error(f"设置杠杆异常: {e}") self.ding(f"设置杠杆异常: {e}", error=True) return False # ----------------- 行情/指标 ----------------- def get_klines_cached(self): now = time.time() if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec: return self._klines_cache kl = self.get_klines() if kl: self._klines_cache = kl self._klines_cache_ts = now return self._klines_cache def get_klines(self): try: end_time = int(time.time()) start_time = end_time - 60 * self.cfg.lookback_min resp = self.contractAPI.get_kline( contract_symbol=self.cfg.contract_symbol, step=self.cfg.step_min, start_time=start_time, end_time=end_time )[0] if resp.get("code") != 1000: logger.error(f"获取K线失败: {resp}") return None data = resp.get("data", []) formatted = [] for k in data: formatted.append({ "id": int(k["timestamp"]), "open": float(k["open_price"]), "high": float(k["high_price"]), "low": float(k["low_price"]), "close": float(k["close_price"]), }) formatted.sort(key=lambda x: x["id"]) return formatted except Exception as e: logger.error(f"获取K线异常: {e}") self.ding(f"获取K线异常: {e}", error=True) return None def get_last_price(self, fallback_close: float) -> float: try: if hasattr(self.contractAPI, "get_contract_details"): r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0] d = r.get("data") if isinstance(r, dict) else None if isinstance(d, dict): for key in ("last_price", "mark_price", "index_price"): if key in d and d[key] is not None: return float(d[key]) if hasattr(self.contractAPI, "get_ticker"): r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0] d = r.get("data") if isinstance(r, dict) else None if isinstance(d, dict): for key in ("last_price", "price", "last", "close"): if key in d and d[key] is not None: return float(d[key]) except Exception: pass return float(fallback_close) @staticmethod def ema(values, n: int) -> float: k = 2 / (n + 1) e = values[0] for v in values[1:]: e = v * k + e * (1 - k) return e @staticmethod def atr(klines, n: int) -> float: if len(klines) < n + 1: return 0.0 trs = [] for i in range(-n, 0): cur = klines[i] prev = klines[i - 1] tr = max( cur["high"] - cur["low"], abs(cur["high"] - prev["close"]), abs(cur["low"] - prev["close"]), ) trs.append(tr) return sum(trs) / len(trs) def is_danger_market(self, klines, price: float) -> bool: last = klines[-1] body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0 if body >= self.cfg.big_body_kill: return True a = self.atr(klines, self.cfg.atr_len) atr_ratio = (a / price) if price > 0 else 0.0 if atr_ratio >= self.cfg.atr_ratio_kill: return True return False def atr_ratio_baseline(self, klines) -> float: """简化版ATR基准计算""" window = min(self.cfg.vol_baseline_window, len(klines) - self.cfg.atr_len - 1) if window <= 10: # 数据太少 logger.warning(f"数据不足计算基准: {len(klines)}根K线") return 0.0 ratios = [] # 简化计算:每隔3根K线计算一个ATR比率(减少计算量) step = 3 for i in range(-window, 0, step): if len(klines) + i < self.cfg.atr_len + 1: continue # 计算当前位置的ATR start_idx = len(klines) + i - self.cfg.atr_len end_idx = len(klines) + i if start_idx < 0 or end_idx <= start_idx: continue sub_klines = klines[start_idx:end_idx] # 确保有足够数据计算ATR if len(sub_klines) >= self.cfg.atr_len + 1: a = self.atr(sub_klines, self.cfg.atr_len) price = klines[end_idx - 1]["close"] if a > 0 and price > 0: ratio = a / price if 0.0001 < ratio < 0.01: # 过滤异常值 ratios.append(ratio) if len(ratios) < 5: # 样本太少 # 尝试直接使用整个数据计算一个ATR比率 a = self.atr(klines[-60:], self.cfg.atr_len) # 使用最近60根K线 price = klines[-1]["close"] if a > 0 and price > 0: baseline = a / price logger.debug(f"使用全量数据计算基准: {baseline * 100:.4f}%") return baseline else: return 0.0 # 计算分位数 ratios.sort() idx = min(len(ratios) - 1, max(0, int(self.cfg.vol_baseline_quantile * (len(ratios) - 1)))) baseline = ratios[idx] logger.debug(f"基准计算: 样本数={len(ratios)}, 基准={baseline * 100:.4f}%, " f"范围=[{ratios[0] * 100:.4f}%, {ratios[-1] * 100:.4f}%]") return baseline def get_base_ratio_cached(self, klines) -> float: """获取缓存的基准波动率,定期刷新""" now = time.time() refresh_sec = self.cfg.base_ratio_refresh_sec if (self._base_ratio_cached is None or (now - self._base_ratio_ts) >= refresh_sec): # 使用简单版本的基准计算 baseline = self.atr_ratio_baseline(klines) if baseline > 0.0001: # 大于0.01%才认为是有效值 self._base_ratio_cached = baseline self._base_ratio_ts = now logger.info(f"基准波动率更新: {baseline * 100:.4f}%") else: # 使用基于价格的动态默认值 current_price = klines[-1]["close"] if klines else 3000 # ETH价格越高,基准波动率越小(百分比) if current_price > 4000: default_baseline = 0.0010 # 0.10% elif current_price > 3500: default_baseline = 0.0012 # 0.12% elif current_price > 3000: default_baseline = 0.0015 # 0.15% elif current_price > 2500: default_baseline = 0.0018 # 0.18% else: default_baseline = 0.0020 # 0.20% self._base_ratio_cached = default_baseline self._base_ratio_ts = now logger.warning(f"使用价格动态默认基准: {default_baseline * 100:.4f}% " f"(价格=${current_price:.0f})") return self._base_ratio_cached @staticmethod def _clamp(x: float, lo: float, hi: float) -> float: """限制数值在指定范围内""" return max(lo, min(hi, x)) def dynamic_thresholds(self, atr_ratio: float, base_ratio: float): """ ✅ entry/tp/sl 全部动态(修复版): - vol_scale:atr_ratio/base_ratio 限幅 - floor:方案一 (floor = clamp(min, k*base_ratio, max)) - 最终阈值:max(floor, k * vol_scale * atr_ratio) """ # 1) 检查输入有效性 if atr_ratio <= 0: logger.warning(f"ATR比率异常: {atr_ratio}") atr_ratio = 0.001 # 默认值 0.1% # 2) 如果base_ratio太小或无效,使用调整后的atr_ratio if base_ratio < 0.0005: # 小于0.05%视为无效 base_ratio = max(0.001, atr_ratio * 1.2) # 比当前ATR比率稍大 logger.debug(f"基准太小,使用调整后的atr_ratio: {base_ratio * 100:.4f}%") # 3) vol_scale计算 if base_ratio > 0: raw_scale = atr_ratio / base_ratio vol_scale = self._clamp(raw_scale, self.cfg.vol_scale_min, self.cfg.vol_scale_max) logger.debug( f"vol_scale: {raw_scale:.2f} → {vol_scale:.2f} (atr={atr_ratio * 100:.3f}%, base={base_ratio * 100:.3f}%)") else: vol_scale = 1.0 logger.warning(f"基准无效,使用默认vol_scale=1.0") # 4) 动态floor计算 # Entry floor entry_floor_raw = self.cfg.entry_dev_floor_base_k * base_ratio entry_floor = self._clamp( entry_floor_raw, self.cfg.entry_dev_floor_min, self.cfg.entry_dev_floor_max, ) # TP floor tp_floor_raw = self.cfg.tp_floor_base_k * base_ratio tp_floor = self._clamp( tp_floor_raw, self.cfg.tp_floor_min, self.cfg.tp_floor_max, ) # SL floor sl_floor_raw = self.cfg.sl_floor_base_k * base_ratio sl_floor = self._clamp( sl_floor_raw, self.cfg.sl_floor_min, self.cfg.sl_floor_max, ) # 5) 最终阈值计算 entry_dev_atr_part = self.cfg.entry_k * vol_scale * atr_ratio entry_dev = max(entry_floor, entry_dev_atr_part) tp_atr_part = self.cfg.tp_k * vol_scale * atr_ratio tp = max(tp_floor, tp_atr_part) sl_atr_part = self.cfg.sl_k * vol_scale * atr_ratio sl = max(sl_floor, sl_atr_part) # 6) 确保entry_dev不会太小 entry_dev = max(entry_dev, self.cfg.entry_dev_floor_min) # 7) 输出详细信息 logger.info( f"动态阈值: atr={atr_ratio * 100:.4f}%, base={base_ratio * 100:.4f}%, " f"vol_scale={vol_scale:.2f}, floor={entry_floor * 100:.4f}%, " f"atr_part={entry_dev_atr_part * 100:.4f}%, 最终entry_dev={entry_dev * 100:.4f}%" ) return entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor # ----------------- 账户/仓位 ----------------- def get_assets_available(self) -> float: try: resp = self.contractAPI.get_assets_detail()[0] if resp.get("code") != 1000: return 0.0 data = resp.get("data") if isinstance(data, dict): return float(data.get("available_balance", 0)) if isinstance(data, list): for asset in data: if asset.get("currency") == "USDT": return float(asset.get("available_balance", 0)) return 0.0 except Exception as e: logger.error(f"余额查询异常: {e}") return 0.0 def get_position_status(self) -> bool: try: resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0] if resp.get("code") != 1000: return False positions = resp.get("data", []) if not positions: self.pos = 0 return True p = positions[0] self.pos = 1 if p["position_type"] == 1 else -1 return True except Exception as e: logger.error(f"持仓查询异常: {e}") self.ding(f"持仓查询异常: {e}", error=True) return False def get_equity_proxy(self) -> float: return self.get_assets_available() def refresh_daily_baseline(self): today = datetime.date.today() if today != self.day_tag: self.day_tag = today self.day_start_equity = None self.trading_enabled = True self.ding(f"新的一天({today}):重置日内风控基准") def risk_kill_switch(self): self.refresh_daily_baseline() equity = self.get_equity_proxy() if equity <= 0: return if self.day_start_equity is None: self.day_start_equity = equity logger.info(f"日内权益基准设定:{equity:.2f} USDT") return pnl = (equity - self.day_start_equity) / self.day_start_equity if pnl <= -self.cfg.daily_loss_limit: self.trading_enabled = False self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True) if pnl >= self.cfg.daily_profit_cap: self.trading_enabled = False self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机") # ----------------- 下单 ----------------- def calculate_size(self, price: float) -> int: """ 计算仓位大小 固定每单10u保证金,30倍杠杆 """ bal = self.get_assets_available() if bal < self.cfg.fixed_margin: logger.warning(f"余额不足:{bal:.2f} USDT < {self.cfg.fixed_margin} USDT") return 0 # 固定保证金10u margin = self.cfg.fixed_margin lev = int(self.cfg.leverage) # ⚠️ 沿用你的原假设:1张≈0.001ETH # 仓位价值 = 保证金 * 杠杆 = 10 * 30 = 300u # size = 仓位价值 / (价格 * 0.001) size = int((margin * lev) / (price * 0.001)) size = max(self.cfg.min_size, size) size = min(self.cfg.max_size, size) logger.info(f"计算仓位:保证金={margin}u, 杠杆={lev}x, 仓位价值={margin * lev}u, size={size}") return size def place_market_order(self, side: int, size: int) -> bool: """ 【下单函数】实际执行下单操作 side: 1=开多, 4=开空, 2=平多, 3=平空 """ if size <= 0: return False try: # 开多单 if side == 1: self.click_safe('x://button[normalize-space(text()) ="市价"]') self.click_safe('x://button[normalize-space(text()) ="市价"]') self.click_safe('x://button[normalize-space(text()) ="市价"]') self.page.ele('x://*[@id="size_0"]').input(size, clear=True) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') logger.info(f"✅ 开多单: size={size}") return True # 开空单 elif side == 4: self.click_safe('x://button[normalize-space(text()) ="市价"]') self.click_safe('x://button[normalize-space(text()) ="市价"]') self.click_safe('x://button[normalize-space(text()) ="市价"]') self.page.ele('x://*[@id="size_0"]').input(size, clear=True) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') logger.info(f"✅ 开空单: size={size}") return True # 平多单(平多 = 卖出) elif side == 2: self.click_safe('x://button[normalize-space(text()) ="市价"]') time.sleep(0.3) # 等待界面响应 # 平仓时size可以设置大一些确保全部平仓 self.page.ele('x://*[@id="size_0"]').input(size, clear=True) time.sleep(0.3) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') logger.info(f"✅ 平多单操作执行: size={size}") return True # 平空单(平空 = 买入) elif side == 3: self.click_safe('x://button[normalize-space(text()) ="市价"]') time.sleep(0.3) # 等待界面响应 self.page.ele('x://*[@id="size_0"]').input(size, clear=True) time.sleep(0.3) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') logger.info(f"✅ 平空单操作执行: size={size}") return True else: logger.error(f"未知的订单方向: side={side}") return False except Exception as e: logger.error(f"下单异常: {e}") self.ding(f"下单异常: {e}", error=True) return False def calculate_fee(self, position_value: float) -> float: """ 计算手续费 平台手续费 = 仓位价值 * 0.0005(万分之五) 实际手续费 = 平台手续费 * (1 - 返佣比例) = 平台手续费 * 0.1 """ platform_fee = position_value * self.cfg.platform_fee_rate actual_fee = platform_fee * (1 - self.cfg.rebate_rate) return actual_fee def calculate_net_pnl(self, price: float) -> Tuple[float, float, float]: """ 计算扣除手续费后的净盈亏 返回: (净盈亏比例, 总手续费, 毛盈亏比例) 注意:在杠杆交易中,盈亏比例 = 杠杆倍数 * 价格变动比例 例如:30倍杠杆,价格涨1%,实际盈亏是30% """ if self.pos == 0 or self.entry_price is None or self.entry_margin is None or self.entry_position_value is None: return 0.0, 0.0, 0.0 leverage = int(self.cfg.leverage) # 计算价格变动比例 if self.pos == 1: price_change_ratio = (price - self.entry_price) / self.entry_price else: price_change_ratio = (self.entry_price - price) / self.entry_price # 计算毛盈亏比例(考虑杠杆) gross_pnl_ratio = leverage * price_change_ratio # 计算开仓手续费 entry_fee = self.calculate_fee(self.entry_position_value) # 计算平仓手续费(使用当前价格计算平仓时的仓位价值) # 平仓时的仓位价值 ≈ 开仓时的仓位价值(假设size不变) exit_position_value = self.entry_position_value exit_fee = self.calculate_fee(exit_position_value) # 总手续费 total_fee = entry_fee + exit_fee # 手续费相对于保证金的比率 fee_ratio = total_fee / self.entry_margin # 净盈亏 = 毛盈亏 - 手续费比率 net_pnl_ratio = gross_pnl_ratio - fee_ratio return net_pnl_ratio, total_fee, gross_pnl_ratio def close_position_all(self) -> bool: """ 平掉所有持仓 重试机制:最多尝试3次,每次平仓后通过SDK验证是否成功 返回: True=平仓成功, False=平仓失败 """ if self.pos == 0: logger.info("当前无持仓,无需平仓") return True max_retries = 3 retry_delay = 1.0 # 每次重试间隔1秒 for attempt in range(1, max_retries + 1): logger.info(f"平仓尝试 {attempt}/{max_retries}...") # 记录平仓前的持仓状态 old_pos = self.pos # 执行平仓操作 if self.pos == 1: # 平多单,使用side=2 ok = self.place_market_order(2, 999999) if not ok: logger.warning(f"平多单操作失败 (尝试 {attempt}/{max_retries})") if attempt < max_retries: time.sleep(retry_delay) continue else: self.ding(f"平多单失败:已重试{max_retries}次仍失败", error=True) return False elif self.pos == -1: # 平空单,使用side=3 ok = self.place_market_order(3, 999999) if not ok: logger.warning(f"平空单操作失败 (尝试 {attempt}/{max_retries})") if attempt < max_retries: time.sleep(retry_delay) continue else: self.ding(f"平空单失败:已重试{max_retries}次仍失败", error=True) return False else: logger.info("持仓状态异常,无需平仓") return True # 等待订单执行 time.sleep(1.5) # 等待订单执行 # 通过SDK验证是否平仓成功 verify_success = self._verify_position_closed(old_pos) if verify_success: # 平仓成功,清空状态 self.pos = 0 self.entry_margin = None self.entry_position_value = None logger.success(f"✅ 平仓成功 (尝试 {attempt}/{max_retries})") return True else: # 平仓失败,准备重试 logger.warning(f"平仓验证失败,持仓仍存在 (尝试 {attempt}/{max_retries})") if attempt < max_retries: time.sleep(retry_delay) # 重新获取持仓状态 self.get_position_status() else: self.ding(f"平仓失败:已重试{max_retries}次,持仓仍未平掉", error=True) return False return False def _verify_position_closed(self, expected_old_pos: int) -> bool: """ 验证持仓是否已平仓 通过SDK查询持仓状态,确认是否真的平仓成功 返回: True=平仓成功, False=仍有持仓 """ try: # 调用SDK查询持仓状态 resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0] if resp.get("code") != 1000: logger.warning(f"查询持仓状态失败: {resp}") return False positions = resp.get("data", []) # 如果没有持仓,说明平仓成功 if not positions or len(positions) == 0: logger.info("✅ SDK验证:持仓已平仓") return True # 检查持仓是否还存在 # position_type: 1=多, 2=空 (根据get_position_status的逻辑) for p in positions: position_type = p.get("position_type", 0) # 根据get_position_status的逻辑:1=多(pos=1), 其他=空(pos=-1) current_pos = 1 if position_type == 1 else -1 if current_pos == expected_old_pos: # 持仓仍然存在(与平仓前的方向一致) logger.warning( f"⚠️ SDK验证:持仓仍存在 (position_type={position_type}, 方向={'多' if current_pos == 1 else '空'})") return False # 持仓已改变或不存在(平仓成功) logger.info("✅ SDK验证:持仓已平仓或已改变") return True except Exception as e: logger.error(f"验证持仓状态异常: {e}") # 验证失败时,保守处理:认为可能未平仓 return False # ----------------- 止损后机制 ----------------- def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool: """检查是否需要应用重新入场惩罚""" if self.last_sl_dir == 0: return False if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec: self.last_sl_dir = 0 return False reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev) if abs(dev) <= reset_band: self.last_sl_dir = 0 return False return True def _post_sl_dynamic_mult(self) -> float: """计算止损后SL放宽倍数""" if self.post_sl_dir == 0: return 1.0 if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec: self.post_sl_dir = 0 self.post_sl_vol_scale = 1.0 return 1.0 raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0) raw = max(1.0, raw) # 不缩小止损,只放宽 return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw)) # ----------------- 交易逻辑 ----------------- def in_cooldown(self) -> bool: """检查是否在冷却期内""" return (time.time() - self.last_exit_ts) < self.cfg.cooldown_sec_after_exit def maybe_enter(self, price: float, ema_value: float, entry_dev: float): """ 检查并执行入场 【开单入口函数】此函数负责判断是否开单以及开单方向 - 开多:价格低于EMA超过阈值时 - 开空:价格高于EMA超过阈值时 """ if self.pos != 0: return if self.in_cooldown(): return dev = (price - ema_value) / ema_value if ema_value else 0.0 size = self.calculate_size(price) if size <= 0: return penalty_active = self._reentry_penalty_active(dev, entry_dev) long_th = -entry_dev short_th = entry_dev if penalty_active: if self.last_sl_dir == 1: long_th = -entry_dev * self.cfg.reentry_penalty_mult logger.info( f"多头止损后惩罚生效: 入场阈值从 {long_th * 100:.3f}% 调整为 {(-entry_dev * self.cfg.reentry_penalty_mult) * 100:.3f}%") elif self.last_sl_dir == -1: short_th = entry_dev * self.cfg.reentry_penalty_mult logger.info( f"空头止损后惩罚生效: 入场阈值从 {short_th * 100:.3f}% 调整为 {(entry_dev * self.cfg.reentry_penalty_mult) * 100:.3f}%") logger.info( f"入场检查: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% " f"(entry_dev={entry_dev * 100:.3f}%, long_th={long_th * 100:.3f}%, short_th={short_th * 100:.3f}%) " f"size={size}, penalty={penalty_active}, last_sl_dir={self.last_sl_dir}" ) # ========== 【开单位置1】开多单 ========== # 方向:多(做多,买入) # 条件:价格偏离EMA向下超过阈值 (dev <= long_th) if dev <= long_th: if self.place_market_order(1, size): # side=1 表示开多 self.pos = 1 self.entry_price = price self.entry_ts = time.time() # 记录开仓信息(用于手续费计算) self.entry_margin = self.cfg.fixed_margin self.entry_position_value = self.entry_margin * int(self.cfg.leverage) self.ding( f"✅开多:dev={dev * 100:.3f}% size={size} entry={price:.2f} margin={self.entry_margin}u value={self.entry_position_value}u") # ========== 【开单位置2】开空单 ========== # 方向:空(做空,卖出) # 条件:价格偏离EMA向上超过阈值 (dev >= short_th) elif dev >= short_th: if self.place_market_order(4, size): # side=4 表示开空 self.pos = -1 self.entry_price = price self.entry_ts = time.time() # 记录开仓信息(用于手续费计算) self.entry_margin = self.cfg.fixed_margin self.entry_position_value = self.entry_margin * int(self.cfg.leverage) self.ding( f"✅开空:dev={dev * 100:.3f}% size={size} entry={price:.2f} margin={self.entry_margin}u value={self.entry_position_value}u") def maybe_exit(self, price: float, ema_value: float, tp: float, sl: float, vol_scale: float): """ 检查并执行出场 【平仓函数】包含四种平仓条件,所有平仓都会考虑手续费: 1. 正常平仓:价格回归到EMA附近时平仓(扣除手续费后仍有盈利) 2. 止盈:达到止盈阈值(扣除手续费后仍有盈利) 3. 止损:达到止损阈值 4. 超时平仓:持仓时间超过限制(扣除手续费后仍有盈利) """ if self.pos == 0 or self.entry_price is None or self.entry_ts is None: return hold = time.time() - self.entry_ts # 计算毛盈亏和净盈亏(扣除手续费后) # calculate_net_pnl已经考虑了杠杆倍数 net_pnl, total_fee, gross_pnl = self.calculate_net_pnl(price) # 计算价格偏离EMA的程度 dev = (price - ema_value) / ema_value if ema_value else 0.0 # 计算止损倍数(考虑止损后放宽) sl_mult = 1.0 if self.post_sl_dir == self.pos and self.post_sl_dir != 0: sl_mult = self._post_sl_dynamic_mult() effective_sl = sl * sl_mult # ========== 【平仓条件1】正常平仓:价格回归到EMA附近 ========== # 这是均值回归策略的核心:当价格回归到EMA附近时,说明均值回归已经完成,应该平仓 # 但必须扣除手续费后仍有盈利才平仓 if abs(dev) <= self.cfg.normal_exit_threshold: if net_pnl > 0: # 扣除手续费后仍有盈利 if self.close_position_all(): self.ding( f"📊正常平仓:价格回归EMA附近 dev={dev * 100:.3f}% " f"毛盈亏={gross_pnl * 100:.3f}% 净盈亏={net_pnl * 100:.3f}% " f"手续费={total_fee:.4f}u price={price:.2f}" ) self.entry_price, self.entry_ts = None, None self.entry_margin, self.entry_position_value = None, None self.last_exit_ts = time.time() return else: logger.error("正常平仓失败,持仓可能仍存在") # 平仓失败时不更新状态,下次循环会继续尝试 return # ========== 【平仓条件2】止盈 ========== # 达到止盈阈值,且扣除手续费后仍有盈利 if gross_pnl >= tp: if net_pnl > 0: # 扣除手续费后仍有盈利 if self.close_position_all(): self.ding( f"🎯止盈:毛盈亏={gross_pnl * 100:.3f}% 净盈亏={net_pnl * 100:.3f}% " f"手续费={total_fee:.4f}u price={price:.2f} tp={tp * 100:.3f}%" ) self.entry_price, self.entry_ts = None, None self.entry_margin, self.entry_position_value = None, None self.last_exit_ts = time.time() return else: logger.error("止盈平仓失败,持仓可能仍存在") return # ========== 【平仓条件3】止损 ========== # 止损使用毛盈亏判断(因为止损是风险控制,即使扣除手续费后亏损也要止损) if gross_pnl <= -effective_sl: sl_dir = self.pos if self.close_position_all(): self.ding( f"🛑止损:毛盈亏={gross_pnl * 100:.3f}% 净盈亏={net_pnl * 100:.3f}% " f"手续费={total_fee:.4f}u price={price:.2f} " f"sl={sl * 100:.3f}% effective_sl={effective_sl * 100:.3f}%(×{sl_mult:.2f})", error=True ) self.last_sl_dir = sl_dir self.last_sl_ts = time.time() self.post_sl_dir = sl_dir self.post_sl_ts = time.time() self.post_sl_vol_scale = float(vol_scale) self.entry_price, self.entry_ts = None, None self.entry_margin, self.entry_position_value = None, None self.last_exit_ts = time.time() return else: logger.error("止损平仓失败,持仓可能仍存在,风险较高!") self.ding("⚠️ 止损平仓失败,请手动检查持仓!", error=True) # 即使平仓失败,也更新止损状态,避免重复触发 self.last_sl_dir = sl_dir self.last_sl_ts = time.time() return # ========== 【平仓条件4】超时平仓 ========== # 超时平仓也要考虑手续费,只有扣除手续费后仍有盈利才平仓 if hold >= self.cfg.max_hold_sec: if net_pnl > 0: # 扣除手续费后仍有盈利 if self.close_position_all(): self.ding( f"⏱超时:hold={int(hold)}s 毛盈亏={gross_pnl * 100:.3f}% " f"净盈亏={net_pnl * 100:.3f}% 手续费={total_fee:.4f}u price={price:.2f}" ) self.entry_price, self.entry_ts = None, None self.entry_margin, self.entry_position_value = None, None self.last_exit_ts = time.time() return else: logger.error("超时平仓失败,持仓可能仍存在") return else: # 超时但扣除手续费后亏损,继续持有等待盈利 logger.debug( f"⏱超时但净盈亏为负,继续持有:hold={int(hold)}s " f"毛盈亏={gross_pnl * 100:.3f}% 净盈亏={net_pnl * 100:.3f}%" ) def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float, atr_ratio: float, base_ratio: float, vol_scale: float, entry_dev: float, tp: float, sl: float, entry_floor: float, tp_floor: float, sl_floor: float): """限频状态通知""" now = time.time() if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec: return self._last_status_notify_ts = now direction_str = "多" if self.pos == 1 else ("空" if self.pos == -1 else "无") penalty_active = self._reentry_penalty_active(dev, entry_dev) sl_mult = 1.0 if self.pos != 0 and self.post_sl_dir == self.pos: sl_mult = self._post_sl_dynamic_mult() base_age = int(now - self._base_ratio_ts) if self._base_ratio_ts else -1 # 计算当前盈亏(如果有持仓) current_gross_pnl = 0.0 current_net_pnl = 0.0 current_fee = 0.0 if self.pos != 0 and self.entry_price: current_net_pnl, current_fee, current_gross_pnl = self.calculate_net_pnl(price) msg = ( f"【BitMart {self.cfg.contract_symbol}|1m均值回归(动态阈值)】\n" f"📊 状态:{direction_str}\n" f"💰 现价:{price:.2f} | EMA{self.cfg.ema_len}:{ema_value:.2f}\n" f"📈 偏离:{dev * 100:.3f}% (入场阈值:±{entry_dev * 100:.3f}%)\n" f"🌊 波动率:ATR比={atr_ratio * 100:.3f}% | 基准={base_ratio * 100:.3f}% | 缩放={vol_scale:.2f}\n" f"🎯 动态Floor:入场={entry_floor * 100:.3f}% | 止盈={tp_floor * 100:.3f}% | 止损={sl_floor * 100:.3f}%\n" f"💰 止盈/止损:{tp * 100:.3f}% / {sl * 100:.3f}% (盈亏比:{tp / sl:.2f})\n" f"📊 正常平仓阈值:±{self.cfg.normal_exit_threshold * 100:.3f}%\n" f"🔄 基准刷新:{self.cfg.base_ratio_refresh_sec}s (已过={base_age}s)\n" f"⚠️ 止损同向加门槛:{'开启' if penalty_active else '关闭'} (方向={self.last_sl_dir})\n" f"💳 可用余额:{bal:.2f} USDT | 杠杆:{self.cfg.leverage}x | 固定保证金:{self.cfg.fixed_margin}u\n" ) if self.pos != 0 and self.entry_price is not None and self.entry_position_value is not None: msg += ( f"📈 当前盈亏:毛盈亏={current_gross_pnl * 100:.3f}% | " f"净盈亏={current_net_pnl * 100:.3f}% | " f"手续费={current_fee:.4f}u\n" f"📍 入场价:{self.entry_price:.2f} | " f"仓位价值:{self.entry_position_value:.2f}u\n" ) msg += f"⏱️ 持仓限制:{self.cfg.max_hold_sec}s | 冷却:{self.cfg.cooldown_sec_after_exit}s" self.ding(msg) def openBrowser(self): """打开 TGE 对应浏览器实例""" try: bit_port = openBrowser(id=self.bit_id) co = ChromiumOptions() co.set_local_port(port=bit_port) self.page = ChromiumPage(addr_or_opts=co) return True except: return False def take_over_browser(self): """接管浏览器""" try: co = ChromiumOptions() co.set_local_port(self.tge_port) self.page = ChromiumPage(addr_or_opts=co) self.page.set.window.max() return True except: return False def close_extra_tabs(self): """关闭多余 tab""" try: for idx, tab in enumerate(self.page.get_tabs()): if idx > 0: tab.close() return True except: return False def click_safe(self, xpath, sleep=0.5): """安全点击""" try: ele = self.page.ele(xpath) if not ele: return False ele.scroll.to_see(center=True) time.sleep(sleep) ele.click() return True except: return False def 平仓(self): self.click_safe('x://span[normalize-space(text()) ="市价"]') def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None): """ marketPriceLongOrder 市价最多或者做空,1是做多,-1是做空 limitPriceShortOrder 限价最多或者做空 """ if marketPriceLongOrder == -1: self.click_safe('x://button[normalize-space(text()) ="市价"]') self.page.ele('x://*[@id="size_0"]').input(size) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') elif marketPriceLongOrder == 1: self.click_safe('x://button[normalize-space(text()) ="市价"]') self.page.ele('x://*[@id="size_0"]').input(size) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') if limitPriceShortOrder == -1: self.click_safe('x://button[normalize-space(text()) ="限价"]') self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True) time.sleep(1) self.page.ele('x://*[@id="size_0"]').input(1) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') elif limitPriceShortOrder == 1: self.click_safe('x://button[normalize-space(text()) ="限价"]') self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True) time.sleep(1) self.page.ele('x://*[@id="size_0"]').input(1) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') def action(self): """主循环""" if not self.set_leverage(): self.ding("杠杆设置失败,停止运行", error=True) return # 1. 打开浏览器 if not self.openBrowser(): self.ding("打开 TGE 失败!", error=True) return logger.info("TGE 端口获取成功") # # 2. 接管浏览器 # if not self.take_over_browser(): # self.ding("接管浏览器失败!", error=True) # return logger.info("浏览器接管成功") self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") while True: now_dt = datetime.datetime.now() self.pbar.n = now_dt.second self.pbar.refresh() # 1. 获取K线数据 klines = self.get_klines_cached() if not klines or len(klines) < (self.cfg.ema_len + 5): logger.warning("K线数据不足,等待...") time.sleep(1) continue # 2. 计算技术指标 last_k = klines[-1] closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]] ema_value = self.ema(closes, self.cfg.ema_len) price = self.get_last_price(fallback_close=float(last_k["close"])) dev = (price - ema_value) / ema_value if ema_value else 0.0 # 3. 计算波动率相关指标 a = self.atr(klines, self.cfg.atr_len) atr_ratio = (a / price) if price > 0 else 0.0 base_ratio = self.get_base_ratio_cached(klines) # 4. 计算动态阈值 entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor = self.dynamic_thresholds( atr_ratio, base_ratio ) # 记录调试信息 logger.debug( f"循环数据: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}%, " f"atr_ratio={atr_ratio * 100:.3f}%, base_ratio={base_ratio * 100:.3f}%, " f"entry_dev={entry_dev * 100:.3f}%" ) # 5. 风控检查 self.risk_kill_switch() # 6. 获取持仓状态 if not self.get_position_status(): time.sleep(1) continue # 7. 检查交易是否启用 if not self.trading_enabled: if self.pos != 0: if self.close_position_all(): logger.info("交易被禁用,已平仓") else: logger.error("交易被禁用,但平仓失败,请手动检查!") self.ding("⚠️ 交易被禁用但平仓失败,请手动检查持仓!", error=True) logger.warning("交易被禁用(风控触发),等待...") time.sleep(5) continue # 8. 检查危险市场 if self.is_danger_market(klines, price): logger.warning("危险模式:高波动/大实体K,暂停开仓") self.maybe_exit(price, ema_value, tp, sl, vol_scale) time.sleep(self.cfg.tick_refresh_sec) continue # 9. 执行交易逻辑 # 先检查平仓(包括正常平仓、止盈、止损) self.maybe_exit(price, ema_value, tp, sl, vol_scale) # 再检查开仓 self.maybe_enter(price, ema_value, entry_dev) # 10. 状态通知 bal = self.get_assets_available() self.notify_status_throttled( price, ema_value, dev, bal, atr_ratio, base_ratio, vol_scale, entry_dev, tp, sl, entry_floor, tp_floor, sl_floor ) time.sleep(self.cfg.tick_refresh_sec) if __name__ == "__main__": """ Windows PowerShell: setx BITMART_API_KEY "你的key" setx BITMART_SECRET_KEY "你的secret" setx BITMART_MEMO "合约交易" 重新打开终端再运行。 Linux/macOS: export BITMART_API_KEY="你的key" export BITMART_SECRET_KEY="你的secret" export BITMART_MEMO "合约交易" """ cfg = StrategyConfig() bot = BitmartFuturesMeanReversionBot(cfg, bit_id="f2320f57e24c45529a009e1541e25961") # 设置日志级别为INFO以便查看详细计算过程 logger.remove() logger.add(lambda msg: tqdm.write(msg, end=""), level="INFO") try: bot.action() except KeyboardInterrupt: logger.info("程序被用户中断") bot.ding("🤖 策略已手动停止") except Exception as e: logger.error(f"程序异常退出: {e}") bot.ding(f"❌ 策略异常退出: {e}", error=True) raise