From b904c0b1079bf5e37b48cddbeeba6ea744c19dd5 Mon Sep 17 00:00:00 2001 From: Syngnat Date: Tue, 3 Mar 2026 09:42:49 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(redis-cluster):=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E9=9B=86=E7=BE=A4=E6=A8=A1=E5=BC=8F=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E5=A4=9A=E5=BA=93=E9=9A=94=E7=A6=BB=E4=B8=8E=200-15=20?= =?UTF-8?q?=E5=BA=93=E5=88=87=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 前端恢复 Redis 集群场景下 db0-db15 的数据库选择与展示 - 后端新增集群逻辑库命名空间前缀映射,统一 key/pattern 读写隔离 - 覆盖扫描、读取、写入、删除、重命名等核心操作的键映射规则 - 集群命令通道支持 SELECT 逻辑切库与 FLUSHDB 逻辑库清空 - refs #145 --- frontend/src/components/ConnectionModal.tsx | 129 ++++- frontend/src/store.ts | 2 +- frontend/src/types.ts | 2 +- internal/app/methods_redis.go | 19 +- internal/connection/types.go | 2 +- internal/redis/redis.go | 2 +- internal/redis/redis_impl.go | 521 +++++++++++++++++--- 7 files changed, 587 insertions(+), 90 deletions(-) diff --git a/frontend/src/components/ConnectionModal.tsx b/frontend/src/components/ConnectionModal.tsx index bf1c8ad..22aa35b 100644 --- a/frontend/src/components/ConnectionModal.tsx +++ b/frontend/src/components/ConnectionModal.tsx @@ -106,6 +106,7 @@ const ConnectionModal: React.FC<{ const mysqlTopology = Form.useWatch('mysqlTopology', form) || 'single'; const mongoTopology = Form.useWatch('mongoTopology', form) || 'single'; const mongoSrv = Form.useWatch('mongoSrv', form) || false; + const redisTopology = Form.useWatch('redisTopology', form) || 'single'; const getSectionBg = (darkHex: string) => { if (!darkMode) { @@ -449,6 +450,35 @@ const ConnectionModal: React.FC<{ return { host: normalizeFileDbPath(safeDecode(rawPath)) }; } + if (type === 'redis') { + const parsed = parseMultiHostUri(trimmedUri, 'redis'); + if (!parsed) { + return null; + } + if (!parsed.hosts.length || parsed.hosts.length > MAX_URI_HOSTS) { + return null; + } + if (parsed.hosts.some((entry) => !isValidUriHostEntry(entry))) { + return null; + } + const hostList = normalizeAddressList(parsed.hosts, 6379); + if (!hostList.length) { + return null; + } + const primary = parseHostPort(hostList[0] || 'localhost:6379', 6379); + const topologyParam = String(parsed.params.get('topology') || '').toLowerCase(); + const dbText = String(parsed.database || '').trim().replace(/^\//, ''); + const dbIndex = Number(dbText); + return { + host: primary?.host || 'localhost', + port: primary?.port || 6379, + password: parsed.password || '', + redisTopology: hostList.length > 1 || topologyParam === 'cluster' ? 'cluster' : 'single', + redisHosts: hostList.slice(1), + redisDB: Number.isFinite(dbIndex) && dbIndex >= 0 && dbIndex <= 15 ? Math.trunc(dbIndex) : 0, + }; + } + if (type === 'mongodb') { const parsed = parseMultiHostUri(trimmedUri, 'mongodb') || parseMultiHostUri(trimmedUri, 'mongodb+srv'); if (!parsed) { @@ -547,6 +577,9 @@ const ConnectionModal: React.FC<{ if (dbType === 'clickhouse') { return 'clickhouse://default:pass@127.0.0.1:9000/default'; } + if (dbType === 'redis') { + return 'redis://:pass@127.0.0.1:6379,127.0.0.2:6379/0?topology=cluster'; + } if (dbType === 'oracle') { return 'oracle://user:pass@127.0.0.1:1521/ORCLPDB1'; } @@ -585,6 +618,26 @@ const ConnectionModal: React.FC<{ return `${scheme}://${encodedAuth}${hosts.join(',')}${dbPath}${query ? `?${query}` : ''}`; } + if (type === 'redis') { + const primary = toAddress(host, port, 6379); + const clusterHosts = values.redisTopology === 'cluster' + ? normalizeAddressList(values.redisHosts, 6379) + : []; + const hosts = normalizeAddressList([primary, ...clusterHosts], 6379); + const params = new URLSearchParams(); + if (hosts.length > 1 || values.redisTopology === 'cluster') { + params.set('topology', 'cluster'); + } + const redisPassword = String(values.password || ''); + const redisAuth = redisPassword ? `:${encodeURIComponent(redisPassword)}@` : ''; + const redisDB = Number.isFinite(Number(values.redisDB)) + ? Math.max(0, Math.min(15, Math.trunc(Number(values.redisDB)))) + : 0; + const dbPath = `/${redisDB}`; + const query = params.toString(); + return `redis://${redisAuth}${hosts.join(',')}${dbPath}${query ? `?${query}` : ''}`; + } + if (isFileDatabaseType(type)) { const pathText = normalizeFileDbPath(String(values.host || '').trim()); if (!pathText) { @@ -770,8 +823,10 @@ const ConnectionModal: React.FC<{ : (primaryAddress?.port || Number(config.port || defaultPort)); const mysqlReplicaHosts = (configType === 'mysql' || configType === 'mariadb' || configType === 'diros' || configType === 'sphinx') ? normalizedHosts.slice(1) : []; const mongoHosts = configType === 'mongodb' ? normalizedHosts.slice(1) : []; + const redisHosts = configType === 'redis' ? normalizedHosts.slice(1) : []; const mysqlIsReplica = String(config.topology || '').toLowerCase() === 'replica' || mysqlReplicaHosts.length > 0; const mongoIsReplica = String(config.topology || '').toLowerCase() === 'replica' || mongoHosts.length > 0 || !!config.replicaSet; + const redisIsCluster = String(config.topology || '').toLowerCase() === 'cluster' || redisHosts.length > 0; form.setFieldsValue({ type: configType, name: initialValues.name, @@ -804,12 +859,15 @@ const ConnectionModal: React.FC<{ mysqlReplicaPassword: config.mysqlReplicaPassword || '', mongoTopology: mongoIsReplica ? 'replica' : 'single', mongoHosts: mongoHosts, + redisTopology: redisIsCluster ? 'cluster' : 'single', + redisHosts: redisHosts, mongoSrv: !!config.mongoSrv, mongoReplicaSet: config.replicaSet || '', mongoAuthSource: config.authSource || '', mongoReadPreference: config.readPreference || 'primary', mongoAuthMechanism: config.mongoAuthMechanism || '', savePassword: config.savePassword !== false, + redisDB: Number.isFinite(Number(config.redisDB)) ? Number(config.redisDB) : 0, mongoReplicaUser: config.mongoReplicaUser || '', mongoReplicaPassword: config.mongoReplicaPassword || '' }); @@ -924,7 +982,6 @@ const ConnectionModal: React.FC<{ if (res.success) { setTestResult({ type: 'success', message: res.message }); if (isRedisType) { - // Redis: generate database list 0-15 setRedisDbList(Array.from({ length: 16 }, (_, i) => i)); } else { // Other databases: fetch database list @@ -1033,7 +1090,7 @@ const ConnectionModal: React.FC<{ } let hosts: string[] = []; - let topology: 'single' | 'replica' | undefined; + let topology: 'single' | 'replica' | 'cluster' | undefined; let replicaSet = ''; let authSource = ''; let readPreference = ''; @@ -1087,6 +1144,22 @@ const ConnectionModal: React.FC<{ mongoAuthMechanism = String(mergedValues.mongoAuthMechanism || '').trim().toUpperCase(); } + if (type === 'redis') { + const clusterNodes = mergedValues.redisTopology === 'cluster' + ? normalizeAddressList(mergedValues.redisHosts, defaultPort) + : []; + const allHosts = normalizeAddressList([`${primaryHost}:${primaryPort}`, ...clusterNodes], defaultPort); + if (mergedValues.redisTopology === 'cluster' || allHosts.length > 1) { + hosts = allHosts; + topology = 'cluster'; + } else { + topology = 'single'; + } + mergedValues.redisDB = Number.isFinite(Number(mergedValues.redisDB)) + ? Math.max(0, Math.min(15, Math.trunc(Number(mergedValues.redisDB)))) + : 0; + } + const sshConfig = mergedValues.useSSH ? { host: mergedValues.sshHost, port: Number(mergedValues.sshPort), @@ -1128,6 +1201,9 @@ const ConnectionModal: React.FC<{ driver: mergedValues.driver, dsn: mergedValues.dsn, timeout: Number(mergedValues.timeout || 30), + redisDB: Number.isFinite(Number(mergedValues.redisDB)) + ? Math.max(0, Math.min(15, Math.trunc(Number(mergedValues.redisDB)))) + : 0, uri: String(mergedValues.uri || '').trim(), hosts: hosts, topology: topology, @@ -1178,6 +1254,7 @@ const ConnectionModal: React.FC<{ proxyUser: '', proxyPassword: '', mysqlTopology: 'single', + redisTopology: 'single', mongoTopology: 'single', mongoSrv: false, mongoReadPreference: 'primary', @@ -1186,11 +1263,13 @@ const ConnectionModal: React.FC<{ mongoAuthMechanism: '', savePassword: true, mysqlReplicaHosts: [], + redisHosts: [], mongoHosts: [], mysqlReplicaUser: '', mysqlReplicaPassword: '', mongoReplicaUser: '', mongoReplicaPassword: '', + redisDB: 0, }); } else if (type !== 'custom') { const defaultUser = type === 'clickhouse' ? 'default' : 'root'; @@ -1199,6 +1278,7 @@ const ConnectionModal: React.FC<{ database: '', port: defaultPort, mysqlTopology: 'single', + redisTopology: 'single', mongoTopology: 'single', mongoSrv: false, mongoReadPreference: 'primary', @@ -1207,11 +1287,13 @@ const ConnectionModal: React.FC<{ mongoAuthMechanism: '', savePassword: true, mysqlReplicaHosts: [], + redisHosts: [], mongoHosts: [], mysqlReplicaUser: '', mysqlReplicaPassword: '', mongoReplicaUser: '', mongoReplicaPassword: '', + redisDB: 0, }); } @@ -1346,17 +1428,20 @@ const ConnectionModal: React.FC<{ timeout: 30, uri: '', mysqlTopology: 'single', + redisTopology: 'single', mongoTopology: 'single', mongoSrv: false, mongoReadPreference: 'primary', mongoAuthMechanism: '', savePassword: true, mysqlReplicaHosts: [], + redisHosts: [], mongoHosts: [], mysqlReplicaUser: '', mysqlReplicaPassword: '', mongoReplicaUser: '', mongoReplicaPassword: '', + redisDB: 0, }} onValuesChange={(changed) => { if (testResult) { @@ -1384,6 +1469,17 @@ const ConnectionModal: React.FC<{ } // Type change handled by step 1, but keep sync if select changes (hidden now) if (changed.type !== undefined) setDbType(changed.type); + if (changed.redisTopology !== undefined) { + const supportedDbs = Array.from({ length: 16 }, (_, i) => i); + setRedisDbList(supportedDbs); + const selectedDbsRaw = form.getFieldValue('includeRedisDatabases'); + const selectedDbs = Array.isArray(selectedDbsRaw) ? selectedDbsRaw.map((entry: any) => Number(entry)) : []; + const validDbs = selectedDbs + .filter((entry: number) => Number.isFinite(entry)) + .map((entry: number) => Math.trunc(entry)) + .filter((entry: number) => supportedDbs.includes(entry)); + form.setFieldValue('includeRedisDatabases', validDbs.length > 0 ? validDbs : undefined); + } if ( changed.type !== undefined || changed.host !== undefined @@ -1657,11 +1753,36 @@ const ConnectionModal: React.FC<{ {/* Redis specific: password only, no username */} {isRedis && ( <> + + + + )} - - {redisDbList.map(db => db{db})} diff --git a/frontend/src/store.ts b/frontend/src/store.ts index beaea1b..4a4b320 100644 --- a/frontend/src/store.ts +++ b/frontend/src/store.ts @@ -199,7 +199,7 @@ const sanitizeConnectionConfig = (value: unknown): ConnectionConfig => { proxy, uri: toTrimmedString(raw.uri).slice(0, MAX_URI_LENGTH), hosts: sanitizeAddressList(raw.hosts), - topology: raw.topology === 'replica' ? 'replica' : 'single', + topology: raw.topology === 'replica' ? 'replica' : (raw.topology === 'cluster' ? 'cluster' : 'single'), mysqlReplicaUser: toTrimmedString(raw.mysqlReplicaUser), mysqlReplicaPassword: savePassword ? toTrimmedString(raw.mysqlReplicaPassword) : '', replicaSet: toTrimmedString(raw.replicaSet), diff --git a/frontend/src/types.ts b/frontend/src/types.ts index e8a6cb4..2bc8dac 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -32,7 +32,7 @@ export interface ConnectionConfig { redisDB?: number; // Redis database index (0-15) uri?: string; // Connection URI for copy/paste hosts?: string[]; // Multi-host addresses: host:port - topology?: 'single' | 'replica'; + topology?: 'single' | 'replica' | 'cluster'; mysqlReplicaUser?: string; mysqlReplicaPassword?: string; replicaSet?: string; diff --git a/internal/app/methods_redis.go b/internal/app/methods_redis.go index e88d79d..1b626b0 100644 --- a/internal/app/methods_redis.go +++ b/internal/app/methods_redis.go @@ -67,24 +67,27 @@ func getRedisClientCacheKey(config connection.ConnectionConfig) string { } func formatRedisConnSummary(config connection.ConnectionConfig) string { - timeoutSeconds := config.Timeout - if timeoutSeconds <= 0 { - timeoutSeconds = 30 - } - var b strings.Builder b.WriteString("类型=redis 地址=") b.WriteString(config.Host) b.WriteString(":") - b.WriteString(string(rune(config.Port + '0'))) + b.WriteString(strconv.Itoa(config.Port)) + if topology := strings.TrimSpace(config.Topology); topology != "" { + b.WriteString(" 模式=") + b.WriteString(topology) + } + if len(config.Hosts) > 0 { + b.WriteString(" 节点数=") + b.WriteString(strconv.Itoa(len(config.Hosts))) + } b.WriteString(" DB=") - b.WriteString(string(rune(config.RedisDB + '0'))) + b.WriteString(strconv.Itoa(config.RedisDB)) if config.UseSSH { b.WriteString(" SSH=") b.WriteString(config.SSH.Host) b.WriteString(":") - b.WriteString(string(rune(config.SSH.Port + '0'))) + b.WriteString(strconv.Itoa(config.SSH.Port)) b.WriteString(" 用户=") b.WriteString(config.SSH.User) } diff --git a/internal/connection/types.go b/internal/connection/types.go index cfc0253..20b4cbb 100644 --- a/internal/connection/types.go +++ b/internal/connection/types.go @@ -37,7 +37,7 @@ type ConnectionConfig struct { RedisDB int `json:"redisDB,omitempty"` // Redis database index (0-15) URI string `json:"uri,omitempty"` // Connection URI for copy/paste Hosts []string `json:"hosts,omitempty"` // Multi-host addresses: host:port - Topology string `json:"topology,omitempty"` // single | replica + Topology string `json:"topology,omitempty"` // single | replica | cluster MySQLReplicaUser string `json:"mysqlReplicaUser,omitempty"` // MySQL replica auth user MySQLReplicaPassword string `json:"mysqlReplicaPassword,omitempty"` // MySQL replica auth password ReplicaSet string `json:"replicaSet,omitempty"` // MongoDB replica set name diff --git a/internal/redis/redis.go b/internal/redis/redis.go index 80e58f6..d9e776b 100644 --- a/internal/redis/redis.go +++ b/internal/redis/redis.go @@ -12,7 +12,7 @@ type RedisValue struct { // RedisDBInfo represents information about a Redis database type RedisDBInfo struct { - Index int `json:"index"` // Database index (0-15) + Index int `json:"index"` // Database index (single: 0-15, cluster: logical 0-15) Keys int64 `json:"keys"` // Number of keys in this database } diff --git a/internal/redis/redis_impl.go b/internal/redis/redis_impl.go index 044f16d..f08b4f5 100644 --- a/internal/redis/redis_impl.go +++ b/internal/redis/redis_impl.go @@ -3,8 +3,10 @@ package redis import ( "context" "fmt" + "net" "strconv" "strings" + "sync" "time" "GoNavi-Wails/internal/connection" @@ -16,10 +18,14 @@ import ( // RedisClientImpl implements RedisClient using go-redis type RedisClientImpl struct { - client *redis.Client - config connection.ConnectionConfig - currentDB int - forwarder *ssh.LocalForwarder + client redis.UniversalClient + singleClient *redis.Client + clusterClient *redis.ClusterClient + config connection.ConnectionConfig + currentDB int + isCluster bool + seedAddrs []string + forwarder *ssh.LocalForwarder } const ( @@ -40,14 +46,183 @@ func NewRedisClient() RedisClient { return &RedisClientImpl{} } +func normalizeRedisTimeout(timeoutSeconds int) time.Duration { + if timeoutSeconds <= 0 { + return 30 * time.Second + } + return time.Duration(timeoutSeconds) * time.Second +} + +func normalizeRedisSeedAddress(raw string, defaultPort int) (string, error) { + addr := strings.TrimSpace(raw) + if addr == "" { + return "", fmt.Errorf("Redis 节点地址不能为空") + } + + if _, _, err := net.SplitHostPort(addr); err == nil { + return addr, nil + } + + if !strings.Contains(addr, ":") { + return net.JoinHostPort(addr, strconv.Itoa(defaultPort)), nil + } + + // 尝试兼容 host:port 但端口格式异常的场景。 + host, port, ok := strings.Cut(addr, ":") + if !ok { + return "", fmt.Errorf("无效 Redis 节点地址: %s", addr) + } + host = strings.TrimSpace(host) + port = strings.TrimSpace(port) + if host == "" { + return "", fmt.Errorf("无效 Redis 节点地址: %s", addr) + } + if _, err := strconv.Atoi(port); err != nil { + return "", fmt.Errorf("无效 Redis 端口: %s", addr) + } + return net.JoinHostPort(host, port), nil +} + +func buildRedisSeedAddrs(config connection.ConnectionConfig) ([]string, error) { + defaultPort := config.Port + if defaultPort <= 0 { + defaultPort = 6379 + } + + candidates := make([]string, 0, 1+len(config.Hosts)) + if strings.TrimSpace(config.Host) != "" { + candidates = append(candidates, fmt.Sprintf("%s:%d", strings.TrimSpace(config.Host), defaultPort)) + } + candidates = append(candidates, config.Hosts...) + + seen := make(map[string]struct{}, len(candidates)) + addrs := make([]string, 0, len(candidates)) + for _, candidate := range candidates { + normalized, err := normalizeRedisSeedAddress(candidate, defaultPort) + if err != nil { + return nil, err + } + if _, exists := seen[normalized]; exists { + continue + } + seen[normalized] = struct{}{} + addrs = append(addrs, normalized) + } + if len(addrs) == 0 { + return nil, fmt.Errorf("Redis 连接地址不能为空") + } + return addrs, nil +} + +func (r *RedisClientImpl) redisNamespacePrefixForDB(index int) string { + if !r.isCluster || index <= 0 { + return "" + } + // Redis Cluster 仅支持物理 db0;这里用固定前缀模拟逻辑库隔离。 + return fmt.Sprintf("__gonavi_db_%d__:", index) +} + +func (r *RedisClientImpl) redisNamespacePrefix() string { + return r.redisNamespacePrefixForDB(r.currentDB) +} + +func (r *RedisClientImpl) toPhysicalKey(key string) string { + trimmed := strings.TrimSpace(key) + if trimmed == "" { + return "" + } + prefix := r.redisNamespacePrefix() + if prefix == "" || strings.HasPrefix(trimmed, prefix) { + return trimmed + } + return prefix + trimmed +} + +func (r *RedisClientImpl) toPhysicalPattern(pattern string) string { + normalized := strings.TrimSpace(pattern) + if normalized == "" { + normalized = "*" + } + prefix := r.redisNamespacePrefix() + if prefix == "" { + return normalized + } + return prefix + normalized +} + +func (r *RedisClientImpl) toPhysicalKeys(keys []string) []string { + if len(keys) == 0 { + return nil + } + result := make([]string, 0, len(keys)) + for _, key := range keys { + physical := r.toPhysicalKey(key) + if physical == "" { + continue + } + result = append(result, physical) + } + return result +} + +func (r *RedisClientImpl) toDisplayKey(key string) string { + prefix := r.redisNamespacePrefix() + if prefix == "" { + return key + } + return strings.TrimPrefix(key, prefix) +} + // Connect establishes a connection to Redis func (r *RedisClientImpl) Connect(config connection.ConnectionConfig) error { r.config = config - r.currentDB = config.RedisDB + if r.config.RedisDB < 0 || r.config.RedisDB > 15 { + r.config.RedisDB = 0 + } + r.currentDB = r.config.RedisDB + r.forwarder = nil + r.client = nil + r.singleClient = nil + r.clusterClient = nil + r.isCluster = false - addr := fmt.Sprintf("%s:%d", config.Host, config.Port) + seedAddrs, err := buildRedisSeedAddrs(config) + if err != nil { + return err + } + r.seedAddrs = append([]string(nil), seedAddrs...) - // Handle SSH tunnel if enabled + topology := strings.ToLower(strings.TrimSpace(config.Topology)) + r.isCluster = topology == "cluster" || len(seedAddrs) > 1 + + if r.isCluster && config.UseSSH { + return fmt.Errorf("Redis 集群模式暂不支持 SSH 隧道,请关闭 SSH 后重试") + } + + timeout := normalizeRedisTimeout(config.Timeout) + if r.isCluster { + opts := &redis.ClusterOptions{ + Addrs: seedAddrs, + Username: strings.TrimSpace(config.User), + Password: config.Password, + DialTimeout: timeout, + ReadTimeout: timeout, + WriteTimeout: timeout, + } + clusterClient := redis.NewClusterClient(opts) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err := clusterClient.Ping(ctx).Err(); err != nil { + clusterClient.Close() + return fmt.Errorf("Redis 集群连接失败: %w", err) + } + r.client = clusterClient + r.clusterClient = clusterClient + logger.Infof("Redis 集群连接成功: seeds=%s 逻辑库=db%d", strings.Join(seedAddrs, ","), r.currentDB) + return nil + } + + addr := seedAddrs[0] if config.UseSSH { forwarder, err := ssh.GetOrCreateLocalForwarder(config.SSH, config.Host, config.Port) if err != nil { @@ -60,32 +235,26 @@ func (r *RedisClientImpl) Connect(config connection.ConnectionConfig) error { opts := &redis.Options{ Addr: addr, + Username: strings.TrimSpace(config.User), Password: config.Password, - DB: config.RedisDB, - DialTimeout: time.Duration(config.Timeout) * time.Second, - ReadTimeout: time.Duration(config.Timeout) * time.Second, - WriteTimeout: time.Duration(config.Timeout) * time.Second, + DB: r.currentDB, + DialTimeout: timeout, + ReadTimeout: timeout, + WriteTimeout: timeout, } - if opts.DialTimeout == 0 { - opts.DialTimeout = 30 * time.Second - opts.ReadTimeout = 30 * time.Second - opts.WriteTimeout = 30 * time.Second - } - - r.client = redis.NewClient(opts) - - // Test connection - ctx, cancel := context.WithTimeout(context.Background(), opts.DialTimeout) + singleClient := redis.NewClient(opts) + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - if err := r.client.Ping(ctx).Err(); err != nil { - r.client.Close() - r.client = nil + if err := singleClient.Ping(ctx).Err(); err != nil { + singleClient.Close() return fmt.Errorf("Redis 连接失败: %w", err) } - logger.Infof("Redis 连接成功: %s DB=%d", addr, config.RedisDB) + r.client = singleClient + r.singleClient = singleClient + logger.Infof("Redis 连接成功: %s DB=%d", addr, r.currentDB) return nil } @@ -94,6 +263,11 @@ func (r *RedisClientImpl) Close() error { if r.client != nil { err := r.client.Close() r.client = nil + r.singleClient = nil + r.clusterClient = nil + r.isCluster = false + r.seedAddrs = nil + r.forwarder = nil return err } return nil @@ -118,6 +292,7 @@ func (r *RedisClientImpl) ScanKeys(pattern string, cursor uint64, count int64) ( if pattern == "" { pattern = "*" } + physicalPattern := r.toPhysicalPattern(pattern) isSearchPattern := pattern != "*" targetCount := normalizeRedisScanTargetCount(count) @@ -150,7 +325,7 @@ func (r *RedisClientImpl) ScanKeys(pattern string, cursor uint64, count int64) ( break } - batch, nextCursor, err := r.client.Scan(ctx, currentCursor, pattern, scanStepCount).Result() + batch, nextCursor, err := r.client.Scan(ctx, currentCursor, physicalPattern, scanStepCount).Result() if err != nil { return nil, err } @@ -226,7 +401,7 @@ func (r *RedisClientImpl) loadRedisKeyInfos(ctx context.Context, keys []string) ttlValue = -2 } result = append(result, RedisKeyInfo{ - Key: key, + Key: r.toDisplayKey(key), Type: keyType, TTL: toRedisTTLSeconds(ttlValue), }) @@ -236,7 +411,7 @@ func (r *RedisClientImpl) loadRedisKeyInfos(ctx context.Context, keys []string) for i, key := range keys { result = append(result, RedisKeyInfo{ - Key: key, + Key: r.toDisplayKey(key), Type: typeResults[i].Val(), TTL: toRedisTTLSeconds(ttlResults[i].Val()), }) @@ -261,7 +436,7 @@ func (r *RedisClientImpl) GetKeyType(key string) (string, error) { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return r.client.Type(ctx, key).Result() + return r.client.Type(ctx, r.toPhysicalKey(key)).Result() } // GetTTL returns the TTL of a key in seconds @@ -272,7 +447,7 @@ func (r *RedisClientImpl) GetTTL(key string) (int64, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - ttl, err := r.client.TTL(ctx, key).Result() + ttl, err := r.client.TTL(ctx, r.toPhysicalKey(key)).Result() if err != nil { return 0, err } @@ -295,9 +470,9 @@ func (r *RedisClientImpl) SetTTL(key string, ttl int64) error { if ttl < 0 { // Remove expiry - return r.client.Persist(ctx, key).Err() + return r.client.Persist(ctx, r.toPhysicalKey(key)).Err() } - return r.client.Expire(ctx, key, time.Duration(ttl)*time.Second).Err() + return r.client.Expire(ctx, r.toPhysicalKey(key), time.Duration(ttl)*time.Second).Err() } // DeleteKeys deletes one or more keys @@ -307,7 +482,11 @@ func (r *RedisClientImpl) DeleteKeys(keys []string) (int64, error) { } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - return r.client.Del(ctx, keys...).Result() + physicalKeys := r.toPhysicalKeys(keys) + if len(physicalKeys) == 0 { + return 0, nil + } + return r.client.Del(ctx, physicalKeys...).Result() } // RenameKey renames a key @@ -317,7 +496,7 @@ func (r *RedisClientImpl) RenameKey(oldKey, newKey string) error { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return r.client.Rename(ctx, oldKey, newKey).Err() + return r.client.Rename(ctx, r.toPhysicalKey(oldKey), r.toPhysicalKey(newKey)).Err() } // KeyExists checks if a key exists @@ -327,7 +506,7 @@ func (r *RedisClientImpl) KeyExists(key string) (bool, error) { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - n, err := r.client.Exists(ctx, key).Result() + n, err := r.client.Exists(ctx, r.toPhysicalKey(key)).Result() return n > 0, err } @@ -343,6 +522,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { } ttl, _ := r.GetTTL(key) + physicalKey := r.toPhysicalKey(key) result := &RedisValue{ Type: keyType, @@ -354,7 +534,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { switch keyType { case "string": - val, err := r.client.Get(ctx, key).Result() + val, err := r.client.Get(ctx, physicalKey).Result() if err != nil { return nil, err } @@ -362,7 +542,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { result.Length = int64(len(val)) case "hash": - val, err := r.client.HGetAll(ctx, key).Result() + val, err := r.client.HGetAll(ctx, physicalKey).Result() if err != nil { return nil, err } @@ -370,7 +550,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { result.Length = int64(len(val)) case "list": - length, err := r.client.LLen(ctx, key).Result() + length, err := r.client.LLen(ctx, physicalKey).Result() if err != nil { return nil, err } @@ -379,7 +559,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { if length < limit { limit = length } - val, err := r.client.LRange(ctx, key, 0, limit-1).Result() + val, err := r.client.LRange(ctx, physicalKey, 0, limit-1).Result() if err != nil { return nil, err } @@ -387,12 +567,12 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { result.Length = length case "set": - length, err := r.client.SCard(ctx, key).Result() + length, err := r.client.SCard(ctx, physicalKey).Result() if err != nil { return nil, err } // Get members using SMembers (limited by Redis server) - members, err := r.client.SMembers(ctx, key).Result() + members, err := r.client.SMembers(ctx, physicalKey).Result() if err != nil { return nil, err } @@ -400,7 +580,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { result.Length = length case "zset": - length, err := r.client.ZCard(ctx, key).Result() + length, err := r.client.ZCard(ctx, physicalKey).Result() if err != nil { return nil, err } @@ -409,7 +589,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { if length < limit { limit = length } - val, err := r.client.ZRangeWithScores(ctx, key, 0, limit-1).Result() + val, err := r.client.ZRangeWithScores(ctx, physicalKey, 0, limit-1).Result() if err != nil { return nil, err } @@ -424,7 +604,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { result.Length = length case "stream": - length, err := r.client.XLen(ctx, key).Result() + length, err := r.client.XLen(ctx, physicalKey).Result() if err != nil { return nil, err } @@ -437,7 +617,7 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) { if length < limit { limit = length } - val, err := r.client.XRangeN(ctx, key, "-", "+", limit).Result() + val, err := r.client.XRangeN(ctx, physicalKey, "-", "+", limit).Result() if err != nil { return nil, err } @@ -457,7 +637,7 @@ func (r *RedisClientImpl) GetString(key string) (string, error) { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return r.client.Get(ctx, key).Result() + return r.client.Get(ctx, r.toPhysicalKey(key)).Result() } // SetString sets a string value with optional TTL @@ -472,7 +652,7 @@ func (r *RedisClientImpl) SetString(key, value string, ttl int64) error { if ttl > 0 { expiration = time.Duration(ttl) * time.Second } - return r.client.Set(ctx, key, value, expiration).Err() + return r.client.Set(ctx, r.toPhysicalKey(key), value, expiration).Err() } // GetHash gets all fields of a hash @@ -482,7 +662,7 @@ func (r *RedisClientImpl) GetHash(key string) (map[string]string, error) { } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - return r.client.HGetAll(ctx, key).Result() + return r.client.HGetAll(ctx, r.toPhysicalKey(key)).Result() } // SetHashField sets a field in a hash @@ -492,7 +672,7 @@ func (r *RedisClientImpl) SetHashField(key, field, value string) error { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return r.client.HSet(ctx, key, field, value).Err() + return r.client.HSet(ctx, r.toPhysicalKey(key), field, value).Err() } // DeleteHashField deletes fields from a hash @@ -502,7 +682,7 @@ func (r *RedisClientImpl) DeleteHashField(key string, fields ...string) error { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return r.client.HDel(ctx, key, fields...).Err() + return r.client.HDel(ctx, r.toPhysicalKey(key), fields...).Err() } // GetList gets a range of elements from a list @@ -512,7 +692,7 @@ func (r *RedisClientImpl) GetList(key string, start, stop int64) ([]string, erro } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - return r.client.LRange(ctx, key, start, stop).Result() + return r.client.LRange(ctx, r.toPhysicalKey(key), start, stop).Result() } // ListPush pushes values to the end of a list @@ -526,7 +706,7 @@ func (r *RedisClientImpl) ListPush(key string, values ...string) error { for i, v := range values { args[i] = v } - return r.client.RPush(ctx, key, args...).Err() + return r.client.RPush(ctx, r.toPhysicalKey(key), args...).Err() } // ListSet sets the value at an index in a list @@ -536,7 +716,7 @@ func (r *RedisClientImpl) ListSet(key string, index int64, value string) error { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return r.client.LSet(ctx, key, index, value).Err() + return r.client.LSet(ctx, r.toPhysicalKey(key), index, value).Err() } // GetSet gets all members of a set @@ -546,7 +726,7 @@ func (r *RedisClientImpl) GetSet(key string) ([]string, error) { } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - return r.client.SMembers(ctx, key).Result() + return r.client.SMembers(ctx, r.toPhysicalKey(key)).Result() } // SetAdd adds members to a set @@ -560,7 +740,7 @@ func (r *RedisClientImpl) SetAdd(key string, members ...string) error { for i, m := range members { args[i] = m } - return r.client.SAdd(ctx, key, args...).Err() + return r.client.SAdd(ctx, r.toPhysicalKey(key), args...).Err() } // SetRemove removes members from a set @@ -574,7 +754,7 @@ func (r *RedisClientImpl) SetRemove(key string, members ...string) error { for i, m := range members { args[i] = m } - return r.client.SRem(ctx, key, args...).Err() + return r.client.SRem(ctx, r.toPhysicalKey(key), args...).Err() } // GetZSet gets members with scores from a sorted set @@ -585,7 +765,7 @@ func (r *RedisClientImpl) GetZSet(key string, start, stop int64) ([]ZSetMember, ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - val, err := r.client.ZRangeWithScores(ctx, key, start, stop).Result() + val, err := r.client.ZRangeWithScores(ctx, r.toPhysicalKey(key), start, stop).Result() if err != nil { return nil, err } @@ -615,7 +795,7 @@ func (r *RedisClientImpl) ZSetAdd(key string, members ...ZSetMember) error { Member: m.Member, } } - return r.client.ZAdd(ctx, key, zMembers...).Err() + return r.client.ZAdd(ctx, r.toPhysicalKey(key), zMembers...).Err() } // ZSetRemove removes members from a sorted set @@ -629,7 +809,7 @@ func (r *RedisClientImpl) ZSetRemove(key string, members ...string) error { for i, m := range members { args[i] = m } - return r.client.ZRem(ctx, key, args...).Err() + return r.client.ZRem(ctx, r.toPhysicalKey(key), args...).Err() } // GetStream gets stream entries in a range @@ -650,7 +830,7 @@ func (r *RedisClientImpl) GetStream(key, start, stop string, count int64) ([]Str ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - val, err := r.client.XRangeN(ctx, key, start, stop, count).Result() + val, err := r.client.XRangeN(ctx, r.toPhysicalKey(key), start, stop, count).Result() if err != nil { return nil, err } @@ -678,7 +858,7 @@ func (r *RedisClientImpl) StreamAdd(key string, fields map[string]string, id str defer cancel() newID, err := r.client.XAdd(ctx, &redis.XAddArgs{ - Stream: key, + Stream: r.toPhysicalKey(key), ID: id, Values: values, }).Result() @@ -699,7 +879,7 @@ func (r *RedisClientImpl) StreamDelete(key string, ids ...string) (int64, error) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return r.client.XDel(ctx, key, ids...).Result() + return r.client.XDel(ctx, r.toPhysicalKey(key), ids...).Result() } func toStreamEntries(messages []redis.XMessage) []StreamEntry { @@ -717,6 +897,72 @@ func toStreamEntries(messages []redis.XMessage) []StreamEntry { return entries } +func parseRedisCommandGetKeysResult(result interface{}) []string { + items, ok := result.([]interface{}) + if !ok || len(items) == 0 { + return nil + } + keys := make([]string, 0, len(items)) + for _, item := range items { + switch v := item.(type) { + case string: + if v != "" { + keys = append(keys, v) + } + case []byte: + text := string(v) + if text != "" { + keys = append(keys, text) + } + } + } + return keys +} + +func (r *RedisClientImpl) rewriteCommandArgsForNamespace(ctx context.Context, args []string) []string { + if !r.isCluster || r.currentDB <= 0 || len(args) == 0 { + return args + } + + command := strings.ToUpper(strings.TrimSpace(args[0])) + if command == "COMMAND" || command == "SELECT" || command == "FLUSHDB" { + return args + } + + probeArgs := make([]interface{}, 0, len(args)+2) + probeArgs = append(probeArgs, "COMMAND", "GETKEYS") + for _, arg := range args { + probeArgs = append(probeArgs, arg) + } + + result, err := r.client.Do(ctx, probeArgs...).Result() + if err != nil { + return args + } + + keyCandidates := parseRedisCommandGetKeysResult(result) + if len(keyCandidates) == 0 { + return args + } + + rewritten := append([]string(nil), args...) + used := make([]bool, len(rewritten)) + for _, key := range keyCandidates { + for i := 1; i < len(rewritten); i++ { + if used[i] { + continue + } + if rewritten[i] != key { + continue + } + rewritten[i] = r.toPhysicalKey(rewritten[i]) + used[i] = true + break + } + } + return rewritten +} + // ExecuteCommand executes a raw Redis command func (r *RedisClientImpl) ExecuteCommand(args []string) (interface{}, error) { if r.client == nil { @@ -729,6 +975,33 @@ func (r *RedisClientImpl) ExecuteCommand(args []string) (interface{}, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + if r.isCluster { + command := strings.ToUpper(strings.TrimSpace(args[0])) + switch command { + case "SELECT": + if len(args) < 2 { + return nil, fmt.Errorf("SELECT 命令缺少数据库索引") + } + index, err := strconv.Atoi(strings.TrimSpace(args[1])) + if err != nil { + return nil, fmt.Errorf("无效数据库索引: %s", args[1]) + } + if index < 0 || index > 15 { + return nil, fmt.Errorf("数据库索引必须在 0-15 之间") + } + r.currentDB = index + r.config.RedisDB = index + return "OK", nil + case "FLUSHDB": + if err := r.FlushDB(); err != nil { + return nil, err + } + return "OK", nil + } + } + + args = r.rewriteCommandArgsForNamespace(ctx, args) + // Convert to []interface{} cmdArgs := make([]interface{}, len(args)) for i, arg := range args { @@ -795,6 +1068,31 @@ func (r *RedisClientImpl) GetDatabases() ([]RedisDBInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + if r.isCluster && r.clusterClient != nil { + var totalKeys int64 + var mu sync.Mutex + err := r.clusterClient.ForEachMaster(ctx, func(nodeCtx context.Context, node *redis.Client) error { + keys, err := node.DBSize(nodeCtx).Result() + if err != nil { + return err + } + mu.Lock() + totalKeys += keys + mu.Unlock() + return nil + }) + if err != nil { + logger.Warnf("Redis 集群获取 key 数量失败,回退为 0: %v", err) + totalKeys = 0 + } + result := make([]RedisDBInfo, 16) + for i := 0; i < 16; i++ { + result[i] = RedisDBInfo{Index: i, Keys: 0} + } + result[0].Keys = totalKeys + return result, nil + } + // Get keyspace info info, err := r.client.Info(ctx, "keyspace").Result() if err != nil { @@ -845,34 +1143,47 @@ func (r *RedisClientImpl) SelectDB(index int) error { if r.client == nil { return fmt.Errorf("Redis 客户端未连接") } + + if r.isCluster { + if index < 0 || index > 15 { + return fmt.Errorf("数据库索引必须在 0-15 之间") + } + r.currentDB = index + r.config.RedisDB = index + return nil + } + if index < 0 || index > 15 { return fmt.Errorf("数据库索引必须在 0-15 之间") } // Create new client with different DB - addr := fmt.Sprintf("%s:%d", r.config.Host, r.config.Port) + addr := "" + if len(r.seedAddrs) > 0 { + addr = r.seedAddrs[0] + } if r.forwarder != nil { addr = r.forwarder.LocalAddr } + if addr == "" { + addr = fmt.Sprintf("%s:%d", r.config.Host, r.config.Port) + } + + timeout := normalizeRedisTimeout(r.config.Timeout) opts := &redis.Options{ Addr: addr, + Username: strings.TrimSpace(r.config.User), Password: r.config.Password, DB: index, - DialTimeout: time.Duration(r.config.Timeout) * time.Second, - ReadTimeout: time.Duration(r.config.Timeout) * time.Second, - WriteTimeout: time.Duration(r.config.Timeout) * time.Second, - } - - if opts.DialTimeout == 0 { - opts.DialTimeout = 30 * time.Second - opts.ReadTimeout = 30 * time.Second - opts.WriteTimeout = 30 * time.Second + DialTimeout: timeout, + ReadTimeout: timeout, + WriteTimeout: timeout, } newClient := redis.NewClient(opts) - ctx, cancel := context.WithTimeout(context.Background(), opts.DialTimeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() if err := newClient.Ping(ctx).Err(); err != nil { @@ -881,9 +1192,14 @@ func (r *RedisClientImpl) SelectDB(index int) error { } // Close old client and replace - r.client.Close() + if r.client != nil { + _ = r.client.Close() + } r.client = newClient + r.singleClient = newClient + r.clusterClient = nil r.currentDB = index + r.config.RedisDB = index logger.Infof("Redis 切换到数据库: db%d", index) return nil @@ -899,6 +1215,63 @@ func (r *RedisClientImpl) FlushDB() error { if r.client == nil { return fmt.Errorf("Redis 客户端未连接") } + + if r.isCluster && r.clusterClient != nil { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + namespacePrefix := r.redisNamespacePrefix() + var deletedTotal int64 + var deletedMu sync.Mutex + + err := r.clusterClient.ForEachMaster(ctx, func(nodeCtx context.Context, node *redis.Client) error { + var cursor uint64 + for { + pattern := "*" + if namespacePrefix != "" { + pattern = namespacePrefix + "*" + } + keys, nextCursor, err := node.Scan(nodeCtx, cursor, pattern, 2000).Result() + if err != nil { + return err + } + + if namespacePrefix == "" { + filtered := keys[:0] + for _, key := range keys { + // db0 保留兼容:不删除逻辑库前缀 key,避免误清理 db1~db15。 + if strings.HasPrefix(key, "__gonavi_db_") { + continue + } + filtered = append(filtered, key) + } + keys = filtered + } + + if len(keys) > 0 { + deleted, err := node.Del(nodeCtx, keys...).Result() + if err != nil { + return err + } + deletedMu.Lock() + deletedTotal += deleted + deletedMu.Unlock() + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + return nil + }) + if err != nil { + return err + } + logger.Infof("Redis 集群逻辑库清空完成: db%d deleted=%d", r.currentDB, deletedTotal) + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() return r.client.FlushDB(ctx).Err()