diff --git a/src/BowongModalFunctions/models/web_model.py b/src/BowongModalFunctions/models/web_model.py index 6aa9af8..4ba4990 100644 --- a/src/BowongModalFunctions/models/web_model.py +++ b/src/BowongModalFunctions/models/web_model.py @@ -428,7 +428,8 @@ class ComfyTaskRequest(BaseFFMPEGTaskRequest): class FFMPEGStreamRecordRequest(BaseFFMPEGTaskRequest): stream_source: str = Field(description="直播源地址") - segment_duration: int = Field(default=5, description="hls片段时长(秒)") + first_segment_duration: int = Field(default=2, description="hls首个片段时长(秒), 首片段长度越小hls流能越快速开始播放") + segment_duration: int = Field(default=10, description="hls片段时长(秒)") recording_timeout: int = Field(default=300, description="hls流无内容后等待的时长(秒)") monitor_timeout: int = Field(default=36000, description="录制监控最大时长(秒), 默认为10小时, 不可大于12小时", le=43200) diff --git a/src/BowongModalFunctions/router/ffmpeg.py b/src/BowongModalFunctions/router/ffmpeg.py index 1fc93ec..86ec03b 100644 --- a/src/BowongModalFunctions/router/ffmpeg.py +++ b/src/BowongModalFunctions/router/ffmpeg.py @@ -213,6 +213,7 @@ async def stream_record_vod(body: FFMPEGStreamRecordRequest, if headers.x_trace_id and headers.x_baggage: sentry_trace = SentryTransactionInfo(x_trace_id=headers.x_trace_id, x_baggage=headers.x_baggage) fn_call = fn.spawn(stream_url=body.stream_source, + first_segment_duration=body.first_segment_duration, segment_duration=body.segment_duration, recording_timeout=body.recording_timeout, monitor_timeout=body.monitor_timeout, diff --git a/src/BowongModalFunctions/utils/ModalUtils.py b/src/BowongModalFunctions/utils/ModalUtils.py index 68aa357..0a8ae13 100644 --- a/src/BowongModalFunctions/utils/ModalUtils.py +++ b/src/BowongModalFunctions/utils/ModalUtils.py @@ -38,7 +38,7 @@ class ModalUtils: if not task.function_call_id == task_id: return ModalTaskInfo(function_name=task.function_name, status=TaskStatus.expired, error_code=ErrorCode.NOT_FOUND, error_reason="NOT_FOUND") - logger.info(f"Task {task.function_name} function call status: {task.status}") + logger.info(f"Task {task.function_name} function call status: {task.status.name}") match task.status: case InputStatus.PENDING: return ModalTaskInfo(function_name=task.function_name, status=TaskStatus.running) diff --git a/src/BowongModalFunctions/utils/VideoUtils.py b/src/BowongModalFunctions/utils/VideoUtils.py index 59607a4..d29802d 100644 --- a/src/BowongModalFunctions/utils/VideoUtils.py +++ b/src/BowongModalFunctions/utils/VideoUtils.py @@ -1179,7 +1179,7 @@ class VideoUtils: async def ffmpeg_stream_record_as_hls(stream_url: str, segments_output_dir: str, playlist_output_dir: str, - manifest_segment_prefix: Optional[str] = None, + first_segment_duration: float = 2.0, segment_duration: float = 5.0, stream_content_timeout: int = 300, stream_monitor_timeout: int = 36000, @@ -1197,13 +1197,11 @@ class VideoUtils: ffmpeg_cmd.output( f"{playlist_output_dir}/playlist.m3u8", f="hls", - # flags="+cgop", - # g=30, + hls_init_time=first_segment_duration, hls_time=segment_duration, hls_segment_filename=f"{segments_output_dir}/{output_file_pattern}", hls_segment_type="mpegts", - hls_flags="append_list+independent_segments+program_date_time", - # hls_base_url=manifest_segment_prefix if manifest_segment_prefix else None, + hls_flags="append_list+independent_segments+program_date_time+split_by_time+discont_start", hls_playlist_type="event", hls_list_size=0, hls_start_number_source="epoch_us", diff --git a/src/cluster/ffmpeg_app.py b/src/cluster/ffmpeg_app.py index 90c057c..0ac726f 100644 --- a/src/cluster/ffmpeg_app.py +++ b/src/cluster/ffmpeg_app.py @@ -661,6 +661,7 @@ with ffmpeg_worker_image.imports(): }, ) @modal.concurrent(max_inputs=5) async def ffmpeg_stream_record_as_hls(stream_url: str, segment_duration: int, recording_timeout: int, + first_segment_duration: int, webhook: Optional[WebhookNotify] = None, monitor_timeout: int = 36000, sentry_trace: Optional[SentryTransactionInfo] = None, ) -> Tuple[ FFMPEGResult, Optional[SentryTransactionInfo]]: @@ -708,6 +709,7 @@ with ffmpeg_worker_image.imports(): playlist_observer.start() try: await VideoUtils.ffmpeg_stream_record_as_hls(stream_url=stream_url, + first_segment_duration=first_segment_duration, segment_duration=segment_duration, stream_content_timeout=recording_timeout, stream_monitor_timeout=monitor_timeout, diff --git a/src/cluster/video.py b/src/cluster/video.py index aa3eba1..c1964d9 100644 --- a/src/cluster/video.py +++ b/src/cluster/video.py @@ -55,7 +55,7 @@ with downloader_image.imports(): from BowongModalFunctions.utils.KVCache import MediaSourceKVCache, LiveProductKVCache from BowongModalFunctions.models.media_model import MediaSource, MediaCacheStatus, MediaProtocol from BowongModalFunctions.models.web_model import SentryTransactionInfo, MonitorLiveRoomProductRequest, LiveProduct, \ - LiveProductCaches, WebhookNotify + LiveProductCaches, WebhookNotify from BowongModalFunctions.models.ffmpeg_worker_model import FFMpegSliceSegment from BowongModalFunctions.utils.TimeUtils import TimeDelta, merge_product_data @@ -346,7 +346,8 @@ with downloader_image.imports(): cache.status = MediaCacheStatus.missing return cache - @app.function(cpu=(0.5,64), timeout=1800, + + @app.function(cpu=(0.5, 64), timeout=1800, max_containers=config.video_downloader_concurrency, volumes={ config.S3_mount_dir: modal.CloudBucketMount( @@ -362,8 +363,8 @@ with downloader_image.imports(): start_time: str, end_time: str, sentry_trace: SentryTransactionInfo, - webhook: WebhookNotify=None, - retry_time: int=3 + webhook: WebhookNotify = None, + retry_time: int = 3 ): def calculate_target_dimensions(original_width, original_height, target_height=1280): @@ -389,14 +390,15 @@ with downloader_image.imports(): # 第一步:获取原始视频宽高 ffprobe = FFprobe( inputs={input_file: ['-v', 'error', '-select_streams', 'v:0', - '-show_entries', 'stream=width,height', - '-of', 'csv=p=0']} + '-show_entries', 'stream=width,height', + '-of', 'csv=p=0']} ) info = ffprobe.run(stdout=subprocess.PIPE, stderr=subprocess.PIPE)[0].decode('utf-8').strip() width, height = map(int, info.split(',')) # 计算目标尺寸 - target_width, target_height = calculate_target_dimensions(width, height, target_height=int(0.5*height) if height > 1600 else 1138) + target_width, target_height = calculate_target_dimensions(width, height, target_height=int( + 0.5 * height) if height > 1600 else 1138) # 第二步:执行视频转换 ff = FFmpeg( @@ -464,15 +466,16 @@ with downloader_image.imports(): return upload_response.json(), upload_response.status_code @SentryUtils.webhook_handler(webhook, current_function_call_id()) - @SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini", name="Gemini推理", fn_id=current_function_call_id()) + @SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini", + name="Gemini推理", fn_id=current_function_call_id()) async def _handler(media: MediaSource, - google_api_key: str, - product_grid_list: List[str], - product_list: List[any], - start_time: str, - end_time: str, - sentry_trace: SentryTransactionInfo, - retry_time: int=3): + google_api_key: str, + product_grid_list: List[str], + product_list: List[any], + start_time: str, + end_time: str, + sentry_trace: SentryTransactionInfo, + retry_time: int = 3): video_gemini_uri = None @@ -489,22 +492,24 @@ with downloader_image.imports(): else: product_title_list = product_list logger.info("product_title_list: \n" + "\n".join(product_title_list)) - image_parts=[{ - "file_data": { - "mime_type": "image/jpeg", - "file_uri": f"{i}" - } - } for i in product_grid_list] + image_parts = [{ + "file_data": { + "mime_type": "image/jpeg", + "file_uri": f"{i}" + } + } for i in product_grid_list] # 2、切20分钟的条 logger.info("2、开始截取指定视频") - slice_fn = modal.Function.from_name(config.modal_app_name,"ffmpeg_slice_media", environment_name=config.modal_environment) - slice_result,sentry_trace = await slice_fn.remote.aio(media,[FFMpegSliceSegment( - start=TimeDelta.from_format_string(start_time),end=TimeDelta.from_format_string(end_time))], sentry_trace) + slice_fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_slice_media", + environment_name=config.modal_environment) + slice_result, sentry_trace = await slice_fn.remote.aio(media, [FFMpegSliceSegment( + start=TimeDelta.from_format_string(start_time), end=TimeDelta.from_format_string(end_time))], + sentry_trace) video = MediaSource.from_str(slice_result[0].urn) logger.success("截取完成") # 3、转换为720p h265 logger.info("3、转换视频为720p h264") - video_path = convert_video(os.path.join(config.S3_mount_dir,video.path)) + video_path = convert_video(os.path.join(config.S3_mount_dir, video.path)) # 4、上传到gemini logger.info("4、视频文件开始上传到Gemini") video_gemini, code = upload(video_path) @@ -513,16 +518,19 @@ with downloader_image.imports(): else: logger.error("视频文件上传Gemini失败") raise Exception("视频文件上传Gemini失败") + # 5、检查文件是否已处理完成 def check(): with httpx.Client(timeout=Timeout(timeout=30)) as client: file = video_gemini_uri.split("/")[-1] - response = client.get(url=f"https://generativelanguage.googleapis.com/v1beta/files/{file}?key={google_api_key}") + response = client.get( + url=f"https://generativelanguage.googleapis.com/v1beta/files/{file}?key={google_api_key}") response.raise_for_status() if response.status_code == 200: - if response.json()["state"]=="ACTIVE": + if response.json()["state"] == "ACTIVE": return True return False + check_num = 60 logger.info("5、开始检查Gemini文件是否处理完成") while check_num > 0: @@ -536,6 +544,7 @@ with downloader_image.imports(): if check_num <= 0: raise Exception("Gemini文件上传处理状态检查超时") logger.success("Gemini文件处理完成") + # 6、执行Gemini推理操作 def inference_api(): try: @@ -664,22 +673,22 @@ with downloader_image.imports(): } ]) json_data = { - "safetySettings": [ - {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, - {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, - {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, - {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, - {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"} - ], - "contents": [ - { - "parts": image_parts - } - ], - "generation_config": { - "temperature": 0.1, + "safetySettings": [ + {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, + {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"} + ], + "contents": [ + { + "parts": image_parts } + ], + "generation_config": { + "temperature": 0.1, } + } resp = requests.post( "https://gateway.ai.cloudflare.com/v1/67720b647ff2b55cf37ba3ef9e677083/bowong-dev/google-ai-studio/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent", headers={ @@ -697,8 +706,9 @@ with downloader_image.imports(): logger.error(f"Gemini推理失败, 状态码{resp.status_code}") if resp.status_code == 429: logger.warning("请求频率过高, 等待50-70秒后重试") - sleep(random.randint(50,70)) + sleep(random.randint(50, 70)) return None + logger.info("6、发起Gemini推理") target_json = None while target_json is None and retry_time > 0: @@ -709,9 +719,9 @@ with downloader_image.imports(): logger.info(f"推理完成JSON \n{json.dumps(target_json, indent=4, ensure_ascii=False)}") reason = target_json["candidates"][0]["finishReason"] - if reason=="STOP": - parts:str = target_json["candidates"][0]["content"]["parts"][0]["text"] - parts = parts.replace("```","").replace("json\n","").replace("\n","").replace("\\","") + if reason == "STOP": + parts: str = target_json["candidates"][0]["content"]["parts"][0]["text"] + parts = parts.replace("```", "").replace("json\n", "").replace("\n", "").replace("\\", "") parts = json.loads(parts) # 合并产品和时间线 parts = merge_product_data(parts) @@ -726,32 +736,37 @@ with downloader_image.imports(): if video_gemini_uri: logger.info("7、清除Gemini临时文件") with httpx.Client(timeout=Timeout(timeout=120)) as client: - resp = client.delete(f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp.modal.run/google/delete', - params={"filename":video_gemini_uri.split("/")[-1]}, headers={"x-google-api-key":google_api_key}) + resp = client.delete( + f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp.modal.run/google/delete', + params={"filename": video_gemini_uri.split("/")[-1]}, + headers={"x-google-api-key": google_api_key}) resp.raise_for_status() if resp.status_code == 200: logger.success("Gemini临时文件清除成功") else: logger.warning("Gemini临时文件清除失败, 请自行清除") - return await _handler(media, google_api_key, product_grid_list, product_list, start_time, end_time, sentry_trace, retry_time) - @app.function(max_containers=config.video_downloader_concurrency, timeout=60) + return await _handler(media, google_api_key, product_grid_list, product_list, start_time, end_time, + sentry_trace, retry_time) + + + @app.function(cpu=(0.5, 16), max_containers=config.video_downloader_concurrency, timeout=240) @modal.concurrent(max_inputs=200) - async def make_image_grid_upload(pic_info_list:List[Dict[str,str]], - image_size:int, - text_height:int, - font_size:int, - padding:int, - separator:int, + async def make_image_grid_upload(pic_info_list: List[Dict[str, str]], + image_size: int, + text_height: int, + font_size: int, + padding: int, + separator: int, google_api_key: str, - sentry_trace:SentryTransactionInfo) -> str: + sentry_trace: SentryTransactionInfo) -> str: def create_image_grid(image_info_list: List[Dict[str, str]], - output_path: str, - image_size: int = 450, - text_height: int = 40, - font_size: int = 18, - padding: int = 5, - separator: int = 5) -> str | None: + output_path: str, + image_size: int = 450, + text_height: int = 40, + font_size: int = 18, + padding: int = 5, + separator: int = 5) -> str | None: """ 创建一个包含图片和文字说明的马赛克拼图,最大36张图 @@ -995,16 +1010,18 @@ with downloader_image.imports(): return upload_response.json(), upload_response.status_code - @SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="make_grid_gemini", name="将输入图拼为网格上传到Gemini网盘", fn_id=current_function_call_id()) - def _handler(google_api_key:str, - pic_info_list:List[Dict[str,str]], - image_size:int, - text_height:int, - font_size:int, - padding:int, - separator:int): + @SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="make_grid_gemini", + name="将输入图拼为网格上传到Gemini网盘", fn_id=current_function_call_id()) + def _handler(google_api_key: str, + pic_info_list: List[Dict[str, str]], + image_size: int, + text_height: int, + font_size: int, + padding: int, + separator: int): image_grid_path = f"grid_{uuid.uuid4()}.jpg" - image_grid_path = create_image_grid(pic_info_list, image_grid_path, image_size, text_height, font_size, padding, separator) + image_grid_path = create_image_grid(pic_info_list, image_grid_path, image_size, text_height, font_size, + padding, separator) if not image_grid_path: raise Exception("创建图片网格失败") @@ -1015,16 +1032,18 @@ with downloader_image.imports(): logger.error("图片网格文件上传Gemini失败") raise Exception("图片网格文件上传Gemini失败") return image_gemini_uri + return _handler(google_api_key, pic_info_list, image_size, text_height, font_size, padding, separator) @app.function(max_containers=config.video_downloader_concurrency, timeout=130) @modal.concurrent(max_inputs=50) - async def monitor_live_room_product_trigger(cookie:str, room_id:str, author_id:str) -> int: + async def monitor_live_room_product_trigger(cookie: str, room_id: str, author_id: str) -> int: def get_product_list(): with httpx.Client(timeout=Timeout(timeout=120)) as client: - resp = client.get(f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp-tikhub.modal.run/douyin/web/fetch_live_room_product_result', - params={"cookie":cookie,"room_id":room_id,"author_id":author_id}) + resp = client.get( + f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp-tikhub.modal.run/douyin/web/fetch_live_room_product_result', + params={"cookie": cookie, "room_id": room_id, "author_id": author_id}) resp.raise_for_status() if resp.status_code == 200: if resp.json()["data"]["total"] >= 0: @@ -1037,6 +1056,7 @@ with downloader_image.imports(): return 2, [] # 其他错误 return 3, [] + try: logger.info(f"room_id {room_id} author_id {author_id} 触发监控商品...") is_live, product_list = get_product_list() @@ -1062,20 +1082,20 @@ with downloader_image.imports(): # 最新的插入在最前 for new_product in product_list[::-1]: last_cache.product_list.insert(0, LiveProduct(title=new_product["title"], - leaf_category=new_product["leaf_category"], - shop_id=new_product["shop_id"], - product_id=new_product["product_id"], - cover=new_product["cover"], - detail_url=new_product["detail_url"])) + leaf_category=new_product["leaf_category"], + shop_id=new_product["shop_id"], + product_id=new_product["product_id"], + cover=new_product["cover"], + detail_url=new_product["detail_url"])) else: logger.success(f"room_id {room_id} author_id {author_id} 新建缓存") last_cache = LiveProductCaches(room_id=room_id, author_id=author_id, product_list=[ LiveProduct(title=new_product["title"], - leaf_category=new_product["leaf_category"], - shop_id=new_product["shop_id"], - product_id=new_product["product_id"], - cover=new_product["cover"], - detail_url=new_product["detail_url"]) for new_product in product_list]) + leaf_category=new_product["leaf_category"], + shop_id=new_product["shop_id"], + product_id=new_product["product_id"], + cover=new_product["cover"], + detail_url=new_product["detail_url"]) for new_product in product_list]) last_cache.update_time = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M:%S") last_cache.count = len(last_cache.product_list) modal_kv_product_cache.set_cache(last_cache) @@ -1084,4 +1104,3 @@ with downloader_image.imports(): except Exception as e: logger.exception(f"room_id {room_id} author_id {author_id} 触发监控商品发生错误 {e}") return 4 -