This commit is contained in:
root 2025-07-11 20:09:11 +08:00
parent 8474e5004b
commit b81a2eb54c
2 changed files with 261 additions and 9 deletions

View File

@ -305,7 +305,7 @@ class PySceneDetectSceneDetector(SceneDetector):
video_duration = float(duration_obj)
else:
# 回退方案:从文件路径获取时长
video_duration = self._get_video_duration_fallback(file_path)
video_duration = self._get_video_duration_from_file(file_path)
if video_duration > 0:
scene_changes.append(video_duration)
@ -313,7 +313,7 @@ class PySceneDetectSceneDetector(SceneDetector):
except Exception as e:
logger.warning(f"Failed to get video duration from PySceneDetect: {e}")
# 回退方案:从文件路径获取时长
video_duration = self._get_video_duration_fallback(file_path)
video_duration = self._get_video_duration_from_file(file_path)
if video_duration > 0:
scene_changes.append(video_duration)
logger.info(f"Using fallback video duration: {video_duration:.2f}s")
@ -330,15 +330,28 @@ class PySceneDetectSceneDetector(SceneDetector):
logger.error(f"PySceneDetect failed: {e}")
raise
def _get_video_duration_fallback(self, file_path: str) -> float:
"""获取视频时长的回退方案"""
def _get_video_duration_from_file(self, file_path: str) -> float:
"""从文件获取视频时长"""
try:
# 使用视频信息提取器获取时长
video_info = self.video_info_extractor.extract_video_info(file_path)
return video_info.get('duration', 0.0)
except Exception as e:
logger.warning(f"Fallback duration extraction failed: {e}")
# 使用OpenCV获取时长
if self.dependency_manager.is_available('opencv'):
cv2 = self.dependency_manager.get_module('opencv', 'cv2')
cap = cv2.VideoCapture(file_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
if fps > 0:
duration = frame_count / fps
return duration
# 如果OpenCV不可用返回0
return 0.0
except Exception as e:
logger.warning(f"Failed to get duration from file: {e}")
return 0.0
class OpenCVSceneDetector(SceneDetector):
"""使用OpenCV进行场景检测"""
@ -949,6 +962,16 @@ class MediaManager:
except:
return [0.0]
def _get_video_duration_fallback(self, file_path: str) -> float:
"""获取视频时长的回退方案"""
try:
# 使用视频信息提取器获取时长
video_info = self.video_info_extractor.extract_video_info(file_path)
return video_info.get('duration', 0.0)
except Exception as e:
logger.warning(f"Fallback duration extraction failed: {e}")
return 0.0
def _generate_thumbnail(self, video_path: str, timestamp: float, output_path: str) -> bool:

View File

@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""
测试PySceneDetect duration修复
"""
import os
import sys
import tempfile
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_pyscenedetect_duration_fix():
"""测试PySceneDetect duration获取修复"""
print("🔧 测试PySceneDetect duration获取修复")
print("=" * 50)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("❌ 没有找到测试视频")
return False
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
try:
from python_core.services.media_manager import get_media_manager
media_manager = get_media_manager()
# 检查依赖
scenedetect_available = media_manager.dependency_manager.is_available('scenedetect')
print(f"🔍 PySceneDetect: {'' if scenedetect_available else ''}")
if not scenedetect_available:
print("⚠️ PySceneDetect不可用跳过此测试")
return True
# 测试场景检测这会触发duration获取
print(f"\n🎯 测试场景检测...")
scene_changes = media_manager._detect_scene_changes(test_video, threshold=30.0)
print(f"✅ 场景检测结果:")
print(f" 场景变化点数量: {len(scene_changes)}")
print(f" 时间点: {[f'{t:.2f}s' for t in scene_changes]}")
# 验证结果
if len(scene_changes) >= 2:
print(f"✅ 场景检测正常 - 包含开始和结束时间")
return True
else:
print(f"❌ 场景检测异常 - 只有 {len(scene_changes)} 个时间点")
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_fallback_duration():
"""测试回退方案的时长获取"""
print("\n" + "=" * 50)
print("🔄 测试回退方案的时长获取")
print("=" * 50)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("❌ 没有找到测试视频")
return False
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
try:
from python_core.services.media_manager import get_media_manager
media_manager = get_media_manager()
# 测试回退方案
print(f"\n🔄 测试回退时长获取...")
duration = media_manager._get_video_duration_fallback(test_video)
print(f"✅ 回退时长获取结果:")
print(f" 视频时长: {duration:.2f}")
if duration > 0:
print(f"✅ 回退方案正常工作")
return True
else:
print(f"❌ 回退方案失败")
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_complete_upload_flow():
"""测试完整的上传流程"""
print("\n" + "=" * 50)
print("🎬 测试完整的上传流程")
print("=" * 50)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("❌ 没有找到测试视频")
return False
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
# 创建临时目录
temp_dir = tempfile.mkdtemp(prefix="video_test_")
print(f"📁 临时目录: {temp_dir}")
try:
from unittest.mock import patch
with patch('python_core.config.settings') as mock_settings:
mock_settings.temp_dir = Path(temp_dir)
from python_core.services.media_manager import MediaManager
media_manager = MediaManager()
# 上传视频
print(f"\n📤 上传视频...")
result = media_manager.upload_video_file(
source_path=test_video,
filename=os.path.basename(test_video),
tags=["测试", "duration修复"]
)
print(f"📊 上传结果:")
print(f" 是否重复: {result.get('is_duplicate', False)}")
print(f" 分镜头数量: {len(result.get('segments', []))}")
segments = result.get('segments', [])
if segments:
print(f"\n✅ 分镜头生成成功:")
total_duration = 0
for i, segment in enumerate(segments):
if isinstance(segment, dict):
duration = segment.get('duration', 0)
start_time = segment.get('start_time', 0)
end_time = segment.get('end_time', 0)
filename = segment.get('filename', 'unknown')
else:
duration = segment.duration
start_time = segment.start_time
end_time = segment.end_time
filename = segment.filename
print(f" 片段 {i+1}: {filename}")
print(f" 时长: {duration:.2f}")
print(f" 时间范围: {start_time:.2f}s - {end_time:.2f}s")
total_duration += duration
print(f"\n📊 统计:")
print(f" 片段总时长: {total_duration:.2f}")
return True
else:
print(f"❌ 分镜头生成失败")
return False
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
finally:
# 清理临时目录
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
def main():
"""主函数"""
print("🚀 PySceneDetect Duration修复验证测试")
try:
# 测试PySceneDetect duration修复
success1 = test_pyscenedetect_duration_fix()
# 测试回退方案
success2 = test_fallback_duration()
# 测试完整流程
success3 = test_complete_upload_flow()
print("\n" + "=" * 50)
print("📊 测试总结")
print("=" * 50)
if success1 and success2 and success3:
print("🎉 所有测试通过Duration修复成功")
print("\n✅ 修复要点:")
print(" 1. 处理PySceneDetect返回的不同duration格式")
print(" 2. 添加回退方案获取视频时长")
print(" 3. 确保场景检测始终包含结束时间")
print(" 4. 完整的错误处理和日志记录")
return 0
else:
print("⚠️ 部分测试失败,需要进一步检查")
return 1
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)