mixvideo-v2/apps/desktop/src-tauri/src/presentation/commands/workflow_commands.rs

1032 lines
35 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use crate::business::services::universal_workflow_service::{
ExecuteWorkflowRequest, ExecuteWorkflowResponse
};
use crate::data::models::workflow_template::{
WorkflowTemplate, CreateWorkflowTemplateRequest, UpdateWorkflowTemplateRequest, WorkflowTemplateFilter
};
use crate::data::models::workflow_execution_record::{
WorkflowExecutionRecord, ExecutionRecordFilter
};
use crate::data::models::workflow_execution_environment::{
WorkflowExecutionEnvironment, CreateExecutionEnvironmentRequest, UpdateExecutionEnvironmentRequest, ExecutionEnvironmentFilter
};
use crate::app_state::AppState;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use tauri::State;
use tracing::info;
use chrono::Timelike;
/// 获取所有工作流模板
#[tauri::command]
pub async fn get_workflow_templates(
filter: Option<WorkflowTemplateFilter>,
state: State<'_, AppState>,
) -> Result<Vec<WorkflowTemplate>, String> {
info!("获取工作流模板列表");
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 查询工作流模板
let templates = repo.find_all(filter.as_ref())
.map_err(|e| format!("查询工作流模板失败: {}", e))?;
info!("成功获取 {} 个工作流模板", templates.len());
Ok(templates)
}
/// 根据ID获取工作流模板
#[tauri::command]
pub async fn get_workflow_template_by_id(
id: i64,
state: State<'_, AppState>,
) -> Result<Option<WorkflowTemplate>, String> {
info!("获取工作流模板: {}", id);
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 查询模板
let template = repo.find_by_id(id)
.map_err(|e| format!("查询工作流模板失败: {}", e))?;
info!("成功获取工作流模板: {:?}", template.as_ref().map(|t| &t.name));
Ok(template)
}
/// 创建工作流模板
#[tauri::command]
pub async fn create_workflow_template(
request: CreateWorkflowTemplateRequest,
state: State<'_, AppState>,
) -> Result<WorkflowTemplate, String> {
info!("创建工作流模板: {}", request.name);
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 创建工作流模板
let template_id = repo.create(&request)
.map_err(|e| format!("创建工作流模板失败: {}", e))?;
// 查询创建的模板并返回
let template = repo.find_by_id(template_id)
.map_err(|e| format!("查询创建的工作流模板失败: {}", e))?
.ok_or_else(|| "创建的工作流模板未找到".to_string())?;
info!("成功创建工作流模板ID: {}", template_id);
Ok(template)
}
/// 更新工作流模板
#[tauri::command]
pub async fn update_workflow_template(
id: i64,
request: UpdateWorkflowTemplateRequest,
state: State<'_, AppState>,
) -> Result<WorkflowTemplate, String> {
info!("更新工作流模板: {}", id);
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 更新工作流模板
repo.update(id, &request)
.map_err(|e| format!("更新工作流模板失败: {}", e))?;
// 查询更新后的模板并返回
let template = repo.find_by_id(id)
.map_err(|e| format!("查询更新后的工作流模板失败: {}", e))?
.ok_or_else(|| "更新后的工作流模板未找到".to_string())?;
info!("成功更新工作流模板ID: {}", id);
Ok(template)
}
/// 执行基于映射的工作流
#[tauri::command]
pub async fn execute_workflow_with_mapping(
request: crate::data::models::outfit_photo_generation::ExecuteWorkflowRequest,
state: State<'_, AppState>,
) -> Result<crate::data::models::outfit_photo_generation::ExecuteWorkflowResponse, String> {
info!("执行基于映射的工作流模板ID: {}", request.workflow_template_id);
// 获取工作流模板仓库
let template_repo = {
let template_repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
template_repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?
.clone()
};
// 获取工作流执行记录仓库
let execution_repo = {
let execution_repo_guard = state.get_workflow_execution_record_repository()
.map_err(|e| format!("获取工作流执行记录仓库失败: {}", e))?;
execution_repo_guard.as_ref()
.ok_or_else(|| "工作流执行记录仓库未初始化".to_string())?
.clone()
};
// 使用通用工作流服务(支持环境管理)
let universal_service = {
let service_guard = state.get_universal_workflow_service()
.map_err(|e| format!("获取通用工作流服务失败: {}", e))?;
if service_guard.as_ref().is_none() {
// 尝试初始化通用工作流服务
drop(service_guard); // 释放锁
if let Err(e) = state.initialize_universal_workflow_service() {
return Err(format!("初始化通用工作流服务失败: {}", e));
}
// 重新获取服务
let service_guard = state.get_universal_workflow_service()
.map_err(|e| format!("重新获取通用工作流服务失败: {}", e))?;
service_guard.as_ref()
.ok_or_else(|| "通用工作流服务初始化后仍然为空".to_string())?
.clone()
} else {
service_guard.as_ref().unwrap().clone()
}
};
// 转换请求格式到新的统一格式
let unified_request = crate::business::services::universal_workflow_service::ExecuteWorkflowRequest {
workflow_identifier: format!("template_{}", request.workflow_template_id),
version: None,
input_data: request.input_data,
environment_id: request.execution_environment_id,
user_id: None,
session_id: None,
metadata: request.execution_config_override,
};
// 执行工作流
let response = universal_service.execute_workflow(unified_request).await
.map_err(|e| format!("执行工作流失败: {}", e))?;
// 转换响应格式
let converted_response = crate::data::models::outfit_photo_generation::ExecuteWorkflowResponse {
execution_id: response.execution_id,
status: match response.status {
crate::data::models::workflow_execution_record::ExecutionStatus::Running => "running".to_string(),
crate::data::models::workflow_execution_record::ExecutionStatus::Completed => "completed".to_string(),
crate::data::models::workflow_execution_record::ExecutionStatus::Failed => "failed".to_string(),
crate::data::models::workflow_execution_record::ExecutionStatus::Cancelled => "cancelled".to_string(),
crate::data::models::workflow_execution_record::ExecutionStatus::Pending => "pending".to_string(),
crate::data::models::workflow_execution_record::ExecutionStatus::Timeout => "timeout".to_string(),
},
comfyui_prompt_id: response.comfyui_prompt_id,
error_message: response.error_message,
};
info!("工作流执行成功执行记录ID: {}", response.execution_id);
Ok(converted_response)
}
/// 删除工作流模板
#[tauri::command]
pub async fn delete_workflow_template(
id: i64,
state: State<'_, AppState>,
) -> Result<(), String> {
info!("删除工作流模板: {}", id);
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 删除工作流模板
repo.delete(id)
.map_err(|e| format!("删除工作流模板失败: {}", e))?;
info!("成功删除工作流模板ID: {}", id);
Ok(())
}
/// 执行工作流
#[tauri::command]
pub async fn execute_workflow(
request: ExecuteWorkflowRequest,
_state: State<'_, AppState>,
) -> Result<ExecuteWorkflowResponse, String> {
info!("执行工作流: {}", request.workflow_identifier);
// TODO: 从AppState获取UniversalWorkflowService
// 暂时返回示例响应
let response = ExecuteWorkflowResponse {
execution_id: 1,
status: crate::data::models::workflow_execution_record::ExecutionStatus::Running,
progress: 0,
output_data: None,
error_message: None,
comfyui_prompt_id: Some("test-prompt-id".to_string()),
};
Ok(response)
}
/// 获取执行状态
#[tauri::command]
pub async fn get_execution_status(
execution_id: i64,
state: State<'_, AppState>,
) -> Result<ExecuteWorkflowResponse, String> {
info!("获取执行状态: {}", execution_id);
// 获取工作流执行记录仓库
let repo_guard = state.get_workflow_execution_record_repository()
.map_err(|e| format!("获取工作流执行记录仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流执行记录仓库未初始化".to_string())?;
// 查询执行记录
let record = repo.find_by_id(execution_id)
.map_err(|e| format!("查询执行记录失败: {}", e))?
.ok_or_else(|| format!("执行记录不存在: {}", execution_id))?;
let response = ExecuteWorkflowResponse {
execution_id: record.id.unwrap_or(execution_id),
status: record.status,
progress: record.progress,
output_data: record.output_data_json,
comfyui_prompt_id: record.comfyui_prompt_id,
error_message: record.error_message,
};
Ok(response)
}
/// 获取执行结果
#[tauri::command]
pub async fn get_execution_results(
execution_id: i64,
state: State<'_, AppState>,
) -> Result<Option<serde_json::Value>, String> {
info!("获取执行结果: {}", execution_id);
// 获取工作流执行记录仓库
let repo_guard = state.get_workflow_execution_record_repository()
.map_err(|e| format!("获取工作流执行记录仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流执行记录仓库未初始化".to_string())?;
// 查询执行记录
let record = repo.find_by_id(execution_id)
.map_err(|e| format!("查询执行记录失败: {}", e))?
.ok_or_else(|| format!("执行记录不存在: {}", execution_id))?;
Ok(record.output_data_json)
}
/// 取消执行
#[tauri::command]
pub async fn cancel_execution(
execution_id: i64,
_state: State<'_, AppState>,
) -> Result<(), String> {
info!("取消执行: {}", execution_id);
// TODO: 实现实际的取消逻辑
Ok(())
}
/// 执行历史分页响应
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionHistoryResponse {
pub records: Vec<WorkflowExecutionRecord>,
pub total_count: u32,
pub page: u32,
pub page_size: u32,
pub total_pages: u32,
}
/// 获取执行历史
#[tauri::command]
pub async fn get_execution_history(
filter: Option<ExecutionRecordFilter>,
limit: Option<i32>,
offset: Option<i32>,
state: State<'_, AppState>,
) -> Result<ExecutionHistoryResponse, String> {
info!("获取执行历史");
// 获取工作流执行记录仓库
let repo_guard = state.get_workflow_execution_record_repository()
.map_err(|e| format!("获取工作流执行记录仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流执行记录仓库未初始化".to_string())?;
// 查询执行历史
let (records, total_count, page, page_size) = if let (Some(limit), Some(offset)) = (limit, offset) {
let page = (offset / limit.max(1)) as u32;
let page_size = limit as u32;
let (records, total) = repo.find_with_pagination(filter.as_ref(), page, page_size)
.map_err(|e| format!("分页查询执行历史失败: {}", e))?;
(records, total, page, page_size)
} else {
// 如果没有分页参数,返回所有记录
let records = repo.find_all(filter.as_ref())
.map_err(|e| format!("查询执行历史失败: {}", e))?;
let total = records.len() as u32;
(records, total, 0, total)
};
let total_pages = if page_size > 0 {
(total_count + page_size - 1) / page_size
} else {
1
};
info!("成功获取 {} 条执行历史记录,总数: {}", records.len(), total_count);
Ok(ExecutionHistoryResponse {
records,
total_count,
page,
page_size,
total_pages,
})
}
/// 获取执行环境列表
#[tauri::command]
pub async fn get_execution_environments(
filter: Option<ExecutionEnvironmentFilter>,
state: State<'_, AppState>,
) -> Result<Vec<WorkflowExecutionEnvironment>, String> {
info!("获取执行环境列表");
// 获取工作流执行环境仓库
let repo_guard = state.get_workflow_execution_environment_repository()
.map_err(|e| format!("获取工作流执行环境仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流执行环境仓库未初始化".to_string())?;
// 查询执行环境
let environments = repo.find_all(filter.as_ref())
.map_err(|e| format!("查询执行环境失败: {}", e))?;
info!("成功获取 {} 个执行环境", environments.len());
Ok(environments)
}
/// 创建执行环境
#[tauri::command]
pub async fn create_execution_environment(
request: CreateExecutionEnvironmentRequest,
state: State<'_, AppState>,
) -> Result<WorkflowExecutionEnvironment, String> {
info!("创建执行环境: {}", request.name);
// 获取工作流执行环境仓库
let repo_guard = state.get_workflow_execution_environment_repository()
.map_err(|e| format!("获取工作流执行环境仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流执行环境仓库未初始化".to_string())?;
// 创建执行环境
let environment_id = repo.create(&request)
.map_err(|e| format!("创建执行环境失败: {}", e))?;
// 查询创建的环境并返回
let environment = repo.find_by_id(environment_id)
.map_err(|e| format!("查询创建的执行环境失败: {}", e))?
.ok_or_else(|| "创建的执行环境未找到".to_string())?;
info!("成功创建执行环境ID: {}", environment_id);
Ok(environment)
}
/// 更新执行环境
#[tauri::command]
pub async fn update_execution_environment(
id: i64,
request: UpdateExecutionEnvironmentRequest,
state: State<'_, AppState>,
) -> Result<WorkflowExecutionEnvironment, String> {
info!("更新执行环境: {}, 请求数据: {:?}", id, request);
// 获取工作流执行环境仓库
let repo_guard = state.get_workflow_execution_environment_repository()
.map_err(|e| format!("获取工作流执行环境仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流执行环境仓库未初始化".to_string())?;
// 查询更新前的环境
let old_environment = repo.find_by_id(id)
.map_err(|e| format!("查询更新前的执行环境失败: {}", e))?
.ok_or_else(|| format!("执行环境不存在ID: {}", id))?;
info!("更新前的环境数据: {:?}", old_environment);
// 更新执行环境
repo.update(id, &request)
.map_err(|e| format!("更新执行环境失败: {}", e))?;
// 查询更新后的环境并返回
let environment = repo.find_by_id(id)
.map_err(|e| format!("查询更新后的执行环境失败: {}", e))?
.ok_or_else(|| "更新后的执行环境未找到".to_string())?;
info!("成功更新执行环境ID: {}, 更新后数据: {:?}", id, environment);
Ok(environment)
}
/// 删除执行环境
#[tauri::command]
pub async fn delete_execution_environment(
id: i64,
state: State<'_, AppState>,
) -> Result<(), String> {
info!("删除执行环境: {}", id);
// 获取工作流执行环境仓库
let repo_guard = state.get_workflow_execution_environment_repository()
.map_err(|e| format!("获取工作流执行环境仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流执行环境仓库未初始化".to_string())?;
// 删除执行环境
repo.delete(id)
.map_err(|e| format!("删除执行环境失败: {}", e))?;
info!("成功删除执行环境ID: {}", id);
Ok(())
}
/// 健康检查执行环境
#[tauri::command]
pub async fn health_check_execution_environment(
id: i64,
state: State<'_, AppState>,
) -> Result<crate::data::models::workflow_execution_environment::HealthStatus, String> {
info!("健康检查执行环境: {}", id);
// 获取工作流执行环境仓库并克隆以避免所有权问题
let repo = {
let repo_guard = state.get_workflow_execution_environment_repository()
.map_err(|e| format!("获取工作流执行环境仓库失败: {}", e))?;
repo_guard.as_ref()
.ok_or_else(|| "工作流执行环境仓库未初始化".to_string())?
.clone()
};
// 获取环境信息
let _environment = repo.find_by_id(id)
.map_err(|e| format!("查询执行环境失败: {}", e))?
.ok_or_else(|| format!("执行环境不存在ID: {}", id))?;
// 执行健康检查 - 使用公共方法
let health_results = repo.health_check_all().await
.map_err(|e| format!("执行健康检查失败: {}", e))?;
// 查找当前环境的健康状态
let health_status = health_results.iter()
.find(|(env_id, _)| *env_id == id)
.map(|(_, status)| status.clone())
.unwrap_or(crate::data::models::workflow_execution_environment::HealthStatus::Unknown);
info!("执行环境健康检查完成ID: {}, 状态: {:?}", id, health_status);
Ok(health_status)
}
/// 导出工作流模板
#[tauri::command]
pub async fn export_workflow_template(
id: i64,
state: State<'_, AppState>,
) -> Result<String, String> {
info!("导出工作流模板: {}", id);
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 导出模板
let json_data = repo.export_template(id)
.map_err(|e| format!("导出工作流模板失败: {}", e))?;
info!("成功导出工作流模板ID: {}", id);
Ok(json_data)
}
/// 导入工作流模板
#[tauri::command]
pub async fn import_workflow_template(
json_data: String,
override_name: Option<String>,
state: State<'_, AppState>,
) -> Result<i64, String> {
info!("导入工作流模板");
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 导入模板
let template_id = repo.import_template(&json_data, override_name)
.map_err(|e| format!("导入工作流模板失败: {}", e))?;
info!("成功导入工作流模板新ID: {}", template_id);
Ok(template_id)
}
/// 导出工作流包
#[tauri::command]
pub async fn export_workflow_package(
package_data: serde_json::Value,
file_path: String,
) -> Result<(), String> {
info!("导出工作流包到: {}", file_path);
// 将数据写入文件
std::fs::write(&file_path, serde_json::to_string_pretty(&package_data).unwrap())
.map_err(|e| format!("写入文件失败: {}", e))?;
info!("成功导出工作流包");
Ok(())
}
/// 导入工作流包
#[tauri::command]
pub async fn import_workflow_package() -> Result<serde_json::Value, String> {
info!("导入工作流包");
// 在前端处理文件选择,这里只处理导入逻辑
// 暂时返回成功状态,实际实现时会接收文件路径参数
let result = serde_json::json!({
"success": true,
"message": "导入成功",
"imported_count": 1
});
info!("成功导入工作流包");
Ok(result)
}
/// 复制工作流模板
#[tauri::command]
pub async fn copy_workflow_template(
id: i64,
new_name: String,
new_base_name: Option<String>,
state: State<'_, AppState>,
) -> Result<i64, String> {
info!("复制工作流模板: {} -> {}", id, new_name);
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 复制模板
let new_template_id = repo.copy_template(id, new_name, new_base_name)
.map_err(|e| format!("复制工作流模板失败: {}", e))?;
info!("成功复制工作流模板原ID: {}新ID: {}", id, new_template_id);
Ok(new_template_id)
}
/// 验证工作流模板
#[tauri::command]
pub async fn validate_workflow_template(
id: i64,
state: State<'_, AppState>,
) -> Result<Vec<String>, String> {
info!("验证工作流模板: {}", id);
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 获取模板
let template = repo.find_by_id(id)
.map_err(|e| format!("查询工作流模板失败: {}", e))?
.ok_or_else(|| "工作流模板不存在".to_string())?;
// 验证模板
let warnings = repo.validate_template(&template)
.map_err(|e| format!("验证工作流模板失败: {}", e))?;
info!("工作流模板验证完成,发现 {} 个警告", warnings.len());
Ok(warnings)
}
/// 批量导出工作流模板
#[tauri::command]
pub async fn batch_export_workflow_templates(
ids: Vec<i64>,
state: State<'_, AppState>,
) -> Result<String, String> {
info!("批量导出工作流模板: {:?}", ids);
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 批量导出模板
let json_data = repo.batch_export(&ids)
.map_err(|e| format!("批量导出工作流模板失败: {}", e))?;
info!("成功批量导出 {} 个工作流模板", ids.len());
Ok(json_data)
}
/// 批量导入工作流模板
#[tauri::command]
pub async fn batch_import_workflow_templates(
json_data: String,
state: State<'_, AppState>,
) -> Result<Vec<i64>, String> {
info!("批量导入工作流模板");
// 获取工作流模板仓库
let repo_guard = state.get_workflow_template_repository()
.map_err(|e| format!("获取工作流模板仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流模板仓库未初始化".to_string())?;
// 批量导入模板
let template_ids = repo.batch_import(&json_data)
.map_err(|e| format!("批量导入工作流模板失败: {}", e))?;
info!("成功批量导入 {} 个工作流模板", template_ids.len());
Ok(template_ids)
}
/// 获取执行结果文件列表
#[tauri::command]
pub async fn get_execution_result_files(
execution_id: i64,
state: State<'_, AppState>,
) -> Result<Vec<crate::business::services::workflow_result_service::ResultFileInfo>, String> {
info!("获取执行结果文件列表: {}", execution_id);
// TODO: 从AppState获取WorkflowResultService实例
// 暂时返回空列表
Ok(Vec::new())
}
/// 下载执行结果文件
#[tauri::command]
pub async fn download_execution_result_file(
execution_id: i64,
file_name: String,
state: State<'_, AppState>,
) -> Result<Vec<u8>, String> {
info!("下载执行结果文件: {} - {}", execution_id, file_name);
// TODO: 从AppState获取WorkflowResultService实例
// 暂时返回空数据
Err("功能暂未实现".to_string())
}
/// 删除执行结果文件
#[tauri::command]
pub async fn delete_execution_result_files(
execution_id: i64,
state: State<'_, AppState>,
) -> Result<crate::business::services::workflow_result_service::CleanupResult, String> {
info!("删除执行结果文件: {}", execution_id);
// TODO: 从AppState获取WorkflowResultService实例
// 暂时返回空结果
Ok(crate::business::services::workflow_result_service::CleanupResult {
files_deleted: 0,
space_freed_mb: 0.0,
errors: Vec::new(),
})
}
/// 系统统计信息响应
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemStatistics {
pub execution_stats: ExecutionStats,
pub environment_stats: EnvironmentStats,
pub system_resources: SystemResources,
pub performance_metrics: PerformanceMetrics,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStats {
pub total_executions_today: u32,
pub successful_executions_today: u32,
pub failed_executions_today: u32,
pub average_execution_time_seconds: f64,
pub current_queue_size: u32,
pub active_executions: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvironmentStats {
pub total_environments: u32,
pub healthy_environments: u32,
pub unhealthy_environments: u32,
pub average_response_time_ms: f64,
pub total_capacity: u32,
pub used_capacity: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemResources {
pub cpu_usage_percent: f64,
pub memory_usage_percent: f64,
pub disk_usage_percent: f64,
pub network_status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
pub requests_per_minute: u32,
pub average_response_time_ms: f64,
pub error_rate_percent: f64,
pub uptime_hours: f64,
}
/// 获取系统统计信息
#[tauri::command]
pub async fn get_system_statistics(
state: State<'_, AppState>,
) -> Result<SystemStatistics, String> {
info!("获取系统统计信息");
// 获取工作流执行记录仓库
let repo_guard = state.get_workflow_execution_record_repository()
.map_err(|e| format!("获取工作流执行记录仓库失败: {}", e))?;
let repo = repo_guard.as_ref()
.ok_or_else(|| "工作流执行记录仓库未初始化".to_string())?;
// 获取环境仓库
let env_repo_guard = state.get_workflow_execution_environment_repository()
.map_err(|e| format!("获取执行环境仓库失败: {}", e))?;
let env_repo = env_repo_guard.as_ref()
.ok_or_else(|| "执行环境仓库未初始化".to_string())?;
// 获取今天的执行统计
let today = chrono::Utc::now().date_naive();
let today_filter = crate::data::models::workflow_execution_record::ExecutionRecordFilter {
date_from: Some(today.and_hms_opt(0, 0, 0).unwrap().and_utc()),
date_to: Some(today.and_hms_opt(23, 59, 59).unwrap().and_utc()),
..Default::default()
};
let execution_stats_result = repo.get_execution_statistics(Some(&today_filter))
.map_err(|e| format!("获取执行统计失败: {}", e))?;
// 获取环境统计
let environments = env_repo.find_all(None)
.map_err(|e| format!("获取环境列表失败: {}", e))?;
let healthy_envs = environments.iter()
.filter(|env| env.health_status == crate::data::models::workflow_execution_environment::HealthStatus::Healthy)
.count() as u32;
let total_capacity: u32 = environments.iter()
.map(|env| env.max_concurrent_jobs as u32)
.sum();
// 获取当前活跃执行
let active_filter = crate::data::models::workflow_execution_record::ExecutionRecordFilter {
status: Some(crate::data::models::workflow_execution_record::ExecutionStatus::Running),
..Default::default()
};
let active_executions = repo.find_all(Some(&active_filter))
.map_err(|e| format!("获取活跃执行失败: {}", e))?;
// 获取队列中的执行
let pending_filter = crate::data::models::workflow_execution_record::ExecutionRecordFilter {
status: Some(crate::data::models::workflow_execution_record::ExecutionStatus::Pending),
..Default::default()
};
let pending_executions = repo.find_all(Some(&pending_filter))
.map_err(|e| format!("获取队列执行失败: {}", e))?;
// 计算系统资源使用情况(简化实现)
let cpu_usage = get_cpu_usage().unwrap_or(0.0);
let memory_usage = get_memory_usage().unwrap_or(0.0);
let disk_usage = get_disk_usage().unwrap_or(0.0);
Ok(SystemStatistics {
execution_stats: ExecutionStats {
total_executions_today: execution_stats_result.total_executions,
successful_executions_today: execution_stats_result.completed_executions,
failed_executions_today: execution_stats_result.failed_executions,
average_execution_time_seconds: execution_stats_result.average_duration_seconds.unwrap_or(0.0),
current_queue_size: pending_executions.len() as u32,
active_executions: active_executions.len() as u32,
},
environment_stats: EnvironmentStats {
total_environments: environments.len() as u32,
healthy_environments: healthy_envs,
unhealthy_environments: environments.len() as u32 - healthy_envs,
average_response_time_ms: environments.iter()
.filter_map(|env| env.average_response_time_ms)
.map(|t| t as f64)
.sum::<f64>() / environments.len().max(1) as f64,
total_capacity,
used_capacity: active_executions.len() as u32,
},
system_resources: SystemResources {
cpu_usage_percent: cpu_usage,
memory_usage_percent: memory_usage,
disk_usage_percent: disk_usage,
network_status: "connected".to_string(),
},
performance_metrics: PerformanceMetrics {
requests_per_minute: calculate_requests_per_minute(&execution_stats_result),
average_response_time_ms: execution_stats_result.average_duration_seconds.unwrap_or(0.0) * 1000.0,
error_rate_percent: if execution_stats_result.total_executions > 0 {
(execution_stats_result.failed_executions as f64 / execution_stats_result.total_executions as f64) * 100.0
} else {
0.0
},
uptime_hours: get_uptime_hours(),
},
})
}
/// 获取存储统计信息
#[tauri::command]
pub async fn get_storage_statistics(
state: State<'_, AppState>,
) -> Result<crate::business::services::workflow_result_service::StorageStatistics, String> {
info!("获取存储统计信息");
// TODO: 从AppState获取WorkflowResultService实例
// 暂时返回空统计
Ok(crate::business::services::workflow_result_service::StorageStatistics {
total_files: 0,
total_size_mb: 0.0,
oldest_file_date: None,
newest_file_date: None,
files_by_type: std::collections::HashMap::new(),
size_by_type: std::collections::HashMap::new(),
})
}
/// 清理过期结果文件
#[tauri::command]
pub async fn cleanup_expired_results(
state: State<'_, AppState>,
) -> Result<crate::business::services::workflow_result_service::CleanupResult, String> {
info!("清理过期结果文件");
// TODO: 从AppState获取WorkflowResultService实例
// 暂时返回空结果
Ok(crate::business::services::workflow_result_service::CleanupResult {
files_deleted: 0,
space_freed_mb: 0.0,
errors: Vec::new(),
})
}
/// 批量清理执行结果
#[tauri::command]
pub async fn batch_cleanup_execution_results(
execution_ids: Vec<i64>,
state: State<'_, AppState>,
) -> Result<crate::business::services::workflow_result_service::CleanupResult, String> {
info!("批量清理执行结果: {:?}", execution_ids);
// TODO: 从AppState获取WorkflowResultService实例
// 暂时返回空结果
Ok(crate::business::services::workflow_result_service::CleanupResult {
files_deleted: 0,
space_freed_mb: 0.0,
errors: Vec::new(),
})
}
// 辅助函数获取CPU使用率
fn get_cpu_usage() -> Option<f64> {
use sysinfo::System;
let mut system = System::new_all();
system.refresh_cpu();
// 等待一小段时间以获得准确的CPU使用率
std::thread::sleep(std::time::Duration::from_millis(200));
system.refresh_cpu();
let cpu_usage: f64 = system.cpus().iter()
.map(|cpu| cpu.cpu_usage() as f64)
.sum::<f64>() / system.cpus().len() as f64;
Some(cpu_usage)
}
// 辅助函数:获取内存使用率
fn get_memory_usage() -> Option<f64> {
use sysinfo::System;
let mut system = System::new_all();
system.refresh_memory();
let total_memory = system.total_memory();
let used_memory = system.used_memory();
if total_memory > 0 {
Some((used_memory as f64 / total_memory as f64) * 100.0)
} else {
None
}
}
// 辅助函数:获取磁盘使用率
fn get_disk_usage() -> Option<f64> {
use sysinfo::Disks;
let disks = Disks::new_with_refreshed_list();
// 获取主磁盘的使用率
if let Some(disk) = disks.iter().next() {
let total_space = disk.total_space();
let available_space = disk.available_space();
let used_space = total_space - available_space;
if total_space > 0 {
Some((used_space as f64 / total_space as f64) * 100.0)
} else {
None
}
} else {
None
}
}
// 辅助函数:计算每分钟请求数
fn calculate_requests_per_minute(stats: &crate::data::repositories::workflow_execution_record_repository::ExecutionStatistics) -> u32 {
// 基于今天的总执行数估算每分钟请求数
let total_today = stats.total_executions;
let hours_passed = chrono::Utc::now().hour() + 1; // 至少1小时
let minutes_passed = hours_passed * 60;
if minutes_passed > 0 {
(total_today * 60) / minutes_passed
} else {
0
}
}
// 辅助函数:获取系统运行时间
fn get_uptime_hours() -> f64 {
use sysinfo::System;
System::uptime() as f64 / 3600.0 // 转换为小时
}