# -*- coding:utf-8 -*-
"""
ComfyUI custom node that uploads a local file to Tencent Cloud COS.

File cos_utils.py
Author silence
Date 2025/6/11 20:37
"""
import mimetypes
import os
from urllib.parse import quote

from loguru import logger
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client

# COS configuration.
# Each value may be overridden with an environment variable of the same name;
# the literals below are only fallbacks kept so existing deployments continue
# to work unchanged.
# SECURITY(review): credentials committed to source control should be treated
# as leaked -- rotate this key pair and supply the new values via the
# environment, then drop the literal fallbacks.
COS_BUCKET_NAME = os.environ.get('COS_BUCKET_NAME') or 'sucai-1324682537'
COS_SECRET_ID = os.environ.get('COS_SECRET_ID') or 'AKIDsrihIyjZOBsjimt8TsN8yvv1AMh5dB44'
COS_SECRET_KEY = os.environ.get('COS_SECRET_KEY') or 'CPZcxdk6W39Jd4cGY95wvupoyMd0YFqW'
COS_REGION = os.environ.get('COS_REGION') or 'ap-shanghai'


def _build_object_key(file_path):
    """Return the COS object key ``tk/<category>/<file name>`` for *file_path*.

    ``<category>`` is the top-level part of the guessed MIME type (for
    example ``image`` or ``video``), or ``unknown`` when the type cannot be
    guessed from the file name.
    """
    mime_type, _ = mimetypes.guess_type(file_path)
    category = mime_type.partition('/')[0] if mime_type else 'unknown'
    return f'tk/{category}/{os.path.basename(file_path)}'


def _build_object_url(object_key):
    """Return the ``myqcloud.com`` URL of *object_key* in the configured bucket.

    The key is percent-encoded (``/`` is kept) so that file names containing
    spaces, ``#``, ``?``, ``%`` or non-ASCII characters still yield a valid URL.
    """
    return (
        f'https://{COS_BUCKET_NAME}.cos.{COS_REGION}.myqcloud.com/'
        f'{quote(object_key)}'
    )


def _remove_source_file(file_path):
    """Delete *file_path* on a best-effort basis; failures are only logged."""
    try:
        os.remove(file_path)
    except FileNotFoundError:
        # Already gone: nothing to delete and nothing worth reporting.
        return
    except OSError as e:
        logger.warning(f'删除源文件失败: {e}')
    else:
        logger.info(f'源文件已删除: {file_path}')


class CosUploadNode:
    """Node that uploads a file to the configured COS bucket and outputs its URL."""

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node inputs: required ``file_path``, optional ``remove_source``."""
        return {
            "required": {
                "file_path": ("STRING", {"forceInput": True}),
            },
            "optional": {
                "remove_source": ("BOOLEAN", {"default": False}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("upload_url",)
    FUNCTION = "upload_to_cos"
    CATEGORY = "不忘科技-自定义节点🚩"

    def upload_to_cos(self, file_path, remove_source=False):
        """Upload *file_path* to COS and return its URL as a 1-tuple.

        Args:
            file_path: Path of the local file to upload.
            remove_source: When true, delete the local file after a
                successful upload (a failed deletion is logged, not raised).

        Returns:
            ``(upload_url,)`` on success. On any failure the exception is
            logged and ``("ERROR: ...",)`` is returned instead of raising, so
            that the surrounding ComfyUI workflow is not interrupted.
        """
        try:
            # Fail early with a clear message instead of an SDK-level error.
            if not os.path.isfile(file_path):
                raise FileNotFoundError(f"文件不存在: {file_path}")

            object_key = _build_object_key(file_path)

            # Configure the COS client.
            config = CosConfig(
                Region=COS_REGION,
                SecretId=COS_SECRET_ID,
                SecretKey=COS_SECRET_KEY,
            )
            client = CosS3Client(config)

            # Upload the file under the raw (unencoded) object key.
            client.upload_file(
                Bucket=COS_BUCKET_NAME,
                Key=object_key,
                LocalFilePath=file_path,
                EnableMD5=False,
            )

            # Build the access URL; only the URL form of the key is encoded.
            upload_url = _build_object_url(object_key)
            logger.info(f'文件上传成功: {upload_url}')
        except Exception as e:
            error_msg = f'上传失败: {str(e)}'
            # logger.exception keeps the traceback for diagnosis.
            logger.exception(error_msg)
            # Return the error text rather than raising so ComfyUI keeps going.
            return (f"ERROR: {error_msg}",)

        # Optionally delete the source file once the upload has succeeded.
        if remove_source:
            _remove_source_file(file_path)

        return (upload_url,)


# Node registration
NODE_CLASS_MAPPINGS = {
    "CosUploadNode": CosUploadNode
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "CosUploadNode": "腾讯COS上传"
}