#!/usr/bin/env python3 """ 测试批量场景检测命令行工具 """ import sys import tempfile import shutil import json from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) def test_imports(): """测试模块导入""" print("🔍 测试模块导入") print("=" * 50) try: # 测试数据类型导入 from python_core.services.scene_detection.types import ( DetectorType, SceneInfo, VideoSceneResult, BatchDetectionConfig, BatchDetectionResult, DetectionStats ) print("✅ 数据类型导入成功") # 测试服务导入 from python_core.services.scene_detection.detector import SceneDetectionService print("✅ 检测服务导入成功") # 测试CLI导入 from python_core.services.scene_detection.cli import SceneDetectionCommander from python_core.utils.progress import ProgressJSONRPCCommander print("✅ CLI导入成功") # 检查继承关系 commander = SceneDetectionCommander() if isinstance(commander, ProgressJSONRPCCommander): print("✅ SceneDetectionCommander 正确继承了 ProgressJSONRPCCommander") else: print("❌ SceneDetectionCommander 没有继承 ProgressJSONRPCCommander") return False # 测试统一导入 from python_core.services.scene_detection import ( DetectorType, SceneDetectionService, SceneDetectionCommander ) print("✅ 统一导入成功") return True except ImportError as e: print(f"❌ 导入失败: {e}") return False except Exception as e: print(f"❌ 测试失败: {e}") return False def test_service_creation(): """测试服务创建""" print("\n🔧 测试服务创建") print("=" * 50) try: from python_core.services.scene_detection.detector import SceneDetectionService from python_core.services.scene_detection.types import BatchDetectionConfig, DetectorType # 创建服务 service = SceneDetectionService() print("✅ 场景检测服务创建成功") # 测试配置创建 config = BatchDetectionConfig( detector_type=DetectorType.CONTENT, threshold=30.0, min_scene_length=1.0 ) print("✅ 检测配置创建成功") # 检查支持的格式 print(f"✅ 支持的视频格式: {service.supported_formats}") return True except Exception as e: print(f"❌ 服务创建测试失败: {e}") return False def test_commander_commands(): """测试Commander命令注册""" print("\n⌨️ 测试Commander命令注册") print("=" * 50) try: from python_core.services.scene_detection.cli import SceneDetectionCommander # 创建Commander commander = SceneDetectionCommander() print("✅ SceneDetectionCommander创建成功") # 检查注册的命令 commands = list(commander.commands.keys()) expected_commands = ["detect", "batch_detect", "analyze", "compare"] for cmd in expected_commands: if cmd in commands: print(f"✅ 命令 '{cmd}' 已注册") else: print(f"❌ 命令 '{cmd}' 未注册") return False # 检查进度命令识别 progressive_commands = ["batch_detect", "compare"] non_progressive_commands = ["detect", "analyze"] for cmd in progressive_commands: if commander._is_progressive_command(cmd): print(f"✅ 命令 '{cmd}' 正确识别为进度命令") else: print(f"❌ 命令 '{cmd}' 没有被识别为进度命令") return False for cmd in non_progressive_commands: if not commander._is_progressive_command(cmd): print(f"⚡ 命令 '{cmd}' 正确识别为非进度命令") else: print(f"❌ 命令 '{cmd}' 错误识别为进度命令") return False return True except Exception as e: print(f"❌ Commander命令测试失败: {e}") return False def test_argument_parsing(): """测试参数解析""" print("\n📝 测试参数解析") print("=" * 50) try: from python_core.services.scene_detection.cli import SceneDetectionCommander commander = SceneDetectionCommander() # 测试单个检测命令参数解析 test_cases = [ # (args, expected_success, description) (["detect", "video.mp4"], True, "基本单个检测"), (["detect", "video.mp4", "--threshold", "25.0"], True, "带阈值的单个检测"), (["detect", "video.mp4", "--detector", "content", "--format", "json"], True, "完整参数的单个检测"), (["batch_detect", "/path/to/videos"], True, "基本批量检测"), (["batch_detect", "/path/to/videos", "--detector", "adaptive", "--output", "results.json"], True, "完整参数的批量检测"), (["compare", "video.mp4"], True, "基本比较"), (["compare", "video.mp4", "--thresholds", "20,30,40"], True, "带阈值列表的比较"), (["analyze", "results.json"], True, "结果分析"), (["unknown_command"], False, "未知命令"), (["detect"], False, "缺少必需参数"), ] for args, expected_success, description in test_cases: try: command, parsed_args = commander.parse_arguments(args) if expected_success: print(f"✅ {description}: {command}") else: print(f"⚠️ 预期失败但成功了: {description}") except SystemExit: if not expected_success: print(f"✅ 预期失败: {description}") else: print(f"❌ 意外失败: {description}") return False except Exception as e: if not expected_success: print(f"✅ 预期失败: {description} -> {e}") else: print(f"❌ 意外错误: {description} -> {e}") return False return True except Exception as e: print(f"❌ 参数解析测试失败: {e}") return False def test_single_detection(): """测试单个视频检测""" print("\n🎬 测试单个视频检测") print("=" * 50) try: # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频,跳过单个检测测试") return True test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") from python_core.services.scene_detection.detector import SceneDetectionService from python_core.services.scene_detection.types import BatchDetectionConfig, DetectorType # 创建服务和配置 service = SceneDetectionService() config = BatchDetectionConfig( detector_type=DetectorType.CONTENT, threshold=30.0, min_scene_length=1.0 ) # 执行检测 result = service.detect_single_video(test_video, config) if result.success: print(f"✅ 单个视频检测成功:") print(f" 文件名: {result.filename}") print(f" 场景数: {result.total_scenes}") print(f" 总时长: {result.total_duration:.2f}秒") print(f" 检测时间: {result.detection_time:.2f}秒") print(f" 检测器: {result.detector_type}") else: print(f"⚠️ 单个视频检测失败: {result.error}") return True except Exception as e: print(f"❌ 单个视频检测测试失败: {e}") return False def test_batch_detection(): """测试批量检测""" print("\n📦 测试批量检测") print("=" * 50) try: # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频,跳过批量检测测试") return True # 创建临时目录并复制测试视频 with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) # 复制几个测试视频 test_videos = [] for i, video_file in enumerate(video_files[:2]): # 最多复制2个 dest_file = temp_path / f"test_video_{i}.mp4" shutil.copy2(video_file, dest_file) test_videos.append(dest_file) print(f"📹 创建了 {len(test_videos)} 个测试视频") from python_core.services.scene_detection.detector import SceneDetectionService from python_core.services.scene_detection.types import BatchDetectionConfig, DetectorType # 创建服务和配置 service = SceneDetectionService() config = BatchDetectionConfig( detector_type=DetectorType.CONTENT, threshold=30.0, min_scene_length=1.0 ) # 收集进度消息 progress_messages = [] def progress_callback(message: str): progress_messages.append(message) print(f"📊 进度: {message}") # 执行批量检测 result = service.batch_detect_scenes(str(temp_path), config, progress_callback) print(f"✅ 批量检测完成:") print(f" 总文件数: {result.total_files}") print(f" 处理成功: {result.processed_files}") print(f" 处理失败: {result.failed_files}") print(f" 总场景数: {result.total_scenes}") print(f" 检测时间: {result.detection_time:.2f}秒") print(f" 收到 {len(progress_messages)} 个进度消息") return True except Exception as e: print(f"❌ 批量检测测试失败: {e}") return False def test_cli_execution(): """测试CLI执行""" print("\n⚙️ 测试CLI执行") print("=" * 50) try: # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频,跳过CLI执行测试") return True test_video = str(video_files[0]) from python_core.services.scene_detection.cli import SceneDetectionCommander commander = SceneDetectionCommander() # 测试单个检测命令 try: result = commander.execute_command("detect", { "video_path": test_video, "detector": "content", "threshold": 30.0 }) if result.get("success"): print(f"✅ CLI单个检测成功: {result['total_scenes']} 个场景") else: print(f"⚠️ CLI单个检测失败: {result.get('error', 'Unknown error')}") except Exception as e: print(f"⚠️ CLI单个检测异常: {e}") return True except Exception as e: print(f"❌ CLI执行测试失败: {e}") return False def test_output_formats(): """测试输出格式""" print("\n📄 测试输出格式") print("=" * 50) try: from python_core.services.scene_detection.detector import SceneDetectionService from python_core.services.scene_detection.types import ( BatchDetectionConfig, BatchDetectionResult, VideoSceneResult, SceneInfo ) # 创建模拟结果 scenes = [ SceneInfo(0, 0.0, 10.0, 10.0, 1.0), SceneInfo(1, 10.0, 20.0, 10.0, 1.0) ] video_result = VideoSceneResult( video_path="test.mp4", filename="test.mp4", success=True, total_scenes=2, total_duration=20.0, scenes=scenes, detection_time=1.0, detector_type="content", threshold=30.0 ) batch_result = BatchDetectionResult( total_files=1, processed_files=1, failed_files=0, total_scenes=2, total_duration=20.0, average_scenes_per_video=2.0, detection_time=1.0, results=[video_result], failed_list=[], config=BatchDetectionConfig() ) service = SceneDetectionService() # 测试不同输出格式 with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) formats = ["json", "csv", "txt"] for fmt in formats: output_file = temp_path / f"test_output.{fmt}" batch_result.config.output_format = fmt success = service.save_results(batch_result, str(output_file)) if success and output_file.exists(): print(f"✅ {fmt.upper()} 格式输出成功") else: print(f"❌ {fmt.upper()} 格式输出失败") return False return True except Exception as e: print(f"❌ 输出格式测试失败: {e}") return False def main(): """主函数""" print("🚀 测试批量场景检测命令行工具") try: # 运行所有测试 tests = [ test_imports, test_service_creation, test_commander_commands, test_argument_parsing, test_single_detection, test_batch_detection, test_cli_execution, test_output_formats ] results = [] for test in tests: try: result = test() results.append(result) except Exception as e: print(f"❌ 测试 {test.__name__} 异常: {e}") results.append(False) # 总结 print("\n" + "=" * 60) print("📊 批量场景检测工具测试总结") print("=" * 60) passed = sum(results) total = len(results) print(f"通过测试: {passed}/{total}") if passed == total: print("🎉 所有场景检测工具测试通过!") print("\n✅ 功能验证:") print(" 1. 模块导入正确 - ✅") print(" 2. 服务创建成功 - ✅") print(" 3. 命令注册完整 - ✅") print(" 4. 参数解析正常 - ✅") print(" 5. 单个检测工作 - ✅") print(" 6. 批量检测工作 - ✅") print(" 7. CLI执行正常 - ✅") print(" 8. 输出格式支持 - ✅") print("\n🔧 工具特点:") print(" 1. 多种检测器 - content/threshold/adaptive") print(" 2. 批量处理 - 目录级别的批量检测") print(" 3. 进度显示 - 实时进度反馈") print(" 4. 多种输出 - JSON/CSV/TXT格式") print(" 5. 结果分析 - 统计信息和比较") print(" 6. 检测器比较 - 自动测试不同参数") print("\n📝 使用示例:") print(" # 单个视频检测") print(" python -m python_core.services.scene_detection detect video.mp4") print(" # 批量检测(带进度)") print(" python -m python_core.services.scene_detection batch_detect /path/to/videos") print(" # 检测器比较") print(" python -m python_core.services.scene_detection compare video.mp4") return 0 else: print("⚠️ 部分场景检测工具测试失败") return 1 except Exception as e: print(f"❌ 测试过程中出错: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)