364 lines
12 KiB
Python
364 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
通用命令行工具函数
|
|
从video_splitter等服务中抽象出的可复用功能
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any, Optional, Callable, Tuple
|
|
from functools import wraps
|
|
|
|
class DependencyChecker:
    """Helpers for probing optional dependencies at runtime."""

    @staticmethod
    def check_optional_dependency(
        module_name: str,
        import_items: List[str],
        fallback_setup: Optional[Callable] = None,
        success_message: Optional[str] = None,
        error_message: Optional[str] = None
    ) -> Tuple[bool, Dict[str, Any]]:
        """
        Try to import a module and collect selected attributes from it.

        Args:
            module_name: Name of the module to import; dotted names such as
                ``"os.path"`` are supported.
            import_items: Names to fetch. A plain name (``"join"``) is read
                from the imported module. A dotted name
                (``"pkg.sub.Name"``) is resolved attribute by attribute
                starting at the top-level package, skipping the first
                component, and stored under its last component.
            fallback_setup: Optional zero-argument callable invoked when the
                import or an attribute lookup fails.
            success_message: Logged at INFO level on success, if given.
            error_message: Logged at WARNING level (with the exception text
                appended) on failure, if given.

        Returns:
            ``(True, {name: object, ...})`` on success, ``(False, {})`` when
            the module or any requested attribute is unavailable.
        """
        # Local import: keeps this block self-contained without touching the
        # file-level import list.
        import importlib

        try:
            # import_module returns the named (sub)module itself, whereas
            # __import__("a.b") would return the top-level package "a".
            module = importlib.import_module(module_name)
            # Dotted items keep their historical meaning: they are walked
            # from the top-level package.
            package = importlib.import_module(module_name.partition('.')[0])

            imported_items = {}
            for item in import_items:
                if '.' in item:
                    parts = item.split('.')
                    obj = package
                    for part in parts[1:]:  # skip the leading package name
                        obj = getattr(obj, part)
                    imported_items[parts[-1]] = obj
                else:
                    imported_items[item] = getattr(module, item)
        except (ImportError, AttributeError) as e:
            # A missing attribute makes the dependency just as unusable as a
            # missing module, so both take the same fallback path.
            if fallback_setup:
                fallback_setup()

            if error_message:
                logging.warning(f"{error_message}: {e}")

            return False, {}

        if success_message:
            logging.info(success_message)

        return True, imported_items
|
|
|
|
class CommandLineParser:
    """Minimal parser for ``--name value`` style command-line arguments."""

    # Tokens recognised as an explicit value following a boolean option.
    _TRUE_LITERALS = ('true', '1', 'yes', 'on')
    _FALSE_LITERALS = ('false', '0', 'no', 'off')

    @staticmethod
    def parse_command_args(
        args: List[str],
        arg_definitions: Dict[str, Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Parse ``--name value`` pairs according to a definition table.

        Args:
            args: Command-line tokens.
            arg_definitions: Mapping of option name (without ``--``) to a
                definition dict. Recognised keys are ``type`` (callable used
                to convert the value, ``str`` when omitted), ``default`` and
                ``choices``. Example::

                    {
                        "threshold": {"type": float, "default": 30.0},
                        "detector": {"type": str, "default": "content",
                                     "choices": ["content", "threshold"]},
                        "output-dir": {"type": str, "default": None}
                    }

        Returns:
            Dict keyed by option name with ``-`` replaced by ``_``. Every
            defined option is present, holding its default when not supplied.
            Unknown options and tokens that do not start with ``--`` are
            skipped.

        Raises:
            ValueError: A non-boolean option has no value, a value cannot be
                converted, or a value is not among ``choices``.
        """
        parsed_args = {
            arg_name.replace('-', '_'): definition.get('default')
            for arg_name, definition in arg_definitions.items()
        }
        bool_literals = (
            CommandLineParser._TRUE_LITERALS + CommandLineParser._FALSE_LITERALS
        )

        i = 0
        while i < len(args):
            arg = args[i]

            # Positional tokens and unknown options are skipped.
            if not arg.startswith('--') or arg[2:] not in arg_definitions:
                i += 1
                continue

            arg_name = arg[2:]
            definition = arg_definitions[arg_name]
            key = arg_name.replace('-', '_')
            arg_type = definition.get('type', str)
            has_value = i + 1 < len(args) and not args[i + 1].startswith('--')

            # A boolean option only consumes the next token when that token
            # is a boolean literal; otherwise it is a bare flag. This stops
            # "--verbose input.mp4" from swallowing the file name and
            # evaluating to False.
            if arg_type is bool and not (
                has_value and args[i + 1].lower() in bool_literals
            ):
                parsed_args[key] = True
                i += 1
                continue

            if not has_value:
                raise ValueError(f"Missing value for argument: {arg_name}")

            value_str = args[i + 1]
            try:
                if arg_type is bool:
                    value = value_str.lower() in CommandLineParser._TRUE_LITERALS
                else:
                    value = arg_type(value_str)

                choices = definition.get('choices')
                if choices and value not in choices:
                    raise ValueError(f"Invalid choice for {arg_name}: {value}. Choices: {choices}")
            except (ValueError, TypeError) as e:
                raise ValueError(f"Invalid value for {arg_name}: {value_str}. {e}") from e

            parsed_args[key] = value
            i += 2

        return parsed_args
|
|
|
|
class JSONRPCHandler:
    """Routes a command result to a JSON-RPC handler or to standard output."""

    @staticmethod
    def handle_command_response(
        rpc_handler: Optional[Any],
        result: Dict[str, Any],
        error_code: str,
        fallback_message: str = "Operation failed"
    ) -> None:
        """
        Report the outcome of a command.

        Args:
            rpc_handler: Object exposing ``success(result)`` and
                ``error(code, message)``. When falsy, the result is printed
                as indented JSON instead.
            result: Operation result. A dict whose ``"success"`` entry is
                truthy or absent counts as success.
            error_code: Code passed to ``rpc_handler.error`` on failure.
            fallback_message: Error text used when ``result`` is not a dict
                or has no ``"error"`` entry.
        """
        # No handler available: dump the raw result as JSON.
        if not rpc_handler:
            print(json.dumps(result, indent=2, ensure_ascii=False))
            return

        result_is_dict = isinstance(result, dict)
        if result_is_dict and result.get("success", True):
            rpc_handler.success(result)
            return

        message = fallback_message
        if result_is_dict:
            message = result.get("error", fallback_message)
        rpc_handler.error(error_code, message)
|
|
|
|
class FileUtils:
    """File and directory helpers."""

    @staticmethod
    def validate_input_file(file_path: str, file_type: str = "file") -> str:
        """
        Check that a path exists and is a regular file.

        Args:
            file_path: Path to check.
            file_type: Description of the file, used (capitalised) in the
                "not found" error message.

        Returns:
            The absolute form of ``file_path``.

        Raises:
            FileNotFoundError: The path does not exist.
            ValueError: The path exists but is not a file.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"{file_type.capitalize()} file not found: {file_path}")

        if not os.path.isfile(file_path):
            raise ValueError(f"Path is not a file: {file_path}")

        return os.path.abspath(file_path)

    @staticmethod
    def create_timestamped_output_dir(
        base_dir: str,
        name_prefix: str,
        timestamp_format: str = "%Y%m%d_%H%M%S"
    ) -> Path:
        """
        Create ``<base_dir>/<name_prefix>_<timestamp>``.

        Args:
            base_dir: Parent directory; missing parents are created.
            name_prefix: Prefix of the directory name.
            timestamp_format: ``strftime`` format applied to the current
                local time.

        Returns:
            Path of the directory. An already existing directory with the
            same name is reused rather than treated as an error.
        """
        timestamp = datetime.now().strftime(timestamp_format)
        output_dir = Path(base_dir) / f"{name_prefix}_{timestamp}"
        output_dir.mkdir(parents=True, exist_ok=True)
        return output_dir

    @staticmethod
    def scan_files_by_extension(directory: str, extensions: List[str]) -> List[str]:
        """
        Recursively collect files whose names end with one of the extensions.

        Args:
            directory: Directory to search.
            extensions: Suffixes to match, e.g. ``['.mp4', '.avi']``.

        Returns:
            Paths of matching regular files as strings, grouped in the order
            of ``extensions``. Each file appears once even when extensions
            repeat or overlap. An empty list is returned when ``directory``
            is not an existing directory.
        """
        root = Path(directory)
        if not root.is_dir():
            return []

        seen = set()
        matches = []
        for ext in extensions:
            for candidate in root.rglob(f"*{ext}"):
                # rglob also yields directories whose names match the
                # pattern; only regular files belong in the result.
                if candidate in seen or not candidate.is_file():
                    continue
                seen.add(candidate)
                matches.append(str(candidate))

        return matches
|
|
|
|
class PerformanceUtils:
    """Execution-time measurement helpers."""

    @staticmethod
    def measure_execution_time(func: Callable) -> Callable:
        """
        Decorator that times each call of ``func``.

        Args:
            func: Function to measure.

        Returns:
            Wrapped function that returns ``(result, execution_time)``,
            where ``execution_time`` is the elapsed time in seconds.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so the measured duration is not
            # affected by system clock adjustments the way time.time() is.
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            execution_time = time.perf_counter() - start_time
            return result, execution_time
        return wrapper

    @staticmethod
    def time_operation(operation: Callable, *args, **kwargs) -> Tuple[Any, float]:
        """
        Call ``operation`` once and measure how long it takes.

        Args:
            operation: Callable to execute.
            *args, **kwargs: Arguments forwarded to ``operation``.

        Returns:
            ``(result, execution_time)`` with the elapsed time in seconds.
        """
        # Monotonic clock for the same reason as in measure_execution_time.
        start_time = time.perf_counter()
        result = operation(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        return result, execution_time
|
|
|
|
class LoggingUtils:
    """Logging helpers."""

    @staticmethod
    def setup_fallback_logger(
        name: str,
        level: int = logging.INFO,
        format_string: str = '%(asctime)s | %(levelname)s | %(name)s | %(message)s'
    ) -> logging.Logger:
        """
        Return a named logger, configuring it only if it has no handlers.

        Args:
            name: Logger name.
            level: Level applied when the logger is configured here.
            format_string: Format for the stream handler that gets attached.

        Returns:
            The logger. If it already had handlers it is returned unchanged;
            neither its handlers nor its level are modified.
        """
        fallback = logging.getLogger(name)

        # Already configured elsewhere: leave it alone.
        if fallback.handlers:
            return fallback

        stream = logging.StreamHandler()
        stream.setFormatter(logging.Formatter(format_string))
        fallback.addHandler(stream)
        fallback.setLevel(level)
        return fallback
|
|
|
|
# 便捷函数
|
|
def create_command_service_base(
    service_name: str,
    required_dependencies: Dict[str, Dict[str, Any]] = None,
    optional_dependencies: Dict[str, Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Build the base configuration for a command service.

    Args:
        service_name: Service name; also used as the logger name.
        required_dependencies: Mapping of dependency name to the keyword
            arguments passed to
            ``DependencyChecker.check_optional_dependency``. Any unavailable
            entry aborts with ImportError.
        optional_dependencies: Same shape; unavailable entries are skipped.

    Returns:
        Dict with the keys ``service_name``, ``logger``, ``dependencies``
        (name -> imported items) and ``available_features`` (names of the
        dependencies that were found).

    Raises:
        ImportError: A required dependency is not available.
    """
    dependencies = {}
    available_features = []
    config = {
        "service_name": service_name,
        "logger": LoggingUtils.setup_fallback_logger(service_name),
        "dependencies": dependencies,
        "available_features": available_features
    }

    # Required dependencies are processed first so that a missing one fails
    # before any optional dependency is probed.
    dependency_groups = (
        (required_dependencies, True),
        (optional_dependencies, False),
    )
    for group, is_required in dependency_groups:
        if not group:
            continue
        for dep_name, dep_config in group.items():
            available, items = DependencyChecker.check_optional_dependency(**dep_config)
            if available:
                dependencies[dep_name] = items
                available_features.append(dep_name)
            elif is_required:
                raise ImportError(f"Required dependency {dep_name} is not available")

    return config
|