340 lines
12 KiB
Python
340 lines
12 KiB
Python
import errno
|
||
import glob
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import traceback
|
||
import uuid
|
||
from datetime import datetime
|
||
|
||
import ffmpy
|
||
import loguru
|
||
import torchvision.io
|
||
from ffmpy import FFExecutableNotFoundError, FFRuntimeError
|
||
|
||
video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov']
|
||
|
||
|
||
|
||
class VideoCut:
|
||
"""FFMPEG视频剪辑"""
|
||
|
||
@classmethod
|
||
def INPUT_TYPES(s):
|
||
return {
|
||
"required": {
|
||
"video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
||
"start": ("STRING", {"default": "00:00:00.000"}),
|
||
"end": ("STRING", {"default": "00:00:10.000"}),
|
||
},
|
||
}
|
||
|
||
RETURN_TYPES = ("IMAGE","AUDIO")
|
||
RETURN_NAMES = ("视频帧","音频")
|
||
|
||
FUNCTION = "cut"
|
||
|
||
# OUTPUT_NODE = False
|
||
|
||
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
||
|
||
def cut(self, video_path, start, end):
|
||
try:
|
||
# 原文件名
|
||
origin_fname = ".".join(video_path.split(os.sep)[-1].split(".")[:-1])
|
||
# 新文件名 复制改名适配ffmpeg
|
||
uid = uuid.uuid1()
|
||
temp_fname = os.sep.join(
|
||
[
|
||
*video_path.split(os.sep)[:-1],
|
||
"%s.%s" % (str(uid), video_path.split(".")[-1]),
|
||
]
|
||
)
|
||
try:
|
||
shutil.copy(video_path, temp_fname)
|
||
except:
|
||
return ("请检查输入文件权限",)
|
||
video_path = temp_fname
|
||
# 组装输出文件名
|
||
output_name = ".".join(
|
||
[
|
||
*video_path.split(os.sep)[-1].split(".")[:-2],
|
||
video_path.split(os.sep)[-1].split(".")[-2]
|
||
+ "_output_%s" % datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||
video_path.split(os.sep)[-1].split(".")[-1],
|
||
]
|
||
)
|
||
output = (
|
||
os.sep.join([*video_path.split(os.sep)[:-1], output_name])
|
||
.replace(
|
||
os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"])
|
||
)
|
||
.replace(" ", "")
|
||
)
|
||
# 调用ffmpeg
|
||
ff = ffmpy.FFmpeg(
|
||
inputs={video_path: None},
|
||
outputs={
|
||
output: [
|
||
"-ss",
|
||
start,
|
||
"-to",
|
||
end,
|
||
"-c:v",
|
||
"libx264",
|
||
"-c:a",
|
||
"libmp3lame",
|
||
"-reset_timestamps",
|
||
"1",
|
||
"-sc_threshold",
|
||
"0",
|
||
"-g",
|
||
"1"
|
||
"-force_key_frames",
|
||
"expr:gte(t, n_forced * 1)",
|
||
"-v",
|
||
"-8"
|
||
]
|
||
},
|
||
)
|
||
print(ff.cmd)
|
||
ff.run()
|
||
# uuid填充改回原文件名
|
||
try:
|
||
os.remove(temp_fname)
|
||
except:
|
||
pass
|
||
try:
|
||
files = glob.glob(output.replace("%03d", "*"))
|
||
for file in files:
|
||
shutil.move(file, file.replace(str(uid), origin_fname))
|
||
files = glob.glob(
|
||
output.replace(str(uid), origin_fname).replace("%03d", "*")
|
||
)
|
||
except:
|
||
files = glob.glob(output.replace("%03d", "*"))
|
||
traceback.print_exc()
|
||
video, audio, info = torchvision.io.read_video(files[0])
|
||
audio.unsqueeze_(0)
|
||
try:
|
||
os.remove(files[0])
|
||
except:
|
||
pass
|
||
return (video, {"waveform":audio,"sample_rate":info["audio_fps"]},)
|
||
except:
|
||
traceback.print_exc()
|
||
raise Exception("Cut Failed")
|
||
|
||
class VideoCutByFramePoint:
|
||
"""FFMPEG视频剪辑-帧位"""
|
||
|
||
@classmethod
|
||
def INPUT_TYPES(s):
|
||
return {
|
||
"required": {
|
||
"video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
||
"start_point": ("FLOAT", {"default": "0.0"}),
|
||
"duration": ("FLOAT", {"default": "10.0"}),
|
||
"fps": ("INT", {"default": "25"}),
|
||
"force_match_fps": ("BOOLEAN", {"default": True}),
|
||
},
|
||
}
|
||
|
||
RETURN_TYPES = ("IMAGE","AUDIO")
|
||
RETURN_NAMES = ("视频帧","音频")
|
||
|
||
FUNCTION = "cut"
|
||
|
||
# OUTPUT_NODE = False
|
||
|
||
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
||
|
||
def cut(self, video_path, start_point, duration, fps, force_match_fps):
|
||
try:
|
||
# 原文件名
|
||
origin_fname = ".".join(video_path.split(os.sep)[-1].split(".")[:-1])
|
||
# 新文件名 复制改名适配ffmpeg
|
||
uid = uuid.uuid1()
|
||
temp_fname = os.sep.join(
|
||
[
|
||
os.path.dirname(__file__),
|
||
"%s.%s" % (str(uid), video_path.split(".")[-1]),
|
||
]
|
||
)
|
||
try:
|
||
shutil.copy(video_path, temp_fname)
|
||
except:
|
||
return ("请检查输入文件权限",)
|
||
video_path = temp_fname
|
||
# 组装输出文件名
|
||
output_name = ".".join(
|
||
[
|
||
*video_path.split(os.sep)[-1].split(".")[:-2],
|
||
video_path.split(os.sep)[-1].split(".")[-2]
|
||
+ "_output_%s" % datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||
video_path.split(os.sep)[-1].split(".")[-1],
|
||
]
|
||
)
|
||
output = (
|
||
os.sep.join([os.path.dirname(__file__), output_name])
|
||
.replace(
|
||
os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"])
|
||
)
|
||
.replace(" ", "")
|
||
)
|
||
# 调用ffmpeg
|
||
ff = ffmpy.FFmpeg(
|
||
inputs={video_path: None},
|
||
outputs={
|
||
output: [
|
||
"-ss",
|
||
"%.3f" % (start_point/fps),
|
||
"-t",
|
||
"%.3f" % (duration/fps),
|
||
"-c:v",
|
||
"libx264",
|
||
"-c:a",
|
||
"libmp3lame",
|
||
"-reset_timestamps",
|
||
"1",
|
||
"-sc_threshold",
|
||
"0",
|
||
"-g",
|
||
"1",
|
||
"-force_key_frames",
|
||
"expr:gte(t, n_forced * 1)",
|
||
"-r" if force_match_fps else "",
|
||
"%d" % fps if force_match_fps else "",
|
||
"-v",
|
||
"-8"
|
||
]
|
||
},
|
||
)
|
||
print(ff.cmd)
|
||
ff.run()
|
||
# uuid填充改回原文件名
|
||
try:
|
||
os.remove(temp_fname)
|
||
except:
|
||
pass
|
||
# try:
|
||
# files = glob.glob(output.replace("%03d", "*"))
|
||
# for file in files:
|
||
# shutil.move(file, file.replace(str(uid), origin_fname))
|
||
# files = glob.glob(
|
||
# output.replace(str(uid), origin_fname).replace("%03d", "*")
|
||
# )
|
||
# except:
|
||
# files = glob.glob(output.replace("%03d", "*"))
|
||
# traceback.print_exc()
|
||
video, audio, info = torchvision.io.read_video(output)
|
||
audio.unsqueeze_(0)
|
||
try:
|
||
os.remove(output)
|
||
except:
|
||
pass
|
||
return (video, {"waveform":audio,"sample_rate":info["audio_fps"]},)
|
||
except:
|
||
traceback.print_exc()
|
||
raise Exception("Cut Failed")
|
||
|
||
class VideoChangeFPS:
|
||
"""FFMPEG视频FPS转换"""
|
||
|
||
@classmethod
|
||
def INPUT_TYPES(s):
|
||
return {
|
||
"required": {
|
||
"video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
||
"fps": ("INT", {"default": 30}),
|
||
},
|
||
}
|
||
|
||
RETURN_TYPES = ("STRING",)
|
||
RETURN_NAMES = ("视频路径",)
|
||
|
||
FUNCTION = "changeFps"
|
||
|
||
OUTPUT_NODE = True
|
||
|
||
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
||
|
||
def get_media_duration(self, input_file, stream_type="v"):
|
||
"""获取视频或音频的精确时长(秒)"""
|
||
cmd = [
|
||
"ffprobe", "-v", "error",
|
||
"-select_streams", f"{stream_type}:0",
|
||
"-show_entries", "stream=duration",
|
||
"-of", "csv=p=0",
|
||
input_file
|
||
]
|
||
return float(subprocess.check_output(cmd).decode().strip())
|
||
|
||
def adjust_video_fps(self, input_video, output_temp, target_fps):
|
||
"""仅调整视频FPS,输出临时文件"""
|
||
ff = ffmpy.FFmpeg(
|
||
inputs={input_video: None},
|
||
outputs={
|
||
output_temp: f"-y -vf fps={target_fps} -c:v libx264 -c:a aac"
|
||
}
|
||
)
|
||
print("调整FPS命令:", ff.cmd)
|
||
with open("user/ffmpeg.txt", "a") as log:
|
||
log.write("\n----" + f"{datetime.now()}----\n" + ff.cmd + "\n========\n")
|
||
with open("user/ffmpeg.txt", "a") as log:
|
||
ff.run(stdout=log, stderr=log)
|
||
|
||
def align_audio_to_video(self, input_video, output_final):
|
||
"""将音频精确对齐到视频长度"""
|
||
video_duration = self.get_media_duration(input_video, "v")
|
||
audio_duration = self.get_media_duration(input_video, "a")
|
||
print("video_duration:", video_duration, "audio_duration:", audio_duration)
|
||
|
||
# 动态选择补静音或剪切
|
||
filter_audio = (
|
||
f"atrim=end={video_duration},asetpts=PTS-STARTPTS"
|
||
if audio_duration > video_duration
|
||
else f"apad=whole_dur={video_duration},asetpts=PTS-STARTPTS"
|
||
)
|
||
|
||
ff = ffmpy.FFmpeg(
|
||
inputs={input_video: None},
|
||
outputs={
|
||
output_final: f""" -y
|
||
-filter_complex "
|
||
[0:v]copy[v];
|
||
[0:a]{filter_audio}[a]
|
||
"
|
||
-map "[v]" -map "[a]"
|
||
-c:v libx264 -c:a aac
|
||
"""
|
||
}
|
||
)
|
||
print("对齐音频命令:", ff.cmd)
|
||
with open("user/ffmpeg.txt", "a") as log:
|
||
log.write("\n----" + f"{datetime.now()}----\n" + ff.cmd + "\n========\n")
|
||
with open("user/ffmpeg.txt", "a") as log:
|
||
ff.run(stdout=log, stderr=log)
|
||
|
||
def changeFps(self, video_path, fps):
|
||
try:
|
||
if not (video_path.startswith("/") or video_path.startswith("output/") or video_path[1] == ":"):
|
||
video_path = "output/" + video_path
|
||
loguru.logger.info("Processing video: %s" % video_path)
|
||
output_temp = ".".join([video_path.split(".")[-2] + "-%dfps-temp" % fps, video_path.split(".")[-1]])
|
||
output = ".".join([video_path.split(".")[-2]+"-%dfps" % fps,video_path.split(".")[-1]])
|
||
# 分步执行
|
||
self.adjust_video_fps(video_path, output_temp, fps) # 第一步:调整FPS
|
||
self.align_audio_to_video(output_temp, output) # 第二步:对齐音频
|
||
|
||
# 校验结果
|
||
final_video_dur = self.get_media_duration(output, "v")
|
||
final_audio_dur = self.get_media_duration(output, "a")
|
||
print("video_duration:", final_video_dur, "audio_duration:", final_audio_dur)
|
||
assert abs(final_video_dur - final_audio_dur) < 0.01, "音视频长度未对齐!"
|
||
loguru.logger.success(f"处理成功!视频长度: {final_video_dur:.3f}s, 音频长度: {final_audio_dur:.3f}s")
|
||
return (output,)
|
||
except:
|
||
traceback.print_exc()
|
||
raise Exception("ChangeFPS Failed") |