import asyncio
import asyncio.exceptions
import json
import os
import random
import subprocess
import time
import traceback
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import aiofiles
import aiohttp
from loguru import logger

from models import TaskStatus  # task-status enum shared with the task manager
from moviepy.editor import VideoFileClip
from pydub import AudioSegment

# Logging setup: ensure the log directory exists, then attach a rotating
# file sink that records ERROR-level messages only.
os.makedirs("logs", exist_ok=True)
logger.add(
    "logs/error.log",
    rotation="500 MB",
    retention="10 days",
    level="ERROR",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
)


@dataclass
class Machine:
    """A remote video-processing worker loaded from the machines config file."""

    name: str
    host: str
    port: int
    max_concurrent: int  # hard cap on simultaneous tasks for this machine
    enabled: bool
    current_tasks: int = 0  # live task counter, guarded by `lock`
    lock: Optional[asyncio.Lock] = None  # created in _load_config


class ProcessManager:
    """Dispatches audio+video merge jobs to a pool of worker machines.

    Jobs are enqueued via :meth:`process_video`.  A pool of worker
    coroutines (one per total available slot) pulls jobs from the queue,
    selects the least-loaded machine, optionally re-times the source
    video with ffmpeg so it covers the audio, then POSTs both files to
    the machine's ``/process`` endpoint.  Outcomes are appended to a
    markdown request log, and task state is reported back through the
    optional ``task_manager``.
    """

    def __init__(self, config_path: str, log_dir: str = "logs", task_manager=None):
        self.machines: Dict[str, Machine] = {}
        self.config_path = config_path
        self.log_dir = log_dir
        self.request_log_file = None
        self.task_manager = task_manager  # optional sink for task status updates
        self._load_config()
        # Per-path locks serialising concurrent file / output-dir access.
        self.file_locks: Dict[str, asyncio.Lock] = {}
        self.output_dir_locks: Dict[str, asyncio.Lock] = {}
        # Pending-work queue consumed by the worker coroutines.
        self.task_queue: asyncio.Queue = asyncio.Queue()
        # Periodic status-report coroutine handle.
        self.status_task: Optional[asyncio.Task] = None
        self.is_running = False
        self.process_tasks: List[asyncio.Task] = []
        self._init_log_file()
        # Per-machine usage statistics.
        # NOTE(review): only "current_tasks" is refreshed (in _status_report);
        # the remaining fields are never updated — confirm whether they are
        # still needed.
        self.machine_stats: Dict[str, Dict] = {}
        for machine_name in self.machines:
            self.machine_stats[machine_name] = {
                "total_tasks": 0,
                "current_tasks": 0,
                "last_used": None,
                "average_processing_time": 0,
                "success_rate": 1.0,
                "total_processing_time": 0,
            }

    def _init_log_file(self):
        """Create a fresh, timestamped markdown request log with a table header."""
        os.makedirs(self.log_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.request_log_file = os.path.join(self.log_dir, f"video_generation_requests_{timestamp}.md")
        with open(self.request_log_file, "w", encoding="utf-8") as f:
            f.write(
                f"""# 视频生成请求记录 - {timestamp}

| 开始时间 | 结束时间 | 用时(秒) | 设备 | 视频大小(MB) | 音频大小(MB) | 视频时长(秒) | 音频时长(秒) | 视频名称 | 音频名称 | 输出路径 | 状态 | 错误信息 |
|----------|----------|----------|------|-------------|-------------|-------------|-------------|----------|----------|----------|------|----------|
"""
            )

    def write_request_log(
        self,
        start_time: float,
        end_time: float,
        machine_name: str,
        video_size: float,
        audio_size: float,
        video_duration: float,
        audio_duration: float,
        video_name: str,
        audio_name: str,
        output_path: str,
        status: bool,
        error_msg: str = "",
    ):
        """Append one markdown table row describing a video-generation request.

        Sizes are in MB, durations in seconds; ``status`` True means success.
        """
        if not self.request_log_file:
            logger.error("日志文件未初始化")
            return
        duration = end_time - start_time
        with open(self.request_log_file, "a", encoding="utf-8") as f:
            f.write(
                f"| {datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')} | "
                f"{datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S')} | "
                f"{duration:.2f} | {machine_name} | {video_size:.2f} | {audio_size:.2f} | "
                f"{video_duration:.2f} | {audio_duration:.2f} | {video_name} | {audio_name} | "
                f"{output_path} | {'成功' if status else '失败'} | {error_msg} |\n"
            )

    def _load_config(self):
        """Load enabled machines from the JSON config into ``self.machines``."""
        config_path = os.path.abspath(self.config_path)
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)
        for machine_config in config["machines"]:
            if machine_config["enabled"]:
                machine = Machine(
                    name=machine_config["name"],
                    host=machine_config["host"],
                    port=machine_config["port"],
                    max_concurrent=machine_config["max_concurrent"],
                    enabled=machine_config["enabled"],
                    lock=asyncio.Lock(),
                )
                self.machines[machine.name] = machine

    async def get_available_machine(self) -> Optional[Machine]:
        """Reserve and return the first machine with free capacity, else None."""
        for machine in self.machines.values():
            async with machine.lock:  # type: ignore
                if machine.current_tasks < machine.max_concurrent:
                    machine.current_tasks += 1
                    return machine
        return None

    async def release_machine(self, machine: Machine):
        """Release one task slot previously reserved on ``machine``."""
        async with machine.lock:  # type: ignore
            machine.current_tasks -= 1

    def _get_file_lock(self, file_path: str) -> asyncio.Lock:
        """Return (creating on first use) the lock guarding ``file_path``."""
        if file_path not in self.file_locks:
            self.file_locks[file_path] = asyncio.Lock()
        return self.file_locks[file_path]

    def _get_output_dir_lock(self, output_dir: str) -> asyncio.Lock:
        """Return (creating on first use) the lock guarding ``output_dir``."""
        if output_dir not in self.output_dir_locks:
            self.output_dir_locks[output_dir] = asyncio.Lock()
        return self.output_dir_locks[output_dir]

    async def _safe_copy_file(self, src: str, dst: str, timeout: float = 300) -> bool:
        """Copy ``src`` to ``dst`` under the per-file lock, with a timeout.

        Returns True on success, False on any failure (missing source,
        lock timeout, I/O error).  Requires Python 3.11+ (asyncio.timeout).
        """
        try:
            file_lock = self._get_file_lock(src)
            # Acquire the lock with a timeout so a stuck copy cannot wedge us.
            try:
                async with asyncio.timeout(timeout):
                    await file_lock.acquire()
            except asyncio.exceptions.TimeoutError:
                logger.warning(f"等待文件锁超时: {src}")
                return False
            except Exception as e:
                logger.error(f"获取文件锁失败: {src}, 错误: {str(e)}")
                return False
            try:
                if not os.path.exists(src):
                    logger.error(f"源文件不存在: {src}")
                    return False
                os.makedirs(os.path.dirname(dst), exist_ok=True)
                # Whole-file copy; assumes files fit in memory comfortably.
                async with aiofiles.open(src, "rb") as fsrc:
                    content = await fsrc.read()
                async with aiofiles.open(dst, "wb") as fdst:
                    await fdst.write(content)
                return True
            finally:
                file_lock.release()
        except Exception as e:
            logger.error(f"复制文件失败 {src} -> {dst}: {str(e)}")
            return False

    async def start(self):
        """Start the status reporter and one worker coroutine per total slot."""
        self.is_running = True
        self.status_task = asyncio.create_task(self._status_report())
        # One worker per available concurrency slot across all machines.
        total_max_concurrent = sum(machine.max_concurrent for machine in self.machines.values())
        self.process_tasks = [asyncio.create_task(self._process_tasks()) for _ in range(total_max_concurrent)]

    async def stop(self):
        """Cancel the status reporter and all worker coroutines."""
        self.is_running = False
        if self.status_task:
            self.status_task.cancel()
            try:
                await self.status_task
            except asyncio.CancelledError:
                pass
        for task in self.process_tasks:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def _status_report(self):
        """Log queue depth and per-machine load every 5 seconds."""
        while self.is_running:
            report: List[str] = []
            report.append(f"\n状态: 队列等待任务数量: {self.task_queue.qsize()}")
            for machine_name, machine in self.machines.items():
                stats = self.machine_stats[machine_name]
                stats["current_tasks"] = machine.current_tasks
                usage_rate = (machine.current_tasks / machine.max_concurrent) * 100
                report.append(
                    f" - {machine_name}: 当前负载: {machine.current_tasks}/{machine.max_concurrent} ({usage_rate:.1f}%)"
                )
            logger.info("\n".join(report))
            await asyncio.sleep(5)

    async def _process_tasks(self):
        """Worker loop: pull a task from the queue and process it on a machine."""
        while self.is_running:
            task = None
            task_id = None
            try:
                task = await self.task_queue.get()
                logger.info(f"获取任务: {task}")
                task_id, audio_file_path, video_file_path, output_file_path = task
                machine = await self._wait_for_available_machine()
                if not machine:
                    logger.error("无法获取可用机器,任务将重新入队")
                    # Put the task back for a later retry.
                    # NOTE(review): the task is re-queued AND marked FAILED,
                    # which is contradictory — confirm the intended policy.
                    await self.task_queue.put(task)
                    if self.task_manager and task_id:
                        await self.task_manager.update_task_status(task_id, TaskStatus.FAILED, error_message="无法获取可用机器")
                    self.task_queue.task_done()
                    await asyncio.sleep(5)  # back off before retrying
                    continue
                try:
                    success = await self._process_single_task(task_id, machine, audio_file_path, video_file_path, output_file_path)
                    if not success:
                        logger.error(
                            f"任务处理失败: \n音频文件: {audio_file_path}\n视频文件: {video_file_path}\n输出文件: {output_file_path}"
                        )
                        if self.task_manager and task_id:
                            await self.task_manager.update_task_status(task_id, TaskStatus.FAILED, error_message="任务处理失败")
                finally:
                    self.task_queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"任务处理出错: {str(e)} {traceback.format_exc()}")
                if self.task_manager and task_id:
                    await self.task_manager.update_task_status(task_id, TaskStatus.FAILED, error_message=str(e))
                # Only balance the queue if we actually dequeued something.
                if task is not None:
                    self.task_queue.task_done()

    async def _wait_for_available_machine(self, timeout: float = 3600) -> Optional[Machine]:
        """Poll until a machine with spare capacity can be reserved.

        Returns the reserved machine (its counter already incremented) or
        None once ``timeout`` seconds elapse with every machine full.
        """
        start_time = asyncio.get_event_loop().time()
        while asyncio.get_event_loop().time() - start_time < timeout:
            # Snapshot all machines that currently have a free slot.
            available_machines = []
            for machine in self.machines.values():
                async with machine.lock:  # type: ignore
                    if machine.current_tasks < machine.max_concurrent:
                        available_machines.append(machine)
            if available_machines:
                # Prefer the lowest relative load.
                selected_machine = min(available_machines, key=lambda m: m.current_tasks / m.max_concurrent)
                # Re-check under the lock: the snapshot may be stale.
                async with selected_machine.lock:  # type: ignore
                    if selected_machine.current_tasks < selected_machine.max_concurrent:
                        selected_machine.current_tasks += 1
                        logger.info(
                            f"选择机器 {selected_machine.name} 处理任务,当前负载: {selected_machine.current_tasks}/{selected_machine.max_concurrent}"
                        )
                        return selected_machine
            logger.warning("所有机器都已满,等待资源释放")
            await asyncio.sleep(5)
        return None

    async def _run_ffmpeg(self, command: List[str]) -> subprocess.CompletedProcess:
        """Run an ffmpeg command in a thread so the event loop is not blocked."""
        logger.info(f"执行FFmpeg命令: {' '.join(command)}")
        return await asyncio.to_thread(subprocess.run, command, capture_output=True, text=True, check=False)

    async def _fit_video_to_audio(self, video_file_path: str, video_duration: float, audio_duration: float) -> Optional[float]:
        """Re-time the video on disk so it lasts audio_duration + 1 seconds.

        Short videos are loop-extended; long videos are cropped from a
        random offset.  Returns the resulting target duration, or None on
        ffmpeg failure.  Must run BEFORE the video bytes are read for
        upload, otherwise the unadjusted file would be sent.
        """
        target_duration = video_duration  # unchanged when already the right length
        if video_duration < audio_duration + 1:
            logger.info(f"视频时长小于音频时长+1s,通过循环的形式延长视频时长到音频时长+1s")
            target_duration = audio_duration + 1
            loop_count = int(target_duration / video_duration) + 1
            command = [
                "ffmpeg",
                "-stream_loop",
                str(loop_count - 1),  # -1 because the original pass counts as one loop
                "-i",
                video_file_path,
                "-c",
                "copy",
                "-t",
                str(target_duration),
                "-f",
                "mp4",  # explicit output container
                "-y",
                video_file_path + ".tmp",  # temp file, then atomic replace
            ]
            try:
                result = await self._run_ffmpeg(command)
                if result.returncode != 0:
                    error_msg = result.stderr if result.stderr else "未知错误"
                    logger.error(f"扩展视频时长失败,FFmpeg返回码: {result.returncode}, 错误信息: {error_msg}")
                    return None
                logger.info(f"扩展视频时长完成,扩展后视频时长: {target_duration:.2f}秒")
                os.replace(video_file_path + ".tmp", video_file_path)
            except Exception as e:
                error_trace = traceback.format_exc()
                logger.error(f"扩展视频时长时发生错误:\n命令: {' '.join(command)}\n错误信息: {str(e)}\n完整堆栈:\n{error_trace}")
                return None
        elif video_duration > audio_duration + 1:
            logger.info(f"视频时长大于音频时长+1s,随机裁剪到音频时长+1s")
            target_duration = audio_duration + 1
            # Random clip offset, leaving enough tail for the target length.
            max_start_time = video_duration - target_duration
            clip_start = random.uniform(0, max_start_time)
            command = [
                "ffmpeg",
                "-ss",
                str(clip_start),
                "-i",
                video_file_path,
                "-t",
                str(target_duration),
                "-c",
                "copy",
                "-f",
                "mp4",  # explicit output container
                "-y",
                video_file_path + ".tmp",
            ]
            logger.info(f"实际请求音视频时长: {target_duration:.2f}秒")
            try:
                result = await self._run_ffmpeg(command)
                if result.returncode != 0:
                    error_msg = result.stderr if result.stderr else "未知错误"
                    logger.error(f"裁剪视频时长失败,FFmpeg返回码: {result.returncode}, 错误信息: {error_msg}")
                    return None
                logger.info(f"裁剪视频时长完成,裁剪后视频时长: {target_duration:.2f}秒")
                os.replace(video_file_path + ".tmp", video_file_path)
                logger.info(f"替换原文件完成")
            except Exception as e:
                error_trace = traceback.format_exc()
                logger.error(f"裁剪视频时长时发生错误:\n命令: {' '.join(command)}\n错误信息: {str(e)}\n完整堆栈:\n{error_trace}")
                return None
        return target_duration

    async def _process_single_task(
        self, task_id: str, machine: Machine, audio_file_path: str, video_file_path: str, output_file_path: str
    ) -> bool:
        """Process one job on ``machine``: re-time video, POST, save result.

        Returns True on success.  Always releases the machine slot.
        """
        start_time = time.time()
        success = False
        try:
            logger.info(f"开始处理视频 {output_file_path}")
            logger.info(f"任务ID: {task_id}")
            logger.info(f"音频文件路径: {audio_file_path} 是否存在: {os.path.exists(audio_file_path)}")
            logger.info(f"视频文件路径: {video_file_path} 是否存在: {os.path.exists(video_file_path)}")
            logger.info(f"使用机器: {machine.name}")
            url = f"http://{machine.host}:{machine.port}/process"

            # Probe media durations first.  Defaults of 0.0 keep later code
            # well-defined if probing fails (previously a NameError).
            audio_duration = 0.0
            video_duration = 0.0
            try:
                audio = AudioSegment.from_mp3(audio_file_path)
                audio_duration = len(audio) / 1000.0  # ms -> s
                video = VideoFileClip(video_file_path)
                video_duration = video.duration
                video.close()
                logger.info(f"音频时长: {audio_duration:.2f}秒")
                logger.info(f"视频时长: {video_duration:.2f}秒")
            except Exception as e:
                logger.warning(f"获取音频或视频时长失败: {str(e)}")

            # Adjust the video BEFORE reading its bytes for upload, so the
            # server receives the re-timed file (the previous code read the
            # file first and uploaded stale bytes).
            target_duration = video_duration
            if audio_duration > 0 and video_duration > 0:
                fitted = await self._fit_video_to_audio(video_file_path, video_duration, audio_duration)
                if fitted is None:
                    return False
                target_duration = fitted

            async with aiohttp.ClientSession() as session:
                # Build the multipart payload from the (possibly re-timed) files.
                data = aiohttp.FormData()
                async with aiofiles.open(audio_file_path, "rb") as f:
                    audio_content = await f.read()
                data.add_field("audio", audio_content, filename=os.path.basename(audio_file_path), content_type="audio/mpeg")
                async with aiofiles.open(video_file_path, "rb") as f:
                    video_content = await f.read()
                data.add_field("video", video_content, filename=os.path.basename(video_file_path), content_type="video/mp4")

                audio_size = len(audio_content) / (1024 * 1024)  # bytes -> MB
                video_size = len(video_content) / (1024 * 1024)  # bytes -> MB
                logger.info(f"音频文件大小: {audio_size:.2f}MB")
                logger.info(f"视频文件大小: {video_size:.2f}MB")

                logger.info(f"开始进行视频生成,视频名称: {os.path.basename(video_file_path)}, 音频名称: {os.path.basename(audio_file_path)}")
                start_time = time.time()  # request timing starts here
                timeout = aiohttp.ClientTimeout(total=1800)  # 30 minutes
                async with session.post(url, data=data, timeout=timeout) as response:
                    if response.status == 200:
                        content_type = response.headers.get("Content-Type", "").lower()
                        if "application/json" in content_type:
                            # JSON body means the server reported a (possibly failed) status.
                            try:
                                response_json = await response.json()
                                if not response_json.get("status", True):
                                    error_msg = response_json.get("msg", "未知错误")
                                    logger.error(f"服务器处理失败: {error_msg}")
                                    self.write_request_log(
                                        start_time,
                                        time.time(),
                                        machine.name,
                                        video_size,
                                        audio_size,
                                        target_duration,
                                        audio_duration,
                                        os.path.basename(video_file_path),
                                        os.path.basename(audio_file_path),
                                        output_file_path,
                                        False,
                                        error_msg,
                                    )
                                    return False
                            except json.JSONDecodeError:
                                logger.error("解析JSON响应失败")
                                self.write_request_log(
                                    start_time,
                                    time.time(),
                                    machine.name,
                                    video_size,
                                    audio_size,
                                    target_duration,
                                    audio_duration,
                                    os.path.basename(video_file_path),
                                    os.path.basename(audio_file_path),
                                    output_file_path,
                                    False,
                                    "解析JSON响应失败",
                                )
                                return False
                        elif "video/mp4" in content_type:
                            # Video body: save it under the output-directory lock.
                            output_dir = os.path.dirname(output_file_path)
                            output_lock = self._get_output_dir_lock(output_dir)
                            try:
                                async with asyncio.timeout(300):  # 5-minute lock timeout
                                    await output_lock.acquire()
                            except asyncio.exceptions.TimeoutError:
                                logger.error(f"等待输出目录锁超时: {output_dir}")
                                self.write_request_log(
                                    start_time,
                                    time.time(),
                                    machine.name,
                                    video_size,
                                    audio_size,
                                    target_duration,
                                    audio_duration,
                                    os.path.basename(video_file_path),
                                    os.path.basename(audio_file_path),
                                    output_file_path,
                                    False,
                                    f"等待输出目录锁超时: {output_dir}",
                                )
                                return False
                            try:
                                os.makedirs(output_dir, exist_ok=True)
                                # Save with retries in case of transient I/O failures.
                                max_retries = 3
                                retry_delay = 5  # seconds
                                last_error = None
                                for attempt in range(max_retries):
                                    try:
                                        content = await response.read()
                                        if not content:
                                            raise ValueError("响应内容为空")
                                        async with aiofiles.open(output_file_path, "wb") as f:
                                            await f.write(content)
                                        # Verify the file actually landed on disk.
                                        if not os.path.exists(output_file_path) or os.path.getsize(output_file_path) == 0:
                                            raise ValueError("文件保存失败或文件大小为0")
                                        logger.info(f"成功保存视频文件: {output_file_path}")
                                        if self.task_manager and task_id:
                                            await self.task_manager.update_task_status(task_id, TaskStatus.COMPLETED)
                                        self.write_request_log(
                                            start_time,
                                            time.time(),
                                            machine.name,
                                            video_size,
                                            audio_size,
                                            target_duration,
                                            audio_duration,
                                            os.path.basename(video_file_path),
                                            os.path.basename(audio_file_path),
                                            output_file_path,
                                            True,
                                        )
                                        return True
                                    except Exception as e:
                                        last_error = e
                                        if attempt < max_retries - 1:
                                            logger.warning(f"保存视频文件失败,{retry_delay}秒后重试 ({attempt + 1}/{max_retries}): {str(e)}")
                                            await asyncio.sleep(retry_delay)
                                        else:
                                            logger.error(f"保存视频文件失败,达到最大重试次数: {str(last_error)}")
                                            if self.task_manager and task_id:
                                                await self.task_manager.update_task_status(
                                                    task_id, TaskStatus.FAILED, error_message=str(last_error)
                                                )
                                            self.write_request_log(
                                                start_time,
                                                time.time(),
                                                machine.name,
                                                video_size,
                                                audio_size,
                                                target_duration,
                                                audio_duration,
                                                os.path.basename(video_file_path),
                                                os.path.basename(audio_file_path),
                                                output_file_path,
                                                False,
                                                f"保存视频文件失败: {str(last_error)}",
                                            )
                                            return False
                            finally:
                                output_lock.release()
                        else:
                            logger.error(f"未知的响应类型: {content_type}")
                            self.write_request_log(
                                start_time,
                                time.time(),
                                machine.name,
                                video_size,
                                audio_size,
                                target_duration,
                                audio_duration,
                                os.path.basename(video_file_path),
                                os.path.basename(audio_file_path),
                                output_file_path,
                                False,
                                f"未知的响应类型: {content_type}",
                            )
                            return False
                    else:
                        error_text = await response.text()
                        try:
                            error_json = json.loads(error_text)
                            error_msg = error_json.get("msg", error_text)
                            logger.error(f"处理视频失败: HTTP {response.status} - {error_msg}")
                        except json.JSONDecodeError:
                            logger.error(f"处理视频失败: HTTP {response.status} - {error_text}")
                        self.write_request_log(
                            start_time,
                            time.time(),
                            machine.name,
                            video_size,
                            audio_size,
                            target_duration,
                            audio_duration,
                            os.path.basename(video_file_path),
                            os.path.basename(audio_file_path),
                            output_file_path,
                            False,
                            f"HTTP {response.status} - {error_text}",
                        )
                        return False
            return success
        except Exception as e:
            logger.error(f"处理视频时发生错误: {str(e)}")
            return False
        finally:
            await self.release_machine(machine)

    async def process_video(self, task_id: str, audio_file_path: str, video_file_path: str, out_file_path: str) -> Optional[str]:
        """Enqueue a processing job; returns the output path or None on failure."""
        try:
            await self.task_queue.put((task_id, audio_file_path, video_file_path, out_file_path))
            return out_file_path
        except Exception as e:
            logger.error(f"添加任务到队列失败: {str(e)}")
            return None

    def get_machine_status(self) -> List[Dict]:
        """Return a status snapshot (name, host, load, capacity) per machine."""
        return [
            {
                "name": machine.name,
                "host": machine.host,
                "current_tasks": machine.current_tasks,
                "max_concurrent": machine.max_concurrent,
                "enabled": machine.enabled,
            }
            for machine in self.machines.values()
        ]


async def main():
    """Command-line entry point: run the manager until interrupted."""
    import argparse

    parser = argparse.ArgumentParser(description="视频处理管理器")
    args = parser.parse_args()

    logger.info("启动视频处理管理器...")
    process_manager = ProcessManager(config_path="config/machines.json")
    await process_manager.start()
    try:
        # Keep the process alive; workers run in the background.
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.info("收到停止信号,正在关闭...")
    finally:
        await process_manager.stop()


if __name__ == "__main__":
    exit(asyncio.run(main()))