This commit is contained in:
27942
2025-10-30 09:58:17 +08:00
parent 007b8a412d
commit 3863b12562

230
test1.py
View File

@@ -1,11 +1,229 @@
import json
import time
import requests
from tqdm import tqdm
from loguru import logger
from DrissionPage import ChromiumOptions, ChromiumPage
pbar = tqdm(total=30,desc="正在执行任务",ncols=150)
# ==============================
# 全局配置
# ==============================
BIT_URL = "http://127.0.0.1:54345"
SYMBOL = "ETH-SUSDT"
TRADE_URL = f"https://www.weeaxs.site/zh-CN/futures/demo-trading/{SYMBOL}"
TRADE_AMOUNT = 100
REFRESH_INTERVAL = 10 # 循环检查间隔
TRADE_INTERVAL = 30 # 每30分钟检查一次K线
RETRY_LIMIT = 3
for i in range(31): # 包含0~30
pbar.n = i # 直接设置进度
pbar.refresh() # 刷新显示
time.sleep(0.2)
pbar.close()
# ==============================
# 工具函数
# ==============================
def safe_run(func):
"""装饰器:捕获函数异常并记录"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.exception(f"[错误] {func.__name__} 执行失败: {e}")
return None
return wrapper
def is_bullish(candle): # 阳线
return float(candle['close']) > float(candle['open'])
def is_bearish(candle): # 阴线
return float(candle['close']) < float(candle['open'])
def check_engulf_signal(prev, curr):
"""包住形态信号判定"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
return "long", "bear_bull_engulf"
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
return "short", "bull_bear_engulf"
return None, None
# ==============================
# 主类WeexTransaction
# ==============================
class WeexTransaction:
def __init__(self, bit_id):
self.bit_id = bit_id
self.bit_port = None
self.page: ChromiumPage = None
self.position = 0 # -1: 空单, 0: 无仓, 1: 多单
self.direction = None
self.pbar = tqdm(total=TRADE_INTERVAL, desc="等待交易时间", ncols=80)
# ------------------------------
# 浏览器控制
# ------------------------------
@safe_run
def open_browser(self):
res = requests.post(
f"{BIT_URL}/browser/open",
data=json.dumps({"id": self.bit_id}),
headers={"Content-Type": "application/json"}
).json()
self.bit_port = res["data"]["http"].split(":")[1]
logger.success(f"比特浏览器启动成功,端口:{self.bit_port}")
return True
@safe_run
def attach_browser(self):
co = ChromiumOptions()
co.set_local_port(self.bit_port)
self.page = ChromiumPage(addr_or_opts=co)
self.page.set.window.max()
self.page.get(TRADE_URL)
logger.success("成功接管比特浏览器!")
return True
# ------------------------------
# 数据处理
# ------------------------------
@safe_run
def get_kline_data(self):
"""监听并解析K线数据"""
for i in range(RETRY_LIMIT):
logger.info(f"获取最新K线数据{i + 1} 次尝试...")
self.page.refresh()
res = self.page.listen.wait(timeout=15)
if not res:
continue
try:
datas = [{
'id': int(d[4]),
'open': float(d[3]),
'high': float(d[1]),
'low': float(d[2]),
'close': float(d[0])
} for d in res.response.body['data']["dataList"]]
return sorted(datas, key=lambda x: x["id"])
except Exception as e:
logger.error(f"K线数据解析失败: {e}")
return None
# ------------------------------
# 交易逻辑
# ------------------------------
@safe_run
def detect_position(self):
"""检测当前持仓状态"""
try:
if self.page.ele('x://section[text()="仓位(1)"]', timeout=5):
text = self.page.ele(
'x://*[@id="tradeLimitContainerJS"]/div[2]/div/div[1]/div/div/div/div[5]/div/div/div[2]'
).text
if f"{SYMBOL}" in text:
self.position = 1
elif f"{SYMBOL}" in text:
self.position = -1
else:
self.position = 0
else:
self.position = 0
except:
self.position = 0
logger.info(f"当前持仓状态: {self.position}")
return self.position
@safe_run
def execute_trade(self):
"""执行买卖操作"""
if not self.direction:
return
page = self.page
# 设置交易金额
page.ele('x://*[@id="guide-order-type"]/div[2]/div/div[2]', timeout=15).click()
time.sleep(1)
page.ele('x://*[@id="guide-order-type"]/div[5]/div/div[2]/div[1]/input[2]').input(TRADE_AMOUNT)
time.sleep(1)
if self.direction == "long" and self.position <= 0:
logger.info("执行开多...")
if self.position == -1:
self.close_position()
page.ele('x://span[normalize-space()="买入开多"]').click()
elif self.direction == "short" and self.position >= 0:
logger.info("执行开空...")
if self.position == 1:
self.close_position()
page.ele('x://span[normalize-space()="卖出开空"]').click()
@safe_run
def close_position(self):
"""平仓"""
logger.info("执行平仓操作...")
ele = self.page.ele('x://*[contains(text(), "闪电平仓")]')
if ele:
ele.scroll.to_see(center=True)
ele.click()
time.sleep(2)
# ------------------------------
# 主循环
# ------------------------------
def action(self):
if not self.open_browser():
return logger.error("打开比特浏览器失败")
if not self.attach_browser():
return logger.error("接管比特浏览器失败")
self.page.listen.start("public/quote/v1/getKlineV2")
logger.info("已开始监听K线数据...")
while True:
now = time.localtime()
minute = now.tm_min
self.pbar.n = minute % TRADE_INTERVAL
self.pbar.refresh()
if minute % TRADE_INTERVAL == 0:
self.detect_position()
kline = self.get_kline_data()
if not kline or len(kline) < 3:
logger.warning("K线数据不足")
time.sleep(REFRESH_INTERVAL)
continue
k1, k2 = kline[-2], kline[-1]
# 平仓条件
if self.position == 1 and is_bearish(k1) and is_bearish(k2):
self.close_position()
elif self.position == -1 and is_bullish(k1) and is_bullish(k2):
self.close_position()
# 检查信号
self.direction, signal = check_engulf_signal(k1, k2)
if self.direction:
logger.success(f"检测到信号: {self.direction} ({signal})")
self.execute_trade()
self.pbar.reset()
time.sleep(REFRESH_INTERVAL)
# ==============================
# 启动
# ==============================
if __name__ == '__main__':
WeexTransaction("8dcb4f744cf64ab190e465e153088515").action()