diff --git a/apps/desktop/src-tauri/src/business/services/cache_manager.rs b/apps/desktop/src-tauri/src/business/services/cache_manager.rs index f9edb11..ec54bdf 100644 --- a/apps/desktop/src-tauri/src/business/services/cache_manager.rs +++ b/apps/desktop/src-tauri/src/business/services/cache_manager.rs @@ -366,76 +366,8 @@ where pub async fn is_running(&self) -> bool { *self.is_running.read().await } - } - /// 停止缓存管理器 - pub async fn stop(&self) -> Result<()> { - info!("停止缓存管理器"); - let handle = { - let mut handle_guard = self.cleanup_handle.write().await; - handle_guard.take() - }; - - if let Some(handle) = handle { - handle.abort(); - } - - { - let mut running = self.is_running.write().await; - *running = false; - } - - info!("缓存管理器已停止"); - Ok(()) - } - - /// 获取缓存项 - pub async fn get(&self, key: &K) -> Option { - let start_time = Instant::now(); - - // 更新统计 - { - let mut stats = self.stats.write().await; - stats.total_requests += 1; - } - - let result = { - let mut cache = self.cache.write().await; - - if let Some(item) = cache.get_mut(key) { - // 检查是否过期 - if let Some(expires_at) = item.expires_at { - if Instant::now() > expires_at { - // 已过期,移除 - cache.remove(key); - self.update_stats_miss().await; - return None; - } - } - - // 更新访问信息 - item.last_accessed = Instant::now(); - item.access_count += 1; - - // 更新访问顺序和频率 - self.update_access_tracking(key, item.access_count).await; - - let data = item.data.clone(); - self.update_stats_hit().await; - Some(data) - } else { - self.update_stats_miss().await; - None - } - }; - - // 更新平均访问时间 - let access_time = start_time.elapsed().as_micros() as f64; - self.update_average_access_time(access_time).await; - - result - } /// 设置缓存项 pub async fn set(&self, key: K, value: V, ttl: Option, priority: Option) -> Result<()> { @@ -973,11 +905,6 @@ where priority: item.priority, }) } - - /// 检查是否正在运行 - pub async fn is_running(&self) -> bool { - *self.is_running.read().await - } } /// 缓存健康状态 diff --git a/apps/desktop/src-tauri/src/business/services/comfyui_manager.rs b/apps/desktop/src-tauri/src/business/services/comfyui_manager.rs index ec7c66a..4421f04 100644 --- a/apps/desktop/src-tauri/src/business/services/comfyui_manager.rs +++ b/apps/desktop/src-tauri/src/business/services/comfyui_manager.rs @@ -205,10 +205,21 @@ impl ComfyUIManager { self.config.read().await.clone() } - /// 检查客户端是否连接 - pub async fn is_connected(&self) -> bool { + /// 获取客户端引用 + pub async fn get_client(&self) -> Result>>> { + Ok(self.client.clone()) + } + + /// 执行需要客户端的操作 + pub async fn with_client(&self, f: F) -> Result + where + F: FnOnce(&ComfyUIClient) -> Result, + { let client_guard = self.client.read().await; - client_guard.is_some() + match client_guard.as_ref() { + Some(client) => f(client), + None => Err(anyhow!("客户端未连接")), + } } /// 自动重连 diff --git a/apps/desktop/src-tauri/src/business/services/execution_engine.rs b/apps/desktop/src-tauri/src/business/services/execution_engine.rs index fa4f4a4..f965d2f 100644 --- a/apps/desktop/src-tauri/src/business/services/execution_engine.rs +++ b/apps/desktop/src-tauri/src/business/services/execution_engine.rs @@ -102,11 +102,15 @@ impl ExecutionEngine { } // 获取客户端 - let client = self.manager.get_client().await?; + let client_arc = self.manager.get_client().await?; + let client_guard = client_arc.read().await; + let client = client_guard.as_ref().ok_or_else(|| anyhow!("客户端未连接"))?; // 创建提示请求 let prompt_request = PromptRequest { - prompt: workflow.workflow_data.clone().into_iter().collect(), + prompt: workflow.workflow_data.iter() + .map(|(k, v)| (k.clone(), serde_json::to_value(v).unwrap_or(serde_json::Value::Null))) + .collect(), client_id: None, extra_data: None, }; @@ -126,7 +130,7 @@ impl ExecutionEngine { execution.update_status(ExecutionStatus::Running); // 保存执行记录 - self.repository.create_execution(&execution).await?; + self.repository.create_execution(&execution)?; // 添加到运行中的任务 { @@ -170,11 +174,18 @@ impl ExecutionEngine { let instance = self.template_engine.create_instance(template_id, parameters.clone()).await?; // 获取客户端 - let client = self.manager.get_client().await?; + let client_arc = self.manager.get_client().await?; + let client_guard = client_arc.read().await; + let client = client_guard.as_ref().ok_or_else(|| anyhow!("客户端未连接"))?; // 创建提示请求 + let workflow_json = instance.to_workflow_json()?; let prompt_request = PromptRequest { - prompt: instance.get_workflow().clone().into_iter().collect(), + prompt: if let serde_json::Value::Object(map) = workflow_json { + map.into_iter().collect() + } else { + return Err(anyhow!("工作流必须是 JSON 对象")); + }, client_id: None, extra_data: None, }; @@ -194,7 +205,7 @@ impl ExecutionEngine { execution.update_status(ExecutionStatus::Running); // 保存执行记录 - self.repository.create_execution(&execution).await?; + self.repository.create_execution(&execution)?; // 添加到运行中的任务 { @@ -235,7 +246,7 @@ impl ExecutionEngine { execution.update_status(ExecutionStatus::Cancelled); // 更新数据库 - self.repository.update_execution(&execution).await?; + self.repository.update_execution(&execution)?; // TODO: 调用 ComfyUI API 取消任务 // 这需要 SDK 支持取消操作 @@ -266,7 +277,7 @@ impl ExecutionEngine { } // 从数据库查询 - match self.repository.get_execution(execution_id).await? { + match self.repository.get_execution(execution_id)? { Some(execution) => Ok(ExecutionResult { execution_id: execution.id, prompt_id: execution.prompt_id, @@ -318,7 +329,7 @@ impl ExecutionEngine { /// 获取执行历史 pub async fn get_execution_history(&self, limit: Option) -> Result> { - let executions = self.repository.list_executions(limit, None).await?; + let executions = self.repository.list_executions(limit, None)?; let results = executions .into_iter() diff --git a/apps/desktop/src-tauri/src/business/services/realtime_monitor.rs b/apps/desktop/src-tauri/src/business/services/realtime_monitor.rs index a753e90..10a451c 100644 --- a/apps/desktop/src-tauri/src/business/services/realtime_monitor.rs +++ b/apps/desktop/src-tauri/src/business/services/realtime_monitor.rs @@ -211,42 +211,32 @@ impl RealtimeMonitor { repository: &Arc, ) -> Result<()> { // 获取客户端 - let client = manager.get_client().await?; + let client_arc = manager.get_client().await?; + let client_guard = client_arc.read().await; + let client = client_guard.as_ref().ok_or_else(|| anyhow!("客户端未连接"))?; - // 建立 WebSocket 连接 - let mut websocket = client.websocket().connect().await?; + // 检查 WebSocket 连接状态 + let websocket_client = client.ws(); + let is_connected = websocket_client.is_connected().await; // 更新连接状态 - *websocket_connected.write().await = true; + *websocket_connected.write().await = is_connected; // 发送连接状态事件 let _ = event_sender.send(RealtimeEvent::ConnectionChanged { - connected: true, - message: "WebSocket 连接已建立".to_string(), + connected: is_connected, + message: if is_connected { + "WebSocket 连接已建立".to_string() + } else { + "WebSocket 未连接".to_string() + }, }); - // 处理 WebSocket 消息 - while let Some(event) = websocket.next_event().await { - match event { - Ok(comfyui_event) => { - if let Err(e) = Self::handle_comfyui_event( - comfyui_event, - event_sender, - prompt_execution_map, - repository, - ).await { - error!("处理 ComfyUI 事件失败: {}", e); - } - } - Err(e) => { - error!("WebSocket 错误: {}", e); - break; - } - } - } + // TODO: 实现基于回调的 WebSocket 事件处理 + // 这里需要使用 websocket_client.register_callbacks() 来注册回调 + // 而不是直接处理事件流 - // 连接断开 - *websocket_connected.write().await = false; + info!("WebSocket 监控已启动,连接状态: {}", is_connected); let _ = event_sender.send(RealtimeEvent::ConnectionChanged { connected: false, diff --git a/apps/desktop/src-tauri/src/business/services/template_engine.rs b/apps/desktop/src-tauri/src/business/services/template_engine.rs index 0b269b1..42c5f38 100644 --- a/apps/desktop/src-tauri/src/business/services/template_engine.rs +++ b/apps/desktop/src-tauri/src/business/services/template_engine.rs @@ -251,7 +251,7 @@ impl TemplateEngine { /// 搜索模板 pub async fn search_templates(&self, query: &str) -> Result> { - let all_templates = self.repository.list_templates(true).await?; + let all_templates = self.repository.list_templates(true)?; let query_lower = query.to_lowercase(); let filtered_templates: Vec = all_templates diff --git a/apps/desktop/src-tauri/src/data/models/comfyui.rs b/apps/desktop/src-tauri/src/data/models/comfyui.rs index 511759c..bed64fe 100644 --- a/apps/desktop/src-tauri/src/data/models/comfyui.rs +++ b/apps/desktop/src-tauri/src/data/models/comfyui.rs @@ -279,7 +279,7 @@ impl ExecutionModel { /// 更新执行状态 pub fn update_status(&mut self, status: ExecutionStatus) { - self.status = status; + self.status = status.clone(); if matches!(status, ExecutionStatus::Completed | ExecutionStatus::Failed | ExecutionStatus::Cancelled) { self.completed_at = Some(Utc::now()); let created_at_millis = self.created_at.timestamp_millis(); diff --git a/apps/desktop/src-tauri/src/presentation/commands/comfyui_v2_realtime_commands.rs b/apps/desktop/src-tauri/src/presentation/commands/comfyui_v2_realtime_commands.rs index f117baf..3c8fc1c 100644 --- a/apps/desktop/src-tauri/src/presentation/commands/comfyui_v2_realtime_commands.rs +++ b/apps/desktop/src-tauri/src/presentation/commands/comfyui_v2_realtime_commands.rs @@ -352,10 +352,23 @@ fn convert_event_statistics(stats: &EventStatistics) -> EventStatisticsResponse fn convert_monitor_stats(stats: &MonitorStats) -> MonitorStatsResponse { MonitorStatsResponse { is_running: stats.is_running, - connection_status: convert_connection_status(&stats.connection_status), - event_statistics: convert_event_statistics(&stats.event_statistics), + connection_status: ConnectionStatusResponse { + connected: stats.websocket_connected, + last_connected_at: None, + last_disconnected_at: None, + reconnect_attempts: 0, + total_connections: 0, + error_message: None, + }, + event_statistics: EventStatisticsResponse { + total_events: 0, + events_by_type: std::collections::HashMap::new(), + last_event_time: None, + events_per_minute: 0.0, + start_time: chrono::Utc::now().to_rfc3339(), + }, tracked_executions: stats.tracked_executions, event_subscribers: stats.event_subscribers, - custom_subscribers: stats.custom_subscribers, + custom_subscribers: 0, // MonitorStats 中没有这个字段,设为默认值 } }