mixvideo-v2/apps/desktop/src-tauri/src/business/services/video_classification_queue.rs

use crate::data::models::video_classification::*;
use crate::business::services::video_classification_service::VideoClassificationService;
use anyhow::{Result, anyhow};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{sleep, Duration};
use std::collections::HashMap;

/// Queue status
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum QueueStatus {
    Stopped,
    Running,
    Paused,
}

/// Queue statistics
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct QueueStats {
    pub status: QueueStatus,
    pub total_tasks: usize,
    pub pending_tasks: usize,
    pub processing_tasks: usize,
    pub completed_tasks: usize,
    pub failed_tasks: usize,
    pub current_task_id: Option<String>,
    pub processing_rate: f64, // tasks per minute
}

/// Task progress information
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TaskProgress {
    pub task_id: String,
    pub status: TaskStatus,
    pub progress_percentage: f64,
    pub current_step: String,
    pub error_message: Option<String>,
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
    pub estimated_completion: Option<chrono::DateTime<chrono::Utc>>,
}

/// AI video classification task queue.
/// Follows the business-layer design pattern from the Tauri development guidelines.
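///
/// A minimal usage sketch (illustrative only; it assumes a `VideoClassificationService`
/// has already been constructed and wrapped in an `Arc` elsewhere in the app, and that
/// `request` is a `BatchClassificationRequest` built by the caller):
/// ```ignore
/// let queue = VideoClassificationQueue::new(service);
/// queue.start().await?;              // spawns the background processing loop
/// let task_ids = queue.add_batch_tasks(request).await?;
/// let stats = queue.get_stats().await;
/// queue.stop().await?;               // the loop exits on its next status check
/// ```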
pub struct VideoClassificationQueue {
    service: Arc<VideoClassificationService>,
    status: Arc<RwLock<QueueStatus>>,
    current_task: Arc<Mutex<Option<String>>>,
    task_progress: Arc<RwLock<HashMap<String, TaskProgress>>>,
    stats: Arc<RwLock<QueueStats>>,
    max_concurrent_tasks: usize,
    processing_delay: Duration,
}

impl VideoClassificationQueue {
    /// Create a new task queue
    pub fn new(service: Arc<VideoClassificationService>) -> Self {
        Self {
            service,
            status: Arc::new(RwLock::new(QueueStatus::Stopped)),
            current_task: Arc::new(Mutex::new(None)),
            task_progress: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(QueueStats {
                status: QueueStatus::Stopped,
                total_tasks: 0,
                pending_tasks: 0,
                processing_tasks: 0,
                completed_tasks: 0,
                failed_tasks: 0,
                current_task_id: None,
                processing_rate: 0.0,
            })),
            max_concurrent_tasks: 1, // only single-task processing is supported for now
            processing_delay: Duration::from_secs(2), // delay between tasks
        }
    }

    /// Start queue processing
    pub async fn start(&self) -> Result<()> {
        let mut status = self.status.write().await;
        if *status == QueueStatus::Running {
            return Err(anyhow!("Queue is already running"));
        }
        *status = QueueStatus::Running;
        drop(status);

        // Recover tasks that were left stuck in a processing state
        match self.service.recover_stuck_tasks().await {
            Ok(recovered_count) => {
                if recovered_count > 0 {
                    println!("🔄 Recovered {} stuck task(s) at queue startup", recovered_count);
                }
            }
            Err(e) => {
                println!("⚠️ Failed to recover stuck tasks: {}", e);
            }
        }

        // Refresh statistics
        self.update_stats().await?;

        // Spawn the background processing loop
        let queue_clone = self.clone_for_processing();
        tokio::spawn(async move {
            queue_clone.processing_loop().await;
        });

        println!("AI video classification queue started");
        Ok(())
    }

    /// Stop queue processing
    pub async fn stop(&self) -> Result<()> {
        let mut status = self.status.write().await;
        *status = QueueStatus::Stopped;
        // Release the write lock before update_stats re-acquires the status lock,
        // otherwise this would deadlock on the tokio RwLock.
        drop(status);

        // Refresh statistics
        self.update_stats().await?;
        println!("AI video classification queue stopped");
        Ok(())
    }

    /// Pause queue processing
    pub async fn pause(&self) -> Result<()> {
        let mut status = self.status.write().await;
        if *status != QueueStatus::Running {
            return Err(anyhow!("Queue is not running"));
        }
        *status = QueueStatus::Paused;
        // Release the write lock before update_stats re-acquires the status lock
        drop(status);

        // Refresh statistics
        self.update_stats().await?;
        println!("AI video classification queue paused");
        Ok(())
    }

    /// Resume queue processing
    pub async fn resume(&self) -> Result<()> {
        let mut status = self.status.write().await;
        if *status != QueueStatus::Paused {
            return Err(anyhow!("Queue is not paused"));
        }
        *status = QueueStatus::Running;
        // Release the write lock before update_stats re-acquires the status lock
        drop(status);

        // Refresh statistics
        self.update_stats().await?;
        println!("AI video classification queue resumed");
        Ok(())
    }

    /// Add a batch of classification tasks to the queue
    pub async fn add_batch_tasks(&self, request: BatchClassificationRequest) -> Result<Vec<String>> {
        let tasks = self.service.create_batch_classification_tasks(request).await?;
        let task_ids: Vec<String> = tasks.iter().map(|t| t.id.clone()).collect();

        // Initialize progress entries for the new tasks
        let mut progress_map = self.task_progress.write().await;
        for task in &tasks {
            progress_map.insert(task.id.clone(), TaskProgress {
                task_id: task.id.clone(),
                status: TaskStatus::Pending,
                progress_percentage: 0.0,
                current_step: "Waiting to be processed".to_string(),
                error_message: None,
                started_at: None,
                estimated_completion: None,
            });
        }
        // Release the progress-map write lock before awaiting update_stats
        drop(progress_map);

        // Refresh statistics
        self.update_stats().await?;
        println!("Added {} classification task(s) to the queue", tasks.len());
        Ok(task_ids)
    }

    /// Get queue statistics
    pub async fn get_stats(&self) -> QueueStats {
        self.stats.read().await.clone()
    }

    /// Get queue statistics for a specific project
    pub async fn get_project_stats(&self, project_id: &str) -> Result<QueueStats> {
        let status = self.status.read().await.clone();
        let current_task = self.current_task.lock().await.clone();

        // Fetch project-specific classification statistics
        let classification_stats = self.service.get_classification_stats(Some(project_id)).await?;

        Ok(QueueStats {
            status,
            total_tasks: classification_stats.total_tasks as usize,
            pending_tasks: classification_stats.pending_tasks as usize,
            processing_tasks: classification_stats.processing_tasks as usize,
            completed_tasks: classification_stats.completed_tasks as usize,
            failed_tasks: classification_stats.failed_tasks as usize,
            current_task_id: current_task,
            processing_rate: self.stats.read().await.processing_rate, // keep the global processing rate
        })
    }

    /// Get the progress of a single task
    pub async fn get_task_progress(&self, task_id: &str) -> Option<TaskProgress> {
        self.task_progress.read().await.get(task_id).cloned()
    }

    /// Get the progress of all tasks
    pub async fn get_all_task_progress(&self) -> HashMap<String, TaskProgress> {
        self.task_progress.read().await.clone()
    }

    /// Get task progress for a specific project
    pub async fn get_project_task_progress(&self, _project_id: &str) -> HashMap<String, TaskProgress> {
        // For now this returns progress for all tasks; filtering by project can later
        // be done through a database query.
        // TODO: implement real project filtering
        self.task_progress.read().await.clone()
    }

    /// Processing loop
    async fn processing_loop(&self) {
        let loop_start_time = std::time::Instant::now();
        let mut completed_count = 0;

        loop {
            let status = self.status.read().await.clone();
            match status {
                QueueStatus::Stopped => break,
                QueueStatus::Paused => {
                    sleep(Duration::from_secs(1)).await;
                    continue;
                }
                QueueStatus::Running => {
                    // Process the next task
                    match self.process_next_task().await {
                        Ok(Some(_)) => {
                            completed_count += 1;

                            // Update the average processing rate (tasks per minute since the loop started)
                            let elapsed = loop_start_time.elapsed();
                            if elapsed.as_secs() > 0 {
                                let rate = (completed_count as f64) / (elapsed.as_secs() as f64 / 60.0);
                                let mut stats = self.stats.write().await;
                                stats.processing_rate = rate;
                            }

                            // Delay between tasks
                            sleep(self.processing_delay).await;
                        }
                        Ok(None) => {
                            // No pending tasks; wait before polling again
                            sleep(Duration::from_secs(5)).await;
                        }
                        Err(e) => {
                            println!("Error while processing task: {}", e);
                            sleep(Duration::from_secs(10)).await;
                        }
                    }
                }
            }

            // Periodically refresh statistics
            if let Err(e) = self.update_stats().await {
                println!("Failed to update statistics: {}", e);
            }
        }

        println!("AI video classification queue processing loop exited");
    }

    /// Process the next pending task
    async fn process_next_task(&self) -> Result<Option<String>> {
        println!("🔍 Looking for a pending task...");

        // Fetch at most one pending task
        let pending_tasks = self.service.get_pending_tasks(Some(1)).await?;

        if let Some(task) = pending_tasks.first() {
            let task_id = task.id.clone();
            println!("📋 Found pending task: {}", task_id);
            println!("📁 Task video file: {}", task.video_file_path);
            println!("🔢 Task priority: {}", task.priority);

            // Mark this task as the current task
            {
                let mut current_task = self.current_task.lock().await;
                *current_task = Some(task_id.clone());
                println!("🎯 Current task set: {}", task_id);
            }

            // Update task progress
            println!("📊 Updating task progress: starting processing");
            self.update_task_progress(&task_id, TaskStatus::Uploading, 10.0, "Starting processing").await;

            let task_start_time = std::time::Instant::now();

            // Process the task
            println!("🚀 Processing classification task: {}", task_id);
            match self.service.process_classification_task(&task_id).await {
                Ok(_) => {
                    let task_duration = task_start_time.elapsed();
                    self.update_task_progress(&task_id, TaskStatus::Completed, 100.0, "Processing completed").await;
                    println!("✅ Task completed: {} (took {:?})", task_id, task_duration);
                }
                Err(e) => {
                    let task_duration = task_start_time.elapsed();
                    self.update_task_progress_with_error(&task_id, TaskStatus::Failed, e.to_string()).await;
                    println!("❌ Task failed: {} - {} (took {:?})", task_id, e, task_duration);
                }
            }

            // Clear the current task
            {
                let mut current_task = self.current_task.lock().await;
                *current_task = None;
                println!("🔄 Current task cleared");
            }

            Ok(Some(task_id))
        } else {
            println!("⏸️ No pending tasks, waiting...");
            Ok(None)
        }
    }

    /// Update task progress
    async fn update_task_progress(&self, task_id: &str, status: TaskStatus, progress: f64, step: &str) {
        let mut progress_map = self.task_progress.write().await;
        if let Some(task_progress) = progress_map.get_mut(task_id) {
            task_progress.status = status;
            task_progress.progress_percentage = progress;
            task_progress.current_step = step.to_string();
            if task_progress.started_at.is_none() && progress > 0.0 {
                task_progress.started_at = Some(chrono::Utc::now());
            }
        }
    }

    /// Update task progress with an error message
    async fn update_task_progress_with_error(&self, task_id: &str, status: TaskStatus, error: String) {
        let mut progress_map = self.task_progress.write().await;
        if let Some(task_progress) = progress_map.get_mut(task_id) {
            task_progress.status = status;
            task_progress.error_message = Some(error);
            task_progress.current_step = "Processing failed".to_string();
        }
    }

    /// Refresh the cached queue statistics
    async fn update_stats(&self) -> Result<()> {
        let status = self.status.read().await.clone();
        let current_task = self.current_task.lock().await.clone();

        // Fetch classification statistics
        let classification_stats = self.service.get_classification_stats(None).await?;

        let mut stats = self.stats.write().await;
        stats.status = status;
        stats.total_tasks = classification_stats.total_tasks as usize;
        stats.pending_tasks = classification_stats.pending_tasks as usize;
        stats.processing_tasks = classification_stats.processing_tasks as usize;
        stats.completed_tasks = classification_stats.completed_tasks as usize;
        stats.failed_tasks = classification_stats.failed_tasks as usize;
        stats.current_task_id = current_task;
        Ok(())
    }

    /// Clone a handle for the background processing task.
    /// All fields are Arc-backed, so the clone shares the same status, progress map, and stats.
    fn clone_for_processing(&self) -> Self {
        Self {
            service: Arc::clone(&self.service),
            status: Arc::clone(&self.status),
            current_task: Arc::clone(&self.current_task),
            task_progress: Arc::clone(&self.task_progress),
            stats: Arc::clone(&self.stats),
            max_concurrent_tasks: self.max_concurrent_tasks,
            processing_delay: self.processing_delay,
        }
    }
}
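
// How this queue reaches the frontend is outside this file. Under the Tauri managed-state
// pattern the wiring might look like the sketch below; the command name and registration
// site are assumptions for illustration, not part of this codebase:
//
// #[tauri::command]
// async fn start_classification_queue(
//     queue: tauri::State<'_, Arc<VideoClassificationQueue>>,
// ) -> Result<(), String> {
//     queue.start().await.map_err(|e| e.to_string())
// }
//
// // During app setup: tauri::Builder::default().manage(Arc::new(queue)) ...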