260 lines
9.4 KiB
Python
260 lines
9.4 KiB
Python
import os
|
||
import uuid
|
||
from io import BytesIO
|
||
|
||
import loguru
|
||
import numpy as np
|
||
import requests
|
||
import torch
|
||
from PIL import Image
|
||
import folder_paths
|
||
|
||
|
||
# 定义节点类
|
||
class LoadImgOptional:
    """ComfyUI node that loads an image from a URL or from the input folder.

    When ``image_url`` is empty or still equal to the placeholder default,
    the file named by ``image`` is read through ``folder_paths``; otherwise
    the image is downloaded from ``image_url``.
    """

    # Placeholder shown as the widget default; treated as "no URL supplied".
    _PLACEHOLDER_URL = "https://example.com/sample.jpg"
    # (connect, read) timeouts in seconds so an unresponsive host cannot
    # block the call forever.
    _REQUEST_TIMEOUT = (10, 60)

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node inputs: an optional URL and an optional local file."""
        input_dir = folder_paths.get_input_directory()
        files = [
            f for f in os.listdir(input_dir)
            if os.path.isfile(os.path.join(input_dir, f))
        ]
        files = folder_paths.filter_files_content_types(files, ["image"])
        return {
            "optional": {
                "image_url": ("STRING", {
                    "default": cls._PLACEHOLDER_URL,
                    "multiline": False,
                }),
                "image": (sorted(files), {"image_upload": True}),
            }
        }

    RETURN_TYPES = ("IMAGE",)  # the loaded image data
    RETURN_NAMES = ("image",)  # name of the returned value
    FUNCTION = "load_image_task"  # method invoked for this node
    OUTPUT_NODE = False  # not usable as a final output node
    CATEGORY = "不忘科技-自定义节点🚩/图片"  # category shown in the ComfyUI interface

    def load_image_task(self, image_url=None, image=None):
        """Load an image and return it as a float32 tensor.

        Both inputs are declared under ``"optional"`` in ``INPUT_TYPES``, so
        either may be missing; the ``None`` defaults keep such a call from
        failing with a ``TypeError`` before any useful error can be produced.

        Args:
            image_url: URL to download from. Empty, whitespace-only, or the
                placeholder default means "use the local file instead".
            image: Name of a local file, resolved with
                ``folder_paths.get_annotated_filepath``.

        Returns:
            A one-element tuple holding a tensor of shape (1, H, W, 3) with
            values scaled to the 0-1 range.

        Raises:
            Exception: If no source was given or the image cannot be read,
                downloaded, or decoded. The original error is chained as
                ``__cause__``.
        """
        try:
            url = (image_url or "").strip()
            if not url or url == self._PLACEHOLDER_URL:
                if not image:
                    raise ValueError("neither image_url nor image was provided")
                loguru.logger.info("读取本地文件")
                image_path = folder_paths.get_annotated_filepath(image)
                with open(image_path, "rb") as image_file:
                    raw_bytes = image_file.read()
            else:
                loguru.logger.info("读取线上文件")
                response = requests.get(url, timeout=self._REQUEST_TIMEOUT)
                response.raise_for_status()
                raw_bytes = response.content

            rgb_image = Image.open(BytesIO(raw_bytes)).convert("RGB")

            # Scale 8-bit RGB to 0-1 floats and add a leading batch axis.
            image_array = np.array(rgb_image).astype(np.float32) / 255.0
            image_tensor = torch.from_numpy(image_array).unsqueeze(0)

            return (image_tensor,)
        except Exception as e:
            raise Exception(f"Error loading image: {e}") from e
|
||
|
||
|
||
class SaveImagePath:
    """Node that writes its IMAGE input to a PNG file and returns the path."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the single required IMAGE input (named ``image_path``)."""
        return {
            "required": {
                "image_path": ("IMAGE", {"forceInput": True}),
            }
        }

    RETURN_TYPES = ("STRING",)
    FUNCTION = "load"
    CATEGORY = "不忘科技-自定义节点🚩/图片"

    def load(self, image_path):
        """Save the given image as a uniquely named PNG.

        Args:
            image_path: Despite the name, the image data itself: a
                ``torch.Tensor`` or an array-like. When it has more than
                three dimensions only the first image is saved.

        Returns:
            A one-element tuple with the path of the written file, located
            in an ``output`` directory next to this source file.
        """
        if isinstance(image_path, torch.Tensor):
            tensor = image_path.detach().cpu()
            if tensor.is_floating_point():
                # bfloat16 has no NumPy counterpart; float32 is always safe.
                tensor = tensor.float()
            image_path = tensor.numpy()

        pil_image = Image.fromarray(self._to_uint8_hwc(image_path))

        output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output")
        os.makedirs(output_dir, exist_ok=True)

        file_path = os.path.join(output_dir, f"{uuid.uuid4()}.png")
        pil_image.save(file_path)

        return (file_path,)

    @staticmethod
    def _to_uint8_hwc(array):
        """Convert image data to a uint8 array laid out as (H, W, C)."""
        array = np.asarray(array)

        # Drop leading batch axes by keeping the first image; squeeze(0)
        # would raise for any batch larger than one.
        while array.ndim > 3:
            array = array[0]

        # Treat the data as channel-first only when the last axis cannot be
        # a channel axis, so small channels-last images are left alone.
        if array.ndim == 3 and array.shape[0] <= 4 and array.shape[-1] > 4:
            array = np.transpose(array, (1, 2, 0))

        # (H, W, 1) is handled like a plain grayscale plane.
        if array.ndim == 3 and array.shape[-1] == 1:
            array = array[..., 0]

        # Expand a single-channel image to three identical channels.
        if array.ndim == 2:
            array = np.stack([array] * 3, axis=-1)

        if np.issubdtype(array.dtype, np.floating):
            # Floats whose maximum is at most 1 are taken to be 0-1 scaled.
            if array.size and array.max() <= 1.0:
                array = array * 255.0
            # Clip before casting so out-of-range values cannot wrap around.
            array = np.clip(array, 0, 255).astype(np.uint8)
        elif array.dtype != np.uint8:
            array = np.clip(array, 0, 255).astype(np.uint8)

        return array
|
||
|
||
|
||
class SaveImageWithOutput:
    """Node that saves images to disk and also passes them on unchanged."""

    def __init__(self):
        self.output_dir = "output"
        self.type = "output"
        self.prefix_append = ""

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the required inputs: the images and a file name prefix."""
        return {
            "required": {
                "images": ("IMAGE",),
                "filename_prefix": ("STRING", {"default": "ComfyUI"}),
            },
        }

    RETURN_TYPES = ("IMAGE", "STRING")
    RETURN_NAMES = ("images", "file_path")
    FUNCTION = "save_image"
    OUTPUT_NODE = False
    CATEGORY = "不忘科技-自定义节点🚩/图片"

    def save_image(self, images, filename_prefix="ComfyUI"):
        """Write every image as a numbered PNG under ``self.output_dir``.

        Args:
            images: Iterable of image tensors; each is multiplied by 255 and
                clipped to 0-255 before being saved.
            filename_prefix: File name prefix, optionally with a subfolder.

        Returns:
            ``(images, first_path)`` where ``first_path`` is the path of the
            first file written, or ``""`` when ``images`` is empty.

        Raises:
            ValueError: If the prefix resolves outside the output directory.
        """
        filename_prefix += self.prefix_append
        full_output_folder, filename, counter, subfolder, filename_prefix = (
            self.get_save_image_path(filename_prefix, self.output_dir)
        )

        file_paths = []
        for i, image in enumerate(images):
            # Convert the image data to 8-bit pixels.
            img_array = 255. * image.cpu().numpy()
            img = Image.fromarray(np.clip(img_array, 0, 255).astype(np.uint8))

            file = f"{filename}_{counter + i:05}_.png"
            file_path = os.path.join(full_output_folder, file)

            img.save(file_path, compress_level=4)
            file_paths.append(file_path)

        print(f"✅ 图片已保存到: {file_paths[0] if file_paths else '未知路径'}")

        # Return the untouched input images plus the first file path.
        return (images, file_paths[0] if file_paths else "")

    def get_save_image_path(self, filename_prefix, output_dir):
        """Resolve the target folder and the next free counter for a prefix.

        Args:
            filename_prefix: File name prefix; a directory part selects a
                subfolder of ``output_dir``.
            output_dir: Base directory; may be relative or absolute.

        Returns:
            ``(full_output_folder, filename, counter, subfolder,
            filename_prefix)``. The folder exists when this returns.

        Raises:
            ValueError: If the resolved folder lies outside ``output_dir``.
        """
        normalized = os.path.normpath(filename_prefix) if filename_prefix else ""
        subfolder = os.path.dirname(normalized)
        filename = os.path.basename(normalized)
        full_output_folder = os.path.join(output_dir, subfolder)

        # Compare absolute paths on both sides: commonpath() raises when a
        # relative path is mixed with an absolute one.
        base_dir = os.path.abspath(output_dir)
        target_dir = os.path.abspath(full_output_folder)
        if os.path.commonpath((base_dir, target_dir)) != base_dir:
            raise ValueError("Saving image outside the output folder is not allowed.")

        os.makedirs(full_output_folder, exist_ok=True)
        counter = self._next_counter(full_output_folder, filename)
        return full_output_folder, filename, counter, subfolder, filename_prefix

    @staticmethod
    def _next_counter(folder, filename):
        """Return one more than the highest ``<filename>_<n>_`` number in folder."""
        marker = f"{filename}_"
        highest = 0
        for entry in os.listdir(folder):
            if not entry.startswith(marker):
                continue
            try:
                number = int(entry[len(marker):].split("_")[0])
            except ValueError:
                # Matching prefix but no numeric counter: counts as zero.
                number = 0
            highest = max(highest, number)
        return highest + 1
|
||
|
||
|
||
class SaveImageAnywhere:
    """Output node that saves images into a caller-chosen directory.

    Unlike a pass-through saver, this node returns only the path of the
    first file written.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Declare the required inputs: images, file name prefix, target dir."""
        return {
            "required": {
                "images": ("IMAGE",),
                "filename_prefix": ("STRING", {"default": "ComfyUI"}),
                "output_dir": ("STRING", {"default": "output"}),
            },
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("file_path",)
    FUNCTION = "save_image"
    OUTPUT_NODE = True
    CATEGORY = "不忘科技-自定义节点🚩/图片"

    def save_image(self, images, filename_prefix="ComfyUI", output_dir="output"):
        """Write every image as a numbered PNG under ``output_dir``.

        Args:
            images: Iterable of image tensors; each is multiplied by 255 and
                clipped to 0-255 before being saved.
            filename_prefix: File name prefix, optionally with a subfolder.
            output_dir: Directory to save into; created when missing.

        Returns:
            A one-element tuple with the path of the first file written, or
            ``""`` when ``images`` is empty.
        """
        full_output_folder, filename, counter, subfolder, filename_prefix = (
            self.get_save_image_path(filename_prefix, output_dir)
        )

        file_paths = []
        for i, image in enumerate(images):
            # Convert the image data to 8-bit pixels.
            img_array = 255. * image.cpu().numpy()
            img = Image.fromarray(np.clip(img_array, 0, 255).astype(np.uint8))

            file = f"{filename}_{counter + i:05}_.png"
            file_path = os.path.join(full_output_folder, file)

            img.save(file_path, compress_level=4)
            file_paths.append(file_path)

        print(f"✅ 图片已保存到: {file_paths if file_paths else '未知路径'}")

        return (file_paths[0] if file_paths else "",)

    def get_save_image_path(self, filename_prefix, output_dir):
        """Resolve the target folder and the next free counter for a prefix.

        No containment check is applied: this node is meant to write to any
        location the caller names.

        Args:
            filename_prefix: File name prefix; a directory part selects a
                subfolder of ``output_dir``.
            output_dir: Base directory. An empty or whitespace-only value
                falls back to ``"output"``.

        Returns:
            ``(full_output_folder, filename, counter, subfolder,
            filename_prefix)``. The folder exists when this returns.
        """
        if not output_dir or not output_dir.strip():
            # os.makedirs("") raises, so use the declared default instead.
            output_dir = "output"

        normalized = os.path.normpath(filename_prefix) if filename_prefix else ""
        subfolder = os.path.dirname(normalized)
        filename = os.path.basename(normalized)
        full_output_folder = os.path.join(output_dir, subfolder)

        os.makedirs(full_output_folder, exist_ok=True)
        counter = self._next_counter(full_output_folder, filename)
        return full_output_folder, filename, counter, subfolder, filename_prefix

    @staticmethod
    def _next_counter(folder, filename):
        """Return one more than the highest ``<filename>_<n>_`` number in folder."""
        marker = f"{filename}_"
        highest = 0
        for entry in os.listdir(folder):
            if not entry.startswith(marker):
                continue
            try:
                number = int(entry[len(marker):].split("_")[0])
            except ValueError:
                # Matching prefix but no numeric counter: counts as zero.
                number = 0
            highest = max(highest, number)
        return highest + 1