"""Telegram group bot: daily check-in, invite rewards, chat ranking, coin prices.

The bot keeps its state in a local SQLite file.  All blocking work (SQLite,
HTTP) runs in the default thread pool through ``asyncio.to_thread`` and every
database access is serialized with ``DB_LOCK``.
"""
import asyncio
import contextlib
import datetime
import os
import random
import sqlite3
from typing import List, Optional, Tuple
from zoneinfo import ZoneInfo

import requests
from telethon import Button, TelegramClient, events, types

# ========== Configuration ==========
# NOTE(review): these credentials were committed in source.  They remain as
# defaults so existing deployments keep working, but they should be rotated
# and supplied through the environment variables read below.
API_ID = int(os.environ.get("API_ID", "2040"))
API_HASH = os.environ.get("API_HASH", "b18441a1ff607e10a989891a5462e627")
BOT_TOKEN = os.environ.get("BOT_TOKEN", "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y")
DB_PATH = "sign.db"

# Points configuration
SIGN_POINTS = 10
INVITE_POINTS = 10
DAILY_SPEAK_TOP_N = 10
DAILY_SPEAK_REWARD = 10

# Groups the bot is allowed to operate in (events from other chats are ignored)
ALLOWED_GROUPS = [-1003238845008]

# Time zone used for "today" and for the settlement schedule
LOCAL_TZ = ZoneInfo("America/New_York")

# Proxy (optional)
# NOTE(review): proxy credentials are hard-coded as well; consider moving them
# out of source control.
PROXY = {
    'proxy_type': "socks5",
    'addr': "202.155.144.102",
    'port': 31102,
    'username': "SyNuejCtrQ",
    'password': "MH8ioL7EXf"
}

INVITE_LINK = "https://www.websea.my/en/signup?key=77346588"

crypto_currencies = {
    "BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"},
    "ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"},
    "SOL": {"type": "SOL-USDT", "url": "https://www.websea.com/zh-CN/trade/SOL-USDT"},
    "BNB": {"type": "BNB-USDT", "url": "https://www.websea.com/zh-CN/trade/BNB-USDT"},
    "WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"}
}

main_buttons = [
    [Button.inline("Sign in", b"/sign"), Button.inline("Today's Top Speakers", b"/daily_rank")],
    [Button.inline("My invitation", b"/my_invites"), Button.inline("coin price", b"/btc")],
    [Button.inline("Assistance", b"/help")]
]

COMMANDS_TEXT = {
    '/sign': 'Daily check-in, available once per day to earn points',
    '/daily_rank': 'View today\'s message activity ranking',
    '/my_invites': 'View the number of people you have invited',
    '/btc': 'Get the Bitcoin price menu',
    '/help': 'Show all available commands and descriptions'
}

# Every text a user may type in the group (lower-cased) mapped to the canonical
# command understood by handle_command().  Keeping one table avoids the case
# where an alias is recognised as a command but then not handled.
COMMAND_ALIASES = {
    "/sign": "/sign",
    "签到": "/sign",
    "/tanda": "/sign",
    "/daily_rank": "/daily_rank",
    "发言": "/daily_rank",
    "/kedudukan_harian": "/daily_rank",
    "/my_invites": "/my_invites",
    "邀请": "/my_invites",
    "/daily_position": "/my_invites",
    "/btc": "/btc",
    "币价": "/btc",
    "/help": "/help",
}
# ============================

# ---------- Global concurrency control ----------
# A single asyncio.Lock serializes database access issued through to_thread.
DB_LOCK = asyncio.Lock()


# ---------- Synchronous SQLite layer (runs in the thread pool) ----------
def _get_conn():
    """Open a new SQLite connection to DB_PATH.

    A connection is created per operation and used entirely inside one worker
    thread, so ``check_same_thread`` is left at its default.
    """
    return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)


@contextlib.contextmanager
def _db():
    """Yield a connection; commit on success, roll back on error, always close."""
    conn = _get_conn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _init_db_sync():
    """Create the tables used by the bot if they do not exist yet."""
    with _db() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                user_id INTEGER PRIMARY KEY,
                username TEXT,
                points INTEGER DEFAULT 0,
                last_sign_date TEXT,
                total_invites INTEGER DEFAULT 0
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS daily_messages (
                user_id INTEGER,
                date TEXT,
                count INTEGER DEFAULT 0,
                PRIMARY KEY ( user_id, date )
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS invites (
                inviter_id INTEGER,
                invitee_id INTEGER,
                created_at TEXT,
                PRIMARY KEY ( inviter_id, invitee_id )
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS points_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                points INTEGER,
                reason TEXT,
                created_at TEXT
            )
        ''')


def _add_or_update_user_sync(user_id: int, username: Optional[str]):
    """Ensure a users row exists and refresh its username.

    When ``username`` is None the stored username is left untouched, so that
    registering a user whose name is unknown does not erase a known one.
    """
    with _db() as conn:
        conn.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
        if username is not None:
            conn.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id))


def _get_last_sign_date_sync(user_id: int) -> Optional[str]:
    """Return the stored last_sign_date (ISO date string) or None."""
    with _db() as conn:
        row = conn.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,)).fetchone()
    return row[0] if row else None


def _add_points_immediate_sync(user_id: int, points: int, reason: str = ""):
    """Add ``points`` to the user's balance and append a points_log entry."""
    now = datetime.datetime.now(LOCAL_TZ).isoformat()
    with _db() as conn:
        # The user may not exist yet; insert a default row so the UPDATE hits it.
        conn.execute("INSERT OR IGNORE INTO users (user_id, username, points) VALUES (?, ?, 0)", (user_id, None))
        conn.execute("UPDATE users SET points = COALESCE(points,0) + ? WHERE user_id = ?", (points, user_id))
        conn.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)",
                     (user_id, points, reason, now))


def _get_points_sync(user_id: int) -> int:
    """Return the user's point balance, or 0 when the user is unknown."""
    with _db() as conn:
        row = conn.execute("SELECT points FROM users WHERE user_id = ?", (user_id,)).fetchone()
    return row[0] if row else 0


def _set_last_sign_sync(user_id: int):
    """Record today (LOCAL_TZ) as the user's last sign-in date."""
    today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
    with _db() as conn:
        conn.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, None))
        conn.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id))


def _try_sign_sync(user_id: int, points: int, reason: str = "sign") -> Optional[int]:
    """Perform today's check-in atomically.

    The eligibility test, the point award and the last_sign_date update happen
    in one transaction, so two concurrent check-ins cannot both be rewarded.

    Returns the new point total, or None when the user already signed in today.
    """
    now = datetime.datetime.now(LOCAL_TZ)
    today = now.date().isoformat()
    with _db() as conn:
        conn.execute("INSERT OR IGNORE INTO users (user_id, username, points) VALUES (?, ?, 0)", (user_id, None))
        cur = conn.execute(
            "UPDATE users SET points = COALESCE(points,0) + ?, last_sign_date = ? "
            "WHERE user_id = ? AND (last_sign_date IS NULL OR last_sign_date <> ?)",
            (points, today, user_id, today))
        if cur.rowcount == 0:
            # No row changed: last_sign_date is already today.
            return None
        conn.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)",
                     (user_id, points, reason, now.isoformat()))
        row = conn.execute("SELECT points FROM users WHERE user_id = ?", (user_id,)).fetchone()
        return row[0] if row else 0


def _insert_invite_sync(inviter_id: int, invitee_id: int) -> bool:
    """Insert an invite record and bump the inviter's total_invites.

    Returns True when the record was inserted (first invite of this pair) and
    False when the (inviter, invitee) pair already exists.
    """
    now = datetime.datetime.now(LOCAL_TZ).isoformat()
    try:
        with _db() as conn:
            conn.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)",
                         (inviter_id, invitee_id, now))
            # Update the invite counter
            conn.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (inviter_id, None))
            conn.execute("UPDATE users SET total_invites = COALESCE(total_invites,0) + 1 WHERE user_id = ?",
                         (inviter_id,))
        return True
    except sqlite3.IntegrityError:
        # Primary-key conflict: the pair already exists (rolled back by _db()).
        return False


def _get_invite_count_sync(user_id: int) -> int:
    """Return how many invite records list ``user_id`` as the inviter."""
    with _db() as conn:
        return conn.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,)).fetchone()[0]


def _add_message_count_sync(user_id: int):
    """Increment today's (LOCAL_TZ) message counter for the user."""
    today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
    with _db() as conn:
        conn.execute('''
            INSERT INTO daily_messages (user_id, date, count)
            VALUES (?, ?, 1)
            ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1
        ''', (user_id, today))


def _get_daily_message_ranking_sync(date_iso: Optional[str] = None, limit: Optional[int] = None) -> List[
        Tuple[int, Optional[str], int]]:
    """Return (user_id, username, count) rows for a date, most active first.

    ``date_iso`` defaults to today in LOCAL_TZ; a falsy ``limit`` means no limit.
    """
    date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat()
    query = '''
        SELECT u.user_id, u.username, d.count
        FROM daily_messages d
        JOIN users u ON d.user_id = u.user_id
        WHERE d.date = ?
        ORDER BY d.count DESC
    '''
    params: list = [date_iso]
    if limit:
        # Bind the limit instead of formatting it into the SQL text.
        query += " LIMIT ?"
        params.append(int(limit))
    with _db() as conn:
        return conn.execute(query, params).fetchall()


def _reset_daily_messages_sync(date_iso: Optional[str] = None):
    """Delete the message counters of a date (default: today in LOCAL_TZ)."""
    date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat()
    with _db() as conn:
        conn.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,))


# ---------- Async wrappers (used by the bot) ----------
async def init_db():
    """Initialise the database in a worker thread."""
    await asyncio.to_thread(_init_db_sync)


async def add_or_update_user(user_id: int, username: Optional[str]):
    """Ensure the user exists; refresh the username when one is given."""
    username = username or None
    async with DB_LOCK:
        await asyncio.to_thread(_add_or_update_user_sync, user_id, username)


async def can_sign(user_id: int) -> bool:
    """Return True unless the user's last sign-in date is today (LOCAL_TZ)."""
    async with DB_LOCK:
        last = await asyncio.to_thread(_get_last_sign_date_sync, user_id)
    today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
    return not (last and last == today)


async def try_sign(user_id: int, points: int = SIGN_POINTS) -> Optional[int]:
    """Atomically check in; return the new total, or None if already done today."""
    async with DB_LOCK:
        return await asyncio.to_thread(_try_sign_sync, user_id, points)


async def add_points_immediate(user_id: int, points: int, reason: str = ""):
    """Add points to a user and log the reason."""
    async with DB_LOCK:
        await asyncio.to_thread(_add_points_immediate_sync, user_id, points, reason)


async def get_points(user_id: int) -> int:
    """Return the user's point balance."""
    async with DB_LOCK:
        return await asyncio.to_thread(_get_points_sync, user_id)


async def set_last_sign(user_id: int):
    """Mark the user as having signed in today."""
    async with DB_LOCK:
        await asyncio.to_thread(_set_last_sign_sync, user_id)


async def inc_invite(inviter_id: int, invitee_id: int):
    """Record an invite relation and reward the inviter (first record only).

    Returns True when the invite was newly recorded and the reward was
    written, False otherwise.
    """
    if not inviter_id or not invitee_id:
        print(f"[invite] 无效的邀请者或受邀者ID: inviter={inviter_id}, invitee={invitee_id}")
        return False

    if inviter_id == invitee_id:
        print(f"[invite] 不能邀请自己: user_id={inviter_id}")
        return False

    # The lock guarantees a pair is inserted (and therefore rewarded) once.
    async with DB_LOCK:
        inserted = await asyncio.to_thread(_insert_invite_sync, inviter_id, invitee_id)

    if not inserted:
        print(f"[invite] 邀请记录已存在(重复邀请): inviter={inviter_id}, invitee={invitee_id}")
        return False

    # Award the points after releasing DB_LOCK (add_points_immediate takes it).
    try:
        await add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward")
        print(
            f"[invite] 成功记录邀请并奖励积分: inviter={inviter_id}, invitee={invitee_id}, points={INVITE_POINTS}")
        return True
    except Exception as e:
        print(f"[invite] 奖励积分失败: inviter={inviter_id}, error={e}")
        return False


async def get_invite_count(user_id: int) -> int:
    """Return the number of users invited by ``user_id``."""
    async with DB_LOCK:
        return await asyncio.to_thread(_get_invite_count_sync, user_id)


async def add_message_count(user_id: int):
    """Increment the user's message counter for today."""
    async with DB_LOCK:
        await asyncio.to_thread(_add_message_count_sync, user_id)


async def get_daily_message_ranking(date_iso: Optional[str] = None, limit: Optional[int] = None):
    """Return the message ranking for a date (see the sync implementation)."""
    async with DB_LOCK:
        return await asyncio.to_thread(_get_daily_message_ranking_sync, date_iso, limit)


async def reset_daily_messages(date_iso: Optional[str] = None):
    """Delete the message counters of a date."""
    async with DB_LOCK:
        await asyncio.to_thread(_reset_daily_messages_sync, date_iso)


# ---------- Price lookup (blocking requests call, run in the thread pool) ----------
def _get_price_sync(symbol: str) -> Tuple[float, float, float, float]:
    """Fetch today's 30-minute klines and derive price figures.

    Returns (current_price, change_percent, high, low); all zeros when the
    request fails or returns no data.
    """
    headers = {'user-agent': 'Mozilla/5.0'}
    tz = LOCAL_TZ
    today = datetime.datetime.now(tz).date()
    start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz)
    end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz)
    params = {
        'symbol': symbol,
        'period': '30min',
        'start': int(start_of_day.timestamp()),
        'end': int(end_of_day.timestamp()),
    }
    try:
        response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers,
                                timeout=10)
        data = response.json().get("result", {}).get("data", [])
        if not data:
            return 0.0, 0.0, 0.0, 0.0
        # NOTE(review): assumes data[0] is the newest candle and data[-1] the
        # oldest one of the day — confirm against the API's ordering.
        current_price = float(data[0]['close'])
        today_high = max(float(i['high']) for i in data)
        today_low = min(float(i['low']) for i in data)
        price_24h_ago = float(data[-1]['open'])
        change_24h = (current_price - price_24h_ago) / price_24h_ago * 100 if price_24h_ago != 0 else 0.0
        return current_price, change_24h, today_high, today_low
    except Exception as e:
        print("获取币价失败:", e)
        return 0.0, 0.0, 0.0, 0.0


async def get_price(symbol: str):
    """Async wrapper around _get_price_sync."""
    return await asyncio.to_thread(_get_price_sync, symbol)


def get_price_msg_from_values(name: str, current_price: float, change_24h: float, today_high: float,
                              today_low: float, url: str):
    """Format the quote message shown to users."""
    msg = (
        f"📊 {name}/USDT Real-time Quotes\n\n"
        f"💰 Current Price:{current_price:.4f} USDT\n"
        f"📈 24-hour price change:{change_24h:.2f}%\n"
        f"⬆️ Highest Price:{today_high:.4f} USDT\n"
        f"⬇️ Lowest Price:{today_low:.4f} USDT\n\n"
        f"🔗 Transaction Link:{url}\n"
        f"🎁 Registration Invitation:{INVITE_LINK}\n\n"
        f"(This message is automatically updated hourly.)"
    )
    return msg


async def get_price_msg(symbol: str):
    """Fetch the figures for ``symbol`` (e.g. "BTC-USDT") and format them."""
    current_price, change_24h, today_high, today_low = await get_price(symbol)
    name = symbol.split('-')[0]
    url = crypto_currencies.get(name, {}).get("url", "#")
    return get_price_msg_from_values(name, current_price, change_24h, today_high, today_low, url)


def get_crypto_buttons():
    """Build the inline keyboard: currencies three per row, then a return row."""
    buttons = []
    row = []
    for name in crypto_currencies:
        row.append(Button.inline(name, f"price_{name}".encode()))
        if len(row) == 3:
            buttons.append(row)
            row = []
    if row:
        buttons.append(row)
    buttons.append([Button.inline("Return to Menu", b"/help")])
    return buttons


# ---------- Command help text ----------
def get_command_list_text():
    """Return the help text listing every command and the reward rules."""
    lines = "".join(f"{cmd} → {desc}\n" for cmd, desc in COMMANDS_TEXT.items())
    msg = "🤖 Robot Command List:\n" + lines
    msg += f"\nSign-in Reward: {SIGN_POINTS} points each time, and 100 points can be redeemed for a 5 USDT futures fee voucher.\nChat Reward: The top {DAILY_SPEAK_TOP_N} users in daily chat activity each receive {DAILY_SPEAK_REWARD} points.\nInvitation Reward: Each invited user grants {INVITE_POINTS} points.\nDaily Settlement Time: Every day at 12:00 PM (America/New_York timezone)."
    return msg


# ---------- Event handling (async) ----------
async def handle_command(event, cmd: str):
    """Execute a command for the sender of ``event`` and reply with the result.

    ``cmd`` is a lower-cased command or alias; anything unrecognised shows the
    help text with the main menu.
    """
    user = await event.get_sender()
    if user is None:
        return
    user_id = user.id
    username = user.username or user.first_name or "未知用户"
    await add_or_update_user(user_id, username)

    reply = ""

    if cmd in ["/sign", "sign in", "/tanda"]:
        # Check + award + date update in a single DB operation, so that a
        # double tap cannot be rewarded twice.
        total = await try_sign(user_id)
        if total is None:
            reply = f"🌞 @{username},You've already signed in today.!"
        else:
            reply = f"✅ @{username} Check-in successful!obtain {SIGN_POINTS} Points\nCurrent total points:{total}\n\n100 points can be redeemed for a 5 USDT contract fee voucher.。"

    elif cmd in ["/daily_rank", "rankings", "/kedudukan_harian"]:
        ranking = await get_daily_message_ranking()
        if not ranking:
            reply = "📊 No one has posted yet today.~"
        else:
            reply_lines = []
            for pos, (uid, uname, cnt) in enumerate(ranking, start=1):
                display = f"@{uname}" if uname else str(uid)
                reply_lines.append(f"{pos}. {display}: {cnt} 条")
            reply = "📊 Today's Top Speakers:\n" + "\n".join(reply_lines)

    elif cmd in ["/my_invites", "invitation", "/daily_position"]:
        count = await get_invite_count(user_id)
        total_pts = await get_points(user_id)
        reply = f"👥 @{username},You have invited {count} person。\nCurrent points:{total_pts}"

    elif cmd in ["/btc"]:
        await event.respond("Please select the currency you wish to view:", buttons=get_crypto_buttons())
        return

    else:
        await event.reply(get_command_list_text(), buttons=main_buttons)
        return

    await event.reply(reply)


async def handle_price_command(event, symbol_name: str):
    """Show the quote for ``symbol_name`` (a key of crypto_currencies)."""
    info = crypto_currencies.get(symbol_name)
    if info is None:
        # Callback data comes from the client; ignore symbols we do not list.
        print(f"[price] unknown symbol in callback: {symbol_name!r}")
        return
    msg = await get_price_msg(info["type"])
    try:
        # Editing the message is friendlier for callback queries
        await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False)
    except Exception:
        await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False)


# ---------- Scheduled task: hourly random coin price ----------
async def send_price_periodically(bot, chat_id):
    """Post a random currency quote to ``chat_id`` at start-up and every full hour."""
    await asyncio.sleep(5)  # start-up grace period
    while True:
        try:
            random_key = random.choice(list(crypto_currencies.keys()))
            msg = await get_price_msg(crypto_currencies[random_key]["type"])
            await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False)
        except Exception as e:
            print(f"发送价格失败: {e}")
        # Sleep until the next full hour
        now = datetime.datetime.now(LOCAL_TZ)
        next_hour = (now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1))
        wait_seconds = (next_hour - now).total_seconds()
        await asyncio.sleep(wait_seconds)


# ---------- Scheduled task: daily settlement (12:00 America/New_York) ----------
def _next_settlement(now: datetime.datetime) -> Tuple[datetime.datetime, float]:
    """Return the next 12:00 wall-clock instant after ``now`` and the seconds until it.

    The delay is computed from POSIX timestamps: subtracting two aware
    datetimes that share a tzinfo ignores the UTC offset and would be an hour
    off across a daylight-saving change.
    """
    target = now.replace(hour=12, minute=0, second=0, microsecond=0)
    if now >= target:
        target = target + datetime.timedelta(days=1)
    wait_seconds = max(0.0, target.timestamp() - now.timestamp())
    return target, wait_seconds


async def daily_settlement_task(bot):
    """Run the daily settlement at 12:00 LOCAL_TZ, forever.

    Each run ranks the messages of the day before the settlement date, awards
    DAILY_SPEAK_REWARD points to the top DAILY_SPEAK_TOP_N users, posts a
    summary to every allowed group and clears that day's counters.
    """
    while True:
        now = datetime.datetime.now(LOCAL_TZ)
        # Target: today 12:00, or tomorrow 12:00 if that has already passed
        target, wait_seconds = _next_settlement(now)
        print(f"[settlement] Next settlement(12:00)in {target.isoformat()},Waiting {int(wait_seconds)} s")
        await asyncio.sleep(wait_seconds)

        # Settle the day before the target date
        date_iso = (target - datetime.timedelta(days=1)).date().isoformat()
        print(f"[settlement] Execute settlement, target date:{date_iso}")

        ranking = await get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N)
        if ranking:
            for pos, (uid, uname, cnt) in enumerate(ranking, start=1):
                try:
                    await add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{pos}")
                except Exception as e:
                    print(f"[settlement] For users {uid} Failed to issue rewards: {e}")
            # Post the settlement summary to the groups (best effort)
            try:
                summary_lines = [f"📅 {date_iso} Speaker Ranking Settlement Results:"]
                for pos, (uid, uname, cnt) in enumerate(ranking, start=1):
                    display = f"@{uname}" if uname else str(uid)
                    summary_lines.append(f"{pos}. {display} — {cnt} Article — obtain {DAILY_SPEAK_REWARD} Points")
                summary = "\n".join(summary_lines)
                for gid in ALLOWED_GROUPS:
                    await bot.send_message(gid, summary)
            except Exception as e:
                print("[settlement] Failed to send settlement notification:", e)
        else:
            print("[settlement] No speaking records for the day; skip speaking rewards.")

        # Clear that day's statistics
        try:
            await reset_daily_messages(date_iso=date_iso)
            print(f"[settlement] 清理完成:{date_iso}")
        except Exception as e:
            print("[settlement] 清理 daily_messages 失败:", e)


# ---------- Main logic ----------
async def main():
    """Initialise the database, start the bot and its background tasks."""
    await init_db()

    # Pass proxy=PROXY when a proxy is needed; remove the argument otherwise.
    bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
    await bot.start(bot_token=BOT_TOKEN)

    # Keep strong references: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected while running.
    background_tasks = []

    # Hourly price task, one per whitelisted group
    for gid in ALLOWED_GROUPS:
        background_tasks.append(asyncio.create_task(send_price_periodically(bot, gid)))

    # Daily settlement task (12:00 America/New_York)
    background_tasks.append(asyncio.create_task(daily_settlement_task(bot)))

    @bot.on(events.ChatAction)
    async def welcome(event: events.ChatAction.Event):
        """Welcome new members and record who invited them, when known."""
        # Only whitelisted groups may generate invite rewards.
        if event.chat_id not in ALLOWED_GROUPS:
            return

        if not (event.user_added or event.user_joined):
            return

        # Try to find the inviter id (the attribute differs between Telethon
        # versions and scenarios).
        inviter_id = None
        try:
            if hasattr(event, "actor_id") and event.actor_id:
                inviter_id = event.actor_id
            elif hasattr(event, "action_message") and event.action_message:
                maybe = getattr(event.action_message, "from_id", None)
                if isinstance(maybe, types.PeerUser):
                    inviter_id = maybe.user_id
                elif isinstance(maybe, int):
                    inviter_id = maybe
                elif hasattr(maybe, "user_id"):
                    inviter_id = getattr(maybe, "user_id", None)
        except Exception as e:
            print(f"[welcome] 获取邀请者ID时出错: {e}")
            inviter_id = None

        # Handle every new member carried by the event
        for user in event.users:
            try:
                new_uid = user.id
                new_name = user.username or user.first_name or "新成员"

                # Make sure the new member exists in the users table
                await add_or_update_user(new_uid, new_name)

                if inviter_id and inviter_id != new_uid:
                    # Make sure the inviter has a row; the name is unknown
                    # here, so an existing username is left untouched.
                    await add_or_update_user(inviter_id, None)

                    success = await inc_invite(inviter_id, new_uid)
                    if success:
                        print(f"[welcome] 成功处理邀请: {inviter_id} 邀请了 {new_uid}")
                    else:
                        print(f"[welcome] 邀请处理失败或已存在: {inviter_id} -> {new_uid}")
                elif not inviter_id:
                    print(f"[welcome] 未找到邀请者,新成员 {new_uid} 可能是自己加入的")
                else:
                    print(f"[welcome] 跳过自己邀请自己: {new_uid}")

                # Welcome message
                try:
                    await event.reply(f"欢迎 @{new_name} 加入群聊!\n{get_command_list_text()}")
                except Exception:
                    # Replying is not possible in some cases (e.g. service
                    # messages); the welcome is best effort.
                    pass
            except Exception as e:
                print(f"[welcome] 处理新成员异常: user_id={user.id if user else 'unknown'}, error={e}")

    @bot.on(events.NewMessage)
    async def on_message(event):
        """Dispatch typed commands; count every other message for the ranking."""
        # Only whitelisted groups are handled
        if event.chat_id not in ALLOWED_GROUPS:
            return

        text = (event.raw_text or "").strip()
        if not text:
            return

        # Strip a trailing @botname from commands
        if "@" in text and text.startswith("/"):
            text = text.split("@")[0]

        # Resolve aliases to the canonical command handle_command() knows
        command = COMMAND_ALIASES.get(text.lower())
        if command is not None:
            await handle_command(event, command)
            return

        # Not a command: count the message
        sender = await event.get_sender()
        if sender:
            await add_or_update_user(sender.id, sender.username or sender.first_name or "未知用户")
            await add_message_count(sender.id)

    @bot.on(events.CallbackQuery)
    async def callback(event):
        """Handle inline-button presses."""
        # Acknowledge quickly so the client stops showing the spinner
        try:
            await event.answer("Currently being processed...", alert=False)
        except Exception:
            pass

        cmd = event.data.decode() if event.data else ""
        if "@" in cmd and cmd.startswith("/"):
            cmd = cmd.split("@")[0]

        if cmd.startswith("price_"):
            symbol = cmd.split("_", 1)[1]
            await handle_price_command(event, symbol)
        else:
            await handle_command(event, cmd)

    print("🤖 机器人已启动,等待群聊消息...")
    try:
        await bot.run_until_disconnected()
    finally:
        for task in background_tasks:
            task.cancel()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("机器人已停止")