From ab12c1482d0b46f40ade817b39658951b6268bd4 Mon Sep 17 00:00:00 2001 From: iHeyTang Date: Fri, 8 Aug 2025 17:39:20 +0800 Subject: [PATCH] add .gitignore, update README with new API server details, and add requirements.txt for dependencies --- .gitignore | 1 + README.md | 49 +++++++++---- .../requirements.txt => requirements.txt | 0 workflow_service/comfyui_client.py | 72 +++++++++++-------- 4 files changed, 79 insertions(+), 43 deletions(-) create mode 100644 .gitignore rename workflow_service/requirements.txt => requirements.txt (100%) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..496ee2c --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.DS_Store \ No newline at end of file diff --git a/README.md b/README.md index f0c0451..5d3fc3f 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,25 @@ # ComfyUI 工作流管理 + ![这是图片](https://cdn.roasmax.cn/static/publisher.png) ## 功能 + - 工作流上传 - 工作流版本控制 - 工作流加载 -- 需配置工作流服务器 (详见**WAAS(工作流即服务) Demo API服务器**) +- 需配置工作流服务器 (详见**WAAS(工作流即服务) Demo API 服务器**) +# WAAS(工作流即服务) Demo API 服务器 -# WAAS(工作流即服务) Demo API服务器 ![WAAS整体结构.png](WAAS%E6%95%B4%E4%BD%93%E7%BB%93%E6%9E%84.png) + - 路径: ./workflow_service -- 部署: ```modal deploy modal_deploy.py --name waas-demo -e dev``` -- 配置: 参考```.env```文件 +- 部署: `modal deploy modal_deploy.py --name waas-demo -e dev` +- 配置: 参考`.env`文件 - 必须按照指定规则命名 - - **输入节点名**: 前缀 **INPUT_** - - **除生成文件节点外输出节点名**: 前缀 **OUTPUT_** -- 支持输入节点: + - **输入节点名**: 前缀 **INPUT\_** + - **除生成文件节点外输出节点名**: 前缀 **OUTPUT\_** +- 支持输入节点: - comfyui-core - 加载图像 - ComfyUI-VideoHelperSuite @@ -25,8 +28,8 @@ - 整数 - 字符串 - 浮点数 -- 支持输出节点: - - 所有在output文件夹中生成文件(图片/视频)的节点 +- 支持输出节点: + - 所有在 output 文件夹中生成文件(图片/视频)的节点 - comfyui-easy-use - 展示任何 - 数据库 @@ -41,15 +44,17 @@ ) ``` - 路由 + - GET /api/workflow: 列出工作流 - POST /api/workflow: 添加工作流 - DELETE /api/workflow: 删除工作流 - GET /api/run/{base_name}: 获取工作流输入输出元数据 + ``` 输入: *base_name: 工作流名称 version: 工作流版本 - + 输出: Json { @@ -75,12 +80,14 @@ } } ``` + - POST /api/run/{base_name}: 执行工作流 + ``` 输入: *base_name: 工作流名称 version: 工作流版本 - + 输出: Json { @@ -91,4 +98,22 @@ "output\\111_00001_.png" ] } - ``` \ No newline at end of file + ``` + +# JS 脚本使用指南 + +### MacOS + +将这个脚本直接放在该目录下,重启 ComfyUI 即可 +/Applications/ComfyUI.app/Contents/Resources/ComfyUI/custom_nodes/ComfyUI-Manager/js + +### Windows + +将这个脚本直接放在该目录下,重启 ComfyUI 即可 +{Comfy 的安装目录}\comfy\ComfyUI\custom_nodes\ComfyUI-Manager\js + +### Q&A + +主要是找到 `custom_nodes/ComfyUI-Manager` 的位置,这个目录下有一个 js 目录,把 `publisher.js` 脚本直接扔到这个 js 目录下即可。 + +一些版本安装的时候,用户额外装的的 custom_nodes 和系统自带的可能还不在一个位置,比如 MacOS 装客户端版本。用户额外装的是在 Documents 下,但 ComfyUI-Manager 是在对应的安装包目录里,得去 `/Applications` 里找。Windows 可能也会有类似的逻辑 diff --git a/workflow_service/requirements.txt b/requirements.txt similarity index 100% rename from workflow_service/requirements.txt rename to requirements.txt diff --git a/workflow_service/comfyui_client.py b/workflow_service/comfyui_client.py index 787a9d4..01a5566 100644 --- a/workflow_service/comfyui_client.py +++ b/workflow_service/comfyui_client.py @@ -12,6 +12,7 @@ from workflow_service.config import Settings, ComfyUIServer settings = Settings() + # [新增] 定义一个自定义异常,用于封装来自ComfyUI的执行错误 class ComfyUIExecutionError(Exception): def __init__(self, error_data: dict): @@ -24,7 +25,10 @@ class ComfyUIExecutionError(Exception): ) super().__init__(message) -async def get_server_status(server: ComfyUIServer, session: aiohttp.ClientSession) -> Dict[str, Any]: + +async def get_server_status( + server: ComfyUIServer, session: aiohttp.ClientSession +) -> Dict[str, Any]: """ 检查单个ComfyUI服务器的详细状态。 返回一个包含可达性、队列状态和详细队列内容的字典。 @@ -33,10 +37,7 @@ async def get_server_status(server: ComfyUIServer, session: aiohttp.ClientSessio status_info = { "is_reachable": False, "is_free": False, - "queue_details": { - "running_count": 0, - "pending_count": 0 - } + "queue_details": {"running_count": 0, "pending_count": 0}, } try: queue_url = f"{server.http_url}/queue" @@ -51,10 +52,10 @@ async def get_server_status(server: ComfyUIServer, session: aiohttp.ClientSessio status_info["queue_details"] = { "running_count": running_count, - "pending_count": pending_count + "pending_count": pending_count, } - status_info["is_free"] = (running_count == 0 and pending_count == 0) + status_info["is_free"] = running_count == 0 and pending_count == 0 except Exception as e: # 当请求失败时,将返回上面定义的、结构正确的初始 status_info @@ -82,21 +83,38 @@ async def select_server_for_execution() -> ComfyUIServer: if free_servers: selected_server = random.choice(free_servers) - print(f"发现 {len(free_servers)} 个空闲服务器。已选择: {selected_server.http_url}") + print( + f"发现 {len(free_servers)} 个空闲服务器。已选择: {selected_server.http_url}" + ) return selected_server else: # 后备方案:选择一个可达的服务器,即使它很忙 - reachable_servers = [servers[i] for i, status in enumerate(results) if status["is_reachable"]] + reachable_servers = [ + servers[i] for i, status in enumerate(results) if status["is_reachable"] + ] if reachable_servers: selected_server = random.choice(reachable_servers) - print(f"所有服务器当前都在忙。从可达服务器中随机选择: {selected_server.http_url}") + print( + f"所有服务器当前都在忙。从可达服务器中随机选择: {selected_server.http_url}" + ) return selected_server else: # 最坏情况:所有服务器都不可达,抛出异常 raise ConnectionError("所有配置的ComfyUI服务器都不可达。") -async def queue_prompt(prompt: dict, client_id: str, http_url: str) -> str: +async def execute_prompt_on_server(prompt: Dict, server: ComfyUIServer) -> Dict: + """ + 在指定的服务器上执行一个准备好的prompt。 + """ + client_id = str(uuid.uuid4()) + prompt_id = await _queue_prompt(prompt, client_id, server.http_url) + print(f"工作流已在 {server.http_url} 上入队,Prompt ID: {prompt_id}") + results = await _get_execution_results(prompt_id, client_id, server.ws_url) + return results + + +async def _queue_prompt(prompt: dict, client_id: str, http_url: str) -> str: """通过HTTP POST将工作流任务提交到指定的ComfyUI服务器。""" for node_id in prompt: prompt[node_id]["inputs"][f"cache_buster_{uuid.uuid4().hex}"] = random.random() @@ -111,7 +129,7 @@ async def queue_prompt(prompt: dict, client_id: str, http_url: str) -> str: return result["prompt_id"] -async def get_execution_results(prompt_id: str, client_id: str, ws_url: str) -> dict: +async def _get_execution_results(prompt_id: str, client_id: str, ws_url: str) -> dict: """ 通过WebSocket连接到指定的ComfyUI服务器,聚合执行结果。 [核心改动] 新增对 'execution_error' 消息的处理。 @@ -128,26 +146,26 @@ async def get_execution_results(prompt_id: str, client_id: str, ws_url: str) -> continue message = json.loads(out) - msg_type = message.get('type') - data = message.get('data') + msg_type = message.get("type") + data = message.get("data") - if not (data and data.get('prompt_id') == prompt_id): + if not (data and data.get("prompt_id") == prompt_id): continue # [核心改动] 捕获并处理执行错误 - if msg_type == 'execution_error': + if msg_type == "execution_error": print(f"ComfyUI执行错误 (Prompt ID: {prompt_id}): {data}") # 抛出自定义异常,将错误详情传递出去 raise ComfyUIExecutionError(data) - if msg_type == 'executed': - node_id = data.get('node') - output_data = data.get('output') + if msg_type == "executed": + node_id = data.get("node") + output_data = data.get("output") if node_id and output_data: aggregated_outputs[node_id] = output_data print(f"收到节点 {node_id} 的输出 (Prompt ID: {prompt_id})") - elif msg_type == 'executing' and data.get('node') is None: + elif msg_type == "executing" and data.get("node") is None: print(f"Prompt ID: {prompt_id} 执行完成。") return aggregated_outputs @@ -161,17 +179,9 @@ async def get_execution_results(prompt_id: str, client_id: str, ws_url: str) -> raise e except websockets.exceptions.InvalidURI as e: - print(f"错误: 尝试连接的WebSocket URI无效: '{full_ws_url}'. 原始URL: '{ws_url}'. 错误: {e}") + print( + f"错误: 尝试连接的WebSocket URI无效: '{full_ws_url}'. 原始URL: '{ws_url}'. 错误: {e}" + ) raise e return aggregated_outputs - -async def execute_prompt_on_server(prompt: Dict, server: ComfyUIServer) -> Dict: - """ - 在指定的服务器上执行一个准备好的prompt。 - """ - client_id = str(uuid.uuid4()) - prompt_id = await queue_prompt(prompt, client_id, server.http_url) - print(f"工作流已在 {server.http_url} 上入队,Prompt ID: {prompt_id}") - results = await get_execution_results(prompt_id, client_id, server.ws_url) - return results \ No newline at end of file