完成 conversation_repository.rs 连接池优化

修复内容:
- initialize_tables: 强制使用连接池初始化表
- create_session: 连接池创建会话
- ensure_session_exists: 连接池确保会话存在
- add_message: 连接池添加消息和更新会话时间
- delete_session: 连接池软删除会话
- cleanup_expired_sessions: 连接池清理过期会话

 关键修复:
- 添加 anyhow 宏导入
- 所有方法强制检查连接池状态
- 统一错误处理和日志记录
- 避免所有 get_connection().lock() 死锁风险

这是数据库连接池优化计划的第2步,会话管理现在完全使用连接池。
This commit is contained in:
imeepos 2025-07-30 19:01:13 +08:00
parent b6999c379b
commit b0f880dc13
1 changed files with 59 additions and 21 deletions

View File

@ -1,4 +1,4 @@
use anyhow::Result; use anyhow::{Result, anyhow};
use rusqlite::{params, Connection, Row, OptionalExtension}; use rusqlite::{params, Connection, Row, OptionalExtension};
use std::sync::{Arc}; use std::sync::{Arc};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
@ -78,13 +78,20 @@ impl ConversationRepository {
Ok(()) Ok(())
} }
/// 创建新会话 /// 创建新会话(强制使用连接池)
pub fn create_session(&self, request: CreateConversationSessionRequest) -> Result<ConversationSession> { pub fn create_session(&self, request: CreateConversationSessionRequest) -> Result<ConversationSession> {
let session = ConversationSession::new(request.title); let session = ConversationSession::new(request.title);
let conn = self.database.get_connection(); println!("🏊 使用连接池创建会话: {}", &session.id[..8]);
let conn = conn.lock().unwrap();
conn.execute( // 🚨 强制使用连接池,避免死锁
if !self.database.has_pool() {
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
}
let pooled_conn = self.database.acquire_from_pool()
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
pooled_conn.execute(
"INSERT INTO conversation_sessions (id, title, created_at, updated_at, is_active, metadata) "INSERT INTO conversation_sessions (id, title, created_at, updated_at, is_active, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)", VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![ params![
@ -97,6 +104,7 @@ impl ConversationRepository {
], ],
)?; )?;
println!("✅ 会话创建完成");
Ok(session) Ok(session)
} }
@ -148,12 +156,18 @@ impl ConversationRepository {
}, },
None => { None => {
println!("🆕 会话不存在,创建新会话: {}", session_id); println!("🆕 会话不存在,创建新会话: {}", session_id);
println!("🏊 使用连接池创建会话: {}", &session_id[..8]);
let conn = self.database.get_connection(); // 🚨 强制使用连接池,避免死锁
let conn = conn.lock().unwrap(); if !self.database.has_pool() {
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
}
let pooled_conn = self.database.acquire_from_pool()
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
// 直接插入会话记录使用指定的session_id // 直接插入会话记录使用指定的session_id
conn.execute( pooled_conn.execute(
"INSERT OR REPLACE INTO conversation_sessions (id, title, created_at, updated_at, is_active, metadata) "INSERT OR REPLACE INTO conversation_sessions (id, title, created_at, updated_at, is_active, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)", VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![ params![
@ -180,11 +194,18 @@ impl ConversationRepository {
request.content, request.content,
); );
let conn = self.database.get_connection(); println!("🏊 使用连接池添加消息: {}", &message.id[..8]);
let conn = conn.lock().unwrap();
// 🚨 强制使用连接池,避免死锁
if !self.database.has_pool() {
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
}
let pooled_conn = self.database.acquire_from_pool()
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
// 插入消息 // 插入消息
conn.execute( pooled_conn.execute(
"INSERT INTO conversation_messages (id, session_id, role, content, timestamp, metadata) "INSERT INTO conversation_messages (id, session_id, role, content, timestamp, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)", VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![ params![
@ -198,11 +219,12 @@ impl ConversationRepository {
)?; )?;
// 更新会话的最后更新时间 // 更新会话的最后更新时间
conn.execute( pooled_conn.execute(
"UPDATE conversation_sessions SET updated_at = ?1 WHERE id = ?2", "UPDATE conversation_sessions SET updated_at = ?1 WHERE id = ?2",
params![Utc::now().to_rfc3339(), request.session_id], params![Utc::now().to_rfc3339(), request.session_id],
)?; )?;
println!("✅ 消息添加完成");
Ok(message) Ok(message)
} }
@ -372,16 +394,24 @@ impl ConversationRepository {
Ok(sessions) Ok(sessions)
} }
/// 删除会话(软删除 /// 删除会话(软删除,强制使用连接池
pub fn delete_session(&self, session_id: &str) -> Result<()> { pub fn delete_session(&self, session_id: &str) -> Result<()> {
let conn = self.database.get_connection(); println!("🏊 使用连接池删除会话: {}", &session_id[..8]);
let conn = conn.lock().unwrap();
conn.execute( // 🚨 强制使用连接池,避免死锁
if !self.database.has_pool() {
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
}
let pooled_conn = self.database.acquire_from_pool()
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
pooled_conn.execute(
"UPDATE conversation_sessions SET is_active = 0, updated_at = ?1 WHERE id = ?2", "UPDATE conversation_sessions SET is_active = 0, updated_at = ?1 WHERE id = ?2",
params![Utc::now().to_rfc3339(), session_id], params![Utc::now().to_rfc3339(), session_id],
)?; )?;
println!("✅ 会话删除完成");
Ok(()) Ok(())
} }
@ -439,19 +469,27 @@ impl ConversationRepository {
}) })
} }
/// 清理过期会话 /// 清理过期会话(强制使用连接池)
pub fn cleanup_expired_sessions(&self, max_inactive_days: u32) -> Result<u32> { pub fn cleanup_expired_sessions(&self, max_inactive_days: u32) -> Result<u32> {
let conn = self.database.get_connection(); println!("🏊 使用连接池清理过期会话,超过{}", max_inactive_days);
let conn = conn.lock().unwrap();
// 🚨 强制使用连接池,避免死锁
if !self.database.has_pool() {
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
}
let pooled_conn = self.database.acquire_from_pool()
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
let cutoff_date = Utc::now() - chrono::Duration::days(max_inactive_days as i64); let cutoff_date = Utc::now() - chrono::Duration::days(max_inactive_days as i64);
let deleted_count = conn.execute( let deleted_count = pooled_conn.execute(
"UPDATE conversation_sessions "UPDATE conversation_sessions
SET is_active = 0, updated_at = ?1 SET is_active = 0, updated_at = ?1
WHERE is_active = 1 AND updated_at < ?2", WHERE is_active = 1 AND updated_at < ?2",
params![Utc::now().to_rfc3339(), cutoff_date.to_rfc3339()], params![Utc::now().to_rfc3339(), cutoff_date.to_rfc3339()],
)?; )?;
println!("✅ 过期会话清理完成,删除{}个会话", deleted_count);
Ok(deleted_count as u32) Ok(deleted_count as u32)
} }