Files
lm_code/交易/bitmart-趋势策略交易.py
2026-01-21 11:32:48 +08:00

461 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import datetime
from tqdm import tqdm
from loguru import logger
from bit_tools import openBrowser
from DrissionPage import ChromiumPage
from DrissionPage import ChromiumOptions
from bitmart.api_contract import APIContract
from 交易.tools import send_dingtalk_message
class BitmartFuturesTransaction:
def __init__(self, bit_id):
self.page: ChromiumPage | None = None
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=30, desc="等待K线", ncols=80)
self.last_kline_time = None
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式(你的“成本开仓”需求)
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
self.bit_id = bit_id
def get_klines(self):
"""获取最近3根30分钟K线step=30"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新3根
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=30, # 30分钟
start_time=end_time - 3600 * 10, # 取最近10小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted # 最近3根: kline_1 (最老), kline_2, kline_3 (最新)
except Exception as e:
error_msg = str(e)
# 检查是否是429限流错误
if "429" in error_msg or "too many requests" in error_msg.lower():
logger.warning(f"API限流等待60秒后重试: {e}")
time.sleep(60) # 等待60秒后重试
else:
logger.error(f"获取K线异常: {e}")
self.ding(msg="获取K线异常", error=True)
return None
def get_current_price(self):
"""获取当前最新价格,用于计算张数"""
try:
end_time = int(time.time())
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=1, # 1分钟
start_time=end_time - 3600 * 3, # 取最近10小时
end_time=end_time
)[0]
if response['code'] == 1000:
return float(response['data'][-1]["close_price"])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
# 获取当前持仓方向
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
self.open_avg_price = None
self.current_amount = None
self.position_cross = None
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
self.position_cross = positions[0]["position_cross"]
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
return False
# 设置杠杆和全仓
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def openBrowser(self):
"""打开 TGE 对应浏览器实例"""
try:
bit_port = openBrowser(id=self.bit_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
return True
except:
return False
def take_over_browser(self):
"""接管浏览器"""
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
return True
except:
return False
def close_extra_tabs(self):
"""关闭多余 tab"""
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except:
return False
def click_safe(self, xpath, sleep=0.5):
"""安全点击"""
try:
ele = self.page.ele(xpath)
if not ele:
return False
ele.scroll.to_see(center=True)
time.sleep(sleep)
ele.click(by_js=True)
return True
except:
return False
def 平仓(self):
self.click_safe('x://span[normalize-space(text()) ="市价"]')
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
"""
marketPriceLongOrder 市价最多或者做空1是做多-1是做空
limitPriceShortOrder 限价最多或者做空
"""
if marketPriceLongOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif marketPriceLongOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.page.ele('x://*[@id="size_0"]').input(size)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
if limitPriceShortOrder == -1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
elif limitPriceShortOrder == 1:
self.click_safe('x://button[normalize-space(text()) ="限价"]')
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
time.sleep(1)
self.page.ele('x://*[@id="size_0"]').input(1)
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
def ding(self, msg, error=False):
"""统一消息格式"""
prefix = "❌bitmart" if error else "🔔bitmart"
if error:
logger.error(msg)
for i in range(10):
send_dingtalk_message(f"{prefix}{msg}")
else:
logger.info(msg)
send_dingtalk_message(f"{prefix}{msg}")
def close_extra_tabs_in_browser(self):
try:
for _, i in enumerate(self.page.get_tabs()):
if _ == 0:
continue
i.close()
return True
except:
pass
return False
def get_now_time(self):
# 获取当前时间戳
current_timestamp = time.time()
# 将当前时间戳转换为 datetime 对象
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的整点或 30 分时刻
if current_datetime.minute < 30:
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
else:
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
# 将目标 datetime 对象转换为时间戳
target_timestamp = target_datetime.timestamp()
return int(target_timestamp)
def is_bullish(self, c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(self, c): # 阴线
return float(c['close']) < float(c['open'])
def check_signal(self, prev, curr):
"""
包住形态信号判定(优化版):
只看两种信号,严格按照收盘价与开盘价的比较:
1. 阳包阴(涨包跌,前跌后涨)-> 做多:
- 前一根是跌阴线close < open
- 后一根是涨阳线close > open
- 且:涨的收盘价 > 跌的开盘价curr['close'] > prev['open']
2. 阴包阳(跌包涨,前涨后跌)-> 做空:
- 前一根是涨阳线close > open
- 后一根是跌阴线close < open
- 且:跌的收盘价 < 涨的开盘价curr['close'] < prev['open']
"""
p_open = float(prev['open'])
c_close = float(curr['close'])
# 阳包阴(涨包跌,前跌后涨) -> 做多:涨的收盘价 > 跌的开盘价
if self.is_bearish(prev) and self.is_bullish(curr) and c_close > p_open:
return "long", "bear_bull_engulf"
# 阴包阳(跌包涨,前涨后跌) -> 做空:跌的收盘价 < 涨的开盘价
if self.is_bullish(prev) and self.is_bearish(curr) and c_close < p_open:
return "short", "bull_bear_engulf"
return None, None
def action(self):
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
return
# 1. 打开浏览器
if not self.openBrowser():
self.ding("打开 TGE 失败!", error=True)
return
logger.info("TGE 端口获取成功")
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功!!!')
else:
logger.info('关闭多余标签页失败!!!')
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
self.click_safe('x://button[normalize-space(text()) ="市价"]')
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc进度条说明ncols长度
self.time_start = None # 时间状态 避免同一个时段,发生太多消息
while True:
# 获取当前时间
current_time = time.localtime()
current_minute = current_time.tm_min
if current_minute < 30:
self.pbar.n = current_minute
self.pbar.refresh()
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
if self.time_start == self.get_now_time():
time.sleep(5)
continue
new_price_datas = self.get_klines()
if not new_price_datas:
logger.info("获取最新价格有问题!!!")
time.sleep(5) # 等待一段时间后重试
continue # 跳过本次循环避免对None进行排序操作
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
# 判断抓取的数据是否正确
if self.get_now_time() != self.kline_3["id"]:
continue
self.time_start = self.get_now_time()
if self.get_position_status():
logger.info("获取仓位信息成功!!!")
else:
logger.info("获取仓位信息失败!!!")
self.ding(msg="获取仓位信息失败!!!", error=True)
continue
if self.start == 1:
if self.is_bearish(self.kline_1) and self.is_bearish(self.kline_2):
self.平仓()
elif self.start == -1:
if self.is_bullish(self.kline_1) and self.is_bullish(self.kline_2):
self.平仓()
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
if self.direction == "long":
if self.start == -1:
self.平仓()
self.开单(marketPriceLongOrder=1, size=self.get_available_balance() * self.risk_percent)
elif self.start == 0:
self.开单(marketPriceLongOrder=1, size=self.get_available_balance() * self.risk_percent)
if self.direction == "short":
if self.start == 1:
self.平仓()
self.开单(marketPriceLongOrder=-1, size=self.get_available_balance() * self.risk_percent)
elif self.start == 0:
self.开单(marketPriceLongOrder=-1, size=self.get_available_balance() * self.risk_percent)
# ===================================================================================================
# 发送持仓信息消息
msg = None
current_price = float(self.kline_3["close"])
balance = self.get_available_balance()
self.balance = balance if balance is not None else 0.0
if self.start:
# 持仓方向,开仓价格,现价,持仓量,盈亏,当前价值
# 1. 确保所有关键数据为 float 类型BitMart API 常返回字符串)
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
current_amount = float(self.current_amount) if self.current_amount else 0.0
position_cross = float(self.position_cross) if hasattr(self, 'position_cross') and self.position_cross else 0.0
# 2. 计算浮动盈亏USDT
if self.start == 1: # 多头
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
elif self.start == -1: # 空头
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
else: # 无仓
unrealized_pnl = 0.0
# 3. 计算收益率(可选,更直观)
if self.start != 0 and open_avg_price > 0:
if self.start == 1:
pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
else:
pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000
rate_str = f" ({pnl_rate:+.2f}%)"
else:
rate_str = ""
# 4. 格式化输出字符串
direction_str = "" if self.start == -1 else ("" if self.start == 1 else "")
pnl_str = f"{unrealized_pnl:+.2f} USDT"
# 6. 最终消息
msg = (
f"【BitMart {self.contract_symbol} 永续】\n"
f"当前方向:{direction_str}\n"
f"当前现价:{current_price:.2f} USDT\n"
f"开仓均价:{open_avg_price:.2f} USDT\n"
f"持仓量(eht){float(current_amount) / 1000} eth\n"
f"持仓量(usdt){position_cross} usdt\n"
f"浮动盈亏:{pnl_str}{rate_str}\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
else:
msg = (
f"【BitMart {self.contract_symbol} 永续】\n"
f"当前方向:无\n"
f"当前现价:{current_price:.2f} USDT\n"
f"账户可用余额:{self.balance:.2f} usdt"
)
# 7. 发送钉钉消息
self.ding(msg=msg)
self.pbar.reset() # 重置进度条
if __name__ == '__main__':
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()