feat: 新增仅视频流的视频拼接功能

- 在VideoUtils中新增ffmpeg_concat_medias_video_only函数,仅处理视频流忽略音频
- 新增concat_medias_video_only.py模块,提供Modal函数封装
- 在ffmpeg路由中新增/concat-video-only接口
- 解决音频流缺失导致的"Stream specifier ':a' matches no streams"错误
- 适用于不需要音频的视频拼接场景
This commit is contained in:
imeepos 2025-09-24 15:35:46 +08:00
parent 4479544c18
commit 7ac387be1d
5 changed files with 127 additions and 2 deletions

View File

@ -3,7 +3,7 @@
## 部署
```bash
modal deploy -m cluster.app
modal deploy -m cluster.app --env dev
```
## 本地验证fastapi部署

View File

@ -6,7 +6,7 @@ authors = [
]
description = "Bowong modal云函数以及对于的FastAPI接口"
readme = "src/BowongModalFunctions/readme.md"
requires-python = ">=3.11"
requires-python = ">=3.10"
dependencies = [
"fastapi[standard]>=0.115.12",
"backoff>=2.2.1",

View File

@ -94,6 +94,22 @@ async def concat_media(body: FFMPEGConcatRequest,
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@router.post("/concat-video-only",
summary="发起合并任务(仅视频流)",
description="依据AI分析的结果发起合并任务仅处理视频流忽略音频",
)
async def concat_media_video_only(body: FFMPEGConcatRequest,
headers: Annotated[SentryTransactionHeader, Header()]) -> ModalTaskResponse:
medias = body.medias
fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_concat_medias_video_only",
environment_name=config.modal_environment)
sentry_trace = None
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(medias, sentry_trace, webhook=body.webhook)
return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@router.post("/extract-audio", summary="发起音频提取任务", )
async def extract_audio(body: FFMPEGExtractAudioRequest,
headers: Annotated[SentryTransactionHeader, Header()]) -> ModalTaskResponse:

View File

@ -913,6 +913,63 @@ class VideoUtils:
image_metadata = VideoUtils.ffprobe_media_metadata(output_path)
return output_path, image_metadata
@staticmethod
async def ffmpeg_concat_medias_video_only(media_paths: List[str],
target_width: int = 1080,
target_height: int = 1920,
output_path: Optional[str] = None) -> Tuple[str, VideoMetadata]:
"""
将待处理的视频合并为一个视频仅处理视频流忽略音频
:param media_paths: 待合并的多个视频文件路径
:param target_width: 输出的视频分辨率宽
:param target_height: 输出的视频分辨率高
:param output_path: 指定输出视频路径
:return: 最终合并结果路径最终合并结果时长
"""
total_videos = len(media_paths)
if total_videos == 0:
raise ValueError("没有可以合并的视频源")
if not output_path:
output_path = FileUtils.file_path_extend(media_paths[0], "concat_video_only")
os.makedirs(os.path.dirname(output_path), exist_ok=True)
ffmpeg_cmd = VideoUtils.async_ffmpeg_init()
filter_complex = []
for input_path in media_paths:
ffmpeg_cmd.input(input_path)
# 2. 统一所有视频的格式、分辨率和帧率(仅处理视频流)
for i in range(total_videos):
filter_complex.append(
# 先缩放到统一分辨率,然后设置帧率和格式
f"[{i}:v]scale={target_width}:{target_height}:force_original_aspect_ratio=decrease,"
f"pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2,"
f"setsar=1:1," # 强制设置SAR
f"fps=30,format=yuv420p[v{i}]"
)
# 3. 准备处理后的视频流的连接字符串(仅视频流)
video_streams = "".join(f"[v{i}]" for i in range(total_videos))
# 4. 使用concat过滤器合并视频仅视频流
filter_complex.append(
f"{video_streams}concat=n={total_videos}:v=1:a=0[vconcated]"
)
ffmpeg_cmd.output(
output_path,
{
"filter_complex": ";".join(filter_complex),
"map": "[vconcated]", # 仅映射视频流
"vcodec": "libx264",
"crf": 16,
"r": 30,
# 不处理音频编码相关参数
},
)
await ffmpeg_cmd.execute()
video_metadata = VideoUtils.ffprobe_media_metadata(output_path)
return output_path, video_metadata
@staticmethod
async def ffmpeg_stream_record_as_hls(stream_url: str,
segments_output_dir: str,

View File

@ -0,0 +1,52 @@
import modal
from ..ffmpeg_app import ffmpeg_worker_image, app, config, s3_mount, local_copy_to_s3, output_path_prefix
with ffmpeg_worker_image.imports():
from BowongModalFunctions.models.requests.models import MediaSourcesRequest
from BowongModalFunctions.models.ffmpeg_tasks.models import SentryTransactionInfo, WebhookNotify, FFMPEGResult
from BowongModalFunctions.utils.PathUtils import FileUtils
from BowongModalFunctions.utils.SentryUtils import SentryUtils
from BowongModalFunctions.utils.VideoUtils import VideoUtils
import sentry_sdk
from typing import Optional, Tuple
from modal import current_function_call_id
@app.function(
timeout=1800,
cloud="aws",
# region='ap-northeast',
max_containers=config.ffmpeg_worker_concurrency,
volumes={
s3_mount: modal.CloudBucketMount(
bucket_name=config.S3_bucket_name,
secret=modal.Secret.from_name("aws-s3-secret",
environment_name=config.modal_environment),
),
}, )
@modal.concurrent(max_inputs=1)
async def ffmpeg_concat_medias_video_only(medias: MediaSourcesRequest,
sentry_trace: Optional[SentryTransactionInfo] = None,
webhook: Optional[WebhookNotify] = None) -> Tuple[
FFMPEGResult, Optional[SentryTransactionInfo]]:
fn_id = current_function_call_id()
@SentryUtils.sentry_tracker(name="视频合并任务(仅视频流)", op="ffmpeg.concat_video_only", fn_id=fn_id,
sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None,
sentry_baggage=sentry_trace.x_baggage if sentry_trace else None)
@SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id)
async def ffmpeg_process(media_sources: MediaSourcesRequest, output_filepath: str) -> FFMPEGResult:
input_videos = [f"{s3_mount}/{media_source.cache_filepath}" for media_source in media_sources.inputs]
local_output_path, metadata = await VideoUtils.ffmpeg_concat_medias_video_only(media_paths=input_videos,
output_path=output_filepath)
s3_outputs = local_copy_to_s3([local_output_path])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=FileUtils.get_file_size(local_output_path), )
output_path = f"{output_path_prefix}/{config.modal_environment}/concat_video_only/outputs/{fn_id}/output.mp4"
result = await ffmpeg_process(media_sources=medias, output_filepath=output_path)
if not sentry_trace:
sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(),
x_baggage=sentry_sdk.get_baggage())
return result, sentry_trace