Files
tg_code/text/获取验证码.py
Administrator 37e9d4038c gfregfregfr
2025-11-12 12:50:39 +08:00

126 lines
3.6 KiB
Python

import asyncio
import re
from loguru import logger
from opentele.api import API
from telethon.errors import SessionPasswordNeededError, PhoneCodeInvalidError, RPCError
from telethon import TelegramClient
from models.tg_phone_devices import TgPhoneDevices
async def get_device_info():
"""
获取设备信息
"""
PROXY_CONFIG = {
'proxy_type': 'http', # 或 'socks4',具体看你的代理类型
'addr': '192.168.50.220', # 代理服务器地址
'port': 10809, # 代理服务器端口
'username': '', # 如果有用户名,填写
'password': '' # 如果有密码,填写
}
NewApi = API.TelegramDesktop.Generate(system="macos")
return {
"api_id": NewApi.api_id,
"api_hash": NewApi.api_hash,
"device_model": NewApi.device_model,
"system_version": NewApi.system_version,
"app_version": NewApi.app_version,
"lang_code": NewApi.lang_code,
"system_lang_code": NewApi.system_lang_code
}
async def create_telegram_client(server_type):
"""
创建并配置 Telegram 客户端
"""
# gw.dataimpulse.com:824:14fa7e7aa8280a914611__cr.gb:f1c1501b6b535c19
# 定义代理
proxy = {
'proxy_type': server_type.proxy_type, # 或 'socks4',具体看你的代理类型
'addr': server_type.addr, # 代理服务器地址
'port': int(server_type.port), # 代理服务器端口
'username': server_type.user if server_type.user else "", # 如果有用户名,填写
'password': server_type.pwd if server_type.pwd else "" # 如果有密码,填写
}
client = TelegramClient(
fr"C:\sessions\{server_type.phone}",
api_id=server_type.api_id,
api_hash=server_type.api_hash,
device_model=server_type.device_model,
system_version=server_type.system_version,
app_version=server_type.app_version,
system_lang_code=server_type.system_lang_code,
lang_code=server_type.lang_code,
proxy=proxy
)
return client
async def login_telegram(client, phone_num):
"""
处理登录流程
"""
try:
sent_code_info = await client.send_code_request(phone_num)
code = input("请输入验证码:")
try:
await client.sign_in(phone=phone_num, code=code)
except PhoneCodeInvalidError:
logger.error("验证码无效,请重试")
return False
except SessionPasswordNeededError:
password = input("请输入密码:")
await client.sign_in(password=password)
me = await client.get_me()
logger.info(f'账号 {phone_num} -- {me.first_name} 登录成功!')
return True
except (RPCError, Exception) as e:
logger.error(f'登录失败,错误信息:{e}')
return False
finally:
await client.disconnect()
async def main():
phone_num = input("请输入电话号码:")
server = TgPhoneDevices().get_or_none(
TgPhoneDevices.phone == phone_num,
)
# 创建 Telegram 客户端
client = await create_telegram_client(server)
try:
await client.connect()
messages = await client.get_messages(777000, 1)
message_text = messages[0].text
print(message_text)
try:
code = re.findall(r'\d{5}', message_text)[0]
logger.info(f"当前验证码:{code}")
except:
logger.error("获取验证码失败!!!")
except:
logger.error("账号已死!!!")
finally:
await client.disconnect()
if __name__ == '__main__':
asyncio.run(main())