""" BitMart EMA趋势跟随返佣策略 核心思路: 你有交易所 90% 手续费返佣,因此每笔交易的"真实成本"极低(0.012%/round trip)。 策略使用 EMA 金叉/死叉 捕捉 1 分钟级别的微趋势,配合大级别 EMA 趋势过滤 和 ATR 波动率过滤,只在高波动顺势环境中开仓。 持仓必须 >= 3 分钟,避免被判定为刷量。 回测表现(2025全年 1 分钟 K 线): - 参数:EMA(8/21/120) ATR>0.3% SL=0.4% MaxHold=1800s - Risk=2% → 年化 +10.11%,最大回撤 10.92% - Risk=3% → 年化 +13.66%,最大回撤 15.96% - 全年约 227 笔交易,平均 1.6 天 1 笔 - 胜率 ~29%,盈亏比 ~2.7:1(低胜率高赔率模式) 策略规则: 1. 使用 1 分钟 K 线计算 EMA(8)快线、EMA(21)慢线、EMA(120)大趋势线 2. 计算 ATR(14) 波动率,仅在 ATR > 0.3% 时交易(过滤低波动区间) 3. 开仓信号: - 做多:EMA(8) 上穿 EMA(21) 且 价格 > EMA(120)(顺势金叉) - 做空:EMA(8) 下穿 EMA(21) 且 价格 < EMA(120)(顺势死叉) 4. 平仓条件: a) 反向交叉信号(满足最低持仓后,可同时反手开仓) b) 止损:浮亏 >= 0.4%(硬止损 0.6%,不受持仓时间限制) c) 超时:持仓 >= 30 分钟强制平仓 5. 平仓后若有反向信号且满足 ATR 过滤,立即反手开仓 """ import time import uuid import datetime from loguru import logger from bitmart.api_contract import APIContract from bitmart.lib.cloud_exceptions import APIException from 交易.tools import send_dingtalk_message # ========================= EMA 计算器 ========================= class EMACalculator: """指数移动平均线,增量式更新""" def __init__(self, period: int): self.period = period self.k = 2.0 / (period + 1) self.value = None def update(self, price: float) -> float: if self.value is None: self.value = price else: self.value = price * self.k + self.value * (1 - self.k) return self.value def reset(self): self.value = None class BitmartRebateStrategy: def __init__(self): self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" self.memo = "合约交易" self.contract_symbol = "ETHUSDT" self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) # ========================= 持仓状态 ========================= self.start = 0 # -1 空, 0 无, 1 多 self.open_avg_price = None self.current_amount = None self.position_cross = None self.open_time = None # 开仓时间戳(用于计算持仓时长) # ========================= 杠杆 & 仓位 ========================= self.leverage = "50" # 杠杆倍数 self.open_type = "cross" # 全仓模式 self.risk_percent = 0.02 # 每次开仓使用可用余额的 2%(回测最优) # ========================= EMA 参数(回测最优) ========================= self.ema_fast_period = 8 # 快线 EMA 周期 self.ema_slow_period = 21 # 慢线 EMA 周期 self.ema_big_period = 120 # 大趋势 EMA 周期(2小时) self.atr_period = 14 # ATR 周期 self.atr_min_pct = 0.003 # ATR 最低阈值 0.3%(过滤低波动) # ========================= 持仓管理参数(回测最优) ========================= self.min_hold_seconds = 200 # 最低持仓时间(秒),>3分钟 self.max_hold_seconds = 1800 # 最长持仓时间(秒),30分钟 self.stop_loss_pct = 0.004 # 止损百分比 0.4% self.hard_stop_pct = 0.006 # 硬止损 0.6%(不受时间限制) # ========================= EMA 状态 ========================= self.ema_fast = EMACalculator(self.ema_fast_period) self.ema_slow = EMACalculator(self.ema_slow_period) self.ema_big = EMACalculator(self.ema_big_period) self.prev_fast = None # 上一根K线的快线值 self.prev_slow = None # 上一根K线的慢线值 self.pending_signal = None # 等待最低持仓后执行的延迟信号 # ========================= K线缓存(用于 ATR 计算) ========================= self.highs = [] self.lows = [] self.closes = [] self.last_kline_time = None # 最新处理过的K线时间戳 # ========================= 运行控制 ========================= self.check_interval = 5 # 主循环检测间隔(秒) # ========================= 统计 ========================= self.trade_count = 0 # 当日交易次数 self.total_pnl = 0.0 # 当日累计盈亏 self.total_volume = 0.0 # 当日累计交易额(用于估算返佣) self.start_time = time.time() # 程序启动时间 # ========================= 技术指标计算 ========================= def calculate_atr_pct(self, current_price): """计算 ATR 占当前价格的百分比""" if len(self.highs) < self.atr_period + 1: return 0.0 trs = [] for i in range(-self.atr_period, 0): h = self.highs[i] l = self.lows[i] pc = self.closes[i - 1] tr = max(h - l, abs(h - pc), abs(l - pc)) trs.append(tr) atr = sum(trs) / self.atr_period return atr / current_price if current_price > 0 else 0.0 def detect_cross(self, fast_val, slow_val): """ 检测 EMA 金叉/死叉 返回: 'golden' (金叉) / 'death' (死叉) / None """ if self.prev_fast is None or self.prev_slow is None: return None # 金叉:快线从下方穿越慢线 if self.prev_fast <= self.prev_slow and fast_val > slow_val: return "golden" # 死叉:快线从上方穿越慢线 if self.prev_fast >= self.prev_slow and fast_val < slow_val: return "death" return None def process_new_kline(self, kline): """ 处理新的 1 分钟 K 线,更新所有指标 返回: (cross_signal, atr_pct, fast, slow, big) """ close = kline['close'] high = kline['high'] low = kline['low'] # 缓存 K 线数据 self.highs.append(high) self.lows.append(low) self.closes.append(close) # 只保留最近 200 根(节省内存) if len(self.highs) > 200: self.highs = self.highs[-200:] self.lows = self.lows[-200:] self.closes = self.closes[-200:] # 更新 EMA fast_val = self.ema_fast.update(close) slow_val = self.ema_slow.update(close) big_val = self.ema_big.update(close) # 检测交叉 cross = self.detect_cross(fast_val, slow_val) # 保存当前值供下次对比 self.prev_fast = fast_val self.prev_slow = slow_val # 计算 ATR atr_pct = self.calculate_atr_pct(close) return cross, atr_pct, fast_val, slow_val, big_val # ========================= 数据获取 ========================= def get_1min_klines(self, count=150): """获取最近 N 根 1 分钟 K 线""" try: end_time = int(time.time()) start_time = end_time - 60 * count * 2 # 多取一些保证够用 response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=1, # 1 分钟 start_time=start_time, end_time=end_time )[0] if response['code'] != 1000: logger.error(f"获取K线失败: {response}") return None formatted = [] for k in response['data']: formatted.append({ 'timestamp': int(k["timestamp"]), 'open': float(k["open_price"]), 'high': float(k["high_price"]), 'low': float(k["low_price"]), 'close': float(k["close_price"]), }) formatted.sort(key=lambda x: x['timestamp']) # 只保留最近 count 根 return formatted[-count:] except Exception as e: logger.error(f"获取1分钟K线异常: {e}") return None def get_current_price(self): """获取当前最新价格""" try: end_time = int(time.time()) response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=1, start_time=end_time - 300, end_time=end_time )[0] if response['code'] == 1000 and response['data']: return float(response['data'][-1]["close_price"]) return None except Exception as e: logger.error(f"获取价格异常: {e}") return None def get_available_balance(self): """获取合约账户可用 USDT 余额""" try: response = self.contractAPI.get_assets_detail()[0] if response['code'] == 1000: data = response['data'] if isinstance(data, dict): return float(data.get('available_balance', 0)) elif isinstance(data, list): for asset in data: if asset.get('currency') == 'USDT': return float(asset.get('available_balance', 0)) return None except Exception as e: logger.error(f"余额查询异常: {e}") return None def get_position_status(self): """获取当前持仓状态""" try: response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] if response['code'] == 1000: positions = response['data'] if not positions: self.start = 0 self.open_avg_price = None self.current_amount = None self.position_cross = None return True self.start = 1 if positions[0]['position_type'] == 1 else -1 self.open_avg_price = float(positions[0]['open_avg_price']) self.current_amount = float(positions[0]['current_amount']) self.position_cross = positions[0].get("position_cross") return True return False except Exception as e: logger.error(f"持仓查询异常: {e}") return False # ========================= 交易执行 ========================= def set_leverage(self): """设置全仓 + 杠杆""" try: response = self.contractAPI.post_submit_leverage( contract_symbol=self.contract_symbol, leverage=self.leverage, open_type=self.open_type )[0] if response['code'] == 1000: logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") return True else: logger.error(f"杠杆设置失败: {response}") return False except Exception as e: logger.error(f"设置杠杆异常: {e}") return False def calculate_size(self, price): """计算开仓张数""" balance = self.get_available_balance() if not balance or balance < 10: logger.warning(f"余额不足: {balance}") return 0 leverage = int(self.leverage) margin = balance * self.risk_percent # ETHUSDT 1张 ≈ 0.001 ETH size = int((margin * leverage) / (price * 0.001)) size = max(1, size) logger.info(f"余额 {balance:.2f} USDT → 保证金 {margin:.2f} USDT → 开仓 {size} 张 (价格≈{price:.2f})") return size def place_market_order(self, side: int, size: int): """ 下市价单 side: 1=开多, 2=平空, 3=平多, 4=开空 """ if size <= 0: return False client_order_id = f"rebate_{int(time.time())}_{uuid.uuid4().hex[:8]}" side_names = {1: "开多", 2: "平空", 3: "平多", 4: "开空"} try: response = self.contractAPI.post_submit_order( contract_symbol=self.contract_symbol, client_order_id=client_order_id, side=side, mode=1, type='market', leverage=self.leverage, open_type=self.open_type, size=size )[0] if response['code'] == 1000: logger.success(f"下单成功: {side_names.get(side, side)} {size} 张") return True else: logger.error(f"下单失败: {response}") return False except APIException as e: logger.error(f"API下单异常: {e}") return False def open_position(self, direction: str, price: float): """开仓""" size = self.calculate_size(price) if size == 0: return False if direction == "long": if self.place_market_order(1, size): self.start = 1 self.open_avg_price = price self.open_time = time.time() self.current_amount = size self.pending_signal = None # 统计交易额 volume = size * 0.001 * price self.total_volume += volume self.trade_count += 1 logger.success(f"开多 {size} 张 @ {price:.2f}") self.ding(f"开多 {size} 张 @ {price:.2f}") return True elif direction == "short": if self.place_market_order(4, size): self.start = -1 self.open_avg_price = price self.open_time = time.time() self.current_amount = size self.pending_signal = None volume = size * 0.001 * price self.total_volume += volume self.trade_count += 1 logger.success(f"开空 {size} 张 @ {price:.2f}") self.ding(f"开空 {size} 张 @ {price:.2f}") return True return False def close_position(self, reason: str, current_price: float): """平仓""" if self.start == 0: return False close_side = 3 if self.start == 1 else 2 # 3=平多, 2=平空 direction_str = "多" if self.start == 1 else "空" if self.place_market_order(close_side, 999999): # 计算本次盈亏 if self.open_avg_price and self.current_amount: if self.start == 1: pnl = self.current_amount * 0.001 * (current_price - self.open_avg_price) else: pnl = self.current_amount * 0.001 * (self.open_avg_price - current_price) self.total_pnl += pnl # 统计平仓交易额 volume = self.current_amount * 0.001 * current_price self.total_volume += volume hold_seconds = time.time() - self.open_time if self.open_time else 0 logger.success( f"平{direction_str} @ {current_price:.2f} | " f"原因: {reason} | 持仓 {hold_seconds:.0f}s | " f"本次盈亏: {pnl:+.4f} USDT" ) self.ding( f"平{direction_str} @ {current_price:.2f}\n" f"原因: {reason}\n持仓 {hold_seconds:.0f}s\n" f"本次盈亏: {pnl:+.4f} USDT" ) self.start = 0 self.open_avg_price = None self.open_time = None self.current_amount = None self.pending_signal = None self.trade_count += 1 return True return False # ========================= 信号检测 ========================= def check_open_signal(self, current_price, cross, atr_pct, big_val): """ 检查开仓信号 返回: 'long' / 'short' / None """ # ATR 过滤:波动率不足时不开仓 if atr_pct < self.atr_min_pct: return None # 金叉 + 价格在大EMA上方 → 做多 if cross == "golden" and current_price > big_val: logger.info( f"做多信号 | 金叉 + 价格 {current_price:.2f} > EMA120 {big_val:.2f} | ATR {atr_pct*100:.3f}%" ) return "long" # 死叉 + 价格在大EMA下方 → 做空 if cross == "death" and current_price < big_val: logger.info( f"做空信号 | 死叉 + 价格 {current_price:.2f} < EMA120 {big_val:.2f} | ATR {atr_pct*100:.3f}%" ) return "short" return None def check_close_signal(self, current_price, cross, atr_pct, fast_val, slow_val, big_val): """ 检查平仓信号 返回: (should_close: bool, reason: str, reverse_direction: str or None) reverse_direction: 平仓后是否反手 ('long'/'short'/None) """ if self.start == 0 or not self.open_avg_price or not self.open_time: return False, "", None hold_seconds = time.time() - self.open_time # 计算浮动盈亏 if self.start == 1: loss_pct = (self.open_avg_price - current_price) / self.open_avg_price else: loss_pct = (current_price - self.open_avg_price) / self.open_avg_price # ① 硬止损:不受持仓时间限制 if loss_pct >= self.hard_stop_pct: return True, f"硬止损 (亏损 {loss_pct*100:.3f}%)", None # ② 未满足最低持仓时间 if hold_seconds < self.min_hold_seconds: # 记录延迟信号(等持仓时间到了再处理) if self.start == 1 and cross == "death": self.pending_signal = "close_long" logger.info(f"检测到死叉但持仓时间不足,记录延迟信号 | 还需 {self.min_hold_seconds - hold_seconds:.0f}s") elif self.start == -1 and cross == "golden": self.pending_signal = "close_short" logger.info(f"检测到金叉但持仓时间不足,记录延迟信号 | 还需 {self.min_hold_seconds - hold_seconds:.0f}s") remaining = self.min_hold_seconds - hold_seconds if int(hold_seconds) % 30 == 0: logger.info(f"持仓中... 还需等待 {remaining:.0f}s") return False, "", None # ③ 满足持仓时间后的平仓检查 reverse = None # 止损 if loss_pct >= self.stop_loss_pct: return True, f"止损 (亏损 {loss_pct*100:.3f}%)", None # 超时 if hold_seconds >= self.max_hold_seconds: return True, f"超时平仓 (持仓 {hold_seconds:.0f}s)", None # 反向交叉 → 平仓 + 可能反手 if self.start == 1 and cross == "death": # 判断是否满足反手条件 if current_price < big_val and atr_pct >= self.atr_min_pct: reverse = "short" return True, "EMA死叉反转", reverse if self.start == -1 and cross == "golden": if current_price > big_val and atr_pct >= self.atr_min_pct: reverse = "long" return True, "EMA金叉反转", reverse # 处理延迟信号(之前因持仓时间不足未执行) if self.pending_signal == "close_long" and self.start == 1: if fast_val < slow_val and current_price < big_val and atr_pct >= self.atr_min_pct: reverse = "short" self.pending_signal = None return True, "延迟死叉平仓", reverse if self.pending_signal == "close_short" and self.start == -1: if fast_val > slow_val and current_price > big_val and atr_pct >= self.atr_min_pct: reverse = "long" self.pending_signal = None return True, "延迟金叉平仓", reverse return False, "", None # ========================= 通知 ========================= def ding(self, msg, error=False): """发送通知""" prefix = "返佣策略(ERR): " if error else "返佣策略: " try: if error: for _ in range(3): send_dingtalk_message(f"{prefix}{msg}") else: send_dingtalk_message(f"{prefix}{msg}") except Exception as e: logger.warning(f"通知发送失败: {e}") def print_daily_stats(self): """打印当日统计""" elapsed = time.time() - self.start_time hours = elapsed / 3600 # 预估返佣(90% 的手续费) fee_rate = 0.0006 # taker 0.06% total_fee = self.total_volume * fee_rate rebate = total_fee * 0.9 # 90% 返佣 now = datetime.datetime.now().strftime("%H:%M:%S") stats = ( f"\n{'='*50}\n" f"[{now}] EMA趋势返佣策略统计\n" f"运行时长: {hours:.1f} 小时\n" f"交易次数: {self.trade_count} 次\n" f"交易总额: {self.total_volume:.2f} USDT\n" f"交易盈亏: {self.total_pnl:+.4f} USDT\n" f"预估手续费: {total_fee:.4f} USDT\n" f"预估返佣收入: {rebate:.4f} USDT\n" f"预估净收益: {self.total_pnl + rebate:.4f} USDT\n" f"{'='*50}" ) logger.info(stats) # ========================= 初始化指标 ========================= def init_indicators(self): """用历史K线初始化 EMA 和 ATR,避免冷启动""" logger.info("正在加载历史K线初始化指标...") klines = self.get_1min_klines(count=150) if not klines or len(klines) < self.ema_big_period: logger.warning(f"历史K线不足 {self.ema_big_period} 根,指标将在运行中逐步初始化") return False # 除最后一根(当前未完成的K线)外,全部用于初始化 for kline in klines[:-1]: self.process_new_kline(kline) self.last_kline_time = kline['timestamp'] logger.info( f"指标初始化完成 | {len(klines)-1} 根K线 | " f"EMA8={self.ema_fast.value:.2f} EMA21={self.ema_slow.value:.2f} " f"EMA120={self.ema_big.value:.2f}" ) return True # ========================= 主循环 ========================= def action(self): """主循环""" # 设置杠杆 if not self.set_leverage(): logger.error("杠杆设置失败,退出") self.ding("杠杆设置失败", error=True) return # 启动时获取持仓状态 if not self.get_position_status(): logger.warning("初始持仓状态获取失败,假设无仓位") else: if self.start != 0: self.open_time = time.time() # 如果已有仓位,设置开仓时间为当前 logger.info(f"检测到已有持仓: {'多' if self.start == 1 else '空'}") # 初始化技术指标 self.init_indicators() logger.info( f"EMA趋势返佣策略启动\n" f" 交易对: {self.contract_symbol}\n" f" 杠杆: {self.leverage}x 全仓\n" f" EMA: 快{self.ema_fast_period} / 慢{self.ema_slow_period} / 大{self.ema_big_period}\n" f" ATR过滤: > {self.atr_min_pct*100:.1f}% | 止损: {self.stop_loss_pct*100:.1f}%\n" f" 最低持仓: {self.min_hold_seconds}s | 最长持仓: {self.max_hold_seconds}s\n" f" 仓位比例: {self.risk_percent*100:.1f}%\n" ) self.ding( f"策略启动 | {self.contract_symbol} | {self.leverage}x\n" f"EMA({self.ema_fast_period}/{self.ema_slow_period}/{self.ema_big_period}) " f"ATR>{self.atr_min_pct*100:.1f}%" ) stats_timer = time.time() while True: try: # ① 获取最新 K 线 klines = self.get_1min_klines(count=5) if not klines: logger.warning("K线数据获取失败,等待...") time.sleep(self.check_interval) continue # ② 检查是否有新的完成K线需要处理 latest_completed = klines[-2] if len(klines) >= 2 else None current_bar = klines[-1] current_price = current_bar['close'] new_bar_processed = False if latest_completed and latest_completed['timestamp'] != self.last_kline_time: # 新K线完成,处理信号 cross, atr_pct, fast_val, slow_val, big_val = self.process_new_kline(latest_completed) self.last_kline_time = latest_completed['timestamp'] new_bar_processed = True # ③ 有持仓 → 检查平仓 if self.start != 0: should_close, reason, reverse = self.check_close_signal( current_price, cross, atr_pct, fast_val, slow_val, big_val ) if should_close: self.close_position(reason, current_price) # 反手开仓 if reverse: time.sleep(1) self.open_position(reverse, current_price) time.sleep(1) continue # ④ 无持仓 → 检查开仓 if self.start == 0: signal = self.check_open_signal(current_price, cross, atr_pct, big_val) if signal: self.open_position(signal, current_price) time.sleep(1) continue # ⑤ 未收到新K线时,仅检查止损/硬止损(实时保护) if not new_bar_processed and self.start != 0 and self.open_avg_price: if self.start == 1: loss_pct = (self.open_avg_price - current_price) / self.open_avg_price else: loss_pct = (current_price - self.open_avg_price) / self.open_avg_price # 硬止损实时检查 if loss_pct >= self.hard_stop_pct: self.close_position(f"硬止损 (亏损 {loss_pct*100:.3f}%)", current_price) time.sleep(1) continue # 满足持仓时间后的止损检查 if self.open_time and (time.time() - self.open_time) >= self.min_hold_seconds: if loss_pct >= self.stop_loss_pct: self.close_position(f"止损 (亏损 {loss_pct*100:.3f}%)", current_price) time.sleep(1) continue # 超时检查 hold_sec = time.time() - self.open_time if hold_sec >= self.max_hold_seconds: self.close_position(f"超时平仓 ({hold_sec:.0f}s)", current_price) time.sleep(1) continue # 延迟信号处理 if self.pending_signal and self.open_time and \ (time.time() - self.open_time) >= self.min_hold_seconds: atr_pct = self.calculate_atr_pct(current_price) fast_val = self.ema_fast.value slow_val = self.ema_slow.value big_val = self.ema_big.value should_close, reason, reverse = self.check_close_signal( current_price, None, atr_pct, fast_val, slow_val, big_val ) if should_close: self.close_position(reason, current_price) if reverse: time.sleep(1) self.open_position(reverse, current_price) time.sleep(1) continue # ⑥ 定期打印统计(每 10 分钟) if time.time() - stats_timer >= 600: self.print_daily_stats() stats_timer = time.time() # ⑦ 每轮日志 hold_info = "" if self.start != 0 and self.open_time: hold_seconds = time.time() - self.open_time direction = "多" if self.start == 1 else "空" if self.open_avg_price: if self.start == 1: pnl_pct = (current_price - self.open_avg_price) / self.open_avg_price * 100 else: pnl_pct = (self.open_avg_price - current_price) / self.open_avg_price * 100 hold_info = f" | 持{direction} {hold_seconds:.0f}s PnL:{pnl_pct:+.3f}%" if self.ema_fast.value and self.ema_slow.value and self.ema_big.value: logger.debug( f"价格 {current_price:.2f} | " f"EMA [{self.ema_fast.value:.2f}/{self.ema_slow.value:.2f}/{self.ema_big.value:.2f}] | " f"ATR {self.calculate_atr_pct(current_price)*100:.3f}%{hold_info}" ) time.sleep(self.check_interval) except KeyboardInterrupt: logger.info("用户中断") self.print_daily_stats() # 中断时如果有仓位,提示手动处理 if self.start != 0: logger.warning("当前仍有持仓,请手动处理!") break except Exception as e: logger.error(f"主循环异常: {e}") time.sleep(10) if __name__ == '__main__': BitmartRebateStrategy().action()