# -*- coding: utf-8 -*-
"""
Adaptive third-range trend strategy - live trading.

Pulls Bitmart 5/15/60-minute klines, computes the strategy signal and hands it
to the open/close step (intended to reuse the browser/API order logic of
交易/bitmart-三分之一策略交易).
"""
import os
import sys
import time
import datetime
from typing import List, Dict, Optional, Tuple

try:
    from loguru import logger
except ImportError:
    import logging
    logger = logging.getLogger(__name__)

# Make the repository root importable so the project packages below resolve
# when this file is executed directly as a script.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)

from bitmart.api_contract import APIContract

from adaptive_third_strategy.config import (
    STEP_5M,
    STEP_15M,
    STEP_60M,
    ATR_PERIOD,
    EMA_SHORT,
    EMA_MID_FAST,
    EMA_MID_SLOW,
    EMA_LONG_FAST,
    EMA_LONG_SLOW,
    BASE_POSITION_PERCENT,
    MAX_POSITION_PERCENT,
    CONTRACT_SYMBOL,
)
from adaptive_third_strategy.indicators import get_ema_atr_from_klines, align_higher_tf_ema
from adaptive_third_strategy.strategy_core import check_trigger, build_volume_ma
from adaptive_third_strategy.data_fetcher import fetch_klines, _format_bar

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))


class AdaptiveThirdStrategyTrader:
    """Live runner: fetch multi-timeframe klines -> strategy signal -> open/close.

    Order execution itself is not implemented here; ``run_loop`` only reports
    the signal and records it, leaving a hook for a browser- or API-based
    order implementation.
    """

    def __init__(
        self,
        api_key: str,
        secret_key: str,
        memo: str = "合约交易",
        symbol: str = CONTRACT_SYMBOL,
        bit_id: Optional[str] = None,
    ):
        """Store credentials and create the Bitmart contract API client.

        Args:
            api_key: Bitmart API key.
            secret_key: Bitmart API secret.
            memo: Memo passed to ``APIContract`` together with the keys.
            symbol: Contract symbol to trade.
            bit_id: Optional browser profile id (only stored on the instance).
        """
        self.api_key = api_key
        self.secret_key = secret_key
        self.memo = memo
        self.symbol = symbol
        self.bit_id = bit_id
        self.contractAPI = APIContract(api_key, secret_key, memo, timeout=(5, 15))
        # Seconds to sleep between polling iterations of run_loop.
        self.check_interval = 3
        # De-duplication state: the last kline/direction that produced a
        # signal, and the last kline on which a trade was recorded.
        self.last_trigger_kline_id: Optional[int] = None
        self.last_trigger_direction: Optional[str] = None
        self.last_trade_kline_id: Optional[int] = None
        self.position_direction: int = 0  # 1 = long, -1 = short, 0 = flat

    def get_klines_multi(self) -> Tuple[List[Dict], List[Dict], List[Dict]]:
        """Fetch the 5/15/60-minute klines covering the last three hours.

        Returns:
            ``(klines_5m, klines_15m, klines_60m)`` as returned by
            ``fetch_klines`` for the same ``[now - 3h, now]`` window.
        """
        # NOTE(review): a 3-hour window can only hold about three 60-minute
        # bars (assuming STEP_60M means 60 minutes) - confirm that is enough
        # for the EMA_LONG_* periods used downstream.
        end_ts = int(time.time())
        start_ts = end_ts - 3600 * 3
        k5 = fetch_klines(self.contractAPI, STEP_5M, start_ts, end_ts, self.symbol)
        k15 = fetch_klines(self.contractAPI, STEP_15M, start_ts, end_ts, self.symbol)
        k60 = fetch_klines(self.contractAPI, STEP_60M, start_ts, end_ts, self.symbol)
        return k5, k15, k60

    def get_position_status(self) -> bool:
        """Query the current position and update ``self.position_direction``.

        Returns:
            True when the query succeeded (response code 1000) and
            ``position_direction`` was refreshed; False on a non-1000 response
            or on any exception, in which case the previous value is kept.
        """
        try:
            response = self.contractAPI.get_position(contract_symbol=self.symbol)[0]
            if response.get("code") != 1000:
                return False
            positions = response.get("data", [])
            if not positions:
                self.position_direction = 0
                return True
            # Only the first returned position is inspected; any
            # position_type other than 1 is treated as short.
            self.position_direction = 1 if positions[0].get("position_type") == 1 else -1
            return True
        except Exception as e:
            logger.error(f"持仓查询异常: {e}")
            return False

    def check_realtime_signal(
        self,
        klines_5m: List[Dict],
        klines_15m: List[Dict],
        klines_60m: List[Dict],
    ) -> Tuple[Optional[str], Optional[float], Optional[Dict]]:
        """Detect a real-time signal on the latest (still open) 5-minute kline.

        Args:
            klines_5m: 5-minute klines; the last element is the current bar.
            klines_15m: 15-minute klines used for the mid-term EMA alignment.
            klines_60m: 60-minute klines used for the long-term EMA alignment.

        Returns:
            ``(direction, trigger_price, current_kline)`` when a new actionable
            signal exists, otherwise ``(None, None, None)``.
        """
        if len(klines_5m) < ATR_PERIOD + 2:
            return None, None, None

        ema_5m, atr_5m = get_ema_atr_from_klines(klines_5m, EMA_SHORT, ATR_PERIOD)
        ema_15m_align = align_higher_tf_ema(klines_5m, klines_15m, EMA_MID_FAST, EMA_MID_SLOW)
        ema_60m_align = align_higher_tf_ema(klines_5m, klines_60m, EMA_LONG_FAST, EMA_LONG_SLOW)
        volume_ma = build_volume_ma(klines_5m)

        curr_idx = len(klines_5m) - 1
        direction, trigger_price, _, _ = check_trigger(
            klines_5m,
            curr_idx,
            atr_5m,
            ema_5m,
            ema_15m_align,
            ema_60m_align,
            volume_ma,
            use_confirm=True,
        )
        curr = klines_5m[curr_idx]
        curr_id = curr["id"]

        if direction is None:
            return None, None, None
        # Same kline already produced this same direction: do not repeat it.
        if self.last_trigger_kline_id == curr_id and self.last_trigger_direction == direction:
            return None, None, None
        # A trade was already recorded on this kline.
        if self.last_trade_kline_id == curr_id:
            return None, None, None
        # Already positioned in the signalled direction.
        if (direction == "long" and self.position_direction == 1) or (
            direction == "short" and self.position_direction == -1
        ):
            return None, None, None
        return direction, trigger_price, curr

    def run_loop(self, ding_callback=None):
        """Poll forever: fetch data -> detect signal -> report and record it.

        Args:
            ding_callback: Optional ``callback(msg, error)`` used for
                notifications (e.g. DingTalk). When omitted, messages go to
                the module logger instead.

        This method never returns. Any exception raised inside one iteration
        is logged and followed by a 60-second pause before polling resumes.
        """

        def ding(msg: str, error: bool = False):
            if ding_callback:
                ding_callback(msg, error)
            else:
                if error:
                    logger.error(msg)
                else:
                    logger.info(msg)

        logger.info("自适应三分位趋势策略实盘启动,按 Bitmart 方式拉取 5/15/60 分钟数据")
        while True:
            try:
                k5, k15, k60 = self.get_klines_multi()
                if len(k5) < ATR_PERIOD + 2:
                    time.sleep(self.check_interval)
                    continue
                if not self.get_position_status():
                    time.sleep(self.check_interval)
                    continue
                direction, trigger_price, curr = self.check_realtime_signal(k5, k15, k60)
                if direction and trigger_price is not None and curr is not None:
                    curr_id = curr["id"]
                    ding(f"信号: {direction} @ {trigger_price:.2f} 当前K线 id={curr_id}")
                    # Hook for the real open/close implementation: close the
                    # existing position, then open the new one (either via the
                    # 交易/bitmart-三分之一策略交易 close/open helpers or a
                    # direct API order).
                    # Example behaviour only: the signal is recorded, no order
                    # is placed.
                    # NOTE(review): last_trade_kline_id is set even though no
                    # order was sent - revisit once order placement is wired in.
                    self.last_trigger_kline_id = curr_id
                    self.last_trigger_direction = direction
                    self.last_trade_kline_id = curr_id
                time.sleep(self.check_interval)
            except Exception as e:
                logger.exception(e)
                time.sleep(60)


def main():
    """Command-line entry point: resolve credentials and start the live loop.

    Credentials are read from ``--api-key`` / ``--secret-key`` first, then
    from the ``BITMART_API_KEY`` / ``BITMART_SECRET_KEY`` environment
    variables. The process exits with an argparse usage error when either is
    missing.
    """
    import argparse

    parser = argparse.ArgumentParser(description="自适应三分位趋势策略实盘")
    parser.add_argument("--api-key", default="", help="Bitmart API Key")
    parser.add_argument("--secret-key", default="", help="Bitmart Secret Key")
    parser.add_argument("--bit-id", default="", help="比特浏览器 ID(若用浏览器下单)")
    args = parser.parse_args()

    # Credentials must never be embedded in source control; they are accepted
    # only from the command line or the environment.
    api_key = args.api_key or os.environ.get("BITMART_API_KEY", "")
    secret_key = args.secret_key or os.environ.get("BITMART_SECRET_KEY", "")
    if not api_key or not secret_key:
        parser.error(
            "缺少 Bitmart API 凭证:请通过 --api-key/--secret-key "
            "或环境变量 BITMART_API_KEY/BITMART_SECRET_KEY 提供"
        )

    trader = AdaptiveThirdStrategyTrader(api_key, secret_key, bit_id=args.bit_id or None)

    try:
        from 交易.tools import send_dingtalk_message

        def ding(msg, error=False):
            prefix = "❌自适应三分位:" if error else "🔔自适应三分位:"
            send_dingtalk_message(f"{prefix}{msg}")
    except Exception:
        # DingTalk helper unavailable: fall back to the logger, keeping the
        # error/info distinction.
        def ding(msg, error=False):
            if error:
                logger.error(msg)
            else:
                logger.info(msg)

    trader.run_loop(ding_callback=ding)


if __name__ == "__main__":
    main()