From 4293fbb4c7bd04abb4b0c0d8002fd5d2625a1085 Mon Sep 17 00:00:00 2001 From: imeepos Date: Tue, 22 Jul 2025 11:26:48 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=87=8D=E6=9E=84ConversationRepository?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E5=8F=AA=E8=AF=BB=E8=BF=9E=E6=8E=A5=E9=81=BF?= =?UTF-8?q?=E5=85=8D=E9=94=81=E7=AB=9E=E4=BA=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修改ConversationRepository使用Database而不是直接的连接 - 为读操作使用专用只读连接,避免与写操作的锁竞争 - 实现非阻塞连接获取,避免查询卡住问题 - 添加连接获取失败时的降级处理 - 优化数据库访问性能,提高并发能力 核心改进: ✅ 只读连接 - 查询操作使用专用只读连接 ✅ 非阻塞获取 - 使用try_get_read_connection避免阻塞 ✅ 降级处理 - 连接不可用时返回默认值而不是阻塞 ✅ 性能优化 - 减少锁竞争,提高并发性能 ✅ 错误恢复 - 连接问题时的优雅降级 这应该解决多轮RAG查询卡在get_conversation_history的问题 --- apps/desktop/src-tauri/src/app_state.rs | 4 +- .../repositories/conversation_repository.rs | 125 ++++++++++++++++-- 2 files changed, 113 insertions(+), 16 deletions(-) diff --git a/apps/desktop/src-tauri/src/app_state.rs b/apps/desktop/src-tauri/src/app_state.rs index fd6f2df..748c612 100644 --- a/apps/desktop/src-tauri/src/app_state.rs +++ b/apps/desktop/src-tauri/src/app_state.rs @@ -68,7 +68,7 @@ impl AppState { let model_repository = ModelRepository::new(database.clone()); let model_dynamic_repository = ModelDynamicRepository::new(database.clone()); let video_generation_repository = VideoGenerationRepository::new(database.clone()); - let conversation_repository = Arc::new(ConversationRepository::new(database.get_connection())); + let conversation_repository = Arc::new(ConversationRepository::new(database.clone())); // 初始化数据库表 model_dynamic_repository.init_tables()?; @@ -97,7 +97,7 @@ impl AppState { let model_repository = ModelRepository::new(database.clone()); let model_dynamic_repository = ModelDynamicRepository::new(database.clone()); let video_generation_repository = VideoGenerationRepository::new(database.clone()); - let conversation_repository = Arc::new(ConversationRepository::new(database.get_connection())); + let conversation_repository = Arc::new(ConversationRepository::new(database.clone())); // 初始化数据库表 model_dynamic_repository.init_tables()?; diff --git a/apps/desktop/src-tauri/src/data/repositories/conversation_repository.rs b/apps/desktop/src-tauri/src/data/repositories/conversation_repository.rs index 1e55474..51d2f11 100644 --- a/apps/desktop/src-tauri/src/data/repositories/conversation_repository.rs +++ b/apps/desktop/src-tauri/src/data/repositories/conversation_repository.rs @@ -1,7 +1,8 @@ use anyhow::Result; use rusqlite::{params, Connection, Row, OptionalExtension}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc}; use chrono::{DateTime, Utc}; +use crate::infrastructure::database::Database; use crate::data::models::conversation::{ ConversationSession, ConversationMessage, ConversationHistory, @@ -12,18 +13,19 @@ use crate::data::models::conversation::{ /// 会话数据访问层 /// 遵循 Tauri 开发规范的数据访问层设计模式 pub struct ConversationRepository { - connection: Arc>, + database: Arc, } impl ConversationRepository { /// 创建新的会话仓库实例 - pub fn new(connection: Arc>) -> Self { - Self { connection } + pub fn new(database: Arc) -> Self { + Self { database } } /// 初始化会话相关数据表 pub fn initialize_tables(&self) -> Result<()> { - let conn = self.connection.lock().unwrap(); + let conn = self.database.get_connection(); + let conn = conn.lock().unwrap(); // 创建会话表 conn.execute( @@ -71,7 +73,8 @@ impl ConversationRepository { /// 创建新会话 pub fn create_session(&self, request: CreateConversationSessionRequest) -> Result { let session = ConversationSession::new(request.title); - let conn = self.connection.lock().unwrap(); + let conn = self.database.get_connection(); + let conn = conn.lock().unwrap(); conn.execute( "INSERT INTO conversation_sessions (id, title, created_at, updated_at, is_active, metadata) @@ -91,7 +94,28 @@ impl ConversationRepository { /// 获取会话信息 pub fn get_session(&self, session_id: &str) -> Result> { - let conn = self.connection.lock().unwrap(); + // 使用只读连接进行查询 + match self.database.try_get_read_connection() { + Some(conn) => { + self.execute_get_session(&conn, session_id) + }, + None => { + // 如果只读连接被占用,尝试主连接 + match self.database.try_get_connection() { + Some(conn) => { + self.execute_get_session(&conn, session_id) + }, + None => { + println!("⚠️ 所有连接都被占用,get_session返回None"); + Ok(None) + } + } + } + } + } + + /// 执行获取会话的具体逻辑 + fn execute_get_session(&self, conn: &Connection, session_id: &str) -> Result> { let mut stmt = conn.prepare( "SELECT id, title, created_at, updated_at, is_active, metadata FROM conversation_sessions WHERE id = ?1" @@ -115,7 +139,8 @@ impl ConversationRepository { request.content, ); - let conn = self.connection.lock().unwrap(); + let conn = self.database.get_connection(); + let conn = conn.lock().unwrap(); // 插入消息 conn.execute( @@ -142,10 +167,49 @@ impl ConversationRepository { /// 获取会话历史 pub fn get_conversation_history(&self, query: ConversationHistoryQuery) -> Result { - let conn = self.connection.lock().unwrap(); + println!("🔍 使用只读连接获取会话历史"); + + // 使用专用的只读连接,避免与写操作竞争 + match self.database.try_get_read_connection() { + Some(conn) => { + println!("✅ 成功获取只读连接"); + self.execute_history_query(&conn, query) + }, + None => { + println!("⚠️ 只读连接被占用,尝试主连接"); + // 如果只读连接被占用,尝试主连接(非阻塞) + match self.database.try_get_connection() { + Some(conn) => { + println!("✅ 使用主连接获取历史"); + self.execute_history_query(&conn, query) + }, + None => { + println!("❌ 所有连接都被占用,返回空历史"); + // 如果所有连接都被占用,返回空历史避免阻塞 + let empty_session = ConversationSession { + id: query.session_id.clone(), + title: Some("临时会话".to_string()), + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + is_active: true, + metadata: None, + }; + Ok(ConversationHistory { + session: empty_session, + messages: Vec::new(), + total_count: 0, + }) + } + } + } + } + } + + /// 执行历史查询的具体逻辑 + fn execute_history_query(&self, conn: &Connection, query: ConversationHistoryQuery) -> Result { // 获取会话信息 - let session = match self.get_session(&query.session_id)? { + let session: ConversationSession = match self.get_session(&query.session_id)? { Some(s) => s, None => { // 如果会话不存在,创建一个临时会话对象并返回空消息列表 @@ -218,7 +282,20 @@ impl ConversationRepository { /// 获取会话列表 pub fn get_sessions(&self, limit: Option, offset: Option) -> Result> { - let conn = self.connection.lock().unwrap(); + // 使用只读连接进行查询 + match self.database.try_get_read_connection() { + Some(conn) => { + self.execute_get_sessions(&conn, limit, offset) + }, + None => { + println!("⚠️ 只读连接被占用,get_sessions返回空列表"); + Ok(Vec::new()) + } + } + } + + /// 执行获取会话列表的具体逻辑 + fn execute_get_sessions(&self, conn: &Connection, limit: Option, offset: Option) -> Result> { let mut sql = "SELECT id, title, created_at, updated_at, is_active, metadata FROM conversation_sessions @@ -256,7 +333,8 @@ impl ConversationRepository { /// 删除会话(软删除) pub fn delete_session(&self, session_id: &str) -> Result<()> { - let conn = self.connection.lock().unwrap(); + let conn = self.database.get_connection(); + let conn = conn.lock().unwrap(); conn.execute( "UPDATE conversation_sessions SET is_active = 0, updated_at = ?1 WHERE id = ?2", @@ -268,7 +346,25 @@ impl ConversationRepository { /// 获取会话统计信息 pub fn get_conversation_stats(&self) -> Result { - let conn = self.connection.lock().unwrap(); + // 使用只读连接进行统计查询 + match self.database.try_get_read_connection() { + Some(conn) => { + self.execute_get_stats(&conn) + }, + None => { + println!("⚠️ 只读连接被占用,返回默认统计"); + Ok(ConversationStats { + total_sessions: 0, + active_sessions: 0, + total_messages: 0, + average_messages_per_session: 0.0, + }) + } + } + } + + /// 执行统计查询的具体逻辑 + fn execute_get_stats(&self, conn: &Connection) -> Result { let total_sessions: u32 = conn.query_row( "SELECT COUNT(*) FROM conversation_sessions", @@ -304,7 +400,8 @@ impl ConversationRepository { /// 清理过期会话 pub fn cleanup_expired_sessions(&self, max_inactive_days: u32) -> Result { - let conn = self.connection.lock().unwrap(); + let conn = self.database.get_connection(); + let conn = conn.lock().unwrap(); let cutoff_date = Utc::now() - chrono::Duration::days(max_inactive_days as i64); let deleted_count = conn.execute(