#!/usr/bin/env python3
"""
Scene Detection CLI - Refactored

Command-line tool for scene detection, built on the refactored scene
detection module for cleaner, better-modularised code.
"""

import typer
from pathlib import Path
from typing import Optional

from rich.console import Console
from rich.table import Table

from python_core.scene_detection import (
    SceneDetector,
    DetectorType,
    OutputFormat,
)
from python_core.utils.logger import logger

scene_detect = typer.Typer(help="Scene detection tool - refactored")
console = Console()


@scene_detect.command("detect")
def detect(
    video_path: Path = typer.Argument(..., help="Path to the video file"),
    detector_type: DetectorType = typer.Option(DetectorType.CONTENT, "--detector", "-d", help="Detector type"),
    threshold: float = typer.Option(30.0, "--threshold", "-t", help="Detection threshold"),
    min_scene_length: float = typer.Option(1.0, "--min-length", "-m", help="Minimum scene length (seconds)"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output file path"),
    output_format: OutputFormat = typer.Option(OutputFormat.JSON, "--format", "-f", help="Output format"),
    ai_analysis: bool = typer.Option(True, "--ai/--no-ai", help="Enable/disable AI analysis"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
):
    """Run scene detection through the LangGraph workflow."""
    console.print(f"🔄 Detecting scenes with workflow: [bold blue]{video_path}[/bold blue]")

    try:
        # Create the detector
        detector = SceneDetector()

        # Run workflow-based detection
        result = detector.detect_with_workflow(
            video_path, detector_type, threshold, min_scene_length,
            output, output_format, ai_analysis
        )

        # Report results
        if result.get("workflow_state") == "completed":
            detection_result = result.get("detection_result")
            if detection_result and detection_result.success:
                console.print(
                    f"✅ Workflow finished: [bold green]{detection_result.total_scenes}[/bold green] scenes"
                )
                console.print(f"📊 Detection time: {detection_result.detection_time:.2f}s")

                # Show the AI analysis, truncated to 500 characters. The Chinese
                # sentinel string below is the exact value the detection module
                # returns when AI analysis is disabled, so it is left untranslated.
                ai_analysis_result = result.get("ai_analysis")
                if ai_analysis_result and ai_analysis_result != "AI分析已禁用":
                    console.print("\n🧠 AI analysis:")
                    console.print(
                        ai_analysis_result[:500] + "..."
                        if len(ai_analysis_result) > 500
                        else ai_analysis_result
                    )

                # Show the scene table
                if verbose:
                    _display_scenes_table(detection_result.scenes)
            else:
                error = detection_result.error if detection_result else "unknown error"
                console.print(f"❌ Detection failed: [bold red]{error}[/bold red]")
                raise typer.Exit(1)
        else:
            errors = result.get("errors", [])
            error_msg = "; ".join(errors) if errors else "workflow execution failed"
            console.print(f"❌ Workflow failed: [bold red]{error_msg}[/bold red]")
            raise typer.Exit(1)

    except typer.Exit:
        # Let deliberate exits propagate instead of being re-reported below
        raise
    except Exception as e:
        console.print(f"❌ Execution failed: [bold red]{e}[/bold red]")
        raise typer.Exit(1)


def _display_scenes_table(scenes):
    """Render the detected scenes as a table."""
    table = Table(title="Detected Scenes")
    table.add_column("Scene", style="cyan")
    table.add_column("Start", style="green")
    table.add_column("End", style="green")
    table.add_column("Duration", style="yellow")

    for scene in scenes:
        table.add_row(
            str(scene.index + 1),
            f"{scene.start_time:.2f}s",
            f"{scene.end_time:.2f}s",
            f"{scene.duration:.2f}s",
        )

    console.print(table)


@scene_detect.command("batch")
def batch_detect_and_split(
    input_dir: Path = typer.Argument(..., help="Input directory containing video files"),
    output_dir: Path = typer.Argument(..., help="Output directory"),
    detector_type: DetectorType = typer.Option(DetectorType.CONTENT, "--detector", "-d", help="Detector type"),
    threshold: float = typer.Option(30.0, "--threshold", "-t", help="Detection threshold"),
    min_scene_length: float = typer.Option(1.0, "--min-length", "-m", help="Minimum scene length (seconds)"),
    output_format: OutputFormat = typer.Option(OutputFormat.JSON, "--format", "-f", help="Output format"),
    ai_analysis: bool = typer.Option(False, "--ai/--no-ai", help="Enable/disable AI analysis"),
    video_splitting: bool = typer.Option(True, "--split/--no-split", help="Enable/disable video splitting"),
    max_concurrent: int = typer.Option(2, "--concurrent", "-c", help="Maximum number of concurrent tasks"),
    continue_on_error: bool = typer.Option(True, "--continue/--stop-on-error", help="Continue/stop when an error occurs"),
    file_pattern: str = typer.Option("*.mp4", "--pattern", "-p", help="Glob pattern for video files"),
    use_advanced_split: bool = typer.Option(True, "--advanced/--traditional", help="Use efficient batch splitting / traditional one-by-one splitting"),
    split_quality: int = typer.Option(23, "--quality", "-q", help="Split quality (CRF value, 18-28)"),
    split_preset: str = typer.Option("fast", "--preset", help="Encoding preset (ultrafast/fast/medium/slow)"),
    # No short flag here: "-d" is already taken by --detector above.
    max_duration: float = typer.Option(2.0, "--max-duration", help="Maximum clip duration (seconds); longer clips are split again"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
):
    """Batch scene detection and video splitting."""
    console.print(f"🔄 Batch processing directory: [bold blue]{input_dir}[/bold blue]")
    console.print(f"📂 Output directory: [bold blue]{output_dir}[/bold blue]")

    try:
        # Validate the input directory
        if not input_dir.exists() or not input_dir.is_dir():
            console.print(f"❌ Input directory does not exist or is not a directory: [bold red]{input_dir}[/bold red]")
            raise typer.Exit(1)

        # Collect video files matching the requested pattern
        video_extensions = ['*.mp4', '*.avi', '*.mov', '*.mkv', '*.wmv', '*.flv', '*.webm', '*.m4v']
        video_files = list(input_dir.glob(file_pattern))

        # If a custom pattern matched nothing, fall back to every supported extension
        if not video_files and file_pattern not in video_extensions:
            for pattern in video_extensions:
                video_files.extend(input_dir.glob(pattern))

        if not video_files:
            console.print(f"❌ No video files found in directory: [bold red]{input_dir}[/bold red]")
            console.print(f"💡 Pattern tried: {file_pattern}")
            raise typer.Exit(1)

        console.print(f"📹 Found {len(video_files)} video file(s)")

        # Create the detector
        detector = SceneDetector()

        # Run batch processing
        result = detector.batch_detect_and_split(
            video_paths=video_files,
            output_base_dir=output_dir,
            detector_type=detector_type,
            threshold=threshold,
            min_scene_length=min_scene_length,
            output_format=output_format,
            enable_ai_analysis=ai_analysis,
            enable_video_splitting=video_splitting,
            max_concurrent=max_concurrent,
            continue_on_error=continue_on_error,
            use_advanced_split=use_advanced_split,
            split_quality=split_quality,
            split_preset=split_preset,
            max_video_duration=max_duration,
        )

        # Report results
        if result.get("workflow_state") == "completed":
            summary = result.get("batch_results", {})
            console.print("\n✅ Batch processing complete!")
            console.print("📊 Processing statistics:")
            console.print(f"   Total videos: {summary.get('total_videos', 0)}")
            console.print(f"   Succeeded:    {summary.get('completed_videos', 0)}")
            console.print(f"   Failed:       {summary.get('failed_videos', 0)}")
            console.print(f"   Success rate: {summary.get('success_rate', 0):.1f}%")

            if video_splitting:
                tasks_data = summary.get('tasks', [])
                if tasks_data:
                    total_scenes = sum(task.get('total_scenes', 0) for task in tasks_data)
                    total_splits = sum(task.get('split_count', 0) for task in tasks_data)
                    console.print(f"   Total scenes: {total_scenes}")
                    console.print(f"   Split clips:  {total_splits}")
                else:
                    console.print("   ⚠️ No task data")

            # Show per-video details
            if verbose:
                tasks = summary.get('tasks', [])
                if tasks:
                    _display_batch_results_table(tasks)
                else:
                    console.print("   ⚠️ No detailed task data to display")
        else:
            console.print("❌ Batch processing failed")
            errors = result.get("errors", [])
            if errors:
                for error in errors:
                    console.print(f"   • {error}")
            raise typer.Exit(1)

    except typer.Exit:
        # Let deliberate exits propagate instead of being re-reported below
        raise
    except Exception as e:
        console.print(f"❌ Execution failed: [bold red]{e}[/bold red]")
        raise typer.Exit(1)


def _display_batch_results_table(tasks):
    """Render the batch processing results as a table."""
    table = Table(title="Batch Processing Results")
    table.add_column("Video File", style="cyan")
    table.add_column("Status", style="green")
    table.add_column("Scenes", style="yellow")
    table.add_column("Splits", style="blue")
    table.add_column("Time", style="magenta")
    table.add_column("Error", style="red")

    for task in tasks:
        video_name = Path(task["video_path"]).name
        status = "✅ Success" if task["status"] == "completed" else "❌ Failed"
        scenes = str(task.get("total_scenes", 0))
        splits = str(task.get("split_count", 0))
        proc_time = f"{task.get('processing_time', 0):.1f}s"
        error_text = task.get("error") or ""
        error = error_text[:50] + "..." if len(error_text) > 50 else error_text

        table.add_row(video_name, status, scenes, splits, proc_time, error)

    console.print(table)


if __name__ == "__main__":
    scene_detect()
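
# ---------------------------------------------------------------------------
# Example invocations (illustrative sketch only: the script filename, paths,
# and option values below are placeholders, not defined by this module):
#
#   # Detect scenes in a single video and print a per-scene table
#   python scene_detect_cli.py detect ./clips/input.mp4 \
#       --detector content --threshold 30 --min-length 1.0 \
#       --format json --verbose
#
#   # Batch-detect and split every *.mp4 under ./videos into ./output
#   python scene_detect_cli.py batch ./videos ./output \
#       --pattern "*.mp4" --concurrent 2 --quality 23 --preset fast \
#       --no-ai --split --verbose
# ---------------------------------------------------------------------------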