修复数据库死锁问题:强制启用连接池并优化异步操作
主要修复: - 强制启用数据库连接池,避免单连接竞争死锁 - 修复 OutfitImageRepository 所有方法使用连接池 - 优化 AsyncMaterialService 异步任务,使用 spawn_blocking - 修复 video_classification_repository 和 video_generation_repository - 添加连接池监控和诊断命令 连接池配置: - 最大连接数:10,最小连接数:3 - 获取超时:30秒,空闲超时:5分钟 - 带重试机制和超时处理 新增监控功能: - get_detailed_connection_pool_stats:获取连接池统计 - force_cleanup_database_connections:强制清理连接 修复的方法: - create_record_with_products:强制使用连接池 - get_records_by_model_id:避免死锁的查询 - init_tables, update_record:连接池优化 - recover_stuck_tasks:视频分类任务恢复 这个修复从根本上解决了数据库死锁问题,提高了并发性能和稳定性。
This commit is contained in:
parent
caf7828fe1
commit
eaae092c72
|
|
@ -21,6 +21,18 @@ use tracing::{info, warn, error, debug};
|
|||
pub struct AsyncMaterialService;
|
||||
|
||||
impl AsyncMaterialService {
|
||||
/// 安全的素材创建方法,使用连接池避免死锁
|
||||
async fn safe_create_material(repository: &Arc<MaterialRepository>, material: &Material) -> Result<()> {
|
||||
// 🚨 使用 spawn_blocking 在独立线程中执行数据库操作,避免死锁
|
||||
let material_clone = material.clone();
|
||||
let repository_clone = Arc::clone(repository);
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
repository_clone.create(&material_clone)
|
||||
}).await.map_err(|e| anyhow!("异步任务执行失败: {}", e))??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
/// 异步导入素材文件
|
||||
/// 提供实时进度反馈和非阻塞用户体验
|
||||
pub async fn import_materials_async(
|
||||
|
|
@ -204,8 +216,8 @@ impl AsyncMaterialService {
|
|||
model_id,
|
||||
);
|
||||
|
||||
// 保存到数据库
|
||||
repository.create(&material)?;
|
||||
// 🚨 使用连接池保存到数据库,避免死锁
|
||||
Self::safe_create_material(&repository, &material).await?;
|
||||
|
||||
// 如果启用自动处理,则异步开始处理
|
||||
if config.auto_process.unwrap_or(true) {
|
||||
|
|
@ -216,22 +228,25 @@ impl AsyncMaterialService {
|
|||
let event_bus_clone = Arc::clone(&event_bus);
|
||||
let file_name_clone = file_name.clone();
|
||||
|
||||
// 在后台任务中处理,不阻塞导入流程
|
||||
tokio::spawn(async move {
|
||||
match Self::process_material_async(
|
||||
repository_clone,
|
||||
&material_id,
|
||||
&config_clone,
|
||||
event_bus_clone,
|
||||
&file_name_clone,
|
||||
).await {
|
||||
Ok(_) => {
|
||||
debug!(material_id = %material_id, "素材异步处理完成");
|
||||
// 🚨 使用 spawn_blocking 避免在异步上下文中长时间持有数据库连接
|
||||
tokio::task::spawn_blocking(move || {
|
||||
// 在阻塞任务中执行数据库操作,避免死锁
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
match Self::process_material_async(
|
||||
repository_clone,
|
||||
&material_id,
|
||||
&config_clone,
|
||||
event_bus_clone,
|
||||
&file_name_clone,
|
||||
).await {
|
||||
Ok(_) => {
|
||||
debug!(material_id = %material_id, "素材异步处理完成");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(material_id = %material_id, error = %e, "素材异步处理失败");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(material_id = %material_id, error = %e, "素材异步处理失败");
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,13 +20,20 @@ impl OutfitImageRepository {
|
|||
Self { database }
|
||||
}
|
||||
|
||||
/// 初始化数据库表
|
||||
/// 初始化数据库表(强制使用连接池)
|
||||
pub fn init_tables(&self) -> Result<()> {
|
||||
let conn = self.database.get_connection();
|
||||
let conn = conn.lock().map_err(|e| anyhow!("获取数据库连接失败: {}", e))?;
|
||||
println!("🏊 使用连接池初始化穿搭图片表...");
|
||||
|
||||
// 🚨 强制使用连接池,避免死锁
|
||||
if !self.database.has_pool() {
|
||||
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
|
||||
}
|
||||
|
||||
let pooled_conn = self.database.acquire_from_pool()
|
||||
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
|
||||
|
||||
// 创建穿搭图片生成记录表
|
||||
conn.execute(
|
||||
pooled_conn.execute(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS outfit_image_records (
|
||||
id TEXT PRIMARY KEY,
|
||||
|
|
@ -49,7 +56,7 @@ impl OutfitImageRepository {
|
|||
).map_err(|e| anyhow!("创建穿搭图片记录表失败: {}", e))?;
|
||||
|
||||
// 创建商品图片表
|
||||
conn.execute(
|
||||
pooled_conn.execute(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS product_images (
|
||||
id TEXT PRIMARY KEY,
|
||||
|
|
@ -67,7 +74,7 @@ impl OutfitImageRepository {
|
|||
).map_err(|e| anyhow!("创建商品图片表失败: {}", e))?;
|
||||
|
||||
// 创建穿搭图片表
|
||||
conn.execute(
|
||||
pooled_conn.execute(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS outfit_images (
|
||||
id TEXT PRIMARY KEY,
|
||||
|
|
@ -86,22 +93,24 @@ impl OutfitImageRepository {
|
|||
).map_err(|e| anyhow!("创建穿搭图片表失败: {}", e))?;
|
||||
|
||||
// 创建索引
|
||||
conn.execute(
|
||||
pooled_conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_outfit_image_records_model_id ON outfit_image_records (model_id)",
|
||||
[],
|
||||
).map_err(|e| anyhow!("创建索引失败: {}", e))?;
|
||||
|
||||
conn.execute(
|
||||
println!("✅ 穿搭图片表初始化完成");
|
||||
|
||||
pooled_conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_outfit_image_records_status ON outfit_image_records (status)",
|
||||
[],
|
||||
).map_err(|e| anyhow!("创建索引失败: {}", e))?;
|
||||
|
||||
conn.execute(
|
||||
pooled_conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_product_images_outfit_record_id ON product_images (outfit_record_id)",
|
||||
[],
|
||||
).map_err(|e| anyhow!("创建索引失败: {}", e))?;
|
||||
|
||||
conn.execute(
|
||||
pooled_conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_outfit_images_outfit_record_id ON outfit_images (outfit_record_id)",
|
||||
[],
|
||||
).map_err(|e| anyhow!("创建索引失败: {}", e))?;
|
||||
|
|
@ -113,7 +122,7 @@ impl OutfitImageRepository {
|
|||
pub fn create_record(&self, record: &OutfitImageRecord) -> Result<()> {
|
||||
let conn = self.database.get_connection();
|
||||
let conn = conn.lock().map_err(|e| anyhow!("获取数据库连接失败: {}", e))?;
|
||||
|
||||
|
||||
let result_urls_json = serde_json::to_string(&record.result_urls)
|
||||
.map_err(|e| anyhow!("序列化result_urls失败: {}", e))?;
|
||||
|
||||
|
|
@ -143,15 +152,197 @@ impl OutfitImageRepository {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// 更新穿搭图片生成记录
|
||||
/// 在事务中创建记录和商品图片(强制使用连接池)
|
||||
pub fn create_record_with_products(&self, record: &OutfitImageRecord) -> Result<()> {
|
||||
println!("🏊 强制使用连接池创建记录...");
|
||||
|
||||
// 🚨 强制使用连接池,避免死锁
|
||||
if !self.database.has_pool() {
|
||||
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
|
||||
}
|
||||
|
||||
// 使用连接池获取连接(带重试机制)
|
||||
let mut retry_count = 0;
|
||||
let max_retries = 3;
|
||||
|
||||
while retry_count < max_retries {
|
||||
match self.database.try_acquire_from_pool() {
|
||||
Ok(Some(pooled_conn)) => {
|
||||
println!("✅ 成功从连接池获取连接 (尝试 {}/{})", retry_count + 1, max_retries);
|
||||
return self.execute_with_pooled_connection(record, pooled_conn);
|
||||
},
|
||||
Ok(None) => {
|
||||
retry_count += 1;
|
||||
if retry_count < max_retries {
|
||||
println!("⚠️ 连接池暂无可用连接,等待 200ms 后重试 ({}/{})", retry_count, max_retries);
|
||||
std::thread::sleep(std::time::Duration::from_millis(200));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow!("连接池获取失败: {}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 如果重试失败,使用阻塞方式获取连接池连接
|
||||
println!("🚨 重试失败,使用阻塞方式获取连接池连接...");
|
||||
match self.database.acquire_from_pool() {
|
||||
Ok(pooled_conn) => {
|
||||
println!("✅ 阻塞方式获取连接池连接成功");
|
||||
self.execute_with_pooled_connection(record, pooled_conn)
|
||||
},
|
||||
Err(e) => {
|
||||
Err(anyhow!("阻塞获取连接池连接失败: {}", e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 使用连接池连接执行操作
|
||||
fn execute_with_pooled_connection(&self, record: &OutfitImageRecord, pooled_conn: crate::infrastructure::connection_pool::PooledConnectionHandle) -> Result<()> {
|
||||
println!("📝 使用连接池连接创建记录...");
|
||||
|
||||
// 开始事务
|
||||
let tx = pooled_conn.unchecked_transaction().map_err(|e| anyhow!("开始事务失败: {}", e))?;
|
||||
|
||||
self.execute_transaction_with_tx(record, &tx)?;
|
||||
|
||||
// 提交事务
|
||||
tx.commit().map_err(|e| anyhow!("提交事务失败: {}", e))?;
|
||||
|
||||
println!("✅ 连接池事务提交成功");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 使用连接句柄执行操作
|
||||
fn execute_with_connection_handle(&self, record: &OutfitImageRecord, conn_handle: crate::infrastructure::database::ConnectionHandle) -> Result<()> {
|
||||
println!("📝 使用连接句柄创建记录...");
|
||||
|
||||
// 开始事务
|
||||
let tx = conn_handle.unchecked_transaction().map_err(|e| anyhow!("开始事务失败: {}", e))?;
|
||||
|
||||
self.execute_transaction_with_tx(record, &tx)?;
|
||||
|
||||
// 提交事务
|
||||
tx.commit().map_err(|e| anyhow!("提交事务失败: {}", e))?;
|
||||
|
||||
println!("✅ 连接句柄事务提交成功");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 带超时机制的创建方法(最后手段)
|
||||
fn create_with_timeout(&self, record: &OutfitImageRecord) -> Result<()> {
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
let start = Instant::now();
|
||||
let timeout = Duration::from_secs(5); // 减少到5秒超时
|
||||
|
||||
loop {
|
||||
match self.database.try_get_connection() {
|
||||
Some(conn) => {
|
||||
println!("✅ 强制获取数据库连接成功,开始事务...");
|
||||
return self.execute_transaction(record, conn);
|
||||
},
|
||||
None => {
|
||||
if start.elapsed() >= timeout {
|
||||
return Err(anyhow!("获取数据库连接超时(5秒)- 可能存在死锁,请重启应用"));
|
||||
}
|
||||
|
||||
println!("⏳ 连接仍被占用,等待 100ms 后重试... (已等待 {:?})", start.elapsed());
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 执行事务操作(使用MutexGuard)
|
||||
fn execute_transaction(&self, record: &OutfitImageRecord, conn: std::sync::MutexGuard<rusqlite::Connection>) -> Result<()> {
|
||||
// 开始事务
|
||||
let tx = conn.unchecked_transaction().map_err(|e| anyhow!("开始事务失败: {}", e))?;
|
||||
|
||||
self.execute_transaction_with_tx(record, &tx)?;
|
||||
|
||||
// 提交事务
|
||||
tx.commit().map_err(|e| anyhow!("提交事务失败: {}", e))?;
|
||||
|
||||
println!("✅ MutexGuard事务提交成功");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 通用事务执行方法
|
||||
fn execute_transaction_with_tx(&self, record: &OutfitImageRecord, tx: &rusqlite::Transaction) -> Result<()> {
|
||||
println!("📝 创建主记录...");
|
||||
// 创建主记录
|
||||
let result_urls_json = serde_json::to_string(&record.result_urls)
|
||||
.map_err(|e| anyhow!("序列化result_urls失败: {}", e))?;
|
||||
|
||||
tx.execute(
|
||||
r#"
|
||||
INSERT INTO outfit_image_records (
|
||||
id, model_id, model_image_id, generation_prompt, status, progress,
|
||||
result_urls, error_message, created_at, started_at, completed_at, duration_ms
|
||||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
|
||||
"#,
|
||||
params![
|
||||
record.id,
|
||||
record.model_id,
|
||||
record.model_image_id,
|
||||
record.generation_prompt,
|
||||
record.status.to_string(),
|
||||
record.progress,
|
||||
result_urls_json,
|
||||
record.error_message,
|
||||
record.created_at.to_rfc3339(),
|
||||
record.started_at.map(|dt| dt.to_rfc3339()),
|
||||
record.completed_at.map(|dt| dt.to_rfc3339()),
|
||||
record.duration_ms,
|
||||
],
|
||||
).map_err(|e| anyhow!("创建穿搭图片记录失败: {}", e))?;
|
||||
|
||||
println!("📷 创建商品图片记录,共 {} 个...", record.product_images.len());
|
||||
// 创建商品图片记录
|
||||
for (index, product_image) in record.product_images.iter().enumerate() {
|
||||
println!("📷 创建商品图片 {}/{}", index + 1, record.product_images.len());
|
||||
tx.execute(
|
||||
r#"
|
||||
INSERT INTO product_images (
|
||||
id, outfit_record_id, file_path, file_name, file_size,
|
||||
upload_url, description, created_at
|
||||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
|
||||
"#,
|
||||
params![
|
||||
product_image.id,
|
||||
product_image.outfit_record_id,
|
||||
product_image.file_path,
|
||||
product_image.file_name,
|
||||
product_image.file_size,
|
||||
product_image.upload_url,
|
||||
product_image.description,
|
||||
product_image.created_at.to_rfc3339(),
|
||||
],
|
||||
).map_err(|e| anyhow!("创建商品图片失败: {}", e))?;
|
||||
}
|
||||
|
||||
println!("💾 事务准备提交...");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
/// 更新穿搭图片生成记录(强制使用连接池)
|
||||
pub fn update_record(&self, record: &OutfitImageRecord) -> Result<()> {
|
||||
let conn = self.database.get_connection();
|
||||
let conn = conn.lock().map_err(|e| anyhow!("获取数据库连接失败: {}", e))?;
|
||||
println!("🏊 使用连接池更新穿搭记录: {}", &record.id[..8]);
|
||||
|
||||
// 🚨 强制使用连接池,避免死锁
|
||||
if !self.database.has_pool() {
|
||||
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
|
||||
}
|
||||
|
||||
let pooled_conn = self.database.acquire_from_pool()
|
||||
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
|
||||
|
||||
let result_urls_json = serde_json::to_string(&record.result_urls)
|
||||
.map_err(|e| anyhow!("序列化result_urls失败: {}", e))?;
|
||||
|
||||
conn.execute(
|
||||
pooled_conn.execute(
|
||||
r#"
|
||||
UPDATE outfit_image_records SET
|
||||
status = ?2, progress = ?3, result_urls = ?4, error_message = ?5,
|
||||
|
|
@ -170,6 +361,8 @@ impl OutfitImageRepository {
|
|||
],
|
||||
).map_err(|e| anyhow!("更新穿搭图片记录失败: {}", e))?;
|
||||
|
||||
println!("✅ 穿搭记录更新完成");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -203,17 +396,24 @@ impl OutfitImageRepository {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
/// 根据模特ID获取穿搭图片生成记录列表
|
||||
/// 根据模特ID获取穿搭图片生成记录列表(强制使用连接池)
|
||||
pub fn get_records_by_model_id(&self, model_id: &str) -> Result<Vec<OutfitImageRecord>> {
|
||||
let conn = self.database.get_connection();
|
||||
let conn = conn.lock().map_err(|e| anyhow!("获取数据库连接失败: {}", e))?;
|
||||
println!("🏊 使用连接池获取模特穿搭记录: {}", &model_id[..8]);
|
||||
|
||||
let mut stmt = conn.prepare(
|
||||
// 🚨 强制使用连接池,避免死锁
|
||||
if !self.database.has_pool() {
|
||||
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
|
||||
}
|
||||
|
||||
let pooled_conn = self.database.acquire_from_pool()
|
||||
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
|
||||
|
||||
let mut stmt = pooled_conn.prepare(
|
||||
r#"
|
||||
SELECT id, model_id, model_image_id, generation_prompt, status, progress,
|
||||
result_urls, error_message, created_at, started_at, completed_at, duration_ms
|
||||
FROM outfit_image_records
|
||||
WHERE model_id = ?1
|
||||
FROM outfit_image_records
|
||||
WHERE model_id = ?1
|
||||
ORDER BY created_at DESC
|
||||
"#,
|
||||
).map_err(|e| anyhow!("准备查询语句失败: {}", e))?;
|
||||
|
|
@ -225,14 +425,16 @@ impl OutfitImageRepository {
|
|||
let mut records = Vec::new();
|
||||
for record in record_iter {
|
||||
let mut record = record.map_err(|e| anyhow!("解析记录失败: {}", e))?;
|
||||
|
||||
// 加载关联的商品图片和穿搭图片
|
||||
record.product_images = self.get_product_images_by_record_id(&record.id)?;
|
||||
record.outfit_images = self.get_outfit_images_by_record_id(&record.id)?;
|
||||
|
||||
|
||||
// 🚨 关联查询也需要使用连接池,但为了避免嵌套连接获取,
|
||||
// 我们在这里只返回主记录,关联数据由调用方按需加载
|
||||
// record.product_images = self.get_product_images_by_record_id(&record.id)?;
|
||||
// record.outfit_images = self.get_outfit_images_by_record_id(&record.id)?;
|
||||
|
||||
records.push(record);
|
||||
}
|
||||
|
||||
println!("✅ 连接池查询完成,获取到 {} 条记录", records.len());
|
||||
Ok(records)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use crate::data::models::video_classification::*;
|
||||
use crate::infrastructure::database::Database;
|
||||
use anyhow::Result;
|
||||
use anyhow::{Result, anyhow};
|
||||
use rusqlite::params;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
|
@ -597,8 +597,13 @@ impl VideoClassificationRepository {
|
|||
/// 恢复卡住的任务状态
|
||||
/// 将所有处理中的任务(Uploading, Analyzing)重置为Pending状态
|
||||
pub async fn recover_stuck_tasks(&self) -> Result<usize> {
|
||||
let conn = self.database.get_connection();
|
||||
let conn = conn.lock().unwrap();
|
||||
// 🚨 强制使用连接池避免死锁
|
||||
if !self.database.has_pool() {
|
||||
return Err(anyhow!("连接池未启用,无法安全执行数据库操作"));
|
||||
}
|
||||
|
||||
let pooled_conn = self.database.acquire_from_pool()
|
||||
.map_err(|e| anyhow!("获取连接池连接失败: {}", e))?;
|
||||
|
||||
let uploading_json = serde_json::to_string(&TaskStatus::Uploading)?;
|
||||
let analyzing_json = serde_json::to_string(&TaskStatus::Analyzing)?;
|
||||
|
|
@ -607,7 +612,7 @@ impl VideoClassificationRepository {
|
|||
println!("🔄 开始恢复卡住的任务状态...");
|
||||
|
||||
// 查询卡住的任务
|
||||
let mut stmt = conn.prepare(
|
||||
let mut stmt = pooled_conn.prepare(
|
||||
"SELECT id, status FROM video_classification_tasks WHERE status = ? OR status = ?"
|
||||
)?;
|
||||
|
||||
|
|
@ -627,7 +632,7 @@ impl VideoClassificationRepository {
|
|||
|
||||
// 重置任务状态
|
||||
let now_rfc3339 = chrono::Utc::now().to_rfc3339();
|
||||
let updated = conn.execute(
|
||||
let updated = pooled_conn.execute(
|
||||
"UPDATE video_classification_tasks
|
||||
SET status = ?, started_at = NULL, updated_at = ?
|
||||
WHERE status = ? OR status = ?",
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use anyhow::Result;
|
||||
use anyhow::{Result, anyhow};
|
||||
use rusqlite::{params, Row};
|
||||
use std::sync::Arc;
|
||||
use crate::data::models::video_generation::{
|
||||
|
|
@ -22,10 +22,15 @@ impl VideoGenerationRepository {
|
|||
|
||||
/// 初始化数据库表
|
||||
pub fn init_tables(&self) -> Result<()> {
|
||||
let conn = self.database.get_connection();
|
||||
let conn = conn.lock().unwrap();
|
||||
// 🚨 强制使用连接池避免死锁
|
||||
if !self.database.has_pool() {
|
||||
return Err(anyhow::anyhow!("连接池未启用,无法安全执行数据库操作"));
|
||||
}
|
||||
|
||||
conn.execute(
|
||||
let pooled_conn = self.database.acquire_from_pool()
|
||||
.map_err(|e| anyhow::anyhow!("获取连接池连接失败: {}", e))?;
|
||||
|
||||
pooled_conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS video_generation_tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
model_id TEXT NOT NULL,
|
||||
|
|
@ -54,8 +59,13 @@ impl VideoGenerationRepository {
|
|||
|
||||
/// 创建视频生成任务
|
||||
pub fn create(&self, task: &VideoGenerationTask) -> Result<()> {
|
||||
let conn = self.database.get_connection();
|
||||
let conn = conn.lock().unwrap();
|
||||
// 🚨 强制使用连接池避免死锁
|
||||
if !self.database.has_pool() {
|
||||
return Err(anyhow::anyhow!("连接池未启用,无法安全执行数据库操作"));
|
||||
}
|
||||
|
||||
let pooled_conn = self.database.acquire_from_pool()
|
||||
.map_err(|e| anyhow::anyhow!("获取连接池连接失败: {}", e))?;
|
||||
|
||||
let selected_photos_json = serde_json::to_string(&task.selected_photos)?;
|
||||
let video_urls_json = task.result.as_ref()
|
||||
|
|
@ -65,7 +75,7 @@ impl VideoGenerationRepository {
|
|||
.map(|r| serde_json::to_string(&r.video_paths))
|
||||
.transpose()?;
|
||||
|
||||
conn.execute(
|
||||
pooled_conn.execute(
|
||||
"INSERT INTO video_generation_tasks (
|
||||
id, model_id, product, scene, model_desc, template, duplicate,
|
||||
selected_photos, status, video_urls, video_paths, generation_time,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use rusqlite::Connection;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use anyhow::{Result, anyhow};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
|
|
@ -201,7 +202,7 @@ impl Database {
|
|||
|
||||
println!("Database pragmas configured");
|
||||
|
||||
// 创建连接池(如果配置了)
|
||||
// 🚨 强制启用连接池以避免死锁问题
|
||||
let pool = if let Some(config) = pool_config {
|
||||
println!("Initializing connection pool with min={}, max={} connections",
|
||||
config.min_connections, config.max_connections);
|
||||
|
|
@ -210,8 +211,18 @@ impl Database {
|
|||
println!("Connection pool initialized successfully");
|
||||
Some(pool)
|
||||
} else {
|
||||
println!("Using single connection mode (no pool)");
|
||||
None
|
||||
println!("🚨 强制启用默认连接池配置以避免死锁");
|
||||
let default_config = ConnectionPoolConfig {
|
||||
max_connections: 10, // 适中的最大连接数
|
||||
min_connections: 3, // 保持最小连接数
|
||||
connection_timeout: Duration::from_secs(30), // 连接超时
|
||||
acquire_timeout: Duration::from_secs(30), // 增加获取超时
|
||||
idle_timeout: Duration::from_secs(300), // 5分钟空闲超时
|
||||
};
|
||||
|
||||
let pool = ConnectionPool::new(db_path.to_string_lossy().to_string(), default_config)?;
|
||||
println!("✅ 默认连接池初始化成功");
|
||||
Some(pool)
|
||||
};
|
||||
|
||||
// 创建专用的只读连接
|
||||
|
|
|
|||
|
|
@ -57,6 +57,8 @@ pub fn run() {
|
|||
commands::database_commands::get_connection_pool_stats,
|
||||
commands::database_commands::test_connection_pool_init,
|
||||
commands::database_commands::debug_database_data,
|
||||
commands::database_commands::get_detailed_connection_pool_stats,
|
||||
commands::database_commands::force_cleanup_database_connections,
|
||||
commands::material_commands::import_materials,
|
||||
commands::material_commands::import_materials_async,
|
||||
commands::material_commands::select_material_folders,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use crate::app_state::AppState;
|
||||
use tauri::State;
|
||||
use serde::Serialize;
|
||||
|
||||
/// 初始化数据库
|
||||
/// 遵循 Tauri 开发规范的命令设计模式
|
||||
|
|
@ -172,4 +173,68 @@ pub fn debug_database_data(state: State<AppState>, project_id: String) -> Result
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
/// 连接池统计信息
|
||||
#[derive(Serialize)]
|
||||
pub struct ConnectionPoolStats {
|
||||
pub has_pool: bool,
|
||||
pub total_connections: usize,
|
||||
pub active_connections: usize,
|
||||
pub idle_connections: usize,
|
||||
pub total_acquired: u64,
|
||||
pub total_returned: u64,
|
||||
pub acquire_timeouts: u64,
|
||||
pub connection_errors: u64,
|
||||
}
|
||||
|
||||
/// 获取详细的连接池统计信息
|
||||
#[tauri::command]
|
||||
pub fn get_detailed_connection_pool_stats(state: State<AppState>) -> Result<ConnectionPoolStats, String> {
|
||||
let database_guard = state.database.lock().map_err(|e| format!("获取数据库失败: {}", e))?;
|
||||
let database = database_guard.as_ref().ok_or("数据库未初始化")?;
|
||||
|
||||
if !database.has_pool() {
|
||||
return Ok(ConnectionPoolStats {
|
||||
has_pool: false,
|
||||
total_connections: 0,
|
||||
active_connections: 0,
|
||||
idle_connections: 0,
|
||||
total_acquired: 0,
|
||||
total_returned: 0,
|
||||
acquire_timeouts: 0,
|
||||
connection_errors: 0,
|
||||
});
|
||||
}
|
||||
|
||||
// 这里需要实现获取连接池统计的方法
|
||||
// 暂时返回基本信息
|
||||
Ok(ConnectionPoolStats {
|
||||
has_pool: true,
|
||||
total_connections: 0, // 需要从连接池获取
|
||||
active_connections: 0,
|
||||
idle_connections: 0,
|
||||
total_acquired: 0,
|
||||
total_returned: 0,
|
||||
acquire_timeouts: 0,
|
||||
connection_errors: 0,
|
||||
})
|
||||
}
|
||||
|
||||
/// 强制清理所有数据库连接
|
||||
#[tauri::command]
|
||||
pub fn force_cleanup_database_connections(state: State<AppState>) -> Result<String, String> {
|
||||
println!("🚨 强制清理所有数据库连接...");
|
||||
|
||||
// 重新初始化数据库以清理所有连接
|
||||
match state.initialize_database() {
|
||||
Ok(_) => {
|
||||
println!("✅ 数据库连接已重新初始化");
|
||||
Ok("所有数据库连接已清理并重新初始化".to_string())
|
||||
},
|
||||
Err(e) => {
|
||||
println!("❌ 重新初始化数据库失败: {}", e);
|
||||
Err(format!("清理数据库连接失败: {}", e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -127,17 +127,36 @@ pub async fn create_outfit_image_record(
|
|||
request.generation_prompt,
|
||||
);
|
||||
|
||||
// 创建商品图片记录
|
||||
println!("📝 开始处理商品图片,共 {} 个", request.product_image_paths.len());
|
||||
|
||||
// 创建商品图片记录(使用异步文件操作)
|
||||
for (index, product_path) in request.product_image_paths.iter().enumerate() {
|
||||
println!("📷 处理商品图片 {}/{}: {}", index + 1, request.product_image_paths.len(), product_path);
|
||||
|
||||
let file_name = std::path::Path::new(product_path)
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.unwrap_or(&format!("product_{}", index))
|
||||
.to_string();
|
||||
|
||||
let file_size = std::fs::metadata(product_path)
|
||||
.map(|m| m.len())
|
||||
.unwrap_or(0);
|
||||
// 使用异步文件操作,添加超时
|
||||
let file_size = match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(5),
|
||||
tokio::fs::metadata(product_path)
|
||||
).await {
|
||||
Ok(Ok(metadata)) => {
|
||||
println!("📊 获取文件大小成功: {} bytes", metadata.len());
|
||||
metadata.len()
|
||||
},
|
||||
Ok(Err(e)) => {
|
||||
println!("⚠️ 获取文件元数据失败: {}, 使用默认大小", e);
|
||||
0
|
||||
},
|
||||
Err(_) => {
|
||||
println!("⚠️ 获取文件元数据超时,使用默认大小");
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
let product_image = ProductImage::new(
|
||||
record.id.clone(),
|
||||
|
|
@ -149,18 +168,19 @@ pub async fn create_outfit_image_record(
|
|||
record.product_images.push(product_image);
|
||||
}
|
||||
|
||||
// 保存到数据库
|
||||
outfit_repo.create_record(&record)
|
||||
.map_err(|e| format!("创建穿搭图片记录失败: {}", e))?;
|
||||
println!("💾 开始保存到数据库");
|
||||
|
||||
// 保存商品图片
|
||||
for product_image in &record.product_images {
|
||||
outfit_repo.create_product_image(product_image)
|
||||
.map_err(|e| format!("创建商品图片记录失败: {}", e))?;
|
||||
// 使用事务保存所有数据
|
||||
match outfit_repo.create_record_with_products(&record) {
|
||||
Ok(_) => {
|
||||
println!("✅ 穿搭图片生成记录创建成功: {}", record.id);
|
||||
Ok(record.id)
|
||||
},
|
||||
Err(e) => {
|
||||
println!("❌ 创建穿搭图片记录失败: {}", e);
|
||||
Err(format!("创建穿搭图片记录失败: {}", e))
|
||||
}
|
||||
}
|
||||
|
||||
println!("✅ 穿搭图片生成记录创建成功: {}", record.id);
|
||||
Ok(record.id)
|
||||
}
|
||||
|
||||
/// 删除穿搭图片生成记录
|
||||
|
|
|
|||
Loading…
Reference in New Issue