341 lines
12 KiB
Python
341 lines
12 KiB
Python
import uuid
|
||
from enum import Enum
|
||
from typing import List, Union, Optional
|
||
|
||
import pydantic
|
||
from pydantic import BaseModel, Field, field_validator, ConfigDict, HttpUrl
|
||
from .ffmpeg_worker_model import FFMpegSliceSegment
|
||
from .media_model import MediaSource, MediaSources
|
||
|
||
|
||
class TaskStatus(str, Enum):
    """Possible states of a task.

    Members mix in ``str``, so each one compares equal to its plain
    string value (e.g. ``TaskStatus.running == "running"``).
    """

    running = "running"
    failed = "failed"
    success = "success"
    expired = "expired"
|
||
|
||
|
||
class ErrorCode(int, Enum):
    """Numeric result codes.

    Members mix in ``int``, so each one compares equal to its integer
    value (e.g. ``ErrorCode.SUCCESS == 0``).
    """

    SUCCESS = 0

    # 1xxxx range.
    PARAM_ERROR = 10001
    NOT_FOUND = 10002
    UNAUTHORIZED = 10003
    FORBIDDEN = 10004
    BUSINESS_ERROR = 10005

    SYSTEM_ERROR = 99999
|
||
|
||
|
||
class WebhookMethodEnum(str, Enum):
    """Request methods selectable for a webhook callback.

    Member names are upper-case while the values are the lower-case
    strings ``"get"`` and ``"post"``.
    """

    GET = "get"
    POST = "post"
|
||
|
||
|
||
class SentryTransactionHeader(BaseModel):
    """Sentry transaction identifiers where both values are optional."""

    # Sentry transaction id; None when not supplied.
    x_trace_id: Optional[str] = Field(
        default=None,
        description="Sentry Transaction ID",
    )
    # Sentry transaction baggage; None when not supplied.
    x_baggage: Optional[str] = Field(
        default=None,
        description="Sentry Transaction baggage",
    )
|
||
|
||
|
||
class SentryTransactionInfo(BaseModel):
    """Sentry transaction identifiers where both values are required."""

    # Sentry transaction id (required).
    x_trace_id: str = Field(
        description="Sentry Transaction ID",
    )
    # Sentry transaction baggage (required).
    x_baggage: str = Field(
        description="Sentry Transaction baggage",
    )
|
||
|
||
|
||
class FFMPEGSliceTaskStatusRequest(BaseModel):
    """Request carrying the id of the task whose status is queried."""

    # Task id (required).
    taskId: str = Field(
        description="任务Id",
    )
|
||
|
||
|
||
class ModalTaskResponse(BaseModel):
    """Response telling whether a task was accepted, plus its id."""

    # True when the task was accepted successfully.
    success: bool = Field(
        description="任务接受成功",
    )
    # Task id.
    taskId: str = Field(
        description="任务Id",
    )
|
||
|
||
|
||
class WebhookNotify(BaseModel):
    """Webhook callback settings: endpoint, request method and optional token."""

    # Callback endpoint URL.
    endpoint: HttpUrl = Field(
        examples=["https://webhook.example.com"],
        description="Webhook回调端点",
    )
    # Request method used for the callback.
    method: WebhookMethodEnum = Field(
        examples=["get", "post"],
        description="Webhook回调请求方法",
    )
    # Authentication token for the callback request; leave unset when the
    # endpoint needs no authentication.
    authentication: Optional[str] = Field(
        default=None,
        examples=["Bearer 123456"],
        description="Webhook authentication回调请求降权token,如不需要鉴权则不填",
    )
|
||
|
||
|
||
class BaseFFMPEGTaskRequest(BaseModel):
    """Base class for task requests; adds an optional webhook setting."""

    # Optional webhook configuration; None when no callback is requested.
    webhook: Optional[WebhookNotify] = Field(
        default=None,
        description="Task webhook",
    )
|
||
|
||
|
||
class BaseFFMPEGTaskStatusResponse(BaseModel):
    """Base class for task status responses.

    Extra input keys are ignored (``extra='ignore'``).
    """

    model_config = ConfigDict(extra='ignore')

    # Task id.
    taskId: str = Field(
        description="任务Id",
    )
    # Current run status of the task.
    status: TaskStatus = Field(
        description="任务运行状态",
    )
    # Reason the task failed, if any.
    error: Optional[str] = Field(
        default=None,
        description="任务错误原因",
    )
    # Code for the failure reason, if any.
    code: Optional[int] = Field(
        default=None,
        description="任务错误原因代码",
    )
    # Task results.
    # NOTE(review): this field is `results` (plural) while the subclasses in
    # this file declare a separate `result` field -- confirm both are intended.
    results: Optional[List[str]] = Field(
        default=None,
        description="任务运行结果",
    )
|
||
|
||
|
||
class FFMPEGSliceRequest(BaseFFMPEGTaskRequest):
    """Slice request: one media source plus the list of slice markers."""

    # Media source to be sliced.
    media: MediaSource = Field(
        description="待切割的媒体源",
    )
    # Slice markers.
    markers: List[FFMpegSliceSegment] = Field(
        description="切割标记数组",
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Coerce the raw ``media`` input before field validation.

        Strings are parsed with ``MediaSource.from_str``; ``MediaSource``
        instances pass through unchanged.

        Raises:
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
|
||
|
||
|
||
class FFMPEGSliceTaskStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for a slice task; ``result`` is an optional list of strings."""

    # Task result; None when not set.
    result: Optional[List[str]] = Field(
        description="任务运行结果",
        default=None,
    )
|
||
|
||
|
||
class FFMPEGConcatRequest(BaseFFMPEGTaskRequest):
    """Concat request carrying the media sources to be merged."""

    # Media sources to merge.
    medias: MediaSources = Field(
        description="待合并的媒体源",
    )
|
||
|
||
|
||
class FFMPEGConcatTaskStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for a concat task; ``result`` is an optional string."""

    # Task result; None when not set.
    result: Optional[str] = Field(
        description="任务运行结果",
        default=None,
    )
|
||
|
||
|
||
class FFMPEGExtractAudioRequest(BaseFFMPEGTaskRequest):
    """Audio-extraction request carrying the media source to read from."""

    # Media source whose audio is to be extracted.
    media: MediaSource = Field(
        description="待提取音频的媒体源",
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Coerce the raw ``media`` input before field validation.

        Strings are parsed with ``MediaSource.from_str``; ``MediaSource``
        instances pass through unchanged.

        Raises:
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
|
||
|
||
|
||
class FFMPEGExtractAudioTaskStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for an audio-extraction task; ``result`` is an optional string."""

    # Task result; None when not set.
    result: Optional[str] = Field(
        description="任务运行结果",
        default=None,
    )
|
||
|
||
|
||
class FFMPEGCornerMirrorRequest(BaseFFMPEGTaskRequest):
    """Corner-mirror request carrying the media source to process."""

    # Media source to process.
    media: MediaSource = Field(
        description="需要处理的媒体源",
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Coerce the raw ``media`` input before field validation.

        Strings are parsed with ``MediaSource.from_str``; ``MediaSource``
        instances pass through unchanged.

        Raises:
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
|
||
|
||
|
||
class FFMPEGCornerMirrorTaskStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for a corner-mirror task; ``result`` is an optional string."""

    # Task result; None when not set.
    result: Optional[str] = Field(
        description="任务运行结果",
        default=None,
    )
|
||
|
||
|
||
class FFMPEGBgmRequest(BaseFFMPEGTaskRequest):
    """BGM request: the media source to process plus the BGM source to add.

    ``bgm_media`` is populated from the ``bgmMedia`` alias.

    Each validator has its own method name. A class body keeps only the last
    binding of a repeated name, so validators that all share one name would
    leave only the final one registered.
    """

    # Media source to process.
    media: MediaSource = Field(description="需要处理的媒体源")
    # BGM media source to add.
    bgm_media: MediaSource = Field(description="添加的BGM媒体源", alias="bgmMedia")

    @field_validator('media', mode='before')
    @classmethod
    def parse_media(cls, v: Union[str, MediaSource]):
        """Coerce ``media``: parse strings, pass ``MediaSource`` through.

        Raises:
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)

    @field_validator('bgm_media', mode='before')
    @classmethod
    def parse_bgm_media(cls, v: Union[str, MediaSource]):
        """Coerce ``bgm_media`` and check the audio extension of string inputs.

        A string is parsed with ``MediaSource.from_str`` and must have a
        ``wav`` or ``mp3`` file extension. ``MediaSource`` instances pass
        through without an extension check.

        Raises:
            ValueError: when a string input has another file extension;
                pydantic reports it as a validation error.
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            media = MediaSource.from_str(v)
            if media.file_extension not in ('wav', 'mp3'):
                # Validators signal bad input with ValueError; pydantic's
                # ValidationError is not meant to be built from a message.
                raise ValueError("必须使用符合规范的音频格式, 如wav或者mp3")
            # Return the parsed source; without this the field would be None.
            return media
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
|
||
|
||
|
||
class FFMPEGBgmTaskStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for a BGM task; ``result`` is an optional string."""

    # URN of the generated result; None when not set.
    result: Optional[str] = Field(
        description="生成结果的URN",
        default=None,
    )
|
||
|
||
|
||
class FFMPEGZoomLoopRequest(BaseFFMPEGTaskRequest):
    """Zoom-loop request: media source plus cycle duration and zoom factor."""

    # Media source to process.
    media: MediaSource = Field(
        description="需要处理的媒体源",
    )
    # Duration in seconds of one zoom-in/zoom-out cycle.
    duration: float = Field(
        default=6.0,
        description="放大缩小一个循环的持续时间秒数",
    )
    # Zoom factor.
    zoom: float = Field(
        default=0.1,
        description="放大缩小系数",
    )

    @field_validator('media', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[str, MediaSource]):
        """Coerce the raw ``media`` input before field validation.

        Strings are parsed with ``MediaSource.from_str``; ``MediaSource``
        instances pass through unchanged.

        Raises:
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
|
||
|
||
|
||
class FFMPEGZoomLoopTaskStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for a zoom-loop task; ``result`` is an optional string."""

    # URN of the generated result; None when not set.
    result: Optional[str] = Field(
        description="生成结果的URN",
        default=None,
    )
|
||
|
||
|
||
class FFMPEGOverlayGifRequest(BaseFFMPEGTaskRequest):
    """GIF-overlay request: the media source to process plus the effect gif.

    Each validator has its own method name. A class body keeps only the last
    binding of a repeated name, so validators that all share one name would
    leave only the final one registered.
    """

    # Media source to process.
    media: MediaSource = Field(description="需要处理的媒体源")
    # Effect gif to overlay.
    gif: MediaSource = Field(description="叠加的特效gif")

    @field_validator('media', mode='before')
    @classmethod
    def parse_media(cls, v: Union[str, MediaSource]):
        """Coerce ``media``: parse strings, pass ``MediaSource`` through.

        Raises:
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)

    @field_validator('gif', mode='before')
    @classmethod
    def parse_gif(cls, v: Union[str, MediaSource]):
        """Coerce ``gif`` and require a ``gif`` extension for string inputs.

        ``MediaSource`` instances pass through without an extension check.

        Raises:
            ValueError: when a string input has another file extension;
                pydantic reports it as a validation error.
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            media = MediaSource.from_str(v)
            if media.file_extension != 'gif':
                # Validators signal bad input with ValueError; pydantic's
                # ValidationError is not meant to be built from a message.
                raise ValueError("必须使用.gif文件")
            return media
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
|
||
|
||
|
||
class FFMPEGOverlayGifTaskStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for a GIF-overlay task; ``result`` is an optional string."""

    # URN of the generated result; None when not set.
    result: Optional[str] = Field(
        description="生成结果的URN",
        default=None,
    )
|
||
|
||
|
||
class FFMPEGSubtitleOverlayRequest(BaseFFMPEGTaskRequest):
    """Subtitle-overlay request: media source, subtitle file and font files.

    Each validator has its own method name. A class body keeps only the last
    binding of a repeated name, so validators that all share one name would
    leave only the final one registered.
    """

    # Media source to process.
    media: MediaSource = Field(description="需要处理的媒体源")
    # Subtitle file to overlay.
    subtitle: MediaSource = Field(description="需要叠加的字幕文件")
    # Font files used inside the subtitle file.
    fonts: List[MediaSource] = Field(description="字幕文件内使用到的字体文件")

    @field_validator('media', mode='before')
    @classmethod
    def parse_media(cls, v: Union[str, MediaSource]):
        """Coerce ``media``: parse strings, pass ``MediaSource`` through.

        Raises:
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)

    @field_validator('subtitle', mode='before')
    @classmethod
    def parse_subtitle(cls, v: Union[str, MediaSource]):
        """Coerce ``subtitle`` and require an ``ass`` extension for strings.

        ``MediaSource`` instances pass through without an extension check.

        Raises:
            ValueError: when a string input has another file extension;
                pydantic reports it as a validation error.
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            media = MediaSource.from_str(v)
            if media.file_extension not in ('ass',):
                # Validators signal bad input with ValueError; pydantic's
                # ValidationError is not meant to be built from a message.
                raise ValueError("必须使用标准字幕文件, 如.ass")
            return media
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)

    @field_validator('fonts', mode='before')
    @classmethod
    def parse_fonts(
        cls, v: Union[str, MediaSource, List[Union[str, MediaSource]]]
    ) -> List[MediaSource]:
        """Coerce ``fonts`` into a list of ``MediaSource`` objects.

        String items are parsed with ``MediaSource.from_str``; ``MediaSource``
        items are kept as they are. A single string or ``MediaSource`` is
        treated as a one-element list rather than iterated piecewise.

        Raises:
            ValueError: when the input is empty or an item is neither a
                string nor a ``MediaSource``; pydantic reports it as a
                validation error.
        """
        if not v:
            raise ValueError("fonts输入为空")
        # Wrap a lone value so a bare string is not split into characters.
        items = [v] if isinstance(v, (str, MediaSource)) else v
        result = []
        for item in items:
            if isinstance(item, str):
                result.append(MediaSource.from_str(item))
            elif isinstance(item, MediaSource):
                result.append(item)
            else:
                raise ValueError("fonts元素类型错误: 必须是字符串")
        return result
|
||
|
||
|
||
class FFMPEGSubtitleTaskStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for a subtitle task; ``result`` is an optional string."""

    # URN of the generated result; None when not set.
    result: Optional[str] = Field(
        description="生成结果的URN",
        default=None,
    )
|
||
|
||
|
||
class FFMPEGMixBgmWithNoiseReduceRequest(BaseFFMPEGTaskRequest):
    """BGM-mix request with noise reduction.

    Carries the media source, the BGM source, two volume factors and an
    optional noise sample.

    Each validator has its own method name. A class body keeps only the last
    binding of a repeated name, so validators that all share one name would
    leave only the final one registered.
    """

    # Media source to process.
    media: MediaSource = Field(description="需要处理的媒体源")
    # BGM media source to add.
    bgm: MediaSource = Field(description="需要添加的BGM媒体源")
    # Volume factor of the video in the final output.
    video_volume: float = Field(description="最终输出的视频音量系数", default=1.4)
    # Volume factor of the BGM in the final output.
    music_volume: float = Field(description="最终输出的BGM音量系数", default=0.1)
    # Noise sample; per the description, the first 2 seconds of the source
    # video are used when it is not given.
    noise_sample: Optional[MediaSource] = Field(description="常考噪音样本,如不指定将使用源视频的前2秒作为样本",
                                                default=None)

    @field_validator('media', 'bgm', mode='before')
    @classmethod
    def parse_media_sources(cls, v: Union[str, MediaSource]):
        """Coerce ``media`` and ``bgm``: parse strings, pass ``MediaSource`` through.

        Raises:
            TypeError: for any other input type.
        """
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)

    @field_validator('noise_sample', mode='before')
    @classmethod
    def parse_noise_sample(cls, v: Union[None, str, MediaSource]):
        """Coerce ``noise_sample``; any falsy input becomes ``None``.

        Raises:
            TypeError: for a truthy input that is neither a string nor a
                ``MediaSource``.
        """
        if not v:
            return None
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
|
||
|
||
|
||
class FFMPEGMixBgmWithNoiseReduceStatusResponse(BaseFFMPEGTaskStatusResponse):
    """Status response for a BGM-mix/noise-reduce task; ``result`` is an optional string."""

    # URN of the generated result; None when not set.
    result: Optional[str] = Field(
        description="生成结果的URN",
        default=None,
    )
|
||
|
||
|
||
class ComfyTaskStatusResponse(BaseModel):
    """Status response for a Comfy task.

    Derives directly from ``BaseModel`` and declares its own id, status,
    error, code and result fields.
    """

    # Task id.
    taskId: str = Field(
        description="任务Id",
    )
    # Current run status of the task.
    status: TaskStatus = Field(
        description="任务运行状态",
    )
    # Reason the task failed, if any.
    error: Optional[str] = Field(
        default=None,
        description="任务错误原因",
    )
    # Code for the failure reason, if any.
    code: Optional[int] = Field(
        default=None,
        description="任务错误原因代码",
    )
    # Task result; None when not set.
    result: Optional[str] = Field(
        default=None,
        description="任务运行结果",
    )
|
||
|
||
|
||
class ComfyTaskRequest(BaseFFMPEGTaskRequest):
    """Comfy task request: video source, start time, TTS texts and voice settings.

    Every field has a default, so an empty request is valid.
    """

    # Video source. It defaults to None, so the annotation is Optional.
    video_path: Optional[MediaSource] = Field(
        default=None, description="视频源")
    # Start time.
    start_time: str = Field(default="00:00:01.600", description="开始时间")
    # Prefix for generated file names. default_factory draws a fresh UUID for
    # each request; a plain default would be computed once at class creation
    # and shared by every request.
    filename_prefix: str = Field(default_factory=lambda: str(uuid.uuid4()), description="生成文件名前缀")
    # TTS texts 1-4.
    tts_text1: str = Field(default="好好看,这是我们家专门为女生定制的背心,", description="tts文本1")
    tts_text2: str = Field(default="", description="tts文本2")
    tts_text3: str = Field(default="", description="tts文本3")
    tts_text4: str = Field(default="", description="tts文本4")
    # Speaker id.
    anchor_id: str = Field(default="dawan", description="讲话人ID")
    # Speaking speed.
    speed: float = Field(default=1, description="讲话语速")

    @field_validator('video_path', mode='before')
    @classmethod
    def parse_inputs(cls, v: Union[None, str, MediaSource]):
        """Coerce the raw ``video_path`` input before field validation.

        ``None`` is passed through to match the field default. Strings are
        parsed with ``MediaSource.from_str`` and ``MediaSource`` instances
        are returned unchanged.

        Raises:
            TypeError: for any other input type.
        """
        if v is None:
            return None
        if isinstance(v, str):
            return MediaSource.from_str(v)
        if isinstance(v, MediaSource):
            return v
        raise TypeError(v)
|