import sys import time from pathlib import Path from tqdm import tqdm from loguru import logger from bit_tools import openBrowser from DrissionPage import ChromiumPage from DrissionPage import ChromiumOptions from bitmart.api_contract import APIContract # --------------------------------------------------------------------------- # 开单逻辑:完全由 方案B(AI 策略)驱动 # - 方案B:strategy.ai_strategy 的 get_live_signal(period=15),返回 0=观望 1=做多 2=做空 # - 每根新 15 分钟 K 线取一次信号,再根据当前持仓转为 开多/开空/反手多/反手空,由 execute_trade 执行开单 # 前置条件: # 1. 先运行方案B训练并保存模型(如 run_scheme_b_train.py 或 strategy.compare 方案B) # 2. models/database.db 中有最新 15m/5m/1h K 线(如运行 抓取多周期K线.py) # --------------------------------------------------------------------------- sys.path.insert(0, str(Path(__file__).resolve().parent)) from strategy.ai_strategy import get_live_signal class BitmartFuturesTransaction: def __init__(self, bit_id): self.page: ChromiumPage | None = None self.api_key = "a0fb7b98464fd9bcce67e7c519d58ec10d0c38a8" self.secret_key = "4eaeba78e77aeaab1c2027f846a276d164f264a44c2c1bb1c5f3be50c8de1ca5" self.memo = "合约交易" self.contract_symbol = "ETHUSDT" self.contractAPI = APIContract(self.api_key, self.secret_key, self.memo, timeout=(5, 15)) self.start = 0 # 持仓状态: -1 空, 0 无, 1 多 self.pbar = tqdm(total=30, desc="等待K线", ncols=80) # 可选:用于长时间等待时展示进度 self.last_kline_time = None # 上一次出信号的 15 分钟 K 线 id(方案B 每根 15m 只出一次信号) # 反手频率控制 self.reverse_cooldown_seconds = 1.5 * 60 # 反手冷却时间(秒) self.reverse_min_move_pct = 0.05 # 反手最小价差过滤(百分比) self.last_reverse_time = None # 上次反手时间 # 开仓频率控制 self.open_cooldown_seconds = 60 # 开仓冷却时间(秒),两次开仓至少间隔此时长 self.last_open_time = None # 上次开仓时间 self.last_open_kline_id = None # 上次开仓所在 K 线 id,同一根 K 线只允许开仓一次 self.leverage = "100" # 高杠杆(全仓模式下可开更大仓位) self.open_type = "cross" # 全仓模式 self.risk_percent = 0 # 未使用;若启用则可为每次开仓占可用余额的百分比 self.take_profit_usd = 5 # 仓位盈利达到此金额(美元)时平仓止盈 self.stop_loss_usd = -3 # 固定止损:亏损达到 3 美元平仓 self.trailing_activation_usd = 2 # 盈利达到此金额后启动移动止损 self.trailing_distance_usd = 1.5 # 从最高盈利回撤此金额则平仓 self.max_unrealized_pnl_seen = None # 持仓期间见过的最大盈利(用于移动止损) self.open_avg_price = None # 开仓价格 self.current_amount = None # 持仓量 self.bit_id = bit_id self.default_order_size = 25 # 开仓/反手张数,统一在此修改 # 策略相关变量 self.prev_kline = None # 上一根K线 self.current_kline = None # 当前K线 self.prev_entity = None # 上一根K线实体大小 self.current_open = None # 当前K线开盘价 # API 节流:主循环与持仓/价格查询间隔,避免 429 Request too many requests self.loop_interval_seconds = 2.5 # 每轮循环最少间隔(秒) self._last_position_fetch = 0.0 self._last_price_fetch = 0.0 self._position_refresh_interval = 2.0 # 持仓/价格最少隔多久才重新请求(秒) self._position_cache = None # 上次成功的持仓状态,用于节流时复用 self._price_cache = None # 上次成功的价格,用于节流时复用 def get_klines(self): """获取最近2根K线(当前K线和上一根K线)""" try: end_time = int(time.time()) # 获取足够多的条目确保有最新的K线 response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=5, # 5分钟 start_time=end_time - 3600 * 3, # 取最近3小时 end_time=end_time )[0]["data"] # 每根: [timestamp, open, high, low, close, volume] formatted = [] for k in response: formatted.append({ 'id': int(k["timestamp"]), 'open': float(k["open_price"]), 'high': float(k["high_price"]), 'low': float(k["low_price"]), 'close': float(k["close_price"]) }) formatted.sort(key=lambda x: x['id']) # 返回最近2根K线:倒数第二根(上一根)和最后一根(当前) if len(formatted) >= 2: return formatted[-2], formatted[-1] return None, None except Exception as e: logger.error(f"获取K线异常: {e}") self.ding(text="获取K线异常", error=True) return None, None def get_current_price(self): """获取当前最新价格(带节流:间隔内返回缓存值)""" now = time.time() if self._price_cache is not None and (now - self._last_price_fetch) < self._position_refresh_interval: return self._price_cache try: end_time = int(now) response = self.contractAPI.get_kline( contract_symbol=self.contract_symbol, step=1, # 1分钟 start_time=end_time - 3600 * 1, # 取最近1小时 end_time=end_time )[0] if response['code'] == 1000: price = float(response['data'][-1]["close_price"]) self._price_cache = price self._last_price_fetch = now return price return self._price_cache # 失败时用旧价格 except Exception as e: logger.error(f"获取价格异常: {e}") return self._price_cache def get_available_balance(self): """获取合约账户可用USDT余额""" try: response = self.contractAPI.get_assets_detail()[0] if response['code'] == 1000: data = response['data'] if isinstance(data, dict): return float(data.get('available_balance', 0)) elif isinstance(data, list): for asset in data: if asset.get('currency') == 'USDT': return float(asset.get('available_balance', 0)) return None except Exception as e: logger.error(f"余额查询异常: {e}") return None def get_position_status(self): """获取当前持仓方向(带节流:间隔内返回缓存状态)""" now = time.time() if self._position_cache is not None and (now - self._last_position_fetch) < self._position_refresh_interval: c = self._position_cache self.start = c['start'] self.open_avg_price = c.get('open_avg_price') self.current_amount = c.get('current_amount') self.unrealized_pnl = c.get('unrealized_pnl') return True try: response = self.contractAPI.get_position(contract_symbol=self.contract_symbol)[0] if response['code'] == 1000: positions = response['data'] if not positions: self.start = 0 self.open_avg_price = None self.current_amount = None self.unrealized_pnl = None self._position_cache = {'start': 0, 'open_avg_price': None, 'current_amount': None, 'unrealized_pnl': None} self._last_position_fetch = now return True pos = positions[0] self.start = 1 if pos['position_type'] == 1 else -1 self.open_avg_price = float(pos['open_avg_price']) self.current_amount = float(pos['current_amount']) self.position_cross = pos["position_cross"] self.unrealized_pnl = float(pos.get('unrealized_value', 0)) self._position_cache = {'start': self.start, 'open_avg_price': self.open_avg_price, 'current_amount': self.current_amount, 'unrealized_pnl': self.unrealized_pnl} self._last_position_fetch = now logger.debug(f"持仓详情: 方向={self.start}, 开仓均价={self.open_avg_price}, " f"持仓量={self.current_amount}, 未实现盈亏={self.unrealized_pnl:.2f}") return True else: if self._position_cache is not None: c = self._position_cache self.start, self.open_avg_price = c['start'], c.get('open_avg_price') self.current_amount, self.unrealized_pnl = c.get('current_amount'), c.get('unrealized_pnl') return True # 请求失败但用缓存 return False except Exception as e: logger.error(f"持仓查询异常: {e}") if self._position_cache is not None: c = self._position_cache self.start, self.open_avg_price = c['start'], c.get('open_avg_price') self.current_amount, self.unrealized_pnl = c.get('current_amount'), c.get('unrealized_pnl') return True # 429 等异常时用缓存 return False def get_unrealized_pnl_usd(self): """ 获取当前持仓未实现盈亏(美元),直接使用API返回值 """ if self.start == 0 or self.unrealized_pnl is None: return None return self.unrealized_pnl def set_leverage(self): """程序启动时设置全仓 + 高杠杆""" try: response = self.contractAPI.post_submit_leverage( contract_symbol=self.contract_symbol, leverage=self.leverage, open_type=self.open_type )[0] if response['code'] == 1000: logger.success(f"全仓模式 + {self.leverage}x 杠杆设置成功") return True else: logger.error(f"杠杆设置失败: {response}") return False except Exception as e: logger.error(f"设置杠杆异常: {e}") return False def openBrowser(self): """打开 TGE 对应浏览器实例""" try: bit_port = openBrowser(id=self.bit_id) co = ChromiumOptions() co.set_local_port(port=bit_port) self.page = ChromiumPage(addr_or_opts=co) return True except: return False def click_safe(self, xpath, sleep=0.5): """安全点击""" try: ele = self.page.ele(xpath) if not ele: return False # ele.scroll.to_see(center=True) # time.sleep(sleep) ele.click(by_js=True) return True except: return False def input_size(self, size: int): """在数量框输入张数(兼容受控组件:先聚焦、输入、再触发 input 事件)""" try: # 优先用 id,可能不是 input 而是 div/自定义组件 ele = self.page.ele('x://*[@id="size_0"]') if not ele: ele = self.page.ele('x://input[@id="size_0"]') if not ele: logger.warning("未找到数量框 size_0") return False time.sleep(0.3) ele.click(by_js=True) time.sleep(0.2) s = str(int(size)) # 清空后输入;若页面是 React/Vue 受控组件,再通过 JS 设值并触发事件 ele.input(s, clear=True) ele.run_js("this.value = arguments[0]; this.dispatchEvent(new Event('input', { bubbles: true }));", s) return True except Exception as e: logger.warning(f"输入数量失败: {e}") return False def 平仓(self): """平仓操作(优先显式平仓按钮,失败时回退到反向市价)""" self.click_safe('x://span[normalize-space(text()) ="市价"]') # 优先点击页面上的平仓按钮,避免误开反向仓 close_selectors = [ 'x://span[contains(normalize-space(text()),"一键平仓")]', 'x://button[contains(normalize-space(text()),"一键平仓")]', 'x://span[contains(normalize-space(text()),"全部平仓")]', 'x://button[contains(normalize-space(text()),"全部平仓")]', 'x://span[contains(normalize-space(text()),"平多")]', 'x://button[contains(normalize-space(text()),"平多")]', 'x://span[contains(normalize-space(text()),"平空")]', 'x://button[contains(normalize-space(text()),"平空")]', ] for xpath in close_selectors: if self.click_safe(xpath): logger.info(f"平仓点击成功: {xpath}") return True # 回退:若没有显式平仓按钮,使用反向市价尝试平仓 if self.start == 1 and self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]'): logger.info("未找到显式平仓按钮,使用反向市价卖出平多") return True if self.start == -1 and self.click_safe('x://span[normalize-space(text()) ="买入/做多"]'): logger.info("未找到显式平仓按钮,使用反向市价买入平空") return True logger.warning("平仓操作未触发任何按钮") return False def 开单(self, marketPriceLongOrder=0, limitPriceShortOrder=0, size=None, price=None): """ marketPriceLongOrder 市价做多或者做空,1是做多,-1是做空 limitPriceShortOrder 限价做多或者做空 """ if size is not None: self.input_size(size) time.sleep(0.2) if marketPriceLongOrder == -1: # self.click_safe('x://button[normalize-space(text()) ="市价"]') # self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') elif marketPriceLongOrder == 1: # self.click_safe('x://button[normalize-space(text()) ="市价"]') # self.page.ele('x://*[@id="size_0"]').input(vals=size, clear=True) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') if limitPriceShortOrder == -1: self.click_safe('x://button[normalize-space(text()) ="限价"]') self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True) time.sleep(1) self.page.ele('x://*[@id="size_0"]').input(1) self.click_safe('x://span[normalize-space(text()) ="卖出/做空"]') elif limitPriceShortOrder == 1: self.click_safe('x://button[normalize-space(text()) ="限价"]') self.page.ele('x://*[@id="price_0"]').input(vals=price, clear=True) time.sleep(1) self.page.ele('x://*[@id="size_0"]').input(1) self.click_safe('x://span[normalize-space(text()) ="买入/做多"]') def ding(self, text, error=False): """日志通知""" if error: logger.error(text) else: logger.info(text) def calculate_entity(self, kline): """计算K线实体大小(绝对值)""" return abs(kline['close'] - kline['open']) def calculate_upper_shadow(self, kline): """计算上阴线(上影线)涨幅百分比""" # 上阴线 = (最高价 - max(开盘价, 收盘价)) / max(开盘价, 收盘价) body_top = max(kline['open'], kline['close']) if body_top == 0: return 0 return (kline['high'] - body_top) / body_top * 100 def calculate_lower_shadow(self, kline): """计算下阴线(下影线)跌幅百分比""" # 下阴线 = (min(开盘价, 收盘价) - 最低价) / min(开盘价, 收盘价) body_bottom = min(kline['open'], kline['close']) if body_bottom == 0: return 0 return (body_bottom - kline['low']) / body_bottom * 100 def get_entity_edge(self, kline): """获取K线实体边(收盘价或开盘价,取决于是阳线还是阴线)""" # 阳线(收盘>开盘):实体上边=收盘价,实体下边=开盘价 # 阴线(收盘<开盘):实体上边=开盘价,实体下边=收盘价 return { 'upper': max(kline['open'], kline['close']), # 实体上边 'lower': min(kline['open'], kline['close']) # 实体下边 } def check_signal(self, current_price, prev_kline, current_kline): """ 检查交易信号 返回: ('long', trigger_price) / ('short', trigger_price) / None """ # 计算上一根K线实体 prev_entity = self.calculate_entity(prev_kline) # 实体过小不交易(实体 < 0.1) if prev_entity < 0.1: logger.info(f"上一根K线实体过小: {prev_entity:.4f},跳过信号检测") return None # 获取上一根K线的实体上下边 prev_entity_edge = self.get_entity_edge(prev_kline) prev_entity_upper = prev_entity_edge['upper'] # 实体上边 prev_entity_lower = prev_entity_edge['lower'] # 实体下边 # 优化:以下两种情况以当前这根的开盘价作为计算基准 # 1) 上一根阳线 且 当前开盘价 > 上一根收盘价(跳空高开) # 2) 上一根阴线 且 当前开盘价 < 上一根收盘价(跳空低开) prev_is_bullish_for_calc = prev_kline['close'] > prev_kline['open'] prev_is_bearish_for_calc = prev_kline['close'] < prev_kline['open'] current_open_above_prev_close = current_kline['open'] > prev_kline['close'] current_open_below_prev_close = current_kline['open'] < prev_kline['close'] use_current_open_as_base = (prev_is_bullish_for_calc and current_open_above_prev_close) or (prev_is_bearish_for_calc and current_open_below_prev_close) if use_current_open_as_base: # 以当前K线开盘价为基准计算(跳空时用当前开盘价参与计算) calc_lower = current_kline['open'] calc_upper = current_kline['open'] # 同一基准,上下四分之一对称 long_trigger = calc_lower + prev_entity / 4 short_trigger = calc_upper - prev_entity / 4 long_breakout = calc_upper + prev_entity / 4 short_breakout = calc_lower - prev_entity / 4 else: # 原有计算方式 long_trigger = prev_entity_lower + prev_entity / 4 # 做多触发价 = 实体下边 + 实体/4(下四分之一处) short_trigger = prev_entity_upper - prev_entity / 4 # 做空触发价 = 实体上边 - 实体/4(上四分之一处) long_breakout = prev_entity_upper + prev_entity / 4 # 做多突破价 = 实体上边 + 实体/4 short_breakout = prev_entity_lower - prev_entity / 4 # 做空突破价 = 实体下边 - 实体/4 # 上一根阴线 + 当前阳线:做多形态,不按上一根K线上三分之一做空 prev_is_bearish = prev_kline['close'] < prev_kline['open'] current_is_bullish = current_kline['close'] > current_kline['open'] skip_short_by_upper_third = prev_is_bearish and current_is_bullish # 上一根阳线 + 当前阴线:做空形态,不按上一根K线下三分之一做多 prev_is_bullish = prev_kline['close'] > prev_kline['open'] current_is_bearish = current_kline['close'] < current_kline['open'] skip_long_by_lower_third = prev_is_bullish and current_is_bearish if use_current_open_as_base: if prev_is_bullish_for_calc and current_open_above_prev_close: logger.info(f"上一根阳线且当前开盘价({current_kline['open']:.2f})>上一根收盘价({prev_kline['close']:.2f}),以当前开盘价为基准计算") else: logger.info(f"上一根阴线且当前开盘价({current_kline['open']:.2f})<上一根收盘价({prev_kline['close']:.2f}),以当前开盘价为基准计算") logger.info(f"当前价格: {current_price:.2f}, 上一根实体: {prev_entity:.4f}") logger.info(f"上一根实体上边: {prev_entity_upper:.2f}, 下边: {prev_entity_lower:.2f}") logger.info(f"做多触发价(下1/4): {long_trigger:.2f}, 做空触发价(上1/4): {short_trigger:.2f}") logger.info(f"突破做多价(上1/4外): {long_breakout:.2f}, 突破做空价(下1/4外): {short_breakout:.2f}") if skip_short_by_upper_third: logger.info("上一根阴线+当前阳线(做多形态),不按上四分之一做空") if skip_long_by_lower_third: logger.info("上一根阳线+当前阴线(做空形态),不按下四分之一做多") # 无持仓时检查开仓信号 if self.start == 0: if current_price >= long_breakout and not skip_long_by_lower_third: logger.info(f"触发做多信号!价格 {current_price:.2f} >= 突破价(上1/4外) {long_breakout:.2f}") return ('long', long_breakout) elif current_price <= short_breakout and not skip_short_by_upper_third: logger.info(f"触发做空信号!价格 {current_price:.2f} <= 突破价(下1/4外) {short_breakout:.2f}") return ('short', short_breakout) # 持仓时检查反手信号 elif self.start == 1: # 持多仓 # 反手条件1: 价格跌到上一根K线的上三分之一处(做空触发价);上一根阴线+当前阳线做多时跳过 if current_price <= short_trigger and not skip_short_by_upper_third: logger.info(f"持多反手做空!价格 {current_price:.2f} <= 触发价(上1/4) {short_trigger:.2f}") return ('reverse_short', short_trigger) # 反手条件2: 上一根K线上阴线涨幅>0.01%,当前跌到上一根实体下边 upper_shadow_pct = self.calculate_upper_shadow(prev_kline) if upper_shadow_pct > 0.01 and current_price <= prev_entity_lower: logger.info(f"持多反手做空!上阴线涨幅 {upper_shadow_pct:.4f}% > 0.01%," f"价格 {current_price:.2f} <= 实体下边 {prev_entity_lower:.2f}") return ('reverse_short', prev_entity_lower) elif self.start == -1: # 持空仓 # 反手条件1: 价格涨到上一根K线的下三分之一处(做多触发价);上一根阳线+当前阴线做空时跳过 if current_price >= long_trigger and not skip_long_by_lower_third: logger.info(f"持空反手做多!价格 {current_price:.2f} >= 触发价(下1/4) {long_trigger:.2f}") return ('reverse_long', long_trigger) # 反手条件2: 上一根K线下阴线跌幅>0.01%,当前涨到上一根实体上边 lower_shadow_pct = self.calculate_lower_shadow(prev_kline) if lower_shadow_pct > 0.01 and current_price >= prev_entity_upper: logger.info(f"持空反手做多!下阴线跌幅 {lower_shadow_pct:.4f}% > 0.01%," f"价格 {current_price:.2f} >= 实体上边 {prev_entity_upper:.2f}") return ('reverse_long', prev_entity_upper) return None def can_open(self, current_kline_id): """开仓前过滤:同一根 K 线只开一次 + 开仓冷却时间。仅用于 long/short 新开仓。""" now = time.time() if self.last_open_kline_id is not None and self.last_open_kline_id == current_kline_id: logger.info(f"开仓频率控制:本 K 线({current_kline_id})已开过仓,跳过") return False if self.last_open_time is not None and now - self.last_open_time < self.open_cooldown_seconds: remain = self.open_cooldown_seconds - (now - self.last_open_time) logger.info(f"开仓冷却中,剩余 {remain:.0f} 秒") return False return True def can_reverse(self, current_price, trigger_price=None): """反手前过滤:冷却时间 + 最小价差""" now = time.time() if self.last_reverse_time and now - self.last_reverse_time < self.reverse_cooldown_seconds: remain = self.reverse_cooldown_seconds - (now - self.last_reverse_time) logger.info(f"反手冷却中,剩余 {remain:.0f} 秒") return False if trigger_price is not None and trigger_price > 0: move_pct = abs(current_price - trigger_price) / trigger_price * 100 # 触发价与现价相同(或近似相同)时,不做最小价差过滤,避免策略被永久拦截 if abs(current_price - trigger_price) < 1e-9: logger.debug("反手价差过滤跳过:触发价与现价一致") elif move_pct < self.reverse_min_move_pct: logger.info(f"反手价差不足: {move_pct:.4f}% < {self.reverse_min_move_pct}%") return False return True def verify_no_position(self, max_retries=5, retry_interval=3): """ 验证当前无持仓 返回: True 表示无持仓可以开仓,False 表示有持仓不能开仓 """ for i in range(max_retries): if self.get_position_status(): if self.start == 0: logger.info(f"确认无持仓,可以开仓") return True else: logger.warning( f"仍有持仓 (方向: {self.start}),等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})") time.sleep(retry_interval) else: logger.warning(f"查询持仓状态失败,等待 {retry_interval} 秒后重试 ({i + 1}/{max_retries})") time.sleep(retry_interval) logger.error(f"经过 {max_retries} 次重试仍有持仓或查询失败,放弃开仓") return False def verify_position_direction(self, expected_direction): """ 验证当前持仓方向是否与预期一致 expected_direction: 1 多仓, -1 空仓 返回: True 表示持仓方向正确,False 表示不正确 """ if self.get_position_status(): if self.start == expected_direction: logger.info(f"持仓方向验证成功: {self.start}") return True else: logger.warning(f"持仓方向不符: 期望 {expected_direction}, 实际 {self.start}") return False else: logger.error("查询持仓状态失败") return False def execute_trade(self, signal, size=None): """执行交易。size 不传或为 None 时使用 default_order_size。""" signal_type, trigger_price = signal size = self.default_order_size if size is None else size trigger_price_text = f"{trigger_price:.2f}" if isinstance(trigger_price, (int, float)) else "N/A" if signal_type == 'long': # 开多前先确认无持仓 logger.info(f"准备开多,触发价: {trigger_price_text}") if not self.get_position_status(): logger.error("开仓前查询持仓状态失败,放弃开仓") return False if self.start != 0: logger.warning(f"开多前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓") return False logger.info(f"确认无持仓,执行开多") self.开单(marketPriceLongOrder=1, size=size) time.sleep(3) # 等待订单执行 # 验证开仓是否成功 if self.verify_position_direction(1): self.max_unrealized_pnl_seen = None # 新仓位重置移动止损记录 self.last_open_time = time.time() self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None) self._last_position_fetch = 0 # 强制下一轮重新拉持仓 logger.success("开多成功") return True else: logger.error("开多后持仓验证失败") return False elif signal_type == 'short': # 开空前先确认无持仓 logger.info(f"准备开空,触发价: {trigger_price_text}") if not self.get_position_status(): logger.error("开仓前查询持仓状态失败,放弃开仓") return False if self.start != 0: logger.warning(f"开空前发现已有持仓 (方向: {self.start}),放弃开仓避免双向持仓") return False logger.info(f"确认无持仓,执行开空") self.开单(marketPriceLongOrder=-1, size=size) time.sleep(3) # 等待订单执行 # 验证开仓是否成功 if self.verify_position_direction(-1): self.max_unrealized_pnl_seen = None # 新仓位重置移动止损记录 self.last_open_time = time.time() self.last_open_kline_id = getattr(self, "_current_kline_id_for_open", None) self._last_position_fetch = 0 # 强制下一轮重新拉持仓 logger.success("开空成功") return True else: logger.error("开空后持仓验证失败") return False elif signal_type == 'reverse_long': # 平空 + 开多(反手做多):先平仓,确认无仓后再开多,避免双向持仓 logger.info(f"执行反手做多,触发价: {trigger_price_text}") if not self.平仓(): logger.warning("反手做多:平仓按钮未触发,放弃本次反手") return False time.sleep(1) # 给交易所处理平仓的时间 # 轮询确认已无持仓再开多(最多等约 10 秒) for _ in range(10): if self.get_position_status() and self.start == 0: break time.sleep(1) if self.start != 0: logger.warning("反手做多:平仓后仍有持仓,放弃本次开多") return False logger.info("已确认无持仓,执行开多") self.开单(marketPriceLongOrder=1, size=size) time.sleep(3) if self.verify_position_direction(1): self.max_unrealized_pnl_seen = None self._last_position_fetch = 0 # 强制下一轮重新拉持仓 logger.success("反手做多成功") self.last_reverse_time = time.time() time.sleep(20) return True else: logger.error("反手做多后持仓验证失败") return False elif signal_type == 'reverse_short': # 平多 + 开空(反手做空):先平仓,确认无仓后再开空 logger.info(f"执行反手做空,触发价: {trigger_price_text}") if not self.平仓(): logger.warning("反手做空:平仓按钮未触发,放弃本次反手") return False time.sleep(1) for _ in range(10): if self.get_position_status() and self.start == 0: break time.sleep(1) if self.start != 0: logger.warning("反手做空:平仓后仍有持仓,放弃本次开空") return False logger.info("已确认无持仓,执行开空") self.开单(marketPriceLongOrder=-1, size=size) time.sleep(3) if self.verify_position_direction(-1): self.max_unrealized_pnl_seen = None self._last_position_fetch = 0 # 强制下一轮重新拉持仓 logger.success("反手做空成功") self.last_reverse_time = time.time() time.sleep(20) return True else: logger.error("反手做空后持仓验证失败") return False return False def action(self): """主循环""" logger.info("开始运行方案B(AI 策略)交易...") # 启动时设置全仓高杠杆 if not self.set_leverage(): logger.error("杠杆设置失败,程序继续运行但可能下单失败") return page_start = True while True: if page_start: # 打开浏览器 for i in range(5): if self.openBrowser(): logger.info("浏览器打开成功") break else: self.ding("打开浏览器失败!", error=True) return # 进入交易页面 self.page.get("https://derivatives.bitmart.com/zh-CN/futures/ETHUSDT") time.sleep(1) # 等待交易表单渲染 self.click_safe('x://button[normalize-space(text()) ="市价"]') time.sleep(0.3) self.input_size(self.default_order_size) page_start = False try: # 1. 获取当前价格 current_price = self.get_current_price() if not current_price: logger.warning("获取价格失败,等待重试...") time.sleep(2) continue # 2. 每次循环都通过SDK获取真实持仓状态(避免状态不同步导致双向持仓) if not self.get_position_status(): logger.warning("获取持仓状态失败,等待重试...") time.sleep(2) continue logger.debug(f"当前持仓状态: {self.start} (0=无, 1=多, -1=空)") # 3. 止损/止盈/移动止损 if self.start != 0: pnl_usd = self.get_unrealized_pnl_usd() if pnl_usd is not None: # 固定止损:亏损达到 3 美元平仓 if pnl_usd <= self.stop_loss_usd: logger.info(f"仓位亏损 {pnl_usd:.2f} 美元 <= 止损 {self.stop_loss_usd} 美元,执行止损平仓") self.平仓() self.max_unrealized_pnl_seen = None time.sleep(3) continue # 更新持仓期间最大盈利(用于移动止损) if self.max_unrealized_pnl_seen is None: self.max_unrealized_pnl_seen = pnl_usd else: self.max_unrealized_pnl_seen = max(self.max_unrealized_pnl_seen, pnl_usd) # 移动止损:盈利曾达到 activation 后,从最高盈利回撤 trailing_distance 则平仓 if self.max_unrealized_pnl_seen >= self.trailing_activation_usd: if pnl_usd < self.max_unrealized_pnl_seen - self.trailing_distance_usd: logger.info(f"移动止损:当前盈利 {pnl_usd:.2f} 从最高 {self.max_unrealized_pnl_seen:.2f} 回撤 >= {self.trailing_distance_usd} 美元,平仓") self.平仓() self.max_unrealized_pnl_seen = None time.sleep(3) continue # 止盈:盈利达到 take_profit_usd 平仓 if pnl_usd >= self.take_profit_usd: logger.info(f"仓位盈利 {pnl_usd:.2f} 美元 >= {self.take_profit_usd} 美元,执行止盈平仓") self.平仓() self.max_unrealized_pnl_seen = None time.sleep(3) continue # 4. 方案B 开单:新 15m K 线取一次 AI 信号 -> 转为开仓/反手 -> 下方执行开单 current_15m_id = int(time.time() // 900) * 900 # 15 分钟 bar 起始时间戳 signal = None if current_15m_id != self.last_kline_time: self.last_kline_time = current_15m_id logger.info(f"进入新 15m K 线: {current_15m_id}") raw = get_live_signal(period=15) # 方案B:0=观望 1=做多 2=做空 logger.info(f"方案B 信号: {raw} (0=观望 1=做多 2=做空), 当前持仓: {self.start}") if raw == 1: if self.start == 0: signal = ('long', current_price) elif self.start == -1: reverse_trigger = self.open_avg_price if self.open_avg_price else None signal = ('reverse_long', reverse_trigger) elif raw == 2: if self.start == 0: signal = ('short', current_price) elif self.start == 1: reverse_trigger = self.open_avg_price if self.open_avg_price else None signal = ('reverse_short', reverse_trigger) # 5. 反手过滤:冷却时间 + 最小价差 if signal and signal[0].startswith('reverse_'): if not self.can_reverse(current_price, signal[1]): signal = None # 5.5 开仓频率过滤:同一根 15m K 线只开一次 + 开仓冷却 if signal and signal[0] in ('long', 'short'): if not self.can_open(current_15m_id): signal = None else: self._current_kline_id_for_open = current_15m_id # 供 execute_trade 成功后记录 # 6. 方案B 开单:有信号则执行(开多/开空/反手多/反手空) if signal: trade_success = self.execute_trade(signal) if trade_success: logger.success(f"交易执行完成: {signal[0]}, 当前持仓状态: {self.start}") page_start = True else: logger.warning(f"交易执行失败或被阻止: {signal[0]}") # 主循环间隔,避免持仓/价格查询过于频繁触发 429 time.sleep(self.loop_interval_seconds) if page_start: self.page.close() time.sleep(5) except KeyboardInterrupt: logger.info("用户中断,程序退出") break except Exception as e: logger.error(f"主循环异常: {e}") time.sleep(5) if __name__ == '__main__': BitmartFuturesTransaction(bit_id="f2320f57e24c45529a009e1541e25961").action()