This commit is contained in:
imeepos 2025-08-08 13:19:18 +08:00
parent 050c706ab1
commit 0899996963
9 changed files with 302 additions and 39 deletions

View File

@ -429,6 +429,41 @@ impl AppState {
Ok(())
}
/// 启动监控服务
pub async fn start_monitoring_service(&self) -> anyhow::Result<()> {
use crate::business::services::workflow_monitoring_service::{WorkflowMonitoringService, MonitoringConfig};
info!("正在启动监控服务...");
// 获取必要的仓库
let execution_record_repo = {
let repo_guard = self.workflow_execution_record_repository.lock().unwrap();
repo_guard.as_ref()
.ok_or_else(|| anyhow::anyhow!("工作流执行记录仓库未初始化"))?
.clone()
};
let execution_environment_repo = {
let repo_guard = self.workflow_execution_environment_repository.lock().unwrap();
repo_guard.as_ref()
.ok_or_else(|| anyhow::anyhow!("工作流执行环境仓库未初始化"))?
.clone()
};
// 创建监控服务
let monitoring_service = WorkflowMonitoringService::new(
execution_record_repo,
execution_environment_repo,
Some(MonitoringConfig::default()),
);
// 启动监控服务
monitoring_service.start_monitoring().await?;
info!("监控服务启动成功");
Ok(())
}
}
impl Default for AppState {

View File

@ -11,7 +11,7 @@ use crate::infrastructure::database::Database;
use anyhow::{Result, anyhow};
use serde_json::Value;
use std::sync::Arc;
use tracing::{info, debug};
use tracing::{info, debug, warn};
/// 通用工作流执行服务
///
@ -100,9 +100,13 @@ impl UniversalWorkflowService {
request: ExecuteWorkflowRequest,
) -> Result<ExecuteWorkflowResponse> {
info!("开始执行工作流: {}", request.workflow_identifier);
info!("执行请求详情: identifier='{}', version={:?}, environment_id={:?}",
request.workflow_identifier, request.version, request.environment_id);
// 1. 获取工作流模板
let template = self.get_workflow_template(&request.workflow_identifier, request.version.as_deref()).await?;
info!("成功获取工作流模板: ID={}, 名称='{}', 类型={:?}",
template.id.unwrap_or(-1), template.name, template.workflow_type);
// 2. 验证输入数据
template.validate_input(&request.input_data).map_err(|e| anyhow!(e))?;
@ -175,30 +179,58 @@ impl UniversalWorkflowService {
identifier: &str,
version: Option<&str>,
) -> Result<WorkflowTemplate> {
info!("正在查找工作流模板: identifier='{}', version={:?}", identifier, version);
// 首先尝试按ID查询如果identifier是数字
if let Ok(id) = identifier.parse::<i64>() {
info!("尝试按ID查询工作流模板: {}", id);
if let Some(template) = self.workflow_template_repository.find_by_id(id)? {
info!("成功找到工作流模板: ID={}, 名称='{}', 类型={:?}",
template.id.unwrap_or(-1), template.name, template.workflow_type);
return Ok(template);
} else {
warn!("未找到ID为 {} 的工作流模板", id);
}
} else {
info!("identifier '{}' 不是有效的数字ID尝试按base_name查询", identifier);
}
// 按base_name查询
if let Some(version) = version {
// 查询指定版本
info!("按base_name和版本查询: base_name='{}', version='{}'", identifier, version);
let templates = self.workflow_template_repository.find_by_base_name(identifier)?;
info!("找到 {} 个base_name为 '{}' 的模板", templates.len(), identifier);
for template in templates {
if template.version == version {
info!("找到匹配版本的模板: ID={}, 名称='{}'",
template.id.unwrap_or(-1), template.name);
return Ok(template);
}
}
return Err(anyhow!("未找到工作流模板: {} 版本 {}", identifier, version));
} else {
// 查询最新版本
info!("按base_name查询最新版本: '{}'", identifier);
if let Some(template) = self.workflow_template_repository.get_latest_version(identifier)? {
info!("找到最新版本模板: ID={}, 名称='{}', 版本='{}'",
template.id.unwrap_or(-1), template.name, template.version);
return Ok(template);
} else {
warn!("未找到base_name为 '{}' 的模板", identifier);
}
}
// 如果都没找到,列出所有可用的模板进行调试
let all_templates = self.workflow_template_repository.find_all(None)?;
warn!("未找到工作流模板 '{}', 数据库中共有 {} 个模板:", identifier, all_templates.len());
for template in &all_templates {
warn!(" 模板: ID={}, 名称='{}', base_name='{}', 版本='{}', 类型={:?}",
template.id.unwrap_or(-1), template.name, template.base_name,
template.version, template.workflow_type);
}
Err(anyhow!("未找到工作流模板: {}", identifier))
}
@ -223,11 +255,29 @@ impl UniversalWorkflowService {
// 自动选择最佳环境
// 获取支持当前工作流类型的可用环境,按优先级排序
let workflow_type_str = serde_json::to_string(&template.workflow_type)?;
let workflow_type_str = template.workflow_type.to_string();
info!("正在查找支持工作流类型 '{}' 的执行环境", workflow_type_str);
let available_environments = self.workflow_execution_environment_repository
.find_available_environments(Some(&workflow_type_str))?;
info!("找到 {} 个支持工作流类型 '{}' 的可用执行环境", available_environments.len(), workflow_type_str);
if available_environments.is_empty() {
// 获取所有执行环境进行调试
let all_environments = self.workflow_execution_environment_repository.find_all(None)?;
warn!("没有找到支持工作流类型 '{}' 的执行环境", workflow_type_str);
warn!("数据库中共有 {} 个执行环境:", all_environments.len());
for env in &all_environments {
warn!(" 环境: {} (ID: {:?})", env.name, env.id);
warn!(" 类型: {:?}", env.environment_type);
warn!(" 是否活跃: {}", env.is_active);
warn!(" 是否可用: {}", env.is_available);
warn!(" 健康状态: {:?}", env.health_status);
warn!(" 支持的工作流类型: {:?}", env.supported_workflow_types);
}
return Err(anyhow!("没有可用的执行环境支持工作流类型: {:?}", template.workflow_type));
}

View File

@ -6,7 +6,7 @@ use regex::Regex;
use crate::data::models::outfit_photo_generation::{
ExecuteWorkflowRequest, ExecuteWorkflowResponse, NodeMapping, WorkflowNodeReplacement
};
use crate::data::models::workflow_template::WorkflowTemplate;
use crate::data::models::workflow_template::{WorkflowTemplate, WorkflowType};
use crate::data::models::workflow_execution_record::{
CreateExecutionRecordRequest, WorkflowExecutionRecord
};
@ -301,7 +301,7 @@ mod tests {
name: "测试工作流".to_string(),
base_name: "test_workflow".to_string(),
version: "1.0".to_string(),
workflow_type: crate::data::models::workflow_template::WorkflowType::Custom,
workflow_type: WorkflowType::Custom("test_workflow".to_string()),
description: Some("测试用工作流模板".to_string()),
comfyui_workflow_json: json!({
"1": {

View File

@ -8,21 +8,12 @@ use std::collections::HashMap;
pub enum EnvironmentType {
/// 本地ComfyUI
LocalComfyui,
/// Modal云端
ModalCloud,
/// RunPod云端
RunpodCloud,
/// 自定义环境
Custom,
}
impl std::fmt::Display for EnvironmentType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EnvironmentType::LocalComfyui => write!(f, "local_comfyui"),
EnvironmentType::ModalCloud => write!(f, "modal_cloud"),
EnvironmentType::RunpodCloud => write!(f, "runpod_cloud"),
EnvironmentType::Custom => write!(f, "custom"),
}
}
}

View File

@ -8,7 +8,7 @@ use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use rusqlite::Row;
use std::sync::Arc;
use tracing::{debug, info, error};
use tracing::{debug, info, error, warn};
/// 工作流执行环境数据仓库
/// 遵循 Tauri 开发规范的仓库模式设计
@ -153,22 +153,70 @@ impl WorkflowExecutionEnvironmentRepository {
/// 获取可用的执行环境(按优先级排序)
pub fn find_available_environments(&self, workflow_type: Option<&str>) -> Result<Vec<WorkflowExecutionEnvironment>> {
use tracing::{info, warn, debug};
info!("查找可用的执行环境,工作流类型: {:?}", workflow_type);
let mut filter = ExecutionEnvironmentFilter {
is_active: Some(true),
is_available: Some(true),
health_status: Some(HealthStatus::Healthy),
// 允许健康状态为 Healthy 或 Unknown 的环境Unknown 通常表示还未进行健康检查)
// 只排除明确不健康(Unhealthy)或维护中(Maintenance)的环境
health_status: None, // 我们将在后续手动过滤
..Default::default()
};
if let Some(wf_type) = workflow_type {
info!("设置工作流类型过滤器: '{}'", wf_type);
filter.supported_workflow_type = Some(wf_type.to_string());
}
debug!("执行环境查询过滤器: {:?}", filter);
let mut environments = self.find_all(Some(&filter))?;
info!("找到 {} 个匹配过滤条件的执行环境", environments.len());
// 手动过滤健康状态:只排除明确不健康或维护中的环境
// 允许 Healthy 和 Unknown 状态的环境通过
environments.retain(|env| {
match env.health_status {
HealthStatus::Unhealthy | HealthStatus::Maintenance => {
debug!("排除不健康的环境: {} (状态: {:?})", env.name, env.health_status);
false
}
HealthStatus::Healthy | HealthStatus::Unknown => {
debug!("保留可用的环境: {} (状态: {:?})", env.name, env.health_status);
true
}
}
});
info!("健康状态过滤后剩余 {} 个执行环境", environments.len());
// 如果没有找到环境,尝试不带工作流类型过滤器查询所有环境进行调试
if environments.is_empty() && workflow_type.is_some() {
warn!("没有找到支持工作流类型 '{}' 的环境,查询所有环境进行调试", workflow_type.unwrap());
let all_environments = self.find_all(None)?;
warn!("数据库中共有 {} 个执行环境:", all_environments.len());
for env in &all_environments {
warn!(" 环境: {} (ID: {:?})", env.name, env.id);
warn!(" 是否活跃: {}", env.is_active);
warn!(" 是否可用: {}", env.is_available);
warn!(" 健康状态: {:?}", env.health_status);
warn!(" 支持的工作流类型: {:?}", env.supported_workflow_types);
// 检查是否包含目标工作流类型
let contains_type = env.supported_workflow_types.iter()
.any(|t| t == workflow_type.unwrap());
warn!(" 是否包含目标类型 '{}': {}", workflow_type.unwrap(), contains_type);
}
}
// 按优先级降序排序
environments.sort_by(|a, b| b.priority.cmp(&a.priority));
info!("返回 {} 个按优先级排序的可用执行环境", environments.len());
Ok(environments)
}
@ -453,13 +501,40 @@ impl WorkflowExecutionEnvironmentRepository {
/// 检查ComfyUI环境健康状态
async fn check_comfyui_health(&self, base_url: &str) -> HealthStatus {
// 实现ComfyUI特定的健康检查
// 这里可以调用ComfyUI的API端点
match reqwest::get(&format!("{}/api/servers/status", base_url)).await {
Ok(response) if response.status().is_success() => HealthStatus::Healthy,
Ok(_) => HealthStatus::Unhealthy,
Err(_) => HealthStatus::Unhealthy,
info!("检查ComfyUI环境健康状态: {}", base_url);
// 尝试多个可能的健康检查端点
let endpoints = vec![
"/system_stats", // 标准ComfyUI端点
"/", // API根端点
"/api/servers/status" // 备用端点
];
for endpoint in endpoints {
let url = format!("{}{}", base_url.trim_end_matches('/'), endpoint);
debug!("尝试健康检查端点: {}", url);
match reqwest::Client::new()
.get(&url)
.timeout(std::time::Duration::from_secs(10))
.send()
.await
{
Ok(response) if response.status().is_success() => {
info!("ComfyUI环境健康检查成功: {} (端点: {})", base_url, endpoint);
return HealthStatus::Healthy;
}
Ok(response) => {
debug!("端点 {} 响应状态: {}", endpoint, response.status());
}
Err(e) => {
debug!("端点 {} 连接失败: {}", endpoint, e);
}
}
}
warn!("ComfyUI环境健康检查失败: {}", base_url);
HealthStatus::Unhealthy
}
/// 检查通用环境健康状态
@ -513,6 +588,7 @@ impl WorkflowExecutionEnvironmentRepository {
if let Some(supported_workflow_type) = &filter.supported_workflow_type {
where_clauses.push("supported_workflow_types LIKE ?");
let search_pattern = format!("%\"{}\"%%", supported_workflow_type);
tracing::debug!("工作流类型查询模式: '{}' -> LIKE '{}'", supported_workflow_type, search_pattern);
params.push(Box::new(search_pattern));
}

View File

@ -177,8 +177,6 @@ impl TolerantJsonParser {
return Err(anyhow!("Text too long: {} bytes", text.len()));
}
info!("Starting tolerant JSON parsing, text length: {}", text.len());
// 首先尝试标准JSON解析
match serde_json::from_str::<Value>(text) {
Ok(result) => {
@ -193,7 +191,6 @@ impl TolerantJsonParser {
parse_time_ms: start_time.elapsed().as_millis() as u64,
recovery_strategies_used: recovery_strategies,
};
info!("Standard JSON parsing successful in {}ms", stats.parse_time_ms);
return Ok((processed_result, stats));
}
Err(_) => {
@ -943,7 +940,6 @@ impl TolerantJsonParser {
// 尝试解析YAML
match serde_yaml::from_str::<Value>(trimmed) {
Ok(yaml_value) => {
info!("Successfully parsed YAML content");
Ok(yaml_value)
}
Err(e) => {

View File

@ -16,7 +16,7 @@ use app_state::AppState;
use presentation::commands;
use tauri::Manager;
use infrastructure::logging;
use tracing::info;
use tracing::{info, error};
#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub fn run() {
@ -608,7 +608,9 @@ pub fn run() {
commands::workflow_commands::create_execution_environment,
commands::workflow_commands::update_execution_environment,
commands::workflow_commands::delete_execution_environment,
commands::workflow_commands::health_check_execution_environment
commands::workflow_commands::health_check_execution_environment,
commands::workflow_commands::debug_check_workflow_templates,
commands::workflow_commands::debug_check_execution_environments
])
.setup(|app| {
// 初始化日志系统
@ -640,6 +642,13 @@ pub fn run() {
let _ = event_bus_manager.publish_app_started().await;
});
// 启动监控服务(同步方式)
if let Err(e) = tauri::async_runtime::block_on(state.start_monitoring_service()) {
error!("启动监控服务失败: {}", e);
} else {
info!("监控服务启动成功");
}
// 记录启动性能指标
{
let mut monitor = state.performance_monitor.lock().unwrap();

View File

@ -127,6 +127,7 @@ pub async fn execute_workflow_with_mapping(
state: State<'_, AppState>,
) -> Result<crate::data::models::outfit_photo_generation::ExecuteWorkflowResponse, String> {
info!("执行基于映射的工作流模板ID: {}", request.workflow_template_id);
info!("请求详情: {:?}", request);
// 获取工作流模板仓库
let template_repo = {
@ -174,7 +175,7 @@ pub async fn execute_workflow_with_mapping(
// 转换请求格式到新的统一格式
let unified_request = crate::business::services::universal_workflow_service::ExecuteWorkflowRequest {
workflow_identifier: format!("template_{}", request.workflow_template_id),
workflow_identifier: request.workflow_template_id.to_string(),
version: None,
input_data: request.input_data,
environment_id: request.execution_environment_id,
@ -659,6 +660,82 @@ pub async fn validate_workflow_template(
Ok(warnings)
}
/// 调试:检查数据库中的工作流模板数据
#[tauri::command]
pub async fn debug_check_workflow_templates(
state: State<'_, AppState>,
) -> Result<serde_json::Value, String> {
info!("调试:检查数据库中的工作流模板数据");
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 查询所有工作流模板
let templates = repo.find_all(None)
.map_err(|e| format!("查询工作流模板失败: {}", e))?;
// 构建调试信息
let debug_info = serde_json::json!({
"total_templates": templates.len(),
"templates": templates.iter().map(|t| serde_json::json!({
"id": t.id,
"name": t.name,
"base_name": t.base_name,
"version": t.version,
"type": t.workflow_type,
"is_active": t.is_active,
"is_published": t.is_published,
"created_at": t.created_at
})).collect::<Vec<_>>()
});
info!("数据库中共有 {} 个工作流模板", templates.len());
Ok(debug_info)
}
/// 调试:检查数据库中的执行环境数据
#[tauri::command]
pub async fn debug_check_execution_environments(
state: State<'_, AppState>,
) -> Result<serde_json::Value, String> {
info!("调试:检查数据库中的执行环境数据");
// 获取执行环境仓库
let repo_guard = state.get_workflow_execution_environment_repository()
.map_err(|e| format!("获取执行环境仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "执行环境仓库未初始化".to_string())?;
// 查询所有执行环境
let environments = repo.find_all(None)
.map_err(|e| format!("查询执行环境失败: {}", e))?;
// 构建调试信息
let debug_info = serde_json::json!({
"total_environments": environments.len(),
"environments": environments.iter().map(|env| serde_json::json!({
"id": env.id,
"name": env.name,
"environment_type": env.environment_type,
"is_active": env.is_active,
"is_available": env.is_available,
"health_status": env.health_status,
"supported_workflow_types": env.supported_workflow_types,
"priority": env.priority,
"base_url": env.base_url,
"created_at": env.created_at
})).collect::<Vec<_>>()
});
info!("数据库中共有 {} 个执行环境", environments.len());
Ok(debug_info)
}
/// 批量导出工作流模板
#[tauri::command]
pub async fn batch_export_workflow_templates(

View File

@ -100,17 +100,46 @@ export const WorkflowFormGenerator: React.FC<WorkflowFormGeneratorProps> = ({
useEffect(() => {
if (!uiConfig || !uiConfig.form_fields) return;
const initializedData: WorkflowFormData = { ...initialData };
setFormData(prev => {
const initializedData: WorkflowFormData = {};
// 为每个字段设置默认值(如果没有值的话)
uiConfig.form_fields.forEach(field => {
if (!(field.name in initializedData)) {
initializedData[field.name] = field.default_value ?? getDefaultValueForFieldType(field.type);
// 为每个字段设置默认值
uiConfig.form_fields.forEach(field => {
// 优先使用当前值然后是initialData中的值然后是字段默认值最后是类型默认值
if (field.name in prev) {
initializedData[field.name] = prev[field.name];
} else if (field.name in initialData) {
initializedData[field.name] = initialData[field.name];
} else {
initializedData[field.name] = field.default_value ?? getDefaultValueForFieldType(field.type);
}
});
// 只有当数据真正不同时才返回新对象
if (JSON.stringify(initializedData) !== JSON.stringify(prev)) {
return initializedData;
}
return prev;
});
}, [uiConfig]);
setFormData(initializedData);
}, [uiConfig, initialData]);
// 处理initialData变化仅当有意义的数据时
useEffect(() => {
if (!initialData || Object.keys(initialData).length === 0) return;
// 使用函数式更新来避免依赖formData
setFormData(prev => {
// 检查是否有实际变化
const hasChanges = Object.keys(initialData).some(key =>
prev[key] !== initialData[key]
);
if (hasChanges) {
return { ...prev, ...initialData };
}
return prev;
});
}, [JSON.stringify(initialData)]); // 使用JSON.stringify来深度比较
// 单独的 useEffect 来通知父组件数据变化,避免渲染时调用
useEffect(() => {
@ -128,7 +157,7 @@ export const WorkflowFormGenerator: React.FC<WorkflowFormGeneratorProps> = ({
const updateFormData = useCallback((fieldName: string, value: any) => {
setFormData(prev => {
const newData = { ...prev, [fieldName]: value };
onFormDataChange?.(newData);
// 不在这里直接调用onFormDataChange让useEffect处理
return newData;
});
@ -141,7 +170,7 @@ export const WorkflowFormGenerator: React.FC<WorkflowFormGeneratorProps> = ({
}
return prev;
});
}, [onFormDataChange]); // 只依赖onFormDataChange
}, []); // 移除onFormDataChange依赖
// 验证表单
const validateForm = useCallback(() => {