Files
ai_api_web/backend/routes/auth.py
2026-01-22 18:26:47 +08:00

110 lines
3.4 KiB
Python

from flask import Blueprint, request, jsonify
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity
from models import db
from models.user import User
import secrets
import string
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户注册"""
data = request.get_json()
username = data.get('username')
password = data.get('password')
confirm_password = data.get('confirm_password')
invite_code = data.get('invite_code')
# 验证输入
if not username or not password:
return jsonify({'error': '用户名和密码不能为空'}), 400
if len(password) < 8 or len(password) > 20:
return jsonify({'error': '密码长度必须在8-20位之间'}), 400
if password != confirm_password:
return jsonify({'error': '两次输入的密码不一致'}), 400
# 检查用户名是否已存在
if User.query.filter_by(username=username).first():
return jsonify({'error': '用户名已存在'}), 400
# 处理邀请码
inviter = None
if invite_code:
inviter = User.query.filter_by(invite_code=invite_code).first()
# 创建用户
user = User(username=username)
user.set_password(password)
# 生成邀请码
user.invite_code = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(6))
if inviter:
user.invited_by = inviter.id
try:
db.session.add(user)
db.session.commit()
# 生成token
access_token = create_access_token(identity=str(user.id))
refresh_token = create_refresh_token(identity=str(user.id))
return jsonify({
'message': '注册成功',
'access_token': access_token,
'refresh_token': refresh_token,
'user': user.to_dict()
}), 201
except Exception as e:
db.session.rollback()
return jsonify({'error': '注册失败: ' + str(e)}), 500
@auth_bp.route('/login', methods=['POST'])
def login():
"""用户登录"""
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': '用户名和密码不能为空'}), 400
user = User.query.filter_by(username=username).first()
if not user or not user.check_password(password):
return jsonify({'error': '用户名或密码错误'}), 401
# 生成token
access_token = create_access_token(identity=str(user.id))
refresh_token = create_refresh_token(identity=str(user.id))
return jsonify({
'message': '登录成功',
'access_token': access_token,
'refresh_token': refresh_token,
'user': user.to_dict()
}), 200
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
"""刷新token"""
current_user_id = get_jwt_identity()
new_token = create_access_token(identity=str(current_user_id))
return jsonify({'access_token': new_token}), 200
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
"""获取当前用户信息"""
current_user_id = int(get_jwt_identity())
user = User.query.get(current_user_id)
if not user:
return jsonify({'error': '用户不存在'}), 404
return jsonify({'user': user.to_dict()}), 200