perf(sync): 优化大表同步分页与批量写入

- 同步分析和预览改为分页扫描差异,避免一次性加载源表和目标表

- 直接导入与源查询同步支持分页读取和分批提交,降低低内存机器 OOM 风险

- 各数据库 ApplyChanges 统一使用参数化批量 INSERT,减少大表同步 SQL 超时

- MySQL 批量写入按行数和参数数量拆分,兼容超宽表场景

- 补充批量插入、分页差异和源查询同步回归测试
This commit is contained in:
Syngnat
2026-05-26 08:27:15 +08:00
parent aa2177d35a
commit 5ab50db51c
27 changed files with 2846 additions and 319 deletions

View File

@@ -68,7 +68,7 @@ func resolveSinglePKColumn(cols []connection.ColumnDefinition) (string, error) {
return pkCols[0], nil
}
func loadSourceQuerySyncContext(config SyncConfig, sourceDB db.Database, targetDB db.Database, needTargetRows bool, requirePK bool) (sourceQuerySyncContext, error) {
func loadSourceQuerySyncContext(config SyncConfig, sourceDB db.Database, targetDB db.Database, needSourceRows bool, needTargetRows bool, requirePK bool) (sourceQuerySyncContext, error) {
tableName, err := validateSourceQuerySyncConfig(config)
if err != nil {
return sourceQuerySyncContext{}, err
@@ -83,11 +83,6 @@ func loadSourceQuerySyncContext(config SyncConfig, sourceDB db.Database, targetD
return sourceQuerySyncContext{}, fmt.Errorf("目标表 %s 不存在或未读取到字段定义", tableName)
}
sourceRows, _, err := sourceDB.Query(strings.TrimSpace(config.SourceQuery))
if err != nil {
return sourceQuerySyncContext{}, fmt.Errorf("执行源查询失败: %w", err)
}
ctx := sourceQuerySyncContext{
TableName: tableName,
TargetSchema: targetSchema,
@@ -95,10 +90,18 @@ func loadSourceQuerySyncContext(config SyncConfig, sourceDB db.Database, targetD
TargetQueryTable: targetQueryTable,
TargetType: targetType,
TargetCols: targetCols,
SourceRows: sourceRows,
SourceRows: make([]map[string]interface{}, 0),
TargetRows: make([]map[string]interface{}, 0),
}
if needSourceRows {
sourceRows, _, err := sourceDB.Query(strings.TrimSpace(config.SourceQuery))
if err != nil {
return sourceQuerySyncContext{}, fmt.Errorf("执行源查询失败: %w", err)
}
ctx.SourceRows = sourceRows
}
if requirePK {
pkColumn, err := resolveSinglePKColumn(targetCols)
if err != nil {
@@ -226,7 +229,40 @@ func (s *SyncEngine) analyzeSourceQuery(config SyncConfig) SyncAnalyzeResult {
Table: tableName,
CanSync: false,
}
ctx, err := loadSourceQuerySyncContext(config, sourceDB, targetDB, true, true)
ctx, err := loadSourceQuerySyncContext(config, sourceDB, targetDB, false, false, true)
if err != nil {
summary.Message = err.Error()
result.Tables = append(result.Tables, summary)
result.Message = "已完成 1 个目标表的差异分析"
s.progress(config.JobID, totalTables, totalTables, tableName, "差异分析完成")
return result
}
sourceType := resolveMigrationDBType(config.SourceConfig)
handled, counts, scanErr := scanSourceQueryDiffInPages(sourceDB, targetDB, sourceType, ctx.TargetType, strings.TrimSpace(config.SourceQuery), ctx.TargetQueryTable, ctx.TargetCols, ctx.PKColumn, true, nil)
if handled {
if scanErr != nil {
summary.Message = scanErr.Error()
result.Tables = append(result.Tables, summary)
result.Message = "已完成 1 个目标表的差异分析"
s.progress(config.JobID, totalTables, totalTables, tableName, "差异分析完成")
return result
}
summary.CanSync = true
summary.PKColumn = ctx.PKColumn
summary.Inserts = counts.Inserts
summary.Updates = counts.Updates
summary.Deletes = counts.Deletes
summary.Same = counts.Same
summary.TargetTableExists = true
summary.Message = "SQL 结果集差异分析完成"
result.Tables = append(result.Tables, summary)
result.Message = "已完成 1 个目标表的差异分析"
s.progress(config.JobID, totalTables, totalTables, tableName, "差异分析完成")
return result
}
ctx, err = loadSourceQuerySyncContext(config, sourceDB, targetDB, true, true, true)
if err != nil {
summary.Message = err.Error()
result.Tables = append(result.Tables, summary)
@@ -270,13 +306,83 @@ func (s *SyncEngine) previewSourceQuery(config SyncConfig, limit int) (TableDiff
}
defer targetDB.Close()
ctx, err := loadSourceQuerySyncContext(config, sourceDB, targetDB, true, true)
ctx, err := loadSourceQuerySyncContext(config, sourceDB, targetDB, false, false, true)
if err != nil {
return TableDiffPreview{}, err
}
sourceType := resolveMigrationDBType(config.SourceConfig)
out := TableDiffPreview{
Table: ctx.TableName,
PKColumn: ctx.PKColumn,
ColumnTypes: make(map[string]string, len(ctx.TargetCols)),
SchemaSummary: "SQL 结果集同步预览",
Inserts: make([]PreviewRow, 0, limit),
Updates: make([]PreviewUpdateRow, 0, limit),
Deletes: make([]PreviewRow, 0, limit),
}
for _, col := range ctx.TargetCols {
name := strings.ToLower(strings.TrimSpace(col.Name))
typ := strings.TrimSpace(col.Type)
if name == "" || typ == "" {
continue
}
out.ColumnTypes[name] = typ
}
handled, _, scanErr := scanSourceQueryDiffInPages(sourceDB, targetDB, sourceType, ctx.TargetType, strings.TrimSpace(config.SourceQuery), ctx.TargetQueryTable, ctx.TargetCols, ctx.PKColumn, true, func(page pagedDiffPage) error {
out.TotalInserts += len(page.Inserts)
out.TotalUpdates += len(page.Updates)
out.TotalDeletes += len(page.Deletes)
for _, row := range page.Inserts {
if len(out.Inserts) >= limit {
break
}
pk := strings.TrimSpace(fmt.Sprintf("%v", row[ctx.PKColumn]))
if pk != "" && pk != "<nil>" {
out.Inserts = append(out.Inserts, PreviewRow{PK: pk, Row: row})
}
}
for _, update := range page.Updates {
if len(out.Updates) >= limit {
break
}
pk := strings.TrimSpace(fmt.Sprintf("%v", update.UpdateRow.Keys[ctx.PKColumn]))
if pk == "" || pk == "<nil>" {
continue
}
out.Updates = append(out.Updates, PreviewUpdateRow{
PK: pk,
ChangedColumns: append([]string(nil), update.ChangedColumns...),
Source: update.Source,
Target: update.Target,
})
}
for _, row := range page.Deletes {
if len(out.Deletes) >= limit {
break
}
pk := strings.TrimSpace(fmt.Sprintf("%v", row[ctx.PKColumn]))
if pk != "" && pk != "<nil>" {
out.Deletes = append(out.Deletes, PreviewRow{PK: pk, Row: row})
}
}
return nil
})
if handled {
if scanErr != nil {
return TableDiffPreview{}, scanErr
}
return out, nil
}
ctx, err = loadSourceQuerySyncContext(config, sourceDB, targetDB, true, true, true)
if err != nil {
return TableDiffPreview{}, err
}
inserts, updates, deletes, _ := diffRowsByPK(ctx.PKColumn, ctx.SourceRows, ctx.TargetRows)
out := TableDiffPreview{
out = TableDiffPreview{
Table: ctx.TableName,
PKColumn: ctx.PKColumn,
ColumnTypes: make(map[string]string, len(ctx.TargetCols)),
@@ -389,7 +495,7 @@ func (s *SyncEngine) runSourceQuerySync(config SyncConfig) SyncResult {
needTargetRows := tableMode == "insert_update"
requirePK := tableMode == "insert_update"
ctx, err := loadSourceQuerySyncContext(config, sourceDB, targetDB, needTargetRows, requirePK)
ctx, err := loadSourceQuerySyncContext(config, sourceDB, targetDB, false, false, requirePK)
if err != nil {
return s.fail(config.JobID, totalTables, result, err.Error())
}
@@ -397,6 +503,33 @@ func (s *SyncEngine) runSourceQuerySync(config SyncConfig) SyncResult {
inserts := make([]map[string]interface{}, 0)
updates := make([]connection.UpdateRow, 0)
deletes := make([]map[string]interface{}, 0)
applyTableName := ctx.TargetTable
switch ctx.TargetType {
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver":
applyTableName = ctx.TargetQueryTable
}
if handled, counts, err := s.tryApplySourceQueryInPages(config, &result, tableName, sourceDB, targetDB, ctx, opts, tableMode, applyTableName); handled {
if err != nil {
return s.fail(config.JobID, totalTables, result, "分页同步 SQL 结果集失败: "+err.Error())
}
result.TablesSynced++
result.RowsInserted += counts.Inserts
result.RowsUpdated += counts.Updates
result.RowsDeleted += counts.Deletes
if counts.Inserts == 0 && counts.Updates == 0 && counts.Deletes == 0 {
s.appendLog(config.JobID, &result, "info", "SQL 结果集与目标表一致,无需应用变更")
} else {
s.appendLog(config.JobID, &result, "info", fmt.Sprintf("SQL 结果集分页同步完成:插入=%d 更新=%d 删除=%d", counts.Inserts, counts.Updates, counts.Deletes))
}
s.progress(config.JobID, totalTables, totalTables, tableName, "同步完成")
return result
}
ctx, err = loadSourceQuerySyncContext(config, sourceDB, targetDB, true, needTargetRows, requirePK)
if err != nil {
return s.fail(config.JobID, totalTables, result, err.Error())
}
if tableMode == "insert_update" {
inserts, updates, deletes, _ = diffRowsByPK(ctx.PKColumn, ctx.SourceRows, ctx.TargetRows)
inserts = filterRowsByPKSelection(ctx.PKColumn, inserts, opts.Insert, opts.SelectedInsertPKs)
@@ -431,16 +564,11 @@ func (s *SyncEngine) runSourceQuerySync(config SyncConfig) SyncResult {
return result
}
applyTableName := ctx.TargetTable
switch ctx.TargetType {
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver":
applyTableName = ctx.TargetQueryTable
}
applier, ok := targetDB.(db.BatchApplier)
if !ok {
return s.fail(config.JobID, totalTables, result, "目标驱动不支持应用数据变更 (ApplyChanges)")
}
if err := applier.ApplyChanges(applyTableName, changeSet); err != nil {
if err := s.applyChangesInBatches(config.JobID, &result, applyTableName, applier, changeSet); err != nil {
return s.fail(config.JobID, totalTables, result, "应用 SQL 结果集变更失败: "+err.Error())
}