合并分支

* Merge branch 'main' into cluster-gemini
* ADD gemini数据源使用cloud storage

---------

Merge request URL: https://g-ldyi2063.coding.net/p/dev/d/modalDeploy/git/merge/4815
Co-authored-by: 康宇佳
This commit is contained in:
康宇佳 2025-06-17 19:17:01 +08:00 committed by Coding
parent a05cf7b069
commit 40e0089308
2 changed files with 70 additions and 107 deletions

View File

@@ -1,3 +1,5 @@
import uuid
import modal
from ..video import downloader_image, app, config
@@ -114,38 +116,25 @@ with downloader_image.imports():
video = video.read()
content_length = len(video)
content_type = f"video/{file_path.split('.')[-1]}"
filename = file_path.split("\\")[-1]
filename = ".".join([str(uuid.uuid4()), file_path.split(".")[-1]])
if content_type not in ['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg',
'video/webm', 'video/wmv', 'video/3gpp']:
raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
logger.info(
f"Uploading name = {filename}, size = {content_length}, type = {content_type} to google file")
with httpx.Client(timeout=1800) as client:
pre_upload_response = client.post(
url=f"https://generativelanguage.googleapis.com/upload/v1beta/files?key={google_api_key}",
upload_response = client.post(
url=f"https://storage.googleapis.com/upload/storage/v1/b/dy-media-storage/o?uploadType=media&name=video%2F{filename}",
content=video,
headers={
"X-Goog-Upload-Protocol": "resumable",
"X-Goog-Upload-Command": "start",
"X-Goog-Upload-Header-Content-Length": str(content_length),
"X-Goog-Upload-Header-Content-Type": content_type
},
json={
"file": {
"display_name": filename.split(".")[0],
}
"Authorization": f"Bearer {google_api_key}",
"Content-Type": content_type
})
pre_upload_response.raise_for_status()
upload_url = pre_upload_response.headers.get("X-Goog-Upload-Url")
upload_response = client.post(url=upload_url, content=video, headers={
"X-Goog-Upload-Offset": "0",
"X-Goog-Upload-Command": "upload, finalize",
"Content-Type": content_type
})
upload_response.raise_for_status()
return upload_response.json(), upload_response.status_code
upload_url = f"gs://dy-media-storage/video/{filename}"
return upload_url, upload_response.status_code
@SentryUtils.webhook_handler(webhook, current_function_call_id())
@SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini",
@@ -216,38 +205,11 @@ with downloader_image.imports():
logger.info("3、视频文件开始上传到Gemini")
video_gemini, code = upload(video_path)
if code == 200:
video_gemini_uri = video_gemini["file"]["uri"]
video_gemini_uri = video_gemini
else:
logger.error("视频文件上传Gemini失败")
raise Exception("视频文件上传Gemini失败")
# 5、检查文件是否已处理完成
def check():
with httpx.Client(timeout=Timeout(timeout=30)) as client:
file = video_gemini_uri.split("/")[-1]
response = client.get(
url=f"https://generativelanguage.googleapis.com/v1beta/files/{file}?key={google_api_key}")
response.raise_for_status()
if response.status_code == 200:
if response.json()["state"] == "ACTIVE":
return True
return False
check_num = 60
logger.info("4、开始检查Gemini文件是否处理完成")
while check_num > 0:
logger.info(f"检查中, 剩余检查次数{check_num}")
ret = check()
if ret:
break
else:
check_num -= 1
time.sleep(5)
if check_num <= 0:
raise Exception("Gemini文件上传处理状态检查超时")
logger.success("Gemini文件处理完成")
# 6、执行Gemini推理操作
async def inference_api():
try:
logger.info("请求推理接口")
@@ -274,6 +236,7 @@ with downloader_image.imports():
],
"contents": [
{
"role": "user",
"parts": image_parts
}
],
@@ -282,10 +245,10 @@ with downloader_image.imports():
}
}
resp = requests.post(
"https://gateway.ai.cloudflare.com/v1/67720b647ff2b55cf37ba3ef9e677083/bowong-dev/google-ai-studio/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent",
"https://gateway.ai.cloudflare.com/v1/67720b647ff2b55cf37ba3ef9e677083/bowong-dev/google-vertex-ai/v1/projects/gen-lang-client-0413414134/locations/us-central1/publishers/google/models/gemini-2.5-flash-preview-05-20:generateContent",
headers={
"Content-Type": "application/json",
"X-Goog-Api-Key": f"{google_api_key}"
"Authorization": f"Bearer {google_api_key}"
},
json=json_data, timeout=900
)
@@ -301,7 +264,7 @@ with downloader_image.imports():
await asyncio.sleep(random.randint(50, 70))
return None
logger.info("5、发起Gemini推理")
logger.info("4、发起Gemini推理")
target_json = None
while target_json is None and retry_time > 0:
target_json = await inference_api()
@@ -325,19 +288,6 @@ with downloader_image.imports():
except Exception as e:
logger.exception(f"推理失败, {e}")
raise Exception(f"推理失败, {e}")
finally:
if video_gemini_uri:
logger.info("6、清除Gemini临时文件")
with httpx.Client(timeout=Timeout(timeout=120)) as client:
resp = client.delete(
f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp.modal.run/google/delete',
params={"filename": video_gemini_uri.split("/")[-1]},
headers={"x-google-api-key": google_api_key})
resp.raise_for_status()
if resp.status_code == 200:
logger.success("Gemini临时文件清除成功")
else:
logger.warning("Gemini临时文件清除失败, 请自行清除")
return await _handler(media, google_api_key, product_grid_list, product_list, start_time, end_time, options,
sentry_trace, retry_time, scale)

View File

@@ -1,3 +1,7 @@
import mimetypes
import urllib
import aiofiles
import modal
from ..video import downloader_image, app, config
@@ -239,48 +243,57 @@ with downloader_image.imports():
logger.exception(f"拼图错误 {e}")
return None
async def upload(file_path, google_api_key):
with open(file_path, "rb") as image:
image = image.read()
content_length = len(image)
if file_path.split('.')[-1] == "jpg":
content_type = f"image/jpeg"
elif file_path.split('.')[-1] == "png":
content_type = f"image/png"
elif file_path.split('.')[-1] == "gif":
content_type = f"image/gif"
elif file_path.split('.')[-1] == "webp":
content_type = f"image/webp"
else:
raise Exception(f"不支持的文件格式{file_path.split('.')[-1]}")
filename = file_path.split("\\")[-1]
logger.info(f"Uploading name = {filename}, size = {content_length}, type = {content_type} to google file")
async def upload(file_path, google_api_key, bucket_name="dy-media-storage"):
"""异步上传文件到Google Cloud Storage"""
# 验证文件存在
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
# 自动检测MIME类型
content_type, _ = mimetypes.guess_type(file_path)
if not content_type or not content_type.startswith("image/"):
ext = os.path.splitext(file_path)[1].lower()
if ext in ['.jpg', '.jpeg']:
content_type = "image/jpeg"
elif ext == '.png':
content_type = "image/png"
elif ext == '.gif':
content_type = "image/gif"
elif ext == '.webp':
content_type = "image/webp"
else:
raise ValueError(f"不支持的文件格式: {ext}")
# 提取文件名
filename = os.path.basename(file_path)
content_length = os.path.getsize(file_path)
print(f"上传文件: {filename}, 大小: {content_length} bytes, 类型: {content_type}")
# 构建上传URL
object_name = f"videos/{filename}"
object_name_quote = object_name.replace("/", "%2F")
upload_url = f"https://storage.googleapis.com/upload/storage/v1/b/{bucket_name}/o?uploadType=media&name={object_name_quote}"
# 读取文件内容
async with aiofiles.open(file_path, 'rb') as f:
file_content = await f.read()
# 发送异步请求
async with httpx.AsyncClient(timeout=1800) as client:
pre_upload_response = await client.post(
url=f"https://generativelanguage.googleapis.com/upload/v1beta/files?key={google_api_key}",
response = await client.post(
url=upload_url,
content=file_content,
headers={
"X-Goog-Upload-Protocol": "resumable",
"X-Goog-Upload-Command": "start",
"X-Goog-Upload-Header-Content-Length": str(content_length),
"X-Goog-Upload-Header-Content-Type": content_type
},
json={
"file": {
"display_name": filename.split(".")[0],
}
})
pre_upload_response.raise_for_status()
upload_url = pre_upload_response.headers.get("X-Goog-Upload-Url")
upload_response = await client.post(url=upload_url, content=image, headers={
"X-Goog-Upload-Offset": "0",
"X-Goog-Upload-Command": "upload, finalize",
"Content-Type": content_type
})
upload_response.raise_for_status()
return upload_response.json(), upload_response.status_code
"Authorization": f"Bearer {google_api_key}",
"Content-Type": content_type,
"Content-Length": str(content_length)
}
)
response.raise_for_status()
# 处理响应
upload_url = f"gs://{bucket_name}/{object_name}"
return upload_url, response.status_code
@SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="make_grid_gemini",
name="将输入图拼为网格上传到Gemini网盘", fn_id=current_function_call_id())
@@ -299,7 +312,7 @@ with downloader_image.imports():
image_grid_gemini, code = await upload(image_grid_path, google_api_key)
if code == 200:
image_gemini_uri = image_grid_gemini["file"]["uri"]
image_gemini_uri = image_grid_gemini
else:
logger.error("图片网格文件上传Gemini失败")
raise Exception("图片网格文件上传Gemini失败")