Files
MyGoNavi/internal/sync/sync_engine.go
杨国锋 e3bf160072 feat(sync): 数据同步支持差异对比、行级选择与实时进度日志
- 新增差异分析/预览接口与前端预览抽屉(插入/更新/删除)
  - 支持按表勾选插入/更新/删除(删除默认不勾选)
  - 支持按主键选择行级同步;无主键/复合主键表跳过并提示
  - 同步过程实时输出中文日志与进度条,便于定位失败步骤
2026-02-03 17:37:41 +08:00

557 lines
21 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"GoNavi-Wails/internal/connection"
"GoNavi-Wails/internal/db"
"GoNavi-Wails/internal/logger"
"fmt"
"sort"
"strings"
"time"
)
// SyncConfig defines the parameters for a synchronization task
type SyncConfig struct {
SourceConfig connection.ConnectionConfig `json:"sourceConfig"`
TargetConfig connection.ConnectionConfig `json:"targetConfig"`
Tables []string `json:"tables"` // Tables to sync
Content string `json:"content,omitempty"` // "data", "schema", "both"
Mode string `json:"mode"` // "insert_update", "insert_only", "full_overwrite"
JobID string `json:"jobId,omitempty"`
AutoAddColumns bool `json:"autoAddColumns,omitempty"` // 自动补齐缺失字段(当前仅 MySQL 目标支持)
TableOptions map[string]TableOptions `json:"tableOptions,omitempty"`
}
// SyncResult holds the result of the sync operation
type SyncResult struct {
Success bool `json:"success"`
Message string `json:"message"`
Logs []string `json:"logs"`
TablesSynced int `json:"tablesSynced"`
RowsInserted int `json:"rowsInserted"`
RowsUpdated int `json:"rowsUpdated"`
RowsDeleted int `json:"rowsDeleted"`
}
type SyncEngine struct {
reporter Reporter
}
func NewSyncEngine(reporter Reporter) *SyncEngine {
return &SyncEngine{reporter: reporter}
}
// CompareAndSync performs the synchronization
func (s *SyncEngine) RunSync(config SyncConfig) SyncResult {
result := SyncResult{Success: true, Logs: []string{}}
logger.Infof("开始数据同步:源=%s 目标=%s 表数量=%d", formatConnSummaryForSync(config.SourceConfig), formatConnSummaryForSync(config.TargetConfig), len(config.Tables))
totalTables := len(config.Tables)
s.progress(config.JobID, 0, totalTables, "", "开始同步")
contentRaw := strings.ToLower(strings.TrimSpace(config.Content))
syncSchema := false
syncData := true
switch contentRaw {
case "", "data":
syncData = true
case "schema":
syncSchema = true
syncData = false
case "both":
syncSchema = true
syncData = true
default:
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf("未知同步内容 %q已自动使用仅同步数据", config.Content))
syncData = true
}
modeRaw := strings.ToLower(strings.TrimSpace(config.Mode))
if modeRaw != "" && modeRaw != "insert_update" && modeRaw != "insert_only" && modeRaw != "full_overwrite" {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf("未知同步模式 %q已自动使用 insert_update", config.Mode))
}
defaultMode := normalizeSyncMode(config.Mode)
contentLabel := "仅同步数据"
if syncSchema && syncData {
contentLabel = "同步结构+数据"
} else if syncSchema {
contentLabel = "仅同步结构"
}
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("同步内容:%s模式%s自动补字段%v", contentLabel, defaultMode, config.AutoAddColumns))
sourceDB, err := db.NewDatabase(config.SourceConfig.Type)
if err != nil {
logger.Error(err, "初始化源数据库驱动失败:类型=%s", config.SourceConfig.Type)
return s.fail(config.JobID, totalTables, result, "初始化源数据库驱动失败: "+err.Error())
}
if config.SourceConfig.Type == "custom" {
// Custom DB setup would go here if needed
}
targetDB, err := db.NewDatabase(config.TargetConfig.Type)
if err != nil {
logger.Error(err, "初始化目标数据库驱动失败:类型=%s", config.TargetConfig.Type)
return s.fail(config.JobID, totalTables, result, "初始化目标数据库驱动失败: "+err.Error())
}
// Connect Source
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("正在连接源数据库: %s...", config.SourceConfig.Host))
s.progress(config.JobID, 0, totalTables, "", "连接源数据库")
if err := sourceDB.Connect(config.SourceConfig); err != nil {
logger.Error(err, "源数据库连接失败:%s", formatConnSummaryForSync(config.SourceConfig))
return s.fail(config.JobID, totalTables, result, "源数据库连接失败: "+err.Error())
}
defer sourceDB.Close()
// Connect Target
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("正在连接目标数据库: %s...", config.TargetConfig.Host))
s.progress(config.JobID, 0, totalTables, "", "连接目标数据库")
if err := targetDB.Connect(config.TargetConfig); err != nil {
logger.Error(err, "目标数据库连接失败:%s", formatConnSummaryForSync(config.TargetConfig))
return s.fail(config.JobID, totalTables, result, "目标数据库连接失败: "+err.Error())
}
defer targetDB.Close()
// Iterate Tables
for i, tableName := range config.Tables {
func() {
tableMode := defaultMode
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("正在同步表: %s", tableName))
s.progress(config.JobID, i, totalTables, tableName, fmt.Sprintf("同步表(%d/%d)", i+1, totalTables))
defer s.progress(config.JobID, i+1, totalTables, tableName, "表处理完成")
if syncSchema {
s.progress(config.JobID, i, totalTables, tableName, "同步表结构")
if err := s.syncTableSchema(config, &result, sourceDB, targetDB, tableName); err != nil {
s.appendLog(config.JobID, &result, "error", fmt.Sprintf("表结构同步失败:表=%s 错误=%v", tableName, err))
return
}
}
if !syncData {
result.TablesSynced++
return
}
sourceSchema, sourceTable := normalizeSchemaAndTable(config.SourceConfig.Type, config.SourceConfig.Database, tableName)
targetSchema, targetTable := normalizeSchemaAndTable(config.TargetConfig.Type, config.TargetConfig.Database, tableName)
sourceQueryTable := qualifiedNameForQuery(config.SourceConfig.Type, sourceSchema, sourceTable, tableName)
targetQueryTable := qualifiedNameForQuery(config.TargetConfig.Type, targetSchema, targetTable, tableName)
// 1. Get Columns & PKs
cols, err := sourceDB.GetColumns(sourceSchema, sourceTable)
if err != nil {
logger.Error(err, "获取源表列信息失败:表=%s", tableName)
s.appendLog(config.JobID, &result, "error", fmt.Sprintf("获取表 %s 的列信息失败: %v", tableName, err))
return
}
sourceColsByLower := make(map[string]connection.ColumnDefinition, len(cols))
for _, col := range cols {
if strings.TrimSpace(col.Name) == "" {
continue
}
sourceColsByLower[strings.ToLower(strings.TrimSpace(col.Name))] = col
}
pkCols := make([]string, 0, 2)
for _, col := range cols {
if col.Key == "PRI" || col.Key == "PK" {
pkCols = append(pkCols, col.Name)
}
}
if len(pkCols) == 0 {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf("表 %s 未找到主键,已跳过数据同步(避免产生重复数据)", tableName))
return
}
if len(pkCols) > 1 {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf("表 %s 为复合主键(%s当前暂不支持数据同步", tableName, strings.Join(pkCols, ",")))
return
}
pkCol := pkCols[0]
opts := TableOptions{Insert: true, Update: true, Delete: false}
if config.TableOptions != nil {
if t, ok := config.TableOptions[tableName]; ok {
opts = t
// 默认防护:如用户未设置任意一个字段,保持 insert/update 默认 true、delete 默认 false
if !t.Insert && !t.Update && !t.Delete {
opts = t
}
}
}
if !opts.Insert && !opts.Update && !opts.Delete {
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("表 %s 未勾选任何操作,已跳过", tableName))
return
}
// 2. Fetch Data (MEMORY INTENSIVE - PROTOTYPE ONLY)
// TODO: Implement paging/streaming
s.progress(config.JobID, i, totalTables, tableName, "读取源表数据")
sourceRows, _, err := sourceDB.Query(fmt.Sprintf("SELECT * FROM %s", quoteQualifiedIdentByType(config.SourceConfig.Type, sourceQueryTable)))
if err != nil {
logger.Error(err, "读取源表失败:表=%s", tableName)
s.appendLog(config.JobID, &result, "error", fmt.Sprintf("读取源表 %s 失败: %v", tableName, err))
return
}
var inserts []map[string]interface{}
var updates []connection.UpdateRow
if tableMode == "insert_update" {
s.progress(config.JobID, i, totalTables, tableName, "读取目标表数据")
targetRows, _, err := targetDB.Query(fmt.Sprintf("SELECT * FROM %s", quoteQualifiedIdentByType(config.TargetConfig.Type, targetQueryTable)))
if err != nil {
logger.Error(err, "读取目标表失败:表=%s", tableName)
s.appendLog(config.JobID, &result, "error", fmt.Sprintf("读取目标表 %s 失败: %v", tableName, err))
return
}
// 3. Compare (In-Memory Hash Map)
s.progress(config.JobID, i, totalTables, tableName, "对比差异")
targetMap := make(map[string]map[string]interface{})
for _, row := range targetRows {
if row[pkCol] == nil {
continue
}
pkVal := fmt.Sprintf("%v", row[pkCol])
if strings.TrimSpace(pkVal) == "" || pkVal == "<nil>" {
continue
}
targetMap[pkVal] = row
}
sourcePKSet := make(map[string]struct{}, len(sourceRows))
for _, sRow := range sourceRows {
if sRow[pkCol] == nil {
continue
}
pkVal := fmt.Sprintf("%v", sRow[pkCol])
if strings.TrimSpace(pkVal) == "" || pkVal == "<nil>" {
continue
}
sourcePKSet[pkVal] = struct{}{}
if tRow, exists := targetMap[pkVal]; exists {
changes := make(map[string]interface{})
for k, v := range sRow {
if fmt.Sprintf("%v", v) != fmt.Sprintf("%v", tRow[k]) {
changes[k] = v
}
}
if len(changes) > 0 {
updates = append(updates, connection.UpdateRow{
Keys: map[string]interface{}{pkCol: sRow[pkCol]},
Values: changes,
})
}
} else {
inserts = append(inserts, sRow)
}
}
var deletes []map[string]interface{}
if opts.Delete {
for pkStr, row := range targetMap {
if _, ok := sourcePKSet[pkStr]; ok {
continue
}
deletes = append(deletes, map[string]interface{}{pkCol: row[pkCol]})
}
}
// apply operation selection
inserts = filterRowsByPKSelection(pkCol, inserts, opts.Insert, opts.SelectedInsertPKs)
updates = filterUpdatesByPKSelection(pkCol, updates, opts.Update, opts.SelectedUpdatePKs)
deletes = filterRowsByPKSelection(pkCol, deletes, opts.Delete, opts.SelectedDeletePKs)
changeSet := connection.ChangeSet{
Inserts: inserts,
Updates: updates,
Deletes: deletes,
}
// 4. Align schema (target missing columns)
s.progress(config.JobID, i, totalTables, tableName, "检查字段一致性")
requiredCols := collectRequiredColumns(changeSet.Inserts, changeSet.Updates)
targetCols, err := targetDB.GetColumns(targetSchema, targetTable)
if err != nil {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf(" -> 获取目标表字段失败,已跳过字段一致性检查: %v", err))
} else {
targetColSet := make(map[string]struct{}, len(targetCols))
for _, c := range targetCols {
name := strings.ToLower(strings.TrimSpace(c.Name))
if name == "" {
continue
}
targetColSet[name] = struct{}{}
}
missing := make([]string, 0)
for lower, original := range requiredCols {
if _, ok := targetColSet[lower]; !ok {
missing = append(missing, original)
}
}
sort.Strings(missing)
if len(missing) > 0 {
if config.AutoAddColumns && strings.ToLower(strings.TrimSpace(config.TargetConfig.Type)) == "mysql" {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf(" -> 目标表缺少字段 %d 个,开始自动补齐: %s", len(missing), strings.Join(missing, ", ")))
added := 0
for _, colName := range missing {
colLower := strings.ToLower(strings.TrimSpace(colName))
colType := "TEXT"
if strings.ToLower(strings.TrimSpace(config.SourceConfig.Type)) == "mysql" {
if srcCol, ok := sourceColsByLower[colLower]; ok {
colType = sanitizeMySQLColumnType(srcCol.Type)
}
}
alterSQL := fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s NULL",
quoteQualifiedIdentByType("mysql", targetQueryTable),
quoteIdentByType("mysql", colName),
colType,
)
if _, err := targetDB.Exec(alterSQL); err != nil {
s.appendLog(config.JobID, &result, "error", fmt.Sprintf(" -> 自动补字段失败:字段=%s 错误=%v", colName, err))
continue
}
added++
}
s.appendLog(config.JobID, &result, "info", fmt.Sprintf(" -> 自动补字段完成:成功=%d 失败=%d", added, len(missing)-added))
// refresh columns
targetCols, err = targetDB.GetColumns(targetSchema, targetTable)
if err == nil {
targetColSet = make(map[string]struct{}, len(targetCols))
for _, c := range targetCols {
name := strings.ToLower(strings.TrimSpace(c.Name))
if name == "" {
continue
}
targetColSet[name] = struct{}{}
}
}
} else {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf(" -> 目标表缺少字段 %d 个(未开启自动补齐),将自动忽略:%s", len(missing), strings.Join(missing, ", ")))
}
// filter out still-missing columns to avoid apply failure
changeSet.Inserts = filterInsertRows(changeSet.Inserts, targetColSet)
changeSet.Updates = filterUpdateRows(changeSet.Updates, targetColSet)
}
}
// 5. Apply Changes
s.progress(config.JobID, i, totalTables, tableName, "应用变更")
if len(changeSet.Inserts) > 0 || len(changeSet.Updates) > 0 || len(changeSet.Deletes) > 0 {
s.appendLog(config.JobID, &result, "info", fmt.Sprintf(" -> 需插入: %d 行, 需更新: %d 行, 需删除: %d 行", len(changeSet.Inserts), len(changeSet.Updates), len(changeSet.Deletes)))
if applier, ok := targetDB.(db.BatchApplier); ok {
if err := applier.ApplyChanges(targetTable, changeSet); err != nil {
s.appendLog(config.JobID, &result, "error", fmt.Sprintf(" -> 应用变更失败: %v", err))
} else {
result.RowsInserted += len(changeSet.Inserts)
result.RowsUpdated += len(changeSet.Updates)
result.RowsDeleted += len(changeSet.Deletes)
}
} else {
s.appendLog(config.JobID, &result, "warn", " -> 目标驱动不支持应用数据变更 (ApplyChanges).")
}
} else {
s.appendLog(config.JobID, &result, "info", " -> 数据一致,无需变更.")
}
result.TablesSynced++
return
} else {
// insert_only / full_overwrite: do not compare target, just insert source rows
inserts = sourceRows
}
// full_overwrite: clear target table first
if tableMode == "full_overwrite" {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf(" -> 全量覆盖模式:即将清空目标表 %s", tableName))
s.progress(config.JobID, i, totalTables, tableName, "清空目标表")
clearSQL := ""
if strings.ToLower(strings.TrimSpace(config.TargetConfig.Type)) == "mysql" {
clearSQL = fmt.Sprintf("TRUNCATE TABLE %s", quoteQualifiedIdentByType(config.TargetConfig.Type, targetQueryTable))
} else {
clearSQL = fmt.Sprintf("DELETE FROM %s", quoteQualifiedIdentByType(config.TargetConfig.Type, targetQueryTable))
}
if _, err := targetDB.Exec(clearSQL); err != nil {
s.appendLog(config.JobID, &result, "error", fmt.Sprintf(" -> 清空目标表失败: %v", err))
return
}
}
// 4. Align schema (target missing columns)
s.progress(config.JobID, i, totalTables, tableName, "检查字段一致性")
requiredCols := collectRequiredColumns(inserts, updates)
targetCols, err := targetDB.GetColumns(targetSchema, targetTable)
if err != nil {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf(" -> 获取目标表字段失败,已跳过字段一致性检查: %v", err))
} else {
targetColSet := make(map[string]struct{}, len(targetCols))
for _, c := range targetCols {
name := strings.ToLower(strings.TrimSpace(c.Name))
if name == "" {
continue
}
targetColSet[name] = struct{}{}
}
missing := make([]string, 0)
for lower, original := range requiredCols {
if _, ok := targetColSet[lower]; !ok {
missing = append(missing, original)
}
}
sort.Strings(missing)
if len(missing) > 0 {
if config.AutoAddColumns && strings.ToLower(strings.TrimSpace(config.TargetConfig.Type)) == "mysql" {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf(" -> 目标表缺少字段 %d 个,开始自动补齐: %s", len(missing), strings.Join(missing, ", ")))
added := 0
for _, colName := range missing {
colLower := strings.ToLower(strings.TrimSpace(colName))
colType := "TEXT"
if strings.ToLower(strings.TrimSpace(config.SourceConfig.Type)) == "mysql" {
if srcCol, ok := sourceColsByLower[colLower]; ok {
colType = sanitizeMySQLColumnType(srcCol.Type)
}
}
alterSQL := fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s NULL",
quoteQualifiedIdentByType("mysql", targetQueryTable),
quoteIdentByType("mysql", colName),
colType,
)
if _, err := targetDB.Exec(alterSQL); err != nil {
s.appendLog(config.JobID, &result, "error", fmt.Sprintf(" -> 自动补字段失败:字段=%s 错误=%v", colName, err))
continue
}
added++
}
s.appendLog(config.JobID, &result, "info", fmt.Sprintf(" -> 自动补字段完成:成功=%d 失败=%d", added, len(missing)-added))
// refresh columns
targetCols, err = targetDB.GetColumns(targetSchema, targetTable)
if err == nil {
targetColSet = make(map[string]struct{}, len(targetCols))
for _, c := range targetCols {
name := strings.ToLower(strings.TrimSpace(c.Name))
if name == "" {
continue
}
targetColSet[name] = struct{}{}
}
}
} else {
s.appendLog(config.JobID, &result, "warn", fmt.Sprintf(" -> 目标表缺少字段 %d 个(未开启自动补齐),将自动忽略:%s", len(missing), strings.Join(missing, ", ")))
}
// filter out still-missing columns to avoid apply failure
inserts = filterInsertRows(inserts, targetColSet)
updates = filterUpdateRows(updates, targetColSet)
}
}
// 5. Apply Changes
s.progress(config.JobID, i, totalTables, tableName, "应用变更")
changeSet := connection.ChangeSet{
Inserts: inserts,
Updates: updates,
}
if len(changeSet.Inserts) > 0 || len(changeSet.Updates) > 0 {
s.appendLog(config.JobID, &result, "info", fmt.Sprintf(" -> 需插入: %d 行, 需更新: %d 行", len(changeSet.Inserts), len(changeSet.Updates)))
if applier, ok := targetDB.(db.BatchApplier); ok {
if err := applier.ApplyChanges(targetTable, changeSet); err != nil {
s.appendLog(config.JobID, &result, "error", fmt.Sprintf(" -> 应用变更失败: %v", err))
} else {
result.RowsInserted += len(changeSet.Inserts)
result.RowsUpdated += len(changeSet.Updates)
}
} else {
s.appendLog(config.JobID, &result, "warn", " -> 目标驱动不支持应用数据变更 (ApplyChanges).")
}
} else {
s.appendLog(config.JobID, &result, "info", " -> 数据一致,无需变更.")
}
result.TablesSynced++
}()
}
s.progress(config.JobID, totalTables, totalTables, "", "同步完成")
return result
}
func formatConnSummaryForSync(config connection.ConnectionConfig) string {
timeoutSeconds := config.Timeout
if timeoutSeconds <= 0 {
timeoutSeconds = 30
}
dbName := strings.TrimSpace(config.Database)
if dbName == "" {
dbName = "(default)"
}
return fmt.Sprintf("类型=%s 地址=%s:%d 数据库=%s 用户=%s 超时=%ds",
config.Type, config.Host, config.Port, dbName, config.User, timeoutSeconds)
}
func (s *SyncEngine) appendLog(jobID string, res *SyncResult, level string, msg string) {
if res != nil {
res.Logs = append(res.Logs, msg)
}
if s.reporter.OnLog != nil && strings.TrimSpace(jobID) != "" {
s.reporter.OnLog(SyncLogEvent{
JobID: jobID,
Level: level,
Message: msg,
Ts: time.Now().UnixMilli(),
})
}
}
func (s *SyncEngine) progress(jobID string, current, total int, table string, stage string) {
if s.reporter.OnProgress == nil || strings.TrimSpace(jobID) == "" {
return
}
percent := 0
if total <= 0 {
if current > 0 {
percent = 100
}
} else {
if current < 0 {
current = 0
}
if current > total {
current = total
}
percent = (current * 100) / total
}
s.reporter.OnProgress(SyncProgressEvent{
JobID: jobID,
Percent: percent,
Current: current,
Total: total,
Table: table,
Stage: stage,
})
}
func (s *SyncEngine) fail(jobID string, totalTables int, res SyncResult, msg string) SyncResult {
res.Success = false
res.Message = msg
s.appendLog(jobID, &res, "error", "致命错误: "+msg)
s.progress(jobID, res.TablesSynced, totalTables, "", "同步失败")
return res
}