Files
lm_code/weex/三分之一策略-5分钟交易.py
Administrator fd63961e3c haha
2026-02-03 18:26:58 +08:00

294 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WEEX 三分之一策略 — 5分钟K线交易
基于 weex/框架.py策略逻辑与 bitmart/三分之一策略-5分钟交易.py 一致。
策略规则:
1. 触发价前一根有效K线实体>=0.1):做多=收盘+实体/3做空=收盘-实体/3
2. 信号当前5分钟K线最高>=做多触发→多;最低<=做空触发→空;同根都触发用开盘价距离判断先后
3. 反手一:持空且当前涨到上根最高且上根上影线>0.01%→反手多;持多且当前跌到上根最低且上根下影线>0.01%→反手空
4. 反手二:持多且上根上影线>0.01%且当前跌到上根开盘→反手空;持空且上根下影线>0.01%且当前涨到上根开盘→反手多
5. 同一根5分钟K线内只交易一次
"""
import random
import sys
import time
import datetime
from pathlib import Path
from typing import Optional, Dict, List, Tuple
from tqdm import tqdm
from loguru import logger
# Put this file's own directory on sys.path so the sibling module 框架 can be
# imported regardless of the current working directory.
_weex_dir = Path(__file__).resolve().parent
if str(_weex_dir) not in sys.path:
    sys.path.insert(0, str(_weex_dir))
from 框架 import Config, WeexFuturesTransaction
# 5-minute candle type (change to the real value if the WEEX API names it differently)
KLINE_TYPE_5M = "MINUTE_5"
class WeexOneThirdStrategy(WeexFuturesTransaction):
    """One-third strategy on WEEX 5-minute candles, built on weex/框架.py.

    Candles are dicts read through the keys ``"id"``, ``"open"``, ``"close"``,
    ``"high"`` and ``"low"``; price fields are always passed through
    ``float()`` before use.

    ``self.start`` is read (never assigned here) as the current position:
    ``1`` = long, ``-1`` = short, ``0`` = flat. It is presumably maintained by
    the base class's ``get_position_status()`` — confirm in 框架.py.
    """

    def __init__(self, tge_id):
        """Initialise the base transaction object and the strategy state.

        Args:
            tge_id: Identifier forwarded unchanged to the base class.
        """
        super().__init__(tge_id)
        # Created here; nothing in this class updates or closes it.
        self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80)
        self.min_body_size = 0.1  # minimum |open - close| for a "valid" candle
        self.min_shadow_pct = 0.01  # shadow threshold, in percent of the open
        self.check_interval = 5  # seconds slept between polls
        self.position_ratio = 100  # order amount = balance / position_ratio
        # Candle id / direction of the last signal seen, used to de-duplicate.
        self.last_trigger_kline_id: Optional[int] = None
        self.last_trigger_direction: Optional[str] = None
        # Candle id of the last executed trade (one trade per candle).
        self.last_trade_kline_id: Optional[int] = None

    # ------------------------- 5-minute candles -------------------------
    def get_klines_5m(self) -> Optional[List[Dict]]:
        """Fetch up to 100 five-minute candles via the base ``get_klines``."""
        return self.get_klines(kline_type=KLINE_TYPE_5M, limit=100)

    # ------------------------- one-third strategy core -------------------------
    def get_body_size(self, candle: Dict) -> float:
        """Return the absolute candle body, ``|open - close|``."""
        return abs(float(candle["open"]) - float(candle["close"]))

    def find_valid_prev_bar(
        self, all_data: List[Dict], current_idx: int
    ) -> Tuple[Optional[int], Optional[Dict]]:
        """Find the nearest earlier candle whose body is at least ``min_body_size``.

        Args:
            all_data: Candles, scanned backwards from ``current_idx - 1``.
            current_idx: Index of the current candle.

        Returns:
            ``(index, candle)`` of the first match, or ``(None, None)`` when
            ``current_idx <= 0`` or no earlier candle qualifies.
        """
        if current_idx <= 0:
            return None, None
        for i in range(current_idx - 1, -1, -1):
            prev = all_data[i]
            if self.get_body_size(prev) >= self.min_body_size:
                return i, prev
        return None, None

    def get_one_third_levels(self, prev: Dict) -> Tuple[Optional[float], Optional[float]]:
        """Compute the long/short trigger prices from a reference candle.

        Returns:
            ``(close + body / 3, close - body / 3)``, or ``(None, None)`` when
            the body is smaller than 0.001.
        """
        p_open, p_close = float(prev["open"]), float(prev["close"])
        body = abs(p_open - p_close)
        if body < 0.001:
            return None, None
        return p_close + body / 3, p_close - body / 3

    def get_upper_shadow(self, candle: Dict) -> float:
        """Return the upper shadow length: ``high - max(open, close)``."""
        o, c, h = float(candle["open"]), float(candle["close"]), float(candle["high"])
        return h - max(o, c)

    def get_lower_shadow(self, candle: Dict) -> float:
        """Return the lower shadow length: ``min(open, close) - low``."""
        o, c, low = float(candle["open"]), float(candle["close"]), float(candle["low"])
        return min(o, c) - low

    def upper_shadow_pct(self, candle: Dict) -> float:
        """Return the upper shadow as a percentage of the open (0.0 if open <= 0)."""
        o = float(candle["open"])
        return (self.get_upper_shadow(candle) / o * 100) if o > 0 else 0.0

    def lower_shadow_pct(self, candle: Dict) -> float:
        """Return the lower shadow as a percentage of the open (0.0 if open <= 0)."""
        o = float(candle["open"])
        return (self.get_lower_shadow(candle) / o * 100) if o > 0 else 0.0

    def check_reverse_by_prev_high_low(
        self, kline_data: List[Dict]
    ) -> Tuple[Optional[str], Optional[Dict]]:
        """Reversal rule 1: price reaches the previous candle's high or low.

        Returns:
            ``("long", prev)`` when short and the current high reaches the
            previous high while the previous upper shadow exceeds
            ``min_shadow_pct``; ``("short", prev)`` for the mirrored case when
            long; otherwise ``(None, None)``.
        """
        if len(kline_data) < 2:
            return None, None
        curr, prev = kline_data[-1], kline_data[-2]
        curr_high = float(curr["high"])
        curr_low = float(curr["low"])
        prev_high = float(prev["high"])
        prev_low = float(prev["low"])
        if self.start == -1 and curr_high >= prev_high and self.upper_shadow_pct(prev) > self.min_shadow_pct:
            return "long", prev
        if self.start == 1 and curr_low <= prev_low and self.lower_shadow_pct(prev) > self.min_shadow_pct:
            return "short", prev
        return None, None

    def check_reverse_by_prev_open(
        self, kline_data: List[Dict]
    ) -> Tuple[Optional[str], Optional[Dict]]:
        """Reversal rule 2: price returns to the previous candle's open.

        Returns:
            ``("short", prev)`` when long, the previous upper shadow exceeds
            ``min_shadow_pct`` and the current low reaches the previous open;
            ``("long", prev)`` for the mirrored case when short; otherwise
            ``(None, None)``.
        """
        if len(kline_data) < 2:
            return None, None
        curr, prev = kline_data[-1], kline_data[-2]
        curr_high = float(curr["high"])
        curr_low = float(curr["low"])
        prev_open = float(prev["open"])
        if self.start == 1 and self.upper_shadow_pct(prev) > self.min_shadow_pct and curr_low <= prev_open:
            return "short", prev
        if self.start == -1 and self.lower_shadow_pct(prev) > self.min_shadow_pct and curr_high >= prev_open:
            return "long", prev
        return None, None

    def check_realtime_trigger(
        self, kline_data: List[Dict]
    ) -> Tuple[Optional[str], Optional[float], Optional[Dict], Optional[Dict]]:
        """Evaluate the one-third signal on the newest candle.

        Returns:
            ``(direction, trigger_price, reference_candle, current_candle)``,
            or four ``None`` values when there is no usable reference candle,
            no trigger was hit, or the same direction was already recorded for
            this candle id.
        """
        no_signal = (None, None, None, None)
        if len(kline_data) < 2:
            return no_signal
        curr = kline_data[-1]
        curr_kline_id = curr["id"]
        curr_high = float(curr["high"])
        curr_low = float(curr["low"])
        curr_open = float(curr["open"])

        _, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
        if prev is None:
            return no_signal
        long_trigger, short_trigger = self.get_one_third_levels(prev)
        if long_trigger is None:
            return no_signal

        long_triggered = curr_high >= long_trigger
        short_triggered = curr_low <= short_trigger
        direction = None
        trigger_price = None
        if long_triggered and short_triggered:
            # Both levels were touched: the one closer to the open is treated
            # as having been hit first; a tie goes to short.
            dist_to_long = abs(long_trigger - curr_open)
            dist_to_short = abs(short_trigger - curr_open)
            if dist_to_short <= dist_to_long:
                direction, trigger_price = "short", short_trigger
            else:
                direction, trigger_price = "long", long_trigger
        elif short_triggered:
            direction, trigger_price = "short", short_trigger
        elif long_triggered:
            direction, trigger_price = "long", long_trigger

        if direction is None:
            return no_signal
        if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
            return no_signal
        return direction, trigger_price, prev, curr

    # ------------------------- main loop -------------------------
    def action(self) -> None:
        """Run the polling/trading loop until interrupted or the token fetch fails.

        Each pass opens the browser (up to three attempts), loads
        ``Config.TRADING_URL``, fetches the token on the first pass and at
        random afterwards, then evaluates the reversal rules followed by the
        one-third signal on the latest 5-minute candles. The page is closed at
        random at the end of a pass.

        NOTE(review): the block nesting below was reconstructed from a copy
        whose indentation had been stripped. Confirm against the original,
        in particular the ``for``/``else`` around ``openBrowser``, whether
        ``click_safe`` sits outside the token ``if``, and where the
        ``sleep``/``continue`` after a reversal trade belongs.
        """
        token_loaded = False
        while True:
            # The else branch runs only when all three attempts fail.
            for _ in range(3):
                if self.openBrowser():
                    break
            else:
                self.ding("打开 TGE 失败!", error=True)
                # Back off before retrying so a persistent failure does not
                # spin the loop and repeat the alert without pause.
                time.sleep(self.check_interval)
                continue
            logger.info("TGE 浏览器已打开")
            self.close_extra_tabs()
            self.page.get(url=Config.TRADING_URL)
            time.sleep(self.check_interval)

            # Always on the first pass, afterwards whenever randint(1, 11) > 6.
            if not token_loaded or random.randint(1, 11) > 6:
                token_loaded = True
                if not self._get_token():
                    self.ding("获取 token 失败", error=True)
                    return
            self.click_safe('x:(//button[normalize-space(text()) = "市价"])')
            logger.info("WEEX 三分之一策略5分钟K线开始监测")

            try:
                kline_data = self.get_klines_5m()
                if not kline_data or len(kline_data) < 3:
                    logger.warning("K线数据不足等待重试...")
                    time.sleep(self.check_interval)
                    continue

                curr = kline_data[-1]
                curr_kline_id = curr["id"]
                # The candle id is divided by 1000, i.e. used as epoch milliseconds.
                curr_time_str = datetime.datetime.fromtimestamp(curr_kline_id / 1000).strftime("%H:%M:%S")

                if not self.get_position_status():
                    logger.warning("获取仓位失败,使用缓存")

                # Reversal 1: price reached the previous high/low and the shadow > 0.01%.
                rev_dir, _ = self.check_reverse_by_prev_high_low(kline_data)
                rev_type = ""
                if not rev_dir:
                    # Reversal 2: price returned to the previous open.
                    rev_dir, _ = self.check_reverse_by_prev_open(kline_data)
                    rev_type = ""  # both rules use an empty label in this source

                if rev_dir and self.last_trade_kline_id != curr_kline_id:
                    balance = self.get_available_balance()
                    amount = int((balance or 0) / self.position_ratio)
                    if amount > 0:
                        self.ding(f"反手信号{rev_type} {rev_dir} 金额={amount}")
                        # Close the current position, then open the opposite side.
                        side = 1 if rev_dir == "long" else -1
                        self.平仓()
                        time.sleep(1)
                        self.开单(marketPriceLongOrder=side, size=amount)
                        self.last_trade_kline_id = curr_kline_id
                        self.last_trigger_kline_id = curr_kline_id
                        self.last_trigger_direction = rev_dir
                        time.sleep(self.check_interval)
                        continue

                # Main signal: one-third trigger.
                direction, trigger_price, _, _ = self.check_realtime_trigger(kline_data)
                if direction:
                    if self.last_trade_kline_id == curr_kline_id:
                        # Already traded on this candle: record the signal only.
                        self.last_trigger_kline_id = curr_kline_id
                        self.last_trigger_direction = direction
                        time.sleep(self.check_interval)
                        continue
                    if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1):
                        # Already positioned in the signalled direction.
                        self.last_trigger_kline_id = curr_kline_id
                        self.last_trigger_direction = direction
                        time.sleep(self.check_interval)
                        continue

                    balance = self.get_available_balance()
                    amount = int((balance or 0) / self.position_ratio)
                    if amount <= 0:
                        time.sleep(self.check_interval)
                        continue

                    executed = False
                    if direction == "long":
                        if self.start == -1:
                            self.ding("平空反手开多")
                            self.平仓()
                            time.sleep(1)
                            self.开单(marketPriceLongOrder=1, size=amount)
                            executed = True
                        elif self.start == 0:
                            self.ding("无仓位开多")
                            self.开单(marketPriceLongOrder=1, size=amount)
                            executed = True
                    elif direction == "short":
                        if self.start == 1:
                            self.ding("平多反手开空")
                            self.平仓()
                            time.sleep(1)
                            self.开单(marketPriceLongOrder=-1, size=amount)
                            executed = True
                        elif self.start == 0:
                            self.ding("无仓位开空")
                            self.开单(marketPriceLongOrder=-1, size=amount)
                            executed = True

                    if executed:
                        self.last_trade_kline_id = curr_kline_id
                        self.get_position_status()
                        self.ding(f"三分之一信号 {direction} 触发价={trigger_price:.2f} 金额={amount}")
                    self.last_trigger_kline_id = curr_kline_id
                    self.last_trigger_direction = direction
                else:
                    # Cast first: ':.2f' raises ValueError on str values, and
                    # every other read of these fields goes through float().
                    o = float(curr["open"])
                    h = float(curr["high"])
                    low = float(curr["low"])
                    logger.debug(f"[{curr_time_str}] O={o:.2f} H={h:.2f} L={low:.2f}")

                time.sleep(self.check_interval)
                # Close the page at random (randint(1, 11) > 7); the next pass
                # goes through openBrowser() again.
                if random.randint(1, 11) > 7:
                    self.page.close()
                    time.sleep(5)
            except KeyboardInterrupt:
                logger.info("用户中断")
                break
            except Exception as e:
                # Top-level boundary: log with traceback, alert, keep looping.
                logger.exception(f"主循环异常: {e}")
                self.ding(f"运行异常: {e}", error=True)
                time.sleep(self.check_interval)
# Script entry point: run the strategy with a fixed TGE id.
if __name__ == "__main__":
    WeexOneThirdStrategy(tge_id="86837a981aba4576be6916a0ef6ad785").action()