mxivideo/scripts/test_video_splitter.py

244 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
测试PySceneDetect视频拆分服务
"""
import os
import sys
import tempfile
import shutil
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
def test_video_splitter_service():
"""测试视频拆分服务"""
print("🎬 测试PySceneDetect视频拆分服务")
print("=" * 60)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("❌ 没有找到测试视频文件")
return False
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
print(f" 文件大小: {os.path.getsize(test_video) / (1024*1024):.1f} MB")
# 创建临时输出目录
temp_dir = tempfile.mkdtemp(prefix="video_splitter_test_")
print(f"📁 临时输出目录: {temp_dir}")
try:
from python_core.services.video_splitter import VideoSplitterService, SCENEDETECT_AVAILABLE
if not SCENEDETECT_AVAILABLE:
print("❌ PySceneDetect不可用跳过测试")
return False
# 创建视频拆分服务
splitter = VideoSplitterService(output_base_dir=temp_dir)
print("✅ 视频拆分服务创建成功")
# 1. 测试视频分析
print(f"\n🔍 步骤1: 分析视频...")
analysis_result = splitter.analyze_video(test_video, threshold=30.0)
if analysis_result["success"]:
print(f"✅ 视频分析成功:")
print(f" 总场景数: {analysis_result['total_scenes']}")
print(f" 总时长: {analysis_result['total_duration']:.2f}")
print(f" 平均场景时长: {analysis_result['average_scene_duration']:.2f}")
# 显示场景详情
scenes = analysis_result["scenes"]
for i, scene in enumerate(scenes[:3]): # 只显示前3个场景
print(f" 场景 {scene['scene_number']}: {scene['start_time']:.2f}s - {scene['end_time']:.2f}s ({scene['duration']:.2f}s)")
if len(scenes) > 3:
print(f" ... 还有 {len(scenes) - 3} 个场景")
else:
print(f"❌ 视频分析失败: {analysis_result.get('error', 'Unknown error')}")
return False
# 2. 测试场景检测
print(f"\n🎯 步骤2: 检测场景...")
scenes = splitter.detect_scenes(test_video, threshold=30.0, detector_type="content")
print(f"✅ 场景检测成功:")
print(f" 检测到 {len(scenes)} 个场景")
for scene in scenes:
print(f" 场景 {scene.scene_number}: {scene.start_time:.2f}s - {scene.end_time:.2f}s ({scene.duration:.2f}s)")
# 3. 测试视频拆分
print(f"\n✂️ 步骤3: 拆分视频...")
split_result = splitter.split_video(
video_path=test_video,
scenes=scenes, # 使用已检测的场景
threshold=30.0,
detector_type="content"
)
if split_result.success:
print(f"✅ 视频拆分成功:")
print(f" 输出目录: {split_result.output_directory}")
print(f" 创建文件数: {len(split_result.output_files)}")
print(f" 总场景数: {split_result.total_scenes}")
print(f" 总时长: {split_result.total_duration:.2f}")
print(f" 处理时间: {split_result.processing_time:.2f}")
# 验证输出文件
print(f"\n📁 输出文件验证:")
total_size = 0
for i, output_file in enumerate(split_result.output_files):
if os.path.exists(output_file):
file_size = os.path.getsize(output_file) / (1024 * 1024)
total_size += file_size
print(f"{os.path.basename(output_file)}: {file_size:.1f} MB")
else:
print(f"{os.path.basename(output_file)}: 文件不存在")
print(f" 📊 总输出大小: {total_size:.1f} MB")
# 检查场景信息文件
scenes_info_file = Path(split_result.output_directory) / "scenes_info.json"
if scenes_info_file.exists():
print(f" ✅ 场景信息文件: {scenes_info_file}")
# 读取并显示场景信息
import json
with open(scenes_info_file, 'r', encoding='utf-8') as f:
scenes_data = json.load(f)
print(f" 📊 场景信息摘要:")
print(f" 检测设置: {scenes_data['detection_settings']}")
print(f" 创建时间: {scenes_data['created_at']}")
else:
print(f" ⚠️ 场景信息文件不存在")
return True
else:
print(f"❌ 视频拆分失败: {split_result.message}")
return False
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
return False
finally:
# 清理临时目录
print(f"\n🧹 清理临时目录: {temp_dir}")
shutil.rmtree(temp_dir, ignore_errors=True)
def test_command_line_interface():
"""测试命令行接口"""
print("\n" + "=" * 60)
print("🖥️ 测试命令行接口")
print("=" * 60)
# 查找测试视频
assets_dir = project_root / "assets"
video_files = list(assets_dir.rglob("*.mp4"))
if not video_files:
print("❌ 没有找到测试视频文件")
return False
test_video = str(video_files[0])
print(f"📹 测试视频: {test_video}")
try:
import subprocess
# 测试分析命令
print(f"\n🔍 测试分析命令...")
cmd = [
sys.executable,
str(project_root / "python_core" / "services" / "video_splitter.py"),
"analyze",
test_video,
"--threshold", "30.0"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
print(f"✅ 分析命令执行成功")
# 解析JSON输出
import json
try:
analysis_data = json.loads(result.stdout)
print(f" 总场景数: {analysis_data.get('total_scenes', 0)}")
print(f" 总时长: {analysis_data.get('total_duration', 0):.2f}")
except json.JSONDecodeError:
print(f" 输出: {result.stdout[:200]}...")
else:
print(f"❌ 分析命令执行失败")
print(f" 错误: {result.stderr}")
return False
return True
except Exception as e:
print(f"❌ 命令行测试失败: {e}")
return False
def main():
"""主函数"""
print("🚀 PySceneDetect视频拆分服务测试")
try:
# 检查PySceneDetect可用性
try:
import scenedetect
print(f"✅ PySceneDetect {scenedetect.__version__} 可用")
except ImportError:
print("❌ PySceneDetect不可用请安装: pip install scenedetect[opencv]")
return 1
# 测试服务功能
success1 = test_video_splitter_service()
# 测试命令行接口
success2 = test_command_line_interface()
print("\n" + "=" * 60)
print("📊 测试总结")
print("=" * 60)
if success1 and success2:
print("🎉 所有测试通过!")
print("\n✅ 功能验证:")
print(" 1. 视频场景分析 - 正常工作")
print(" 2. 场景检测 - 正常工作")
print(" 3. 视频拆分 - 正常工作")
print(" 4. 文件输出 - 正常工作")
print(" 5. 命令行接口 - 正常工作")
print("\n🚀 使用方法:")
print(" # 分析视频")
print(" python python_core/services/video_splitter.py analyze video.mp4")
print(" # 拆分视频")
print(" python python_core/services/video_splitter.py split video.mp4 --threshold 30")
return 0
else:
print("⚠️ 部分测试失败")
return 1
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)