""" 布林带均值回归策略 — 实盘交易 BB(10, 2.5) | 5分钟K线 | ETH | 50x杠杆 | 每单权益1% 逻辑: - 价格触及上布林带 → 平多(如有) + 开空 - 价格触及下布林带 → 平空(如有) + 开多 - 始终持仓(多空翻转) 使用 BitMart Futures API 进行开平仓 """ import time import uuid import numpy as np from datetime import datetime from pathlib import Path from loguru import logger from bitmart.api_contract import APIContract # --------------------------------------------------------------------------- # 配置 # --------------------------------------------------------------------------- class BBTradeConfig: # API 凭证 API_KEY = "6104088c65a68d7e53df5d9395b67d78e555293a" SECRET_KEY = "a8b14312330d8e6b9b09acfd972b34e32022fdfa9f2b06f0a0a31723b873fd01" MEMO = "me" # 合约 CONTRACT_SYMBOL = "ETHUSDT" # 布林带参数 BB_PERIOD = 10 # 10根5分钟K线 = 50分钟回看 BB_STD = 2.5 # 标准差倍数 # 仓位管理 LEVERAGE = 50 # 杠杆倍数 OPEN_TYPE = "cross" # 全仓模式 MARGIN_PCT = 0.01 # 每单用权益的1%作为保证金 # 风控 MAX_DAILY_LOSS = 50.0 # 日最大亏损(U),达到后停止交易 COOLDOWN_SECONDS = 30 # 两次交易之间最小间隔(秒) # 运行 POLL_INTERVAL = 5 # 主循环轮询间隔(秒) KLINE_STEP = 5 # K线周期(分钟) KLINE_HOURS = 2 # 获取最近多少小时K线(需覆盖BB_PERIOD) # --------------------------------------------------------------------------- # 布林带计算 # --------------------------------------------------------------------------- def calc_bollinger(closes: list, period: int, n_std: float): """计算布林带,返回 (mid, upper, lower) 或 None(数据不足时)""" if len(closes) < period: return None arr = np.array(closes[-period:], dtype=float) mid = arr.mean() std = arr.std(ddof=0) upper = mid + n_std * std lower = mid - n_std * std return mid, upper, lower # --------------------------------------------------------------------------- # 交易主类 # --------------------------------------------------------------------------- class BBTrader: def __init__(self, cfg: BBTradeConfig = None): self.cfg = cfg or BBTradeConfig() self.api = APIContract( self.cfg.API_KEY, self.cfg.SECRET_KEY, self.cfg.MEMO, timeout=(5, 15) ) # 持仓状态: -1=空, 0=无, 1=多 self.position = 0 self.open_avg_price = None self.current_amount = None # 风控 self.daily_pnl = 0.0 self.daily_stopped = False self.current_date = None self.last_trade_time = 0.0 # 日志 self.log_dir = Path(__file__).resolve().parent logger.add( self.log_dir / "bb_trade_{time:YYYY-MM-DD}.log", rotation="1 day", retention="30 days", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}" ) # ------------------------------------------------------------------ # API 封装 # ------------------------------------------------------------------ def get_klines(self) -> list | None: """获取最近N小时的5分钟K线,返回 [{id, open, high, low, close}, ...]""" try: end_time = int(time.time()) start_time = end_time - 3600 * self.cfg.KLINE_HOURS resp = self.api.get_kline( contract_symbol=self.cfg.CONTRACT_SYMBOL, step=self.cfg.KLINE_STEP, start_time=start_time, end_time=end_time )[0] if resp.get("code") != 1000: logger.error(f"获取K线失败: {resp}") return None data = resp["data"] klines = [] for k in data: klines.append({ "id": int(k["timestamp"]), "open": float(k["open_price"]), "high": float(k["high_price"]), "low": float(k["low_price"]), "close": float(k["close_price"]), }) klines.sort(key=lambda x: x["id"]) return klines except Exception as e: logger.error(f"获取K线异常: {e}") return None def get_current_price(self) -> float | None: """获取当前最新价格(最近1分钟K线收盘价)""" try: end_time = int(time.time()) resp = self.api.get_kline( contract_symbol=self.cfg.CONTRACT_SYMBOL, step=1, start_time=end_time - 300, end_time=end_time )[0] if resp.get("code") == 1000 and resp["data"]: return float(resp["data"][-1]["close_price"]) return None except Exception as e: logger.error(f"获取价格异常: {e}") return None def get_balance(self) -> float | None: """获取合约账户可用余额""" try: resp = self.api.get_assets_detail()[0] if resp.get("code") == 1000: data = resp["data"] if isinstance(data, dict): return float(data.get("available_balance", 0)) elif isinstance(data, list): for asset in data: if asset.get("currency") == "USDT": return float(asset.get("available_balance", 0)) return None except Exception as e: logger.error(f"查询余额异常: {e}") return None def get_position_status(self) -> bool: """查询当前持仓,更新 self.position / open_avg_price / current_amount""" try: resp = self.api.get_position(contract_symbol=self.cfg.CONTRACT_SYMBOL)[0] if resp.get("code") != 1000: logger.error(f"查询持仓失败: {resp}") return False positions = resp["data"] if not positions: self.position = 0 self.open_avg_price = None self.current_amount = None return True pos = positions[0] self.position = 1 if pos["position_type"] == 1 else -1 self.open_avg_price = float(pos["open_avg_price"]) self.current_amount = float(pos["current_amount"]) unrealized = float(pos.get("unrealized_value", 0)) logger.debug(f"持仓: dir={self.position} price={self.open_avg_price} " f"amt={self.current_amount} upnl={unrealized:.2f}") return True except Exception as e: logger.error(f"查询持仓异常: {e}") return False def set_leverage(self) -> bool: """设置杠杆和全仓模式""" try: resp = self.api.post_submit_leverage( contract_symbol=self.cfg.CONTRACT_SYMBOL, leverage=str(self.cfg.LEVERAGE), open_type=self.cfg.OPEN_TYPE )[0] if resp.get("code") == 1000: logger.success(f"杠杆设置成功: {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}") return True else: logger.error(f"杠杆设置失败: {resp}") return False except Exception as e: logger.error(f"设置杠杆异常: {e}") return False def _gen_client_order_id(self) -> str: return f"BB_{uuid.uuid4().hex[:12]}" def submit_order(self, side: int, size: int) -> bool: """ 提交市价单 side: 1=买入开多, 2=买入平空, 3=卖出平多, 4=卖出开空 size: 张数 """ side_names = {1: "买入开多", 2: "买入平空", 3: "卖出平多", 4: "卖出开空"} logger.info(f"下单: {side_names.get(side, side)} {size}张") try: resp = self.api.post_submit_order( contract_symbol=self.cfg.CONTRACT_SYMBOL, client_order_id=self._gen_client_order_id(), side=side, mode=1, # GTC type="market", leverage=str(self.cfg.LEVERAGE), open_type=self.cfg.OPEN_TYPE, size=size, )[0] if resp.get("code") == 1000: logger.success(f"下单成功: {side_names.get(side)} {size}张 resp={resp}") return True else: logger.error(f"下单失败: {resp}") return False except Exception as e: logger.error(f"下单异常: {e}") return False # ------------------------------------------------------------------ # 仓位操作 # ------------------------------------------------------------------ def calc_order_size(self, price: float) -> int: """ 根据当前权益的1%计算开仓张数 BitMart ETH合约: 1张 = 0.01 ETH 保证金 = equity * margin_pct 名义价值 = margin * leverage 数量(ETH) = 名义价值 / price 张数 = 数量 / 0.01 """ balance = self.get_balance() if balance is None or balance <= 0: logger.warning(f"余额不足或查询失败: {balance}") return 0 margin = balance * self.cfg.MARGIN_PCT notional = margin * self.cfg.LEVERAGE qty_eth = notional / price size = max(1, int(qty_eth / 0.01)) # 1张=0.01ETH logger.info(f"仓位计算: 余额={balance:.2f} 保证金={margin:.2f} " f"名义={notional:.2f} 数量={qty_eth:.4f}ETH = {size}张") return size def close_current_position(self) -> bool: """平掉当前持仓""" if not self.get_position_status(): return False if self.position == 0: logger.info("无持仓,无需平仓") return True if self.position == 1: # 平多: side=3 size = int(self.current_amount) return self.submit_order(side=3, size=size) else: # 平空: side=2 size = int(self.current_amount) return self.submit_order(side=2, size=size) def open_long(self, price: float) -> bool: """开多""" size = self.calc_order_size(price) if size <= 0: return False return self.submit_order(side=1, size=size) def open_short(self, price: float) -> bool: """开空""" size = self.calc_order_size(price) if size <= 0: return False return self.submit_order(side=4, size=size) def flip_to_long(self, price: float) -> bool: """平空 → 开多""" logger.info("=== 翻转为多 ===") if self.position == -1: if not self.close_current_position(): logger.error("平空失败,放弃开多") return False time.sleep(2) # 确认已无仓 for _ in range(5): if self.get_position_status() and self.position == 0: break time.sleep(1) if self.position != 0: logger.warning(f"平仓后仍有持仓({self.position}),放弃开多") return False return self.open_long(price) def flip_to_short(self, price: float) -> bool: """平多 → 开空""" logger.info("=== 翻转为空 ===") if self.position == 1: if not self.close_current_position(): logger.error("平多失败,放弃开空") return False time.sleep(2) for _ in range(5): if self.get_position_status() and self.position == 0: break time.sleep(1) if self.position != 0: logger.warning(f"平仓后仍有持仓({self.position}),放弃开空") return False return self.open_short(price) # ------------------------------------------------------------------ # 风控 # ------------------------------------------------------------------ def check_daily_reset(self): """每日重置(UTC+8 00:00 = UTC 16:00)""" now = datetime.utcnow() # 用UTC日期做简单日切 today = now.date() if self.current_date != today: if self.current_date is not None: logger.info(f"日切: {self.current_date} → {today}, 日PnL={self.daily_pnl:.2f}") self.current_date = today self.daily_pnl = 0.0 self.daily_stopped = False def can_trade(self) -> bool: """检查是否可交易""" if self.daily_stopped: return False now = time.time() if now - self.last_trade_time < self.cfg.COOLDOWN_SECONDS: remain = self.cfg.COOLDOWN_SECONDS - (now - self.last_trade_time) logger.debug(f"交易冷却中,剩余 {remain:.0f}s") return False return True # ------------------------------------------------------------------ # 日志 # ------------------------------------------------------------------ def write_trade_log(self, action: str, price: float, bb_upper: float, bb_mid: float, bb_lower: float, reason: str): """写入交易日志文件""" try: date_str = datetime.now().strftime("%Y%m%d") log_file = self.log_dir / f"bb_trade_log_{date_str}.txt" time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") block = ( f"\n{'='*60}\n" f"时间: {time_str}\n" f"操作: {action}\n" f"价格: {price:.2f}\n" f"BB上轨: {bb_upper:.2f} | 中轨: {bb_mid:.2f} | 下轨: {bb_lower:.2f}\n" f"原因: {reason}\n" f"{'='*60}\n" ) with open(log_file, "a", encoding="utf-8") as f: f.write(block) except Exception as e: logger.warning(f"写入日志失败: {e}") # ------------------------------------------------------------------ # 主循环 # ------------------------------------------------------------------ def run(self): """策略主循环""" logger.info("=" * 60) logger.info(f" BB策略启动: BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})") logger.info(f" 合约: {self.cfg.CONTRACT_SYMBOL} | {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}") logger.info(f" 每单: 权益×{self.cfg.MARGIN_PCT:.0%}") logger.info("=" * 60) # 设置杠杆 if not self.set_leverage(): logger.error("杠杆设置失败,退出") return # 初始持仓同步 if not self.get_position_status(): logger.error("初始持仓查询失败,退出") return logger.info(f"初始持仓状态: {self.position}") last_kline_id = None # 避免同一根K线重复触发 while True: try: self.check_daily_reset() if self.daily_stopped: logger.info(f"日亏损已达限制({self.daily_pnl:.2f}),等待日切") time.sleep(60) continue # 1. 获取K线 klines = self.get_klines() if not klines or len(klines) < self.cfg.BB_PERIOD: logger.warning(f"K线数据不足({len(klines) if klines else 0}根),等待...") time.sleep(self.cfg.POLL_INTERVAL) continue # 当前K线 = 最后一根(未收盘),信号用已收盘的K线 # 使用倒数第二根及之前的收盘价算BB(已收盘的K线) closed_klines = klines[:-1] # 已收盘的K线 current_kline = klines[-1] # 当前未收盘K线 if len(closed_klines) < self.cfg.BB_PERIOD: time.sleep(self.cfg.POLL_INTERVAL) continue # 2. 计算布林带 closes = [k["close"] for k in closed_klines] bb = calc_bollinger(closes, self.cfg.BB_PERIOD, self.cfg.BB_STD) if bb is None: time.sleep(self.cfg.POLL_INTERVAL) continue bb_mid, bb_upper, bb_lower = bb # 3. 获取当前价格 current_price = self.get_current_price() if current_price is None: time.sleep(self.cfg.POLL_INTERVAL) continue # 用当前K线的 high/low 判断是否触及布林带 cur_high = current_kline["high"] cur_low = current_kline["low"] touched_upper = cur_high >= bb_upper touched_lower = cur_low <= bb_lower logger.info( f"价格={current_price:.2f} | " f"BB: {bb_lower:.2f} / {bb_mid:.2f} / {bb_upper:.2f} | " f"H={cur_high:.2f} L={cur_low:.2f} | " f"触上={touched_upper} 触下={touched_lower} | " f"仓位={self.position}" ) # 4. 同步持仓状态 if not self.get_position_status(): time.sleep(self.cfg.POLL_INTERVAL) continue # 5. 信号判断 + 执行 # 同一根K线只触发一次 kline_id = current_kline["id"] if kline_id == last_kline_id: # 已在这根K线触发过,不重复操作 time.sleep(self.cfg.POLL_INTERVAL) continue # 同时触及上下轨(极端波动)→ 跳过 if touched_upper and touched_lower: logger.warning("同时触及上下轨,跳过") time.sleep(self.cfg.POLL_INTERVAL) continue action = None reason = "" if touched_upper: # 触及上轨 → 开空 / 翻转为空 if not self.can_trade(): time.sleep(self.cfg.POLL_INTERVAL) continue reason = (f"价格最高{cur_high:.2f}触及上轨{bb_upper:.2f}," f"BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})") if self.position == 1: action = "翻转: 平多→开空" success = self.flip_to_short(current_price) elif self.position == 0: action = "开空" success = self.open_short(current_price) else: # 已经是空仓,不操作 logger.info("已持空仓,触上轨无需操作") success = False if success: last_kline_id = kline_id self.last_trade_time = time.time() self.write_trade_log(action, current_price, bb_upper, bb_mid, bb_lower, reason) logger.success(f"{action} 执行成功") elif touched_lower: # 触及下轨 → 开多 / 翻转为多 if not self.can_trade(): time.sleep(self.cfg.POLL_INTERVAL) continue reason = (f"价格最低{cur_low:.2f}触及下轨{bb_lower:.2f}," f"BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})") if self.position == -1: action = "翻转: 平空→开多" success = self.flip_to_long(current_price) elif self.position == 0: action = "开多" success = self.open_long(current_price) else: logger.info("已持多仓,触下轨无需操作") success = False if success: last_kline_id = kline_id self.last_trade_time = time.time() self.write_trade_log(action, current_price, bb_upper, bb_mid, bb_lower, reason) logger.success(f"{action} 执行成功") time.sleep(self.cfg.POLL_INTERVAL) except KeyboardInterrupt: logger.info("用户中断,程序退出") break except Exception as e: logger.error(f"主循环异常: {e}") time.sleep(10) # --------------------------------------------------------------------------- # 入口 # --------------------------------------------------------------------------- if __name__ == "__main__": trader = BBTrader() trader.run()