加入 weex

This commit is contained in:
ddrwode
2026-02-03 17:07:40 +08:00
parent 784a2a3af3
commit fdcec74585
2 changed files with 631 additions and 2 deletions

6
1111
View File

@@ -36,8 +36,10 @@ Abc12345678
持多反手做空:价格跌到 开仓价 - 前一根实体 / 5
反手信号,
当前这根线线跌倒了上一根的三分之一,我开了空,但是这根涨到了上一根 k 线的最高价格,上一根 k 线的上阴线涨幅大于 0.01%,反手做多
第二个反手信号,当前这根做了多,上一根 k 线的上阴线涨幅大于 0.01%,当前这个根跌倒了上一根的开盘价格,反手做空

View File

@@ -0,0 +1,627 @@
"""
WEEX ETH-USDT 永续合约 — 三分之一策略5分钟K线
策略规则(与 bitmart/三分之一策略-5分钟交易.py 一致):
1. 触发价格前一根有效K线实体>=0.1
- 做多触发价 = 收盘价 + 实体/3做空触发价 = 收盘价 - 实体/3
2. 信号当前5分钟K线最高>=做多触发→多;最低<=做空触发→空;同根多空都触发用开盘价距离判断先后
3. 反手一:持空且当前涨到上根最高且上根上影线>0.01%→反手多;持多且当前跌到上根最低且上根下影线>0.01%→反手空
4. 反手二:持多且上根上影线>0.01%且当前跌到上根开盘→反手空;持空且上根下影线>0.01%且当前涨到上根开盘→反手多
5. 同一根5分钟K线内只交易一次
"""
import time
import datetime
from typing import Optional, Dict, List, Tuple
from tqdm import tqdm
from loguru import logger
from DrissionPage import ChromiumOptions, ChromiumPage
from curl_cffi import requests
from bit_tools import openBrowser
from 交易.tools import send_dingtalk_message
# ==================== 配置常量 ====================
class Config:
TGE_URL = "http://127.0.0.1:50326"
TGE_AUTHORIZATION = "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91"
CONTRACT_ID = "10000002"
PRODUCT_CODE = "cmt_ethusdt"
KLINE_TYPE = "MINUTE_5" # 5分钟K线三分之一策略
KLINE_LIMIT = 100
TRADING_URL = "https://www.weex.com/zh-CN/futures/ETH-USDT"
POSITION_RATIO = 100
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 1
# 三分之一策略参数
MIN_BODY_SIZE = 0.1
MIN_SHADOW_PCT = 0.01
# ==================== 三分之一策略分析器 ====================
class OneThirdStrategyAnalyzer:
"""三分之一策略 K 线分析5分钟触发价=收盘价±实体/3 + 两个反手信号"""
@staticmethod
def get_body_size(candle: Dict) -> float:
return abs(float(candle['open']) - float(candle['close']))
@staticmethod
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1) -> Tuple[Optional[int], Optional[Dict]]:
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if OneThirdStrategyAnalyzer.get_body_size(prev) >= min_body_size:
return i, prev
return None, None
@staticmethod
def get_one_third_levels(prev: Dict) -> Tuple[Optional[float], Optional[float]]:
p_open, p_close = float(prev['open']), float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001:
return None, None
return p_close + body / 3, p_close - body / 3
@staticmethod
def get_upper_shadow(candle: Dict) -> float:
o, c, h = float(candle['open']), float(candle['close']), float(candle['high'])
return h - max(o, c)
@staticmethod
def get_lower_shadow(candle: Dict) -> float:
o, c, l = float(candle['open']), float(candle['close']), float(candle['low'])
return min(o, c) - l
@staticmethod
def upper_shadow_pct(candle: Dict) -> float:
o = float(candle['open'])
return (OneThirdStrategyAnalyzer.get_upper_shadow(candle) / o * 100) if o > 0 else 0.0
@staticmethod
def lower_shadow_pct(candle: Dict) -> float:
o = float(candle['open'])
return (OneThirdStrategyAnalyzer.get_lower_shadow(candle) / o * 100) if o > 0 else 0.0
@staticmethod
def check_reverse_by_prev_high_low(kline_data: List[Dict], start: int) -> Tuple[Optional[str], Optional[Dict]]:
if len(kline_data) < 2:
return None, None
curr, prev = kline_data[-1], kline_data[-2]
curr_high = float(curr['high'])
curr_low = float(curr['low'])
prev_high = float(prev['high'])
prev_low = float(prev['low'])
if start == -1 and curr_high >= prev_high and OneThirdStrategyAnalyzer.upper_shadow_pct(prev) > Config.MIN_SHADOW_PCT:
return 'long', prev
if start == 1 and curr_low <= prev_low and OneThirdStrategyAnalyzer.lower_shadow_pct(prev) > Config.MIN_SHADOW_PCT:
return 'short', prev
return None, None
@staticmethod
def check_reverse_by_prev_open(kline_data: List[Dict], start: int) -> Tuple[Optional[str], Optional[Dict]]:
if len(kline_data) < 2:
return None, None
curr, prev = kline_data[-1], kline_data[-2]
curr_high = float(curr['high'])
curr_low = float(curr['low'])
prev_open = float(prev['open'])
if start == 1 and OneThirdStrategyAnalyzer.upper_shadow_pct(prev) > Config.MIN_SHADOW_PCT and curr_low <= prev_open:
return 'short', prev
if start == -1 and OneThirdStrategyAnalyzer.lower_shadow_pct(prev) > Config.MIN_SHADOW_PCT and curr_high >= prev_open:
return 'long', prev
return None, None
@staticmethod
def check_realtime_trigger(
kline_data: List[Dict],
last_trigger_kline_id: Optional[int],
last_trigger_direction: Optional[str],
) -> Tuple[Optional[str], Optional[float], Optional[Dict], Optional[Dict]]:
if len(kline_data) < 2:
return None, None, None, None
curr = kline_data[-1]
curr_kline_id = curr['id']
curr_high = float(curr['high'])
curr_low = float(curr['low'])
curr_open = float(curr['open'])
valid_prev_idx, prev = OneThirdStrategyAnalyzer.find_valid_prev_bar(
kline_data, len(kline_data) - 1, Config.MIN_BODY_SIZE
)
if prev is None:
return None, None, None, None
long_trigger, short_trigger = OneThirdStrategyAnalyzer.get_one_third_levels(prev)
if long_trigger is None:
return None, None, None, None
long_triggered = curr_high >= long_trigger
short_triggered = curr_low <= short_trigger
both_triggered = long_triggered and short_triggered
direction = None
trigger_price = None
if both_triggered:
dist_to_long = abs(long_trigger - curr_open)
dist_to_short = abs(short_trigger - curr_open)
if dist_to_short <= dist_to_long:
direction, trigger_price = 'short', short_trigger
else:
direction, trigger_price = 'long', long_trigger
elif short_triggered:
direction, trigger_price = 'short', short_trigger
elif long_triggered:
direction, trigger_price = 'long', long_trigger
if direction is None:
return None, None, None, None
if last_trigger_kline_id == curr_kline_id and last_trigger_direction == direction:
return None, None, None, None
return direction, trigger_price, prev, curr
# ==================== 浏览器管理器 ====================
class BrowserManager:
def __init__(self, tge_id, tge_url: str, tge_headers: Dict):
self.tge_id = tge_id
self.tge_url = tge_url
self.tge_headers = tge_headers
self.tge_port: Optional[int] = None
self.page: Optional[ChromiumPage] = None
def openBrowser(self):
try:
bit_port = openBrowser(id=self.tge_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.tge_port = bit_port
return True
except Exception:
return False
def take_over_browser(self) -> bool:
if not self.tge_port:
return False
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
return True
except Exception:
return False
def close_extra_tabs(self) -> bool:
if not self.page:
return False
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except Exception:
return False
# ==================== WEEX API 客户端 ====================
class WEEXApiClient:
def __init__(self):
self.session = requests.Session()
self.headers: Optional[Dict] = None
def update_headers(self, headers: Dict) -> None:
if not self.headers:
self.session.headers = headers
else:
self.session.headers.update(headers)
self.headers = headers
def get_kline_data(
self,
contract_id: str = Config.CONTRACT_ID,
product_code: str = Config.PRODUCT_CODE,
kline_type: str = Config.KLINE_TYPE,
limit: int = Config.KLINE_LIMIT,
) -> List[Dict]:
params = {
'contractId': contract_id,
'productCode': product_code,
'priceType': 'LAST_PRICE',
'klineType': kline_type,
'limit': str(limit),
'timeZone': 'string',
'languageType': '1',
'sign': 'SIGN',
}
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.get(
'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2',
params=params,
timeout=15,
)
if response.status_code != 200:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
continue
response_data = response.json()
if "data" not in response_data or "dataList" not in response_data["data"]:
continue
result = response_data["data"]["dataList"]
kline_data = []
for item in result:
kline_data.append({
'id': int(item[4]),
'open': float(item[3]),
'high': float(item[1]),
'low': float(item[2]),
'close': float(item[0]),
})
return kline_data
except Exception as e:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return []
def get_available_balance(self) -> Optional[float]:
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.post(
'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new',
timeout=15,
)
return float(response.json()["data"]["newContract"]["balanceList"][0]["available"])
except Exception:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return None
def get_position_status(self) -> Tuple[bool, Optional[List]]:
json_data = {
'filterContractIdList': [10000002],
'limit': 100,
'languageType': 0,
'sign': 'SIGN',
'timeZone': 'string',
}
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.post(
'https://http-gateway2.janapw.com/api/v1/private/order/v2/getHistoryOrderFillTransactionPage',
json=json_data,
timeout=15,
)
datas = response.json()["data"]["dataList"]
if not datas:
return True, None
return True, datas
except Exception:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return False, None
# ==================== Token 管理器 ====================
class TokenManager:
def __init__(self, api_client: WEEXApiClient, page: ChromiumPage):
self.api_client = api_client
self.page = page
def get_token(self) -> bool:
tab = self.page.new_tab()
tab.listen.start("/user/security/getLanguageType")
try:
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
tab.get(url=Config.TRADING_URL)
res = tab.listen.wait(timeout=5)
if res.request.headers.get("U-TOKEN"):
self.api_client.update_headers(res.request.headers)
return True
except Exception:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return False
finally:
tab.close()
# ==================== 交易执行器 ====================
class TradingExecutor:
def __init__(self, page: ChromiumPage, api_client: WEEXApiClient):
self.page = page
self.api_client = api_client
def navigate_to_trading_page(self) -> bool:
try:
self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click()
time.sleep(1)
return True
except Exception:
return False
def close_all_positions(self) -> bool:
try:
self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').click(by_js=True)
time.sleep(3)
return True
except Exception:
return False
def open_long(self, amount: float) -> bool:
try:
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
time.sleep(1)
self.page.ele('x://*[normalize-space(text()) ="买入开多"]').click(by_js=True)
return True
except Exception:
return False
def open_short(self, amount: float) -> bool:
try:
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
time.sleep(1)
self.page.ele('x://*[normalize-space(text()) ="卖出开空"]').click(by_js=True)
return True
except Exception:
return False
def execute_trade(self, direction: str, current_position: int, amount: float) -> bool:
if (direction == "long" and current_position == PositionManager.POSITION_LONG) or \
(direction == "short" and current_position == PositionManager.POSITION_SHORT):
return True
if not self.navigate_to_trading_page():
return False
try:
if direction == "long":
if current_position == 0:
return self.open_long(amount)
elif current_position == -1:
if self.close_all_positions():
time.sleep(1)
return self.open_long(amount)
elif direction == "short":
if current_position == 0:
return self.open_short(amount)
elif current_position == 1:
if self.close_all_positions():
time.sleep(1)
return self.open_short(amount)
except Exception:
pass
return False
# ==================== 持仓管理器 ====================
class PositionManager:
POSITION_SHORT = -1
POSITION_NONE = 0
POSITION_LONG = 1
def __init__(self, trading_executor: TradingExecutor):
self.trading_executor = trading_executor
self.current_position: int = self.POSITION_NONE
self.position_data: Optional[List] = None
def update_position(self, position_data: Optional[List]) -> None:
self.position_data = position_data
if not position_data:
self.current_position = self.POSITION_NONE
return
position_data = list(position_data)
position_data.reverse()
start, start1 = 0, 0
for i in position_data:
direction = i.get("legacyOrderDirection")
if direction == "CLOSE_SHORT":
start = 0
elif direction == "CLOSE_LONG":
start1 = 0
elif direction == "OPEN_SHORT":
start -= 1
elif direction == "OPEN_LONG":
start1 += 1
if start1:
self.current_position = self.POSITION_LONG
elif start:
self.current_position = self.POSITION_SHORT
else:
self.current_position = self.POSITION_NONE
# ==================== 消息发送 ====================
class MessageSender:
@staticmethod
def send(msg: str, is_error: bool = False) -> None:
prefix = "❌weex三分之一" if is_error else "🔔weex三分之一"
for _ in range(15 if is_error else 1):
send_dingtalk_message(f"{prefix}{msg}")
# ==================== 5分钟K线时间工具 ====================
class TimeUtils5m:
@staticmethod
def get_current_kline_timestamp() -> int:
"""当前所在 5 分钟 K 线的时间戳(毫秒)"""
t = time.time()
dt = datetime.datetime.fromtimestamp(t)
minute = (dt.minute // 5) * 5
target = dt.replace(minute=minute, second=0, microsecond=0)
return int(target.timestamp()) * 1000
@staticmethod
def get_progress_bar_value() -> int:
"""0~4 表示当前 5 分钟内的分钟数"""
return datetime.datetime.now().minute % 5
# ==================== 主交易类 ====================
class WeexOneThirdTransaction:
"""WEEX 三分之一策略5分钟K线自动交易"""
def __init__(self, tge_id):
self.tge_id = tge_id
self.tge_headers = {
"Authorization": Config.TGE_AUTHORIZATION,
"Content-Type": "application/json",
}
self.browser_manager = BrowserManager(tge_id, Config.TGE_URL, self.tge_headers)
self.api_client = WEEXApiClient()
self.position_manager: Optional[PositionManager] = None
self.trading_executor: Optional[TradingExecutor] = None
self.token_manager: Optional[TokenManager] = None
self.pbar: Optional[tqdm] = None
self.last_kline_timestamp: Optional[int] = None
self.kline_data: List[Dict] = []
self.last_trigger_kline_id: Optional[int] = None
self.last_trigger_direction: Optional[str] = None
self.last_trade_kline_id: Optional[int] = None
def initialize(self) -> bool:
if not self.browser_manager.openBrowser():
MessageSender.send("打开浏览器失败", is_error=True)
return False
if not self.browser_manager.take_over_browser():
MessageSender.send("接管浏览器失败", is_error=True)
return False
self.browser_manager.close_extra_tabs()
page = self.browser_manager.page
self.trading_executor = TradingExecutor(page, self.api_client)
self.position_manager = PositionManager(self.trading_executor)
self.token_manager = TokenManager(self.api_client, page)
page.get(url=Config.TRADING_URL)
if not self.token_manager.get_token():
logger.warning("初始化获取 token 失败将在获取K线时重试")
self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80)
return True
def fetch_and_update_kline(self) -> bool:
if not self.token_manager.get_token():
return False
kline_data = self.api_client.get_kline_data()
if not kline_data or len(kline_data) < 3:
return False
sorted_data = sorted(kline_data, key=lambda x: x["id"])
self.kline_data = sorted_data
current_kline_id = sorted_data[-1]["id"]
current_ts = TimeUtils5m.get_current_kline_timestamp()
if current_kline_id != current_ts:
return False
if self.last_kline_timestamp == current_ts:
return False
self.last_kline_timestamp = current_ts
return True
def sync_position_status(self) -> bool:
success, position_data = self.api_client.get_position_status()
if not success:
return False
self.position_manager.update_position(position_data)
return True
def process_trading_logic(self) -> None:
self.token_manager.get_token()
self.browser_manager.page.get(url=Config.TRADING_URL)
if not self.sync_position_status():
return
kline_data = self.kline_data
curr = kline_data[-1]
curr_kline_id = curr["id"]
start = self.position_manager.current_position
# 反手一:涨到上根最高/跌到上根最低 + 影线>0.01%
rev_dir, rev_prev = OneThirdStrategyAnalyzer.check_reverse_by_prev_high_low(kline_data, start)
rev_type = ""
if not rev_dir:
rev_dir, rev_prev = OneThirdStrategyAnalyzer.check_reverse_by_prev_open(kline_data, start)
rev_type = ""
if rev_dir and self.last_trade_kline_id != curr_kline_id:
balance = self.api_client.get_available_balance()
amount = int((balance or 0) / Config.POSITION_RATIO)
if self.trading_executor.execute_trade(rev_dir, start, amount):
self.last_trade_kline_id = curr_kline_id
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = rev_dir
if rev_dir == "long":
self.position_manager.current_position = PositionManager.POSITION_LONG
else:
self.position_manager.current_position = PositionManager.POSITION_SHORT
MessageSender.send(f"反手信号{rev_type} {rev_dir},金额 {amount}")
return
# 主信号:三分之一触发
direction, trigger_price, valid_prev, curr_kline = OneThirdStrategyAnalyzer.check_realtime_trigger(
kline_data, self.last_trigger_kline_id, self.last_trigger_direction
)
if not direction:
return
if self.last_trade_kline_id == curr_kline_id:
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
return
if (direction == "long" and start == PositionManager.POSITION_LONG) or \
(direction == "short" and start == PositionManager.POSITION_SHORT):
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
return
balance = self.api_client.get_available_balance()
if balance is None:
return
amount = int(balance / Config.POSITION_RATIO)
if self.trading_executor.execute_trade(direction, start, amount):
self.last_trade_kline_id = curr_kline_id
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
if direction == "long":
self.position_manager.current_position = PositionManager.POSITION_LONG
else:
self.position_manager.current_position = PositionManager.POSITION_SHORT
MessageSender.send(f"三分之一信号 {direction} 触发价={trigger_price:.2f} 金额={amount}")
def run(self) -> None:
if not self.initialize():
return
logger.info("WEEX 三分之一策略5分钟K线开始运行")
while True:
try:
if self.pbar:
self.pbar.n = TimeUtils5m.get_progress_bar_value()
self.pbar.refresh()
current_ts = TimeUtils5m.get_current_kline_timestamp()
if self.last_kline_timestamp == current_ts:
time.sleep(10)
continue
if not self.fetch_and_update_kline():
time.sleep(10)
continue
self.process_trading_logic()
if self.pbar:
self.pbar.reset()
time.sleep(5)
except KeyboardInterrupt:
break
except Exception as e:
logger.error(e)
MessageSender.send(f"运行出错: {e}", is_error=True)
time.sleep(10)
def action(self) -> None:
self.run()
if __name__ == '__main__':
WeexOneThirdTransaction(tge_id="86837a981aba4576be6916a0ef6ad785").action()