fix : hls录制缓存读写问题

This commit is contained in:
shuohigh@gmail.com 2025-06-09 12:37:06 +08:00
parent d0fee9c6e3
commit cdf7ccc697
4 changed files with 67 additions and 84 deletions

View File

@ -428,3 +428,4 @@ class FFMPEGStreamRecordRequest(BaseFFMPEGTaskRequest):
stream_source: str = Field(description="直播源地址") stream_source: str = Field(description="直播源地址")
segment_duration: int = Field(default=5, description="hls片段时长(秒)") segment_duration: int = Field(default=5, description="hls片段时长(秒)")
recording_timeout: int = Field(default=300, description="hls流无内容后等待的时长(秒)") recording_timeout: int = Field(default=300, description="hls流无内容后等待的时长(秒)")
monitor_timeout: int = Field(default=36000, description="录制监控最大时长(秒), 默认为10小时")

View File

@ -217,6 +217,7 @@ async def stream_record_vod(body: FFMPEGStreamRecordRequest,
fn_call = fn.spawn(stream_url=body.stream_source, fn_call = fn.spawn(stream_url=body.stream_source,
segment_duration=body.segment_duration, segment_duration=body.segment_duration,
recording_timeout=body.recording_timeout, recording_timeout=body.recording_timeout,
monitor_timeout=body.monitor_timeout,
sentry_trace=sentry_trace, sentry_trace=sentry_trace,
webhook=body.webhook) webhook=body.webhook)
manifest_link = f"{config.S3_cdn_endpoint}/{config.modal_environment}/records/hls/{fn_call.object_id}/playlist.m3u8" manifest_link = f"{config.S3_cdn_endpoint}/{config.modal_environment}/records/hls/{fn_call.object_id}/playlist.m3u8"

View File

@ -314,7 +314,7 @@ class VideoUtils:
return output_path return output_path
@staticmethod @staticmethod
def async_ffmpeg_init(use_ffprobe: bool = False) -> AsyncFFmpeg: def async_ffmpeg_init(use_ffprobe: bool = False, quiet: bool = False) -> AsyncFFmpeg:
if use_ffprobe: if use_ffprobe:
ffmpeg_cmd = AsyncFFmpeg('ffprobe') ffmpeg_cmd = AsyncFFmpeg('ffprobe')
else: else:
@ -335,7 +335,8 @@ class VideoUtils:
@ffmpeg_cmd.on("progress") @ffmpeg_cmd.on("progress")
def on_progress(progress): def on_progress(progress):
logger.info(f"处理进度: {progress}") if not quiet:
logger.info(f"处理进度: {progress}")
@ffmpeg_cmd.on("completed") @ffmpeg_cmd.on("completed")
def on_completed(): def on_completed():
@ -343,11 +344,12 @@ class VideoUtils:
@ffmpeg_cmd.on("stderr") @ffmpeg_cmd.on("stderr")
def on_stderr(line: str): def on_stderr(line: str):
if line.startswith('Error'): if line.startswith('Error') and ".m3u8" not in line:
logger.error(line) logger.error(line)
raise RuntimeError(line) raise RuntimeError(line)
else: else:
logger.warning(line) if not quiet:
logger.warning(line)
return ffmpeg_cmd return ffmpeg_cmd
@ -946,15 +948,15 @@ class VideoUtils:
async def ffmpeg_stream_record_as_hls(stream_url: str, async def ffmpeg_stream_record_as_hls(stream_url: str,
segments_output_dir: str, segments_output_dir: str,
playlist_output_dir: str, playlist_output_dir: str,
playlist_output_method: Optional[str] = None,
playlist_output_headers: Optional[Dict[str, str]] = None,
manifest_segment_prefix: Optional[str] = None, manifest_segment_prefix: Optional[str] = None,
segment_duration: float = 5.0, segment_duration: float = 5.0,
stream_content_timeout: int = 300, stream_content_timeout: int = 300,
stream_monitor_timeout: int = 36000,
output_file_pattern: str = "%10d.ts"): output_file_pattern: str = "%10d.ts"):
os.makedirs(segments_output_dir, exist_ok=True) os.makedirs(segments_output_dir, exist_ok=True)
ffmpeg_cmd = VideoUtils.async_ffmpeg_init() ffmpeg_cmd = VideoUtils.async_ffmpeg_init(quiet=True)
# ffmpeg_cmd.option("loglevel", "debug") # ffmpeg_cmd.option("loglevel", "debug")
ffmpeg_cmd.option("t", stream_monitor_timeout)
ffmpeg_cmd.input(stream_url, ffmpeg_cmd.input(stream_url,
protocol_whitelist="file,http,https,tcp,tls", # 使用flv protocol_whitelist="file,http,https,tcp,tls", # 使用flv
reconnect="1", # 自动重连 reconnect="1", # 自动重连
@ -966,14 +968,11 @@ class VideoUtils:
f="hls", f="hls",
# flags="+cgop", # flags="+cgop",
# g=30, # g=30,
method=playlist_output_method,
headers="\\r\\n".join([f"{header_key}:{playlist_output_headers[header_key]}" for header_key in
playlist_output_headers.keys()]) if playlist_output_headers else None,
hls_time=segment_duration, hls_time=segment_duration,
hls_segment_filename=f"{segments_output_dir}/{output_file_pattern}", hls_segment_filename=f"{segments_output_dir}/{output_file_pattern}",
hls_segment_type="mpegts", hls_segment_type="mpegts",
hls_flags="independent_segments+program_date_time", hls_flags="append_list+independent_segments+program_date_time",
hls_base_url=manifest_segment_prefix if manifest_segment_prefix else None, # hls_base_url=manifest_segment_prefix if manifest_segment_prefix else None,
hls_playlist_type="event", hls_playlist_type="event",
hls_list_size=0, hls_list_size=0,
hls_start_number_source="epoch_us", hls_start_number_source="epoch_us",

View File

@ -560,6 +560,10 @@ with ffmpeg_worker_image.imports():
return result, sentry_trace return result, sentry_trace
hls_recording_volume = modal.Volume.from_name("stream_records", create_if_missing=True)
hls_recording_mount_point = "/mnt/stream_records"
@app.function(timeout=43200, # 最长处理12h的录制任务 @app.function(timeout=43200, # 最长处理12h的录制任务
cloud="aws", cloud="aws",
# todo: 暂时不限制最大同时存在的录制数量 # todo: 暂时不限制最大同时存在的录制数量
@ -570,27 +574,22 @@ with ffmpeg_worker_image.imports():
secret=modal.Secret.from_name("aws-s3-secret", secret=modal.Secret.from_name("aws-s3-secret",
environment_name=config.modal_environment), environment_name=config.modal_environment),
), ),
hls_recording_mount_point: hls_recording_volume
}, ) }, )
@modal.concurrent(max_inputs=5) @modal.concurrent(max_inputs=5)
async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int, async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int,
webhook: Optional[WebhookNotify] = None, webhook: Optional[WebhookNotify] = None, monitor_timeout: int = 36000,
sentry_trace: Optional[SentryTransactionInfo] = None, ) -> Tuple[ sentry_trace: Optional[SentryTransactionInfo] = None, ) -> Tuple[
FFMPEGResult, Optional[SentryTransactionInfo]]: FFMPEGResult, Optional[SentryTransactionInfo]]:
fn_id = current_function_call_id() fn_id = current_function_call_id()
output_dir = f"{config.modal_environment}/records/hls/{fn_id}" output_dir = f"{config.modal_environment}/records/hls/{fn_id}"
recording_output_dir = f"{s3_mount}/{output_dir}" s3_mount_output_dir = f"{s3_mount}/{output_dir}"
manifest_output_dir = f"{output_path_prefix}/{output_dir}" os.makedirs(s3_mount_output_dir, exist_ok=True)
logger.info(f"manifest = {manifest_output_dir}/playlist.m3u8") volume_output_dir = f"{hls_recording_mount_point}/{output_dir}"
manifest_segment_prefix = f"{config.S3_cdn_endpoint}/{recording_output_dir}/" os.makedirs(volume_output_dir, exist_ok=True)
logger.info(f"manifest = {volume_output_dir}/playlist.m3u8")
from watchdog.events import (FileCreatedEvent, from watchdog.events import FileMovedEvent, FileCreatedEvent, FileSystemEventHandler
FileModifiedEvent,
DirCreatedEvent,
DirModifiedEvent,
FileMovedEvent,
FileSystemEventHandler)
from watchdog.observers import Observer from watchdog.observers import Observer
import m3u8
class PlaylistEventHandler(FileSystemEventHandler): class PlaylistEventHandler(FileSystemEventHandler):
update_counter: int = 0 update_counter: int = 0
@ -602,39 +601,36 @@ with ffmpeg_worker_image.imports():
self.webhook = webhook self.webhook = webhook
self.fn_id = fn_id self.fn_id = fn_id
def on_created(self, event: Union[FileCreatedEvent, DirCreatedEvent]) -> None: def on_created(self, event: FileCreatedEvent) -> None:
logger.info(f"[dir={event.is_directory}](created)@{event.src_path}") logger.info(f"[created] {event.src_path}")
if event.src_path.endswith(".ts"):
filename = os.path.basename(event.src_path)
mount_path = f"{s3_mount_output_dir}/{filename}"
# 将Volume内的ts文件复制到S3挂载点
shutil.copy(event.src_path, mount_path)
logger.info(f"[copy] {event.src_path} -> {mount_path}")
else:
return
def on_modified(self, event: Union[FileModifiedEvent, DirModifiedEvent]) -> None: def on_moved(self, event: FileMovedEvent) -> None:
logger.info(f"[dir={event.is_directory}](modified)@{event.src_path}") logger.info(f"[moved] {event.src_path} -> {event.dest_path}")
if not event.dest_path.endswith(".m3u8"):
def on_moved(self, event: Union[DirMovedEvent, FileMovedEvent]) -> None: return
logger.info(f"[dir={event.is_directory}](moved)@{event.src_path} -> {event.dest_path}") filename = os.path.basename(event.dest_path)
if isinstance(event, FileMovedEvent): filename.replace('.tmp', '')
if event.is_directory: mount_path = f"{s3_mount_output_dir}/{filename}"
return # 将Volume内的playlist.m3u8.tmp复制到S3挂载点的playlist.m3u8文件
if not event.dest_path.endswith(".m3u8"): shutil.copy(event.dest_path, mount_path)
return logger.info(f"[copy] {event.dest_path} -> {mount_path}")
playlist = m3u8.load(event.dest_path) self.update_counter += 1
logger.info("playlist file loaded") if self.webhook and self.update_counter == 1:
for segment in playlist.segments: logger.info("[Start] webhook trigger")
new_uri = segment.uri.replace(f"{config.S3_cdn_endpoint}//mntS3/", "") try:
local_uri = os.path.basename(new_uri) self.webhook_on_start()
logger.info(f"{segment.uri} -> {local_uri}") except Exception as e:
segment.uri = local_uri logger.exception(e)
live_playlist = event.dest_path.replace(output_path_prefix, s3_mount) else:
with open(live_playlist, "w") as f: logger.info("[Start] no webhook")
f.write(playlist.dumps())
logger.info(f"live_playlist file written to {live_playlist}")
self.update_counter += 1
if self.webhook and self.update_counter == 1:
logger.info("[Start] webhook trigger")
try:
self.webhook_on_start()
except Exception as e:
logger.exception(e)
else:
logger.info("[Start] no webhook")
@backoff.on_exception(exception=Exception, wait_gen=backoff.constant, @backoff.on_exception(exception=Exception, wait_gen=backoff.constant,
max_time=5, max_tries=5, raise_on_giveup=True) max_time=5, max_tries=5, raise_on_giveup=True)
@ -654,19 +650,6 @@ with ffmpeg_worker_image.imports():
response.raise_for_status() response.raise_for_status()
logger.info(f"[Start] webhook {response.status_code} {response.text}") logger.info(f"[Start] webhook {response.status_code} {response.text}")
async def ffmpeg_process(stream_url: str,
output_dir: Optional[str] = None,
manifest_output_dir: Optional[str] = None,
manifest_segment_prefix: Optional[str] = None,
segment_duration: float = 5.0,
stream_content_timeout: int = 300, ):
await VideoUtils.ffmpeg_stream_record_as_hls(stream_url=stream_url,
segment_duration=segment_duration,
stream_content_timeout=stream_content_timeout,
segments_output_dir=output_dir,
playlist_output_dir=manifest_output_dir,
manifest_segment_prefix=manifest_segment_prefix)
@backoff.on_exception(exception=Exception, wait_gen=backoff.constant, @backoff.on_exception(exception=Exception, wait_gen=backoff.constant,
max_time=5, max_tries=5, raise_on_giveup=False) max_time=5, max_tries=5, raise_on_giveup=False)
async def webhook_on_end(fn_id: str, webhook: WebhookNotify, result: FFMPEGResult): async def webhook_on_end(fn_id: str, webhook: WebhookNotify, result: FFMPEGResult):
@ -697,29 +680,27 @@ with ffmpeg_worker_image.imports():
playlist_handler = PlaylistEventHandler(webhook=webhook, fn_id=fn_id) playlist_handler = PlaylistEventHandler(webhook=webhook, fn_id=fn_id)
playlist_observer = Observer() playlist_observer = Observer()
os.makedirs(manifest_output_dir, exist_ok=True) os.makedirs(volume_output_dir, exist_ok=True)
playlist_observer.schedule(playlist_handler, # 监控本地Volume下录制缓存目录
path=f"{manifest_output_dir}", playlist_observer.schedule(playlist_handler, path=volume_output_dir, recursive=False)
recursive=False)
playlist_observer.start() playlist_observer.start()
try: try:
await ffmpeg_process(stream_url=stream_url, await VideoUtils.ffmpeg_stream_record_as_hls(stream_url=stream_url,
segment_duration=segment_duration, segment_duration=segment_duration,
stream_content_timeout=recording_timeout, stream_content_timeout=recording_timeout,
output_dir=recording_output_dir, stream_monitor_timeout=monitor_timeout,
manifest_output_dir=manifest_output_dir, segments_output_dir=volume_output_dir,
manifest_segment_prefix=manifest_segment_prefix, ) playlist_output_dir=volume_output_dir)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
playlist_observer.stop() playlist_observer.stop()
if webhook: if webhook:
await webhook_on_error(webhook=webhook, await webhook_on_error(webhook=webhook, fn_id=fn_id,
fn_id=fn_id,
error=e.message if hasattr(e, 'message') else str(e)) error=e.message if hasattr(e, 'message') else str(e))
raise e raise e
playlist_observer.stop() playlist_observer.stop()
content_length = FileUtils.get_folder_size(recording_output_dir) content_length = FileUtils.get_folder_size(volume_output_dir)
metadata = VideoUtils.ffprobe_media_metadata(media_path=f"{manifest_output_dir}/playlist.m3u8") metadata = VideoUtils.ffprobe_media_metadata(media_path=f"{volume_output_dir}/playlist.m3u8")
result = FFMPEGResult(urn=f"s3://{config.S3_region}/{config.S3_bucket_name}/{output_dir}/playlist.m3u8", result = FFMPEGResult(urn=f"s3://{config.S3_region}/{config.S3_bucket_name}/{output_dir}/playlist.m3u8",
metadata=metadata, metadata=metadata,
content_length=content_length, ) content_length=content_length, )
@ -728,4 +709,5 @@ with ffmpeg_worker_image.imports():
if not sentry_trace: if not sentry_trace:
sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(), sentry_trace = SentryTransactionInfo(x_trace_id=sentry_sdk.get_traceparent(),
x_baggage=sentry_sdk.get_baggage()) x_baggage=sentry_sdk.get_baggage())
shutil.rmtree(volume_output_dir)
return result, sentry_trace return result, sentry_trace