From c69509b5b593cfc65226b35d02c8e68cbd3f290b Mon Sep 17 00:00:00 2001 From: "shuohigh@gmail.com" Date: Wed, 4 Jun 2025 12:37:13 +0800 Subject: [PATCH] =?UTF-8?q?-=20KVCache=E7=B1=BB=E6=94=B9=E4=B8=BA=E5=8F=AF?= =?UTF-8?q?=E6=8B=93=E5=B1=95=EF=BC=8C=E5=9F=BA=E4=BA=8E=E7=8E=AF=E5=A2=83?= =?UTF-8?q?=E5=8F=98=E9=87=8F=E8=AE=BE=E7=BD=AEKV=20space=20-=20test=20?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E9=85=8D=E7=BD=AE=E4=B8=8ECF=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=8E=AF=E5=A2=83=E5=AF=B9=E9=BD=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .runtime.env | 6 +- src/BowongModalFunctions/api.py | 6 +- src/BowongModalFunctions/config.py | 3 + src/BowongModalFunctions/models/web_model.py | 11 ++ src/BowongModalFunctions/router/cache.py | 27 +++-- src/BowongModalFunctions/router/ffmpeg.py | 36 ++++-- src/BowongModalFunctions/utils/KVCache.py | 114 ++++++++++--------- src/BowongModalFunctions/utils/VideoUtils.py | 42 ++++++- src/cluster/app.py | 4 +- src/cluster/ffmpeg_app.py | 62 ++++++++-- src/cluster/video.py | 11 +- 11 files changed, 229 insertions(+), 93 deletions(-) diff --git a/.runtime.env b/.runtime.env index ef38cdc..6170374 100644 --- a/.runtime.env +++ b/.runtime.env @@ -1,6 +1,8 @@ -MODAL_ENVIRONMENT=prod +MODAL_ENVIRONMENT=test modal_app_name=bowong-ai-video S3_mount_dir=/mntS3 S3_bucket_name=modal-media-cache S3_region=ap-northeast-2 -S3_cdn_endpoint=https://d2nj71io21vkj2.cloudfront.net \ No newline at end of file +S3_cdn_endpoint=https://d2nj71io21vkj2.cloudfront.net +#CF_KV_namespace_id=f24d396e0daa418e89a1d7074b435c24 +CF_KV_namespace_id=527a61fea05543f2a49d62889ba868c5 \ No newline at end of file diff --git a/src/BowongModalFunctions/api.py b/src/BowongModalFunctions/api.py index f9d0ad1..4875fd1 100644 --- a/src/BowongModalFunctions/api.py +++ b/src/BowongModalFunctions/api.py @@ -5,7 +5,7 @@ from sentry_sdk.integrations.loguru import LoguruIntegration, LoggingLevels from sentry_sdk.integrations.fastapi import FastApiIntegration from fastapi.middleware.cors import CORSMiddleware -from .utils.KVCache import KVCache +from .utils.KVCache import MediaSourceKVCache from .router import ffmpeg, cache, comfyui, google, task from .config import WorkerConfig @@ -32,7 +32,9 @@ sentry_sdk.init(dsn="https://dab7b7ae652216282c89f029a76bb10a@sentry.bowongai.co FastApiIntegration() ] ) -modal_kv_cache = KVCache(kv_name=config.modal_kv_name, environment=config.modal_environment) +modal_kv_cache = MediaSourceKVCache(kv_name=config.modal_kv_name, + cf_kv_id=config.CF_KV_namespace_id, + environment=config.modal_environment, ) sentry_header_schema = { "x-trace-id": { diff --git a/src/BowongModalFunctions/config.py b/src/BowongModalFunctions/config.py index 1dfd7d0..f37298b 100644 --- a/src/BowongModalFunctions/config.py +++ b/src/BowongModalFunctions/config.py @@ -2,6 +2,7 @@ from typing import Optional, Any from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict + class WorkerConfig(BaseSettings): video_downloader_concurrency: int = Field(default=10, description="处理缓存任务的并行数") ffmpeg_worker_concurrency: int = Field(default=20, description="处理视频任务的并行数") @@ -14,6 +15,8 @@ class WorkerConfig(BaseSettings): S3_cdn_endpoint: str = Field(default="https://d2nj71io21vkj2.cloudfront.net", description="集群挂载S3存储桶的对应AWS Cloudfront CDN") + CF_KV_namespace_id: Optional[str] = Field(default=None, description="Cloudflare KV namespace ID") + modal_kv_name: str = Field(default='media-cache', description="Modal视频缓存KV库") modal_environment: str = Field(default="dev", description="Modal worker运行环境") modal_app_name: str = Field(default='bowong-ai-video', description="Modal App集群名称") diff --git a/src/BowongModalFunctions/models/web_model.py b/src/BowongModalFunctions/models/web_model.py index ab3120f..e493c8a 100644 --- a/src/BowongModalFunctions/models/web_model.py +++ b/src/BowongModalFunctions/models/web_model.py @@ -49,6 +49,10 @@ class ModalTaskResponse(BaseModel): success: bool = Field(description="任务接受成功") taskId: str = Field(description="任务Id") +class RecordingTaskResponse(BaseModel): + success: bool = Field(description="任务接受成功") + taskId: str = Field(description="任务Id") + manifest: str = Field(description="播放地址") class WebhookNotify(BaseModel): endpoint: HttpUrl = Field(description="Webhook回调端点", examples=["https://webhook.example.com?query=123"]) @@ -405,3 +409,10 @@ class FFMPEGResult(BaseModel): urn: str = Field(description="FFMPEG任务结果urn") content_length: int = Field(description="媒体资源文件字节大小(Byte)") metadata: VideoMetadata = Field(description="媒体元数据") + + +class FFMPEGStreamRecordRequest(BaseFFMPEGTaskRequest): + stream_source: str = Field(description="直播源地址") + segment_duration: int = Field(default=5, description="hls片段时长(秒)") + recording_timeout: int = Field(default=300, description="hls流无内容后等待的时长(秒)") + diff --git a/src/BowongModalFunctions/router/cache.py b/src/BowongModalFunctions/router/cache.py index 4d50f4f..4218ff9 100644 --- a/src/BowongModalFunctions/router/cache.py +++ b/src/BowongModalFunctions/router/cache.py @@ -20,13 +20,18 @@ from ..models.media_model import (MediaSources, UploadBase64Request ) from ..models.web_model import SentryTransactionInfo -from ..utils.KVCache import KVCache +from ..utils.KVCache import MediaSourceKVCache from ..utils.SentryUtils import SentryUtils config = WorkerConfig() router = APIRouter(prefix="/cache") -modal_kv_cache = KVCache(kv_name=config.modal_kv_name, environment=config.modal_environment) +if not config.CF_KV_namespace_id: + raise ValueError("未配置Cloudflare KV namespace ID") + +modal_kv_cache = MediaSourceKVCache(kv_name=config.modal_kv_name, + cf_kv_id=config.CF_KV_namespace_id, + environment=config.modal_environment) @router.post("/", @@ -91,10 +96,16 @@ async def cache(medias: MediaSources) -> CacheResult: async with asyncio.TaskGroup() as group: tasks = [group.create_task(cache_handler(media)) for media in medias.inputs] - cache_task_result = [task.result() for task in tasks] + cache_task_result_dict = {} + cache_task_result_list = [] - KVCache.batch_update_cloudflare_kv(cache_task_result) - return CacheResult(caches={media.urn: media for media in cache_task_result}) + for task in tasks: + result = task.result() + cache_task_result_dict[result.urn] = result.model_dump_json() + cache_task_result_list.append(result) + + modal_kv_cache.batch_update_cloudflare_kv(cache_task_result_dict) + return CacheResult(caches={media.urn: media for media in cache_task_result_list}) @router.delete("/", @@ -119,7 +130,7 @@ async def purge_media_kv_file(medias: MediaSources): tasks = [group.create_task(purge_handle(media)) for media in medias.inputs] keys = [task.result() for task in tasks] - KVCache.batch_remove_cloudflare_kv(keys) + modal_kv_cache.batch_remove_cloudflare_kv(keys) return JSONResponse(content={"success": True, "keys": keys}) @@ -172,7 +183,7 @@ async def purge_kv(medias: MediaSources): for media in medias.inputs: modal_kv_cache.pop(media.urn) keys = [media.urn for media in medias.inputs] - KVCache.batch_remove_cloudflare_kv(keys) + modal_kv_cache.batch_remove_cloudflare_kv(keys) return JSONResponse(content={"success": True, "keys": keys}) except Exception as e: return JSONResponse(content={"success": False, "error": str(e)}) @@ -201,7 +212,7 @@ async def purge_media(medias: MediaSources): tasks = [group.create_task(purge_handle(media)) for media in medias.inputs] keys = [task.result() for task in tasks] - KVCache.batch_remove_cloudflare_kv(keys) + modal_kv_cache.batch_remove_cloudflare_kv(keys) return JSONResponse(content={"success": True, "keys": keys}) diff --git a/src/BowongModalFunctions/router/ffmpeg.py b/src/BowongModalFunctions/router/ffmpeg.py index 8dbaa5d..7ba8b68 100644 --- a/src/BowongModalFunctions/router/ffmpeg.py +++ b/src/BowongModalFunctions/router/ffmpeg.py @@ -1,7 +1,7 @@ from typing import Annotated, cast import modal -from fastapi import APIRouter, Depends, Header +from fastapi import APIRouter, Depends, Header, UploadFile, File from fastapi.responses import Response from loguru import logger from starlette import status @@ -21,17 +21,7 @@ from ..models.web_model import (FFMPEGSliceRequest, SentryTransactionHeader, FFMPEGVideoLoopFillAudioRequest, FFMPEGConvertStreamRequest, FFMPEGExtractFrameRequest, - FFMPEGExtractFrameStatusResponse, - FFMPEGConvertStreamResponse, - FFMPEGSliceTaskStatusResponse, - FFMPEGConcatTaskStatusResponse, - FFMPEGExtractAudioTaskStatusResponse, - FFMPEGCornerMirrorTaskStatusResponse, - FFMPEGOverlayGifTaskStatusResponse, - FFMPEGSubtitleTaskStatusResponse, - FFMPEGZoomLoopTaskStatusResponse, - FFMPEGVideoLoopFillAudioResponse, - FFMPEGMixBgmWithNoiseReduceStatusResponse, BaseFFMPEGTaskStatusResponse) + BaseFFMPEGTaskStatusResponse, FFMPEGStreamRecordRequest, RecordingTaskResponse) config = WorkerConfig() @@ -209,3 +199,25 @@ async def video_extract_frame(body: FFMPEGExtractFrameRequest, sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) fn_call = fn.spawn(media=body.video, frame_index=body.frame_index, sentry_trace=sentry_trace, webhook=body.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) + + +@router.post("/record/hls", summary="发起直播录制为HLS任务", ) +async def stream_record_vod(body: FFMPEGStreamRecordRequest, + headers: Annotated[SentryTransactionHeader, Header()]) -> RecordingTaskResponse: + fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_stream_record_as_hls", + environment_name=config.modal_environment) + sentry_trace = None + if headers.x_trace_id and headers.x_baggage: + sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) + fn_call = fn.spawn(stream_url=body.stream_source, + segment_duration=body.segment_duration, + recording_timeout=body.recording_timeout, + sentry_trace=sentry_trace, + webhook=body.webhook) + manifest_link = f"{config.S3_cdn_endpoint}/{config.modal_environment}/records/hls/{fn_call.object_id}/playlist.m3u8" + return RecordingTaskResponse(success=True, taskId=fn_call.object_id, manifest=manifest_link) + + +@router.put('/record/hls/{fn_id}', summary="通过HTTP PUT请求更新正在录制中的playlist.m3u8", ) +async def stream_playlist_update(fn_id: str, playlist_file: Annotated[UploadFile, File(...)]): + raise NotImplementedError diff --git a/src/BowongModalFunctions/utils/KVCache.py b/src/BowongModalFunctions/utils/KVCache.py index 26c40db..5ecbe70 100644 --- a/src/BowongModalFunctions/utils/KVCache.py +++ b/src/BowongModalFunctions/utils/KVCache.py @@ -1,24 +1,75 @@ import os -from typing import Optional, List - +from typing import Optional, List, Dict import httpx import modal from loguru import logger +from .VideoUtils import VideoUtils +from ..models.media_model import MediaSource -from .VideoUtils import VideoMetadata, VideoUtils -from ..models.media_model import MediaSource, MediaProtocol -cf_account_id = os.environ.get("CF_ACCOUNT_ID") -cf_kv_api_token = os.environ.get("CF_KV_API_TOKEN") -cf_kv_namespace_id = os.environ.get("CF_KV_NAMESPACE_ID") +# cf_account_id = os.environ.get("CF_ACCOUNT_ID") +# cf_kv_api_token = os.environ.get("CF_KV_API_TOKEN") +# cf_kv_namespace_id = os.environ.get("CF_KV_NAMESPACE_ID") class KVCache: kv: modal.Dict + cf_kv_id: str + cf_account_id: str = os.environ.get("CF_ACCOUNT_ID") + cf_kv_api_token: str = os.environ.get("CF_KV_API_TOKEN") - def __init__(self, kv_name: str, environment: str): + def __init__(self, kv_name: str, cf_kv_id: str, environment: str): + self.cf_kv_id = cf_kv_id self.kv = modal.Dict.from_name(kv_name, environment_name=environment, create_if_missing=True) + def batch_update_cloudflare_kv(self, caches: Dict[str, str]): + with httpx.Client() as client: + try: + response = client.put( + f"https://api.cloudflare.com/client/v4/accounts/{self.cf_account_id}/storage/kv/namespaces/{self.cf_kv_id}/bulk", + headers={"Authorization": f"Bearer {self.cf_kv_api_token}"}, + json=[ + { + "based64": False, + "key": key, + "value": value, + } + for (key, value) in caches.items() + ] + ) + response.raise_for_status() + except httpx.RequestError as e: + logger.error(f"An error occurred while put kv to cloudflare") + raise e + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error occurred while get kv from cloudflare {str(e)}") + raise e + except Exception as e: + logger.error(f"An unexpected error occurred: {str(e)}") + raise e + + def batch_remove_cloudflare_kv(self, keys: List[str]): + with httpx.Client() as client: + try: + response = client.post( + f"https://api.cloudflare.com/client/v4/accounts/{self.cf_account_id}/storage/kv/namespaces/{self.cf_kv_id}/bulk/delete", + headers={"Authorization": f"Bearer {self.cf_kv_api_token}"}, + json=keys + ) + response.raise_for_status() + except httpx.RequestError as e: + logger.error(f"An error occurred while put kv to cloudflare") + raise e + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error occurred while get kv from cloudflare {str(e)}") + raise e + except Exception as e: + logger.error(f"An unexpected error occurred: {str(e)}") + raise e + + +class MediaSourceKVCache(KVCache): + def get_cache(self, urn: str) -> Optional[MediaSource]: cache_json = self.kv.get(urn) if not cache_json: @@ -52,50 +103,3 @@ class KVCache: raise KeyError("URN错误,资源不存在") return None return MediaSource.model_validate_json(cache_json) - - @staticmethod - def batch_update_cloudflare_kv(caches: List[MediaSource]): - with httpx.Client() as client: - try: - response = client.put( - f"https://api.cloudflare.com/client/v4/accounts/{cf_account_id}/storage/kv/namespaces/{cf_kv_namespace_id}/bulk", - headers={"Authorization": f"Bearer {cf_kv_api_token}"}, - json=[ - { - "based64": False, - "key": cache.urn, - "value": cache.model_dump_json(), - } - for cache in caches - ] - ) - response.raise_for_status() - except httpx.RequestError as e: - logger.error(f"An error occurred while put kv to cloudflare") - raise e - except httpx.HTTPStatusError as e: - logger.error(f"HTTP error occurred while get kv from cloudflare {str(e)}") - raise e - except Exception as e: - logger.error(f"An unexpected error occurred: {str(e)}") - raise e - - @staticmethod - def batch_remove_cloudflare_kv(keys: List[str]): - with httpx.Client() as client: - try: - response = client.post( - f"https://api.cloudflare.com/client/v4/accounts/{cf_account_id}/storage/kv/namespaces/{cf_kv_namespace_id}/bulk/delete", - headers={"Authorization": f"Bearer {cf_kv_api_token}"}, - json=keys - ) - response.raise_for_status() - except httpx.RequestError as e: - logger.error(f"An error occurred while put kv to cloudflare") - raise e - except httpx.HTTPStatusError as e: - logger.error(f"HTTP error occurred while get kv from cloudflare {str(e)}") - raise e - except Exception as e: - logger.error(f"An unexpected error occurred: {str(e)}") - raise e diff --git a/src/BowongModalFunctions/utils/VideoUtils.py b/src/BowongModalFunctions/utils/VideoUtils.py index 5654128..658f8bf 100644 --- a/src/BowongModalFunctions/utils/VideoUtils.py +++ b/src/BowongModalFunctions/utils/VideoUtils.py @@ -1,4 +1,4 @@ -from typing import Union, List, Tuple, Optional +from typing import Union, List, Tuple, Optional, Dict import numpy as np import json, os import math @@ -670,7 +670,7 @@ class VideoUtils: ] ffmpeg_cmd = VideoUtils.async_ffmpeg_init() ffmpeg_cmd.input(media_path) - ffmpeg_cmd.input(overlay_gif_path, stream_loop=-1) # 使用stream_loop让GIF循环直到视频结束 + ffmpeg_cmd.input(overlay_gif_path, stream_loop=-1) # 使用stream_loop让GIF循环直到视频结束 ffmpeg_cmd.output(output_path, options={"filter_complex": ";".join(filter_complex), }, map=["[v]", "0:a"], @@ -845,3 +845,41 @@ class VideoUtils: await ffmpeg_cmd.execute() image_metadata = VideoUtils.ffprobe_media_metadata(output_path) return output_path, image_metadata + + @staticmethod + async def ffmpeg_stream_record_as_hls(stream_url: str, + segments_output_dir: str, + playlist_output_dir: str, + playlist_output_method: Optional[str] = None, + playlist_output_headers: Optional[Dict[str, str]] = None, + manifest_segment_prefix: Optional[str] = None, + segment_duration: float = 5.0, + stream_content_timeout: int = 300, + output_file_pattern: str = "%10d.ts"): + os.makedirs(segments_output_dir, exist_ok=True) + ffmpeg_cmd = VideoUtils.async_ffmpeg_init() + ffmpeg_cmd.input(stream_url, + protocol_whitelist="file,http,https,tcp,tls", # 使用flv + reconnect="1", # 自动重连 + reconnect_at_eof="1", + reconnect_streamed="1", + reconnect_delay_max="5") + ffmpeg_cmd.output( + f"{playlist_output_dir}/playlist.m3u8", + f="hls", + method=playlist_output_method, + headers="\\r\\n".join([f"{header_key}:{playlist_output_headers[header_key]}" for header_key in + playlist_output_headers.keys()]) if playlist_output_headers else None, + hls_time=segment_duration, + hls_segment_filename=f"{segments_output_dir}/{output_file_pattern}", + hls_segment_type="mpegts", + hls_flags="append_list+independent_segments+program_date_time", + hls_base_url=manifest_segment_prefix if manifest_segment_prefix else None, + hls_playlist_type="event", + hls_list_size=0, + hls_start_number_source="epoch_us", + timeout=stream_content_timeout, + c="copy", + ) + await ffmpeg_cmd.execute() + logger.info(f'停止录制') diff --git a/src/cluster/app.py b/src/cluster/app.py index 344e116..eaa291f 100644 --- a/src/cluster/app.py +++ b/src/cluster/app.py @@ -16,5 +16,5 @@ app = modal.App(config.modal_app_name, app.include(media_app) app.include(ffmpeg_app) app.include(web_app) -app.include(comfyui_v1_app) -app.include(comfyui_v2_app) +# app.include(comfyui_v1_app) +# app.include(comfyui_v2_app) diff --git a/src/cluster/ffmpeg_app.py b/src/cluster/ffmpeg_app.py index 26481cc..e47cbb7 100644 --- a/src/cluster/ffmpeg_app.py +++ b/src/cluster/ffmpeg_app.py @@ -44,7 +44,8 @@ with ffmpeg_worker_image.imports(): ) - @backoff.on_exception(wait_gen=backoff.constant, exception=Exception, max_tries=5, max_time=60, raise_on_giveup=True) + @backoff.on_exception(wait_gen=backoff.constant, exception=Exception, max_tries=5, max_time=60, + raise_on_giveup=True) def local_copy_to_s3(local_outputs: List[str]) -> List[str]: s3_outputs = [] for output in local_outputs: @@ -330,7 +331,7 @@ with ffmpeg_worker_image.imports(): async def ffmpeg_bgm_nosie_reduce(media: MediaSource, bgm: MediaSource, noise_sample: Optional[MediaSource] = None, video_volume: float = 1.4, music_volume: float = 0.1, sentry_trace: Optional[SentryTransactionInfo] = None, - webhook: Optional[WebhookNotify] = None)-> Tuple[ + webhook: Optional[WebhookNotify] = None) -> Tuple[ FFMPEGResult, Optional[SentryTransactionInfo]]: fn_id = current_function_call_id() @@ -381,7 +382,7 @@ with ffmpeg_worker_image.imports(): @modal.concurrent(max_inputs=1) async def ffmpeg_subtitle_apply(media: MediaSource, subtitle: MediaSource, fonts: List[MediaSource], sentry_trace: Optional[SentryTransactionInfo] = None, - webhook: Optional[WebhookNotify] = None)-> Tuple[ + webhook: Optional[WebhookNotify] = None) -> Tuple[ FFMPEGResult, Optional[SentryTransactionInfo]]: fn_id = current_function_call_id() @@ -425,7 +426,7 @@ with ffmpeg_worker_image.imports(): @modal.concurrent(max_inputs=1) async def ffmpeg_loop_fill(media: MediaSource, audio: MediaSource, sentry_trace: Optional[SentryTransactionInfo] = None, - webhook: Optional[WebhookNotify] = None)-> Tuple[ + webhook: Optional[WebhookNotify] = None) -> Tuple[ FFMPEGResult, Optional[SentryTransactionInfo]]: fn_id = current_function_call_id() @@ -464,7 +465,7 @@ with ffmpeg_worker_image.imports(): @modal.concurrent(max_inputs=1) async def ffmpeg_convert_stream(media: MediaSource, sentry_trace: Optional[SentryTransactionInfo] = None, - webhook: Optional[WebhookNotify] = None)-> Tuple[ + webhook: Optional[WebhookNotify] = None) -> Tuple[ FFMPEGResult, Optional[SentryTransactionInfo]]: fn_id = current_function_call_id() @@ -503,7 +504,7 @@ with ffmpeg_worker_image.imports(): @modal.concurrent(max_inputs=1) async def ffmpeg_extract_frame(media: MediaSource, frame_index: int, sentry_trace: Optional[SentryTransactionInfo] = None, - webhook: Optional[WebhookNotify] = None)-> Tuple[ + webhook: Optional[WebhookNotify] = None) -> Tuple[ FFMPEGResult, Optional[SentryTransactionInfo]]: fn_id = current_function_call_id() @@ -512,7 +513,15 @@ with ffmpeg_worker_image.imports(): sentry_baggage=sentry_trace.x_baggage if sentry_trace else None) @SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id) async def ffmpeg_process(media: MediaSource, frame_index: int, func_id: str) -> FFMPEGResult: - video_path = f"{s3_mount}/{media.cache_filepath}" + match media.protocol: + case MediaProtocol.hls: + video_path = f"{media.path}" + case MediaProtocol.s3: + video_path = f"{s3_mount}/{media.cache_filepath}" + case MediaProtocol.vod: + video_path = f"{s3_mount}/{media.cache_filepath}" + case _: + raise NotImplementedError(f"暂不支持此协议") output_path = f"{output_path_prefix}/{config.modal_environment}/extract_frame/{func_id}/output.jpg" local_output, metadata = await VideoUtils.ffmpeg_extract_frame_image(video_path=video_path, frame_index=frame_index, @@ -525,3 +534,42 @@ with ffmpeg_worker_image.imports(): sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(), x_baggage=sentry_sdk.get_baggage()) return result, sentry_trace + + + @app.function(timeout=43200, # 最长处理12h的录制任务 + cloud="aws", + max_containers=config.ffmpeg_worker_concurrency, + volumes={ + s3_mount: modal.CloudBucketMount( + bucket_name=config.S3_bucket_name, + secret=modal.Secret.from_name("aws-s3-secret", + environment_name=config.modal_environment), + ), + }, ) + @modal.concurrent(max_inputs=1) + async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int, + webhook: Optional[WebhookNotify] = None, + sentry_trace: Optional[SentryTransactionInfo] = None, ): + fn_id = current_function_call_id() + output_dir = f"{config.modal_environment}/records/hls/{fn_id}" + recording_output_dir = f"{s3_mount}/{output_dir}" + manifest_segment_prefix = f"{config.S3_cdn_endpoint}/{recording_output_dir}" + + async def ffmpeg_process(stream_url: str, output_dir: Optional[str] = None, + manifest_segment_prefix: Optional[str] = None, + segment_duration: float = 5.0, + stream_content_timeout: int = 300, ): + await VideoUtils.ffmpeg_stream_record_as_hls(stream_url=stream_url, + segment_duration=segment_duration, + stream_content_timeout=stream_content_timeout, + segments_output_dir=output_dir, + playlist_output_dir=output_dir, + manifest_segment_prefix=manifest_segment_prefix) + # todo : add monitor task for webhook on each segment finish writing io + # todo : handler local hls to single mp4 file transcode task after stream is finish recording + + await ffmpeg_process(stream_url=stream_url, + segment_duration=segment_duration, + stream_content_timeout=recording_timeout, + output_dir=recording_output_dir, + manifest_segment_prefix=manifest_segment_prefix, ) diff --git a/src/cluster/video.py b/src/cluster/video.py index 73083f4..5f7a0fd 100644 --- a/src/cluster/video.py +++ b/src/cluster/video.py @@ -4,6 +4,7 @@ from dotenv import dotenv_values downloader_image = ( modal.Image .debian_slim(python_version="3.11") + .apt_install('ffmpeg') .pip_install_from_pyproject("../pyproject.toml") .env(dotenv_values("../.runtime.env")) .add_local_python_source('cluster') @@ -33,7 +34,7 @@ with downloader_image.imports(): from tencentcloud.vod.v20180717 import models as vod_request_models from BowongModalFunctions.config import WorkerConfig - from BowongModalFunctions.utils.KVCache import KVCache + from BowongModalFunctions.utils.KVCache import MediaSourceKVCache from BowongModalFunctions.models.media_model import MediaSource, MediaCacheStatus, MediaProtocol from BowongModalFunctions.models.web_model import SentryTransactionInfo @@ -53,7 +54,9 @@ with downloader_image.imports(): cf_kv_api_token = os.environ.get("CF_KV_API_TOKEN") cf_kv_namespace_id = os.environ.get("CF_KV_NAMESPACE_ID") - modal_kv_cache = KVCache(kv_name=config.modal_kv_name, environment=config.modal_environment) + modal_kv_cache = MediaSourceKVCache(kv_name=config.modal_kv_name, + cf_kv_id=config.CF_KV_namespace_id, + environment=config.modal_environment) @sentry_sdk.trace @@ -272,6 +275,7 @@ with downloader_image.imports(): modal_kv.set_cache(media) batch_update_cloudflare_kv([media]) process_span.set_status("failed") + raise e case MediaProtocol.http: try: cache_filepath = f"{config.S3_mount_dir}/{media.protocol.value}/{media.cache_filepath}" @@ -283,6 +287,7 @@ with downloader_image.imports(): modal_kv.set_cache(media) batch_update_cloudflare_kv([media]) process_span.set_status("failed") + raise e case MediaProtocol.s3: # 本地挂载缓存 if media.protocol == MediaProtocol.s3 and media.endpoint == config.S3_region and media.bucket == config.S3_bucket_name: @@ -291,7 +296,7 @@ with downloader_image.imports(): logger.error("protocol not yet supported") case _: process_span.set_status("failed") - logger.error(f"protocol not yet supported") + raise NotImplementedError("protocol not yet supported") media.downloader_id = fn_id media.status = MediaCacheStatus.ready if volume_cache_path else MediaCacheStatus.failed media.progress = 1 if volume_cache_path else 0