Files
tg_code/text/单独一个号码上号.py
Administrator 37e9d4038c gfregfregfr
2025-11-12 12:50:39 +08:00

141 lines
4.1 KiB
Python

import asyncio
from loguru import logger
from opentele.api import API
from telethon import TelegramClient
from telethon.errors import SessionPasswordNeededError, PhoneCodeInvalidError, RPCError
from models.tg_phone_devices import TgPhoneDevices
async def get_device_info():
"""
获取设备信息
"""
NewApi = API.TelegramDesktop.Generate(system="windows")
return {
"api_id": NewApi.api_id,
"api_hash": NewApi.api_hash,
"device_model": NewApi.device_model,
"system_version": NewApi.system_version,
"app_version": NewApi.app_version,
"lang_code": NewApi.lang_code,
"system_lang_code": NewApi.system_lang_code
}
async def create_telegram_client(api_id, api_hash, phone_num, server_type):
"""
创建并配置 Telegram 客户端
"""
# gw.dataimpulse.com:824:14fa7e7aa8280a914611__cr.gb:f1c1501b6b535c19
# 定义代理
proxy = {
'proxy_type': 'socks5', # 或 'socks4',具体看你的代理类型
'addr': 'gw.dataimpulse.com', # 代理服务器地址
'port': 824, # 代理服务器端口
'username': '14fa7e7aa8280a914611__cr.gb', # 如果有用户名,填写
'password': 'f1c1501b6b535c19' # 如果有密码,填写
}
client = TelegramClient(
fr"C:\sessions\{phone_num}",
api_id=api_id,
api_hash=api_hash,
device_model=server_type.device_model,
system_version=server_type.system_version,
app_version=server_type.app_version,
system_lang_code=server_type.system_lang_code,
lang_code=server_type.lang_code,
# proxy=proxy
)
return client
async def login_telegram(client, phone_num):
"""
处理登录流程
"""
try:
sent_code_info = await client.send_code_request(phone_num)
code = input("请输入验证码:")
try:
await client.sign_in(phone=phone_num, code=code)
except PhoneCodeInvalidError:
logger.error("验证码无效,请重试")
return False
except SessionPasswordNeededError:
password = input("请输入密码:")
await client.sign_in(password=password)
me = await client.get_me()
logger.info(f'账号 {phone_num} -- {me.first_name} 登录成功!')
return True
except (RPCError, Exception) as e:
logger.error(f'登录失败,错误信息:{e}')
return False
finally:
await client.disconnect()
async def main():
# 读取设备信息和 API 参数
device_info = await get_device_info()
api_id = device_info["api_id"]
api_hash = device_info["api_hash"]
area_code = input("请输入区号:")
phone_number = input("请输入电话号码:")
server, server_type = TgPhoneDevices().get_or_create(
phone_number=phone_number,
)
server.area_code = area_code
server.api_id = api_id
server.api_hash = api_hash
# 这里我加了一行代码 by rachel 成功默认为1
server.is_valid_session = '1'
server.device_model = device_info["device_model"]
server.system_version = device_info["system_version"]
server.app_version = device_info["app_version"]
server.system_lang_code = device_info["system_lang_code"]
server.lang_code = device_info["lang_code"]
server.phone = area_code + phone_number
server.save()
# 创建 Telegram 客户端
phone = area_code + phone_number
client = await create_telegram_client(api_id, api_hash, phone, server)
await client.connect()
if await client.is_user_authorized():
me = await client.get_me()
logger.info(f'账号 {phone_number} -- {me.first_name} 登录成功!')
else:
# 登录 Telegram
if await login_telegram(client, phone):
logger.info("Telegram 客户端操作完成")
server.is_valid_session = 1
server.save()
else:
logger.error("Telegram 登录失败")
server.is_valid_session = 0
server.save()
if __name__ == '__main__':
asyncio.run(main())