import asyncio
import logging
import threading
import time

from telethon import TelegramClient

logger = logging.getLogger(__name__)

# Serializes senders so only one thread at a time opens the Telegram session.
lock = threading.Lock()


async def to_do_tg(message_content):
    """Send *message_content* to the configured Telegram peer.

    Opens a Telegram client from the saved session, resolves the target
    entity, sends the message, and disconnects again. The client is always
    disconnected, even when a step fails, so the connection and session
    file are not left open.

    Args:
        message_content: Text passed to ``client.send_message``.

    Returns:
        True if the message was sent, False if any step raised an exception
        (the exception is logged).
    """
    # ========== Configuration ==========
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from the environment or a config file kept out of version control.
    API_ID = 2040  # replace with your API ID
    API_HASH = "b18441a1ff607e10a989891a5462e627"  # replace with your API hash
    SESSION_FILE = "../telegram/8619211027341"  # saved login session file
    # ===================================
    PROXY = {
        'proxy_type': "socks5",
        'addr': "199.168.137.123",
        'port': 12345,
        'username': "haha",
        'password': "haha",
    }

    client = None
    try:
        client = TelegramClient(SESSION_FILE, API_ID, API_HASH, proxy=PROXY)
        # Logs in; on first use this prompts for phone number and login code.
        await client.start()
        bot = await client.get_entity("ergggreef")
        await client.send_message(bot, message_content)
        # Non-blocking pause: keeps the event loop running so the client can
        # finish its network work (time.sleep would freeze the loop).
        await asyncio.sleep(1)
    except Exception:
        # Exception (not a bare except) lets KeyboardInterrupt, SystemExit
        # and task cancellation propagate instead of being reported as False.
        logger.exception("Failed to send Telegram message")
        return False
    finally:
        if client is not None:
            try:
                await client.disconnect()
            except Exception:
                # A failed disconnect does not change whether the message
                # was delivered; record it and carry on.
                logger.exception("Failed to disconnect Telegram client")

    # Pause after a successful send, as the original did, before the caller
    # releases the lock and the next message can go out.
    await asyncio.sleep(5)
    return True


def send_dingtalk_message(message_content):
    """Synchronously send *message_content* via Telegram.

    Holds the module-level lock for the whole send so concurrent threads
    take turns. Uses ``asyncio.run``, so it must not be called from a
    thread that already has a running event loop.

    Args:
        message_content: Text of the message to send.

    Returns:
        True on success, False on failure (see ``to_do_tg``).
    """
    with lock:
        result = asyncio.run(to_do_tg(message_content))
    return result