From fdcec745850e0cdbf5dce05aead600391c27df9f Mon Sep 17 00:00:00 2001 From: ddrwode <34234@3来 34> Date: Tue, 3 Feb 2026 17:07:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=20weex?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 1111 | 6 +- 交易/weex-三分之一策略-5分钟交易.py | 627 ++++++++++++++++++++++++++++ 2 files changed, 631 insertions(+), 2 deletions(-) create mode 100644 交易/weex-三分之一策略-5分钟交易.py diff --git a/1111 b/1111 index a1c51e4..2bd3767 100644 --- a/1111 +++ b/1111 @@ -36,8 +36,10 @@ Abc12345678 持多反手做空:价格跌到 开仓价 - 前一根实体 / 5 - - +反手信号, +当前这根线线跌倒了上一根的三分之一,我开了空,但是这根涨到了上一根 k 线的最高价格,上一根 k 线的上阴线涨幅大于 0.01%,反手做多 + +第二个反手信号,当前这根做了多,上一根 k 线的上阴线涨幅大于 0.01%,当前这个根跌倒了上一根的开盘价格,反手做空 diff --git a/交易/weex-三分之一策略-5分钟交易.py b/交易/weex-三分之一策略-5分钟交易.py new file mode 100644 index 0000000..af5598f --- /dev/null +++ b/交易/weex-三分之一策略-5分钟交易.py @@ -0,0 +1,627 @@ +""" +WEEX ETH-USDT 永续合约 — 三分之一策略(5分钟K线) + +策略规则(与 bitmart/三分之一策略-5分钟交易.py 一致): +1. 触发价格(前一根有效K线实体>=0.1): + - 做多触发价 = 收盘价 + 实体/3,做空触发价 = 收盘价 - 实体/3 +2. 信号:当前5分钟K线最高>=做多触发→多;最低<=做空触发→空;同根多空都触发用开盘价距离判断先后 +3. 反手一:持空且当前涨到上根最高且上根上影线>0.01%→反手多;持多且当前跌到上根最低且上根下影线>0.01%→反手空 +4. 反手二:持多且上根上影线>0.01%且当前跌到上根开盘→反手空;持空且上根下影线>0.01%且当前涨到上根开盘→反手多 +5. 同一根5分钟K线内只交易一次 +""" +import time +import datetime +from typing import Optional, Dict, List, Tuple +from tqdm import tqdm +from loguru import logger +from DrissionPage import ChromiumOptions, ChromiumPage +from curl_cffi import requests +from bit_tools import openBrowser +from 交易.tools import send_dingtalk_message + + +# ==================== 配置常量 ==================== +class Config: + TGE_URL = "http://127.0.0.1:50326" + TGE_AUTHORIZATION = "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91" + + CONTRACT_ID = "10000002" + PRODUCT_CODE = "cmt_ethusdt" + KLINE_TYPE = "MINUTE_5" # 5分钟K线(三分之一策略) + KLINE_LIMIT = 100 + + TRADING_URL = "https://www.weex.com/zh-CN/futures/ETH-USDT" + POSITION_RATIO = 100 + + MAX_RETRY_ATTEMPTS = 3 + RETRY_DELAY = 1 + + # 三分之一策略参数 + MIN_BODY_SIZE = 0.1 + MIN_SHADOW_PCT = 0.01 + + +# ==================== 三分之一策略分析器 ==================== +class OneThirdStrategyAnalyzer: + """三分之一策略 K 线分析(5分钟):触发价=收盘价±实体/3 + 两个反手信号""" + + @staticmethod + def get_body_size(candle: Dict) -> float: + return abs(float(candle['open']) - float(candle['close'])) + + @staticmethod + def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1) -> Tuple[Optional[int], Optional[Dict]]: + if current_idx <= 0: + return None, None + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + if OneThirdStrategyAnalyzer.get_body_size(prev) >= min_body_size: + return i, prev + return None, None + + @staticmethod + def get_one_third_levels(prev: Dict) -> Tuple[Optional[float], Optional[float]]: + p_open, p_close = float(prev['open']), float(prev['close']) + body = abs(p_open - p_close) + if body < 0.001: + return None, None + return p_close + body / 3, p_close - body / 3 + + @staticmethod + def get_upper_shadow(candle: Dict) -> float: + o, c, h = float(candle['open']), float(candle['close']), float(candle['high']) + return h - max(o, c) + + @staticmethod + def get_lower_shadow(candle: Dict) -> float: + o, c, l = float(candle['open']), float(candle['close']), float(candle['low']) + return min(o, c) - l + + @staticmethod + def upper_shadow_pct(candle: Dict) -> float: + o = float(candle['open']) + return (OneThirdStrategyAnalyzer.get_upper_shadow(candle) / o * 100) if o > 0 else 0.0 + + @staticmethod + def lower_shadow_pct(candle: Dict) -> float: + o = float(candle['open']) + return (OneThirdStrategyAnalyzer.get_lower_shadow(candle) / o * 100) if o > 0 else 0.0 + + @staticmethod + def check_reverse_by_prev_high_low(kline_data: List[Dict], start: int) -> Tuple[Optional[str], Optional[Dict]]: + if len(kline_data) < 2: + return None, None + curr, prev = kline_data[-1], kline_data[-2] + curr_high = float(curr['high']) + curr_low = float(curr['low']) + prev_high = float(prev['high']) + prev_low = float(prev['low']) + if start == -1 and curr_high >= prev_high and OneThirdStrategyAnalyzer.upper_shadow_pct(prev) > Config.MIN_SHADOW_PCT: + return 'long', prev + if start == 1 and curr_low <= prev_low and OneThirdStrategyAnalyzer.lower_shadow_pct(prev) > Config.MIN_SHADOW_PCT: + return 'short', prev + return None, None + + @staticmethod + def check_reverse_by_prev_open(kline_data: List[Dict], start: int) -> Tuple[Optional[str], Optional[Dict]]: + if len(kline_data) < 2: + return None, None + curr, prev = kline_data[-1], kline_data[-2] + curr_high = float(curr['high']) + curr_low = float(curr['low']) + prev_open = float(prev['open']) + if start == 1 and OneThirdStrategyAnalyzer.upper_shadow_pct(prev) > Config.MIN_SHADOW_PCT and curr_low <= prev_open: + return 'short', prev + if start == -1 and OneThirdStrategyAnalyzer.lower_shadow_pct(prev) > Config.MIN_SHADOW_PCT and curr_high >= prev_open: + return 'long', prev + return None, None + + @staticmethod + def check_realtime_trigger( + kline_data: List[Dict], + last_trigger_kline_id: Optional[int], + last_trigger_direction: Optional[str], + ) -> Tuple[Optional[str], Optional[float], Optional[Dict], Optional[Dict]]: + if len(kline_data) < 2: + return None, None, None, None + curr = kline_data[-1] + curr_kline_id = curr['id'] + curr_high = float(curr['high']) + curr_low = float(curr['low']) + curr_open = float(curr['open']) + + valid_prev_idx, prev = OneThirdStrategyAnalyzer.find_valid_prev_bar( + kline_data, len(kline_data) - 1, Config.MIN_BODY_SIZE + ) + if prev is None: + return None, None, None, None + + long_trigger, short_trigger = OneThirdStrategyAnalyzer.get_one_third_levels(prev) + if long_trigger is None: + return None, None, None, None + + long_triggered = curr_high >= long_trigger + short_triggered = curr_low <= short_trigger + both_triggered = long_triggered and short_triggered + + direction = None + trigger_price = None + if both_triggered: + dist_to_long = abs(long_trigger - curr_open) + dist_to_short = abs(short_trigger - curr_open) + if dist_to_short <= dist_to_long: + direction, trigger_price = 'short', short_trigger + else: + direction, trigger_price = 'long', long_trigger + elif short_triggered: + direction, trigger_price = 'short', short_trigger + elif long_triggered: + direction, trigger_price = 'long', long_trigger + + if direction is None: + return None, None, None, None + if last_trigger_kline_id == curr_kline_id and last_trigger_direction == direction: + return None, None, None, None + return direction, trigger_price, prev, curr + + +# ==================== 浏览器管理器 ==================== +class BrowserManager: + def __init__(self, tge_id, tge_url: str, tge_headers: Dict): + self.tge_id = tge_id + self.tge_url = tge_url + self.tge_headers = tge_headers + self.tge_port: Optional[int] = None + self.page: Optional[ChromiumPage] = None + + def openBrowser(self): + try: + bit_port = openBrowser(id=self.tge_id) + co = ChromiumOptions() + co.set_local_port(port=bit_port) + self.page = ChromiumPage(addr_or_opts=co) + self.tge_port = bit_port + return True + except Exception: + return False + + def take_over_browser(self) -> bool: + if not self.tge_port: + return False + try: + co = ChromiumOptions() + co.set_local_port(self.tge_port) + self.page = ChromiumPage(addr_or_opts=co) + self.page.set.window.max() + return True + except Exception: + return False + + def close_extra_tabs(self) -> bool: + if not self.page: + return False + try: + for idx, tab in enumerate(self.page.get_tabs()): + if idx > 0: + tab.close() + return True + except Exception: + return False + + +# ==================== WEEX API 客户端 ==================== +class WEEXApiClient: + def __init__(self): + self.session = requests.Session() + self.headers: Optional[Dict] = None + + def update_headers(self, headers: Dict) -> None: + if not self.headers: + self.session.headers = headers + else: + self.session.headers.update(headers) + self.headers = headers + + def get_kline_data( + self, + contract_id: str = Config.CONTRACT_ID, + product_code: str = Config.PRODUCT_CODE, + kline_type: str = Config.KLINE_TYPE, + limit: int = Config.KLINE_LIMIT, + ) -> List[Dict]: + params = { + 'contractId': contract_id, + 'productCode': product_code, + 'priceType': 'LAST_PRICE', + 'klineType': kline_type, + 'limit': str(limit), + 'timeZone': 'string', + 'languageType': '1', + 'sign': 'SIGN', + } + for attempt in range(Config.MAX_RETRY_ATTEMPTS): + try: + response = self.session.get( + 'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2', + params=params, + timeout=15, + ) + if response.status_code != 200: + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + continue + response_data = response.json() + if "data" not in response_data or "dataList" not in response_data["data"]: + continue + result = response_data["data"]["dataList"] + kline_data = [] + for item in result: + kline_data.append({ + 'id': int(item[4]), + 'open': float(item[3]), + 'high': float(item[1]), + 'low': float(item[2]), + 'close': float(item[0]), + }) + return kline_data + except Exception as e: + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + return [] + + def get_available_balance(self) -> Optional[float]: + for attempt in range(Config.MAX_RETRY_ATTEMPTS): + try: + response = self.session.post( + 'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new', + timeout=15, + ) + return float(response.json()["data"]["newContract"]["balanceList"][0]["available"]) + except Exception: + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + return None + + def get_position_status(self) -> Tuple[bool, Optional[List]]: + json_data = { + 'filterContractIdList': [10000002], + 'limit': 100, + 'languageType': 0, + 'sign': 'SIGN', + 'timeZone': 'string', + } + for attempt in range(Config.MAX_RETRY_ATTEMPTS): + try: + response = self.session.post( + 'https://http-gateway2.janapw.com/api/v1/private/order/v2/getHistoryOrderFillTransactionPage', + json=json_data, + timeout=15, + ) + datas = response.json()["data"]["dataList"] + if not datas: + return True, None + return True, datas + except Exception: + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + return False, None + + +# ==================== Token 管理器 ==================== +class TokenManager: + def __init__(self, api_client: WEEXApiClient, page: ChromiumPage): + self.api_client = api_client + self.page = page + + def get_token(self) -> bool: + tab = self.page.new_tab() + tab.listen.start("/user/security/getLanguageType") + try: + for attempt in range(Config.MAX_RETRY_ATTEMPTS): + try: + tab.get(url=Config.TRADING_URL) + res = tab.listen.wait(timeout=5) + if res.request.headers.get("U-TOKEN"): + self.api_client.update_headers(res.request.headers) + return True + except Exception: + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + return False + finally: + tab.close() + + +# ==================== 交易执行器 ==================== +class TradingExecutor: + def __init__(self, page: ChromiumPage, api_client: WEEXApiClient): + self.page = page + self.api_client = api_client + + def navigate_to_trading_page(self) -> bool: + try: + self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click() + time.sleep(1) + return True + except Exception: + return False + + def close_all_positions(self) -> bool: + try: + self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').scroll.to_see(center=True) + time.sleep(1) + self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').click(by_js=True) + time.sleep(3) + return True + except Exception: + return False + + def open_long(self, amount: float) -> bool: + try: + self.page.ele('x://input[@placeholder="请输入数量"]').input(amount) + time.sleep(1) + self.page.ele('x://*[normalize-space(text()) ="买入开多"]').click(by_js=True) + return True + except Exception: + return False + + def open_short(self, amount: float) -> bool: + try: + self.page.ele('x://input[@placeholder="请输入数量"]').input(amount) + time.sleep(1) + self.page.ele('x://*[normalize-space(text()) ="卖出开空"]').click(by_js=True) + return True + except Exception: + return False + + def execute_trade(self, direction: str, current_position: int, amount: float) -> bool: + if (direction == "long" and current_position == PositionManager.POSITION_LONG) or \ + (direction == "short" and current_position == PositionManager.POSITION_SHORT): + return True + if not self.navigate_to_trading_page(): + return False + try: + if direction == "long": + if current_position == 0: + return self.open_long(amount) + elif current_position == -1: + if self.close_all_positions(): + time.sleep(1) + return self.open_long(amount) + elif direction == "short": + if current_position == 0: + return self.open_short(amount) + elif current_position == 1: + if self.close_all_positions(): + time.sleep(1) + return self.open_short(amount) + except Exception: + pass + return False + + +# ==================== 持仓管理器 ==================== +class PositionManager: + POSITION_SHORT = -1 + POSITION_NONE = 0 + POSITION_LONG = 1 + + def __init__(self, trading_executor: TradingExecutor): + self.trading_executor = trading_executor + self.current_position: int = self.POSITION_NONE + self.position_data: Optional[List] = None + + def update_position(self, position_data: Optional[List]) -> None: + self.position_data = position_data + if not position_data: + self.current_position = self.POSITION_NONE + return + position_data = list(position_data) + position_data.reverse() + start, start1 = 0, 0 + for i in position_data: + direction = i.get("legacyOrderDirection") + if direction == "CLOSE_SHORT": + start = 0 + elif direction == "CLOSE_LONG": + start1 = 0 + elif direction == "OPEN_SHORT": + start -= 1 + elif direction == "OPEN_LONG": + start1 += 1 + if start1: + self.current_position = self.POSITION_LONG + elif start: + self.current_position = self.POSITION_SHORT + else: + self.current_position = self.POSITION_NONE + + +# ==================== 消息发送 ==================== +class MessageSender: + @staticmethod + def send(msg: str, is_error: bool = False) -> None: + prefix = "❌weex三分之一:" if is_error else "🔔weex三分之一:" + for _ in range(15 if is_error else 1): + send_dingtalk_message(f"{prefix}{msg}") + + +# ==================== 5分钟K线时间工具 ==================== +class TimeUtils5m: + @staticmethod + def get_current_kline_timestamp() -> int: + """当前所在 5 分钟 K 线的时间戳(毫秒)""" + t = time.time() + dt = datetime.datetime.fromtimestamp(t) + minute = (dt.minute // 5) * 5 + target = dt.replace(minute=minute, second=0, microsecond=0) + return int(target.timestamp()) * 1000 + + @staticmethod + def get_progress_bar_value() -> int: + """0~4 表示当前 5 分钟内的分钟数""" + return datetime.datetime.now().minute % 5 + + +# ==================== 主交易类 ==================== +class WeexOneThirdTransaction: + """WEEX 三分之一策略(5分钟K线)自动交易""" + + def __init__(self, tge_id): + self.tge_id = tge_id + self.tge_headers = { + "Authorization": Config.TGE_AUTHORIZATION, + "Content-Type": "application/json", + } + self.browser_manager = BrowserManager(tge_id, Config.TGE_URL, self.tge_headers) + self.api_client = WEEXApiClient() + self.position_manager: Optional[PositionManager] = None + self.trading_executor: Optional[TradingExecutor] = None + self.token_manager: Optional[TokenManager] = None + + self.pbar: Optional[tqdm] = None + self.last_kline_timestamp: Optional[int] = None + self.kline_data: List[Dict] = [] + self.last_trigger_kline_id: Optional[int] = None + self.last_trigger_direction: Optional[str] = None + self.last_trade_kline_id: Optional[int] = None + + def initialize(self) -> bool: + if not self.browser_manager.openBrowser(): + MessageSender.send("打开浏览器失败", is_error=True) + return False + if not self.browser_manager.take_over_browser(): + MessageSender.send("接管浏览器失败", is_error=True) + return False + self.browser_manager.close_extra_tabs() + page = self.browser_manager.page + self.trading_executor = TradingExecutor(page, self.api_client) + self.position_manager = PositionManager(self.trading_executor) + self.token_manager = TokenManager(self.api_client, page) + page.get(url=Config.TRADING_URL) + if not self.token_manager.get_token(): + logger.warning("初始化获取 token 失败,将在获取K线时重试") + self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80) + return True + + def fetch_and_update_kline(self) -> bool: + if not self.token_manager.get_token(): + return False + kline_data = self.api_client.get_kline_data() + if not kline_data or len(kline_data) < 3: + return False + sorted_data = sorted(kline_data, key=lambda x: x["id"]) + self.kline_data = sorted_data + current_kline_id = sorted_data[-1]["id"] + current_ts = TimeUtils5m.get_current_kline_timestamp() + if current_kline_id != current_ts: + return False + if self.last_kline_timestamp == current_ts: + return False + self.last_kline_timestamp = current_ts + return True + + def sync_position_status(self) -> bool: + success, position_data = self.api_client.get_position_status() + if not success: + return False + self.position_manager.update_position(position_data) + return True + + def process_trading_logic(self) -> None: + self.token_manager.get_token() + self.browser_manager.page.get(url=Config.TRADING_URL) + if not self.sync_position_status(): + return + + kline_data = self.kline_data + curr = kline_data[-1] + curr_kline_id = curr["id"] + start = self.position_manager.current_position + + # 反手一:涨到上根最高/跌到上根最低 + 影线>0.01% + rev_dir, rev_prev = OneThirdStrategyAnalyzer.check_reverse_by_prev_high_low(kline_data, start) + rev_type = "一" + if not rev_dir: + rev_dir, rev_prev = OneThirdStrategyAnalyzer.check_reverse_by_prev_open(kline_data, start) + rev_type = "二" + + if rev_dir and self.last_trade_kline_id != curr_kline_id: + balance = self.api_client.get_available_balance() + amount = int((balance or 0) / Config.POSITION_RATIO) + if self.trading_executor.execute_trade(rev_dir, start, amount): + self.last_trade_kline_id = curr_kline_id + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = rev_dir + if rev_dir == "long": + self.position_manager.current_position = PositionManager.POSITION_LONG + else: + self.position_manager.current_position = PositionManager.POSITION_SHORT + MessageSender.send(f"反手信号{rev_type} {rev_dir},金额 {amount}") + return + + # 主信号:三分之一触发 + direction, trigger_price, valid_prev, curr_kline = OneThirdStrategyAnalyzer.check_realtime_trigger( + kline_data, self.last_trigger_kline_id, self.last_trigger_direction + ) + if not direction: + return + + if self.last_trade_kline_id == curr_kline_id: + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + return + + if (direction == "long" and start == PositionManager.POSITION_LONG) or \ + (direction == "short" and start == PositionManager.POSITION_SHORT): + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + return + + balance = self.api_client.get_available_balance() + if balance is None: + return + amount = int(balance / Config.POSITION_RATIO) + if self.trading_executor.execute_trade(direction, start, amount): + self.last_trade_kline_id = curr_kline_id + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + if direction == "long": + self.position_manager.current_position = PositionManager.POSITION_LONG + else: + self.position_manager.current_position = PositionManager.POSITION_SHORT + MessageSender.send(f"三分之一信号 {direction} 触发价={trigger_price:.2f} 金额={amount}") + + def run(self) -> None: + if not self.initialize(): + return + logger.info("WEEX 三分之一策略(5分钟K线)开始运行") + while True: + try: + if self.pbar: + self.pbar.n = TimeUtils5m.get_progress_bar_value() + self.pbar.refresh() + current_ts = TimeUtils5m.get_current_kline_timestamp() + if self.last_kline_timestamp == current_ts: + time.sleep(10) + continue + if not self.fetch_and_update_kline(): + time.sleep(10) + continue + self.process_trading_logic() + if self.pbar: + self.pbar.reset() + time.sleep(5) + except KeyboardInterrupt: + break + except Exception as e: + logger.error(e) + MessageSender.send(f"运行出错: {e}", is_error=True) + time.sleep(10) + + def action(self) -> None: + self.run() + + +if __name__ == '__main__': + WeexOneThirdTransaction(tge_id="86837a981aba4576be6916a0ef6ad785").action()