dededdew
This commit is contained in:
290
test1.py
290
test1.py
@@ -1,229 +1,71 @@
|
||||
import json
|
||||
import time
|
||||
import requests
|
||||
from tqdm import tqdm
|
||||
from loguru import logger
|
||||
from DrissionPage import ChromiumOptions, ChromiumPage
|
||||
import time
|
||||
from DrissionPage import ChromiumPage, ChromiumOptions
|
||||
|
||||
# ==============================
|
||||
# 全局配置
|
||||
# ==============================
|
||||
BIT_URL = "http://127.0.0.1:54345"
|
||||
SYMBOL = "ETH-SUSDT"
|
||||
TRADE_URL = f"https://www.weeaxs.site/zh-CN/futures/demo-trading/{SYMBOL}"
|
||||
TRADE_AMOUNT = 100
|
||||
REFRESH_INTERVAL = 10 # 循环检查间隔
|
||||
TRADE_INTERVAL = 30 # 每30分钟检查一次K线
|
||||
RETRY_LIMIT = 3
|
||||
# ========== 配置 ==========
|
||||
API_BASE_URL = "http://127.0.0.1:50326"
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
# ==============================
|
||||
# 工具函数
|
||||
# ==============================
|
||||
def safe_run(func):
|
||||
"""装饰器:捕获函数异常并记录"""
|
||||
# # ========== 步骤 1: 创建浏览器环境 ==========
|
||||
# print("1. 创建浏览器环境...")
|
||||
# response = requests.post(
|
||||
# f"{API_BASE_URL}/api/browser/create",
|
||||
# headers=headers,
|
||||
# params = {
|
||||
# "current": 1,
|
||||
# "pageSize": 20,
|
||||
# "keyword": "test"
|
||||
# }
|
||||
# )
|
||||
#
|
||||
# env_id = response.json()["data"]["envId"]
|
||||
# print(f" 环境创建成功,ID: {env_id}")
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
logger.exception(f"[错误] {func.__name__} 执行失败: {e}")
|
||||
return None
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def is_bullish(candle): # 阳线
|
||||
return float(candle['close']) > float(candle['open'])
|
||||
|
||||
|
||||
def is_bearish(candle): # 阴线
|
||||
return float(candle['close']) < float(candle['open'])
|
||||
|
||||
|
||||
def check_engulf_signal(prev, curr):
|
||||
"""包住形态信号判定"""
|
||||
p_open, p_close = float(prev['open']), float(prev['close'])
|
||||
c_open, c_close = float(curr['open']), float(curr['close'])
|
||||
|
||||
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
|
||||
return "long", "bear_bull_engulf"
|
||||
|
||||
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
|
||||
return "short", "bull_bear_engulf"
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
# ==============================
|
||||
# 主类:WeexTransaction
|
||||
# ==============================
|
||||
class WeexTransaction:
|
||||
def __init__(self, bit_id):
|
||||
self.bit_id = bit_id
|
||||
self.bit_port = None
|
||||
self.page: ChromiumPage = None
|
||||
self.position = 0 # -1: 空单, 0: 无仓, 1: 多单
|
||||
self.direction = None
|
||||
self.pbar = tqdm(total=TRADE_INTERVAL, desc="等待交易时间", ncols=80)
|
||||
|
||||
# ------------------------------
|
||||
# 浏览器控制
|
||||
# ------------------------------
|
||||
@safe_run
|
||||
def open_browser(self):
|
||||
res = requests.post(
|
||||
f"{BIT_URL}/browser/open",
|
||||
data=json.dumps({"id": self.bit_id}),
|
||||
headers={"Content-Type": "application/json"}
|
||||
).json()
|
||||
self.bit_port = res["data"]["http"].split(":")[1]
|
||||
logger.success(f"比特浏览器启动成功,端口:{self.bit_port}")
|
||||
return True
|
||||
|
||||
@safe_run
|
||||
def attach_browser(self):
|
||||
co = ChromiumOptions()
|
||||
co.set_local_port(self.bit_port)
|
||||
self.page = ChromiumPage(addr_or_opts=co)
|
||||
self.page.set.window.max()
|
||||
self.page.get(TRADE_URL)
|
||||
logger.success("成功接管比特浏览器!")
|
||||
return True
|
||||
|
||||
# ------------------------------
|
||||
# 数据处理
|
||||
# ------------------------------
|
||||
@safe_run
|
||||
def get_kline_data(self):
|
||||
"""监听并解析K线数据"""
|
||||
for i in range(RETRY_LIMIT):
|
||||
logger.info(f"获取最新K线数据,第 {i + 1} 次尝试...")
|
||||
self.page.refresh()
|
||||
res = self.page.listen.wait(timeout=15)
|
||||
if not res:
|
||||
continue
|
||||
|
||||
try:
|
||||
datas = [{
|
||||
'id': int(d[4]),
|
||||
'open': float(d[3]),
|
||||
'high': float(d[1]),
|
||||
'low': float(d[2]),
|
||||
'close': float(d[0])
|
||||
} for d in res.response.body['data']["dataList"]]
|
||||
return sorted(datas, key=lambda x: x["id"])
|
||||
except Exception as e:
|
||||
logger.error(f"K线数据解析失败: {e}")
|
||||
return None
|
||||
|
||||
# ------------------------------
|
||||
# 交易逻辑
|
||||
# ------------------------------
|
||||
@safe_run
|
||||
def detect_position(self):
|
||||
"""检测当前持仓状态"""
|
||||
try:
|
||||
if self.page.ele('x://section[text()="仓位(1)"]', timeout=5):
|
||||
text = self.page.ele(
|
||||
'x://*[@id="tradeLimitContainerJS"]/div[2]/div/div[1]/div/div/div/div[5]/div/div/div[2]'
|
||||
).text
|
||||
if f"{SYMBOL} 多" in text:
|
||||
self.position = 1
|
||||
elif f"{SYMBOL} 空" in text:
|
||||
self.position = -1
|
||||
else:
|
||||
self.position = 0
|
||||
else:
|
||||
self.position = 0
|
||||
except:
|
||||
self.position = 0
|
||||
logger.info(f"当前持仓状态: {self.position}")
|
||||
return self.position
|
||||
|
||||
@safe_run
|
||||
def execute_trade(self):
|
||||
"""执行买卖操作"""
|
||||
if not self.direction:
|
||||
return
|
||||
|
||||
page = self.page
|
||||
|
||||
# 设置交易金额
|
||||
page.ele('x://*[@id="guide-order-type"]/div[2]/div/div[2]', timeout=15).click()
|
||||
time.sleep(1)
|
||||
page.ele('x://*[@id="guide-order-type"]/div[5]/div/div[2]/div[1]/input[2]').input(TRADE_AMOUNT)
|
||||
time.sleep(1)
|
||||
|
||||
if self.direction == "long" and self.position <= 0:
|
||||
logger.info("执行开多...")
|
||||
if self.position == -1:
|
||||
self.close_position()
|
||||
page.ele('x://span[normalize-space()="买入开多"]').click()
|
||||
|
||||
elif self.direction == "short" and self.position >= 0:
|
||||
logger.info("执行开空...")
|
||||
if self.position == 1:
|
||||
self.close_position()
|
||||
page.ele('x://span[normalize-space()="卖出开空"]').click()
|
||||
|
||||
@safe_run
|
||||
def close_position(self):
|
||||
"""平仓"""
|
||||
logger.info("执行平仓操作...")
|
||||
ele = self.page.ele('x://*[contains(text(), "闪电平仓")]')
|
||||
if ele:
|
||||
ele.scroll.to_see(center=True)
|
||||
ele.click()
|
||||
time.sleep(2)
|
||||
|
||||
# ------------------------------
|
||||
# 主循环
|
||||
# ------------------------------
|
||||
def action(self):
|
||||
if not self.open_browser():
|
||||
return logger.error("打开比特浏览器失败")
|
||||
if not self.attach_browser():
|
||||
return logger.error("接管比特浏览器失败")
|
||||
|
||||
self.page.listen.start("public/quote/v1/getKlineV2")
|
||||
logger.info("已开始监听K线数据...")
|
||||
|
||||
while True:
|
||||
now = time.localtime()
|
||||
minute = now.tm_min
|
||||
self.pbar.n = minute % TRADE_INTERVAL
|
||||
self.pbar.refresh()
|
||||
|
||||
if minute % TRADE_INTERVAL == 0:
|
||||
self.detect_position()
|
||||
kline = self.get_kline_data()
|
||||
if not kline or len(kline) < 3:
|
||||
logger.warning("K线数据不足")
|
||||
time.sleep(REFRESH_INTERVAL)
|
||||
continue
|
||||
|
||||
k1, k2 = kline[-2], kline[-1]
|
||||
|
||||
# 平仓条件
|
||||
if self.position == 1 and is_bearish(k1) and is_bearish(k2):
|
||||
self.close_position()
|
||||
elif self.position == -1 and is_bullish(k1) and is_bullish(k2):
|
||||
self.close_position()
|
||||
|
||||
# 检查信号
|
||||
self.direction, signal = check_engulf_signal(k1, k2)
|
||||
if self.direction:
|
||||
logger.success(f"检测到信号: {self.direction} ({signal})")
|
||||
self.execute_trade()
|
||||
|
||||
self.pbar.reset()
|
||||
|
||||
time.sleep(REFRESH_INTERVAL)
|
||||
|
||||
|
||||
# ==============================
|
||||
# 启动
|
||||
# ==============================
|
||||
if __name__ == '__main__':
|
||||
WeexTransaction("8dcb4f744cf64ab190e465e153088515").action()
|
||||
# ========== 步骤 2: 启动浏览器 ==========
|
||||
print("2. 启动浏览器...")
|
||||
response = requests.post(
|
||||
f"{API_BASE_URL}/api/browser/start",
|
||||
json={"envId": 146473},
|
||||
headers=headers
|
||||
)
|
||||
#
|
||||
# debug_port = response.json()["data"]["port"]
|
||||
# print(f" 浏览器已启动,调试端口: {debug_port}")
|
||||
#
|
||||
# # ========== 步骤 3: 连接 DrissionPage ==========
|
||||
# print("3. 连接 DrissionPage...")
|
||||
# time.sleep(3) # 等待浏览器完全启动
|
||||
#
|
||||
# # 方式1: 通过端口连接(推荐)
|
||||
# co = ChromiumOptions().set_local_port(debug_port)
|
||||
# page = ChromiumPage(addr_or_opts=co)
|
||||
#
|
||||
# # 方式2: 直接使用端口号
|
||||
# # from DrissionPage import Chromium
|
||||
# # page = Chromium(debug_port)
|
||||
#
|
||||
# # 方式3: 使用地址:端口
|
||||
# # page = Chromium(f'127.0.0.1:{debug_port}')
|
||||
#
|
||||
# # 方式4: 使用 WebSocket URL(需要先获取 ws_url)
|
||||
# # ws_url = response.json()["data"]["ws"]
|
||||
# # page = Chromium(ws_url)
|
||||
#
|
||||
# print(" DrissionPage 连接成功")
|
||||
#
|
||||
# # ========== 步骤 4: 执行自动化操作 ==========
|
||||
# print("4. 执行自动化操作...")
|
||||
#
|
||||
# # 访问网页
|
||||
# page.get('https://www.example.com')
|
||||
# print(f" 当前页面: {page.title}")
|
||||
#
|
||||
# # 更多操作示例
|
||||
# page.get_screenshot('screenshot.png') # 截图
|
||||
# print(" 已保存截图")
|
||||
#
|
||||
# print("\n完成!")
|
||||
Reference in New Issue
Block a user