Files
mini_code/tg/main.py
2026-01-09 17:51:09 +08:00

371 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import re
import random
import time
from loguru import logger
from urllib.parse import unquote
from telethon import TelegramClient
from telethon.errors import PhoneNumberBannedError
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.tl.types import InputBotAppShortName, InputPeerUser
from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, RequestWebViewRequest
from models.tg_phone_devices import TgPhoneDevices1
# ================== 服务层 ==================
class TelegramService:
"""Telegram客户端服务类封装所有Telegram相关操作"""
def __init__(self, server_config):
"""初始化服务实例
Args:
server_config: 服务器配置对象
"""
self.server_config = server_config
self.client = None # Telegram客户端实例
async def __aenter__(self):
"""异步上下文管理器入口,用于连接客户端"""
try:
self.client = await self._create_client() # 创建客户端实例
try:
await self.client.connect() # 建立连接
except Exception as e:
error_msg = str(e)
logger.error(f"账号 {self.server_config.phone} 连接失败: {error_msg}")
# 处理连接失败错误
if "Connection to Telegram failed 5 time(s)" in error_msg:
logger.error(f"账号 {self.server_config.phone} 代理失效")
if hasattr(self.server_config, 'is_valid_session'):
if hasattr(self.server_config, 'save'):
self.server_config.save(only=[TgPhoneDevices1.is_valid_session])
# 重新抛出异常而不是返回False
raise
return self
except Exception as e:
logger.error(f"账号 {self.server_config['session']} 客户端连接失败: {str(e)}")
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口,用于断开连接"""
try:
if self.client and self.client.is_connected():
await self.client.disconnect() # 断开连接
except Exception as e:
logger.error(f"客户端断开连接失败: {str(e)}")
async def _create_client(self):
"""创建Telegram客户端实例
Returns:
TelegramClient: 配置好的Telegram客户端实例
"""
# 代理配置
proxy = {
'proxy_type': self.server_config["proxy_type"], # 代理类型
'addr': self.server_config["addr"], # 代理地址
'port': self.server_config["port"], # 代理端口
'username': self.server_config["username"] if self.server_config["username"] else "", # 代理用户名
'password': self.server_config["password"] if self.server_config["password"] else "" # 代理密码
}
# 创建并返回Telegram客户端实例
return TelegramClient(
session=self.server_config["session"], # 会话文件路径
api_id=self.server_config["api_id"], # API ID
api_hash=self.server_config["api_hash"], # API Hash
device_model=self.server_config["device_model"], # 设备型号
system_version=self.server_config["system_version"], # 系统版本
app_version=self.server_config["app_version"], # 应用版本
system_lang_code=self.server_config["system_lang_code"], # 系统语言代码
lang_code=self.server_config["lang_code"], # 应用语言代码
proxy=proxy, # 仅当配置了代理地址时使用代理
auto_reconnect=False,
)
@staticmethod
def extract_verification_code(text):
"""从文本中提取5位数字验证码
Args:
text: 包含验证码的文本
Returns:
str: 提取到的验证码如果提取失败返回None
"""
try:
matches = re.findall(r'\d{5}', text) # 使用单词边界匹配5位数字
return matches[0] if matches else None
except (IndexError, TypeError, AttributeError) as e:
try:
matches = re.findall(r'\d{6}', text) # 使用单词边界匹配5位数字
return matches[0] if matches else None
except (IndexError, TypeError, AttributeError) as e:
logger.warning(f"验证码提取失败: {str(e)}")
async def process_verification(self):
"""处理验证流程,获取验证码
Returns:
str: 验证码如果获取失败返回None
"""
try:
messages = await self.client.get_messages(777000, limit=1) # 获取来自Telegram的最新消息
if messages and messages[0].text:
return self.extract_verification_code(messages[0].text) # 提取验证码
return None
except Exception as e:
logger.error(f"验证流程处理失败: {str(e)}")
return None
async def click_confirmation(self):
"""点击确认按钮
Returns:
bool: 是否成功点击确认按钮
"""
try:
messages = await self.client.get_messages(777000, limit=5) # 获取最多5条消息
for msg in messages:
if hasattr(msg, 'buttons') and msg.buttons: # 检查消息是否有按钮
for row in msg.buttons: # 遍历按钮行
for btn in row: # 遍历每个按钮
if hasattr(btn, 'text') and 'confirm' in btn.text.lower(): # 查找确认按钮
await btn.click() # 点击按钮
return True
return False
except PhoneNumberBannedError:
logger.warning("点击确认时发现手机号被封禁")
return False
except Exception as e:
logger.error(f"点击确认失败: {str(e)}")
return False
async def check_bot_name(self, bot_username):
"""检查是否已经打开过指定bot
Args:
bot_username: bot的用户名
Returns:
bool: 是否已经打开过该bot
"""
try:
async for dialog in self.client.iter_dialogs():
try:
for i in dialog.entity.usernames:
if i.username == bot_username:
logger.info(f'已与机器人交互: @{bot_username}')
return True
except:
pass
if dialog.is_user and dialog.entity.username == bot_username:
logger.info(f'已与机器人交互: @{bot_username}')
return True
logger.info(f'尚未与机器人交互: @{bot_username}')
return False
except Exception as e:
logger.error(f"检查bot名称失败: {str(e)}")
return False
async def join_bot(self, bot_name, invite_code=""):
"""加入指定的bot
Args:
bot_name: bot的用户名
invite_code: 邀请码(可选)
Returns:
bool: 是否成功加入bot
"""
try:
bot = await self.client.get_entity(bot_name) # 获取bot实体
if invite_code:
# 使用邀请码启动bot
await self.client(StartBotRequest(
bot=bot_name,
peer=bot_name,
start_param=invite_code
))
else:
# 直接发送/start命令
await self.client.send_message(bot, '/start')
return True
except Exception as e:
logger.error(f"加入bot失败: {str(e)}")
return False
async def get_bot_token(self, bot_name, invite_code=None, platform=None, start_params=None, url=None):
"""获取bot的webview token
Args:
bot_name: bot的用户名
invite_code: 邀请码(可选)
platform: 平台类型(可选)
start_params: 启动参数(可选)
url: 自定义URL(可选)
Returns:
str: 获取到的token
Raises:
Exception: 获取token失败时抛出异常
"""
platform = platform or random.choice(['web', 'tdesktop']) # 默认随机选择平台
try:
if start_params:
me = await self.client.get_me()
logger.info(f'账号 {8617882004254} -- {me.first_name} 登录成功!')
# 使用AppWebView方式获取token
res = await self.client(RequestAppWebViewRequest(
'me',
InputBotAppShortName(await self.client.get_input_entity(bot_name), start_params),
platform="web",
start_param=invite_code if invite_code else None
))
else:
# 使用普通WebView方式获取token
bot = await self.client.get_entity(bot_name)
peer = InputPeerUser(bot.id, bot.access_hash)
res = await self.client(
RequestWebViewRequest(
peer=peer,
bot=bot,
platform=platform,
from_bot_menu=True,
url=url,
)
)
if not res or not res.url:
raise ValueError("从Telegram获取的响应或URL为空")
# 从URL中解析token
parts = res.url.split('tgWebAppData=')
if len(parts) < 2:
raise ValueError("URL格式无效 - 缺少tgWebAppData参数")
token_part = parts[1].split('&tgWebAppVersion')[0]
return unquote(token_part) # URL解码
except Exception as e:
logger.error(f"电话号码:{self.server_config.phone}Token生成失败: {str(e)}")
return str(e)
async def send_click_bot_message(self, bot_name, datas):
"""
datas = [
{"send_message": [], "click_button": [""], },
]
"""
bot = await self.client.get_entity(bot_name)
for data in datas:
if data['send_message']:
for message in data['send_message']:
await self.client.send_message(bot, message)
logger.info(f'电话号码:{self.server_config.phone},发生消息:{message}成功!!!')
time.sleep(1)
break
if data['click_button']:
for button_name in data["click_button"]:
messages = await self.client.get_messages(bot, limit=5)
if not messages:
print("未找到验证消息")
return
start = 0
# 遍历消息,寻找包含按钮的验证消息
for message in messages:
if message.buttons:
for row in message.buttons:
for button in row:
if button_name in button.text.lower():
try:
await button.click()
logger.info(f'电话号码:{self.server_config.phone},点击{button_name}成功!!!')
start = 1
except:
pass
if start == 1:
break
if start == 1:
break
if start == 1:
break
async def get_bot_message(self, bot_name):
bot = await self.client.get_entity(bot_name)
messages = await self.client.get_messages(bot, limit=1)
# # 遍历消息,寻找包含按钮的验证消息
for message in messages:
return message.message
else:
return ""
async def join_channe(self, bot_name):
for i in bot_name:
try:
await self.client(JoinChannelRequest(i))
except Exception as e:
return False
return True
async def main():
# 查询需要检查的设备
devices = TgPhoneDevices1.select().where(
# TgPhoneDevices1.device_start.is_null(),
TgPhoneDevices1.is_valid_session == 0,
)
for device in devices:
device1 = {
'proxy_type': "http",
'addr': "192.168.1.79",
'port': device.port,
'username': "",
'password': "",
'session': fr"sessions\{device.phone}",
'api_id': 2040,
'api_hash': "b18441a1ff607e10a989891a5462e627",
'device_model': device.device_model,
'system_version': device.system_version,
'app_version': device.app_version,
'system_lang_code': device.system_lang_code,
'lang_code': device.lang_code,
# 'auto_reconnect': "http",
}
async with TelegramService(device1) as service:
if not await service.client.is_user_authorized():
logger.info("失败!!!")
continue
device.is_valid_session = 1
device.save()
# # 操作session对象
# if await service.join_channe(
# bot_name=["WebseaMalaysia"],
# ):
# logger.info("加入群成功!!!")
# else:
# logger.info("加入群失败!!!")
if __name__ == '__main__':
asyncio.run(main())