Files
mini_code/tg/text.py

313 lines
12 KiB
Python
Raw Normal View History

2025-12-01 17:22:53 +08:00
import asyncio
import csv
import random
import re
2026-01-05 00:56:48 +08:00
from pathlib import Path
2025-12-01 17:22:53 +08:00
from urllib.parse import unquote
from loguru import logger
from telethon import TelegramClient, functions
from telethon.errors import NewSaltInvalidError, NewSettingsInvalidError, PasswordHashInvalidError, FloodWaitError, \
PeerFloodError, UserPrivacyRestrictedError, ChatAdminRequiredError, UserAlreadyParticipantError, UserKickedError, \
ChannelsTooMuchError, UsersTooMuchError, ChatWriteForbiddenError, BadRequestError, RPCError
from telethon.tl import types
from telethon.tl.functions.account import GetAuthorizationsRequest, ResetAuthorizationRequest
from telethon.tl.functions.channels import JoinChannelRequest, InviteToChannelRequest, GetFullChannelRequest
from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, ExportChatInviteRequest, \
AddChatUserRequest, ImportChatInviteRequest
from telethon.tl.types import InputBotAppShortName, ChannelParticipantsSearch, Chat, Channel, User
from models.tg_phone_devices import TgPhoneDevices
async def create_telegram_client(server):
"""
创建并配置 Telegram 客户端
"""
proxy = {
'proxy_type': server.proxy_type, # 或 'socks4',具体看你的代理类型
2026-01-09 14:40:20 +08:00
'addr': "192.168.1.79", # 代理服务器地址
2025-12-01 17:22:53 +08:00
'port': int(server.port), # 代理服务器端口
'username': server.user if server.user else "", # 如果有用户名,填写
'password': server.pwd if server.pwd else "" # 如果有密码,填写
}
2026-01-05 00:56:48 +08:00
base_dir = Path(__file__).parent.parent
2025-12-01 17:22:53 +08:00
client = TelegramClient(
2026-01-05 00:56:48 +08:00
fr"{base_dir}\sessions\{server.phone}",
2025-12-01 17:22:53 +08:00
api_id=server.api_id,
api_hash=server.api_hash,
device_model=server.device_model,
system_version=server.system_version,
app_version=server.app_version,
system_lang_code=server.system_lang_code,
lang_code=server.lang_code,
proxy=proxy
)
return client
async def main1111(phone=None):
server, server_type = TgPhoneDevices().get_or_create(
phone=phone,
)
client = await create_telegram_client(server)
await client.connect()
# await client.log_out()
# if await client.is_user_authorized():
# me = await client.get_me()
# logger.info(f'账号 {8617882004254} -- {me.username} 登录成功!')
#
# print(me.first_name)
# print(me.last_name)
# else:
# logger.error(f'账号 {8617882004254} 登录失败!')
# ===========================================================================================================
# await client.send_message('dockerse', 'hrhrerhrhrhrhml2')
# --------------------------------------------------------------------------------
# # 查询所有聊天
# bot_username = "pengu_clash_bot"
# async for dialog in client.iter_dialogs():
# # print(dialog)
# try:
# for i in dialog.entity.usernames:
# if i.username == bot_username:
# logger.info(f'已与机器人交互: @{bot_username}')
# return True
# except:
# pass
# if dialog.is_user and dialog.entity.username == bot_username:
# logger.info(f'已与机器人交互: @{bot_username}')
# return True
# logger.info(f'尚未与机器人交互: @{bot_username}')
# bot_username = "BlumCryptoBot"
#
# # 获取目标 Bot 的实体信息
# bot_entity = await client.get_input_entity(bot_username)
# print(bot_entity)
# # 获取用户的所有对话列表
# dialogs = await client.get_dialogs()
#
# # 遍历对话列表,检查是否存在该 Bot 的私聊
# for dialog in dialogs:
# print(dialog)
# if dialog.entity.id == bot_entity.id:
# print(1)
# ------------------------------------------------------------------------------
# ###点击 confirm
# # 获取最近5条消息避免漏掉
# messages = await client.get_messages(777000, limit=5)
# if not messages:
# print("未找到验证消息")
# return
#
# # 遍历消息,寻找包含按钮的验证消息
# for message in messages:
# if message.buttons:
# for row in message.buttons:
# for button in row:
# if 'confirm' in button.text.lower():
# try:
# await button.click()
# print(f"已点击消息 {message.id} 的 Confirm 按钮")
# except:
# pass
#
# ---------------------------------------------------------
# # 获取所有登录设别信息
# # 发送请求以获取授权信息
# authorizations = await client(GetAuthorizationsRequest())
#
# # 打印每个授权会话的信息
# for authorization in authorizations.authorizations:
# print(f"授权ID: {authorization.hash}")
# print(f"设备模型: {authorization.device_model}")
# print(f"平台: {authorization.platform}")
# print(f"系统版本: {authorization.system_version}")
# print(f"应用名称: {authorization.app_name}")
# print(f"应用版本: {authorization.app_version}")
# print(f"IP 地址: {authorization.ip}")
# print(f"国家: {authorization.country}")
# print(f"创建时间: {authorization.date_created}")
# print(f"最后活跃时间: {authorization.date_active}")
# print(f"是否官方应用: {'是' if authorization.official_app else '否'}")
# print(f"是否当前设备: {'是' if authorization.current else '否'}")
# print('-' * 40)
# ----------------------------------------------------------------------------------------
# # 踢出指定设备名的设备
#
# # 发送请求以获取授权信息
# authorizations = await client(GetAuthorizationsRequest())
#
# # 设定你要踢出的设备名
# target_device_model = 'Microsoft Edge 121'
#
# # 遍历所有授权会话
# for authorization in authorizations.authorizations:
# if authorization.device_model == target_device_model and not authorization.current:
# # 踢出指定设备
# await client(ResetAuthorizationRequest(hash=authorization.hash))
# print(f"已踢出设备: {authorization.device_model} (授权ID: {authorization.hash})")
# else:
# print(f"未找到或跳过当前设备: {authorization.device_model} (授权ID: {authorization.hash})")
# # ---------------------------------------------------------------------------------------------
# try:
# await client.edit_2fa(current_password="Haxi@123456@", new_password="123456qwe")
# except NewSettingsInvalidError:
# print(1)
# except PasswordHashInvalidError:
# print(2)
# ---------------------------------------------------------------------------------------------------
# 使用邀请码启动bot
# 使用邀请码启动bot
# bot_name = "BlumCryptoBot"
# invite_code = "ref_H21tvNwu3n"
#
# result = await client(StartBotRequest(
# bot=bot_name,
# peer=bot_name,
# start_param=invite_code
# ))
#
#
# logger.info(result)
# -------------------------------------------------------------------------------------------------
# 获取bot实体
# 使用start_params请求WebApp
# bot_name = "BlumCryptoBot"
# invite_code = "ref_H21tvNwu3n"
#
# await client.get_input_entity(bot_name)
#
# app_info = await client(RequestAppWebViewRequest(
# 'me',
# InputBotAppShortName(await client.get_input_entity(bot_name), "app"),
# platform="web",
# start_param=invite_code if invite_code else None
# ))
#
# tgData = unquote(app_info.url.split('tgWebAppData=')[1].split('&tgWebAppVersion')[0])
# print(tgData)
# ------------------------------------------------------------------------------------------------
# a = await client(JoinChannelRequest('webseaml2'))
# print(a)
# messages = await client.get_messages('webseaml2', 100)
# for line in messages:
# print(line)
# # ✅ 用链接获取群实体
# group = await client.get_entity('https://t.me/webseaml2')
#
# # ✅ 获取目标用户实体(可以是用户名或 user_id
# user = await client.get_entity('dockerse')
#
# # ✅ 邀请(必须是群管理员)
# await client(InviteToChannelRequest(
# channel=group,
# users=[user]
# ))
# user = await client.get_entity('dockerse')
# print(user.id) # 打印用户 ID
# messages = await client.get_messages(bot, limit=1)
# # print(messages[0].Message.data.message)
#
# # if not messages:
# # print("未找到验证消息")
# # return
#
# # # 遍历消息,寻找包含按钮的验证消息
# for message in messages:
# print(message.message)
# # if message.buttons:
# # for row in message.buttons:
# # for button in row:
# # if 'now' in button.text.lower():
# # try:
# # await button.click()
# # print(f"已点击消息 {message.id} 的 Confirm 按钮")
# # except:
# # pass
# =============================================================================
# 拉人进群
# target_group = await client.get_entity("webseaml2")
# users_to_add = [
# await client.get_entity("pantakill"),
# # 可放更多用户
# ]
# await client(InviteToChannelRequest(target_group, users_to_add))
# # 2⃣ 发送消息
# update = await client(ImportChatInviteRequest("+E6E4i3svka81MGY1"))
# chat = update.chats[0]
# try:
# await client.send_message(chat.id, "你好,这是一条来自 Telethon 的测试消息!")
# print("消息已发送!")
# except Exception as e:
# print(f"发送失败: {e}")
# #+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# print("正在获取对话列表...")
# # 获取所有对话
# dialogs = await client.get_dialogs()
#
# # 退出所有群聊
# for dialog in dialogs:
# if dialog.is_group or dialog.is_channel:
# try:
# await client.delete_dialog(dialog.entity)
# print(f"已退出群聊/频道: {dialog.title}")
# except Exception as e:
# print(f"退出群聊/频道 {dialog.title} 时出错: {e}")
#
# # 取消关注所有机器人
# for dialog in dialogs:
# if hasattr(dialog.entity, 'bot') and dialog.entity.bot:
# try:
# await client.delete_dialog(dialog.entity)
# print(f"已取消关注机器人: {dialog.title}")
# except Exception as e:
# print(f"取消关注机器人 {dialog.title} 时出错: {e}")
# ============================================================================================
# 获取验证码
messages = await client.get_messages(777000, 1)
message_text = messages[0].text
2026-01-09 14:40:20 +08:00
print(message_text)
2025-12-01 17:22:53 +08:00
try:
code = re.findall(r'\d{6}', message_text)[0]
logger.info(f"当前验证码:{code}")
await client.disconnect()
return code
except:
logger.error("获取验证码失败!!!")
2026-01-05 08:55:13 +08:00
if __name__ == '__main__':
2026-01-09 14:40:20 +08:00
asyncio.run(main1111(phone="14424290892"))
2025-12-01 17:22:53 +08:00
2026-01-05 08:55:13 +08:00
# .\xray.exe -c .\1_3078_client_liu_http_ip_1v1.json