import asyncio import datetime import sqlite3 import random import requests from zoneinfo import ZoneInfo from typing import Optional, Tuple, List from telethon import TelegramClient, events, Button, types from telethon.tl.functions.bots import SetBotCommandsRequest from telethon.tl.types import BotCommand, BotCommandScopeDefault # ========== 配置区 ========== API_ID = 2040 API_HASH = "b18441a1ff607e10a989891a5462e627" BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" DB_PATH = "sign.db" # 积分配置 SIGN_POINTS = 10 INVITE_POINTS = 10 DAILY_SPEAK_TOP_N = 10 DAILY_SPEAK_REWARD = 10 # 允许机器人运行的群组(只处理这些群的消息) ALLOWED_GROUPS = [-1003494480544] # 时区(结算规则用) LOCAL_TZ = ZoneInfo("America/New_York") # 代理(可选) PROXY = { 'proxy_type': "socks5", 'addr': "202.155.144.102", 'port': 31102, 'username': "SyNuejCtrQ", 'password': "MH8ioL7EXf" } INVITE_LINK = "https://www.websea.my/en/signup?key=77346588" crypto_currencies = { "BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"}, "ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"}, "SOL": {"type": "SOL-USDT", "url": "https://www.websea.com/zh-CN/trade/SOL-USDT"}, "BNB": {"type": "BNB-USDT", "url": "https://www.websea.com/zh-CN/trade/BNB-USDT"}, "WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"} } # ---------- 命令解析工具函数 ---------- def normalize_command(cmd: str) -> str: """ 标准化命令:去除 @botname,转换为小写,去除前后空格 例如: /sign@docekrsebot -> /sign 支持多种格式: /sign@botname, /sign@BOTNAME, /SIGN@botname 等 """ if not cmd: return "" cmd = cmd.strip() if "@" in cmd: cmd = cmd.split("@")[0].strip() return cmd.lower().strip() def command_definition(display: str, description: str, aliases: Optional[List[str]] = None) -> dict: aliases = aliases or [] return { "display": display, "description": description, "aliases": list(set([display] + aliases)), } COMMAND_DEFINITIONS = { "sign": command_definition( "/sign", "Daily check-in, available once per day to earn points", aliases=["sign", "/tanda"], ), "daily_rank": command_definition( "/daily_rank", "View today's message activity ranking", aliases=["rankings", "/kedudukan_harian"], ), "my_invites": command_definition( "/my_invites", "View the number of people you have invited", aliases=["invitation", "/daily_position"], ), "points": command_definition( "/points", "View your current points balance", aliases=["/balance", "/积分", "points"], ), "btc": command_definition( "/btc", "Get the Bitcoin price menu", aliases=["coin price"], ), "help": command_definition( "/help", "Show all available commands and descriptions", aliases=["help"], ), } COMMAND_ORDER = ["sign", "daily_rank", "my_invites", "points", "btc", "help"] def build_alias_map() -> dict: alias_map = {} for key, info in COMMAND_DEFINITIONS.items(): for alias in info["aliases"]: alias_map[normalize_command(alias)] = key alias_map[normalize_command(key)] = key return alias_map COMMAND_ALIAS_MAP = build_alias_map() def resolve_command(cmd: str) -> Optional[str]: return COMMAND_ALIAS_MAP.get(normalize_command(cmd)) def command_payload(command_key: str) -> bytes: return COMMAND_DEFINITIONS[command_key]["display"].encode() main_buttons = [ [Button.inline("Sign in", command_payload("sign")), Button.inline("Today's Top Speakers", command_payload("daily_rank"))], [Button.inline("My invitation", command_payload("my_invites")), Button.inline("coin price", command_payload("btc"))], [Button.inline("My Points", command_payload("points")), Button.inline("Assistance", command_payload("help"))], ] # ============================ def get_primary_command_display(command_key: str) -> str: return COMMAND_DEFINITIONS[command_key]["display"] # ---------- 全局并发控制 ---------- # 使用 asyncio.Lock 在同一时间避免大量数据库并发写入(与 to_thread 配合) DB_LOCK = asyncio.Lock() # ---------- SQLite 同步底层实现(运行在线程池) ---------- def _get_conn(): # 每次操作创建连接,避免 sqlite 的线程/并发问题 # 不设置 check_same_thread:因为连接会在同一线程(to_thread)中被使用 return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) def _init_db_sync(): conn = _get_conn() cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY, username TEXT, points INTEGER DEFAULT 0, last_sign_date TEXT, total_invites INTEGER DEFAULT 0 ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS daily_messages ( user_id INTEGER, date TEXT, count INTEGER DEFAULT 0, PRIMARY KEY ( user_id, date ) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS invites ( inviter_id INTEGER, invitee_id INTEGER, created_at TEXT, PRIMARY KEY ( inviter_id, invitee_id ) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS points_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, points INTEGER, reason TEXT, created_at TEXT ) ''') conn.commit() conn.close() def _add_or_update_user_sync(user_id: int, username: str): conn = _get_conn() cursor = conn.cursor() cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username)) cursor.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id)) conn.commit() conn.close() def _get_last_sign_date_sync(user_id: int) -> Optional[str]: conn = _get_conn() cursor = conn.cursor() cursor.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,)) row = cursor.fetchone() conn.close() return row[0] if row else None def _add_points_immediate_sync(user_id: int, points: int, reason: str = ""): now = datetime.datetime.now(LOCAL_TZ).isoformat() conn = _get_conn() cursor = conn.cursor() # 如果用户可能不存在,先插入默认记录(避免 update 失败) cursor.execute("INSERT OR IGNORE INTO users (user_id, username, points) VALUES (?, ?, 0)", (user_id, None)) cursor.execute("UPDATE users SET points = COALESCE(points,0) + ? WHERE user_id = ?", (points, user_id)) cursor.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)", (user_id, points, reason, now)) conn.commit() conn.close() def _get_points_sync(user_id: int) -> int: conn = _get_conn() cursor = conn.cursor() cursor.execute("SELECT points FROM users WHERE user_id = ?", (user_id,)) row = cursor.fetchone() conn.close() return row[0] if row else 0 def _set_last_sign_sync(user_id: int): today = datetime.datetime.now(LOCAL_TZ).date().isoformat() conn = _get_conn() cursor = conn.cursor() cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, None)) cursor.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id)) conn.commit() conn.close() def _insert_invite_sync(inviter_id: int, invitee_id: int) -> bool: """ 插入邀请记录,返回 True 表示插入成功(即第一次邀请) 如果已存在则抛出 sqlite3.IntegrityError 或返回 False """ now = datetime.datetime.now(LOCAL_TZ).isoformat() conn = _get_conn() cursor = conn.cursor() try: cursor.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)", (inviter_id, invitee_id, now)) # 更新邀请计数 cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (inviter_id, None)) cursor.execute("UPDATE users SET total_invites = COALESCE(total_invites,0) + 1 WHERE user_id = ?", (inviter_id,)) conn.commit() return True except sqlite3.IntegrityError: # 已存在 conn.rollback() return False finally: conn.close() def _get_invite_count_sync(user_id: int) -> int: conn = _get_conn() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,)) count = cursor.fetchone()[0] conn.close() return count def _add_message_count_sync(user_id: int): today = datetime.datetime.now(LOCAL_TZ).date().isoformat() conn = _get_conn() cursor = conn.cursor() cursor.execute(''' INSERT INTO daily_messages (user_id, date, count) VALUES (?, ?, 1) ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1 ''', (user_id, today)) conn.commit() conn.close() def _get_daily_message_ranking_sync(date_iso: Optional[str] = None, limit: Optional[int] = None) -> List[ Tuple[int, Optional[str], int]]: date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() conn = _get_conn() cursor = conn.cursor() query = ''' SELECT u.user_id, u.username, d.count FROM daily_messages d JOIN users u ON d.user_id = u.user_id WHERE d.date = ? ORDER BY d.count DESC \ ''' if limit: query += f" LIMIT {limit}" cursor.execute(query, (date_iso,)) rows = cursor.fetchall() conn.close() return rows def _reset_daily_messages_sync(date_iso: Optional[str] = None): date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() conn = _get_conn() cursor = conn.cursor() cursor.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,)) conn.commit() conn.close() # ---------- 异步包装(供 bot 使用) ---------- async def init_db(): # 初始化 DB(放在 to_thread 中) await asyncio.to_thread(_init_db_sync) async def add_or_update_user(user_id: int, username: Optional[str]): username = username or None # use lock to avoid many threads trying to create the file at once async with DB_LOCK: await asyncio.to_thread(_add_or_update_user_sync, user_id, username) async def can_sign(user_id: int) -> bool: async with DB_LOCK: last = await asyncio.to_thread(_get_last_sign_date_sync, user_id) today = datetime.datetime.now(LOCAL_TZ).date().isoformat() return False if last and last == today else True async def add_points_immediate(user_id: int, points: int, reason: str = ""): async with DB_LOCK: await asyncio.to_thread(_add_points_immediate_sync, user_id, points, reason) async def get_points(user_id: int) -> int: async with DB_LOCK: return await asyncio.to_thread(_get_points_sync, user_id) async def set_last_sign(user_id: int): async with DB_LOCK: await asyncio.to_thread(_set_last_sign_sync, user_id) async def inc_invite(inviter_id: int, invitee_id: int): """Record invite relationships and reward the inviter (only first invite).""" if not inviter_id or not invitee_id: print(f"[invite] Invalid inviter or invitee id: inviter={inviter_id}, invitee={invitee_id}") return False if inviter_id == invitee_id: print(f"[invite] Cannot invite yourself: user_id={inviter_id}") return False # 使用锁,保证并发情况下不会重复奖励 async with DB_LOCK: inserted = await asyncio.to_thread(_insert_invite_sync, inviter_id, invitee_id) if inserted: # 插入成功后再给积分(不在 DB_LOCK 内执行,将奖励记录写进 DB) try: await add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward") print( f"[invite] Invite recorded and points rewarded: inviter={inviter_id}, invitee={invitee_id}, points={INVITE_POINTS}") return True except Exception as e: print(f"[invite] Failed to reward inviter={inviter_id}: {e}") return False else: print(f"[invite] Invite already exists (duplicate): inviter={inviter_id}, invitee={invitee_id}") return False async def get_invite_count(user_id: int) -> int: async with DB_LOCK: return await asyncio.to_thread(_get_invite_count_sync, user_id) async def add_message_count(user_id: int): async with DB_LOCK: await asyncio.to_thread(_add_message_count_sync, user_id) async def get_daily_message_ranking(date_iso: Optional[str] = None, limit: Optional[int] = None): async with DB_LOCK: return await asyncio.to_thread(_get_daily_message_ranking_sync, date_iso, limit) async def reset_daily_messages(date_iso: Optional[str] = None): async with DB_LOCK: await asyncio.to_thread(_reset_daily_messages_sync, date_iso) # ---------- 币价获取(阻塞 requests -> 放到线程池) ---------- def _get_price_sync(symbol: str) -> Tuple[float, float, float, float]: headers = {'user-agent': 'Mozilla/5.0'} tz = LOCAL_TZ today = datetime.datetime.now(tz).date() start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz) end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz) params = { 'symbol': symbol, 'period': '30min', 'start': int(start_of_day.timestamp()), 'end': int(end_of_day.timestamp()), } try: response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers, timeout=10) data = response.json().get("result", {}).get("data", []) if not data: return 0.0, 0.0, 0.0, 0.0 current_price = float(data[0]['close']) today_high = max(float(i['high']) for i in data) today_low = min(float(i['low']) for i in data) price_24h_ago = float(data[-1]['open']) change_24h = (current_price - price_24h_ago) / price_24h_ago * 100 if price_24h_ago != 0 else 0.0 return current_price, change_24h, today_high, today_low except Exception as e: print("[price] Failed to fetch price data:", e) return 0.0, 0.0, 0.0, 0.0 async def get_price(symbol: str): return await asyncio.to_thread(_get_price_sync, symbol) def get_price_msg_from_values(name: str, current_price: float, change_24h: float, today_high: float, today_low: float, url: str): msg = ( f"📊 {name}/USDT Real-time Quotes\n\n" f"💰 Current Price:{current_price:.4f} USDT\n" f"📈 24-hour price change:{change_24h:.2f}%\n" f"⬆️ Highest Price:{today_high:.4f} USDT\n" f"⬇️ Lowest Price:{today_low:.4f} USDT\n\n" f"🔗 Transaction Link:{url}\n" f"🎁 Registration Invitation:{INVITE_LINK}\n\n" f"(This message is automatically updated hourly.)" ) return msg async def get_price_msg(symbol: str): current_price, change_24h, today_high, today_low = await get_price(symbol) name = symbol.split('-')[0] url = crypto_currencies.get(name, {}).get("url", "#") return get_price_msg_from_values(name, current_price, change_24h, today_high, today_low, url) def get_crypto_buttons(): buttons = [] row = [] for name in crypto_currencies.keys(): row.append(Button.inline(name, f"price_{name}".encode())) if len(row) == 3: buttons.append(row) row = [] if row: buttons.append(row) buttons.append([Button.inline("Return to Menu", command_payload("help"))]) return buttons # ---------- 命令辅助 ---------- async def get_sender_identity(event) -> Optional[Tuple[int, str]]: user = await event.get_sender() if user is None: return None username = user.username or user.first_name or "Unknown User" await add_or_update_user(user.id, username) return user.id, username async def build_sign_message(user_id: int, username: str) -> str: if not await can_sign(user_id): return f"🌞 @{username}, you have already checked in today!" await add_points_immediate(user_id, SIGN_POINTS, reason="sign") await set_last_sign(user_id) total = await get_points(user_id) return ( f"✅ @{username} check-in succeed! Earned {SIGN_POINTS} points.\n" f"Current total points: {total}\n\n" f"100 points can be exchanged for a 5 (USDT) $ trial bonus." ) async def build_daily_rank_message() -> str: ranking = await get_daily_message_ranking() if not ranking: return "📊 No one has spoken yet today." lines = [] for idx, (uid, uname, cnt) in enumerate(ranking, start=1): mention = f"@{uname}" if uname else str(uid) lines.append(f"{idx}. {mention}: {cnt} messages") return "📊 Today's speaker leaderboard:\n" + "\n".join(lines) async def build_invite_message(user_id: int, username: str) -> str: count = await get_invite_count(user_id) total_pts = await get_points(user_id) return f"👥 @{username}, you have invited {count} users.\nCurrent points: {total_pts}" async def build_points_message(user_id: int, username: str) -> str: total_pts = await get_points(user_id) invite_count = await get_invite_count(user_id) return ( f"💰 @{username} Points Overview\n\n" f"📊 Current points: {total_pts}\n" f"👥 Invited users: {invite_count}\n\n" f"💡 100 points can be exchanged for a 5 (USDT) $ trial bonus." ) def get_command_list_text() -> str: msg = "🤖 Robot Command List:\n\n" for key in COMMAND_ORDER: info = COMMAND_DEFINITIONS[key] msg += f"`{info['display']}` → {info['description']}\n" msg += ( f"\nSign-in Reward: {SIGN_POINTS} points each time, and 100 points can be exchanged for a 5 (USDT) $ trial bonus.\n" f"Chat Reward: The top {DAILY_SPEAK_TOP_N} users in daily chat activity each receive {DAILY_SPEAK_REWARD} points.\n" f"Invitation Reward: Each invited user grants {INVITE_POINTS} points.\n" f"Daily Settlement Time: Every day at 12:00 PM (America/New_York timezone)." ) return msg async def send_help(event): await event.reply(get_command_list_text(), buttons=main_buttons, parse_mode="md") # ---------- 事件处理(异步) ---------- async def handle_command(event, cmd: str, canonical_cmd: Optional[str] = None): identity = await get_sender_identity(event) if not identity: return user_id, username = identity command_key = canonical_cmd or resolve_command(cmd) if not command_key: await send_help(event) return if command_key == "sign": reply = await build_sign_message(user_id, username) elif command_key == "daily_rank": reply = await build_daily_rank_message() elif command_key == "my_invites": reply = await build_invite_message(user_id, username) elif command_key == "points": reply = await build_points_message(user_id, username) elif command_key == "btc": await event.respond("Please select the currency you wish to view:", buttons=get_crypto_buttons()) return elif command_key == "help": await send_help(event) return else: await send_help(event) return await event.reply(reply) async def handle_price_command(event, symbol_name: str): symbol = crypto_currencies[symbol_name]["type"] msg = await get_price_msg(symbol) try: # CallbackQuery 用 edit 更友好 await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False) except Exception: await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False) # ---------- 定时任务:每小时推送随机币价 ---------- async def send_price_periodically(bot, chat_id): await asyncio.sleep(5) # 启动缓冲 while True: try: random_key = random.choice(list(crypto_currencies.keys())) msg = await get_price_msg(crypto_currencies[random_key]["type"]) await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False) except Exception as e: print(f"[price] Failed to send price update: {e}") # 等待下一小时整点(保持每小时推送一次) now = datetime.datetime.now(LOCAL_TZ) next_hour = (now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=2)) wait_seconds = (next_hour - now).total_seconds() await asyncio.sleep(wait_seconds) # ---------- 定时任务:每天结算(每天 12:00 America/New_York) ---------- async def daily_settlement_task(bot): """ 每天在 LOCAL_TZ 12:00 执行: - 计算当日发言排名,给前 N 名每人奖励 - 记录日志并清空当天的发言统计 """ while True: now = datetime.datetime.now(LOCAL_TZ) # 目标时间:今天 12:00(如果已过则到明天) target = now.replace(hour=12, minute=0, second=0, microsecond=0) if now >= target: target = target + datetime.timedelta(days=1) wait_seconds = (target - now).total_seconds() print(f"[settlement] Next settlement (12:00) at {target.isoformat()}, waiting {int(wait_seconds)} s") await asyncio.sleep(wait_seconds) # 执行结算(结算 target 的前一天) date_iso = (target - datetime.timedelta(days=1)).date().isoformat() print(f"[settlement] Executing settlement, target date: {date_iso}") ranking = await get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N) if ranking: for i, (uid, uname, cnt) in enumerate(ranking): try: await add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i + 1}") except Exception as e: print(f"[settlement] For users {uid} Failed to issue rewards: {e}") # 给管理员或群组发送结算通知(可选) try: summary_lines = [f"📅 {date_iso} speaker ranking settlement results:"] for i, (uid, uname, cnt) in enumerate(ranking): display = f"@{uname}" if uname else str(uid) summary_lines.append(f"{i + 1}. {display} — {cnt} messages — awarded {DAILY_SPEAK_REWARD} points") summary = "\n".join(summary_lines) for gid in ALLOWED_GROUPS: await bot.send_message(gid, summary) except Exception as e: print("[settlement] Failed to send settlement notification:", e) else: print("[settlement] No speaking records for the day; skip speaking rewards.") # 清空那天的统计 try: await reset_daily_messages(date_iso=date_iso) print(f"[settlement] Cleared daily messages for {date_iso}") except Exception as e: print("[settlement] Failed to clear daily_messages:", e) # ---------- 主逻辑 ---------- async def main(): await init_db() # 使用代理时传 proxy=PROXY,若不需要代理则移除该参数 # Telethon proxy 格式可能和你的代理不同,请按需调整(或移除) bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY) await bot.start(bot_token=BOT_TOKEN) # 设置 Bot Commands(让用户可以通过命令菜单选择命令,避免自动添加 @botname) try: commands = [ BotCommand( command=COMMAND_DEFINITIONS[key]["display"].lstrip("/"), description=COMMAND_DEFINITIONS[key]["description"], ) for key in COMMAND_ORDER ] await bot( SetBotCommandsRequest( scope=BotCommandScopeDefault(), lang_code="en", commands=commands, ) ) print("[bot] Bot commands set successfully") except Exception as e: print(f"[bot] Failed to set bot commands: {e}") # 启动每小时发送币价任务(对每个白名单群) for gid in ALLOWED_GROUPS: asyncio.create_task(send_price_periodically(bot, gid)) # 启动每日结算任务(12:00 America/New_York) asyncio.create_task(daily_settlement_task(bot)) @bot.on(events.ChatAction) async def welcome(event: events.ChatAction.Event): # 欢迎新成员并尝试记录邀请者(若存在) if not (event.user_added or event.user_joined): return # 尝试获取 inviter id(兼容多种 Telethon 结构) inviter_id = None try: # 优先 actor_id(某些 Telethon 版本和场景) if hasattr(event, "actor_id") and event.actor_id: inviter_id = event.actor_id # 再尝试 action_message.from_id(另一些版本) elif hasattr(event, "action_message") and event.action_message: maybe = getattr(event.action_message, "from_id", None) if isinstance(maybe, types.PeerUser): inviter_id = maybe.user_id elif isinstance(maybe, int): inviter_id = maybe # 在某些情形下 from_id 是 a User 或 MessageEntity,处理兼容性 elif hasattr(maybe, "user_id"): inviter_id = getattr(maybe, "user_id", None) except Exception as e: print(f"[welcome] Failed to obtain inviter id: {e}") inviter_id = None # 对 event.users 中的新成员逐个处理 for user in event.users: try: new_uid = user.id new_name = user.username or user.first_name or "New member" # 保证用户存在于 users 表中 await add_or_update_user(new_uid, new_name) # 如果找到邀请者并且不是自己,则记录邀请 if inviter_id and inviter_id != new_uid: # 确保 inviter 在 users 表中(用户名未知时填 None) await add_or_update_user(inviter_id, None) # 调用 inc_invite 并检查返回值 success = await inc_invite(inviter_id, new_uid) if success: print(f"[welcome] Invite processed: {inviter_id} invited {new_uid}") else: print(f"[welcome] Invite already handled or failed: {inviter_id} -> {new_uid}") elif not inviter_id: print(f"[welcome] Inviter not found, new member {new_uid} likely joined independently") else: print(f"[welcome] Skipping self-invite: {new_uid}") # 欢迎消息 try: await event.reply(f"Welcome @{new_name} to the group!\n{get_command_list_text()}") except Exception: # 某些场景 event.reply 不允许(例如系统消息),忽略 pass except Exception as e: print(f"[welcome] Failed to handle new member: user_id={user.id if user else 'unknown'}, error={e}") @bot.on(events.NewMessage) async def on_message(event): if event.chat_id not in ALLOWED_GROUPS: return text = (event.raw_text or "").strip() if not text: return command_key = resolve_command(text) if command_key: if "@" in text: print(f"[command] Raw: {text} -> Resolved: {get_primary_command_display(command_key)}") await handle_command(event, text, canonical_cmd=command_key) return identity = await get_sender_identity(event) if identity: await add_message_count(identity[0]) @bot.on(events.CallbackQuery) async def callback(event): # 快速响应 UI try: await event.answer("Currently being processed...", alert=False) except Exception: pass cmd = event.data.decode() if event.data else "" normalized_cmd = normalize_command(cmd) if normalized_cmd.startswith("price_"): symbol = normalized_cmd.split("_", 1)[1] await handle_price_command(event, symbol) else: command_key = resolve_command(cmd) if command_key and "@" in cmd: print(f"[callback] Raw: {cmd} -> Resolved: {get_primary_command_display(command_key)}") await handle_command(event, cmd, canonical_cmd=command_key) print("🤖 Bot is running and waiting for group messages...") await bot.run_until_disconnected() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("Bot stopped")