#!/usr/bin/env python3 """ 测试拆分后的视频拆分服务模块 """ import sys from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) def test_module_imports(): """测试模块导入""" print("🔍 测试模块导入") print("=" * 50) try: # 测试主模块导入 from python_core.services.video_splitter import ( VideoSplitterService, DetectionConfig, DetectorType, SceneInfo, AnalysisResult, create_service, analyze_video ) print("✅ 主模块导入成功") # 测试子模块导入 from python_core.services.video_splitter.types import ValidationError from python_core.services.video_splitter.detectors import PySceneDetectDetector from python_core.services.video_splitter.validators import BasicVideoValidator from python_core.services.video_splitter.service import VideoSplitterService as ServiceClass from python_core.services.video_splitter.cli import CommandLineInterface print("✅ 子模块导入成功") # 测试便捷函数 service = create_service() print("✅ 便捷函数工作正常") return True except ImportError as e: print(f"❌ 导入失败: {e}") return False except Exception as e: print(f"❌ 测试失败: {e}") return False def test_module_functionality(): """测试模块功能""" print("\n🎯 测试模块功能") print("=" * 50) try: from python_core.services.video_splitter import ( VideoSplitterService, DetectionConfig, DetectorType, analyze_video ) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频,跳过功能测试") return True test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") # 测试服务创建 try: service = VideoSplitterService() print("✅ 服务创建成功") except Exception as e: print(f"⚠️ 服务创建失败(可能是依赖问题): {e}") return True # 测试配置创建 config = DetectionConfig( threshold=30.0, detector_type=DetectorType.CONTENT, min_scene_length=1.0 ) print("✅ 配置创建成功") # 测试视频分析 result = service.analyze_video(test_video, config) if result.success: print(f"✅ 视频分析成功:") print(f" 总场景数: {result.total_scenes}") print(f" 总时长: {result.total_duration:.2f}秒") print(f" 分析时间: {result.analysis_time:.2f}秒") else: print(f"❌ 视频分析失败: {result.error}") return False # 测试便捷函数 quick_result = analyze_video(test_video, threshold=25.0) if quick_result.success: print(f"✅ 便捷函数分析成功: {quick_result.total_scenes} 个场景") else: print(f"❌ 便捷函数分析失败: {quick_result.error}") return False return True except Exception as e: print(f"❌ 功能测试失败: {e}") import traceback traceback.print_exc() return False def test_command_line_module(): """测试命令行模块""" print("\n🖥️ 测试命令行模块") print("=" * 50) try: import subprocess # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("⚠️ 没有找到测试视频,跳过命令行测试") return True test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") # 测试模块命令行调用 cmd = [ sys.executable, "-m", "python_core.services.video_splitter", "analyze", test_video, "--threshold", "30.0" ] env = {"PYTHONPATH": str(project_root)} print(f"🔧 执行命令: {' '.join(cmd)}") result = subprocess.run( cmd, capture_output=True, text=True, timeout=60, env=env, cwd=str(project_root) ) if result.returncode == 0: print("✅ 命令行模块执行成功") # 尝试解析输出 try: import json if result.stdout.startswith("JSONRPC:"): json_str = result.stdout[8:] data = json.loads(json_str) if "result" in data and data["result"].get("success"): print(f" 检测到场景数: {data['result'].get('total_scenes', 0)}") else: data = json.loads(result.stdout) if data.get("success"): print(f" 检测到场景数: {data.get('total_scenes', 0)}") except json.JSONDecodeError: print(f" 输出: {result.stdout[:100]}...") else: print(f"❌ 命令行模块执行失败") print(f" 错误: {result.stderr}") return False return True except Exception as e: print(f"❌ 命令行测试失败: {e}") return False def test_module_structure(): """测试模块结构""" print("\n📁 测试模块结构") print("=" * 50) try: # 检查文件结构 module_dir = project_root / "python_core" / "services" / "video_splitter" expected_files = [ "__init__.py", "__main__.py", "types.py", "detectors.py", "validators.py", "service.py", "cli.py" ] for file_name in expected_files: file_path = module_dir / file_name if file_path.exists(): print(f"✅ {file_name} 存在") else: print(f"❌ {file_name} 缺失") return False # 检查文件大小(应该都比较小) for file_name in expected_files: file_path = module_dir / file_name if file_path.exists(): lines = len(file_path.read_text().splitlines()) if lines <= 300: # 每个文件不超过300行 print(f"✅ {file_name}: {lines} 行 (合理大小)") else: print(f"⚠️ {file_name}: {lines} 行 (可能过大)") return True except Exception as e: print(f"❌ 结构测试失败: {e}") return False def main(): """主函数""" print("🚀 拆分后的视频拆分服务模块测试") try: # 运行所有测试 tests = [ test_module_imports, test_module_functionality, test_command_line_module, test_module_structure ] results = [] for test in tests: try: result = test() results.append(result) except Exception as e: print(f"❌ 测试 {test.__name__} 异常: {e}") results.append(False) # 总结 print("\n" + "=" * 60) print("📊 模块化测试总结") print("=" * 60) passed = sum(results) total = len(results) print(f"通过测试: {passed}/{total}") if passed == total: print("🎉 所有模块化测试通过!") print("\n✅ 模块化优势:") print(" 1. 单一职责 - 每个文件职责明确") print(" 2. 易于维护 - 文件大小合理") print(" 3. 清晰结构 - 模块组织良好") print(" 4. 独立测试 - 可单独测试各模块") print(" 5. 便捷导入 - 支持多种导入方式") print(" 6. 命令行支持 - 支持模块化调用") print("\n📁 模块结构:") print(" python_core/services/video_splitter/") print(" ├── __init__.py # 模块入口和便捷函数") print(" ├── __main__.py # 命令行入口") print(" ├── types.py # 类型定义和数据结构") print(" ├── detectors.py # 场景检测器实现") print(" ├── validators.py # 视频验证器实现") print(" ├── service.py # 核心服务实现") print(" └── cli.py # 命令行接口") print("\n🚀 使用方法:") print(" # 作为模块导入") print(" from python_core.services.video_splitter import VideoSplitterService") print(" # 命令行调用") print(" python -m python_core.services.video_splitter analyze video.mp4") return 0 else: print("⚠️ 部分模块化测试失败") return 1 except Exception as e: print(f"❌ 测试过程中出错: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)