Files
lm_code/交易/weex-三分之一策略-5分钟交易.py
Administrator d07d3b6a1d haha
2026-02-03 17:57:13 +08:00

642 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WEEX ETH-USDT 永续合约 — 三分之一策略5分钟K线
策略规则(与 bitmart/三分之一策略-5分钟交易.py 一致):
1. 触发价格前一根有效K线实体>=0.1
- 做多触发价 = 收盘价 + 实体/3做空触发价 = 收盘价 - 实体/3
2. 信号当前5分钟K线最高>=做多触发→多;最低<=做空触发→空;同根多空都触发用开盘价距离判断先后
3. 反手一:持空且当前涨到上根最高且上根上影线>0.01%→反手多;持多且当前跌到上根最低且上根下影线>0.01%→反手空
4. 反手二:持多且上根上影线>0.01%且当前跌到上根开盘→反手空;持空且上根下影线>0.01%且当前涨到上根开盘→反手多
5. 同一根5分钟K线内只交易一次
"""
import random
import time
import datetime
from typing import Optional, Dict, List, Tuple
from tqdm import tqdm
from loguru import logger
from DrissionPage import ChromiumOptions, ChromiumPage
from curl_cffi import requests
from bit_tools import openBrowser
from 交易.tools import send_dingtalk_message
# ==================== 配置常量 ====================
class Config:
TGE_URL = "http://127.0.0.1:50326"
TGE_AUTHORIZATION = "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91"
CONTRACT_ID = "10000002"
PRODUCT_CODE = "cmt_ethusdt"
KLINE_TYPE = "MINUTE_5" # 5分钟K线三分之一策略
KLINE_LIMIT = 100
TRADING_URL = "https://www.weex.com/zh-CN/futures/ETH-USDT"
POSITION_RATIO = 100
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 1
# 三分之一策略参数
MIN_BODY_SIZE = 0.1
MIN_SHADOW_PCT = 0.01
# ==================== 三分之一策略分析器 ====================
class OneThirdStrategyAnalyzer:
"""三分之一策略 K 线分析5分钟触发价=收盘价±实体/3 + 两个反手信号"""
@staticmethod
def get_body_size(candle: Dict) -> float:
return abs(float(candle['open']) - float(candle['close']))
@staticmethod
def find_valid_prev_bar(all_data: List[Dict], current_idx: int, min_body_size: float = 0.1) -> Tuple[
Optional[int], Optional[Dict]]:
if current_idx <= 0:
return None, None
for i in range(current_idx - 1, -1, -1):
prev = all_data[i]
if OneThirdStrategyAnalyzer.get_body_size(prev) >= min_body_size:
return i, prev
return None, None
@staticmethod
def get_one_third_levels(prev: Dict) -> Tuple[Optional[float], Optional[float]]:
p_open, p_close = float(prev['open']), float(prev['close'])
body = abs(p_open - p_close)
if body < 0.001:
return None, None
return p_close + body / 3, p_close - body / 3
@staticmethod
def get_upper_shadow(candle: Dict) -> float:
o, c, h = float(candle['open']), float(candle['close']), float(candle['high'])
return h - max(o, c)
@staticmethod
def get_lower_shadow(candle: Dict) -> float:
o, c, l = float(candle['open']), float(candle['close']), float(candle['low'])
return min(o, c) - l
@staticmethod
def upper_shadow_pct(candle: Dict) -> float:
o = float(candle['open'])
return (OneThirdStrategyAnalyzer.get_upper_shadow(candle) / o * 100) if o > 0 else 0.0
@staticmethod
def lower_shadow_pct(candle: Dict) -> float:
o = float(candle['open'])
return (OneThirdStrategyAnalyzer.get_lower_shadow(candle) / o * 100) if o > 0 else 0.0
@staticmethod
def check_reverse_by_prev_high_low(kline_data: List[Dict], start: int) -> Tuple[Optional[str], Optional[Dict]]:
if len(kline_data) < 2:
return None, None
curr, prev = kline_data[-1], kline_data[-2]
curr_high = float(curr['high'])
curr_low = float(curr['low'])
prev_high = float(prev['high'])
prev_low = float(prev['low'])
if start == -1 and curr_high >= prev_high and OneThirdStrategyAnalyzer.upper_shadow_pct(
prev) > Config.MIN_SHADOW_PCT:
return 'long', prev
if start == 1 and curr_low <= prev_low and OneThirdStrategyAnalyzer.lower_shadow_pct(
prev) > Config.MIN_SHADOW_PCT:
return 'short', prev
return None, None
@staticmethod
def check_reverse_by_prev_open(kline_data: List[Dict], start: int) -> Tuple[Optional[str], Optional[Dict]]:
if len(kline_data) < 2:
return None, None
curr, prev = kline_data[-1], kline_data[-2]
curr_high = float(curr['high'])
curr_low = float(curr['low'])
prev_open = float(prev['open'])
if start == 1 and OneThirdStrategyAnalyzer.upper_shadow_pct(
prev) > Config.MIN_SHADOW_PCT and curr_low <= prev_open:
return 'short', prev
if start == -1 and OneThirdStrategyAnalyzer.lower_shadow_pct(
prev) > Config.MIN_SHADOW_PCT and curr_high >= prev_open:
return 'long', prev
return None, None
@staticmethod
def check_realtime_trigger(
kline_data: List[Dict],
last_trigger_kline_id: Optional[int],
last_trigger_direction: Optional[str],
) -> Tuple[Optional[str], Optional[float], Optional[Dict], Optional[Dict]]:
if len(kline_data) < 2:
return None, None, None, None
curr = kline_data[-1]
curr_kline_id = curr['id']
curr_high = float(curr['high'])
curr_low = float(curr['low'])
curr_open = float(curr['open'])
valid_prev_idx, prev = OneThirdStrategyAnalyzer.find_valid_prev_bar(
kline_data, len(kline_data) - 1, Config.MIN_BODY_SIZE
)
if prev is None:
return None, None, None, None
long_trigger, short_trigger = OneThirdStrategyAnalyzer.get_one_third_levels(prev)
if long_trigger is None:
return None, None, None, None
long_triggered = curr_high >= long_trigger
short_triggered = curr_low <= short_trigger
both_triggered = long_triggered and short_triggered
direction = None
trigger_price = None
if both_triggered:
dist_to_long = abs(long_trigger - curr_open)
dist_to_short = abs(short_trigger - curr_open)
if dist_to_short <= dist_to_long:
direction, trigger_price = 'short', short_trigger
else:
direction, trigger_price = 'long', long_trigger
elif short_triggered:
direction, trigger_price = 'short', short_trigger
elif long_triggered:
direction, trigger_price = 'long', long_trigger
if direction is None:
return None, None, None, None
if last_trigger_kline_id == curr_kline_id and last_trigger_direction == direction:
return None, None, None, None
return direction, trigger_price, prev, curr
# ==================== 浏览器管理器 ====================
class BrowserManager:
def __init__(self, tge_id, tge_url: str, tge_headers: Dict):
self.tge_id = tge_id
self.tge_url = tge_url
self.tge_headers = tge_headers
self.tge_port: Optional[int] = None
self.page: Optional[ChromiumPage] = None
def openBrowser(self):
try:
bit_port = openBrowser(id=self.tge_id)
co = ChromiumOptions()
co.set_local_port(port=bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.tge_port = bit_port
return True
except Exception:
return False
def take_over_browser(self) -> bool:
if not self.tge_port:
return False
try:
co = ChromiumOptions()
co.set_local_port(self.tge_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
return True
except Exception:
return False
def close_extra_tabs(self) -> bool:
if not self.page:
return False
try:
for idx, tab in enumerate(self.page.get_tabs()):
if idx > 0:
tab.close()
return True
except Exception:
return False
# ==================== WEEX API 客户端 ====================
class WEEXApiClient:
def __init__(self):
self.session = requests.Session()
self.headers: Optional[Dict] = None
def update_headers(self, headers: Dict) -> None:
if not self.headers:
self.session.headers = headers
else:
self.session.headers.update(headers)
self.headers = headers
def get_kline_data(
self,
contract_id: str = Config.CONTRACT_ID,
product_code: str = Config.PRODUCT_CODE,
kline_type: str = Config.KLINE_TYPE,
limit: int = Config.KLINE_LIMIT,
) -> List[Dict]:
params = {
'contractId': contract_id,
'productCode': product_code,
'priceType': 'LAST_PRICE',
'klineType': kline_type,
'limit': str(limit),
'timeZone': 'string',
'languageType': '1',
'sign': 'SIGN',
}
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.get(
'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2',
params=params,
timeout=15,
)
if response.status_code != 200:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
continue
response_data = response.json()
if "data" not in response_data or "dataList" not in response_data["data"]:
continue
result = response_data["data"]["dataList"]
kline_data = []
for item in result:
kline_data.append({
'id': int(item[4]),
'open': float(item[3]),
'high': float(item[1]),
'low': float(item[2]),
'close': float(item[0]),
})
return kline_data
except Exception as e:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return []
def get_available_balance(self) -> Optional[float]:
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.post(
'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new',
timeout=15,
)
return float(response.json()["data"]["newContract"]["balanceList"][0]["available"])
except Exception:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return None
def get_position_status(self) -> Tuple[bool, Optional[List]]:
json_data = {
'filterContractIdList': [10000002],
'limit': 100,
'languageType': 0,
'sign': 'SIGN',
'timeZone': 'string',
}
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
response = self.session.post(
'https://http-gateway2.janapw.com/api/v1/private/order/v2/getHistoryOrderFillTransactionPage',
json=json_data,
timeout=15,
)
datas = response.json()["data"]["dataList"]
if not datas:
return True, None
return True, datas
except Exception:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return False, None
# ==================== Token 管理器 ====================
class TokenManager:
def __init__(self, api_client: WEEXApiClient, page: ChromiumPage):
self.api_client = api_client
self.page = page
def get_token(self) -> bool:
tab = self.page.new_tab()
tab.listen.start("/user/security/getLanguageType")
try:
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
try:
tab.get(url=Config.TRADING_URL)
res = tab.listen.wait(timeout=5)
if res.request.headers.get("U-TOKEN"):
self.api_client.update_headers(res.request.headers)
return True
except Exception:
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
time.sleep(Config.RETRY_DELAY)
return False
finally:
tab.close()
# ==================== 交易执行器 ====================
class TradingExecutor:
def __init__(self, page: ChromiumPage, api_client: WEEXApiClient):
self.page = page
self.api_client = api_client
def navigate_to_trading_page(self) -> bool:
try:
self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click()
time.sleep(1)
return True
except Exception:
return False
def close_all_positions(self) -> bool:
try:
self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').click(by_js=True)
time.sleep(3)
return True
except Exception:
return False
def open_long(self, amount: float) -> bool:
try:
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
time.sleep(1)
self.page.ele('x://*[normalize-space(text()) ="买入开多"]').click(by_js=True)
return True
except Exception:
return False
def open_short(self, amount: float) -> bool:
try:
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
time.sleep(1)
self.page.ele('x://*[normalize-space(text()) ="卖出开空"]').click(by_js=True)
return True
except Exception:
return False
def execute_trade(self, direction: str, current_position: int, amount: float) -> bool:
if (direction == "long" and current_position == PositionManager.POSITION_LONG) or \
(direction == "short" and current_position == PositionManager.POSITION_SHORT):
return True
if not self.navigate_to_trading_page():
return False
try:
if direction == "long":
if current_position == 0:
return self.open_long(amount)
elif current_position == -1:
if self.close_all_positions():
time.sleep(1)
return self.open_long(amount)
elif direction == "short":
if current_position == 0:
return self.open_short(amount)
elif current_position == 1:
if self.close_all_positions():
time.sleep(1)
return self.open_short(amount)
except Exception:
pass
return False
# ==================== 持仓管理器 ====================
class PositionManager:
POSITION_SHORT = -1
POSITION_NONE = 0
POSITION_LONG = 1
def __init__(self, trading_executor: TradingExecutor):
self.trading_executor = trading_executor
self.current_position: int = self.POSITION_NONE
self.position_data: Optional[List] = None
def update_position(self, position_data: Optional[List]) -> None:
self.position_data = position_data
if not position_data:
self.current_position = self.POSITION_NONE
return
position_data = list(position_data)
position_data.reverse()
start, start1 = 0, 0
for i in position_data:
direction = i.get("legacyOrderDirection")
if direction == "CLOSE_SHORT":
start = 0
elif direction == "CLOSE_LONG":
start1 = 0
elif direction == "OPEN_SHORT":
start -= 1
elif direction == "OPEN_LONG":
start1 += 1
if start1:
self.current_position = self.POSITION_LONG
elif start:
self.current_position = self.POSITION_SHORT
else:
self.current_position = self.POSITION_NONE
# ==================== 消息发送 ====================
class MessageSender:
@staticmethod
def send(msg: str, is_error: bool = False) -> None:
prefix = "❌weex三分之一" if is_error else "🔔weex三分之一"
for _ in range(15 if is_error else 1):
send_dingtalk_message(f"{prefix}{msg}")
# ==================== 5分钟K线时间工具 ====================
class TimeUtils5m:
@staticmethod
def get_current_kline_timestamp() -> int:
"""当前所在 5 分钟 K 线的时间戳(毫秒)"""
t = time.time()
dt = datetime.datetime.fromtimestamp(t)
minute = (dt.minute // 5) * 5
target = dt.replace(minute=minute, second=0, microsecond=0)
return int(target.timestamp()) * 1000
@staticmethod
def get_progress_bar_value() -> int:
"""0~4 表示当前 5 分钟内的分钟数"""
return datetime.datetime.now().minute % 5
# ==================== 主交易类 ====================
class WeexOneThirdTransaction:
"""WEEX 三分之一策略5分钟K线自动交易"""
def __init__(self, tge_id):
self.tge_id = tge_id
self.tge_headers = {
"Authorization": Config.TGE_AUTHORIZATION,
"Content-Type": "application/json",
}
self.browser_manager = BrowserManager(tge_id, Config.TGE_URL, self.tge_headers)
self.api_client = WEEXApiClient()
self.position_manager: Optional[PositionManager] = None
self.trading_executor: Optional[TradingExecutor] = None
self.token_manager: Optional[TokenManager] = None
self.pbar: Optional[tqdm] = None
self.last_kline_timestamp: Optional[int] = None
self.kline_data: List[Dict] = []
self.last_trigger_kline_id: Optional[int] = None
self.last_trigger_direction: Optional[str] = None
self.last_trade_kline_id: Optional[int] = None
def initialize(self) -> bool:
for i in range(3):
if self.browser_manager.openBrowser():
break
else:
MessageSender.send("打开浏览器失败", is_error=True)
if not self.browser_manager.take_over_browser():
MessageSender.send("接管浏览器失败", is_error=True)
return False
self.browser_manager.close_extra_tabs()
page = self.browser_manager.page
self.trading_executor = TradingExecutor(page, self.api_client)
self.position_manager = PositionManager(self.trading_executor)
self.token_manager = TokenManager(self.api_client, page)
page.get(url=Config.TRADING_URL)
if not self.token_manager.get_token():
logger.warning("初始化获取 token 失败将在获取K线时重试")
self.pbar = tqdm(total=5, desc="等待5分钟K线", ncols=80)
return True
def fetch_and_update_kline(self) -> bool:
if not self.token_manager.get_token():
return False
kline_data = self.api_client.get_kline_data()
if not kline_data or len(kline_data) < 3:
return False
sorted_data = sorted(kline_data, key=lambda x: x["id"])
self.kline_data = sorted_data
current_kline_id = sorted_data[-1]["id"]
current_ts = TimeUtils5m.get_current_kline_timestamp()
if current_kline_id != current_ts:
return False
if self.last_kline_timestamp == current_ts:
return False
self.last_kline_timestamp = current_ts
return True
def sync_position_status(self) -> bool:
success, position_data = self.api_client.get_position_status()
if not success:
return False
self.position_manager.update_position(position_data)
return True
def process_trading_logic(self) -> None:
self.token_manager.get_token()
self.browser_manager.page.get(url=Config.TRADING_URL)
if not self.sync_position_status():
return
kline_data = self.kline_data
curr = kline_data[-1]
curr_kline_id = curr["id"]
start = self.position_manager.current_position
# 反手一:涨到上根最高/跌到上根最低 + 影线>0.01%
rev_dir, rev_prev = OneThirdStrategyAnalyzer.check_reverse_by_prev_high_low(kline_data, start)
rev_type = ""
if not rev_dir:
rev_dir, rev_prev = OneThirdStrategyAnalyzer.check_reverse_by_prev_open(kline_data, start)
rev_type = ""
if rev_dir and self.last_trade_kline_id != curr_kline_id:
balance = self.api_client.get_available_balance()
amount = int((balance or 0) / Config.POSITION_RATIO)
if self.trading_executor.execute_trade(rev_dir, start, amount):
self.last_trade_kline_id = curr_kline_id
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = rev_dir
if rev_dir == "long":
self.position_manager.current_position = PositionManager.POSITION_LONG
else:
self.position_manager.current_position = PositionManager.POSITION_SHORT
MessageSender.send(f"反手信号{rev_type} {rev_dir},金额 {amount}")
return
# 主信号:三分之一触发
direction, trigger_price, valid_prev, curr_kline = OneThirdStrategyAnalyzer.check_realtime_trigger(
kline_data, self.last_trigger_kline_id, self.last_trigger_direction
)
if not direction:
return
if self.last_trade_kline_id == curr_kline_id:
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
return
if (direction == "long" and start == PositionManager.POSITION_LONG) or \
(direction == "short" and start == PositionManager.POSITION_SHORT):
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
return
balance = self.api_client.get_available_balance()
if balance is None:
return
amount = int(balance / Config.POSITION_RATIO)
if self.trading_executor.execute_trade(direction, start, amount):
self.last_trade_kline_id = curr_kline_id
self.last_trigger_kline_id = curr_kline_id
self.last_trigger_direction = direction
if direction == "long":
self.position_manager.current_position = PositionManager.POSITION_LONG
else:
self.position_manager.current_position = PositionManager.POSITION_SHORT
MessageSender.send(f"三分之一信号 {direction} 触发价={trigger_price:.2f} 金额={amount}")
def run(self) -> None:
logger.info("WEEX 三分之一策略5分钟K线开始运行")
while True:
if not self.initialize():
return
try:
if self.pbar:
self.pbar.n = TimeUtils5m.get_progress_bar_value()
self.pbar.refresh()
current_ts = TimeUtils5m.get_current_kline_timestamp()
if self.last_kline_timestamp == current_ts:
time.sleep(10)
continue
if not self.fetch_and_update_kline():
time.sleep(10)
continue
self.process_trading_logic()
if self.pbar:
self.pbar.reset()
time.sleep(5)
except KeyboardInterrupt:
break
except Exception as e:
logger.error(e)
MessageSender.send(f"运行出错: {e}", is_error=True)
time.sleep(10)
if random.randint(1, 10) > 7:
self.browser_manager.page.close()
time.sleep(5)
def action(self) -> None:
self.run()
if __name__ == '__main__':
WeexOneThirdTransaction(tge_id="86837a981aba4576be6916a0ef6ad785").action()