ComfyUI-CustomNode/nodes/lip_sync_node.py

183 lines
7.0 KiB
Python

# -*- coding:utf-8 -*-
"""
File lip_sync_node.py
Author silence
Date 2025/9/9 17:39
"""
import io
import mimetypes
import os
import time
import folder_paths
import logging
import numpy as np
import httpx
from PIL import Image
import scipy.io.wavfile as wavfile
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("hedra api")
class HedraLipNode:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"image": ("IMAGE", {"description": "图片文件"}),
"audio": ("AUDIO",),
"env": (["prod", "dev", "test"], {"default": "prod"}),
},
"optional": {
"prompt": ("STRING", {"description": "【可选】 文本提示词", "multiline": True}),
"resolution": (['720p', '540p'], {"default": "720p"}),
"aspect_ratio": (["1:1", "9:16", "16:9"], {"default": "1:1"}),
"interval": ("INT", {"default": 3, "min": 1, "max": 60}),
"timeout": ("INT", {"default": 300, "min": 10, "max": 3600}),
}
}
RETURN_TYPES = ("STRING",)
RETURN_NAMES = ("video_url",)
FUNCTION = "execute"
CATEGORY = "不忘科技-自定义节点🚩/api/hedra对嘴型"
url_mapping = {
"prod": "https://bowongai-prod--text-video-agent-fastapi-app.modal.run",
"dev": "https://bowongai-dev--text-video-agent-fastapi-app.modal.run",
"test": "https://bowongai-test--text-video-agent-fastapi-app.modal.run"
}
def save_audio_tensor_to_temp(self, waveform_tensor, sample_rate):
if 'wavfile' not in globals():
raise ImportError("Scipy 库未安装。请在您的 ComfyUI 环境中运行 'pip install scipy' 来启用此功能。")
waveform_np = waveform_tensor.cpu().numpy()
if waveform_np.ndim == 3:
waveform_np = waveform_np[0]
waveform_np = waveform_np.T
waveform_int16 = np.int16(waveform_np * 32767)
output_dir = folder_paths.get_temp_directory()
(full_output_folder, filename, counter, _, _) = folder_paths.get_save_image_path("llm_temp_audio", output_dir)
filepath = os.path.join(full_output_folder, f"{filename}_{counter:05}.wav")
wavfile.write(filepath, sample_rate, waveform_int16)
print(f"音频张量已使用 Scipy 保存到临时文件: {filepath}")
return filepath
def execute(self, image, audio, env: str,
prompt: str, resolution: str, aspect_ratio: str,
timeout: int = 300,
interval: int = 3
):
img_tensor = image[0]
img_np = np.clip(255. * img_tensor.cpu().numpy(), 0, 255).astype(np.uint8)
pil_image = Image.fromarray(img_np)
buffer = io.BytesIO()
pil_image.save(buffer, format="PNG")
buffer.seek(0)
audio_info = audio[0] if isinstance(audio, (list, tuple)) and audio else audio
if isinstance(audio_info, dict) and 'filename' in audio_info:
filename = audio_info['filename']
print(f"从音频对象中找到 'filename': '{filename}'")
full_path = folder_paths.get_full_path("input", filename)
if full_path and os.path.exists(full_path):
media_path = full_path
else:
return (f"错误: 无法在 'input' 文件夹中找到文件 '{filename}'",)
elif isinstance(audio_info, dict) and 'waveform' in audio_info and 'sample_rate' in audio_info:
print("从音频对象中找到 'waveform' 数据,正在使用 Scipy 保存为临时文件...")
try:
media_path = self.save_audio_tensor_to_temp(audio_info['waveform'], audio_info['sample_rate'])
except Exception as e:
return (f"错误: 保存音频张量时出错: {e}",)
elif isinstance(audio_info, str):
print(f"检测到音频输入为字符串,作为文件名处理: '{audio_info}'")
full_path = folder_paths.get_full_path("input", audio_info)
if full_path and os.path.exists(full_path):
media_path = full_path
else:
return (f"错误: 无法在 'input' 文件夹中找到文件 '{audio_info}'",)
else:
return (f"错误: 不支持的音频输入格式或结构。收到类型: {type(audio_info)}",)
headers = {
'accept': 'application/json',
}
if not media_path:
raise ValueError(f'parse audio data failed...')
audio_mime_type = mimetypes.guess_type(media_path)[0]
audio_name = os.path.basename(media_path)
img_file_name = f'{time.time_ns()}.png'
prompt = prompt or ''
prompt = prompt.strip()
files = {
'img_file': (img_file_name, buffer, 'image/png'),
'audio_file': (audio_name, open(media_path, 'rb'), audio_mime_type),
'resolution': (None, resolution),
'aspect_ratio': (None, aspect_ratio),
'prompt': (None, prompt)
}
url = self.url_mapping[env]
api_url = f'{url}/api/302/hedra/v3/submit/task'
print(f'api_url: {api_url}')
response = httpx.post(
api_url,
headers=headers,
files=files,
timeout=120
)
response.raise_for_status()
resp_json = response.json()
if resp_json.get('status'):
task_id = resp_json.get('data')
res = self.sync_query_result(task_id, url, timeout=timeout, interval=interval)
return (res,)
else:
error_msg = resp_json.get('msg', '未知API错误')
raise ValueError(f"API返回失败: {error_msg}")
def sync_query_result(self, task_id: str, base_url: str,
timeout: int = 600, interval: int = 3):
def query_task_result(t_id: str):
headers = {
'accept': 'application/json',
}
params = {
'task_id': t_id,
}
nonlocal base_url
api_url = f'{base_url}/api/302/hedra/v3/task/status'
response = httpx.get(
api_url,
params=params,
headers=headers,
)
response.raise_for_status()
print(f'query_task_result: {response.text}')
return response.json()
end = time.time() + timeout
while time.time() <= end:
tmp_dict = query_task_result(task_id)
if tmp_dict['status']:
video_url = tmp_dict['data']
return video_url
else:
print(f'wait next interval: {interval}')
time.sleep(interval)
else:
raise ValueError(f'query task timeout: {timeout}')
NODE_CLASS_MAPPINGS = {
"HedraLipNode": HedraLipNode
}
NODE_DISPLAY_NAME_MAPPINGS = {
"HedraLipNode": "hedra对嘴型"
}