import os
from pathlib import Path

import requests
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.vod.v20180717 import vod_client, models


class VodToLocalNode:
    """Download a Tencent Cloud VOD media file to local disk.

    Given a VOD ``file_id`` and ``sub_app_id``, the node looks up the media
    URL through the VOD API and stores the file under
    ``<directory of this module>/download/<sub_app_id>/<file_id>.mp4``.
    A file that is already present at that path is reused.

    NOTE(review): the ``INPUT_TYPES`` / ``RETURN_TYPES`` / ``FUNCTION`` /
    ``CATEGORY`` attributes look like the ComfyUI custom-node convention;
    confirm against the host framework.
    """

    def __init__(self):
        """Resolve API credentials and build the VOD client."""
        env_secret_id = os.environ.get("TENCENTCLOUD_SECRET_ID")
        env_secret_key = os.environ.get("TENCENTCLOUD_SECRET_KEY")
        if env_secret_id and env_secret_key:
            # Only use the environment when BOTH halves are present, so an
            # id from one source is never paired with a key from another.
            self.secret_id = env_secret_id
            self.secret_key = env_secret_key
        else:
            # SECURITY: hard-coded credentials are kept solely so existing
            # deployments keep working. They are exposed to anyone who can
            # read this file; rotate them and remove this fallback.
            self.secret_id = "AKIDsrihIyjZOBsjimt8TsN8yvv1AMh5dB44"
            self.secret_key = "CPZcxdk6W39Jd4cGY95wvupoyMd0YFqW"
        self.vod_client = self.init_vod_client()

    def init_vod_client(self):
        """Initialize the VOD client.

        Returns:
            A ``vod_client.VodClient`` for the ``ap-shanghai`` region using
            the ``vod.tencentcloudapi.com`` endpoint.

        Raises:
            RuntimeError: if any step of the client construction fails.
        """
        try:
            http_profile = HttpProfile(endpoint="vod.tencentcloudapi.com")
            client_profile = ClientProfile(httpProfile=http_profile)
            cred = credential.Credential(self.secret_id, self.secret_key)
            return vod_client.VodClient(cred, "ap-shanghai", client_profile)
        except Exception as e:
            raise RuntimeError(f"VOD client initialization failed: {e}") from e

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node inputs: two required strings, empty by default."""
        return {
            "required": {
                "file_id": ("STRING", {"default": ""}),
                "sub_app_id": ("STRING", {"default": ""}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("local_path",)
    FUNCTION = "execute"
    CATEGORY = "video"

    def execute(self, file_id, sub_app_id):
        """Download the media file and return its local path.

        Args:
            file_id: VOD file identifier.
            sub_app_id: VOD sub-application id; must be convertible to int.

        Returns:
            A one-element tuple containing the local file path.

        Raises:
            RuntimeError: if the URL lookup or the download fails.
        """
        # Run the download logic.
        local_path = self.download_vod(file_id, sub_app_id)
        print(f"下载成功: {local_path}")
        return (local_path,)

    def _get_download_url(self, file_id, sub_app_id):
        """Return the media URL of a VOD file.

        Args:
            file_id: VOD file identifier.
            sub_app_id: VOD sub-application id; converted with ``int()``.

        Returns:
            ``BasicInfo.MediaUrl`` of the first entry in the response's
            ``MediaInfoSet``.

        Raises:
            RuntimeError: on any failure, including a non-numeric
                ``sub_app_id``, an empty ``MediaInfoSet`` or a missing URL.
        """
        try:
            req = models.DescribeMediaInfosRequest()
            req.FileIds = [file_id]
            req.SubAppId = int(sub_app_id)

            resp = self.vod_client.DescribeMediaInfos(req)
            if not resp.MediaInfoSet:
                raise ValueError("File not found")

            media_info = resp.MediaInfoSet[0]
            if not media_info.BasicInfo.MediaUrl:
                raise ValueError("No download URL available")

            return media_info.BasicInfo.MediaUrl
        except Exception as e:
            # The ValueErrors raised above are deliberately funnelled through
            # here too, so callers only ever see RuntimeError.
            raise RuntimeError(f"Tencent API error: {e}") from e

    def create_directory(self, path):
        """Create ``path`` (and any missing parents) and report the outcome.

        Args:
            path: directory path to create.
        """
        p = Path(path)
        if not p.exists():
            # parents=True creates every missing parent directory;
            # exist_ok=True keeps a concurrent creation from raising.
            p.mkdir(parents=True, exist_ok=True)
            print(f"目录已创建: {path}")
        else:
            print(f"目录已存在: {path}")

    def download_vod(self, file_id, sub_app_id):
        """Fetch a VOD media file into the local download directory.

        The media URL is resolved first; the target file is then reused if
        it already exists, otherwise it is downloaded with a 10-minute
        request timeout.

        Args:
            file_id: VOD file identifier; also used as the local file name.
            sub_app_id: VOD sub-application id; also used as the directory name.

        Returns:
            The local file path.

        Raises:
            RuntimeError: if the URL lookup or the download fails.
        """
        media_url = self._get_download_url(file_id=file_id, sub_app_id=sub_app_id)
        print(f"download from url: {media_url}")

        # Per-sub-app download directory next to this module.
        download_dir = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "download", f"{sub_app_id}"
        )
        self.create_directory(download_dir)

        output_path = os.path.join(download_dir, f"{file_id}.mp4")
        # Reuse a previous download. _download_file only places a file at
        # this path once it has been written completely.
        if os.path.exists(output_path):
            return output_path

        return self._download_file(
            url=media_url, save_path=output_path, timeout=60 * 10
        )

    def _download_file(self, url: str, save_path: str, timeout: int = 30):
        """Stream ``url`` to ``save_path``.

        The body is written to ``<save_path>.part`` and only renamed to
        ``save_path`` once the transfer has finished, so a failed download
        never leaves a truncated file at the final path.

        Args:
            url: address to download from.
            save_path: final destination path.
            timeout: timeout value passed to ``requests.get``.

        Returns:
            ``save_path``.

        Raises:
            RuntimeError: on any HTTP or file-system failure.
        """
        partial_path = f"{save_path}.part"
        try:
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            # Promote the finished download to its final name.
            os.replace(partial_path, save_path)
            return save_path
        except Exception as e:
            # Best-effort cleanup: the partial file may not exist yet, and a
            # cleanup failure must not mask the original error.
            try:
                os.remove(partial_path)
            except OSError:
                pass
            raise RuntimeError(f"Download error: {e}") from e