From a41ead002172ac15bee33c2d217a20d480f40462 Mon Sep 17 00:00:00 2001 From: imeepos Date: Thu, 7 Aug 2025 17:26:30 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E6=9F=A5=E8=AF=A2=E6=97=B6=E7=9A=84JSON?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加多种时间戳格式支持,解决数据库时间戳解析问题 - 简化JSON解析逻辑,移除复杂的容错解析器依赖 - 修复环境类型和健康状态的解析逻辑 - 解决'查询执行环境失败: 解析查询结果失败: Conversion error from type Text'错误 - 与之前修复工作流模板JSON解析问题的方案保持一致 --- ...rkflow_execution_environment_repository.rs | 210 +++++++++++++----- 1 file changed, 156 insertions(+), 54 deletions(-) diff --git a/apps/desktop/src-tauri/src/data/repositories/workflow_execution_environment_repository.rs b/apps/desktop/src-tauri/src/data/repositories/workflow_execution_environment_repository.rs index 70ac940..a1532f3 100644 --- a/apps/desktop/src-tauri/src/data/repositories/workflow_execution_environment_repository.rs +++ b/apps/desktop/src-tauri/src/data/repositories/workflow_execution_environment_repository.rs @@ -1,13 +1,14 @@ use crate::data::models::workflow_execution_environment::{ - WorkflowExecutionEnvironment, EnvironmentType, HealthStatus, + WorkflowExecutionEnvironment, EnvironmentType, HealthStatus, CreateExecutionEnvironmentRequest, UpdateExecutionEnvironmentRequest, ExecutionEnvironmentFilter }; use crate::infrastructure::database::Database; + use anyhow::{Result, anyhow}; use chrono::{DateTime, Utc}; use rusqlite::Row; use std::sync::Arc; -use tracing::{debug, info}; +use tracing::{debug, info, error}; /// 工作流执行环境数据仓库 /// 遵循 Tauri 开发规范的仓库模式设计 @@ -526,56 +527,146 @@ impl WorkflowExecutionEnvironmentRepository { (sql, params) } + /// 安全地解析可选JSON字段的辅助函数 + fn parse_optional_json_field(&self, row: &Row, field_name: &str) -> rusqlite::Result> { + match row.get::<_, Option>(field_name) { + Ok(Some(json_str)) => { + if json_str.trim().is_empty() { + Ok(Some(serde_json::Value::Object(serde_json::Map::new()))) + } else { + match serde_json::from_str(&json_str) { + Ok(parsed_value) => { + debug!("Successfully parsed optional {}", field_name); + Ok(Some(parsed_value)) + }, + Err(e) => { + error!("JSON parsing failed for optional {}: '{}', error: {}", field_name, json_str, e); + // 如果解析失败,返回空对象而不是错误 + Ok(Some(serde_json::Value::Object(serde_json::Map::new()))) + } + } + } + }, + Ok(None) => Ok(None), + Err(e) => { + error!("Error getting {}: {}", field_name, e); + Err(e) + } + } + } + + /// 解析多种时间戳格式的辅助函数 + fn parse_datetime(&self, datetime_str: &str) -> Result, Box> { + use chrono::{DateTime, NaiveDateTime, Utc}; + + // 尝试多种时间格式 + + // 1. 尝试 RFC3339 格式 (ISO 8601) + if let Ok(dt) = DateTime::parse_from_rfc3339(datetime_str) { + return Ok(dt.with_timezone(&Utc)); + } + + // 2. 尝试常见的数据库格式: "YYYY-MM-DD HH:MM:SS" + if let Ok(naive_dt) = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%d %H:%M:%S") { + return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc)); + } + + // 3. 尝试带毫秒的格式: "YYYY-MM-DD HH:MM:SS.fff" + if let Ok(naive_dt) = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%d %H:%M:%S%.f") { + return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc)); + } + + // 4. 尝试 ISO 格式但没有时区: "YYYY-MM-DDTHH:MM:SS" + if let Ok(naive_dt) = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%dT%H:%M:%S") { + return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc)); + } + + // 5. 尝试 ISO 格式带毫秒但没有时区: "YYYY-MM-DDTHH:MM:SS.fff" + if let Ok(naive_dt) = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%dT%H:%M:%S%.f") { + return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc)); + } + + Err(format!("Unable to parse datetime: '{}'", datetime_str).into()) + } + /// 将数据库行转换为WorkflowExecutionEnvironment对象 fn row_to_environment(&self, row: &Row) -> rusqlite::Result { let id: Option = Some(row.get("id")?); let name: String = row.get("name")?; - // 解析环境类型 - let type_str: String = row.get("type")?; - let environment_type: EnvironmentType = serde_json::from_str(&type_str) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))?; - + // 解析基本字段 let description: Option = row.get("description")?; let base_url: String = row.get("base_url")?; let api_key: Option = row.get("api_key")?; + // 解析环境类型 + let type_str: String = row.get("type")?; + let environment_type: EnvironmentType = match serde_json::from_str(&type_str) { + Ok(env_type) => env_type, + Err(_) => { + // 如果JSON解析失败,尝试直接解析字符串 + match type_str.trim_matches('"') { + "local_comfyui" => EnvironmentType::LocalComfyui, + "modal_cloud" => EnvironmentType::ModalCloud, + "runpod_cloud" => EnvironmentType::RunpodCloud, + "custom" => EnvironmentType::Custom, + _ => { + error!("Unknown environment type: {}", type_str); + EnvironmentType::Custom // 默认值 + } + } + } + }; + // 解析JSON字段 - let connection_config_json: Option = row.get::<_, Option>("connection_config_json")? - .map(|s| serde_json::from_str(&s)) - .transpose() - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))?; + let connection_config_json = self.parse_optional_json_field(row, "connection_config_json")?; let supported_workflow_types_str: String = row.get("supported_workflow_types")?; - let supported_workflow_types: Vec = serde_json::from_str(&supported_workflow_types_str) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))?; + let supported_workflow_types: Vec = match serde_json::from_str(&supported_workflow_types_str) { + Ok(types) => types, + Err(e) => { + error!("Failed to parse supported_workflow_types: '{}', error: {}", supported_workflow_types_str, e); + // 返回默认值 + vec!["image_generation".to_string()] + } + }; let max_concurrent_jobs: i32 = row.get("max_concurrent_jobs")?; let priority: i32 = row.get("priority")?; let is_active: bool = row.get("is_active")?; let is_available: bool = row.get("is_available")?; - // 解析健康检查时间 - let last_health_check: Option> = row.get::<_, Option>("last_health_check")? - .map(|s| DateTime::parse_from_rfc3339(&s)) - .transpose() - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))? - .map(|dt| dt.with_timezone(&Utc)); + // 解析健康检查时间 - 使用多格式时间解析 + let last_health_check: Option> = match row.get::<_, Option>("last_health_check")? { + Some(time_str) => { + match self.parse_datetime(&time_str) { + Ok(dt) => Some(dt), + Err(e) => { + error!("Failed to parse last_health_check: '{}', error: {}", time_str, e); + None + } + } + }, + None => None + }; // 解析健康状态 let health_status_str: String = row.get("health_status")?; - let health_status: HealthStatus = serde_json::from_str(&health_status_str) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))?; + let health_status: HealthStatus = match serde_json::from_str(&health_status_str) { + Ok(status) => status, + Err(_) => { + // 如果JSON解析失败,尝试直接解析字符串 + match health_status_str.trim_matches('"') { + "healthy" => HealthStatus::Healthy, + "unhealthy" => HealthStatus::Unhealthy, + "unknown" => HealthStatus::Unknown, + _ => { + error!("Unknown health status: {}", health_status_str); + HealthStatus::Unknown // 默认值 + } + } + } + }; let average_response_time_ms: Option = row.get("average_response_time_ms")?; let success_rate: f64 = row.get("success_rate")?; @@ -584,34 +675,45 @@ impl WorkflowExecutionEnvironmentRepository { let max_memory_mb: Option = row.get("max_memory_mb")?; let max_execution_time_seconds: Option = row.get("max_execution_time_seconds")?; - let metadata_json: Option = row.get::<_, Option>("metadata_json")? - .map(|s| serde_json::from_str(&s)) - .transpose() - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))?; + // 解析metadata_json - 使用容错解析器 + let metadata_json = self.parse_optional_json_field(row, "metadata_json")?; - let tags: Option> = row.get::<_, Option>("tags")? - .map(|s| serde_json::from_str(&s)) - .transpose() - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))?; + // 解析tags字段 + let tags: Option> = match row.get::<_, Option>("tags")? { + Some(tags_str) => { + if tags_str.trim().is_empty() { + None + } else { + match serde_json::from_str(&tags_str) { + Ok(tags_vec) => Some(tags_vec), + Err(e) => { + error!("Failed to parse tags JSON: '{}', error: {}", tags_str, e); + None + } + } + } + }, + None => None + }; - // 解析时间戳 + // 解析时间戳 - 使用多格式时间解析 let created_at_str: String = row.get("created_at")?; - let created_at = DateTime::parse_from_rfc3339(&created_at_str) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))? - .with_timezone(&Utc); + let created_at = self.parse_datetime(&created_at_str) + .map_err(|e| { + error!("解析 created_at 时间戳失败: '{}', error: {}", created_at_str, e); + rusqlite::Error::FromSqlConversionFailure( + 0, rusqlite::types::Type::Text, e + ) + })?; let updated_at_str: String = row.get("updated_at")?; - let updated_at = DateTime::parse_from_rfc3339(&updated_at_str) - .map_err(|e| rusqlite::Error::FromSqlConversionFailure( - 0, rusqlite::types::Type::Text, Box::new(e) - ))? - .with_timezone(&Utc); + let updated_at = self.parse_datetime(&updated_at_str) + .map_err(|e| { + error!("解析 updated_at 时间戳失败: '{}', error: {}", updated_at_str, e); + rusqlite::Error::FromSqlConversionFailure( + 0, rusqlite::types::Type::Text, e + ) + })?; Ok(WorkflowExecutionEnvironment { id,