From c5fdf86699f872ba9980075f2356a1a286faaaab Mon Sep 17 00:00:00 2001 From: 27942 Date: Tue, 16 Dec 2025 19:22:26 +0800 Subject: [PATCH] fewfef --- 交易/test.py | 356 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 356 insertions(+) create mode 100644 交易/test.py diff --git a/交易/test.py b/交易/test.py new file mode 100644 index 0000000..3b8fcc4 --- /dev/null +++ b/交易/test.py @@ -0,0 +1,356 @@ +import time +import datetime +import hmac +import hashlib +from tqdm import tqdm +from loguru import logger +from curl_cffi import requests + +from 交易.tools import send_dingtalk_message + + +def is_bullish(c): + return float(c['close']) > float(c['open']) + + +def is_bearish(c): + return float(c['close']) < float(c['open']) + + +class WeexTransaction: + """BitMart 永续合约自动交易脚本(官方API版)""" + + def __init__(self): + self.base_url = "https://api-cloud.bitmart.com" + + # 请填写你的 API 信息 + self.ACCESS_KEY = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" + self.SECRET_KEY = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" + self.MEMO = "" # 创建 API Key 时填的 Memo(备注),如果没有可留空或填写实际值 + + self.session = requests.Session(impersonate="chrome110") # 保持原库,避免风控 + + # 当前仓位(-1 空 / 0 无 / 1 多) + self.start = 0 + + # 最新 3 根 kline + self.kline_1 = None + self.kline_2 = None + self.kline_3 = None + + # 当前信号 + self.direction = None + + # 防止同一时段重复执行 + self.time_start = None + + self.pbar = None + + # 合约信息 + self.symbol = "ETHUSDT_PERP" + self.leverage = "1" # 杠杆 + self.open_type = "isolated" # isolated 或 cross + + # ------------------------------------------------------------- + # ------------------------- 通用工具 ---------------------------- + # ------------------------------------------------------------- + + def now_text(self): + return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + def ding(self, msg, error=False): + prefix = "❌bitmart:" if error else "🔔bitmart:" + send_dingtalk_message(f"{prefix}{self.now_text()},{msg}") + + def get_half_hour_timestamp(self): + """返回当前最近的整点或半点时间戳""" + ts = time.time() + dt = datetime.datetime.fromtimestamp(ts) + target = dt.replace(minute=0, second=0, microsecond=0) if dt.minute < 30 \ + else dt.replace(minute=30, second=0, microsecond=0) + return int(target.timestamp()) + + def sign_request(self, timestamp, query_string="", body_string=""): + """BitMart 签名:HMAC_SHA256(timestamp + '#' + memo + '#' + (query or body))""" + message = f"{timestamp}#{self.MEMO}#{query_string or body_string}" + signature = hmac.new(self.SECRET_KEY.encode(), message.encode(), hashlib.sha256).hexdigest() + return signature + + def api_headers(self, timestamp, sign): + return { + "X-BM-KEY": self.ACCESS_KEY, + "X-BM-SIGN": sign, + "X-BM-TIMESTAMP": str(timestamp), + "Content-Type": "application/json", + } + + # ------------------------------------------------------------- + # --------------------------- K线相关 --------------------------- + # ------------------------------------------------------------- + + def get_price(self): + """获取最新 30 分钟 K 线(公共接口,修复版)""" + end_time = int(time.time()) + start_time = end_time - 3600 * 4 # 多取一点确保有3根以上 + + params = { + "symbol": "ETHUSDT", # 修正符号 + "type": "30", # 30分钟(官方常用 type,也可试 step) + "start_time": str(start_time), + "end_time": str(end_time), + } + + url = "https://api-cloud-v2.bitmart.com/contract/public/kline" # 推荐 v2 + + for _ in range(5): # 多重试几次 + try: + res = self.session.get(url, params=params, timeout=15) + data = res.json() + + # 详细打印错误,便于调试 + if "code" not in data or data["code"] != 1000: + logger.error(f"K线请求失败: {data}") + self.ding(f"K线获取失败: {data.get('message', '未知错误')}", error=True) + raise Exception(data.get("message", "No code 1000")) + + klines = data["data"]["klines"] + if len(klines) < 3: + raise Exception("K线数据不足3根") + + datas = [] + for k in klines[-3:]: # 取最新3根 + datas.append({ + 'id': int(k[0]), # timestamp + 'open': float(k[1]), + 'high': float(k[2]), + 'low': float(k[3]), + 'close': float(k[4]), + }) + return sorted(datas, key=lambda x: x["id"]) + + except Exception as e: + logger.error(f"获取K线异常: {e} | 响应: {res.text if 'res' in locals() else 'N/A'}") + time.sleep(2) + + return None + + def check_signal(self, prev, curr): + """包住形态判定""" + p_open, p_close = prev["open"], prev["close"] + c_open, c_close = curr["open"], curr["close"] + + # 前跌后涨包住 -> 多 + if is_bullish(curr) and is_bearish(prev) and c_close >= p_open: + return "long" + + # 前涨后跌包住 -> 空 + if is_bearish(curr) and is_bullish(prev) and c_close <= p_open: + return "short" + + return None + + # ------------------------------------------------------------- + # ---------------------- API 接口封装 --------------------------- + # ------------------------------------------------------------- + + def get_account_balance(self): + """获取可用余额(USDT)""" + timestamp = int(time.time() * 1000) + sign = self.sign_request(timestamp) + headers = self.api_headers(timestamp, sign) + + for _ in range(3): + try: + url = f"{self.base_url}/contract/private/assets-detail" + res = self.session.get(url, headers=headers, timeout=15) + data = res.json() + if data["code"] != 1000: + raise Exception(data["message"]) + for asset in data["data"]["positions"]: + if asset["symbol"] == self.symbol: + return float(asset["available_balance"]) + return 0.0 + except Exception as e: + logger.error(f"获取余额失败: {e}") + time.sleep(1) + return None + + def get_position_status(self): + """获取当前仓位:1 多、-1 空、0 无""" + timestamp = int(time.time() * 1000) + sign = self.sign_request(timestamp) + headers = self.api_headers(timestamp, sign) + params = {"symbol": self.symbol} + + for _ in range(3): + try: + url = f"{self.base_url}/contract/private/position" + res = self.session.get(url, params=params, headers=headers, timeout=15) + data = res.json() + if data["code"] != 1000: + raise Exception(data["message"]) + + positions = data["data"] + if not positions: + self.start = 0 + return True + + pos = positions[0] + position_side = int(pos["position_type"]) # 1 long, 2 short + self.start = 1 if position_side == 1 else -1 + return True + except Exception as e: + logger.error(f"获取仓位失败: {e}") + time.sleep(1) + return False + + def submit_order(self, side: str, size: float, close=False): + """提交订单:开仓或平仓(市价)""" + timestamp = int(time.time() * 1000) + + body = { + "symbol": self.symbol, + "type": "market", + "leverage": self.leverage, + "open_type": self.open_type, + "size": str(int(size)), # 张数,必须整数 + } + + # side: 1=buy_open_long, 2=sell_close_long, 3=sell_open_short, 4=buy_close_short + if close: + body["side"] = "2" if self.start == 1 else "4" # 平仓 + else: + body["side"] = side # "1" 多开, "3" 空开 + + body_string = "".join([f"{k}={v}" for k, v in sorted(body.items())]) # 排序后拼接 + sign = self.sign_request(timestamp, body_string=body_string) + headers = self.api_headers(timestamp, sign) + + for _ in range(3): + try: + url = f"{self.base_url}/contract/private/submit-order" + res = self.session.post(url, json=body, headers=headers, timeout=15) + data = res.json() + if data["code"] == 1000: + return True + else: + raise Exception(data["message"]) + except Exception as e: + logger.error(f"下单失败: {e}") + time.sleep(1) + return False + + def do_order(self, direction, amount): + """执行交易(开仓/反手/止损平仓)""" + if direction == "long": + if self.start == 0: + self.ding(f"信号:多,开多仓 {amount} 张") + self.submit_order("1", amount) + self.start = 1 + elif self.start == -1: + self.ding(f"信号:多,反手空转多 {amount} 张(先平空再开多)") + self.submit_order("4", amount, close=True) # 先平空 + time.sleep(1) + self.submit_order("1", amount) # 再开多 + self.start = 1 + + elif direction == "short": + if self.start == 0: + self.ding(f"信号:空,开空仓 {amount} 张") + self.submit_order("3", amount) + self.start = -1 + elif self.start == 1: + self.ding(f"信号:空,反手多转空 {amount} 张(先平多再开空)") + self.submit_order("2", amount, close=True) # 先平多 + time.sleep(1) + self.submit_order("3", amount) # 再开空 + self.start = -1 + + # ------------------------------------------------------------- + # ----------------------------- 主流程 --------------------------- + # ------------------------------------------------------------- + + def action(self): + self.pbar = tqdm(total=30, desc="等待半小时周期", ncols=80) + + while True: + now = time.localtime() + minute = now.tm_min + + # 更新进度条 + self.pbar.n = minute if minute < 30 else minute - 30 + self.pbar.refresh() + + # 时间重复跳过 + if self.time_start == self.get_half_hour_timestamp(): + time.sleep(5) + continue + + # ---- 获取价格 ---- + kdatas = self.get_price() + if not kdatas or len(kdatas) < 3: + self.ding("获取K线失败!", error=True) + time.sleep(10) + continue + + self.kline_1, self.kline_2, self.kline_3 = kdatas[-3:] + if int(self.kline_3["id"]) != self.get_half_hour_timestamp(): + time.sleep(10) + continue + + logger.success("K线获取成功") + self.time_start = self.get_half_hour_timestamp() + + # ---- 获取仓位 ---- + if not self.get_position_status(): + self.ding("获取仓位失败!", error=True) + continue + + # ---- 止损平仓(两根连续大阴/阳线)---- + try: + if self.start == 1 and is_bearish(self.kline_1) and is_bearish(self.kline_2): + self.ding("两根大阴线,止损平多") + self.submit_order("2", 999999, close=True) # 大数全平 + self.start = 0 + + elif self.start == -1 and is_bullish(self.kline_1) and is_bullish(self.kline_2): + self.ding("两根大阳线,止损平空") + self.submit_order("4", 999999, close=True) + self.start = 0 + except Exception as e: + self.ding(f"止损平仓错误!{e}", error=True) + + # ---- 生成新信号 ---- + self.direction = self.check_signal(prev=self.kline_1, curr=self.kline_2) + + # ---- 执行交易 ---- + if self.direction: + balance = self.get_account_balance() + if balance is None: + self.ding("获取余额失败,无法下单", error=True) + continue + + amount = int(balance / 100) # 张数,整数 + if amount < 1: + self.ding("余额不足,无法下单") + continue + + self.do_order(self.direction, amount) + + # ---- 周期结束消息 ---- + self.pbar.reset() + self.ding( + f"持仓:{'无' if self.start == 0 else ('多' if self.start == 1 else '空')}," + f"信号:{'无' if not self.direction else ('多' if self.direction == 'long' else '空')}" + ) + + time.sleep(10) + + +if __name__ == '__main__': + while True: + try: + WeexTransaction().action() + except Exception as e: + logger.error(f"主循环异常: {e}") + time.sleep(30) \ No newline at end of file