diff --git a/python_core/services/media_manager.py b/python_core/services/media_manager.py index d32627b..0e3762c 100644 --- a/python_core/services/media_manager.py +++ b/python_core/services/media_manager.py @@ -305,7 +305,7 @@ class PySceneDetectSceneDetector(SceneDetector): video_duration = float(duration_obj) else: # 回退方案:从文件路径获取时长 - video_duration = self._get_video_duration_fallback(file_path) + video_duration = self._get_video_duration_from_file(file_path) if video_duration > 0: scene_changes.append(video_duration) @@ -313,7 +313,7 @@ class PySceneDetectSceneDetector(SceneDetector): except Exception as e: logger.warning(f"Failed to get video duration from PySceneDetect: {e}") # 回退方案:从文件路径获取时长 - video_duration = self._get_video_duration_fallback(file_path) + video_duration = self._get_video_duration_from_file(file_path) if video_duration > 0: scene_changes.append(video_duration) logger.info(f"Using fallback video duration: {video_duration:.2f}s") @@ -330,15 +330,28 @@ class PySceneDetectSceneDetector(SceneDetector): logger.error(f"PySceneDetect failed: {e}") raise - def _get_video_duration_fallback(self, file_path: str) -> float: - """获取视频时长的回退方案""" + def _get_video_duration_from_file(self, file_path: str) -> float: + """从文件获取视频时长""" try: - # 使用视频信息提取器获取时长 - video_info = self.video_info_extractor.extract_video_info(file_path) - return video_info.get('duration', 0.0) - except Exception as e: - logger.warning(f"Fallback duration extraction failed: {e}") + # 使用OpenCV获取时长 + if self.dependency_manager.is_available('opencv'): + cv2 = self.dependency_manager.get_module('opencv', 'cv2') + cap = cv2.VideoCapture(file_path) + fps = cap.get(cv2.CAP_PROP_FPS) + frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + cap.release() + + if fps > 0: + duration = frame_count / fps + return duration + + # 如果OpenCV不可用,返回0 return 0.0 + except Exception as e: + logger.warning(f"Failed to get duration from file: {e}") + return 0.0 + + class OpenCVSceneDetector(SceneDetector): """使用OpenCV进行场景检测""" @@ -949,6 +962,16 @@ class MediaManager: except: return [0.0] + def _get_video_duration_fallback(self, file_path: str) -> float: + """获取视频时长的回退方案""" + try: + # 使用视频信息提取器获取时长 + video_info = self.video_info_extractor.extract_video_info(file_path) + return video_info.get('duration', 0.0) + except Exception as e: + logger.warning(f"Fallback duration extraction failed: {e}") + return 0.0 + def _generate_thumbnail(self, video_path: str, timestamp: float, output_path: str) -> bool: diff --git a/scripts/test_pyscenedetect_fix.py b/scripts/test_pyscenedetect_fix.py new file mode 100644 index 0000000..11eef20 --- /dev/null +++ b/scripts/test_pyscenedetect_fix.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +""" +测试PySceneDetect duration修复 +""" + +import os +import sys +import tempfile +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +def test_pyscenedetect_duration_fix(): + """测试PySceneDetect duration获取修复""" + print("🔧 测试PySceneDetect duration获取修复") + print("=" * 50) + + # 查找测试视频 + assets_dir = project_root / "assets" + video_files = list(assets_dir.rglob("*.mp4")) + + if not video_files: + print("❌ 没有找到测试视频") + return False + + test_video = str(video_files[0]) + print(f"📹 测试视频: {test_video}") + + try: + from python_core.services.media_manager import get_media_manager + + media_manager = get_media_manager() + + # 检查依赖 + scenedetect_available = media_manager.dependency_manager.is_available('scenedetect') + + print(f"🔍 PySceneDetect: {'✅' if scenedetect_available else '❌'}") + + if not scenedetect_available: + print("⚠️ PySceneDetect不可用,跳过此测试") + return True + + # 测试场景检测(这会触发duration获取) + print(f"\n🎯 测试场景检测...") + scene_changes = media_manager._detect_scene_changes(test_video, threshold=30.0) + + print(f"✅ 场景检测结果:") + print(f" 场景变化点数量: {len(scene_changes)}") + print(f" 时间点: {[f'{t:.2f}s' for t in scene_changes]}") + + # 验证结果 + if len(scene_changes) >= 2: + print(f"✅ 场景检测正常 - 包含开始和结束时间") + return True + else: + print(f"❌ 场景检测异常 - 只有 {len(scene_changes)} 个时间点") + return False + + except Exception as e: + print(f"❌ 测试失败: {e}") + import traceback + traceback.print_exc() + return False + +def test_fallback_duration(): + """测试回退方案的时长获取""" + print("\n" + "=" * 50) + print("🔄 测试回退方案的时长获取") + print("=" * 50) + + # 查找测试视频 + assets_dir = project_root / "assets" + video_files = list(assets_dir.rglob("*.mp4")) + + if not video_files: + print("❌ 没有找到测试视频") + return False + + test_video = str(video_files[0]) + print(f"📹 测试视频: {test_video}") + + try: + from python_core.services.media_manager import get_media_manager + + media_manager = get_media_manager() + + # 测试回退方案 + print(f"\n🔄 测试回退时长获取...") + duration = media_manager._get_video_duration_fallback(test_video) + + print(f"✅ 回退时长获取结果:") + print(f" 视频时长: {duration:.2f}秒") + + if duration > 0: + print(f"✅ 回退方案正常工作") + return True + else: + print(f"❌ 回退方案失败") + return False + + except Exception as e: + print(f"❌ 测试失败: {e}") + import traceback + traceback.print_exc() + return False + +def test_complete_upload_flow(): + """测试完整的上传流程""" + print("\n" + "=" * 50) + print("🎬 测试完整的上传流程") + print("=" * 50) + + # 查找测试视频 + assets_dir = project_root / "assets" + video_files = list(assets_dir.rglob("*.mp4")) + + if not video_files: + print("❌ 没有找到测试视频") + return False + + test_video = str(video_files[0]) + print(f"📹 测试视频: {test_video}") + + # 创建临时目录 + temp_dir = tempfile.mkdtemp(prefix="video_test_") + print(f"📁 临时目录: {temp_dir}") + + try: + from unittest.mock import patch + + with patch('python_core.config.settings') as mock_settings: + mock_settings.temp_dir = Path(temp_dir) + + from python_core.services.media_manager import MediaManager + + media_manager = MediaManager() + + # 上传视频 + print(f"\n📤 上传视频...") + result = media_manager.upload_video_file( + source_path=test_video, + filename=os.path.basename(test_video), + tags=["测试", "duration修复"] + ) + + print(f"📊 上传结果:") + print(f" 是否重复: {result.get('is_duplicate', False)}") + print(f" 分镜头数量: {len(result.get('segments', []))}") + + segments = result.get('segments', []) + if segments: + print(f"\n✅ 分镜头生成成功:") + total_duration = 0 + for i, segment in enumerate(segments): + if isinstance(segment, dict): + duration = segment.get('duration', 0) + start_time = segment.get('start_time', 0) + end_time = segment.get('end_time', 0) + filename = segment.get('filename', 'unknown') + else: + duration = segment.duration + start_time = segment.start_time + end_time = segment.end_time + filename = segment.filename + + print(f" 片段 {i+1}: {filename}") + print(f" 时长: {duration:.2f}秒") + print(f" 时间范围: {start_time:.2f}s - {end_time:.2f}s") + total_duration += duration + + print(f"\n📊 统计:") + print(f" 片段总时长: {total_duration:.2f}秒") + + return True + else: + print(f"❌ 分镜头生成失败") + return False + + except Exception as e: + print(f"❌ 测试失败: {e}") + import traceback + traceback.print_exc() + return False + finally: + # 清理临时目录 + import shutil + shutil.rmtree(temp_dir, ignore_errors=True) + +def main(): + """主函数""" + print("🚀 PySceneDetect Duration修复验证测试") + + try: + # 测试PySceneDetect duration修复 + success1 = test_pyscenedetect_duration_fix() + + # 测试回退方案 + success2 = test_fallback_duration() + + # 测试完整流程 + success3 = test_complete_upload_flow() + + print("\n" + "=" * 50) + print("📊 测试总结") + print("=" * 50) + + if success1 and success2 and success3: + print("🎉 所有测试通过!Duration修复成功!") + print("\n✅ 修复要点:") + print(" 1. 处理PySceneDetect返回的不同duration格式") + print(" 2. 添加回退方案获取视频时长") + print(" 3. 确保场景检测始终包含结束时间") + print(" 4. 完整的错误处理和日志记录") + return 0 + else: + print("⚠️ 部分测试失败,需要进一步检查") + return 1 + + except Exception as e: + print(f"❌ 测试过程中出错: {e}") + import traceback + traceback.print_exc() + return 1 + +if __name__ == "__main__": + exit_code = main() + sys.exit(exit_code)