refactor: 移除不必要的监控和健康检查接口,简化代码结构,增强API的可读性和维护性

This commit is contained in:
iHeyTang 2025-08-19 15:07:31 +08:00
parent c0354b35d2
commit 0d8d9ffc4e
3 changed files with 86 additions and 366 deletions

View File

@ -1,13 +1,11 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel, HttpUrl
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Dict, List, Optional, Any
import logging
import aiohttp
import asyncio
from workflow_service.comfy.comfy_server import (
server_manager,
ComfyUIServerInfo,
ServerStatus,
)
from workflow_service.comfy.comfy_server import server_manager
logger = logging.getLogger(__name__)
@ -84,36 +82,6 @@ async def unregister_server(request: ServerUnregisterRequest):
raise HTTPException(status_code=500, detail=f"注销失败: {str(e)}")
@router.get("/status/{server_name}", response_model=ServerStatusResponse)
async def get_server_status(server_name: str):
"""获取指定服务器状态"""
try:
server_info = await server_manager.get_server_status(server_name)
if server_info is None:
raise HTTPException(status_code=404, detail=f"服务器 {server_name} 不存在")
return ServerStatusResponse(
name=server_info.name,
http_url=server_info.http_url,
ws_url=server_info.ws_url,
status=server_info.status.value,
last_health_check=(
server_info.last_health_check.isoformat()
if server_info.last_health_check
else None
),
current_tasks=server_info.current_tasks,
max_concurrent_tasks=server_info.max_concurrent_tasks,
capabilities=server_info.capabilities,
metadata=server_info.metadata,
)
except Exception as e:
logger.error(f"获取服务器 {server_name} 状态时发生错误: {e}")
raise HTTPException(status_code=500, detail=f"获取状态失败: {str(e)}")
@router.get("/list", response_model=List[ServerStatusResponse])
async def list_all_servers():
"""获取所有服务器列表"""
@ -142,49 +110,3 @@ async def list_all_servers():
except Exception as e:
logger.error(f"获取服务器列表时发生错误: {e}")
raise HTTPException(status_code=500, detail=f"获取列表失败: {str(e)}")
@router.get("/health", response_model=Dict[str, Any])
async def get_system_health():
"""获取系统整体健康状态"""
try:
servers = await server_manager.get_all_servers()
total_servers = len(servers)
online_servers = len([s for s in servers if s.status == ServerStatus.ONLINE])
busy_servers = len([s for s in servers if s.status == ServerStatus.BUSY])
offline_servers = len([s for s in servers if s.status == ServerStatus.OFFLINE])
error_servers = len([s for s in servers if s.status == ServerStatus.ERROR])
total_tasks = sum(s.current_tasks for s in servers)
total_capacity = sum(s.max_concurrent_tasks for s in servers)
return {
"total_servers": total_servers,
"online_servers": online_servers,
"busy_servers": busy_servers,
"offline_servers": offline_servers,
"error_servers": error_servers,
"current_tasks": total_tasks,
"total_capacity": total_capacity,
"utilization_rate": (
(total_tasks / total_capacity * 100) if total_capacity > 0 else 0
),
}
except Exception as e:
logger.error(f"获取系统健康状态时发生错误: {e}")
raise HTTPException(status_code=500, detail=f"获取健康状态失败: {str(e)}")
@router.post("/force_health_check", response_model=Dict[str, str])
async def force_health_check(background_tasks: BackgroundTasks):
"""强制执行健康检查"""
try:
# 在后台执行健康检查
background_tasks.add_task(server_manager._perform_health_checks)
return {"message": "健康检查已启动", "status": "success"}
except Exception as e:
logger.error(f"启动健康检查时发生错误: {e}")
raise HTTPException(status_code=500, detail=f"启动健康检查失败: {str(e)}")

View File

@ -1,29 +1,8 @@
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
import psutil
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List
import json
# 设置日志
logger = logging.getLogger(__name__)
from workflow_service.config import settings
from workflow_service.database.api import (
get_workflow_runs_recent,
get_workflow_run_nodes,
)
from workflow_service.comfy.comfy_queue import WorkflowQueueManager
from workflow_service.comfy.comfy_server import server_manager
monitor_router = APIRouter(prefix="/monitor", tags=["监控"])
# 全局队列管理器实例
queue_manager = WorkflowQueueManager()
@monitor_router.get("/", response_class=HTMLResponse)
async def monitor_dashboard():
"""监控仪表板页面"""
@ -380,20 +359,20 @@ async def monitor_dashboard():
<script>
async function refreshData() {
try {
// 获取任务状态
const taskStats = await fetch('/monitor/task-stats').then(r => r.json());
document.getElementById('running-tasks').textContent = taskStats.running_tasks;
document.getElementById('pending-tasks').textContent = taskStats.pending_tasks;
document.getElementById('completed-tasks').textContent = taskStats.completed_tasks;
document.getElementById('total-tasks').textContent = taskStats.total_tasks_24h;
// 获取运行概览 - 更新为新的API路径
const runMetrics = await fetch('/api/run/metrics').then(r => r.json());
document.getElementById('running-tasks').textContent = runMetrics.running_tasks;
document.getElementById('pending-tasks').textContent = runMetrics.pending_tasks;
document.getElementById('completed-tasks').textContent = runMetrics.completed_tasks;
document.getElementById('total-tasks').textContent = runMetrics.total_tasks_24h;
// 获取服务器状态
const serverStatus = await fetch('/monitor/server-status').then(r => r.json());
// 获取服务器状态 - 使用现有的list接口
const serverStatus = await fetch('/api/comfy/list').then(r => r.json());
updateServerStatus(serverStatus);
// 获取最近任务
const recentTasks = await fetch('/monitor/recent-tasks').then(r => r.json());
updateRecentTasks(recentTasks);
// 获取运行列表 - 更新为新的API路径
const runs = await fetch('/api/run').then(r => r.json());
updateRecentTasks(runs);
} catch (error) {
console.error('刷新数据失败:', error);
@ -517,251 +496,3 @@ async def monitor_dashboard():
</html>
"""
return HTMLResponse(content=html_content)
@monitor_router.get("/system-stats")
async def get_system_stats() -> Dict[str, Any]:
"""获取系统统计信息"""
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage("/")
return {
"cpu_percent": round(cpu_percent, 1),
"memory_percent": round(memory.percent, 1),
"memory_used_gb": round(memory.used / (1024**3), 2),
"memory_total_gb": round(memory.total / (1024**3), 2),
"disk_percent": round(disk.percent, 1),
"disk_used_gb": round(disk.used / (1024**3), 2),
"disk_total_gb": round(disk.total / (1024**3), 2),
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取系统统计信息失败: {str(e)}")
@monitor_router.get("/task-stats")
async def get_task_stats() -> Dict[str, Any]:
"""获取任务统计信息"""
try:
# 获取最近24小时的任务统计
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)
recent_runs = await get_workflow_runs_recent(start_time, end_time)
# 统计各种状态的任务数量
status_counts = {}
for run in recent_runs:
status = run.get("status", "unknown")
status_counts[status] = status_counts.get(status, 0) + 1
return {
"running_tasks": status_counts.get("running", 0),
"pending_tasks": status_counts.get("pending", 0),
"completed_tasks": status_counts.get("completed", 0),
"failed_tasks": status_counts.get("failed", 0),
"total_tasks_24h": len(recent_runs),
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取任务统计信息失败: {str(e)}")
@monitor_router.get("/server-status")
async def get_server_status() -> List[Dict[str, Any]]:
"""获取服务器状态信息"""
try:
import aiohttp
import asyncio
# 从 ComfyUIServerManager 获取所有注册的服务器
all_servers = await server_manager.get_all_servers()
if not all_servers:
logger.info("当前没有动态注册的服务器")
return []
async def check_server_status(server):
try:
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(
f"{server.http_url}/system_stats"
) as response:
if response.status == 200:
return {
"name": server.name,
"http_url": server.http_url,
"ws_url": server.ws_url,
"status": "online",
"response_time": response.headers.get(
"X-Response-Time", "N/A"
),
"current_tasks": getattr(server, "current_tasks", 0),
"max_concurrent_tasks": getattr(
server, "max_concurrent_tasks", 1
),
"last_health_check": getattr(
server, "last_health_check", None
),
}
else:
return {
"name": server.name,
"http_url": server.http_url,
"ws_url": server.ws_url,
"status": "offline",
"error": f"HTTP {response.status}",
"current_tasks": getattr(server, "current_tasks", 0),
"max_concurrent_tasks": getattr(
server, "max_concurrent_tasks", 1
),
"last_health_check": getattr(
server, "last_health_check", None
),
}
except Exception as e:
return {
"name": server.name,
"http_url": server.http_url,
"ws_url": server.ws_url,
"status": "offline",
"error": str(e),
"current_tasks": getattr(server, "current_tasks", 0),
"max_concurrent_tasks": getattr(server, "max_concurrent_tasks", 1),
"last_health_check": getattr(server, "last_health_check", None),
}
# 并发检查所有服务器状态
tasks = [check_server_status(server) for server in all_servers]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤掉异常结果
valid_results = []
for result in results:
if isinstance(result, dict):
valid_results.append(result)
return valid_results
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取服务器状态失败: {str(e)}")
@monitor_router.get("/recent-tasks")
async def get_recent_tasks(limit: int = 10) -> List[Dict[str, Any]]:
"""获取最近的任务列表"""
try:
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)
recent_runs = await get_workflow_runs_recent(start_time, end_time)
# 限制返回数量
limited_runs = recent_runs[:limit]
# 格式化返回数据
formatted_runs = []
for run in limited_runs:
formatted_runs.append(
{
"id": run.get("id"), # 使用数据库中的id字段
"workflow_name": run.get("workflow_name"),
"status": run.get("status", "unknown"),
"created_at": run.get("created_at"),
"updated_at": run.get("updated_at"),
"api_spec": run.get("api_spec"),
}
)
return formatted_runs
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取最近任务失败: {str(e)}")
@monitor_router.get("/health")
async def health_check() -> Dict[str, Any]:
"""健康检查端点"""
try:
# 检查数据库连接
from workflow_service.database.connection import get_db
db = get_db()
# 从 ComfyUIServerManager 获取服务器信息
all_servers = await server_manager.get_all_servers()
dynamic_servers_count = len(all_servers)
# 检查服务器健康状态
online_servers = [s for s in all_servers if s.status == "online"]
offline_servers = [s for s in all_servers if s.status != "online"]
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": "1.0.0",
"database": "connected",
"servers": {
"total_dynamic": dynamic_servers_count,
"online": len(online_servers),
"offline": len(offline_servers),
},
"uptime": "N/A", # 可以添加启动时间跟踪
}
except Exception as e:
return {
"status": "unhealthy",
"timestamp": datetime.now().isoformat(),
"error": str(e),
}
@monitor_router.get("/queue/status")
async def get_queue_status():
"""获取队列状态"""
try:
status = await queue_manager.get_queue_status()
return {
"success": True,
"data": status,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"获取队列状态失败: {e}")
raise HTTPException(status_code=500, detail=f"获取队列状态失败: {str(e)}")
@monitor_router.post("/queue/trigger")
async def trigger_queue_processing():
"""手动触发队列处理"""
try:
await queue_manager.trigger_queue_processing()
return {
"success": True,
"message": "队列处理已手动触发",
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"手动触发队列处理失败: {e}")
raise HTTPException(status_code=500, detail=f"手动触发队列处理失败: {str(e)}")
@monitor_router.post("/queue/interval")
async def set_monitor_interval(interval: int):
"""设置队列监控间隔"""
try:
if interval < 1:
raise HTTPException(status_code=400, detail="监控间隔不能小于1秒")
await queue_manager.set_monitor_interval(interval)
return {
"success": True,
"message": f"队列监控间隔已设置为 {interval}",
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"设置监控间隔失败: {e}")
raise HTTPException(status_code=500, detail=f"设置监控间隔失败: {str(e)}")

View File

@ -1,12 +1,13 @@
import json
from typing import Dict, Optional
from typing import Dict, Optional, List, Any
from datetime import datetime, timedelta
from fastapi import APIRouter, Body, HTTPException
from fastapi.responses import JSONResponse
from workflow_service.comfy import comfy_workflow
from workflow_service.comfy.comfy_queue import queue_manager
from workflow_service.database.api import get_workflow
from workflow_service.database.api import get_workflow, get_workflow_runs_recent
run_router = APIRouter(
prefix="/api/run",
@ -62,6 +63,72 @@ async def run_workflow(
raise HTTPException(status_code=500, detail=f"提交工作流失败: {str(e)}")
@run_router.get("")
async def get_runs(
limit: int = 10, status: Optional[str] = None
) -> List[Dict[str, Any]]:
"""获取运行列表,支持分页和状态过滤"""
try:
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)
recent_runs = await get_workflow_runs_recent(start_time, end_time)
# 如果指定了状态过滤
if status:
recent_runs = [run for run in recent_runs if run.get("status") == status]
# 限制返回数量
limited_runs = recent_runs[:limit]
# 格式化返回数据
formatted_runs = []
for run in limited_runs:
formatted_runs.append(
{
"id": run.get("id"), # 使用数据库中的id字段
"workflow_name": run.get("workflow_name"),
"status": run.get("status", "unknown"),
"created_at": run.get("created_at"),
"updated_at": run.get("updated_at"),
"api_spec": run.get("api_spec"),
}
)
return formatted_runs
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取运行列表失败: {str(e)}")
@run_router.get("/metrics")
async def get_run_metrics() -> Dict[str, Any]:
"""获取运行概览统计信息"""
try:
# 获取最近24小时的任务统计
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)
recent_runs = await get_workflow_runs_recent(start_time, end_time)
# 统计各种状态的任务数量
status_counts = {}
for run in recent_runs:
status = run.get("status", "unknown")
status_counts[status] = status_counts.get(status, 0) + 1
return {
"running_tasks": status_counts.get("running", 0),
"pending_tasks": status_counts.get("pending", 0),
"completed_tasks": status_counts.get("completed", 0),
"failed_tasks": status_counts.get("failed", 0),
"total_tasks_24h": len(recent_runs),
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取运行概览失败: {str(e)}")
@run_router.get("/{workflow_run_id}")
async def get_run_status(workflow_run_id: str):
"""