diff --git a/src/BowongModalFunctions/router/ffmpeg.py b/src/BowongModalFunctions/router/ffmpeg.py index e8a8f06..77e9307 100644 --- a/src/BowongModalFunctions/router/ffmpeg.py +++ b/src/BowongModalFunctions/router/ffmpeg.py @@ -204,7 +204,9 @@ async def video_extract_frame(body: FFMPEGExtractFrameRequest, return ModalTaskResponse(success=True, taskId=fn_call.object_id) -@router.post("/record/hls", summary="发起直播录制为HLS任务", ) +@router.post("/record/hls", summary="发起直播录制为HLS任务", + description="录制任务最长存在时间为12小时, 录制任务完成后所获得的hls视频资源只存在3天,如需持久存储请通过使用manifest_urn发起'直播流转换任务'转换为mp4存储", + response_description="返回发起的任务id和成功录制完第一个片段后通过CDN缓存获得的m3u8播放列表") async def stream_record_vod(body: FFMPEGStreamRecordRequest, headers: Annotated[SentryTransactionHeader, Header()]) -> RecordingTaskResponse: fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_stream_record_as_hls", diff --git a/src/BowongModalFunctions/utils/PathUtils.py b/src/BowongModalFunctions/utils/PathUtils.py index a1609ef..f71bbfa 100644 --- a/src/BowongModalFunctions/utils/PathUtils.py +++ b/src/BowongModalFunctions/utils/PathUtils.py @@ -1,4 +1,6 @@ import os +from pathlib import Path +from typing import List class FileUtils: @@ -27,3 +29,22 @@ class FileUtils: filenames[-1] = extension filename = ".".join(filenames) return os.path.join(media_dir, filename) + + @staticmethod + def get_folder_size(folder_path: str) -> int: + total_size = 0 + for path in Path(folder_path).rglob('*'): + if path.is_file(): + total_size += path.stat().st_size + return total_size + + @staticmethod + def get_file_size(file_path: str) -> int: + return os.path.getsize(file_path) + + @staticmethod + def get_files_size(files: List[str]) -> int: + total_size = 0 + for file in files: + total_size += os.path.getsize(file) + return total_size diff --git a/src/cluster/ffmpeg_app.py b/src/cluster/ffmpeg_app.py index 69eea03..1dcff1a 100644 --- a/src/cluster/ffmpeg_app.py +++ b/src/cluster/ffmpeg_app.py @@ -92,7 +92,7 @@ with ffmpeg_worker_image.imports(): output_path=output_filepath) s3_outputs = local_copy_to_s3([local_output_path]) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, - content_length=os.path.getsize(local_output_path), ) + content_length=FileUtils.get_file_size(local_output_path), ) output_path = f"{output_path_prefix}/{config.modal_environment}/concat/outputs/{fn_id}/output.mp4" result = await ffmpeg_process(media_sources=medias, output_filepath=output_path) @@ -136,7 +136,7 @@ with ffmpeg_worker_image.imports(): media_markers=media_markers, output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4") return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1], - content_length=os.path.getsize(segment[0])) for segment in segments] + content_length=FileUtils.get_file_size(segment[0])) for segment in segments] @SentryUtils.sentry_tracker(name="直播视频切割任务", op="ffmpeg.slice.stream", fn_id=fn_id, sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None, @@ -153,7 +153,7 @@ with ffmpeg_worker_image.imports(): media_markers=media_markers, output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4") return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1], - content_length=os.path.getsize(segment[0])) for segment in segments] + content_length=FileUtils.get_file_size(segment[0])) for segment in segments] match media.protocol: case MediaProtocol.hls: @@ -202,7 +202,8 @@ with ffmpeg_worker_image.imports(): output_path = f"{output_path_prefix}/{config.modal_environment}/extract_audio/outputs/{fn_id}/output.wav" output_path, metadata = await VideoUtils.ffmpeg_extract_audio_async(cache_filepath, output_path) s3_outputs = local_copy_to_s3([output_path]) - return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(output_path), ) + return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, + content_length=FileUtils.get_file_size(output_path), ) result = await ffmpeg_process(media_source, fn_id=fn_id) return result, sentry_trace @@ -244,7 +245,7 @@ with ffmpeg_worker_image.imports(): s3_outputs = local_copy_to_s3([local_output_filepath]) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, - content_length=os.path.getsize(local_output_filepath), ) + content_length=FileUtils.get_file_size(local_output_filepath), ) result = await ffmpeg_process(media=media, mirror_scale_down_size=mirror_scale_down_size, func_id=fn_id, mirror_from_right=mirror_from_right, mirror_position=mirror_position) @@ -283,7 +284,7 @@ with ffmpeg_worker_image.imports(): overlay_gif_path=gif_filepath) s3_outputs = local_copy_to_s3([local_output_filepath]) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, - content_length=os.path.getsize(local_output_filepath), ) + content_length=FileUtils.get_file_size(local_output_filepath), ) result = await ffmpeg_process(media=media, func_id=fn_id, gif=gif) if not sentry_trace: @@ -322,7 +323,7 @@ with ffmpeg_worker_image.imports(): zoom=zoom) s3_outputs = local_copy_to_s3([local_output_filepath]) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, - content_length=os.path.getsize(local_output_filepath), ) + content_length=FileUtils.get_file_size(local_output_filepath), ) result = await ffmpeg_process(media=media, duration=duration, zoom=zoom, func_id=fn_id) if not sentry_trace: @@ -372,7 +373,7 @@ with ffmpeg_worker_image.imports(): s3_outputs = local_copy_to_s3([local_output_filepath]) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, - content_length=os.path.getsize(local_output_filepath), ) + content_length=FileUtils.get_file_size(local_output_filepath), ) result = await ffmpeg_process(video=media, bgm=bgm, video_volume=video_volume, func_id=fn_id, music_volume=music_volume, noise_sample=noise_sample) @@ -418,7 +419,8 @@ with ffmpeg_worker_image.imports(): subtitle_path=subtitle_path, font_dir=font_dir, output_path=output_path) s3_outputs = local_copy_to_s3([local_output]) - return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(local_output), ) + return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, + content_length=FileUtils.get_file_size(local_output), ) result = await ffmpeg_process(video=media, subtitle=subtitle, fonts=fonts, func_id=fn_id) if not sentry_trace: @@ -455,7 +457,8 @@ with ffmpeg_worker_image.imports(): audio_path=audio_path, output_path=f"{output_path_prefix}/{config.modal_environment}/loop_fill/{func_id}/output.mp4") s3_outputs = local_copy_to_s3([local_output]) - return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(local_output), ) + return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, + content_length=FileUtils.get_file_size(local_output), ) result = await ffmpeg_process(media=media, audio=audio, func_id=fn_id) @@ -500,7 +503,8 @@ with ffmpeg_worker_image.imports(): output_path=output_path) s3_outputs = local_copy_to_s3([local_output]) - return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(local_output), ) + return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, + content_length=FileUtils.get_file_size(local_output), ) result = await ffmpeg_process(media_stream=media, func_id=fn_id) @@ -546,7 +550,8 @@ with ffmpeg_worker_image.imports(): frame_index=frame_index, output_path=output_path) s3_outputs = local_copy_to_s3([local_output]) - return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(local_output), ) + return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, + content_length=FileUtils.get_file_size(local_output), ) result = await ffmpeg_process(media=media, frame_index=frame_index, func_id=fn_id) if not sentry_trace: @@ -566,10 +571,11 @@ with ffmpeg_worker_image.imports(): environment_name=config.modal_environment), ), }, ) - @modal.concurrent(max_inputs=1) + @modal.concurrent(max_inputs=5) async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int, webhook: Optional[WebhookNotify] = None, - sentry_trace: Optional[SentryTransactionInfo] = None, ): + sentry_trace: Optional[SentryTransactionInfo] = None, ) -> Tuple[ + FFMPEGResult, Optional[SentryTransactionInfo]]: fn_id = current_function_call_id() output_dir = f"{config.modal_environment}/records/hls/{fn_id}" recording_output_dir = f"{s3_mount}/{output_dir}" @@ -588,8 +594,14 @@ with ffmpeg_worker_image.imports(): class PlaylistEventHandler(FileSystemEventHandler): update_counter: int = 0 + fn_id: str webhook: Optional[WebhookNotify] = None + def __init__(self, fn_id: str, webhook: Optional[WebhookNotify] = None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.webhook = webhook + self.fn_id = fn_id + def on_created(self, event: Union[FileCreatedEvent, DirCreatedEvent]) -> None: logger.info(f"[dir={event.is_directory}](created)@{event.src_path}") @@ -615,11 +627,17 @@ with ffmpeg_worker_image.imports(): f.write(playlist.dumps()) logger.info(f"live_playlist file written to {live_playlist}") self.update_counter += 1 - if self.webhook: - self.webhook_on_start() + if self.webhook and self.update_counter == 1: + logger.info("[Start] webhook trigger") + try: + self.webhook_on_start() + except Exception as e: + logger.exception(e) + else: + logger.info("[Start] no webhook") @backoff.on_exception(exception=Exception, wait_gen=backoff.constant, - max_time=5, max_tries=5, raise_on_giveup=False) + max_time=5, max_tries=5, raise_on_giveup=True) def webhook_on_start(self): """ 开始录制第一次更新时回调 @@ -627,12 +645,13 @@ with ffmpeg_worker_image.imports(): webhook = self.webhook if webhook.method is not WebhookMethodEnum.POST: logger.warning(f"webhook method {webhook.method.value} not supported") - fn_id = current_function_call_id() + body = BaseFFMPEGTaskStatusResponse(taskId=self.fn_id, + task_type="ffmpeg_stream_record_as_hls", + status=TaskStatus.running).model_dump() response = httpx.post(url=webhook.endpoint.__str__(), - json=BaseFFMPEGTaskStatusResponse(taskId=fn_id, - task_type="ffmpeg_stream_record_as_hls", - status=TaskStatus.running).model_dump_json(), + json=body, headers=webhook.headers) + response.raise_for_status() logger.info(f"[Start] webhook {response.status_code} {response.text}") async def ffmpeg_process(stream_url: str, @@ -650,33 +669,33 @@ with ffmpeg_worker_image.imports(): @backoff.on_exception(exception=Exception, wait_gen=backoff.constant, max_time=5, max_tries=5, raise_on_giveup=False) - async def webhook_on_end(webhook: WebhookNotify): + async def webhook_on_end(fn_id: str, webhook: WebhookNotify, result: FFMPEGResult): if webhook.method is not WebhookMethodEnum.POST: logger.warning(f"webhook method {webhook.method.value} not supported") - fn_id = current_function_call_id() + body = BaseFFMPEGTaskStatusResponse(taskId=fn_id, + task_type="ffmpeg_stream_record_as_hls", + status=TaskStatus.success, + results=[result]) response = httpx.post(url=webhook.endpoint.__str__(), - json=BaseFFMPEGTaskStatusResponse(taskId=fn_id, - task_type="ffmpeg_stream_record_as_hls", - status=TaskStatus.success).model_dump_json(), + json=body.model_dump(), headers=webhook.headers) logger.info(f"[End] webhook {response.status_code} {response.text}") @backoff.on_exception(exception=Exception, wait_gen=backoff.constant, max_time=5, max_tries=5, raise_on_giveup=False) - async def webhook_on_error(webhook: WebhookNotify, error: str): + async def webhook_on_error(fn_id: str, webhook: WebhookNotify, error: str): if webhook.method is not WebhookMethodEnum.POST: logger.warning(f"webhook method {webhook.method.value} not supported") - fn_id = current_function_call_id() + body = BaseFFMPEGTaskStatusResponse(taskId=fn_id, + task_type="ffmpeg_stream_record_as_hls", + error=error, + status=TaskStatus.failed).model_dump() response = httpx.post(url=webhook.endpoint.__str__(), - json=BaseFFMPEGTaskStatusResponse(taskId=fn_id, - task_type="ffmpeg_stream_record_as_hls", - error=error, - status=TaskStatus.failed).model_dump_json(), + json=body, headers=webhook.headers) logger.info(f"[End] webhook {response.status_code} {response.text}") - playlist_handler = PlaylistEventHandler() - playlist_handler.webhook = webhook + playlist_handler = PlaylistEventHandler(webhook=webhook, fn_id=fn_id) playlist_observer = Observer() os.makedirs(manifest_output_dir, exist_ok=True) playlist_observer.schedule(playlist_handler, @@ -695,8 +714,18 @@ with ffmpeg_worker_image.imports(): playlist_observer.stop() if webhook: await webhook_on_error(webhook=webhook, + fn_id=fn_id, error=e.message if hasattr(e, 'message') else str(e)) raise e playlist_observer.stop() + content_length = FileUtils.get_folder_size(recording_output_dir) + metadata = VideoUtils.ffprobe_media_metadata(media_path=f"{manifest_output_dir}/playlist.m3u8") + result = FFMPEGResult(urn=f"s3://{output_dir}/playlist.m3u8", + metadata=metadata, + content_length=content_length, ) if webhook: - await webhook_on_end(webhook=webhook) + await webhook_on_end(webhook=webhook, fn_id=fn_id, result=result) + if not sentry_trace: + sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(), + x_baggage=sentry_sdk.get_baggage()) + return result, sentry_trace