diff --git a/src/BowongModalFunctions/utils/VideoUtils.py b/src/BowongModalFunctions/utils/VideoUtils.py index f22a32b..d900298 100644 --- a/src/BowongModalFunctions/utils/VideoUtils.py +++ b/src/BowongModalFunctions/utils/VideoUtils.py @@ -44,6 +44,8 @@ class FFMPEGSliceOptions(BaseModel): crf: int = Field(default=16, description="输出视频的质量") fps: int = Field(default=30, description="输出视频的FPS") + width: int = Field(default=1080, description="输出视频的宽(像素)") + height: int = Field(default=1920, description="输出视频的高(像素)") @computed_field(description="解析为字符表达式的文件大小") @property @@ -431,19 +433,42 @@ class VideoUtils: @staticmethod async def ffmpeg_slice_media(media_path: str, media_markers: List[FFMpegSliceSegment], - options: FFMPEGSliceOptions, + options: FFMPEGSliceOptions, is_streams: bool = False, output_path: Optional[str] = None) -> List[Tuple[str, VideoMetadata]]: """ - 使用本地视频文件按时间段切割出分段视频 + 使用本地视频文件按时间段切割出分段视频, 如果是直播流则按时间分段切割HLS视频流_预先多线程下载所有ts :param media_path: 本地视频路径 :param media_markers: 分段起始结束时间标记 :param options: 输出切割质量选项 + :param is_streams: 输入是否为直播流 :param output_path: 最终输出文件路径, 片段会根据指定路径附加_1.mp4, _2.mp4等片段编号 :return: 输出片段的本地路径 """ - ffmpeg_cmd = VideoUtils.async_ffmpeg_init() - ffmpeg_cmd.input(media_path) + + if not is_streams: + ffmpeg_cmd.input(media_path) + else: + seek_head = media_markers[0].start.total_seconds() + seek_tail = media_markers[-1].end.total_seconds() + duration = seek_tail - seek_head + logger.info(f"Only using {seek_head}s --> {seek_tail}s = {duration}s") + local_m3u8_path, temp_dir = await VideoUtils.convert_m3u8_to_local_source(media_path, head=seek_head, + tail=seek_tail) + logger.info(f"local_playlist: {local_m3u8_path}") + + for segment in media_markers: + segment.start = segment.start - timedelta(seconds=seek_head) + segment.end = segment.end - timedelta(seconds=seek_head) + logger.info(f"Only using {seek_head}s --> {seek_tail}s = {duration}s") + ffmpeg_cmd.input(media_path, + ss=seek_head, + t=duration, + protocol_whitelist="file,http,https,tcp,tls", + reconnect="1", # 自动重连 + reconnect_streamed="1", + reconnect_delay_max="5") + filter_complex: List[str] = [] temp_outputs: List[str] = [] if not output_path: @@ -452,12 +477,21 @@ class VideoUtils: metadata = VideoUtils.ffprobe_media_metadata(media_path) for index, marker in enumerate(media_markers): - filter_complex.extend( - [ - f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},setpts=PTS-STARTPTS[cut{index}]", - f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]", - ] - ) + # 处理指定的输出分辨率 + if options.width and options.height: + filter_complex.extend( + [ + f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},scale={options.width}:{options.height},setpts=PTS-STARTPTS[cut{index}]", + f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]", + ] + ) + else: + filter_complex.extend( + [ + f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},setpts=PTS-STARTPTS[cut{index}]", + f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]", + ] + ) ffmpeg_cmd.option('filter_complex', ';'.join(filter_complex)) diff_tolerance = 0.001 @@ -488,7 +522,7 @@ class VideoUtils: "r": options.fps } if options.limit_size: - ffmpeg_options["fs"] = options.pretty_limit_size + ffmpeg_options["fs"] = options.limit_size elif options.bit_rate: ffmpeg_options["b:v"] = options.pretty_bit_rate @@ -500,199 +534,6 @@ class VideoUtils: temp_outputs] return outputs - @staticmethod - async def ffmpeg_slice_stream_media(media_path: str, - media_markers: List[FFMpegSliceSegment], - output_path: Optional[str] = None) -> List[Tuple[str, VideoMetadata]]: - """ - 按时间分段切割HLS视频流 - :param media_path: hls manifest URL - :param media_markers: 分段起始结束时间标记 - :param output_path: 最终输出文件路径, 片段会根据指定路径附加_1.mp4, _2.mp4等片段编号 - :return: 输出片段的本地路径, 输出片段时长 - """ - import m3u8 - playlist = m3u8.load(media_path) - stream_total_duration: float = sum(segment.duration for segment in playlist.segments) - - seek_head = media_markers[0].start.total_seconds() - seek_tail = media_markers[-1].end.total_seconds() - duration = seek_tail - seek_head - logger.info(f"Only using {seek_head}s --> {seek_tail}s = {duration}s") - ffmpeg_cmd = VideoUtils.async_ffmpeg_init() - # ffmpeg_cmd.option('loglevel', 'debug') - ffmpeg_cmd.input(media_path, - ss=seek_head, - t=duration, - protocol_whitelist="file,http,https,tcp,tls", - reconnect="1", # 自动重连 - reconnect_streamed="1", - reconnect_delay_max="5") - - filter_complex: List[str] = [] - - temp_outputs: List[str] = [] - if not output_path: - output_path = FileUtils.file_path_extend(media_path, "slice") - if not output_path.endswith(".mp4"): - output_path = output_path + ".mp4" - os.makedirs(os.path.dirname(output_path), exist_ok=True) - - for index, marker in enumerate(media_markers): - if marker.start.total_seconds() > stream_total_duration or marker.start.total_seconds() < 0: - raise ValueError( - f"第{index}个切割点起始点{marker.start.total_seconds()}s超出视频时长[0-{stream_total_duration}s]范围") - if marker.end.total_seconds() > stream_total_duration or marker.end.total_seconds() < 0: - raise ValueError( - f"第{index}个切割点结束点{marker.end.total_seconds()}s超出视频时长[0-{stream_total_duration}s]范围") - - filter_complex.extend( - [ - f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},setpts=PTS-STARTPTS[cut{index}]", - f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]", - ] - ) - ffmpeg_cmd.option('filter_complex', ';'.join(filter_complex)) - for i, marker in enumerate(media_markers): - output_filepath = FileUtils.file_path_extend(output_path, str(i)) - ffmpeg_cmd.output(output_filepath, - map=[f"[cut{i}]", f"[acut{i}]"], - reset_timestamps="1", - sc_threshold="0", - g="1", - force_key_frames="expr:gte(t,n_forced*1)", - vcodec="libx264", - acodec="aac", - crf=16, - r=30, ) - temp_outputs.append(output_filepath) - await ffmpeg_cmd.execute() - outputs: List[Tuple[str, VideoMetadata]] = [(output, VideoUtils.ffprobe_media_metadata(output)) for output in - temp_outputs] - return outputs - - @staticmethod - async def ffmpeg_slice_stream_media_multithread(media_path: str, - options: FFMPEGSliceOptions, - media_markers: List[FFMpegSliceSegment], - output_path: Optional[str] = None) -> List[ - Tuple[str, VideoMetadata]]: - """ - 按时间分段切割HLS视频流_预先多线程下载所有ts - :param media_path: hls manifest URL - :param media_markers: 分段起始结束时间标记 - :param output_path: 最终输出文件路径, 片段会根据指定路径附加_1.mp4, _2.mp4等片段编号 - :return: 输出片段的本地路径, 输出片段时长 - """ - seek_head = media_markers[0].start.total_seconds() - seek_tail = media_markers[-1].end.total_seconds() - duration = seek_tail - seek_head - logger.info(f"Only using {seek_head}s --> {seek_tail}s = {duration}s") - - local_m3u8_path, temp_dir = await VideoUtils.convert_m3u8_to_local_source(media_path, head=seek_head, - tail=seek_tail) - logger.info(f"local_playlist: {local_m3u8_path}") - stream_total_duration = duration - - for segment in media_markers: - segment.start = segment.start - timedelta(seconds=seek_head) - segment.end = segment.end - timedelta(seconds=seek_head) - - ffmpeg_cmd = VideoUtils.async_ffmpeg_init() - ffmpeg_cmd.input(local_m3u8_path, - protocol_whitelist="file,http,https,tcp,tls") - filter_complex: List[str] = [] - - temp_outputs: List[str] = [] - if not output_path: - output_path = FileUtils.file_path_extend(media_path, "slice") - if not output_path.endswith(".mp4"): - output_path = output_path + ".mp4" - os.makedirs(os.path.dirname(output_path), exist_ok=True) - - diff_tolerance = 0.001 - - for index, marker in enumerate(media_markers): - if marker.start.total_seconds() > stream_total_duration or marker.start.total_seconds() < 0: - raise ValueError( - f"第{index}个切割点起始点{marker.start.total_seconds()}s超出视频时长[0-{stream_total_duration}s]范围") - if marker.end.total_seconds() > stream_total_duration or marker.end.total_seconds() < 0: - if marker.end.total_seconds() > 0 and math.isclose(marker.end.total_seconds(), stream_total_duration, - rel_tol=diff_tolerance): - marker.end = TimeDelta(seconds=stream_total_duration) - logger.warning( - f"第{index}个切割点结束点{marker.end.total_seconds()}s接近视频时长[0-{stream_total_duration}s]范围") - filter_complex.extend( - [ - f"[v:0]trim=start={marker.start.total_seconds()},setpts=PTS-STARTPTS[cut{index}]", - f"[a:0]atrim=start={marker.start.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]", - ] - ) - else: - raise ValueError( - f"第{index}个切割点结束点{marker.end.total_seconds()}s超出视频时长[0-{stream_total_duration}s]范围") - else: - filter_complex.extend( - [ - f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},setpts=PTS-STARTPTS[cut{index}]", - f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]", - ] - ) - - ffmpeg_cmd.option('filter_complex', ';'.join(filter_complex)) - for i, marker in enumerate(media_markers): - output_filepath = FileUtils.file_path_extend(output_path, str(i)) - ffmpeg_options = { - "map": [f"[cut{i}]", f"[acut{i}]"], - "reset_timestamps": "1", - "sc_threshold": "0", - "g": "1", - "force_key_frames": "expr:gte(t,n_forced*1)", - "vcodec": "libx264", - "acodec": "aac", - "crf": options.crf, - "r": options.fps - } - if options.limit_size: - ffmpeg_options["fs"] = options.pretty_limit_size - elif options.bit_rate: - ffmpeg_options["b:v"] = options.pretty_bit_rate - ffmpeg_cmd.output(output_filepath, options=ffmpeg_options) - temp_outputs.append(output_filepath) - await ffmpeg_cmd.execute() - VideoUtils.purge_temp_ts_dir(temp_dir) - outputs: List[Tuple[str, VideoMetadata]] = [(output, VideoUtils.ffprobe_media_metadata(output)) for output in - temp_outputs] - return outputs - - @staticmethod - async def ffmpeg_convert_stream_media(media_stream_url: str, - output_path: Optional[str] = None) -> Tuple[str, VideoMetadata]: - if not output_path: - output_path = FileUtils.file_path_extend(media_stream_url, "convert") - if not output_path.endswith(".mp4"): - output_path = output_path + ".mp4" - os.makedirs(os.path.dirname(output_path), exist_ok=True) - - ffmpeg_cmd = VideoUtils.async_ffmpeg_init() - ffmpeg_cmd.input(media_stream_url, - protocol_whitelist="file,http,https,tcp,tls", - reconnect="1", # 自动重连 - reconnect_streamed="1", - reconnect_delay_max="5") - ffmpeg_cmd.output(output_path, - reset_timestamps="1", - sc_threshold="0", - g="1", - force_key_frames="expr:gte(t,n_forced*1)", - vcodec="libx264", - acodec="aac", - crf=16, - r=30, ) - await ffmpeg_cmd.execute() - output: Tuple[str, VideoMetadata] = (output_path, VideoUtils.ffprobe_media_metadata(output_path)) - return output - @staticmethod async def async_download_file(url: str, output_path: Optional[str] = None) -> str | None | Any: t = 10 @@ -770,8 +611,8 @@ class VideoUtils: logger.exception(e) @staticmethod - async def ffmpeg_convert_stream_media_multithread(media_stream_url: str, options: FFMPEGSliceOptions, - output_path: Optional[str] = None) -> tuple[ + async def ffmpeg_convert_stream_media(media_stream_url: str, options: FFMPEGSliceOptions, + output_path: Optional[str] = None) -> tuple[ str, VideoMetadata] | None: if not output_path: output_path = FileUtils.file_path_extend(media_stream_url, "convert") diff --git a/src/cluster/ffmpeg_apps/convert_stream.py b/src/cluster/ffmpeg_apps/convert_stream.py index d202731..a7653f3 100644 --- a/src/cluster/ffmpeg_apps/convert_stream.py +++ b/src/cluster/ffmpeg_apps/convert_stream.py @@ -45,7 +45,7 @@ with ffmpeg_worker_image.imports(): raise NotImplementedError(f"暂不支持使用{media_stream.protocol.value}协议") output_path = f"{output_path_prefix}/{config.modal_environment}/convert_stream/{func_id}/output.mp4" - local_output, metadata = await VideoUtils.ffmpeg_convert_stream_media_multithread( + local_output, metadata = await VideoUtils.ffmpeg_convert_stream_media( media_stream_url=stream_url, options=options, output_path=output_path) diff --git a/src/cluster/ffmpeg_apps/slice_media.py b/src/cluster/ffmpeg_apps/slice_media.py index abec939..62a9609 100644 --- a/src/cluster/ffmpeg_apps/slice_media.py +++ b/src/cluster/ffmpeg_apps/slice_media.py @@ -37,53 +37,47 @@ with ffmpeg_worker_image.imports(): List[FFMPEGResult], Optional[SentryTransactionInfo]]: fn_id = current_function_call_id() - @SentryUtils.sentry_tracker(name="视频文件切割任务", op="ffmpeg.slice.media", fn_id=fn_id, + @SentryUtils.sentry_tracker(name="视频文件/直播切割任务", op="ffmpeg.slice.media", fn_id=fn_id, sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None, sentry_baggage=sentry_trace.x_baggage if sentry_trace else None) @SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id) async def ffmpeg_slice_process(media_source: MediaSource, media_markers: List[FFMpegSliceSegment], - fn_id: str, options: FFMPEGSliceOptions = None, + fn_id: str, options: FFMPEGSliceOptions = None, is_streams: bool = False, ) -> List[FFMPEGResult]: - cache_filepath = f"{s3_mount}/{media_source.cache_filepath}" - logger.info(f"从{media_source.urn}切割") - for i, marker in enumerate(media_markers): - logger.info(f"[{i}] {marker.start.toFormatStr()} --> {marker.end.toFormatStr()}") + if not is_streams: + cache_filepath = f"{s3_mount}/{media_source.cache_filepath}" + logger.info(f"从{media_source.urn}切割") + for i, marker in enumerate(media_markers): + logger.info(f"[{i}] {marker.start.toFormatStr()} --> {marker.end.toFormatStr()}") + else: + cache_filepath = media_source.path + segments = await VideoUtils.ffmpeg_slice_media(media_path=cache_filepath, media_markers=media_markers, options=options, + is_streams=is_streams, output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4") return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1], content_length=FileUtils.get_file_size(segment[0])) for segment in segments] - @SentryUtils.sentry_tracker(name="直播视频切割任务", op="ffmpeg.slice.stream", fn_id=fn_id, - sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None, - sentry_baggage=sentry_trace.x_baggage if sentry_trace else None) - @SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id) - async def ffmpeg_hls_slice_process(media_source: MediaSource, media_markers: List[FFMpegSliceSegment], - fn_id: str, options: FFMPEGSliceOptions, ) -> List[FFMPEGResult]: - hls_m3u8_url = media_source.path - segments = await VideoUtils.ffmpeg_slice_stream_media_multithread(media_path=hls_m3u8_url, - options=options, - media_markers=media_markers, - output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4") - return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1], - content_length=FileUtils.get_file_size(segment[0])) for segment in segments] - match media.protocol: case MediaProtocol.hls: - outputs = await ffmpeg_hls_slice_process(media_source=media, - media_markers=markers, - options=options, - fn_id=fn_id) + outputs = await ffmpeg_slice_process(media_source=media, + media_markers=markers, + options=options, + is_streams=True, + fn_id=fn_id) case MediaProtocol.vod: outputs = await ffmpeg_slice_process(media_source=media, media_markers=markers, options=options, + is_streams=False, fn_id=fn_id) case MediaProtocol.s3: outputs = await ffmpeg_slice_process(media_source=media, media_markers=markers, options=options, + is_streams=False, fn_id=fn_id) case _: # webhook不会报错,需要确认 raise NotImplementedError("暂不支持的协议")