加入 weex
This commit is contained in:
289
weex/三分之一策略-5分钟交易.py
Normal file
289
weex/三分之一策略-5分钟交易.py
Normal file
@@ -0,0 +1,289 @@
|
||||
"""
|
||||
WEEX 三分之一策略 — 5分钟K线交易
|
||||
基于 weex/框架.py,策略逻辑与 bitmart/三分之一策略-5分钟交易.py 一致。
|
||||
|
||||
策略规则:
|
||||
1. 触发价(前一根有效K线实体>=0.1):做多=收盘+实体/3,做空=收盘-实体/3
|
||||
2. 信号:当前5分钟K线最高>=做多触发→多;最低<=做空触发→空;同根都触发用开盘价距离判断先后
|
||||
3. 反手一:持空且当前涨到上根最高且上根上影线>0.01%→反手多;持多且当前跌到上根最低且上根下影线>0.01%→反手空
|
||||
4. 反手二:持多且上根上影线>0.01%且当前跌到上根开盘→反手空;持空且上根下影线>0.01%且当前涨到上根开盘→反手多
|
||||
5. 同一根5分钟K线内只交易一次
|
||||
"""
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, List, Tuple
|
||||
|
||||
from tqdm import tqdm
|
||||
from loguru import logger
|
||||
|
||||
# 保证可导入同目录下的 框架
|
||||
_weex_dir = Path(__file__).resolve().parent
|
||||
if str(_weex_dir) not in sys.path:
|
||||
sys.path.insert(0, str(_weex_dir))
|
||||
from 框架 import Config, WeexFuturesTransaction
|
||||
|
||||
# 5 分钟 K 线类型(若 WEEX 接口名不同可改为实际值)
|
||||
KLINE_TYPE_5M = "MINUTE_5"
|
||||
|
||||
|
||||
class WeexOneThirdStrategy(WeexFuturesTransaction):
|
||||
"""WEEX 三分之一策略(5分钟K线),继承 weex/框架.py"""
|
||||
|
||||
def __init__(self, tge_id):
|
||||
super().__init__(tge_id)
|
||||
self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80)
|
||||
self.min_body_size = 0.1
|
||||
self.min_shadow_pct = 0.01
|
||||
self.check_interval = 3
|
||||
self.position_ratio = 100 # 开仓金额 = 余额 / position_ratio
|
||||
self.last_trigger_kline_id: Optional[int] = None
|
||||
self.last_trigger_direction: Optional[str] = None
|
||||
self.last_trade_kline_id: Optional[int] = None
|
||||
|
||||
# ------------------------- 5分钟K线 -------------------------
|
||||
def get_klines_5m(self) -> Optional[List[Dict]]:
|
||||
"""获取 5 分钟 K 线"""
|
||||
return self.get_klines(kline_type=KLINE_TYPE_5M, limit=100)
|
||||
|
||||
# ------------------------- 三分之一策略核心 -------------------------
|
||||
def get_body_size(self, candle: Dict) -> float:
|
||||
return abs(float(candle["open"]) - float(candle["close"]))
|
||||
|
||||
def find_valid_prev_bar(self, all_data: List[Dict], current_idx: int) -> Tuple[Optional[int], Optional[Dict]]:
|
||||
if current_idx <= 0:
|
||||
return None, None
|
||||
for i in range(current_idx - 1, -1, -1):
|
||||
prev = all_data[i]
|
||||
if self.get_body_size(prev) >= self.min_body_size:
|
||||
return i, prev
|
||||
return None, None
|
||||
|
||||
def get_one_third_levels(self, prev: Dict) -> Tuple[Optional[float], Optional[float]]:
|
||||
p_open, p_close = float(prev["open"]), float(prev["close"])
|
||||
body = abs(p_open - p_close)
|
||||
if body < 0.001:
|
||||
return None, None
|
||||
return p_close + body / 3, p_close - body / 3
|
||||
|
||||
def get_upper_shadow(self, candle: Dict) -> float:
|
||||
o, c, h = float(candle["open"]), float(candle["close"]), float(candle["high"])
|
||||
return h - max(o, c)
|
||||
|
||||
def get_lower_shadow(self, candle: Dict) -> float:
|
||||
o, c, l = float(candle["open"]), float(candle["close"]), float(candle["low"])
|
||||
return min(o, c) - l
|
||||
|
||||
def upper_shadow_pct(self, candle: Dict) -> float:
|
||||
o = float(candle["open"])
|
||||
return (self.get_upper_shadow(candle) / o * 100) if o > 0 else 0.0
|
||||
|
||||
def lower_shadow_pct(self, candle: Dict) -> float:
|
||||
o = float(candle["open"])
|
||||
return (self.get_lower_shadow(candle) / o * 100) if o > 0 else 0.0
|
||||
|
||||
def check_reverse_by_prev_high_low(self, kline_data: List[Dict]) -> Tuple[Optional[str], Optional[Dict]]:
|
||||
if len(kline_data) < 2:
|
||||
return None, None
|
||||
curr, prev = kline_data[-1], kline_data[-2]
|
||||
curr_high = float(curr["high"])
|
||||
curr_low = float(curr["low"])
|
||||
prev_high = float(prev["high"])
|
||||
prev_low = float(prev["low"])
|
||||
if self.start == -1 and curr_high >= prev_high and self.upper_shadow_pct(prev) > self.min_shadow_pct:
|
||||
return "long", prev
|
||||
if self.start == 1 and curr_low <= prev_low and self.lower_shadow_pct(prev) > self.min_shadow_pct:
|
||||
return "short", prev
|
||||
return None, None
|
||||
|
||||
def check_reverse_by_prev_open(self, kline_data: List[Dict]) -> Tuple[Optional[str], Optional[Dict]]:
|
||||
if len(kline_data) < 2:
|
||||
return None, None
|
||||
curr, prev = kline_data[-1], kline_data[-2]
|
||||
curr_high = float(curr["high"])
|
||||
curr_low = float(curr["low"])
|
||||
prev_open = float(prev["open"])
|
||||
if self.start == 1 and self.upper_shadow_pct(prev) > self.min_shadow_pct and curr_low <= prev_open:
|
||||
return "short", prev
|
||||
if self.start == -1 and self.lower_shadow_pct(prev) > self.min_shadow_pct and curr_high >= prev_open:
|
||||
return "long", prev
|
||||
return None, None
|
||||
|
||||
def check_realtime_trigger(
|
||||
self, kline_data: List[Dict]
|
||||
) -> Tuple[Optional[str], Optional[float], Optional[Dict], Optional[Dict]]:
|
||||
if len(kline_data) < 2:
|
||||
return None, None, None, None
|
||||
curr = kline_data[-1]
|
||||
curr_kline_id = curr["id"]
|
||||
curr_high = float(curr["high"])
|
||||
curr_low = float(curr["low"])
|
||||
curr_open = float(curr["open"])
|
||||
|
||||
valid_prev_idx, prev = self.find_valid_prev_bar(kline_data, len(kline_data) - 1)
|
||||
if prev is None:
|
||||
return None, None, None, None
|
||||
|
||||
long_trigger, short_trigger = self.get_one_third_levels(prev)
|
||||
if long_trigger is None:
|
||||
return None, None, None, None
|
||||
|
||||
long_triggered = curr_high >= long_trigger
|
||||
short_triggered = curr_low <= short_trigger
|
||||
both_triggered = long_triggered and short_triggered
|
||||
|
||||
direction = None
|
||||
trigger_price = None
|
||||
if both_triggered:
|
||||
dist_to_long = abs(long_trigger - curr_open)
|
||||
dist_to_short = abs(short_trigger - curr_open)
|
||||
if dist_to_short <= dist_to_long:
|
||||
direction, trigger_price = "short", short_trigger
|
||||
else:
|
||||
direction, trigger_price = "long", long_trigger
|
||||
elif short_triggered:
|
||||
direction, trigger_price = "short", short_trigger
|
||||
elif long_triggered:
|
||||
direction, trigger_price = "long", long_trigger
|
||||
|
||||
if direction is None:
|
||||
return None, None, None, None
|
||||
if self.last_trigger_kline_id == curr_kline_id and self.last_trigger_direction == direction:
|
||||
return None, None, None, None
|
||||
return direction, trigger_price, prev, curr
|
||||
|
||||
# ------------------------- 主循环 -------------------------
|
||||
def action(self) -> None:
|
||||
|
||||
while True:
|
||||
|
||||
for i in range(3):
|
||||
if self.openBrowser():
|
||||
break
|
||||
else:
|
||||
self.ding("打开 TGE 失败!", error=True)
|
||||
continue
|
||||
|
||||
logger.info("TGE 浏览器已打开")
|
||||
self.close_extra_tabs()
|
||||
self.page.get(url=Config.TRADING_URL)
|
||||
time.sleep(2)
|
||||
if not self._get_token():
|
||||
self.ding("获取 token 失败", error=True)
|
||||
return
|
||||
self.click_safe('x:(//button[normalize-space(text()) = "市价"])')
|
||||
logger.info("WEEX 三分之一策略(5分钟K线)开始监测")
|
||||
|
||||
try:
|
||||
kline_data = self.get_klines_5m()
|
||||
if not kline_data or len(kline_data) < 3:
|
||||
logger.warning("K线数据不足,等待重试...")
|
||||
time.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
curr = kline_data[-1]
|
||||
curr_kline_id = curr["id"]
|
||||
curr_time_str = datetime.datetime.fromtimestamp(curr["id"] / 1000).strftime("%H:%M:%S")
|
||||
|
||||
if not self.get_position_status():
|
||||
logger.warning("获取仓位失败,使用缓存")
|
||||
|
||||
# 反手一:涨到上根最高/跌到上根最低 + 影线>0.01%
|
||||
rev_dir, rev_prev = self.check_reverse_by_prev_high_low(kline_data)
|
||||
rev_type = "一"
|
||||
if not rev_dir:
|
||||
rev_dir, rev_prev = self.check_reverse_by_prev_open(kline_data)
|
||||
rev_type = "二"
|
||||
|
||||
if rev_dir and self.last_trade_kline_id != curr_kline_id:
|
||||
balance = self.get_available_balance()
|
||||
amount = int((balance or 0) / self.position_ratio)
|
||||
if amount > 0:
|
||||
self.ding(f"反手信号{rev_type} {rev_dir} 金额={amount}")
|
||||
if rev_dir == "long":
|
||||
self.平仓()
|
||||
time.sleep(1)
|
||||
self.开单(marketPriceLongOrder=1, size=amount)
|
||||
else:
|
||||
self.平仓()
|
||||
time.sleep(1)
|
||||
self.开单(marketPriceLongOrder=-1, size=amount)
|
||||
self.last_trade_kline_id = curr_kline_id
|
||||
self.last_trigger_kline_id = curr_kline_id
|
||||
self.last_trigger_direction = rev_dir
|
||||
time.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
# 主信号:三分之一触发
|
||||
direction, trigger_price, valid_prev, curr_kline = self.check_realtime_trigger(kline_data)
|
||||
|
||||
if direction:
|
||||
if self.last_trade_kline_id == curr_kline_id:
|
||||
self.last_trigger_kline_id = curr_kline_id
|
||||
self.last_trigger_direction = direction
|
||||
time.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
if (direction == "long" and self.start == 1) or (direction == "short" and self.start == -1):
|
||||
self.last_trigger_kline_id = curr_kline_id
|
||||
self.last_trigger_direction = direction
|
||||
time.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
balance = self.get_available_balance()
|
||||
amount = int((balance or 0) / self.position_ratio)
|
||||
if amount <= 0:
|
||||
time.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
executed = False
|
||||
if direction == "long":
|
||||
if self.start == -1:
|
||||
self.ding("平空反手开多")
|
||||
self.平仓()
|
||||
time.sleep(1)
|
||||
self.开单(marketPriceLongOrder=1, size=amount)
|
||||
executed = True
|
||||
elif self.start == 0:
|
||||
self.ding("无仓位开多")
|
||||
self.开单(marketPriceLongOrder=1, size=amount)
|
||||
executed = True
|
||||
elif direction == "short":
|
||||
if self.start == 1:
|
||||
self.ding("平多反手开空")
|
||||
self.平仓()
|
||||
time.sleep(1)
|
||||
self.开单(marketPriceLongOrder=-1, size=amount)
|
||||
executed = True
|
||||
elif self.start == 0:
|
||||
self.ding("无仓位开空")
|
||||
self.开单(marketPriceLongOrder=-1, size=amount)
|
||||
executed = True
|
||||
|
||||
if executed:
|
||||
self.last_trade_kline_id = curr_kline_id
|
||||
self.get_position_status()
|
||||
self.ding(f"三分之一信号 {direction} 触发价={trigger_price:.2f} 金额={amount}")
|
||||
self.last_trigger_kline_id = curr_kline_id
|
||||
self.last_trigger_direction = direction
|
||||
else:
|
||||
logger.debug(f"[{curr_time_str}] O={curr['open']:.2f} H={curr['high']:.2f} L={curr['low']:.2f}")
|
||||
|
||||
time.sleep(self.check_interval)
|
||||
if random.randint(1, 11) > 7:
|
||||
self.page.close()
|
||||
time.sleep(5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("用户中断")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"主循环异常: {e}")
|
||||
self.ding(f"运行异常: {e}", error=True)
|
||||
time.sleep(self.check_interval)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
WeexOneThirdStrategy(tge_id="86837a981aba4576be6916a0ef6ad785").action()
|
||||
Reference in New Issue
Block a user