From 2a9199ca1620182f8e8dd460afe8b6a25b8b3bc0 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 12 Jul 2025 22:18:45 +0800 Subject: [PATCH] fix --- python_core/cli/commands/template.py | 94 +++++ python_core/database/template.py | 502 --------------------------- python_core/database/user.py | 479 ------------------------- src-tauri/src/commands/template.rs | 24 ++ 4 files changed, 118 insertions(+), 981 deletions(-) delete mode 100644 python_core/database/template.py delete mode 100644 python_core/database/user.py diff --git a/python_core/cli/commands/template.py b/python_core/cli/commands/template.py index 1ce974c..aa44a21 100644 --- a/python_core/cli/commands/template.py +++ b/python_core/cli/commands/template.py @@ -131,6 +131,100 @@ def get_template( response.error(-32603, f"获取模板详情失败: {str(e)}") +@template_app.command("detail") +def get_template_detail( + template_id: str = typer.Argument(..., help="模板ID"), + user_id: Optional[str] = typer.Option(None, "--user-id", help="用户ID"), + verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出"), + json_output: bool = typer.Option(True, "--json", help="JSON格式输出") +): + """获取模板详细信息(包含轨道和片段信息)""" + response = create_response_handler() + try: + # 使用 PostgreSQL 模板表 + from python_core.database.template_postgres import template_table + + # 获取模板基本信息 + template = template_table.get_template_by_id(template_id) + if not template: + response.error(-32603, f"模板不存在: {template_id}") + return + + # 获取 draft_content + draft_content = template_table.get_draft_content(template_id) + if not draft_content: + response.error(-32603, f"模板详细信息不存在: {template_id}") + return + + # 构建详细信息 + detail = { + 'id': template.id, + 'name': template.name, + 'description': template.description, + 'canvas_config': template.canvas_config, + 'duration': template.duration, + 'fps': draft_content.get('canvas_config', {}).get('fps', 30), + 'sample_rate': draft_content.get('canvas_config', {}).get('sample_rate'), + 'tracks': [] + } + + # 处理轨道信息 + tracks_data = draft_content.get('tracks', []) + materials_data = draft_content.get('materials', []) + + # 创建素材查找表 + materials_lookup = {} + for material in materials_data: + material_id = material.get('id', '') + if material_id: + materials_lookup[material_id] = material + + # 处理轨道 + for idx, track_data in enumerate(tracks_data): + track = { + 'id': track_data.get('id', f'track_{idx}'), + 'name': track_data.get('name', f'轨道 {idx + 1}'), + 'type': track_data.get('type', 'video'), + 'index': idx, + 'segments': [], + 'properties': track_data.get('properties', {}) + } + + # 处理片段 + segments_data = track_data.get('segments', []) + for segment_data in segments_data: + # 获取时间信息 + start_time = segment_data.get('start', 0) / 1000.0 # 转换为秒 + end_time = segment_data.get('end', 0) / 1000.0 + duration = end_time - start_time + + # 获取素材信息 + material_id = segment_data.get('material_id', '') + material = materials_lookup.get(material_id, {}) + + segment = { + 'id': segment_data.get('id', f'segment_{len(track["segments"])}'), + 'type': segment_data.get('type', material.get('type', 'video')), + 'name': segment_data.get('name', material.get('name', f'片段 {len(track["segments"]) + 1}')), + 'start_time': start_time, + 'end_time': end_time, + 'duration': duration, + 'resource_path': material.get('path', ''), + 'properties': segment_data.get('properties', {}), + 'effects': segment_data.get('effects', []) + } + + track['segments'].append(segment) + + detail['tracks'].append(track) + + response.success(detail) + + except Exception as e: + logger.error(f"获取模板详情失败: {e}") + response.error(-32603, f"获取模板详情失败: {str(e)}") + + @template_app.command("delete") def delete_template( template_id: str = typer.Argument(..., help="模板ID"), diff --git a/python_core/database/template.py b/python_core/database/template.py deleted file mode 100644 index 8715bdc..0000000 --- a/python_core/database/template.py +++ /dev/null @@ -1,502 +0,0 @@ -# 模板表 - -import uuid -import json -from typing import Dict, List, Any, Optional -from datetime import datetime -from .db import Db -from .types import TemplateInfo -from python_core.utils.logger import setup_logger - -logger = setup_logger(__name__) - -class TemplateTable(Db): - """ - 模板表类 - 基于Db类实现的模板管理功能 - """ - - def __init__(self): - # 使用固定的数据库键 - super().__init__("mixvideo") - self.table_name = "templates" - - # 初始化模板表 - self._init_template_table() - - def _init_template_table(self): - """初始化模板表""" - try: - # 检查模板表是否存在,不存在则创建 - if not self._table_exists(self.table_name): - schema = { - "name": "string", - "description": "string", - "thumbnail_path": "string", - "draft_content_path": "string", - "resources_path": "string", - "canvas_config": "json", - "duration": "integer", - "material_count": "integer", - "track_count": "integer", - "tags": "json", - "is_cloud": "boolean", - "user_id": "string", - "created_at": "datetime", - "updated_at": "datetime" - } - - self.create_table(self.table_name, schema) - logger.info("Template table initialized") - else: - logger.info("Template table already exists") - - except Exception as e: - logger.error(f"Failed to initialize template table: {e}") - raise e - - def create_template(self, template_info: TemplateInfo) -> str: - """ - 创建模板 - - Args: - template_info: 模板信息 - - Returns: - 模板ID - """ - try: - # 检查模板名称是否已存在(同一用户下) - existing_template = self.get_template_by_name(template_info.name, template_info.user_id) - if existing_template: - raise ValueError(f"Template name '{template_info.name}' already exists for this user") - - # 准备模板数据 - template_data = { - "name": template_info.name, - "description": template_info.description, - "thumbnail_path": template_info.thumbnail_path, - "draft_content_path": template_info.draft_content_path, - "resources_path": template_info.resources_path, - "canvas_config": template_info.canvas_config, - "duration": template_info.duration, - "material_count": template_info.material_count, - "track_count": template_info.track_count, - "tags": template_info.tags, - "is_cloud": template_info.is_cloud, - "user_id": template_info.user_id, - "created_at": datetime.now().isoformat(), - "updated_at": datetime.now().isoformat() - } - - # 插入模板记录 - template_id = self.insert(self.table_name, template_data) - - logger.info(f"Created template: {template_info.name} (ID: {template_id})") - return template_id - - except Exception as e: - logger.error(f"Failed to create template '{template_info.name}': {e}") - raise e - - def get_template_by_id(self, template_id: str) -> Optional[TemplateInfo]: - """ - 根据ID获取模板 - - Args: - template_id: 模板ID - - Returns: - 模板信息,如果不存在返回None - """ - try: - record = self.get(self.table_name, template_id) - if record: - return self._record_to_template_info(record) - return None - - except Exception as e: - logger.error(f"Failed to get template by ID '{template_id}': {e}") - return None - - def get_template_by_name(self, name: str, user_id: str = None) -> Optional[TemplateInfo]: - """ - 根据名称获取模板 - - Args: - name: 模板名称 - user_id: 用户ID(可选,用于过滤用户模板) - - Returns: - 模板信息,如果不存在返回None - """ - try: - # 获取所有同名模板 - templates = self.find_by_field(self.table_name, "name", name) - - for record in templates: - template_info = self._record_to_template_info(record) - # 如果指定了用户ID,则过滤用户模板 - if user_id is None or template_info.user_id == user_id: - return template_info - - return None - - except Exception as e: - logger.error(f"Failed to get template by name '{name}': {e}") - return None - - def update_template(self, template_id: str, updates: Dict[str, Any]) -> bool: - """ - 更新模板信息 - - Args: - template_id: 模板ID - updates: 要更新的字段 - - Returns: - 更新成功返回True - """ - try: - # 获取现有模板信息 - existing_template = self.get_template_by_id(template_id) - if not existing_template: - logger.warning(f"Template not found: {template_id}") - return False - - # 准备更新数据 - updated_data = existing_template.__dict__.copy() - - # 移除不应该直接更新的字段 - protected_fields = ["id", "created_at"] - for field in protected_fields: - if field in updates: - del updates[field] - - # 应用更新 - updated_data.update(updates) - updated_data["updated_at"] = datetime.now().isoformat() - - # 移除元数据字段,只保留数据字段 - data_fields = {k: v for k, v in updated_data.items() - if k not in ["id", "created_at", "updated_at"]} - - # 执行更新 - result = self.update(self.table_name, template_id, data_fields) - - if result: - logger.info(f"Updated template: {template_id}") - - return result - - except Exception as e: - logger.error(f"Failed to update template '{template_id}': {e}") - return False - - def delete_template(self, template_id: str) -> bool: - """ - 删除模板 - - Args: - template_id: 模板ID - - Returns: - 删除成功返回True - """ - try: - result = self.delete(self.table_name, template_id) - - if result: - logger.info(f"Deleted template: {template_id}") - - return result - - except Exception as e: - logger.error(f"Failed to delete template '{template_id}': {e}") - return False - - def get_templates_by_user(self, user_id: str, include_cloud: bool = True, limit: int = 100) -> List[TemplateInfo]: - """ - 获取用户的模板列表 - - Args: - user_id: 用户ID - include_cloud: 是否包含云端公共模板 - limit: 最大返回数量 - - Returns: - 模板列表 - """ - try: - all_records = self.find_all(self.table_name, limit * 2) # 获取更多记录以便过滤 - - templates = [] - for record in all_records: - if len(templates) >= limit: - break - - template_info = self._record_to_template_info(record) - - # 过滤条件:用户自己的模板 或 云端公共模板 - if (template_info.user_id == user_id or - (include_cloud and template_info.is_cloud)): - templates.append(template_info) - - return templates - - except Exception as e: - logger.error(f"Failed to get templates for user '{user_id}': {e}") - return [] - - def search_templates(self, query: str, user_id: str = None, include_cloud: bool = True, limit: int = 50) -> List[TemplateInfo]: - """ - 搜索模板 - - Args: - query: 搜索关键词(匹配名称、描述、标签) - user_id: 用户ID(可选,用于过滤用户模板) - include_cloud: 是否包含云端公共模板 - limit: 最大返回数量 - - Returns: - 匹配的模板列表 - """ - try: - all_templates = self.get_templates_by_user(user_id or "", include_cloud, limit * 2) - - matching_templates = [] - query_lower = query.lower() - - for template in all_templates: - if len(matching_templates) >= limit: - break - - # 检查名称、描述、标签是否匹配 - name_match = query_lower in template.name.lower() - desc_match = query_lower in template.description.lower() - tags_match = any(query_lower in tag.lower() for tag in template.tags) - - if name_match or desc_match or tags_match: - matching_templates.append(template) - - return matching_templates - - except Exception as e: - logger.error(f"Failed to search templates with query '{query}': {e}") - return [] - - def get_templates_by_tag(self, tag: str, user_id: str = None, include_cloud: bool = True, limit: int = 50) -> List[TemplateInfo]: - """ - 根据标签获取模板 - - Args: - tag: 标签名称 - user_id: 用户ID(可选) - include_cloud: 是否包含云端公共模板 - limit: 最大返回数量 - - Returns: - 匹配的模板列表 - """ - try: - all_templates = self.get_templates_by_user(user_id or "", include_cloud, limit * 2) - - matching_templates = [] - tag_lower = tag.lower() - - for template in all_templates: - if len(matching_templates) >= limit: - break - - # 检查标签是否匹配 - if any(tag_lower == t.lower() for t in template.tags): - matching_templates.append(template) - - return matching_templates - - except Exception as e: - logger.error(f"Failed to get templates by tag '{tag}': {e}") - return [] - - def get_cloud_templates(self, limit: int = 100) -> List[TemplateInfo]: - """ - 获取云端公共模板 - - Args: - limit: 最大返回数量 - - Returns: - 云端模板列表 - """ - try: - cloud_templates = self.find_by_field(self.table_name, "is_cloud", True) - - templates = [] - for record in cloud_templates[:limit]: - template_info = self._record_to_template_info(record) - templates.append(template_info) - - return templates - - except Exception as e: - logger.error(f"Failed to get cloud templates: {e}") - return [] - - def get_template_count(self, user_id: str = None, include_cloud: bool = True) -> int: - """ - 获取模板数量 - - Args: - user_id: 用户ID(可选) - include_cloud: 是否包含云端公共模板 - - Returns: - 模板数量 - """ - try: - if user_id is None: - return self.count(self.table_name) - else: - templates = self.get_templates_by_user(user_id, include_cloud) - return len(templates) - - except Exception as e: - logger.error(f"Failed to get template count: {e}") - return 0 - - def batch_import_templates(self, templates: List[TemplateInfo]) -> Dict[str, Any]: - """ - 批量导入模板 - - Args: - templates: 模板列表 - - Returns: - 导入结果统计 - """ - try: - success_count = 0 - failed_count = 0 - failed_templates = [] - - for template in templates: - try: - # 检查是否已存在(根据unique id属性) - existing = self.get_template_by_id(template.id) - if existing: - logger.warning(f"Template already exists, skipping: {template.name}") - continue - - # 创建模板 - self.create_template(template) - success_count += 1 - - except Exception as e: - logger.error(f"Failed to import template '{template.name}': {e}") - failed_count += 1 - failed_templates.append({ - "name": template.name, - "error": str(e) - }) - - result = { - "total": len(templates), - "success": success_count, - "failed": failed_count, - "failed_templates": failed_templates - } - - logger.info(f"Batch import completed: {success_count} success, {failed_count} failed") - return result - - except Exception as e: - logger.error(f"Failed to batch import templates: {e}") - return { - "total": len(templates), - "success": 0, - "failed": len(templates), - "error": str(e) - } - - def get_popular_tags(self, user_id: str = None, limit: int = 20) -> List[Dict[str, Any]]: - """ - 获取热门标签 - - Args: - user_id: 用户ID(可选) - limit: 最大返回数量 - - Returns: - 标签列表,包含标签名称和使用次数 - """ - try: - templates = self.get_templates_by_user(user_id or "", include_cloud=True) - - # 统计标签使用次数 - tag_counts = {} - for template in templates: - for tag in template.tags: - tag_counts[tag] = tag_counts.get(tag, 0) + 1 - - # 按使用次数排序 - sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True) - - # 返回前N个标签 - popular_tags = [] - for tag, count in sorted_tags[:limit]: - popular_tags.append({ - "tag": tag, - "count": count - }) - - return popular_tags - - except Exception as e: - logger.error(f"Failed to get popular tags: {e}") - return [] - - # 辅助方法 - def _record_to_template_info(self, record: Dict[str, Any]) -> TemplateInfo: - """将数据库记录转换为TemplateInfo对象""" - data = record["data"] - return TemplateInfo( - id=record["id"], - name=data["name"], - description=data["description"], - thumbnail_path=data["thumbnail_path"], - draft_content_path=data["draft_content_path"], - resources_path=data["resources_path"], - canvas_config=data["canvas_config"], - duration=data["duration"], - material_count=data["material_count"], - track_count=data["track_count"], - tags=data["tags"], - is_cloud=data["is_cloud"], - user_id=data["user_id"], - created_at=data.get("created_at", record.get("created_at", "")), - updated_at=data.get("updated_at", record.get("updated_at", "")) - ) - - def _template_info_to_dict(self, template_info: TemplateInfo) -> Dict[str, Any]: - """将TemplateInfo对象转换为字典""" - return { - "id": template_info.id, - "name": template_info.name, - "description": template_info.description, - "thumbnail_path": template_info.thumbnail_path, - "draft_content_path": template_info.draft_content_path, - "resources_path": template_info.resources_path, - "canvas_config": template_info.canvas_config, - "duration": template_info.duration, - "material_count": template_info.material_count, - "track_count": template_info.track_count, - "tags": template_info.tags, - "is_cloud": template_info.is_cloud, - "user_id": template_info.user_id, - "created_at": template_info.created_at, - "updated_at": template_info.updated_at - } - - -# 创建全局模板表实例 -template_table = TemplateTable() \ No newline at end of file diff --git a/python_core/database/user.py b/python_core/database/user.py deleted file mode 100644 index 4bbbed5..0000000 --- a/python_core/database/user.py +++ /dev/null @@ -1,479 +0,0 @@ -# 用户表 - -import hashlib -import uuid -from typing import Dict, List, Any, Optional -from datetime import datetime -from .db import Db -from python_core.utils.logger import setup_logger - -logger = setup_logger(__name__) - -class UserTable(Db): - """ - 用户表类 - 基于Db类实现的用户管理功能 - """ - - def __init__(self): - # 使用固定的数据库键 - super().__init__("mixvideo") - self.table_name = "users" - - # 初始化用户表 - self._init_user_table() - - def _init_user_table(self): - """初始化用户表""" - try: - # 检查用户表是否存在,不存在则创建 - if not self._table_exists(self.table_name): - schema = { - "username": "string", - "email": "string", - "password_hash": "string", - "display_name": "string", - "avatar_url": "string", - "is_active": "boolean", - "last_login": "datetime", - "created_at": "datetime", - "updated_at": "datetime" - } - - self.create_table(self.table_name, schema) - logger.info("User table initialized") - else: - logger.info("User table already exists") - - except Exception as e: - logger.error(f"Failed to initialize user table: {e}") - raise e - - def _hash_password(self, password: str) -> str: - """密码哈希""" - return hashlib.sha256(password.encode()).hexdigest() - - def _verify_password(self, password: str, password_hash: str) -> bool: - """验证密码""" - return self._hash_password(password) == password_hash - - def create_user(self, username: str, email: str, password: str, display_name: str = None) -> str: - """ - 创建用户 - - Args: - username: 用户名 - email: 邮箱 - password: 密码 - display_name: 显示名称 - - Returns: - 用户ID - """ - try: - # 检查用户名是否已存在 - if self.get_user_by_username(username): - raise ValueError(f"Username '{username}' already exists") - - # 检查邮箱是否已存在 - if self.get_user_by_email(email): - raise ValueError(f"Email '{email}' already exists") - - # 创建用户数据 - user_data = { - "username": username, - "email": email, - "password_hash": self._hash_password(password), - "display_name": display_name or username, - "avatar_url": "", - "is_active": True, - "last_login": None, - "created_at": datetime.now().isoformat(), - "updated_at": datetime.now().isoformat() - } - - # 插入用户记录 - user_id = self.insert(self.table_name, user_data) - - logger.info(f"Created user: {username} (ID: {user_id})") - return user_id - - except Exception as e: - logger.error(f"Failed to create user '{username}': {e}") - raise e - - def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: - """ - 根据ID获取用户 - - Args: - user_id: 用户ID - - Returns: - 用户信息,如果不存在返回None - """ - try: - record = self.get(self.table_name, user_id) - if record: - return { - "id": record["id"], - **record["data"], - "created_at": record.get("created_at"), - "updated_at": record.get("updated_at") - } - return None - - except Exception as e: - logger.error(f"Failed to get user by ID '{user_id}': {e}") - return None - - def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]: - """ - 根据用户名获取用户 - - Args: - username: 用户名 - - Returns: - 用户信息,如果不存在返回None - """ - try: - users = self.find_by_field(self.table_name, "username", username) - if users: - record = users[0] - return { - "id": record["id"], - **record["data"], - "created_at": record.get("created_at"), - "updated_at": record.get("updated_at") - } - return None - - except Exception as e: - logger.error(f"Failed to get user by username '{username}': {e}") - return None - - def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]: - """ - 根据邮箱获取用户 - - Args: - email: 邮箱 - - Returns: - 用户信息,如果不存在返回None - """ - try: - users = self.find_by_field(self.table_name, "email", email) - if users: - record = users[0] - return { - "id": record["id"], - **record["data"], - "created_at": record.get("created_at"), - "updated_at": record.get("updated_at") - } - return None - - except Exception as e: - logger.error(f"Failed to get user by email '{email}': {e}") - return None - - def authenticate_user(self, username_or_email: str, password: str) -> Optional[Dict[str, Any]]: - """ - 用户认证 - - Args: - username_or_email: 用户名或邮箱 - password: 密码 - - Returns: - 认证成功返回用户信息,失败返回None - """ - try: - # 尝试按用户名查找 - user = self.get_user_by_username(username_or_email) - - # 如果按用户名找不到,尝试按邮箱查找 - if not user: - user = self.get_user_by_email(username_or_email) - - # 验证用户存在且密码正确 - if user and self._verify_password(password, user["password_hash"]): - # 更新最后登录时间 - self.update_last_login(user["id"]) - - # 返回用户信息(不包含密码哈希) - user_info = user.copy() - del user_info["password_hash"] - - logger.info(f"User authenticated: {user['username']}") - return user_info - - logger.warning(f"Authentication failed for: {username_or_email}") - return None - - except Exception as e: - logger.error(f"Authentication error for '{username_or_email}': {e}") - return None - - def update_user(self, user_id: str, updates: Dict[str, Any]) -> bool: - """ - 更新用户信息 - - Args: - user_id: 用户ID - updates: 要更新的字段 - - Returns: - 更新成功返回True - """ - try: - # 获取现有用户信息 - existing_user = self.get_user_by_id(user_id) - if not existing_user: - logger.warning(f"User not found: {user_id}") - return False - - # 准备更新数据 - updated_data = existing_user.copy() - - # 移除不应该直接更新的字段 - protected_fields = ["id", "created_at", "password_hash"] - for field in protected_fields: - if field in updates: - del updates[field] - - # 应用更新 - updated_data.update(updates) - updated_data["updated_at"] = datetime.now().isoformat() - - # 移除元数据字段,只保留数据字段 - data_fields = {k: v for k, v in updated_data.items() - if k not in ["id", "created_at", "updated_at"]} - - # 执行更新 - result = self.update(self.table_name, user_id, data_fields) - - if result: - logger.info(f"Updated user: {user_id}") - - return result - - except Exception as e: - logger.error(f"Failed to update user '{user_id}': {e}") - return False - - def update_password(self, user_id: str, new_password: str) -> bool: - """ - 更新用户密码 - - Args: - user_id: 用户ID - new_password: 新密码 - - Returns: - 更新成功返回True - """ - try: - # 获取现有用户信息 - existing_user = self.get_user_by_id(user_id) - if not existing_user: - logger.warning(f"User not found: {user_id}") - return False - - # 更新密码哈希 - updated_data = existing_user.copy() - updated_data["password_hash"] = self._hash_password(new_password) - updated_data["updated_at"] = datetime.now().isoformat() - - # 移除元数据字段 - data_fields = {k: v for k, v in updated_data.items() - if k not in ["id", "created_at", "updated_at"]} - - # 执行更新 - result = self.update(self.table_name, user_id, data_fields) - - if result: - logger.info(f"Password updated for user: {user_id}") - - return result - - except Exception as e: - logger.error(f"Failed to update password for user '{user_id}': {e}") - return False - - def update_last_login(self, user_id: str) -> bool: - """ - 更新最后登录时间 - - Args: - user_id: 用户ID - - Returns: - 更新成功返回True - """ - try: - return self.update_user(user_id, { - "last_login": datetime.now().isoformat() - }) - - except Exception as e: - logger.error(f"Failed to update last login for user '{user_id}': {e}") - return False - - def deactivate_user(self, user_id: str) -> bool: - """ - 停用用户 - - Args: - user_id: 用户ID - - Returns: - 操作成功返回True - """ - try: - return self.update_user(user_id, {"is_active": False}) - - except Exception as e: - logger.error(f"Failed to deactivate user '{user_id}': {e}") - return False - - def activate_user(self, user_id: str) -> bool: - """ - 激活用户 - - Args: - user_id: 用户ID - - Returns: - 操作成功返回True - """ - try: - return self.update_user(user_id, {"is_active": True}) - - except Exception as e: - logger.error(f"Failed to activate user '{user_id}': {e}") - return False - - def delete_user(self, user_id: str) -> bool: - """ - 删除用户 - - Args: - user_id: 用户ID - - Returns: - 删除成功返回True - """ - try: - result = self.delete(self.table_name, user_id) - - if result: - logger.info(f"Deleted user: {user_id}") - - return result - - except Exception as e: - logger.error(f"Failed to delete user '{user_id}': {e}") - return False - - def get_all_users(self, include_inactive: bool = False, limit: int = 100) -> List[Dict[str, Any]]: - """ - 获取所有用户 - - Args: - include_inactive: 是否包含非活跃用户 - limit: 最大返回数量 - - Returns: - 用户列表 - """ - try: - all_records = self.find_all(self.table_name, limit) - - users = [] - for record in all_records: - user_info = { - "id": record["id"], - **record["data"], - "created_at": record.get("created_at"), - "updated_at": record.get("updated_at") - } - - # 移除密码哈希 - if "password_hash" in user_info: - del user_info["password_hash"] - - # 根据活跃状态过滤 - if include_inactive or user_info.get("is_active", True): - users.append(user_info) - - return users - - except Exception as e: - logger.error(f"Failed to get all users: {e}") - return [] - - def get_user_count(self, include_inactive: bool = False) -> int: - """ - 获取用户数量 - - Args: - include_inactive: 是否包含非活跃用户 - - Returns: - 用户数量 - """ - try: - if include_inactive: - return self.count(self.table_name) - else: - # 获取活跃用户数量(简单实现,可优化) - active_users = self.get_all_users(include_inactive=False) - return len(active_users) - - except Exception as e: - logger.error(f"Failed to get user count: {e}") - return 0 - - def search_users(self, query: str, limit: int = 50) -> List[Dict[str, Any]]: - """ - 搜索用户 - - Args: - query: 搜索关键词(匹配用户名、邮箱、显示名称) - limit: 最大返回数量 - - Returns: - 匹配的用户列表 - """ - try: - all_users = self.get_all_users(include_inactive=True, limit=limit * 2) - - matching_users = [] - query_lower = query.lower() - - for user in all_users: - if len(matching_users) >= limit: - break - - # 检查用户名、邮箱、显示名称是否匹配 - username = user.get("username", "").lower() - email = user.get("email", "").lower() - display_name = user.get("display_name", "").lower() - - if (query_lower in username or - query_lower in email or - query_lower in display_name): - matching_users.append(user) - - return matching_users - - except Exception as e: - logger.error(f"Failed to search users with query '{query}': {e}") - return [] - - -# 创建全局用户表实例 -user_table = UserTable() \ No newline at end of file diff --git a/src-tauri/src/commands/template.rs b/src-tauri/src/commands/template.rs index c976149..329cbb6 100644 --- a/src-tauri/src/commands/template.rs +++ b/src-tauri/src/commands/template.rs @@ -341,6 +341,30 @@ pub async fn search_templates_cli( execute_python_cli_command(app, args).await } +/// 获取模板详情(新版本,使用CLI模式) +#[tauri::command] +pub async fn get_template_detail_cli( + app: AppHandle, + request: TemplateGetRequest, +) -> Result { + let mut args = vec!["template".to_string(), "detail".to_string(), request.template_id]; + + if let Some(user_id) = request.user_id { + args.push("--user-id".to_string()); + args.push(user_id); + } + + if request.verbose.unwrap_or(false) { + args.push("--verbose".to_string()); + } + + if request.json_output.unwrap_or(true) { + args.push("--json".to_string()); + } + + execute_python_cli_command(app, args).await +} + /// 获取模板详情(保持向后兼容) #[tauri::command] pub async fn get_template_detail(app: tauri::AppHandle, template_id: String) -> Result {