diff --git a/python_core/database/user.py b/python_core/database/user.py index 601501b..1c13722 100644 --- a/python_core/database/user.py +++ b/python_core/database/user.py @@ -1,9 +1,479 @@ # 用户表 -from python_core.kv import kv +import hashlib +import uuid +from typing import Dict, List, Any, Optional +from datetime import datetime from .db import Db -class UserDb(Db): +from python_core.utils.logger import setup_logger + +logger = setup_logger(__name__) + +class UserTable(Db): + """ + 用户表类 + 基于Db类实现的用户管理功能 + """ + def __init__(self): - self.key = "model" - self.users = self.kv.get(self.key) - pass \ No newline at end of file + # 使用固定的数据库键 + super().__init__("user_system") + self.table_name = "users" + + # 初始化用户表 + self._init_user_table() + + def _init_user_table(self): + """初始化用户表""" + try: + # 检查用户表是否存在,不存在则创建 + if not self._table_exists(self.table_name): + schema = { + "username": "string", + "email": "string", + "password_hash": "string", + "display_name": "string", + "avatar_url": "string", + "is_active": "boolean", + "last_login": "datetime", + "created_at": "datetime", + "updated_at": "datetime" + } + + self.create_table(self.table_name, schema) + logger.info("User table initialized") + else: + logger.info("User table already exists") + + except Exception as e: + logger.error(f"Failed to initialize user table: {e}") + raise e + + def _hash_password(self, password: str) -> str: + """密码哈希""" + return hashlib.sha256(password.encode()).hexdigest() + + def _verify_password(self, password: str, password_hash: str) -> bool: + """验证密码""" + return self._hash_password(password) == password_hash + + def create_user(self, username: str, email: str, password: str, display_name: str = None) -> str: + """ + 创建用户 + + Args: + username: 用户名 + email: 邮箱 + password: 密码 + display_name: 显示名称 + + Returns: + 用户ID + """ + try: + # 检查用户名是否已存在 + if self.get_user_by_username(username): + raise ValueError(f"Username '{username}' already exists") + + # 检查邮箱是否已存在 + if self.get_user_by_email(email): + raise ValueError(f"Email '{email}' already exists") + + # 创建用户数据 + user_data = { + "username": username, + "email": email, + "password_hash": self._hash_password(password), + "display_name": display_name or username, + "avatar_url": "", + "is_active": True, + "last_login": None, + "created_at": datetime.now().isoformat(), + "updated_at": datetime.now().isoformat() + } + + # 插入用户记录 + user_id = self.insert(self.table_name, user_data) + + logger.info(f"Created user: {username} (ID: {user_id})") + return user_id + + except Exception as e: + logger.error(f"Failed to create user '{username}': {e}") + raise e + + def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: + """ + 根据ID获取用户 + + Args: + user_id: 用户ID + + Returns: + 用户信息,如果不存在返回None + """ + try: + record = self.get(self.table_name, user_id) + if record: + return { + "id": record["id"], + **record["data"], + "created_at": record.get("created_at"), + "updated_at": record.get("updated_at") + } + return None + + except Exception as e: + logger.error(f"Failed to get user by ID '{user_id}': {e}") + return None + + def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]: + """ + 根据用户名获取用户 + + Args: + username: 用户名 + + Returns: + 用户信息,如果不存在返回None + """ + try: + users = self.find_by_field(self.table_name, "username", username) + if users: + record = users[0] + return { + "id": record["id"], + **record["data"], + "created_at": record.get("created_at"), + "updated_at": record.get("updated_at") + } + return None + + except Exception as e: + logger.error(f"Failed to get user by username '{username}': {e}") + return None + + def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]: + """ + 根据邮箱获取用户 + + Args: + email: 邮箱 + + Returns: + 用户信息,如果不存在返回None + """ + try: + users = self.find_by_field(self.table_name, "email", email) + if users: + record = users[0] + return { + "id": record["id"], + **record["data"], + "created_at": record.get("created_at"), + "updated_at": record.get("updated_at") + } + return None + + except Exception as e: + logger.error(f"Failed to get user by email '{email}': {e}") + return None + + def authenticate_user(self, username_or_email: str, password: str) -> Optional[Dict[str, Any]]: + """ + 用户认证 + + Args: + username_or_email: 用户名或邮箱 + password: 密码 + + Returns: + 认证成功返回用户信息,失败返回None + """ + try: + # 尝试按用户名查找 + user = self.get_user_by_username(username_or_email) + + # 如果按用户名找不到,尝试按邮箱查找 + if not user: + user = self.get_user_by_email(username_or_email) + + # 验证用户存在且密码正确 + if user and self._verify_password(password, user["password_hash"]): + # 更新最后登录时间 + self.update_last_login(user["id"]) + + # 返回用户信息(不包含密码哈希) + user_info = user.copy() + del user_info["password_hash"] + + logger.info(f"User authenticated: {user['username']}") + return user_info + + logger.warning(f"Authentication failed for: {username_or_email}") + return None + + except Exception as e: + logger.error(f"Authentication error for '{username_or_email}': {e}") + return None + + def update_user(self, user_id: str, updates: Dict[str, Any]) -> bool: + """ + 更新用户信息 + + Args: + user_id: 用户ID + updates: 要更新的字段 + + Returns: + 更新成功返回True + """ + try: + # 获取现有用户信息 + existing_user = self.get_user_by_id(user_id) + if not existing_user: + logger.warning(f"User not found: {user_id}") + return False + + # 准备更新数据 + updated_data = existing_user.copy() + + # 移除不应该直接更新的字段 + protected_fields = ["id", "created_at", "password_hash"] + for field in protected_fields: + if field in updates: + del updates[field] + + # 应用更新 + updated_data.update(updates) + updated_data["updated_at"] = datetime.now().isoformat() + + # 移除元数据字段,只保留数据字段 + data_fields = {k: v for k, v in updated_data.items() + if k not in ["id", "created_at", "updated_at"]} + + # 执行更新 + result = self.update(self.table_name, user_id, data_fields) + + if result: + logger.info(f"Updated user: {user_id}") + + return result + + except Exception as e: + logger.error(f"Failed to update user '{user_id}': {e}") + return False + + def update_password(self, user_id: str, new_password: str) -> bool: + """ + 更新用户密码 + + Args: + user_id: 用户ID + new_password: 新密码 + + Returns: + 更新成功返回True + """ + try: + # 获取现有用户信息 + existing_user = self.get_user_by_id(user_id) + if not existing_user: + logger.warning(f"User not found: {user_id}") + return False + + # 更新密码哈希 + updated_data = existing_user.copy() + updated_data["password_hash"] = self._hash_password(new_password) + updated_data["updated_at"] = datetime.now().isoformat() + + # 移除元数据字段 + data_fields = {k: v for k, v in updated_data.items() + if k not in ["id", "created_at", "updated_at"]} + + # 执行更新 + result = self.update(self.table_name, user_id, data_fields) + + if result: + logger.info(f"Password updated for user: {user_id}") + + return result + + except Exception as e: + logger.error(f"Failed to update password for user '{user_id}': {e}") + return False + + def update_last_login(self, user_id: str) -> bool: + """ + 更新最后登录时间 + + Args: + user_id: 用户ID + + Returns: + 更新成功返回True + """ + try: + return self.update_user(user_id, { + "last_login": datetime.now().isoformat() + }) + + except Exception as e: + logger.error(f"Failed to update last login for user '{user_id}': {e}") + return False + + def deactivate_user(self, user_id: str) -> bool: + """ + 停用用户 + + Args: + user_id: 用户ID + + Returns: + 操作成功返回True + """ + try: + return self.update_user(user_id, {"is_active": False}) + + except Exception as e: + logger.error(f"Failed to deactivate user '{user_id}': {e}") + return False + + def activate_user(self, user_id: str) -> bool: + """ + 激活用户 + + Args: + user_id: 用户ID + + Returns: + 操作成功返回True + """ + try: + return self.update_user(user_id, {"is_active": True}) + + except Exception as e: + logger.error(f"Failed to activate user '{user_id}': {e}") + return False + + def delete_user(self, user_id: str) -> bool: + """ + 删除用户 + + Args: + user_id: 用户ID + + Returns: + 删除成功返回True + """ + try: + result = self.delete(self.table_name, user_id) + + if result: + logger.info(f"Deleted user: {user_id}") + + return result + + except Exception as e: + logger.error(f"Failed to delete user '{user_id}': {e}") + return False + + def get_all_users(self, include_inactive: bool = False, limit: int = 100) -> List[Dict[str, Any]]: + """ + 获取所有用户 + + Args: + include_inactive: 是否包含非活跃用户 + limit: 最大返回数量 + + Returns: + 用户列表 + """ + try: + all_records = self.find_all(self.table_name, limit) + + users = [] + for record in all_records: + user_info = { + "id": record["id"], + **record["data"], + "created_at": record.get("created_at"), + "updated_at": record.get("updated_at") + } + + # 移除密码哈希 + if "password_hash" in user_info: + del user_info["password_hash"] + + # 根据活跃状态过滤 + if include_inactive or user_info.get("is_active", True): + users.append(user_info) + + return users + + except Exception as e: + logger.error(f"Failed to get all users: {e}") + return [] + + def get_user_count(self, include_inactive: bool = False) -> int: + """ + 获取用户数量 + + Args: + include_inactive: 是否包含非活跃用户 + + Returns: + 用户数量 + """ + try: + if include_inactive: + return self.count(self.table_name) + else: + # 获取活跃用户数量(简单实现,可优化) + active_users = self.get_all_users(include_inactive=False) + return len(active_users) + + except Exception as e: + logger.error(f"Failed to get user count: {e}") + return 0 + + def search_users(self, query: str, limit: int = 50) -> List[Dict[str, Any]]: + """ + 搜索用户 + + Args: + query: 搜索关键词(匹配用户名、邮箱、显示名称) + limit: 最大返回数量 + + Returns: + 匹配的用户列表 + """ + try: + all_users = self.get_all_users(include_inactive=True, limit=limit * 2) + + matching_users = [] + query_lower = query.lower() + + for user in all_users: + if len(matching_users) >= limit: + break + + # 检查用户名、邮箱、显示名称是否匹配 + username = user.get("username", "").lower() + email = user.get("email", "").lower() + display_name = user.get("display_name", "").lower() + + if (query_lower in username or + query_lower in email or + query_lower in display_name): + matching_users.append(user) + + return matching_users + + except Exception as e: + logger.error(f"Failed to search users with query '{query}': {e}") + return [] + + +# 创建全局用户表实例 +user_table = UserTable() \ No newline at end of file diff --git a/python_core/kv.py b/python_core/kv.py index efe616e..639384a 100644 --- a/python_core/kv.py +++ b/python_core/kv.py @@ -154,7 +154,30 @@ class Kv: except Exception as e: logger.error(f"An unexpected error occurred while bulk setting keys: {e}") raise e - + + def gets(self, keys: list[str]): + """ + 批量获取多个键的值 + + Args: + keys: 要获取的键列表 + + Returns: + 包含键值对的字典 + """ + try: + # 由于Cloudflare KV的批量获取API有问题,我们使用单个获取的方式 + result = {} + for key in keys: + value = self.get(key) + if value is not None: + result[key] = value + return result + + except Exception as e: + logger.error(f"An error occurred while bulk getting keys from cloudflare: {e}") + raise e + def removes(self, keys: list[str]): """ 批量删除多个键