mxivideo/python_core/api/auth_api.py

246 lines
7.0 KiB
Python

"""
用户认证API接口
"""
from typing import Dict, Any, Optional
import json
from python_core.models.user import LoginRequest, RegisterRequest
from python_core.services.auth_service import auth_service
from python_core.utils.logger import logger
class AuthAPI:
    """Dict-in / dict-out API layer over the authentication service.

    Every public method accepts a plain ``dict`` payload and returns an
    envelope of the form::

        {"success": bool, "message": str, "data": dict | None}

    Any exception raised while handling a request is logged and converted
    into a failure envelope instead of being propagated to the caller.
    """

    def __init__(self):
        # Keep a reference to the shared module-level auth service.
        self.auth_service = auth_service
        logger.info("AuthAPI initialized")

    @staticmethod
    def _failure(message: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Build a failure envelope carrying ``message`` and optional ``data``."""
        return {"success": False, "message": message, "data": data}

    @staticmethod
    def _session_envelope(response: Any) -> Dict[str, Any]:
        """Turn a register/login service response into the API envelope.

        ``data`` is populated with ``user``, ``token`` and ``expires_at`` only
        when the service reports success; otherwise it is ``None``.
        """
        payload = None
        if response.success:
            payload = {
                "user": response.user,
                "token": response.token,
                "expires_at": response.expires_at,
            }
        return {
            "success": response.success,
            "message": response.message,
            "data": payload,
        }

    def register(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Register a new user.

        Args:
            data: Registration payload with the keys ``"username"``,
                ``"email"``, ``"password"`` and the optional
                ``"display_name"``. Missing keys and explicit ``None``
                values are both treated as empty strings.

        Returns:
            The API envelope. On success ``data`` holds ``user``, ``token``
            and ``expires_at`` from the service response; on failure it is
            ``None``.
        """
        try:
            # ``dict.get(key, default)`` only uses the default when the key is
            # absent. An explicit ``None`` (e.g. JSON null) used to reach
            # ``.strip()`` and raise, so normalise with ``or ""`` first - the
            # same guard ``display_name`` already had.
            request = RegisterRequest(
                username=(data.get("username") or "").strip(),
                email=(data.get("email") or "").strip(),
                # Passwords are deliberately not stripped.
                password=data.get("password") or "",
                # A blank display name is passed on as ``None``.
                display_name=(data.get("display_name") or "").strip() or None,
            )
            return self._session_envelope(self.auth_service.register(request))
        except Exception as e:
            logger.error(f"Register API error: {e}")
            return self._failure("注册失败,请稍后重试")

    def login(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Log a user in.

        Args:
            data: Login payload with the keys ``"username_or_email"`` and
                ``"password"``. Missing keys and explicit ``None`` values
                are both treated as empty strings.

        Returns:
            The API envelope. On success ``data`` holds ``user``, ``token``
            and ``expires_at`` from the service response; on failure it is
            ``None``.
        """
        try:
            # See ``register``: guard against explicit ``None`` values before
            # calling ``.strip()``.
            request = LoginRequest(
                username_or_email=(data.get("username_or_email") or "").strip(),
                password=data.get("password") or "",
            )
            return self._session_envelope(self.auth_service.login(request))
        except Exception as e:
            logger.error(f"Login API error: {e}")
            return self._failure("登录失败,请稍后重试")

    def verify_token(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Check whether a token is valid.

        Args:
            data: Payload with the key ``"token"``.

        Returns:
            The API envelope. When the service returns a truthy value for
            the token, ``data`` is ``{"user": <that value>, "valid": True}``;
            when it returns a falsy value, ``data`` is ``{"valid": False}``;
            for an empty token or an unexpected error, ``data`` is ``None``.
        """
        try:
            token = data.get("token", "")
            if not token:
                return self._failure("Token不能为空")

            user_info = self.auth_service.verify_token(token)
            if not user_info:
                return self._failure("Token无效或已过期", {"valid": False})

            return {
                "success": True,
                "message": "Token验证成功",
                "data": {
                    "user": user_info,
                    "valid": True,
                },
            }
        except Exception as e:
            logger.error(f"Verify token API error: {e}")
            return self._failure("Token验证失败")

    def get_current_user(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return the user that owns a token.

        Args:
            data: Payload with the key ``"token"``.

        Returns:
            The API envelope. On success ``data`` is
            ``{"user": user.to_safe_dict()}``; otherwise it is ``None``.
        """
        try:
            token = data.get("token", "")
            if not token:
                return self._failure("Token不能为空")

            user = self.auth_service.get_current_user(token)
            if not user:
                return self._failure("用户不存在或token无效")

            return {
                "success": True,
                "message": "获取用户信息成功",
                "data": {
                    "user": user.to_safe_dict(),
                },
            }
        except Exception as e:
            logger.error(f"Get current user API error: {e}")
            return self._failure("获取用户信息失败")
# Module-level AuthAPI instance, created at import time and shared by the
# convenience functions defined below.
auth_api = AuthAPI()
# JSON-RPC style convenience functions: each takes plain arguments, packs them
# into the dict payload the matching AuthAPI method expects, and returns that
# method's response.
def register_user(username: str, email: str, password: str, display_name: Optional[str] = None) -> Dict[str, Any]:
    """Register a user through the shared ``auth_api`` instance.

    Packs the arguments into the dict payload taken by ``auth_api.register``
    and returns that call's response unchanged.
    """
    payload = {
        "username": username,
        "email": email,
        "password": password,
        "display_name": display_name,
    }
    return auth_api.register(payload)
def login_user(username_or_email: str, password: str) -> Dict[str, Any]:
    """Log a user in through the shared ``auth_api`` instance.

    Packs the credentials into the dict payload taken by ``auth_api.login``
    and returns that call's response unchanged.
    """
    credentials = {
        "username_or_email": username_or_email,
        "password": password,
    }
    return auth_api.login(credentials)
def verify_user_token(token: str) -> Dict[str, Any]:
    """Verify ``token`` through the shared ``auth_api`` instance.

    Wraps the token in the dict payload taken by ``auth_api.verify_token``
    and returns that call's response unchanged.
    """
    payload = {"token": token}
    return auth_api.verify_token(payload)
def get_user_info(token: str) -> Dict[str, Any]:
    """Fetch the user for ``token`` through the shared ``auth_api`` instance.

    Wraps the token in the dict payload taken by
    ``auth_api.get_current_user`` and returns that call's response unchanged.
    """
    payload = {"token": token}
    return auth_api.get_current_user(payload)