mxivideo/examples/test_batch_workflow.py

227 lines
8.8 KiB
Python

#!/usr/bin/env python3
"""
Test Batch Scene Detection and Splitting Workflow
测试批量场景检测和切分工作流
"""
import sys
import json
import tempfile
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from python_core.scene_detection import SceneDetector, DetectorType, OutputFormat
def test_batch_workflow():
"""测试批量工作流"""
print("🚀 测试批量场景检测和切分工作流")
print("=" * 60)
try:
# 创建检测器
detector = SceneDetector()
# 准备测试视频
test_videos = [
Path("assets/1/1752032011698.mp4")
]
# 检查测试视频是否存在
existing_videos = [v for v in test_videos if v.exists()]
if not existing_videos:
print("❌ 没有找到测试视频文件")
print("💡 请确保 assets/1/1752032011698.mp4 文件存在")
return False
print(f"📹 找到 {len(existing_videos)} 个测试视频:")
for video in existing_videos:
print(f"{video}")
# 创建临时输出目录
with tempfile.TemporaryDirectory() as temp_dir:
output_dir = Path(temp_dir) / "batch_output"
print(f"\n📂 输出目录: {output_dir}")
# 执行批量处理
print("\n🔄 开始批量处理...")
result = detector.batch_detect_and_split(
video_paths=existing_videos,
output_base_dir=output_dir,
detector_type=DetectorType.CONTENT,
threshold=30.0,
min_scene_length=1.0,
output_format=OutputFormat.JSON,
enable_ai_analysis=False,
enable_video_splitting=True,
max_concurrent=1, # 使用单线程避免资源竞争
continue_on_error=True
)
# 检查结果
if result.get("workflow_state") == "completed":
print("✅ 批量处理完成!")
# 显示摘要
batch_results = result.get("batch_results", {})
print(f"\n📊 处理摘要:")
print(f" 总视频数: {batch_results.get('total_videos', 0)}")
print(f" 成功处理: {batch_results.get('completed_videos', 0)}")
print(f" 处理失败: {batch_results.get('failed_videos', 0)}")
print(f" 成功率: {batch_results.get('success_rate', 0):.1f}%")
# 显示每个任务的详细信息
tasks = batch_results.get('tasks', [])
print(f"\n📋 任务详情:")
for i, task in enumerate(tasks, 1):
video_name = Path(task['video_path']).name
status = task['status']
scenes = task.get('total_scenes', 0)
splits = task.get('split_count', 0)
proc_time = task.get('processing_time', 0)
print(f" 任务 {i}: {video_name}")
print(f" 状态: {status}")
print(f" 场景数: {scenes}")
print(f" 切分数: {splits}")
print(f" 处理时间: {proc_time:.2f}s")
if task.get('error'):
print(f" 错误: {task['error']}")
# 检查输出文件
if task.get('output_dir'):
output_path = Path(task['output_dir'])
if output_path.exists():
print(f" 输出目录: {output_path}")
# 检查场景检测结果文件
scenes_file = output_path / "scenes.json"
if scenes_file.exists():
print(f" ✅ 场景检测结果: {scenes_file}")
# 检查切分目录
scenes_dir = output_path / "scenes"
if scenes_dir.exists():
split_files = list(scenes_dir.glob("*.mp4"))
print(f" ✅ 切分文件: {len(split_files)}")
for split_file in split_files[:3]: # 只显示前3个
size_mb = split_file.stat().st_size / (1024 * 1024)
print(f"{split_file.name} ({size_mb:.1f}MB)")
if len(split_files) > 3:
print(f" ... 还有 {len(split_files) - 3} 个文件")
# 检查切分摘要
summary_file = output_path / "split_summary.json"
if summary_file.exists():
print(f" ✅ 切分摘要: {summary_file}")
try:
with open(summary_file, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
print(f" 成功切分: {summary_data.get('successful_splits', 0)}")
print(f" 失败切分: {summary_data.get('failed_splits', 0)}")
print(f" 总输出大小: {summary_data.get('total_output_size', 0):,} bytes")
except Exception as e:
print(f" ⚠️ 读取摘要失败: {e}")
return True
else:
print("❌ 批量处理失败")
errors = result.get("errors", [])
if errors:
print("错误信息:")
for error in errors:
print(f"{error}")
return False
except Exception as e:
print(f"❌ 测试异常: {e}")
import traceback
print(f"详细错误: {traceback.format_exc()}")
return False
def test_cli_batch_command():
"""测试CLI批量命令"""
print("\n🧪 测试CLI批量命令")
print("=" * 60)
try:
# 检查CLI帮助
import subprocess
result = subprocess.run(
['python3', '-m', 'python_core.cli', 'scene', 'batch', '--help'],
capture_output=True,
text=True,
cwd=project_root
)
if result.returncode == 0:
print("✅ CLI批量命令帮助正常")
print("📋 命令帮助预览:")
help_lines = result.stdout.split('\n')[:10] # 只显示前10行
for line in help_lines:
if line.strip():
print(f" {line}")
if len(result.stdout.split('\n')) > 10:
print(" ...")
return True
else:
print("❌ CLI批量命令帮助失败")
print(f"错误: {result.stderr}")
return False
except Exception as e:
print(f"❌ CLI测试异常: {e}")
return False
def main():
"""主函数"""
print("🚀 开始测试批量场景检测和切分工作流")
print("=" * 80)
tests = [
("批量工作流功能", test_batch_workflow),
("CLI批量命令", test_cli_batch_command),
]
passed = 0
total = len(tests)
for test_name, test_func in tests:
try:
if test_func():
passed += 1
print(f"{test_name} - 通过")
else:
print(f"{test_name} - 失败")
except Exception as e:
print(f"{test_name} - 异常: {e}")
print("\n" + "=" * 80)
print(f"🎉 测试完成: {passed}/{total} 通过")
if passed == total:
print("🎊 所有测试都通过了!批量工作流开发成功!")
print("\n💡 使用方法:")
print(" # 批量处理目录中的所有视频")
print(" python3 -m python_core.cli scene batch input_dir output_dir")
print(" ")
print(" # 自定义参数")
print(" python3 -m python_core.cli scene batch input_dir output_dir \\")
print(" --threshold 15.0 --concurrent 4 --no-ai --split")
else:
print("⚠️ 部分测试失败,需要进一步调试")
return passed == total
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)