diff --git a/.runtime.env b/.runtime.env index f766917..a31361e 100644 --- a/.runtime.env +++ b/.runtime.env @@ -1,4 +1,4 @@ -MODAL_ENVIRONMENT=prod +MODAL_ENVIRONMENT=test modal_app_name=bowong-ai-video S3_mount_dir=/mntS3 S3_bucket_name=modal-media-cache diff --git a/src/BowongModalFunctions/models/media_model.py b/src/BowongModalFunctions/models/media_model.py index 23783f7..9224c78 100644 --- a/src/BowongModalFunctions/models/media_model.py +++ b/src/BowongModalFunctions/models/media_model.py @@ -154,9 +154,14 @@ class MediaSource(BaseModel): @property def local_available(self) -> bool: if self.status == MediaCacheStatus.ready: - return os.path.exists(self.local_mount_path) + return self.local_exists return False + @computed_field(description="是否存在本地文件") + @property + def local_exists(self) -> bool: + return os.path.exists(self.local_mount_path) + @computed_field(description="本地挂载地址") @property def local_mount_path(self) -> str: diff --git a/src/BowongModalFunctions/models/web_model.py b/src/BowongModalFunctions/models/web_model.py index b61eeae..8d95922 100644 --- a/src/BowongModalFunctions/models/web_model.py +++ b/src/BowongModalFunctions/models/web_model.py @@ -56,6 +56,15 @@ class ModalTaskResponse(BaseModel): taskId: str = Field(description="任务Id") +class CacheDeleteTaskResponse(BaseModel): + success: bool = Field(description="运行成功") + keys: List[str] = Field(description="成功从KV和S3删除掉的URN") + non_kv_keys: List[str] = Field(alias="nonKVKeys", serialization_alias="nonKVKeys", + description="成功从S3删除的URN, 不存在于KV中") + not_found_keys: List[str] = Field(alias="notFoundKeys", serialization_alias="notFoundKeys", + description="不存在的URN") + + class RecordingTaskResponse(BaseModel): success: bool = Field(description="任务接受成功") taskId: str = Field(description="任务Id") diff --git a/src/BowongModalFunctions/router/cache.py b/src/BowongModalFunctions/router/cache.py index 0da8151..02308cb 100644 --- a/src/BowongModalFunctions/router/cache.py +++ b/src/BowongModalFunctions/router/cache.py @@ -1,7 +1,7 @@ import asyncio import datetime import os -from typing import Annotated, Optional +from typing import Annotated, Optional, List, Tuple import modal from loguru import logger @@ -22,10 +22,10 @@ from ..models.media_model import (MediaSources, DownloadResult, UploadResultResponse, UploadBase64Request, UploadPresignRequest, UploadPresignResponse, - UploadMultipartPresignRequest, UploadMultipartPresignResponse + UploadMultipartPresignRequest, UploadMultipartPresignResponse, MediaProtocol ) from ..models.web_model import SentryTransactionInfo, MonitorLiveRoomProductRequest, ModalTaskResponse, \ - LiveRoomProductCachesResponse + LiveRoomProductCachesResponse, CacheDeleteTaskResponse from ..utils.KVCache import MediaSourceKVCache, LiveProductKVCache from ..utils.SentryUtils import SentryUtils @@ -127,29 +127,55 @@ async def cache(medias: MediaSources) -> CacheResult: summary="清除指定的所有缓存", description="清除指定的所有缓存(包括KV记录和S3存储文件)", dependencies=[Depends(verify_token)]) -async def purge_media_kv_file(medias: MediaSources): +async def purge_media_kv_file(medias: MediaSources) -> CacheDeleteTaskResponse: fn_id = current_function_call_id() fn = modal.Function.from_name(config.modal_app_name, "cache_delete", environment_name=config.modal_environment) @SentryUtils.sentry_tracker(name="清除媒体源缓存", op="cache.purge", fn_id=fn_id, sentry_trace_id=None, sentry_baggage=None) - async def purge_handle(media: MediaSource): - cache_media = modal_kv_cache.pop(media.urn) - if cache_media: - deleted_cache: MediaSource = await fn.remote.aio(cache_media) - return deleted_cache.urn - return None + async def purge_handle(media: MediaSource) -> Tuple[Optional[str], int]: + try: + cache_media = modal_kv_cache.pop(media.urn) + if cache_media: + deleted_cache: MediaSource = await fn.remote.aio(cache_media) + return deleted_cache.urn, 1 + except KeyError as e: + logger.exception(e) + if media.local_exists: + deleted_cache: MediaSource = await fn.remote.aio(media) + if deleted_cache.status == MediaCacheStatus.missing: + logger.warning(f"不存在s3挂载文件 {deleted_cache.urn}") + return None, 0 + else: + logger.warning(f"s3挂载文件 {deleted_cache.urn} 已删除") + return deleted_cache.urn, 0 + return media.urn, -1 async with asyncio.TaskGroup() as group: tasks = [group.create_task(purge_handle(media)) for media in medias.inputs] - keys = [task.result() for task in tasks] + keys: List[str] = [] + non_kv_keys: List[str] = [] + error_keys: List[str] = [] + + for task in tasks: + urn, task_status = task.result() + if urn: + if task_status == 1: # 成功从kv和s3删除 + keys.append(urn) + elif task_status == 2: # 只从s3删除 + non_kv_keys.append(urn) + else: + error_keys.append(urn) + # keys = [task.result() for task in tasks] modal_kv_cache.batch_remove_cloudflare_kv(keys) - return JSONResponse(content={"success": True, "keys": keys}) + return CacheDeleteTaskResponse(success=True, keys=keys, nonKVKeys=non_kv_keys, notFoundKeys=error_keys) + # return JSONResponse(content={"success": True, "keys": keys, "nonKVKeys": non_kv_keys, "errorKeys": error_keys}) @router.post("/download", - summary="批量获取下载地址", + summary="批量获取下载地址, 返回的MediaSource类自带CDN访问URL, 不需要另外请求获取", + deprecated=True, description="获取已缓存的视频下载地址", dependencies=[Depends(verify_token)]) @sentry_sdk.trace @@ -163,7 +189,8 @@ async def download_caches(medias: MediaSources) -> DownloadResult: @router.get("/download", summary="下载已缓存的视频", - description="通过CDN下载已缓存的视频文件") + deprecated=True, + description="通过CDN下载已缓存的视频文件, 不在提供通过此接口下载,请使用CDN URL直接下载文件") @sentry_sdk.trace async def download_cache(media: str) -> RedirectResponse: cdn_endpoint = config.S3_cdn_endpoint @@ -199,32 +226,6 @@ async def purge_kv(medias: MediaSources): return JSONResponse(content={"success": False, "error": str(e)}) -@router.post("/media", - summary="清除指定的所有缓存", - description="清除指定的所有缓存(包括KV记录和S3存储文件), 将要被淘汰,使用DELETE /cache/替代", - deprecated=True, - dependencies=[Depends(verify_token)]) -async def purge_media(medias: MediaSources): - fn_id = current_function_call_id() - fn = modal.Function.from_name(config.modal_app_name, "cache_delete", environment_name=config.modal_environment) - - @SentryUtils.sentry_tracker(name="清除媒体源缓存", op="cache.purge", fn_id=fn_id, - sentry_trace_id=None, sentry_baggage=None) - async def purge_handle(media: MediaSource): - cache_media = modal_kv_cache.pop(media.urn) - if cache_media: - deleted_cache: MediaSource = await fn.remote.aio(cache_media) - return deleted_cache.urn - return None - - async with asyncio.TaskGroup() as group: - tasks = [group.create_task(purge_handle(media)) for media in medias.inputs] - - keys = [task.result() for task in tasks] - modal_kv_cache.batch_remove_cloudflare_kv(keys) - return JSONResponse(content={"success": True, "keys": keys}) - - @router.post("/upload-s3", summary="上传文件到S3", description="上传文件到S3的文件必须小于200M",