Merge remote-tracking branch 'origin/master'

# Conflicts:
#	test.py
This commit is contained in:
Administrator
2025-11-19 13:58:55 +08:00
13 changed files with 1062 additions and 254 deletions

Binary file not shown.

233
telegram/bot-n2.0.py Normal file
View File

@@ -0,0 +1,233 @@
from telethon import TelegramClient, events
import sqlite3
from datetime import date
import os
import asyncio
# ========== 配置区 ==========
API_ID = 2040
API_HASH = "b18441a1ff607e10a989891a5462e627"
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
DB_PATH = "sign.db"
SIGN_POINTS = 10
ALLOWED_GROUPS = [-1003238845008] # 只允许的群聊
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
# ============================
# ---------- 数据库操作 ----------
def init_db():
"""初始化数据库,创建缺失表"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 用户积分与签到
cursor.execute('''
CREATE TABLE IF NOT EXISTS users
(
user_id
INTEGER
PRIMARY
KEY,
username
TEXT,
points
INTEGER
DEFAULT
0,
last_sign_date
TEXT
)
''')
# 发言统计
cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_messages
(
user_id
INTEGER,
date
TEXT,
count
INTEGER
DEFAULT
0,
PRIMARY
KEY
(
user_id,
date
)
)
''')
# 邀请统计
cursor.execute('''
CREATE TABLE IF NOT EXISTS invites
(
inviter_id
INTEGER,
invitee_id
INTEGER
PRIMARY
KEY
)
''')
conn.commit()
conn.close()
def add_or_update_user(user_id, username):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
cursor.execute("UPDATE users SET username=? WHERE user_id=?", (username, user_id))
conn.commit()
conn.close()
def can_sign(user_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
today = date.today().isoformat()
conn.close()
return False if row and row[0] == today else True
def add_points(user_id, points):
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE users
SET points = points + ?,
last_sign_date = ?
WHERE user_id = ?
""", (points, today, user_id))
conn.commit()
conn.close()
def get_points(user_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,))
points = cursor.fetchone()
conn.close()
return points[0] if points else 0
# ---------- 发言统计 ----------
def add_message(user_id):
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO daily_messages (user_id, date, count)
VALUES (?, ?, 1) ON CONFLICT(user_id, date)
DO
UPDATE SET count = count + 1
""", (user_id, today))
conn.commit()
conn.close()
def get_daily_message_ranking():
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT u.username, d.count
FROM daily_messages d
JOIN users u ON d.user_id = u.user_id
WHERE d.date = ?
ORDER BY d.count DESC
""", (today,))
results = cursor.fetchall()
conn.close()
return results
# ---------- 邀请统计 ----------
def add_invite(inviter_id, invitee_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO invites (inviter_id, invitee_id) VALUES (?, ?)", (inviter_id, invitee_id))
conn.commit()
conn.close()
def get_invite_count(inviter_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id=?", (inviter_id,))
count = cursor.fetchone()[0]
conn.close()
return count
# ---------- Bot 主逻辑 ----------
async def main():
# 初始化数据库,保证缺失表也会创建
init_db()
bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
await bot.start(bot_token=BOT_TOKEN)
@bot.on(events.NewMessage)
async def message_handler(event):
# 限制群聊
if event.chat_id not in ALLOWED_GROUPS:
return
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
add_or_update_user(user_id, username)
# 只统计普通消息,不统计命令
if not event.raw_text.startswith('/'):
add_message(user_id)
# 处理签到命令
if event.raw_text in ['签到', '/sign']:
if not can_sign(user_id):
await event.reply(f"🌞 @{username},你今天已经签到过啦!")
return
add_points(user_id, SIGN_POINTS)
total = get_points(user_id)
await event.reply(f"✅ @{username} 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}")
# 查询每日发言排行
elif event.raw_text == '/daily_rank':
ranking = get_daily_message_ranking()
if not ranking:
await event.reply("📊 今日还没有人发言哦~")
return
msg = "📊 今日发言排行:\n"
for i, (uname, count) in enumerate(ranking, 1):
msg += f"{i}. {uname}: {count}\n"
await event.reply(msg)
# 查询自己邀请人数
elif event.raw_text == '/my_invites':
count = get_invite_count(user_id)
await event.reply(f"👥 @{username},你邀请了 {count} 人。")
print("🤖 签到机器人已启动,等待群聊消息...")
await bot.run_until_disconnected()
if __name__ == "__main__":
asyncio.run(main())

384
telegram/bot-v3.0.py Normal file
View File

@@ -0,0 +1,384 @@
import datetime
import random
import requests
from telethon import TelegramClient, events, Button
import sqlite3
from datetime import date
import asyncio
# ========== 配置区 ==========
API_ID = 2040
API_HASH = "b18441a1ff607e10a989891a5462e627"
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
DB_PATH = "sign.db"
SIGN_POINTS = 10
ALLOWED_GROUPS = [-1003238845008]
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
INVITE_LINK = "https://www.websea.my/en/signup?key=77346588"
# ============================
# 命令字典
COMMANDS = {
'/sign': '签到,每天签到一次,获取积分',
'签到': '签到,每天签到一次,获取积分',
'/daily_rank': '查看今日发言排行',
'/my_invites': '查看你邀请的人数',
# '/btc': '获取 BTC/USDT 最新价格',
'/help': '显示所有可用命令及说明'
}
# 命令分组
data = {
"签到": ['/sign', '签到',"/tanda"],
"发言": ["/daily_rank","发言","/kedudukan_harian"],
"邀请": ['/my_invites',"邀请","/daily_position"],
"币价": ['/btc', 'btc']
}
# 支持币种
crypto_currencies = {
"BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"},
"ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"},
"SOL": {"type": "SOL-USDT", "url": "https://www.websea.com/zh-CN/trade/SOL-USDT"},
"BNB": {"type": "BNB-USDT", "url": "https://www.websea.com/zh-CN/trade/BNB-USDT"},
"WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"}
}
# 主菜单按钮
main_buttons = [
[Button.inline("签到", b"/sign"), Button.inline("今日发言排行", b"/daily_rank")],
[Button.inline("我的邀请", b"/my_invites"), Button.inline("币价", b"/btc")],
[Button.inline("帮助", b"/help")]
]
def get_command_list_text():
msg = "🤖 机器人命令列表:\n"
for cmd, desc in COMMANDS.items():
msg += f"{cmd}{desc}\n"
return msg
# ---------- 数据库 ----------
def init_db():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users
(
user_id
INTEGER
PRIMARY
KEY,
username
TEXT,
points
INTEGER
DEFAULT
0,
last_sign_date
TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_messages
(
user_id
INTEGER,
date
TEXT,
count
INTEGER
DEFAULT
0,
PRIMARY
KEY
(
user_id,
date
)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS invites
(
inviter_id
INTEGER,
invitee_id
INTEGER,
PRIMARY
KEY
(
inviter_id,
invitee_id
)
)
''')
conn.commit()
conn.close()
def add_or_update_user(user_id, username):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
cursor.execute("UPDATE users SET username=? WHERE user_id=?", (username, user_id))
conn.commit()
conn.close()
def can_sign(user_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
today = date.today().isoformat()
conn.close()
return False if row and row[0] == today else True
def add_points(user_id, points):
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET points = points + ?, last_sign_date = ? WHERE user_id = ?",
(points, today, user_id)
)
conn.commit()
conn.close()
def get_points(user_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
conn.close()
return row[0] if row else 0
# ---------- 发言统计 ----------
def add_message(user_id):
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO daily_messages (user_id, date, count)
VALUES (?, ?, 1) ON CONFLICT(user_id, date) DO
UPDATE SET count = count + 1
''', (user_id, today))
conn.commit()
conn.close()
def get_daily_message_ranking():
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT u.username, d.count
FROM daily_messages d
JOIN users u ON d.user_id = u.user_id
WHERE d.date = ?
ORDER BY d.count DESC
''', (today,))
rows = cursor.fetchall()
conn.close()
return rows
# ---------- 币价 ----------
def get_price(symbol: str):
headers = {'user-agent': 'Mozilla/5.0'}
tz = datetime.timezone(datetime.timedelta(hours=8))
today = datetime.datetime.now(tz).date()
start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz)
end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz)
params = {
'symbol': symbol,
'period': '30min',
'start': int(start_of_day.timestamp()),
'end': int(end_of_day.timestamp()),
}
try:
response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers,
timeout=10)
data = response.json()["result"]["data"]
if not data:
return 0, 0, 0, 0
current_price = float(data[0]['close'])
today_high = max(float(i['high']) for i in data)
today_low = min(float(i['low']) for i in data)
price_24h_ago = float(data[-1]['open'])
change_24h = (current_price - price_24h_ago) / price_24h_ago * 100
return current_price, change_24h, today_high, today_low
except Exception as e:
print("获取币价失败:", e)
return 0, 0, 0, 0
def get_price_msg(symbol: str):
current_price, change_24h, today_high, today_low = get_price(symbol)
name = symbol.split('-')[0]
url = crypto_currencies.get(name, {}).get("url", "#")
msg = (
f"📊 {name}/USDT 实时行情\n\n"
f"💰 当前价格:{current_price:.4f} USDT\n"
f"📈 24小时涨跌幅{change_24h:.2f}%\n"
f"⬆️ 最高价:{today_high:.4f} USDT\n"
f"⬇️ 最低价:{today_low:.4f} USDT\n\n"
f"🔗 交易链接:[点击前往]({url})\n"
f"🎁 注册邀请:[立即加入]({INVITE_LINK})"
)
return msg
def get_crypto_buttons():
"""生成币种选择按钮"""
buttons = []
row = []
for name in crypto_currencies.keys():
row.append(Button.inline(name, f"price_{name}".encode()))
if len(row) == 3:
buttons.append(row)
row = []
if row:
buttons.append(row)
buttons.append([Button.inline("返回菜单", b"/help")])
return buttons
# ---------- 命令处理 ----------
async def handle_command(event, cmd):
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
add_or_update_user(user_id, username)
reply = ""
if cmd in data["签到"]:
if not can_sign(user_id):
reply = f"🌞 @{username},你今天已经签到过啦!"
else:
add_points(user_id, SIGN_POINTS)
total = get_points(user_id)
reply = f"✅ @{username} 签到成功!获得 {SIGN_POINTS} 积分\n当前总积分:{total}"
elif cmd in data["发言"]:
ranking = get_daily_message_ranking()
if not ranking:
reply = "📊 今日还没有人发言哦~"
else:
reply = "📊 今日发言排行:\n" + "\n".join(f"{i + 1}. {u}: {c}" for i, (u, c) in enumerate(ranking))
elif cmd in data["邀请"]:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id=?", (user_id,))
count = cursor.fetchone()[0]
conn.close()
reply = f"👥 @{username},你邀请了 {count} 人。"
elif cmd in data["币价"]:
await event.respond("请选择要查看的币种:", buttons=get_crypto_buttons())
return
else:
await event.reply(get_command_list_text(), buttons=main_buttons)
return
await event.reply(reply)
# ---------- 价格按钮处理 ----------
async def handle_price_command(event, symbol_name: str):
symbol = crypto_currencies[symbol_name]["type"]
msg = get_price_msg(symbol)
await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False)
# ---------- 主逻辑 ----------
async def main():
init_db()
bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
await bot.start(bot_token=BOT_TOKEN)
# 定时发送 SOL 币价
for group_id in ALLOWED_GROUPS:
asyncio.create_task(send_price_periodically(bot, group_id))
# 欢迎新成员
@bot.on(events.ChatAction)
async def welcome(event):
if event.user_added or event.user_joined:
for user in event.users:
name = user.username or user.first_name or "新成员"
await event.reply(f"欢迎 @{name} 加入群聊!\n{get_command_list_text()}")
# 普通消息
@bot.on(events.NewMessage)
async def on_message(event):
if event.chat_id not in ALLOWED_GROUPS:
return
text = event.raw_text.strip().lower()
# ✅ 去掉命令中的 @botname如 /sign@dockersebot → /sign
if "@" in text:
text = text.split("@")[0]
for group in data:
if text in [cmd.lower() for cmd in data[group]]:
await handle_command(event, text)
return
add_message((await event.get_sender()).id)
# # help
# @bot.on(events.NewMessage(pattern='/help'))
# async def send_help(event):
# 按钮点击
@bot.on(events.CallbackQuery)
async def callback(event):
await event.answer("正在处理...", alert=False)
cmd = event.data.decode()
# ✅ 同样去掉命令中的 @botname
if "@" in cmd:
cmd = cmd.split("@")[0]
if cmd.startswith("price_"):
symbol = cmd.split("_")[1]
await handle_price_command(event, symbol)
else:
await handle_command(event, cmd)
print("🤖 机器人已启动,等待群聊消息...")
await bot.run_until_disconnected()
# ---------- 定时币价 ----------
async def send_price_periodically(bot, chat_id):
while True:
try:
random_key = random.choice(list(crypto_currencies.keys()))
msg = get_price_msg(crypto_currencies[random_key]["type"])
await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False)
except Exception as e:
print(f"发送价格失败: {e}")
await asyncio.sleep(3600) # 每小时更新
if __name__ == "__main__":
asyncio.run(main())

Binary file not shown.

Binary file not shown.

135
telegram/bot_test.py Normal file
View File

@@ -0,0 +1,135 @@
from telethon import TelegramClient, events
import sqlite3
from datetime import date
import os
# ========== 配置区 ==========
API_ID = 2040 # ← 替换为你的 Telegram API ID
API_HASH = "b18441a1ff607e10a989891a5462e627" # ← 替换为你的 API HASH
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" # ← 替换为你的 Bot Token
DB_PATH = "sign.db" # 数据库文件
SIGN_POINTS = 10 # 每次签到获得的积分
# ============================
# ---------- 数据库操作 ----------
def init_db():
"""初始化数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
points INTEGER DEFAULT 0,
last_sign_date TEXT
)
''')
conn.commit()
conn.close()
def get_user(user_id):
"""获取用户信息"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE user_id=?", (user_id,))
user = cursor.fetchone()
conn.close()
return user
def add_or_update_user(user_id, username):
"""如果用户不存在则新增"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
conn.commit()
conn.close()
def can_sign(user_id):
"""判断今天是否可以签到"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
today = date.today().isoformat()
conn.close()
if row and row[0] == today:
return False
return True
def add_points(user_id, points):
"""给用户加积分并更新签到日期"""
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE users
SET points = points + ?, last_sign_date = ?
WHERE user_id = ?
""", (points, today, user_id))
conn.commit()
conn.close()
def get_points(user_id):
"""获取用户当前积分"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,))
points = cursor.fetchone()
conn.close()
return points[0] if points else 0
# ---------- 机器人主逻辑 ----------
def main():
# 初始化数据库
if not os.path.exists(DB_PATH):
init_db()
proxy = {
'proxy_type': "socks5", # 或 'socks4',具体看你的代理类型
'addr': "202.155.144.102", # 代理服务器地址
'port': 31102, # 代理服务器端口
'username': "SyNuejCtrQ", # 如果有用户名,填写
'password': "MH8ioL7EXf" # 如果有密码,填写
}
# proxy = ("socks5", "202.155.144.102", 31102, True, "SyNuejCtrQ", "MH8ioL7EXf")
bot = TelegramClient(
'haha',
api_id=API_ID,
api_hash=API_HASH,
proxy=proxy
).start(bot_token=BOT_TOKEN)
@bot.on(events.NewMessage(pattern='^(签到|/sign)$'))
async def sign_handler(event):
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
# 注册用户
add_or_update_user(user_id, username)
# 检查是否已签到
if not can_sign(user_id):
await event.reply(f"🌞 {username},你今天已经签到过啦!")
return
# 签到成功,加积分
add_points(user_id, SIGN_POINTS)
total = get_points(user_id)
await event.reply(f"✅ 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}")
print("🤖 签到机器人已启动,等待消息中...")
bot.run_until_disconnected()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,161 @@
from telethon import TelegramClient, events
from telethon.tl.functions.messages import ImportChatInviteRequest
import sqlite3
from datetime import date
import os
import asyncio
# ========== 配置区 ==========
API_ID = 2040
API_HASH = "b18441a1ff607e10a989891a5462e627"
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
DB_PATH = "sign.db"
SIGN_POINTS = 10
INVITE_LINK = "ergggreef" # t.me/后面的群链接部分
# 代理配置,如果需要
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
# ============================
# ---------- 数据库操作 ----------
def init_db():
"""初始化数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
points INTEGER DEFAULT 0,
last_sign_date TEXT
)
''')
conn.commit()
conn.close()
def get_user(user_id):
"""获取用户信息"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE user_id=?", (user_id,))
user = cursor.fetchone()
conn.close()
return user
def add_or_update_user(user_id, username):
"""如果用户不存在则新增"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
conn.commit()
conn.close()
def can_sign(user_id):
"""判断今天是否可以签到"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
today = date.today().isoformat()
conn.close()
if row and row[0] == today:
return False
return True
def add_points(user_id, points):
"""给用户加积分并更新签到日期"""
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE users
SET points = points + ?, last_sign_date = ?
WHERE user_id = ?
""", (points, today, user_id))
conn.commit()
conn.close()
def get_points(user_id):
"""获取用户当前积分"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,))
points = cursor.fetchone()
conn.close()
return points[0] if points else 0
# ---------- 机器人主逻辑 ----------
async def main():
# 初始化数据库
if not os.path.exists(DB_PATH):
init_db()
bot = TelegramClient(
'bot_session',
api_id=API_ID,
api_hash=API_HASH,
proxy=PROXY
)
await bot.start(bot_token=BOT_TOKEN) # 注意这里是 await
# # 尝试加入群聊并获取 chat_id
# try:
# result = await bot(ImportChatInviteRequest(INVITE_LINK))
# chat_id = result.chats[0].id
# print(f"✅ 已加入群聊: {result.chats[0].title}, chat_id={chat_id}")
# except Exception as e:
# # 如果已经加入,遍历对话获取 chat_id
# async for dialog in bot.iter_dialogs():
# if dialog.name and INVITE_LINK in dialog.name: # 简单匹配群名
# chat_id = dialog.id
# print(f"✅ 已存在群聊: {dialog.name}, chat_id={chat_id}")
# break
# else:
# print("❌ 无法加入或找到群聊,请检查邀请链接")
# return
ALLOWED_GROUPS = [-1003238845008]
@bot.on(events.NewMessage(pattern='^(签到|/sign)$'))
async def sign_handler(event):
# 检查是否在允许的群聊中
if event.chat_id not in ALLOWED_GROUPS:
return # 非指定群聊,忽略
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
# 注册用户
add_or_update_user(user_id, username)
# 检查是否已签到
if not can_sign(user_id):
await event.reply(f"🌞 @{username},你今天已经签到过啦!")
return
# 签到成功,加积分
add_points(user_id, SIGN_POINTS)
total = get_points(user_id)
await event.reply(f"✅ @{username} 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}")
print("🤖 签到机器人已启动,等待群聊消息...")
await bot.run_until_disconnected()
if __name__ == "__main__":
asyncio.run(main())

BIN
telegram/haha.session Normal file

Binary file not shown.

BIN
telegram/sign.db Normal file

Binary file not shown.

32
telegram/test.py Normal file
View File

@@ -0,0 +1,32 @@
from telethon import TelegramClient, sync
from telethon.sessions import StringSession
# ========== 配置区 ==========
API_ID = 2040 # 替换成你的 API ID
API_HASH = "b18441a1ff607e10a989891a5462e627" # 替换成你的 API HASH
SESSION_FILE = "2349073562091" # 登录会话保存文件
# ============================
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
client = TelegramClient(SESSION_FILE, API_ID, API_HASH,proxy=PROXY)
async def main():
await client.start() # 登录,如果第一次会要求输入手机号和验证码
print("✅ 登录成功!正在获取群聊列表...\n")
async for dialog in client.iter_dialogs():
if dialog.is_group or dialog.is_channel:
print(f"群聊名称: {dialog.name}, chat_id: {dialog.id}")
print("\n✅ 完成!请复制你想要的群聊 chat_id 到 Bot 的 ALLOWED_GROUPS")
if __name__ == "__main__":
import asyncio
asyncio.run(main())

56
test.py
View File

@@ -1,15 +1,49 @@
import requests
url = "http://8.137.99.82:9005/api/send_click?token=fegergauiernguie&phone=8613661496481"
headers = {'vs': '05g7pXj83SRrFp94citJ7BkbjNqFc6kQ', 'language': 'zh_CN', 'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua': '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"', 'sec-ch-ua-mobile': '?0',
'appVersion': '2.0.1', 'Accept': 'application/json, text/plain, */*', 'Content-Type': 'application/json',
'locale': 'zh_CN', 'terminalCode': 'ea778f2af8f3a418595f4993fe0ed949', 'Referer': 'https://www.weeaxs.site/',
'X-SIG': '64fbc4638b9132d0639c0864ad7b2631',
'sidecar': '01752def6506a5c558793e0c6ea8793a0e6defd487fbe009718c20e36c7b5e16fc',
'X-TIMESTAMP': '1762425944928', 'bundleid': '',
'U-TOKEN': 'eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI4MjE2MDlhOS1kMDI3LTRiOTAtOTU0OC1iZGU3YTlmNmRiOWMxMzczNjgzNjU1IiwidWlkIjoidEQzQ1FIaFJUblVYcm5MNFNwckw3UT09Iiwic3ViIjoieXgyMDI1KioqKkBnbWFpbC5jb20iLCJpcCI6IllXb3dNNkVVWnYzamdjOTkwUHVHUFE9PSIsImRpZCI6InVxSnQ1N1N2SVBZcE5oM1MzU3VxcktCVERiaHBoeTN6S2VyMmZVNnF3MktnMk12cVptaUhFNmJ0YSt0OUgrcUEiLCJzdHMiOjAsImlhdCI6MTc2MjQwOTc3NywiZXhwIjoxNzcwMTg1Nzc3LCJwdXNoaWQiOiJvTmpMNm1ab2h4T203V3ZyZlIvcWdBPT0iLCJhdGwiOiIwIiwiaXNzIjoidXBleCJ9.idUb4bjGwoDy2MRZWmaIuNZAwCRAos6t6nt4sAZBw_Urg2Jtuz5sEZQYnZxx0fczg7RG8zm0bzeAqoIRHXWLSEQs366HA55bAIcz_GM12Wik7-zWZ_CVz5VqTVCO1xUUfRX7a-qRB6HfgdUu_f-rlqG8U8l__65sWhEtDZ2mJNk',
'terminaltype': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
'traceId': '378eb6c9-eb23-4925-a306-d437c3865a87', 'accept-encoding': 'gzip, deflate, br, zstd',
'accept-language': 'fi', 'content-length': '68', 'origin': 'https://www.weeaxs.site', 'priority': 'u=1, i',
'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'cross-site'}
res = requests.post(
url=url,
json={
"phone": "8613661496481",
"bot_name": "ergggreef",
"datas": [
{"send_message": ["123"], "click_button": [""], },
]
json_data = {
'filterCoinIdList': [
2,
],
'filterContractIdList': [],
'filterOrderStatusList': [
'CANCELED',
'FILLED',
],
'filterOrderTypeList': [],
'languageType': 1,
'limit': 20,
'sign': 'SIGN',
'timeZone': 'string',
}
}
)
response = requests.post(
'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderPage',
headers=headers,
json=json_data,
)
for i in response.json()["data"]["dataList"]:
print(i)
# Note: json_data will not be serialized by requests
# exactly as it was in the original request.
# data = '{"filterCoinIdList":[2],"filterContractIdList":[],"filterOrderStatusList":["CANCELED","FILLED"],"filterOrderTypeList":[],"languageType":1,"limit":20,"sign":"SIGN","timeZone":"string"}'
# response = requests.post(
# 'https://http-gateway2.ngsvsfx.cn/api/v1/private/order/v2/getHistoryOrderPage',
# headers=headers,
# data=data,
# )

View File

@@ -1,27 +1,68 @@
import time
import datetime
import requests
# 获取当前时间戳
current_timestamp = time.time()
# 将当前时间戳转换为 datetime 对象
current_datetime = datetime.datetime.fromtimestamp(current_timestamp)
headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'cache-control': 'no-cache',
'dnt': '1',
'origin': 'https://www.websea.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://www.websea.com/',
'sec-ch-ua': '"Chromium";v="142", "Microsoft Edge";v="142", "Not_A Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0',
}
# 计算距离当前时间最近的整点或 30 分时刻
if current_datetime.minute < 30:
target_datetime = current_datetime.replace(minute=0, second=0, microsecond=0)
# # 获取当前日期UTC
# today = datetime.datetime.now(datetime.timezone.utc).date()
#
# # 当天开始和结束时间
# start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=datetime.timezone.utc)
# end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=datetime.timezone.utc)
# 获取今天日期(本地时间,中国时区 UTC+8
tz = datetime.timezone(datetime.timedelta(hours=8))
today = datetime.datetime.now(tz).date()
# 当天开始时间(北京时间 00:00
start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz)
# 当天结束时间(北京时间 23:59:59.999999
end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz)
params = {
'symbol': 'BTC-USDT',
'period': '30min',
'start': int(start_of_day.timestamp()),
'end': int(end_of_day.timestamp()),
}
response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers)
data = response.json()["result"]["data"]
if not data:
print("没有获取到数据")
else:
target_datetime = current_datetime.replace(minute=30, second=0, microsecond=0)
# 当前价格 = 最新一条 K 线的 close
current_price = float(data[0]['close'])
# 将目标 datetime 对象转换为时间戳
target_timestamp = target_datetime.timestamp()
# 当天最高价 = 当天所有 K 线的 high
today_high = max(float(item['high']) for item in data)
# 当天最低价 = 当天所有 K 线的 low
today_low = min(float(item['low']) for item in data)
print(f"当前时间戳: {current_timestamp}")
print(f"目标时间戳: {target_timestamp}")
# 24 小时涨幅计算(用最新一条 close 与 24 小时前的价格对比)
# 如果接口只能获取当天数据,可能不足 24 小时,可根据需求扩展
# 假设获取过去 24 小时数据,使用第一条 K 线的 open 作为 24 小时前价格
price_24h_ago = float(data[-1]['open'])
change_24h = (current_price - price_24h_ago) / price_24h_ago * 100
# 进行时间戳比对
if current_timestamp == target_timestamp:
print("前时间就是目标时间。")
elif current_timestamp < target_timestamp:
print(f"当前时间早于目标时间,还需等待 {target_timestamp - current_timestamp} 秒。")
else:
print(f"当前时间晚于目标时间,已经过去了 {current_timestamp - target_timestamp} 秒。")
print(f"当前价格: {current_price}")
print(f"24小时涨幅: {change_24h:.2f}%")
print(f"天最高价: {today_high}")
print(f"当天最低价: {today_low}")

View File

@@ -1,227 +1,15 @@
import datetime
import time
import hmac
import hashlib
import base64
import json
import re
import requests
from bs4 import BeautifulSoup
from loguru import logger
from tqdm import tqdm
from DrissionPage import ChromiumPage, ChromiumOptions
url = "http://8.137.99.82:9005/api/send_click?token=fegergauiernguie&phone=8613661496481"
# ==============================================================
# ✅ 通用工具函数
# ==============================================================
res = requests.post(
url=url,
json={
"phone": "8613661496481",
"bot_name": "ergggreef",
"datas": [
{"send_message": ["grgegg"], "click_button": [""], },
]
def is_bullish(candle):
return float(candle['close']) > float(candle['open'])
def is_bearish(candle):
return float(candle['close']) < float(candle['open'])
def current_time():
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# ==============================================================
# ✅ 钉钉通知模块
# ==============================================================
class DingTalkNotifier:
def __init__(self, webhook_url: str, secret: str):
self.webhook_url = webhook_url
self.secret = secret
def _get_signature(self, timestamp):
string_to_sign = f'{timestamp}\n{self.secret}'
hmac_code = hmac.new(self.secret.encode('utf-8'), string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256).digest()
return base64.b64encode(hmac_code).decode('utf-8')
def send(self, content: str):
timestamp = str(round(time.time() * 1000))
sign = self._get_signature(timestamp)
url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
msg = {"msgtype": "text", "text": {"content": content}}
try:
res = requests.post(url, json=msg, timeout=10)
res.raise_for_status()
logger.success(f"[钉钉通知成功] {content}")
except Exception as e:
logger.error(f"[钉钉通知失败] {e}")
# ==============================================================
# ✅ 浏览器与页面交互模块
# ==============================================================
class WeexBrowser:
def __init__(self, tge_id: int, api_key: str, base_url="http://127.0.0.1:50326"):
self.tge_id = tge_id
self.api_key = api_key
self.base_url = base_url
self.page = None
self.port = None
def start_browser(self):
try:
res = requests.post(
f"{self.base_url}/api/browser/start",
headers={"Authorization": f"Bearer {self.api_key}"},
json={"envId": self.tge_id}
).json()
self.port = res["data"]["port"]
logger.info(f"浏览器已启动,端口:{self.port}")
return True
except Exception as e:
logger.error(f"启动浏览器失败: {e}")
return False
def connect(self, url):
try:
opts = ChromiumOptions()
opts.set_local_port(self.port)
self.page = ChromiumPage(addr_or_opts=opts)
self.page.set.window.max()
self.page.get(url=url)
logger.success("浏览器接管成功!")
return True
except Exception as e:
logger.error(f"浏览器接管失败: {e}")
return False
def click(self, xpath):
try:
ele = self.page.ele(f'x://{xpath}')
if ele:
ele.scroll.to_see(center=True)
ele.click()
return True
except Exception as e:
logger.error(f"点击元素失败:{xpath},原因:{e}")
return False
def contains_text(self, text):
soup = BeautifulSoup(self.page.html, "html.parser")
cleaned_target = re.sub(r'\s', '', text)
for tag in soup.find_all():
if cleaned_target in re.sub(r'\s', '', tag.get_text()):
return True
return False
# ==============================================================
# ✅ K线策略模块
# ==============================================================
class KlineStrategy:
@staticmethod
def engulf_signal(prev, curr):
"""包住形态信号"""
p_open, p_close = float(prev['open']), float(prev['close'])
c_open, c_close = float(curr['open']), float(curr['close'])
if is_bullish(curr) and is_bearish(prev) and c_open <= p_close and c_close >= p_open:
return "long"
if is_bearish(curr) and is_bullish(prev) and c_open >= p_close and c_close <= p_open:
return "short"
return None
# ==============================================================
# ✅ 主交易逻辑模块
# ==============================================================
class WeexTrader:
def __init__(self, tge_id):
self.browser = WeexBrowser(tge_id=tge_id, api_key="asp_174003986c9b0799677c5b2c1adb76e402735d753bc91a91")
self.notifier = DingTalkNotifier(
webhook_url="https://oapi.dingtalk.com/robot/send?access_token=e2fafb3f46866d50fe52cbb29650ba9ef1cbc97915dde238192f04c906fe4125",
secret="SEC5f320e72d7a4eaca540c66c3d09edff2f74936517390dee99ece6dd1b3611998"
)
self.direction = 0 # -1:空 0:无 1:多
self.page = None
def run(self):
if not self.browser.start_browser() or not self.browser.connect(
"https://www.weeaxs.site/zh-CN/futures/demo-trading/ETH-SUSDT"):
return
page = self.browser.page
page.listen.start("public/quote/v1/getKlineV2")
pbar = tqdm(total=30, desc="监控中", ncols=80)
while True:
try:
tm = time.localtime().tm_min
pbar.n = tm % 30
pbar.refresh()
# 每30分钟判断信号
if tm in [0, 1, 2, 30, 31, 32]:
self._check_position(page)
klines = self._get_kline_data(page)
if not klines:
continue
prev, curr = klines[-2:]
signal = KlineStrategy.engulf_signal(prev, curr)
if signal:
self._handle_signal(signal)
else:
logger.info("无信号触发")
time.sleep(10)
except Exception as e:
logger.exception(f"主循环异常: {e}")
time.sleep(15)
def _check_position(self, page):
if self.browser.contains_text("ETH/SUSDT多"):
self.direction = 1
elif self.browser.contains_text("ETH/SUSDT空"):
self.direction = -1
else:
self.direction = 0
def _get_kline_data(self, page):
try:
page.refresh()
res = page.listen.wait(timeout=10)
if not res: return None
return sorted([
{'id': int(d[4]), 'open': d[3], 'high': d[1], 'low': d[2], 'close': d[0]}
for d in res.response.body['data']["dataList"]
], key=lambda x: x['id'])
except Exception as e:
logger.error(f"获取K线失败{e}")
return None
def _handle_signal(self, signal):
if signal == "long" and self.direction <= 0:
self._trade("买入开多", 1)
elif signal == "short" and self.direction >= 0:
self._trade("卖出开空", -1)
def _trade(self, action_text, new_dir):
logger.success(f"{current_time()}:执行 {action_text}")
self.notifier.send(f"{current_time()} 执行 {action_text}")
self.browser.click(f'*[contains(text(), "闪电平仓")]')
time.sleep(2)
self.browser.click(f'*[contains(text(), "{action_text}")]')
self.direction = new_dir
# ==============================================================
# ✅ 启动程序
# ==============================================================
if __name__ == '__main__':
trader = WeexTrader(tge_id=146473)
trader.run()
}
)