#!/usr/bin/env python3
"""
JSON-RPC Commander with progress reporting.
"""

import time
from abc import abstractmethod
from typing import Any, Callable, Dict, Optional
from contextlib import contextmanager

from .types import ProgressInfo, TaskResult
from .task import ProgressiveTask
from .reporter import ProgressReporter
from ..commander import JSONRPCCommander
from ..logger import logger


class ProgressJSONRPCCommander(JSONRPCCommander):
    """Base class for JSON-RPC commanders that report task progress.

    Subclasses implement ``_execute_with_progress`` and may override
    ``_is_progressive_command`` / ``_execute_simple_command`` to route
    commands that do not need progress reporting.
    """

    def __init__(self, service_name: str):
        super().__init__(service_name)
        self.progress_reporter = ProgressReporter(service_name)

    @contextmanager
    def create_task(self, task_name: str, total_steps: int = 100):
        """Create a progress-tracked task as a context manager.

        The task is started on entry and finished on normal exit. If the
        managed body raises, a failure message is reported through the task
        and the exception is re-raised unchanged.

        Args:
            task_name: Name passed to ``ProgressiveTask``.
            total_steps: Total step count passed to ``ProgressiveTask``.

        Yields:
            The started ``ProgressiveTask``.
        """
        task = ProgressiveTask(task_name, total_steps)
        task.set_progress_callback(self.progress_reporter.report_progress)

        try:
            task.start()
            yield task
            task.finish()
        except Exception as e:
            # Report the failure, then let the caller see the original error.
            task._report_progress(f"任务失败: {str(e)}")
            raise

    def execute_progressive_command(self, command: str, args: Dict[str, Any]) -> TaskResult:
        """Run a command through ``_execute_with_progress`` and time it.

        Exceptions raised by the command are not propagated; they are logged
        and captured in the returned ``TaskResult``.

        Args:
            command: Command name.
            args: Command arguments.

        Returns:
            A ``TaskResult`` with ``success=True`` and the result, or
            ``success=False`` and the stringified error. ``total_time`` is the
            elapsed wall-clock time in seconds in both cases.
        """
        start_time = time.time()

        try:
            # Delegate to the subclass implementation.
            result = self._execute_with_progress(command, args)
        except Exception as e:
            total_time = time.time() - start_time
            logger.error(f"Progressive command failed: {e}")
            return TaskResult(
                success=False,
                error=str(e),
                total_time=total_time
            )

        total_time = time.time() - start_time
        return TaskResult(
            success=True,
            result=result,
            total_time=total_time
        )

    @abstractmethod
    def _execute_with_progress(self, command: str, args: Dict[str, Any]) -> Any:
        """Execute a command with progress reporting; subclasses must implement.

        NOTE(review): ``@abstractmethod`` is only enforced at instantiation
        time if ``JSONRPCCommander`` uses ``ABCMeta`` -- confirm.

        Args:
            command: Command name.
            args: Command arguments.

        Returns:
            The command's result.
        """
        pass

    def execute_command(self, command: str, args: Dict[str, Any]) -> Any:
        """Execute a command, routing it through progress reporting if needed.

        Overrides the base-class method. Arguments are type-converted first.

        Args:
            command: Command name.
            args: Command arguments.

        Returns:
            The command's result.

        Raises:
            Exception: If a progressive command failed; the message is the
                error string recorded in the ``TaskResult``.
        """
        # Convert argument types before dispatching.
        converted_args = self._convert_args_types(command, args)

        # Plain command: hand off to the simple execution path.
        if not self._is_progressive_command(command):
            return self._execute_simple_command(command, converted_args)

        task_result = self.execute_progressive_command(command, converted_args)
        if task_result.success:
            return task_result.result
        raise Exception(task_result.error)

    def _convert_args_types(self, command: str, args: Dict[str, Any]) -> Dict[str, Any]:
        """Convert string-valued optional arguments to their declared types.

        Only arguments listed in the command's ``optional_args`` are touched,
        and only when the supplied value is a string. A failed conversion is
        logged as a warning and the original value is kept.

        Args:
            command: Command name.
            args: Raw arguments.

        Returns:
            A shallow copy of ``args`` with converted values, or ``args``
            itself when the command is not registered.
        """
        if command not in self.commands:
            return args

        command_config = self.commands[command]
        converted_args = args.copy()

        # Tolerate a command registered without optional args (None).
        optional_args = command_config.optional_args or {}

        for arg_name, arg_config in optional_args.items():
            if arg_name not in converted_args:
                continue

            arg_type = arg_config.get('type', str)
            value = converted_args[arg_name]
            try:
                if arg_type is bool:
                    # bool("false") is True, so parse known truthy spellings.
                    if isinstance(value, str):
                        converted_args[arg_name] = value.lower() in ('true', '1', 'yes', 'on')
                elif arg_type is not str and isinstance(value, str):
                    # Other types: construct from the string.
                    converted_args[arg_name] = arg_type(value)
            except (ValueError, TypeError) as e:
                logger.warning(f"Failed to convert argument {arg_name}: {e}")

        return converted_args

    def _is_progressive_command(self, command: str) -> bool:
        """Return whether the command needs progress reporting.

        Subclasses may override this to select which commands are progressive.

        Args:
            command: Command name.

        Returns:
            ``True`` by default: every command is treated as progressive.
        """
        return True

    def _execute_simple_command(self, command: str, args: Dict[str, Any]) -> Any:
        """Execute a command that does not need progress reporting.

        Subclasses may override this; the default delegates to
        ``_execute_with_progress``.

        Args:
            command: Command name.
            args: Command arguments.

        Returns:
            The command's result.
        """
        return self._execute_with_progress(command, args)


# Convenience factory
def create_progress_commander(service_name: str):
    """Create a ready-to-use progress-aware JSON-RPC commander.

    The returned object lets callers register handlers with ``add_command``
    instead of subclassing ``ProgressJSONRPCCommander``.

    Args:
        service_name: Service name passed to the commander base class.

    Returns:
        A new ``SimpleProgressCommander`` instance.
    """

    class SimpleProgressCommander(ProgressJSONRPCCommander):
        """Commander that dispatches commands to registered callables."""

        def __init__(self):
            super().__init__(service_name)
            self.command_handlers: Dict[str, Callable] = {}
            self._progressive_commands = set()

        def _register_commands(self):
            # Commands are registered dynamically through add_command.
            pass

        def add_command(self, name: str, handler: Callable, description: str,
                        required_args: Optional[list] = None,
                        optional_args: Optional[dict] = None,
                        progressive: bool = True):
            """Register a command and its handler.

            Args:
                name: Command name.
                handler: Callable invoked with the command args as keywords.
                description: Description passed to ``register_command``.
                required_args: Required arguments, passed to ``register_command``.
                optional_args: Optional arguments, passed to ``register_command``.
                progressive: Whether the command needs progress reporting.
            """
            self.register_command(name, description, required_args, optional_args)
            self.command_handlers[name] = handler

            # Keep the flag in sync on re-registration: a command re-added
            # with progressive=False must no longer be treated as progressive.
            if progressive:
                self._progressive_commands.add(name)
            else:
                self._progressive_commands.discard(name)

        def _is_progressive_command(self, command: str) -> bool:
            return command in self._progressive_commands

        def _dispatch(self, command: str, args: Dict[str, Any]) -> Any:
            """Call the registered handler for ``command`` with ``**args``.

            Raises:
                ValueError: If no handler is registered for ``command``.
            """
            if command not in self.command_handlers:
                raise ValueError(f"No handler for command: {command}")
            return self.command_handlers[command](**args)

        def _execute_with_progress(self, command: str, args: Dict[str, Any]) -> Any:
            return self._dispatch(command, args)

        def _execute_simple_command(self, command: str, args: Dict[str, Any]) -> Any:
            return self._dispatch(command, args)

    return SimpleProgressCommander()