diff --git a/weex/三分之一策略-5分钟交易.py b/weex/三分之一策略-5分钟交易.py new file mode 100644 index 0000000..b71baab --- /dev/null +++ b/weex/三分之一策略-5分钟交易.py @@ -0,0 +1,289 @@ +""" +WEEX 三分之一策略 — 5分钟K线交易 +基于 weex/框架.py,策略逻辑与 bitmart/三分之一策略-5分钟交易.py 一致。 + +策略规则: +1. 触发价(前一根有效K线实体>=0.1):做多=收盘+实体/3,做空=收盘-实体/3 +2. 信号:当前5分钟K线最高>=做多触发→多;最低<=做空触发→空;同根都触发用开盘价距离判断先后 +3. 反手一:持空且当前涨到上根最高且上根上影线>0.01%→反手多;持多且当前跌到上根最低且上根下影线>0.01%→反手空 +4. 反手二:持多且上根上影线>0.01%且当前跌到上根开盘→反手空;持空且上根下影线>0.01%且当前涨到上根开盘→反手多 +5. 同一根5分钟K线内只交易一次 +""" +import random +import sys +import time +import datetime +from pathlib import Path +from typing import Optional, Dict, List, Tuple + +from tqdm import tqdm +from loguru import logger + +# 保证可导入同目录下的 框架 +_weex_dir = Path(__file__).resolve().parent +if str(_weex_dir) not in sys.path: + sys.path.insert(0, str(_weex_dir)) +from 框架 import Config, WeexFuturesTransaction + +# 5 分钟 K 线类型(若 WEEX 接口名不同可改为实际值) +KLINE_TYPE_5M = "MINUTE_5" + + +class WeexOneThirdStrategy(WeexFuturesTransaction): + """WEEX 三分之一策略(5分钟K线),继承 weex/框架.py""" + + def __init__(self, tge_id): + super().__init__(tge_id) + self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80) + self.min_body_size = 0.1 + self.min_shadow_pct = 0.01 + self.check_interval = 3 + self.position_ratio = 100 # 开仓金额 = 余额 / position_ratio + self.last_trigger_kline_id: Optional[int] = None + self.last_trigger_direction: Optional[str] = None + self.last_trade_kline_id: Optional[int] = None + + # ------------------------- 5分钟K线 ------------------------- + def get_klines_5m(self) -> Optional[List[Dict]]: + """获取 5 分钟 K 线""" + return self.get_klines(kline_type=KLINE_TYPE_5M, limit=100) + + # ------------------------- 三分之一策略核心 ------------------------- + def get_body_size(self, candle: Dict) -> float: + return abs(float(candle["open"]) - float(candle["close"])) + + def find_valid_prev_bar(self, all_data: List[Dict], current_idx: int) -> Tuple[Optional[int], Optional[Dict]]: + if current_idx <= 0: + return None, None + for i in range(current_idx - 1, -1, -1): + prev = all_data[i] + if self.get_body_size(prev) >= self.min_body_size: + return i, prev + return None, None + + def get_one_third_levels(self, prev: Dict) -> Tuple[Optional[float], Optional[float]]: + p_open, p_close = float(prev["open"]), float(prev["close"]) + body = abs(p_open - p_close) + if body < 0.001: + return None, None + return p_close + body / 3, p_close - body / 3 + + def get_upper_shadow(self, candle: Dict) -> float: + o, c, h = float(candle["open"]), float(candle["close"]), float(candle["high"]) + return h - max(o, c) + + def get_lower_shadow(self, candle: Dict) -> float: + o, c, l = float(candle["open"]), float(candle["close"]), float(candle["low"]) + return min(o, c) - l + + def upper_shadow_pct(self, candle: Dict) -> float: + o = float(candle["open"]) + return (self.get_upper_shadow(candle) / o * 100) if o > 0 else 0.0 + + def lower_shadow_pct(self, candle: Dict) -> float: + o = float(candle["open"]) + return (self.get_lower_shadow(candle) / o * 100) if o > 0 else 0.0 + + def check_reverse_by_prev_high_low(self, kline_data: List[Dict]) -> Tuple[Optional[str], Optional[Dict]]: + if len(kline_data) < 2: + return None, None + curr, prev = kline_data[-1], kline_data[-2] + curr_high = float(curr["high"]) + curr_low = float(curr["low"]) + prev_high = float(prev["high"]) + prev_low = float(prev["low"]) + if self.start == -1 and curr_high >= prev_high and self.upper_shadow_pct(prev) > self.min_shadow_pct: + return "long", prev + if self.start == 1 and curr_low <= prev_low and self.lower_shadow_pct(prev) > self.min_shadow_pct: + return "short", prev + return None, None + + def check_reverse_by_prev_open(self, kline_data: List[Dict]) -> Tuple[Optional[str], Optional[Dict]]: + if len(kline_data) < 2: + return None, None + curr, prev = kline_data[-1], kline_data[-2] + curr_high = float(curr["high"]) + curr_low = float(curr["low"]) + prev_open = float(prev["open"]) + if self.start == 1 and self.upper_shadow_pct(prev) > self.min_shadow_pct and curr_low <= prev_open: + return "short", prev + if self.start == -1 and self.lower_shadow_pct(prev) > self.min_shadow_pct and curr_high >= prev_open: + return "long", prev + return None, None + + def check_realtime_trigger( + self, kline_data: List[Dict] + ) -> Tuple[Optional[str], Optional[float], Optional[Dict], Optional[Dict]]: + if len(kline_data) < 2: + return None, None, None, None + curr = kline_data[-1] + curr_kline_id = curr["id"] + curr_high = float(curr["high"]) + curr_low = float(curr["low"]) + curr_open = float(curr["open"]) + + valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1) + if prev is None: + return None, None, None, None + + long_trigger, short_trigger = self.get_one_third_levels(prev) + if long_trigger is None: + return None, None, None, None + + long_triggered = curr_high >= long_trigger + short_triggered = curr_low <= short_trigger + both_triggered = long_triggered and short_triggered + + direction = None + trigger_price = None + if both_triggered: + dist_to_long = abs(long_trigger - curr_open) + dist_to_short = abs(short_trigger - curr_open) + if dist_to_short <= dist_to_long: + direction, trigger_price = "short", short_trigger + else: + direction, trigger_price = "long", long_trigger + elif short_triggered: + direction, trigger_price = "short", short_trigger + elif long_triggered: + direction, trigger_price = "long", long_trigger + + if direction is None: + return None, None, None, None + if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction: + return None, None, None, None + return direction, trigger_price, prev, curr + + # ------------------------- 主循环 ------------------------- + def action(self) -> None: + + while True: + + for i in range(3): + if self.openBrowser(): + break + else: + self.ding("打开 TGE 失败!", error=True) + continue + + logger.info("TGE 浏览器已打开") + self.close_extra_tabs() + self.page.get(url=Config.TRADING_URL) + time.sleep(2) + if not self._get_token(): + self.ding("获取 token 失败", error=True) + return + self.click_safe('x:(//button[normalize-space(text()) = "市价"])') + logger.info("WEEX 三分之一策略(5分钟K线)开始监测") + + try: + kline_data = self.get_klines_5m() + if not kline_data or len(kline_data) < 3: + logger.warning("K线数据不足,等待重试...") + time.sleep(self.check_interval) + continue + + curr = kline_data[-1] + curr_kline_id = curr["id"] + curr_time_str = datetime.datetime.fromtimestamp(curr["id"] / 1000).strftime("%H:%M:%S") + + if not self.get_position_status(): + logger.warning("获取仓位失败,使用缓存") + + # 反手一:涨到上根最高/跌到上根最低 + 影线>0.01% + rev_dir, rev_prev = self.check_reverse_by_prev_high_low(kline_data) + rev_type = "一" + if not rev_dir: + rev_dir, rev_prev = self.check_reverse_by_prev_open(kline_data) + rev_type = "二" + + if rev_dir and self.last_trade_kline_id != curr_kline_id: + balance = self.get_available_balance() + amount = int((balance or 0) / self.position_ratio) + if amount > 0: + self.ding(f"反手信号{rev_type} {rev_dir} 金额={amount}") + if rev_dir == "long": + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=amount) + else: + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=amount) + self.last_trade_kline_id = curr_kline_id + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = rev_dir + time.sleep(self.check_interval) + continue + + # 主信号:三分之一触发 + direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data) + + if direction: + if self.last_trade_kline_id == curr_kline_id: + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1): + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + time.sleep(self.check_interval) + continue + + balance = self.get_available_balance() + amount = int((balance or 0) / self.position_ratio) + if amount <= 0: + time.sleep(self.check_interval) + continue + + executed = False + if direction == "long": + if self.start == -1: + self.ding("平空反手开多") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=1, size=amount) + executed = True + elif self.start == 0: + self.ding("无仓位开多") + self.开单(marketPriceLongOrder=1, size=amount) + executed = True + elif direction == "short": + if self.start == 1: + self.ding("平多反手开空") + self.平仓() + time.sleep(1) + self.开单(marketPriceLongOrder=-1, size=amount) + executed = True + elif self.start == 0: + self.ding("无仓位开空") + self.开单(marketPriceLongOrder=-1, size=amount) + executed = True + + if executed: + self.last_trade_kline_id = curr_kline_id + self.get_position_status() + self.ding(f"三分之一信号 {direction} 触发价={trigger_price:.2f} 金额={amount}") + self.last_trigger_kline_id = curr_kline_id + self.last_trigger_direction = direction + else: + logger.debug(f"[{curr_time_str}] O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}") + + time.sleep(self.check_interval) + if random.randint(1, 11) > 7: + self.page.close() + time.sleep(5) + + except KeyboardInterrupt: + logger.info("用户中断") + break + except Exception as e: + logger.error(f"主循环异常: {e}") + self.ding(f"运行异常: {e}", error=True) + time.sleep(self.check_interval) + + +if __name__ == "__main__": + WeexOneThirdStrategy(tge_id="86837a981aba4576be6916a0ef6ad785").action()