//! AI video classification task queue.
use crate::data::models::video_classification::*;
|
|
use crate::business::services::video_classification_service::VideoClassificationService;
|
|
use anyhow::{Result, anyhow};
|
|
use std::sync::Arc;
|
|
use tokio::sync::{Mutex, RwLock};
|
|
use tokio::time::{sleep, Duration};
|
|
use std::collections::HashMap;
|
|
|
|
|
|
/// 队列状态
|
|
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
|
|
pub enum QueueStatus {
|
|
Stopped,
|
|
Running,
|
|
Paused,
|
|
}
|
|
|
|
/// 队列统计信息
|
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
|
pub struct QueueStats {
|
|
pub status: QueueStatus,
|
|
pub total_tasks: usize,
|
|
pub pending_tasks: usize,
|
|
pub processing_tasks: usize,
|
|
pub completed_tasks: usize,
|
|
pub failed_tasks: usize,
|
|
pub current_task_id: Option<String>,
|
|
pub processing_rate: f64, // 任务/分钟
|
|
}
|
|
|
|
/// 任务进度信息
|
|
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
|
pub struct TaskProgress {
|
|
pub task_id: String,
|
|
pub status: TaskStatus,
|
|
pub progress_percentage: f64,
|
|
pub current_step: String,
|
|
pub error_message: Option<String>,
|
|
pub started_at: Option<chrono::DateTime<chrono::Utc>>,
|
|
pub estimated_completion: Option<chrono::DateTime<chrono::Utc>>,
|
|
}
|
|
|
|
/// AI视频分类任务队列
|
|
/// 遵循 Tauri 开发规范的业务层设计模式
|
|
pub struct VideoClassificationQueue {
|
|
service: Arc<VideoClassificationService>,
|
|
status: Arc<RwLock<QueueStatus>>,
|
|
current_task: Arc<Mutex<Option<String>>>,
|
|
task_progress: Arc<RwLock<HashMap<String, TaskProgress>>>,
|
|
stats: Arc<RwLock<QueueStats>>,
|
|
max_concurrent_tasks: usize,
|
|
processing_delay: Duration,
|
|
}
|
|
|
|
impl VideoClassificationQueue {
|
|
/// 创建新的任务队列
|
|
pub fn new(service: Arc<VideoClassificationService>) -> Self {
|
|
Self {
|
|
service,
|
|
status: Arc::new(RwLock::new(QueueStatus::Stopped)),
|
|
current_task: Arc::new(Mutex::new(None)),
|
|
task_progress: Arc::new(RwLock::new(HashMap::new())),
|
|
stats: Arc::new(RwLock::new(QueueStats {
|
|
status: QueueStatus::Stopped,
|
|
total_tasks: 0,
|
|
pending_tasks: 0,
|
|
processing_tasks: 0,
|
|
completed_tasks: 0,
|
|
failed_tasks: 0,
|
|
current_task_id: None,
|
|
processing_rate: 0.0,
|
|
})),
|
|
max_concurrent_tasks: 1, // 目前只支持单任务处理
|
|
processing_delay: Duration::from_secs(2), // 任务间延迟
|
|
}
|
|
}
|
|
|
|
/// 启动队列处理
|
|
pub async fn start(&self) -> Result<()> {
|
|
let mut status = self.status.write().await;
|
|
if *status == QueueStatus::Running {
|
|
return Err(anyhow!("队列已经在运行中"));
|
|
}
|
|
|
|
*status = QueueStatus::Running;
|
|
drop(status);
|
|
|
|
// 修复数据库中的日期格式问题
|
|
match self.service.fix_date_formats().await {
|
|
Ok(fixed_count) => {
|
|
if fixed_count > 0 {
|
|
println!("🔧 队列启动时修复了 {} 个任务的日期格式", fixed_count);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
println!("⚠️ 修复日期格式时出错: {}", e);
|
|
}
|
|
}
|
|
|
|
// 恢复卡住的任务状态
|
|
match self.service.recover_stuck_tasks().await {
|
|
Ok(recovered_count) => {
|
|
if recovered_count > 0 {
|
|
println!("🔄 队列启动时恢复了 {} 个卡住的任务", recovered_count);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
println!("⚠️ 恢复卡住任务时出错: {}", e);
|
|
}
|
|
}
|
|
|
|
// 更新统计信息
|
|
self.update_stats().await?;
|
|
|
|
// 启动处理循环
|
|
let queue_clone = self.clone_for_processing();
|
|
tokio::spawn(async move {
|
|
queue_clone.processing_loop().await;
|
|
});
|
|
|
|
println!("AI视频分类队列已启动");
|
|
Ok(())
|
|
}
|
|
|
|
/// 停止队列处理
|
|
pub async fn stop(&self) -> Result<()> {
|
|
let mut status = self.status.write().await;
|
|
*status = QueueStatus::Stopped;
|
|
|
|
// 更新统计信息
|
|
self.update_stats().await?;
|
|
|
|
println!("AI视频分类队列已停止");
|
|
Ok(())
|
|
}
|
|
|
|
/// 暂停队列处理
|
|
pub async fn pause(&self) -> Result<()> {
|
|
let mut status = self.status.write().await;
|
|
if *status != QueueStatus::Running {
|
|
return Err(anyhow!("队列未在运行中"));
|
|
}
|
|
|
|
*status = QueueStatus::Paused;
|
|
|
|
// 更新统计信息
|
|
self.update_stats().await?;
|
|
|
|
println!("AI视频分类队列已暂停");
|
|
Ok(())
|
|
}
|
|
|
|
/// 恢复队列处理
|
|
pub async fn resume(&self) -> Result<()> {
|
|
let mut status = self.status.write().await;
|
|
if *status != QueueStatus::Paused {
|
|
return Err(anyhow!("队列未处于暂停状态"));
|
|
}
|
|
|
|
*status = QueueStatus::Running;
|
|
|
|
// 更新统计信息
|
|
self.update_stats().await?;
|
|
|
|
println!("AI视频分类队列已恢复");
|
|
Ok(())
|
|
}
|
|
|
|
/// 添加批量分类任务到队列
|
|
pub async fn add_batch_tasks(&self, request: BatchClassificationRequest) -> Result<Vec<String>> {
|
|
let tasks = self.service.create_batch_classification_tasks(request).await?;
|
|
let task_ids: Vec<String> = tasks.iter().map(|t| t.id.clone()).collect();
|
|
|
|
// 初始化任务进度
|
|
let mut progress_map = self.task_progress.write().await;
|
|
for task in &tasks {
|
|
progress_map.insert(task.id.clone(), TaskProgress {
|
|
task_id: task.id.clone(),
|
|
status: TaskStatus::Pending,
|
|
progress_percentage: 0.0,
|
|
current_step: "等待处理".to_string(),
|
|
error_message: None,
|
|
started_at: None,
|
|
estimated_completion: None,
|
|
});
|
|
}
|
|
|
|
// 更新统计信息
|
|
self.update_stats().await?;
|
|
|
|
println!("已添加 {} 个分类任务到队列", tasks.len());
|
|
Ok(task_ids)
|
|
}
|
|
|
|
/// 获取队列统计信息
|
|
pub async fn get_stats(&self) -> QueueStats {
|
|
self.stats.read().await.clone()
|
|
}
|
|
|
|
/// 获取项目的队列统计信息
|
|
pub async fn get_project_stats(&self, project_id: &str) -> Result<QueueStats> {
|
|
let status = self.status.read().await.clone();
|
|
let current_task = self.current_task.lock().await.clone();
|
|
|
|
// 获取项目特定的分类统计
|
|
let classification_stats = self.service.get_classification_stats(Some(project_id)).await?;
|
|
|
|
Ok(QueueStats {
|
|
status,
|
|
total_tasks: classification_stats.total_tasks as usize,
|
|
pending_tasks: classification_stats.pending_tasks as usize,
|
|
processing_tasks: classification_stats.processing_tasks as usize,
|
|
completed_tasks: classification_stats.completed_tasks as usize,
|
|
failed_tasks: classification_stats.failed_tasks as usize,
|
|
current_task_id: current_task,
|
|
processing_rate: self.stats.read().await.processing_rate, // 保持全局处理速率
|
|
})
|
|
}
|
|
|
|
/// 获取任务进度
|
|
pub async fn get_task_progress(&self, task_id: &str) -> Option<TaskProgress> {
|
|
self.task_progress.read().await.get(task_id).cloned()
|
|
}
|
|
|
|
/// 获取所有任务进度
|
|
pub async fn get_all_task_progress(&self) -> HashMap<String, TaskProgress> {
|
|
self.task_progress.read().await.clone()
|
|
}
|
|
|
|
/// 获取项目的任务进度
|
|
pub async fn get_project_task_progress(&self, _project_id: &str) -> HashMap<String, TaskProgress> {
|
|
// 暂时返回所有任务进度,后续可以通过数据库查询来过滤项目相关任务
|
|
// TODO: 实现真正的项目过滤逻辑
|
|
self.task_progress.read().await.clone()
|
|
}
|
|
|
|
/// 处理循环
|
|
async fn processing_loop(&self) {
|
|
let last_task_time = std::time::Instant::now();
|
|
let mut completed_count = 0;
|
|
|
|
loop {
|
|
let status = self.status.read().await.clone();
|
|
|
|
match status {
|
|
QueueStatus::Stopped => break,
|
|
QueueStatus::Paused => {
|
|
sleep(Duration::from_secs(1)).await;
|
|
continue;
|
|
}
|
|
QueueStatus::Running => {
|
|
// 处理下一个任务
|
|
match self.process_next_task().await {
|
|
Ok(Some(_)) => {
|
|
completed_count += 1;
|
|
|
|
// 计算处理速率
|
|
let elapsed = last_task_time.elapsed();
|
|
if elapsed.as_secs() > 0 {
|
|
let rate = (completed_count as f64) / (elapsed.as_secs() as f64 / 60.0);
|
|
let mut stats = self.stats.write().await;
|
|
stats.processing_rate = rate;
|
|
}
|
|
|
|
// 任务间延迟
|
|
sleep(self.processing_delay).await;
|
|
}
|
|
Ok(None) => {
|
|
// 没有待处理任务,等待
|
|
sleep(Duration::from_secs(5)).await;
|
|
}
|
|
Err(e) => {
|
|
println!("处理任务时发生错误: {}", e);
|
|
sleep(Duration::from_secs(10)).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 定期更新统计信息
|
|
if let Err(e) = self.update_stats().await {
|
|
println!("更新统计信息失败: {}", e);
|
|
}
|
|
}
|
|
|
|
println!("AI视频分类队列处理循环已退出");
|
|
}
|
|
|
|
/// 处理下一个任务
|
|
async fn process_next_task(&self) -> Result<Option<String>> {
|
|
println!("🔍 查找待处理任务...");
|
|
|
|
// 获取待处理任务
|
|
let pending_tasks = self.service.get_pending_tasks(Some(1)).await?;
|
|
|
|
if let Some(task) = pending_tasks.first() {
|
|
let task_id = task.id.clone();
|
|
println!("📋 找到待处理任务: {}", task_id);
|
|
println!("📁 任务视频文件: {}", task.video_file_path);
|
|
println!("🔢 任务优先级: {}", task.priority);
|
|
|
|
// 设置当前任务
|
|
{
|
|
let mut current_task = self.current_task.lock().await;
|
|
*current_task = Some(task_id.clone());
|
|
println!("🎯 设置当前任务: {}", task_id);
|
|
}
|
|
|
|
// 更新任务进度
|
|
println!("📊 更新任务进度: 开始处理");
|
|
self.update_task_progress(&task_id, TaskStatus::Uploading, 10.0, "开始处理").await;
|
|
|
|
let task_start_time = std::time::Instant::now();
|
|
|
|
// 处理任务
|
|
println!("🚀 开始处理分类任务: {}", task_id);
|
|
match self.service.process_classification_task(&task_id).await {
|
|
Ok(_) => {
|
|
let task_duration = task_start_time.elapsed();
|
|
self.update_task_progress(&task_id, TaskStatus::Completed, 100.0, "处理完成").await;
|
|
println!("✅ 任务处理成功: {} (耗时: {:?})", task_id, task_duration);
|
|
}
|
|
Err(e) => {
|
|
let task_duration = task_start_time.elapsed();
|
|
self.update_task_progress_with_error(&task_id, TaskStatus::Failed, e.to_string()).await;
|
|
println!("❌ 任务处理失败: {} - {} (耗时: {:?})", task_id, e, task_duration);
|
|
}
|
|
}
|
|
|
|
// 清除当前任务
|
|
{
|
|
let mut current_task = self.current_task.lock().await;
|
|
*current_task = None;
|
|
println!("🔄 清除当前任务");
|
|
}
|
|
|
|
Ok(Some(task_id))
|
|
} else {
|
|
println!("⏸️ 没有待处理任务,等待中...");
|
|
Ok(None)
|
|
}
|
|
}
|
|
|
|
/// 更新任务进度
|
|
async fn update_task_progress(&self, task_id: &str, status: TaskStatus, progress: f64, step: &str) {
|
|
let mut progress_map = self.task_progress.write().await;
|
|
if let Some(task_progress) = progress_map.get_mut(task_id) {
|
|
task_progress.status = status;
|
|
task_progress.progress_percentage = progress;
|
|
task_progress.current_step = step.to_string();
|
|
|
|
if task_progress.started_at.is_none() && progress > 0.0 {
|
|
task_progress.started_at = Some(chrono::Utc::now());
|
|
}
|
|
}
|
|
}
|
|
|
|
/// 更新任务进度(带错误信息)
|
|
async fn update_task_progress_with_error(&self, task_id: &str, status: TaskStatus, error: String) {
|
|
let mut progress_map = self.task_progress.write().await;
|
|
if let Some(task_progress) = progress_map.get_mut(task_id) {
|
|
task_progress.status = status;
|
|
task_progress.error_message = Some(error);
|
|
task_progress.current_step = "处理失败".to_string();
|
|
}
|
|
}
|
|
|
|
/// 更新统计信息
|
|
async fn update_stats(&self) -> Result<()> {
|
|
let status = self.status.read().await.clone();
|
|
let current_task = self.current_task.lock().await.clone();
|
|
|
|
// 获取分类统计
|
|
let classification_stats = self.service.get_classification_stats(None).await?;
|
|
|
|
let mut stats = self.stats.write().await;
|
|
stats.status = status;
|
|
stats.total_tasks = classification_stats.total_tasks as usize;
|
|
stats.pending_tasks = classification_stats.pending_tasks as usize;
|
|
stats.processing_tasks = classification_stats.processing_tasks as usize;
|
|
stats.completed_tasks = classification_stats.completed_tasks as usize;
|
|
stats.failed_tasks = classification_stats.failed_tasks as usize;
|
|
stats.current_task_id = current_task;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// 克隆用于处理的实例
|
|
fn clone_for_processing(&self) -> Self {
|
|
Self {
|
|
service: Arc::clone(&self.service),
|
|
status: Arc::clone(&self.status),
|
|
current_task: Arc::clone(&self.current_task),
|
|
task_progress: Arc::clone(&self.task_progress),
|
|
stats: Arc::clone(&self.stats),
|
|
max_concurrent_tasks: self.max_concurrent_tasks,
|
|
processing_delay: self.processing_delay,
|
|
}
|
|
}
|
|
}
|