#!/usr/bin/env python3 """ Test Batch Scene Detection and Splitting Workflow 测试批量场景检测和切分工作流 """ import sys import json import tempfile from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from python_core.scene_detection import SceneDetector, DetectorType, OutputFormat def test_batch_workflow(): """测试批量工作流""" print("🚀 测试批量场景检测和切分工作流") print("=" * 60) try: # 创建检测器 detector = SceneDetector() # 准备测试视频 test_videos = [ Path("assets/1/1752032011698.mp4") ] # 检查测试视频是否存在 existing_videos = [v for v in test_videos if v.exists()] if not existing_videos: print("❌ 没有找到测试视频文件") print("💡 请确保 assets/1/1752032011698.mp4 文件存在") return False print(f"📹 找到 {len(existing_videos)} 个测试视频:") for video in existing_videos: print(f" • {video}") # 创建临时输出目录 with tempfile.TemporaryDirectory() as temp_dir: output_dir = Path(temp_dir) / "batch_output" print(f"\n📂 输出目录: {output_dir}") # 执行批量处理 print("\n🔄 开始批量处理...") result = detector.batch_detect_and_split( video_paths=existing_videos, output_base_dir=output_dir, detector_type=DetectorType.CONTENT, threshold=30.0, min_scene_length=1.0, output_format=OutputFormat.JSON, enable_ai_analysis=False, enable_video_splitting=True, max_concurrent=1, # 使用单线程避免资源竞争 continue_on_error=True ) # 检查结果 if result.get("workflow_state") == "completed": print("✅ 批量处理完成!") # 显示摘要 batch_results = result.get("batch_results", {}) print(f"\n📊 处理摘要:") print(f" 总视频数: {batch_results.get('total_videos', 0)}") print(f" 成功处理: {batch_results.get('completed_videos', 0)}") print(f" 处理失败: {batch_results.get('failed_videos', 0)}") print(f" 成功率: {batch_results.get('success_rate', 0):.1f}%") # 显示每个任务的详细信息 tasks = batch_results.get('tasks', []) print(f"\n📋 任务详情:") for i, task in enumerate(tasks, 1): video_name = Path(task['video_path']).name status = task['status'] scenes = task.get('total_scenes', 0) splits = task.get('split_count', 0) proc_time = task.get('processing_time', 0) print(f" 任务 {i}: {video_name}") print(f" 状态: {status}") print(f" 场景数: {scenes}") print(f" 切分数: {splits}") print(f" 处理时间: {proc_time:.2f}s") if task.get('error'): print(f" 错误: {task['error']}") # 检查输出文件 if task.get('output_dir'): output_path = Path(task['output_dir']) if output_path.exists(): print(f" 输出目录: {output_path}") # 检查场景检测结果文件 scenes_file = output_path / "scenes.json" if scenes_file.exists(): print(f" ✅ 场景检测结果: {scenes_file}") # 检查切分目录 scenes_dir = output_path / "scenes" if scenes_dir.exists(): split_files = list(scenes_dir.glob("*.mp4")) print(f" ✅ 切分文件: {len(split_files)} 个") for split_file in split_files[:3]: # 只显示前3个 size_mb = split_file.stat().st_size / (1024 * 1024) print(f" • {split_file.name} ({size_mb:.1f}MB)") if len(split_files) > 3: print(f" ... 还有 {len(split_files) - 3} 个文件") # 检查切分摘要 summary_file = output_path / "split_summary.json" if summary_file.exists(): print(f" ✅ 切分摘要: {summary_file}") try: with open(summary_file, 'r', encoding='utf-8') as f: summary_data = json.load(f) print(f" 成功切分: {summary_data.get('successful_splits', 0)}") print(f" 失败切分: {summary_data.get('failed_splits', 0)}") print(f" 总输出大小: {summary_data.get('total_output_size', 0):,} bytes") except Exception as e: print(f" ⚠️ 读取摘要失败: {e}") return True else: print("❌ 批量处理失败") errors = result.get("errors", []) if errors: print("错误信息:") for error in errors: print(f" • {error}") return False except Exception as e: print(f"❌ 测试异常: {e}") import traceback print(f"详细错误: {traceback.format_exc()}") return False def test_cli_batch_command(): """测试CLI批量命令""" print("\n🧪 测试CLI批量命令") print("=" * 60) try: # 检查CLI帮助 import subprocess result = subprocess.run( ['python3', '-m', 'python_core.cli', 'scene', 'batch', '--help'], capture_output=True, text=True, cwd=project_root ) if result.returncode == 0: print("✅ CLI批量命令帮助正常") print("📋 命令帮助预览:") help_lines = result.stdout.split('\n')[:10] # 只显示前10行 for line in help_lines: if line.strip(): print(f" {line}") if len(result.stdout.split('\n')) > 10: print(" ...") return True else: print("❌ CLI批量命令帮助失败") print(f"错误: {result.stderr}") return False except Exception as e: print(f"❌ CLI测试异常: {e}") return False def main(): """主函数""" print("🚀 开始测试批量场景检测和切分工作流") print("=" * 80) tests = [ ("批量工作流功能", test_batch_workflow), ("CLI批量命令", test_cli_batch_command), ] passed = 0 total = len(tests) for test_name, test_func in tests: try: if test_func(): passed += 1 print(f"✅ {test_name} - 通过") else: print(f"❌ {test_name} - 失败") except Exception as e: print(f"❌ {test_name} - 异常: {e}") print("\n" + "=" * 80) print(f"🎉 测试完成: {passed}/{total} 通过") if passed == total: print("🎊 所有测试都通过了!批量工作流开发成功!") print("\n💡 使用方法:") print(" # 批量处理目录中的所有视频") print(" python3 -m python_core.cli scene batch input_dir output_dir") print(" ") print(" # 自定义参数") print(" python3 -m python_core.cli scene batch input_dir output_dir \\") print(" --threshold 15.0 --concurrent 4 --no-ai --split") else: print("⚠️ 部分测试失败,需要进一步调试") return passed == total if __name__ == "__main__": success = main() sys.exit(0 if success else 1)