import random
import time
from functools import wraps
from typing import Dict, Optional
from urllib.parse import quote

import httpx

from python_core.config import settings
from python_core.utils.logger import setup_logger

logger = setup_logger(__name__)

# Maximum number of keys sent to the bulk-delete endpoint in one request.
# NOTE(review): Cloudflare documents a 10,000-entry cap per bulk call —
# confirm against the current API reference.
_BULK_BATCH_SIZE = 10_000


def retry_on_timeout(max_retries=3, base_delay=1.0, max_delay=10.0, backoff_factor=2.0):
    """
    Decorator that retries the wrapped callable when httpx reports a timeout.

    Only ``httpx.TimeoutException`` (the base class of ``ConnectTimeout`` and
    ``ReadTimeout``) triggers a retry; any other exception is logged and
    re-raised immediately.

    Args:
        max_retries: Number of retries after the first attempt (must be >= 0).
        base_delay: Delay before the first retry, in seconds.
        max_delay: Upper bound for the backoff delay, in seconds.
        backoff_factor: Multiplier applied to the delay for each further attempt.

    Raises:
        ValueError: If ``max_retries`` is negative (the wrapped function would
            otherwise never be called).
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # The final iteration always returns or raises, so the loop never
            # falls through.
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except httpx.TimeoutException as e:
                    if attempt == max_retries:
                        logger.error(f"Function {func.__name__} failed after {max_retries} retries: {e}")
                        raise

                    # Exponential backoff capped at max_delay, plus up to 10% jitter.
                    delay = min(base_delay * (backoff_factor ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    total_delay = delay + jitter

                    logger.warning(
                        f"Function {func.__name__} attempt {attempt + 1} failed with timeout: {e}. "
                        f"Retrying in {total_delay:.2f}s..."
                    )
                    time.sleep(total_delay)
                except Exception as e:
                    # Non-timeout errors are not retried.
                    logger.error(f"Function {func.__name__} failed with non-timeout error: {e}")
                    raise

        return wrapper

    return decorator


class Kv:
    """Thin synchronous client for a single Cloudflare KV namespace."""

    kv: Dict[str, str]

    def __init__(self):
        self.cf_account_id = settings.cloudflare_account_id
        self.cf_kv_api_token = settings.cloudflare_api_key
        self.cf_kv_id = settings.cloudflare_kv_id
        self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{self.cf_account_id}/storage/kv/namespaces/{self.cf_kv_id}"
        self.headers = {"Authorization": f"Bearer {self.cf_kv_api_token}"}

    def _key_url(self, segment: str, key: str) -> str:
        """Build ``<base_url>/<segment>/<key>`` with the key percent-encoded."""
        # safe="" also encodes "/", so a key can never add path segments or
        # start a query string / fragment.
        return f"{self.base_url}/{segment}/{quote(str(key), safe='')}"

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def get(self, key: str, default=None):
        """
        Fetch the value stored under a single key.

        Args:
            key: Key to read.
            default: Value returned when the API answers 404 for the key.

        Returns:
            The response body as text, or ``default`` if the key is not found.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On non-404 error statuses.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.get(self._key_url("values", key), headers=self.headers)
                if response.status_code == 404:
                    return default
                response.raise_for_status()
                return response.text
        except httpx.RequestError as e:
            logger.error(f"An error occurred while getting key '{key}' from cloudflare: {e}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred while getting key '{key}' from cloudflare: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred while getting key '{key}': {e}")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def set(self, key: str, value: str):
        """
        Store a single key/value pair.

        Args:
            key: Key name.
            value: Value sent as the raw request body.

        Returns:
            The parsed JSON response, or ``{"success": True}`` when the
            response body is empty.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On error statuses.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.put(
                    self._key_url("values", key),
                    headers=self.headers,
                    content=value,
                )
                response.raise_for_status()
                return response.json() if response.text else {"success": True}
        except httpx.RequestError as e:
            logger.error(f"An error occurred while setting key '{key}' to cloudflare: {e}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred while setting key '{key}' to cloudflare: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred while setting key '{key}': {e}")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def remove(self, key: str):
        """
        Delete a single key.

        Args:
            key: Key to delete.

        Returns:
            The parsed JSON response, ``{"success": True}`` when the response
            body is empty, or a success dict with a "Key not found" message
            when the API answers 404.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On non-404 error statuses.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.delete(self._key_url("values", key), headers=self.headers)
                if response.status_code == 404:
                    return {"success": True, "message": "Key not found"}
                response.raise_for_status()
                return response.json() if response.text else {"success": True}
        except httpx.RequestError as e:
            logger.error(f"An error occurred while removing key '{key}' from cloudflare: {e}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred while removing key '{key}' from cloudflare: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred while removing key '{key}': {e}")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def sets(self, caches: Dict[str, str]):
        """
        Store several key/value pairs with one bulk request.

        Args:
            caches: Mapping of key -> value to write.

        Returns:
            The parsed JSON response.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On error statuses.
        """
        try:
            # Bulk operations get a longer timeout than single-key calls.
            with httpx.Client(timeout=60.0) as client:
                bulk_data = [
                    {"base64": False, "key": key, "value": value}
                    for key, value in caches.items()
                ]
                response = client.put(
                    f"{self.base_url}/bulk",
                    headers=self.headers,
                    json=bulk_data,
                )
                response.raise_for_status()
                return response.json()
        except httpx.RequestError as e:
            logger.error(f"An error occurred while bulk setting keys to cloudflare: {e}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred while bulk setting keys to cloudflare: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred while bulk setting keys: {e}")
            raise

    def gets(self, keys: list[str]):
        """
        Fetch the values of several keys.

        Keys are read one at a time through ``get`` rather than through a
        bulk endpoint.

        Args:
            keys: Keys to read.

        Returns:
            Dict of key -> value containing only the keys for which ``get``
            returned something other than ``None``.
        """
        try:
            result = {}
            for key in keys:
                value = self.get(key)
                if value is not None:
                    result[key] = value
            return result
        except Exception as e:
            logger.error(f"An error occurred while bulk getting keys from cloudflare: {e}")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def removes(self, keys: list[str]):
        """
        Delete several keys with one bulk request.

        Args:
            keys: Keys to delete.

        Returns:
            The parsed JSON response.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On error statuses.
        """
        try:
            # Bulk operations get a longer timeout than single-key calls.
            with httpx.Client(timeout=60.0) as client:
                response = client.post(
                    f"{self.base_url}/bulk/delete",
                    headers=self.headers,
                    json=keys,
                )
                response.raise_for_status()
                return response.json()
        except httpx.RequestError as e:
            logger.error(f"An error occurred while bulk removing keys from cloudflare: {e}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred while bulk removing keys from cloudflare: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred while bulk removing keys: {e}")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def _value_status(self, key: str) -> int:
        """Return the HTTP status of a GET on the key's value (timeouts are retried)."""
        with httpx.Client(timeout=30.0) as client:
            response = client.get(self._key_url("values", key), headers=self.headers)
            return response.status_code

    def exists(self, key: str) -> bool:
        """
        Check whether a key exists.

        This is a best-effort check that never raises: any failure, including
        a timeout that persists after the retries, is logged and reported as
        ``False``.

        Args:
            key: Key to check.

        Returns:
            True only if the value endpoint answers with status 200.
        """
        try:
            # Retrying happens inside the helper; catching here keeps the
            # retry decorator able to see timeouts before they are swallowed.
            return self._value_status(key) == 200
        except Exception as e:
            logger.warning(f"Could not determine whether key '{key}' exists: {e}")
            return False

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def list_keys(self, prefix: Optional[str] = None, limit: int = 1000, cursor: Optional[str] = None):
        """
        List keys in the namespace.

        Args:
            prefix: Only list keys starting with this prefix (omitted if falsy).
            limit: Maximum number of keys to return (default 1000).
            cursor: Pagination cursor from a previous call (omitted if falsy).

        Returns:
            The parsed JSON response.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On error statuses.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                params = {"limit": limit}
                if prefix:
                    params["prefix"] = prefix
                if cursor:
                    params["cursor"] = cursor

                response = client.get(
                    f"{self.base_url}/keys",
                    headers=self.headers,
                    params=params,
                )
                response.raise_for_status()
                return response.json()
        except httpx.RequestError as e:
            logger.error(f"An error occurred while listing keys from cloudflare: {e}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred while listing keys from cloudflare: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred while listing keys: {e}")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def get_metadata(self, key: str):
        """
        Fetch the metadata attached to a key.

        Args:
            key: Key name.

        Returns:
            The parsed JSON response, or ``None`` when the API answers 404.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On non-404 error statuses.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.get(self._key_url("metadata", key), headers=self.headers)
                if response.status_code == 404:
                    return None
                response.raise_for_status()
                return response.json()
        except httpx.RequestError as e:
            logger.error(f"An error occurred while getting metadata for key '{key}' from cloudflare: {e}")
            raise
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred while getting metadata for key '{key}' from cloudflare: {e}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred while getting metadata for key '{key}': {e}")
            raise

    def clear_all(self, prefix: Optional[str] = None):
        """
        Delete every key in the namespace (dangerous!).

        All matching keys are listed page by page first, then deleted through
        ``removes`` in batches of at most ``_BULK_BATCH_SIZE`` keys.

        Args:
            prefix: If given, only keys with this prefix are deleted.

        Returns:
            The response of the last bulk delete, or a success dict with a
            "No keys to delete" message when nothing matched.
        """
        try:
            all_keys = []
            cursor = None

            while True:
                page = self.list_keys(prefix=prefix, limit=1000, cursor=cursor)
                keys_data = page.get("result", [])
                if not keys_data:
                    break

                for key_info in keys_data:
                    name = key_info.get("name") if isinstance(key_info, dict) else str(key_info)
                    # Skip malformed entries rather than sending null to the API.
                    if name is not None:
                        all_keys.append(name)

                # "result_info" may be missing or null on the last page.
                cursor = (page.get("result_info") or {}).get("cursor")
                if not cursor:
                    break

            if not all_keys:
                return {"success": True, "message": "No keys to delete"}

            result = None
            for start in range(0, len(all_keys), _BULK_BATCH_SIZE):
                result = self.removes(all_keys[start:start + _BULK_BATCH_SIZE])
            return result
        except Exception as e:
            logger.error(f"An error occurred while clearing keys: {e}")
            raise


kv = Kv()