#!/usr/bin/env python3
"""
General-purpose command-line utility functions.

Reusable helpers abstracted from services such as video_splitter.
"""

import os
import sys
import json
import time
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional, Callable, Tuple
from functools import wraps


class DependencyChecker:
    """Checks whether optional dependencies can be imported."""

    @staticmethod
    def check_optional_dependency(
        module_name: str,
        import_items: List[str],
        fallback_setup: Optional[Callable] = None,
        success_message: Optional[str] = None,
        error_message: Optional[str] = None
    ) -> Tuple[bool, Dict[str, Any]]:
        """
        Try to import a module and pull a set of names out of it.

        Args:
            module_name: Name passed to ``__import__``. For a dotted name
                ``__import__`` returns the top-level package, and that is
                the object the items are looked up on.
            import_items: Names to fetch. A plain name is read directly from
                the module. A dotted name such as
                ``'scenedetect.VideoManager'`` is resolved attribute by
                attribute, skipping its first component (taken to be the
                module name); the result is stored under the last component.
            fallback_setup: Called with no arguments when the check fails.
            success_message: Logged at INFO level on success.
            error_message: Logged at WARNING level (with the exception
                appended) on failure.

        Returns:
            ``(True, {name: object})`` when everything resolved, otherwise
            ``(False, {})``.
        """
        try:
            module = __import__(module_name)
            imported_items = {}
            for item in import_items:
                if '.' in item:
                    parts = item.split('.')
                    obj = module
                    # parts[0] is the module name itself, so start after it.
                    for part in parts[1:]:
                        obj = getattr(obj, part)
                    imported_items[parts[-1]] = obj
                else:
                    imported_items[item] = getattr(module, item)
        except (ImportError, AttributeError) as e:
            # A module that imports but lacks a requested name is just as
            # unusable as a missing module, so both take the failure path
            # instead of letting AttributeError escape an "optional" check.
            if fallback_setup:
                fallback_setup()
            if error_message:
                logging.warning("%s: %s", error_message, e)
            return False, {}

        if success_message:
            logging.info(success_message)
        return True, imported_items


class CommandLineParser:
    """Parser for ``--name value`` style command-line arguments."""

    # Tokens recognised as an explicit value following a boolean option.
    _TRUE_LITERALS = frozenset(('true', '1', 'yes', 'on'))
    _FALSE_LITERALS = frozenset(('false', '0', 'no', 'off'))
    _BOOL_LITERALS = _TRUE_LITERALS | _FALSE_LITERALS

    @staticmethod
    def parse_command_args(
        args: List[str],
        arg_definitions: Dict[str, Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Parse ``--name value`` pairs according to a definition table.

        Args:
            args: Command-line tokens.
            arg_definitions: Mapping of option name (without ``--``) to its
                definition, for example::

                    {
                        "threshold": {"type": float, "default": 30.0},
                        "detector": {"type": str, "default": "content",
                                     "choices": ["content", "threshold"]},
                        "output-dir": {"type": str, "default": None}
                    }

                ``type`` defaults to ``str``.

        Returns:
            Dict keyed by option name with ``-`` replaced by ``_``. Options
            that were not supplied carry their ``default`` (``None`` when no
            default is defined). Tokens that do not start with ``--`` and
            options missing from ``arg_definitions`` are ignored.

        Raises:
            ValueError: A non-boolean option has no value, a value cannot be
                converted to the declared type, or a value is not among the
                declared ``choices``.
        """
        parsed_args = {
            arg_name.replace('-', '_'): definition.get('default')
            for arg_name, definition in arg_definitions.items()
        }

        i = 0
        while i < len(args):
            arg = args[i]
            arg_name = arg[2:] if arg.startswith('--') else None
            if arg_name is None or arg_name not in arg_definitions:
                # Positional token or unknown option: skip it.
                i += 1
                continue

            definition = arg_definitions[arg_name]
            key = arg_name.replace('-', '_')
            arg_type = definition.get('type', str)
            has_value = i + 1 < len(args) and not args[i + 1].startswith('--')
            next_token = args[i + 1] if has_value else None

            if arg_type is bool and (
                next_token is None
                or next_token.lower() not in CommandLineParser._BOOL_LITERALS
            ):
                # Bare flag. The following token (if any) is not a boolean
                # literal, so it is left alone rather than being consumed
                # and silently turning the flag into False.
                parsed_args[key] = True
                i += 1
                continue

            if not has_value:
                raise ValueError(f"Missing value for argument: {arg_name}")

            value_str = next_token
            try:
                if arg_type is bool:
                    value = value_str.lower() in CommandLineParser._TRUE_LITERALS
                else:
                    value = arg_type(value_str)

                choices = definition.get('choices')
                if choices and value not in choices:
                    raise ValueError(
                        f"Invalid choice for {arg_name}: {value}. Choices: {choices}"
                    )
            except (ValueError, TypeError) as e:
                raise ValueError(
                    f"Invalid value for {arg_name}: {value_str}. {e}"
                ) from e

            parsed_args[key] = value
            i += 2

        return parsed_args


class JSONRPCHandler:
    """Routes command results to a JSON-RPC handler or to stdout."""

    @staticmethod
    def handle_command_response(
        rpc_handler: Optional[Any],
        result: Dict[str, Any],
        error_code: str,
        fallback_message: str = "Operation failed"
    ) -> None:
        """
        Report the outcome of a command.

        With a truthy ``rpc_handler``: a dict result whose ``"success"`` key
        is truthy or absent goes to ``rpc_handler.success(result)``; anything
        else goes to ``rpc_handler.error(error_code, message)``, where the
        message is ``result["error"]`` when available and
        ``fallback_message`` otherwise. Without a handler the result is
        printed to stdout as indented JSON.

        Args:
            rpc_handler: Object exposing ``success(result)`` and
                ``error(code, message)``, or a falsy value.
            result: Operation result.
            error_code: Code forwarded to ``rpc_handler.error``.
            fallback_message: Message used when the result carries none.
        """
        if not rpc_handler:
            print(json.dumps(result, indent=2, ensure_ascii=False))
            return

        is_dict = isinstance(result, dict)
        if is_dict and result.get("success", True):
            rpc_handler.success(result)
            return

        error_msg = result.get("error", fallback_message) if is_dict else fallback_message
        rpc_handler.error(error_code, error_msg)


class FileUtils:
    """File and directory helpers."""

    @staticmethod
    def validate_input_file(file_path: str, file_type: str = "file") -> str:
        """
        Check that ``file_path`` names an existing regular file.

        Args:
            file_path: Path to check.
            file_type: Human-readable description used in the error message.

        Returns:
            The absolute form of ``file_path``.

        Raises:
            FileNotFoundError: The path does not exist.
            ValueError: The path exists but is not a file.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"{file_type.capitalize()} file not found: {file_path}")

        if not os.path.isfile(file_path):
            raise ValueError(f"Path is not a file: {file_path}")

        return os.path.abspath(file_path)

    @staticmethod
    def create_timestamped_output_dir(
        base_dir: str,
        name_prefix: str,
        timestamp_format: str = "%Y%m%d_%H%M%S"
    ) -> Path:
        """
        Create ``<base_dir>/<name_prefix>_<timestamp>`` and return it.

        Missing parent directories are created, and an already existing
        directory is not an error.

        Args:
            base_dir: Directory under which the output directory is created.
            name_prefix: Prefix of the directory name.
            timestamp_format: ``strftime`` format applied to the current
                local time.

        Returns:
            Path of the created directory.
        """
        timestamp = datetime.now().strftime(timestamp_format)
        output_dir = Path(base_dir) / f"{name_prefix}_{timestamp}"
        output_dir.mkdir(parents=True, exist_ok=True)
        return output_dir

    @staticmethod
    def scan_files_by_extension(directory: str, extensions: List[str]) -> List[str]:
        """
        Recursively collect files whose names end with given extensions.

        Args:
            directory: Directory to search.
            extensions: Extensions including the dot, e.g.
                ``['.mp4', '.avi']``. Each is used in the glob pattern
                ``*<ext>``.

        Returns:
            File paths as strings, grouped in the order of ``extensions``.
            Directories that happen to match a pattern are excluded, and a
            file matched by several patterns is listed once. An empty list
            is returned when ``directory`` is not an existing directory.
        """
        root = Path(directory)
        if not root.is_dir():
            return []

        # A dict keeps first-seen order while dropping duplicates.
        found: Dict[str, None] = {}
        for ext in extensions:
            for path in root.rglob(f"*{ext}"):
                if path.is_file():
                    found.setdefault(str(path), None)
        return list(found)


class PerformanceUtils:
    """Execution-time measurement helpers."""

    @staticmethod
    def measure_execution_time(func: Callable) -> Callable:
        """
        Decorator that also reports how long the wrapped call took.

        Args:
            func: Function to measure.

        Returns:
            A wrapper returning ``(result, execution_time)`` with the time in
            seconds.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so system clock adjustments cannot
            # produce negative or skewed durations.
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            execution_time = time.perf_counter() - start_time
            return result, execution_time
        return wrapper

    @staticmethod
    def time_operation(operation: Callable, *args, **kwargs) -> Tuple[Any, float]:
        """
        Call ``operation`` and measure how long it takes.

        Args:
            operation: Callable to execute.
            *args, **kwargs: Arguments forwarded to ``operation``.

        Returns:
            ``(result, execution_time)`` with the time in seconds.
        """
        start_time = time.perf_counter()
        result = operation(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        return result, execution_time


class LoggingUtils:
    """Logging helpers."""

    @staticmethod
    def setup_fallback_logger(
        name: str,
        level: int = logging.INFO,
        format_string: str = '%(asctime)s | %(levelname)s | %(name)s | %(message)s'
    ) -> logging.Logger:
        """
        Return a logger, attaching a stream handler if it has none.

        A logger that already has handlers is returned untouched, so calling
        this repeatedly does not stack handlers.

        Args:
            name: Logger name.
            level: Level applied when the logger is configured here.
            format_string: Format for the attached handler.

        Returns:
            The logger registered under ``name``.
        """
        logger = logging.getLogger(name)

        if not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(format_string))
            logger.addHandler(handler)
            logger.setLevel(level)

        return logger
# Convenience functions
def create_command_service_base(
    service_name: str,
    required_dependencies: Dict[str, Dict[str, Any]] = None,
    optional_dependencies: Dict[str, Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Build the base configuration for a command service.

    Each dependency config is passed as keyword arguments to
    ``DependencyChecker.check_optional_dependency``.

    Args:
        service_name: Service name; also used as the logger name.
        required_dependencies: Mapping of dependency name to checker
            arguments. Every entry must be available.
        optional_dependencies: Mapping of dependency name to checker
            arguments. Unavailable entries are left out of the result.

    Returns:
        Dict with the keys ``service_name``, ``logger``, ``dependencies``
        (dependency name -> imported items) and ``available_features``
        (names of the dependencies that resolved).

    Raises:
        ImportError: A required dependency is not available.
    """
    # The logger is set up first, before any dependency is checked.
    logger = LoggingUtils.setup_fallback_logger(service_name)
    resolved: Dict[str, Any] = {}
    features = []

    for dep_name, dep_config in (required_dependencies or {}).items():
        available, items = DependencyChecker.check_optional_dependency(**dep_config)
        if not available:
            raise ImportError(f"Required dependency {dep_name} is not available")
        resolved[dep_name] = items
        features.append(dep_name)

    for dep_name, dep_config in (optional_dependencies or {}).items():
        available, items = DependencyChecker.check_optional_dependency(**dep_config)
        if not available:
            continue
        resolved[dep_name] = items
        features.append(dep_name)

    return {
        "service_name": service_name,
        "logger": logger,
        "dependencies": resolved,
        "available_features": features
    }