#!/usr/bin/env python3 """ 批量场景检测工具演示 """ import sys import tempfile import shutil import json from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) def demo_single_detection(): """演示单个视频检测""" print("🎬 演示:单个视频场景检测") print("=" * 60) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频") return test_video = str(video_files[0]) print(f"📹 检测视频: {test_video}") from python_core.services.scene_detection.cli import SceneDetectionCommander # 创建Commander commander = SceneDetectionCommander() # 执行单个检测 print("\n🔍 执行场景检测...") result = commander.execute_command("detect", { "video_path": test_video, "detector": "content", "threshold": 30.0, "min_scene_length": 1.0 }) if result.get("success"): print(f"✅ 检测成功!") print(f" 文件名: {result['filename']}") print(f" 场景数量: {result['total_scenes']}") print(f" 视频时长: {result['total_duration']:.2f}秒") print(f" 检测时间: {result['detection_time']:.2f}秒") print(f" 检测器类型: {result['detector_type']}") print(f" 检测阈值: {result['threshold']}") print(f"\n📋 场景详情:") for i, scene in enumerate(result['scenes']): print(f" 场景 {i+1}: {scene['start_time']:.2f}s - {scene['end_time']:.2f}s ({scene['duration']:.2f}s)") else: print(f"❌ 检测失败: {result.get('error', 'Unknown error')}") def demo_batch_detection(): """演示批量检测""" print("\n\n📦 演示:批量视频场景检测") print("=" * 60) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频") return # 创建临时目录并复制测试视频 with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) print(f"📁 创建临时目录: {temp_path}") # 复制测试视频 test_videos = [] for i, video_file in enumerate(video_files[:3]): # 最多复制3个 dest_file = temp_path / f"demo_video_{i+1}.mp4" shutil.copy2(video_file, dest_file) test_videos.append(dest_file) print(f" 📹 复制视频: {dest_file.name}") from python_core.services.scene_detection.cli import SceneDetectionCommander # 创建Commander commander = SceneDetectionCommander() # 创建输出文件路径 output_file = temp_path / "batch_results.json" print(f"\n🔍 执行批量检测...") print(f" 输入目录: {temp_path}") print(f" 输出文件: {output_file}") # 执行批量检测(这会显示进度) result = commander.execute_command("batch_detect", { "input_directory": str(temp_path), "detector": "content", "threshold": 30.0, "output": str(output_file), "format": "json" }) print(f"\n✅ 批量检测完成!") print(f" 总文件数: {result['total_files']}") print(f" 处理成功: {result['processed_files']}") print(f" 处理失败: {result['failed_files']}") print(f" 总场景数: {result['total_scenes']}") print(f" 总时长: {result['total_duration']:.2f}秒") print(f" 平均场景数: {result['average_scenes_per_video']:.1f}") print(f" 检测时间: {result['detection_time']:.2f}秒") # 显示每个视频的结果 print(f"\n📋 各视频检测结果:") for video_result in result['results']: print(f" 📹 {video_result['filename']}") print(f" 场景数: {video_result['total_scenes']}") print(f" 时长: {video_result['total_duration']:.2f}秒") print(f" 检测时间: {video_result['detection_time']:.2f}秒") # 显示输出文件内容 if output_file.exists(): print(f"\n📄 输出文件已保存: {output_file}") with open(output_file, 'r', encoding='utf-8') as f: data = json.load(f) print(f" 文件大小: {output_file.stat().st_size} bytes") print(f" 包含 {len(data['results'])} 个视频结果") def demo_detector_comparison(): """演示检测器比较""" print("\n\n🔬 演示:检测器效果比较") print("=" * 60) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频") return test_video = str(video_files[0]) print(f"📹 比较视频: {test_video}") from python_core.services.scene_detection.cli import SceneDetectionCommander # 创建Commander commander = SceneDetectionCommander() # 创建临时输出文件 with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: output_file = f.name print(f"\n🔍 执行检测器比较...") print(f" 测试阈值: 20, 30, 40") print(f" 测试检测器: content, threshold, adaptive") print(f" 输出文件: {output_file}") # 执行比较(这会显示进度) result = commander.execute_command("compare", { "video_path": test_video, "thresholds": "20,30,40", "output": output_file }) print(f"\n✅ 检测器比较完成!") print(f" 总测试数: {result['total_tests']}") print(f" 视频路径: {result['video_path']}") # 显示比较结果 summary = result['summary'] print(f"\n📊 比较结果摘要:") print(f" 成功测试数: {summary['total_successful_tests']}") print(f" 推荐检测器: {summary['best_detector']}") print(f" 建议: {summary['recommendation']}") print(f"\n📋 各检测器表现:") for detector, analysis in summary['detector_analysis'].items(): print(f" 🔧 {detector} 检测器:") print(f" 平均场景数: {analysis['average_scenes']:.1f}") print(f" 平均检测时间: {analysis['average_detection_time']:.2f}秒") print(f" 测试次数: {analysis['test_count']}") print(f"\n📋 详细测试结果:") for test_result in result['results']: status = "✅" if test_result['success'] else "❌" print(f" {status} {test_result['detector']} (阈值: {test_result['threshold']})") if test_result['success']: print(f" 场景数: {test_result['total_scenes']}, 时间: {test_result['detection_time']:.2f}s") else: print(f" 错误: {test_result['error']}") # 清理临时文件 Path(output_file).unlink(missing_ok=True) def demo_output_formats(): """演示不同输出格式""" print("\n\n📄 演示:多种输出格式") print("=" * 60) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频") return test_video = str(video_files[0]) from python_core.services.scene_detection.cli import SceneDetectionCommander # 创建Commander commander = SceneDetectionCommander() # 创建临时目录 with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) formats = ["json", "csv", "txt"] for fmt in formats: output_file = temp_path / f"demo_output.{fmt}" print(f"\n📝 生成 {fmt.upper()} 格式输出...") # 执行检测并保存为指定格式 result = commander.execute_command("detect", { "video_path": test_video, "detector": "content", "threshold": 30.0, "output": str(output_file), "format": fmt }) if output_file.exists(): file_size = output_file.stat().st_size print(f" ✅ {fmt.upper()} 文件已生成: {output_file.name} ({file_size} bytes)") # 显示文件内容预览 if fmt == "json": with open(output_file, 'r', encoding='utf-8') as f: data = json.load(f) print(f" 包含 {len(data['results'])} 个视频结果") elif fmt == "csv": with open(output_file, 'r', encoding='utf-8') as f: lines = f.readlines() print(f" 包含 {len(lines)} 行数据(含表头)") elif fmt == "txt": with open(output_file, 'r', encoding='utf-8') as f: content = f.read() print(f" 文本长度: {len(content)} 字符") else: print(f" ❌ {fmt.upper()} 文件生成失败") def main(): """主演示函数""" print("🚀 批量场景检测工具完整演示") print("=" * 80) print("这个演示将展示批量场景检测工具的所有主要功能:") print("1. 单个视频场景检测") print("2. 批量视频场景检测(带进度条)") print("3. 检测器效果比较") print("4. 多种输出格式支持") print("=" * 80) try: # 运行所有演示 demo_single_detection() demo_batch_detection() demo_detector_comparison() demo_output_formats() print("\n" + "=" * 80) print("🎉 批量场景检测工具演示完成!") print("=" * 80) print("\n✨ 工具特色功能:") print(" 🎯 智能场景检测 - 支持多种检测算法") print(" 📊 实时进度显示 - 批量操作进度可视化") print(" 🔧 灵活配置 - 可调节检测阈值和参数") print(" 📄 多格式输出 - JSON/CSV/TXT格式支持") print(" 📈 结果分析 - 自动生成统计信息") print(" 🔬 算法比较 - 自动测试不同检测器效果") print("\n🚀 实际使用命令:") print(" # 检测单个视频") print(" python -m python_core.services.scene_detection detect video.mp4 --threshold 30") print(" ") print(" # 批量检测目录中的所有视频") print(" python -m python_core.services.scene_detection batch_detect /path/to/videos --output results.json") print(" ") print(" # 比较不同检测器效果") print(" python -m python_core.services.scene_detection compare video.mp4 --thresholds 20,30,40") print(" ") print(" # 分析检测结果") print(" python -m python_core.services.scene_detection analyze results.json --output stats.json") print("\n💡 应用场景:") print(" 📹 视频内容分析 - 自动识别视频中的场景变化") print(" 🎬 影视后期制作 - 快速定位剪辑点") print(" 📚 视频索引建立 - 为大量视频建立场景索引") print(" 🔍 内容审核 - 批量分析视频内容结构") print(" 📊 数据分析 - 视频库的统计分析") return 0 except Exception as e: print(f"❌ 演示过程中出错: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)