Files
lm_code/me/bitmart-三分之一策略交易1111111.py
2026-02-10 09:54:32 +08:00

550 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
BitMart 五分之一策略交易3分钟精准版
使用3分钟K线周期计算触发价格实时监测同根K线内多空都触及时用开盘价距离判断先后
策略规则(与 bitmart/回测数据-五分之一策略-3分钟精准版.py 完全一致):
1. 触发价格计算基于有效的前一根3分钟K线实体>=0.1
- 做多触发价格 = 收盘价 + 实体/5从收盘价往上涨1/5
- 做空触发价格 = 收盘价 - 实体/5从收盘价往下跌1/5
2. 信号触发条件:
- 当前3分钟K线最高价 >= 做多触发价格 → 做多信号
- 当前3分钟K线最低价 <= 做空触发价格 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根3分钟K线内只交易一次
4. 同根触发判断无需1分钟K线
- 当一根3分钟K线同时触及做多和做空价格时
- 使用该3分钟K线开盘价与触发价的距离判断先后
"""
import random
import time
import datetime
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
from 交易.tools import send_dingtalk_message
# 创建全局线程池用于异步发送钉钉消息
ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk")
class BitmartOneFifthStrategy:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "6104088c65a68d7e53df5d9395b67d78e555293a"
self.secret_key = "a8b14312330d8e6b9b09acfd972b34e32022fdfa9f2b06f0a0a31723b873fd01"
self.memo = "me"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=3, desc="等待K线", ncols=80) # 3分钟周期
self.last_kline_time = None
self.leverage = "40"
self.open_type = "cross"
self.risk_percent = 0.1
self.open_avg_price = None
self.current_amount = None
self.bit_id = bit_id
# 五分之一策略参数(与回测一致)
self.min_body_size = 0.1
self.kline_step = 3 # 3分钟K线
self.kline_count = 20
self.check_interval = 3
self.last_trigger_kline_id = None
self.last_trigger_direction = None
self.last_trade_kline_id = None
# ========================= 五分之一策略核心 =========================
def is_bullish(self, c):
return float(c['close']) > float(c['open'])
def is_bearish(self, c):
return float(c['close']) < float(c['open'])
def get_body_size(self, candle):
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(self, all_data, current_idx, min_body_size=0.1):
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if self.get_body_size(prev) >= min_body_size:
return i, prev
return None, None
def get_one_fifth_levels(self, prev):
"""
计算前一根K线实体的 1/5 双向触发价格
做多触发 = 收盘价 + 实体/5做空触发 = 收盘价 - 实体/5
"""
p_open = float(prev['open'])
p_close = float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001:
return None, None
long_trigger = p_close + body / 5
short_trigger = p_close - body / 5
return long_trigger, short_trigger
def check_realtime_trigger(self, kline_data):
"""
检查当前3分钟K线是否触发交易信号与回测逻辑完全一致
若同时触发多空则用开盘价距离判断先后顺序不请求1分钟K线
参数:
kline_data: K线数据列表
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
"""
if len(kline_data) < 2:
return None, None, None, None
curr = kline_data[-1]
curr_kline_id = curr['id']
valid_prev_idx, prev = self.find_valid_prev_bar(
kline_data, len(kline_data) - 1, self.min_body_size
)
if prev is None:
return None, None, None, None
long_trigger, short_trigger = self.get_one_fifth_levels(prev)
if long_trigger is None:
return None, None, None, None
c_high = float(curr['high'])
c_low = float(curr['low'])
long_triggered = c_high >= long_trigger
short_triggered = c_low <= short_trigger
both_triggered = long_triggered and short_triggered
direction = None
trigger_price = None
# 如果同时触发多空用开盘价距离判断先后顺序避免请求1分钟K线
if both_triggered:
c_open = float(curr['open'])
dist_to_long = abs(long_trigger - c_open)
dist_to_short = abs(short_trigger - c_open)
if dist_to_short <= dist_to_long:
direction = 'short'
trigger_price = short_trigger
else:
direction = 'long'
trigger_price = long_trigger
elif short_triggered:
direction = 'short'
trigger_price = short_trigger
elif long_triggered:
direction = 'long'
trigger_price = long_trigger
if direction is None:
return None, None, None, None
# 避免同一根K线内重复触发相同信号
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
return None, None, None, None
return direction, trigger_price, prev, curr
# ========================= BitMart API =========================
def get_klines(self):
"""获取最近3分钟K线"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=self.kline_step,
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]["data"]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
error_msg = str(e)
if "429" in error_msg or "too many requests" in error_msg.lower():
logger.warning(f"API限流等待60秒后重试: {e}")
time.sleep(60)
else:
logger.error(f"获取K线异常: {e}")
self.ding(msg="获取K线异常", error=True)
return None
def get_current_price(self):
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1,
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
if isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ========================= 浏览器 =========================
def openBrowser(self):
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except Exception:
return False
def close_extra_tabs_in_browser(self):
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except Exception:
return False
def click_safe(self, xpath, sleep=0.5):
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click(by_js=True)
return True
except Exception:
return False
def 平仓(self):
logger.info("执行平仓操作...")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
time.sleep(0.5)
self.ding(msg="执行平仓操作")
def 开单(self, marketPriceLongOrder=0, size=None):
if size is None or size <= 0:
logger.warning("开单金额无效")
return False
direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
logger.info(f"执行{direction_str}操作,金额: {size}")
try:
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.ding(msg=f"执行{direction_str}操作,金额: {size}")
return True
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def ding(self, msg, error=False):
"""
异步发送钉钉消息不阻塞主程序的K线获取和交易操作
"""
prefix = "❌五分之一策略:" if error else "🔔五分之一策略:"
full_msg = f"{prefix}{msg}"
if error:
logger.error(msg)
# 异步发送多条错误消息
for i in range(10):
ding_executor.submit(self._send_ding_safe, full_msg)
else:
logger.info(msg)
# 异步发送单条消息
ding_executor.submit(self._send_ding_safe, full_msg)
def _send_ding_safe(self, msg):
"""
安全发送钉钉消息,捕获异常防止线程崩溃
"""
try:
send_dingtalk_message(msg)
except Exception as e:
logger.warning(f"钉钉消息发送失败: {e}")
# ========================= 主循环 =========================
def action(self):
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
if not self.openBrowser():
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
else:
logger.info('关闭多余标签页失败')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
logger.info(f"五分之一策略3分钟K线开始实时监测检测间隔: {self.check_interval}")
last_report_time = 0
report_interval = 300
while True:
for i in range(5):
if self.openBrowser():
break
time.sleep(5)
else:
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
try:
kline_data = self.get_klines()
if not kline_data or len(kline_data) < 3:
logger.warning("K线数据不足等待重试...")
time.sleep(self.check_interval)
continue
curr = kline_data[-1]
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
# 获取持仓状态
if not self.get_position_status():
logger.warning("获取仓位信息失败,使用缓存的持仓状态")
# 检测五分之一策略信号(与回测逻辑完全一致)
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
if direction:
curr_kline_id = curr_kline['id']
if self.last_trade_kline_id == curr_kline_id:
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev)
logger.info(f"{'=' * 50}")
logger.info(f"🚨 检测到{direction}信号!触发价格: {trigger_price:.2f}")
logger.info(
f" 有效前一根[{prev_time}]: {prev_type} 实体={prev_body:.2f} O={valid_prev['open']:.2f} C={valid_prev['close']:.2f}")
logger.info(
f" 当前3分钟K线: H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}")
logger.info(f" 当前持仓: {self.start} (1=多, -1=空, 0=无)")
balance = self.get_available_balance()
trade_size = (balance or 0) * self.risk_percent
executed = False
# 执行交易逻辑(与回测一致)
if direction == "long":
if self.start == -1:
# 持空仓遇到做多信号 -> 平空并反手开多
logger.info("📈 平空仓,反手开多")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif self.start == 0:
# 无仓位遇到做多信号 -> 开多
logger.info("📈 无仓位,开多")
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif self.start == 1:
# 持多仓遇到做多信号 -> 不操作
logger.debug("已持有多仓,忽略做多信号")
elif direction == "short":
if self.start == 1:
# 持多仓遇到做空信号 -> 平多并反手开空
logger.info("📉 平多仓,反手开空")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
elif self.start == 0:
# 无仓位遇到做空信号 -> 开空
logger.info("📉 无仓位,开空")
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
elif self.start == -1:
# 持空仓遇到做空信号 -> 不操作
logger.debug("已持有空仓,忽略做空信号")
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
if executed:
self.last_trade_kline_id = curr_kline_id
self.get_position_status()
self._send_position_message(curr_kline)
last_report_time = time.time()
logger.info(f"{'=' * 50}")
else:
logger.debug(
f"[{curr_time_str}] 现价: {curr['close']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
if time.time() - last_report_time >= report_interval:
if self.get_position_status():
self._send_position_message(kline_data[-1])
last_report_time = time.time()
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval)
time.sleep(3)
if random.randint(1,10)>7:
self.page.close()
time.sleep(15)
def _send_position_message(self, latest_kline):
current_price = float(latest_kline["close"])
balance = self.get_available_balance()
self.balance = balance if balance is not None else 0.0
if self.start != 0:
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
current_amount = float(self.current_amount) if self.current_amount else 0.0
position_cross = float(self.position_cross) if getattr(self, 'position_cross', None) else 0.0
if self.start == 1:
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
else:
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
if open_avg_price > 0:
if self.start == 1:
pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
else:
pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000
rate_str = f" ({pnl_rate:+.2f}%)"
else:
rate_str = ""
direction_str = "" if self.start == -1 else ""
pnl_str = f"{unrealized_pnl:+.2f} USDT"
msg = (
f"【五分之一策略 {self.contract_symbol} 3分钟】\n"
f"当前方向:{direction_str}\n"
f"当前现价:{current_price:.2f} USDT\n"
f"开仓均价:{open_avg_price:.2f} USDT\n"
f"持仓量(eth){float(current_amount) / 1000} eth\n"
f"持仓量(usdt){position_cross} usdt\n"
f"浮动盈亏:{pnl_str}{rate_str}\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
else:
msg = (
f"【五分之一策略 {self.contract_symbol} 3分钟】\n"
f"当前方向:无\n"
f"当前现价:{current_price:.2f} USDT\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
self.ding(msg=msg)
if __name__ == '__main__':
try:
BitmartOneFifthStrategy(bit_id="62f9107d0c674925972084e282df55b3").action()
except KeyboardInterrupt:
logger.info("程序被用户中断")
finally:
# 关闭线程池,等待所有钉钉消息发送完成
logger.info("正在关闭钉钉消息线程池...")
ding_executor.shutdown(wait=True)
logger.info("线程池已关闭")