From cdf7ccc697bc82a42c63e92ec38394773db54ce7 Mon Sep 17 00:00:00 2001 From: "shuohigh@gmail.com" Date: Mon, 9 Jun 2025 12:37:06 +0800 Subject: [PATCH] =?UTF-8?q?fix=20:=20hls=E5=BD=95=E5=88=B6=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E8=AF=BB=E5=86=99=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/BowongModalFunctions/models/web_model.py | 1 + src/BowongModalFunctions/router/ffmpeg.py | 1 + src/BowongModalFunctions/utils/VideoUtils.py | 23 ++-- src/cluster/ffmpeg_app.py | 126 ++++++++----------- 4 files changed, 67 insertions(+), 84 deletions(-) diff --git a/src/BowongModalFunctions/models/web_model.py b/src/BowongModalFunctions/models/web_model.py index e85c899..0aee4d8 100644 --- a/src/BowongModalFunctions/models/web_model.py +++ b/src/BowongModalFunctions/models/web_model.py @@ -428,3 +428,4 @@ class FFMPEGStreamRecordRequest(BaseFFMPEGTaskRequest): stream_source: str = Field(description="直播源地址") segment_duration: int = Field(default=5, description="hls片段时长(秒)") recording_timeout: int = Field(default=300, description="hls流无内容后等待的时长(秒)") + monitor_timeout: int = Field(default=36000, description="录制监控最大时长(秒), 默认为10小时") diff --git a/src/BowongModalFunctions/router/ffmpeg.py b/src/BowongModalFunctions/router/ffmpeg.py index 4bf4f2b..72ee316 100644 --- a/src/BowongModalFunctions/router/ffmpeg.py +++ b/src/BowongModalFunctions/router/ffmpeg.py @@ -217,6 +217,7 @@ async def stream_record_vod(body: FFMPEGStreamRecordRequest, fn_call = fn.spawn(stream_url=body.stream_source, segment_duration=body.segment_duration, recording_timeout=body.recording_timeout, + monitor_timeout=body.monitor_timeout, sentry_trace=sentry_trace, webhook=body.webhook) manifest_link = f"{config.S3_cdn_endpoint}/{config.modal_environment}/records/hls/{fn_call.object_id}/playlist.m3u8" diff --git a/src/BowongModalFunctions/utils/VideoUtils.py b/src/BowongModalFunctions/utils/VideoUtils.py index ca50a58..d836cde 100644 --- a/src/BowongModalFunctions/utils/VideoUtils.py +++ b/src/BowongModalFunctions/utils/VideoUtils.py @@ -314,7 +314,7 @@ class VideoUtils: return output_path @staticmethod - def async_ffmpeg_init(use_ffprobe: bool = False) -> AsyncFFmpeg: + def async_ffmpeg_init(use_ffprobe: bool = False, quiet: bool = False) -> AsyncFFmpeg: if use_ffprobe: ffmpeg_cmd = AsyncFFmpeg('ffprobe') else: @@ -335,7 +335,8 @@ class VideoUtils: @ffmpeg_cmd.on("progress") def on_progress(progress): - logger.info(f"处理进度: {progress}") + if not quiet: + logger.info(f"处理进度: {progress}") @ffmpeg_cmd.on("completed") def on_completed(): @@ -343,11 +344,12 @@ class VideoUtils: @ffmpeg_cmd.on("stderr") def on_stderr(line: str): - if line.startswith('Error'): + if line.startswith('Error') and ".m3u8" not in line: logger.error(line) raise RuntimeError(line) else: - logger.warning(line) + if not quiet: + logger.warning(line) return ffmpeg_cmd @@ -946,15 +948,15 @@ class VideoUtils: async def ffmpeg_stream_record_as_hls(stream_url: str, segments_output_dir: str, playlist_output_dir: str, - playlist_output_method: Optional[str] = None, - playlist_output_headers: Optional[Dict[str, str]] = None, manifest_segment_prefix: Optional[str] = None, segment_duration: float = 5.0, stream_content_timeout: int = 300, + stream_monitor_timeout: int = 36000, output_file_pattern: str = "%10d.ts"): os.makedirs(segments_output_dir, exist_ok=True) - ffmpeg_cmd = VideoUtils.async_ffmpeg_init() + ffmpeg_cmd = VideoUtils.async_ffmpeg_init(quiet=True) # ffmpeg_cmd.option("loglevel", "debug") + ffmpeg_cmd.option("t", stream_monitor_timeout) ffmpeg_cmd.input(stream_url, protocol_whitelist="file,http,https,tcp,tls", # 使用flv reconnect="1", # 自动重连 @@ -966,14 +968,11 @@ class VideoUtils: f="hls", # flags="+cgop", # g=30, - method=playlist_output_method, - headers="\\r\\n".join([f"{header_key}:{playlist_output_headers[header_key]}" for header_key in - playlist_output_headers.keys()]) if playlist_output_headers else None, hls_time=segment_duration, hls_segment_filename=f"{segments_output_dir}/{output_file_pattern}", hls_segment_type="mpegts", - hls_flags="independent_segments+program_date_time", - hls_base_url=manifest_segment_prefix if manifest_segment_prefix else None, + hls_flags="append_list+independent_segments+program_date_time", + # hls_base_url=manifest_segment_prefix if manifest_segment_prefix else None, hls_playlist_type="event", hls_list_size=0, hls_start_number_source="epoch_us", diff --git a/src/cluster/ffmpeg_app.py b/src/cluster/ffmpeg_app.py index 424bed4..ee3b502 100644 --- a/src/cluster/ffmpeg_app.py +++ b/src/cluster/ffmpeg_app.py @@ -560,6 +560,10 @@ with ffmpeg_worker_image.imports(): return result, sentry_trace + hls_recording_volume = modal.Volume.from_name("stream_records", create_if_missing=True) + hls_recording_mount_point = "/mnt/stream_records" + + @app.function(timeout=43200, # 最长处理12h的录制任务 cloud="aws", # todo: 暂时不限制最大同时存在的录制数量 @@ -570,27 +574,22 @@ with ffmpeg_worker_image.imports(): secret=modal.Secret.from_name("aws-s3-secret", environment_name=config.modal_environment), ), + hls_recording_mount_point: hls_recording_volume }, ) @modal.concurrent(max_inputs=5) async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int, - webhook: Optional[WebhookNotify] = None, + webhook: Optional[WebhookNotify] = None, monitor_timeout: int = 36000, sentry_trace: Optional[SentryTransactionInfo] = None, ) -> Tuple[ FFMPEGResult, Optional[SentryTransactionInfo]]: fn_id = current_function_call_id() output_dir = f"{config.modal_environment}/records/hls/{fn_id}" - recording_output_dir = f"{s3_mount}/{output_dir}" - manifest_output_dir = f"{output_path_prefix}/{output_dir}" - logger.info(f"manifest = {manifest_output_dir}/playlist.m3u8") - manifest_segment_prefix = f"{config.S3_cdn_endpoint}/{recording_output_dir}/" - - from watchdog.events import (FileCreatedEvent, - FileModifiedEvent, - DirCreatedEvent, - DirModifiedEvent, - FileMovedEvent, - FileSystemEventHandler) + s3_mount_output_dir = f"{s3_mount}/{output_dir}" + os.makedirs(s3_mount_output_dir, exist_ok=True) + volume_output_dir = f"{hls_recording_mount_point}/{output_dir}" + os.makedirs(volume_output_dir, exist_ok=True) + logger.info(f"manifest = {volume_output_dir}/playlist.m3u8") + from watchdog.events import FileMovedEvent, FileCreatedEvent, FileSystemEventHandler from watchdog.observers import Observer - import m3u8 class PlaylistEventHandler(FileSystemEventHandler): update_counter: int = 0 @@ -602,39 +601,36 @@ with ffmpeg_worker_image.imports(): self.webhook = webhook self.fn_id = fn_id - def on_created(self, event: Union[FileCreatedEvent, DirCreatedEvent]) -> None: - logger.info(f"[dir={event.is_directory}](created)@{event.src_path}") + def on_created(self, event: FileCreatedEvent) -> None: + logger.info(f"[created] {event.src_path}") + if event.src_path.endswith(".ts"): + filename = os.path.basename(event.src_path) + mount_path = f"{s3_mount_output_dir}/{filename}" + # 将Volume内的ts文件复制到S3挂载点 + shutil.copy(event.src_path, mount_path) + logger.info(f"[copy] {event.src_path} -> {mount_path}") + else: + return - def on_modified(self, event: Union[FileModifiedEvent, DirModifiedEvent]) -> None: - logger.info(f"[dir={event.is_directory}](modified)@{event.src_path}") - - def on_moved(self, event: Union[DirMovedEvent, FileMovedEvent]) -> None: - logger.info(f"[dir={event.is_directory}](moved)@{event.src_path} -> {event.dest_path}") - if isinstance(event, FileMovedEvent): - if event.is_directory: - return - if not event.dest_path.endswith(".m3u8"): - return - playlist = m3u8.load(event.dest_path) - logger.info("playlist file loaded") - for segment in playlist.segments: - new_uri = segment.uri.replace(f"{config.S3_cdn_endpoint}//mntS3/", "") - local_uri = os.path.basename(new_uri) - logger.info(f"{segment.uri} -> {local_uri}") - segment.uri = local_uri - live_playlist = event.dest_path.replace(output_path_prefix, s3_mount) - with open(live_playlist, "w") as f: - f.write(playlist.dumps()) - logger.info(f"live_playlist file written to {live_playlist}") - self.update_counter += 1 - if self.webhook and self.update_counter == 1: - logger.info("[Start] webhook trigger") - try: - self.webhook_on_start() - except Exception as e: - logger.exception(e) - else: - logger.info("[Start] no webhook") + def on_moved(self, event: FileMovedEvent) -> None: + logger.info(f"[moved] {event.src_path} -> {event.dest_path}") + if not event.dest_path.endswith(".m3u8"): + return + filename = os.path.basename(event.dest_path) + filename.replace('.tmp', '') + mount_path = f"{s3_mount_output_dir}/{filename}" + # 将Volume内的playlist.m3u8.tmp复制到S3挂载点的playlist.m3u8文件 + shutil.copy(event.dest_path, mount_path) + logger.info(f"[copy] {event.dest_path} -> {mount_path}") + self.update_counter += 1 + if self.webhook and self.update_counter == 1: + logger.info("[Start] webhook trigger") + try: + self.webhook_on_start() + except Exception as e: + logger.exception(e) + else: + logger.info("[Start] no webhook") @backoff.on_exception(exception=Exception, wait_gen=backoff.constant, max_time=5, max_tries=5, raise_on_giveup=True) @@ -654,19 +650,6 @@ with ffmpeg_worker_image.imports(): response.raise_for_status() logger.info(f"[Start] webhook {response.status_code} {response.text}") - async def ffmpeg_process(stream_url: str, - output_dir: Optional[str] = None, - manifest_output_dir: Optional[str] = None, - manifest_segment_prefix: Optional[str] = None, - segment_duration: float = 5.0, - stream_content_timeout: int = 300, ): - await VideoUtils.ffmpeg_stream_record_as_hls(stream_url=stream_url, - segment_duration=segment_duration, - stream_content_timeout=stream_content_timeout, - segments_output_dir=output_dir, - playlist_output_dir=manifest_output_dir, - manifest_segment_prefix=manifest_segment_prefix) - @backoff.on_exception(exception=Exception, wait_gen=backoff.constant, max_time=5, max_tries=5, raise_on_giveup=False) async def webhook_on_end(fn_id: str, webhook: WebhookNotify, result: FFMPEGResult): @@ -697,29 +680,27 @@ with ffmpeg_worker_image.imports(): playlist_handler = PlaylistEventHandler(webhook=webhook, fn_id=fn_id) playlist_observer = Observer() - os.makedirs(manifest_output_dir, exist_ok=True) - playlist_observer.schedule(playlist_handler, - path=f"{manifest_output_dir}", - recursive=False) + os.makedirs(volume_output_dir, exist_ok=True) + # 监控本地Volume下录制缓存目录 + playlist_observer.schedule(playlist_handler, path=volume_output_dir, recursive=False) playlist_observer.start() try: - await ffmpeg_process(stream_url=stream_url, - segment_duration=segment_duration, - stream_content_timeout=recording_timeout, - output_dir=recording_output_dir, - manifest_output_dir=manifest_output_dir, - manifest_segment_prefix=manifest_segment_prefix, ) + await VideoUtils.ffmpeg_stream_record_as_hls(stream_url=stream_url, + segment_duration=segment_duration, + stream_content_timeout=recording_timeout, + stream_monitor_timeout=monitor_timeout, + segments_output_dir=volume_output_dir, + playlist_output_dir=volume_output_dir) except Exception as e: logger.exception(e) playlist_observer.stop() if webhook: - await webhook_on_error(webhook=webhook, - fn_id=fn_id, + await webhook_on_error(webhook=webhook, fn_id=fn_id, error=e.message if hasattr(e, 'message') else str(e)) raise e playlist_observer.stop() - content_length = FileUtils.get_folder_size(recording_output_dir) - metadata = VideoUtils.ffprobe_media_metadata(media_path=f"{manifest_output_dir}/playlist.m3u8") + content_length = FileUtils.get_folder_size(volume_output_dir) + metadata = VideoUtils.ffprobe_media_metadata(media_path=f"{volume_output_dir}/playlist.m3u8") result = FFMPEGResult(urn=f"s3://{config.S3_region}/{config.S3_bucket_name}/{output_dir}/playlist.m3u8", metadata=metadata, content_length=content_length, ) @@ -728,4 +709,5 @@ with ffmpeg_worker_image.imports(): if not sentry_trace: sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(), x_baggage=sentry_sdk.get_baggage()) + shutil.rmtree(volume_output_dir) return result, sentry_trace