Files
codex_jxs_code/bb_delay_reversal_trade.py
2026-03-06 10:36:18 +08:00

992 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
布林带延迟反转策略 - 实盘交易
基于回测策略 bb_backtest_march_2026.py
使用框架的API查询 + 浏览器自动化交易
"""
import json
import os
import time
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
from bit_tools import openBrowser
from bitmart.api_contract import APIContract
from DrissionPage import ChromiumPage, ChromiumOptions
from loguru import logger
class BBDelayReversalConfig:
    """Static configuration for the Bollinger-band delayed-reversal strategy.

    All values are class attributes. The API credentials can be overridden
    through the ``BITMART_API_KEY``, ``BITMART_SECRET_KEY`` and
    ``BITMART_MEMO`` environment variables; the literals below are only
    fallbacks kept so existing deployments keep working.
    """

    # Browser profile ID handed to ``openBrowser``.
    BIT_ID = "f2320f57e24c45529a009e1541e25961"

    # API credentials.
    # NOTE(review): these secrets are committed in source control. They should
    # be rotated and supplied only through the environment variables.
    API_KEY = os.environ.get(
        "BITMART_API_KEY", "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
    )
    SECRET_KEY = os.environ.get(
        "BITMART_SECRET_KEY",
        "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5",
    )
    MEMO = os.environ.get("BITMART_MEMO", "合约交易")

    # Contract
    CONTRACT_SYMBOL = "ETHUSDT"
    TRADE_URL = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"

    # Bollinger band parameters
    BB_PERIOD = 10
    BB_STD = 2.5
    BB_DDOF = 1  # same as the pandas rolling.std() default used by the backtest
    # True tracks the live web indicator more closely; False is more stable.
    BB_USE_FORMING_CANDLE = False

    # Position sizing
    LEVERAGE = "50"
    OPEN_TYPE = "isolated"  # isolated-margin mode
    MARGIN_PCT = 0.01  # each entry uses 1% of the available balance
    STOP_LOSS_RATIO = 0.5  # stop out when the floating loss reaches 50% of total margin

    # Runtime parameters
    POLL_INTERVAL = 5  # passed to time.sleep() between polls
    KLINE_STEP = 5  # candle step passed to the kline API
    KLINE_HOURS = 2  # hours of candle history requested per poll
class BBDelayReversalTrader:
"""布林带延迟反转交易器"""
def __init__(self, bit_id: str = None):
    """Build the trader: config, API client, runtime state and log sink.

    Args:
        bit_id: Optional browser profile ID; when truthy it replaces
            ``BIT_ID`` on this trader's config instance.
    """
    cfg = BBDelayReversalConfig()
    if bit_id:
        cfg.BIT_ID = bit_id
    self.cfg = cfg

    # Exchange API client built from the configured credentials.
    self.contractAPI = APIContract(
        cfg.API_KEY,
        cfg.SECRET_KEY,
        cfg.MEMO,
        timeout=(5, 15),
    )

    # Browser handle and refresh bookkeeping.
    self.page: ChromiumPage | None = None
    self.page_start = True
    self.last_page_open_time = 0.0
    self.PAGE_REFRESH_INTERVAL = 1800

    # Position state.
    self.position = 0  # -1 short, 0 flat, 1 long
    self.position_count = 0  # 0 flat, 1 first entry, 2 after the add-on
    self.entry_price = self.current_amount = self.total_margin = 0

    # Pending delayed-reversal state; the type is 'long_to_short' or
    # 'short_to_long'.
    self.delay_reverse_price = None
    self.delay_reverse_type = None
    self.delay_reverse_kline_id = None

    # Whether half of the position was already closed at the middle band.
    self.mid_closed_half = False

    # Trade throttling and per-candle de-duplication.
    self.last_trade_time = 0.0
    self.last_kline_id = None
    self.last_closed_kline_id = None
    self.cooldown_seconds = 10  # minimum seconds between trades

    # Log file and state file live next to this module.
    base_dir = Path(__file__).resolve().parent
    self.log_dir = base_dir
    self.state_file = base_dir / "bb_delay_reversal_state.json"
    logger.add(
        base_dir / "bb_delay_trade_{time:YYYY-MM-DD}.log",
        rotation="1 day",
        retention="30 days",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    )

    self.load_state()
def load_state(self):
    """Restore persisted strategy state from ``self.state_file``, if present.

    The file is parsed and every field converted *before* anything is
    assigned, so a missing, unreadable or malformed file (invalid JSON, a
    non-object document, or values that cannot be converted) leaves the
    in-memory state untouched and only produces a warning instead of raising
    out of the constructor with half-applied state.
    """
    if not self.state_file.exists():
        return

    try:
        data = json.loads(self.state_file.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            raise ValueError("state file does not contain a JSON object")

        def optional(key, cast):
            # Absent or null values stay None; anything else is converted.
            value = data.get(key)
            return None if value is None else cast(value)

        restored = {
            "position_count": int(data.get("position_count", self.position_count)),
            "total_margin": float(data.get("total_margin", self.total_margin)),
            "mid_closed_half": bool(data.get("mid_closed_half", self.mid_closed_half)),
            "delay_reverse_price": optional("delay_reverse_price", float),
            "delay_reverse_type": data.get("delay_reverse_type"),
            "delay_reverse_kline_id": optional("delay_reverse_kline_id", int),
            "last_kline_id": optional("last_kline_id", int),
            "last_closed_kline_id": optional("last_closed_kline_id", int),
        }
    except Exception as e:
        logger.warning(f"读取本地状态失败,忽略旧状态: {e}")
        return

    # Apply only after every field parsed successfully.
    for name, value in restored.items():
        setattr(self, name, value)
    logger.info("已加载本地策略状态")
def save_state(self):
    """Write the key strategy fields to ``self.state_file`` as JSON.

    A failure while serializing or writing is logged as a warning and
    otherwise ignored.
    """
    persisted_fields = (
        "position_count",
        "total_margin",
        "mid_closed_half",
        "delay_reverse_price",
        "delay_reverse_type",
        "delay_reverse_kline_id",
        "last_kline_id",
        "last_closed_kline_id",
    )
    snapshot = {name: getattr(self, name) for name in persisted_fields}
    try:
        payload = json.dumps(snapshot, ensure_ascii=False, indent=2)
        self.state_file.write_text(payload, encoding="utf-8")
    except Exception as e:
        logger.warning(f"写入本地状态失败: {e}")
# ========== API查询方法 ==========
def get_klines(self) -> list | None:
"""获取5分钟K线使用框架方法"""
try:
end_time = int(time.time())
start_time = end_time - 3600 * self.cfg.KLINE_HOURS
response = self.contractAPI.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=self.cfg.KLINE_STEP,
start_time=start_time,
end_time=end_time
)[0]["data"]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
logger.error(f"获取K线异常: {e}")
return None
def get_current_price(self) -> float | None:
"""获取当前价格(使用框架方法)"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=1,
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_balance(self) -> float | None:
"""获取可用余额(使用框架方法)"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self) -> bool:
    """Refresh the local position fields from the exchange.

    With no open position every position/strategy field is reset. Otherwise
    the first returned position sets direction, entry price and amount; a
    direction flip resets the strategy flags, and a missing ``total_margin``
    is estimated as ``amount * entry_price / leverage``. State is persisted
    only when one of the tracked fields actually changed.

    Returns:
        ``True`` if the query succeeded (response code 1000), ``False`` on any
        other code or on an exception.
    """
    def tracked_fields():
        # Fields whose change triggers a state save.
        return (
            self.position,
            self.position_count,
            self.entry_price,
            self.current_amount,
            self.total_margin,
            self.mid_closed_half,
            self.delay_reverse_price,
            self.delay_reverse_type,
            self.delay_reverse_kline_id,
        )

    def reset_strategy_flags():
        self.mid_closed_half = False
        self.delay_reverse_price = None
        self.delay_reverse_type = None
        self.delay_reverse_kline_id = None

    try:
        before = tracked_fields()
        previous_side = self.position
        response = self.contractAPI.get_position(
            contract_symbol=self.cfg.CONTRACT_SYMBOL
        )[0]
        if response['code'] != 1000:
            return False

        positions = response['data']
        if not positions:
            # Flat: clear everything tied to a position.
            self.position = 0
            self.position_count = 0
            self.entry_price = 0
            self.current_amount = 0
            self.total_margin = 0
            reset_strategy_flags()
            if tracked_fields() != before:
                self.save_state()
            return True

        first = positions[0]
        self.position = 1 if first['position_type'] == 1 else -1
        self.entry_price = float(first['open_avg_price'])
        self.current_amount = float(first['current_amount'])

        if previous_side not in (0, self.position):
            logger.warning("检测到持仓方向变化,重置本地策略状态")
            self.position_count = 1
            reset_strategy_flags()
        elif self.position_count == 0:
            self.position_count = 1

        if self.total_margin <= 0 and self.current_amount > 0:
            # No recorded margin: estimate it from notional / leverage.
            leverage = float(self.cfg.LEVERAGE)
            self.total_margin = self.current_amount * self.entry_price / leverage

        if tracked_fields() != before:
            self.save_state()

        # NOTE(review): both arms of the conditional below are empty strings,
        # so the direction label never appears in the log line.
        logger.debug(f"持仓: {'' if self.position > 0 else ''} | "
                     f"价格={self.entry_price:.2f} | 数量={self.current_amount:.4f}")
        return True
    except Exception as e:
        logger.error(f"持仓查询异常: {e}")
        return False
def set_leverage(self) -> bool:
    """Submit the configured leverage and margin mode for the contract.

    Returns:
        ``True`` when the response code is 1000, ``False`` for any other code
        or when the request raised.
    """
    try:
        response = self.contractAPI.post_submit_leverage(
            contract_symbol=self.cfg.CONTRACT_SYMBOL,
            leverage=self.cfg.LEVERAGE,
            open_type=self.cfg.OPEN_TYPE,
        )[0]
        if response['code'] != 1000:
            logger.error(f"杠杆设置失败: {response}")
            return False
        logger.success(f"{self.cfg.OPEN_TYPE}模式 + {self.cfg.LEVERAGE}x 杠杆设置成功")
        return True
    except Exception as e:
        logger.error(f"设置杠杆异常: {e}")
        return False
# ========== 布林带计算 ==========
def calc_bollinger(self, closes: list):
    """Compute Bollinger bands over the last ``BB_PERIOD`` closes.

    Args:
        closes: Close prices, oldest first.

    Returns:
        ``(mid, upper, lower)``, or ``None`` when fewer than ``BB_PERIOD``
        closes are supplied. The deviation uses ``ddof=BB_DDOF``.
    """
    period = self.cfg.BB_PERIOD
    if len(closes) < period:
        return None
    window = np.array(closes[-period:], dtype=float)
    mid = window.mean()
    std = window.std(ddof=self.cfg.BB_DDOF)
    return mid, mid + self.cfg.BB_STD * std, mid - self.cfg.BB_STD * std
def calc_shifted_bollinger_for_index(self, closed_klines: list, target_index: int):
    """Compute the bands that apply to one closed candle, shifted by one bar.

    The bands are built from the ``BB_PERIOD`` candles immediately *before*
    ``target_index``, so the target candle's own close is excluded.

    Returns:
        The ``calc_bollinger`` result, or ``None`` when fewer than
        ``BB_PERIOD`` candles precede ``target_index``.
    """
    period = self.cfg.BB_PERIOD
    if target_index < period:
        return None
    preceding = closed_klines[target_index - period:target_index]
    return self.calc_bollinger([candle['close'] for candle in preceding])
# ========== 浏览器自动化 ==========
def open_browser(self) -> bool:
    """Open the configured browser profile and attach a page to it.

    On success ``self.page`` is set and ``self.last_page_open_time`` is
    stamped with the current time.

    Returns:
        ``True`` on success, ``False`` if any step raised.
    """
    try:
        bit_port = openBrowser(id=self.cfg.BIT_ID)
        co = ChromiumOptions()
        co.set_local_port(port=bit_port)
        self.page = ChromiumPage(addr_or_opts=co)
        self.last_page_open_time = time.time()
        return True
    except Exception as e:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit still propagate, and record the cause.
        logger.error(f"打开浏览器异常: {e}")
        return False
def click_safe(self, xpath, sleep=0.5) -> bool:
    """Locate an element, scroll it into view and click it.

    Args:
        xpath: Locator string passed to ``self.page.ele``.
        sleep: Seconds to wait between scrolling and clicking.

    Returns:
        ``True`` if the element was found and clicked; ``False`` if it was
        not found or any step raised an ``Exception``. ``KeyboardInterrupt``
        and ``SystemExit`` are deliberately not swallowed.
    """
    try:
        ele = self.page.ele(xpath)
        if not ele:
            return False
        ele.scroll.to_see(center=True)
        time.sleep(sleep)
        ele.click()
        return True
    except Exception as e:
        logger.debug(f"点击失败: {xpath} | {e}")
        return False
def browser_open_position(self, direction: str, usdt_amount: float) -> bool:
    """Submit a market open order through the trading page.

    Steps: select the open tab, select market order, type the amount, then
    click the buy/long or sell/short button.

    Args:
        direction: ``'long'`` opens a long; any other value opens a short.
        usdt_amount: Value typed into the size field.

    Returns:
        ``True`` once the submit button was clicked; ``False`` if the submit
        click failed or any step raised. This only reports that the click
        happened; it does not check the resulting position.
    """
    try:
        # NOTE(review): both arms of the conditional below are empty strings,
        # so the direction label never appears in the log line.
        logger.info(f"浏览器操作: 开{'' if direction == 'long' else ''} {usdt_amount}U")
        # 1. Select the "open position" tab.
        self.click_safe('x://button[normalize-space(text()) ="开仓"]')
        time.sleep(0.5)
        # 2. Select market order.
        self.click_safe('x://button[normalize-space(text()) ="市价"]')
        time.sleep(0.5)
        # 3. Enter the amount.
        self.page.ele('x://*[@id="size_0"]').input(vals=usdt_amount, clear=True)
        time.sleep(0.5)
        # 4. Submit. A failed click here means no order was sent, so report
        #    failure instead of claiming success.
        if direction == 'long':
            submit_xpath = 'x://span[normalize-space(text()) ="买入/做多"]'
        else:
            submit_xpath = 'x://span[normalize-space(text()) ="卖出/做空"]'
        if not self.click_safe(submit_xpath):
            logger.error("浏览器开仓失败: 未能点击下单按钮")
            return False
        time.sleep(2)
        return True
    except Exception as e:
        logger.error(f"浏览器开仓失败: {e}")
        return False
def browser_close_position(self, ratio: float = 1.0) -> bool:
    """Submit a close order through the trading page.

    ``ratio >= 0.99`` closes at market; any smaller ratio clicks the page's
    50% size selector, so only a half close is supported in practice.

    Args:
        ratio: Fraction of the position to close.

    Returns:
        ``True`` once the close button was clicked; ``False`` if the 50%
        selector (partial close) or the close button could not be clicked, or
        any step raised. This only reports that the click happened; it does
        not check the resulting position.
    """
    try:
        logger.info(f"浏览器操作: 平仓{int(ratio*100)}%")
        # 1. Select the "close position" tab.
        self.click_safe('x://button[normalize-space(text()) ="平仓"]')
        time.sleep(0.5)

        if ratio >= 0.99:
            # 2a. Full close: select market order.
            self.click_safe('x://button[normalize-space(text()) ="市价"]')
        else:
            # 2b. Partial close: select the 50% size. If this fails the size
            #     field is in an unknown state, so do not submit at all.
            if not self.click_safe('x://*[@id="futureTradeForm"]/div[5]/div[3]/div[3]/span[3]'):
                logger.error("浏览器平仓失败: 未能点击50%按钮")
                return False
        time.sleep(0.5)

        # 3. Submit on the side that matches the current position.
        if self.position > 0:
            submit_xpath = 'x://span[normalize-space(text()) ="卖出/平多"]'
        else:
            submit_xpath = 'x://span[normalize-space(text()) ="买入/平空"]'
        if not self.click_safe(submit_xpath):
            logger.error("浏览器平仓失败: 未能点击平仓按钮")
            return False
        time.sleep(2)
        return True
    except Exception as e:
        logger.error(f"浏览器平仓失败: {e}")
        return False
def can_trade(self) -> bool:
    """Report whether the trade cooldown has elapsed.

    Returns:
        ``False`` (after logging the remaining time) while fewer than
        ``cooldown_seconds`` have passed since ``last_trade_time``; ``True``
        otherwise.
    """
    elapsed = time.time() - self.last_trade_time
    if not (elapsed < self.cooldown_seconds):
        return True
    remain = self.cooldown_seconds - elapsed
    logger.debug(f"交易冷却中,剩余 {remain:.0f}s")
    return False
def update_trade_time(self):
    """Stamp ``last_trade_time`` with the current time (starts the cooldown)."""
    now = time.time()
    self.last_trade_time = now
def close_all_and_confirm(self, timeout_seconds: int = 15, poll_seconds: float = 1.0) -> bool:
    """Close the whole position and wait until the exchange reports flat.

    Args:
        timeout_seconds: How long to keep polling for confirmation.
        poll_seconds: Delay before each position query.

    Returns:
        ``True`` if already flat or the position reached zero within the
        timeout; ``False`` if the close order could not be sent or the
        confirmation timed out.
    """
    if self.position == 0:
        return True

    sent = self.browser_close_position(1.0)
    if not sent:
        logger.error("全平仓指令发送失败")
        return False

    give_up_at = time.time() + timeout_seconds
    while time.time() < give_up_at:
        time.sleep(poll_seconds)
        # A failed query is simply retried on the next iteration.
        if self.get_position_status() and self.position == 0:
            logger.success("✓ 全平仓确认完成")
            return True

    logger.error(f"全平仓确认超时,当前仓位={self.position}")
    return False
def open_with_balance_and_confirm(self, direction: str, previous_amount: float = 0.0) -> bool:
    """Open or add to a position sized from the balance, then verify it.

    The order size is ``MARGIN_PCT`` of the available balance, rounded to two
    decimals. After submitting, the position is re-queried and must match the
    requested direction; for an add-on (``previous_amount > 0``) the amount
    must also have grown by more than 5% of the previous amount.

    Args:
        direction: ``'long'`` or anything else for short.
        previous_amount: Position size before an add-on; 0 for a fresh entry.

    Returns:
        ``True`` when the order is confirmed (``total_margin`` is updated and
        state saved), ``False`` on any failed step.
    """
    balance = self.get_balance()
    if not balance:
        logger.error("余额获取失败,无法开仓")
        return False

    usdt_amount = round(balance * self.cfg.MARGIN_PCT, 2)
    if usdt_amount <= 0:
        logger.error(f"开仓金额无效: {usdt_amount}")
        return False

    if not self.browser_open_position(direction, usdt_amount):
        logger.error("开仓指令发送失败")
        return False

    # Give the order time to fill before querying.
    time.sleep(3)
    if not self.get_position_status():
        logger.error("开仓后持仓查询失败")
        return False

    expected_pos = 1 if direction == 'long' else -1
    if self.position != expected_pos:
        logger.error(f"开仓结果不一致: 期望={expected_pos}, 实际={self.position}")
        return False

    is_add_on = previous_amount > 0
    if is_add_on:
        min_increase = max(previous_amount * 0.05, 1e-8)
        if self.current_amount <= previous_amount + min_increase:
            logger.error(
                f"开仓后数量未明显增加: 之前={previous_amount:.4f}, 当前={self.current_amount:.4f}"
            )
            return False

    # An add-on accumulates margin; a fresh entry replaces it.
    self.total_margin = self.total_margin + usdt_amount if is_add_on else usdt_amount
    self.save_state()
    # NOTE(review): both arms of the conditional below are empty strings.
    logger.success(f"✓ 开{'' if direction == 'long' else ''}成功")
    return True
def close_partial_and_confirm(
    self,
    ratio: float,
    previous_amount: float,
    timeout_seconds: int = 15,
    poll_seconds: float = 1.0,
) -> bool:
    """Close part of the position and wait for the amount to drop.

    Confirmation requires the position to keep its direction and its amount
    to fall to at most 75% of ``previous_amount``; ``total_margin`` is then
    scaled by ``1 - ratio`` and state is saved.

    Args:
        ratio: Fraction to close, forwarded to ``browser_close_position``.
        previous_amount: Position size before the close.
        timeout_seconds: How long to keep polling for confirmation.
        poll_seconds: Delay before each position query.

    Returns:
        ``True`` on confirmation; ``False`` when flat, when
        ``previous_amount <= 0``, when the order could not be sent, or on
        timeout.
    """
    if self.position == 0 or previous_amount <= 0:
        return False

    side_before = self.position
    if not self.browser_close_position(ratio):
        logger.error("部分平仓指令发送失败")
        return False

    give_up_at = time.time() + timeout_seconds
    while time.time() < give_up_at:
        time.sleep(poll_seconds)
        # Retry on a failed query or while the side does not match.
        if not (self.get_position_status() and self.position == side_before):
            continue
        if self.current_amount <= previous_amount * 0.75:
            self.total_margin *= (1 - ratio)
            self.save_state()
            logger.success(
                f"✓ 平仓{int(ratio*100)}%确认完成 | 之前={previous_amount:.4f}, 当前={self.current_amount:.4f}"
            )
            return True

    logger.error(
        f"平仓{int(ratio*100)}%确认超时,数量未明显下降 | 之前={previous_amount:.4f}, 当前={self.current_amount:.4f}"
    )
    return False
# ========== 延迟反转逻辑 ==========
def mark_delay_reversal(self, reverse_type: str, trigger_price: float, kline_id: int):
    """Record a pending delayed reversal, persist it and log the trigger.

    Args:
        reverse_type: ``'long_to_short'`` or ``'short_to_long'``.
        trigger_price: Band price at which the reversal was triggered.
        kline_id: ID of the candle on which the trigger occurred.
    """
    self.delay_reverse_type, self.delay_reverse_price, self.delay_reverse_kline_id = (
        reverse_type,
        trigger_price,
        kline_id,
    )
    self.save_state()
    logger.warning(f"⚠️ 延迟反转触发: {reverse_type} @ {trigger_price:.2f} | K线ID: {kline_id}")
def clear_delay_reversal(self):
    """Drop any pending delayed-reversal state.

    State is persisted only if at least one of the three fields was set.
    """
    pending = (
        self.delay_reverse_price,
        self.delay_reverse_type,
        self.delay_reverse_kline_id,
    )
    had_state = any(value is not None for value in pending)
    self.delay_reverse_price = self.delay_reverse_type = self.delay_reverse_kline_id = None
    if had_state:
        self.save_state()
def check_stop_loss(self, high: float, low: float) -> bool:
    """Check whether a candle's worst price breaches the stop-loss threshold.

    The unrealized loss is evaluated at the candle low for a long and at the
    candle high for a short.

    Args:
        high: Candle high.
        low: Candle low.

    Returns:
        ``True`` when the unrealized PnL is at or below
        ``-total_margin * STOP_LOSS_RATIO``; ``False`` otherwise, and always
        ``False`` when flat or when amount/margin are not positive.
    """
    if self.position == 0 or self.current_amount <= 0 or self.total_margin <= 0:
        return False

    is_long = self.position > 0
    worst_price = low if is_long else high
    price_move = (
        worst_price - self.entry_price if is_long else self.entry_price - worst_price
    )
    unrealized_pnl = self.current_amount * price_move
    max_loss = self.total_margin * self.cfg.STOP_LOSS_RATIO
    return unrealized_pnl <= -max_loss
def check_delay_reversal(self, current_kline, prev_kline, prev_upper, prev_lower) -> tuple | None:
"""
检查延迟反转确认
根据回测代码的逻辑:
1. 次K确认触轨后的下一根K线回调/反弹到记录价格
2. 持续追踪追踪上一根K线的触轨情况或实体
"""
if self.position == 0 or self.delay_reverse_price is None:
return None
if self.delay_reverse_kline_id is None:
return None
current_kline_id = current_kline['id']
trigger_kline_id = self.delay_reverse_kline_id
# 计算K线偏移5分钟 = 300000ms
offset = (current_kline_id - trigger_kline_id) // 300000
if offset <= 0:
return None
high = current_kline['high']
low = current_kline['low']
logger.debug(f"延迟反转检查: offset={offset} | 当前K线 H={high:.2f} L={low:.2f}")
if self.delay_reverse_type == 'long_to_short':
# 多转空: 需要价格回调
if offset == 1:
# 次K确认回调到记录的上轨价格
if low <= self.delay_reverse_price:
logger.success(f"✓ 延迟反转确认(次K): 回调到 {self.delay_reverse_price:.2f}")
return 'short', self.delay_reverse_price, "次K回调确认"
elif offset >= 2 and prev_kline and prev_upper:
# 持续追踪上一根K线
prev_high = prev_kline['high']
prev_touch_upper = prev_high >= prev_upper
if prev_touch_upper:
# 上一根触上轨当前K回调到上一根上轨价
if low <= prev_upper:
logger.success(f"✓ 延迟反转确认(追踪): 上一根触上轨后回调到 {prev_upper:.2f}")
return 'short', prev_upper, "上一根触上轨后回调确认"
else:
# 上一根未触轨,跌破上一根实体
prev_body_low = min(prev_kline['open'], prev_kline['close'])
if low <= prev_body_low:
logger.success(f"✓ 延迟反转确认(追踪): 跌破上一根实体 {prev_body_low:.2f}")
return 'short', prev_body_low, "跌破上一根实体确认"
elif self.delay_reverse_type == 'short_to_long':
# 空转多: 需要价格反弹
if offset == 1:
# 次K确认反弹到记录的下轨价格
if high >= self.delay_reverse_price:
logger.success(f"✓ 延迟反转确认(次K): 反弹到 {self.delay_reverse_price:.2f}")
return 'long', self.delay_reverse_price, "次K反弹确认"
elif offset >= 2 and prev_kline and prev_lower:
# 持续追踪上一根K线
prev_low = prev_kline['low']
prev_touch_lower = prev_low <= prev_lower
if prev_touch_lower:
# 上一根触下轨当前K反弹到上一根下轨价
if high >= prev_lower:
logger.success(f"✓ 延迟反转确认(追踪): 上一根触下轨后反弹到 {prev_lower:.2f}")
return 'long', prev_lower, "上一根触下轨后反弹确认"
else:
# 上一根未触轨,突破上一根实体
prev_body_high = max(prev_kline['open'], prev_kline['close'])
if high >= prev_body_high:
logger.success(f"✓ 延迟反转确认(追踪): 突破上一根实体 {prev_body_high:.2f}")
return 'long', prev_body_high, "突破上一根实体确认"
return None
# ========== 主循环 ==========
def run(self):
    """Run the strategy's main polling loop.

    Startup: set leverage and query the initial position; either failure
    returns immediately.

    Each loop iteration then:

    1. (Re)opens the browser and trade page when needed, and recycles the
       page every ``PAGE_REFRESH_INTERVAL`` seconds.
    2. Fetches candles, computes the live bands and the one-bar-shifted bands
       for the latest closed candle, and refreshes the position.
    3. Handles the latest closed candle once: stop-loss, half close at the
       middle band, or close-and-reverse when price returns to the entry
       price after a half close.
    4. Skips the rest if the forming candle was already acted on.
    5. Confirms a pending delayed reversal.
    6. Otherwise opens a position when flat, marks a delayed reversal when
       the position touches the band on its own side, or adds to the position
       once when it touches the opposite band.

    ``KeyboardInterrupt`` ends the loop. Any other exception is logged, the
    browser is flagged for reopening, and the loop resumes after 10 seconds.
    """
    logger.info("=" * 60)
    logger.info(f"布林带延迟反转策略启动")
    logger.info(f"BB({self.cfg.BB_PERIOD}, {self.cfg.BB_STD}) | "
                f"{self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
    logger.info(
        f"BB细节: ddof={self.cfg.BB_DDOF} | "
        f"forming_candle={self.cfg.BB_USE_FORMING_CANDLE}"
    )
    logger.info("=" * 60)
    # Set leverage
    if not self.set_leverage():
        logger.error("杠杆设置失败,退出")
        return
    # Initial position
    if not self.get_position_status():
        logger.error("初始持仓查询失败,退出")
        return
    logger.info(f"初始持仓: {self.position}")
    # Local flag: True whenever the browser page must be (re)opened.
    page_start = True
    while True:
        try:
            # ===== Browser management =====
            if page_start:
                # Up to 5 attempts; the for-else branch runs only when no
                # attempt succeeded (the loop never hit `break`).
                for i in range(5):
                    if self.open_browser():
                        logger.info("浏览器打开成功")
                        break
                else:
                    logger.error("打开浏览器失败")
                    return
                self.page.get(self.cfg.TRADE_URL)
                time.sleep(2)
                # Pre-select the open tab and market order so the page is ready to open a position
                self.click_safe('x://button[normalize-space(text()) ="开仓"]')
                time.sleep(0.5)
                self.click_safe('x://button[normalize-space(text()) ="市价"]')
                page_start = False
            # Periodically recycle the browser page
            if time.time() - self.last_page_open_time >= self.PAGE_REFRESH_INTERVAL:
                logger.info("浏览器刷新")
                try:
                    self.page.close()
                # NOTE(review): a bare except also swallows
                # KeyboardInterrupt/SystemExit raised during close().
                except:
                    pass
                page_start = True
                time.sleep(3)
                continue
            # ===== Fetch candles =====
            klines = self.get_klines()
            if not klines or len(klines) < self.cfg.BB_PERIOD + 1:
                logger.warning(f"K线数据不足: {len(klines) if klines else 0}")
                time.sleep(self.cfg.POLL_INTERVAL)
                continue
            # The last candle is treated as still forming; the rest as closed.
            closed_klines = klines[:-1]
            current_kline = klines[-1]
            if len(closed_klines) < self.cfg.BB_PERIOD:
                time.sleep(self.cfg.POLL_INTERVAL)
                continue
            # Compute the Bollinger bands
            bb_source = klines if self.cfg.BB_USE_FORMING_CANDLE else closed_klines
            closes = [k['close'] for k in bb_source]
            bb = self.calc_bollinger(closes)
            if bb is None:
                time.sleep(self.cfg.POLL_INTERVAL)
                continue
            bb_mid, bb_upper, bb_lower = bb
            # Latest closed candle and the bands built from the candles before it.
            signal_kline = closed_klines[-1]
            signal_bb = self.calc_shifted_bollinger_for_index(
                closed_klines,
                len(closed_klines) - 1,
            )
            current_price = float(current_kline['close'])
            cur_high = current_kline['high']
            cur_low = current_kline['low']
            kline_id = current_kline['id']
            # Band touches are judged on the forming candle's high/low.
            touched_upper = cur_high >= bb_upper
            touched_lower = cur_low <= bb_lower
            touched_middle = cur_low <= bb_mid <= cur_high
            delay_status = ""
            if self.delay_reverse_price is not None:
                delay_status = f" | 🔄延迟反转中: {self.delay_reverse_type} @ {self.delay_reverse_price:.2f}"
            signal_status = ""
            if signal_bb is not None:
                signal_mid, signal_upper, signal_lower = signal_bb
                signal_status = (
                    f" | 收盘K H/L={signal_kline['high']:.2f}/{signal_kline['low']:.2f}"
                    f" BB={signal_lower:.2f}/{signal_mid:.2f}/{signal_upper:.2f}"
                )
            logger.info(
                f"价格={current_price:.2f} | "
                f"BB: {bb_lower:.2f}/{bb_mid:.2f}/{bb_upper:.2f} | "
                f"触上={touched_upper} 触下={touched_lower} 触中={touched_middle} | "
                f"仓位={self.position}{delay_status}{signal_status}"
            )
            # Refresh the position before making any decision.
            if not self.get_position_status():
                time.sleep(self.cfg.POLL_INTERVAL)
                continue
            # ===== Closed-candle logic: stop-loss / half close at the middle band / reverse at the entry price =====
            # Runs once per closed candle; last_closed_kline_id marks the handled one.
            if signal_bb is not None and signal_kline['id'] != self.last_closed_kline_id:
                signal_mid, _, _ = signal_bb
                signal_high = signal_kline['high']
                signal_low = signal_kline['low']
                # NOTE(review): dividing by 1000 assumes kline ids are
                # millisecond timestamps — confirm against the kline API.
                signal_ts = datetime.fromtimestamp(signal_kline['id'] / 1000).strftime('%Y-%m-%d %H:%M')
                # Stays True unless an attempted action fails; a failure leaves
                # the candle unmarked so it is retried on the next poll.
                closed_processed = True
                if self.check_stop_loss(signal_high, signal_low):
                    logger.warning(f"🛑 收盘K触发止损: {signal_ts}")
                    if self.close_all_and_confirm():
                        self.last_closed_kline_id = signal_kline['id']
                        self.last_kline_id = kline_id
                        self.update_trade_time()
                        self.save_state()
                        time.sleep(self.cfg.POLL_INTERVAL)
                        continue
                    closed_processed = False
                elif self.position != 0 and self.delay_reverse_price is None:
                    had_mid_closed_half = self.mid_closed_half
                    if self.position > 0:
                        if had_mid_closed_half and signal_low <= self.entry_price:
                            logger.info(f"💰 收盘K回到开仓价 {self.entry_price:.2f},全平并反手开空")
                            if self.close_all_and_confirm() and self.open_with_balance_and_confirm('short'):
                                self.position_count = 1
                                self.mid_closed_half = False
                                self.last_closed_kline_id = signal_kline['id']
                                self.last_kline_id = kline_id
                                self.update_trade_time()
                                self.save_state()
                                logger.success("✓ 收盘K反手完成")
                                time.sleep(self.cfg.POLL_INTERVAL)
                                continue
                            closed_processed = False
                        elif not had_mid_closed_half and signal_low <= signal_mid <= signal_high:
                            logger.info(f"📊 收盘K触中轨 {signal_mid:.2f}平50%")
                            previous_amount = self.current_amount
                            if self.close_partial_and_confirm(0.5, previous_amount):
                                self.mid_closed_half = True
                                self.last_closed_kline_id = signal_kline['id']
                                self.last_kline_id = kline_id
                                self.update_trade_time()
                                self.save_state()
                                logger.success("✓ 收盘K平半完成")
                                time.sleep(self.cfg.POLL_INTERVAL)
                                continue
                            closed_processed = False
                    else:
                        if had_mid_closed_half and signal_high >= self.entry_price:
                            logger.info(f"💰 收盘K回到开仓价 {self.entry_price:.2f},全平并反手开多")
                            if self.close_all_and_confirm() and self.open_with_balance_and_confirm('long'):
                                self.position_count = 1
                                self.mid_closed_half = False
                                self.last_closed_kline_id = signal_kline['id']
                                self.last_kline_id = kline_id
                                self.update_trade_time()
                                self.save_state()
                                logger.success("✓ 收盘K反手完成")
                                time.sleep(self.cfg.POLL_INTERVAL)
                                continue
                            closed_processed = False
                        elif not had_mid_closed_half and signal_low <= signal_mid <= signal_high:
                            logger.info(f"📊 收盘K触中轨 {signal_mid:.2f}平50%")
                            previous_amount = self.current_amount
                            if self.close_partial_and_confirm(0.5, previous_amount):
                                self.mid_closed_half = True
                                self.last_closed_kline_id = signal_kline['id']
                                self.last_kline_id = kline_id
                                self.update_trade_time()
                                self.save_state()
                                logger.success("✓ 收盘K平半完成")
                                time.sleep(self.cfg.POLL_INTERVAL)
                                continue
                            closed_processed = False
                if not closed_processed:
                    time.sleep(self.cfg.POLL_INTERVAL)
                    continue
                # No action was needed for this closed candle: mark it handled.
                self.last_closed_kline_id = signal_kline['id']
                self.save_state()
            # Avoid acting twice on the same forming candle
            if kline_id == self.last_kline_id:
                time.sleep(self.cfg.POLL_INTERVAL)
                continue
            # ===== Delayed-reversal confirmation (highest priority) =====
            if self.delay_reverse_price is not None:
                prev_kline = signal_kline if signal_bb is not None else None
                prev_upper = signal_bb[1] if signal_bb is not None else None
                prev_lower = signal_bb[2] if signal_bb is not None else None
                reversal = self.check_delay_reversal(
                    current_kline, prev_kline, prev_upper, prev_lower
                )
                if reversal:
                    new_direction, reversal_price, reason = reversal
                    logger.warning(f"🔄 延迟反转确认: {reason} @ {reversal_price:.2f}")
                    logger.info("执行全平仓...")
                    if self.close_all_and_confirm():
                        logger.info(f"执行反向开{'' if new_direction == 'long' else ''}...")
                        if self.open_with_balance_and_confirm(new_direction):
                            self.position_count = 1
                            self.mid_closed_half = False
                            self.clear_delay_reversal()
                            self.last_kline_id = kline_id
                            self.update_trade_time()
                            self.save_state()
                            logger.success("✓ 延迟反转完成!")
                    else:
                        logger.error("延迟反转取消:全平仓未确认")
                    time.sleep(self.cfg.POLL_INTERVAL)
                    continue
            # ===== Open a position (when flat) =====
            if self.position == 0:
                if self.delay_reverse_price is not None:
                    self.clear_delay_reversal()
                if touched_upper:
                    logger.info("🔴 空仓触上轨,开空")
                    if not self.can_trade():
                        time.sleep(self.cfg.POLL_INTERVAL)
                        continue
                    if self.open_with_balance_and_confirm('short'):
                        self.position_count = 1
                        self.mid_closed_half = False
                        self.last_kline_id = kline_id
                        self.update_trade_time()
                        self.save_state()
                        logger.success(f"✓ 开空成功")
                elif touched_lower:
                    logger.info("🟢 空仓触下轨,开多")
                    if not self.can_trade():
                        time.sleep(self.cfg.POLL_INTERVAL)
                        continue
                    if self.open_with_balance_and_confirm('long'):
                        self.position_count = 1
                        self.mid_closed_half = False
                        self.last_kline_id = kline_id
                        self.update_trade_time()
                        self.save_state()
                        logger.success(f"✓ 开多成功")
            # ===== Trigger a delayed reversal (position held, none pending) =====
            elif self.position > 0 and touched_upper and self.delay_reverse_price is None:
                logger.warning("⚠️ 多仓触上轨,标记延迟反转")
                self.mark_delay_reversal('long_to_short', bb_upper, kline_id)
                self.last_kline_id = kline_id
                self.save_state()
            elif self.position < 0 and touched_lower and self.delay_reverse_price is None:
                logger.warning("⚠️ 空仓触下轨,标记延迟反转")
                self.mark_delay_reversal('short_to_long', bb_lower, kline_id)
                self.last_kline_id = kline_id
                self.save_state()
            # ===== Add to the position (only after the first entry, none pending) =====
            elif self.position_count == 1 and self.delay_reverse_price is None:
                previous_amount = self.current_amount
                if self.position > 0 and touched_lower:
                    logger.info(" 多仓触下轨,加仓")
                    if not self.can_trade():
                        time.sleep(self.cfg.POLL_INTERVAL)
                        continue
                    if self.open_with_balance_and_confirm('long', previous_amount=previous_amount):
                        self.position_count = 2
                        self.last_kline_id = kline_id
                        self.update_trade_time()
                        self.save_state()
                        logger.success(f"✓ 加仓成功")
                elif self.position < 0 and touched_upper:
                    logger.info(" 空仓触上轨,加仓")
                    if not self.can_trade():
                        time.sleep(self.cfg.POLL_INTERVAL)
                        continue
                    if self.open_with_balance_and_confirm('short', previous_amount=previous_amount):
                        self.position_count = 2
                        self.last_kline_id = kline_id
                        self.update_trade_time()
                        self.save_state()
                        logger.success(f"✓ 加仓成功")
            time.sleep(self.cfg.POLL_INTERVAL)
        except KeyboardInterrupt:
            logger.info("用户中断")
            break
        except Exception as e:
            # Log, flag the browser for reopening, then resume after a pause.
            logger.error(f"主循环异常: {e}")
            page_start = True
            time.sleep(10)
if __name__ == "__main__":
    # Script entry point: build a trader for this browser profile ID and
    # start the strategy loop.
    trader = BBDelayReversalTrader(bit_id="f2320f57e24c45529a009e1541e25961")
    trader.run()