From 089999696325fa633428026a14407b849f4d4373 Mon Sep 17 00:00:00 2001 From: imeepos Date: Fri, 8 Aug 2025 13:19:18 +0800 Subject: [PATCH] fix: bug --- apps/desktop/src-tauri/src/app_state.rs | 35 +++++++ .../services/universal_workflow_service.rs | 54 ++++++++++- .../services/workflow_execution_service.rs | 4 +- .../models/workflow_execution_environment.rs | 9 -- ...rkflow_execution_environment_repository.rs | 96 +++++++++++++++++-- .../infrastructure/tolerant_json_parser.rs | 4 - apps/desktop/src-tauri/src/lib.rs | 13 ++- .../commands/workflow_commands.rs | 79 ++++++++++++++- .../workflow/WorkflowFormGenerator.tsx | 47 +++++++-- 9 files changed, 302 insertions(+), 39 deletions(-) diff --git a/apps/desktop/src-tauri/src/app_state.rs b/apps/desktop/src-tauri/src/app_state.rs index fb8af61..2ed8467 100644 --- a/apps/desktop/src-tauri/src/app_state.rs +++ b/apps/desktop/src-tauri/src/app_state.rs @@ -429,6 +429,41 @@ impl AppState { Ok(()) } + + /// 启动监控服务 + pub async fn start_monitoring_service(&self) -> anyhow::Result<()> { + use crate::business::services::workflow_monitoring_service::{WorkflowMonitoringService, MonitoringConfig}; + + info!("正在启动监控服务..."); + + // 获取必要的仓库 + let execution_record_repo = { + let repo_guard = self.workflow_execution_record_repository.lock().unwrap(); + repo_guard.as_ref() + .ok_or_else(|| anyhow::anyhow!("工作流执行记录仓库未初始化"))? + .clone() + }; + + let execution_environment_repo = { + let repo_guard = self.workflow_execution_environment_repository.lock().unwrap(); + repo_guard.as_ref() + .ok_or_else(|| anyhow::anyhow!("工作流执行环境仓库未初始化"))? + .clone() + }; + + // 创建监控服务 + let monitoring_service = WorkflowMonitoringService::new( + execution_record_repo, + execution_environment_repo, + Some(MonitoringConfig::default()), + ); + + // 启动监控服务 + monitoring_service.start_monitoring().await?; + + info!("监控服务启动成功"); + Ok(()) + } } impl Default for AppState { diff --git a/apps/desktop/src-tauri/src/business/services/universal_workflow_service.rs b/apps/desktop/src-tauri/src/business/services/universal_workflow_service.rs index aecb4ca..75809f5 100644 --- a/apps/desktop/src-tauri/src/business/services/universal_workflow_service.rs +++ b/apps/desktop/src-tauri/src/business/services/universal_workflow_service.rs @@ -11,7 +11,7 @@ use crate::infrastructure::database::Database; use anyhow::{Result, anyhow}; use serde_json::Value; use std::sync::Arc; -use tracing::{info, debug}; +use tracing::{info, debug, warn}; /// 通用工作流执行服务 /// @@ -100,9 +100,13 @@ impl UniversalWorkflowService { request: ExecuteWorkflowRequest, ) -> Result { info!("开始执行工作流: {}", request.workflow_identifier); + info!("执行请求详情: identifier='{}', version={:?}, environment_id={:?}", + request.workflow_identifier, request.version, request.environment_id); // 1. 获取工作流模板 let template = self.get_workflow_template(&request.workflow_identifier, request.version.as_deref()).await?; + info!("成功获取工作流模板: ID={}, 名称='{}', 类型={:?}", + template.id.unwrap_or(-1), template.name, template.workflow_type); // 2. 验证输入数据 template.validate_input(&request.input_data).map_err(|e| anyhow!(e))?; @@ -175,30 +179,58 @@ impl UniversalWorkflowService { identifier: &str, version: Option<&str>, ) -> Result { + info!("正在查找工作流模板: identifier='{}', version={:?}", identifier, version); + // 首先尝试按ID查询(如果identifier是数字) if let Ok(id) = identifier.parse::() { + info!("尝试按ID查询工作流模板: {}", id); if let Some(template) = self.workflow_template_repository.find_by_id(id)? { + info!("成功找到工作流模板: ID={}, 名称='{}', 类型={:?}", + template.id.unwrap_or(-1), template.name, template.workflow_type); return Ok(template); + } else { + warn!("未找到ID为 {} 的工作流模板", id); } + } else { + info!("identifier '{}' 不是有效的数字ID,尝试按base_name查询", identifier); } // 按base_name查询 if let Some(version) = version { // 查询指定版本 + info!("按base_name和版本查询: base_name='{}', version='{}'", identifier, version); let templates = self.workflow_template_repository.find_by_base_name(identifier)?; + info!("找到 {} 个base_name为 '{}' 的模板", templates.len(), identifier); + for template in templates { if template.version == version { + info!("找到匹配版本的模板: ID={}, 名称='{}'", + template.id.unwrap_or(-1), template.name); return Ok(template); } } return Err(anyhow!("未找到工作流模板: {} 版本 {}", identifier, version)); } else { // 查询最新版本 + info!("按base_name查询最新版本: '{}'", identifier); if let Some(template) = self.workflow_template_repository.get_latest_version(identifier)? { + info!("找到最新版本模板: ID={}, 名称='{}', 版本='{}'", + template.id.unwrap_or(-1), template.name, template.version); return Ok(template); + } else { + warn!("未找到base_name为 '{}' 的模板", identifier); } } + // 如果都没找到,列出所有可用的模板进行调试 + let all_templates = self.workflow_template_repository.find_all(None)?; + warn!("未找到工作流模板 '{}', 数据库中共有 {} 个模板:", identifier, all_templates.len()); + for template in &all_templates { + warn!(" 模板: ID={}, 名称='{}', base_name='{}', 版本='{}', 类型={:?}", + template.id.unwrap_or(-1), template.name, template.base_name, + template.version, template.workflow_type); + } + Err(anyhow!("未找到工作流模板: {}", identifier)) } @@ -223,11 +255,29 @@ impl UniversalWorkflowService { // 自动选择最佳环境 // 获取支持当前工作流类型的可用环境,按优先级排序 - let workflow_type_str = serde_json::to_string(&template.workflow_type)?; + let workflow_type_str = template.workflow_type.to_string(); + info!("正在查找支持工作流类型 '{}' 的执行环境", workflow_type_str); + let available_environments = self.workflow_execution_environment_repository .find_available_environments(Some(&workflow_type_str))?; + info!("找到 {} 个支持工作流类型 '{}' 的可用执行环境", available_environments.len(), workflow_type_str); + if available_environments.is_empty() { + // 获取所有执行环境进行调试 + let all_environments = self.workflow_execution_environment_repository.find_all(None)?; + warn!("没有找到支持工作流类型 '{}' 的执行环境", workflow_type_str); + warn!("数据库中共有 {} 个执行环境:", all_environments.len()); + + for env in &all_environments { + warn!(" 环境: {} (ID: {:?})", env.name, env.id); + warn!(" 类型: {:?}", env.environment_type); + warn!(" 是否活跃: {}", env.is_active); + warn!(" 是否可用: {}", env.is_available); + warn!(" 健康状态: {:?}", env.health_status); + warn!(" 支持的工作流类型: {:?}", env.supported_workflow_types); + } + return Err(anyhow!("没有可用的执行环境支持工作流类型: {:?}", template.workflow_type)); } diff --git a/apps/desktop/src-tauri/src/business/services/workflow_execution_service.rs b/apps/desktop/src-tauri/src/business/services/workflow_execution_service.rs index 23415b2..6c62484 100644 --- a/apps/desktop/src-tauri/src/business/services/workflow_execution_service.rs +++ b/apps/desktop/src-tauri/src/business/services/workflow_execution_service.rs @@ -6,7 +6,7 @@ use regex::Regex; use crate::data::models::outfit_photo_generation::{ ExecuteWorkflowRequest, ExecuteWorkflowResponse, NodeMapping, WorkflowNodeReplacement }; -use crate::data::models::workflow_template::WorkflowTemplate; +use crate::data::models::workflow_template::{WorkflowTemplate, WorkflowType}; use crate::data::models::workflow_execution_record::{ CreateExecutionRecordRequest, WorkflowExecutionRecord }; @@ -301,7 +301,7 @@ mod tests { name: "测试工作流".to_string(), base_name: "test_workflow".to_string(), version: "1.0".to_string(), - workflow_type: crate::data::models::workflow_template::WorkflowType::Custom, + workflow_type: WorkflowType::Custom("test_workflow".to_string()), description: Some("测试用工作流模板".to_string()), comfyui_workflow_json: json!({ "1": { diff --git a/apps/desktop/src-tauri/src/data/models/workflow_execution_environment.rs b/apps/desktop/src-tauri/src/data/models/workflow_execution_environment.rs index 7d3984d..461355a 100644 --- a/apps/desktop/src-tauri/src/data/models/workflow_execution_environment.rs +++ b/apps/desktop/src-tauri/src/data/models/workflow_execution_environment.rs @@ -8,21 +8,12 @@ use std::collections::HashMap; pub enum EnvironmentType { /// 本地ComfyUI LocalComfyui, - /// Modal云端 - ModalCloud, - /// RunPod云端 - RunpodCloud, - /// 自定义环境 - Custom, } impl std::fmt::Display for EnvironmentType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { EnvironmentType::LocalComfyui => write!(f, "local_comfyui"), - EnvironmentType::ModalCloud => write!(f, "modal_cloud"), - EnvironmentType::RunpodCloud => write!(f, "runpod_cloud"), - EnvironmentType::Custom => write!(f, "custom"), } } } diff --git a/apps/desktop/src-tauri/src/data/repositories/workflow_execution_environment_repository.rs b/apps/desktop/src-tauri/src/data/repositories/workflow_execution_environment_repository.rs index 7d5e7eb..2fde419 100644 --- a/apps/desktop/src-tauri/src/data/repositories/workflow_execution_environment_repository.rs +++ b/apps/desktop/src-tauri/src/data/repositories/workflow_execution_environment_repository.rs @@ -8,7 +8,7 @@ use anyhow::{Result, anyhow}; use chrono::{DateTime, Utc}; use rusqlite::Row; use std::sync::Arc; -use tracing::{debug, info, error}; +use tracing::{debug, info, error, warn}; /// 工作流执行环境数据仓库 /// 遵循 Tauri 开发规范的仓库模式设计 @@ -153,22 +153,70 @@ impl WorkflowExecutionEnvironmentRepository { /// 获取可用的执行环境(按优先级排序) pub fn find_available_environments(&self, workflow_type: Option<&str>) -> Result> { + use tracing::{info, warn, debug}; + + info!("查找可用的执行环境,工作流类型: {:?}", workflow_type); + let mut filter = ExecutionEnvironmentFilter { is_active: Some(true), is_available: Some(true), - health_status: Some(HealthStatus::Healthy), + // 允许健康状态为 Healthy 或 Unknown 的环境(Unknown 通常表示还未进行健康检查) + // 只排除明确不健康(Unhealthy)或维护中(Maintenance)的环境 + health_status: None, // 我们将在后续手动过滤 ..Default::default() }; if let Some(wf_type) = workflow_type { + info!("设置工作流类型过滤器: '{}'", wf_type); filter.supported_workflow_type = Some(wf_type.to_string()); } + debug!("执行环境查询过滤器: {:?}", filter); let mut environments = self.find_all(Some(&filter))?; - + + info!("找到 {} 个匹配过滤条件的执行环境", environments.len()); + + // 手动过滤健康状态:只排除明确不健康或维护中的环境 + // 允许 Healthy 和 Unknown 状态的环境通过 + environments.retain(|env| { + match env.health_status { + HealthStatus::Unhealthy | HealthStatus::Maintenance => { + debug!("排除不健康的环境: {} (状态: {:?})", env.name, env.health_status); + false + } + HealthStatus::Healthy | HealthStatus::Unknown => { + debug!("保留可用的环境: {} (状态: {:?})", env.name, env.health_status); + true + } + } + }); + + info!("健康状态过滤后剩余 {} 个执行环境", environments.len()); + + // 如果没有找到环境,尝试不带工作流类型过滤器查询所有环境进行调试 + if environments.is_empty() && workflow_type.is_some() { + warn!("没有找到支持工作流类型 '{}' 的环境,查询所有环境进行调试", workflow_type.unwrap()); + let all_environments = self.find_all(None)?; + warn!("数据库中共有 {} 个执行环境:", all_environments.len()); + + for env in &all_environments { + warn!(" 环境: {} (ID: {:?})", env.name, env.id); + warn!(" 是否活跃: {}", env.is_active); + warn!(" 是否可用: {}", env.is_available); + warn!(" 健康状态: {:?}", env.health_status); + warn!(" 支持的工作流类型: {:?}", env.supported_workflow_types); + + // 检查是否包含目标工作流类型 + let contains_type = env.supported_workflow_types.iter() + .any(|t| t == workflow_type.unwrap()); + warn!(" 是否包含目标类型 '{}': {}", workflow_type.unwrap(), contains_type); + } + } + // 按优先级降序排序 environments.sort_by(|a, b| b.priority.cmp(&a.priority)); - + + info!("返回 {} 个按优先级排序的可用执行环境", environments.len()); Ok(environments) } @@ -453,13 +501,40 @@ impl WorkflowExecutionEnvironmentRepository { /// 检查ComfyUI环境健康状态 async fn check_comfyui_health(&self, base_url: &str) -> HealthStatus { - // 实现ComfyUI特定的健康检查 - // 这里可以调用ComfyUI的API端点 - match reqwest::get(&format!("{}/api/servers/status", base_url)).await { - Ok(response) if response.status().is_success() => HealthStatus::Healthy, - Ok(_) => HealthStatus::Unhealthy, - Err(_) => HealthStatus::Unhealthy, + info!("检查ComfyUI环境健康状态: {}", base_url); + + // 尝试多个可能的健康检查端点 + let endpoints = vec![ + "/system_stats", // 标准ComfyUI端点 + "/", // API根端点 + "/api/servers/status" // 备用端点 + ]; + + for endpoint in endpoints { + let url = format!("{}{}", base_url.trim_end_matches('/'), endpoint); + debug!("尝试健康检查端点: {}", url); + + match reqwest::Client::new() + .get(&url) + .timeout(std::time::Duration::from_secs(10)) + .send() + .await + { + Ok(response) if response.status().is_success() => { + info!("ComfyUI环境健康检查成功: {} (端点: {})", base_url, endpoint); + return HealthStatus::Healthy; + } + Ok(response) => { + debug!("端点 {} 响应状态: {}", endpoint, response.status()); + } + Err(e) => { + debug!("端点 {} 连接失败: {}", endpoint, e); + } + } } + + warn!("ComfyUI环境健康检查失败: {}", base_url); + HealthStatus::Unhealthy } /// 检查通用环境健康状态 @@ -513,6 +588,7 @@ impl WorkflowExecutionEnvironmentRepository { if let Some(supported_workflow_type) = &filter.supported_workflow_type { where_clauses.push("supported_workflow_types LIKE ?"); let search_pattern = format!("%\"{}\"%%", supported_workflow_type); + tracing::debug!("工作流类型查询模式: '{}' -> LIKE '{}'", supported_workflow_type, search_pattern); params.push(Box::new(search_pattern)); } diff --git a/apps/desktop/src-tauri/src/infrastructure/tolerant_json_parser.rs b/apps/desktop/src-tauri/src/infrastructure/tolerant_json_parser.rs index 4cfd2be..27005e5 100644 --- a/apps/desktop/src-tauri/src/infrastructure/tolerant_json_parser.rs +++ b/apps/desktop/src-tauri/src/infrastructure/tolerant_json_parser.rs @@ -177,8 +177,6 @@ impl TolerantJsonParser { return Err(anyhow!("Text too long: {} bytes", text.len())); } - info!("Starting tolerant JSON parsing, text length: {}", text.len()); - // 首先尝试标准JSON解析 match serde_json::from_str::(text) { Ok(result) => { @@ -193,7 +191,6 @@ impl TolerantJsonParser { parse_time_ms: start_time.elapsed().as_millis() as u64, recovery_strategies_used: recovery_strategies, }; - info!("Standard JSON parsing successful in {}ms", stats.parse_time_ms); return Ok((processed_result, stats)); } Err(_) => { @@ -943,7 +940,6 @@ impl TolerantJsonParser { // 尝试解析YAML match serde_yaml::from_str::(trimmed) { Ok(yaml_value) => { - info!("Successfully parsed YAML content"); Ok(yaml_value) } Err(e) => { diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index e2b9d21..a7ed38b 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -16,7 +16,7 @@ use app_state::AppState; use presentation::commands; use tauri::Manager; use infrastructure::logging; -use tracing::info; +use tracing::{info, error}; #[cfg_attr(mobile, tauri::mobile_entry_point)] pub fn run() { @@ -608,7 +608,9 @@ pub fn run() { commands::workflow_commands::create_execution_environment, commands::workflow_commands::update_execution_environment, commands::workflow_commands::delete_execution_environment, - commands::workflow_commands::health_check_execution_environment + commands::workflow_commands::health_check_execution_environment, + commands::workflow_commands::debug_check_workflow_templates, + commands::workflow_commands::debug_check_execution_environments ]) .setup(|app| { // 初始化日志系统 @@ -640,6 +642,13 @@ pub fn run() { let _ = event_bus_manager.publish_app_started().await; }); + // 启动监控服务(同步方式) + if let Err(e) = tauri::async_runtime::block_on(state.start_monitoring_service()) { + error!("启动监控服务失败: {}", e); + } else { + info!("监控服务启动成功"); + } + // 记录启动性能指标 { let mut monitor = state.performance_monitor.lock().unwrap(); diff --git a/apps/desktop/src-tauri/src/presentation/commands/workflow_commands.rs b/apps/desktop/src-tauri/src/presentation/commands/workflow_commands.rs index 1017b54..c3ba9ce 100644 --- a/apps/desktop/src-tauri/src/presentation/commands/workflow_commands.rs +++ b/apps/desktop/src-tauri/src/presentation/commands/workflow_commands.rs @@ -127,6 +127,7 @@ pub async fn execute_workflow_with_mapping( state: State<'_, AppState>, ) -> Result { info!("执行基于映射的工作流,模板ID: {}", request.workflow_template_id); + info!("请求详情: {:?}", request); // 获取工作流模板仓库 let template_repo = { @@ -174,7 +175,7 @@ pub async fn execute_workflow_with_mapping( // 转换请求格式到新的统一格式 let unified_request = crate::business::services::universal_workflow_service::ExecuteWorkflowRequest { - workflow_identifier: format!("template_{}", request.workflow_template_id), + workflow_identifier: request.workflow_template_id.to_string(), version: None, input_data: request.input_data, environment_id: request.execution_environment_id, @@ -659,6 +660,82 @@ pub async fn validate_workflow_template( Ok(warnings) } +/// 调试:检查数据库中的工作流模板数据 +#[tauri::command] +pub async fn debug_check_workflow_templates( + state: State<'_, AppState>, +) -> Result { + info!("调试:检查数据库中的工作流模板数据"); + + // 获取工作流模板仓库 + let repo_guard = state.get_workflow_template_repository() + .map_err(|e| format!("获取工作流模板仓库失败: {}", e))?; + + let repo = repo_guard.as_ref() + .ok_or_else(|| "工作流模板仓库未初始化".to_string())?; + + // 查询所有工作流模板 + let templates = repo.find_all(None) + .map_err(|e| format!("查询工作流模板失败: {}", e))?; + + // 构建调试信息 + let debug_info = serde_json::json!({ + "total_templates": templates.len(), + "templates": templates.iter().map(|t| serde_json::json!({ + "id": t.id, + "name": t.name, + "base_name": t.base_name, + "version": t.version, + "type": t.workflow_type, + "is_active": t.is_active, + "is_published": t.is_published, + "created_at": t.created_at + })).collect::>() + }); + + info!("数据库中共有 {} 个工作流模板", templates.len()); + Ok(debug_info) +} + +/// 调试:检查数据库中的执行环境数据 +#[tauri::command] +pub async fn debug_check_execution_environments( + state: State<'_, AppState>, +) -> Result { + info!("调试:检查数据库中的执行环境数据"); + + // 获取执行环境仓库 + let repo_guard = state.get_workflow_execution_environment_repository() + .map_err(|e| format!("获取执行环境仓库失败: {}", e))?; + + let repo = repo_guard.as_ref() + .ok_or_else(|| "执行环境仓库未初始化".to_string())?; + + // 查询所有执行环境 + let environments = repo.find_all(None) + .map_err(|e| format!("查询执行环境失败: {}", e))?; + + // 构建调试信息 + let debug_info = serde_json::json!({ + "total_environments": environments.len(), + "environments": environments.iter().map(|env| serde_json::json!({ + "id": env.id, + "name": env.name, + "environment_type": env.environment_type, + "is_active": env.is_active, + "is_available": env.is_available, + "health_status": env.health_status, + "supported_workflow_types": env.supported_workflow_types, + "priority": env.priority, + "base_url": env.base_url, + "created_at": env.created_at + })).collect::>() + }); + + info!("数据库中共有 {} 个执行环境", environments.len()); + Ok(debug_info) +} + /// 批量导出工作流模板 #[tauri::command] pub async fn batch_export_workflow_templates( diff --git a/apps/desktop/src/components/workflow/WorkflowFormGenerator.tsx b/apps/desktop/src/components/workflow/WorkflowFormGenerator.tsx index d2a26ad..f39ac3f 100644 --- a/apps/desktop/src/components/workflow/WorkflowFormGenerator.tsx +++ b/apps/desktop/src/components/workflow/WorkflowFormGenerator.tsx @@ -100,17 +100,46 @@ export const WorkflowFormGenerator: React.FC = ({ useEffect(() => { if (!uiConfig || !uiConfig.form_fields) return; - const initializedData: WorkflowFormData = { ...initialData }; + setFormData(prev => { + const initializedData: WorkflowFormData = {}; - // 为每个字段设置默认值(如果没有值的话) - uiConfig.form_fields.forEach(field => { - if (!(field.name in initializedData)) { - initializedData[field.name] = field.default_value ?? getDefaultValueForFieldType(field.type); + // 为每个字段设置默认值 + uiConfig.form_fields.forEach(field => { + // 优先使用当前值,然后是initialData中的值,然后是字段默认值,最后是类型默认值 + if (field.name in prev) { + initializedData[field.name] = prev[field.name]; + } else if (field.name in initialData) { + initializedData[field.name] = initialData[field.name]; + } else { + initializedData[field.name] = field.default_value ?? getDefaultValueForFieldType(field.type); + } + }); + + // 只有当数据真正不同时才返回新对象 + if (JSON.stringify(initializedData) !== JSON.stringify(prev)) { + return initializedData; } + return prev; }); + }, [uiConfig]); - setFormData(initializedData); - }, [uiConfig, initialData]); + // 处理initialData变化(仅当有意义的数据时) + useEffect(() => { + if (!initialData || Object.keys(initialData).length === 0) return; + + // 使用函数式更新来避免依赖formData + setFormData(prev => { + // 检查是否有实际变化 + const hasChanges = Object.keys(initialData).some(key => + prev[key] !== initialData[key] + ); + + if (hasChanges) { + return { ...prev, ...initialData }; + } + return prev; + }); + }, [JSON.stringify(initialData)]); // 使用JSON.stringify来深度比较 // 单独的 useEffect 来通知父组件数据变化,避免渲染时调用 useEffect(() => { @@ -128,7 +157,7 @@ export const WorkflowFormGenerator: React.FC = ({ const updateFormData = useCallback((fieldName: string, value: any) => { setFormData(prev => { const newData = { ...prev, [fieldName]: value }; - onFormDataChange?.(newData); + // 不在这里直接调用onFormDataChange,让useEffect处理 return newData; }); @@ -141,7 +170,7 @@ export const WorkflowFormGenerator: React.FC = ({ } return prev; }); - }, [onFormDataChange]); // 只依赖onFormDataChange + }, []); // 移除onFormDataChange依赖 // 验证表单 const validateForm = useCallback(() => {