From bf5fc79fb901530b886a2bf276b62b02cb37e5d6 Mon Sep 17 00:00:00 2001 From: 27942 Date: Sun, 4 Jan 2026 14:36:53 +0800 Subject: [PATCH] fewfef --- models/__init__.py | 4 +- test.py | 534 +--------------------------------- weex/调试信号检测.py | 90 ++++++ weex/长期持有信号/database.db | Bin 0 -> 8192 bytes 4 files changed, 95 insertions(+), 533 deletions(-) create mode 100644 weex/调试信号检测.py create mode 100644 weex/长期持有信号/database.db diff --git a/models/__init__.py b/models/__init__.py index 5509426..084d966 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -1,7 +1,9 @@ +from pathlib import Path + from peewee import * # 连接到 SQLite 数据库,如果文件不存在会自动创建 -db = SqliteDatabase(r'database.db') +db = SqliteDatabase(fr'{Path(__file__).parent}/database.db') import pymysql diff --git a/test.py b/test.py index c4474cb..276a930 100644 --- a/test.py +++ b/test.py @@ -1,533 +1,3 @@ -import os -import time -import uuid -import datetime -from dataclasses import dataclass -from tqdm import tqdm -from loguru import logger - -from bitmart.api_contract import APIContract -from bitmart.lib.cloud_exceptions import APIException - -from 交易.tools import send_dingtalk_message - - -@dataclass -class StrategyConfig: - # ===== 合约 ===== - contract_symbol: str = "ETHUSDT" - open_type: str = "cross" - leverage: str = "50" # 1~2 更稳 - - # ===== K线与指标 ===== - step_min: int = 1 - lookback_min: int = 240 - ema_len: int = 30 - atr_len: int = 14 - - # ===== 动态阈值(关键:自适应行情)===== - # entry_dev / tp / sl 都由 ATR/Price 动态计算: max(下限, 系数 * atr_ratio) - entry_dev_floor: float = 0.0010 # 0.10% 最小偏离阈值(保证平静行情也能出单) - tp_floor: float = 0.0005 # 0.05% 最小止盈 - sl_floor: float = 0.0015 # 0.15% 最小止损 - - entry_k: float = 1.20 # entry_dev = max(floor, entry_k * atr_ratio) - tp_k: float = 0.60 # tp = max(floor, tp_k * atr_ratio) - sl_k: float = 1.20 # sl = max(floor, sl_k * atr_ratio) - - max_hold_sec: int = 120 # 2分钟超时退出 - cooldown_sec_after_exit: int = 10 # 平仓后冷却10秒,防抖 - - # ===== 下单/仓位 ===== - risk_percent: float = 0.005 # 每次用可用余额的0.15%作为保证金预算 - min_size: int = 1 - max_size: int = 5000 - - # ===== 日内风控 ===== - daily_loss_limit: float = 0.02 # -2% 停机 - daily_profit_cap: float = 0.01 # +1% 封顶停机 - - # ===== 危险模式过滤(避免趋势/插针)===== - atr_ratio_kill: float = 0.0045 # ATR/Price > 0.45% -> 危险模式,暂停开仓 - big_body_kill: float = 0.012 # 1m实体>1.2% -> 危险模式 - - # ===== 轮询节奏(减少REST压力)===== - klines_refresh_sec: int = 10 - tick_refresh_sec: int = 1 - status_notify_sec: int = 60 - - -class BitmartFuturesMeanReversionBot: - def __init__(self, cfg: StrategyConfig): - self.cfg = cfg - - # ✅ 强制只从环境变量读 - self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" - self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" - self.memo = "合约交易" - - if not self.api_key or not self.secret_key: - raise RuntimeError("请先设置环境变量 BITMART_API_KEY / BITMART_SECRET_KEY / BITMART_MEMO(可选)") - - self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) - - # 持仓状态: -1 空, 0 无, 1 多 - self.pos = 0 - self.entry_price = None - self.entry_ts = None - self.last_exit_ts = 0 - - # 日内权益基准 - self.day_start_equity = None - self.trading_enabled = True - self.day_tag = datetime.date.today() - - # 缓存 - self._klines_cache = None - self._klines_cache_ts = 0 - - self._last_status_notify_ts = 0 - - self.pbar = tqdm(total=60, desc="运行中(秒)", ncols=90) - - # ----------------- 通用工具 ----------------- - def ding(self, msg, error=False): - prefix = "❌bitmart:" if error else "🔔bitmart:" - if error: - for _ in range(3): - send_dingtalk_message(f"{prefix}{msg}") - else: - send_dingtalk_message(f"{prefix}{msg}") - - def set_leverage(self) -> bool: - try: - resp = self.contractAPI.post_submit_leverage( - contract_symbol=self.cfg.contract_symbol, - leverage=self.cfg.leverage, - open_type=self.cfg.open_type - )[0] - if resp.get("code") == 1000: - logger.success(f"设置杠杆成功:{self.cfg.open_type} + {self.cfg.leverage}x") - return True - logger.error(f"设置杠杆失败: {resp}") - self.ding(f"设置杠杆失败: {resp}", error=True) - return False - except Exception as e: - logger.error(f"设置杠杆异常: {e}") - self.ding(f"设置杠杆异常: {e}", error=True) - return False - - # ----------------- 行情/指标 ----------------- - def get_klines_cached(self): - now = time.time() - if self._klines_cache is not None and (now - self._klines_cache_ts) < self.cfg.klines_refresh_sec: - return self._klines_cache - - kl = self.get_klines() - if kl: - self._klines_cache = kl - self._klines_cache_ts = now - return self._klines_cache - - def get_klines(self): - try: - end_time = int(time.time()) - start_time = end_time - 60 * self.cfg.lookback_min - - resp = self.contractAPI.get_kline( - contract_symbol=self.cfg.contract_symbol, - step=self.cfg.step_min, - start_time=start_time, - end_time=end_time - )[0] - - if resp.get("code") != 1000: - logger.error(f"获取K线失败: {resp}") - return None - - data = resp.get("data", []) - formatted = [] - for k in data: - formatted.append({ - "id": int(k["timestamp"]), - "open": float(k["open_price"]), - "high": float(k["high_price"]), - "low": float(k["low_price"]), - "close": float(k["close_price"]), - }) - formatted.sort(key=lambda x: x["id"]) - return formatted - except Exception as e: - logger.error(f"获取K线异常: {e}") - self.ding(f"获取K线异常: {e}", error=True) - return None - - def get_last_price(self, fallback_close: float) -> float: - """ - 优先取更实时的最新价;若SDK不支持/字段不同,回退到K线close。 - """ - try: - if hasattr(self.contractAPI, "get_contract_details"): - r = self.contractAPI.get_contract_details(contract_symbol=self.cfg.contract_symbol)[0] - d = r.get("data") if isinstance(r, dict) else None - if isinstance(d, dict): - for key in ("last_price", "mark_price", "index_price"): - if key in d and d[key] is not None: - return float(d[key]) - - if hasattr(self.contractAPI, "get_ticker"): - r = self.contractAPI.get_ticker(contract_symbol=self.cfg.contract_symbol)[0] - d = r.get("data") if isinstance(r, dict) else None - if isinstance(d, dict): - for key in ("last_price", "price", "last", "close"): - if key in d and d[key] is not None: - return float(d[key]) - except Exception: - pass - - return float(fallback_close) - - @staticmethod - def ema(values, n: int) -> float: - k = 2 / (n + 1) - e = values[0] - for v in values[1:]: - e = v * k + e * (1 - k) - return e - - @staticmethod - def atr(klines, n: int) -> float: - if len(klines) < n + 1: - return 0.0 - trs = [] - for i in range(-n, 0): - cur = klines[i] - prev = klines[i - 1] - tr = max( - cur["high"] - cur["low"], - abs(cur["high"] - prev["close"]), - abs(cur["low"] - prev["close"]), - ) - trs.append(tr) - return sum(trs) / len(trs) - - def is_danger_market(self, klines, price: float) -> bool: - last = klines[-1] - body = abs(last["close"] - last["open"]) / last["open"] if last["open"] else 0.0 - if body >= self.cfg.big_body_kill: - return True - - a = self.atr(klines, self.cfg.atr_len) - atr_ratio = (a / price) if price > 0 else 0.0 - if atr_ratio >= self.cfg.atr_ratio_kill: - return True - - return False - - def dynamic_thresholds(self, atr_ratio: float): - """ - 返回动态 entry_dev / tp / sl - """ - entry_dev = max(self.cfg.entry_dev_floor, self.cfg.entry_k * atr_ratio) - tp = max(self.cfg.tp_floor, self.cfg.tp_k * atr_ratio) - sl = max(self.cfg.sl_floor, self.cfg.sl_k * atr_ratio) - return entry_dev, tp, sl - - # ----------------- 账户/仓位 ----------------- - def get_assets_available(self) -> float: - try: - resp = self.contractAPI.get_assets_detail()[0] - if resp.get("code") != 1000: - return 0.0 - data = resp.get("data") - if isinstance(data, dict): - return float(data.get("available_balance", 0)) - if isinstance(data, list): - for asset in data: - if asset.get("currency") == "USDT": - return float(asset.get("available_balance", 0)) - return 0.0 - except Exception as e: - logger.error(f"余额查询异常: {e}") - return 0.0 - - def get_position_status(self) -> bool: - try: - resp = self.contractAPI.get_position(contract_symbol=self.cfg.contract_symbol)[0] - if resp.get("code") != 1000: - return False - - positions = resp.get("data", []) - if not positions: - self.pos = 0 - return True - - p = positions[0] - self.pos = 1 if p["position_type"] == 1 else -1 - return True - except Exception as e: - logger.error(f"持仓查询异常: {e}") - self.ding(f"持仓查询异常: {e}", error=True) - return False - - def get_equity_proxy(self) -> float: - return self.get_assets_available() - - def refresh_daily_baseline(self): - today = datetime.date.today() - if today != self.day_tag: - self.day_tag = today - self.day_start_equity = None - self.trading_enabled = True - self.ding(f"新的一天({today}):重置日内风控基准") - - def risk_kill_switch(self): - self.refresh_daily_baseline() - equity = self.get_equity_proxy() - if equity <= 0: - return - - if self.day_start_equity is None: - self.day_start_equity = equity - logger.info(f"日内权益基准设定:{equity:.2f} USDT") - return - - pnl = (equity - self.day_start_equity) / self.day_start_equity - if pnl <= -self.cfg.daily_loss_limit: - self.trading_enabled = False - self.ding(f"触发日止损:{pnl * 100:.2f}% -> 停机", error=True) - - if pnl >= self.cfg.daily_profit_cap: - self.trading_enabled = False - self.ding(f"达到日盈利封顶:{pnl * 100:.2f}% -> 停机") - - # ----------------- 下单 ----------------- - def calculate_size(self, price: float) -> int: - """ - 保守仓位估算:按 1张≈0.001ETH(你原假设) - """ - bal = self.get_assets_available() - if bal < 10: - return 0 - - margin = bal * self.cfg.risk_percent - lev = int(self.cfg.leverage) - - size = int((margin * lev) / (price * 0.001)) - size = max(self.cfg.min_size, size) - size = min(self.cfg.max_size, size) - return size - - def place_market_order(self, side: int, size: int) -> bool: - """ - side: - 1 开多 - 2 平空 - 3 平多 - 4 开空 - """ - if size <= 0: - return False - - client_order_id = f"mr_{int(time.time())}_{uuid.uuid4().hex[:8]}" - try: - resp = self.contractAPI.post_submit_order( - contract_symbol=self.cfg.contract_symbol, - client_order_id=client_order_id, - side=side, - mode=1, - type="market", - leverage=self.cfg.leverage, - open_type=self.cfg.open_type, - size=size - )[0] - - logger.info(f"order_resp: {resp}") - - if resp.get("code") == 1000: - return True - - self.ding(f"下单失败: {resp}", error=True) - return False - - except APIException as e: - logger.error(f"API下单异常: {e}") - self.ding(f"API下单异常: {e}", error=True) - return False - - except Exception as e: - logger.error(f"下单未知异常: {e}") - self.ding(f"下单未知异常: {e}", error=True) - return False - - def close_position_all(self): - if self.pos == 1: - ok = self.place_market_order(3, 999999) - if ok: - self.pos = 0 - elif self.pos == -1: - ok = self.place_market_order(2, 999999) - if ok: - self.pos = 0 - - # ----------------- 策略主逻辑 ----------------- - def in_cooldown(self) -> bool: - return (time.time() - self.last_exit_ts) < self.cfg.cooldown_sec_after_exit - - def maybe_enter(self, price: float, ema_value: float, entry_dev: float): - if self.pos != 0: - return - if self.in_cooldown(): - return - - dev = (price - ema_value) / ema_value if ema_value else 0.0 - size = self.calculate_size(price) - - logger.info( - f"enter_check: price={price:.2f}, ema={ema_value:.2f}, dev={dev * 100:.3f}% " - f"(阈值={entry_dev * 100:.3f}%), size={size}, pos={self.pos}" - ) - - if size <= 0: - return - - if dev <= -entry_dev: - if self.place_market_order(1, size): # 开多 - self.pos = 1 - self.entry_price = price - self.entry_ts = time.time() - self.ding(f"✅开多:dev={dev * 100:.3f}% size={size} entry={price:.2f}") - - elif dev >= entry_dev: - if self.place_market_order(4, size): # 开空 - self.pos = -1 - self.entry_price = price - self.entry_ts = time.time() - self.ding(f"✅开空:dev={dev * 100:.3f}% size={size} entry={price:.2f}") - - def maybe_exit(self, price: float, tp: float, sl: float): - if self.pos == 0 or self.entry_price is None or self.entry_ts is None: - return - - hold = time.time() - self.entry_ts - - if self.pos == 1: - pnl = (price - self.entry_price) / self.entry_price - else: - pnl = (self.entry_price - price) / self.entry_price - - if pnl >= tp: - self.close_position_all() - self.ding(f"🎯止盈:pnl={pnl * 100:.3f}% price={price:.2f} tp={tp * 100:.3f}%") - self.entry_price, self.entry_ts = None, None - self.last_exit_ts = time.time() - - elif pnl <= -sl: - self.close_position_all() - self.ding(f"🛑止损:pnl={pnl * 100:.3f}% price={price:.2f} sl={sl * 100:.3f}%", error=True) - self.entry_price, self.entry_ts = None, None - self.last_exit_ts = time.time() - - elif hold >= self.cfg.max_hold_sec: - self.close_position_all() - self.ding(f"⏱超时:hold={int(hold)}s pnl={pnl * 100:.3f}% price={price:.2f}") - self.entry_price, self.entry_ts = None, None - self.last_exit_ts = time.time() - - def notify_status_throttled(self, price: float, ema_value: float, dev: float, bal: float, - atr_ratio: float, entry_dev: float, tp: float, sl: float): - now = time.time() - if (now - self._last_status_notify_ts) < self.cfg.status_notify_sec: - return - self._last_status_notify_ts = now - - direction_str = "多" if self.pos == 1 else ("空" if self.pos == -1 else "无") - msg = ( - f"【BitMart {self.cfg.contract_symbol}|均价回归微利(动态阈值)】\n" - f"方向:{direction_str}\n" - f"现价:{price:.2f}\n" - f"EMA{self.cfg.ema_len}:{ema_value:.2f}\n" - f"dev:{dev * 100:.3f}%(阈值{entry_dev * 100:.3f}%)\n" - f"ATR比:{atr_ratio * 100:.3f}%\n" - f"tp/sl:{tp * 100:.3f}% / {sl * 100:.3f}%\n" - f"可用余额:{bal:.2f} USDT 杠杆:{self.cfg.leverage}x\n" - f"超时:{self.cfg.max_hold_sec}s 冷却:{self.cfg.cooldown_sec_after_exit}s" - ) - self.ding(msg) - - def action(self): - if not self.set_leverage(): - self.ding("杠杆设置失败,停止运行", error=True) - return - - while True: - now_dt = datetime.datetime.now() - self.pbar.n = now_dt.second - self.pbar.refresh() - - klines = self.get_klines_cached() - if not klines or len(klines) < (self.cfg.ema_len + 5): - time.sleep(1) - continue - - last_k = klines[-1] - closes = [k["close"] for k in klines[-(self.cfg.ema_len + 1):]] - ema_value = self.ema(closes, self.cfg.ema_len) - - # 用更实时的价格触发(取不到就回退K线close) - price = self.get_last_price(fallback_close=float(last_k["close"])) - dev = (price - ema_value) / ema_value if ema_value else 0.0 - - # 动态阈值:跟随 ATR/Price - a = self.atr(klines, self.cfg.atr_len) - atr_ratio = (a / price) if price > 0 else 0.0 - entry_dev, tp, sl = self.dynamic_thresholds(atr_ratio) - - # 日内风控 - self.risk_kill_switch() - - # 刷新仓位 - if not self.get_position_status(): - time.sleep(1) - continue - - # 停机:平仓+不再开仓 - if not self.trading_enabled: - if self.pos != 0: - self.close_position_all() - time.sleep(5) - continue - - # 危险市场:不新开仓(允许已有仓按tp/sl/超时退出) - if self.is_danger_market(klines, price): - logger.warning("危险模式:高波动/大实体K,暂停开仓") - self.maybe_exit(price, tp, sl) - time.sleep(self.cfg.tick_refresh_sec) - continue - - # 先出场再入场 - self.maybe_exit(price, tp, sl) - self.maybe_enter(price, ema_value, entry_dev) - - # 状态通知(限频) - bal = self.get_assets_available() - self.notify_status_throttled(price, ema_value, dev, bal, atr_ratio, entry_dev, tp, sl) - - time.sleep(self.cfg.tick_refresh_sec) - - -if __name__ == "__main__": - """ - Windows 设置环境变量示例(PowerShell): - setx BITMART_API_KEY "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" - setx BITMART_SECRET_KEY "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" - setx BITMART_MEMO "合约交易" - 重新打开终端再运行。 - """ - cfg = StrategyConfig() - bot = BitmartFuturesMeanReversionBot(cfg) - bot.action() - -# 9274.08 -# 9260.59 +from pathlib import Path +print(Path(__file__).parent) \ No newline at end of file diff --git a/weex/调试信号检测.py b/weex/调试信号检测.py new file mode 100644 index 0000000..6e5dba5 --- /dev/null +++ b/weex/调试信号检测.py @@ -0,0 +1,90 @@ +""" +调试信号检测逻辑,查看为什么没有检测到交易信号 +""" +import datetime +from models.weex import Weex30 + +def get_data_by_date(model, date_str: str): + """按天获取指定表的数据""" + try: + target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d') + except ValueError: + print(f"日期格式不正确: {date_str}") + return [] + + start_ts = int(target_date.timestamp() * 1000) + end_ts = int((target_date + datetime.timedelta(days=1)).timestamp() * 1000) - 1 + + query = model.select().where(model.id.between(start_ts, end_ts)).order_by(model.id.asc()) + return [{'id': i.id, 'open': i.open, 'high': i.high, 'low': i.low, 'close': i.close} for i in query] + +def is_bullish(c): + return float(c['close']) > float(c['open']) + +def is_bearish(c): + return float(c['close']) < float(c['open']) + +def check_signal(prev, curr): + """包住形态信号判定""" + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + # 前跌后涨包住 -> 做多 + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" + + # 前涨后跌包住 -> 做空 + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" + + return None, None + +# 获取数据 +dates = [] +for i in range(2, 32): + dates.append(f"2025-12-{i:02d}") +for i in range(1, 5): + dates.append(f"2026-01-{i:02d}") + +all_data = [] +for d in dates: + day_data = get_data_by_date(Weex30, d) + all_data.extend(day_data) + if day_data: + print(f"{d}: 读取到 {len(day_data)} 条数据") + +print(f"\n总共读取到 {len(all_data)} 条数据") + +if len(all_data) < 2: + print("数据不足,无法检测信号(至少需要2条K线)") + exit() + +# 排序 +all_data.sort(key=lambda x: x['id']) + +# 检查信号 +signal_count = 0 +for idx in range(1, len(all_data)): + prev = all_data[idx - 1] + curr = all_data[idx] + direction, signal_key = check_signal(prev, curr) + + if direction: + signal_count += 1 + prev_time = datetime.datetime.fromtimestamp(prev['id'] / 1000) + curr_time = datetime.datetime.fromtimestamp(curr['id'] / 1000) + print(f"\n信号 #{signal_count}: {signal_key} ({direction})") + print(f" 前一根K线 ({prev_time}): O={prev['open']:.2f} H={prev['high']:.2f} L={prev['low']:.2f} C={prev['close']:.2f} {'阳' if is_bullish(prev) else '阴'}") + print(f" 当前K线 ({curr_time}): O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f} C={curr['close']:.2f} {'阳' if is_bullish(curr) else '阴'}") + print(f" 包住条件检查:") + print(f" - 前跌后涨: {is_bearish(prev)} and {is_bullish(curr)}") + print(f" - 开盘<=前收盘: {float(curr['open'])} <= {float(prev['close'])} = {float(curr['open']) <= float(prev['close'])}") + print(f" - 收盘>=前开盘: {float(curr['close'])} >= {float(prev['open'])} = {float(curr['close']) >= float(prev['open'])}") + +if signal_count == 0: + print("\n未检测到任何信号!") + print("\n检查前10根K线的情况:") + for idx in range(min(10, len(all_data))): + k = all_data[idx] + k_time = datetime.datetime.fromtimestamp(k['id'] / 1000) + print(f" {idx}: {k_time} O={k['open']:.2f} C={k['close']:.2f} {'阳' if is_bullish(k) else '阴'}") diff --git a/weex/长期持有信号/database.db b/weex/长期持有信号/database.db new file mode 100644 index 0000000000000000000000000000000000000000..df6f204303461ed0ce0221e2957658250fc9d86a GIT binary patch literal 8192 zcmeI#y$ZrG6b0Z!1aT1E9J*XGDToNZfK`GNW7S#*C#f23p<2;`u0DY8>u8EUtGk{{ z!p#XG!?)e(L`A8lO>*1?DYaRJan3qKj4^I@&g^$(IsXo(XZc%k*1X+I<`ple%q$cH zAOHafKmY;|fB*y_009U<;BN%3MP8^{7QaSmu#ELt>&v3udbM27leRC(x4TM`c-;!B ziD)gzaedjB9=Ri*+?i4|_MD;Z&1oR#4HC&g?**9;lnG(94L>1H&Y$xtPEPun>e*~= dhJF+TAOHafKmY;|fB*y_009U<00KWO@Bkv