mxivideo/tests/test_media_manager.py

404 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
MediaManager 测试文件
测试视频切分、场景检测、信息提取等功能
"""
import os
import sys
import unittest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, patch
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# 导入要测试的模块
from python_core.services.media_manager import (
MediaManager,
DependencyManager,
VideoProcessorFactory,
FFProbeVideoInfoExtractor,
OpenCVVideoInfoExtractor,
PySceneDetectSceneDetector,
OpenCVSceneDetector,
OpenCVThumbnailGenerator,
MD5FileHashCalculator,
OpenCVVideoSegmentCreator
)
class TestDependencyManager(unittest.TestCase):
    """Tests for DependencyManager availability checks and metadata lookups."""

    def setUp(self):
        self.dependency_manager = DependencyManager()

    def test_dependency_initialization(self):
        """The internal registry is a dict with 'opencv' and 'scenedetect' keys."""
        self.assertIsInstance(self.dependency_manager._dependencies, dict)
        for name in ('opencv', 'scenedetect'):
            self.assertIn(name, self.dependency_manager._dependencies)

    def test_opencv_availability(self):
        """When 'opencv' is reported available, cv2 and numpy can be fetched."""
        opencv_available = self.dependency_manager.is_available('opencv')
        print(f"OpenCV available: {opencv_available}")
        if not opencv_available:
            # Nothing further to verify without the dependency.
            return
        for module_name in ('cv2', 'numpy'):
            module = self.dependency_manager.get_module('opencv', module_name)
            self.assertIsNotNone(module)

    def test_scenedetect_availability(self):
        """When 'scenedetect' is reported available, VideoManager can be fetched."""
        scenedetect_available = self.dependency_manager.is_available('scenedetect')
        print(f"PySceneDetect available: {scenedetect_available}")
        if not scenedetect_available:
            # Nothing further to verify without the dependency.
            return
        video_manager_cls = self.dependency_manager.get_module(
            'scenedetect', 'VideoManager'
        )
        self.assertIsNotNone(video_manager_cls)

    def test_dependency_info(self):
        """Per-dependency info and the full dependency listing are dicts."""
        manager = self.dependency_manager

        opencv_info = manager.get_dependency_info('opencv')
        self.assertIsInstance(opencv_info, dict)
        self.assertIn('available', opencv_info)

        all_deps = manager.get_all_dependencies()
        self.assertIsInstance(all_deps, dict)
        for name in ('opencv', 'scenedetect'):
            self.assertIn(name, all_deps)
class TestVideoProcessorFactory(unittest.TestCase):
    """Tests that VideoProcessorFactory can build each processor it exposes."""

    def setUp(self):
        self.dependency_manager = DependencyManager()
        self.factory = VideoProcessorFactory(self.dependency_manager)

    def test_create_video_info_extractor(self):
        """The factory returns a non-None video info extractor."""
        # Only the factory call is guarded, so an assertion failure below is
        # reported with its own message rather than as a creation error.
        try:
            extractor = self.factory.create_video_info_extractor()
        except Exception as e:
            self.fail(f"Failed to create video info extractor: {e}")
        self.assertIsNotNone(extractor)
        print(f"Created video info extractor: {type(extractor).__name__}")

    def test_create_scene_detector(self):
        """The factory returns a non-None scene detector."""
        try:
            detector = self.factory.create_scene_detector()
        except Exception as e:
            self.fail(f"Failed to create scene detector: {e}")
        self.assertIsNotNone(detector)
        print(f"Created scene detector: {type(detector).__name__}")

    def test_create_thumbnail_generator(self):
        """The factory returns a non-None thumbnail generator when OpenCV exists."""
        if not self.dependency_manager.is_available('opencv'):
            # Report a real skip instead of a silent pass, matching the other
            # OpenCV-dependent tests in this file.
            self.skipTest("OpenCV not available, skipping thumbnail generator test")
        try:
            generator = self.factory.create_thumbnail_generator()
        except Exception as e:
            self.fail(f"Failed to create thumbnail generator: {e}")
        self.assertIsNotNone(generator)
        print(f"Created thumbnail generator: {type(generator).__name__}")

    def test_create_hash_calculator(self):
        """The factory's hash calculator is an MD5FileHashCalculator."""
        calculator = self.factory.create_hash_calculator()
        self.assertIsNotNone(calculator)
        self.assertIsInstance(calculator, MD5FileHashCalculator)
class TestVideoInfoExtractors(unittest.TestCase):
    """Tests for the FFProbe- and OpenCV-based video info extractors."""

    # Keys every extractor result is expected to contain.
    _REQUIRED_KEYS = ('duration', 'width', 'height', 'fps', 'file_size')

    def setUp(self):
        # Use the sample video shipped in the assets folder.
        self.test_video = project_root / "assets" / "1" / "1752031789460.mp4"
        if not self.test_video.exists():
            self.skipTest(f"Test video not found: {self.test_video}")

    def _check_video_info(self, info, label):
        """Assert *info* is a dict with all required keys and positive values.

        Args:
            info: The object returned by an extractor's ``extract_video_info``.
            label: Extractor name used as the prefix of the printed summary.
        """
        # Validate the structure of the returned info.
        self.assertIsInstance(info, dict)
        for key in self._REQUIRED_KEYS:
            self.assertIn(key, info)
        print(f"{label} video info: {info}")
        # Validate that the values are plausible.
        for key in self._REQUIRED_KEYS:
            self.assertGreater(info[key], 0)

    def test_ffprobe_extractor(self):
        """FFProbe extraction yields a well-formed info dict, or the test skips."""
        # Only the extraction itself is tolerated to fail (e.g. ffprobe is not
        # installed). The assertions run outside the try block so that a
        # malformed result is reported as a failure instead of being swallowed.
        try:
            extractor = FFProbeVideoInfoExtractor()
            info = extractor.extract_video_info(str(self.test_video))
        except Exception as e:
            self.skipTest(
                f"FFProbe extractor failed (expected if ffprobe not available): {e}"
            )
        self._check_video_info(info, "FFProbe")

    def test_opencv_extractor(self):
        """OpenCV extraction yields a well-formed info dict."""
        dependency_manager = DependencyManager()
        if not dependency_manager.is_available('opencv'):
            self.skipTest("OpenCV not available")
        try:
            extractor = OpenCVVideoInfoExtractor(dependency_manager)
            info = extractor.extract_video_info(str(self.test_video))
        except Exception as e:
            self.fail(f"OpenCV extractor failed: {e}")
        self._check_video_info(info, "OpenCV")
class TestSceneDetectors(unittest.TestCase):
    """Tests for the PySceneDetect- and OpenCV-based scene detectors."""

    def setUp(self):
        # Use the sample video shipped in the assets folder.
        self.test_video = project_root / "assets" / "1" / "1752031789460.mp4"
        if not self.test_video.exists():
            self.skipTest(f"Test video not found: {self.test_video}")
        self.dependency_manager = DependencyManager()

    def _check_scene_changes(self, scene_changes):
        """Assert *scene_changes* is a non-empty, strictly increasing list from 0.0."""
        self.assertIsInstance(scene_changes, list)
        self.assertGreater(len(scene_changes), 0)
        # The first change point should be 0.0.
        self.assertEqual(scene_changes[0], 0.0)
        # Change points should be strictly increasing.
        for earlier, later in zip(scene_changes, scene_changes[1:]):
            self.assertGreater(later, earlier)

    def test_pyscenedetect_detector(self):
        """PySceneDetect returns valid scene change points, or the test skips."""
        if not self.dependency_manager.is_available('scenedetect'):
            self.skipTest("PySceneDetect not available")
        # A failure inside the detector itself is tolerated (reported as a
        # skip). The assertions stay outside the try block so that an invalid
        # result is not silently swallowed.
        try:
            detector = PySceneDetectSceneDetector(self.dependency_manager)
            scene_changes = detector.detect_scenes(str(self.test_video), threshold=30.0)
        except Exception as e:
            self.skipTest(f"PySceneDetect detector failed: {e}")
        self._check_scene_changes(scene_changes)
        print(f"PySceneDetect found {len(scene_changes)} scene changes: {scene_changes}")

    def test_opencv_detector(self):
        """OpenCV returns valid scene change points."""
        if not self.dependency_manager.is_available('opencv'):
            self.skipTest("OpenCV not available")
        try:
            detector = OpenCVSceneDetector(self.dependency_manager)
            scene_changes = detector.detect_scenes(str(self.test_video), threshold=30.0)
        except Exception as e:
            self.fail(f"OpenCV detector failed: {e}")
        self._check_scene_changes(scene_changes)
        print(f"OpenCV found {len(scene_changes)} scene changes: {scene_changes}")
class TestMediaManager(unittest.TestCase):
    """Tests for MediaManager: construction, info extraction, scene detection, splitting."""

    def setUp(self):
        # Check for the sample video before creating any temporary state, so
        # a skipped test leaves nothing behind.
        self.test_video = project_root / "assets" / "1" / "1752031789460.mp4"
        if not self.test_video.exists():
            self.skipTest(f"Test video not found: {self.test_video}")

        # Create a temporary directory for the test. Cleanup is registered
        # with addCleanup because tearDown is not run when setUp raises,
        # whereas registered cleanups are.
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)

        # Build the MediaManager while settings.temp_dir points at the
        # temporary directory.
        with patch('python_core.config.settings') as mock_settings:
            mock_settings.temp_dir = Path(self.temp_dir)
            self.media_manager = MediaManager()

    def test_media_manager_initialization(self):
        """The manager and its dependency_manager/factory attributes are set."""
        self.assertIsNotNone(self.media_manager)
        self.assertIsNotNone(self.media_manager.dependency_manager)
        self.assertIsNotNone(self.media_manager.factory)

    def test_video_info_extraction(self):
        """_get_video_info returns a dict with duration, width, height and fps."""
        try:
            video_info = self.media_manager._get_video_info(str(self.test_video))
        except Exception as e:
            self.fail(f"Video info extraction failed: {e}")
        self.assertIsInstance(video_info, dict)
        for key in ('duration', 'width', 'height', 'fps'):
            self.assertIn(key, video_info)
        print(f"Video info extracted: {video_info}")

    def test_scene_detection(self):
        """_detect_scene_changes returns a non-empty list."""
        try:
            scene_changes = self.media_manager._detect_scene_changes(str(self.test_video))
        except Exception as e:
            self.fail(f"Scene detection failed: {e}")
        self.assertIsInstance(scene_changes, list)
        self.assertGreater(len(scene_changes), 0)
        print(f"Scene changes detected: {scene_changes}")

    def test_video_splitting(self):
        """_split_video_by_scenes returns a list; existing segment files are non-empty."""
        video_path = str(self.test_video)
        # Failures inside the media pipeline are tolerated and reported as a
        # skip. The assertions are kept outside the try block so they can
        # actually fail the test.
        try:
            # Detect scene changes first.
            scene_changes = self.media_manager._detect_scene_changes(video_path)
            # With too few change points, fall back to splitting at the midpoint.
            if len(scene_changes) < 3:
                video_info = self.media_manager._get_video_info(video_path)
                duration = video_info.get('duration', 10.0)
                scene_changes = [0.0, duration / 2, duration]
            print(f"Using scene changes for splitting: {scene_changes}")
            # Perform the split.
            segments = self.media_manager._split_video_by_scenes(
                video_path,
                scene_changes,
                "test_original_id",
                ["测试", "分镜"],
            )
        except Exception as e:
            self.skipTest(
                f"Video splitting failed (may be expected if dependencies missing): {e}"
            )

        print(f"Created {len(segments)} video segments")
        # Validate the split result.
        self.assertIsInstance(segments, list)
        for i, segment in enumerate(segments):
            print(f"Segment {i}: {segment.filename}, duration: {segment.duration}s")
            # Only check the size of segment files that were actually written.
            file_path = getattr(segment, 'file_path', None)
            if not file_path:
                continue
            segment_path = Path(file_path)
            if segment_path.exists():
                print(f" File exists: {segment_path}")
                self.assertGreater(segment_path.stat().st_size, 0)
def run_comprehensive_test():
    """Run every test class in this module and print a summary.

    Returns:
        bool: True when the suite ran and was successful; False when it was
        not successful or when no ``.mp4`` file exists under the assets folder.
    """
    banner = "=" * 60
    print(banner)
    print("MediaManager 综合测试")
    print(banner)

    # Look for sample videos.
    test_videos = list((project_root / "assets").rglob("*.mp4"))
    print(f"Found {len(test_videos)} test videos:")
    for video in test_videos[:5]:  # show only the first five
        print(f" - {video}")
    if not test_videos:
        print("❌ No test videos found in assets folder")
        # Return an explicit bool so every path has the same return type.
        return False

    print("\n" + banner)
    print("开始测试...")
    print(banner)

    # Assemble the suite from all test classes.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for test_case in (
        TestDependencyManager,
        TestVideoProcessorFactory,
        TestVideoInfoExtractors,
        TestSceneDetectors,
        TestMediaManager,
    ):
        suite.addTests(loader.loadTestsFromTestCase(test_case))

    # Run the suite.
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)

    print("\n" + banner)
    print("测试总结")
    print(banner)
    print(f"运行测试: {result.testsRun}")
    print(f"失败: {len(result.failures)}")
    print(f"错误: {len(result.errors)}")
    print(f"跳过: {len(result.skipped)}")

    # Each entry is a (test, formatted traceback) pair.
    for heading, entries in (
        ("\n失败的测试:", result.failures),
        ("\n错误的测试:", result.errors),
    ):
        if entries:
            print(heading)
            for test, details in entries:
                print(f" - {test}: {details}")

    return result.wasSuccessful()
if __name__ == "__main__":
    # Exit with status 0 when the run succeeded and 1 otherwise.
    exit_code = 0 if run_comprehensive_test() else 1
    sys.exit(exit_code)