import os
import uuid
from io import BytesIO

import loguru
import numpy as np
import requests
import torch
from PIL import Image

import folder_paths


def _tensor_to_pil(image):
    """Scale an image tensor by 255, clip to 0-255 and wrap it as an 8-bit PIL image."""
    scaled = 255.0 * image.cpu().numpy()
    return Image.fromarray(np.clip(scaled, 0, 255).astype(np.uint8))


def _to_uint8_image(pixels):
    """Normalize an array into a uint8 array that ``Image.fromarray`` accepts.

    Steps, in order:
      * leading dimensions beyond three are dropped by keeping their first
        entry (so a batch contributes only its first image);
      * a 3-D array is treated as channel-first only when its first axis is
        at most 4 long and its last axis is longer than 4; otherwise it is
        left as channel-last;
      * a trailing singleton channel is removed and 2-D data is replicated
        into three channels;
      * floating data whose maximum is <= 1.0 is scaled by 255; everything
        is then clipped to 0-255 and cast to uint8.
    """
    pixels = np.asarray(pixels)

    # Indexing (instead of squeeze) also works when the leading size is > 1.
    while pixels.ndim > 3:
        pixels = pixels[0]

    if pixels.ndim == 3:
        # Only transpose when the last axis cannot be a channel axis, so tiny
        # channel-last images (height <= 4) are not scrambled.
        if pixels.shape[0] <= 4 < pixels.shape[-1]:
            pixels = np.transpose(pixels, (1, 2, 0))
        if pixels.shape[-1] == 1:
            pixels = pixels[..., 0]

    if pixels.ndim == 2:
        pixels = np.stack([pixels] * 3, axis=-1)

    if pixels.dtype == np.uint8:
        return pixels

    # issubdtype covers float16 as well as float32/float64.
    if np.issubdtype(pixels.dtype, np.floating) and pixels.size and pixels.max() <= 1.0:
        pixels = pixels * 255.0
    return np.clip(pixels, 0, 255).astype(np.uint8)


def _resolve_save_location(filename_prefix, output_dir, restrict_to_output_dir=False):
    """Work out the target folder, base file name and next free counter.

    Args:
        filename_prefix: File name prefix; may contain a directory part,
            which becomes a sub-folder of ``output_dir``.
        output_dir: Base directory to save into.
        restrict_to_output_dir: When true, refuse a prefix whose directory
            part resolves outside ``output_dir``.

    Returns:
        ``(full_output_folder, filename, counter, subfolder, filename_prefix)``
        where ``counter`` is one more than the highest number found among
        existing ``<filename>_<number>_...`` entries (1 if there are none).

    Raises:
        ValueError: If ``restrict_to_output_dir`` is set and the resolved
            folder is not inside ``output_dir``.
    """
    subfolder, filename = os.path.split(filename_prefix)
    full_output_folder = os.path.join(output_dir, subfolder)

    if restrict_to_output_dir:
        root = os.path.abspath(output_dir)
        target = os.path.abspath(full_output_folder)
        if os.path.commonpath((root, target)) != root:
            raise ValueError("Saving image outside the output folder is not allowed.")

    marker = f"{filename}_"
    highest = 0
    try:
        for existing in os.listdir(full_output_folder):
            if not existing.startswith(marker):
                continue
            try:
                number = int(existing[len(marker):].split("_")[0])
            except ValueError:
                # Same prefix but no numeric counter: counts as 0.
                number = 0
            highest = max(highest, number)
    except FileNotFoundError:
        os.makedirs(full_output_folder, exist_ok=True)

    return full_output_folder, filename, highest + 1, subfolder, filename_prefix


def _save_image_batch(images, full_output_folder, filename, counter):
    """Save every image as ``<filename>_<counter+i:05>_.png`` and return the paths."""
    file_paths = []
    for offset, image in enumerate(images):
        file_name = f"{filename}_{counter + offset:05}_.png"
        file_path = os.path.join(full_output_folder, file_name)
        _tensor_to_pil(image).save(file_path, compress_level=4)
        file_paths.append(file_path)
    return file_paths


# Node class definition
class LoadImgOptional:
    """Load an image from a URL or, when no URL is given, from the input directory."""

    # Default value of the URL widget; treated the same as "no URL given".
    _PLACEHOLDER_URL = "https://example.com/sample.jpg"
    # Seconds to wait for the remote server before giving up.
    _DOWNLOAD_TIMEOUT = 30

    # Node input types
    @classmethod
    def INPUT_TYPES(cls):
        input_dir = folder_paths.get_input_directory()
        files = [
            name
            for name in os.listdir(input_dir)
            if os.path.isfile(os.path.join(input_dir, name))
        ]
        files = folder_paths.filter_files_content_types(files, ["image"])
        return {
            "optional": {
                "image_url": ("STRING", {
                    "default": "https://example.com/sample.jpg",
                    "multiline": False
                }),
                "image": (sorted(files), {"image_upload": True})
            }
        }

    # Node output types
    RETURN_TYPES = ("IMAGE",)  # returns image data
    RETURN_NAMES = ("image",)  # name of the returned value
    FUNCTION = "load_image_task"  # entry-point method name
    OUTPUT_NODE = False  # this node is not a final output node
    CATEGORY = "不忘科技-自定义节点🚩/图片"  # category shown in the ComfyUI interface

    def load_image_task(self, image_url=None, image=None):
        """Read an image and return it as a float32 tensor.

        Both inputs are declared optional, so both parameters default to
        ``None``. The local file is used when ``image_url`` is empty,
        whitespace only, or still the placeholder default; otherwise the
        URL is downloaded.

        Args:
            image_url: URL of the image to download.
            image: Name of a file as understood by
                ``folder_paths.get_annotated_filepath``.

        Returns:
            A one-element tuple holding the RGB image as a float32 tensor
            with values divided by 255 and a leading dimension of size 1.

        Raises:
            Exception: Wraps any failure while reading, downloading or
                decoding; the original error is chained as the cause.
        """
        url = (image_url or "").strip()
        try:
            if not url or url == self._PLACEHOLDER_URL:
                if not image:
                    raise ValueError("neither image_url nor a local image was provided")
                loguru.logger.info("读取本地文件")
                image_path = folder_paths.get_annotated_filepath(image)
                with open(image_path, "rb") as image_file:
                    raw = image_file.read()
            else:
                loguru.logger.info("读取线上文件")
                # A timeout keeps an unresponsive server from blocking forever.
                response = requests.get(url, timeout=self._DOWNLOAD_TIMEOUT)
                response.raise_for_status()
                raw = response.content

            pil_image = Image.open(BytesIO(raw)).convert("RGB")
            # Convert the pixel data following the official format
            image_array = np.array(pil_image).astype(np.float32) / 255.0
            image_tensor = torch.from_numpy(image_array).unsqueeze(0)
            return (image_tensor,)
        except Exception as e:
            raise Exception(f"Error loading image: {e}") from e


class SaveImagePath:
    """Save an image as a uniquely named PNG and return the file path."""

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "image_path": ("IMAGE", {"forceInput": True}),
            }
        }

    RETURN_TYPES = ("STRING",)
    FUNCTION = "load"
    CATEGORY = "不忘科技-自定义节点🚩/图片"

    def load(self, image_path):
        """Write the image to ``<this file's directory>/output/<uuid4>.png``.

        Args:
            image_path: Despite the name, this is the image data itself, as
                a ``torch.Tensor`` or array-like. See ``_to_uint8_image``
                for how shape and value range are normalized; for a batch
                only the first image is saved.

        Returns:
            A one-element tuple with the path of the written PNG file.
        """
        # Convert torch tensors to numpy arrays
        if isinstance(image_path, torch.Tensor):
            image_path = image_path.detach().cpu().numpy()

        pil_image = Image.fromarray(_to_uint8_image(image_path))

        output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output")
        os.makedirs(output_dir, exist_ok=True)

        target = os.path.join(output_dir, f"{uuid.uuid4()}.png")
        pil_image.save(target)
        return (target,)


class SaveImageWithOutput:
    """Save the images to disk and also pass them on to downstream nodes."""

    def __init__(self):
        # Relative path, so it resolves against the process working directory.
        self.output_dir = "output"
        self.type = "output"
        self.prefix_append = ""

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "images": ("IMAGE",),
                "filename_prefix": ("STRING", {"default": "ComfyUI"}),
            },
        }

    RETURN_TYPES = ("IMAGE", "STRING")
    RETURN_NAMES = ("images", "file_path")
    FUNCTION = "save_image"
    OUTPUT_NODE = False
    CATEGORY = "不忘科技-自定义节点🚩/图片"

    def save_image(self, images, filename_prefix="ComfyUI"):
        """Save every image as a numbered PNG under ``self.output_dir``.

        Args:
            images: Iterable of image tensors; each is multiplied by 255
                and clipped to 0-255 before saving.
            filename_prefix: Prefix of the generated file names; a directory
                part selects a sub-folder of the output directory.

        Returns:
            ``(images, file_path)``: the unchanged input and the path of the
            first saved file (an empty string if nothing was saved).

        Raises:
            ValueError: If the prefix points outside the output directory.
        """
        filename_prefix += self.prefix_append
        full_output_folder, filename, counter, _subfolder, _prefix = self.get_save_image_path(
            filename_prefix, self.output_dir)

        file_paths = _save_image_batch(images, full_output_folder, filename, counter)
        first_path = file_paths[0] if file_paths else ""

        print(f"✅ 图片已保存到: {first_path or '未知路径'}")

        # Return the tuple directly: (original image data, first file path)
        return (images, first_path)

    def get_save_image_path(self, filename_prefix, output_dir):
        """Return ``(folder, filename, counter, subfolder, filename_prefix)``.

        The resolved folder must stay inside ``output_dir``; otherwise a
        ``ValueError`` is raised.
        """
        return _resolve_save_location(filename_prefix, output_dir, restrict_to_output_dir=True)


class SaveImageAnywhere:
    """Save the images into a caller-chosen directory and return the first file path."""

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "images": ("IMAGE",),
                "filename_prefix": ("STRING", {"default": "ComfyUI"}),
                "output_dir": ("STRING", {"default": "output"}),
            },
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("file_path",)
    FUNCTION = "save_image"
    OUTPUT_NODE = True
    CATEGORY = "不忘科技-自定义节点🚩/图片"

    def save_image(self, images, filename_prefix="ComfyUI", output_dir="output"):
        """Save every image as a numbered PNG under ``output_dir``.

        Args:
            images: Iterable of image tensors; each is multiplied by 255
                and clipped to 0-255 before saving.
            filename_prefix: Prefix of the generated file names; a directory
                part selects a sub-folder of ``output_dir``.
            output_dir: Directory to save into; created if missing.

        Returns:
            A one-element tuple with the path of the first saved file (an
            empty string if nothing was saved).
        """
        full_output_folder, filename, counter, _subfolder, _prefix = self.get_save_image_path(
            filename_prefix, output_dir)

        file_paths = _save_image_batch(images, full_output_folder, filename, counter)

        print(f"✅ 图片已保存到: {file_paths if file_paths else '未知路径'}")

        return (file_paths[0] if file_paths else "",)

    def get_save_image_path(self, filename_prefix, output_dir):
        """Return ``(folder, filename, counter, subfolder, filename_prefix)``.

        Unlike ``SaveImageWithOutput``, no check is made that the folder
        stays inside ``output_dir``.
        """
        return _resolve_save_location(filename_prefix, output_dir)