This commit is contained in:
27942
2025-12-17 16:29:30 +08:00
parent 396f7fa735
commit 74d5be276b
3 changed files with 348 additions and 825 deletions

347
交易/bitmart_api交易.py Normal file
View File

@@ -0,0 +1,347 @@
import time
import uuid
import datetime
from tqdm import tqdm
from loguru import logger
from bitmart.api_contract import APIContract
from bitmart.lib.cloud_exceptions import APIException
from 交易.tools import send_dingtalk_message
class BitmartFuturesTransaction:
def __init__(self):
self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.memo = "合约交易"
self.contract_symbol = "ETHUSDT"
self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15))
self.start = 0 # 持仓状态: -1 空, 0 无, 1 多
self.direction = None
self.pbar = tqdm(total=30, desc="等待30分钟K线", ncols=80)
self.last_kline_time = None
self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位)
self.open_type = "cross" # 全仓模式(你的“成本开仓”需求)
self.risk_percent = 0.01 # 每次开仓使用可用余额的 1%
self.open_avg_price = None # 开仓价格
self.current_amount = None # 持仓量
@staticmethod
def is_bullish(c):
return float(c['close']) > float(c['open'])
@staticmethod
def is_bearish(c):
return float(c['close']) < float(c['open'])
def ding(self, msg, error=False):
"""统一消息格式"""
prefix = "❌bitmart" if error else "🔔bitmart"
if error:
for i in range(10):
send_dingtalk_message(f"{prefix}{msg}")
else:
send_dingtalk_message(f"{prefix}{msg}")
def set_leverage(self):
"""程序启动时设置全仓 + 高杠杆"""
try:
response = self.contractAPI.post_submit_leverage(
contract_symbol=self.contract_symbol,
leverage=self.leverage,
open_type=self.open_type
)[0]
if response['code'] == 1000:
logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功")
return True
else:
logger.error(f"杠杆设置失败: {response}")
return False
except Exception as e:
logger.error(f"设置杠杆异常: {e}")
return False
def check_signal(self, prev, curr):
"""简化英戈尔夫形态"""
if self.is_bullish(curr) and self.is_bearish(prev) and float(curr['close']) >= float(prev['open']):
return "long"
if self.is_bearish(curr) and self.is_bullish(prev) and float(curr['close']) <= float(prev['open']):
return "short"
return None
def get_klines(self):
"""获取最近3根30分钟K线step=30"""
try:
end_time = int(time.time())
# 获取足够多的条目确保有最新3根
response = self.contractAPI.get_kline(
contract_symbol=self.contract_symbol,
step=30, # 30分钟
start_time=end_time - 3600 * 10, # 取最近10小时
end_time=end_time
)[0]["data"]
# 每根: [timestamp, open, high, low, close, volume]
formatted = []
for k in response:
formatted.append({
'id': int(k["timestamp"]),
'open': float(k["open_price"]),
'high': float(k["high_price"]),
'low': float(k["low_price"]),
'close': float(k["close_price"])
})
formatted.sort(key=lambda x: x['id'])
return formatted[-3:] # 最近3根: kline_1 (最老), kline_2, kline_3 (最新)
except Exception as e:
logger.error(f"获取K线异常: {e}")
self.ding(error=True, msg="获取K线异常")
return None
def get_current_price(self):
"""获取当前最新价格,用于计算张数"""
try:
response = self.contractAPI.get_ticker(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
return float(response['data']['last_price'])
return None
except Exception as e:
logger.error(f"获取价格异常: {e}")
return None
def get_available_balance(self):
"""获取合约账户可用USDT余额"""
try:
response = self.contractAPI.get_assets_detail()[0]
if response['code'] == 1000:
data = response['data']
if isinstance(data, dict):
return float(data.get('available_balance', 0))
elif isinstance(data, list):
for asset in data:
if asset.get('currency') == 'USDT':
return float(asset.get('available_balance', 0))
return None
except Exception as e:
logger.error(f"余额查询异常: {e}")
return None
def get_position_status(self):
"""获取当前持仓方向"""
try:
response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0]
if response['code'] == 1000:
positions = response['data']
if not positions:
self.start = 0
return True
self.start = 1 if positions[0]['position_type'] == 1 else -1
self.open_avg_price = positions[0]['open_avg_price']
self.current_amount = positions[0]['current_amount']
return True
else:
return False
except Exception as e:
logger.error(f"持仓查询异常: {e}")
self.ding(error=True, msg="持仓查询异常")
return False
def calculate_size(self):
"""计算开仓张数使用可用余额的1%作为保证金"""
balance = self.get_available_balance()
if not balance or balance < 10:
logger.warning("余额不足,无法开仓")
return 0
price = self.get_current_price()
if not price:
price = 3000 # 保守估计避免size过大
leverage = int(self.leverage)
margin = balance * self.risk_percent # 使用1%余额
# ETHUSDT 1张 ≈ 0.001 ETH
size = int((margin * leverage) / (price * 0.001))
size = max(1, size)
logger.info(f"余额 {balance:.2f} USDT → 使用 {margin:.2f} USDT (1%) → 开仓 {size} 张 (价格≈{price})")
return size
def place_market_order(self, side: int, size: int):
if size <= 0:
return False
client_order_id = f"auto_{int(time.time())}_{uuid.uuid4().hex[:8]}"
try:
response = self.contractAPI.post_submit_order(
contract_symbol=self.contract_symbol,
client_order_id=client_order_id,
side=side,
mode=1,
type='market',
leverage=self.leverage,
open_type=self.open_type,
size=size
)[0]
if response['code'] == 1000:
logger.success(
f"下单成功: {'开多' if side in [1] else '开空' if side in [4] else '平多' if side in [3] else '平空'} {size}")
return True
else:
logger.error(f"下单失败: {response}")
return False
except APIException as e:
logger.error(f"API下单异常: {e}")
return False
def execute_trade(self):
size = self.calculate_size()
if size == 0:
return
if self.direction == "long":
if self.start == 0:
self.place_market_order(1, size) # 开多
self.start = 1
self.ding(msg="开多成功!!!")
elif self.start == -1:
self.place_market_order(2, 999999) # 全平空
time.sleep(2)
self.place_market_order(1, size)
self.start = 1
self.ding(msg="平空反手开多成功!!!")
elif self.direction == "short":
if self.start == 0:
self.place_market_order(4, size) # 开空
self.start = -1
self.ding(msg="开空成功!!!")
elif self.start == 1:
self.place_market_order(3, 999999) # 全平多
time.sleep(2)
self.place_market_order(4, size)
self.start = -1
self.ding(msg="平多反手开空 成功!!!")
def action(self):
# 启动时设置全仓高杠杆
if not self.set_leverage():
logger.error("杠杆设置失败,程序继续运行但可能下单失败")
self.ding(error=True, msg="杠杆设置失败,程序继续运行但可能下单失败")
return
while True:
current_minute = datetime.datetime.now().minute
if current_minute < 30:
self.pbar.n = current_minute
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
# 每10秒检查一次等待新K线
klines = self.get_klines()
if not klines or len(klines) < 3:
time.sleep(10)
continue
kline_1, kline_2, kline_3 = klines
if self.last_kline_time == kline_3['id']:
time.sleep(10)
continue
self.last_kline_time = kline_3['id']
logger.success("获取到新30分钟K线")
if not self.get_position_status():
self.ding(error=True, msg="获取仓位信息失败!!!")
continue
# 止损两连反向K线平仓
try:
if self.start == 1 and self.is_bearish(kline_1) and self.is_bearish(kline_2):
logger.success("两连阴,止损平多")
self.ding(msg="两连阴,止损平多")
self.place_market_order(3, 999999)
self.start = 0
elif self.start == -1 and self.is_bullish(kline_1) and self.is_bullish(kline_2):
logger.success("两连阳,止损平空")
self.ding(msg="两连阳,止损平空")
self.place_market_order(2, 999999)
self.start = 0
except Exception as e:
logger.error(f"止损执行异常: {e}")
self.ding(error=True, msg="止损执行异常")
self.direction = self.check_signal(kline_1, kline_2)
if self.direction:
logger.success(f"检测到{self.direction}信号准备开仓用余额1%")
self.execute_trade()
self.pbar.reset()
# 持仓方向,开仓价格,现价,持仓量,盈亏,当前价值
# 1. 确保所有关键数据为 float 类型BitMart API 常返回字符串)
current_price = float(kline_3["close"])
open_avg_price = float(self.open_avg_price) if self.open_avg_price else 0.0
current_amount = float(self.current_amount) if self.current_amount else 0.0
# 2. 计算浮动盈亏USDT
if self.start == 1: # 多头
unrealized_pnl = current_amount * 0.001 * (current_price - open_avg_price)
elif self.start == -1: # 空头
unrealized_pnl = current_amount * 0.001 * (open_avg_price - current_price)
else: # 无仓
unrealized_pnl = 0.0
# 3. 计算收益率(可选,更直观)
if self.start != 0 and open_avg_price > 0:
if self.start == 1:
pnl_rate = (current_price - open_avg_price) / open_avg_price * 100
else:
pnl_rate = (open_avg_price - current_price) / open_avg_price * 100
rate_str = f" ({pnl_rate:+.2f}%)"
else:
rate_str = ""
# 4. 格式化输出字符串
direction_str = "" if self.start == -1 else ("" if self.start == 1 else "")
pnl_str = f"{unrealized_pnl:+.2f} USDT"
# 5. 持仓量显示优化k为单位
amount_display = f"{current_amount / 1000:.1f}k 张" if current_amount >= 1000 else f"{current_amount:.0f}"
# 6. 最终消息
msg = (
f"【BitMart {self.contract_symbol} 永续】\n"
f"当前方向:{direction_str}\n"
f"开仓均价:{open_avg_price:.2f} USDT\n"
f"当前现价:{current_price:.2f} USDT\n"
f"持仓数量:{amount_display}\n"
f"浮动盈亏:{pnl_str}{rate_str}"
)
# 7. 发送钉钉消息
self.ding(msg=msg)
if __name__ == '__main__':
BitmartFuturesTransaction().action()

View File

@@ -1,469 +0,0 @@
import time
import datetime
from tqdm import *
from loguru import *
from DrissionPage import *
from curl_cffi import requests
from 交易.tools import send_dingtalk_message
def is_bullish(c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(c): # 阴线
return float(c['close']) < float(c['open'])
class WeexTransaction:
def __init__(self, tge_id):
self.tge_port = None # tge浏览器使用端口
self.tge_id = tge_id # tge id
self.tge_url = "http://127.0.0.1:50326" # tge本地服务url
self.tge_headers = {
"Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
"Content-Type": "application/json"
}
self.page = None # 浏览器对象
self.start = 0 # 持仓状态 -1:做空0维持仓1做多
self.kline_1 = None # 01
self.kline_2 = None # 01
self.kline_1 = self.kline_2 = self.kline_3 = None
self.direction = None # 信号类型
self.pbar = None # 进度条对象
self.session = requests.Session() # 接口请求对象
self.cookies = {}
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
try:
response = requests.post(
f"{self.tge_url}/api/browser/start",
json={"envId": self.tge_id},
headers=self.tge_headers
)
self.tge_port = response.json()["data"]["port"]
return True
except:
return False
def take_over_browser(self):
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
return True
except:
return False
def is_bullish(self, c): # 阳线
return float(c['close']) > float(c['open'])
def is_bearish(self, c): # 阴线
return float(c['close']) < float(c['open'])
def check_signal(self, prev, curr):
"""
包住形态信号判定仅15分钟K线
- 前跌后涨包住 -> 做多
- 前涨后跌包住 -> 做空
"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
# 前跌后涨包住 -> 做多
# if is_bullish(curr) and is_bearish(prev) and int(c_open) - 3 <= int(p_close) and int(c_close) >= int(p_open):
# return "long", "bear_bull_engulf"
if is_bullish(curr) and is_bearish(prev) and int(c_close) >= int(p_open):
return "long", "bear_bull_engulf"
# 前涨后跌包住 -> 做空
# if is_bearish(curr) and is_bullish(prev) and int(c_open) + 3 >= int(p_close) and int(c_close) <= int(p_open):
# return "short", "bull_bear_engulf"
if is_bearish(curr) and is_bullish(prev) and int(c_close) <= int(p_open):
return "short", "bull_bear_engulf"
return None, None
def get_price(self):
headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'origin': 'https://derivatives.bitmart.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://derivatives.bitmart.com/',
'sec-ch-ua': '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
}
params = {
'unit': '30',
'resolution': 'M',
'contractID': '2',
'offset': '340',
'endTime': str(int(time.time())),
}
datas = []
for i in range(3):
logger.info(f"获取最新数据:{i + 1}次。。。")
try:
response = self.session.get('https://contract-v2.bitmart.com/v1/ifcontract/quote/kline', params=params,
headers=headers, timeout=5)
for i in response.json()["data"]:
insert_data = {
'id': int(i["timestamp"]) - 1,
'open': float(i["open"]),
'high': float(i["high"]),
'low': float(i["low"]),
'close': float(i["close"])
}
datas.append(insert_data)
return datas
except:
time.sleep(1)
return datas
def to_do_page(self):
self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click()
time.sleep(1)
num = self.get_num()
if num:
logger.info("获取可用余额成功!!!")
else:
logger.error("获取可用余额失败!!!")
self.send_dingtalk_message("获取可用余额失败!!!", type=0)
return
self.page.ele('x://*[@id="size_0"]').input(float(num) / 100)
time.sleep(1)
if self.direction == "long" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.send_dingtalk_message(f"信号:{self.direction},开多,开仓金额:{float(num) / 100}")
self.page.ele('x://span[normalize-space(text()) = "买入/做多"]').click()
self.start = 1
elif self.direction == "short" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.send_dingtalk_message(f"信号:{self.direction},开空,开仓金额:{float(num) / 100}")
self.page.ele('x://span[normalize-space(text()) = "卖出/做空"]').click()
self.start = -1
elif self.direction == "long" and self.start == -1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.send_dingtalk_message(f"信号:{self.direction},反手平空做多,开仓金额:{float(num) / 100}")
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').click()
time.sleep(3)
self.page.ele('x:(//span[normalize-space(text()) = "买入/做多"])').click()
self.start = 1
elif self.direction == "short" and self.start == 1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.send_dingtalk_message(f"信号:{self.direction},反手平多做空,开仓金额:{float(num) / 100}")
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').click()
time.sleep(3)
self.page.ele('x://span[normalize-space(text()) = "卖出/做空"]').click()
self.start = -1
def get_now_time(self):
# 获取当前时间戳
current_timestamp = time.time()
# 将当前时间戳转换为 datetime 对象
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
# 计算距离当前时间最近的整点或 30 分时刻
if current_datetime.minute < 30:
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
else:
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
# 将目标 datetime 对象转换为时间戳
target_timestamp = target_datetime.timestamp()
return int(target_timestamp)
def close_extra_tabs_in_browser(self):
try:
for _, i in enumerate(self.page.get_tabs()):
if _ == 0:
continue
i.close()
return True
except:
pass
return False
def get_num(self):
params = {
'account_type': '3',
'isServerCal': '1',
}
for i in range(3):
try:
response = self.session.get(
'https://derivatives.bitmart.com/gw-api/contract-tiger/forward/v1/ifcontract/accounts',
params=params,
)
return response.json()["data"]["accounts"][0]["available_balance"]
except:
time.sleep(1)
return False
def get_token(self):
tab = self.page.new_tab()
tab.listen.start("derivatives.bitmart.com/gw-api/contract-tiger/forward/v1/ifcontract/userPositions")
for i in range(3):
tab.get(url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT")
try:
res = tab.listen.wait(timeout=5)
self.headers = res.request.headers
for i in res.request.cookies:
self.cookies[i["name"]] = i["value"]
if self.cookies.get('accessKey'):
self.session.cookies.update(self.cookies)
self.session.headers.update(self.headers)
break
except:
time.sleep(1)
tab.close()
return True if self.cookies else False
def get_now_time1(self):
timestamp = time.time()
local_time = time.localtime(timestamp)
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
return formatted_time
def get_position_status(self):
params = {
'status': '1',
}
for i in range(3):
try:
response = self.session.get(
'https://derivatives.bitmart.com/gw-api/contract-tiger/forward/v1/ifcontract/userPositions',
params=params,
)
try:
resp_data = response.json()
positions = resp_data.get("data", {}).get("positions", [])
if not positions:
self.start = 0
else:
pos_type = positions[0].get("position_type")
if pos_type == 1:
self.start = 1
elif pos_type == 2:
self.start = -1
return True
except (KeyError, IndexError, AttributeError, TypeError, ValueError) as e:
# JSON 解析失败或结构异常
print(f"持仓数据解析错误: {e}")
self.start = 0
except:
time.sleep(1)
return False
def send_dingtalk_message(self, message_content, type=1):
if type:
send_dingtalk_message(
message_content=f"🔔bitmart{self.get_now_time1()}" + message_content
)
else:
for i in range(15):
send_dingtalk_message(
message_content=f"❌bitmart{self.get_now_time1()}" + message_content
)
def action(self):
# 获取比特端口
if self.openBrowser():
logger.info("获取打开比特成功,成功获取端口!!!")
else:
logger.error("打开比特失败!!!")
return
# 接管浏览器
if self.take_over_browser():
logger.info("接管比特浏览器成功!!!")
else:
logger.error("接管浏览器失败!!!")
return
if self.close_extra_tabs_in_browser():
logger.info('关闭多余标签页成功!!!')
else:
logger.info('关闭多余标签页失败!!!')
self.page.get(url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") # 打开网页
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc进度条说明ncols长度
self.time_start = None # 时间状态 避免同一个时段,发生太多消息
while True:
# 获取当前时间
current_time = time.localtime()
current_minute = current_time.tm_min
if current_minute < 30:
self.pbar.n = current_minute
self.pbar.refresh()
else:
self.pbar.n = current_minute - 30
self.pbar.refresh()
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34]: # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
time.sleep(10)
continue
if not self.kline_3 or self.get_now_time() != int(self.kline_3["id"]):
if self.get_token(): # 获取token
logger.info("获取token成功!!!")
else:
logger.info("获取token失败")
self.send_dingtalk_message(message_content=f"获取token失败", type=0)
continue
else:
time.sleep(5)
new_price_datas = self.get_price()
if not new_price_datas:
logger.info("获取最新价格有问题!!!")
self.send_dingtalk_message(message_content=f"获取价格有问题!!!", type=0)
continue
# 解析价格
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
# 判断抓取的数据是否正确
if self.get_now_time() == int(self.kline_3["id"]):
logger.success("获取最新交易价格成功!!!")
else:
continue
if self.time_start == self.get_now_time():
continue
self.time_start = self.get_now_time()
self.page.get(url="https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") # 打开网页
if self.get_position_status():
logger.info("获取仓位信息成功!!!")
else:
logger.info("获取仓位信息失败!!!")
self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0)
continue
try:
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平多")
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平空")
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').click()
self.start = 0
except:
self.send_dingtalk_message(message_content=f"止损平仓出错!!!", type=0)
continue
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 根据策略生成信号判断信号
if self.direction:
try:
self.to_do_page()
except Exception as e:
self.send_dingtalk_message(message_content=f"购买操作失败,{e}", type=0)
continue
self.pbar.reset() # 重置进度条
self.send_dingtalk_message(
message_content=
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)
if __name__ == '__main__':
WeexTransaction(
tge_id=196495,
).action()

View File

@@ -1,356 +1 @@
import time
import datetime
import hmac
import hashlib
from tqdm import tqdm
from loguru import logger
from curl_cffi import requests
from 交易.tools import send_dingtalk_message
def is_bullish(c):
return float(c['close']) > float(c['open'])
def is_bearish(c):
return float(c['close']) < float(c['open'])
class WeexTransaction:
"""BitMart 永续合约自动交易脚本官方API版"""
def __init__(self):
self.base_url = "https://api-cloud.bitmart.com"
# 请填写你的 API 信息
self.ACCESS_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.MEMO = "" # 创建 API Key 时填的 Memo备注如果没有可留空或填写实际值
self.session = requests.Session(impersonate="chrome110") # 保持原库,避免风控
# 当前仓位(-1 空 / 0 无 / 1 多)
self.start = 0
# 最新 3 根 kline
self.kline_1 = None
self.kline_2 = None
self.kline_3 = None
# 当前信号
self.direction = None
# 防止同一时段重复执行
self.time_start = None
self.pbar = None
# 合约信息
self.symbol = "ETHUSDT_PERP"
self.leverage = "1" # 杠杆
self.open_type = "isolated" # isolated 或 cross
# -------------------------------------------------------------
# ------------------------- 通用工具 ----------------------------
# -------------------------------------------------------------
def now_text(self):
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def ding(self, msg, error=False):
prefix = "❌bitmart" if error else "🔔bitmart"
send_dingtalk_message(f"{prefix}{self.now_text()}{msg}")
def get_half_hour_timestamp(self):
"""返回当前最近的整点或半点时间戳"""
ts = time.time()
dt = datetime.datetime.fromtimestamp(ts)
target = dt.replace(minute=0, second=0, microsecond=0) if dt.minute < 30 \
else dt.replace(minute=30, second=0, microsecond=0)
return int(target.timestamp())
def sign_request(self, timestamp, query_string="", body_string=""):
"""BitMart 签名HMAC_SHA256(timestamp + '#' + memo + '#' + (query or body))"""
message = f"{timestamp}#{self.MEMO}#{query_string or body_string}"
signature = hmac.new(self.SECRET_KEY.encode(), message.encode(), hashlib.sha256).hexdigest()
return signature
def api_headers(self, timestamp, sign):
return {
"X-BM-KEY": self.ACCESS_KEY,
"X-BM-SIGN": sign,
"X-BM-TIMESTAMP": str(timestamp),
"Content-Type": "application/json",
}
# -------------------------------------------------------------
# --------------------------- K线相关 ---------------------------
# -------------------------------------------------------------
def get_price(self):
"""获取最新 30 分钟 K 线(公共接口,修复版)"""
end_time = int(time.time())
start_time = end_time - 3600 * 4 # 多取一点确保有3根以上
params = {
"symbol": "ETHUSDT", # 修正符号
"type": "30", # 30分钟官方常用 type也可试 step
"start_time": str(start_time),
"end_time": str(end_time),
}
url = "https://api-cloud-v2.bitmart.com/contract/public/kline" # 推荐 v2
for _ in range(5): # 多重试几次
try:
res = self.session.get(url, params=params, timeout=15)
data = res.json()
# 详细打印错误,便于调试
if "code" not in data or data["code"] != 1000:
logger.error(f"K线请求失败: {data}")
self.ding(f"K线获取失败: {data.get('message', '未知错误')}", error=True)
raise Exception(data.get("message", "No code 1000"))
klines = data["data"]["klines"]
if len(klines) < 3:
raise Exception("K线数据不足3根")
datas = []
for k in klines[-3:]: # 取最新3根
datas.append({
'id': int(k[0]), # timestamp
'open': float(k[1]),
'high': float(k[2]),
'low': float(k[3]),
'close': float(k[4]),
})
return sorted(datas, key=lambda x: x["id"])
except Exception as e:
logger.error(f"获取K线异常: {e} | 响应: {res.text if 'res' in locals() else 'N/A'}")
time.sleep(2)
return None
def check_signal(self, prev, curr):
"""包住形态判定"""
p_open, p_close = prev["open"], prev["close"]
c_open, c_close = curr["open"], curr["close"]
# 前跌后涨包住 -> 多
if is_bullish(curr) and is_bearish(prev) and c_close >= p_open:
return "long"
# 前涨后跌包住 -> 空
if is_bearish(curr) and is_bullish(prev) and c_close <= p_open:
return "short"
return None
# -------------------------------------------------------------
# ---------------------- API 接口封装 ---------------------------
# -------------------------------------------------------------
def get_account_balance(self):
"""获取可用余额USDT"""
timestamp = int(time.time() * 1000)
sign = self.sign_request(timestamp)
headers = self.api_headers(timestamp, sign)
for _ in range(3):
try:
url = f"{self.base_url}/contract/private/assets-detail"
res = self.session.get(url, headers=headers, timeout=15)
data = res.json()
if data["code"] != 1000:
raise Exception(data["message"])
for asset in data["data"]["positions"]:
if asset["symbol"] == self.symbol:
return float(asset["available_balance"])
return 0.0
except Exception as e:
logger.error(f"获取余额失败: {e}")
time.sleep(1)
return None
def get_position_status(self):
"""获取当前仓位1 多、-1 空、0 无"""
timestamp = int(time.time() * 1000)
sign = self.sign_request(timestamp)
headers = self.api_headers(timestamp, sign)
params = {"symbol": self.symbol}
for _ in range(3):
try:
url = f"{self.base_url}/contract/private/position"
res = self.session.get(url, params=params, headers=headers, timeout=15)
data = res.json()
if data["code"] != 1000:
raise Exception(data["message"])
positions = data["data"]
if not positions:
self.start = 0
return True
pos = positions[0]
position_side = int(pos["position_type"]) # 1 long, 2 short
self.start = 1 if position_side == 1 else -1
return True
except Exception as e:
logger.error(f"获取仓位失败: {e}")
time.sleep(1)
return False
def submit_order(self, side: str, size: float, close=False):
"""提交订单:开仓或平仓(市价)"""
timestamp = int(time.time() * 1000)
body = {
"symbol": self.symbol,
"type": "market",
"leverage": self.leverage,
"open_type": self.open_type,
"size": str(int(size)), # 张数,必须整数
}
# side: 1=buy_open_long, 2=sell_close_long, 3=sell_open_short, 4=buy_close_short
if close:
body["side"] = "2" if self.start == 1 else "4" # 平仓
else:
body["side"] = side # "1" 多开, "3" 空开
body_string = "".join([f"{k}={v}" for k, v in sorted(body.items())]) # 排序后拼接
sign = self.sign_request(timestamp, body_string=body_string)
headers = self.api_headers(timestamp, sign)
for _ in range(3):
try:
url = f"{self.base_url}/contract/private/submit-order"
res = self.session.post(url, json=body, headers=headers, timeout=15)
data = res.json()
if data["code"] == 1000:
return True
else:
raise Exception(data["message"])
except Exception as e:
logger.error(f"下单失败: {e}")
time.sleep(1)
return False
def do_order(self, direction, amount):
"""执行交易(开仓/反手/止损平仓)"""
if direction == "long":
if self.start == 0:
self.ding(f"信号:多,开多仓 {amount}")
self.submit_order("1", amount)
self.start = 1
elif self.start == -1:
self.ding(f"信号:多,反手空转多 {amount} 张(先平空再开多)")
self.submit_order("4", amount, close=True) # 先平空
time.sleep(1)
self.submit_order("1", amount) # 再开多
self.start = 1
elif direction == "short":
if self.start == 0:
self.ding(f"信号:空,开空仓 {amount}")
self.submit_order("3", amount)
self.start = -1
elif self.start == 1:
self.ding(f"信号:空,反手多转空 {amount} 张(先平多再开空)")
self.submit_order("2", amount, close=True) # 先平多
time.sleep(1)
self.submit_order("3", amount) # 再开空
self.start = -1
# -------------------------------------------------------------
# ----------------------------- 主流程 ---------------------------
# -------------------------------------------------------------
def action(self):
self.pbar = tqdm(total=30, desc="等待半小时周期", ncols=80)
while True:
now = time.localtime()
minute = now.tm_min
# 更新进度条
self.pbar.n = minute if minute < 30 else minute - 30
self.pbar.refresh()
# 时间重复跳过
if self.time_start == self.get_half_hour_timestamp():
time.sleep(5)
continue
# ---- 获取价格 ----
kdatas = self.get_price()
if not kdatas or len(kdatas) < 3:
self.ding("获取K线失败", error=True)
time.sleep(10)
continue
self.kline_1, self.kline_2, self.kline_3 = kdatas[-3:]
if int(self.kline_3["id"]) != self.get_half_hour_timestamp():
time.sleep(10)
continue
logger.success("K线获取成功")
self.time_start = self.get_half_hour_timestamp()
# ---- 获取仓位 ----
if not self.get_position_status():
self.ding("获取仓位失败!", error=True)
continue
# ---- 止损平仓(两根连续大阴/阳线)----
try:
if self.start == 1 and is_bearish(self.kline_1) and is_bearish(self.kline_2):
self.ding("两根大阴线,止损平多")
self.submit_order("2", 999999, close=True) # 大数全平
self.start = 0
elif self.start == -1 and is_bullish(self.kline_1) and is_bullish(self.kline_2):
self.ding("两根大阳线,止损平空")
self.submit_order("4", 999999, close=True)
self.start = 0
except Exception as e:
self.ding(f"止损平仓错误!{e}", error=True)
# ---- 生成新信号 ----
self.direction = self.check_signal(prev=self.kline_1, curr=self.kline_2)
# ---- 执行交易 ----
if self.direction:
balance = self.get_account_balance()
if balance is None:
self.ding("获取余额失败,无法下单", error=True)
continue
amount = int(balance / 100) # 张数,整数
if amount < 1:
self.ding("余额不足,无法下单")
continue
self.do_order(self.direction, amount)
# ---- 周期结束消息 ----
self.pbar.reset()
self.ding(
f"持仓:{'' if self.start == 0 else ('' if self.start == 1 else '')}"
f"信号:{'' if not self.direction else ('' if self.direction == 'long' else '')}"
)
time.sleep(10)
if __name__ == '__main__':
while True:
try:
WeexTransaction().action()
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(30)
print(407 / 1000)