diff --git a/src/cluster/video_apps/hls_slice_inference.py b/src/cluster/video_apps/hls_slice_inference.py index 67640ce..3f335f0 100644 --- a/src/cluster/video_apps/hls_slice_inference.py +++ b/src/cluster/video_apps/hls_slice_inference.py @@ -1,3 +1,5 @@ +import uuid + import modal from ..video import downloader_image, app, config @@ -114,38 +116,25 @@ with downloader_image.imports(): video = video.read() content_length = len(video) content_type = f"video/{file_path.split('.')[-1]}" - filename = file_path.split("\\")[-1] + filename = ".".join([str(uuid.uuid4()), file_path.split(".")[-1]]) if content_type not in ['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg', 'video/webm', 'video/wmv', 'video/3gpp']: raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) logger.info( f"Uploading name = {filename}, size = {content_length}, type = {content_type} to google file") with httpx.Client(timeout=1800) as client: - pre_upload_response = client.post( - url=f"https://generativelanguage.googleapis.com/upload/v1beta/files?key={google_api_key}", + upload_response = client.post( + url=f"https://storage.googleapis.com/upload/storage/v1/b/dy-media-storage/o?uploadType=media&name=video%2F{filename}", + content=video, headers={ - "X-Goog-Upload-Protocol": "resumable", - "X-Goog-Upload-Command": "start", - "X-Goog-Upload-Header-Content-Length": str(content_length), - "X-Goog-Upload-Header-Content-Type": content_type - }, - json={ - "file": { - "display_name": filename.split(".")[0], - } + "Authorization": f"Bearer {google_api_key}", + "Content-Type": content_type }) - pre_upload_response.raise_for_status() - - upload_url = pre_upload_response.headers.get("X-Goog-Upload-Url") - - upload_response = client.post(url=upload_url, content=video, headers={ - "X-Goog-Upload-Offset": "0", - "X-Goog-Upload-Command": "upload, finalize", - "Content-Type": content_type - }) upload_response.raise_for_status() - return upload_response.json(), upload_response.status_code + upload_url = f"gs://dy-media-storage/video/{filename}" + + return upload_url, upload_response.status_code @SentryUtils.webhook_handler(webhook, current_function_call_id()) @SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="inference_gemini", @@ -216,38 +205,11 @@ with downloader_image.imports(): logger.info("3、视频文件开始上传到Gemini") video_gemini, code = upload(video_path) if code == 200: - video_gemini_uri = video_gemini["file"]["uri"] + video_gemini_uri = video_gemini else: logger.error("视频文件上传Gemini失败") raise Exception("视频文件上传Gemini失败") - # 5、检查文件是否已处理完成 - def check(): - with httpx.Client(timeout=Timeout(timeout=30)) as client: - file = video_gemini_uri.split("/")[-1] - response = client.get( - url=f"https://generativelanguage.googleapis.com/v1beta/files/{file}?key={google_api_key}") - response.raise_for_status() - if response.status_code == 200: - if response.json()["state"] == "ACTIVE": - return True - return False - - check_num = 60 - logger.info("4、开始检查Gemini文件是否处理完成") - while check_num > 0: - logger.info(f"检查中, 剩余检查次数{check_num}") - ret = check() - if ret: - break - else: - check_num -= 1 - time.sleep(5) - if check_num <= 0: - raise Exception("Gemini文件上传处理状态检查超时") - logger.success("Gemini文件处理完成") - - # 6、执行Gemini推理操作 async def inference_api(): try: logger.info("请求推理接口") @@ -274,6 +236,7 @@ with downloader_image.imports(): ], "contents": [ { + "role": "user", "parts": image_parts } ], @@ -282,10 +245,10 @@ with downloader_image.imports(): } } resp = requests.post( - "https://gateway.ai.cloudflare.com/v1/67720b647ff2b55cf37ba3ef9e677083/bowong-dev/google-ai-studio/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent", + "https://gateway.ai.cloudflare.com/v1/67720b647ff2b55cf37ba3ef9e677083/bowong-dev/google-vertex-ai/v1/projects/gen-lang-client-0413414134/locations/us-central1/publishers/google/models/gemini-2.5-flash-preview-05-20:generateContent", headers={ "Content-Type": "application/json", - "X-Goog-Api-Key": f"{google_api_key}" + "Authorization": f"Bearer {google_api_key}" }, json=json_data, timeout=900 ) @@ -301,7 +264,7 @@ with downloader_image.imports(): await asyncio.sleep(random.randint(50, 70)) return None - logger.info("5、发起Gemini推理") + logger.info("4、发起Gemini推理") target_json = None while target_json is None and retry_time > 0: target_json = await inference_api() @@ -325,19 +288,6 @@ with downloader_image.imports(): except Exception as e: logger.exception(f"推理失败, {e}") raise Exception(f"推理失败, {e}") - finally: - if video_gemini_uri: - logger.info("6、清除Gemini临时文件") - with httpx.Client(timeout=Timeout(timeout=120)) as client: - resp = client.delete( - f'https://bowongai-{config.modal_environment}--{config.modal_app_name}-fastapi-webapp.modal.run/google/delete', - params={"filename": video_gemini_uri.split("/")[-1]}, - headers={"x-google-api-key": google_api_key}) - resp.raise_for_status() - if resp.status_code == 200: - logger.success("Gemini临时文件清除成功") - else: - logger.warning("Gemini临时文件清除失败, 请自行清除") return await _handler(media, google_api_key, product_grid_list, product_list, start_time, end_time, options, sentry_trace, retry_time, scale) diff --git a/src/cluster/video_apps/make_grid_upload.py b/src/cluster/video_apps/make_grid_upload.py index 9aa07a5..573d5e7 100644 --- a/src/cluster/video_apps/make_grid_upload.py +++ b/src/cluster/video_apps/make_grid_upload.py @@ -1,3 +1,7 @@ +import mimetypes +import urllib + +import aiofiles import modal from ..video import downloader_image, app, config @@ -239,48 +243,57 @@ with downloader_image.imports(): logger.exception(f"拼图错误 {e}") return None - async def upload(file_path, google_api_key): - with open(file_path, "rb") as image: - image = image.read() - content_length = len(image) - if file_path.split('.')[-1] == "jpg": - content_type = f"image/jpeg" - elif file_path.split('.')[-1] == "png": - content_type = f"image/png" - elif file_path.split('.')[-1] == "gif": - content_type = f"image/gif" - elif file_path.split('.')[-1] == "webp": - content_type = f"image/webp" - else: - raise Exception(f"不支持的文件格式{file_path.split('.')[-1]}") - filename = file_path.split("\\")[-1] - logger.info(f"Uploading name = {filename}, size = {content_length}, type = {content_type} to google file") + async def upload(file_path, google_api_key, bucket_name="dy-media-storage"): + """异步上传文件到Google Cloud Storage""" + # 验证文件存在 + if not os.path.exists(file_path): + raise FileNotFoundError(f"文件不存在: {file_path}") + + # 自动检测MIME类型 + content_type, _ = mimetypes.guess_type(file_path) + if not content_type or not content_type.startswith("image/"): + ext = os.path.splitext(file_path)[1].lower() + if ext in ['.jpg', '.jpeg']: + content_type = "image/jpeg" + elif ext == '.png': + content_type = "image/png" + elif ext == '.gif': + content_type = "image/gif" + elif ext == '.webp': + content_type = "image/webp" + else: + raise ValueError(f"不支持的文件格式: {ext}") + + # 提取文件名 + filename = os.path.basename(file_path) + content_length = os.path.getsize(file_path) + + print(f"上传文件: {filename}, 大小: {content_length} bytes, 类型: {content_type}") + + # 构建上传URL + object_name = f"videos/{filename}" + object_name_quote = object_name.replace("/", "%2F") + upload_url = f"https://storage.googleapis.com/upload/storage/v1/b/{bucket_name}/o?uploadType=media&name={object_name_quote}" + + # 读取文件内容 + async with aiofiles.open(file_path, 'rb') as f: + file_content = await f.read() + + # 发送异步请求 async with httpx.AsyncClient(timeout=1800) as client: - pre_upload_response = await client.post( - url=f"https://generativelanguage.googleapis.com/upload/v1beta/files?key={google_api_key}", + response = await client.post( + url=upload_url, + content=file_content, headers={ - "X-Goog-Upload-Protocol": "resumable", - "X-Goog-Upload-Command": "start", - "X-Goog-Upload-Header-Content-Length": str(content_length), - "X-Goog-Upload-Header-Content-Type": content_type - }, - json={ - "file": { - "display_name": filename.split(".")[0], - } - }) - pre_upload_response.raise_for_status() - - upload_url = pre_upload_response.headers.get("X-Goog-Upload-Url") - - upload_response = await client.post(url=upload_url, content=image, headers={ - "X-Goog-Upload-Offset": "0", - "X-Goog-Upload-Command": "upload, finalize", - "Content-Type": content_type - }) - upload_response.raise_for_status() - - return upload_response.json(), upload_response.status_code + "Authorization": f"Bearer {google_api_key}", + "Content-Type": content_type, + "Content-Length": str(content_length) + } + ) + response.raise_for_status() + # 处理响应 + upload_url = f"gs://{bucket_name}/{object_name}" + return upload_url, response.status_code @SentryUtils.sentry_tracker(sentry_trace.x_trace_id, sentry_trace.x_baggage, op="make_grid_gemini", name="将输入图拼为网格上传到Gemini网盘", fn_id=current_function_call_id()) @@ -299,7 +312,7 @@ with downloader_image.imports(): image_grid_gemini, code = await upload(image_grid_path, google_api_key) if code == 200: - image_gemini_uri = image_grid_gemini["file"]["uri"] + image_gemini_uri = image_grid_gemini else: logger.error("图片网格文件上传Gemini失败") raise Exception("图片网格文件上传Gemini失败")