#!/usr/bin/env python3
"""
Base class for JSON-RPC commanders.
"""

import sys
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

from .types import CommandConfig
from .parser import ArgumentParser
from ..jsonrpc import create_response_handler
from ..logger import logger


class JSONRPCCommander(ABC):
    """Abstract base class for command-line commanders that reply via JSON-RPC.

    Subclasses implement ``_register_commands`` (calling ``register_command``
    for each command they support) and ``execute_command``. ``run`` ties the
    pieces together: parse the arguments, execute the command, emit the
    result or an error.
    """

    def __init__(self, service_name: str):
        """
        Initialize the commander.

        Args:
            service_name: Name of the service; used in log messages and in
                the usage output.
        """
        self.service_name = service_name
        self.rpc_handler = None
        self.rpc_progress_reporter = None
        self.commands: Dict[str, CommandConfig] = {}
        self.parser = ArgumentParser(self.commands)

        self._setup_rpc_handler()
        self._register_commands()

        # Re-create the parser so that it includes the registered commands.
        self.parser = ArgumentParser(self.commands)

    def _setup_rpc_handler(self) -> None:
        """Create the RPC response handler.

        Any failure is logged as a warning and leaves ``self.rpc_handler`` as
        ``None``; ``_send_response`` / ``_send_error`` then print JSON to
        stdout instead.
        """
        try:
            self.rpc_handler = create_response_handler()
            logger.debug(f"JSON-RPC handler initialized for {self.service_name}")
        except Exception as e:
            logger.warning(f"Failed to initialize JSON-RPC handler: {e}")
            self.rpc_handler = None

    @abstractmethod
    def _register_commands(self) -> None:
        """Register commands - must be implemented by subclasses."""
        pass

    def register_command(self, name: str, description: str,
                         required_args: Optional[List[str]] = None,
                         optional_args: Optional[Dict[str, Dict[str, Any]]] = None) -> None:
        """
        Register a command.

        Args:
            name: Command name.
            description: Command description.
            required_args: Names of the required arguments; ``None`` is
                stored as an empty list.
            optional_args: Optional-argument configuration keyed by argument
                name; ``None`` is stored as an empty dict.
        """
        self.commands[name] = CommandConfig(
            name=name,
            description=description,
            required_args=required_args or [],
            optional_args=optional_args or {}
        )

    def parse_arguments(self, args: List[str]) -> tuple:
        """
        Parse command-line arguments.

        On a ``ValueError`` from the parser an ``INVALID_ARGS`` error is sent
        and the process exits with status 1.

        Args:
            args: List of command-line arguments.

        Returns:
            A ``(command, parsed_args)`` tuple.
        """
        try:
            return self.parser.parse_arguments(args)
        except ValueError as e:
            self._send_error("INVALID_ARGS", str(e))
            sys.exit(1)

    def _show_usage(self) -> None:
        """Send a usage description covering every registered command."""
        commands_info: Dict[str, Any] = {}

        for cmd_name, cmd_config in self.commands.items():
            optional_info: Dict[str, Any] = {}
            for arg_name, arg_config in cmd_config.optional_args.items():
                arg_type = arg_config.get('type', str)
                optional_info[arg_name] = {
                    # Not every callable used as a converter has __name__
                    # (e.g. functools.partial); fall back to its string form
                    # rather than crashing while merely printing usage.
                    "type": getattr(arg_type, "__name__", str(arg_type)),
                    "default": arg_config.get('default'),
                    "choices": arg_config.get('choices'),
                    "description": arg_config.get('description', '')
                }

            commands_info[cmd_name] = {
                "description": cmd_config.description,
                "required_args": cmd_config.required_args,
                "optional_args": optional_info
            }

        usage_info = {
            "service": self.service_name,
            # NOTE(review): the usage line carries no command placeholder
            # before "[args...]" - confirm whether one was intended.
            "usage": f"python -m {self.service_name} [args...]",
            "commands": commands_info
        }

        self._send_response(usage_info)

    def _send_response(self, result: Any) -> None:
        """Send a success response.

        Uses the RPC handler when one is available; otherwise prints the
        result to stdout as indented JSON.
        """
        if self.rpc_handler:
            self.rpc_handler.success(result)
        else:
            print(json.dumps(result, indent=2, ensure_ascii=False))

    def _send_error(self, error_code: str, message: str) -> None:
        """Send an error response.

        Uses the RPC handler when one is available; otherwise prints an
        ``{"error": {"code": ..., "message": ...}}`` object to stdout.
        """
        if self.rpc_handler:
            self.rpc_handler.error(error_code, message)
        else:
            error_response = {
                "error": {
                    "code": error_code,
                    "message": message
                }
            }
            print(json.dumps(error_response, indent=2, ensure_ascii=False))

    @abstractmethod
    def execute_command(self, command: str, args: Dict[str, Any]) -> Any:
        """
        Execute a command - must be implemented by subclasses.

        Args:
            command: Command name.
            args: Parsed arguments.

        Returns:
            The command's result.
        """
        pass

    def run(self, argv: Optional[List[str]] = None) -> None:
        """
        Run the commander.

        With no arguments the usage description is sent and the method
        returns. Otherwise the arguments are parsed, the command is executed
        and its result is sent. ``KeyboardInterrupt`` and any other
        ``Exception`` are reported as errors and the process exits with
        status 1.

        Args:
            argv: Command-line arguments; defaults to ``sys.argv[1:]``.
        """
        if argv is None:
            argv = sys.argv[1:]

        if not argv:
            self._show_usage()
            return

        try:
            # Parse the arguments. A SystemExit raised by parse_arguments is
            # not an Exception subclass, so it propagates past the handlers
            # below.
            command, args = self.parse_arguments(argv)

            # Execute the command.
            result = self.execute_command(command, args)

            # Send the response.
            self._send_response(result)

        except KeyboardInterrupt:
            self._send_error("INTERRUPTED", "Command interrupted by user")
            sys.exit(1)
        except Exception as e:
            logger.error(f"Command execution failed: {e}")
            self._send_error("INTERNAL_ERROR", str(e))
            sys.exit(1)