mxivideo/python_core/services/auth_service.py

264 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
User authentication service: registration, login and JWT token verification.
"""
import re
from typing import Optional, Dict, Any
from datetime import datetime
from python_core.models.user import (
User, LoginRequest, RegisterRequest, AuthResponse,
hash_password, verify_password
)
from python_core.services.user_storage import user_storage
from python_core.utils.jwt_auth import generate_access_token, verify_access_token
from python_core.utils.logger import logger
class AuthService:
    """User authentication service.

    Combines the shared ``user_storage`` backend, the password helpers and
    the JWT helpers to offer registration, login and token verification.
    Public methods do not raise: failures are reported through an
    ``AuthResponse`` with ``success=False`` or through a ``None`` return.
    """

    # Validation patterns. They are applied with ``fullmatch`` because
    # ``re.match`` with a trailing ``$`` also accepts a value that ends in a
    # newline (``$`` matches just before a final "\n").
    _USERNAME_PATTERN = re.compile(r'[a-zA-Z0-9_]+')
    _EMAIL_PATTERN = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')

    def __init__(self):
        """Bind the service to the module-level ``user_storage`` object."""
        self.user_storage = user_storage
        logger.info("AuthService initialized")

    def register(self, request: RegisterRequest) -> AuthResponse:
        """Register a new user and sign them in.

        Args:
            request: Registration data (username, email, password and an
                optional display name).

        Returns:
            AuthResponse: On success carries the user's safe dict, a JWT
            token and its expiry; on failure carries ``success=False`` and
            a message.
        """
        try:
            validation_error = self._validate_register_request(request)
            if validation_error:
                return AuthResponse(success=False, message=validation_error)

            # Only the hash is handed to storage, never the plain password.
            password_hash = hash_password(request.password)
            user = self.user_storage.create_user(
                username=request.username,
                email=request.email,
                password_hash=password_hash,
                display_name=request.display_name,
            )

            response = self._build_success_response(user, "注册成功")
            logger.info(f"User registered successfully: {user.username}")
            return response
        except ValueError as e:
            # ValueError text is passed to the caller as-is; presumably the
            # storage layer raises it for duplicates -- TODO confirm.
            logger.warning(f"Registration failed: {e}")
            return AuthResponse(success=False, message=str(e))
        except Exception as e:
            # Any other failure is logged and hidden behind a generic message.
            logger.error(f"Registration error: {e}")
            return AuthResponse(success=False, message="注册失败,请稍后重试")

    def login(self, request: LoginRequest) -> AuthResponse:
        """Authenticate a user by username/email and password.

        Args:
            request: Login data (``username_or_email`` and ``password``).

        Returns:
            AuthResponse: On success carries the user's safe dict, a JWT
            token and its expiry; on failure carries ``success=False`` and
            a message.
        """
        try:
            validation_error = self._validate_login_request(request)
            if validation_error:
                return AuthResponse(success=False, message=validation_error)

            user = self.user_storage.get_user_by_username_or_email(
                request.username_or_email
            )
            # Unknown account and wrong password share one message so the
            # response does not reveal which of the two was wrong.
            if not user or not verify_password(request.password, user.password_hash):
                return AuthResponse(success=False, message="用户名或密码错误")

            # The disabled state is reported only after the password has been
            # verified; otherwise anyone could probe which accounts exist and
            # are disabled without knowing their credentials.
            if not user.is_active:
                return AuthResponse(success=False, message="账户已被禁用")

            # Record the login time and persist it before issuing the token.
            user.update_last_login()
            self.user_storage.update_user(user)

            response = self._build_success_response(user, "登录成功")
            logger.info(f"User logged in successfully: {user.username}")
            return response
        except Exception as e:
            logger.error(f"Login error: {e}")
            return AuthResponse(success=False, message="登录失败,请稍后重试")

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Verify a JWT token and return basic information about its user.

        Args:
            token: JWT token string.

        Returns:
            Dict with ``user_id``, ``username``, ``email``, ``display_name``,
            ``avatar_url`` and ``last_login``, or ``None`` when the token is
            rejected, the user is missing or inactive, or an error occurs.
        """
        try:
            user = self._load_active_user(token)
            if user is None:
                return None
            return {
                "user_id": user.id,
                "username": user.username,
                "email": user.email,
                "display_name": user.display_name,
                "avatar_url": user.avatar_url,
                "last_login": user.last_login,
            }
        except Exception as e:
            logger.error(f"Token verification error: {e}")
            return None

    def get_current_user(self, token: str) -> Optional[User]:
        """Return the active user a JWT token belongs to.

        Args:
            token: JWT token string.

        Returns:
            User: The user object, or ``None`` when the token is rejected,
            the user is missing or inactive, or an error occurs.
        """
        try:
            return self._load_active_user(token)
        except Exception as e:
            logger.error(f"Get current user error: {e}")
            return None

    def _build_success_response(self, user: User, message: str) -> AuthResponse:
        """Issue an access token for ``user`` and wrap it in a success response."""
        token_info = generate_access_token(
            user_id=user.id,
            username=user.username,
            email=user.email,
        )
        return AuthResponse(
            success=True,
            message=message,
            user=user.to_safe_dict(),
            token=token_info["token"],
            expires_at=token_info["expires_at"],
        )

    def _load_active_user(self, token: str) -> Optional[User]:
        """Resolve ``token`` to its active user, or ``None``; may raise."""
        payload = verify_access_token(token)
        if not payload:
            return None
        user = self.user_storage.get_user_by_id(payload["user_id"])
        if not user or not user.is_active:
            return None
        return user

    def _validate_register_request(self, request: RegisterRequest) -> Optional[str]:
        """Validate registration data.

        Returns:
            The first validation error message, or ``None`` when the
            request is acceptable.
        """
        # Username: 3-50 characters, letters / digits / underscore only.
        if not request.username or len(request.username.strip()) < 3:
            return "用户名至少需要3个字符"
        if len(request.username) > 50:
            return "用户名不能超过50个字符"
        if not self._USERNAME_PATTERN.fullmatch(request.username):
            return "用户名只能包含字母、数字和下划线"

        # Email: required and must match the address pattern in full.
        if not request.email:
            return "邮箱地址不能为空"
        if not self._EMAIL_PATTERN.fullmatch(request.email):
            return "邮箱地址格式不正确"

        # Password: required, 6-100 characters.
        if not request.password:
            return "密码不能为空"
        if len(request.password) < 6:
            return "密码至少需要6个字符"
        if len(request.password) > 100:
            return "密码不能超过100个字符"

        # Display name: optional, at most 100 characters.
        if request.display_name and len(request.display_name) > 100:
            return "显示名称不能超过100个字符"
        return None

    def _validate_login_request(self, request: LoginRequest) -> Optional[str]:
        """Validate login data.

        Returns:
            An error message when the identifier or the password is empty,
            otherwise ``None``.
        """
        if not request.username_or_email:
            return "用户名或邮箱不能为空"
        if not request.password:
            return "密码不能为空"
        return None
# Module-level AuthService instance, created when this module is imported.
auth_service = AuthService()