diff --git a/telegram/8619211027341.session b/telegram/8619211027341.session index b1bb19b..deddd3a 100644 Binary files a/telegram/8619211027341.session and b/telegram/8619211027341.session differ diff --git a/telegram/bot_session.session b/telegram/bot_session.session index 828b715..78e9332 100644 Binary files a/telegram/bot_session.session and b/telegram/bot_session.session differ diff --git a/telegram/sign.db b/telegram/sign.db index 7981048..22d2472 100644 Binary files a/telegram/sign.db and b/telegram/sign.db differ diff --git a/交易/websea-结构优化.py b/交易/websea-结构优化.py new file mode 100644 index 0000000..20f60ad --- /dev/null +++ b/交易/websea-结构优化.py @@ -0,0 +1,739 @@ +""" +WebSea ETH-USDT 永续合约自动交易系统 +优化版本:改进代码结构、模块化和可读性 +""" +import time +import datetime +from typing import Optional, Dict, List, Tuple +from tqdm import tqdm +from loguru import logger +from DrissionPage import ChromiumOptions, ChromiumPage +from curl_cffi import requests + +from 交易.tools import send_dingtalk_message + + +class KlineAnalyzer: + """K线分析器:负责K线数据分析和信号判断""" + + @staticmethod + def is_bullish(candle: Dict) -> bool: + """判断是否为阳线""" + return float(candle['close']) > float(candle['open']) + + @staticmethod + def is_bearish(candle: Dict) -> bool: + """判断是否为阴线""" + return float(candle['close']) < float(candle['open']) + + @staticmethod + def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]: + """ + 包住形态信号判定(仅30分钟K线): + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + + Returns: + (direction, signal_type): direction为"long"或"short",signal_type为信号类型 + """ + c_open, c_close = float(curr['open']), float(curr['close']) + p_open = float(prev['open']) + + # 前跌后涨包住 -> 做多 + if (KlineAnalyzer.is_bullish(curr) and + KlineAnalyzer.is_bearish(prev) and + int(c_close) >= int(p_open)): + return "long", "bear_bull_engulf" + + # 前涨后跌包住 -> 做空 + if (KlineAnalyzer.is_bearish(curr) and + KlineAnalyzer.is_bullish(prev) and + int(c_close) <= int(p_open)): + return "short", "bull_bear_engulf" + + return None, None + + +class BrowserManager: + """浏览器管理器:负责浏览器的启动、接管和标签页管理""" + + def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict): + self.tge_id = tge_id + self.tge_url = tge_url + self.tge_headers = tge_headers + self.tge_port: Optional[int] = None + self.page: Optional[ChromiumPage] = None + + def open_browser(self) -> bool: + """打开浏览器并获取端口""" + try: + response = requests.post( + f"{self.tge_url}/api/browser/start", + json={"envId": self.tge_id}, + headers=self.tge_headers, + timeout=10 + ) + self.tge_port = response.json()["data"]["port"] + return True + except Exception as e: + logger.error(f"打开浏览器失败: {e}") + return False + + def take_over_browser(self) -> bool: + """接管浏览器""" + if not self.tge_port: + logger.error("浏览器端口未设置") + return False + + try: + co = ChromiumOptions() + co.set_local_port(self.tge_port) + self.page = ChromiumPage(addr_or_opts=co) + self.page.set.window.max() + return True + except Exception as e: + logger.error(f"接管浏览器失败: {e}") + return False + + def close_extra_tabs(self) -> bool: + """关闭多余的标签页,只保留第一个""" + if not self.page: + return False + + try: + tabs = self.page.get_tabs() + for idx, tab in enumerate(tabs): + if idx == 0: + continue + tab.close() + return True + except Exception as e: + logger.warning(f"关闭多余标签页失败: {e}") + return False + + +class ApiClient: + """API客户端:负责与WebSea API的交互""" + + def __init__(self): + self.session = requests.Session() + self.headers: Dict = {} + self.cookies: Dict = {} + + def update_session(self, cookies: Dict, headers: Dict) -> None: + """更新session的cookies和headers""" + self.session.cookies.update(cookies) + self.session.headers.update(headers) + + def get_kline_data(self, symbol: str = 'ETH-USDT', period: str = '30min', + days: int = 10) -> List[Dict]: + """ + 获取K线数据 + + Args: + symbol: 交易对符号 + period: K线周期 + days: 获取最近N天的数据 + + Returns: + K线数据列表 + """ + now_ts = int(time.time()) + start_ts = int(time.time() - 86400 * days) + + params = { + 'symbol': symbol, + 'period': period, + 'start': str(start_ts), + 'end': str(now_ts), + } + + for attempt in range(3): + logger.info(f"获取最新K线数据:第{attempt + 1}次尝试...") + try: + response = self.session.get( + 'https://capi.websea.com/webApi/market/getKline', + params=params, + timeout=5 + ) + + result = response.json()["result"]['data'] + kline_data = [] + for data in result: + kline_data.append({ + 'id': int(data["id"]), + 'open': float(data["open"]), + 'high': float(data["high"]), + 'low': float(data["low"]), + 'close': float(data["close"]) + }) + return kline_data + except Exception as e: + logger.warning(f"获取K线数据失败: {e}") + if attempt < 2: + time.sleep(1) + + return [] + + def get_available_balance(self) -> Optional[float]: + """获取可用余额""" + for attempt in range(3): + try: + response = self.session.get( + 'https://capi.websea.com/webApi/asset/account', + timeout=5 + ) + return float(response.json()["result"]["available"]) + except Exception as e: + logger.warning(f"获取余额失败: {e}") + if attempt < 2: + time.sleep(1) + + return None + + def get_position_status(self) -> Tuple[bool, Optional[Dict]]: + """ + 获取持仓状态 + + Returns: + (success, position_data): success为是否成功,position_data为持仓信息 + """ + for attempt in range(3): + try: + response = self.session.get( + 'https://capi.websea.com/webApi/entrust/holdPosition', + timeout=5 + ) + resp_data = response.json() + + if not resp_data["result"]: + return True, {"type": 0, "position": None} + + position = resp_data["result"][0] + position_info = { + "type": position["type"], # 1: 多, 2: 空 + "position": position + } + return True, position_info + except Exception as e: + logger.warning(f"获取持仓状态失败: {e}") + if attempt < 2: + time.sleep(1) + + return False, None + + +class TokenManager: + """Token管理器:负责获取和更新认证token""" + + def __init__(self, api_client: ApiClient, page: ChromiumPage): + self.api_client = api_client + self.page = page + + def get_token(self) -> bool: + """从浏览器中获取token和cookies""" + try: + tab = self.page.new_tab() + tab.listen.start("api.websea.com/webApi/zendesk/url") + tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT") + + res = tab.listen.wait() + + # 提取cookies + cookies = {} + for cookie in res.request.cookies: + if not cookie["name"].startswith(':'): + cookies[cookie["name"]] = cookie["value"] + + # 提取headers + headers = {} + if "token" in res.request.headers: + headers["token"] = res.request.headers["token"] + + tab.close() + + # 检查是否有shareToken + if cookies.get('shareToken'): + self.api_client.update_session(cookies, headers) + return True + else: + logger.warning("未获取到shareToken") + return False + except Exception as e: + logger.error(f"获取token失败: {e}") + return False + + +class TradingExecutor: + """交易执行器:负责执行交易操作""" + + def __init__(self, page: ChromiumPage, api_client: ApiClient): + self.page = page + self.api_client = api_client + + def navigate_to_trading_page(self) -> bool: + """导航到交易页面""" + try: + self.page.ele('x://*[normalize-space(text())= "市价"]', timeout=15).click() + time.sleep(1) + return True + except Exception as e: + logger.error(f"导航到交易页面失败: {e}") + return False + + def close_all_positions(self) -> bool: + """平仓所有持仓""" + try: + self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True) + time.sleep(1) + self.page.actions.click('x://*[normalize-space(text())= "市价全平"]') + time.sleep(3) + return True + except Exception as e: + logger.error(f"平仓失败: {e}") + return False + + def open_long(self, amount: float) -> bool: + """开多""" + try: + self.page.ele('x://*[@id="amountInput"]').input(amount) + time.sleep(1) + self.page.actions.click('x://*[normalize-space(text())= "买入/做多"]') + return True + except Exception as e: + logger.error(f"开多失败: {e}") + return False + + def open_short(self, amount: float) -> bool: + """开空""" + try: + self.page.ele('x://*[@id="amountInput"]').input(amount) + time.sleep(1) + self.page.actions.click('x://*[normalize-space(text())= "卖出/做空"]') + return True + except Exception as e: + logger.error(f"开空失败: {e}") + return False + + def execute_trade(self, direction: str, current_position: int, amount: float) -> bool: + """ + 执行交易操作 + + Args: + direction: 交易方向 "long" 或 "short" + current_position: 当前持仓状态 -1:空, 0:无, 1:多 + amount: 交易金额 + + Returns: + 是否成功 + """ + if not self.navigate_to_trading_page(): + return False + + try: + # 开多 + if direction == "long": + if current_position == 0: + logger.success(f"{datetime.datetime.now()},开多,金额:{amount}") + self.open_long(amount) + elif current_position == -1: + logger.success(f"{datetime.datetime.now()},反手平空做多,金额:{amount}") + self.close_all_positions() + self.open_long(amount) + + # 开空 + elif direction == "short": + if current_position == 0: + logger.success(f"{datetime.datetime.now()},开空,金额:{amount}") + self.open_short(amount) + elif current_position == 1: + logger.success(f"{datetime.datetime.now()},反手平多做空,金额:{amount}") + self.close_all_positions() + self.open_short(amount) + + return True + except Exception as e: + logger.error(f"执行交易失败: {e}") + return False + + +class PositionManager: + """持仓管理器:负责持仓状态管理和止损""" + + def __init__(self, trading_executor: TradingExecutor): + self.trading_executor = trading_executor + self.current_position: int = 0 # -1:空, 0:无, 1:多 + + def update_position(self, position_type: int) -> None: + """更新持仓状态""" + self.current_position = position_type + + def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool: + """ + 检查止损条件 + + Args: + kline_1: 倒数第二根K线 + kline_2: 倒数第一根K线 + + Returns: + 是否执行了止损 + """ + try: + # 持多仓,连续两根阴线,平多 + if self.current_position == 1: + if (KlineAnalyzer.is_bearish(kline_1) and + KlineAnalyzer.is_bearish(kline_2)): + logger.success( + f"{datetime.datetime.now()},止损信号:连续两根阴线,平多" + ) + self.trading_executor.close_all_positions() + self.current_position = 0 + return True + + # 持空仓,连续两根阳线,平空 + elif self.current_position == -1: + if (KlineAnalyzer.is_bullish(kline_1) and + KlineAnalyzer.is_bullish(kline_2)): + logger.success( + f"{datetime.datetime.now()},止损信号:连续两根阳线,平空" + ) + self.trading_executor.close_all_positions() + self.current_position = 0 + return True + except Exception as e: + logger.error(f"止损检查失败: {e}") + + return False + + +class MessageSender: + """消息发送器:负责发送钉钉消息""" + + @staticmethod + def send_dingtalk_message(message_content: str, is_error: bool = False) -> None: + """ + 发送钉钉消息 + + Args: + message_content: 消息内容 + is_error: 是否为错误消息(错误消息会发送多次) + """ + if is_error: + prefix = "❌websea:" + count = 15 + else: + prefix = "🔔websea:" + count = 1 + + for _ in range(count): + send_dingtalk_message(f"{prefix}{message_content}") + + @staticmethod + def format_position_message(position_data: Optional[Dict], + available_balance: float) -> str: + """格式化持仓信息消息""" + if position_data and position_data.get("position"): + pos = position_data["position"] + direction = "多" if int(pos['type']) == 1 else "空" + contracts = int(pos['number']) + eth_amount = float(pos['numberConvert']) + open_price = float(pos['openPriceAvg']) + current_price = float(pos['markPrice']) + unrealized_pnl = float(pos['unProfitLoss']) + pnl_rate = float(pos['profitRate']) + + return ( + f"【WebSea ETH-USDT 永续持仓】\n" + f"持仓方向:{direction}\n" + f"持仓张数:{contracts} 张\n" + f"持仓数量(eth):{eth_amount:.3f} ETH\n" + f"持仓数量(usdt):{float(pos['numberConvertU']) / 100:.3f} USDT\n" + f"开仓均价:{open_price:.2f} USDT\n" + f"标记现价:{current_price:.2f} USDT\n" + f"浮动盈亏:{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)\n" + f"账户可用:{available_balance:.2f} USDT" + ) + else: + return ( + f"【WebSea ETH-USDT 永续持仓】\n" + f"持仓方向:无\n" + f"账户可用:{available_balance:.2f} USDT" + ) + + +class TimeUtils: + """时间工具类""" + + @staticmethod + def get_current_kline_timestamp() -> int: + """ + 获取当前K线的时间戳(30分钟K线的整点或30分时刻) + + Returns: + 时间戳 + """ + current_timestamp = time.time() + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + # 计算距离当前时间最近的整点或30分时刻 + if current_datetime.minute < 30: + target_datetime = current_datetime.replace( + minute=0, second=0, microsecond=0 + ) + else: + target_datetime = current_datetime.replace( + minute=30, second=0, microsecond=0 + ) + + return int(target_datetime.timestamp()) + + @staticmethod + def get_formatted_time() -> str: + """获取格式化的当前时间字符串""" + timestamp = time.time() + local_time = time.localtime(timestamp) + return time.strftime("%Y-%m-%d %H:%M:%S", local_time) + + @staticmethod + def get_progress_bar_value() -> int: + """获取进度条的当前值(0-29)""" + current_time = time.localtime() + current_minute = current_time.tm_min + return current_minute if current_minute < 30 else current_minute - 30 + + +class WeexTransaction: + """WebSea自动交易主类""" + + def __init__(self, tge_id: int): + # 配置 + self.tge_id = tge_id + self.tge_url = "http://127.0.0.1:50326" + self.tge_headers = { + "Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91", + "Content-Type": "application/json" + } + self.trading_url = "https://www.websea.com/zh-CN/futures/ETH-USDT" + + # 组件初始化 + self.browser_manager = BrowserManager(self.tge_id, self.tge_url, self.tge_headers) + self.api_client = ApiClient() + self.position_manager = None # 需要在浏览器初始化后创建 + self.trading_executor = None # 需要在浏览器初始化后创建 + self.token_manager = None # 需要在浏览器初始化后创建 + + # 状态 + self.pbar: Optional[tqdm] = None + self.last_kline_id: Optional[int] = None + self.kline_1: Optional[Dict] = None + self.kline_2: Optional[Dict] = None + self.kline_3: Optional[Dict] = None + + def initialize(self) -> bool: + """初始化所有组件""" + # 打开浏览器 + if not self.browser_manager.open_browser(): + logger.error("打开浏览器失败") + MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True) + return False + + # 接管浏览器 + if not self.browser_manager.take_over_browser(): + logger.error("接管浏览器失败") + MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True) + return False + + # 关闭多余标签页 + self.browser_manager.close_extra_tabs() + + # 初始化需要page的组件 + page = self.browser_manager.page + self.trading_executor = TradingExecutor(page, self.api_client) + self.position_manager = PositionManager(self.trading_executor) + self.token_manager = TokenManager(self.api_client, page) + + # 打开交易页面 + page.get(url=self.trading_url) + + # 初始化进度条 + self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) + + return True + + def update_progress_bar(self) -> None: + """更新进度条""" + if self.pbar: + self.pbar.n = TimeUtils.get_progress_bar_value() + self.pbar.refresh() + + def fetch_and_update_kline(self) -> bool: + """获取并更新K线数据""" + kline_data = self.api_client.get_kline_data() + if not kline_data: + logger.warning("获取K线数据失败") + MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True) + return False + + # 排序并取最后三根 + sorted_data = sorted(kline_data, key=lambda x: x["id"]) + self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:] + + # 检查是否是新K线 + current_kline_id = self.kline_3["id"] + if self.last_kline_id == current_kline_id: + return False # 没有新K线 + + self.last_kline_id = current_kline_id + return True + + def sync_position_status(self) -> bool: + """同步持仓状态""" + success, position_info = self.api_client.get_position_status() + if not success: + logger.error("获取持仓状态失败") + MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True) + return False + + if position_info: + pos_type = position_info.get("type", 0) + if pos_type == 1: + self.position_manager.current_position = 1 + elif pos_type == 2: + self.position_manager.current_position = -1 + else: + self.position_manager.current_position = 0 + + return True + + def process_trading_logic(self) -> None: + """处理交易逻辑""" + page = self.browser_manager.page + + # 刷新页面 + page.get(url=self.trading_url) + + # 获取token + if not self.token_manager.get_token(): + logger.error("获取token失败") + MessageSender.send_dingtalk_message("获取token失败", is_error=True) + return + + # 同步持仓状态 + if not self.sync_position_status(): + return + + # 检查止损 + try: + self.position_manager.check_stop_loss(self.kline_1, self.kline_2) + except Exception as e: + logger.error(f"止损检查出错: {e}") + MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True) + return + + # 检查交易信号 + direction, signal_type = KlineAnalyzer.check_engulfing_signal( + prev=self.kline_1, + curr=self.kline_2 + ) + + if direction: + # 获取可用余额 + balance = self.api_client.get_available_balance() + if balance is None: + logger.error("获取可用余额失败") + MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True) + return + + amount = balance / 100 + + # 执行交易 + if not self.trading_executor.execute_trade( + direction=direction, + current_position=self.position_manager.current_position, + amount=amount + ): + MessageSender.send_dingtalk_message( + f"交易执行失败,方向:{direction}", + is_error=True + ) + return + + # 更新持仓状态 + if direction == "long": + self.position_manager.current_position = 1 + elif direction == "short": + self.position_manager.current_position = -1 + + # 发送交易消息 + MessageSender.send_dingtalk_message( + f"信号:{direction},开仓金额:{amount:.2f}", + is_error=False + ) + + # 再次同步持仓状态(交易后) + if not self.sync_position_status(): + return + + # 发送持仓信息 + balance = self.api_client.get_available_balance() + if balance is not None: + success, position_info = self.api_client.get_position_status() + if success: + msg = MessageSender.format_position_message(position_info, balance) + MessageSender.send_dingtalk_message(msg, is_error=False) + + def run(self) -> None: + """主运行循环""" + # 初始化 + if not self.initialize(): + return + + logger.info("系统初始化完成,开始运行...") + + # 主循环 + while True: + try: + # 更新进度条 + self.update_progress_bar() + + # 获取当前K线时间戳 + current_timestamp = TimeUtils.get_current_kline_timestamp() + + # 检查是否是新K线(避免重复处理) + if self.kline_3 and self.kline_3["id"] == current_timestamp: + time.sleep(10) + continue + + # 获取并更新K线数据 + if not self.fetch_and_update_kline(): + time.sleep(10) + continue + + # 处理交易逻辑 + self.process_trading_logic() + + # 重置进度条 + if self.pbar: + self.pbar.reset() + + # 等待一段时间 + time.sleep(5) + + except KeyboardInterrupt: + logger.info("收到中断信号,正在退出...") + break + except Exception as e: + logger.error(f"运行出错: {e}") + MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True) + time.sleep(10) + + def action(self) -> None: + """兼容旧接口的方法名""" + self.run() + + +if __name__ == '__main__': + transaction = WeexTransaction(tge_id=191303) + transaction.action() + diff --git a/交易/weex-结构优化.py b/交易/weex-结构优化.py new file mode 100644 index 0000000..9783368 --- /dev/null +++ b/交易/weex-结构优化.py @@ -0,0 +1,885 @@ +""" +WEEX ETH-USDT 永续合约自动交易系统 +优化版本:改进代码结构、模块化和可读性 +""" +import time +import datetime +from typing import Optional, Dict, List, Tuple +from tqdm import tqdm +from loguru import logger +from DrissionPage import ChromiumOptions, ChromiumPage +from curl_cffi import requests + +from 交易.tools import send_dingtalk_message + + +# ==================== 配置常量 ==================== +class Config: + """配置类:集中管理所有配置常量""" + # TGE浏览器配置 + TGE_URL = "http://127.0.0.1:50326" + TGE_AUTHORIZATION = "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91" + + # WEEX API配置 + CONTRACT_ID = "10000002" + PRODUCT_CODE = "cmt_ethusdt" + KLINE_TYPE = "MINUTE_30" + KLINE_LIMIT = 300 + + # 交易页面URL + TRADING_URL = "https://www.weeaxs.site/zh-CN/futures/ETH-USDT" + + # 交易配置 + POSITION_RATIO = 100 # 开仓金额比例(余额的1/100) + + # 重试配置 + MAX_RETRY_ATTEMPTS = 3 + RETRY_DELAY = 1 # 秒 + + +# ==================== K线分析器 ==================== +class KlineAnalyzer: + """K线分析器:负责K线数据分析和信号判断""" + + @staticmethod + def is_bullish(candle: Dict) -> bool: + """判断是否为阳线""" + return float(candle['close']) > float(candle['open']) + + @staticmethod + def is_bearish(candle: Dict) -> bool: + """判断是否为阴线""" + return float(candle['close']) < float(candle['open']) + + @staticmethod + def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]: + """ + 包住形态信号判定(30分钟K线): + - 前跌后涨包住 -> 做多 + - 前涨后跌包住 -> 做空 + + Returns: + (direction, signal_type): direction为"long"或"short",signal_type为信号类型 + """ + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + # 前跌后涨包住 -> 做多 + if (KlineAnalyzer.is_bullish(curr) and + KlineAnalyzer.is_bearish(prev) and + int(c_open) <= int(p_close) and + int(c_close) >= int(p_open)): + return "long", "bear_bull_engulf" + + # 前涨后跌包住 -> 做空 + if (KlineAnalyzer.is_bearish(curr) and + KlineAnalyzer.is_bullish(prev) and + int(c_open) >= int(p_close) and + int(c_close) <= int(p_open)): + return "short", "bull_bear_engulf" + + return None, None + + +# ==================== 浏览器管理器 ==================== +class BrowserManager: + """浏览器管理器:负责浏览器的启动、接管和标签页管理""" + + def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict): + self.tge_id = tge_id + self.tge_url = tge_url + self.tge_headers = tge_headers + self.tge_port: Optional[int] = None + self.page: Optional[ChromiumPage] = None + + def open_browser(self) -> bool: + """打开浏览器并获取端口""" + try: + response = requests.post( + f"{self.tge_url}/api/browser/start", + json={"envId": self.tge_id}, + headers=self.tge_headers, + timeout=10 + ) + self.tge_port = response.json()["data"]["port"] + logger.success(f"成功打开浏览器,端口:{self.tge_port}") + return True + except Exception as e: + logger.error(f"打开浏览器失败: {e}") + return False + + def take_over_browser(self) -> bool: + """接管浏览器""" + if not self.tge_port: + logger.error("浏览器端口未设置") + return False + + try: + co = ChromiumOptions() + co.set_local_port(self.tge_port) + self.page = ChromiumPage(addr_or_opts=co) + self.page.set.window.max() + logger.success("成功接管浏览器") + return True + except Exception as e: + logger.error(f"接管浏览器失败: {e}") + return False + + def close_extra_tabs(self) -> bool: + """关闭多余的标签页,只保留第一个""" + if not self.page: + return False + + try: + tabs = self.page.get_tabs() + closed_count = 0 + for idx, tab in enumerate(tabs): + if idx == 0: + continue + tab.close() + closed_count += 1 + if closed_count > 0: + logger.info(f"已关闭{closed_count}个多余标签页") + return True + except Exception as e: + logger.warning(f"关闭多余标签页失败: {e}") + return False + + +# ==================== WEEX API客户端 ==================== +class WEEXApiClient: + """WEEX API客户端:负责与WEEX API的交互""" + + def __init__(self): + self.session = requests.Session() + self.headers: Optional[Dict] = None + + def update_headers(self, headers: Dict) -> None: + """更新session的headers""" + if not self.headers: + self.session.headers = headers + else: + self.session.headers.update(headers) + self.headers = headers + + def get_kline_data(self, contract_id: str = Config.CONTRACT_ID, + product_code: str = Config.PRODUCT_CODE, + kline_type: str = Config.KLINE_TYPE, + limit: int = Config.KLINE_LIMIT) -> List[Dict]: + """ + 获取K线数据 + + Args: + contract_id: 合约ID + product_code: 产品代码 + kline_type: K线类型 + limit: 返回数量限制 + + Returns: + K线数据列表 + """ + params = { + 'contractId': contract_id, + 'productCode': product_code, + 'priceType': 'LAST_PRICE', + 'klineType': kline_type, + 'limit': str(limit), + 'timeZone': 'string', + 'languageType': '1', + 'sign': 'SIGN', + } + + for attempt in range(Config.MAX_RETRY_ATTEMPTS): + logger.info(f"获取最新K线数据:第{attempt + 1}/{Config.MAX_RETRY_ATTEMPTS}次尝试...") + try: + response = self.session.get( + 'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2', + params=params, + timeout=15 + ) + + # 检查响应状态码 + if response.status_code != 200: + logger.warning(f"获取K线数据失败,HTTP状态码:{response.status_code}") + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + continue + + # 尝试解析JSON + try: + response_data = response.json() + except Exception as json_error: + logger.warning(f"JSON解析失败,响应内容:{response.text[:200]}...") + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + continue + + # 检查数据结构 + if "data" not in response_data or "dataList" not in response_data["data"]: + logger.warning(f"响应数据格式不正确:{response_data}") + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + continue + + result = response_data["data"]["dataList"] + kline_data = [] + for item in result: + # WEEX API返回格式: [close, high, low, open, id] + kline_data.append({ + 'id': int(item[4]), + 'open': float(item[3]), + 'high': float(item[1]), + 'low': float(item[2]), + 'close': float(item[0]) + }) + logger.success(f"成功获取{len(kline_data)}条K线数据") + return kline_data + except Exception as e: + logger.warning(f"获取K线数据失败(第{attempt + 1}次尝试): {e}") + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + + logger.error("获取K线数据失败:已重试3次仍未成功") + return [] + + def get_available_balance(self) -> Optional[float]: + """获取可用余额""" + for attempt in range(Config.MAX_RETRY_ATTEMPTS): + try: + response = self.session.post( + 'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new', + timeout=15 + ) + balance = float(response.json()["data"]["newContract"]["balanceList"][0]["available"]) + logger.success(f"成功获取可用余额: {balance}") + return balance + except Exception as e: + logger.warning(f"获取余额失败(第{attempt + 1}次尝试): {e}") + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + + logger.error("获取余额失败:已重试3次仍未成功") + return None + + def get_position_status(self) -> Tuple[bool, Optional[Dict]]: + """ + 获取持仓状态 + + Returns: + (success, position_data): success为是否成功,position_data为持仓信息 + """ + json_data = { + 'filterContractIdList': [10000002], + 'limit': 100, + 'languageType': 0, + 'sign': 'SIGN', + 'timeZone': 'string', + } + + for attempt in range(Config.MAX_RETRY_ATTEMPTS): + try: + response = self.session.post( + 'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderFillTransactionPage', + json=json_data, + timeout=15 + ) + + datas = response.json()["data"]["dataList"] + + if not datas: + return True, None + + latest_order = datas[0] + return True, latest_order + except Exception as e: + logger.warning(f"获取持仓状态失败(第{attempt + 1}次尝试): {e}") + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + + return False, None + + +# ==================== Token管理器 ==================== +class TokenManager: + """Token管理器:负责获取和更新认证token""" + + def __init__(self, api_client: WEEXApiClient, page: ChromiumPage): + self.api_client = api_client + self.page = page + + def get_token(self) -> bool: + """从浏览器中获取U-TOKEN""" + tab = self.page.new_tab() + tab.listen.start("/user/security/getLanguageType") + + try: + for attempt in range(Config.MAX_RETRY_ATTEMPTS): + try: + tab.get(url=Config.TRADING_URL) + res = tab.listen.wait(timeout=5) + + if res.request.headers.get("U-TOKEN"): + headers = res.request.headers + self.api_client.update_headers(headers) + logger.success("成功获取token") + return True + except Exception as e: + logger.warning(f"获取token尝试{attempt + 1}失败: {e}") + if attempt < Config.MAX_RETRY_ATTEMPTS - 1: + time.sleep(Config.RETRY_DELAY) + + logger.error("获取token失败:已重试3次仍未成功") + return False + finally: + tab.close() + + +# ==================== 交易执行器 ==================== +class TradingExecutor: + """交易执行器:负责执行交易操作""" + + def __init__(self, page: ChromiumPage, api_client: WEEXApiClient): + self.page = page + self.api_client = api_client + + def navigate_to_trading_page(self) -> bool: + """导航到交易页面(选择市价)""" + try: + self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click() + time.sleep(1) + return True + except Exception as e: + logger.error(f"导航到交易页面失败: {e}") + return False + + def close_all_positions(self) -> bool: + """平仓所有持仓(闪电平仓)""" + try: + self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True) + time.sleep(1) + self.page.ele('x://*[contains(text(), "闪电平仓")]').click() + time.sleep(3) + logger.success("成功执行平仓操作") + return True + except Exception as e: + logger.error(f"平仓失败: {e}") + return False + + def open_long(self, amount: float) -> bool: + """开多""" + try: + self.page.ele('x://input[@placeholder="请输入数量"]').input(amount) + time.sleep(1) + self.page.ele('x://*[contains(text(), "买入开多")]').click() + logger.success(f"成功开多,金额:{amount}") + return True + except Exception as e: + logger.error(f"开多失败: {e}") + return False + + def open_short(self, amount: float) -> bool: + """开空""" + try: + self.page.ele('x://input[@placeholder="请输入数量"]').input(amount) + time.sleep(1) + self.page.ele('x://*[contains(text(), "卖出开空")]').click() + logger.success(f"成功开空,金额:{amount}") + return True + except Exception as e: + logger.error(f"开空失败: {e}") + return False + + def execute_trade(self, direction: str, current_position: int, amount: float) -> bool: + """ + 执行交易操作 + + Args: + direction: 交易方向 "long" 或 "short" + current_position: 当前持仓状态 -1:空, 0:无, 1:多 + amount: 交易金额 + + Returns: + 是否成功 + """ + if not self.navigate_to_trading_page(): + return False + + try: + # 开多 + if direction == "long": + if current_position == 0: + logger.info(f"{datetime.datetime.now()},开多,金额:{amount}") + return self.open_long(amount) + elif current_position == -1: + logger.info(f"{datetime.datetime.now()},反手平空做多,金额:{amount}") + if self.close_all_positions(): + time.sleep(1) + return self.open_long(amount) + + # 开空 + elif direction == "short": + if current_position == 0: + logger.info(f"{datetime.datetime.now()},开空,金额:{amount}") + return self.open_short(amount) + elif current_position == 1: + logger.info(f"{datetime.datetime.now()},反手平多做空,金额:{amount}") + if self.close_all_positions(): + time.sleep(1) + return self.open_short(amount) + + return False + except Exception as e: + logger.error(f"执行交易失败: {e}") + return False + + +# ==================== 持仓管理器 ==================== +class PositionManager: + """持仓管理器:负责持仓状态管理和止损""" + + # 持仓状态常量 + POSITION_SHORT = -1 # 做空 + POSITION_NONE = 0 # 无持仓 + POSITION_LONG = 1 # 做多 + + def __init__(self, trading_executor: TradingExecutor): + self.trading_executor = trading_executor + self.current_position: int = self.POSITION_NONE + self.position_data: Optional[Dict] = None + + def update_position(self, position_data: Optional[Dict]) -> None: + """ + 更新持仓状态 + + Args: + position_data: 持仓数据字典 + """ + self.position_data = position_data + + if not position_data: + self.current_position = self.POSITION_NONE + return + + direction = position_data.get("legacyOrderDirection") + if direction == "OPEN_LONG": + self.current_position = self.POSITION_LONG + elif direction == "OPEN_SHORT": + self.current_position = self.POSITION_SHORT + else: + self.current_position = self.POSITION_NONE + + def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool: + """ + 检查止损条件 + + Args: + kline_1: 倒数第二根K线 + kline_2: 倒数第一根K线 + + Returns: + 是否执行了止损 + """ + try: + # 持多仓,连续两根阴线,平多 + if self.current_position == self.POSITION_LONG: + if (KlineAnalyzer.is_bearish(kline_1) and + KlineAnalyzer.is_bearish(kline_2)): + logger.success( + f"{datetime.datetime.now()},止损信号:连续两根阴线,平多" + ) + if self.trading_executor.close_all_positions(): + self.current_position = self.POSITION_NONE + return True + + # 持空仓,连续两根阳线,平空 + elif self.current_position == self.POSITION_SHORT: + if (KlineAnalyzer.is_bullish(kline_1) and + KlineAnalyzer.is_bullish(kline_2)): + logger.success( + f"{datetime.datetime.now()},止损信号:连续两根阳线,平空" + ) + if self.trading_executor.close_all_positions(): + self.current_position = self.POSITION_NONE + return True + except Exception as e: + logger.error(f"止损检查失败: {e}") + + return False + + +# ==================== 消息发送器 ==================== +class MessageSender: + """消息发送器:负责发送钉钉消息""" + + @staticmethod + def send_dingtalk_message(message_content: str, is_error: bool = False) -> None: + """ + 发送钉钉消息 + + Args: + message_content: 消息内容 + is_error: 是否为错误消息(错误消息会发送多次) + """ + if is_error: + prefix = "❌weex:" + count = 15 + else: + prefix = "🔔weex:" + count = 1 + + for _ in range(count): + send_dingtalk_message(f"{prefix}{message_content}") + + @staticmethod + def format_position_message(position_data: Optional[Dict], + current_price: float, + available_balance: float) -> str: + """ + 格式化持仓信息消息 + + Args: + position_data: 持仓数据 + current_price: 当前价格 + available_balance: 可用余额 + """ + if position_data: + # 提取持仓信息 + fill_size = float(position_data['fillSize']) # 持仓量,单位:ETH + fill_value = float(position_data['fillValue']) # 成交名义价值 USDT + open_avg_price = fill_value / fill_size if fill_size > 0 else 0 # 开仓均价 + + position_side = position_data.get('positionSide', '') + legacy_direction = position_data.get('legacyOrderDirection', '') + + # 确定方向 + if position_side == 'SHORT' or legacy_direction == 'OPEN_SHORT': + direction = "空" + elif position_side == 'LONG' or legacy_direction == 'OPEN_LONG': + direction = "多" + else: + direction = "无" + + # 计算浮动盈亏 + if direction in ["多", "空"] and open_avg_price > 0: + if direction == "多": + unrealized_pnl = fill_size * (current_price - open_avg_price) + pnl_rate = (current_price - open_avg_price) / open_avg_price * 100 + else: + unrealized_pnl = fill_size * (open_avg_price - current_price) + pnl_rate = (open_avg_price - current_price) / open_avg_price * 100 + + pnl_str = f"{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)" + else: + pnl_str = "0.00 USDT" + + # 当前持仓名义价值 + current_value = fill_size * current_price + + return ( + "**【WEEX ETHUSDT 永续持仓监控】**\n\n" + f"**持仓方向**:{direction}\n" + f"**当前现价**:{current_price:.2f} USDT\n" + f"**开仓均价**:{open_avg_price:.2f} USDT\n" + f"**持仓数量(eth)**:{fill_size:.3f} ETH\n" + f"**持仓数量(usdt)**:{fill_value / 100:.2f} USDT\n" + f"**名义价值**:{current_value:.2f} USDT\n" + f"**浮动盈亏**:{pnl_str}\n" + f"**账户可用余额**:{available_balance:.2f} USDT" + ) + else: + return ( + "**【WEEX ETHUSDT 永续持仓监控】**\n\n" + f"**持仓方向**:无\n" + f"**当前现价**:{current_price:.2f} USDT\n" + f"**账户可用余额**:{available_balance:.2f} USDT" + ) + + +# ==================== 时间工具类 ==================== +class TimeUtils: + """时间工具类""" + + @staticmethod + def get_current_kline_timestamp() -> int: + """ + 获取当前K线的时间戳(30分钟K线的整点或30分时刻,毫秒) + + Returns: + 时间戳(毫秒) + """ + current_timestamp = time.time() + current_datetime = datetime.datetime.fromtimestamp(current_timestamp) + + # 计算距离当前时间最近的整点或30分时刻 + if current_datetime.minute < 30: + target_datetime = current_datetime.replace( + minute=0, second=0, microsecond=0 + ) + else: + target_datetime = current_datetime.replace( + minute=30, second=0, microsecond=0 + ) + + return int(target_datetime.timestamp()) * 1000 + + @staticmethod + def get_formatted_time() -> str: + """获取格式化的当前时间字符串""" + timestamp = time.time() + local_time = time.localtime(timestamp) + return time.strftime("%Y-%m-%d %H:%M:%S", local_time) + + @staticmethod + def get_progress_bar_value() -> int: + """获取进度条的当前值(0-29)""" + current_time = time.localtime() + current_minute = current_time.tm_min + return current_minute if current_minute < 30 else current_minute - 30 + + +# ==================== 主交易类 ==================== +class WeexTransaction: + """WEEX自动交易主类""" + + def __init__(self, tge_id: int): + # 配置 + self.tge_id = tge_id + self.tge_headers = { + "Authorization": Config.TGE_AUTHORIZATION, + "Content-Type": "application/json" + } + + # 组件初始化 + self.browser_manager = BrowserManager(tge_id, Config.TGE_URL, self.tge_headers) + self.api_client = WEEXApiClient() + self.position_manager = None # 需要在浏览器初始化后创建 + self.trading_executor = None # 需要在浏览器初始化后创建 + self.token_manager = None # 需要在浏览器初始化后创建 + + # 状态 + self.pbar: Optional[tqdm] = None + self.last_kline_timestamp: Optional[int] = None + self.kline_1: Optional[Dict] = None + self.kline_2: Optional[Dict] = None + self.kline_3: Optional[Dict] = None + + def initialize(self) -> bool: + """初始化所有组件""" + # 打开浏览器 + if not self.browser_manager.open_browser(): + logger.error("打开浏览器失败") + MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True) + return False + + # 接管浏览器 + if not self.browser_manager.take_over_browser(): + logger.error("接管浏览器失败") + MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True) + return False + + # 关闭多余标签页 + self.browser_manager.close_extra_tabs() + + # 初始化需要page的组件 + page = self.browser_manager.page + self.trading_executor = TradingExecutor(page, self.api_client) + self.position_manager = PositionManager(self.trading_executor) + self.token_manager = TokenManager(self.api_client, page) + + # 打开交易页面 + page.get(url=Config.TRADING_URL) + + # 初始化时先获取一次token + if not self.token_manager.get_token(): + logger.warning("初始化时获取token失败,将在获取K线数据时重试") + + # 初始化进度条 + self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) + + logger.success("系统初始化完成") + return True + + def update_progress_bar(self) -> None: + """更新进度条""" + if self.pbar: + self.pbar.n = TimeUtils.get_progress_bar_value() + self.pbar.refresh() + + def fetch_and_update_kline(self) -> bool: + """获取并更新K线数据""" + # 先获取token(获取K线数据需要token) + if not self.token_manager.get_token(): + logger.warning("获取token失败,无法获取K线数据") + return False + + kline_data = self.api_client.get_kline_data() + if not kline_data: + logger.warning("获取K线数据失败") + MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True) + return False + + # 排序并取最后三根 + sorted_data = sorted(kline_data, key=lambda x: x["id"]) + if len(sorted_data) < 3: + logger.error("K线数据不足3根") + return False + + self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:] + + # 检查是否是新K线 + current_kline_id = self.kline_3["id"] + current_timestamp = TimeUtils.get_current_kline_timestamp() + + # 判断K线时间是否匹配当前时间,并且是新K线 + if current_kline_id != current_timestamp: + return False # K线时间不匹配 + + if self.last_kline_timestamp == current_timestamp: + return False # 已经处理过这个K线 + + self.last_kline_timestamp = current_timestamp + return True + + def sync_position_status(self) -> bool: + """同步持仓状态""" + success, position_data = self.api_client.get_position_status() + if not success: + logger.error("获取持仓状态失败") + MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True) + return False + + self.position_manager.update_position(position_data) + return True + + def process_trading_logic(self) -> None: + """处理交易逻辑""" + page = self.browser_manager.page + + # 刷新token(因为可能已经过期,但这不是必须的,因为获取K线时已经获取过了) + # 如果token有效,这里会快速返回;如果无效,会重新获取 + self.token_manager.get_token() + + # 刷新页面 + page.get(url=Config.TRADING_URL) + + # 同步持仓状态 + if not self.sync_position_status(): + return + + # 检查止损 + try: + self.position_manager.check_stop_loss(self.kline_1, self.kline_2) + except Exception as e: + logger.error(f"止损检查出错: {e}") + MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True) + return + + # 检查交易信号 + direction, signal_type = KlineAnalyzer.check_engulfing_signal( + prev=self.kline_1, + curr=self.kline_2 + ) + + if direction: + # 获取可用余额 + balance = self.api_client.get_available_balance() + if balance is None: + logger.error("获取可用余额失败") + MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True) + return + + amount = balance / Config.POSITION_RATIO + + # 执行交易 + if not self.trading_executor.execute_trade( + direction=direction, + current_position=self.position_manager.current_position, + amount=amount + ): + MessageSender.send_dingtalk_message( + f"交易执行失败,方向:{direction}", + is_error=True + ) + return + + # 更新持仓状态 + if direction == "long": + self.position_manager.current_position = PositionManager.POSITION_LONG + elif direction == "short": + self.position_manager.current_position = PositionManager.POSITION_SHORT + + # 发送交易消息 + MessageSender.send_dingtalk_message( + f"信号:{direction},开仓金额:{amount:.2f}", + is_error=False + ) + + # 再次同步持仓状态(交易后) + if not self.sync_position_status(): + return + + # 发送持仓信息 + balance = self.api_client.get_available_balance() + current_price = float(self.kline_3["close"]) + if balance is not None: + position_info = self.position_manager.position_data + msg = MessageSender.format_position_message( + position_info, + current_price, + balance + ) + MessageSender.send_dingtalk_message(msg, is_error=False) + + def run(self) -> None: + """主运行循环""" + # 初始化 + if not self.initialize(): + return + + logger.info("系统初始化完成,开始运行...") + + # 主循环 + while True: + try: + # 更新进度条 + self.update_progress_bar() + + # 获取当前K线时间戳 + current_timestamp = TimeUtils.get_current_kline_timestamp() + + # 检查是否已经处理过这个K线 + if self.last_kline_timestamp == current_timestamp: + time.sleep(10) + continue + + # 获取并更新K线数据 + if not self.fetch_and_update_kline(): + time.sleep(10) + continue + + # 处理交易逻辑 + self.process_trading_logic() + + # 重置进度条 + if self.pbar: + self.pbar.reset() + + # 等待一段时间 + time.sleep(5) + + except KeyboardInterrupt: + logger.info("收到中断信号,正在退出...") + break + except Exception as e: + logger.error(f"运行出错: {e}") + MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True) + time.sleep(10) + + def action(self) -> None: + """兼容旧接口的方法名""" + self.run() + + +if __name__ == '__main__': + transaction = WeexTransaction(tge_id=146473) + transaction.action()