This commit is contained in:
Administrator
2025-12-11 17:04:40 +08:00
parent 577e0f62e4
commit d20a0a4220
8 changed files with 51 additions and 32 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -433,7 +433,7 @@ class WeexTransaction:
self.to_do_page()
except Exception as e:
self.send_dingtalk_message(
message_content=f"weex:{self.get_now_time()},购买操作失败,{e}")
message_content=f"bitmart:{self.get_now_time()},购买操作失败,{e}")
self.pbar.reset() # 重置进度条
self.send_dingtalk_message(

View File

@@ -1,14 +1,16 @@
import re
import time
import asyncio
import datetime
from tqdm import *
from loguru import *
from DrissionPage import *
from bs4 import BeautifulSoup
from curl_cffi import requests
from telethon import TelegramClient
def is_bullish(c): # 阳线
@@ -41,26 +43,34 @@ class WeexTransaction:
self.session = requests.Session() # 接口请求对象
def send_dingtalk_message(self, message_content):
result = asyncio.run(self.to_do_tg(message_content))
return result
async def to_do_tg(self, message_content):
# ========== 配置区 ==========
API_ID = 2040 # 替换成你的 API ID
API_HASH = "b18441a1ff607e10a989891a5462e627" # 替换成你的 API HASH
SESSION_FILE = "../telegram/8619211027341" # 登录会话保存文件
# ============================
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
try:
response = requests.post(
f"{self.tge_url}/api/browser/start",
json={"envId": self.tge_id},
headers=self.tge_headers
)
self.tge_port = response.json()["data"]["port"]
client = TelegramClient(SESSION_FILE, API_ID, API_HASH, proxy=PROXY)
await client.start() # 登录,如果第一次会要求输入手机号和验证码
bot = await client.get_entity("ergggreef")
await client.send_message(bot, message_content)
return True
except:
return False
def send_dingtalk_message(self, message_content):
result = asyncio.run(self.to_do_tg(message_content))
return result
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
try:

View File

@@ -1,4 +1,3 @@
import re
import time
import asyncio
import datetime
@@ -9,6 +8,7 @@ from DrissionPage import *
from bs4 import BeautifulSoup
from curl_cffi import requests
from telethon import TelegramClient
def is_bullish(c): # 阳线
@@ -46,26 +46,34 @@ class WeexTransaction:
self.session = requests.Session() # 接口请求对象
def send_dingtalk_message(self, message_content):
result = asyncio.run(self.to_do_tg(message_content))
return result
async def to_do_tg(self, message_content):
# ========== 配置区 ==========
API_ID = 2040 # 替换成你的 API ID
API_HASH = "b18441a1ff607e10a989891a5462e627" # 替换成你的 API HASH
SESSION_FILE = "../telegram/8619211027341" # 登录会话保存文件
# ============================
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
try:
response = requests.post(
f"{self.tge_url}/api/browser/start",
json={"envId": self.tge_id},
headers=self.tge_headers
)
self.tge_port = response.json()["data"]["port"]
client = TelegramClient(SESSION_FILE, API_ID, API_HASH, proxy=PROXY)
await client.start() # 登录,如果第一次会要求输入手机号和验证码
bot = await client.get_entity("ergggreef")
await client.send_message(bot, message_content)
return True
except:
return False
def send_dingtalk_message(self, message_content):
result = asyncio.run(self.to_do_tg(message_content))
return result
def openBrowser(self, ): # 直接指定ID打开窗口也可以使用 createBrowser 方法返回的ID
try:
@@ -134,7 +142,8 @@ class WeexTransaction:
for i in range(3):
logger.info(f"获取最新数据:{i + 1}次。。。")
try:
response = self.session.get('https://contract-v2.bitmart.com/v1/ifcontract/quote/kline', params=params, timeout=5)
response = self.session.get('https://contract-v2.bitmart.com/v1/ifcontract/quote/kline', params=params,
timeout=5)
for i in response.json()["data"]:
insert_data = {
@@ -343,8 +352,8 @@ class WeexTransaction:
self.pbar.n = current_minute - 30
self.pbar.refresh()
if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, ]: # 判断是否是 新的30分钟了
# if current_minute not in range(60): # 判断是否是 新的30分钟了
# if current_minute not in [0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 34, ]: # 判断是否是 新的30分钟了
if current_minute not in range(60): # 判断是否是 新的30分钟了
time.sleep(10)
continue