diff --git a/telegram/2349073562091.session b/telegram/2349073562091.session deleted file mode 100644 index e601da7..0000000 Binary files a/telegram/2349073562091.session and /dev/null differ diff --git a/telegram/bot_session.session b/telegram/bot_session.session index 04590ed..8b8f0ea 100644 Binary files a/telegram/bot_session.session and b/telegram/bot_session.session differ diff --git a/telegram/bot_session_23612.session b/telegram/bot_session_23612.session deleted file mode 100644 index 6b4eb94..0000000 Binary files a/telegram/bot_session_23612.session and /dev/null differ diff --git a/telegram/bot_session_35072.session b/telegram/bot_session_35072.session deleted file mode 100644 index cb3289a..0000000 Binary files a/telegram/bot_session_35072.session and /dev/null differ diff --git a/telegram/gpt.py b/telegram/gpt.py index 044a43e..85ccd61 100644 --- a/telegram/gpt.py +++ b/telegram/gpt.py @@ -1,22 +1,19 @@ -# bot.py -import random import asyncio import datetime import sqlite3 -from zoneinfo import ZoneInfo - +import random import requests +from zoneinfo import ZoneInfo +from typing import Optional, Tuple, List from telethon import TelegramClient, events, Button, types # ========== 配置区 ========== API_ID = 2040 API_HASH = "b18441a1ff607e10a989891a5462e627" BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" - -# 数据库文件 DB_PATH = "sign.db" -# 签到积分、邀请积分、发言奖励 +# 积分配置 SIGN_POINTS = 10 INVITE_POINTS = 10 DAILY_SPEAK_TOP_N = 10 @@ -37,8 +34,8 @@ PROXY = { 'password': "MH8ioL7EXf" } -# 邀请链接 + 币种链接示例(会出现在每条币价消息) INVITE_LINK = "https://www.websea.my/en/signup?key=77346588" + crypto_currencies = { "BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"}, "ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"}, @@ -47,77 +44,119 @@ crypto_currencies = { "WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"} } -# 主菜单按钮 main_buttons = [ - [Button.inline("签到", b"/sign"), Button.inline("今日发言排行", b"/daily_rank")], - [Button.inline("我的邀请", b"/my_invites"), Button.inline("币价", b"/btc")], - [Button.inline("帮助", b"/help")] + [Button.inline("Sign in", b"/sign"), Button.inline("Today's Top Speakers", b"/daily_rank")], + [Button.inline("My invitation", b"/my_invites"), Button.inline("coin price", b"/btc")], + [Button.inline("Assistance", b"/help")] ] COMMANDS_TEXT = { - '/sign': '签到,每天签到一次,获取积分', - '签到': '签到,每天签到一次,获取积分', - '/daily_rank': '查看今日发言排行', - '/my_invites': '查看你邀请的人数', - '/btc': '获取 币价 菜单', - '/help': '显示所有可用命令及说明' + '/sign': 'Daily check-in, available once per day to earn points', + '/daily_rank': 'View today\'s message activity ranking', + '/my_invites': 'View the number of people you have invited', + '/btc': 'Get the Bitcoin price menu', + '/help': 'Show all available commands and descriptions' } + # ============================ -# ---------- 数据库操作 ---------- -def get_conn(): +# ---------- 全局并发控制 ---------- +# 使用 asyncio.Lock 在同一时间避免大量数据库并发写入(与 to_thread 配合) +DB_LOCK = asyncio.Lock() + + +# ---------- SQLite 同步底层实现(运行在线程池) ---------- +def _get_conn(): # 每次操作创建连接,避免 sqlite 的线程/并发问题 + # 不设置 check_same_thread:因为连接会在同一线程(to_thread)中被使用 return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) -def init_db(): - conn = get_conn() +def _init_db_sync(): + conn = _get_conn() cursor = conn.cursor() - # 用户表:保存积分、上次签到日期、累计邀请数 cursor.execute(''' - CREATE TABLE IF NOT EXISTS users ( - user_id INTEGER PRIMARY KEY, - username TEXT, - points INTEGER DEFAULT 0, - last_sign_date TEXT, - total_invites INTEGER DEFAULT 0 - ) - ''') - # 每日发言表,复合主键 (user_id, date) + CREATE TABLE IF NOT EXISTS users + ( + user_id + INTEGER + PRIMARY + KEY, + username + TEXT, + points + INTEGER + DEFAULT + 0, + last_sign_date + TEXT, + total_invites + INTEGER + DEFAULT + 0 + ) + ''') cursor.execute(''' - CREATE TABLE IF NOT EXISTS daily_messages ( - user_id INTEGER, - date TEXT, - count INTEGER DEFAULT 0, - PRIMARY KEY (user_id, date) - ) - ''') - # invites 表保存邀请关系(inviter_id, invitee_id) + CREATE TABLE IF NOT EXISTS daily_messages + ( + user_id + INTEGER, + date + TEXT, + count + INTEGER + DEFAULT + 0, + PRIMARY + KEY + ( + user_id, + date + ) + ) + ''') cursor.execute(''' - CREATE TABLE IF NOT EXISTS invites ( - inviter_id INTEGER, - invitee_id INTEGER, - created_at TEXT, - PRIMARY KEY (inviter_id, invitee_id) - ) - ''') - # 日志表:记录每日结算、奖励发放记录(便于审计) + CREATE TABLE IF NOT EXISTS invites + ( + inviter_id + INTEGER, + invitee_id + INTEGER, + created_at + TEXT, + PRIMARY + KEY + ( + inviter_id, + invitee_id + ) + ) + ''') cursor.execute(''' - CREATE TABLE IF NOT EXISTS points_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER, - points INTEGER, - reason TEXT, - created_at TEXT - ) - ''') + CREATE TABLE IF NOT EXISTS points_log + ( + id + INTEGER + PRIMARY + KEY + AUTOINCREMENT, + user_id + INTEGER, + points + INTEGER, + reason + TEXT, + created_at + TEXT + ) + ''') conn.commit() conn.close() -def add_or_update_user(user_id, username): - conn = get_conn() +def _add_or_update_user_sync(user_id: int, username: str): + conn = _get_conn() cursor = conn.cursor() cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username)) cursor.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id)) @@ -125,30 +164,30 @@ def add_or_update_user(user_id, username): conn.close() -def can_sign(user_id): - conn = get_conn() +def _get_last_sign_date_sync(user_id: int) -> Optional[str]: + conn = _get_conn() cursor = conn.cursor() cursor.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,)) row = cursor.fetchone() conn.close() - today = datetime.datetime.now(LOCAL_TZ).date().isoformat() - return False if row and row[0] == today else True + return row[0] if row else None -def add_points_immediate(user_id, points, reason=""): - """立即增加 points 并记录日志""" +def _add_points_immediate_sync(user_id: int, points: int, reason: str = ""): now = datetime.datetime.now(LOCAL_TZ).isoformat() - conn = get_conn() + conn = _get_conn() cursor = conn.cursor() - cursor.execute("UPDATE users SET points = points + ? WHERE user_id = ?", (points, user_id)) + # 如果用户可能不存在,先插入默认记录(避免 update 失败) + cursor.execute("INSERT OR IGNORE INTO users (user_id, username, points) VALUES (?, ?, 0)", (user_id, None)) + cursor.execute("UPDATE users SET points = COALESCE(points,0) + ? WHERE user_id = ?", (points, user_id)) cursor.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)", (user_id, points, reason, now)) conn.commit() conn.close() -def get_points(user_id): - conn = get_conn() +def _get_points_sync(user_id: int) -> int: + conn = _get_conn() cursor = conn.cursor() cursor.execute("SELECT points FROM users WHERE user_id = ?", (user_id,)) row = cursor.fetchone() @@ -156,37 +195,43 @@ def get_points(user_id): return row[0] if row else 0 -def set_last_sign(user_id): +def _set_last_sign_sync(user_id: int): today = datetime.datetime.now(LOCAL_TZ).date().isoformat() - conn = get_conn() + conn = _get_conn() cursor = conn.cursor() + cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, None)) cursor.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id)) conn.commit() conn.close() -def inc_invite(inviter_id, invitee_id): - """记录邀请关系并奖励 inviter(只奖励第一次记录)""" +def _insert_invite_sync(inviter_id: int, invitee_id: int) -> bool: + """ + 插入邀请记录,返回 True 表示插入成功(即第一次邀请) + 如果已存在则抛出 sqlite3.IntegrityError 或返回 False + """ now = datetime.datetime.now(LOCAL_TZ).isoformat() - conn = get_conn() + conn = _get_conn() cursor = conn.cursor() try: cursor.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)", (inviter_id, invitee_id, now)) - # 增加 inviter 的 total_invites - cursor.execute("UPDATE users SET total_invites = total_invites + 1 WHERE user_id = ?", (inviter_id,)) - # 立即奖励积分 - add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward") - except sqlite3.IntegrityError: - # 已经存在,不重复奖励 - pass - finally: + # 更新邀请计数 + cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (inviter_id, None)) + cursor.execute("UPDATE users SET total_invites = COALESCE(total_invites,0) + 1 WHERE user_id = ?", + (inviter_id,)) conn.commit() + return True + except sqlite3.IntegrityError: + # 已存在 + conn.rollback() + return False + finally: conn.close() -def get_invite_count(user_id): - conn = get_conn() +def _get_invite_count_sync(user_id: int) -> int: + conn = _get_conn() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,)) count = cursor.fetchone()[0] @@ -194,68 +239,148 @@ def get_invite_count(user_id): return count -# ---------- 发言统计 ---------- -def add_message_count(user_id): +def _add_message_count_sync(user_id: int): today = datetime.datetime.now(LOCAL_TZ).date().isoformat() - conn = get_conn() + conn = _get_conn() cursor = conn.cursor() cursor.execute(''' - INSERT INTO daily_messages (user_id, date, count) - VALUES (?, ?, 1) - ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1 - ''', (user_id, today)) + INSERT INTO daily_messages (user_id, date, count) + VALUES (?, ?, 1) ON CONFLICT(user_id, date) DO + UPDATE SET count = count + 1 + ''', (user_id, today)) conn.commit() conn.close() -def get_daily_message_ranking(date_iso=None, limit=None): +def _get_daily_message_ranking_sync(date_iso: Optional[str] = None, limit: Optional[int] = None) -> List[ + Tuple[int, Optional[str], int]]: date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() - conn = get_conn() + conn = _get_conn() cursor = conn.cursor() - cursor.execute(''' - SELECT u.user_id, u.username, d.count - FROM daily_messages d - JOIN users u ON d.user_id = u.user_id - WHERE d.date = ? - ORDER BY d.count DESC - {} - '''.format(f"LIMIT {limit}" if limit else ""), (date_iso,)) + query = ''' + SELECT u.user_id, u.username, d.count + FROM daily_messages d + JOIN users u ON d.user_id = u.user_id + WHERE d.date = ? + ORDER BY d.count DESC \ + ''' + if limit: + query += f" LIMIT {limit}" + cursor.execute(query, (date_iso,)) rows = cursor.fetchall() conn.close() return rows -def reset_daily_messages(date_iso=None): - """删除指定日期的 daily_messages(结算后调用)""" +def _reset_daily_messages_sync(date_iso: Optional[str] = None): date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() - conn = get_conn() + conn = _get_conn() cursor = conn.cursor() cursor.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,)) conn.commit() conn.close() -# ---------- 币价获取 ---------- -def get_price(symbol: str): - """ - 使用外部 API 获取当日 K 线数据并返回当前价、24h 变化、今日高、今日低 - 这里调用的是 websea 的接口(和你原来一致) - """ +# ---------- 异步包装(供 bot 使用) ---------- +async def init_db(): + # 初始化 DB(放在 to_thread 中) + await asyncio.to_thread(_init_db_sync) + + +async def add_or_update_user(user_id: int, username: Optional[str]): + username = username or None + # use lock to avoid many threads trying to create the file at once + async with DB_LOCK: + await asyncio.to_thread(_add_or_update_user_sync, user_id, username) + + +async def can_sign(user_id: int) -> bool: + async with DB_LOCK: + last = await asyncio.to_thread(_get_last_sign_date_sync, user_id) + today = datetime.datetime.now(LOCAL_TZ).date().isoformat() + return False if last and last == today else True + + +async def add_points_immediate(user_id: int, points: int, reason: str = ""): + async with DB_LOCK: + await asyncio.to_thread(_add_points_immediate_sync, user_id, points, reason) + + +async def get_points(user_id: int) -> int: + async with DB_LOCK: + return await asyncio.to_thread(_get_points_sync, user_id) + + +async def set_last_sign(user_id: int): + async with DB_LOCK: + await asyncio.to_thread(_set_last_sign_sync, user_id) + + +async def inc_invite(inviter_id: int, invitee_id: int): + """记录邀请关系并奖励 inviter(只奖励第一次记录)""" + if not inviter_id or not invitee_id: + print(f"[invite] 无效的邀请者或受邀者ID: inviter={inviter_id}, invitee={invitee_id}") + return False + + if inviter_id == invitee_id: + print(f"[invite] 不能邀请自己: user_id={inviter_id}") + return False + + # 使用锁,保证并发情况下不会重复奖励 + async with DB_LOCK: + inserted = await asyncio.to_thread(_insert_invite_sync, inviter_id, invitee_id) + + if inserted: + # 插入成功后再给积分(不在 DB_LOCK 内执行,将奖励记录写进 DB) + try: + await add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward") + print( + f"[invite] 成功记录邀请并奖励积分: inviter={inviter_id}, invitee={invitee_id}, points={INVITE_POINTS}") + return True + except Exception as e: + print(f"[invite] 奖励积分失败: inviter={inviter_id}, error={e}") + return False + else: + print(f"[invite] 邀请记录已存在(重复邀请): inviter={inviter_id}, invitee={invitee_id}") + return False + + +async def get_invite_count(user_id: int) -> int: + async with DB_LOCK: + return await asyncio.to_thread(_get_invite_count_sync, user_id) + + +async def add_message_count(user_id: int): + async with DB_LOCK: + await asyncio.to_thread(_add_message_count_sync, user_id) + + +async def get_daily_message_ranking(date_iso: Optional[str] = None, limit: Optional[int] = None): + async with DB_LOCK: + return await asyncio.to_thread(_get_daily_message_ranking_sync, date_iso, limit) + + +async def reset_daily_messages(date_iso: Optional[str] = None): + async with DB_LOCK: + await asyncio.to_thread(_reset_daily_messages_sync, date_iso) + + +# ---------- 币价获取(阻塞 requests -> 放到线程池) ---------- +def _get_price_sync(symbol: str) -> Tuple[float, float, float, float]: headers = {'user-agent': 'Mozilla/5.0'} tz = LOCAL_TZ today = datetime.datetime.now(tz).date() start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz) end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz) - params = { 'symbol': symbol, 'period': '30min', 'start': int(start_of_day.timestamp()), 'end': int(end_of_day.timestamp()), } - try: - response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers, timeout=10) + response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers, + timeout=10) data = response.json().get("result", {}).get("data", []) if not data: return 0.0, 0.0, 0.0, 0.0 @@ -270,25 +395,33 @@ def get_price(symbol: str): return 0.0, 0.0, 0.0, 0.0 -def get_price_msg(symbol: str): - current_price, change_24h, today_high, today_low = get_price(symbol) - name = symbol.split('-')[0] - url = crypto_currencies.get(name, {}).get("url", "#") +async def get_price(symbol: str): + return await asyncio.to_thread(_get_price_sync, symbol) + + +def get_price_msg_from_values(name: str, current_price: float, change_24h: float, today_high: float, today_low: float, + url: str): msg = ( - f"📊 {name}/USDT 实时行情\n\n" - f"💰 当前价格:{current_price:.4f} USDT\n" - f"📈 24小时涨跌幅:{change_24h:.2f}%\n" - f"⬆️ 最高价:{today_high:.4f} USDT\n" - f"⬇️ 最低价:{today_low:.4f} USDT\n\n" - f"🔗 交易链接:{url}\n" - f"🎁 注册邀请:{INVITE_LINK}\n\n" - f"(此消息每小时自动更新)" + f"📊 {name}/USDT Real-time Quotes\n\n" + f"💰 Current Price:{current_price:.4f} USDT\n" + f"📈 24-hour price change:{change_24h:.2f}%\n" + f"⬆️ Highest Price:{today_high:.4f} USDT\n" + f"⬇️ Lowest Price:{today_low:.4f} USDT\n\n" + f"🔗 Transaction Link:{url}\n" + f"🎁 Registration Invitation:{INVITE_LINK}\n\n" + f"(This message is automatically updated hourly.)" ) return msg +async def get_price_msg(symbol: str): + current_price, change_24h, today_high, today_low = await get_price(symbol) + name = symbol.split('-')[0] + url = crypto_currencies.get(name, {}).get("url", "#") + return get_price_msg_from_values(name, current_price, change_24h, today_high, today_low, url) + + def get_crypto_buttons(): - """生成币种选择按钮""" buttons = [] row = [] for name in crypto_currencies.keys(): @@ -298,50 +431,54 @@ def get_crypto_buttons(): row = [] if row: buttons.append(row) - buttons.append([Button.inline("返回菜单", b"/help")]) + buttons.append([Button.inline("Return to Menu", b"/help")]) return buttons -# ---------- 机器人命令处理 ---------- +# ---------- 命令帮助文本 ---------- def get_command_list_text(): - msg = "🤖 机器人命令列表:\n" + msg = "🤖 Robot Command List:\n" for cmd, desc in COMMANDS_TEXT.items(): msg += f"{cmd} → {desc}\n" - msg += f"\n签到奖励:每次 {SIGN_POINTS} 积分,100积分可兑换 5 USDT 合约手续费代金券。\n发言奖励:每日排行前 {DAILY_SPEAK_TOP_N} 每人 {DAILY_SPEAK_REWARD} 积分。\n邀请奖励:每邀请一人 {INVITE_POINTS} 积分。\n每日结算时间:每天 12:00(America/New_York 时区)。" + msg += f"\nSign-in Reward: {SIGN_POINTS} points each time, and 100 points can be redeemed for a 5 USDT futures fee voucher.\nChat Reward: The top {DAILY_SPEAK_TOP_N} users in daily chat activity each receive {DAILY_SPEAK_REWARD} points.\nInvitation Reward: Each invited user grants {INVITE_POINTS} points.\nDaily Settlement Time: Every day at 12:00 PM (America/New_York timezone)." return msg -async def handle_command(event, cmd): +# ---------- 事件处理(异步) ---------- +async def handle_command(event, cmd: str): user = await event.get_sender() + if user is None: + return user_id = user.id username = user.username or user.first_name or "未知用户" + await add_or_update_user(user_id, username) - add_or_update_user(user_id, username) reply = "" - - if cmd in ["/sign", "签到", "/tanda"]: - if not can_sign(user_id): - reply = f"🌞 @{username},你今天已经签到过啦!" + # 统一处理命令名(可能传进来已经小写) + if cmd in ["/sign", "sign in","/tanda"]: + if not await can_sign(user_id): + reply = f"🌞 @{username},You've already signed in today.!" else: - add_points_immediate(user_id, SIGN_POINTS, reason="sign") - set_last_sign(user_id) - total = get_points(user_id) - reply = f"✅ @{username} 签到成功!获得 {SIGN_POINTS} 积分\n当前总积分:{total}\n\n100积分可兑换 5USDT 合约手续费代金券。" - elif cmd in ["/daily_rank", "发言", "/kedudukan_harian"]: - ranking = get_daily_message_ranking() + await add_points_immediate(user_id, SIGN_POINTS, reason="sign") + await set_last_sign(user_id) + total = await get_points(user_id) + reply = f"✅ @{username} Check-in successful!obtain {SIGN_POINTS} Points\nCurrent total points:{total}\n\n100 points can be redeemed for a 5 USDT contract fee voucher.。" + elif cmd in ["/daily_rank", "rankings","/kedudukan_harian"]: + ranking = await get_daily_message_ranking() if not ranking: - reply = "📊 今日还没有人发言哦~" + reply = "📊 No one has posted yet today.~" else: reply_lines = [] for i, (uid, uname, cnt) in enumerate(ranking): - reply_lines.append(f"{i + 1}. @{uname or uid}: {cnt} 条") - reply = "📊 今日发言排行:\n" + "\n".join(reply_lines) - elif cmd in ["/my_invites", "邀请", "/daily_position"]: - count = get_invite_count(user_id) - total_pts = get_points(user_id) - reply = f"👥 @{username},你邀请了 {count} 人。\n当前积分:{total_pts}" - elif cmd in ["/btc", "币价"]: - await event.respond("请选择要查看的币种:", buttons=get_crypto_buttons()) + display = f"@{uname}" if uname else str(uid) + reply_lines.append(f"{i + 1}. {display}: {cnt} 条") + reply = "📊 Today's Top Speakers:\n" + "\n".join(reply_lines) + elif cmd in ["/my_invites", "invitation","/daily_position"]: + count = await get_invite_count(user_id) + total_pts = await get_points(user_id) + reply = f"👥 @{username},You have invited {count} person。\nCurrent points:{total_pts}" + elif cmd in ["/btc"]: + await event.respond("Please select the currency you wish to view:", buttons=get_crypto_buttons()) return else: await event.reply(get_command_list_text(), buttons=main_buttons) @@ -352,9 +489,9 @@ async def handle_command(event, cmd): async def handle_price_command(event, symbol_name: str): symbol = crypto_currencies[symbol_name]["type"] - msg = get_price_msg(symbol) - # event.edit 用于 CallbackQuery,event.respond/edit 都可 + msg = await get_price_msg(symbol) try: + # CallbackQuery 用 edit 更友好 await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False) except Exception: await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False) @@ -366,8 +503,7 @@ async def send_price_periodically(bot, chat_id): while True: try: random_key = random.choice(list(crypto_currencies.keys())) - msg = get_price_msg(crypto_currencies[random_key]["type"]) - # 注意:如果群组比较多,此处简单发送。可优化为只在特定时间或轮询。 + msg = await get_price_msg(crypto_currencies[random_key]["type"]) await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False) except Exception as e: print(f"发送价格失败: {e}") @@ -380,9 +516,10 @@ async def send_price_periodically(bot, chat_id): # ---------- 定时任务:每天结算(每天 12:00 America/New_York) ---------- async def daily_settlement_task(bot): - """每天在 LOCAL_TZ 12:00 执行: - - 计算当日发言排名,给前 N 名每人奖励 - - 记录日志并清空当天的发言统计 + """ + 每天在 LOCAL_TZ 12:00 执行: + - 计算当日发言排名,给前 N 名每人奖励 + - 记录日志并清空当天的发言统计 """ while True: now = datetime.datetime.now(LOCAL_TZ) @@ -391,35 +528,36 @@ async def daily_settlement_task(bot): if now >= target: target = target + datetime.timedelta(days=1) wait_seconds = (target - now).total_seconds() - print(f"[settlement] 下次结算(12:00)在 {target.isoformat()},等待 {int(wait_seconds)} s") + print(f"[settlement] Next settlement(12:00)in {target.isoformat()},Waiting {int(wait_seconds)} s") await asyncio.sleep(wait_seconds) - # 执行结算 - date_iso = (target - datetime.timedelta(days=1)).date().isoformat() # 结算前一天(也可结算今天到目前为止) - print(f"[settlement] 执行结算,目标日期:{date_iso}") - ranking = get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N) + # 执行结算(结算 target 的前一天) + date_iso = (target - datetime.timedelta(days=1)).date().isoformat() + print(f"[settlement] Execute settlement, target date:{date_iso}") + ranking = await get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N) if ranking: for i, (uid, uname, cnt) in enumerate(ranking): try: - add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i+1}") + await add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i + 1}") except Exception as e: - print(f"[settlement] 给用户 {uid} 发奖励失败: {e}") + print(f"[settlement] For users {uid} Failed to issue rewards: {e}") # 给管理员或群组发送结算通知(可选) try: - summary_lines = [f"📅 {date_iso} 发言排行榜结算结果:"] + summary_lines = [f"📅 {date_iso} Speaker Ranking Settlement Results:"] for i, (uid, uname, cnt) in enumerate(ranking): - summary_lines.append(f"{i+1}. @{uname or uid} — {cnt} 条 — 获得 {DAILY_SPEAK_REWARD} 积分") + display = f"@{uname}" if uname else str(uid) + summary_lines.append(f"{i + 1}. {display} — {cnt} Article — obtain {DAILY_SPEAK_REWARD} Points") summary = "\n".join(summary_lines) for gid in ALLOWED_GROUPS: await bot.send_message(gid, summary) except Exception as e: - print("[settlement] 发送结算通知失败:", e) + print("[settlement] Failed to send settlement notification:", e) else: - print("[settlement] 当日无发言记录,跳过发言奖励") + print("[settlement] No speaking records for the day; skip speaking rewards.") # 清空那天的统计 try: - reset_daily_messages(date_iso=date_iso) + await reset_daily_messages(date_iso=date_iso) print(f"[settlement] 清理完成:{date_iso}") except Exception as e: print("[settlement] 清理 daily_messages 失败:", e) @@ -427,8 +565,10 @@ async def daily_settlement_task(bot): # ---------- 主逻辑 ---------- async def main(): - init_db() + await init_db() + # 使用代理时传 proxy=PROXY,若不需要代理则移除该参数 + # Telethon proxy 格式可能和你的代理不同,请按需调整(或移除) bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY) await bot.start(bot_token=BOT_TOKEN) @@ -442,82 +582,109 @@ async def main(): @bot.on(events.ChatAction) async def welcome(event: events.ChatAction.Event): # 欢迎新成员并尝试记录邀请者(若存在) - if event.user_added or event.user_joined: - for user in event.users: - name = user.username or user.first_name or "新成员" - # 尝试获得 inviter id(不同版本的 Telethon 结构不完全一致) - inviter_id = None + if not (event.user_added or event.user_joined): + return + + # 尝试获取 inviter id(兼容多种 Telethon 结构) + inviter_id = None + try: + # 优先 actor_id(某些 Telethon 版本和场景) + if hasattr(event, "actor_id") and event.actor_id: + inviter_id = event.actor_id + # 再尝试 action_message.from_id(另一些版本) + elif hasattr(event, "action_message") and event.action_message: + maybe = getattr(event.action_message, "from_id", None) + if isinstance(maybe, types.PeerUser): + inviter_id = maybe.user_id + elif isinstance(maybe, int): + inviter_id = maybe + # 在某些情形下 from_id 是 a User 或 MessageEntity,处理兼容性 + elif hasattr(maybe, "user_id"): + inviter_id = getattr(maybe, "user_id", None) + except Exception as e: + print(f"[welcome] 获取邀请者ID时出错: {e}") + inviter_id = None + + # 对 event.users 中的新成员逐个处理 + for user in event.users: + try: + new_uid = user.id + new_name = user.username or user.first_name or "新成员" + + # 保证用户存在于 users 表中 + await add_or_update_user(new_uid, new_name) + + # 如果找到邀请者并且不是自己,则记录邀请 + if inviter_id and inviter_id != new_uid: + # 确保 inviter 在 users 表中(用户名未知时填 None) + await add_or_update_user(inviter_id, None) + + # 调用 inc_invite 并检查返回值 + success = await inc_invite(inviter_id, new_uid) + if success: + print(f"[welcome] 成功处理邀请: {inviter_id} 邀请了 {new_uid}") + else: + print(f"[welcome] 邀请处理失败或已存在: {inviter_id} -> {new_uid}") + elif not inviter_id: + print(f"[welcome] 未找到邀请者,新成员 {new_uid} 可能是自己加入的") + else: + print(f"[welcome] 跳过自己邀请自己: {new_uid}") + + # 欢迎消息 try: - # 若是邀请添加,action_message 的 from_id 通常是 inviter - if hasattr(event, 'action_message') and event.action_message: - maybe = getattr(event.action_message, "from_id", None) - if isinstance(maybe, types.PeerUser): - inviter_id = maybe.user_id - elif isinstance(maybe, (int,)): - inviter_id = maybe - # Telethon 也可能直接暴露 actor_id - if getattr(event, "actor_id", None): - inviter_id = event.actor_id - except Exception: - inviter_id = None - - # 如果 inviter_id 存在且与新成员不同,则记录邀请并奖励 - try: - if inviter_id and inviter_id != user.id: - # 确保 inviter 存在 users 表 - add_or_update_user(inviter_id, "unknown") - inc_invite(inviter_id, user.id) - except Exception as e: - print("记录邀请失败:", e) - - # 更新/插入新成员到 users 表 - add_or_update_user(user.id, user.username or user.first_name or "新成员") - - # 欢迎消息(发送到群内) - try: - await event.reply(f"欢迎 @{name} 加入群聊!\n{get_command_list_text()}") + await event.reply(f"欢迎 @{new_name} 加入群聊!\n{get_command_list_text()}") except Exception: + # 某些场景 event.reply 不允许(例如系统消息),忽略 pass + except Exception as e: + print(f"[welcome] 处理新成员异常: user_id={user.id if user else 'unknown'}, error={e}") + @bot.on(events.NewMessage) async def on_message(event): # 只处理白名单群组 if event.chat_id not in ALLOWED_GROUPS: return - # 普通消息处理:命令或记录发言计数 text = (event.raw_text or "").strip() if not text: return - # 去掉命令中的 @botname(如 /sign@mybot) + # 去掉命令末尾的 @botname if "@" in text and text.startswith("/"): text = text.split("@")[0] - # 小写匹配命令(命令与 data 映射) lowered = text.lower() - # 命令集合 + + # 命令集合(注意使用小写比较) cmds_map = { "签到": ["/sign", "签到", "/tanda"], "发言": ["/daily_rank", "发言", "/kedudukan_harian"], "邀请": ["/my_invites", "邀请", "/daily_position"], "币价": ["/btc", "币价"] } + for group_cmds in cmds_map.values(): if lowered in [c.lower() for c in group_cmds]: + # 传入原始命令(保持原有行为) await handle_command(event, lowered) return - # 不是命令则记录发言次数 + # 非命令,记录发言次数 sender = await event.get_sender() if sender: - add_or_update_user(sender.id, sender.username or sender.first_name or "未知用户") - add_message_count(sender.id) + await add_or_update_user(sender.id, sender.username or sender.first_name or "未知用户") + await add_message_count(sender.id) @bot.on(events.CallbackQuery) async def callback(event): - await event.answer("正在处理...", alert=False) - cmd = event.data.decode() + # 快速响应 UI + try: + await event.answer("Currently being processed...", alert=False) + except Exception: + pass + + cmd = event.data.decode() if event.data else "" if "@" in cmd and cmd.startswith("/"): cmd = cmd.split("@")[0] diff --git a/telegram/gpt1.py b/telegram/gpt1.py new file mode 100644 index 0000000..2ea9a20 --- /dev/null +++ b/telegram/gpt1.py @@ -0,0 +1,844 @@ +import asyncio +import datetime +import sqlite3 +import random +import requests +from zoneinfo import ZoneInfo +from typing import Optional, Tuple, List +from telethon import TelegramClient, events, Button, types +from telethon.tl.functions.bots import SetBotCommandsRequest +from telethon.tl.types import BotCommand, BotCommandScopeDefault + +# ========== 配置区 ========== +API_ID = 2040 +API_HASH = "b18441a1ff607e10a989891a5462e627" +BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" +DB_PATH = "sign.db" + +# 积分配置 +SIGN_POINTS = 10 +INVITE_POINTS = 10 +DAILY_SPEAK_TOP_N = 10 +DAILY_SPEAK_REWARD = 10 + +# 允许机器人运行的群组(只处理这些群的消息) +ALLOWED_GROUPS = [-1003238845008] + +# 时区(结算规则用) +LOCAL_TZ = ZoneInfo("America/New_York") + +# 代理(可选) +PROXY = { + 'proxy_type': "socks5", + 'addr': "202.155.144.102", + 'port': 31102, + 'username': "SyNuejCtrQ", + 'password': "MH8ioL7EXf" +} + +INVITE_LINK = "https://www.websea.my/en/signup?key=77346588" + +crypto_currencies = { + "BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"}, + "ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"}, + "SOL": {"type": "SOL-USDT", "url": "https://www.websea.com/zh-CN/trade/SOL-USDT"}, + "BNB": {"type": "BNB-USDT", "url": "https://www.websea.com/zh-CN/trade/BNB-USDT"}, + "WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"} +} + + +# ---------- 命令解析工具函数 ---------- +def normalize_command(cmd: str) -> str: + """ + 标准化命令:去除 @botname,转换为小写,去除前后空格 + 例如: /sign@docekrsebot -> /sign + 支持多种格式: /sign@botname, /sign@BOTNAME, /SIGN@botname 等 + """ + if not cmd: + return "" + cmd = cmd.strip() + if "@" in cmd: + cmd = cmd.split("@")[0].strip() + return cmd.lower().strip() + + +def command_definition(display: str, description: str, aliases: Optional[List[str]] = None) -> dict: + aliases = aliases or [] + return { + "display": display, + "description": description, + "aliases": list(set([display] + aliases)), + } + + +COMMAND_DEFINITIONS = { + "sign": command_definition( + "/sign", + "Daily check-in, available once per day to earn points", + aliases=["sign", "/tanda"], + ), + "daily_rank": command_definition( + "/daily_rank", + "View today's message activity ranking", + aliases=["rankings", "/kedudukan_harian"], + ), + "my_invites": command_definition( + "/my_invites", + "View the number of people you have invited", + aliases=["invitation", "/daily_position"], + ), + "points": command_definition( + "/points", + "View your current points balance", + aliases=["/balance", "/积分", "points"], + ), + "btc": command_definition( + "/btc", + "Get the Bitcoin price menu", + aliases=["coin price"], + ), + "help": command_definition( + "/help", + "Show all available commands and descriptions", + aliases=["help"], + ), +} + +COMMAND_ORDER = ["sign", "daily_rank", "my_invites", "points", "btc", "help"] + + +def build_alias_map() -> dict: + alias_map = {} + for key, info in COMMAND_DEFINITIONS.items(): + for alias in info["aliases"]: + alias_map[normalize_command(alias)] = key + alias_map[normalize_command(key)] = key + return alias_map + + +COMMAND_ALIAS_MAP = build_alias_map() + + +def resolve_command(cmd: str) -> Optional[str]: + return COMMAND_ALIAS_MAP.get(normalize_command(cmd)) + + +def command_payload(command_key: str) -> bytes: + return COMMAND_DEFINITIONS[command_key]["display"].encode() + + +main_buttons = [ + [Button.inline("Sign in", command_payload("sign")), + Button.inline("Today's Top Speakers", command_payload("daily_rank"))], + [Button.inline("My invitation", command_payload("my_invites")), + Button.inline("coin price", command_payload("btc"))], + [Button.inline("My Points", command_payload("points")), Button.inline("Assistance", command_payload("help"))], +] + + +# ============================ + +def get_primary_command_display(command_key: str) -> str: + return COMMAND_DEFINITIONS[command_key]["display"] + + +# ---------- 全局并发控制 ---------- +# 使用 asyncio.Lock 在同一时间避免大量数据库并发写入(与 to_thread 配合) +DB_LOCK = asyncio.Lock() + + +# ---------- SQLite 同步底层实现(运行在线程池) ---------- +def _get_conn(): + # 每次操作创建连接,避免 sqlite 的线程/并发问题 + # 不设置 check_same_thread:因为连接会在同一线程(to_thread)中被使用 + return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) + + +def _init_db_sync(): + conn = _get_conn() + cursor = conn.cursor() + cursor.execute(''' + CREATE TABLE IF NOT EXISTS users + ( + user_id + INTEGER + PRIMARY + KEY, + username + TEXT, + points + INTEGER + DEFAULT + 0, + last_sign_date + TEXT, + total_invites + INTEGER + DEFAULT + 0 + ) + ''') + cursor.execute(''' + CREATE TABLE IF NOT EXISTS daily_messages + ( + user_id + INTEGER, + date + TEXT, + count + INTEGER + DEFAULT + 0, + PRIMARY + KEY + ( + user_id, + date + ) + ) + ''') + cursor.execute(''' + CREATE TABLE IF NOT EXISTS invites + ( + inviter_id + INTEGER, + invitee_id + INTEGER, + created_at + TEXT, + PRIMARY + KEY + ( + inviter_id, + invitee_id + ) + ) + ''') + cursor.execute(''' + CREATE TABLE IF NOT EXISTS points_log + ( + id + INTEGER + PRIMARY + KEY + AUTOINCREMENT, + user_id + INTEGER, + points + INTEGER, + reason + TEXT, + created_at + TEXT + ) + ''') + conn.commit() + conn.close() + + +def _add_or_update_user_sync(user_id: int, username: str): + conn = _get_conn() + cursor = conn.cursor() + cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username)) + cursor.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id)) + conn.commit() + conn.close() + + +def _get_last_sign_date_sync(user_id: int) -> Optional[str]: + conn = _get_conn() + cursor = conn.cursor() + cursor.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,)) + row = cursor.fetchone() + conn.close() + return row[0] if row else None + + +def _add_points_immediate_sync(user_id: int, points: int, reason: str = ""): + now = datetime.datetime.now(LOCAL_TZ).isoformat() + conn = _get_conn() + cursor = conn.cursor() + # 如果用户可能不存在,先插入默认记录(避免 update 失败) + cursor.execute("INSERT OR IGNORE INTO users (user_id, username, points) VALUES (?, ?, 0)", (user_id, None)) + cursor.execute("UPDATE users SET points = COALESCE(points,0) + ? WHERE user_id = ?", (points, user_id)) + cursor.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)", + (user_id, points, reason, now)) + conn.commit() + conn.close() + + +def _get_points_sync(user_id: int) -> int: + conn = _get_conn() + cursor = conn.cursor() + cursor.execute("SELECT points FROM users WHERE user_id = ?", (user_id,)) + row = cursor.fetchone() + conn.close() + return row[0] if row else 0 + + +def _set_last_sign_sync(user_id: int): + today = datetime.datetime.now(LOCAL_TZ).date().isoformat() + conn = _get_conn() + cursor = conn.cursor() + cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, None)) + cursor.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id)) + conn.commit() + conn.close() + + +def _insert_invite_sync(inviter_id: int, invitee_id: int) -> bool: + """ + 插入邀请记录,返回 True 表示插入成功(即第一次邀请) + 如果已存在则抛出 sqlite3.IntegrityError 或返回 False + """ + now = datetime.datetime.now(LOCAL_TZ).isoformat() + conn = _get_conn() + cursor = conn.cursor() + try: + cursor.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)", + (inviter_id, invitee_id, now)) + # 更新邀请计数 + cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (inviter_id, None)) + cursor.execute("UPDATE users SET total_invites = COALESCE(total_invites,0) + 1 WHERE user_id = ?", + (inviter_id,)) + conn.commit() + return True + except sqlite3.IntegrityError: + # 已存在 + conn.rollback() + return False + finally: + conn.close() + + +def _get_invite_count_sync(user_id: int) -> int: + conn = _get_conn() + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,)) + count = cursor.fetchone()[0] + conn.close() + return count + + +def _add_message_count_sync(user_id: int): + today = datetime.datetime.now(LOCAL_TZ).date().isoformat() + conn = _get_conn() + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO daily_messages (user_id, date, count) + VALUES (?, ?, 1) ON CONFLICT(user_id, date) DO + UPDATE SET count = count + 1 + ''', (user_id, today)) + conn.commit() + conn.close() + + +def _get_daily_message_ranking_sync(date_iso: Optional[str] = None, limit: Optional[int] = None) -> List[ + Tuple[int, Optional[str], int]]: + date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() + conn = _get_conn() + cursor = conn.cursor() + query = ''' + SELECT u.user_id, u.username, d.count + FROM daily_messages d + JOIN users u ON d.user_id = u.user_id + WHERE d.date = ? + ORDER BY d.count DESC \ + ''' + if limit: + query += f" LIMIT {limit}" + cursor.execute(query, (date_iso,)) + rows = cursor.fetchall() + conn.close() + return rows + + +def _reset_daily_messages_sync(date_iso: Optional[str] = None): + date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() + conn = _get_conn() + cursor = conn.cursor() + cursor.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,)) + conn.commit() + conn.close() + + +# ---------- 异步包装(供 bot 使用) ---------- +async def init_db(): + # 初始化 DB(放在 to_thread 中) + await asyncio.to_thread(_init_db_sync) + + +async def add_or_update_user(user_id: int, username: Optional[str]): + username = username or None + # use lock to avoid many threads trying to create the file at once + async with DB_LOCK: + await asyncio.to_thread(_add_or_update_user_sync, user_id, username) + + +async def can_sign(user_id: int) -> bool: + async with DB_LOCK: + last = await asyncio.to_thread(_get_last_sign_date_sync, user_id) + today = datetime.datetime.now(LOCAL_TZ).date().isoformat() + return False if last and last == today else True + + +async def add_points_immediate(user_id: int, points: int, reason: str = ""): + async with DB_LOCK: + await asyncio.to_thread(_add_points_immediate_sync, user_id, points, reason) + + +async def get_points(user_id: int) -> int: + async with DB_LOCK: + return await asyncio.to_thread(_get_points_sync, user_id) + + +async def set_last_sign(user_id: int): + async with DB_LOCK: + await asyncio.to_thread(_set_last_sign_sync, user_id) + + +async def inc_invite(inviter_id: int, invitee_id: int): + """Record invite relationships and reward the inviter (only first invite).""" + if not inviter_id or not invitee_id: + print(f"[invite] Invalid inviter or invitee id: inviter={inviter_id}, invitee={invitee_id}") + return False + + if inviter_id == invitee_id: + print(f"[invite] Cannot invite yourself: user_id={inviter_id}") + return False + + # 使用锁,保证并发情况下不会重复奖励 + async with DB_LOCK: + inserted = await asyncio.to_thread(_insert_invite_sync, inviter_id, invitee_id) + + if inserted: + # 插入成功后再给积分(不在 DB_LOCK 内执行,将奖励记录写进 DB) + try: + await add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward") + print( + f"[invite] Invite recorded and points rewarded: inviter={inviter_id}, invitee={invitee_id}, points={INVITE_POINTS}") + return True + except Exception as e: + print(f"[invite] Failed to reward inviter={inviter_id}: {e}") + return False + else: + print(f"[invite] Invite already exists (duplicate): inviter={inviter_id}, invitee={invitee_id}") + return False + + +async def get_invite_count(user_id: int) -> int: + async with DB_LOCK: + return await asyncio.to_thread(_get_invite_count_sync, user_id) + + +async def add_message_count(user_id: int): + async with DB_LOCK: + await asyncio.to_thread(_add_message_count_sync, user_id) + + +async def get_daily_message_ranking(date_iso: Optional[str] = None, limit: Optional[int] = None): + async with DB_LOCK: + return await asyncio.to_thread(_get_daily_message_ranking_sync, date_iso, limit) + + +async def reset_daily_messages(date_iso: Optional[str] = None): + async with DB_LOCK: + await asyncio.to_thread(_reset_daily_messages_sync, date_iso) + + +# ---------- 币价获取(阻塞 requests -> 放到线程池) ---------- +def _get_price_sync(symbol: str) -> Tuple[float, float, float, float]: + headers = {'user-agent': 'Mozilla/5.0'} + tz = LOCAL_TZ + today = datetime.datetime.now(tz).date() + start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz) + end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz) + params = { + 'symbol': symbol, + 'period': '30min', + 'start': int(start_of_day.timestamp()), + 'end': int(end_of_day.timestamp()), + } + try: + response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers, + timeout=10) + data = response.json().get("result", {}).get("data", []) + if not data: + return 0.0, 0.0, 0.0, 0.0 + current_price = float(data[0]['close']) + today_high = max(float(i['high']) for i in data) + today_low = min(float(i['low']) for i in data) + price_24h_ago = float(data[-1]['open']) + change_24h = (current_price - price_24h_ago) / price_24h_ago * 100 if price_24h_ago != 0 else 0.0 + return current_price, change_24h, today_high, today_low + except Exception as e: + print("[price] Failed to fetch price data:", e) + return 0.0, 0.0, 0.0, 0.0 + + +async def get_price(symbol: str): + return await asyncio.to_thread(_get_price_sync, symbol) + + +def get_price_msg_from_values(name: str, current_price: float, change_24h: float, today_high: float, today_low: float, + url: str): + msg = ( + f"📊 {name}/USDT Real-time Quotes\n\n" + f"💰 Current Price:{current_price:.4f} USDT\n" + f"📈 24-hour price change:{change_24h:.2f}%\n" + f"⬆️ Highest Price:{today_high:.4f} USDT\n" + f"⬇️ Lowest Price:{today_low:.4f} USDT\n\n" + f"🔗 Transaction Link:{url}\n" + f"🎁 Registration Invitation:{INVITE_LINK}\n\n" + f"(This message is automatically updated hourly.)" + ) + return msg + + +async def get_price_msg(symbol: str): + current_price, change_24h, today_high, today_low = await get_price(symbol) + name = symbol.split('-')[0] + url = crypto_currencies.get(name, {}).get("url", "#") + return get_price_msg_from_values(name, current_price, change_24h, today_high, today_low, url) + + +def get_crypto_buttons(): + buttons = [] + row = [] + for name in crypto_currencies.keys(): + row.append(Button.inline(name, f"price_{name}".encode())) + if len(row) == 3: + buttons.append(row) + row = [] + if row: + buttons.append(row) + buttons.append([Button.inline("Return to Menu", command_payload("help"))]) + return buttons + + +# ---------- 命令辅助 ---------- +async def get_sender_identity(event) -> Optional[Tuple[int, str]]: + user = await event.get_sender() + if user is None: + return None + username = user.username or user.first_name or "Unknown User" + await add_or_update_user(user.id, username) + return user.id, username + + +async def build_sign_message(user_id: int, username: str) -> str: + if not await can_sign(user_id): + return f"🌞 @{username}, you have already checked in today!" + + await add_points_immediate(user_id, SIGN_POINTS, reason="sign") + await set_last_sign(user_id) + total = await get_points(user_id) + return ( + f"✅ @{username} check-in succeed! Earned {SIGN_POINTS} points.\n" + f"Current total points: {total}\n\n" + f"100 points can be exchanged for a 5 USDT futures fee voucher." + ) + + +async def build_daily_rank_message() -> str: + ranking = await get_daily_message_ranking() + if not ranking: + return "📊 No one has spoken yet today." + + lines = [] + for idx, (uid, uname, cnt) in enumerate(ranking, start=1): + mention = f"@{uname}" if uname else str(uid) + lines.append(f"{idx}. {mention}: {cnt} messages") + return "📊 Today's speaker leaderboard:\n" + "\n".join(lines) + + +async def build_invite_message(user_id: int, username: str) -> str: + count = await get_invite_count(user_id) + total_pts = await get_points(user_id) + return f"👥 @{username}, you have invited {count} users.\nCurrent points: {total_pts}" + + +async def build_points_message(user_id: int, username: str) -> str: + total_pts = await get_points(user_id) + invite_count = await get_invite_count(user_id) + return ( + f"💰 @{username} Points Overview\n\n" + f"📊 Current points: {total_pts}\n" + f"👥 Invited users: {invite_count}\n\n" + f"💡 100 points can be exchanged for a 5 USDT futures fee voucher." + ) + + +def get_command_list_text() -> str: + msg = "🤖 Robot Command List:\n\n" + for key in COMMAND_ORDER: + info = COMMAND_DEFINITIONS[key] + msg += f"`{info['display']}` → {info['description']}\n" + msg += ( + f"\nSign-in Reward: {SIGN_POINTS} points each time, and 100 points can be redeemed for a 5 USDT futures fee voucher.\n" + f"Chat Reward: The top {DAILY_SPEAK_TOP_N} users in daily chat activity each receive {DAILY_SPEAK_REWARD} points.\n" + f"Invitation Reward: Each invited user grants {INVITE_POINTS} points.\n" + f"Daily Settlement Time: Every day at 12:00 PM (America/New_York timezone)." + ) + return msg + + +async def send_help(event): + await event.reply(get_command_list_text(), buttons=main_buttons, parse_mode="md") + + +# ---------- 事件处理(异步) ---------- +async def handle_command(event, cmd: str, canonical_cmd: Optional[str] = None): + identity = await get_sender_identity(event) + if not identity: + return + user_id, username = identity + + command_key = canonical_cmd or resolve_command(cmd) + if not command_key: + await send_help(event) + return + + if command_key == "sign": + reply = await build_sign_message(user_id, username) + elif command_key == "daily_rank": + reply = await build_daily_rank_message() + elif command_key == "my_invites": + reply = await build_invite_message(user_id, username) + elif command_key == "points": + reply = await build_points_message(user_id, username) + elif command_key == "btc": + await event.respond("Please select the currency you wish to view:", buttons=get_crypto_buttons()) + return + elif command_key == "help": + await send_help(event) + return + else: + await send_help(event) + return + + await event.reply(reply) + + +async def handle_price_command(event, symbol_name: str): + symbol = crypto_currencies[symbol_name]["type"] + msg = await get_price_msg(symbol) + try: + # CallbackQuery 用 edit 更友好 + await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False) + except Exception: + await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False) + + +# ---------- 定时任务:每小时推送随机币价 ---------- +async def send_price_periodically(bot, chat_id): + await asyncio.sleep(5) # 启动缓冲 + while True: + try: + random_key = random.choice(list(crypto_currencies.keys())) + msg = await get_price_msg(crypto_currencies[random_key]["type"]) + await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False) + except Exception as e: + print(f"[price] Failed to send price update: {e}") + # 等待下一小时整点(保持每小时推送一次) + now = datetime.datetime.now(LOCAL_TZ) + next_hour = (now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)) + wait_seconds = (next_hour - now).total_seconds() + await asyncio.sleep(wait_seconds) + + +# ---------- 定时任务:每天结算(每天 12:00 America/New_York) ---------- +async def daily_settlement_task(bot): + """ + 每天在 LOCAL_TZ 12:00 执行: + - 计算当日发言排名,给前 N 名每人奖励 + - 记录日志并清空当天的发言统计 + """ + while True: + now = datetime.datetime.now(LOCAL_TZ) + # 目标时间:今天 12:00(如果已过则到明天) + target = now.replace(hour=12, minute=0, second=0, microsecond=0) + if now >= target: + target = target + datetime.timedelta(days=1) + wait_seconds = (target - now).total_seconds() + print(f"[settlement] Next settlement (12:00) at {target.isoformat()}, waiting {int(wait_seconds)} s") + await asyncio.sleep(wait_seconds) + + # 执行结算(结算 target 的前一天) + date_iso = (target - datetime.timedelta(days=1)).date().isoformat() + print(f"[settlement] Executing settlement, target date: {date_iso}") + ranking = await get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N) + if ranking: + for i, (uid, uname, cnt) in enumerate(ranking): + try: + await add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i + 1}") + except Exception as e: + print(f"[settlement] For users {uid} Failed to issue rewards: {e}") + # 给管理员或群组发送结算通知(可选) + try: + summary_lines = [f"📅 {date_iso} speaker ranking settlement results:"] + for i, (uid, uname, cnt) in enumerate(ranking): + display = f"@{uname}" if uname else str(uid) + summary_lines.append(f"{i + 1}. {display} — {cnt} messages — awarded {DAILY_SPEAK_REWARD} points") + summary = "\n".join(summary_lines) + for gid in ALLOWED_GROUPS: + await bot.send_message(gid, summary) + except Exception as e: + print("[settlement] Failed to send settlement notification:", e) + else: + print("[settlement] No speaking records for the day; skip speaking rewards.") + + # 清空那天的统计 + try: + await reset_daily_messages(date_iso=date_iso) + print(f"[settlement] Cleared daily messages for {date_iso}") + except Exception as e: + print("[settlement] Failed to clear daily_messages:", e) + + +# ---------- 主逻辑 ---------- +async def main(): + await init_db() + + # 使用代理时传 proxy=PROXY,若不需要代理则移除该参数 + # Telethon proxy 格式可能和你的代理不同,请按需调整(或移除) + bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY) + await bot.start(bot_token=BOT_TOKEN) + + # 设置 Bot Commands(让用户可以通过命令菜单选择命令,避免自动添加 @botname) + try: + commands = [ + BotCommand( + command=COMMAND_DEFINITIONS[key]["display"].lstrip("/"), + description=COMMAND_DEFINITIONS[key]["description"], + ) + for key in COMMAND_ORDER + ] + + await bot( + SetBotCommandsRequest( + scope=BotCommandScopeDefault(), + lang_code="en", + commands=commands, + ) + ) + print("[bot] Bot commands set successfully") + except Exception as e: + print(f"[bot] Failed to set bot commands: {e}") + + # 启动每小时发送币价任务(对每个白名单群) + for gid in ALLOWED_GROUPS: + asyncio.create_task(send_price_periodically(bot, gid)) + + # 启动每日结算任务(12:00 America/New_York) + asyncio.create_task(daily_settlement_task(bot)) + + @bot.on(events.ChatAction) + async def welcome(event: events.ChatAction.Event): + # 欢迎新成员并尝试记录邀请者(若存在) + if not (event.user_added or event.user_joined): + return + + # 尝试获取 inviter id(兼容多种 Telethon 结构) + inviter_id = None + try: + # 优先 actor_id(某些 Telethon 版本和场景) + if hasattr(event, "actor_id") and event.actor_id: + inviter_id = event.actor_id + # 再尝试 action_message.from_id(另一些版本) + elif hasattr(event, "action_message") and event.action_message: + maybe = getattr(event.action_message, "from_id", None) + if isinstance(maybe, types.PeerUser): + inviter_id = maybe.user_id + elif isinstance(maybe, int): + inviter_id = maybe + # 在某些情形下 from_id 是 a User 或 MessageEntity,处理兼容性 + elif hasattr(maybe, "user_id"): + inviter_id = getattr(maybe, "user_id", None) + except Exception as e: + print(f"[welcome] Failed to obtain inviter id: {e}") + inviter_id = None + + # 对 event.users 中的新成员逐个处理 + for user in event.users: + try: + new_uid = user.id + new_name = user.username or user.first_name or "New member" + + # 保证用户存在于 users 表中 + await add_or_update_user(new_uid, new_name) + + # 如果找到邀请者并且不是自己,则记录邀请 + if inviter_id and inviter_id != new_uid: + # 确保 inviter 在 users 表中(用户名未知时填 None) + await add_or_update_user(inviter_id, None) + + # 调用 inc_invite 并检查返回值 + success = await inc_invite(inviter_id, new_uid) + if success: + print(f"[welcome] Invite processed: {inviter_id} invited {new_uid}") + else: + print(f"[welcome] Invite already handled or failed: {inviter_id} -> {new_uid}") + elif not inviter_id: + print(f"[welcome] Inviter not found, new member {new_uid} likely joined independently") + else: + print(f"[welcome] Skipping self-invite: {new_uid}") + + # 欢迎消息 + try: + await event.reply(f"Welcome @{new_name} to the group!\n{get_command_list_text()}") + except Exception: + # 某些场景 event.reply 不允许(例如系统消息),忽略 + pass + + except Exception as e: + print(f"[welcome] Failed to handle new member: user_id={user.id if user else 'unknown'}, error={e}") + + @bot.on(events.NewMessage) + async def on_message(event): + if event.chat_id not in ALLOWED_GROUPS: + return + + text = (event.raw_text or "").strip() + if not text: + return + + command_key = resolve_command(text) + if command_key: + if "@" in text: + print(f"[command] Raw: {text} -> Resolved: {get_primary_command_display(command_key)}") + await handle_command(event, text, canonical_cmd=command_key) + return + + identity = await get_sender_identity(event) + if identity: + await add_message_count(identity[0]) + + @bot.on(events.CallbackQuery) + async def callback(event): + # 快速响应 UI + try: + await event.answer("Currently being processed...", alert=False) + except Exception: + pass + + cmd = event.data.decode() if event.data else "" + normalized_cmd = normalize_command(cmd) + + if normalized_cmd.startswith("price_"): + symbol = normalized_cmd.split("_", 1)[1] + await handle_price_command(event, symbol) + else: + command_key = resolve_command(cmd) + if command_key and "@" in cmd: + print(f"[callback] Raw: {cmd} -> Resolved: {get_primary_command_display(command_key)}") + await handle_command(event, cmd, canonical_cmd=command_key) + + print("🤖 Bot is running and waiting for group messages...") + await bot.run_until_disconnected() + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Bot stopped") diff --git a/telegram/haha.session b/telegram/haha.session deleted file mode 100644 index 8c75353..0000000 Binary files a/telegram/haha.session and /dev/null differ diff --git a/telegram/sign.db b/telegram/sign.db index eea0466..aefd235 100644 Binary files a/telegram/sign.db and b/telegram/sign.db differ