rgfewfger

This commit is contained in:
27942
2025-11-20 18:12:52 +08:00
parent b1e6f681c1
commit 0e50249ab2
10 changed files with 560 additions and 587 deletions

View File

@@ -1,233 +0,0 @@
from telethon import TelegramClient, events
import sqlite3
from datetime import date
import os
import asyncio
# ========== 配置区 ==========
API_ID = 2040
API_HASH = "b18441a1ff607e10a989891a5462e627"
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
DB_PATH = "sign.db"
SIGN_POINTS = 10
ALLOWED_GROUPS = [-1003238845008] # 只允许的群聊
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
# ============================
# ---------- 数据库操作 ----------
def init_db():
"""初始化数据库,创建缺失表"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 用户积分与签到
cursor.execute('''
CREATE TABLE IF NOT EXISTS users
(
user_id
INTEGER
PRIMARY
KEY,
username
TEXT,
points
INTEGER
DEFAULT
0,
last_sign_date
TEXT
)
''')
# 发言统计
cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_messages
(
user_id
INTEGER,
date
TEXT,
count
INTEGER
DEFAULT
0,
PRIMARY
KEY
(
user_id,
date
)
)
''')
# 邀请统计
cursor.execute('''
CREATE TABLE IF NOT EXISTS invites
(
inviter_id
INTEGER,
invitee_id
INTEGER
PRIMARY
KEY
)
''')
conn.commit()
conn.close()
def add_or_update_user(user_id, username):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
cursor.execute("UPDATE users SET username=? WHERE user_id=?", (username, user_id))
conn.commit()
conn.close()
def can_sign(user_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
today = date.today().isoformat()
conn.close()
return False if row and row[0] == today else True
def add_points(user_id, points):
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE users
SET points = points + ?,
last_sign_date = ?
WHERE user_id = ?
""", (points, today, user_id))
conn.commit()
conn.close()
def get_points(user_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,))
points = cursor.fetchone()
conn.close()
return points[0] if points else 0
# ---------- 发言统计 ----------
def add_message(user_id):
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO daily_messages (user_id, date, count)
VALUES (?, ?, 1) ON CONFLICT(user_id, date)
DO
UPDATE SET count = count + 1
""", (user_id, today))
conn.commit()
conn.close()
def get_daily_message_ranking():
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT u.username, d.count
FROM daily_messages d
JOIN users u ON d.user_id = u.user_id
WHERE d.date = ?
ORDER BY d.count DESC
""", (today,))
results = cursor.fetchall()
conn.close()
return results
# ---------- 邀请统计 ----------
def add_invite(inviter_id, invitee_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO invites (inviter_id, invitee_id) VALUES (?, ?)", (inviter_id, invitee_id))
conn.commit()
conn.close()
def get_invite_count(inviter_id):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id=?", (inviter_id,))
count = cursor.fetchone()[0]
conn.close()
return count
# ---------- Bot 主逻辑 ----------
async def main():
# 初始化数据库,保证缺失表也会创建
init_db()
bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
await bot.start(bot_token=BOT_TOKEN)
@bot.on(events.NewMessage)
async def message_handler(event):
# 限制群聊
if event.chat_id not in ALLOWED_GROUPS:
return
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
add_or_update_user(user_id, username)
# 只统计普通消息,不统计命令
if not event.raw_text.startswith('/'):
add_message(user_id)
# 处理签到命令
if event.raw_text in ['签到', '/sign']:
if not can_sign(user_id):
await event.reply(f"🌞 @{username},你今天已经签到过啦!")
return
add_points(user_id, SIGN_POINTS)
total = get_points(user_id)
await event.reply(f"✅ @{username} 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}")
# 查询每日发言排行
elif event.raw_text == '/daily_rank':
ranking = get_daily_message_ranking()
if not ranking:
await event.reply("📊 今日还没有人发言哦~")
return
msg = "📊 今日发言排行:\n"
for i, (uname, count) in enumerate(ranking, 1):
msg += f"{i}. {uname}: {count}\n"
await event.reply(msg)
# 查询自己邀请人数
elif event.raw_text == '/my_invites':
count = get_invite_count(user_id)
await event.reply(f"👥 @{username},你邀请了 {count} 人。")
print("🤖 签到机器人已启动,等待群聊消息...")
await bot.run_until_disconnected()
if __name__ == "__main__":
asyncio.run(main())

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,135 +0,0 @@
from telethon import TelegramClient, events
import sqlite3
from datetime import date
import os
# ========== 配置区 ==========
API_ID = 2040 # ← 替换为你的 Telegram API ID
API_HASH = "b18441a1ff607e10a989891a5462e627" # ← 替换为你的 API HASH
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y" # ← 替换为你的 Bot Token
DB_PATH = "sign.db" # 数据库文件
SIGN_POINTS = 10 # 每次签到获得的积分
# ============================
# ---------- 数据库操作 ----------
def init_db():
"""初始化数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
points INTEGER DEFAULT 0,
last_sign_date TEXT
)
''')
conn.commit()
conn.close()
def get_user(user_id):
"""获取用户信息"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE user_id=?", (user_id,))
user = cursor.fetchone()
conn.close()
return user
def add_or_update_user(user_id, username):
"""如果用户不存在则新增"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
conn.commit()
conn.close()
def can_sign(user_id):
"""判断今天是否可以签到"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
today = date.today().isoformat()
conn.close()
if row and row[0] == today:
return False
return True
def add_points(user_id, points):
"""给用户加积分并更新签到日期"""
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE users
SET points = points + ?, last_sign_date = ?
WHERE user_id = ?
""", (points, today, user_id))
conn.commit()
conn.close()
def get_points(user_id):
"""获取用户当前积分"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,))
points = cursor.fetchone()
conn.close()
return points[0] if points else 0
# ---------- 机器人主逻辑 ----------
def main():
# 初始化数据库
if not os.path.exists(DB_PATH):
init_db()
proxy = {
'proxy_type': "socks5", # 或 'socks4',具体看你的代理类型
'addr': "202.155.144.102", # 代理服务器地址
'port': 31102, # 代理服务器端口
'username': "SyNuejCtrQ", # 如果有用户名,填写
'password': "MH8ioL7EXf" # 如果有密码,填写
}
# proxy = ("socks5", "202.155.144.102", 31102, True, "SyNuejCtrQ", "MH8ioL7EXf")
bot = TelegramClient(
'haha',
api_id=API_ID,
api_hash=API_HASH,
proxy=proxy
).start(bot_token=BOT_TOKEN)
@bot.on(events.NewMessage(pattern='^(签到|/sign)$'))
async def sign_handler(event):
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
# 注册用户
add_or_update_user(user_id, username)
# 检查是否已签到
if not can_sign(user_id):
await event.reply(f"🌞 {username},你今天已经签到过啦!")
return
# 签到成功,加积分
add_points(user_id, SIGN_POINTS)
total = get_points(user_id)
await event.reply(f"✅ 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}")
print("🤖 签到机器人已启动,等待消息中...")
bot.run_until_disconnected()
if __name__ == "__main__":
main()

View File

@@ -1,161 +0,0 @@
from telethon import TelegramClient, events
from telethon.tl.functions.messages import ImportChatInviteRequest
import sqlite3
from datetime import date
import os
import asyncio
# ========== 配置区 ==========
API_ID = 2040
API_HASH = "b18441a1ff607e10a989891a5462e627"
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
DB_PATH = "sign.db"
SIGN_POINTS = 10
INVITE_LINK = "ergggreef" # t.me/后面的群链接部分
# 代理配置,如果需要
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
# ============================
# ---------- 数据库操作 ----------
def init_db():
"""初始化数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
points INTEGER DEFAULT 0,
last_sign_date TEXT
)
''')
conn.commit()
conn.close()
def get_user(user_id):
"""获取用户信息"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE user_id=?", (user_id,))
user = cursor.fetchone()
conn.close()
return user
def add_or_update_user(user_id, username):
"""如果用户不存在则新增"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
conn.commit()
conn.close()
def can_sign(user_id):
"""判断今天是否可以签到"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id=?", (user_id,))
row = cursor.fetchone()
today = date.today().isoformat()
conn.close()
if row and row[0] == today:
return False
return True
def add_points(user_id, points):
"""给用户加积分并更新签到日期"""
today = date.today().isoformat()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE users
SET points = points + ?, last_sign_date = ?
WHERE user_id = ?
""", (points, today, user_id))
conn.commit()
conn.close()
def get_points(user_id):
"""获取用户当前积分"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id=?", (user_id,))
points = cursor.fetchone()
conn.close()
return points[0] if points else 0
# ---------- 机器人主逻辑 ----------
async def main():
# 初始化数据库
if not os.path.exists(DB_PATH):
init_db()
bot = TelegramClient(
'bot_session',
api_id=API_ID,
api_hash=API_HASH,
proxy=PROXY
)
await bot.start(bot_token=BOT_TOKEN) # 注意这里是 await
# # 尝试加入群聊并获取 chat_id
# try:
# result = await bot(ImportChatInviteRequest(INVITE_LINK))
# chat_id = result.chats[0].id
# print(f"✅ 已加入群聊: {result.chats[0].title}, chat_id={chat_id}")
# except Exception as e:
# # 如果已经加入,遍历对话获取 chat_id
# async for dialog in bot.iter_dialogs():
# if dialog.name and INVITE_LINK in dialog.name: # 简单匹配群名
# chat_id = dialog.id
# print(f"✅ 已存在群聊: {dialog.name}, chat_id={chat_id}")
# break
# else:
# print("❌ 无法加入或找到群聊,请检查邀请链接")
# return
ALLOWED_GROUPS = [-1003238845008]
@bot.on(events.NewMessage(pattern='^(签到|/sign)$'))
async def sign_handler(event):
# 检查是否在允许的群聊中
if event.chat_id not in ALLOWED_GROUPS:
return # 非指定群聊,忽略
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
# 注册用户
add_or_update_user(user_id, username)
# 检查是否已签到
if not can_sign(user_id):
await event.reply(f"🌞 @{username},你今天已经签到过啦!")
return
# 签到成功,加积分
add_points(user_id, SIGN_POINTS)
total = get_points(user_id)
await event.reply(f"✅ @{username} 签到成功!你获得 {SIGN_POINTS} 积分。\n当前总积分:{total}")
print("🤖 签到机器人已启动,等待群聊消息...")
await bot.run_until_disconnected()
if __name__ == "__main__":
asyncio.run(main())

538
telegram/gpt.py Normal file
View File

@@ -0,0 +1,538 @@
# bot.py
import random
import asyncio
import datetime
import sqlite3
from zoneinfo import ZoneInfo
import requests
from telethon import TelegramClient, events, Button, types
# ========== 配置区 ==========
API_ID = 2040
API_HASH = "b18441a1ff607e10a989891a5462e627"
BOT_TOKEN = "8451724418:AAGTGqCmc1JiUr88IABhMiQTHeVLcAcnT5Y"
# 数据库文件
DB_PATH = "sign.db"
# 签到积分、邀请积分、发言奖励
SIGN_POINTS = 10
INVITE_POINTS = 10
DAILY_SPEAK_TOP_N = 10
DAILY_SPEAK_REWARD = 10
# 允许机器人运行的群组(只处理这些群的消息)
ALLOWED_GROUPS = [-1003238845008]
# 时区(结算规则用)
LOCAL_TZ = ZoneInfo("America/New_York")
# 代理(可选)
PROXY = {
'proxy_type': "socks5",
'addr': "202.155.144.102",
'port': 31102,
'username': "SyNuejCtrQ",
'password': "MH8ioL7EXf"
}
# 邀请链接 + 币种链接示例(会出现在每条币价消息)
INVITE_LINK = "https://www.websea.my/en/signup?key=77346588"
crypto_currencies = {
"BTC": {"type": "BTC-USDT", "url": "https://www.websea.com/zh-CN/trade/BTC-USDT"},
"ETH": {"type": "ETH-USDT", "url": "https://www.websea.com/zh-CN/trade/ETH-USDT"},
"SOL": {"type": "SOL-USDT", "url": "https://www.websea.com/zh-CN/trade/SOL-USDT"},
"BNB": {"type": "BNB-USDT", "url": "https://www.websea.com/zh-CN/trade/BNB-USDT"},
"WBS": {"type": "WBS-USDT", "url": "https://www.websea.com/zh-CN/trade/WBS-USDT"}
}
# 主菜单按钮
main_buttons = [
[Button.inline("签到", b"/sign"), Button.inline("今日发言排行", b"/daily_rank")],
[Button.inline("我的邀请", b"/my_invites"), Button.inline("币价", b"/btc")],
[Button.inline("帮助", b"/help")]
]
COMMANDS_TEXT = {
'/sign': '签到,每天签到一次,获取积分',
'签到': '签到,每天签到一次,获取积分',
'/daily_rank': '查看今日发言排行',
'/my_invites': '查看你邀请的人数',
'/btc': '获取 币价 菜单',
'/help': '显示所有可用命令及说明'
}
# ============================
# ---------- 数据库操作 ----------
def get_conn():
# 每次操作创建连接,避免 sqlite 的线程/并发问题
return sqlite3.connect(DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
def init_db():
conn = get_conn()
cursor = conn.cursor()
# 用户表:保存积分、上次签到日期、累计邀请数
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
points INTEGER DEFAULT 0,
last_sign_date TEXT,
total_invites INTEGER DEFAULT 0
)
''')
# 每日发言表,复合主键 (user_id, date)
cursor.execute('''
CREATE TABLE IF NOT EXISTS daily_messages (
user_id INTEGER,
date TEXT,
count INTEGER DEFAULT 0,
PRIMARY KEY (user_id, date)
)
''')
# invites 表保存邀请关系inviter_id, invitee_id
cursor.execute('''
CREATE TABLE IF NOT EXISTS invites (
inviter_id INTEGER,
invitee_id INTEGER,
created_at TEXT,
PRIMARY KEY (inviter_id, invitee_id)
)
''')
# 日志表:记录每日结算、奖励发放记录(便于审计)
cursor.execute('''
CREATE TABLE IF NOT EXISTS points_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
points INTEGER,
reason TEXT,
created_at TEXT
)
''')
conn.commit()
conn.close()
def add_or_update_user(user_id, username):
conn = get_conn()
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO users (user_id, username) VALUES (?, ?)", (user_id, username))
cursor.execute("UPDATE users SET username = ? WHERE user_id = ?", (username, user_id))
conn.commit()
conn.close()
def can_sign(user_id):
conn = get_conn()
cursor = conn.cursor()
cursor.execute("SELECT last_sign_date FROM users WHERE user_id = ?", (user_id,))
row = cursor.fetchone()
conn.close()
today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
return False if row and row[0] == today else True
def add_points_immediate(user_id, points, reason=""):
"""立即增加 points 并记录日志"""
now = datetime.datetime.now(LOCAL_TZ).isoformat()
conn = get_conn()
cursor = conn.cursor()
cursor.execute("UPDATE users SET points = points + ? WHERE user_id = ?", (points, user_id))
cursor.execute("INSERT INTO points_log (user_id, points, reason, created_at) VALUES (?, ?, ?, ?)",
(user_id, points, reason, now))
conn.commit()
conn.close()
def get_points(user_id):
conn = get_conn()
cursor = conn.cursor()
cursor.execute("SELECT points FROM users WHERE user_id = ?", (user_id,))
row = cursor.fetchone()
conn.close()
return row[0] if row else 0
def set_last_sign(user_id):
today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = get_conn()
cursor = conn.cursor()
cursor.execute("UPDATE users SET last_sign_date = ? WHERE user_id = ?", (today, user_id))
conn.commit()
conn.close()
def inc_invite(inviter_id, invitee_id):
"""记录邀请关系并奖励 inviter只奖励第一次记录"""
now = datetime.datetime.now(LOCAL_TZ).isoformat()
conn = get_conn()
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO invites (inviter_id, invitee_id, created_at) VALUES (?, ?, ?)",
(inviter_id, invitee_id, now))
# 增加 inviter 的 total_invites
cursor.execute("UPDATE users SET total_invites = total_invites + 1 WHERE user_id = ?", (inviter_id,))
# 立即奖励积分
add_points_immediate(inviter_id, INVITE_POINTS, reason="invite_reward")
except sqlite3.IntegrityError:
# 已经存在,不重复奖励
pass
finally:
conn.commit()
conn.close()
def get_invite_count(user_id):
conn = get_conn()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM invites WHERE inviter_id = ?", (user_id,))
count = cursor.fetchone()[0]
conn.close()
return count
# ---------- 发言统计 ----------
def add_message_count(user_id):
today = datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = get_conn()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO daily_messages (user_id, date, count)
VALUES (?, ?, 1)
ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1
''', (user_id, today))
conn.commit()
conn.close()
def get_daily_message_ranking(date_iso=None, limit=None):
date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = get_conn()
cursor = conn.cursor()
cursor.execute('''
SELECT u.user_id, u.username, d.count
FROM daily_messages d
JOIN users u ON d.user_id = u.user_id
WHERE d.date = ?
ORDER BY d.count DESC
{}
'''.format(f"LIMIT {limit}" if limit else ""), (date_iso,))
rows = cursor.fetchall()
conn.close()
return rows
def reset_daily_messages(date_iso=None):
"""删除指定日期的 daily_messages结算后调用"""
date_iso = date_iso or datetime.datetime.now(LOCAL_TZ).date().isoformat()
conn = get_conn()
cursor = conn.cursor()
cursor.execute("DELETE FROM daily_messages WHERE date = ?", (date_iso,))
conn.commit()
conn.close()
# ---------- 币价获取 ----------
def get_price(symbol: str):
"""
使用外部 API 获取当日 K 线数据并返回当前价、24h 变化、今日高、今日低
这里调用的是 websea 的接口(和你原来一致)
"""
headers = {'user-agent': 'Mozilla/5.0'}
tz = LOCAL_TZ
today = datetime.datetime.now(tz).date()
start_of_day = datetime.datetime.combine(today, datetime.time.min, tzinfo=tz)
end_of_day = datetime.datetime.combine(today, datetime.time.max, tzinfo=tz)
params = {
'symbol': symbol,
'period': '30min',
'start': int(start_of_day.timestamp()),
'end': int(end_of_day.timestamp()),
}
try:
response = requests.get('https://eapi.websea.com/webApi/market/getKline', params=params, headers=headers, timeout=10)
data = response.json().get("result", {}).get("data", [])
if not data:
return 0.0, 0.0, 0.0, 0.0
current_price = float(data[0]['close'])
today_high = max(float(i['high']) for i in data)
today_low = min(float(i['low']) for i in data)
price_24h_ago = float(data[-1]['open'])
change_24h = (current_price - price_24h_ago) / price_24h_ago * 100 if price_24h_ago != 0 else 0.0
return current_price, change_24h, today_high, today_low
except Exception as e:
print("获取币价失败:", e)
return 0.0, 0.0, 0.0, 0.0
def get_price_msg(symbol: str):
current_price, change_24h, today_high, today_low = get_price(symbol)
name = symbol.split('-')[0]
url = crypto_currencies.get(name, {}).get("url", "#")
msg = (
f"📊 {name}/USDT 实时行情\n\n"
f"💰 当前价格:{current_price:.4f} USDT\n"
f"📈 24小时涨跌幅{change_24h:.2f}%\n"
f"⬆️ 最高价:{today_high:.4f} USDT\n"
f"⬇️ 最低价:{today_low:.4f} USDT\n\n"
f"🔗 交易链接:{url}\n"
f"🎁 注册邀请:{INVITE_LINK}\n\n"
f"(此消息每小时自动更新)"
)
return msg
def get_crypto_buttons():
"""生成币种选择按钮"""
buttons = []
row = []
for name in crypto_currencies.keys():
row.append(Button.inline(name, f"price_{name}".encode()))
if len(row) == 3:
buttons.append(row)
row = []
if row:
buttons.append(row)
buttons.append([Button.inline("返回菜单", b"/help")])
return buttons
# ---------- 机器人命令处理 ----------
def get_command_list_text():
msg = "🤖 机器人命令列表:\n"
for cmd, desc in COMMANDS_TEXT.items():
msg += f"{cmd}{desc}\n"
msg += f"\n签到奖励:每次 {SIGN_POINTS} 积分100积分可兑换 5 USDT 合约手续费代金券。\n发言奖励:每日排行前 {DAILY_SPEAK_TOP_N} 每人 {DAILY_SPEAK_REWARD} 积分。\n邀请奖励:每邀请一人 {INVITE_POINTS} 积分。\n每日结算时间:每天 12:00America/New_York 时区)。"
return msg
async def handle_command(event, cmd):
user = await event.get_sender()
user_id = user.id
username = user.username or user.first_name or "未知用户"
add_or_update_user(user_id, username)
reply = ""
if cmd in ["/sign", "签到", "/tanda"]:
if not can_sign(user_id):
reply = f"🌞 @{username},你今天已经签到过啦!"
else:
add_points_immediate(user_id, SIGN_POINTS, reason="sign")
set_last_sign(user_id)
total = get_points(user_id)
reply = f"✅ @{username} 签到成功!获得 {SIGN_POINTS} 积分\n当前总积分:{total}\n\n100积分可兑换 5USDT 合约手续费代金券。"
elif cmd in ["/daily_rank", "发言", "/kedudukan_harian"]:
ranking = get_daily_message_ranking()
if not ranking:
reply = "📊 今日还没有人发言哦~"
else:
reply_lines = []
for i, (uid, uname, cnt) in enumerate(ranking):
reply_lines.append(f"{i + 1}. @{uname or uid}: {cnt}")
reply = "📊 今日发言排行:\n" + "\n".join(reply_lines)
elif cmd in ["/my_invites", "邀请", "/daily_position"]:
count = get_invite_count(user_id)
total_pts = get_points(user_id)
reply = f"👥 @{username},你邀请了 {count} 人。\n当前积分:{total_pts}"
elif cmd in ["/btc", "币价"]:
await event.respond("请选择要查看的币种:", buttons=get_crypto_buttons())
return
else:
await event.reply(get_command_list_text(), buttons=main_buttons)
return
await event.reply(reply)
async def handle_price_command(event, symbol_name: str):
symbol = crypto_currencies[symbol_name]["type"]
msg = get_price_msg(symbol)
# event.edit 用于 CallbackQueryevent.respond/edit 都可
try:
await event.edit(msg, buttons=get_crypto_buttons(), link_preview=False)
except Exception:
await event.respond(msg, buttons=get_crypto_buttons(), link_preview=False)
# ---------- 定时任务:每小时推送随机币价 ----------
async def send_price_periodically(bot, chat_id):
await asyncio.sleep(5) # 启动缓冲
while True:
try:
random_key = random.choice(list(crypto_currencies.keys()))
msg = get_price_msg(crypto_currencies[random_key]["type"])
# 注意:如果群组比较多,此处简单发送。可优化为只在特定时间或轮询。
await bot.send_message(chat_id, msg, buttons=get_crypto_buttons(), link_preview=False)
except Exception as e:
print(f"发送价格失败: {e}")
# 等待下一小时整点(保持每小时推送一次)
now = datetime.datetime.now(LOCAL_TZ)
next_hour = (now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1))
wait_seconds = (next_hour - now).total_seconds()
await asyncio.sleep(wait_seconds)
# ---------- 定时任务:每天结算(每天 12:00 America/New_York ----------
async def daily_settlement_task(bot):
"""每天在 LOCAL_TZ 12:00 执行:
- 计算当日发言排名,给前 N 名每人奖励
- 记录日志并清空当天的发言统计
"""
while True:
now = datetime.datetime.now(LOCAL_TZ)
# 目标时间:今天 12:00如果已过则到明天
target = now.replace(hour=12, minute=0, second=0, microsecond=0)
if now >= target:
target = target + datetime.timedelta(days=1)
wait_seconds = (target - now).total_seconds()
print(f"[settlement] 下次结算12:00{target.isoformat()},等待 {int(wait_seconds)} s")
await asyncio.sleep(wait_seconds)
# 执行结算
date_iso = (target - datetime.timedelta(days=1)).date().isoformat() # 结算前一天(也可结算今天到目前为止)
print(f"[settlement] 执行结算,目标日期:{date_iso}")
ranking = get_daily_message_ranking(date_iso=date_iso, limit=DAILY_SPEAK_TOP_N)
if ranking:
for i, (uid, uname, cnt) in enumerate(ranking):
try:
add_points_immediate(uid, DAILY_SPEAK_REWARD, reason=f"daily_speak_rank_{i+1}")
except Exception as e:
print(f"[settlement] 给用户 {uid} 发奖励失败: {e}")
# 给管理员或群组发送结算通知(可选)
try:
summary_lines = [f"📅 {date_iso} 发言排行榜结算结果:"]
for i, (uid, uname, cnt) in enumerate(ranking):
summary_lines.append(f"{i+1}. @{uname or uid}{cnt} 条 — 获得 {DAILY_SPEAK_REWARD} 积分")
summary = "\n".join(summary_lines)
for gid in ALLOWED_GROUPS:
await bot.send_message(gid, summary)
except Exception as e:
print("[settlement] 发送结算通知失败:", e)
else:
print("[settlement] 当日无发言记录,跳过发言奖励")
# 清空那天的统计
try:
reset_daily_messages(date_iso=date_iso)
print(f"[settlement] 清理完成:{date_iso}")
except Exception as e:
print("[settlement] 清理 daily_messages 失败:", e)
# ---------- 主逻辑 ----------
async def main():
init_db()
# 使用代理时传 proxy=PROXY若不需要代理则移除该参数
bot = TelegramClient('bot_session', API_ID, API_HASH, proxy=PROXY)
await bot.start(bot_token=BOT_TOKEN)
# 启动每小时发送币价任务(对每个白名单群)
for gid in ALLOWED_GROUPS:
asyncio.create_task(send_price_periodically(bot, gid))
# 启动每日结算任务12:00 America/New_York
asyncio.create_task(daily_settlement_task(bot))
@bot.on(events.ChatAction)
async def welcome(event: events.ChatAction.Event):
# 欢迎新成员并尝试记录邀请者(若存在)
if event.user_added or event.user_joined:
for user in event.users:
name = user.username or user.first_name or "新成员"
# 尝试获得 inviter id不同版本的 Telethon 结构不完全一致)
inviter_id = None
try:
# 若是邀请添加action_message 的 from_id 通常是 inviter
if hasattr(event, 'action_message') and event.action_message:
maybe = getattr(event.action_message, "from_id", None)
if isinstance(maybe, types.PeerUser):
inviter_id = maybe.user_id
elif isinstance(maybe, (int,)):
inviter_id = maybe
# Telethon 也可能直接暴露 actor_id
if getattr(event, "actor_id", None):
inviter_id = event.actor_id
except Exception:
inviter_id = None
# 如果 inviter_id 存在且与新成员不同,则记录邀请并奖励
try:
if inviter_id and inviter_id != user.id:
# 确保 inviter 存在 users 表
add_or_update_user(inviter_id, "unknown")
inc_invite(inviter_id, user.id)
except Exception as e:
print("记录邀请失败:", e)
# 更新/插入新成员到 users 表
add_or_update_user(user.id, user.username or user.first_name or "新成员")
# 欢迎消息(发送到群内)
try:
await event.reply(f"欢迎 @{name} 加入群聊!\n{get_command_list_text()}")
except Exception:
pass
@bot.on(events.NewMessage)
async def on_message(event):
# 只处理白名单群组
if event.chat_id not in ALLOWED_GROUPS:
return
# 普通消息处理:命令或记录发言计数
text = (event.raw_text or "").strip()
if not text:
return
# 去掉命令中的 @botname如 /sign@mybot
if "@" in text and text.startswith("/"):
text = text.split("@")[0]
# 小写匹配命令(命令与 data 映射)
lowered = text.lower()
# 命令集合
cmds_map = {
"签到": ["/sign", "签到", "/tanda"],
"发言": ["/daily_rank", "发言", "/kedudukan_harian"],
"邀请": ["/my_invites", "邀请", "/daily_position"],
"币价": ["/btc", "币价"]
}
for group_cmds in cmds_map.values():
if lowered in [c.lower() for c in group_cmds]:
await handle_command(event, lowered)
return
# 不是命令则记录发言次数
sender = await event.get_sender()
if sender:
add_or_update_user(sender.id, sender.username or sender.first_name or "未知用户")
add_message_count(sender.id)
@bot.on(events.CallbackQuery)
async def callback(event):
await event.answer("正在处理...", alert=False)
cmd = event.data.decode()
if "@" in cmd and cmd.startswith("/"):
cmd = cmd.split("@")[0]
if cmd.startswith("price_"):
symbol = cmd.split("_", 1)[1]
await handle_price_command(event, symbol)
else:
await handle_command(event, cmd)
print("🤖 机器人已启动,等待群聊消息...")
await bot.run_until_disconnected()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("机器人已停止")

Binary file not shown.

80
test.py
View File

@@ -1,64 +1,28 @@
import requests
from cozepy import Coze, TokenAuth, Message, ChatEventType, COZE_CN_BASE_URL
cookies = {
'guest_id_marketing': 'v1%3A176354098710586324',
'guest_id_ads': 'v1%3A176354098710586324',
'guest_id': 'v1%3A176354098710586324',
'gt': '1991061326962270389',
'__cuid': 'bc4bb3318b8746c4960a0edfb2220910',
'personalization_id': '"v1_zni2BqKNlpGzMk7sPKb7TQ=="',
'g_state': '{"i_l":0,"i_ll":1763541027555,"i_b":"vumczuwzuH950z/ce0D0r85AlyqFZdXKjwakaa0Q3OY"}',
'kdt': 'UsPg3G0YUE9AX1KekidFHURQ3sLU6MxeO0KgEAQx',
'auth_token': '7775fb70c540a53a25bf1f0cd46f3cdc20a1d02c',
'ct0': 'c6fc686e2feb83cbd8f08f2e38e2261b05b67b3746bc0c10adf50a7ed3fc0946995fc4c38401bde2a3f34adeb7bcf83e94a35a5bfce0f7ede162301ff303fcb78b1907ba4566f3069c02972749f719b8',
'att': '1-SKx1SRcfVBgEIiHRpu5ZTxfSRLbmpbnUOZ9mMQWi',
'twid': 'u%3D1528694652034875394',
'lang': 'en',
'__cf_bm': 'hwKktTMrrT16g.Eupzx3L5Q1RkycVS0bopGHPlEGi8o-1763606210.1419973-1.0.1.1-I.kWkIZwO.KM3mg8xf14I5ndINV63tz9hRYAqy_7Cs0DCh_lR_AOG9UFK6mUqKm5FIBpXyNSHjSEPKP6wSMlRN103lj_8QWAXhEnqV9Xk7GJqb0LUueVH1j4Q8Eq3F6u',
}
headers = {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9',
'authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA',
'cache-control': 'no-cache',
'content-type': 'application/json',
'dnt': '1',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://x.com/JagadeeshHubba3',
'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
'x-client-transaction-id': 'cforl8McAciujd9vK8bXw0NJX7cBMczX3+rDSnXU3SWD6Nnm9IqlSr5LtJ7Jmn27NiJqvnW0b/nXyTJykFpXZ0pTuU1mcg',
'x-csrf-token': 'c6fc686e2feb83cbd8f08f2e38e2261b05b67b3746bc0c10adf50a7ed3fc0946995fc4c38401bde2a3f34adeb7bcf83e94a35a5bfce0f7ede162301ff303fcb78b1907ba4566f3069c02972749f719b8',
'x-twitter-active-user': 'yes',
'x-twitter-auth-type': 'OAuth2Session',
'x-twitter-client-language': 'en',
'x-xp-forwarded-for': 'b1d57f4ffda0ecfb39dbf4b1ead15e7950587f64103ecba2f31a4026c56f9d1457780d41080fb47e8508903974f4c44118fd9c7037b0a006e051b0e750bcecdfec6e7278438cbdc92c3e957124c35a869b4b1eec498ae784c5961dedbdbcf96baf6e92090400ffb4d7b980b4a965acb18b3e97f729d4864c8a1b0afc6ddaf9ca179a0741824472bcdeab611904e469eec5407fdb86a3b277b4a80994124da61318c5a59b29b12bdc5522de3cddfd682a8cbd51ef5abf12d2ca1efbbc06f1e247c9021109970a00de0f62cd3726da858620c251185d1a934c50850d413f3524cedafa387cd1b2b86f697abb64a95d2612fab8803b342e45065ab7ea',
# 'cookie': 'guest_id_marketing=v1%3A176354098710586324; guest_id_ads=v1%3A176354098710586324; guest_id=v1%3A176354098710586324; gt=1991061326962270389; __cuid=bc4bb3318b8746c4960a0edfb2220910; personalization_id="v1_zni2BqKNlpGzMk7sPKb7TQ=="; g_state={"i_l":0,"i_ll":1763541027555,"i_b":"vumczuwzuH950z/ce0D0r85AlyqFZdXKjwakaa0Q3OY"}; kdt=UsPg3G0YUE9AX1KekidFHURQ3sLU6MxeO0KgEAQx; auth_token=7775fb70c540a53a25bf1f0cd46f3cdc20a1d02c; ct0=c6fc686e2feb83cbd8f08f2e38e2261b05b67b3746bc0c10adf50a7ed3fc0946995fc4c38401bde2a3f34adeb7bcf83e94a35a5bfce0f7ede162301ff303fcb78b1907ba4566f3069c02972749f719b8; att=1-SKx1SRcfVBgEIiHRpu5ZTxfSRLbmpbnUOZ9mMQWi; twid=u%3D1528694652034875394; lang=en; __cf_bm=hwKktTMrrT16g.Eupzx3L5Q1RkycVS0bopGHPlEGi8o-1763606210.1419973-1.0.1.1-I.kWkIZwO.KM3mg8xf14I5ndINV63tz9hRYAqy_7Cs0DCh_lR_AOG9UFK6mUqKm5FIBpXyNSHjSEPKP6wSMlRN103lj_8QWAXhEnqV9Xk7GJqb0LUueVH1j4Q8Eq3F6u',
}
def to_xx(user_id, quest_text):
"""调用Coze API进行对话"""
coze = Coze(auth=TokenAuth(token="pat_TNHVdiVDn9poKjUXYqVArNwGd9C2cWIIP04ltz4N85WClayks7VuNDF9ewHb5rTi"),
base_url=COZE_CN_BASE_URL)
params = {
'variables': '{"userId":"1528694652034875394","count":20,"includePromotedContent":true,"withQuickPromoteEligibilityTweetFields":true,"withVoice":true}',
'features': '{"rweb_video_screen_enabled":false,"profile_label_improvements_pcf_label_in_post_enabled":true,"responsive_web_profile_redirect_enabled":false,"rweb_tipjar_consumption_enabled":true,"verified_phone_label_enabled":false,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_timeline_navigation_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"premium_content_api_read_enabled":false,"communities_web_enable_tweet_community_results_fetch":true,"c9s_tweet_anatomy_moderator_badge_enabled":true,"responsive_web_grok_analyze_button_fetch_trends_enabled":false,"responsive_web_grok_analyze_post_followups_enabled":true,"responsive_web_jetfuel_frame":true,"responsive_web_grok_share_attachment_enabled":true,"articles_preview_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":true,"view_counts_everywhere_api_enabled":true,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"responsive_web_grok_show_grok_translated_post":false,"responsive_web_grok_analysis_button_from_backend":true,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":true,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"responsive_web_grok_image_annotation_enabled":true,"responsive_web_grok_imagine_annotation_enabled":true,"responsive_web_grok_community_note_auto_translation_is_enabled":false,"responsive_web_enhance_cards_enabled":false}',
'fieldToggles': '{"withArticlePlainText":false}',
}
bot_id = "7566918492960620559"
user_id = user_id
response = requests.get(
'https://x.com/i/api/graphql/lZRf8IC-GTuGxDwcsHW8aw/UserTweets',
params=params,
cookies=cookies,
headers=headers,
)
text = ""
for event in coze.chat.stream(
bot_id=bot_id,
user_id=user_id,
additional_messages=[
Message.build_user_question_text(quest_text)
]
):
if event.event == ChatEventType.CONVERSATION_MESSAGE_DELTA:
message = event.message
text += message.content
url_id = ""
for i in response.json()["data"]['user']['result']["timeline"]['timeline']["instructions"][1]["entries"]:
if "tweet" in i["entryId"]:
url_id += i["sortIndex"] + ";"
return text
print(url_id)
if __name__ == '__main__':
print(to_xx(user_id="7566918492960620559", quest_text="我想要加入你们公司,需要什么要求"))