import time import uuid import datetime from tqdm import tqdm from loguru import logger from bitmart.api_contract import APIContract from bitmart.lib.cloud_exceptions import APIException from 交易.tools import send_dingtalk_message class BitmartFuturesTransaction: def __init__(self): self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" self.memo = "合约交易" self.contract_symbol = "ETHUSDT" self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 self.direction = None self.pbar = tqdm(total=30, desc="等待30分钟K线", ncols=80) self.last_kline_time = None self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位) self.open_type = "cross" # 全仓模式(你的“成本开仓”需求) self.risk_percent = 0.01 # 每次开仓使用可用余额的 1% self.open_avg_price = None # 开仓价格 self.current_amount = None # 持仓量 @staticmethod def is_bullish(c): return float(c['close']) > float(c['open']) @staticmethod def is_bearish(c): return float(c['close']) < float(c['open']) def ding(self, msg, error=False): """统一消息格式""" prefix = "❌bitmart:" if error else "🔔bitmart:" if error: for i in range(10): send_dingtalk_message(f"{prefix},{msg}") else: send_dingtalk_message(f"{prefix},{msg}") def set_leverage(self): """程序启动时设置全仓 + 高杠杆""" try: response = self.contractAPI.post_submit_leverage( contract_symbol=self.contract_symbol, leverage=self.leverage, open_type=self.open_type )[0] if response['code'] == 1000: logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") return True else: logger.error(f"杠杆设置失败: {response}") return False except Exception as e: logger.error(f"设置杠杆异常: {e}") return False def check_signal(self, prev, curr): """简化英戈尔夫形态""" if self.is_bullish(curr) and self.is_bearish(prev) and float(curr['close']) >= float(prev['open']): return "long" if self.is_bearish(curr) and self.is_bullish(prev) and float(curr['close']) <= float(prev['open']): return "short" return None def get_klines(self): """获取最近3根30分钟K线(step=30)""" try: end_time = int(time.time()) # 获取足够多的条目确保有最新3根 response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=30, # 30分钟 start_time=end_time - 3600 * 10, # 取最近10小时 end_time=end_time )[0]["data"] # 每根: [timestamp, open, high, low, close, volume] formatted = [] for k in response: formatted.append({ 'id': int(k["timestamp"]), 'open': float(k["open_price"]), 'high': float(k["high_price"]), 'low': float(k["low_price"]), 'close': float(k["close_price"]) }) formatted.sort(key=lambda x: x['id']) return formatted[-3:] # 最近3根: kline_1 (最老), kline_2, kline_3 (最新) except Exception as e: logger.error(f"获取K线异常: {e}") self.ding(error=True, msg="获取K线异常") return None def get_current_price(self): """获取当前最新价格,用于计算张数""" try: end_time = int(time.time()) # 获取足够多的条目确保有最新3根 response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=30, # 30分钟 start_time=end_time - 3600 * 10, # 取最近10小时 end_time=end_time )[0] if response['code'] == 1000: return float(response['data'][0]["close_price"]) return None except Exception as e: logger.error(f"获取价格异常: {e}") return None def get_available_balance(self): """获取合约账户可用USDT余额""" try: response = self.contractAPI.get_assets_detail()[0] if response['code'] == 1000: data = response['data'] if isinstance(data, dict): return float(data.get('available_balance', 0)) elif isinstance(data, list): for asset in data: if asset.get('currency') == 'USDT': return float(asset.get('available_balance', 0)) return None except Exception as e: logger.error(f"余额查询异常: {e}") return None def get_position_status(self): """获取当前持仓方向""" try: response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] if response['code'] == 1000: positions = response['data'] if not positions: self.start = 0 return True self.start = 1 if positions[0]['position_type'] == 1 else -1 self.open_avg_price = positions[0]['open_avg_price'] self.current_amount = positions[0]['current_amount'] self.position_cross = positions[0]["position_cross"] return True else: return False except Exception as e: logger.error(f"持仓查询异常: {e}") self.ding(error=True, msg="持仓查询异常") return False def calculate_size(self): """计算开仓张数:使用可用余额的1%作为保证金""" balance = self.get_available_balance() self.balance = balance if not balance or balance < 10: logger.warning("余额不足,无法开仓") return 0 price = self.get_current_price() if not price: price = 3000 # 保守估计,避免size过大 leverage = int(self.leverage) margin = balance * self.risk_percent # 使用1%余额 # ETHUSDT 1张 ≈ 0.001 ETH size = int((margin * leverage) / (price * 0.001)) size = max(1, size) logger.info(f"余额 {balance:.2f} USDT → 使用 {margin:.2f} USDT (1%) → 开仓 {size} 张 (价格≈{price})") return size def place_market_order(self, side: int, size: int): if size <= 0: return False client_order_id = f"auto_{int(time.time())}_{uuid.uuid4().hex[:8]}" try: response = self.contractAPI.post_submit_order( contract_symbol=self.contract_symbol, client_order_id=client_order_id, side=side, mode=1, type='market', leverage=self.leverage, open_type=self.open_type, size=size )[0] if response['code'] == 1000: logger.success( f"下单成功: {'开多' if side in [1] else '开空' if side in [4] else '平多' if side in [3] else '平空'} {size}张") return True else: logger.error(f"下单失败: {response}") return False except APIException as e: logger.error(f"API下单异常: {e}") return False def execute_trade(self): size = self.calculate_size() if size == 0: return if self.direction == "long": if self.start == 0: self.place_market_order(1, size) # 开多 self.start = 1 self.ding(msg="开多成功!!!") elif self.start == -1: self.place_market_order(2, 999999) # 全平空 time.sleep(2) self.place_market_order(1, size) self.start = 1 self.ding(msg="平空反手开多成功!!!") elif self.direction == "short": if self.start == 0: self.place_market_order(4, size) # 开空 self.start = -1 self.ding(msg="开空成功!!!") elif self.start == 1: self.place_market_order(3, 999999) # 全平多 time.sleep(2) self.place_market_order(4, size) self.start = -1 self.ding(msg="平多反手开空 成功!!!") def action(self): # 启动时设置全仓高杠杆 if not self.set_leverage(): logger.error("杠杆设置失败,程序继续运行但可能下单失败") self.ding(error=True, msg="杠杆设置失败,程序继续运行但可能下单失败") return while True: current_minute = datetime.datetime.now().minute if current_minute < 30: self.pbar.n = current_minute else: self.pbar.n = current_minute - 30 self.pbar.refresh() # 每10秒检查一次,等待新K线 klines = self.get_klines() if not klines or len(klines) < 3: time.sleep(10) continue kline_1, kline_2, kline_3 = klines if self.last_kline_time == kline_3['id']: time.sleep(10) continue self.last_kline_time = kline_3['id'] logger.success("获取到新30分钟K线") if not self.get_position_status(): self.ding(error=True, msg="获取仓位信息失败!!!") continue # 止损:两连反向K线平仓 try: if self.start == 1 and self.is_bearish(kline_1) and self.is_bearish(kline_2): logger.success("两连阴,止损平多") self.ding(msg="两连阴,止损平多") self.place_market_order(3, 999999) self.start = 0 elif self.start == -1 and self.is_bullish(kline_1) and self.is_bullish(kline_2): logger.success("两连阳,止损平空") self.ding(msg="两连阳,止损平空") self.place_market_order(2, 999999) self.start = 0 except Exception as e: logger.error(f"止损执行异常: {e}") self.ding(error=True, msg="止损执行异常") self.direction = self.check_signal(kline_1, kline_2) if self.direction: logger.success(f"检测到{self.direction}信号,准备开仓(用余额1%)") self.execute_trade() self.pbar.reset() # =================================================================================================== msg = None current_price = float(kline_3["close"]) self.balance = self.get_available_balance() if self.start: if not self.get_position_status(): self.ding(error=True, msg="获取仓位信息失败!!!") continue # 持仓方向,开仓价格,现价,持仓量,盈亏,当前价值 # 1. 确保所有关键数据为 float 类型(BitMart API 常返回字符串) open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0 current_amount = float(self.current_amount) if self.current_amount else 0.0 # 2. 计算浮动盈亏(USDT) if self.start == 1: # 多头 unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price) elif self.start == -1: # 空头 unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price) else: # 无仓 unrealized_pnl = 0.0 # 3. 计算收益率(可选,更直观) if self.start != 0 and open_avg_price > 0: if self.start == 1: pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000 else: pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000 rate_str = f" ({pnl_rate:+.2f}%)" else: rate_str = "" # 4. 格式化输出字符串 direction_str = "空" if self.start == -1 else ("多" if self.start == 1 else "无") pnl_str = f"{unrealized_pnl:+.2f} USDT" # 6. 最终消息 msg = ( f"【BitMart {self.contract_symbol} 永续】\n" f"当前方向:{direction_str}\n" f"当前现价:{current_price:.2f} USDT\n" f"开仓均价:{open_avg_price:.2f} USDT\n" f"持仓量(eht):{float(self.current_amount) / 1000} eth\n" f"持仓量(usdt):{float(self.position_cross)} usdt\n" f"浮动盈亏:{pnl_str}{rate_str}\n" f"账户可用余额:{self.balance:.2f} usdt" ) else: msg = ( f"【BitMart {self.contract_symbol} 永续】\n" f"当前方向:无\n" f"当前现价:{current_price:.2f} USDT\n" # f"开仓均价:{open_avg_price:.2f} USDT\n" # f"持仓量:{float(self.current_amount) / 1000} eth\n" # f"浮动盈亏:{pnl_str}{rate_str}\n" f"账户可用余额:{self.balance:.2f} usdt" ) # 7. 发送钉钉消息 self.ding(msg=msg) if __name__ == '__main__': BitmartFuturesTransaction().action()