This commit is contained in:
27942
2025-12-16 19:22:26 +08:00
parent cc482d1ef6
commit c5fdf86699

356
交易/test.py Normal file
View File

@@ -0,0 +1,356 @@
import time
import datetime
import hmac
import hashlib
from tqdm import tqdm
from loguru import logger
from curl_cffi import requests
from 交易.tools import send_dingtalk_message
def is_bullish(c):
return float(c['close']) > float(c['open'])
def is_bearish(c):
return float(c['close']) < float(c['open'])
class WeexTransaction:
"""BitMart 永续合约自动交易脚本官方API版"""
def __init__(self):
self.base_url = "https://api-cloud.bitmart.com"
# 请填写你的 API 信息
self.ACCESS_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8"
self.SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5"
self.MEMO = "" # 创建 API Key 时填的 Memo备注如果没有可留空或填写实际值
self.session = requests.Session(impersonate="chrome110") # 保持原库,避免风控
# 当前仓位(-1 空 / 0 无 / 1 多)
self.start = 0
# 最新 3 根 kline
self.kline_1 = None
self.kline_2 = None
self.kline_3 = None
# 当前信号
self.direction = None
# 防止同一时段重复执行
self.time_start = None
self.pbar = None
# 合约信息
self.symbol = "ETHUSDT_PERP"
self.leverage = "1" # 杠杆
self.open_type = "isolated" # isolated 或 cross
# -------------------------------------------------------------
# ------------------------- 通用工具 ----------------------------
# -------------------------------------------------------------
def now_text(self):
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def ding(self, msg, error=False):
prefix = "❌bitmart" if error else "🔔bitmart"
send_dingtalk_message(f"{prefix}{self.now_text()}{msg}")
def get_half_hour_timestamp(self):
"""返回当前最近的整点或半点时间戳"""
ts = time.time()
dt = datetime.datetime.fromtimestamp(ts)
target = dt.replace(minute=0, second=0, microsecond=0) if dt.minute < 30 \
else dt.replace(minute=30, second=0, microsecond=0)
return int(target.timestamp())
def sign_request(self, timestamp, query_string="", body_string=""):
"""BitMart 签名HMAC_SHA256(timestamp + '#' + memo + '#' + (query or body))"""
message = f"{timestamp}#{self.MEMO}#{query_string or body_string}"
signature = hmac.new(self.SECRET_KEY.encode(), message.encode(), hashlib.sha256).hexdigest()
return signature
def api_headers(self, timestamp, sign):
return {
"X-BM-KEY": self.ACCESS_KEY,
"X-BM-SIGN": sign,
"X-BM-TIMESTAMP": str(timestamp),
"Content-Type": "application/json",
}
# -------------------------------------------------------------
# --------------------------- K线相关 ---------------------------
# -------------------------------------------------------------
def get_price(self):
"""获取最新 30 分钟 K 线(公共接口,修复版)"""
end_time = int(time.time())
start_time = end_time - 3600 * 4 # 多取一点确保有3根以上
params = {
"symbol": "ETHUSDT", # 修正符号
"type": "30", # 30分钟官方常用 type也可试 step
"start_time": str(start_time),
"end_time": str(end_time),
}
url = "https://api-cloud-v2.bitmart.com/contract/public/kline" # 推荐 v2
for _ in range(5): # 多重试几次
try:
res = self.session.get(url, params=params, timeout=15)
data = res.json()
# 详细打印错误,便于调试
if "code" not in data or data["code"] != 1000:
logger.error(f"K线请求失败: {data}")
self.ding(f"K线获取失败: {data.get('message', '未知错误')}", error=True)
raise Exception(data.get("message", "No code 1000"))
klines = data["data"]["klines"]
if len(klines) < 3:
raise Exception("K线数据不足3根")
datas = []
for k in klines[-3:]: # 取最新3根
datas.append({
'id': int(k[0]), # timestamp
'open': float(k[1]),
'high': float(k[2]),
'low': float(k[3]),
'close': float(k[4]),
})
return sorted(datas, key=lambda x: x["id"])
except Exception as e:
logger.error(f"获取K线异常: {e} | 响应: {res.text if 'res' in locals() else 'N/A'}")
time.sleep(2)
return None
def check_signal(self, prev, curr):
"""包住形态判定"""
p_open, p_close = prev["open"], prev["close"]
c_open, c_close = curr["open"], curr["close"]
# 前跌后涨包住 -> 多
if is_bullish(curr) and is_bearish(prev) and c_close >= p_open:
return "long"
# 前涨后跌包住 -> 空
if is_bearish(curr) and is_bullish(prev) and c_close <= p_open:
return "short"
return None
# -------------------------------------------------------------
# ---------------------- API 接口封装 ---------------------------
# -------------------------------------------------------------
def get_account_balance(self):
"""获取可用余额USDT"""
timestamp = int(time.time() * 1000)
sign = self.sign_request(timestamp)
headers = self.api_headers(timestamp, sign)
for _ in range(3):
try:
url = f"{self.base_url}/contract/private/assets-detail"
res = self.session.get(url, headers=headers, timeout=15)
data = res.json()
if data["code"] != 1000:
raise Exception(data["message"])
for asset in data["data"]["positions"]:
if asset["symbol"] == self.symbol:
return float(asset["available_balance"])
return 0.0
except Exception as e:
logger.error(f"获取余额失败: {e}")
time.sleep(1)
return None
def get_position_status(self):
"""获取当前仓位1 多、-1 空、0 无"""
timestamp = int(time.time() * 1000)
sign = self.sign_request(timestamp)
headers = self.api_headers(timestamp, sign)
params = {"symbol": self.symbol}
for _ in range(3):
try:
url = f"{self.base_url}/contract/private/position"
res = self.session.get(url, params=params, headers=headers, timeout=15)
data = res.json()
if data["code"] != 1000:
raise Exception(data["message"])
positions = data["data"]
if not positions:
self.start = 0
return True
pos = positions[0]
position_side = int(pos["position_type"]) # 1 long, 2 short
self.start = 1 if position_side == 1 else -1
return True
except Exception as e:
logger.error(f"获取仓位失败: {e}")
time.sleep(1)
return False
def submit_order(self, side: str, size: float, close=False):
"""提交订单:开仓或平仓(市价)"""
timestamp = int(time.time() * 1000)
body = {
"symbol": self.symbol,
"type": "market",
"leverage": self.leverage,
"open_type": self.open_type,
"size": str(int(size)), # 张数,必须整数
}
# side: 1=buy_open_long, 2=sell_close_long, 3=sell_open_short, 4=buy_close_short
if close:
body["side"] = "2" if self.start == 1 else "4" # 平仓
else:
body["side"] = side # "1" 多开, "3" 空开
body_string = "".join([f"{k}={v}" for k, v in sorted(body.items())]) # 排序后拼接
sign = self.sign_request(timestamp, body_string=body_string)
headers = self.api_headers(timestamp, sign)
for _ in range(3):
try:
url = f"{self.base_url}/contract/private/submit-order"
res = self.session.post(url, json=body, headers=headers, timeout=15)
data = res.json()
if data["code"] == 1000:
return True
else:
raise Exception(data["message"])
except Exception as e:
logger.error(f"下单失败: {e}")
time.sleep(1)
return False
def do_order(self, direction, amount):
"""执行交易(开仓/反手/止损平仓)"""
if direction == "long":
if self.start == 0:
self.ding(f"信号:多,开多仓 {amount}")
self.submit_order("1", amount)
self.start = 1
elif self.start == -1:
self.ding(f"信号:多,反手空转多 {amount} 张(先平空再开多)")
self.submit_order("4", amount, close=True) # 先平空
time.sleep(1)
self.submit_order("1", amount) # 再开多
self.start = 1
elif direction == "short":
if self.start == 0:
self.ding(f"信号:空,开空仓 {amount}")
self.submit_order("3", amount)
self.start = -1
elif self.start == 1:
self.ding(f"信号:空,反手多转空 {amount} 张(先平多再开空)")
self.submit_order("2", amount, close=True) # 先平多
time.sleep(1)
self.submit_order("3", amount) # 再开空
self.start = -1
# -------------------------------------------------------------
# ----------------------------- 主流程 ---------------------------
# -------------------------------------------------------------
def action(self):
self.pbar = tqdm(total=30, desc="等待半小时周期", ncols=80)
while True:
now = time.localtime()
minute = now.tm_min
# 更新进度条
self.pbar.n = minute if minute < 30 else minute - 30
self.pbar.refresh()
# 时间重复跳过
if self.time_start == self.get_half_hour_timestamp():
time.sleep(5)
continue
# ---- 获取价格 ----
kdatas = self.get_price()
if not kdatas or len(kdatas) < 3:
self.ding("获取K线失败", error=True)
time.sleep(10)
continue
self.kline_1, self.kline_2, self.kline_3 = kdatas[-3:]
if int(self.kline_3["id"]) != self.get_half_hour_timestamp():
time.sleep(10)
continue
logger.success("K线获取成功")
self.time_start = self.get_half_hour_timestamp()
# ---- 获取仓位 ----
if not self.get_position_status():
self.ding("获取仓位失败!", error=True)
continue
# ---- 止损平仓(两根连续大阴/阳线)----
try:
if self.start == 1 and is_bearish(self.kline_1) and is_bearish(self.kline_2):
self.ding("两根大阴线,止损平多")
self.submit_order("2", 999999, close=True) # 大数全平
self.start = 0
elif self.start == -1 and is_bullish(self.kline_1) and is_bullish(self.kline_2):
self.ding("两根大阳线,止损平空")
self.submit_order("4", 999999, close=True)
self.start = 0
except Exception as e:
self.ding(f"止损平仓错误!{e}", error=True)
# ---- 生成新信号 ----
self.direction = self.check_signal(prev=self.kline_1, curr=self.kline_2)
# ---- 执行交易 ----
if self.direction:
balance = self.get_account_balance()
if balance is None:
self.ding("获取余额失败,无法下单", error=True)
continue
amount = int(balance / 100) # 张数,整数
if amount < 1:
self.ding("余额不足,无法下单")
continue
self.do_order(self.direction, amount)
# ---- 周期结束消息 ----
self.pbar.reset()
self.ding(
f"持仓:{'' if self.start == 0 else ('' if self.start == 1 else '')}"
f"信号:{'' if not self.direction else ('' if self.direction == 'long' else '')}"
)
time.sleep(10)
if __name__ == '__main__':
while True:
try:
WeexTransaction().action()
except Exception as e:
logger.error(f"主循环异常: {e}")
time.sleep(30)