From b3fb021b7cabfeebb21f6bf93831a02860174171 Mon Sep 17 00:00:00 2001 From: 27942 Date: Wed, 28 Jan 2026 16:41:34 +0800 Subject: [PATCH] 23423423 --- 交易/bitmart-三分之一策略交易.py | 223 +++++++++++++++++++------------ 1 file changed, 141 insertions(+), 82 deletions(-) diff --git a/交易/bitmart-三分之一策略交易.py b/交易/bitmart-三分之一策略交易.py index 0bb5ec0..c63cdcb 100644 --- a/交易/bitmart-三分之一策略交易.py +++ b/交易/bitmart-三分之一策略交易.py @@ -66,6 +66,11 @@ class BitmartOneThirdStrategy: self.min_body_size = 0.1 # 最小实体大小 self.kline_step = 5 # K线周期(5分钟) self.kline_count = 20 # 获取的K线数量,用于向前查找有效K线 + + # 实时监测参数 + self.check_interval = 3 # 检测间隔(秒) + self.last_trigger_kline_id = None # 记录上次触发信号的K线ID,避免同一K线重复触发 + self.last_trigger_direction = None # 记录上次触发的方向 # ========================= 三分之一策略核心函数 ========================= @@ -157,6 +162,49 @@ class BitmartOneThirdStrategy: return None, None, None + def check_realtime_trigger(self, kline_data): + """ + 实时检测当前K线是否触发信号 + 基于已收盘的K线计算触发价格,用当前正在形成的K线判断 + 返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None) + """ + if len(kline_data) < 2: + return None, None, None, None + + # 当前正在形成的K线(最后一根,未收盘) + curr = kline_data[-1] + curr_kline_id = curr['id'] + + # 从倒数第二根开始往前找有效K线(已收盘的K线) + # current_idx 设为 len-1,但我们从 len-2 开始找有效的前一根 + valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size) + + if prev is None: + return None, None, None, None + + trigger_price, direction = self.get_one_third_level(prev) + + if trigger_price is None: + return None, None, None, None + + # 检查是否在同一根K线内已经触发过相同方向 + if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction: + return None, None, None, None + + # 使用当前K线的实时高低点来判断 + c_high = float(curr['high']) + c_low = float(curr['low']) + + # 做多:前一根阴线,当前K线的最高价达到触发价格 + if direction == 'long' and c_high >= trigger_price: + return 'long', trigger_price, prev, curr + + # 做空:前一根阳线,当前K线的最低价达到触发价格 + if direction == 'short' and c_low <= trigger_price: + return 'short', trigger_price, prev, curr + + return None, None, None, None + # ========================= BitMart API 函数 ========================= def get_klines(self): @@ -382,7 +430,7 @@ class BitmartOneThirdStrategy: # ========================= 主运行函数 ========================= def action(self): - """主运行逻辑""" + """主运行逻辑 - 实时监测版本""" # 启动时设置全仓高杠杆 if not self.set_leverage(): logger.error("杠杆设置失败,程序继续运行但可能下单失败") @@ -404,102 +452,113 @@ class BitmartOneThirdStrategy: self.click_safe('x://button[normalize-space(text()) ="市价"]') - self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80) - - self.time_start = None # 时间状态,避免同一时段重复处理 + logger.info(f"开始实时监测,检测间隔: {self.check_interval}秒") + + # 用于定时发送持仓信息(每5分钟发一次) + last_report_time = 0 + report_interval = 300 # 5分钟报告一次持仓 while True: - # 更新进度条 - current_time = time.localtime() - current_minute = current_time.tm_min - self.pbar.n = current_minute % 5 - self.pbar.refresh() + try: + # 获取K线数据 + kline_data = self.get_klines() + if not kline_data: + logger.warning("获取K线数据失败,等待重试...") + time.sleep(self.check_interval) + continue - # 检查是否已处理过当前时间段 - if self.time_start == self.get_now_time(): - time.sleep(3) - continue + if len(kline_data) < 3: + logger.warning("K线数据不足") + time.sleep(self.check_interval) + continue - # 获取K线数据 - kline_data = self.get_klines() - if not kline_data: - logger.warning("获取K线数据失败") - time.sleep(5) - continue + # 获取当前K线信息用于日志 + curr = kline_data[-1] + curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S') + + # ========== 实时信号检测 ========== + direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data) - # 检查数据是否是最新的 - if len(kline_data) < 3: - logger.warning("K线数据不足") - time.sleep(5) - continue + if direction: + # 获取持仓状态 + if not self.get_position_status(): + logger.warning("获取仓位信息失败") + time.sleep(self.check_interval) + continue - # 判断最新K线时间 - latest_kline_time = kline_data[-1]['id'] - if self.get_now_time() != latest_kline_time: - time.sleep(3) - continue - - self.time_start = self.get_now_time() - - # 获取持仓状态 - if not self.get_position_status(): - logger.warning("获取仓位信息失败") - self.ding(msg="获取仓位信息失败!", error=True) - continue - - logger.info(f"当前持仓状态: {self.start} (1=多, -1=空, 0=无)") - - # ========== 三分之一策略信号检测 ========== - current_idx = len(kline_data) - 1 - direction, trigger_price, valid_prev_idx = self.check_trigger(kline_data, current_idx) - - if direction: - # 获取有效前一根K线用于日志 - valid_prev = kline_data[valid_prev_idx] if valid_prev_idx is not None else None - curr = kline_data[current_idx] - - if valid_prev: prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M') - curr_time = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M') prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线" prev_body = self.get_body_size(valid_prev) - logger.info(f"检测到{direction}信号,触发价格: {trigger_price:.2f}") - logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f}") - logger.info(f" 当前根[{curr_time}]: H={curr['high']:.2f} L={curr['low']:.2f}") + logger.info(f"{'='*50}") + logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}") + logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}") + logger.info(f" 当前K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}") + logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)") - # ========== 执行交易逻辑 ========== - balance = self.get_available_balance() - if balance is None: - balance = 0 - trade_size = balance * self.risk_percent + # ========== 执行交易逻辑 ========== + balance = self.get_available_balance() + if balance is None: + balance = 0 + trade_size = balance * self.risk_percent - if direction == "long": - if self.start == -1: # 当前空仓,平空开多 - logger.info("平空仓,反手开多") - self.平仓() - time.sleep(1) - self.开单(marketPriceLongOrder=1, size=trade_size) - elif self.start == 0: # 当前无仓,直接开多 - logger.info("无仓位,开多") - self.开单(marketPriceLongOrder=1, size=trade_size) - # 已有多仓则不操作 + executed = False + if direction == "long": + if self.start == -1: # 当前空仓,平空开多 + logger.info("📈 平空仓,反手开多") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif self.start == 0: # 当前无仓,直接开多 + logger.info("📈 无仓位,开多") + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + else: + logger.info("已有多仓,忽略做多信号") - elif direction == "short": - if self.start == 1: # 当前多仓,平多开空 - logger.info("平多仓,反手开空") - self.平仓() - time.sleep(1) - self.开单(marketPriceLongOrder=-1, size=trade_size) - elif self.start == 0: # 当前无仓,直接开空 - logger.info("无仓位,开空") - self.开单(marketPriceLongOrder=-1, size=trade_size) - # 已有空仓则不操作 + elif direction == "short": + if self.start == 1: # 当前多仓,平多开空 + logger.info("📉 平多仓,反手开空") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + elif self.start == 0: # 当前无仓,直接开空 + logger.info("📉 无仓位,开空") + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + else: + logger.info("已有空仓,忽略做空信号") - # ========== 发送持仓信息 ========== - self._send_position_message(kline_data[-1]) + # 记录本次触发,避免同一K线重复触发 + if executed: + self.last_trigger_kline_id = curr_kline['id'] + self.last_trigger_direction = direction + # 交易后立即发送持仓信息 + self.get_position_status() + self._send_position_message(curr_kline) + last_report_time = time.time() - self.pbar.reset() + logger.info(f"{'='*50}") + + else: + # 没有信号时,显示实时价格 + logger.debug(f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}") + + # ========== 定时发送持仓信息 ========== + current_time = time.time() + if current_time - last_report_time >= report_interval: + if self.get_position_status(): + self._send_position_message(kline_data[-1]) + last_report_time = current_time + + # 等待下次检测 + time.sleep(self.check_interval) + + except Exception as e: + logger.error(f"主循环异常: {e}") + time.sleep(self.check_interval) def _send_position_message(self, latest_kline): """发送持仓信息到钉钉"""