FFMPEGSliceOptions 新增输出视频的宽高参数; limit_size换为使用bytes字节数以确保ffmpeg fs命令生效

This commit is contained in:
shuohigh@gmail.com 2025-06-16 12:04:32 +08:00
parent 86daaf46b0
commit f1ed7f0da9
3 changed files with 66 additions and 231 deletions

View File

@ -44,6 +44,8 @@ class FFMPEGSliceOptions(BaseModel):
crf: int = Field(default=16, description="输出视频的质量")
fps: int = Field(default=30, description="输出视频的FPS")
width: int = Field(default=1080, description="输出视频的宽(像素)")
height: int = Field(default=1920, description="输出视频的高(像素)")
@computed_field(description="解析为字符表达式的文件大小")
@property
@ -431,19 +433,42 @@ class VideoUtils:
@staticmethod
async def ffmpeg_slice_media(media_path: str, media_markers: List[FFMpegSliceSegment],
options: FFMPEGSliceOptions,
options: FFMPEGSliceOptions, is_streams: bool = False,
output_path: Optional[str] = None) -> List[Tuple[str, VideoMetadata]]:
"""
使用本地视频文件按时间段切割出分段视频
使用本地视频文件按时间段切割出分段视频, 如果是直播流则按时间分段切割HLS视频流_预先多线程下载所有ts
:param media_path: 本地视频路径
:param media_markers: 分段起始结束时间标记
:param options: 输出切割质量选项
:param is_streams: 输入是否为直播流
:param output_path: 最终输出文件路径, 片段会根据指定路径附加_1.mp4 _2.mp4等片段编号
:return: 输出片段的本地路径
"""
ffmpeg_cmd = VideoUtils.async_ffmpeg_init()
ffmpeg_cmd.input(media_path)
if not is_streams:
ffmpeg_cmd.input(media_path)
else:
seek_head = media_markers[0].start.total_seconds()
seek_tail = media_markers[-1].end.total_seconds()
duration = seek_tail - seek_head
logger.info(f"Only using {seek_head}s --> {seek_tail}s = {duration}s")
local_m3u8_path, temp_dir = await VideoUtils.convert_m3u8_to_local_source(media_path, head=seek_head,
tail=seek_tail)
logger.info(f"local_playlist: {local_m3u8_path}")
for segment in media_markers:
segment.start = segment.start - timedelta(seconds=seek_head)
segment.end = segment.end - timedelta(seconds=seek_head)
logger.info(f"Only using {seek_head}s --> {seek_tail}s = {duration}s")
ffmpeg_cmd.input(media_path,
ss=seek_head,
t=duration,
protocol_whitelist="file,http,https,tcp,tls",
reconnect="1", # 自动重连
reconnect_streamed="1",
reconnect_delay_max="5")
filter_complex: List[str] = []
temp_outputs: List[str] = []
if not output_path:
@ -452,12 +477,21 @@ class VideoUtils:
metadata = VideoUtils.ffprobe_media_metadata(media_path)
for index, marker in enumerate(media_markers):
filter_complex.extend(
[
f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},setpts=PTS-STARTPTS[cut{index}]",
f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]",
]
)
# 处理指定的输出分辨率
if options.width and options.height:
filter_complex.extend(
[
f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},scale={options.width}:{options.height},setpts=PTS-STARTPTS[cut{index}]",
f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]",
]
)
else:
filter_complex.extend(
[
f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},setpts=PTS-STARTPTS[cut{index}]",
f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]",
]
)
ffmpeg_cmd.option('filter_complex', ';'.join(filter_complex))
diff_tolerance = 0.001
@ -488,7 +522,7 @@ class VideoUtils:
"r": options.fps
}
if options.limit_size:
ffmpeg_options["fs"] = options.pretty_limit_size
ffmpeg_options["fs"] = options.limit_size
elif options.bit_rate:
ffmpeg_options["b:v"] = options.pretty_bit_rate
@ -500,199 +534,6 @@ class VideoUtils:
temp_outputs]
return outputs
@staticmethod
async def ffmpeg_slice_stream_media(media_path: str,
media_markers: List[FFMpegSliceSegment],
output_path: Optional[str] = None) -> List[Tuple[str, VideoMetadata]]:
"""
按时间分段切割HLS视频流
:param media_path: hls manifest URL
:param media_markers: 分段起始结束时间标记
:param output_path: 最终输出文件路径, 片段会根据指定路径附加_1.mp4 _2.mp4等片段编号
:return: 输出片段的本地路径 输出片段时长
"""
import m3u8
playlist = m3u8.load(media_path)
stream_total_duration: float = sum(segment.duration for segment in playlist.segments)
seek_head = media_markers[0].start.total_seconds()
seek_tail = media_markers[-1].end.total_seconds()
duration = seek_tail - seek_head
logger.info(f"Only using {seek_head}s --> {seek_tail}s = {duration}s")
ffmpeg_cmd = VideoUtils.async_ffmpeg_init()
# ffmpeg_cmd.option('loglevel', 'debug')
ffmpeg_cmd.input(media_path,
ss=seek_head,
t=duration,
protocol_whitelist="file,http,https,tcp,tls",
reconnect="1", # 自动重连
reconnect_streamed="1",
reconnect_delay_max="5")
filter_complex: List[str] = []
temp_outputs: List[str] = []
if not output_path:
output_path = FileUtils.file_path_extend(media_path, "slice")
if not output_path.endswith(".mp4"):
output_path = output_path + ".mp4"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
for index, marker in enumerate(media_markers):
if marker.start.total_seconds() > stream_total_duration or marker.start.total_seconds() < 0:
raise ValueError(
f"{index}个切割点起始点{marker.start.total_seconds()}s超出视频时长[0-{stream_total_duration}s]范围")
if marker.end.total_seconds() > stream_total_duration or marker.end.total_seconds() < 0:
raise ValueError(
f"{index}个切割点结束点{marker.end.total_seconds()}s超出视频时长[0-{stream_total_duration}s]范围")
filter_complex.extend(
[
f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},setpts=PTS-STARTPTS[cut{index}]",
f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]",
]
)
ffmpeg_cmd.option('filter_complex', ';'.join(filter_complex))
for i, marker in enumerate(media_markers):
output_filepath = FileUtils.file_path_extend(output_path, str(i))
ffmpeg_cmd.output(output_filepath,
map=[f"[cut{i}]", f"[acut{i}]"],
reset_timestamps="1",
sc_threshold="0",
g="1",
force_key_frames="expr:gte(t,n_forced*1)",
vcodec="libx264",
acodec="aac",
crf=16,
r=30, )
temp_outputs.append(output_filepath)
await ffmpeg_cmd.execute()
outputs: List[Tuple[str, VideoMetadata]] = [(output, VideoUtils.ffprobe_media_metadata(output)) for output in
temp_outputs]
return outputs
@staticmethod
async def ffmpeg_slice_stream_media_multithread(media_path: str,
options: FFMPEGSliceOptions,
media_markers: List[FFMpegSliceSegment],
output_path: Optional[str] = None) -> List[
Tuple[str, VideoMetadata]]:
"""
按时间分段切割HLS视频流_预先多线程下载所有ts
:param media_path: hls manifest URL
:param media_markers: 分段起始结束时间标记
:param output_path: 最终输出文件路径, 片段会根据指定路径附加_1.mp4 _2.mp4等片段编号
:return: 输出片段的本地路径 输出片段时长
"""
seek_head = media_markers[0].start.total_seconds()
seek_tail = media_markers[-1].end.total_seconds()
duration = seek_tail - seek_head
logger.info(f"Only using {seek_head}s --> {seek_tail}s = {duration}s")
local_m3u8_path, temp_dir = await VideoUtils.convert_m3u8_to_local_source(media_path, head=seek_head,
tail=seek_tail)
logger.info(f"local_playlist: {local_m3u8_path}")
stream_total_duration = duration
for segment in media_markers:
segment.start = segment.start - timedelta(seconds=seek_head)
segment.end = segment.end - timedelta(seconds=seek_head)
ffmpeg_cmd = VideoUtils.async_ffmpeg_init()
ffmpeg_cmd.input(local_m3u8_path,
protocol_whitelist="file,http,https,tcp,tls")
filter_complex: List[str] = []
temp_outputs: List[str] = []
if not output_path:
output_path = FileUtils.file_path_extend(media_path, "slice")
if not output_path.endswith(".mp4"):
output_path = output_path + ".mp4"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
diff_tolerance = 0.001
for index, marker in enumerate(media_markers):
if marker.start.total_seconds() > stream_total_duration or marker.start.total_seconds() < 0:
raise ValueError(
f"{index}个切割点起始点{marker.start.total_seconds()}s超出视频时长[0-{stream_total_duration}s]范围")
if marker.end.total_seconds() > stream_total_duration or marker.end.total_seconds() < 0:
if marker.end.total_seconds() > 0 and math.isclose(marker.end.total_seconds(), stream_total_duration,
rel_tol=diff_tolerance):
marker.end = TimeDelta(seconds=stream_total_duration)
logger.warning(
f"{index}个切割点结束点{marker.end.total_seconds()}s接近视频时长[0-{stream_total_duration}s]范围")
filter_complex.extend(
[
f"[v:0]trim=start={marker.start.total_seconds()},setpts=PTS-STARTPTS[cut{index}]",
f"[a:0]atrim=start={marker.start.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]",
]
)
else:
raise ValueError(
f"{index}个切割点结束点{marker.end.total_seconds()}s超出视频时长[0-{stream_total_duration}s]范围")
else:
filter_complex.extend(
[
f"[v:0]trim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},setpts=PTS-STARTPTS[cut{index}]",
f"[a:0]atrim=start={marker.start.total_seconds()}:end={marker.end.total_seconds()},asetpts=PTS-STARTPTS[acut{index}]",
]
)
ffmpeg_cmd.option('filter_complex', ';'.join(filter_complex))
for i, marker in enumerate(media_markers):
output_filepath = FileUtils.file_path_extend(output_path, str(i))
ffmpeg_options = {
"map": [f"[cut{i}]", f"[acut{i}]"],
"reset_timestamps": "1",
"sc_threshold": "0",
"g": "1",
"force_key_frames": "expr:gte(t,n_forced*1)",
"vcodec": "libx264",
"acodec": "aac",
"crf": options.crf,
"r": options.fps
}
if options.limit_size:
ffmpeg_options["fs"] = options.pretty_limit_size
elif options.bit_rate:
ffmpeg_options["b:v"] = options.pretty_bit_rate
ffmpeg_cmd.output(output_filepath, options=ffmpeg_options)
temp_outputs.append(output_filepath)
await ffmpeg_cmd.execute()
VideoUtils.purge_temp_ts_dir(temp_dir)
outputs: List[Tuple[str, VideoMetadata]] = [(output, VideoUtils.ffprobe_media_metadata(output)) for output in
temp_outputs]
return outputs
@staticmethod
async def ffmpeg_convert_stream_media(media_stream_url: str,
output_path: Optional[str] = None) -> Tuple[str, VideoMetadata]:
if not output_path:
output_path = FileUtils.file_path_extend(media_stream_url, "convert")
if not output_path.endswith(".mp4"):
output_path = output_path + ".mp4"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
ffmpeg_cmd = VideoUtils.async_ffmpeg_init()
ffmpeg_cmd.input(media_stream_url,
protocol_whitelist="file,http,https,tcp,tls",
reconnect="1", # 自动重连
reconnect_streamed="1",
reconnect_delay_max="5")
ffmpeg_cmd.output(output_path,
reset_timestamps="1",
sc_threshold="0",
g="1",
force_key_frames="expr:gte(t,n_forced*1)",
vcodec="libx264",
acodec="aac",
crf=16,
r=30, )
await ffmpeg_cmd.execute()
output: Tuple[str, VideoMetadata] = (output_path, VideoUtils.ffprobe_media_metadata(output_path))
return output
@staticmethod
async def async_download_file(url: str, output_path: Optional[str] = None) -> str | None | Any:
t = 10
@ -770,8 +611,8 @@ class VideoUtils:
logger.exception(e)
@staticmethod
async def ffmpeg_convert_stream_media_multithread(media_stream_url: str, options: FFMPEGSliceOptions,
output_path: Optional[str] = None) -> tuple[
async def ffmpeg_convert_stream_media(media_stream_url: str, options: FFMPEGSliceOptions,
output_path: Optional[str] = None) -> tuple[
str, VideoMetadata] | None:
if not output_path:
output_path = FileUtils.file_path_extend(media_stream_url, "convert")

View File

@ -45,7 +45,7 @@ with ffmpeg_worker_image.imports():
raise NotImplementedError(f"暂不支持使用{media_stream.protocol.value}协议")
output_path = f"{output_path_prefix}/{config.modal_environment}/convert_stream/{func_id}/output.mp4"
local_output, metadata = await VideoUtils.ffmpeg_convert_stream_media_multithread(
local_output, metadata = await VideoUtils.ffmpeg_convert_stream_media(
media_stream_url=stream_url,
options=options,
output_path=output_path)

View File

@ -37,53 +37,47 @@ with ffmpeg_worker_image.imports():
List[FFMPEGResult], Optional[SentryTransactionInfo]]:
fn_id = current_function_call_id()
@SentryUtils.sentry_tracker(name="视频文件切割任务", op="ffmpeg.slice.media", fn_id=fn_id,
@SentryUtils.sentry_tracker(name="视频文件/直播切割任务", op="ffmpeg.slice.media", fn_id=fn_id,
sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None,
sentry_baggage=sentry_trace.x_baggage if sentry_trace else None)
@SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id)
async def ffmpeg_slice_process(media_source: MediaSource, media_markers: List[FFMpegSliceSegment],
fn_id: str, options: FFMPEGSliceOptions = None,
fn_id: str, options: FFMPEGSliceOptions = None, is_streams: bool = False,
) -> List[FFMPEGResult]:
cache_filepath = f"{s3_mount}/{media_source.cache_filepath}"
logger.info(f"{media_source.urn}切割")
for i, marker in enumerate(media_markers):
logger.info(f"[{i}] {marker.start.toFormatStr()} --> {marker.end.toFormatStr()}")
if not is_streams:
cache_filepath = f"{s3_mount}/{media_source.cache_filepath}"
logger.info(f"{media_source.urn}切割")
for i, marker in enumerate(media_markers):
logger.info(f"[{i}] {marker.start.toFormatStr()} --> {marker.end.toFormatStr()}")
else:
cache_filepath = media_source.path
segments = await VideoUtils.ffmpeg_slice_media(media_path=cache_filepath,
media_markers=media_markers,
options=options,
is_streams=is_streams,
output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4")
return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1],
content_length=FileUtils.get_file_size(segment[0])) for segment in segments]
@SentryUtils.sentry_tracker(name="直播视频切割任务", op="ffmpeg.slice.stream", fn_id=fn_id,
sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None,
sentry_baggage=sentry_trace.x_baggage if sentry_trace else None)
@SentryUtils.webhook_handler(webhook=webhook, func_id=fn_id)
async def ffmpeg_hls_slice_process(media_source: MediaSource, media_markers: List[FFMpegSliceSegment],
fn_id: str, options: FFMPEGSliceOptions, ) -> List[FFMPEGResult]:
hls_m3u8_url = media_source.path
segments = await VideoUtils.ffmpeg_slice_stream_media_multithread(media_path=hls_m3u8_url,
options=options,
media_markers=media_markers,
output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4")
return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1],
content_length=FileUtils.get_file_size(segment[0])) for segment in segments]
match media.protocol:
case MediaProtocol.hls:
outputs = await ffmpeg_hls_slice_process(media_source=media,
media_markers=markers,
options=options,
fn_id=fn_id)
outputs = await ffmpeg_slice_process(media_source=media,
media_markers=markers,
options=options,
is_streams=True,
fn_id=fn_id)
case MediaProtocol.vod:
outputs = await ffmpeg_slice_process(media_source=media,
media_markers=markers,
options=options,
is_streams=False,
fn_id=fn_id)
case MediaProtocol.s3:
outputs = await ffmpeg_slice_process(media_source=media,
media_markers=markers,
options=options,
is_streams=False,
fn_id=fn_id)
case _: # webhook不会报错需要确认
raise NotImplementedError("暂不支持的协议")