From afb26ced5daec2c7d9be2c7610fc351a86d916e9 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Fri, 6 Feb 2026 11:32:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=89=8D=E6=94=B9=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bitmart/保守模式参数优化/README.md | 27 +- .../四分之一,五分钟,反手条件充足_保守模式.py | 387 +++++++++++++----- 2 files changed, 305 insertions(+), 109 deletions(-) diff --git a/bitmart/保守模式参数优化/README.md b/bitmart/保守模式参数优化/README.md index b8828f0..b8610a4 100644 --- a/bitmart/保守模式参数优化/README.md +++ b/bitmart/保守模式参数优化/README.md @@ -72,7 +72,30 @@ BITMART_PARAMS_PATH="/absolute/path/to/current_params.json" \ python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py" ``` -## 4. 费用模型说明 +## 4. 实时价格(WebSocket) + +保守模式脚本已支持: +- WebSocket 实时价优先 +- 自动重连 +- 超时自动回退 API 价格 + +默认 WebSocket 订阅: +- `wss://openapi-ws-v2.bitmart.com/api?protocol=1.1` +- topic: `futures/ticker:ETHUSDT` + +如果你本机没有 WebSocket 依赖,会自动回退 API。安装方式: + +```bash +pip3 install websocket-client +``` + +可选环境变量: +- `BITMART_WS_ENABLED=0`:禁用 WebSocket +- `BITMART_WS_URL=...`:自定义 WS 地址 +- `BITMART_WS_TOPIC=...`:自定义订阅 topic +- `BITMART_WS_PRICE_TTL=2.0`:价格新鲜度阈值(秒) + +## 5. 费用模型说明 优化器会按下面公式计入手续费返佣: @@ -87,7 +110,7 @@ python3 "/Users/ddrwode/code/lm_code/bitmart/保守模式参数优化/四分之 - `--raw-fee-rate` - `--rebate-ratio` -## 5. 重要提示 +## 6. 重要提示 - 回测撮合属于简化模型,不等于实盘撮合。 - 参数应周期性重训(例如每天或每周)。 diff --git a/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py b/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py index f2760e8..02abdf7 100644 --- a/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py +++ b/bitmart/保守模式参数优化/四分之一,五分钟,反手条件充足_保守模式.py @@ -1,6 +1,7 @@ import json import os import sys +import threading import time from pathlib import Path @@ -16,6 +17,11 @@ from DrissionPage import ChromiumOptions from bitmart.api_contract import APIContract +try: + import websocket +except Exception: + websocket = None + class BitmartFuturesTransactionConservative: def __init__(self, bit_id): @@ -72,6 +78,20 @@ class BitmartFuturesTransactionConservative: self.prev_entity = None # 上一根K线实体大小 self.current_open = None # 当前K线开盘价 + # WebSocket 实时价格(优先使用,失败自动回退 API) + self.ws_enabled = os.getenv("BITMART_WS_ENABLED", "1").strip().lower() not in {"0", "false", "off", "no"} + self.ws_url = os.getenv("BITMART_WS_URL", "wss://openapi-ws-v2.bitmart.com/api?protocol=1.1") + self.ws_topic = os.getenv("BITMART_WS_TOPIC", f"futures/ticker:{self.contract_symbol}") + self.ws_price_ttl_seconds = float(os.getenv("BITMART_WS_PRICE_TTL", "2.0")) + self.ws_reconnect_seconds = float(os.getenv("BITMART_WS_RECONNECT_SECONDS", "2.0")) + self.ws_last_price = None + self.ws_last_price_time = 0.0 + self.ws_last_stale_log_time = 0.0 + self.ws_app = None + self.ws_thread = None + self.ws_stop_event = threading.Event() + self.ws_lock = threading.Lock() + # 启动时尝试读取动态参数(可由优化脚本自动生成) self.load_runtime_params() @@ -134,6 +154,142 @@ class BitmartFuturesTransactionConservative: except Exception as e: logger.error(f"加载动态参数文件失败: {e} | path={params_path}") + def _safe_float(self, value): + try: + f = float(value) + if f > 0: + return f + return None + except Exception: + return None + + def _extract_price_from_ws_payload(self, payload): + """ + 尽量从 WS 消息中提取价格,兼容不同字段结构。 + """ + key_priority = ("last_price", "last", "price", "close_price", "close", "mark_price") + stack = [payload] + while stack: + node = stack.pop(0) + if isinstance(node, dict): + for key in key_priority: + price = self._safe_float(node.get(key)) + if price is not None: + return price + + bid = self._safe_float(node.get("best_bid_price") or node.get("bid_price") or node.get("bid")) + ask = self._safe_float(node.get("best_ask_price") or node.get("ask_price") or node.get("ask")) + if bid is not None and ask is not None: + return (bid + ask) / 2.0 + + for value in node.values(): + if isinstance(value, (dict, list)): + stack.append(value) + elif isinstance(node, list): + for value in node: + if isinstance(value, (dict, list)): + stack.append(value) + return None + + def _on_ws_open(self, ws): + subscribe_msg = {"action": "subscribe", "args": [self.ws_topic]} + ws.send(json.dumps(subscribe_msg)) + logger.success(f"WebSocket 已连接并订阅: {self.ws_topic}") + + def _on_ws_message(self, ws, message): + try: + if isinstance(message, bytes): + message = message.decode("utf-8", errors="ignore") + if not message: + return + if message == "pong": + return + if message == "ping": + try: + ws.send("pong") + except Exception: + pass + return + + payload = json.loads(message) + price = self._extract_price_from_ws_payload(payload) + if price is not None: + with self.ws_lock: + self.ws_last_price = price + self.ws_last_price_time = time.time() + except Exception: + return + + def _on_ws_error(self, ws, error): + if not self.ws_stop_event.is_set(): + logger.warning(f"WebSocket 价格流异常: {error}") + + def _on_ws_close(self, ws, close_status_code, close_msg): + if not self.ws_stop_event.is_set(): + logger.warning(f"WebSocket 价格流断开: code={close_status_code}, msg={close_msg}") + + def _ws_price_loop(self): + while not self.ws_stop_event.is_set(): + try: + self.ws_app = websocket.WebSocketApp( + self.ws_url, + on_open=self._on_ws_open, + on_message=self._on_ws_message, + on_error=self._on_ws_error, + on_close=self._on_ws_close, + ) + self.ws_app.run_forever(ping_interval=20, ping_timeout=10) + except Exception as e: + if not self.ws_stop_event.is_set(): + logger.warning(f"WebSocket 启动/运行失败: {e}") + finally: + self.ws_app = None + + if not self.ws_stop_event.is_set(): + time.sleep(self.ws_reconnect_seconds) + + def start_price_stream(self): + if not self.ws_enabled: + logger.info("WebSocket 实时价格已禁用(BITMART_WS_ENABLED=0)") + return False + if websocket is None: + logger.warning("未安装 websocket-client,实时价格不可用,将回退到 API 轮询") + return False + if self.ws_thread and self.ws_thread.is_alive(): + return True + + self.ws_stop_event.clear() + self.ws_thread = threading.Thread(target=self._ws_price_loop, name="bitmart-price-ws", daemon=True) + self.ws_thread.start() + return True + + def stop_price_stream(self): + self.ws_stop_event.set() + if self.ws_app is not None: + try: + self.ws_app.close() + except Exception: + pass + if self.ws_thread and self.ws_thread.is_alive(): + self.ws_thread.join(timeout=3) + + def get_ws_price(self): + with self.ws_lock: + price = self.ws_last_price + ts = self.ws_last_price_time + if price is None: + return None + + age = time.time() - ts + if age <= self.ws_price_ttl_seconds: + return price + + now = time.time() + if now - self.ws_last_stale_log_time > 30: + logger.info(f"WebSocket 价格超时({age:.1f}s),回退 API 获取价格") + self.ws_last_stale_log_time = now + return None + def get_klines(self): """获取最近2根K线(当前K线和上一根K线)""" try: @@ -169,6 +325,10 @@ class BitmartFuturesTransactionConservative: def get_current_price(self): """获取当前最新价格""" + ws_price = self.get_ws_price() + if ws_price is not None: + return ws_price + try: end_time = int(time.time()) response = self.contractAPI.get_kline( @@ -648,136 +808,149 @@ class BitmartFuturesTransactionConservative: logger.error("杠杆设置失败,程序继续运行但可能下单失败") return + if self.start_price_stream(): + logger.success("实时价格流已启动(WebSocket 优先)") + else: + logger.warning("实时价格流未启动,当前将使用 API 轮询价格") + page_start = True - while True: + try: + while True: - if page_start: - # 打开浏览器 - for i in range(5): - if self.openBrowser(): - logger.info("浏览器打开成功") - break - else: - self.ding("打开浏览器失败!", error=True) - return + if page_start: + # 打开浏览器 + for i in range(5): + if self.openBrowser(): + logger.info("浏览器打开成功") + break + else: + self.ding("打开浏览器失败!", error=True) + return - # 进入交易页面 - self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") - self.click_safe('x://button[normalize-space(text()) ="市价"]') + # 进入交易页面 + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + self.click_safe('x://button[normalize-space(text()) ="市价"]') - self.page.ele('x://*[@id="size_0"]').input(vals=25, clear=True) + self.page.ele('x://*[@id="size_0"]').input(vals=25, clear=True) - page_start = False + page_start = False - try: - # 1. 获取K线数据(当前K线和上一根K线) - prev_kline, current_kline = self.get_klines() - if not prev_kline or not current_kline: - logger.warning("获取K线失败,等待重试...") - time.sleep(5) - continue + try: + # 1. 获取K线数据(当前K线和上一根K线) + prev_kline, current_kline = self.get_klines() + if not prev_kline or not current_kline: + logger.warning("获取K线失败,等待重试...") + time.sleep(5) + continue - # 记录进入新的K线 - current_kline_time = current_kline['id'] - if self.last_kline_time != current_kline_time: - self.last_kline_time = current_kline_time - logger.info(f"进入新K线: {current_kline_time}") + # 记录进入新的K线 + current_kline_time = current_kline['id'] + if self.last_kline_time != current_kline_time: + self.last_kline_time = current_kline_time + logger.info(f"进入新K线: {current_kline_time}") - # 2. 获取当前价格 - current_price = self.get_current_price() - if not current_price: - logger.warning("获取价格失败,等待重试...") - time.sleep(2) - continue + # 2. 获取当前价格 + current_price = self.get_current_price() + if not current_price: + logger.warning("获取价格失败,等待重试...") + time.sleep(2) + continue - # 3. 每次循环都通过SDK获取真实持仓状态(避免状态不同步导致双向持仓) - if not self.get_position_status(): - logger.warning("获取持仓状态失败,等待重试...") - time.sleep(2) - continue + # 3. 每次循环都通过SDK获取真实持仓状态(避免状态不同步导致双向持仓) + if not self.get_position_status(): + logger.warning("获取持仓状态失败,等待重试...") + time.sleep(2) + continue - logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)") + logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)") - # 3.5 止损/止盈/保本锁盈/移动止损 - if self.start != 0: - pnl_usd = self.get_unrealized_pnl_usd() - if pnl_usd is not None: - # 固定止损:亏损达到 stop_loss_usd 平仓 - if pnl_usd <= self.stop_loss_usd: - logger.info(f"仓位亏损 {pnl_usd:.2f} 美元 <= 止损 {self.stop_loss_usd} 美元,执行止损平仓") - self.平仓() - self.max_unrealized_pnl_seen = None - time.sleep(3) - continue - # 更新持仓期间最大盈利(用于移动止损) - if self.max_unrealized_pnl_seen is None: - self.max_unrealized_pnl_seen = pnl_usd - else: - self.max_unrealized_pnl_seen = max(self.max_unrealized_pnl_seen, pnl_usd) - # 保本锁盈:盈利达到 break_even_activation_usd 后,回落到 floor 即平仓 - if self.max_unrealized_pnl_seen >= self.break_even_activation_usd and pnl_usd <= self.break_even_floor_usd: - logger.info( - f"保本锁盈:最高盈利 {self.max_unrealized_pnl_seen:.2f} >= {self.break_even_activation_usd}," - f"当前盈利回落到 {pnl_usd:.2f} <= {self.break_even_floor_usd},执行平仓" - ) - self.平仓() - self.max_unrealized_pnl_seen = None - time.sleep(3) - continue - # 移动止损:盈利曾达到 activation 后,从最高盈利回撤 trailing_distance 则平仓 - if self.max_unrealized_pnl_seen >= self.trailing_activation_usd: - if pnl_usd < self.max_unrealized_pnl_seen - self.trailing_distance_usd: - logger.info(f"移动止损:当前盈利 {pnl_usd:.2f} 从最高 {self.max_unrealized_pnl_seen:.2f} 回撤 >= {self.trailing_distance_usd} 美元,平仓") + # 3.5 止损/止盈/保本锁盈/移动止损 + if self.start != 0: + pnl_usd = self.get_unrealized_pnl_usd() + if pnl_usd is not None: + # 固定止损:亏损达到 stop_loss_usd 平仓 + if pnl_usd <= self.stop_loss_usd: + logger.info(f"仓位亏损 {pnl_usd:.2f} 美元 <= 止损 {self.stop_loss_usd} 美元,执行止损平仓") + self.平仓() + self.max_unrealized_pnl_seen = None + time.sleep(3) + continue + # 更新持仓期间最大盈利(用于移动止损) + if self.max_unrealized_pnl_seen is None: + self.max_unrealized_pnl_seen = pnl_usd + else: + self.max_unrealized_pnl_seen = max(self.max_unrealized_pnl_seen, pnl_usd) + # 保本锁盈:盈利达到 break_even_activation_usd 后,回落到 floor 即平仓 + if self.max_unrealized_pnl_seen >= self.break_even_activation_usd and pnl_usd <= self.break_even_floor_usd: + logger.info( + f"保本锁盈:最高盈利 {self.max_unrealized_pnl_seen:.2f} >= {self.break_even_activation_usd}," + f"当前盈利回落到 {pnl_usd:.2f} <= {self.break_even_floor_usd},执行平仓" + ) + self.平仓() + self.max_unrealized_pnl_seen = None + time.sleep(3) + continue + # 移动止损:盈利曾达到 activation 后,从最高盈利回撤 trailing_distance 则平仓 + if self.max_unrealized_pnl_seen >= self.trailing_activation_usd: + if pnl_usd < self.max_unrealized_pnl_seen - self.trailing_distance_usd: + logger.info(f"移动止损:当前盈利 {pnl_usd:.2f} 从最高 {self.max_unrealized_pnl_seen:.2f} 回撤 >= {self.trailing_distance_usd} 美元,平仓") + self.平仓() + self.max_unrealized_pnl_seen = None + time.sleep(3) + continue + # 止盈:盈利达到 take_profit_usd 平仓 + if pnl_usd >= self.take_profit_usd: + logger.info(f"仓位盈利 {pnl_usd:.2f} 美元 >= {self.take_profit_usd} 美元,执行止盈平仓") self.平仓() self.max_unrealized_pnl_seen = None time.sleep(3) continue - # 止盈:盈利达到 take_profit_usd 平仓 - if pnl_usd >= self.take_profit_usd: - logger.info(f"仓位盈利 {pnl_usd:.2f} 美元 >= {self.take_profit_usd} 美元,执行止盈平仓") - self.平仓() - self.max_unrealized_pnl_seen = None - time.sleep(3) - continue - # 4. 检查信号 - signal = self.check_signal(current_price, prev_kline, current_kline) + # 4. 检查信号 + signal = self.check_signal(current_price, prev_kline, current_kline) - # 5. 反手过滤:冷却时间 + 最小价差 - if signal and signal[0].startswith('reverse_'): - if not self.can_reverse(current_price, signal[1]): - signal = None + # 5. 反手过滤:冷却时间 + 最小价差 + if signal and signal[0].startswith('reverse_'): + if not self.can_reverse(current_price, signal[1]): + signal = None - # 5.5 开仓频率过滤:同一根 K 线只开一次 + 开仓冷却 - if signal and signal[0] in ('long', 'short'): - if not self.can_open(current_kline_time): - signal = None - else: - self._current_kline_id_for_open = current_kline_time # 供 execute_trade 成功后记录 + # 5.5 开仓频率过滤:同一根 K 线只开一次 + 开仓冷却 + if signal and signal[0] in ('long', 'short'): + if not self.can_open(current_kline_time): + signal = None + else: + self._current_kline_id_for_open = current_kline_time # 供 execute_trade 成功后记录 - # 6. 有信号则执行交易 - if signal: - trade_success = self.execute_trade(signal) - if trade_success: - logger.success(f"交易执行完成: {signal[0]}, 当前持仓状态: {self.start}") - page_start = True - else: - logger.warning(f"交易执行失败或被阻止: {signal[0]}") + # 6. 有信号则执行交易 + if signal: + trade_success = self.execute_trade(signal) + if trade_success: + logger.success(f"交易执行完成: {signal[0]}, 当前持仓状态: {self.start}") + page_start = True + else: + logger.warning(f"交易执行失败或被阻止: {signal[0]}") - # 短暂等待后继续循环(同一根K线遇到信号就操作) - time.sleep(0.1) + # 短暂等待后继续循环(同一根K线遇到信号就操作) + time.sleep(0.1) - if page_start: - self.page.close() + if page_start and self.page: + self.page.close() + time.sleep(5) + + except KeyboardInterrupt: + logger.info("用户中断,程序退出") + break + except Exception as e: + logger.error(f"主循环异常: {e}") time.sleep(5) - - except KeyboardInterrupt: - logger.info("用户中断,程序退出") - break - except Exception as e: - logger.error(f"主循环异常: {e}") - time.sleep(5) + finally: + self.stop_price_stream() + if self.page: + try: + self.page.close() + except Exception: + pass if __name__ == '__main__':