mxivideo/python_core/services/base.py

221 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
服务基类
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Callable
from python_core.storage import StorageInterface, get_storage
from python_core.utils.logger import logger
class ServiceBase(ABC):
"""服务基类"""
def __init__(self, storage: Optional[StorageInterface] = None):
"""初始化服务
Args:
storage: 存储接口实例如果为None则使用默认存储
"""
self.storage = storage or get_storage(self.get_service_name())
logger.info(f"Service {self.get_service_name()} initialized with storage: {type(self.storage).__name__}")
@abstractmethod
def get_service_name(self) -> str:
"""获取服务名称
Returns:
str: 服务名称,用作存储键前缀
"""
pass
def get_collection_name(self, collection_type: str) -> str:
"""获取集合名称
Args:
collection_type: 集合类型
Returns:
str: 完整的集合名称
"""
return f"{self.get_service_name()}_{collection_type}"
def save_data(self, collection_type: str, key: str, data: Any) -> bool:
"""保存数据
Args:
collection_type: 集合类型
key: 数据键
data: 要保存的数据
Returns:
bool: 保存是否成功
"""
collection = self.get_collection_name(collection_type)
return self.storage.save(collection, key, data)
def load_data(self, collection_type: str, key: str) -> Any:
"""加载数据
Args:
collection_type: 集合类型
key: 数据键
Returns:
Any: 加载的数据不存在时返回None
"""
collection = self.get_collection_name(collection_type)
return self.storage.load(collection, key)
def delete_data(self, collection_type: str, key: str) -> bool:
"""删除数据
Args:
collection_type: 集合类型
key: 数据键
Returns:
bool: 删除是否成功
"""
collection = self.get_collection_name(collection_type)
return self.storage.delete(collection, key)
def exists_data(self, collection_type: str, key: str) -> bool:
"""检查数据是否存在
Args:
collection_type: 集合类型
key: 数据键
Returns:
bool: 数据是否存在
"""
collection = self.get_collection_name(collection_type)
return self.storage.exists(collection, key)
def list_keys(self, collection_type: str, pattern: str = "*") -> list[str]:
"""列出集合中的所有键
Args:
collection_type: 集合类型
pattern: 键的模式匹配
Returns:
List[str]: 键列表
"""
collection = self.get_collection_name(collection_type)
return self.storage.list_keys(collection, pattern)
def save_batch_data(self, collection_type: str, data: Dict[str, Any]) -> bool:
"""批量保存数据
Args:
collection_type: 集合类型
data: 键值对数据
Returns:
bool: 保存是否成功
"""
collection = self.get_collection_name(collection_type)
return self.storage.save_batch(collection, data)
def load_batch_data(self, collection_type: str, keys: list[str]) -> Dict[str, Any]:
"""批量加载数据
Args:
collection_type: 集合类型
keys: 键列表
Returns:
Dict[str, Any]: 键值对数据
"""
collection = self.get_collection_name(collection_type)
return self.storage.load_batch(collection, keys)
def clear_collection(self, collection_type: str) -> bool:
"""清空集合
Args:
collection_type: 集合类型
Returns:
bool: 清空是否成功
"""
collection = self.get_collection_name(collection_type)
return self.storage.clear_collection(collection)
def get_collection_stats(self, collection_type: str) -> Dict[str, Any]:
"""获取集合统计信息
Args:
collection_type: 集合类型
Returns:
Dict[str, Any]: 统计信息
"""
collection = self.get_collection_name(collection_type)
return self.storage.get_stats(collection)
def get_all_collections(self) -> list[str]:
"""获取服务的所有集合
Returns:
List[str]: 集合名称列表
"""
all_collections = self.storage.get_collections()
service_prefix = f"{self.get_service_name()}_"
return [c for c in all_collections if c.startswith(service_prefix)]
def close(self):
"""关闭服务"""
if hasattr(self.storage, 'close'):
self.storage.close()
logger.info(f"Service {self.get_service_name()} closed")
class ProgressServiceBase(ServiceBase):
"""支持进度回调的服务基类"""
def __init__(self, storage: Optional[StorageInterface] = None):
super().__init__(storage)
self._progress_callback: Optional[Callable[[str], None]] = None
def set_progress_callback(self, callback: Optional[Callable[[str], None]]):
"""设置进度回调函数
Args:
callback: 进度回调函数,接收进度消息字符串
"""
self._progress_callback = callback
def report_progress(self, message: str):
"""报告进度
Args:
message: 进度消息
"""
if self._progress_callback:
self._progress_callback(message)
logger.debug(f"Progress: {message}")
def execute_with_progress(self, operation_name: str, operation_func: Callable, *args, **kwargs) -> Any:
"""执行带进度报告的操作
Args:
operation_name: 操作名称
operation_func: 操作函数
*args: 操作函数参数
**kwargs: 操作函数关键字参数
Returns:
Any: 操作结果
"""
self.report_progress(f"开始 {operation_name}")
try:
result = operation_func(*args, **kwargs)
self.report_progress(f"完成 {operation_name}")
return result
except Exception as e:
self.report_progress(f"失败 {operation_name}: {str(e)}")
raise