305 lines
10 KiB
Python
305 lines
10 KiB
Python
import errno
|
|
import glob
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
import ffmpy
|
|
import loguru
|
|
import torchvision.io
|
|
from ffmpy import FFExecutableNotFoundError, FFRuntimeError
|
|
|
|
video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov']
|
|
|
|
|
|
|
|
class VideoCut:
|
|
"""FFMPEG视频剪辑"""
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
|
"start": ("STRING", {"default": "00:00:00.000"}),
|
|
"end": ("STRING", {"default": "00:00:10.000"}),
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ("IMAGE","AUDIO")
|
|
RETURN_NAMES = ("视频帧","音频")
|
|
|
|
FUNCTION = "cut"
|
|
|
|
# OUTPUT_NODE = False
|
|
|
|
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
|
|
|
def cut(self, video_path, start, end):
|
|
try:
|
|
# 原文件名
|
|
origin_fname = ".".join(video_path.split(os.sep)[-1].split(".")[:-1])
|
|
# 新文件名 复制改名适配ffmpeg
|
|
uid = uuid.uuid1()
|
|
temp_fname = os.sep.join(
|
|
[
|
|
*video_path.split(os.sep)[:-1],
|
|
"%s.%s" % (str(uid), video_path.split(".")[-1]),
|
|
]
|
|
)
|
|
try:
|
|
shutil.copy(video_path, temp_fname)
|
|
except:
|
|
return ("请检查输入文件权限",)
|
|
video_path = temp_fname
|
|
# 组装输出文件名
|
|
output_name = ".".join(
|
|
[
|
|
*video_path.split(os.sep)[-1].split(".")[:-2],
|
|
video_path.split(os.sep)[-1].split(".")[-2]
|
|
+ "_output_%s" % datetime.now().strftime("%Y%m%d_%H%M%S"),
|
|
video_path.split(os.sep)[-1].split(".")[-1],
|
|
]
|
|
)
|
|
output = (
|
|
os.sep.join([*video_path.split(os.sep)[:-1], output_name])
|
|
.replace(
|
|
os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"])
|
|
)
|
|
.replace(" ", "")
|
|
)
|
|
# 调用ffmpeg
|
|
ff = ffmpy.FFmpeg(
|
|
inputs={video_path: None},
|
|
outputs={
|
|
output: [
|
|
"-ss",
|
|
start,
|
|
"-to",
|
|
end,
|
|
"-c:v",
|
|
"libx264",
|
|
"-c:a",
|
|
"libmp3lame",
|
|
"-reset_timestamps",
|
|
"1",
|
|
"-sc_threshold",
|
|
"0",
|
|
"-g",
|
|
"1"
|
|
"-force_key_frames",
|
|
"expr:gte(t, n_forced * 1)",
|
|
"-v",
|
|
"-8"
|
|
]
|
|
},
|
|
)
|
|
print(ff.cmd)
|
|
ff.run()
|
|
# uuid填充改回原文件名
|
|
try:
|
|
os.remove(temp_fname)
|
|
except:
|
|
pass
|
|
try:
|
|
files = glob.glob(output.replace("%03d", "*"))
|
|
for file in files:
|
|
shutil.move(file, file.replace(str(uid), origin_fname))
|
|
files = glob.glob(
|
|
output.replace(str(uid), origin_fname).replace("%03d", "*")
|
|
)
|
|
except:
|
|
files = glob.glob(output.replace("%03d", "*"))
|
|
traceback.print_exc()
|
|
video, audio, info = torchvision.io.read_video(files[0])
|
|
audio.unsqueeze_(0)
|
|
try:
|
|
os.remove(files[0])
|
|
except:
|
|
pass
|
|
return (video, {"waveform":audio,"sample_rate":info["audio_fps"]},)
|
|
except:
|
|
traceback.print_exc()
|
|
raise Exception("Cut Failed")
|
|
|
|
class VideoCutByFramePoint:
|
|
"""FFMPEG视频剪辑-帧位"""
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
|
"start_point": ("FLOAT", {"default": "0.0"}),
|
|
"duration": ("FLOAT", {"default": "10.0"}),
|
|
"fps": ("INT", {"default": "25"}),
|
|
"force_match_fps": ("BOOLEAN", {"default": True}),
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ("IMAGE","AUDIO")
|
|
RETURN_NAMES = ("视频帧","音频")
|
|
|
|
FUNCTION = "cut"
|
|
|
|
# OUTPUT_NODE = False
|
|
|
|
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
|
|
|
def cut(self, video_path, start_point, duration, fps, force_match_fps):
|
|
try:
|
|
# 原文件名
|
|
origin_fname = ".".join(video_path.split(os.sep)[-1].split(".")[:-1])
|
|
# 新文件名 复制改名适配ffmpeg
|
|
uid = uuid.uuid1()
|
|
temp_fname = os.sep.join(
|
|
[
|
|
os.path.dirname(__file__),
|
|
"%s.%s" % (str(uid), video_path.split(".")[-1]),
|
|
]
|
|
)
|
|
try:
|
|
shutil.copy(video_path, temp_fname)
|
|
except:
|
|
return ("请检查输入文件权限",)
|
|
video_path = temp_fname
|
|
# 组装输出文件名
|
|
output_name = ".".join(
|
|
[
|
|
*video_path.split(os.sep)[-1].split(".")[:-2],
|
|
video_path.split(os.sep)[-1].split(".")[-2]
|
|
+ "_output_%s" % datetime.now().strftime("%Y%m%d_%H%M%S"),
|
|
video_path.split(os.sep)[-1].split(".")[-1],
|
|
]
|
|
)
|
|
output = (
|
|
os.sep.join([os.path.dirname(__file__), output_name])
|
|
.replace(
|
|
os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"])
|
|
)
|
|
.replace(" ", "")
|
|
)
|
|
# 调用ffmpeg
|
|
ff = ffmpy.FFmpeg(
|
|
inputs={video_path: None},
|
|
outputs={
|
|
output: [
|
|
"-ss",
|
|
"%.3f" % (start_point/fps),
|
|
"-t",
|
|
"%.3f" % (duration/fps),
|
|
"-c:v",
|
|
"libx264",
|
|
"-c:a",
|
|
"libmp3lame",
|
|
"-reset_timestamps",
|
|
"1",
|
|
"-sc_threshold",
|
|
"0",
|
|
"-g",
|
|
"1",
|
|
"-force_key_frames",
|
|
"expr:gte(t, n_forced * 1)",
|
|
"-r" if force_match_fps else "",
|
|
"%d" % fps if force_match_fps else "",
|
|
"-v",
|
|
"-8"
|
|
]
|
|
},
|
|
)
|
|
print(ff.cmd)
|
|
ff.run()
|
|
# uuid填充改回原文件名
|
|
try:
|
|
os.remove(temp_fname)
|
|
except:
|
|
pass
|
|
# try:
|
|
# files = glob.glob(output.replace("%03d", "*"))
|
|
# for file in files:
|
|
# shutil.move(file, file.replace(str(uid), origin_fname))
|
|
# files = glob.glob(
|
|
# output.replace(str(uid), origin_fname).replace("%03d", "*")
|
|
# )
|
|
# except:
|
|
# files = glob.glob(output.replace("%03d", "*"))
|
|
# traceback.print_exc()
|
|
video, audio, info = torchvision.io.read_video(output)
|
|
audio.unsqueeze_(0)
|
|
try:
|
|
os.remove(output)
|
|
except:
|
|
pass
|
|
return (video, {"waveform":audio,"sample_rate":info["audio_fps"]},)
|
|
except:
|
|
traceback.print_exc()
|
|
raise Exception("Cut Failed")
|
|
|
|
class VideoChangeFPS:
|
|
"""FFMPEG视频FPS转换"""
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
|
"fps": ("INT", {"default": 30}),
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ("STRING",)
|
|
RETURN_NAMES = ("视频路径",)
|
|
|
|
FUNCTION = "changeFps"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
|
|
|
def changeFps(self, video_path, fps):
|
|
try:
|
|
if not (video_path.startswith("/") or video_path.startswith("output/") or video_path[1] == ":"):
|
|
video_path = "output/" + video_path
|
|
loguru.logger.info("Processing video: %s" % video_path)
|
|
output = ".".join([video_path.split(".")[-2]+"-%dfps" % fps,video_path.split(".")[-1]])
|
|
ff = ffmpy.FFmpeg(
|
|
inputs={video_path: ["-loglevel", "info","-y"]},
|
|
outputs={
|
|
output: [
|
|
"-vf",
|
|
"fps=%d" % fps,
|
|
"-c:v",
|
|
"libx264",
|
|
"-crf",
|
|
"16",
|
|
"-preset",
|
|
"slow",
|
|
"-threads",
|
|
"12",
|
|
"-c:a",
|
|
"copy",
|
|
]
|
|
})
|
|
try:
|
|
print(ff.cmd)
|
|
with open("user/ffmpeg.txt", "a") as log:
|
|
log.write("\n----"+f"{datetime.now()}----\n"+ff.cmd+"\n========\n")
|
|
process = subprocess.Popen(
|
|
ff.cmd, stdout=log, stderr=log
|
|
)
|
|
except OSError as e:
|
|
if e.errno == errno.ENOENT:
|
|
raise FFExecutableNotFoundError(f"Executable '{ff.executable}' not found")
|
|
else:
|
|
raise
|
|
o_stdout, o_stderr = process.communicate()
|
|
if process.returncode != 0:
|
|
raise FFRuntimeError(ff.cmd, process.returncode, o_stdout, o_stderr)
|
|
return (output,)
|
|
except:
|
|
traceback.print_exc()
|
|
raise Exception("ChangeFPS Failed") |