Files
lm_code/telegram/bot-v3.0.py
2025-11-12 17:37:39 +08:00

335 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import requests
from telethon import TelegramClient, events, Button
import sqlite3
from datetime import date
import os
import asyncio
# ========== Configuration ==========
# Credentials and endpoints can be overridden through environment variables
# so deployments do not have to keep secrets in the source tree.
# NOTE(review): the literal fallbacks are kept only so existing deployments
# keep working unchanged; they are committed credentials and should be
# rotated and removed once the environment variables are provisioned.
API_ID = int(os.environ.get("TG_API_ID", "2040"))
API_HASH = os.environ.get("TG_API_HASH", "b18441a1ff607e10a989891a5462e627")
BOT_TOKEN = os.environ.get(
    "TG_BOT_TOKEN", "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
)
DB_PATH = os.environ.get("SIGN_DB_PATH", "sign.db")  # SQLite database file
SIGN_POINTS = 10  # points granted per daily sign-in
ALLOWED_GROUPS = [-1003238845008]  # the only group chats the bot serves
# Proxy settings handed to TelegramClient(proxy=...).
PROXY = {
    'proxy_type': "socks5",
    'addr': os.environ.get("TG_PROXY_ADDR", "202.155.144.102"),
    'port': int(os.environ.get("TG_PROXY_PORT", "31102")),
    'username': os.environ.get("TG_PROXY_USERNAME", "SyNuejCtrQ"),
    'password': os.environ.get("TG_PROXY_PASSWORD", "MH8ioL7EXf"),
}
# ====================================
# ---------- Command dictionary ----------
# Maps every advertised command to the help text shown next to it.
# The two sign-in aliases share a single description.
COMMANDS = {
    **dict.fromkeys(('/sign', '签到'), '签到,每天签到一次,获取积分'),
    '/daily_rank': '查看今日发言排行',
    '/my_invites': '查看你邀请的人数',
    '/help': '显示所有可用命令及说明',
}
# ------------ Command triggers ------------
# Groups the exact message texts that trigger each feature. The keys are the
# categories looked up by handle_command() and by the message handler.
data = {}
data["签到"] = ['/sign', '签到', "haha"]  # daily sign-in
data["发言"] = ["/daily_rank"]  # today's message ranking
data["邀请"] = ['/my_invites']  # invite counter
def get_command_list_text():
    """Return the help text listing every entry of ``COMMANDS``.

    Returns:
        str: A header line followed by one ``"<command> - <description>"``
        line per command, in ``COMMANDS`` order. Every line, including the
        last one, ends with a newline.
    """
    lines = ["🤖 机器人命令列表:"]
    # A separator keeps the command apart from its description; without it
    # entries such as "签到" + "签到,..." run together and become unreadable.
    lines.extend(f"{cmd} - {desc}" for cmd, desc in COMMANDS.items())
    return "\n".join(lines) + "\n"
# ---------- Database operations ----------
def init_db():
    """Create the bot's SQLite tables in ``DB_PATH`` if they are missing.

    Tables:
        users: one row per user (``user_id``, ``username``, ``points``,
            ``last_sign_date``).
        daily_messages: message counter keyed by ``(user_id, date)``.
        invites: ``(inviter_id, invitee_id)`` pairs.

    Calling it repeatedly is harmless because every statement uses
    ``CREATE TABLE IF NOT EXISTS``. The connection is closed even when a
    statement fails.
    """
    schema = (
        '''
        CREATE TABLE IF NOT EXISTS users
        (
            user_id INTEGER PRIMARY KEY,
            username TEXT,
            points INTEGER DEFAULT 0,
            last_sign_date TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS daily_messages
        (
            user_id INTEGER,
            date TEXT,
            count INTEGER DEFAULT 0,
            PRIMARY KEY(user_id, date)
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS invites
        (
            inviter_id INTEGER,
            invitee_id INTEGER,
            PRIMARY KEY(inviter_id, invitee_id)
        )
        ''',
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        for statement in schema:
            cursor.execute(statement)
        conn.commit()
    finally:
        conn.close()
def add_or_update_user(user_id, username):
    """Insert the user if unknown, otherwise refresh the stored username.

    Args:
        user_id: Primary key of the ``users`` row.
        username: Display name to store; overwrites any previous value.

    Existing ``points`` and ``last_sign_date`` values are left untouched.
    The connection is closed even when the statement fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # One UPSERT replaces the INSERT OR IGNORE + UPDATE pair.
        conn.execute(
            """
            INSERT INTO users (user_id, username)
            VALUES (?, ?)
            ON CONFLICT(user_id) DO UPDATE SET username = excluded.username
            """,
            (user_id, username),
        )
        conn.commit()
    finally:
        conn.close()
def can_sign(user_id):
    """Tell whether the user may still sign in today.

    Args:
        user_id: Primary key of the ``users`` row to check.

    Returns:
        bool: ``False`` only when the stored ``last_sign_date`` equals
        today's ISO date; ``True`` otherwise, including when the user has no
        row or has never signed in.
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            "SELECT last_sign_date FROM users WHERE user_id=?", (user_id,)
        ).fetchone()
    finally:
        conn.close()
    return row is None or row[0] != today
def add_points(user_id, points):
    """Credit points to a user and record today as their last sign-in date.

    Args:
        user_id: Primary key of the ``users`` row to update.
        points: Amount added to the user's current ``points``.

    Despite the generic name, the function always stamps ``last_sign_date``
    with today's ISO date as well. A ``user_id`` without a row is a silent
    no-op because the UPDATE matches nothing. The connection is closed even
    when the statement fails.
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            """
            UPDATE users
            SET points = points + ?,
                last_sign_date = ?
            WHERE user_id = ?
            """,
            (points, today, user_id),
        )
        conn.commit()
    finally:
        conn.close()
def get_points(user_id):
    """Return the user's current point total.

    Args:
        user_id: Primary key of the ``users`` row to read.

    Returns:
        The stored ``points`` value, or ``0`` when the user has no row.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            "SELECT points FROM users WHERE user_id=?", (user_id,)
        ).fetchone()
    finally:
        conn.close()
    return row[0] if row else 0
# ---------- Message statistics ----------
def add_message(user_id):
    """Increment the user's message counter for today.

    Args:
        user_id: Sender whose ``daily_messages`` row is updated.

    The first message of the day creates the ``(user_id, today)`` row with a
    count of 1; later ones add 1 to it. The connection is closed even when
    the statement fails.
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            """
            INSERT INTO daily_messages (user_id, date, count)
            VALUES (?, ?, 1) ON CONFLICT(user_id, date)
            DO UPDATE SET count = count + 1
            """,
            (user_id, today),
        )
        conn.commit()
    finally:
        conn.close()
def get_daily_message_ranking():
    """Return today's message counts, highest first.

    Returns:
        list[tuple]: ``(username, count)`` rows for today's ISO date, ordered
        by ``count`` descending and by ``user_id`` on ties. Empty when nobody
        has posted today.

    A LEFT JOIN is used so that people who posted but have no ``users`` row
    still show up; they are listed under their numeric id. An inner join
    would silently drop them from the ranking.
    """
    today = date.today().isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        return conn.execute(
            """
            SELECT COALESCE(u.username, CAST(d.user_id AS TEXT)), d.count
            FROM daily_messages d
            LEFT JOIN users u ON d.user_id = u.user_id
            WHERE d.date = ?
            ORDER BY d.count DESC, d.user_id ASC
            """,
            (today,),
        ).fetchall()
    finally:
        conn.close()
# ---------- Invite statistics ----------
def add_invite(inviter_id, invitee_id):
    """Record that ``inviter_id`` invited ``invitee_id``.

    Args:
        inviter_id: User credited with the invitation.
        invitee_id: User who was invited.

    Recording the same pair again is a no-op (``INSERT OR IGNORE`` on the
    composite primary key). The connection is closed even when the statement
    fails.

    NOTE(review): nothing in the visible part of this file calls this
    function, so invite counts would stay at zero — confirm where invites
    are supposed to be recorded.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            "INSERT OR IGNORE INTO invites (inviter_id, invitee_id) VALUES (?, ?)",
            (inviter_id, invitee_id),
        )
        conn.commit()
    finally:
        conn.close()
def get_invite_count(inviter_id):
    """Return how many invitees are recorded for ``inviter_id``.

    Args:
        inviter_id: User whose ``invites`` rows are counted.

    Returns:
        int: Number of matching rows; ``0`` when there are none.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        (count,) = conn.execute(
            "SELECT COUNT(*) FROM invites WHERE inviter_id=?", (inviter_id,)
        ).fetchone()
    finally:
        conn.close()
    return count
# ---------- Fetch coin price ----------
def get_price():
    """Fetch today's SOL-USDT 30-minute candles and summarise them.

    "Today" is the current calendar day in UTC+8; the request covers that
    day from 00:00:00 to 23:59:59.

    Returns:
        tuple: ``(current_price, change_pct, today_high, today_low)``.
        ``(0, 0, 0, 0)`` when the API returns no candles.

    Raises:
        requests.RequestException: On network failure, timeout or an HTTP
            error status.
        KeyError, ValueError: If the payload lacks the expected fields.

    This call blocks; async callers should run it in an executor.
    """
    headers = {
        'accept': 'application/json, text/plain, */*',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'cache-control': 'no-cache',
        'dnt': '1',
        'origin': 'https://www.websea.com',
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'referer': 'https://www.websea.com/',
        'sec-ch-ua': '"Chromium";v="142", "Microsoft Edge";v="142", "Not_A Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-site',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0',
    }
    tz = datetime.timezone(datetime.timedelta(hours=8))
    today = datetime.datetime.now(tz).date()
    start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz)
    end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz)
    params = {
        'symbol': 'SOL-USDT',
        'period': '30min',
        'start': int(start_of_day.timestamp()),
        'end': int(end_of_day.timestamp()),
    }
    # A timeout keeps a stalled connection from hanging the caller forever.
    response = requests.get(
        'https://eapi.websea.com/webApi/market/getKline',
        params=params,
        headers=headers,
        timeout=10,
    )
    # Surface HTTP errors here instead of failing later on a bad payload.
    response.raise_for_status()
    klines = response.json()["result"]["data"]
    if not klines:
        return 0, 0, 0, 0
    # NOTE(review): assumes candles arrive newest first, so [0] is the latest
    # candle and [-1] the day's first one — confirm against the API.
    current_price = float(klines[0]['close'])
    today_high = max(float(item['high']) for item in klines)
    today_low = min(float(item['low']) for item in klines)
    # The baseline is the open of the last returned candle, so the "24h"
    # change is really the change across the returned window.
    price_24h_ago = float(klines[-1]['open'])
    if price_24h_ago:
        change_24h = (current_price - price_24h_ago) / price_24h_ago * 100
    else:
        # Guard against a zero baseline instead of raising ZeroDivisionError.
        change_24h = 0.0
    return current_price, change_24h, today_high, today_low
# ---------- Send price messages ----------
async def send_price_periodically(bot, chat_id):
    """Post the SOL/USDT summary to ``chat_id`` once an hour, forever.

    Args:
        bot: Client object exposing an awaitable ``send_message(chat, text)``.
        chat_id: Target chat for the price message.

    Failures to fetch or send are printed and the loop carries on, so one bad
    cycle does not stop later updates. The coroutine only ends when its task
    is cancelled.
    """
    loop = asyncio.get_running_loop()
    while True:
        try:
            # get_price() does blocking HTTP; run it in the default executor
            # so the event loop keeps serving chat updates meanwhile.
            current_price, change_24h, today_high, today_low = (
                await loop.run_in_executor(None, get_price)
            )
            if not current_price:
                # get_price() reports "no data" as zeros; posting 0.00 as a
                # price would be misleading, so skip this cycle.
                print("发送价格失败: 无行情数据")
            else:
                msg = (
                    "SOL/USDT 现货数据\n\n"
                    f"💰价格:{current_price:.2f} USDT\n"
                    f"📈24小时涨跌幅:{change_24h:.2f}%\n"
                    f"⬆️最高价:{today_high:.2f} USDT\n"
                    f"⬇️最低价:{today_low:.2f} USDT"
                )
                await bot.send_message(chat_id, msg)
        except Exception as e:
            # Deliberate best-effort: report and retry on the next cycle.
            print(f"发送价格失败: {e}")
        await asyncio.sleep(3600)  # send once per hour
# ---------- Unified command handling ----------
async def handle_command(event, cmd):
    """Execute one bot command and reply to the triggering event.

    Args:
        event: Event object providing awaitable ``get_sender()`` and
            ``reply(text)``.
        cmd: Command text, e.g. ``"/sign"``; matched against the trigger
            lists in ``data`` and against ``"/help"``.

    The sender is registered or refreshed in ``users`` before dispatch. Every
    reply ends with the command list. ``/help`` replies with the list alone,
    so the list is not printed twice.
    """
    sender = await event.get_sender()
    if sender is None:
        # Without a resolvable sender there is nobody to credit or address.
        return
    user_id = sender.id
    # getattr: not every sender object is guaranteed to carry both fields.
    username = (
        getattr(sender, "username", None)
        or getattr(sender, "first_name", None)
        or "未知用户"
    )
    add_or_update_user(user_id, username)

    if cmd in data["签到"]:
        if not can_sign(user_id):
            reply_text = f"🌞 @{username},你今天已经签到过啦!"
        else:
            add_points(user_id, SIGN_POINTS)
            total = get_points(user_id)
            reply_text = f"✅ @{username} 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}"
    elif cmd in data["发言"]:
        ranking = get_daily_message_ranking()
        if not ranking:
            reply_text = "📊 今日还没有人发言哦~"
        else:
            rows = "".join(
                f"{i}. {uname}: {count}\n"
                for i, (uname, count) in enumerate(ranking, 1)
            )
            reply_text = "📊 今日发言排行:\n" + rows
    elif cmd in data["邀请"]:
        count = get_invite_count(user_id)
        reply_text = f"👥 @{username},你邀请了 {count} 人。"
    elif cmd == '/help':
        # The help text already is the command list; do not append it again.
        await event.reply(get_command_list_text())
        return
    else:
        reply_text = f"⚠️ 未知命令:{cmd}"

    reply_text += "\n\n" + get_command_list_text()
    await event.reply(reply_text)
# ---------- Bot main logic ----------
async def main():
    """Start the bot, register its handlers and run until disconnected.

    Steps:
        1. Create the database tables.
        2. Log in as a bot through the configured proxy.
        3. Start one hourly price task per chat in ``ALLOWED_GROUPS``.
        4. Register the welcome, message, ``/commands`` and button handlers.
           Each of them ignores chats outside ``ALLOWED_GROUPS``.
        5. Block until the client disconnects, then cancel the price tasks.
    """
    init_db()
    bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
    await bot.start(bot_token=BOT_TOKEN)

    # Keep strong references to the tasks: the event loop only holds weak
    # ones, so an unreferenced task may be garbage-collected mid-run.
    price_tasks = [
        asyncio.create_task(send_price_periodically(bot, group_id))
        for group_id in ALLOWED_GROUPS
    ]

    # Welcome newly joined users.
    @bot.on(events.ChatAction)
    async def welcome_new_user(event):
        if event.chat_id not in ALLOWED_GROUPS:
            return
        if event.user_added or event.user_joined:
            for user in event.users:
                username = user.username or user.first_name or "新成员"
                await event.reply(f"欢迎 @{username} 加入群聊!\n{get_command_list_text()}")

    # Plain message handling: run a command or count the message.
    @bot.on(events.NewMessage)
    async def message_handler(event):
        if event.chat_id not in ALLOWED_GROUPS:
            return
        text = event.raw_text.strip()
        # Drop an "@botname" suffix BEFORE matching, so "/sign@MyBot" is
        # recognised. Only slash commands are trimmed, so ordinary text that
        # happens to contain "@" is left intact.
        cmd = text.split('@', 1)[0] if text.startswith('/') else text
        if any(cmd in triggers for triggers in data.values()):
            await handle_command(event, cmd)
            return
        sender = await event.get_sender()
        if sender is None:
            # Nobody to attribute the message to.
            return
        # Register the poster so the daily ranking can show a name for
        # people who never ran a command.
        username = (
            getattr(sender, "username", None)
            or getattr(sender, "first_name", None)
            or "未知用户"
        )
        add_or_update_user(sender.id, username)
        add_message(sender.id)

    # Send the command buttons.
    @bot.on(events.NewMessage(pattern='/commands'))
    async def send_buttons(event):
        if event.chat_id not in ALLOWED_GROUPS:
            return
        buttons = [
            [Button.inline("签到", b"/sign"), Button.inline("今日发言排行", b"/daily_rank")],
            [Button.inline("我的邀请", b"/my_invites"), Button.inline("帮助", b"/help")]
        ]
        await event.reply("请选择命令:", buttons=buttons)

    # Handle button clicks.
    @bot.on(events.CallbackQuery)
    async def callback_handler(event):
        if event.chat_id not in ALLOWED_GROUPS:
            # Still acknowledge the click so the client stops waiting.
            await event.answer()
            return
        # Payloads come from our own buttons; tolerate stray bytes anyway.
        cmd = event.data.decode(errors="replace")
        await handle_command(event, cmd)
        await event.answer()

    print("🤖 机器人已启动,等待群聊消息...")
    try:
        await bot.run_until_disconnected()
    finally:
        # Stop the hourly loops so the event loop can shut down cleanly.
        for task in price_tasks:
            task.cancel()


if __name__ == "__main__":
    asyncio.run(main())