Files
lm_code/websea/main.py
Administrator 2cdf87e054 fwefwf
2025-12-02 15:17:39 +08:00

423 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import json
import hmac
import time
import base64
import hashlib
import datetime
import requests
from tqdm import *
from loguru import *
from DrissionPage import *
from bs4 import BeautifulSoup
def is_bullish(c):
    """Tell whether a candle is bullish, i.e. closed above its open.

    Args:
        c: Mapping with ``'open'`` and ``'close'`` entries; each value is
            passed through ``float()`` before the comparison.

    Returns:
        True when the close is strictly greater than the open.
    """
    close_price = float(c['close'])
    open_price = float(c['open'])
    return close_price > open_price
def is_bearish(c):
    """Tell whether a candle is bearish, i.e. closed below its open.

    Args:
        c: Mapping with ``'open'`` and ``'close'`` entries; each value is
            passed through ``float()`` before the comparison.

    Returns:
        True when the close is strictly less than the open.
    """
    close_price = float(c['close'])
    open_price = float(c['open'])
    return close_price < open_price
class WeexTransaction:
    """Browser-driven ETH-USDT futures trader for the websea.com web UI.

    The class asks a local browser service to start an environment, attaches
    DrissionPage to the returned port, captures the K-line responses requested
    by the exchange page, and clicks the order buttons when ``check_signal``
    reports an engulfing pattern.
    """

    def __init__(self, tge_id):
        """Store the browser environment id and reset all runtime state.

        Args:
            tge_id: Environment id sent as ``envId`` to the local browser
                service when the browser is started.
        """
        self.tge_port = None  # port returned by the browser service
        self.tge_id = tge_id
        self.tge_url = "http://127.0.0.1:50326"  # local browser service URL
        # NOTE(review): the credentials below are hard-coded in source;
        # consider loading them from configuration or the environment.
        self.tge_headers = {
            "Authorization": "Bearer asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91",
            "Content-Type": "application/json"
        }
        # DingTalk robot webhook address.
        self.webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125"
        # DingTalk robot signing secret.
        self.secret = "SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998"
        self.page = None  # ChromiumPage attached to the started browser
        self.mn_tab = None  # trading tab, created in action()
        self.start = 0  # position state: -1 short, 0 flat, 1 long
        self.kline_1 = None  # third-newest candle from the last fetch
        self.kline_2 = None  # second-newest candle from the last fetch
        self.kline_3 = None  # newest candle from the last fetch
        self.direction = None  # latest signal: "long", "short" or None
        self.pbar = None  # tqdm progress bar
        # Cookies and headers used by get_num(); nothing in this class fills
        # them in, so they default to None (requests then sends neither).
        self.cookies = None
        self.new_dict = None

    def get_signature(self, timestamp):
        """Return the Base64-encoded HMAC-SHA256 signature for ``timestamp``.

        The signed text is ``"<timestamp>\\n<secret>"`` and the HMAC key is
        ``self.secret``.

        Args:
            timestamp: Value interpolated into the text to sign.

        Returns:
            The signature as a ``str``. It is not URL-encoded.
        """
        string_to_sign = f'{timestamp}\n{self.secret}'.encode('utf-8')
        digest = hmac.new(
            self.secret.encode('utf-8'), string_to_sign, digestmod=hashlib.sha256
        ).digest()
        return base64.b64encode(digest).decode('utf-8')

    def send_dingtalk_message(self, message_content):
        """Notification hook; currently a no-op.

        The message is accepted and discarded, so callers must log anything
        that has to remain visible.

        Args:
            message_content: Text that would be sent to the notifier.
        """
        return None

    def openBrowser(self, ):
        """Start the browser environment ``self.tge_id`` via the local service.

        On success the port from the response is stored in ``self.tge_port``.

        Returns:
            True on success, False when the request or response parsing fails.
        """
        try:
            response = requests.post(
                f"{self.tge_url}/api/browser/start",
                json={"envId": self.tge_id},
                headers=self.tge_headers,
                timeout=60,  # never hang forever on an unresponsive service
            )
            self.tge_port = response.json()["data"]["port"]
        except Exception as exc:
            logger.error(f"启动浏览器环境失败:{exc}")
            return False
        return True

    def take_over_browser(self):
        """Attach DrissionPage to the browser on ``self.tge_port``.

        Returns:
            True when the page object was created and the window maximised,
            False otherwise.
        """
        try:
            co = ChromiumOptions()
            co.set_local_port(self.tge_port)
            self.page = ChromiumPage(addr_or_opts=co)
            self.page.set.window.max()
        except Exception as exc:
            logger.error(f"接管浏览器异常:{exc}")
            return False
        return True

    def is_bullish(self, c):
        """Return True when candle ``c`` closed above its open."""
        return float(c['close']) > float(c['open'])

    def is_bearish(self, c):
        """Return True when candle ``c`` closed below its open."""
        return float(c['close']) < float(c['open'])

    def check_signal(self, prev, curr):
        """Detect an engulfing pattern across two consecutive candles.

        - bearish ``prev`` engulfed by bullish ``curr`` -> long
        - bullish ``prev`` engulfed by bearish ``curr`` -> short

        The original note says this is meant for 15-minute candles; the method
        itself does not depend on the candle interval.

        Args:
            prev: Earlier candle, a mapping with ``'open'`` and ``'close'``.
            curr: Later candle, a mapping with ``'open'`` and ``'close'``.

        Returns:
            ``("long", "bear_bull_engulf")``, ``("short", "bull_bear_engulf")``
            or ``(None, None)`` when no pattern is present.
        """
        # Prices are truncated with int() before being compared.
        # NOTE(review): this allows sub-unit gaps between the two bodies;
        # confirm the tolerance is intended.
        p_open, p_close = int(float(prev['open'])), int(float(prev['close']))
        c_open, c_close = int(float(curr['open'])), int(float(curr['close']))

        if (self.is_bullish(curr) and self.is_bearish(prev)
                and c_open <= p_close and c_close >= p_open):
            return "long", "bear_bull_engulf"
        if (self.is_bearish(curr) and self.is_bullish(prev)
                and c_open >= p_close and c_close <= p_open):
            return "short", "bull_bear_engulf"
        return None, None

    def get_price(self):
        """Reload the futures page and parse the captured K-line response.

        Makes up to three attempts. Each attempt reloads the page and waits up
        to 15 seconds for a packet on the listener started in ``action``.

        Returns:
            A list of dicts with ``id``, ``open``, ``high``, ``low`` and
            ``close`` keys, or False when every attempt failed.
        """
        for attempt in range(3):
            try:
                logger.info(f"获取最新数据:{attempt + 1}次。。。")
                self.mn_tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT")
                res = self.mn_tab.listen.wait(timeout=15)
                if not res:
                    continue  # nothing captured in time; try again
                return [
                    {
                        'id': int(data["id"]),
                        'open': float(data["open"]),
                        'high': float(data["high"]),
                        'low': float(data["low"]),
                        'close': float(data["close"])
                    }
                    for data in res.response.body["result"]['data']
                ]
            except Exception as exc:
                logger.warning(f"获取K线数据失败:{exc}")
        return False

    def remove_tags_and_spaces(self, html):
        """Return the text of ``html`` with tags and space characters removed.

        Only the plain space character is stripped; other whitespace is kept.
        """
        text = BeautifulSoup(html, 'html.parser').get_text()
        return text.replace(" ", "")

    def _announce(self, action_label):
        """Log a trade action and forward the same text to the notifier."""
        message = f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},{action_label}"
        logger.success(message)
        self.send_dingtalk_message(message_content=message)

    def _close_all_positions(self):
        """Scroll the market close-all button into view and click it."""
        self.mn_tab.ele('x://*[contains(text(), "市价全平")]').scroll.to_see(center=True)
        # Look the element up again after scrolling, as the page may re-render.
        self.mn_tab.ele('x://*[contains(text(), "市价全平")]').click()

    def to_do_page(self):
        """Place the order for ``self.direction`` through the page.

        Selects the market-order tab, reads the available USDT balance from
        the page text, enters one hundredth of it as the amount, then opens a
        position or closes the opposite one and reverses. ``self.start`` is
        updated to the new position state. Nothing is ordered when the balance
        cannot be read or is zero, or when the current position already
        matches the signal.
        """
        self.mn_tab.ele('x://*[contains(text(), "市价")]', timeout=15).click()
        time.sleep(1)

        html_text = self.remove_tags_and_spaces(html=self.mn_tab.html)
        # Matches e.g. "可用:1,234.56USDT" in the space-stripped page text.
        match = re.search(r'可用:([\d,]+\.\d+)USDT', html_text)
        number = float(match.group(1).replace(',', '')) if match else None
        if not number:
            logger.warning("未能读取可用余额,跳过下单")
            return

        self.mn_tab.ele('x://*[@id="amountInput"]').input(number / 100)
        time.sleep(1)

        if self.direction == "long":
            button_text, new_state = "买入/做多", 1
            open_label, reverse_label = "开多", "反手平空做多"
        elif self.direction == "short":
            button_text, new_state = "卖出/做空", -1
            open_label, reverse_label = "开空", "反手平多做空"
        else:
            return

        if not self.start:
            # Flat: simply open in the direction of the signal.
            self._announce(open_label)
        elif self.start == -new_state:
            # Holding the opposite side: close everything, then reverse.
            self._announce(reverse_label)
            self._close_all_positions()
            time.sleep(3)
        else:
            return  # already positioned in the direction of the signal

        self.mn_tab.ele(f'x://*[contains(text(), "{button_text}")]').click()
        self.start = new_state

    def get_text(self, target_text):
        """Check whether the trading tab contains ``target_text``.

        All whitespace is removed from both the target and each tag's text
        before comparing.

        Returns:
            True when any tag's text contains the target, otherwise False.
        """
        wanted = re.sub(r'\s', '', target_text)
        soup = BeautifulSoup(self.mn_tab.html, 'html.parser')
        return any(
            wanted in re.sub(r'\s', '', tag.get_text())
            for tag in soup.find_all()
        )

    def get_now_time(self):
        """Return the start of the current half hour as an integer timestamp.

        The local time is rounded down to minute 0 or minute 30.
        """
        now = datetime.datetime.fromtimestamp(time.time())
        boundary_minute = 0 if now.minute < 30 else 30
        boundary = now.replace(minute=boundary_minute, second=0, microsecond=0)
        return int(boundary.timestamp())

    def close_extra_tabs_in_browser(self):
        """Close every tab except the first one returned by ``get_tabs``.

        Returns:
            True when all extra tabs were closed, False when an error occurred.
        """
        try:
            for index, tab in enumerate(self.page.get_tabs()):
                if index == 0:
                    continue
                tab.close()
        except Exception as exc:
            logger.warning(f"关闭标签页异常:{exc}")
            return False
        return True

    def get_num(self):
        """Request the ``convertInto`` analysis endpoint and print its JSON.

        Uses ``self.cookies`` and ``self.new_dict`` as cookies and headers.

        Returns:
            The decoded JSON payload, which is also printed.
        """
        response = requests.get(
            'https://gains.websea.com/webApi/analysis/convertInto',
            params={'type': '1'},
            cookies=self.cookies,
            headers=self.new_dict,
            timeout=15,
        )
        payload = response.json()
        print(payload)
        return payload

    def action(self):
        """Run the trading loop; returns only if the browser cannot be set up.

        Starts and attaches to the browser, opens the futures page with the
        K-line listener enabled, then loops forever: fetch candles, detect the
        current position from the page, close it after two opposing candles,
        and act on any engulfing signal.
        """
        if self.openBrowser():
            logger.info("获取打开比特成功,成功获取端口!!!")
        else:
            logger.error("打开比特失败!!!")
            return

        if self.take_over_browser():
            logger.info("接管比特浏览器成功!!!")
        else:
            logger.error("接管浏览器失败!!!")
            return

        if self.close_extra_tabs_in_browser():
            logger.info('关闭多余标签页成功!!!')
        else:
            logger.info('关闭多余标签页失败!!!')

        self.mn_tab = self.page.new_tab()
        self.mn_tab.listen.start("https://capi.websea.com/webApi/market/getKline")
        logger.success("浏览器开启抓包模式。。。")
        self.mn_tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT")
        self.pbar = tqdm(total=30, desc="等待时间中", ncols=80)

        while True:
            # Show how far we are into the current half hour.
            self.pbar.n = time.localtime().tm_min % 30
            self.pbar.refresh()
            # NOTE(review): the original gate here tested
            # `current_minute not in range(60)`, which is never true, so the
            # loop runs on every pass. An earlier version limited it to the
            # first minutes after each half hour; restore that if intended.

            new_price_datas = self.get_price()
            if new_price_datas:
                logger.success("获取最新交易价格成功!!!")
            else:
                logger.info("获取最新价格有问题!!!")
                continue

            candles = sorted(new_price_datas, key=lambda item: item["id"])
            if len(candles) < 3:
                # Three candles are required below; retry instead of crashing.
                logger.info("K线数量不足,无法判断信号")
                continue
            self.kline_1, self.kline_2, self.kline_3 = candles[-3:]

            # Only proceed when the newest candle belongs to this half hour.
            if self.get_now_time() != self.kline_3["id"]:
                continue
            time.sleep(15)

            # Derive the current position from the text shown on the page.
            if self.get_text(target_text="全部平仓ETHUSDT永续多全仓100X持仓量"):
                self.start = 1
            elif self.get_text(target_text="全部平仓ETHUSDT永续空全仓100X持仓量"):
                self.start = -1
            else:
                self.start = 0

            # Exit rule: two consecutive candles against the position.
            if self.start == 1:
                if self.is_bearish(self.kline_1) and self.is_bearish(self.kline_2):
                    self._announce("平多")
                    self._close_all_positions()
                    self.start = 0
            elif self.start == -1:
                if self.is_bullish(self.kline_1) and self.is_bullish(self.kline_2):
                    self._announce("平空")
                    self._close_all_positions()
                    self.start = 0

            self.direction, _ = self.check_signal(prev=self.kline_1, curr=self.kline_2)
            if self.direction:
                try:
                    self.to_do_page()
                except Exception as e:
                    # The notifier is a no-op, so log here or the failure is lost.
                    logger.exception(f"下单流程异常:{e}")
                    self.send_dingtalk_message(
                        message_content=f"{datetime.datetime.now()}{e}")

            self.pbar.reset()

            # NOTE(review): every label was an empty string in the reviewed
            # source; the labels below are inferred from the conditions.
            if self.start == 0:
                position_label = "无"
            elif self.start == 1:
                position_label = "多"
            else:
                position_label = "空"
            if not self.direction:
                signal_label = "无"
            elif self.direction == "long":
                signal_label = "多"
            else:
                signal_label = "空"
            self.send_dingtalk_message(
                message_content=(
                    f"{datetime.datetime.now()}"
                    f"目前有持仓:{position_label}"
                    f"当前信号:{signal_label}"
                )
            )
if __name__ == '__main__':
    # Script entry point: build the trader for browser environment 191303
    # and hand control to its trading loop.
    trader = WeexTransaction(tge_id=191303)
    trader.action()
# XPath template for locating an element by its text: //*[contains(text(), 'target text')]