mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-05-06 20:03:05 +08:00
- 统一同库同步与跨库迁移入口,补充模式区分与风险提示 - 扩展 ClickHouse 与 PG-like 双向迁移,并新增 PG-like、ClickHouse、TDengine 到 MongoDB 的迁移路由 - 完善 TDengine 目标端建表规划、回归测试与需求追踪文档 - refs #51
1316 lines
43 KiB
Go
1316 lines
43 KiB
Go
package sync
|
||
|
||
import (
|
||
"GoNavi-Wails/internal/connection"
|
||
"GoNavi-Wails/internal/db"
|
||
redispkg "GoNavi-Wails/internal/redis"
|
||
"encoding/json"
|
||
"fmt"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
)
|
||
|
||
type redisMigrationClient interface {
|
||
Connect(config connection.ConnectionConfig) error
|
||
Close() error
|
||
ScanKeys(pattern string, cursor uint64, count int64) (*redispkg.RedisScanResult, error)
|
||
GetKeyType(key string) (string, error)
|
||
GetValue(key string) (*redispkg.RedisValue, error)
|
||
DeleteKeys(keys []string) (int64, error)
|
||
SetTTL(key string, ttl int64) error
|
||
SetString(key, value string, ttl int64) error
|
||
SetHashField(key, field, value string) error
|
||
ListPush(key string, values ...string) error
|
||
SetAdd(key string, members ...string) error
|
||
ZSetAdd(key string, members ...redispkg.ZSetMember) error
|
||
StreamAdd(key string, fields map[string]string, id string) (string, error)
|
||
}
|
||
|
||
var newSyncDatabase = db.NewDatabase
|
||
var newRedisSourceClient = func() redisMigrationClient { return redispkg.NewRedisClient() }
|
||
|
||
func isRedisToMongoKeyspacePair(config SyncConfig) bool {
|
||
return resolveMigrationDBType(config.SourceConfig) == "redis" && resolveMigrationDBType(config.TargetConfig) == "mongodb"
|
||
}
|
||
|
||
func resolveRedisDBIndex(config connection.ConnectionConfig) int {
|
||
if config.RedisDB >= 0 && config.RedisDB <= 15 {
|
||
return config.RedisDB
|
||
}
|
||
if text := strings.TrimSpace(config.Database); text != "" {
|
||
if idx, err := strconv.Atoi(text); err == nil && idx >= 0 && idx <= 15 {
|
||
return idx
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
|
||
func withResolvedRedisDB(config connection.ConnectionConfig) connection.ConnectionConfig {
|
||
next := config
|
||
next.Type = "redis"
|
||
next.RedisDB = resolveRedisDBIndex(config)
|
||
return next
|
||
}
|
||
|
||
func resolveMongoCollectionName(config SyncConfig) string {
|
||
if name := strings.TrimSpace(config.MongoCollectionName); name != "" {
|
||
return name
|
||
}
|
||
if resolveMigrationDBType(config.SourceConfig) == "redis" {
|
||
return fmt.Sprintf("redis_db_%d_keys", resolveRedisDBIndex(config.SourceConfig))
|
||
}
|
||
return fmt.Sprintf("redis_db_%d_keys", resolveRedisDBIndex(config.TargetConfig))
|
||
}
|
||
|
||
func deriveRedisMongoCollectionName(config SyncConfig) string {
|
||
return resolveMongoCollectionName(config)
|
||
}
|
||
|
||
func buildRedisToMongoPlan(config SyncConfig, keyName string, targetDB db.Database) (SchemaMigrationPlan, error) {
|
||
collection := deriveRedisMongoCollectionName(config)
|
||
plan := SchemaMigrationPlan{
|
||
SourceSchema: strconv.Itoa(resolveRedisDBIndex(config.SourceConfig)),
|
||
SourceTable: keyName,
|
||
SourceQueryTable: keyName,
|
||
TargetSchema: strings.TrimSpace(config.TargetConfig.Database),
|
||
TargetTable: collection,
|
||
TargetQueryTable: collection,
|
||
PlannedAction: "按 Redis Key 生成 MongoDB 文档导入",
|
||
Warnings: []string{"Redis -> MongoDB 按 keyspace 语义迁移,不执行表级 schema 校验", "Redis TTL/集合顺序等语义会按文档字段保留,不保证与原系统完全等价"},
|
||
UnsupportedObjects: []string{"Redis Consumer Group / PubSub / Lua 脚本 / 事务状态当前不迁移"},
|
||
}
|
||
exists, err := inspectMongoCollection(targetDB, plan.TargetSchema, collection)
|
||
if err != nil {
|
||
return plan, fmt.Errorf("检查目标集合失败: %w", err)
|
||
}
|
||
plan.TargetTableExists = exists
|
||
strategy := normalizeTargetTableStrategy(config.TargetTableStrategy)
|
||
if exists {
|
||
return dedupeSchemaMigrationPlan(plan), nil
|
||
}
|
||
if strategy == "existing_only" {
|
||
plan.PlannedAction = "目标集合不存在,需先手工创建"
|
||
plan.Warnings = append(plan.Warnings, "当前策略要求目标集合已存在,执行时不会自动建集合")
|
||
return dedupeSchemaMigrationPlan(plan), nil
|
||
}
|
||
createCommand, err := buildMongoCreateCollectionCommand(collection)
|
||
if err != nil {
|
||
return plan, err
|
||
}
|
||
plan.AutoCreate = true
|
||
plan.PlannedAction = "目标集合不存在,将自动创建集合后导入"
|
||
plan.PreDataSQL = []string{createCommand}
|
||
return dedupeSchemaMigrationPlan(plan), nil
|
||
}
|
||
|
||
func listRedisMigrationKeys(client redisMigrationClient, selected []string) ([]string, error) {
|
||
if len(selected) > 0 {
|
||
return dedupeStrings(selected), nil
|
||
}
|
||
cursor := uint64(0)
|
||
keys := make([]string, 0, 64)
|
||
seen := map[string]struct{}{}
|
||
for {
|
||
result, err := client.ScanKeys("*", cursor, 1000)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if result != nil {
|
||
for _, item := range result.Keys {
|
||
key := strings.TrimSpace(item.Key)
|
||
if key == "" {
|
||
continue
|
||
}
|
||
if _, ok := seen[key]; ok {
|
||
continue
|
||
}
|
||
seen[key] = struct{}{}
|
||
keys = append(keys, key)
|
||
}
|
||
if strings.TrimSpace(result.Cursor) == "" || strings.TrimSpace(result.Cursor) == "0" {
|
||
break
|
||
}
|
||
next, err := strconv.ParseUint(strings.TrimSpace(result.Cursor), 10, 64)
|
||
if err != nil || next == cursor {
|
||
break
|
||
}
|
||
cursor = next
|
||
continue
|
||
}
|
||
break
|
||
}
|
||
sort.Strings(keys)
|
||
return keys, nil
|
||
}
|
||
|
||
func buildRedisMongoDocument(dbIndex int, key string, value *redispkg.RedisValue) map[string]interface{} {
|
||
doc := map[string]interface{}{
|
||
"_id": fmt.Sprintf("db%d:%s", dbIndex, key),
|
||
"redisDb": dbIndex,
|
||
"key": key,
|
||
"source": "redis",
|
||
}
|
||
if value == nil {
|
||
return doc
|
||
}
|
||
doc["type"] = value.Type
|
||
doc["ttl"] = value.TTL
|
||
doc["length"] = value.Length
|
||
doc["value"] = normalizeRedisMongoValue(value.Value)
|
||
return doc
|
||
}
|
||
|
||
func normalizeRedisMongoValue(value interface{}) interface{} {
|
||
switch typed := value.(type) {
|
||
case nil:
|
||
return nil
|
||
case []byte:
|
||
return string(typed)
|
||
case map[string]string:
|
||
result := make(map[string]interface{}, len(typed))
|
||
for k, v := range typed {
|
||
result[k] = v
|
||
}
|
||
return result
|
||
case []string:
|
||
result := make([]interface{}, 0, len(typed))
|
||
for _, item := range typed {
|
||
result = append(result, item)
|
||
}
|
||
return result
|
||
case []redispkg.ZSetMember:
|
||
result := make([]map[string]interface{}, 0, len(typed))
|
||
for _, item := range typed {
|
||
result = append(result, map[string]interface{}{"member": item.Member, "score": item.Score})
|
||
}
|
||
return result
|
||
case []redispkg.StreamEntry:
|
||
result := make([]map[string]interface{}, 0, len(typed))
|
||
for _, item := range typed {
|
||
fields := make(map[string]interface{}, len(item.Fields))
|
||
for k, v := range item.Fields {
|
||
fields[k] = v
|
||
}
|
||
result = append(result, map[string]interface{}{"id": item.ID, "fields": fields})
|
||
}
|
||
return result
|
||
case map[string]interface{}:
|
||
result := make(map[string]interface{}, len(typed))
|
||
for k, v := range typed {
|
||
result[k] = normalizeRedisMongoValue(v)
|
||
}
|
||
return result
|
||
case []interface{}:
|
||
result := make([]interface{}, 0, len(typed))
|
||
for _, item := range typed {
|
||
result = append(result, normalizeRedisMongoValue(item))
|
||
}
|
||
return result
|
||
default:
|
||
return typed
|
||
}
|
||
}
|
||
|
||
func buildRedisMongoExistingDocsQuery(collection string, ids []string) (string, error) {
|
||
command := map[string]interface{}{
|
||
"find": collection,
|
||
"filter": map[string]interface{}{
|
||
"_id": map[string]interface{}{"$in": ids},
|
||
},
|
||
}
|
||
data, err := json.Marshal(command)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return string(data), nil
|
||
}
|
||
|
||
func loadExistingRedisMongoDocs(targetDB db.Database, collection string, ids []string) (map[string]map[string]interface{}, error) {
|
||
result := make(map[string]map[string]interface{}, len(ids))
|
||
if len(ids) == 0 {
|
||
return result, nil
|
||
}
|
||
query, err := buildRedisMongoExistingDocsQuery(collection, ids)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
rows, _, err := targetDB.Query(query)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for _, row := range rows {
|
||
id := strings.TrimSpace(fmt.Sprintf("%v", row["_id"]))
|
||
if id == "" || id == "<nil>" {
|
||
continue
|
||
}
|
||
result[id] = row
|
||
}
|
||
return result, nil
|
||
}
|
||
|
||
func buildRedisMongoChanges(config SyncConfig, keys []string, client redisMigrationClient, targetDB db.Database, collection string) (connection.ChangeSet, []map[string]interface{}, error) {
|
||
changeSet := connection.ChangeSet{Inserts: []map[string]interface{}{}, Updates: []connection.UpdateRow{}, Deletes: []map[string]interface{}{}}
|
||
documents := make([]map[string]interface{}, 0, len(keys))
|
||
dbIndex := resolveRedisDBIndex(config.SourceConfig)
|
||
for _, key := range keys {
|
||
value, err := client.GetValue(key)
|
||
if err != nil {
|
||
return changeSet, nil, fmt.Errorf("读取 Redis Key 失败: key=%s err=%w", key, err)
|
||
}
|
||
documents = append(documents, buildRedisMongoDocument(dbIndex, key, value))
|
||
}
|
||
ids := make([]string, 0, len(documents))
|
||
for _, doc := range documents {
|
||
ids = append(ids, fmt.Sprintf("%v", doc["_id"]))
|
||
}
|
||
existing, err := loadExistingRedisMongoDocs(targetDB, collection, ids)
|
||
if err != nil {
|
||
return changeSet, nil, err
|
||
}
|
||
mode := normalizeSyncMode(config.Mode)
|
||
for _, doc := range documents {
|
||
id := fmt.Sprintf("%v", doc["_id"])
|
||
existingDoc, ok := existing[id]
|
||
if !ok {
|
||
changeSet.Inserts = append(changeSet.Inserts, doc)
|
||
continue
|
||
}
|
||
if mode == "insert_only" {
|
||
continue
|
||
}
|
||
values := cloneMapWithoutKeys(doc, "_id")
|
||
if sameRedisMongoDocument(existingDoc, doc) {
|
||
continue
|
||
}
|
||
changeSet.Updates = append(changeSet.Updates, connection.UpdateRow{Keys: map[string]interface{}{"_id": id}, Values: values})
|
||
}
|
||
return changeSet, documents, nil
|
||
}
|
||
|
||
func sameRedisMongoDocument(existing map[string]interface{}, desired map[string]interface{}) bool {
|
||
for k, v := range desired {
|
||
if k == "_id" {
|
||
continue
|
||
}
|
||
if fmt.Sprintf("%v", normalizeRedisMongoValue(v)) != fmt.Sprintf("%v", normalizeRedisMongoValue(existing[k])) {
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
func cloneMapWithoutKeys(input map[string]interface{}, skipKeys ...string) map[string]interface{} {
|
||
skip := make(map[string]struct{}, len(skipKeys))
|
||
for _, key := range skipKeys {
|
||
skip[key] = struct{}{}
|
||
}
|
||
result := make(map[string]interface{}, len(input))
|
||
for k, v := range input {
|
||
if _, ok := skip[k]; ok {
|
||
continue
|
||
}
|
||
result[k] = v
|
||
}
|
||
return result
|
||
}
|
||
|
||
func (s *SyncEngine) runRedisToMongoSync(config SyncConfig, result SyncResult) SyncResult {
|
||
tables := config.Tables
|
||
strategy := normalizeTargetTableStrategy(config.TargetTableStrategy)
|
||
mode := normalizeSyncMode(config.Mode)
|
||
s.progress(config.JobID, 0, len(tables), "", "开始 Redis 键空间迁移")
|
||
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("Redis -> MongoDB 键空间迁移;模式:%s;目标策略:%s", mode, strategy))
|
||
if mode == "full_overwrite" {
|
||
s.appendLog(config.JobID, &result, "warn", "Redis -> MongoDB 第一版暂不执行集合级 full_overwrite 删除,已降级为 insert_update")
|
||
}
|
||
|
||
sourceClient := newRedisSourceClient()
|
||
sourceConfig := withResolvedRedisDB(config.SourceConfig)
|
||
if err := sourceClient.Connect(sourceConfig); err != nil {
|
||
return s.fail(config.JobID, len(tables), result, "源 Redis 连接失败: "+err.Error())
|
||
}
|
||
defer sourceClient.Close()
|
||
|
||
targetDB, err := newSyncDatabase(config.TargetConfig.Type)
|
||
if err != nil {
|
||
return s.fail(config.JobID, len(tables), result, "初始化目标数据库驱动失败: "+err.Error())
|
||
}
|
||
if err := targetDB.Connect(config.TargetConfig); err != nil {
|
||
return s.fail(config.JobID, len(tables), result, "目标数据库连接失败: "+err.Error())
|
||
}
|
||
defer targetDB.Close()
|
||
|
||
keys, err := listRedisMigrationKeys(sourceClient, config.Tables)
|
||
if err != nil {
|
||
return s.fail(config.JobID, len(tables), result, "扫描 Redis Key 失败: "+err.Error())
|
||
}
|
||
if len(keys) == 0 {
|
||
result.Message = "未发现可迁移的 Redis Key"
|
||
s.progress(config.JobID, 0, 0, "", "同步完成")
|
||
return result
|
||
}
|
||
totalKeys := len(keys)
|
||
collection := deriveRedisMongoCollectionName(config)
|
||
plan, err := buildRedisToMongoPlan(config, firstNonEmpty(keys[0], collection), targetDB)
|
||
if err != nil {
|
||
return s.fail(config.JobID, totalKeys, result, err.Error())
|
||
}
|
||
for _, warning := range plan.Warnings {
|
||
s.appendLog(config.JobID, &result, "warn", " -> "+warning)
|
||
}
|
||
for _, unsupported := range plan.UnsupportedObjects {
|
||
s.appendLog(config.JobID, &result, "warn", " -> "+unsupported)
|
||
}
|
||
if strings.TrimSpace(plan.PlannedAction) != "" {
|
||
s.appendLog(config.JobID, &result, "info", " -> "+plan.PlannedAction)
|
||
}
|
||
if !plan.TargetTableExists && !plan.AutoCreate {
|
||
result.Message = firstNonEmpty(plan.PlannedAction, "目标集合不存在,当前策略不允许自动创建")
|
||
return result
|
||
}
|
||
if !plan.TargetTableExists && len(plan.PreDataSQL) > 0 {
|
||
s.progress(config.JobID, 0, totalKeys, collection, "创建目标集合")
|
||
if err := executeSQLStatements(targetDB.Exec, plan.PreDataSQL); err != nil {
|
||
return s.fail(config.JobID, totalKeys, result, "创建目标集合失败: "+err.Error())
|
||
}
|
||
}
|
||
|
||
changeSet, documents, err := buildRedisMongoChanges(config, keys, sourceClient, targetDB, collection)
|
||
if err != nil {
|
||
return s.fail(config.JobID, totalKeys, result, "构建 Redis 迁移变更失败: "+err.Error())
|
||
}
|
||
for idx, key := range keys {
|
||
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("正在迁移 Key: %s", key))
|
||
s.progress(config.JobID, idx, totalKeys, key, fmt.Sprintf("迁移 Key(%d/%d)", idx+1, totalKeys))
|
||
}
|
||
if len(changeSet.Inserts) == 0 && len(changeSet.Updates) == 0 && len(changeSet.Deletes) == 0 {
|
||
s.appendLog(config.JobID, &result, "info", " -> 目标集合中对应文档已是最新状态")
|
||
result.TablesSynced = totalKeys
|
||
result.Message = fmt.Sprintf("Redis 键空间迁移完成,共处理 %d 个 Key", totalKeys)
|
||
s.progress(config.JobID, totalKeys, totalKeys, collection, "同步完成")
|
||
return result
|
||
}
|
||
applier, ok := targetDB.(db.BatchApplier)
|
||
if !ok {
|
||
return s.fail(config.JobID, totalKeys, result, "目标驱动不支持 MongoDB 文档写入")
|
||
}
|
||
_ = documents
|
||
if err := applier.ApplyChanges(collection, changeSet); err != nil {
|
||
return s.fail(config.JobID, totalKeys, result, "应用 Redis 迁移变更失败: "+err.Error())
|
||
}
|
||
result.RowsInserted += len(changeSet.Inserts)
|
||
result.RowsUpdated += len(changeSet.Updates)
|
||
result.RowsDeleted += len(changeSet.Deletes)
|
||
result.TablesSynced = totalKeys
|
||
result.Message = fmt.Sprintf("Redis 键空间迁移完成,共处理 %d 个 Key", totalKeys)
|
||
s.progress(config.JobID, totalKeys, totalKeys, collection, "同步完成")
|
||
return result
|
||
}
|
||
|
||
func (s *SyncEngine) analyzeRedisToMongo(config SyncConfig) SyncAnalyzeResult {
|
||
result := SyncAnalyzeResult{Success: true, Tables: []TableDiffSummary{}}
|
||
sourceClient := newRedisSourceClient()
|
||
sourceConfig := withResolvedRedisDB(config.SourceConfig)
|
||
if err := sourceClient.Connect(sourceConfig); err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "源 Redis 连接失败: " + err.Error()}
|
||
}
|
||
defer sourceClient.Close()
|
||
targetDB, err := newSyncDatabase(config.TargetConfig.Type)
|
||
if err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "初始化目标数据库驱动失败: " + err.Error()}
|
||
}
|
||
if err := targetDB.Connect(config.TargetConfig); err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "目标数据库连接失败: " + err.Error()}
|
||
}
|
||
defer targetDB.Close()
|
||
keys, err := listRedisMigrationKeys(sourceClient, config.Tables)
|
||
if err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "扫描 Redis Key 失败: " + err.Error()}
|
||
}
|
||
collection := deriveRedisMongoCollectionName(config)
|
||
changeSet, documents, err := buildRedisMongoChanges(config, keys, sourceClient, targetDB, collection)
|
||
if err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "分析 Redis 迁移变更失败: " + err.Error()}
|
||
}
|
||
insertSet := make(map[string]struct{}, len(changeSet.Inserts))
|
||
updateSet := make(map[string]struct{}, len(changeSet.Updates))
|
||
for _, row := range changeSet.Inserts {
|
||
insertSet[fmt.Sprintf("%v", row["_id"])] = struct{}{}
|
||
}
|
||
for _, row := range changeSet.Updates {
|
||
updateSet[fmt.Sprintf("%v", row.Keys["_id"])] = struct{}{}
|
||
}
|
||
for _, doc := range documents {
|
||
key := fmt.Sprintf("%v", doc["key"])
|
||
id := fmt.Sprintf("%v", doc["_id"])
|
||
summary := TableDiffSummary{
|
||
Table: key,
|
||
PKColumn: "_id",
|
||
CanSync: true,
|
||
TargetTableExists: true,
|
||
PlannedAction: fmt.Sprintf("迁移到集合 %s", collection),
|
||
Warnings: []string{
|
||
"Redis Key 将按文档写入 MongoDB 集合",
|
||
},
|
||
}
|
||
if _, ok := insertSet[id]; ok {
|
||
summary.Inserts = 1
|
||
summary.Message = "执行时将写入新文档"
|
||
} else if _, ok := updateSet[id]; ok {
|
||
summary.Updates = 1
|
||
summary.Message = "执行时将更新已有文档"
|
||
} else {
|
||
summary.Same = 1
|
||
summary.Message = "目标集合中对应文档已是最新状态"
|
||
}
|
||
result.Tables = append(result.Tables, summary)
|
||
}
|
||
result.Message = fmt.Sprintf("已完成 %d 个 Redis Key 的迁移分析", len(result.Tables))
|
||
return result
|
||
}
|
||
|
||
func (s *SyncEngine) previewRedisToMongo(config SyncConfig, keyName string, limit int) (TableDiffPreview, error) {
|
||
_ = limit
|
||
sourceClient := newRedisSourceClient()
|
||
sourceConfig := withResolvedRedisDB(config.SourceConfig)
|
||
if err := sourceClient.Connect(sourceConfig); err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("源 Redis 连接失败: %w", err)
|
||
}
|
||
defer sourceClient.Close()
|
||
targetDB, err := newSyncDatabase(config.TargetConfig.Type)
|
||
if err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("初始化目标数据库驱动失败: %w", err)
|
||
}
|
||
if err := targetDB.Connect(config.TargetConfig); err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("目标数据库连接失败: %w", err)
|
||
}
|
||
defer targetDB.Close()
|
||
collection := deriveRedisMongoCollectionName(config)
|
||
changeSet, documents, err := buildRedisMongoChanges(config, []string{keyName}, sourceClient, targetDB, collection)
|
||
if err != nil {
|
||
return TableDiffPreview{}, err
|
||
}
|
||
preview := TableDiffPreview{Table: keyName, PKColumn: "_id", Inserts: []PreviewRow{}, Updates: []PreviewUpdateRow{}, Deletes: []PreviewRow{}}
|
||
if len(documents) == 0 {
|
||
return preview, nil
|
||
}
|
||
doc := documents[0]
|
||
id := fmt.Sprintf("%v", doc["_id"])
|
||
existingDocs, err := loadExistingRedisMongoDocs(targetDB, collection, []string{id})
|
||
if err != nil {
|
||
return TableDiffPreview{}, err
|
||
}
|
||
if len(changeSet.Inserts) > 0 {
|
||
preview.TotalInserts = 1
|
||
preview.Inserts = append(preview.Inserts, PreviewRow{PK: id, Row: doc})
|
||
return preview, nil
|
||
}
|
||
if len(changeSet.Updates) > 0 {
|
||
preview.TotalUpdates = 1
|
||
preview.Updates = append(preview.Updates, PreviewUpdateRow{PK: id, ChangedColumns: sortedMapKeys(changeSet.Updates[0].Values), Source: doc, Target: existingDocs[id]})
|
||
return preview, nil
|
||
}
|
||
return preview, nil
|
||
}
|
||
|
||
func sortedMapKeys(values map[string]interface{}) []string {
|
||
keys := make([]string, 0, len(values))
|
||
for key := range values {
|
||
keys = append(keys, key)
|
||
}
|
||
sort.Strings(keys)
|
||
return keys
|
||
}
|
||
|
||
func isMongoToRedisKeyspacePair(config SyncConfig) bool {
|
||
return resolveMigrationDBType(config.SourceConfig) == "mongodb" && resolveMigrationDBType(config.TargetConfig) == "redis"
|
||
}
|
||
|
||
type mongoRedisKeyDocument struct {
|
||
Key string
|
||
Type string
|
||
TTL int64
|
||
Value interface{}
|
||
SourceRow map[string]interface{}
|
||
Desired *redispkg.RedisValue
|
||
}
|
||
|
||
type mongoRedisKeyDiff struct {
|
||
Collection string
|
||
Document mongoRedisKeyDocument
|
||
Current *redispkg.RedisValue
|
||
Exists bool
|
||
Action string
|
||
ChangedColumns []string
|
||
}
|
||
|
||
func deriveRedisTargetLabel(config SyncConfig) string {
|
||
return fmt.Sprintf("Redis DB %d", resolveRedisDBIndex(config.TargetConfig))
|
||
}
|
||
|
||
func deriveDefaultMongoRedisCollection(config SyncConfig) string {
|
||
return resolveMongoCollectionName(config)
|
||
}
|
||
|
||
func listMongoRedisCollections(sourceDB db.Database, config SyncConfig) ([]string, error) {
|
||
if len(config.Tables) > 0 {
|
||
return dedupeStrings(config.Tables), nil
|
||
}
|
||
tables, err := sourceDB.GetTables(strings.TrimSpace(config.SourceConfig.Database))
|
||
if err == nil && len(tables) > 0 {
|
||
return dedupeStrings(tables), nil
|
||
}
|
||
return []string{deriveDefaultMongoRedisCollection(config)}, nil
|
||
}
|
||
|
||
func buildMongoRedisFindQuery(collection string, limit int) (string, error) {
|
||
command := map[string]interface{}{
|
||
"find": strings.TrimSpace(collection),
|
||
"filter": map[string]interface{}{},
|
||
}
|
||
if limit > 0 {
|
||
command["limit"] = limit
|
||
}
|
||
data, err := json.Marshal(command)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return string(data), nil
|
||
}
|
||
|
||
func loadMongoRedisDocuments(sourceDB db.Database, collection string, limit int) ([]map[string]interface{}, error) {
|
||
query, err := buildMongoRedisFindQuery(collection, limit)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
rows, _, err := sourceDB.Query(query)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return rows, nil
|
||
}
|
||
|
||
func parseMongoRedisDocument(row map[string]interface{}) (mongoRedisKeyDocument, error) {
|
||
key := strings.TrimSpace(asRedisMigrationString(row["key"]))
|
||
if key == "" {
|
||
if rawID := strings.TrimSpace(asRedisMigrationString(row["_id"])); rawID != "" {
|
||
if _, tail, ok := strings.Cut(rawID, ":"); ok {
|
||
key = strings.TrimSpace(tail)
|
||
}
|
||
}
|
||
}
|
||
if key == "" {
|
||
return mongoRedisKeyDocument{}, fmt.Errorf("文档缺少 key 字段")
|
||
}
|
||
|
||
redisType := strings.ToLower(strings.TrimSpace(asRedisMigrationString(row["type"])))
|
||
if redisType == "" {
|
||
return mongoRedisKeyDocument{}, fmt.Errorf("文档缺少 type 字段: key=%s", key)
|
||
}
|
||
|
||
ttl := normalizeRedisMigrationTTL(asRedisMigrationInt64(row["ttl"], -1))
|
||
desired := &redispkg.RedisValue{Type: redisType, TTL: ttl}
|
||
|
||
sourceRow := cloneMapWithoutKeys(row)
|
||
sourceRow["key"] = key
|
||
sourceRow["type"] = redisType
|
||
sourceRow["ttl"] = ttl
|
||
|
||
switch redisType {
|
||
case "string":
|
||
value := asRedisMigrationString(row["value"])
|
||
desired.Value = value
|
||
desired.Length = int64(len(value))
|
||
sourceRow["value"] = value
|
||
case "hash":
|
||
value, err := asRedisMigrationStringMap(row["value"])
|
||
if err != nil {
|
||
return mongoRedisKeyDocument{}, fmt.Errorf("key=%s hash 值无效: %w", key, err)
|
||
}
|
||
desired.Value = value
|
||
desired.Length = int64(len(value))
|
||
sourceRow["value"] = normalizeRedisMongoValue(value)
|
||
case "list":
|
||
value, err := asRedisMigrationStringSlice(row["value"])
|
||
if err != nil {
|
||
return mongoRedisKeyDocument{}, fmt.Errorf("key=%s list 值无效: %w", key, err)
|
||
}
|
||
desired.Value = value
|
||
desired.Length = int64(len(value))
|
||
sourceRow["value"] = normalizeRedisMongoValue(value)
|
||
case "set":
|
||
value, err := asRedisMigrationStringSlice(row["value"])
|
||
if err != nil {
|
||
return mongoRedisKeyDocument{}, fmt.Errorf("key=%s set 值无效: %w", key, err)
|
||
}
|
||
sort.Strings(value)
|
||
desired.Value = value
|
||
desired.Length = int64(len(value))
|
||
sourceRow["value"] = normalizeRedisMongoValue(value)
|
||
case "zset":
|
||
value, err := asRedisMigrationZSetMembers(row["value"])
|
||
if err != nil {
|
||
return mongoRedisKeyDocument{}, fmt.Errorf("key=%s zset 值无效: %w", key, err)
|
||
}
|
||
sort.Slice(value, func(i, j int) bool {
|
||
if value[i].Score == value[j].Score {
|
||
return value[i].Member < value[j].Member
|
||
}
|
||
return value[i].Score < value[j].Score
|
||
})
|
||
desired.Value = value
|
||
desired.Length = int64(len(value))
|
||
sourceRow["value"] = normalizeRedisMongoValue(value)
|
||
case "stream":
|
||
value, err := asRedisMigrationStreamEntries(row["value"])
|
||
if err != nil {
|
||
return mongoRedisKeyDocument{}, fmt.Errorf("key=%s stream 值无效: %w", key, err)
|
||
}
|
||
sort.Slice(value, func(i, j int) bool { return value[i].ID < value[j].ID })
|
||
desired.Value = value
|
||
desired.Length = int64(len(value))
|
||
sourceRow["value"] = normalizeRedisMongoValue(value)
|
||
default:
|
||
return mongoRedisKeyDocument{}, fmt.Errorf("key=%s 暂不支持 Redis 类型 %s", key, redisType)
|
||
}
|
||
|
||
return mongoRedisKeyDocument{Key: key, Type: redisType, TTL: ttl, Value: desired.Value, SourceRow: sourceRow, Desired: desired}, nil
|
||
}
|
||
|
||
func buildMongoToRedisDiffs(sourceDB db.Database, targetClient redisMigrationClient, collection string, mode string) ([]mongoRedisKeyDiff, error) {
|
||
rows, err := loadMongoRedisDocuments(sourceDB, collection, 0)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
diffs := make([]mongoRedisKeyDiff, 0, len(rows))
|
||
effectiveMode := normalizeSyncMode(mode)
|
||
for _, row := range rows {
|
||
doc, err := parseMongoRedisDocument(row)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
current, exists, err := loadExistingRedisMigrationValue(targetClient, doc.Key)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("读取目标 Redis Key 失败: key=%s err=%w", doc.Key, err)
|
||
}
|
||
action := "insert"
|
||
changedColumns := []string{"type", "ttl", "value"}
|
||
if exists {
|
||
if sameRedisMigrationValue(current, doc.Desired) {
|
||
action = "same"
|
||
changedColumns = nil
|
||
} else if effectiveMode == "insert_only" {
|
||
action = "same"
|
||
changedColumns = nil
|
||
} else {
|
||
action = "update"
|
||
changedColumns = diffRedisMigrationColumns(current, doc.Desired)
|
||
}
|
||
}
|
||
diffs = append(diffs, mongoRedisKeyDiff{
|
||
Collection: collection,
|
||
Document: doc,
|
||
Current: current,
|
||
Exists: exists,
|
||
Action: action,
|
||
ChangedColumns: changedColumns,
|
||
})
|
||
}
|
||
sort.Slice(diffs, func(i, j int) bool { return diffs[i].Document.Key < diffs[j].Document.Key })
|
||
return diffs, nil
|
||
}
|
||
|
||
func loadExistingRedisMigrationValue(client redisMigrationClient, key string) (*redispkg.RedisValue, bool, error) {
|
||
keyType, err := client.GetKeyType(key)
|
||
if err != nil {
|
||
return nil, false, err
|
||
}
|
||
keyType = strings.ToLower(strings.TrimSpace(keyType))
|
||
if keyType == "" || keyType == "none" {
|
||
return nil, false, nil
|
||
}
|
||
value, err := client.GetValue(key)
|
||
if err != nil {
|
||
return nil, false, err
|
||
}
|
||
if value == nil {
|
||
return nil, false, nil
|
||
}
|
||
value.Type = keyType
|
||
value.TTL = normalizeRedisMigrationTTL(value.TTL)
|
||
return value, true, nil
|
||
}
|
||
|
||
func normalizeRedisMigrationTTL(ttl int64) int64 {
|
||
if ttl > 0 {
|
||
return ttl
|
||
}
|
||
return -1
|
||
}
|
||
|
||
func sameRedisMigrationValue(current *redispkg.RedisValue, desired *redispkg.RedisValue) bool {
|
||
if current == nil || desired == nil {
|
||
return current == nil && desired == nil
|
||
}
|
||
if strings.ToLower(strings.TrimSpace(current.Type)) != strings.ToLower(strings.TrimSpace(desired.Type)) {
|
||
return false
|
||
}
|
||
if normalizeRedisMigrationTTL(current.TTL) != normalizeRedisMigrationTTL(desired.TTL) {
|
||
return false
|
||
}
|
||
return canonicalRedisMigrationValue(current) == canonicalRedisMigrationValue(desired)
|
||
}
|
||
|
||
func canonicalRedisMigrationValue(value *redispkg.RedisValue) string {
|
||
if value == nil {
|
||
return "null"
|
||
}
|
||
payload := map[string]interface{}{
|
||
"type": strings.ToLower(strings.TrimSpace(value.Type)),
|
||
"ttl": normalizeRedisMigrationTTL(value.TTL),
|
||
"value": normalizeRedisComparablePayload(strings.ToLower(strings.TrimSpace(value.Type)), value.Value),
|
||
}
|
||
data, err := json.Marshal(payload)
|
||
if err != nil {
|
||
return fmt.Sprintf("%v", payload)
|
||
}
|
||
return string(data)
|
||
}
|
||
|
||
func normalizeRedisComparablePayload(redisType string, value interface{}) interface{} {
|
||
switch redisType {
|
||
case "string":
|
||
return asRedisMigrationString(value)
|
||
case "hash":
|
||
mapped, err := asRedisMigrationStringMap(value)
|
||
if err != nil {
|
||
return fmt.Sprintf("%v", value)
|
||
}
|
||
return normalizeRedisMongoValue(mapped)
|
||
case "list":
|
||
items, err := asRedisMigrationStringSlice(value)
|
||
if err != nil {
|
||
return fmt.Sprintf("%v", value)
|
||
}
|
||
return normalizeRedisMongoValue(items)
|
||
case "set":
|
||
items, err := asRedisMigrationStringSlice(value)
|
||
if err != nil {
|
||
return fmt.Sprintf("%v", value)
|
||
}
|
||
sort.Strings(items)
|
||
return normalizeRedisMongoValue(items)
|
||
case "zset":
|
||
members, err := asRedisMigrationZSetMembers(value)
|
||
if err != nil {
|
||
return fmt.Sprintf("%v", value)
|
||
}
|
||
sort.Slice(members, func(i, j int) bool {
|
||
if members[i].Score == members[j].Score {
|
||
return members[i].Member < members[j].Member
|
||
}
|
||
return members[i].Score < members[j].Score
|
||
})
|
||
return normalizeRedisMongoValue(members)
|
||
case "stream":
|
||
entries, err := asRedisMigrationStreamEntries(value)
|
||
if err != nil {
|
||
return fmt.Sprintf("%v", value)
|
||
}
|
||
sort.Slice(entries, func(i, j int) bool { return entries[i].ID < entries[j].ID })
|
||
return normalizeRedisMongoValue(entries)
|
||
default:
|
||
return normalizeRedisMongoValue(value)
|
||
}
|
||
}
|
||
|
||
func diffRedisMigrationColumns(current *redispkg.RedisValue, desired *redispkg.RedisValue) []string {
|
||
changed := make([]string, 0, 3)
|
||
if current == nil || desired == nil {
|
||
return []string{"type", "ttl", "value"}
|
||
}
|
||
if strings.ToLower(strings.TrimSpace(current.Type)) != strings.ToLower(strings.TrimSpace(desired.Type)) {
|
||
changed = append(changed, "type")
|
||
}
|
||
if normalizeRedisMigrationTTL(current.TTL) != normalizeRedisMigrationTTL(desired.TTL) {
|
||
changed = append(changed, "ttl")
|
||
}
|
||
currentComparable := normalizeRedisComparablePayload(strings.ToLower(strings.TrimSpace(desired.Type)), current.Value)
|
||
desiredComparable := normalizeRedisComparablePayload(strings.ToLower(strings.TrimSpace(desired.Type)), desired.Value)
|
||
currentJSON, _ := json.Marshal(currentComparable)
|
||
desiredJSON, _ := json.Marshal(desiredComparable)
|
||
if string(currentJSON) != string(desiredJSON) {
|
||
changed = append(changed, "value")
|
||
}
|
||
return dedupeStrings(changed)
|
||
}
|
||
|
||
func buildRedisPreviewRow(key string, value *redispkg.RedisValue) map[string]interface{} {
|
||
if value == nil {
|
||
return map[string]interface{}{"key": key}
|
||
}
|
||
return map[string]interface{}{
|
||
"key": key,
|
||
"type": strings.ToLower(strings.TrimSpace(value.Type)),
|
||
"ttl": normalizeRedisMigrationTTL(value.TTL),
|
||
"value": normalizeRedisComparablePayload(strings.ToLower(strings.TrimSpace(value.Type)), value.Value),
|
||
}
|
||
}
|
||
|
||
func applyMongoRedisDiff(targetClient redisMigrationClient, diff mongoRedisKeyDiff) error {
|
||
desired := diff.Document.Desired
|
||
if desired == nil {
|
||
return fmt.Errorf("空的 Redis 目标值: key=%s", diff.Document.Key)
|
||
}
|
||
redisType := strings.ToLower(strings.TrimSpace(desired.Type))
|
||
ttl := normalizeRedisMigrationTTL(desired.TTL)
|
||
if diff.Exists && diff.Action == "update" && redisType != "string" {
|
||
if _, err := targetClient.DeleteKeys([]string{diff.Document.Key}); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
switch redisType {
|
||
case "string":
|
||
return targetClient.SetString(diff.Document.Key, asRedisMigrationString(desired.Value), ttl)
|
||
case "hash":
|
||
mapped, err := asRedisMigrationStringMap(desired.Value)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
fields := make([]string, 0, len(mapped))
|
||
for field := range mapped {
|
||
fields = append(fields, field)
|
||
}
|
||
sort.Strings(fields)
|
||
for _, field := range fields {
|
||
if err := targetClient.SetHashField(diff.Document.Key, field, mapped[field]); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return targetClient.SetTTL(diff.Document.Key, ttl)
|
||
case "list":
|
||
items, err := asRedisMigrationStringSlice(desired.Value)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(items) > 0 {
|
||
if err := targetClient.ListPush(diff.Document.Key, items...); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return targetClient.SetTTL(diff.Document.Key, ttl)
|
||
case "set":
|
||
items, err := asRedisMigrationStringSlice(desired.Value)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(items) > 0 {
|
||
if err := targetClient.SetAdd(diff.Document.Key, items...); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return targetClient.SetTTL(diff.Document.Key, ttl)
|
||
case "zset":
|
||
members, err := asRedisMigrationZSetMembers(desired.Value)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(members) > 0 {
|
||
if err := targetClient.ZSetAdd(diff.Document.Key, members...); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return targetClient.SetTTL(diff.Document.Key, ttl)
|
||
case "stream":
|
||
entries, err := asRedisMigrationStreamEntries(desired.Value)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, entry := range entries {
|
||
if _, err := targetClient.StreamAdd(diff.Document.Key, entry.Fields, entry.ID); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return targetClient.SetTTL(diff.Document.Key, ttl)
|
||
default:
|
||
return fmt.Errorf("暂不支持 Redis 类型 %s", redisType)
|
||
}
|
||
}
|
||
|
||
func asRedisMigrationString(value interface{}) string {
|
||
switch typed := value.(type) {
|
||
case nil:
|
||
return ""
|
||
case string:
|
||
return typed
|
||
case []byte:
|
||
return string(typed)
|
||
default:
|
||
return fmt.Sprintf("%v", typed)
|
||
}
|
||
}
|
||
|
||
func asRedisMigrationInt64(value interface{}, defaultValue int64) int64 {
|
||
switch typed := value.(type) {
|
||
case nil:
|
||
return defaultValue
|
||
case int:
|
||
return int64(typed)
|
||
case int8:
|
||
return int64(typed)
|
||
case int16:
|
||
return int64(typed)
|
||
case int32:
|
||
return int64(typed)
|
||
case int64:
|
||
return typed
|
||
case uint:
|
||
return int64(typed)
|
||
case uint8:
|
||
return int64(typed)
|
||
case uint16:
|
||
return int64(typed)
|
||
case uint32:
|
||
return int64(typed)
|
||
case uint64:
|
||
return int64(typed)
|
||
case float32:
|
||
return int64(typed)
|
||
case float64:
|
||
return int64(typed)
|
||
case json.Number:
|
||
if n, err := typed.Int64(); err == nil {
|
||
return n
|
||
}
|
||
case string:
|
||
if n, err := strconv.ParseInt(strings.TrimSpace(typed), 10, 64); err == nil {
|
||
return n
|
||
}
|
||
}
|
||
return defaultValue
|
||
}
|
||
|
||
func asRedisMigrationFloat64(value interface{}) (float64, error) {
|
||
switch typed := value.(type) {
|
||
case float64:
|
||
return typed, nil
|
||
case float32:
|
||
return float64(typed), nil
|
||
case int:
|
||
return float64(typed), nil
|
||
case int8:
|
||
return float64(typed), nil
|
||
case int16:
|
||
return float64(typed), nil
|
||
case int32:
|
||
return float64(typed), nil
|
||
case int64:
|
||
return float64(typed), nil
|
||
case uint:
|
||
return float64(typed), nil
|
||
case uint8:
|
||
return float64(typed), nil
|
||
case uint16:
|
||
return float64(typed), nil
|
||
case uint32:
|
||
return float64(typed), nil
|
||
case uint64:
|
||
return float64(typed), nil
|
||
case json.Number:
|
||
return typed.Float64()
|
||
case string:
|
||
return strconv.ParseFloat(strings.TrimSpace(typed), 64)
|
||
default:
|
||
return 0, fmt.Errorf("无法转换为 float64: %T", value)
|
||
}
|
||
}
|
||
|
||
func asRedisMigrationStringMap(value interface{}) (map[string]string, error) {
|
||
switch typed := value.(type) {
|
||
case nil:
|
||
return map[string]string{}, nil
|
||
case map[string]string:
|
||
result := make(map[string]string, len(typed))
|
||
for k, v := range typed {
|
||
result[k] = v
|
||
}
|
||
return result, nil
|
||
case map[string]interface{}:
|
||
result := make(map[string]string, len(typed))
|
||
for k, v := range typed {
|
||
result[k] = asRedisMigrationString(v)
|
||
}
|
||
return result, nil
|
||
default:
|
||
return nil, fmt.Errorf("期望对象,实际=%T", value)
|
||
}
|
||
}
|
||
|
||
func asRedisMigrationStringSlice(value interface{}) ([]string, error) {
|
||
switch typed := value.(type) {
|
||
case nil:
|
||
return []string{}, nil
|
||
case []string:
|
||
result := append([]string(nil), typed...)
|
||
return result, nil
|
||
case []interface{}:
|
||
result := make([]string, 0, len(typed))
|
||
for _, item := range typed {
|
||
result = append(result, asRedisMigrationString(item))
|
||
}
|
||
return result, nil
|
||
default:
|
||
return nil, fmt.Errorf("期望数组,实际=%T", value)
|
||
}
|
||
}
|
||
|
||
func asRedisMigrationZSetMembers(value interface{}) ([]redispkg.ZSetMember, error) {
|
||
switch typed := value.(type) {
|
||
case nil:
|
||
return []redispkg.ZSetMember{}, nil
|
||
case []redispkg.ZSetMember:
|
||
result := append([]redispkg.ZSetMember(nil), typed...)
|
||
return result, nil
|
||
case []interface{}:
|
||
result := make([]redispkg.ZSetMember, 0, len(typed))
|
||
for _, item := range typed {
|
||
mapped, ok := item.(map[string]interface{})
|
||
if !ok {
|
||
return nil, fmt.Errorf("zset 成员格式无效: %T", item)
|
||
}
|
||
score, err := asRedisMigrationFloat64(mapped["score"])
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
result = append(result, redispkg.ZSetMember{Member: asRedisMigrationString(mapped["member"]), Score: score})
|
||
}
|
||
return result, nil
|
||
default:
|
||
return nil, fmt.Errorf("期望 zset 数组,实际=%T", value)
|
||
}
|
||
}
|
||
|
||
func asRedisMigrationStreamEntries(value interface{}) ([]redispkg.StreamEntry, error) {
|
||
switch typed := value.(type) {
|
||
case nil:
|
||
return []redispkg.StreamEntry{}, nil
|
||
case []redispkg.StreamEntry:
|
||
result := append([]redispkg.StreamEntry(nil), typed...)
|
||
return result, nil
|
||
case []interface{}:
|
||
result := make([]redispkg.StreamEntry, 0, len(typed))
|
||
for _, item := range typed {
|
||
mapped, ok := item.(map[string]interface{})
|
||
if !ok {
|
||
return nil, fmt.Errorf("stream 条目格式无效: %T", item)
|
||
}
|
||
fields, err := asRedisMigrationStringMap(mapped["fields"])
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
result = append(result, redispkg.StreamEntry{ID: asRedisMigrationString(mapped["id"]), Fields: fields})
|
||
}
|
||
return result, nil
|
||
default:
|
||
return nil, fmt.Errorf("期望 stream 数组,实际=%T", value)
|
||
}
|
||
}
|
||
|
||
func (s *SyncEngine) runMongoToRedisSync(config SyncConfig, result SyncResult) SyncResult {
|
||
collections := dedupeStrings(config.Tables)
|
||
sourceDB, err := newSyncDatabase(config.SourceConfig.Type)
|
||
if err != nil {
|
||
return s.fail(config.JobID, len(collections), result, "初始化源数据库驱动失败: "+err.Error())
|
||
}
|
||
if err := sourceDB.Connect(config.SourceConfig); err != nil {
|
||
return s.fail(config.JobID, len(collections), result, "源 MongoDB 连接失败: "+err.Error())
|
||
}
|
||
defer sourceDB.Close()
|
||
if len(collections) == 0 {
|
||
collections, err = listMongoRedisCollections(sourceDB, config)
|
||
if err != nil {
|
||
return s.fail(config.JobID, 0, result, "获取 MongoDB 集合列表失败: "+err.Error())
|
||
}
|
||
}
|
||
if len(collections) == 0 {
|
||
result.Message = "未发现可迁移的 MongoDB 集合"
|
||
s.progress(config.JobID, 0, 0, "", "同步完成")
|
||
return result
|
||
}
|
||
|
||
effectiveMode := normalizeSyncMode(config.Mode)
|
||
totalCollections := len(collections)
|
||
s.progress(config.JobID, 0, totalCollections, "", "开始 MongoDB 键空间迁移")
|
||
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("MongoDB -> Redis 键空间迁移;模式:%s;目标:%s", effectiveMode, deriveRedisTargetLabel(config)))
|
||
s.appendLog(config.JobID, &result, "warn", "MongoDB -> Redis 第一版仅支持固定文档格式:key/type/ttl/value")
|
||
if effectiveMode == "full_overwrite" {
|
||
s.appendLog(config.JobID, &result, "warn", "MongoDB -> Redis 第一版暂不执行 Redis DB 级 full_overwrite 删除,已降级为 insert_update")
|
||
effectiveMode = "insert_update"
|
||
}
|
||
|
||
targetClient := newRedisSourceClient()
|
||
targetConfig := withResolvedRedisDB(config.TargetConfig)
|
||
if err := targetClient.Connect(targetConfig); err != nil {
|
||
return s.fail(config.JobID, totalCollections, result, "目标 Redis 连接失败: "+err.Error())
|
||
}
|
||
defer targetClient.Close()
|
||
|
||
processedKeys := 0
|
||
for idx, collection := range collections {
|
||
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("正在同步集合: %s", collection))
|
||
s.progress(config.JobID, idx, totalCollections, collection, fmt.Sprintf("迁移集合(%d/%d)", idx+1, totalCollections))
|
||
diffs, err := buildMongoToRedisDiffs(sourceDB, targetClient, collection, effectiveMode)
|
||
if err != nil {
|
||
return s.fail(config.JobID, totalCollections, result, fmt.Sprintf("分析集合 %s 失败: %v", collection, err))
|
||
}
|
||
for _, diff := range diffs {
|
||
processedKeys++
|
||
if diff.Action == "same" {
|
||
continue
|
||
}
|
||
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("正在迁移 Key: %s", diff.Document.Key))
|
||
if err := applyMongoRedisDiff(targetClient, diff); err != nil {
|
||
return s.fail(config.JobID, totalCollections, result, fmt.Sprintf("写入 Redis Key %s 失败: %v", diff.Document.Key, err))
|
||
}
|
||
switch diff.Action {
|
||
case "insert":
|
||
result.RowsInserted++
|
||
case "update":
|
||
result.RowsUpdated++
|
||
}
|
||
}
|
||
result.TablesSynced++
|
||
s.progress(config.JobID, idx+1, totalCollections, collection, "集合处理完成")
|
||
}
|
||
|
||
if processedKeys == 0 {
|
||
result.Message = "未发现可迁移的 MongoDB Redis 文档"
|
||
return result
|
||
}
|
||
result.Message = fmt.Sprintf("MongoDB 键空间迁移完成,共处理 %d 个集合、%d 个 Key", result.TablesSynced, processedKeys)
|
||
return result
|
||
}
|
||
|
||
func (s *SyncEngine) analyzeMongoToRedis(config SyncConfig) SyncAnalyzeResult {
|
||
result := SyncAnalyzeResult{Success: true, Tables: []TableDiffSummary{}}
|
||
sourceDB, err := newSyncDatabase(config.SourceConfig.Type)
|
||
if err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "初始化源数据库驱动失败: " + err.Error()}
|
||
}
|
||
if err := sourceDB.Connect(config.SourceConfig); err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "源 MongoDB 连接失败: " + err.Error()}
|
||
}
|
||
defer sourceDB.Close()
|
||
|
||
collections, err := listMongoRedisCollections(sourceDB, config)
|
||
if err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "获取 MongoDB 集合列表失败: " + err.Error()}
|
||
}
|
||
|
||
effectiveMode := normalizeSyncMode(config.Mode)
|
||
modeWarning := ""
|
||
if effectiveMode == "full_overwrite" {
|
||
modeWarning = "MongoDB -> Redis 第一版会将 full_overwrite 降级为 insert_update,避免误删 DB 内其他 Key"
|
||
effectiveMode = "insert_update"
|
||
}
|
||
|
||
targetClient := newRedisSourceClient()
|
||
targetConfig := withResolvedRedisDB(config.TargetConfig)
|
||
if err := targetClient.Connect(targetConfig); err != nil {
|
||
return SyncAnalyzeResult{Success: false, Message: "目标 Redis 连接失败: " + err.Error()}
|
||
}
|
||
defer targetClient.Close()
|
||
|
||
for _, collection := range collections {
|
||
summary := TableDiffSummary{
|
||
Table: collection,
|
||
PKColumn: "key",
|
||
CanSync: true,
|
||
TargetTableExists: true,
|
||
PlannedAction: fmt.Sprintf("迁移到 %s", deriveRedisTargetLabel(config)),
|
||
Warnings: []string{
|
||
"MongoDB 集合中的文档会按 keyspace 语义写入 Redis",
|
||
"当前仅支持固定文档格式:key/type/ttl/value",
|
||
},
|
||
}
|
||
if modeWarning != "" {
|
||
summary.Warnings = append(summary.Warnings, modeWarning)
|
||
}
|
||
diffs, err := buildMongoToRedisDiffs(sourceDB, targetClient, collection, effectiveMode)
|
||
if err != nil {
|
||
summary.CanSync = false
|
||
summary.Message = err.Error()
|
||
result.Tables = append(result.Tables, summary)
|
||
continue
|
||
}
|
||
for _, diff := range diffs {
|
||
switch diff.Action {
|
||
case "insert":
|
||
summary.Inserts++
|
||
case "update":
|
||
summary.Updates++
|
||
default:
|
||
summary.Same++
|
||
}
|
||
}
|
||
if summary.Inserts == 0 && summary.Updates == 0 {
|
||
if summary.Same == 0 {
|
||
summary.Message = "集合中未发现可迁移文档"
|
||
} else {
|
||
summary.Message = "目标 Redis 中对应 Key 已是最新状态"
|
||
}
|
||
} else {
|
||
summary.Message = fmt.Sprintf("执行时将写入 %d 个新 Key、更新 %d 个已有 Key", summary.Inserts, summary.Updates)
|
||
}
|
||
result.Tables = append(result.Tables, summary)
|
||
}
|
||
result.Message = fmt.Sprintf("已完成 %d 个 MongoDB 集合的 Redis 迁移分析", len(result.Tables))
|
||
return result
|
||
}
|
||
|
||
func (s *SyncEngine) previewMongoToRedis(config SyncConfig, collection string, limit int) (TableDiffPreview, error) {
|
||
sourceDB, err := newSyncDatabase(config.SourceConfig.Type)
|
||
if err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("初始化源数据库驱动失败: %w", err)
|
||
}
|
||
if err := sourceDB.Connect(config.SourceConfig); err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("源 MongoDB 连接失败: %w", err)
|
||
}
|
||
defer sourceDB.Close()
|
||
|
||
targetClient := newRedisSourceClient()
|
||
targetConfig := withResolvedRedisDB(config.TargetConfig)
|
||
if err := targetClient.Connect(targetConfig); err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("目标 Redis 连接失败: %w", err)
|
||
}
|
||
defer targetClient.Close()
|
||
|
||
effectiveMode := normalizeSyncMode(config.Mode)
|
||
if effectiveMode == "full_overwrite" {
|
||
effectiveMode = "insert_update"
|
||
}
|
||
|
||
diffs, err := buildMongoToRedisDiffs(sourceDB, targetClient, collection, effectiveMode)
|
||
if err != nil {
|
||
return TableDiffPreview{}, err
|
||
}
|
||
preview := TableDiffPreview{Table: collection, PKColumn: "key", Inserts: []PreviewRow{}, Updates: []PreviewUpdateRow{}, Deletes: []PreviewRow{}}
|
||
for _, diff := range diffs {
|
||
switch diff.Action {
|
||
case "insert":
|
||
preview.TotalInserts++
|
||
if len(preview.Inserts) < limit {
|
||
preview.Inserts = append(preview.Inserts, PreviewRow{PK: diff.Document.Key, Row: diff.Document.SourceRow})
|
||
}
|
||
case "update":
|
||
preview.TotalUpdates++
|
||
if len(preview.Updates) < limit {
|
||
preview.Updates = append(preview.Updates, PreviewUpdateRow{PK: diff.Document.Key, ChangedColumns: diff.ChangedColumns, Source: diff.Document.SourceRow, Target: buildRedisPreviewRow(diff.Document.Key, diff.Current)})
|
||
}
|
||
}
|
||
}
|
||
return preview, nil
|
||
}
|