import os import boto3 import loguru import torch import yaml from qcloud_cos import CosConfig, CosS3Client, CosClientError, CosServiceError from ..utils.image_utils import tensor_to_tempfile class COSDownload: """腾讯云COS下载""" @classmethod def INPUT_TYPES(s): return { "required": { "cos_bucket": ("STRING", {"default": "bwkj-cos-1324682537"}), "cos_key": ("STRING", {"multiline": True}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("视频存储路径",) FUNCTION = "download" CATEGORY = "不忘科技-自定义节点🚩/对象存储/COS" def download(self, cos_bucket, cos_key): cos_key_in = cos_key.replace("/", os.sep) destination = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "download", os.path.dirname(cos_key_in), os.path.basename(cos_key_in) ) loguru.logger.info(f"COS DOWNLOAD to {destination}") os.makedirs( os.path.dirname(destination), exist_ok=True, ) for i in range(0, 10): try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open( os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml" ), encoding="utf-8", mode="r+", ) as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) config = CosConfig( Region=yaml_config["cos_region"], SecretId=yaml_config["cos_secret_id"], SecretKey=yaml_config["cos_secret_key"], ) client = CosS3Client(config) response = client.download_file( Bucket=cos_bucket, Key=cos_key, DestFilePath=destination ) break except CosClientError or CosServiceError as e: raise Exception(f"COS下载失败! bucket {cos_bucket}; key {cos_key}") return ( destination, ) class COSUpload: """腾讯云COS上传""" @classmethod def INPUT_TYPES(s): return { "required": { "cos_bucket": ("STRING", {"default": "bwkj-cos-1324682537"}), "path": ("STRING", {"multiline": True}), "subfolder": ("STRING", {"default": "test"}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("COS文件Key",) FUNCTION = "upload" CATEGORY = "不忘科技-自定义节点🚩/对象存储/COS" def upload(self, cos_bucket, path, subfolder): dest_key = "/".join( [ subfolder, ( path.split("/")[-1] if "/" in path else path.split("\\")[-1] ), ] ) loguru.logger.info(f"COS UPLOAD {path} to {cos_bucket}/{subfolder}") for i in range(0, 10): try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open( os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml" ), encoding="utf-8", mode="r+", ) as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) config = CosConfig( Region=yaml_config["cos_region"], SecretId=yaml_config["cos_secret_id"], SecretKey=yaml_config["cos_secret_key"], ) client = CosS3Client(config) response = client.upload_file( Bucket=cos_bucket, Key=dest_key, LocalFilePath=path, ) break except CosClientError or CosServiceError as e: raise Exception(f"COS上传失败 bucket {cos_bucket}; local_path {path}; subfolder {subfolder}") return ( dest_key, ) class S3Download: """AWS S3下载""" @classmethod def INPUT_TYPES(s): return { "required": { "s3_bucket": ("STRING", {"default": "bw-comfyui-input"}), "s3_key": ("STRING", {"multiline": True}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("视频存储路径",) FUNCTION = "download" CATEGORY = "不忘科技-自定义节点🚩/对象存储/S3" def download(self, s3_bucket, s3_key): s3_key_in = s3_key.replace("/", os.sep) destination = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "download", os.path.dirname(s3_key_in), os.path.basename(s3_key_in), ) loguru.logger.info(f"S3 DOWNLOAD to {destination}") os.makedirs( os.path.dirname(destination), exist_ok=True, ) try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml"), encoding="utf-8", mode="r+") as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) client = boto3.client("s3", aws_access_key_id=yaml_config["aws_key_id"], aws_secret_access_key=yaml_config["aws_access_key"]) client.download_file(s3_bucket, s3_key, destination) except Exception as e: raise Exception(f"S3下载失败! bucket {s3_bucket}; key {s3_key}") return (destination,) class S3Upload: """AWS S3上传""" @classmethod def INPUT_TYPES(s): return { "required": { "s3_bucket": ("STRING", {"default": "bw-comfyui-output"}), "path": ("STRING", {"multiline": True}), "subfolder": ("STRING", {"default": "test"}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("S3文件Key",) FUNCTION = "upload" CATEGORY = "不忘科技-自定义节点🚩/对象存储/S3" def upload(self, s3_bucket, path, subfolder): loguru.logger.info(f"S3 UPLOAD {path} to {s3_bucket}/{subfolder}") try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml"), encoding="utf-8", mode="r+") as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) client = boto3.client("s3", aws_access_key_id=yaml_config["aws_key_id"], aws_secret_access_key=yaml_config["aws_access_key"]) dest_key = "/".join( [ subfolder, ( path.split("/")[-1] if "/" in path else path.split("\\")[-1] ), ] ) client.upload_file(path, s3_bucket, dest_key) except Exception as e: raise Exception(f"S3上传失败! bucket {s3_bucket}; local_path {path}; subfolder {subfolder}") return (dest_key,) class S3UploadURL: """AWS S3上传 返回URL""" @classmethod def INPUT_TYPES(s): return { "required": { "path": ("STRING", {"multiline": True}), "subfolder": ("STRING", {"default": "test"}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("URL",) FUNCTION = "upload" CATEGORY = "不忘科技-自定义节点🚩/对象存储/S3" def upload(self, path, subfolder): s3_bucket = "modal-media-cache" loguru.logger.info(f"S3 UPLOAD {path} to {s3_bucket}/{subfolder}") try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml"), encoding="utf-8", mode="r+") as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) client = boto3.client("s3", aws_access_key_id=yaml_config["aws_key_id"], aws_secret_access_key=yaml_config["aws_access_key"]) dest_key = "/".join( [ subfolder, ( path.split("/")[-1] if "/" in path else path.split("\\")[-1] ), ] ) client.upload_file(path, s3_bucket, dest_key) except Exception as e: raise Exception(f"S3上传失败! bucket {s3_bucket}; local_path {path}; subfolder {subfolder}") url = f"https://cdn.roasmax.cn/{dest_key}" return (url,) class S3UploadIMAGEURL: """AWS S3上传 返回URL""" @classmethod def INPUT_TYPES(s): return { "required": { "image": ("IMAGE", {"multiline": True}), "subfolder": ("STRING", {"default": "test"}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("URL",) FUNCTION = "upload" CATEGORY = "不忘科技-自定义节点🚩/对象存储/S3" def upload(self, image:torch.Tensor, subfolder): s3_bucket = "modal-media-cache" loguru.logger.info(f"S3 UPLOAD image to {s3_bucket}/{subfolder}") path = tensor_to_tempfile(image).name try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml"), encoding="utf-8", mode="r+") as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) client = boto3.client("s3", aws_access_key_id=yaml_config["aws_key_id"], aws_secret_access_key=yaml_config["aws_access_key"]) dest_key = "/".join( [ subfolder, ( path.split("/")[-1] if "/" in path else path.split("\\")[-1] ), ] ) client.upload_file(path, s3_bucket, dest_key) except Exception as e: raise Exception(f"S3上传失败! bucket {s3_bucket}; local_path {path}; subfolder {subfolder}") url = f"https://cdn.roasmax.cn/{dest_key}" return (url,)