from fastapi import APIRouter, HTTPException
from fastapi.responses import HTMLResponse
import psutil
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List
import json
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
from workflow_service.config import settings
from workflow_service.database.api import get_workflow_runs_recent, get_workflow_run_nodes
from workflow_service.comfy.comfy_queue import WorkflowQueueManager
from workflow_service.comfy.comfy_server import server_manager
# Router collecting every monitoring endpoint in this module under the
# "/monitor" URL prefix.
monitor_router = APIRouter(prefix="/monitor", tags=["监控"])
# Global queue manager instance, created once at import time and used by the
# /queue/* endpoints in this module.
queue_manager = WorkflowQueueManager()
@monitor_router.get("/", response_class=HTMLResponse)
async def monitor_dashboard():
    """Serve the monitoring dashboard page.

    Returns:
        An ``HTMLResponse`` wrapping the static dashboard markup.
    """
    page_markup = "\nComfyUI 工作流服务监控\n"
    return HTMLResponse(content=page_markup)
@monitor_router.get("/system-stats")
async def get_system_stats() -> Dict[str, Any]:
    """Return a snapshot of host CPU, memory and root-disk usage.

    Returns:
        A dict with ``cpu_percent``, ``memory_percent``, ``memory_used_gb``,
        ``memory_total_gb``, ``disk_percent``, ``disk_used_gb``,
        ``disk_total_gb`` (sizes expressed in units of 1024**3 bytes) and an
        ISO-8601 ``timestamp`` taken from the local clock.

    Raises:
        HTTPException: status 500 if any of the psutil calls fails.
    """
    bytes_per_gb = 1024 ** 3
    try:
        # cpu_percent(interval=1) blocks for the whole one-second sampling
        # window. Run it in the default executor so the event loop can keep
        # serving other requests in the meantime.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(
            None, lambda: psutil.cpu_percent(interval=1)
        )
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        return {
            "cpu_percent": round(cpu_percent, 1),
            "memory_percent": round(memory.percent, 1),
            "memory_used_gb": round(memory.used / bytes_per_gb, 2),
            "memory_total_gb": round(memory.total / bytes_per_gb, 2),
            "disk_percent": round(disk.percent, 1),
            "disk_used_gb": round(disk.used / bytes_per_gb, 2),
            "disk_total_gb": round(disk.total / bytes_per_gb, 2),
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"获取系统统计信息失败: {str(e)}"
        ) from e
@monitor_router.get("/task-stats")
async def get_task_stats() -> Dict[str, Any]:
    """Summarise workflow runs from the 24 hours ending now, grouped by status.

    Runs are fetched with ``get_workflow_runs_recent`` and tallied by their
    ``status`` entry; a run without that entry is counted as ``'unknown'``.

    Returns:
        A dict with the running / pending / completed / failed counts, the
        total number of runs in the window and an ISO-8601 ``timestamp``.

    Raises:
        HTTPException: status 500 if fetching or tallying the runs fails.
    """
    try:
        # Query window: the 24 hours ending now.
        window_end = datetime.now()
        window_start = window_end - timedelta(hours=24)
        runs = await get_workflow_runs_recent(window_start, window_end)

        # Tally how many runs are in each status.
        tally = {}
        for record in runs:
            state = record.get('status', 'unknown')
            tally[state] = tally.get(state, 0) + 1

        summary = {
            "running_tasks": tally.get('running', 0),
            "pending_tasks": tally.get('pending', 0),
            "completed_tasks": tally.get('completed', 0),
            "failed_tasks": tally.get('failed', 0),
        }
        summary["total_tasks_24h"] = len(runs)
        summary["timestamp"] = datetime.now().isoformat()
        return summary
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取任务统计信息失败: {str(e)}")
@monitor_router.get("/server-status")
async def get_server_status() -> List[Dict[str, Any]]:
    """Probe every registered ComfyUI server and report its status.

    Each server returned by ``server_manager.get_all_servers()`` is probed
    concurrently with ``GET {http_url}/system_stats`` (5 second total
    timeout). An HTTP 200 marks the server ``"online"``; any other status
    code, or any exception during the request, marks it ``"offline"`` with
    an ``error`` description.

    Returns:
        One dict per probed server containing ``http_url``, ``ws_url``,
        ``status``, either ``response_time`` (online) or ``error`` (offline),
        and the server's ``current_tasks``, ``max_concurrent_tasks``,
        ``last_heartbeat`` and ``last_health_check`` attributes (with
        defaults when an attribute is missing). An empty list is returned
        when no server is registered.

    Raises:
        HTTPException: status 500 if listing or probing the servers fails
            as a whole.
    """
    try:
        # Kept as a function-scope import, as in the original; this module
        # does not import aiohttp at top level.
        import aiohttp

        # Fetch every server registered with the ComfyUI server manager.
        all_servers = await server_manager.get_all_servers()
        if not all_servers:
            logger.info("当前没有动态注册的服务器")
            return []

        def build_entry(server, status, **extra):
            # Single place that shapes a status record. `extra` carries the
            # status-specific field (response_time or error), inserted right
            # after "status" to keep the original key order.
            entry = {
                "http_url": server.http_url,
                "ws_url": server.ws_url,
                "status": status,
            }
            entry.update(extra)
            entry["current_tasks"] = getattr(server, 'current_tasks', 0)
            entry["max_concurrent_tasks"] = getattr(server, 'max_concurrent_tasks', 1)
            entry["last_heartbeat"] = getattr(server, 'last_heartbeat', None)
            entry["last_health_check"] = getattr(server, 'last_health_check', None)
            return entry

        async def check_server_status(session, server):
            # Probe one server; request failures become an "offline" record
            # instead of propagating.
            try:
                async with session.get(f"{server.http_url}/system_stats") as response:
                    if response.status == 200:
                        return build_entry(
                            server,
                            "online",
                            response_time=response.headers.get('X-Response-Time', 'N/A'),
                        )
                    return build_entry(
                        server, "offline", error=f"HTTP {response.status}"
                    )
            except Exception as e:
                return build_entry(server, "offline", error=str(e))

        # One session is shared by all probes instead of opening a new
        # session per server; the session timeout is applied to each request.
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # Probe all servers concurrently.
            results = await asyncio.gather(
                *(check_server_status(session, server) for server in all_servers),
                return_exceptions=True,
            )

        # Keep only well-formed records; anything else (an exception object
        # returned by gather) is logged rather than dropped silently.
        valid_results = []
        for result in results:
            if isinstance(result, dict):
                valid_results.append(result)
            else:
                logger.warning("服务器状态检查结果异常,已跳过: %r", result)
        return valid_results
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"获取服务器状态失败: {str(e)}"
        ) from e
@monitor_router.get("/recent-tasks")
async def get_recent_tasks(limit: int = 10) -> List[Dict[str, Any]]:
    """Return up to ``limit`` workflow runs from the 24 hours ending now.

    Runs are taken in the order ``get_workflow_runs_recent`` returns them.

    Args:
        limit: Maximum number of runs to return. Negative values are treated
            as 0 so the result is always bounded by ``limit``.

    Returns:
        A list of dicts with ``id``, ``workflow_name``, ``status``
        (``'unknown'`` when absent), ``created_at``, ``updated_at`` and
        ``api_spec`` for each selected run.

    Raises:
        HTTPException: status 500 if fetching or formatting the runs fails.
    """
    # A negative limit would turn the slice below into "everything except
    # the last N runs" instead of a bounded list, so clamp it to zero.
    limit = max(limit, 0)
    try:
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=24)
        recent_runs = await get_workflow_runs_recent(start_time, end_time)
        # Cap the result size and project each run onto the public fields.
        return [
            {
                "id": run.get('id'),  # the id field as stored in the database
                "workflow_name": run.get('workflow_name'),
                "status": run.get('status', 'unknown'),
                "created_at": run.get('created_at'),
                "updated_at": run.get('updated_at'),
                "api_spec": run.get('api_spec'),
            }
            for run in recent_runs[:limit]
        ]
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"获取最近任务失败: {str(e)}"
        ) from e
@monitor_router.get("/health")
async def health_check() -> Dict[str, Any]:
    """Health-check endpoint for the workflow service.

    Calls ``get_db()`` and counts the registered servers by their ``status``
    attribute. Any exception raised along the way yields an ``"unhealthy"``
    payload instead of an error response.

    Returns:
        On success, a dict with ``status`` ``"healthy"``, a timestamp, a
        version string, the database state, server counts and an ``uptime``
        placeholder. On failure, a dict with ``status`` ``"unhealthy"``, a
        timestamp and the error text.
    """
    try:
        # Database check.
        # NOTE(review): the object returned by get_db() is never used; only
        # an exception raised by the import or the call marks the service as
        # unhealthy. Confirm that get_db() actually touches the database.
        from workflow_service.database.connection import get_db
        db_handle = get_db()

        # Pull the server list from the ComfyUI server manager.
        servers = await server_manager.get_all_servers()
        total_servers = len(servers)

        # Split the servers by health state; only the counts are reported.
        online_count = sum(1 for srv in servers if srv.status == 'online')
        offline_count = sum(1 for srv in servers if srv.status != 'online')

        server_summary = {
            "total_dynamic": total_servers,
            "online": online_count,
            "offline": offline_count,
        }
        report = {
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "version": "1.0.0",
            "database": "connected",
            "servers": server_summary,
            "uptime": "N/A",  # start-time tracking could be added here
        }
        return report
    except Exception as e:
        failure = {"status": "unhealthy"}
        failure["timestamp"] = datetime.now().isoformat()
        failure["error"] = str(e)
        return failure
@monitor_router.get("/queue/status")
async def get_queue_status():
    """Report the queue status provided by the global queue manager.

    Returns:
        A dict with ``success`` set to True, the manager's status under
        ``data`` and an ISO-8601 ``timestamp``.

    Raises:
        HTTPException: status 500 if the queue manager call fails; the
            failure is logged first.
    """
    try:
        queue_state = await queue_manager.get_queue_status()
        payload = {"success": True, "data": queue_state}
        payload["timestamp"] = datetime.now().isoformat()
        return payload
    except Exception as e:
        logger.error(f"获取队列状态失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取队列状态失败: {str(e)}")
@monitor_router.post("/queue/trigger")
async def trigger_queue_processing():
    """Manually trigger queue processing on the global queue manager.

    Returns:
        A dict with ``success`` set to True, a confirmation ``message`` and
        an ISO-8601 ``timestamp``.

    Raises:
        HTTPException: status 500 if the queue manager call fails; the
            failure is logged first.
    """
    try:
        await queue_manager.trigger_queue_processing()
        confirmation = {"success": True, "message": "队列处理已手动触发"}
        confirmation["timestamp"] = datetime.now().isoformat()
        return confirmation
    except Exception as e:
        logger.error(f"手动触发队列处理失败: {e}")
        raise HTTPException(status_code=500, detail=f"手动触发队列处理失败: {str(e)}")
@monitor_router.post("/queue/interval")
async def set_monitor_interval(interval: int):
    """Set the queue monitoring interval on the global queue manager.

    Args:
        interval: Requested monitoring interval in seconds; must be at
            least 1.

    Returns:
        A dict with ``success`` set to True, a confirmation ``message`` and
        an ISO-8601 ``timestamp``.

    Raises:
        HTTPException: status 400 if ``interval`` is smaller than 1;
            status 500 if the queue manager call fails (logged first).
    """
    # Validate before entering the try block: inside it, the generic
    # `except Exception` handler would catch this HTTPException and re-raise
    # it as a 500, hiding the 400 from the client.
    if interval < 1:
        raise HTTPException(status_code=400, detail="监控间隔不能小于1秒")
    try:
        await queue_manager.set_monitor_interval(interval)
        return {
            "success": True,
            "message": f"队列监控间隔已设置为 {interval} 秒",
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        logger.error(f"设置监控间隔失败: {e}")
        raise HTTPException(
            status_code=500, detail=f"设置监控间隔失败: {str(e)}"
        ) from e