""" 项目素材管理CLI命令 """ from pathlib import Path from typing import Optional, List import typer from rich.console import Console from rich.table import Table from python_core.services.project_material_service import ProjectMaterialService from python_core.scene_detection.single_scene_detector import SingleSceneDetector from python_core.scene_detection.types.enums import DetectorType console = Console() material_app = typer.Typer(name="material", help="项目素材管理命令") @material_app.command("import") def import_video( video_path: str = typer.Argument(..., help="视频文件路径"), project_id: str = typer.Argument(..., help="项目ID"), project_directory: str = typer.Argument(..., help="项目目录路径"), tags: Optional[str] = typer.Option(None, "--tags", "-t", help="素材标签,用逗号分隔"), detector_type: str = typer.Option("content", "--detector", "-d", help="检测器类型"), threshold: float = typer.Option(30.0, "--threshold", help="检测阈值"), min_scene_length: float = typer.Option(1.0, "--min-length", help="最小场景长度(秒)"), split_quality: int = typer.Option(23, "--quality", "-q", help="切分质量"), split_preset: str = typer.Option("fast", "--preset", help="编码预设"), max_duration: float = typer.Option(2.0, "--max-duration", "-m", help="最大视频时长限制(秒)"), verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出") ): """导入单个视频到项目素材库""" try: console.print(f"📦 [bold blue]导入视频到项目素材库[/bold blue]") console.print(f"📁 视频文件: {video_path}") console.print(f"🎯 项目ID: {project_id}") console.print(f"📂 项目目录: {project_directory}") # 解析标签 material_tags = [] if tags: material_tags = [tag.strip() for tag in tags.split(",") if tag.strip()] console.print(f"🏷️ 素材标签: {', '.join(material_tags)}") # 验证输入参数 try: detector_type_enum = DetectorType(detector_type) except ValueError as e: console.print(f"[red]❌ 参数错误: {e}[/red]") raise typer.Exit(1) # 创建检测器 detector = SingleSceneDetector() # 验证视频文件 validation_result = detector.validate_video(video_path) if not validation_result["valid"]: console.print(f"[red]❌ 视频文件验证失败: {validation_result['error']}[/red]") raise typer.Exit(1) # 验证项目目录 project_dir_path = Path(project_directory) if not project_dir_path.exists(): console.print(f"[red]❌ 项目目录不存在: {project_directory}[/red]") raise typer.Exit(1) console.print(f"✅ 验证通过,开始导入...") # 执行导入 result = detector.import_to_project( video_path=video_path, project_id=project_id, project_directory=project_directory, material_tags=material_tags, detector_type=detector_type_enum, threshold=threshold, min_scene_length=min_scene_length, split_quality=split_quality, split_preset=split_preset, max_video_duration=max_duration ) # 显示结果 if result["success"]: console.print(f"\n✅ [bold green]导入完成![/bold green]") console.print(f"📊 导入统计:") console.print(f" 视频文件: {Path(result['video_path']).name}") console.print(f" 处理时间: {result['processing_time']:.1f}s") console.print(f" 场景数量: {result['total_scenes']}") console.print(f" 导入片段: {result['total_segments']}") console.print(f" 项目目录: {result['output_dir']}") # 显示详细结果 if result.get("split_results") and verbose: table = Table(title="导入素材详情") table.add_column("片段", style="cyan") table.add_column("时长", style="yellow") table.add_column("文件大小", style="blue") table.add_column("状态", style="magenta") for split_result in result["split_results"]: status = "✅ 已导入" if split_result["success"] else f"❌ {split_result.get('error', '失败')}" file_size = f"{split_result['file_size']:,} B" if split_result['file_size'] > 0 else "0 B" duration = f"{split_result['duration']:.2f}s" table.add_row( str(split_result['scene_index'] + 1), duration, file_size, status ) console.print(table) else: console.print(f"\n[red]❌ 导入失败: {result.get('error', '未知错误')}[/red]") raise typer.Exit(1) except Exception as e: console.print(f"\n[red]❌ 项目导入失败: {str(e)}[/red]") raise typer.Exit(1) @material_app.command("list") def list_materials( project_directory: str = typer.Argument(..., help="项目目录路径"), tags: Optional[str] = typer.Option(None, "--tags", "-t", help="按标签过滤,用逗号分隔"), limit: int = typer.Option(20, "--limit", "-l", help="显示数量限制"), verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出") ): """列出项目素材""" try: console.print(f"📋 [bold blue]项目素材列表[/bold blue]") console.print(f"📂 项目目录: {project_directory}") # 验证项目目录 project_dir_path = Path(project_directory) if not project_dir_path.exists(): console.print(f"[red]❌ 项目目录不存在: {project_directory}[/red]") raise typer.Exit(1) # 创建素材服务 material_service = ProjectMaterialService() # 获取素材列表 materials = material_service.get_project_materials(project_dir_path) if not materials: console.print("📭 项目中没有素材") return # 按标签过滤 if tags: filter_tags = [tag.strip() for tag in tags.split(",") if tag.strip()] materials = [ material for material in materials if any(tag in material.get('tags', []) for tag in filter_tags) ] console.print(f"🏷️ 按标签过滤: {', '.join(filter_tags)}") # 限制显示数量 if len(materials) > limit: materials = materials[:limit] console.print(f"📊 显示前 {limit} 个素材(共 {len(materials)} 个)") # 创建表格 table = Table(title="项目素材列表") table.add_column("ID", style="cyan", width=8) table.add_column("原始文件名", style="green") table.add_column("时长", style="yellow") table.add_column("文件大小", style="blue") table.add_column("标签", style="magenta") table.add_column("使用次数", style="red") if verbose: table.add_column("创建时间", style="dim") for material in materials: file_size = f"{material.get('file_size', 0):,} B" duration = f"{material.get('duration', 0):.2f}s" tags_str = ", ".join(material.get('tags', [])) use_count = str(material.get('use_count', 0)) row = [ material.get('id', '')[:8], material.get('original_filename', ''), duration, file_size, tags_str, use_count ] if verbose: created_at = material.get('created_at', '')[:19].replace('T', ' ') row.append(created_at) table.add_row(*row) console.print(table) except Exception as e: console.print(f"\n[red]❌ 获取素材列表失败: {str(e)}[/red]") raise typer.Exit(1) @material_app.command("stats") def show_stats( project_directory: str = typer.Argument(..., help="项目目录路径") ): """显示项目素材统计信息""" try: console.print(f"📊 [bold blue]项目素材统计[/bold blue]") console.print(f"📂 项目目录: {project_directory}") # 验证项目目录 project_dir_path = Path(project_directory) if not project_dir_path.exists(): console.print(f"[red]❌ 项目目录不存在: {project_directory}[/red]") raise typer.Exit(1) # 创建素材服务 material_service = ProjectMaterialService() # 获取统计信息 stats = material_service.get_material_stats(project_dir_path) console.print(f"\n📈 [bold green]统计信息[/bold green]") console.print(f" 总素材数: {stats['total_count']}") console.print(f" 总文件大小: {stats['total_size']:,} 字节") console.print(f" 总时长: {stats['total_duration']:.2f} 秒") console.print(f" 已使用: {stats['used_count']}") console.print(f" 未使用: {stats['unused_count']}") # 标签统计 if stats['tag_stats']: console.print(f"\n🏷️ [bold green]标签统计[/bold green]") for tag, count in sorted(stats['tag_stats'].items(), key=lambda x: x[1], reverse=True): console.print(f" {tag}: {count}") except Exception as e: console.print(f"\n[red]❌ 获取统计信息失败: {str(e)}[/red]") raise typer.Exit(1) @material_app.command("remove") def remove_material( project_directory: str = typer.Argument(..., help="项目目录路径"), material_id: str = typer.Argument(..., help="素材ID"), force: bool = typer.Option(False, "--force", "-f", help="强制删除,不询问确认") ): """删除项目素材""" try: console.print(f"🗑️ [bold red]删除项目素材[/bold red]") console.print(f"📂 项目目录: {project_directory}") console.print(f"🎯 素材ID: {material_id}") # 验证项目目录 project_dir_path = Path(project_directory) if not project_dir_path.exists(): console.print(f"[red]❌ 项目目录不存在: {project_directory}[/red]") raise typer.Exit(1) # 创建素材服务 material_service = ProjectMaterialService() # 确认删除 if not force: confirm = typer.confirm("确定要删除这个素材吗?此操作不可撤销。") if not confirm: console.print("❌ 操作已取消") return # 删除素材 success = material_service.remove_material(project_dir_path, material_id) if success: console.print(f"✅ [bold green]素材删除成功[/bold green]") else: console.print(f"[red]❌ 素材删除失败[/red]") raise typer.Exit(1) except Exception as e: console.print(f"\n[red]❌ 删除素材失败: {str(e)}[/red]") raise typer.Exit(1) if __name__ == "__main__": material_app()