Files
lm_code/test1.py
2025-12-29 18:29:21 +08:00

1284 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import uuid
import datetime
import requests
from typing import Tuple
from tqdm import tqdm
from loguru import logger
from dataclasses import dataclass
from bitmart.api_contract import APIContract
from DrissionPage import ChromiumPage, ChromiumOptions
from bitmart.lib.cloud_exceptions import APIException
from bit_tools import openBrowser
from 交易.tools import send_dingtalk_message
@dataclass
class StrategyConfig:
# =============================
# 1m | ETH 永续 | 控止损≤5/日
# =============================
# ===== 合约 =====
contract_symbol: str = "ETHUSDT"
open_type: str = "cross"
leverage: str = "30"
# ===== K线与指标 =====
step_min: int = 1
lookback_min: int = 240
ema_len: int = 36
atr_len: int = 14
# =========================================================
# ✅ 自动阈值ATR/Price 分位数基准(更稳,不被短时噪声带跑)
# =========================================================
vol_baseline_window: int = 60
vol_baseline_quantile: float = 0.65
vol_scale_min: float = 0.80
vol_scale_max: float = 1.60
# ✅ baseline 每 60 秒刷新一次体感更明显、也省CPU
base_ratio_refresh_sec: int = 60
# =========================================================
# ✅ 动态 floor方案一
# floor = clamp(min, base_k * base_ratio, max)
# 目的跟着典型波动变过滤小噪声tp/sl 也随环境自适应
# =========================================================
# entry_dev_floor 动态
entry_dev_floor_min: float = 0.0012 # 0.12%
entry_dev_floor_max: float = 0.0030 # 0.30%(可按你偏好调)
entry_dev_floor_base_k: float = 1.10 # entry_floor = 1.10 * base_ratio
# tp_floor 动态
tp_floor_min: float = 0.0006 # 0.06%
tp_floor_max: float = 0.0020 # 0.20%
tp_floor_base_k: float = 0.55 # tp_floor = 0.55 * base_ratio止盈别太大1m回归更实际
# sl_floor 动态
sl_floor_min: float = 0.0018 # 0.18%
sl_floor_max: float = 0.0060 # 0.60%
sl_floor_base_k: float = 1.35 # sl_floor = 1.35 * base_ratioETH 1m 插针多,止损下限可更稳)
# =========================================================
# ✅ 动态阈值倍率(仍然保留你原来思路)
# =========================================================
entry_k: float = 1.45
tp_k: float = 0.65
sl_k: float = 1.05
# ===== 时间/冷却 =====
max_hold_sec: int = 75
cooldown_sec_after_exit: int = 20
# ===== 下单/仓位 =====
risk_percent: float = 0.004
min_size: int = 1
max_size: int = 5000
# ===== 日内风控 =====
daily_loss_limit: float = 0.02
daily_profit_cap: float = 0.01
# ===== 危险模式过滤 =====
atr_ratio_kill: float = 0.0038
big_body_kill: float = 0.010
# ===== 轮询节奏 =====
klines_refresh_sec: int = 10
tick_refresh_sec: int = 1
status_notify_sec: int = 60
# =========================================================
# ✅ 止损后同向入场加门槛(但不禁止同向重入)
# =========================================================
reentry_penalty_mult: float = 1.55
reentry_penalty_max_sec: int = 180
reset_band_k: float = 0.45
reset_band_floor: float = 0.0006
# =========================================================
# ✅ 止损后同方向 SL 放宽幅度与"止损时 vol_scale"联动
# =========================================================
post_sl_sl_max_sec: int = 90
post_sl_mult_min: float = 1.02
post_sl_mult_max: float = 1.16
post_sl_vol_alpha: float = 0.20
# =========================================================
# ✅ 正常平仓条件价格回归到EMA附近时平仓
# =========================================================
normal_exit_threshold: float = 0.0003 # 0.03%价格回归到EMA附近时平仓
normal_exit_min_profit: float = 0.0002 # 0.02%,正常平仓最小盈利要求
# =========================================================
# ✅ 手续费配置固定10u30倍杠杆
# =========================================================
fixed_margin: float = 10.0 # 固定每单10u保证金
platform_fee_rate: float = 0.0005 # 平台手续费:开仓价值的万分之五
rebate_rate: float = 0.90 # 返佣比例90%
class BitmartFuturesMeanReversionBot:
def __init__(self, cfg: StrategyConfig, bit_id=None):
self.bit_id = bit_id
self.page: ChromiumPage | None = None
self.cfg = cfg
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
if not self.api_key or not self.secret_key:
raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)")
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
# 持仓状态: -1 空, 0 无, 1 多
self.pos = 0
self.entry_price = None
self.entry_ts = None
self.last_exit_ts = 0
# 开仓信息(用于手续费计算)
self.entry_margin = None # 开仓保证金固定10u
self.entry_position_value = None # 开仓时的仓位价值(保证金 * 杠杆)
# 日内权益基准
self.day_start_equity = None
self.trading_enabled = True
self.day_tag = datetime.date.today()
# 缓存
self._klines_cache = None
self._klines_cache_ts = 0
self._last_status_notify_ts = 0
# ✅ base_ratio 缓存
self._base_ratio_cached = 0.0015 # 初始化默认值 0.15%
self._base_ratio_ts = 0.0
# ✅ 止损后"同向入场加门槛"状态
self.last_sl_dir = 0 # 1=多止损,-1=空止损0=无
self.last_sl_ts = 0.0
# ✅ 止损后"同方向 SL 联动放宽"状态
self.post_sl_dir = 0
self.post_sl_ts = 0.0
self.post_sl_vol_scale = 1.0 # 记录止损时的 vol_scale
self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90)
logger.info(f"初始化完成,基准波动率默认值: {self._base_ratio_cached * 100:.4f}%")
# ----------------- 通用工具 -----------------
def ding(self, msg, error=False):
prefix = "❌bitmart" if error else "🔔bitmart"
if error:
for _ in range(1):
send_dingtalk_message(f"{prefix}{msg}")
else:
send_dingtalk_message(f"{prefix}{msg}")
def set_leverage(self) -> bool:
try:
resp = self.contractAPI.post_submit_leverage(
contract_symbol=self.cfg.contract_symbol,
leverage=self.cfg.leverage,
open_type=self.cfg.open_type
)[0]
if resp.get("code") == 1000:
logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x")
return True
logger.error(f"设置杠杆失败: {resp}")
self.ding(f"设置杠杆失败: {resp}", error=True)
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
self.ding(f"设置杠杆异常: {e}", error=True)
return False
# ----------------- 行情/指标 -----------------
def get_klines_cached(self):
now = time.time()
if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec:
return self._klines_cache
kl = self.get_klines()
if kl:
self._klines_cache = kl
self._klines_cache_ts = now
return self._klines_cache
def get_klines(self):
try:
end_time = int(time.time())
start_time = end_time - 60 * self.cfg.lookback_min
resp = self.contractAPI.get_kline(
contract_symbol=self.cfg.contract_symbol,
step=self.cfg.step_min,
start_time=start_time,
end_time=end_time
)[0]
if resp.get("code") != 1000:
logger.error(f"获取K线失败: {resp}")
return None
data = resp.get("data", [])
formatted = []
for k in data:
formatted.append({
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
})
formatted.sort(key=lambda x: x["id"])
return formatted
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(f"获取K线异常: {e}", error=True)
return None
def get_last_price(self, fallback_close: float) -> float:
try:
if hasattr(self.contractAPI, "get_contract_details"):
r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0]
d = r.get("data") if isinstance(r, dict) else None
if isinstance(d, dict):
for key in ("last_price", "mark_price", "index_price"):
if key in d and d[key] is not None:
return float(d[key])
if hasattr(self.contractAPI, "get_ticker"):
r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0]
d = r.get("data") if isinstance(r, dict) else None
if isinstance(d, dict):
for key in ("last_price", "price", "last", "close"):
if key in d and d[key] is not None:
return float(d[key])
except Exception:
pass
return float(fallback_close)
@staticmethod
def ema(values, n: int) -> float:
k = 2 / (n + 1)
e = values[0]
for v in values[1:]:
e = v * k + e * (1 - k)
return e
@staticmethod
def atr(klines, n: int) -> float:
if len(klines) < n + 1:
return 0.0
trs = []
for i in range(-n, 0):
cur = klines[i]
prev = klines[i - 1]
tr = max(
cur["high"] - cur["low"],
abs(cur["high"] - prev["close"]),
abs(cur["low"] - prev["close"]),
)
trs.append(tr)
return sum(trs) / len(trs)
def is_danger_market(self, klines, price: float) -> bool:
last = klines[-1]
body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0
if body >= self.cfg.big_body_kill:
return True
a = self.atr(klines, self.cfg.atr_len)
atr_ratio = (a / price) if price > 0 else 0.0
if atr_ratio >= self.cfg.atr_ratio_kill:
return True
return False
def atr_ratio_baseline(self, klines) -> float:
"""简化版ATR基准计算"""
window = min(self.cfg.vol_baseline_window, len(klines) - self.cfg.atr_len - 1)
if window <= 10: # 数据太少
logger.warning(f"数据不足计算基准: {len(klines)}根K线")
return 0.0
ratios = []
# 简化计算每隔3根K线计算一个ATR比率减少计算量
step = 3
for i in range(-window, 0, step):
if len(klines) + i < self.cfg.atr_len + 1:
continue
# 计算当前位置的ATR
start_idx = len(klines) + i - self.cfg.atr_len
end_idx = len(klines) + i
if start_idx < 0 or end_idx <= start_idx:
continue
sub_klines = klines[start_idx:end_idx]
# 确保有足够数据计算ATR
if len(sub_klines) >= self.cfg.atr_len + 1:
a = self.atr(sub_klines, self.cfg.atr_len)
price = klines[end_idx - 1]["close"]
if a > 0 and price > 0:
ratio = a / price
if 0.0001 < ratio < 0.01: # 过滤异常值
ratios.append(ratio)
if len(ratios) < 5: # 样本太少
# 尝试直接使用整个数据计算一个ATR比率
a = self.atr(klines[-60:], self.cfg.atr_len) # 使用最近60根K线
price = klines[-1]["close"]
if a > 0 and price > 0:
baseline = a / price
logger.debug(f"使用全量数据计算基准: {baseline * 100:.4f}%")
return baseline
else:
return 0.0
# 计算分位数
ratios.sort()
idx = min(len(ratios) - 1,
max(0, int(self.cfg.vol_baseline_quantile * (len(ratios) - 1))))
baseline = ratios[idx]
logger.debug(f"基准计算: 样本数={len(ratios)}, 基准={baseline * 100:.4f}%, "
f"范围=[{ratios[0] * 100:.4f}%, {ratios[-1] * 100:.4f}%]")
return baseline
def get_base_ratio_cached(self, klines) -> float:
"""获取缓存的基准波动率,定期刷新"""
now = time.time()
refresh_sec = self.cfg.base_ratio_refresh_sec
if (self._base_ratio_cached is None or
(now - self._base_ratio_ts) >= refresh_sec):
# 使用简单版本的基准计算
baseline = self.atr_ratio_baseline(klines)
if baseline > 0.0001: # 大于0.01%才认为是有效值
self._base_ratio_cached = baseline
self._base_ratio_ts = now
logger.info(f"基准波动率更新: {baseline * 100:.4f}%")
else:
# 使用基于价格的动态默认值
current_price = klines[-1]["close"] if klines else 3000
# ETH价格越高基准波动率越小百分比
if current_price > 4000:
default_baseline = 0.0010 # 0.10%
elif current_price > 3500:
default_baseline = 0.0012 # 0.12%
elif current_price > 3000:
default_baseline = 0.0015 # 0.15%
elif current_price > 2500:
default_baseline = 0.0018 # 0.18%
else:
default_baseline = 0.0020 # 0.20%
self._base_ratio_cached = default_baseline
self._base_ratio_ts = now
logger.warning(f"使用价格动态默认基准: {default_baseline * 100:.4f}% "
f"(价格=${current_price:.0f})")
return self._base_ratio_cached
@staticmethod
def _clamp(x: float, lo: float, hi: float) -> float:
"""限制数值在指定范围内"""
return max(lo, min(hi, x))
def dynamic_thresholds(self, atr_ratio: float, base_ratio: float):
"""
✅ entry/tp/sl 全部动态(修复版):
- vol_scaleatr_ratio/base_ratio 限幅
- floor方案一 (floor = clamp(min, k*base_ratio, max))
- 最终阈值max(floor, k * vol_scale * atr_ratio)
"""
# 1) 检查输入有效性
if atr_ratio <= 0:
logger.warning(f"ATR比率异常: {atr_ratio}")
atr_ratio = 0.001 # 默认值 0.1%
# 2) 如果base_ratio太小或无效使用调整后的atr_ratio
if base_ratio < 0.0005: # 小于0.05%视为无效
base_ratio = max(0.001, atr_ratio * 1.2) # 比当前ATR比率稍大
logger.debug(f"基准太小使用调整后的atr_ratio: {base_ratio * 100:.4f}%")
# 3) vol_scale计算
if base_ratio > 0:
raw_scale = atr_ratio / base_ratio
vol_scale = self._clamp(raw_scale, self.cfg.vol_scale_min, self.cfg.vol_scale_max)
logger.debug(
f"vol_scale: {raw_scale:.2f}{vol_scale:.2f} (atr={atr_ratio * 100:.3f}%, base={base_ratio * 100:.3f}%)")
else:
vol_scale = 1.0
logger.warning(f"基准无效使用默认vol_scale=1.0")
# 4) 动态floor计算
# Entry floor
entry_floor_raw = self.cfg.entry_dev_floor_base_k * base_ratio
entry_floor = self._clamp(
entry_floor_raw,
self.cfg.entry_dev_floor_min,
self.cfg.entry_dev_floor_max,
)
# TP floor
tp_floor_raw = self.cfg.tp_floor_base_k * base_ratio
tp_floor = self._clamp(
tp_floor_raw,
self.cfg.tp_floor_min,
self.cfg.tp_floor_max,
)
# SL floor
sl_floor_raw = self.cfg.sl_floor_base_k * base_ratio
sl_floor = self._clamp(
sl_floor_raw,
self.cfg.sl_floor_min,
self.cfg.sl_floor_max,
)
# 5) 最终阈值计算
entry_dev_atr_part = self.cfg.entry_k * vol_scale * atr_ratio
entry_dev = max(entry_floor, entry_dev_atr_part)
tp_atr_part = self.cfg.tp_k * vol_scale * atr_ratio
tp = max(tp_floor, tp_atr_part)
sl_atr_part = self.cfg.sl_k * vol_scale * atr_ratio
sl = max(sl_floor, sl_atr_part)
# 6) 确保entry_dev不会太小
entry_dev = max(entry_dev, self.cfg.entry_dev_floor_min)
# 7) 输出详细信息
logger.info(
f"动态阈值: atr={atr_ratio * 100:.4f}%, base={base_ratio * 100:.4f}%, "
f"vol_scale={vol_scale:.2f}, floor={entry_floor * 100:.4f}%, "
f"atr_part={entry_dev_atr_part * 100:.4f}%, 最终entry_dev={entry_dev * 100:.4f}%"
)
return entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor
# ----------------- 账户/仓位 -----------------
def get_assets_available(self) -> float:
try:
resp = self.contractAPI.get_assets_detail()[0]
if resp.get("code") != 1000:
return 0.0
data = resp.get("data")
if isinstance(data, dict):
return float(data.get("available_balance", 0))
if isinstance(data, list):
for asset in data:
if asset.get("currency") == "USDT":
return float(asset.get("available_balance", 0))
return 0.0
except Exception as e:
logger.error(f"余额查询异常: {e}")
return 0.0
def get_position_status(self) -> bool:
try:
resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0]
if resp.get("code") != 1000:
return False
positions = resp.get("data", [])
if not positions:
self.pos = 0
return True
p = positions[0]
self.pos = 1 if p["position_type"] == 1 else -1
return True
except Exception as e:
logger.error(f"持仓查询异常: {e}")
self.ding(f"持仓查询异常: {e}", error=True)
return False
def get_equity_proxy(self) -> float:
return self.get_assets_available()
def refresh_daily_baseline(self):
today = datetime.date.today()
if today != self.day_tag:
self.day_tag = today
self.day_start_equity = None
self.trading_enabled = True
self.ding(f"新的一天({today}):重置日内风控基准")
def risk_kill_switch(self):
self.refresh_daily_baseline()
equity = self.get_equity_proxy()
if equity <= 0:
return
if self.day_start_equity is None:
self.day_start_equity = equity
logger.info(f"日内权益基准设定:{equity:.2f} USDT")
return
pnl = (equity - self.day_start_equity) / self.day_start_equity
if pnl <= -self.cfg.daily_loss_limit:
self.trading_enabled = False
self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True)
if pnl >= self.cfg.daily_profit_cap:
self.trading_enabled = False
self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机")
# ----------------- 下单 -----------------
def calculate_size(self, price: float) -> int:
"""
计算仓位大小
固定每单10u保证金30倍杠杆
"""
bal = self.get_assets_available()
if bal < self.cfg.fixed_margin:
logger.warning(f"余额不足:{bal:.2f} USDT < {self.cfg.fixed_margin} USDT")
return 0
# 固定保证金10u
margin = self.cfg.fixed_margin
lev = int(self.cfg.leverage)
# ⚠️ 沿用你的原假设1张≈0.001ETH
# 仓位价值 = 保证金 * 杠杆 = 10 * 30 = 300u
# size = 仓位价值 / (价格 * 0.001)
size = int((margin * lev) / (price * 0.001))
size = max(self.cfg.min_size, size)
size = min(self.cfg.max_size, size)
logger.info(f"计算仓位:保证金={margin}u, 杠杆={lev}x, 仓位价值={margin * lev}u, size={size}")
return size
def place_market_order(self, side: int, size: int) -> bool:
"""
【下单函数】实际执行下单操作
side: 1=开多, 4=开空, 2=平多, 3=平空
"""
if size <= 0:
return False
try:
# 开多单
if side == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
logger.info(f"✅ 开多单: size={size}")
return True
# 开空单
elif side == 4:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
logger.info(f"✅ 开空单: size={size}")
return True
# 平多单(平多 = 卖出)
elif side == 2:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
time.sleep(0.3) # 等待界面响应
# 平仓时size可以设置大一些确保全部平仓
self.page.ele('x://*[@id="size_0"]').input(size, clear=True)
time.sleep(0.3)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
logger.info(f"✅ 平多单操作执行: size={size}")
return True
# 平空单(平空 = 买入)
elif side == 3:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
time.sleep(0.3) # 等待界面响应
self.page.ele('x://*[@id="size_0"]').input(size, clear=True)
time.sleep(0.3)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
logger.info(f"✅ 平空单操作执行: size={size}")
return True
else:
logger.error(f"未知的订单方向: side={side}")
return False
except Exception as e:
logger.error(f"下单异常: {e}")
self.ding(f"下单异常: {e}", error=True)
return False
def calculate_fee(self, position_value: float) -> float:
"""
计算手续费
平台手续费 = 仓位价值 * 0.0005(万分之五)
实际手续费 = 平台手续费 * (1 - 返佣比例) = 平台手续费 * 0.1
"""
platform_fee = position_value * self.cfg.platform_fee_rate
actual_fee = platform_fee * (1 - self.cfg.rebate_rate)
return actual_fee
def calculate_net_pnl(self, price: float) -> Tuple[float, float, float]:
"""
计算扣除手续费后的净盈亏
返回: (净盈亏比例, 总手续费, 毛盈亏比例)
注意:在杠杆交易中,盈亏比例 = 杠杆倍数 * 价格变动比例
例如30倍杠杆价格涨1%实际盈亏是30%
"""
if self.pos == 0 or self.entry_price is None or self.entry_margin is None or self.entry_position_value is None:
return 0.0, 0.0, 0.0
leverage = int(self.cfg.leverage)
# 计算价格变动比例
if self.pos == 1:
price_change_ratio = (price - self.entry_price) / self.entry_price
else:
price_change_ratio = (self.entry_price - price) / self.entry_price
# 计算毛盈亏比例(考虑杠杆)
gross_pnl_ratio = leverage * price_change_ratio
# 计算开仓手续费
entry_fee = self.calculate_fee(self.entry_position_value)
# 计算平仓手续费(使用当前价格计算平仓时的仓位价值)
# 平仓时的仓位价值 ≈ 开仓时的仓位价值假设size不变
exit_position_value = self.entry_position_value
exit_fee = self.calculate_fee(exit_position_value)
# 总手续费
total_fee = entry_fee + exit_fee
# 手续费相对于保证金的比率
fee_ratio = total_fee / self.entry_margin
# 净盈亏 = 毛盈亏 - 手续费比率
net_pnl_ratio = gross_pnl_ratio - fee_ratio
return net_pnl_ratio, total_fee, gross_pnl_ratio
def close_position_all(self) -> bool:
"""
平掉所有持仓
重试机制最多尝试3次每次平仓后通过SDK验证是否成功
返回: True=平仓成功, False=平仓失败
"""
if self.pos == 0:
logger.info("当前无持仓,无需平仓")
return True
max_retries = 3
retry_delay = 1.0 # 每次重试间隔1秒
for attempt in range(1, max_retries + 1):
logger.info(f"平仓尝试 {attempt}/{max_retries}...")
# 记录平仓前的持仓状态
old_pos = self.pos
# 执行平仓操作
if self.pos == 1:
# 平多单使用side=2
ok = self.place_market_order(2, 999999)
if not ok:
logger.warning(f"平多单操作失败 (尝试 {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
continue
else:
self.ding(f"平多单失败:已重试{max_retries}次仍失败", error=True)
return False
elif self.pos == -1:
# 平空单使用side=3
ok = self.place_market_order(3, 999999)
if not ok:
logger.warning(f"平空单操作失败 (尝试 {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
continue
else:
self.ding(f"平空单失败:已重试{max_retries}次仍失败", error=True)
return False
else:
logger.info("持仓状态异常,无需平仓")
return True
# 等待订单执行
time.sleep(1.5) # 等待订单执行
# 通过SDK验证是否平仓成功
verify_success = self._verify_position_closed(old_pos)
if verify_success:
# 平仓成功,清空状态
self.pos = 0
self.entry_margin = None
self.entry_position_value = None
logger.success(f"✅ 平仓成功 (尝试 {attempt}/{max_retries})")
return True
else:
# 平仓失败,准备重试
logger.warning(f"平仓验证失败,持仓仍存在 (尝试 {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
# 重新获取持仓状态
self.get_position_status()
else:
self.ding(f"平仓失败:已重试{max_retries}次,持仓仍未平掉", error=True)
return False
return False
def _verify_position_closed(self, expected_old_pos: int) -> bool:
"""
验证持仓是否已平仓
通过SDK查询持仓状态确认是否真的平仓成功
返回: True=平仓成功, False=仍有持仓
"""
try:
# 调用SDK查询持仓状态
resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0]
if resp.get("code") != 1000:
logger.warning(f"查询持仓状态失败: {resp}")
return False
positions = resp.get("data", [])
# 如果没有持仓,说明平仓成功
if not positions or len(positions) == 0:
logger.info("✅ SDK验证持仓已平仓")
return True
# 检查持仓是否还存在
# position_type: 1=多, 2=空 (根据get_position_status的逻辑)
for p in positions:
position_type = p.get("position_type", 0)
# 根据get_position_status的逻辑1=多(pos=1), 其他=空(pos=-1)
current_pos = 1 if position_type == 1 else -1
if current_pos == expected_old_pos:
# 持仓仍然存在(与平仓前的方向一致)
logger.warning(
f"⚠️ SDK验证持仓仍存在 (position_type={position_type}, 方向={'' if current_pos == 1 else ''})")
return False
# 持仓已改变或不存在(平仓成功)
logger.info("✅ SDK验证持仓已平仓或已改变")
return True
except Exception as e:
logger.error(f"验证持仓状态异常: {e}")
# 验证失败时,保守处理:认为可能未平仓
return False
# ----------------- 止损后机制 -----------------
def _reentry_penalty_active(self, dev: float, entry_dev: float) -> bool:
"""检查是否需要应用重新入场惩罚"""
if self.last_sl_dir == 0:
return False
if (time.time() - self.last_sl_ts) > self.cfg.reentry_penalty_max_sec:
self.last_sl_dir = 0
return False
reset_band = max(self.cfg.reset_band_floor, self.cfg.reset_band_k * entry_dev)
if abs(dev) <= reset_band:
self.last_sl_dir = 0
return False
return True
def _post_sl_dynamic_mult(self) -> float:
"""计算止损后SL放宽倍数"""
if self.post_sl_dir == 0:
return 1.0
if (time.time() - self.post_sl_ts) > self.cfg.post_sl_sl_max_sec:
self.post_sl_dir = 0
self.post_sl_vol_scale = 1.0
return 1.0
raw = 1.0 + self.cfg.post_sl_vol_alpha * (self.post_sl_vol_scale - 1.0)
raw = max(1.0, raw) # 不缩小止损,只放宽
return max(self.cfg.post_sl_mult_min, min(self.cfg.post_sl_mult_max, raw))
# ----------------- 交易逻辑 -----------------
def in_cooldown(self) -> bool:
"""检查是否在冷却期内"""
return (time.time() - self.last_exit_ts) < self.cfg.cooldown_sec_after_exit
def maybe_enter(self, price: float, ema_value: float, entry_dev: float):
"""
检查并执行入场
【开单入口函数】此函数负责判断是否开单以及开单方向
- 开多价格低于EMA超过阈值时
- 开空价格高于EMA超过阈值时
"""
if self.pos != 0:
return
if self.in_cooldown():
return
dev = (price - ema_value) / ema_value if ema_value else 0.0
size = self.calculate_size(price)
if size <= 0:
return
penalty_active = self._reentry_penalty_active(dev, entry_dev)
long_th = -entry_dev
short_th = entry_dev
if penalty_active:
if self.last_sl_dir == 1:
long_th = -entry_dev * self.cfg.reentry_penalty_mult
logger.info(
f"多头止损后惩罚生效: 入场阈值从 {long_th * 100:.3f}% 调整为 {(-entry_dev * self.cfg.reentry_penalty_mult) * 100:.3f}%")
elif self.last_sl_dir == -1:
short_th = entry_dev * self.cfg.reentry_penalty_mult
logger.info(
f"空头止损后惩罚生效: 入场阈值从 {short_th * 100:.3f}% 调整为 {(entry_dev * self.cfg.reentry_penalty_mult) * 100:.3f}%")
logger.info(
f"入场检查: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% "
f"(entry_dev={entry_dev * 100:.3f}%, long_th={long_th * 100:.3f}%, short_th={short_th * 100:.3f}%) "
f"size={size}, penalty={penalty_active}, last_sl_dir={self.last_sl_dir}"
)
# ========== 【开单位置1】开多单 ==========
# 方向:多(做多,买入)
# 条件价格偏离EMA向下超过阈值 (dev <= long_th)
if dev <= long_th:
if self.place_market_order(1, size): # side=1 表示开多
self.pos = 1
self.entry_price = price
self.entry_ts = time.time()
# 记录开仓信息(用于手续费计算)
self.entry_margin = self.cfg.fixed_margin
self.entry_position_value = self.entry_margin * int(self.cfg.leverage)
self.ding(
f"✅开多dev={dev * 100:.3f}% size={size} entry={price:.2f} margin={self.entry_margin}u value={self.entry_position_value}u")
# ========== 【开单位置2】开空单 ==========
# 方向:空(做空,卖出)
# 条件价格偏离EMA向上超过阈值 (dev >= short_th)
elif dev >= short_th:
if self.place_market_order(4, size): # side=4 表示开空
self.pos = -1
self.entry_price = price
self.entry_ts = time.time()
# 记录开仓信息(用于手续费计算)
self.entry_margin = self.cfg.fixed_margin
self.entry_position_value = self.entry_margin * int(self.cfg.leverage)
self.ding(
f"✅开空dev={dev * 100:.3f}% size={size} entry={price:.2f} margin={self.entry_margin}u value={self.entry_position_value}u")
def maybe_exit(self, price: float, ema_value: float, tp: float, sl: float, vol_scale: float):
"""
检查并执行出场
【平仓函数】包含四种平仓条件,所有平仓都会考虑手续费:
1. 正常平仓价格回归到EMA附近时平仓扣除手续费后仍有盈利
2. 止盈:达到止盈阈值(扣除手续费后仍有盈利)
3. 止损:达到止损阈值
4. 超时平仓:持仓时间超过限制(扣除手续费后仍有盈利)
"""
if self.pos == 0 or self.entry_price is None or self.entry_ts is None:
return
hold = time.time() - self.entry_ts
# 计算毛盈亏和净盈亏(扣除手续费后)
# calculate_net_pnl已经考虑了杠杆倍数
net_pnl, total_fee, gross_pnl = self.calculate_net_pnl(price)
# 计算价格偏离EMA的程度
dev = (price - ema_value) / ema_value if ema_value else 0.0
# 计算止损倍数(考虑止损后放宽)
sl_mult = 1.0
if self.post_sl_dir == self.pos and self.post_sl_dir != 0:
sl_mult = self._post_sl_dynamic_mult()
effective_sl = sl * sl_mult
# ========== 【平仓条件1】正常平仓价格回归到EMA附近 ==========
# 这是均值回归策略的核心当价格回归到EMA附近时说明均值回归已经完成应该平仓
# 但必须扣除手续费后仍有盈利才平仓
if abs(dev) <= self.cfg.normal_exit_threshold:
if net_pnl > 0: # 扣除手续费后仍有盈利
if self.close_position_all():
self.ding(
f"📊正常平仓价格回归EMA附近 dev={dev * 100:.3f}% "
f"毛盈亏={gross_pnl * 100:.3f}% 净盈亏={net_pnl * 100:.3f}% "
f"手续费={total_fee:.4f}u price={price:.2f}"
)
self.entry_price, self.entry_ts = None, None
self.entry_margin, self.entry_position_value = None, None
self.last_exit_ts = time.time()
return
else:
logger.error("正常平仓失败,持仓可能仍存在")
# 平仓失败时不更新状态,下次循环会继续尝试
return
# ========== 【平仓条件2】止盈 ==========
# 达到止盈阈值,且扣除手续费后仍有盈利
if gross_pnl >= tp:
if net_pnl > 0: # 扣除手续费后仍有盈利
if self.close_position_all():
self.ding(
f"🎯止盈:毛盈亏={gross_pnl * 100:.3f}% 净盈亏={net_pnl * 100:.3f}% "
f"手续费={total_fee:.4f}u price={price:.2f} tp={tp * 100:.3f}%"
)
self.entry_price, self.entry_ts = None, None
self.entry_margin, self.entry_position_value = None, None
self.last_exit_ts = time.time()
return
else:
logger.error("止盈平仓失败,持仓可能仍存在")
return
# ========== 【平仓条件3】止损 ==========
# 止损使用毛盈亏判断(因为止损是风险控制,即使扣除手续费后亏损也要止损)
if gross_pnl <= -effective_sl:
sl_dir = self.pos
if self.close_position_all():
self.ding(
f"🛑止损:毛盈亏={gross_pnl * 100:.3f}% 净盈亏={net_pnl * 100:.3f}% "
f"手续费={total_fee:.4f}u price={price:.2f} "
f"sl={sl * 100:.3f}% effective_sl={effective_sl * 100:.3f}%(×{sl_mult:.2f})",
error=True
)
self.last_sl_dir = sl_dir
self.last_sl_ts = time.time()
self.post_sl_dir = sl_dir
self.post_sl_ts = time.time()
self.post_sl_vol_scale = float(vol_scale)
self.entry_price, self.entry_ts = None, None
self.entry_margin, self.entry_position_value = None, None
self.last_exit_ts = time.time()
return
else:
logger.error("止损平仓失败,持仓可能仍存在,风险较高!")
self.ding("⚠️ 止损平仓失败,请手动检查持仓!", error=True)
# 即使平仓失败,也更新止损状态,避免重复触发
self.last_sl_dir = sl_dir
self.last_sl_ts = time.time()
return
# ========== 【平仓条件4】超时平仓 ==========
# 超时平仓也要考虑手续费,只有扣除手续费后仍有盈利才平仓
if hold >= self.cfg.max_hold_sec:
if net_pnl > 0: # 扣除手续费后仍有盈利
if self.close_position_all():
self.ding(
f"⏱超时hold={int(hold)}s 毛盈亏={gross_pnl * 100:.3f}% "
f"净盈亏={net_pnl * 100:.3f}% 手续费={total_fee:.4f}u price={price:.2f}"
)
self.entry_price, self.entry_ts = None, None
self.entry_margin, self.entry_position_value = None, None
self.last_exit_ts = time.time()
return
else:
logger.error("超时平仓失败,持仓可能仍存在")
return
else:
# 超时但扣除手续费后亏损,继续持有等待盈利
logger.debug(
f"⏱超时但净盈亏为负继续持有hold={int(hold)}s "
f"毛盈亏={gross_pnl * 100:.3f}% 净盈亏={net_pnl * 100:.3f}%"
)
def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float,
atr_ratio: float, base_ratio: float, vol_scale: float,
entry_dev: float, tp: float, sl: float,
entry_floor: float, tp_floor: float, sl_floor: float):
"""限频状态通知"""
now = time.time()
if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec:
return
self._last_status_notify_ts = now
direction_str = "" if self.pos == 1 else ("" if self.pos == -1 else "")
penalty_active = self._reentry_penalty_active(dev, entry_dev)
sl_mult = 1.0
if self.pos != 0 and self.post_sl_dir == self.pos:
sl_mult = self._post_sl_dynamic_mult()
base_age = int(now - self._base_ratio_ts) if self._base_ratio_ts else -1
# 计算当前盈亏(如果有持仓)
current_gross_pnl = 0.0
current_net_pnl = 0.0
current_fee = 0.0
if self.pos != 0 and self.entry_price:
current_net_pnl, current_fee, current_gross_pnl = self.calculate_net_pnl(price)
msg = (
f"【BitMart {self.cfg.contract_symbol}1m均值回归(动态阈值)】\n"
f"📊 状态:{direction_str}\n"
f"💰 现价:{price:.2f} | EMA{self.cfg.ema_len}{ema_value:.2f}\n"
f"📈 偏离:{dev * 100:.3f}% (入场阈值:±{entry_dev * 100:.3f}%)\n"
f"🌊 波动率ATR比={atr_ratio * 100:.3f}% | 基准={base_ratio * 100:.3f}% | 缩放={vol_scale:.2f}\n"
f"🎯 动态Floor入场={entry_floor * 100:.3f}% | 止盈={tp_floor * 100:.3f}% | 止损={sl_floor * 100:.3f}%\n"
f"💰 止盈/止损:{tp * 100:.3f}% / {sl * 100:.3f}% (盈亏比:{tp / sl:.2f})\n"
f"📊 正常平仓阈值:±{self.cfg.normal_exit_threshold * 100:.3f}%\n"
f"🔄 基准刷新:{self.cfg.base_ratio_refresh_sec}s (已过={base_age}s)\n"
f"⚠️ 止损同向加门槛:{'开启' if penalty_active else '关闭'} (方向={self.last_sl_dir})\n"
f"💳 可用余额:{bal:.2f} USDT | 杠杆:{self.cfg.leverage}x | 固定保证金:{self.cfg.fixed_margin}u\n"
)
if self.pos != 0 and self.entry_price is not None and self.entry_position_value is not None:
msg += (
f"📈 当前盈亏:毛盈亏={current_gross_pnl * 100:.3f}% | "
f"净盈亏={current_net_pnl * 100:.3f}% | "
f"手续费={current_fee:.4f}u\n"
f"📍 入场价:{self.entry_price:.2f} | "
f"仓位价值:{self.entry_position_value:.2f}u\n"
)
msg += f"⏱️ 持仓限制:{self.cfg.max_hold_sec}s | 冷却:{self.cfg.cooldown_sec_after_exit}s"
self.ding(msg)
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def take_over_browser(self):
"""接管浏览器"""
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
return True
except:
return False
def close_extra_tabs(self):
"""关闭多余 tab"""
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click()
return True
except:
return False
def 平仓(self):
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价最多或者做空1是做多-1是做空
limitPriceShortOrder 限价最多或者做空
"""
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def action(self):
"""主循环"""
if not self.set_leverage():
self.ding("杠杆设置失败,停止运行", error=True)
return
# 1. 打开浏览器
if not self.openBrowser():
self.ding("打开 TGE 失败!", error=True)
return
logger.info("TGE 端口获取成功")
# # 2. 接管浏览器
# if not self.take_over_browser():
# self.ding("接管浏览器失败!", error=True)
# return
logger.info("浏览器接管成功")
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
while True:
now_dt = datetime.datetime.now()
self.pbar.n = now_dt.second
self.pbar.refresh()
# 1. 获取K线数据
klines = self.get_klines_cached()
if not klines or len(klines) < (self.cfg.ema_len + 5):
logger.warning("K线数据不足等待...")
time.sleep(1)
continue
# 2. 计算技术指标
last_k = klines[-1]
closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]]
ema_value = self.ema(closes, self.cfg.ema_len)
price = self.get_last_price(fallback_close=float(last_k["close"]))
dev = (price - ema_value) / ema_value if ema_value else 0.0
# 3. 计算波动率相关指标
a = self.atr(klines, self.cfg.atr_len)
atr_ratio = (a / price) if price > 0 else 0.0
base_ratio = self.get_base_ratio_cached(klines)
# 4. 计算动态阈值
entry_dev, tp, sl, vol_scale, entry_floor, tp_floor, sl_floor = self.dynamic_thresholds(
atr_ratio, base_ratio
)
# 记录调试信息
logger.debug(
f"循环数据: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}%, "
f"atr_ratio={atr_ratio * 100:.3f}%, base_ratio={base_ratio * 100:.3f}%, "
f"entry_dev={entry_dev * 100:.3f}%"
)
# 5. 风控检查
self.risk_kill_switch()
# 6. 获取持仓状态
if not self.get_position_status():
time.sleep(1)
continue
# 7. 检查交易是否启用
if not self.trading_enabled:
if self.pos != 0:
if self.close_position_all():
logger.info("交易被禁用,已平仓")
else:
logger.error("交易被禁用,但平仓失败,请手动检查!")
self.ding("⚠️ 交易被禁用但平仓失败,请手动检查持仓!", error=True)
logger.warning("交易被禁用(风控触发),等待...")
time.sleep(5)
continue
# 8. 检查危险市场
if self.is_danger_market(klines, price):
logger.warning("危险模式:高波动/大实体K暂停开仓")
self.maybe_exit(price, ema_value, tp, sl, vol_scale)
time.sleep(self.cfg.tick_refresh_sec)
continue
# 9. 执行交易逻辑
# 先检查平仓(包括正常平仓、止盈、止损)
self.maybe_exit(price, ema_value, tp, sl, vol_scale)
# 再检查开仓
self.maybe_enter(price, ema_value, entry_dev)
# 10. 状态通知
bal = self.get_assets_available()
self.notify_status_throttled(
price, ema_value, dev, bal,
atr_ratio, base_ratio, vol_scale,
entry_dev, tp, sl,
entry_floor, tp_floor, sl_floor
)
time.sleep(self.cfg.tick_refresh_sec)
if __name__ == "__main__":
"""
Windows PowerShell:
setx BITMART_API_KEY "你的key"
setx BITMART_SECRET_KEY "你的secret"
setx BITMART_MEMO "合约交易"
重新打开终端再运行。
Linux/macOS:
export BITMART_API_KEY="你的key"
export BITMART_SECRET_KEY="你的secret"
export BITMART_MEMO "合约交易"
"""
cfg = StrategyConfig()
bot = BitmartFuturesMeanReversionBot(cfg, bit_id="f2320f57e24c45529a009e1541e25961")
# 设置日志级别为INFO以便查看详细计算过程
logger.remove()
logger.add(lambda msg: tqdm.write(msg, end=""), level="INFO")
try:
bot.action()
except KeyboardInterrupt:
logger.info("程序被用户中断")
bot.ding("🤖 策略已手动停止")
except Exception as e:
logger.error(f"程序异常退出: {e}")
bot.ding(f"❌ 策略异常退出: {e}", error=True)
raise