""" 用户数据存储服务 """ import json import os from pathlib import Path from typing import List, Optional, Dict, Any from datetime import datetime from python_core.models.user import User, generate_user_id from python_core.config import settings from python_core.utils.logger import logger class UserStorage: """用户数据存储服务""" def __init__(self): # 用户数据文件路径 self.users_file = Path(settings.temp_dir) / "cache" / "users.json" self.users_file.parent.mkdir(parents=True, exist_ok=True) # 内存中的用户数据 self._users: Dict[str, User] = {} # 加载用户数据 self._load_users() logger.info(f"UserStorage initialized with {len(self._users)} users") def _load_users(self): """从文件加载用户数据""" try: if self.users_file.exists(): with open(self.users_file, 'r', encoding='utf-8') as f: users_data = json.load(f) self._users = {} for user_data in users_data: user = User.from_dict(user_data) self._users[user.id] = user logger.info(f"Loaded {len(self._users)} users from {self.users_file}") else: self._users = {} logger.info("No existing users file found, starting with empty user database") except Exception as e: logger.error(f"Failed to load users: {e}") self._users = {} def _save_users(self): """保存用户数据到文件""" try: users_data = [user.to_dict() for user in self._users.values()] with open(self.users_file, 'w', encoding='utf-8') as f: json.dump(users_data, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(self._users)} users to {self.users_file}") except Exception as e: logger.error(f"Failed to save users: {e}") raise def create_user(self, username: str, email: str, password_hash: str, display_name: str) -> User: """ 创建新用户 Args: username: 用户名 email: 邮箱 password_hash: 密码哈希 display_name: 显示名称 Returns: User: 创建的用户对象 Raises: ValueError: 用户名或邮箱已存在 """ # 检查用户名是否已存在 if self.get_user_by_username(username): raise ValueError(f"Username '{username}' already exists") # 检查邮箱是否已存在 if self.get_user_by_email(email): raise ValueError(f"Email '{email}' already exists") # 创建新用户 user = User( id=generate_user_id(), username=username, email=email, password_hash=password_hash, display_name=display_name ) # 保存到内存和文件 self._users[user.id] = user self._save_users() logger.info(f"Created new user: {username} ({email})") return user def get_user_by_id(self, user_id: str) -> Optional[User]: """根据用户ID获取用户""" return self._users.get(user_id) def get_user_by_username(self, username: str) -> Optional[User]: """根据用户名获取用户""" for user in self._users.values(): if user.username == username: return user return None def get_user_by_email(self, email: str) -> Optional[User]: """根据邮箱获取用户""" for user in self._users.values(): if user.email == email: return user return None def get_user_by_username_or_email(self, username_or_email: str) -> Optional[User]: """根据用户名或邮箱获取用户""" # 先尝试用户名 user = self.get_user_by_username(username_or_email) if user: return user # 再尝试邮箱 return self.get_user_by_email(username_or_email) def update_user(self, user: User) -> bool: """ 更新用户信息 Args: user: 用户对象 Returns: bool: 更新是否成功 """ try: if user.id not in self._users: logger.warning(f"User {user.id} not found for update") return False # 更新时间戳 user.updated_at = datetime.now().isoformat() # 保存到内存和文件 self._users[user.id] = user self._save_users() logger.info(f"Updated user: {user.username}") return True except Exception as e: logger.error(f"Failed to update user {user.id}: {e}") return False def delete_user(self, user_id: str) -> bool: """ 删除用户 Args: user_id: 用户ID Returns: bool: 删除是否成功 """ try: if user_id not in self._users: logger.warning(f"User {user_id} not found for deletion") return False user = self._users[user_id] del self._users[user_id] self._save_users() logger.info(f"Deleted user: {user.username}") return True except Exception as e: logger.error(f"Failed to delete user {user_id}: {e}") return False def get_all_users(self, include_inactive: bool = False) -> List[User]: """ 获取所有用户 Args: include_inactive: 是否包含非活跃用户 Returns: List[User]: 用户列表 """ users = list(self._users.values()) if not include_inactive: users = [user for user in users if user.is_active] # 按创建时间排序 users.sort(key=lambda x: x.created_at, reverse=True) return users def get_user_count(self) -> Dict[str, int]: """获取用户统计信息""" all_users = list(self._users.values()) active_users = [user for user in all_users if user.is_active] return { "total": len(all_users), "active": len(active_users), "inactive": len(all_users) - len(active_users) } def search_users(self, keyword: str) -> List[User]: """ 搜索用户 Args: keyword: 搜索关键词 Returns: List[User]: 匹配的用户列表 """ keyword = keyword.lower().strip() if not keyword: return self.get_all_users() matching_users = [] for user in self._users.values(): if (keyword in user.username.lower() or keyword in user.email.lower() or keyword in user.display_name.lower()): matching_users.append(user) # 按创建时间排序 matching_users.sort(key=lambda x: x.created_at, reverse=True) return matching_users # 创建全局用户存储实例 user_storage = UserStorage()