ComfyUI-CustomNode/utils/object_storage/providers/vod_provider.py

305 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
腾讯云VOD存储提供者
提供腾讯云视频点播(VOD)服务的统一接口实现。
支持媒体文件的查询和下载功能。
"""
import os
from typing import Dict, Any, Optional
import requests
import loguru
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.vod.v20180717 import vod_client, models
from ..storage_interface import StorageProvider, StorageFactory, DownloadResult
class VODProvider(StorageProvider):
"""
腾讯云VOD存储提供者
提供腾讯云视频点播服务的统一接口,包括:
- 媒体文件信息查询
- 媒体文件下载
- 媒体文件管理
"""
def __init__(self, config: Dict[str, Any]):
"""
初始化VOD提供者
Args:
config: VOD配置字典包含
- secret_id: 腾讯云密钥ID
- secret_key: 腾讯云密钥Key
- region: 区域默认ap-beijing
- sub_app_id: 子应用ID可选
"""
super().__init__(config)
# 验证必需的配置项
required_keys = ["secret_id", "secret_key"]
for key in required_keys:
if not config.get(key):
raise ValueError(f"VOD配置缺少必需项: {key}")
self.secret_id = config["secret_id"]
self.secret_key = config["secret_key"]
self.region = config.get("region", "ap-beijing")
self.sub_app_id = config.get("sub_app_id")
# 初始化VOD客户端
self.vod_client = self._init_vod_client()
loguru.logger.info(f"VOD提供者初始化成功区域: {self.region}")
def _init_vod_client(self):
"""初始化VOD客户端"""
try:
http_profile = HttpProfile(endpoint="vod.tencentcloudapi.com")
client_profile = ClientProfile(httpProfile=http_profile)
cred = credential.Credential(self.secret_id, self.secret_key)
return vod_client.VodClient(cred, self.region, client_profile)
except Exception as e:
raise RuntimeError(f"VOD客户端初始化失败: {e}")
def get_media_info(
self, file_id: str, sub_app_id: Optional[str] = None
) -> Dict[str, Any]:
"""
获取媒体文件信息
Args:
file_id: 文件ID
sub_app_id: 子应用ID为None时使用默认配置
Returns:
Dict: 媒体文件信息
Raises:
ValueError: 文件不存在或参数错误
RuntimeError: API调用失败
"""
try:
if not file_id or not file_id.strip():
raise ValueError("文件ID不能为空")
req = models.DescribeMediaInfosRequest()
req.FileIds = [file_id.strip()]
# 使用传入的sub_app_id或默认配置
app_id = sub_app_id or self.sub_app_id
if app_id:
req.SubAppId = int(app_id)
resp = self.vod_client.DescribeMediaInfos(req)
if not resp.MediaInfoSet:
raise ValueError(f"文件不存在: {file_id}")
media_info = resp.MediaInfoSet[0]
# 构造返回信息
result = {
"file_id": file_id,
"name": media_info.BasicInfo.Name or "",
"size": media_info.BasicInfo.Size or 0,
"duration": media_info.BasicInfo.Duration or 0,
"media_url": media_info.BasicInfo.MediaUrl or "",
"cover_url": media_info.BasicInfo.CoverUrl or "",
"create_time": media_info.BasicInfo.CreateTime or "",
"update_time": media_info.BasicInfo.UpdateTime or "",
}
loguru.logger.info(f"获取媒体信息成功: {file_id}")
return result
except Exception as e:
if isinstance(e, ValueError):
raise
loguru.logger.error(f"获取媒体信息失败: {e}")
raise RuntimeError(f"腾讯云VOD API错误: {e}")
def get_download_url(self, file_id: str, sub_app_id: Optional[str] = None) -> str:
"""
获取媒体文件下载URL
Args:
file_id: 文件ID
sub_app_id: 子应用ID
Returns:
str: 下载URL
Raises:
ValueError: 文件不存在或没有下载URL
"""
media_info = self.get_media_info(file_id, sub_app_id)
if not media_info["media_url"]:
raise ValueError(f"文件 {file_id} 没有可用的下载URL")
return media_info["media_url"]
def download_file(
self,
file_id: str,
local_path: str,
sub_app_id: Optional[str] = None,
timeout: int = 300,
) -> DownloadResult:
"""
下载VOD媒体文件到本地
Args:
file_id: 文件ID这里用作远程键名
local_path: 本地保存路径
sub_app_id: 子应用ID
timeout: 下载超时时间(秒)
Returns:
DownloadResult: 下载结果
"""
try:
# 获取下载URL
download_url = self.get_download_url(file_id, sub_app_id)
loguru.logger.info(f"开始下载VOD文件: {file_id} -> {local_path}")
# 确保本地目录存在
os.makedirs(os.path.dirname(local_path), exist_ok=True)
# 下载文件
with requests.get(download_url, stream=True, timeout=timeout) as response:
response.raise_for_status()
with open(local_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# 验证文件是否下载成功
if not os.path.exists(local_path) or os.path.getsize(local_path) == 0:
raise RuntimeError("下载的文件为空或不存在")
loguru.logger.info(f"VOD文件下载成功: {local_path}")
return DownloadResult(
success=True, local_path=local_path, message="下载成功"
)
except Exception as e:
error_msg = f"VOD文件下载失败: {str(e)}"
loguru.logger.error(error_msg)
return DownloadResult(success=False, message=error_msg)
def upload_file(self, local_path: str, remote_key: str, **kwargs) -> Any:
"""
VOD不支持直接文件上传请使用腾讯云VOD控制台或专用上传接口
Raises:
NotImplementedError: VOD不支持此操作
"""
raise NotImplementedError("VOD提供者不支持文件上传请使用腾讯云VOD控制台")
def upload_bytes(self, file_content: bytes, remote_key: str, **kwargs) -> Any:
"""
VOD不支持直接字节上传
Raises:
NotImplementedError: VOD不支持此操作
"""
raise NotImplementedError("VOD提供者不支持字节上传")
def upload_tensor(self, tensor: Any, remote_key: str, **kwargs) -> Any:
"""
VOD不支持张量上传
Raises:
NotImplementedError: VOD不支持此操作
"""
raise NotImplementedError("VOD提供者不支持张量上传")
def delete_file(self, remote_key: str, **kwargs) -> bool:
"""
删除VOD媒体文件
Args:
remote_key: 文件ID
Returns:
bool: 是否删除成功
Note:
此功能需要额外的权限配置
"""
# 这里可以实现删除逻辑,但需要谨慎使用
loguru.logger.warning("VOD文件删除功能未实现请使用控制台操作")
return False
def list_files(self, prefix: str = "", **kwargs) -> list:
"""
列出VOD媒体文件
Args:
prefix: 过滤前缀在VOD中可能是分类或标签
Returns:
list: 文件列表
Note:
此功能需要调用搜索接口实现
"""
loguru.logger.warning("VOD文件列表功能未实现")
return []
class VODStorageFactory(StorageFactory):
"""
腾讯云VOD存储工厂
负责创建和管理VOD存储提供者实例
"""
def get_supported_types(self) -> list:
"""获取支持的存储类型"""
return ["vod", "tencent_vod", "qcloud_vod"]
def create_provider(self, storage_type: str, config: Dict[str, Any]) -> VODProvider:
"""
创建VOD存储提供者
Args:
storage_type: 存储类型
config: 配置信息
Returns:
VODProvider: VOD存储提供者实例
"""
if storage_type not in self.get_supported_types():
raise ValueError(f"不支持的存储类型: {storage_type}")
return VODProvider(config)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""
验证VOD配置
Args:
config: 配置字典
Returns:
bool: 配置是否有效
"""
required_keys = ["secret_id", "secret_key"]
for key in required_keys:
if not config.get(key):
return False
return True