优化google/upload接口的速度

* 优化google/upload接口的速度
* 切割视频接口新增输出质量选项

---------

Merge request URL: https://g-ldyi2063.coding.net/p/dev/d/modalDeploy/git/merge/4790?initial=true
Co-authored-by: shuohigh@gmail.com
This commit is contained in:
肖宇迪 2025-06-12 11:58:52 +08:00 committed by Coding
parent 8901d2a6a5
commit f1e4fa3d6c
6 changed files with 94 additions and 57 deletions

View File

@ -22,4 +22,6 @@ class WorkerConfig(BaseSettings):
comfyui_s3_input: Optional[str] = Field(default="comfyui-input", description="ComfyUI input S3文件夹名")
comfyui_s3_output: Optional[str] = Field(default="comfyui-output", description="ComfyUI output S3文件夹名")
modal_config: Any = SettingsConfigDict()
modal_config: Any = SettingsConfigDict(json_schema_extra={
"description": "可通过本地环境变量加载对应Field, 不区分大小写, Modal创建App的Image时可通过dotenv加载指定.env文件写入到Docker Image的系统变量",
})

View File

@ -7,7 +7,7 @@ from pydantic import BaseModel, Field, field_validator, ConfigDict, HttpUrl, com
from .ffmpeg_worker_model import FFMpegSliceSegment
from .media_model import MediaSource, MediaSources, MediaProtocol
from ..config import WorkerConfig
from ..utils.VideoUtils import VideoMetadata
from ..utils.VideoUtils import VideoMetadata, FFMPEGSliceOptions
config = WorkerConfig()
@ -124,6 +124,7 @@ class FFMPEGConvertStreamResponse(BaseFFMPEGTaskStatusResponse):
class FFMPEGSliceRequest(BaseFFMPEGTaskRequest):
media: MediaSource = Field(description="待切割的媒体源")
markers: List[FFMpegSliceSegment] = Field(description="按照时间顺序排序过的切割标记数组")
options: FFMPEGSliceOptions = Field(default_factory=FFMPEGSliceOptions, description="切割质量选项")
@field_validator('media', mode='before')
@classmethod
@ -428,14 +429,15 @@ class FFMPEGStreamRecordRequest(BaseFFMPEGTaskRequest):
stream_source: str = Field(description="直播源地址")
segment_duration: int = Field(default=5, description="hls片段时长(秒)")
recording_timeout: int = Field(default=300, description="hls流无内容后等待的时长(秒)")
monitor_timeout: int = Field(default=36000, description="录制监控最大时长(秒), 默认为10小时, 不可大于12小时", le=43200)
monitor_timeout: int = Field(default=36000, description="录制监控最大时长(秒), 默认为10小时, 不可大于12小时",
le=43200)
class GeminiRequest(BaseFFMPEGTaskRequest):
media_hls_url: MediaSource = Field(default="", description="视频流录制HLS地址 hls://格式 需录制超过20分钟")
product_list: List[Union[str, dict]] = Field(description="商品名列表(时间倒序)"),
start_time: str = Field(default="00:00:00.000", description="开始时间(hls)")
end_time:str = Field(default="00:20:00.000", description="结束时间(hls)")
end_time: str = Field(default="00:20:00.000", description="结束时间(hls)")
@field_validator('media_hls_url', mode='before')
@classmethod
@ -451,9 +453,11 @@ class GeminiRequest(BaseFFMPEGTaskRequest):
else:
raise pydantic.ValidationError("media格式读取失败")
class GeminiResultResponse(BaseFFMPEGTaskStatusResponse):
result: str = Field(default="", description="推理出的json")
class LiveProduct(BaseModel):
title: str = Field(default="", description="商品标题")
leaf_category: str = Field(default="", description="商品分类")
@ -462,19 +466,23 @@ class LiveProduct(BaseModel):
cover: str = Field(default="", description="商品封面图链接")
detail_url: str = Field(default="", description="商品详情链接")
class LiveProductCaches(BaseModel):
room_id: str = Field(default="", description="直播间room_id/Room room_id")
author_id: str = Field(default="", description="作者id/Author id")
update_time: str = Field(default=0, description="商品列表更新时间")
count: int = Field(default=0, description="缓存直播间商品数量")
product_list: List[LiveProduct] = Field(default=None,description="缓存直播间商品列表")
product_list: List[LiveProduct] = Field(default=None, description="缓存直播间商品列表")
class MonitorLiveRoomProductRequest(BaseModel):
cookie: str = Field(default="YOUR_COOKIE", description="用户网页版抖音Cookie/Your web version of Douyin Cookie")
room_id: str = Field(default="", description="直播间room_id/Room room_id")
author_id: str = Field(default="", description="作者id/Author id")
class LiveRoomProductCachesResponse(BaseModel):
status: int = Field(default=None, description="缓存状态/0-正常返回 1-直播已结束 2-IP风控 3-请求Tikhub API错误 4-内部错误")
status: int = Field(default=None,
description="缓存状态/0-正常返回 1-直播已结束 2-IP风控 3-请求Tikhub API错误 4-内部错误")
message: str = Field(default="", description="错误信息")
cache_json: str = Field(default="", description="缓存内容/Json文本")
cache_json: str = Field(default="", description="缓存内容/Json文本")

View File

@ -1,11 +1,10 @@
from typing import Annotated, cast
import modal
from fastapi import APIRouter, Depends, Header, Body
from fastapi.responses import Response, JSONResponse
from fastapi import APIRouter, Depends, Header
from fastapi.responses import Response
from loguru import logger
from starlette import status
import m3u8
from ..config import WorkerConfig
from ..middleware.authorization import verify_token
@ -51,8 +50,7 @@ sentry_header_schema = {
"description": "",
"headers": sentry_header_schema
},
},
)
}, )
async def get_task_status(task_id: str, response: Response) -> BaseFFMPEGTaskStatusResponse:
logger.info(f"Get task [{task_id}]")
task_info = await ModalUtils.get_modal_task_status(task_id)
@ -72,10 +70,10 @@ async def slice_media(request: FFMPEGSliceRequest,
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(media=request.media, markers=request.markers, sentry_trace=sentry_trace, webhook=request.webhook)
fn_call = fn.spawn(media=request.media, markers=request.markers, options=request.options,
sentry_trace=sentry_trace, webhook=request.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@router.post("/convert", summary="发起直播流转换任务", )
async def convert_media(request: FFMPEGConvertStreamRequest,
headers: Annotated[SentryTransactionHeader, Header()]) -> ModalTaskResponse:

View File

@ -1,6 +1,6 @@
import json
import os
from typing import Annotated, Optional, cast
from typing import Annotated, Optional, cast, List
import modal
import sentry_sdk
@ -25,6 +25,7 @@ router = APIRouter(prefix="/google", tags=["Google"])
class GoogleAPIKeyHeaders(BaseModel):
x_google_api_key: Optional[str] = Field(description="Google API Key")
class BundleHeaders(BaseModel):
x_google_api_key: Optional[str] = Field(description="Google API Key")
x_trace_id: str = Field(description="Sentry Transaction ID", default=None)
@ -41,8 +42,12 @@ async def upload_file_multipart(file: UploadFile,
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing Google API Key")
content_length = file.size
content_type = file.content_type
if content_type not in ['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg',
'video/webm', 'video/wmv', 'video/3gpp']:
# 允许上传的视频
google_content_type_allow_list = ['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg',
'video/webm', 'video/wmv', 'video/3gpp', ]
# 允许上传的图片
google_content_type_allow_list.extend(["image/png", "image/jpeg", "image/webp", "image/heic", "image/heif"])
if content_type not in google_content_type_allow_list:
raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
logger.info(f"Uploading name = {file.filename}, size = {content_length}, type = {content_type} to google file")
with httpx.Client(timeout=1800) as client:
@ -63,7 +68,7 @@ async def upload_file_multipart(file: UploadFile,
upload_url = pre_upload_response.headers.get("X-Goog-Upload-Url")
upload_response = client.post(url=upload_url, content=file.file.read(), headers={
upload_response = client.post(url=upload_url, content=file.file, headers={
"X-Goog-Upload-Offset": "0",
"X-Goog-Upload-Command": "upload, finalize",
"Content-Type": content_type
@ -97,6 +102,7 @@ async def delete_file(filename: str, headers: Annotated[GoogleAPIKeyHeaders, Hea
response.raise_for_status()
return JSONResponse(content=response.json(), status_code=response.status_code)
@router.delete('/delete_all', summary="删除所有已上传的文件/第一页")
async def delete_all(headers: Annotated[GoogleAPIKeyHeaders, Header()]):
google_api_key = headers.x_google_api_key or os.environ.get("GOOGLE_API_KEY")
@ -116,6 +122,7 @@ async def delete_all(headers: Annotated[GoogleAPIKeyHeaders, Header()]):
return JSONResponse(content={"msg": f"删除文件{filename}失败"}, status_code=resp.status_code)
return JSONResponse(content=response.json(), status_code=response.status_code)
@router.get('/list', summary="列出已上传的文件")
async def list_files(headers: Annotated[GoogleAPIKeyHeaders, Header()]):
google_api_key = headers.x_google_api_key or os.environ.get("GOOGLE_API_KEY")
@ -128,20 +135,25 @@ async def list_files(headers: Annotated[GoogleAPIKeyHeaders, Header()]):
return JSONResponse(content={}, status_code=response.status_code)
return JSONResponse(content=response.json() if len(response.text) > 0 else "", status_code=response.status_code)
@router.post('/inference_gemini', summary="使用Gemini推理hls视频流指定时间段的打点情况")
async def inference_gemini(
data:GeminiRequest,
headers: Annotated[BundleHeaders, Header()],
) -> ModalTaskResponse:
data: GeminiRequest,
headers: Annotated[BundleHeaders, Header()],
) -> ModalTaskResponse:
google_api_key = headers.x_google_api_key or os.environ.get("GOOGLE_API_KEY")
if not google_api_key:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing Google API Key")
fn = modal.Function.from_name(config.modal_app_name,"video_hls_slice_inference", environment_name=config.modal_environment)
fn = modal.Function.from_name(config.modal_app_name, "video_hls_slice_inference",
environment_name=config.modal_environment)
fn_call = fn.spawn(data.media_hls_url, google_api_key, data.product_list, data.start_time, data.end_time,
SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(), x_baggage=sentry_sdk.get_baggage())
if headers.x_trace_id is None else SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage))
SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(),
x_baggage=sentry_sdk.get_baggage())
if headers.x_trace_id is None else SentryTransactionInfo(x_trace_id=headers.x_trace_id,
x_baggage=headers.x_baggage))
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@router.get("/inference_gemini/{task_id}", summary="查询Gemini推理hls视频流指定时间段的打点任务")
async def gemini_status(task_id: str, response: Response) -> GeminiResultResponse:
task_info = await ModalUtils.get_modal_task_status(task_id)
@ -150,10 +162,11 @@ async def gemini_status(task_id: str, response: Response) -> GeminiResultRespons
response.headers["x-baggage"] = task_info.transaction.x_baggage
try:
return GeminiResultResponse(taskId=task_id, status=task_info.status, code=cast(int, task_info.error_code.value),
error=task_info.error_reason, result=json.dumps(task_info.results, ensure_ascii=False)
error=task_info.error_reason,
result=json.dumps(task_info.results, ensure_ascii=False)
if task_info.results is not None else "")
except Exception as e:
logger.exception(f"获取Gemini状态发生错误 {e}")
return GeminiResultResponse(taskId=task_id, status=task_info.status, code=cast(int, task_info.error_code.value),
error=task_info.error_reason + f"获取Gemini状态发生错误 {e}"
if task_info.error_reason else f"获取Gemini状态发生错误 {e}", result="")
if task_info.error_reason else f"获取Gemini状态发生错误 {e}", result="")

View File

@ -1,5 +1,4 @@
import asyncio
import re
import shutil
import tempfile
from datetime import datetime, timedelta
@ -7,15 +6,12 @@ from datetime import datetime, timedelta
import aiofiles
import aiohttp
from typing import Union, List, Tuple, Optional, Dict, Any
from typing import Union, List, Tuple, Optional, Dict, Any
import m3u8
import numpy as np
import json, os
import math
import retry
from aiohttp import ClientTimeout
from urllib3 import Retry
from .TimeUtils import TimeDelta
from pydantic import BaseModel, ConfigDict, computed_field, Field, field_validator
@ -39,7 +35,25 @@ from pedalboard.io import AudioFile
from loguru import logger
from .PathUtils import FileUtils
from BowongModalFunctions.models.ffmpeg_worker_model import FFMpegSliceSegment
from ..models.ffmpeg_worker_model import FFMpegSliceSegment
class FFMPEGSliceOptions(BaseModel):
limit_size: Optional[int] = Field(default=None, description="不超过指定文件(字节)大小, 默认为空不限制输出大小")
crf: int = Field(default=16, description="输出视频的质量")
fps: int = Field(default=30, description="输出视频的FPS")
@computed_field(description="解析为字符表达式的文件大小")
@property
def pretty_limit_size(self) -> Optional[str]:
if self.limit_size is not None:
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if self.limit_size < 1024.0:
return f"{self.limit_size:.2f} {unit}"
self.limit_size /= 1024.0
return f"{self.limit_size:.2f} PB"
else:
return None
class MediaStream(BaseModel):
@ -408,11 +422,13 @@ class VideoUtils:
@staticmethod
async def ffmpeg_slice_media(media_path: str, media_markers: List[FFMpegSliceSegment],
options: FFMPEGSliceOptions,
output_path: Optional[str] = None) -> List[Tuple[str, VideoMetadata]]:
"""
使用本地视频文件按时间段切割出分段视频
:param media_path: 本地视频路径
:param media_markers: 分段起始结束时间标记
:param options: 输出切割质量选项
:param output_path: 最终输出文件路径, 片段会根据指定路径附加_1.mp4 _2.mp4等片段编号
:return: 输出片段的本地路径
"""
@ -449,17 +465,20 @@ class VideoUtils:
raise ValueError(
f"{i}个切割点结束点{marker.end.total_seconds()}s超出视频时长[0-{metadata.format.duration}s]范围")
segment_output_path = FileUtils.file_path_extend(output_path, str(i))
ffmpeg_cmd.output(segment_output_path,
map=[f"[cut{i}]", f"[acut{i}]"],
reset_timestamps="1",
sc_threshold="0",
g="1",
force_key_frames="expr:gte(t,n_forced*1)",
vcodec="libx264",
acodec="aac",
crf=16,
r=30,
)
ffmpeg_options = {
"map": [f"[cut{i}]", f"[acut{i}]"],
"reset_timestamps": "1",
"sc_threshold": "0",
"g": "1",
"force_key_frames": "expr:gte(t,n_forced*1)",
"vcodec": "libx264",
"acodec": "aac",
"crf": options.crf,
"r": options.fps
}
if options.limit_size:
ffmpeg_options["fs"] = options.pretty_limit_size
ffmpeg_cmd.output(segment_output_path, options=ffmpeg_options)
temp_outputs.append(segment_output_path)
await ffmpeg_cmd.execute()

View File

@ -1,13 +1,5 @@
import asyncio
from pathlib import Path
import botocore
import httpx
import modal
from dotenv import dotenv_values
from watchdog.events import DirMovedEvent
from BowongModalFunctions.utils.ModalUtils import ModalUtils
ffmpeg_worker_image = (
modal.Image.debian_slim(python_version="3.11")
@ -28,15 +20,18 @@ with ffmpeg_worker_image.imports():
from typing import List, Optional, Tuple, Dict, Any, Union, Set
from loguru import logger
from modal import current_function_call_id
from ffmpeg.asyncio import FFmpeg
from BowongModalFunctions.utils.SentryUtils import SentryUtils
from BowongModalFunctions.utils.PathUtils import FileUtils
from BowongModalFunctions.utils.VideoUtils import VideoUtils
from BowongModalFunctions.utils.ModalUtils import ModalUtils
from BowongModalFunctions.models.ffmpeg_worker_model import FFMpegSliceSegment
from BowongModalFunctions.models.media_model import MediaSources, MediaSource, MediaProtocol
from BowongModalFunctions.models.web_model import SentryTransactionInfo, WebhookNotify, FFMPEGResult, \
WebhookMethodEnum, BaseFFMPEGTaskStatusResponse, TaskStatus
WebhookMethodEnum, BaseFFMPEGTaskStatusResponse, TaskStatus, FFMPEGSliceOptions
from BowongModalFunctions.config import WorkerConfig
import httpx
from pathlib import Path
import asyncio
config = WorkerConfig()
@ -124,6 +119,7 @@ with ffmpeg_worker_image.imports():
@modal.concurrent(max_inputs=1)
async def ffmpeg_slice_media(media: MediaSource,
markers: List[FFMpegSliceSegment],
options: FFMPEGSliceOptions,
sentry_trace: Optional[SentryTransactionInfo] = None,
webhook: Optional[WebhookNotify] = None) -> Tuple[
List[FFMPEGResult], Optional[SentryTransactionInfo]]:
@ -134,13 +130,15 @@ with ffmpeg_worker_image.imports():
sentry_baggage=sentry_trace.x_baggage if sentry_trace else None)
@SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id)
async def ffmpeg_slice_process(media_source: MediaSource, media_markers: List[FFMpegSliceSegment],
fn_id: str) -> List[FFMPEGResult]:
fn_id: str, options: Optional[FFMPEGSliceOptions] = None,
) -> List[FFMPEGResult]:
cache_filepath = f"{s3_mount}/{media_source.cache_filepath}"
logger.info(f"{media_source.urn}切割")
for i, marker in enumerate(media_markers):
logger.info(f"[{i}] {marker.start.toFormatStr()} --> {marker.end.toFormatStr()}")
segments = await VideoUtils.ffmpeg_slice_media(media_path=cache_filepath,
media_markers=media_markers,
options=options,
output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4")
return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1],
content_length=FileUtils.get_file_size(segment[0])) for segment in segments]
@ -149,9 +147,9 @@ with ffmpeg_worker_image.imports():
sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None,
sentry_baggage=sentry_trace.x_baggage if sentry_trace else None)
@SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id)
async def ffmpeg_hls_slice_process(media_source: MediaSource,
media_markers: List[FFMpegSliceSegment],
fn_id: str) -> List[FFMPEGResult]:
async def ffmpeg_hls_slice_process(media_source: MediaSource, media_markers: List[FFMpegSliceSegment],
fn_id: str,
options: Optional[FFMPEGSliceOptions] = None, ) -> List[FFMPEGResult]:
hls_m3u8_url = media_source.path
segments = await VideoUtils.ffmpeg_slice_stream_media_multithread(media_path=hls_m3u8_url,
media_markers=media_markers,
@ -594,7 +592,6 @@ with ffmpeg_worker_image.imports():
)
self.s3_root_output_dir = s3_mount_output_dir.replace(s3_mount, "")
def on_created(self, event: FileCreatedEvent) -> None:
logger.info(f"[created] {event.src_path}")
if event.src_path.endswith(".ts"):