import os import boto3 import loguru import yaml class S3Download: """AWS S3下载""" @classmethod def INPUT_TYPES(s): return { "required": { "s3_bucket": ("STRING", {"default": "bw-comfyui-input"}), "s3_key": ("STRING", {"multiline": True}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("视频存储路径",) FUNCTION = "download" CATEGORY = "不忘科技-自定义节点🚩/S3" def download(self, s3_bucket, s3_key): s3_key_in = s3_key.replace("/",os.sep) destination = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "download", os.path.dirname(s3_key_in), os.path.basename(s3_key_in), ) loguru.logger.info(f"S3 DOWNLOAD to {destination}") os.makedirs( os.path.dirname(destination), exist_ok=True, ) try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml"),encoding="utf-8",mode="r+") as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) client = boto3.client("s3",aws_access_key_id=yaml_config["aws_key_id"],aws_secret_access_key=yaml_config["aws_access_key"]) client.download_file(s3_bucket, s3_key, destination) except Exception as e: raise Exception(f"S3下载失败! bucket {s3_bucket}; key {s3_key}") return (destination,) class S3Upload: """AWS S3上传""" @classmethod def INPUT_TYPES(s): return { "required": { "s3_bucket": ("STRING", {"default": "bw-comfyui-output"}), "path": ("STRING", {"multiline": True}), "subfolder": ("STRING", {"default": "test"}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("S3文件Key",) FUNCTION = "upload" CATEGORY = "不忘科技-自定义节点🚩/S3" def upload(self, s3_bucket, path, subfolder): loguru.logger.info(f"S3 UPLOAD {path} to {s3_bucket}/{subfolder}") try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml"), encoding="utf-8", mode="r+") as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) client = boto3.client("s3", aws_access_key_id=yaml_config["aws_key_id"], aws_secret_access_key=yaml_config["aws_access_key"]) dest_key = "/".join( [ subfolder, ( path.split("/")[-1] if "/" in path else path.split("\\")[-1] ), ] ) client.upload_file(path, s3_bucket, dest_key) except Exception as e: raise Exception(f"S3上传失败! bucket {s3_bucket}; local_path {path}; subfolder {subfolder}") return (dest_key,) class S3UploadURL: """AWS S3上传 返回URL""" @classmethod def INPUT_TYPES(s): return { "required": { "path": ("STRING", {"multiline": True}), "subfolder": ("STRING", {"default": "test"}), } } RETURN_TYPES = ("STRING",) RETURN_NAMES = ("S3文件Key",) FUNCTION = "upload" CATEGORY = "不忘科技-自定义节点🚩/S3" def upload(self, path, subfolder): s3_bucket = "modal-media-cache" loguru.logger.info(f"S3 UPLOAD {path} to {s3_bucket}/{subfolder}") try: if "aws_key_id" in list(os.environ.keys()): yaml_config = os.environ else: with open(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.yaml"), encoding="utf-8", mode="r+") as f: yaml_config = yaml.load(f, Loader=yaml.FullLoader) client = boto3.client("s3", aws_access_key_id=yaml_config["aws_key_id"], aws_secret_access_key=yaml_config["aws_access_key"]) dest_key = "/".join( [ subfolder, ( path.split("/")[-1] if "/" in path else path.split("\\")[-1] ), ] ) client.upload_file(path, s3_bucket, dest_key) except Exception as e: raise Exception(f"S3上传失败! bucket {s3_bucket}; local_path {path}; subfolder {subfolder}") url = f"https://cdn.roasmax.cn/{dest_key}" return (url,)