ComfyUI-CustomNode/nodes/object_storage_nodes.py

320 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import boto3
import loguru
import torch
import yaml
from qcloud_cos import CosConfig, CosS3Client, CosClientError, CosServiceError
from ..utils.image_utils import tensor_to_tempfile
class COSDownload:
    """ComfyUI node: download one object from Tencent Cloud COS to local disk.

    The object is stored under ``<package root>/download/<cos_key>``, where the
    package root is the parent of this file's directory. The local path is
    returned as the node's single STRING output.
    """

    # Total number of transfer attempts before the node gives up.
    _MAX_ATTEMPTS = 10

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: the COS bucket and the object key."""
        return {
            "required": {
                "cos_bucket": ("STRING", {"default": "bwkj-cos-1324682537"}),
                "cos_key": ("STRING", {"multiline": True}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("视频存储路径",)
    FUNCTION = "download"
    CATEGORY = "不忘科技-自定义节点🚩/对象存储/COS"

    @staticmethod
    def _package_root():
        """Return the directory two levels above this file."""
        return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    @staticmethod
    def _local_destination(cos_key):
        """Map an object key to its local path below the ``download`` folder.

        Leading separators are stripped first: an absolute component would
        make ``os.path.join`` discard the download directory and write to the
        filesystem root instead.
        """
        relative = cos_key.replace("/", os.sep).lstrip(os.sep)
        return os.path.join(
            COSDownload._package_root(),
            "download",
            os.path.dirname(relative),
            os.path.basename(relative),
        )

    @staticmethod
    def _load_config():
        """Return the credential mapping.

        When the ``aws_key_id`` environment variable is present the process
        environment is used as the mapping; otherwise ``config.yaml`` in the
        package root is parsed.
        """
        if "aws_key_id" in os.environ:
            return os.environ
        config_path = os.path.join(COSDownload._package_root(), "config.yaml")
        # Read-only mode: the file is never written, so write access to it
        # must not be required.
        with open(config_path, encoding="utf-8", mode="r") as f:
            # NOTE(review): FullLoader kept for compatibility with the existing
            # config; yaml.safe_load should suffice for plain scalars - confirm.
            return yaml.load(f, Loader=yaml.FullLoader)

    def download(self, cos_bucket, cos_key):
        """Download ``cos_key`` from ``cos_bucket`` and return its local path.

        Args:
            cos_bucket: Name of the COS bucket.
            cos_key: Key of the object inside the bucket.

        Returns:
            A one-element tuple holding the local file path.

        Raises:
            Exception: If every attempt fails with a COS client or service
                error; the last SDK error is attached as ``__cause__``.
        """
        destination = self._local_destination(cos_key)
        loguru.logger.info(f"COS DOWNLOAD to {destination}")
        os.makedirs(os.path.dirname(destination), exist_ok=True)

        # Configuration problems (missing file / missing key) are not
        # transient, so they are read once and allowed to propagate as-is.
        settings = self._load_config()

        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            try:
                client = CosS3Client(
                    CosConfig(
                        Region=settings["cos_region"],
                        SecretId=settings["cos_secret_id"],
                        SecretKey=settings["cos_secret_key"],
                    )
                )
                client.download_file(
                    Bucket=cos_bucket,
                    Key=cos_key,
                    DestFilePath=destination,
                )
                break
            # A tuple is required here: ``except A or B`` evaluates to
            # ``except A`` and silently lets ``B`` through.
            except (CosClientError, CosServiceError) as e:
                loguru.logger.warning(
                    f"COS DOWNLOAD attempt {attempt}/{self._MAX_ATTEMPTS} failed: {e}"
                )
                if attempt == self._MAX_ATTEMPTS:
                    raise Exception(
                        f"COS下载失败 bucket {cos_bucket}; key {cos_key}"
                    ) from e
        return (destination,)
class COSUpload:
    """ComfyUI node: upload a local file to Tencent Cloud COS.

    The object key is ``<subfolder>/<file name of path>`` and is returned as
    the node's single STRING output.
    """

    # Total number of transfer attempts before the node gives up.
    _MAX_ATTEMPTS = 10

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: bucket, local file path and key prefix."""
        return {
            "required": {
                "cos_bucket": ("STRING", {"default": "bwkj-cos-1324682537"}),
                "path": ("STRING", {"multiline": True}),
                "subfolder": ("STRING", {"default": "test"}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("COS文件Key",)
    FUNCTION = "upload"
    CATEGORY = "不忘科技-自定义节点🚩/对象存储/COS"

    @staticmethod
    def _dest_key(path, subfolder):
        """Build the object key from ``subfolder`` and the file name of ``path``.

        The path is split on ``/`` when it contains one, otherwise on a
        backslash, so both POSIX and Windows style paths are accepted.
        """
        separator = "/" if "/" in path else "\\"
        return "/".join([subfolder, path.split(separator)[-1]])

    @staticmethod
    def _load_config():
        """Return the credential mapping.

        When the ``aws_key_id`` environment variable is present the process
        environment is used as the mapping; otherwise ``config.yaml`` in the
        directory two levels above this file is parsed.
        """
        if "aws_key_id" in os.environ:
            return os.environ
        config_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "config.yaml",
        )
        # Read-only mode: the file is never written, so write access to it
        # must not be required.
        with open(config_path, encoding="utf-8", mode="r") as f:
            # NOTE(review): FullLoader kept for compatibility with the existing
            # config; yaml.safe_load should suffice for plain scalars - confirm.
            return yaml.load(f, Loader=yaml.FullLoader)

    def upload(self, cos_bucket, path, subfolder):
        """Upload the file at ``path`` to ``cos_bucket`` and return its key.

        Args:
            cos_bucket: Name of the COS bucket.
            path: Local path of the file to upload.
            subfolder: Key prefix placed in front of the file name.

        Returns:
            A one-element tuple holding the object key.

        Raises:
            Exception: If every attempt fails with a COS client or service
                error; the last SDK error is attached as ``__cause__``.
        """
        dest_key = self._dest_key(path, subfolder)
        loguru.logger.info(f"COS UPLOAD {path} to {cos_bucket}/{subfolder}")

        # Configuration problems (missing file / missing key) are not
        # transient, so they are read once and allowed to propagate as-is.
        settings = self._load_config()

        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            try:
                client = CosS3Client(
                    CosConfig(
                        Region=settings["cos_region"],
                        SecretId=settings["cos_secret_id"],
                        SecretKey=settings["cos_secret_key"],
                    )
                )
                client.upload_file(
                    Bucket=cos_bucket,
                    Key=dest_key,
                    LocalFilePath=path,
                )
                break
            # A tuple is required here: ``except A or B`` evaluates to
            # ``except A`` and silently lets ``B`` through.
            except (CosClientError, CosServiceError) as e:
                loguru.logger.warning(
                    f"COS UPLOAD attempt {attempt}/{self._MAX_ATTEMPTS} failed: {e}"
                )
                if attempt == self._MAX_ATTEMPTS:
                    raise Exception(
                        f"COS上传失败 bucket {cos_bucket}; local_path {path}; subfolder {subfolder}"
                    ) from e
        return (dest_key,)
class S3Download:
    """ComfyUI node: download one object from AWS S3 to local disk.

    The object is stored under ``<package root>/download/<s3_key>``, where the
    package root is the parent of this file's directory. The local path is
    returned as the node's single STRING output.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: the S3 bucket and the object key."""
        return {
            "required": {
                "s3_bucket": ("STRING", {"default": "bw-comfyui-input"}),
                "s3_key": ("STRING", {"multiline": True}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("视频存储路径",)
    FUNCTION = "download"
    CATEGORY = "不忘科技-自定义节点🚩/对象存储/S3"

    @staticmethod
    def _package_root():
        """Return the directory two levels above this file."""
        return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    @staticmethod
    def _local_destination(s3_key):
        """Map an object key to its local path below the ``download`` folder.

        Leading separators are stripped first: an absolute component would
        make ``os.path.join`` discard the download directory and write to the
        filesystem root instead.
        """
        relative = s3_key.replace("/", os.sep).lstrip(os.sep)
        return os.path.join(
            S3Download._package_root(),
            "download",
            os.path.dirname(relative),
            os.path.basename(relative),
        )

    @staticmethod
    def _load_config():
        """Return the credential mapping.

        When the ``aws_key_id`` environment variable is present the process
        environment is used as the mapping; otherwise ``config.yaml`` in the
        package root is parsed.
        """
        if "aws_key_id" in os.environ:
            return os.environ
        config_path = os.path.join(S3Download._package_root(), "config.yaml")
        # Read-only mode: the file is never written, so write access to it
        # must not be required.
        with open(config_path, encoding="utf-8", mode="r") as f:
            # NOTE(review): FullLoader kept for compatibility with the existing
            # config; yaml.safe_load should suffice for plain scalars - confirm.
            return yaml.load(f, Loader=yaml.FullLoader)

    def download(self, s3_bucket, s3_key):
        """Download ``s3_key`` from ``s3_bucket`` and return its local path.

        Args:
            s3_bucket: Name of the S3 bucket.
            s3_key: Key of the object inside the bucket.

        Returns:
            A one-element tuple holding the local file path.

        Raises:
            Exception: If loading the credentials, creating the client or the
                transfer fails; the underlying error is attached as
                ``__cause__``.
        """
        destination = self._local_destination(s3_key)
        loguru.logger.info(f"S3 DOWNLOAD to {destination}")
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        try:
            settings = self._load_config()
            client = boto3.client(
                "s3",
                aws_access_key_id=settings["aws_key_id"],
                aws_secret_access_key=settings["aws_access_key"],
            )
            client.download_file(s3_bucket, s3_key, destination)
        except Exception as e:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise Exception(f"S3下载失败 bucket {s3_bucket}; key {s3_key}") from e
        return (destination,)
class S3Upload:
    """ComfyUI node: upload a local file to AWS S3.

    The object key is ``<subfolder>/<file name of path>`` and is returned as
    the node's single STRING output.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: bucket, local file path and key prefix."""
        return {
            "required": {
                "s3_bucket": ("STRING", {"default": "bw-comfyui-output"}),
                "path": ("STRING", {"multiline": True}),
                "subfolder": ("STRING", {"default": "test"}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("S3文件Key",)
    FUNCTION = "upload"
    CATEGORY = "不忘科技-自定义节点🚩/对象存储/S3"

    @staticmethod
    def _dest_key(path, subfolder):
        """Build the object key from ``subfolder`` and the file name of ``path``.

        The path is split on ``/`` when it contains one, otherwise on a
        backslash, so both POSIX and Windows style paths are accepted.
        """
        separator = "/" if "/" in path else "\\"
        return "/".join([subfolder, path.split(separator)[-1]])

    @staticmethod
    def _load_config():
        """Return the credential mapping.

        When the ``aws_key_id`` environment variable is present the process
        environment is used as the mapping; otherwise ``config.yaml`` in the
        directory two levels above this file is parsed.
        """
        if "aws_key_id" in os.environ:
            return os.environ
        config_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "config.yaml",
        )
        # Read-only mode: the file is never written, so write access to it
        # must not be required.
        with open(config_path, encoding="utf-8", mode="r") as f:
            # NOTE(review): FullLoader kept for compatibility with the existing
            # config; yaml.safe_load should suffice for plain scalars - confirm.
            return yaml.load(f, Loader=yaml.FullLoader)

    def upload(self, s3_bucket, path, subfolder):
        """Upload the file at ``path`` to ``s3_bucket`` and return its key.

        Args:
            s3_bucket: Name of the S3 bucket.
            path: Local path of the file to upload.
            subfolder: Key prefix placed in front of the file name.

        Returns:
            A one-element tuple holding the object key.

        Raises:
            Exception: If loading the credentials, creating the client or the
                transfer fails; the underlying error is attached as
                ``__cause__``.
        """
        loguru.logger.info(f"S3 UPLOAD {path} to {s3_bucket}/{subfolder}")
        try:
            settings = self._load_config()
            client = boto3.client(
                "s3",
                aws_access_key_id=settings["aws_key_id"],
                aws_secret_access_key=settings["aws_access_key"],
            )
            dest_key = self._dest_key(path, subfolder)
            client.upload_file(path, s3_bucket, dest_key)
        except Exception as e:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise Exception(
                f"S3上传失败 bucket {s3_bucket}; local_path {path}; subfolder {subfolder}"
            ) from e
        return (dest_key,)
class S3UploadURL:
    """ComfyUI node: upload a local file to AWS S3 and return its URL.

    The file is uploaded to a fixed bucket under
    ``<subfolder>/<file name of path>``; the returned URL is that key appended
    to a fixed base URL.
    """

    # Bucket every upload of this node goes to.
    _BUCKET = "modal-media-cache"
    # Base URL the object key is appended to when building the result.
    _URL_BASE = "https://cdn.roasmax.cn/"

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: local file path and key prefix."""
        return {
            "required": {
                "path": ("STRING", {"multiline": True}),
                "subfolder": ("STRING", {"default": "test"}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("URL",)
    FUNCTION = "upload"
    CATEGORY = "不忘科技-自定义节点🚩/对象存储/S3"

    @staticmethod
    def _dest_key(path, subfolder):
        """Build the object key from ``subfolder`` and the file name of ``path``.

        The path is split on ``/`` when it contains one, otherwise on a
        backslash, so both POSIX and Windows style paths are accepted.
        """
        separator = "/" if "/" in path else "\\"
        return "/".join([subfolder, path.split(separator)[-1]])

    @staticmethod
    def _load_config():
        """Return the credential mapping.

        When the ``aws_key_id`` environment variable is present the process
        environment is used as the mapping; otherwise ``config.yaml`` in the
        directory two levels above this file is parsed.
        """
        if "aws_key_id" in os.environ:
            return os.environ
        config_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "config.yaml",
        )
        # Read-only mode: the file is never written, so write access to it
        # must not be required.
        with open(config_path, encoding="utf-8", mode="r") as f:
            # NOTE(review): FullLoader kept for compatibility with the existing
            # config; yaml.safe_load should suffice for plain scalars - confirm.
            return yaml.load(f, Loader=yaml.FullLoader)

    def upload(self, path, subfolder):
        """Upload the file at ``path`` and return the URL built from its key.

        Args:
            path: Local path of the file to upload.
            subfolder: Key prefix placed in front of the file name.

        Returns:
            A one-element tuple holding the URL string.

        Raises:
            Exception: If loading the credentials, creating the client or the
                transfer fails; the underlying error is attached as
                ``__cause__``.
        """
        s3_bucket = self._BUCKET
        loguru.logger.info(f"S3 UPLOAD {path} to {s3_bucket}/{subfolder}")
        try:
            settings = self._load_config()
            client = boto3.client(
                "s3",
                aws_access_key_id=settings["aws_key_id"],
                aws_secret_access_key=settings["aws_access_key"],
            )
            dest_key = self._dest_key(path, subfolder)
            client.upload_file(path, s3_bucket, dest_key)
        except Exception as e:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise Exception(
                f"S3上传失败 bucket {s3_bucket}; local_path {path}; subfolder {subfolder}"
            ) from e
        url = f"{self._URL_BASE}{dest_key}"
        return (url,)
class S3UploadIMAGEURL:
    """ComfyUI node: upload an IMAGE tensor to AWS S3 and return its URL.

    The tensor is first written to a temporary file via
    ``tensor_to_tempfile``; that file is uploaded to a fixed bucket under
    ``<subfolder>/<temp file name>`` and the returned URL is that key appended
    to a fixed base URL.
    """

    # Bucket every upload of this node goes to.
    _BUCKET = "modal-media-cache"
    # Base URL the object key is appended to when building the result.
    _URL_BASE = "https://cdn.roasmax.cn/"

    @classmethod
    def INPUT_TYPES(s):
        """Declare the node inputs: the image tensor and the key prefix."""
        return {
            "required": {
                "image": ("IMAGE", {"multiline": True}),
                "subfolder": ("STRING", {"default": "test"}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("URL",)
    FUNCTION = "upload"
    CATEGORY = "不忘科技-自定义节点🚩/对象存储/S3"

    @staticmethod
    def _dest_key(path, subfolder):
        """Build the object key from ``subfolder`` and the file name of ``path``.

        The path is split on ``/`` when it contains one, otherwise on a
        backslash, so both POSIX and Windows style paths are accepted.
        """
        separator = "/" if "/" in path else "\\"
        return "/".join([subfolder, path.split(separator)[-1]])

    @staticmethod
    def _load_config():
        """Return the credential mapping.

        When the ``aws_key_id`` environment variable is present the process
        environment is used as the mapping; otherwise ``config.yaml`` in the
        directory two levels above this file is parsed.
        """
        if "aws_key_id" in os.environ:
            return os.environ
        config_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "config.yaml",
        )
        # Read-only mode: the file is never written, so write access to it
        # must not be required.
        with open(config_path, encoding="utf-8", mode="r") as f:
            # NOTE(review): FullLoader kept for compatibility with the existing
            # config; yaml.safe_load should suffice for plain scalars - confirm.
            return yaml.load(f, Loader=yaml.FullLoader)

    def upload(self, image: torch.Tensor, subfolder):
        """Upload ``image`` and return the URL built from its object key.

        Args:
            image: Image tensor handed to ``tensor_to_tempfile``.
            subfolder: Key prefix placed in front of the temp file name.

        Returns:
            A one-element tuple holding the URL string.

        Raises:
            Exception: If loading the credentials, creating the client or the
                transfer fails; the underlying error is attached as
                ``__cause__``.
        """
        s3_bucket = self._BUCKET
        loguru.logger.info(f"S3 UPLOAD image to {s3_bucket}/{subfolder}")
        # NOTE(review): the temp file is not removed here; whether
        # tensor_to_tempfile cleans it up is not visible from this file - confirm.
        path = tensor_to_tempfile(image).name
        try:
            settings = self._load_config()
            client = boto3.client(
                "s3",
                aws_access_key_id=settings["aws_key_id"],
                aws_secret_access_key=settings["aws_access_key"],
            )
            dest_key = self._dest_key(path, subfolder)
            client.upload_file(path, s3_bucket, dest_key)
        except Exception as e:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise Exception(
                f"S3上传失败 bucket {s3_bucket}; local_path {path}; subfolder {subfolder}"
            ) from e
        url = f"{self._URL_BASE}{dest_key}"
        return (url,)