This commit is contained in:
root 2025-07-12 20:15:52 +08:00
parent 6bb7b49091
commit b846aca852
1 changed files with 107 additions and 77 deletions

View File

@ -1,34 +1,62 @@
"""
用户认证服务
基于UserTable实现的用户认证功能
"""
import re
from typing import Optional, Dict, Any
from datetime import datetime
from dataclasses import dataclass
from python_core.models.user import (
User, LoginRequest, RegisterRequest, AuthResponse,
hash_password, verify_password
)
from python_core.services.user_storage import user_storage
from python_core.database.user import user_table
from python_core.utils.jwt_auth import generate_access_token, verify_access_token
from python_core.utils.logger import logger
from python_core.utils.logger import setup_logger
logger = setup_logger(__name__)
@dataclass
class LoginRequest:
"""登录请求数据"""
username_or_email: str
password: str
@dataclass
class RegisterRequest:
"""注册请求数据"""
username: str
email: str
password: str
display_name: str = None
@dataclass
class AuthResponse:
"""认证响应数据"""
success: bool
message: str = ""
user: Dict[str, Any] = None
token: str = None
expires_at: str = None
class AuthService:
"""用户认证服务"""
def __init__(self):
self.user_storage = user_storage
self.user_table = user_table
logger.info("AuthService initialized")
def register(self, request: RegisterRequest) -> AuthResponse:
"""
用户注册
Args:
request: 注册请求数据
Returns:
AuthResponse: 注册响应
"""
@ -40,35 +68,43 @@ class AuthService:
success=False,
message=validation_error
)
# 密码哈希
password_hash = hash_password(request.password)
# 创建用户
user = self.user_storage.create_user(
user_id = self.user_table.create_user(
username=request.username,
email=request.email,
password_hash=password_hash,
password=request.password,
display_name=request.display_name
)
# 获取创建的用户信息
user = self.user_table.get_user_by_id(user_id)
if not user:
return AuthResponse(
success=False,
message="用户创建失败"
)
# 生成JWT token
token_info = generate_access_token(
user_id=user.id,
username=user.username,
email=user.email
user_id=user["id"],
username=user["username"],
email=user["email"]
)
logger.info(f"User registered successfully: {user.username}")
logger.info(f"User registered successfully: {user['username']}")
# 准备安全的用户信息(不包含密码哈希)
safe_user = {k: v for k, v in user.items() if k != "password_hash"}
return AuthResponse(
success=True,
message="注册成功",
user=user.to_safe_dict(),
user=safe_user,
token=token_info["token"],
expires_at=token_info["expires_at"]
)
except ValueError as e:
logger.warning(f"Registration failed: {e}")
return AuthResponse(
@ -85,10 +121,10 @@ class AuthService:
def login(self, request: LoginRequest) -> AuthResponse:
"""
用户登录
Args:
request: 登录请求数据
Returns:
AuthResponse: 登录响应
"""
@ -100,50 +136,42 @@ class AuthService:
success=False,
message=validation_error
)
# 查找用户
user = self.user_storage.get_user_by_username_or_email(request.username_or_email)
# 使用UserTable的认证方法
user = self.user_table.authenticate_user(request.username_or_email, request.password)
if not user:
return AuthResponse(
success=False,
message="用户名或密码错误"
)
# 检查用户状态
if not user.is_active:
if not user.get("is_active", True):
return AuthResponse(
success=False,
message="账户已被禁用"
)
# 验证密码
if not verify_password(request.password, user.password_hash):
return AuthResponse(
success=False,
message="用户名或密码错误"
)
# 更新最后登录时间
user.update_last_login()
self.user_storage.update_user(user)
# 生成JWT token
token_info = generate_access_token(
user_id=user.id,
username=user.username,
email=user.email
user_id=user["id"],
username=user["username"],
email=user["email"]
)
logger.info(f"User logged in successfully: {user.username}")
logger.info(f"User logged in successfully: {user['username']}")
# 准备安全的用户信息(不包含密码哈希)
safe_user = {k: v for k, v in user.items() if k != "password_hash"}
return AuthResponse(
success=True,
message="登录成功",
user=user.to_safe_dict(),
user=safe_user,
token=token_info["token"],
expires_at=token_info["expires_at"]
)
except Exception as e:
logger.error(f"Login error: {e}")
return AuthResponse(
@ -154,10 +182,10 @@ class AuthService:
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
验证JWT token
Args:
token: JWT token字符串
Returns:
Dict: 用户信息验证失败返回None
"""
@ -166,46 +194,48 @@ class AuthService:
payload = verify_access_token(token)
if not payload:
return None
# 获取用户信息
user = self.user_storage.get_user_by_id(payload["user_id"])
if not user or not user.is_active:
user = self.user_table.get_user_by_id(payload["user_id"])
if not user or not user.get("is_active", True):
return None
return {
"user_id": user.id,
"username": user.username,
"email": user.email,
"display_name": user.display_name,
"avatar_url": user.avatar_url,
"last_login": user.last_login
"user_id": user["id"],
"username": user["username"],
"email": user["email"],
"display_name": user["display_name"],
"avatar_url": user.get("avatar_url", ""),
"last_login": user.get("last_login")
}
except Exception as e:
logger.error(f"Token verification error: {e}")
return None
def get_current_user(self, token: str) -> Optional[User]:
def get_current_user(self, token: str) -> Optional[Dict[str, Any]]:
"""
根据token获取当前用户
Args:
token: JWT token字符串
Returns:
User: 用户对象失败返回None
Dict: 用户信息字典失败返回None
"""
try:
payload = verify_access_token(token)
if not payload:
return None
user = self.user_storage.get_user_by_id(payload["user_id"])
if not user or not user.is_active:
user = self.user_table.get_user_by_id(payload["user_id"])
if not user or not user.get("is_active", True):
return None
return user
# 返回安全的用户信息(不包含密码哈希)
safe_user = {k: v for k, v in user.items() if k != "password_hash"}
return safe_user
except Exception as e:
logger.error(f"Get current user error: {e}")
return None