MediaSource类下新增Metadata字段,用来附带缓存状态为ready时视频的元数据

This commit is contained in:
shuohigh@gmail.com 2025-05-26 10:19:16 +08:00
parent 008626e0df
commit ec455ed3e9
4 changed files with 45 additions and 11 deletions

View File

@ -11,11 +11,13 @@ from pydantic import (BaseModel, Field, field_validator, ValidationError,
from pydantic.json_schema import JsonSchemaValue from pydantic.json_schema import JsonSchemaValue
from ..config import WorkerConfig from ..config import WorkerConfig
from ..utils.TimeUtils import TimeDelta from ..utils.TimeUtils import TimeDelta
from ..utils.VideoUtils import VideoMetadata
config = WorkerConfig() config = WorkerConfig()
s3_region = config.S3_region s3_region = config.S3_region
s3_bucket_name = config.S3_bucket_name s3_bucket_name = config.S3_bucket_name
s3_mount_point = config.S3_mount_dir
class MediaProtocol(str, Enum): class MediaProtocol(str, Enum):
@ -45,6 +47,8 @@ class MediaSource(BaseModel):
expired_at: Optional[datetime] = Field(description="缓存过期时间点, 为None时不会过期过期处理WIP", default=None) expired_at: Optional[datetime] = Field(description="缓存过期时间点, 为None时不会过期过期处理WIP", default=None)
downloader_id: Optional[str] = Field(description="正在处理下载的Downloader ID", default=None) downloader_id: Optional[str] = Field(description="正在处理下载的Downloader ID", default=None)
progress: int = Field(description="缓存进度", default=0) progress: int = Field(description="缓存进度", default=0)
metadata: Optional[VideoMetadata] = Field(description="媒体元数据", default=None)
content_length: Optional[int] = Field(description="媒体文件大小", default=None)
@classmethod @classmethod
def from_str(cls, media_url: str) -> 'MediaSource': def from_str(cls, media_url: str) -> 'MediaSource':
@ -148,6 +152,18 @@ class MediaSource(BaseModel):
def file_extension(self) -> Optional[str]: def file_extension(self) -> Optional[str]:
return os.path.basename(self.urn).split('.')[-1] return os.path.basename(self.urn).split('.')[-1]
@computed_field(description="是否本地可用")
@property
def local_available(self) -> bool:
if self.status == MediaCacheStatus.ready:
return os.path.exists(self.local_mount_path)
return False
@computed_field(description="本地挂载地址")
@property
def local_mount_path(self) -> str:
return f"{s3_mount_point}/{self.cache_filepath}"
@field_serializer('expired_at') @field_serializer('expired_at')
def serialize_datetime(self, value: Optional[datetime], info: SerializationInfo) -> Optional[str]: def serialize_datetime(self, value: Optional[datetime], info: SerializationInfo) -> Optional[str]:
if value: if value:

View File

@ -4,7 +4,10 @@ from typing import Optional, List
import httpx import httpx
import modal import modal
from loguru import logger from loguru import logger
from ..models.media_model import MediaSource
from .VideoUtils import VideoMetadata, VideoUtils
from ..models.media_model import MediaSource, MediaProtocol
cf_account_id = os.environ.get("CF_ACCOUNT_ID") cf_account_id = os.environ.get("CF_ACCOUNT_ID")
cf_kv_api_token = os.environ.get("CF_KV_API_TOKEN") cf_kv_api_token = os.environ.get("CF_KV_API_TOKEN")
cf_kv_namespace_id = os.environ.get("CF_KV_NAMESPACE_ID") cf_kv_namespace_id = os.environ.get("CF_KV_NAMESPACE_ID")
@ -20,9 +23,22 @@ class KVCache:
cache_json = self.kv.get(urn) cache_json = self.kv.get(urn)
if not cache_json: if not cache_json:
return None return None
return MediaSource.model_validate_json(cache_json) media = MediaSource.model_validate_json(cache_json)
if media.local_available:
try:
media.metadata = VideoUtils.ffprobe_media_metadata(media.local_mount_path)
media.content_length = os.path.getsize(media.local_mount_path)
except Exception as e:
logger.exception(e)
return media
def set_cache(self, media: MediaSource): def set_cache(self, media: MediaSource):
if media.local_available:
try:
media.metadata = VideoUtils.ffprobe_media_metadata(media.local_mount_path)
media.content_length = os.path.getsize(media.local_mount_path)
except Exception as e:
logger.exception(e)
cache_json = media.model_dump_json() cache_json = media.model_dump_json()
self.kv.put(media.urn, cache_json) self.kv.put(media.urn, cache_json)

View File

@ -28,7 +28,9 @@ class SentryUtils:
logger.info(f"sentry-trace={sentry_trace_id}, baggage={sentry_baggage}") logger.info(f"sentry-trace={sentry_trace_id}, baggage={sentry_baggage}")
if sentry_trace_id and sentry_baggage: if sentry_trace_id and sentry_baggage:
transaction = sentry_sdk.continue_trace(environ_or_headers={"sentry-trace": sentry_trace_id, transaction = sentry_sdk.continue_trace(environ_or_headers={"sentry-trace": sentry_trace_id,
"baggage": sentry_baggage, }) "baggage": sentry_baggage, },
op=op,
name=name)
else: else:
transaction = sentry_sdk.start_transaction(op='modal.function', name='Modal Function直接调用') transaction = sentry_sdk.start_transaction(op='modal.function', name='Modal Function直接调用')
with transaction: with transaction:

View File

@ -306,8 +306,7 @@ class VideoUtils:
ffmpeg_cmd = VideoUtils.async_ffmpeg_init() ffmpeg_cmd = VideoUtils.async_ffmpeg_init()
ffmpeg_cmd.input(media_path) ffmpeg_cmd.input(media_path)
filter_complex: List[str] = [] filter_complex: List[str] = []
outputs: List[Tuple[str, VideoMetadata]] = [] temp_outputs: List[str] = []
if not output_path: if not output_path:
output_path = FileUtils.file_path_extend(media_path, "slice") output_path = FileUtils.file_path_extend(media_path, "slice")
os.makedirs(os.path.dirname(output_path), exist_ok=True) os.makedirs(os.path.dirname(output_path), exist_ok=True)
@ -341,10 +340,11 @@ class VideoUtils:
crf=16, crf=16,
r=30, r=30,
) )
video_metadata = VideoUtils.ffprobe_media_metadata(segment_output_path) temp_outputs.append(segment_output_path)
outputs.append((segment_output_path, video_metadata))
await ffmpeg_cmd.execute() await ffmpeg_cmd.execute()
outputs: List[Tuple[str, VideoMetadata]] = [(output, VideoUtils.ffprobe_media_metadata(output)) for output in
temp_outputs]
return outputs return outputs
@staticmethod @staticmethod
@ -369,8 +369,8 @@ class VideoUtils:
reconnect_streamed="1", reconnect_streamed="1",
reconnect_delay_max="5") reconnect_delay_max="5")
filter_complex: List[str] = [] filter_complex: List[str] = []
outputs: List[Tuple[str, VideoMetadata]] = []
temp_outputs: List[str] = []
if not output_path: if not output_path:
output_path = FileUtils.file_path_extend(media_path, "slice") output_path = FileUtils.file_path_extend(media_path, "slice")
os.makedirs(os.path.dirname(output_path), exist_ok=True) os.makedirs(os.path.dirname(output_path), exist_ok=True)
@ -402,10 +402,10 @@ class VideoUtils:
acodec="aac", acodec="aac",
crf=16, crf=16,
r=30, ) r=30, )
video_metadata = VideoUtils.ffprobe_media_metadata(output_filepath) temp_outputs.append(output_filepath)
outputs.append((output_filepath, video_metadata))
await ffmpeg_cmd.execute() await ffmpeg_cmd.execute()
outputs: List[Tuple[str, VideoMetadata]] = [(output, VideoUtils.ffprobe_media_metadata(output)) for output in
temp_outputs]
return outputs return outputs
@staticmethod @staticmethod