feat(redis): add configurable connection pool settings for Redis client

This commit is contained in:
jxxghp
2026-06-18 18:07:37 +08:00
parent 69ed70cc66
commit ccaeb7662c
3 changed files with 193 additions and 19 deletions

View File

@@ -160,6 +160,10 @@ class ConfigModel(BaseModel):
CACHE_BACKEND_URL: Optional[str] = "redis://localhost:6379"
# Redis 缓存最大内存限制,未配置时,如开启大内存模式时为 "1024mb",未开启时为 "256mb"
CACHE_REDIS_MAXMEMORY: Optional[str] = None
# Redis 连接池最大连接数
CACHE_REDIS_MAX_CONNECTIONS: int = 256
# Redis 连接池耗尽时等待可用连接的时间(秒)
CACHE_REDIS_POOL_TIMEOUT: int = 3
# 全局图片缓存,将媒体图片缓存到本地
GLOBAL_IMAGE_CACHE: bool = False
# 全局图片缓存保留天数

View File

@@ -1,10 +1,12 @@
import asyncio
import json
import pickle
import threading
from typing import Any, Optional, Generator, Tuple, AsyncGenerator, Union
from urllib.parse import quote, unquote
import redis
from redis.asyncio import BlockingConnectionPool as AsyncBlockingConnectionPool
from redis.asyncio import Redis
from app.core.config import settings
@@ -83,7 +85,13 @@ class RedisHelper(ConfigReloadMixin, metaclass=Singleton):
- 支持内存限制和淘汰策略设置
- 提供键名生成和区域管理功能
"""
CONFIG_WATCH = {"CACHE_BACKEND_TYPE", "CACHE_BACKEND_URL", "CACHE_REDIS_MAXMEMORY"}
CONFIG_WATCH = {
"CACHE_BACKEND_TYPE",
"CACHE_BACKEND_URL",
"CACHE_REDIS_MAXMEMORY",
"CACHE_REDIS_MAX_CONNECTIONS",
"CACHE_REDIS_POOL_TIMEOUT",
}
def __init__(self):
"""
@@ -91,31 +99,46 @@ class RedisHelper(ConfigReloadMixin, metaclass=Singleton):
"""
self.redis_url = settings.CACHE_BACKEND_URL
self.client = None
self._connect_lock = threading.RLock()
def _connect(self):
"""
建立Redis连接
"""
if self.client is not None:
return
client = None
try:
if self.client is None:
self.client = redis.Redis.from_url(
with self._connect_lock:
if self.client is not None:
return
self.redis_url = settings.CACHE_BACKEND_URL
connection_pool = redis.BlockingConnectionPool.from_url(
self.redis_url,
decode_responses=False,
socket_timeout=_socket_timeout,
socket_connect_timeout=_socket_connect_timeout,
health_check_interval=_health_check_interval,
max_connections=settings.CACHE_REDIS_MAX_CONNECTIONS,
timeout=settings.CACHE_REDIS_POOL_TIMEOUT,
)
client = redis.Redis(connection_pool=connection_pool)
# 测试连接确保Redis可用
self.client.ping()
client.ping()
self.client = client
logger.info(f"Successfully connected to Redis{self.redis_url}")
self.set_memory_limit()
except Exception as e:
if client:
client.close()
logger.error(f"Failed to connect to Redis: {e}")
self.client = None
raise RuntimeError("Redis connection failed") from e
def on_config_changed(self):
self.close()
with self._connect_lock:
self.redis_url = settings.CACHE_BACKEND_URL
self.close()
self._connect()
def get_reload_name(self):
@@ -296,10 +319,11 @@ class RedisHelper(ConfigReloadMixin, metaclass=Singleton):
"""
关闭Redis客户端的连接池
"""
if self.client:
self.client.close()
self.client = None
logger.debug("Redis connection closed")
with self._connect_lock:
if self.client:
self.client.close()
self.client = None
logger.debug("Redis connection closed")
class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton):
@@ -313,7 +337,13 @@ class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton):
- 提供键名生成和区域管理功能
- 所有操作都是异步的
"""
CONFIG_WATCH = {"CACHE_BACKEND_TYPE", "CACHE_BACKEND_URL", "CACHE_REDIS_MAXMEMORY"}
CONFIG_WATCH = {
"CACHE_BACKEND_TYPE",
"CACHE_BACKEND_URL",
"CACHE_REDIS_MAXMEMORY",
"CACHE_REDIS_MAX_CONNECTIONS",
"CACHE_REDIS_POOL_TIMEOUT",
}
def __init__(self):
"""
@@ -322,31 +352,53 @@ class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton):
self.redis_url = settings.CACHE_BACKEND_URL
self.client: Optional[Redis] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._connect_lock: Optional[asyncio.Lock] = None
self._connect_lock_loop: Optional[asyncio.AbstractEventLoop] = None
def _get_connect_lock(self, current_loop: asyncio.AbstractEventLoop) -> asyncio.Lock:
"""
获取当前事件循环对应的异步连接锁
"""
if self._connect_lock is None or self._connect_lock_loop is not current_loop:
self._connect_lock = asyncio.Lock()
self._connect_lock_loop = current_loop
return self._connect_lock
async def _connect(self):
"""
建立异步Redis连接
"""
current_loop = asyncio.get_running_loop()
connect_lock = self._get_connect_lock(current_loop)
client = None
try:
current_loop = asyncio.get_running_loop()
# 检测事件循环是否发生变化,如果变化则重新连接
if self.client is not None and self._loop is not current_loop:
logger.debug("Event loop changed, reconnecting Redis (async)")
await self._close_client()
if self.client is None:
self.client = Redis.from_url(
async with connect_lock:
# 检测事件循环是否发生变化,如果变化则重新连接
if self.client is not None and self._loop is not current_loop:
logger.debug("Event loop changed, reconnecting Redis (async)")
await self._close_client()
if self.client is not None:
return
self.redis_url = settings.CACHE_BACKEND_URL
connection_pool = AsyncBlockingConnectionPool.from_url(
self.redis_url,
decode_responses=False,
socket_timeout=_socket_timeout,
socket_connect_timeout=_socket_connect_timeout,
health_check_interval=_health_check_interval,
max_connections=settings.CACHE_REDIS_MAX_CONNECTIONS,
timeout=settings.CACHE_REDIS_POOL_TIMEOUT,
)
client = Redis(connection_pool=connection_pool)
self._loop = current_loop
# 测试连接确保Redis可用
await self.client.ping()
await client.ping()
self.client = client
logger.info(f"Successfully connected to Redis (async){self.redis_url}")
await self.set_memory_limit()
except Exception as e:
if client:
await client.close()
logger.error(f"Failed to connect to Redis (async): {e}")
self.client = None
self._loop = None
@@ -365,6 +417,7 @@ class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton):
self._loop = None
async def on_config_changed(self):
self.redis_url = settings.CACHE_BACKEND_URL
await self._close_client()
await self._connect()

View File

@@ -1,7 +1,8 @@
import asyncio
from app.core.cache import AsyncFileBackend, FileBackend, MemoryBackend
from app.helper.redis import RedisHelper
from app.core.config import settings
from app.helper.redis import AsyncRedisHelper, RedisHelper
def test_file_backend_items_keep_relative_keys_and_bytes(tmp_path):
@@ -49,6 +50,122 @@ def test_redis_original_key_decodes_quoted_key():
assert RedisHelper._RedisHelper__get_original_key(redis_key) == "nested/poster one.jpg"
def test_redis_helper_uses_blocking_pool_settings(monkeypatch):
"""
Redis 同步客户端应使用阻塞连接池,避免并发峰值直接耗尽 Redis 连接数。
"""
calls = {}
class FakeClient:
"""模拟同步 Redis 客户端。"""
def __init__(self, connection_pool):
self.connection_pool = connection_pool
self.config_calls = []
self.closed = False
def ping(self):
"""模拟 Redis ping。"""
calls["ping"] = True
def config_set(self, key, value):
"""记录 Redis 配置写入。"""
self.config_calls.append((key, value))
def close(self):
"""标记客户端已关闭。"""
self.closed = True
def fake_from_url(url, **kwargs):
"""记录连接池构造参数。"""
calls["pool"] = {"url": url, **kwargs}
return "pool"
monkeypatch.setattr(settings, "CACHE_BACKEND_URL", "redis://cache:6379/2")
monkeypatch.setattr(settings, "CACHE_REDIS_MAX_CONNECTIONS", 7)
monkeypatch.setattr(settings, "CACHE_REDIS_POOL_TIMEOUT", 3)
monkeypatch.setattr("app.helper.redis.redis.BlockingConnectionPool.from_url", fake_from_url)
monkeypatch.setattr("app.helper.redis.redis.Redis", FakeClient)
helper = RedisHelper()
helper.close()
helper._connect()
assert calls["pool"]["url"] == "redis://cache:6379/2"
assert calls["pool"]["max_connections"] == 7
assert calls["pool"]["timeout"] == 3
assert calls["pool"]["decode_responses"] is False
assert calls["ping"] is True
assert ("maxmemory-policy", "allkeys-lru") in helper.client.config_calls
helper.close()
def test_async_redis_helper_uses_blocking_pool_settings(monkeypatch):
"""
Redis 异步客户端应使用阻塞连接池,避免高并发缓存读取立刻抛出连接耗尽错误。
"""
calls = {}
class FakeAsyncClient:
"""模拟异步 Redis 客户端。"""
def __init__(self, connection_pool):
self.connection_pool = connection_pool
self.config_calls = []
self.closed = False
async def ping(self):
"""模拟 Redis ping。"""
calls["ping"] = True
async def config_set(self, key, value):
"""记录 Redis 配置写入。"""
self.config_calls.append((key, value))
async def close(self):
"""标记客户端已关闭。"""
self.closed = True
def fake_from_url(url, **kwargs):
"""记录连接池构造参数。"""
calls["pool"] = {"url": url, **kwargs}
return "async_pool"
async def run_connect():
helper = AsyncRedisHelper()
await helper.close()
await helper._connect()
config_calls = list(helper.client.config_calls)
await helper.close()
return config_calls
monkeypatch.setattr(settings, "CACHE_BACKEND_URL", "redis://cache:6379/3")
monkeypatch.setattr(settings, "CACHE_REDIS_MAX_CONNECTIONS", 9)
monkeypatch.setattr(settings, "CACHE_REDIS_POOL_TIMEOUT", 4)
monkeypatch.setattr("app.helper.redis.AsyncBlockingConnectionPool.from_url", fake_from_url)
monkeypatch.setattr("app.helper.redis.Redis", FakeAsyncClient)
config_calls = asyncio.run(run_connect())
assert calls["pool"]["url"] == "redis://cache:6379/3"
assert calls["pool"]["max_connections"] == 9
assert calls["pool"]["timeout"] == 4
assert calls["pool"]["decode_responses"] is False
assert calls["ping"] is True
assert ("maxmemory-policy", "allkeys-lru") in config_calls
def test_redis_helpers_watch_pool_settings():
"""
Redis 连接池配置变化应触发客户端重建。
"""
assert "CACHE_REDIS_MAX_CONNECTIONS" in RedisHelper.CONFIG_WATCH
assert "CACHE_REDIS_POOL_TIMEOUT" in RedisHelper.CONFIG_WATCH
assert "CACHE_REDIS_MAX_CONNECTIONS" in AsyncRedisHelper.CONFIG_WATCH
assert "CACHE_REDIS_POOL_TIMEOUT" in AsyncRedisHelper.CONFIG_WATCH
def test_async_file_backend_missing_region_has_no_items(tmp_path):
"""
异步文件缓存缺失区域时应返回空迭代,而不是伪造空 key。