#!/usr/bin/env python3 """ 测试PySceneDetect视频拆分服务 """ import os import sys import tempfile import shutil from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) def test_video_splitter_service(): """测试视频拆分服务""" print("🎬 测试PySceneDetect视频拆分服务") print("=" * 60) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("❌ 没有找到测试视频文件") return False test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") print(f" 文件大小: {os.path.getsize(test_video) / (1024*1024):.1f} MB") # 创建临时输出目录 temp_dir = tempfile.mkdtemp(prefix="video_splitter_test_") print(f"📁 临时输出目录: {temp_dir}") try: from python_core.services.video_splitter import VideoSplitterService, SCENEDETECT_AVAILABLE if not SCENEDETECT_AVAILABLE: print("❌ PySceneDetect不可用,跳过测试") return False # 创建视频拆分服务 splitter = VideoSplitterService(output_base_dir=temp_dir) print("✅ 视频拆分服务创建成功") # 1. 测试视频分析 print(f"\n🔍 步骤1: 分析视频...") analysis_result = splitter.analyze_video(test_video, threshold=30.0) if analysis_result["success"]: print(f"✅ 视频分析成功:") print(f" 总场景数: {analysis_result['total_scenes']}") print(f" 总时长: {analysis_result['total_duration']:.2f}秒") print(f" 平均场景时长: {analysis_result['average_scene_duration']:.2f}秒") # 显示场景详情 scenes = analysis_result["scenes"] for i, scene in enumerate(scenes[:3]): # 只显示前3个场景 print(f" 场景 {scene['scene_number']}: {scene['start_time']:.2f}s - {scene['end_time']:.2f}s ({scene['duration']:.2f}s)") if len(scenes) > 3: print(f" ... 还有 {len(scenes) - 3} 个场景") else: print(f"❌ 视频分析失败: {analysis_result.get('error', 'Unknown error')}") return False # 2. 测试场景检测 print(f"\n🎯 步骤2: 检测场景...") scenes = splitter.detect_scenes(test_video, threshold=30.0, detector_type="content") print(f"✅ 场景检测成功:") print(f" 检测到 {len(scenes)} 个场景") for scene in scenes: print(f" 场景 {scene.scene_number}: {scene.start_time:.2f}s - {scene.end_time:.2f}s ({scene.duration:.2f}s)") # 3. 测试视频拆分 print(f"\n✂️ 步骤3: 拆分视频...") split_result = splitter.split_video( video_path=test_video, scenes=scenes, # 使用已检测的场景 threshold=30.0, detector_type="content" ) if split_result.success: print(f"✅ 视频拆分成功:") print(f" 输出目录: {split_result.output_directory}") print(f" 创建文件数: {len(split_result.output_files)}") print(f" 总场景数: {split_result.total_scenes}") print(f" 总时长: {split_result.total_duration:.2f}秒") print(f" 处理时间: {split_result.processing_time:.2f}秒") # 验证输出文件 print(f"\n📁 输出文件验证:") total_size = 0 for i, output_file in enumerate(split_result.output_files): if os.path.exists(output_file): file_size = os.path.getsize(output_file) / (1024 * 1024) total_size += file_size print(f" ✅ {os.path.basename(output_file)}: {file_size:.1f} MB") else: print(f" ❌ {os.path.basename(output_file)}: 文件不存在") print(f" 📊 总输出大小: {total_size:.1f} MB") # 检查场景信息文件 scenes_info_file = Path(split_result.output_directory) / "scenes_info.json" if scenes_info_file.exists(): print(f" ✅ 场景信息文件: {scenes_info_file}") # 读取并显示场景信息 import json with open(scenes_info_file, 'r', encoding='utf-8') as f: scenes_data = json.load(f) print(f" 📊 场景信息摘要:") print(f" 检测设置: {scenes_data['detection_settings']}") print(f" 创建时间: {scenes_data['created_at']}") else: print(f" ⚠️ 场景信息文件不存在") return True else: print(f"❌ 视频拆分失败: {split_result.message}") return False except Exception as e: print(f"❌ 测试过程中出错: {e}") import traceback traceback.print_exc() return False finally: # 清理临时目录 print(f"\n🧹 清理临时目录: {temp_dir}") shutil.rmtree(temp_dir, ignore_errors=True) def test_command_line_interface(): """测试命令行接口""" print("\n" + "=" * 60) print("🖥️ 测试命令行接口") print("=" * 60) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("❌ 没有找到测试视频文件") return False test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") try: import subprocess # 测试分析命令 print(f"\n🔍 测试分析命令...") cmd = [ sys.executable, str(project_root / "python_core" / "services" / "video_splitter.py"), "analyze", test_video, "--threshold", "30.0" ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) if result.returncode == 0: print(f"✅ 分析命令执行成功") # 解析JSON输出 import json try: analysis_data = json.loads(result.stdout) print(f" 总场景数: {analysis_data.get('total_scenes', 0)}") print(f" 总时长: {analysis_data.get('total_duration', 0):.2f}秒") except json.JSONDecodeError: print(f" 输出: {result.stdout[:200]}...") else: print(f"❌ 分析命令执行失败") print(f" 错误: {result.stderr}") return False return True except Exception as e: print(f"❌ 命令行测试失败: {e}") return False def main(): """主函数""" print("🚀 PySceneDetect视频拆分服务测试") try: # 检查PySceneDetect可用性 try: import scenedetect print(f"✅ PySceneDetect {scenedetect.__version__} 可用") except ImportError: print("❌ PySceneDetect不可用,请安装: pip install scenedetect[opencv]") return 1 # 测试服务功能 success1 = test_video_splitter_service() # 测试命令行接口 success2 = test_command_line_interface() print("\n" + "=" * 60) print("📊 测试总结") print("=" * 60) if success1 and success2: print("🎉 所有测试通过!") print("\n✅ 功能验证:") print(" 1. 视频场景分析 - 正常工作") print(" 2. 场景检测 - 正常工作") print(" 3. 视频拆分 - 正常工作") print(" 4. 文件输出 - 正常工作") print(" 5. 命令行接口 - 正常工作") print("\n🚀 使用方法:") print(" # 分析视频") print(" python python_core/services/video_splitter.py analyze video.mp4") print(" # 拆分视频") print(" python python_core/services/video_splitter.py split video.mp4 --threshold 30") return 0 else: print("⚠️ 部分测试失败") return 1 except Exception as e: print(f"❌ 测试过程中出错: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)