Files
lm_code/bitmart/四分之一,五分钟,反手条件充足修改版.py
2026-02-06 16:45:26 +08:00

993 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import time
from datetime import datetime
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.layout import Layout
from rich.align import Align
from rich.text import Text
from rich.live import Live
# 是否使用 Rich 仪表盘(否则用 loguru 普通输出)
USE_RICH_DASHBOARD = True
console = Console()
# 颜色标签loguru 用,无 Rich 时)
_R = "\033[0m"
_B = "\033[1m"
_C = "\033[36m"
_Y = "\033[33m"
_G = "\033[32m"
_M = "\033[90m"
_W = "\033[97m"
def _tag(t: str, color: str) -> str:
return color + "[" + t + "]" + _R + " "
LOG_PRICE = _tag("价格", _C)
LOG_SIGNAL = _tag("信号", _Y)
LOG_POSITION = _tag("仓位", _G)
LOG_SYSTEM = _tag("系统", _M)
if not USE_RICH_DASHBOARD:
logger.remove()
logger.add(sys.stderr, format="\033[2m│\033[0m {message}", colorize=False)
def log_kline_header(kline_id):
"""新 K 线分块头(仅非 Rich 模式使用)"""
s = f" K 线 {kline_id} "
width = max(52, len(s) + 4)
line = "" * (width - 2)
pad_left = (width - 2 - len(s)) // 2
pad_right = width - 2 - len(s) - pad_left
logger.info(_M + "" + line + "" + _R)
logger.info(_M + "" + _R + " " * pad_left + _B + _W + s + _R + " " * pad_right + _M + "" + _R)
logger.info(_M + "" + line + "" + _R)
# ---------- Rich 仪表盘 ----------
def make_header() -> Panel:
title = Text("四分之一策略 · ETHUSDT 5m", style="bold cyan")
subtitle = Text(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), style="dim")
return Panel(Align.center(title + Text("\n") + subtitle), border_style="cyan")
def make_metrics_panel(state: dict) -> Panel:
t = Table(show_header=True, header_style="bold magenta", expand=True)
t.add_column("指标", style="cyan")
t.add_column("数值", justify="right", style="green")
t.add_row("当前价", f"{state.get('price', 0):.2f}")
pos_map = {0: "", 1: "", -1: ""}
t.add_row("持仓", pos_map.get(state.get("position", 0), "?"))
t.add_row("K线 ID", str(state.get("kline_id", "-")))
t.add_row("做多触发", f"{state.get('long_trigger', 0):.2f}")
t.add_row("做空触发", f"{state.get('short_trigger', 0):.2f}")
t.add_row("突破做多", f"{state.get('long_breakout', 0):.2f}")
t.add_row("突破做空", f"{state.get('short_breakout', 0):.2f}")
ema10 = state.get("ema10")
atr14 = state.get("atr14")
t.add_row("EMA10", f"{ema10:.2f}" if ema10 is not None else "-")
t.add_row("ATR14", f"{atr14:.2f}" if atr14 is not None else "-")
pnl = state.get("unrealized_pnl")
t.add_row("未实现盈亏", f"{pnl:.2f}$" if pnl is not None else "-")
return Panel(t, title="[bold]价格 / 指标[/bold]", border_style="magenta")
def make_logs_panel(logs: list) -> Panel:
body = "\n".join(logs[-12:]) if logs else "等待日志..."
text = Text.from_ansi(body) if body else Text("等待日志...", style="dim")
return Panel(text, title="[bold]状态 / 日志[/bold]", border_style="green")
def make_footer(msg: str = "Ctrl+C 退出") -> Panel:
return Panel(Align.center(Text(msg, style="bold yellow")), border_style="yellow")
def build_dashboard_layout(state: dict, logs: list) -> Layout:
layout = Layout()
layout.split_column(
Layout(make_header(), name="header", size=5),
Layout(name="body", ratio=1),
Layout(make_footer(), name="footer", size=3),
)
layout["body"].split_row(
Layout(make_metrics_panel(state), name="left", ratio=1),
Layout(make_logs_panel(logs), name="right", ratio=2),
)
return layout
class BitmartFuturesTransaction:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.pbar = tqdm(total=30, desc="等待K线", ncols=80) # 可选:用于长时间等待时展示进度
self.last_kline_time = None # 上一次处理的K线时间戳用于判断是否是新K线
# 反手频率控制
self.reverse_cooldown_seconds = 1.5 * 60 # 反手冷却时间(秒)
self.reverse_min_move_pct = 0.05 # 反手最小价差过滤(百分比)
self.last_reverse_time = None # 上次反手时间
# 开仓频率控制
self.open_cooldown_seconds = 60 # 开仓冷却时间(秒),两次开仓至少间隔此时长
self.last_open_time = None # 上次开仓时间
self.last_open_kline_id = None # 上次开仓所在 K 线 id仅记录不限制单 K 线开仓次数)
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式
self.risk_percent = 0 # 未使用;若启用则可为每次开仓占可用余额的百分比
self.stop_loss_usd = -3 # 固定止损:亏损达到 3 美元平仓
self.trailing_activation_usd = 2 # 盈利达到此金额后启动移动止损
self.trailing_distance_usd = 1.5 # 从最高盈利回撤此金额则平仓
self.max_unrealized_pnl_seen = None # 持仓期间见过的最大盈利(用于移动止损)
# 当前K线从极值回落平仓
# 模式: 'fixed'=固定点数(drop_from_high_to_close)'pct_retrace'=按本K线涨幅比例动态算回撤
self.drop_from_high_mode = 'pct_retrace' # 'fixed' | 'pct_retrace'
self.drop_from_high_to_close = 2 # fixed 模式下:回落/反弹超过此价格点数则平仓0 表示关闭
# pct_retrace 模式本K线涨幅 = (最高-开盘)/开盘*100允许回撤% = 涨幅% * retrace_ratio从最高点回撤超过则平仓
self.retrace_ratio = 0.5 # 回撤系数,如 0.5 表示允许回撤涨幅的 50%(类似斐波那契 50% 回撤)
self.min_rise_pct_to_activate = 0.02 # 至少涨/跌这么多才启用动态回撤,避免噪音
self.min_drop_pct_from_high = 0.03 # 至少从最高点回撤这么多%才平仓(保底,避免过于敏感)
# EMA(10) + ATR(14) 平仓(多/空一致):多单跌破 EMA10 或从最高回撤≥ATR 平;空单涨破 EMA10 或从最低反弹≥ATR 平
self.use_ema_atr_exit = True # 是否启用 EMA/ATR 平仓规则(多单+空单)
self.atr_multiplier = 1.1 # 追踪止盈:从极值回撤/反弹 ≥ 此倍数 × ATR(14) 则平仓
self._candle_high_seen = None # 当前K线内见过的最高价多头用
self._candle_low_seen = None # 当前K线内见过的最低价空头用
self._candle_id_for_high_low = None # 记录高低对应的K线 id换线则重置
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
self.default_order_size = 25 # 开仓/反手张数,统一在此修改
# 策略相关变量
self.prev_kline = None
self.current_kline = None
self.prev_entity = None
self.current_open = None
# Rich 仪表盘:状态与日志(供 build_dashboard_layout 使用)
self._display_state = {}
self._display_logs = []
self._display_triggers = {}
def get_klines(self):
"""获取最近2根K线当前K线和上一根K线"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新的K线
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=5, # 5分钟
start_time=end_time - 3600 * 3, # 取最近3小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
# 返回最近2根K线倒数第二根上一根和最后一根当前
if len(formatted) >= 2:
return formatted[-2], formatted[-1]
return None, None
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(text="获取K线异常", error=True)
return None, None
def get_klines_series(self, count=35):
"""获取最近 count 根 5 分钟 K 线(用于 EMA/ATR按时间正序。最后一根为当前未收盘 K 线。"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=5,
start_time=end_time - 3600 * 4,
end_time=end_time
)[0]["data"]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted[-count:] if len(formatted) >= count else formatted
except Exception as e:
logger.error(f"获取K线序列异常: {e}")
return []
@staticmethod
def _ema(series, period):
"""对序列 series从旧到新计算 EMA(period),返回最后一个 EMA 值。"""
if not series or len(series) < period:
return None
alpha = 2.0 / (period + 1)
ema = sum(series[:period]) / period # 用前 period 根收盘的 SMA 做种子
for i in range(period, len(series)):
ema = alpha * series[i] + (1 - alpha) * ema
return ema
@staticmethod
def _atr(klines, period=14):
"""对 klines从旧到新每项含 high/low/close计算 ATR(period),返回最后一个 ATR 值。"""
if not klines or len(klines) < period + 1:
return None
tr_list = []
for i in range(1, len(klines)):
high, low = klines[i]['high'], klines[i]['low']
prev_close = klines[i - 1]['close']
tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
tr_list.append(tr)
if len(tr_list) < period:
return None
# ATR = EMA(TR, period)
alpha = 2.0 / (period + 1)
atr = sum(tr_list[:period]) / period
for i in range(period, len(tr_list)):
atr = alpha * tr_list[i] + (1 - alpha) * atr
return atr
def get_ema_atr_for_exit(self, kline_series):
"""
基于已收盘 K 线计算 EMA10、EMA20、ATR(14)。
kline_series 最后一根可为当前未收盘 K 线,计算时用倒数第 2~N 根作为已收盘。
返回 (ema10, ema20, atr14),任一不足则对应为 None。
"""
if not kline_series or len(kline_series) < 21:
return None, None, None
# 用除最后一根外的已收盘 K 线(若只有 21 根则用前 20 根)
closed = kline_series[:-1] if len(kline_series) >= 21 else kline_series[:20]
closes = [k['close'] for k in closed]
ema10 = self._ema(closes, 10)
ema20 = self._ema(closes, 20)
atr14 = self._atr(closed, 14)
return ema10, ema20, atr14
def get_current_price(self):
"""获取当前最新价格"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 1, # 取最近1小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.unrealized_pnl = None
return True
pos = positions[0]
self.start = 1 if pos['position_type'] == 1 else -1
self.open_avg_price = float(pos['open_avg_price'])
self.current_amount = float(pos['current_amount'])
self.position_cross = pos["position_cross"]
# 直接从API获取未实现盈亏Bitmart返回的是 unrealized_value 字段)
self.unrealized_pnl = float(pos.get('unrealized_value', 0))
logger.debug(f"持仓详情: 方向={self.start}, 开仓均价={self.open_avg_price}, "
f"持仓量={self.current_amount}, 未实现盈亏={self.unrealized_pnl:.2f}")
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def get_unrealized_pnl_usd(self):
"""
获取当前持仓未实现盈亏美元直接使用API返回值
"""
if self.start == 0 or self.unrealized_pnl is None:
return None
return self.unrealized_pnl
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
# ele.scroll.to_see(center=True)
# time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
"""平仓操作"""
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价做多或者做空1是做多-1是做空
limitPriceShortOrder 限价做多或者做空
"""
if marketPriceLongOrder == -1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
# self.click_safe('x://button[normalize-space(text()) ="市价"]')
# self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, text, error=False):
"""日志通知"""
if error:
logger.error(text)
else:
logger.info(text)
def calculate_entity(self, kline):
"""计算K线实体大小绝对值"""
return abs(kline['close'] - kline['open'])
def calculate_upper_shadow(self, kline):
"""计算上阴线(上影线)涨幅百分比"""
# 上阴线 = (最高价 - max(开盘价, 收盘价)) / max(开盘价, 收盘价)
body_top = max(kline['open'], kline['close'])
if body_top == 0:
return 0
return (kline['high'] - body_top) / body_top * 100
def calculate_lower_shadow(self, kline):
"""计算下阴线(下影线)跌幅百分比"""
# 下阴线 = (min(开盘价, 收盘价) - 最低价) / min(开盘价, 收盘价)
body_bottom = min(kline['open'], kline['close'])
if body_bottom == 0:
return 0
return (body_bottom - kline['low']) / body_bottom * 100
def get_entity_edge(self, kline):
"""获取K线实体边收盘价或开盘价取决于是阳线还是阴线"""
# 阳线(收盘>开盘):实体上边=收盘价,实体下边=开盘价
# 阴线(收盘<开盘):实体上边=开盘价,实体下边=收盘价
return {
'upper': max(kline['open'], kline['close']), # 实体上边
'lower': min(kline['open'], kline['close']) # 实体下边
}
def check_signal(self, current_price, prev_kline, current_kline):
"""
检查交易信号
返回: ('long', trigger_price) / ('short', trigger_price) / None
"""
# 计算上一根K线实体
prev_entity = self.calculate_entity(prev_kline)
# 实体过小不交易(实体 < 0.1
if prev_entity < 0.1:
logger.info(LOG_PRICE + f"上一根K线实体过小: {prev_entity:.4f},跳过信号检测")
return None
# 获取上一根K线的实体上下边
prev_entity_edge = self.get_entity_edge(prev_kline)
prev_entity_upper = prev_entity_edge['upper'] # 实体上边
prev_entity_lower = prev_entity_edge['lower'] # 实体下边
# 优化:以下两种情况以当前这根的开盘价作为计算基准
# 1) 上一根阳线 且 当前开盘价 > 上一根收盘价(跳空高开)
# 2) 上一根阴线 且 当前开盘价 < 上一根收盘价(跳空低开)
prev_is_bullish_for_calc = prev_kline['close'] > prev_kline['open']
prev_is_bearish_for_calc = prev_kline['close'] < prev_kline['open']
current_open_above_prev_close = current_kline['open'] > prev_kline['close']
current_open_below_prev_close = current_kline['open'] < prev_kline['close']
use_current_open_as_base = (prev_is_bullish_for_calc and current_open_above_prev_close) or (prev_is_bearish_for_calc and current_open_below_prev_close)
if use_current_open_as_base:
# 以当前K线开盘价为基准计算跳空时用当前开盘价参与计算
calc_lower = current_kline['open']
calc_upper = current_kline['open'] # 同一基准,上下四分之一对称
long_trigger = calc_lower + prev_entity / 4
short_trigger = calc_upper - prev_entity / 4
long_breakout = calc_upper + prev_entity / 4
short_breakout = calc_lower - prev_entity / 4
else:
# 原有计算方式
long_trigger = prev_entity_lower + prev_entity / 4 # 做多触发价 = 实体下边 + 实体/4下四分之一处
short_trigger = prev_entity_upper - prev_entity / 4 # 做空触发价 = 实体上边 - 实体/4上四分之一处
long_breakout = prev_entity_upper + prev_entity / 4 # 做多突破价 = 实体上边 + 实体/4
short_breakout = prev_entity_lower - prev_entity / 4 # 做空突破价 = 实体下边 - 实体/4
# 上一根阴线 + 当前阳线做多形态不按上一根K线上三分之一做空
prev_is_bearish = prev_kline['close'] < prev_kline['open']
current_is_bullish = current_kline['close'] > current_kline['open']
skip_short_by_upper_third = prev_is_bearish and current_is_bullish
# 上一根阳线 + 当前阴线做空形态不按上一根K线下三分之一做多
prev_is_bullish = prev_kline['close'] > prev_kline['open']
current_is_bearish = current_kline['close'] < current_kline['open']
skip_long_by_lower_third = prev_is_bullish and current_is_bearish
if use_current_open_as_base:
if prev_is_bullish_for_calc and current_open_above_prev_close:
logger.info(LOG_PRICE + f"上一根阳线且当前开盘价({current_kline['open']:.2f})>上一根收盘价({prev_kline['close']:.2f}),以当前开盘价为基准计算")
else:
logger.info(LOG_PRICE + f"上一根阴线且当前开盘价({current_kline['open']:.2f})<上一根收盘价({prev_kline['close']:.2f}),以当前开盘价为基准计算")
logger.info(LOG_PRICE + f"当前价: {current_price:.2f} | 上一根实体: {prev_entity:.4f} | 实体上边: {prev_entity_upper:.2f} 下边: {prev_entity_lower:.2f}")
logger.info(LOG_PRICE + f"做多触发(下1/4): {long_trigger:.2f} | 做空触发(上1/4): {short_trigger:.2f} | 突破做多: {long_breakout:.2f} | 突破做空: {short_breakout:.2f}")
if skip_short_by_upper_third:
logger.info(LOG_PRICE + "上一根阴+当前阳(做多形态)不按上1/4做空")
if skip_long_by_lower_third:
logger.info(LOG_PRICE + "上一根阳+当前阴(做空形态)不按下1/4做多")
self._display_triggers = {"long_trigger": long_trigger, "short_trigger": short_trigger, "long_breakout": long_breakout, "short_breakout": short_breakout}
# 无持仓时检查开仓信号
if self.start == 0:
if current_price >= long_breakout and not skip_long_by_lower_third:
logger.info(LOG_SIGNAL + f"触发做多 | 价 {current_price:.2f} >= 突破价 {long_breakout:.2f}")
return ('long', long_breakout)
elif current_price <= short_breakout and not skip_short_by_upper_third:
logger.info(LOG_SIGNAL + f"触发做空 | 价 {current_price:.2f} <= 突破价 {short_breakout:.2f}")
return ('short', short_breakout)
# 持仓时检查反手信号
elif self.start == 1: # 持多仓
if current_price <= short_trigger and not skip_short_by_upper_third:
logger.info(LOG_SIGNAL + f"持多→反手做空 | 价 {current_price:.2f} <= 触发价 {short_trigger:.2f}")
return ('reverse_short', short_trigger)
upper_shadow_pct = self.calculate_upper_shadow(prev_kline)
if upper_shadow_pct > 0.01 and current_price <= prev_entity_lower:
logger.info(LOG_SIGNAL + f"持多→反手做空 | 上阴线 {upper_shadow_pct:.4f}% 价<=实体下边 {prev_entity_lower:.2f}")
return ('reverse_short', prev_entity_lower)
elif self.start == -1: # 持空仓
if current_price >= long_trigger and not skip_long_by_lower_third:
logger.info(LOG_SIGNAL + f"持空→反手做多 | 价 {current_price:.2f} >= 触发价 {long_trigger:.2f}")
return ('reverse_long', long_trigger)
lower_shadow_pct = self.calculate_lower_shadow(prev_kline)
if lower_shadow_pct > 0.01 and current_price >= prev_entity_upper:
logger.info(LOG_SIGNAL + f"持空→反手做多 | 下阴线 {lower_shadow_pct:.4f}% 价>=实体上边 {prev_entity_upper:.2f}")
return ('reverse_long', prev_entity_upper)
return None
def can_open(self, current_kline_id):
"""开仓前过滤:仅开仓冷却时间。单根 K 线符合规则可多次开仓。"""
now = time.time()
if self.last_open_time is not None and now - self.last_open_time < self.open_cooldown_seconds:
remain = self.open_cooldown_seconds - (now - self.last_open_time)
logger.info(LOG_SYSTEM + f"开仓冷却中,剩余 {remain:.0f}")
return False
return True
def can_reverse(self, current_price, trigger_price):
"""反手前过滤:冷却时间 + 最小价差"""
now = time.time()
if self.last_reverse_time and now - self.last_reverse_time < self.reverse_cooldown_seconds:
remain = self.reverse_cooldown_seconds - (now - self.last_reverse_time)
logger.info(LOG_SYSTEM + f"反手冷却中,剩余 {remain:.0f}")
return False
if trigger_price and trigger_price > 0:
move_pct = abs(current_price - trigger_price) / trigger_price * 100
if move_pct < self.reverse_min_move_pct:
logger.info(LOG_SYSTEM + f"反手价差不足: {move_pct:.4f}% < {self.reverse_min_move_pct}%")
return False
return True
def verify_no_position(self, max_retries=5, retry_interval=3):
"""
验证当前无持仓
返回: True 表示无持仓可以开仓False 表示有持仓不能开仓
"""
for i in range(max_retries):
if self.get_position_status():
if self.start == 0:
logger.info(LOG_POSITION + "确认无持仓,可以开仓")
return True
else:
logger.warning(
f"仍有持仓 (方向: {self.start}),等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
else:
logger.warning(f"查询持仓状态失败,等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})")
time.sleep(retry_interval)
logger.error(f"经过 {max_retries} 次重试仍有持仓或查询失败,放弃开仓")
return False
def verify_position_direction(self, expected_direction):
"""
验证当前持仓方向是否与预期一致
expected_direction: 1 多仓, -1 空仓
返回: True 表示持仓方向正确False 表示不正确
"""
if self.get_position_status():
if self.start == expected_direction:
logger.info(LOG_POSITION + f"持仓方向验证成功: {self.start}")
return True
else:
logger.warning(f"持仓方向不符: 期望 {expected_direction}, 实际 {self.start}")
return False
else:
logger.error("查询持仓状态失败")
return False
def execute_trade(self, signal, size=None):
"""执行交易。size 不传或为 None 时使用 default_order_size。"""
signal_type, trigger_price = signal
size = self.default_order_size if size is None else size
if signal_type == 'long':
# 开多前先确认无持仓
logger.info(LOG_POSITION + f"准备开多,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开多前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(LOG_POSITION + "确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(1):
self.max_unrealized_pnl_seen = None # 新仓位重置移动止损记录
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
logger.success(LOG_POSITION + "开多成功")
return True
else:
logger.error("开多后持仓验证失败")
return False
elif signal_type == 'short':
# 开空前先确认无持仓
logger.info(LOG_POSITION + f"准备开空,触发价: {trigger_price:.2f}")
if not self.get_position_status():
logger.error("开仓前查询持仓状态失败,放弃开仓")
return False
if self.start != 0:
logger.warning(f"开空前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓")
return False
logger.info(LOG_POSITION + "确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3) # 等待订单执行
# 验证开仓是否成功
if self.verify_position_direction(-1):
self.max_unrealized_pnl_seen = None # 新仓位重置移动止损记录
self.last_open_time = time.time()
self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None)
logger.success(LOG_POSITION + "开空成功")
return True
else:
logger.error("开空后持仓验证失败")
return False
elif signal_type == 'reverse_long':
# 平空 + 开多(反手做多):先平仓,确认无仓后再开多,避免双向持仓
logger.info(LOG_POSITION + f"执行反手做多,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1) # 给交易所处理平仓的时间
# 轮询确认已无持仓再开多(最多等约 10 秒)
for _ in range(10):
if self.get_position_status() and self.start == 0:
break
time.sleep(1)
if self.start != 0:
logger.warning("反手做多:平仓后仍有持仓,放弃本次开多")
return False
logger.info(LOG_POSITION + "已确认无持仓,执行开多")
self.开单(marketPriceLongOrder=1, size=size)
time.sleep(3)
if self.verify_position_direction(1):
self.max_unrealized_pnl_seen = None
logger.success(LOG_POSITION + "反手做多成功")
self.last_reverse_time = time.time()
time.sleep(20)
return True
else:
logger.error("反手做多后持仓验证失败")
return False
elif signal_type == 'reverse_short':
# 平多 + 开空(反手做空):先平仓,确认无仓后再开空
logger.info(LOG_POSITION + f"执行反手做空,触发价: {trigger_price:.2f}")
self.平仓()
time.sleep(1)
for _ in range(10):
if self.get_position_status() and self.start == 0:
break
time.sleep(1)
if self.start != 0:
logger.warning("反手做空:平仓后仍有持仓,放弃本次开空")
return False
logger.info(LOG_POSITION + "已确认无持仓,执行开空")
self.开单(marketPriceLongOrder=-1, size=size)
time.sleep(3)
if self.verify_position_direction(-1):
self.max_unrealized_pnl_seen = None
logger.success(LOG_POSITION + "反手做空成功")
self.last_reverse_time = time.time()
time.sleep(20)
return True
else:
logger.error("反手做空后持仓验证失败")
return False
return False
def action(self):
"""主循环"""
logger.info(LOG_SYSTEM + "开始运行四分之一策略交易...")
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
page_start = True
if USE_RICH_DASHBOARD:
self._display_logs = []
logger.remove()
def _sink(msg):
self._display_logs.append(msg.record["message"])
if len(self._display_logs) > 50:
self._display_logs.pop(0)
logger.add(_sink, format="{message}")
live = None
if USE_RICH_DASHBOARD:
live = Live(console=console, refresh_per_second=8, screen=True)
live.start()
try:
while True:
if live is not None:
try:
live.update(build_dashboard_layout(self._display_state, self._display_logs))
except Exception:
pass
if page_start:
# 打开浏览器
for i in range(5):
if self.openBrowser():
logger.info(LOG_SYSTEM + "浏览器打开成功")
break
else:
self.ding("打开浏览器失败!", error=True)
return
# 进入交易页面
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(vals=25, clear=True)
page_start = False
try:
# 1. 获取K线数据当前K线和上一根K线
prev_kline, current_kline = self.get_klines()
if not prev_kline or not current_kline:
logger.warning(LOG_SYSTEM + "获取K线失败等待重试...")
time.sleep(5)
continue
# 记录进入新的K线分块分隔便于阅读
current_kline_time = current_kline['id']
if self.last_kline_time != current_kline_time:
self.last_kline_time = current_kline_time
logger.info("")
log_kline_header(current_kline_time)
# 2. 获取当前价格
current_price = self.get_current_price()
if not current_price:
logger.warning(LOG_SYSTEM + "获取价格失败,等待重试...")
time.sleep(2)
continue
# 3. 每次循环都通过SDK获取真实持仓状态避免状态不同步导致双向持仓
if not self.get_position_status():
logger.warning(LOG_SYSTEM + "获取持仓状态失败,等待重试...")
time.sleep(2)
continue
logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)")
# 更新仪表盘左侧数据(供 Rich 展示)
try:
self._display_state["price"] = current_price
self._display_state["position"] = self.start
self._display_state["kline_id"] = current_kline_time
self._display_state["unrealized_pnl"] = self.get_unrealized_pnl_usd()
kline_series = self.get_klines_series(35)
ema10, ema20, atr14 = self.get_ema_atr_for_exit(kline_series)
self._display_state["ema10"] = ema10
self._display_state["atr14"] = atr14
if getattr(self, "_display_triggers", None):
self._display_state.update(self._display_triggers)
except Exception:
pass
# 3.5 止损/止盈/移动止损 + EMA/ATR 平仓 + 当前K线从极值回落平仓
if self.start != 0:
# 当前K线从最高/最低点回落平仓换线重置跟踪有持仓时更新本K线内最高/最低价并检查
if self._candle_id_for_high_low != current_kline_time:
self._candle_high_seen = None
self._candle_low_seen = None
self._candle_id_for_high_low = current_kline_time
# 多头:先更新本 K 线最高价(供 ATR 追踪与后续回落逻辑用)
if self.start == 1:
self._candle_high_seen = max(self._candle_high_seen or 0, current_price)
elif self.start == -1:
self._candle_low_seen = min(self._candle_low_seen or float('inf'), current_price)
# 多单EMA(10) + ATR(14) 平仓 —— 收盘跌破 EMA10 先平;或从最高价回撤 ≥ 1.1×ATR 平
if self.start == 1 and self.use_ema_atr_exit:
kline_series = self.get_klines_series(35)
ema10, ema20, atr14 = self.get_ema_atr_for_exit(kline_series)
if ema10 is not None and current_price < ema10:
logger.info(LOG_POSITION + f"多单 EMA10 平仓 | 价 {current_price:.2f} 跌破 EMA10 {ema10:.2f}")
self.平仓()
self.max_unrealized_pnl_seen = None
self._candle_high_seen = None
time.sleep(3)
continue
if atr14 is not None and self._candle_high_seen and (self._candle_high_seen - current_price) >= self.atr_multiplier * atr14:
logger.info(LOG_POSITION + f"多单 ATR 追踪止盈 | 最高 {self._candle_high_seen:.2f} 当前 {current_price:.2f} 回撤≥{self.atr_multiplier}×ATR={atr14:.2f}")
self.平仓()
self.max_unrealized_pnl_seen = None
self._candle_high_seen = None
time.sleep(3)
continue
# 空单EMA(10) + ATR(14) 平仓 —— 收盘涨破 EMA10 先平;或从最低价反弹 ≥ 1.1×ATR 平
if self.start == -1 and self.use_ema_atr_exit:
kline_series = self.get_klines_series(35)
ema10, ema20, atr14 = self.get_ema_atr_for_exit(kline_series)
if ema10 is not None and current_price > ema10:
logger.info(LOG_POSITION + f"空单 EMA10 平仓 | 价 {current_price:.2f} 涨破 EMA10 {ema10:.2f}")
self.平仓()
self.max_unrealized_pnl_seen = None
self._candle_low_seen = None
time.sleep(3)
continue
if atr14 is not None and self._candle_low_seen and (current_price - self._candle_low_seen) >= self.atr_multiplier * atr14:
logger.info(LOG_POSITION + f"空单 ATR 追踪止盈 | 最低 {self._candle_low_seen:.2f} 当前 {current_price:.2f} 反弹≥{self.atr_multiplier}×ATR={atr14:.2f}")
self.平仓()
self.max_unrealized_pnl_seen = None
self._candle_low_seen = None
time.sleep(3)
continue
use_fixed = self.drop_from_high_mode == 'fixed' and self.drop_from_high_to_close and self.drop_from_high_to_close > 0
use_pct = self.drop_from_high_mode == 'pct_retrace'
if use_fixed or use_pct:
if self.start == 1: # 多头:最高价已在上面更新,这里只做回落判断
do_close = False
if use_fixed and self._candle_high_seen and current_price <= self._candle_high_seen - self.drop_from_high_to_close:
do_close = True
reason = f"固定回落 {self.drop_from_high_to_close}"
elif use_pct and self._candle_high_seen and current_kline.get('open'):
candle_open = float(current_kline['open'])
rise_pct = (self._candle_high_seen - candle_open) / candle_open * 100 if candle_open > 0 else 0
if rise_pct >= self.min_rise_pct_to_activate:
drop_trigger_pct = max(self.min_drop_pct_from_high, rise_pct * self.retrace_ratio)
drop_pct = (self._candle_high_seen - current_price) / self._candle_high_seen * 100 if self._candle_high_seen else 0
if drop_pct >= drop_trigger_pct:
do_close = True
reason = f"涨幅 {rise_pct:.3f}% → 允许回撤 {drop_trigger_pct:.3f}%,实际回撤 {drop_pct:.3f}%"
else:
pass # 涨幅不足,不启用动态回撤
if do_close:
logger.info(LOG_POSITION + f"多单K线回落平仓 | 最高 {self._candle_high_seen:.2f} 当前 {current_price:.2f} | {reason}")
self.平仓()
self.max_unrealized_pnl_seen = None
self._candle_high_seen = None
time.sleep(3)
continue
elif self.start == -1: # 空头跟踪当前K线最低价从最低点反弹超过阈值则平仓
self._candle_low_seen = min(self._candle_low_seen or float('inf'), current_price)
do_close = False
if use_fixed and self._candle_low_seen and current_price >= self._candle_low_seen + self.drop_from_high_to_close:
do_close = True
reason = f"固定反弹 {self.drop_from_high_to_close}"
elif use_pct and self._candle_low_seen and current_kline.get('open'):
candle_open = float(current_kline['open'])
rise_pct = (candle_open - self._candle_low_seen) / candle_open * 100 if candle_open > 0 else 0 # 对空头是“跌幅”
if rise_pct >= self.min_rise_pct_to_activate:
drop_trigger_pct = max(self.min_drop_pct_from_high, rise_pct * self.retrace_ratio)
bounce_pct = (current_price - self._candle_low_seen) / self._candle_low_seen * 100 if self._candle_low_seen else 0
if bounce_pct >= drop_trigger_pct:
do_close = True
reason = f"跌幅 {rise_pct:.3f}% → 允许反弹 {drop_trigger_pct:.3f}%,实际反弹 {bounce_pct:.3f}%"
if do_close:
logger.info(LOG_POSITION + f"空单K线反弹平仓 | 最低 {self._candle_low_seen:.2f} 当前 {current_price:.2f} | {reason}")
self.平仓()
self.max_unrealized_pnl_seen = None
self._candle_low_seen = None
time.sleep(3)
continue
pnl_usd = self.get_unrealized_pnl_usd()
if pnl_usd is not None:
# 固定止损:亏损达到 3 美元平仓
if pnl_usd <= self.stop_loss_usd:
logger.info(LOG_POSITION + f"固定止损平仓 | 亏损 {pnl_usd:.2f} 美元")
self.平仓()
self.max_unrealized_pnl_seen = None
time.sleep(3)
continue
# 更新持仓期间最大盈利(用于移动止损)
if self.max_unrealized_pnl_seen is None:
self.max_unrealized_pnl_seen = pnl_usd
else:
self.max_unrealized_pnl_seen = max(self.max_unrealized_pnl_seen, pnl_usd)
# 移动止损:盈利曾达到 activation 后,从最高盈利回撤 trailing_distance 则平仓
if self.max_unrealized_pnl_seen >= self.trailing_activation_usd:
if pnl_usd < self.max_unrealized_pnl_seen - self.trailing_distance_usd:
logger.info(LOG_POSITION + f"移动止损平仓 | 盈利 {pnl_usd:.2f} 从最高 {self.max_unrealized_pnl_seen:.2f} 回撤≥{self.trailing_distance_usd}$")
self.平仓()
self.max_unrealized_pnl_seen = None
time.sleep(3)
continue
# 4. 检查信号
signal = self.check_signal(current_price, prev_kline, current_kline)
# 5. 反手过滤:冷却时间 + 最小价差
if signal and signal[0].startswith('reverse_'):
if not self.can_reverse(current_price, signal[1]):
signal = None
# 5.5 开仓频率过滤:仅冷却时间,单根 K 线符合规则可多次开仓
if signal and signal[0] in ('long', 'short'):
if not self.can_open(current_kline_time):
signal = None
else:
self._current_kline_id_for_open = current_kline_time # 供 execute_trade 成功后记录
# 6. 有信号则执行交易
if signal:
trade_success = self.execute_trade(signal)
if trade_success:
logger.success(LOG_POSITION + f"交易执行完成: {signal[0]} | 当前持仓: {self.start}")
page_start = True
else:
logger.warning(f"交易执行失败或被阻止: {signal[0]}")
# 短暂等待后继续循环同一根K线遇到信号就操作
time.sleep(0.1)
if page_start:
self.page.close()
time.sleep(5)
except KeyboardInterrupt:
logger.info(LOG_SYSTEM + "用户中断,程序退出")
break
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(5)
finally:
if live is not None:
try:
live.stop()
except Exception:
pass
if USE_RICH_DASHBOARD:
logger.remove()
logger.add(sys.stderr, format="{message}")
if __name__ == '__main__':
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()