This commit is contained in:
root 2025-07-12 20:51:47 +08:00
parent 6eb511e138
commit fb63e5b0a8
1 changed files with 0 additions and 249 deletions

View File

@ -1,249 +0,0 @@
"""
用户数据存储服务
"""
import json
import os
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime
from python_core.models.user import User, generate_user_id
from python_core.config import settings
from python_core.utils.logger import logger
class UserStorage:
"""用户数据存储服务"""
def __init__(self):
# 用户数据文件路径
self.users_file = Path(settings.temp_dir) / "cache" / "users.json"
self.users_file.parent.mkdir(parents=True, exist_ok=True)
# 内存中的用户数据
self._users: Dict[str, User] = {}
# 加载用户数据
self._load_users()
logger.info(f"UserStorage initialized with {len(self._users)} users")
def _load_users(self):
"""从文件加载用户数据"""
try:
if self.users_file.exists():
with open(self.users_file, 'r', encoding='utf-8') as f:
users_data = json.load(f)
self._users = {}
for user_data in users_data:
user = User.from_dict(user_data)
self._users[user.id] = user
logger.info(f"Loaded {len(self._users)} users from {self.users_file}")
else:
self._users = {}
logger.info("No existing users file found, starting with empty user database")
except Exception as e:
logger.error(f"Failed to load users: {e}")
self._users = {}
def _save_users(self):
"""保存用户数据到文件"""
try:
users_data = [user.to_dict() for user in self._users.values()]
with open(self.users_file, 'w', encoding='utf-8') as f:
json.dump(users_data, f, ensure_ascii=False, indent=2)
logger.debug(f"Saved {len(self._users)} users to {self.users_file}")
except Exception as e:
logger.error(f"Failed to save users: {e}")
raise
def create_user(self, username: str, email: str, password_hash: str, display_name: str) -> User:
"""
创建新用户
Args:
username: 用户名
email: 邮箱
password_hash: 密码哈希
display_name: 显示名称
Returns:
User: 创建的用户对象
Raises:
ValueError: 用户名或邮箱已存在
"""
# 检查用户名是否已存在
if self.get_user_by_username(username):
raise ValueError(f"Username '{username}' already exists")
# 检查邮箱是否已存在
if self.get_user_by_email(email):
raise ValueError(f"Email '{email}' already exists")
# 创建新用户
user = User(
id=generate_user_id(),
username=username,
email=email,
password_hash=password_hash,
display_name=display_name
)
# 保存到内存和文件
self._users[user.id] = user
self._save_users()
logger.info(f"Created new user: {username} ({email})")
return user
def get_user_by_id(self, user_id: str) -> Optional[User]:
"""根据用户ID获取用户"""
return self._users.get(user_id)
def get_user_by_username(self, username: str) -> Optional[User]:
"""根据用户名获取用户"""
for user in self._users.values():
if user.username == username:
return user
return None
def get_user_by_email(self, email: str) -> Optional[User]:
"""根据邮箱获取用户"""
for user in self._users.values():
if user.email == email:
return user
return None
def get_user_by_username_or_email(self, username_or_email: str) -> Optional[User]:
"""根据用户名或邮箱获取用户"""
# 先尝试用户名
user = self.get_user_by_username(username_or_email)
if user:
return user
# 再尝试邮箱
return self.get_user_by_email(username_or_email)
def update_user(self, user: User) -> bool:
"""
更新用户信息
Args:
user: 用户对象
Returns:
bool: 更新是否成功
"""
try:
if user.id not in self._users:
logger.warning(f"User {user.id} not found for update")
return False
# 更新时间戳
user.updated_at = datetime.now().isoformat()
# 保存到内存和文件
self._users[user.id] = user
self._save_users()
logger.info(f"Updated user: {user.username}")
return True
except Exception as e:
logger.error(f"Failed to update user {user.id}: {e}")
return False
def delete_user(self, user_id: str) -> bool:
"""
删除用户
Args:
user_id: 用户ID
Returns:
bool: 删除是否成功
"""
try:
if user_id not in self._users:
logger.warning(f"User {user_id} not found for deletion")
return False
user = self._users[user_id]
del self._users[user_id]
self._save_users()
logger.info(f"Deleted user: {user.username}")
return True
except Exception as e:
logger.error(f"Failed to delete user {user_id}: {e}")
return False
def get_all_users(self, include_inactive: bool = False) -> List[User]:
"""
获取所有用户
Args:
include_inactive: 是否包含非活跃用户
Returns:
List[User]: 用户列表
"""
users = list(self._users.values())
if not include_inactive:
users = [user for user in users if user.is_active]
# 按创建时间排序
users.sort(key=lambda x: x.created_at, reverse=True)
return users
def get_user_count(self) -> Dict[str, int]:
"""获取用户统计信息"""
all_users = list(self._users.values())
active_users = [user for user in all_users if user.is_active]
return {
"total": len(all_users),
"active": len(active_users),
"inactive": len(all_users) - len(active_users)
}
def search_users(self, keyword: str) -> List[User]:
"""
搜索用户
Args:
keyword: 搜索关键词
Returns:
List[User]: 匹配的用户列表
"""
keyword = keyword.lower().strip()
if not keyword:
return self.get_all_users()
matching_users = []
for user in self._users.values():
if (keyword in user.username.lower() or
keyword in user.email.lower() or
keyword in user.display_name.lower()):
matching_users.append(user)
# 按创建时间排序
matching_users.sort(key=lambda x: x.created_at, reverse=True)
return matching_users
# 创建全局用户存储实例
user_storage = UserStorage()