modalDeploy/src/BowongModalFunctions/models/requests/models.py

517 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import uuid
from typing import List, Union, Optional, Dict
import pydantic
from pydantic import BaseModel, Field, ConfigDict, field_validator, ValidationError, model_validator, computed_field, \
EmailStr
from ..cache_tasks.models import CacheTask, MediaSource, Base64File
from ..enums.models import MediaProtocol
from ..ffmpeg_tasks.models import WebhookNotify, FFMpegSliceSegment, FFMPEGSliceOptions
from ...utils.TimeUtils import TimeDelta
class ClusterCacheBatchRequest(BaseModel):
    """Request body carrying a batch of cache tasks."""

    # Empty ConfigDict: no overrides, pydantic defaults apply.
    model_config = ConfigDict()

    tasks: List[CacheTask] = Field(
        description="批量操作任务,按列表顺序执行",
    )
class MediaSourcesRequest(BaseModel):
    """Request body holding a non-empty list of media sources.

    Each element of ``inputs`` may be supplied as a URN string, which is
    parsed with ``MediaSource.from_str``, or as a ready ``MediaSource``.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    inputs: List[MediaSource] = Field(
        examples=[
            [
                "vod://ap-shanghai/1500034234/1397757910405340824.mp4",
                "vod://ap-shanghai/1500034234/1397757910403699452.mp4",
                "s3://ap-northeast-2/modal-media-cache/concat/outputs/fc-01JTPV5FCNA74CKX3N3214XJPD/output.mp4",
            ]
        ],
        description="支持多种协议['vod://', 's3://'], 计划支持['cos://', http://]",
    )

    @field_validator('inputs', mode='before')
    @classmethod
    def parse_inputs(cls, v: List[Union[str, MediaSource]]) -> List[MediaSource]:
        """Normalise every element of ``inputs`` to a ``MediaSource``.

        Args:
            v: Raw value supplied for ``inputs``.

        Returns:
            A list of ``MediaSource`` objects in the original order.

        Raises:
            ValueError: If ``v`` is empty/falsy or contains an element that
                is neither ``str`` nor ``MediaSource``. Pydantic converts the
                ``ValueError`` into a ``ValidationError`` for the caller.
        """
        # pydantic v2 validators must raise ValueError/AssertionError; the
        # v1-style ``ValidationError([...], Model)`` constructor no longer exists.
        if not v:
            raise ValueError("inputs为空")
        result: List[MediaSource] = []
        for item in v:
            if isinstance(item, str):
                result.append(MediaSource.from_str(item))
            elif isinstance(item, MediaSource):
                result.append(item)
            else:
                raise ValueError("inputs元素类型错误: 必须是字符串")
        return result
class UploadBase64Request(BaseModel):
    """Request body carrying a ``Base64File`` and an optional prefix directory."""

    file: Base64File = Field(
        description="上传的文件",
    )
    prefix: Optional[str] = Field(
        default=None,
        description="文件存在的前缀目录",
    )
class UploadPresignRequest(BaseModel):
    """Upload presign parameters: the object key and its content type."""

    key: str = Field(
        examples=['123/456/abc.mp4'],
        description="上传文件的key",
    )
    content_type: str = Field(
        examples=['video/mp4'],
        description="上传对象的文件类型",
    )
class UploadMultipartPresignRequest(UploadPresignRequest):
    """Presign parameters extended with the number of upload parts."""

    parts_count: int = Field(
        description="分片数量",
    )
class MediaCopyRequest(BaseModel):
    """Container for the media copy task model."""

    # NOTE(review): the original indentation was lost; the nesting below is
    # reconstructed from syntax (the outer class needs a body) — confirm.
    class MediaCopyTask(BaseModel):
        """A single copy operation from ``source`` to ``destination``."""

        source: MediaSource = Field(
            description="源媒体",
        )
        destination: MediaSource = Field(
            description="",
        )
class FFMPEGSliceTaskStatusRequest(BaseModel):
    """Request body identifying a task by its id."""

    taskId: str = Field(
        description="任务Id",
    )
class BaseFFMPEGTaskRequest(BaseModel):
    """Common base for task requests; carries an optional webhook."""

    webhook: Optional[WebhookNotify] = Field(
        default=None,
        description="Task webhook",
    )
class FFMPEGSliceRequest(BaseFFMPEGTaskRequest):
    """Request to slice one media source at the supplied markers."""

    media: MediaSource = Field(
        description="待切割的媒体源",
    )
    markers: List[FFMpegSliceSegment] = Field(
        description="按照时间顺序排序过的切割标记数组",
    )
    options: FFMPEGSliceOptions = Field(
        description="输出质量选项",
        default_factory=FFMPEGSliceOptions,
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
class FFMPEGConcatRequest(BaseFFMPEGTaskRequest):
    """Request to concatenate the media sources listed in ``medias``."""

    medias: MediaSourcesRequest = Field(
        description="待合并的媒体源",
    )
class FFMPEGExtractAudioRequest(BaseFFMPEGTaskRequest):
    """Request to extract audio from a single media source."""

    media: MediaSource = Field(
        description="待提取音频的媒体源",
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
class FFMPEGCornerMirrorRequest(BaseFFMPEGTaskRequest):
    """Corner-mirror task request operating on a single media source."""

    media: MediaSource = Field(
        description="需要处理的媒体源",
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
class FFMPEGBgmRequest(BaseFFMPEGTaskRequest):
    """Request to add a BGM track to a media source.

    ``bgm_media`` is exposed under the alias ``bgmMedia``.
    """

    media: MediaSource = Field(description="需要处理的媒体源")
    bgm_media: MediaSource = Field(description="添加的BGM媒体源", alias="bgmMedia")

    @field_validator('media', mode='before')
    @classmethod
    def parse_media_input(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(v)

    @field_validator('bgm_media', mode='before')
    @classmethod
    def parse_bgm_input(cls, v: Union[str, MediaSource]):
        """Parse the BGM source, requiring a wav/mp3 extension for string input.

        Returns:
            The parsed ``MediaSource`` for string input, or ``v`` itself when
            it is already a ``MediaSource``.

        Raises:
            ValueError: If a string input does not have a ``wav`` or ``mp3``
                extension (pydantic reports it as a validation error).
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            media = MediaSource.from_str(v)
            if media.file_extension not in ('wav', 'mp3'):
                # pydantic.ValidationError cannot be constructed directly in
                # v2; validators signal failure with ValueError.
                raise ValueError("必须使用符合规范的音频格式, 如wav或者mp3")
            # The original fell through here and implicitly returned None,
            # so every string input was rejected downstream.
            return media
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(v)
class FFMPEGZoomLoopRequest(BaseFFMPEGTaskRequest):
    """Zoom-loop task request: a media source plus loop duration and zoom factor."""

    media: MediaSource = Field(
        description="需要处理的媒体源",
    )
    duration: float = Field(
        default=6.0,
        description="放大缩小一个循环的持续时间秒数",
    )
    zoom: float = Field(
        default=0.1,
        description="放大缩小系数",
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
class FFMPEGOverlayGifRequest(BaseFFMPEGTaskRequest):
    """Request to overlay an effect gif on a media source."""

    media: MediaSource = Field(description="需要处理的媒体源")
    gif: MediaSource = Field(description="叠加的特效gif")

    @field_validator('media', mode='before')
    @classmethod
    def parse_media_input(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(f"解析{type(v)}类型不支持")

    @field_validator('gif', mode='before')
    @classmethod
    def parse_fig_input(cls, v: Union[str, MediaSource]):
        """Parse the gif source, requiring a ``gif`` extension for string input.

        Raises:
            ValueError: If a string input's extension is not ``gif``
                (pydantic reports it as a validation error).
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            media = MediaSource.from_str(v)
            if media.file_extension != 'gif':
                # pydantic.ValidationError cannot be constructed directly in
                # v2; validators signal failure with ValueError.
                raise ValueError("必须使用.gif文件")
            return media
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(v)
class FFMPEGSubtitleOverlayRequest(BaseFFMPEGTaskRequest):
    """Request to overlay and/or embed subtitles on a media source.

    At least one of ``subtitle`` / ``embedded_subtitle`` must be given, and
    ``fonts`` is required whenever ``subtitle`` is used.
    """

    media: MediaSource = Field(description="需要处理的媒体源")
    subtitle: Optional[MediaSource] = Field(default=None, description="需要叠加的字幕文件")
    embedded_subtitle: Optional[MediaSource] = Field(default=None, description="需要内嵌的字幕文件")
    fonts: Optional[List[MediaSource]] = Field(default=None, description="字幕文件内使用到的字体文件")

    # NOTE: pydantic v2 validators must raise ValueError/AssertionError;
    # pydantic.ValidationError cannot be constructed directly.

    @field_validator('media', mode='before')
    @classmethod
    def parse_media_input(cls, v: Union[str, MediaSource]) -> MediaSource:
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(v)

    @field_validator('subtitle', mode='before')
    @classmethod
    def parse_subtitle_input(cls, v: Union[None, str, MediaSource]) -> Optional[MediaSource]:
        """Parse the overlay subtitle; falsy input becomes ``None``.

        Raises:
            ValueError: If a string input's extension is not ``ass``.
            TypeError: If ``v`` is truthy but neither ``str`` nor ``MediaSource``.
        """
        if not v:
            return None
        if isinstance(v, str):
            media = MediaSource.from_str(v)
            if media.file_extension not in ('ass',):
                raise ValueError("必须使用标准字幕文件, 如.ass")
            return media
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(v)

    @field_validator('embedded_subtitle', mode='before')
    @classmethod
    def parse_embedded_subtitle_input(cls, v: Union[None, str, MediaSource]) -> Optional[MediaSource]:
        """Parse the embedded subtitle; falsy input becomes ``None``.

        Raises:
            ValueError: If a string input's extension is not ``vtt``/``srt``/``ass``.
            TypeError: If ``v`` is truthy but neither ``str`` nor ``MediaSource``.
        """
        if not v:
            return None
        if isinstance(v, str):
            media = MediaSource.from_str(v)
            if media.file_extension not in ('vtt', 'srt', 'ass'):
                raise ValueError("必须使用标准字幕文件, 如.vtt/.srt/.ass")
            return media
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(v)

    @field_validator('fonts', mode='before')
    @classmethod
    def parse_font_input(
            cls, v: Optional[List[Union[str, MediaSource]]]
    ) -> Optional[List[MediaSource]]:
        """Normalise every font entry to a ``MediaSource``; falsy input becomes ``None``.

        Raises:
            ValueError: If an element is neither ``str`` nor ``MediaSource``.
        """
        if not v:
            return None
        result: List[MediaSource] = []
        for item in v:
            if isinstance(item, str):
                result.append(MediaSource.from_str(item))
            elif isinstance(item, MediaSource):
                result.append(item)
            else:
                raise ValueError("fonts元素类型错误: 必须是字符串")
        return result

    @model_validator(mode='after')
    def check_at_least_one(self):
        """Enforce the cross-field rules after all fields are validated.

        Raises:
            ValueError: If neither subtitle is provided, or if ``subtitle`` is
                set without ``fonts``.
        """
        if self.subtitle is None and self.embedded_subtitle is None:
            raise ValueError("至少需要提供一个有效字幕")
        if self.subtitle is not None and self.fonts is None:
            raise ValueError("使用叠加字幕时需要指定使用的字体文件")
        return self
class FFMPEGMixBgmWithNoiseReduceRequest(BaseFFMPEGTaskRequest):
    """Request to mix a BGM into a media source, with an optional noise sample."""

    media: MediaSource = Field(
        description="需要处理的媒体源",
    )
    bgm: MediaSource = Field(
        description="需要添加的BGM媒体源",
    )
    video_volume: float = Field(
        default=1.4,
        description="最终输出的视频音量系数",
    )
    music_volume: float = Field(
        default=0.1,
        description="最终输出的BGM音量系数",
    )
    noise_sample: Optional[MediaSource] = Field(
        default=None,
        description="常考噪音样本如不指定将使用源视频的前2秒作为样本",
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_media_input(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)

    @field_validator('bgm', mode='before')
    @classmethod
    def parse_bgm_input(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)

    @field_validator('noise_sample', mode='before')
    @classmethod
    def parse_sample_input(cls, v: Union[None, str, MediaSource]):
        """Parse the optional noise sample; any falsy input becomes ``None``.

        Raises:
            TypeError: If ``v`` is truthy but neither ``str`` nor ``MediaSource``.
        """
        if not v:
            return None
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
class FFMPEGVideoLoopFillAudioRequest(BaseFFMPEGTaskRequest):
    """Request pairing a video source with an audio source for a loop-fill task."""

    video: MediaSource = Field(
        description="用来填充的视频素材",
    )
    audio: MediaSource = Field(
        description="被填充的音频素材",
    )

    @field_validator('video', mode='before')
    @classmethod
    def parse_video_input(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)

    @field_validator('audio', mode='before')
    @classmethod
    def parse_audio_input(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
class FFMPEGExtractFrameRequest(BaseFFMPEGTaskRequest):
    """Request to extract a frame from a video, optionally after seeking."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    video: MediaSource = Field(description="提取帧画面的来源")
    seek_time: Optional[Union[str, int, float]] = Field(
        default=None,
        description="先跳转到视频对应时间再取首帧",
        examples=["00:00:01.000", "5.4", "2"],
    )
    frame_index: int = Field(description="提取的第几帧, 从1开始默认为1", default=1)

    @field_validator('video', mode='before')
    @classmethod
    def parse_video_input(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(v)

    # NOTE: the name ``prased_seek_time`` (sic) is part of the serialized
    # output and the public attribute surface, so it is kept as-is.
    @computed_field(description="格式处理后的Seek Time")
    @property
    def prased_seek_time(self) -> Optional['TimeDelta']:
        """Return ``seek_time`` converted to a ``TimeDelta``.

        Returns:
            ``None`` when ``seek_time`` is unset; otherwise a ``TimeDelta``
            built from the format string or from a number of seconds.

        Raises:
            TypeError: If ``seek_time`` holds an unsupported type.
        """
        if self.seek_time is None:
            # The return annotation is Optional because this branch yields None.
            return None
        if isinstance(self.seek_time, str):
            return TimeDelta.from_format_string(self.seek_time)
        if isinstance(self.seek_time, (int, float)):
            return TimeDelta(seconds=self.seek_time)
        raise TypeError("不支持的时间类型")
class ComfyTaskRequest(BaseFFMPEGTaskRequest):
    """Comfy task request: a video source plus TTS texts and speaker settings."""

    video_path: MediaSource = Field(
        default=None, description="视频源")
    start_time: str = Field(default="00:00:01.600", description="开始时间")
    # default_factory generates a fresh UUID per instance; the previous
    # ``default=str(uuid.uuid4())`` was evaluated once at class creation, so
    # every request without an explicit prefix shared the same value.
    filename_prefix: str = Field(default_factory=lambda: str(uuid.uuid4()), description="生成文件名前缀")
    tts_text1: str = Field(default="好好看,这是我们家专门为女生定制的背心,", description="tts文本1")
    tts_text2: str = Field(default="", description="tts文本2")
    tts_text3: str = Field(default="", description="tts文本3")
    tts_text4: str = Field(default="", description="tts文本4")
    anchor_id: str = Field(default="dawan", description="讲话人ID")
    speed: float = Field(default=1, description="讲话语速")

    @field_validator('video_path', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Turn a URN string into a ``MediaSource``; pass a ``MediaSource`` through.

        Raises:
            TypeError: If ``v`` is neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        elif isinstance(v, MediaSource):
            return v
        else:
            raise TypeError(v)
class FFMPEGStreamRecordRequest(BaseFFMPEGTaskRequest):
    """Request to record a live stream; durations and timeouts are in seconds."""

    stream_source: str = Field(
        description="直播源地址",
    )
    first_segment_duration: int = Field(
        description="hls首个片段时长(秒), 首片段长度越小hls流能越快速开始播放",
        default=2,
    )
    segment_duration: int = Field(
        description="hls片段时长(秒)",
        default=10,
    )
    recording_timeout: int = Field(
        description="hls流无内容后等待的时长(秒)",
        default=300,
    )
    # Upper bound of 43200 seconds is enforced through ``le``.
    monitor_timeout: int = Field(
        description="录制监控最大时长(秒), 默认为10小时, 不可大于12小时",
        default=36000,
        le=43200,
    )
class GeminiRequest(BaseFFMPEGTaskRequest):
    """Gemini task request over an HLS recording and a list of products."""

    media_hls_url: MediaSource = Field(default="", description="视频流录制HLS地址 hls://格式 需录制超过20分钟")
    product_cover_grid_uri_list: List[str] = Field(description="商品封面网格拼图URI列表")
    # The original line ended with a stray comma, which turned the default
    # into a one-element tuple holding the FieldInfo instead of declaring
    # the field.
    product_list: List[str] = Field(description="商品名列表(时间倒序)")
    start_time: str = Field(default="00:00:00.000", description="开始时间(hls)")
    end_time: str = Field(default="00:20:00.000", description="结束时间(hls)")
    options: FFMPEGSliceOptions = Field(default=FFMPEGSliceOptions(), description="输出质量选项")
    scale: float = Field(default=0.85, description="视频尺寸缩放倍率")
    last_product_text: str = Field(default="", description="上一段视频结尾介绍的商品以及标签")

    @field_validator('media_hls_url', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Parse the HLS source; string input must use the hls protocol.

        Raises:
            ValueError: If a string input is not an hls URN, or if ``v`` is
                neither ``str`` nor ``MediaSource`` (pydantic reports both as
                validation errors).
        """
        # pydantic.ValidationError cannot be constructed directly in v2;
        # validators signal failure with ValueError.
        if isinstance(v, str):
            media_source = MediaSource.from_str(v)
            if media_source.protocol == MediaProtocol.hls:
                return media_source
            raise ValueError('media只支持hls格式的urn')
        elif isinstance(v, MediaSource):
            return v
        else:
            raise ValueError("media格式读取失败")
class GeminiRequestFirstOnly(BaseFFMPEGTaskRequest):
    """First-stage-only Gemini request: product covers, names and prompt label."""

    product_cover_grid_uri_list: List[str] = Field(description="商品封面网格拼图URI列表")
    # The original line ended with a stray comma, which turned the default
    # into a one-element tuple holding the FieldInfo instead of declaring
    # the field.
    product_list: List[str] = Field(description="商品名列表(时间倒序)")
    prompt_label: str = Field(default="production", description="使用的Langfuse Prompt Label")
class GeminiRequestSecondOnly(BaseFFMPEGTaskRequest):
    """Second-stage-only Gemini request over an HLS recording.

    ``identified_product_list`` is supplied as a JSON string and decoded
    before field validation.
    """

    media_hls_url: MediaSource = Field(default="", description="视频流录制HLS地址 hls://格式 需录制超过20分钟")
    identified_product_list: List[Dict] = Field(description="第一阶段返回的识别商品及特征列表")
    start_time: str = Field(default="00:00:00.000", description="开始时间(hls)")
    end_time: str = Field(default="00:20:00.000", description="结束时间(hls)")
    options: FFMPEGSliceOptions = Field(default=FFMPEGSliceOptions(), description="输出质量选项")
    scale: float = Field(default=0.85, description="视频尺寸缩放倍率")
    # The original line ended with a stray comma, which made the default a
    # one-element tuple holding the FieldInfo rather than "production".
    prompt_label: str = Field(default="production", description="使用的Langfuse Prompt Label")
    last_product_text: str = Field(default="", description="上一段视频结尾介绍的商品以及标签")

    # NOTE: pydantic v2 validators must raise ValueError/AssertionError;
    # pydantic.ValidationError cannot be constructed directly.

    @field_validator('media_hls_url', mode='before')
    @classmethod
    def parse_inputs_media_hls_url(cls, v: Union[str, MediaSource]):
        """Parse the HLS source; string input must use the hls protocol.

        Raises:
            ValueError: If a string input is not an hls URN, or if ``v`` is
                neither ``str`` nor ``MediaSource``.
        """
        if isinstance(v, str):
            media_source = MediaSource.from_str(v)
            if media_source.protocol == MediaProtocol.hls:
                return media_source
            raise ValueError('media只支持hls格式的urn')
        elif isinstance(v, MediaSource):
            return v
        else:
            raise ValueError("media格式读取失败")

    @field_validator("identified_product_list", mode="before")
    @classmethod
    def parse_inputs_identified_product_list(cls, v: str):
        """Decode the JSON string supplied for ``identified_product_list``.

        Raises:
            ValueError: If ``v`` is not a string, or is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        if isinstance(v, str):
            return json.loads(v)
        raise ValueError("identified_product_list")
class MonitorLiveRoomProductRequest(BaseModel):
    """Request identifying a live room by cookie, room id and author id."""

    cookie: str = Field(
        description="用户网页版抖音Cookie/Your web version of Douyin Cookie",
        default="YOUR_COOKIE",
    )
    room_id: str = Field(
        description="直播间room_id/Room room_id",
        default="",
    )
    author_id: str = Field(
        description="作者id/Author id",
        default="",
    )
class MakeGridGeminiRequest(BaseFFMPEGTaskRequest):
    """Request to build an image grid; all size fields are in pixels."""

    pic_info_list: List[Dict[str, str]] = Field(
        description="包含图片信息的字典列表,每个字典包含 \"title\"\"cover\"",
        default=[],
    )
    image_size: int = Field(
        description="单个图片网格的尺寸/像素",
        default=450,
    )
    text_height: int = Field(
        description="文本框的高度/像素",
        default=40,
    )
    font_size: int = Field(
        description="文本尺寸/像素",
        default=18,
    )
    padding: int = Field(
        description="文本距离文本框边缘距离/像素",
        default=5,
    )
    separator: int = Field(
        description="分割线宽度/像素",
        default=12,
    )
class FFMPEGConvertStreamRequest(BaseFFMPEGTaskRequest):
    """Request to convert an HLS media source using the given output options."""

    media: MediaSource = Field(description="待转换的媒体源")
    options: FFMPEGSliceOptions = Field(default_factory=FFMPEGSliceOptions, description="输出质量选项")

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Parse the media source; string input must use the hls protocol.

        Raises:
            ValueError: If a string input is not an hls URN, or if ``v`` is
                neither ``str`` nor ``MediaSource`` (pydantic reports both as
                validation errors).
        """
        # pydantic.ValidationError cannot be constructed directly in v2;
        # validators signal failure with ValueError.
        if isinstance(v, str):
            media_source = MediaSource.from_str(v)
            if media_source.protocol == MediaProtocol.hls:
                return media_source
            raise ValueError('media只支持hls格式的urn')
        elif isinstance(v, MediaSource):
            return v
        else:
            raise ValueError("media格式读取失败")
class NakamaLogin(BaseModel):
    """Login credentials for Bowong Echo (Nakama): email and password."""

    email: EmailStr = Field(
        description="Bowong Echo(Nakama)的登录账号",
    )
    password: str = Field(
        description="Bowong Echo(Nakama)的登录密码",
    )