#!/usr/bin/env python3 """ 测试移除降级逻辑后的视频拆分服务 验证快速失败和明确错误处理 """ import sys from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) def test_explicit_dependency_failure(): """测试明确的依赖失败""" print("🔍 测试明确的依赖失败处理") print("=" * 50) try: # 模拟缺少依赖的情况 import sys original_modules = sys.modules.copy() # 临时移除scenedetect模块(如果存在) modules_to_remove = [name for name in sys.modules if name.startswith('scenedetect')] for module_name in modules_to_remove: del sys.modules[module_name] try: from python_core.services.video_splitter.detectors import PySceneDetectDetector # 尝试创建检测器,应该快速失败 try: detector = PySceneDetectDetector() print("❌ 应该抛出DependencyError但没有") return False except Exception as e: if "DependencyError" in str(type(e)) or "PySceneDetect" in str(e): print(f"✅ 正确抛出依赖错误: {e}") return True else: print(f"❌ 抛出了意外错误: {e}") return False finally: # 恢复模块 sys.modules.update(original_modules) except ImportError as e: print(f"✅ 导入时就失败了,这是正确的: {e}") return True except Exception as e: print(f"❌ 意外错误: {e}") return False def test_successful_import_with_dependencies(): """测试有依赖时的成功导入""" print("\n🎯 测试有依赖时的成功导入") print("=" * 50) try: # 检查PySceneDetect是否可用 try: import scenedetect print(f"✅ PySceneDetect {scenedetect.__version__} 可用") except ImportError: print("⚠️ PySceneDetect不可用,跳过此测试") return True # 测试导入 from python_core.services.video_splitter import VideoSplitterService, DetectionConfig print("✅ 模块导入成功") # 测试服务创建 service = VideoSplitterService() print("✅ 服务创建成功") # 测试配置创建 config = DetectionConfig(threshold=30.0) print("✅ 配置创建成功") return True except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() return False def test_validation_errors(): """测试验证错误的快速失败""" print("\n🛡️ 测试验证错误的快速失败") print("=" * 50) try: from python_core.services.video_splitter.types import SceneInfo, DetectionConfig, ValidationError # 测试无效的SceneInfo print("🔍 测试无效的SceneInfo...") try: invalid_scene = SceneInfo( scene_number=0, # 无效:必须为正数 start_time=0.0, end_time=5.0, duration=5.0, start_frame=0, end_frame=120 ) print("❌ 应该抛出ValidationError但没有") return False except ValidationError as e: print(f"✅ 正确抛出验证错误: {e}") # 测试无效的DetectionConfig print("🔍 测试无效的DetectionConfig...") try: invalid_config = DetectionConfig(threshold=150.0) # 超出范围 print("❌ 应该抛出ValidationError但没有") return False except ValidationError as e: print(f"✅ 正确抛出配置验证错误: {e}") # 测试时间不一致的SceneInfo print("🔍 测试时间不一致的SceneInfo...") try: inconsistent_scene = SceneInfo( scene_number=1, start_time=0.0, end_time=5.0, duration=10.0, # 不匹配的时长 start_frame=0, end_frame=120 ) print("❌ 应该抛出ValidationError但没有") return False except ValidationError as e: print(f"✅ 正确抛出时间不一致错误: {e}") return True except Exception as e: print(f"❌ 验证测试失败: {e}") return False def test_file_validation(): """测试文件验证的快速失败""" print("\n📁 测试文件验证的快速失败") print("=" * 50) try: from python_core.services.video_splitter.validators import BasicVideoValidator from python_core.services.video_splitter.types import ValidationError validator = BasicVideoValidator() # 测试不存在的文件 print("🔍 测试不存在的文件...") try: validator.validate("/nonexistent/file.mp4") print("❌ 应该抛出ValidationError但没有") return False except ValidationError as e: print(f"✅ 正确抛出文件不存在错误: {e}") # 测试空路径 print("🔍 测试空文件...") import tempfile with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: tmp_path = tmp.name try: validator.validate(tmp_path) print("❌ 应该抛出ValidationError但没有") return False except ValidationError as e: print(f"✅ 正确抛出空文件错误: {e}") finally: # 清理临时文件 import os os.unlink(tmp_path) return True except Exception as e: print(f"❌ 文件验证测试失败: {e}") return False def test_error_propagation(): """测试错误传播机制""" print("\n🔄 测试错误传播机制") print("=" * 50) try: from python_core.services.video_splitter import VideoSplitterService # 检查依赖 try: import scenedetect except ImportError: print("⚠️ PySceneDetect不可用,跳过错误传播测试") return True service = VideoSplitterService() # 测试无效文件的错误传播 print("🔍 测试无效文件的错误传播...") result = service.analyze_video("/nonexistent/file.mp4") if not result.success and result.error: print(f"✅ 错误正确传播到结果: {result.error}") # 验证错误信息包含有用信息 if "not found" in result.error.lower() or "nonexistent" in result.error.lower(): print("✅ 错误信息包含有用的调试信息") else: print(f"⚠️ 错误信息可能不够详细: {result.error}") else: print("❌ 错误没有正确传播") return False return True except Exception as e: print(f"❌ 错误传播测试失败: {e}") return False def main(): """主函数""" print("🚀 测试移除降级逻辑后的视频拆分服务") print("验证快速失败和明确错误处理") try: # 运行所有测试 tests = [ test_explicit_dependency_failure, test_successful_import_with_dependencies, test_validation_errors, test_file_validation, test_error_propagation ] results = [] for test in tests: try: result = test() results.append(result) except Exception as e: print(f"❌ 测试 {test.__name__} 异常: {e}") results.append(False) # 总结 print("\n" + "=" * 60) print("📊 快速失败测试总结") print("=" * 60) passed = sum(results) total = len(results) print(f"通过测试: {passed}/{total}") if passed == total: print("🎉 所有快速失败测试通过!") print("\n✅ 移除降级逻辑的优势:") print(" 1. 快速失败 - 依赖问题立即暴露") print(" 2. 明确错误 - 错误信息清晰具体") print(" 3. 易于调试 - 问题根源容易定位") print(" 4. 避免隐藏问题 - 不会掩盖配置错误") print(" 5. 一致行为 - 不同环境下行为一致") print("\n🔧 错误处理策略:") print(" 1. 依赖检查 - 启动时立即检查所有依赖") print(" 2. 数据验证 - 创建时验证数据完整性") print(" 3. 文件验证 - 处理前验证文件存在性") print(" 4. 错误传播 - 保持错误信息的完整性") print(" 5. 结构化异常 - 使用专门的异常类型") return 0 else: print("⚠️ 部分快速失败测试失败") return 1 except Exception as e: print(f"❌ 测试过程中出错: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)