Files
lm_code/telegram/bot-v3.0.py
2025-11-13 10:31:13 +08:00

385 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import contextlib
import datetime
import os
import random
import sqlite3
from datetime import date

import requests
from telethon import TelegramClient, events, Button
# ========== Configuration ==========
# SECURITY NOTE(review): the literal credentials below are committed to source
# control and should be treated as leaked -- rotate them and supply the real
# values through the environment variables read here. The literals are kept
# only as backward-compatible fallbacks for existing deployments.
API_ID = int(os.environ.get("TG_API_ID", "2040"))
API_HASH = os.environ.get("TG_API_HASH", "b18441a1ff607e10a989891a5462e627")
BOT_TOKEN = os.environ.get(
    "TG_BOT_TOKEN", "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
)
DB_PATH = "sign.db"  # SQLite database file used by every DB helper
SIGN_POINTS = 10  # points granted per successful daily sign-in
ALLOWED_GROUPS = [-1003238845008]  # chat ids the message handler reacts to
# Proxy settings handed to TelegramClient(proxy=...).
PROXY = {
    'proxy_type': "socks5",
    'addr': "202.155.144.102",
    'port': 31102,
    'username': os.environ.get("TG_PROXY_USERNAME", "SyNuejCtrQ"),
    'password': os.environ.get("TG_PROXY_PASSWORD", "MH8ioL7EXf"),
}
# Referral link appended to every price message.
INVITE_LINK = "https://www.websea.my/en/signup?key=77346588"
# ===================================
# Command reference shown to users: command text -> human-readable description.
COMMANDS = {
    "/sign": "签到,每天签到一次,获取积分",
    "签到": "签到,每天签到一次,获取积分",
    "/daily_rank": "查看今日发言排行",
    "/my_invites": "查看你邀请的人数",
    # "/btc": "获取 BTC/USDT 最新价格",
    "/help": "显示所有可用命令及说明",
}

# Command groups: feature name -> every alias that triggers that feature.
data = {
    "签到": ["/sign", "签到", "/tanda"],
    "发言": ["/daily_rank", "发言", "/kedudukan_harian"],
    "邀请": ["/my_invites", "邀请", "/daily_position"],
    "币价": ["/btc", "btc"],
}
# Supported coins: ticker -> kline symbol ("type") and the trade-page URL.
# Every entry follows the same "<TICKER>-USDT" pattern, so the table is derived
# from the ordered ticker list.
crypto_currencies = {
    ticker: {
        "type": f"{ticker}-USDT",
        "url": f"https://www.websea.com/zh-CN/trade/{ticker}-USDT",
    }
    for ticker in ("BTC", "ETH", "SOL", "BNB", "WBS")
}
# Main-menu inline keyboard. Each button's callback payload is the command
# string that the callback handler forwards to handle_command.
main_buttons = [
    [
        Button.inline("签到", b"/sign"),
        Button.inline("今日发言排行", b"/daily_rank"),
    ],
    [
        Button.inline("我的邀请", b"/my_invites"),
        Button.inline("币价", b"/btc"),
    ],
    [
        Button.inline("帮助", b"/help"),
    ],
]
def get_command_list_text(commands=None):
    """Render the command reference as a multi-line help message.

    Args:
        commands: Optional mapping of command text -> description to render.
            Defaults to the module-level ``COMMANDS`` table.

    Returns:
        The header line followed by one ``<command> - <description>`` line per
        entry; every line (including the header) ends with a newline.
    """
    if commands is None:
        commands = COMMANDS
    # The " - " separator keeps the command token from running straight into
    # its description (previously the two were concatenated with nothing
    # between them, e.g. "/sign签到,...").
    lines = [f"{cmd} - {desc}\n" for cmd, desc in commands.items()]
    return "🤖 机器人命令列表:\n" + "".join(lines)
# ---------- 数据库 ----------
def init_db():
    """Create the bot's SQLite tables in ``DB_PATH`` if they do not exist.

    Tables:
        users: ``user_id`` (primary key), ``username``, ``points`` (default 0)
            and ``last_sign_date``.
        daily_messages: per-user, per-date message ``count`` (default 0),
            keyed on ``(user_id, date)``.
        invites: ``(inviter_id, invitee_id)`` pairs, keyed on both columns.
    """
    # closing() guarantees the connection is released even if a statement
    # raises; the inner ``with conn`` commits on success / rolls back on error.
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    user_id INTEGER PRIMARY KEY,
                    username TEXT,
                    points INTEGER DEFAULT 0,
                    last_sign_date TEXT
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS daily_messages (
                    user_id INTEGER,
                    date TEXT,
                    count INTEGER DEFAULT 0,
                    PRIMARY KEY (user_id, date)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS invites (
                    inviter_id INTEGER,
                    invitee_id INTEGER,
                    PRIMARY KEY (inviter_id, invitee_id)
                )
            ''')
def add_or_update_user(user_id, username):
    """Insert the user if unknown, otherwise refresh the stored username.

    Args:
        user_id: Value stored in ``users.user_id`` (the table's primary key).
        username: Display name to store for that user.
    """
    # Single upsert (same ON CONFLICT style as add_message) instead of
    # INSERT OR IGNORE followed by UPDATE. closing() releases the connection
    # even when the statement fails.
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            conn.execute(
                "INSERT INTO users (user_id, username) VALUES (?, ?) "
                "ON CONFLICT(user_id) DO UPDATE SET username = excluded.username",
                (user_id, username),
            )
def can_sign(user_id):
    """Return whether the user may still sign in today.

    Args:
        user_id: Key looked up in ``users.user_id``.

    Returns:
        ``False`` when the stored ``last_sign_date`` equals today's ISO date
        (``date.today()``), ``True`` otherwise -- including when the user has
        no row at all.
    """
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            "SELECT last_sign_date FROM users WHERE user_id=?", (user_id,)
        ).fetchone()
    today = date.today().isoformat()
    return not (row and row[0] == today)
def add_points(user_id, points):
    """Add points to a user and stamp today as their last sign-in date.

    Args:
        user_id: Key of the ``users`` row to update; nothing happens if the
            row does not exist (plain UPDATE).
        points: Amount added to the user's current ``points``.
    """
    today = date.today().isoformat()
    # closing() releases the connection even if the UPDATE raises; the inner
    # ``with conn`` commits on success and rolls back on error.
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            conn.execute(
                "UPDATE users SET points = points + ?, last_sign_date = ? WHERE user_id = ?",
                (points, today, user_id),
            )
def get_points(user_id):
    """Return the user's stored point total.

    Args:
        user_id: Key looked up in ``users.user_id``.

    Returns:
        The ``points`` column for that user, or ``0`` when no row exists.
    """
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            "SELECT points FROM users WHERE user_id=?", (user_id,)
        ).fetchone()
    return row[0] if row else 0
# ---------- 发言统计 ----------
def add_message(user_id):
    """Increment the user's message counter for today.

    Inserts a ``daily_messages`` row with ``count = 1`` for
    ``(user_id, today)`` or, if that row already exists, bumps its count by
    one.

    Args:
        user_id: Id of the user whose message is being counted.
    """
    today = date.today().isoformat()
    # closing() releases the connection even if the upsert raises.
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            conn.execute(
                '''
                INSERT INTO daily_messages (user_id, date, count)
                VALUES (?, ?, 1)
                ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1
                ''',
                (user_id, today),
            )
def get_daily_message_ranking():
    """Return today's message counts, highest first.

    Returns:
        A list of ``(name, count)`` tuples for every ``daily_messages`` row
        dated today. ``name`` is the stored username, or the numeric user id
        rendered as text when the speaker has no ``users`` row.
    """
    today = date.today().isoformat()
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        # LEFT JOIN: an inner join silently dropped everyone who had spoken
        # but was never registered in ``users``. The secondary sort key makes
        # the order of ties deterministic.
        rows = conn.execute(
            '''
            SELECT COALESCE(u.username, CAST(d.user_id AS TEXT)), d.count
            FROM daily_messages d
            LEFT JOIN users u ON d.user_id = u.user_id
            WHERE d.date = ?
            ORDER BY d.count DESC, d.user_id ASC
            ''',
            (today,),
        ).fetchall()
    return rows
# ---------- 币价 ----------
def get_price(symbol: str):
    """Fetch today's 30-minute klines for *symbol* and summarise them.

    The query window is the current calendar day in UTC+8.

    Args:
        symbol: Trading pair passed to the API as ``symbol`` (e.g. the
            ``"type"`` values in ``crypto_currencies``).

    Returns:
        A tuple ``(current_price, change_pct, high, low)``. All four values
        are ``0`` when the API returns no candles or the request/parsing
        fails (the failure is printed, not raised).
    """
    headers = {'user-agent': 'Mozilla/5.0'}
    tz = datetime.timezone(datetime.timedelta(hours=8))
    today = datetime.datetime.now(tz).date()
    start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz)
    end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz)
    params = {
        'symbol': symbol,
        'period': '30min',
        'start': int(start_of_day.timestamp()),
        'end': int(end_of_day.timestamp()),
    }
    try:
        response = requests.get(
            'https://eapi.websea.com/webApi/market/getKline',
            params=params,
            headers=headers,
            timeout=10,
        )
        # Surface HTTP errors explicitly instead of failing later on a
        # missing key in an error body.
        response.raise_for_status()
        # Named ``candles`` so it does not shadow the module-level ``data``.
        candles = response.json()["result"]["data"]
        if not candles:
            return 0, 0, 0, 0
        # NOTE(review): index 0 is treated as the latest candle and index -1
        # as the earliest, i.e. the API is assumed to return newest-first --
        # confirm against the API documentation.
        current_price = float(candles[0]['close'])
        today_high = max(float(c['high']) for c in candles)
        today_low = min(float(c['low']) for c in candles)
        # Because the window starts at today's midnight (UTC+8), this is the
        # change since the day's first candle in the response.
        price_24h_ago = float(candles[-1]['open'])
        change_24h = (current_price - price_24h_ago) / price_24h_ago * 100
        return current_price, change_24h, today_high, today_low
    except (
        requests.RequestException,  # network failure, timeout, HTTP error
        KeyError, IndexError, TypeError, ValueError,  # unexpected payload
        ZeroDivisionError,  # opening price of 0
    ) as e:
        # Best-effort: callers get an all-zero tuple instead of an exception.
        print("获取币价失败:", e)
        return 0, 0, 0, 0
def get_price_msg(symbol: str):
    """Build the Markdown price message for a trading pair.

    Args:
        symbol: Pair such as ``"BTC-USDT"``; the part before the first ``-``
            is used as the coin name and to look up its trade URL in
            ``crypto_currencies`` (falling back to ``"#"``).

    Returns:
        The formatted quote followed by the trade and invite links. When
        ``get_price`` reports no data (all zeros), an "unavailable" notice
        with the same links is returned instead of a quote.
    """
    current_price, change_24h, today_high, today_low = get_price(symbol)
    name = symbol.split('-')[0]
    url = crypto_currencies.get(name, {}).get("url", "#")
    links = (
        f"🔗 交易链接：[点击前往]({url})\n"
        f"🎁 注册邀请：[立即加入]({INVITE_LINK})"
    )
    if not current_price:
        # get_price returns zeros on failure / empty data; showing that as a
        # real "0.0000 USDT" quote would be misleading.
        return f"⚠️ 暂时无法获取 {name}/USDT 行情,请稍后再试。\n\n" + links
    return (
        f"📊 {name}/USDT 实时行情\n\n"
        f"💰 当前价格：{current_price:.4f} USDT\n"
        # "：" added so this line matches the label style of its siblings.
        f"📈 24小时涨跌幅：{change_24h:.2f}%\n"
        f"⬆️ 最高价：{today_high:.4f} USDT\n"
        f"⬇️ 最低价：{today_low:.4f} USDT\n\n"
        + links
    )
def get_crypto_buttons():
    """Build the inline keyboard used to pick a coin.

    Returns:
        A list of button rows: the coins from ``crypto_currencies`` laid out
        three per row (callback payload ``price_<NAME>``), followed by a final
        row holding the "back to menu" button.
    """
    per_row = 3
    coin_buttons = [
        Button.inline(ticker, f"price_{ticker}".encode())
        for ticker in crypto_currencies
    ]
    # Slice the flat button list into consecutive rows of ``per_row`` buttons;
    # the last row may be shorter.
    keyboard = [
        coin_buttons[start:start + per_row]
        for start in range(0, len(coin_buttons), per_row)
    ]
    keyboard.append([Button.inline("返回菜单", b"/help")])
    return keyboard
# ---------- 命令处理 ----------
async def handle_command(event, cmd):
    """Execute one bot command and answer in the chat.

    Dispatches on the command groups in ``data``: sign-in, daily message
    ranking, invite count and the price menu. Any other command gets the
    command list together with the main-menu buttons.

    Args:
        event: The Telethon event (new message or button callback) that
            triggered the command.
        cmd: Command text, matched against the alias lists in ``data``.
    """
    sender = await event.get_sender()
    user_id = username = None
    if sender is not None:
        user_id = sender.id
        # getattr: the sender is not always a user object with a first_name
        # (e.g. a channel), which previously raised AttributeError.
        username = (
            getattr(sender, "username", None)
            or getattr(sender, "first_name", None)
            or "未知用户"
        )
        add_or_update_user(user_id, username)

    is_sign = cmd in data["签到"]
    is_invites = cmd in data["邀请"]
    if sender is None and (is_sign or is_invites):
        # These commands are per-user; without a resolvable sender there is
        # nobody to credit or look up.
        return

    if is_sign:
        if not can_sign(user_id):
            reply = f"🌞 @{username},你今天已经签到过啦!"
        else:
            add_points(user_id, SIGN_POINTS)
            total = get_points(user_id)
            reply = f"✅ @{username} 签到成功!获得 {SIGN_POINTS} 积分\n当前总积分:{total}"
    elif cmd in data["发言"]:
        ranking = get_daily_message_ranking()
        if not ranking:
            reply = "📊 今日还没有人发言哦~"
        else:
            reply = "📊 今日发言排行:\n" + "\n".join(
                f"{i + 1}. {u}: {c}" for i, (u, c) in enumerate(ranking)
            )
    elif is_invites:
        # closing() releases the connection even if the query raises.
        with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
            count = conn.execute(
                "SELECT COUNT(*) FROM invites WHERE inviter_id=?", (user_id,)
            ).fetchone()[0]
        reply = f"👥 @{username},你邀请了 {count} 人。"
    elif cmd in data["币价"]:
        await event.respond("请选择要查看的币种:", buttons=get_crypto_buttons())
        return
    else:
        await event.reply(get_command_list_text(), buttons=main_buttons)
        return
    await event.reply(reply)
# ---------- 价格按钮处理 ----------
async def handle_price_command(event, symbol_name: str):
    """Replace the clicked message with the latest quote for one coin.

    Args:
        event: The button-callback event whose message is edited in place.
        symbol_name: Key into ``crypto_currencies`` (e.g. ``"BTC"``). Unknown
            names are ignored.
    """
    # telethon is already a dependency of this file; imported locally so this
    # function does not rely on a change to the file-level imports.
    from telethon.errors import MessageNotModifiedError

    entry = crypto_currencies.get(symbol_name)
    if entry is None:
        # Callback payloads come from the client; do not crash on an
        # unexpected one.
        return
    # get_price_msg performs a blocking HTTP request (up to 10 s timeout);
    # run it in the default executor so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    msg = await loop.run_in_executor(None, get_price_msg, entry["type"])
    try:
        await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False)
    except MessageNotModifiedError:
        # The quote text is identical to what the message already shows
        # (same coin clicked again, price unchanged); nothing to update.
        pass
# ---------- 主逻辑 ----------
async def main():
    """Start the bot, register its handlers and run until disconnected.

    Initialises the database, logs in with the bot token, starts one
    periodic price-broadcast task per chat in ``ALLOWED_GROUPS`` and wires up
    three handlers: a welcome message for new members, command routing /
    message counting for chat messages, and inline-button callbacks.
    """
    init_db()
    bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
    await bot.start(bot_token=BOT_TOKEN)

    # Periodic price broadcast. Keep references to the tasks: the event loop
    # only holds weak references, so an unreferenced task can be garbage
    # collected mid-execution.
    price_tasks = [
        asyncio.create_task(send_price_periodically(bot, group_id))
        for group_id in ALLOWED_GROUPS
    ]

    # Every text routed to handle_command: all aliases in ``data`` plus the
    # advertised COMMANDS keys. The latter adds "/help", which is listed in
    # the help text but absent from ``data`` and was therefore never handled.
    known_commands = {alias.lower() for aliases in data.values() for alias in aliases}
    known_commands.update(cmd.lower() for cmd in COMMANDS)

    # Welcome new members.
    @bot.on(events.ChatAction)
    async def welcome(event):
        if event.user_added or event.user_joined:
            for user in event.users:
                name = user.username or user.first_name or "新成员"
                await event.reply(f"欢迎 @{name} 加入群聊!\n{get_command_list_text()}")

    # Regular chat messages.
    @bot.on(events.NewMessage)
    async def on_message(event):
        if event.chat_id not in ALLOWED_GROUPS:
            return
        if event.out:
            # Never count or react to the bot's own messages.
            return
        text = event.raw_text.strip().lower()
        # Strip the "@botname" suffix, e.g. /sign@dockersebot -> /sign
        if "@" in text:
            text = text.split("@")[0]
        if text in known_commands:
            await handle_command(event, text)
            return
        sender = await event.get_sender()
        if sender is None:
            # No resolvable sender: nobody to credit the message to.
            return
        # Register the speaker first: the ranking reads names from ``users``,
        # so people who never issued a command were missing from it.
        username = (
            getattr(sender, "username", None)
            or getattr(sender, "first_name", None)
            or "未知用户"
        )
        add_or_update_user(sender.id, username)
        add_message(sender.id)

    # Inline-button clicks.
    @bot.on(events.CallbackQuery)
    async def callback(event):
        await event.answer("正在处理...", alert=False)
        cmd = event.data.decode()
        # Same "@botname" stripping as for text commands.
        if "@" in cmd:
            cmd = cmd.split("@")[0]
        if cmd.startswith("price_"):
            symbol = cmd.split("_")[1]
            await handle_price_command(event, symbol)
        else:
            await handle_command(event, cmd)

    print("🤖 机器人已启动,等待群聊消息...")
    try:
        await bot.run_until_disconnected()
    finally:
        # Stop the broadcast loops once the client is gone.
        for task in price_tasks:
            task.cancel()
# ---------- 定时币价 ----------
async def send_price_periodically(bot, chat_id):
    """Post the quote of a randomly chosen coin to a chat once per hour.

    Runs forever; a failed fetch or send is printed and the loop carries on
    with the next cycle.

    Args:
        bot: Connected client used to send the message.
        chat_id: Destination chat.
    """
    loop = asyncio.get_running_loop()
    while True:
        try:
            coin = random.choice(list(crypto_currencies))
            # get_price_msg performs a blocking HTTP request; run it in the
            # default executor so the handlers stay responsive.
            msg = await loop.run_in_executor(
                None, get_price_msg, crypto_currencies[coin]["type"]
            )
            await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False)
        except asyncio.CancelledError:
            # Cancellation must end the loop; on Python 3.7 CancelledError is
            # an Exception subclass and would otherwise be swallowed below.
            raise
        except Exception as e:
            # Boundary of a long-running background loop: log and continue.
            print(f"发送价格失败: {e}")
        await asyncio.sleep(3600)  # one broadcast per hour
# Script entry point: run the bot only when executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())