From b0f880dc1330550ad23fefe83df960e865e6aee7 Mon Sep 17 00:00:00 2001 From: imeepos Date: Wed, 30 Jul 2025 19:01:13 +0800 Subject: [PATCH] =?UTF-8?q?=20=E5=AE=8C=E6=88=90=20conversation=5Freposito?= =?UTF-8?q?ry.rs=20=E8=BF=9E=E6=8E=A5=E6=B1=A0=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复内容: - initialize_tables: 强制使用连接池初始化表 - create_session: 连接池创建会话 - ensure_session_exists: 连接池确保会话存在 - add_message: 连接池添加消息和更新会话时间 - delete_session: 连接池软删除会话 - cleanup_expired_sessions: 连接池清理过期会话 关键修复: - 添加 anyhow 宏导入 - 所有方法强制检查连接池状态 - 统一错误处理和日志记录 - 避免所有 get_connection().lock() 死锁风险 这是数据库连接池优化计划的第2步,会话管理现在完全使用连接池。 --- .../repositories/conversation_repository.rs | 80 ++++++++++++++----- 1 file changed, 59 insertions(+), 21 deletions(-) diff --git a/apps/desktop/src-tauri/src/data/repositories/conversation_repository.rs b/apps/desktop/src-tauri/src/data/repositories/conversation_repository.rs index 3e7f538..6e73777 100644 --- a/apps/desktop/src-tauri/src/data/repositories/conversation_repository.rs +++ b/apps/desktop/src-tauri/src/data/repositories/conversation_repository.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{Result, anyhow}; use rusqlite::{params, Connection, Row, OptionalExtension}; use std::sync::{Arc}; use chrono::{DateTime, Utc}; @@ -78,13 +78,20 @@ impl ConversationRepository { Ok(()) } - /// 创建新会话 + /// 创建新会话(强制使用连接池) pub fn create_session(&self, request: CreateConversationSessionRequest) -> Result { let session = ConversationSession::new(request.title); - let conn = self.database.get_connection(); - let conn = conn.lock().unwrap(); + println!("🏊 使用连接池创建会话: {}", &session.id[..8]); - conn.execute( + // 🚨 强制使用连接池,避免死锁 + if !self.database.has_pool() { + return Err(anyhow!("连接池未启用,无法安全执行数据库操作")); + } + + let pooled_conn = self.database.acquire_from_pool() + .map_err(|e| anyhow!("获取连接池连接失败: {}", e))?; + + pooled_conn.execute( "INSERT INTO conversation_sessions (id, title, created_at, updated_at, is_active, metadata) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", params![ @@ -97,6 +104,7 @@ impl ConversationRepository { ], )?; + println!("✅ 会话创建完成"); Ok(session) } @@ -148,12 +156,18 @@ impl ConversationRepository { }, None => { println!("🆕 会话不存在,创建新会话: {}", session_id); + println!("🏊 使用连接池创建会话: {}", &session_id[..8]); - let conn = self.database.get_connection(); - let conn = conn.lock().unwrap(); + // 🚨 强制使用连接池,避免死锁 + if !self.database.has_pool() { + return Err(anyhow!("连接池未启用,无法安全执行数据库操作")); + } + + let pooled_conn = self.database.acquire_from_pool() + .map_err(|e| anyhow!("获取连接池连接失败: {}", e))?; // 直接插入会话记录,使用指定的session_id - conn.execute( + pooled_conn.execute( "INSERT OR REPLACE INTO conversation_sessions (id, title, created_at, updated_at, is_active, metadata) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", params![ @@ -180,11 +194,18 @@ impl ConversationRepository { request.content, ); - let conn = self.database.get_connection(); - let conn = conn.lock().unwrap(); + println!("🏊 使用连接池添加消息: {}", &message.id[..8]); + + // 🚨 强制使用连接池,避免死锁 + if !self.database.has_pool() { + return Err(anyhow!("连接池未启用,无法安全执行数据库操作")); + } + + let pooled_conn = self.database.acquire_from_pool() + .map_err(|e| anyhow!("获取连接池连接失败: {}", e))?; // 插入消息 - conn.execute( + pooled_conn.execute( "INSERT INTO conversation_messages (id, session_id, role, content, timestamp, metadata) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", params![ @@ -198,11 +219,12 @@ impl ConversationRepository { )?; // 更新会话的最后更新时间 - conn.execute( + pooled_conn.execute( "UPDATE conversation_sessions SET updated_at = ?1 WHERE id = ?2", params![Utc::now().to_rfc3339(), request.session_id], )?; + println!("✅ 消息添加完成"); Ok(message) } @@ -372,16 +394,24 @@ impl ConversationRepository { Ok(sessions) } - /// 删除会话(软删除) + /// 删除会话(软删除,强制使用连接池) pub fn delete_session(&self, session_id: &str) -> Result<()> { - let conn = self.database.get_connection(); - let conn = conn.lock().unwrap(); - - conn.execute( + println!("🏊 使用连接池删除会话: {}", &session_id[..8]); + + // 🚨 强制使用连接池,避免死锁 + if !self.database.has_pool() { + return Err(anyhow!("连接池未启用,无法安全执行数据库操作")); + } + + let pooled_conn = self.database.acquire_from_pool() + .map_err(|e| anyhow!("获取连接池连接失败: {}", e))?; + + pooled_conn.execute( "UPDATE conversation_sessions SET is_active = 0, updated_at = ?1 WHERE id = ?2", params![Utc::now().to_rfc3339(), session_id], )?; + println!("✅ 会话删除完成"); Ok(()) } @@ -439,19 +469,27 @@ impl ConversationRepository { }) } - /// 清理过期会话 + /// 清理过期会话(强制使用连接池) pub fn cleanup_expired_sessions(&self, max_inactive_days: u32) -> Result { - let conn = self.database.get_connection(); - let conn = conn.lock().unwrap(); + println!("🏊 使用连接池清理过期会话,超过{}天", max_inactive_days); + + // 🚨 强制使用连接池,避免死锁 + if !self.database.has_pool() { + return Err(anyhow!("连接池未启用,无法安全执行数据库操作")); + } + + let pooled_conn = self.database.acquire_from_pool() + .map_err(|e| anyhow!("获取连接池连接失败: {}", e))?; let cutoff_date = Utc::now() - chrono::Duration::days(max_inactive_days as i64); - let deleted_count = conn.execute( + let deleted_count = pooled_conn.execute( "UPDATE conversation_sessions SET is_active = 0, updated_at = ?1 WHERE is_active = 1 AND updated_at < ?2", params![Utc::now().to_rfc3339(), cutoff_date.to_rfc3339()], )?; + println!("✅ 过期会话清理完成,删除{}个会话", deleted_count); Ok(deleted_count as u32) }