diff --git a/python_core/services/user_storage.py b/python_core/services/user_storage.py deleted file mode 100644 index 550e096..0000000 --- a/python_core/services/user_storage.py +++ /dev/null @@ -1,249 +0,0 @@ -""" -用户数据存储服务 -""" - -import json -import os -from pathlib import Path -from typing import List, Optional, Dict, Any -from datetime import datetime - -from python_core.models.user import User, generate_user_id -from python_core.config import settings -from python_core.utils.logger import logger - - -class UserStorage: - """用户数据存储服务""" - - def __init__(self): - # 用户数据文件路径 - self.users_file = Path(settings.temp_dir) / "cache" / "users.json" - self.users_file.parent.mkdir(parents=True, exist_ok=True) - - # 内存中的用户数据 - self._users: Dict[str, User] = {} - - # 加载用户数据 - self._load_users() - - logger.info(f"UserStorage initialized with {len(self._users)} users") - - def _load_users(self): - """从文件加载用户数据""" - try: - if self.users_file.exists(): - with open(self.users_file, 'r', encoding='utf-8') as f: - users_data = json.load(f) - - self._users = {} - for user_data in users_data: - user = User.from_dict(user_data) - self._users[user.id] = user - - logger.info(f"Loaded {len(self._users)} users from {self.users_file}") - else: - self._users = {} - logger.info("No existing users file found, starting with empty user database") - - except Exception as e: - logger.error(f"Failed to load users: {e}") - self._users = {} - - def _save_users(self): - """保存用户数据到文件""" - try: - users_data = [user.to_dict() for user in self._users.values()] - - with open(self.users_file, 'w', encoding='utf-8') as f: - json.dump(users_data, f, ensure_ascii=False, indent=2) - - logger.debug(f"Saved {len(self._users)} users to {self.users_file}") - - except Exception as e: - logger.error(f"Failed to save users: {e}") - raise - - def create_user(self, username: str, email: str, password_hash: str, display_name: str) -> User: - """ - 创建新用户 - - Args: - username: 用户名 - email: 邮箱 - password_hash: 密码哈希 - display_name: 显示名称 - - Returns: - User: 创建的用户对象 - - Raises: - ValueError: 用户名或邮箱已存在 - """ - # 检查用户名是否已存在 - if self.get_user_by_username(username): - raise ValueError(f"Username '{username}' already exists") - - # 检查邮箱是否已存在 - if self.get_user_by_email(email): - raise ValueError(f"Email '{email}' already exists") - - # 创建新用户 - user = User( - id=generate_user_id(), - username=username, - email=email, - password_hash=password_hash, - display_name=display_name - ) - - # 保存到内存和文件 - self._users[user.id] = user - self._save_users() - - logger.info(f"Created new user: {username} ({email})") - return user - - def get_user_by_id(self, user_id: str) -> Optional[User]: - """根据用户ID获取用户""" - return self._users.get(user_id) - - def get_user_by_username(self, username: str) -> Optional[User]: - """根据用户名获取用户""" - for user in self._users.values(): - if user.username == username: - return user - return None - - def get_user_by_email(self, email: str) -> Optional[User]: - """根据邮箱获取用户""" - for user in self._users.values(): - if user.email == email: - return user - return None - - def get_user_by_username_or_email(self, username_or_email: str) -> Optional[User]: - """根据用户名或邮箱获取用户""" - # 先尝试用户名 - user = self.get_user_by_username(username_or_email) - if user: - return user - - # 再尝试邮箱 - return self.get_user_by_email(username_or_email) - - def update_user(self, user: User) -> bool: - """ - 更新用户信息 - - Args: - user: 用户对象 - - Returns: - bool: 更新是否成功 - """ - try: - if user.id not in self._users: - logger.warning(f"User {user.id} not found for update") - return False - - # 更新时间戳 - user.updated_at = datetime.now().isoformat() - - # 保存到内存和文件 - self._users[user.id] = user - self._save_users() - - logger.info(f"Updated user: {user.username}") - return True - - except Exception as e: - logger.error(f"Failed to update user {user.id}: {e}") - return False - - def delete_user(self, user_id: str) -> bool: - """ - 删除用户 - - Args: - user_id: 用户ID - - Returns: - bool: 删除是否成功 - """ - try: - if user_id not in self._users: - logger.warning(f"User {user_id} not found for deletion") - return False - - user = self._users[user_id] - del self._users[user_id] - self._save_users() - - logger.info(f"Deleted user: {user.username}") - return True - - except Exception as e: - logger.error(f"Failed to delete user {user_id}: {e}") - return False - - def get_all_users(self, include_inactive: bool = False) -> List[User]: - """ - 获取所有用户 - - Args: - include_inactive: 是否包含非活跃用户 - - Returns: - List[User]: 用户列表 - """ - users = list(self._users.values()) - - if not include_inactive: - users = [user for user in users if user.is_active] - - # 按创建时间排序 - users.sort(key=lambda x: x.created_at, reverse=True) - - return users - - def get_user_count(self) -> Dict[str, int]: - """获取用户统计信息""" - all_users = list(self._users.values()) - active_users = [user for user in all_users if user.is_active] - - return { - "total": len(all_users), - "active": len(active_users), - "inactive": len(all_users) - len(active_users) - } - - def search_users(self, keyword: str) -> List[User]: - """ - 搜索用户 - - Args: - keyword: 搜索关键词 - - Returns: - List[User]: 匹配的用户列表 - """ - keyword = keyword.lower().strip() - if not keyword: - return self.get_all_users() - - matching_users = [] - for user in self._users.values(): - if (keyword in user.username.lower() or - keyword in user.email.lower() or - keyword in user.display_name.lower()): - matching_users.append(user) - - # 按创建时间排序 - matching_users.sort(key=lambda x: x.created_at, reverse=True) - - return matching_users - - -# 创建全局用户存储实例 -user_storage = UserStorage()