8.7 KiB
8.7 KiB
媒体管理器进度条集成
🎯 集成目标
为媒体管理器的批量操作添加进度条支持,提升用户体验,让用户能够实时了解处理进度。
📊 集成结果
🎉 所有进度条测试通过!
✅ 进度功能验证:
1. 继承关系正确 - ✅
2. 进度命令识别 - ✅
3. 参数解析正常 - ✅
4. 进度回调工作 - ✅
5. 命令执行正常 - ✅
🔧 核心改进
1. CLI基类升级
# 改进前:普通Commander
class MediaManagerCommander(JSONRPCCommander):
"""媒体管理器命令行接口"""
# 改进后:进度Commander
class MediaManagerCommander(ProgressJSONRPCCommander):
"""媒体管理器命令行接口 - 支持进度条"""
2. 进度命令识别
def _is_progressive_command(self, command: str) -> bool:
"""判断是否需要进度报告的命令"""
# 上传操作需要进度报告
return command in ["upload", "batch_upload"]
进度命令:
- ✅
upload- 单个文件上传(7个步骤进度) - ✅
batch_upload- 批量文件上传(文件级进度)
非进度命令:
- ⚡
get_all_segments- 查询操作 - ⚡
search_segments- 搜索操作 - ⚡
delete_segment- 删除操作
3. 单个上传进度
def _upload_with_progress(self, media_manager, source_path: str, filename: str, tags: list) -> dict:
"""带进度的单个上传"""
with self.create_task("上传视频文件", 5) as task:
def progress_callback(message: str):
# 根据消息更新进度
if "计算文件哈希" in message:
task.update(0, message)
elif "检查重复文件" in message:
task.update(1, message)
elif "复制文件" in message:
task.update(2, message)
elif "提取视频信息" in message:
task.update(3, message)
elif "检测场景变化" in message:
task.update(4, message)
elif "分割视频" in message:
task.update(5, message)
elif "保存数据" in message:
task.update(6, message)
result = media_manager.upload_video_file(
source_path, filename, tags, progress_callback
)
task.finish("上传完成")
return asdict(result)
单个上传的7个步骤:
- 📊 计算文件哈希...
- 📊 检查重复文件...
- 📊 复制文件到存储目录...
- 📊 提取视频信息...
- 📊 检测场景变化...
- 📊 分割视频成片段...
- 📊 保存数据...
4. 批量上传进度
def _batch_upload_with_progress(self, media_manager, source_directory: str, tags: list) -> dict:
"""带进度的批量上传"""
# 先扫描所有视频文件
video_files = []
for root, _, files in os.walk(source_directory):
for file in files:
file_ext = os.path.splitext(file)[1].lower()
if file_ext in video_extensions:
video_files.append(os.path.join(root, file))
# 使用进度任务
with self.create_task("批量上传视频", len(video_files)) as task:
for i, file_path in enumerate(video_files):
filename = os.path.basename(file_path)
task.update(i, f"处理文件: {filename}")
# 处理每个文件...
task.finish(f"批量上传完成: {result['uploaded_files']} 成功")
批量上传特点:
- 📊 文件级进度显示
- 📈 实时统计:成功/跳过/失败
- 🔄 错误处理:单个文件失败不影响整体
- 📝 详细报告:每个文件的处理结果
5. MediaManager进度回调
def upload_video_file(self, source_path: str, filename: str = None, tags: List[str] = None,
progress_callback=None) -> UploadResult:
"""上传单个视频文件并分割成片段"""
# 进度回调
def report_progress(message: str):
if progress_callback:
progress_callback(message)
report_progress("计算文件哈希...")
# 计算MD5
md5_hash = self.video_processor.calculate_hash(source_path)
report_progress("检查重复文件...")
# 检查是否已存在相同MD5的视频
existing = self.storage.get_video_by_md5(self.original_videos, md5_hash)
# ... 其他步骤
🚀 用户体验提升
1. 实时进度反馈
📊 进度: 计算文件哈希...
📊 进度: 检查重复文件...
📊 进度: 复制文件到存储目录...
📊 进度: 提取视频信息...
📊 进度: 检测场景变化...
📊 进度: 分割视频成片段...
📊 进度: 保存数据...
✅ 上传完成: 新文件
2. JSON-RPC进度协议
{
"jsonrpc": "2.0",
"method": "progress",
"params": {
"step": "media_manager",
"progress": 0.6,
"message": "检测场景变化...",
"details": {
"current": 3,
"total": 5,
"elapsed_time": 2.5,
"estimated_remaining": 1.2
}
}
}
3. 批量操作统计
{
"total_files": 10,
"uploaded_files": 8,
"skipped_files": 1,
"failed_files": 1,
"total_segments": 24,
"uploaded_list": [...],
"skipped_list": [{"filename": "duplicate.mp4", "reason": "Already exists"}],
"failed_list": [{"filename": "corrupt.mp4", "error": "Invalid format"}]
}
📈 性能和体验对比
改进前的问题
- ❌ 无进度反馈 - 用户不知道处理进度
- ❌ 批量操作黑盒 - 大量文件处理时无响应
- ❌ 错误不明确 - 失败时缺少详细信息
- ❌ 用户体验差 - 长时间等待无反馈
改进后的优势
- ✅ 实时进度 - 每个步骤都有进度反馈
- ✅ 批量可视化 - 文件级进度显示
- ✅ 错误透明 - 详细的错误信息和统计
- ✅ 用户友好 - 清晰的状态和预期时间
🎯 使用示例
1. 单个文件上传
# 命令行使用
python -m python_core.services.media_manager upload video.mp4 --tags 测试
# 进度输出
📊 进度: 计算文件哈希...
📊 进度: 检查重复文件...
📊 进度: 复制文件到存储目录...
📊 进度: 提取视频信息...
📊 进度: 检测场景变化...
📊 进度: 分割视频成片段...
📊 进度: 保存数据...
✅ 上传完成
2. 批量文件上传
# 命令行使用
python -m python_core.services.media_manager batch_upload /path/to/videos --tags 批量
# 进度输出
📊 进度: 处理文件: video1.mp4 (1/10)
📊 进度: 处理文件: video2.mp4 (2/10)
📊 进度: 处理文件: video3.mp4 (3/10)
...
✅ 批量上传完成: 8 成功, 1 跳过, 1 失败
3. 编程接口使用
from python_core.services.media_manager import MediaManagerCommander
# 创建Commander
commander = MediaManagerCommander()
# 执行带进度的命令
result = commander.execute_command("upload", {
"source_path": "video.mp4",
"tags": "测试,进度条"
})
🔧 技术实现细节
1. 进度任务管理
with self.create_task("任务名称", 总步数) as task:
for i in range(总步数):
# 执行工作
task.update(i, f"步骤 {i}")
task.finish("任务完成")
2. 进度回调机制
def progress_callback(message: str):
# 根据消息内容更新进度
if "关键词" in message:
task.update(步骤号, message)
# 传递回调给业务逻辑
manager.upload_video_file(path, callback=progress_callback)
3. 错误处理和统计
try:
result = process_file(file_path)
success_count += 1
except Exception as e:
failed_list.append({"filename": filename, "error": str(e)})
failed_count += 1
🎉 总结
集成成果
- ✅ 进度可视化 - 所有长时间操作都有进度显示
- ✅ 用户体验 - 实时反馈,减少等待焦虑
- ✅ 错误透明 - 详细的错误信息和处理统计
- ✅ 标准协议 - 使用JSON-RPC 2.0进度协议
- ✅ 向后兼容 - 保持原有API不变
技术特点
- 🎯 智能识别 - 自动区分需要进度的命令
- 🔄 回调机制 - 灵活的进度回调系统
- 📊 多级进度 - 支持任务级和步骤级进度
- 🛡️ 错误恢复 - 单个失败不影响整体处理
实际价值
- 💡 提升体验 - 用户知道系统在工作
- 🚀 提高效率 - 可以预估完成时间
- 🔍 便于调试 - 详细的处理日志
- 📈 数据洞察 - 处理统计和性能分析
通过集成进度条功能,媒体管理器从一个"黑盒"工具变成了一个透明、友好的用户界面,大大提升了用户体验!
进度条集成 - 让长时间操作变得可视化、可预期、用户友好!