Files
lm_code/bitmart/五分之一策略-开盘价版-3分钟交易.py
2026-02-03 10:28:54 +08:00

517 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
BitMart 五分之一策略(开盘价版)— 3分钟K线交易
策略规则(与 1111 一致):
基于前一根有效 K 线(实体 ≥ 0.1
做多触发价 = 当前K线开盘价 + 实体/5收盘价向上 1/5 实体)
做空触发价 = 当前K线开盘价 - 实体/5收盘价向下 1/5 实体)
反手(若已有持仓):
持空反手做多:价格涨到 开仓价 + 前一根实体/5
持多反手做空:价格跌到 开仓价 - 前一根实体/5
同根K线内多空都触及时用开盘价与触发价距离判断先后。
"""
import random
import time
import datetime
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
try:
from 交易.tools import send_dingtalk_message
except ImportError:
send_dingtalk_message = None
ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk")
class BitmartOneFifthOpenStrategy:
"""五分之一策略(开盘价版):触发价 = 当前K线开盘价 ± 实体/53分钟K线"""
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=3, desc="等待K线", ncols=80)
self.last_kline_time = None
self.leverage = "100"
self.open_type = "cross"
self.risk_percent = 0.01
self.open_avg_price = None
self.current_amount = None
self.bit_id = bit_id
self.min_body_size = 0.1 # 前一根有效K线实体下限
self.kline_step = 3 # 3分钟K线
self.check_interval = 3
self.last_trigger_kline_id = None
self.last_trigger_direction = None
self.last_trade_kline_id = None
# ========================= 策略核心(开盘价版)=========================
def get_body_size(self, candle):
return abs(float(candle['open']) - float(candle['close']))
def find_valid_prev_bar(self, all_data, current_idx):
"""找到当前K线之前最近一根实体≥min_body_size的K线"""
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if self.get_body_size(prev) >= self.min_body_size:
return i, prev
return None, None
def get_trigger_levels_by_open(self, curr_open, prev_bar):
"""
做多触发价 = 当前K线开盘价 + 实体/5
做空触发价 = 当前K线开盘价 - 实体/5
实体来自前一根有效K线。
"""
body = self.get_body_size(prev_bar)
if body < 0.001:
return None, None
curr_open_f = float(curr_open)
long_trigger = curr_open_f + body / 5
short_trigger = curr_open_f - body / 5
return long_trigger, short_trigger
def check_realtime_trigger(self, kline_data):
"""
检查当前3分钟K线是否触发信号开盘价版
做多触发价 = 当前K线开盘价 + 前一根实体/5
做空触发价 = 当前K线开盘价 - 前一根实体/5
若同根多空都触发,用开盘价与触发价距离判断先后。
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
"""
if len(kline_data) < 2:
return None, None, None, None
curr = kline_data[-1]
curr_kline_id = curr['id']
curr_open = float(curr['open'])
curr_high = float(curr['high'])
curr_low = float(curr['low'])
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
if prev is None:
return None, None, None, None
long_trigger, short_trigger = self.get_trigger_levels_by_open(curr_open, prev)
if long_trigger is None:
return None, None, None, None
long_triggered = curr_high >= long_trigger
short_triggered = curr_low <= short_trigger
both_triggered = long_triggered and short_triggered
direction = None
trigger_price = None
if both_triggered:
dist_to_long = abs(long_trigger - curr_open)
dist_to_short = abs(short_trigger - curr_open)
if dist_to_short <= dist_to_long:
direction = 'short'
trigger_price = short_trigger
else:
direction = 'long'
trigger_price = long_trigger
elif short_triggered:
direction = 'short'
trigger_price = short_trigger
elif long_triggered:
direction = 'long'
trigger_price = long_trigger
if direction is None:
return None, None, None, None
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
return None, None, None, None
return direction, trigger_price, prev, curr
def check_reverse_trigger(self, kline_data):
"""
反手检测:持空反手做多 = 现价 >= 开仓价+前一根实体/5
持多反手做空 = 现价 <= 开仓价-前一根实体/5
使用当前价最新K线收盘或 get_current_price与 开仓价±实体/5 比较。
"""
if self.start == 0 or self.open_avg_price is None:
return None, None
if len(kline_data) < 2:
return None, None
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
if prev is None:
return None, None
body = self.get_body_size(prev)
if body < 0.001:
return None, None
open_price_f = float(self.open_avg_price)
reverse_long_price = open_price_f + body / 5 # 持空反手做多线
reverse_short_price = open_price_f - body / 5 # 持多反手做空线
curr = kline_data[-1]
curr_high = float(curr['high'])
curr_low = float(curr['low'])
if self.start == -1 and curr_high >= reverse_long_price:
return 'long', reverse_long_price
if self.start == 1 and curr_low <= reverse_short_price:
return 'short', reverse_short_price
return None, None
def is_bullish(self, c):
return float(c['close']) > float(c['open'])
# ========================= API =========================
def get_klines(self):
"""获取最近3分钟K线"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=self.kline_step,
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]["data"]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted
except Exception as e:
if "429" in str(e) or "too many requests" in str(e).lower():
logger.warning(f"API限流等待60秒: {e}")
time.sleep(60)
else:
logger.error(f"获取K线异常: {e}")
self.ding("获取K线异常", error=True)
return None
def get_current_price(self):
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1,
start_time=end_time - 3600 * 3,
end_time=end_time
)[0]
if response.get('code') == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
try:
response = self.contractAPI.get_assets_detail()[0]
if response.get('code') == 1000:
data = response.get('data')
if isinstance(data, dict):
return float(data.get('available_balance', 0))
if isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response.get('code') == 1000:
positions = response.get('data') or []
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
return True
self.start = 1 if positions[0].get('position_type') == 1 else -1
self.open_avg_price = positions[0].get('open_avg_price')
self.current_amount = positions[0].get('current_amount')
self.position_cross = positions[0].get("position_cross")
return True
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
def set_leverage(self):
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response.get('code') == 1000:
logger.success(f"全仓 {self.leverage}x 杠杆设置成功")
return True
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
# ========================= 浏览器 =========================
def open_browser(self):
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except Exception:
return False
def close_extra_tabs(self):
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except Exception:
return False
def click_safe(self, xpath, sleep=0.5):
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click(by_js=True)
return True
except Exception:
return False
def 平仓(self):
logger.info("执行平仓...")
self.click_safe('x://span[normalize-space(text()) ="市价"]')
time.sleep(0.5)
self.ding("执行平仓")
def 开单(self, marketPriceLongOrder=0, size=None):
if size is None or size <= 0:
logger.warning("开单金额无效")
return False
direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
logger.info(f"执行{direction_str},金额: {size}")
try:
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
self.ding(f"执行{direction_str},金额: {size}")
return True
except Exception as e:
logger.error(f"开单异常: {e}")
return False
def ding(self, msg, error=False):
prefix = "❌五分之一开盘价版:" if error else "🔔五分之一开盘价版:"
full_msg = f"{prefix}{msg}"
if error:
logger.error(msg)
for _ in range(3):
ding_executor.submit(self._send_ding_safe, full_msg)
else:
logger.info(msg)
ding_executor.submit(self._send_ding_safe, full_msg)
def _send_ding_safe(self, msg):
try:
if send_dingtalk_message:
send_dingtalk_message(msg)
except Exception as e:
logger.warning(f"钉钉发送失败: {e}")
# ========================= 主循环 =========================
def action(self):
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
if not self.open_browser():
self.ding("打开浏览器失败!", error=True)
return
logger.info("浏览器打开成功")
self.close_extra_tabs()
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
logger.info("五分之一策略开盘价版3分钟K线开始监测")
last_report_time = 0
report_interval = 300
while True:
for _ in range(5):
if self.open_browser():
break
time.sleep(5)
else:
self.ding("打开浏览器失败!", error=True)
return
self.close_extra_tabs()
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
time.sleep(2)
self.click_safe('x://button[normalize-space(text()) ="市价"]')
try:
kline_data = self.get_klines()
if not kline_data or len(kline_data) < 3:
logger.warning("K线数据不足等待重试...")
time.sleep(self.check_interval)
continue
curr = kline_data[-1]
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
if not self.get_position_status():
logger.warning("获取仓位失败,使用缓存")
# 1) 反手检测:有持仓时先看是否触发反手
rev_dir, rev_price = self.check_reverse_trigger(kline_data)
if rev_dir:
curr_kline_id = curr['id']
if self.last_trade_kline_id != curr_kline_id:
prev_idx, prev_bar = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
if prev_bar is not None:
balance = self.get_available_balance()
trade_size = (balance or 0) * self.risk_percent
logger.info(f"反手信号: {rev_dir} 触发价={rev_price:.2f}")
if rev_dir == 'long':
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
else:
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
self.last_trade_kline_id = curr_kline_id
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = rev_dir
self.get_position_status()
time.sleep(self.check_interval)
continue
# 2) 常规开仓/反手:基于 当前K线开盘价 ± 实体/5
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
if direction:
curr_kline_id = curr_kline['id']
if self.last_trade_kline_id == curr_kline_id:
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
time.sleep(self.check_interval)
continue
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
prev_body = self.get_body_size(valid_prev)
logger.info(f"{'='*50}")
logger.info(f"🚨 检测到{direction}信号 触发价={trigger_price:.2f}")
logger.info(f" 有效前一根[{prev_time}] {prev_type} 实体={prev_body:.2f}")
logger.info(f" 当前3分钟K线 O={curr_kline['open']:.2f} H={curr_kline['high']:.2f} L={curr_kline['low']:.2f}")
logger.info(f" 当前持仓: {self.start} (1=多 -1=空 0=无)")
balance = self.get_available_balance()
trade_size = (balance or 0) * self.risk_percent
executed = False
if direction == "long":
if self.start == -1:
logger.info("📈 平空反手开多")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif self.start == 0:
logger.info("📈 无仓位开多")
self.开单(marketPriceLongOrder=1, size=trade_size)
executed = True
elif direction == "short":
if self.start == 1:
logger.info("📉 平多反手开空")
self.平仓()
time.sleep(1)
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
elif self.start == 0:
logger.info("📉 无仓位开空")
self.开单(marketPriceLongOrder=-1, size=trade_size)
executed = True
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
if executed:
self.last_trade_kline_id = curr_kline_id
self.get_position_status()
logger.info(f"{'='*50}")
else:
logger.debug(f"[{curr_time_str}] O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
if time.time() - last_report_time >= report_interval:
last_report_time = time.time()
time.sleep(self.check_interval)
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(self.check_interval)
time.sleep(3)
if random.randint(1, 10) > 7:
self.page.close()
time.sleep(15)
if __name__ == '__main__':
try:
BitmartOneFifthOpenStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action()
except KeyboardInterrupt:
logger.info("程序被用户中断")
finally:
ding_executor.shutdown(wait=True)