# -*- coding: utf-8 -*- """ BitMart 基于开盘价的五分之一策略交易 策略规则(与 1111 一致): 1. 触发价格计算(基于前一根有效K线,实体>=0.1): - 做多触发价 = 当前K线开盘价 + 实体/5 - 做空触发价 = 当前K线开盘价 - 实体/5 2. 信号触发条件: - 当前K线最高价 >= 做多触发价 → 做多信号 - 当前K线最低价 <= 做空触发价 → 做空信号 3. 前1分30秒反手(若已有持仓): - 3分钟K线的前1分30秒内若出现反手信号,则平仓开反手 - 持空反手做多:价格涨到 开仓价 + 前一根实体/5 - 持多反手做空:价格跌到 开仓价 - 前一根实体/5 运行方式(在项目根目录 lm_code 下): python open_fifth_strategy/main.py """ import sys from pathlib import Path # 确保项目根目录在路径中 _root = Path(__file__).resolve().parent.parent if str(_root) not in sys.path: sys.path.insert(0, str(_root)) import random import time from concurrent.futures import ThreadPoolExecutor from loguru import logger from bit_tools import openBrowser from DrissionPage import ChromiumPage from DrissionPage import ChromiumOptions from bitmart.api_contract import APIContract from 交易.tools import send_dingtalk_message from open_fifth_strategy.config import ( API_KEY, SECRET_KEY, MEMO, CONTRACT_SYMBOL, KLINE_STEP, MIN_BODY_SIZE, CHECK_INTERVAL, LEVERAGE, OPEN_TYPE, RISK_PERCENT, REVERSE_PRICE_TOLERANCE, REVERSE_WINDOW_1M_BARS, BIT_ID, ) ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk") class OpenBasedFifthStrategy: """基于开盘价的五分之一策略""" def __init__(self, bit_id=None): self.page = None self.api_key = API_KEY self.secret_key = SECRET_KEY self.memo = MEMO self.contract_symbol = CONTRACT_SYMBOL self.contractAPI = APIContract( self.api_key, self.secret_key, self.memo, timeout=(5, 15) ) self.start = 0 # 持仓: -1空, 0无, 1多 self.open_avg_price = None self.current_amount = None self.position_cross = None self.bit_id = bit_id or BIT_ID self.min_body_size = MIN_BODY_SIZE self.kline_step = KLINE_STEP self.check_interval = CHECK_INTERVAL self.leverage = LEVERAGE self.open_type = OPEN_TYPE self.risk_percent = RISK_PERCENT self.reverse_price_tolerance = REVERSE_PRICE_TOLERANCE self.reverse_window_1m_bars = REVERSE_WINDOW_1M_BARS self.last_trigger_kline_id = None self.last_trigger_direction = None self.last_trade_kline_id = None self.entry_prev_body = None self.entry_price = None self.entry_kline_id = None self.early_reverse_executed = False # ==================== 策略核心(基于开盘价)==================== def get_body_size(self, candle): return abs(float(candle["open"]) - float(candle["close"])) def find_valid_prev_bar(self, all_data, current_idx): """找前一根有效K线(实体>=min_body_size)""" if current_idx <= 0: return None, None for i in range(current_idx - 1, -1, -1): prev = all_data[i] if self.get_body_size(prev) >= self.min_body_size: return i, prev return None, None def get_open_based_levels(self, prev, curr_open): """ 基于当前K线开盘价计算触发价(与1111一致) 做多触发 = 当前K线开盘价 + 实体/5 做空触发 = 当前K线开盘价 - 实体/5 """ body = self.get_body_size(prev) if body < 0.001: return None, None curr_o = float(curr_open) long_trigger = curr_o + body / 5 short_trigger = curr_o - body / 5 return long_trigger, short_trigger def get_1m_bars_for_3m_bar(self, bar_3m): """获取当前3分钟K线对应的3根1分钟K线""" try: start_ts = int(bar_3m["id"]) end_ts = start_ts + 3 * 60 response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=1, start_time=start_ts, end_time=end_ts, )[0] if response.get("code") != 1000: return [] data = response.get("data", []) out = [] for k in data: out.append( { "id": int(k["timestamp"]), "open": float(k["open_price"]), "high": float(k["high_price"]), "low": float(k["low_price"]), "close": float(k["close_price"]), } ) out.sort(key=lambda x: x["id"]) return out except Exception as e: logger.warning(f"获取1分钟K线失败: {e}") return [] def determine_trigger_order_by_1m(self, bars_1m, long_trigger, short_trigger): """用1分钟K线判断先触发做多还是做空""" if not bars_1m: return None for bar in bars_1m: high = float(bar["high"]) low = float(bar["low"]) open_price = float(bar["open"]) long_ok = high >= long_trigger short_ok = low <= short_trigger if long_ok and not short_ok: return "long" if short_ok and not long_ok: return "short" if long_ok and short_ok: d_long = abs(long_trigger - open_price) d_short = abs(short_trigger - open_price) return "short" if d_short < d_long else "long" return None def check_early_reverse_signal(self, curr_kline, kline_data): """ 前1分30秒反手检测(与1111一致): - 3分钟K线的「前1分30秒」内若出现反手信号 → 平仓开反手 - 持空反手做多:价格涨到 开仓价 + 前一根实体/5 - 持多反手做空:价格跌到 开仓价 - 前一根实体/5 - 使用前 N 根1分钟K线近似(REVERSE_WINDOW_1M_BARS=2 覆盖约 0:00~1:30) """ if self.start == 0: return None, None curr_kline_id = curr_kline["id"] if self.entry_kline_id != curr_kline_id: self.early_reverse_executed = False self.entry_kline_id = curr_kline_id if self.early_reverse_executed: return None, None entry_price = self.entry_price if entry_price is None and self.open_avg_price: entry_price = float(self.open_avg_price) if entry_price is None: return None, None _, valid_prev = self.find_valid_prev_bar( kline_data, len(kline_data) - 1 ) if valid_prev is None: return None, None prev_body = self.get_body_size(valid_prev) reverse_offset = prev_body / 5 bars_1m = self.get_1m_bars_for_3m_bar(curr_kline) n_bars = min(self.reverse_window_1m_bars, len(bars_1m)) if n_bars < 1: return None, None # 遍历前 N 根1分钟K线(覆盖前1分30秒),任一出现反手信号则触发 for i in range(n_bars): bar = bars_1m[i] bar_high = float(bar["high"]) bar_low = float(bar["low"]) bar_close = float(bar["close"]) if self.start == -1: reverse_long_trigger = entry_price + reverse_offset if bar_high >= reverse_long_trigger: if bar_close >= reverse_long_trigger - self.reverse_price_tolerance: return "long", reverse_long_trigger elif self.start == 1: reverse_short_trigger = entry_price - reverse_offset if bar_low <= reverse_short_trigger: if bar_close <= reverse_short_trigger + self.reverse_price_tolerance: return "short", reverse_short_trigger return None, None def check_realtime_trigger(self, kline_data, current_position=0): """ 实时检测信号 - 无仓位:基于当前K线开盘价计算触发价 - 有仓位(反手):基于开仓价计算触发价(1111:开仓价±前一根实体/5) 返回:(方向, 触发价, 有效前一根, 当前K线) 或 (None,...) """ if len(kline_data) < 2: return None, None, None, None curr = kline_data[-1] curr_kline_id = curr["id"] curr_open = curr["open"] valid_prev_idx, prev = self.find_valid_prev_bar( kline_data, len(kline_data) - 1 ) if prev is None: return None, None, None, None prev_body = self.get_body_size(prev) reverse_offset = prev_body / 5 # 有仓位时反手用开仓价(1111),无仓位用当前K线开盘价 if current_position != 0: entry = self.entry_price or (float(self.open_avg_price) if self.open_avg_price else None) if entry is not None: long_trigger = entry + reverse_offset short_trigger = entry - reverse_offset else: long_trigger, short_trigger = self.get_open_based_levels(prev, curr_open) else: long_trigger, short_trigger = self.get_open_based_levels(prev, curr_open) if long_trigger is None: return None, None, None, None c_high = float(curr["high"]) c_low = float(curr["low"]) c_close = float(curr["close"]) long_triggered = c_high >= long_trigger short_triggered = c_low <= short_trigger direction = None trigger_price = None if current_position == 1: if short_triggered and c_close <= short_trigger + self.reverse_price_tolerance: direction = "short" trigger_price = short_trigger elif current_position == -1: if long_triggered and c_close >= long_trigger - self.reverse_price_tolerance: direction = "long" trigger_price = long_trigger else: if long_triggered and short_triggered: bars_1m = self.get_1m_bars_for_3m_bar(curr) if bars_1m: direction = self.determine_trigger_order_by_1m( bars_1m, long_trigger, short_trigger ) trigger_price = ( long_trigger if direction == "long" else short_trigger ) if direction is None: c_open_f = float(curr["open"]) d_long = abs(long_trigger - c_open_f) d_short = abs(short_trigger - c_open_f) direction = "short" if d_short <= d_long else "long" trigger_price = ( long_trigger if direction == "long" else short_trigger ) elif short_triggered: direction = "short" trigger_price = short_trigger elif long_triggered: direction = "long" trigger_price = long_trigger if direction is None: return None, None, None, None if ( self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction ): return None, None, None, None return direction, trigger_price, prev, curr # ==================== BitMart API ==================== def get_klines(self): try: end_time = int(time.time()) response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=self.kline_step, start_time=end_time - 3600 * 3, end_time=end_time, )[0]["data"] formatted = [] for k in response: formatted.append( { "id": int(k["timestamp"]), "open": float(k["open_price"]), "high": float(k["high_price"]), "low": float(k["low_price"]), "close": float(k["close_price"]), } ) formatted.sort(key=lambda x: x["id"]) return formatted except Exception as e: if "429" in str(e) or "too many requests" in str(e).lower(): logger.warning(f"API限流,等待60秒: {e}") time.sleep(60) else: logger.error(f"获取K线异常: {e}") self.ding("获取K线异常", error=True) return None def get_available_balance(self): try: response = self.contractAPI.get_assets_detail()[0] if response["code"] == 1000: data = response["data"] if isinstance(data, dict): return float(data.get("available_balance", 0)) for asset in data: if asset.get("currency") == "USDT": return float(asset.get("available_balance", 0)) return None except Exception as e: logger.error(f"余额查询异常: {e}") return None def get_position_status(self): try: response = self.contractAPI.get_position( contract_symbol=self.contract_symbol )[0] if response["code"] == 1000: positions = response["data"] if not positions: self.start = 0 self.open_avg_price = None self.current_amount = None self.position_cross = None return True self.start = 1 if positions[0]["position_type"] == 1 else -1 self.open_avg_price = positions[0]["open_avg_price"] self.current_amount = positions[0]["current_amount"] self.position_cross = positions[0]["position_cross"] return True return False except Exception as e: logger.error(f"持仓查询异常: {e}") return False def set_leverage(self): try: response = self.contractAPI.post_submit_leverage( contract_symbol=self.contract_symbol, leverage=self.leverage, open_type=self.open_type, )[0] if response["code"] == 1000: logger.success(f"全仓 {self.leverage}x 杠杆设置成功") return True logger.error(f"杠杆设置失败: {response}") return False except Exception as e: logger.error(f"设置杠杆异常: {e}") return False # ==================== 浏览器 ==================== def _open_browser(self): try: bit_port = openBrowser(id=self.bit_id) co = ChromiumOptions() co.set_local_port(port=bit_port) self.page = ChromiumPage(addr_or_opts=co) return True except Exception: return False def close_extra_tabs(self): try: for idx, tab in enumerate(self.page.get_tabs()): if idx > 0: tab.close() return True except Exception: return False def click_safe(self, xpath, sleep=0.5): try: ele = self.page.ele(xpath) if not ele: return False ele.scroll.to_see(center=True) time.sleep(sleep) ele.click(by_js=True) return True except Exception: return False def 平仓(self): logger.info("执行平仓...") self.click_safe('x://span[normalize-space(text()) ="市价"]') time.sleep(0.5) self.ding("执行平仓操作") def 开单(self, marketPriceLongOrder=0, size=None): if size is None or size <= 0: logger.warning("开单金额无效") return False direction_str = "做多" if marketPriceLongOrder == 1 else "做空" logger.info(f"执行{direction_str},金额: {size}") size = max(1, min(25, int(size))) # 限制单次下单金额 1~25 try: self.click_safe('x://button[normalize-space(text()) ="市价"]') self.page.ele('x://*[@id="size_0"]').input(str(size)) if marketPriceLongOrder == -1: self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') else: self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') self.ding(f"执行{direction_str},金额: {size}") return True except Exception as e: logger.error(f"开单异常: {e}") return False def ding(self, msg, error=False): prefix = "❌开盘价五分之一:" if error else "🔔开盘价五分之一:" full_msg = f"{prefix}{msg}" if error: logger.error(msg) for _ in range(3): ding_executor.submit(self._send_ding_safe, full_msg) else: logger.info(msg) ding_executor.submit(self._send_ding_safe, full_msg) def _send_ding_safe(self, msg): try: send_dingtalk_message(msg) except Exception as e: logger.warning(f"消息发送失败: {e}") def _send_position_message(self, latest_kline): current_price = float(latest_kline["close"]) balance = self.get_available_balance() self.balance = balance if balance is not None else 0.0 if self.start != 0: open_avg_price = float(self.open_avg_price) current_amount = float(self.current_amount) position_cross = float( getattr(self, "position_cross", 0) or 0 ) if self.start == 1: unrealized_pnl = current_amount * 0.001 * ( current_price - open_avg_price ) else: unrealized_pnl = current_amount * 0.001 * ( open_avg_price - current_price ) pnl_rate = ( (current_price - open_avg_price) / open_avg_price * 100 if self.start == 1 else (open_avg_price - current_price) / open_avg_price * 100 ) direction_str = "空" if self.start == -1 else "多" msg = ( f"【开盘价五分之一 {self.contract_symbol}】\n" f"方向:{direction_str}\n" f"现价:{current_price:.2f}\n" f"开仓均价:{open_avg_price:.2f}\n" f"浮动盈亏:{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)\n" f"余额:{self.balance:.2f}" ) else: msg = ( f"【开盘价五分之一 {self.contract_symbol}】\n" f"方向:无\n" f"现价:{current_price:.2f}\n" f"余额:{self.balance:.2f}" ) self.ding(msg) # ==================== 主循环 ==================== def action(self): if not self.set_leverage(): logger.error("杠杆设置失败") return if not self._open_browser(): self.ding("打开浏览器失败!", error=True) return logger.info("浏览器打开成功") self.close_extra_tabs() self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") time.sleep(2) self.click_safe('x://button[normalize-space(text()) ="市价"]') logger.info( f"开盘价五分之一策略(3分钟K线)开始监测,间隔: {self.check_interval}秒" ) last_report_time = 0 report_interval = 300 while True: for _ in range(5): if self._open_browser(): break time.sleep(5) else: self.ding("打开浏览器失败!", error=True) return self.close_extra_tabs() self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") time.sleep(2) self.click_safe('x://button[normalize-space(text()) ="市价"]') try: kline_data = self.get_klines() if not kline_data or len(kline_data) < 3: logger.warning("K线数据不足...") time.sleep(self.check_interval) continue curr = kline_data[-1] if not self.get_position_status(): logger.warning("获取仓位失败,使用缓存") # 有仓位但 entry_price 未设置时(如程序重启),用开仓均价补全 if self.start != 0 and self.entry_price is None and self.open_avg_price: self.entry_price = float(self.open_avg_price) logger.info(f"从API恢复 entry_price={self.entry_price:.2f}") # 前1分30秒反手 if self.start != 0: first_dir, first_trigger = self.check_early_reverse_signal( curr, kline_data ) if first_dir: curr_kline_id = curr["id"] if self.last_trade_kline_id != curr_kline_id: balance = self.get_available_balance() trade_size = (balance or 0) * self.risk_percent if first_dir == "long" and self.start == -1: self.平仓() time.sleep(1) self.开单(marketPriceLongOrder=1, size=trade_size) elif first_dir == "short" and self.start == 1: self.平仓() time.sleep(1) self.开单(marketPriceLongOrder=-1, size=trade_size) self.early_reverse_executed = True self.last_trade_kline_id = curr_kline_id _, valid_prev = self.find_valid_prev_bar( kline_data, len(kline_data) - 1 ) if valid_prev: self.entry_prev_body = self.get_body_size(valid_prev) self.entry_price = float(curr["close"]) self.entry_kline_id = curr_kline_id self.get_position_status() self._send_position_message(curr) time.sleep(self.check_interval) continue # 常规信号检测 direction, trigger_price, valid_prev, curr_kline = ( self.check_realtime_trigger(kline_data, self.start) ) if direction: curr_kline_id = curr_kline["id"] if self.last_trade_kline_id == curr_kline_id: self.last_trigger_kline_id = curr_kline_id self.last_trigger_direction = direction time.sleep(self.check_interval) continue if (direction == "long" and self.start == 1) or ( direction == "short" and self.start == -1 ): self.last_trigger_kline_id = curr_kline_id self.last_trigger_direction = direction time.sleep(self.check_interval) continue balance = self.get_available_balance() trade_size = (balance or 0) * self.risk_percent executed = False if direction == "long": if self.start == -1: self.平仓() time.sleep(1) self.开单(marketPriceLongOrder=1, size=trade_size) executed = True elif self.start == 0: self.开单(marketPriceLongOrder=1, size=trade_size) executed = True elif direction == "short": if self.start == 1: self.平仓() time.sleep(1) self.开单(marketPriceLongOrder=-1, size=trade_size) executed = True elif self.start == 0: self.开单(marketPriceLongOrder=-1, size=trade_size) executed = True self.last_trigger_kline_id = curr_kline_id self.last_trigger_direction = direction if executed: self.last_trade_kline_id = curr_kline_id self.entry_price = trigger_price self.entry_prev_body = self.get_body_size(valid_prev) self.entry_kline_id = curr_kline_id self.early_reverse_executed = False self.get_position_status() self._send_position_message(curr_kline) last_report_time = time.time() if time.time() - last_report_time >= report_interval: if self.get_position_status(): self._send_position_message(kline_data[-1]) last_report_time = time.time() time.sleep(self.check_interval) except Exception as e: logger.error(f"主循环异常: {e}") time.sleep(self.check_interval) time.sleep(3) if random.randint(1, 10) > 7: self.page.close() time.sleep(15) if __name__ == "__main__": try: OpenBasedFifthStrategy(bit_id=BIT_ID).action() except KeyboardInterrupt: logger.info("程序被用户中断") finally: ding_executor.shutdown(wait=True) logger.info("已退出")