Files
MyGoNavi/internal/sync/preview.go
Syngnat 74a422a5e2 🐛 fix(sync): 修复 Oracle 同步连接与 MySQL 备份导出异常
- 分离 Oracle/OceanBase Oracle 同步连接 Service Name 与选中 schema

- 兼容旧同步请求中 database 被 schema 覆盖的情况

- 规范 MySQL/MariaDB SHOW CREATE TABLE 标识符引用

Refs #549

Refs #518
2026-06-11 10:24:48 +08:00

291 lines
9.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"errors"
"fmt"
"strings"
)
type PreviewRow struct {
PK string `json:"pk"`
Row map[string]interface{} `json:"row"`
}
type PreviewUpdateRow struct {
PK string `json:"pk"`
ChangedColumns []string `json:"changedColumns"`
Source map[string]interface{} `json:"source"`
Target map[string]interface{} `json:"target"`
}
type TableDiffPreview struct {
Table string `json:"table"`
PKColumn string `json:"pkColumn"`
ColumnTypes map[string]string `json:"columnTypes,omitempty"`
SchemaSummary string `json:"schemaSummary,omitempty"`
SchemaWarnings []string `json:"schemaWarnings,omitempty"`
SchemaStatements []string `json:"schemaStatements,omitempty"`
TotalInserts int `json:"totalInserts"`
TotalUpdates int `json:"totalUpdates"`
TotalDeletes int `json:"totalDeletes"`
Inserts []PreviewRow `json:"inserts"`
Updates []PreviewUpdateRow `json:"updates"`
Deletes []PreviewRow `json:"deletes"`
}
func (s *SyncEngine) Preview(config SyncConfig, tableName string, limit int) (TableDiffPreview, error) {
config = normalizeSyncConnectionDatabases(config)
if limit <= 0 {
limit = 200
}
if limit > 500 {
limit = 500
}
if isRedisToMongoKeyspacePair(config) {
return s.previewRedisToMongo(config, tableName, limit)
}
if isMongoToRedisKeyspacePair(config) {
return s.previewMongoToRedis(config, tableName, limit)
}
if hasSourceQuery(config) {
return s.previewSourceQuery(config, limit)
}
sourceDB, err := newSyncDatabase(config.SourceConfig.Type)
if err != nil {
return TableDiffPreview{}, fmt.Errorf("初始化源数据库驱动失败: %w", err)
}
targetDB, err := newSyncDatabase(config.TargetConfig.Type)
if err != nil {
return TableDiffPreview{}, fmt.Errorf("初始化目标数据库驱动失败: %w", err)
}
if err := sourceDB.Connect(config.SourceConfig); err != nil {
return TableDiffPreview{}, fmt.Errorf("源数据库连接失败: %w", err)
}
defer sourceDB.Close()
if err := targetDB.Connect(config.TargetConfig); err != nil {
return TableDiffPreview{}, fmt.Errorf("目标数据库连接失败: %w", err)
}
defer targetDB.Close()
plan, cols, _, err := buildSchemaMigrationPlan(config, tableName, sourceDB, targetDB)
if err != nil {
return TableDiffPreview{}, err
}
if !plan.TargetTableExists && !plan.AutoCreate {
return TableDiffPreview{}, errors.New(firstNonEmpty(plan.PlannedAction, "目标表不存在,无法预览差异"))
}
schemaStatements := make([]string, 0, len(plan.PreDataSQL)+len(plan.PostDataSQL))
schemaStatements = append(schemaStatements, plan.PreDataSQL...)
schemaStatements = append(schemaStatements, plan.PostDataSQL...)
contentRaw := strings.ToLower(strings.TrimSpace(config.Content))
if contentRaw == "schema" {
return TableDiffPreview{
Table: tableName,
SchemaSummary: firstNonEmpty(plan.PlannedAction, "仅同步结构"),
SchemaWarnings: append([]string(nil), plan.Warnings...),
SchemaStatements: append([]string(nil), schemaStatements...),
}, nil
}
pkCols := make([]string, 0, 2)
for _, c := range cols {
if c.Key == "PRI" || c.Key == "PK" {
pkCols = append(pkCols, c.Name)
}
}
if len(pkCols) == 0 {
return TableDiffPreview{}, fmt.Errorf("无主键,不支持数据预览")
}
if len(pkCols) > 1 {
return TableDiffPreview{}, fmt.Errorf("复合主键(%s暂不支持数据预览", strings.Join(pkCols, ","))
}
pkCol := pkCols[0]
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
out := TableDiffPreview{
Table: tableName,
PKColumn: pkCol,
ColumnTypes: make(map[string]string, len(cols)),
SchemaSummary: firstNonEmpty(plan.PlannedAction, "结构预览"),
SchemaWarnings: append([]string(nil), plan.Warnings...),
SchemaStatements: append([]string(nil), schemaStatements...),
TotalInserts: 0,
TotalUpdates: 0,
TotalDeletes: 0,
Inserts: make([]PreviewRow, 0),
Updates: make([]PreviewUpdateRow, 0),
Deletes: make([]PreviewRow, 0),
}
for _, col := range cols {
name := strings.ToLower(strings.TrimSpace(col.Name))
typ := strings.TrimSpace(col.Type)
if name == "" || typ == "" {
continue
}
out.ColumnTypes[name] = typ
}
tableMode := normalizeSyncMode(config.Mode)
targetColSet := map[string]struct{}{}
if plan.TargetTableExists {
targetCols, err := targetDB.GetColumns(plan.TargetSchema, plan.TargetTable)
if err == nil {
targetColSet = buildTargetColumnSet(targetCols)
}
}
if !plan.TargetTableExists || tableMode != "insert_update" {
sourceCount, counted, err := countTableRowsForSync(sourceDB, sourceType, plan.SourceQueryTable)
if err != nil {
return TableDiffPreview{}, fmt.Errorf("读取源表数量失败: %w", err)
}
query := buildPagedSourceTableQuery(sourceType, plan.SourceQueryTable, cols, pkCol, limit, 0)
if strings.TrimSpace(query) == "" {
return TableDiffPreview{}, fmt.Errorf("当前数据源不支持分页预览")
}
sourceRows, _, err := sourceDB.Query(query)
if err != nil {
return TableDiffPreview{}, fmt.Errorf("读取源表失败: %w", err)
}
if !counted {
sourceCount = len(sourceRows)
}
out.TotalInserts = sourceCount
for _, row := range sourceRows {
if len(out.Inserts) >= limit {
break
}
pkVal := strings.TrimSpace(fmt.Sprintf("%v", row[pkCol]))
if pkVal == "" || pkVal == "<nil>" {
continue
}
out.Inserts = append(out.Inserts, PreviewRow{PK: pkVal, Row: row})
}
return out, nil
}
handled, _, err := scanTableDiffInPages(sourceDB, targetDB, sourceType, targetType, plan, cols, nil, pkCol, targetColSet, true, func(page pagedDiffPage) error {
out.TotalInserts += len(page.Inserts)
out.TotalUpdates += len(page.Updates)
out.TotalDeletes += len(page.Deletes)
for _, row := range page.Inserts {
if len(out.Inserts) >= limit {
break
}
pkVal := strings.TrimSpace(fmt.Sprintf("%v", row[pkCol]))
if pkVal == "" || pkVal == "<nil>" {
continue
}
out.Inserts = append(out.Inserts, PreviewRow{PK: pkVal, Row: row})
}
for _, update := range page.Updates {
if len(out.Updates) >= limit {
break
}
pkVal := strings.TrimSpace(fmt.Sprintf("%v", update.UpdateRow.Keys[pkCol]))
if pkVal == "" || pkVal == "<nil>" {
continue
}
out.Updates = append(out.Updates, PreviewUpdateRow{
PK: pkVal,
ChangedColumns: append([]string(nil), update.ChangedColumns...),
Source: update.Source,
Target: update.Target,
})
}
for _, row := range page.Deletes {
if len(out.Deletes) >= limit {
break
}
pkVal := strings.TrimSpace(fmt.Sprintf("%v", row[pkCol]))
if pkVal == "" || pkVal == "<nil>" {
continue
}
out.Deletes = append(out.Deletes, PreviewRow{PK: pkVal, Row: row})
}
return nil
})
if handled {
if err != nil {
return TableDiffPreview{}, err
}
return out, nil
}
sourceRows, _, err := sourceDB.Query(fmt.Sprintf("SELECT * FROM %s", quoteQualifiedIdentByType(sourceType, plan.SourceQueryTable)))
if err != nil {
return TableDiffPreview{}, fmt.Errorf("读取源表失败: %w", err)
}
targetRows := make([]map[string]interface{}, 0)
if plan.TargetTableExists {
targetRows, _, err = targetDB.Query(fmt.Sprintf("SELECT * FROM %s", quoteQualifiedIdentByType(targetType, plan.TargetQueryTable)))
if err != nil {
return TableDiffPreview{}, fmt.Errorf("读取目标表失败: %w", err)
}
}
targetMap := make(map[string]map[string]interface{}, len(targetRows))
for _, row := range targetRows {
if row[pkCol] == nil {
continue
}
pkVal := strings.TrimSpace(fmt.Sprintf("%v", row[pkCol]))
if pkVal == "" || pkVal == "<nil>" {
continue
}
targetMap[pkVal] = row
}
sourcePKSet := make(map[string]struct{}, len(sourceRows))
for _, sRow := range sourceRows {
if sRow[pkCol] == nil {
continue
}
pkVal := strings.TrimSpace(fmt.Sprintf("%v", sRow[pkCol]))
if pkVal == "" || pkVal == "<nil>" {
continue
}
sourcePKSet[pkVal] = struct{}{}
if tRow, exists := targetMap[pkVal]; exists {
changedColumns := make([]string, 0)
for k, v := range sRow {
if fmt.Sprintf("%v", v) != fmt.Sprintf("%v", tRow[k]) {
changedColumns = append(changedColumns, k)
}
}
if len(changedColumns) > 0 {
out.TotalUpdates++
if len(out.Updates) < limit {
out.Updates = append(out.Updates, PreviewUpdateRow{PK: pkVal, ChangedColumns: changedColumns, Source: sRow, Target: tRow})
}
}
continue
}
out.TotalInserts++
if len(out.Inserts) < limit {
out.Inserts = append(out.Inserts, PreviewRow{PK: pkVal, Row: sRow})
}
}
for pkVal, row := range targetMap {
if _, ok := sourcePKSet[pkVal]; ok {
continue
}
out.TotalDeletes++
if len(out.Deletes) < limit {
out.Deletes = append(out.Deletes, PreviewRow{PK: pkVal, Row: row})
}
}
return out, nil
}