# bot.py import random import asyncio import datetime import sqlite3 from zoneinfo import ZoneInfo import requests from telethon import TelegramClient, events, Button, types # ========== 配置区 ========== API_ID = 2040 API_HASH = "b18441a1ff607e10a989891a5462e627" BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" # 数据库文件 DB_PATH = "sign.db" # 签到积分、邀请积分、发言奖励 SIGN_POINTS = 10 INVITE_POINTS = 10 DAILY_SPEAK_TOP_N = 10 DAILY_SPEAK_REWARD = 10 # 允许机器人运行的群组(只处理这些群的消息) ALLOWED_GROUPS = [-1003238845008] # 时区(结算规则用) LOCAL_TZ = ZoneInfo("America/New_York") # 代理(可选) PROXY = { 'proxy_type': "socks5", 'addr': "202.155.144.102", 'port': 31102, 'username': "SyNuejCtrQ", 'password': "MH8ioL7EXf" } # 邀请链接 + 币种链接示例(会出现在每条币价消息) INVITE_LINK = "https://www.websea.my/en/signup?key=77346588" crypto_currencies = { "BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"}, "ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"}, "SOL": {"type": "SOL-USDT", "url": "https://www.websea.com/zh-CN/trade/SOL-USDT"}, "BNB": {"type": "BNB-USDT", "url": "https://www.websea.com/zh-CN/trade/BNB-USDT"}, "WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"} } # 主菜单按钮 main_buttons = [ [Button.inline("签到", b"/sign"), Button.inline("今日发言排行", b"/daily_rank")], [Button.inline("我的邀请", b"/my_invites"), Button.inline("币价", b"/btc")], [Button.inline("帮助", b"/help")] ] COMMANDS_TEXT = { '/sign': '签到,每天签到一次,获取积分', '签到': '签到,每天签到一次,获取积分', '/daily_rank': '查看今日发言排行', '/my_invites': '查看你邀请的人数', '/btc': '获取 币价 菜单', '/help': '显示所有可用命令及说明' } # ============================ # ---------- 数据库操作 ---------- def get_conn(): # 每次操作创建连接,避免 sqlite 的线程/并发问题 return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) def init_db(): conn = get_conn() cursor = conn.cursor() # 用户表:保存积分、上次签到日期、累计邀请数 cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY, username TEXT, points INTEGER DEFAULT 0, last_sign_date TEXT, total_invites INTEGER DEFAULT 0 ) ''') # 每日发言表,复合主键 (user_id, date) cursor.execute(''' CREATE TABLE IF NOT EXISTS daily_messages ( user_id INTEGER, date TEXT, count INTEGER DEFAULT 0, PRIMARY KEY (user_id, date) ) ''') # invites 表保存邀请关系(inviter_id, invitee_id) cursor.execute(''' CREATE TABLE IF NOT EXISTS invites ( inviter_id INTEGER, invitee_id INTEGER, created_at TEXT, PRIMARY KEY (inviter_id, invitee_id) ) ''') # 日志表:记录每日结算、奖励发放记录(便于审计) cursor.execute(''' CREATE TABLE IF NOT EXISTS points_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, points INTEGER, reason TEXT, created_at TEXT ) ''') conn.commit() conn.close() def add_or_update_user(user_id, username): conn = get_conn() cursor = conn.cursor() cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username)) cursor.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id)) conn.commit() conn.close() def can_sign(user_id): conn = get_conn() cursor = conn.cursor() cursor.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,)) row = cursor.fetchone() conn.close() today = datetime.datetime.now(LOCAL_TZ).date().isoformat() return False if row and row[0] == today else True def add_points_immediate(user_id, points, reason=""): """立即增加 points 并记录日志""" now = datetime.datetime.now(LOCAL_TZ).isoformat() conn = get_conn() cursor = conn.cursor() cursor.execute("UPDATE users SET points = points + ? WHERE user_id = ?", (points, user_id)) cursor.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)", (user_id, points, reason, now)) conn.commit() conn.close() def get_points(user_id): conn = get_conn() cursor = conn.cursor() cursor.execute("SELECT points FROM users WHERE user_id = ?", (user_id,)) row = cursor.fetchone() conn.close() return row[0] if row else 0 def set_last_sign(user_id): today = datetime.datetime.now(LOCAL_TZ).date().isoformat() conn = get_conn() cursor = conn.cursor() cursor.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id)) conn.commit() conn.close() def inc_invite(inviter_id, invitee_id): """记录邀请关系并奖励 inviter(只奖励第一次记录)""" now = datetime.datetime.now(LOCAL_TZ).isoformat() conn = get_conn() cursor = conn.cursor() try: cursor.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)", (inviter_id, invitee_id, now)) # 增加 inviter 的 total_invites cursor.execute("UPDATE users SET total_invites = total_invites + 1 WHERE user_id = ?", (inviter_id,)) # 立即奖励积分 add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward") except sqlite3.IntegrityError: # 已经存在,不重复奖励 pass finally: conn.commit() conn.close() def get_invite_count(user_id): conn = get_conn() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,)) count = cursor.fetchone()[0] conn.close() return count # ---------- 发言统计 ---------- def add_message_count(user_id): today = datetime.datetime.now(LOCAL_TZ).date().isoformat() conn = get_conn() cursor = conn.cursor() cursor.execute(''' INSERT INTO daily_messages (user_id, date, count) VALUES (?, ?, 1) ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1 ''', (user_id, today)) conn.commit() conn.close() def get_daily_message_ranking(date_iso=None, limit=None): date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() conn = get_conn() cursor = conn.cursor() cursor.execute(''' SELECT u.user_id, u.username, d.count FROM daily_messages d JOIN users u ON d.user_id = u.user_id WHERE d.date = ? ORDER BY d.count DESC {} '''.format(f"LIMIT {limit}" if limit else ""), (date_iso,)) rows = cursor.fetchall() conn.close() return rows def reset_daily_messages(date_iso=None): """删除指定日期的 daily_messages(结算后调用)""" date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() conn = get_conn() cursor = conn.cursor() cursor.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,)) conn.commit() conn.close() # ---------- 币价获取 ---------- def get_price(symbol: str): """ 使用外部 API 获取当日 K 线数据并返回当前价、24h 变化、今日高、今日低 这里调用的是 websea 的接口(和你原来一致) """ headers = {'user-agent': 'Mozilla/5.0'} tz = LOCAL_TZ today = datetime.datetime.now(tz).date() start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz) end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz) params = { 'symbol': symbol, 'period': '30min', 'start': int(start_of_day.timestamp()), 'end': int(end_of_day.timestamp()), } try: response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers, timeout=10) data = response.json().get("result", {}).get("data", []) if not data: return 0.0, 0.0, 0.0, 0.0 current_price = float(data[0]['close']) today_high = max(float(i['high']) for i in data) today_low = min(float(i['low']) for i in data) price_24h_ago = float(data[-1]['open']) change_24h = (current_price - price_24h_ago) / price_24h_ago * 100 if price_24h_ago != 0 else 0.0 return current_price, change_24h, today_high, today_low except Exception as e: print("获取币价失败:", e) return 0.0, 0.0, 0.0, 0.0 def get_price_msg(symbol: str): current_price, change_24h, today_high, today_low = get_price(symbol) name = symbol.split('-')[0] url = crypto_currencies.get(name, {}).get("url", "#") msg = ( f"📊 {name}/USDT 实时行情\n\n" f"💰 当前价格:{current_price:.4f} USDT\n" f"📈 24小时涨跌幅:{change_24h:.2f}%\n" f"⬆️ 最高价:{today_high:.4f} USDT\n" f"⬇️ 最低价:{today_low:.4f} USDT\n\n" f"🔗 交易链接:{url}\n" f"🎁 注册邀请:{INVITE_LINK}\n\n" f"(此消息每小时自动更新)" ) return msg def get_crypto_buttons(): """生成币种选择按钮""" buttons = [] row = [] for name in crypto_currencies.keys(): row.append(Button.inline(name, f"price_{name}".encode())) if len(row) == 3: buttons.append(row) row = [] if row: buttons.append(row) buttons.append([Button.inline("返回菜单", b"/help")]) return buttons # ---------- 机器人命令处理 ---------- def get_command_list_text(): msg = "🤖 机器人命令列表:\n" for cmd, desc in COMMANDS_TEXT.items(): msg += f"{cmd} → {desc}\n" msg += f"\n签到奖励:每次 {SIGN_POINTS} 积分,100积分可兑换 5 USDT 合约手续费代金券。\n发言奖励:每日排行前 {DAILY_SPEAK_TOP_N} 每人 {DAILY_SPEAK_REWARD} 积分。\n邀请奖励:每邀请一人 {INVITE_POINTS} 积分。\n每日结算时间:每天 12:00(America/New_York 时区)。" return msg async def handle_command(event, cmd): user = await event.get_sender() user_id = user.id username = user.username or user.first_name or "未知用户" add_or_update_user(user_id, username) reply = "" if cmd in ["/sign", "签到", "/tanda"]: if not can_sign(user_id): reply = f"🌞 @{username},你今天已经签到过啦!" else: add_points_immediate(user_id, SIGN_POINTS, reason="sign") set_last_sign(user_id) total = get_points(user_id) reply = f"✅ @{username} 签到成功!获得 {SIGN_POINTS} 积分\n当前总积分:{total}\n\n100积分可兑换 5USDT 合约手续费代金券。" elif cmd in ["/daily_rank", "发言", "/kedudukan_harian"]: ranking = get_daily_message_ranking() if not ranking: reply = "📊 今日还没有人发言哦~" else: reply_lines = [] for i, (uid, uname, cnt) in enumerate(ranking): reply_lines.append(f"{i + 1}. @{uname or uid}: {cnt} 条") reply = "📊 今日发言排行:\n" + "\n".join(reply_lines) elif cmd in ["/my_invites", "邀请", "/daily_position"]: count = get_invite_count(user_id) total_pts = get_points(user_id) reply = f"👥 @{username},你邀请了 {count} 人。\n当前积分:{total_pts}" elif cmd in ["/btc", "币价"]: await event.respond("请选择要查看的币种:", buttons=get_crypto_buttons()) return else: await event.reply(get_command_list_text(), buttons=main_buttons) return await event.reply(reply) async def handle_price_command(event, symbol_name: str): symbol = crypto_currencies[symbol_name]["type"] msg = get_price_msg(symbol) # event.edit 用于 CallbackQuery,event.respond/edit 都可 try: await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False) except Exception: await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False) # ---------- 定时任务:每小时推送随机币价 ---------- async def send_price_periodically(bot, chat_id): await asyncio.sleep(5) # 启动缓冲 while True: try: random_key = random.choice(list(crypto_currencies.keys())) msg = get_price_msg(crypto_currencies[random_key]["type"]) # 注意:如果群组比较多,此处简单发送。可优化为只在特定时间或轮询。 await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False) except Exception as e: print(f"发送价格失败: {e}") # 等待下一小时整点(保持每小时推送一次) now = datetime.datetime.now(LOCAL_TZ) next_hour = (now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)) wait_seconds = (next_hour - now).total_seconds() await asyncio.sleep(wait_seconds) # ---------- 定时任务:每天结算(每天 12:00 America/New_York) ---------- async def daily_settlement_task(bot): """每天在 LOCAL_TZ 12:00 执行: - 计算当日发言排名,给前 N 名每人奖励 - 记录日志并清空当天的发言统计 """ while True: now = datetime.datetime.now(LOCAL_TZ) # 目标时间:今天 12:00(如果已过则到明天) target = now.replace(hour=12, minute=0, second=0, microsecond=0) if now >= target: target = target + datetime.timedelta(days=1) wait_seconds = (target - now).total_seconds() print(f"[settlement] 下次结算(12:00)在 {target.isoformat()},等待 {int(wait_seconds)} s") await asyncio.sleep(wait_seconds) # 执行结算 date_iso = (target - datetime.timedelta(days=1)).date().isoformat() # 结算前一天(也可结算今天到目前为止) print(f"[settlement] 执行结算,目标日期:{date_iso}") ranking = get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N) if ranking: for i, (uid, uname, cnt) in enumerate(ranking): try: add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i+1}") except Exception as e: print(f"[settlement] 给用户 {uid} 发奖励失败: {e}") # 给管理员或群组发送结算通知(可选) try: summary_lines = [f"📅 {date_iso} 发言排行榜结算结果:"] for i, (uid, uname, cnt) in enumerate(ranking): summary_lines.append(f"{i+1}. @{uname or uid} — {cnt} 条 — 获得 {DAILY_SPEAK_REWARD} 积分") summary = "\n".join(summary_lines) for gid in ALLOWED_GROUPS: await bot.send_message(gid, summary) except Exception as e: print("[settlement] 发送结算通知失败:", e) else: print("[settlement] 当日无发言记录,跳过发言奖励") # 清空那天的统计 try: reset_daily_messages(date_iso=date_iso) print(f"[settlement] 清理完成:{date_iso}") except Exception as e: print("[settlement] 清理 daily_messages 失败:", e) # ---------- 主逻辑 ---------- async def main(): init_db() # 使用代理时传 proxy=PROXY,若不需要代理则移除该参数 bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY) await bot.start(bot_token=BOT_TOKEN) # 启动每小时发送币价任务(对每个白名单群) for gid in ALLOWED_GROUPS: asyncio.create_task(send_price_periodically(bot, gid)) # 启动每日结算任务(12:00 America/New_York) asyncio.create_task(daily_settlement_task(bot)) @bot.on(events.ChatAction) async def welcome(event: events.ChatAction.Event): # 欢迎新成员并尝试记录邀请者(若存在) if event.user_added or event.user_joined: for user in event.users: name = user.username or user.first_name or "新成员" # 尝试获得 inviter id(不同版本的 Telethon 结构不完全一致) inviter_id = None try: # 若是邀请添加,action_message 的 from_id 通常是 inviter if hasattr(event, 'action_message') and event.action_message: maybe = getattr(event.action_message, "from_id", None) if isinstance(maybe, types.PeerUser): inviter_id = maybe.user_id elif isinstance(maybe, (int,)): inviter_id = maybe # Telethon 也可能直接暴露 actor_id if getattr(event, "actor_id", None): inviter_id = event.actor_id except Exception: inviter_id = None # 如果 inviter_id 存在且与新成员不同,则记录邀请并奖励 try: if inviter_id and inviter_id != user.id: # 确保 inviter 存在 users 表 add_or_update_user(inviter_id, "unknown") inc_invite(inviter_id, user.id) except Exception as e: print("记录邀请失败:", e) # 更新/插入新成员到 users 表 add_or_update_user(user.id, user.username or user.first_name or "新成员") # 欢迎消息(发送到群内) try: await event.reply(f"欢迎 @{name} 加入群聊!\n{get_command_list_text()}") except Exception: pass @bot.on(events.NewMessage) async def on_message(event): # 只处理白名单群组 if event.chat_id not in ALLOWED_GROUPS: return # 普通消息处理:命令或记录发言计数 text = (event.raw_text or "").strip() if not text: return # 去掉命令中的 @botname(如 /sign@mybot) if "@" in text and text.startswith("/"): text = text.split("@")[0] # 小写匹配命令(命令与 data 映射) lowered = text.lower() # 命令集合 cmds_map = { "签到": ["/sign", "签到", "/tanda"], "发言": ["/daily_rank", "发言", "/kedudukan_harian"], "邀请": ["/my_invites", "邀请", "/daily_position"], "币价": ["/btc", "币价"] } for group_cmds in cmds_map.values(): if lowered in [c.lower() for c in group_cmds]: await handle_command(event, lowered) return # 不是命令则记录发言次数 sender = await event.get_sender() if sender: add_or_update_user(sender.id, sender.username or sender.first_name or "未知用户") add_message_count(sender.id) @bot.on(events.CallbackQuery) async def callback(event): await event.answer("正在处理...", alert=False) cmd = event.data.decode() if "@" in cmd and cmd.startswith("/"): cmd = cmd.split("@")[0] if cmd.startswith("price_"): symbol = cmd.split("_", 1)[1] await handle_price_command(event, symbol) else: await handle_command(event, cmd) print("🤖 机器人已启动,等待群聊消息...") await bot.run_until_disconnected() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("机器人已停止")