diff --git a/test1.py b/test1.py index 1e6510d..48691ba 100644 --- a/test1.py +++ b/test1.py @@ -1,11 +1,229 @@ +import json import time +import requests from tqdm import tqdm +from loguru import logger +from DrissionPage import ChromiumOptions, ChromiumPage -pbar = tqdm(total=30,desc="正在执行任务",ncols=150) +# ============================== +# 全局配置 +# ============================== +BIT_URL = "http://127.0.0.1:54345" +SYMBOL = "ETH-SUSDT" +TRADE_URL = f"https://www.weeaxs.site/zh-CN/futures/demo-trading/{SYMBOL}" +TRADE_AMOUNT = 100 +REFRESH_INTERVAL = 10 # 循环检查间隔 +TRADE_INTERVAL = 30 # 每30分钟检查一次K线 +RETRY_LIMIT = 3 -for i in range(31): # 包含0~30 - pbar.n = i # 直接设置进度 - pbar.refresh() # 刷新显示 - time.sleep(0.2) -pbar.close() +# ============================== +# 工具函数 +# ============================== +def safe_run(func): + """装饰器:捕获函数异常并记录""" + + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + logger.exception(f"[错误] {func.__name__} 执行失败: {e}") + return None + + return wrapper + + +def is_bullish(candle): # 阳线 + return float(candle['close']) > float(candle['open']) + + +def is_bearish(candle): # 阴线 + return float(candle['close']) < float(candle['open']) + + +def check_engulf_signal(prev, curr): + """包住形态信号判定""" + p_open, p_close = float(prev['open']), float(prev['close']) + c_open, c_close = float(curr['open']), float(curr['close']) + + if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open: + return "long", "bear_bull_engulf" + + if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open: + return "short", "bull_bear_engulf" + + return None, None + + +# ============================== +# 主类:WeexTransaction +# ============================== +class WeexTransaction: + def __init__(self, bit_id): + self.bit_id = bit_id + self.bit_port = None + self.page: ChromiumPage = None + self.position = 0 # -1: 空单, 0: 无仓, 1: 多单 + self.direction = None + self.pbar = tqdm(total=TRADE_INTERVAL, desc="等待交易时间", ncols=80) + + # ------------------------------ + # 浏览器控制 + # ------------------------------ + @safe_run + def open_browser(self): + res = requests.post( + f"{BIT_URL}/browser/open", + data=json.dumps({"id": self.bit_id}), + headers={"Content-Type": "application/json"} + ).json() + self.bit_port = res["data"]["http"].split(":")[1] + logger.success(f"比特浏览器启动成功,端口:{self.bit_port}") + return True + + @safe_run + def attach_browser(self): + co = ChromiumOptions() + co.set_local_port(self.bit_port) + self.page = ChromiumPage(addr_or_opts=co) + self.page.set.window.max() + self.page.get(TRADE_URL) + logger.success("成功接管比特浏览器!") + return True + + # ------------------------------ + # 数据处理 + # ------------------------------ + @safe_run + def get_kline_data(self): + """监听并解析K线数据""" + for i in range(RETRY_LIMIT): + logger.info(f"获取最新K线数据,第 {i + 1} 次尝试...") + self.page.refresh() + res = self.page.listen.wait(timeout=15) + if not res: + continue + + try: + datas = [{ + 'id': int(d[4]), + 'open': float(d[3]), + 'high': float(d[1]), + 'low': float(d[2]), + 'close': float(d[0]) + } for d in res.response.body['data']["dataList"]] + return sorted(datas, key=lambda x: x["id"]) + except Exception as e: + logger.error(f"K线数据解析失败: {e}") + return None + + # ------------------------------ + # 交易逻辑 + # ------------------------------ + @safe_run + def detect_position(self): + """检测当前持仓状态""" + try: + if self.page.ele('x://section[text()="仓位(1)"]', timeout=5): + text = self.page.ele( + 'x://*[@id="tradeLimitContainerJS"]/div[2]/div/div[1]/div/div/div/div[5]/div/div/div[2]' + ).text + if f"{SYMBOL} 多" in text: + self.position = 1 + elif f"{SYMBOL} 空" in text: + self.position = -1 + else: + self.position = 0 + else: + self.position = 0 + except: + self.position = 0 + logger.info(f"当前持仓状态: {self.position}") + return self.position + + @safe_run + def execute_trade(self): + """执行买卖操作""" + if not self.direction: + return + + page = self.page + + # 设置交易金额 + page.ele('x://*[@id="guide-order-type"]/div[2]/div/div[2]', timeout=15).click() + time.sleep(1) + page.ele('x://*[@id="guide-order-type"]/div[5]/div/div[2]/div[1]/input[2]').input(TRADE_AMOUNT) + time.sleep(1) + + if self.direction == "long" and self.position <= 0: + logger.info("执行开多...") + if self.position == -1: + self.close_position() + page.ele('x://span[normalize-space()="买入开多"]').click() + + elif self.direction == "short" and self.position >= 0: + logger.info("执行开空...") + if self.position == 1: + self.close_position() + page.ele('x://span[normalize-space()="卖出开空"]').click() + + @safe_run + def close_position(self): + """平仓""" + logger.info("执行平仓操作...") + ele = self.page.ele('x://*[contains(text(), "闪电平仓")]') + if ele: + ele.scroll.to_see(center=True) + ele.click() + time.sleep(2) + + # ------------------------------ + # 主循环 + # ------------------------------ + def action(self): + if not self.open_browser(): + return logger.error("打开比特浏览器失败") + if not self.attach_browser(): + return logger.error("接管比特浏览器失败") + + self.page.listen.start("public/quote/v1/getKlineV2") + logger.info("已开始监听K线数据...") + + while True: + now = time.localtime() + minute = now.tm_min + self.pbar.n = minute % TRADE_INTERVAL + self.pbar.refresh() + + if minute % TRADE_INTERVAL == 0: + self.detect_position() + kline = self.get_kline_data() + if not kline or len(kline) < 3: + logger.warning("K线数据不足") + time.sleep(REFRESH_INTERVAL) + continue + + k1, k2 = kline[-2], kline[-1] + + # 平仓条件 + if self.position == 1 and is_bearish(k1) and is_bearish(k2): + self.close_position() + elif self.position == -1 and is_bullish(k1) and is_bullish(k2): + self.close_position() + + # 检查信号 + self.direction, signal = check_engulf_signal(k1, k2) + if self.direction: + logger.success(f"检测到信号: {self.direction} ({signal})") + self.execute_trade() + + self.pbar.reset() + + time.sleep(REFRESH_INTERVAL) + + +# ============================== +# 启动 +# ============================== +if __name__ == '__main__': + WeexTransaction("8dcb4f744cf64ab190e465e153088515").action()