Files
lm_code/websea/main.py

418 lines
15 KiB
Python
Raw Normal View History

2025-12-02 15:17:39 +08:00
import re
import json
import hmac
import time
import base64
import hashlib
import datetime
import requests
2025-09-25 18:29:53 +08:00
2025-12-02 15:17:39 +08:00
from tqdm import *
from loguru import *
from DrissionPage import *
from bs4 import BeautifulSoup
2025-09-25 18:29:53 +08:00
2025-12-02 15:17:39 +08:00
def is_bullish(c) -> bool:
    """Return True when candle *c* is bullish, i.e. it closed above its open.

    Args:
        c: Mapping with ``'open'`` and ``'close'`` entries; each value is
            passed through ``float()``, so numbers and numeric strings both
            work.

    Returns:
        ``True`` if ``close > open``. A candle whose close equals its open
        is not bullish.
    """
    return float(c['close']) > float(c['open'])
2025-09-25 18:29:53 +08:00
2025-12-02 15:17:39 +08:00
def is_bearish(c) -> bool:
    """Return True when candle *c* is bearish, i.e. it closed below its open.

    Args:
        c: Mapping with ``'open'`` and ``'close'`` entries; each value is
            passed through ``float()``, so numbers and numeric strings both
            work.

    Returns:
        ``True`` if ``close < open``. A candle whose close equals its open
        is not bearish.
    """
    return float(c['close']) < float(c['open'])
class WeexTransaction:
    """Browser-driven trading loop for the Websea ETH-USDT futures page.

    The bot asks a local browser-management HTTP service to start a browser
    environment, attaches to it through DrissionPage, listens for the
    k-line API response of the futures page and clicks the page's order
    buttons when an engulfing pattern is detected.

    Position state is kept in ``self.start``: ``1`` long, ``-1`` short,
    ``0`` flat.
    """

    # Page that is loaded both for k-line data and for the account balance.
    FUTURES_URL = "https://www.websea.com/zh-CN/futures/ETH-USDT"

    # Minutes of the hour in which a new half-hour candle is evaluated.
    # NOTE(review): the first window is 0-5 but the second is 30-34; the
    # asymmetry is kept exactly as in the original - confirm it is intended.
    TRADE_MINUTES = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34)

    # Labels interpolated into the periodic status message.
    # NOTE(review): every label is an empty string in the source as
    # received (possibly symbols lost in transit); fill them in if the
    # status message is supposed to be informative.
    _POSITION_LABELS = {0: "", 1: "", -1: ""}
    _SIGNAL_LABELS = {None: "", "long": "", "short": ""}

    def __init__(self, tge_id):
        """Store configuration; performs no I/O.

        Args:
            tge_id: Environment id sent as ``envId`` to the local browser
                service when starting the browser.
        """
        self.tge_port = None  # port returned by the browser service
        self.tge_id = tge_id  # browser environment id
        self.tge_url = "http://127.0.0.1:50326"  # local browser service URL
        # SECURITY(review): the bearer token, webhook token and signing
        # secret below are committed in source. Rotate them and load them
        # from configuration/environment instead of the repository.
        self.tge_headers = {
            "Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
            "Content-Type": "application/json"
        }
        # DingTalk robot webhook address (replace with your own).
        self.webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125"
        # DingTalk robot signing secret (replace with your own).
        self.secret = "SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998"
        self.page = None  # ChromiumPage attached to the browser
        self.mn_tab = None  # main trading tab, created in action()
        self.start = 0  # position state: -1 short, 0 flat, 1 long
        self.kline_1 = None  # third-newest candle
        self.kline_2 = None  # second-newest candle
        self.kline_3 = None  # newest candle (used only as a freshness check)
        self.direction = None  # current signal: "long", "short" or None
        self.pbar = None  # tqdm progress bar

    def get_signature(self, timestamp):
        """Return the Base64-encoded HMAC-SHA256 signature for *timestamp*.

        The signed payload is ``"<timestamp>\\n<secret>"`` and the key is
        ``self.secret``. The result is NOT URL-encoded; Base64 output can
        contain ``+``, ``/`` and ``=``, so a caller that places it in a
        query string has to encode it first.
        """
        string_to_sign = f'{timestamp}\n{self.secret}'.encode('utf-8')
        hmac_code = hmac.new(
            self.secret.encode('utf-8'), string_to_sign, digestmod=hashlib.sha256
        ).digest()
        return base64.b64encode(hmac_code).decode('utf-8')

    def send_dingtalk_message(self, message_content):
        """Notification hook; currently a deliberate no-op.

        Two earlier implementations (a signed DingTalk webhook POST and a
        POST to an HTTP relay) were commented out in the original source,
        so nothing is sent. Callers still invoke this for every event,
        which keeps a single place to re-enable notifications.
        """
        pass

    def openBrowser(self, ):
        """Start the browser environment and remember its port.

        Returns:
            ``True`` when the service answered with ``data.port``,
            ``False`` on any request or parsing failure.
        """
        try:
            response = requests.post(
                f"{self.tge_url}/api/browser/start",
                json={"envId": self.tge_id},
                headers=self.tge_headers,
                timeout=60,  # without a timeout a dead service hangs forever
            )
            self.tge_port = response.json()["data"]["port"]
        except Exception as exc:  # network error, bad JSON, missing key
            logger.error(f"openBrowser failed: {exc!r}")
            return False
        return True

    def take_over_browser(self):
        """Attach DrissionPage to the started browser and maximise it.

        Returns:
            ``True`` on success, ``False`` if attaching failed.
        """
        try:
            co = ChromiumOptions()
            co.set_local_port(self.tge_port)
            self.page = ChromiumPage(addr_or_opts=co)
            self.page.set.window.max()
        except Exception as exc:
            logger.error(f"take_over_browser failed: {exc!r}")
            return False
        return True

    def is_bullish(self, c):
        """Return True when candle *c* closed above its open."""
        return float(c['close']) > float(c['open'])

    def is_bearish(self, c):
        """Return True when candle *c* closed below its open."""
        return float(c['close']) < float(c['open'])

    def check_signal(self, prev, curr):
        """Detect an engulfing pattern between two consecutive candles.

        - bearish *prev* engulfed by bullish *curr* -> go long
        - bullish *prev* engulfed by bearish *curr* -> go short

        Prices are truncated with ``int()`` before the engulfing
        comparison, exactly as in the original code.
        NOTE(review): that makes the test tolerant to sub-unit differences;
        confirm this tolerance is intended.
        NOTE(review): the original docstring said "15-minute candles only",
        while get_now_time() aligns to 30-minute boundaries - confirm which
        interval the listened k-line request actually returns.

        Returns:
            ``(direction, signal_key)`` where direction is ``"long"``,
            ``"short"`` or ``None`` (then signal_key is ``None`` too).
        """
        p_open, p_close = float(prev['open']), float(prev['close'])
        c_open, c_close = float(curr['open']), float(curr['close'])

        if (self.is_bullish(curr) and self.is_bearish(prev)
                and int(c_open) <= int(p_close) and int(c_close) >= int(p_open)):
            return "long", "bear_bull_engulf"

        if (self.is_bearish(curr) and self.is_bullish(prev)
                and int(c_open) >= int(p_close) and int(c_close) <= int(p_open)):
            return "short", "bull_bear_engulf"

        return None, None

    def get_price(self):
        """Reload the futures page and return the captured k-line candles.

        Makes up to three attempts. Each attempt reloads the page in
        ``self.mn_tab`` and waits up to 25 s for a packet from the listener
        that action() started on that tab.

        Returns:
            A non-empty list of ``{'id', 'open', 'high', 'low', 'close'}``
            dicts, or ``False`` when no attempt produced data.
        """
        for attempt in range(1, 4):
            try:
                logger.info(f"获取最新数据:{attempt}次。。。")
                self.mn_tab.get(url=self.FUTURES_URL)
                packet = self.mn_tab.listen.wait(timeout=25)
                if not packet:
                    continue  # timed out: retry instead of giving up
                candles = [
                    {
                        'id': int(row["id"]),
                        'open': float(row["open"]),
                        'high': float(row["high"]),
                        'low': float(row["low"]),
                        'close': float(row["close"]),
                    }
                    for row in packet.response.body["result"]['data']
                ]
                if candles:
                    return candles
            except Exception as exc:
                logger.warning(f"get_price attempt {attempt} failed: {exc!r}")
        return False

    def remove_tags_and_spaces(self, html):
        """Return the text content of *html* with every space character removed."""
        soup = BeautifulSoup(html, 'html.parser')
        return soup.get_text().replace(" ", "")

    def _announce(self, action_label):
        """Log and forward the standard "signal -> action" message."""
        message = f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},{action_label}"
        logger.success(message)
        self.send_dingtalk_message(message_content=message)

    def _close_all_positions(self):
        """Scroll the "market close all" control into view and click it."""
        close_locator = 'x://*[contains(text(), "市价全平")]'
        self.mn_tab.ele(close_locator).scroll.to_see(center=True)
        self.mn_tab.ele(close_locator).click()

    def to_do_page(self):
        """Place the order implied by ``self.direction`` on the page.

        Selects the market-order tab, enters ``available balance / 100`` as
        the amount, then either opens a position (when flat) or closes the
        opposite position and reverses. Updates ``self.start`` accordingly.

        Raises:
            RuntimeError: if the available balance could not be read. The
                original silently entered ``0.0`` in that case.
        """
        self.mn_tab.ele('x://*[contains(text(), "市价")]', timeout=15).click()
        time.sleep(1)

        available = self.get_num()
        if available is False or available is None:
            raise RuntimeError("available balance could not be fetched; order not placed")
        self.mn_tab.ele('x://*[@id="amountInput"]').input(float(available) / 100)
        time.sleep(1)

        if self.direction == "long":
            order_locator = 'x://*[contains(text(), "买入/做多")]'
            target_state, open_label, reverse_label = 1, "开多", "反手平空做多"
        elif self.direction == "short":
            order_locator = 'x://*[contains(text(), "卖出/做空")]'
            target_state, open_label, reverse_label = -1, "开空", "反手平多做空"
        else:
            return

        if not self.start:
            # Flat: simply open in the signal's direction.
            self._announce(open_label)
            self.mn_tab.ele(order_locator).click()
            self.start = target_state
        elif self.start == -target_state:
            # Holding the opposite side: close everything, then reverse.
            self._announce(reverse_label)
            self._close_all_positions()
            time.sleep(3)
            self.mn_tab.ele(order_locator).click()
            self.start = target_state
        # Already positioned in the signal's direction: nothing to do.

    def get_text(self, target_text):
        """Return True when the main tab's page text contains *target_text*.

        Whitespace is stripped from both the target and the page text
        before comparing, so layout whitespace between nodes is ignored.
        The whole document text is checked once, which always yields a
        bool.
        """
        cleaned_target_text = re.sub(r'\s', '', target_text)
        soup = BeautifulSoup(self.mn_tab.html, 'html.parser')
        cleaned_page_text = re.sub(r'\s', '', soup.get_text())
        return cleaned_target_text in cleaned_page_text

    def get_now_time(self):
        """Return the Unix timestamp of the current half-hour boundary.

        The local time is floored to ``:00`` or ``:30`` of the current
        hour.
        """
        now = datetime.datetime.fromtimestamp(time.time())
        boundary_minute = 0 if now.minute < 30 else 30
        boundary = now.replace(minute=boundary_minute, second=0, microsecond=0)
        return int(boundary.timestamp())

    def close_extra_tabs_in_browser(self):
        """Close every tab except the first one.

        Returns:
            ``True`` when all extra tabs were closed, ``False`` on error.
        """
        try:
            for index, tab in enumerate(self.page.get_tabs()):
                if index == 0:
                    continue  # keep the first tab
                tab.close()
        except Exception as exc:
            logger.warning(f"close_extra_tabs_in_browser failed: {exc!r}")
            return False
        return True

    def get_num(self):
        """Read ``result.availableUsd`` from the account API response.

        Opens a temporary tab, listens for the account request while the
        futures page loads (up to three attempts, 15 s each), and always
        closes the temporary tab afterwards.

        Returns:
            The ``availableUsd`` value from the response body, or ``False``
            when it could not be obtained.
        """
        num_tab = self.page.new_tab()
        try:
            num_tab.listen.start("capi.websea.com/webApi/asset/account")
            for attempt in range(1, 4):
                try:
                    logger.info(f"获取最新数据:{attempt}次。。。")
                    num_tab.get(url=self.FUTURES_URL)
                    packet = num_tab.listen.wait(timeout=15)
                    if packet:
                        return packet.response.body["result"]["availableUsd"]
                except Exception as exc:
                    logger.warning(f"get_num attempt {attempt} failed: {exc!r}")
            return False
        finally:
            # Single close point: the tab is never reused after closing and
            # never leaked, whichever path is taken above.
            try:
                num_tab.close()
            except Exception as exc:
                logger.warning(f"closing balance tab failed: {exc!r}")

    def action(self):
        """Run the bot: start and attach the browser, then trade forever.

        Returns early (``None``) if the browser cannot be started or
        attached; otherwise loops indefinitely.
        """
        # Obtain the browser port from the local service.
        if self.openBrowser():
            logger.info("获取打开比特成功,成功获取端口!!!")
        else:
            logger.error("打开比特失败!!!")
            return

        # Attach to the running browser.
        if self.take_over_browser():
            logger.info("接管比特浏览器成功!!!")
        else:
            logger.error("接管浏览器失败!!!")
            return

        if self.close_extra_tabs_in_browser():
            logger.info('关闭多余标签页成功!!!')
        else:
            logger.info('关闭多余标签页失败!!!')

        self.mn_tab = self.page.new_tab()
        self.mn_tab.listen.start("capi.websea.com/webApi/market/getKline")
        logger.success("浏览器开启抓包模式。。。")
        self.mn_tab.get(url=self.FUTURES_URL)  # open the trading page

        self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)

        while True:
            current_minute = time.localtime().tm_min

            # Progress bar shows minutes elapsed in the current half hour.
            self.pbar.n = current_minute % 30
            self.pbar.refresh()

            # Only act in the first minutes after a half-hour boundary.
            if current_minute not in self.TRADE_MINUTES:
                time.sleep(10)
                continue

            new_price_datas = self.get_price()
            if new_price_datas:
                logger.success("获取最新交易价格成功!!!")
            else:
                logger.info("获取最新价格有问题!!!")
                continue

            candles = sorted(new_price_datas, key=lambda x: x["id"])
            if len(candles) < 3:
                # Unpacking below needs exactly three candles.
                logger.info("获取最新价格有问题!!!")
                continue
            self.kline_1, self.kline_2, self.kline_3 = candles[-3:]

            # The newest candle must be the one for the current half hour,
            # otherwise the captured data is stale.
            if self.get_now_time() != self.kline_3["id"]:
                continue

            time.sleep(15)

            # Derive the real position state from the text shown on the page.
            if self.get_text(target_text="全部平仓ETHUSDT永续多全仓100X持仓量"):
                self.start = 1
            elif self.get_text(target_text="全部平仓ETHUSDT永续空全仓100X持仓量"):
                self.start = -1
            else:
                self.start = 0

            # Exit rule: two consecutive candles against the position.
            if self.start == 1:
                if self.is_bearish(self.kline_1) and self.is_bearish(self.kline_2):
                    self._announce("平多")
                    self._close_all_positions()
                    self.start = 0
            elif self.start == -1:
                if self.is_bullish(self.kline_1) and self.is_bullish(self.kline_2):
                    self._announce("平空")
                    self._close_all_positions()
                    self.start = 0

            # Entry / reversal rule.
            self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2)

            if self.direction:
                try:
                    self.to_do_page()
                except Exception as e:
                    # send_dingtalk_message is a no-op, so log as well or
                    # the failure would disappear without a trace.
                    logger.exception(f"to_do_page failed: {e!r}")
                    self.send_dingtalk_message(
                        message_content=f"{datetime.datetime.now()}{e}")

            self.pbar.reset()

            # NOTE(review): there is no pause here, so the block above runs
            # again while the minute is still inside TRADE_MINUTES - confirm
            # that repeated evaluation of the same candle is intended.
            position_label = self._POSITION_LABELS.get(self.start, "")
            signal_label = self._SIGNAL_LABELS.get(self.direction or None, "")
            self.send_dingtalk_message(
                message_content=
                f"{datetime.datetime.now()}"
                f"目前有持仓:{position_label}"
                f"当前信号:{signal_label}"
            )
2025-09-25 18:29:53 +08:00
if __name__ == '__main__':
    # Script entry point: run the trading loop for browser environment
    # 191303 (the value is sent as ``envId`` to the local browser service).
    WeexTransaction(
        tge_id=191303,
    ).action()
# XPath template for locating an element by its text: //*[contains(text(), 'specific text')]