#!/usr/bin/env python3 """ 测试PySceneDetect视频拆分服务的JSON-RPC功能 """ import os import sys import json import subprocess from pathlib import Path # 添加项目根目录到Python路径 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) def run_video_splitter_command(command, video_path, **kwargs): """运行视频拆分命令并解析JSON-RPC结果""" # 构建命令 cmd = [ sys.executable, str(project_root / "python_core" / "services" / "video_splitter.py"), command, video_path ] # 添加可选参数 for key, value in kwargs.items(): if value is not None: cmd.extend([f"--{key.replace('_', '-')}", str(value)]) # 设置环境变量 env = os.environ.copy() env['PYTHONPATH'] = str(project_root) print(f"🔧 执行命令: {' '.join(cmd)}") try: # 执行命令 result = subprocess.run( cmd, capture_output=True, text=True, timeout=120, env=env ) if result.returncode == 0: # 解析JSON-RPC输出 stdout = result.stdout.strip() # 检查是否是JSON-RPC格式 if stdout.startswith("JSONRPC:"): json_str = stdout[8:] # 移除"JSONRPC:"前缀 try: json_data = json.loads(json_str) return { "success": True, "data": json_data, "stderr": result.stderr } except json.JSONDecodeError as e: return { "success": False, "error": f"JSON decode error: {e}", "raw_output": stdout, "stderr": result.stderr } else: # 尝试直接解析JSON try: json_data = json.loads(stdout) return { "success": True, "data": json_data, "stderr": result.stderr } except json.JSONDecodeError: return { "success": True, "data": {"raw_output": stdout}, "stderr": result.stderr } else: return { "success": False, "error": f"Command failed with return code {result.returncode}", "stdout": result.stdout, "stderr": result.stderr } except subprocess.TimeoutExpired: return { "success": False, "error": "Command timeout" } except Exception as e: return { "success": False, "error": f"Execution error: {e}" } def test_analyze_command(): """测试分析命令的JSON-RPC输出""" print("🔍 测试视频分析命令 (JSON-RPC)") print("=" * 50) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("❌ 没有找到测试视频文件") return False test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") # 执行分析命令 result = run_video_splitter_command("analyze", test_video, threshold=30.0) if result["success"]: print("✅ 命令执行成功") data = result["data"] if isinstance(data, dict): if "result" in data: # JSON-RPC格式 analysis_result = data["result"] print("📊 JSON-RPC分析结果:") print(f" 成功: {analysis_result.get('success', False)}") if analysis_result.get("success"): print(f" 总场景数: {analysis_result.get('total_scenes', 0)}") print(f" 总时长: {analysis_result.get('total_duration', 0):.2f}秒") print(f" 平均场景时长: {analysis_result.get('average_scene_duration', 0):.2f}秒") else: print(f" 错误: {analysis_result.get('error', 'Unknown error')}") else: # 直接JSON格式 print("📊 直接JSON分析结果:") print(f" 成功: {data.get('success', False)}") if data.get("success"): print(f" 总场景数: {data.get('total_scenes', 0)}") print(f" 总时长: {data.get('total_duration', 0):.2f}秒") print(f" 平均场景时长: {data.get('average_scene_duration', 0):.2f}秒") return True else: print(f"❌ 命令执行失败: {result['error']}") if "stderr" in result: print(f" 错误输出: {result['stderr']}") return False def test_detect_scenes_command(): """测试场景检测命令的JSON-RPC输出""" print("\n🎯 测试场景检测命令 (JSON-RPC)") print("=" * 50) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("❌ 没有找到测试视频文件") return False test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") # 执行场景检测命令 result = run_video_splitter_command("detect_scenes", test_video, threshold=30.0, detector="content") if result["success"]: print("✅ 命令执行成功") data = result["data"] if isinstance(data, dict): if "result" in data: # JSON-RPC格式 detect_result = data["result"] print("🎬 JSON-RPC场景检测结果:") print(f" 成功: {detect_result.get('success', False)}") if detect_result.get("success"): print(f" 总场景数: {detect_result.get('total_scenes', 0)}") print(f" 检测设置: {detect_result.get('detection_settings', {})}") scenes = detect_result.get('scenes', []) for i, scene in enumerate(scenes[:3]): # 只显示前3个 print(f" 场景 {scene.get('scene_number', i+1)}: {scene.get('start_time', 0):.2f}s - {scene.get('end_time', 0):.2f}s") if len(scenes) > 3: print(f" ... 还有 {len(scenes) - 3} 个场景") else: # 直接JSON格式 print("🎬 直接JSON场景检测结果:") print(f" 成功: {data.get('success', False)}") if data.get("success"): print(f" 总场景数: {data.get('total_scenes', 0)}") print(f" 检测设置: {data.get('detection_settings', {})}") return True else: print(f"❌ 命令执行失败: {result['error']}") if "stderr" in result: print(f" 错误输出: {result['stderr']}") return False def test_split_command(): """测试视频拆分命令的JSON-RPC输出""" print("\n✂️ 测试视频拆分命令 (JSON-RPC)") print("=" * 50) # 查找测试视频 assets_dir = project_root / "assets" video_files = list(assets_dir.rglob("*.mp4")) if not video_files: print("❌ 没有找到测试视频文件") return False test_video = str(video_files[0]) print(f"📹 测试视频: {test_video}") # 创建临时输出目录 import tempfile temp_dir = tempfile.mkdtemp(prefix="video_split_test_") print(f"📁 输出目录: {temp_dir}") try: # 执行拆分命令 result = run_video_splitter_command( "split", test_video, threshold=30.0, detector="content", output_dir=temp_dir ) if result["success"]: print("✅ 命令执行成功") data = result["data"] if isinstance(data, dict): if "result" in data: # JSON-RPC格式 split_result = data["result"] print("🎬 JSON-RPC拆分结果:") print(f" 成功: {split_result.get('success', False)}") if split_result.get("success"): print(f" 输出目录: {split_result.get('output_directory', '')}") print(f" 总场景数: {split_result.get('total_scenes', 0)}") print(f" 输出文件数: {len(split_result.get('output_files', []))}") print(f" 处理时间: {split_result.get('processing_time', 0):.2f}秒") else: print(f" 错误: {split_result.get('message', 'Unknown error')}") else: # 直接JSON格式 print("🎬 直接JSON拆分结果:") print(f" 成功: {data.get('success', False)}") if data.get("success"): print(f" 输出目录: {data.get('output_directory', '')}") print(f" 总场景数: {data.get('total_scenes', 0)}") print(f" 输出文件数: {len(data.get('output_files', []))}") print(f" 处理时间: {data.get('processing_time', 0):.2f}秒") return True else: print(f"❌ 命令执行失败: {result['error']}") if "stderr" in result: print(f" 错误输出: {result['stderr']}") return False finally: # 清理临时目录 import shutil shutil.rmtree(temp_dir, ignore_errors=True) print(f"🧹 清理临时目录: {temp_dir}") def main(): """主函数""" print("🚀 PySceneDetect视频拆分服务 JSON-RPC 测试") try: # 检查PySceneDetect try: import scenedetect print(f"✅ PySceneDetect {scenedetect.__version__} 可用") except ImportError: print("❌ PySceneDetect不可用,请安装: pip install scenedetect[opencv]") return 1 # 测试各个命令 success1 = test_analyze_command() success2 = test_detect_scenes_command() success3 = test_split_command() print("\n" + "=" * 60) print("📊 JSON-RPC测试总结") print("=" * 60) if success1 and success2 and success3: print("🎉 所有JSON-RPC测试通过!") print("\n✅ 功能验证:") print(" 1. 视频分析命令 JSON-RPC - ✅") print(" 2. 场景检测命令 JSON-RPC - ✅") print(" 3. 视频拆分命令 JSON-RPC - ✅") print("\n🚀 JSON-RPC使用方法:") print(" # 分析视频") print(" python python_core/services/video_splitter.py analyze video.mp4") print(" # 检测场景") print(" python python_core/services/video_splitter.py detect_scenes video.mp4") print(" # 拆分视频") print(" python python_core/services/video_splitter.py split video.mp4") return 0 else: print("⚠️ 部分JSON-RPC测试失败") return 1 except Exception as e: print(f"❌ 测试过程中出错: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)