合并分支

* Merge branch 'main' into cluster-gemini
* FIX Gemini推理配置修复langfuse来源
* PERF Gemini推理配置适配langfuse来源 FIX 时间转换适配更多时间格式
* PERF prompt接入langfuse
* Merge branch 'main' into cluster-gemini
* FIX 修复pydantic导致的问题 PERF 预先使用prompt传输格式要求,如果失败fallback到调用gemini处理格式错误
* Merge branch 'main' into cluster-gemini
* FIX 修复上传问题
* PERF 规范化gemini调用方式, 使用response_schema规范化输出
* PERF 修改gemini upload调用为规范调用
* PERF 优化prompt词提高识别准确率 FIX 修复升级httpx导致部署失败的问题
* Merge branch 'main' into cluster-gemini
* PERF 优化prompt词提高识别准确率
* FIX 修复prompt时间问题
* Merge branch 'main' into cluster-gemini
* PERF 修复时间最大限制问题
* Merge branch 'main' into cluster-gemini
* FIX 修复缩放分辨率计算问题 ADD Gemini推理改为二阶段 FIX 修复时间合并计算问题
* Merge branch 'main' into cluster-gemini
* ADD gemini数据源使用cloud storage

---------

Merge request URL: https://g-ldyi2063.coding.net/p/dev/d/modalDeploy/git/merge/4861
Co-authored-by: 康宇佳
This commit is contained in:
康宇佳 2025-06-23 16:13:05 +08:00 committed by Coding
parent bffd102622
commit 7666102c27
3 changed files with 155 additions and 123 deletions

View File

@ -1,3 +1,4 @@
import json
import uuid
from datetime import datetime
from enum import Enum
@ -652,3 +653,25 @@ class GeminiSecondStageResponseModel(RootModel):
表示一个包含多个商品及其时间线信息的列表
"""
root: List[ProductTimeline]
class GeminiFirstStagePromptVariables(BaseModel):
    """Variables injected into the first-stage (product identification) prompt template."""

    # Product names to render into the prompt.
    product_list: List[str] = Field(description="商品名列表")

    @computed_field(description="xml格式排列的商品列表")
    @property
    def product_list_xml(self) -> str:
        """Return ``product_list`` as a ``<products>`` element with one ``<product>`` child per line.

        Names are inserted verbatim; no XML escaping is applied.
        """
        inner = "\n".join(f" <product>{name}</product>" for name in self.product_list)
        return f"<products>\n{inner}\n </products>"
class GeminiSecondStagePromptVariables(BaseModel):
    """Variables injected into the second-stage prompt template (identified products)."""

    # One dict per identified product; each is serialized to JSON when rendered.
    product_json_list: List[dict] = Field(description="识别出的商品JSON列表")

    @computed_field(description="xml格式排列的识别出的商品JSON列表")
    @property
    def product_json_list_xml(self) -> str:
        """Return ``product_json_list`` as a ``<products>`` element of JSON-encoded ``<product>`` children.

        Each dict is dumped with ``ensure_ascii=False`` so non-ASCII text stays readable;
        the JSON is inserted verbatim, without XML escaping.
        """
        encoded = (json.dumps(entry, ensure_ascii=False) for entry in self.product_json_list)
        inner = "\n".join(f" <product>{payload}</product>" for payload in encoded)
        return f"<products>\n{inner}\n </products>"

View File

@ -26,19 +26,29 @@ def parse_time(time_str):
if re.match(r"^\d{2}:\d{2}:\d{2}\.\d{3}$",time_str):
# 先尝试完整格式 HH:MM:SS.fff
parsed_time = datetime.strptime(time_str, '%H:%M:%S.%f')
elif re.match(r"^\d{2}:\d{2}:\d{2}:\d{3}$",time_str):
# 如果失败,尝试 HH:MM:SS:fff 格式
parsed_time = datetime.strptime(time_str, '%H:%M:%S:%f')
elif re.match(r"^\d{2}:\d{2}\.\d{3}$",time_str):
# 如果失败,尝试 MM:SS.fff 格式
dt = datetime.strptime(time_str, '%M:%S.%f')
# 将小时设为0只保留分钟和秒
parsed_time = datetime.combine(dt.date(), dt.time().replace(hour=0))
elif re.match(r"^\d{2}\.\d{2}:\d{3}$",time_str):
# 如果失败,尝试 MM.SS:fff 格式
dt = datetime.strptime(time_str, '%M.%S:%f')
# 将小时设为0只保留分钟和秒
parsed_time = datetime.combine(dt.date(), dt.time().replace(hour=0))
elif re.match(r"^\d{2}\.\d{2}\.\d{3}$",time_str):
# 如果失败,尝试 MM.SS.fff 格式
dt = datetime.strptime(time_str, '%M.%S.%f')
# 将小时设为0只保留分钟和秒
parsed_time = datetime.combine(dt.date(), dt.time().replace(hour=0))
elif re.match(r"^\d{2}:\d{2}:\d{3}$",time_str):
# 如果失败,尝试 MM:SS:fff 格式
dt = datetime.strptime(time_str, '%M:%S:%f')
# 将小时设为0只保留分钟和秒
parsed_time = datetime.combine(dt.date(), dt.time().replace(hour=0))
elif re.match(r"^\d{2}:\d{2}:\d{2}:\d{3}$",time_str):
# 如果失败,尝试 HH:MM:SS:fff 格式
parsed_time = datetime.strptime(time_str, '%H:%M:%S:%f')
else:
logger.error(f"转换时间格式失败 {time_str}")
return parsed_time

View File

@ -3,6 +3,9 @@ import uuid
import modal
from google.genai import types
from jinja2 import Template
from langfuse._client.client import Langfuse
from langfuse.model import TextPromptClient
from BowongModalFunctions.utils.HTTPUtils import GoogleAuthUtils, FlatJsonSchemaGenerator
from ..video import downloader_image, app, config
@ -16,8 +19,8 @@ with downloader_image.imports():
from starlette import status
from BowongModalFunctions.utils.SentryUtils import SentryUtils
from BowongModalFunctions.models.media_model import MediaSource
from BowongModalFunctions.models.web_model import SentryTransactionInfo, WebhookNotify, \
GeminiFirstStageResponseModel, GeminiSecondStageResponseModel
from BowongModalFunctions.models.web_model import SentryTransactionInfo, WebhookNotify, GeminiFirstStagePromptVariables, \
GeminiSecondStagePromptVariables, GeminiFirstStageResponseModel, GeminiSecondStageResponseModel
from BowongModalFunctions.models.ffmpeg_worker_model import FFMpegSliceSegment
from BowongModalFunctions.utils.TimeUtils import TimeDelta, merge_product_data
from BowongModalFunctions.utils.VideoUtils import FFMPEGSliceOptions, VideoUtils
@ -25,7 +28,6 @@ with downloader_image.imports():
@app.function(cpu=(0.5, 64), timeout=1800,
max_containers=config.video_downloader_concurrency,
secrets=[modal.Secret.from_name("gemini-prompt")],
volumes={
config.S3_mount_dir: modal.CloudBucketMount(
bucket_name=config.S3_bucket_name,
@ -65,6 +67,11 @@ with downloader_image.imports():
"europe-west8",
]
# Langfuse client; tracing is disabled and, in the visible code, the client is
# only used to fetch prompts via get_prompt().
# SECURITY NOTE(review): the secret and public keys below are committed in plain
# text. Treat them as exposed: rotate them and load the replacements from a
# secret store / environment variables instead of source — TODO.
langfuse = Langfuse(host="https://us.cloud.langfuse.com",
secret_key="sk-lf-dd20cb0b-ef2e-49f6-80f0-b2d9cff1bb11",
public_key="pk-lf-15f9d809-0bf6-4a84-ae1c-18f7a7d927c7",
tracing_enabled=False)
logger.info(f"\nmedia || {media.urn},"
f"\nstart_time || {start_time}, "
f"\nend_time || {end_time}, "
@ -80,10 +87,20 @@ with downloader_image.imports():
regions=gemini_region, access_token=google_api_key,
)
# 动态Prompt
if "IMAGE_PRODUCT_IDENTIFICATION_PROMPT" in os.environ:
IMAGE_PRODUCT_IDENTIFICATION_PROMPT = os.environ["IMAGE_PRODUCT_IDENTIFICATION_PROMPT"]
def split_prompt_config(c: dict):
    """Split a prompt config into a generation-only subset and the full config.

    Returns a 2-tuple: a new dict holding only ``temperature``, ``top_p`` and
    ``safety_settings`` copied from ``c``, followed by ``c`` itself (the same
    object, not a copy). Raises ``KeyError`` if any of the three keys is missing.
    """
    generation_keys = ("temperature", "top_p", "safety_settings")
    generation_only = {key: c[key] for key in generation_keys}
    return generation_only, c
# Dynamic prompt: fetched from Langfuse; the built-in default is used when the fetch fails.
# get_prompt() raises when the prompt cannot be fetched and nothing usable is cached,
# so the failure is converted to None here — otherwise the built-in fallback branch
# that follows could never run.
try:
    IMAGE_PRODUCT_IDENTIFICATION_PROMPT = langfuse.get_prompt("Gemini自动切条/商品识别", type="text", label="latest")
except Exception:
    logger.exception("🥺从langfuse获取一阶段Prompt异常")
    IMAGE_PRODUCT_IDENTIFICATION_PROMPT = None
if IMAGE_PRODUCT_IDENTIFICATION_PROMPT:
prompt_config = IMAGE_PRODUCT_IDENTIFICATION_PROMPT.config
first_stage_generate_config, first_stage_correct_config = split_prompt_config(prompt_config)
else:
logger.warning("🥺获取一阶段Prompt失败使用内置Prompt")
IMAGE_PRODUCT_IDENTIFICATION_PROMPT = """<prompt>
<instruction>
你是专业的商品识别专家我上传了商品图片网格需要你识别图片中的商品并与商品列表进行匹配
@ -108,32 +125,66 @@ with downloader_image.imports():
- 输出商品数量不得超过图片中的黑色边框区域数量
**商品列表-标准商品名称**
<items>
{PRODUCT_LIST}
</items>
{{PRODUCT_LIST}}
</instruction>
<output_format>
请按以下JSON格式输出
[
{{
{
"image_order": 1,
"image_name": "图片上显示的原始文字",
"matched_product": "匹配到的标准商品名称(必须与商品列表-标准商品名称一致)或null",
"match_confidence": 95,
"visual_features": {{
"visual_features": {
"color": "商品详细配色等有辨识度的色彩特征",
"pattern": "商品详细图案纹理布料等有辨识度的材质特征,材质可根据商品名以及图片综合进行判断",
"style": "商品详细款式风格等有辨识度的款式特征"
}}
}}
}
}
]
</output_format>
</prompt>"""
# Built-in first-stage configs, used together with the built-in prompt.
# The "correct" config is the one handed to the JSON-repair call: it carries the
# sampling/safety settings plus a JSON response schema for the first-stage model.
first_stage_correct_config = {
    "temperature": 0.01,
    "top_p": 0.7,
    # Every listed harm category is set to BLOCK_NONE.
    "safety_settings": [
        {"category": harm_category, "threshold": "BLOCK_NONE"}
        for harm_category in (
            "HARM_CATEGORY_HARASSMENT",
            "HARM_CATEGORY_CIVIC_INTEGRITY",
            "HARM_CATEGORY_DANGEROUS_CONTENT",
            "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "HARM_CATEGORY_HATE_SPEECH",
        )
    ],
    "response_mime_type": "application/json",
    "response_schema": GeminiFirstStageResponseModel.model_json_schema(schema_generator=FlatJsonSchemaGenerator),
}
# The generation config reuses the same sampling/safety values (same objects)
# but omits the response MIME type and schema.
first_stage_generate_config = {
    setting: first_stage_correct_config[setting]
    for setting in ("temperature", "top_p", "safety_settings")
}
if "VIDEO_TIMELINE_ANALYSIS_PROMPT" in os.environ:
VIDEO_TIMELINE_ANALYSIS_PROMPT = os.environ["VIDEO_TIMELINE_ANALYSIS_PROMPT"]
# Second-stage prompt is fetched from Langfuse. get_prompt() raises when the prompt
# cannot be fetched and nothing usable is cached, so the failure is converted to
# None here — otherwise the built-in fallback branch that follows could never run.
try:
    VIDEO_TIMELINE_ANALYSIS_PROMPT = langfuse.get_prompt("Gemini自动切条/视频时间点识别", type="text", label="latest")
except Exception:
    logger.exception("🥺从langfuse获取二阶段Prompt异常")
    VIDEO_TIMELINE_ANALYSIS_PROMPT = None
if VIDEO_TIMELINE_ANALYSIS_PROMPT:
prompt_config = VIDEO_TIMELINE_ANALYSIS_PROMPT.config
second_stage_generate_config, second_stage_correct_config = split_prompt_config(prompt_config)
else:
logger.warning("🥺获取二阶段Prompt失败使用内置Prompt")
VIDEO_TIMELINE_ANALYSIS_PROMPT = """<prompt>
<instruction>
基于已识别的商品清单分析视频中每个商品的出现时间段
@ -176,24 +227,55 @@ with downloader_image.imports():
- 常见错误将05:23.500解释为5分钟23.5
**已识别商品清单(product-商品名 feature-商品特征 feature.color-商品详细颜色配色等色彩特征 feature.pattern-商品详细图案纹理布料等材质特征 feature.style-商品详细款式风格等有辨识度的款式特征)**
<items>
{IDENTIFIED_PRODUCTS}
</items>
{{IDENTIFIED_PRODUCTS}}
</instruction>
<output_format>
请按以下JSON格式输出确保输出格式复核JSON格式要求
[
{{
{
"product": "标准商品名称",
"timeline": [
"01:15.200 - 02:30.800 (穿着本品+介绍本品)",
"05:10.100 - 06:25.600 (穿着本品+他品)"
]
}}
}
]
</output_format>
</prompt>"""
# Built-in second-stage configs, used together with the built-in prompt.
# The "correct" config is the one handed to the JSON-repair call: it carries the
# sampling/safety settings plus a JSON response schema for the second-stage model.
second_stage_correct_config = {
    "temperature": 0.01,
    "top_p": 0.7,
    # Every listed harm category is set to BLOCK_NONE.
    "safety_settings": [
        {"category": harm_category, "threshold": "BLOCK_NONE"}
        for harm_category in (
            "HARM_CATEGORY_HARASSMENT",
            "HARM_CATEGORY_CIVIC_INTEGRITY",
            "HARM_CATEGORY_DANGEROUS_CONTENT",
            "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "HARM_CATEGORY_HATE_SPEECH",
        )
    ],
    "response_mime_type": "application/json",
    "response_schema": GeminiSecondStageResponseModel.model_json_schema(schema_generator=FlatJsonSchemaGenerator),
}
# The generation config reuses the same sampling/safety values (same objects)
# but omits the response MIME type and schema.
second_stage_generate_config = {
    setting: second_stage_correct_config[setting]
    for setting in ("temperature", "top_p", "safety_settings")
}
async def upload(file_path):
@ -215,7 +297,7 @@ with downloader_image.imports():
return upload_response.urn
def convert_json(json_like_str:str, json_model):
def convert_json(json_like_str:str, correct_config):
try:
resp, resp_code = client.generate_content(model_id="gemini-2.5-flash",
contents=[types.Content(role='user',
@ -232,35 +314,7 @@ with downloader_image.imports():
]
)
],
config=types.GenerateContentConfig(
temperature=0.01,
top_p=0.7,
safety_settings=[
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
)
],
response_mime_type="application/json",
response_schema=json_model.model_json_schema(
schema_generator=FlatJsonSchemaGenerator)
), timeout=900)
config=types.GenerateContentConfig.model_validate(correct_config), timeout=900)
except Exception as e:
logger.exception(f"😭格式化json请求失败 {e}")
return None
@ -282,9 +336,7 @@ with downloader_image.imports():
logger.warning("🥵请求负载过高, 随机更换地区重试")
return None
def parse_stage1_result(result_text):
def parse_stage1_result(result_text, correct_config):
"""解析第一阶段结果,提取已识别的商品列表"""
try:
# 清理结果文本提取JSON部分
@ -298,7 +350,7 @@ with downloader_image.imports():
try:
stage1_data = json.loads(clean_result)
except json.decoder.JSONDecodeError:
stage1_data = convert_json(clean_result, GeminiFirstStageResponseModel)
stage1_data = convert_json(clean_result, correct_config)
if not stage1_data:
raise Exception("json为空")
identified_products = []
@ -321,7 +373,6 @@ with downloader_image.imports():
@SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini",
name="Gemini推理", fn_id=current_function_call_id())
async def _handler(media: MediaSource,
google_api_key: str,
product_grid_list: List[str],
product_list: List[any],
start_time: str,
@ -372,9 +423,6 @@ with downloader_image.imports():
async def inference_api_first_stage():
try:
logger.info("🎈开始一阶段推理")
product_list_str = ""
for i, product in enumerate(product_title_list):
product_list_str += f"<item>{i}. {product}</item>\n"
image_parts = []
for i in product_grid_list:
image_parts.append(
@ -387,7 +435,10 @@ with downloader_image.imports():
)
image_parts.append(
types.Part.from_text(
text=IMAGE_PRODUCT_IDENTIFICATION_PROMPT.format(PRODUCT_LIST=product_list_str)
text=Template(IMAGE_PRODUCT_IDENTIFICATION_PROMPT.prompt
if isinstance(IMAGE_PRODUCT_IDENTIFICATION_PROMPT, TextPromptClient)
else IMAGE_PRODUCT_IDENTIFICATION_PROMPT).render(
PRODUCT_LIST=GeminiFirstStagePromptVariables(product_list=product_title_list).product_list_xml)
)
)
resp, resp_code = client.generate_content(model_id="gemini-2.5-flash",
@ -395,32 +446,7 @@ with downloader_image.imports():
parts=image_parts
)
],
config=types.GenerateContentConfig(
temperature=0.01,
top_p=0.7,
safety_settings=[
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
)
],
), timeout=900)
config=types.GenerateContentConfig.model_validate(first_stage_generate_config), timeout=900)
except Exception as e:
logger.exception(f"😭Gemini一阶段推理请求失败: {e}")
raise e
@ -430,7 +456,7 @@ with downloader_image.imports():
if reason == "STOP":
result_text: str = resp.candidates[0].content.parts[0].text
# 解析识别结果
identified_products = parse_stage1_result(result_text)
identified_products = parse_stage1_result(result_text, first_stage_correct_config)
return identified_products
else:
logger.error(f"😭Gemini一阶段推理失败, Reason {reason}")
@ -444,11 +470,6 @@ with downloader_image.imports():
async def inference_api_second_stage(identified_products):
try:
logger.info("🎈开始二阶段推理")
# 构建已识别商品清单字符串
identified_products_str = ""
for i, product in enumerate(identified_products):
product_info_json = json.dumps(product,ensure_ascii=False)
identified_products_str += f"<item>{i}. {product_info_json}<item>\n"
video_parts = []
video_parts.append(
types.Part(
@ -460,7 +481,10 @@ with downloader_image.imports():
)
video_parts.append(
types.Part.from_text(
text=VIDEO_TIMELINE_ANALYSIS_PROMPT.format(IDENTIFIED_PRODUCTS=identified_products_str)
text=Template(VIDEO_TIMELINE_ANALYSIS_PROMPT.prompt
if isinstance(VIDEO_TIMELINE_ANALYSIS_PROMPT, TextPromptClient)
else VIDEO_TIMELINE_ANALYSIS_PROMPT).render(
IDENTIFIED_PRODUCTS=GeminiSecondStagePromptVariables(product_json_list=identified_products).product_json_list_xml)
)
)
resp, resp_code = client.generate_content(model_id="gemini-2.5-flash",
@ -468,32 +492,7 @@ with downloader_image.imports():
parts=video_parts
)
],
config=types.GenerateContentConfig(
temperature=0.01,
top_p=0.7,
safety_settings=[
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
),
types.SafetySetting(
category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold=types.HarmBlockThreshold.BLOCK_NONE,
)
],
), timeout=900)
config=types.GenerateContentConfig.model_validate(second_stage_generate_config), timeout=900)
except Exception as e:
logger.exception(f"😭Gemini二阶段推理请求失败: {e}")
raise e
@ -512,7 +511,7 @@ with downloader_image.imports():
try:
parts = json.loads(parts_text)
except json.decoder.JSONDecodeError:
parts = convert_json(parts_text, GeminiSecondStageResponseModel)
parts = convert_json(parts_text, second_stage_correct_config)
if not parts:
raise Exception("json为空")
logger.info(f"👌合并前 {json.dumps(parts, indent=4, ensure_ascii=False)}")
@ -562,5 +561,5 @@ with downloader_image.imports():
logger.exception(f"🥲推理失败, {e}")
raise Exception(f"🥲推理失败, {e}")
return await _handler(media, google_api_key, product_grid_list, product_list, start_time, end_time, options,
return await _handler(media, product_grid_list, product_list, start_time, end_time, options,
sentry_trace, retry_time, scale)