refactor:新增文件缓存组合

This commit is contained in:
jxxghp
2025-08-20 19:04:07 +08:00
parent 055c8e26f0
commit b3113e13ec
5 changed files with 548 additions and 234 deletions

View File

@@ -4,6 +4,7 @@ import json
import re
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Optional, Union, Annotated
import aiofiles
@@ -16,6 +17,7 @@ from fastapi.responses import StreamingResponse
from app import schemas
from app.chain.search import SearchChain
from app.chain.system import SystemChain
from app.core.cache import get_async_file_cache_backend
from app.core.config import global_vars, settings
from app.core.event import eventmanager
from app.core.metainfo import MetaInfo
@@ -28,7 +30,6 @@ from app.db.user_oper import get_current_active_superuser, get_current_active_su
from app.helper.mediaserver import MediaServerHelper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.helper.redis import AsyncRedisHelper
from app.helper.rule import RuleHelper
from app.helper.sites import SitesHelper # noqa # noqa
from app.helper.subscribe import SubscribeHelper
@@ -67,53 +68,28 @@ async def fetch_image(
# 缓存路径
sanitized_path = SecurityUtils.sanitize_url_path(url)
base_path = AsyncPath(settings.CACHE_PATH)
cache_path = base_path / "images" / sanitized_path
cache_path = Path("images") / sanitized_path
if not cache_path.suffix:
# 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择
cache_path = cache_path.with_suffix(".jpg")
# 缓存对像
cache_backend = get_async_file_cache_backend(base=settings.CACHE_PATH)
if use_cache:
if settings.CACHE_BACKEND_TYPE == "redis":
# 使用Redis缓存
redis_helper = AsyncRedisHelper()
content = await redis_helper.get(sanitized_path, region="image_cache")
if content:
# 检查 If-None-Match
etag = HashUtils.md5(content)
if if_none_match == etag:
headers = RequestUtils.generate_cache_headers()
return Response(status_code=304, headers=headers)
# 返回缓存图片
headers = RequestUtils.generate_cache_headers(etag)
return Response(
content=content,
media_type=UrlUtils.get_mime_type(url, "image/jpeg"),
headers=headers
)
else:
# 使用磁盘缓存
if not cache_path.suffix:
# 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择
cache_path = cache_path.with_suffix(".jpg")
# 确保缓存路径和文件类型合法
if not await SecurityUtils.async_is_safe_path(base_path=base_path,
user_path=cache_path,
allowed_suffixes=settings.SECURITY_IMAGE_SUFFIXES):
raise HTTPException(status_code=400, detail="Invalid cache path or file type")
# 目前暂不考虑磁盘缓存文件是否过期,通过缓存清理机制处理
if cache_path and await cache_path.exists():
try:
# 读取磁盘缓存图片返回
async with aiofiles.open(cache_path, 'rb') as f:
content = await f.read()
etag = HashUtils.md5(content)
headers = RequestUtils.generate_cache_headers(etag, max_age=86400 * 7)
if if_none_match == etag:
return Response(status_code=304, headers=headers)
return Response(content=content, media_type="image/jpeg", headers=headers)
except Exception as e:
# 如果读取磁盘缓存发生异常,这里仅记录日志,尝试再次请求远端进行处理
logger.debug(f"Failed to read cache file {cache_path}: {e}")
content = await cache_backend.get(cache_path.as_posix(), region="images")
if content:
# 检查 If-None-Match
etag = HashUtils.md5(content)
headers = RequestUtils.generate_cache_headers(etag, max_age=86400 * 7)
if if_none_match == etag:
return Response(status_code=304, headers=headers)
# 返回缓存图片
return Response(
content=content,
media_type=UrlUtils.get_mime_type(url, "image/jpeg"),
headers=headers
)
# 请求远程图片
referer = "https://movie.douban.com/" if "doubanio.com" in url else None
@@ -138,21 +114,8 @@ async def fetch_image(
# 保存缓存
if use_cache:
if settings.CACHE_BACKEND_TYPE == "redis":
# 保存到Redis缓存
redis_helper = AsyncRedisHelper()
await redis_helper.set(sanitized_path, content, region="image_cache")
else:
# 保存到磁盘缓存
try:
if not await cache_path.parent.exists():
await cache_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
await tmp_file.write(content)
temp_path = AsyncPath(tmp_file.name)
await temp_path.replace(cache_path)
except Exception as e:
logger.debug(f"Failed to write cache file {cache_path}: {e}")
await cache_backend.set(cache_path.as_posix(), content, region="images")
logger.debug(f"Image cached at {cache_path.as_posix()}")
# 检查 If-None-Match
etag = HashUtils.md5(content)

View File

@@ -7,13 +7,11 @@ from collections.abc import Callable
from pathlib import Path
from typing import Optional, Any, Tuple, List, Set, Union, Dict
import aiofiles
from anyio import Path as AsyncPath
from fastapi.concurrency import run_in_threadpool
from qbittorrentapi import TorrentFilesList
from transmission_rpc import File
from app.core.cache import get_cache_backend
from app.core.cache import get_file_cache_backend, get_async_file_cache_backend
from app.core.config import settings
from app.core.context import Context, MediaInfo, TorrentInfo
from app.core.event import EventManager
@@ -48,137 +46,66 @@ class ChainBase(metaclass=ABCMeta):
send_callback=self.run_module
)
self.pluginmanager = PluginManager()
# 文件类缓存保留1
self._cache = get_cache_backend(ttl=30 * 24 * 3600)
self.filecache = get_file_cache_backend()
self.async_filecache = get_async_file_cache_backend()
def load_cache(self, filename: str) -> Any:
    """
    Load a cached object from the file cache backend.

    :param filename: cache file name, used as the cache key
    :return: the unpickled object, or None when the entry is missing or unreadable
    """
    content = self.filecache.get(filename)
    if not content:
        return None
    try:
        # NOTE(review): pickle deserialization — safe only while the cache
        # backend is local/trusted; never feed it untrusted data.
        return pickle.loads(content)
    except Exception as err:
        logger.error(f"加载缓存 {filename} 出错:{str(err)}")
        return None
async def async_load_cache(self, filename: str) -> Any:
    """
    Asynchronously load a cached object from the async file cache backend.

    :param filename: cache file name, used as the cache key
    :return: the unpickled object, or None when the entry is missing or unreadable
    """
    content = await self.async_filecache.get(filename)
    if not content:
        return None
    try:
        # NOTE(review): pickle deserialization — safe only while the cache
        # backend is local/trusted; never feed it untrusted data.
        return pickle.loads(content)
    except Exception as err:
        logger.error(f"异步加载缓存 {filename} 出错:{str(err)}")
        return None
async def async_save_cache(self, cache: Any, filename: str) -> None:
    """
    Asynchronously persist an object to the async file cache backend.

    :param cache: object to cache; pickled before storage
    :param filename: cache file name, used as the cache key
    """
    try:
        await self.async_filecache.set(filename, pickle.dumps(cache))
    except Exception as err:
        logger.error(f"异步保存缓存 {filename} 出错:{str(err)}")
def save_cache(self, cache: Any, filename: str) -> None:
    """
    Persist an object to the file cache backend.

    :param cache: object to cache; pickled before storage
    :param filename: cache file name, used as the cache key
    """
    try:
        self.filecache.set(filename, pickle.dumps(cache))
    except Exception as err:
        logger.error(f"保存缓存 {filename} 出错:{str(err)}")
def remove_cache(self, filename: str) -> None:
    """
    Delete a cache entry from the file cache backend.

    :param filename: cache file name, used as the cache key
    """
    try:
        self.filecache.delete(filename)
    except Exception as err:
        # Deletion failure is logged, not raised — consistent with save_cache
        logger.error(f"删除缓存 {filename} 出错:{str(err)}")
async def async_remove_cache(self, filename: str) -> None:
    """
    Asynchronously delete a cache entry from the async file cache backend.

    :param filename: cache file name, used as the cache key
    """
    try:
        await self.async_filecache.delete(filename)
    except Exception as err:
        # Deletion failure is logged, not raised — consistent with remove_cache
        logger.error(f"异步删除缓存 {filename} 出错:{str(err)}")
@staticmethod
def __is_valid_empty(ret):

View File

@@ -1,5 +1,4 @@
import io
import tempfile
from pathlib import Path
from typing import List, Optional
@@ -10,7 +9,7 @@ from app.chain import ChainBase
from app.chain.bangumi import BangumiChain
from app.chain.douban import DoubanChain
from app.chain.tmdb import TmdbChain
from app.core.cache import cached
from app.core.cache import cached, get_file_cache_backend
from app.core.config import settings, global_vars
from app.log import logger
from app.schemas import MediaType
@@ -37,8 +36,6 @@ class RecommendChain(ChainBase, metaclass=Singleton):
刷新推荐
"""
logger.debug("Starting to refresh Recommend data.")
self._cache.clear(region=self.recommend_cache_region)
logger.debug("Recommend Cache has been cleared.")
# 推荐来源方法
recommend_methods = [
@@ -100,33 +97,26 @@ class RecommendChain(ChainBase, metaclass=Singleton):
logger.debug(f"Caching poster image: {poster_url}")
self.__fetch_and_save_image(poster_url)
def __fetch_and_save_image(self, url: str):
@staticmethod
def __fetch_and_save_image(url: str):
"""
请求并保存图片
:param url: 图片路径
"""
# 生成缓存路径
sanitized_path = SecurityUtils.sanitize_url_path(url)
cache_path = settings.CACHE_PATH / "images" / sanitized_path
cache_path = Path("images") / sanitized_path
# 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择
if not cache_path.suffix:
cache_path = cache_path.with_suffix(".jpg")
if self._cache.is_redis():
if self._cache.get(sanitized_path, region=self.recommend_cache_region):
logger.debug(f"Cache hit: Image already exists for URL: {url}")
return
else:
# 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择
if not cache_path.suffix:
cache_path = cache_path.with_suffix(".jpg")
# 获取缓存后端
cache_backend = get_file_cache_backend(base=settings.CACHE_PATH)
# 确保缓存路径和文件类型合法
if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES):
logger.debug(f"Invalid cache path or file type for URL: {url}, sanitized path: {sanitized_path}")
return
# 本地存在缓存图片,则直接跳过
if cache_path.exists():
logger.debug(f"Cache hit: Image already exists at {cache_path}")
return
# 本地存在缓存图片,则直接跳过
if cache_backend.get(cache_path.as_posix(), region="images"):
logger.debug(f"Cache hit: Image already exists at {cache_path}")
return
# 请求远程图片
referer = "https://movie.douban.com/" if "doubanio.com" in url else None
@@ -142,25 +132,10 @@ class RecommendChain(ChainBase, metaclass=Singleton):
except Exception as e:
logger.debug(f"Invalid image format for URL {url}: {e}")
return
if self._cache.is_redis():
# 如果是Redis缓存直接存储到缓存
try:
self._cache.set(sanitized_path, response.content, region=self.recommend_cache_region)
logger.debug(f"Successfully cached image for URL: {url} in Redis.")
except Exception as e:
logger.debug(f"Failed to cache image for URL {url} in Redis: {e}")
else:
# 如果是本地文件缓存,写入到指定路径
try:
if not cache_path.parent.exists():
cache_path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
tmp_file.write(response.content)
temp_path = Path(tmp_file.name)
temp_path.replace(cache_path)
logger.debug(f"Successfully cached image at {cache_path} for URL: {url}")
except Exception as e:
logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}")
# 保存缓存
cache_backend.set(cache_path.as_posix(), response.content, region="images")
logger.debug(f"Successfully cached image at {cache_path} for URL: {url}")
@log_execution_time(logger=logger)
@cached(ttl=recommend_ttl, region=recommend_cache_region)

View File

@@ -1,14 +1,20 @@
import inspect
import shutil
import tempfile
import threading
from abc import ABC, abstractmethod
from functools import wraps
from pathlib import Path
from typing import Any, Dict, Optional
import aiofiles
import aioshutil
from anyio import Path as AsyncPath
from cachetools import TTLCache as CacheToolsTTLCache
from cachetools.keys import hashkey
from app.core.config import settings
from app.helper.redis import RedisHelper
from app.helper.redis import RedisHelper, AsyncRedisHelper
from app.log import logger
# 默认缓存区
@@ -130,17 +136,122 @@ class CacheBackend(ABC):
return settings.CACHE_BACKEND_TYPE == "redis"
class AsyncCacheBackend(ABC):
    """
    Abstract base class defining the common cache interface (async variant).

    Concrete subclasses (e.g. Redis- or file-system-backed) implement the
    coroutine methods below; the static helpers are shared key/region utilities.
    """

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: Optional[int] = None,
                  region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
        """
        Store a value in the cache.

        :param key: cache key
        :param value: value to cache
        :param ttl: time-to-live in seconds
        :param region: cache region (namespace)
        :param kwargs: backend-specific options
        """
        pass

    @abstractmethod
    async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
        """
        Check whether a cache key exists.

        :param key: cache key
        :param region: cache region (namespace)
        :return: True when present, False otherwise
        """
        pass

    @abstractmethod
    async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any:
        """
        Fetch a cached value.

        :param key: cache key
        :param region: cache region (namespace)
        :return: the cached value, or None when the entry does not exist
        """
        pass

    @abstractmethod
    async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
        """
        Remove a cache entry.

        :param key: cache key
        :param region: cache region (namespace)
        """
        pass

    @abstractmethod
    async def clear(self, region: Optional[str] = None) -> None:
        """
        Clear one region, or the whole cache when region is None.

        :param region: cache region (namespace), or None for everything
        """
        pass

    @abstractmethod
    async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]:
        """
        Return every cache entry of a region.

        :param region: cache region (namespace)
        :return: dict mapping cache keys to cached values
        """
        pass

    @abstractmethod
    async def close(self) -> None:
        """
        Release any resources held by the backend (e.g. connection pools).
        """
        pass

    @staticmethod
    def get_region(region: Optional[str] = DEFAULT_CACHE_REGION):
        """
        Build the namespaced region prefix used to scope cache keys.
        """
        return f"region:{region}" if region else "region:default"

    @staticmethod
    def get_cache_key(func, args, kwargs):
        """
        Derive a cache key from a function call by hashing its bound arguments.

        :param func: decorated function
        :param args: positional arguments of the call
        :param kwargs: keyword arguments of the call
        :return: cache key string
        """
        signature = inspect.signature(func)
        # Bind the incoming arguments and apply declared defaults
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        # Drop the first parameter when it is the instance (self) or class (cls)
        parameters = list(signature.parameters.keys())
        if parameters and parameters[0] in ("self", "cls"):
            bound.arguments.pop(parameters[0], None)
        # Extract values in signature order so the key is deterministic
        keys = [
            bound.arguments[param] for param in signature.parameters if param in bound.arguments
        ]
        # Combine function name and hashed ordered arguments
        return f"{func.__name__}_{hashkey(*keys)}"

    @staticmethod
    def is_redis() -> bool:
        # True when the configured backend type is Redis
        return settings.CACHE_BACKEND_TYPE == "redis"
class CacheToolsBackend(CacheBackend):
"""
基于 `cachetools.TTLCache` 实现的缓存后端
特性:
- 支持动态设置缓存的 TTLTime To Live存活时间和最大条目数Maxsize
- 缓存实例按区域region划分不同 region 拥有独立的缓存实例
- 同一 region 共享相同的 TTL 和 Maxsize设置时只能作用于整个 region
限制:
- 不支持按 `key` 独立隔离 TTL 和 Maxsize仅支持作用于 region 级别
"""
def __init__(self, maxsize: Optional[int] = 1024, ttl: Optional[int] = 1800):
@@ -263,18 +374,9 @@ class CacheToolsBackend(CacheBackend):
class RedisBackend(CacheBackend):
"""
基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存
特性:
- 支持动态设置缓存的 TTLTime To Live存活时间
- 支持分区域region管理缓存不同的 region 采用独立的命名空间
- 支持自定义最大内存限制maxmemory和内存淘汰策略如 allkeys-lru
限制:
- 由于 Redis 的分布式特性,写入和读取可能受到网络延迟的影响
- Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存
"""
def __init__(self, ttl: Optional[int] = 1800):
def __init__(self, ttl: Optional[int] = None):
"""
初始化 Redis 缓存实例
@@ -350,19 +452,342 @@ class RedisBackend(CacheBackend):
self.redis_helper.close()
class AsyncRedisBackend(AsyncCacheBackend):
    """
    Redis-based cache backend (async variant); a thin wrapper delegating to
    AsyncRedisHelper.
    """

    def __init__(self, ttl: Optional[int] = None):
        """
        Initialize the Redis cache instance.

        :param ttl: default time-to-live in seconds, applied when set() is
                    called without an explicit ttl; None means no default expiry
        """
        self.ttl = ttl
        self.redis_helper = AsyncRedisHelper()

    async def set(self, key: str, value: Any, ttl: Optional[int] = None,
                  region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
        """
        Store a value in Redis.

        :param key: cache key
        :param value: value to cache
        :param ttl: time-to-live in seconds; falls back to the instance default
        :param region: cache region (namespace)
        :param kwargs: forwarded to AsyncRedisHelper.set
        """
        await self.redis_helper.set(key, value, ttl=ttl or self.ttl, region=region, **kwargs)

    async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
        """
        Check whether a cache key exists.

        :param key: cache key
        :param region: cache region (namespace)
        :return: True when present, False otherwise
        """
        return await self.redis_helper.exists(key, region=region)

    async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Optional[Any]:
        """
        Fetch a cached value.

        :param key: cache key
        :param region: cache region (namespace)
        :return: the cached value, or None when the entry does not exist
        """
        return await self.redis_helper.get(key, region=region)

    async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
        """
        Remove a cache entry.

        :param key: cache key
        :param region: cache region (namespace)
        """
        await self.redis_helper.delete(key, region=region)

    async def clear(self, region: Optional[str] = None) -> None:
        """
        Clear one region, or the whole cache when region is None.

        :param region: cache region (namespace), or None for everything
        """
        await self.redis_helper.clear(region=region)

    async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]:
        """
        Collect every cache entry of a region.

        :param region: cache region (namespace)
        :return: dict mapping cache keys to cached values
        """
        # Fix: AsyncRedisHelper.items is an async generator — it cannot be
        # awaited directly; consume it into a dict instead.
        return {key: value async for key, value in self.redis_helper.items(region=region)}

    async def close(self) -> None:
        """
        Close the Redis client's connection pool.
        """
        await self.redis_helper.close()
class FileBackend(CacheBackend):
    """
    Cache backend backed by the local file system.

    Entries live at ``base/<region>/<key>``; values are stored as raw bytes,
    so callers are responsible for serialization. TTL is not enforced here —
    per the design, expiry is left to an external cleanup mechanism.
    """

    def __init__(self, base: Path):
        """
        Initialize the file cache instance.

        :param base: root directory of the cache; created when missing
        """
        self.base = base
        if not self.base.exists():
            self.base.mkdir(parents=True, exist_ok=True)

    def set(self, key: str, value: Any, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
        """
        Write raw bytes to ``base/<region>/<key>``.

        :param key: cache key (relative file path)
        :param value: bytes-like payload to write
        :param region: cache region (sub-directory)
        :param kwargs: ignored (e.g. ttl — files do not expire here)
        """
        cache_path = self.base / region / key
        # Make sure the target directory exists
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp file in the same directory, then atomically replace,
        # so concurrent readers never observe a partially-written file.
        with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
            tmp_file.write(value)
            temp_path = Path(tmp_file.name)
        # Replace after the handle is closed (required on Windows)
        temp_path.replace(cache_path)

    def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
        """
        Check whether a cache entry exists.

        :param key: cache key
        :param region: cache region (sub-directory)
        :return: True when present, False otherwise
        """
        # Fix: include the region sub-directory, consistent with set()/get()/delete()
        cache_path = self.base / region / key
        return cache_path.exists()

    def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Optional[Any]:
        """
        Read a cached value.

        :param key: cache key
        :param region: cache region (sub-directory)
        :return: raw bytes, or None when the entry does not exist
        """
        cache_path = self.base / region / key
        if not cache_path.exists():
            return None
        with open(cache_path, 'rb') as f:
            return f.read()

    def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
        """
        Remove a cache entry.

        :param key: cache key
        :param region: cache region (sub-directory)
        """
        cache_path = self.base / region / key
        if cache_path.exists():
            cache_path.unlink()

    def clear(self, region: Optional[str] = None) -> None:
        """
        Clear one region, or the whole cache when region is None.

        :param region: cache region (sub-directory), or None for everything
        """
        # Clear a single region, or the whole base directory
        target = self.base / region if region else self.base
        if not target.exists():
            return
        for item in target.iterdir():
            if item.is_file():
                item.unlink()
            else:
                shutil.rmtree(item, ignore_errors=True)

    def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]:
        """
        Return every cache entry of a region.

        :param region: cache region (sub-directory)
        :return: dict mapping file names to their raw byte contents
        """
        cache_path = self.base / region
        if not cache_path.exists():
            return {}
        result: Dict[str, Any] = {}
        for item in cache_path.iterdir():
            if item.is_file():
                # Fix: read in binary mode and keep the key — set() writes bytes,
                # and the declared contract is a key -> value mapping
                result[item.name] = item.read_bytes()
        return result

    def close(self) -> None:
        """
        No-op: file handles are opened and closed per operation.
        """
        pass
class AsyncFileBackend(AsyncCacheBackend):
    """
    Cache backend backed by the local file system (async variant).

    Entries live at ``base/<region>/<key>``; values are stored as raw bytes,
    so callers are responsible for serialization. TTL is not enforced here —
    per the design, expiry is left to an external cleanup mechanism.
    """

    def __init__(self, base: Path):
        """
        Initialize the file cache instance.

        :param base: root directory of the cache; created when missing
        """
        self.base = base
        if not self.base.exists():
            self.base.mkdir(parents=True, exist_ok=True)

    async def set(self, key: str, value: Any, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
        """
        Write raw bytes to ``base/<region>/<key>``.

        :param key: cache key (relative file path)
        :param value: bytes-like payload to write
        :param region: cache region (sub-directory)
        :param kwargs: ignored (e.g. ttl — files do not expire here)
        """
        cache_path = AsyncPath(self.base) / region / key
        # Make sure the target directory exists
        await cache_path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp file in the same directory, then atomically replace,
        # so concurrent readers never observe a partially-written file.
        async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
            await tmp_file.write(value)
            temp_path = AsyncPath(tmp_file.name)
        # Replace after the handle is closed (required on Windows)
        await temp_path.replace(cache_path)

    async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
        """
        Check whether a cache entry exists.

        :param key: cache key
        :param region: cache region (sub-directory)
        :return: True when present, False otherwise
        """
        cache_path = AsyncPath(self.base) / region / key
        return await cache_path.exists()

    async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Optional[Any]:
        """
        Read a cached value.

        :param key: cache key
        :param region: cache region (sub-directory)
        :return: raw bytes, or None when the entry does not exist
        """
        cache_path = AsyncPath(self.base) / region / key
        if not await cache_path.exists():
            return None
        async with aiofiles.open(cache_path, 'rb') as f:
            return await f.read()

    async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
        """
        Remove a cache entry.

        :param key: cache key
        :param region: cache region (sub-directory)
        """
        cache_path = AsyncPath(self.base) / region / key
        if await cache_path.exists():
            await cache_path.unlink()

    async def clear(self, region: Optional[str] = None) -> None:
        """
        Clear one region, or the whole cache when region is None.

        :param region: cache region (sub-directory), or None for everything
        """
        # Clear a single region, or the whole base directory
        target = AsyncPath(self.base) / region if region else AsyncPath(self.base)
        if not await target.exists():
            return
        # Fix: AsyncPath.iterdir yields asynchronously — use `async for`
        async for item in target.iterdir():
            if await item.is_file():
                await item.unlink()
            else:
                await aioshutil.rmtree(item, ignore_errors=True)

    async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]:
        """
        Return every cache entry of a region.

        :param region: cache region (sub-directory)
        :return: dict mapping file names to their raw byte contents
        """
        cache_path = AsyncPath(self.base) / region
        result: Dict[str, Any] = {}
        if not await cache_path.exists():
            return result
        # Fix: iterate asynchronously, read bytes, and return the declared
        # key -> value mapping instead of yielding bare values
        async for item in cache_path.iterdir():
            if await item.is_file():
                async with aiofiles.open(item, 'rb') as f:
                    result[item.name] = await f.read()
        return result

    async def close(self) -> None:
        """
        No-op: file handles are opened and closed per operation.
        """
        pass
def get_file_cache_backend(base: Optional[Path] = None) -> CacheBackend:
    """
    Build the file-oriented cache backend per configuration (Redis or file system).

    :param base: root directory for the file backend; defaults to
                 settings.TEMP_PATH, resolved at call time so late changes
                 to settings are honored
    :return: cache backend instance
    """
    if settings.CACHE_BACKEND_TYPE == "redis":
        return RedisBackend()
    # Late-bind the default so it tracks the current settings value
    return FileBackend(base=base or settings.TEMP_PATH)
def get_async_file_cache_backend(base: Optional[Path] = None) -> AsyncCacheBackend:
    """
    Build the async file-oriented cache backend per configuration (Redis or file system).

    :param base: root directory for the file backend; defaults to
                 settings.TEMP_PATH, resolved at call time so late changes
                 to settings are honored
    :return: async cache backend instance
    """
    if settings.CACHE_BACKEND_TYPE == "redis":
        return AsyncRedisBackend()
    # Late-bind the default so it tracks the current settings value
    return AsyncFileBackend(base=base or settings.TEMP_PATH)
def get_cache_backend(maxsize: Optional[int] = 512, ttl: Optional[int] = 1800) -> CacheBackend:
    """
    Build the general-purpose cache backend per configuration (memory or Redis).

    :param maxsize: maximum number of entries (effective only for the
                    cachetools backend)
    :param ttl: default time-to-live in seconds
    :return: cache backend instance
    """
    if settings.CACHE_BACKEND_TYPE == "redis":
        logger.debug(f"Attempting to use RedisBackend, TTL: {ttl}")
        return RedisBackend(ttl=ttl)
    logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}")
    return CacheToolsBackend(maxsize=maxsize, ttl=ttl)
@@ -376,7 +801,7 @@ class TTLCache:
- 支持Redis和cachetools的切换
"""
def __init__(self, maxsize: int = 128, ttl: int = 600):
def __init__(self, maxsize: int = 128, ttl: int = 1800):
"""
初始化TTL缓存

View File

@@ -525,6 +525,30 @@ class AsyncRedisHelper(metaclass=Singleton):
except Exception as e:
logger.error(f"Failed to clear cache (async), region: {region}, error: {e}")
async def items(self, region: Optional[str] = None):
    """
    Iterate over all cached key/value pairs, optionally limited to a region.

    :param region: cache region; None scans every key in the database
    :return: async generator yielding (key, deserialized value) tuples
    """
    try:
        await self._connect()
        # Build the scan pattern: region-scoped keys, or everything
        if region:
            cache_region = self.get_region(quote(region))
            pattern = f"{cache_region}:key:*"
        else:
            pattern = "*"
        # Fix: scan_iter on the async Redis client returns an async
        # iterator and must be consumed with `async for`
        async for key in self.client.scan_iter(pattern):
            value = await self.client.get(key)
            if value is not None:
                yield key, self.deserialize(value)
    except Exception as e:
        logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")
async def test(self) -> bool:
"""
异步测试Redis连接性