代码结构优化
This commit is contained in:
516
bitmart/五分之一策略-开盘价版-3分钟交易.py
Normal file
516
bitmart/五分之一策略-开盘价版-3分钟交易.py
Normal file
@@ -0,0 +1,516 @@
|
||||
"""
|
||||
BitMart 五分之一策略(开盘价版)— 3分钟K线交易
|
||||
|
||||
策略规则(与 1111 一致):
|
||||
基于前一根有效 K 线(实体 ≥ 0.1):
|
||||
做多触发价 = 当前K线开盘价 + 实体/5(收盘价向上 1/5 实体)
|
||||
做空触发价 = 当前K线开盘价 - 实体/5(收盘价向下 1/5 实体)
|
||||
|
||||
反手(若已有持仓):
|
||||
持空反手做多:价格涨到 开仓价 + 前一根实体/5
|
||||
持多反手做空:价格跌到 开仓价 - 前一根实体/5
|
||||
|
||||
同根K线内多空都触及时,用开盘价与触发价距离判断先后。
|
||||
"""
|
||||
import random
|
||||
import time
|
||||
import datetime
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from tqdm import tqdm
|
||||
from loguru import logger
|
||||
from bit_tools import openBrowser
|
||||
from DrissionPage import ChromiumPage
|
||||
from DrissionPage import ChromiumOptions
|
||||
|
||||
from bitmart.api_contract import APIContract
|
||||
|
||||
try:
|
||||
from 交易.tools import send_dingtalk_message
|
||||
except ImportError:
|
||||
send_dingtalk_message = None
|
||||
|
||||
ding_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="dingtalk")
|
||||
|
||||
|
||||
class BitmartOneFifthOpenStrategy:
|
||||
"""五分之一策略(开盘价版):触发价 = 当前K线开盘价 ± 实体/5,3分钟K线"""
|
||||
|
||||
def __init__(self, bit_id):
|
||||
self.page: ChromiumPage | None = None
|
||||
|
||||
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
|
||||
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
|
||||
self.memo = "合约交易"
|
||||
|
||||
self.contract_symbol = "ETHUSDT"
|
||||
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
|
||||
|
||||
self.start = 0 # 持仓: -1 空, 0 无, 1 多
|
||||
self.direction = None
|
||||
self.pbar = tqdm(total=3, desc="等待K线", ncols=80)
|
||||
self.last_kline_time = None
|
||||
|
||||
self.leverage = "100"
|
||||
self.open_type = "cross"
|
||||
self.risk_percent = 0.01
|
||||
|
||||
self.open_avg_price = None
|
||||
self.current_amount = None
|
||||
self.bit_id = bit_id
|
||||
|
||||
self.min_body_size = 0.1 # 前一根有效K线实体下限
|
||||
self.kline_step = 3 # 3分钟K线
|
||||
self.check_interval = 3
|
||||
self.last_trigger_kline_id = None
|
||||
self.last_trigger_direction = None
|
||||
self.last_trade_kline_id = None
|
||||
|
||||
# ========================= 策略核心(开盘价版)=========================
|
||||
|
||||
def get_body_size(self, candle):
|
||||
return abs(float(candle['open']) - float(candle['close']))
|
||||
|
||||
def find_valid_prev_bar(self, all_data, current_idx):
|
||||
"""找到当前K线之前最近一根实体≥min_body_size的K线"""
|
||||
if current_idx <= 0:
|
||||
return None, None
|
||||
for i in range(current_idx - 1, -1, -1):
|
||||
prev = all_data[i]
|
||||
if self.get_body_size(prev) >= self.min_body_size:
|
||||
return i, prev
|
||||
return None, None
|
||||
|
||||
def get_trigger_levels_by_open(self, curr_open, prev_bar):
|
||||
"""
|
||||
做多触发价 = 当前K线开盘价 + 实体/5
|
||||
做空触发价 = 当前K线开盘价 - 实体/5
|
||||
实体来自前一根有效K线。
|
||||
"""
|
||||
body = self.get_body_size(prev_bar)
|
||||
if body < 0.001:
|
||||
return None, None
|
||||
curr_open_f = float(curr_open)
|
||||
long_trigger = curr_open_f + body / 5
|
||||
short_trigger = curr_open_f - body / 5
|
||||
return long_trigger, short_trigger
|
||||
|
||||
def check_realtime_trigger(self, kline_data):
|
||||
"""
|
||||
检查当前3分钟K线是否触发信号(开盘价版)。
|
||||
做多触发价 = 当前K线开盘价 + 前一根实体/5
|
||||
做空触发价 = 当前K线开盘价 - 前一根实体/5
|
||||
若同根多空都触发,用开盘价与触发价距离判断先后。
|
||||
返回:(方向, 触发价格, 有效前一根K线, 当前K线) 或 (None, None, None, None)
|
||||
"""
|
||||
if len(kline_data) < 2:
|
||||
return None, None, None, None
|
||||
|
||||
curr = kline_data[-1]
|
||||
curr_kline_id = curr['id']
|
||||
curr_open = float(curr['open'])
|
||||
curr_high = float(curr['high'])
|
||||
curr_low = float(curr['low'])
|
||||
|
||||
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
|
||||
if prev is None:
|
||||
return None, None, None, None
|
||||
|
||||
long_trigger, short_trigger = self.get_trigger_levels_by_open(curr_open, prev)
|
||||
if long_trigger is None:
|
||||
return None, None, None, None
|
||||
|
||||
long_triggered = curr_high >= long_trigger
|
||||
short_triggered = curr_low <= short_trigger
|
||||
both_triggered = long_triggered and short_triggered
|
||||
|
||||
direction = None
|
||||
trigger_price = None
|
||||
|
||||
if both_triggered:
|
||||
dist_to_long = abs(long_trigger - curr_open)
|
||||
dist_to_short = abs(short_trigger - curr_open)
|
||||
if dist_to_short <= dist_to_long:
|
||||
direction = 'short'
|
||||
trigger_price = short_trigger
|
||||
else:
|
||||
direction = 'long'
|
||||
trigger_price = long_trigger
|
||||
elif short_triggered:
|
||||
direction = 'short'
|
||||
trigger_price = short_trigger
|
||||
elif long_triggered:
|
||||
direction = 'long'
|
||||
trigger_price = long_trigger
|
||||
|
||||
if direction is None:
|
||||
return None, None, None, None
|
||||
|
||||
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
|
||||
return None, None, None, None
|
||||
|
||||
return direction, trigger_price, prev, curr
|
||||
|
||||
def check_reverse_trigger(self, kline_data):
|
||||
"""
|
||||
反手检测:持空反手做多 = 现价 >= 开仓价+前一根实体/5
|
||||
持多反手做空 = 现价 <= 开仓价-前一根实体/5
|
||||
使用当前价(最新K线收盘或 get_current_price)与 开仓价±实体/5 比较。
|
||||
"""
|
||||
if self.start == 0 or self.open_avg_price is None:
|
||||
return None, None
|
||||
if len(kline_data) < 2:
|
||||
return None, None
|
||||
|
||||
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
|
||||
if prev is None:
|
||||
return None, None
|
||||
|
||||
body = self.get_body_size(prev)
|
||||
if body < 0.001:
|
||||
return None, None
|
||||
|
||||
open_price_f = float(self.open_avg_price)
|
||||
reverse_long_price = open_price_f + body / 5 # 持空反手做多线
|
||||
reverse_short_price = open_price_f - body / 5 # 持多反手做空线
|
||||
|
||||
curr = kline_data[-1]
|
||||
curr_high = float(curr['high'])
|
||||
curr_low = float(curr['low'])
|
||||
|
||||
if self.start == -1 and curr_high >= reverse_long_price:
|
||||
return 'long', reverse_long_price
|
||||
if self.start == 1 and curr_low <= reverse_short_price:
|
||||
return 'short', reverse_short_price
|
||||
return None, None
|
||||
|
||||
def is_bullish(self, c):
|
||||
return float(c['close']) > float(c['open'])
|
||||
|
||||
# ========================= API =========================
|
||||
|
||||
def get_klines(self):
|
||||
"""获取最近3分钟K线"""
|
||||
try:
|
||||
end_time = int(time.time())
|
||||
response = self.contractAPI.get_kline(
|
||||
contract_symbol=self.contract_symbol,
|
||||
step=self.kline_step,
|
||||
start_time=end_time - 3600 * 3,
|
||||
end_time=end_time
|
||||
)[0]["data"]
|
||||
formatted = []
|
||||
for k in response:
|
||||
formatted.append({
|
||||
'id': int(k["timestamp"]),
|
||||
'open': float(k["open_price"]),
|
||||
'high': float(k["high_price"]),
|
||||
'low': float(k["low_price"]),
|
||||
'close': float(k["close_price"])
|
||||
})
|
||||
formatted.sort(key=lambda x: x['id'])
|
||||
return formatted
|
||||
except Exception as e:
|
||||
if "429" in str(e) or "too many requests" in str(e).lower():
|
||||
logger.warning(f"API限流,等待60秒: {e}")
|
||||
time.sleep(60)
|
||||
else:
|
||||
logger.error(f"获取K线异常: {e}")
|
||||
self.ding("获取K线异常", error=True)
|
||||
return None
|
||||
|
||||
def get_current_price(self):
|
||||
try:
|
||||
end_time = int(time.time())
|
||||
response = self.contractAPI.get_kline(
|
||||
contract_symbol=self.contract_symbol,
|
||||
step=1,
|
||||
start_time=end_time - 3600 * 3,
|
||||
end_time=end_time
|
||||
)[0]
|
||||
if response.get('code') == 1000:
|
||||
return float(response['data'][-1]["close_price"])
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"获取价格异常: {e}")
|
||||
return None
|
||||
|
||||
def get_available_balance(self):
|
||||
try:
|
||||
response = self.contractAPI.get_assets_detail()[0]
|
||||
if response.get('code') == 1000:
|
||||
data = response.get('data')
|
||||
if isinstance(data, dict):
|
||||
return float(data.get('available_balance', 0))
|
||||
if isinstance(data, list):
|
||||
for asset in data:
|
||||
if asset.get('currency') == 'USDT':
|
||||
return float(asset.get('available_balance', 0))
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"余额查询异常: {e}")
|
||||
return None
|
||||
|
||||
def get_position_status(self):
|
||||
try:
|
||||
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
|
||||
if response.get('code') == 1000:
|
||||
positions = response.get('data') or []
|
||||
if not positions:
|
||||
self.start = 0
|
||||
self.open_avg_price = None
|
||||
self.current_amount = None
|
||||
self.position_cross = None
|
||||
return True
|
||||
self.start = 1 if positions[0].get('position_type') == 1 else -1
|
||||
self.open_avg_price = positions[0].get('open_avg_price')
|
||||
self.current_amount = positions[0].get('current_amount')
|
||||
self.position_cross = positions[0].get("position_cross")
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"持仓查询异常: {e}")
|
||||
return False
|
||||
|
||||
def set_leverage(self):
|
||||
try:
|
||||
response = self.contractAPI.post_submit_leverage(
|
||||
contract_symbol=self.contract_symbol,
|
||||
leverage=self.leverage,
|
||||
open_type=self.open_type
|
||||
)[0]
|
||||
if response.get('code') == 1000:
|
||||
logger.success(f"全仓 {self.leverage}x 杠杆设置成功")
|
||||
return True
|
||||
logger.error(f"杠杆设置失败: {response}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"设置杠杆异常: {e}")
|
||||
return False
|
||||
|
||||
# ========================= 浏览器 =========================
|
||||
|
||||
def open_browser(self):
|
||||
try:
|
||||
bit_port = openBrowser(id=self.bit_id)
|
||||
co = ChromiumOptions()
|
||||
co.set_local_port(port=bit_port)
|
||||
self.page = ChromiumPage(addr_or_opts=co)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def close_extra_tabs(self):
|
||||
try:
|
||||
for idx, tab in enumerate(self.page.get_tabs()):
|
||||
if idx > 0:
|
||||
tab.close()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def click_safe(self, xpath, sleep=0.5):
|
||||
try:
|
||||
ele = self.page.ele(xpath)
|
||||
if not ele:
|
||||
return False
|
||||
ele.scroll.to_see(center=True)
|
||||
time.sleep(sleep)
|
||||
ele.click(by_js=True)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def 平仓(self):
|
||||
logger.info("执行平仓...")
|
||||
self.click_safe('x://span[normalize-space(text()) ="市价"]')
|
||||
time.sleep(0.5)
|
||||
self.ding("执行平仓")
|
||||
|
||||
def 开单(self, marketPriceLongOrder=0, size=None):
|
||||
if size is None or size <= 0:
|
||||
logger.warning("开单金额无效")
|
||||
return False
|
||||
direction_str = "做多" if marketPriceLongOrder == 1 else "做空"
|
||||
logger.info(f"执行{direction_str},金额: {size}")
|
||||
try:
|
||||
if marketPriceLongOrder == -1:
|
||||
self.click_safe('x://button[normalize-space(text()) ="市价"]')
|
||||
self.page.ele('x://*[@id="size_0"]').input(size)
|
||||
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
|
||||
elif marketPriceLongOrder == 1:
|
||||
self.click_safe('x://button[normalize-space(text()) ="市价"]')
|
||||
self.page.ele('x://*[@id="size_0"]').input(size)
|
||||
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
|
||||
self.ding(f"执行{direction_str},金额: {size}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"开单异常: {e}")
|
||||
return False
|
||||
|
||||
def ding(self, msg, error=False):
|
||||
prefix = "❌五分之一开盘价版:" if error else "🔔五分之一开盘价版:"
|
||||
full_msg = f"{prefix}{msg}"
|
||||
if error:
|
||||
logger.error(msg)
|
||||
for _ in range(3):
|
||||
ding_executor.submit(self._send_ding_safe, full_msg)
|
||||
else:
|
||||
logger.info(msg)
|
||||
ding_executor.submit(self._send_ding_safe, full_msg)
|
||||
|
||||
def _send_ding_safe(self, msg):
|
||||
try:
|
||||
if send_dingtalk_message:
|
||||
send_dingtalk_message(msg)
|
||||
except Exception as e:
|
||||
logger.warning(f"钉钉发送失败: {e}")
|
||||
|
||||
# ========================= 主循环 =========================
|
||||
|
||||
def action(self):
|
||||
if not self.set_leverage():
|
||||
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
|
||||
if not self.open_browser():
|
||||
self.ding("打开浏览器失败!", error=True)
|
||||
return
|
||||
logger.info("浏览器打开成功")
|
||||
self.close_extra_tabs()
|
||||
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
|
||||
time.sleep(2)
|
||||
self.click_safe('x://button[normalize-space(text()) ="市价"]')
|
||||
logger.info("五分之一策略(开盘价版,3分钟K线)开始监测")
|
||||
|
||||
last_report_time = 0
|
||||
report_interval = 300
|
||||
|
||||
while True:
|
||||
for _ in range(5):
|
||||
if self.open_browser():
|
||||
break
|
||||
time.sleep(5)
|
||||
else:
|
||||
self.ding("打开浏览器失败!", error=True)
|
||||
return
|
||||
|
||||
self.close_extra_tabs()
|
||||
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
|
||||
time.sleep(2)
|
||||
self.click_safe('x://button[normalize-space(text()) ="市价"]')
|
||||
|
||||
try:
|
||||
kline_data = self.get_klines()
|
||||
if not kline_data or len(kline_data) < 3:
|
||||
logger.warning("K线数据不足,等待重试...")
|
||||
time.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
curr = kline_data[-1]
|
||||
curr_time_str = datetime.datetime.fromtimestamp(curr['id']).strftime('%H:%M:%S')
|
||||
|
||||
if not self.get_position_status():
|
||||
logger.warning("获取仓位失败,使用缓存")
|
||||
|
||||
# 1) 反手检测:有持仓时先看是否触发反手
|
||||
rev_dir, rev_price = self.check_reverse_trigger(kline_data)
|
||||
if rev_dir:
|
||||
curr_kline_id = curr['id']
|
||||
if self.last_trade_kline_id != curr_kline_id:
|
||||
prev_idx, prev_bar = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
|
||||
if prev_bar is not None:
|
||||
balance = self.get_available_balance()
|
||||
trade_size = (balance or 0) * self.risk_percent
|
||||
logger.info(f"反手信号: {rev_dir} 触发价={rev_price:.2f}")
|
||||
if rev_dir == 'long':
|
||||
self.平仓()
|
||||
time.sleep(1)
|
||||
self.开单(marketPriceLongOrder=1, size=trade_size)
|
||||
else:
|
||||
self.平仓()
|
||||
time.sleep(1)
|
||||
self.开单(marketPriceLongOrder=-1, size=trade_size)
|
||||
self.last_trade_kline_id = curr_kline_id
|
||||
self.last_trigger_kline_id = curr_kline_id
|
||||
self.last_trigger_direction = rev_dir
|
||||
self.get_position_status()
|
||||
time.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
# 2) 常规开仓/反手:基于 当前K线开盘价 ± 实体/5
|
||||
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
|
||||
|
||||
if direction:
|
||||
curr_kline_id = curr_kline['id']
|
||||
if self.last_trade_kline_id == curr_kline_id:
|
||||
self.last_trigger_kline_id = curr_kline_id
|
||||
self.last_trigger_direction = direction
|
||||
time.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
prev_time = datetime.datetime.fromtimestamp(valid_prev['id']).strftime('%H:%M')
|
||||
prev_type = "阳线" if self.is_bullish(valid_prev) else "阴线"
|
||||
prev_body = self.get_body_size(valid_prev)
|
||||
|
||||
logger.info(f"{'='*50}")
|
||||
logger.info(f"🚨 检测到{direction}信号 触发价={trigger_price:.2f}")
|
||||
logger.info(f" 有效前一根[{prev_time}] {prev_type} 实体={prev_body:.2f}")
|
||||
logger.info(f" 当前3分钟K线 O={curr_kline['open']:.2f} H={curr_kline['high']:.2f} L={curr_kline['low']:.2f}")
|
||||
logger.info(f" 当前持仓: {self.start} (1=多 -1=空 0=无)")
|
||||
|
||||
balance = self.get_available_balance()
|
||||
trade_size = (balance or 0) * self.risk_percent
|
||||
executed = False
|
||||
|
||||
if direction == "long":
|
||||
if self.start == -1:
|
||||
logger.info("📈 平空反手开多")
|
||||
self.平仓()
|
||||
time.sleep(1)
|
||||
self.开单(marketPriceLongOrder=1, size=trade_size)
|
||||
executed = True
|
||||
elif self.start == 0:
|
||||
logger.info("📈 无仓位开多")
|
||||
self.开单(marketPriceLongOrder=1, size=trade_size)
|
||||
executed = True
|
||||
elif direction == "short":
|
||||
if self.start == 1:
|
||||
logger.info("📉 平多反手开空")
|
||||
self.平仓()
|
||||
time.sleep(1)
|
||||
self.开单(marketPriceLongOrder=-1, size=trade_size)
|
||||
executed = True
|
||||
elif self.start == 0:
|
||||
logger.info("📉 无仓位开空")
|
||||
self.开单(marketPriceLongOrder=-1, size=trade_size)
|
||||
executed = True
|
||||
|
||||
self.last_trigger_kline_id = curr_kline_id
|
||||
self.last_trigger_direction = direction
|
||||
if executed:
|
||||
self.last_trade_kline_id = curr_kline_id
|
||||
self.get_position_status()
|
||||
logger.info(f"{'='*50}")
|
||||
else:
|
||||
logger.debug(f"[{curr_time_str}] O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
|
||||
|
||||
if time.time() - last_report_time >= report_interval:
|
||||
last_report_time = time.time()
|
||||
time.sleep(self.check_interval)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"主循环异常: {e}")
|
||||
time.sleep(self.check_interval)
|
||||
time.sleep(3)
|
||||
|
||||
if random.randint(1, 10) > 7:
|
||||
self.page.close()
|
||||
time.sleep(15)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
BitmartOneFifthOpenStrategy(bit_id="f2320f57e24c45529a009e1541e25961").action()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("程序被用户中断")
|
||||
finally:
|
||||
ding_executor.shutdown(wait=True)
|
||||
Reference in New Issue
Block a user