# Source: ComfyUI-WorkflowPublisher/app/comfy/comfy_queue.py
# (Reported by the repository web viewer as 235 lines, 9.3 KiB, Python.)
#
# NOTE: this file contains non-ASCII characters (CJK text, full-width
# punctuation and emoji inside log messages). The web viewer flagged them as
# "ambiguous Unicode characters"; they appear to be intentional.

import asyncio
import logging
from typing import Any, Optional
import aiohttp
from app.comfy.comfy_workflow import ComfyWorkflow
from app.comfy.comfy_run import ComfyRun
from app.config import Settings
from app.comfy.comfy_server import server_manager, ComfyUIServerInfo
# Application settings, instantiated once at import time.
# NOTE(review): `settings` is not referenced anywhere else in the visible part
# of this file — confirm that other modules import it before removing it.
settings = Settings()
# Configure the root logger at INFO level as an import-time side effect.
# (logging.basicConfig is a no-op if the root logger already has handlers.)
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by WorkflowQueueManager.
logger = logging.getLogger(__name__)
# Global workflow task queue manager.
class WorkflowQueueManager:
    """In-memory FIFO queue that hands workflow runs to ComfyUI servers.

    Runs are enqueued by ``add_task`` (only their ``run_id`` is kept in
    memory). A background monitor task wakes up every ``monitor_interval``
    seconds, logs the queue state and, when work is pending and
    ``server_manager`` reports an available server, schedules
    ``_process_queue`` to dispatch one run.

    Attributes:
        running_tasks: Maps a server's ``http_url`` to a dict with
            ``workflow_run_id``, ``started_at`` and ``server_name``.
        pending_tasks: ``run_id`` values waiting for a server, oldest first.
        lock: ``asyncio.Lock`` guarding ``pending_tasks`` / ``running_tasks``.
    """

    def __init__(self, monitor_interval: int = 5):
        """Create an empty queue.

        Args:
            monitor_interval: Seconds between two monitor wake-ups.
        """
        self.running_tasks = {}  # server http_url -> task info dict
        self.pending_tasks = []  # run_ids waiting to be dispatched (FIFO)
        self.lock = asyncio.Lock()
        self._queue_monitor_task: Optional[asyncio.Task] = None
        self._monitor_interval = monitor_interval  # monitor period, seconds
        self._last_processing_time = None  # datetime of the last _process_queue call
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set = set()

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start_queue_monitor(self):
        """Start the background monitor task unless one is already running."""
        if self._queue_monitor_task is None or self._queue_monitor_task.done():
            self._queue_monitor_task = asyncio.create_task(self._monitor_queue())
            logger.info(f"队列监控任务已启动,监控间隔: {self._monitor_interval}")

    async def stop_queue_monitor(self):
        """Cancel the background monitor task and wait for it to finish."""
        if self._queue_monitor_task and not self._queue_monitor_task.done():
            self._queue_monitor_task.cancel()
            try:
                await self._queue_monitor_task
            except asyncio.CancelledError:
                # Expected: this is the cancellation we just requested.
                pass
            logger.info("队列监控任务已停止")

    async def add_task(
        self,
        workflow: ComfyWorkflow,
        request_data: dict,
    ):
        """Create a run for ``workflow`` and append it to the pending queue.

        Args:
            workflow: The workflow to run.
            request_data: Request payload forwarded to ``ComfyRun.create``.

        Returns:
            The ``run_id`` of the newly created run.
        """
        async with self.lock:
            # Per the original comment, ComfyRun.create also persists the run
            # to the database (not verifiable from this file).
            comfy_run = await ComfyRun.create(workflow, request_data)
            self.pending_tasks.append(comfy_run.run_id)
            logger.info(
                f"任务 {comfy_run.run_id} 已添加到队列,当前队列长度: {len(self.pending_tasks)}"
            )
            return comfy_run.run_id

    async def get_server_status(
        self, server: ComfyUIServerInfo, session: aiohttp.ClientSession
    ) -> dict[str, Any]:
        """Query one ComfyUI server's ``/queue`` endpoint.

        Args:
            server: Server to probe; its ``http_url`` is used as base URL.
            session: aiohttp session used for the request.

        Returns:
            A dict with ``is_reachable``, ``is_free`` and ``queue_details``
            (``running_count`` / ``pending_count``). Any failure is logged
            as a warning and reported through the flags; nothing is raised.
        """
        # The failure-path result has the same shape as the success-path one
        # (the original note says a Pydantic model consumes this dict).
        status_info = {
            "is_reachable": False,
            "is_free": False,
            "queue_details": {"running_count": 0, "pending_count": 0},
        }
        try:
            queue_url = f"{server.http_url}/queue"
            # Use an explicit ClientTimeout: passing a bare number as
            # ``timeout`` is deprecated by aiohttp.
            request_timeout = aiohttp.ClientTimeout(total=60)
            async with session.get(queue_url, timeout=request_timeout) as response:
                response.raise_for_status()
                queue_data = await response.json()
            status_info["is_reachable"] = True
            running_count = len(queue_data.get("queue_running", []))
            pending_count = len(queue_data.get("queue_pending", []))
            status_info["queue_details"] = {
                "running_count": running_count,
                "pending_count": pending_count,
            }
            status_info["is_free"] = running_count == 0 and pending_count == 0
        except Exception as e:
            # Best-effort probe: report what was established so far.
            logger.warning(f"无法检查服务器 {server.http_url} 的队列状态: {e}")
        return status_info

    async def _monitor_queue(self):
        """Periodically log queue state and trigger dispatching.

        Runs until cancelled. The lock is held only while taking the state
        snapshot; the server-availability probe runs outside it so that
        ``add_task`` is not blocked by network I/O. The probe is only a
        hint: ``_process_queue`` re-checks everything under the lock.
        """
        from datetime import datetime

        while True:
            try:
                await asyncio.sleep(self._monitor_interval)

                async with self.lock:
                    pending_count = len(self.pending_tasks)
                    running_count = len(self.running_tasks)
                    last_processed = self._last_processing_time

                current_time = datetime.now()
                timestamp = current_time.strftime("%Y-%m-%d %H:%M:%S")

                # Human-readable time since the queue was last processed.
                if last_processed:
                    elapsed = current_time - last_processed
                    minutes, seconds = divmod(int(elapsed.total_seconds()), 60)
                    # The separator between minutes and seconds is required;
                    # without it "1 min 30 s" would read as "130 s".
                    time_since_last_processing = (
                        f"上次处理: {minutes}分{seconds}秒前"
                    )
                else:
                    time_since_last_processing = "上次处理: 从未"

                status_info = (
                    f"⏰ 定时检测触发 [{timestamp}]\t"
                    f"待处理/运行中: {pending_count}/{running_count}\t"
                    f"监控间隔: {self._monitor_interval}\t"
                    f"🕐 {time_since_last_processing}"
                )
                logger.info(status_info)

                if not self.pending_tasks:
                    logger.debug("队列为空,继续监控中...")
                    continue

                available_servers = await self._get_available_servers()
                if available_servers:
                    logger.info(
                        f"监控发现 {len(available_servers)} 个可用服务器,开始处理队列"
                    )
                    # Dispatch in a separate task so the monitor loop keeps
                    # its cadence.
                    self._spawn(self._process_queue())
                elif self.pending_tasks:
                    # Only log while work is actually waiting.
                    logger.debug(
                        f"队列中有 {len(self.pending_tasks)} 个待处理任务,但无可用服务器"
                    )
            except asyncio.CancelledError:
                logger.info("队列监控任务被取消")
                break
            except Exception as e:
                logger.exception(f"队列监控任务出错: {e}")
                # Back off for two intervals to avoid a tight error loop.
                await asyncio.sleep(self._monitor_interval * 2)

    async def _process_queue(self):
        """Dispatch at most one pending run to an available server.

        Holds ``self.lock`` for the whole operation so that popping the run
        and registering it in ``running_tasks`` happen atomically with
        respect to ``add_task`` and task clean-up.
        """
        from datetime import datetime

        async with self.lock:
            self._last_processing_time = datetime.now()

            if not self.pending_tasks:
                logger.debug("队列为空,无需处理")
                return

            available_servers = await self._get_available_servers()
            if not available_servers:
                logger.info(
                    f"队列中有 {len(self.pending_tasks)} 个待处理任务,但没有可用的服务器"
                )
                return

            # Oldest pending run goes to the first available server.
            workflow_run_id = self.pending_tasks.pop(0)
            server = available_servers[0]
            logger.info(
                f"开始处理任务 {workflow_run_id},使用服务器 {server.name} ({server.http_url})"
            )

            self.running_tasks[server.http_url] = {
                "workflow_run_id": workflow_run_id,
                "started_at": datetime.now(),
                "server_name": server.name,
            }

            # Run the workflow in the background; _execute_task removes the
            # running_tasks entry when it finishes.
            self._spawn(self._execute_task(workflow_run_id, server))
            logger.info(
                f"任务 {workflow_run_id} 已分配到服务器 {server.name},当前运行中任务数: {len(self.running_tasks)}"
            )

    async def _get_available_servers(self) -> list[ComfyUIServerInfo]:
        """Return a list holding the server reported by ``server_manager``.

        The list is empty when ``server_manager.get_available_server()``
        returns a falsy value, otherwise it contains exactly that server.
        """
        available_server = await server_manager.get_available_server()
        return [available_server] if available_server else []

    async def _execute_task(self, workflow_run_id: str, server: ComfyUIServerInfo):
        """Load the run identified by ``workflow_run_id`` and execute it.

        Errors are logged (with traceback) and not re-raised. In every case
        the ``running_tasks`` entry for ``server`` is removed afterwards,
        provided it still belongs to this run.
        """
        try:
            comfy_run = await ComfyRun.from_run_id(workflow_run_id)
            if not comfy_run:
                raise RuntimeError(f"找不到工作流运行记录: {workflow_run_id}")
            # Per the original comment, ComfyRun.execute handles all run
            # logic including resource management.
            await comfy_run.execute(server)
        except Exception as e:
            logger.exception(f"执行任务 {workflow_run_id} 时出错: {e}")
        finally:
            async with self.lock:
                # Only drop the entry if it is ours: a newer run assigned to
                # the same server may have replaced it in the meantime.
                entry = self.running_tasks.get(server.http_url)
                if entry is not None and entry.get("workflow_run_id") == workflow_run_id:
                    del self.running_tasks[server.http_url]
# Module-level (global) queue manager instance.
# NOTE(review): the original comment stated that the monitor interval is read
# from configuration (default 5 seconds), but the value is hard-coded to 5
# here — confirm whether it should be taken from `settings` instead.
queue_manager = WorkflowQueueManager(monitor_interval=5)