This commit is contained in:
Administrator
2025-12-02 17:50:22 +08:00
parent 2c9ea4db65
commit 32789b992d

View File

@@ -3,17 +3,14 @@ import json
import hmac
import time
import base64
import random
import hashlib
import datetime
import requests
from tqdm import *
from loguru import *
from DrissionPage import *
from bs4 import BeautifulSoup
from curl_cffi import requests
def is_bullish(c): # 阳线
@@ -48,7 +45,6 @@ class WeexTransaction:
self.direction = None # 信号类型
self.pbar = None # 进度条对象
self.session = requests.Session() # 接口请求对象
def get_signature(self, timestamp):
# 将时间戳和密钥拼接
@@ -60,33 +56,53 @@ class WeexTransaction:
sign = base64.b64encode(hmac_code).decode('utf-8')
return sign
# def send_dingtalk_message(self, message_content):
# # 获取当前时间戳(毫秒)
# timestamp = str(round(time.time() * 1000))
# # 生成签名
# sign = self.get_signature(timestamp, )
# # 拼接带有签名信息的完整 Webhook URL
# full_url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
#
# # 定义消息内容
# message = {
# "msgtype": "text",
# "text": {
# "content": message_content
# }
# }
#
# # 设置请求头
# headers = {
# "Content-Type": "application/json"
# }
#
# try:
# # 发送 POST 请求
# response = requests.post(full_url, headers=headers, data=json.dumps(message))
#
# except requests.RequestException as e:
# print(f"请求发生错误: {e}")
def send_dingtalk_message(self, message_content):
# 获取当前时间戳(毫秒)
timestamp = str(round(time.time() * 1000))
# 生成签名
sign = self.get_signature(timestamp, )
# 拼接带有签名信息的完整 Webhook URL
full_url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
# 定义消息内容
message = {
"msgtype": "text",
"text": {
"content": message_content
}
}
pass
# 设置请求头
headers = {
"Content-Type": "application/json"
}
try:
# 发送 POST 请求
response = requests.post(full_url, headers=headers, data=json.dumps(message))
except requests.RequestException as e:
print(f"请求发生错误: {e}")
# url = "http://8.137.99.82:9005/api/send_click?token=fegergauiernguie&phone=8613661496481"
#
# res = requests.post(
# url=url,
# json={
# "phone": "8613661496481",
# "bot_name": "ergggreef",
# "datas": [
# {"send_message": [message_content], "click_button": [""], },
# ]
#
# }
# )
#
# print(res.json())
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
@@ -143,28 +159,16 @@ class WeexTransaction:
return None, None
def get_price(self):
params = {
'contractId': '10000002',
'productCode': 'cmt_ethusdt',
'priceType': 'LAST_PRICE',
'klineType': 'MINUTE_30',
'limit': '300',
'timeZone': 'string',
'languageType': '1',
'sign': 'SIGN',
}
for i in range(3):
try:
logger.info(f"获取最新数据:{i + 1}次。。。")
res = self.session.get(
url='https://http-gateway2.ngsvsfx.cn/api/v1/public/quote/v1/getKlineV2',
params=params,
)
self.mn_tab.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT")
res = self.mn_tab.listen.wait(timeout=15) # 等待并获取一个数据包
datas = []
if res:
for data in res.json()['data']["dataList"]:
for data in res.response.body['data']["dataList"]:
insert_data = {
'id': int(data[4]),
'open': float(data[3]),
@@ -181,30 +185,53 @@ class WeexTransaction:
return False
def remove_tags_and_spaces(self, html):
# 创建 BeautifulSoup 对象
soup = BeautifulSoup(html, 'html.parser')
# 获取去除标签后的文本
text = soup.get_text()
# 去除所有空格
text = text.replace(" ", "")
return text
def to_do_page(self):
self.mn_tab = self.page.new_tab("https://www.weeaxs.site/zh-CN/futures/ETH-USDT")
# self.page.get("https://www.weeaxs.site/zh-CN/futures/ETH-USDT")
self.mn_tab.ele('x://*[contains(text(), "市价")]', timeout=15).click()
time.sleep(1)
self.mn_tab.ele('x://input[@placeholder="请输入数量"]').input(10)
html_text = self.remove_tags_and_spaces(html=self.mn_tab.html)
# 使用 re.search 方法查找匹配项
match = re.search(r'委托可用\s*([0-9]+(?:\.[0-9]+)?)SUSDT', html_text)
match = re.search(r"可用\s*([0-9]+(?:\.[0-9]+)?)", html_text)
number = ""
if match:
# 提取匹配到的数值字符串
number_str = match.group(1).replace(',', '')
# 将提取的字符串转换为浮点数
number = float(number_str)
if not number:
return
self.mn_tab.ele('x://input[@placeholder="请输入数量"]').input(number / 100)
time.sleep(1)
if self.direction == "long" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.mn_tab.ele('x://*[contains(text(), "买入开多")]').click()
self.start = 1
elif self.direction == "short" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.mn_tab.ele('x://*[contains(text(), "卖出开空")]').click()
self.start = -1
elif self.direction == "long" and self.start == -1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
time.sleep(3)
@@ -213,15 +240,13 @@ class WeexTransaction:
elif self.direction == "short" and self.start == 1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
time.sleep(3)
self.mn_tab.ele('x://*[contains(text(), "卖出开空")]').click()
self.start = -1
self.mn_tab.close()
def get_text(self, target_text):
# 去除目标文本中的空白字符
cleaned_target_text = re.sub(r'\s', '', target_text)
@@ -271,94 +296,6 @@ class WeexTransaction:
return False
def get_user_data(self):
self.mn_tab = self.page.new_tab()
self.mn_tab.listen.start("https://gateway2.ngsvsfx.cn/v1/user/overview/userinfo")
self.mn_tab.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") # 打开网页
res = self.mn_tab.listen.wait(timeout=15) # 等待并获取一个数据包
if res:
self.weex_headers = dict(res.request.headers)
self.session.headers.update(self.weex_headers)
self.mn_tab.close()
return True
self.mn_tab.close()
return False
def get_start(self):
json_data = {
'filterCoinIdList': [
2,
],
'filterContractIdList': [],
'filterOrderStatusList': [
'CANCELED',
'FILLED',
],
'filterOrderTypeList': [],
'languageType': 1,
'limit': 20,
'sign': 'SIGN',
'timeZone': 'string',
}
try:
response = self.session.post(
'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderPage',
json=json_data,
)
data = []
for i in response.json()["data"]["dataList"]:
print(i)
data.append({
"id": int(i["createdTime"]),
"start": i["legacyOrderDirectionDesc"]
})
new_data = sorted(data, key=lambda x: x["id"])
if new_data[-1]["data1"] == "开多仓":
self.start = 1
elif new_data[-1]["data1"] == "开空仓":
self.start = -1
return True
except:
pass
return False
def close_position(self):
try:
self.mn_tab = self.page.new_tab("https://www.weeaxs.site/zh-CN/futures/ETH-USDT")
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
return True
except:
pass
return False
def action(self):
# 获取比特端口
if self.openBrowser():
@@ -379,6 +316,12 @@ class WeexTransaction:
else:
logger.info('关闭多余标签页失败!!!')
self.mn_tab = self.page.new_tab()
self.mn_tab.listen.start("public/quote/v1/getKlineV2")
logger.success("浏览器开启抓包模式。。。")
self.mn_tab.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") # 打开网页
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc进度条说明ncols长度
while True:
@@ -393,21 +336,17 @@ class WeexTransaction:
self.pbar.n = current_minute - 30
self.pbar.refresh()
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34]: # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, 58]: # 判断是否是 新的30分钟了
time.sleep(10)
continue
new_price_datas = None
for i in range(3):
new_price_datas = self.get_price()
if new_price_datas:
logger.success("获取最新交易价格成功!!!")
else:
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()},获取最新价格有问题,重新获取用户信息!!!")
self.get_user_data()
new_price_datas = self.get_price()
if new_price_datas:
logger.success("获取最新交易价格成功!!!")
else:
logger.info("获取最新价格有问题!!!")
continue
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
self.kline_1, self.kline_2, self.kline_3 = new_price_datas1[-3:]
@@ -416,16 +355,32 @@ class WeexTransaction:
if self.get_now_time() != self.kline_3["id"]:
continue
if self.close_position():
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()},平仓信号,平仓成功!!!")
time.sleep(15)
continue
if self.get_text(target_text="ETH/USDT多"):
self.start = 1
elif self.get_text(target_text="ETH/USDT空"):
self.start = -1
else:
self.start = 0
for i in range(3):
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()}平仓信息,平仓失败======================================")
message_content=f"{datetime.datetime.now()}第一根信号:{self.kline_1}{self.kline_2},平多")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
@@ -433,16 +388,13 @@ class WeexTransaction:
try:
self.to_do_page()
except Exception as e:
while True:
self.send_dingtalk_message(
message_content=f"真实盘:{datetime.datetime.now()}{e}")
time.sleep(random.randint(5, 25))
self.send_dingtalk_message(
message_content=f"{datetime.datetime.now()}{e}")
self.pbar.reset() # 重置进度条
self.send_dingtalk_message(
message_content=
f"真实盘:{datetime.datetime.now()}"
f"{datetime.datetime.now()}"
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)