mxivideo/python_core/utils/jwt_auth.py

206 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
JWT认证工具类
"""
import jwt
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from pathlib import Path
from python_core.utils.logger import logger
class JWTAuth:
"""JWT认证工具类"""
def __init__(self, secret_key: str = "mixvideo_secret_key_2024"):
self.secret_key = secret_key
self.algorithm = "HS256"
# JWT过期时间6个月
self.expires_delta = timedelta(days=180)
def generate_token(self, user_id: str, username: str, email: str) -> Dict[str, Any]:
"""
生成JWT token
Args:
user_id: 用户ID
username: 用户名
email: 邮箱
Returns:
Dict: 包含token和过期时间的字典
"""
try:
# 计算过期时间
expires_at = datetime.utcnow() + self.expires_delta
# JWT payload
payload = {
"user_id": user_id,
"username": username,
"email": email,
"iat": datetime.utcnow(), # 签发时间
"exp": expires_at, # 过期时间
"iss": "mixvideo", # 签发者
"sub": user_id # 主题用户ID
}
# 生成token
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
logger.info(f"Generated JWT token for user: {username}")
return {
"token": token,
"expires_at": expires_at.isoformat(),
"expires_in": int(self.expires_delta.total_seconds())
}
except Exception as e:
logger.error(f"Failed to generate JWT token: {e}")
raise Exception(f"Token generation failed: {str(e)}")
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
验证JWT token
Args:
token: JWT token字符串
Returns:
Dict: 解码后的payload验证失败返回None
"""
try:
# 解码并验证token
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
options={"verify_exp": True} # 验证过期时间
)
logger.debug(f"Token verified for user: {payload.get('username')}")
return payload
except jwt.ExpiredSignatureError:
logger.warning("JWT token has expired")
return None
except jwt.InvalidTokenError as e:
logger.warning(f"Invalid JWT token: {e}")
return None
except Exception as e:
logger.error(f"JWT token verification failed: {e}")
return None
def refresh_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
刷新JWT token
Args:
token: 当前的JWT token
Returns:
Dict: 新的token信息失败返回None
"""
try:
# 验证当前token忽略过期时间
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm],
options={"verify_exp": False} # 不验证过期时间
)
# 生成新token
new_token_info = self.generate_token(
user_id=payload["user_id"],
username=payload["username"],
email=payload["email"]
)
logger.info(f"Token refreshed for user: {payload['username']}")
return new_token_info
except Exception as e:
logger.error(f"Token refresh failed: {e}")
return None
def decode_token_without_verification(self, token: str) -> Optional[Dict[str, Any]]:
"""
解码token但不验证用于获取过期token的信息
Args:
token: JWT token字符串
Returns:
Dict: 解码后的payload
"""
try:
payload = jwt.decode(
token,
options={"verify_signature": False, "verify_exp": False}
)
return payload
except Exception as e:
logger.error(f"Token decode failed: {e}")
return None
def get_token_info(self, token: str) -> Dict[str, Any]:
"""
获取token信息
Args:
token: JWT token字符串
Returns:
Dict: token信息
"""
payload = self.decode_token_without_verification(token)
if not payload:
return {"valid": False, "error": "Invalid token format"}
try:
exp_timestamp = payload.get("exp")
if exp_timestamp:
exp_datetime = datetime.fromtimestamp(exp_timestamp)
is_expired = datetime.utcnow() > exp_datetime
time_remaining = exp_datetime - datetime.utcnow()
else:
is_expired = True
time_remaining = timedelta(0)
return {
"valid": not is_expired,
"user_id": payload.get("user_id"),
"username": payload.get("username"),
"email": payload.get("email"),
"issued_at": payload.get("iat"),
"expires_at": payload.get("exp"),
"is_expired": is_expired,
"time_remaining": str(time_remaining) if not is_expired else "Expired"
}
except Exception as e:
return {"valid": False, "error": str(e)}
# 创建全局JWT认证实例
jwt_auth = JWTAuth()
def generate_access_token(user_id: str, username: str, email: str) -> Dict[str, Any]:
"""生成访问token的便捷函数"""
return jwt_auth.generate_token(user_id, username, email)
def verify_access_token(token: str) -> Optional[Dict[str, Any]]:
"""验证访问token的便捷函数"""
return jwt_auth.verify_token(token)
def refresh_access_token(token: str) -> Optional[Dict[str, Any]]:
"""刷新访问token的便捷函数"""
return jwt_auth.refresh_token(token)