206 lines
6.4 KiB
Python
206 lines
6.4 KiB
Python
"""
|
||
JWT认证工具类
|
||
"""
|
||
|
||
import jwt
|
||
import json
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Dict, Any
|
||
from pathlib import Path
|
||
|
||
from python_core.utils.logger import logger
|
||
|
||
|
||
class JWTAuth:
|
||
"""JWT认证工具类"""
|
||
|
||
def __init__(self, secret_key: str = "mixvideo_secret_key_2024"):
|
||
self.secret_key = secret_key
|
||
self.algorithm = "HS256"
|
||
# JWT过期时间:6个月
|
||
self.expires_delta = timedelta(days=180)
|
||
|
||
def generate_token(self, user_id: str, username: str, email: str) -> Dict[str, Any]:
|
||
"""
|
||
生成JWT token
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
username: 用户名
|
||
email: 邮箱
|
||
|
||
Returns:
|
||
Dict: 包含token和过期时间的字典
|
||
"""
|
||
try:
|
||
# 计算过期时间
|
||
expires_at = datetime.utcnow() + self.expires_delta
|
||
|
||
# JWT payload
|
||
payload = {
|
||
"user_id": user_id,
|
||
"username": username,
|
||
"email": email,
|
||
"iat": datetime.utcnow(), # 签发时间
|
||
"exp": expires_at, # 过期时间
|
||
"iss": "mixvideo", # 签发者
|
||
"sub": user_id # 主题(用户ID)
|
||
}
|
||
|
||
# 生成token
|
||
token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
|
||
|
||
logger.info(f"Generated JWT token for user: {username}")
|
||
|
||
return {
|
||
"token": token,
|
||
"expires_at": expires_at.isoformat(),
|
||
"expires_in": int(self.expires_delta.total_seconds())
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate JWT token: {e}")
|
||
raise Exception(f"Token generation failed: {str(e)}")
|
||
|
||
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
验证JWT token
|
||
|
||
Args:
|
||
token: JWT token字符串
|
||
|
||
Returns:
|
||
Dict: 解码后的payload,验证失败返回None
|
||
"""
|
||
try:
|
||
# 解码并验证token
|
||
payload = jwt.decode(
|
||
token,
|
||
self.secret_key,
|
||
algorithms=[self.algorithm],
|
||
options={"verify_exp": True} # 验证过期时间
|
||
)
|
||
|
||
logger.debug(f"Token verified for user: {payload.get('username')}")
|
||
return payload
|
||
|
||
except jwt.ExpiredSignatureError:
|
||
logger.warning("JWT token has expired")
|
||
return None
|
||
except jwt.InvalidTokenError as e:
|
||
logger.warning(f"Invalid JWT token: {e}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"JWT token verification failed: {e}")
|
||
return None
|
||
|
||
def refresh_token(self, token: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
刷新JWT token
|
||
|
||
Args:
|
||
token: 当前的JWT token
|
||
|
||
Returns:
|
||
Dict: 新的token信息,失败返回None
|
||
"""
|
||
try:
|
||
# 验证当前token(忽略过期时间)
|
||
payload = jwt.decode(
|
||
token,
|
||
self.secret_key,
|
||
algorithms=[self.algorithm],
|
||
options={"verify_exp": False} # 不验证过期时间
|
||
)
|
||
|
||
# 生成新token
|
||
new_token_info = self.generate_token(
|
||
user_id=payload["user_id"],
|
||
username=payload["username"],
|
||
email=payload["email"]
|
||
)
|
||
|
||
logger.info(f"Token refreshed for user: {payload['username']}")
|
||
return new_token_info
|
||
|
||
except Exception as e:
|
||
logger.error(f"Token refresh failed: {e}")
|
||
return None
|
||
|
||
def decode_token_without_verification(self, token: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
解码token但不验证(用于获取过期token的信息)
|
||
|
||
Args:
|
||
token: JWT token字符串
|
||
|
||
Returns:
|
||
Dict: 解码后的payload
|
||
"""
|
||
try:
|
||
payload = jwt.decode(
|
||
token,
|
||
options={"verify_signature": False, "verify_exp": False}
|
||
)
|
||
return payload
|
||
except Exception as e:
|
||
logger.error(f"Token decode failed: {e}")
|
||
return None
|
||
|
||
def get_token_info(self, token: str) -> Dict[str, Any]:
|
||
"""
|
||
获取token信息
|
||
|
||
Args:
|
||
token: JWT token字符串
|
||
|
||
Returns:
|
||
Dict: token信息
|
||
"""
|
||
payload = self.decode_token_without_verification(token)
|
||
if not payload:
|
||
return {"valid": False, "error": "Invalid token format"}
|
||
|
||
try:
|
||
exp_timestamp = payload.get("exp")
|
||
if exp_timestamp:
|
||
exp_datetime = datetime.fromtimestamp(exp_timestamp)
|
||
is_expired = datetime.utcnow() > exp_datetime
|
||
time_remaining = exp_datetime - datetime.utcnow()
|
||
else:
|
||
is_expired = True
|
||
time_remaining = timedelta(0)
|
||
|
||
return {
|
||
"valid": not is_expired,
|
||
"user_id": payload.get("user_id"),
|
||
"username": payload.get("username"),
|
||
"email": payload.get("email"),
|
||
"issued_at": payload.get("iat"),
|
||
"expires_at": payload.get("exp"),
|
||
"is_expired": is_expired,
|
||
"time_remaining": str(time_remaining) if not is_expired else "Expired"
|
||
}
|
||
|
||
except Exception as e:
|
||
return {"valid": False, "error": str(e)}
|
||
|
||
|
||
# 创建全局JWT认证实例
|
||
jwt_auth = JWTAuth()
|
||
|
||
|
||
def generate_access_token(user_id: str, username: str, email: str) -> Dict[str, Any]:
|
||
"""生成访问token的便捷函数"""
|
||
return jwt_auth.generate_token(user_id, username, email)
|
||
|
||
|
||
def verify_access_token(token: str) -> Optional[Dict[str, Any]]:
|
||
"""验证访问token的便捷函数"""
|
||
return jwt_auth.verify_token(token)
|
||
|
||
|
||
def refresh_access_token(token: str) -> Optional[Dict[str, Any]]:
|
||
"""刷新访问token的便捷函数"""
|
||
return jwt_auth.refresh_token(token)
|