From f1e4fa3d6c074a8101b2fe89864eb95c6e6facc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=82=96=E5=AE=87=E8=BF=AA?= Date: Thu, 12 Jun 2025 11:58:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96google/upload=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E7=9A=84=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 优化google/upload接口的速度 * 切割视频接口新增输出质量选项 --------- Merge request URL: https://g-ldyi2063.coding.net/p/dev/d/modalDeploy/git/merge/4790?initial=true Co-authored-by: shuohigh@gmail.com --- src/BowongModalFunctions/config.py | 4 +- src/BowongModalFunctions/models/web_model.py | 20 +++++--- src/BowongModalFunctions/router/ffmpeg.py | 12 ++--- src/BowongModalFunctions/router/google.py | 37 +++++++++----- src/BowongModalFunctions/utils/VideoUtils.py | 51 ++++++++++++++------ src/cluster/ffmpeg_app.py | 27 +++++------ 6 files changed, 94 insertions(+), 57 deletions(-) diff --git a/src/BowongModalFunctions/config.py b/src/BowongModalFunctions/config.py index 681a3c0..ad688aa 100644 --- a/src/BowongModalFunctions/config.py +++ b/src/BowongModalFunctions/config.py @@ -22,4 +22,6 @@ class WorkerConfig(BaseSettings): comfyui_s3_input: Optional[str] = Field(default="comfyui-input", description="ComfyUI input S3文件夹名") comfyui_s3_output: Optional[str] = Field(default="comfyui-output", description="ComfyUI output S3文件夹名") - modal_config: Any = SettingsConfigDict() + modal_config: Any = SettingsConfigDict(json_schema_extra={ + "description": "可通过本地环境变量加载对应Field, 不区分大小写, Modal创建App的Image时可通过dotenv加载指定.env文件写入到Docker Image的系统变量", + }) diff --git a/src/BowongModalFunctions/models/web_model.py b/src/BowongModalFunctions/models/web_model.py index 68ecf83..2747d03 100644 --- a/src/BowongModalFunctions/models/web_model.py +++ b/src/BowongModalFunctions/models/web_model.py @@ -7,7 +7,7 @@ from pydantic import BaseModel, Field, field_validator, ConfigDict, HttpUrl, com from .ffmpeg_worker_model import FFMpegSliceSegment from .media_model import MediaSource, MediaSources, MediaProtocol from ..config import WorkerConfig -from ..utils.VideoUtils import VideoMetadata +from ..utils.VideoUtils import VideoMetadata, FFMPEGSliceOptions config = WorkerConfig() @@ -124,6 +124,7 @@ class FFMPEGConvertStreamResponse(BaseFFMPEGTaskStatusResponse): class FFMPEGSliceRequest(BaseFFMPEGTaskRequest): media: MediaSource = Field(description="待切割的媒体源") markers: List[FFMpegSliceSegment] = Field(description="按照时间顺序排序过的切割标记数组") + options: FFMPEGSliceOptions = Field(default_factory=FFMPEGSliceOptions, description="切割质量选项") @field_validator('media', mode='before') @classmethod @@ -428,14 +429,15 @@ class FFMPEGStreamRecordRequest(BaseFFMPEGTaskRequest): stream_source: str = Field(description="直播源地址") segment_duration: int = Field(default=5, description="hls片段时长(秒)") recording_timeout: int = Field(default=300, description="hls流无内容后等待的时长(秒)") - monitor_timeout: int = Field(default=36000, description="录制监控最大时长(秒), 默认为10小时, 不可大于12小时", le=43200) + monitor_timeout: int = Field(default=36000, description="录制监控最大时长(秒), 默认为10小时, 不可大于12小时", + le=43200) class GeminiRequest(BaseFFMPEGTaskRequest): media_hls_url: MediaSource = Field(default="", description="视频流录制HLS地址 hls://格式 需录制超过20分钟") product_list: List[Union[str, dict]] = Field(description="商品名列表(时间倒序)"), start_time: str = Field(default="00:00:00.000", description="开始时间(hls)") - end_time:str = Field(default="00:20:00.000", description="结束时间(hls)") + end_time: str = Field(default="00:20:00.000", description="结束时间(hls)") @field_validator('media_hls_url', mode='before') @classmethod @@ -451,9 +453,11 @@ class GeminiRequest(BaseFFMPEGTaskRequest): else: raise pydantic.ValidationError("media格式读取失败") + class GeminiResultResponse(BaseFFMPEGTaskStatusResponse): result: str = Field(default="", description="推理出的json") + class LiveProduct(BaseModel): title: str = Field(default="", description="商品标题") leaf_category: str = Field(default="", description="商品分类") @@ -462,19 +466,23 @@ class LiveProduct(BaseModel): cover: str = Field(default="", description="商品封面图链接") detail_url: str = Field(default="", description="商品详情链接") + class LiveProductCaches(BaseModel): room_id: str = Field(default="", description="直播间room_id/Room room_id") author_id: str = Field(default="", description="作者id/Author id") update_time: str = Field(default=0, description="商品列表更新时间") count: int = Field(default=0, description="缓存直播间商品数量") - product_list: List[LiveProduct] = Field(default=None,description="缓存直播间商品列表") + product_list: List[LiveProduct] = Field(default=None, description="缓存直播间商品列表") + class MonitorLiveRoomProductRequest(BaseModel): cookie: str = Field(default="YOUR_COOKIE", description="用户网页版抖音Cookie/Your web version of Douyin Cookie") room_id: str = Field(default="", description="直播间room_id/Room room_id") author_id: str = Field(default="", description="作者id/Author id") + class LiveRoomProductCachesResponse(BaseModel): - status: int = Field(default=None, description="缓存状态/0-正常返回 1-直播已结束 2-IP风控 3-请求Tikhub API错误 4-内部错误") + status: int = Field(default=None, + description="缓存状态/0-正常返回 1-直播已结束 2-IP风控 3-请求Tikhub API错误 4-内部错误") message: str = Field(default="", description="错误信息") - cache_json: str = Field(default="", description="缓存内容/Json文本") \ No newline at end of file + cache_json: str = Field(default="", description="缓存内容/Json文本") diff --git a/src/BowongModalFunctions/router/ffmpeg.py b/src/BowongModalFunctions/router/ffmpeg.py index 72ee316..1fc93ec 100644 --- a/src/BowongModalFunctions/router/ffmpeg.py +++ b/src/BowongModalFunctions/router/ffmpeg.py @@ -1,11 +1,10 @@ from typing import Annotated, cast import modal -from fastapi import APIRouter, Depends, Header, Body -from fastapi.responses import Response, JSONResponse +from fastapi import APIRouter, Depends, Header +from fastapi.responses import Response from loguru import logger from starlette import status -import m3u8 from ..config import WorkerConfig from ..middleware.authorization import verify_token @@ -51,8 +50,7 @@ sentry_header_schema = { "description": "", "headers": sentry_header_schema }, -}, - ) +}, ) async def get_task_status(task_id: str, response: Response) -> BaseFFMPEGTaskStatusResponse: logger.info(f"Get task [{task_id}]") task_info = await ModalUtils.get_modal_task_status(task_id) @@ -72,10 +70,10 @@ async def slice_media(request: FFMPEGSliceRequest, sentry_trace = None if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) - fn_call = fn.spawn(media=request.media, markers=request.markers, sentry_trace=sentry_trace, webhook=request.webhook) + fn_call = fn.spawn(media=request.media, markers=request.markers, options=request.options, + sentry_trace=sentry_trace, webhook=request.webhook) return ModalTaskResponse(success=True, taskId=fn_call.object_id) - @router.post("/convert", summary="发起直播流转换任务", ) async def convert_media(request: FFMPEGConvertStreamRequest, headers: Annotated[SentryTransactionHeader, Header()]) -> ModalTaskResponse: diff --git a/src/BowongModalFunctions/router/google.py b/src/BowongModalFunctions/router/google.py index f8a7954..c4132e7 100644 --- a/src/BowongModalFunctions/router/google.py +++ b/src/BowongModalFunctions/router/google.py @@ -1,6 +1,6 @@ import json import os -from typing import Annotated, Optional, cast +from typing import Annotated, Optional, cast, List import modal import sentry_sdk @@ -25,6 +25,7 @@ router = APIRouter(prefix="/google", tags=["Google"]) class GoogleAPIKeyHeaders(BaseModel): x_google_api_key: Optional[str] = Field(description="Google API Key") + class BundleHeaders(BaseModel): x_google_api_key: Optional[str] = Field(description="Google API Key") x_trace_id: str = Field(description="Sentry Transaction ID", default=None) @@ -41,8 +42,12 @@ async def upload_file_multipart(file: UploadFile, raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing Google API Key") content_length = file.size content_type = file.content_type - if content_type not in ['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg', - 'video/webm', 'video/wmv', 'video/3gpp']: + # 允许上传的视频 + google_content_type_allow_list = ['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg', + 'video/webm', 'video/wmv', 'video/3gpp', ] + # 允许上传的图片 + google_content_type_allow_list.extend(["image/png", "image/jpeg", "image/webp", "image/heic", "image/heif"]) + if content_type not in google_content_type_allow_list: raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) logger.info(f"Uploading name = {file.filename}, size = {content_length}, type = {content_type} to google file") with httpx.Client(timeout=1800) as client: @@ -63,7 +68,7 @@ async def upload_file_multipart(file: UploadFile, upload_url = pre_upload_response.headers.get("X-Goog-Upload-Url") - upload_response = client.post(url=upload_url, content=file.file.read(), headers={ + upload_response = client.post(url=upload_url, content=file.file, headers={ "X-Goog-Upload-Offset": "0", "X-Goog-Upload-Command": "upload, finalize", "Content-Type": content_type @@ -97,6 +102,7 @@ async def delete_file(filename: str, headers: Annotated[GoogleAPIKeyHeaders, Hea response.raise_for_status() return JSONResponse(content=response.json(), status_code=response.status_code) + @router.delete('/delete_all', summary="删除所有已上传的文件/第一页") async def delete_all(headers: Annotated[GoogleAPIKeyHeaders, Header()]): google_api_key = headers.x_google_api_key or os.environ.get("GOOGLE_API_KEY") @@ -116,6 +122,7 @@ async def delete_all(headers: Annotated[GoogleAPIKeyHeaders, Header()]): return JSONResponse(content={"msg": f"删除文件{filename}失败"}, status_code=resp.status_code) return JSONResponse(content=response.json(), status_code=response.status_code) + @router.get('/list', summary="列出已上传的文件") async def list_files(headers: Annotated[GoogleAPIKeyHeaders, Header()]): google_api_key = headers.x_google_api_key or os.environ.get("GOOGLE_API_KEY") @@ -128,20 +135,25 @@ async def list_files(headers: Annotated[GoogleAPIKeyHeaders, Header()]): return JSONResponse(content={}, status_code=response.status_code) return JSONResponse(content=response.json() if len(response.text) > 0 else "", status_code=response.status_code) + @router.post('/inference_gemini', summary="使用Gemini推理hls视频流指定时间段的打点情况") async def inference_gemini( - data:GeminiRequest, - headers: Annotated[BundleHeaders, Header()], - ) -> ModalTaskResponse: + data: GeminiRequest, + headers: Annotated[BundleHeaders, Header()], +) -> ModalTaskResponse: google_api_key = headers.x_google_api_key or os.environ.get("GOOGLE_API_KEY") if not google_api_key: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing Google API Key") - fn = modal.Function.from_name(config.modal_app_name,"video_hls_slice_inference", environment_name=config.modal_environment) + fn = modal.Function.from_name(config.modal_app_name, "video_hls_slice_inference", + environment_name=config.modal_environment) fn_call = fn.spawn(data.media_hls_url, google_api_key, data.product_list, data.start_time, data.end_time, - SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(), x_baggage=sentry_sdk.get_baggage()) - if headers.x_trace_id is None else SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)) + SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(), + x_baggage=sentry_sdk.get_baggage()) + if headers.x_trace_id is None else SentryTransactionInfo(x_trace_id=headers.x_trace_id, + x_baggage=headers.x_baggage)) return ModalTaskResponse(success=True, taskId=fn_call.object_id) + @router.get("/inference_gemini/{task_id}", summary="查询Gemini推理hls视频流指定时间段的打点任务") async def gemini_status(task_id: str, response: Response) -> GeminiResultResponse: task_info = await ModalUtils.get_modal_task_status(task_id) @@ -150,10 +162,11 @@ async def gemini_status(task_id: str, response: Response) -> GeminiResultRespons response.headers["x-baggage"] = task_info.transaction.x_baggage try: return GeminiResultResponse(taskId=task_id, status=task_info.status, code=cast(int, task_info.error_code.value), - error=task_info.error_reason, result=json.dumps(task_info.results, ensure_ascii=False) + error=task_info.error_reason, + result=json.dumps(task_info.results, ensure_ascii=False) if task_info.results is not None else "") except Exception as e: logger.exception(f"获取Gemini状态发生错误 {e}") return GeminiResultResponse(taskId=task_id, status=task_info.status, code=cast(int, task_info.error_code.value), error=task_info.error_reason + f"获取Gemini状态发生错误 {e}" - if task_info.error_reason else f"获取Gemini状态发生错误 {e}", result="") \ No newline at end of file + if task_info.error_reason else f"获取Gemini状态发生错误 {e}", result="") diff --git a/src/BowongModalFunctions/utils/VideoUtils.py b/src/BowongModalFunctions/utils/VideoUtils.py index 84652d6..e0ece3b 100644 --- a/src/BowongModalFunctions/utils/VideoUtils.py +++ b/src/BowongModalFunctions/utils/VideoUtils.py @@ -1,5 +1,4 @@ import asyncio -import re import shutil import tempfile from datetime import datetime, timedelta @@ -7,15 +6,12 @@ from datetime import datetime, timedelta import aiofiles import aiohttp from typing import Union, List, Tuple, Optional, Dict, Any -from typing import Union, List, Tuple, Optional, Dict, Any import m3u8 import numpy as np import json, os import math -import retry from aiohttp import ClientTimeout -from urllib3 import Retry from .TimeUtils import TimeDelta from pydantic import BaseModel, ConfigDict, computed_field, Field, field_validator @@ -39,7 +35,25 @@ from pedalboard.io import AudioFile from loguru import logger from .PathUtils import FileUtils -from BowongModalFunctions.models.ffmpeg_worker_model import FFMpegSliceSegment +from ..models.ffmpeg_worker_model import FFMpegSliceSegment + + +class FFMPEGSliceOptions(BaseModel): + limit_size: Optional[int] = Field(default=None, description="不超过指定文件(字节)大小, 默认为空不限制输出大小") + crf: int = Field(default=16, description="输出视频的质量") + fps: int = Field(default=30, description="输出视频的FPS") + + @computed_field(description="解析为字符表达式的文件大小") + @property + def pretty_limit_size(self) -> Optional[str]: + if self.limit_size is not None: + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if self.limit_size < 1024.0: + return f"{self.limit_size:.2f} {unit}" + self.limit_size /= 1024.0 + return f"{self.limit_size:.2f} PB" + else: + return None class MediaStream(BaseModel): @@ -408,11 +422,13 @@ class VideoUtils: @staticmethod async def ffmpeg_slice_media(media_path: str, media_markers: List[FFMpegSliceSegment], + options: FFMPEGSliceOptions, output_path: Optional[str] = None) -> List[Tuple[str, VideoMetadata]]: """ 使用本地视频文件按时间段切割出分段视频 :param media_path: 本地视频路径 :param media_markers: 分段起始结束时间标记 + :param options: 输出切割质量选项 :param output_path: 最终输出文件路径, 片段会根据指定路径附加_1.mp4, _2.mp4等片段编号 :return: 输出片段的本地路径 """ @@ -449,17 +465,20 @@ class VideoUtils: raise ValueError( f"第{i}个切割点结束点{marker.end.total_seconds()}s超出视频时长[0-{metadata.format.duration}s]范围") segment_output_path = FileUtils.file_path_extend(output_path, str(i)) - ffmpeg_cmd.output(segment_output_path, - map=[f"[cut{i}]", f"[acut{i}]"], - reset_timestamps="1", - sc_threshold="0", - g="1", - force_key_frames="expr:gte(t,n_forced*1)", - vcodec="libx264", - acodec="aac", - crf=16, - r=30, - ) + ffmpeg_options = { + "map": [f"[cut{i}]", f"[acut{i}]"], + "reset_timestamps": "1", + "sc_threshold": "0", + "g": "1", + "force_key_frames": "expr:gte(t,n_forced*1)", + "vcodec": "libx264", + "acodec": "aac", + "crf": options.crf, + "r": options.fps + } + if options.limit_size: + ffmpeg_options["fs"] = options.pretty_limit_size + ffmpeg_cmd.output(segment_output_path, options=ffmpeg_options) temp_outputs.append(segment_output_path) await ffmpeg_cmd.execute() diff --git a/src/cluster/ffmpeg_app.py b/src/cluster/ffmpeg_app.py index 9fa7862..b0918fd 100644 --- a/src/cluster/ffmpeg_app.py +++ b/src/cluster/ffmpeg_app.py @@ -1,13 +1,5 @@ -import asyncio -from pathlib import Path - -import botocore -import httpx import modal from dotenv import dotenv_values -from watchdog.events import DirMovedEvent - -from BowongModalFunctions.utils.ModalUtils import ModalUtils ffmpeg_worker_image = ( modal.Image.debian_slim(python_version="3.11") @@ -28,15 +20,18 @@ with ffmpeg_worker_image.imports(): from typing import List, Optional, Tuple, Dict, Any, Union, Set from loguru import logger from modal import current_function_call_id - from ffmpeg.asyncio import FFmpeg from BowongModalFunctions.utils.SentryUtils import SentryUtils from BowongModalFunctions.utils.PathUtils import FileUtils from BowongModalFunctions.utils.VideoUtils import VideoUtils + from BowongModalFunctions.utils.ModalUtils import ModalUtils from BowongModalFunctions.models.ffmpeg_worker_model import FFMpegSliceSegment from BowongModalFunctions.models.media_model import MediaSources, MediaSource, MediaProtocol from BowongModalFunctions.models.web_model import SentryTransactionInfo, WebhookNotify, FFMPEGResult, \ - WebhookMethodEnum, BaseFFMPEGTaskStatusResponse, TaskStatus + WebhookMethodEnum, BaseFFMPEGTaskStatusResponse, TaskStatus, FFMPEGSliceOptions from BowongModalFunctions.config import WorkerConfig + import httpx + from pathlib import Path + import asyncio config = WorkerConfig() @@ -124,6 +119,7 @@ with ffmpeg_worker_image.imports(): @modal.concurrent(max_inputs=1) async def ffmpeg_slice_media(media: MediaSource, markers: List[FFMpegSliceSegment], + options: FFMPEGSliceOptions, sentry_trace: Optional[SentryTransactionInfo] = None, webhook: Optional[WebhookNotify] = None) -> Tuple[ List[FFMPEGResult], Optional[SentryTransactionInfo]]: @@ -134,13 +130,15 @@ with ffmpeg_worker_image.imports(): sentry_baggage=sentry_trace.x_baggage if sentry_trace else None) @SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id) async def ffmpeg_slice_process(media_source: MediaSource, media_markers: List[FFMpegSliceSegment], - fn_id: str) -> List[FFMPEGResult]: + fn_id: str, options: Optional[FFMPEGSliceOptions] = None, + ) -> List[FFMPEGResult]: cache_filepath = f"{s3_mount}/{media_source.cache_filepath}" logger.info(f"从{media_source.urn}切割") for i, marker in enumerate(media_markers): logger.info(f"[{i}] {marker.start.toFormatStr()} --> {marker.end.toFormatStr()}") segments = await VideoUtils.ffmpeg_slice_media(media_path=cache_filepath, media_markers=media_markers, + options=options, output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4") return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1], content_length=FileUtils.get_file_size(segment[0])) for segment in segments] @@ -149,9 +147,9 @@ with ffmpeg_worker_image.imports(): sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None, sentry_baggage=sentry_trace.x_baggage if sentry_trace else None) @SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id) - async def ffmpeg_hls_slice_process(media_source: MediaSource, - media_markers: List[FFMpegSliceSegment], - fn_id: str) -> List[FFMPEGResult]: + async def ffmpeg_hls_slice_process(media_source: MediaSource, media_markers: List[FFMpegSliceSegment], + fn_id: str, + options: Optional[FFMPEGSliceOptions] = None, ) -> List[FFMPEGResult]: hls_m3u8_url = media_source.path segments = await VideoUtils.ffmpeg_slice_stream_media_multithread(media_path=hls_m3u8_url, media_markers=media_markers, @@ -594,7 +592,6 @@ with ffmpeg_worker_image.imports(): ) self.s3_root_output_dir = s3_mount_output_dir.replace(s3_mount, "") - def on_created(self, event: FileCreatedEvent) -> None: logger.info(f"[created] {event.src_path}") if event.src_path.endswith(".ts"):