fix: 修复分镜问题
This commit is contained in:
parent
4791a50955
commit
21ba229562
|
|
@ -989,11 +989,56 @@ class MediaManager:
|
|||
existing = self.get_video_by_md5(md5_hash)
|
||||
if existing:
|
||||
logger.info(f"Video with MD5 {md5_hash} already exists")
|
||||
return {
|
||||
'original_video': existing,
|
||||
'segments': self.get_segments_by_video_id(existing['id']),
|
||||
'is_duplicate': True
|
||||
}
|
||||
existing_segments = self.get_segments_by_video_id(existing['id'])
|
||||
|
||||
# 如果已存在的视频没有分镜头,重新生成分镜头
|
||||
if not existing_segments:
|
||||
logger.info(f"Existing video has no segments, regenerating segments for video {existing['id']}")
|
||||
|
||||
# 获取存储的视频文件路径
|
||||
stored_video_path = existing['file_path']
|
||||
|
||||
# 检测场景变化
|
||||
scene_changes = self._detect_scene_changes(stored_video_path)
|
||||
logger.info(f"Detected {len(scene_changes)} scene changes for existing video")
|
||||
|
||||
# 为分镜片段处理标签:去掉"原始",添加"分镜"
|
||||
segment_tags = [tag for tag in tags if tag != "原始"] if tags else []
|
||||
if "分镜" not in segment_tags:
|
||||
segment_tags.append("分镜")
|
||||
|
||||
# 分割视频成片段
|
||||
segments = self._split_video_by_scenes(stored_video_path, scene_changes, existing['id'], segment_tags)
|
||||
|
||||
# 保存新生成的片段
|
||||
self.video_segments.extend(segments)
|
||||
self._save_video_segments()
|
||||
|
||||
# 更新原始视频的segment_count
|
||||
for i, video in enumerate(self.original_videos):
|
||||
if video.id == existing['id']:
|
||||
video.segment_count = len(segments)
|
||||
video.updated_at = datetime.now().isoformat()
|
||||
self.original_videos[i] = video
|
||||
break
|
||||
|
||||
self._save_original_videos()
|
||||
|
||||
logger.info(f"Regenerated {len(segments)} segments for existing video {existing['id']}")
|
||||
|
||||
return {
|
||||
'original_video': existing,
|
||||
'segments': [asdict(segment) if hasattr(segment, '__dict__') else segment for segment in segments],
|
||||
'is_duplicate': True,
|
||||
'segments_regenerated': True
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'original_video': existing,
|
||||
'segments': existing_segments,
|
||||
'is_duplicate': True,
|
||||
'segments_regenerated': False
|
||||
}
|
||||
|
||||
# 生成新的视频ID和文件名
|
||||
video_id = str(uuid.uuid4())
|
||||
|
|
|
|||
|
|
@ -0,0 +1,186 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
测试分镜头重新生成功能
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
# 添加项目根目录到Python路径
|
||||
project_root = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
def test_segment_regeneration():
|
||||
"""测试分镜头重新生成功能"""
|
||||
print("🎬 测试分镜头重新生成功能")
|
||||
print("=" * 50)
|
||||
|
||||
# 查找assets文件夹中的测试视频
|
||||
assets_dir = project_root / "assets"
|
||||
video_files = list(assets_dir.rglob("*.mp4"))
|
||||
|
||||
if not video_files:
|
||||
print(f"❌ 没有找到测试视频文件")
|
||||
return False
|
||||
|
||||
# 使用第一个视频文件进行测试
|
||||
test_video_path = str(video_files[0])
|
||||
|
||||
print(f"📹 使用测试视频: {test_video_path}")
|
||||
print(f" 文件大小: {os.path.getsize(test_video_path) / (1024*1024):.1f} MB")
|
||||
|
||||
print(f"📹 测试视频: {test_video_path}")
|
||||
|
||||
try:
|
||||
# 导入MediaManager
|
||||
from python_core.services.media_manager import get_media_manager
|
||||
|
||||
media_manager = get_media_manager()
|
||||
|
||||
# 首先上传视频
|
||||
print("\n📤 首次上传视频...")
|
||||
|
||||
result1 = media_manager.upload_video_file(
|
||||
source_path=test_video_path,
|
||||
filename=os.path.basename(test_video_path),
|
||||
tags=["test001", "M001", "首次上传"]
|
||||
)
|
||||
|
||||
video_id = result1['original_video']['id']
|
||||
print(f"📊 首次上传结果:")
|
||||
print(f" 视频ID: {video_id}")
|
||||
print(f" 是否重复: {result1.get('is_duplicate', False)}")
|
||||
print(f" 分镜头数量: {len(result1.get('segments', []))}")
|
||||
|
||||
# 如果首次上传就有分镜头,先清空分镜头来模拟没有分镜头的情况
|
||||
if result1.get('segments'):
|
||||
print("\n🧹 清空现有分镜头以模拟没有分镜头的情况...")
|
||||
# 清空分镜头数据
|
||||
media_manager.video_segments = [s for s in media_manager.video_segments
|
||||
if s.original_video_id != video_id]
|
||||
media_manager._save_video_segments()
|
||||
|
||||
# 检查当前的分镜头状态
|
||||
existing_segments = media_manager.get_segments_by_video_id(video_id)
|
||||
print(f"📊 当前分镜头数量: {len(existing_segments)}")
|
||||
|
||||
# 测试重新上传同一个视频(模拟重新生成分镜头)
|
||||
print("\n🔄 重新上传视频以触发分镜头重新生成...")
|
||||
|
||||
result = media_manager.upload_video_file(
|
||||
source_path=test_video_path,
|
||||
filename=os.path.basename(test_video_path),
|
||||
tags=["test001", "M001", "重新生成测试"]
|
||||
)
|
||||
|
||||
print(f"📊 上传结果:")
|
||||
print(f" 是否重复: {result.get('is_duplicate', False)}")
|
||||
print(f" 分镜头重新生成: {result.get('segments_regenerated', False)}")
|
||||
print(f" 分镜头数量: {len(result.get('segments', []))}")
|
||||
|
||||
if result.get('segments'):
|
||||
print("\n✅ 分镜头生成成功:")
|
||||
for i, segment in enumerate(result['segments']):
|
||||
print(f" 片段 {i+1}: {segment.get('filename', 'unknown')}")
|
||||
print(f" 时长: {segment.get('duration', 0):.2f}秒")
|
||||
print(f" 时间范围: {segment.get('start_time', 0):.2f}s - {segment.get('end_time', 0):.2f}s")
|
||||
|
||||
# 检查文件是否存在
|
||||
file_path = segment.get('file_path')
|
||||
if file_path and os.path.exists(file_path):
|
||||
file_size = os.path.getsize(file_path) / (1024 * 1024)
|
||||
print(f" 文件大小: {file_size:.1f} MB")
|
||||
print(f" ✅ 文件存在")
|
||||
else:
|
||||
print(f" ❌ 文件不存在: {file_path}")
|
||||
else:
|
||||
print("❌ 分镜头生成失败")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 测试过程中出错: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
def test_scene_detection_directly():
|
||||
"""直接测试场景检测功能"""
|
||||
print("\n" + "=" * 50)
|
||||
print("🎯 直接测试场景检测功能")
|
||||
print("=" * 50)
|
||||
|
||||
test_video_path = "D:\\mxivideo\\.mixvideo\\temp\\video_storage\\796407cd-1c8a-4bc1-8b1c-1b3120676143.mp4"
|
||||
|
||||
if not os.path.exists(test_video_path):
|
||||
print(f"❌ 测试视频文件不存在: {test_video_path}")
|
||||
return False
|
||||
|
||||
try:
|
||||
from python_core.services.media_manager import get_media_manager
|
||||
|
||||
media_manager = get_media_manager()
|
||||
|
||||
# 测试视频信息提取
|
||||
print("📊 提取视频信息...")
|
||||
video_info = media_manager._get_video_info(test_video_path)
|
||||
print(f" 时长: {video_info.get('duration', 0):.2f}秒")
|
||||
print(f" 分辨率: {video_info.get('width', 0)}x{video_info.get('height', 0)}")
|
||||
print(f" 帧率: {video_info.get('fps', 0):.2f} FPS")
|
||||
|
||||
# 测试场景检测
|
||||
print("\n🎯 检测场景变化...")
|
||||
scene_changes = media_manager._detect_scene_changes(test_video_path, threshold=30.0)
|
||||
print(f" 检测到 {len(scene_changes)} 个场景变化点")
|
||||
print(f" 场景时间点: {[f'{t:.2f}s' for t in scene_changes]}")
|
||||
|
||||
# 测试依赖可用性
|
||||
print("\n🔍 检查依赖...")
|
||||
opencv_available = media_manager.dependency_manager.is_available('opencv')
|
||||
scenedetect_available = media_manager.dependency_manager.is_available('scenedetect')
|
||||
|
||||
print(f" OpenCV: {'✅' if opencv_available else '❌'}")
|
||||
print(f" PySceneDetect: {'✅' if scenedetect_available else '❌'}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 场景检测测试失败: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
print("🚀 分镜头重新生成测试")
|
||||
|
||||
# 检查环境
|
||||
print(f"Python版本: {sys.version}")
|
||||
print(f"项目目录: {project_root}")
|
||||
|
||||
try:
|
||||
# 直接测试场景检测
|
||||
success1 = test_scene_detection_directly()
|
||||
|
||||
# 测试分镜头重新生成
|
||||
success2 = test_segment_regeneration()
|
||||
|
||||
if success1 and success2:
|
||||
print("\n🎉 所有测试通过!")
|
||||
return 0
|
||||
else:
|
||||
print("\n⚠️ 部分测试失败")
|
||||
return 1
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n❌ 测试过程中出错: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return 1
|
||||
|
||||
if __name__ == "__main__":
|
||||
exit_code = main()
|
||||
sys.exit(exit_code)
|
||||
Loading…
Reference in New Issue