- hls录制时添加了split_by_time和discont_start的flag用于应对录制时的特殊情况
This commit is contained in:
shuohigh@gmail.com 2025-06-13 12:18:54 +08:00
parent 71289fe743
commit b83eb08b72
6 changed files with 113 additions and 92 deletions

View File

@ -428,7 +428,8 @@ class ComfyTaskRequest(BaseFFMPEGTaskRequest):
class FFMPEGStreamRecordRequest(BaseFFMPEGTaskRequest):
stream_source: str = Field(description="直播源地址")
segment_duration: int = Field(default=5, description="hls片段时长(秒)")
first_segment_duration: int = Field(default=2, description="hls首个片段时长(秒), 首片段长度越小hls流能越快速开始播放")
segment_duration: int = Field(default=10, description="hls片段时长(秒)")
recording_timeout: int = Field(default=300, description="hls流无内容后等待的时长(秒)")
monitor_timeout: int = Field(default=36000, description="录制监控最大时长(秒), 默认为10小时, 不可大于12小时",
le=43200)

View File

@ -213,6 +213,7 @@ async def stream_record_vod(body: FFMPEGStreamRecordRequest,
if headers.x_trace_id and headers.x_baggage:
sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage)
fn_call = fn.spawn(stream_url=body.stream_source,
first_segment_duration=body.first_segment_duration,
segment_duration=body.segment_duration,
recording_timeout=body.recording_timeout,
monitor_timeout=body.monitor_timeout,

View File

@ -38,7 +38,7 @@ class ModalUtils:
if not task.function_call_id == task_id:
return ModalTaskInfo(function_name=task.function_name, status=TaskStatus.expired,
error_code=ErrorCode.NOT_FOUND, error_reason="NOT_FOUND")
logger.info(f"Task {task.function_name} function call status: {task.status}")
logger.info(f"Task {task.function_name} function call status: {task.status.name}")
match task.status:
case InputStatus.PENDING:
return ModalTaskInfo(function_name=task.function_name, status=TaskStatus.running)

View File

@ -1179,7 +1179,7 @@ class VideoUtils:
async def ffmpeg_stream_record_as_hls(stream_url: str,
segments_output_dir: str,
playlist_output_dir: str,
manifest_segment_prefix: Optional[str] = None,
first_segment_duration: float = 2.0,
segment_duration: float = 5.0,
stream_content_timeout: int = 300,
stream_monitor_timeout: int = 36000,
@ -1197,13 +1197,11 @@ class VideoUtils:
ffmpeg_cmd.output(
f"{playlist_output_dir}/playlist.m3u8",
f="hls",
# flags="+cgop",
# g=30,
hls_init_time=first_segment_duration,
hls_time=segment_duration,
hls_segment_filename=f"{segments_output_dir}/{output_file_pattern}",
hls_segment_type="mpegts",
hls_flags="append_list+independent_segments+program_date_time",
# hls_base_url=manifest_segment_prefix if manifest_segment_prefix else None,
hls_flags="append_list+independent_segments+program_date_time+split_by_time+discont_start",
hls_playlist_type="event",
hls_list_size=0,
hls_start_number_source="epoch_us",

View File

@ -661,6 +661,7 @@ with ffmpeg_worker_image.imports():
}, )
@modal.concurrent(max_inputs=5)
async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int,
first_segment_duration: int,
webhook: Optional[WebhookNotify] = None, monitor_timeout: int = 36000,
sentry_trace: Optional[SentryTransactionInfo] = None, ) -> Tuple[
FFMPEGResult, Optional[SentryTransactionInfo]]:
@ -708,6 +709,7 @@ with ffmpeg_worker_image.imports():
playlist_observer.start()
try:
await VideoUtils.ffmpeg_stream_record_as_hls(stream_url=stream_url,
first_segment_duration=first_segment_duration,
segment_duration=segment_duration,
stream_content_timeout=recording_timeout,
stream_monitor_timeout=monitor_timeout,

View File

@ -346,6 +346,7 @@ with downloader_image.imports():
cache.status = MediaCacheStatus.missing
return cache
@app.function(cpu=(0.5, 64), timeout=1800,
max_containers=config.video_downloader_concurrency,
volumes={
@ -396,7 +397,8 @@ with downloader_image.imports():
width, height = map(int, info.split(','))
# 计算目标尺寸
target_width, target_height = calculate_target_dimensions(width, height, target_height=int(0.5*height) if height > 1600 else 1138)
target_width, target_height = calculate_target_dimensions(width, height, target_height=int(
0.5 * height) if height > 1600 else 1138)
# 第二步:执行视频转换
ff = FFmpeg(
@ -464,7 +466,8 @@ with downloader_image.imports():
return upload_response.json(), upload_response.status_code
@SentryUtils.webhook_handler(webhook, current_function_call_id())
@SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini", name="Gemini推理", fn_id=current_function_call_id())
@SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini",
name="Gemini推理", fn_id=current_function_call_id())
async def _handler(media: MediaSource,
google_api_key: str,
product_grid_list: List[str],
@ -497,9 +500,11 @@ with downloader_image.imports():
} for i in product_grid_list]
# 2、切20分钟的条
logger.info("2、开始截取指定视频")
slice_fn = modal.Function.from_name(config.modal_app_name,"ffmpeg_slice_media", environment_name=config.modal_environment)
slice_fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_slice_media",
environment_name=config.modal_environment)
slice_result, sentry_trace = await slice_fn.remote.aio(media, [FFMpegSliceSegment(
start=TimeDelta.from_format_string(start_time),end=TimeDelta.from_format_string(end_time))], sentry_trace)
start=TimeDelta.from_format_string(start_time), end=TimeDelta.from_format_string(end_time))],
sentry_trace)
video = MediaSource.from_str(slice_result[0].urn)
logger.success("截取完成")
# 3、转换为720p h265
@ -513,16 +518,19 @@ with downloader_image.imports():
else:
logger.error("视频文件上传Gemini失败")
raise Exception("视频文件上传Gemini失败")
# 5、检查文件是否已处理完成
def check():
with httpx.Client(timeout=Timeout(timeout=30)) as client:
file = video_gemini_uri.split("/")[-1]
response = client.get(url=f"https://generativelanguage.googleapis.com/v1beta/files/{file}?key={google_api_key}")
response = client.get(
url=f"https://generativelanguage.googleapis.com/v1beta/files/{file}?key={google_api_key}")
response.raise_for_status()
if response.status_code == 200:
if response.json()["state"] == "ACTIVE":
return True
return False
check_num = 60
logger.info("5、开始检查Gemini文件是否处理完成")
while check_num > 0:
@ -536,6 +544,7 @@ with downloader_image.imports():
if check_num <= 0:
raise Exception("Gemini文件上传处理状态检查超时")
logger.success("Gemini文件处理完成")
# 6、执行Gemini推理操作
def inference_api():
try:
@ -699,6 +708,7 @@ with downloader_image.imports():
logger.warning("请求频率过高, 等待50-70秒后重试")
sleep(random.randint(50, 70))
return None
logger.info("6、发起Gemini推理")
target_json = None
while target_json is None and retry_time > 0:
@ -726,16 +736,21 @@ with downloader_image.imports():
if video_gemini_uri:
logger.info("7、清除Gemini临时文件")
with httpx.Client(timeout=Timeout(timeout=120)) as client:
resp = client.delete(f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp.modal.run/google/delete',
params={"filename":video_gemini_uri.split("/")[-1]}, headers={"x-google-api-key":google_api_key})
resp = client.delete(
f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp.modal.run/google/delete',
params={"filename": video_gemini_uri.split("/")[-1]},
headers={"x-google-api-key": google_api_key})
resp.raise_for_status()
if resp.status_code == 200:
logger.success("Gemini临时文件清除成功")
else:
logger.warning("Gemini临时文件清除失败, 请自行清除")
return await _handler(media, google_api_key, product_grid_list, product_list, start_time, end_time, sentry_trace, retry_time)
@app.function(max_containers=config.video_downloader_concurrency, timeout=60)
return await _handler(media, google_api_key, product_grid_list, product_list, start_time, end_time,
sentry_trace, retry_time)
@app.function(cpu=(0.5, 16), max_containers=config.video_downloader_concurrency, timeout=240)
@modal.concurrent(max_inputs=200)
async def make_image_grid_upload(pic_info_list: List[Dict[str, str]],
image_size: int,
@ -995,7 +1010,8 @@ with downloader_image.imports():
return upload_response.json(), upload_response.status_code
@SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="make_grid_gemini", name="将输入图拼为网格上传到Gemini网盘", fn_id=current_function_call_id())
@SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="make_grid_gemini",
name="将输入图拼为网格上传到Gemini网盘", fn_id=current_function_call_id())
def _handler(google_api_key: str,
pic_info_list: List[Dict[str, str]],
image_size: int,
@ -1004,7 +1020,8 @@ with downloader_image.imports():
padding: int,
separator: int):
image_grid_path = f"grid_{uuid.uuid4()}.jpg"
image_grid_path = create_image_grid(pic_info_list, image_grid_path, image_size, text_height, font_size, padding, separator)
image_grid_path = create_image_grid(pic_info_list, image_grid_path, image_size, text_height, font_size,
padding, separator)
if not image_grid_path:
raise Exception("创建图片网格失败")
@ -1015,6 +1032,7 @@ with downloader_image.imports():
logger.error("图片网格文件上传Gemini失败")
raise Exception("图片网格文件上传Gemini失败")
return image_gemini_uri
return _handler(google_api_key, pic_info_list, image_size, text_height, font_size, padding, separator)
@ -1023,7 +1041,8 @@ with downloader_image.imports():
async def monitor_live_room_product_trigger(cookie: str, room_id: str, author_id: str) -> int:
def get_product_list():
with httpx.Client(timeout=Timeout(timeout=120)) as client:
resp = client.get(f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp-tikhub.modal.run/douyin/web/fetch_live_room_product_result',
resp = client.get(
f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp-tikhub.modal.run/douyin/web/fetch_live_room_product_result',
params={"cookie": cookie, "room_id": room_id, "author_id": author_id})
resp.raise_for_status()
if resp.status_code == 200:
@ -1037,6 +1056,7 @@ with downloader_image.imports():
return 2, []
# 其他错误
return 3, []
try:
logger.info(f"room_id {room_id} author_id {author_id} 触发监控商品...")
is_live, product_list = get_product_list()
@ -1084,4 +1104,3 @@ with downloader_image.imports():
except Exception as e:
logger.exception(f"room_id {room_id} author_id {author_id} 触发监控商品发生错误 {e}")
return 4