import errno import glob import os import shutil import subprocess import sys import traceback import uuid from datetime import datetime import ffmpy import loguru import torchvision.io from ffmpy import FFExecutableNotFoundError, FFRuntimeError video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov'] class VideoCut: """FFMPEG视频剪辑""" @classmethod def INPUT_TYPES(s): return { "required": { "video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}), "start": ("STRING", {"default": "00:00:00.000"}), "end": ("STRING", {"default": "00:00:10.000"}), }, } RETURN_TYPES = ("IMAGE","AUDIO") RETURN_NAMES = ("视频帧","音频") FUNCTION = "cut" # OUTPUT_NODE = False CATEGORY = "不忘科技-自定义节点🚩/视频" def cut(self, video_path, start, end): try: # 原文件名 origin_fname = ".".join(video_path.split(os.sep)[-1].split(".")[:-1]) # 新文件名 复制改名适配ffmpeg uid = uuid.uuid1() temp_fname = os.sep.join( [ *video_path.split(os.sep)[:-1], "%s.%s" % (str(uid), video_path.split(".")[-1]), ] ) try: shutil.copy(video_path, temp_fname) except: return ("请检查输入文件权限",) video_path = temp_fname # 组装输出文件名 output_name = ".".join( [ *video_path.split(os.sep)[-1].split(".")[:-2], video_path.split(os.sep)[-1].split(".")[-2] + "_output_%s" % datetime.now().strftime("%Y%m%d_%H%M%S"), video_path.split(os.sep)[-1].split(".")[-1], ] ) output = ( os.sep.join([*video_path.split(os.sep)[:-1], output_name]) .replace( os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"]) ) .replace(" ", "") ) # 调用ffmpeg ff = ffmpy.FFmpeg( inputs={video_path: None}, outputs={ output: [ "-ss", start, "-to", end, "-c:v", "libx264", "-c:a", "libmp3lame", "-reset_timestamps", "1", "-sc_threshold", "0", "-g", "1" "-force_key_frames", "expr:gte(t, n_forced * 1)", "-v", "-8" ] }, ) print(ff.cmd) ff.run() # uuid填充改回原文件名 try: os.remove(temp_fname) except: pass try: files = glob.glob(output.replace("%03d", "*")) for file in files: shutil.move(file, file.replace(str(uid), origin_fname)) files = glob.glob( output.replace(str(uid), origin_fname).replace("%03d", "*") ) except: files = glob.glob(output.replace("%03d", "*")) traceback.print_exc() video, audio, info = torchvision.io.read_video(files[0]) audio.unsqueeze_(0) try: os.remove(files[0]) except: pass return (video, {"waveform":audio,"sample_rate":info["audio_fps"]},) except: traceback.print_exc() raise Exception("Cut Failed") class VideoCutByFramePoint: """FFMPEG视频剪辑-帧位""" @classmethod def INPUT_TYPES(s): return { "required": { "video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}), "start_point": ("FLOAT", {"default": "0.0"}), "duration": ("FLOAT", {"default": "10.0"}), "fps": ("INT", {"default": "25"}), "force_match_fps": ("BOOLEAN", {"default": True}), }, } RETURN_TYPES = ("IMAGE","AUDIO") RETURN_NAMES = ("视频帧","音频") FUNCTION = "cut" # OUTPUT_NODE = False CATEGORY = "不忘科技-自定义节点🚩/视频" def cut(self, video_path, start_point, duration, fps, force_match_fps): try: # 原文件名 origin_fname = ".".join(video_path.split(os.sep)[-1].split(".")[:-1]) # 新文件名 复制改名适配ffmpeg uid = uuid.uuid1() temp_fname = os.sep.join( [ os.path.dirname(__file__), "%s.%s" % (str(uid), video_path.split(".")[-1]), ] ) try: shutil.copy(video_path, temp_fname) except: return ("请检查输入文件权限",) video_path = temp_fname # 组装输出文件名 output_name = ".".join( [ *video_path.split(os.sep)[-1].split(".")[:-2], video_path.split(os.sep)[-1].split(".")[-2] + "_output_%s" % datetime.now().strftime("%Y%m%d_%H%M%S"), video_path.split(os.sep)[-1].split(".")[-1], ] ) output = ( os.sep.join([os.path.dirname(__file__), output_name]) .replace( os.sep.join(["ComfyUI", "input"]), os.sep.join(["ComfyUI", "output"]) ) .replace(" ", "") ) # 调用ffmpeg ff = ffmpy.FFmpeg( inputs={video_path: None}, outputs={ output: [ "-ss", "%.3f" % (start_point/fps), "-t", "%.3f" % (duration/fps), "-c:v", "libx264", "-c:a", "libmp3lame", "-reset_timestamps", "1", "-sc_threshold", "0", "-g", "1", "-force_key_frames", "expr:gte(t, n_forced * 1)", "-r" if force_match_fps else "", "%d" % fps if force_match_fps else "", "-v", "-8" ] }, ) print(ff.cmd) ff.run() # uuid填充改回原文件名 try: os.remove(temp_fname) except: pass # try: # files = glob.glob(output.replace("%03d", "*")) # for file in files: # shutil.move(file, file.replace(str(uid), origin_fname)) # files = glob.glob( # output.replace(str(uid), origin_fname).replace("%03d", "*") # ) # except: # files = glob.glob(output.replace("%03d", "*")) # traceback.print_exc() video, audio, info = torchvision.io.read_video(output) audio.unsqueeze_(0) try: os.remove(output) except: pass return (video, {"waveform":audio,"sample_rate":info["audio_fps"]},) except: traceback.print_exc() raise Exception("Cut Failed") class VideoChangeFPS: """FFMPEG视频FPS转换""" @classmethod def INPUT_TYPES(s): return { "required": { "video_path": ("STRING",{"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}), "fps": ("INT", {"default": 30}), }, } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("视频路径",) FUNCTION = "changeFps" OUTPUT_NODE = True CATEGORY = "不忘科技-自定义节点🚩/视频" def get_media_duration(self, input_file, stream_type="v"): """获取视频或音频的精确时长(秒)""" cmd = [ "ffprobe", "-v", "error", "-select_streams", f"{stream_type}:0", "-show_entries", "stream=duration", "-of", "csv=p=0", input_file ] return float(subprocess.check_output(cmd).decode().strip()) def adjust_video_fps(self, input_video, output_temp, target_fps): """仅调整视频FPS,输出临时文件""" ff = ffmpy.FFmpeg( inputs={input_video: None}, outputs={ output_temp: f"-y -vf fps={target_fps} -c:v libx264 -c:a aac" } ) print("调整FPS命令:", ff.cmd) with open("user/ffmpeg.txt", "a") as log: log.write("\n----" + f"{datetime.now()}----\n" + ff.cmd + "\n========\n") with open("user/ffmpeg.txt", "a") as log: ff.run(stdout=log, stderr=log) def align_audio_to_video(self, input_video, output_final): """将音频精确对齐到视频长度""" video_duration = self.get_media_duration(input_video, "v") audio_duration = self.get_media_duration(input_video, "a") print("video_duration:", video_duration, "audio_duration:", audio_duration) # 动态选择补静音或剪切 filter_audio = ( f"atrim=end={video_duration},asetpts=PTS-STARTPTS" if audio_duration > video_duration else f"apad=whole_dur={video_duration},asetpts=PTS-STARTPTS" ) ff = ffmpy.FFmpeg( inputs={input_video: None}, outputs={ output_final: f""" -y -filter_complex " [0:v]copy[v]; [0:a]{filter_audio}[a] " -map "[v]" -map "[a]" -c:v libx264 -c:a aac """ } ) print("对齐音频命令:", ff.cmd) with open("user/ffmpeg.txt", "a") as log: log.write("\n----" + f"{datetime.now()}----\n" + ff.cmd + "\n========\n") with open("user/ffmpeg.txt", "a") as log: ff.run(stdout=log, stderr=log) def changeFps(self, video_path, fps): try: if not (video_path.startswith("/") or video_path.startswith("output/") or video_path[1] == ":"): video_path = "output/" + video_path loguru.logger.info("Processing video: %s" % video_path) output_temp = ".".join([video_path.split(".")[-2] + "-%dfps-temp" % fps, video_path.split(".")[-1]]) output = ".".join([video_path.split(".")[-2]+"-%dfps" % fps,video_path.split(".")[-1]]) # 分步执行 self.adjust_video_fps(video_path, output_temp, fps) # 第一步:调整FPS self.align_audio_to_video(output_temp, output) # 第二步:对齐音频 # 校验结果 final_video_dur = self.get_media_duration(output, "v") final_audio_dur = self.get_media_duration(output, "a") print("video_duration:", final_video_dur, "audio_duration:", final_audio_dur) if abs(final_video_dur - final_audio_dur) > 0.01: loguru.logger.warning(f"音视频长度未对齐!视频长度: {final_video_dur:.3f}s, 音频长度: {final_audio_dur:.3f}s") else: loguru.logger.success(f"处理成功!视频长度: {final_video_dur:.3f}s, 音频长度: {final_audio_dur:.3f}s") try: os.remove(output_temp) except: pass return (output,) except: traceback.print_exc() raise Exception("ChangeFPS Failed")