#!/usr/bin/env python3 """ 测试重构后的媒体管理器 """ import sys from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) def test_imports(): """测试模块导入""" print("🔍 测试模块导入") print("=" * 50) try: # 测试数据类型导入 from python_core.services.media_manager.types import ( VideoSegment, OriginalVideo, VideoInfo, UploadResult, BatchUploadResult ) print("✅ 数据类型导入成功") # 测试组件导入 from python_core.services.media_manager.video_info import ( VideoInfoExtractor, create_video_info_extractor ) from python_core.services.media_manager.scene_detector import ( SceneDetector, create_scene_detector ) from python_core.services.media_manager.video_processor import ( VideoProcessor, create_video_processor ) from python_core.services.media_manager.storage import MediaStorage print("✅ 组件导入成功") # 测试主要类导入 from python_core.services.media_manager.manager import MediaManager, get_media_manager from python_core.services.media_manager.cli import MediaManagerCommander print("✅ 主要类导入成功") # 测试统一导入 from python_core.services.media_manager import ( VideoSegment, MediaManager, MediaManagerCommander ) print("✅ 统一导入成功") return True except ImportError as e: print(f"❌ 导入失败: {e}") return False except Exception as e: print(f"❌ 测试失败: {e}") return False def test_video_info_extractor(): """测试视频信息提取器""" print("\n📹 测试视频信息提取器") print("=" * 50) try: from python_core.services.media_manager.video_info import create_video_info_extractor # 创建提取器 extractor = create_video_info_extractor() print(f"✅ 视频信息提取器创建成功: {type(extractor).__name__}") # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if video_files: test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") try: video_info = extractor.extract_video_info(test_video) print(f"✅ 视频信息提取成功:") print(f" 时长: {video_info.duration:.2f}秒") print(f" 分辨率: {video_info.width}x{video_info.height}") print(f" 帧率: {video_info.fps:.2f}") print(f" 文件大小: {video_info.file_size} bytes") except Exception as e: print(f"⚠️ 视频信息提取失败: {e}") else: print("⚠️ 没有找到测试视频,跳过功能测试") return True except Exception as e: print(f"❌ 视频信息提取器测试失败: {e}") return False def test_scene_detector(): """测试场景检测器""" print("\n🎬 测试场景检测器") print("=" * 50) try: from python_core.services.media_manager.scene_detector import create_scene_detector # 创建检测器 detector = create_scene_detector() print(f"✅ 场景检测器创建成功: {type(detector).__name__}") # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if video_files: test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") try: scene_changes = detector.detect_scenes(test_video, threshold=30.0) print(f"✅ 场景检测成功:") print(f" 检测到 {len(scene_changes)-1} 个场景变化") print(f" 场景时间点: {[f'{t:.2f}s' for t in scene_changes[:5]]}") except Exception as e: print(f"⚠️ 场景检测失败: {e}") else: print("⚠️ 没有找到测试视频,跳过功能测试") return True except Exception as e: print(f"❌ 场景检测器测试失败: {e}") return False def test_media_storage(): """测试媒体存储""" print("\n💾 测试媒体存储") print("=" * 50) try: from python_core.services.media_manager.storage import MediaStorage from python_core.services.media_manager.types import VideoSegment, OriginalVideo # 创建存储管理器 storage = MediaStorage() print("✅ 媒体存储创建成功") # 测试加载数据 segments = storage.load_video_segments() videos = storage.load_original_videos() print(f"✅ 数据加载成功: {len(segments)} 个片段, {len(videos)} 个视频") # 测试搜索功能 if segments: # 测试标签搜索 test_segments = storage.get_segments_by_tags(segments, ["分镜"], match_all=False) print(f"✅ 标签搜索测试: 找到 {len(test_segments)} 个分镜片段") # 测试关键词搜索 keyword_segments = storage.search_segments(segments, "mp4") print(f"✅ 关键词搜索测试: 找到 {len(keyword_segments)} 个mp4片段") # 测试热门片段 popular_segments = storage.get_popular_segments(segments, limit=5) print(f"✅ 热门片段测试: 找到 {len(popular_segments)} 个热门片段") return True except Exception as e: print(f"❌ 媒体存储测试失败: {e}") return False def test_media_manager(): """测试媒体管理器""" print("\n🎛️ 测试媒体管理器") print("=" * 50) try: from python_core.services.media_manager.manager import get_media_manager # 获取全局实例 manager = get_media_manager() print("✅ 媒体管理器创建成功") # 测试查询方法 all_segments = manager.get_all_segments() all_videos = manager.get_all_original_videos() print(f"✅ 查询测试: {len(all_segments)} 个片段, {len(all_videos)} 个视频") # 测试搜索方法 if all_segments: search_results = manager.search_segments("mp4") print(f"✅ 搜索测试: 找到 {len(search_results)} 个结果") tag_results = manager.get_segments_by_tags(["分镜"]) print(f"✅ 标签搜索测试: 找到 {len(tag_results)} 个分镜片段") popular_results = manager.get_popular_segments(limit=3) print(f"✅ 热门片段测试: 找到 {len(popular_results)} 个热门片段") return True except Exception as e: print(f"❌ 媒体管理器测试失败: {e}") return False def test_commander(): """测试命令行接口""" print("\n⌨️ 测试命令行接口") print("=" * 50) try: from python_core.services.media_manager.cli import MediaManagerCommander # 创建Commander commander = MediaManagerCommander() print("✅ 媒体管理器Commander创建成功") # 检查注册的命令 commands = list(commander.commands.keys()) expected_commands = [ "upload", "batch_upload", "get_all_segments", "get_all_videos", "search_segments", "get_segments_by_tags", "add_tags", "delete_segment" ] for cmd in expected_commands: if cmd in commands: print(f"✅ 命令 '{cmd}' 已注册") else: print(f"❌ 命令 '{cmd}' 未注册") return False # 测试参数解析 test_args = ["get_all_segments"] try: command, parsed_args = commander.parse_arguments(test_args) print(f"✅ 参数解析成功: {command}") except Exception as e: print(f"❌ 参数解析失败: {e}") return False return True except Exception as e: print(f"❌ Commander测试失败: {e}") return False def test_file_structure(): """测试文件结构""" print("\n📁 测试文件结构") print("=" * 50) try: # 检查新的模块文件 module_files = [ "python_core/services/media_manager/__init__.py", "python_core/services/media_manager/types.py", "python_core/services/media_manager/video_info.py", "python_core/services/media_manager/scene_detector.py", "python_core/services/media_manager/video_processor.py", "python_core/services/media_manager/storage.py", "python_core/services/media_manager/manager.py", "python_core/services/media_manager/cli.py" ] for file_path in module_files: full_path = project_root / file_path if full_path.exists(): lines = len(full_path.read_text().splitlines()) print(f"✅ {file_path} 存在 ({lines} 行)") else: print(f"❌ {file_path} 不存在") return False # 检查原文件是否已删除 old_file = project_root / "python_core/services/media_manager.py" if not old_file.exists(): print("✅ 原始大文件已删除") else: print("⚠️ 原始大文件仍然存在") return True except Exception as e: print(f"❌ 文件结构测试失败: {e}") return False def main(): """主函数""" print("🚀 测试重构后的媒体管理器") try: # 运行所有测试 tests = [ test_imports, test_video_info_extractor, test_scene_detector, test_media_storage, test_media_manager, test_commander, test_file_structure ] results = [] for test in tests: try: result = test() results.append(result) except Exception as e: print(f"❌ 测试 {test.__name__} 异常: {e}") results.append(False) # 总结 print("\n" + "=" * 60) print("📊 媒体管理器重构测试总结") print("=" * 60) passed = sum(results) total = len(results) print(f"通过测试: {passed}/{total}") if passed == total: print("🎉 所有重构测试通过!") print("\n✅ 重构成果:") print(" 1. 模块化设计 - 8个专门的模块文件") print(" 2. 单一职责 - 每个模块功能明确") print(" 3. 代码简化 - 从1506行减少到~200行/文件") print(" 4. 易于维护 - 修改影响范围小") print(" 5. 可测试性 - 每个组件可独立测试") print("\n📁 新的模块结构:") print(" media_manager/") print(" ├── types.py - 数据类型定义") print(" ├── video_info.py - 视频信息提取") print(" ├── scene_detector.py - 场景检测") print(" ├── video_processor.py - 视频处理") print(" ├── storage.py - 存储管理") print(" ├── manager.py - 主要管理器") print(" ├── cli.py - 命令行接口") print(" └── __init__.py - 统一导入") print("\n🎯 使用方式:") print(" # 简单使用") print(" from python_core.services.media_manager import MediaManager") print(" manager = MediaManager()") print(" # 命令行使用") print(" from python_core.services.media_manager import MediaManagerCommander") print(" commander = MediaManagerCommander()") return 0 else: print("⚠️ 部分重构测试失败") return 1 except Exception as e: print(f"❌ 测试过程中出错: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)