diff --git a/python_core/services/media_manager.py b/python_core/services/media_manager.py index 0f7fd0c..4c22289 100644 --- a/python_core/services/media_manager.py +++ b/python_core/services/media_manager.py @@ -989,11 +989,56 @@ class MediaManager: existing = self.get_video_by_md5(md5_hash) if existing: logger.info(f"Video with MD5 {md5_hash} already exists") - return { - 'original_video': existing, - 'segments': self.get_segments_by_video_id(existing['id']), - 'is_duplicate': True - } + existing_segments = self.get_segments_by_video_id(existing['id']) + + # 如果已存在的视频没有分镜头,重新生成分镜头 + if not existing_segments: + logger.info(f"Existing video has no segments, regenerating segments for video {existing['id']}") + + # 获取存储的视频文件路径 + stored_video_path = existing['file_path'] + + # 检测场景变化 + scene_changes = self._detect_scene_changes(stored_video_path) + logger.info(f"Detected {len(scene_changes)} scene changes for existing video") + + # 为分镜片段处理标签:去掉"原始",添加"分镜" + segment_tags = [tag for tag in tags if tag != "原始"] if tags else [] + if "分镜" not in segment_tags: + segment_tags.append("分镜") + + # 分割视频成片段 + segments = self._split_video_by_scenes(stored_video_path, scene_changes, existing['id'], segment_tags) + + # 保存新生成的片段 + self.video_segments.extend(segments) + self._save_video_segments() + + # 更新原始视频的segment_count + for i, video in enumerate(self.original_videos): + if video.id == existing['id']: + video.segment_count = len(segments) + video.updated_at = datetime.now().isoformat() + self.original_videos[i] = video + break + + self._save_original_videos() + + logger.info(f"Regenerated {len(segments)} segments for existing video {existing['id']}") + + return { + 'original_video': existing, + 'segments': [asdict(segment) if hasattr(segment, '__dict__') else segment for segment in segments], + 'is_duplicate': True, + 'segments_regenerated': True + } + else: + return { + 'original_video': existing, + 'segments': existing_segments, + 'is_duplicate': True, + 'segments_regenerated': False + } # 生成新的视频ID和文件名 video_id = str(uuid.uuid4()) diff --git a/scripts/test_segment_regeneration.py b/scripts/test_segment_regeneration.py new file mode 100644 index 0000000..fba8893 --- /dev/null +++ b/scripts/test_segment_regeneration.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +""" +测试分镜头重新生成功能 +""" + +import os +import sys +import json +from pathlib import Path + +# 添加项目根目录到Python路径 +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +def test_segment_regeneration(): + """测试分镜头重新生成功能""" + print("🎬 测试分镜头重新生成功能") + print("=" * 50) + + # 查找assets文件夹中的测试视频 + assets_dir = project_root / "assets" + video_files = list(assets_dir.rglob("*.mp4")) + + if not video_files: + print(f"❌ 没有找到测试视频文件") + return False + + # 使用第一个视频文件进行测试 + test_video_path = str(video_files[0]) + + print(f"📹 使用测试视频: {test_video_path}") + print(f" 文件大小: {os.path.getsize(test_video_path) / (1024*1024):.1f} MB") + + print(f"📹 测试视频: {test_video_path}") + + try: + # 导入MediaManager + from python_core.services.media_manager import get_media_manager + + media_manager = get_media_manager() + + # 首先上传视频 + print("\n📤 首次上传视频...") + + result1 = media_manager.upload_video_file( + source_path=test_video_path, + filename=os.path.basename(test_video_path), + tags=["test001", "M001", "首次上传"] + ) + + video_id = result1['original_video']['id'] + print(f"📊 首次上传结果:") + print(f" 视频ID: {video_id}") + print(f" 是否重复: {result1.get('is_duplicate', False)}") + print(f" 分镜头数量: {len(result1.get('segments', []))}") + + # 如果首次上传就有分镜头,先清空分镜头来模拟没有分镜头的情况 + if result1.get('segments'): + print("\n🧹 清空现有分镜头以模拟没有分镜头的情况...") + # 清空分镜头数据 + media_manager.video_segments = [s for s in media_manager.video_segments + if s.original_video_id != video_id] + media_manager._save_video_segments() + + # 检查当前的分镜头状态 + existing_segments = media_manager.get_segments_by_video_id(video_id) + print(f"📊 当前分镜头数量: {len(existing_segments)}") + + # 测试重新上传同一个视频(模拟重新生成分镜头) + print("\n🔄 重新上传视频以触发分镜头重新生成...") + + result = media_manager.upload_video_file( + source_path=test_video_path, + filename=os.path.basename(test_video_path), + tags=["test001", "M001", "重新生成测试"] + ) + + print(f"📊 上传结果:") + print(f" 是否重复: {result.get('is_duplicate', False)}") + print(f" 分镜头重新生成: {result.get('segments_regenerated', False)}") + print(f" 分镜头数量: {len(result.get('segments', []))}") + + if result.get('segments'): + print("\n✅ 分镜头生成成功:") + for i, segment in enumerate(result['segments']): + print(f" 片段 {i+1}: {segment.get('filename', 'unknown')}") + print(f" 时长: {segment.get('duration', 0):.2f}秒") + print(f" 时间范围: {segment.get('start_time', 0):.2f}s - {segment.get('end_time', 0):.2f}s") + + # 检查文件是否存在 + file_path = segment.get('file_path') + if file_path and os.path.exists(file_path): + file_size = os.path.getsize(file_path) / (1024 * 1024) + print(f" 文件大小: {file_size:.1f} MB") + print(f" ✅ 文件存在") + else: + print(f" ❌ 文件不存在: {file_path}") + else: + print("❌ 分镜头生成失败") + return False + + return True + + except Exception as e: + print(f"❌ 测试过程中出错: {e}") + import traceback + traceback.print_exc() + return False + +def test_scene_detection_directly(): + """直接测试场景检测功能""" + print("\n" + "=" * 50) + print("🎯 直接测试场景检测功能") + print("=" * 50) + + test_video_path = "D:\\mxivideo\\.mixvideo\\temp\\video_storage\\796407cd-1c8a-4bc1-8b1c-1b3120676143.mp4" + + if not os.path.exists(test_video_path): + print(f"❌ 测试视频文件不存在: {test_video_path}") + return False + + try: + from python_core.services.media_manager import get_media_manager + + media_manager = get_media_manager() + + # 测试视频信息提取 + print("📊 提取视频信息...") + video_info = media_manager._get_video_info(test_video_path) + print(f" 时长: {video_info.get('duration', 0):.2f}秒") + print(f" 分辨率: {video_info.get('width', 0)}x{video_info.get('height', 0)}") + print(f" 帧率: {video_info.get('fps', 0):.2f} FPS") + + # 测试场景检测 + print("\n🎯 检测场景变化...") + scene_changes = media_manager._detect_scene_changes(test_video_path, threshold=30.0) + print(f" 检测到 {len(scene_changes)} 个场景变化点") + print(f" 场景时间点: {[f'{t:.2f}s' for t in scene_changes]}") + + # 测试依赖可用性 + print("\n🔍 检查依赖...") + opencv_available = media_manager.dependency_manager.is_available('opencv') + scenedetect_available = media_manager.dependency_manager.is_available('scenedetect') + + print(f" OpenCV: {'✅' if opencv_available else '❌'}") + print(f" PySceneDetect: {'✅' if scenedetect_available else '❌'}") + + return True + + except Exception as e: + print(f"❌ 场景检测测试失败: {e}") + import traceback + traceback.print_exc() + return False + +def main(): + """主函数""" + print("🚀 分镜头重新生成测试") + + # 检查环境 + print(f"Python版本: {sys.version}") + print(f"项目目录: {project_root}") + + try: + # 直接测试场景检测 + success1 = test_scene_detection_directly() + + # 测试分镜头重新生成 + success2 = test_segment_regeneration() + + if success1 and success2: + print("\n🎉 所有测试通过!") + return 0 + else: + print("\n⚠️ 部分测试失败") + return 1 + + except Exception as e: + print(f"\n❌ 测试过程中出错: {e}") + import traceback + traceback.print_exc() + return 1 + +if __name__ == "__main__": + exit_code = main() + sys.exit(exit_code)