Files
lm_code/交易/bitmart_api交易.py
2025-12-17 18:10:48 +08:00

377 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import uuid
import datetime
from tqdm import tqdm
from loguru import logger
from bitmart.api_contract import APIContract
from bitmart.lib.cloud_exceptions import APIException
from 交易.tools import send_dingtalk_message
class BitmartFuturesTransaction:
    """Polling trading bot for one BitMart futures contract (ETHUSDT).

    Every 10 seconds the bot fetches the three most recent 30-minute candles.
    When a new candle appears it:

    1. refreshes the position state from the exchange,
    2. closes the position if the two older candles both moved against it,
    3. looks for a two-candle reversal signal and opens / reverses a position,
    4. sends a status report through DingTalk.

    ``self.start`` encodes the position state: -1 short, 0 flat, 1 long.
    """

    # SECURITY: these credentials are committed to source control. They are
    # kept only so that ``BitmartFuturesTransaction()`` keeps working without
    # configuration; rotate them and supply real ones through the constructor
    # or the BITMART_API_KEY / BITMART_SECRET_KEY environment variables.
    _DEFAULT_API_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
    _DEFAULT_SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
    _DEFAULT_MEMO = "合约交易"

    # Order ``side`` codes as used throughout this file.
    SIDE_OPEN_LONG = 1
    SIDE_CLOSE_SHORT = 2
    SIDE_CLOSE_LONG = 3
    SIDE_OPEN_SHORT = 4

    # Oversized quantity used by this file to request "close everything".
    CLOSE_ALL_SIZE = 999999

    # One ETHUSDT contract is treated as roughly 0.001 ETH.
    CONTRACT_SIZE = 0.001

    def __init__(self, api_key=None, secret_key=None, memo=None):
        """Create the API client and initialise the trading state.

        Args:
            api_key: BitMart API key. Falls back to ``BITMART_API_KEY`` and
                then to the value embedded in this file.
            secret_key: BitMart secret. Falls back to ``BITMART_SECRET_KEY``
                and then to the value embedded in this file.
            memo: API memo. Falls back to ``BITMART_API_MEMO`` and then to
                the value embedded in this file.
        """
        import os  # local import: the file-level import block is left untouched

        self.api_key = api_key or os.environ.get("BITMART_API_KEY") or self._DEFAULT_API_KEY
        self.secret_key = (
            secret_key or os.environ.get("BITMART_SECRET_KEY") or self._DEFAULT_SECRET_KEY
        )
        self.memo = memo or os.environ.get("BITMART_API_MEMO") or self._DEFAULT_MEMO
        self.contract_symbol = "ETHUSDT"
        self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
        self.start = 0  # position state: -1 short, 0 flat, 1 long
        self.direction = None  # last detected signal: "long", "short" or None
        self.pbar = tqdm(total=30, desc="等待30分钟K线", ncols=80)
        self.last_kline_time = None  # id of the newest candle already processed
        self.leverage = "100"  # high leverage (cross margin allows larger positions)
        self.open_type = "cross"  # cross-margin mode
        self.risk_percent = 0.01  # fraction of the available balance used as margin
        self.open_avg_price = None  # average entry price of the open position
        self.current_amount = None  # position size in contracts
        self.position_cross = None  # "position_cross" field of the open position
        self.balance = None  # last fetched available balance

    @staticmethod
    def is_bullish(c):
        """Return True when candle ``c`` closed above its open."""
        return float(c['close']) > float(c['open'])

    @staticmethod
    def is_bearish(c):
        """Return True when candle ``c`` closed below its open."""
        return float(c['close']) < float(c['open'])

    def ding(self, msg, error=False):
        """Send ``msg`` to DingTalk with a uniform prefix.

        Error messages are sent ten times in a row (to make them hard to
        miss); normal messages are sent once.
        """
        prefix = "❌bitmart" if error else "🔔bitmart"
        repeats = 10 if error else 1
        for _ in range(repeats):
            send_dingtalk_message(f"{prefix}{msg}")

    def set_leverage(self):
        """Configure cross margin and the leverage at program start.

        Returns:
            True when the exchange answers with code 1000, otherwise False
            (including when the request raises).
        """
        try:
            response = self.contractAPI.post_submit_leverage(
                contract_symbol=self.contract_symbol,
                leverage=self.leverage,
                open_type=self.open_type
            )[0]
            if response['code'] == 1000:
                logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
                return True
            logger.error(f"杠杆设置失败: {response}")
            return False
        except Exception as e:
            logger.error(f"设置杠杆异常: {e}")
            return False

    def check_signal(self, prev, curr):
        """Detect a simplified engulfing pattern on two consecutive candles.

        Returns:
            "long" when a bearish ``prev`` is followed by a bullish ``curr``
            closing at or above ``prev``'s open; "short" for the mirrored
            case; None otherwise.
        """
        if self.is_bullish(curr) and self.is_bearish(prev) and float(curr['close']) >= float(prev['open']):
            return "long"
        if self.is_bearish(curr) and self.is_bullish(prev) and float(curr['close']) <= float(prev['open']):
            return "short"
        return None

    def get_klines(self):
        """Fetch the three most recent 30-minute candles (step=30).

        Returns:
            Up to three dicts with keys ``id``, ``open``, ``high``, ``low``
            and ``close``, ordered oldest to newest, or None when the request
            or parsing fails (an error notification is sent in that case).
        """
        try:
            end_time = int(time.time())
            # Ask for the last 10 hours so the newest three candles are included.
            response = self.contractAPI.get_kline(
                contract_symbol=self.contract_symbol,
                step=30,
                start_time=end_time - 3600 * 10,
                end_time=end_time
            )[0]["data"]
            formatted = [
                {
                    'id': int(k["timestamp"]),
                    'open': float(k["open_price"]),
                    'high': float(k["high_price"]),
                    'low': float(k["low_price"]),
                    'close': float(k["close_price"]),
                }
                for k in response
            ]
            formatted.sort(key=lambda x: x['id'])
            return formatted[-3:]  # kline_1 (oldest), kline_2, kline_3 (newest)
        except Exception as e:
            logger.error(f"获取K线异常: {e}")
            self.ding(error=True, msg="获取K线异常")
            return None

    def get_current_price(self):
        """Return the close of the most recent candle, used for sizing.

        The candle with the largest timestamp is used. The previous
        implementation took ``data[0]``, i.e. the first entry of a 10-hour
        window, which is not the latest price.

        Returns:
            The latest close as a float, or None when the response code is
            not 1000, no candles are returned, or the request fails.
        """
        try:
            end_time = int(time.time())
            response = self.contractAPI.get_kline(
                contract_symbol=self.contract_symbol,
                step=30,
                start_time=end_time - 3600 * 10,
                end_time=end_time
            )[0]
            if response['code'] != 1000:
                return None
            candles = response['data']
            if not candles:
                return None
            # Do not rely on the response order; pick the newest explicitly.
            latest = max(candles, key=lambda k: int(k["timestamp"]))
            return float(latest["close_price"])
        except Exception as e:
            logger.error(f"获取价格异常: {e}")
            return None

    def get_available_balance(self):
        """Return the available USDT balance of the futures account.

        Handles both a dict payload and a list-of-assets payload.

        Returns:
            The balance as a float, or None when it cannot be determined.
        """
        try:
            response = self.contractAPI.get_assets_detail()[0]
            if response['code'] == 1000:
                data = response['data']
                if isinstance(data, dict):
                    return float(data.get('available_balance', 0))
                if isinstance(data, list):
                    for asset in data:
                        if asset.get('currency') == 'USDT':
                            return float(asset.get('available_balance', 0))
            return None
        except Exception as e:
            logger.error(f"余额查询异常: {e}")
            return None

    def get_position_status(self):
        """Refresh ``self.start`` and the position fields from the exchange.

        Only the first returned position is inspected. When no position is
        returned the cached position fields are cleared so that stale values
        cannot leak into later reports.

        Returns:
            True when the state was refreshed, False when the request failed
            or returned a non-1000 code.
        """
        try:
            response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
            if response['code'] != 1000:
                return False
            positions = response['data']
            if not positions:
                self.start = 0
                self.open_avg_price = None
                self.current_amount = None
                self.position_cross = None
                return True
            position = positions[0]
            self.start = 1 if position['position_type'] == 1 else -1
            self.open_avg_price = position['open_avg_price']
            self.current_amount = position['current_amount']
            self.position_cross = position["position_cross"]
            return True
        except Exception as e:
            logger.error(f"持仓查询异常: {e}")
            self.ding(error=True, msg="持仓查询异常")
            return False

    def calculate_size(self):
        """Compute the number of contracts to open.

        ``risk_percent`` of the available balance is used as margin and
        multiplied by the leverage to obtain the notional value.

        Returns:
            The contract count (at least 1), or 0 when the balance is
            unavailable or below 10 USDT.
        """
        balance = self.get_available_balance()
        self.balance = balance
        if not balance or balance < 10:
            logger.warning("余额不足,无法开仓")
            return 0
        price = self.get_current_price()
        if not price:
            price = 3000  # fallback estimate when the price lookup fails
        leverage = int(self.leverage)
        margin = balance * self.risk_percent
        size = int((margin * leverage) / (price * self.CONTRACT_SIZE))
        size = max(1, size)
        logger.info(f"余额 {balance:.2f} USDT → 使用 {margin:.2f} USDT (1%) → 开仓 {size} 张 (价格≈{price})")
        return size

    def place_market_order(self, side: int, size: int):
        """Submit a market order.

        Args:
            side: 1 open long, 2 close short, 3 close long, 4 open short.
            size: number of contracts; non-positive values are rejected
                locally.

        Returns:
            True when the exchange accepted the order (code 1000), False on
            rejection or on any error raised while submitting.
        """
        if size <= 0:
            return False
        client_order_id = f"auto_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        try:
            response = self.contractAPI.post_submit_order(
                contract_symbol=self.contract_symbol,
                client_order_id=client_order_id,
                side=side,
                mode=1,
                type='market',
                leverage=self.leverage,
                open_type=self.open_type,
                size=size
            )[0]
            if response['code'] == 1000:
                label = {
                    self.SIDE_OPEN_LONG: '开多',
                    self.SIDE_OPEN_SHORT: '开空',
                    self.SIDE_CLOSE_LONG: '平多',
                }.get(side, '平空')
                logger.success(f"下单成功: {label} {size}")
                return True
            logger.error(f"下单失败: {response}")
            return False
        except APIException as e:
            logger.error(f"API下单异常: {e}")
            return False
        except Exception as e:
            # Transport errors (timeouts, connection resets) must not kill
            # the trading loop; every other API wrapper here does the same.
            logger.error(f"下单异常: {e}")
            return False

    def _switch_position(self, size, target, open_side, close_side, opened_msg, reversed_msg):
        """Move to ``target`` (1 long / -1 short), updating state only on success."""
        if self.start == target:
            return
        reversing = self.start == -target
        if reversing:
            if not self.place_market_order(close_side, self.CLOSE_ALL_SIZE):
                self.ding(error=True, msg="反手平仓失败!!!")
                return
            self.start = 0
            time.sleep(2)  # give the close order time to settle before re-opening
        if not self.place_market_order(open_side, size):
            self.ding(error=True, msg="开仓下单失败!!!")
            return
        self.start = target
        self.ding(msg=reversed_msg if reversing else opened_msg)

    def execute_trade(self):
        """Act on ``self.direction``: open a position, or close and reverse.

        Nothing happens when the computed size is 0 or when the bot already
        holds a position in the signalled direction. ``self.start`` changes
        and a success notification is sent only for orders the exchange
        actually accepted; failures trigger an error notification instead.
        """
        size = self.calculate_size()
        if size == 0:
            return
        if self.direction == "long":
            self._switch_position(
                size, 1, self.SIDE_OPEN_LONG, self.SIDE_CLOSE_SHORT,
                "开多成功!!!", "平空反手开多成功!!!",
            )
        elif self.direction == "short":
            self._switch_position(
                size, -1, self.SIDE_OPEN_SHORT, self.SIDE_CLOSE_LONG,
                "开空成功!!!", "平多反手开空 成功!!!",
            )

    def _refresh_progress_bar(self):
        """Show the minutes elapsed within the current half hour."""
        self.pbar.n = datetime.datetime.now().minute % 30
        self.pbar.refresh()

    def _apply_stop_loss(self, older, newer):
        """Close the position when both candles moved against it."""
        try:
            if self.start == 1 and self.is_bearish(older) and self.is_bearish(newer):
                logger.success("两连阴,止损平多")
                self.ding(msg="两连阴,止损平多")
                if self.place_market_order(self.SIDE_CLOSE_LONG, self.CLOSE_ALL_SIZE):
                    self.start = 0
                else:
                    self.ding(error=True, msg="止损平多失败!!!")
            elif self.start == -1 and self.is_bullish(older) and self.is_bullish(newer):
                logger.success("两连阳,止损平空")
                self.ding(msg="两连阳,止损平空")
                if self.place_market_order(self.SIDE_CLOSE_SHORT, self.CLOSE_ALL_SIZE):
                    self.start = 0
                else:
                    self.ding(error=True, msg="止损平空失败!!!")
        except Exception as e:
            logger.error(f"止损执行异常: {e}")
            self.ding(error=True, msg="止损执行异常")

    def _build_status_message(self, current_price):
        """Format the periodic report from the cached position and balance."""
        balance_str = f"{self.balance:.2f}" if self.balance is not None else "N/A"
        if not self.start:
            return (
                f"【BitMart {self.contract_symbol} 永续】\n"
                f"当前方向:无\n"
                f"当前现价:{current_price:.2f} USDT\n"
                f"账户可用余额:{balance_str} usdt"
            )

        # The API values are converted with float(); missing values become 0.
        open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
        current_amount = float(self.current_amount) if self.current_amount else 0.0
        position_cross = float(self.position_cross) if self.position_cross else 0.0

        # Price move in favour of the position (positive means profit).
        if self.start == 1:
            favourable_move = current_price - open_avg_price
        else:
            favourable_move = open_avg_price - current_price
        unrealized_pnl = current_amount * self.CONTRACT_SIZE * favourable_move

        if open_avg_price > 0:
            # NOTE(review): the factor 10000 looks like percent (x100) times
            # the 100x leverage, i.e. return on margin; confirm the intent.
            pnl_rate = favourable_move / open_avg_price * 10000
            rate_str = f" ({pnl_rate:+.2f}%)"
        else:
            rate_str = ""

        direction_str = "多" if self.start == 1 else "空"
        pnl_str = f"{unrealized_pnl:+.2f} USDT"
        return (
            f"【BitMart {self.contract_symbol} 永续】\n"
            f"当前方向:{direction_str}\n"
            f"当前现价:{current_price:.2f} USDT\n"
            f"开仓均价:{open_avg_price:.2f} USDT\n"
            f"持仓量(eth){current_amount / 1000} eth\n"
            f"持仓量(usdt){position_cross} usdt\n"
            f"浮动盈亏:{pnl_str}{rate_str}\n"
            f"账户可用余额:{balance_str} usdt"
        )

    def action(self):
        """Run the trading loop.

        Returns immediately (after an error notification) when the leverage
        cannot be configured; otherwise loops forever, polling every 10
        seconds and acting once per new 30-minute candle.
        """
        if not self.set_leverage():
            logger.error("杠杆设置失败,程序继续运行但可能下单失败")
            self.ding(error=True, msg="杠杆设置失败,程序继续运行但可能下单失败")
            return
        while True:
            self._refresh_progress_bar()

            # Poll every 10 seconds until a candle with a new id shows up.
            klines = self.get_klines()
            if not klines or len(klines) < 3:
                time.sleep(10)
                continue
            kline_1, kline_2, kline_3 = klines
            if self.last_kline_time == kline_3['id']:
                time.sleep(10)
                continue
            self.last_kline_time = kline_3['id']
            logger.success("获取到新30分钟K线")

            if not self.get_position_status():
                self.ding(error=True, msg="获取仓位信息失败!!!")
                continue

            # Stop loss: two consecutive candles against the position.
            self._apply_stop_loss(kline_1, kline_2)

            # Signals are evaluated on the two older candles only.
            self.direction = self.check_signal(kline_1, kline_2)
            if self.direction:
                logger.success(f"检测到{self.direction}信号准备开仓用余额1%")
                self.execute_trade()
            self.pbar.reset()

            # Status report, priced at the newest candle's close.
            current_price = float(kline_3["close"])
            self.balance = self.get_available_balance()
            if self.start:
                # Re-read the position so the report reflects what the
                # exchange holds; this may reset ``self.start`` to 0.
                if not self.get_position_status():
                    self.ding(error=True, msg="获取仓位信息失败!!!")
                    continue
            self.ding(msg=self._build_status_message(current_price))
if __name__ == '__main__':
    # Script entry point: build the trader and hand control to its polling
    # loop, which only returns if the initial leverage setup fails.
    trader = BitmartFuturesTransaction()
    trader.action()