bitmart优化完成
This commit is contained in:
Binary file not shown.
Binary file not shown.
BIN
telegram/sign.db
BIN
telegram/sign.db
Binary file not shown.
739
交易/websea-结构优化.py
Normal file
739
交易/websea-结构优化.py
Normal file
@@ -0,0 +1,739 @@
|
||||
"""
|
||||
WebSea ETH-USDT 永续合约自动交易系统
|
||||
优化版本:改进代码结构、模块化和可读性
|
||||
"""
|
||||
import time
|
||||
import datetime
|
||||
from typing import Optional, Dict, List, Tuple
|
||||
from tqdm import tqdm
|
||||
from loguru import logger
|
||||
from DrissionPage import ChromiumOptions, ChromiumPage
|
||||
from curl_cffi import requests
|
||||
|
||||
from 交易.tools import send_dingtalk_message
|
||||
|
||||
|
||||
class KlineAnalyzer:
|
||||
"""K线分析器:负责K线数据分析和信号判断"""
|
||||
|
||||
@staticmethod
|
||||
def is_bullish(candle: Dict) -> bool:
|
||||
"""判断是否为阳线"""
|
||||
return float(candle['close']) > float(candle['open'])
|
||||
|
||||
@staticmethod
|
||||
def is_bearish(candle: Dict) -> bool:
|
||||
"""判断是否为阴线"""
|
||||
return float(candle['close']) < float(candle['open'])
|
||||
|
||||
@staticmethod
|
||||
def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""
|
||||
包住形态信号判定(仅30分钟K线):
|
||||
- 前跌后涨包住 -> 做多
|
||||
- 前涨后跌包住 -> 做空
|
||||
|
||||
Returns:
|
||||
(direction, signal_type): direction为"long"或"short",signal_type为信号类型
|
||||
"""
|
||||
c_open, c_close = float(curr['open']), float(curr['close'])
|
||||
p_open = float(prev['open'])
|
||||
|
||||
# 前跌后涨包住 -> 做多
|
||||
if (KlineAnalyzer.is_bullish(curr) and
|
||||
KlineAnalyzer.is_bearish(prev) and
|
||||
int(c_close) >= int(p_open)):
|
||||
return "long", "bear_bull_engulf"
|
||||
|
||||
# 前涨后跌包住 -> 做空
|
||||
if (KlineAnalyzer.is_bearish(curr) and
|
||||
KlineAnalyzer.is_bullish(prev) and
|
||||
int(c_close) <= int(p_open)):
|
||||
return "short", "bull_bear_engulf"
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
class BrowserManager:
|
||||
"""浏览器管理器:负责浏览器的启动、接管和标签页管理"""
|
||||
|
||||
def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict):
|
||||
self.tge_id = tge_id
|
||||
self.tge_url = tge_url
|
||||
self.tge_headers = tge_headers
|
||||
self.tge_port: Optional[int] = None
|
||||
self.page: Optional[ChromiumPage] = None
|
||||
|
||||
def open_browser(self) -> bool:
|
||||
"""打开浏览器并获取端口"""
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{self.tge_url}/api/browser/start",
|
||||
json={"envId": self.tge_id},
|
||||
headers=self.tge_headers,
|
||||
timeout=10
|
||||
)
|
||||
self.tge_port = response.json()["data"]["port"]
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"打开浏览器失败: {e}")
|
||||
return False
|
||||
|
||||
def take_over_browser(self) -> bool:
|
||||
"""接管浏览器"""
|
||||
if not self.tge_port:
|
||||
logger.error("浏览器端口未设置")
|
||||
return False
|
||||
|
||||
try:
|
||||
co = ChromiumOptions()
|
||||
co.set_local_port(self.tge_port)
|
||||
self.page = ChromiumPage(addr_or_opts=co)
|
||||
self.page.set.window.max()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"接管浏览器失败: {e}")
|
||||
return False
|
||||
|
||||
def close_extra_tabs(self) -> bool:
|
||||
"""关闭多余的标签页,只保留第一个"""
|
||||
if not self.page:
|
||||
return False
|
||||
|
||||
try:
|
||||
tabs = self.page.get_tabs()
|
||||
for idx, tab in enumerate(tabs):
|
||||
if idx == 0:
|
||||
continue
|
||||
tab.close()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"关闭多余标签页失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
class ApiClient:
|
||||
"""API客户端:负责与WebSea API的交互"""
|
||||
|
||||
def __init__(self):
|
||||
self.session = requests.Session()
|
||||
self.headers: Dict = {}
|
||||
self.cookies: Dict = {}
|
||||
|
||||
def update_session(self, cookies: Dict, headers: Dict) -> None:
|
||||
"""更新session的cookies和headers"""
|
||||
self.session.cookies.update(cookies)
|
||||
self.session.headers.update(headers)
|
||||
|
||||
def get_kline_data(self, symbol: str = 'ETH-USDT', period: str = '30min',
|
||||
days: int = 10) -> List[Dict]:
|
||||
"""
|
||||
获取K线数据
|
||||
|
||||
Args:
|
||||
symbol: 交易对符号
|
||||
period: K线周期
|
||||
days: 获取最近N天的数据
|
||||
|
||||
Returns:
|
||||
K线数据列表
|
||||
"""
|
||||
now_ts = int(time.time())
|
||||
start_ts = int(time.time() - 86400 * days)
|
||||
|
||||
params = {
|
||||
'symbol': symbol,
|
||||
'period': period,
|
||||
'start': str(start_ts),
|
||||
'end': str(now_ts),
|
||||
}
|
||||
|
||||
for attempt in range(3):
|
||||
logger.info(f"获取最新K线数据:第{attempt + 1}次尝试...")
|
||||
try:
|
||||
response = self.session.get(
|
||||
'https://capi.websea.com/webApi/market/getKline',
|
||||
params=params,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
result = response.json()["result"]['data']
|
||||
kline_data = []
|
||||
for data in result:
|
||||
kline_data.append({
|
||||
'id': int(data["id"]),
|
||||
'open': float(data["open"]),
|
||||
'high': float(data["high"]),
|
||||
'low': float(data["low"]),
|
||||
'close': float(data["close"])
|
||||
})
|
||||
return kline_data
|
||||
except Exception as e:
|
||||
logger.warning(f"获取K线数据失败: {e}")
|
||||
if attempt < 2:
|
||||
time.sleep(1)
|
||||
|
||||
return []
|
||||
|
||||
def get_available_balance(self) -> Optional[float]:
|
||||
"""获取可用余额"""
|
||||
for attempt in range(3):
|
||||
try:
|
||||
response = self.session.get(
|
||||
'https://capi.websea.com/webApi/asset/account',
|
||||
timeout=5
|
||||
)
|
||||
return float(response.json()["result"]["available"])
|
||||
except Exception as e:
|
||||
logger.warning(f"获取余额失败: {e}")
|
||||
if attempt < 2:
|
||||
time.sleep(1)
|
||||
|
||||
return None
|
||||
|
||||
def get_position_status(self) -> Tuple[bool, Optional[Dict]]:
|
||||
"""
|
||||
获取持仓状态
|
||||
|
||||
Returns:
|
||||
(success, position_data): success为是否成功,position_data为持仓信息
|
||||
"""
|
||||
for attempt in range(3):
|
||||
try:
|
||||
response = self.session.get(
|
||||
'https://capi.websea.com/webApi/entrust/holdPosition',
|
||||
timeout=5
|
||||
)
|
||||
resp_data = response.json()
|
||||
|
||||
if not resp_data["result"]:
|
||||
return True, {"type": 0, "position": None}
|
||||
|
||||
position = resp_data["result"][0]
|
||||
position_info = {
|
||||
"type": position["type"], # 1: 多, 2: 空
|
||||
"position": position
|
||||
}
|
||||
return True, position_info
|
||||
except Exception as e:
|
||||
logger.warning(f"获取持仓状态失败: {e}")
|
||||
if attempt < 2:
|
||||
time.sleep(1)
|
||||
|
||||
return False, None
|
||||
|
||||
|
||||
class TokenManager:
|
||||
"""Token管理器:负责获取和更新认证token"""
|
||||
|
||||
def __init__(self, api_client: ApiClient, page: ChromiumPage):
|
||||
self.api_client = api_client
|
||||
self.page = page
|
||||
|
||||
def get_token(self) -> bool:
|
||||
"""从浏览器中获取token和cookies"""
|
||||
try:
|
||||
tab = self.page.new_tab()
|
||||
tab.listen.start("api.websea.com/webApi/zendesk/url")
|
||||
tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT")
|
||||
|
||||
res = tab.listen.wait()
|
||||
|
||||
# 提取cookies
|
||||
cookies = {}
|
||||
for cookie in res.request.cookies:
|
||||
if not cookie["name"].startswith(':'):
|
||||
cookies[cookie["name"]] = cookie["value"]
|
||||
|
||||
# 提取headers
|
||||
headers = {}
|
||||
if "token" in res.request.headers:
|
||||
headers["token"] = res.request.headers["token"]
|
||||
|
||||
tab.close()
|
||||
|
||||
# 检查是否有shareToken
|
||||
if cookies.get('shareToken'):
|
||||
self.api_client.update_session(cookies, headers)
|
||||
return True
|
||||
else:
|
||||
logger.warning("未获取到shareToken")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"获取token失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
class TradingExecutor:
|
||||
"""交易执行器:负责执行交易操作"""
|
||||
|
||||
def __init__(self, page: ChromiumPage, api_client: ApiClient):
|
||||
self.page = page
|
||||
self.api_client = api_client
|
||||
|
||||
def navigate_to_trading_page(self) -> bool:
|
||||
"""导航到交易页面"""
|
||||
try:
|
||||
self.page.ele('x://*[normalize-space(text())= "市价"]', timeout=15).click()
|
||||
time.sleep(1)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"导航到交易页面失败: {e}")
|
||||
return False
|
||||
|
||||
def close_all_positions(self) -> bool:
|
||||
"""平仓所有持仓"""
|
||||
try:
|
||||
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
|
||||
time.sleep(1)
|
||||
self.page.actions.click('x://*[normalize-space(text())= "市价全平"]')
|
||||
time.sleep(3)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"平仓失败: {e}")
|
||||
return False
|
||||
|
||||
def open_long(self, amount: float) -> bool:
|
||||
"""开多"""
|
||||
try:
|
||||
self.page.ele('x://*[@id="amountInput"]').input(amount)
|
||||
time.sleep(1)
|
||||
self.page.actions.click('x://*[normalize-space(text())= "买入/做多"]')
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"开多失败: {e}")
|
||||
return False
|
||||
|
||||
def open_short(self, amount: float) -> bool:
|
||||
"""开空"""
|
||||
try:
|
||||
self.page.ele('x://*[@id="amountInput"]').input(amount)
|
||||
time.sleep(1)
|
||||
self.page.actions.click('x://*[normalize-space(text())= "卖出/做空"]')
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"开空失败: {e}")
|
||||
return False
|
||||
|
||||
def execute_trade(self, direction: str, current_position: int, amount: float) -> bool:
|
||||
"""
|
||||
执行交易操作
|
||||
|
||||
Args:
|
||||
direction: 交易方向 "long" 或 "short"
|
||||
current_position: 当前持仓状态 -1:空, 0:无, 1:多
|
||||
amount: 交易金额
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
"""
|
||||
if not self.navigate_to_trading_page():
|
||||
return False
|
||||
|
||||
try:
|
||||
# 开多
|
||||
if direction == "long":
|
||||
if current_position == 0:
|
||||
logger.success(f"{datetime.datetime.now()},开多,金额:{amount}")
|
||||
self.open_long(amount)
|
||||
elif current_position == -1:
|
||||
logger.success(f"{datetime.datetime.now()},反手平空做多,金额:{amount}")
|
||||
self.close_all_positions()
|
||||
self.open_long(amount)
|
||||
|
||||
# 开空
|
||||
elif direction == "short":
|
||||
if current_position == 0:
|
||||
logger.success(f"{datetime.datetime.now()},开空,金额:{amount}")
|
||||
self.open_short(amount)
|
||||
elif current_position == 1:
|
||||
logger.success(f"{datetime.datetime.now()},反手平多做空,金额:{amount}")
|
||||
self.close_all_positions()
|
||||
self.open_short(amount)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"执行交易失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
class PositionManager:
|
||||
"""持仓管理器:负责持仓状态管理和止损"""
|
||||
|
||||
def __init__(self, trading_executor: TradingExecutor):
|
||||
self.trading_executor = trading_executor
|
||||
self.current_position: int = 0 # -1:空, 0:无, 1:多
|
||||
|
||||
def update_position(self, position_type: int) -> None:
|
||||
"""更新持仓状态"""
|
||||
self.current_position = position_type
|
||||
|
||||
def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool:
|
||||
"""
|
||||
检查止损条件
|
||||
|
||||
Args:
|
||||
kline_1: 倒数第二根K线
|
||||
kline_2: 倒数第一根K线
|
||||
|
||||
Returns:
|
||||
是否执行了止损
|
||||
"""
|
||||
try:
|
||||
# 持多仓,连续两根阴线,平多
|
||||
if self.current_position == 1:
|
||||
if (KlineAnalyzer.is_bearish(kline_1) and
|
||||
KlineAnalyzer.is_bearish(kline_2)):
|
||||
logger.success(
|
||||
f"{datetime.datetime.now()},止损信号:连续两根阴线,平多"
|
||||
)
|
||||
self.trading_executor.close_all_positions()
|
||||
self.current_position = 0
|
||||
return True
|
||||
|
||||
# 持空仓,连续两根阳线,平空
|
||||
elif self.current_position == -1:
|
||||
if (KlineAnalyzer.is_bullish(kline_1) and
|
||||
KlineAnalyzer.is_bullish(kline_2)):
|
||||
logger.success(
|
||||
f"{datetime.datetime.now()},止损信号:连续两根阳线,平空"
|
||||
)
|
||||
self.trading_executor.close_all_positions()
|
||||
self.current_position = 0
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"止损检查失败: {e}")
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class MessageSender:
|
||||
"""消息发送器:负责发送钉钉消息"""
|
||||
|
||||
@staticmethod
|
||||
def send_dingtalk_message(message_content: str, is_error: bool = False) -> None:
|
||||
"""
|
||||
发送钉钉消息
|
||||
|
||||
Args:
|
||||
message_content: 消息内容
|
||||
is_error: 是否为错误消息(错误消息会发送多次)
|
||||
"""
|
||||
if is_error:
|
||||
prefix = "❌websea:"
|
||||
count = 15
|
||||
else:
|
||||
prefix = "🔔websea:"
|
||||
count = 1
|
||||
|
||||
for _ in range(count):
|
||||
send_dingtalk_message(f"{prefix}{message_content}")
|
||||
|
||||
@staticmethod
|
||||
def format_position_message(position_data: Optional[Dict],
|
||||
available_balance: float) -> str:
|
||||
"""格式化持仓信息消息"""
|
||||
if position_data and position_data.get("position"):
|
||||
pos = position_data["position"]
|
||||
direction = "多" if int(pos['type']) == 1 else "空"
|
||||
contracts = int(pos['number'])
|
||||
eth_amount = float(pos['numberConvert'])
|
||||
open_price = float(pos['openPriceAvg'])
|
||||
current_price = float(pos['markPrice'])
|
||||
unrealized_pnl = float(pos['unProfitLoss'])
|
||||
pnl_rate = float(pos['profitRate'])
|
||||
|
||||
return (
|
||||
f"【WebSea ETH-USDT 永续持仓】\n"
|
||||
f"持仓方向:{direction}\n"
|
||||
f"持仓张数:{contracts} 张\n"
|
||||
f"持仓数量(eth):{eth_amount:.3f} ETH\n"
|
||||
f"持仓数量(usdt):{float(pos['numberConvertU']) / 100:.3f} USDT\n"
|
||||
f"开仓均价:{open_price:.2f} USDT\n"
|
||||
f"标记现价:{current_price:.2f} USDT\n"
|
||||
f"浮动盈亏:{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)\n"
|
||||
f"账户可用:{available_balance:.2f} USDT"
|
||||
)
|
||||
else:
|
||||
return (
|
||||
f"【WebSea ETH-USDT 永续持仓】\n"
|
||||
f"持仓方向:无\n"
|
||||
f"账户可用:{available_balance:.2f} USDT"
|
||||
)
|
||||
|
||||
|
||||
class TimeUtils:
|
||||
"""时间工具类"""
|
||||
|
||||
@staticmethod
|
||||
def get_current_kline_timestamp() -> int:
|
||||
"""
|
||||
获取当前K线的时间戳(30分钟K线的整点或30分时刻)
|
||||
|
||||
Returns:
|
||||
时间戳
|
||||
"""
|
||||
current_timestamp = time.time()
|
||||
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
|
||||
|
||||
# 计算距离当前时间最近的整点或30分时刻
|
||||
if current_datetime.minute < 30:
|
||||
target_datetime = current_datetime.replace(
|
||||
minute=0, second=0, microsecond=0
|
||||
)
|
||||
else:
|
||||
target_datetime = current_datetime.replace(
|
||||
minute=30, second=0, microsecond=0
|
||||
)
|
||||
|
||||
return int(target_datetime.timestamp())
|
||||
|
||||
@staticmethod
|
||||
def get_formatted_time() -> str:
|
||||
"""获取格式化的当前时间字符串"""
|
||||
timestamp = time.time()
|
||||
local_time = time.localtime(timestamp)
|
||||
return time.strftime("%Y-%m-%d %H:%M:%S", local_time)
|
||||
|
||||
@staticmethod
|
||||
def get_progress_bar_value() -> int:
|
||||
"""获取进度条的当前值(0-29)"""
|
||||
current_time = time.localtime()
|
||||
current_minute = current_time.tm_min
|
||||
return current_minute if current_minute < 30 else current_minute - 30
|
||||
|
||||
|
||||
class WeexTransaction:
|
||||
"""WebSea自动交易主类"""
|
||||
|
||||
def __init__(self, tge_id: int):
|
||||
# 配置
|
||||
self.tge_id = tge_id
|
||||
self.tge_url = "http://127.0.0.1:50326"
|
||||
self.tge_headers = {
|
||||
"Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
self.trading_url = "https://www.websea.com/zh-CN/futures/ETH-USDT"
|
||||
|
||||
# 组件初始化
|
||||
self.browser_manager = BrowserManager(self.tge_id, self.tge_url, self.tge_headers)
|
||||
self.api_client = ApiClient()
|
||||
self.position_manager = None # 需要在浏览器初始化后创建
|
||||
self.trading_executor = None # 需要在浏览器初始化后创建
|
||||
self.token_manager = None # 需要在浏览器初始化后创建
|
||||
|
||||
# 状态
|
||||
self.pbar: Optional[tqdm] = None
|
||||
self.last_kline_id: Optional[int] = None
|
||||
self.kline_1: Optional[Dict] = None
|
||||
self.kline_2: Optional[Dict] = None
|
||||
self.kline_3: Optional[Dict] = None
|
||||
|
||||
def initialize(self) -> bool:
|
||||
"""初始化所有组件"""
|
||||
# 打开浏览器
|
||||
if not self.browser_manager.open_browser():
|
||||
logger.error("打开浏览器失败")
|
||||
MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True)
|
||||
return False
|
||||
|
||||
# 接管浏览器
|
||||
if not self.browser_manager.take_over_browser():
|
||||
logger.error("接管浏览器失败")
|
||||
MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True)
|
||||
return False
|
||||
|
||||
# 关闭多余标签页
|
||||
self.browser_manager.close_extra_tabs()
|
||||
|
||||
# 初始化需要page的组件
|
||||
page = self.browser_manager.page
|
||||
self.trading_executor = TradingExecutor(page, self.api_client)
|
||||
self.position_manager = PositionManager(self.trading_executor)
|
||||
self.token_manager = TokenManager(self.api_client, page)
|
||||
|
||||
# 打开交易页面
|
||||
page.get(url=self.trading_url)
|
||||
|
||||
# 初始化进度条
|
||||
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)
|
||||
|
||||
return True
|
||||
|
||||
def update_progress_bar(self) -> None:
|
||||
"""更新进度条"""
|
||||
if self.pbar:
|
||||
self.pbar.n = TimeUtils.get_progress_bar_value()
|
||||
self.pbar.refresh()
|
||||
|
||||
def fetch_and_update_kline(self) -> bool:
|
||||
"""获取并更新K线数据"""
|
||||
kline_data = self.api_client.get_kline_data()
|
||||
if not kline_data:
|
||||
logger.warning("获取K线数据失败")
|
||||
MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True)
|
||||
return False
|
||||
|
||||
# 排序并取最后三根
|
||||
sorted_data = sorted(kline_data, key=lambda x: x["id"])
|
||||
self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:]
|
||||
|
||||
# 检查是否是新K线
|
||||
current_kline_id = self.kline_3["id"]
|
||||
if self.last_kline_id == current_kline_id:
|
||||
return False # 没有新K线
|
||||
|
||||
self.last_kline_id = current_kline_id
|
||||
return True
|
||||
|
||||
def sync_position_status(self) -> bool:
|
||||
"""同步持仓状态"""
|
||||
success, position_info = self.api_client.get_position_status()
|
||||
if not success:
|
||||
logger.error("获取持仓状态失败")
|
||||
MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True)
|
||||
return False
|
||||
|
||||
if position_info:
|
||||
pos_type = position_info.get("type", 0)
|
||||
if pos_type == 1:
|
||||
self.position_manager.current_position = 1
|
||||
elif pos_type == 2:
|
||||
self.position_manager.current_position = -1
|
||||
else:
|
||||
self.position_manager.current_position = 0
|
||||
|
||||
return True
|
||||
|
||||
def process_trading_logic(self) -> None:
|
||||
"""处理交易逻辑"""
|
||||
page = self.browser_manager.page
|
||||
|
||||
# 刷新页面
|
||||
page.get(url=self.trading_url)
|
||||
|
||||
# 获取token
|
||||
if not self.token_manager.get_token():
|
||||
logger.error("获取token失败")
|
||||
MessageSender.send_dingtalk_message("获取token失败", is_error=True)
|
||||
return
|
||||
|
||||
# 同步持仓状态
|
||||
if not self.sync_position_status():
|
||||
return
|
||||
|
||||
# 检查止损
|
||||
try:
|
||||
self.position_manager.check_stop_loss(self.kline_1, self.kline_2)
|
||||
except Exception as e:
|
||||
logger.error(f"止损检查出错: {e}")
|
||||
MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True)
|
||||
return
|
||||
|
||||
# 检查交易信号
|
||||
direction, signal_type = KlineAnalyzer.check_engulfing_signal(
|
||||
prev=self.kline_1,
|
||||
curr=self.kline_2
|
||||
)
|
||||
|
||||
if direction:
|
||||
# 获取可用余额
|
||||
balance = self.api_client.get_available_balance()
|
||||
if balance is None:
|
||||
logger.error("获取可用余额失败")
|
||||
MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True)
|
||||
return
|
||||
|
||||
amount = balance / 100
|
||||
|
||||
# 执行交易
|
||||
if not self.trading_executor.execute_trade(
|
||||
direction=direction,
|
||||
current_position=self.position_manager.current_position,
|
||||
amount=amount
|
||||
):
|
||||
MessageSender.send_dingtalk_message(
|
||||
f"交易执行失败,方向:{direction}",
|
||||
is_error=True
|
||||
)
|
||||
return
|
||||
|
||||
# 更新持仓状态
|
||||
if direction == "long":
|
||||
self.position_manager.current_position = 1
|
||||
elif direction == "short":
|
||||
self.position_manager.current_position = -1
|
||||
|
||||
# 发送交易消息
|
||||
MessageSender.send_dingtalk_message(
|
||||
f"信号:{direction},开仓金额:{amount:.2f}",
|
||||
is_error=False
|
||||
)
|
||||
|
||||
# 再次同步持仓状态(交易后)
|
||||
if not self.sync_position_status():
|
||||
return
|
||||
|
||||
# 发送持仓信息
|
||||
balance = self.api_client.get_available_balance()
|
||||
if balance is not None:
|
||||
success, position_info = self.api_client.get_position_status()
|
||||
if success:
|
||||
msg = MessageSender.format_position_message(position_info, balance)
|
||||
MessageSender.send_dingtalk_message(msg, is_error=False)
|
||||
|
||||
def run(self) -> None:
|
||||
"""主运行循环"""
|
||||
# 初始化
|
||||
if not self.initialize():
|
||||
return
|
||||
|
||||
logger.info("系统初始化完成,开始运行...")
|
||||
|
||||
# 主循环
|
||||
while True:
|
||||
try:
|
||||
# 更新进度条
|
||||
self.update_progress_bar()
|
||||
|
||||
# 获取当前K线时间戳
|
||||
current_timestamp = TimeUtils.get_current_kline_timestamp()
|
||||
|
||||
# 检查是否是新K线(避免重复处理)
|
||||
if self.kline_3 and self.kline_3["id"] == current_timestamp:
|
||||
time.sleep(10)
|
||||
continue
|
||||
|
||||
# 获取并更新K线数据
|
||||
if not self.fetch_and_update_kline():
|
||||
time.sleep(10)
|
||||
continue
|
||||
|
||||
# 处理交易逻辑
|
||||
self.process_trading_logic()
|
||||
|
||||
# 重置进度条
|
||||
if self.pbar:
|
||||
self.pbar.reset()
|
||||
|
||||
# 等待一段时间
|
||||
time.sleep(5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("收到中断信号,正在退出...")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"运行出错: {e}")
|
||||
MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True)
|
||||
time.sleep(10)
|
||||
|
||||
def action(self) -> None:
|
||||
"""兼容旧接口的方法名"""
|
||||
self.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
transaction = WeexTransaction(tge_id=191303)
|
||||
transaction.action()
|
||||
|
||||
885
交易/weex-结构优化.py
Normal file
885
交易/weex-结构优化.py
Normal file
@@ -0,0 +1,885 @@
|
||||
"""
|
||||
WEEX ETH-USDT 永续合约自动交易系统
|
||||
优化版本:改进代码结构、模块化和可读性
|
||||
"""
|
||||
import time
|
||||
import datetime
|
||||
from typing import Optional, Dict, List, Tuple
|
||||
from tqdm import tqdm
|
||||
from loguru import logger
|
||||
from DrissionPage import ChromiumOptions, ChromiumPage
|
||||
from curl_cffi import requests
|
||||
|
||||
from 交易.tools import send_dingtalk_message
|
||||
|
||||
|
||||
# ==================== 配置常量 ====================
|
||||
class Config:
|
||||
"""配置类:集中管理所有配置常量"""
|
||||
# TGE浏览器配置
|
||||
TGE_URL = "http://127.0.0.1:50326"
|
||||
TGE_AUTHORIZATION = "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91"
|
||||
|
||||
# WEEX API配置
|
||||
CONTRACT_ID = "10000002"
|
||||
PRODUCT_CODE = "cmt_ethusdt"
|
||||
KLINE_TYPE = "MINUTE_30"
|
||||
KLINE_LIMIT = 300
|
||||
|
||||
# 交易页面URL
|
||||
TRADING_URL = "https://www.weeaxs.site/zh-CN/futures/ETH-USDT"
|
||||
|
||||
# 交易配置
|
||||
POSITION_RATIO = 100 # 开仓金额比例(余额的1/100)
|
||||
|
||||
# 重试配置
|
||||
MAX_RETRY_ATTEMPTS = 3
|
||||
RETRY_DELAY = 1 # 秒
|
||||
|
||||
|
||||
# ==================== K线分析器 ====================
|
||||
class KlineAnalyzer:
|
||||
"""K线分析器:负责K线数据分析和信号判断"""
|
||||
|
||||
@staticmethod
|
||||
def is_bullish(candle: Dict) -> bool:
|
||||
"""判断是否为阳线"""
|
||||
return float(candle['close']) > float(candle['open'])
|
||||
|
||||
@staticmethod
|
||||
def is_bearish(candle: Dict) -> bool:
|
||||
"""判断是否为阴线"""
|
||||
return float(candle['close']) < float(candle['open'])
|
||||
|
||||
@staticmethod
|
||||
def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""
|
||||
包住形态信号判定(30分钟K线):
|
||||
- 前跌后涨包住 -> 做多
|
||||
- 前涨后跌包住 -> 做空
|
||||
|
||||
Returns:
|
||||
(direction, signal_type): direction为"long"或"short",signal_type为信号类型
|
||||
"""
|
||||
p_open, p_close = float(prev['open']), float(prev['close'])
|
||||
c_open, c_close = float(curr['open']), float(curr['close'])
|
||||
|
||||
# 前跌后涨包住 -> 做多
|
||||
if (KlineAnalyzer.is_bullish(curr) and
|
||||
KlineAnalyzer.is_bearish(prev) and
|
||||
int(c_open) <= int(p_close) and
|
||||
int(c_close) >= int(p_open)):
|
||||
return "long", "bear_bull_engulf"
|
||||
|
||||
# 前涨后跌包住 -> 做空
|
||||
if (KlineAnalyzer.is_bearish(curr) and
|
||||
KlineAnalyzer.is_bullish(prev) and
|
||||
int(c_open) >= int(p_close) and
|
||||
int(c_close) <= int(p_open)):
|
||||
return "short", "bull_bear_engulf"
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
# ==================== 浏览器管理器 ====================
|
||||
class BrowserManager:
|
||||
"""浏览器管理器:负责浏览器的启动、接管和标签页管理"""
|
||||
|
||||
def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict):
|
||||
self.tge_id = tge_id
|
||||
self.tge_url = tge_url
|
||||
self.tge_headers = tge_headers
|
||||
self.tge_port: Optional[int] = None
|
||||
self.page: Optional[ChromiumPage] = None
|
||||
|
||||
def open_browser(self) -> bool:
|
||||
"""打开浏览器并获取端口"""
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{self.tge_url}/api/browser/start",
|
||||
json={"envId": self.tge_id},
|
||||
headers=self.tge_headers,
|
||||
timeout=10
|
||||
)
|
||||
self.tge_port = response.json()["data"]["port"]
|
||||
logger.success(f"成功打开浏览器,端口:{self.tge_port}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"打开浏览器失败: {e}")
|
||||
return False
|
||||
|
||||
def take_over_browser(self) -> bool:
|
||||
"""接管浏览器"""
|
||||
if not self.tge_port:
|
||||
logger.error("浏览器端口未设置")
|
||||
return False
|
||||
|
||||
try:
|
||||
co = ChromiumOptions()
|
||||
co.set_local_port(self.tge_port)
|
||||
self.page = ChromiumPage(addr_or_opts=co)
|
||||
self.page.set.window.max()
|
||||
logger.success("成功接管浏览器")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"接管浏览器失败: {e}")
|
||||
return False
|
||||
|
||||
def close_extra_tabs(self) -> bool:
|
||||
"""关闭多余的标签页,只保留第一个"""
|
||||
if not self.page:
|
||||
return False
|
||||
|
||||
try:
|
||||
tabs = self.page.get_tabs()
|
||||
closed_count = 0
|
||||
for idx, tab in enumerate(tabs):
|
||||
if idx == 0:
|
||||
continue
|
||||
tab.close()
|
||||
closed_count += 1
|
||||
if closed_count > 0:
|
||||
logger.info(f"已关闭{closed_count}个多余标签页")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"关闭多余标签页失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
# ==================== WEEX API客户端 ====================
|
||||
class WEEXApiClient:
|
||||
"""WEEX API客户端:负责与WEEX API的交互"""
|
||||
|
||||
def __init__(self):
|
||||
self.session = requests.Session()
|
||||
self.headers: Optional[Dict] = None
|
||||
|
||||
def update_headers(self, headers: Dict) -> None:
|
||||
"""更新session的headers"""
|
||||
if not self.headers:
|
||||
self.session.headers = headers
|
||||
else:
|
||||
self.session.headers.update(headers)
|
||||
self.headers = headers
|
||||
|
||||
def get_kline_data(self, contract_id: str = Config.CONTRACT_ID,
|
||||
product_code: str = Config.PRODUCT_CODE,
|
||||
kline_type: str = Config.KLINE_TYPE,
|
||||
limit: int = Config.KLINE_LIMIT) -> List[Dict]:
|
||||
"""
|
||||
获取K线数据
|
||||
|
||||
Args:
|
||||
contract_id: 合约ID
|
||||
product_code: 产品代码
|
||||
kline_type: K线类型
|
||||
limit: 返回数量限制
|
||||
|
||||
Returns:
|
||||
K线数据列表
|
||||
"""
|
||||
params = {
|
||||
'contractId': contract_id,
|
||||
'productCode': product_code,
|
||||
'priceType': 'LAST_PRICE',
|
||||
'klineType': kline_type,
|
||||
'limit': str(limit),
|
||||
'timeZone': 'string',
|
||||
'languageType': '1',
|
||||
'sign': 'SIGN',
|
||||
}
|
||||
|
||||
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
|
||||
logger.info(f"获取最新K线数据:第{attempt + 1}/{Config.MAX_RETRY_ATTEMPTS}次尝试...")
|
||||
try:
|
||||
response = self.session.get(
|
||||
'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2',
|
||||
params=params,
|
||||
timeout=15
|
||||
)
|
||||
|
||||
# 检查响应状态码
|
||||
if response.status_code != 200:
|
||||
logger.warning(f"获取K线数据失败,HTTP状态码:{response.status_code}")
|
||||
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
|
||||
time.sleep(Config.RETRY_DELAY)
|
||||
continue
|
||||
|
||||
# 尝试解析JSON
|
||||
try:
|
||||
response_data = response.json()
|
||||
except Exception as json_error:
|
||||
logger.warning(f"JSON解析失败,响应内容:{response.text[:200]}...")
|
||||
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
|
||||
time.sleep(Config.RETRY_DELAY)
|
||||
continue
|
||||
|
||||
# 检查数据结构
|
||||
if "data" not in response_data or "dataList" not in response_data["data"]:
|
||||
logger.warning(f"响应数据格式不正确:{response_data}")
|
||||
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
|
||||
time.sleep(Config.RETRY_DELAY)
|
||||
continue
|
||||
|
||||
result = response_data["data"]["dataList"]
|
||||
kline_data = []
|
||||
for item in result:
|
||||
# WEEX API返回格式: [close, high, low, open, id]
|
||||
kline_data.append({
|
||||
'id': int(item[4]),
|
||||
'open': float(item[3]),
|
||||
'high': float(item[1]),
|
||||
'low': float(item[2]),
|
||||
'close': float(item[0])
|
||||
})
|
||||
logger.success(f"成功获取{len(kline_data)}条K线数据")
|
||||
return kline_data
|
||||
except Exception as e:
|
||||
logger.warning(f"获取K线数据失败(第{attempt + 1}次尝试): {e}")
|
||||
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
|
||||
time.sleep(Config.RETRY_DELAY)
|
||||
|
||||
logger.error("获取K线数据失败:已重试3次仍未成功")
|
||||
return []
|
||||
|
||||
def get_available_balance(self) -> Optional[float]:
|
||||
"""获取可用余额"""
|
||||
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
|
||||
try:
|
||||
response = self.session.post(
|
||||
'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new',
|
||||
timeout=15
|
||||
)
|
||||
balance = float(response.json()["data"]["newContract"]["balanceList"][0]["available"])
|
||||
logger.success(f"成功获取可用余额: {balance}")
|
||||
return balance
|
||||
except Exception as e:
|
||||
logger.warning(f"获取余额失败(第{attempt + 1}次尝试): {e}")
|
||||
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
|
||||
time.sleep(Config.RETRY_DELAY)
|
||||
|
||||
logger.error("获取余额失败:已重试3次仍未成功")
|
||||
return None
|
||||
|
||||
def get_position_status(self) -> Tuple[bool, Optional[Dict]]:
|
||||
"""
|
||||
获取持仓状态
|
||||
|
||||
Returns:
|
||||
(success, position_data): success为是否成功,position_data为持仓信息
|
||||
"""
|
||||
json_data = {
|
||||
'filterContractIdList': [10000002],
|
||||
'limit': 100,
|
||||
'languageType': 0,
|
||||
'sign': 'SIGN',
|
||||
'timeZone': 'string',
|
||||
}
|
||||
|
||||
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
|
||||
try:
|
||||
response = self.session.post(
|
||||
'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderFillTransactionPage',
|
||||
json=json_data,
|
||||
timeout=15
|
||||
)
|
||||
|
||||
datas = response.json()["data"]["dataList"]
|
||||
|
||||
if not datas:
|
||||
return True, None
|
||||
|
||||
latest_order = datas[0]
|
||||
return True, latest_order
|
||||
except Exception as e:
|
||||
logger.warning(f"获取持仓状态失败(第{attempt + 1}次尝试): {e}")
|
||||
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
|
||||
time.sleep(Config.RETRY_DELAY)
|
||||
|
||||
return False, None
|
||||
|
||||
|
||||
# ==================== Token管理器 ====================
|
||||
class TokenManager:
|
||||
"""Token管理器:负责获取和更新认证token"""
|
||||
|
||||
def __init__(self, api_client: WEEXApiClient, page: ChromiumPage):
|
||||
self.api_client = api_client
|
||||
self.page = page
|
||||
|
||||
def get_token(self) -> bool:
|
||||
"""从浏览器中获取U-TOKEN"""
|
||||
tab = self.page.new_tab()
|
||||
tab.listen.start("/user/security/getLanguageType")
|
||||
|
||||
try:
|
||||
for attempt in range(Config.MAX_RETRY_ATTEMPTS):
|
||||
try:
|
||||
tab.get(url=Config.TRADING_URL)
|
||||
res = tab.listen.wait(timeout=5)
|
||||
|
||||
if res.request.headers.get("U-TOKEN"):
|
||||
headers = res.request.headers
|
||||
self.api_client.update_headers(headers)
|
||||
logger.success("成功获取token")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"获取token尝试{attempt + 1}失败: {e}")
|
||||
if attempt < Config.MAX_RETRY_ATTEMPTS - 1:
|
||||
time.sleep(Config.RETRY_DELAY)
|
||||
|
||||
logger.error("获取token失败:已重试3次仍未成功")
|
||||
return False
|
||||
finally:
|
||||
tab.close()
|
||||
|
||||
|
||||
# ==================== 交易执行器 ====================
|
||||
class TradingExecutor:
|
||||
"""交易执行器:负责执行交易操作"""
|
||||
|
||||
def __init__(self, page: ChromiumPage, api_client: WEEXApiClient):
|
||||
self.page = page
|
||||
self.api_client = api_client
|
||||
|
||||
def navigate_to_trading_page(self) -> bool:
|
||||
"""导航到交易页面(选择市价)"""
|
||||
try:
|
||||
self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click()
|
||||
time.sleep(1)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"导航到交易页面失败: {e}")
|
||||
return False
|
||||
|
||||
def close_all_positions(self) -> bool:
|
||||
"""平仓所有持仓(闪电平仓)"""
|
||||
try:
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
|
||||
time.sleep(1)
|
||||
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
|
||||
time.sleep(3)
|
||||
logger.success("成功执行平仓操作")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"平仓失败: {e}")
|
||||
return False
|
||||
|
||||
def open_long(self, amount: float) -> bool:
|
||||
"""开多"""
|
||||
try:
|
||||
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
|
||||
time.sleep(1)
|
||||
self.page.ele('x://*[contains(text(), "买入开多")]').click()
|
||||
logger.success(f"成功开多,金额:{amount}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"开多失败: {e}")
|
||||
return False
|
||||
|
||||
def open_short(self, amount: float) -> bool:
|
||||
"""开空"""
|
||||
try:
|
||||
self.page.ele('x://input[@placeholder="请输入数量"]').input(amount)
|
||||
time.sleep(1)
|
||||
self.page.ele('x://*[contains(text(), "卖出开空")]').click()
|
||||
logger.success(f"成功开空,金额:{amount}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"开空失败: {e}")
|
||||
return False
|
||||
|
||||
def execute_trade(self, direction: str, current_position: int, amount: float) -> bool:
|
||||
"""
|
||||
执行交易操作
|
||||
|
||||
Args:
|
||||
direction: 交易方向 "long" 或 "short"
|
||||
current_position: 当前持仓状态 -1:空, 0:无, 1:多
|
||||
amount: 交易金额
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
"""
|
||||
if not self.navigate_to_trading_page():
|
||||
return False
|
||||
|
||||
try:
|
||||
# 开多
|
||||
if direction == "long":
|
||||
if current_position == 0:
|
||||
logger.info(f"{datetime.datetime.now()},开多,金额:{amount}")
|
||||
return self.open_long(amount)
|
||||
elif current_position == -1:
|
||||
logger.info(f"{datetime.datetime.now()},反手平空做多,金额:{amount}")
|
||||
if self.close_all_positions():
|
||||
time.sleep(1)
|
||||
return self.open_long(amount)
|
||||
|
||||
# 开空
|
||||
elif direction == "short":
|
||||
if current_position == 0:
|
||||
logger.info(f"{datetime.datetime.now()},开空,金额:{amount}")
|
||||
return self.open_short(amount)
|
||||
elif current_position == 1:
|
||||
logger.info(f"{datetime.datetime.now()},反手平多做空,金额:{amount}")
|
||||
if self.close_all_positions():
|
||||
time.sleep(1)
|
||||
return self.open_short(amount)
|
||||
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"执行交易失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
# ==================== 持仓管理器 ====================
|
||||
class PositionManager:
|
||||
"""持仓管理器:负责持仓状态管理和止损"""
|
||||
|
||||
# 持仓状态常量
|
||||
POSITION_SHORT = -1 # 做空
|
||||
POSITION_NONE = 0 # 无持仓
|
||||
POSITION_LONG = 1 # 做多
|
||||
|
||||
def __init__(self, trading_executor: TradingExecutor):
|
||||
self.trading_executor = trading_executor
|
||||
self.current_position: int = self.POSITION_NONE
|
||||
self.position_data: Optional[Dict] = None
|
||||
|
||||
def update_position(self, position_data: Optional[Dict]) -> None:
|
||||
"""
|
||||
更新持仓状态
|
||||
|
||||
Args:
|
||||
position_data: 持仓数据字典
|
||||
"""
|
||||
self.position_data = position_data
|
||||
|
||||
if not position_data:
|
||||
self.current_position = self.POSITION_NONE
|
||||
return
|
||||
|
||||
direction = position_data.get("legacyOrderDirection")
|
||||
if direction == "OPEN_LONG":
|
||||
self.current_position = self.POSITION_LONG
|
||||
elif direction == "OPEN_SHORT":
|
||||
self.current_position = self.POSITION_SHORT
|
||||
else:
|
||||
self.current_position = self.POSITION_NONE
|
||||
|
||||
def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool:
|
||||
"""
|
||||
检查止损条件
|
||||
|
||||
Args:
|
||||
kline_1: 倒数第二根K线
|
||||
kline_2: 倒数第一根K线
|
||||
|
||||
Returns:
|
||||
是否执行了止损
|
||||
"""
|
||||
try:
|
||||
# 持多仓,连续两根阴线,平多
|
||||
if self.current_position == self.POSITION_LONG:
|
||||
if (KlineAnalyzer.is_bearish(kline_1) and
|
||||
KlineAnalyzer.is_bearish(kline_2)):
|
||||
logger.success(
|
||||
f"{datetime.datetime.now()},止损信号:连续两根阴线,平多"
|
||||
)
|
||||
if self.trading_executor.close_all_positions():
|
||||
self.current_position = self.POSITION_NONE
|
||||
return True
|
||||
|
||||
# 持空仓,连续两根阳线,平空
|
||||
elif self.current_position == self.POSITION_SHORT:
|
||||
if (KlineAnalyzer.is_bullish(kline_1) and
|
||||
KlineAnalyzer.is_bullish(kline_2)):
|
||||
logger.success(
|
||||
f"{datetime.datetime.now()},止损信号:连续两根阳线,平空"
|
||||
)
|
||||
if self.trading_executor.close_all_positions():
|
||||
self.current_position = self.POSITION_NONE
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"止损检查失败: {e}")
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# ==================== 消息发送器 ====================
|
||||
class MessageSender:
|
||||
"""消息发送器:负责发送钉钉消息"""
|
||||
|
||||
@staticmethod
|
||||
def send_dingtalk_message(message_content: str, is_error: bool = False) -> None:
|
||||
"""
|
||||
发送钉钉消息
|
||||
|
||||
Args:
|
||||
message_content: 消息内容
|
||||
is_error: 是否为错误消息(错误消息会发送多次)
|
||||
"""
|
||||
if is_error:
|
||||
prefix = "❌weex:"
|
||||
count = 15
|
||||
else:
|
||||
prefix = "🔔weex:"
|
||||
count = 1
|
||||
|
||||
for _ in range(count):
|
||||
send_dingtalk_message(f"{prefix}{message_content}")
|
||||
|
||||
@staticmethod
|
||||
def format_position_message(position_data: Optional[Dict],
|
||||
current_price: float,
|
||||
available_balance: float) -> str:
|
||||
"""
|
||||
格式化持仓信息消息
|
||||
|
||||
Args:
|
||||
position_data: 持仓数据
|
||||
current_price: 当前价格
|
||||
available_balance: 可用余额
|
||||
"""
|
||||
if position_data:
|
||||
# 提取持仓信息
|
||||
fill_size = float(position_data['fillSize']) # 持仓量,单位:ETH
|
||||
fill_value = float(position_data['fillValue']) # 成交名义价值 USDT
|
||||
open_avg_price = fill_value / fill_size if fill_size > 0 else 0 # 开仓均价
|
||||
|
||||
position_side = position_data.get('positionSide', '')
|
||||
legacy_direction = position_data.get('legacyOrderDirection', '')
|
||||
|
||||
# 确定方向
|
||||
if position_side == 'SHORT' or legacy_direction == 'OPEN_SHORT':
|
||||
direction = "空"
|
||||
elif position_side == 'LONG' or legacy_direction == 'OPEN_LONG':
|
||||
direction = "多"
|
||||
else:
|
||||
direction = "无"
|
||||
|
||||
# 计算浮动盈亏
|
||||
if direction in ["多", "空"] and open_avg_price > 0:
|
||||
if direction == "多":
|
||||
unrealized_pnl = fill_size * (current_price - open_avg_price)
|
||||
pnl_rate = (current_price - open_avg_price) / open_avg_price * 100
|
||||
else:
|
||||
unrealized_pnl = fill_size * (open_avg_price - current_price)
|
||||
pnl_rate = (open_avg_price - current_price) / open_avg_price * 100
|
||||
|
||||
pnl_str = f"{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)"
|
||||
else:
|
||||
pnl_str = "0.00 USDT"
|
||||
|
||||
# 当前持仓名义价值
|
||||
current_value = fill_size * current_price
|
||||
|
||||
return (
|
||||
"**【WEEX ETHUSDT 永续持仓监控】**\n\n"
|
||||
f"**持仓方向**:{direction}\n"
|
||||
f"**当前现价**:{current_price:.2f} USDT\n"
|
||||
f"**开仓均价**:{open_avg_price:.2f} USDT\n"
|
||||
f"**持仓数量(eth)**:{fill_size:.3f} ETH\n"
|
||||
f"**持仓数量(usdt)**:{fill_value / 100:.2f} USDT\n"
|
||||
f"**名义价值**:{current_value:.2f} USDT\n"
|
||||
f"**浮动盈亏**:{pnl_str}\n"
|
||||
f"**账户可用余额**:{available_balance:.2f} USDT"
|
||||
)
|
||||
else:
|
||||
return (
|
||||
"**【WEEX ETHUSDT 永续持仓监控】**\n\n"
|
||||
f"**持仓方向**:无\n"
|
||||
f"**当前现价**:{current_price:.2f} USDT\n"
|
||||
f"**账户可用余额**:{available_balance:.2f} USDT"
|
||||
)
|
||||
|
||||
|
||||
# ==================== 时间工具类 ====================
|
||||
class TimeUtils:
|
||||
"""时间工具类"""
|
||||
|
||||
@staticmethod
|
||||
def get_current_kline_timestamp() -> int:
|
||||
"""
|
||||
获取当前K线的时间戳(30分钟K线的整点或30分时刻,毫秒)
|
||||
|
||||
Returns:
|
||||
时间戳(毫秒)
|
||||
"""
|
||||
current_timestamp = time.time()
|
||||
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
|
||||
|
||||
# 计算距离当前时间最近的整点或30分时刻
|
||||
if current_datetime.minute < 30:
|
||||
target_datetime = current_datetime.replace(
|
||||
minute=0, second=0, microsecond=0
|
||||
)
|
||||
else:
|
||||
target_datetime = current_datetime.replace(
|
||||
minute=30, second=0, microsecond=0
|
||||
)
|
||||
|
||||
return int(target_datetime.timestamp()) * 1000
|
||||
|
||||
@staticmethod
|
||||
def get_formatted_time() -> str:
|
||||
"""获取格式化的当前时间字符串"""
|
||||
timestamp = time.time()
|
||||
local_time = time.localtime(timestamp)
|
||||
return time.strftime("%Y-%m-%d %H:%M:%S", local_time)
|
||||
|
||||
@staticmethod
|
||||
def get_progress_bar_value() -> int:
|
||||
"""获取进度条的当前值(0-29)"""
|
||||
current_time = time.localtime()
|
||||
current_minute = current_time.tm_min
|
||||
return current_minute if current_minute < 30 else current_minute - 30
|
||||
|
||||
|
||||
# ==================== 主交易类 ====================
|
||||
class WeexTransaction:
|
||||
"""WEEX自动交易主类"""
|
||||
|
||||
def __init__(self, tge_id: int):
|
||||
# 配置
|
||||
self.tge_id = tge_id
|
||||
self.tge_headers = {
|
||||
"Authorization": Config.TGE_AUTHORIZATION,
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
# 组件初始化
|
||||
self.browser_manager = BrowserManager(tge_id, Config.TGE_URL, self.tge_headers)
|
||||
self.api_client = WEEXApiClient()
|
||||
self.position_manager = None # 需要在浏览器初始化后创建
|
||||
self.trading_executor = None # 需要在浏览器初始化后创建
|
||||
self.token_manager = None # 需要在浏览器初始化后创建
|
||||
|
||||
# 状态
|
||||
self.pbar: Optional[tqdm] = None
|
||||
self.last_kline_timestamp: Optional[int] = None
|
||||
self.kline_1: Optional[Dict] = None
|
||||
self.kline_2: Optional[Dict] = None
|
||||
self.kline_3: Optional[Dict] = None
|
||||
|
||||
def initialize(self) -> bool:
|
||||
"""初始化所有组件"""
|
||||
# 打开浏览器
|
||||
if not self.browser_manager.open_browser():
|
||||
logger.error("打开浏览器失败")
|
||||
MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True)
|
||||
return False
|
||||
|
||||
# 接管浏览器
|
||||
if not self.browser_manager.take_over_browser():
|
||||
logger.error("接管浏览器失败")
|
||||
MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True)
|
||||
return False
|
||||
|
||||
# 关闭多余标签页
|
||||
self.browser_manager.close_extra_tabs()
|
||||
|
||||
# 初始化需要page的组件
|
||||
page = self.browser_manager.page
|
||||
self.trading_executor = TradingExecutor(page, self.api_client)
|
||||
self.position_manager = PositionManager(self.trading_executor)
|
||||
self.token_manager = TokenManager(self.api_client, page)
|
||||
|
||||
# 打开交易页面
|
||||
page.get(url=Config.TRADING_URL)
|
||||
|
||||
# 初始化时先获取一次token
|
||||
if not self.token_manager.get_token():
|
||||
logger.warning("初始化时获取token失败,将在获取K线数据时重试")
|
||||
|
||||
# 初始化进度条
|
||||
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)
|
||||
|
||||
logger.success("系统初始化完成")
|
||||
return True
|
||||
|
||||
def update_progress_bar(self) -> None:
|
||||
"""更新进度条"""
|
||||
if self.pbar:
|
||||
self.pbar.n = TimeUtils.get_progress_bar_value()
|
||||
self.pbar.refresh()
|
||||
|
||||
def fetch_and_update_kline(self) -> bool:
|
||||
"""获取并更新K线数据"""
|
||||
# 先获取token(获取K线数据需要token)
|
||||
if not self.token_manager.get_token():
|
||||
logger.warning("获取token失败,无法获取K线数据")
|
||||
return False
|
||||
|
||||
kline_data = self.api_client.get_kline_data()
|
||||
if not kline_data:
|
||||
logger.warning("获取K线数据失败")
|
||||
MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True)
|
||||
return False
|
||||
|
||||
# 排序并取最后三根
|
||||
sorted_data = sorted(kline_data, key=lambda x: x["id"])
|
||||
if len(sorted_data) < 3:
|
||||
logger.error("K线数据不足3根")
|
||||
return False
|
||||
|
||||
self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:]
|
||||
|
||||
# 检查是否是新K线
|
||||
current_kline_id = self.kline_3["id"]
|
||||
current_timestamp = TimeUtils.get_current_kline_timestamp()
|
||||
|
||||
# 判断K线时间是否匹配当前时间,并且是新K线
|
||||
if current_kline_id != current_timestamp:
|
||||
return False # K线时间不匹配
|
||||
|
||||
if self.last_kline_timestamp == current_timestamp:
|
||||
return False # 已经处理过这个K线
|
||||
|
||||
self.last_kline_timestamp = current_timestamp
|
||||
return True
|
||||
|
||||
def sync_position_status(self) -> bool:
|
||||
"""同步持仓状态"""
|
||||
success, position_data = self.api_client.get_position_status()
|
||||
if not success:
|
||||
logger.error("获取持仓状态失败")
|
||||
MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True)
|
||||
return False
|
||||
|
||||
self.position_manager.update_position(position_data)
|
||||
return True
|
||||
|
||||
def process_trading_logic(self) -> None:
|
||||
"""处理交易逻辑"""
|
||||
page = self.browser_manager.page
|
||||
|
||||
# 刷新token(因为可能已经过期,但这不是必须的,因为获取K线时已经获取过了)
|
||||
# 如果token有效,这里会快速返回;如果无效,会重新获取
|
||||
self.token_manager.get_token()
|
||||
|
||||
# 刷新页面
|
||||
page.get(url=Config.TRADING_URL)
|
||||
|
||||
# 同步持仓状态
|
||||
if not self.sync_position_status():
|
||||
return
|
||||
|
||||
# 检查止损
|
||||
try:
|
||||
self.position_manager.check_stop_loss(self.kline_1, self.kline_2)
|
||||
except Exception as e:
|
||||
logger.error(f"止损检查出错: {e}")
|
||||
MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True)
|
||||
return
|
||||
|
||||
# 检查交易信号
|
||||
direction, signal_type = KlineAnalyzer.check_engulfing_signal(
|
||||
prev=self.kline_1,
|
||||
curr=self.kline_2
|
||||
)
|
||||
|
||||
if direction:
|
||||
# 获取可用余额
|
||||
balance = self.api_client.get_available_balance()
|
||||
if balance is None:
|
||||
logger.error("获取可用余额失败")
|
||||
MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True)
|
||||
return
|
||||
|
||||
amount = balance / Config.POSITION_RATIO
|
||||
|
||||
# 执行交易
|
||||
if not self.trading_executor.execute_trade(
|
||||
direction=direction,
|
||||
current_position=self.position_manager.current_position,
|
||||
amount=amount
|
||||
):
|
||||
MessageSender.send_dingtalk_message(
|
||||
f"交易执行失败,方向:{direction}",
|
||||
is_error=True
|
||||
)
|
||||
return
|
||||
|
||||
# 更新持仓状态
|
||||
if direction == "long":
|
||||
self.position_manager.current_position = PositionManager.POSITION_LONG
|
||||
elif direction == "short":
|
||||
self.position_manager.current_position = PositionManager.POSITION_SHORT
|
||||
|
||||
# 发送交易消息
|
||||
MessageSender.send_dingtalk_message(
|
||||
f"信号:{direction},开仓金额:{amount:.2f}",
|
||||
is_error=False
|
||||
)
|
||||
|
||||
# 再次同步持仓状态(交易后)
|
||||
if not self.sync_position_status():
|
||||
return
|
||||
|
||||
# 发送持仓信息
|
||||
balance = self.api_client.get_available_balance()
|
||||
current_price = float(self.kline_3["close"])
|
||||
if balance is not None:
|
||||
position_info = self.position_manager.position_data
|
||||
msg = MessageSender.format_position_message(
|
||||
position_info,
|
||||
current_price,
|
||||
balance
|
||||
)
|
||||
MessageSender.send_dingtalk_message(msg, is_error=False)
|
||||
|
||||
def run(self) -> None:
|
||||
"""主运行循环"""
|
||||
# 初始化
|
||||
if not self.initialize():
|
||||
return
|
||||
|
||||
logger.info("系统初始化完成,开始运行...")
|
||||
|
||||
# 主循环
|
||||
while True:
|
||||
try:
|
||||
# 更新进度条
|
||||
self.update_progress_bar()
|
||||
|
||||
# 获取当前K线时间戳
|
||||
current_timestamp = TimeUtils.get_current_kline_timestamp()
|
||||
|
||||
# 检查是否已经处理过这个K线
|
||||
if self.last_kline_timestamp == current_timestamp:
|
||||
time.sleep(10)
|
||||
continue
|
||||
|
||||
# 获取并更新K线数据
|
||||
if not self.fetch_and_update_kline():
|
||||
time.sleep(10)
|
||||
continue
|
||||
|
||||
# 处理交易逻辑
|
||||
self.process_trading_logic()
|
||||
|
||||
# 重置进度条
|
||||
if self.pbar:
|
||||
self.pbar.reset()
|
||||
|
||||
# 等待一段时间
|
||||
time.sleep(5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("收到中断信号,正在退出...")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"运行出错: {e}")
|
||||
MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True)
|
||||
time.sleep(10)
|
||||
|
||||
def action(self) -> None:
|
||||
"""兼容旧接口的方法名"""
|
||||
self.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
transaction = WeexTransaction(tge_id=146473)
|
||||
transaction.action()
|
||||
Reference in New Issue
Block a user