ComfyUI-CustomNode/nodes/image_nodes.py

260 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import uuid
from io import BytesIO
import loguru
import numpy as np
import requests
import torch
from PIL import Image
import folder_paths
# 定义节点类
class LoadImgOptional:
# 定义节点输入类型
@classmethod
def INPUT_TYPES(cls):
input_dir = folder_paths.get_input_directory()
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
files = folder_paths.filter_files_content_types(files, ["image"])
return {
"optional": {
"image_url": ("STRING", {
"default": "https://example.com/sample.jpg",
"multiline": False
}),
"image": (sorted(files), {"image_upload": True})
}
}
# 定义节点输出类型
RETURN_TYPES = ("IMAGE",) # 返回图像数据
RETURN_NAMES = ("image",) # 命名返回值
FUNCTION = "load_image_task" # 函数标识符,方便注册多个节点功能
OUTPUT_NODE = False # 不允许该节点直接作为最终输出节点
CATEGORY = "不忘科技-自定义节点🚩/图片" # 节点所属类别(在 ComfyUI 界面中分类)
def load_image_task(self, image_url, image):
try:
if not image_url or len(image_url.strip()) == 0 or image_url == "https://example.com/sample.jpg":
loguru.logger.info("读取本地文件")
image_path = folder_paths.get_annotated_filepath(image)
with open(image_path, "rb") as image_file:
image = image_file.read()
else:
loguru.logger.info("读取线上文件")
response = requests.get(image_url)
response.raise_for_status()
image = response.content
image = Image.open(BytesIO(image)).convert("RGB")
# 按照官方格式转换图像数据
image_array = np.array(image).astype(np.float32) / 255.0
image_tensor = torch.from_numpy(image_array).unsqueeze(0)
return (image_tensor,)
except Exception as e:
raise Exception(f"Error loading image: {e}")
class SaveImagePath:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image_path": ("IMAGE", {"forceInput": True}),
}
}
RETURN_TYPES = ("STRING",)
FUNCTION = "load"
CATEGORY = "不忘科技-自定义节点🚩/图片"
def load(self, image_path):
# 如果是torch.Tensor类型转换为numpy数组
if isinstance(image_path, torch.Tensor):
image_path = image_path.cpu().numpy()
# 去除多余的维度,如果形状是(1, 1, height, width, channels)或(1, height, width, channels)等情况
while len(image_path.shape) > 3:
image_path = image_path.squeeze(0)
# 如果是通道优先格式 (C, H, W),转换为通道最后格式 (H, W, C)
if len(image_path.shape) == 3 and image_path.shape[0] <= 4:
image_path = np.transpose(image_path, (1, 2, 0))
# 如果是单通道图像转换为3通道
if len(image_path.shape) == 2:
image_path = np.stack([image_path] * 3, axis=-1)
# 数据范围和类型转换 - 这是关键修复
if image_path.dtype == np.float32 or image_path.dtype == np.float64:
# ComfyUI图像数据通常是0-1范围的浮点数
if image_path.max() <= 1.0:
# 从0-1范围转换到0-255范围
image_path = (image_path * 255.0).astype(np.uint8)
else:
# 如果已经是0-255范围直接转换类型
image_path = np.clip(image_path, 0, 255).astype(np.uint8)
elif image_path.dtype != np.uint8:
# 其他数据类型确保在0-255范围内
image_path = np.clip(image_path, 0, 255).astype(np.uint8)
pil_image = Image.fromarray(image_path)
output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output")
# base_dir = os.path.dirname(os.path.abspath(__file__))
# output_dir = '/root/comfy/ComfyUI/output'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
file_name = "%s.png" % str(uuid.uuid4())
p = os.path.join(output_dir, file_name)
pil_image.save(p)
return (p,)
class SaveImageWithOutput:
"""保存图片并输出的节点:既保存图片又能继续传递数据"""
def __init__(self):
self.output_dir = "output"
self.type = "output"
self.prefix_append = ""
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"images": ("IMAGE",),
"filename_prefix": ("STRING", {"default": "ComfyUI"}),
},
}
RETURN_TYPES = ("IMAGE", "STRING")
RETURN_NAMES = ("images", "file_path")
FUNCTION = "save_image"
OUTPUT_NODE = False
CATEGORY = "不忘科技-自定义节点🚩/图片"
def save_image(self, images, filename_prefix="ComfyUI"):
filename_prefix += self.prefix_append
full_output_folder, filename, counter, subfolder, filename_prefix = self.get_save_image_path(filename_prefix,
self.output_dir)
file_paths = []
for i, image in enumerate(images):
# 转换图片数据
img_array = 255. * image.cpu().numpy()
img = Image.fromarray(np.clip(img_array, 0, 255).astype(np.uint8))
# 生成文件名
file = f"{filename_prefix}_{counter + i:05}_.png"
file_path = os.path.join(full_output_folder, file)
# 保存图片
img.save(file_path, compress_level=4)
file_paths.append(file_path)
print(f"✅ 图片已保存到: {file_paths[0] if file_paths else '未知路径'}")
# 直接返回元组:(原始图片数据, 第一个文件路径)
return (images, file_paths[0] if file_paths else "")
def get_save_image_path(self, filename_prefix, output_dir):
def map_filename(filename):
prefix_len = len(os.path.basename(filename_prefix))
prefix = filename[:prefix_len + 1]
try:
digits = int(filename[prefix_len + 1:].split('_')[0])
except:
digits = 0
return (digits, prefix)
subfolder = ""
full_output_folder = os.path.join(output_dir, subfolder)
if os.path.commonpath((os.path.abspath(output_dir), os.path.abspath(full_output_folder))) != os.path.abspath(output_dir):
print("Saving image outside the output folder is not allowed.")
return {}
try:
counter = max(filter(lambda a: a[1][:-1] == filename_prefix and a[1][-1] == "_",
map(map_filename, os.listdir(full_output_folder))))[0] + 1
except ValueError:
counter = 1
except FileNotFoundError:
os.makedirs(full_output_folder, exist_ok=True)
counter = 1
return full_output_folder, filename_prefix, counter, subfolder, filename_prefix
class SaveImageAnywhere:
"""保存图片并输出的节点:既保存图片又能继续传递数据"""
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"images": ("IMAGE",),
"filename_prefix": ("STRING", {"default": "ComfyUI"}),
"output_dir": ("STRING", {"default": "output"}),
},
}
RETURN_TYPES = ("STRING",)
RETURN_NAMES = ("file_path",)
FUNCTION = "save_image"
OUTPUT_NODE = True
CATEGORY = "不忘科技-自定义节点🚩/图片"
def save_image(self, images, filename_prefix="ComfyUI", output_dir="output"):
full_output_folder, filename, counter, subfolder, filename_prefix = self.get_save_image_path(
filename_prefix,output_dir)
file_paths = []
for i, image in enumerate(images):
# 转换图片数据
img_array = 255. * image.cpu().numpy()
img = Image.fromarray(np.clip(img_array, 0, 255).astype(np.uint8))
# 生成文件名
file = f"{filename_prefix}_{counter + i:05}_.png"
file_path = os.path.join(full_output_folder, file)
# 保存图片
img.save(file_path, compress_level=4)
file_paths.append(file_path)
print(f"✅ 图片已保存到: {file_paths if file_paths else '未知路径'}")
return (file_paths[0] if file_paths else "",)
def get_save_image_path(self, filename_prefix, output_dir):
def map_filename(filename):
prefix_len = len(os.path.basename(filename_prefix))
prefix = filename[:prefix_len + 1]
try:
digits = int(filename[prefix_len + 1:].split('_')[0])
except:
digits = 0
return (digits, prefix)
subfolder = ""
full_output_folder = os.path.join(output_dir, subfolder)
try:
counter = max(filter(lambda a: a[1][:-1] == filename_prefix and a[1][-1] == "_",
map(map_filename, os.listdir(full_output_folder))))[0] + 1
except ValueError:
counter = 1
except FileNotFoundError:
os.makedirs(full_output_folder, exist_ok=True)
counter = 1
return full_output_folder, filename_prefix, counter, subfolder, filename_prefix