371 lines
14 KiB
Python
371 lines
14 KiB
Python
import asyncio
|
||
import re
|
||
import random
|
||
import time
|
||
|
||
from loguru import logger
|
||
from urllib.parse import unquote
|
||
|
||
from telethon import TelegramClient
|
||
from telethon.errors import PhoneNumberBannedError
|
||
from telethon.tl.functions.channels import JoinChannelRequest
|
||
from telethon.tl.types import InputBotAppShortName, InputPeerUser
|
||
from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, RequestWebViewRequest
|
||
|
||
from models.tg_phone_devices import TgPhoneDevices1
|
||
|
||
|
||
# ================== 服务层 ==================
|
||
class TelegramService:
|
||
"""Telegram客户端服务类,封装所有Telegram相关操作"""
|
||
|
||
def __init__(self, server_config):
|
||
"""初始化服务实例
|
||
Args:
|
||
server_config: 服务器配置对象
|
||
"""
|
||
self.server_config = server_config
|
||
self.client = None # Telegram客户端实例
|
||
|
||
async def __aenter__(self):
|
||
"""异步上下文管理器入口,用于连接客户端"""
|
||
try:
|
||
self.client = await self._create_client() # 创建客户端实例
|
||
|
||
try:
|
||
await self.client.connect() # 建立连接
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
logger.error(f"账号 {self.server_config.phone} 连接失败: {error_msg}")
|
||
|
||
# 处理连接失败错误
|
||
if "Connection to Telegram failed 5 time(s)" in error_msg:
|
||
logger.error(f"账号 {self.server_config.phone} 代理失效")
|
||
if hasattr(self.server_config, 'is_valid_session'):
|
||
if hasattr(self.server_config, 'save'):
|
||
self.server_config.save(only=[TgPhoneDevices1.is_valid_session])
|
||
|
||
# 重新抛出异常,而不是返回False
|
||
raise
|
||
|
||
return self
|
||
except Exception as e:
|
||
logger.error(f"账号 {self.server_config['session']} 客户端连接失败: {str(e)}")
|
||
raise
|
||
|
||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||
"""异步上下文管理器出口,用于断开连接"""
|
||
try:
|
||
if self.client and self.client.is_connected():
|
||
await self.client.disconnect() # 断开连接
|
||
except Exception as e:
|
||
logger.error(f"客户端断开连接失败: {str(e)}")
|
||
|
||
async def _create_client(self):
|
||
"""创建Telegram客户端实例
|
||
Returns:
|
||
TelegramClient: 配置好的Telegram客户端实例
|
||
"""
|
||
# 代理配置
|
||
proxy = {
|
||
'proxy_type': self.server_config["proxy_type"], # 代理类型
|
||
'addr': self.server_config["addr"], # 代理地址
|
||
'port': self.server_config["port"], # 代理端口
|
||
'username': self.server_config["username"] if self.server_config["username"] else "", # 代理用户名
|
||
'password': self.server_config["password"] if self.server_config["password"] else "" # 代理密码
|
||
}
|
||
|
||
# 创建并返回Telegram客户端实例
|
||
return TelegramClient(
|
||
session=self.server_config["session"], # 会话文件路径
|
||
api_id=self.server_config["api_id"], # API ID
|
||
api_hash=self.server_config["api_hash"], # API Hash
|
||
device_model=self.server_config["device_model"], # 设备型号
|
||
system_version=self.server_config["system_version"], # 系统版本
|
||
app_version=self.server_config["app_version"], # 应用版本
|
||
system_lang_code=self.server_config["system_lang_code"], # 系统语言代码
|
||
lang_code=self.server_config["lang_code"], # 应用语言代码
|
||
proxy=proxy, # 仅当配置了代理地址时使用代理
|
||
auto_reconnect=False,
|
||
)
|
||
|
||
@staticmethod
|
||
def extract_verification_code(text):
|
||
"""从文本中提取5位数字验证码
|
||
Args:
|
||
text: 包含验证码的文本
|
||
Returns:
|
||
str: 提取到的验证码,如果提取失败返回None
|
||
"""
|
||
try:
|
||
matches = re.findall(r'\d{5}', text) # 使用单词边界匹配5位数字
|
||
return matches[0] if matches else None
|
||
except (IndexError, TypeError, AttributeError) as e:
|
||
try:
|
||
matches = re.findall(r'\d{6}', text) # 使用单词边界匹配5位数字
|
||
return matches[0] if matches else None
|
||
except (IndexError, TypeError, AttributeError) as e:
|
||
logger.warning(f"验证码提取失败: {str(e)}")
|
||
|
||
async def process_verification(self):
|
||
"""处理验证流程,获取验证码
|
||
Returns:
|
||
str: 验证码,如果获取失败返回None
|
||
"""
|
||
try:
|
||
messages = await self.client.get_messages(777000, limit=1) # 获取来自Telegram的最新消息
|
||
if messages and messages[0].text:
|
||
return self.extract_verification_code(messages[0].text) # 提取验证码
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"验证流程处理失败: {str(e)}")
|
||
return None
|
||
|
||
async def click_confirmation(self):
|
||
"""点击确认按钮
|
||
Returns:
|
||
bool: 是否成功点击确认按钮
|
||
"""
|
||
try:
|
||
messages = await self.client.get_messages(777000, limit=5) # 获取最多5条消息
|
||
for msg in messages:
|
||
if hasattr(msg, 'buttons') and msg.buttons: # 检查消息是否有按钮
|
||
for row in msg.buttons: # 遍历按钮行
|
||
for btn in row: # 遍历每个按钮
|
||
if hasattr(btn, 'text') and 'confirm' in btn.text.lower(): # 查找确认按钮
|
||
await btn.click() # 点击按钮
|
||
return True
|
||
return False
|
||
except PhoneNumberBannedError:
|
||
logger.warning("点击确认时发现手机号被封禁")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"点击确认失败: {str(e)}")
|
||
return False
|
||
|
||
async def check_bot_name(self, bot_username):
|
||
"""检查是否已经打开过指定bot
|
||
Args:
|
||
bot_username: bot的用户名
|
||
Returns:
|
||
bool: 是否已经打开过该bot
|
||
"""
|
||
try:
|
||
async for dialog in self.client.iter_dialogs():
|
||
try:
|
||
for i in dialog.entity.usernames:
|
||
if i.username == bot_username:
|
||
logger.info(f'已与机器人交互: @{bot_username}')
|
||
return True
|
||
except:
|
||
pass
|
||
if dialog.is_user and dialog.entity.username == bot_username:
|
||
logger.info(f'已与机器人交互: @{bot_username}')
|
||
return True
|
||
logger.info(f'尚未与机器人交互: @{bot_username}')
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"检查bot名称失败: {str(e)}")
|
||
return False
|
||
|
||
async def join_bot(self, bot_name, invite_code=""):
|
||
"""加入指定的bot
|
||
Args:
|
||
bot_name: bot的用户名
|
||
invite_code: 邀请码(可选)
|
||
Returns:
|
||
bool: 是否成功加入bot
|
||
"""
|
||
try:
|
||
bot = await self.client.get_entity(bot_name) # 获取bot实体
|
||
if invite_code:
|
||
# 使用邀请码启动bot
|
||
await self.client(StartBotRequest(
|
||
bot=bot_name,
|
||
peer=bot_name,
|
||
start_param=invite_code
|
||
))
|
||
else:
|
||
# 直接发送/start命令
|
||
await self.client.send_message(bot, '/start')
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"加入bot失败: {str(e)}")
|
||
return False
|
||
|
||
async def get_bot_token(self, bot_name, invite_code=None, platform=None, start_params=None, url=None):
|
||
"""获取bot的webview token
|
||
Args:
|
||
bot_name: bot的用户名
|
||
invite_code: 邀请码(可选)
|
||
platform: 平台类型(可选)
|
||
start_params: 启动参数(可选)
|
||
url: 自定义URL(可选)
|
||
Returns:
|
||
str: 获取到的token
|
||
Raises:
|
||
Exception: 获取token失败时抛出异常
|
||
"""
|
||
|
||
platform = platform or random.choice(['web', 'tdesktop']) # 默认随机选择平台
|
||
|
||
try:
|
||
if start_params:
|
||
me = await self.client.get_me()
|
||
logger.info(f'账号 {8617882004254} -- {me.first_name} 登录成功!')
|
||
|
||
# 使用AppWebView方式获取token
|
||
res = await self.client(RequestAppWebViewRequest(
|
||
'me',
|
||
InputBotAppShortName(await self.client.get_input_entity(bot_name), start_params),
|
||
platform="web",
|
||
start_param=invite_code if invite_code else None
|
||
))
|
||
|
||
else:
|
||
# 使用普通WebView方式获取token
|
||
bot = await self.client.get_entity(bot_name)
|
||
peer = InputPeerUser(bot.id, bot.access_hash)
|
||
res = await self.client(
|
||
RequestWebViewRequest(
|
||
peer=peer,
|
||
bot=bot,
|
||
platform=platform,
|
||
from_bot_menu=True,
|
||
url=url,
|
||
)
|
||
)
|
||
|
||
if not res or not res.url:
|
||
raise ValueError("从Telegram获取的响应或URL为空")
|
||
|
||
# 从URL中解析token
|
||
parts = res.url.split('tgWebAppData=')
|
||
if len(parts) < 2:
|
||
raise ValueError("URL格式无效 - 缺少tgWebAppData参数")
|
||
|
||
token_part = parts[1].split('&tgWebAppVersion')[0]
|
||
return unquote(token_part) # URL解码
|
||
except Exception as e:
|
||
logger.error(f"电话号码:{self.server_config.phone},Token生成失败: {str(e)}")
|
||
return str(e)
|
||
|
||
async def send_click_bot_message(self, bot_name, datas):
|
||
"""
|
||
|
||
datas = [
|
||
{"send_message": [], "click_button": [""], },
|
||
|
||
]
|
||
|
||
|
||
"""
|
||
|
||
bot = await self.client.get_entity(bot_name)
|
||
|
||
for data in datas:
|
||
if data['send_message']:
|
||
for message in data['send_message']:
|
||
await self.client.send_message(bot, message)
|
||
logger.info(f'电话号码:{self.server_config.phone},发生消息:{message}成功!!!')
|
||
time.sleep(1)
|
||
break
|
||
|
||
if data['click_button']:
|
||
|
||
for button_name in data["click_button"]:
|
||
messages = await self.client.get_messages(bot, limit=5)
|
||
if not messages:
|
||
print("未找到验证消息")
|
||
return
|
||
|
||
start = 0
|
||
# 遍历消息,寻找包含按钮的验证消息
|
||
for message in messages:
|
||
if message.buttons:
|
||
for row in message.buttons:
|
||
for button in row:
|
||
if button_name in button.text.lower():
|
||
try:
|
||
await button.click()
|
||
logger.info(f'电话号码:{self.server_config.phone},点击{button_name}成功!!!')
|
||
start = 1
|
||
except:
|
||
pass
|
||
|
||
if start == 1:
|
||
break
|
||
if start == 1:
|
||
break
|
||
if start == 1:
|
||
break
|
||
|
||
async def get_bot_message(self, bot_name):
|
||
bot = await self.client.get_entity(bot_name)
|
||
|
||
messages = await self.client.get_messages(bot, limit=1)
|
||
|
||
# # 遍历消息,寻找包含按钮的验证消息
|
||
for message in messages:
|
||
return message.message
|
||
else:
|
||
return ""
|
||
|
||
async def join_channe(self, bot_name):
|
||
|
||
for i in bot_name:
|
||
try:
|
||
await self.client(JoinChannelRequest(i))
|
||
|
||
|
||
except Exception as e:
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
async def main():
|
||
# 查询需要检查的设备
|
||
devices = TgPhoneDevices1.select().where(
|
||
# TgPhoneDevices1.device_start.is_null(),
|
||
TgPhoneDevices1.is_valid_session == 0,
|
||
)
|
||
|
||
for device in devices:
|
||
device1 = {
|
||
'proxy_type': "http",
|
||
'addr': "192.168.1.79",
|
||
'port': device.port,
|
||
'username': "",
|
||
'password': "",
|
||
'session': fr"sessions\{device.phone}",
|
||
'api_id': 2040,
|
||
'api_hash': "b18441a1ff607e10a989891a5462e627",
|
||
'device_model': device.device_model,
|
||
'system_version': device.system_version,
|
||
'app_version': device.app_version,
|
||
'system_lang_code': device.system_lang_code,
|
||
'lang_code': device.lang_code,
|
||
# 'auto_reconnect': "http",
|
||
}
|
||
|
||
async with TelegramService(device1) as service:
|
||
if not await service.client.is_user_authorized():
|
||
logger.info("失败!!!")
|
||
continue
|
||
|
||
device.is_valid_session = 1
|
||
device.save()
|
||
|
||
# # 操作session对象
|
||
# if await service.join_channe(
|
||
# bot_name=["WebseaMalaysia"],
|
||
# ):
|
||
# logger.info("加入群成功!!!")
|
||
# else:
|
||
# logger.info("加入群失败!!!")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
asyncio.run(main())
|