Merge branch 'feature/google-upload-v2'

This commit is contained in:
shuohigh@gmail.com 2025-06-18 17:28:43 +08:00
commit 99b303c161
3 changed files with 33 additions and 15 deletions

View File

@ -9,6 +9,7 @@ from loguru import logger
import httpx import httpx
from fastapi import APIRouter, UploadFile, Header, HTTPException, Depends from fastapi import APIRouter, UploadFile, Header, HTTPException, Depends
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from pydantic import computed_field
from starlette import status from starlette import status
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
@ -34,8 +35,18 @@ class BundleHeaders(BaseModel):
x_baggage: str = Field(description="Sentry Transaction baggage", default=None) x_baggage: str = Field(description="Sentry Transaction baggage", default=None)
class GoogleAuthHeaders(BaseModel):
Authorization: str = Field(description="Google Auth Bearer Token")
@computed_field(description="Google Auth Token")
@property
def auth_token(self) -> str:
return self.Authorization[len("Bearer "):]
@router.post("/upload", @router.post("/upload",
summary="上传文件到Google File", summary="上传文件到Google AI Studio提供的File API, 将被VertexAI新规范替代统一使用Google CloudStorage存储",
deprecated=True,
description="上传文件到Google File, 换取Google File URI, 不同Google API Key之间的URI不互通, 最多可为每个项目存储 20 GB 的文件,每个文件的大小上限为 2 GB。文件会存储 48 小时") description="上传文件到Google File, 换取Google File URI, 不同Google API Key之间的URI不互通, 最多可为每个项目存储 20 GB 的文件,每个文件的大小上限为 2 GB。文件会存储 48 小时")
async def upload_file_multipart(file: UploadFile, async def upload_file_multipart(file: UploadFile,
headers: Annotated[GoogleAPIKeyHeaders, Header()]): headers: Annotated[GoogleAPIKeyHeaders, Header()]):
@ -80,6 +91,10 @@ async def upload_file_multipart(file: UploadFile,
return JSONResponse(content=upload_response.json(), status_code=upload_response.status_code) return JSONResponse(content=upload_response.json(), status_code=upload_response.status_code)
@router.post("/vertex-ai/upload", summary="上传文件到Vertex AI可访问的Google cloud storage", )
async def upload_file(file: UploadFile, headers: Annotated[GoogleAuthHeaders, Header()]):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
@router.get("/status", summary="获取已上传文件的处理状态") @router.get("/status", summary="获取已上传文件的处理状态")
async def uploaded_file_status(filename: str, async def uploaded_file_status(filename: str,
headers: Annotated[GoogleAPIKeyHeaders, Header()]): headers: Annotated[GoogleAPIKeyHeaders, Header()]):

View File

@ -1,4 +1,4 @@
from typing import Union, Dict, Any from typing import Union, Dict, Any, BinaryIO
import backoff import backoff
import httpx import httpx
@ -126,3 +126,7 @@ class GoogleAuthUtils:
data=params) data=params)
response.raise_for_status() response.raise_for_status()
return GoogleAuthUtils.GoogleAuthResponse.model_validate_json(response.text) return GoogleAuthUtils.GoogleAuthResponse.model_validate_json(response.text)
# @staticmethod
# async def google_upload_file(file_stream: BinaryIO, google_api_key: str, bucket_name: str):
#

View File

@ -557,12 +557,16 @@ class VideoUtils:
@staticmethod @staticmethod
async def convert_m3u8_to_local_source(media_stream_url: str, async def convert_m3u8_to_local_source(media_stream_url: str,
head: Optional[float] = None, head: float = 0,
tail: Optional[float] = None) -> tuple[str, str, TimeDelta]: tail: float = 86400, # 使用24H时长替代♾
temp_dir: str = None) -> tuple[str, str, TimeDelta]:
""" """
转换m3u8为本地来源 转换m3u8为本地来源
""" """
# 创建临时目录存储TS片段 # 创建临时目录存储TS片段
if temp_dir:
os.makedirs(temp_dir, exist_ok=True)
else:
temp_dir = tempfile.mkdtemp() temp_dir = tempfile.mkdtemp()
from m3u8 import SegmentList, Segment from m3u8 import SegmentList, Segment
try: try:
@ -578,10 +582,6 @@ class VideoUtils:
max_head = origin_time + timedelta(seconds=tail) max_head = origin_time + timedelta(seconds=tail)
logger.info(f"min: {min_head}, max: {max_head}") logger.info(f"min: {min_head}, max: {max_head}")
for segment in playlist.segments: for segment in playlist.segments:
if not head:
head = 0
if not tail:
tail = 86400 # 使用24H时长替代♾
if min_head - timedelta(seconds=segment.duration) <= segment.current_program_date_time <= max_head: if min_head - timedelta(seconds=segment.duration) <= segment.current_program_date_time <= max_head:
logger.info(f"duration: {segment.duration}, head: {segment.current_program_date_time}") logger.info(f"duration: {segment.duration}, head: {segment.current_program_date_time}")
duration += segment.duration duration += segment.duration
@ -600,13 +600,10 @@ class VideoUtils:
playlist.is_endlist = True playlist.is_endlist = True
for url in ts_urls: for url in ts_urls:
tasks.append(VideoUtils.async_download_file(url.absolute_uri, f"{temp_dir}/{url.uri}")) tasks.append(VideoUtils.async_download_file(url.absolute_uri, f"{temp_dir}/{url.uri}"))
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
# 4. 修改m3u8文件指向本地TS片段 # 4. 修改m3u8文件指向本地TS片段
local_m3u8_path = os.path.join(temp_dir, "local.m3u8") local_m3u8_path = os.path.join(temp_dir, "local.m3u8")
playlist.dump(local_m3u8_path) playlist.dump(local_m3u8_path)
return local_m3u8_path, temp_dir, diff return local_m3u8_path, temp_dir, diff
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
@ -631,7 +628,7 @@ class VideoUtils:
os.makedirs(os.path.dirname(output_path), exist_ok=True) os.makedirs(os.path.dirname(output_path), exist_ok=True)
try: try:
local_m3u8_path, temp_dir = await VideoUtils.convert_m3u8_to_local_source(media_stream_url=media_stream_url) local_m3u8_path, temp_dir, diff = await VideoUtils.convert_m3u8_to_local_source(media_stream_url=media_stream_url)
# 使用ffmpeg合并TS片段 # 使用ffmpeg合并TS片段
ffmpeg_cmd = VideoUtils.async_ffmpeg_init() ffmpeg_cmd = VideoUtils.async_ffmpeg_init()
ffmpeg_cmd.input(local_m3u8_path, ffmpeg_cmd.input(local_m3u8_path,
@ -689,7 +686,9 @@ class VideoUtils:
f"pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2," f"pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2,"
f"setsar=1:1," # 新增强制设置SAR f"setsar=1:1," # 新增强制设置SAR
f"fps=30,format=yuv420p[v{i}]", f"fps=30,format=yuv420p[v{i}]",
f"[{i}:a]aformat=sample_fmts=fltp:sample_rates=44100:channel_layouts=stereo[a{i}]", # 修改音频过滤器确保输出为AAC兼容格式
# f"[{i}:a]aformat=sample_fmts=fltp:sample_rates=44100:channel_layouts=stereo[a{i}]",
f"[{i}:a]aformat=sample_fmts=s16:sample_rates=44100:channel_layouts=stereo[a{i}]",
] ]
) )
# 3. 准备处理后的视频流和音频流的连接字符串 # 3. 准备处理后的视频流和音频流的连接字符串