235 lines
9.3 KiB
Python
235 lines
9.3 KiB
Python
import asyncio
|
||
import logging
|
||
from typing import Any, Optional
|
||
|
||
import aiohttp
|
||
|
||
from app.comfy.comfy_workflow import ComfyWorkflow
|
||
from app.comfy.comfy_run import ComfyRun
|
||
from app.config import Settings
|
||
from app.comfy.comfy_server import server_manager, ComfyUIServerInfo
|
||
|
||
# Application settings object, instantiated once when this module is imported.
settings = Settings()
|
||
|
||
# Configure the root logger at INFO level as an import-time side effect.
# (basicConfig does nothing if the root logger already has handlers.)
logging.basicConfig(level=logging.INFO)
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# Global task queue manager
|
||
class WorkflowQueueManager:
    """In-process FIFO queue that hands workflow runs to ComfyUI servers.

    Runs are registered with ``add_task`` and wait in ``pending_tasks``.
    A background monitor task wakes up every ``monitor_interval`` seconds
    and, when ``server_manager`` reports an available server, dispatches
    the oldest pending run to it.

    Attributes:
        running_tasks: Maps ``server.http_url`` to a dict with the keys
            ``workflow_run_id``, ``started_at`` and ``server_name``.
        pending_tasks: Run ids waiting to be dispatched, oldest first.
        lock: Guards ``running_tasks`` and ``pending_tasks``.
    """

    def __init__(self, monitor_interval: int = 5):
        """Create an empty queue.

        Args:
            monitor_interval: Seconds between two monitor wake-ups.
        """
        self.running_tasks: dict[str, dict[str, Any]] = {}
        self.pending_tasks: list = []
        self.lock = asyncio.Lock()
        self._queue_monitor_task: Optional[asyncio.Task] = None
        self._monitor_interval = monitor_interval
        # Time of the most recent _process_queue() call; None until the first.
        self._last_processing_time = None
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set = set()

    def _spawn_background(self, coro) -> asyncio.Task:
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_done)
        return task

    def _on_background_done(self, task: asyncio.Task) -> None:
        """Drop a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"后台任务异常退出: {exc}", exc_info=exc)

    async def start_queue_monitor(self):
        """Start the monitor task unless one is already running."""
        if self._queue_monitor_task is None or self._queue_monitor_task.done():
            self._queue_monitor_task = asyncio.create_task(self._monitor_queue())
            logger.info(f"队列监控任务已启动,监控间隔: {self._monitor_interval}秒")

    async def stop_queue_monitor(self):
        """Cancel the monitor task, if running, and wait for it to finish."""
        if self._queue_monitor_task and not self._queue_monitor_task.done():
            self._queue_monitor_task.cancel()
            try:
                await self._queue_monitor_task
            except asyncio.CancelledError:
                pass
            logger.info("队列监控任务已停止")

    async def add_task(
        self,
        workflow: ComfyWorkflow,
        request_data: dict,
    ):
        """Create a run for *workflow* and append it to the pending queue.

        Args:
            workflow: Workflow to run.
            request_data: Request payload passed through to ``ComfyRun.create``.

        Returns:
            The ``run_id`` of the newly created run.
        """
        async with self.lock:
            # ComfyRun.create is awaited while the lock is held, so run
            # creation and enqueueing happen as one step.
            comfy_run = await ComfyRun.create(workflow, request_data)

            self.pending_tasks.append(comfy_run.run_id)
            logger.info(
                f"任务 {comfy_run.run_id} 已添加到队列,当前队列长度: {len(self.pending_tasks)}"
            )

            return comfy_run.run_id

    async def get_server_status(
        self, server: ComfyUIServerInfo, session: aiohttp.ClientSession
    ) -> dict[str, Any]:
        """Query one ComfyUI server's ``/queue`` endpoint.

        Args:
            server: Server to probe; only ``http_url`` is read.
            session: aiohttp session used for the request.

        Returns:
            A dict with ``is_reachable``, ``is_free`` and ``queue_details``
            (``running_count`` / ``pending_count``). If the request fails
            for any reason, the dict keeps the same shape with
            ``is_reachable`` and ``is_free`` False and both counts 0.
        """
        # Start from the "unreachable" answer so the failure path returns
        # the same structure as the success path.
        status_info = {
            "is_reachable": False,
            "is_free": False,
            "queue_details": {"running_count": 0, "pending_count": 0},
        }
        try:
            queue_url = f"{server.http_url}/queue"
            # aiohttp expects a ClientTimeout object; a bare number is
            # deprecated.
            timeout = aiohttp.ClientTimeout(total=60)
            async with session.get(queue_url, timeout=timeout) as response:
                response.raise_for_status()
                queue_data = await response.json()

                status_info["is_reachable"] = True

                running_count = len(queue_data.get("queue_running", []))
                pending_count = len(queue_data.get("queue_pending", []))

                status_info["queue_details"] = {
                    "running_count": running_count,
                    "pending_count": pending_count,
                }

                status_info["is_free"] = running_count == 0 and pending_count == 0

        except Exception as e:
            # Best-effort probe: report the server as unreachable, don't raise.
            logger.warning(f"无法检查服务器 {server.http_url} 的队列状态: {e}")

        return status_info

    async def _monitor_queue(self):
        """Periodically log queue state and trigger dispatching.

        Runs until cancelled. Any other exception is logged and the loop
        continues after a back-off of twice the monitor interval.
        """
        from datetime import datetime

        while True:
            try:
                await asyncio.sleep(self._monitor_interval)

                async with self.lock:
                    pending_count = len(self.pending_tasks)
                    running_count = len(self.running_tasks)
                    current_time = datetime.now()
                    timestamp = current_time.strftime("%Y-%m-%d %H:%M:%S")

                    # Describe how long ago the queue was last processed.
                    if self._last_processing_time:
                        elapsed = current_time - self._last_processing_time
                        minutes, seconds = divmod(elapsed.total_seconds(), 60)
                        time_since_last_processing = (
                            f"上次处理: {int(minutes)}分{int(seconds)}秒前"
                        )
                    else:
                        time_since_last_processing = "上次处理: 从未"

                    status_info = (
                        f"⏰ 定时检测触发 [{timestamp}]\t"
                        f"待处理/运行中: {pending_count}/{running_count}\t"
                        f"监控间隔: {self._monitor_interval} 秒\t"
                        f"🕐 {time_since_last_processing}"
                    )
                    logger.info(status_info)

                    if not self.pending_tasks:
                        logger.debug("队列为空,继续监控中...")
                        continue

                    available_servers = await self._get_available_servers()
                    if available_servers:
                        logger.info(
                            f"监控发现 {len(available_servers)} 个可用服务器,开始处理队列"
                        )
                        # Dispatch in a separate task: _process_queue needs
                        # the lock this coroutine is still holding.
                        self._spawn_background(self._process_queue())
                    else:
                        logger.debug(
                            f"队列中有 {len(self.pending_tasks)} 个待处理任务,但无可用服务器"
                        )

            except asyncio.CancelledError:
                logger.info("队列监控任务被取消")
                # Propagate so the task ends up cancelled;
                # stop_queue_monitor() catches this.
                raise
            except Exception as e:
                logger.exception(f"队列监控任务出错: {e}")
                # Back off a little longer after an error to avoid a tight
                # retry loop.
                await asyncio.sleep(self._monitor_interval * 2)

    async def _process_queue(self):
        """Dispatch the oldest pending run to an available server, if any.

        At most one run is dispatched per call.
        """
        from datetime import datetime

        async with self.lock:
            self._last_processing_time = datetime.now()

            if not self.pending_tasks:
                logger.debug("队列为空,无需处理")
                return

            available_servers = await self._get_available_servers()
            if not available_servers:
                logger.info(
                    f"队列中有 {len(self.pending_tasks)} 个待处理任务,但没有可用的服务器"
                )
                return

            # FIFO: take the oldest run and the first available server.
            workflow_run_id = self.pending_tasks.pop(0)
            server = available_servers[0]

            logger.info(
                f"开始处理任务 {workflow_run_id},使用服务器 {server.name} ({server.http_url})"
            )

            self.running_tasks[server.http_url] = {
                "workflow_run_id": workflow_run_id,
                "started_at": datetime.now(),
                "server_name": server.name,
            }

            # Execute in the background; _execute_task removes the
            # running_tasks entry when the run finishes.
            self._spawn_background(self._execute_task(workflow_run_id, server))

            logger.info(
                f"任务 {workflow_run_id} 已分配到服务器 {server.name},当前运行中任务数: {len(self.running_tasks)}"
            )

    async def _get_available_servers(self) -> list[ComfyUIServerInfo]:
        """Return the server reported available by ``server_manager``.

        Returns:
            A one-element list, or an empty list when no server is available.
        """
        available_server = await server_manager.get_available_server()
        if available_server:
            return [available_server]
        return []

    async def _execute_task(self, workflow_run_id: str, server: ComfyUIServerInfo):
        """Load the run identified by *workflow_run_id* and execute it on *server*.

        Errors are logged and never propagated. The ``running_tasks`` entry
        for this run is removed whether or not execution succeeds.
        """
        try:
            comfy_run = await ComfyRun.from_run_id(workflow_run_id)
            if not comfy_run:
                raise LookupError(f"找不到工作流运行记录: {workflow_run_id}")

            await comfy_run.execute(server)

        except Exception as e:
            logger.exception(f"执行任务 {workflow_run_id} 时出错: {e}")

        finally:
            async with self.lock:
                # Only remove the entry if it still belongs to this run;
                # the same server may already have been given a newer run.
                entry = self.running_tasks.get(server.http_url)
                if entry is not None and entry.get("workflow_run_id") == workflow_run_id:
                    del self.running_tasks[server.http_url]
|
||
|
||
|
||
# 全局队列管理器实例
|
||
# NOTE: the monitor interval is hard-coded to 5 seconds here; it is not read from Settings.
|
||
# Module-level WorkflowQueueManager instance, created at import time.
queue_manager = WorkflowQueueManager(monitor_interval=5)
|