Files
codex_jxs_code/job_bitmart.py
2026-03-09 11:20:28 +08:00

870 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
布林带均值回归策略 — 实盘交易 (D方案: 递增加仓)
BB(10, 2.5) | 5分钟K线 | ETH | 50x杠杆 逐仓 | 递增加仓+1%/次 max=3
逻辑:
- 价格触及上布林带 → 平多(如有) + 开空; 已持空则加仓
- 价格触及下布林带 → 平空(如有) + 开多; 已持多则加仓
- 始终持仓(多空翻转 + 同向加仓)
- 加仓比例: 开仓1%, 第1次加仓2%, 第2次3%, 第3次4%, 最多加仓3次
使用浏览器自动化进行开平仓有手续费返佣API仅用于查询数据
"""
import time
import numpy as np
from datetime import datetime, timezone
from pathlib import Path
from loguru import logger
from bitmart.api_contract import APIContract
from bit_tools import openBrowser
from DrissionPage import ChromiumPage, ChromiumOptions
# ---------------------------------------------------------------------------
# 配置
# ---------------------------------------------------------------------------
class BBDelayReversalConfig:
# API 凭证
API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
MEMO = "合约交易"
# 合约
CONTRACT_SYMBOL = "ETHUSDT"
TRADE_URL = "https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT"
# 浏览器
BIT_ID = "f2320f57e24c45529a009e1541e25961"
# 布林带参数
BB_PERIOD = 10 # 10根5分钟K线 = 50分钟回看
BB_STD = 2.5 # 标准差倍数
# 仓位管理
LEVERAGE = 50 # 杠杆倍数
OPEN_TYPE = "isolated" # 逐仓模式
MARGIN_PCT = 0.01 # 首次开仓用权益的1%作为保证金
# 递增加仓 (D方案)
PYRAMID_STEP = 0.01 # 每次加仓增加1%权益比例 (1%→2%→3%→4%)
PYRAMID_MAX = 1 # 最多加仓1次 (首次开仓不算)
# 风控
STOP_LOSS_PCT = 0.50 # 亏损百分之50就平仓
MAX_DAILY_LOSS = 50.0 # 日最大亏损(U),达到后停止交易
COOLDOWN_SECONDS = 30 # 两次交易之间最小间隔(秒)
# 运行
POLL_INTERVAL = 5 # 主循环轮询间隔(秒)
KLINE_STEP = 5 # K线周期(分钟)
KLINE_HOURS = 2 # 获取最近多少小时K线(需覆盖BB_PERIOD)
# ---------------------------------------------------------------------------
# 布林带计算
# ---------------------------------------------------------------------------
def calc_bollinger(closes: list, period: int, n_std: float):
"""计算布林带,返回 (mid, upper, lower) 或 None数据不足时"""
if len(closes) < period:
return None
arr = np.array(closes[-period:], dtype=float)
mid = arr.mean()
std = arr.std(ddof=0)
upper = mid + n_std * std
lower = mid - n_std * std
return mid, upper, lower
# ---------------------------------------------------------------------------
# 交易主类
# ---------------------------------------------------------------------------
class BBDelayReversalTrader:
def __init__(self, cfg: BBDelayReversalConfig = None, bit_id: str = None):
self.cfg = cfg or BBDelayReversalConfig()
if bit_id:
self.cfg.BIT_ID = bit_id
self.api = APIContract(
self.cfg.API_KEY, self.cfg.SECRET_KEY, self.cfg.MEMO,
timeout=(5, 15)
)
# 浏览器
self.page: ChromiumPage | None = None
self.page_start = True # 需要(重新)打开浏览器
self.last_page_open_time = 0.0 # 上次打开浏览器的时间
self.PAGE_REFRESH_INTERVAL = 1800 # 每30分钟关闭重开浏览器
# 持仓状态: -1=空, 0=无, 1=多
self.position = 0
self.open_avg_price = None
self.current_amount = None
self.unrealized_value = 0.0
# 半仓折返逻辑状态
self.has_half_closed = False # 是否已经触发过“平一半”逻辑
self.highest_since_open = None # 持有多仓以来的最高价
self.lowest_since_open = None # 持有空仓以来的最低价
# 延迟平仓(碰轨不平,回落/反弹再平)状态
self.touched_band_price = None # 最近一次触碰轨道的触发价格
self.touched_prev_open = None # 发生触碰时的前一根K线开盘价
self.current_kline_id = None # 记录当前在处理的K线
# 加仓状态
self.pyramid_count = 0 # 当前已加仓次数 (0=仅首次开仓)
# 风控
self.daily_pnl = 0.0
self.daily_stopped = False
self.current_date = None
self.last_trade_time = 0.0
# 日志
self.log_dir = Path(__file__).resolve().parent
logger.add(
self.log_dir / "bb_delay_reversal_trade_{time:YYYY-MM-DD}.log",
rotation="1 day", retention="30 days",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
# ------------------------------------------------------------------
# API 封装
# ------------------------------------------------------------------
def get_klines(self) -> list | None:
"""获取最近N小时的5分钟K线返回 [{id, open, high, low, close}, ...]"""
try:
end_time = int(time.time())
start_time = end_time - 3600 * self.cfg.KLINE_HOURS
resp = self.api.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=self.cfg.KLINE_STEP,
start_time=start_time,
end_time=end_time
)[0]
if resp.get("code") != 1000:
logger.error(f"获取K线失败: {resp}")
return None
data = resp["data"]
klines = []
for k in data:
klines.append({
"id": int(k["timestamp"]),
"open": float(k["open_price"]),
"high": float(k["high_price"]),
"low": float(k["low_price"]),
"close": float(k["close_price"]),
})
klines.sort(key=lambda x: x["id"])
return klines
except Exception as e:
logger.error(f"获取K线异常: {e}")
return None
def get_current_price(self) -> float | None:
"""获取当前最新价格最近1分钟K线收盘价"""
try:
end_time = int(time.time())
resp = self.api.get_kline(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
step=1,
start_time=end_time - 300,
end_time=end_time
)[0]
if resp.get("code") == 1000 and resp["data"]:
return float(resp["data"][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_balance(self) -> float | None:
"""获取合约账户可用余额"""
try:
resp = self.api.get_assets_detail()[0]
if resp.get("code") == 1000:
data = resp["data"]
if isinstance(data, dict):
return float(data.get("available_balance", 0))
elif isinstance(data, list):
for asset in data:
if asset.get("currency") == "USDT":
return float(asset.get("available_balance", 0))
return None
except Exception as e:
logger.error(f"查询余额异常: {e}")
return None
def get_position_status(self) -> bool:
"""查询当前持仓,更新 self.position / open_avg_price / current_amount"""
try:
resp = self.api.get_position(contract_symbol=self.cfg.CONTRACT_SYMBOL)[0]
if resp.get("code") != 1000:
logger.error(f"查询持仓失败: {resp}")
return False
positions = resp["data"]
if not positions:
self.position = 0
self.open_avg_price = None
self.current_amount = None
self.unrealized_value = 0.0
self.has_half_closed = False
self.highest_since_open = None
self.lowest_since_open = None
self.touched_band_price = None
self.touched_prev_open = None
return True
pos = positions[0]
self.position = 1 if pos["position_type"] == 1 else -1
self.open_avg_price = float(pos["open_avg_price"])
self.current_amount = float(pos["current_amount"])
self.unrealized_value = float(pos.get("unrealized_value", 0))
logger.debug(f"持仓: dir={self.position} price={self.open_avg_price} "
f"amt={self.current_amount} upnl={self.unrealized_value:.2f}")
return True
except Exception as e:
logger.error(f"查询持仓异常: {e}")
return False
def set_leverage(self) -> bool:
"""设置杠杆和逐仓模式"""
try:
resp = self.api.post_submit_leverage(
contract_symbol=self.cfg.CONTRACT_SYMBOL,
leverage=str(self.cfg.LEVERAGE),
open_type=self.cfg.OPEN_TYPE
)[0]
if resp.get("code") == 1000:
logger.success(f"杠杆设置成功: {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
return True
else:
logger.error(f"杠杆设置失败: {resp}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ------------------------------------------------------------------
# 浏览器自动化
# ------------------------------------------------------------------
def open_browser(self) -> bool:
"""打开浏览器并进入交易页面"""
try:
bit_port = openBrowser(id=self.cfg.BIT_ID)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.last_page_open_time = time.time()
return True
except Exception as e:
logger.error(f"打开浏览器失败: {e}")
return False
def click_safe(self, xpath, sleep=0.5) -> bool:
"""安全点击元素"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.click(by_js=True)
time.sleep(sleep)
return True
except Exception as e:
logger.warning(f"点击失败 [{xpath}]: {e}")
return False
def browser_close_position(self) -> bool:
"""浏览器点击市价全平"""
logger.info("浏览器操作: 市价平仓")
self.click_safe('x://button[normalize-space(text()) ="平仓"]')
time.sleep(0.5)
self.click_safe('x://*[@id="futureTradeForm"]/div[5]/div[3]/div[3]/span[5]') # 100%
time.sleep(0.5)
return self.click_safe('x://span[normalize-space(text()) ="市价"]')
def browser_half_close_long(self) -> bool:
"""平一半多仓"""
logger.info("浏览器操作: 平一半多仓")
self.click_safe('x://button[normalize-space(text()) ="平仓"]')
time.sleep(0.5)
self.click_safe('x://*[@id="futureTradeForm"]/div[5]/div[3]/div[3]/span[3]') # 50%
time.sleep(0.5)
return self.click_safe('x://span[normalize-space(text()) ="卖出/平多"]')
def browser_half_close_short(self) -> bool:
"""平一半空仓"""
logger.info("浏览器操作: 平一半空仓")
self.click_safe('x://button[normalize-space(text()) ="平仓"]')
time.sleep(0.5)
self.click_safe('x://*[@id="futureTradeForm"]/div[5]/div[3]/div[3]/span[3]') # 50%
time.sleep(0.5)
return self.click_safe('x://span[normalize-space(text()) ="买入/平空"]')
def reset_position_state(self):
"""重置所有关于持仓的变量"""
self.pyramid_count = 0
self.has_half_closed = False
self.highest_since_open = None
self.lowest_since_open = None
self.touched_band_price = None
self.touched_prev_open = None
# ------------------------------------------------------------------
# 仓位操作
# ------------------------------------------------------------------
def calc_order_usdt(self, is_add: bool = False) -> float:
"""
计算开仓/加仓金额(U)
首次开仓: 余额 × MARGIN_PCT (1%)
加仓: 余额 × (MARGIN_PCT + PYRAMID_STEP × (pyramid_count+1))
例: 开仓1%, 第1次加仓2%, 第2次加仓3%, 第3次加仓4%
"""
balance = self.get_balance()
if balance is None or balance <= 0:
logger.warning(f"余额不足或查询失败: {balance}")
return 0
if is_add:
pct = self.cfg.MARGIN_PCT + self.cfg.PYRAMID_STEP * (self.pyramid_count + 1)
else:
pct = self.cfg.MARGIN_PCT
order_usdt = round(balance * pct, 2)
logger.info(f"仓位计算: 余额={balance:.2f} × {pct:.0%} = {order_usdt} U"
f" ({'加仓#' + str(self.pyramid_count+1) if is_add else '首次开仓'})")
return order_usdt
def verify_position(self, expected: int) -> bool:
"""验证持仓方向"""
if self.get_position_status():
if self.position == expected:
return True
logger.warning(f"持仓方向不符: 期望{expected}, 实际{self.position}")
return False
# ------------------------------------------------------------------
# 风控
# ------------------------------------------------------------------
def check_daily_reset(self):
"""每日重置(UTC+8 00:00 = UTC 16:00)"""
now = datetime.now(timezone.utc)
# 用UTC日期做简单日切
today = now.date()
if self.current_date != today:
if self.current_date is not None:
logger.info(f"日切: {self.current_date}{today}, 日PnL={self.daily_pnl:.2f}")
self.current_date = today
self.daily_pnl = 0.0
self.daily_stopped = False
def can_trade(self) -> bool:
"""检查是否可交易"""
if self.daily_stopped:
return False
now = time.time()
if now - self.last_trade_time < self.cfg.COOLDOWN_SECONDS:
remain = self.cfg.COOLDOWN_SECONDS - (now - self.last_trade_time)
logger.debug(f"交易冷却中,剩余 {remain:.0f}s")
return False
return True
# ------------------------------------------------------------------
# 日志
# ------------------------------------------------------------------
def write_trade_log(self, action: str, price: float, bb_upper: float,
bb_mid: float, bb_lower: float, reason: str):
"""写入交易日志文件"""
try:
date_str = datetime.now().strftime("%Y%m%d")
log_file = self.log_dir / f"bb_delay_reversal_trade_log_{date_str}.txt"
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
block = (
f"\n{'='*60}\n"
f"时间: {time_str}\n"
f"操作: {action}\n"
f"价格: {price:.2f}\n"
f"BB上轨: {bb_upper:.2f} | 中轨: {bb_mid:.2f} | 下轨: {bb_lower:.2f}\n"
f"原因: {reason}\n"
f"{'='*60}\n"
)
with open(log_file, "a", encoding="utf-8") as f:
f.write(block)
except Exception as e:
logger.warning(f"写入日志失败: {e}")
def login(self):
self.page.ele('x://input[@placeholder="邮箱"]').input("ddrwode@gmail.com")
self.page.ele('x://input[@placeholder="密码"]').input("040828cjj")
self.page.ele('x://*[@id="__layout"]/div/div[2]/div/div[2]/div/div/div[2]/div[1]/div[2]/div/div[1]/div[2]/form/div[3]/div/button').click()
# ------------------------------------------------------------------
# 主循环(浏览器流程与四分之一代码一致)
# ------------------------------------------------------------------
def run(self):
"""策略主循环"""
logger.info("=" * 60)
logger.info(f" BB策略启动(D方案): BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})")
logger.info(f" 合约: {self.cfg.CONTRACT_SYMBOL} | {self.cfg.LEVERAGE}x {self.cfg.OPEN_TYPE}")
logger.info(f" 首次开仓: 权益×{self.cfg.MARGIN_PCT:.0%} | 递增加仓: +{self.cfg.PYRAMID_STEP:.0%}/次 | 最多{self.cfg.PYRAMID_MAX}")
logger.info("=" * 60)
# 设置杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,退出")
return
# 初始持仓同步
if not self.get_position_status():
logger.error("初始持仓查询失败,退出")
return
logger.info(f"初始持仓状态: {self.position}")
last_kline_id = None # 避免同一根K线重复触发
page_start = True # 需要打开浏览器
while True:
# ===== 浏览器管理 =====
# page_start时: 打开浏览器 → 导航 → 点市价 → 输入张数
if page_start:
for i in range(5):
if self.open_browser():
logger.info("浏览器打开成功")
break
else:
logger.error("打开浏览器失败!")
return
# self.login()
self.page.get(self.cfg.TRADE_URL)
time.sleep(2)
# 点击市价模式
self.click_safe('x://button[normalize-space(text()) ="市价"]')
time.sleep(0.5)
# 计算并预输入开仓金额(U)
current_price = self.get_current_price()
if current_price:
order_usdt = self.calc_order_usdt()
if order_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True)
logger.info(f"预输入开仓金额: {order_usdt} U")
page_start = False
try:
# 每30分钟关闭浏览器重新打开
if time.time() - self.last_page_open_time >= self.PAGE_REFRESH_INTERVAL:
logger.info("浏览器已打开超过30分钟关闭刷新")
try:
self.page.close()
except Exception:
pass
self.page = None
page_start = True
time.sleep(3)
continue
self.check_daily_reset()
if self.daily_stopped:
logger.info(f"日亏损已达限制({self.daily_pnl:.2f}),等待日切")
time.sleep(60)
continue
# 1. 获取K线
klines = self.get_klines()
if not klines or len(klines) < self.cfg.BB_PERIOD:
logger.warning(f"K线数据不足({len(klines) if klines else 0}根),等待...")
time.sleep(self.cfg.POLL_INTERVAL)
continue
closed_klines = klines[:-1]
current_kline = klines[-1]
if len(closed_klines) < self.cfg.BB_PERIOD:
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 2. 计算布林带
closes = [k["close"] for k in closed_klines]
bb = calc_bollinger(closes, self.cfg.BB_PERIOD, self.cfg.BB_STD)
if bb is None:
time.sleep(self.cfg.POLL_INTERVAL)
continue
bb_mid, bb_upper, bb_lower = bb
# 3. 获取当前价格
current_price = self.get_current_price()
if current_price is None:
time.sleep(self.cfg.POLL_INTERVAL)
continue
cur_high = current_kline["high"]
cur_low = current_kline["low"]
# 容错: K线high/low + 当前实时价格,任一触及即算触碰
touched_upper = cur_high >= bb_upper or current_price >= bb_upper
touched_lower = cur_low <= bb_lower or current_price <= bb_lower
logger.info(
f"价格={current_price:.2f} | "
f"BB: {bb_lower:.2f} / {bb_mid:.2f} / {bb_upper:.2f} | "
f"H={cur_high:.2f} L={cur_low:.2f} | "
f"触上={touched_upper} 触下={touched_lower} | "
f"仓位={self.position}"
)
# 4. 同步持仓状态
if not self.get_position_status():
time.sleep(self.cfg.POLL_INTERVAL)
continue
# 4.5 止损检查
if self.position != 0 and self.current_amount and self.open_avg_price:
position_margin = (self.open_avg_price * self.current_amount) / self.cfg.LEVERAGE
if position_margin > 0:
roe = self.unrealized_value / position_margin
if roe <= -self.cfg.STOP_LOSS_PCT:
logger.warning(f"触发止损!当前亏损比例: {roe:.2%} (<= -{self.cfg.STOP_LOSS_PCT:.0%}),开始平仓")
action = "止损平仓"
reason = f"亏损比例 {roe:.2%} 达到 {self.cfg.STOP_LOSS_PCT:.0%} 止损线"
self.browser_close_position()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.position == 0:
break
time.sleep(1)
if self.position == 0:
self.reset_position_state()
self.write_trade_log(action, current_price, bb_upper, bb_mid, bb_lower, reason)
logger.success("止损平仓成功")
page_start = True
try:
self.page.close()
except Exception:
pass
self.page = None
time.sleep(5)
continue
else:
logger.error(f"止损平仓失败,当前仍有持仓({self.position})")
# 5. 极值更新与半仓/延迟全平反手逻辑
if self.position == 1 and self.open_avg_price:
if self.highest_since_open is None or current_price > self.highest_since_open:
self.highest_since_open = current_price
# 延迟平仓逻辑: 之前触碰过上轨
if self.touched_band_price is not None:
# 检查是否回落到 触发的上轨价格 或者 上一根K线的开盘价
trigger_target = max(self.touched_band_price, self.touched_prev_open) if self.touched_prev_open else self.touched_band_price
if current_price <= trigger_target or cur_low <= trigger_target:
if self.can_trade():
logger.info(f"多单延迟回落平仓: 当前价跌破触发点 {trigger_target:.2f} (上轨价: {self.touched_band_price}, 上根开: {self.touched_prev_open})")
self.browser_close_position()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.position == 0: break
time.sleep(1)
self.reset_position_state()
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
order_usdt = self.calc_order_usdt()
if order_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
time.sleep(3)
self.get_position_status()
self.write_trade_log("延迟全平反手(多转空)", current_price, bb_upper, bb_mid, bb_lower, f"多单碰上轨后回落至 {trigger_target:.2f}")
page_start = True
try: self.page.close()
except: pass
self.page = None
time.sleep(5)
continue
# 规则 A1: 涨过中轨
if self.highest_since_open >= bb_mid:
# 从最高点回落触碰中轨,且未平过一半
if not self.has_half_closed and (cur_low <= bb_mid or current_price <= bb_mid):
if self.can_trade():
logger.info("做多回落触碰中轨,执行平一半多仓")
self.browser_half_close_long()
time.sleep(2)
self.get_position_status()
self.has_half_closed = True
self.write_trade_log("平半仓(多)", current_price, bb_upper, bb_mid, bb_lower, "多单涨破中轨后回落触碰中轨")
# 切换回开仓面板以防后续逻辑出错
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
# 规则 A2: 平过一半后,跌回开仓价 -> 全平反手开空
if self.has_half_closed and (cur_low <= self.open_avg_price or current_price <= self.open_avg_price):
if self.can_trade():
logger.info("平半仓后跌回多单开仓价,全平反手开空")
self.browser_close_position()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.position == 0:
break
time.sleep(1)
self.reset_position_state()
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
# -> 新增: 输入开仓金额
order_usdt = self.calc_order_usdt()
if order_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
time.sleep(3)
self.get_position_status()
self.write_trade_log("全平反手(多转空)", current_price, bb_upper, bb_mid, bb_lower, "多单平半后跌回开仓价")
page_start = True
try: self.page.close()
except: pass
self.page = None
time.sleep(5)
continue
elif self.position == -1 and self.open_avg_price:
if self.lowest_since_open is None or current_price < self.lowest_since_open:
self.lowest_since_open = current_price
# 延迟平仓逻辑: 之前触碰过下轨
if self.touched_band_price is not None:
# 检查是否反弹到 触发的下轨价格 或者 上一根K线的开盘价
trigger_target = min(self.touched_band_price, self.touched_prev_open) if self.touched_prev_open else self.touched_band_price
if current_price >= trigger_target or cur_high >= trigger_target:
if self.can_trade():
logger.info(f"空单延迟反弹平仓: 当前价突破触发点 {trigger_target:.2f} (下轨价: {self.touched_band_price}, 上根开: {self.touched_prev_open})")
self.browser_close_position()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.position == 0: break
time.sleep(1)
self.reset_position_state()
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
order_usdt = self.calc_order_usdt()
if order_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
time.sleep(3)
self.get_position_status()
self.write_trade_log("延迟全平反手(空转多)", current_price, bb_upper, bb_mid, bb_lower, f"空单碰下轨后反弹至 {trigger_target:.2f}")
page_start = True
try: self.page.close()
except: pass
self.page = None
time.sleep(5)
continue
# 规则 B1: 跌破过中轨
if self.lowest_since_open <= bb_mid:
# 从最低点回升触碰中轨,且未平过一半
if not self.has_half_closed and (cur_high >= bb_mid or current_price >= bb_mid):
if self.can_trade():
logger.info("做空反弹触碰中轨,执行平一半空仓")
self.browser_half_close_short()
time.sleep(2)
self.get_position_status()
self.has_half_closed = True
self.write_trade_log("平半仓(空)", current_price, bb_upper, bb_mid, bb_lower, "空单跌破中轨后反弹触碰中轨")
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
# 规则 B2: 平过一半后,涨回开仓价 -> 全平反手开多
if self.has_half_closed and (cur_high >= self.open_avg_price or current_price >= self.open_avg_price):
if self.can_trade():
logger.info("平半仓后涨回空单开仓价,全平反手开多")
self.browser_close_position()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.position == 0:
break
time.sleep(1)
self.reset_position_state()
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
# -> 新增: 输入开仓金额
order_usdt = self.calc_order_usdt()
if order_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
time.sleep(3)
self.get_position_status()
self.write_trade_log("全平反手(空转多)", current_price, bb_upper, bb_mid, bb_lower, "空单平半后涨回开仓价")
page_start = True
try: self.page.close()
except: pass
self.page = None
time.sleep(5)
continue
# 6. 基本信号判断 (上轨空/下轨多)
kline_id = current_kline["id"]
if kline_id == last_kline_id:
time.sleep(self.cfg.POLL_INTERVAL)
continue
if touched_upper and touched_lower:
logger.warning("同时触及上下轨,跳过")
time.sleep(self.cfg.POLL_INTERVAL)
continue
action = None
reason = ""
success = False
# ===== 触及上轨 → 等待延迟平/开空/加仓空 =====
if touched_upper:
if not self.can_trade():
time.sleep(self.cfg.POLL_INTERVAL)
continue
reason = (f"价格最高{cur_high:.2f}触及上轨{bb_upper:.2f}"
f"BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})")
if self.position == 1:
# 延迟平仓逻辑: 记录触碰上轨的信息,等待回落后再平仓,不断创新高则更新上轨目标价
prev_open = closed_klines[-1]["open"] if len(closed_klines) > 0 else cur_high
if self.touched_band_price is None or bb_upper > self.touched_band_price:
self.touched_band_price = bb_upper
self.touched_prev_open = prev_open
logger.info(f"多单已触碰上轨,记录延迟目标价: 轨={bb_upper:.2f}, 上根开={prev_open:.2f}。等待回落再平仓...")
# 特殊保护: 当前一瞬间即使碰到立刻跌下来也要依靠下一轮的主循环去判断防止剧烈波动同K线反复触发
self.current_kline_id = current_kline["id"]
elif self.position == 0:
action = "开空"
self.reset_position_state()
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
order_usdt = self.calc_order_usdt()
if order_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
time.sleep(3)
if self.verify_position(-1):
success = True
elif self.position == -1 and self.pyramid_count < self.cfg.PYRAMID_MAX:
# 已持空仓 + 再次触上轨 → 加仓做空
action = f"加仓空#{self.pyramid_count+1}"
reason += f" (加仓#{self.pyramid_count+1}/{self.cfg.PYRAMID_MAX})"
# 重新计算加仓金额并输入
add_usdt = self.calc_order_usdt(is_add=True)
if add_usdt > 0:
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(vals=add_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
time.sleep(3)
if self.verify_position(-1):
self.pyramid_count += 1
success = True
else:
logger.info(f"已持空仓,加仓已达上限({self.pyramid_count}/{self.cfg.PYRAMID_MAX})")
# ===== 触及下轨 → 等待延迟平/开多/加仓多 =====
elif touched_lower:
if not self.can_trade():
time.sleep(self.cfg.POLL_INTERVAL)
continue
reason = (f"价格最低{cur_low:.2f}触及下轨{bb_lower:.2f}"
f"BB({self.cfg.BB_PERIOD},{self.cfg.BB_STD})")
if self.position == -1:
# 延迟平仓逻辑: 记录触碰下轨的信息,等待反弹后再平仓,不断创新低则更新下轨目标价
prev_open = closed_klines[-1]["open"] if len(closed_klines) > 0 else cur_low
if self.touched_band_price is None or bb_lower < self.touched_band_price:
self.touched_band_price = bb_lower
self.touched_prev_open = prev_open
logger.info(f"空单已触碰下轨,记录延迟目标价: 轨={bb_lower:.2f}, 上根开={prev_open:.2f}。等待反弹再平仓...")
self.current_kline_id = current_kline["id"]
elif self.position == 0:
action = "开多"
self.reset_position_state()
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
order_usdt = self.calc_order_usdt()
if order_usdt > 0:
self.page.ele('x://*[@id="size_0"]').input(vals=order_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
time.sleep(3)
if self.verify_position(1):
success = True
elif self.position == 1 and self.pyramid_count < self.cfg.PYRAMID_MAX:
# 已持多仓 + 再次触下轨 → 加仓做多
action = f"加仓多#{self.pyramid_count+1}"
reason += f" (加仓#{self.pyramid_count+1}/{self.cfg.PYRAMID_MAX})"
# 重新计算加仓金额并输入
add_usdt = self.calc_order_usdt(is_add=True)
if add_usdt > 0:
self.click_safe('x://button[normalize-space(text()) ="开仓"]')
time.sleep(0.5)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(vals=add_usdt, clear=True)
time.sleep(0.5)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
time.sleep(3)
if self.verify_position(1):
self.pyramid_count += 1
success = True
else:
logger.info(f"已持多仓,加仓已达上限({self.pyramid_count}/{self.cfg.PYRAMID_MAX})")
# ===== 交易成功后处理 =====
if success and action:
last_kline_id = kline_id
self.last_trade_time = time.time()
self.write_trade_log(action, current_price,
bb_upper, bb_mid, bb_lower, reason)
logger.success(f"{action} 执行成功")
# 交易完成后关闭浏览器,下轮重新打开
page_start = True
try:
self.page.close()
except Exception:
pass
self.page = None
time.sleep(5)
time.sleep(self.cfg.POLL_INTERVAL)
except KeyboardInterrupt:
logger.info("用户中断,程序退出")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
page_start = True
time.sleep(10)
# ---------------------------------------------------------------------------
# 入口
# ---------------------------------------------------------------------------
if __name__ == "__main__":
trader = BBDelayReversalTrader()
trader.run()