mxivideo/scripts/test_media_manager_refactor.py

362 lines
13 KiB
Python

#!/usr/bin/env python3
"""
测试重构后的媒体管理器
"""
import sys
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_imports():
"""测试模块导入"""
print("🔍 测试模块导入")
print("=" * 50)
try:
# 测试数据类型导入
from python_core.services.media_manager.types import (
VideoSegment, OriginalVideo, VideoInfo, UploadResult, BatchUploadResult
)
print("✅ 数据类型导入成功")
# 测试组件导入
from python_core.services.media_manager.video_info import (
VideoInfoExtractor, create_video_info_extractor
)
from python_core.services.media_manager.scene_detector import (
SceneDetector, create_scene_detector
)
from python_core.services.media_manager.video_processor import (
VideoProcessor, create_video_processor
)
from python_core.services.media_manager.storage import MediaStorage
print("✅ 组件导入成功")
# 测试主要类导入
from python_core.services.media_manager.manager import MediaManager, get_media_manager
from python_core.services.media_manager.cli import MediaManagerCommander
print("✅ 主要类导入成功")
# 测试统一导入
from python_core.services.media_manager import (
VideoSegment, MediaManager, MediaManagerCommander
)
print("✅ 统一导入成功")
return True
except ImportError as e:
print(f"❌ 导入失败: {e}")
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
def test_video_info_extractor():
"""测试视频信息提取器"""
print("\n📹 测试视频信息提取器")
print("=" * 50)
try:
from python_core.services.media_manager.video_info import create_video_info_extractor
# 创建提取器
extractor = create_video_info_extractor()
print(f"✅ 视频信息提取器创建成功: {type(extractor).__name__}")
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if video_files:
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
try:
video_info = extractor.extract_video_info(test_video)
print(f"✅ 视频信息提取成功:")
print(f" 时长: {video_info.duration:.2f}")
print(f" 分辨率: {video_info.width}x{video_info.height}")
print(f" 帧率: {video_info.fps:.2f}")
print(f" 文件大小: {video_info.file_size} bytes")
except Exception as e:
print(f"⚠️ 视频信息提取失败: {e}")
else:
print("⚠️ 没有找到测试视频,跳过功能测试")
return True
except Exception as e:
print(f"❌ 视频信息提取器测试失败: {e}")
return False
def test_scene_detector():
"""测试场景检测器"""
print("\n🎬 测试场景检测器")
print("=" * 50)
try:
from python_core.services.media_manager.scene_detector import create_scene_detector
# 创建检测器
detector = create_scene_detector()
print(f"✅ 场景检测器创建成功: {type(detector).__name__}")
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if video_files:
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
try:
scene_changes = detector.detect_scenes(test_video, threshold=30.0)
print(f"✅ 场景检测成功:")
print(f" 检测到 {len(scene_changes)-1} 个场景变化")
print(f" 场景时间点: {[f'{t:.2f}s' for t in scene_changes[:5]]}")
except Exception as e:
print(f"⚠️ 场景检测失败: {e}")
else:
print("⚠️ 没有找到测试视频,跳过功能测试")
return True
except Exception as e:
print(f"❌ 场景检测器测试失败: {e}")
return False
def test_media_storage():
"""测试媒体存储"""
print("\n💾 测试媒体存储")
print("=" * 50)
try:
from python_core.services.media_manager.storage import MediaStorage
from python_core.services.media_manager.types import VideoSegment, OriginalVideo
# 创建存储管理器
storage = MediaStorage()
print("✅ 媒体存储创建成功")
# 测试加载数据
segments = storage.load_video_segments()
videos = storage.load_original_videos()
print(f"✅ 数据加载成功: {len(segments)} 个片段, {len(videos)} 个视频")
# 测试搜索功能
if segments:
# 测试标签搜索
test_segments = storage.get_segments_by_tags(segments, ["分镜"], match_all=False)
print(f"✅ 标签搜索测试: 找到 {len(test_segments)} 个分镜片段")
# 测试关键词搜索
keyword_segments = storage.search_segments(segments, "mp4")
print(f"✅ 关键词搜索测试: 找到 {len(keyword_segments)} 个mp4片段")
# 测试热门片段
popular_segments = storage.get_popular_segments(segments, limit=5)
print(f"✅ 热门片段测试: 找到 {len(popular_segments)} 个热门片段")
return True
except Exception as e:
print(f"❌ 媒体存储测试失败: {e}")
return False
def test_media_manager():
"""测试媒体管理器"""
print("\n🎛️ 测试媒体管理器")
print("=" * 50)
try:
from python_core.services.media_manager.manager import get_media_manager
# 获取全局实例
manager = get_media_manager()
print("✅ 媒体管理器创建成功")
# 测试查询方法
all_segments = manager.get_all_segments()
all_videos = manager.get_all_original_videos()
print(f"✅ 查询测试: {len(all_segments)} 个片段, {len(all_videos)} 个视频")
# 测试搜索方法
if all_segments:
search_results = manager.search_segments("mp4")
print(f"✅ 搜索测试: 找到 {len(search_results)} 个结果")
tag_results = manager.get_segments_by_tags(["分镜"])
print(f"✅ 标签搜索测试: 找到 {len(tag_results)} 个分镜片段")
popular_results = manager.get_popular_segments(limit=3)
print(f"✅ 热门片段测试: 找到 {len(popular_results)} 个热门片段")
return True
except Exception as e:
print(f"❌ 媒体管理器测试失败: {e}")
return False
def test_commander():
"""测试命令行接口"""
print("\n⌨️ 测试命令行接口")
print("=" * 50)
try:
from python_core.services.media_manager.cli import MediaManagerCommander
# 创建Commander
commander = MediaManagerCommander()
print("✅ 媒体管理器Commander创建成功")
# 检查注册的命令
commands = list(commander.commands.keys())
expected_commands = [
"upload", "batch_upload", "get_all_segments", "get_all_videos",
"search_segments", "get_segments_by_tags", "add_tags", "delete_segment"
]
for cmd in expected_commands:
if cmd in commands:
print(f"✅ 命令 '{cmd}' 已注册")
else:
print(f"❌ 命令 '{cmd}' 未注册")
return False
# 测试参数解析
test_args = ["get_all_segments"]
try:
command, parsed_args = commander.parse_arguments(test_args)
print(f"✅ 参数解析成功: {command}")
except Exception as e:
print(f"❌ 参数解析失败: {e}")
return False
return True
except Exception as e:
print(f"❌ Commander测试失败: {e}")
return False
def test_file_structure():
"""测试文件结构"""
print("\n📁 测试文件结构")
print("=" * 50)
try:
# 检查新的模块文件
module_files = [
"python_core/services/media_manager/__init__.py",
"python_core/services/media_manager/types.py",
"python_core/services/media_manager/video_info.py",
"python_core/services/media_manager/scene_detector.py",
"python_core/services/media_manager/video_processor.py",
"python_core/services/media_manager/storage.py",
"python_core/services/media_manager/manager.py",
"python_core/services/media_manager/cli.py"
]
for file_path in module_files:
full_path = project_root / file_path
if full_path.exists():
lines = len(full_path.read_text().splitlines())
print(f"{file_path} 存在 ({lines} 行)")
else:
print(f"{file_path} 不存在")
return False
# 检查原文件是否已删除
old_file = project_root / "python_core/services/media_manager.py"
if not old_file.exists():
print("✅ 原始大文件已删除")
else:
print("⚠️ 原始大文件仍然存在")
return True
except Exception as e:
print(f"❌ 文件结构测试失败: {e}")
return False
def main():
"""主函数"""
print("🚀 测试重构后的媒体管理器")
try:
# 运行所有测试
tests = [
test_imports,
test_video_info_extractor,
test_scene_detector,
test_media_storage,
test_media_manager,
test_commander,
test_file_structure
]
results = []
for test in tests:
try:
result = test()
results.append(result)
except Exception as e:
print(f"❌ 测试 {test.__name__} 异常: {e}")
results.append(False)
# 总结
print("\n" + "=" * 60)
print("📊 媒体管理器重构测试总结")
print("=" * 60)
passed = sum(results)
total = len(results)
print(f"通过测试: {passed}/{total}")
if passed == total:
print("🎉 所有重构测试通过!")
print("\n✅ 重构成果:")
print(" 1. 模块化设计 - 8个专门的模块文件")
print(" 2. 单一职责 - 每个模块功能明确")
print(" 3. 代码简化 - 从1506行减少到~200行/文件")
print(" 4. 易于维护 - 修改影响范围小")
print(" 5. 可测试性 - 每个组件可独立测试")
print("\n📁 新的模块结构:")
print(" media_manager/")
print(" ├── types.py - 数据类型定义")
print(" ├── video_info.py - 视频信息提取")
print(" ├── scene_detector.py - 场景检测")
print(" ├── video_processor.py - 视频处理")
print(" ├── storage.py - 存储管理")
print(" ├── manager.py - 主要管理器")
print(" ├── cli.py - 命令行接口")
print(" └── __init__.py - 统一导入")
print("\n🎯 使用方式:")
print(" # 简单使用")
print(" from python_core.services.media_manager import MediaManager")
print(" manager = MediaManager()")
print(" # 命令行使用")
print(" from python_core.services.media_manager import MediaManagerCommander")
print(" commander = MediaManagerCommander()")
return 0
else:
print("⚠️ 部分重构测试失败")
return 1
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)