diff --git a/workflow_service/comfyui_client.py b/workflow_service/comfyui_client.py index 8d926d4..787a9d4 100644 --- a/workflow_service/comfyui_client.py +++ b/workflow_service/comfyui_client.py @@ -12,6 +12,17 @@ from workflow_service.config import Settings, ComfyUIServer settings = Settings() +# [新增] 定义一个自定义异常,用于封装来自ComfyUI的执行错误 +class ComfyUIExecutionError(Exception): + def __init__(self, error_data: dict): + self.error_data = error_data + # 创建一个对开发者友好的异常消息 + message = ( + f"ComfyUI节点执行失败。节点ID: {error_data.get('node_id')}, " + f"节点类型: {error_data.get('node_type')}. " + f"错误: {error_data.get('exception_message', 'N/A')}" + ) + super().__init__(message) async def get_server_status(server: ComfyUIServer, session: aiohttp.ClientSession) -> Dict[str, Any]: """ @@ -101,35 +112,59 @@ async def queue_prompt(prompt: dict, client_id: str, http_url: str) -> str: async def get_execution_results(prompt_id: str, client_id: str, ws_url: str) -> dict: - """通过WebSocket连接到指定的ComfyUI服务器,聚合执行结果。""" + """ + 通过WebSocket连接到指定的ComfyUI服务器,聚合执行结果。 + [核心改动] 新增对 'execution_error' 消息的处理。 + """ full_ws_url = f"{ws_url}?clientId={client_id}" aggregated_outputs = {} - async with websockets.connect(full_ws_url) as websocket: - while True: - try: - out = await websocket.recv() - if isinstance(out, str): + + try: + async with websockets.connect(full_ws_url) as websocket: + while True: + try: + out = await websocket.recv() + if not isinstance(out, str): + continue + message = json.loads(out) msg_type = message.get('type') data = message.get('data') - if data and data.get('prompt_id') == prompt_id: - if msg_type == 'executed': - node_id = data.get('node') - output_data = data.get('output') - if node_id and output_data: - aggregated_outputs[node_id] = output_data - print(f"收到节点 {node_id} 的输出 (Prompt ID: {prompt_id})") - elif msg_type == 'executing' and data.get('node') is None: - print(f"Prompt ID: {prompt_id} 执行完成。") - return aggregated_outputs - except websockets.exceptions.ConnectionClosed as e: - print(f"WebSocket 连接已关闭 (Prompt ID: {prompt_id})。错误: {e}") - return aggregated_outputs - except Exception as e: - print(f"处理 prompt {prompt_id} 时发生错误: {e}") - break - return aggregated_outputs + if not (data and data.get('prompt_id') == prompt_id): + continue + + # [核心改动] 捕获并处理执行错误 + if msg_type == 'execution_error': + print(f"ComfyUI执行错误 (Prompt ID: {prompt_id}): {data}") + # 抛出自定义异常,将错误详情传递出去 + raise ComfyUIExecutionError(data) + + if msg_type == 'executed': + node_id = data.get('node') + output_data = data.get('output') + if node_id and output_data: + aggregated_outputs[node_id] = output_data + print(f"收到节点 {node_id} 的输出 (Prompt ID: {prompt_id})") + + elif msg_type == 'executing' and data.get('node') is None: + print(f"Prompt ID: {prompt_id} 执行完成。") + return aggregated_outputs + + except websockets.exceptions.ConnectionClosed as e: + print(f"WebSocket 连接已关闭 (Prompt ID: {prompt_id})。错误: {e}") + return aggregated_outputs + except Exception as e: + # 重新抛出我们自己的异常,或者处理其他意外错误 + if not isinstance(e, ComfyUIExecutionError): + print(f"处理 prompt {prompt_id} 时发生意外错误: {e}") + raise e + + except websockets.exceptions.InvalidURI as e: + print(f"错误: 尝试连接的WebSocket URI无效: '{full_ws_url}'. 原始URL: '{ws_url}'. 错误: {e}") + raise e + + return aggregated_outputs async def execute_prompt_on_server(prompt: Dict, server: ComfyUIServer) -> Dict: """ diff --git a/workflow_service/main.py b/workflow_service/main.py index 0b3fdeb..93ba7c2 100644 --- a/workflow_service/main.py +++ b/workflow_service/main.py @@ -15,6 +15,7 @@ from workflow_service import comfyui_client from workflow_service import database from workflow_service import s3_client from workflow_service import workflow_parser +from workflow_service.comfyui_client import ComfyUIExecutionError from workflow_service.config import Settings settings = Settings() @@ -229,7 +230,22 @@ async def execute_workflow_endpoint(base_name: str, request_data_raw: Dict[str, output_response[final_param_name] = output_value return JSONResponse(content=output_response) - + # [核心改动] 捕获来自ComfyUI的执行失败异常 + except ComfyUIExecutionError as e: + print(f"捕获到ComfyUI执行错误: {e.error_data}") + # 返回 502 Bad Gateway 状态码,表示上游服务器出错 + # detail 中包含结构化的错误信息,方便客户端处理 + raise HTTPException( + status_code=500, + detail={ + "message": "工作流在上游ComfyUI节点中执行失败。", + "error_details": e.error_data + } + ) + except Exception as e: + # 捕获其他所有异常,作为通用的服务器内部错误 + print(f"执行工作流时发生未知错误: {e}") + raise HTTPException(status_code=500, detail=str(e)) finally: if cleanup_paths: print(f"正在清理 {len(cleanup_paths)} 个临时文件...")