221 lines
6.7 KiB
Python
221 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
服务基类
|
||
"""
|
||
|
||
from abc import ABC, abstractmethod
|
||
from typing import Any, Dict, Optional, Callable
|
||
from python_core.storage import StorageInterface, get_storage
|
||
from python_core.utils.logger import logger
|
||
|
||
class ServiceBase(ABC):
|
||
"""服务基类"""
|
||
|
||
def __init__(self, storage: Optional[StorageInterface] = None):
|
||
"""初始化服务
|
||
|
||
Args:
|
||
storage: 存储接口实例,如果为None则使用默认存储
|
||
"""
|
||
self.storage = storage or get_storage(self.get_service_name())
|
||
logger.info(f"Service {self.get_service_name()} initialized with storage: {type(self.storage).__name__}")
|
||
|
||
@abstractmethod
|
||
def get_service_name(self) -> str:
|
||
"""获取服务名称
|
||
|
||
Returns:
|
||
str: 服务名称,用作存储键前缀
|
||
"""
|
||
pass
|
||
|
||
def get_collection_name(self, collection_type: str) -> str:
|
||
"""获取集合名称
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
|
||
Returns:
|
||
str: 完整的集合名称
|
||
"""
|
||
return f"{self.get_service_name()}_{collection_type}"
|
||
|
||
def save_data(self, collection_type: str, key: str, data: Any) -> bool:
|
||
"""保存数据
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
key: 数据键
|
||
data: 要保存的数据
|
||
|
||
Returns:
|
||
bool: 保存是否成功
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.save(collection, key, data)
|
||
|
||
def load_data(self, collection_type: str, key: str) -> Any:
|
||
"""加载数据
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
key: 数据键
|
||
|
||
Returns:
|
||
Any: 加载的数据,不存在时返回None
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.load(collection, key)
|
||
|
||
def delete_data(self, collection_type: str, key: str) -> bool:
|
||
"""删除数据
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
key: 数据键
|
||
|
||
Returns:
|
||
bool: 删除是否成功
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.delete(collection, key)
|
||
|
||
def exists_data(self, collection_type: str, key: str) -> bool:
|
||
"""检查数据是否存在
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
key: 数据键
|
||
|
||
Returns:
|
||
bool: 数据是否存在
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.exists(collection, key)
|
||
|
||
def list_keys(self, collection_type: str, pattern: str = "*") -> list[str]:
|
||
"""列出集合中的所有键
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
pattern: 键的模式匹配
|
||
|
||
Returns:
|
||
List[str]: 键列表
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.list_keys(collection, pattern)
|
||
|
||
def save_batch_data(self, collection_type: str, data: Dict[str, Any]) -> bool:
|
||
"""批量保存数据
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
data: 键值对数据
|
||
|
||
Returns:
|
||
bool: 保存是否成功
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.save_batch(collection, data)
|
||
|
||
def load_batch_data(self, collection_type: str, keys: list[str]) -> Dict[str, Any]:
|
||
"""批量加载数据
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
keys: 键列表
|
||
|
||
Returns:
|
||
Dict[str, Any]: 键值对数据
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.load_batch(collection, keys)
|
||
|
||
def clear_collection(self, collection_type: str) -> bool:
|
||
"""清空集合
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
|
||
Returns:
|
||
bool: 清空是否成功
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.clear_collection(collection)
|
||
|
||
def get_collection_stats(self, collection_type: str) -> Dict[str, Any]:
|
||
"""获取集合统计信息
|
||
|
||
Args:
|
||
collection_type: 集合类型
|
||
|
||
Returns:
|
||
Dict[str, Any]: 统计信息
|
||
"""
|
||
collection = self.get_collection_name(collection_type)
|
||
return self.storage.get_stats(collection)
|
||
|
||
def get_all_collections(self) -> list[str]:
|
||
"""获取服务的所有集合
|
||
|
||
Returns:
|
||
List[str]: 集合名称列表
|
||
"""
|
||
all_collections = self.storage.get_collections()
|
||
service_prefix = f"{self.get_service_name()}_"
|
||
return [c for c in all_collections if c.startswith(service_prefix)]
|
||
|
||
def close(self):
|
||
"""关闭服务"""
|
||
if hasattr(self.storage, 'close'):
|
||
self.storage.close()
|
||
logger.info(f"Service {self.get_service_name()} closed")
|
||
|
||
class ProgressServiceBase(ServiceBase):
|
||
"""支持进度回调的服务基类"""
|
||
|
||
def __init__(self, storage: Optional[StorageInterface] = None):
|
||
super().__init__(storage)
|
||
self._progress_callback: Optional[Callable[[str], None]] = None
|
||
|
||
def set_progress_callback(self, callback: Optional[Callable[[str], None]]):
|
||
"""设置进度回调函数
|
||
|
||
Args:
|
||
callback: 进度回调函数,接收进度消息字符串
|
||
"""
|
||
self._progress_callback = callback
|
||
|
||
def report_progress(self, message: str):
|
||
"""报告进度
|
||
|
||
Args:
|
||
message: 进度消息
|
||
"""
|
||
if self._progress_callback:
|
||
self._progress_callback(message)
|
||
logger.debug(f"Progress: {message}")
|
||
|
||
def execute_with_progress(self, operation_name: str, operation_func: Callable, *args, **kwargs) -> Any:
|
||
"""执行带进度报告的操作
|
||
|
||
Args:
|
||
operation_name: 操作名称
|
||
operation_func: 操作函数
|
||
*args: 操作函数参数
|
||
**kwargs: 操作函数关键字参数
|
||
|
||
Returns:
|
||
Any: 操作结果
|
||
"""
|
||
self.report_progress(f"开始 {operation_name}")
|
||
|
||
try:
|
||
result = operation_func(*args, **kwargs)
|
||
self.report_progress(f"完成 {operation_name}")
|
||
return result
|
||
except Exception as e:
|
||
self.report_progress(f"失败 {operation_name}: {str(e)}")
|
||
raise
|