fix 一些webhook相关的bug,添加了更详细的接口说明

This commit is contained in:
shuohigh@gmail.com 2025-06-05 14:24:30 +08:00
parent 369a7f957f
commit 818d3e5595
3 changed files with 88 additions and 36 deletions

View File

@ -204,7 +204,9 @@ async def video_extract_frame(body: FFMPEGExtractFrameRequest,
return ModalTaskResponse(success=True, taskId=fn_call.object_id) return ModalTaskResponse(success=True, taskId=fn_call.object_id)
@router.post("/record/hls", summary="发起直播录制为HLS任务", ) @router.post("/record/hls", summary="发起直播录制为HLS任务",
description="录制任务最长存在时间为12小时, 录制任务完成后所获得的hls视频资源只存在3天如需持久存储请通过使用manifest_urn发起'直播流转换任务'转换为mp4存储",
response_description="返回发起的任务id和成功录制完第一个片段后通过CDN缓存获得的m3u8播放列表")
async def stream_record_vod(body: FFMPEGStreamRecordRequest, async def stream_record_vod(body: FFMPEGStreamRecordRequest,
headers: Annotated[SentryTransactionHeader, Header()]) -> RecordingTaskResponse: headers: Annotated[SentryTransactionHeader, Header()]) -> RecordingTaskResponse:
fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_stream_record_as_hls", fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_stream_record_as_hls",

View File

@ -1,4 +1,6 @@
import os import os
from pathlib import Path
from typing import List
class FileUtils: class FileUtils:
@ -27,3 +29,22 @@ class FileUtils:
filenames[-1] = extension filenames[-1] = extension
filename = ".".join(filenames) filename = ".".join(filenames)
return os.path.join(media_dir, filename) return os.path.join(media_dir, filename)
@staticmethod
def get_folder_size(folder_path: str) -> int:
total_size = 0
for path in Path(folder_path).rglob('*'):
if path.is_file():
total_size += path.stat().st_size
return total_size
@staticmethod
def get_file_size(file_path: str) -> int:
return os.path.getsize(file_path)
@staticmethod
def get_files_size(files: List[str]) -> int:
total_size = 0
for file in files:
total_size += os.path.getsize(file)
return total_size

View File

@ -92,7 +92,7 @@ with ffmpeg_worker_image.imports():
output_path=output_filepath) output_path=output_filepath)
s3_outputs = local_copy_to_s3([local_output_path]) s3_outputs = local_copy_to_s3([local_output_path])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=os.path.getsize(local_output_path), ) content_length=FileUtils.get_file_size(local_output_path), )
output_path = f"{output_path_prefix}/{config.modal_environment}/concat/outputs/{fn_id}/output.mp4" output_path = f"{output_path_prefix}/{config.modal_environment}/concat/outputs/{fn_id}/output.mp4"
result = await ffmpeg_process(media_sources=medias, output_filepath=output_path) result = await ffmpeg_process(media_sources=medias, output_filepath=output_path)
@ -136,7 +136,7 @@ with ffmpeg_worker_image.imports():
media_markers=media_markers, media_markers=media_markers,
output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4") output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4")
return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1], return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1],
content_length=os.path.getsize(segment[0])) for segment in segments] content_length=FileUtils.get_file_size(segment[0])) for segment in segments]
@SentryUtils.sentry_tracker(name="直播视频切割任务", op="ffmpeg.slice.stream", fn_id=fn_id, @SentryUtils.sentry_tracker(name="直播视频切割任务", op="ffmpeg.slice.stream", fn_id=fn_id,
sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None, sentry_trace_id=sentry_trace.x_trace_id if sentry_trace else None,
@ -153,7 +153,7 @@ with ffmpeg_worker_image.imports():
media_markers=media_markers, media_markers=media_markers,
output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4") output_path=f"{output_path_prefix}/{config.modal_environment}/slice/outputs/{fn_id}/output.mp4")
return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1], return [FFMPEGResult(urn=local_copy_to_s3([segment[0]])[0], metadata=segment[1],
content_length=os.path.getsize(segment[0])) for segment in segments] content_length=FileUtils.get_file_size(segment[0])) for segment in segments]
match media.protocol: match media.protocol:
case MediaProtocol.hls: case MediaProtocol.hls:
@ -202,7 +202,8 @@ with ffmpeg_worker_image.imports():
output_path = f"{output_path_prefix}/{config.modal_environment}/extract_audio/outputs/{fn_id}/output.wav" output_path = f"{output_path_prefix}/{config.modal_environment}/extract_audio/outputs/{fn_id}/output.wav"
output_path, metadata = await VideoUtils.ffmpeg_extract_audio_async(cache_filepath, output_path) output_path, metadata = await VideoUtils.ffmpeg_extract_audio_async(cache_filepath, output_path)
s3_outputs = local_copy_to_s3([output_path]) s3_outputs = local_copy_to_s3([output_path])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(output_path), ) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=FileUtils.get_file_size(output_path), )
result = await ffmpeg_process(media_source, fn_id=fn_id) result = await ffmpeg_process(media_source, fn_id=fn_id)
return result, sentry_trace return result, sentry_trace
@ -244,7 +245,7 @@ with ffmpeg_worker_image.imports():
s3_outputs = local_copy_to_s3([local_output_filepath]) s3_outputs = local_copy_to_s3([local_output_filepath])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=os.path.getsize(local_output_filepath), ) content_length=FileUtils.get_file_size(local_output_filepath), )
result = await ffmpeg_process(media=media, mirror_scale_down_size=mirror_scale_down_size, func_id=fn_id, result = await ffmpeg_process(media=media, mirror_scale_down_size=mirror_scale_down_size, func_id=fn_id,
mirror_from_right=mirror_from_right, mirror_position=mirror_position) mirror_from_right=mirror_from_right, mirror_position=mirror_position)
@ -283,7 +284,7 @@ with ffmpeg_worker_image.imports():
overlay_gif_path=gif_filepath) overlay_gif_path=gif_filepath)
s3_outputs = local_copy_to_s3([local_output_filepath]) s3_outputs = local_copy_to_s3([local_output_filepath])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=os.path.getsize(local_output_filepath), ) content_length=FileUtils.get_file_size(local_output_filepath), )
result = await ffmpeg_process(media=media, func_id=fn_id, gif=gif) result = await ffmpeg_process(media=media, func_id=fn_id, gif=gif)
if not sentry_trace: if not sentry_trace:
@ -322,7 +323,7 @@ with ffmpeg_worker_image.imports():
zoom=zoom) zoom=zoom)
s3_outputs = local_copy_to_s3([local_output_filepath]) s3_outputs = local_copy_to_s3([local_output_filepath])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=os.path.getsize(local_output_filepath), ) content_length=FileUtils.get_file_size(local_output_filepath), )
result = await ffmpeg_process(media=media, duration=duration, zoom=zoom, func_id=fn_id) result = await ffmpeg_process(media=media, duration=duration, zoom=zoom, func_id=fn_id)
if not sentry_trace: if not sentry_trace:
@ -372,7 +373,7 @@ with ffmpeg_worker_image.imports():
s3_outputs = local_copy_to_s3([local_output_filepath]) s3_outputs = local_copy_to_s3([local_output_filepath])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=os.path.getsize(local_output_filepath), ) content_length=FileUtils.get_file_size(local_output_filepath), )
result = await ffmpeg_process(video=media, bgm=bgm, video_volume=video_volume, func_id=fn_id, result = await ffmpeg_process(video=media, bgm=bgm, video_volume=video_volume, func_id=fn_id,
music_volume=music_volume, noise_sample=noise_sample) music_volume=music_volume, noise_sample=noise_sample)
@ -418,7 +419,8 @@ with ffmpeg_worker_image.imports():
subtitle_path=subtitle_path, subtitle_path=subtitle_path,
font_dir=font_dir, output_path=output_path) font_dir=font_dir, output_path=output_path)
s3_outputs = local_copy_to_s3([local_output]) s3_outputs = local_copy_to_s3([local_output])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(local_output), ) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=FileUtils.get_file_size(local_output), )
result = await ffmpeg_process(video=media, subtitle=subtitle, fonts=fonts, func_id=fn_id) result = await ffmpeg_process(video=media, subtitle=subtitle, fonts=fonts, func_id=fn_id)
if not sentry_trace: if not sentry_trace:
@ -455,7 +457,8 @@ with ffmpeg_worker_image.imports():
audio_path=audio_path, audio_path=audio_path,
output_path=f"{output_path_prefix}/{config.modal_environment}/loop_fill/{func_id}/output.mp4") output_path=f"{output_path_prefix}/{config.modal_environment}/loop_fill/{func_id}/output.mp4")
s3_outputs = local_copy_to_s3([local_output]) s3_outputs = local_copy_to_s3([local_output])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(local_output), ) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=FileUtils.get_file_size(local_output), )
result = await ffmpeg_process(media=media, audio=audio, func_id=fn_id) result = await ffmpeg_process(media=media, audio=audio, func_id=fn_id)
@ -500,7 +503,8 @@ with ffmpeg_worker_image.imports():
output_path=output_path) output_path=output_path)
s3_outputs = local_copy_to_s3([local_output]) s3_outputs = local_copy_to_s3([local_output])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(local_output), ) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=FileUtils.get_file_size(local_output), )
result = await ffmpeg_process(media_stream=media, func_id=fn_id) result = await ffmpeg_process(media_stream=media, func_id=fn_id)
@ -546,7 +550,8 @@ with ffmpeg_worker_image.imports():
frame_index=frame_index, frame_index=frame_index,
output_path=output_path) output_path=output_path)
s3_outputs = local_copy_to_s3([local_output]) s3_outputs = local_copy_to_s3([local_output])
return FFMPEGResult(urn=s3_outputs[0], metadata=metadata, content_length=os.path.getsize(local_output), ) return FFMPEGResult(urn=s3_outputs[0], metadata=metadata,
content_length=FileUtils.get_file_size(local_output), )
result = await ffmpeg_process(media=media, frame_index=frame_index, func_id=fn_id) result = await ffmpeg_process(media=media, frame_index=frame_index, func_id=fn_id)
if not sentry_trace: if not sentry_trace:
@ -566,10 +571,11 @@ with ffmpeg_worker_image.imports():
environment_name=config.modal_environment), environment_name=config.modal_environment),
), ),
}, ) }, )
@modal.concurrent(max_inputs=1) @modal.concurrent(max_inputs=5)
async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int, async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int,
webhook: Optional[WebhookNotify] = None, webhook: Optional[WebhookNotify] = None,
sentry_trace: Optional[SentryTransactionInfo] = None, ): sentry_trace: Optional[SentryTransactionInfo] = None, ) -> Tuple[
FFMPEGResult, Optional[SentryTransactionInfo]]:
fn_id = current_function_call_id() fn_id = current_function_call_id()
output_dir = f"{config.modal_environment}/records/hls/{fn_id}" output_dir = f"{config.modal_environment}/records/hls/{fn_id}"
recording_output_dir = f"{s3_mount}/{output_dir}" recording_output_dir = f"{s3_mount}/{output_dir}"
@ -588,8 +594,14 @@ with ffmpeg_worker_image.imports():
class PlaylistEventHandler(FileSystemEventHandler): class PlaylistEventHandler(FileSystemEventHandler):
update_counter: int = 0 update_counter: int = 0
fn_id: str
webhook: Optional[WebhookNotify] = None webhook: Optional[WebhookNotify] = None
def __init__(self, fn_id: str, webhook: Optional[WebhookNotify] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.webhook = webhook
self.fn_id = fn_id
def on_created(self, event: Union[FileCreatedEvent, DirCreatedEvent]) -> None: def on_created(self, event: Union[FileCreatedEvent, DirCreatedEvent]) -> None:
logger.info(f"[dir={event.is_directory}](created)@{event.src_path}") logger.info(f"[dir={event.is_directory}](created)@{event.src_path}")
@ -615,11 +627,17 @@ with ffmpeg_worker_image.imports():
f.write(playlist.dumps()) f.write(playlist.dumps())
logger.info(f"live_playlist file written to {live_playlist}") logger.info(f"live_playlist file written to {live_playlist}")
self.update_counter += 1 self.update_counter += 1
if self.webhook: if self.webhook and self.update_counter == 1:
self.webhook_on_start() logger.info("[Start] webhook trigger")
try:
self.webhook_on_start()
except Exception as e:
logger.exception(e)
else:
logger.info("[Start] no webhook")
@backoff.on_exception(exception=Exception, wait_gen=backoff.constant, @backoff.on_exception(exception=Exception, wait_gen=backoff.constant,
max_time=5, max_tries=5, raise_on_giveup=False) max_time=5, max_tries=5, raise_on_giveup=True)
def webhook_on_start(self): def webhook_on_start(self):
""" """
开始录制第一次更新时回调 开始录制第一次更新时回调
@ -627,12 +645,13 @@ with ffmpeg_worker_image.imports():
webhook = self.webhook webhook = self.webhook
if webhook.method is not WebhookMethodEnum.POST: if webhook.method is not WebhookMethodEnum.POST:
logger.warning(f"webhook method {webhook.method.value} not supported") logger.warning(f"webhook method {webhook.method.value} not supported")
fn_id = current_function_call_id() body = BaseFFMPEGTaskStatusResponse(taskId=self.fn_id,
task_type="ffmpeg_stream_record_as_hls",
status=TaskStatus.running).model_dump()
response = httpx.post(url=webhook.endpoint.__str__(), response = httpx.post(url=webhook.endpoint.__str__(),
json=BaseFFMPEGTaskStatusResponse(taskId=fn_id, json=body,
task_type="ffmpeg_stream_record_as_hls",
status=TaskStatus.running).model_dump_json(),
headers=webhook.headers) headers=webhook.headers)
response.raise_for_status()
logger.info(f"[Start] webhook {response.status_code} {response.text}") logger.info(f"[Start] webhook {response.status_code} {response.text}")
async def ffmpeg_process(stream_url: str, async def ffmpeg_process(stream_url: str,
@ -650,33 +669,33 @@ with ffmpeg_worker_image.imports():
@backoff.on_exception(exception=Exception, wait_gen=backoff.constant, @backoff.on_exception(exception=Exception, wait_gen=backoff.constant,
max_time=5, max_tries=5, raise_on_giveup=False) max_time=5, max_tries=5, raise_on_giveup=False)
async def webhook_on_end(webhook: WebhookNotify): async def webhook_on_end(fn_id: str, webhook: WebhookNotify, result: FFMPEGResult):
if webhook.method is not WebhookMethodEnum.POST: if webhook.method is not WebhookMethodEnum.POST:
logger.warning(f"webhook method {webhook.method.value} not supported") logger.warning(f"webhook method {webhook.method.value} not supported")
fn_id = current_function_call_id() body = BaseFFMPEGTaskStatusResponse(taskId=fn_id,
task_type="ffmpeg_stream_record_as_hls",
status=TaskStatus.success,
results=[result])
response = httpx.post(url=webhook.endpoint.__str__(), response = httpx.post(url=webhook.endpoint.__str__(),
json=BaseFFMPEGTaskStatusResponse(taskId=fn_id, json=body.model_dump(),
task_type="ffmpeg_stream_record_as_hls",
status=TaskStatus.success).model_dump_json(),
headers=webhook.headers) headers=webhook.headers)
logger.info(f"[End] webhook {response.status_code} {response.text}") logger.info(f"[End] webhook {response.status_code} {response.text}")
@backoff.on_exception(exception=Exception, wait_gen=backoff.constant, @backoff.on_exception(exception=Exception, wait_gen=backoff.constant,
max_time=5, max_tries=5, raise_on_giveup=False) max_time=5, max_tries=5, raise_on_giveup=False)
async def webhook_on_error(webhook: WebhookNotify, error: str): async def webhook_on_error(fn_id: str, webhook: WebhookNotify, error: str):
if webhook.method is not WebhookMethodEnum.POST: if webhook.method is not WebhookMethodEnum.POST:
logger.warning(f"webhook method {webhook.method.value} not supported") logger.warning(f"webhook method {webhook.method.value} not supported")
fn_id = current_function_call_id() body = BaseFFMPEGTaskStatusResponse(taskId=fn_id,
task_type="ffmpeg_stream_record_as_hls",
error=error,
status=TaskStatus.failed).model_dump()
response = httpx.post(url=webhook.endpoint.__str__(), response = httpx.post(url=webhook.endpoint.__str__(),
json=BaseFFMPEGTaskStatusResponse(taskId=fn_id, json=body,
task_type="ffmpeg_stream_record_as_hls",
error=error,
status=TaskStatus.failed).model_dump_json(),
headers=webhook.headers) headers=webhook.headers)
logger.info(f"[End] webhook {response.status_code} {response.text}") logger.info(f"[End] webhook {response.status_code} {response.text}")
playlist_handler = PlaylistEventHandler() playlist_handler = PlaylistEventHandler(webhook=webhook, fn_id=fn_id)
playlist_handler.webhook = webhook
playlist_observer = Observer() playlist_observer = Observer()
os.makedirs(manifest_output_dir, exist_ok=True) os.makedirs(manifest_output_dir, exist_ok=True)
playlist_observer.schedule(playlist_handler, playlist_observer.schedule(playlist_handler,
@ -695,8 +714,18 @@ with ffmpeg_worker_image.imports():
playlist_observer.stop() playlist_observer.stop()
if webhook: if webhook:
await webhook_on_error(webhook=webhook, await webhook_on_error(webhook=webhook,
fn_id=fn_id,
error=e.message if hasattr(e, 'message') else str(e)) error=e.message if hasattr(e, 'message') else str(e))
raise e raise e
playlist_observer.stop() playlist_observer.stop()
content_length = FileUtils.get_folder_size(recording_output_dir)
metadata = VideoUtils.ffprobe_media_metadata(media_path=f"{manifest_output_dir}/playlist.m3u8")
result = FFMPEGResult(urn=f"s3://{output_dir}/playlist.m3u8",
metadata=metadata,
content_length=content_length, )
if webhook: if webhook:
await webhook_on_end(webhook=webhook) await webhook_on_end(webhook=webhook, fn_id=fn_id, result=result)
if not sentry_trace:
sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(),
x_baggage=sentry_sdk.get_baggage())
return result, sentry_trace