diff --git a/1111 b/1111 index 7270362..a1c51e4 100644 --- a/1111 +++ b/1111 @@ -23,4 +23,25 @@ Abc12345678 248.23 -2611 \ No newline at end of file +2611 + + + +基于前一根有效 K 线(实体 ≥ 0.1): +做多触发价 = 当前 k 线开盘价 + 实体 / 5(收盘价向上 1/5 实体) +做空触发价 = 当前 k 线开盘价 - 实体 / 5(收盘价向下 1/5 实体) + +若已有持仓,在 3 分钟 K 线的第一分钟可单独检测反手: +持空反手做多:价格涨到 开仓价 + 前一根实体 / 5 +持多反手做空:价格跌到 开仓价 - 前一根实体 / 5 + + + + + + + + + + + diff --git a/me/bitmart-三分之一策略交易111111111.py b/me/bitmart-三分之一策略交易111111111.py new file mode 100644 index 0000000..f979a0d --- /dev/null +++ b/me/bitmart-三分之一策略交易111111111.py @@ -0,0 +1,715 @@ +""" +BitMart 三分之一回归策略交易(双向触发版) +使用5分钟K线周期,实时监测 + +策略规则: +1. 触发价格计算(基于有效的前一根K线,实体>=0.1): + - 做多触发价格 = 收盘价 + 实体/3(从收盘价往上涨1/3) + - 做空触发价格 = 收盘价 - 实体/3(从收盘价往下跌1/3) + +2. 信号触发条件: + - 当前K线最高价 >= 做多触发价格 → 做多信号 + - 当前K线最低价 <= 做空触发价格 → 做空信号 + +3. 执行逻辑: + - 做多时遇到做空信号 -> 平多并反手开空 + - 做空时遇到做多信号 -> 平空并反手开多 + - 同一根K线内只交易一次,防止频繁反手 + +示例1(阳线): + 前一根K线:开盘3000,收盘3100(阳线,实体=100) + - 做多触发价格 = 3100 + 33 = 3133(继续上涨做多) + - 做空触发价格 = 3100 - 33 = 3067(回调做空)←当前跌到这里就做空 + +示例2(阴线): + 前一根K线:开盘3100,收盘3000(阴线,实体=100) + - 做多触发价格 = 3000 + 33 = 3033(反弹做多) + - 做空触发价格 = 3000 - 33 = 2967(继续下跌做空) +""" + +import time +import datetime + +from tqdm import tqdm +from loguru import logger +from bit_tools import openBrowser +from DrissionPage import ChromiumPage +from DrissionPage import ChromiumOptions + +from bitmart.api_contract import APIContract +from 交易.tools import send_dingtalk_message + + +class BitmartOneThirdStrategy: + def __init__(self, bit_id): + + self.page: ChromiumPage | None = None + + self.api_key = "6104088c65a68d7e53df5d9395b67d78e555293a" + self.secret_key = "a8b14312330d8e6b9b09acfd972b34e32022fdfa9f2b06f0a0a31723b873fd01" + self.memo = "me" + + self.contract_symbol = "ETHUSDT" + + self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) + + self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 + self.direction = None + + self.pbar = tqdm(total=5, desc="等待K线", ncols=80) # 5分钟周期 + + self.last_kline_time = None + + self.leverage = "40" # 高杠杆(全仓模式下可开更大仓位) + self.open_type = "cross" # 全仓模式 + self.risk_percent = 0.01 # 每次开仓使用可用余额的 1% + + self.open_avg_price = None # 开仓价格 + self.current_amount = None # 持仓量 + + self.bit_id = bit_id + + # 三分之一策略参数 + self.min_body_size = 0.1 # 最小实体大小 + self.kline_step = 5 # K线周期(5分钟) + self.kline_count = 20 # 获取的K线数量,用于向前查找有效K线 + + # 实时监测参数 + self.check_interval = 3 # 检测间隔(秒) + self.last_trigger_kline_id = None # 记录上次触发信号的K线ID,避免同一K线重复触发 + self.last_trigger_direction = None # 记录上次触发的方向 + self.last_trade_kline_id = None # 记录上次实际交易的K线ID,防止同一K线内频繁反手 + + # ========================= 三分之一策略核心函数 ========================= + + def is_bullish(self, c): + """判断阳线""" + return float(c['close']) > float(c['open']) + + def is_bearish(self, c): + """判断阴线""" + return float(c['close']) < float(c['open']) + + def get_body_size(self, candle): + """计算K线实体大小(绝对值)""" + return abs(float(candle['open']) - float(candle['close'])) + + def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1): + """ + 从当前索引往前查找,直到找到实体>=min_body_size的K线 + 返回:(有效K线的索引, K线数据) 或 (None, None) + """ + if current_idx <= 0: + return None, None + + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + body_size = self.get_body_size(prev) + if body_size >= min_body_size: + return i, prev + + return None, None + + def get_one_third_levels(self, prev): + """ + 计算前一根K线实体的 1/3 双向触发价格 + 返回:(做多触发价格, 做空触发价格) + + 基于收盘价计算(无论阴线阳线): + - 做多触发价格 = 收盘价 + 实体/3(从收盘价往上涨1/3实体) + - 做空触发价格 = 收盘价 - 实体/3(从收盘价往下跌1/3实体) + + 示例: + 阳线 open=3000, close=3100, 实体=100 + - 做多触发 = 3100 + 33 = 3133(继续涨) + - 做空触发 = 3100 - 33 = 3067(回调) + + 阴线 open=3100, close=3000, 实体=100 + - 做多触发 = 3000 + 33 = 3033(反弹) + - 做空触发 = 3000 - 33 = 2967(继续跌) + """ + p_open = float(prev['open']) + p_close = float(prev['close']) + + body = abs(p_open - p_close) + + if body < 0.001: # 十字星,忽略 + return None, None + + # 基于收盘价的双向触发价格 + long_trigger = p_close + body / 3 # 从收盘价往上涨1/3触发做多 + short_trigger = p_close - body / 3 # 从收盘价往下跌1/3触发做空 + + return long_trigger, short_trigger + + def check_trigger(self, all_data, current_idx): + """ + 检查当前K线是否触发了交易信号(双向检测) + 返回:(方向, 触发价格, 有效前一根K线索引) 或 (None, None, None) + + 规则: + - 当前K线高点 >= 做多触发价格 → 做多信号 + - 当前K线低点 <= 做空触发价格 → 做空信号 + """ + if current_idx <= 0: + return None, None, None + + curr = all_data[current_idx] + + # 查找实体>=min_body_size的前一根K线 + valid_prev_idx, prev = self.find_valid_prev_bar(all_data, current_idx, self.min_body_size) + + if prev is None: + return None, None, None + + long_trigger, short_trigger = self.get_one_third_levels(prev) + + if long_trigger is None: + return None, None, None + + # 使用影线部分(high/low)来判断 + c_high = float(curr['high']) + c_low = float(curr['low']) + + # 检测是否触发 + long_triggered = c_high >= long_trigger + short_triggered = c_low <= short_trigger + + # 如果两个方向都触发,判断哪个先触发 + if long_triggered and short_triggered: + c_open = float(curr['open']) + dist_to_long = abs(long_trigger - c_open) + dist_to_short = abs(short_trigger - c_open) + if dist_to_short <= dist_to_long: + return 'short', short_trigger, valid_prev_idx + else: + return 'long', long_trigger, valid_prev_idx + + if short_triggered: + return 'short', short_trigger, valid_prev_idx + + if long_triggered: + return 'long', long_trigger, valid_prev_idx + + return None, None, None + + def check_realtime_trigger(self, kline_data): + """ + 实时检测当前K线是否触发信号(双向检测) + 基于已收盘的K线计算触发价格,用当前正在形成的K线判断 + 返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None) + """ + if len(kline_data) < 2: + return None, None, None, None + + # 当前正在形成的K线(最后一根,未收盘) + curr = kline_data[-1] + curr_kline_id = curr['id'] + + # 从倒数第二根开始往前找有效K线(已收盘的K线) + valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1, self.min_body_size) + + if prev is None: + return None, None, None, None + + long_trigger, short_trigger = self.get_one_third_levels(prev) + + if long_trigger is None: + return None, None, None, None + + # 使用当前K线的实时高低点来判断 + c_high = float(curr['high']) + c_low = float(curr['low']) + + # 检测是否触发 + long_triggered = c_high >= long_trigger + short_triggered = c_low <= short_trigger + + # 确定触发方向 + direction = None + trigger_price = None + + if long_triggered and short_triggered: + # 两个方向都触发,判断哪个先(距离开盘价更近的先触发) + c_open = float(curr['open']) + dist_to_long = abs(long_trigger - c_open) + dist_to_short = abs(short_trigger - c_open) + if dist_to_short <= dist_to_long: + direction = 'short' + trigger_price = short_trigger + else: + direction = 'long' + trigger_price = long_trigger + elif short_triggered: + direction = 'short' + trigger_price = short_trigger + elif long_triggered: + direction = 'long' + trigger_price = long_trigger + + if direction is None: + return None, None, None, None + + # 检查是否在同一根K线内已经触发过相同方向 + if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction: + return None, None, None, None + + return direction, trigger_price, prev, curr + + # ========================= BitMart API 函数 ========================= + + def get_klines(self): + """获取最近N根5分钟K线""" + try: + end_time = int(time.time()) + # 获取足够多的K线用于向前查找有效K线 + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=self.kline_step, # 5分钟 + start_time=end_time - 3600 * 3, # 取最近3小时 + end_time=end_time + )[0]["data"] + + # 每根: [timestamp, open, high, low, close, volume] + formatted = [] + for k in response: + formatted.append({ + 'id': int(k["timestamp"]), + 'open': float(k["open_price"]), + 'high': float(k["high_price"]), + 'low': float(k["low_price"]), + 'close': float(k["close_price"]) + }) + formatted.sort(key=lambda x: x['id']) + return formatted + except Exception as e: + error_msg = str(e) + # 检查是否是429限流错误 + if "429" in error_msg or "too many requests" in error_msg.lower(): + logger.warning(f"API限流,等待60秒后重试: {e}") + time.sleep(60) + else: + logger.error(f"获取K线异常: {e}") + self.ding(msg="获取K线异常", error=True) + return None + + def get_current_price(self): + """获取当前最新价格""" + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, # 1分钟 + start_time=end_time - 3600 * 3, + end_time=end_time + )[0] + if response['code'] == 1000: + return float(response['data'][-1]["close_price"]) + return None + except Exception as e: + logger.error(f"获取价格异常: {e}") + return None + + def get_available_balance(self): + """获取合约账户可用USDT余额""" + try: + response = self.contractAPI.get_assets_detail()[0] + if response['code'] == 1000: + data = response['data'] + if isinstance(data, dict): + return float(data.get('available_balance', 0)) + elif isinstance(data, list): + for asset in data: + if asset.get('currency') == 'USDT': + return float(asset.get('available_balance', 0)) + return None + except Exception as e: + logger.error(f"余额查询异常: {e}") + return None + + def get_position_status(self): + """获取当前持仓方向""" + try: + response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] + if response['code'] == 1000: + positions = response['data'] + if not positions: + self.start = 0 + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + return True + self.start = 1 if positions[0]['position_type'] == 1 else -1 + self.open_avg_price = positions[0]['open_avg_price'] + self.current_amount = positions[0]['current_amount'] + self.position_cross = positions[0]["position_cross"] + return True + else: + return False + except Exception as e: + logger.error(f"持仓查询异常: {e}") + return False + + def set_leverage(self): + """程序启动时设置全仓 + 高杠杆""" + try: + response = self.contractAPI.post_submit_leverage( + contract_symbol=self.contract_symbol, + leverage=self.leverage, + open_type=self.open_type + )[0] + if response['code'] == 1000: + logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") + return True + else: + logger.error(f"杠杆设置失败: {response}") + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + return False + + # ========================= 浏览器自动化函数 ========================= + + def openBrowser(self): + """打开 TGE 对应浏览器实例""" + try: + bit_port = openBrowser(id=self.bit_id) + co = ChromiumOptions() + co.set_local_port(port=bit_port) + self.page = ChromiumPage(addr_or_opts=co) + return True + except: + return False + + def close_extra_tabs_in_browser(self): + """关闭多余 tab""" + try: + for idx, tab in enumerate(self.page.get_tabs()): + if idx > 0: + tab.close() + return True + except: + return False + + def click_safe(self, xpath, sleep=0.5): + """安全点击""" + try: + ele = self.page.ele(xpath) + if not ele: + return False + ele.scroll.to_see(center=True) + time.sleep(sleep) + ele.click(by_js=True) + return True + except: + return False + + def 平仓(self): + """市价平仓""" + logger.info("执行平仓操作...") + self.click_safe('x://span[normalize-space(text()) ="市价"]') + time.sleep(0.5) + self.ding(msg="执行平仓操作") + + def 开单(self, marketPriceLongOrder=0, size=None): + """ + 市价开单 + marketPriceLongOrder: 1 做多, -1 做空 + """ + if size is None or size <= 0: + logger.warning("开单金额无效") + return False + + direction_str = "做多" if marketPriceLongOrder == 1 else "做空" + logger.info(f"执行{direction_str}操作,金额: {size}") + + size = 50 + try: + if marketPriceLongOrder == -1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') + elif marketPriceLongOrder == 1: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(size) + self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') + + self.ding(msg=f"执行{direction_str}操作,金额: {size}") + return True + except Exception as e: + logger.error(f"开单异常: {e}") + return False + + def ding(self, msg, error=False): + """统一消息格式""" + prefix = "❌三分之一策略:" if error else "🔔三分之一策略:" + if error: + logger.error(msg) + for i in range(10): + send_dingtalk_message(f"{prefix}{msg}") + else: + logger.info(msg) + send_dingtalk_message(f"{prefix}{msg}") + + # ========================= 时间计算函数 ========================= + + def get_now_time(self): + """获取当前5分钟整点时间戳""" + current_timestamp = time.time() + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + # 计算距离当前时间最近的5分钟整点 + minute = current_datetime.minute + target_minute = (minute // 5) * 5 # 向下取整到5分钟 + target_datetime = current_datetime.replace(minute=target_minute, second=0, microsecond=0) + + return int(target_datetime.timestamp()) + + def get_time_to_next_5min(self): + """获取距离下一个5分钟的秒数""" + current_timestamp = time.time() + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + minute = current_datetime.minute + next_5min = ((minute // 5) + 1) * 5 + if next_5min >= 60: + next_datetime = current_datetime.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1) + else: + next_datetime = current_datetime.replace(minute=next_5min, second=0, microsecond=0) + + return (next_datetime - current_datetime).total_seconds() + + # ========================= 主运行函数 ========================= + + def action(self): + """主运行逻辑 - 实时监测版本""" + # 启动时设置全仓高杠杆 + if not self.set_leverage(): + logger.error("杠杆设置失败,程序继续运行但可能下单失败") + return + + # 1. 打开浏览器 + if not self.openBrowser(): + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + + if self.close_extra_tabs_in_browser(): + logger.info('关闭多余标签页成功') + else: + logger.info('关闭多余标签页失败') + + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + + self.click_safe('x://button[normalize-space(text()) ="市价"]') + + logger.info(f"开始实时监测,检测间隔: {self.check_interval}秒") + + # 用于定时发送持仓信息(每5分钟发一次) + last_report_time = 0 + report_interval = 300 # 5分钟报告一次持仓 + + while True: + # 1. 打开浏览器 + for i in range(5): + if self.openBrowser(): + break + + time.sleep(5) + else: + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + + if self.close_extra_tabs_in_browser(): + logger.info('关闭多余标签页成功') + else: + logger.info('关闭多余标签页失败') + + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + + self.click_safe('x://button[normalize-space(text()) ="市价"]') + + try: + # 获取K线数据 + kline_data = self.get_klines() + if not kline_data: + logger.warning("获取K线数据失败,等待重试...") + time.sleep(self.check_interval) + continue + + if len(kline_data) < 3: + logger.warning("K线数据不足") + time.sleep(self.check_interval) + continue + + # 获取当前K线信息用于日志 + curr = kline_data[-1] + curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S') + + # ========== 实时信号检测 ========== + direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data) + + if direction: + curr_kline_id = curr_kline['id'] + + # 检查是否在同一K线内已经交易过(防止频繁反手) + if self.last_trade_kline_id == curr_kline_id: + logger.debug(f"同一K线内已交易,跳过本次{direction}信号") + # 更新触发记录,避免重复日志 + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + # 获取持仓状态 + if not self.get_position_status(): + logger.warning("获取仓位信息失败") + time.sleep(self.check_interval) + continue + + prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M') + prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线" + prev_body = self.get_body_size(valid_prev) + + # 检查信号与持仓是否同向(避免重复日志) + if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1): + # 信号与持仓同向,静默忽略 + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + logger.info(f"{'=' * 50}") + logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}") + logger.info( + f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}") + logger.info( + f" 当前K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}") + logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)") + + # ========== 执行交易逻辑 ========== + balance = self.get_available_balance() + if balance is None: + balance = 0 + trade_size = balance * self.risk_percent + + executed = False + if direction == "long": + if self.start == -1: # 当前空仓,平空开多 + logger.info("📈 平空仓,反手开多") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif self.start == 0: # 当前无仓,直接开多 + logger.info("📈 无仓位,开多") + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + + elif direction == "short": + if self.start == 1: # 当前多仓,平多开空 + logger.info("📉 平多仓,反手开空") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + elif self.start == 0: # 当前无仓,直接开空 + logger.info("📉 无仓位,开空") + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + + # 记录本次触发 + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + + if executed: + # 记录交易K线,防止同一K线内频繁反手 + self.last_trade_kline_id = curr_kline_id + # 交易后立即发送持仓信息 + self.get_position_status() + self._send_position_message(curr_kline) + last_report_time = time.time() + + logger.info(f"{'=' * 50}") + + else: + # 没有信号时,显示实时价格 + logger.debug( + f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}") + + # ========== 定时发送持仓信息 ========== + current_time = time.time() + if current_time - last_report_time >= report_interval: + if self.get_position_status(): + self._send_position_message(kline_data[-1]) + last_report_time = current_time + + # 等待下次检测 + time.sleep(self.check_interval) + + except Exception as e: + logger.error(f"主循环异常: {e}") + time.sleep(self.check_interval) + + time.sleep(15) + self.page.close() + time.sleep(15) + + def _send_position_message(self, latest_kline): + """发送持仓信息到钉钉""" + current_price = float(latest_kline["close"]) + balance = self.get_available_balance() + self.balance = balance if balance is not None else 0.0 + + if self.start != 0: + open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0 + current_amount = float(self.current_amount) if self.current_amount else 0.0 + position_cross = float(self.position_cross) if hasattr(self, + 'position_cross') and self.position_cross else 0.0 + + # 计算浮动盈亏 + if self.start == 1: # 多头 + unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price) + else: # 空头 + unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price) + + # 计算收益率 + if open_avg_price > 0: + if self.start == 1: + pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000 + else: + pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000 + rate_str = f" ({pnl_rate:+.2f}%)" + else: + rate_str = "" + + direction_str = "空" if self.start == -1 else "多" + pnl_str = f"{unrealized_pnl:+.2f} USDT" + + msg = ( + f"【三分之一策略 {self.contract_symbol} 5分钟】\n" + f"当前方向:{direction_str}\n" + f"当前现价:{current_price:.2f} USDT\n" + f"开仓均价:{open_avg_price:.2f} USDT\n" + f"持仓量(eth):{float(current_amount) / 1000} eth\n" + f"持仓量(usdt):{position_cross} usdt\n" + f"浮动盈亏:{pnl_str}{rate_str}\n" + f"账户可用余额:{self.balance:.2f} usdt" + ) + else: + msg = ( + f"【三分之一策略 {self.contract_symbol} 5分钟】\n" + f"当前方向:无\n" + f"当前现价:{current_price:.2f} USDT\n" + f"账户可用余额:{self.balance:.2f} usdt" + ) + + self.ding(msg=msg) + + +if __name__ == '__main__': + # 启动三分之一策略交易 + BitmartOneThirdStrategy(bit_id="62f9107d0c674925972084e282df55b3").action() diff --git a/open_fifth_strategy/README.md b/open_fifth_strategy/README.md new file mode 100644 index 0000000..7fc67ad --- /dev/null +++ b/open_fifth_strategy/README.md @@ -0,0 +1,52 @@ +# 基于开盘价的五分之一策略 + +根据 1111 中的策略规则实现的 BitMart 合约交易策略。 + +## 策略规则 + +### 触发价计算(基于前一根有效 K 线,实体 ≥ 0.1) + +- **做多触发价** = 当前 K 线开盘价 + 实体/5 +- **做空触发价** = 当前 K 线开盘价 - 实体/5 + +### 信号触发条件 + +- 当前 K 线最高价 ≥ 做多触发价 → 做多信号 +- 当前 K 线最低价 ≤ 做空触发价 → 做空信号 + +### 第一分钟反手(若已有持仓) + +- **持空反手做多**:价格涨到 开仓价 + 前一根实体/5 +- **持多反手做空**:价格跌到 开仓价 - 前一根实体/5 + +### 与原始五分之一策略的区别 + +| 项目 | 原始策略 | 本策略(基于开盘价) | +|------------|----------------|--------------------------| +| 做多触发基 | 前一根收盘价 | 当前 K 线开盘价 | +| 做空触发基 | 前一根收盘价 | 当前 K 线开盘价 | +| 反手逻辑 | 同左 | 相同 | + +## 运行方式 + +在项目根目录 `lm_code` 下执行: + +```bash +python open_fifth_strategy/main.py +``` + +或使用模块方式: + +```bash +cd /path/to/lm_code +python -m open_fifth_strategy.main +``` + +## 配置 + +在 `config.py` 中修改: + +- API 密钥 +- 合约交易对(默认 ETHUSDT) +- K 线周期(默认 3 分钟) +- 杠杆、风险比例等 diff --git a/open_fifth_strategy/__init__.py b/open_fifth_strategy/__init__.py new file mode 100644 index 0000000..94126ba --- /dev/null +++ b/open_fifth_strategy/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +"""基于开盘价的五分之一策略""" diff --git a/open_fifth_strategy/config.py b/open_fifth_strategy/config.py new file mode 100644 index 0000000..3e986b9 --- /dev/null +++ b/open_fifth_strategy/config.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +""" +基于开盘价的五分之一策略 - 配置文件 + +策略规则(来自 1111): +- 做多触发价 = 当前K线开盘价 + 前一根实体/5 +- 做空触发价 = 当前K线开盘价 - 前一根实体/5 +- 前一根有效K线:实体 >= 0.1 + +第一分钟反手: +- 持空反手做多:价格涨到 开仓价 + 前一根实体/5 +- 持多反手做空:价格跌到 开仓价 - 前一根实体/5 +""" + +# BitMart API(请勿提交敏感信息到版本库) +API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" +SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" +MEMO = "合约交易" + +# 交易参数 +CONTRACT_SYMBOL = "ETHUSDT" +KLINE_STEP = 3 # 3分钟K线 +MIN_BODY_SIZE = 0.1 # 有效K线最小实体 +CHECK_INTERVAL = 3 # 检测间隔(秒) +LEVERAGE = "100" +OPEN_TYPE = "cross" # 全仓 +RISK_PERCENT = 0.01 # 每次开仓占用可用余额的比例 + +# 反手信号价格容差(美元) +REVERSE_PRICE_TOLERANCE = 2.0 + +# 比特浏览器ID(用于网页下单) +BIT_ID = "f2320f57e24c45529a009e1541e25961" diff --git a/open_fifth_strategy/main.py b/open_fifth_strategy/main.py new file mode 100644 index 0000000..a8ca0fc --- /dev/null +++ b/open_fifth_strategy/main.py @@ -0,0 +1,665 @@ +# -*- coding: utf-8 -*- +""" +BitMart 基于开盘价的五分之一策略交易 + +策略规则(与 1111 一致): +1. 触发价格计算(基于前一根有效K线,实体>=0.1): + - 做多触发价 = 当前K线开盘价 + 实体/5 + - 做空触发价 = 当前K线开盘价 - 实体/5 + +2. 信号触发条件: + - 当前K线最高价 >= 做多触发价 → 做多信号 + - 当前K线最低价 <= 做空触发价 → 做空信号 + +3. 第一分钟反手(若已有持仓): + - 持空反手做多:价格涨到 开仓价 + 前一根实体/5 + - 持多反手做空:价格跌到 开仓价 - 前一根实体/5 + +运行方式(在项目根目录 lm_code 下): + python open_fifth_strategy/main.py +""" +import sys +from pathlib import Path + +# 确保项目根目录在路径中 +_root = Path(__file__).resolve().parent.parent +if str(_root) not in sys.path: + sys.path.insert(0, str(_root)) + +import random +import time +from concurrent.futures import ThreadPoolExecutor + +from loguru import logger +from bit_tools import openBrowser +from DrissionPage import ChromiumPage +from DrissionPage import ChromiumOptions + +from bitmart.api_contract import APIContract +from 交易.tools import send_dingtalk_message + +from open_fifth_strategy.config import ( + API_KEY, + SECRET_KEY, + MEMO, + CONTRACT_SYMBOL, + KLINE_STEP, + MIN_BODY_SIZE, + CHECK_INTERVAL, + LEVERAGE, + OPEN_TYPE, + RISK_PERCENT, + REVERSE_PRICE_TOLERANCE, + BIT_ID, +) + +ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk") + + +class OpenBasedFifthStrategy: + """基于开盘价的五分之一策略""" + + def __init__(self, bit_id=None): + self.page = None + self.api_key = API_KEY + self.secret_key = SECRET_KEY + self.memo = MEMO + self.contract_symbol = CONTRACT_SYMBOL + self.contractAPI = APIContract( + self.api_key, self.secret_key, self.memo, timeout=(5, 15) + ) + + self.start = 0 # 持仓: -1空, 0无, 1多 + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + self.bit_id = bit_id or BIT_ID + + self.min_body_size = MIN_BODY_SIZE + self.kline_step = KLINE_STEP + self.check_interval = CHECK_INTERVAL + self.leverage = LEVERAGE + self.open_type = OPEN_TYPE + self.risk_percent = RISK_PERCENT + self.reverse_price_tolerance = REVERSE_PRICE_TOLERANCE + + self.last_trigger_kline_id = None + self.last_trigger_direction = None + self.last_trade_kline_id = None + self.entry_prev_body = None + self.entry_price = None + self.entry_kline_id = None + self.first_minute_reverse_executed = False + + # ==================== 策略核心(基于开盘价)==================== + + def get_body_size(self, candle): + return abs(float(candle["open"]) - float(candle["close"])) + + def find_valid_prev_bar(self, all_data, current_idx): + """找前一根有效K线(实体>=min_body_size)""" + if current_idx <= 0: + return None, None + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + if self.get_body_size(prev) >= self.min_body_size: + return i, prev + return None, None + + def get_open_based_levels(self, prev, curr_open): + """ + 基于当前K线开盘价计算触发价(与1111一致) + 做多触发 = 当前K线开盘价 + 实体/5 + 做空触发 = 当前K线开盘价 - 实体/5 + """ + body = self.get_body_size(prev) + if body < 0.001: + return None, None + curr_o = float(curr_open) + long_trigger = curr_o + body / 5 + short_trigger = curr_o - body / 5 + return long_trigger, short_trigger + + def get_1m_bars_for_3m_bar(self, bar_3m): + """获取当前3分钟K线对应的3根1分钟K线""" + try: + start_ts = int(bar_3m["id"]) + end_ts = start_ts + 3 * 60 + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=1, + start_time=start_ts, + end_time=end_ts, + )[0] + if response.get("code") != 1000: + return [] + data = response.get("data", []) + out = [] + for k in data: + out.append( + { + "id": int(k["timestamp"]), + "open": float(k["open_price"]), + "high": float(k["high_price"]), + "low": float(k["low_price"]), + "close": float(k["close_price"]), + } + ) + out.sort(key=lambda x: x["id"]) + return out + except Exception as e: + logger.warning(f"获取1分钟K线失败: {e}") + return [] + + def determine_trigger_order_by_1m(self, bars_1m, long_trigger, short_trigger): + """用1分钟K线判断先触发做多还是做空""" + if not bars_1m: + return None + for bar in bars_1m: + high = float(bar["high"]) + low = float(bar["low"]) + open_price = float(bar["open"]) + long_ok = high >= long_trigger + short_ok = low <= short_trigger + if long_ok and not short_ok: + return "long" + if short_ok and not long_ok: + return "short" + if long_ok and short_ok: + d_long = abs(long_trigger - open_price) + d_short = abs(short_trigger - open_price) + return "short" if d_short < d_long else "long" + return None + + def check_first_minute_reverse_signal(self, curr_kline, kline_data): + """ + 第一分钟反手检测(与1111一致): + - 持空反手做多:价格涨到 开仓价 + 前一根实体/5 + - 持多反手做空:价格跌到 开仓价 - 前一根实体/5 + """ + if self.start == 0: + return None, None + + curr_kline_id = curr_kline["id"] + if self.entry_kline_id != curr_kline_id: + self.first_minute_reverse_executed = False + self.entry_kline_id = curr_kline_id + + if self.first_minute_reverse_executed: + return None, None + + entry_price = self.entry_price + if entry_price is None and self.open_avg_price: + entry_price = float(self.open_avg_price) + if entry_price is None: + return None, None + + _, valid_prev = self.find_valid_prev_bar( + kline_data, len(kline_data) - 1 + ) + if valid_prev is None: + return None, None + prev_body = self.get_body_size(valid_prev) + reverse_offset = prev_body / 5 + + bars_1m = self.get_1m_bars_for_3m_bar(curr_kline) + if not bars_1m or len(bars_1m) < 1: + return None, None + + first_1m = bars_1m[0] + first_1m_high = float(first_1m["high"]) + first_1m_low = float(first_1m["low"]) + first_1m_close = float(first_1m["close"]) + + if self.start == -1: + reverse_long_trigger = entry_price + reverse_offset + if first_1m_high >= reverse_long_trigger: + if first_1m_close >= reverse_long_trigger - self.reverse_price_tolerance: + return "long", reverse_long_trigger + + elif self.start == 1: + reverse_short_trigger = entry_price - reverse_offset + if first_1m_low <= reverse_short_trigger: + if first_1m_close <= reverse_short_trigger + self.reverse_price_tolerance: + return "short", reverse_short_trigger + + return None, None + + def check_realtime_trigger(self, kline_data, current_position=0): + """ + 实时检测信号(基于当前K线开盘价计算触发价) + 返回:(方向, 触发价, 有效前一根, 当前K线) 或 (None,...) + """ + if len(kline_data) < 2: + return None, None, None, None + + curr = kline_data[-1] + curr_kline_id = curr["id"] + curr_open = curr["open"] + valid_prev_idx, prev = self.find_valid_prev_bar( + kline_data, len(kline_data) - 1 + ) + if prev is None: + return None, None, None, None + + long_trigger, short_trigger = self.get_open_based_levels(prev, curr_open) + if long_trigger is None: + return None, None, None, None + + c_high = float(curr["high"]) + c_low = float(curr["low"]) + c_close = float(curr["close"]) + long_triggered = c_high >= long_trigger + short_triggered = c_low <= short_trigger + + direction = None + trigger_price = None + + if current_position == 1: + if short_triggered and c_close <= short_trigger + self.reverse_price_tolerance: + direction = "short" + trigger_price = short_trigger + elif current_position == -1: + if long_triggered and c_close >= long_trigger - self.reverse_price_tolerance: + direction = "long" + trigger_price = long_trigger + else: + if long_triggered and short_triggered: + bars_1m = self.get_1m_bars_for_3m_bar(curr) + if bars_1m: + direction = self.determine_trigger_order_by_1m( + bars_1m, long_trigger, short_trigger + ) + trigger_price = ( + long_trigger if direction == "long" else short_trigger + ) + if direction is None: + c_open_f = float(curr["open"]) + d_long = abs(long_trigger - c_open_f) + d_short = abs(short_trigger - c_open_f) + direction = "short" if d_short <= d_long else "long" + trigger_price = ( + long_trigger if direction == "long" else short_trigger + ) + elif short_triggered: + direction = "short" + trigger_price = short_trigger + elif long_triggered: + direction = "long" + trigger_price = long_trigger + + if direction is None: + return None, None, None, None + if ( + self.last_trigger_kline_id == curr_kline_id + and self.last_trigger_direction == direction + ): + return None, None, None, None + + return direction, trigger_price, prev, curr + + # ==================== BitMart API ==================== + + def get_klines(self): + try: + end_time = int(time.time()) + response = self.contractAPI.get_kline( + contract_symbol=self.contract_symbol, + step=self.kline_step, + start_time=end_time - 3600 * 3, + end_time=end_time, + )[0]["data"] + formatted = [] + for k in response: + formatted.append( + { + "id": int(k["timestamp"]), + "open": float(k["open_price"]), + "high": float(k["high_price"]), + "low": float(k["low_price"]), + "close": float(k["close_price"]), + } + ) + formatted.sort(key=lambda x: x["id"]) + return formatted + except Exception as e: + if "429" in str(e) or "too many requests" in str(e).lower(): + logger.warning(f"API限流,等待60秒: {e}") + time.sleep(60) + else: + logger.error(f"获取K线异常: {e}") + self.ding("获取K线异常", error=True) + return None + + def get_available_balance(self): + try: + response = self.contractAPI.get_assets_detail()[0] + if response["code"] == 1000: + data = response["data"] + if isinstance(data, dict): + return float(data.get("available_balance", 0)) + for asset in data: + if asset.get("currency") == "USDT": + return float(asset.get("available_balance", 0)) + return None + except Exception as e: + logger.error(f"余额查询异常: {e}") + return None + + def get_position_status(self): + try: + response = self.contractAPI.get_position( + contract_symbol=self.contract_symbol + )[0] + if response["code"] == 1000: + positions = response["data"] + if not positions: + self.start = 0 + self.open_avg_price = None + self.current_amount = None + self.position_cross = None + return True + self.start = 1 if positions[0]["position_type"] == 1 else -1 + self.open_avg_price = positions[0]["open_avg_price"] + self.current_amount = positions[0]["current_amount"] + self.position_cross = positions[0]["position_cross"] + return True + return False + except Exception as e: + logger.error(f"持仓查询异常: {e}") + return False + + def set_leverage(self): + try: + response = self.contractAPI.post_submit_leverage( + contract_symbol=self.contract_symbol, + leverage=self.leverage, + open_type=self.open_type, + )[0] + if response["code"] == 1000: + logger.success(f"全仓 {self.leverage}x 杠杆设置成功") + return True + logger.error(f"杠杆设置失败: {response}") + return False + except Exception as e: + logger.error(f"设置杠杆异常: {e}") + return False + + # ==================== 浏览器 ==================== + + def _open_browser(self): + try: + bit_port = openBrowser(id=self.bit_id) + co = ChromiumOptions() + co.set_local_port(port=bit_port) + self.page = ChromiumPage(addr_or_opts=co) + return True + except Exception: + return False + + def close_extra_tabs(self): + try: + for idx, tab in enumerate(self.page.get_tabs()): + if idx > 0: + tab.close() + return True + except Exception: + return False + + def click_safe(self, xpath, sleep=0.5): + try: + ele = self.page.ele(xpath) + if not ele: + return False + ele.scroll.to_see(center=True) + time.sleep(sleep) + ele.click(by_js=True) + return True + except Exception: + return False + + def 平仓(self): + logger.info("执行平仓...") + self.click_safe('x://span[normalize-space(text()) ="市价"]') + time.sleep(0.5) + self.ding("执行平仓操作") + + def 开单(self, marketPriceLongOrder=0, size=None): + if size is None or size <= 0: + logger.warning("开单金额无效") + return False + direction_str = "做多" if marketPriceLongOrder == 1 else "做空" + logger.info(f"执行{direction_str},金额: {size}") + size = max(1, min(25, int(size))) # 限制单次下单金额 1~25 + try: + self.click_safe('x://button[normalize-space(text()) ="市价"]') + self.page.ele('x://*[@id="size_0"]').input(str(size)) + if marketPriceLongOrder == -1: + self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') + else: + self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') + self.ding(f"执行{direction_str},金额: {size}") + return True + except Exception as e: + logger.error(f"开单异常: {e}") + return False + + def ding(self, msg, error=False): + prefix = "❌开盘价五分之一:" if error else "🔔开盘价五分之一:" + full_msg = f"{prefix}{msg}" + if error: + logger.error(msg) + for _ in range(3): + ding_executor.submit(self._send_ding_safe, full_msg) + else: + logger.info(msg) + ding_executor.submit(self._send_ding_safe, full_msg) + + def _send_ding_safe(self, msg): + try: + send_dingtalk_message(msg) + except Exception as e: + logger.warning(f"消息发送失败: {e}") + + def _send_position_message(self, latest_kline): + current_price = float(latest_kline["close"]) + balance = self.get_available_balance() + self.balance = balance if balance is not None else 0.0 + if self.start != 0: + open_avg_price = float(self.open_avg_price) + current_amount = float(self.current_amount) + position_cross = float( + getattr(self, "position_cross", 0) or 0 + ) + if self.start == 1: + unrealized_pnl = current_amount * 0.001 * ( + current_price - open_avg_price + ) + else: + unrealized_pnl = current_amount * 0.001 * ( + open_avg_price - current_price + ) + pnl_rate = ( + (current_price - open_avg_price) + / open_avg_price + * 100 + if self.start == 1 + else (open_avg_price - current_price) + / open_avg_price + * 100 + ) + direction_str = "空" if self.start == -1 else "多" + msg = ( + f"【开盘价五分之一 {self.contract_symbol}】\n" + f"方向:{direction_str}\n" + f"现价:{current_price:.2f}\n" + f"开仓均价:{open_avg_price:.2f}\n" + f"浮动盈亏:{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)\n" + f"余额:{self.balance:.2f}" + ) + else: + msg = ( + f"【开盘价五分之一 {self.contract_symbol}】\n" + f"方向:无\n" + f"现价:{current_price:.2f}\n" + f"余额:{self.balance:.2f}" + ) + self.ding(msg) + + # ==================== 主循环 ==================== + + def action(self): + if not self.set_leverage(): + logger.error("杠杆设置失败") + return + if not self._open_browser(): + self.ding("打开浏览器失败!", error=True) + return + logger.info("浏览器打开成功") + self.close_extra_tabs() + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + logger.info( + f"开盘价五分之一策略(3分钟K线)开始监测,间隔: {self.check_interval}秒" + ) + + last_report_time = 0 + report_interval = 300 + + while True: + for _ in range(5): + if self._open_browser(): + break + time.sleep(5) + else: + self.ding("打开浏览器失败!", error=True) + return + self.close_extra_tabs() + self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") + time.sleep(2) + self.click_safe('x://button[normalize-space(text()) ="市价"]') + + try: + kline_data = self.get_klines() + if not kline_data or len(kline_data) < 3: + logger.warning("K线数据不足...") + time.sleep(self.check_interval) + continue + + curr = kline_data[-1] + if not self.get_position_status(): + logger.warning("获取仓位失败,使用缓存") + + # 第一分钟反手 + if self.start != 0: + first_dir, first_trigger = self.check_first_minute_reverse_signal( + curr, kline_data + ) + if first_dir: + curr_kline_id = curr["id"] + if self.last_trade_kline_id != curr_kline_id: + balance = self.get_available_balance() + trade_size = (balance or 0) * self.risk_percent + if first_dir == "long" and self.start == -1: + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + elif first_dir == "short" and self.start == 1: + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + self.first_minute_reverse_executed = True + self.last_trade_kline_id = curr_kline_id + _, valid_prev = self.find_valid_prev_bar( + kline_data, len(kline_data) - 1 + ) + if valid_prev: + self.entry_prev_body = self.get_body_size(valid_prev) + self.entry_price = float(curr["close"]) + self.entry_kline_id = curr_kline_id + self.get_position_status() + self._send_position_message(curr) + time.sleep(self.check_interval) + continue + + # 常规信号检测 + direction, trigger_price, valid_prev, curr_kline = ( + self.check_realtime_trigger(kline_data, self.start) + ) + + if direction: + curr_kline_id = curr_kline["id"] + if self.last_trade_kline_id == curr_kline_id: + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + if (direction == "long" and self.start == 1) or ( + direction == "short" and self.start == -1 + ): + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + balance = self.get_available_balance() + trade_size = (balance or 0) * self.risk_percent + executed = False + + if direction == "long": + if self.start == -1: + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif self.start == 0: + self.开单(marketPriceLongOrder=1, size=trade_size) + executed = True + elif direction == "short": + if self.start == 1: + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + elif self.start == 0: + self.开单(marketPriceLongOrder=-1, size=trade_size) + executed = True + + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + if executed: + self.last_trade_kline_id = curr_kline_id + self.entry_price = trigger_price + self.entry_prev_body = self.get_body_size(valid_prev) + self.entry_kline_id = curr_kline_id + self.first_minute_reverse_executed = False + self.get_position_status() + self._send_position_message(curr_kline) + last_report_time = time.time() + + if time.time() - last_report_time >= report_interval: + if self.get_position_status(): + self._send_position_message(kline_data[-1]) + last_report_time = time.time() + + time.sleep(self.check_interval) + + except Exception as e: + logger.error(f"主循环异常: {e}") + time.sleep(self.check_interval) + time.sleep(3) + if random.randint(1, 10) > 7: + self.page.close() + time.sleep(15) + + +if __name__ == "__main__": + try: + OpenBasedFifthStrategy(bit_id=BIT_ID).action() + except KeyboardInterrupt: + logger.info("程序被用户中断") + finally: + ding_executor.shutdown(wait=True) + logger.info("已退出")