feat(redis-cluster): 支持集群模式逻辑多库隔离与 0-15 库切换

- 前端恢复 Redis 集群场景下 db0-db15 的数据库选择与展示
- 后端新增集群逻辑库命名空间前缀映射,统一 key/pattern 读写隔离
- 覆盖扫描、读取、写入、删除、重命名等核心操作的键映射规则
- 集群命令通道支持 SELECT 逻辑切库与 FLUSHDB 逻辑库清空
- refs #145
This commit is contained in:
Syngnat
2026-03-03 09:42:49 +08:00
parent c02e7c12e8
commit b904c0b107
7 changed files with 587 additions and 90 deletions

View File

@@ -106,6 +106,7 @@ const ConnectionModal: React.FC<{
const mysqlTopology = Form.useWatch('mysqlTopology', form) || 'single';
const mongoTopology = Form.useWatch('mongoTopology', form) || 'single';
const mongoSrv = Form.useWatch('mongoSrv', form) || false;
const redisTopology = Form.useWatch('redisTopology', form) || 'single';
const getSectionBg = (darkHex: string) => {
if (!darkMode) {
@@ -449,6 +450,35 @@ const ConnectionModal: React.FC<{
return { host: normalizeFileDbPath(safeDecode(rawPath)) };
}
if (type === 'redis') {
const parsed = parseMultiHostUri(trimmedUri, 'redis');
if (!parsed) {
return null;
}
if (!parsed.hosts.length || parsed.hosts.length > MAX_URI_HOSTS) {
return null;
}
if (parsed.hosts.some((entry) => !isValidUriHostEntry(entry))) {
return null;
}
const hostList = normalizeAddressList(parsed.hosts, 6379);
if (!hostList.length) {
return null;
}
const primary = parseHostPort(hostList[0] || 'localhost:6379', 6379);
const topologyParam = String(parsed.params.get('topology') || '').toLowerCase();
const dbText = String(parsed.database || '').trim().replace(/^\//, '');
const dbIndex = Number(dbText);
return {
host: primary?.host || 'localhost',
port: primary?.port || 6379,
password: parsed.password || '',
redisTopology: hostList.length > 1 || topologyParam === 'cluster' ? 'cluster' : 'single',
redisHosts: hostList.slice(1),
redisDB: Number.isFinite(dbIndex) && dbIndex >= 0 && dbIndex <= 15 ? Math.trunc(dbIndex) : 0,
};
}
if (type === 'mongodb') {
const parsed = parseMultiHostUri(trimmedUri, 'mongodb') || parseMultiHostUri(trimmedUri, 'mongodb+srv');
if (!parsed) {
@@ -547,6 +577,9 @@ const ConnectionModal: React.FC<{
if (dbType === 'clickhouse') {
return 'clickhouse://default:pass@127.0.0.1:9000/default';
}
if (dbType === 'redis') {
return 'redis://:pass@127.0.0.1:6379,127.0.0.2:6379/0?topology=cluster';
}
if (dbType === 'oracle') {
return 'oracle://user:pass@127.0.0.1:1521/ORCLPDB1';
}
@@ -585,6 +618,26 @@ const ConnectionModal: React.FC<{
return `${scheme}://${encodedAuth}${hosts.join(',')}${dbPath}${query ? `?${query}` : ''}`;
}
if (type === 'redis') {
const primary = toAddress(host, port, 6379);
const clusterHosts = values.redisTopology === 'cluster'
? normalizeAddressList(values.redisHosts, 6379)
: [];
const hosts = normalizeAddressList([primary, ...clusterHosts], 6379);
const params = new URLSearchParams();
if (hosts.length > 1 || values.redisTopology === 'cluster') {
params.set('topology', 'cluster');
}
const redisPassword = String(values.password || '');
const redisAuth = redisPassword ? `:${encodeURIComponent(redisPassword)}@` : '';
const redisDB = Number.isFinite(Number(values.redisDB))
? Math.max(0, Math.min(15, Math.trunc(Number(values.redisDB))))
: 0;
const dbPath = `/${redisDB}`;
const query = params.toString();
return `redis://${redisAuth}${hosts.join(',')}${dbPath}${query ? `?${query}` : ''}`;
}
if (isFileDatabaseType(type)) {
const pathText = normalizeFileDbPath(String(values.host || '').trim());
if (!pathText) {
@@ -770,8 +823,10 @@ const ConnectionModal: React.FC<{
: (primaryAddress?.port || Number(config.port || defaultPort));
const mysqlReplicaHosts = (configType === 'mysql' || configType === 'mariadb' || configType === 'diros' || configType === 'sphinx') ? normalizedHosts.slice(1) : [];
const mongoHosts = configType === 'mongodb' ? normalizedHosts.slice(1) : [];
const redisHosts = configType === 'redis' ? normalizedHosts.slice(1) : [];
const mysqlIsReplica = String(config.topology || '').toLowerCase() === 'replica' || mysqlReplicaHosts.length > 0;
const mongoIsReplica = String(config.topology || '').toLowerCase() === 'replica' || mongoHosts.length > 0 || !!config.replicaSet;
const redisIsCluster = String(config.topology || '').toLowerCase() === 'cluster' || redisHosts.length > 0;
form.setFieldsValue({
type: configType,
name: initialValues.name,
@@ -804,12 +859,15 @@ const ConnectionModal: React.FC<{
mysqlReplicaPassword: config.mysqlReplicaPassword || '',
mongoTopology: mongoIsReplica ? 'replica' : 'single',
mongoHosts: mongoHosts,
redisTopology: redisIsCluster ? 'cluster' : 'single',
redisHosts: redisHosts,
mongoSrv: !!config.mongoSrv,
mongoReplicaSet: config.replicaSet || '',
mongoAuthSource: config.authSource || '',
mongoReadPreference: config.readPreference || 'primary',
mongoAuthMechanism: config.mongoAuthMechanism || '',
savePassword: config.savePassword !== false,
redisDB: Number.isFinite(Number(config.redisDB)) ? Number(config.redisDB) : 0,
mongoReplicaUser: config.mongoReplicaUser || '',
mongoReplicaPassword: config.mongoReplicaPassword || ''
});
@@ -924,7 +982,6 @@ const ConnectionModal: React.FC<{
if (res.success) {
setTestResult({ type: 'success', message: res.message });
if (isRedisType) {
// Redis: generate database list 0-15
setRedisDbList(Array.from({ length: 16 }, (_, i) => i));
} else {
// Other databases: fetch database list
@@ -1033,7 +1090,7 @@ const ConnectionModal: React.FC<{
}
let hosts: string[] = [];
let topology: 'single' | 'replica' | undefined;
let topology: 'single' | 'replica' | 'cluster' | undefined;
let replicaSet = '';
let authSource = '';
let readPreference = '';
@@ -1087,6 +1144,22 @@ const ConnectionModal: React.FC<{
mongoAuthMechanism = String(mergedValues.mongoAuthMechanism || '').trim().toUpperCase();
}
if (type === 'redis') {
const clusterNodes = mergedValues.redisTopology === 'cluster'
? normalizeAddressList(mergedValues.redisHosts, defaultPort)
: [];
const allHosts = normalizeAddressList([`${primaryHost}:${primaryPort}`, ...clusterNodes], defaultPort);
if (mergedValues.redisTopology === 'cluster' || allHosts.length > 1) {
hosts = allHosts;
topology = 'cluster';
} else {
topology = 'single';
}
mergedValues.redisDB = Number.isFinite(Number(mergedValues.redisDB))
? Math.max(0, Math.min(15, Math.trunc(Number(mergedValues.redisDB))))
: 0;
}
const sshConfig = mergedValues.useSSH ? {
host: mergedValues.sshHost,
port: Number(mergedValues.sshPort),
@@ -1128,6 +1201,9 @@ const ConnectionModal: React.FC<{
driver: mergedValues.driver,
dsn: mergedValues.dsn,
timeout: Number(mergedValues.timeout || 30),
redisDB: Number.isFinite(Number(mergedValues.redisDB))
? Math.max(0, Math.min(15, Math.trunc(Number(mergedValues.redisDB))))
: 0,
uri: String(mergedValues.uri || '').trim(),
hosts: hosts,
topology: topology,
@@ -1178,6 +1254,7 @@ const ConnectionModal: React.FC<{
proxyUser: '',
proxyPassword: '',
mysqlTopology: 'single',
redisTopology: 'single',
mongoTopology: 'single',
mongoSrv: false,
mongoReadPreference: 'primary',
@@ -1186,11 +1263,13 @@ const ConnectionModal: React.FC<{
mongoAuthMechanism: '',
savePassword: true,
mysqlReplicaHosts: [],
redisHosts: [],
mongoHosts: [],
mysqlReplicaUser: '',
mysqlReplicaPassword: '',
mongoReplicaUser: '',
mongoReplicaPassword: '',
redisDB: 0,
});
} else if (type !== 'custom') {
const defaultUser = type === 'clickhouse' ? 'default' : 'root';
@@ -1199,6 +1278,7 @@ const ConnectionModal: React.FC<{
database: '',
port: defaultPort,
mysqlTopology: 'single',
redisTopology: 'single',
mongoTopology: 'single',
mongoSrv: false,
mongoReadPreference: 'primary',
@@ -1207,11 +1287,13 @@ const ConnectionModal: React.FC<{
mongoAuthMechanism: '',
savePassword: true,
mysqlReplicaHosts: [],
redisHosts: [],
mongoHosts: [],
mysqlReplicaUser: '',
mysqlReplicaPassword: '',
mongoReplicaUser: '',
mongoReplicaPassword: '',
redisDB: 0,
});
}
@@ -1346,17 +1428,20 @@ const ConnectionModal: React.FC<{
timeout: 30,
uri: '',
mysqlTopology: 'single',
redisTopology: 'single',
mongoTopology: 'single',
mongoSrv: false,
mongoReadPreference: 'primary',
mongoAuthMechanism: '',
savePassword: true,
mysqlReplicaHosts: [],
redisHosts: [],
mongoHosts: [],
mysqlReplicaUser: '',
mysqlReplicaPassword: '',
mongoReplicaUser: '',
mongoReplicaPassword: '',
redisDB: 0,
}}
onValuesChange={(changed) => {
if (testResult) {
@@ -1384,6 +1469,17 @@ const ConnectionModal: React.FC<{
}
// Type change handled by step 1, but keep sync if select changes (hidden now)
if (changed.type !== undefined) setDbType(changed.type);
if (changed.redisTopology !== undefined) {
const supportedDbs = Array.from({ length: 16 }, (_, i) => i);
setRedisDbList(supportedDbs);
const selectedDbsRaw = form.getFieldValue('includeRedisDatabases');
const selectedDbs = Array.isArray(selectedDbsRaw) ? selectedDbsRaw.map((entry: any) => Number(entry)) : [];
const validDbs = selectedDbs
.filter((entry: number) => Number.isFinite(entry))
.map((entry: number) => Math.trunc(entry))
.filter((entry: number) => supportedDbs.includes(entry));
form.setFieldValue('includeRedisDatabases', validDbs.length > 0 ? validDbs : undefined);
}
if (
changed.type !== undefined
|| changed.host !== undefined
@@ -1657,11 +1753,36 @@ const ConnectionModal: React.FC<{
{/* Redis specific: password only, no username */}
{isRedis && (
<>
<Form.Item name="redisTopology" label="连接模式">
<Select
options={[
{ value: 'single', label: '单机模式' },
{ value: 'cluster', label: '集群模式Redis Cluster' },
]}
/>
</Form.Item>
{redisTopology === 'cluster' && (
<Form.Item
name="redisHosts"
label="集群附加节点地址"
help="主节点使用上方主机地址这里填写其他种子节点格式host:port"
>
<Select mode="tags" placeholder="例如10.10.0.12:6379、10.10.0.13:6379" tokenSeparators={[',', ';', ' ']} />
</Form.Item>
)}
<Form.Item name="password" label="密码 (可选)">
<Input.Password placeholder="Redis 密码(如果设置了 requirepass" />
</Form.Item>
<Form.Item name="includeRedisDatabases" label="显示数据库 (留空显示全部)" help="连接测试成功后可选择">
<Select mode="multiple" placeholder="选择显示的数据库 (0-15)" allowClear>
<Form.Item
name="includeRedisDatabases"
label="显示数据库 (留空显示全部)"
help="连接测试成功后可选择"
>
<Select
mode="multiple"
placeholder="选择显示的数据库 (0-15)"
allowClear
>
{redisDbList.map(db => <Select.Option key={db} value={db}>db{db}</Select.Option>)}
</Select>
</Form.Item>

View File

@@ -199,7 +199,7 @@ const sanitizeConnectionConfig = (value: unknown): ConnectionConfig => {
proxy,
uri: toTrimmedString(raw.uri).slice(0, MAX_URI_LENGTH),
hosts: sanitizeAddressList(raw.hosts),
topology: raw.topology === 'replica' ? 'replica' : 'single',
topology: raw.topology === 'replica' ? 'replica' : (raw.topology === 'cluster' ? 'cluster' : 'single'),
mysqlReplicaUser: toTrimmedString(raw.mysqlReplicaUser),
mysqlReplicaPassword: savePassword ? toTrimmedString(raw.mysqlReplicaPassword) : '',
replicaSet: toTrimmedString(raw.replicaSet),

View File

@@ -32,7 +32,7 @@ export interface ConnectionConfig {
redisDB?: number; // Redis database index (0-15)
uri?: string; // Connection URI for copy/paste
hosts?: string[]; // Multi-host addresses: host:port
topology?: 'single' | 'replica';
topology?: 'single' | 'replica' | 'cluster';
mysqlReplicaUser?: string;
mysqlReplicaPassword?: string;
replicaSet?: string;

View File

@@ -67,24 +67,27 @@ func getRedisClientCacheKey(config connection.ConnectionConfig) string {
}
func formatRedisConnSummary(config connection.ConnectionConfig) string {
timeoutSeconds := config.Timeout
if timeoutSeconds <= 0 {
timeoutSeconds = 30
}
var b strings.Builder
b.WriteString("类型=redis 地址=")
b.WriteString(config.Host)
b.WriteString(":")
b.WriteString(string(rune(config.Port + '0')))
b.WriteString(strconv.Itoa(config.Port))
if topology := strings.TrimSpace(config.Topology); topology != "" {
b.WriteString(" 模式=")
b.WriteString(topology)
}
if len(config.Hosts) > 0 {
b.WriteString(" 节点数=")
b.WriteString(strconv.Itoa(len(config.Hosts)))
}
b.WriteString(" DB=")
b.WriteString(string(rune(config.RedisDB + '0')))
b.WriteString(strconv.Itoa(config.RedisDB))
if config.UseSSH {
b.WriteString(" SSH=")
b.WriteString(config.SSH.Host)
b.WriteString(":")
b.WriteString(string(rune(config.SSH.Port + '0')))
b.WriteString(strconv.Itoa(config.SSH.Port))
b.WriteString(" 用户=")
b.WriteString(config.SSH.User)
}

View File

@@ -37,7 +37,7 @@ type ConnectionConfig struct {
RedisDB int `json:"redisDB,omitempty"` // Redis database index (0-15)
URI string `json:"uri,omitempty"` // Connection URI for copy/paste
Hosts []string `json:"hosts,omitempty"` // Multi-host addresses: host:port
Topology string `json:"topology,omitempty"` // single | replica
Topology string `json:"topology,omitempty"` // single | replica | cluster
MySQLReplicaUser string `json:"mysqlReplicaUser,omitempty"` // MySQL replica auth user
MySQLReplicaPassword string `json:"mysqlReplicaPassword,omitempty"` // MySQL replica auth password
ReplicaSet string `json:"replicaSet,omitempty"` // MongoDB replica set name

View File

@@ -12,7 +12,7 @@ type RedisValue struct {
// RedisDBInfo represents information about a Redis database
type RedisDBInfo struct {
Index int `json:"index"` // Database index (0-15)
Index int `json:"index"` // Database index (single: 0-15, cluster: logical 0-15)
Keys int64 `json:"keys"` // Number of keys in this database
}

View File

@@ -3,8 +3,10 @@ package redis
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"GoNavi-Wails/internal/connection"
@@ -16,10 +18,14 @@ import (
// RedisClientImpl implements RedisClient using go-redis
type RedisClientImpl struct {
client *redis.Client
config connection.ConnectionConfig
currentDB int
forwarder *ssh.LocalForwarder
client redis.UniversalClient
singleClient *redis.Client
clusterClient *redis.ClusterClient
config connection.ConnectionConfig
currentDB int
isCluster bool
seedAddrs []string
forwarder *ssh.LocalForwarder
}
const (
@@ -40,14 +46,183 @@ func NewRedisClient() RedisClient {
return &RedisClientImpl{}
}
func normalizeRedisTimeout(timeoutSeconds int) time.Duration {
if timeoutSeconds <= 0 {
return 30 * time.Second
}
return time.Duration(timeoutSeconds) * time.Second
}
func normalizeRedisSeedAddress(raw string, defaultPort int) (string, error) {
addr := strings.TrimSpace(raw)
if addr == "" {
return "", fmt.Errorf("Redis 节点地址不能为空")
}
if _, _, err := net.SplitHostPort(addr); err == nil {
return addr, nil
}
if !strings.Contains(addr, ":") {
return net.JoinHostPort(addr, strconv.Itoa(defaultPort)), nil
}
// 尝试兼容 host:port 但端口格式异常的场景。
host, port, ok := strings.Cut(addr, ":")
if !ok {
return "", fmt.Errorf("无效 Redis 节点地址: %s", addr)
}
host = strings.TrimSpace(host)
port = strings.TrimSpace(port)
if host == "" {
return "", fmt.Errorf("无效 Redis 节点地址: %s", addr)
}
if _, err := strconv.Atoi(port); err != nil {
return "", fmt.Errorf("无效 Redis 端口: %s", addr)
}
return net.JoinHostPort(host, port), nil
}
func buildRedisSeedAddrs(config connection.ConnectionConfig) ([]string, error) {
defaultPort := config.Port
if defaultPort <= 0 {
defaultPort = 6379
}
candidates := make([]string, 0, 1+len(config.Hosts))
if strings.TrimSpace(config.Host) != "" {
candidates = append(candidates, fmt.Sprintf("%s:%d", strings.TrimSpace(config.Host), defaultPort))
}
candidates = append(candidates, config.Hosts...)
seen := make(map[string]struct{}, len(candidates))
addrs := make([]string, 0, len(candidates))
for _, candidate := range candidates {
normalized, err := normalizeRedisSeedAddress(candidate, defaultPort)
if err != nil {
return nil, err
}
if _, exists := seen[normalized]; exists {
continue
}
seen[normalized] = struct{}{}
addrs = append(addrs, normalized)
}
if len(addrs) == 0 {
return nil, fmt.Errorf("Redis 连接地址不能为空")
}
return addrs, nil
}
func (r *RedisClientImpl) redisNamespacePrefixForDB(index int) string {
if !r.isCluster || index <= 0 {
return ""
}
// Redis Cluster 仅支持物理 db0这里用固定前缀模拟逻辑库隔离。
return fmt.Sprintf("__gonavi_db_%d__:", index)
}
func (r *RedisClientImpl) redisNamespacePrefix() string {
return r.redisNamespacePrefixForDB(r.currentDB)
}
func (r *RedisClientImpl) toPhysicalKey(key string) string {
trimmed := strings.TrimSpace(key)
if trimmed == "" {
return ""
}
prefix := r.redisNamespacePrefix()
if prefix == "" || strings.HasPrefix(trimmed, prefix) {
return trimmed
}
return prefix + trimmed
}
func (r *RedisClientImpl) toPhysicalPattern(pattern string) string {
normalized := strings.TrimSpace(pattern)
if normalized == "" {
normalized = "*"
}
prefix := r.redisNamespacePrefix()
if prefix == "" {
return normalized
}
return prefix + normalized
}
func (r *RedisClientImpl) toPhysicalKeys(keys []string) []string {
if len(keys) == 0 {
return nil
}
result := make([]string, 0, len(keys))
for _, key := range keys {
physical := r.toPhysicalKey(key)
if physical == "" {
continue
}
result = append(result, physical)
}
return result
}
func (r *RedisClientImpl) toDisplayKey(key string) string {
prefix := r.redisNamespacePrefix()
if prefix == "" {
return key
}
return strings.TrimPrefix(key, prefix)
}
// Connect establishes a connection to Redis
func (r *RedisClientImpl) Connect(config connection.ConnectionConfig) error {
r.config = config
r.currentDB = config.RedisDB
if r.config.RedisDB < 0 || r.config.RedisDB > 15 {
r.config.RedisDB = 0
}
r.currentDB = r.config.RedisDB
r.forwarder = nil
r.client = nil
r.singleClient = nil
r.clusterClient = nil
r.isCluster = false
addr := fmt.Sprintf("%s:%d", config.Host, config.Port)
seedAddrs, err := buildRedisSeedAddrs(config)
if err != nil {
return err
}
r.seedAddrs = append([]string(nil), seedAddrs...)
// Handle SSH tunnel if enabled
topology := strings.ToLower(strings.TrimSpace(config.Topology))
r.isCluster = topology == "cluster" || len(seedAddrs) > 1
if r.isCluster && config.UseSSH {
return fmt.Errorf("Redis 集群模式暂不支持 SSH 隧道,请关闭 SSH 后重试")
}
timeout := normalizeRedisTimeout(config.Timeout)
if r.isCluster {
opts := &redis.ClusterOptions{
Addrs: seedAddrs,
Username: strings.TrimSpace(config.User),
Password: config.Password,
DialTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
clusterClient := redis.NewClusterClient(opts)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := clusterClient.Ping(ctx).Err(); err != nil {
clusterClient.Close()
return fmt.Errorf("Redis 集群连接失败: %w", err)
}
r.client = clusterClient
r.clusterClient = clusterClient
logger.Infof("Redis 集群连接成功: seeds=%s 逻辑库=db%d", strings.Join(seedAddrs, ","), r.currentDB)
return nil
}
addr := seedAddrs[0]
if config.UseSSH {
forwarder, err := ssh.GetOrCreateLocalForwarder(config.SSH, config.Host, config.Port)
if err != nil {
@@ -60,32 +235,26 @@ func (r *RedisClientImpl) Connect(config connection.ConnectionConfig) error {
opts := &redis.Options{
Addr: addr,
Username: strings.TrimSpace(config.User),
Password: config.Password,
DB: config.RedisDB,
DialTimeout: time.Duration(config.Timeout) * time.Second,
ReadTimeout: time.Duration(config.Timeout) * time.Second,
WriteTimeout: time.Duration(config.Timeout) * time.Second,
DB: r.currentDB,
DialTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
if opts.DialTimeout == 0 {
opts.DialTimeout = 30 * time.Second
opts.ReadTimeout = 30 * time.Second
opts.WriteTimeout = 30 * time.Second
}
r.client = redis.NewClient(opts)
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), opts.DialTimeout)
singleClient := redis.NewClient(opts)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := r.client.Ping(ctx).Err(); err != nil {
r.client.Close()
r.client = nil
if err := singleClient.Ping(ctx).Err(); err != nil {
singleClient.Close()
return fmt.Errorf("Redis 连接失败: %w", err)
}
logger.Infof("Redis 连接成功: %s DB=%d", addr, config.RedisDB)
r.client = singleClient
r.singleClient = singleClient
logger.Infof("Redis 连接成功: %s DB=%d", addr, r.currentDB)
return nil
}
@@ -94,6 +263,11 @@ func (r *RedisClientImpl) Close() error {
if r.client != nil {
err := r.client.Close()
r.client = nil
r.singleClient = nil
r.clusterClient = nil
r.isCluster = false
r.seedAddrs = nil
r.forwarder = nil
return err
}
return nil
@@ -118,6 +292,7 @@ func (r *RedisClientImpl) ScanKeys(pattern string, cursor uint64, count int64) (
if pattern == "" {
pattern = "*"
}
physicalPattern := r.toPhysicalPattern(pattern)
isSearchPattern := pattern != "*"
targetCount := normalizeRedisScanTargetCount(count)
@@ -150,7 +325,7 @@ func (r *RedisClientImpl) ScanKeys(pattern string, cursor uint64, count int64) (
break
}
batch, nextCursor, err := r.client.Scan(ctx, currentCursor, pattern, scanStepCount).Result()
batch, nextCursor, err := r.client.Scan(ctx, currentCursor, physicalPattern, scanStepCount).Result()
if err != nil {
return nil, err
}
@@ -226,7 +401,7 @@ func (r *RedisClientImpl) loadRedisKeyInfos(ctx context.Context, keys []string)
ttlValue = -2
}
result = append(result, RedisKeyInfo{
Key: key,
Key: r.toDisplayKey(key),
Type: keyType,
TTL: toRedisTTLSeconds(ttlValue),
})
@@ -236,7 +411,7 @@ func (r *RedisClientImpl) loadRedisKeyInfos(ctx context.Context, keys []string)
for i, key := range keys {
result = append(result, RedisKeyInfo{
Key: key,
Key: r.toDisplayKey(key),
Type: typeResults[i].Val(),
TTL: toRedisTTLSeconds(ttlResults[i].Val()),
})
@@ -261,7 +436,7 @@ func (r *RedisClientImpl) GetKeyType(key string) (string, error) {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.Type(ctx, key).Result()
return r.client.Type(ctx, r.toPhysicalKey(key)).Result()
}
// GetTTL returns the TTL of a key in seconds
@@ -272,7 +447,7 @@ func (r *RedisClientImpl) GetTTL(key string) (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ttl, err := r.client.TTL(ctx, key).Result()
ttl, err := r.client.TTL(ctx, r.toPhysicalKey(key)).Result()
if err != nil {
return 0, err
}
@@ -295,9 +470,9 @@ func (r *RedisClientImpl) SetTTL(key string, ttl int64) error {
if ttl < 0 {
// Remove expiry
return r.client.Persist(ctx, key).Err()
return r.client.Persist(ctx, r.toPhysicalKey(key)).Err()
}
return r.client.Expire(ctx, key, time.Duration(ttl)*time.Second).Err()
return r.client.Expire(ctx, r.toPhysicalKey(key), time.Duration(ttl)*time.Second).Err()
}
// DeleteKeys deletes one or more keys
@@ -307,7 +482,11 @@ func (r *RedisClientImpl) DeleteKeys(keys []string) (int64, error) {
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.Del(ctx, keys...).Result()
physicalKeys := r.toPhysicalKeys(keys)
if len(physicalKeys) == 0 {
return 0, nil
}
return r.client.Del(ctx, physicalKeys...).Result()
}
// RenameKey renames a key
@@ -317,7 +496,7 @@ func (r *RedisClientImpl) RenameKey(oldKey, newKey string) error {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.Rename(ctx, oldKey, newKey).Err()
return r.client.Rename(ctx, r.toPhysicalKey(oldKey), r.toPhysicalKey(newKey)).Err()
}
// KeyExists checks if a key exists
@@ -327,7 +506,7 @@ func (r *RedisClientImpl) KeyExists(key string) (bool, error) {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
n, err := r.client.Exists(ctx, key).Result()
n, err := r.client.Exists(ctx, r.toPhysicalKey(key)).Result()
return n > 0, err
}
@@ -343,6 +522,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
}
ttl, _ := r.GetTTL(key)
physicalKey := r.toPhysicalKey(key)
result := &RedisValue{
Type: keyType,
@@ -354,7 +534,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
switch keyType {
case "string":
val, err := r.client.Get(ctx, key).Result()
val, err := r.client.Get(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
@@ -362,7 +542,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
result.Length = int64(len(val))
case "hash":
val, err := r.client.HGetAll(ctx, key).Result()
val, err := r.client.HGetAll(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
@@ -370,7 +550,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
result.Length = int64(len(val))
case "list":
length, err := r.client.LLen(ctx, key).Result()
length, err := r.client.LLen(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
@@ -379,7 +559,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
if length < limit {
limit = length
}
val, err := r.client.LRange(ctx, key, 0, limit-1).Result()
val, err := r.client.LRange(ctx, physicalKey, 0, limit-1).Result()
if err != nil {
return nil, err
}
@@ -387,12 +567,12 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
result.Length = length
case "set":
length, err := r.client.SCard(ctx, key).Result()
length, err := r.client.SCard(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
// Get members using SMembers (limited by Redis server)
members, err := r.client.SMembers(ctx, key).Result()
members, err := r.client.SMembers(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
@@ -400,7 +580,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
result.Length = length
case "zset":
length, err := r.client.ZCard(ctx, key).Result()
length, err := r.client.ZCard(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
@@ -409,7 +589,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
if length < limit {
limit = length
}
val, err := r.client.ZRangeWithScores(ctx, key, 0, limit-1).Result()
val, err := r.client.ZRangeWithScores(ctx, physicalKey, 0, limit-1).Result()
if err != nil {
return nil, err
}
@@ -424,7 +604,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
result.Length = length
case "stream":
length, err := r.client.XLen(ctx, key).Result()
length, err := r.client.XLen(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
@@ -437,7 +617,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
if length < limit {
limit = length
}
val, err := r.client.XRangeN(ctx, key, "-", "+", limit).Result()
val, err := r.client.XRangeN(ctx, physicalKey, "-", "+", limit).Result()
if err != nil {
return nil, err
}
@@ -457,7 +637,7 @@ func (r *RedisClientImpl) GetString(key string) (string, error) {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.Get(ctx, key).Result()
return r.client.Get(ctx, r.toPhysicalKey(key)).Result()
}
// SetString sets a string value with optional TTL
@@ -472,7 +652,7 @@ func (r *RedisClientImpl) SetString(key, value string, ttl int64) error {
if ttl > 0 {
expiration = time.Duration(ttl) * time.Second
}
return r.client.Set(ctx, key, value, expiration).Err()
return r.client.Set(ctx, r.toPhysicalKey(key), value, expiration).Err()
}
// GetHash gets all fields of a hash
@@ -482,7 +662,7 @@ func (r *RedisClientImpl) GetHash(key string) (map[string]string, error) {
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.HGetAll(ctx, key).Result()
return r.client.HGetAll(ctx, r.toPhysicalKey(key)).Result()
}
// SetHashField sets a field in a hash
@@ -492,7 +672,7 @@ func (r *RedisClientImpl) SetHashField(key, field, value string) error {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.HSet(ctx, key, field, value).Err()
return r.client.HSet(ctx, r.toPhysicalKey(key), field, value).Err()
}
// DeleteHashField deletes fields from a hash
@@ -502,7 +682,7 @@ func (r *RedisClientImpl) DeleteHashField(key string, fields ...string) error {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.HDel(ctx, key, fields...).Err()
return r.client.HDel(ctx, r.toPhysicalKey(key), fields...).Err()
}
// GetList gets a range of elements from a list
@@ -512,7 +692,7 @@ func (r *RedisClientImpl) GetList(key string, start, stop int64) ([]string, erro
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.LRange(ctx, key, start, stop).Result()
return r.client.LRange(ctx, r.toPhysicalKey(key), start, stop).Result()
}
// ListPush pushes values to the end of a list
@@ -526,7 +706,7 @@ func (r *RedisClientImpl) ListPush(key string, values ...string) error {
for i, v := range values {
args[i] = v
}
return r.client.RPush(ctx, key, args...).Err()
return r.client.RPush(ctx, r.toPhysicalKey(key), args...).Err()
}
// ListSet sets the value at an index in a list
@@ -536,7 +716,7 @@ func (r *RedisClientImpl) ListSet(key string, index int64, value string) error {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.LSet(ctx, key, index, value).Err()
return r.client.LSet(ctx, r.toPhysicalKey(key), index, value).Err()
}
// GetSet gets all members of a set
@@ -546,7 +726,7 @@ func (r *RedisClientImpl) GetSet(key string) ([]string, error) {
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.SMembers(ctx, key).Result()
return r.client.SMembers(ctx, r.toPhysicalKey(key)).Result()
}
// SetAdd adds members to a set
@@ -560,7 +740,7 @@ func (r *RedisClientImpl) SetAdd(key string, members ...string) error {
for i, m := range members {
args[i] = m
}
return r.client.SAdd(ctx, key, args...).Err()
return r.client.SAdd(ctx, r.toPhysicalKey(key), args...).Err()
}
// SetRemove removes members from a set
@@ -574,7 +754,7 @@ func (r *RedisClientImpl) SetRemove(key string, members ...string) error {
for i, m := range members {
args[i] = m
}
return r.client.SRem(ctx, key, args...).Err()
return r.client.SRem(ctx, r.toPhysicalKey(key), args...).Err()
}
// GetZSet gets members with scores from a sorted set
@@ -585,7 +765,7 @@ func (r *RedisClientImpl) GetZSet(key string, start, stop int64) ([]ZSetMember,
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
val, err := r.client.ZRangeWithScores(ctx, key, start, stop).Result()
val, err := r.client.ZRangeWithScores(ctx, r.toPhysicalKey(key), start, stop).Result()
if err != nil {
return nil, err
}
@@ -615,7 +795,7 @@ func (r *RedisClientImpl) ZSetAdd(key string, members ...ZSetMember) error {
Member: m.Member,
}
}
return r.client.ZAdd(ctx, key, zMembers...).Err()
return r.client.ZAdd(ctx, r.toPhysicalKey(key), zMembers...).Err()
}
// ZSetRemove removes members from a sorted set
@@ -629,7 +809,7 @@ func (r *RedisClientImpl) ZSetRemove(key string, members ...string) error {
for i, m := range members {
args[i] = m
}
return r.client.ZRem(ctx, key, args...).Err()
return r.client.ZRem(ctx, r.toPhysicalKey(key), args...).Err()
}
// GetStream gets stream entries in a range
@@ -650,7 +830,7 @@ func (r *RedisClientImpl) GetStream(key, start, stop string, count int64) ([]Str
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
val, err := r.client.XRangeN(ctx, key, start, stop, count).Result()
val, err := r.client.XRangeN(ctx, r.toPhysicalKey(key), start, stop, count).Result()
if err != nil {
return nil, err
}
@@ -678,7 +858,7 @@ func (r *RedisClientImpl) StreamAdd(key string, fields map[string]string, id str
defer cancel()
newID, err := r.client.XAdd(ctx, &redis.XAddArgs{
Stream: key,
Stream: r.toPhysicalKey(key),
ID: id,
Values: values,
}).Result()
@@ -699,7 +879,7 @@ func (r *RedisClientImpl) StreamDelete(key string, ids ...string) (int64, error)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.XDel(ctx, key, ids...).Result()
return r.client.XDel(ctx, r.toPhysicalKey(key), ids...).Result()
}
func toStreamEntries(messages []redis.XMessage) []StreamEntry {
@@ -717,6 +897,72 @@ func toStreamEntries(messages []redis.XMessage) []StreamEntry {
return entries
}
func parseRedisCommandGetKeysResult(result interface{}) []string {
items, ok := result.([]interface{})
if !ok || len(items) == 0 {
return nil
}
keys := make([]string, 0, len(items))
for _, item := range items {
switch v := item.(type) {
case string:
if v != "" {
keys = append(keys, v)
}
case []byte:
text := string(v)
if text != "" {
keys = append(keys, text)
}
}
}
return keys
}
func (r *RedisClientImpl) rewriteCommandArgsForNamespace(ctx context.Context, args []string) []string {
if !r.isCluster || r.currentDB <= 0 || len(args) == 0 {
return args
}
command := strings.ToUpper(strings.TrimSpace(args[0]))
if command == "COMMAND" || command == "SELECT" || command == "FLUSHDB" {
return args
}
probeArgs := make([]interface{}, 0, len(args)+2)
probeArgs = append(probeArgs, "COMMAND", "GETKEYS")
for _, arg := range args {
probeArgs = append(probeArgs, arg)
}
result, err := r.client.Do(ctx, probeArgs...).Result()
if err != nil {
return args
}
keyCandidates := parseRedisCommandGetKeysResult(result)
if len(keyCandidates) == 0 {
return args
}
rewritten := append([]string(nil), args...)
used := make([]bool, len(rewritten))
for _, key := range keyCandidates {
for i := 1; i < len(rewritten); i++ {
if used[i] {
continue
}
if rewritten[i] != key {
continue
}
rewritten[i] = r.toPhysicalKey(rewritten[i])
used[i] = true
break
}
}
return rewritten
}
// ExecuteCommand executes a raw Redis command
func (r *RedisClientImpl) ExecuteCommand(args []string) (interface{}, error) {
if r.client == nil {
@@ -729,6 +975,33 @@ func (r *RedisClientImpl) ExecuteCommand(args []string) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if r.isCluster {
command := strings.ToUpper(strings.TrimSpace(args[0]))
switch command {
case "SELECT":
if len(args) < 2 {
return nil, fmt.Errorf("SELECT 命令缺少数据库索引")
}
index, err := strconv.Atoi(strings.TrimSpace(args[1]))
if err != nil {
return nil, fmt.Errorf("无效数据库索引: %s", args[1])
}
if index < 0 || index > 15 {
return nil, fmt.Errorf("数据库索引必须在 0-15 之间")
}
r.currentDB = index
r.config.RedisDB = index
return "OK", nil
case "FLUSHDB":
if err := r.FlushDB(); err != nil {
return nil, err
}
return "OK", nil
}
}
args = r.rewriteCommandArgsForNamespace(ctx, args)
// Convert to []interface{}
cmdArgs := make([]interface{}, len(args))
for i, arg := range args {
@@ -795,6 +1068,31 @@ func (r *RedisClientImpl) GetDatabases() ([]RedisDBInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if r.isCluster && r.clusterClient != nil {
var totalKeys int64
var mu sync.Mutex
err := r.clusterClient.ForEachMaster(ctx, func(nodeCtx context.Context, node *redis.Client) error {
keys, err := node.DBSize(nodeCtx).Result()
if err != nil {
return err
}
mu.Lock()
totalKeys += keys
mu.Unlock()
return nil
})
if err != nil {
logger.Warnf("Redis 集群获取 key 数量失败,回退为 0: %v", err)
totalKeys = 0
}
result := make([]RedisDBInfo, 16)
for i := 0; i < 16; i++ {
result[i] = RedisDBInfo{Index: i, Keys: 0}
}
result[0].Keys = totalKeys
return result, nil
}
// Get keyspace info
info, err := r.client.Info(ctx, "keyspace").Result()
if err != nil {
@@ -845,34 +1143,47 @@ func (r *RedisClientImpl) SelectDB(index int) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
if r.isCluster {
if index < 0 || index > 15 {
return fmt.Errorf("数据库索引必须在 0-15 之间")
}
r.currentDB = index
r.config.RedisDB = index
return nil
}
if index < 0 || index > 15 {
return fmt.Errorf("数据库索引必须在 0-15 之间")
}
// Create new client with different DB
addr := fmt.Sprintf("%s:%d", r.config.Host, r.config.Port)
addr := ""
if len(r.seedAddrs) > 0 {
addr = r.seedAddrs[0]
}
if r.forwarder != nil {
addr = r.forwarder.LocalAddr
}
if addr == "" {
addr = fmt.Sprintf("%s:%d", r.config.Host, r.config.Port)
}
timeout := normalizeRedisTimeout(r.config.Timeout)
opts := &redis.Options{
Addr: addr,
Username: strings.TrimSpace(r.config.User),
Password: r.config.Password,
DB: index,
DialTimeout: time.Duration(r.config.Timeout) * time.Second,
ReadTimeout: time.Duration(r.config.Timeout) * time.Second,
WriteTimeout: time.Duration(r.config.Timeout) * time.Second,
}
if opts.DialTimeout == 0 {
opts.DialTimeout = 30 * time.Second
opts.ReadTimeout = 30 * time.Second
opts.WriteTimeout = 30 * time.Second
DialTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
newClient := redis.NewClient(opts)
ctx, cancel := context.WithTimeout(context.Background(), opts.DialTimeout)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := newClient.Ping(ctx).Err(); err != nil {
@@ -881,9 +1192,14 @@ func (r *RedisClientImpl) SelectDB(index int) error {
}
// Close old client and replace
r.client.Close()
if r.client != nil {
_ = r.client.Close()
}
r.client = newClient
r.singleClient = newClient
r.clusterClient = nil
r.currentDB = index
r.config.RedisDB = index
logger.Infof("Redis 切换到数据库: db%d", index)
return nil
@@ -899,6 +1215,63 @@ func (r *RedisClientImpl) FlushDB() error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
if r.isCluster && r.clusterClient != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
namespacePrefix := r.redisNamespacePrefix()
var deletedTotal int64
var deletedMu sync.Mutex
err := r.clusterClient.ForEachMaster(ctx, func(nodeCtx context.Context, node *redis.Client) error {
var cursor uint64
for {
pattern := "*"
if namespacePrefix != "" {
pattern = namespacePrefix + "*"
}
keys, nextCursor, err := node.Scan(nodeCtx, cursor, pattern, 2000).Result()
if err != nil {
return err
}
if namespacePrefix == "" {
filtered := keys[:0]
for _, key := range keys {
// db0 保留兼容:不删除逻辑库前缀 key避免误清理 db1~db15。
if strings.HasPrefix(key, "__gonavi_db_") {
continue
}
filtered = append(filtered, key)
}
keys = filtered
}
if len(keys) > 0 {
deleted, err := node.Del(nodeCtx, keys...).Result()
if err != nil {
return err
}
deletedMu.Lock()
deletedTotal += deleted
deletedMu.Unlock()
}
cursor = nextCursor
if cursor == 0 {
break
}
}
return nil
})
if err != nil {
return err
}
logger.Infof("Redis 集群逻辑库清空完成: db%d deleted=%d", r.currentDB, deletedTotal)
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.FlushDB(ctx).Err()