From 40683f4babbadfa066dc83a44790604b76c23a67 Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Tue, 3 Feb 2026 10:28:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bitmart/五分之一策略-开盘价版-3分钟交易.py | 516 +++++++++++++++++++++ 1 file changed, 516 insertions(+) create mode 100644 bitmart/五分之一策略-开盘价版-3分钟交易.py diff --git a/bitmart/五分之一策略-开盘价版-3分钟交易.py b/bitmart/五分之一策略-开盘价版-3分钟交易.py new file mode 100644 index 0000000..e5f2cd2 --- /dev/null +++ b/bitmart/五分之一策略-开盘价版-3分钟交易.py @@ -0,0 +1,516 @@ +""" +BitMart 五分之一策略(开盘价版)— 3分钟K线交易 + +策略规则(与 1111 一致): +基于前一根有效 K 线(实体 ≥ 0.1): + 做多触发价 = 当前K线开盘价 + 实体/5(收盘价向上 1/5 实体) + 做空触发价 = 当前K线开盘价 - 实体/5(收盘价向下 1/5 实体) + +反手(若已有持仓): + 持空反手做多:价格涨到 开仓价 + 前一根实体/5 + 持多反手做空:价格跌到 开仓价 - 前一根实体/5 + +同根K线内多空都触及时,用开盘价与触发价距离判断先后。 +""" +import random +import time +import datetime +from concurrent.futures import ThreadPoolExecutor + +from tqdm import tqdm +from loguru import logger +from bit_tools import openBrowser +from DrissionPage import ChromiumPage +from DrissionPage import ChromiumOptions + +from bitmart.api_contract import APIContract + +try: + from 交易.tools import send_dingtalk_message +except ImportError: + send_dingtalk_message = None + +ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk") + + +class BitmartOneFifthOpenStrategy: + """五分之一策略(开盘价版):触发价 = 当前K线开盘价 ± 实体/5,3分钟K线""" + + def __init__(self, bit_id): + self.page: ChromiumPage | None = None + + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "合约交易" + + self.contract_symbol = "ETHUSDT" + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + self.start = 0 # 持仓: -1 空, 0 无, 1 多 + self.direction = None + self.pbar = tqdm(total=3, desc="等待K线", ncols=80) + self.last_kline_time = None + + self.leverage = "100" + self.open_type = "cross" + self.risk_percent = 0.01 + + self.open_avg_price = None + self.current_amount = None + self.bit_id = bit_id + + self.min_body_size = 0.1 # 前一根有效K线实体下限 + self.kline_step = 3 # 3分钟K线 + self.check_interval = 3 + self.last_trigger_kline_id = None + self.last_trigger_direction = None + self.last_trade_kline_id = None + + # ========================= 策略核心(开盘价版)========================= + + def get_body_size(self, candle): + return abs(float(candle['open']) - float(candle['close'])) + + def find_valid_prev_bar(self, all_data, current_idx): + """找到当前K线之前最近一根实体≥min_body_size的K线""" + if current_idx <= 0: + return None, None + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + if self.get_body_size(prev) >= self.min_body_size: + return i, prev + return None, None + + def get_trigger_levels_by_open(self, curr_open, prev_bar): + """ + 做多触发价 = 当前K线开盘价 + 实体/5 + 做空触发价 = 当前K线开盘价 - 实体/5 + 实体来自前一根有效K线。 + """ + body = self.get_body_size(prev_bar) + if body < 0.001: + return None, None + curr_open_f = float(curr_open) + long_trigger = curr_open_f + body / 5 + short_trigger = curr_open_f - body / 5 + return long_trigger, short_trigger + + def check_realtime_trigger(self, kline_data): + """ + 检查当前3分钟K线是否触发信号(开盘价版)。 + 做多触发价 = 当前K线开盘价 + 前一根实体/5 + 做空触发价 = 当前K线开盘价 - 前一根实体/5 + 若同根多空都触发,用开盘价与触发价距离判断先后。 + 返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None) + """ + if len(kline_data) < 2: + return None, None, None, None + + curr = kline_data[-1] + curr_kline_id = curr['id'] + curr_open = float(curr['open']) + curr_high = float(curr['high']) + curr_low = float(curr['low']) + + valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1) + if prev is None: + return None, None, None, None + + long_trigger, short_trigger = self.get_trigger_levels_by_open(curr_open, prev) + if long_trigger is None: + return None, None, None, None + + long_triggered = curr_high >= long_trigger + short_triggered = curr_low <= short_trigger + both_triggered = long_triggered and short_triggered + + direction = None + trigger_price = None + + if both_triggered: + dist_to_long = abs(long_trigger - curr_open) + dist_to_short = abs(short_trigger - curr_open) + if dist_to_short <= dist_to_long: + direction = 'short' + trigger_price = short_trigger + else: + direction = 'long' + trigger_price = long_trigger + elif short_triggered: + direction = 'short' + trigger_price = short_trigger + elif long_triggered: + direction = 'long' + trigger_price = long_trigger + + if direction is None: + return None, None, None, None + + if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction: + return None, None, None, None + + return direction, trigger_price, prev, curr + + def check_reverse_trigger(self, kline_data): + """ + 反手检测:持空反手做多 = 现价 >= 开仓价+前一根实体/5 + 持多反手做空 = 现价 <= 开仓价-前一根实体/5 + 使用当前价(最新K线收盘或 get_current_price)与 开仓价±实体/5 比较。 + """ + if self.start == 0 or self.open_avg_price is None: + return None, None + if len(kline_data) < 2: + return None, None + + valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1) + if prev is None: + return None, None + + body = self.get_body_size(prev) + if body < 0.001: + return None, None + + open_price_f = float(self.open_avg_price) + reverse_long_price = open_price_f + body / 5 # 持空反手做多线 + reverse_short_price = open_price_f - body / 5 # 持多反手做空线 + + curr = kline_data[-1] + curr_high = float(curr['high']) + curr_low = float(curr['low']) + + if self.start == -1 and curr_high >= reverse_long_price: + return 'long', reverse_long_price + if self.start == 1 and curr_low <= reverse_short_price: + return 'short', reverse_short_price + return None, None + + def is_bullish(self, c): + return float(c['close']) > float(c['open']) + + # ========================= API ========================= + + def get_klines(self): + """获取最近3分钟K线""" + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=self.kline_step, + start_time=end_time - 3600 * 3, + end_time=end_time + )[0]["data"] + formatted = [] + for k in response: + formatted.append({ + 'id': int(k["timestamp"]), + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + formatted.sort(key=lambda x: x['id']) + return formatted + except Exception as e: + if "429" in str(e) or "too many requests" in str(e).lower(): + logger.warning(f"API限流,等待60秒: {e}") + time.sleep(60) + else: + logger.error(f"获取K线异常: {e}") + self.ding("获取K线异常", error=True) + return None + + def get_current_price(self): + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, + start_time=end_time - 3600 * 3, + end_time=end_time + )[0] + if response.get('code') == 1000: + return float(response['data'][-1]["close_price"]) + return None + except Exception as e: + logger.error(f"获取价格异常: {e}") + return None + + def get_available_balance(self): + try: + response = self.contractAPI.get_assets_detail()[0] + if response.get('code') == 1000: + data = response.get('data') + if isinstance(data, dict): + return float(data.get('available_balance', 0)) + if isinstance(data, list): + for asset in data: + if asset.get('currency') == 'USDT': + return float(asset.get('available_balance', 0)) + return None + except Exception as e: + logger.error(f"余额查询异常: {e}") + return None + + def get_position_status(self): + try: + response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] + if response.get('code') == 1000: + positions = response.get('data') or [] + if not positions: + self.start = 0 + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + return True + self.start = 1 if positions[0].get('position_type') == 1 else -1 + self.open_avg_price = positions[0].get('open_avg_price') + self.current_amount = positions[0].get('current_amount') + self.position_cross = positions[0].get("position_cross") + return True + return False + except Exception as e: + logger.error(f"持仓查询异常: {e}") + return False + + def set_leverage(self): + try: + response = self.contractAPI.post_submit_leverage( + contract_symbol=self.contract_symbol, + leverage=self.leverage, + open_type=self.open_type + )[0] + if response.get('code') == 1000: + logger.success(f"全仓 {self.leverage}x 杠杆设置成功") + return True + logger.error(f"杠杆设置失败: {response}") + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + return False + + # ========================= 浏览器 ========================= + + def open_browser(self): + try: + bit_port = openBrowser(id=self.bit_id) + co = ChromiumOptions() + co.set_local_port(port=bit_port) + self.page = ChromiumPage(addr_or_opts=co) + return True + except Exception: + return False + + def close_extra_tabs(self): + try: + for idx, tab in enumerate(self.page.get_tabs()): + if idx > 0: + tab.close() + return True + except Exception: + return False + + def click_safe(self, xpath, sleep=0.5): + try: + ele = self.page.ele(xpath) + if not ele: + return False + ele.scroll.to_see(center=True) + time.sleep(sleep) + ele.click(by_js=True) + return True + except Exception: + return False + + def 平仓(self): + logger.info("执行平仓...") + self.click_safe('x://span[normalize-space(text()) ="市价"]') + time.sleep(0.5) + self.ding("执行平仓") + + def 开单(self, marketPriceLongOrder=0, size=None): + if size is None or size <= 0: + logger.warning("开单金额无效") + return False + direction_str = "做多" if marketPriceLongOrder == 1 else "做空" + logger.info(f"执行{direction_str},金额: {size}") + try: + if marketPriceLongOrder == -1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') + elif marketPriceLongOrder == 1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') + self.ding(f"执行{direction_str},金额: {size}") + return True + except Exception as e: + logger.error(f"开单异常: {e}") + return False + + def ding(self, msg, error=False): + prefix = "❌五分之一开盘价版:" if error else "🔔五分之一开盘价版:" + full_msg = f"{prefix}{msg}" + if error: + logger.error(msg) + for _ in range(3): + ding_executor.submit(self._send_ding_safe, full_msg) + else: + logger.info(msg) + ding_executor.submit(self._send_ding_safe, full_msg) + + def _send_ding_safe(self, msg): + try: + if send_dingtalk_message: + send_dingtalk_message(msg) + except Exception as e: + logger.warning(f"钉钉发送失败: {e}") + + # ========================= 主循环 ========================= + + def action(self): + if not self.set_leverage(): + logger.error("杠杆设置失败,程序继续运行但可能下单失败") + if not self.open_browser(): + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + self.close_extra_tabs() + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + logger.info("五分之一策略(开盘价版,3分钟K线)开始监测") + + last_report_time = 0 + report_interval = 300 + + while True: + for _ in range(5): + if self.open_browser(): + break + time.sleep(5) + else: + self.ding("打开浏览器失败!", error=True) + return + + self.close_extra_tabs() + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + + try: + kline_data = self.get_klines() + if not kline_data or len(kline_data) < 3: + logger.warning("K线数据不足,等待重试...") + time.sleep(self.check_interval) + continue + + curr = kline_data[-1] + curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S') + + if not self.get_position_status(): + logger.warning("获取仓位失败,使用缓存") + + # 1) 反手检测:有持仓时先看是否触发反手 + rev_dir, rev_price = self.check_reverse_trigger(kline_data) + if rev_dir: + curr_kline_id = curr['id'] + if self.last_trade_kline_id != curr_kline_id: + prev_idx, prev_bar = self.find_valid_prev_bar(kline_data, len(kline_data) - 1) + if prev_bar is not None: + balance = self.get_available_balance() + trade_size = (balance or 0) * self.risk_percent + logger.info(f"反手信号: {rev_dir} 触发价={rev_price:.2f}") + if rev_dir == 'long': + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + else: + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + self.last_trade_kline_id = curr_kline_id + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = rev_dir + self.get_position_status() + time.sleep(self.check_interval) + continue + + # 2) 常规开仓/反手:基于 当前K线开盘价 ± 实体/5 + direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data) + + if direction: + curr_kline_id = curr_kline['id'] + if self.last_trade_kline_id == curr_kline_id: + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M') + prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线" + prev_body = self.get_body_size(valid_prev) + + logger.info(f"{'='*50}") + logger.info(f"🚨 检测到{direction}信号 触发价={trigger_price:.2f}") + logger.info(f" 有效前一根[{prev_time}] {prev_type} 实体={prev_body:.2f}") + logger.info(f" 当前3分钟K线 O={curr_kline['open']:.2f} H={curr_kline['high']:.2f} L={curr_kline['low']:.2f}") + logger.info(f" 当前持仓: {self.start} (1=多 -1=空 0=无)") + + balance = self.get_available_balance() + trade_size = (balance or 0) * self.risk_percent + executed = False + + if direction == "long": + if self.start == -1: + logger.info("📈 平空反手开多") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif self.start == 0: + logger.info("📈 无仓位开多") + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif direction == "short": + if self.start == 1: + logger.info("📉 平多反手开空") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + elif self.start == 0: + logger.info("📉 无仓位开空") + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + if executed: + self.last_trade_kline_id = curr_kline_id + self.get_position_status() + logger.info(f"{'='*50}") + else: + logger.debug(f"[{curr_time_str}] O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}") + + if time.time() - last_report_time >= report_interval: + last_report_time = time.time() + time.sleep(self.check_interval) + + except Exception as e: + logger.error(f"主循环异常: {e}") + time.sleep(self.check_interval) + time.sleep(3) + + if random.randint(1, 10) > 7: + self.page.close() + time.sleep(15) + + +if __name__ == '__main__': + try: + BitmartOneFifthOpenStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action() + except KeyboardInterrupt: + logger.info("程序被用户中断") + finally: + ding_executor.shutdown(wait=True)