Files
lm_code/bitmart/三分之一策略-5分钟交易.py
2026-02-03 23:43:07 +08:00

568 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
BitMart 三分之一策略 — 5分钟K线交易
策略规则:
1. 触发价格基于前一根有效K线实体>=0.1使用当前K线开盘价
- 做多触发价 = 当前K线开盘价 + 实体/3
- 做空触发价 = 当前K线开盘价 - 实体/3
2. 信号触发(无持仓时):
- 当前5分钟K线最高价 >= 做多触发价 → 做多信号
- 当前5分钟K线最低价 <= 做空触发价 → 做空信号
3. 执行逻辑:
- 做多时遇到做空信号 -> 平多并反手开空
- 做空时遇到做多信号 -> 平空并反手开多
- 同一根5分钟K线内只交易一次
4. 同根K线多空都触及时用开盘价与触发价距离判断先后。
5. 反手信号(有持仓时检测):
- 反手做空当前持多上一根K线上阴线涨幅 > 0.01%当前K线跌到上一根K线的实体下边 -> 平多反手开空
- 反手做多当前持空上一根K线下阴线跌幅 > 0.01%当前K线涨到上一根K线的实体上边 -> 平空反手开多
(上阴线 = high - max(open,close),上阴线涨幅 = (上阴线/开盘价)*100下阴线同理
(实体上边 = max(open,close),实体下边 = min(open,close)
"""
import random
import time
import datetime
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
# DingTalk notifications are optional: when the helper module cannot be
# imported the name is bound to None and callers must check it before use.
try:
    from 交易.tools import send_dingtalk_message
except ImportError:
    send_dingtalk_message = None

# Shared worker pool used to send notifications off the trading thread.
ding_executor = ThreadPoolExecutor(
    max_workers=2,
    thread_name_prefix="dingtalk",
)
class BitmartOneThirdStrategy:
    """One-third breakout strategy for BitMart ETHUSDT futures on 5-minute candles.

    Trigger prices are the current candle's open plus/minus one third of the
    body of the most recent candle whose body is at least ``min_body_size``.
    Candles, balance and position are read through ``APIContract``; orders are
    placed by clicking through the BitMart web page in a DrissionPage session.

    NOTE(review): this class was recovered from a copy of the file whose
    indentation had been stripped. Statement nesting (most notably inside
    ``action``) was reconstructed from statement order and should be checked
    against the author's original before running with real funds.
    """

    def __init__(self, bit_id, api_key=None, secret_key=None, memo=None):
        """Create the API client and initialise the strategy state.

        Args:
            bit_id: Browser profile id passed to ``openBrowser``.
            api_key: BitMart API key. When omitted, the ``BITMART_API_KEY``
                environment variable is used, then the legacy built-in key.
            secret_key: BitMart API secret. When omitted, the
                ``BITMART_SECRET_KEY`` environment variable is used, then the
                legacy built-in secret.
            memo: API memo; defaults to the legacy value.
        """
        import os  # local import so the block does not depend on file-level imports

        self.page: ChromiumPage | None = None
        # SECURITY: the literal fallbacks below are live-looking credentials
        # committed to source control. They are kept only so existing callers
        # keep working; rotate them and supply keys via arguments/environment.
        self.api_key = (
            api_key
            or os.environ.get("BITMART_API_KEY")
            or "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
        )
        self.secret_key = (
            secret_key
            or os.environ.get("BITMART_SECRET_KEY")
            or "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
        )
        self.memo = memo if memo is not None else "合约交易"
        self.contract_symbol = "ETHUSDT"
        self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
        self.start = 0  # position state: -1 short, 0 flat, 1 long
        self.direction = None
        self.pbar = tqdm(total=5, desc="等待K线", ncols=80)
        self.last_kline_time = None
        self.leverage = "100"
        self.open_type = "cross"
        self.risk_percent = 0.01  # fraction of available balance used as order size
        self.open_avg_price = None
        self.current_amount = None
        self.position_cross = None
        self.bit_id = bit_id
        self.min_body_size = 0.1  # minimum body for a candle to count as "valid"
        self.kline_step = 5  # candle size in minutes
        self.check_interval = 3  # seconds slept between polls
        self.last_trigger_kline_id = None
        self.last_trigger_direction = None
        self.min_shadow_pct = 0.01  # reversal signal: shadow must exceed 0.01 %

    # ========================= strategy core =========================
    def is_bullish(self, c):
        """Return True when the candle closed above its open."""
        return float(c['close']) > float(c['open'])

    def get_body_size(self, candle):
        """Return the absolute open-to-close distance of the candle."""
        return abs(float(candle['open']) - float(candle['close']))

    def find_valid_prev_bar(self, all_data, current_idx):
        """Find the nearest candle before ``current_idx`` with body >= ``min_body_size``.

        Returns:
            ``(index, candle)``, or ``(None, None)`` when there is no such candle.
        """
        if current_idx <= 0:
            return None, None
        for i in range(current_idx - 1, -1, -1):
            candidate = all_data[i]
            if self.get_body_size(candidate) >= self.min_body_size:
                return i, candidate
        return None, None

    def get_one_third_levels(self, prev, curr):
        """Compute the long/short trigger prices.

        long trigger = current open + body / 3, short trigger = current open
        - body / 3, where the body comes from ``prev``.

        Returns:
            ``(long_trigger, short_trigger)``, or ``(None, None)`` when the
            body of ``prev`` is below 0.001.
        """
        body = abs(float(prev['open']) - float(prev['close']))
        if body < 0.001:
            return None, None
        curr_open = float(curr['open'])
        offset = body / 3
        return curr_open + offset, curr_open - offset

    def get_upper_shadow(self, candle):
        """Upper shadow = high - max(open, close)."""
        o, c, h = float(candle['open']), float(candle['close']), float(candle['high'])
        return h - max(o, c)

    def get_lower_shadow(self, candle):
        """Lower shadow = min(open, close) - low."""
        o, c, low = float(candle['open']), float(candle['close']), float(candle['low'])
        return min(o, c) - low

    def upper_shadow_pct(self, candle):
        """Upper shadow as a percentage of the open (0.0 when open <= 0)."""
        o = float(candle['open'])
        if o <= 0:
            return 0.0
        return self.get_upper_shadow(candle) / o * 100

    def lower_shadow_pct(self, candle):
        """Lower shadow as a percentage of the open (0.0 when open <= 0)."""
        o = float(candle['open'])
        if o <= 0:
            return 0.0
        return self.get_lower_shadow(candle) / o * 100

    def get_body_upper_edge(self, candle):
        """Body upper edge = max(open, close)."""
        return max(float(candle['open']), float(candle['close']))

    def get_body_lower_edge(self, candle):
        """Body lower edge = min(open, close)."""
        return min(float(candle['open']), float(candle['close']))

    def check_reverse_signal(self, kline_data):
        """Detect a reversal signal against the currently held position.

        Uses the immediately preceding candle (``kline_data[-2]``):
        - holding long (``start == 1``): previous upper shadow % exceeds
          ``min_shadow_pct`` and the current low reaches the previous body's
          lower edge -> ``'short'``.
        - holding short (``start == -1``): previous lower shadow % exceeds
          ``min_shadow_pct`` and the current high reaches the previous body's
          upper edge -> ``'long'``.

        Returns:
            ``(direction, previous_candle)`` or ``(None, None)``.

        NOTE(review): this check is not limited to once per candle, so a
        position may be reversed more than once within one candle.
        """
        if len(kline_data) < 2:
            return None, None
        curr = kline_data[-1]
        prev = kline_data[-2]
        curr_high = float(curr['high'])
        curr_low = float(curr['low'])
        if self.start == 1:
            if (self.upper_shadow_pct(prev) > self.min_shadow_pct
                    and curr_low <= self.get_body_lower_edge(prev)):
                return 'short', prev
        if self.start == -1:
            if (self.lower_shadow_pct(prev) > self.min_shadow_pct
                    and curr_high >= self.get_body_upper_edge(prev)):
                return 'long', prev
        return None, None

    def check_realtime_trigger(self, kline_data):
        """Check whether the current candle has touched a trigger price.

        Returns:
            ``(direction, trigger_price, valid_prev_candle, current_candle)``
            or four ``None`` values when there is no (new) signal.

        A signal is suppressed only when the same candle already produced the
        same direction (``last_trigger_kline_id`` / ``last_trigger_direction``).
        NOTE(review): an opposite-direction signal in the same candle is still
        returned, which differs from a strict "once per candle" rule.
        """
        no_signal = (None, None, None, None)
        if len(kline_data) < 2:
            return no_signal
        curr = kline_data[-1]
        curr_kline_id = curr['id']
        curr_high = float(curr['high'])
        curr_low = float(curr['low'])
        _, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
        if prev is None:
            return no_signal
        long_trigger, short_trigger = self.get_one_third_levels(prev, curr)
        if long_trigger is None:
            return no_signal
        long_triggered = curr_high >= long_trigger
        short_triggered = curr_low <= short_trigger
        # Both triggers sit exactly body/3 away from the open, so the
        # "distance from the open" comparison always ties and the tie goes to
        # short. Deciding it directly keeps floating-point rounding from
        # occasionally flipping the result to long.
        if short_triggered:
            direction, trigger_price = 'short', short_trigger
        elif long_triggered:
            direction, trigger_price = 'long', long_trigger
        else:
            return no_signal
        if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
            return no_signal
        return direction, trigger_price, prev, curr

    # ========================= API =========================
    def get_klines(self):
        """Fetch the last three hours of candles, sorted by timestamp ascending.

        Returns:
            A list of dicts with ``id``/``open``/``high``/``low``/``close``,
            or ``None`` on any error. A rate-limit error sleeps 60 seconds.
        """
        try:
            end_time = int(time.time())
            response = self.contractAPI.get_kline(
                contract_symbol=self.contract_symbol,
                step=self.kline_step,
                start_time=end_time - 3600 * 3,
                end_time=end_time
            )[0]["data"]
            candles = [
                {
                    'id': int(k["timestamp"]),
                    'open': float(k["open_price"]),
                    'high': float(k["high_price"]),
                    'low': float(k["low_price"]),
                    'close': float(k["close_price"]),
                }
                for k in response
            ]
            candles.sort(key=lambda item: item['id'])
            return candles
        except Exception as e:
            if "429" in str(e) or "too many requests" in str(e).lower():
                logger.warning(f"API限流等待60秒: {e}")
                time.sleep(60)
            else:
                logger.error(f"获取K线异常: {e}")
                self.ding("获取K线异常", error=True)
            return None

    def get_current_price(self):
        """Return the close of the latest 1-minute candle, or ``None`` on failure."""
        try:
            end_time = int(time.time())
            response = self.contractAPI.get_kline(
                contract_symbol=self.contract_symbol,
                step=1,
                start_time=end_time - 3600 * 3,
                end_time=end_time
            )[0]
            if response.get('code') == 1000:
                return float(response['data'][-1]["close_price"])
            return None
        except Exception as e:
            logger.error(f"获取价格异常: {e}")
            return None

    def get_available_balance(self):
        """Return the available USDT balance, or ``None`` when it cannot be read."""
        try:
            response = self.contractAPI.get_assets_detail()[0]
            if response.get('code') == 1000:
                data = response.get('data')
                if isinstance(data, dict):
                    return float(data.get('available_balance', 0))
                if isinstance(data, list):
                    for asset in data:
                        if asset.get('currency') == 'USDT':
                            return float(asset.get('available_balance', 0))
            return None
        except Exception as e:
            logger.error(f"余额查询异常: {e}")
            return None

    def get_position_status(self):
        """Refresh ``start`` and the cached position fields from the API.

        Returns:
            True when the API answered with code 1000, otherwise False (the
            cached state is then left untouched).
        """
        try:
            response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
            if response.get('code') == 1000:
                positions = response.get('data') or []
                if not positions:
                    self.start = 0
                    self.open_avg_price = None
                    self.current_amount = None
                    self.position_cross = None
                    return True
                first = positions[0]
                # position_type 1 is stored as long; any other value as short.
                self.start = 1 if first.get('position_type') == 1 else -1
                self.open_avg_price = first.get('open_avg_price')
                self.current_amount = first.get('current_amount')
                self.position_cross = first.get("position_cross")
                return True
            return False
        except Exception as e:
            logger.error(f"持仓查询异常: {e}")
            return False

    def set_leverage(self):
        """Submit the configured leverage/open type; return True on success."""
        try:
            response = self.contractAPI.post_submit_leverage(
                contract_symbol=self.contract_symbol,
                leverage=self.leverage,
                open_type=self.open_type
            )[0]
            if response.get('code') == 1000:
                logger.success(f"全仓 {self.leverage}x 杠杆设置成功")
                return True
            logger.error(f"杠杆设置失败: {response}")
            return False
        except Exception as e:
            logger.error(f"设置杠杆异常: {e}")
            return False

    # ========================= browser =========================
    def open_browser(self):
        """Attach ``self.page`` to the browser profile ``bit_id``; return success."""
        try:
            bit_port = openBrowser(id=self.bit_id)
            co = ChromiumOptions()
            co.set_local_port(port=bit_port)
            self.page = ChromiumPage(addr_or_opts=co)
            return True
        except Exception as e:
            # Previously swallowed silently; log so failures can be diagnosed.
            logger.error(f"打开浏览器异常: {e}")
            return False

    def close_extra_tabs(self):
        """Close every tab except the first one; return success."""
        try:
            for idx, tab in enumerate(self.page.get_tabs()):
                if idx > 0:
                    tab.close()
            return True
        except Exception as e:
            logger.warning(f"关闭多余标签页异常: {e}")
            return False

    def click_safe(self, xpath, sleep=0.5):
        """Scroll to the element at ``xpath`` and click it via JavaScript.

        Returns:
            True when the click was issued, False when the element was not
            found or any step raised.
        """
        try:
            ele = self.page.ele(xpath)
            if not ele:
                return False
            ele.scroll.to_see(center=True)
            time.sleep(sleep)
            ele.click(by_js=True)
            return True
        except Exception as e:
            logger.debug(f"点击元素异常 {xpath}: {e}")
            return False

    def 平仓(self):
        """Click the market-close control on the page.

        Returns:
            True when the click was issued, False otherwise. A failed click
            is reported as an error instead of a success notification.
        """
        logger.info("执行平仓...")
        clicked = self.click_safe('x://span[normalize-space(text()) ="市价"]')
        time.sleep(0.5)
        if clicked:
            self.ding("执行平仓")
        else:
            self.ding("平仓失败: 未能点击市价平仓按钮", error=True)
        return clicked

    def 开单(self, marketPriceLongOrder=0, size=None):
        """Place a market order through the web page.

        Args:
            marketPriceLongOrder: 1 to buy/long, -1 to sell/short. Any other
                value (including the default 0) is rejected.
            size: Order amount typed into the size field; must be > 0.

        Returns:
            True when the submit button was clicked, otherwise False.
        """
        if size is None or size <= 0:
            logger.warning("开单金额无效")
            return False
        submit_xpaths = {
            1: 'x://span[normalize-space(text()) ="买入/做多"]',
            -1: 'x://span[normalize-space(text()) ="卖出/做空"]',
        }
        if marketPriceLongOrder not in submit_xpaths:
            # Previously this fell through, did nothing, and reported success.
            logger.warning(f"开单方向无效: {marketPriceLongOrder}")
            return False
        direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
        logger.info(f"执行{direction_str},金额: {size}")
        try:
            # Selecting the market tab is best-effort (it may already be active).
            self.click_safe('x://button[normalize-space(text()) ="市价"]')
            self.page.ele('x://*[@id="size_0"]').input(size)
            if not self.click_safe(submit_xpaths[marketPriceLongOrder]):
                self.ding(f"{direction_str}失败: 未能点击下单按钮", error=True)
                return False
            self.ding(f"执行{direction_str},金额: {size}")
            return True
        except Exception as e:
            logger.error(f"开单异常: {e}")
            return False

    def ding(self, msg, error=False):
        """Log ``msg`` and queue a DingTalk notification.

        Error messages are queued three times; normal messages once.
        """
        prefix = "❌三分之一策略:" if error else "🔔三分之一策略:"
        full_msg = f"{prefix}{msg}"
        if error:
            logger.error(msg)
            for _ in range(3):
                ding_executor.submit(self._send_ding_safe, full_msg)
        else:
            logger.info(msg)
            ding_executor.submit(self._send_ding_safe, full_msg)

    def _send_ding_safe(self, msg):
        """Send ``msg`` via DingTalk when the sender is available; never raises."""
        try:
            if send_dingtalk_message:
                send_dingtalk_message(msg)
        except Exception as e:
            logger.warning(f"钉钉发送失败: {e}")

    # ========================= main loop =========================
    def _prepare_page(self):
        """Close extra tabs, load the futures page and select the market tab.

        Returns False (after logging) instead of raising, so a browser
        hiccup does not terminate the trading loop.
        """
        try:
            self.close_extra_tabs()
            self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
            time.sleep(2)
            self.click_safe('x://button[normalize-space(text()) ="市价"]')
            return True
        except Exception as e:
            logger.error(f"页面准备异常: {e}")
            return False

    def _log_reverse_signal(self, rev_dir, rev_prev):
        """Log the details of a reversal signal derived from ``rev_prev``."""
        up_pct = self.upper_shadow_pct(rev_prev)
        low_pct = self.lower_shadow_pct(rev_prev)
        prev_body_upper = self.get_body_upper_edge(rev_prev)
        prev_body_lower = self.get_body_lower_edge(rev_prev)
        logger.info(f"{'='*50}")
        if rev_dir == 'long':
            logger.info(f"🔄 反手信号({rev_dir})当前K线涨到上一根实体上边={prev_body_upper:.2f}")
        else:
            logger.info(f"🔄 反手信号({rev_dir})当前K线跌到上一根实体下边={prev_body_lower:.2f}")
        logger.info(f" 上一根 上阴线涨幅={up_pct:.3f}% 下阴线跌幅={low_pct:.3f}%")
        logger.info(f"{'='*50}")

    def _close_then_open(self, order_side, size):
        """Close the current position, wait one second, then open ``order_side``."""
        self.平仓()
        time.sleep(1)
        return self.开单(marketPriceLongOrder=order_side, size=size)

    def action(self):
        """Run the strategy until the browser can no longer be opened.

        Each iteration re-attaches to the browser, reloads the trading page,
        fetches candles and position, handles a reversal signal first (when a
        position is held) and otherwise a trigger signal.

        NOTE(review): nesting was reconstructed from a copy without
        indentation; in particular the 15-second sleep is assumed to belong
        to the random page-close branch. Confirm against the original.
        NOTE(review): when the balance cannot be read the order size is 0, so
        a reversal closes the position and the re-open is rejected.
        """
        if not self.set_leverage():
            logger.error("杠杆设置失败,程序继续运行但可能下单失败")
        if not self.open_browser():
            self.ding("打开浏览器失败!", error=True)
            return
        logger.info("浏览器打开成功")
        self._prepare_page()
        logger.info("三分之一策略5分钟K线开始监测")

        while True:
            # Re-attach on every iteration: the page may have been closed at
            # the end of the previous one.
            for _ in range(5):
                if self.open_browser():
                    break
                time.sleep(5)
            else:
                self.ding("打开浏览器失败!", error=True)
                return
            if not self._prepare_page():
                time.sleep(self.check_interval)
                continue

            try:
                kline_data = self.get_klines()
                if not kline_data or len(kline_data) < 2:
                    logger.warning("K线数据不足等待重试...")
                    time.sleep(self.check_interval)
                    continue
                curr = kline_data[-1]
                curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
                if not self.get_position_status():
                    logger.warning("获取仓位失败,使用缓存")
                curr_kline_id = curr['id']

                # While holding a position, reversal signals take priority.
                if self.start != 0:
                    rev_dir, rev_prev = self.check_reverse_signal(kline_data)
                    if rev_dir:
                        balance = self.get_available_balance()
                        trade_size = (balance or 0) * self.risk_percent
                        if rev_prev is not None:
                            self._log_reverse_signal(rev_dir, rev_prev)
                        if rev_dir == 'long':
                            logger.info("📈 平空反手开多")
                            self._close_then_open(1, trade_size)
                        else:
                            logger.info("📉 平多反手开空")
                            self._close_then_open(-1, trade_size)
                        self.last_trigger_kline_id = curr_kline_id
                        self.last_trigger_direction = rev_dir
                        self.get_position_status()
                        time.sleep(self.check_interval)
                        continue

                direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
                if direction:
                    curr_kline_id = curr_kline['id']
                    # Signal agrees with the held position: record it and do nothing.
                    if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1):
                        self.last_trigger_kline_id = curr_kline_id
                        self.last_trigger_direction = direction
                        time.sleep(self.check_interval)
                        continue
                    prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
                    prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
                    prev_body = self.get_body_size(valid_prev)
                    logger.info(f"{'='*50}")
                    logger.info(f"🚨 检测到{direction}信号 触发价={trigger_price:.2f}")
                    logger.info(f" 有效前一根[{prev_time}] {prev_type} 实体={prev_body:.2f}")
                    logger.info(f" 当前5分钟K线 O={curr_kline['open']:.2f} H={curr_kline['high']:.2f} L={curr_kline['low']:.2f} C={curr_kline['close']:.2f}")
                    logger.info(f" 当前持仓: {self.start} (1=多 -1=空 0=无)")
                    balance = self.get_available_balance()
                    trade_size = (balance or 0) * self.risk_percent
                    executed = False
                    if direction == "long":
                        if self.start == -1:
                            logger.info("📈 平空反手开多")
                            self._close_then_open(1, trade_size)
                            executed = True
                        elif self.start == 0:
                            logger.info("📈 无仓位开多")
                            self.开单(marketPriceLongOrder=1, size=trade_size)
                            executed = True
                    elif direction == "short":
                        if self.start == 1:
                            logger.info("📉 平多反手开空")
                            self._close_then_open(-1, trade_size)
                            executed = True
                        elif self.start == 0:
                            logger.info("📉 无仓位开空")
                            self.开单(marketPriceLongOrder=-1, size=trade_size)
                            executed = True
                    self.last_trigger_kline_id = curr_kline_id
                    self.last_trigger_direction = direction
                    if executed:
                        self.get_position_status()
                    logger.info(f"{'='*50}")
                else:
                    logger.debug(f"[{curr_time_str}] O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
                time.sleep(self.check_interval)
            except Exception as e:
                logger.error(f"主循环异常: {e}")
                time.sleep(self.check_interval)

            time.sleep(3)
            # Occasionally close the page; the next iteration re-opens it.
            if random.randint(1, 10) > 7:
                try:
                    self.page.close()
                except Exception as e:
                    logger.warning(f"关闭页面异常: {e}")
                time.sleep(15)
# Script entry point: run the strategy for one browser profile and always
# drain the notification pool before exiting.
if __name__ == '__main__':
    profile_id = "f2320f57e24c45529a009e1541e25961"
    try:
        strategy = BitmartOneThirdStrategy(bit_id=profile_id)
        strategy.action()
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    finally:
        # wait=True lets queued DingTalk messages finish sending.
        ding_executor.shutdown(wait=True)