#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Enhanced JSON-RPC Communication Module 增强版 JSON-RPC 通信模块 Uses json-rpc library for robust JSON-RPC 2.0 communication. """ import json import sys import time import uuid import asyncio from typing import Any, Dict, Optional, Union, Callable, List from dataclasses import dataclass, asdict from enum import Enum # json-rpc library imports from jsonrpc import JSONRPCResponseManager, dispatcher from jsonrpc.exceptions import JSONRPCError, JSONRPCInvalidRequest, JSONRPCMethodNotFound from jsonrpc.jsonrpc2 import JSONRPC20Request, JSONRPC20Response class ProgressLevel(str, Enum): """进度级别""" INFO = "info" SUCCESS = "success" WARNING = "warning" ERROR = "error" DEBUG = "debug" @dataclass class ProgressUpdate: """进度更新数据结构""" step: str type: str progress: int message: str timestamp: float level: ProgressLevel = ProgressLevel.INFO data: Optional[Dict[str, Any]] = None class EnhancedJSONRPCResponse: """增强版 JSON-RPC 2.0 Response handler""" def __init__(self, request_id: Optional[Union[str, int]] = None): # 如果没有提供request_id,生成一个UUID if request_id is None: self.request_id = str(uuid.uuid4()) else: self.request_id = request_id def success(self, result: Any) -> None: """发送成功响应""" response = JSONRPC20Response(result=result, _id=self.request_id) self._send_response(response.data) def error(self, code: int, message: str, data: Any = None) -> None: """发送错误响应""" error = JSONRPCError(code=code, message=message, data=data) response = JSONRPC20Response(error=error, _id=self.request_id) self._send_response(response.data) def progress(self, step: str, progress: int, message: str, level: ProgressLevel = ProgressLevel.INFO, data: Optional[Dict[str, Any]] = None) -> None: """发送进度通知""" progress_data = ProgressUpdate( step=step, type="progress", progress=progress, message=message, timestamp=time.time(), level=level, data=data ) self.notification(asdict(progress_data)) def notification(self, params: Any = None) -> None: """发送通知(无需响应)""" response = JSONRPC20Response(result=params, _id=self.request_id) self._send_response(response.data) def _send_response(self, response: Dict[str, Any]) -> None: """发送响应到标准输出""" json_str = json.dumps(response, ensure_ascii=False, separators=(',', ':')) print(f"JSONRPC:{json_str}", file=sys.stdout, flush=True) class EnhancedProgressReporter: """增强版进度报告器""" def __init__(self, total: int = 0): self.response = EnhancedJSONRPCResponse() self.step = 0 self.total = total def update(self, message: str, step: Optional[int] = None, data: Optional[Dict[str, Any]] = None) -> None: """更新进度""" if step is not None: self.step = step else: self.step += 1 progress = int((self.step / self.total * 100)) if self.total > 0 else -1 self.response.progress("update", progress, message, ProgressLevel.INFO, data) def info(self, message: str, data: Optional[Dict[str, Any]] = None) -> None: """信息消息""" self.response.progress("info", -1, message, ProgressLevel.INFO, data) def success(self, message: str, data: Optional[Dict[str, Any]] = None) -> None: """成功消息""" self.response.progress("success", -1, message, ProgressLevel.SUCCESS, data) def warning(self, message: str, data: Optional[Dict[str, Any]] = None) -> None: """警告消息""" self.response.progress("warning", -1, message, ProgressLevel.WARNING, data) def error(self, message: str, data: Optional[Dict[str, Any]] = None) -> None: """错误消息""" self.response.progress("error", -1, message, ProgressLevel.ERROR, data) def debug(self, message: str, data: Optional[Dict[str, Any]] = None) -> None: """调试消息""" self.response.progress("debug", -1, message, ProgressLevel.DEBUG, data) class JSONRPCMethodRegistry: """JSON-RPC 方法注册器""" def __init__(self): self.methods: Dict[str, Callable] = {} self.dispatcher = dispatcher def register(self, name: Optional[str] = None): """注册方法装饰器""" def decorator(func: Callable): method_name = name or func.__name__ self.methods[method_name] = func self.dispatcher.add_method(func, method_name) return func return decorator def register_function(self, func: Callable, name: Optional[str] = None) -> None: """直接注册函数""" method_name = name or func.__name__ self.methods[method_name] = func self.dispatcher.add_method(func, method_name) def handle_request(self, request_data: str) -> Optional[str]: """处理JSON-RPC请求""" try: # 解析请求 request_json = json.loads(request_data) method_name = request_json.get("method") params = request_json.get("params", {}) request_id = request_json.get("id") # 检查方法是否存在 if method_name not in self.methods: error_response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32601, "message": f"Method '{method_name}' not found" } } return json.dumps(error_response, ensure_ascii=False, separators=(',', ':')) try: # 调用方法 if isinstance(params, dict): result = self.methods[method_name](**params) elif isinstance(params, list): result = self.methods[method_name](*params) else: result = self.methods[method_name](params) # 如果方法返回None,表示响应已经异步发送,不需要额外响应 if result is None: return None # 正常响应 success_response = { "jsonrpc": "2.0", "id": request_id, "result": result } return json.dumps(success_response, ensure_ascii=False, separators=(',', ':')) except Exception as e: error_response = { "jsonrpc": "2.0", "id": request_id, "error": { "code": -32603, "message": f"Internal error: {str(e)}" } } return json.dumps(error_response, ensure_ascii=False, separators=(',', ':')) except Exception as e: error_response = { "jsonrpc": "2.0", "id": None, "error": { "code": -32700, "message": f"Parse error: {str(e)}" } } return json.dumps(error_response, ensure_ascii=False, separators=(',', ':')) # 全局实例 enhanced_progress_reporter = EnhancedProgressReporter() method_registry = JSONRPCMethodRegistry() # 便捷函数 def create_response_handler(request_id: Optional[Union[str, int]] = None) -> EnhancedJSONRPCResponse: """创建响应处理器""" return EnhancedJSONRPCResponse(request_id) def create_progress_reporter(total: int = 0) -> EnhancedProgressReporter: """创建进度报告器""" return EnhancedProgressReporter(total) def register_method(name: Optional[str] = None): """注册JSON-RPC方法""" return method_registry.register(name) def handle_jsonrpc_request(request_data: str) -> str: """处理JSON-RPC请求""" return method_registry.handle_request(request_data) # 向后兼容的别名 JSONRPCResponse = EnhancedJSONRPCResponse ProgressReporter = EnhancedProgressReporter