"""WebSea ETH-USDT perpetual-contract automated trading system.

Optimised version: improved code structure, modularity and readability.
"""

import datetime
import time
from typing import Dict, List, Optional, Tuple

from curl_cffi import requests
from DrissionPage import ChromiumOptions, ChromiumPage
from loguru import logger
from tqdm import tqdm

from 交易.tools import send_dingtalk_message

# Default endpoint of the local service that starts the browser environment.
_DEFAULT_TGE_URL = "http://127.0.0.1:50326"
# NOTE(review): this credential is committed in source. It is kept as the
# default only so existing callers keep working; prefer passing
# ``tge_authorization`` explicitly and rotating this value.
_DEFAULT_TGE_AUTHORIZATION = (
    "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91"
)


class KlineAnalyzer:
    """Candlestick helpers: candle colour and engulfing-signal detection."""

    @staticmethod
    def is_bullish(candle: Dict) -> bool:
        """Return True when the candle closed above its open."""
        return float(candle['close']) > float(candle['open'])

    @staticmethod
    def is_bearish(candle: Dict) -> bool:
        """Return True when the candle closed below its open."""
        return float(candle['close']) < float(candle['open'])

    @staticmethod
    def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]:
        """Detect an engulfing pattern between two consecutive candles.

        - bearish ``prev`` followed by a bullish ``curr`` closing at or above
          ``prev``'s open -> go long
        - bullish ``prev`` followed by a bearish ``curr`` closing at or below
          ``prev``'s open -> go short

        Args:
            prev: Earlier candle; must provide 'open' and 'close'.
            curr: Later candle; must provide 'open' and 'close'.

        Returns:
            ``(direction, signal_type)`` where direction is "long" or "short",
            or ``(None, None)`` when no pattern matches.
        """
        curr_close = float(curr['close'])
        prev_open = float(prev['open'])

        # NOTE(review): both prices are truncated with int() before being
        # compared, so fractional differences are ignored. Presumably a
        # deliberate tolerance - confirm before tightening.
        if (KlineAnalyzer.is_bullish(curr)
                and KlineAnalyzer.is_bearish(prev)
                and int(curr_close) >= int(prev_open)):
            return "long", "bear_bull_engulf"

        if (KlineAnalyzer.is_bearish(curr)
                and KlineAnalyzer.is_bullish(prev)
                and int(curr_close) <= int(prev_open)):
            return "short", "bull_bear_engulf"

        return None, None


class BrowserManager:
    """Starts the browser environment, attaches to it and manages its tabs."""

    def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict):
        self.tge_id = tge_id
        self.tge_url = tge_url
        self.tge_headers = tge_headers
        # Debug port reported by the start endpoint; set by open_browser().
        self.tge_port: Optional[int] = None
        # Attached page object; set by take_over_browser().
        self.page: Optional[ChromiumPage] = None

    def open_browser(self) -> bool:
        """Ask the service to start the browser and remember its port.

        Returns:
            True when a port was obtained, False on any failure (logged).
        """
        try:
            response = requests.post(
                f"{self.tge_url}/api/browser/start",
                json={"envId": self.tge_id},
                headers=self.tge_headers,
                timeout=10
            )
            self.tge_port = response.json()["data"]["port"]
            return True
        except Exception as e:
            logger.error(f"打开浏览器失败: {e}")
            return False

    def take_over_browser(self) -> bool:
        """Attach to the browser on the stored port and maximise its window.

        Returns:
            True on success, False when no port is known or attaching fails.
        """
        if not self.tge_port:
            logger.error("浏览器端口未设置")
            return False

        try:
            co = ChromiumOptions()
            co.set_local_port(self.tge_port)
            self.page = ChromiumPage(addr_or_opts=co)
            self.page.set.window.max()
            return True
        except Exception as e:
            logger.error(f"接管浏览器失败: {e}")
            return False

    def close_extra_tabs(self) -> bool:
        """Close every tab except the first one returned by ``get_tabs()``.

        Returns:
            True on success, False when no page is attached or closing fails.
        """
        if not self.page:
            return False

        try:
            for idx, tab in enumerate(self.page.get_tabs()):
                if idx > 0:
                    tab.close()
            return True
        except Exception as e:
            logger.warning(f"关闭多余标签页失败: {e}")
            return False


class ApiClient:
    """HTTP client for the WebSea web API, sharing one session."""

    def __init__(self):
        self.session = requests.Session()
        self.headers: Dict = {}
        self.cookies: Dict = {}

    def update_session(self, cookies: Dict, headers: Dict) -> None:
        """Merge the given cookies and headers into the shared session."""
        self.session.cookies.update(cookies)
        self.session.headers.update(headers)

    def get_kline_data(self, symbol: str = 'ETH-USDT', period: str = '30min',
                       days: int = 10) -> List[Dict]:
        """Fetch candles for the last ``days`` days (up to 3 attempts).

        Args:
            symbol: Trading pair symbol.
            period: Candle period.
            days: How many days back the query window starts.

        Returns:
            A list of dicts with 'id' (int) and 'open'/'high'/'low'/'close'
            (float), or an empty list when every attempt failed.
        """
        now_ts = int(time.time())
        start_ts = int(time.time() - 86400 * days)
        params = {
            'symbol': symbol,
            'period': period,
            'start': str(start_ts),
            'end': str(now_ts),
        }

        for attempt in range(3):
            logger.info(f"获取最新K线数据:第{attempt + 1}次尝试...")
            try:
                response = self.session.get(
                    'https://capi.websea.com/webApi/market/getKline',
                    params=params,
                    timeout=5
                )
                candles = response.json()["result"]['data']
                return [
                    {
                        'id': int(candle["id"]),
                        'open': float(candle["open"]),
                        'high': float(candle["high"]),
                        'low': float(candle["low"]),
                        'close': float(candle["close"]),
                    }
                    for candle in candles
                ]
            except Exception as e:
                logger.warning(f"获取K线数据失败: {e}")
                if attempt < 2:
                    time.sleep(1)

        return []

    def get_available_balance(self) -> Optional[float]:
        """Return the account's available balance, or None after 3 failures."""
        for attempt in range(3):
            try:
                response = self.session.get(
                    'https://capi.websea.com/webApi/asset/account',
                    timeout=5
                )
                return float(response.json()["result"]["available"])
            except Exception as e:
                logger.warning(f"获取余额失败: {e}")
                if attempt < 2:
                    time.sleep(1)

        return None

    def get_position_status(self) -> Tuple[bool, Optional[Dict]]:
        """Fetch the first open position (up to 3 attempts).

        Returns:
            ``(success, position_data)``. On success ``position_data`` is
            ``{"type": ..., "position": ...}``; ``type`` is 0 with
            ``position`` None when nothing is held. On failure the result is
            ``(False, None)``.
        """
        for attempt in range(3):
            try:
                response = self.session.get(
                    'https://capi.websea.com/webApi/entrust/holdPosition',
                    timeout=5
                )
                resp_data = response.json()

                if not resp_data["result"]:
                    return True, {"type": 0, "position": None}

                position = resp_data["result"][0]
                position_info = {
                    "type": position["type"],  # 1: long, 2: short
                    "position": position
                }
                return True, position_info
            except Exception as e:
                logger.warning(f"获取持仓状态失败: {e}")
                if attempt < 2:
                    time.sleep(1)

        return False, None


class TokenManager:
    """Captures auth cookies/token from the browser and feeds the ApiClient."""

    # Seconds to wait for the watched request before giving up, so a page
    # that never fires it cannot block the main loop forever.
    LISTEN_TIMEOUT = 60

    def __init__(self, api_client: ApiClient, page: ChromiumPage):
        self.api_client = api_client
        self.page = page

    def get_token(self) -> bool:
        """Open the futures page in a new tab and capture its credentials.

        Listens for the zendesk URL request, copies its cookies (skipping
        names starting with ':') and its ``token`` header, and updates the
        API session when a ``shareToken`` cookie is present. The temporary
        tab is always closed, including on failure.

        Returns:
            True when the session was updated, False otherwise (logged).
        """
        tab = None
        try:
            tab = self.page.new_tab()
            tab.listen.start("api.websea.com/webApi/zendesk/url")
            tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT")
            res = tab.listen.wait(timeout=self.LISTEN_TIMEOUT)
            if res is None or res is False:
                logger.warning("等待token请求超时")
                return False

            cookies = {
                cookie["name"]: cookie["value"]
                for cookie in res.request.cookies
                if not cookie["name"].startswith(':')
            }

            headers = {}
            if "token" in res.request.headers:
                headers["token"] = res.request.headers["token"]

            if cookies.get('shareToken'):
                self.api_client.update_session(cookies, headers)
                return True

            logger.warning("未获取到shareToken")
            return False
        except Exception as e:
            logger.error(f"获取token失败: {e}")
            return False
        finally:
            # Close the helper tab on every path so failures do not pile up
            # one leaked tab per cycle.
            if tab is not None:
                try:
                    tab.close()
                except Exception as e:
                    logger.warning(f"关闭token标签页失败: {e}")


class TradingExecutor:
    """Performs trades by driving the trading page's UI."""

    def __init__(self, page: ChromiumPage, api_client: ApiClient):
        self.page = page
        self.api_client = api_client

    def navigate_to_trading_page(self) -> bool:
        """Click the "市价" (market) element; return False on failure."""
        try:
            self.page.ele('x://*[normalize-space(text())= "市价"]', timeout=15).click()
            time.sleep(1)
            return True
        except Exception as e:
            logger.error(f"导航到交易页面失败: {e}")
            return False

    def close_all_positions(self) -> bool:
        """Scroll to and click "市价全平" (market close-all).

        Returns:
            True when the clicks were issued, False on failure (logged).
        """
        try:
            self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
            time.sleep(1)
            self.page.actions.click('x://*[normalize-space(text())= "市价全平"]')
            time.sleep(3)
            return True
        except Exception as e:
            logger.error(f"平仓失败: {e}")
            return False

    def open_long(self, amount: float) -> bool:
        """Type ``amount`` into the amount box and click buy/long."""
        try:
            self.page.ele('x://*[@id="amountInput"]').input(amount)
            time.sleep(1)
            self.page.actions.click('x://*[normalize-space(text())= "买入/做多"]')
            return True
        except Exception as e:
            logger.error(f"开多失败: {e}")
            return False

    def open_short(self, amount: float) -> bool:
        """Type ``amount`` into the amount box and click sell/short."""
        try:
            self.page.ele('x://*[@id="amountInput"]').input(amount)
            time.sleep(1)
            self.page.actions.click('x://*[normalize-space(text())= "卖出/做空"]')
            return True
        except Exception as e:
            logger.error(f"开空失败: {e}")
            return False

    def execute_trade(self, direction: str, current_position: int, amount: float) -> bool:
        """Open or reverse a position according to ``direction``.

        Args:
            direction: "long" or "short".
            current_position: -1 short, 0 flat, 1 long.
            amount: Amount typed into the order form.

        Returns:
            False when navigation or any UI step fails; True when every step
            succeeded or when nothing had to be done (for example already
            holding the requested side).
        """
        if not self.navigate_to_trading_page():
            return False

        try:
            # The step helpers swallow their own exceptions and report via
            # their return value, so that value must be propagated. When
            # reversing, the new side is only opened after the close worked.
            if direction == "long":
                if current_position == 0:
                    logger.success(f"{datetime.datetime.now()},开多,金额:{amount}")
                    return self.open_long(amount)
                if current_position == -1:
                    logger.success(f"{datetime.datetime.now()},反手平空做多,金额:{amount}")
                    return self.close_all_positions() and self.open_long(amount)
            elif direction == "short":
                if current_position == 0:
                    logger.success(f"{datetime.datetime.now()},开空,金额:{amount}")
                    return self.open_short(amount)
                if current_position == 1:
                    logger.success(f"{datetime.datetime.now()},反手平多做空,金额:{amount}")
                    return self.close_all_positions() and self.open_short(amount)

            return True
        except Exception as e:
            logger.error(f"执行交易失败: {e}")
            return False


class PositionManager:
    """Tracks the locally known position and applies the stop-loss rule."""

    def __init__(self, trading_executor: TradingExecutor):
        self.trading_executor = trading_executor
        self.current_position: int = 0  # -1: short, 0: flat, 1: long

    def update_position(self, position_type: int) -> None:
        """Overwrite the locally tracked position."""
        self.current_position = position_type

    def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool:
        """Close the position after two consecutive adverse candles.

        A long is closed when both candles are bearish; a short is closed
        when both are bullish.

        Args:
            kline_1: The earlier of the two candles.
            kline_2: The later of the two candles.

        Returns:
            True only when a stop-loss close was actually carried out.
        """
        try:
            if self.current_position == 1:
                if (KlineAnalyzer.is_bearish(kline_1)
                        and KlineAnalyzer.is_bearish(kline_2)):
                    logger.success(
                        f"{datetime.datetime.now()},止损信号:连续两根阴线,平多"
                    )
                    return self._close_and_mark_flat()
            elif self.current_position == -1:
                if (KlineAnalyzer.is_bullish(kline_1)
                        and KlineAnalyzer.is_bullish(kline_2)):
                    logger.success(
                        f"{datetime.datetime.now()},止损信号:连续两根阳线,平空"
                    )
                    return self._close_and_mark_flat()
        except Exception as e:
            logger.error(f"止损检查失败: {e}")

        return False

    def _close_and_mark_flat(self) -> bool:
        """Close all positions; mark the state flat only if that succeeded."""
        if not self.trading_executor.close_all_positions():
            logger.error("止损平仓失败,持仓状态保持不变")
            return False
        self.current_position = 0
        return True


class MessageSender:
    """Formats and sends DingTalk notifications."""

    @staticmethod
    def send_dingtalk_message(message_content: str, is_error: bool = False) -> None:
        """Send a prefixed DingTalk message.

        Args:
            message_content: Message text.
            is_error: When True the message gets the error prefix and is
                sent 15 times; otherwise it is sent once.
        """
        if is_error:
            prefix = "❌websea:"
            count = 15
        else:
            prefix = "🔔websea:"
            count = 1

        for _ in range(count):
            send_dingtalk_message(f"{prefix}{message_content}")

    @staticmethod
    def format_position_message(position_data: Optional[Dict], available_balance: float) -> str:
        """Build the position report text.

        Args:
            position_data: Dict as returned by ApiClient.get_position_status;
                a falsy value or a falsy "position" entry means flat.
            available_balance: Available account balance to display.

        Returns:
            The multi-line report string.
        """
        if position_data and position_data.get("position"):
            pos = position_data["position"]
            direction = "多" if int(pos['type']) == 1 else "空"
            contracts = int(pos['number'])
            eth_amount = float(pos['numberConvert'])
            open_price = float(pos['openPriceAvg'])
            current_price = float(pos['markPrice'])
            unrealized_pnl = float(pos['unProfitLoss'])
            pnl_rate = float(pos['profitRate'])

            return (
                "【WebSea ETH-USDT 永续持仓】\n"
                f"持仓方向:{direction}\n"
                f"持仓张数:{contracts} 张\n"
                f"持仓数量(eth):{eth_amount:.3f} ETH\n"
                f"持仓数量(usdt):{float(pos['numberConvertU']) / 100:.3f} USDT\n"
                f"开仓均价:{open_price:.2f} USDT\n"
                f"标记现价:{current_price:.2f} USDT\n"
                f"浮动盈亏:{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)\n"
                f"账户可用:{available_balance:.2f} USDT"
            )

        return (
            "【WebSea ETH-USDT 永续持仓】\n"
            "持仓方向:无\n"
            f"账户可用:{available_balance:.2f} USDT"
        )


class TimeUtils:
    """Time helpers for the 30-minute candle cycle."""

    @staticmethod
    def get_current_kline_timestamp() -> int:
        """Return the timestamp of the current half-hour boundary.

        The local time is floored to minute 0 or minute 30.
        """
        now = datetime.datetime.fromtimestamp(time.time())
        boundary_minute = 0 if now.minute < 30 else 30
        boundary = now.replace(minute=boundary_minute, second=0, microsecond=0)
        return int(boundary.timestamp())

    @staticmethod
    def get_formatted_time() -> str:
        """Return the current local time as 'YYYY-MM-DD HH:MM:SS'."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))

    @staticmethod
    def get_progress_bar_value() -> int:
        """Return minutes elapsed within the current half hour (0-29)."""
        current_minute = time.localtime().tm_min
        return current_minute if current_minute < 30 else current_minute - 30


class WeexTransaction:
    """Main driver of the WebSea automated trading loop."""

    def __init__(self, tge_id: int, tge_url: str = _DEFAULT_TGE_URL,
                 tge_authorization: str = _DEFAULT_TGE_AUTHORIZATION):
        """Store configuration and create the components that need no page.

        Args:
            tge_id: Environment id sent to the browser start endpoint.
            tge_url: Base URL of the browser start service.
            tge_authorization: Value of the Authorization header sent to
                that service.
        """
        # Configuration
        self.tge_id = tge_id
        self.tge_url = tge_url
        self.tge_headers = {
            "Authorization": tge_authorization,
            "Content-Type": "application/json"
        }
        self.trading_url = "https://www.websea.com/zh-CN/futures/ETH-USDT"

        # Components; the page-dependent ones are created in initialize().
        self.browser_manager = BrowserManager(self.tge_id, self.tge_url, self.tge_headers)
        self.api_client = ApiClient()
        self.position_manager: Optional[PositionManager] = None
        self.trading_executor: Optional[TradingExecutor] = None
        self.token_manager: Optional[TokenManager] = None

        # State. kline_1..kline_3 are the three most recent candles in
        # ascending id order (kline_3 is the newest).
        self.pbar: Optional[tqdm] = None
        self.last_kline_id: Optional[int] = None
        self.kline_1: Optional[Dict] = None
        self.kline_2: Optional[Dict] = None
        self.kline_3: Optional[Dict] = None

    def initialize(self) -> bool:
        """Start and attach to the browser, then build dependent components.

        Returns:
            True when everything is ready, False when the browser could not
            be started or attached (an error notification is sent).
        """
        if not self.browser_manager.open_browser():
            logger.error("打开浏览器失败")
            MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True)
            return False

        if not self.browser_manager.take_over_browser():
            logger.error("接管浏览器失败")
            MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True)
            return False

        # Best effort: a failure here is logged by the manager and ignored.
        self.browser_manager.close_extra_tabs()

        page = self.browser_manager.page
        self.trading_executor = TradingExecutor(page, self.api_client)
        self.position_manager = PositionManager(self.trading_executor)
        self.token_manager = TokenManager(self.api_client, page)

        page.get(url=self.trading_url)

        self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)

        return True

    def update_progress_bar(self) -> None:
        """Set the progress bar to the minutes elapsed in this half hour."""
        if self.pbar:
            self.pbar.n = TimeUtils.get_progress_bar_value()
            self.pbar.refresh()

    def fetch_and_update_kline(self) -> bool:
        """Refresh the three most recent candles.

        Returns:
            True when a candle with a new id appeared; False when fetching
            failed, fewer than three candles were returned, or the newest
            candle is unchanged.
        """
        kline_data = self.api_client.get_kline_data()
        if not kline_data:
            logger.warning("获取K线数据失败")
            MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True)
            return False

        # Unpacking into three names needs exactly three candles; a shorter
        # response would otherwise raise ValueError.
        if len(kline_data) < 3:
            logger.warning("K线数据不足3根")
            MessageSender.send_dingtalk_message("K线数据不足3根", is_error=True)
            return False

        sorted_data = sorted(kline_data, key=lambda x: x["id"])
        self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:]

        current_kline_id = self.kline_3["id"]
        if self.last_kline_id == current_kline_id:
            return False  # no new candle yet

        self.last_kline_id = current_kline_id
        return True

    def sync_position_status(self) -> bool:
        """Copy the exchange-side position into the PositionManager.

        Returns:
            True when the position was read (and applied), False otherwise.
        """
        success, position_info = self.api_client.get_position_status()
        if not success:
            logger.error("获取持仓状态失败")
            MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True)
            return False

        if position_info:
            # format_position_message casts this field with int(), so it may
            # arrive as a string; normalise it the same way here so "1"/"2"
            # are not silently treated as flat.
            try:
                pos_type = int(position_info.get("type") or 0)
            except (TypeError, ValueError):
                logger.error("获取持仓状态失败")
                MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True)
                return False

            if pos_type == 1:
                self.position_manager.current_position = 1
            elif pos_type == 2:
                self.position_manager.current_position = -1
            else:
                self.position_manager.current_position = 0

        return True

    def process_trading_logic(self) -> None:
        """Run one cycle: refresh auth, sync, stop-loss, signal, trade, report."""
        page = self.browser_manager.page

        # Reload the trading page.
        page.get(url=self.trading_url)

        if not self.token_manager.get_token():
            logger.error("获取token失败")
            MessageSender.send_dingtalk_message("获取token失败", is_error=True)
            return

        if not self.sync_position_status():
            return

        try:
            self.position_manager.check_stop_loss(self.kline_1, self.kline_2)
        except Exception as e:
            logger.error(f"止损检查出错: {e}")
            MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True)
            return

        direction, signal_type = KlineAnalyzer.check_engulfing_signal(
            prev=self.kline_1,
            curr=self.kline_2
        )

        if direction:
            balance = self.api_client.get_available_balance()
            if balance is None:
                logger.error("获取可用余额失败")
                MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True)
                return

            amount = balance / 100

            if not self.trading_executor.execute_trade(
                direction=direction,
                current_position=self.position_manager.current_position,
                amount=amount
            ):
                MessageSender.send_dingtalk_message(
                    f"交易执行失败,方向:{direction}",
                    is_error=True
                )
                return

            if direction == "long":
                self.position_manager.current_position = 1
            elif direction == "short":
                self.position_manager.current_position = -1

            MessageSender.send_dingtalk_message(
                f"信号:{direction},开仓金额:{amount:.2f}",
                is_error=False
            )

            # Re-sync with the exchange after trading.
            if not self.sync_position_status():
                return

        # NOTE(review): the position report is sent on every processed
        # candle, not only after a trade - the original nesting could not be
        # recovered from the collapsed source; confirm.
        balance = self.api_client.get_available_balance()
        if balance is not None:
            success, position_info = self.api_client.get_position_status()
            if success:
                msg = MessageSender.format_position_message(position_info, balance)
                MessageSender.send_dingtalk_message(msg, is_error=False)

    def run(self) -> None:
        """Initialise, then loop forever processing each new candle."""
        if not self.initialize():
            return

        logger.info("系统初始化完成,开始运行...")

        while True:
            try:
                self.update_progress_bar()

                current_timestamp = TimeUtils.get_current_kline_timestamp()

                # Skip while the newest known candle is still the current one.
                if self.kline_3 and self.kline_3["id"] == current_timestamp:
                    time.sleep(10)
                    continue

                if not self.fetch_and_update_kline():
                    time.sleep(10)
                    continue

                self.process_trading_logic()

                if self.pbar:
                    self.pbar.reset()

                time.sleep(5)

            except KeyboardInterrupt:
                logger.info("收到中断信号,正在退出...")
                break
            except Exception as e:
                logger.error(f"运行出错: {e}")
                MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True)
                time.sleep(10)

    def action(self) -> None:
        """Backward-compatible alias for run()."""
        self.run()


if __name__ == '__main__':
    transaction = WeexTransaction(tge_id=191303)
    transaction.action()