From e323073b5f9cb612ae4288424788b10f1b8bb2d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BA=B7=E5=AE=87=E4=BD=B3?= Date: Wed, 25 Jun 2025 19:06:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6=E5=88=86=E6=94=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Merge branch 'main' into cluster-gemini * FIX 修复返回值问题 * PERF 适配system instruction --------- Merge request URL: https://g-ldyi2063.coding.net/p/dev/d/modalDeploy/git/merge/4878 Co-authored-by: 康宇佳 --- .../models/requests/models.py | 40 + src/BowongModalFunctions/router/google.py | 28 +- src/cluster/video_apps/hls_slice_inference.py | 822 +++++++++++------- 3 files changed, 575 insertions(+), 315 deletions(-) diff --git a/src/BowongModalFunctions/models/requests/models.py b/src/BowongModalFunctions/models/requests/models.py index 9c16611..f61802c 100644 --- a/src/BowongModalFunctions/models/requests/models.py +++ b/src/BowongModalFunctions/models/requests/models.py @@ -1,3 +1,4 @@ +import json import uuid from typing import List, Union, Optional, Dict @@ -416,6 +417,45 @@ class GeminiRequest(BaseFFMPEGTaskRequest): raise pydantic.ValidationError("media格式读取失败") +class GeminiRequestFirstOnly(BaseFFMPEGTaskRequest): + product_cover_grid_uri_list: List[str] = Field(description="商品封面网格拼图URI列表") + product_list: List[str] = Field(description="商品名列表(时间倒序)"), + prompt_label: str = Field(default="production", description="使用的Langfuse Prompt Label") + + +class GeminiRequestSecondOnly(BaseFFMPEGTaskRequest): + media_hls_url: MediaSource = Field(default="", description="视频流录制HLS地址 hls://格式 需录制超过20分钟") + identified_product_list: List[Dict] = Field(description="第一阶段返回的识别商品及特征列表") + start_time: str = Field(default="00:00:00.000", description="开始时间(hls)") + end_time: str = Field(default="00:20:00.000", description="结束时间(hls)") + options: FFMPEGSliceOptions = Field(default=FFMPEGSliceOptions(), description="输出质量选项") + scale: float = Field(default=0.85, description="视频尺寸缩放倍率") + prompt_label: str = Field(default="production", description="使用的Langfuse Prompt Label"), + last_product_text: str = Field(default="", description="上一段视频结尾介绍的商品以及标签") + + @field_validator('media_hls_url', mode='before') + @classmethod + def parse_inputs_media_hls_url(cls, v: Union[str, MediaSource]): + if isinstance(v, str): + media_source = MediaSource.from_str(v) + if media_source.protocol == MediaProtocol.hls: + return media_source + else: + raise pydantic.ValidationError('media只支持hls格式的urn') + elif isinstance(v, MediaSource): + return v + else: + raise pydantic.ValidationError("media格式读取失败") + + @field_validator("identified_product_list", mode="before") + @classmethod + def parse_inputs_identified_product_list(cls, v: str): + if isinstance(v, str): + return json.loads(v) + else: + raise pydantic.ValidationError("identified_product_list") + + class MonitorLiveRoomProductRequest(BaseModel): cookie: str = Field(default="YOUR_COOKIE", description="用户网页版抖音Cookie/Your web version of Douyin Cookie") room_id: str = Field(default="", description="直播间room_id/Room room_id") diff --git a/src/BowongModalFunctions/router/google.py b/src/BowongModalFunctions/router/google.py index 6c66109..7699cb7 100644 --- a/src/BowongModalFunctions/router/google.py +++ b/src/BowongModalFunctions/router/google.py @@ -16,7 +16,8 @@ from starlette.responses import JSONResponse from ..models.settings.cluster import WorkerConfig from ..middleware.authorization import verify_token from ..models.responses.models import ModalTaskResponse, GeminiResultResponse, GoogleUploadResponse -from ..models.requests.models import GeminiRequest, MakeGridGeminiRequest +from ..models.requests.models import GeminiRequest, MakeGridGeminiRequest, GeminiRequestFirstOnly, \ + GeminiRequestSecondOnly from ..models.ffmpeg_tasks.models import SentryTransactionInfo from ..utils.ModalUtils import ModalUtils from ..utils.HTTPUtils import GoogleAuthUtils @@ -208,6 +209,31 @@ async def inference_gemini(data: GeminiRequest, headers: Annotated[BundleHeaders return ModalTaskResponse(success=True, taskId=fn_call.object_id) +@router.post('/inference_gemini_first_only', summary="使用Gemini推理hls视频流指定时间段的打点情况-仅第一阶段") +async def inference_gemini_first_only(data: GeminiRequestFirstOnly, + headers: Annotated[BundleHeaders, Header()], ) -> ModalTaskResponse: + google_api_key = headers.x_google_api_key or os.environ.get("GOOGLE_API_KEY") + if not google_api_key: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing Google API Key") + fn = modal.Function.from_name(config.modal_app_name, "video_inference_first_only", + environment_name=config.modal_environment) + fn_call = fn.spawn(google_api_key, data.product_cover_grid_uri_list, data.product_list) + return ModalTaskResponse(success=True, taskId=fn_call.object_id) + + +@router.post('/inference_gemini_second_only', summary="使用Gemini推理hls视频流指定时间段的打点情况-仅第二阶段") +async def inference_gemini_second_only(data: GeminiRequestSecondOnly, + headers: Annotated[BundleHeaders, Header()], ) -> ModalTaskResponse: + google_api_key = headers.x_google_api_key or os.environ.get("GOOGLE_API_KEY") + if not google_api_key: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing Google API Key") + fn = modal.Function.from_name(config.modal_app_name, "video_inference_second_only", + environment_name=config.modal_environment) + fn_call = fn.spawn(data.media_hls_url, google_api_key, data.identified_product_list, data.start_time, + data.end_time, data.scale, data.options, data.prompt_label, data.last_product_text) + return ModalTaskResponse(success=True, taskId=fn_call.object_id) + + @router.get("/inference_gemini/{task_id}", summary="查询Gemini推理hls视频流指定时间段的打点任务") async def gemini_status(task_id: str, response: Response) -> GeminiResultResponse: task_info = await ModalUtils.get_modal_task_status(task_id) diff --git a/src/cluster/video_apps/hls_slice_inference.py b/src/cluster/video_apps/hls_slice_inference.py index 18f1f38..c296643 100644 --- a/src/cluster/video_apps/hls_slice_inference.py +++ b/src/cluster/video_apps/hls_slice_inference.py @@ -10,102 +10,176 @@ from langfuse.model import TextPromptClient from BowongModalFunctions.utils.HTTPUtils import GoogleAuthUtils, FlatJsonSchemaGenerator from ..video import downloader_image, app, config -with downloader_image.imports(): +with (downloader_image.imports()): import os, json - from typing import List + from typing import List, Dict, Tuple, Any from loguru import logger from modal import current_function_call_id from fastapi import HTTPException from starlette import status from BowongModalFunctions.utils.SentryUtils import SentryUtils from BowongModalFunctions.models.cache_tasks.models import MediaSource, GeminiFirstStageResponseModel, \ - GeminiFirstStagePromptVariables, GeminiSecondStagePromptVariables + GeminiFirstStagePromptVariables, GeminiSecondStagePromptVariables from BowongModalFunctions.models.responses.models import GeminiSecondStageResponseModel from BowongModalFunctions.models.ffmpeg_tasks.models import FFMpegSliceSegment, FFMPEGSliceOptions, \ - SentryTransactionInfo, WebhookNotify + SentryTransactionInfo, WebhookNotify from BowongModalFunctions.utils.TimeUtils import TimeDelta, merge_product_data from BowongModalFunctions.utils.VideoUtils import VideoUtils + # Gemini可用区 + gemini_region = [ + # "us-central1", + "us-east1", + "us-east5", + "us-west1", + "us-south1", + "europe-central2", + "europe-north1", + "europe-west1", + "europe-west4", + "europe-west8", + ] - @app.function(cpu=(0.5, 64), timeout=1800, - max_containers=config.video_downloader_concurrency, - volumes={ - config.S3_mount_dir: modal.CloudBucketMount( - bucket_name=config.S3_bucket_name, - secret=modal.Secret.from_name("aws-s3-secret", environment_name=config.modal_environment), - ), - }, - region="us", - cloud='gcp' - ) - @modal.concurrent(max_inputs=1) - async def video_hls_slice_inference(media: MediaSource, - google_api_key: str, - product_grid_list: List[str], - product_list: List[any], - start_time: str, - end_time: str, - options: FFMPEGSliceOptions, - sentry_trace: SentryTransactionInfo, - webhook: WebhookNotify = None, - retry_time: int = 3, - scale:float = 0.9, - last_product_text="" - ): + + async def upload(file_path: str, + google_api_key: str) -> str: + """ + 上传视频到gcs + """ + with open(file_path, "rb") as video: + video = video.read() + content_length = len(video) + content_type = f"video/{file_path.split('.')[-1]}" + filename = ".".join([str(uuid.uuid4()), file_path.split(".")[-1]]) + if content_type not in ['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg', + 'video/webm', 'video/wmv', 'video/3gpp']: + raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) logger.info( - f"region {os.environ.get('MODAL_REGION', 'unknown')}, provider {os.environ.get('MODAL_CLOUD_PROVIDER', 'unknown')}") + f"Uploading name = {filename}, size = {content_length}, type = {content_type} to google file") + upload_response = await GoogleAuthUtils.google_upload_file(file_stream=video, + content_type=content_type, + google_api_key=google_api_key, + bucket_name="dy-media-storage", + filename=f"videos/{filename}", ) - # Gemini可用区 - gemini_region = [ - #"us-central1", - "us-east1", - "us-east5", - "us-west1", - "us-south1", - "europe-central2", - "europe-north1", - "europe-west1", - "europe-west4", - "europe-west8", - ] + return upload_response.urn + + def convert_json(client: GoogleAuthUtils.GoogleGenaiClient, + json_like_str: str, + correct_config) -> Any | None: + """ + 调用模型转换非标准json为json + """ + try: + resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", + contents=[types.Content(role='user', + parts=[ + types.Part.from_text( + text="" + "" + "请格式化以下一段非标准json格式字符串为json标准格式 \n{0}" + "" + "".format( + json_like_str + ) + ) + ] + ) + ], + config=types.GenerateContentConfig.model_validate(correct_config), + timeout=900) + except Exception as e: + logger.exception(f"😭格式化json请求失败 {e}") + return None + + if resp_code == 200: + reason = resp.candidates[0].finish_reason + logger.info("😊格式化json用量信息" + resp.usage_metadata.model_dump_json(indent=2)) + if reason == "STOP": + result_text: str = resp.candidates[0].content.parts[0].text.replace("\n", "").replace("\\n", + "").replace("\\", + "") + # 解析识别结果 + result_json = json.loads(result_text) + return result_json + else: + logger.error(f"😭格式化json推理失败, Reason {reason}") + return None + else: + logger.error(f"😭格式化json推理失败, 状态码{resp_code}") + if resp_code == 429: + logger.warning("🥵请求负载过高, 随机更换地区重试") + return None + + + def parse_stage1_result(client: GoogleAuthUtils.GoogleGenaiClient, + result_text: str, + correct_config) -> List[Dict[str, Any]]: + """ + 解析第一阶段结果,提取已识别的商品列表 + """ + try: + # 清理结果文本,提取JSON部分 + clean_result = result_text.strip() + if clean_result.startswith('```') and clean_result.endswith('```'): + clean_result = clean_result[3:-3].strip() + if clean_result.startswith('json'): + clean_result = clean_result[4:].strip() + + # 解析JSON + try: + stage1_data = json.loads(clean_result) + except json.decoder.JSONDecodeError: + stage1_data = convert_json(client, clean_result, correct_config) + if not stage1_data: + raise Exception("json为空") + identified_products = [] + if isinstance(stage1_data, list): + for item in stage1_data: + if isinstance(item, dict) and 'matched_product' in item: + matched_product = item['matched_product'] + if matched_product and matched_product != "null": + identified_products.append({"product": matched_product, "feature": item["visual_features"]}) + + logger.info(f"一阶段解析结果: {len(identified_products)} 个有效商品") + return identified_products + + except Exception as e: + logger.exception(f"一阶段结果解析失败 {str(e)}") + raise e + + + def split_prompt_config(c: dict) -> Tuple[Dict, Dict]: + """ + 拆分prompt配置 + """ + return { + "temperature": c["temperature"], + "top_p": c["top_p"], + "safety_settings": c["safety_settings"], + "system_instruction": c["system_instruction"] + }, { + "temperature": c["temperature"], + "top_p": c["top_p"], + "response_mime_type": c["response_mime_type"], + "response_schema": c["response_schema"], + "safety_settings": c["safety_settings"] + } + + + def get_prompt(first_label="production", second_label="production") -> Tuple[Tuple, Tuple]: + """ + 从langfuse获取prompt + """ langfuse = Langfuse(host="https://us.cloud.langfuse.com", secret_key="sk-lf-dd20cb0b-ef2e-49f6-80f0-b2d9cff1bb11", public_key="pk-lf-15f9d809-0bf6-4a84-ae1c-18f7a7d927c7", tracing_enabled=False) - logger.info(f"\nmedia || {media.urn}," - f"\nstart_time || {start_time}, " - f"\nend_time || {end_time}, " - f"\nproduct_grid_list || {json.dumps(product_grid_list, ensure_ascii=False, indent=2)}, " - f"\nproduct_list || {json.dumps(product_list, ensure_ascii=False, indent=2)}" - f"\noptions || {options.model_dump_json()}, " - f"\nscale || {scale}" - f"\nlast_product_text || {last_product_text}") - - client = GoogleAuthUtils.GoogleGenaiClient( - cloudflare_project_id="67720b647ff2b55cf37ba3ef9e677083", - cloudflare_gateway_id="bowong-dev", - google_project_id="gen-lang-client-0413414134", - regions=gemini_region, access_token=google_api_key, - ) - - def split_prompt_config(c:dict): - return { - "temperature": c["temperature"], - "top_p": c["top_p"], - "safety_settings": c["safety_settings"], - "system_instruction": c["system_instruction"] - },{ - "temperature": c["temperature"], - "top_p": c["top_p"], - "response_mime_type": c["response_mime_type"], - "response_schema": c["response_schema"], - "safety_settings": c["safety_settings"] - } - # 动态Prompt, langfuse获取失败使用默认值 - IMAGE_PRODUCT_IDENTIFICATION_PROMPT = langfuse.get_prompt("Gemini自动切条/商品识别", type="text", label="production") + IMAGE_PRODUCT_IDENTIFICATION_PROMPT = langfuse.get_prompt("Gemini自动切条/商品识别", type="text", + label=first_label) if IMAGE_PRODUCT_IDENTIFICATION_PROMPT: prompt_config = IMAGE_PRODUCT_IDENTIFICATION_PROMPT.config first_stage_generate_config, first_stage_correct_config = split_prompt_config(prompt_config) @@ -114,30 +188,30 @@ with downloader_image.imports(): IMAGE_PRODUCT_IDENTIFICATION_PROMPT = """ 你是专业的商品识别专家。我上传了商品图片网格,需要你识别图片中的商品并与商品列表进行匹配。 - + **输入材料**: - 🖼️ **商品图片网格**:包含多个黑色边框区域,每个区域内有商品图片+商品名称文字 - 📋 **商品列表**:标准商品名称参考清单 - + **Token自动管理** 适时记录思考token使用量,根据任务复杂程度自动调整思考token的使用量(不允许超过上限65535),任务复杂时使用摘要思考模式,仅记录决策结果不记录决策详细过程。 - + **核心任务**: 1. **扫描黑色边框区域**:从左上角开始,按行扫描每个黑色边框区域 2. **提取文字信息**:精确提取每个区域内的所有文字信息 3. **与商品列表匹配**:将图片文字与商品列表进行高相似度匹配 4. **提取商品图片特征**:从商品图片提取详细有辨识度可以用于商品辨认任务的特征,包括颜色、图案、纹理、材质、版型、款式等详细特征 - + **严格约束**: - 🚫 只识别有黑色边框包围的商品区域 - 🚫 每个商品必须有清晰可见的文字标注 - 🚫 不得推测或添加图片中不存在的商品 - ✅ 输出商品数量不得超过图片中的黑色边框区域数量 - + **商品列表-标准商品名称**: {{PRODUCT_LIST}} - + 请按以下JSON格式输出: [ @@ -181,13 +255,14 @@ with downloader_image.imports(): } ], "response_mime_type": "application/json", - "response_schema": GeminiFirstStageResponseModel.model_json_schema(schema_generator=FlatJsonSchemaGenerator) + "response_schema": GeminiFirstStageResponseModel.model_json_schema( + schema_generator=FlatJsonSchemaGenerator) } first_stage_generate_config = { "temperature": first_stage_correct_config["temperature"], "top_p": first_stage_correct_config["top_p"], "safety_settings": first_stage_correct_config["safety_settings"], - "system_instruction":{ + "system_instruction": { "parts": [ { "text": "你是商品识别专家,任务是从商品网格图片中精准识别商品、提取特征,输出为用户定义的json格式" @@ -196,7 +271,8 @@ with downloader_image.imports(): } } - VIDEO_TIMELINE_ANALYSIS_PROMPT = langfuse.get_prompt("Gemini自动切条/视频时间点识别", type="text", label="production") + VIDEO_TIMELINE_ANALYSIS_PROMPT = langfuse.get_prompt("Gemini自动切条/视频时间点识别", type="text", + label=second_label) if VIDEO_TIMELINE_ANALYSIS_PROMPT: prompt_config = VIDEO_TIMELINE_ANALYSIS_PROMPT.config second_stage_generate_config, second_stage_correct_config = split_prompt_config(prompt_config) @@ -205,14 +281,14 @@ with downloader_image.imports(): VIDEO_TIMELINE_ANALYSIS_PROMPT = """ 基于已识别的商品清单,分析视频中每个商品的出现时间段。 - + **输入材料**: - 📹 **视频**:直播带货片段 - 📋 **已识别商品清单**:第一阶段确认的商品列表及商品特征 - + **Token自动管理** 适时记录思考token使用量,根据任务复杂程度自动调整思考token的使用量(不允许超过上限65535),任务复杂时使用摘要思考模式,仅记录决策结果不记录决策详细过程。 - + **商品匹配逻辑示例** - 🔄 示例: * 📋 材质一致性: 商品名称中"100纯棉" ↔ 视频质感"棉质" ✅一致 @@ -225,28 +301,28 @@ with downloader_image.imports(): * 例如:同样的鱼图案,一个是"紫色鱼+黄色底",另一个是"红色鱼+蓝色底",这是**相同商品的不同配色** * 例如:同样的条纹图案,一个是"黑白条纹",另一个是"蓝白条纹",这是**相同商品的不同配色** * 例如:同样为衬衫,一个是"棉质黑色","一个为丝质米色",这**不是同款,是不同的商品** - + **分析任务**: 1. **时间段识别**:根据匹配逻辑对视频画面逐一匹配找出每个商品在视频中的展示时间,不要遗漏相同商品的不同配色 2. **去重原则**:视频画面、语音等都要综合考虑得出置信度值, 如果同一时间段内展示的同一商品可能对应商品列表中的多个商品,只选取置信度值最高的一个 2. **内容类型分类**:判断每个时间段的具体内容类型(必须为以下三种内容类型之一:穿着本品+介绍本品、穿着本品+他品、穿着本品+无关) 3. **时间段合并**:合并连续或相近的时间段,去除合并后仍小于5秒的时间段 - + **内容类型**: - (穿着本品+介绍本品) - 主播穿着该商品或页面主体为该商品且正在介绍该商品本身,主播话语里正在出现该商品相关描述语句或手指向或眼睛看向该商品 - (穿着本品+他品) - 主播穿着该商品或页面主体为该商品但正在介绍其他商品,主播话语里正在出现其他商品相关描述语句或手指向或眼睛看向其他商品 - (穿着本品+无关) - 主播穿着该商品或页面主体为该商品但在做无关商品的事情,主播话语里正在出现与商品无关的语句或者做换衣服等无关商品介绍的动作 - + **时间格式标准**: - 必须严格使用格式:MM:SS.mmm - MM:SS.mmm - MM是分钟(00-59),SS是秒(00-59),mmm是毫秒(000-999) - 示例:05:23.500 表示5分钟23.5秒500毫秒 - 常见错误:将05:23.500解释为5分钟23.5秒 - + **已识别商品清单(product-商品名 feature-商品特征 feature.color-商品详细颜色配色等色彩特征 feature.pattern-商品详细图案纹理布料等材质特征 feature.style-商品详细款式风格等有辨识度的款式特征)**: {{IDENTIFIED_PRODUCTS}} - + 请按以下JSON格式输出,确保输出格式复核JSON格式要求: [ @@ -286,13 +362,14 @@ with downloader_image.imports(): } ], "response_mime_type": "application/json", - "response_schema": GeminiSecondStageResponseModel.model_json_schema(schema_generator=FlatJsonSchemaGenerator) + "response_schema": GeminiSecondStageResponseModel.model_json_schema( + schema_generator=FlatJsonSchemaGenerator) } second_stage_generate_config = { "temperature": second_stage_correct_config["temperature"], "top_p": second_stage_correct_config["top_p"], "safety_settings": second_stage_correct_config["safety_settings"], - "system_instruction":{ + "system_instruction": { "parts": [ { "text": "你是直播带货视频识别专家,任务是根据商品特征从视频中高效精准地识别出视频中每个商品出现的时间段以及内容类型,输出为用户定义的json格式" @@ -301,97 +378,318 @@ with downloader_image.imports(): } } + return (IMAGE_PRODUCT_IDENTIFICATION_PROMPT, first_stage_generate_config, first_stage_correct_config), \ + (VIDEO_TIMELINE_ANALYSIS_PROMPT, second_stage_generate_config, second_stage_correct_config) - async def upload(file_path): - with open(file_path, "rb") as video: - video = video.read() - content_length = len(video) - content_type = f"video/{file_path.split('.')[-1]}" - filename = ".".join([str(uuid.uuid4()), file_path.split(".")[-1]]) - if content_type not in ['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg', - 'video/webm', 'video/wmv', 'video/3gpp']: - raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) - logger.info( - f"Uploading name = {filename}, size = {content_length}, type = {content_type} to google file") - upload_response = await GoogleAuthUtils.google_upload_file(file_stream=video, - content_type=content_type, - google_api_key=google_api_key, - bucket_name="dy-media-storage", - filename=f"videos/{filename}",) - return upload_response.urn - - def convert_json(json_like_str:str, correct_config): - try: - resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", - contents=[types.Content(role='user', - parts=[ - types.Part.from_text( - text="" - "" - "格式化以下一段有问题的json, json可能存在格式错误字段错误以及部分缺失,修复后输出,若product字段中存在双引号等需转义字符需要转义 \n{0}" - "" - "".format( - json_like_str - ) - ) - ] - ) - ], - config=types.GenerateContentConfig.model_validate(correct_config), timeout=900) - except Exception as e: - logger.exception(f"😭格式化json请求失败 {e}") - return None - - if resp_code == 200: - reason = resp.candidates[0].finish_reason - logger.info("😊格式化json用量信息" + resp.usage_metadata.model_dump_json(indent=2)) - if reason == "STOP": - result_text: str = resp.candidates[0].content.parts[0].text.replace("\n","").replace("\\n", "").replace("\\", "") - # 解析识别结果 - result_json = json.loads(result_text) - return result_json - else: - logger.error(f"😭格式化json推理失败, Reason {reason}") - return None - else: - logger.error(f"😭格式化json推理失败, 状态码{resp_code}") - if resp_code == 429: - logger.warning("🥵请求负载过高, 随机更换地区重试") - return None - - def parse_stage1_result(result_text, correct_config): - """解析第一阶段结果,提取已识别的商品列表""" - try: - # 清理结果文本,提取JSON部分 - clean_result = result_text.strip() - if clean_result.startswith('```') and clean_result.endswith('```'): - clean_result = clean_result[3:-3].strip() - if clean_result.startswith('json'): - clean_result = clean_result[4:].strip() - - # 解析JSON - try: - stage1_data = json.loads(clean_result) - except json.decoder.JSONDecodeError: - stage1_data = convert_json(clean_result, correct_config) - if not stage1_data: - raise Exception("json为空") - identified_products = [] - if isinstance(stage1_data, list): - for item in stage1_data: - if isinstance(item, dict) and 'matched_product' in item: - matched_product = item['matched_product'] - if matched_product and matched_product != "null": - identified_products.append({"product": matched_product, "feature": item["visual_features"]}) - - logger.info(f"一阶段解析结果: {len(identified_products)} 个有效商品") + async def inference_api_first_stage(client: GoogleAuthUtils.GoogleGenaiClient, + product_grid_list: list, + product_list: list, + IMAGE_PRODUCT_IDENTIFICATION_PROMPT: str, + first_stage_generate_config, + first_stage_correct_config) -> List[Dict[str, Any]] | None: + """ + 一阶段推理 返回识别出的商品信息 + """ + try: + logger.info("🎈开始一阶段推理") + image_parts = [] + for i in product_grid_list: + image_parts.append( + types.Part( + file_data=types.FileData( + mime_type="image/jpeg", + file_uri=f"{i}" + ) + ) + ) + image_parts.append( + types.Part.from_text( + text=Template(IMAGE_PRODUCT_IDENTIFICATION_PROMPT.prompt + if isinstance(IMAGE_PRODUCT_IDENTIFICATION_PROMPT, TextPromptClient) + else IMAGE_PRODUCT_IDENTIFICATION_PROMPT).render( + PRODUCT_LIST=GeminiFirstStagePromptVariables(product_list=product_list).product_list_xml) + ) + ) + resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", + contents=[types.Content(role='user', + parts=image_parts + ) + ], + config=types.GenerateContentConfig.model_validate( + first_stage_generate_config), timeout=900) + except Exception as e: + logger.exception(f"😭Gemini一阶段推理请求失败: {e}") + raise e + if resp_code == 200: + reason = resp.candidates[0].finish_reason + logger.info("😊一阶段用量信息" + resp.usage_metadata.model_dump_json(indent=2)) + if reason == "STOP": + result_text: str = resp.candidates[0].content.parts[0].text + # 解析识别结果 + identified_products = parse_stage1_result(client, result_text, first_stage_correct_config) return identified_products + else: + logger.error(f"😭Gemini一阶段推理失败, Reason {reason}") + return None + else: + logger.error(f"😭Gemini一阶段推理失败, 状态码{resp_code}") + if resp_code == 429: + logger.warning("🥵请求负载过高, 随机更换地区重试") + return None - except Exception as e: - logger.exception(f"一阶段结果解析失败 {str(e)}") - raise e + async def inference_api_second_stage(client: GoogleAuthUtils.GoogleGenaiClient, + identified_products: list, + video_gemini_uri: str, + VIDEO_TIMELINE_ANALYSIS_PROMPT: str, + second_stage_generate_config, + second_stage_correct_config, + last_product_text: str, + start_time: str, + end_time: str) -> List[Any] | None | Any: + """ + 二阶段推理 返回识别出的时间段信息 + """ + try: + logger.info("🎈开始二阶段推理") + video_parts = [] + video_parts.append( + types.Part( + file_data=types.FileData( + mime_type="video/mp4", + file_uri=f"{video_gemini_uri}" + ) + ) + ) + video_parts.append( + types.Part.from_text( + text=Template(VIDEO_TIMELINE_ANALYSIS_PROMPT.prompt + if isinstance(VIDEO_TIMELINE_ANALYSIS_PROMPT, TextPromptClient) + else VIDEO_TIMELINE_ANALYSIS_PROMPT).render( + IDENTIFIED_PRODUCTS=GeminiSecondStagePromptVariables( + product_json_list=identified_products).product_json_list_xml, + LAST_PRODUCT_TEXT=last_product_text) + ) + ) + resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", + contents=[types.Content(role='user', + parts=video_parts + ) + ], + config=types.GenerateContentConfig.model_validate( + second_stage_generate_config), timeout=900) + except Exception as e: + logger.exception(f"😭Gemini二阶段推理请求失败: {e}") + raise e + if resp_code == 200: + reason = resp.candidates[0].finish_reason + logger.info("😊二阶段用量信息" + resp.usage_metadata.model_dump_json(indent=2)) + if reason == "STOP": + result_text: str = resp.candidates[0].content.parts[0].text + if len(result_text) > 10: + if result_text.startswith("```") and result_text.endswith("```"): + parts_text = result_text.split("```")[-2].replace("json", "").replace("\n", "").replace("\\", + "").replace( + "\\n", "") + else: + parts_text = result_text.replace("json", "").replace("\n", + "").replace( + "\\", "").replace("\\n", "") + try: + parts = json.loads(parts_text) + except json.decoder.JSONDecodeError: + parts = convert_json(client, parts_text, second_stage_correct_config) + if not parts: + raise Exception("json为空") + logger.info(f"👌合并前 {json.dumps(parts, indent=4, ensure_ascii=False)}") + # 合并产品和时间线 + parts = merge_product_data(parts, start_time, end_time, merge_diff=5) + for part in parts: + part["product"] = re.sub(r'^\x20*\d+\.\x20*', '', part["product"]) + return parts + else: + return [] + else: + logger.error(f"😭Gemini二阶段推理失败, Reason {reason}") + return None + else: + logger.error(f"😭Gemini二阶段推理失败, 状态码{resp_code}") + if resp_code == 429: + logger.warning("🥵请求负载过高, 随机更换地区重试") + return None + + + @app.function(cpu=(0.5, 64), timeout=1800, + max_containers=config.video_downloader_concurrency, + volumes={ + config.S3_mount_dir: modal.CloudBucketMount( + bucket_name=config.S3_bucket_name, + secret=modal.Secret.from_name("aws-s3-secret", environment_name=config.modal_environment), + ), + }, + region="us", + cloud='gcp') + @modal.concurrent(max_inputs=1) + async def video_inference_first_only(google_api_key: str, + product_grid_list: List[str], + product_list: List[any], + prompt_label: str = "production", + retry_time: int = 3): + """ + 测试用:单独第一阶段推理 + """ + client = GoogleAuthUtils.GoogleGenaiClient( + cloudflare_project_id="67720b647ff2b55cf37ba3ef9e677083", + cloudflare_gateway_id="bowong-dev", + google_project_id="gen-lang-client-0413414134", + regions=gemini_region, access_token=google_api_key, + ) + first_stage_config, _ = get_prompt(first_label=prompt_label) + IMAGE_PRODUCT_IDENTIFICATION_PROMPT, first_stage_generate_config, first_stage_correct_config = first_stage_config + # 一阶段推理 + identified_product_list = None + first_retry_time = retry_time + while identified_product_list is None and first_retry_time > 0: + identified_product_list = await inference_api_first_stage(client, product_grid_list, product_list, + IMAGE_PRODUCT_IDENTIFICATION_PROMPT, + first_stage_generate_config, + first_stage_correct_config) + first_retry_time -= 1 + if identified_product_list is None: + raise Exception("😭一阶段推理失败") + else: + logger.info( + f"🥳一阶段推理完成JSON \n{json.dumps(identified_product_list, indent=4, ensure_ascii=False)}") + logger.success("🎉推理完成") + return identified_product_list, None + + + @app.function(cpu=(0.5, 64), timeout=1800, + max_containers=config.video_downloader_concurrency, + volumes={ + config.S3_mount_dir: modal.CloudBucketMount( + bucket_name=config.S3_bucket_name, + secret=modal.Secret.from_name("aws-s3-secret", environment_name=config.modal_environment), + ), + }, + region="us", + cloud='gcp') + @modal.concurrent(max_inputs=1) + async def video_inference_second_only(media: MediaSource, + google_api_key: str, + identified_product_list: list, + start_time: str, + end_time: str, + scale: float, + options: FFMPEGSliceOptions, + prompt_label: str = "production", + last_product_text: str = "", + retry_time: int = 3): + """ + 测试用方法:单独第二阶段推理 + """ + client = GoogleAuthUtils.GoogleGenaiClient( + cloudflare_project_id="67720b647ff2b55cf37ba3ef9e677083", + cloudflare_gateway_id="bowong-dev", + google_project_id="gen-lang-client-0413414134", + regions=gemini_region, access_token=google_api_key, + ) + # 1、截取指定视频 + logger.info("1、开始截取指定视频") + # 计算缩放尺寸 + metadata = VideoUtils.ffprobe_media_metadata(media.path) + width = int(metadata.streams[0].width * scale) + height = int(metadata.streams[0].height * scale) + if width % 2 == 1: + width = width + 1 + if height % 2 == 1: + height = height + 1 + options.width = width + options.height = height + slice_fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_slice_media", + environment_name=config.modal_environment) + slice_result, sentry_trace = await slice_fn.remote.aio(media, [FFMpegSliceSegment( + start=TimeDelta.from_format_string(start_time), end=TimeDelta.from_format_string(end_time))], + options) + video = MediaSource.from_str(slice_result[0].urn) + logger.success("截取完成") + video_path = os.path.join(config.S3_mount_dir, video.path) + + # 2、上传到gemini + logger.info("2、视频文件开始上传到Gemini") + video_gemini_uri = await upload(video_path, google_api_key) + _, sencond_stage_config = get_prompt(second_label=prompt_label) + VIDEO_TIMELINE_ANALYSIS_PROMPT, second_stage_generate_config, second_stage_correct_config = sencond_stage_config + # 3、二阶段推理 + product_timeline_info = None + second_retry_time = retry_time + while product_timeline_info is None and second_retry_time > 0: + product_timeline_info = await inference_api_second_stage(client, identified_product_list, + video_gemini_uri, + VIDEO_TIMELINE_ANALYSIS_PROMPT, + second_stage_generate_config, + second_stage_correct_config, + last_product_text, + start_time, end_time) + second_retry_time -= 1 + if product_timeline_info is None: + raise Exception("😭二阶段推理失败") + else: + logger.info( + f"🥳二阶段推理完成JSON \n{json.dumps(product_timeline_info, indent=4, ensure_ascii=False)}") + logger.success("🎉推理完成") + + return product_timeline_info, None + + + @app.function(cpu=(0.5, 64), timeout=1800, + max_containers=config.video_downloader_concurrency, + volumes={ + config.S3_mount_dir: modal.CloudBucketMount( + bucket_name=config.S3_bucket_name, + secret=modal.Secret.from_name("aws-s3-secret", environment_name=config.modal_environment), + ), + }, + region="us", + cloud='gcp' + ) + @modal.concurrent(max_inputs=1) + async def video_hls_slice_inference(media: MediaSource, + google_api_key: str, + product_grid_list: List[str], + product_list: List[any], + start_time: str, + end_time: str, + options: FFMPEGSliceOptions, + sentry_trace: SentryTransactionInfo, + webhook: WebhookNotify = None, + retry_time: int = 3, + scale: float = 0.9, + last_product_text="" + ): + logger.info( + f"region {os.environ.get('MODAL_REGION', 'unknown')}, provider {os.environ.get('MODAL_CLOUD_PROVIDER', 'unknown')}") + + client = GoogleAuthUtils.GoogleGenaiClient( + cloudflare_project_id="67720b647ff2b55cf37ba3ef9e677083", + cloudflare_gateway_id="bowong-dev", + google_project_id="gen-lang-client-0413414134", + regions=gemini_region, access_token=google_api_key, + ) + + logger.info(f"\nmedia || {media.urn}," + f"\nstart_time || {start_time}, " + f"\nend_time || {end_time}, " + f"\nproduct_grid_list || {json.dumps(product_grid_list, ensure_ascii=False, indent=2)}, " + f"\nproduct_list || {json.dumps(product_list, ensure_ascii=False, indent=2)}" + f"\noptions || {options.model_dump_json()}, " + f"\nscale || {scale}" + f"\nlast_product_text || {last_product_text}") + + first_stage_config, sencond_stage_config = get_prompt() + IMAGE_PRODUCT_IDENTIFICATION_PROMPT, first_stage_generate_config, first_stage_correct_config = first_stage_config + VIDEO_TIMELINE_ANALYSIS_PROMPT, second_stage_generate_config, second_stage_correct_config = sencond_stage_config @SentryUtils.webhook_handler(webhook, current_function_call_id()) @SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini", @@ -404,17 +702,13 @@ with downloader_image.imports(): options: FFMPEGSliceOptions, sentry_trace: SentryTransactionInfo, retry_time: int = 3, - scale:float = 0.9): - + scale: float = 0.9): + if len(product_list) == 0 or len(product_grid_list) == 0: + logger.error("商品列表为空, 退出推理") + raise Exception("商品列表为空, 退出推理") try: - # 1、首先获取全量商品列表 - logger.info("1、获取直播间商品列表") - if len(product_list) == 0: - logger.error("商品列表为空, 退出推理") - raise Exception("商品列表为空, 退出推理") - - # 2、切20分钟的条 - logger.info("2、开始截取指定视频") + # 1、截取指定视频 + logger.info("1、开始截取指定视频") # 计算缩放尺寸 metadata = VideoUtils.ffprobe_media_metadata(media.path) width = int(metadata.streams[0].width * scale) @@ -428,151 +722,51 @@ with downloader_image.imports(): slice_fn = modal.Function.from_name(config.modal_app_name, "ffmpeg_slice_media", environment_name=config.modal_environment) slice_result, sentry_trace = await slice_fn.remote.aio(media, [FFMpegSliceSegment( - start=TimeDelta.from_format_string(start_time), end=TimeDelta.from_format_string(end_time))], options, + start=TimeDelta.from_format_string(start_time), end=TimeDelta.from_format_string(end_time))], + options, sentry_trace) video = MediaSource.from_str(slice_result[0].urn) logger.success("截取完成") video_path = os.path.join(config.S3_mount_dir, video.path) - # 3、上传到gemini - logger.info("3、视频文件开始上传到Gemini") - video_gemini = await upload(video_path) - - async def inference_api_first_stage(): - try: - logger.info("🎈开始一阶段推理") - image_parts = [] - for i in product_grid_list: - image_parts.append( - types.Part( - file_data=types.FileData( - mime_type="image/jpeg", - file_uri=f"{i}" - ) - ) - ) - image_parts.append( - types.Part.from_text( - text=Template(IMAGE_PRODUCT_IDENTIFICATION_PROMPT.prompt - if isinstance(IMAGE_PRODUCT_IDENTIFICATION_PROMPT, TextPromptClient) - else IMAGE_PRODUCT_IDENTIFICATION_PROMPT).render( - PRODUCT_LIST=GeminiFirstStagePromptVariables(product_list=product_list).product_list_xml) - ) - ) - resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", - contents=[types.Content(role='user', - parts=image_parts - ) - ], - config=types.GenerateContentConfig.model_validate(first_stage_generate_config), timeout=900) - except Exception as e: - logger.exception(f"😭Gemini一阶段推理请求失败: {e}") - raise e - if resp_code == 200: - reason = resp.candidates[0].finish_reason - logger.info("😊一阶段用量信息"+resp.usage_metadata.model_dump_json(indent=2)) - if reason == "STOP": - result_text: str = resp.candidates[0].content.parts[0].text - # 解析识别结果 - identified_products = parse_stage1_result(result_text, first_stage_correct_config) - return identified_products - else: - logger.error(f"😭Gemini一阶段推理失败, Reason {reason}") - return None - else: - logger.error(f"😭Gemini一阶段推理失败, 状态码{resp_code}") - if resp_code == 429: - logger.warning("🥵请求负载过高, 随机更换地区重试") - return None - - async def inference_api_second_stage(identified_products): - try: - logger.info("🎈开始二阶段推理") - video_parts = [] - video_parts.append( - types.Part( - file_data=types.FileData( - mime_type="video/mp4", - file_uri=f"{video_gemini}" - ) - ) - ) - video_parts.append( - types.Part.from_text( - text=Template(VIDEO_TIMELINE_ANALYSIS_PROMPT.prompt - if isinstance(VIDEO_TIMELINE_ANALYSIS_PROMPT, TextPromptClient) - else VIDEO_TIMELINE_ANALYSIS_PROMPT).render( - IDENTIFIED_PRODUCTS=GeminiSecondStagePromptVariables(product_json_list=identified_products).product_json_list_xml, - LAST_PRODUCT_TEXT=last_product_text) - ) - ) - resp, resp_code = client.generate_content(model_id="gemini-2.5-flash", - contents=[types.Content(role='user', - parts=video_parts - ) - ], - config=types.GenerateContentConfig.model_validate(second_stage_generate_config), timeout=900) - except Exception as e: - logger.exception(f"😭Gemini二阶段推理请求失败: {e}") - raise e - if resp_code == 200: - reason = resp.candidates[0].finish_reason - logger.info("😊二阶段用量信息"+resp.usage_metadata.model_dump_json(indent=2)) - if reason == "STOP": - result_text: str = resp.candidates[0].content.parts[0].text - if len(result_text)>10: - if result_text.startswith("```") and result_text.endswith("```"): - parts_text = result_text.split("```")[-2].replace("json","").replace("\n","").replace("\\", "").replace("\\n", "") - else: - parts_text = result_text.replace("json", "").replace("\n", - "").replace( - "\\", "").replace("\\n", "") - try: - parts = json.loads(parts_text) - except json.decoder.JSONDecodeError: - parts = convert_json(parts_text, second_stage_correct_config) - if not parts: - raise Exception("json为空") - logger.info(f"👌合并前 {json.dumps(parts, indent=4, ensure_ascii=False)}") - # 合并产品和时间线 - parts = merge_product_data(parts, start_time, end_time, merge_diff=5) - for part in parts: - part["product"] = re.sub(r'^\x20*\d+\.\x20*', '', part["product"]) - return parts - else: - return [] - else: - logger.error(f"😭Gemini二阶段推理失败, Reason {reason}") - return None - else: - logger.error(f"😭Gemini二阶段推理失败, 状态码{resp_code}") - if resp_code == 429: - logger.warning("🥵请求负载过高, 随机更换地区重试") - return None - - logger.info("4、发起Gemini推理") + # 2、上传到gemini + logger.info("2、视频文件开始上传到Gemini") + video_gemini_uri = await upload(video_path, google_api_key) + # 3、发起Gemini推理 + logger.info("3、发起Gemini推理") # 一阶段推理 - product_info = None + identified_product_list = None first_retry_time = retry_time - while product_info is None and first_retry_time > 0: - product_info = await inference_api_first_stage() + while identified_product_list is None and first_retry_time > 0: + identified_product_list = await inference_api_first_stage(client, product_grid_list, product_list, + IMAGE_PRODUCT_IDENTIFICATION_PROMPT, + first_stage_generate_config, + first_stage_correct_config) first_retry_time -= 1 - if product_info is None: + if identified_product_list is None: raise Exception("😭一阶段推理失败") else: - logger.info(f"🥳一阶段推理完成JSON \n{json.dumps(product_info, indent=4, ensure_ascii=False)}") + logger.info( + f"🥳一阶段推理完成JSON \n{json.dumps(identified_product_list, indent=4, ensure_ascii=False)}") # 二阶段推理 product_timeline_info = None second_retry_time = retry_time while product_timeline_info is None and second_retry_time > 0: - product_timeline_info = await inference_api_second_stage(product_info) + product_timeline_info = await inference_api_second_stage(client, identified_product_list, + video_gemini_uri, + VIDEO_TIMELINE_ANALYSIS_PROMPT, + second_stage_generate_config, + second_stage_correct_config, + last_product_text, + start_time, end_time) second_retry_time -= 1 if product_timeline_info is None: raise Exception("😭二阶段推理失败") else: - logger.info(f"🥳二阶段推理完成JSON \n{json.dumps(product_timeline_info, indent=4, ensure_ascii=False)}") + logger.info( + f"🥳二阶段推理完成JSON \n{json.dumps(product_timeline_info, indent=4, ensure_ascii=False)}") return product_timeline_info, sentry_trace