Files
lm_code/telegram/bot-v3.0.py
2025-11-12 16:10:43 +08:00

300 lines
8.9 KiB
Python

from nbformat.v4 import new_raw_cell
from telethon import TelegramClient, events, Button
import sqlite3
from datetime import date
import os
import asyncio
# ========== Configuration ==========
# NOTE(review): credentials are committed in source. Each secret below can be
# overridden through the environment variable named in its lookup; the literals
# are kept only as backward-compatible fallbacks and should be rotated and
# removed once deployments provide the variables.
API_ID = int(os.environ.get("API_ID", "2040"))
API_HASH = os.environ.get("API_HASH", "b18441a1ff607e10a989891a5462e627")
BOT_TOKEN = os.environ.get("BOT_TOKEN", "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y")
DB_PATH = "sign.db"  # SQLite file opened by every database helper
SIGN_POINTS = 10  # points granted for one daily sign-in
ALLOWED_GROUPS = [-1003238845008]  # chat ids whose messages the bot processes
# Proxy settings handed to TelegramClient(proxy=...).
PROXY = {
    'proxy_type': "socks5",
    'addr': "202.155.144.102",
    'port': 31102,
    'username': os.environ.get("PROXY_USERNAME", "SyNuejCtrQ"),
    'password': os.environ.get("PROXY_PASSWORD", "MH8ioL7EXf"),
}
# ===================================
# ---------- Command dictionary ----------
# Maps each advertised command (or plain-text alias) to its help description.
COMMANDS = {
    "/sign": "签到,每天签到一次,获取积分",
    "签到": "签到,每天签到一次,获取积分",
    "/daily_rank": "查看今日发言排行",
    "/my_invites": "查看你邀请的人数",
    "/help": "显示所有可用命令及说明",
}


def get_command_list_text():
    """Return the help text: a header line followed by one line per command.

    Each line is the command name immediately followed by its description.
    NOTE(review): there is no separator between name and description --
    confirm this is the intended layout.
    """
    entries = [f"{name}{description}\n" for name, description in COMMANDS.items()]
    return "🤖 机器人命令列表:\n" + "".join(entries)
# ---------- Database operations ----------
def init_db():
    """Create the ``users``, ``daily_messages`` and ``invites`` tables if missing.

    Opens the SQLite file at ``DB_PATH``, issues three ``CREATE TABLE IF NOT
    EXISTS`` statements and commits. The connection is closed even when a
    statement raises.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # One row per user: accumulated points and the ISO date of the last sign-in.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users
            (
                user_id        INTEGER PRIMARY KEY,
                username       TEXT,
                points         INTEGER DEFAULT 0,
                last_sign_date TEXT
            )
        ''')
        # One row per (user, day) holding that day's message count.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS daily_messages
            (
                user_id INTEGER,
                date    TEXT,
                count   INTEGER DEFAULT 0,
                PRIMARY KEY (user_id, date)
            )
        ''')
        # invitee_id is the primary key, so each invitee is stored at most once.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS invites
            (
                inviter_id INTEGER,
                invitee_id INTEGER PRIMARY KEY
            )
        ''')
        conn.commit()
    finally:
        conn.close()
def add_or_update_user(user_id, username):
    """Insert the user if unknown, otherwise refresh the stored username.

    Args:
        user_id: Primary key of the ``users`` row.
        username: Name to store for that user.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # Single upsert instead of INSERT OR IGNORE followed by UPDATE.
        conn.execute(
            """
            INSERT INTO users (user_id, username)
            VALUES (?, ?)
            ON CONFLICT(user_id) DO UPDATE SET username = excluded.username
            """,
            (user_id, username),
        )
        conn.commit()
    finally:
        conn.close()
def can_sign(user_id):
    """Return whether the user may still sign in today.

    Args:
        user_id: Primary key of the ``users`` row to check.

    Returns:
        ``False`` when the stored ``last_sign_date`` equals today's ISO date,
        ``True`` otherwise (including when the user has no row).
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            "SELECT last_sign_date FROM users WHERE user_id=?", (user_id,)
        ).fetchone()
    finally:
        conn.close()
    return row is None or row[0] != today
def add_points(user_id, points):
    """Add points to a user and record today as their last sign-in date.

    Note that besides increasing ``points`` this always overwrites
    ``last_sign_date`` with today's ISO date. Nothing happens when the user
    has no row in ``users``.

    Args:
        user_id: Primary key of the ``users`` row to update.
        points: Amount added to the current balance.
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            """
            UPDATE users
            SET points = points + ?,
                last_sign_date = ?
            WHERE user_id = ?
            """,
            (points, today, user_id),
        )
        conn.commit()
    finally:
        conn.close()
def get_points(user_id):
    """Return the user's current point balance.

    Args:
        user_id: Primary key of the ``users`` row to read.

    Returns:
        The stored ``points`` value, or ``0`` when the user has no row.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            "SELECT points FROM users WHERE user_id=?", (user_id,)
        ).fetchone()
    finally:
        conn.close()
    return row[0] if row else 0
# ---------- Message statistics ----------
def add_message(user_id):
    """Increment today's message counter for the user.

    Inserts a ``(user_id, today)`` row with count 1, or adds 1 to the existing
    row for that day.

    Args:
        user_id: Id of the user whose message is being counted.
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            """
            INSERT INTO daily_messages (user_id, date, count)
            VALUES (?, ?, 1)
            ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1
            """,
            (user_id, today),
        )
        conn.commit()
    finally:
        conn.close()
def get_daily_message_ranking():
    """Return today's message ranking, highest count first.

    Returns:
        A list of ``(username, count)`` tuples for every user with a
        ``daily_messages`` row dated today. Users that have no ``users`` row
        (or a NULL username) are listed under their numeric id rendered as
        text, so they are not dropped from the ranking.
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        # LEFT JOIN: message counts are recorded for every sender, but a
        # ``users`` row only exists once add_or_update_user has been called.
        rows = conn.execute(
            """
            SELECT COALESCE(u.username, CAST(d.user_id AS TEXT)), d.count
            FROM daily_messages d
                     LEFT JOIN users u ON d.user_id = u.user_id
            WHERE d.date = ?
            ORDER BY d.count DESC
            """,
            (today,),
        ).fetchall()
    finally:
        conn.close()
    return rows
# ---------- Invite statistics ----------
def add_invite(inviter_id, invitee_id):
    """Record that ``inviter_id`` invited ``invitee_id``.

    The statement is ``INSERT OR IGNORE``, so a conflicting row (for example
    an ``invitee_id`` that is already stored) leaves the table unchanged.

    Args:
        inviter_id: Id of the user credited with the invite.
        invitee_id: Id of the user who was invited.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "INSERT OR IGNORE INTO invites (inviter_id, invitee_id) VALUES (?, ?)",
            (inviter_id, invitee_id),
        )
        conn.commit()
    finally:
        conn.close()
def get_invite_count(inviter_id):
    """Return how many ``invites`` rows are credited to ``inviter_id``.

    Args:
        inviter_id: Id of the inviting user.

    Returns:
        The row count as an integer (``0`` when there are none).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # COUNT(*) always yields exactly one row.
        (total,) = conn.execute(
            "SELECT COUNT(*) FROM invites WHERE inviter_id=?", (inviter_id,)
        ).fetchone()
    finally:
        conn.close()
    return total
# ---------- Unified command handling ----------
async def handle_command(event, cmd):
    """Execute one bot command and reply to the triggering event.

    The sender is upserted into ``users`` first. Every reply ends with the
    command list; ``/help`` replies with the command list alone so the list
    is not sent twice.

    Args:
        event: Event object providing ``get_sender()`` and ``reply()``
            (called from the message handler and the button callback).
        cmd: Command text to execute, e.g. ``'/sign'``.
    """
    user = await event.get_sender()
    if user is None:
        # No resolvable sender: nothing to attribute the command to.
        return
    user_id = user.id
    # getattr: not every sender entity is guaranteed to expose both fields.
    username = (
        getattr(user, "username", None)
        or getattr(user, "first_name", None)
        or "未知用户"
    )
    add_or_update_user(user_id, username)

    if cmd == '/help':
        await event.reply(get_command_list_text())
        return

    if cmd in data["签到"]:
        if not can_sign(user_id):
            reply_text = f"🌞 @{username},你今天已经签到过啦!"
        else:
            add_points(user_id, SIGN_POINTS)
            total = get_points(user_id)
            reply_text = f"✅ @{username} 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}"
    elif cmd in data["发言"]:
        ranking = get_daily_message_ranking()
        if not ranking:
            reply_text = "📊 今日还没有人发言哦~"
        else:
            reply_text = "📊 今日发言排行:\n" + "".join(
                f"{i}. {uname}: {count}\n"
                for i, (uname, count) in enumerate(ranking, 1)
            )
    elif cmd in data["邀请"]:
        count = get_invite_count(user_id)
        reply_text = f"👥 @{username},你邀请了 {count} 人。"
    else:
        reply_text = f"⚠️ 未知命令:{cmd}"

    # Every command reply carries the command list as a footer.
    reply_text += "\n\n" + get_command_list_text()
    await event.reply(reply_text)
# Accepted trigger texts per command category. The keys are the category
# names that handle_command looks up; the values are the exact message texts
# (or button payloads) that select that category.
# NOTE(review): "haha" looks like a leftover test alias for sign-in -- confirm.
data = {
    "签到": ["/sign", "签到", "haha"],
    "发言": ["/daily_rank"],
    "邀请": ["/my_invites"],
}
# ---------- Bot main logic ----------
async def main():
    """Initialise the database, start the bot and serve events until disconnect.

    Registers four handlers on the client: a welcome/invite recorder for chat
    actions, the general message handler, the ``/commands`` button menu and
    the button callback handler.
    """
    init_db()
    bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
    await bot.start(bot_token=BOT_TOKEN)

    # Welcome new members and credit whoever added them.
    @bot.on(events.ChatAction)
    async def welcome_new_user(event):
        if event.chat_id not in ALLOWED_GROUPS:
            return
        if not (event.user_added or event.user_joined):
            return
        # An inviter only exists when somebody else added the users.
        inviter = await event.get_added_by() if event.user_added else None
        for user in event.users:
            if inviter is not None and inviter.id != user.id:
                # Without this call the invites table stays empty and
                # /my_invites always reports 0.
                add_invite(inviter.id, user.id)
            username = user.username or user.first_name or "新成员"
            await event.reply(f"欢迎 @{username} 加入群聊!\n{get_command_list_text()}")

    # Regular message handling: dispatch commands, count everything else.
    @bot.on(events.NewMessage)
    async def message_handler(event):
        if event.chat_id not in ALLOWED_GROUPS:
            return
        text = event.raw_text.strip()
        # Drop the "@botusername" suffix BEFORE the lookup, otherwise
        # "/sign@bot" can never match an alias.
        cmd = text.split('@', 1)[0] if text.startswith('/') else text
        is_command = cmd == '/help' or any(
            cmd in aliases for aliases in data.values()
        )
        if is_command:
            await handle_command(event, cmd)
            return
        # Anything that is not a command counts once towards today's ranking.
        user = await event.get_sender()
        if user is not None:
            add_message(user.id)

    # Send the inline button menu.
    # NOTE(review): this handler and the callback below are not limited to
    # ALLOWED_GROUPS -- confirm whether private-chat use is intended.
    @bot.on(events.NewMessage(pattern='/commands'))
    async def send_buttons(event):
        buttons = [
            [Button.inline("签到", b"/sign"), Button.inline("今日发言排行", b"/daily_rank")],
            [Button.inline("我的邀请", b"/my_invites"), Button.inline("帮助", b"/help")]
        ]
        await event.reply("请选择命令:", buttons=buttons)

    # Button click handling.
    @bot.on(events.CallbackQuery)
    async def callback_handler(event):
        cmd = event.data.decode()  # command bound to the pressed button
        print(cmd)
        try:
            await handle_command(event, cmd)
        finally:
            # Always answer, so the button's loading spinner stops even when
            # the command handler raises.
            await event.answer()

    print("🤖 机器人已启动,等待群聊消息...")
    await bot.run_until_disconnected()


if __name__ == "__main__":
    asyncio.run(main())