473 lines
16 KiB
Python
473 lines
16 KiB
Python
import glob
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import traceback
|
||
import uuid
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import List
|
||
|
||
import ffmpy
|
||
import folder_paths
|
||
import loguru
|
||
import torchvision.io
|
||
|
||
video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov']
|
||
|
||
|
||
class VideoCut:
|
||
"""FFMPEG视频剪辑"""
|
||
|
||
@classmethod
|
||
def INPUT_TYPES(s):
|
||
return {
|
||
"required": {
|
||
"video_path": (
|
||
"STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
||
"start": ("STRING", {"default": "00:00:00.000"}),
|
||
"end": ("STRING", {"default": "00:00:10.000"}),
|
||
},
|
||
}
|
||
|
||
RETURN_TYPES = ("IMAGE", "AUDIO")
|
||
RETURN_NAMES = ("视频帧", "音频")
|
||
|
||
FUNCTION = "cut"
|
||
|
||
# OUTPUT_NODE = False
|
||
|
||
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
||
|
||
def cut(self, video_path, start, end):
|
||
try:
|
||
# 原文件名
|
||
origin_fname = ".".join(video_path.split(os.sep)[-1].split(".")[:-1])
|
||
# 新文件名 复制改名适配ffmpeg
|
||
uid = uuid.uuid1()
|
||
temp_fname = os.sep.join(
|
||
[
|
||
*video_path.split(os.sep)[:-1],
|
||
"%s.%s" % (str(uid), video_path.split(".")[-1]),
|
||
]
|
||
)
|
||
try:
|
||
shutil.copy(video_path, temp_fname)
|
||
except:
|
||
return ("请检查输入文件权限",)
|
||
video_path = temp_fname
|
||
# 组装输出文件名
|
||
output_name = ".".join(
|
||
[
|
||
*video_path.split(os.sep)[-1].split(".")[:-2],
|
||
video_path.split(os.sep)[-1].split(".")[-2]
|
||
+ "_output_%s" % datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||
video_path.split(os.sep)[-1].split(".")[-1],
|
||
]
|
||
)
|
||
output = (
|
||
os.sep.join([*video_path.split(os.sep)[:-1], output_name])
|
||
.replace(
|
||
os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"])
|
||
)
|
||
.replace(" ", "")
|
||
)
|
||
# 调用ffmpeg
|
||
ff = ffmpy.FFmpeg(
|
||
inputs={video_path: None},
|
||
outputs={
|
||
output: [
|
||
"-ss",
|
||
start,
|
||
"-to",
|
||
end,
|
||
"-c:v",
|
||
"libx264",
|
||
"-c:a",
|
||
"libmp3lame",
|
||
"-reset_timestamps",
|
||
"1",
|
||
"-sc_threshold",
|
||
"0",
|
||
"-g",
|
||
"1",
|
||
"-force_key_frames",
|
||
"expr:gte(t, n_forced * 1)",
|
||
"-v",
|
||
"-8"
|
||
]
|
||
},
|
||
)
|
||
print(ff.cmd)
|
||
ff.run()
|
||
# uuid填充改回原文件名
|
||
try:
|
||
os.remove(temp_fname)
|
||
except:
|
||
pass
|
||
try:
|
||
files = glob.glob(output.replace("%03d", "*"))
|
||
for file in files:
|
||
shutil.move(file, file.replace(str(uid), origin_fname))
|
||
files = glob.glob(
|
||
output.replace(str(uid), origin_fname).replace("%03d", "*")
|
||
)
|
||
except:
|
||
files = glob.glob(output.replace("%03d", "*"))
|
||
traceback.print_exc()
|
||
video, audio, info = torchvision.io.read_video(files[0])
|
||
audio.unsqueeze_(0)
|
||
try:
|
||
os.remove(files[0])
|
||
except:
|
||
pass
|
||
return (
|
||
video / 255.0, {"waveform": audio, "sample_rate": info["audio_fps"]} if "audio_fps" in info else None,)
|
||
except:
|
||
traceback.print_exc()
|
||
raise Exception("Cut Failed")
|
||
|
||
|
||
class VideoCutByFramePoint:
|
||
"""FFMPEG视频剪辑-帧位"""
|
||
|
||
@classmethod
|
||
def INPUT_TYPES(s):
|
||
return {
|
||
"required": {
|
||
"video_path": (
|
||
"STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
||
"start_point": ("FLOAT", {"default": "0.0"}),
|
||
"duration": ("FLOAT", {"default": "10.0"}),
|
||
"fps": ("INT", {"default": "25"}),
|
||
"force_match_fps": ("BOOLEAN", {"default": True}),
|
||
},
|
||
}
|
||
|
||
RETURN_TYPES = ("IMAGE", "AUDIO")
|
||
RETURN_NAMES = ("视频帧", "音频")
|
||
|
||
FUNCTION = "cut"
|
||
|
||
# OUTPUT_NODE = False
|
||
|
||
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
||
|
||
def cut(self, video_path, start_point, duration, fps, force_match_fps):
|
||
try:
|
||
# 原文件名
|
||
origin_fname = ".".join(video_path.split(os.sep)[-1].split(".")[:-1])
|
||
# 新文件名 复制改名适配ffmpeg
|
||
uid = uuid.uuid1()
|
||
temp_fname = os.sep.join(
|
||
[
|
||
os.path.dirname(__file__),
|
||
"%s.%s" % (str(uid), video_path.split(".")[-1]),
|
||
]
|
||
)
|
||
try:
|
||
shutil.copy(video_path, temp_fname)
|
||
except:
|
||
return ("请检查输入文件权限",)
|
||
video_path = temp_fname
|
||
# 组装输出文件名
|
||
output_name = ".".join(
|
||
[
|
||
*video_path.split(os.sep)[-1].split(".")[:-2],
|
||
video_path.split(os.sep)[-1].split(".")[-2]
|
||
+ "_output_%s" % datetime.now().strftime("%Y%m%d_%H%M%S"),
|
||
video_path.split(os.sep)[-1].split(".")[-1],
|
||
]
|
||
)
|
||
output = (
|
||
os.sep.join([os.path.dirname(__file__), output_name])
|
||
.replace(
|
||
os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"])
|
||
)
|
||
.replace(" ", "")
|
||
)
|
||
# 调用ffmpeg
|
||
ff = ffmpy.FFmpeg(
|
||
inputs={video_path: None},
|
||
outputs={
|
||
output: [
|
||
"-ss",
|
||
"%.3f" % (start_point / fps),
|
||
"-t",
|
||
"%.3f" % (duration / fps),
|
||
"-c:v",
|
||
"libx264",
|
||
"-c:a",
|
||
"libmp3lame",
|
||
"-reset_timestamps",
|
||
"1",
|
||
"-sc_threshold",
|
||
"0",
|
||
"-g",
|
||
"1",
|
||
"-force_key_frames",
|
||
"expr:gte(t, n_forced * 1)",
|
||
"-r" if force_match_fps else "",
|
||
"%d" % fps if force_match_fps else "",
|
||
"-v",
|
||
"-8"
|
||
]
|
||
},
|
||
)
|
||
print(ff.cmd)
|
||
ff.run()
|
||
# uuid填充改回原文件名
|
||
try:
|
||
os.remove(temp_fname)
|
||
except:
|
||
pass
|
||
# try:
|
||
# files = glob.glob(output.replace("%03d", "*"))
|
||
# for file in files:
|
||
# shutil.move(file, file.replace(str(uid), origin_fname))
|
||
# files = glob.glob(
|
||
# output.replace(str(uid), origin_fname).replace("%03d", "*")
|
||
# )
|
||
# except:
|
||
# files = glob.glob(output.replace("%03d", "*"))
|
||
# traceback.print_exc()
|
||
video, audio, info = torchvision.io.read_video(output)
|
||
audio.unsqueeze_(0)
|
||
try:
|
||
os.remove(output)
|
||
except:
|
||
pass
|
||
return (video / 255.0, {"waveform": audio, "sample_rate": info["audio_fps"]},)
|
||
except:
|
||
traceback.print_exc()
|
||
raise Exception("Cut Failed")
|
||
|
||
|
||
class VideoChangeFPS:
|
||
"""FFMPEG视频FPS转换"""
|
||
|
||
@classmethod
|
||
def INPUT_TYPES(s):
|
||
return {
|
||
"required": {
|
||
"video_path": (
|
||
"STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
|
||
"fps": ("INT", {"default": 30}),
|
||
},
|
||
}
|
||
|
||
RETURN_TYPES = ("STRING",)
|
||
RETURN_NAMES = ("视频路径",)
|
||
|
||
FUNCTION = "changeFps"
|
||
|
||
OUTPUT_NODE = True
|
||
|
||
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
||
|
||
def get_media_duration(self, input_file, stream_type="v"):
|
||
"""获取视频或音频的精确时长(秒)"""
|
||
cmd = [
|
||
"ffprobe", "-v", "error",
|
||
"-select_streams", f"{stream_type}:0",
|
||
"-show_entries", "stream=duration",
|
||
"-of", "csv=p=0",
|
||
input_file
|
||
]
|
||
return float(subprocess.check_output(cmd).decode().strip())
|
||
|
||
def adjust_video_fps(self, input_video, output_temp, target_fps):
|
||
"""仅调整视频FPS,输出临时文件"""
|
||
ff = ffmpy.FFmpeg(
|
||
inputs={input_video: None},
|
||
outputs={
|
||
output_temp: f"-y -vf fps={target_fps} -c:v libx264 -c:a aac -preset slow -crf 16"
|
||
}
|
||
)
|
||
print("调整FPS命令:", ff.cmd)
|
||
with open("user/ffmpeg.txt", "a") as log:
|
||
log.write("\n----" + f"{datetime.now()}----\n" + ff.cmd + "\n========\n")
|
||
with open("user/ffmpeg.txt", "a") as log:
|
||
ff.run(stdout=log, stderr=log)
|
||
|
||
def align_audio_to_video(self, input_video, output_final):
|
||
"""将音频精确对齐到视频长度"""
|
||
video_duration = self.get_media_duration(input_video, "v")
|
||
audio_duration = self.get_media_duration(input_video, "a")
|
||
print("video_duration:", video_duration, "audio_duration:", audio_duration)
|
||
|
||
# 动态选择补静音或剪切
|
||
filter_audio = (
|
||
f"atrim=end={video_duration},asetpts=PTS-STARTPTS"
|
||
if audio_duration > video_duration
|
||
else f"apad=whole_dur={video_duration},asetpts=PTS-STARTPTS"
|
||
)
|
||
|
||
ff = ffmpy.FFmpeg(
|
||
inputs={input_video: None},
|
||
outputs={
|
||
output_final: f""" -y
|
||
-filter_complex "
|
||
[0:v]copy[v];
|
||
[0:a]{filter_audio}[a]
|
||
"
|
||
-map "[v]" -map "[a]"
|
||
-c:v libx264 -c:a aac -preset slow -crf 16
|
||
"""
|
||
}
|
||
)
|
||
print("对齐音频命令:", ff.cmd)
|
||
with open("user/ffmpeg.txt", "a") as log:
|
||
log.write("\n----" + f"{datetime.now()}----\n" + ff.cmd + "\n========\n")
|
||
with open("user/ffmpeg.txt", "a") as log:
|
||
ff.run(stdout=log, stderr=log)
|
||
|
||
def changeFps(self, video_path, fps):
|
||
try:
|
||
if not (video_path.startswith("/") or video_path.startswith("output/") or video_path[1] == ":"):
|
||
video_path = "output/" + video_path
|
||
loguru.logger.info("Processing video: %s" % video_path)
|
||
output_temp = ".".join([video_path.split(".")[-2] + "-%dfps-temp" % fps, video_path.split(".")[-1]])
|
||
output = ".".join([video_path.split(".")[-2] + "-%dfps" % fps, video_path.split(".")[-1]])
|
||
# 分步执行
|
||
self.adjust_video_fps(video_path, output_temp, fps) # 第一步:调整FPS
|
||
self.align_audio_to_video(output_temp, output) # 第二步:对齐音频
|
||
|
||
# 校验结果
|
||
final_video_dur = self.get_media_duration(output, "v")
|
||
final_audio_dur = self.get_media_duration(output, "a")
|
||
print("video_duration:", final_video_dur, "audio_duration:", final_audio_dur)
|
||
if abs(final_video_dur - final_audio_dur) > 0.01:
|
||
loguru.logger.warning(
|
||
f"音视频长度未对齐!视频长度: {final_video_dur:.3f}s, 音频长度: {final_audio_dur:.3f}s")
|
||
else:
|
||
loguru.logger.success(f"处理成功!视频长度: {final_video_dur:.3f}s, 音频长度: {final_audio_dur:.3f}s")
|
||
try:
|
||
os.remove(output_temp)
|
||
except:
|
||
pass
|
||
return (output,)
|
||
except:
|
||
traceback.print_exc()
|
||
raise Exception("ChangeFPS Failed")
|
||
|
||
|
||
def validate_time_format(time_str):
|
||
pattern = r'^([0-1][0-9]|2[0-3]):([0-5][0-9]):([0-5][0-9]|\d{1,2}).(\d{3})$'
|
||
return bool(re.match(pattern, time_str))
|
||
|
||
|
||
def get_duration_wave(audio):
|
||
waveform, sample_rate = audio["waveform"], audio["sample_rate"]
|
||
# 防止话说不完
|
||
return waveform.shape[2] / sample_rate
|
||
|
||
|
||
class VideoStartPointDurationCompute:
|
||
@classmethod
|
||
def INPUT_TYPES(s):
|
||
return {
|
||
"required": {
|
||
"start_time": ("STRING", {"forceInput": True}),
|
||
"audio": ("AUDIO", {"forceInput": True}),
|
||
"end_padding": ("FLOAT", {"forceInput": True, "default": 0.4}),
|
||
"fps": ("INT", {"default": 25, "step": 1}),
|
||
},
|
||
}
|
||
|
||
RETURN_TYPES = ("FLOAT", "FLOAT",)
|
||
RETURN_NAMES = ("起始帧位", "帧数")
|
||
|
||
FUNCTION = "compute"
|
||
|
||
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
||
|
||
def compute(self, start_time, audio, fps, end_padding):
|
||
if not validate_time_format(start_time):
|
||
raise ValueError("start_time或者end_time时间格式不对(start_time or end_time is not in time format)")
|
||
|
||
time_format = "%H:%M:%S.%f"
|
||
start_dt = datetime.strptime(start_time, time_format)
|
||
start_sec = (start_dt - datetime(1900, 1, 1)).total_seconds()
|
||
start_point = start_sec * fps
|
||
duration = get_duration_wave(audio)
|
||
loguru.logger.info("audio duration %.3f s" % duration)
|
||
duration = duration + end_padding
|
||
loguru.logger.info("audio duration with padding %.3f s" % duration)
|
||
return (start_point, duration * fps,)
|
||
|
||
|
||
def merge_videos(input_paths: List[str], output_path: str) -> str:
|
||
"""
|
||
按顺序拼接多个视频文件到一个输出文件
|
||
|
||
参数:
|
||
input_paths: 视频文件路径列表,按拼接顺序排列
|
||
output_path: 输出视频文件路径
|
||
"""
|
||
# 检查所有输入文件是否存在
|
||
for path in input_paths:
|
||
if not Path(path).exists():
|
||
raise FileNotFoundError(f"输入文件不存在: {path}")
|
||
|
||
# 创建临时文件列表
|
||
temp_filelist = os.path.join(os.path.dirname(__file__),"filelist.txt")
|
||
with open(temp_filelist, "w", encoding="utf-8") as f:
|
||
for path in input_paths:
|
||
# 处理路径中的引号和特殊字符
|
||
escaped_path = path.replace("'", r"'\''")
|
||
f.write(f"file '{escaped_path}'\n")
|
||
|
||
try:
|
||
# 使用ffmpeg执行拼接操作
|
||
cmd = [
|
||
"ffmpeg",
|
||
"-f", "concat",
|
||
"-safe", "0",
|
||
"-i", str(temp_filelist),
|
||
"-c", "copy",
|
||
output_path
|
||
]
|
||
|
||
result = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True
|
||
)
|
||
|
||
print(f"视频拼接成功,输出文件: {output_path}")
|
||
print("ffmpeg 输出:", result.stderr)
|
||
return output_path
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"视频拼接失败: {e.stderr}")
|
||
raise e
|
||
finally:
|
||
if os.path.exists(temp_filelist):
|
||
os.remove(temp_filelist)
|
||
|
||
|
||
class VideoMerge:
|
||
@classmethod
|
||
def INPUT_TYPES(cls):
|
||
return {
|
||
"required": {
|
||
"video_list": ("STRING", {"default": "[]"})
|
||
},
|
||
}
|
||
|
||
RETURN_TYPES = ("STRING",)
|
||
RETURN_NAMES = ("视频路径",)
|
||
|
||
FUNCTION = "process"
|
||
|
||
CATEGORY = "不忘科技-自定义节点🚩/视频"
|
||
|
||
def process(self, video_list):
|
||
if isinstance(video_list, str):
|
||
video_list = json.loads(video_list)
|
||
return (merge_videos(video_list, os.path.join(folder_paths.get_output_directory(), f"merged_{uuid.uuid4()}.mp4")),)
|