From 15072ae9e41ad655c08c18b482380a79cda18d32 Mon Sep 17 00:00:00 2001 From: ddrwode Date: Sun, 1 Feb 2026 23:14:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E4=BA=86=E4=BA=94=E5=88=86?= =?UTF-8?q?=E4=B9=8B=E4=B8=80=E4=BA=A4=E6=98=93=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 交易/bitmart-五分之一策略交易.py | 559 +++++++++++++++++++++++++++++++ 1 file changed, 559 insertions(+) create mode 100644 交易/bitmart-五分之一策略交易.py diff --git a/交易/bitmart-五分之一策略交易.py b/交易/bitmart-五分之一策略交易.py new file mode 100644 index 0000000..f8c5660 --- /dev/null +++ b/交易/bitmart-五分之一策略交易.py @@ -0,0 +1,559 @@ +""" +BitMart 五分之一回归策略交易(精准版) +使用3分钟K线周期计算触发价格,实时监测;同根K线内多空都触及时用1分钟K线判断先后 + +策略规则(与 bitmart/回测-三分之一策略-精准版.py 一致): +1. 触发价格计算(基于有效的前一根K线,实体>=0.1): + - 做多触发价格 = 收盘价 + 实体/5(从收盘价往上涨1/5) + - 做空触发价格 = 收盘价 - 实体/5(从收盘价往下跌1/5) + +2. 信号触发条件: + - 当前K线最高价 >= 做多触发价格 → 做多信号 + - 当前K线最低价 <= 做空触发价格 → 做空信号 + +3. 执行逻辑: + - 做多时遇到做空信号 -> 平多并反手开空 + - 做空时遇到做多信号 -> 平空并反手开多 + - 同一根3分钟K线内只交易一次 + +4. 精准判断(使用1分钟K线): + - 当当前3分钟K线同时触及做多和做空价格时,拉取该3分钟对应的3根1分钟K线判断哪个方向先被触发 +""" + +import time +import datetime + +from tqdm import tqdm +from loguru import logger +from bit_tools import openBrowser +from DrissionPage import ChromiumPage +from DrissionPage import ChromiumOptions + +from bitmart.api_contract import APIContract +from 交易.tools import send_dingtalk_message + + +class BitmartOneFifthStrategy: + def __init__(self, bit_id): + self.page: ChromiumPage | None = None + + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "合约交易" + + self.contract_symbol = "ETHUSDT" + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 + self.direction = None + + self.pbar = tqdm(total=3, desc="等待K线", ncols=80) # 3分钟周期 + + self.last_kline_time = None + self.leverage = "100" + self.open_type = "cross" + self.risk_percent = 0.01 + + self.open_avg_price = None + self.current_amount = None + self.bit_id = bit_id + + # 五分之一策略参数(与回测一致) + self.min_body_size = 0.1 + self.kline_step = 3 # 3分钟K线 + self.kline_count = 20 + + self.check_interval = 3 + self.last_trigger_kline_id = None + self.last_trigger_direction = None + self.last_trade_kline_id = None + + # ========================= 五分之一策略核心 ========================= + + def is_bullish(self, c): + return float(c['close']) > float(c['open']) + + def is_bearish(self, c): + return float(c['close']) < float(c['open']) + + def get_body_size(self, candle): + return abs(float(candle['open']) - float(candle['close'])) + + def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1): + if current_idx <= 0: + return None, None + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + if self.get_body_size(prev) >= min_body_size: + return i, prev + return None, None + + def get_one_fifth_levels(self, prev): + """ + 计算前一根K线实体的 1/5 双向触发价格 + 做多触发 = 收盘价 + 实体/5,做空触发 = 收盘价 - 实体/5 + """ + p_open = float(prev['open']) + p_close = float(prev['close']) + body = abs(p_open - p_close) + if body < 0.001: + return None, None + long_trigger = p_close + body / 5 + short_trigger = p_close - body / 5 + return long_trigger, short_trigger + + def get_1m_bars_for_3m_bar(self, bar_3m): + """获取当前3分钟K线对应的3根1分钟K线(用于同根内多空都触发时判断先后)""" + try: + start_ts = int(bar_3m['id']) + end_ts = start_ts + 3 * 60 # 秒 + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, + start_time=start_ts, + end_time=end_ts + )[0] + if response.get('code') != 1000: + return [] + data = response.get('data', []) + out = [] + for k in data: + out.append({ + 'id': int(k["timestamp"]), + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + out.sort(key=lambda x: x['id']) + return out + except Exception as e: + logger.warning(f"获取1分钟K线失败: {e}") + return [] + + def determine_trigger_order_by_1m(self, bars_1m, long_trigger, short_trigger): + """ + 用1分钟K线判断在3分钟周期内先触发做多还是做空 + 返回 'long', 'short' 或 None + """ + if not bars_1m: + return None + for bar in bars_1m: + high = float(bar['high']) + low = float(bar['low']) + open_price = float(bar['open']) + long_ok = high >= long_trigger + short_ok = low <= short_trigger + if long_ok and not short_ok: + return 'long' + if short_ok and not long_ok: + return 'short' + if long_ok and short_ok: + d_long = abs(long_trigger - open_price) + d_short = abs(short_trigger - open_price) + return 'short' if d_short < d_long else 'long' + return None + + def check_realtime_trigger(self, kline_data): + """ + 实时检测当前3分钟K线是否触发信号 + 若多空都触发,优先用1分钟K线判断先后,否则用开盘价距离 + 返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None) + """ + if len(kline_data) < 2: + return None, None, None, None + + curr = kline_data[-1] + curr_kline_id = curr['id'] + valid_prev_idx, prev = self.find_valid_prev_bar( + kline_data, len(kline_data) - 1, self.min_body_size + ) + if prev is None: + return None, None, None, None + + long_trigger, short_trigger = self.get_one_fifth_levels(prev) + if long_trigger is None: + return None, None, None, None + + c_high = float(curr['high']) + c_low = float(curr['low']) + long_triggered = c_high >= long_trigger + short_triggered = c_low <= short_trigger + + direction = None + trigger_price = None + + if long_triggered and short_triggered: + bars_1m = self.get_1m_bars_for_3m_bar(curr) + if bars_1m: + direction = self.determine_trigger_order_by_1m( + bars_1m, long_trigger, short_trigger + ) + trigger_price = long_trigger if direction == 'long' else short_trigger + if direction is None: + c_open = float(curr['open']) + d_long = abs(long_trigger - c_open) + d_short = abs(short_trigger - c_open) + direction = 'short' if d_short <= d_long else 'long' + trigger_price = long_trigger if direction == 'long' else short_trigger + elif short_triggered: + direction = 'short' + trigger_price = short_trigger + elif long_triggered: + direction = 'long' + trigger_price = long_trigger + + if direction is None: + return None, None, None, None + if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction: + return None, None, None, None + + return direction, trigger_price, prev, curr + + # ========================= BitMart API ========================= + + def get_klines(self): + """获取最近3分钟K线""" + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=self.kline_step, + start_time=end_time - 3600 * 3, + end_time=end_time + )[0]["data"] + formatted = [] + for k in response: + formatted.append({ + 'id': int(k["timestamp"]), + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + formatted.sort(key=lambda x: x['id']) + return formatted + except Exception as e: + error_msg = str(e) + if "429" in error_msg or "too many requests" in error_msg.lower(): + logger.warning(f"API限流,等待60秒后重试: {e}") + time.sleep(60) + else: + logger.error(f"获取K线异常: {e}") + self.ding(msg="获取K线异常", error=True) + return None + + def get_current_price(self): + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, + start_time=end_time - 3600 * 3, + end_time=end_time + )[0] + if response['code'] == 1000: + return float(response['data'][-1]["close_price"]) + return None + except Exception as e: + logger.error(f"获取价格异常: {e}") + return None + + def get_available_balance(self): + try: + response = self.contractAPI.get_assets_detail()[0] + if response['code'] == 1000: + data = response['data'] + if isinstance(data, dict): + return float(data.get('available_balance', 0)) + if isinstance(data, list): + for asset in data: + if asset.get('currency') == 'USDT': + return float(asset.get('available_balance', 0)) + return None + except Exception as e: + logger.error(f"余额查询异常: {e}") + return None + + def get_position_status(self): + try: + response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] + if response['code'] == 1000: + positions = response['data'] + if not positions: + self.start = 0 + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + return True + self.start = 1 if positions[0]['position_type'] == 1 else -1 + self.open_avg_price = positions[0]['open_avg_price'] + self.current_amount = positions[0]['current_amount'] + self.position_cross = positions[0]["position_cross"] + return True + return False + except Exception as e: + logger.error(f"持仓查询异常: {e}") + return False + + def set_leverage(self): + try: + response = self.contractAPI.post_submit_leverage( + contract_symbol=self.contract_symbol, + leverage=self.leverage, + open_type=self.open_type + )[0] + if response['code'] == 1000: + logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") + return True + logger.error(f"杠杆设置失败: {response}") + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + return False + + # ========================= 浏览器 ========================= + + def openBrowser(self): + try: + bit_port = openBrowser(id=self.bit_id) + co = ChromiumOptions() + co.set_local_port(port=bit_port) + self.page = ChromiumPage(addr_or_opts=co) + return True + except Exception: + return False + + def close_extra_tabs_in_browser(self): + try: + for idx, tab in enumerate(self.page.get_tabs()): + if idx > 0: + tab.close() + return True + except Exception: + return False + + def click_safe(self, xpath, sleep=0.5): + try: + ele = self.page.ele(xpath) + if not ele: + return False + ele.scroll.to_see(center=True) + time.sleep(sleep) + ele.click(by_js=True) + return True + except Exception: + return False + + def 平仓(self): + logger.info("执行平仓操作...") + self.click_safe('x://span[normalize-space(text()) ="市价"]') + time.sleep(0.5) + self.ding(msg="执行平仓操作") + + def 开单(self, marketPriceLongOrder=0, size=None): + if size is None or size <= 0: + logger.warning("开单金额无效") + return False + direction_str = "做多" if marketPriceLongOrder == 1 else "做空" + logger.info(f"执行{direction_str}操作,金额: {size}") + size = 25 + try: + if marketPriceLongOrder == -1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') + elif marketPriceLongOrder == 1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') + self.ding(msg=f"执行{direction_str}操作,金额: {size}") + return True + except Exception as e: + logger.error(f"开单异常: {e}") + return False + + def ding(self, msg, error=False): + prefix = "❌五分之一策略:" if error else "🔔五分之一策略:" + if error: + logger.error(msg) + for i in range(10): + send_dingtalk_message(f"{prefix}{msg}") + else: + logger.info(msg) + send_dingtalk_message(f"{prefix}{msg}") + + # ========================= 主循环 ========================= + + def action(self): + if not self.set_leverage(): + logger.error("杠杆设置失败,程序继续运行但可能下单失败") + return + if not self.openBrowser(): + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + if self.close_extra_tabs_in_browser(): + logger.info('关闭多余标签页成功') + else: + logger.info('关闭多余标签页失败') + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + logger.info(f"五分之一策略(3分钟K线)开始实时监测,检测间隔: {self.check_interval}秒") + + last_report_time = 0 + report_interval = 300 + + while True: + for i in range(5): + if self.openBrowser(): + break + time.sleep(5) + else: + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + if self.close_extra_tabs_in_browser(): + logger.info('关闭多余标签页成功') + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + + try: + kline_data = self.get_klines() + if not kline_data or len(kline_data) < 3: + logger.warning("K线数据不足,等待重试...") + time.sleep(self.check_interval) + continue + + curr = kline_data[-1] + curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S') + direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data) + + if direction: + curr_kline_id = curr_kline['id'] + if self.last_trade_kline_id == curr_kline_id: + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + if not self.get_position_status(): + logger.warning("获取仓位信息失败") + time.sleep(self.check_interval) + continue + + prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M') + prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线" + prev_body = self.get_body_size(valid_prev) + + if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1): + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + logger.info(f"{'=' * 50}") + logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}") + logger.info( + f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}") + logger.info( + f" 当前3分钟K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}") + logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)") + + balance = self.get_available_balance() + trade_size = (balance or 0) * self.risk_percent + executed = False + + if direction == "long": + if self.start == -1: + logger.info("📈 平空仓,反手开多") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif self.start == 0: + logger.info("📈 无仓位,开多") + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif direction == "short": + if self.start == 1: + logger.info("📉 平多仓,反手开空") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + elif self.start == 0: + logger.info("📉 无仓位,开空") + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + if executed: + self.last_trade_kline_id = curr_kline_id + self.get_position_status() + self._send_position_message(curr_kline) + last_report_time = time.time() + logger.info(f"{'=' * 50}") + else: + logger.debug( + f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}") + + if time.time() - last_report_time >= report_interval: + if self.get_position_status(): + self._send_position_message(kline_data[-1]) + last_report_time = time.time() + time.sleep(self.check_interval) + + except Exception as e: + logger.error(f"主循环异常: {e}") + time.sleep(self.check_interval) + time.sleep(15) + self.page.close() + time.sleep(15) + + def _send_position_message(self, latest_kline): + current_price = float(latest_kline["close"]) + balance = self.get_available_balance() + self.balance = balance if balance is not None else 0.0 + if self.start != 0: + open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0 + current_amount = float(self.current_amount) if self.current_amount else 0.0 + position_cross = float(self.position_cross) if getattr(self, 'position_cross', None) else 0.0 + if self.start == 1: + unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price) + else: + unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price) + if open_avg_price > 0: + if self.start == 1: + pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000 + else: + pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000 + rate_str = f" ({pnl_rate:+.2f}%)" + else: + rate_str = "" + direction_str = "空" if self.start == -1 else "多" + pnl_str = f"{unrealized_pnl:+.2f} USDT" + msg = ( + f"【五分之一策略 {self.contract_symbol} 3分钟】\n" + f"当前方向:{direction_str}\n" + f"当前现价:{current_price:.2f} USDT\n" + f"开仓均价:{open_avg_price:.2f} USDT\n" + f"持仓量(eth):{float(current_amount) / 1000} eth\n" + f"持仓量(usdt):{position_cross} usdt\n" + f"浮动盈亏:{pnl_str}{rate_str}\n" + f"账户可用余额:{self.balance:.2f} usdt" + ) + else: + msg = ( + f"【五分之一策略 {self.contract_symbol} 3分钟】\n" + f"当前方向:无\n" + f"当前现价:{current_price:.2f} USDT\n" + f"账户可用余额:{self.balance:.2f} usdt" + ) + self.ding(msg=msg) + + +if __name__ == '__main__': + BitmartOneFifthStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action()