#!/usr/bin/env python3 """ 测试模块化的Commander """ import sys from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) def test_modular_imports(): """测试模块化导入""" print("🔍 测试模块化导入") print("=" * 50) try: # 测试基础模块导入 from python_core.utils.commander.types import CommandConfig from python_core.utils.commander.parser import ArgumentParser from python_core.utils.commander.base import JSONRPCCommander from python_core.utils.commander.simple import SimpleJSONRPCCommander, create_simple_commander print("✅ 基础模块导入成功") # 测试统一导入 from python_core.utils.commander import ( CommandConfig, ArgumentParser, JSONRPCCommander, SimpleJSONRPCCommander, create_simple_commander ) print("✅ 统一导入成功") # 测试进度模块导入 from python_core.utils.progress import ( ProgressInfo, TaskResult, ProgressiveTask, ProgressJSONRPCCommander, create_progress_commander ) print("✅ 进度模块导入成功") return True except ImportError as e: print(f"❌ 导入失败: {e}") return False except Exception as e: print(f"❌ 测试失败: {e}") return False def test_command_config(): """测试命令配置""" print("\n📋 测试命令配置") print("=" * 50) try: from python_core.utils.commander.types import CommandConfig # 创建命令配置 config = CommandConfig( name="test_command", description="测试命令", required_args=["input"], optional_args={ "output": {"type": str, "default": "output.txt"}, "verbose": {"type": bool, "default": False} } ) print(f"✅ 命令配置创建成功: {config.name}") print(f" 描述: {config.description}") print(f" 必需参数: {config.required_args}") print(f" 可选参数: {list(config.optional_args.keys())}") return True except Exception as e: print(f"❌ 命令配置测试失败: {e}") return False def test_argument_parser(): """测试参数解析器""" print("\n🔧 测试参数解析器") print("=" * 50) try: from python_core.utils.commander.types import CommandConfig from python_core.utils.commander.parser import ArgumentParser # 创建命令配置 commands = { "process": CommandConfig( name="process", description="处理文件", required_args=["input_file"], optional_args={ "output": {"type": str, "default": "output.txt"}, "format": {"type": str, "default": "json", "choices": ["json", "xml"]}, "verbose": {"type": bool, "default": False} } ) } # 创建解析器 parser = ArgumentParser(commands) # 测试解析 test_cases = [ (["process", "input.txt"], True), (["process", "input.txt", "--output", "result.txt"], True), (["process", "input.txt", "--format", "xml", "--verbose"], True), (["unknown"], False), # 未知命令 (["process"], False), # 缺少必需参数 ] for args, should_succeed in test_cases: try: command, parsed_args = parser.parse_arguments(args) if should_succeed: print(f"✅ 解析成功: {args} -> {command}, {parsed_args}") else: print(f"⚠️ 预期失败但成功了: {args}") except ValueError as e: if not should_succeed: print(f"✅ 预期失败: {args} -> {e}") else: print(f"❌ 意外失败: {args} -> {e}") return True except Exception as e: print(f"❌ 参数解析器测试失败: {e}") return False def test_simple_commander(): """测试简单Commander""" print("\n🚀 测试简单Commander") print("=" * 50) try: from python_core.utils.commander import create_simple_commander # 创建Commander commander = create_simple_commander("test_service") # 定义处理函数 def hello_handler(name: str = "World"): return {"message": f"Hello, {name}!"} def add_handler(a: str, b: str): return {"result": float(a) + float(b)} # 添加命令 commander.add_command( name="hello", handler=hello_handler, description="打招呼", optional_args={ "name": {"type": str, "default": "World"} } ) commander.add_command( name="add", handler=add_handler, description="加法运算", required_args=["a", "b"] ) print("✅ 命令注册成功") # 测试命令执行 test_cases = [ (["hello"], {"message": "Hello, World!"}), (["hello", "--name", "Alice"], {"message": "Hello, Alice!"}), (["add", "5", "3"], {"result": 8.0}), ] for args, expected in test_cases: try: command, parsed_args = commander.parse_arguments(args) result = commander.execute_command(command, parsed_args) if result == expected: print(f"✅ 命令测试成功: {args} -> {result}") else: print(f"❌ 结果不匹配: {args} -> 期望 {expected}, 得到 {result}") return False except Exception as e: print(f"❌ 命令执行失败: {args} -> {e}") return False return True except Exception as e: print(f"❌ 简单Commander测试失败: {e}") return False def test_video_splitter_integration(): """测试视频拆分服务集成""" print("\n🎬 测试视频拆分服务集成") print("=" * 50) try: # 检查依赖 try: import scenedetect print(f"✅ PySceneDetect {scenedetect.__version__} 可用") except ImportError: print("⚠️ PySceneDetect不可用,跳过视频拆分集成测试") return True from python_core.services.video_splitter.cli import VideoSplitterCommander # 创建Commander commander = VideoSplitterCommander() print("✅ 视频拆分Commander创建成功") # 检查注册的命令 commands = list(commander.commands.keys()) expected_commands = ["analyze", "detect_scenes"] for cmd in expected_commands: if cmd in commands: print(f"✅ 命令 '{cmd}' 已注册") else: print(f"❌ 命令 '{cmd}' 未注册") return False # 测试参数解析 test_args = ["analyze", "test.mp4", "--threshold", "30.0"] try: command, parsed_args = commander.parse_arguments(test_args) print(f"✅ 参数解析成功: {command}, {parsed_args}") except Exception as e: print(f"❌ 参数解析失败: {e}") return False return True except Exception as e: print(f"❌ 视频拆分集成测试失败: {e}") return False def test_file_structure(): """测试文件结构""" print("\n📁 测试文件结构") print("=" * 50) try: # 检查Commander模块文件 commander_files = [ "python_core/utils/commander/__init__.py", "python_core/utils/commander/types.py", "python_core/utils/commander/parser.py", "python_core/utils/commander/base.py", "python_core/utils/commander/simple.py" ] for file_path in commander_files: full_path = project_root / file_path if full_path.exists(): print(f"✅ {file_path} 存在") else: print(f"❌ {file_path} 不存在") return False # 检查Progress模块文件 progress_files = [ "python_core/utils/progress/__init__.py", "python_core/utils/progress/types.py", "python_core/utils/progress/task.py", "python_core/utils/progress/reporter.py", "python_core/utils/progress/generator.py", "python_core/utils/progress/decorators.py", "python_core/utils/progress/commander.py" ] for file_path in progress_files: full_path = project_root / file_path if full_path.exists(): print(f"✅ {file_path} 存在") else: print(f"❌ {file_path} 不存在") return False # 检查原文件是否已删除 old_files = [ "python_core/utils/jsonrpc_commander.py", "python_core/utils/progress_commander.py" ] for file_path in old_files: full_path = project_root / file_path if not full_path.exists(): print(f"✅ {file_path} 已删除") else: print(f"⚠️ {file_path} 仍然存在") return True except Exception as e: print(f"❌ 文件结构测试失败: {e}") return False def main(): """主函数""" print("🚀 测试模块化Commander") try: # 运行所有测试 tests = [ test_modular_imports, test_command_config, test_argument_parser, test_simple_commander, test_video_splitter_integration, test_file_structure ] results = [] for test in tests: try: result = test() results.append(result) except Exception as e: print(f"❌ 测试 {test.__name__} 异常: {e}") results.append(False) # 总结 print("\n" + "=" * 60) print("📊 模块化Commander测试总结") print("=" * 60) passed = sum(results) total = len(results) print(f"通过测试: {passed}/{total}") if passed == total: print("🎉 所有模块化测试通过!") print("\n✅ 模块化优势:") print(" 1. 代码组织 - 每个模块职责单一明确") print(" 2. 易于维护 - 修改某个功能只需改对应模块") print(" 3. 可重用性 - 模块可以独立使用") print(" 4. 测试友好 - 可以单独测试每个模块") print(" 5. 扩展性强 - 新功能可以独立添加") print("\n📁 模块结构:") print(" commander/") print(" ├── types.py - 数据类型定义") print(" ├── parser.py - 参数解析器") print(" ├── base.py - 基础Commander类") print(" ├── simple.py - 简化Commander类") print(" └── __init__.py - 统一导入接口") print(" ") print(" progress/") print(" ├── types.py - 进度相关类型") print(" ├── task.py - 任务管理") print(" ├── reporter.py - 进度报告") print(" ├── generator.py - 进度生成器") print(" ├── decorators.py - 装饰器") print(" ├── commander.py - 进度Commander") print(" └── __init__.py - 统一导入接口") print("\n🎯 使用方式:") print(" # 基础Commander") print(" from python_core.utils.commander import JSONRPCCommander") print(" # 简单Commander") print(" from python_core.utils.commander import create_simple_commander") print(" # 进度Commander") print(" from python_core.utils.progress import ProgressJSONRPCCommander") return 0 else: print("⚠️ 部分模块化测试失败") return 1 except Exception as e: print(f"❌ 测试过程中出错: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)