import glob
import os
import re
import shutil
import subprocess
import traceback
import uuid
from datetime import datetime

import ffmpy
import loguru
import torchvision.io

video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov']

# Encoder options shared by both cut nodes: re-encode with libx264/libmp3lame
# and force a keyframe on every frame (-g 1) so the cut lands on the requested
# timestamps.
_ENCODE_OPTIONS = (
    "-c:v", "libx264",
    "-c:a", "libmp3lame",
    "-reset_timestamps", "1",
    "-sc_threshold", "0",
    "-g", "1",
    "-force_key_frames", "expr:gte(t, n_forced * 1)",
)

# HH:MM:SS.mmm with a literal dot before the milliseconds.
_TIME_PATTERN = re.compile(
    r'([0-1][0-9]|2[0-3]):([0-5][0-9]):([0-5]?[0-9])\.(\d{3})'
)


def _remove_quietly(path):
    """Delete *path*, ignoring filesystem errors (best-effort cleanup)."""
    try:
        os.remove(path)
    except OSError:
        pass


def _scratch_paths(video_path, work_dir):
    """Build UUID-based temp-input and output paths inside *work_dir*.

    The input is copied to a UUID-named file before ffmpeg runs so the
    original file name never reaches the ffmpeg command line.

    Returns:
        ``(temp_input, output)``. Both keep the extension of *video_path*;
        the output name additionally carries an ``_output_<timestamp>``
        suffix.
    """
    extension = os.path.splitext(video_path)[1]
    uid = uuid.uuid1()
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    temp_input = os.path.join(work_dir, "%s%s" % (uid, extension))
    # NOTE(review): stripping spaces also rewrites directory names, so a
    # parent directory containing spaces yields a path that may not exist.
    # Kept as in the original; confirm the intent.
    output = (
        os.path.join(work_dir, "%s_output_%s%s" % (uid, stamp, extension))
        .replace(
            os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"])
        )
        .replace(" ", "")
    )
    return temp_input, output


def _cut_with_ffmpeg(video_path, work_dir, output_options):
    """Copy the input, run ffmpeg with *output_options*, decode the result.

    Args:
        video_path: Path of the source video.
        work_dir: Directory that receives the temporary copy and the output.
        output_options: List of ffmpeg output options.

    Returns:
        ``(frames, audio)`` where ``frames`` is the decoded video divided by
        255.0 and ``audio`` is ``{"waveform", "sample_rate"}``, or ``None``
        when the decoder reports no ``audio_fps``.

    Raises:
        OSError: If the input cannot be copied.
        Exception: Whatever ffmpy or torchvision raise on failure.
    """
    temp_input, output = _scratch_paths(video_path, work_dir)
    try:
        try:
            shutil.copy(video_path, temp_input)
        except OSError as err:
            # Raise instead of returning a 1-tuple string: the nodes declare
            # two outputs (IMAGE, AUDIO).
            raise OSError("请检查输入文件权限") from err
        ff = ffmpy.FFmpeg(
            inputs={temp_input: None},
            outputs={output: output_options},
        )
        print(ff.cmd)
        ff.run()
        video, audio, info = torchvision.io.read_video(output)
    finally:
        # Both files are scratch data; remove them whether or not the run
        # succeeded so failed cuts do not leak files.
        _remove_quietly(temp_input)
        _remove_quietly(output)

    # Prepend a leading dimension to the decoded audio tensor (in place).
    audio.unsqueeze_(0)
    audio_out = None
    if "audio_fps" in info:
        audio_out = {"waveform": audio, "sample_rate": info["audio_fps"]}
    return (video / 255.0, audio_out)


def _run_ffmpeg_logged(ff):
    """Append the command line of *ff* to ``user/ffmpeg.txt`` and run it.

    ffmpeg's stdout and stderr are redirected to the same log file.
    """
    with open("user/ffmpeg.txt", "a") as log:
        log.write(f"\n----{datetime.now()}----\n{ff.cmd}\n========\n")
        # Flush so the header precedes whatever ffmpeg writes to the file.
        log.flush()
        ff.run(stdout=log, stderr=log)


class VideoCut:
    """Cut a video between two timestamps with ffmpeg."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: video path plus start/end timestamps."""
        return {
            "required": {
                "video_path": (
                    "STRING",
                    {
                        "placeholder": "X://insert/path/here.mp4",
                        "vhs_path_extensions": video_extensions,
                    },
                ),
                "start": ("STRING", {"default": "00:00:00.000"}),
                "end": ("STRING", {"default": "00:00:10.000"}),
            },
        }

    RETURN_TYPES = ("IMAGE", "AUDIO")
    RETURN_NAMES = ("视频帧", "音频")
    FUNCTION = "cut"

    # OUTPUT_NODE = False

    CATEGORY = "不忘科技-自定义节点🚩/视频"

    def cut(self, video_path, start, end):
        """Cut *video_path* from *start* to *end* and return frames and audio.

        Args:
            video_path: Path of the source video.
            start: Value passed to ffmpeg ``-ss``.
            end: Value passed to ffmpeg ``-to``.

        Returns:
            ``(frames, audio)``; ``audio`` is ``None`` when the decoded clip
            reports no ``audio_fps``.

        Raises:
            Exception: ``"Cut Failed"`` on any error; the underlying error is
                printed and chained as the cause.
        """
        try:
            options = ["-ss", start, "-to", end, *_ENCODE_OPTIONS, "-v", "-8"]
            # Temp copy and output live next to the source video.
            return _cut_with_ffmpeg(
                video_path, os.path.dirname(video_path), options
            )
        except Exception as err:
            traceback.print_exc()
            raise Exception("Cut Failed") from err


class VideoCutByFramePoint:
    """Cut a video with ffmpeg using a start frame and a frame count."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: path, frame range, fps and fps forcing."""
        return {
            "required": {
                "video_path": (
                    "STRING",
                    {
                        "placeholder": "X://insert/path/here.mp4",
                        "vhs_path_extensions": video_extensions,
                    },
                ),
                # Numeric defaults (the original passed them as strings).
                "start_point": ("FLOAT", {"default": 0.0}),
                "duration": ("FLOAT", {"default": 10.0}),
                "fps": ("INT", {"default": 25}),
                "force_match_fps": ("BOOLEAN", {"default": True}),
            },
        }

    RETURN_TYPES = ("IMAGE", "AUDIO")
    RETURN_NAMES = ("视频帧", "音频")
    FUNCTION = "cut"

    # OUTPUT_NODE = False

    CATEGORY = "不忘科技-自定义节点🚩/视频"

    def cut(self, video_path, start_point, duration, fps, force_match_fps):
        """Cut *duration* frames starting at frame *start_point*.

        Frame positions are converted to seconds by dividing by *fps*.

        Args:
            video_path: Path of the source video.
            start_point: Start position in frames.
            duration: Length in frames.
            fps: Frame rate used for the frame-to-seconds conversion.
            force_match_fps: When true, also pass ``-r <fps>`` to ffmpeg.

        Returns:
            ``(frames, audio)``; ``audio`` is ``None`` when the decoded clip
            reports no ``audio_fps``.

        Raises:
            Exception: ``"Cut Failed"`` on any error; the underlying error is
                printed and chained as the cause.
        """
        try:
            options = [
                "-ss", "%.3f" % (start_point / fps),
                "-t", "%.3f" % (duration / fps),
                *_ENCODE_OPTIONS,
            ]
            # Only add -r when requested; empty-string placeholders would be
            # handed to ffmpeg as real (empty) arguments.
            if force_match_fps:
                options += ["-r", "%d" % fps]
            options += ["-v", "-8"]
            # Temp copy and output live next to this module.
            return _cut_with_ffmpeg(
                video_path, os.path.dirname(__file__), options
            )
        except Exception as err:
            traceback.print_exc()
            raise Exception("Cut Failed") from err


class VideoChangeFPS:
    """Convert a video to a target FPS with ffmpeg and re-align its audio."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: video path and target fps."""
        return {
            "required": {
                "video_path": (
                    "STRING",
                    {
                        "placeholder": "X://insert/path/here.mp4",
                        "vhs_path_extensions": video_extensions,
                    },
                ),
                "fps": ("INT", {"default": 30}),
            },
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("视频路径",)
    FUNCTION = "changeFps"

    OUTPUT_NODE = True

    CATEGORY = "不忘科技-自定义节点🚩/视频"

    def get_media_duration(self, input_file, stream_type="v"):
        """Return the duration in seconds of the first video/audio stream.

        Args:
            input_file: Media file to probe with ``ffprobe``.
            stream_type: ``"v"`` for video or ``"a"`` for audio.

        Raises:
            subprocess.CalledProcessError: If ``ffprobe`` exits non-zero.
            ValueError: If ``ffprobe`` prints something that is not a float
                (for example nothing at all).
        """
        cmd = [
            "ffprobe",
            "-v", "error",
            "-select_streams", f"{stream_type}:0",
            "-show_entries", "stream=duration",
            "-of", "csv=p=0",
            input_file,
        ]
        return float(subprocess.check_output(cmd).decode().strip())

    def adjust_video_fps(self, input_video, output_temp, target_fps):
        """Re-encode *input_video* at *target_fps* into *output_temp*."""
        ff = ffmpy.FFmpeg(
            inputs={input_video: None},
            outputs={
                output_temp: f"-y -vf fps={target_fps} -c:v libx264 -c:a aac -preset slow -crf 16"
            },
        )
        print("调整FPS命令:", ff.cmd)
        _run_ffmpeg_logged(ff)

    def align_audio_to_video(self, input_video, output_final):
        """Trim or pad the audio so it matches the video duration.

        Audio longer than the video is trimmed; otherwise it is padded with
        silence up to the video duration. The result is written to
        *output_final*.
        """
        video_duration = self.get_media_duration(input_video, "v")
        audio_duration = self.get_media_duration(input_video, "a")
        print("video_duration:", video_duration, "audio_duration:", audio_duration)

        # Choose between cutting the tail and padding with silence.
        if audio_duration > video_duration:
            filter_audio = f"atrim=end={video_duration},asetpts=PTS-STARTPTS"
        else:
            filter_audio = f"apad=whole_dur={video_duration},asetpts=PTS-STARTPTS"

        options = (
            f' -y -filter_complex " [0:v]copy[v]; [0:a]{filter_audio}[a] "'
            ' -map "[v]" -map "[a]" -c:v libx264 -c:a aac -preset slow -crf 16 '
        )
        ff = ffmpy.FFmpeg(
            inputs={input_video: None},
            outputs={output_final: options},
        )
        print("对齐音频命令:", ff.cmd)
        _run_ffmpeg_logged(ff)

    def changeFps(self, video_path, fps):
        """Convert *video_path* to *fps* and return the new file's path.

        Paths that are neither absolute (``/...`` or ``X:...``) nor already
        under ``output/`` are resolved relative to ``output/``. The result is
        written next to the input as ``<name>-<fps>fps<ext>``.

        Returns:
            A 1-tuple holding the output path.

        Raises:
            Exception: ``"ChangeFPS Failed"`` on any error; the underlying
                error is printed and chained as the cause.
        """
        try:
            # Slice instead of index so one-character paths do not raise.
            is_rooted = (
                video_path.startswith(("/", "output/"))
                or video_path[1:2] == ":"
            )
            if not is_rooted:
                video_path = "output/" + video_path
            loguru.logger.info("Processing video: %s" % video_path)

            # splitext keeps the directory and any extra dots in the path;
            # split(".")[-2] kept only the second-to-last dot-separated part.
            stem, extension = os.path.splitext(video_path)
            output_temp = "%s-%dfps-temp%s" % (stem, fps, extension)
            output = "%s-%dfps%s" % (stem, fps, extension)

            try:
                # Step 1: change the FPS. Step 2: align the audio.
                self.adjust_video_fps(video_path, output_temp, fps)
                self.align_audio_to_video(output_temp, output)
            finally:
                # The intermediate file is scratch data; drop it on failure too.
                _remove_quietly(output_temp)

            # Verify the result.
            final_video_dur = self.get_media_duration(output, "v")
            final_audio_dur = self.get_media_duration(output, "a")
            print("video_duration:", final_video_dur, "audio_duration:", final_audio_dur)
            if abs(final_video_dur - final_audio_dur) > 0.01:
                loguru.logger.warning(
                    f"音视频长度未对齐!视频长度: {final_video_dur:.3f}s, 音频长度: {final_audio_dur:.3f}s")
            else:
                loguru.logger.success(
                    f"处理成功!视频长度: {final_video_dur:.3f}s, 音频长度: {final_audio_dur:.3f}s")
            return (output,)
        except Exception as err:
            traceback.print_exc()
            raise Exception("ChangeFPS Failed") from err


def validate_time_format(time_str):
    """Return True if *time_str* is ``HH:MM:SS.mmm``.

    Hours must be 00-23, minutes 00-59, seconds 0-59 (one or two digits),
    followed by a literal dot and exactly three millisecond digits.
    """
    return _TIME_PATTERN.fullmatch(time_str) is not None


def get_duration_wave(audio):
    """Return the audio length in seconds.

    Computed as the size of the waveform's last axis divided by the sample
    rate (assumes the last axis holds samples -- TODO confirm).
    """
    waveform, sample_rate = audio["waveform"], audio["sample_rate"]
    return waveform.shape[-1] / sample_rate


class VideoStartPointDurationCompute:
    """Turn a start timestamp and an audio clip into a frame start and count."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: start time, audio, end padding and fps."""
        return {
            "required": {
                "start_time": ("STRING", {"forceInput": True}),
                "audio": ("AUDIO", {"forceInput": True}),
                "end_padding": ("FLOAT", {"forceInput": True, "default": 0.4}),
                "fps": ("INT", {"default": 25, "step": 1}),
            },
        }

    RETURN_TYPES = ("FLOAT", "FLOAT",)
    RETURN_NAMES = ("起始帧位", "帧数")
    FUNCTION = "compute"

    CATEGORY = "不忘科技-自定义节点🚩/视频"

    def compute(self, start_time, audio, fps, end_padding):
        """Compute the start frame and frame count for an audio-driven cut.

        Args:
            start_time: Timestamp in ``HH:MM:SS.mmm`` form.
            audio: Dict with ``"waveform"`` and ``"sample_rate"``.
            fps: Frames per second used to convert seconds to frames.
            end_padding: Seconds added to the audio duration so the clip
                does not end before the speech does.

        Returns:
            ``(start_frame, frame_count)`` as floats.

        Raises:
            ValueError: If *start_time* is not a valid timestamp.
        """
        if not validate_time_format(start_time):
            raise ValueError("start_time或者end_time时间格式不对(start_time or end_time is not in time format)")

        # strptime yields a datetime on 1900-01-01; subtract that date to get
        # the offset in seconds.
        start_dt = datetime.strptime(start_time, "%H:%M:%S.%f")
        start_sec = (start_dt - datetime(1900, 1, 1)).total_seconds()
        start_point = start_sec * fps

        duration = get_duration_wave(audio)
        loguru.logger.info("audio duration %.3f s" % duration)
        duration = duration + end_padding
        loguru.logger.info("audio duration with padding %.3f s" % duration)
        return (start_point, duration * fps,)