This commit is contained in:
ddrwode
2025-11-24 15:11:44 +08:00
parent 3cdddf154d
commit 5ebef9dcbb
8 changed files with 1223 additions and 212 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,22 +1,19 @@
# bot.py
import random
import asyncio import asyncio
import datetime import datetime
import sqlite3 import sqlite3
from zoneinfo import ZoneInfo import random
import requests import requests
from zoneinfo import ZoneInfo
from typing import Optional, Tuple, List
from telethon import TelegramClient, events, Button, types from telethon import TelegramClient, events, Button, types
# ========== 配置区 ========== # ========== 配置区 ==========
API_ID = 2040 API_ID = 2040
API_HASH = "b18441a1ff607e10a989891a5462e627" API_HASH = "b18441a1ff607e10a989891a5462e627"
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
# 数据库文件
DB_PATH = "sign.db" DB_PATH = "sign.db"
# 签到积分、邀请积分、发言奖励 # 积分配置
SIGN_POINTS = 10 SIGN_POINTS = 10
INVITE_POINTS = 10 INVITE_POINTS = 10
DAILY_SPEAK_TOP_N = 10 DAILY_SPEAK_TOP_N = 10
@@ -37,8 +34,8 @@ PROXY = {
'password': "MH8ioL7EXf" 'password': "MH8ioL7EXf"
} }
# 邀请链接 + 币种链接示例(会出现在每条币价消息)
INVITE_LINK = "https://www.websea.my/en/signup?key=77346588" INVITE_LINK = "https://www.websea.my/en/signup?key=77346588"
crypto_currencies = { crypto_currencies = {
"BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"}, "BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"},
"ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"}, "ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"},
@@ -47,77 +44,119 @@ crypto_currencies = {
"WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"} "WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"}
} }
# 主菜单按钮
main_buttons = [ main_buttons = [
[Button.inline("签到", b"/sign"), Button.inline("今日发言排行", b"/daily_rank")], [Button.inline("Sign in", b"/sign"), Button.inline("Today's Top Speakers", b"/daily_rank")],
[Button.inline("我的邀请", b"/my_invites"), Button.inline("币价", b"/btc")], [Button.inline("My invitation", b"/my_invites"), Button.inline("coin price", b"/btc")],
[Button.inline("帮助", b"/help")] [Button.inline("Assistance", b"/help")]
] ]
COMMANDS_TEXT = { COMMANDS_TEXT = {
'/sign': '签到,每天签到一次,获取积分', '/sign': 'Daily check-in, available once per day to earn points',
'签到': '签到,每天签到一次,获取积分', '/daily_rank': 'View today\'s message activity ranking',
'/daily_rank': '查看今日发言排行', '/my_invites': 'View the number of people you have invited',
'/my_invites': '查看你邀请的人数', '/btc': 'Get the Bitcoin price menu',
'/btc': '获取 币价 菜单', '/help': 'Show all available commands and descriptions'
'/help': '显示所有可用命令及说明'
} }
# ============================ # ============================
# ---------- 数据库操作 ---------- # ---------- 全局并发控制 ----------
def get_conn(): # 使用 asyncio.Lock 在同一时间避免大量数据库并发写入(与 to_thread 配合)
DB_LOCK = asyncio.Lock()
# ---------- SQLite 同步底层实现(运行在线程池) ----------
def _get_conn():
# 每次操作创建连接,避免 sqlite 的线程/并发问题 # 每次操作创建连接,避免 sqlite 的线程/并发问题
# 不设置 check_same_thread因为连接会在同一线程to_thread中被使用
return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
def init_db(): def _init_db_sync():
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
# 用户表:保存积分、上次签到日期、累计邀请数
cursor.execute(''' cursor.execute('''
CREATE TABLE IF NOT EXISTS users ( CREATE TABLE IF NOT EXISTS users
user_id INTEGER PRIMARY KEY, (
username TEXT, user_id
points INTEGER DEFAULT 0, INTEGER
last_sign_date TEXT, PRIMARY
total_invites INTEGER DEFAULT 0 KEY,
) username
''') TEXT,
# 每日发言表,复合主键 (user_id, date) points
INTEGER
DEFAULT
0,
last_sign_date
TEXT,
total_invites
INTEGER
DEFAULT
0
)
''')
cursor.execute(''' cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_messages ( CREATE TABLE IF NOT EXISTS daily_messages
user_id INTEGER, (
date TEXT, user_id
count INTEGER DEFAULT 0, INTEGER,
PRIMARY KEY (user_id, date) date
) TEXT,
''') count
# invites 表保存邀请关系inviter_id, invitee_id INTEGER
DEFAULT
0,
PRIMARY
KEY
(
user_id,
date
)
)
''')
cursor.execute(''' cursor.execute('''
CREATE TABLE IF NOT EXISTS invites ( CREATE TABLE IF NOT EXISTS invites
inviter_id INTEGER, (
invitee_id INTEGER, inviter_id
created_at TEXT, INTEGER,
PRIMARY KEY (inviter_id, invitee_id) invitee_id
) INTEGER,
''') created_at
# 日志表:记录每日结算、奖励发放记录(便于审计) TEXT,
PRIMARY
KEY
(
inviter_id,
invitee_id
)
)
''')
cursor.execute(''' cursor.execute('''
CREATE TABLE IF NOT EXISTS points_log ( CREATE TABLE IF NOT EXISTS points_log
id INTEGER PRIMARY KEY AUTOINCREMENT, (
user_id INTEGER, id
points INTEGER, INTEGER
reason TEXT, PRIMARY
created_at TEXT KEY
) AUTOINCREMENT,
''') user_id
INTEGER,
points
INTEGER,
reason
TEXT,
created_at
TEXT
)
''')
conn.commit() conn.commit()
conn.close() conn.close()
def add_or_update_user(user_id, username): def _add_or_update_user_sync(user_id: int, username: str):
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username)) cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
cursor.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id)) cursor.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id))
@@ -125,30 +164,30 @@ def add_or_update_user(user_id, username):
conn.close() conn.close()
def can_sign(user_id): def _get_last_sign_date_sync(user_id: int) -> Optional[str]:
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,)) cursor.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,))
row = cursor.fetchone() row = cursor.fetchone()
conn.close() conn.close()
today = datetime.datetime.now(LOCAL_TZ).date().isoformat() return row[0] if row else None
return False if row and row[0] == today else True
def add_points_immediate(user_id, points, reason=""): def _add_points_immediate_sync(user_id: int, points: int, reason: str = ""):
"""立即增加 points 并记录日志"""
now = datetime.datetime.now(LOCAL_TZ).isoformat() now = datetime.datetime.now(LOCAL_TZ).isoformat()
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("UPDATE users SET points = points + ? WHERE user_id = ?", (points, user_id)) # 如果用户可能不存在,先插入默认记录(避免 update 失败)
cursor.execute("INSERT OR IGNORE INTO users (user_id, username, points) VALUES (?, ?, 0)", (user_id, None))
cursor.execute("UPDATE users SET points = COALESCE(points,0) + ? WHERE user_id = ?", (points, user_id))
cursor.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)", cursor.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)",
(user_id, points, reason, now)) (user_id, points, reason, now))
conn.commit() conn.commit()
conn.close() conn.close()
def get_points(user_id): def _get_points_sync(user_id: int) -> int:
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id = ?", (user_id,)) cursor.execute("SELECT points FROM users WHERE user_id = ?", (user_id,))
row = cursor.fetchone() row = cursor.fetchone()
@@ -156,37 +195,43 @@ def get_points(user_id):
return row[0] if row else 0 return row[0] if row else 0
def set_last_sign(user_id): def _set_last_sign_sync(user_id: int):
today = datetime.datetime.now(LOCAL_TZ).date().isoformat() today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, None))
cursor.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id)) cursor.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id))
conn.commit() conn.commit()
conn.close() conn.close()
def inc_invite(inviter_id, invitee_id): def _insert_invite_sync(inviter_id: int, invitee_id: int) -> bool:
"""记录邀请关系并奖励 inviter只奖励第一次记录""" """
插入邀请记录,返回 True 表示插入成功(即第一次邀请)
如果已存在则抛出 sqlite3.IntegrityError 或返回 False
"""
now = datetime.datetime.now(LOCAL_TZ).isoformat() now = datetime.datetime.now(LOCAL_TZ).isoformat()
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
try: try:
cursor.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)", cursor.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)",
(inviter_id, invitee_id, now)) (inviter_id, invitee_id, now))
# 增加 inviter 的 total_invites # 更新邀请计数
cursor.execute("UPDATE users SET total_invites = total_invites + 1 WHERE user_id = ?", (inviter_id,)) cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (inviter_id, None))
# 立即奖励积分 cursor.execute("UPDATE users SET total_invites = COALESCE(total_invites,0) + 1 WHERE user_id = ?",
add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward") (inviter_id,))
except sqlite3.IntegrityError:
# 已经存在,不重复奖励
pass
finally:
conn.commit() conn.commit()
return True
except sqlite3.IntegrityError:
# 已存在
conn.rollback()
return False
finally:
conn.close() conn.close()
def get_invite_count(user_id): def _get_invite_count_sync(user_id: int) -> int:
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,)) cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,))
count = cursor.fetchone()[0] count = cursor.fetchone()[0]
@@ -194,68 +239,148 @@ def get_invite_count(user_id):
return count return count
# ---------- 发言统计 ---------- def _add_message_count_sync(user_id: int):
def add_message_count(user_id):
today = datetime.datetime.now(LOCAL_TZ).date().isoformat() today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(''' cursor.execute('''
INSERT INTO daily_messages (user_id, date, count) INSERT INTO daily_messages (user_id, date, count)
VALUES (?, ?, 1) VALUES (?, ?, 1) ON CONFLICT(user_id, date) DO
ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1 UPDATE SET count = count + 1
''', (user_id, today)) ''', (user_id, today))
conn.commit() conn.commit()
conn.close() conn.close()
def get_daily_message_ranking(date_iso=None, limit=None): def _get_daily_message_ranking_sync(date_iso: Optional[str] = None, limit: Optional[int] = None) -> List[
Tuple[int, Optional[str], int]]:
date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(''' query = '''
SELECT u.user_id, u.username, d.count SELECT u.user_id, u.username, d.count
FROM daily_messages d FROM daily_messages d
JOIN users u ON d.user_id = u.user_id JOIN users u ON d.user_id = u.user_id
WHERE d.date = ? WHERE d.date = ?
ORDER BY d.count DESC ORDER BY d.count DESC \
{} '''
'''.format(f"LIMIT {limit}" if limit else ""), (date_iso,)) if limit:
query += f" LIMIT {limit}"
cursor.execute(query, (date_iso,))
rows = cursor.fetchall() rows = cursor.fetchall()
conn.close() conn.close()
return rows return rows
def reset_daily_messages(date_iso=None): def _reset_daily_messages_sync(date_iso: Optional[str] = None):
"""删除指定日期的 daily_messages结算后调用"""
date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat() date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = get_conn() conn = _get_conn()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,)) cursor.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,))
conn.commit() conn.commit()
conn.close() conn.close()
# ---------- 币价获取 ---------- # ---------- 异步包装(供 bot 使用) ----------
def get_price(symbol: str): async def init_db():
""" # 初始化 DB放在 to_thread 中)
使用外部 API 获取当日 K 线数据并返回当前价、24h 变化、今日高、今日低 await asyncio.to_thread(_init_db_sync)
这里调用的是 websea 的接口(和你原来一致)
"""
async def add_or_update_user(user_id: int, username: Optional[str]):
username = username or None
# use lock to avoid many threads trying to create the file at once
async with DB_LOCK:
await asyncio.to_thread(_add_or_update_user_sync, user_id, username)
async def can_sign(user_id: int) -> bool:
async with DB_LOCK:
last = await asyncio.to_thread(_get_last_sign_date_sync, user_id)
today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
return False if last and last == today else True
async def add_points_immediate(user_id: int, points: int, reason: str = ""):
async with DB_LOCK:
await asyncio.to_thread(_add_points_immediate_sync, user_id, points, reason)
async def get_points(user_id: int) -> int:
async with DB_LOCK:
return await asyncio.to_thread(_get_points_sync, user_id)
async def set_last_sign(user_id: int):
async with DB_LOCK:
await asyncio.to_thread(_set_last_sign_sync, user_id)
async def inc_invite(inviter_id: int, invitee_id: int):
"""记录邀请关系并奖励 inviter只奖励第一次记录"""
if not inviter_id or not invitee_id:
print(f"[invite] 无效的邀请者或受邀者ID: inviter={inviter_id}, invitee={invitee_id}")
return False
if inviter_id == invitee_id:
print(f"[invite] 不能邀请自己: user_id={inviter_id}")
return False
# 使用锁,保证并发情况下不会重复奖励
async with DB_LOCK:
inserted = await asyncio.to_thread(_insert_invite_sync, inviter_id, invitee_id)
if inserted:
# 插入成功后再给积分(不在 DB_LOCK 内执行,将奖励记录写进 DB
try:
await add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward")
print(
f"[invite] 成功记录邀请并奖励积分: inviter={inviter_id}, invitee={invitee_id}, points={INVITE_POINTS}")
return True
except Exception as e:
print(f"[invite] 奖励积分失败: inviter={inviter_id}, error={e}")
return False
else:
print(f"[invite] 邀请记录已存在(重复邀请): inviter={inviter_id}, invitee={invitee_id}")
return False
async def get_invite_count(user_id: int) -> int:
async with DB_LOCK:
return await asyncio.to_thread(_get_invite_count_sync, user_id)
async def add_message_count(user_id: int):
async with DB_LOCK:
await asyncio.to_thread(_add_message_count_sync, user_id)
async def get_daily_message_ranking(date_iso: Optional[str] = None, limit: Optional[int] = None):
async with DB_LOCK:
return await asyncio.to_thread(_get_daily_message_ranking_sync, date_iso, limit)
async def reset_daily_messages(date_iso: Optional[str] = None):
async with DB_LOCK:
await asyncio.to_thread(_reset_daily_messages_sync, date_iso)
# ---------- 币价获取(阻塞 requests -> 放到线程池) ----------
def _get_price_sync(symbol: str) -> Tuple[float, float, float, float]:
headers = {'user-agent': 'Mozilla/5.0'} headers = {'user-agent': 'Mozilla/5.0'}
tz = LOCAL_TZ tz = LOCAL_TZ
today = datetime.datetime.now(tz).date() today = datetime.datetime.now(tz).date()
start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz) start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz)
end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz) end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz)
params = { params = {
'symbol': symbol, 'symbol': symbol,
'period': '30min', 'period': '30min',
'start': int(start_of_day.timestamp()), 'start': int(start_of_day.timestamp()),
'end': int(end_of_day.timestamp()), 'end': int(end_of_day.timestamp()),
} }
try: try:
response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers, timeout=10) response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers,
timeout=10)
data = response.json().get("result", {}).get("data", []) data = response.json().get("result", {}).get("data", [])
if not data: if not data:
return 0.0, 0.0, 0.0, 0.0 return 0.0, 0.0, 0.0, 0.0
@@ -270,25 +395,33 @@ def get_price(symbol: str):
return 0.0, 0.0, 0.0, 0.0 return 0.0, 0.0, 0.0, 0.0
def get_price_msg(symbol: str): async def get_price(symbol: str):
current_price, change_24h, today_high, today_low = get_price(symbol) return await asyncio.to_thread(_get_price_sync, symbol)
name = symbol.split('-')[0]
url = crypto_currencies.get(name, {}).get("url", "#")
def get_price_msg_from_values(name: str, current_price: float, change_24h: float, today_high: float, today_low: float,
url: str):
msg = ( msg = (
f"📊 {name}/USDT 实时行情\n\n" f"📊 {name}/USDT Real-time Quotes\n\n"
f"💰 当前价格{current_price:.4f} USDT\n" f"💰 Current Price{current_price:.4f} USDT\n"
f"📈 24小时涨跌幅{change_24h:.2f}%\n" f"📈 24-hour price change{change_24h:.2f}%\n"
f"⬆️ 最高价{today_high:.4f} USDT\n" f"⬆️ Highest Price{today_high:.4f} USDT\n"
f"⬇️ 最低价{today_low:.4f} USDT\n\n" f"⬇️ Lowest Price{today_low:.4f} USDT\n\n"
f"🔗 交易链接{url}\n" f"🔗 Transaction Link{url}\n"
f"🎁 注册邀请{INVITE_LINK}\n\n" f"🎁 Registration Invitation{INVITE_LINK}\n\n"
f"此消息每小时自动更新" f"This message is automatically updated hourly."
) )
return msg return msg
async def get_price_msg(symbol: str):
current_price, change_24h, today_high, today_low = await get_price(symbol)
name = symbol.split('-')[0]
url = crypto_currencies.get(name, {}).get("url", "#")
return get_price_msg_from_values(name, current_price, change_24h, today_high, today_low, url)
def get_crypto_buttons(): def get_crypto_buttons():
"""生成币种选择按钮"""
buttons = [] buttons = []
row = [] row = []
for name in crypto_currencies.keys(): for name in crypto_currencies.keys():
@@ -298,50 +431,54 @@ def get_crypto_buttons():
row = [] row = []
if row: if row:
buttons.append(row) buttons.append(row)
buttons.append([Button.inline("返回菜单", b"/help")]) buttons.append([Button.inline("Return to Menu", b"/help")])
return buttons return buttons
# ---------- 机器人命令处理 ---------- # ---------- 命令帮助文本 ----------
def get_command_list_text(): def get_command_list_text():
msg = "🤖 机器人命令列表\n" msg = "🤖 Robot Command List\n"
for cmd, desc in COMMANDS_TEXT.items(): for cmd, desc in COMMANDS_TEXT.items():
msg += f"{cmd}{desc}\n" msg += f"{cmd}{desc}\n"
msg += f"\n签到奖励:每次 {SIGN_POINTS} 积分100积分可兑换 5 USDT 合约手续费代金券。\n发言奖励:每日排行前 {DAILY_SPEAK_TOP_N} 每人 {DAILY_SPEAK_REWARD} 积分。\n邀请奖励:每邀请一人 {INVITE_POINTS} 积分。\n每日结算时间:每天 12:00America/New_York 时区)。" msg += f"\nSign-in Reward: {SIGN_POINTS} points each time, and 100 points can be redeemed for a 5 USDT futures fee voucher.\nChat Reward: The top {DAILY_SPEAK_TOP_N} users in daily chat activity each receive {DAILY_SPEAK_REWARD} points.\nInvitation Reward: Each invited user grants {INVITE_POINTS} points.\nDaily Settlement Time: Every day at 12:00 PM (America/New_York timezone)."
return msg return msg
async def handle_command(event, cmd): # ---------- 事件处理(异步) ----------
async def handle_command(event, cmd: str):
user = await event.get_sender() user = await event.get_sender()
if user is None:
return
user_id = user.id user_id = user.id
username = user.username or user.first_name or "未知用户" username = user.username or user.first_name or "未知用户"
await add_or_update_user(user_id, username)
add_or_update_user(user_id, username)
reply = "" reply = ""
# 统一处理命令名(可能传进来已经小写)
if cmd in ["/sign", "签到", "/tanda"]: if cmd in ["/sign", "sign in","/tanda"]:
if not can_sign(user_id): if not await can_sign(user_id):
reply = f"🌞 @{username}你今天已经签到过啦" reply = f"🌞 @{username}You've already signed in today."
else: else:
add_points_immediate(user_id, SIGN_POINTS, reason="sign") await add_points_immediate(user_id, SIGN_POINTS, reason="sign")
set_last_sign(user_id) await set_last_sign(user_id)
total = get_points(user_id) total = await get_points(user_id)
reply = f"✅ @{username} 签到成功!获得 {SIGN_POINTS} 积分\n当前总积分:{total}\n\n100积分可兑换 5USDT 合约手续费代金券" reply = f"✅ @{username} Check-in successfulobtain {SIGN_POINTS} Points\nCurrent total points{total}\n\n100 points can be redeemed for a 5 USDT contract fee voucher."
elif cmd in ["/daily_rank", "发言", "/kedudukan_harian"]: elif cmd in ["/daily_rank", "rankings","/kedudukan_harian"]:
ranking = get_daily_message_ranking() ranking = await get_daily_message_ranking()
if not ranking: if not ranking:
reply = "📊 今日还没有人发言哦" reply = "📊 No one has posted yet today."
else: else:
reply_lines = [] reply_lines = []
for i, (uid, uname, cnt) in enumerate(ranking): for i, (uid, uname, cnt) in enumerate(ranking):
reply_lines.append(f"{i + 1}. @{uname or uid}: {cnt}") display = f"@{uname}" if uname else str(uid)
reply = "📊 今日发言排行:\n" + "\n".join(reply_lines) reply_lines.append(f"{i + 1}. {display}: {cnt}")
elif cmd in ["/my_invites", "邀请", "/daily_position"]: reply = "📊 Today's Top Speakers\n" + "\n".join(reply_lines)
count = get_invite_count(user_id) elif cmd in ["/my_invites", "invitation","/daily_position"]:
total_pts = get_points(user_id) count = await get_invite_count(user_id)
reply = f"👥 @{username},你邀请了 {count} 人。\n当前积分:{total_pts}" total_pts = await get_points(user_id)
elif cmd in ["/btc", "币价"]: reply = f"👥 @{username}You have invited {count} person。\nCurrent points{total_pts}"
await event.respond("请选择要查看的币种:", buttons=get_crypto_buttons()) elif cmd in ["/btc"]:
await event.respond("Please select the currency you wish to view", buttons=get_crypto_buttons())
return return
else: else:
await event.reply(get_command_list_text(), buttons=main_buttons) await event.reply(get_command_list_text(), buttons=main_buttons)
@@ -352,9 +489,9 @@ async def handle_command(event, cmd):
async def handle_price_command(event, symbol_name: str): async def handle_price_command(event, symbol_name: str):
symbol = crypto_currencies[symbol_name]["type"] symbol = crypto_currencies[symbol_name]["type"]
msg = get_price_msg(symbol) msg = await get_price_msg(symbol)
# event.edit 用于 CallbackQueryevent.respond/edit 都可
try: try:
# CallbackQuery 用 edit 更友好
await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False) await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False)
except Exception: except Exception:
await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False) await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False)
@@ -366,8 +503,7 @@ async def send_price_periodically(bot, chat_id):
while True: while True:
try: try:
random_key = random.choice(list(crypto_currencies.keys())) random_key = random.choice(list(crypto_currencies.keys()))
msg = get_price_msg(crypto_currencies[random_key]["type"]) msg = await get_price_msg(crypto_currencies[random_key]["type"])
# 注意:如果群组比较多,此处简单发送。可优化为只在特定时间或轮询。
await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False) await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False)
except Exception as e: except Exception as e:
print(f"发送价格失败: {e}") print(f"发送价格失败: {e}")
@@ -380,9 +516,10 @@ async def send_price_periodically(bot, chat_id):
# ---------- 定时任务:每天结算(每天 12:00 America/New_York ---------- # ---------- 定时任务:每天结算(每天 12:00 America/New_York ----------
async def daily_settlement_task(bot): async def daily_settlement_task(bot):
"""每天在 LOCAL_TZ 12:00 执行: """
- 计算当日发言排名,给前 N 名每人奖励 每天在 LOCAL_TZ 12:00 执行:
- 记录日志并清空当天的发言统计 - 计算当日发言排名,给前 N 名每人奖励
- 记录日志并清空当天的发言统计
""" """
while True: while True:
now = datetime.datetime.now(LOCAL_TZ) now = datetime.datetime.now(LOCAL_TZ)
@@ -391,35 +528,36 @@ async def daily_settlement_task(bot):
if now >= target: if now >= target:
target = target + datetime.timedelta(days=1) target = target + datetime.timedelta(days=1)
wait_seconds = (target - now).total_seconds() wait_seconds = (target - now).total_seconds()
print(f"[settlement] 下次结算12:00 {target.isoformat()}等待 {int(wait_seconds)} s") print(f"[settlement] Next settlement12:00in {target.isoformat()}Waiting {int(wait_seconds)} s")
await asyncio.sleep(wait_seconds) await asyncio.sleep(wait_seconds)
# 执行结算 # 执行结算(结算 target 的前一天)
date_iso = (target - datetime.timedelta(days=1)).date().isoformat() # 结算前一天(也可结算今天到目前为止) date_iso = (target - datetime.timedelta(days=1)).date().isoformat()
print(f"[settlement] 执行结算,目标日期{date_iso}") print(f"[settlement] Execute settlement, target date{date_iso}")
ranking = get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N) ranking = await get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N)
if ranking: if ranking:
for i, (uid, uname, cnt) in enumerate(ranking): for i, (uid, uname, cnt) in enumerate(ranking):
try: try:
add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i+1}") await add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i + 1}")
except Exception as e: except Exception as e:
print(f"[settlement] 给用户 {uid} 发奖励失败: {e}") print(f"[settlement] For users {uid} Failed to issue rewards: {e}")
# 给管理员或群组发送结算通知(可选) # 给管理员或群组发送结算通知(可选)
try: try:
summary_lines = [f"📅 {date_iso} 发言排行榜结算结果"] summary_lines = [f"📅 {date_iso} Speaker Ranking Settlement Results"]
for i, (uid, uname, cnt) in enumerate(ranking): for i, (uid, uname, cnt) in enumerate(ranking):
summary_lines.append(f"{i+1}. @{uname or uid}{cnt} 条 — 获得 {DAILY_SPEAK_REWARD} 积分") display = f"@{uname}" if uname else str(uid)
summary_lines.append(f"{i + 1}. {display}{cnt} Article — obtain {DAILY_SPEAK_REWARD} Points")
summary = "\n".join(summary_lines) summary = "\n".join(summary_lines)
for gid in ALLOWED_GROUPS: for gid in ALLOWED_GROUPS:
await bot.send_message(gid, summary) await bot.send_message(gid, summary)
except Exception as e: except Exception as e:
print("[settlement] 发送结算通知失败:", e) print("[settlement] Failed to send settlement notification:", e)
else: else:
print("[settlement] 当日无发言记录,跳过发言奖励") print("[settlement] No speaking records for the day; skip speaking rewards.")
# 清空那天的统计 # 清空那天的统计
try: try:
reset_daily_messages(date_iso=date_iso) await reset_daily_messages(date_iso=date_iso)
print(f"[settlement] 清理完成:{date_iso}") print(f"[settlement] 清理完成:{date_iso}")
except Exception as e: except Exception as e:
print("[settlement] 清理 daily_messages 失败:", e) print("[settlement] 清理 daily_messages 失败:", e)
@@ -427,8 +565,10 @@ async def daily_settlement_task(bot):
# ---------- 主逻辑 ---------- # ---------- 主逻辑 ----------
async def main(): async def main():
init_db() await init_db()
# 使用代理时传 proxy=PROXY若不需要代理则移除该参数 # 使用代理时传 proxy=PROXY若不需要代理则移除该参数
# Telethon proxy 格式可能和你的代理不同,请按需调整(或移除)
bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY) bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
await bot.start(bot_token=BOT_TOKEN) await bot.start(bot_token=BOT_TOKEN)
@@ -442,82 +582,109 @@ async def main():
@bot.on(events.ChatAction) @bot.on(events.ChatAction)
async def welcome(event: events.ChatAction.Event): async def welcome(event: events.ChatAction.Event):
# 欢迎新成员并尝试记录邀请者(若存在) # 欢迎新成员并尝试记录邀请者(若存在)
if event.user_added or event.user_joined: if not (event.user_added or event.user_joined):
for user in event.users: return
name = user.username or user.first_name or "新成员"
# 尝试获 inviter id不同版本的 Telethon 结构不完全一致 # 尝试获 inviter id兼容多种 Telethon 结构)
inviter_id = None inviter_id = None
try:
# 优先 actor_id某些 Telethon 版本和场景)
if hasattr(event, "actor_id") and event.actor_id:
inviter_id = event.actor_id
# 再尝试 action_message.from_id另一些版本
elif hasattr(event, "action_message") and event.action_message:
maybe = getattr(event.action_message, "from_id", None)
if isinstance(maybe, types.PeerUser):
inviter_id = maybe.user_id
elif isinstance(maybe, int):
inviter_id = maybe
# 在某些情形下 from_id 是 a User 或 MessageEntity处理兼容性
elif hasattr(maybe, "user_id"):
inviter_id = getattr(maybe, "user_id", None)
except Exception as e:
print(f"[welcome] 获取邀请者ID时出错: {e}")
inviter_id = None
# 对 event.users 中的新成员逐个处理
for user in event.users:
try:
new_uid = user.id
new_name = user.username or user.first_name or "新成员"
# 保证用户存在于 users 表中
await add_or_update_user(new_uid, new_name)
# 如果找到邀请者并且不是自己,则记录邀请
if inviter_id and inviter_id != new_uid:
# 确保 inviter 在 users 表中(用户名未知时填 None
await add_or_update_user(inviter_id, None)
# 调用 inc_invite 并检查返回值
success = await inc_invite(inviter_id, new_uid)
if success:
print(f"[welcome] 成功处理邀请: {inviter_id} 邀请了 {new_uid}")
else:
print(f"[welcome] 邀请处理失败或已存在: {inviter_id} -> {new_uid}")
elif not inviter_id:
print(f"[welcome] 未找到邀请者,新成员 {new_uid} 可能是自己加入的")
else:
print(f"[welcome] 跳过自己邀请自己: {new_uid}")
# 欢迎消息
try: try:
# 若是邀请添加action_message 的 from_id 通常是 inviter await event.reply(f"欢迎 @{new_name} 加入群聊!\n{get_command_list_text()}")
if hasattr(event, 'action_message') and event.action_message:
maybe = getattr(event.action_message, "from_id", None)
if isinstance(maybe, types.PeerUser):
inviter_id = maybe.user_id
elif isinstance(maybe, (int,)):
inviter_id = maybe
# Telethon 也可能直接暴露 actor_id
if getattr(event, "actor_id", None):
inviter_id = event.actor_id
except Exception:
inviter_id = None
# 如果 inviter_id 存在且与新成员不同,则记录邀请并奖励
try:
if inviter_id and inviter_id != user.id:
# 确保 inviter 存在 users 表
add_or_update_user(inviter_id, "unknown")
inc_invite(inviter_id, user.id)
except Exception as e:
print("记录邀请失败:", e)
# 更新/插入新成员到 users 表
add_or_update_user(user.id, user.username or user.first_name or "新成员")
# 欢迎消息(发送到群内)
try:
await event.reply(f"欢迎 @{name} 加入群聊!\n{get_command_list_text()}")
except Exception: except Exception:
# 某些场景 event.reply 不允许(例如系统消息),忽略
pass pass
except Exception as e:
print(f"[welcome] 处理新成员异常: user_id={user.id if user else 'unknown'}, error={e}")
@bot.on(events.NewMessage) @bot.on(events.NewMessage)
async def on_message(event): async def on_message(event):
# 只处理白名单群组 # 只处理白名单群组
if event.chat_id not in ALLOWED_GROUPS: if event.chat_id not in ALLOWED_GROUPS:
return return
# 普通消息处理:命令或记录发言计数
text = (event.raw_text or "").strip() text = (event.raw_text or "").strip()
if not text: if not text:
return return
# 去掉命令的 @botname(如 /sign@mybot # 去掉命令末尾的 @botname
if "@" in text and text.startswith("/"): if "@" in text and text.startswith("/"):
text = text.split("@")[0] text = text.split("@")[0]
# 小写匹配命令(命令与 data 映射)
lowered = text.lower() lowered = text.lower()
# 命令集合
# 命令集合(注意使用小写比较)
cmds_map = { cmds_map = {
"签到": ["/sign", "签到", "/tanda"], "签到": ["/sign", "签到", "/tanda"],
"发言": ["/daily_rank", "发言", "/kedudukan_harian"], "发言": ["/daily_rank", "发言", "/kedudukan_harian"],
"邀请": ["/my_invites", "邀请", "/daily_position"], "邀请": ["/my_invites", "邀请", "/daily_position"],
"币价": ["/btc", "币价"] "币价": ["/btc", "币价"]
} }
for group_cmds in cmds_map.values(): for group_cmds in cmds_map.values():
if lowered in [c.lower() for c in group_cmds]: if lowered in [c.lower() for c in group_cmds]:
# 传入原始命令(保持原有行为)
await handle_command(event, lowered) await handle_command(event, lowered)
return return
# 不是命令记录发言次数 # 命令记录发言次数
sender = await event.get_sender() sender = await event.get_sender()
if sender: if sender:
add_or_update_user(sender.id, sender.username or sender.first_name or "未知用户") await add_or_update_user(sender.id, sender.username or sender.first_name or "未知用户")
add_message_count(sender.id) await add_message_count(sender.id)
@bot.on(events.CallbackQuery) @bot.on(events.CallbackQuery)
async def callback(event): async def callback(event):
await event.answer("正在处理...", alert=False) # 快速响应 UI
cmd = event.data.decode() try:
await event.answer("Currently being processed...", alert=False)
except Exception:
pass
cmd = event.data.decode() if event.data else ""
if "@" in cmd and cmd.startswith("/"): if "@" in cmd and cmd.startswith("/"):
cmd = cmd.split("@")[0] cmd = cmd.split("@")[0]

844
telegram/gpt1.py Normal file
View File

@@ -0,0 +1,844 @@
import asyncio
import datetime
import sqlite3
import random
import requests
from zoneinfo import ZoneInfo
from typing import Optional, Tuple, List
from telethon import TelegramClient, events, Button, types
from telethon.tl.functions.bots import SetBotCommandsRequest
from telethon.tl.types import BotCommand, BotCommandScopeDefault
# ========== 配置区 ==========
API_ID = 2040
API_HASH = "b18441a1ff607e10a989891a5462e627"
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
DB_PATH = "sign.db"
# 积分配置
SIGN_POINTS = 10
INVITE_POINTS = 10
DAILY_SPEAK_TOP_N = 10
DAILY_SPEAK_REWARD = 10
# 允许机器人运行的群组(只处理这些群的消息)
ALLOWED_GROUPS = [-1003238845008]
# 时区(结算规则用)
LOCAL_TZ = ZoneInfo("America/New_York")
# 代理(可选)
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
INVITE_LINK = "https://www.websea.my/en/signup?key=77346588"
crypto_currencies = {
"BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"},
"ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"},
"SOL": {"type": "SOL-USDT", "url": "https://www.websea.com/zh-CN/trade/SOL-USDT"},
"BNB": {"type": "BNB-USDT", "url": "https://www.websea.com/zh-CN/trade/BNB-USDT"},
"WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"}
}
# ---------- 命令解析工具函数 ----------
def normalize_command(cmd: str) -> str:
"""
标准化命令:去除 @botname转换为小写去除前后空格
例如: /sign@docekrsebot -> /sign
支持多种格式: /sign@botname, /sign@BOTNAME, /SIGN@botname 等
"""
if not cmd:
return ""
cmd = cmd.strip()
if "@" in cmd:
cmd = cmd.split("@")[0].strip()
return cmd.lower().strip()
def command_definition(display: str, description: str, aliases: Optional[List[str]] = None) -> dict:
aliases = aliases or []
return {
"display": display,
"description": description,
"aliases": list(set([display] + aliases)),
}
COMMAND_DEFINITIONS = {
"sign": command_definition(
"/sign",
"Daily check-in, available once per day to earn points",
aliases=["sign", "/tanda"],
),
"daily_rank": command_definition(
"/daily_rank",
"View today's message activity ranking",
aliases=["rankings", "/kedudukan_harian"],
),
"my_invites": command_definition(
"/my_invites",
"View the number of people you have invited",
aliases=["invitation", "/daily_position"],
),
"points": command_definition(
"/points",
"View your current points balance",
aliases=["/balance", "/积分", "points"],
),
"btc": command_definition(
"/btc",
"Get the Bitcoin price menu",
aliases=["coin price"],
),
"help": command_definition(
"/help",
"Show all available commands and descriptions",
aliases=["help"],
),
}
COMMAND_ORDER = ["sign", "daily_rank", "my_invites", "points", "btc", "help"]
def build_alias_map() -> dict:
alias_map = {}
for key, info in COMMAND_DEFINITIONS.items():
for alias in info["aliases"]:
alias_map[normalize_command(alias)] = key
alias_map[normalize_command(key)] = key
return alias_map
COMMAND_ALIAS_MAP = build_alias_map()
def resolve_command(cmd: str) -> Optional[str]:
return COMMAND_ALIAS_MAP.get(normalize_command(cmd))
def command_payload(command_key: str) -> bytes:
return COMMAND_DEFINITIONS[command_key]["display"].encode()
main_buttons = [
[Button.inline("Sign in", command_payload("sign")),
Button.inline("Today's Top Speakers", command_payload("daily_rank"))],
[Button.inline("My invitation", command_payload("my_invites")),
Button.inline("coin price", command_payload("btc"))],
[Button.inline("My Points", command_payload("points")), Button.inline("Assistance", command_payload("help"))],
]
# ============================
def get_primary_command_display(command_key: str) -> str:
return COMMAND_DEFINITIONS[command_key]["display"]
# ---------- 全局并发控制 ----------
# 使用 asyncio.Lock 在同一时间避免大量数据库并发写入(与 to_thread 配合)
DB_LOCK = asyncio.Lock()
# ---------- SQLite 同步底层实现(运行在线程池) ----------
def _get_conn():
# 每次操作创建连接,避免 sqlite 的线程/并发问题
# 不设置 check_same_thread因为连接会在同一线程to_thread中被使用
return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
def _init_db_sync():
conn = _get_conn()
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users
(
user_id
INTEGER
PRIMARY
KEY,
username
TEXT,
points
INTEGER
DEFAULT
0,
last_sign_date
TEXT,
total_invites
INTEGER
DEFAULT
0
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_messages
(
user_id
INTEGER,
date
TEXT,
count
INTEGER
DEFAULT
0,
PRIMARY
KEY
(
user_id,
date
)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS invites
(
inviter_id
INTEGER,
invitee_id
INTEGER,
created_at
TEXT,
PRIMARY
KEY
(
inviter_id,
invitee_id
)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS points_log
(
id
INTEGER
PRIMARY
KEY
AUTOINCREMENT,
user_id
INTEGER,
points
INTEGER,
reason
TEXT,
created_at
TEXT
)
''')
conn.commit()
conn.close()
def _add_or_update_user_sync(user_id: int, username: str):
conn = _get_conn()
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
cursor.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id))
conn.commit()
conn.close()
def _get_last_sign_date_sync(user_id: int) -> Optional[str]:
conn = _get_conn()
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,))
row = cursor.fetchone()
conn.close()
return row[0] if row else None
def _add_points_immediate_sync(user_id: int, points: int, reason: str = ""):
now = datetime.datetime.now(LOCAL_TZ).isoformat()
conn = _get_conn()
cursor = conn.cursor()
# 如果用户可能不存在,先插入默认记录(避免 update 失败)
cursor.execute("INSERT OR IGNORE INTO users (user_id, username, points) VALUES (?, ?, 0)", (user_id, None))
cursor.execute("UPDATE users SET points = COALESCE(points,0) + ? WHERE user_id = ?", (points, user_id))
cursor.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)",
(user_id, points, reason, now))
conn.commit()
conn.close()
def _get_points_sync(user_id: int) -> int:
conn = _get_conn()
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id = ?", (user_id,))
row = cursor.fetchone()
conn.close()
return row[0] if row else 0
def _set_last_sign_sync(user_id: int):
today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = _get_conn()
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, None))
cursor.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id))
conn.commit()
conn.close()
def _insert_invite_sync(inviter_id: int, invitee_id: int) -> bool:
"""
插入邀请记录,返回 True 表示插入成功(即第一次邀请)
如果已存在则抛出 sqlite3.IntegrityError 或返回 False
"""
now = datetime.datetime.now(LOCAL_TZ).isoformat()
conn = _get_conn()
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)",
(inviter_id, invitee_id, now))
# 更新邀请计数
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (inviter_id, None))
cursor.execute("UPDATE users SET total_invites = COALESCE(total_invites,0) + 1 WHERE user_id = ?",
(inviter_id,))
conn.commit()
return True
except sqlite3.IntegrityError:
# 已存在
conn.rollback()
return False
finally:
conn.close()
def _get_invite_count_sync(user_id: int) -> int:
conn = _get_conn()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,))
count = cursor.fetchone()[0]
conn.close()
return count
def _add_message_count_sync(user_id: int):
today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = _get_conn()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO daily_messages (user_id, date, count)
VALUES (?, ?, 1) ON CONFLICT(user_id, date) DO
UPDATE SET count = count + 1
''', (user_id, today))
conn.commit()
conn.close()
def _get_daily_message_ranking_sync(date_iso: Optional[str] = None, limit: Optional[int] = None) -> List[
Tuple[int, Optional[str], int]]:
date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = _get_conn()
cursor = conn.cursor()
query = '''
SELECT u.user_id, u.username, d.count
FROM daily_messages d
JOIN users u ON d.user_id = u.user_id
WHERE d.date = ?
ORDER BY d.count DESC \
'''
if limit:
query += f" LIMIT {limit}"
cursor.execute(query, (date_iso,))
rows = cursor.fetchall()
conn.close()
return rows
def _reset_daily_messages_sync(date_iso: Optional[str] = None):
date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = _get_conn()
cursor = conn.cursor()
cursor.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,))
conn.commit()
conn.close()
# ---------- 异步包装(供 bot 使用) ----------
async def init_db():
# 初始化 DB放在 to_thread 中)
await asyncio.to_thread(_init_db_sync)
async def add_or_update_user(user_id: int, username: Optional[str]):
username = username or None
# use lock to avoid many threads trying to create the file at once
async with DB_LOCK:
await asyncio.to_thread(_add_or_update_user_sync, user_id, username)
async def can_sign(user_id: int) -> bool:
async with DB_LOCK:
last = await asyncio.to_thread(_get_last_sign_date_sync, user_id)
today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
return False if last and last == today else True
async def add_points_immediate(user_id: int, points: int, reason: str = ""):
async with DB_LOCK:
await asyncio.to_thread(_add_points_immediate_sync, user_id, points, reason)
async def get_points(user_id: int) -> int:
async with DB_LOCK:
return await asyncio.to_thread(_get_points_sync, user_id)
async def set_last_sign(user_id: int):
async with DB_LOCK:
await asyncio.to_thread(_set_last_sign_sync, user_id)
async def inc_invite(inviter_id: int, invitee_id: int):
"""Record invite relationships and reward the inviter (only first invite)."""
if not inviter_id or not invitee_id:
print(f"[invite] Invalid inviter or invitee id: inviter={inviter_id}, invitee={invitee_id}")
return False
if inviter_id == invitee_id:
print(f"[invite] Cannot invite yourself: user_id={inviter_id}")
return False
# 使用锁,保证并发情况下不会重复奖励
async with DB_LOCK:
inserted = await asyncio.to_thread(_insert_invite_sync, inviter_id, invitee_id)
if inserted:
# 插入成功后再给积分(不在 DB_LOCK 内执行,将奖励记录写进 DB
try:
await add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward")
print(
f"[invite] Invite recorded and points rewarded: inviter={inviter_id}, invitee={invitee_id}, points={INVITE_POINTS}")
return True
except Exception as e:
print(f"[invite] Failed to reward inviter={inviter_id}: {e}")
return False
else:
print(f"[invite] Invite already exists (duplicate): inviter={inviter_id}, invitee={invitee_id}")
return False
async def get_invite_count(user_id: int) -> int:
async with DB_LOCK:
return await asyncio.to_thread(_get_invite_count_sync, user_id)
async def add_message_count(user_id: int):
async with DB_LOCK:
await asyncio.to_thread(_add_message_count_sync, user_id)
async def get_daily_message_ranking(date_iso: Optional[str] = None, limit: Optional[int] = None):
async with DB_LOCK:
return await asyncio.to_thread(_get_daily_message_ranking_sync, date_iso, limit)
async def reset_daily_messages(date_iso: Optional[str] = None):
async with DB_LOCK:
await asyncio.to_thread(_reset_daily_messages_sync, date_iso)
# ---------- 币价获取(阻塞 requests -> 放到线程池) ----------
def _get_price_sync(symbol: str) -> Tuple[float, float, float, float]:
headers = {'user-agent': 'Mozilla/5.0'}
tz = LOCAL_TZ
today = datetime.datetime.now(tz).date()
start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz)
end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz)
params = {
'symbol': symbol,
'period': '30min',
'start': int(start_of_day.timestamp()),
'end': int(end_of_day.timestamp()),
}
try:
response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers,
timeout=10)
data = response.json().get("result", {}).get("data", [])
if not data:
return 0.0, 0.0, 0.0, 0.0
current_price = float(data[0]['close'])
today_high = max(float(i['high']) for i in data)
today_low = min(float(i['low']) for i in data)
price_24h_ago = float(data[-1]['open'])
change_24h = (current_price - price_24h_ago) / price_24h_ago * 100 if price_24h_ago != 0 else 0.0
return current_price, change_24h, today_high, today_low
except Exception as e:
print("[price] Failed to fetch price data:", e)
return 0.0, 0.0, 0.0, 0.0
async def get_price(symbol: str):
return await asyncio.to_thread(_get_price_sync, symbol)
def get_price_msg_from_values(name: str, current_price: float, change_24h: float, today_high: float, today_low: float,
url: str):
msg = (
f"📊 {name}/USDT Real-time Quotes\n\n"
f"💰 Current Price{current_price:.4f} USDT\n"
f"📈 24-hour price change{change_24h:.2f}%\n"
f"⬆️ Highest Price{today_high:.4f} USDT\n"
f"⬇️ Lowest Price{today_low:.4f} USDT\n\n"
f"🔗 Transaction Link{url}\n"
f"🎁 Registration Invitation{INVITE_LINK}\n\n"
f"This message is automatically updated hourly."
)
return msg
async def get_price_msg(symbol: str):
current_price, change_24h, today_high, today_low = await get_price(symbol)
name = symbol.split('-')[0]
url = crypto_currencies.get(name, {}).get("url", "#")
return get_price_msg_from_values(name, current_price, change_24h, today_high, today_low, url)
def get_crypto_buttons():
buttons = []
row = []
for name in crypto_currencies.keys():
row.append(Button.inline(name, f"price_{name}".encode()))
if len(row) == 3:
buttons.append(row)
row = []
if row:
buttons.append(row)
buttons.append([Button.inline("Return to Menu", command_payload("help"))])
return buttons
# ---------- 命令辅助 ----------
async def get_sender_identity(event) -> Optional[Tuple[int, str]]:
user = await event.get_sender()
if user is None:
return None
username = user.username or user.first_name or "Unknown User"
await add_or_update_user(user.id, username)
return user.id, username
async def build_sign_message(user_id: int, username: str) -> str:
if not await can_sign(user_id):
return f"🌞 @{username}, you have already checked in today!"
await add_points_immediate(user_id, SIGN_POINTS, reason="sign")
await set_last_sign(user_id)
total = await get_points(user_id)
return (
f"✅ @{username} check-in succeed! Earned {SIGN_POINTS} points.\n"
f"Current total points: {total}\n\n"
f"100 points can be exchanged for a 5 USDT futures fee voucher."
)
async def build_daily_rank_message() -> str:
ranking = await get_daily_message_ranking()
if not ranking:
return "📊 No one has spoken yet today."
lines = []
for idx, (uid, uname, cnt) in enumerate(ranking, start=1):
mention = f"@{uname}" if uname else str(uid)
lines.append(f"{idx}. {mention}: {cnt} messages")
return "📊 Today's speaker leaderboard:\n" + "\n".join(lines)
async def build_invite_message(user_id: int, username: str) -> str:
count = await get_invite_count(user_id)
total_pts = await get_points(user_id)
return f"👥 @{username}, you have invited {count} users.\nCurrent points: {total_pts}"
async def build_points_message(user_id: int, username: str) -> str:
total_pts = await get_points(user_id)
invite_count = await get_invite_count(user_id)
return (
f"💰 @{username} Points Overview\n\n"
f"📊 Current points: {total_pts}\n"
f"👥 Invited users: {invite_count}\n\n"
f"💡 100 points can be exchanged for a 5 USDT futures fee voucher."
)
def get_command_list_text() -> str:
msg = "🤖 Robot Command List\n\n"
for key in COMMAND_ORDER:
info = COMMAND_DEFINITIONS[key]
msg += f"`{info['display']}` → {info['description']}\n"
msg += (
f"\nSign-in Reward: {SIGN_POINTS} points each time, and 100 points can be redeemed for a 5 USDT futures fee voucher.\n"
f"Chat Reward: The top {DAILY_SPEAK_TOP_N} users in daily chat activity each receive {DAILY_SPEAK_REWARD} points.\n"
f"Invitation Reward: Each invited user grants {INVITE_POINTS} points.\n"
f"Daily Settlement Time: Every day at 12:00 PM (America/New_York timezone)."
)
return msg
async def send_help(event):
await event.reply(get_command_list_text(), buttons=main_buttons, parse_mode="md")
# ---------- 事件处理(异步) ----------
async def handle_command(event, cmd: str, canonical_cmd: Optional[str] = None):
identity = await get_sender_identity(event)
if not identity:
return
user_id, username = identity
command_key = canonical_cmd or resolve_command(cmd)
if not command_key:
await send_help(event)
return
if command_key == "sign":
reply = await build_sign_message(user_id, username)
elif command_key == "daily_rank":
reply = await build_daily_rank_message()
elif command_key == "my_invites":
reply = await build_invite_message(user_id, username)
elif command_key == "points":
reply = await build_points_message(user_id, username)
elif command_key == "btc":
await event.respond("Please select the currency you wish to view", buttons=get_crypto_buttons())
return
elif command_key == "help":
await send_help(event)
return
else:
await send_help(event)
return
await event.reply(reply)
async def handle_price_command(event, symbol_name: str):
symbol = crypto_currencies[symbol_name]["type"]
msg = await get_price_msg(symbol)
try:
# CallbackQuery 用 edit 更友好
await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False)
except Exception:
await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False)
# ---------- 定时任务:每小时推送随机币价 ----------
async def send_price_periodically(bot, chat_id):
await asyncio.sleep(5) # 启动缓冲
while True:
try:
random_key = random.choice(list(crypto_currencies.keys()))
msg = await get_price_msg(crypto_currencies[random_key]["type"])
await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False)
except Exception as e:
print(f"[price] Failed to send price update: {e}")
# 等待下一小时整点(保持每小时推送一次)
now = datetime.datetime.now(LOCAL_TZ)
next_hour = (now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1))
wait_seconds = (next_hour - now).total_seconds()
await asyncio.sleep(wait_seconds)
# ---------- 定时任务:每天结算(每天 12:00 America/New_York ----------
async def daily_settlement_task(bot):
"""
每天在 LOCAL_TZ 12:00 执行:
- 计算当日发言排名,给前 N 名每人奖励
- 记录日志并清空当天的发言统计
"""
while True:
now = datetime.datetime.now(LOCAL_TZ)
# 目标时间:今天 12:00如果已过则到明天
target = now.replace(hour=12, minute=0, second=0, microsecond=0)
if now >= target:
target = target + datetime.timedelta(days=1)
wait_seconds = (target - now).total_seconds()
print(f"[settlement] Next settlement (12:00) at {target.isoformat()}, waiting {int(wait_seconds)} s")
await asyncio.sleep(wait_seconds)
# 执行结算(结算 target 的前一天)
date_iso = (target - datetime.timedelta(days=1)).date().isoformat()
print(f"[settlement] Executing settlement, target date: {date_iso}")
ranking = await get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N)
if ranking:
for i, (uid, uname, cnt) in enumerate(ranking):
try:
await add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i + 1}")
except Exception as e:
print(f"[settlement] For users {uid} Failed to issue rewards: {e}")
# 给管理员或群组发送结算通知(可选)
try:
summary_lines = [f"📅 {date_iso} speaker ranking settlement results:"]
for i, (uid, uname, cnt) in enumerate(ranking):
display = f"@{uname}" if uname else str(uid)
summary_lines.append(f"{i + 1}. {display}{cnt} messages — awarded {DAILY_SPEAK_REWARD} points")
summary = "\n".join(summary_lines)
for gid in ALLOWED_GROUPS:
await bot.send_message(gid, summary)
except Exception as e:
print("[settlement] Failed to send settlement notification:", e)
else:
print("[settlement] No speaking records for the day; skip speaking rewards.")
# 清空那天的统计
try:
await reset_daily_messages(date_iso=date_iso)
print(f"[settlement] Cleared daily messages for {date_iso}")
except Exception as e:
print("[settlement] Failed to clear daily_messages:", e)
# ---------- 主逻辑 ----------
async def main():
await init_db()
# 使用代理时传 proxy=PROXY若不需要代理则移除该参数
# Telethon proxy 格式可能和你的代理不同,请按需调整(或移除)
bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
await bot.start(bot_token=BOT_TOKEN)
# 设置 Bot Commands让用户可以通过命令菜单选择命令避免自动添加 @botname
try:
commands = [
BotCommand(
command=COMMAND_DEFINITIONS[key]["display"].lstrip("/"),
description=COMMAND_DEFINITIONS[key]["description"],
)
for key in COMMAND_ORDER
]
await bot(
SetBotCommandsRequest(
scope=BotCommandScopeDefault(),
lang_code="en",
commands=commands,
)
)
print("[bot] Bot commands set successfully")
except Exception as e:
print(f"[bot] Failed to set bot commands: {e}")
# 启动每小时发送币价任务(对每个白名单群)
for gid in ALLOWED_GROUPS:
asyncio.create_task(send_price_periodically(bot, gid))
# 启动每日结算任务12:00 America/New_York
asyncio.create_task(daily_settlement_task(bot))
@bot.on(events.ChatAction)
async def welcome(event: events.ChatAction.Event):
# 欢迎新成员并尝试记录邀请者(若存在)
if not (event.user_added or event.user_joined):
return
# 尝试获取 inviter id兼容多种 Telethon 结构)
inviter_id = None
try:
# 优先 actor_id某些 Telethon 版本和场景)
if hasattr(event, "actor_id") and event.actor_id:
inviter_id = event.actor_id
# 再尝试 action_message.from_id另一些版本
elif hasattr(event, "action_message") and event.action_message:
maybe = getattr(event.action_message, "from_id", None)
if isinstance(maybe, types.PeerUser):
inviter_id = maybe.user_id
elif isinstance(maybe, int):
inviter_id = maybe
# 在某些情形下 from_id 是 a User 或 MessageEntity处理兼容性
elif hasattr(maybe, "user_id"):
inviter_id = getattr(maybe, "user_id", None)
except Exception as e:
print(f"[welcome] Failed to obtain inviter id: {e}")
inviter_id = None
# 对 event.users 中的新成员逐个处理
for user in event.users:
try:
new_uid = user.id
new_name = user.username or user.first_name or "New member"
# 保证用户存在于 users 表中
await add_or_update_user(new_uid, new_name)
# 如果找到邀请者并且不是自己,则记录邀请
if inviter_id and inviter_id != new_uid:
# 确保 inviter 在 users 表中(用户名未知时填 None
await add_or_update_user(inviter_id, None)
# 调用 inc_invite 并检查返回值
success = await inc_invite(inviter_id, new_uid)
if success:
print(f"[welcome] Invite processed: {inviter_id} invited {new_uid}")
else:
print(f"[welcome] Invite already handled or failed: {inviter_id} -> {new_uid}")
elif not inviter_id:
print(f"[welcome] Inviter not found, new member {new_uid} likely joined independently")
else:
print(f"[welcome] Skipping self-invite: {new_uid}")
# 欢迎消息
try:
await event.reply(f"Welcome @{new_name} to the group!\n{get_command_list_text()}")
except Exception:
# 某些场景 event.reply 不允许(例如系统消息),忽略
pass
except Exception as e:
print(f"[welcome] Failed to handle new member: user_id={user.id if user else 'unknown'}, error={e}")
@bot.on(events.NewMessage)
async def on_message(event):
if event.chat_id not in ALLOWED_GROUPS:
return
text = (event.raw_text or "").strip()
if not text:
return
command_key = resolve_command(text)
if command_key:
if "@" in text:
print(f"[command] Raw: {text} -> Resolved: {get_primary_command_display(command_key)}")
await handle_command(event, text, canonical_cmd=command_key)
return
identity = await get_sender_identity(event)
if identity:
await add_message_count(identity[0])
@bot.on(events.CallbackQuery)
async def callback(event):
# 快速响应 UI
try:
await event.answer("Currently being processed...", alert=False)
except Exception:
pass
cmd = event.data.decode() if event.data else ""
normalized_cmd = normalize_command(cmd)
if normalized_cmd.startswith("price_"):
symbol = normalized_cmd.split("_", 1)[1]
await handle_price_command(event, symbol)
else:
command_key = resolve_command(cmd)
if command_key and "@" in cmd:
print(f"[callback] Raw: {cmd} -> Resolved: {get_primary_command_display(command_key)}")
await handle_command(event, cmd, canonical_cmd=command_key)
print("🤖 Bot is running and waiting for group messages...")
await bot.run_until_disconnected()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Bot stopped")

Binary file not shown.

Binary file not shown.