Files
tg_code/模拟器生成session.py
Administrator 37e9d4038c gfregfregfr
2025-11-12 12:50:39 +08:00

244 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import socket
import asyncio
from concurrent.futures import ThreadPoolExecutor
import requests
from loguru import logger
from opentele.api import API
from telethon import TelegramClient
from telethon.errors import SessionPasswordNeededError, PhoneCodeInvalidError, RPCError, PhoneNumberBannedError
from job_models.tg_models import TelegramAccount
from tootls import split_phone_number, get_code
from models.tg_phone_devices import TgPhoneDevices
def get_host_ip():
    """
    Return the local address of a UDP socket connected towards 8.8.8.8:80.

    The value is the first element of ``getsockname()`` on that socket.
    The socket is always closed, and any error raised by ``connect`` or
    ``getsockname`` propagates to the caller.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    with probe:
        # 114.114.114.114 is another DNS address that could be used here.
        probe.connect(('8.8.8.8', 80))
        local_address = probe.getsockname()
        return local_address[0]
async def get_device_info():
    """
    Generate a Telegram Desktop API profile and return its fields as a dict.

    The profile comes from ``API.TelegramDesktop.Generate(system="windows")``.
    The returned dict has the keys ``api_id``, ``api_hash``, ``device_model``,
    ``system_version``, ``app_version``, ``lang_code`` and
    ``system_lang_code``, each copied from the attribute of the same name.
    """
    generated = API.TelegramDesktop.Generate(system="windows")
    field_names = (
        "api_id",
        "api_hash",
        "device_model",
        "system_version",
        "app_version",
        "lang_code",
        "system_lang_code",
    )
    return {name: getattr(generated, name) for name in field_names}
async def create_telegram_client(server):
    """
    Build a ``TelegramClient`` configured from the fields of ``server``.

    The session is stored under ``C:\\sessions\\<server.phone>``. The client
    is only constructed here; this function does not connect it.
    """
    session_path = fr"C:\sessions\{server.phone}"
    options = dict(
        api_id=server.api_id,
        api_hash=server.api_hash,
        device_model=server.device_model,
        system_version=server.system_version,
        app_version=server.app_version,
        system_lang_code=server.system_lang_code,
        lang_code=server.lang_code,
    )
    return TelegramClient(session_path, **options)
async def login_telegram(client, tg_phone_data, server):
    """
    Request a login code for ``server.phone``, fetch it from the emulator
    service and sign in with it.

    Reads the module-level ``server_ip`` (assigned in this file's
    ``__main__`` section) to build the ``/get_code`` URL.

    Args:
        client: Telethon client on which ``send_code_request`` and
            ``sign_in`` are awaited.
        tg_phone_data: Record whose ``ld_name`` is sent to ``/get_code``.
        server: Record providing ``phone`` and ``to_code``. After a 2FA
            sign-in its ``code`` field is set to the password used and saved.

    Returns:
        1 when sign-in succeeded (with or without the 2FA step),
        3 when Telegram rejected the code (``PhoneCodeInvalidError``),
        4 when the number is banned (``PhoneNumberBannedError``),
        0 on any other exception,
        None when ``/get_code`` did not answer with HTTP 200.
    """
    try:
        await client.send_code_request(server.phone)
        logger.info(f"账号:{server.phone},发送验证码成功》》》")
        # Non-blocking wait: time.sleep() here would stall the event loop
        # that the Telethon client depends on.
        await asyncio.sleep(5)
        # NOTE(review): this request has no timeout and blocks the loop while
        # it runs; consider adding one once the endpoint's latency is known.
        res = requests.get(
            url=f"http://{server_ip}:8080/get_code?ld_name={tg_phone_data.ld_name}",
        )
        if res.status_code != 200:
            # Previously this case fell through silently; the return value is
            # unchanged (None) but the failure is now visible in the log.
            logger.error(f"账号:{server.phone},获取模拟器验证码失败,状态码:{res.status_code}")
            return None
        logger.info(f"账号:{server.phone},获取模拟器验证码成功》》》")
        try:
            await client.sign_in(phone=server.phone, code=res.json()["codes"])
            logger.info(f"账号:{server.phone},输入验证码成功》》》")
        except PhoneCodeInvalidError:
            logger.error(f"账号 {server.phone},验证码无效,请重试")
            return 3
        except SessionPasswordNeededError:
            # NOTE(review): hard-coded fallback 2FA password; consider moving
            # it to configuration.
            password = server.to_code or "jiedata123"
            await client.sign_in(password=password)
            logger.info(f"账号:{server.phone}输入2fa成功》》》")
            server.code = password
            server.save()
        return 1
    except PhoneNumberBannedError:
        logger.error(f"账号 {server.phone},已被封禁!!!")
        return 4
    except Exception as e:
        # RPCError is already an Exception subclass, so one clause covers both.
        logger.error(f'账号:{server.phone},登录失败,错误信息:{e}')
        return 0
async def handle_login_failure(server, tg_phone_data=None):
    """
    Retry the login once after an invalid-code failure.

    Waits five seconds, deletes the on-disk session file for ``server.phone``
    if it exists, then builds a fresh client, connects it and runs
    ``login_telegram`` again. The result is written to
    ``server.is_valid_session`` using the same integer codes as ``main``
    (1 success, 0 failure, -1 banned).

    Args:
        server: Device/session record for the account being logged in.
        tg_phone_data: Account record passed on to ``login_telegram``. When
            omitted, the module-level ``tg_phone_data`` is used, which is what
            this function read implicitly before the parameter existed.
    """
    if tg_phone_data is None:
        # Fallback for callers that still pass only ``server``.
        tg_phone_data = globals()["tg_phone_data"]

    # Non-blocking wait so the event loop is not stalled.
    await asyncio.sleep(5)
    session_file = fr"C:\sessions\{server.phone}.session"
    if os.path.exists(session_file):
        os.remove(session_file)
        print(f"文件 {session_file} 已删除")

    # The original re-assigned every field to itself before saving; those
    # assignments were no-ops and are dropped. The save is kept.
    # NOTE(review): presumably redundant unless save() has side effects - confirm.
    server.save()

    # Create a new client and try to log in again.
    client = await create_telegram_client(server)
    try:
        # A freshly built client is not connected, and login_telegram sends
        # requests immediately, so connect first.
        await client.connect()
        login_start = await login_telegram(client, tg_phone_data, server)
        if login_start == 1:
            me = await client.get_me()
            logger.info(f'账号 {server.phone} -- {me.first_name} 登录成功!')
            server.is_valid_session = 1
            server.save()
        elif login_start == 0:
            logger.error("Telegram 登录失败")
            server.is_valid_session = 0
            server.save()
        elif login_start == 4:
            # Same marking as main() uses for a banned number.
            server.is_valid_session = -1
            server.save()
    finally:
        await client.disconnect()


async def main(_, tg_phone_data):
    """
    Log one account in through its emulator and record the outcome.

    Steps: look up or create the device record for the phone number; return
    early if it already has a truthy ``is_valid_session``; fill in generated
    device info for a new record; start the emulator over HTTP; connect a
    Telethon client; log in via ``login_telegram``; store the result; then
    disconnect the client and close the emulator.

    Reads the module-level ``server_ip`` (assigned in this file's
    ``__main__`` section).

    Args:
        _: Index of the account in the batch; used only as a log prefix.
        tg_phone_data: Account record providing ``one_phone_number``,
            ``telephone`` and ``ld_name``.
    """
    area_code = tg_phone_data.one_phone_number
    phone_number = tg_phone_data.telephone
    server, _created = TgPhoneDevices().get_or_create(
        phone=area_code + phone_number,
    )
    if server.is_valid_session:
        logger.info(f"{_},账号 {server.phone},登录成功!!!")
        return
    if server.is_valid_session is None:
        # Device info is only needed for a record that has none yet.
        device_info = await get_device_info()
        server.area_code = area_code
        server.phone_number = phone_number
        server.api_id = device_info["api_id"]
        server.api_hash = device_info["api_hash"]
        server.device_model = device_info["device_model"]
        server.system_version = device_info["system_version"]
        server.app_version = device_info["app_version"]
        server.system_lang_code = device_info["system_lang_code"]
        server.lang_code = device_info["lang_code"]
        server.phone = area_code + phone_number
        server.save()

    logger.info(f"{_},账号:{server.phone},开始登录。。。")
    res = requests.get(
        url=f"http://{server_ip}:8080/start?ld_name={tg_phone_data.ld_name}",
        timeout=500
    )
    if res.status_code != 200:
        logger.info(f"{_},账号:{server.phone},启动模拟器失败。。。")
        return
    logger.info(f"{_},账号:{server.phone},启动模拟器成功。。。")

    # From here on the emulator is running: close it on every exit path,
    # including the already-authorized return and unexpected exceptions.
    try:
        client = await create_telegram_client(server)
        try:
            await client.connect()
            if await client.is_user_authorized():
                me = await client.get_me()
                logger.info(f'{_},账号:{server.phone} -- {me.first_name} 登录成功!')
                return

            login_start = await login_telegram(client, tg_phone_data, server)
            if login_start == 1:
                me = await client.get_me()
                logger.info(f'{_},账号 {server.phone} -- {me.first_name} 登录成功!')
                server.is_valid_session = 1
                server.server_ip = server_ip
                server.save()
            elif login_start == 0:
                logger.error(f"{_},账号 {server.phone}Telegram 登录失败")
                server.is_valid_session = 0
                server.save()
            elif login_start == 3:
                # Disconnect before the retry, which deletes the session file.
                await client.disconnect()
                await handle_login_failure(server, tg_phone_data)
            elif login_start == 4:
                server.is_valid_session = -1
                server.save()
        finally:
            await client.disconnect()
    finally:
        res = requests.get(
            url=f"http://{server_ip}:8080/close?ld_name={tg_phone_data.ld_name}",
        )
        if res.status_code == 200:
            logger.info(f"{_},账号 {server.phone},关闭模拟器成功!!!")
def main1(_, tg_phone_data):
    """
    Synchronous wrapper: run ``main(_, tg_phone_data)`` to completion with
    ``asyncio.run``.
    """
    login_job = main(_, tg_phone_data)
    asyncio.run(login_job)
if __name__ == '__main__':
    # Module-level on purpose: the login functions in this file read
    # ``server_ip`` as a global.
    server_ip = "192.168.50.97"
    tg_phone_datas = TelegramAccount().select().where(
        TelegramAccount.server_id == server_ip,
        TelegramAccount.is_logged_in_telegram == 1,
    )
    # Run several logins at the same time.
    max_threads = 3
    delay_between_start = 15  # delay between thread starts (seconds); currently unused
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = []
        # NOTE: keep the loop variable names. Other functions in this file may
        # read ``tg_phone_data`` as a module-level name.
        for _, tg_phone_data in enumerate(tg_phone_datas):
            # Pass the callable and its arguments separately. Writing
            # ``submit(main1(...))`` runs the job in this thread and submits
            # its None result instead.
            futures.append(executor.submit(main1, _, tg_phone_data))
            # time.sleep(delay_between_start)
        for future in futures:
            try:
                # result() re-raises whatever the worker raised; without this
                # call, worker failures would be silently dropped.
                future.result()
            except Exception:
                logger.exception("账号登录任务执行异常")