from telethon import TelegramClient, events import sqlite3 from datetime import date import os # ========== 配置区 ========== API_ID = 2040 # ← 替换为你的 Telegram API ID API_HASH = "b18441a1ff607e10a989891a5462e627" # ← 替换为你的 API HASH BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" # ← 替换为你的 Bot Token DB_PATH = "sign.db" # 数据库文件 SIGN_POINTS = 10 # 每次签到获得的积分 # ============================ # ---------- 数据库操作 ---------- def init_db(): """初始化数据库""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY, username TEXT, points INTEGER DEFAULT 0, last_sign_date TEXT ) ''') conn.commit() conn.close() def get_user(user_id): """获取用户信息""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE user_id=?", (user_id,)) user = cursor.fetchone() conn.close() return user def add_or_update_user(user_id, username): """如果用户不存在则新增""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username)) conn.commit() conn.close() def can_sign(user_id): """判断今天是否可以签到""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,)) row = cursor.fetchone() today = date.today().isoformat() conn.close() if row and row[0] == today: return False return True def add_points(user_id, points): """给用户加积分并更新签到日期""" today = date.today().isoformat() conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" UPDATE users SET points = points + ?, last_sign_date = ? WHERE user_id = ? """, (points, today, user_id)) conn.commit() conn.close() def get_points(user_id): """获取用户当前积分""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,)) points = cursor.fetchone() conn.close() return points[0] if points else 0 # ---------- 机器人主逻辑 ---------- def main(): # 初始化数据库 if not os.path.exists(DB_PATH): init_db() proxy = { 'proxy_type': "socks5", # 或 'socks4',具体看你的代理类型 'addr': "202.155.144.102", # 代理服务器地址 'port': 31102, # 代理服务器端口 'username': "SyNuejCtrQ", # 如果有用户名,填写 'password': "MH8ioL7EXf" # 如果有密码,填写 } # proxy = ("socks5", "202.155.144.102", 31102, True, "SyNuejCtrQ", "MH8ioL7EXf") bot = TelegramClient( 'haha', api_id=API_ID, api_hash=API_HASH, proxy=proxy ).start(bot_token=BOT_TOKEN) @bot.on(events.NewMessage(pattern='^(签到|/sign)$')) async def sign_handler(event): user = await event.get_sender() user_id = user.id username = user.username or user.first_name or "未知用户" # 注册用户 add_or_update_user(user_id, username) # 检查是否已签到 if not can_sign(user_id): await event.reply(f"🌞 {username},你今天已经签到过啦!") return # 签到成功,加积分 add_points(user_id, SIGN_POINTS) total = get_points(user_id) await event.reply(f"✅ 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}") print("🤖 签到机器人已启动,等待消息中...") bot.run_until_disconnected() if __name__ == "__main__": main()