mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-05-11 16:49:42 +08:00
- 统一同库同步与跨库迁移入口,补充模式区分与风险提示 - 扩展 ClickHouse 与 PG-like 双向迁移,并新增 PG-like、ClickHouse、TDengine 到 MongoDB 的迁移路由 - 完善 TDengine 目标端建表规划、回归测试与需求追踪文档 - refs #51
168 lines
4.8 KiB
Go
168 lines
4.8 KiB
Go
package sync
|
||
|
||
import (
|
||
"errors"
|
||
"fmt"
|
||
"strings"
|
||
)
|
||
|
||
type PreviewRow struct {
|
||
PK string `json:"pk"`
|
||
Row map[string]interface{} `json:"row"`
|
||
}
|
||
|
||
type PreviewUpdateRow struct {
|
||
PK string `json:"pk"`
|
||
ChangedColumns []string `json:"changedColumns"`
|
||
Source map[string]interface{} `json:"source"`
|
||
Target map[string]interface{} `json:"target"`
|
||
}
|
||
|
||
type TableDiffPreview struct {
|
||
Table string `json:"table"`
|
||
PKColumn string `json:"pkColumn"`
|
||
TotalInserts int `json:"totalInserts"`
|
||
TotalUpdates int `json:"totalUpdates"`
|
||
TotalDeletes int `json:"totalDeletes"`
|
||
Inserts []PreviewRow `json:"inserts"`
|
||
Updates []PreviewUpdateRow `json:"updates"`
|
||
Deletes []PreviewRow `json:"deletes"`
|
||
}
|
||
|
||
func (s *SyncEngine) Preview(config SyncConfig, tableName string, limit int) (TableDiffPreview, error) {
|
||
if limit <= 0 {
|
||
limit = 200
|
||
}
|
||
if limit > 500 {
|
||
limit = 500
|
||
}
|
||
if isRedisToMongoKeyspacePair(config) {
|
||
return s.previewRedisToMongo(config, tableName, limit)
|
||
}
|
||
if isMongoToRedisKeyspacePair(config) {
|
||
return s.previewMongoToRedis(config, tableName, limit)
|
||
}
|
||
|
||
sourceDB, err := newSyncDatabase(config.SourceConfig.Type)
|
||
if err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("初始化源数据库驱动失败: %w", err)
|
||
}
|
||
targetDB, err := newSyncDatabase(config.TargetConfig.Type)
|
||
if err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("初始化目标数据库驱动失败: %w", err)
|
||
}
|
||
|
||
if err := sourceDB.Connect(config.SourceConfig); err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("源数据库连接失败: %w", err)
|
||
}
|
||
defer sourceDB.Close()
|
||
|
||
if err := targetDB.Connect(config.TargetConfig); err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("目标数据库连接失败: %w", err)
|
||
}
|
||
defer targetDB.Close()
|
||
|
||
plan, cols, _, err := buildSchemaMigrationPlan(config, tableName, sourceDB, targetDB)
|
||
if err != nil {
|
||
return TableDiffPreview{}, err
|
||
}
|
||
if !plan.TargetTableExists && !plan.AutoCreate {
|
||
return TableDiffPreview{}, errors.New(firstNonEmpty(plan.PlannedAction, "目标表不存在,无法预览差异"))
|
||
}
|
||
|
||
pkCols := make([]string, 0, 2)
|
||
for _, c := range cols {
|
||
if c.Key == "PRI" || c.Key == "PK" {
|
||
pkCols = append(pkCols, c.Name)
|
||
}
|
||
}
|
||
if len(pkCols) == 0 {
|
||
return TableDiffPreview{}, fmt.Errorf("无主键,不支持数据预览")
|
||
}
|
||
if len(pkCols) > 1 {
|
||
return TableDiffPreview{}, fmt.Errorf("复合主键(%s),暂不支持数据预览", strings.Join(pkCols, ","))
|
||
}
|
||
pkCol := pkCols[0]
|
||
|
||
sourceRows, _, err := sourceDB.Query(fmt.Sprintf("SELECT * FROM %s", quoteQualifiedIdentByType(resolveMigrationDBType(config.SourceConfig), plan.SourceQueryTable)))
|
||
if err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("读取源表失败: %w", err)
|
||
}
|
||
|
||
targetRows := make([]map[string]interface{}, 0)
|
||
if plan.TargetTableExists {
|
||
targetRows, _, err = targetDB.Query(fmt.Sprintf("SELECT * FROM %s", quoteQualifiedIdentByType(resolveMigrationDBType(config.TargetConfig), plan.TargetQueryTable)))
|
||
if err != nil {
|
||
return TableDiffPreview{}, fmt.Errorf("读取目标表失败: %w", err)
|
||
}
|
||
}
|
||
|
||
targetMap := make(map[string]map[string]interface{}, len(targetRows))
|
||
for _, row := range targetRows {
|
||
if row[pkCol] == nil {
|
||
continue
|
||
}
|
||
pkVal := strings.TrimSpace(fmt.Sprintf("%v", row[pkCol]))
|
||
if pkVal == "" || pkVal == "<nil>" {
|
||
continue
|
||
}
|
||
targetMap[pkVal] = row
|
||
}
|
||
|
||
out := TableDiffPreview{
|
||
Table: tableName,
|
||
PKColumn: pkCol,
|
||
TotalInserts: 0,
|
||
TotalUpdates: 0,
|
||
TotalDeletes: 0,
|
||
Inserts: make([]PreviewRow, 0),
|
||
Updates: make([]PreviewUpdateRow, 0),
|
||
Deletes: make([]PreviewRow, 0),
|
||
}
|
||
|
||
sourcePKSet := make(map[string]struct{}, len(sourceRows))
|
||
for _, sRow := range sourceRows {
|
||
if sRow[pkCol] == nil {
|
||
continue
|
||
}
|
||
pkVal := strings.TrimSpace(fmt.Sprintf("%v", sRow[pkCol]))
|
||
if pkVal == "" || pkVal == "<nil>" {
|
||
continue
|
||
}
|
||
sourcePKSet[pkVal] = struct{}{}
|
||
|
||
if tRow, exists := targetMap[pkVal]; exists {
|
||
changedColumns := make([]string, 0)
|
||
for k, v := range sRow {
|
||
if fmt.Sprintf("%v", v) != fmt.Sprintf("%v", tRow[k]) {
|
||
changedColumns = append(changedColumns, k)
|
||
}
|
||
}
|
||
if len(changedColumns) > 0 {
|
||
out.TotalUpdates++
|
||
if len(out.Updates) < limit {
|
||
out.Updates = append(out.Updates, PreviewUpdateRow{PK: pkVal, ChangedColumns: changedColumns, Source: sRow, Target: tRow})
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
|
||
out.TotalInserts++
|
||
if len(out.Inserts) < limit {
|
||
out.Inserts = append(out.Inserts, PreviewRow{PK: pkVal, Row: sRow})
|
||
}
|
||
}
|
||
|
||
for pkVal, row := range targetMap {
|
||
if _, ok := sourcePKSet[pkVal]; ok {
|
||
continue
|
||
}
|
||
out.TotalDeletes++
|
||
if len(out.Deletes) < limit {
|
||
out.Deletes = append(out.Deletes, PreviewRow{PK: pkVal, Row: row})
|
||
}
|
||
}
|
||
|
||
return out, nil
|
||
}
|