From 7666102c2790f052132a1ba3294fb0ba5030432b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BA=B7=E5=AE=87=E4=BD=B3?= Date: Mon, 23 Jun 2025 16:13:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6=E5=88=86=E6=94=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Merge branch 'main' into cluster-gemini * FIX Gemini推理配置修复langfuse来源 * PERF Gemini推理配置适配langfuse来源 FIX 时间转换适配更多时间格式 * PERF prompt接入langfuse * Merge branch 'main' into cluster-gemini * FIX 修复pydantic导致的问题 PERF 预先使用prompt传输格式要求,如果失败fallback到调用gemini处理格式错误 * Merge branch 'main' into cluster-gemini * FIX 修复上传问题 * PERF 规范化gemini调用方式, 使用response_schema规范化输出 * PERF 修改gemini upload调用为规范调用 * PERF 优化prompt词提高识别准确率 FIX 修复升级httpx导致部署失败的问题 * Merge branch 'main' into cluster-gemini * PERF 优化prompt词提高识别准确率 * FIX 修复prompt时间问题 * Merge branch 'main' into cluster-gemini * PERF 修复时间最大限制问题 * Merge branch 'main' into cluster-gemini * FIX 修复缩放分辨率计算问题 ADD Gemini推理改为二阶段 FIX 修复时间合并计算问题 * Merge branch 'main' into cluster-gemini * ADD gemini数据源使用cloud storage --------- Merge request URL: https://g-ldyi2063.coding.net/p/dev/d/modalDeploy/git/merge/4861 Co-authored-by: 康宇佳 --- src/BowongModalFunctions/models/web_model.py | 23 ++ src/BowongModalFunctions/utils/TimeUtils.py | 16 +- src/cluster/video_apps/hls_slice_inference.py | 239 +++++++++--------- 3 files changed, 155 insertions(+), 123 deletions(-) diff --git a/src/BowongModalFunctions/models/web_model.py b/src/BowongModalFunctions/models/web_model.py index 42adb3f..169ee54 100644 --- a/src/BowongModalFunctions/models/web_model.py +++ b/src/BowongModalFunctions/models/web_model.py @@ -1,3 +1,4 @@ +import json import uuid from datetime import datetime from enum import Enum @@ -652,3 +653,25 @@ class GeminiSecondStageResponseModel(RootModel): 表示一个包含多个商品及其时间线信息的列表。 """ root: List[ProductTimeline] + + +class GeminiFirstStagePromptVariables(BaseModel): + product_list: List[str] = Field(description="商品名列表") + + @computed_field(description="xml格式排列的商品列表") + @property + def product_list_xml(self) -> str: + xml_items = [f" {product}" for product in self.product_list] + xml_string = "\n".join(xml_items) + return f"\n{xml_string}\n " + + +class GeminiSecondStagePromptVariables(BaseModel): + product_json_list: List[dict] = Field(description="识别出的商品JSON列表") + + @computed_field(description="xml格式排列的识别出的商品JSON列表") + @property + def product_json_list_xml(self) -> str: + xml_items = [f" {json.dumps(product,ensure_ascii=False)}" for product in self.product_json_list] + xml_string = "\n".join(xml_items) + return f"\n{xml_string}\n " \ No newline at end of file diff --git a/src/BowongModalFunctions/utils/TimeUtils.py b/src/BowongModalFunctions/utils/TimeUtils.py index 80749e5..cd114b6 100644 --- a/src/BowongModalFunctions/utils/TimeUtils.py +++ b/src/BowongModalFunctions/utils/TimeUtils.py @@ -26,19 +26,29 @@ def parse_time(time_str): if re.match(r"^\d{2}:\d{2}:\d{2}\.\d{3}$",time_str): # 先尝试完整格式 HH:MM:SS.fff parsed_time = datetime.strptime(time_str, '%H:%M:%S.%f') + elif re.match(r"^\d{2}:\d{2}:\d{2}:\d{3}$",time_str): + # 如果失败,尝试 HH:MM:SS:fff 格式 + parsed_time = datetime.strptime(time_str, '%H:%M:%S:%f') elif re.match(r"^\d{2}:\d{2}\.\d{3}$",time_str): # 如果失败,尝试 MM:SS.fff 格式 dt = datetime.strptime(time_str, '%M:%S.%f') # 将小时设为0,只保留分钟和秒 parsed_time = datetime.combine(dt.date(), dt.time().replace(hour=0)) + elif re.match(r"^\d{2}\.\d{2}:\d{3}$",time_str): + # 如果失败,尝试 MM.SS:fff 格式 + dt = datetime.strptime(time_str, '%M.%S:%f') + # 将小时设为0,只保留分钟和秒 + parsed_time = datetime.combine(dt.date(), dt.time().replace(hour=0)) + elif re.match(r"^\d{2}\.\d{2}\.\d{3}$",time_str): + # 如果失败,尝试 MM.SS.fff 格式 + dt = datetime.strptime(time_str, '%M.%S.%f') + # 将小时设为0,只保留分钟和秒 + parsed_time = datetime.combine(dt.date(), dt.time().replace(hour=0)) elif re.match(r"^\d{2}:\d{2}:\d{3}$",time_str): # 如果失败,尝试 MM:SS:fff 格式 dt = datetime.strptime(time_str, '%M:%S:%f') # 将小时设为0,只保留分钟和秒 parsed_time = datetime.combine(dt.date(), dt.time().replace(hour=0)) - elif re.match(r"^\d{2}:\d{2}:\d{2}:\d{3}$",time_str): - # 如果失败,尝试 HH:MM:SS:fff 格式 - parsed_time = datetime.strptime(time_str, '%H:%M:%S:%f') else: logger.error(f"转换时间格式失败 {time_str}") return parsed_time diff --git a/src/cluster/video_apps/hls_slice_inference.py b/src/cluster/video_apps/hls_slice_inference.py index 1606f1e..2a90e04 100644 --- a/src/cluster/video_apps/hls_slice_inference.py +++ b/src/cluster/video_apps/hls_slice_inference.py @@ -3,6 +3,9 @@ import uuid import modal from google.genai import types +from jinja2 import Template +from langfuse._client.client import Langfuse +from langfuse.model import TextPromptClient from BowongModalFunctions.utils.HTTPUtils import GoogleAuthUtils, FlatJsonSchemaGenerator from ..video import downloader_image, app, config @@ -16,8 +19,8 @@ with downloader_image.imports(): from starlette import status from BowongModalFunctions.utils.SentryUtils import SentryUtils from BowongModalFunctions.models.media_model import MediaSource - from BowongModalFunctions.models.web_model import SentryTransactionInfo, WebhookNotify, \ - GeminiFirstStageResponseModel, GeminiSecondStageResponseModel + from BowongModalFunctions.models.web_model import SentryTransactionInfo, WebhookNotify, GeminiFirstStagePromptVariables, \ + GeminiSecondStagePromptVariables, GeminiFirstStageResponseModel, GeminiSecondStageResponseModel from BowongModalFunctions.models.ffmpeg_worker_model import FFMpegSliceSegment from BowongModalFunctions.utils.TimeUtils import TimeDelta, merge_product_data from BowongModalFunctions.utils.VideoUtils import FFMPEGSliceOptions, VideoUtils @@ -25,7 +28,6 @@ with downloader_image.imports(): @app.function(cpu=(0.5, 64), timeout=1800, max_containers=config.video_downloader_concurrency, - secrets=[modal.Secret.from_name("gemini-prompt")], volumes={ config.S3_mount_dir: modal.CloudBucketMount( bucket_name=config.S3_bucket_name, @@ -65,6 +67,11 @@ with downloader_image.imports(): "europe-west8", ] + langfuse = Langfuse(host="https://us.cloud.langfuse.com", + secret_key="sk-lf-dd20cb0b-ef2e-49f6-80f0-b2d9cff1bb11", + public_key="pk-lf-15f9d809-0bf6-4a84-ae1c-18f7a7d927c7", + tracing_enabled=False) + logger.info(f"\nmedia || {media.urn}," f"\nstart_time || {start_time}, " f"\nend_time || {end_time}, " @@ -80,10 +87,20 @@ with downloader_image.imports(): regions=gemini_region, access_token=google_api_key, ) - # 动态Prompt - if "IMAGE_PRODUCT_IDENTIFICATION_PROMPT" in os.environ: - IMAGE_PRODUCT_IDENTIFICATION_PROMPT = os.environ["IMAGE_PRODUCT_IDENTIFICATION_PROMPT"] + def split_prompt_config(c:dict): + return { + "temperature": c["temperature"], + "top_p": c["top_p"], + "safety_settings": c["safety_settings"] + },c + + # 动态Prompt, langfuse获取失败使用默认值 + IMAGE_PRODUCT_IDENTIFICATION_PROMPT = langfuse.get_prompt("Gemini自动切条/商品识别", type="text", label="latest") + if IMAGE_PRODUCT_IDENTIFICATION_PROMPT: + prompt_config = IMAGE_PRODUCT_IDENTIFICATION_PROMPT.config + first_stage_generate_config, first_stage_correct_config = split_prompt_config(prompt_config) else: + logger.warning("🥺获取一阶段Prompt失败,使用内置Prompt") IMAGE_PRODUCT_IDENTIFICATION_PROMPT = """ 你是专业的商品识别专家。我上传了商品图片网格,需要你识别图片中的商品并与商品列表进行匹配。 @@ -108,32 +125,66 @@ with downloader_image.imports(): - ✅ 输出商品数量不得超过图片中的黑色边框区域数量 **商品列表-标准商品名称**: - - {PRODUCT_LIST} - + {{PRODUCT_LIST}} 请按以下JSON格式输出: [ - {{ + { "image_order": 1, "image_name": "图片上显示的原始文字", "matched_product": "匹配到的标准商品名称(必须与商品列表-标准商品名称一致)或null", "match_confidence": 95, - "visual_features": {{ + "visual_features": { "color": "商品详细配色等有辨识度的色彩特征", "pattern": "商品详细图案纹理布料等有辨识度的材质特征,材质可根据商品名以及图片综合进行判断", "style": "商品详细款式风格等有辨识度的款式特征" - }} - }} + } + } ] """ + first_stage_correct_config = { + "temperature": 0.01, + "top_p": 0.7, + "safety_settings": [ + { + "category": "HARM_CATEGORY_HARASSMENT", + "threshold": "BLOCK_NONE" + }, + { + "category": "HARM_CATEGORY_CIVIC_INTEGRITY", + "threshold": "BLOCK_NONE" + }, + { + "category": "HARM_CATEGORY_DANGEROUS_CONTENT", + "threshold": "BLOCK_NONE" + }, + { + "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", + "threshold": "BLOCK_NONE" + }, + { + "category": "HARM_CATEGORY_HATE_SPEECH", + "threshold": "BLOCK_NONE" + } + ], + "response_mime_type": "application/json", + "response_schema": GeminiFirstStageResponseModel.model_json_schema(schema_generator=FlatJsonSchemaGenerator) + } + first_stage_generate_config = { + "temperature": first_stage_correct_config["temperature"], + "top_p": first_stage_correct_config["top_p"], + "safety_settings": first_stage_correct_config["safety_settings"] + } - if "VIDEO_TIMELINE_ANALYSIS_PROMPT" in os.environ: - VIDEO_TIMELINE_ANALYSIS_PROMPT = os.environ["VIDEO_TIMELINE_ANALYSIS_PROMPT"] + VIDEO_TIMELINE_ANALYSIS_PROMPT = langfuse.get_prompt("Gemini自动切条/视频时间点识别", type="text", label="latest") + if VIDEO_TIMELINE_ANALYSIS_PROMPT: + prompt_config = VIDEO_TIMELINE_ANALYSIS_PROMPT.config + second_stage_generate_config, second_stage_correct_config = split_prompt_config(prompt_config) else: + logger.warning("🥺获取二阶段Prompt失败,使用内置Prompt") VIDEO_TIMELINE_ANALYSIS_PROMPT = """ 基于已识别的商品清单,分析视频中每个商品的出现时间段。 @@ -176,24 +227,55 @@ with downloader_image.imports(): - 常见错误:将05:23.500解释为5分钟23.5秒 **已识别商品清单(product-商品名 feature-商品特征 feature.color-商品详细颜色配色等色彩特征 feature.pattern-商品详细图案纹理布料等材质特征 feature.style-商品详细款式风格等有辨识度的款式特征)**: - - {IDENTIFIED_PRODUCTS} - + {{IDENTIFIED_PRODUCTS}} 请按以下JSON格式输出,确保输出格式复核JSON格式要求: [ - {{ + { "product": "标准商品名称", "timeline": [ "01:15.200 - 02:30.800 (穿着本品+介绍本品)", "05:10.100 - 06:25.600 (穿着本品+他品)" ] - }} + } ] """ + second_stage_correct_config = { + "temperature": 0.01, + "top_p": 0.7, + "safety_settings": [ + { + "category": "HARM_CATEGORY_HARASSMENT", + "threshold": "BLOCK_NONE" + }, + { + "category": "HARM_CATEGORY_CIVIC_INTEGRITY", + "threshold": "BLOCK_NONE" + }, + { + "category": "HARM_CATEGORY_DANGEROUS_CONTENT", + "threshold": "BLOCK_NONE" + }, + { + "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", + "threshold": "BLOCK_NONE" + }, + { + "category": "HARM_CATEGORY_HATE_SPEECH", + "threshold": "BLOCK_NONE" + } + ], + "response_mime_type": "application/json", + "response_schema": GeminiSecondStageResponseModel.model_json_schema(schema_generator=FlatJsonSchemaGenerator) + } + second_stage_generate_config = { + "temperature": second_stage_correct_config["temperature"], + "top_p": second_stage_correct_config["top_p"], + "safety_settings": second_stage_correct_config["safety_settings"] + } async def upload(file_path): @@ -215,7 +297,7 @@ with downloader_image.imports(): return upload_response.urn - def convert_json(json_like_str:str, json_model): + def convert_json(json_like_str:str, correct_config): try: resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", contents=[types.Content(role='user', @@ -232,35 +314,7 @@ with downloader_image.imports(): ] ) ], - config=types.GenerateContentConfig( - temperature=0.01, - top_p=0.7, - safety_settings=[ - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_HARASSMENT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ) - ], - response_mime_type="application/json", - response_schema=json_model.model_json_schema( - schema_generator=FlatJsonSchemaGenerator) - ), timeout=900) + config=types.GenerateContentConfig.model_validate(correct_config), timeout=900) except Exception as e: logger.exception(f"😭格式化json请求失败 {e}") return None @@ -282,9 +336,7 @@ with downloader_image.imports(): logger.warning("🥵请求负载过高, 随机更换地区重试") return None - - - def parse_stage1_result(result_text): + def parse_stage1_result(result_text, correct_config): """解析第一阶段结果,提取已识别的商品列表""" try: # 清理结果文本,提取JSON部分 @@ -298,7 +350,7 @@ with downloader_image.imports(): try: stage1_data = json.loads(clean_result) except json.decoder.JSONDecodeError: - stage1_data = convert_json(clean_result, GeminiFirstStageResponseModel) + stage1_data = convert_json(clean_result, correct_config) if not stage1_data: raise Exception("json为空") identified_products = [] @@ -321,7 +373,6 @@ with downloader_image.imports(): @SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini", name="Gemini推理", fn_id=current_function_call_id()) async def _handler(media: MediaSource, - google_api_key: str, product_grid_list: List[str], product_list: List[any], start_time: str, @@ -372,9 +423,6 @@ with downloader_image.imports(): async def inference_api_first_stage(): try: logger.info("🎈开始一阶段推理") - product_list_str = "" - for i, product in enumerate(product_title_list): - product_list_str += f"{i}. {product}\n" image_parts = [] for i in product_grid_list: image_parts.append( @@ -387,7 +435,10 @@ with downloader_image.imports(): ) image_parts.append( types.Part.from_text( - text=IMAGE_PRODUCT_IDENTIFICATION_PROMPT.format(PRODUCT_LIST=product_list_str) + text=Template(IMAGE_PRODUCT_IDENTIFICATION_PROMPT.prompt + if isinstance(IMAGE_PRODUCT_IDENTIFICATION_PROMPT, TextPromptClient) + else IMAGE_PRODUCT_IDENTIFICATION_PROMPT).render( + PRODUCT_LIST=GeminiFirstStagePromptVariables(product_list=product_title_list).product_list_xml) ) ) resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", @@ -395,32 +446,7 @@ with downloader_image.imports(): parts=image_parts ) ], - config=types.GenerateContentConfig( - temperature=0.01, - top_p=0.7, - safety_settings=[ - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_HARASSMENT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ) - ], - ), timeout=900) + config=types.GenerateContentConfig.model_validate(first_stage_generate_config), timeout=900) except Exception as e: logger.exception(f"😭Gemini一阶段推理请求失败: {e}") raise e @@ -430,7 +456,7 @@ with downloader_image.imports(): if reason == "STOP": result_text: str = resp.candidates[0].content.parts[0].text # 解析识别结果 - identified_products = parse_stage1_result(result_text) + identified_products = parse_stage1_result(result_text, first_stage_correct_config) return identified_products else: logger.error(f"😭Gemini一阶段推理失败, Reason {reason}") @@ -444,11 +470,6 @@ with downloader_image.imports(): async def inference_api_second_stage(identified_products): try: logger.info("🎈开始二阶段推理") - # 构建已识别商品清单字符串 - identified_products_str = "" - for i, product in enumerate(identified_products): - product_info_json = json.dumps(product,ensure_ascii=False) - identified_products_str += f"{i}. {product_info_json}\n" video_parts = [] video_parts.append( types.Part( @@ -460,7 +481,10 @@ with downloader_image.imports(): ) video_parts.append( types.Part.from_text( - text=VIDEO_TIMELINE_ANALYSIS_PROMPT.format(IDENTIFIED_PRODUCTS=identified_products_str) + text=Template(VIDEO_TIMELINE_ANALYSIS_PROMPT.prompt + if isinstance(VIDEO_TIMELINE_ANALYSIS_PROMPT, TextPromptClient) + else VIDEO_TIMELINE_ANALYSIS_PROMPT).render( + IDENTIFIED_PRODUCTS=GeminiSecondStagePromptVariables(product_json_list=identified_products).product_json_list_xml) ) ) resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", @@ -468,32 +492,7 @@ with downloader_image.imports(): parts=video_parts ) ], - config=types.GenerateContentConfig( - temperature=0.01, - top_p=0.7, - safety_settings=[ - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_HARASSMENT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ), - types.SafetySetting( - category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, - threshold=types.HarmBlockThreshold.BLOCK_NONE, - ) - ], - ), timeout=900) + config=types.GenerateContentConfig.model_validate(second_stage_generate_config), timeout=900) except Exception as e: logger.exception(f"😭Gemini二阶段推理请求失败: {e}") raise e @@ -512,7 +511,7 @@ with downloader_image.imports(): try: parts = json.loads(parts_text) except json.decoder.JSONDecodeError: - parts = convert_json(parts_text, GeminiSecondStageResponseModel) + parts = convert_json(parts_text, second_stage_correct_config) if not parts: raise Exception("json为空") logger.info(f"👌合并前 {json.dumps(parts, indent=4, ensure_ascii=False)}") @@ -562,5 +561,5 @@ with downloader_image.imports(): logger.exception(f"🥲推理失败, {e}") raise Exception(f"🥲推理失败, {e}") - return await _handler(media, google_api_key, product_grid_list, product_list, start_time, end_time, options, + return await _handler(media, product_grid_list, product_list, start_time, end_time, options, sentry_trace, retry_time, scale)