Files
lm_code/telegram/bot_test.py
2025-11-12 16:10:43 +08:00

136 lines
3.9 KiB
Python

from telethon import TelegramClient, events
import sqlite3
from datetime import date
import os
# ========== 配置区 ==========
API_ID = 2040 # ← 替换为你的 Telegram API ID
API_HASH = "b18441a1ff607e10a989891a5462e627" # ← 替换为你的 API HASH
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" # ← 替换为你的 Bot Token
DB_PATH = "sign.db" # 数据库文件
SIGN_POINTS = 10 # 每次签到获得的积分
# ============================
# ---------- 数据库操作 ----------
def init_db():
"""初始化数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
points INTEGER DEFAULT 0,
last_sign_date TEXT
)
''')
conn.commit()
conn.close()
def get_user(user_id):
"""获取用户信息"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE user_id=?", (user_id,))
user = cursor.fetchone()
conn.close()
return user
def add_or_update_user(user_id, username):
"""如果用户不存在则新增"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
conn.commit()
conn.close()
def can_sign(user_id):
"""判断今天是否可以签到"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
today = date.today().isoformat()
conn.close()
if row and row[0] == today:
return False
return True
def add_points(user_id, points):
"""给用户加积分并更新签到日期"""
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE users
SET points = points + ?, last_sign_date = ?
WHERE user_id = ?
""", (points, today, user_id))
conn.commit()
conn.close()
def get_points(user_id):
"""获取用户当前积分"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,))
points = cursor.fetchone()
conn.close()
return points[0] if points else 0
# ---------- 机器人主逻辑 ----------
def main():
# 初始化数据库
if not os.path.exists(DB_PATH):
init_db()
proxy = {
'proxy_type': "socks5", # 或 'socks4',具体看你的代理类型
'addr': "202.155.144.102", # 代理服务器地址
'port': 31102, # 代理服务器端口
'username': "SyNuejCtrQ", # 如果有用户名,填写
'password': "MH8ioL7EXf" # 如果有密码,填写
}
# proxy = ("socks5", "202.155.144.102", 31102, True, "SyNuejCtrQ", "MH8ioL7EXf")
bot = TelegramClient(
'haha',
api_id=API_ID,
api_hash=API_HASH,
proxy=proxy
).start(bot_token=BOT_TOKEN)
@bot.on(events.NewMessage(pattern='^(签到|/sign)$'))
async def sign_handler(event):
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
# 注册用户
add_or_update_user(user_id, username)
# 检查是否已签到
if not can_sign(user_id):
await event.reply(f"🌞 {username},你今天已经签到过啦!")
return
# 签到成功,加积分
add_points(user_id, SIGN_POINTS)
total = get_points(user_id)
await event.reply(f"✅ 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}")
print("🤖 签到机器人已启动,等待消息中...")
bot.run_until_disconnected()
if __name__ == "__main__":
main()