Files
tg_code/接口/接口.py
Administrator 37e9d4038c gfregfregfr
2025-11-12 12:50:39 +08:00

925 lines
36 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import random
import time
import logging
from logging.handlers import RotatingFileHandler
from loguru import logger
from urllib.parse import unquote
from flask import Flask, request, jsonify, Blueprint
from telethon import TelegramClient
from telethon.errors import PhoneNumberBannedError
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.tl.types import InputBotAppShortName, InputPeerUser
from telethon.tl.functions.messages import RequestAppWebViewRequest, StartBotRequest, RequestWebViewRequest
from models.tg_phone_devices import TgPhoneDevices
# ================== 配置层 ==================
class Config:
"""应用配置类,包含所有全局配置参数"""
SECRET_KEY = 'your-secret-key' # Flask应用密钥
TG_SESSION_PATH = r"C:\sessions" # Telegram会话文件存储路径
PROXY_CONFIG = {
'proxy_type': 'http', # 或 'socks4',具体看你的代理类型
'addr': '192.168.50.220', # 代理服务器地址
'port': 10811, # 代理服务器端口
'username': '', # 如果有用户名,填写
'password': '' # 如果有密码,填写
}
# 日志配置
LOG_FILE = 'app1.log' # 日志文件路径
LOG_LEVEL = logging.INFO # 日志级别
LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB
LOG_BACKUP_COUNT = 5 # 保留5个备份日志文件
# 配置日志系统
def setup_logging():
"""配置日志系统"""
# 创建日志目录(如果不存在)
import os
log_dir = os.path.dirname(Config.LOG_FILE)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
# 配置loguru日志
logger.add(
Config.LOG_FILE,
level=Config.LOG_LEVEL,
rotation=Config.LOG_MAX_BYTES,
retention=Config.LOG_BACKUP_COUNT,
encoding='utf-8',
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name} | {function} | {line} | {message}",
enqueue=True # 异步写入日志
)
# 配置标准logging模块(可选,如果需要与其他库集成)
logging.basicConfig(
level=Config.LOG_LEVEL,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
RotatingFileHandler(
Config.LOG_FILE,
maxBytes=Config.LOG_MAX_BYTES,
backupCount=Config.LOG_BACKUP_COUNT
),
logging.StreamHandler() # 同时输出到控制台
]
)
logging.getLogger('telethon').setLevel(logging.WARNING) # 设置telethon日志级别为WARNING
# ================== 服务层 ==================
class TelegramService:
"""Telegram客户端服务类封装所有Telegram相关操作"""
def __init__(self, server_config):
"""初始化服务实例
Args:
server_config: 服务器配置对象
"""
self.server_config = server_config
self.client = None # Telegram客户端实例
logger.info(f"初始化TelegramService手机号: {self.server_config.phone}")
async def __aenter__(self):
"""异步上下文管理器入口,用于连接客户端"""
try:
logger.info(f"开始连接Telegram客户端手机号: {self.server_config.phone}")
self.client = await self._create_client() # 创建客户端实例
await self.client.connect() # 建立连接
logger.info(f"Telegram客户端连接成功手机号: {self.server_config.phone}")
return self
except Exception as e:
logger.error(f"电话号码:{self.server_config.phone},客户端连接失败: {str(e)}", exc_info=True)
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口,用于断开连接"""
try:
if self.client and self.client.is_connected():
logger.info(f"开始断开Telegram客户端连接手机号: {self.server_config.phone}")
await self.client.disconnect() # 断开连接
logger.info(f"Telegram客户端断开连接成功手机号: {self.server_config.phone}")
except Exception as e:
logger.error(f"客户端断开连接失败: {str(e)}", exc_info=True)
async def _create_client(self):
"""创建Telegram客户端实例
Returns:
TelegramClient: 配置好的Telegram客户端实例
"""
# 代理配置
proxy = {
'proxy_type': self.server_config.proxy_type, # 代理类型
'addr': self.server_config.addr, # 代理地址
'port': int(self.server_config.port), # 代理端口
'username': self.server_config.user if self.server_config.user else "", # 代理用户名
'password': self.server_config.pwd if self.server_config.pwd else "" # 代理密码
}
# 创建并返回Telegram客户端实例
client = TelegramClient(
session=fr"{Config.TG_SESSION_PATH}\{self.server_config.phone}", # 会话文件路径
api_id=self.server_config.api_id, # API ID
api_hash=self.server_config.api_hash, # API Hash
device_model=self.server_config.device_model, # 设备型号
system_version=self.server_config.system_version, # 系统版本
app_version=self.server_config.app_version, # 应用版本
system_lang_code=self.server_config.system_lang_code, # 系统语言代码
lang_code=self.server_config.lang_code, # 应用语言代码
proxy=proxy # 仅当配置了代理地址时使用代理
)
logger.info(f"创建Telegram客户端实例成功手机号: {self.server_config.phone}")
return client
@staticmethod
def extract_verification_code(text):
"""从文本中提取5位数字验证码
Args:
text: 包含验证码的文本
Returns:
str: 提取到的验证码如果提取失败返回None
"""
try:
logger.debug(f"尝试从文本中提取验证码,文本片段: {text[:50]}...") # 只记录前50个字符避免日志过大
matches = re.findall(r'\d{5}', text) # 使用单词边界匹配5位数字
code = matches[0] if matches else None
if code:
logger.debug(f"成功提取验证码: {code}")
else:
logger.debug("未在文本中找到5位数字验证码")
return code
except (IndexError, TypeError, AttributeError) as e:
logger.warning(f"验证码提取失败: {str(e)}", exc_info=True)
return None
async def process_verification(self):
"""处理验证流程,获取验证码
Returns:
str: 验证码如果获取失败返回None
"""
try:
logger.info(f"开始处理验证流程,手机号: {self.server_config.phone}")
messages = await self.client.get_messages(777000, limit=1) # 获取来自Telegram的最新消息
if messages and messages[0].text:
code = self.extract_verification_code(messages[0].text) # 提取验证码
if code:
logger.info(f"成功获取验证码: {code},手机号: {self.server_config.phone}")
return code
else:
logger.warning(f"从消息中未找到验证码,手机号: {self.server_config.phone}")
else:
logger.warning(f"未获取到最新消息,手机号: {self.server_config.phone}")
return None
except Exception as e:
logger.error(f"验证流程处理失败: {str(e)}", exc_info=True)
return None
async def click_confirmation(self):
"""点击确认按钮
Returns:
bool: 是否成功点击确认按钮
"""
try:
logger.info(f"开始点击确认按钮,手机号: {self.server_config.phone}")
messages = await self.client.get_messages(777000, limit=5) # 获取最多5条消息
for msg in messages:
if hasattr(msg, 'buttons') and msg.buttons: # 检查消息是否有按钮
for row in msg.buttons: # 遍历按钮行
for btn in row: # 遍历每个按钮
if hasattr(btn, 'text') and 'confirm' in btn.text.lower(): # 查找确认按钮
await btn.click() # 点击按钮
logger.info(f"成功点击确认按钮,手机号: {self.server_config.phone}")
return True
logger.warning(f"未找到确认按钮,手机号: {self.server_config.phone}")
return False
except PhoneNumberBannedError:
logger.warning(f"点击确认时发现手机号被封禁: {self.server_config.phone}")
return False
except Exception as e:
logger.error(f"点击确认失败: {str(e)}", exc_info=True)
return False
async def check_bot_name(self, bot_username):
"""检查是否已经打开过指定bot
Args:
bot_username: bot的用户名
Returns:
bool: 是否已经打开过该bot
"""
try:
logger.info(f"开始检查bot名称: @{bot_username},手机号: {self.server_config.phone}")
async for dialog in self.client.iter_dialogs():
try:
for i in dialog.entity.usernames:
if i.username == bot_username:
logger.info(f'已与机器人交互: @{bot_username},手机号: {self.server_config.phone}')
return True
except:
pass
if dialog.is_user and dialog.entity.username == bot_username:
logger.info(f'已与机器人交互: @{bot_username},手机号: {self.server_config.phone}')
return True
logger.info(f'尚未与机器人交互: @{bot_username},手机号: {self.server_config.phone}')
return False
except Exception as e:
logger.error(f"检查bot名称失败: {str(e)}", exc_info=True)
return False
async def join_bot(self, bot_name, invite_code=""):
"""加入指定的bot
Args:
bot_name: bot的用户名
invite_code: 邀请码(可选)
Returns:
bool: 是否成功加入bot
"""
try:
logger.info(f"开始加入bot: @{bot_name},手机号: {self.server_config.phone},邀请码: {invite_code}")
bot = await self.client.get_entity(bot_name) # 获取bot实体
if invite_code:
# 使用邀请码启动bot
await self.client(StartBotRequest(
bot=bot_name,
peer=bot_name,
start_param=invite_code
))
# else:
# 直接发送/start命令
# await self.client.send_message(bot, '/start')
logger.info(f"成功加入bot: @{bot_name},手机号: {self.server_config.phone}")
return True
except Exception as e:
logger.error(f"加入bot失败: {str(e)}", exc_info=True)
return False
async def get_bot_token(self, bot_name, invite_code=None, platform=None, start_params=None, url=None):
"""获取bot的webview token
Args:
bot_name: bot的用户名
invite_code: 邀请码(可选)
platform: 平台类型(可选)
start_params: 启动参数(可选)
url: 自定义URL(可选)
Returns:
str: 获取到的token
Raises:
Exception: 获取token失败时抛出异常
"""
platform = platform or random.choice(['web', 'tdesktop']) # 默认随机选择平台
try:
logger.info(f"开始获取bot token: @{bot_name},手机号: {self.server_config.phone},平台: {platform}")
if start_params:
me = await self.client.get_me()
logger.info(f'账号 {8617882004254} -- {me.first_name} 登录成功!')
# 使用AppWebView方式获取token
res = await self.client(RequestAppWebViewRequest(
'me',
InputBotAppShortName(await self.client.get_input_entity(bot_name), start_params),
platform="web",
start_param=invite_code if invite_code else None
))
else:
# 使用普通WebView方式获取token
bot = await self.client.get_entity(bot_name)
peer = InputPeerUser(bot.id, bot.access_hash)
res = await self.client(
RequestWebViewRequest(
peer=peer,
bot=bot,
platform=platform,
from_bot_menu=True,
url=url,
)
)
if not res or not res.url:
raise ValueError("从Telegram获取的响应或URL为空")
# 从URL中解析token
parts = res.url.split('tgWebAppData=')
if len(parts) < 2:
raise ValueError("URL格式无效 - 缺少tgWebAppData参数")
token_part = parts[1].split('&tgWebAppVersion')[0]
token = unquote(token_part) # URL解码
logger.info(
f"成功获取bot token: @{bot_name},手机号: {self.server_config.phone}token: {token[:10]}...") # 只记录token前10个字符
return token
except Exception as e:
logger.error(f"电话号码:{self.server_config.phone}Token生成失败: {str(e)}", exc_info=True)
raise
async def send_click_bot_message(self, bot_name, datas):
"""
datas = [
{"send_message": [], "click_button": [""], },
]
"""
bot = await self.client.get_entity(bot_name)
for data in datas:
if data['send_message']:
for message in data['send_message']:
await self.client.send_message(bot, message)
logger.info(f'电话号码:{self.server_config.phone},发送消息:{message}成功!!!')
time.sleep(1)
if data['click_button']:
for button_name in data["click_button"]:
messages = await self.client.get_messages(bot, limit=5)
if not messages:
logger.warning("未找到验证消息")
return
start = 0
# 遍历消息,寻找包含按钮的验证消息
for message in messages:
if message.buttons:
for row in message.buttons:
for button in row:
if button_name in button.text.lower():
try:
await button.click()
logger.info(f'电话号码:{self.server_config.phone},点击{button_name}成功!!!')
start = 1
except:
logger.warning(f'电话号码:{self.server_config.phone},点击{button_name}失败',
exc_info=True)
if start == 1:
break
if start == 1:
break
if start == 1:
break
async def get_bot_message(self, bot_name):
bot = await self.client.get_entity(bot_name)
messages = await self.client.get_messages(bot, limit=1)
# # 遍历消息,寻找包含按钮的验证消息
for message in messages:
logger.debug(f"获取到bot消息: {message.message[:100]}...") # 只记录消息前100个字符
return message.message
else:
logger.warning(f"未获取到bot消息手机号: {self.server_config.phone}bot: @{bot_name}")
return ""
async def join_channe(self, bot_name):
try:
logger.info(f"开始加入频道: @{bot_name},手机号: {self.server_config.phone}")
await self.client(JoinChannelRequest(bot_name))
logger.info(f"成功加入频道: @{bot_name},手机号: {self.server_config.phone}")
return True
except Exception as e:
logger.error(f"加入频道失败: {str(e)}", exc_info=True)
return False
# ================== 响应格式层 ==================
class ApiResponse:
"""统一API响应格式类"""
@staticmethod
def success(data=None, message="操作成功", code=200):
"""成功响应
Args:
data: 返回的数据
message: 成功消息
code: HTTP状态码
Returns:
tuple: (json响应, HTTP状态码)
"""
response = {
"code": code,
"message": message,
"data": data
}
logger.debug(f"API成功响应: {response}")
return jsonify(response), code
@staticmethod
def error(message="请求失败", code=500, errors=None, details=None):
"""错误响应
Args:
message: 错误消息
code: HTTP状态码
errors: 错误列表
details: 错误详情(调试用)
Returns:
tuple: (json响应, HTTP状态码)
"""
response = {
"code": code,
"message": message,
"errors": errors or [],
"details": details # 仅在调试模式下显示
}
logger.error(f"API错误响应: {response}")
return jsonify(response), code
# ================== 路由层 ==================
app = Flask(__name__)
api_bp = Blueprint('api', __name__, url_prefix='/api') # 创建API蓝图
# ================== 错误处理器 ==================
@app.errorhandler(404)
def handle_not_found(e):
"""404错误处理器"""
logger.error(f"404错误: {str(e)}")
return ApiResponse.error("资源未找到", code=404)
@app.errorhandler(405)
def handle_method_not_allowed(e):
"""405错误处理器"""
logger.error(f"405错误: {str(e)}")
return ApiResponse.error("方法不允许", code=405)
@app.errorhandler(400)
def handle_bad_request(e):
"""400错误处理器特别处理JSON解析错误"""
if "Failed to decode JSON object" in str(e):
logger.error(f"400错误: 无效的JSON格式 - 属性名必须使用双引号")
return ApiResponse.error(
"无效的JSON格式 - 属性名必须使用双引号",
code=400
)
logger.error(f"400错误: {str(e)}")
return ApiResponse.error(str(e), code=400)
@app.errorhandler(Exception)
def handle_general_exception(e):
"""全局异常处理器"""
logger.error(f"未处理的异常: {str(e)}", exc_info=True)
return ApiResponse.error(
"服务器内部错误",
code=500,
details=str(e) if app.debug else None
)
# ================== 业务逻辑层 ==================
# @api_bp.route('/check_phone', methods=['GET'])
# async def check_phone():
# """检查手机号状态接口
# 返回:
# JSON响应: 包含验证码或错误信息
# """
# phone_num = request.args.get('phone')
# if not phone_num or not phone_num.strip():
# logger.error("必须提供手机号参数")
# return ApiResponse.error("必须提供手机号参数", code=400)
#
# try:
# device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == phone_num)
# if not device:
# logger.error(f"设备未注册: {phone_num}")
# return ApiResponse.error("设备未注册", code=404)
#
# async with TelegramService(device) as service:
# code = await service.process_verification()
# if code:
# logger.info(f"验证码获取成功: {phone_num}, code: {code}")
# return ApiResponse.success(
# {"verification_code": code},
# "验证码获取成功"
# )
# return ApiResponse.error("验证码解析失败", code=422)
# except PhoneNumberBannedError:
# logger.error(f"被封禁的账号: {phone_num}")
# return ApiResponse.error("账号已被封禁", code=403)
# except Exception as e:
# logger.error(f"服务异常: {str(e)}")
# return ApiResponse.error(
# "服务器内部错误",
# code=500,
# details=str(e) if app.debug else None
# )
@api_bp.route('/click_confirm', methods=['GET'])
async def click_confirm():
"""点击确认按钮接口
返回:
JSON响应: 操作结果或错误信息
"""
start = None
token = request.args.get('token')
for i in tokens:
if i["token"] == token:
start = i
if start:
logger.info(f"{start['name']},访问{request.args.get('phone')}")
else:
logger.error(f"token不正确: {token}")
return ApiResponse.error(message="token不正确!!!")
phone_num = request.args.get('phone')
if not phone_num or not phone_num.strip():
logger.error("必须提供手机号参数")
return ApiResponse.error("必须提供手机号参数", code=400)
try:
device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == phone_num)
if not device:
logger.error(f"设备未注册: {phone_num}")
return ApiResponse.error("设备未注册", code=404)
async with TelegramService(device) as service:
result = await service.click_confirmation()
if result:
logger.info(f"确认操作成功: {phone_num}")
return ApiResponse.success(None, "确认操作成功")
logger.warning(f"未找到确认按钮: {phone_num}")
return ApiResponse.error("未找到确认按钮", code=404)
except PhoneNumberBannedError:
logger.error(f"账号已被封禁: {phone_num}")
return ApiResponse.error("账号已被封禁", code=403)
except Exception as e:
logger.error(f"确认异常: {str(e)}", exc_info=True)
return ApiResponse.error(
"服务器内部错误",
code=500,
details=str(e) if app.debug else None
)
# @api_bp.route('/get_token', methods=['POST'])
# async def get_token():
# """获取bot token接口
# 返回:
# JSON响应: 包含token或错误信息
# """
# # 检查请求是否为JSON格式
# if not request.is_json:
# logger.error("请求必须是JSON格式")
# return ApiResponse.error("请求必须是JSON格式", code=400)
#
# try:
# params = request.get_json() # 解析JSON参数
# if not params:
# logger.error("空的JSON负载")
# return ApiResponse.error("空的JSON负载", code=400)
# except Exception as e:
# logger.error(f"JSON解析失败: {str(e)}")
# return ApiResponse.error("无效的JSON格式", code=400)
#
# # 验证必需参数
# missing = [p for p in ['phone', 'bot_name'] if not params.get(p)]
# if missing:
# logger.error(f"缺少必需参数: {', '.join(missing)}")
# return ApiResponse.error(
# f"缺少必需参数: {', '.join(missing)}",
# code=400
# )
#
# device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == params['phone'])
# if not device:
# logger.error(f"设备未注册: {params['phone']}")
# return ApiResponse.error("设备未注册", code=404)
# elif device.is_valid_session != 1:
# logger.error(f"账号有问题: {params['phone']}")
# return ApiResponse.error("账号有问题", code=505)
#
# try:
# async with TelegramService(device) as service:
#
# if not await service.client.is_user_authorized():
# device.is_valid_session = 0
# device.save(only=[TgPhoneDevices.is_valid_session])
#
# logger.error("授权失败!!!")
# return ApiResponse.error("授权失败!!!", code=500)
#
# # 检查并加入bot
# if not await service.check_bot_name(bot_username=params["bot_name"]):
# success = await service.join_bot(
# bot_name=params["bot_name"],
# invite_code=params.get("invite_code", "")
# )
# if not success:
# logger.error("加入bot失败")
# return ApiResponse.error("加入bot失败", code=500)
#
# # 获取token
# token = await service.get_bot_token(
# bot_name=params['bot_name'],
# invite_code=params.get('invite_code'),
# platform=params.get('platform'),
# start_params=params.get('start_params'),
# url=params.get('url')
# )
# logger.info(f"Token获取成功: {params['phone']}, bot: {params['bot_name']}")
# return ApiResponse.success(token, "Token获取成功")
# except PhoneNumberBannedError:
# device.is_valid_session = -1
# device.save(only=[TgPhoneDevices.is_valid_session])
#
# logger.error(f"账号已被封禁: {params['phone']}")
# return ApiResponse.error("账号已被封禁", code=403)
# except Exception as e:
# logger.error(f"Token生成失败: {str(e)}", exc_info=True)
# return ApiResponse.error(
# "Token生成失败",
# code=500,
# details=str(e) if app.debug else None
# )
@api_bp.route('/send_click', methods=['POST', 'GET'])
async def send_click():
"""
与bot交互点击或发生消息
datas = [
{"message": [], "click_button":[""],}
]
"""
start = None
token = request.args.get('token')
for i in tokens:
if i["token"] == token:
start = i
if start:
logger.info(f"{start['name']},访问{request.args.get('phone')}")
else:
logger.error(f"token不正确: {token}")
return ApiResponse.error(message="token不正确!!!")
try:
params = request.get_json() # 解析JSON参数
if not params:
logger.error("空的JSON负载")
return ApiResponse.error("空的JSON负载", code=400)
except Exception as e:
logger.error(f"JSON解析失败: {str(e)}", exc_info=True)
return ApiResponse.error("无效的JSON格式", code=400)
# 验证必需参数
missing = [p for p in ['phone', 'bot_name'] if not params.get(p)]
if missing:
logger.error(f"缺少必需参数: {', '.join(missing)}")
return ApiResponse.error(
f"缺少必需参数: {', '.join(missing)}",
code=400
)
device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == params['phone'])
if not device:
logger.error(f"设备未注册: {params['phone']}")
return ApiResponse.error("设备未注册", code=404)
try:
async with TelegramService(device) as service:
if not await service.client.is_user_authorized():
device.is_valid_session = 0
device.save(only=[TgPhoneDevices.is_valid_session])
logger.error("授权失败!!!")
return ApiResponse.error("授权失败!!!", code=500)
# 检查并加入bot
if not await service.check_bot_name(bot_username=params["bot_name"]):
success = await service.join_bot(
bot_name=params["bot_name"],
invite_code=params.get("invite_code", "")
)
if not success:
logger.error("加入bot失败")
return ApiResponse.error("加入bot失败", code=500)
# 操作session对象
token = await service.send_click_bot_message(
bot_name=params['bot_name'],
datas=params["datas"]
)
logger.info(f"点击和发送消息成功: {params['phone']}, bot: {params['bot_name']}")
return ApiResponse.success(token, "点击和发生成功!!!")
except PhoneNumberBannedError:
device.is_valid_session = -1
device.save(only=[TgPhoneDevices.is_valid_session])
logger.error(f"账号已被封禁: {params['phone']}")
return ApiResponse.error("账号已被封禁", code=403)
except Exception as e:
logger.error(f"Token生成失败: {str(e)}", exc_info=True)
return ApiResponse.error(
"Token生成失败",
code=500,
details=str(e) if app.debug else None
)
@api_bp.route('/get_bot_message', methods=['POST', 'GET'])
async def get_bot_message():
start = None
token = request.args.get('token')
for i in tokens:
if i["token"] == token:
start = i
if start:
logger.info(f"{start['name']},访问{request.args.get('phone')}")
else:
logger.error(f"token不正确: {token}")
return ApiResponse.error(message="token不正确!!!")
try:
params = request.get_json() # 解析JSON参数
if not params:
logger.error("空的JSON负载")
return ApiResponse.error("空的JSON负载", code=400)
except Exception as e:
logger.error(f"JSON解析失败: {str(e)}", exc_info=True)
return ApiResponse.error("无效的JSON格式", code=400)
# 验证必需参数
missing = [p for p in ['phone', 'bot_name'] if not params.get(p)]
if missing:
logger.error(f"缺少必需参数: {', '.join(missing)}")
return ApiResponse.error(
f"缺少必需参数: {', '.join(missing)}",
code=400
)
device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == params['phone'])
if not device:
logger.error(f"设备未注册: {params['phone']}")
return ApiResponse.error("设备未注册", code=404)
elif device.is_valid_session != 1:
logger.error(f"账号有问题: {params['phone']}")
return ApiResponse.error("账号有问题", code=505)
try:
async with TelegramService(device) as service:
if not await service.client.is_user_authorized():
device.is_valid_session = 0
device.save(only=[TgPhoneDevices.is_valid_session])
logger.error("授权失败!!!")
return ApiResponse.error("授权失败!!!", code=500)
# 检查并加入bot
if not await service.check_bot_name(bot_username=params["bot_name"]):
success = await service.join_bot(
bot_name=params["bot_name"],
invite_code=params.get("invite_code", "")
)
if not success:
logger.error("加入bot失败")
return ApiResponse.error("加入bot失败", code=500)
# 操作session对象
data = await service.get_bot_message(
bot_name=params['bot_name'],
)
logger.info(f"获取bot消息成功: {params['phone']}, bot: {params['bot_name']}")
return ApiResponse.success(data, "获取token成功")
except PhoneNumberBannedError:
device.is_valid_session = -1
device.save(only=[TgPhoneDevices.is_valid_session])
logger.error(f"账号已被封禁: {params['phone']}")
return ApiResponse.error("账号已被封禁", code=403)
except Exception as e:
logger.error(f"Token生成失败: {str(e)}", exc_info=True)
return ApiResponse.error(
"Token生成失败",
code=500,
details=str(e) if app.debug else None
)
# @api_bp.route('/join_channe', methods=['POST'])
# async def join_channe():
# try:
# params = request.get_json() # 解析JSON参数
# if not params:
# logger.error("空的JSON负载")
# return ApiResponse.error("空的JSON负载", code=400)
# except Exception as e:
# logger.error(f"JSON解析失败: {str(e)}", exc_info=True)
# return ApiResponse.error("无效的JSON格式", code=400)
#
# # 验证必需参数
# missing = [p for p in ['phone', 'bot_name'] if not params.get(p)]
# if missing:
# logger.error(f"缺少必需参数: {', '.join(missing)}")
# return ApiResponse.error(
# f"缺少必需参数: {', '.join(missing)}",
# code=400
# )
#
# device = TgPhoneDevices.get_or_none(TgPhoneDevices.phone == params['phone'])
# if not device:
# logger.error(f"设备未注册: {params['phone']}")
# return ApiResponse.error("设备未注册", code=404)
# elif device.is_valid_session != 1:
# logger.error(f"账号有问题: {params['phone']}")
# return ApiResponse.error("账号有问题", code=505)
#
# try:
# async with TelegramService(device) as service:
#
# if not await service.client.is_user_authorized():
# device.is_valid_session = 0
# device.save(only=[TgPhoneDevices.is_valid_session])
#
# logger.error("授权失败!!!")
# return ApiResponse.error("授权失败!!!", code=500)
#
# # 操作session对象
# if await service.join_channe(
# bot_name=params['bot_name'],
# ):
# logger.info(f"加入群成功: {params['phone']}, bot: {params['bot_name']}")
# return ApiResponse.success("加入群成功!!!")
# else:
# logger.error("加入群失败: {params['phone']}, bot: {params['bot_name']}")
# return ApiResponse.error("加入群失败!!!")
# except PhoneNumberBannedError:
# device.is_valid_session = -1
# device.save(only=[TgPhoneDevices.is_valid_session])
#
# logger.error(f"账号已被封禁: {params['phone']}")
# return ApiResponse.error("账号已被封禁", code=403)
# except Exception as e:
# logger.error(f"访问失败: {str(e)}", exc_info=True)
# return ApiResponse.error(
# "访问失败!!!",
# code=500,
# details=str(e) if app.debug else None
# )
# ================== 应用初始化 ==================
def initialize_app():
"""初始化Flask应用"""
setup_logging() # 初始化日志系统
app.config.from_object(Config) # 加载配置
app.register_blueprint(api_bp) # 注册蓝图
# 配置JSON处理
app.config['JSON_AS_ASCII'] = False # 支持Unicode
app.config['JSON_SORT_KEYS'] = False # 保持字典顺序
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = True # 调试模式下美化输出
if __name__ == '__main__':
tokens = [
{
"name": "尹华城",
"token": "fegergauiernguie"
},
{
"name": "王春",
"token": "hbrethrwsthrth"
},
{
"name": "谢瑞庭",
"token": "hrehrherhrherthreth"
},
{
"name": "张哥",
"token": "jytkjyjdykjdkj"
}
]
initialize_app() # 初始化应用
app.run(
host='0.0.0.0', # 监听所有网络接口
port=9001, # 使用9000端口
debug=True, # 调试模式
threaded=True # 使用多线程处理请求
)