#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
JSON-RPC Communication Module

Provides standardized JSON-RPC 2.0 communication for Tauri-Python integration.

Every message is written to stdout as a single line of the form
``JSONRPC:<compact json>`` and flushed immediately.
"""

import json
import sys
import time
from typing import Any, Dict, Optional, Union


class JSONRPCResponse:
    """JSON-RPC 2.0 response handler bound to a single request id."""

    def __init__(self, request_id: Optional[Union[str, int]] = None):
        """Remember the id that is echoed back in every response.

        Args:
            request_id: Id of the request being answered; ``None`` is
                serialized as JSON ``null``.
        """
        self.request_id = request_id

    def success(self, result: Any) -> None:
        """Send a successful response carrying ``result``."""
        response = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "result": result,
        }
        self._send_response(response)

    def error(self, code: int, message: str, data: Any = None) -> None:
        """Send an error response.

        Args:
            code: Numeric error code (see ``JSONRPCError``).
            message: Short description of the error.
            data: Optional extra information; the ``data`` member is
                omitted from the error object when this is ``None``.
        """
        error_obj = {
            "code": code,
            "message": message,
        }
        if data is not None:
            error_obj["data"] = data

        response = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "error": error_obj,
        }
        self._send_response(response)

    def notification(self, method: str, params: Any = None) -> None:
        """Send a notification (a message without an id; no reply expected).

        The ``params`` member is always emitted, so ``params=None`` is
        serialized as JSON ``null``.
        """
        notification = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        }
        self._send_response(notification)

    @staticmethod
    def _write_line(json_str: str) -> None:
        """Write one ``JSONRPC:``-prefixed line to stdout and flush it."""
        print(f"JSONRPC:{json_str}")
        sys.stdout.flush()

    def _send_response(self, response: Dict[str, Any]) -> None:
        """Serialize ``response`` and write it to stdout.

        Non-ASCII text is emitted verbatim when stdout can encode it. If
        stdout rejects the text (``UnicodeEncodeError``), the same payload is
        re-sent with ``\\uXXXX`` escapes so the message is still delivered.
        Any other failure (for example a value that is not JSON serializable)
        produces an internal-error response (-32603) instead.
        """
        try:
            try:
                self._write_line(
                    json.dumps(response, ensure_ascii=False, separators=(",", ":"))
                )
            except UnicodeEncodeError:
                # stdout cannot represent some character; an ASCII-only
                # encoding of the same JSON document always can be written.
                self._write_line(
                    json.dumps(response, ensure_ascii=True, separators=(",", ":"))
                )
        except Exception as e:
            # Fallback error response
            fallback = {
                "jsonrpc": "2.0",
                "id": self.request_id,
                "error": {
                    "code": -32603,
                    "message": "Internal error",
                    "data": str(e),
                },
            }
            self._write_line(json.dumps(fallback))


class ProgressReporter:
    """Progress reporting using JSON-RPC ``progress`` notifications."""

    def __init__(self):
        """Create the reporter with its own id-less response handler."""
        self.rpc = JSONRPCResponse()

    def report(
        self,
        step: str,
        progress: float,
        message: str,
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Report progress using a JSON-RPC notification.

        Args:
            step: Name of the current step.
            progress: Fraction complete, 0.0 to 1.0. ``step()`` and
                ``error()`` pass -1 when no fraction applies.
            message: Human-readable status text.
            details: Optional extra data; included only when non-empty.
        """
        params = {
            "step": step,
            "progress": progress,  # 0.0 to 1.0
            "message": message,
            "timestamp": time.time(),
        }
        if details:
            params["details"] = details

        self.rpc.notification("progress", params)

    def step(self, step_name: str, message: str) -> None:
        """Report a step without specific progress (progress is sent as -1)."""
        self.report(step_name, -1, message)

    def complete(self, message: str = "完成") -> None:
        """Report completion (step ``complete`` with progress 1.0)."""
        self.report("complete", 1.0, message)

    def update(
        self,
        step: str,
        progress: float,
        message: str,
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Report progress; takes the same arguments as ``report``."""
        self.report(step=step, progress=progress, message=message, details=details)

    def error(
        self, message: str, error_details: Optional[Dict[str, Any]] = None
    ) -> None:
        """Report an error (step ``error`` with progress -1)."""
        self.report("error", -1, message, error_details)


class JSONRPCError:
    """Error codes following the JSON-RPC 2.0 specification."""

    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603

    # Custom application errors (start from -32000)
    FILE_NOT_FOUND = -32001
    UPLOAD_FAILED = -32002
    GENERATION_FAILED = -32003
    DOWNLOAD_FAILED = -32004
    TIMEOUT_ERROR = -32005


def create_response_handler(
    request_id: Optional[Union[str, int]] = None,
) -> JSONRPCResponse:
    """Create a JSON-RPC response handler for ``request_id``."""
    return JSONRPCResponse(request_id)


def create_progress_reporter() -> ProgressReporter:
    """Create a progress reporter."""
    return ProgressReporter()


def parse_request(request_str: str) -> Dict[str, Any]:
    """Parse and validate a JSON-RPC 2.0 request string.

    Args:
        request_str: JSON text of a single request object.

    Returns:
        The decoded request dictionary.

    Raises:
        ValueError: If the text is not valid JSON, is not a JSON object,
            does not declare ``"jsonrpc": "2.0"``, has no ``method`` member,
            or has a ``method`` that is not a string.
    """
    try:
        request = json.loads(request_str)
    except json.JSONDecodeError as e:
        # Keep the decoder error as the cause so the position info survives.
        raise ValueError(f"Invalid JSON: {e}") from e

    if not isinstance(request, dict):
        raise ValueError("Request must be a JSON object")

    # Validate JSON-RPC 2.0 format
    if request.get("jsonrpc") != "2.0":
        raise ValueError("Invalid JSON-RPC version")

    if "method" not in request:
        raise ValueError("Missing method field")

    if not isinstance(request["method"], str):
        raise ValueError("Method must be a string")

    return request


# Example usage functions
def example_video_generation():
    """Example of how to use JSON-RPC for video generation."""
    rpc = create_response_handler("video_gen_001")
    progress = create_progress_reporter()

    try:
        # Report progress steps
        progress.step("upload", "[1/4] 正在上传图片到云存储...")
        # ... upload logic ...

        progress.step("submit", "[2/4] 正在提交视频生成任务...")
        # ... submit logic ...

        progress.step("wait", "[3/4] 正在等待视频生成完成...")
        # ... wait logic ...

        progress.step("download", "[4/4] 正在下载视频到本地...")
        # ... download logic ...

        progress.complete("[完成] 视频生成并下载成功")

        # Send final result
        result = {
            "status": True,
            "video_path": "/path/to/video.mp4",
            "video_url": "https://example.com/video.mp4",
            "message": "视频生成并下载成功",
        }
        rpc.success(result)

    except Exception as e:
        progress.error(f"生成失败: {e}")
        rpc.error(JSONRPCError.GENERATION_FAILED, "Video generation failed", str(e))


if __name__ == "__main__":
    # Test the JSON-RPC module
    example_video_generation()