Files
codex_jxs_code/bb_delay_reversal_trade.py
2026-03-05 12:51:12 +08:00

602 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
布林带延迟反转策略 - 实盘交易
基于回测策略 bb_backtest_march_2026.py
使用API查询 + 浏览器自动化交易
"""
import os
import time
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
from bitmart.api_contract import APIContract
from DrissionPage import ChromiumPage, ChromiumOptions
from loguru import logger

from bit_tools import openBrowser
class BBDelayReversalConfig:
    """Static settings for the Bollinger-band delayed-reversal trader.

    The API credentials and the browser profile id are read from the
    environment when the corresponding variable is set
    (``BITMART_API_KEY``, ``BITMART_SECRET_KEY``, ``BITMART_MEMO``,
    ``BIT_BROWSER_ID``).  The literals below are only backward-compatible
    fallbacks so existing deployments keep working unchanged.
    """

    # API credentials.
    # NOTE(review): these fallback literals are committed to source control;
    # rotate the key pair and drop the fallbacks once the environment
    # variables are provisioned.
    API_KEY = os.environ.get(
        "BITMART_API_KEY", "6104088c65a68d7e53df5d9395b67d78e555293a"
    )
    SECRET_KEY = os.environ.get(
        "BITMART_SECRET_KEY",
        "a8b14312330d8e6b9b09acfd972b34e32022fdfa9f2b06f0a0a31723b873fd01",
    )
    MEMO = os.environ.get("BITMART_MEMO", "me")

    # Contract traded and the web page used for browser-driven orders.
    CONTRACT_SYMBOL = "ETHUSDT"
    TRADE_URL = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"

    # Browser profile id handed to bit_tools.openBrowser.
    BIT_ID = os.environ.get("BIT_BROWSER_ID", "62f9107d0c674925972084e282df55b3")

    # Bollinger band parameters: look-back length and band width multiplier.
    BB_PERIOD = 10
    BB_STD = 2.5

    # Position sizing.
    LEVERAGE = 50
    OPEN_TYPE = "isolated"
    MARGIN_PCT = 0.01  # fraction of the available balance used per order (1%)

    # Runtime parameters.
    POLL_INTERVAL = 5  # seconds slept between polls of the main loop
    KLINE_STEP = 5     # candle step passed to the kline API
    KLINE_HOURS = 2    # hours of candle history requested per poll
class BBDelayReversalTrader:
"""布林带延迟反转交易器"""
def __init__(self, cfg: BBDelayReversalConfig = None):
    """Build the API client, reset all strategy state and attach a file log sink.

    Args:
        cfg: Settings object; a default ``BBDelayReversalConfig`` is created
            when the argument is falsy.
    """
    self.cfg = cfg if cfg else BBDelayReversalConfig()
    self.api = APIContract(
        self.cfg.API_KEY,
        self.cfg.SECRET_KEY,
        self.cfg.MEMO,
        timeout=(5, 15),
    )

    # Browser session bookkeeping.
    self.page: ChromiumPage | None = None
    self.page_start = True
    self.last_page_open_time = 0.0
    self.PAGE_REFRESH_INTERVAL = 1800  # seconds between forced browser restarts

    # Position snapshot.
    self.position = 0        # -1 short, 0 flat, 1 long
    self.position_count = 0  # 0 flat, 1 first entry, 2 after the add-on entry
    self.entry_price = self.current_amount = self.total_margin = 0

    # Pending delayed reversal: trigger price, direction tag
    # ('long_to_short' or 'short_to_long') and the kline index it was marked on.
    self.delay_reverse_price = None
    self.delay_reverse_type = None
    self.delay_reverse_kline_id = None

    # Set once half of the position has been closed at the middle band.
    self.mid_closed_half = False

    # Trade throttling.
    self.last_trade_time = 0.0
    self.last_kline_id = None

    # One log file per day, written next to this script.
    self.log_dir = Path(__file__).resolve().parent
    logger.add(
        self.log_dir / "bb_delay_trade_{time:YYYY-MM-DD}.log",
        rotation="1 day",
        retention="30 days",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    )
# ========== API查询方法 ==========
def get_klines(self) -> list | None:
"""获取5分钟K线"""
try:
end_time = int(time.time())
start_time = end_time - 3600 * self.cfg.KLINE_HOURS
resp = self.api.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=self.cfg.KLINE_STEP,
start_time=start_time,
end_time=end_time
)[0]
if resp.get("code") != 1000:
logger.error(f"获取K线失败: {resp}")
return None
data = resp["data"]
klines = []
for k in data:
klines.append({
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
})
klines.sort(key=lambda x: x["id"])
return klines
except Exception as e:
logger.error(f"获取K线异常: {e}")
return None
def get_current_price(self) -> float | None:
"""获取当前价格"""
try:
end_time = int(time.time())
resp = self.api.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=1,
start_time=end_time - 300,
end_time=end_time
)[0]
if resp.get("code") == 1000 and resp["data"]:
return float(resp["data"][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_balance(self) -> float | None:
"""获取可用余额"""
try:
resp = self.api.get_assets_detail()[0]
if resp.get("code") == 1000:
data = resp["data"]
if isinstance(data, dict):
return float(data.get("available_balance", 0))
elif isinstance(data, list):
for asset in data:
if asset.get("currency") == "USDT":
return float(asset.get("available_balance", 0))
return None
except Exception as e:
logger.error(f"查询余额异常: {e}")
return None
def get_position_status(self) -> bool:
    """Refresh the cached position state from the exchange.

    Entries whose ``current_amount`` is zero are ignored, so an empty
    position slot returned by the API is treated as "flat" instead of being
    mistaken for a live position.  When flat, the direction, entry count,
    entry price, amount and the half-closed flag are all reset.

    Returns:
        ``True`` when the state was refreshed, ``False`` when the API
        reported a non-1000 code or an exception occurred (the cached state
        is left untouched in that case).
    """
    try:
        resp = self.api.get_position(contract_symbol=self.cfg.CONTRACT_SYMBOL)[0]
        if resp.get("code") != 1000:
            logger.error(f"查询持仓失败: {resp}")
            return False

        live_positions = [
            entry for entry in (resp["data"] or [])
            if float(entry["current_amount"]) != 0
        ]
        if not live_positions:
            self.position = 0
            self.position_count = 0
            self.entry_price = 0
            self.current_amount = 0
            # A flat account cannot be "half closed"; avoid carrying the
            # flag into the next position.
            self.mid_closed_half = False
            return True

        pos = live_positions[0]
        # int() tolerates the field arriving as either 1 or "1".
        self.position = 1 if int(pos["position_type"]) == 1 else -1
        self.entry_price = float(pos["open_avg_price"])
        self.current_amount = float(pos["current_amount"])
        side = '多' if self.position > 0 else '空'
        logger.debug(f"持仓: {side} | "
                     f"价格={self.entry_price:.2f} | 数量={self.current_amount:.4f}")
        return True
    except Exception as e:
        logger.error(f"查询持仓异常: {e}")
        return False
def set_leverage(self) -> bool:
    """Submit the configured leverage and margin mode for the contract.

    Returns:
        ``True`` when the API answers with code 1000, ``False`` on any other
        code or on an exception.
    """
    try:
        resp = self.api.post_submit_leverage(
            contract_symbol=self.cfg.CONTRACT_SYMBOL,
            leverage=str(self.cfg.LEVERAGE),
            open_type=self.cfg.OPEN_TYPE,
        )[0]
        accepted = resp.get("code") == 1000
        if accepted:
            logger.success(f"杠杆设置成功: {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
        else:
            logger.error(f"杠杆设置失败: {resp}")
        return accepted
    except Exception as e:
        logger.error(f"设置杠杆异常: {e}")
        return False
# ========== 布林带计算 ==========
def calc_bollinger(self, closes: list):
    """Compute Bollinger bands over the last ``cfg.BB_PERIOD`` closes.

    Args:
        closes: Close prices, oldest first.

    Returns:
        ``(mid, upper, lower)`` where ``mid`` is the mean of the window and
        the bands sit ``cfg.BB_STD`` population standard deviations away
        from it, or ``None`` when fewer than ``cfg.BB_PERIOD`` closes are
        supplied.
    """
    period = self.cfg.BB_PERIOD
    if len(closes) < period:
        return None
    window = np.array(closes[-period:], dtype=float)
    mid = window.mean()
    band = self.cfg.BB_STD * window.std(ddof=0)
    return mid, mid + band, mid - band
# ========== 浏览器自动化 ==========
def open_browser(self) -> bool:
    """Start the configured browser profile and attach a ChromiumPage to it.

    On success ``self.page`` holds the attached page and
    ``self.last_page_open_time`` is set to the current time.

    Returns:
        ``True`` on success, ``False`` when any step raises.
    """
    try:
        debug_port = openBrowser(id=self.cfg.BIT_ID)
        options = ChromiumOptions()
        options.set_local_port(port=debug_port)
        self.page = ChromiumPage(addr_or_opts=options)
        self.last_page_open_time = time.time()
    except Exception as e:
        logger.error(f"打开浏览器失败: {e}")
        return False
    return True
def click_safe(self, xpath, sleep=0.5) -> bool:
    """Click the element located by ``xpath`` without raising.

    Args:
        xpath: Locator string passed to ``self.page.ele``.
        sleep: Seconds to wait after a successful click.

    Returns:
        ``True`` when the element was found and clicked, ``False`` when it
        was not found or the lookup/click raised.
    """
    try:
        target = self.page.ele(xpath)
        if target:
            target.click(by_js=True)
            time.sleep(sleep)
            return True
        return False
    except Exception as e:
        logger.warning(f"点击失败: {e}")
        return False
def browser_open_position(self, direction: str, usdt_amount: float) -> bool:
    """Place a market open order through the trading page.

    Args:
        direction: ``'long'`` or ``'short'``.  Any other value is rejected
            instead of being silently treated as a short.
        usdt_amount: Order size typed into the ``size_0`` input.

    Returns:
        ``True`` when the order button was found and clicked, ``False`` when
        the direction is unknown, the button could not be clicked, or any
        step raised.  A ``True`` result only means the click was issued;
        callers still need to re-query the position to confirm the fill.
    """
    submit_buttons = {
        'long': 'x://span[normalize-space(text()) ="买入/做多"]',
        'short': 'x://span[normalize-space(text()) ="卖出/做空"]',
    }
    try:
        submit_xpath = submit_buttons.get(direction)
        if submit_xpath is None:
            logger.error(f"浏览器开仓失败: 未知方向 {direction}")
            return False

        side = '多' if direction == 'long' else '空'
        logger.info(f"浏览器操作: 开{side} {usdt_amount}U")

        # Best effort: the market tab is normally already selected.
        self.click_safe('x://button[normalize-space(text()) ="市价"]')

        self.page.ele('x://*[@id="size_0"]').input(vals=usdt_amount, clear=True)
        time.sleep(0.5)

        if not self.click_safe(submit_xpath):
            logger.error("浏览器开仓失败: 未找到下单按钮")
            return False
        time.sleep(2)
        return True
    except Exception as e:
        logger.error(f"浏览器开仓失败: {e}")
        return False
def browser_close_position(self, ratio: float = 1.0) -> bool:
    """Close all or half of the current position through the trading page.

    Args:
        ratio: ``>= 0.99`` closes everything with the market-close control;
            ``0.5`` switches to the close tab, picks the 50% size shortcut
            and submits the close for the current direction.  Other ratios
            are refused because the page flow has no size selection for
            them.

    Returns:
        ``True`` when every required click was issued, ``False`` when a
        control was not found, the ratio is unsupported, or any step raised.
    """
    try:
        logger.info(f"浏览器操作: 平仓{int(ratio*100)}%")
        if ratio >= 0.99:
            # Full close: a single click on the market-close control.
            if not self.click_safe('x://span[normalize-space(text()) ="市价"]'):
                logger.error("浏览器平仓失败: 未找到市价平仓按钮")
                return False
        else:
            # Tolerant comparison instead of exact float equality.
            if abs(ratio - 0.5) > 1e-9:
                logger.error(f"浏览器平仓失败: 不支持的平仓比例 {ratio}")
                return False
            submit_xpath = (
                'x://span[normalize-space(text()) ="卖出/平多"]'
                if self.position > 0
                else 'x://span[normalize-space(text()) ="买入/平空"]'
            )
            steps = (
                'x://button[normalize-space(text()) ="平仓"]',
                'x://*[@id="futureTradeForm"]/div[5]/div[3]/div[3]/span[3]',
                submit_xpath,
            )
            # Stop at the first missing control so later clicks never act
            # on the wrong tab or an unset size.
            for step_xpath in steps:
                if not self.click_safe(step_xpath):
                    logger.error(f"浏览器平仓失败: 未找到元素 {step_xpath}")
                    return False
        time.sleep(2)
        return True
    except Exception as e:
        logger.error(f"浏览器平仓失败: {e}")
        return False
# ========== 延迟反转逻辑 ==========
def mark_delay_reversal(self, reverse_type: str, trigger_price: float, kline_id: int):
    """Record a pending delayed reversal.

    Args:
        reverse_type: ``'long_to_short'`` or ``'short_to_long'``.
        trigger_price: Price level stored for the later confirmation check.
        kline_id: Kline index at which the reversal was marked.
    """
    (
        self.delay_reverse_type,
        self.delay_reverse_price,
        self.delay_reverse_kline_id,
    ) = (reverse_type, trigger_price, kline_id)
    logger.info(f"触发延迟反转: {reverse_type} @ {trigger_price:.2f}")
def clear_delay_reversal(self):
    """Forget any pending delayed reversal (price, type and kline index)."""
    self.delay_reverse_price = self.delay_reverse_type = None
    self.delay_reverse_kline_id = None
def check_delay_reversal(self, current_kline, prev_kline, kline_index) -> tuple | None:
"""检查延迟反转确认"""
if self.position == 0 or self.delay_reverse_price is None:
return None
if self.delay_reverse_kline_id is None:
return None
offset = kline_index - self.delay_reverse_kline_id
if offset <= 0:
return None
high = current_kline['high']
low = current_kline['low']
if self.delay_reverse_type == 'long_to_short':
# 多转空: 回调到记录价格
if offset == 1 and low <= self.delay_reverse_price:
return 'short', self.delay_reverse_price, "次K回调确认"
if offset >= 2 and prev_kline:
prev_body_low = min(prev_kline['open'], prev_kline['close'])
if low <= prev_body_low:
return 'short', prev_body_low, "跌破上一根实体"
elif self.delay_reverse_type == 'short_to_long':
# 空转多: 反弹到记录价格
if offset == 1 and high >= self.delay_reverse_price:
return 'long', self.delay_reverse_price, "次K反弹确认"
if offset >= 2 and prev_kline:
prev_body_high = max(prev_kline['open'], prev_kline['close'])
if high >= prev_body_high:
return 'long', prev_body_high, "突破上一根实体"
return None
# ========== 主循环 ==========
def run(self):
    """Run the strategy loop until interrupted.

    Each poll:
      1. keeps the browser session alive (reopened every
         ``PAGE_REFRESH_INTERVAL`` seconds and after any loop exception);
      2. fetches candles and computes Bollinger bands from the closed ones;
      3. syncs the position from the API;
      4. acts at most once per candle, in this priority order: confirm a
         pending delayed reversal, close half at the middle band, reverse
         when a half-closed position returns to its entry price, then
         open / mark a delayed reversal / add to the position.

    Delayed-reversal offsets are counted in *candles*: a counter advances
    whenever the id of the forming candle changes, and the "previous kline"
    handed to ``check_delay_reversal`` is the last closed candle.
    """
    logger.info("=" * 60)
    logger.info("布林带延迟反转策略启动")
    logger.info(f"BB({self.cfg.BB_PERIOD}, {self.cfg.BB_STD}) | "
                f"{self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
    logger.info("=" * 60)

    if not self.set_leverage():
        logger.error("杠杆设置失败,退出")
        return
    if not self.get_position_status():
        logger.error("初始持仓查询失败,退出")
        return
    logger.info(f"初始持仓: {self.position}")

    page_start = True
    kline_seq = 0         # number of distinct candles observed so far
    seen_kline_id = None  # id of the candle counted most recently

    while True:
        try:
            # ----- browser management -----
            if page_start:
                if not self._start_browser_session():
                    logger.error("打开浏览器失败")
                    return
                page_start = False

            if time.time() - self.last_page_open_time >= self.PAGE_REFRESH_INTERVAL:
                logger.info("浏览器刷新")
                try:
                    self.page.close()
                except Exception as e:
                    # Best effort: a stale page must not block the restart.
                    logger.warning(f"关闭浏览器页面失败: {e}")
                page_start = True
                time.sleep(3)
                continue

            # ----- market data -----
            klines = self.get_klines()
            if not klines or len(klines) < self.cfg.BB_PERIOD + 1:
                logger.warning(f"K线数据不足: {len(klines) if klines else 0}")
                time.sleep(self.cfg.POLL_INTERVAL)
                continue

            # Bands use closed candles only; the last candle is still forming.
            closed_klines = klines[:-1]
            current_kline = klines[-1]
            prev_kline = closed_klines[-1]
            kline_id = current_kline['id']
            if kline_id != seen_kline_id:
                seen_kline_id = kline_id
                kline_seq += 1

            bb = self.calc_bollinger([k['close'] for k in closed_klines])
            if bb is None:
                time.sleep(self.cfg.POLL_INTERVAL)
                continue
            bb_mid, bb_upper, bb_lower = bb

            current_price = self.get_current_price()
            if current_price is None:
                time.sleep(self.cfg.POLL_INTERVAL)
                continue

            cur_high = current_kline['high']
            cur_low = current_kline['low']
            touched_upper = cur_high >= bb_upper
            touched_lower = cur_low <= bb_lower
            touched_middle = cur_low <= bb_mid <= cur_high

            logger.info(
                f"价格={current_price:.2f} | "
                f"BB: {bb_lower:.2f}/{bb_mid:.2f}/{bb_upper:.2f} | "
                f"触上={touched_upper} 触下={touched_lower} 触中={touched_middle} | "
                f"仓位={self.position}"
            )

            if not self.get_position_status():
                time.sleep(self.cfg.POLL_INTERVAL)
                continue

            # At most one action per candle.
            if kline_id == self.last_kline_id:
                time.sleep(self.cfg.POLL_INTERVAL)
                continue

            # ----- delayed reversal confirmation -----
            if self.delay_reverse_price is not None:
                reversal = self.check_delay_reversal(
                    current_kline, prev_kline, kline_seq
                )
                if reversal:
                    new_direction, reversal_price, reason = reversal
                    logger.info(f"延迟反转确认: {reason} @ {reversal_price:.2f}")
                    self._close_and_reverse(new_direction)
                    self.clear_delay_reversal()
                    self.last_kline_id = kline_id
                    continue

            # ----- middle band handling -----
            if self.position != 0:
                if touched_middle and not self.mid_closed_half:
                    logger.info("触中轨平50%")
                    if self.browser_close_position(0.5):
                        time.sleep(2)
                        self.mid_closed_half = True
                    self.last_kline_id = kline_id
                    continue

                # NOTE(review): evaluated whenever the position is already
                # half closed, whether or not this candle also touches the
                # middle band — confirm this matches the backtest.
                back_at_entry = (
                    (self.position > 0 and cur_low <= self.entry_price)
                    or (self.position < 0 and cur_high >= self.entry_price)
                )
                if self.mid_closed_half and back_at_entry:
                    logger.info("回到开仓价,全平+反手")
                    self._close_and_reverse('short' if self.position > 0 else 'long')
                    self.last_kline_id = kline_id
                    continue

            # ----- open / mark reversal / add -----
            if self.position == 0:
                self.clear_delay_reversal()
                if touched_upper:
                    direction, label = 'short', "空仓触上轨,开空"
                elif touched_lower:
                    direction, label = 'long', "空仓触下轨,开多"
                else:
                    direction, label = None, None
                if direction is not None:
                    # Balance is only queried when an order will be placed.
                    usdt_amount = self._margin_amount()
                    if usdt_amount is None:
                        time.sleep(self.cfg.POLL_INTERVAL)
                        continue
                    logger.info(f"{label} {usdt_amount}U")
                    if self._open_and_sync(direction, usdt_amount):
                        self.position_count = 1
                        self.mid_closed_half = False
                    # One attempt per candle, so a lagging position query
                    # cannot trigger a duplicate order.
                    self.last_kline_id = kline_id
            elif self.position > 0 and touched_upper:
                logger.info("多仓触上轨,标记延迟反转")
                self.mark_delay_reversal('long_to_short', bb_upper, kline_seq)
                self.last_kline_id = kline_id
            elif self.position < 0 and touched_lower:
                logger.info("空仓触下轨,标记延迟反转")
                self.mark_delay_reversal('short_to_long', bb_lower, kline_seq)
                self.last_kline_id = kline_id
            elif self.position_count == 1 and self.delay_reverse_price is None:
                if self.position > 0 and touched_lower:
                    direction, label = 'long', "多仓触下轨,加仓"
                elif self.position < 0 and touched_upper:
                    direction, label = 'short', "空仓触上轨,加仓"
                else:
                    direction, label = None, None
                if direction is not None:
                    usdt_amount = self._margin_amount()
                    if usdt_amount is not None:
                        logger.info(f"{label} {usdt_amount}U")
                        if self._open_and_sync(direction, usdt_amount):
                            self.position_count = 2
                        self.last_kline_id = kline_id

            time.sleep(self.cfg.POLL_INTERVAL)
        except KeyboardInterrupt:
            logger.info("用户中断")
            break
        except Exception as e:
            logger.error(f"主循环异常: {e}")
            # Assume the browser session may be broken and rebuild it.
            page_start = True
            time.sleep(10)

def _start_browser_session(self) -> bool:
    """Attach to the browser (up to 5 attempts) and load the trading page on the market tab."""
    for _ in range(5):
        if self.open_browser():
            logger.info("浏览器打开成功")
            break
    else:
        return False
    self.page.get(self.cfg.TRADE_URL)
    time.sleep(2)
    self.click_safe('x://button[normalize-space(text()) ="市价"]')
    return True

def _margin_amount(self) -> float | None:
    """Return ``MARGIN_PCT`` of the available balance rounded to 2 decimals, or None if unavailable."""
    balance = self.get_balance()
    if not balance:
        return None
    return round(balance * self.cfg.MARGIN_PCT, 2)

def _open_and_sync(self, direction: str, usdt_amount: float) -> bool:
    """Submit an open order, re-query the position and report whether it now has ``direction``."""
    self.browser_open_position(direction, usdt_amount)
    time.sleep(2)
    self.get_position_status()
    return self.position == (1 if direction == 'long' else -1)

def _close_and_reverse(self, new_direction: str) -> None:
    """Close the whole position and, once confirmed flat, open ``new_direction`` as a first entry."""
    self.browser_close_position(1.0)
    time.sleep(2)
    self.get_position_status()
    if self.position != 0:
        # Still holding: the close did not go through, so do not stack a
        # new order on top of the old position.
        return
    self.mid_closed_half = False
    usdt_amount = self._margin_amount()
    if usdt_amount is None:
        return
    if self._open_and_sync(new_direction, usdt_amount):
        self.position_count = 1
# Script entry point: build a trader with the default config and run it.
if __name__ == "__main__":
    BBDelayReversalTrader().run()