mxivideo/python_core/cli/commands/scene_detect.py

443 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Scene Detection CLI - Refactored
场景检测命令行工具 - 重构版
使用重构后的场景检测模块,代码更简洁、模块化更好。
"""
import typer
from pathlib import Path
from typing import Optional, List
from rich.console import Console
from rich.table import Table
from python_core.scene_detection import (
SceneDetector,
DetectorType,
OutputFormat
)
from python_core.scene_detection.single_scene_detector import SingleSceneDetector
from python_core.utils.logger import logger
scene_detect = typer.Typer(help="场景检测工具 - 重构版")
console = Console()
@scene_detect.command("detect")
def detect(
video_path: Path = typer.Argument(..., help="视频文件路径"),
detector_type: DetectorType = typer.Option(DetectorType.CONTENT, "--detector", "-d", help="检测器类型"),
threshold: float = typer.Option(30.0, "--threshold", "-t", help="检测阈值"),
min_scene_length: float = typer.Option(1.0, "--min-length", "-m", help="最小场景长度(秒)"),
output: Optional[Path] = typer.Option(None, "--output", "-o", help="输出文件路径"),
output_format: OutputFormat = typer.Option(OutputFormat.JSON, "--format", "-f", help="输出格式"),
ai_analysis: bool = typer.Option(True, "--ai/--no-ai", help="启用/禁用AI分析"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出")
):
"""使用LangGraph工作流进行场景检测"""
console.print(f"🔄 使用工作流检测视频: [bold blue]{video_path}[/bold blue]")
try:
# 创建检测器
detector = SceneDetector()
# 执行工作流检测
result = detector.detect_with_workflow(
video_path, detector_type, threshold, min_scene_length,
output, output_format, ai_analysis
)
# 显示结果
if result.get("workflow_state") == "completed":
detection_result = result.get("detection_result")
if detection_result and detection_result.success:
console.print(f"✅ 工作流完成: [bold green]{detection_result.total_scenes}[/bold green] 个场景")
console.print(f"📊 检测时间: {detection_result.detection_time:.2f}")
# 显示AI分析结果
ai_analysis_result = result.get("ai_analysis")
if ai_analysis_result and ai_analysis_result != "AI分析已禁用":
console.print("\n🧠 AI分析结果:")
console.print(ai_analysis_result[:500] + "..." if len(ai_analysis_result) > 500 else ai_analysis_result)
# 显示场景列表
if verbose:
_display_scenes_table(detection_result.scenes)
else:
console.print(f"❌ 检测失败: [bold red]{detection_result.error if detection_result else '未知错误'}[/bold red]")
raise typer.Exit(1)
else:
errors = result.get("errors", [])
error_msg = "; ".join(errors) if errors else "工作流执行失败"
console.print(f"❌ 工作流失败: [bold red]{error_msg}[/bold red]")
raise typer.Exit(1)
except Exception as e:
console.print(f"❌ 执行失败: [bold red]{str(e)}[/bold red]")
raise typer.Exit(1)
def _display_scenes_table(scenes):
"""显示场景表格"""
table = Table(title="检测到的场景")
table.add_column("场景", style="cyan")
table.add_column("开始时间", style="green")
table.add_column("结束时间", style="green")
table.add_column("时长", style="yellow")
for scene in scenes:
table.add_row(
str(scene.index + 1),
f"{scene.start_time:.2f}s",
f"{scene.end_time:.2f}s",
f"{scene.duration:.2f}s"
)
console.print(table)
@scene_detect.command("batch")
def batch_detect_and_split(
input_dir: Path = typer.Argument(..., help="包含视频文件的输入目录"),
output_dir: Path = typer.Argument(..., help="输出目录"),
detector_type: DetectorType = typer.Option(DetectorType.CONTENT, "--detector", "-d", help="检测器类型"),
threshold: float = typer.Option(30.0, "--threshold", "-t", help="检测阈值"),
min_scene_length: float = typer.Option(1.0, "--min-length", "-m", help="最小场景长度(秒)"),
output_format: OutputFormat = typer.Option(OutputFormat.JSON, "--format", "-f", help="输出格式"),
ai_analysis: bool = typer.Option(False, "--ai/--no-ai", help="启用/禁用AI分析"),
video_splitting: bool = typer.Option(True, "--split/--no-split", help="启用/禁用视频切分"),
max_concurrent: int = typer.Option(2, "--concurrent", "-c", help="最大并发数"),
continue_on_error: bool = typer.Option(True, "--continue/--stop-on-error", help="遇到错误时继续/停止"),
file_pattern: str = typer.Option("*.mp4", "--pattern", "-p", help="视频文件匹配模式"),
use_advanced_split: bool = typer.Option(True, "--advanced/--traditional", help="使用高效批量切分/传统逐个切分"),
split_quality: int = typer.Option(23, "--quality", "-q", help="切分质量 (CRF值, 18-28)"),
split_preset: str = typer.Option("fast", "--preset", help="编码预设 (ultrafast/fast/medium/slow)"),
max_duration: float = typer.Option(2.0, "--max-duration", "-d", help="最大视频时长限制(秒),超过将二次切分"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出")
):
"""批量场景检测和视频切分"""
console.print(f"🔄 批量处理目录: [bold blue]{input_dir}[/bold blue]")
console.print(f"📂 输出目录: [bold blue]{output_dir}[/bold blue]")
try:
# 检查输入目录
if not input_dir.exists() or not input_dir.is_dir():
console.print(f"❌ 输入目录不存在或不是目录: [bold red]{input_dir}[/bold red]")
raise typer.Exit(1)
# 查找视频文件
video_extensions = ['*.mp4', '*.avi', '*.mov', '*.mkv', '*.wmv', '*.flv', '*.webm', '*.m4v']
video_files = []
if file_pattern in video_extensions:
# 使用指定的模式
video_files = list(input_dir.glob(file_pattern))
else:
# 使用自定义模式
video_files = list(input_dir.glob(file_pattern))
# 如果自定义模式没找到文件,尝试所有支持的格式
if not video_files:
for pattern in video_extensions:
video_files.extend(input_dir.glob(pattern))
if not video_files:
console.print(f"❌ 在目录中未找到视频文件: [bold red]{input_dir}[/bold red]")
console.print(f"💡 尝试的模式: {file_pattern}")
raise typer.Exit(1)
console.print(f"📹 找到 {len(video_files)} 个视频文件")
# 创建检测器
detector = SceneDetector()
# 执行批量处理
result = detector.batch_detect_and_split(
video_paths=video_files,
output_base_dir=output_dir,
detector_type=detector_type,
threshold=threshold,
min_scene_length=min_scene_length,
output_format=output_format,
enable_ai_analysis=ai_analysis,
enable_video_splitting=video_splitting,
max_concurrent=max_concurrent,
continue_on_error=continue_on_error,
use_advanced_split=use_advanced_split,
split_quality=split_quality,
split_preset=split_preset,
max_video_duration=max_duration
)
# 显示结果
if result.get("workflow_state") == "completed":
summary = result.get("batch_results", {})
console.print(f"\n✅ 批量处理完成!")
console.print(f"📊 处理统计:")
console.print(f" 总视频数: {summary.get('total_videos', 0)}")
console.print(f" 成功处理: {summary.get('completed_videos', 0)}")
console.print(f" 处理失败: {summary.get('failed_videos', 0)}")
console.print(f" 成功率: {summary.get('success_rate', 0):.1f}%")
if video_splitting:
tasks_data = summary.get('tasks', [])
if tasks_data:
total_scenes = sum(task.get('total_scenes', 0) for task in tasks_data)
total_splits = sum(task.get('split_count', 0) for task in tasks_data)
console.print(f" 总场景数: {total_scenes}")
console.print(f" 切分片段: {total_splits}")
else:
console.print(" ⚠️ 无任务数据")
# 显示详细结果
if verbose:
tasks = summary.get('tasks', [])
if tasks:
_display_batch_results_table(tasks)
else:
console.print(" ⚠️ 无详细任务数据可显示")
else:
console.print(f"❌ 批量处理失败")
errors = result.get("errors", [])
if errors:
for error in errors:
console.print(f"{error}")
raise typer.Exit(1)
except Exception as e:
console.print(f"❌ 执行失败: [bold red]{str(e)}[/bold red]")
raise typer.Exit(1)
def _display_batch_results_table(tasks):
"""显示批量处理结果表格"""
table = Table(title="批量处理结果")
table.add_column("视频文件", style="cyan")
table.add_column("状态", style="green")
table.add_column("场景数", style="yellow")
table.add_column("切分数", style="blue")
table.add_column("处理时间", style="magenta")
table.add_column("错误", style="red")
for task in tasks:
video_name = Path(task["video_path"]).name
status = "✅ 成功" if task["status"] == "completed" else "❌ 失败"
scenes = str(task.get("total_scenes", 0))
splits = str(task.get("split_count", 0))
proc_time = f"{task.get('processing_time', 0):.1f}s"
error_text = task.get("error") or ""
error = error_text[:50] + "..." if len(error_text) > 50 else error_text
table.add_row(video_name, status, scenes, splits, proc_time, error)
console.print(table)
@scene_detect.command("single")
def single_detect_and_split(
video_path: str = typer.Argument(..., help="视频文件路径"),
output_dir: Optional[str] = typer.Option(None, "--output", "-o", help="输出目录"),
detector_type: str = typer.Option("content", "--detector", "-d", help="检测器类型 (content/threshold/adaptive)"),
threshold: float = typer.Option(30.0, "--threshold", "-t", help="检测阈值 (0-100)"),
min_scene_length: float = typer.Option(1.0, "--min-length", "-l", help="最小场景长度(秒)"),
output_format: str = typer.Option("json", "--format", "-f", help="输出格式 (json/csv/txt)"),
ai_analysis: bool = typer.Option(False, "--ai/--no-ai", help="启用/禁用AI分析"),
video_splitting: bool = typer.Option(True, "--split/--no-split", help="启用/禁用视频切分"),
use_advanced_split: bool = typer.Option(True, "--advanced/--traditional", help="使用高效批量切分/传统逐个切分"),
split_quality: int = typer.Option(23, "--quality", "-q", help="切分质量 (CRF值, 18-28)"),
split_preset: str = typer.Option("fast", "--preset", help="编码预设 (ultrafast/fast/medium/slow)"),
max_duration: float = typer.Option(2.0, "--max-duration", "-m", help="最大视频时长限制(秒),超过将二次切分"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出")
):
"""单个视频文件的场景检测和切分"""
try:
console.print(f"🎬 [bold blue]单个视频场景检测和切分[/bold blue]")
console.print(f"📁 视频文件: {video_path}")
# 验证输入参数
try:
detector_type_enum = DetectorType(detector_type)
output_format_enum = OutputFormat(output_format)
except ValueError as e:
console.print(f"[red]❌ 参数错误: {e}[/red]")
raise typer.Exit(1)
# 创建检测器
detector = SingleSceneDetector()
# 验证视频文件
validation_result = detector.validate_video(video_path)
if not validation_result["valid"]:
console.print(f"[red]❌ 视频文件验证失败: {validation_result['error']}[/red]")
raise typer.Exit(1)
console.print(f"✅ 视频文件验证通过")
console.print(f"📊 文件大小: {validation_result['file_size']:,} 字节")
# 执行处理
result = detector.detect_and_split(
video_path=video_path,
output_dir=output_dir,
detector_type=detector_type_enum,
threshold=threshold,
min_scene_length=min_scene_length,
output_format=output_format_enum,
enable_ai_analysis=ai_analysis,
enable_video_splitting=video_splitting,
use_advanced_split=use_advanced_split,
split_quality=split_quality,
split_preset=split_preset,
max_video_duration=max_duration
)
# 显示结果
if result["success"]:
console.print(f"\n✅ [bold green]处理完成![/bold green]")
console.print(f"📊 处理统计:")
console.print(f" 视频文件: {Path(result['video_path']).name}")
console.print(f" 输出目录: {result['output_dir']}")
console.print(f" 处理时间: {result['processing_time']:.1f}s")
console.print(f" 场景数量: {result['total_scenes']}")
console.print(f" 切分片段: {result['total_segments']}")
# 显示详细结果表格
if result.get("split_results") and verbose:
table = Table(title="切分结果详情")
table.add_column("片段", style="cyan")
table.add_column("输出文件", style="green")
table.add_column("时长", style="yellow")
table.add_column("文件大小", style="blue")
table.add_column("状态", style="magenta")
for split_result in result["split_results"]:
status = "✅ 成功" if split_result["success"] else f"{split_result.get('error', '失败')}"
file_size = f"{split_result['file_size']:,} B" if split_result['file_size'] > 0 else "0 B"
duration = f"{split_result['duration']:.2f}s"
filename = Path(split_result['output_path']).name
table.add_row(
str(split_result['scene_index'] + 1),
filename,
duration,
file_size,
status
)
console.print(table)
else:
console.print(f"\n[red]❌ 处理失败: {result.get('error', '未知错误')}[/red]")
raise typer.Exit(1)
except Exception as e:
console.print(f"\n[red]❌ 单个视频处理失败: {str(e)}[/red]")
raise typer.Exit(1)
@scene_detect.command("import")
def import_to_project(
video_path: str = typer.Argument(..., help="视频文件路径"),
project_id: str = typer.Argument(..., help="项目ID"),
project_directory: str = typer.Argument(..., help="项目目录路径"),
tags: Optional[str] = typer.Option(None, "--tags", "-t", help="素材标签,用逗号分隔"),
detector_type: str = typer.Option("content", "--detector", "-d", help="检测器类型"),
threshold: float = typer.Option(30.0, "--threshold", help="检测阈值"),
min_scene_length: float = typer.Option(1.0, "--min-length", help="最小场景长度(秒)"),
split_quality: int = typer.Option(23, "--quality", "-q", help="切分质量"),
split_preset: str = typer.Option("fast", "--preset", help="编码预设"),
max_duration: float = typer.Option(2.0, "--max-duration", "-m", help="最大视频时长限制(秒)"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出")
):
"""导入单个视频到项目素材库"""
try:
console.print(f"📦 [bold blue]导入视频到项目素材库[/bold blue]")
console.print(f"📁 视频文件: {video_path}")
console.print(f"🎯 项目ID: {project_id}")
console.print(f"📂 项目目录: {project_directory}")
# 解析标签
material_tags = []
if tags:
material_tags = [tag.strip() for tag in tags.split(",") if tag.strip()]
console.print(f"🏷️ 素材标签: {', '.join(material_tags)}")
# 验证输入参数
try:
detector_type_enum = DetectorType(detector_type)
except ValueError as e:
console.print(f"[red]❌ 参数错误: {e}[/red]")
raise typer.Exit(1)
# 创建检测器
detector = SingleSceneDetector()
# 验证视频文件
validation_result = detector.validate_video(video_path)
if not validation_result["valid"]:
console.print(f"[red]❌ 视频文件验证失败: {validation_result['error']}[/red]")
raise typer.Exit(1)
# 验证项目目录
project_dir_path = Path(project_directory)
if not project_dir_path.exists():
console.print(f"[red]❌ 项目目录不存在: {project_directory}[/red]")
raise typer.Exit(1)
console.print(f"✅ 验证通过,开始导入...")
# 执行导入
result = detector.import_to_project(
video_path=video_path,
project_id=project_id,
project_directory=project_directory,
material_tags=material_tags,
detector_type=detector_type_enum,
threshold=threshold,
min_scene_length=min_scene_length,
split_quality=split_quality,
split_preset=split_preset,
max_video_duration=max_duration
)
# 显示结果
if result["success"]:
console.print(f"\n✅ [bold green]导入完成![/bold green]")
console.print(f"📊 导入统计:")
console.print(f" 视频文件: {Path(result['video_path']).name}")
console.print(f" 处理时间: {result['processing_time']:.1f}s")
console.print(f" 场景数量: {result['total_scenes']}")
console.print(f" 导入片段: {result['total_segments']}")
console.print(f" 项目目录: {result['output_dir']}")
# 显示详细结果
if result.get("split_results") and verbose:
table = Table(title="导入素材详情")
table.add_column("片段", style="cyan")
table.add_column("时长", style="yellow")
table.add_column("文件大小", style="blue")
table.add_column("状态", style="magenta")
for split_result in result["split_results"]:
status = "✅ 已导入" if split_result["success"] else f"{split_result.get('error', '失败')}"
file_size = f"{split_result['file_size']:,} B" if split_result['file_size'] > 0 else "0 B"
duration = f"{split_result['duration']:.2f}s"
table.add_row(
str(split_result['scene_index'] + 1),
duration,
file_size,
status
)
console.print(table)
else:
console.print(f"\n[red]❌ 导入失败: {result.get('error', '未知错误')}[/red]")
raise typer.Exit(1)
except Exception as e:
console.print(f"\n[red]❌ 项目导入失败: {str(e)}[/red]")
raise typer.Exit(1)
if __name__ == "__main__":
scene_detect()