mxivideo/scripts/test_video_splitter_modular.py

294 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
测试拆分后的视频拆分服务模块
"""
import sys
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_module_imports():
"""测试模块导入"""
print("🔍 测试模块导入")
print("=" * 50)
try:
# 测试主模块导入
from python_core.services.video_splitter import (
VideoSplitterService, DetectionConfig, DetectorType,
SceneInfo, AnalysisResult, create_service, analyze_video
)
print("✅ 主模块导入成功")
# 测试子模块导入
from python_core.services.video_splitter.types import ValidationError
from python_core.services.video_splitter.detectors import PySceneDetectDetector
from python_core.services.video_splitter.validators import BasicVideoValidator
from python_core.services.video_splitter.service import VideoSplitterService as ServiceClass
from python_core.services.video_splitter.cli import CommandLineInterface
print("✅ 子模块导入成功")
# 测试便捷函数
service = create_service()
print("✅ 便捷函数工作正常")
return True
except ImportError as e:
print(f"❌ 导入失败: {e}")
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
def test_module_functionality():
"""测试模块功能"""
print("\n🎯 测试模块功能")
print("=" * 50)
try:
from python_core.services.video_splitter import (
VideoSplitterService, DetectionConfig, DetectorType, analyze_video
)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("⚠️ 没有找到测试视频,跳过功能测试")
return True
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
# 测试服务创建
try:
service = VideoSplitterService()
print("✅ 服务创建成功")
except Exception as e:
print(f"⚠️ 服务创建失败(可能是依赖问题): {e}")
return True
# 测试配置创建
config = DetectionConfig(
threshold=30.0,
detector_type=DetectorType.CONTENT,
min_scene_length=1.0
)
print("✅ 配置创建成功")
# 测试视频分析
result = service.analyze_video(test_video, config)
if result.success:
print(f"✅ 视频分析成功:")
print(f" 总场景数: {result.total_scenes}")
print(f" 总时长: {result.total_duration:.2f}")
print(f" 分析时间: {result.analysis_time:.2f}")
else:
print(f"❌ 视频分析失败: {result.error}")
return False
# 测试便捷函数
quick_result = analyze_video(test_video, threshold=25.0)
if quick_result.success:
print(f"✅ 便捷函数分析成功: {quick_result.total_scenes} 个场景")
else:
print(f"❌ 便捷函数分析失败: {quick_result.error}")
return False
return True
except Exception as e:
print(f"❌ 功能测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_command_line_module():
"""测试命令行模块"""
print("\n🖥️ 测试命令行模块")
print("=" * 50)
try:
import subprocess
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("⚠️ 没有找到测试视频,跳过命令行测试")
return True
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
# 测试模块命令行调用
cmd = [
sys.executable, "-m", "python_core.services.video_splitter",
"analyze", test_video, "--threshold", "30.0"
]
env = {"PYTHONPATH": str(project_root)}
print(f"🔧 执行命令: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60,
env=env,
cwd=str(project_root)
)
if result.returncode == 0:
print("✅ 命令行模块执行成功")
# 尝试解析输出
try:
import json
if result.stdout.startswith("JSONRPC:"):
json_str = result.stdout[8:]
data = json.loads(json_str)
if "result" in data and data["result"].get("success"):
print(f" 检测到场景数: {data['result'].get('total_scenes', 0)}")
else:
data = json.loads(result.stdout)
if data.get("success"):
print(f" 检测到场景数: {data.get('total_scenes', 0)}")
except json.JSONDecodeError:
print(f" 输出: {result.stdout[:100]}...")
else:
print(f"❌ 命令行模块执行失败")
print(f" 错误: {result.stderr}")
return False
return True
except Exception as e:
print(f"❌ 命令行测试失败: {e}")
return False
def test_module_structure():
"""测试模块结构"""
print("\n📁 测试模块结构")
print("=" * 50)
try:
# 检查文件结构
module_dir = project_root / "python_core" / "services" / "video_splitter"
expected_files = [
"__init__.py",
"__main__.py",
"types.py",
"detectors.py",
"validators.py",
"service.py",
"cli.py"
]
for file_name in expected_files:
file_path = module_dir / file_name
if file_path.exists():
print(f"{file_name} 存在")
else:
print(f"{file_name} 缺失")
return False
# 检查文件大小(应该都比较小)
for file_name in expected_files:
file_path = module_dir / file_name
if file_path.exists():
lines = len(file_path.read_text().splitlines())
if lines <= 300: # 每个文件不超过300行
print(f"{file_name}: {lines} 行 (合理大小)")
else:
print(f"⚠️ {file_name}: {lines} 行 (可能过大)")
return True
except Exception as e:
print(f"❌ 结构测试失败: {e}")
return False
def main():
"""主函数"""
print("🚀 拆分后的视频拆分服务模块测试")
try:
# 运行所有测试
tests = [
test_module_imports,
test_module_functionality,
test_command_line_module,
test_module_structure
]
results = []
for test in tests:
try:
result = test()
results.append(result)
except Exception as e:
print(f"❌ 测试 {test.__name__} 异常: {e}")
results.append(False)
# 总结
print("\n" + "=" * 60)
print("📊 模块化测试总结")
print("=" * 60)
passed = sum(results)
total = len(results)
print(f"通过测试: {passed}/{total}")
if passed == total:
print("🎉 所有模块化测试通过!")
print("\n✅ 模块化优势:")
print(" 1. 单一职责 - 每个文件职责明确")
print(" 2. 易于维护 - 文件大小合理")
print(" 3. 清晰结构 - 模块组织良好")
print(" 4. 独立测试 - 可单独测试各模块")
print(" 5. 便捷导入 - 支持多种导入方式")
print(" 6. 命令行支持 - 支持模块化调用")
print("\n📁 模块结构:")
print(" python_core/services/video_splitter/")
print(" ├── __init__.py # 模块入口和便捷函数")
print(" ├── __main__.py # 命令行入口")
print(" ├── types.py # 类型定义和数据结构")
print(" ├── detectors.py # 场景检测器实现")
print(" ├── validators.py # 视频验证器实现")
print(" ├── service.py # 核心服务实现")
print(" └── cli.py # 命令行接口")
print("\n🚀 使用方法:")
print(" # 作为模块导入")
print(" from python_core.services.video_splitter import VideoSplitterService")
print(" # 命令行调用")
print(" python -m python_core.services.video_splitter analyze video.mp4")
return 0
else:
print("⚠️ 部分模块化测试失败")
return 1
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)