mxivideo/python_core/kv.py

401 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import time
from functools import wraps
from typing import Dict, Optional
from urllib.parse import quote

import httpx

from python_core.config import settings
from python_core.utils.logger import setup_logger
# Module-level logger used by retry_on_timeout and by the Kv methods in this file.
logger = setup_logger(__name__)
def retry_on_timeout(max_retries=3, base_delay=1.0, max_delay=10.0, backoff_factor=2.0):
    """Build a decorator that retries a call when it fails with an httpx timeout.

    Only ``httpx.TimeoutException`` (which covers connect, read, write and
    pool timeouts) triggers a retry.  Any other exception is logged and
    re-raised immediately.

    Args:
        max_retries: Maximum number of retries after the first attempt.
            Negative values are treated as 0 so the wrapped function is
            always called at least once.
        base_delay: Delay before the first retry, in seconds.
        max_delay: Upper bound for the computed delay, in seconds
            (jitter is added on top of this bound).
        backoff_factor: Multiplier applied to the delay after each attempt.

    Returns:
        A decorator that wraps a function with the retry behaviour.

    Raises:
        The last timeout exception once all retries are used up, or any
        non-timeout exception raised by the wrapped function.
    """
    # A negative budget used to skip the loop entirely: the wrapped function
    # was never called and ``raise None`` produced a confusing TypeError.
    max_retries = max(0, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except httpx.TimeoutException as e:
                    if attempt == max_retries:
                        logger.error(f"Function {func.__name__} failed after {max_retries} retries: {e}")
                        raise
                    # Exponential backoff capped at max_delay, plus up to 10% random jitter.
                    delay = min(base_delay * (backoff_factor ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    total_delay = delay + jitter
                    logger.warning(f"Function {func.__name__} attempt {attempt + 1} failed with timeout: {e}. Retrying in {total_delay:.2f}s...")
                    time.sleep(total_delay)
                except Exception as e:
                    # Non-timeout errors are not retried.
                    logger.error(f"Function {func.__name__} failed with non-timeout error: {e}")
                    raise

        return wrapper

    return decorator
class Kv:
    """Synchronous client for a single Cloudflare Workers KV namespace.

    Each call opens a short-lived ``httpx.Client`` and talks to the
    Cloudflare v4 REST API, using the account id, namespace id and API token
    read from ``settings``.  Methods that hit the network directly are
    wrapped in ``retry_on_timeout`` so httpx timeouts are retried.
    """

    # Declared on the class but never assigned by any method of this class.
    kv: Dict[str, str]

    # Largest number of keys sent to the bulk-delete endpoint in one request
    # (Cloudflare documents a 10,000-item cap for bulk operations).
    _BULK_LIMIT = 10000

    def __init__(self):
        """Read the Cloudflare credentials from settings and derive the base URL and headers."""
        self.cf_account_id = settings.cloudflare_account_id
        self.cf_kv_api_token = settings.cloudflare_api_key
        self.cf_kv_id = settings.cloudflare_kv_id
        self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{self.cf_account_id}/storage/kv/namespaces/{self.cf_kv_id}"
        self.headers = {"Authorization": f"Bearer {self.cf_kv_api_token}"}

    def _key_url(self, section: str, key: str) -> str:
        """Return the URL of a per-key endpoint with the key percent-encoded.

        Without encoding, a key containing ``/``, ``?``, ``#`` or ``%`` would
        be parsed as URL structure and address a different resource.
        """
        return f"{self.base_url}/{section}/{quote(str(key), safe='')}"

    @staticmethod
    def _expiry_fields(expiration_ttl: Optional[int], expiration: Optional[int]) -> dict:
        """Return only the expiration options that were actually supplied."""
        fields = {}
        if expiration_ttl is not None:
            fields["expiration_ttl"] = expiration_ttl
        if expiration is not None:
            fields["expiration"] = expiration
        return fields

    @staticmethod
    def _log_failure(exc: Exception, action: str, direction: str) -> None:
        """Log a failed request, with wording that depends on the error category."""
        if isinstance(exc, httpx.RequestError):
            logger.error(f"An error occurred while {action} {direction} cloudflare: {exc}")
        elif isinstance(exc, httpx.HTTPStatusError):
            logger.error(f"HTTP error occurred while {action} {direction} cloudflare: {exc}")
        else:
            logger.error(f"An unexpected error occurred while {action}: {exc}")

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def get(self, key: str, default=None):
        """Fetch the value stored under one key.

        Args:
            key: Key to read.
            default: Value returned when the API answers 404 for the key.

        Returns:
            The response body as text, or ``default`` on a 404.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On any non-404 error status.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.get(self._key_url("values", key), headers=self.headers)
                if response.status_code == 404:
                    return default
                response.raise_for_status()
                return response.text
        except Exception as e:
            self._log_failure(e, f"getting key '{key}'", "from")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def set(self, key: str, value: str, expiration_ttl: Optional[int] = None, expiration: Optional[int] = None):
        """Store one key/value pair.

        Args:
            key: Key to write.
            value: Value sent as the request body.
            expiration_ttl: Optional time-to-live in seconds; omitted from
                the request when ``None``.
            expiration: Optional absolute expiry as a Unix timestamp;
                omitted from the request when ``None``.

        Returns:
            The decoded JSON response, or ``{"success": True}`` when the
            response body is empty.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On an error status.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.put(
                    self._key_url("values", key),
                    headers=self.headers,
                    params=self._expiry_fields(expiration_ttl, expiration),
                    content=value,
                )
                response.raise_for_status()
                return response.json() if response.text else {"success": True}
        except Exception as e:
            self._log_failure(e, f"setting key '{key}'", "to")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def remove(self, key: str):
        """Delete one key.

        Args:
            key: Key to delete.

        Returns:
            The decoded JSON response; ``{"success": True}`` when the body is
            empty; or a success dict with ``"Key not found"`` on a 404.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On any non-404 error status.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.delete(self._key_url("values", key), headers=self.headers)
                if response.status_code == 404:
                    return {"success": True, "message": "Key not found"}
                response.raise_for_status()
                return response.json() if response.text else {"success": True}
        except Exception as e:
            self._log_failure(e, f"removing key '{key}'", "from")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def sets(self, caches: Dict[str, str], expiration_ttl: Optional[int] = None, expiration: Optional[int] = None):
        """Store several key/value pairs with one bulk request.

        Args:
            caches: Mapping of key to value.
            expiration_ttl: Optional time-to-live in seconds applied to every
                pair; omitted when ``None``.
            expiration: Optional absolute expiry (Unix timestamp) applied to
                every pair; omitted when ``None``.

        Returns:
            The decoded JSON response.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On an error status.
        """
        try:
            expiry = self._expiry_fields(expiration_ttl, expiration)
            bulk_data = [
                {"base64": False, "key": key, "value": value, **expiry}
                for key, value in caches.items()
            ]
            # Bulk operations get a longer timeout than single-key calls.
            with httpx.Client(timeout=60.0) as client:
                response = client.put(
                    f"{self.base_url}/bulk",
                    headers=self.headers,
                    json=bulk_data,
                )
                response.raise_for_status()
                return response.json()
        except Exception as e:
            self._log_failure(e, "bulk setting keys", "to")
            raise

    def gets(self, keys: list[str]):
        """Fetch several keys, one request per key.

        Args:
            keys: Keys to read.

        Returns:
            A dict of key to value containing only the keys for which
            ``get`` returned something other than ``None``.

        Raises:
            Whatever ``get`` raises for the first failing key.
        """
        try:
            # Keys are fetched individually instead of through a bulk read call.
            result = {}
            for key in keys:
                value = self.get(key)
                if value is not None:
                    result[key] = value
            return result
        except Exception as e:
            logger.error(f"An error occurred while bulk getting keys from cloudflare: {e}")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def removes(self, keys: list[str]):
        """Delete several keys with one bulk request.

        Args:
            keys: Keys to delete; sent as the JSON body in a single request.

        Returns:
            The decoded JSON response.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On an error status.
        """
        try:
            # Bulk operations get a longer timeout than single-key calls.
            with httpx.Client(timeout=60.0) as client:
                response = client.post(
                    f"{self.base_url}/bulk/delete",
                    headers=self.headers,
                    json=keys,
                )
                response.raise_for_status()
                return response.json()
        except Exception as e:
            self._log_failure(e, "bulk removing keys", "from")
            raise

    def exists(self, key: str) -> bool:
        """Report whether a key exists, without ever raising.

        Timeouts are retried by the private probe before giving up, so a
        failing check can take longer than a single request.

        Args:
            key: Key to check.

        Returns:
            ``True`` only when the API answers 200 for the key; ``False`` for
            any other status or when the request fails.
        """
        try:
            return self._probe(key)
        except Exception as e:
            # Best-effort contract: callers get False instead of an exception,
            # but the failure is no longer silent.
            logger.warning(f"Could not check whether key '{key}' exists, assuming it does not: {e}")
            return False

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def _probe(self, key: str) -> bool:
        """Issue the GET used by ``exists``; exceptions propagate so timeouts can be retried."""
        with httpx.Client(timeout=30.0) as client:
            response = client.get(self._key_url("values", key), headers=self.headers)
        return response.status_code == 200

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def list_keys(self, prefix: Optional[str] = None, limit: int = 1000, cursor: Optional[str] = None):
        """List keys in the namespace.

        Args:
            prefix: Optional key-name prefix filter; omitted when falsy.
            limit: Maximum number of keys requested (default 1000).
            cursor: Optional pagination cursor; omitted when falsy.

        Returns:
            The decoded JSON response (key list plus pagination info).

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On an error status.
        """
        try:
            params = {"limit": limit}
            if prefix:
                params["prefix"] = prefix
            if cursor:
                params["cursor"] = cursor
            with httpx.Client(timeout=30.0) as client:
                response = client.get(
                    f"{self.base_url}/keys",
                    headers=self.headers,
                    params=params,
                )
                response.raise_for_status()
                return response.json()
        except Exception as e:
            self._log_failure(e, "listing keys", "from")
            raise

    @retry_on_timeout(max_retries=3, base_delay=1.0)
    def get_metadata(self, key: str):
        """Fetch the metadata stored for one key.

        Args:
            key: Key whose metadata is requested.

        Returns:
            The decoded JSON response, or ``None`` on a 404.

        Raises:
            httpx.RequestError: On transport failures (timeouts are retried first).
            httpx.HTTPStatusError: On any non-404 error status.
        """
        try:
            with httpx.Client(timeout=30.0) as client:
                response = client.get(self._key_url("metadata", key), headers=self.headers)
                if response.status_code == 404:
                    return None
                response.raise_for_status()
                return response.json()
        except Exception as e:
            self._log_failure(e, f"getting metadata for key '{key}'", "from")
            raise

    def clear_all(self, prefix: Optional[str] = None):
        """Delete every key in the namespace (dangerous!).

        Args:
            prefix: When given, only keys with this prefix are deleted.

        Returns:
            The response of the last bulk-delete request, or a success dict
            with ``"No keys to delete"`` when nothing matched.

        Raises:
            Whatever ``list_keys`` or ``removes`` raises.
        """
        try:
            # Collect every key name first, following the pagination cursor.
            all_keys = []
            cursor = None
            while True:
                page = self.list_keys(prefix=prefix, limit=1000, cursor=cursor)
                entries = page.get("result", [])
                if not entries:
                    break
                for entry in entries:
                    name = entry.get("name") if isinstance(entry, dict) else str(entry)
                    # Skip entries without a name rather than sending null to the API.
                    if name is not None:
                        all_keys.append(name)
                # A missing or null result_info means there is no further page.
                cursor = (page.get("result_info") or {}).get("cursor")
                if not cursor:
                    break

            if not all_keys:
                return {"success": True, "message": "No keys to delete"}

            # Delete in batches so a large namespace stays under the bulk cap.
            result = None
            for start in range(0, len(all_keys), self._BULK_LIMIT):
                result = self.removes(all_keys[start:start + self._BULK_LIMIT])
            return result
        except Exception as e:
            logger.error(f"An error occurred while clearing keys: {e}")
            raise
# Shared module-level client; constructing it reads the Cloudflare settings when this module is imported.
kv = Kv()