Files
codex_jxs_code/bb_trade.py
2026-02-25 06:21:49 +08:00

618 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
布林带均值回归策略 — 实盘交易 (D方案: 递增加仓)
BB(10, 2.5) | 5分钟K线 | ETH | 50x杠杆 | 递增加仓+1%/次 max=3
逻辑:
- 价格触及上布林带 → 平多(如有) + 开空; 已持空则加仓
- 价格触及下布林带 → 平空(如有) + 开多; 已持多则加仓
- 始终持仓(多空翻转 + 同向加仓)
- 加仓比例: 开仓1%, 第1次加仓2%, 第2次3%, 第3次4%, 最多加仓3次
使用浏览器自动化进行开平仓有手续费返佣API仅用于查询数据
"""
import time
import numpy as np
from datetime import datetime, timezone
from pathlib import Path
from loguru import logger
from bitmart.api_contract import APIContract
from bit_tools import openBrowser
from DrissionPage import ChromiumPage, ChromiumOptions
# ---------------------------------------------------------------------------
# 配置
# ---------------------------------------------------------------------------
class BBTradeConfig:
# API 凭证
API_KEY = "6104088c65a68d7e53df5d9395b67d78e555293a"
SECRET_KEY = "a8b14312330d8e6b9b09acfd972b34e32022fdfa9f2b06f0a0a31723b873fd01"
MEMO = "me"
# 合约
CONTRACT_SYMBOL = "ETHUSDT"
TRADE_URL = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"
# 浏览器
BIT_ID = "62f9107d0c674925972084e282df55b3"
# 布林带参数
BB_PERIOD = 10 # 10根5分钟K线 = 50分钟回看
BB_STD = 2.5 # 标准差倍数
# 仓位管理
LEVERAGE = 50 # 杠杆倍数
OPEN_TYPE = "cross" # 全仓模式
MARGIN_PCT = 0.01 # 首次开仓用权益的1%作为保证金
# 递增加仓 (D方案)
PYRAMID_STEP = 0.01 # 每次加仓增加1%权益比例 (1%→2%→3%→4%)
PYRAMID_MAX = 3 # 最多加仓3次 (首次开仓不算)
# 风控
MAX_DAILY_LOSS = 50.0 # 日最大亏损(U),达到后停止交易
COOLDOWN_SECONDS = 30 # 两次交易之间最小间隔(秒)
# 运行
POLL_INTERVAL = 5 # 主循环轮询间隔(秒)
KLINE_STEP = 5 # K线周期(分钟)
KLINE_HOURS = 2 # 获取最近多少小时K线(需覆盖BB_PERIOD)
# ---------------------------------------------------------------------------
# 布林带计算
# ---------------------------------------------------------------------------
def calc_bollinger(closes: list, period: int, n_std: float):
"""计算布林带,返回 (mid, upper, lower) 或 None数据不足时"""
if len(closes) < period:
return None
arr = np.array(closes[-period:], dtype=float)
mid = arr.mean()
std = arr.std(ddof=0)
upper = mid + n_std * std
lower = mid - n_std * std
return mid, upper, lower
# ---------------------------------------------------------------------------
# 交易主类
# ---------------------------------------------------------------------------
class BBTrader:
def __init__(self, cfg: BBTradeConfig = None):
self.cfg = cfg or BBTradeConfig()
self.api = APIContract(
self.cfg.API_KEY, self.cfg.SECRET_KEY, self.cfg.MEMO,
timeout=(5, 15)
)
# 浏览器
self.page: ChromiumPage | None = None
self.page_start = True # 需要(重新)打开浏览器
self.last_page_open_time = 0.0 # 上次打开浏览器的时间
self.PAGE_REFRESH_INTERVAL = 180 # 每3分钟关闭重开浏览器
# 持仓状态: -1=空, 0=无, 1=多
self.position = 0
self.open_avg_price = None
self.current_amount = None
# 加仓状态
self.pyramid_count = 0 # 当前已加仓次数 (0=仅首次开仓)
# 风控
self.daily_pnl = 0.0
self.daily_stopped = False
self.current_date = None
self.last_trade_time = 0.0
# 日志
self.log_dir = Path(__file__).resolve().parent
logger.add(
self.log_dir / "bb_trade_{time:YYYY-MM-DD}.log",
rotation="1 day", retention="30 days",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
# ------------------------------------------------------------------
# API 封装
# ------------------------------------------------------------------
def get_klines(self) -> list | None:
"""获取最近N小时的5分钟K线返回 [{id, open, high, low, close}, ...]"""
try:
end_time = int(time.time())
start_time = end_time - 3600 * self.cfg.KLINE_HOURS
resp = self.api.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=self.cfg.KLINE_STEP,
start_time=start_time,
end_time=end_time
)[0]
if resp.get("code") != 1000:
logger.error(f"获取K线失败: {resp}")
return None
data = resp["data"]
klines = []
for k in data:
klines.append({
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
})
klines.sort(key=lambda x: x["id"])
return klines
except Exception as e:
logger.error(f"获取K线异常: {e}")
return None
def get_current_price(self) -> float | None:
"""获取当前最新价格最近1分钟K线收盘价"""
try:
end_time = int(time.time())
resp = self.api.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=1,
start_time=end_time - 300,
end_time=end_time
)[0]
if resp.get("code") == 1000 and resp["data"]:
return float(resp["data"][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_balance(self) -> float | None:
"""获取合约账户可用余额"""
try:
resp = self.api.get_assets_detail()[0]
if resp.get("code") == 1000:
data = resp["data"]
if isinstance(data, dict):
return float(data.get("available_balance", 0))
elif isinstance(data, list):
for asset in data:
if asset.get("currency") == "USDT":
return float(asset.get("available_balance", 0))
return None
except Exception as e:
logger.error(f"查询余额异常: {e}")
return None
def get_position_status(self) -> bool:
"""查询当前持仓,更新 self.position / open_avg_price / current_amount"""
try:
resp = self.api.get_position(contract_symbol=self.cfg.CONTRACT_SYMBOL)[0]
if resp.get("code") != 1000:
logger.error(f"查询持仓失败: {resp}")
return False
positions = resp["data"]
if not positions:
self.position = 0
self.open_avg_price = None
self.current_amount = None
return True
pos = positions[0]
self.position = 1 if pos["position_type"] == 1 else -1
self.open_avg_price = float(pos["open_avg_price"])
self.current_amount = float(pos["current_amount"])
unrealized = float(pos.get("unrealized_value", 0))
logger.debug(f"持仓: dir={self.position} price={self.open_avg_price} "
f"amt={self.current_amount} upnl={unrealized:.2f}")
return True
except Exception as e:
logger.error(f"查询持仓异常: {e}")
return False
def set_leverage(self) -> bool:
"""设置杠杆和全仓模式"""
try:
resp = self.api.post_submit_leverage(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
leverage=str(self.cfg.LEVERAGE),
open_type=self.cfg.OPEN_TYPE
)[0]
if resp.get("code") == 1000:
logger.success(f"杠杆设置成功: {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
return True
else:
logger.error(f"杠杆设置失败: {resp}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ------------------------------------------------------------------
# 浏览器自动化
# ------------------------------------------------------------------
def open_browser(self) -> bool:
"""打开浏览器并进入交易页面"""
try:
bit_port = openBrowser(id=self.cfg.BIT_ID)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.last_page_open_time = time.time()
return True
except Exception as e:
logger.error(f"打开浏览器失败: {e}")
return False
def click_safe(self, xpath, sleep=0.5) -> bool:
"""安全点击元素"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.click(by_js=True)
time.sleep(sleep)
return True
except Exception as e:
logger.warning(f"点击失败 [{xpath}]: {e}")
return False
def browser_close_position(self) -> bool:
"""浏览器点击市价全平"""
logger.info("浏览器操作: 市价平仓")
return self.click_safe('x://span[normalize-space(text()) ="市价"]')
# ------------------------------------------------------------------
# 仓位操作
# ------------------------------------------------------------------
def calc_order_usdt(self, is_add: bool = False) -> float:
"""
计算开仓/加仓金额(U)
首次开仓: 余额 × MARGIN_PCT (1%)
加仓: 余额 × (MARGIN_PCT + PYRAMID_STEP × (pyramid_count+1))
例: 开仓1%, 第1次加仓2%, 第2次加仓3%, 第3次加仓4%
"""
balance = self.get_balance()
if balance is None or balance <= 0:
logger.warning(f"余额不足或查询失败: {balance}")
return 0
if is_add:
pct = self.cfg.MARGIN_PCT + self.cfg.PYRAMID_STEP * (self.pyramid_count + 1)
else:
pct = self.cfg.MARGIN_PCT
order_usdt = round(balance * pct, 2)
logger.info(f"仓位计算: 余额={balance:.2f} × {pct:.0%} = {order_usdt} U"
f" ({'加仓#' + str(self.pyramid_count+1) if is_add else '首次开仓'})")
return order_usdt
def verify_position(self, expected: int) -> bool:
"""验证持仓方向"""
if self.get_position_status():
if self.position == expected:
return True
logger.warning(f"持仓方向不符: 期望{expected}, 实际{self.position}")
return False
# ------------------------------------------------------------------
# 风控
# ------------------------------------------------------------------
def check_daily_reset(self):
"""每日重置(UTC+8 00:00 = UTC 16:00)"""
now = datetime.now(timezone.utc)
# 用UTC日期做简单日切
today = now.date()
if self.current_date != today:
if self.current_date is not None:
logger.info(f"日切: {self.current_date}{today}, 日PnL={self.daily_pnl:.2f}")
self.current_date = today
self.daily_pnl = 0.0
self.daily_stopped = False
def can_trade(self) -> bool:
"""检查是否可交易"""
if self.daily_stopped:
return False
now = time.time()
if now - self.last_trade_time < self.cfg.COOLDOWN_SECONDS:
remain = self.cfg.COOLDOWN_SECONDS - (now - self.last_trade_time)
logger.debug(f"交易冷却中,剩余 {remain:.0f}s")
return False
return True
# ------------------------------------------------------------------
# 日志
# ------------------------------------------------------------------
def write_trade_log(self, action: str, price: float, bb_upper: float,
bb_mid: float, bb_lower: float, reason: str):
"""写入交易日志文件"""
try:
date_str = datetime.now().strftime("%Y%m%d")
log_file = self.log_dir / f"bb_trade_log_{date_str}.txt"
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
block = (
f"\n{'='*60}\n"
f"时间: {time_str}\n"
f"操作: {action}\n"
f"价格: {price:.2f}\n"
f"BB上轨: {bb_upper:.2f} | 中轨: {bb_mid:.2f} | 下轨: {bb_lower:.2f}\n"
f"原因: {reason}\n"
f"{'='*60}\n"
)
with open(log_file, "a", encoding="utf-8") as f:
f.write(block)
except Exception as e:
logger.warning(f"写入日志失败: {e}")
# ------------------------------------------------------------------
# 主循环(浏览器流程与四分之一代码一致)
# ------------------------------------------------------------------
def run(self):
"""策略主循环"""
logger.info("=" * 60)
logger.info(f" BB策略启动(D方案): BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})")
logger.info(f" 合约: {self.cfg.CONTRACT_SYMBOL} | {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
logger.info(f" 首次开仓: 权益×{self.cfg.MARGIN_PCT:.0%} | 递增加仓: +{self.cfg.PYRAMID_STEP:.0%}/次 | 最多{self.cfg.PYRAMID_MAX}")
logger.info("=" * 60)
# 设置杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,退出")
return
# 初始持仓同步
if not self.get_position_status():
logger.error("初始持仓查询失败,退出")
return
logger.info(f"初始持仓状态: {self.position}")
last_kline_id = None # 避免同一根K线重复触发
page_start = True # 需要打开浏览器
while True:
# ===== 浏览器管理 =====
# page_start时: 打开浏览器 → 导航 → 点市价 → 输入张数
if page_start:
for i in range(5):
if self.open_browser():
logger.info("浏览器打开成功")
break
else:
logger.error("打开浏览器失败!")
return
self.page.get(self.cfg.TRADE_URL)
time.sleep(2)
# 点击市价模式
self.click_safe('x://button[normalize-space(text()) ="市价"]')
time.sleep(0.5)
# 计算并预输入开仓金额(U)
current_price = self.get_current_price()
if current_price:
order_usdt = self.calc_order_usdt()
if order_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True)
logger.info(f"预输入开仓金额: {order_usdt} U")
page_start = False
try:
# 每3分钟关闭浏览器重新打开
if time.time() - self.last_page_open_time >= self.PAGE_REFRESH_INTERVAL:
logger.info("浏览器已打开超过3分钟关闭刷新")
try:
self.page.close()
except Exception:
pass
self.page = None
page_start = True
time.sleep(3)
continue
self.check_daily_reset()
if self.daily_stopped:
logger.info(f"日亏损已达限制({self.daily_pnl:.2f}),等待日切")
time.sleep(60)
continue
# 1. 获取K线
klines = self.get_klines()
if not klines or len(klines) < self.cfg.BB_PERIOD:
logger.warning(f"K线数据不足({len(klines) if klines else 0}根),等待...")
time.sleep(self.cfg.POLL_INTERVAL)
continue
closed_klines = klines[:-1]
current_kline = klines[-1]
if len(closed_klines) < self.cfg.BB_PERIOD:
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 2. 计算布林带
closes = [k["close"] for k in closed_klines]
bb = calc_bollinger(closes, self.cfg.BB_PERIOD, self.cfg.BB_STD)
if bb is None:
time.sleep(self.cfg.POLL_INTERVAL)
continue
bb_mid, bb_upper, bb_lower = bb
# 3. 获取当前价格
current_price = self.get_current_price()
if current_price is None:
time.sleep(self.cfg.POLL_INTERVAL)
continue
cur_high = current_kline["high"]
cur_low = current_kline["low"]
# 容错: K线high/low + 当前实时价格,任一触及即算触碰
touched_upper = cur_high >= bb_upper or current_price >= bb_upper
touched_lower = cur_low <= bb_lower or current_price <= bb_lower
logger.info(
f"价格={current_price:.2f} | "
f"BB: {bb_lower:.2f} / {bb_mid:.2f} / {bb_upper:.2f} | "
f"H={cur_high:.2f} L={cur_low:.2f} | "
f"触上={touched_upper} 触下={touched_lower} | "
f"仓位={self.position}"
)
# 4. 同步持仓状态
if not self.get_position_status():
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 5. 信号判断
kline_id = current_kline["id"]
if kline_id == last_kline_id:
time.sleep(self.cfg.POLL_INTERVAL)
continue
if touched_upper and touched_lower:
logger.warning("同时触及上下轨,跳过")
time.sleep(self.cfg.POLL_INTERVAL)
continue
action = None
reason = ""
success = False
# ===== 触及上轨 → 开空 / 翻转为空 / 加仓空 =====
if touched_upper:
if not self.can_trade():
time.sleep(self.cfg.POLL_INTERVAL)
continue
reason = (f"价格最高{cur_high:.2f}触及上轨{bb_upper:.2f}"
f"BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})")
if self.position == 1:
action = "翻转: 平多→开空"
# 在当前页面点市价平仓
self.browser_close_position()
time.sleep(1)
# 等待确认平仓
for _ in range(10):
if self.get_position_status() and self.position == 0:
break
time.sleep(1)
if self.position != 0:
logger.warning(f"平仓后仍有持仓({self.position}),放弃开空")
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 翻转时重置加仓计数
self.pyramid_count = 0
# 平仓后在同一页面直接点卖出/做空
logger.info("平仓完成,直接开空")
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
time.sleep(3)
if self.verify_position(-1):
success = True
elif self.position == 0:
action = "开空"
self.pyramid_count = 0
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
time.sleep(3)
if self.verify_position(-1):
success = True
elif self.position == -1 and self.pyramid_count < self.cfg.PYRAMID_MAX:
# 已持空仓 + 再次触上轨 → 加仓做空
action = f"加仓空#{self.pyramid_count+1}"
reason += f" (加仓#{self.pyramid_count+1}/{self.cfg.PYRAMID_MAX})"
# 重新计算加仓金额并输入
add_usdt = self.calc_order_usdt(is_add=True)
if add_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=add_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
time.sleep(3)
if self.verify_position(-1):
self.pyramid_count += 1
success = True
else:
logger.info(f"已持空仓,加仓已达上限({self.pyramid_count}/{self.cfg.PYRAMID_MAX})")
# ===== 触及下轨 → 开多 / 翻转为多 / 加仓多 =====
elif touched_lower:
if not self.can_trade():
time.sleep(self.cfg.POLL_INTERVAL)
continue
reason = (f"价格最低{cur_low:.2f}触及下轨{bb_lower:.2f}"
f"BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})")
if self.position == -1:
action = "翻转: 平空→开多"
self.browser_close_position()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.position == 0:
break
time.sleep(1)
if self.position != 0:
logger.warning(f"平仓后仍有持仓({self.position}),放弃开多")
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 翻转时重置加仓计数
self.pyramid_count = 0
logger.info("平仓完成,直接开多")
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
time.sleep(3)
if self.verify_position(1):
success = True
elif self.position == 0:
action = "开多"
self.pyramid_count = 0
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
time.sleep(3)
if self.verify_position(1):
success = True
elif self.position == 1 and self.pyramid_count < self.cfg.PYRAMID_MAX:
# 已持多仓 + 再次触下轨 → 加仓做多
action = f"加仓多#{self.pyramid_count+1}"
reason += f" (加仓#{self.pyramid_count+1}/{self.cfg.PYRAMID_MAX})"
# 重新计算加仓金额并输入
add_usdt = self.calc_order_usdt(is_add=True)
if add_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=add_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
time.sleep(3)
if self.verify_position(1):
self.pyramid_count += 1
success = True
else:
logger.info(f"已持多仓,加仓已达上限({self.pyramid_count}/{self.cfg.PYRAMID_MAX})")
# ===== 交易成功后处理 =====
if success and action:
last_kline_id = kline_id
self.last_trade_time = time.time()
self.write_trade_log(action, current_price,
bb_upper, bb_mid, bb_lower, reason)
logger.success(f"{action} 执行成功")
# 交易完成后关闭浏览器,下轮重新打开
page_start = True
try:
self.page.close()
except Exception:
pass
self.page = None
time.sleep(5)
time.sleep(self.cfg.POLL_INTERVAL)
except KeyboardInterrupt:
logger.info("用户中断,程序退出")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
page_start = True
time.sleep(10)
# ---------------------------------------------------------------------------
# 入口
# ---------------------------------------------------------------------------
if __name__ == "__main__":
trader = BBTrader()
trader.run()