461 lines
18 KiB
Python
461 lines
18 KiB
Python
import time
|
||
import datetime
|
||
|
||
|
||
from tqdm import tqdm
|
||
from loguru import logger
|
||
from bit_tools import openBrowser
|
||
from DrissionPage import ChromiumPage
|
||
from DrissionPage import ChromiumOptions
|
||
|
||
from bitmart.api_contract import APIContract
|
||
from 交易.tools import send_dingtalk_message
|
||
|
||
|
||
class BitmartFuturesTransaction:
|
||
def __init__(self, bit_id):
|
||
|
||
self.page: ChromiumPage | None = None
|
||
|
||
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
|
||
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
|
||
self.memo = "合约交易"
|
||
|
||
self.contract_symbol = "ETHUSDT"
|
||
|
||
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
|
||
|
||
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
|
||
self.direction = None
|
||
|
||
self.pbar = tqdm(total=30, desc="等待K线", ncols=80)
|
||
|
||
self.last_kline_time = None
|
||
|
||
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
|
||
self.open_type = "cross" # 全仓模式(你的“成本开仓”需求)
|
||
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
|
||
|
||
self.open_avg_price = None # 开仓价格
|
||
self.current_amount = None # 持仓量
|
||
|
||
self.bit_id = bit_id
|
||
|
||
def get_klines(self):
|
||
"""获取最近3根30分钟K线(step=30)"""
|
||
try:
|
||
end_time = int(time.time())
|
||
# 获取足够多的条目确保有最新3根
|
||
response = self.contractAPI.get_kline(
|
||
contract_symbol=self.contract_symbol,
|
||
step=30, # 30分钟
|
||
start_time=end_time - 3600 * 10, # 取最近10小时
|
||
end_time=end_time
|
||
)[0]["data"]
|
||
|
||
# 每根: [timestamp, open, high, low, close, volume]
|
||
formatted = []
|
||
for k in response:
|
||
formatted.append({
|
||
'id': int(k["timestamp"]),
|
||
'open': float(k["open_price"]),
|
||
'high': float(k["high_price"]),
|
||
'low': float(k["low_price"]),
|
||
'close': float(k["close_price"])
|
||
})
|
||
formatted.sort(key=lambda x: x['id'])
|
||
return formatted # 最近3根: kline_1 (最老), kline_2, kline_3 (最新)
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
# 检查是否是429限流错误
|
||
if "429" in error_msg or "too many requests" in error_msg.lower():
|
||
logger.warning(f"API限流,等待60秒后重试: {e}")
|
||
time.sleep(60) # 等待60秒后重试
|
||
else:
|
||
logger.error(f"获取K线异常: {e}")
|
||
self.ding(msg="获取K线异常", error=True)
|
||
return None
|
||
|
||
def get_current_price(self):
|
||
"""获取当前最新价格,用于计算张数"""
|
||
try:
|
||
end_time = int(time.time())
|
||
response = self.contractAPI.get_kline(
|
||
contract_symbol=self.contract_symbol,
|
||
step=1, # 1分钟
|
||
start_time=end_time - 3600 * 3, # 取最近10小时
|
||
end_time=end_time
|
||
)[0]
|
||
if response['code'] == 1000:
|
||
return float(response['data'][-1]["close_price"])
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取价格异常: {e}")
|
||
return None
|
||
|
||
def get_available_balance(self):
|
||
"""获取合约账户可用USDT余额"""
|
||
try:
|
||
response = self.contractAPI.get_assets_detail()[0]
|
||
if response['code'] == 1000:
|
||
data = response['data']
|
||
if isinstance(data, dict):
|
||
return float(data.get('available_balance', 0))
|
||
elif isinstance(data, list):
|
||
for asset in data:
|
||
if asset.get('currency') == 'USDT':
|
||
return float(asset.get('available_balance', 0))
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"余额查询异常: {e}")
|
||
return None
|
||
|
||
# 获取当前持仓方向
|
||
def get_position_status(self):
|
||
"""获取当前持仓方向"""
|
||
try:
|
||
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
|
||
if response['code'] == 1000:
|
||
positions = response['data']
|
||
if not positions:
|
||
self.start = 0
|
||
self.open_avg_price = None
|
||
self.current_amount = None
|
||
self.position_cross = None
|
||
return True
|
||
self.start = 1 if positions[0]['position_type'] == 1 else -1
|
||
self.open_avg_price = positions[0]['open_avg_price']
|
||
self.current_amount = positions[0]['current_amount']
|
||
self.position_cross = positions[0]["position_cross"]
|
||
return True
|
||
|
||
else:
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"持仓查询异常: {e}")
|
||
return False
|
||
|
||
# 设置杠杆和全仓
|
||
def set_leverage(self):
|
||
"""程序启动时设置全仓 + 高杠杆"""
|
||
try:
|
||
response = self.contractAPI.post_submit_leverage(
|
||
contract_symbol=self.contract_symbol,
|
||
leverage=self.leverage,
|
||
open_type=self.open_type
|
||
)[0]
|
||
if response['code'] == 1000:
|
||
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
|
||
return True
|
||
else:
|
||
logger.error(f"杠杆设置失败: {response}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"设置杠杆异常: {e}")
|
||
return False
|
||
|
||
def openBrowser(self):
|
||
"""打开 TGE 对应浏览器实例"""
|
||
try:
|
||
bit_port = openBrowser(id=self.bit_id)
|
||
co = ChromiumOptions()
|
||
co.set_local_port(port=bit_port)
|
||
self.page = ChromiumPage(addr_or_opts=co)
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
def take_over_browser(self):
|
||
"""接管浏览器"""
|
||
try:
|
||
co = ChromiumOptions()
|
||
co.set_local_port(self.tge_port)
|
||
self.page = ChromiumPage(addr_or_opts=co)
|
||
self.page.set.window.max()
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
def close_extra_tabs(self):
|
||
"""关闭多余 tab"""
|
||
try:
|
||
for idx, tab in enumerate(self.page.get_tabs()):
|
||
if idx > 0:
|
||
tab.close()
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
def click_safe(self, xpath, sleep=0.5):
|
||
"""安全点击"""
|
||
try:
|
||
ele = self.page.ele(xpath)
|
||
if not ele:
|
||
return False
|
||
ele.scroll.to_see(center=True)
|
||
time.sleep(sleep)
|
||
ele.click(by_js=True)
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
def 平仓(self):
|
||
self.click_safe('x://span[normalize-space(text()) ="市价"]')
|
||
|
||
def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None):
|
||
"""
|
||
marketPriceLongOrder 市价最多或者做空,1是做多,-1是做空
|
||
limitPriceShortOrder 限价最多或者做空
|
||
"""
|
||
|
||
if marketPriceLongOrder == -1:
|
||
self.click_safe('x://button[normalize-space(text()) ="市价"]')
|
||
self.page.ele('x://*[@id="size_0"]').input(size)
|
||
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
|
||
elif marketPriceLongOrder == 1:
|
||
self.click_safe('x://button[normalize-space(text()) ="市价"]')
|
||
self.page.ele('x://*[@id="size_0"]').input(size)
|
||
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
|
||
|
||
if limitPriceShortOrder == -1:
|
||
self.click_safe('x://button[normalize-space(text()) ="限价"]')
|
||
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
|
||
time.sleep(1)
|
||
self.page.ele('x://*[@id="size_0"]').input(1)
|
||
self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]')
|
||
elif limitPriceShortOrder == 1:
|
||
self.click_safe('x://button[normalize-space(text()) ="限价"]')
|
||
self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True)
|
||
time.sleep(1)
|
||
self.page.ele('x://*[@id="size_0"]').input(1)
|
||
self.click_safe('x://span[normalize-space(text()) ="买入/做多"]')
|
||
|
||
def ding(self, msg, error=False):
|
||
"""统一消息格式"""
|
||
prefix = "❌bitmart:" if error else "🔔bitmart:"
|
||
if error:
|
||
logger.error(msg)
|
||
for i in range(10):
|
||
send_dingtalk_message(f"{prefix},{msg}")
|
||
else:
|
||
logger.info(msg)
|
||
send_dingtalk_message(f"{prefix},{msg}")
|
||
|
||
def close_extra_tabs_in_browser(self):
|
||
|
||
try:
|
||
for _, i in enumerate(self.page.get_tabs()):
|
||
if _ == 0:
|
||
continue
|
||
|
||
i.close()
|
||
|
||
return True
|
||
except:
|
||
pass
|
||
|
||
return False
|
||
|
||
def get_now_time(self):
|
||
# 获取当前时间戳
|
||
current_timestamp = time.time()
|
||
# 将当前时间戳转换为 datetime 对象
|
||
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
|
||
|
||
# 计算距离当前时间最近的整点或 30 分时刻
|
||
if current_datetime.minute < 30:
|
||
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
|
||
else:
|
||
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
|
||
|
||
# 将目标 datetime 对象转换为时间戳
|
||
target_timestamp = target_datetime.timestamp()
|
||
|
||
return int(target_timestamp)
|
||
|
||
def is_bullish(self, c): # 阳线
|
||
return float(c['close']) > float(c['open'])
|
||
|
||
def is_bearish(self, c): # 阴线
|
||
return float(c['close']) < float(c['open'])
|
||
|
||
def check_signal(self, prev, curr):
|
||
"""
|
||
包住形态信号判定(优化版):
|
||
只看两种信号,严格按照收盘价与开盘价的比较:
|
||
|
||
1. 阳包阴(涨包跌,前跌后涨)-> 做多:
|
||
- 前一根是跌(阴线:close < open)
|
||
- 后一根是涨(阳线:close > open)
|
||
- 且:涨的收盘价 > 跌的开盘价(curr['close'] > prev['open'])
|
||
|
||
2. 阴包阳(跌包涨,前涨后跌)-> 做空:
|
||
- 前一根是涨(阳线:close > open)
|
||
- 后一根是跌(阴线:close < open)
|
||
- 且:跌的收盘价 < 涨的开盘价(curr['close'] < prev['open'])
|
||
"""
|
||
p_open = float(prev['open'])
|
||
c_close = float(curr['close'])
|
||
|
||
# 阳包阴(涨包跌,前跌后涨) -> 做多:涨的收盘价 > 跌的开盘价
|
||
if self.is_bearish(prev) and self.is_bullish(curr) and c_close > p_open:
|
||
return "long", "bear_bull_engulf"
|
||
|
||
# 阴包阳(跌包涨,前涨后跌) -> 做空:跌的收盘价 < 涨的开盘价
|
||
if self.is_bullish(prev) and self.is_bearish(curr) and c_close < p_open:
|
||
return "short", "bull_bear_engulf"
|
||
|
||
return None, None
|
||
|
||
def action(self):
|
||
# 启动时设置全仓高杠杆
|
||
if not self.set_leverage():
|
||
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
|
||
return
|
||
|
||
# 1. 打开浏览器
|
||
if not self.openBrowser():
|
||
self.ding("打开 TGE 失败!", error=True)
|
||
return
|
||
logger.info("TGE 端口获取成功")
|
||
|
||
if self.close_extra_tabs_in_browser():
|
||
logger.info('关闭多余标签页成功!!!')
|
||
else:
|
||
logger.info('关闭多余标签页失败!!!')
|
||
|
||
self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
|
||
|
||
self.click_safe('x://button[normalize-space(text()) ="市价"]')
|
||
|
||
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc:进度条说明,ncols:长度
|
||
|
||
self.time_start = None # 时间状态 避免同一个时段,发生太多消息
|
||
while True:
|
||
|
||
# 获取当前时间
|
||
current_time = time.localtime()
|
||
current_minute = current_time.tm_min
|
||
|
||
if current_minute < 30:
|
||
self.pbar.n = current_minute
|
||
self.pbar.refresh()
|
||
else:
|
||
self.pbar.n = current_minute - 30
|
||
self.pbar.refresh()
|
||
|
||
if self.time_start == self.get_now_time():
|
||
time.sleep(5)
|
||
continue
|
||
|
||
new_price_datas = self.get_klines()
|
||
if not new_price_datas:
|
||
logger.info("获取最新价格有问题!!!")
|
||
time.sleep(5) # 等待一段时间后重试
|
||
continue # 跳过本次循环,避免对None进行排序操作
|
||
|
||
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
|
||
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
|
||
|
||
# 判断抓取的数据是否正确
|
||
if self.get_now_time() != self.kline_3["id"]:
|
||
continue
|
||
|
||
self.time_start = self.get_now_time()
|
||
|
||
if self.get_position_status():
|
||
logger.info("获取仓位信息成功!!!")
|
||
else:
|
||
logger.info("获取仓位信息失败!!!")
|
||
|
||
self.ding(msg="获取仓位信息失败!!!", error=True)
|
||
continue
|
||
|
||
if self.start == 1:
|
||
if self.is_bearish(self.kline_1) and self.is_bearish(self.kline_2):
|
||
self.平仓()
|
||
elif self.start == -1:
|
||
if self.is_bullish(self.kline_1) and self.is_bullish(self.kline_2):
|
||
self.平仓()
|
||
|
||
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
|
||
|
||
if self.direction == "long":
|
||
if self.start == -1:
|
||
self.平仓()
|
||
self.开单(marketPriceLongOrder=1, size=self.get_available_balance() * self.risk_percent)
|
||
elif self.start == 0:
|
||
self.开单(marketPriceLongOrder=1, size=self.get_available_balance() * self.risk_percent)
|
||
|
||
if self.direction == "short":
|
||
if self.start == 1:
|
||
self.平仓()
|
||
self.开单(marketPriceLongOrder=-1, size=self.get_available_balance() * self.risk_percent)
|
||
elif self.start == 0:
|
||
self.开单(marketPriceLongOrder=-1, size=self.get_available_balance() * self.risk_percent)
|
||
|
||
# ===================================================================================================
|
||
# 发送持仓信息消息
|
||
msg = None
|
||
current_price = float(self.kline_3["close"])
|
||
balance = self.get_available_balance()
|
||
self.balance = balance if balance is not None else 0.0
|
||
|
||
if self.start:
|
||
# 持仓方向,开仓价格,现价,持仓量,盈亏,当前价值
|
||
# 1. 确保所有关键数据为 float 类型(BitMart API 常返回字符串)
|
||
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
|
||
current_amount = float(self.current_amount) if self.current_amount else 0.0
|
||
position_cross = float(self.position_cross) if hasattr(self, 'position_cross') and self.position_cross else 0.0
|
||
|
||
# 2. 计算浮动盈亏(USDT)
|
||
if self.start == 1: # 多头
|
||
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
|
||
elif self.start == -1: # 空头
|
||
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
|
||
else: # 无仓
|
||
unrealized_pnl = 0.0
|
||
|
||
# 3. 计算收益率(可选,更直观)
|
||
if self.start != 0 and open_avg_price > 0:
|
||
if self.start == 1:
|
||
pnl_rate = (current_price - open_avg_price) / open_avg_price * 10000
|
||
else:
|
||
pnl_rate = (open_avg_price - current_price) / open_avg_price * 10000
|
||
rate_str = f" ({pnl_rate:+.2f}%)"
|
||
else:
|
||
rate_str = ""
|
||
|
||
# 4. 格式化输出字符串
|
||
direction_str = "空" if self.start == -1 else ("多" if self.start == 1 else "无")
|
||
pnl_str = f"{unrealized_pnl:+.2f} USDT"
|
||
|
||
# 6. 最终消息
|
||
msg = (
|
||
f"【BitMart {self.contract_symbol} 永续】\n"
|
||
f"当前方向:{direction_str}\n"
|
||
f"当前现价:{current_price:.2f} USDT\n"
|
||
f"开仓均价:{open_avg_price:.2f} USDT\n"
|
||
f"持仓量(eht):{float(current_amount) / 1000} eth\n"
|
||
f"持仓量(usdt):{position_cross} usdt\n"
|
||
f"浮动盈亏:{pnl_str}{rate_str}\n"
|
||
f"账户可用余额:{self.balance:.2f} usdt"
|
||
)
|
||
else:
|
||
msg = (
|
||
f"【BitMart {self.contract_symbol} 永续】\n"
|
||
f"当前方向:无\n"
|
||
f"当前现价:{current_price:.2f} USDT\n"
|
||
f"账户可用余额:{self.balance:.2f} usdt"
|
||
)
|
||
|
||
# 7. 发送钉钉消息
|
||
self.ding(msg=msg)
|
||
|
||
self.pbar.reset() # 重置进度条
|
||
|
||
|
||
if __name__ == '__main__':
|
||
BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()
|