Add helper methods for cache backend in sync and async versions

Co-authored-by: jxxghp <jxxghp@live.cn>
This commit is contained in:
Cursor Agent
2025-08-23 03:58:04 +00:00
parent 57a48f099f
commit b5ca7058c2
2 changed files with 62 additions and 1 deletions

View File

@@ -419,6 +419,44 @@ class AsyncCacheBackend(ABC):
return default
return value
def get_region(self, region: Optional[str] = None) -> str:
"""
获取缓存区域名称
:param region: 缓存区域名称
:return: 缓存区域名称
"""
return region or DEFAULT_CACHE_REGION
def get_cache_key(self, func, args, kwargs) -> str:
"""
根据函数和参数生成缓存键
:param func: 函数对象
:param args: 位置参数
:param kwargs: 关键字参数
:return: 缓存键
"""
# 使用函数名和参数生成缓存键
key_parts = [func.__module__, func.__name__]
# 添加位置参数
if args:
key_parts.extend([str(arg) for arg in args])
# 添加关键字参数(排序以确保一致性)
if kwargs:
sorted_kwargs = sorted(kwargs.items())
key_parts.extend([f"{k}={v}" for k, v in sorted_kwargs])
return hashkey(*key_parts)
def is_redis(self) -> bool:
"""
判断当前缓存后端是否为 Redis
"""
return isinstance(self, RedisBackend) or isinstance(self, AsyncRedisBackend)
class MemoryBackend(CacheBackend):
"""