This commit is contained in:
27942
2025-11-04 10:39:42 +08:00
parent 0ca0834dc2
commit 39b95493b8

226
weex交易/text.py Normal file
View File

@@ -0,0 +1,226 @@
import datetime
import time
import hmac
import hashlib
import base64
import json
import re
import requests
from bs4 import BeautifulSoup
from loguru import logger
from tqdm import tqdm
from DrissionPage import ChromiumPage, ChromiumOptions
# ==============================================================
# ✅ 通用工具函数
# ==============================================================
def is_bullish(candle):
return float(candle['close']) > float(candle['open'])
def is_bearish(candle):
return float(candle['close']) < float(candle['open'])
def current_time():
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# ==============================================================
# ✅ 钉钉通知模块
# ==============================================================
class DingTalkNotifier:
def __init__(self, webhook_url: str, secret: str):
self.webhook_url = webhook_url
self.secret = secret
def _get_signature(self, timestamp):
string_to_sign = f'{timestamp}\n{self.secret}'
hmac_code = hmac.new(self.secret.encode('utf-8'), string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256).digest()
return base64.b64encode(hmac_code).decode('utf-8')
def send(self, content: str):
timestamp = str(round(time.time() * 1000))
sign = self._get_signature(timestamp)
url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
msg = {"msgtype": "text", "text": {"content": content}}
try:
res = requests.post(url, json=msg, timeout=10)
res.raise_for_status()
logger.success(f"[钉钉通知成功] {content}")
except Exception as e:
logger.error(f"[钉钉通知失败] {e}")
# ==============================================================
# ✅ 浏览器与页面交互模块
# ==============================================================
class WeexBrowser:
def __init__(self, tge_id: int, api_key: str, base_url="http://127.0.0.1:50326"):
self.tge_id = tge_id
self.api_key = api_key
self.base_url = base_url
self.page = None
self.port = None
def start_browser(self):
try:
res = requests.post(
f"{self.base_url}/api/browser/start",
headers={"Authorization": f"Bearer {self.api_key}"},
json={"envId": self.tge_id}
).json()
self.port = res["data"]["port"]
logger.info(f"浏览器已启动,端口:{self.port}")
return True
except Exception as e:
logger.error(f"启动浏览器失败: {e}")
return False
def connect(self, url):
try:
opts = ChromiumOptions()
opts.set_local_port(self.port)
self.page = ChromiumPage(addr_or_opts=opts)
self.page.set.window.max()
self.page.get(url=url)
logger.success("浏览器接管成功!")
return True
except Exception as e:
logger.error(f"浏览器接管失败: {e}")
return False
def click(self, xpath):
try:
ele = self.page.ele(f'x://{xpath}')
if ele:
ele.scroll.to_see(center=True)
ele.click()
return True
except Exception as e:
logger.error(f"点击元素失败:{xpath},原因:{e}")
return False
def contains_text(self, text):
soup = BeautifulSoup(self.page.html, "html.parser")
cleaned_target = re.sub(r'\s', '', text)
for tag in soup.find_all():
if cleaned_target in re.sub(r'\s', '', tag.get_text()):
return True
return False
# ==============================================================
# ✅ K线策略模块
# ==============================================================
class KlineStrategy:
@staticmethod
def engulf_signal(prev, curr):
"""包住形态信号"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
return "long"
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
return "short"
return None
# ==============================================================
# ✅ 主交易逻辑模块
# ==============================================================
class WeexTrader:
def __init__(self, tge_id):
self.browser = WeexBrowser(tge_id=tge_id, api_key="asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91")
self.notifier = DingTalkNotifier(
webhook_url="https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125",
secret="SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998"
)
self.direction = 0 # -1:空 0:无 1:多
self.page = None
def run(self):
if not self.browser.start_browser() or not self.browser.connect("https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT"):
return
page = self.browser.page
page.listen.start("public/quote/v1/getKlineV2")
pbar = tqdm(total=30, desc="监控中", ncols=80)
while True:
try:
tm = time.localtime().tm_min
pbar.n = tm % 30
pbar.refresh()
# 每30分钟判断信号
if tm in [0, 1, 2, 30, 31, 32]:
self._check_position(page)
klines = self._get_kline_data(page)
if not klines:
continue
prev, curr = klines[-2:]
signal = KlineStrategy.engulf_signal(prev, curr)
if signal:
self._handle_signal(signal)
else:
logger.info("无信号触发")
time.sleep(10)
except Exception as e:
logger.exception(f"主循环异常: {e}")
time.sleep(15)
def _check_position(self, page):
if self.browser.contains_text("ETH/SUSDT多"):
self.direction = 1
elif self.browser.contains_text("ETH/SUSDT空"):
self.direction = -1
else:
self.direction = 0
def _get_kline_data(self, page):
try:
page.refresh()
res = page.listen.wait(timeout=10)
if not res: return None
return sorted([
{'id': int(d[4]), 'open': d[3], 'high': d[1], 'low': d[2], 'close': d[0]}
for d in res.response.body['data']["dataList"]
], key=lambda x: x['id'])
except Exception as e:
logger.error(f"获取K线失败{e}")
return None
def _handle_signal(self, signal):
if signal == "long" and self.direction <= 0:
self._trade("买入开多", 1)
elif signal == "short" and self.direction >= 0:
self._trade("卖出开空", -1)
def _trade(self, action_text, new_dir):
logger.success(f"{current_time()}:执行 {action_text}")
self.notifier.send(f"{current_time()} 执行 {action_text}")
self.browser.click(f'*[contains(text(), "闪电平仓")]')
time.sleep(2)
self.browser.click(f'*[contains(text(), "{action_text}")]')
self.direction = new_dir
# ==============================================================
# ✅ 启动程序
# ==============================================================
if __name__ == '__main__':
trader = WeexTrader(tge_id=146473)
trader.run()