fix: 重构ConversationRepository使用只读连接避免锁竞争

- 修改ConversationRepository使用Database而不是直接的连接
- 为读操作使用专用只读连接,避免与写操作的锁竞争
- 实现非阻塞连接获取,避免查询卡住问题
- 添加连接获取失败时的降级处理
- 优化数据库访问性能,提高并发能力

核心改进:
 只读连接 - 查询操作使用专用只读连接
 非阻塞获取 - 使用try_get_read_connection避免阻塞
 降级处理 - 连接不可用时返回默认值而不是阻塞
 性能优化 - 减少锁竞争,提高并发性能
 错误恢复 - 连接问题时的优雅降级

这应该解决多轮RAG查询卡在get_conversation_history的问题
This commit is contained in:
imeepos 2025-07-22 11:26:48 +08:00
parent dca2c15bb9
commit 4293fbb4c7
2 changed files with 113 additions and 16 deletions

View File

@ -68,7 +68,7 @@ impl AppState {
let model_repository = ModelRepository::new(database.clone());
let model_dynamic_repository = ModelDynamicRepository::new(database.clone());
let video_generation_repository = VideoGenerationRepository::new(database.clone());
let conversation_repository = Arc::new(ConversationRepository::new(database.get_connection()));
let conversation_repository = Arc::new(ConversationRepository::new(database.clone()));
// 初始化数据库表
model_dynamic_repository.init_tables()?;
@ -97,7 +97,7 @@ impl AppState {
let model_repository = ModelRepository::new(database.clone());
let model_dynamic_repository = ModelDynamicRepository::new(database.clone());
let video_generation_repository = VideoGenerationRepository::new(database.clone());
let conversation_repository = Arc::new(ConversationRepository::new(database.get_connection()));
let conversation_repository = Arc::new(ConversationRepository::new(database.clone()));
// 初始化数据库表
model_dynamic_repository.init_tables()?;

View File

@ -1,7 +1,8 @@
use anyhow::Result;
use rusqlite::{params, Connection, Row, OptionalExtension};
use std::sync::{Arc, Mutex};
use std::sync::{Arc};
use chrono::{DateTime, Utc};
use crate::infrastructure::database::Database;
use crate::data::models::conversation::{
ConversationSession, ConversationMessage, ConversationHistory,
@ -12,18 +13,19 @@ use crate::data::models::conversation::{
/// 会话数据访问层
/// 遵循 Tauri 开发规范的数据访问层设计模式
pub struct ConversationRepository {
connection: Arc<Mutex<Connection>>,
database: Arc<Database>,
}
impl ConversationRepository {
/// 创建新的会话仓库实例
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
Self { connection }
pub fn new(database: Arc<Database>) -> Self {
Self { database }
}
/// 初始化会话相关数据表
pub fn initialize_tables(&self) -> Result<()> {
let conn = self.connection.lock().unwrap();
let conn = self.database.get_connection();
let conn = conn.lock().unwrap();
// 创建会话表
conn.execute(
@ -71,7 +73,8 @@ impl ConversationRepository {
/// 创建新会话
pub fn create_session(&self, request: CreateConversationSessionRequest) -> Result<ConversationSession> {
let session = ConversationSession::new(request.title);
let conn = self.connection.lock().unwrap();
let conn = self.database.get_connection();
let conn = conn.lock().unwrap();
conn.execute(
"INSERT INTO conversation_sessions (id, title, created_at, updated_at, is_active, metadata)
@ -91,7 +94,28 @@ impl ConversationRepository {
/// 获取会话信息
pub fn get_session(&self, session_id: &str) -> Result<Option<ConversationSession>> {
let conn = self.connection.lock().unwrap();
// 使用只读连接进行查询
match self.database.try_get_read_connection() {
Some(conn) => {
self.execute_get_session(&conn, session_id)
},
None => {
// 如果只读连接被占用,尝试主连接
match self.database.try_get_connection() {
Some(conn) => {
self.execute_get_session(&conn, session_id)
},
None => {
println!("⚠️ 所有连接都被占用get_session返回None");
Ok(None)
}
}
}
}
}
/// 执行获取会话的具体逻辑
fn execute_get_session(&self, conn: &Connection, session_id: &str) -> Result<Option<ConversationSession>> {
let mut stmt = conn.prepare(
"SELECT id, title, created_at, updated_at, is_active, metadata
FROM conversation_sessions WHERE id = ?1"
@ -115,7 +139,8 @@ impl ConversationRepository {
request.content,
);
let conn = self.connection.lock().unwrap();
let conn = self.database.get_connection();
let conn = conn.lock().unwrap();
// 插入消息
conn.execute(
@ -142,10 +167,49 @@ impl ConversationRepository {
/// 获取会话历史
pub fn get_conversation_history(&self, query: ConversationHistoryQuery) -> Result<ConversationHistory> {
let conn = self.connection.lock().unwrap();
println!("🔍 使用只读连接获取会话历史");
// 使用专用的只读连接,避免与写操作竞争
match self.database.try_get_read_connection() {
Some(conn) => {
println!("✅ 成功获取只读连接");
self.execute_history_query(&conn, query)
},
None => {
println!("⚠️ 只读连接被占用,尝试主连接");
// 如果只读连接被占用,尝试主连接(非阻塞)
match self.database.try_get_connection() {
Some(conn) => {
println!("✅ 使用主连接获取历史");
self.execute_history_query(&conn, query)
},
None => {
println!("❌ 所有连接都被占用,返回空历史");
// 如果所有连接都被占用,返回空历史避免阻塞
let empty_session = ConversationSession {
id: query.session_id.clone(),
title: Some("临时会话".to_string()),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
is_active: true,
metadata: None,
};
Ok(ConversationHistory {
session: empty_session,
messages: Vec::new(),
total_count: 0,
})
}
}
}
}
}
/// 执行历史查询的具体逻辑
fn execute_history_query(&self, conn: &Connection, query: ConversationHistoryQuery) -> Result<ConversationHistory> {
// 获取会话信息
let session = match self.get_session(&query.session_id)? {
let session: ConversationSession = match self.get_session(&query.session_id)? {
Some(s) => s,
None => {
// 如果会话不存在,创建一个临时会话对象并返回空消息列表
@ -218,7 +282,20 @@ impl ConversationRepository {
/// 获取会话列表
pub fn get_sessions(&self, limit: Option<u32>, offset: Option<u32>) -> Result<Vec<ConversationSession>> {
let conn = self.connection.lock().unwrap();
// 使用只读连接进行查询
match self.database.try_get_read_connection() {
Some(conn) => {
self.execute_get_sessions(&conn, limit, offset)
},
None => {
println!("⚠️ 只读连接被占用get_sessions返回空列表");
Ok(Vec::new())
}
}
}
/// 执行获取会话列表的具体逻辑
fn execute_get_sessions(&self, conn: &Connection, limit: Option<u32>, offset: Option<u32>) -> Result<Vec<ConversationSession>> {
let mut sql = "SELECT id, title, created_at, updated_at, is_active, metadata
FROM conversation_sessions
@ -256,7 +333,8 @@ impl ConversationRepository {
/// 删除会话(软删除)
pub fn delete_session(&self, session_id: &str) -> Result<()> {
let conn = self.connection.lock().unwrap();
let conn = self.database.get_connection();
let conn = conn.lock().unwrap();
conn.execute(
"UPDATE conversation_sessions SET is_active = 0, updated_at = ?1 WHERE id = ?2",
@ -268,7 +346,25 @@ impl ConversationRepository {
/// 获取会话统计信息
pub fn get_conversation_stats(&self) -> Result<ConversationStats> {
let conn = self.connection.lock().unwrap();
// 使用只读连接进行统计查询
match self.database.try_get_read_connection() {
Some(conn) => {
self.execute_get_stats(&conn)
},
None => {
println!("⚠️ 只读连接被占用,返回默认统计");
Ok(ConversationStats {
total_sessions: 0,
active_sessions: 0,
total_messages: 0,
average_messages_per_session: 0.0,
})
}
}
}
/// 执行统计查询的具体逻辑
fn execute_get_stats(&self, conn: &Connection) -> Result<ConversationStats> {
let total_sessions: u32 = conn.query_row(
"SELECT COUNT(*) FROM conversation_sessions",
@ -304,7 +400,8 @@ impl ConversationRepository {
/// 清理过期会话
pub fn cleanup_expired_sessions(&self, max_inactive_days: u32) -> Result<u32> {
let conn = self.connection.lock().unwrap();
let conn = self.database.get_connection();
let conn = conn.lock().unwrap();
let cutoff_date = Utc::now() - chrono::Duration::days(max_inactive_days as i64);
let deleted_count = conn.execute(