import json from typing import Union, Any, Optional, List, Dict from loguru import logger from pydantic import BaseModel, Field, computed_field, field_validator, model_validator, ConfigDict, HttpUrl from pydantic.json_schema import JsonSchemaValue from datetime import timedelta, datetime class TimeDelta(timedelta): @classmethod def from_timedelta(cls, delta: timedelta) -> "TimeDelta": return cls(days=delta.days, seconds=delta.seconds, microseconds=delta.microseconds) @classmethod def from_format_string(cls, format_string: str) -> "TimeDelta": formated_time = datetime.strptime(format_string, "%H:%M:%S.%f") return cls(hours=formated_time.hour, minutes=formated_time.minute, seconds=formated_time.second, microseconds=formated_time.microsecond) def toFormatStr(self) -> str: return (datetime(year=2000, month=1, day=1) + self).strftime("%H:%M:%S.%f")[:-3] class FFMpegSliceSegment(BaseModel): start: TimeDelta = Field( description="视频切割的开始时间点秒数, 可为浮点小数(精确到小数点后3位,毫秒级)或者为标准格式的时间戳") end: TimeDelta = Field( description="视频切割的结束时间点秒数, 可为浮点小数(精确到小数点后3位,毫秒级)或者标准格式的时间戳") @computed_field @property def duration(self) -> TimeDelta: return self.end - self.start @field_validator('start', mode='before') @classmethod def parse_start(cls, v: Union[float, str, TimeDelta]): if isinstance(v, float): if v < 0.0: raise ValueError("开始时间点不能小于0") return TimeDelta(seconds=v) elif isinstance(v, int): if v < 0: raise ValueError("开始时间点不能小于0") return TimeDelta(seconds=v) elif isinstance(v, str): timedelta = TimeDelta.from_format_string(v) if timedelta.total_seconds() < 0.0: raise ValueError("开始时间点不能小于0") return timedelta elif isinstance(v, TimeDelta): if v.total_seconds() < 0.0: raise ValueError("开始时间点不能小于0") return v else: raise TypeError(v) @field_validator('end', mode='before') @classmethod def parse_end(cls, v: Union[float, TimeDelta]): if isinstance(v, float): if v < 0.0: raise ValueError("结束时间点不能小于0") return TimeDelta(seconds=v) elif isinstance(v, int): if v < 0: raise ValueError("结束时间点不能小于0") return TimeDelta(seconds=v) elif isinstance(v, str): timedelta = TimeDelta.from_format_string(v) if timedelta.total_seconds() < 0.0: raise ValueError("结束时间点不能小于0") return timedelta elif isinstance(v, TimeDelta): if v.total_seconds() < 0.0: raise ValueError("结束时间点不能小于0") return v else: raise TypeError(v) @model_validator(mode='after') def validate_end_after_start(self) -> 'FFMpegSliceSegment': if self.end <= self.start: raise ValueError("end time must be greater than start time") return self @classmethod def __get_pydantic_json_schema__(cls, core_schema: Any, handler: Any) -> JsonSchemaValue: # Override the schema to represent it as a string return { "type": "object", "properties": { "start": { "type": "number", "examples": [5, 10.5, '00:00:10.500'], }, "end": { "type": "number", "examples": [8, 12.5, '00:00:12.500'], } }, "required": [ "start", "end" ] } model_config = { "arbitrary_types_allowed": True } class FFMPEGSliceOptions(BaseModel): limit_size: Optional[int] = Field(default=None, description="不超过指定文件(字节)大小, 默认为空不限制输出大小") bit_rate: Optional[int] = Field(default=None, description="指定输出视频的比特率, 不能与limit_size同时设置") crf: int = Field(default=16, description="输出视频的质量") fps: int = Field(default=30, description="输出视频的FPS") width: int = Field(default=None, description="输出视频的宽(像素)") height: int = Field(default=None, description="输出视频的高(像素)") @computed_field(description="解析为字符表达式的文件大小") @property def pretty_limit_size(self) -> Optional[str]: if self.limit_size is not None: for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if self.limit_size < 1024.0: return f"{self.limit_size:.2f}{unit}" self.limit_size /= 1024.0 return f"{self.limit_size:.2f}PB" else: return None @computed_field(description="解析为字符表达式的比特率") @property def pretty_bit_rate(self) -> Optional[str]: if self.bit_rate is not None: return f"{self.bit_rate}k" class MediaStream(BaseModel): duration: float = Field(0, description="时长") codec_name: str tags: Optional[Any] = Field(None) model_config = ConfigDict(extra='allow') class HLSMediaVideoStream(BaseModel): stream_type: str = "video" codec_name: str codec_type: str width: int height: int avg_frame_rate: str tags: Optional[Any] = Field(None) duration: Optional[float] = Field(None) @computed_field @property def video_frame_rate(self) -> float: numerator, denominator = map(int, self.avg_frame_rate.split('/')) if denominator != 0: return numerator / denominator return 0 class HLSMediaAudioStream(BaseModel): stream_type: str = "audio" sample_rate: str channels: int channel_layout: str start_time: str tags: Optional[Any] = Field(None) class AudioStream(MediaStream): stream_type: str = Field("audio") codec_type: str sample_rate: str channels: int tags: Optional[Any] = Field(None) model_config = ConfigDict(extra='allow') class VideoStream(MediaStream): stream_type: str = "video" width: int height: int bit_rate: int avg_frame_rate: str @computed_field @property def video_bitrate(self) -> str: return str(int(self.bit_rate / 1000)) + 'k' @computed_field @property def video_frame_rate(self) -> float: numerator, denominator = map(int, self.avg_frame_rate.split('/')) if denominator != 0: return numerator / denominator return 0 model_config = ConfigDict(extra='allow') class ImageStream(MediaStream): stream_type: str = "image" width: int height: int model_config = ConfigDict(extra='allow') class SubtitleStreamTags(BaseModel): language: str = Field(description="内嵌字幕的语言") model_config = ConfigDict(extra='allow') class SubtitleStream(MediaStream): stream_type: str = "subtitle" tags: SubtitleStreamTags model_config = ConfigDict(extra='allow') class VideoFormat(BaseModel): filename: str format_name: str start_time: float = Field(0, description="起始时间") size: int bit_rate: Optional[int] = Field(None, description="文件比特率") duration: float = Field(3600 * 12, description="文件时长") class VideoMetadata(BaseModel): streams: List[ Union[ImageStream, AudioStream, VideoStream, HLSMediaAudioStream, HLSMediaVideoStream, SubtitleStream]] = Field( description="媒体包含的数据轨道") format: Optional[VideoFormat] = Field(None) @field_validator('streams', mode='before') def parse_streams(cls, value): streams = [] if isinstance(value, List): for stream in value: if isinstance(stream, Dict): logger.info(f"Parsing stream : {json.dumps(stream, ensure_ascii=False)}") if stream.get("codec_type") == 'audio': if stream.get("duration") is None: logger.info("Parsing audio stream") hls_audio = HLSMediaAudioStream.model_validate(stream) streams.append(hls_audio) else: logger.info("Parsing hls audio stream") audio = AudioStream.model_validate(stream) streams.append(audio) elif stream.get("codec_type") == 'video': if stream.get("codec_name") in ("gif", "png", "mjpg", "mjpeg", "webp"): logger.info("Parsing image stream") image = ImageStream.model_validate(stream) streams.append(image) else: if stream.get("duration") is None: logger.info("Parsing hls video stream") hls_video = HLSMediaVideoStream.model_validate(stream) streams.append(hls_video) else: logger.info("Parsing video stream") video = VideoStream.model_validate(stream) streams.append(video) elif stream.get("codec_type") == 'subtitle': logger.info("Parsing subtitle stream") subtitle_stream = SubtitleStream.model_validate(stream) streams.append(subtitle_stream) return streams else: raise TypeError model_config = ConfigDict(extra='allow') class SentryTransactionHeader(BaseModel): x_trace_id: Optional[str] = Field(description="Sentry Transaction ID", default=None) x_baggage: Optional[str] = Field(description="Sentry Transaction baggage", default=None) class SentryTransactionInfo(BaseModel): x_trace_id: str = Field(description="Sentry Transaction ID") x_baggage: str = Field(description="Sentry Transaction baggage") class FFMPEGResult(BaseModel): urn: str = Field(description="FFMPEG任务结果urn") content_length: int = Field(description="媒体资源文件字节大小(Byte)") metadata: VideoMetadata = Field(description="媒体元数据")