mxivideo/scripts/demo_scene_detection.py

321 lines
12 KiB
Python

#!/usr/bin/env python3
"""
批量场景检测工具演示
"""
import sys
import tempfile
import shutil
import json
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def demo_single_detection():
"""演示单个视频检测"""
print("🎬 演示:单个视频场景检测")
print("=" * 60)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("⚠️ 没有找到测试视频")
return
test_video = str(video_files[0])
print(f"📹 检测视频: {test_video}")
from python_core.services.scene_detection.cli import SceneDetectionCommander
# 创建Commander
commander = SceneDetectionCommander()
# 执行单个检测
print("\n🔍 执行场景检测...")
result = commander.execute_command("detect", {
"video_path": test_video,
"detector": "content",
"threshold": 30.0,
"min_scene_length": 1.0
})
if result.get("success"):
print(f"✅ 检测成功!")
print(f" 文件名: {result['filename']}")
print(f" 场景数量: {result['total_scenes']}")
print(f" 视频时长: {result['total_duration']:.2f}")
print(f" 检测时间: {result['detection_time']:.2f}")
print(f" 检测器类型: {result['detector_type']}")
print(f" 检测阈值: {result['threshold']}")
print(f"\n📋 场景详情:")
for i, scene in enumerate(result['scenes']):
print(f" 场景 {i+1}: {scene['start_time']:.2f}s - {scene['end_time']:.2f}s ({scene['duration']:.2f}s)")
else:
print(f"❌ 检测失败: {result.get('error', 'Unknown error')}")
def demo_batch_detection():
"""演示批量检测"""
print("\n\n📦 演示:批量视频场景检测")
print("=" * 60)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("⚠️ 没有找到测试视频")
return
# 创建临时目录并复制测试视频
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
print(f"📁 创建临时目录: {temp_path}")
# 复制测试视频
test_videos = []
for i, video_file in enumerate(video_files[:3]): # 最多复制3个
dest_file = temp_path / f"demo_video_{i+1}.mp4"
shutil.copy2(video_file, dest_file)
test_videos.append(dest_file)
print(f" 📹 复制视频: {dest_file.name}")
from python_core.services.scene_detection.cli import SceneDetectionCommander
# 创建Commander
commander = SceneDetectionCommander()
# 创建输出文件路径
output_file = temp_path / "batch_results.json"
print(f"\n🔍 执行批量检测...")
print(f" 输入目录: {temp_path}")
print(f" 输出文件: {output_file}")
# 执行批量检测(这会显示进度)
result = commander.execute_command("batch_detect", {
"input_directory": str(temp_path),
"detector": "content",
"threshold": 30.0,
"output": str(output_file),
"format": "json"
})
print(f"\n✅ 批量检测完成!")
print(f" 总文件数: {result['total_files']}")
print(f" 处理成功: {result['processed_files']}")
print(f" 处理失败: {result['failed_files']}")
print(f" 总场景数: {result['total_scenes']}")
print(f" 总时长: {result['total_duration']:.2f}")
print(f" 平均场景数: {result['average_scenes_per_video']:.1f}")
print(f" 检测时间: {result['detection_time']:.2f}")
# 显示每个视频的结果
print(f"\n📋 各视频检测结果:")
for video_result in result['results']:
print(f" 📹 {video_result['filename']}")
print(f" 场景数: {video_result['total_scenes']}")
print(f" 时长: {video_result['total_duration']:.2f}")
print(f" 检测时间: {video_result['detection_time']:.2f}")
# 显示输出文件内容
if output_file.exists():
print(f"\n📄 输出文件已保存: {output_file}")
with open(output_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f" 文件大小: {output_file.stat().st_size} bytes")
print(f" 包含 {len(data['results'])} 个视频结果")
def demo_detector_comparison():
"""演示检测器比较"""
print("\n\n🔬 演示:检测器效果比较")
print("=" * 60)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("⚠️ 没有找到测试视频")
return
test_video = str(video_files[0])
print(f"📹 比较视频: {test_video}")
from python_core.services.scene_detection.cli import SceneDetectionCommander
# 创建Commander
commander = SceneDetectionCommander()
# 创建临时输出文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
output_file = f.name
print(f"\n🔍 执行检测器比较...")
print(f" 测试阈值: 20, 30, 40")
print(f" 测试检测器: content, threshold, adaptive")
print(f" 输出文件: {output_file}")
# 执行比较(这会显示进度)
result = commander.execute_command("compare", {
"video_path": test_video,
"thresholds": "20,30,40",
"output": output_file
})
print(f"\n✅ 检测器比较完成!")
print(f" 总测试数: {result['total_tests']}")
print(f" 视频路径: {result['video_path']}")
# 显示比较结果
summary = result['summary']
print(f"\n📊 比较结果摘要:")
print(f" 成功测试数: {summary['total_successful_tests']}")
print(f" 推荐检测器: {summary['best_detector']}")
print(f" 建议: {summary['recommendation']}")
print(f"\n📋 各检测器表现:")
for detector, analysis in summary['detector_analysis'].items():
print(f" 🔧 {detector} 检测器:")
print(f" 平均场景数: {analysis['average_scenes']:.1f}")
print(f" 平均检测时间: {analysis['average_detection_time']:.2f}")
print(f" 测试次数: {analysis['test_count']}")
print(f"\n📋 详细测试结果:")
for test_result in result['results']:
status = "" if test_result['success'] else ""
print(f" {status} {test_result['detector']} (阈值: {test_result['threshold']})")
if test_result['success']:
print(f" 场景数: {test_result['total_scenes']}, 时间: {test_result['detection_time']:.2f}s")
else:
print(f" 错误: {test_result['error']}")
# 清理临时文件
Path(output_file).unlink(missing_ok=True)
def demo_output_formats():
"""演示不同输出格式"""
print("\n\n📄 演示:多种输出格式")
print("=" * 60)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("⚠️ 没有找到测试视频")
return
test_video = str(video_files[0])
from python_core.services.scene_detection.cli import SceneDetectionCommander
# 创建Commander
commander = SceneDetectionCommander()
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
formats = ["json", "csv", "txt"]
for fmt in formats:
output_file = temp_path / f"demo_output.{fmt}"
print(f"\n📝 生成 {fmt.upper()} 格式输出...")
# 执行检测并保存为指定格式
result = commander.execute_command("detect", {
"video_path": test_video,
"detector": "content",
"threshold": 30.0,
"output": str(output_file),
"format": fmt
})
if output_file.exists():
file_size = output_file.stat().st_size
print(f"{fmt.upper()} 文件已生成: {output_file.name} ({file_size} bytes)")
# 显示文件内容预览
if fmt == "json":
with open(output_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f" 包含 {len(data['results'])} 个视频结果")
elif fmt == "csv":
with open(output_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
print(f" 包含 {len(lines)} 行数据(含表头)")
elif fmt == "txt":
with open(output_file, 'r', encoding='utf-8') as f:
content = f.read()
print(f" 文本长度: {len(content)} 字符")
else:
print(f"{fmt.upper()} 文件生成失败")
def main():
"""主演示函数"""
print("🚀 批量场景检测工具完整演示")
print("=" * 80)
print("这个演示将展示批量场景检测工具的所有主要功能:")
print("1. 单个视频场景检测")
print("2. 批量视频场景检测(带进度条)")
print("3. 检测器效果比较")
print("4. 多种输出格式支持")
print("=" * 80)
try:
# 运行所有演示
demo_single_detection()
demo_batch_detection()
demo_detector_comparison()
demo_output_formats()
print("\n" + "=" * 80)
print("🎉 批量场景检测工具演示完成!")
print("=" * 80)
print("\n✨ 工具特色功能:")
print(" 🎯 智能场景检测 - 支持多种检测算法")
print(" 📊 实时进度显示 - 批量操作进度可视化")
print(" 🔧 灵活配置 - 可调节检测阈值和参数")
print(" 📄 多格式输出 - JSON/CSV/TXT格式支持")
print(" 📈 结果分析 - 自动生成统计信息")
print(" 🔬 算法比较 - 自动测试不同检测器效果")
print("\n🚀 实际使用命令:")
print(" # 检测单个视频")
print(" python -m python_core.services.scene_detection detect video.mp4 --threshold 30")
print(" ")
print(" # 批量检测目录中的所有视频")
print(" python -m python_core.services.scene_detection batch_detect /path/to/videos --output results.json")
print(" ")
print(" # 比较不同检测器效果")
print(" python -m python_core.services.scene_detection compare video.mp4 --thresholds 20,30,40")
print(" ")
print(" # 分析检测结果")
print(" python -m python_core.services.scene_detection analyze results.json --output stats.json")
print("\n💡 应用场景:")
print(" 📹 视频内容分析 - 自动识别视频中的场景变化")
print(" 🎬 影视后期制作 - 快速定位剪辑点")
print(" 📚 视频索引建立 - 为大量视频建立场景索引")
print(" 🔍 内容审核 - 批量分析视频内容结构")
print(" 📊 数据分析 - 视频库的统计分析")
return 0
except Exception as e:
print(f"❌ 演示过程中出错: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)