""" 布林带均值回归策略 — 实盘交易 (D方案: 递增加仓) BB(10, 2.5) | 5分钟K线 | ETH | 50x杠杆 | 递增加仓+1%/次 max=3 逻辑: - 价格触及上布林带 → 平多(如有) + 开空; 已持空则加仓 - 价格触及下布林带 → 平空(如有) + 开多; 已持多则加仓 - 始终持仓(多空翻转 + 同向加仓) - 加仓比例: 开仓1%, 第1次加仓2%, 第2次3%, 第3次4%, 最多加仓3次 使用浏览器自动化进行开平仓(有手续费返佣),API仅用于查询数据 """ import time import numpy as np from datetime import datetime, timezone from pathlib import Path from loguru import logger from bitmart.api_contract import APIContract from bit_tools import openBrowser from DrissionPage import ChromiumPage, ChromiumOptions # --------------------------------------------------------------------------- # 配置 # --------------------------------------------------------------------------- class BBTradeConfig: # API 凭证 API_KEY = "6104088c65a68d7e53df5d9395b67d78e555293a" SECRET_KEY = "a8b14312330d8e6b9b09acfd972b34e32022fdfa9f2b06f0a0a31723b873fd01" MEMO = "me" # 合约 CONTRACT_SYMBOL = "ETHUSDT" TRADE_URL = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT" # 浏览器 BIT_ID = "62f9107d0c674925972084e282df55b3" # 布林带参数 BB_PERIOD = 10 # 10根5分钟K线 = 50分钟回看 BB_STD = 2.5 # 标准差倍数 # 仓位管理 LEVERAGE = 50 # 杠杆倍数 OPEN_TYPE = "cross" # 全仓模式 MARGIN_PCT = 0.01 # 首次开仓用权益的1%作为保证金 # 递增加仓 (D方案) PYRAMID_STEP = 0.01 # 每次加仓增加1%权益比例 (1%→2%→3%→4%) PYRAMID_MAX = 3 # 最多加仓3次 (首次开仓不算) # 风控 MAX_DAILY_LOSS = 50.0 # 日最大亏损(U),达到后停止交易 COOLDOWN_SECONDS = 30 # 两次交易之间最小间隔(秒) # 运行 POLL_INTERVAL = 5 # 主循环轮询间隔(秒) KLINE_STEP = 5 # K线周期(分钟) KLINE_HOURS = 2 # 获取最近多少小时K线(需覆盖BB_PERIOD) # --------------------------------------------------------------------------- # 布林带计算 # --------------------------------------------------------------------------- def calc_bollinger(closes: list, period: int, n_std: float): """计算布林带,返回 (mid, upper, lower) 或 None(数据不足时)""" if len(closes) < period: return None arr = np.array(closes[-period:], dtype=float) mid = arr.mean() std = arr.std(ddof=0) upper = mid + n_std * std lower = mid - n_std * std return mid, upper, lower # --------------------------------------------------------------------------- # 交易主类 # --------------------------------------------------------------------------- class BBTrader: def __init__(self, cfg: BBTradeConfig = None): self.cfg = cfg or BBTradeConfig() self.api = APIContract( self.cfg.API_KEY, self.cfg.SECRET_KEY, self.cfg.MEMO, timeout=(5, 15) ) # 浏览器 self.page: ChromiumPage | None = None self.page_start = True # 需要(重新)打开浏览器 self.last_page_open_time = 0.0 # 上次打开浏览器的时间 self.PAGE_REFRESH_INTERVAL = 180 # 每3分钟关闭重开浏览器 # 持仓状态: -1=空, 0=无, 1=多 self.position = 0 self.open_avg_price = None self.current_amount = None # 加仓状态 self.pyramid_count = 0 # 当前已加仓次数 (0=仅首次开仓) # 风控 self.daily_pnl = 0.0 self.daily_stopped = False self.current_date = None self.last_trade_time = 0.0 # 日志 self.log_dir = Path(__file__).resolve().parent logger.add( self.log_dir / "bb_trade_{time:YYYY-MM-DD}.log", rotation="1 day", retention="30 days", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}" ) # ------------------------------------------------------------------ # API 封装 # ------------------------------------------------------------------ def get_klines(self) -> list | None: """获取最近N小时的5分钟K线,返回 [{id, open, high, low, close}, ...]""" try: end_time = int(time.time()) start_time = end_time - 3600 * self.cfg.KLINE_HOURS resp = self.api.get_kline( contract_symbol=self.cfg.CONTRACT_SYMBOL, step=self.cfg.KLINE_STEP, start_time=start_time, end_time=end_time )[0] if resp.get("code") != 1000: logger.error(f"获取K线失败: {resp}") return None data = resp["data"] klines = [] for k in data: klines.append({ "id": int(k["timestamp"]), "open": float(k["open_price"]), "high": float(k["high_price"]), "low": float(k["low_price"]), "close": float(k["close_price"]), }) klines.sort(key=lambda x: x["id"]) return klines except Exception as e: logger.error(f"获取K线异常: {e}") return None def get_current_price(self) -> float | None: """获取当前最新价格(最近1分钟K线收盘价)""" try: end_time = int(time.time()) resp = self.api.get_kline( contract_symbol=self.cfg.CONTRACT_SYMBOL, step=1, start_time=end_time - 300, end_time=end_time )[0] if resp.get("code") == 1000 and resp["data"]: return float(resp["data"][-1]["close_price"]) return None except Exception as e: logger.error(f"获取价格异常: {e}") return None def get_balance(self) -> float | None: """获取合约账户可用余额""" try: resp = self.api.get_assets_detail()[0] if resp.get("code") == 1000: data = resp["data"] if isinstance(data, dict): return float(data.get("available_balance", 0)) elif isinstance(data, list): for asset in data: if asset.get("currency") == "USDT": return float(asset.get("available_balance", 0)) return None except Exception as e: logger.error(f"查询余额异常: {e}") return None def get_position_status(self) -> bool: """查询当前持仓,更新 self.position / open_avg_price / current_amount""" try: resp = self.api.get_position(contract_symbol=self.cfg.CONTRACT_SYMBOL)[0] if resp.get("code") != 1000: logger.error(f"查询持仓失败: {resp}") return False positions = resp["data"] if not positions: self.position = 0 self.open_avg_price = None self.current_amount = None return True pos = positions[0] self.position = 1 if pos["position_type"] == 1 else -1 self.open_avg_price = float(pos["open_avg_price"]) self.current_amount = float(pos["current_amount"]) unrealized = float(pos.get("unrealized_value", 0)) logger.debug(f"持仓: dir={self.position} price={self.open_avg_price} " f"amt={self.current_amount} upnl={unrealized:.2f}") return True except Exception as e: logger.error(f"查询持仓异常: {e}") return False def set_leverage(self) -> bool: """设置杠杆和全仓模式""" try: resp = self.api.post_submit_leverage( contract_symbol=self.cfg.CONTRACT_SYMBOL, leverage=str(self.cfg.LEVERAGE), open_type=self.cfg.OPEN_TYPE )[0] if resp.get("code") == 1000: logger.success(f"杠杆设置成功: {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}") return True else: logger.error(f"杠杆设置失败: {resp}") return False except Exception as e: logger.error(f"设置杠杆异常: {e}") return False # ------------------------------------------------------------------ # 浏览器自动化 # ------------------------------------------------------------------ def open_browser(self) -> bool: """打开浏览器并进入交易页面""" try: bit_port = openBrowser(id=self.cfg.BIT_ID) co = ChromiumOptions() co.set_local_port(port=bit_port) self.page = ChromiumPage(addr_or_opts=co) self.last_page_open_time = time.time() return True except Exception as e: logger.error(f"打开浏览器失败: {e}") return False def click_safe(self, xpath, sleep=0.5) -> bool: """安全点击元素""" try: ele = self.page.ele(xpath) if not ele: return False ele.click(by_js=True) time.sleep(sleep) return True except Exception as e: logger.warning(f"点击失败 [{xpath}]: {e}") return False def browser_close_position(self) -> bool: """浏览器点击市价全平""" logger.info("浏览器操作: 市价平仓") return self.click_safe('x://span[normalize-space(text()) ="市价"]') # ------------------------------------------------------------------ # 仓位操作 # ------------------------------------------------------------------ def calc_order_usdt(self, is_add: bool = False) -> float: """ 计算开仓/加仓金额(U) 首次开仓: 余额 × MARGIN_PCT (1%) 加仓: 余额 × (MARGIN_PCT + PYRAMID_STEP × (pyramid_count+1)) 例: 开仓1%, 第1次加仓2%, 第2次加仓3%, 第3次加仓4% """ balance = self.get_balance() if balance is None or balance <= 0: logger.warning(f"余额不足或查询失败: {balance}") return 0 if is_add: pct = self.cfg.MARGIN_PCT + self.cfg.PYRAMID_STEP * (self.pyramid_count + 1) else: pct = self.cfg.MARGIN_PCT order_usdt = round(balance * pct, 2) logger.info(f"仓位计算: 余额={balance:.2f} × {pct:.0%} = {order_usdt} U" f" ({'加仓#' + str(self.pyramid_count+1) if is_add else '首次开仓'})") return order_usdt def verify_position(self, expected: int) -> bool: """验证持仓方向""" if self.get_position_status(): if self.position == expected: return True logger.warning(f"持仓方向不符: 期望{expected}, 实际{self.position}") return False # ------------------------------------------------------------------ # 风控 # ------------------------------------------------------------------ def check_daily_reset(self): """每日重置(UTC+8 00:00 = UTC 16:00)""" now = datetime.now(timezone.utc) # 用UTC日期做简单日切 today = now.date() if self.current_date != today: if self.current_date is not None: logger.info(f"日切: {self.current_date} → {today}, 日PnL={self.daily_pnl:.2f}") self.current_date = today self.daily_pnl = 0.0 self.daily_stopped = False def can_trade(self) -> bool: """检查是否可交易""" if self.daily_stopped: return False now = time.time() if now - self.last_trade_time < self.cfg.COOLDOWN_SECONDS: remain = self.cfg.COOLDOWN_SECONDS - (now - self.last_trade_time) logger.debug(f"交易冷却中,剩余 {remain:.0f}s") return False return True # ------------------------------------------------------------------ # 日志 # ------------------------------------------------------------------ def write_trade_log(self, action: str, price: float, bb_upper: float, bb_mid: float, bb_lower: float, reason: str): """写入交易日志文件""" try: date_str = datetime.now().strftime("%Y%m%d") log_file = self.log_dir / f"bb_trade_log_{date_str}.txt" time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") block = ( f"\n{'='*60}\n" f"时间: {time_str}\n" f"操作: {action}\n" f"价格: {price:.2f}\n" f"BB上轨: {bb_upper:.2f} | 中轨: {bb_mid:.2f} | 下轨: {bb_lower:.2f}\n" f"原因: {reason}\n" f"{'='*60}\n" ) with open(log_file, "a", encoding="utf-8") as f: f.write(block) except Exception as e: logger.warning(f"写入日志失败: {e}") # ------------------------------------------------------------------ # 主循环(浏览器流程与四分之一代码一致) # ------------------------------------------------------------------ def run(self): """策略主循环""" logger.info("=" * 60) logger.info(f" BB策略启动(D方案): BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})") logger.info(f" 合约: {self.cfg.CONTRACT_SYMBOL} | {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}") logger.info(f" 首次开仓: 权益×{self.cfg.MARGIN_PCT:.0%} | 递增加仓: +{self.cfg.PYRAMID_STEP:.0%}/次 | 最多{self.cfg.PYRAMID_MAX}次") logger.info("=" * 60) # 设置杠杆 if not self.set_leverage(): logger.error("杠杆设置失败,退出") return # 初始持仓同步 if not self.get_position_status(): logger.error("初始持仓查询失败,退出") return logger.info(f"初始持仓状态: {self.position}") last_kline_id = None # 避免同一根K线重复触发 page_start = True # 需要打开浏览器 while True: # ===== 浏览器管理 ===== # page_start时: 打开浏览器 → 导航 → 点市价 → 输入张数 if page_start: for i in range(5): if self.open_browser(): logger.info("浏览器打开成功") break else: logger.error("打开浏览器失败!") return self.page.get(self.cfg.TRADE_URL) time.sleep(2) # 点击市价模式 self.click_safe('x://button[normalize-space(text()) ="市价"]') time.sleep(0.5) # 计算并预输入开仓金额(U) current_price = self.get_current_price() if current_price: order_usdt = self.calc_order_usdt() if order_usdt > 0: self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True) logger.info(f"预输入开仓金额: {order_usdt} U") page_start = False try: # 每3分钟关闭浏览器重新打开 if time.time() - self.last_page_open_time >= self.PAGE_REFRESH_INTERVAL: logger.info("浏览器已打开超过3分钟,关闭刷新") try: self.page.close() except Exception: pass self.page = None page_start = True time.sleep(3) continue self.check_daily_reset() if self.daily_stopped: logger.info(f"日亏损已达限制({self.daily_pnl:.2f}),等待日切") time.sleep(60) continue # 1. 获取K线 klines = self.get_klines() if not klines or len(klines) < self.cfg.BB_PERIOD: logger.warning(f"K线数据不足({len(klines) if klines else 0}根),等待...") time.sleep(self.cfg.POLL_INTERVAL) continue closed_klines = klines[:-1] current_kline = klines[-1] if len(closed_klines) < self.cfg.BB_PERIOD: time.sleep(self.cfg.POLL_INTERVAL) continue # 2. 计算布林带 closes = [k["close"] for k in closed_klines] bb = calc_bollinger(closes, self.cfg.BB_PERIOD, self.cfg.BB_STD) if bb is None: time.sleep(self.cfg.POLL_INTERVAL) continue bb_mid, bb_upper, bb_lower = bb # 3. 获取当前价格 current_price = self.get_current_price() if current_price is None: time.sleep(self.cfg.POLL_INTERVAL) continue cur_high = current_kline["high"] cur_low = current_kline["low"] # 容错: K线high/low + 当前实时价格,任一触及即算触碰 touched_upper = cur_high >= bb_upper or current_price >= bb_upper touched_lower = cur_low <= bb_lower or current_price <= bb_lower logger.info( f"价格={current_price:.2f} | " f"BB: {bb_lower:.2f} / {bb_mid:.2f} / {bb_upper:.2f} | " f"H={cur_high:.2f} L={cur_low:.2f} | " f"触上={touched_upper} 触下={touched_lower} | " f"仓位={self.position}" ) # 4. 同步持仓状态 if not self.get_position_status(): time.sleep(self.cfg.POLL_INTERVAL) continue # 5. 信号判断 kline_id = current_kline["id"] if kline_id == last_kline_id: time.sleep(self.cfg.POLL_INTERVAL) continue if touched_upper and touched_lower: logger.warning("同时触及上下轨,跳过") time.sleep(self.cfg.POLL_INTERVAL) continue action = None reason = "" success = False # ===== 触及上轨 → 开空 / 翻转为空 / 加仓空 ===== if touched_upper: if not self.can_trade(): time.sleep(self.cfg.POLL_INTERVAL) continue reason = (f"价格最高{cur_high:.2f}触及上轨{bb_upper:.2f}," f"BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})") if self.position == 1: action = "翻转: 平多→开空" # 在当前页面点市价平仓 self.browser_close_position() time.sleep(1) # 等待确认平仓 for _ in range(10): if self.get_position_status() and self.position == 0: break time.sleep(1) if self.position != 0: logger.warning(f"平仓后仍有持仓({self.position}),放弃开空") time.sleep(self.cfg.POLL_INTERVAL) continue # 翻转时重置加仓计数 self.pyramid_count = 0 # 平仓后在同一页面直接点卖出/做空 logger.info("平仓完成,直接开空") self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') time.sleep(3) if self.verify_position(-1): success = True elif self.position == 0: action = "开空" self.pyramid_count = 0 self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') time.sleep(3) if self.verify_position(-1): success = True elif self.position == -1 and self.pyramid_count < self.cfg.PYRAMID_MAX: # 已持空仓 + 再次触上轨 → 加仓做空 action = f"加仓空#{self.pyramid_count+1}" reason += f" (加仓#{self.pyramid_count+1}/{self.cfg.PYRAMID_MAX})" # 重新计算加仓金额并输入 add_usdt = self.calc_order_usdt(is_add=True) if add_usdt > 0: self.page.ele('x://*[@id="size_0"]').input(vals=add_usdt, clear=True) time.sleep(0.5) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') time.sleep(3) if self.verify_position(-1): self.pyramid_count += 1 success = True else: logger.info(f"已持空仓,加仓已达上限({self.pyramid_count}/{self.cfg.PYRAMID_MAX})") # ===== 触及下轨 → 开多 / 翻转为多 / 加仓多 ===== elif touched_lower: if not self.can_trade(): time.sleep(self.cfg.POLL_INTERVAL) continue reason = (f"价格最低{cur_low:.2f}触及下轨{bb_lower:.2f}," f"BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})") if self.position == -1: action = "翻转: 平空→开多" self.browser_close_position() time.sleep(1) for _ in range(10): if self.get_position_status() and self.position == 0: break time.sleep(1) if self.position != 0: logger.warning(f"平仓后仍有持仓({self.position}),放弃开多") time.sleep(self.cfg.POLL_INTERVAL) continue # 翻转时重置加仓计数 self.pyramid_count = 0 logger.info("平仓完成,直接开多") self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') time.sleep(3) if self.verify_position(1): success = True elif self.position == 0: action = "开多" self.pyramid_count = 0 self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') time.sleep(3) if self.verify_position(1): success = True elif self.position == 1 and self.pyramid_count < self.cfg.PYRAMID_MAX: # 已持多仓 + 再次触下轨 → 加仓做多 action = f"加仓多#{self.pyramid_count+1}" reason += f" (加仓#{self.pyramid_count+1}/{self.cfg.PYRAMID_MAX})" # 重新计算加仓金额并输入 add_usdt = self.calc_order_usdt(is_add=True) if add_usdt > 0: self.page.ele('x://*[@id="size_0"]').input(vals=add_usdt, clear=True) time.sleep(0.5) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') time.sleep(3) if self.verify_position(1): self.pyramid_count += 1 success = True else: logger.info(f"已持多仓,加仓已达上限({self.pyramid_count}/{self.cfg.PYRAMID_MAX})") # ===== 交易成功后处理 ===== if success and action: last_kline_id = kline_id self.last_trade_time = time.time() self.write_trade_log(action, current_price, bb_upper, bb_mid, bb_lower, reason) logger.success(f"{action} 执行成功") # 交易完成后关闭浏览器,下轮重新打开 page_start = True try: self.page.close() except Exception: pass self.page = None time.sleep(5) time.sleep(self.cfg.POLL_INTERVAL) except KeyboardInterrupt: logger.info("用户中断,程序退出") break except Exception as e: logger.error(f"主循环异常: {e}") page_start = True time.sleep(10) # --------------------------------------------------------------------------- # 入口 # --------------------------------------------------------------------------- if __name__ == "__main__": trader = BBTrader() trader.run()