"""
JWT authentication utilities.

Provides the ``JWTAuth`` helper class, a module-level ``jwt_auth`` instance,
and thin convenience functions for issuing, verifying and refreshing tokens.
"""

import jwt
import json
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
from pathlib import Path

from python_core.utils.logger import logger


class JWTAuth:
    """Issue, verify, refresh and inspect HS256-signed JWT tokens."""

    # NOTE(review): the default secret below is hard-coded in source. Anyone
    # who can read the code can forge tokens for an instance created without
    # an explicit ``secret_key``. Deployments should pass a private secret.
    def __init__(self, secret_key: str = "mixvideo_secret_key_2024"):
        """
        Configure the signing secret, algorithm and token lifetime.

        Args:
            secret_key: Shared secret used to sign and verify tokens.
        """
        self.secret_key = secret_key
        self.algorithm = "HS256"
        # Token lifetime: 180 days (roughly six months).
        self.expires_delta = timedelta(days=180)

    def generate_token(self, user_id: str, username: str, email: str) -> Dict[str, Any]:
        """
        Generate a signed JWT for the given user.

        Args:
            user_id: User ID; stored in both the ``user_id`` and ``sub`` claims.
            username: User name.
            email: User e-mail address.

        Returns:
            Dict with ``token`` (the encoded JWT), ``expires_at`` (ISO-8601
            string of the naive UTC expiry time) and ``expires_in`` (token
            lifetime in whole seconds).

        Raises:
            Exception: If encoding fails; the original error is chained as
                the cause.
        """
        try:
            # Read the clock once so ``iat`` and ``exp`` differ by exactly
            # ``expires_delta``. The value is kept as a naive UTC datetime so
            # the ISO string in the result keeps its original format.
            issued_at = datetime.now(timezone.utc).replace(tzinfo=None)
            expires_at = issued_at + self.expires_delta

            payload = {
                "user_id": user_id,
                "username": username,
                "email": email,
                "iat": issued_at,   # issued-at time
                "exp": expires_at,  # expiry time
                "iss": "mixvideo",  # issuer
                "sub": user_id,     # subject (user ID)
            }

            token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
            # PyJWT 1.x returns bytes while 2.x returns str; normalise to str
            # so the result is always JSON-serialisable.
            if isinstance(token, bytes):
                token = token.decode("utf-8")

            logger.info(f"Generated JWT token for user: {username}")

            return {
                "token": token,
                "expires_at": expires_at.isoformat(),
                "expires_in": int(self.expires_delta.total_seconds()),
            }

        except Exception as e:
            logger.error(f"Failed to generate JWT token: {e}")
            # Chain the cause so the original traceback is not lost.
            raise Exception(f"Token generation failed: {str(e)}") from e

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """
        Verify a JWT's signature and expiry and return its payload.

        Args:
            token: Encoded JWT string.

        Returns:
            The decoded payload, or None if the token is expired, invalid,
            or verification fails for any other reason.
        """
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                options={"verify_exp": True},  # enforce the expiry claim
            )

            logger.debug(f"Token verified for user: {payload.get('username')}")
            return payload

        # ExpiredSignatureError is handled before the broader
        # InvalidTokenError so expiry gets its own log message.
        except jwt.ExpiredSignatureError:
            logger.warning("JWT token has expired")
            return None
        except jwt.InvalidTokenError as e:
            logger.warning(f"Invalid JWT token: {e}")
            return None
        except Exception as e:
            logger.error(f"JWT token verification failed: {e}")
            return None

    def refresh_token(self, token: str) -> Optional[Dict[str, Any]]:
        """
        Issue a fresh token for the user named in an existing token.

        The signature of the current token is verified but its expiry is
        deliberately ignored, so an expired token can still be refreshed.

        Args:
            token: The current encoded JWT.

        Returns:
            New token info (same shape as ``generate_token``), or None if the
            token cannot be decoded, lacks a required claim, or generation
            fails.
        """
        try:
            # NOTE(review): with expiry checking disabled there is no upper
            # bound on how long ago the token may have expired. Confirm this
            # is the intended policy.
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                options={"verify_exp": False},  # do not enforce expiry
            )

            new_token_info = self.generate_token(
                user_id=payload["user_id"],
                username=payload["username"],
                email=payload["email"],
            )

            logger.info(f"Token refreshed for user: {payload['username']}")
            return new_token_info

        except Exception as e:
            logger.error(f"Token refresh failed: {e}")
            return None

    def decode_token_without_verification(self, token: str) -> Optional[Dict[str, Any]]:
        """
        Decode a token without checking its signature or expiry.

        Intended for reading the claims of an expired token. The result is
        NOT authenticated and must not be trusted for authorisation.

        Args:
            token: Encoded JWT string.

        Returns:
            The decoded payload, or None if the token cannot be decoded.
        """
        try:
            payload = jwt.decode(
                token,
                options={"verify_signature": False, "verify_exp": False},
            )
            return payload
        except Exception as e:
            logger.error(f"Token decode failed: {e}")
            return None

    def get_token_info(self, token: str) -> Dict[str, Any]:
        """
        Summarise a token's claims and expiry state without verifying it.

        Args:
            token: Encoded JWT string.

        Returns:
            Dict with ``valid``, ``user_id``, ``username``, ``email``,
            ``issued_at``, ``expires_at``, ``is_expired`` and
            ``time_remaining``; or ``{"valid": False, "error": ...}`` when
            the token cannot be decoded or inspected. ``valid`` here only
            means "not expired" - the signature is not checked.
        """
        payload = self.decode_token_without_verification(token)

        if not payload:
            return {"valid": False, "error": "Invalid token format"}

        try:
            exp_timestamp = payload.get("exp")
            if exp_timestamp:
                # ``exp`` is a POSIX timestamp. Interpret it in UTC and
                # compare against an aware UTC "now", so the result does not
                # depend on the host's local time zone.
                now = datetime.now(timezone.utc)
                exp_datetime = datetime.fromtimestamp(exp_timestamp, tz=timezone.utc)
                is_expired = now > exp_datetime
                time_remaining = exp_datetime - now
            else:
                # A token with no expiry claim is reported as expired.
                is_expired = True
                time_remaining = timedelta(0)

            return {
                "valid": not is_expired,
                "user_id": payload.get("user_id"),
                "username": payload.get("username"),
                "email": payload.get("email"),
                "issued_at": payload.get("iat"),
                "expires_at": payload.get("exp"),
                "is_expired": is_expired,
                "time_remaining": str(time_remaining) if not is_expired else "Expired",
            }

        except Exception as e:
            return {"valid": False, "error": str(e)}


# Module-level JWT auth instance shared by the convenience functions below.
jwt_auth = JWTAuth()


def generate_access_token(user_id: str, username: str, email: str) -> Dict[str, Any]:
    """Convenience wrapper around ``jwt_auth.generate_token``."""
    return jwt_auth.generate_token(user_id, username, email)


def verify_access_token(token: str) -> Optional[Dict[str, Any]]:
    """Convenience wrapper around ``jwt_auth.verify_token``."""
    return jwt_auth.verify_token(token)


def refresh_access_token(token: str) -> Optional[Dict[str, Any]]:
    """Convenience wrapper around ``jwt_auth.refresh_token``."""
    return jwt_auth.refresh_token(token)