Merge remote-tracking branch 'origin/master'

This commit is contained in:
Administrator
2026-01-28 16:41:53 +08:00

View File

@@ -66,6 +66,11 @@ class BitmartOneThirdStrategy:
self.min_body_size = 0.1 # 最小实体大小
self.kline_step = 5 # K线周期5分钟
self.kline_count = 20 # 获取的K线数量用于向前查找有效K线
# 实时监测参数
self.check_interval = 3 # 检测间隔(秒)
self.last_trigger_kline_id = None # 记录上次触发信号的K线ID避免同一K线重复触发
self.last_trigger_direction = None # 记录上次触发的方向
# ========================= 三分之一策略核心函数 =========================
@@ -157,6 +162,49 @@ class BitmartOneThirdStrategy:
return None, None, None
def check_realtime_trigger(self, kline_data):
"""
实时检测当前K线是否触发信号
基于已收盘的K线计算触发价格用当前正在形成的K线判断
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
"""
if len(kline_data) < 2:
return None, None, None, None
# 当前正在形成的K线最后一根未收盘
curr = kline_data[-1]
curr_kline_id = curr['id']
# 从倒数第二根开始往前找有效K线已收盘的K线
# current_idx 设为 len-1但我们从 len-2 开始找有效的前一根
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size)
if prev is None:
return None, None, None, None
trigger_price, direction = self.get_one_third_level(prev)
if trigger_price is None:
return None, None, None, None
# 检查是否在同一根K线内已经触发过相同方向
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
return None, None, None, None
# 使用当前K线的实时高低点来判断
c_high = float(curr['high'])
c_low = float(curr['low'])
# 做多前一根阴线当前K线的最高价达到触发价格
if direction == 'long' and c_high >= trigger_price:
return 'long', trigger_price, prev, curr
# 做空前一根阳线当前K线的最低价达到触发价格
if direction == 'short' and c_low <= trigger_price:
return 'short', trigger_price, prev, curr
return None, None, None, None
# ========================= BitMart API 函数 =========================
def get_klines(self):
@@ -382,7 +430,7 @@ class BitmartOneThirdStrategy:
# ========================= 主运行函数 =========================
def action(self):
"""主运行逻辑"""
"""主运行逻辑 - 实时监测版本"""
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
@@ -404,102 +452,113 @@ class BitmartOneThirdStrategy:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80)
self.time_start = None # 时间状态,避免同一时段重复处理
logger.info(f"开始实时监测,检测间隔: {self.check_interval}")
# 用于定时发送持仓信息每5分钟发一次
last_report_time = 0
report_interval = 300 # 5分钟报告一次持仓
while True:
# 更新进度条
current_time = time.localtime()
current_minute = current_time.tm_min
self.pbar.n = current_minute % 5
self.pbar.refresh()
try:
# 获取K线数据
kline_data = self.get_klines()
if not kline_data:
logger.warning("获取K线数据失败等待重试...")
time.sleep(self.check_interval)
continue
# 检查是否已处理过当前时间段
if self.time_start == self.get_now_time():
time.sleep(3)
continue
if len(kline_data) < 3:
logger.warning("K线数据不足")
time.sleep(self.check_interval)
continue
# 获取K线数据
kline_data = self.get_klines()
if not kline_data:
logger.warning("获取K线数据失败")
time.sleep(5)
continue
# 获取当前K线信息用于日志
curr = kline_data[-1]
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
# ========== 实时信号检测 ==========
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
# 检查数据是否是最新的
if len(kline_data) < 3:
logger.warning("K线数据不足")
time.sleep(5)
continue
if direction:
# 获取持仓状态
if not self.get_position_status():
logger.warning("获取仓位信息失败")
time.sleep(self.check_interval)
continue
# 判断最新K线时间
latest_kline_time = kline_data[-1]['id']
if self.get_now_time() != latest_kline_time:
time.sleep(3)
continue
self.time_start = self.get_now_time()
# 获取持仓状态
if not self.get_position_status():
logger.warning("获取仓位信息失败")
self.ding(msg="获取仓位信息失败!", error=True)
continue
logger.info(f"当前持仓状态: {self.start} (1=多, -1=空, 0=无)")
# ========== 三分之一策略信号检测 ==========
current_idx = len(kline_data) - 1
direction, trigger_price, valid_prev_idx = self.check_trigger(kline_data, current_idx)
if direction:
# 获取有效前一根K线用于日志
valid_prev = kline_data[valid_prev_idx] if valid_prev_idx is not None else None
curr = kline_data[current_idx]
if valid_prev:
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
curr_time = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev)
logger.info(f"检测到{direction}信号,触发价格: {trigger_price:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f}")
logger.info(f" 当前根[{curr_time}]: H={curr['high']:.2f} L={curr['low']:.2f}")
logger.info(f"{'='*50}")
logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}")
logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}")
logger.info(f" 当前K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}")
logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)")
# ========== 执行交易逻辑 ==========
balance = self.get_available_balance()
if balance is None:
balance = 0
trade_size = balance * self.risk_percent
# ========== 执行交易逻辑 ==========
balance = self.get_available_balance()
if balance is None:
balance = 0
trade_size = balance * self.risk_percent
if direction == "long":
if self.start == -1: # 当前空仓,平空开多
logger.info("空仓,反手开多")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
elif self.start == 0: # 当前无仓,直接开多
logger.info("无仓位,开多")
self.开单(marketPriceLongOrder=1, size=trade_size)
# 已有多仓则不操作
executed = False
if direction == "long":
if self.start == -1: # 当前空仓,平空开多
logger.info("📈 平空仓,反手开多")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif self.start == 0: # 当前无仓,直接开多
logger.info("📈 无仓位,开多")
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
else:
logger.info("已有多仓,忽略做多信号")
elif direction == "short":
if self.start == 1: # 当前多仓,平多开空
logger.info("平多仓,反手开空")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
elif self.start == 0: # 当前无仓,直接开空
logger.info("无仓位,开空")
self.开单(marketPriceLongOrder=-1, size=trade_size)
# 已有空仓则不操作
elif direction == "short":
if self.start == 1: # 当前多仓,平多开空
logger.info("📉 平多仓,反手开空")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
elif self.start == 0: # 当前无仓,直接开空
logger.info("📉 无仓位,开空")
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
else:
logger.info("已有空仓,忽略做空信号")
# ========== 发送持仓信息 ==========
self._send_position_message(kline_data[-1])
# 记录本次触发避免同一K线重复触发
if executed:
self.last_trigger_kline_id = curr_kline['id']
self.last_trigger_direction = direction
# 交易后立即发送持仓信息
self.get_position_status()
self._send_position_message(curr_kline)
last_report_time = time.time()
self.pbar.reset()
logger.info(f"{'='*50}")
else:
# 没有信号时,显示实时价格
logger.debug(f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
# ========== 定时发送持仓信息 ==========
current_time = time.time()
if current_time - last_report_time >= report_interval:
if self.get_position_status():
self._send_position_message(kline_data[-1])
last_report_time = current_time
# 等待下次检测
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval)
def _send_position_message(self, latest_kline):
"""发送持仓信息到钉钉"""