# ComfyUI-CustomNode/__init__.py  (file-viewer header: 223 lines, 8.6 KiB, Python)

import glob
import json
import os
import random
import shutil
import traceback
import uuid
from datetime import datetime
from .test_single_image import test_node
import ffmpy
# File extensions (without the leading dot) treated as video containers.
video_extensions = [
    'webm',
    'mp4',
    'mkv',
    'gif',
    'mov',
]
class FaceDetect:
    """ComfyUI node that runs ``test_node`` on an image batch and picks one clip.

    The node forwards its inputs to the project helper ``test_node`` and, from
    the list of ``(start, end)`` periods that helper returns, selects one
    period using ``main_seed``.  (What ``test_node`` computes internally is not
    visible here; judging by the node's display name it is presumably a
    face-occlusion classifier -- confirm against ``test_single_image``.)

    Class attributes follow the ComfyUI custom-node protocol:

    RETURN_TYPES / RETURN_NAMES
        Types and UI labels of the nine outputs, in order: image, selected
        face, class, probability, used frame indices, all frame periods
        (stringified), clip config (JSON), start frame index, frame count.
    FUNCTION
        Name of the entry-point method (``predict``).
    CATEGORY
        Menu category under which the node is listed in the UI.

    ``IS_CHANGED`` is not overridden; per the ComfyUI template notes the node
    is therefore re-executed whenever any of its inputs change.
    """

    RETURN_TYPES = ("IMAGE", "IMAGE", "STRING", "STRING", "STRING", "STRING", "STRING", "INT", "INT")
    RETURN_NAMES = ("图像", "选中人脸", "分类", "概率", "采用帧序号", "全部帧序列", "剪辑配置", "起始帧序号", "帧数量")
    FUNCTION = "predict"
    # OUTPUT_NODE = False
    CATEGORY = "我的自定义节点"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(s):
        """Return the input-field configuration expected by ComfyUI.

        Returns:
            dict: a single ``"required"`` group mapping each argument name of
            ``predict`` to a ``(type, options)`` tuple.  A list as the type
            (``model``) renders as a selection widget.
        """
        return {
            "required": {
                "image": ("IMAGE",),
                "main_seed": ("INT:seed", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
                "model": (["convnext_tiny", "convnext_base"],),
                "length": ("INT", {"default": 10, "min": 3, "max": 60, "step": 1}),
                "threshold": ("FLOAT", {"default": 94, "min": 55, "max": 99, "step": 0.1}),
            },
        }

    def predict(self, image, main_seed, model, length, threshold):
        """Run detection and select one ``(start, end)`` period by seed.

        Args:
            image: IMAGE input, passed straight through to ``test_node``.
            main_seed: non-negative integer; ``main_seed % len(period)`` is the
                index of the period that gets selected.
            model: model name forwarded as ``model_name``.
            length: forwarded as ``length``.
            threshold: forwarded as ``thres``.

        Returns:
            tuple: ``(image, image_selected, cls, prob, nums, str(period),
            json_config, start, end - start)`` where ``json_config`` is the
            JSON text of ``{"start": start, "end": end}``.

        Raises:
            RuntimeError: if ``test_node`` reports no matching period.
        """
        image, image_selected, cls, prob, nums, period = test_node(
            image, length=length, thres=threshold, model_name=model
        )
        print("全部帧序列", period)

        # len() rather than truthiness: ``period`` may be an array-like whose
        # truth value is ambiguous.
        if len(period) == 0:
            raise RuntimeError("未找到符合要求的视频片段")

        start, end = period[main_seed % len(period)]
        config = {"start": start, "end": end}
        return (
            image,
            image_selected,
            cls,
            prob,
            nums,
            str(period),
            json.dumps(config),
            start,
            end - start,
        )
# Set the web directory, any .js file in that directory will be loaded by the frontend as a frontend extension
# WEB_DIRECTORY = "./somejs"
class VideoCut:
    """ComfyUI node that cuts a frame range out of a video file using ffmpeg.

    The input video is first copied next to itself under a UUID-based name,
    ffmpeg (via ``ffmpy``) writes segment files named after that UUID, and the
    segments are finally renamed back to the original file stem.
    """

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("视频路径",)
    FUNCTION = "cut"
    # OUTPUT_NODE = False
    CATEGORY = "我的自定义节点"

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(s):
        """Return the ComfyUI input-field configuration (all inputs required)."""
        return {
            "required": {
                "config": ("STRING", ),
                "video_path": ("STRING", ),
                "mod": ("INT",),
                "fps": ("FLOAT",),
                "period_length": ("INT", {"default": 10, "min": 4, "max": 100, "step": 1, "forceInput": True}),
            },
        }

    @staticmethod
    def _remove_quietly(path):
        """Delete *path*, ignoring filesystem errors (best-effort cleanup)."""
        try:
            os.remove(path)
        except OSError:
            pass

    def cut(self, config, video_path, mod, fps, period_length):
        """Cut the configured frame range out of ``video_path``.

        Args:
            config: JSON text of an object with ``"start"`` and ``"end"`` frame
                indices; an empty object means "nothing to cut".
            video_path: path of the source video.
            mod: multiplier applied to frame indices before dividing by
                ``fps`` (time in seconds = index * mod / fps).
            fps: frames per second of the source; must be positive.
            period_length: value passed to ffmpeg's ``-segment_times`` option.

        Returns:
            tuple: a 1-tuple holding either ``str(list_of_output_files)`` or,
            when the config is empty or the input cannot be copied, a
            human-readable message.

        Raises:
            ValueError: if ``fps`` is not positive.
            Exception: anything raised by ``json.loads`` or by ffmpy while
                running ffmpeg propagates to the caller (after the temporary
                copy has been removed).
        """
        print("fps", fps)

        clip = json.loads(config)
        if not clip:
            return ("无法生成符合要求的片段",)
        start, end = clip["start"], clip["end"]

        # Validate only once we know there is something to cut, so an empty
        # config never fails on the frame-rate arithmetic.
        if fps <= 0:
            raise ValueError("fps must be a positive number, got %r" % (fps,))
        mul = mod / fps

        directory, base_name = os.path.split(video_path)
        origin_fname, extension = os.path.splitext(base_name)

        # Work on a copy with a UUID name so ffmpeg never sees awkward
        # characters from the original file name.
        uid = uuid.uuid1()
        temp_fname = os.path.join(directory, "%s%s" % (uid, extension))
        try:
            shutil.copy(video_path, temp_fname)
        except OSError:
            # A partially written copy may exist; do not leave it behind.
            self._remove_quietly(temp_fname)
            return ("请检查输入文件权限",)

        # "<uuid>_output_%03d_<timestamp><ext>"; %03d is filled in by ffmpeg.
        output_name = "%s_output_%%03d_%s%s" % (
            uid,
            datetime.now().strftime('%Y%m%d_%H%M%S'),
            extension,
        )
        # NOTE(review): stripping spaces also alters directory names that
        # contain spaces -- kept as in the original, confirm this is intended.
        output = (
            os.path.join(directory, output_name)
            .replace(os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"]))
            .replace(" ", "")
        )

        try:
            ff = ffmpy.FFmpeg(
                inputs={temp_fname: ['-accurate_seek']},
                outputs={output: [
                    '-f', 'segment',
                    '-ss', str(round(start * mul, 3)),
                    '-to', str(round(end * mul, 3)),
                    # NOTE(review): ffmpeg's `segment_times` is a list of split
                    # points, `segment_time` a per-segment duration -- verify
                    # which one is meant here.
                    '-segment_times', str(period_length),
                    '-c', 'copy',
                    '-map', '0',
                    '-avoid_negative_ts', '1'
                ]}
            )
            print(ff.cmd)
            ff.run()
        finally:
            # Remove the temporary copy even when ffmpeg fails.
            self._remove_quietly(temp_fname)

        # Escape the literal parts so characters such as "[" in the path are
        # not interpreted as glob syntax; only the segment counter is a wildcard.
        segment_pattern = "*".join(glob.escape(part) for part in output.split("%03d"))

        # Rename the UUID-named segments back to the original file stem.
        renamed = []
        try:
            for segment in sorted(glob.glob(segment_pattern)):
                target = segment.replace(str(uid), origin_fname)
                shutil.move(segment, target)
                renamed.append(target)
        except OSError:
            traceback.print_exc()
            # Report the segments already renamed plus those still under
            # their UUID name.
            return (str(renamed + sorted(glob.glob(segment_pattern))),)
        return (str(renamed),)
# Add custom API routes, using router
from aiohttp import web
from server import PromptServer
@PromptServer.instance.routes.get("/hello")
async def get_hello(request):
    """Handle ``GET /hello`` by replying with the JSON-encoded string "hello"."""
    greeting = "hello"
    return web.json_response(greeting)
# Single registry of exported nodes: (unique node name, node class, UI title).
# NOTE: node names should be globally unique.
_NODE_REGISTRY = (
    ("FaceOccDetect", FaceDetect, "面部遮挡检测"),
    ("VideoCutCustom", VideoCut, "视频剪裁"),
)

# A dictionary that contains all nodes you want to export with their names
NODE_CLASS_MAPPINGS = {
    node_name: node_class for node_name, node_class, _title in _NODE_REGISTRY
}

# A dictionary that contains the friendly/humanly readable titles for the nodes
NODE_DISPLAY_NAME_MAPPINGS = {
    node_name: title for node_name, _node_class, title in _NODE_REGISTRY
}