From 3f5af3404bec33bbd5b44cb2f2cf35266b660f6a Mon Sep 17 00:00:00 2001 From: root Date: Sat, 12 Jul 2025 22:19:39 +0800 Subject: [PATCH] fix: bug --- examples/test_batch_workflow.py | 226 ------------------------------ examples/test_video_validation.py | 154 -------------------- examples/user_postgres_example.py | 69 --------- src-tauri/src/lib.rs | 1 + src/services/TemplateServiceV2.ts | 19 ++- 5 files changed, 13 insertions(+), 456 deletions(-) delete mode 100644 examples/test_batch_workflow.py delete mode 100644 examples/test_video_validation.py delete mode 100644 examples/user_postgres_example.py diff --git a/examples/test_batch_workflow.py b/examples/test_batch_workflow.py deleted file mode 100644 index 503258c..0000000 --- a/examples/test_batch_workflow.py +++ /dev/null @@ -1,226 +0,0 @@ -#!/usr/bin/env python3 -""" -Test Batch Scene Detection and Splitting Workflow -测试批量场景检测和切分工作流 -""" - -import sys -import json -import tempfile -from pathlib import Path - -# 添加项目根目录到Python路径 -project_root = Path(__file__).parent.parent -sys.path.insert(0, str(project_root)) - -from python_core.scene_detection import SceneDetector, DetectorType, OutputFormat - - -def test_batch_workflow(): - """测试批量工作流""" - print("🚀 测试批量场景检测和切分工作流") - print("=" * 60) - - try: - # 创建检测器 - detector = SceneDetector() - - # 准备测试视频 - test_videos = [ - Path("assets/1/1752032011698.mp4") - ] - - # 检查测试视频是否存在 - existing_videos = [v for v in test_videos if v.exists()] - if not existing_videos: - print("❌ 没有找到测试视频文件") - print("💡 请确保 assets/1/1752032011698.mp4 文件存在") - return False - - print(f"📹 找到 {len(existing_videos)} 个测试视频:") - for video in existing_videos: - print(f" • {video}") - - # 创建临时输出目录 - with tempfile.TemporaryDirectory() as temp_dir: - output_dir = Path(temp_dir) / "batch_output" - - print(f"\n📂 输出目录: {output_dir}") - - # 执行批量处理 - print("\n🔄 开始批量处理...") - result = detector.batch_detect_and_split( - video_paths=existing_videos, - output_base_dir=output_dir, - detector_type=DetectorType.CONTENT, - threshold=30.0, - min_scene_length=1.0, - output_format=OutputFormat.JSON, - enable_ai_analysis=False, - enable_video_splitting=True, - max_concurrent=1, # 使用单线程避免资源竞争 - continue_on_error=True - ) - - # 检查结果 - if result.get("workflow_state") == "completed": - print("✅ 批量处理完成!") - - # 显示摘要 - batch_results = result.get("batch_results", {}) - print(f"\n📊 处理摘要:") - print(f" 总视频数: {batch_results.get('total_videos', 0)}") - print(f" 成功处理: {batch_results.get('completed_videos', 0)}") - print(f" 处理失败: {batch_results.get('failed_videos', 0)}") - print(f" 成功率: {batch_results.get('success_rate', 0):.1f}%") - - # 显示每个任务的详细信息 - tasks = batch_results.get('tasks', []) - print(f"\n📋 任务详情:") - for i, task in enumerate(tasks, 1): - video_name = Path(task['video_path']).name - status = task['status'] - scenes = task.get('total_scenes', 0) - splits = task.get('split_count', 0) - proc_time = task.get('processing_time', 0) - - print(f" 任务 {i}: {video_name}") - print(f" 状态: {status}") - print(f" 场景数: {scenes}") - print(f" 切分数: {splits}") - print(f" 处理时间: {proc_time:.2f}s") - - if task.get('error'): - print(f" 错误: {task['error']}") - - # 检查输出文件 - if task.get('output_dir'): - output_path = Path(task['output_dir']) - if output_path.exists(): - print(f" 输出目录: {output_path}") - - # 检查场景检测结果文件 - scenes_file = output_path / "scenes.json" - if scenes_file.exists(): - print(f" ✅ 场景检测结果: {scenes_file}") - - # 检查切分目录 - scenes_dir = output_path / "scenes" - if scenes_dir.exists(): - split_files = list(scenes_dir.glob("*.mp4")) - print(f" ✅ 切分文件: {len(split_files)} 个") - for split_file in split_files[:3]: # 只显示前3个 - size_mb = split_file.stat().st_size / (1024 * 1024) - print(f" • {split_file.name} ({size_mb:.1f}MB)") - if len(split_files) > 3: - print(f" ... 还有 {len(split_files) - 3} 个文件") - - # 检查切分摘要 - summary_file = output_path / "split_summary.json" - if summary_file.exists(): - print(f" ✅ 切分摘要: {summary_file}") - try: - with open(summary_file, 'r', encoding='utf-8') as f: - summary_data = json.load(f) - print(f" 成功切分: {summary_data.get('successful_splits', 0)}") - print(f" 失败切分: {summary_data.get('failed_splits', 0)}") - print(f" 总输出大小: {summary_data.get('total_output_size', 0):,} bytes") - except Exception as e: - print(f" ⚠️ 读取摘要失败: {e}") - - return True - else: - print("❌ 批量处理失败") - errors = result.get("errors", []) - if errors: - print("错误信息:") - for error in errors: - print(f" • {error}") - return False - - except Exception as e: - print(f"❌ 测试异常: {e}") - import traceback - print(f"详细错误: {traceback.format_exc()}") - return False - - -def test_cli_batch_command(): - """测试CLI批量命令""" - print("\n🧪 测试CLI批量命令") - print("=" * 60) - - try: - # 检查CLI帮助 - import subprocess - result = subprocess.run( - ['python3', '-m', 'python_core.cli', 'scene', 'batch', '--help'], - capture_output=True, - text=True, - cwd=project_root - ) - - if result.returncode == 0: - print("✅ CLI批量命令帮助正常") - print("📋 命令帮助预览:") - help_lines = result.stdout.split('\n')[:10] # 只显示前10行 - for line in help_lines: - if line.strip(): - print(f" {line}") - if len(result.stdout.split('\n')) > 10: - print(" ...") - return True - else: - print("❌ CLI批量命令帮助失败") - print(f"错误: {result.stderr}") - return False - - except Exception as e: - print(f"❌ CLI测试异常: {e}") - return False - - -def main(): - """主函数""" - print("🚀 开始测试批量场景检测和切分工作流") - print("=" * 80) - - tests = [ - ("批量工作流功能", test_batch_workflow), - ("CLI批量命令", test_cli_batch_command), - ] - - passed = 0 - total = len(tests) - - for test_name, test_func in tests: - try: - if test_func(): - passed += 1 - print(f"✅ {test_name} - 通过") - else: - print(f"❌ {test_name} - 失败") - except Exception as e: - print(f"❌ {test_name} - 异常: {e}") - - print("\n" + "=" * 80) - print(f"🎉 测试完成: {passed}/{total} 通过") - - if passed == total: - print("🎊 所有测试都通过了!批量工作流开发成功!") - print("\n💡 使用方法:") - print(" # 批量处理目录中的所有视频") - print(" python3 -m python_core.cli scene batch input_dir output_dir") - print(" ") - print(" # 自定义参数") - print(" python3 -m python_core.cli scene batch input_dir output_dir \\") - print(" --threshold 15.0 --concurrent 4 --no-ai --split") - else: - print("⚠️ 部分测试失败,需要进一步调试") - - return passed == total - - -if __name__ == "__main__": - success = main() - sys.exit(0 if success else 1) diff --git a/examples/test_video_validation.py b/examples/test_video_validation.py deleted file mode 100644 index 7a59b3c..0000000 --- a/examples/test_video_validation.py +++ /dev/null @@ -1,154 +0,0 @@ -#!/usr/bin/env python3 -""" -Video Validation Tool -视频验证工具 - -验证切分后的视频文件是否有效 -""" - -import sys -import subprocess -from pathlib import Path - - -def validate_video_file(video_path: Path) -> dict: - """验证单个视频文件""" - try: - # 使用ffprobe获取视频信息 - cmd = [ - 'ffprobe', - '-v', 'quiet', - '-print_format', 'json', - '-show_format', - '-show_streams', - str(video_path) - ] - - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=30 - ) - - if result.returncode == 0: - import json - data = json.loads(result.stdout) - - # 提取基本信息 - format_info = data.get('format', {}) - streams = data.get('streams', []) - - video_stream = None - audio_stream = None - - for stream in streams: - if stream.get('codec_type') == 'video': - video_stream = stream - elif stream.get('codec_type') == 'audio': - audio_stream = stream - - return { - 'valid': True, - 'file_size': int(format_info.get('size', 0)), - 'duration': float(format_info.get('duration', 0)), - 'bitrate': int(format_info.get('bit_rate', 0)), - 'video_codec': video_stream.get('codec_name') if video_stream else None, - 'video_resolution': f"{video_stream.get('width')}x{video_stream.get('height')}" if video_stream else None, - 'video_fps': eval(video_stream.get('r_frame_rate', '0/1')) if video_stream else 0, - 'audio_codec': audio_stream.get('codec_name') if audio_stream else None, - 'audio_sample_rate': int(audio_stream.get('sample_rate', 0)) if audio_stream else 0, - 'error': None - } - else: - return { - 'valid': False, - 'error': f"ffprobe failed: {result.stderr}" - } - - except Exception as e: - return { - 'valid': False, - 'error': str(e) - } - - -def validate_split_videos(scenes_dir: Path): - """验证切分目录中的所有视频""" - print(f"🔍 验证目录: {scenes_dir}") - - if not scenes_dir.exists(): - print(f"❌ 目录不存在: {scenes_dir}") - return False - - # 查找所有mp4文件 - video_files = list(scenes_dir.glob("*.mp4")) - - if not video_files: - print(f"❌ 目录中没有找到mp4文件") - return False - - print(f"📹 找到 {len(video_files)} 个视频文件") - - valid_count = 0 - total_size = 0 - total_duration = 0 - - for video_file in sorted(video_files): - print(f"\n🎬 验证: {video_file.name}") - - validation = validate_video_file(video_file) - - if validation['valid']: - valid_count += 1 - file_size = validation['file_size'] - duration = validation['duration'] - total_size += file_size - total_duration += duration - - print(f" ✅ 有效") - print(f" 📏 大小: {file_size:,} bytes ({file_size/1024/1024:.1f} MB)") - print(f" ⏱️ 时长: {duration:.2f}秒") - print(f" 🎥 视频: {validation['video_codec']} {validation['video_resolution']} {validation['video_fps']:.1f}fps") - if validation['audio_codec']: - print(f" 🔊 音频: {validation['audio_codec']} {validation['audio_sample_rate']}Hz") - print(f" 📊 码率: {validation['bitrate']:,} bps") - else: - print(f" ❌ 无效: {validation['error']}") - - print(f"\n📊 验证摘要:") - print(f" 有效文件: {valid_count}/{len(video_files)}") - print(f" 成功率: {valid_count/len(video_files)*100:.1f}%") - print(f" 总大小: {total_size:,} bytes ({total_size/1024/1024:.1f} MB)") - print(f" 总时长: {total_duration:.2f}秒") - - return valid_count == len(video_files) - - -def main(): - """主函数""" - print("🚀 视频验证工具") - print("=" * 50) - - # 验证最新的切分结果 - test_scenes_dir = Path("/tmp/test_batch_output/1752032011698/scenes") - - if test_scenes_dir.exists(): - print("🎯 验证最新的批量切分结果") - success = validate_split_videos(test_scenes_dir) - - if success: - print("\n🎉 所有视频文件都有效!切分成功!") - return 0 - else: - print("\n⚠️ 部分视频文件无效") - return 1 - else: - print("❌ 没有找到测试切分结果") - print("💡 请先运行批量切分命令:") - print(" python3 -m python_core.cli scene batch /tmp/test_batch_input /tmp/test_batch_output --verbose --no-ai") - return 1 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/examples/user_postgres_example.py b/examples/user_postgres_example.py deleted file mode 100644 index fcfc38e..0000000 --- a/examples/user_postgres_example.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python3 -""" -UserTablePostgres 使用示例 -""" - -import sys -import os -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from python_core.database.user_postgres import UserTablePostgres - -def main(): - """主函数""" - print("=== UserTablePostgres 使用示例 ===\n") - - # 创建用户表实例 - user_table = UserTablePostgres() - print("✅ 用户表初始化成功") - - # 创建用户 - print("\n1. 创建用户...") - user_id = user_table.create_user( - username="john_doe", - email="john@example.com", - password="secure_password_123", - display_name="John Doe" - ) - print(f"用户创建成功,ID: {user_id}") - - # 获取用户信息 - print("\n2. 获取用户信息...") - user = user_table.get_user_by_username("john_doe") - print(f"用户信息: {user['display_name']} ({user['email']})") - - # 用户认证 - print("\n3. 用户认证...") - auth_result = user_table.authenticate_user("john_doe", "secure_password_123") - if auth_result: - print(f"认证成功: {auth_result['username']}") - else: - print("认证失败") - - # 更新用户信息 - print("\n4. 更新用户信息...") - user_table.update_user(user_id, { - "display_name": "John Smith", - "avatar_url": "https://example.com/avatar.jpg" - }) - print("用户信息更新成功") - - # 获取用户统计 - print("\n5. 用户统计...") - user_count = user_table.get_user_count() - print(f"总用户数: {user_count}") - - # 搜索用户 - print("\n6. 搜索用户...") - search_results = user_table.search_users("john") - print(f"搜索到 {len(search_results)} 个用户") - - # 清理:删除测试用户 - print("\n7. 清理测试数据...") - user_table.delete_user(user_id) - print("测试用户已删除") - - print("\n✅ 示例完成") - -if __name__ == "__main__": - main() diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index b128c18..d8696e1 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -43,6 +43,7 @@ pub fn run() { commands::get_templates, commands::get_template, commands::get_template_detail, + commands::get_template_detail_cli, commands::delete_template, commands::get_all_resource_categories, commands::get_resource_category_by_id, diff --git a/src/services/TemplateServiceV2.ts b/src/services/TemplateServiceV2.ts index 245ecdb..593e9e6 100644 --- a/src/services/TemplateServiceV2.ts +++ b/src/services/TemplateServiceV2.ts @@ -229,19 +229,24 @@ export class TemplateServiceV2 { /** * 获取模板详细信息 (新版本) - * 注意:目前使用原版本的get_template_detail命令,因为CLI版本还未实现详细信息功能 */ - static async getTemplateDetail(templateId: string): Promise { + static async getTemplateDetail(templateId: string, userId?: string): Promise { try { - // 暂时使用原版本的命令,直到CLI版本实现详细信息功能 - const response = await invoke('get_template_detail', { template_id: templateId }) - const detail = JSON.parse(response) + const request: TemplateGetRequest = { + template_id: templateId, + user_id: userId || this.getCurrentUserId(), + verbose: false, + json_output: true + } - if (!detail) { + const response = await invoke('get_template_detail_cli', { request }) + + if (!response.success) { + console.error('Get template detail failed:', response.error || response.message) return null } - return detail as TemplateDetail + return response.data as TemplateDetail } catch (error) { console.error('Get template detail failed:', error) return null