mxivideo/docs/media-manager-progress-inte...

8.7 KiB
Raw Blame History

媒体管理器进度条集成

🎯 集成目标

为媒体管理器的批量操作添加进度条支持,提升用户体验,让用户能够实时了解处理进度。

📊 集成结果

🎉 所有进度条测试通过!

✅ 进度功能验证:
   1. 继承关系正确 - ✅
   2. 进度命令识别 - ✅
   3. 参数解析正常 - ✅
   4. 进度回调工作 - ✅
   5. 命令执行正常 - ✅

🔧 核心改进

1. CLI基类升级

# 改进前普通Commander
class MediaManagerCommander(JSONRPCCommander):
    """媒体管理器命令行接口"""

# 改进后进度Commander
class MediaManagerCommander(ProgressJSONRPCCommander):
    """媒体管理器命令行接口 - 支持进度条"""

2. 进度命令识别

def _is_progressive_command(self, command: str) -> bool:
    """判断是否需要进度报告的命令"""
    # 上传操作需要进度报告
    return command in ["upload", "batch_upload"]

进度命令:

  • upload - 单个文件上传7个步骤进度
  • batch_upload - 批量文件上传(文件级进度)

非进度命令:

  • get_all_segments - 查询操作
  • search_segments - 搜索操作
  • delete_segment - 删除操作

3. 单个上传进度

def _upload_with_progress(self, media_manager, source_path: str, filename: str, tags: list) -> dict:
    """带进度的单个上传"""
    with self.create_task("上传视频文件", 5) as task:
        def progress_callback(message: str):
            # 根据消息更新进度
            if "计算文件哈希" in message:
                task.update(0, message)
            elif "检查重复文件" in message:
                task.update(1, message)
            elif "复制文件" in message:
                task.update(2, message)
            elif "提取视频信息" in message:
                task.update(3, message)
            elif "检测场景变化" in message:
                task.update(4, message)
            elif "分割视频" in message:
                task.update(5, message)
            elif "保存数据" in message:
                task.update(6, message)
        
        result = media_manager.upload_video_file(
            source_path, filename, tags, progress_callback
        )
        
        task.finish("上传完成")
        return asdict(result)

单个上传的7个步骤:

  1. 📊 计算文件哈希...
  2. 📊 检查重复文件...
  3. 📊 复制文件到存储目录...
  4. 📊 提取视频信息...
  5. 📊 检测场景变化...
  6. 📊 分割视频成片段...
  7. 📊 保存数据...

4. 批量上传进度

def _batch_upload_with_progress(self, media_manager, source_directory: str, tags: list) -> dict:
    """带进度的批量上传"""
    # 先扫描所有视频文件
    video_files = []
    for root, _, files in os.walk(source_directory):
        for file in files:
            file_ext = os.path.splitext(file)[1].lower()
            if file_ext in video_extensions:
                video_files.append(os.path.join(root, file))
    
    # 使用进度任务
    with self.create_task("批量上传视频", len(video_files)) as task:
        for i, file_path in enumerate(video_files):
            filename = os.path.basename(file_path)
            task.update(i, f"处理文件: {filename}")
            
            # 处理每个文件...
            
        task.finish(f"批量上传完成: {result['uploaded_files']} 成功")

批量上传特点:

  • 📊 文件级进度显示
  • 📈 实时统计:成功/跳过/失败
  • 🔄 错误处理:单个文件失败不影响整体
  • 📝 详细报告:每个文件的处理结果

5. MediaManager进度回调

def upload_video_file(self, source_path: str, filename: str = None, tags: List[str] = None, 
                     progress_callback=None) -> UploadResult:
    """上传单个视频文件并分割成片段"""
    
    # 进度回调
    def report_progress(message: str):
        if progress_callback:
            progress_callback(message)

    report_progress("计算文件哈希...")
    # 计算MD5
    md5_hash = self.video_processor.calculate_hash(source_path)

    report_progress("检查重复文件...")
    # 检查是否已存在相同MD5的视频
    existing = self.storage.get_video_by_md5(self.original_videos, md5_hash)
    
    # ... 其他步骤

🚀 用户体验提升

1. 实时进度反馈

📊 进度: 计算文件哈希...
📊 进度: 检查重复文件...
📊 进度: 复制文件到存储目录...
📊 进度: 提取视频信息...
📊 进度: 检测场景变化...
📊 进度: 分割视频成片段...
📊 进度: 保存数据...
✅ 上传完成: 新文件

2. JSON-RPC进度协议

{
  "jsonrpc": "2.0",
  "method": "progress",
  "params": {
    "step": "media_manager",
    "progress": 0.6,
    "message": "检测场景变化...",
    "details": {
      "current": 3,
      "total": 5,
      "elapsed_time": 2.5,
      "estimated_remaining": 1.2
    }
  }
}

3. 批量操作统计

{
  "total_files": 10,
  "uploaded_files": 8,
  "skipped_files": 1,
  "failed_files": 1,
  "total_segments": 24,
  "uploaded_list": [...],
  "skipped_list": [{"filename": "duplicate.mp4", "reason": "Already exists"}],
  "failed_list": [{"filename": "corrupt.mp4", "error": "Invalid format"}]
}

📈 性能和体验对比

改进前的问题

  • 无进度反馈 - 用户不知道处理进度
  • 批量操作黑盒 - 大量文件处理时无响应
  • 错误不明确 - 失败时缺少详细信息
  • 用户体验差 - 长时间等待无反馈

改进后的优势

  • 实时进度 - 每个步骤都有进度反馈
  • 批量可视化 - 文件级进度显示
  • 错误透明 - 详细的错误信息和统计
  • 用户友好 - 清晰的状态和预期时间

🎯 使用示例

1. 单个文件上传

# 命令行使用
python -m python_core.services.media_manager upload video.mp4 --tags 测试

# 进度输出
📊 进度: 计算文件哈希...
📊 进度: 检查重复文件...
📊 进度: 复制文件到存储目录...
📊 进度: 提取视频信息...
📊 进度: 检测场景变化...
📊 进度: 分割视频成片段...
📊 进度: 保存数据...
✅ 上传完成

2. 批量文件上传

# 命令行使用
python -m python_core.services.media_manager batch_upload /path/to/videos --tags 批量

# 进度输出
📊 进度: 处理文件: video1.mp4 (1/10)
📊 进度: 处理文件: video2.mp4 (2/10)
📊 进度: 处理文件: video3.mp4 (3/10)
...
✅ 批量上传完成: 8 成功, 1 跳过, 1 失败

3. 编程接口使用

from python_core.services.media_manager import MediaManagerCommander

# 创建Commander
commander = MediaManagerCommander()

# 执行带进度的命令
result = commander.execute_command("upload", {
    "source_path": "video.mp4",
    "tags": "测试,进度条"
})

🔧 技术实现细节

1. 进度任务管理

with self.create_task("任务名称", 总步数) as task:
    for i in range(总步数):
        # 执行工作
        task.update(i, f"步骤 {i}")
    task.finish("任务完成")

2. 进度回调机制

def progress_callback(message: str):
    # 根据消息内容更新进度
    if "关键词" in message:
        task.update(步骤号, message)

# 传递回调给业务逻辑
manager.upload_video_file(path, callback=progress_callback)

3. 错误处理和统计

try:
    result = process_file(file_path)
    success_count += 1
except Exception as e:
    failed_list.append({"filename": filename, "error": str(e)})
    failed_count += 1

🎉 总结

集成成果

  • 进度可视化 - 所有长时间操作都有进度显示
  • 用户体验 - 实时反馈,减少等待焦虑
  • 错误透明 - 详细的错误信息和处理统计
  • 标准协议 - 使用JSON-RPC 2.0进度协议
  • 向后兼容 - 保持原有API不变

技术特点

  • 🎯 智能识别 - 自动区分需要进度的命令
  • 🔄 回调机制 - 灵活的进度回调系统
  • 📊 多级进度 - 支持任务级和步骤级进度
  • 🛡️ 错误恢复 - 单个失败不影响整体处理

实际价值

  • 💡 提升体验 - 用户知道系统在工作
  • 🚀 提高效率 - 可以预估完成时间
  • 🔍 便于调试 - 详细的处理日志
  • 📈 数据洞察 - 处理统计和性能分析

通过集成进度条功能,媒体管理器从一个"黑盒"工具变成了一个透明、友好的用户界面,大大提升了用户体验!


进度条集成 - 让长时间操作变得可视化、可预期、用户友好!