401 lines
14 KiB
Python
401 lines
14 KiB
Python
|
||
from python_core.config import settings
|
||
import httpx
|
||
from python_core.utils.logger import setup_logger
|
||
from typing import Dict
|
||
import time
|
||
import random
|
||
from functools import wraps
|
||
|
||
logger = setup_logger(__name__)
|
||
|
||
def retry_on_timeout(max_retries=3, base_delay=1.0, max_delay=10.0, backoff_factor=2.0):
|
||
"""
|
||
重试装饰器,用于处理网络超时等临时性错误
|
||
|
||
Args:
|
||
max_retries: 最大重试次数
|
||
base_delay: 基础延迟时间(秒)
|
||
max_delay: 最大延迟时间(秒)
|
||
backoff_factor: 退避因子
|
||
"""
|
||
def decorator(func):
|
||
@wraps(func)
|
||
def wrapper(*args, **kwargs):
|
||
last_exception = None
|
||
|
||
for attempt in range(max_retries + 1):
|
||
try:
|
||
return func(*args, **kwargs)
|
||
except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.TimeoutException) as e:
|
||
last_exception = e
|
||
if attempt == max_retries:
|
||
logger.error(f"Function {func.__name__} failed after {max_retries} retries: {e}")
|
||
raise e
|
||
|
||
# 计算延迟时间(指数退避 + 随机抖动)
|
||
delay = min(base_delay * (backoff_factor ** attempt), max_delay)
|
||
jitter = random.uniform(0, delay * 0.1) # 10% 随机抖动
|
||
total_delay = delay + jitter
|
||
|
||
logger.warning(f"Function {func.__name__} attempt {attempt + 1} failed with timeout: {e}. Retrying in {total_delay:.2f}s...")
|
||
time.sleep(total_delay)
|
||
except Exception as e:
|
||
# 对于非超时错误,直接抛出
|
||
logger.error(f"Function {func.__name__} failed with non-timeout error: {e}")
|
||
raise e
|
||
|
||
# 如果所有重试都失败了,抛出最后一个异常
|
||
raise last_exception
|
||
return wrapper
|
||
return decorator
|
||
|
||
class Kv:
|
||
kv: Dict[str, str]
|
||
def __init__(self):
|
||
self.cf_account_id = settings.cloudflare_account_id
|
||
self.cf_kv_api_token = settings.cloudflare_api_key
|
||
self.cf_kv_id = settings.cloudflare_kv_id
|
||
self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{self.cf_account_id}/storage/kv/namespaces/{self.cf_kv_id}"
|
||
self.headers = {"Authorization": f"Bearer {self.cf_kv_api_token}"}
|
||
|
||
@retry_on_timeout(max_retries=3, base_delay=1.0)
|
||
def get(self, key: str, default=None):
|
||
"""
|
||
获取单个键的值
|
||
|
||
Args:
|
||
key: 要获取的键
|
||
default: 如果键不存在时返回的默认值
|
||
|
||
Returns:
|
||
键对应的值,如果不存在则返回default
|
||
"""
|
||
try:
|
||
with httpx.Client(timeout=30.0) as client:
|
||
response = client.get(
|
||
f"{self.base_url}/values/{key}",
|
||
headers=self.headers
|
||
)
|
||
if response.status_code == 404:
|
||
return default
|
||
response.raise_for_status()
|
||
return response.text
|
||
except httpx.RequestError as e:
|
||
logger.error(f"An error occurred while getting key '{key}' from cloudflare: {e}")
|
||
raise e
|
||
except httpx.HTTPStatusError as e:
|
||
if e.response.status_code == 404:
|
||
return default
|
||
logger.error(f"HTTP error occurred while getting key '{key}' from cloudflare: {e}")
|
||
raise e
|
||
except Exception as e:
|
||
logger.error(f"An unexpected error occurred while getting key '{key}': {e}")
|
||
raise e
|
||
|
||
@retry_on_timeout(max_retries=3, base_delay=1.0)
|
||
def set(self, key: str, value: str):
|
||
"""
|
||
设置单个键值对
|
||
|
||
Args:
|
||
key: 键名
|
||
value: 值
|
||
expiration_ttl: TTL过期时间(秒)
|
||
expiration: 绝对过期时间(Unix时间戳)
|
||
|
||
Returns:
|
||
操作结果
|
||
"""
|
||
try:
|
||
with httpx.Client(timeout=30.0) as client:
|
||
params = {}
|
||
|
||
response = client.put(
|
||
f"{self.base_url}/values/{key}",
|
||
headers=self.headers,
|
||
params=params,
|
||
content=value
|
||
)
|
||
response.raise_for_status()
|
||
return response.json() if response.text else {"success": True}
|
||
except httpx.RequestError as e:
|
||
logger.error(f"An error occurred while setting key '{key}' to cloudflare: {e}")
|
||
raise e
|
||
except httpx.HTTPStatusError as e:
|
||
logger.error(f"HTTP error occurred while setting key '{key}' to cloudflare: {e}")
|
||
raise e
|
||
except Exception as e:
|
||
logger.error(f"An unexpected error occurred while setting key '{key}': {e}")
|
||
raise e
|
||
|
||
@retry_on_timeout(max_retries=3, base_delay=1.0)
|
||
def remove(self, key: str):
|
||
"""
|
||
删除单个键
|
||
|
||
Args:
|
||
key: 要删除的键名
|
||
|
||
Returns:
|
||
操作结果
|
||
"""
|
||
try:
|
||
with httpx.Client(timeout=30.0) as client:
|
||
response = client.delete(
|
||
f"{self.base_url}/values/{key}",
|
||
headers=self.headers
|
||
)
|
||
if response.status_code == 404:
|
||
return {"success": True, "message": "Key not found"}
|
||
response.raise_for_status()
|
||
return response.json() if response.text else {"success": True}
|
||
except httpx.RequestError as e:
|
||
logger.error(f"An error occurred while removing key '{key}' from cloudflare: {e}")
|
||
raise e
|
||
except httpx.HTTPStatusError as e:
|
||
if e.response.status_code == 404:
|
||
return {"success": True, "message": "Key not found"}
|
||
logger.error(f"HTTP error occurred while removing key '{key}' from cloudflare: {e}")
|
||
raise e
|
||
except Exception as e:
|
||
logger.error(f"An unexpected error occurred while removing key '{key}': {e}")
|
||
raise e
|
||
|
||
@retry_on_timeout(max_retries=3, base_delay=1.0)
|
||
def sets(self, caches: Dict[str, str]):
|
||
"""
|
||
批量设置多个键值对
|
||
|
||
Args:
|
||
caches: 包含键值对的字典
|
||
expiration_ttl: TTL过期时间(秒)
|
||
expiration: 绝对过期时间(Unix时间戳)
|
||
|
||
Returns:
|
||
操作结果
|
||
"""
|
||
try:
|
||
with httpx.Client(timeout=60.0) as client: # 批量操作使用更长的超时时间
|
||
bulk_data = []
|
||
for key, value in caches.items():
|
||
item = {
|
||
"base64": False,
|
||
"key": key,
|
||
"value": value,
|
||
}
|
||
bulk_data.append(item)
|
||
|
||
response = client.put(
|
||
f"{self.base_url}/bulk",
|
||
headers=self.headers,
|
||
json=bulk_data
|
||
)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
except httpx.RequestError as e:
|
||
logger.error(f"An error occurred while bulk setting keys to cloudflare: {e}")
|
||
raise e
|
||
except httpx.HTTPStatusError as e:
|
||
logger.error(f"HTTP error occurred while bulk setting keys to cloudflare: {e}")
|
||
raise e
|
||
except Exception as e:
|
||
logger.error(f"An unexpected error occurred while bulk setting keys: {e}")
|
||
raise e
|
||
|
||
def gets(self, keys: list[str]):
|
||
"""
|
||
批量获取多个键的值
|
||
|
||
Args:
|
||
keys: 要获取的键列表
|
||
|
||
Returns:
|
||
包含键值对的字典
|
||
"""
|
||
try:
|
||
# 由于Cloudflare KV的批量获取API有问题,我们使用单个获取的方式
|
||
result = {}
|
||
for key in keys:
|
||
value = self.get(key)
|
||
if value is not None:
|
||
result[key] = value
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"An error occurred while bulk getting keys from cloudflare: {e}")
|
||
raise e
|
||
|
||
@retry_on_timeout(max_retries=3, base_delay=1.0)
|
||
def removes(self, keys: list[str]):
|
||
"""
|
||
批量删除多个键
|
||
|
||
Args:
|
||
keys: 要删除的键列表
|
||
|
||
Returns:
|
||
操作结果
|
||
"""
|
||
try:
|
||
with httpx.Client(timeout=60.0) as client: # 批量操作使用更长的超时时间
|
||
response = client.post(
|
||
f"{self.base_url}/bulk/delete",
|
||
headers=self.headers,
|
||
json=keys
|
||
)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
except httpx.RequestError as e:
|
||
logger.error(f"An error occurred while bulk removing keys from cloudflare: {e}")
|
||
raise e
|
||
except httpx.HTTPStatusError as e:
|
||
logger.error(f"HTTP error occurred while bulk removing keys from cloudflare: {e}")
|
||
raise e
|
||
except Exception as e:
|
||
logger.error(f"An unexpected error occurred while bulk removing keys: {e}")
|
||
raise e
|
||
|
||
@retry_on_timeout(max_retries=3, base_delay=1.0)
|
||
def exists(self, key: str) -> bool:
|
||
"""
|
||
检查键是否存在
|
||
|
||
Args:
|
||
key: 要检查的键名
|
||
|
||
Returns:
|
||
如果键存在返回True,否则返回False
|
||
"""
|
||
try:
|
||
with httpx.Client(timeout=30.0) as client:
|
||
response = client.get(
|
||
f"{self.base_url}/values/{key}",
|
||
headers=self.headers
|
||
)
|
||
return response.status_code == 200
|
||
except httpx.HTTPStatusError as e:
|
||
if e.response.status_code == 404:
|
||
return False
|
||
raise e
|
||
except Exception:
|
||
return False
|
||
|
||
@retry_on_timeout(max_retries=3, base_delay=1.0)
|
||
def list_keys(self, prefix: str = None, limit: int = 1000, cursor: str = None):
|
||
"""
|
||
列出KV存储中的键
|
||
|
||
Args:
|
||
prefix: 键名前缀过滤
|
||
limit: 返回的最大键数量(默认1000)
|
||
cursor: 分页游标
|
||
|
||
Returns:
|
||
包含键列表和分页信息的字典
|
||
"""
|
||
try:
|
||
with httpx.Client(timeout=30.0) as client:
|
||
params = {"limit": limit}
|
||
if prefix:
|
||
params["prefix"] = prefix
|
||
if cursor:
|
||
params["cursor"] = cursor
|
||
|
||
response = client.get(
|
||
f"{self.base_url}/keys",
|
||
headers=self.headers,
|
||
params=params
|
||
)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
except httpx.RequestError as e:
|
||
logger.error(f"An error occurred while listing keys from cloudflare: {e}")
|
||
raise e
|
||
except httpx.HTTPStatusError as e:
|
||
logger.error(f"HTTP error occurred while listing keys from cloudflare: {e}")
|
||
raise e
|
||
except Exception as e:
|
||
logger.error(f"An unexpected error occurred while listing keys: {e}")
|
||
raise e
|
||
|
||
@retry_on_timeout(max_retries=3, base_delay=1.0)
|
||
def get_metadata(self, key: str):
|
||
"""
|
||
获取键的元数据
|
||
|
||
Args:
|
||
key: 键名
|
||
|
||
Returns:
|
||
键的元数据信息
|
||
"""
|
||
try:
|
||
with httpx.Client(timeout=30.0) as client:
|
||
response = client.get(
|
||
f"{self.base_url}/metadata/{key}",
|
||
headers=self.headers
|
||
)
|
||
if response.status_code == 404:
|
||
return None
|
||
response.raise_for_status()
|
||
return response.json()
|
||
except httpx.RequestError as e:
|
||
logger.error(f"An error occurred while getting metadata for key '{key}' from cloudflare: {e}")
|
||
raise e
|
||
except httpx.HTTPStatusError as e:
|
||
if e.response.status_code == 404:
|
||
return None
|
||
logger.error(f"HTTP error occurred while getting metadata for key '{key}' from cloudflare: {e}")
|
||
raise e
|
||
except Exception as e:
|
||
logger.error(f"An unexpected error occurred while getting metadata for key '{key}': {e}")
|
||
raise e
|
||
|
||
def clear_all(self, prefix: str = None):
|
||
"""
|
||
清空所有键(危险操作!)
|
||
|
||
Args:
|
||
prefix: 如果提供,只删除指定前缀的键
|
||
|
||
Returns:
|
||
操作结果
|
||
"""
|
||
try:
|
||
# 首先获取所有键
|
||
all_keys = []
|
||
cursor = None
|
||
|
||
while True:
|
||
result = self.list_keys(prefix=prefix, limit=1000, cursor=cursor)
|
||
keys_data = result.get("result", [])
|
||
|
||
if not keys_data:
|
||
break
|
||
|
||
# 提取键名
|
||
for key_info in keys_data:
|
||
if isinstance(key_info, dict):
|
||
all_keys.append(key_info.get("name"))
|
||
else:
|
||
all_keys.append(str(key_info))
|
||
|
||
# 检查是否有更多数据
|
||
result_info = result.get("result_info", {})
|
||
cursor = result_info.get("cursor")
|
||
if not cursor:
|
||
break
|
||
|
||
# 批量删除
|
||
if all_keys:
|
||
return self.removes(all_keys)
|
||
else:
|
||
return {"success": True, "message": "No keys to delete"}
|
||
|
||
except Exception as e:
|
||
logger.error(f"An error occurred while clearing keys: {e}")
|
||
raise e
|
||
|
||
|
||
kv = Kv() |