# -*- coding:utf-8 -*-
"""
File:   lip_sync_node.py
Author: silence
Date:   2025/9/9 17:39
"""
import io
import logging
import mimetypes
import os
import time

import httpx
import numpy as np
from PIL import Image

import folder_paths

# Scipy is only needed when the audio input arrives as a raw waveform tensor
# and has to be written out to a temporary WAV file.
try:
    import scipy.io.wavfile as wavfile
except ImportError:
    wavfile = None

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("hedra api")


class HedraLipNode:
    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "image": ("IMAGE", {"description": "Image file"}),
                "audio": ("AUDIO",),
                "env": (["prod", "dev", "test"], {"default": "prod"}),
            },
            "optional": {
                "prompt": ("STRING", {"description": "(Optional) text prompt", "multiline": True}),
                "resolution": (['720p', '540p'], {"default": "720p"}),
                "aspect_ratio": (["1:1", "9:16", "16:9"], {"default": "1:1"}),
                "interval": ("INT", {"default": 3, "min": 1, "max": 60}),
                "timeout": ("INT", {"default": 300, "min": 10, "max": 3600}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("video_url",)
    FUNCTION = "execute"
    CATEGORY = "不忘科技-自定义节点🚩/api/hedra对嘴型"

    url_mapping = {
        "prod": "https://bowongai-prod--text-video-agent-fastapi-app.modal.run",
        "dev": "https://bowongai-dev--text-video-agent-fastapi-app.modal.run",
        "test": "https://bowongai-test--text-video-agent-fastapi-app.modal.run",
    }

    def save_audio_tensor_to_temp(self, waveform_tensor, sample_rate):
        """Write a ComfyUI audio waveform tensor to a temporary 16-bit WAV file and return its path."""
        if wavfile is None:
            raise ImportError("Scipy is not installed. Run 'pip install scipy' in your ComfyUI environment to enable this feature.")
        waveform_np = waveform_tensor.cpu().numpy()
        # ComfyUI audio tensors are usually [batch, channels, samples]; drop the batch dimension.
        if waveform_np.ndim == 3:
            waveform_np = waveform_np[0]
        # Transpose to [samples, channels] as expected by scipy.io.wavfile.write.
        waveform_np = waveform_np.T
        waveform_int16 = np.int16(waveform_np * 32767)
        output_dir = folder_paths.get_temp_directory()
        (full_output_folder, filename, counter, _, _) = folder_paths.get_save_image_path("llm_temp_audio", output_dir)
        filepath = os.path.join(full_output_folder, f"{filename}_{counter:05}.wav")
        wavfile.write(filepath, sample_rate, waveform_int16)
        logger.info(f"Audio tensor saved to a temporary file via Scipy: {filepath}")
        return filepath

    def execute(self, image, audio, env: str, prompt: str = "", resolution: str = "720p",
                aspect_ratio: str = "1:1", timeout: int = 300, interval: int = 3):
        # Convert the first image of the batch ([B, H, W, C], float 0-1) into an in-memory PNG.
        img_tensor = image[0]
        img_np = np.clip(255.0 * img_tensor.cpu().numpy(), 0, 255).astype(np.uint8)
        pil_image = Image.fromarray(img_np)
        buffer = io.BytesIO()
        pil_image.save(buffer, format="PNG")
        buffer.seek(0)

        # The AUDIO input may arrive as a dict referencing an uploaded file, a dict carrying a
        # raw waveform, or a plain filename string; resolve all of them to a local file path.
        audio_info = audio[0] if isinstance(audio, (list, tuple)) and audio else audio
        if isinstance(audio_info, dict) and 'filename' in audio_info:
            filename = audio_info['filename']
            logger.info(f"Found 'filename' in the audio object: '{filename}'")
            full_path = folder_paths.get_full_path("input", filename)
            if full_path and os.path.exists(full_path):
                media_path = full_path
            else:
                return (f"Error: could not find file '{filename}' in the 'input' folder.",)
        elif isinstance(audio_info, dict) and 'waveform' in audio_info and 'sample_rate' in audio_info:
            logger.info("Found 'waveform' data in the audio object, saving it to a temporary file with Scipy...")
            try:
                media_path = self.save_audio_tensor_to_temp(audio_info['waveform'], audio_info['sample_rate'])
            except Exception as e:
                return (f"Error: failed to save the audio tensor: {e}",)
        elif isinstance(audio_info, str):
            logger.info(f"Audio input is a string, treating it as a filename: '{audio_info}'")
            full_path = folder_paths.get_full_path("input", audio_info)
            if full_path and os.path.exists(full_path):
                media_path = full_path
            else:
                return (f"Error: could not find file '{audio_info}' in the 'input' folder.",)
        else:
            return (f"Error: unsupported audio input format or structure. Received type: {type(audio_info)}",)

        if not media_path:
            raise ValueError('Failed to resolve the audio input to a file path.')

        headers = {
            'accept': 'application/json',
        }
        audio_mime_type = mimetypes.guess_type(media_path)[0] or 'application/octet-stream'
        audio_name = os.path.basename(media_path)
        img_file_name = f'{time.time_ns()}.png'
        prompt = (prompt or '').strip()

        # Submit the lip-sync task as a multipart/form-data request.
        url = self.url_mapping[env]
        api_url = f'{url}/api/302/hedra/v3/submit/task'
        logger.info(f'api_url: {api_url}')
        with open(media_path, 'rb') as audio_file:
            files = {
                'img_file': (img_file_name, buffer, 'image/png'),
                'audio_file': (audio_name, audio_file, audio_mime_type),
                'resolution': (None, resolution),
                'aspect_ratio': (None, aspect_ratio),
                'prompt': (None, prompt),
            }
            response = httpx.post(
                api_url,
                headers=headers,
                files=files,
                timeout=120,
            )
        response.raise_for_status()
        resp_json = response.json()
        if resp_json.get('status'):
            task_id = resp_json.get('data')
            res = self.sync_query_result(task_id, url, timeout=timeout, interval=interval)
            return (res,)
        else:
            error_msg = resp_json.get('msg', 'Unknown API error')
            raise ValueError(f"API returned failure: {error_msg}")

    def sync_query_result(self, task_id: str, base_url: str, timeout: int = 600, interval: int = 3):
        """Poll the task status endpoint until the video URL is ready or the timeout expires."""

        def query_task_result(t_id: str):
            headers = {
                'accept': 'application/json',
            }
            params = {
                'task_id': t_id,
            }
            api_url = f'{base_url}/api/302/hedra/v3/task/status'
            response = httpx.get(
                api_url,
                params=params,
                headers=headers,
            )
            response.raise_for_status()
            logger.info(f'query_task_result: {response.text}')
            return response.json()

        end = time.time() + timeout
        while time.time() <= end:
            tmp_dict = query_task_result(task_id)
            if tmp_dict['status']:
                video_url = tmp_dict['data']
                return video_url
            logger.info(f'waiting for next poll, interval: {interval}s')
            time.sleep(interval)
        raise ValueError(f'query task timeout: {timeout}')


NODE_CLASS_MAPPINGS = {
    "HedraLipNode": HedraLipNode
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "HedraLipNode": "hedra对嘴型"
}
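

# ---------------------------------------------------------------------------
# Hedged usage sketch (not part of the node): how the same submit/poll
# endpoints could be exercised directly with httpx outside of ComfyUI.
# The local file paths, the choice of the "test" environment, and the form
# field values below are illustrative assumptions based on the code above,
# not a confirmed standalone client for the service.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Hypothetical placeholder assets; replace with real files before running.
    base_url = HedraLipNode.url_mapping["test"]
    image_path = "example_face.png"   # assumed local portrait image
    audio_path = "example_voice.wav"  # assumed local speech audio

    with open(image_path, "rb") as img_fh, open(audio_path, "rb") as audio_fh:
        submit = httpx.post(
            f"{base_url}/api/302/hedra/v3/submit/task",
            headers={"accept": "application/json"},
            files={
                "img_file": (os.path.basename(image_path), img_fh, "image/png"),
                "audio_file": (os.path.basename(audio_path), audio_fh, "audio/wav"),
                "resolution": (None, "720p"),
                "aspect_ratio": (None, "1:1"),
                "prompt": (None, ""),
            },
            timeout=120,
        )
    submit.raise_for_status()
    payload = submit.json()
    if payload.get("status"):
        # Reuse the node's polling helper to wait for the rendered video URL.
        print(HedraLipNode().sync_query_result(payload["data"], base_url, timeout=300, interval=3))
    else:
        print(f"submit failed: {payload.get('msg')}")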