321 lines
12 KiB
Python
321 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
批量场景检测工具演示
|
|
"""
|
|
|
|
import sys
|
|
import tempfile
|
|
import shutil
|
|
import json
|
|
from pathlib import Path
|
|
|
|
# 添加项目根目录到Python路径
|
|
project_root = Path(__file__).parent.parent
|
|
sys.path.insert(0, str(project_root))
|
|
|
|
def demo_single_detection():
|
|
"""演示单个视频检测"""
|
|
print("🎬 演示:单个视频场景检测")
|
|
print("=" * 60)
|
|
|
|
# 查找测试视频
|
|
assets_dir = project_root / "assets"
|
|
video_files = list(assets_dir.rglob("*.mp4"))
|
|
|
|
if not video_files:
|
|
print("⚠️ 没有找到测试视频")
|
|
return
|
|
|
|
test_video = str(video_files[0])
|
|
print(f"📹 检测视频: {test_video}")
|
|
|
|
from python_core.services.scene_detection.cli import SceneDetectionCommander
|
|
|
|
# 创建Commander
|
|
commander = SceneDetectionCommander()
|
|
|
|
# 执行单个检测
|
|
print("\n🔍 执行场景检测...")
|
|
result = commander.execute_command("detect", {
|
|
"video_path": test_video,
|
|
"detector": "content",
|
|
"threshold": 30.0,
|
|
"min_scene_length": 1.0
|
|
})
|
|
|
|
if result.get("success"):
|
|
print(f"✅ 检测成功!")
|
|
print(f" 文件名: {result['filename']}")
|
|
print(f" 场景数量: {result['total_scenes']}")
|
|
print(f" 视频时长: {result['total_duration']:.2f}秒")
|
|
print(f" 检测时间: {result['detection_time']:.2f}秒")
|
|
print(f" 检测器类型: {result['detector_type']}")
|
|
print(f" 检测阈值: {result['threshold']}")
|
|
|
|
print(f"\n📋 场景详情:")
|
|
for i, scene in enumerate(result['scenes']):
|
|
print(f" 场景 {i+1}: {scene['start_time']:.2f}s - {scene['end_time']:.2f}s ({scene['duration']:.2f}s)")
|
|
else:
|
|
print(f"❌ 检测失败: {result.get('error', 'Unknown error')}")
|
|
|
|
def demo_batch_detection():
|
|
"""演示批量检测"""
|
|
print("\n\n📦 演示:批量视频场景检测")
|
|
print("=" * 60)
|
|
|
|
# 查找测试视频
|
|
assets_dir = project_root / "assets"
|
|
video_files = list(assets_dir.rglob("*.mp4"))
|
|
|
|
if not video_files:
|
|
print("⚠️ 没有找到测试视频")
|
|
return
|
|
|
|
# 创建临时目录并复制测试视频
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
temp_path = Path(temp_dir)
|
|
print(f"📁 创建临时目录: {temp_path}")
|
|
|
|
# 复制测试视频
|
|
test_videos = []
|
|
for i, video_file in enumerate(video_files[:3]): # 最多复制3个
|
|
dest_file = temp_path / f"demo_video_{i+1}.mp4"
|
|
shutil.copy2(video_file, dest_file)
|
|
test_videos.append(dest_file)
|
|
print(f" 📹 复制视频: {dest_file.name}")
|
|
|
|
from python_core.services.scene_detection.cli import SceneDetectionCommander
|
|
|
|
# 创建Commander
|
|
commander = SceneDetectionCommander()
|
|
|
|
# 创建输出文件路径
|
|
output_file = temp_path / "batch_results.json"
|
|
|
|
print(f"\n🔍 执行批量检测...")
|
|
print(f" 输入目录: {temp_path}")
|
|
print(f" 输出文件: {output_file}")
|
|
|
|
# 执行批量检测(这会显示进度)
|
|
result = commander.execute_command("batch_detect", {
|
|
"input_directory": str(temp_path),
|
|
"detector": "content",
|
|
"threshold": 30.0,
|
|
"output": str(output_file),
|
|
"format": "json"
|
|
})
|
|
|
|
print(f"\n✅ 批量检测完成!")
|
|
print(f" 总文件数: {result['total_files']}")
|
|
print(f" 处理成功: {result['processed_files']}")
|
|
print(f" 处理失败: {result['failed_files']}")
|
|
print(f" 总场景数: {result['total_scenes']}")
|
|
print(f" 总时长: {result['total_duration']:.2f}秒")
|
|
print(f" 平均场景数: {result['average_scenes_per_video']:.1f}")
|
|
print(f" 检测时间: {result['detection_time']:.2f}秒")
|
|
|
|
# 显示每个视频的结果
|
|
print(f"\n📋 各视频检测结果:")
|
|
for video_result in result['results']:
|
|
print(f" 📹 {video_result['filename']}")
|
|
print(f" 场景数: {video_result['total_scenes']}")
|
|
print(f" 时长: {video_result['total_duration']:.2f}秒")
|
|
print(f" 检测时间: {video_result['detection_time']:.2f}秒")
|
|
|
|
# 显示输出文件内容
|
|
if output_file.exists():
|
|
print(f"\n📄 输出文件已保存: {output_file}")
|
|
with open(output_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
print(f" 文件大小: {output_file.stat().st_size} bytes")
|
|
print(f" 包含 {len(data['results'])} 个视频结果")
|
|
|
|
def demo_detector_comparison():
|
|
"""演示检测器比较"""
|
|
print("\n\n🔬 演示:检测器效果比较")
|
|
print("=" * 60)
|
|
|
|
# 查找测试视频
|
|
assets_dir = project_root / "assets"
|
|
video_files = list(assets_dir.rglob("*.mp4"))
|
|
|
|
if not video_files:
|
|
print("⚠️ 没有找到测试视频")
|
|
return
|
|
|
|
test_video = str(video_files[0])
|
|
print(f"📹 比较视频: {test_video}")
|
|
|
|
from python_core.services.scene_detection.cli import SceneDetectionCommander
|
|
|
|
# 创建Commander
|
|
commander = SceneDetectionCommander()
|
|
|
|
# 创建临时输出文件
|
|
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
|
|
output_file = f.name
|
|
|
|
print(f"\n🔍 执行检测器比较...")
|
|
print(f" 测试阈值: 20, 30, 40")
|
|
print(f" 测试检测器: content, threshold, adaptive")
|
|
print(f" 输出文件: {output_file}")
|
|
|
|
# 执行比较(这会显示进度)
|
|
result = commander.execute_command("compare", {
|
|
"video_path": test_video,
|
|
"thresholds": "20,30,40",
|
|
"output": output_file
|
|
})
|
|
|
|
print(f"\n✅ 检测器比较完成!")
|
|
print(f" 总测试数: {result['total_tests']}")
|
|
print(f" 视频路径: {result['video_path']}")
|
|
|
|
# 显示比较结果
|
|
summary = result['summary']
|
|
print(f"\n📊 比较结果摘要:")
|
|
print(f" 成功测试数: {summary['total_successful_tests']}")
|
|
print(f" 推荐检测器: {summary['best_detector']}")
|
|
print(f" 建议: {summary['recommendation']}")
|
|
|
|
print(f"\n📋 各检测器表现:")
|
|
for detector, analysis in summary['detector_analysis'].items():
|
|
print(f" 🔧 {detector} 检测器:")
|
|
print(f" 平均场景数: {analysis['average_scenes']:.1f}")
|
|
print(f" 平均检测时间: {analysis['average_detection_time']:.2f}秒")
|
|
print(f" 测试次数: {analysis['test_count']}")
|
|
|
|
print(f"\n📋 详细测试结果:")
|
|
for test_result in result['results']:
|
|
status = "✅" if test_result['success'] else "❌"
|
|
print(f" {status} {test_result['detector']} (阈值: {test_result['threshold']})")
|
|
if test_result['success']:
|
|
print(f" 场景数: {test_result['total_scenes']}, 时间: {test_result['detection_time']:.2f}s")
|
|
else:
|
|
print(f" 错误: {test_result['error']}")
|
|
|
|
# 清理临时文件
|
|
Path(output_file).unlink(missing_ok=True)
|
|
|
|
def demo_output_formats():
|
|
"""演示不同输出格式"""
|
|
print("\n\n📄 演示:多种输出格式")
|
|
print("=" * 60)
|
|
|
|
# 查找测试视频
|
|
assets_dir = project_root / "assets"
|
|
video_files = list(assets_dir.rglob("*.mp4"))
|
|
|
|
if not video_files:
|
|
print("⚠️ 没有找到测试视频")
|
|
return
|
|
|
|
test_video = str(video_files[0])
|
|
|
|
from python_core.services.scene_detection.cli import SceneDetectionCommander
|
|
|
|
# 创建Commander
|
|
commander = SceneDetectionCommander()
|
|
|
|
# 创建临时目录
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
temp_path = Path(temp_dir)
|
|
|
|
formats = ["json", "csv", "txt"]
|
|
|
|
for fmt in formats:
|
|
output_file = temp_path / f"demo_output.{fmt}"
|
|
|
|
print(f"\n📝 生成 {fmt.upper()} 格式输出...")
|
|
|
|
# 执行检测并保存为指定格式
|
|
result = commander.execute_command("detect", {
|
|
"video_path": test_video,
|
|
"detector": "content",
|
|
"threshold": 30.0,
|
|
"output": str(output_file),
|
|
"format": fmt
|
|
})
|
|
|
|
if output_file.exists():
|
|
file_size = output_file.stat().st_size
|
|
print(f" ✅ {fmt.upper()} 文件已生成: {output_file.name} ({file_size} bytes)")
|
|
|
|
# 显示文件内容预览
|
|
if fmt == "json":
|
|
with open(output_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
print(f" 包含 {len(data['results'])} 个视频结果")
|
|
elif fmt == "csv":
|
|
with open(output_file, 'r', encoding='utf-8') as f:
|
|
lines = f.readlines()
|
|
print(f" 包含 {len(lines)} 行数据(含表头)")
|
|
elif fmt == "txt":
|
|
with open(output_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
print(f" 文本长度: {len(content)} 字符")
|
|
else:
|
|
print(f" ❌ {fmt.upper()} 文件生成失败")
|
|
|
|
def main():
|
|
"""主演示函数"""
|
|
print("🚀 批量场景检测工具完整演示")
|
|
print("=" * 80)
|
|
print("这个演示将展示批量场景检测工具的所有主要功能:")
|
|
print("1. 单个视频场景检测")
|
|
print("2. 批量视频场景检测(带进度条)")
|
|
print("3. 检测器效果比较")
|
|
print("4. 多种输出格式支持")
|
|
print("=" * 80)
|
|
|
|
try:
|
|
# 运行所有演示
|
|
demo_single_detection()
|
|
demo_batch_detection()
|
|
demo_detector_comparison()
|
|
demo_output_formats()
|
|
|
|
print("\n" + "=" * 80)
|
|
print("🎉 批量场景检测工具演示完成!")
|
|
print("=" * 80)
|
|
|
|
print("\n✨ 工具特色功能:")
|
|
print(" 🎯 智能场景检测 - 支持多种检测算法")
|
|
print(" 📊 实时进度显示 - 批量操作进度可视化")
|
|
print(" 🔧 灵活配置 - 可调节检测阈值和参数")
|
|
print(" 📄 多格式输出 - JSON/CSV/TXT格式支持")
|
|
print(" 📈 结果分析 - 自动生成统计信息")
|
|
print(" 🔬 算法比较 - 自动测试不同检测器效果")
|
|
|
|
print("\n🚀 实际使用命令:")
|
|
print(" # 检测单个视频")
|
|
print(" python -m python_core.services.scene_detection detect video.mp4 --threshold 30")
|
|
print(" ")
|
|
print(" # 批量检测目录中的所有视频")
|
|
print(" python -m python_core.services.scene_detection batch_detect /path/to/videos --output results.json")
|
|
print(" ")
|
|
print(" # 比较不同检测器效果")
|
|
print(" python -m python_core.services.scene_detection compare video.mp4 --thresholds 20,30,40")
|
|
print(" ")
|
|
print(" # 分析检测结果")
|
|
print(" python -m python_core.services.scene_detection analyze results.json --output stats.json")
|
|
|
|
print("\n💡 应用场景:")
|
|
print(" 📹 视频内容分析 - 自动识别视频中的场景变化")
|
|
print(" 🎬 影视后期制作 - 快速定位剪辑点")
|
|
print(" 📚 视频索引建立 - 为大量视频建立场景索引")
|
|
print(" 🔍 内容审核 - 批量分析视频内容结构")
|
|
print(" 📊 数据分析 - 视频库的统计分析")
|
|
|
|
return 0
|
|
|
|
except Exception as e:
|
|
print(f"❌ 演示过程中出错: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return 1
|
|
|
|
if __name__ == "__main__":
|
|
exit_code = main()
|
|
sys.exit(exit_code)
|