fix: 修复执行环境查询时的JSON解析错误

- 添加多种时间戳格式支持,解决数据库时间戳解析问题
- 简化JSON解析逻辑,移除复杂的容错解析器依赖
- 修复环境类型和健康状态的解析逻辑
- 解决'查询执行环境失败: 解析查询结果失败: Conversion error from type Text'错误
- 与之前修复工作流模板JSON解析问题的方案保持一致
This commit is contained in:
imeepos 2025-08-07 17:26:30 +08:00
parent 6b94aaf260
commit a41ead0021
1 changed files with 156 additions and 54 deletions

View File

@ -3,11 +3,12 @@ use crate::data::models::workflow_execution_environment::{
CreateExecutionEnvironmentRequest, UpdateExecutionEnvironmentRequest, ExecutionEnvironmentFilter
};
use crate::infrastructure::database::Database;
use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use rusqlite::Row;
use std::sync::Arc;
use tracing::{debug, info};
use tracing::{debug, info, error};
/// 工作流执行环境数据仓库
/// 遵循 Tauri 开发规范的仓库模式设计
@ -526,56 +527,146 @@ impl WorkflowExecutionEnvironmentRepository {
(sql, params)
}
/// 安全地解析可选JSON字段的辅助函数
fn parse_optional_json_field(&self, row: &Row, field_name: &str) -> rusqlite::Result<Option<serde_json::Value>> {
match row.get::<_, Option<String>>(field_name) {
Ok(Some(json_str)) => {
if json_str.trim().is_empty() {
Ok(Some(serde_json::Value::Object(serde_json::Map::new())))
} else {
match serde_json::from_str(&json_str) {
Ok(parsed_value) => {
debug!("Successfully parsed optional {}", field_name);
Ok(Some(parsed_value))
},
Err(e) => {
error!("JSON parsing failed for optional {}: '{}', error: {}", field_name, json_str, e);
// 如果解析失败,返回空对象而不是错误
Ok(Some(serde_json::Value::Object(serde_json::Map::new())))
}
}
}
},
Ok(None) => Ok(None),
Err(e) => {
error!("Error getting {}: {}", field_name, e);
Err(e)
}
}
}
/// 解析多种时间戳格式的辅助函数
fn parse_datetime(&self, datetime_str: &str) -> Result<chrono::DateTime<Utc>, Box<dyn std::error::Error + Send + Sync>> {
use chrono::{DateTime, NaiveDateTime, Utc};
// 尝试多种时间格式
// 1. 尝试 RFC3339 格式 (ISO 8601)
if let Ok(dt) = DateTime::parse_from_rfc3339(datetime_str) {
return Ok(dt.with_timezone(&Utc));
}
// 2. 尝试常见的数据库格式: "YYYY-MM-DD HH:MM:SS"
if let Ok(naive_dt) = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%d %H:%M:%S") {
return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
}
// 3. 尝试带毫秒的格式: "YYYY-MM-DD HH:MM:SS.fff"
if let Ok(naive_dt) = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%d %H:%M:%S%.f") {
return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
}
// 4. 尝试 ISO 格式但没有时区: "YYYY-MM-DDTHH:MM:SS"
if let Ok(naive_dt) = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%dT%H:%M:%S") {
return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
}
// 5. 尝试 ISO 格式带毫秒但没有时区: "YYYY-MM-DDTHH:MM:SS.fff"
if let Ok(naive_dt) = NaiveDateTime::parse_from_str(datetime_str, "%Y-%m-%dT%H:%M:%S%.f") {
return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
}
Err(format!("Unable to parse datetime: '{}'", datetime_str).into())
}
/// 将数据库行转换为WorkflowExecutionEnvironment对象
fn row_to_environment(&self, row: &Row) -> rusqlite::Result<WorkflowExecutionEnvironment> {
let id: Option<i64> = Some(row.get("id")?);
let name: String = row.get("name")?;
// 解析环境类型
let type_str: String = row.get("type")?;
let environment_type: EnvironmentType = serde_json::from_str(&type_str)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?;
// 解析基本字段
let description: Option<String> = row.get("description")?;
let base_url: String = row.get("base_url")?;
let api_key: Option<String> = row.get("api_key")?;
// 解析环境类型
let type_str: String = row.get("type")?;
let environment_type: EnvironmentType = match serde_json::from_str(&type_str) {
Ok(env_type) => env_type,
Err(_) => {
// 如果JSON解析失败尝试直接解析字符串
match type_str.trim_matches('"') {
"local_comfyui" => EnvironmentType::LocalComfyui,
"modal_cloud" => EnvironmentType::ModalCloud,
"runpod_cloud" => EnvironmentType::RunpodCloud,
"custom" => EnvironmentType::Custom,
_ => {
error!("Unknown environment type: {}", type_str);
EnvironmentType::Custom // 默认值
}
}
}
};
// 解析JSON字段
let connection_config_json: Option<serde_json::Value> = row.get::<_, Option<String>>("connection_config_json")?
.map(|s| serde_json::from_str(&s))
.transpose()
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?;
let connection_config_json = self.parse_optional_json_field(row, "connection_config_json")?;
let supported_workflow_types_str: String = row.get("supported_workflow_types")?;
let supported_workflow_types: Vec<String> = serde_json::from_str(&supported_workflow_types_str)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?;
let supported_workflow_types: Vec<String> = match serde_json::from_str(&supported_workflow_types_str) {
Ok(types) => types,
Err(e) => {
error!("Failed to parse supported_workflow_types: '{}', error: {}", supported_workflow_types_str, e);
// 返回默认值
vec!["image_generation".to_string()]
}
};
let max_concurrent_jobs: i32 = row.get("max_concurrent_jobs")?;
let priority: i32 = row.get("priority")?;
let is_active: bool = row.get("is_active")?;
let is_available: bool = row.get("is_available")?;
// 解析健康检查时间
let last_health_check: Option<DateTime<Utc>> = row.get::<_, Option<String>>("last_health_check")?
.map(|s| DateTime::parse_from_rfc3339(&s))
.transpose()
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?
.map(|dt| dt.with_timezone(&Utc));
// 解析健康检查时间 - 使用多格式时间解析
let last_health_check: Option<DateTime<Utc>> = match row.get::<_, Option<String>>("last_health_check")? {
Some(time_str) => {
match self.parse_datetime(&time_str) {
Ok(dt) => Some(dt),
Err(e) => {
error!("Failed to parse last_health_check: '{}', error: {}", time_str, e);
None
}
}
},
None => None
};
// 解析健康状态
let health_status_str: String = row.get("health_status")?;
let health_status: HealthStatus = serde_json::from_str(&health_status_str)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?;
let health_status: HealthStatus = match serde_json::from_str(&health_status_str) {
Ok(status) => status,
Err(_) => {
// 如果JSON解析失败尝试直接解析字符串
match health_status_str.trim_matches('"') {
"healthy" => HealthStatus::Healthy,
"unhealthy" => HealthStatus::Unhealthy,
"unknown" => HealthStatus::Unknown,
_ => {
error!("Unknown health status: {}", health_status_str);
HealthStatus::Unknown // 默认值
}
}
}
};
let average_response_time_ms: Option<i32> = row.get("average_response_time_ms")?;
let success_rate: f64 = row.get("success_rate")?;
@ -584,34 +675,45 @@ impl WorkflowExecutionEnvironmentRepository {
let max_memory_mb: Option<i32> = row.get("max_memory_mb")?;
let max_execution_time_seconds: Option<i32> = row.get("max_execution_time_seconds")?;
let metadata_json: Option<serde_json::Value> = row.get::<_, Option<String>>("metadata_json")?
.map(|s| serde_json::from_str(&s))
.transpose()
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?;
// 解析metadata_json - 使用容错解析器
let metadata_json = self.parse_optional_json_field(row, "metadata_json")?;
let tags: Option<Vec<String>> = row.get::<_, Option<String>>("tags")?
.map(|s| serde_json::from_str(&s))
.transpose()
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?;
// 解析tags字段
let tags: Option<Vec<String>> = match row.get::<_, Option<String>>("tags")? {
Some(tags_str) => {
if tags_str.trim().is_empty() {
None
} else {
match serde_json::from_str(&tags_str) {
Ok(tags_vec) => Some(tags_vec),
Err(e) => {
error!("Failed to parse tags JSON: '{}', error: {}", tags_str, e);
None
}
}
}
},
None => None
};
// 解析时间戳
// 解析时间戳 - 使用多格式时间解析
let created_at_str: String = row.get("created_at")?;
let created_at = DateTime::parse_from_rfc3339(&created_at_str)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?
.with_timezone(&Utc);
let created_at = self.parse_datetime(&created_at_str)
.map_err(|e| {
error!("解析 created_at 时间戳失败: '{}', error: {}", created_at_str, e);
rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, e
)
})?;
let updated_at_str: String = row.get("updated_at")?;
let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)
.map_err(|e| rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, Box::new(e)
))?
.with_timezone(&Utc);
let updated_at = self.parse_datetime(&updated_at_str)
.map_err(|e| {
error!("解析 updated_at 时间戳失败: '{}', error: {}", updated_at_str, e);
rusqlite::Error::FromSqlConversionFailure(
0, rusqlite::types::Type::Text, e
)
})?;
Ok(WorkflowExecutionEnvironment {
id,