import asyncio
import logging
from datetime import datetime
from typing import Any, Optional

import aiohttp

from app.comfy.comfy_workflow import ComfyWorkflow
from app.comfy.comfy_run import ComfyRun
from app.config import Settings
from app.comfy.comfy_server import server_manager, ComfyUIServerInfo

settings = Settings()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Global task-queue manager
class WorkflowQueueManager:
    """Queue workflow runs and dispatch them to available ComfyUI servers.

    Runs are persisted with ``ComfyRun.create`` and their ids are kept in
    ``pending_tasks`` in FIFO order. A background monitor wakes every
    ``monitor_interval`` seconds and, when ``server_manager`` reports an
    available server, dispatches the oldest pending run to it.
    ``running_tasks`` maps a server's HTTP URL to the run currently
    dispatched there (one entry per server).
    """

    def __init__(self, monitor_interval: int = 5):
        self.running_tasks = {}  # server_url -> task_info
        self.pending_tasks = []  # FIFO of run ids waiting for a server
        self.lock = asyncio.Lock()
        self._queue_monitor_task: Optional[asyncio.Task] = None
        self._monitor_interval = monitor_interval  # seconds between monitor ticks
        self._last_processing_time = None  # when _process_queue last ran
        # Strong references to fire-and-forget tasks. The event loop keeps only
        # weak references to tasks, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule *coro* as a background task, keeping a reference until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start_queue_monitor(self):
        """Start the queue monitor task unless one is already running."""
        if self._queue_monitor_task is None or self._queue_monitor_task.done():
            self._queue_monitor_task = asyncio.create_task(self._monitor_queue())
            logger.info(f"队列监控任务已启动,监控间隔: {self._monitor_interval}秒")

    async def stop_queue_monitor(self):
        """Cancel the queue monitor task (if running) and wait for it to exit."""
        if self._queue_monitor_task and not self._queue_monitor_task.done():
            self._queue_monitor_task.cancel()
            try:
                await self._queue_monitor_task
            except asyncio.CancelledError:
                pass
            logger.info("队列监控任务已停止")

    async def add_task(
        self,
        workflow: ComfyWorkflow,
        request_data: dict,
    ):
        """Create and persist a run for *workflow*, then append it to the queue.

        Returns:
            The new run's ``run_id``.
        """
        # Persist the run before taking the lock: the lock only guards the
        # in-memory queue, and holding it across a database write would stall
        # the monitor and dispatcher.
        comfy_run = await ComfyRun.create(workflow, request_data)

        async with self.lock:
            self.pending_tasks.append(comfy_run.run_id)
            queue_length = len(self.pending_tasks)

        logger.info(
            f"任务 {comfy_run.run_id} 已添加到队列,当前队列长度: {queue_length}"
        )
        return comfy_run.run_id

    async def get_server_status(
        self, server: ComfyUIServerInfo, session: aiohttp.ClientSession
    ) -> dict[str, Any]:
        """Query a single ComfyUI server's ``/queue`` endpoint.

        Returns a dict with ``is_reachable``, ``is_free`` (no running and no
        pending prompts) and ``queue_details`` (``running_count`` and
        ``pending_count``). Any failure is logged and the defaults below are
        returned instead of raising.
        """
        # Start from the same shape as the success path so consumers (per the
        # original note, a Pydantic model) always see every key.
        status_info = {
            "is_reachable": False,
            "is_free": False,
            "queue_details": {"running_count": 0, "pending_count": 0},
        }
        try:
            queue_url = f"{server.http_url}/queue"
            timeout = aiohttp.ClientTimeout(total=60)
            async with session.get(queue_url, timeout=timeout) as response:
                response.raise_for_status()
                queue_data = await response.json()
                status_info["is_reachable"] = True
                running_count = len(queue_data.get("queue_running", []))
                pending_count = len(queue_data.get("queue_pending", []))
                status_info["queue_details"] = {
                    "running_count": running_count,
                    "pending_count": pending_count,
                }
                status_info["is_free"] = running_count == 0 and pending_count == 0
        except Exception as e:
            # Best effort: fall back to the well-formed defaults above.
            logger.warning(f"无法检查服务器 {server.http_url} 的队列状态: {e}")
        return status_info

    async def _monitor_queue(self):
        """Periodically log queue status and dispatch work when a server is free.

        Runs until cancelled. Unexpected errors are logged, followed by a
        longer back-off sleep.
        """
        while True:
            try:
                await asyncio.sleep(self._monitor_interval)

                # Log a status snapshot on every tick.
                async with self.lock:
                    pending_count = len(self.pending_tasks)
                    running_count = len(self.running_tasks)
                    current_time = datetime.now()
                    timestamp = current_time.strftime("%Y-%m-%d %H:%M:%S")

                    # Time elapsed since _process_queue last ran.
                    if self._last_processing_time:
                        time_diff = current_time - self._last_processing_time
                        minutes = int(time_diff.total_seconds() // 60)
                        seconds = int(time_diff.total_seconds() % 60)
                        time_since_last_processing = (
                            f"上次处理: {minutes}分{seconds}秒前"
                        )
                    else:
                        time_since_last_processing = "上次处理: 从未"

                    status_info = (
                        f"⏰ 定时检测触发 [{timestamp}]\t"
                        f"待处理/运行中: {pending_count}/{running_count}\t"
                        f"监控间隔: {self._monitor_interval} 秒\t"
                        f"🕐 {time_since_last_processing}"
                    )
                    logger.info(status_info)

                if self.pending_tasks:
                    available_servers = await self._get_available_servers()
                    if available_servers:
                        logger.info(
                            f"监控发现 {len(available_servers)} 个可用服务器,开始处理队列"
                        )
                        # Dispatch in the background so the monitor loop is not
                        # blocked; _spawn keeps the task referenced until done.
                        self._spawn(self._process_queue())
                    elif self.pending_tasks:
                        # Re-checked because the queue may have changed during
                        # the await above.
                        logger.debug(
                            f"队列中有 {len(self.pending_tasks)} 个待处理任务,但无可用服务器"
                        )
                else:
                    logger.debug("队列为空,继续监控中...")
            except asyncio.CancelledError:
                logger.info("队列监控任务被取消")
                # Propagate cancellation so awaiters see the task as cancelled.
                raise
            except Exception as e:
                logger.exception(f"队列监控任务出错: {e}")
                # Back off a little longer after an error to avoid tight retries.
                await asyncio.sleep(self._monitor_interval * 2)

    async def _process_queue(self):
        """Dispatch the oldest pending run to an available server, if any.

        At most one run is dispatched per call. The run stays queued when no
        server is available or the chosen server already has a tracked run.
        """
        async with self.lock:
            # Record when the queue was last processed (read by the monitor log).
            self._last_processing_time = datetime.now()

            if not self.pending_tasks:
                logger.debug("队列为空,无需处理")
                return

            available_servers = await self._get_available_servers()
            if not available_servers:
                logger.info(
                    f"队列中有 {len(self.pending_tasks)} 个待处理任务,但没有可用的服务器"
                )
                return

            server = available_servers[0]

            # running_tasks holds one entry per server URL. server_manager may
            # still report a server as available right after a run was
            # dispatched to it. Dispatching again would overwrite the first
            # entry, so leave the run queued until that server is released.
            if server.http_url in self.running_tasks:
                logger.info(
                    f"服务器 {server.name} ({server.http_url}) 仍有运行中的任务,稍后重试"
                )
                return

            workflow_run_id = self.pending_tasks.pop(0)
            logger.info(
                f"开始处理任务 {workflow_run_id},使用服务器 {server.name} ({server.http_url})"
            )

            # Track the run against its server until _execute_task finishes.
            self.running_tasks[server.http_url] = {
                "workflow_run_id": workflow_run_id,
                "started_at": datetime.now(),
                "server_name": server.name,
            }

            # Execute in the background; _spawn keeps the task referenced.
            self._spawn(self._execute_task(workflow_run_id, server))
            logger.info(
                f"任务 {workflow_run_id} 已分配到服务器 {server.name},当前运行中任务数: {len(self.running_tasks)}"
            )

    async def _get_available_servers(self) -> list[ComfyUIServerInfo]:
        """Return available servers: zero or one, as reported by ``server_manager``."""
        available_server = await server_manager.get_available_server()
        if available_server:
            return [available_server]
        return []

    async def _execute_task(self, workflow_run_id: str, server: ComfyUIServerInfo):
        """Load run *workflow_run_id*, execute it on *server*, then free the slot.

        Errors are logged with traceback and not re-raised.
        """
        try:
            comfy_run = await ComfyRun.from_run_id(workflow_run_id)
            if not comfy_run:
                raise Exception(f"找不到工作流运行记录: {workflow_run_id}")

            # ComfyRun.execute handles the rest of the run, including resource management.
            await comfy_run.execute(server)
        except Exception as e:
            logger.exception(f"执行任务 {workflow_run_id} 时出错: {e}")
        finally:
            async with self.lock:
                # Only remove our own entry; never clear a slot that has since
                # been assigned to a different run.
                task_info = self.running_tasks.get(server.http_url)
                if (
                    task_info is not None
                    and task_info.get("workflow_run_id") == workflow_run_id
                ):
                    del self.running_tasks[server.http_url]


# Global queue manager instance.
# NOTE: the monitor interval is hard-coded to 5 seconds; it is not read from Settings.
queue_manager = WorkflowQueueManager(monitor_interval=5)