From f6f07a094f7da9f40391f93fdba74257d9ef0d35 Mon Sep 17 00:00:00 2001 From: iHeyTang Date: Thu, 14 Aug 2025 13:56:39 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=A7=BB=E9=99=A4=E6=97=A7?= =?UTF-8?q?=E7=9A=84=E6=9C=8D=E5=8A=A1=E5=99=A8=E9=80=89=E6=8B=A9=E5=92=8C?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E8=8E=B7=E5=8F=96=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E7=AE=80=E5=8C=96=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E5=8D=87=E5=8F=AF=E7=BB=B4=E6=8A=A4=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- workflow_service/comfyui_client.py | 107 ----------------------------- 1 file changed, 107 deletions(-) diff --git a/workflow_service/comfyui_client.py b/workflow_service/comfyui_client.py index 24a0d1b..14ae034 100644 --- a/workflow_service/comfyui_client.py +++ b/workflow_service/comfyui_client.py @@ -254,45 +254,6 @@ async def get_server_status( return status_info -async def select_server_for_execution() -> ComfyUIServer: - """ - 智能选择一个ComfyUI服务器。 - 优先选择一个空闲的服务器,如果所有服务器都忙,则随机选择一个。 - """ - servers = settings.SERVERS - if not servers: - raise ValueError("没有在 COMFYUI_SERVERS_JSON 中配置任何服务器。") - if len(servers) == 1: - return servers[0] - - async with aiohttp.ClientSession() as session: - tasks = [get_server_status(server, session) for server in servers] - results = await asyncio.gather(*tasks) - - free_servers = [servers[i] for i, status in enumerate(results) if status["is_free"]] - - if free_servers: - selected_server = random.choice(free_servers) - logger.info( - f"发现 {len(free_servers)} 个空闲服务器。已选择: {selected_server.http_url}" - ) - return selected_server - else: - # 后备方案:选择一个可达的服务器,即使它很忙 - reachable_servers = [ - servers[i] for i, status in enumerate(results) if status["is_reachable"] - ] - if reachable_servers: - selected_server = random.choice(reachable_servers) - logger.info( - f"所有服务器当前都在忙。从可达服务器中随机选择: {selected_server.http_url}" - ) - return selected_server - else: - # 最坏情况:所有服务器都不可达,抛出异常 - raise ConnectionError("所有配置的ComfyUI服务器都不可达。") - - async def execute_prompt_on_server( workflow_data: Dict, api_spec: Dict, @@ -862,74 +823,6 @@ async def _get_execution_results( return aggregated_outputs -async def _get_execution_results_legacy( - prompt_id: str, client_id: str, ws_url: str -) -> dict: - """ - 简化版的执行结果获取函数,不包含数据库状态跟踪。 - """ - full_ws_url = f"{ws_url}?clientId={client_id}" - aggregated_outputs = {} - - try: - async with websockets.connect(full_ws_url) as websocket: - while True: - try: - out = await websocket.recv() - if not isinstance(out, str): - continue - - message = json.loads(out) - msg_type = message.get("type") - data = message.get("data") - - if not (data and data.get("prompt_id") == prompt_id): - continue - - # 捕获并处理执行错误 - if msg_type == "execution_error": - error_data = data - logger.error( - f"ComfyUI执行错误 (Prompt ID: {prompt_id}): {error_data}" - ) - # 抛出自定义异常,将错误详情传递出去 - raise ComfyUIExecutionError(error_data) - - # 处理节点执行完成 - if msg_type == "executed": - node_id = data.get("node") - output_data = data.get("output") - if node_id and output_data: - aggregated_outputs[node_id] = output_data - logger.info( - f"收到节点 {node_id} 的输出 (Prompt ID: {prompt_id})" - ) - - # 处理整个工作流执行完成 - elif msg_type == "executing" and data.get("node") is None: - logger.info(f"Prompt ID: {prompt_id} 执行完成。") - return aggregated_outputs - - except websockets.exceptions.ConnectionClosed as e: - logger.warning( - f"WebSocket 连接已关闭 (Prompt ID: {prompt_id})。错误: {e}" - ) - return aggregated_outputs - except Exception as e: - # 重新抛出我们自己的异常,或者处理其他意外错误 - if not isinstance(e, ComfyUIExecutionError): - logger.error(f"处理 prompt {prompt_id} 时发生意外错误: {e}") - raise e - - except websockets.exceptions.InvalidURI as e: - logger.error( - f"错误: 尝试连接的WebSocket URI无效: '{full_ws_url}'. 原始URL: '{ws_url}'. 错误: {e}" - ) - raise e - - return aggregated_outputs - - async def upload_image_to_comfy(file_path: str, server: ComfyUIServer) -> str: """ 上传文件到服务器。