From 6af24582412107fca3a39eba241c7ac1a098e532 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Tue, 3 Feb 2026 11:46:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bitmart/三分之一策略-5分钟交易.py | 469 ++++++++++++++++++++++++++++++ 1 file changed, 469 insertions(+) create mode 100644 bitmart/三分之一策略-5分钟交易.py diff --git a/bitmart/三分之一策略-5分钟交易.py b/bitmart/三分之一策略-5分钟交易.py new file mode 100644 index 0000000..66143cc --- /dev/null +++ b/bitmart/三分之一策略-5分钟交易.py @@ -0,0 +1,469 @@ +""" +BitMart 三分之一策略 — 5分钟K线交易 + +策略规则(与 交易/bitmart-三分之一策略交易.py、回测数据-三分之一策略-5分钟精准版 一致): +1. 触发价格(基于前一根有效K线,实体>=0.1): + - 做多触发价 = 收盘价 + 实体/3(从收盘价往上涨1/3) + - 做空触发价 = 收盘价 - 实体/3(从收盘价往下跌1/3) + +2. 信号触发: + - 当前5分钟K线最高价 >= 做多触发价 → 做多信号 + - 当前5分钟K线最低价 <= 做空触发价 → 做空信号 + +3. 执行逻辑: + - 做多时遇到做空信号 -> 平多并反手开空 + - 做空时遇到做多信号 -> 平空并反手开多 + - 同一根5分钟K线内只交易一次 + +4. 同根K线多空都触及时,用开盘价与触发价距离判断先后。 +""" +import random +import time +import datetime +from concurrent.futures import ThreadPoolExecutor + +from tqdm import tqdm +from loguru import logger +from bit_tools import openBrowser +from DrissionPage import ChromiumPage +from DrissionPage import ChromiumOptions + +from bitmart.api_contract import APIContract + +try: + from 交易.tools import send_dingtalk_message +except ImportError: + send_dingtalk_message = None + +ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk") + + +class BitmartOneThirdStrategy: + """三分之一策略:触发价 = 收盘价 ± 实体/3,5分钟K线""" + + def __init__(self, bit_id): + self.page: ChromiumPage | None = None + + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "合约交易" + + self.contract_symbol = "ETHUSDT" + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + self.start = 0 # 持仓: -1 空, 0 无, 1 多 + self.direction = None + self.pbar = tqdm(total=5, desc="等待K线", ncols=80) + self.last_kline_time = None + + self.leverage = "100" + self.open_type = "cross" + self.risk_percent = 0.01 + + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + self.bit_id = bit_id + + self.min_body_size = 0.1 # 前一根有效K线实体下限 + self.kline_step = 5 # 5分钟K线 + self.check_interval = 3 + self.last_trigger_kline_id = None + self.last_trigger_direction = None + self.last_trade_kline_id = None + + # ========================= 三分之一策略核心 ========================= + + def is_bullish(self, c): + return float(c['close']) > float(c['open']) + + def get_body_size(self, candle): + return abs(float(candle['open']) - float(candle['close'])) + + def find_valid_prev_bar(self, all_data, current_idx): + """找到当前K线之前最近一根实体>=min_body_size的K线""" + if current_idx <= 0: + return None, None + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + if self.get_body_size(prev) >= self.min_body_size: + return i, prev + return None, None + + def get_one_third_levels(self, prev): + """ + 做多触发价 = 收盘价 + 实体/3 + 做空触发价 = 收盘价 - 实体/3 + """ + p_open = float(prev['open']) + p_close = float(prev['close']) + body = abs(p_open - p_close) + if body < 0.001: + return None, None + long_trigger = p_close + body / 3 + short_trigger = p_close - body / 3 + return long_trigger, short_trigger + + def check_realtime_trigger(self, kline_data): + """ + 检查当前5分钟K线是否触发信号。 + 做多触发价 = 前一根收盘价 + 实体/3,做空 = 收盘价 - 实体/3。 + 同根多空都触发时用开盘价与触发价距离判断先后。 + 返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None) + """ + if len(kline_data) < 2: + return None, None, None, None + + curr = kline_data[-1] + curr_kline_id = curr['id'] + curr_high = float(curr['high']) + curr_low = float(curr['low']) + curr_open = float(curr['open']) + + valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1) + if prev is None: + return None, None, None, None + + long_trigger, short_trigger = self.get_one_third_levels(prev) + if long_trigger is None: + return None, None, None, None + + long_triggered = curr_high >= long_trigger + short_triggered = curr_low <= short_trigger + both_triggered = long_triggered and short_triggered + + direction = None + trigger_price = None + + if both_triggered: + dist_to_long = abs(long_trigger - curr_open) + dist_to_short = abs(short_trigger - curr_open) + if dist_to_short <= dist_to_long: + direction = 'short' + trigger_price = short_trigger + else: + direction = 'long' + trigger_price = long_trigger + elif short_triggered: + direction = 'short' + trigger_price = short_trigger + elif long_triggered: + direction = 'long' + trigger_price = long_trigger + + if direction is None: + return None, None, None, None + + if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction: + return None, None, None, None + + return direction, trigger_price, prev, curr + + # ========================= API ========================= + + def get_klines(self): + """获取最近5分钟K线""" + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=self.kline_step, + start_time=end_time - 3600 * 3, + end_time=end_time + )[0]["data"] + formatted = [] + for k in response: + formatted.append({ + 'id': int(k["timestamp"]), + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + formatted.sort(key=lambda x: x['id']) + return formatted + except Exception as e: + if "429" in str(e) or "too many requests" in str(e).lower(): + logger.warning(f"API限流,等待60秒: {e}") + time.sleep(60) + else: + logger.error(f"获取K线异常: {e}") + self.ding("获取K线异常", error=True) + return None + + def get_current_price(self): + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, + start_time=end_time - 3600 * 3, + end_time=end_time + )[0] + if response.get('code') == 1000: + return float(response['data'][-1]["close_price"]) + return None + except Exception as e: + logger.error(f"获取价格异常: {e}") + return None + + def get_available_balance(self): + try: + response = self.contractAPI.get_assets_detail()[0] + if response.get('code') == 1000: + data = response.get('data') + if isinstance(data, dict): + return float(data.get('available_balance', 0)) + if isinstance(data, list): + for asset in data: + if asset.get('currency') == 'USDT': + return float(asset.get('available_balance', 0)) + return None + except Exception as e: + logger.error(f"余额查询异常: {e}") + return None + + def get_position_status(self): + try: + response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] + if response.get('code') == 1000: + positions = response.get('data') or [] + if not positions: + self.start = 0 + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + return True + self.start = 1 if positions[0].get('position_type') == 1 else -1 + self.open_avg_price = positions[0].get('open_avg_price') + self.current_amount = positions[0].get('current_amount') + self.position_cross = positions[0].get("position_cross") + return True + return False + except Exception as e: + logger.error(f"持仓查询异常: {e}") + return False + + def set_leverage(self): + try: + response = self.contractAPI.post_submit_leverage( + contract_symbol=self.contract_symbol, + leverage=self.leverage, + open_type=self.open_type + )[0] + if response.get('code') == 1000: + logger.success(f"全仓 {self.leverage}x 杠杆设置成功") + return True + logger.error(f"杠杆设置失败: {response}") + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + return False + + # ========================= 浏览器 ========================= + + def open_browser(self): + try: + bit_port = openBrowser(id=self.bit_id) + co = ChromiumOptions() + co.set_local_port(port=bit_port) + self.page = ChromiumPage(addr_or_opts=co) + return True + except Exception: + return False + + def close_extra_tabs(self): + try: + for idx, tab in enumerate(self.page.get_tabs()): + if idx > 0: + tab.close() + return True + except Exception: + return False + + def click_safe(self, xpath, sleep=0.5): + try: + ele = self.page.ele(xpath) + if not ele: + return False + ele.scroll.to_see(center=True) + time.sleep(sleep) + ele.click(by_js=True) + return True + except Exception: + return False + + def 平仓(self): + logger.info("执行平仓...") + self.click_safe('x://span[normalize-space(text()) ="市价"]') + time.sleep(0.5) + self.ding("执行平仓") + + def 开单(self, marketPriceLongOrder=0, size=None): + if size is None or size <= 0: + logger.warning("开单金额无效") + return False + direction_str = "做多" if marketPriceLongOrder == 1 else "做空" + logger.info(f"执行{direction_str},金额: {size}") + try: + if marketPriceLongOrder == -1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') + elif marketPriceLongOrder == 1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') + self.ding(f"执行{direction_str},金额: {size}") + return True + except Exception as e: + logger.error(f"开单异常: {e}") + return False + + def ding(self, msg, error=False): + prefix = "❌三分之一策略:" if error else "🔔三分之一策略:" + full_msg = f"{prefix}{msg}" + if error: + logger.error(msg) + for _ in range(3): + ding_executor.submit(self._send_ding_safe, full_msg) + else: + logger.info(msg) + ding_executor.submit(self._send_ding_safe, full_msg) + + def _send_ding_safe(self, msg): + try: + if send_dingtalk_message: + send_dingtalk_message(msg) + except Exception as e: + logger.warning(f"钉钉发送失败: {e}") + + # ========================= 主循环 ========================= + + def action(self): + if not self.set_leverage(): + logger.error("杠杆设置失败,程序继续运行但可能下单失败") + if not self.open_browser(): + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + self.close_extra_tabs() + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + logger.info("三分之一策略(5分钟K线)开始监测") + + last_report_time = 0 + report_interval = 300 + + while True: + for _ in range(5): + if self.open_browser(): + break + time.sleep(5) + else: + self.ding("打开浏览器失败!", error=True) + return + + self.close_extra_tabs() + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + + try: + kline_data = self.get_klines() + if not kline_data or len(kline_data) < 3: + logger.warning("K线数据不足,等待重试...") + time.sleep(self.check_interval) + continue + + curr = kline_data[-1] + curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S') + + if not self.get_position_status(): + logger.warning("获取仓位失败,使用缓存") + + direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data) + + if direction: + curr_kline_id = curr_kline['id'] + if self.last_trade_kline_id == curr_kline_id: + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + # 信号与持仓同向则不操作 + if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1): + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M') + prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线" + prev_body = self.get_body_size(valid_prev) + + logger.info(f"{'='*50}") + logger.info(f"🚨 检测到{direction}信号 触发价={trigger_price:.2f}") + logger.info(f" 有效前一根[{prev_time}] {prev_type} 实体={prev_body:.2f}") + logger.info(f" 当前5分钟K线 H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}") + logger.info(f" 当前持仓: {self.start} (1=多 -1=空 0=无)") + + balance = self.get_available_balance() + trade_size = (balance or 0) * self.risk_percent + executed = False + + if direction == "long": + if self.start == -1: + logger.info("📈 平空反手开多") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif self.start == 0: + logger.info("📈 无仓位开多") + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif direction == "short": + if self.start == 1: + logger.info("📉 平多反手开空") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + elif self.start == 0: + logger.info("📉 无仓位开空") + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + if executed: + self.last_trade_kline_id = curr_kline_id + self.get_position_status() + logger.info(f"{'='*50}") + else: + logger.debug(f"[{curr_time_str}] O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}") + + if time.time() - last_report_time >= report_interval: + last_report_time = time.time() + time.sleep(self.check_interval) + + except Exception as e: + logger.error(f"主循环异常: {e}") + time.sleep(self.check_interval) + time.sleep(3) + + if random.randint(1, 10) > 7: + self.page.close() + time.sleep(15) + + +if __name__ == '__main__': + try: + BitmartOneThirdStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action() + except KeyboardInterrupt: + logger.info("程序被用户中断") + finally: + ding_executor.shutdown(wait=True)