diff --git a/python_core/services/media_manager.py b/python_core/services/media_manager.py index 4c22289..d90d588 100644 --- a/python_core/services/media_manager.py +++ b/python_core/services/media_manager.py @@ -290,6 +290,14 @@ class PySceneDetectSceneDetector(SceneDetector): if end_time not in scene_changes: scene_changes.append(end_time) + # 如果没有检测到场景变化,添加视频结束时间 + if len(scene_changes) == 1: # 只有开始时间0.0 + # 获取视频时长 + video_duration = video_manager.get_duration().get_seconds() + if video_duration > 0: + scene_changes.append(video_duration) + logger.info(f"No scenes detected, using full video duration: {video_duration:.2f}s") + scene_changes = sorted(list(set(scene_changes))) video_manager.release() @@ -505,7 +513,6 @@ class OpenCVVideoSegmentCreator(VideoSegmentCreator): def _create_single_segment(self, video_path: str, original_video_id: str, tags: List[str]) -> List['VideoSegment']: """创建单个片段(当OpenCV不可用时)""" - from python_core.models.video_segment import VideoSegment import shutil import uuid from datetime import datetime @@ -615,29 +622,29 @@ class OpenCVVideoSegmentCreator(VideoSegmentCreator): from datetime import datetime now = datetime.now().isoformat() - # 这里需要导入VideoSegment,但为了避免循环导入,我们返回字典 - segment_data = { - 'id': segment_id, - 'original_video_id': original_video_id, - 'segment_index': i, - 'filename': segment_filename, - 'file_path': str(segment_path), - 'md5_hash': self.hash_calculator.calculate_hash(str(segment_path)), - 'file_size': segment_path.stat().st_size, - 'duration': duration, - 'width': width, - 'height': height, - 'fps': fps, - 'format': 'mp4', - 'start_time': start_time, - 'end_time': end_time, - 'tags': tags.copy(), - 'use_count': 0, - 'created_at': now, - 'updated_at': now - } + # 创建VideoSegment对象 + segment = VideoSegment( + id=segment_id, + original_video_id=original_video_id, + segment_index=i, + filename=segment_filename, + file_path=str(segment_path), + md5_hash=self.hash_calculator.calculate_hash(str(segment_path)), + file_size=segment_path.stat().st_size, + duration=duration, + width=width, + height=height, + fps=fps, + format='mp4', + start_time=start_time, + end_time=end_time, + tags=tags.copy(), + use_count=0, + created_at=now, + updated_at=now + ) - segments.append(segment_data) + segments.append(segment) logger.info(f"Created segment {i}: {start_time:.2f}s - {end_time:.2f}s ({duration:.2f}s)") cap.release() diff --git a/scripts/test_scene_detection_fix.py b/scripts/test_scene_detection_fix.py new file mode 100644 index 0000000..b03d053 --- /dev/null +++ b/scripts/test_scene_detection_fix.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +""" +测试修复后的场景检测功能 +""" + +import os +import sys +import tempfile +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +def test_pyscenedetect_fix(): + """测试PySceneDetect修复""" + print("🎯 测试PySceneDetect场景检测修复") + print("=" * 50) + + # 查找测试视频 + assets_dir = project_root / "assets" + video_files = list(assets_dir.rglob("*.mp4")) + + if not video_files: + print("❌ 没有找到测试视频") + return False + + test_video = str(video_files[0]) + print(f"📹 测试视频: {test_video}") + + try: + from python_core.services.media_manager import get_media_manager + + media_manager = get_media_manager() + + # 检查依赖 + opencv_available = media_manager.dependency_manager.is_available('opencv') + scenedetect_available = media_manager.dependency_manager.is_available('scenedetect') + + print(f"🔍 依赖检查:") + print(f" OpenCV: {'✅' if opencv_available else '❌'}") + print(f" PySceneDetect: {'✅' if scenedetect_available else '❌'}") + + # 测试场景检测 + print(f"\n🎯 测试场景检测...") + scene_changes = media_manager._detect_scene_changes(test_video, threshold=30.0) + + print(f"✅ 场景检测结果:") + print(f" 场景变化点数量: {len(scene_changes)}") + print(f" 时间点: {[f'{t:.2f}s' for t in scene_changes]}") + + # 验证结果 + if len(scene_changes) >= 2: + print(f"✅ 场景检测正常 - 至少有开始和结束时间") + return True + else: + print(f"❌ 场景检测异常 - 只有 {len(scene_changes)} 个时间点") + return False + + except Exception as e: + print(f"❌ 测试失败: {e}") + import traceback + traceback.print_exc() + return False + +def test_video_upload_with_segments(): + """测试视频上传和分镜头生成""" + print("\n" + "=" * 50) + print("🎬 测试视频上传和分镜头生成") + print("=" * 50) + + # 查找测试视频 + assets_dir = project_root / "assets" + video_files = list(assets_dir.rglob("*.mp4")) + + if not video_files: + print("❌ 没有找到测试视频") + return False + + test_video = str(video_files[0]) + print(f"📹 测试视频: {test_video}") + + # 创建临时目录 + temp_dir = tempfile.mkdtemp(prefix="video_test_") + print(f"📁 临时目录: {temp_dir}") + + try: + from unittest.mock import patch + + with patch('python_core.config.settings') as mock_settings: + mock_settings.temp_dir = Path(temp_dir) + + from python_core.services.media_manager import MediaManager + + media_manager = MediaManager() + + # 上传视频 + print(f"\n📤 上传视频...") + result = media_manager.upload_video_file( + source_path=test_video, + filename=os.path.basename(test_video), + tags=["测试", "修复验证"] + ) + + print(f"📊 上传结果:") + print(f" 是否重复: {result.get('is_duplicate', False)}") + print(f" 分镜头数量: {len(result.get('segments', []))}") + + segments = result.get('segments', []) + if segments: + print(f"\n✅ 分镜头生成成功:") + total_duration = 0 + for i, segment in enumerate(segments): + if isinstance(segment, dict): + duration = segment.get('duration', 0) + start_time = segment.get('start_time', 0) + end_time = segment.get('end_time', 0) + filename = segment.get('filename', 'unknown') + else: + duration = segment.duration + start_time = segment.start_time + end_time = segment.end_time + filename = segment.filename + + print(f" 片段 {i+1}: {filename}") + print(f" 时长: {duration:.2f}秒") + print(f" 时间范围: {start_time:.2f}s - {end_time:.2f}s") + total_duration += duration + + print(f"\n📊 统计:") + print(f" 片段总时长: {total_duration:.2f}秒") + + return True + else: + print(f"❌ 分镜头生成失败") + return False + + except Exception as e: + print(f"❌ 测试失败: {e}") + import traceback + traceback.print_exc() + return False + finally: + # 清理临时目录 + import shutil + shutil.rmtree(temp_dir, ignore_errors=True) + +def main(): + """主函数""" + print("🚀 场景检测修复验证测试") + + try: + # 测试场景检测修复 + success1 = test_pyscenedetect_fix() + + # 测试完整的视频上传流程 + success2 = test_video_upload_with_segments() + + print("\n" + "=" * 50) + print("📊 测试总结") + print("=" * 50) + + if success1 and success2: + print("🎉 所有测试通过!场景检测修复成功!") + print("\n✅ 修复要点:") + print(" 1. PySceneDetect没有检测到场景时,自动添加视频结束时间") + print(" 2. 确保场景变化点至少包含开始和结束时间") + print(" 3. 视频分镜头功能正常工作") + return 0 + else: + print("⚠️ 部分测试失败,需要进一步检查") + return 1 + + except Exception as e: + print(f"❌ 测试过程中出错: {e}") + import traceback + traceback.print_exc() + return 1 + +if __name__ == "__main__": + exit_code = main() + sys.exit(exit_code) diff --git a/scripts/test_segment_regeneration.py b/scripts/test_segment_regeneration.py index fba8893..5f0c548 100644 --- a/scripts/test_segment_regeneration.py +++ b/scripts/test_segment_regeneration.py @@ -112,12 +112,17 @@ def test_scene_detection_directly(): print("\n" + "=" * 50) print("🎯 直接测试场景检测功能") print("=" * 50) - - test_video_path = "D:\\mxivideo\\.mixvideo\\temp\\video_storage\\796407cd-1c8a-4bc1-8b1c-1b3120676143.mp4" - - if not os.path.exists(test_video_path): - print(f"❌ 测试视频文件不存在: {test_video_path}") + + # 查找assets文件夹中的测试视频 + assets_dir = project_root / "assets" + video_files = list(assets_dir.rglob("*.mp4")) + + if not video_files: + print(f"❌ 没有找到测试视频文件") return False + + test_video_path = str(video_files[0]) + print(f"📹 使用测试视频: {test_video_path}") try: from python_core.services.media_manager import get_media_manager