This commit is contained in:
kyj@bowong.ai 2025-08-04 13:59:09 +08:00
parent cf9b44efa7
commit 6cc9aeab32
2 changed files with 75 additions and 24 deletions

View File

@ -12,6 +12,17 @@ from workflow_service.config import Settings, ComfyUIServer
settings = Settings()
# [新增] 定义一个自定义异常用于封装来自ComfyUI的执行错误
class ComfyUIExecutionError(Exception):
def __init__(self, error_data: dict):
self.error_data = error_data
# 创建一个对开发者友好的异常消息
message = (
f"ComfyUI节点执行失败。节点ID: {error_data.get('node_id')}, "
f"节点类型: {error_data.get('node_type')}. "
f"错误: {error_data.get('exception_message', 'N/A')}"
)
super().__init__(message)
async def get_server_status(server: ComfyUIServer, session: aiohttp.ClientSession) -> Dict[str, Any]:
"""
@ -101,35 +112,59 @@ async def queue_prompt(prompt: dict, client_id: str, http_url: str) -> str:
async def get_execution_results(prompt_id: str, client_id: str, ws_url: str) -> dict:
"""通过WebSocket连接到指定的ComfyUI服务器聚合执行结果。"""
"""
通过WebSocket连接到指定的ComfyUI服务器聚合执行结果
[核心改动] 新增对 'execution_error' 消息的处理
"""
full_ws_url = f"{ws_url}?clientId={client_id}"
aggregated_outputs = {}
async with websockets.connect(full_ws_url) as websocket:
while True:
try:
out = await websocket.recv()
if isinstance(out, str):
try:
async with websockets.connect(full_ws_url) as websocket:
while True:
try:
out = await websocket.recv()
if not isinstance(out, str):
continue
message = json.loads(out)
msg_type = message.get('type')
data = message.get('data')
if data and data.get('prompt_id') == prompt_id:
if msg_type == 'executed':
node_id = data.get('node')
output_data = data.get('output')
if node_id and output_data:
aggregated_outputs[node_id] = output_data
print(f"收到节点 {node_id} 的输出 (Prompt ID: {prompt_id})")
elif msg_type == 'executing' and data.get('node') is None:
print(f"Prompt ID: {prompt_id} 执行完成。")
return aggregated_outputs
except websockets.exceptions.ConnectionClosed as e:
print(f"WebSocket 连接已关闭 (Prompt ID: {prompt_id})。错误: {e}")
return aggregated_outputs
except Exception as e:
print(f"处理 prompt {prompt_id} 时发生错误: {e}")
break
return aggregated_outputs
if not (data and data.get('prompt_id') == prompt_id):
continue
# [核心改动] 捕获并处理执行错误
if msg_type == 'execution_error':
print(f"ComfyUI执行错误 (Prompt ID: {prompt_id}): {data}")
# 抛出自定义异常,将错误详情传递出去
raise ComfyUIExecutionError(data)
if msg_type == 'executed':
node_id = data.get('node')
output_data = data.get('output')
if node_id and output_data:
aggregated_outputs[node_id] = output_data
print(f"收到节点 {node_id} 的输出 (Prompt ID: {prompt_id})")
elif msg_type == 'executing' and data.get('node') is None:
print(f"Prompt ID: {prompt_id} 执行完成。")
return aggregated_outputs
except websockets.exceptions.ConnectionClosed as e:
print(f"WebSocket 连接已关闭 (Prompt ID: {prompt_id})。错误: {e}")
return aggregated_outputs
except Exception as e:
# 重新抛出我们自己的异常,或者处理其他意外错误
if not isinstance(e, ComfyUIExecutionError):
print(f"处理 prompt {prompt_id} 时发生意外错误: {e}")
raise e
except websockets.exceptions.InvalidURI as e:
print(f"错误: 尝试连接的WebSocket URI无效: '{full_ws_url}'. 原始URL: '{ws_url}'. 错误: {e}")
raise e
return aggregated_outputs
async def execute_prompt_on_server(prompt: Dict, server: ComfyUIServer) -> Dict:
"""

View File

@ -15,6 +15,7 @@ from workflow_service import comfyui_client
from workflow_service import database
from workflow_service import s3_client
from workflow_service import workflow_parser
from workflow_service.comfyui_client import ComfyUIExecutionError
from workflow_service.config import Settings
settings = Settings()
@ -229,7 +230,22 @@ async def execute_workflow_endpoint(base_name: str, request_data_raw: Dict[str,
output_response[final_param_name] = output_value
return JSONResponse(content=output_response)
# [核心改动] 捕获来自ComfyUI的执行失败异常
except ComfyUIExecutionError as e:
print(f"捕获到ComfyUI执行错误: {e.error_data}")
# 返回 502 Bad Gateway 状态码,表示上游服务器出错
# detail 中包含结构化的错误信息,方便客户端处理
raise HTTPException(
status_code=500,
detail={
"message": "工作流在上游ComfyUI节点中执行失败。",
"error_details": e.error_data
}
)
except Exception as e:
# 捕获其他所有异常,作为通用的服务器内部错误
print(f"执行工作流时发生未知错误: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
if cleanup_paths:
print(f"正在清理 {len(cleanup_paths)} 个临时文件...")