244 lines
8.7 KiB
Python
244 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
测试PySceneDetect视频拆分服务
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import tempfile
|
||
import shutil
|
||
from pathlib import Path
|
||
|
||
# 添加项目根目录到Python路径
|
||
project_root = Path(__file__).parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
def test_video_splitter_service():
|
||
"""测试视频拆分服务"""
|
||
print("🎬 测试PySceneDetect视频拆分服务")
|
||
print("=" * 60)
|
||
|
||
# 查找测试视频
|
||
assets_dir = project_root / "assets"
|
||
video_files = list(assets_dir.rglob("*.mp4"))
|
||
|
||
if not video_files:
|
||
print("❌ 没有找到测试视频文件")
|
||
return False
|
||
|
||
test_video = str(video_files[0])
|
||
print(f"📹 测试视频: {test_video}")
|
||
print(f" 文件大小: {os.path.getsize(test_video) / (1024*1024):.1f} MB")
|
||
|
||
# 创建临时输出目录
|
||
temp_dir = tempfile.mkdtemp(prefix="video_splitter_test_")
|
||
print(f"📁 临时输出目录: {temp_dir}")
|
||
|
||
try:
|
||
from python_core.services.video_splitter import VideoSplitterService, SCENEDETECT_AVAILABLE
|
||
|
||
if not SCENEDETECT_AVAILABLE:
|
||
print("❌ PySceneDetect不可用,跳过测试")
|
||
return False
|
||
|
||
# 创建视频拆分服务
|
||
splitter = VideoSplitterService(output_base_dir=temp_dir)
|
||
print("✅ 视频拆分服务创建成功")
|
||
|
||
# 1. 测试视频分析
|
||
print(f"\n🔍 步骤1: 分析视频...")
|
||
analysis_result = splitter.analyze_video(test_video, threshold=30.0)
|
||
|
||
if analysis_result["success"]:
|
||
print(f"✅ 视频分析成功:")
|
||
print(f" 总场景数: {analysis_result['total_scenes']}")
|
||
print(f" 总时长: {analysis_result['total_duration']:.2f}秒")
|
||
print(f" 平均场景时长: {analysis_result['average_scene_duration']:.2f}秒")
|
||
|
||
# 显示场景详情
|
||
scenes = analysis_result["scenes"]
|
||
for i, scene in enumerate(scenes[:3]): # 只显示前3个场景
|
||
print(f" 场景 {scene['scene_number']}: {scene['start_time']:.2f}s - {scene['end_time']:.2f}s ({scene['duration']:.2f}s)")
|
||
if len(scenes) > 3:
|
||
print(f" ... 还有 {len(scenes) - 3} 个场景")
|
||
else:
|
||
print(f"❌ 视频分析失败: {analysis_result.get('error', 'Unknown error')}")
|
||
return False
|
||
|
||
# 2. 测试场景检测
|
||
print(f"\n🎯 步骤2: 检测场景...")
|
||
scenes = splitter.detect_scenes(test_video, threshold=30.0, detector_type="content")
|
||
|
||
print(f"✅ 场景检测成功:")
|
||
print(f" 检测到 {len(scenes)} 个场景")
|
||
for scene in scenes:
|
||
print(f" 场景 {scene.scene_number}: {scene.start_time:.2f}s - {scene.end_time:.2f}s ({scene.duration:.2f}s)")
|
||
|
||
# 3. 测试视频拆分
|
||
print(f"\n✂️ 步骤3: 拆分视频...")
|
||
split_result = splitter.split_video(
|
||
video_path=test_video,
|
||
scenes=scenes, # 使用已检测的场景
|
||
threshold=30.0,
|
||
detector_type="content"
|
||
)
|
||
|
||
if split_result.success:
|
||
print(f"✅ 视频拆分成功:")
|
||
print(f" 输出目录: {split_result.output_directory}")
|
||
print(f" 创建文件数: {len(split_result.output_files)}")
|
||
print(f" 总场景数: {split_result.total_scenes}")
|
||
print(f" 总时长: {split_result.total_duration:.2f}秒")
|
||
print(f" 处理时间: {split_result.processing_time:.2f}秒")
|
||
|
||
# 验证输出文件
|
||
print(f"\n📁 输出文件验证:")
|
||
total_size = 0
|
||
for i, output_file in enumerate(split_result.output_files):
|
||
if os.path.exists(output_file):
|
||
file_size = os.path.getsize(output_file) / (1024 * 1024)
|
||
total_size += file_size
|
||
print(f" ✅ {os.path.basename(output_file)}: {file_size:.1f} MB")
|
||
else:
|
||
print(f" ❌ {os.path.basename(output_file)}: 文件不存在")
|
||
|
||
print(f" 📊 总输出大小: {total_size:.1f} MB")
|
||
|
||
# 检查场景信息文件
|
||
scenes_info_file = Path(split_result.output_directory) / "scenes_info.json"
|
||
if scenes_info_file.exists():
|
||
print(f" ✅ 场景信息文件: {scenes_info_file}")
|
||
|
||
# 读取并显示场景信息
|
||
import json
|
||
with open(scenes_info_file, 'r', encoding='utf-8') as f:
|
||
scenes_data = json.load(f)
|
||
|
||
print(f" 📊 场景信息摘要:")
|
||
print(f" 检测设置: {scenes_data['detection_settings']}")
|
||
print(f" 创建时间: {scenes_data['created_at']}")
|
||
else:
|
||
print(f" ⚠️ 场景信息文件不存在")
|
||
|
||
return True
|
||
else:
|
||
print(f"❌ 视频拆分失败: {split_result.message}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ 测试过程中出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
finally:
|
||
# 清理临时目录
|
||
print(f"\n🧹 清理临时目录: {temp_dir}")
|
||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||
|
||
def test_command_line_interface():
|
||
"""测试命令行接口"""
|
||
print("\n" + "=" * 60)
|
||
print("🖥️ 测试命令行接口")
|
||
print("=" * 60)
|
||
|
||
# 查找测试视频
|
||
assets_dir = project_root / "assets"
|
||
video_files = list(assets_dir.rglob("*.mp4"))
|
||
|
||
if not video_files:
|
||
print("❌ 没有找到测试视频文件")
|
||
return False
|
||
|
||
test_video = str(video_files[0])
|
||
print(f"📹 测试视频: {test_video}")
|
||
|
||
try:
|
||
import subprocess
|
||
|
||
# 测试分析命令
|
||
print(f"\n🔍 测试分析命令...")
|
||
cmd = [
|
||
sys.executable,
|
||
str(project_root / "python_core" / "services" / "video_splitter.py"),
|
||
"analyze",
|
||
test_video,
|
||
"--threshold", "30.0"
|
||
]
|
||
|
||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
|
||
|
||
if result.returncode == 0:
|
||
print(f"✅ 分析命令执行成功")
|
||
|
||
# 解析JSON输出
|
||
import json
|
||
try:
|
||
analysis_data = json.loads(result.stdout)
|
||
print(f" 总场景数: {analysis_data.get('total_scenes', 0)}")
|
||
print(f" 总时长: {analysis_data.get('total_duration', 0):.2f}秒")
|
||
except json.JSONDecodeError:
|
||
print(f" 输出: {result.stdout[:200]}...")
|
||
else:
|
||
print(f"❌ 分析命令执行失败")
|
||
print(f" 错误: {result.stderr}")
|
||
return False
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 命令行测试失败: {e}")
|
||
return False
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("🚀 PySceneDetect视频拆分服务测试")
|
||
|
||
try:
|
||
# 检查PySceneDetect可用性
|
||
try:
|
||
import scenedetect
|
||
print(f"✅ PySceneDetect {scenedetect.__version__} 可用")
|
||
except ImportError:
|
||
print("❌ PySceneDetect不可用,请安装: pip install scenedetect[opencv]")
|
||
return 1
|
||
|
||
# 测试服务功能
|
||
success1 = test_video_splitter_service()
|
||
|
||
# 测试命令行接口
|
||
success2 = test_command_line_interface()
|
||
|
||
print("\n" + "=" * 60)
|
||
print("📊 测试总结")
|
||
print("=" * 60)
|
||
|
||
if success1 and success2:
|
||
print("🎉 所有测试通过!")
|
||
print("\n✅ 功能验证:")
|
||
print(" 1. 视频场景分析 - 正常工作")
|
||
print(" 2. 场景检测 - 正常工作")
|
||
print(" 3. 视频拆分 - 正常工作")
|
||
print(" 4. 文件输出 - 正常工作")
|
||
print(" 5. 命令行接口 - 正常工作")
|
||
|
||
print("\n🚀 使用方法:")
|
||
print(" # 分析视频")
|
||
print(" python python_core/services/video_splitter.py analyze video.mp4")
|
||
print(" # 拆分视频")
|
||
print(" python python_core/services/video_splitter.py split video.mp4 --threshold 30")
|
||
|
||
return 0
|
||
else:
|
||
print("⚠️ 部分测试失败")
|
||
return 1
|
||
|
||
except Exception as e:
|
||
print(f"❌ 测试过程中出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return 1
|
||
|
||
if __name__ == "__main__":
|
||
exit_code = main()
|
||
sys.exit(exit_code)
|