rgfewfger

This commit is contained in:
27942
2025-12-12 11:05:44 +08:00
parent dc6f03fc6e
commit c38679eb32
3 changed files with 147 additions and 149 deletions

View File

@@ -157,6 +157,8 @@ class WeexTransaction:
logger.info("获取可用余额成功!!!")
else:
logger.error("获取可用余额失败!!!")
self.send_dingtalk_message("获取可用余额失败!!!", type=0)
return
self.page.ele('x://*[@id="size_0"]').input(float(num) / 100)
@@ -325,13 +327,13 @@ class WeexTransaction:
if type:
send_dingtalk_message(
message_content=f"🔔bitmart{self.get_now_time1()}" + message_content
message_content=f"🔔bitmart{self.get_now_time1()}" + message_content
)
else:
for i in range(15):
send_dingtalk_message(
message_content=f"🔔bitmart{self.get_now_time1()}" + message_content
message_content=f"❌bitmart{self.get_now_time1()}" + message_content
)
def action(self):
@@ -417,6 +419,8 @@ class WeexTransaction:
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平多")
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').scroll.to_see(center=True)
time.sleep(1)
@@ -425,6 +429,8 @@ class WeexTransaction:
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平空")
self.page.ele('x:(//span[normalize-space(text()) = "市价"])').scroll.to_see(center=True)
time.sleep(1)
@@ -447,7 +453,6 @@ class WeexTransaction:
self.pbar.reset() # 重置进度条
self.send_dingtalk_message(
message_content=
f"{datetime.datetime.now()}"
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)

View File

@@ -1,5 +1,4 @@
import time
import asyncio
import datetime
from tqdm import *
@@ -7,7 +6,6 @@ from loguru import *
from DrissionPage import *
from curl_cffi import requests
from telethon import TelegramClient
from 交易.tools import send_dingtalk_message
@@ -42,33 +40,8 @@ class WeexTransaction:
self.session = requests.Session() # 接口请求对象
async def to_do_tg(self, message_content):
# ========== 配置区 ==========
API_ID = 2040 # 替换成你的 API ID
API_HASH = "b18441a1ff607e10a989891a5462e627" # 替换成你的 API HASH
SESSION_FILE = "../telegram/8619211027341" # 登录会话保存文件
# ============================
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
try:
client = TelegramClient(SESSION_FILE, API_ID, API_HASH, proxy=PROXY)
await client.start() # 登录,如果第一次会要求输入手机号和验证码
bot = await client.get_entity("ergggreef")
await client.send_message(bot, message_content)
return True
except:
return False
def send_dingtalk_message(self, message_content):
result = asyncio.run(self.to_do_tg(message_content))
return result
self.headers = {}
self.cookies = {}
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
@@ -167,7 +140,7 @@ class WeexTransaction:
def to_do_page(self):
# self.page.get("https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT")
self.mn_tab.ele('x://*[normalize-space(text())= "市价"]', timeout=15).click()
self.page.ele('x://*[normalize-space(text())= "市价"]', timeout=15).click()
time.sleep(1)
num = self.get_num()
@@ -175,46 +148,44 @@ class WeexTransaction:
logger.info("获取可用余额成功!!!")
else:
logger.error("获取可用余额失败!!!")
self.send_dingtalk_message("获取可用余额失败!!!", type=0)
return
self.mn_tab.ele('x://*[@id="amountInput"]').input(float(num) / 100)
self.page.ele('x://*[@id="amountInput"]').input(float(num) / 100)
time.sleep(1)
if self.direction == "long" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
send_dingtalk_message(
message_content=f"websea{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.send_dingtalk_message(f"信号:{self.direction},开多,开仓金额:{float(num) / 100}")
self.mn_tab.ele('x://*[normalize-space(text())= "买入/做多"]').click()
self.page.ele('x://*[normalize-space(text())= "买入/做多"]').click()
self.start = 1
elif self.direction == "short" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
send_dingtalk_message(
message_content=f"websea{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.send_dingtalk_message(f"信号:{self.direction},开空,开仓金额:{float(num) / 100}")
self.mn_tab.ele('x://*[normalize-space(text())= "卖出/做空"]').click()
self.page.ele('x://*[normalize-space(text())= "卖出/做空"]').click()
self.start = -1
elif self.direction == "long" and self.start == -1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
send_dingtalk_message(
message_content=f"websea{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.send_dingtalk_message(f"信号:{self.direction},反手平空做多,开仓金额:{float(num) / 100}")
self.mn_tab.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.mn_tab.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.page.ele('x://*[normalize-space(text())= "市价全平"]').click()
time.sleep(3)
self.mn_tab.ele('x://*[normalize-space(text())= "买入/做多"]').click()
self.page.ele('x://*[normalize-space(text())= "买入/做多"]').click()
self.start = 1
elif self.direction == "short" and self.start == 1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
send_dingtalk_message(
message_content=f"websea{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.send_dingtalk_message(f"信号:{self.direction},反手平多做空,开仓金额:{float(num) / 100}")
self.mn_tab.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.mn_tab.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.page.ele('x://*[normalize-space(text())= "市价全平"]').click()
time.sleep(3)
self.mn_tab.ele('x://*[normalize-space(text())= "卖出/做空"]').click()
self.page.ele('x://*[normalize-space(text())= "卖出/做空"]').click()
self.start = -1
def get_now_time(self):
@@ -269,8 +240,7 @@ class WeexTransaction:
tab.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT")
res = tab.listen.wait()
self.headers = {}
self.cookies = {}
for i in res.request.cookies:
if not i["name"].startswith(':'):
self.cookies[i["name"]] = i["value"]
@@ -312,6 +282,19 @@ class WeexTransaction:
return False
def get_now_time1(self):
timestamp = time.time()
local_time = time.localtime(timestamp)
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
return formatted_time
def send_dingtalk_message(self, message_content, type=1):
if type:
send_dingtalk_message(f"🔔websea{self.get_now_time1()}" + message_content)
else:
for i in range(15):
send_dingtalk_message(f"❌websea{self.get_now_time1()}" + message_content)
def action(self):
# 获取比特端口
if self.openBrowser():
@@ -332,7 +315,7 @@ class WeexTransaction:
else:
logger.info('关闭多余标签页失败!!!')
self.mn_tab = self.page.new_tab(url="https://www.websea.com/zh-CN/futures/ETH-USDT") # 打开网页
self.page.get(url="https://www.websea.com/zh-CN/futures/ETH-USDT") # 打开网页
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc进度条说明ncols长度
@@ -349,15 +332,15 @@ class WeexTransaction:
self.pbar.refresh()
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, ]: # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
time.sleep(10)
continue
new_price_datas = self.get_price()
if new_price_datas:
logger.success("获取最新交易价格成功!!!")
else:
if not new_price_datas:
logger.info("获取最新价格有问题!!!")
self.send_dingtalk_message(message_content=f"获取价格有问题!!!", type=0)
continue
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
@@ -367,33 +350,45 @@ class WeexTransaction:
if self.get_now_time() != self.kline_3["id"]:
continue
self.get_token()
if self.get_token(): # 获取token
logger.info("获取token成功!!!")
else:
logger.info("获取token失败")
self.send_dingtalk_message(message_content=f"获取token失败", type=0)
continue
if self.get_position_status():
logger.info("获取仓位信息成功!!!")
else:
logger.info("获取仓位信息失败!!!")
self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0)
continue
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
send_dingtalk_message(
message_content=f"websea{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
try:
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平多")
self.mn_tab.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.mn_tab.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
send_dingtalk_message(
message_content=f"websea{datetime.datetime.now()}第一根信号:{self.kline_1}{self.kline_2},平空")
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平空")
self.mn_tab.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.mn_tab.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.start = 0
self.page.ele('x://*[normalize-space(text())= "市价全平"]').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x://*[normalize-space(text())= "市价全平"]').click()
self.start = 0
except:
self.send_dingtalk_message(message_content=f"止损平仓出错!!!", type=0)
continue
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
@@ -401,13 +396,12 @@ class WeexTransaction:
try:
self.to_do_page()
except Exception as e:
send_dingtalk_message(
message_content=f"websea{datetime.datetime.now()}{e}")
self.send_dingtalk_message(message_content=f"购买操作失败,{e}", type=0)
continue
self.pbar.reset() # 重置进度条
send_dingtalk_message(
self.send_dingtalk_message(
message_content=
f"websea{datetime.datetime.now()}"
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)

View File

@@ -47,33 +47,24 @@ class WeexTransaction:
self.session = requests.Session() # 接口请求对象
async def to_do_tg(self, message_content):
# ========== 配置区 ==========
API_ID = 2040 # 替换成你的 API ID
API_HASH = "b18441a1ff607e10a989891a5462e627" # 替换成你的 API HASH
SESSION_FILE = "../telegram/8619211027341" # 登录会话保存文件
# ============================
def get_now_time1(self):
timestamp = time.time()
local_time = time.localtime(timestamp)
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
return formatted_time
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
def send_dingtalk_message(self, message_content, type=1):
try:
client = TelegramClient(SESSION_FILE, API_ID, API_HASH, proxy=PROXY)
await client.start() # 登录,如果第一次会要求输入手机号和验证码
bot = await client.get_entity("ergggreef")
await client.send_message(bot, message_content)
return True
except:
return False
if type:
send_dingtalk_message(
message_content=f"🔔weex{self.get_now_time1()}" + message_content
)
def send_dingtalk_message(self, message_content):
result = asyncio.run(self.to_do_tg(message_content))
return result
else:
for i in range(15):
send_dingtalk_message(
message_content=f"❌weex{self.get_now_time1()}" + message_content
)
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
@@ -165,7 +156,7 @@ class WeexTransaction:
def to_do_page(self):
# self.page.get("https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT")
self.mn_tab.ele('x:(//button[normalize-space(text()) = "市价"])').click()
self.page.ele('x:(//button[normalize-space(text()) = "市价"])').click()
time.sleep(1)
num = self.get_num()
@@ -173,46 +164,43 @@ class WeexTransaction:
logger.info("获取可用余额成功!!!")
else:
logger.error("获取可用余额失败!!!")
self.send_dingtalk_message("获取可用余额失败!!!", type=0)
return
self.mn_tab.ele('x://input[@placeholder="请输入数量"]').input(float(num) / 100)
self.page.ele('x://input[@placeholder="请输入数量"]').input(float(num) / 100)
time.sleep(1)
if self.direction == "long" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
send_dingtalk_message(
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开多")
self.send_dingtalk_message(f"信号:{self.direction},开多,开仓金额:{float(num) / 100}")
self.mn_tab.ele('x://*[contains(text(), "买入开多")]').click()
self.page.ele('x://*[contains(text(), "买入开多")]').click()
self.start = 1
elif self.direction == "short" and not self.start:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
send_dingtalk_message(
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},开空")
self.send_dingtalk_message(f"信号:{self.direction},开空,开仓金额:{float(num) / 100}")
self.mn_tab.ele('x://*[contains(text(), "卖出开空")]').click()
self.page.ele('x://*[contains(text(), "卖出开空")]').click()
self.start = -1
elif self.direction == "long" and self.start == -1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
send_dingtalk_message(
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平空做多")
self.send_dingtalk_message(f"信号:{self.direction},反手平空做多,开仓金额:{float(num) / 100}")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
time.sleep(1)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
time.sleep(3)
self.mn_tab.ele('x://*[contains(text(), "买入开多")]').click()
self.page.ele('x://*[contains(text(), "买入开多")]').click()
self.start = 1
elif self.direction == "short" and self.start == 1:
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
send_dingtalk_message(
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},反手平多做空")
self.send_dingtalk_message(f"信号:{self.direction},反手平多做空,开仓金额:{float(num) / 100}")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
time.sleep(1)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
time.sleep(3)
self.mn_tab.ele('x://*[contains(text(), "卖出开空")]').click()
self.page.ele('x://*[contains(text(), "卖出开空")]').click()
self.start = -1
def get_now_time(self):
@@ -330,7 +318,7 @@ class WeexTransaction:
else:
logger.info('关闭多余标签页失败!!!')
self.mn_tab = self.page.new_tab(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") # 打开网页
self.page.get(url="https://www.weeaxs.site/zh-CN/futures/ETH-USDT") # 打开网页
self.pbar = tqdm(total=30, desc="等待时间中", ncols=80) # desc进度条说明ncols长度
@@ -351,12 +339,18 @@ class WeexTransaction:
time.sleep(10)
continue
self.get_token()
new_price_datas = self.get_price()
if new_price_datas:
logger.success("获取最新交易价格成功!!!")
if self.get_token(): # 获取token
logger.info("获取token成功!!!")
else:
logger.info("获取token失败")
self.send_dingtalk_message(message_content=f"获取token失败", type=0)
continue
new_price_datas = self.get_price()
if not new_price_datas:
logger.info("获取最新价格有问题!!!")
self.send_dingtalk_message(message_content=f"获取价格有问题!!!", type=0)
continue
new_price_datas1 = sorted(new_price_datas, key=lambda x: x["id"])
@@ -370,28 +364,34 @@ class WeexTransaction:
logger.info("获取仓位信息成功!!!")
else:
logger.info("获取仓位信息失败!!!")
self.send_dingtalk_message(message_content=f"获取仓位信息失败!!!", type=0)
continue
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
send_dingtalk_message(
message_content=f"weex{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
try:
if self.start == 1:
if is_bearish(self.kline_1) and is_bearish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平多")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平多")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
time.sleep(1)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
send_dingtalk_message(
message_content=f"weex{datetime.datetime.now()}第一根信号:{self.kline_1}{self.kline_2},平空")
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
elif self.start == -1:
if is_bullish(self.kline_1) and is_bullish(self.kline_2):
logger.success(f"{datetime.datetime.now()},第一根信号:{self.kline_1}{self.kline_2},平空")
self.send_dingtalk_message(
message_content=f"第一根信号:{self.kline_1}{self.kline_2},平空")
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
time.sleep(1)
self.mn_tab.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
self.page.ele('x://*[contains(text(), "闪电平仓")]').scroll.to_see(center=True)
time.sleep(1)
self.page.ele('x://*[contains(text(), "闪电平仓")]').click()
self.start = 0
except:
self.send_dingtalk_message(message_content=f"止损平仓出错!!!", type=0)
continue
self.direction, signal_key = self.check_signal(prev=self.kline_1, curr=self.kline_2) # 判断信号
@@ -399,13 +399,12 @@ class WeexTransaction:
try:
self.to_do_page()
except Exception as e:
send_dingtalk_message(
message_content=f"weex{datetime.datetime.now()}{e}")
self.send_dingtalk_message(message_content=f"购买操作失败,{e}", type=0)
continue
self.pbar.reset() # 重置进度条
send_dingtalk_message(
self.send_dingtalk_message(
message_content=
f"weex{datetime.datetime.now()}"
f"目前有持仓:{"" if self.start == 0 else ("" if self.start == 1 else "")}"
f"当前信号:{"" if not self.direction else ("" if self.direction == "long" else "")}"
)