From b63715de5afc4730991df2125095557adeca8e7a Mon Sep 17 00:00:00 2001 From: 27942 Date: Wed, 28 Jan 2026 16:30:19 +0800 Subject: [PATCH] 23423423 --- 交易/bitmart-三分之一策略交易.py | 557 +++++++++++++++++++++++++++++++ 1 file changed, 557 insertions(+) create mode 100644 交易/bitmart-三分之一策略交易.py diff --git a/交易/bitmart-三分之一策略交易.py b/交易/bitmart-三分之一策略交易.py new file mode 100644 index 0000000..0bb5ec0 --- /dev/null +++ b/交易/bitmart-三分之一策略交易.py @@ -0,0 +1,557 @@ +""" +BitMart 三分之一回归策略交易 +使用5分钟K线周期 + +策略规则: +1. 开多条件: + - 找到实体>=0.1的前一根K线(如果前一根实体<0.1,继续往前找) + - 前一根是阴线(close < open) + - 当前K线的最高价(包括影线)涨到前一根阴线实体的 1/3 处 + - 即:当前high >= prev_close + (prev_open - prev_close) / 3 + +2. 平多/开空条件: + - 找到实体>=0.1的前一根K线 + - 前一根是阳线(close > open) + - 当前K线的最低价(包括影线)跌到前一根阳线实体的 1/3 处 + - 即:当前low <= prev_close - (prev_close - prev_open) / 3 + +3. 执行逻辑: + - 做多时遇到开空信号 -> 平多并反手开空 + - 做空时遇到开多信号 -> 平空并反手开多 +""" + +import time +import datetime + +from tqdm import tqdm +from loguru import logger +from bit_tools import openBrowser +from DrissionPage import ChromiumPage +from DrissionPage import ChromiumOptions + +from bitmart.api_contract import APIContract +from 交易.tools import send_dingtalk_message + + +class BitmartOneThirdStrategy: + def __init__(self, bit_id): + + self.page: ChromiumPage | None = None + + self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.memo = "合约交易" + + self.contract_symbol = "ETHUSDT" + + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 + self.direction = None + + self.pbar = tqdm(total=5, desc="等待K线", ncols=80) # 5分钟周期 + + self.last_kline_time = None + + self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位) + self.open_type = "cross" # 全仓模式 + self.risk_percent = 0.01 # 每次开仓使用可用余额的 1% + + self.open_avg_price = None # 开仓价格 + self.current_amount = None # 持仓量 + + self.bit_id = bit_id + + # 三分之一策略参数 + self.min_body_size = 0.1 # 最小实体大小 + self.kline_step = 5 # K线周期(5分钟) + self.kline_count = 20 # 获取的K线数量,用于向前查找有效K线 + + # ========================= 三分之一策略核心函数 ========================= + + def is_bullish(self, c): + """判断阳线""" + return float(c['close']) > float(c['open']) + + def is_bearish(self, c): + """判断阴线""" + return float(c['close']) < float(c['open']) + + def get_body_size(self, candle): + """计算K线实体大小(绝对值)""" + return abs(float(candle['open']) - float(candle['close'])) + + def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1): + """ + 从当前索引往前查找,直到找到实体>=min_body_size的K线 + 返回:(有效K线的索引, K线数据) 或 (None, None) + """ + if current_idx <= 0: + return None, None + + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + body_size = self.get_body_size(prev) + if body_size >= min_body_size: + return i, prev + + return None, None + + def get_one_third_level(self, prev): + """ + 计算前一根K线实体的 1/3 回归位置 + 返回:(触发价格, 方向) + - 如果前一根是阴线:返回向上1/3价格,方向为 'long' + - 如果前一根是阳线:返回向下1/3价格,方向为 'short' + """ + p_open = float(prev['open']) + p_close = float(prev['close']) + + if self.is_bearish(prev): # 阴线,向上回归 + # 阴线实体 = open - close + body = p_open - p_close + trigger_price = p_close + body / 3 # 从低点涨 1/3 + return trigger_price, 'long' + + elif self.is_bullish(prev): # 阳线,向下回归 + # 阳线实体 = close - open + body = p_close - p_open + trigger_price = p_close - body / 3 # 从高点跌 1/3 + return trigger_price, 'short' + + return None, None + + def check_trigger(self, all_data, current_idx): + """ + 检查当前K线是否触发了交易信号 + 返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None) + 规则:考虑影线部分(high/low),因为实际交易中价格会到达影线位置 + """ + if current_idx <= 0: + return None, None, None + + curr = all_data[current_idx] + + # 查找实体>=min_body_size的前一根K线 + valid_prev_idx, prev = self.find_valid_prev_bar(all_data, current_idx, self.min_body_size) + + if prev is None: + return None, None, None + + trigger_price, direction = self.get_one_third_level(prev) + + if trigger_price is None: + return None, None, None + + # 使用影线部分(high/low)来判断 + c_high = float(curr['high']) + c_low = float(curr['low']) + + # 做多:前一根阴线,当前K线的最高价(包括影线)达到触发价格 + if direction == 'long' and c_high >= trigger_price: + return 'long', trigger_price, valid_prev_idx + + # 做空:前一根阳线,当前K线的最低价(包括影线)达到触发价格 + if direction == 'short' and c_low <= trigger_price: + return 'short', trigger_price, valid_prev_idx + + return None, None, None + + # ========================= BitMart API 函数 ========================= + + def get_klines(self): + """获取最近N根5分钟K线""" + try: + end_time = int(time.time()) + # 获取足够多的K线用于向前查找有效K线 + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=self.kline_step, # 5分钟 + start_time=end_time - 3600 * 3, # 取最近3小时 + end_time=end_time + )[0]["data"] + + # 每根: [timestamp, open, high, low, close, volume] + formatted = [] + for k in response: + formatted.append({ + 'id': int(k["timestamp"]), + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + formatted.sort(key=lambda x: x['id']) + return formatted + except Exception as e: + error_msg = str(e) + # 检查是否是429限流错误 + if "429" in error_msg or "too many requests" in error_msg.lower(): + logger.warning(f"API限流,等待60秒后重试: {e}") + time.sleep(60) + else: + logger.error(f"获取K线异常: {e}") + self.ding(msg="获取K线异常", error=True) + return None + + def get_current_price(self): + """获取当前最新价格""" + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, # 1分钟 + start_time=end_time - 3600 * 3, + end_time=end_time + )[0] + if response['code'] == 1000: + return float(response['data'][-1]["close_price"]) + return None + except Exception as e: + logger.error(f"获取价格异常: {e}") + return None + + def get_available_balance(self): + """获取合约账户可用USDT余额""" + try: + response = self.contractAPI.get_assets_detail()[0] + if response['code'] == 1000: + data = response['data'] + if isinstance(data, dict): + return float(data.get('available_balance', 0)) + elif isinstance(data, list): + for asset in data: + if asset.get('currency') == 'USDT': + return float(asset.get('available_balance', 0)) + return None + except Exception as e: + logger.error(f"余额查询异常: {e}") + return None + + def get_position_status(self): + """获取当前持仓方向""" + try: + response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] + if response['code'] == 1000: + positions = response['data'] + if not positions: + self.start = 0 + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + return True + self.start = 1 if positions[0]['position_type'] == 1 else -1 + self.open_avg_price = positions[0]['open_avg_price'] + self.current_amount = positions[0]['current_amount'] + self.position_cross = positions[0]["position_cross"] + return True + else: + return False + except Exception as e: + logger.error(f"持仓查询异常: {e}") + return False + + def set_leverage(self): + """程序启动时设置全仓 + 高杠杆""" + try: + response = self.contractAPI.post_submit_leverage( + contract_symbol=self.contract_symbol, + leverage=self.leverage, + open_type=self.open_type + )[0] + if response['code'] == 1000: + logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") + return True + else: + logger.error(f"杠杆设置失败: {response}") + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + return False + + # ========================= 浏览器自动化函数 ========================= + + def openBrowser(self): + """打开 TGE 对应浏览器实例""" + try: + bit_port = openBrowser(id=self.bit_id) + co = ChromiumOptions() + co.set_local_port(port=bit_port) + self.page = ChromiumPage(addr_or_opts=co) + return True + except: + return False + + def close_extra_tabs_in_browser(self): + """关闭多余 tab""" + try: + for idx, tab in enumerate(self.page.get_tabs()): + if idx > 0: + tab.close() + return True + except: + return False + + def click_safe(self, xpath, sleep=0.5): + """安全点击""" + try: + ele = self.page.ele(xpath) + if not ele: + return False + ele.scroll.to_see(center=True) + time.sleep(sleep) + ele.click(by_js=True) + return True + except: + return False + + def 平仓(self): + """市价平仓""" + logger.info("执行平仓操作...") + self.click_safe('x://span[normalize-space(text()) ="市价"]') + time.sleep(0.5) + self.ding(msg="执行平仓操作") + + def 开单(self, marketPriceLongOrder=0, size=None): + """ + 市价开单 + marketPriceLongOrder: 1 做多, -1 做空 + """ + if size is None or size <= 0: + logger.warning("开单金额无效") + return False + + direction_str = "做多" if marketPriceLongOrder == 1 else "做空" + logger.info(f"执行{direction_str}操作,金额: {size}") + + try: + if marketPriceLongOrder == -1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') + elif marketPriceLongOrder == 1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') + + self.ding(msg=f"执行{direction_str}操作,金额: {size}") + return True + except Exception as e: + logger.error(f"开单异常: {e}") + return False + + def ding(self, msg, error=False): + """统一消息格式""" + prefix = "❌三分之一策略:" if error else "🔔三分之一策略:" + if error: + logger.error(msg) + for i in range(10): + send_dingtalk_message(f"{prefix}{msg}") + else: + logger.info(msg) + send_dingtalk_message(f"{prefix}{msg}") + + # ========================= 时间计算函数 ========================= + + def get_now_time(self): + """获取当前5分钟整点时间戳""" + current_timestamp = time.time() + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + # 计算距离当前时间最近的5分钟整点 + minute = current_datetime.minute + target_minute = (minute // 5) * 5 # 向下取整到5分钟 + target_datetime = current_datetime.replace(minute=target_minute, second=0, microsecond=0) + + return int(target_datetime.timestamp()) + + def get_time_to_next_5min(self): + """获取距离下一个5分钟的秒数""" + current_timestamp = time.time() + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + minute = current_datetime.minute + next_5min = ((minute // 5) + 1) * 5 + if next_5min >= 60: + next_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1) + else: + next_datetime = current_datetime.replace(minute=next_5min, second=0, microsecond=0) + + return (next_datetime - current_datetime).total_seconds() + + # ========================= 主运行函数 ========================= + + def action(self): + """主运行逻辑""" + # 启动时设置全仓高杠杆 + if not self.set_leverage(): + logger.error("杠杆设置失败,程序继续运行但可能下单失败") + return + + # 1. 打开浏览器 + if not self.openBrowser(): + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + + if self.close_extra_tabs_in_browser(): + logger.info('关闭多余标签页成功') + else: + logger.info('关闭多余标签页失败') + + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + + self.click_safe('x://button[normalize-space(text()) ="市价"]') + + self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80) + + self.time_start = None # 时间状态,避免同一时段重复处理 + + while True: + # 更新进度条 + current_time = time.localtime() + current_minute = current_time.tm_min + self.pbar.n = current_minute % 5 + self.pbar.refresh() + + # 检查是否已处理过当前时间段 + if self.time_start == self.get_now_time(): + time.sleep(3) + continue + + # 获取K线数据 + kline_data = self.get_klines() + if not kline_data: + logger.warning("获取K线数据失败") + time.sleep(5) + continue + + # 检查数据是否是最新的 + if len(kline_data) < 3: + logger.warning("K线数据不足") + time.sleep(5) + continue + + # 判断最新K线时间 + latest_kline_time = kline_data[-1]['id'] + if self.get_now_time() != latest_kline_time: + time.sleep(3) + continue + + self.time_start = self.get_now_time() + + # 获取持仓状态 + if not self.get_position_status(): + logger.warning("获取仓位信息失败") + self.ding(msg="获取仓位信息失败!", error=True) + continue + + logger.info(f"当前持仓状态: {self.start} (1=多, -1=空, 0=无)") + + # ========== 三分之一策略信号检测 ========== + current_idx = len(kline_data) - 1 + direction, trigger_price, valid_prev_idx = self.check_trigger(kline_data, current_idx) + + if direction: + # 获取有效前一根K线用于日志 + valid_prev = kline_data[valid_prev_idx] if valid_prev_idx is not None else None + curr = kline_data[current_idx] + + if valid_prev: + prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M') + curr_time = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M') + prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线" + prev_body = self.get_body_size(valid_prev) + + logger.info(f"检测到{direction}信号,触发价格: {trigger_price:.2f}") + logger.info(f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f}") + logger.info(f" 当前根[{curr_time}]: H={curr['high']:.2f} L={curr['low']:.2f}") + + # ========== 执行交易逻辑 ========== + balance = self.get_available_balance() + if balance is None: + balance = 0 + trade_size = balance * self.risk_percent + + if direction == "long": + if self.start == -1: # 当前空仓,平空开多 + logger.info("平空仓,反手开多") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + elif self.start == 0: # 当前无仓,直接开多 + logger.info("无仓位,开多") + self.开单(marketPriceLongOrder=1, size=trade_size) + # 已有多仓则不操作 + + elif direction == "short": + if self.start == 1: # 当前多仓,平多开空 + logger.info("平多仓,反手开空") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + elif self.start == 0: # 当前无仓,直接开空 + logger.info("无仓位,开空") + self.开单(marketPriceLongOrder=-1, size=trade_size) + # 已有空仓则不操作 + + # ========== 发送持仓信息 ========== + self._send_position_message(kline_data[-1]) + + self.pbar.reset() + + def _send_position_message(self, latest_kline): + """发送持仓信息到钉钉""" + current_price = float(latest_kline["close"]) + balance = self.get_available_balance() + self.balance = balance if balance is not None else 0.0 + + if self.start != 0: + open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0 + current_amount = float(self.current_amount) if self.current_amount else 0.0 + position_cross = float(self.position_cross) if hasattr(self, 'position_cross') and self.position_cross else 0.0 + + # 计算浮动盈亏 + if self.start == 1: # 多头 + unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price) + else: # 空头 + unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price) + + # 计算收益率 + if open_avg_price > 0: + if self.start == 1: + pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000 + else: + pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000 + rate_str = f" ({pnl_rate:+.2f}%)" + else: + rate_str = "" + + direction_str = "空" if self.start == -1 else "多" + pnl_str = f"{unrealized_pnl:+.2f} USDT" + + msg = ( + f"【三分之一策略 {self.contract_symbol} 5分钟】\n" + f"当前方向:{direction_str}\n" + f"当前现价:{current_price:.2f} USDT\n" + f"开仓均价:{open_avg_price:.2f} USDT\n" + f"持仓量(eth):{float(current_amount) / 1000} eth\n" + f"持仓量(usdt):{position_cross} usdt\n" + f"浮动盈亏:{pnl_str}{rate_str}\n" + f"账户可用余额:{self.balance:.2f} usdt" + ) + else: + msg = ( + f"【三分之一策略 {self.contract_symbol} 5分钟】\n" + f"当前方向:无\n" + f"当前现价:{current_price:.2f} USDT\n" + f"账户可用余额:{self.balance:.2f} usdt" + ) + + self.ding(msg=msg) + + +if __name__ == '__main__': + # 启动三分之一策略交易 + BitmartOneThirdStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action()