""" WEEX ETH-USDT 永续合约自动交易系统 优化版本:改进代码结构、模块化和可读性 """ import time import datetime from typing import Optional, Dict, List, Tuple from tqdm import tqdm from loguru import logger from DrissionPage import ChromiumOptions, ChromiumPage from curl_cffi import requests from bit_tools import openBrowser from 交易.tools import send_dingtalk_message # ==================== 配置常量 ==================== class Config: """配置类:集中管理所有配置常量""" # TGE浏览器配置 TGE_URL = "http://127.0.0.1:50326" TGE_AUTHORIZATION = "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91" # WEEX API配置 CONTRACT_ID = "10000002" PRODUCT_CODE = "cmt_ethusdt" KLINE_TYPE = "MINUTE_30" KLINE_LIMIT = 300 # 交易页面URL TRADING_URL = "https://www.weex.com/zh-CN/futures/ETH-USDT" # 交易配置 POSITION_RATIO = 100 # 开仓金额比例(余额的1/100) # 重试配置 MAX_RETRY_ATTEMPTS = 3 RETRY_DELAY = 1 # 秒 # ==================== K线分析器 ==================== class KlineAnalyzer: """K线分析器:负责K线数据分析和信号判断""" @staticmethod def is_bullish(candle: Dict) -> bool: """判断是否为阳线""" return float(candle['close']) > float(candle['open']) @staticmethod def is_bearish(candle: Dict) -> bool: """判断是否为阴线""" return float(candle['close']) < float(candle['open']) @staticmethod def check_engulfing_signal(prev: Dict, curr: Dict) -> Tuple[Optional[str], Optional[str]]: """ 包住形态信号判定(30分钟K线): - 前跌后涨包住 -> 做多 - 前涨后跌包住 -> 做空 Returns: (direction, signal_type): direction为"long"或"short",signal_type为信号类型 """ p_open, p_close = float(prev['open']), float(prev['close']) c_open, c_close = float(curr['open']), float(curr['close']) # 前跌后涨包住 -> 做多 if (KlineAnalyzer.is_bullish(curr) and KlineAnalyzer.is_bearish(prev) and int(c_open) <= int(p_close) and int(c_close) >= int(p_open)): return "long", "bear_bull_engulf" # 前涨后跌包住 -> 做空 if (KlineAnalyzer.is_bearish(curr) and KlineAnalyzer.is_bullish(prev) and int(c_open) >= int(p_close) and int(c_close) <= int(p_open)): return "short", "bull_bear_engulf" return None, None # ==================== 浏览器管理器 ==================== class BrowserManager: """浏览器管理器:负责浏览器的启动、接管和标签页管理""" def __init__(self, tge_id: int, tge_url: str, tge_headers: Dict): self.tge_id = tge_id self.tge_url = tge_url self.tge_headers = tge_headers self.tge_port: Optional[int] = None self.page: Optional[ChromiumPage] = None def openBrowser(self): """打开 TGE 对应浏览器实例""" try: bit_port = openBrowser(id=self.tge_id) co = ChromiumOptions() co.set_local_port(port=bit_port) self.page = ChromiumPage(addr_or_opts=co) self.tge_port = bit_port return True except: return False def take_over_browser(self) -> bool: """接管浏览器""" if not self.tge_port: logger.error("浏览器端口未设置") return False try: co = ChromiumOptions() co.set_local_port(self.tge_port) self.page = ChromiumPage(addr_or_opts=co) self.page.set.window.max() logger.success("成功接管浏览器") return True except Exception as e: logger.error(f"接管浏览器失败: {e}") return False def close_extra_tabs(self) -> bool: """关闭多余的标签页,只保留第一个""" if not self.page: return False try: tabs = self.page.get_tabs() closed_count = 0 for idx, tab in enumerate(tabs): if idx == 0: continue tab.close() closed_count += 1 if closed_count > 0: logger.info(f"已关闭{closed_count}个多余标签页") return True except Exception as e: logger.warning(f"关闭多余标签页失败: {e}") return False # ==================== WEEX API客户端 ==================== class WEEXApiClient: """WEEX API客户端:负责与WEEX API的交互""" def __init__(self): self.session = requests.Session() self.headers: Optional[Dict] = None def update_headers(self, headers: Dict) -> None: """更新session的headers""" if not self.headers: self.session.headers = headers else: self.session.headers.update(headers) self.headers = headers def get_kline_data(self, contract_id: str = Config.CONTRACT_ID, product_code: str = Config.PRODUCT_CODE, kline_type: str = Config.KLINE_TYPE, limit: int = Config.KLINE_LIMIT) -> List[Dict]: """ 获取K线数据 Args: contract_id: 合约ID product_code: 产品代码 kline_type: K线类型 limit: 返回数量限制 Returns: K线数据列表 """ params = { 'contractId': contract_id, 'productCode': product_code, 'priceType': 'LAST_PRICE', 'klineType': kline_type, 'limit': str(limit), 'timeZone': 'string', 'languageType': '1', 'sign': 'SIGN', } for attempt in range(Config.MAX_RETRY_ATTEMPTS): logger.info(f"获取最新K线数据:第{attempt + 1}/{Config.MAX_RETRY_ATTEMPTS}次尝试...") try: response = self.session.get( 'https://http-gateway2.elconvo.com/api/v1/public/quote/v1/getKlineV2', params=params, timeout=15 ) # 检查响应状态码 if response.status_code != 200: logger.warning(f"获取K线数据失败,HTTP状态码:{response.status_code}") if attempt < Config.MAX_RETRY_ATTEMPTS - 1: time.sleep(Config.RETRY_DELAY) continue # 尝试解析JSON try: response_data = response.json() except Exception as json_error: logger.warning(f"JSON解析失败,响应内容:{response.text[:200]}...") if attempt < Config.MAX_RETRY_ATTEMPTS - 1: time.sleep(Config.RETRY_DELAY) continue # 检查数据结构 if "data" not in response_data or "dataList" not in response_data["data"]: logger.warning(f"响应数据格式不正确:{response_data}") if attempt < Config.MAX_RETRY_ATTEMPTS - 1: time.sleep(Config.RETRY_DELAY) continue result = response_data["data"]["dataList"] kline_data = [] for item in result: # WEEX API返回格式: [close, high, low, open, id] kline_data.append({ 'id': int(item[4]), 'open': float(item[3]), 'high': float(item[1]), 'low': float(item[2]), 'close': float(item[0]) }) logger.success(f"成功获取{len(kline_data)}条K线数据") return kline_data except Exception as e: logger.warning(f"获取K线数据失败(第{attempt + 1}次尝试): {e}") if attempt < Config.MAX_RETRY_ATTEMPTS - 1: time.sleep(Config.RETRY_DELAY) logger.error("获取K线数据失败:已重试3次仍未成功") return [] def get_available_balance(self) -> Optional[float]: """获取可用余额""" for attempt in range(Config.MAX_RETRY_ATTEMPTS): try: response = self.session.post( 'https://gateway2.ngsvsfx.cn/v1/gw/assetsWithBalance/new', timeout=15 ) balance = float(response.json()["data"]["newContract"]["balanceList"][0]["available"]) logger.success(f"成功获取可用余额: {balance}") return balance except Exception as e: logger.warning(f"获取余额失败(第{attempt + 1}次尝试): {e}") if attempt < Config.MAX_RETRY_ATTEMPTS - 1: time.sleep(Config.RETRY_DELAY) logger.error("获取余额失败:已重试3次仍未成功") return None def get_position_status(self) -> Tuple[bool, Optional[List]]: """ 获取持仓状态 Returns: (success, position_data): success为是否成功,position_data为持仓信息列表 """ json_data = { 'filterContractIdList': [10000002], 'limit': 100, 'languageType': 0, 'sign': 'SIGN', 'timeZone': 'string', } for attempt in range(Config.MAX_RETRY_ATTEMPTS): try: response = self.session.post( # 'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderFillTransactionPage', 'https://http-gateway2.janapw.com/api/v1/private/order/v2/getHistoryOrderFillTransactionPage', json=json_data, timeout=15 ) datas = response.json()["data"]["dataList"] if not datas: return True, None latest_order = datas return True, latest_order except Exception as e: logger.warning(f"获取持仓状态失败(第{attempt + 1}次尝试): {e}") if attempt < Config.MAX_RETRY_ATTEMPTS - 1: time.sleep(Config.RETRY_DELAY) return False, None # ==================== Token管理器 ==================== class TokenManager: """Token管理器:负责获取和更新认证token""" def __init__(self, api_client: WEEXApiClient, page: ChromiumPage): self.api_client = api_client self.page = page def get_token(self) -> bool: """从浏览器中获取U-TOKEN""" tab = self.page.new_tab() tab.listen.start("/user/security/getLanguageType") try: for attempt in range(Config.MAX_RETRY_ATTEMPTS): try: tab.get(url=Config.TRADING_URL) res = tab.listen.wait(timeout=5) if res.request.headers.get("U-TOKEN"): headers = res.request.headers self.api_client.update_headers(headers) logger.success("成功获取token") return True except Exception as e: logger.warning(f"获取token尝试{attempt + 1}失败: {e}") if attempt < Config.MAX_RETRY_ATTEMPTS - 1: time.sleep(Config.RETRY_DELAY) logger.error("获取token失败:已重试3次仍未成功") return False finally: tab.close() # ==================== 交易执行器 ==================== class TradingExecutor: """交易执行器:负责执行交易操作""" def __init__(self, page: ChromiumPage, api_client: WEEXApiClient): self.page = page self.api_client = api_client def navigate_to_trading_page(self) -> bool: """导航到交易页面(选择市价)""" try: self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click() time.sleep(1) return True except Exception as e: logger.error(f"导航到交易页面失败: {e}") return False def close_all_positions(self) -> bool: """平仓所有持仓(闪电平仓)""" try: self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').scroll.to_see(center=True) time.sleep(1) self.page.ele('x:(//span[normalize-space(text()) = "闪电平仓"])').click(by_js=True) time.sleep(3) logger.success("成功执行平仓操作") return True except Exception as e: logger.error(f"平仓失败: {e}") return False def open_long(self, amount: float) -> bool: """开多""" try: self.page.ele('x://input[@placeholder="请输入数量"]').input(amount) time.sleep(1) self.page.ele('x://*[normalize-space(text()) ="买入开多"]').click(by_js=True) logger.success(f"成功开多,金额:{amount}") return True except Exception as e: logger.error(f"开多失败: {e}") return False def open_short(self, amount: float) -> bool: """开空""" try: self.page.ele('x://input[@placeholder="请输入数量"]').input(amount) time.sleep(1) self.page.ele('x://*[normalize-space(text()) ="卖出开空"]').click(by_js=True) logger.success(f"成功开空,金额:{amount}") return True except Exception as e: logger.error(f"开空失败: {e}") return False def execute_trade(self, direction: str, current_position: int, amount: float) -> bool: """ 执行交易操作 Args: direction: 交易方向 "long" 或 "short" current_position: 当前持仓状态 -1:空, 0:无, 1:多 amount: 交易金额 Returns: 是否成功(如果已持有相同方向仓位,返回True但不执行操作) """ # 检查是否已经持有相同方向的仓位 if (direction == "long" and current_position == PositionManager.POSITION_LONG) or \ (direction == "short" and current_position == PositionManager.POSITION_SHORT): logger.info(f"已持有{direction}方向仓位,无需重复开仓") return True # 返回True表示"状态正常",虽然没有执行交易 if not self.navigate_to_trading_page(): return False try: # 开多 if direction == "long": if current_position == 0: logger.info(f"{datetime.datetime.now()},开多,金额:{amount}") return self.open_long(amount) elif current_position == -1: logger.info(f"{datetime.datetime.now()},反手平空做多,金额:{amount}") if self.close_all_positions(): time.sleep(1) return self.open_long(amount) # 开空 elif direction == "short": if current_position == 0: logger.info(f"{datetime.datetime.now()},开空,金额:{amount}") return self.open_short(amount) elif current_position == 1: logger.info(f"{datetime.datetime.now()},反手平多做空,金额:{amount}") if self.close_all_positions(): time.sleep(1) return self.open_short(amount) return False except Exception as e: logger.error(f"执行交易失败: {e}") return False # ==================== 持仓管理器 ==================== class PositionManager: """持仓管理器:负责持仓状态管理和止损""" # 持仓状态常量 POSITION_SHORT = -1 # 做空 POSITION_NONE = 0 # 无持仓 POSITION_LONG = 1 # 做多 def __init__(self, trading_executor: TradingExecutor): self.trading_executor = trading_executor self.current_position: int = self.POSITION_NONE self.position_data: Optional[List] = None def update_position(self, position_data: Optional[List]) -> None: """ 更新持仓状态 Args: position_data: 持仓数据列表 """ self.position_data = position_data # 修复:统一使用position_data if not position_data: self.current_position = self.POSITION_NONE return position_data.reverse() start = 0 start1 = 0 for _, i in enumerate(position_data): direction = i.get("legacyOrderDirection") if direction == "CLOSE_SHORT": start = 0 elif direction == "CLOSE_LONG": start1 = 0 elif direction == "OPEN_SHORT": start -= 1 elif direction == "OPEN_LONG": start1 += 1 # direction = position_data.get("legacyOrderDirection") if start1: self.current_position = self.POSITION_LONG elif start: self.current_position = self.POSITION_SHORT else: self.current_position = self.POSITION_NONE def check_stop_loss(self, kline_1: Dict, kline_2: Dict) -> bool: """ 检查止损条件 Args: kline_1: 倒数第二根K线 kline_2: 倒数第一根K线 Returns: 是否执行了止损 """ try: # 持多仓,连续两根阴线,平多 if self.current_position == self.POSITION_LONG: if (KlineAnalyzer.is_bearish(kline_1) and KlineAnalyzer.is_bearish(kline_2)): logger.success( f"{datetime.datetime.now()},止损信号:连续两根阴线,平多" ) if self.trading_executor.close_all_positions(): self.current_position = self.POSITION_NONE return True # 持空仓,连续两根阳线,平空 elif self.current_position == self.POSITION_SHORT: if (KlineAnalyzer.is_bullish(kline_1) and KlineAnalyzer.is_bullish(kline_2)): logger.success( f"{datetime.datetime.now()},止损信号:连续两根阳线,平空" ) if self.trading_executor.close_all_positions(): self.current_position = self.POSITION_NONE return True except Exception as e: logger.error(f"止损检查失败: {e}") return False # ==================== 消息发送器 ==================== class MessageSender: """消息发送器:负责发送钉钉消息""" @staticmethod def send_dingtalk_message(message_content: str, is_error: bool = False) -> None: """ 发送钉钉消息 Args: message_content: 消息内容 is_error: 是否为错误消息(错误消息会发送多次) """ if is_error: prefix = "❌weex:" count = 15 else: prefix = "🔔weex:" count = 1 for _ in range(count): send_dingtalk_message(f"{prefix}{message_content}") @staticmethod def format_position_message(position_data: Optional[List], current_position: int, current_price: float, available_balance: float) -> str: """ 格式化持仓信息消息 Args: position_data: 持仓数据列表 current_position: 当前持仓方向 (-1:空, 0:无, 1:多) current_price: 当前价格 available_balance: 可用余额 """ # 根据current_position确定持仓方向 if current_position == PositionManager.POSITION_LONG: direction = "多" elif current_position == PositionManager.POSITION_SHORT: direction = "空" else: direction = "无" # 如果没有持仓,直接返回无持仓消息 if current_position == PositionManager.POSITION_NONE: return ( "**【WEEX ETHUSDT 永续持仓监控】**\n\n" f"**持仓方向**:无\n" f"**当前现价**:{current_price:.2f} USDT\n" f"**账户可用余额**:{available_balance:.2f} USDT" ) # 如果有持仓,尝试从position_data中提取持仓信息 if position_data and isinstance(position_data, list) and len(position_data) > 0: try: # 从列表中查找最新的开仓记录 open_order = None total_fill_size = 0.0 total_fill_value = 0.0 for order in reversed(position_data): # 从最新到最旧遍历 legacy_direction = order.get("legacyOrderDirection", "") # 查找对应方向的开仓记录 if current_position == PositionManager.POSITION_LONG and legacy_direction == "OPEN_LONG": fill_size = float(order.get('fillSize', 0)) fill_value = float(order.get('fillValue', 0)) if fill_size > 0: total_fill_size += fill_size total_fill_value += fill_value if open_order is None: open_order = order elif current_position == PositionManager.POSITION_SHORT and legacy_direction == "OPEN_SHORT": fill_size = float(order.get('fillSize', 0)) fill_value = float(order.get('fillValue', 0)) if fill_size > 0: total_fill_size += fill_size total_fill_value += fill_value if open_order is None: open_order = order # 如果遇到平仓记录,停止累计 if (current_position == PositionManager.POSITION_LONG and legacy_direction == "CLOSE_LONG") or \ (current_position == PositionManager.POSITION_SHORT and legacy_direction == "CLOSE_SHORT"): break if total_fill_size > 0: # 计算开仓均价 open_avg_price = total_fill_value / total_fill_size if total_fill_size > 0 else 0 # 计算浮动盈亏 if direction == "多": unrealized_pnl = total_fill_size * (current_price - open_avg_price) pnl_rate = (current_price - open_avg_price) / open_avg_price * 100 if open_avg_price > 0 else 0 else: # 空 unrealized_pnl = total_fill_size * (open_avg_price - current_price) pnl_rate = (open_avg_price - current_price) / open_avg_price * 100 if open_avg_price > 0 else 0 pnl_str = f"{unrealized_pnl:+.2f} USDT ({pnl_rate:+.2f}%)" current_value = total_fill_size * current_price return ( "**【WEEX ETHUSDT 永续持仓监控】**\n\n" f"**持仓方向**:{direction}\n" f"**当前现价**:{current_price:.2f} USDT\n" f"**开仓均价**:{open_avg_price:.2f} USDT\n" f"**持仓数量(eth)**:{total_fill_size:.3f} ETH\n" f"**持仓数量(usdt)**:{total_fill_value / 100:.2f} USDT\n" f"**名义价值**:{current_value:.2f} USDT\n" f"**浮动盈亏**:{pnl_str}\n" f"**账户可用余额**:{available_balance:.2f} USDT" ) except Exception as e: logger.warning(f"解析持仓数据失败: {e}") # 如果无法解析持仓数据,至少显示持仓方向 return ( "**【WEEX ETHUSDT 永续持仓监控】**\n\n" f"**持仓方向**:{direction}\n" f"**当前现价**:{current_price:.2f} USDT\n" f"**账户可用余额**:{available_balance:.2f} USDT" ) # ==================== 时间工具类 ==================== class TimeUtils: """时间工具类""" @staticmethod def get_current_kline_timestamp() -> int: """ 获取当前K线的时间戳(30分钟K线的整点或30分时刻,毫秒) Returns: 时间戳(毫秒) """ current_timestamp = time.time() current_datetime = datetime.datetime.fromtimestamp(current_timestamp) # 计算距离当前时间最近的整点或30分时刻 if current_datetime.minute < 30: target_datetime = current_datetime.replace( minute=0, second=0, microsecond=0 ) else: target_datetime = current_datetime.replace( minute=30, second=0, microsecond=0 ) return int(target_datetime.timestamp()) * 1000 @staticmethod def get_formatted_time() -> str: """获取格式化的当前时间字符串""" timestamp = time.time() local_time = time.localtime(timestamp) return time.strftime("%Y-%m-%d %H:%M:%S", local_time) @staticmethod def get_progress_bar_value() -> int: """获取进度条的当前值(0-29)""" current_time = time.localtime() current_minute = current_time.tm_min return current_minute if current_minute < 30 else current_minute - 30 # ==================== 主交易类 ==================== class WeexTransaction: """WEEX自动交易主类""" def __init__(self, tge_id): # 配置 self.tge_id = tge_id self.tge_headers = { "Authorization": Config.TGE_AUTHORIZATION, "Content-Type": "application/json" } # 组件初始化 self.browser_manager = BrowserManager(tge_id, Config.TGE_URL, self.tge_headers) self.api_client = WEEXApiClient() self.position_manager = None # 需要在浏览器初始化后创建 self.trading_executor = None # 需要在浏览器初始化后创建 self.token_manager = None # 需要在浏览器初始化后创建 # 状态 self.pbar: Optional[tqdm] = None self.last_kline_timestamp: Optional[int] = None self.kline_1: Optional[Dict] = None self.kline_2: Optional[Dict] = None self.kline_3: Optional[Dict] = None def initialize(self) -> bool: """初始化所有组件""" # 打开浏览器 if not self.browser_manager.openBrowser(): logger.error("打开浏览器失败") MessageSender.send_dingtalk_message("打开浏览器失败", is_error=True) return False # 接管浏览器 if not self.browser_manager.take_over_browser(): logger.error("接管浏览器失败") MessageSender.send_dingtalk_message("接管浏览器失败", is_error=True) return False # 关闭多余标签页 self.browser_manager.close_extra_tabs() # 初始化需要page的组件 page = self.browser_manager.page self.trading_executor = TradingExecutor(page, self.api_client) self.position_manager = PositionManager(self.trading_executor) self.token_manager = TokenManager(self.api_client, page) # 打开交易页面 page.get(url=Config.TRADING_URL) # 初始化时先获取一次token if not self.token_manager.get_token(): logger.warning("初始化时获取token失败,将在获取K线数据时重试") # 初始化进度条 self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) logger.success("系统初始化完成") return True def update_progress_bar(self) -> None: """更新进度条""" if self.pbar: self.pbar.n = TimeUtils.get_progress_bar_value() self.pbar.refresh() def fetch_and_update_kline(self) -> bool: """获取并更新K线数据""" # 先获取token(获取K线数据需要token) if not self.token_manager.get_token(): logger.warning("获取token失败,无法获取K线数据") return False kline_data = self.api_client.get_kline_data() if not kline_data: logger.warning("获取K线数据失败") MessageSender.send_dingtalk_message("获取K线数据失败", is_error=True) return False # 排序并取最后三根 sorted_data = sorted(kline_data, key=lambda x: x["id"]) if len(sorted_data) < 3: logger.error("K线数据不足3根") return False self.kline_1, self.kline_2, self.kline_3 = sorted_data[-3:] # 检查是否是新K线 current_kline_id = self.kline_3["id"] current_timestamp = TimeUtils.get_current_kline_timestamp() # 判断K线时间是否匹配当前时间,并且是新K线 if current_kline_id != current_timestamp: return False # K线时间不匹配 if self.last_kline_timestamp == current_timestamp: return False # 已经处理过这个K线 self.last_kline_timestamp = current_timestamp return True def sync_position_status(self) -> bool: """同步持仓状态""" success, position_data = self.api_client.get_position_status() if not success: logger.error("获取持仓状态失败") MessageSender.send_dingtalk_message("获取持仓状态失败", is_error=True) return False self.position_manager.update_position(position_data) return True def process_trading_logic(self) -> None: """处理交易逻辑""" page = self.browser_manager.page # 刷新token(因为可能已经过期,但这不是必须的,因为获取K线时已经获取过了) # 如果token有效,这里会快速返回;如果无效,会重新获取 self.token_manager.get_token() # 刷新页面 page.get(url=Config.TRADING_URL) # 同步持仓状态 if not self.sync_position_status(): return # 检查止损 try: self.position_manager.check_stop_loss(self.kline_1, self.kline_2) except Exception as e: logger.error(f"止损检查出错: {e}") MessageSender.send_dingtalk_message(f"止损检查出错: {e}", is_error=True) return # 检查交易信号 direction, signal_type = KlineAnalyzer.check_engulfing_signal( prev=self.kline_1, curr=self.kline_2 ) if direction: # 检查是否已经持有相同方向的仓位 current_pos = self.position_manager.current_position if (direction == "long" and current_pos == PositionManager.POSITION_LONG) or \ (direction == "short" and current_pos == PositionManager.POSITION_SHORT): logger.info(f"已持有{direction}方向仓位,继续持仓,不执行新开仓操作") # 继续持仓,不发送开仓信息,直接返回继续执行后续的持仓信息发送 else: # 需要执行交易(新开仓或反手) # 获取可用余额 balance = self.api_client.get_available_balance() if balance is None: logger.error("获取可用余额失败") MessageSender.send_dingtalk_message("获取可用余额失败", is_error=True) return amount = int(balance / 100) # 执行交易 trade_executed = self.trading_executor.execute_trade( direction=direction, current_position=self.position_manager.current_position, amount=amount ) if not trade_executed: MessageSender.send_dingtalk_message( f"交易执行失败,方向:{direction}", is_error=True ) return # 更新持仓状态 if direction == "long": self.position_manager.current_position = PositionManager.POSITION_LONG elif direction == "short": self.position_manager.current_position = PositionManager.POSITION_SHORT # 发送交易消息(只有真正执行了开仓或反手操作才发送) MessageSender.send_dingtalk_message( f"信号:{direction},开仓金额:{amount:.2f}", is_error=False ) # 再次同步持仓状态(交易后) if not self.sync_position_status(): return # 发送持仓信息 balance = self.api_client.get_available_balance() current_price = float(self.kline_3["close"]) if balance is not None: position_info = self.position_manager.position_data current_pos = self.position_manager.current_position msg = MessageSender.format_position_message( position_info, current_pos, current_price, balance ) MessageSender.send_dingtalk_message(msg, is_error=False) def run(self) -> None: """主运行循环""" # 初始化 if not self.initialize(): return logger.info("系统初始化完成,开始运行...") # 主循环 while True: try: # 更新进度条 self.update_progress_bar() # 获取当前K线时间戳 current_timestamp = TimeUtils.get_current_kline_timestamp() # 检查是否已经处理过这个K线 if self.last_kline_timestamp == current_timestamp: time.sleep(10) continue # 获取并更新K线数据 if not self.fetch_and_update_kline(): time.sleep(10) continue # 处理交易逻辑 self.process_trading_logic() # 重置进度条 if self.pbar: self.pbar.reset() # 等待一段时间 time.sleep(5) except KeyboardInterrupt: logger.info("收到中断信号,正在退出...") break except Exception as e: logger.error(f"运行出错: {e}") MessageSender.send_dingtalk_message(f"运行出错: {e}", is_error=True) time.sleep(10) def action(self) -> None: """兼容旧接口的方法名""" self.run() if __name__ == '__main__': transaction = WeexTransaction(tge_id="86837a981aba4576be6916a0ef6ad785") transaction.action()