mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-05-31 12:49:36 +08:00
- 同步分析和预览改为分页扫描差异,避免一次性加载源表和目标表 - 直接导入与源查询同步支持分页读取和分批提交,降低低内存机器 OOM 风险 - 各数据库 ApplyChanges 统一使用参数化批量 INSERT,减少大表同步 SQL 超时 - MySQL 批量写入按行数和参数数量拆分,兼容超宽表场景 - 补充批量插入、分页差异和源查询同步回归测试
492 lines
16 KiB
Go
492 lines
16 KiB
Go
package sync
|
|
|
|
import (
|
|
"GoNavi-Wails/internal/connection"
|
|
"GoNavi-Wails/internal/db"
|
|
"reflect"
|
|
"strings"
|
|
"testing"
|
|
)
|
|
|
|
type fakeQuerySyncTargetDB struct {
|
|
fakeMigrationDB
|
|
appliedTable string
|
|
appliedChanges connection.ChangeSet
|
|
appliedBatches []connection.ChangeSet
|
|
}
|
|
|
|
func (f *fakeQuerySyncTargetDB) ApplyChanges(tableName string, changes connection.ChangeSet) error {
|
|
f.appliedTable = tableName
|
|
f.appliedChanges.Inserts = append(f.appliedChanges.Inserts, changes.Inserts...)
|
|
f.appliedChanges.Updates = append(f.appliedChanges.Updates, changes.Updates...)
|
|
f.appliedChanges.Deletes = append(f.appliedChanges.Deletes, changes.Deletes...)
|
|
f.appliedBatches = append(f.appliedBatches, changes)
|
|
return nil
|
|
}
|
|
|
|
var _ db.BatchApplier = (*fakeQuerySyncTargetDB)(nil)
|
|
|
|
func TestAnalyze_SourceQueryUsesQueryResultAsSourceDataset(t *testing.T) {
|
|
sourceDB := &fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.users": {
|
|
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
|
|
{Name: "name", Type: "varchar(64)", Nullable: "YES"},
|
|
},
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT * FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ ORDER BY `id` ASC LIMIT 1000 OFFSET 0": {
|
|
{"id": 1, "name": "Alice New"},
|
|
{"id": 2, "name": "Bob"},
|
|
},
|
|
"SELECT `id` FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ WHERE `id` IN (1, 3)": {
|
|
{"id": 1},
|
|
},
|
|
},
|
|
}
|
|
targetDB := &fakeQuerySyncTargetDB{
|
|
fakeMigrationDB: fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.users": {
|
|
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
|
|
{Name: "name", Type: "varchar(64)", Nullable: "YES"},
|
|
},
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT `id`, `name` FROM `app`.`users` WHERE `id` IN (1, 2)": {
|
|
{"id": 1, "name": "Alice Old"},
|
|
},
|
|
"SELECT `id` FROM `app`.`users` ORDER BY `id` ASC LIMIT 1000": {
|
|
{"id": 1},
|
|
{"id": 3, "name": "Carol"},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
oldFactory := newSyncDatabase
|
|
defer func() { newSyncDatabase = oldFactory }()
|
|
callCount := 0
|
|
newSyncDatabase = func(dbType string) (db.Database, error) {
|
|
callCount++
|
|
if callCount == 1 {
|
|
return sourceDB, nil
|
|
}
|
|
return targetDB, nil
|
|
}
|
|
|
|
engine := NewSyncEngine(Reporter{})
|
|
result := engine.Analyze(SyncConfig{
|
|
SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
Tables: []string{"users"},
|
|
Mode: "insert_update",
|
|
SourceQuery: "SELECT id, name FROM active_users",
|
|
})
|
|
|
|
if !result.Success {
|
|
t.Fatalf("Analyze 返回失败: %+v", result)
|
|
}
|
|
if len(result.Tables) != 1 {
|
|
t.Fatalf("expected one table summary, got %d", len(result.Tables))
|
|
}
|
|
|
|
summary := result.Tables[0]
|
|
if summary.PKColumn != "id" {
|
|
t.Fatalf("expected PKColumn=id, got %q", summary.PKColumn)
|
|
}
|
|
if !summary.CanSync {
|
|
t.Fatalf("expected summary can sync, got %+v", summary)
|
|
}
|
|
if summary.Inserts != 1 || summary.Updates != 1 || summary.Deletes != 1 {
|
|
t.Fatalf("unexpected diff summary: %+v", summary)
|
|
}
|
|
}
|
|
|
|
func TestRunSync_SourceQueryAppliesDiffAgainstTargetTable(t *testing.T) {
|
|
sourceDB := &fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.users": {
|
|
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
|
|
{Name: "name", Type: "varchar(64)", Nullable: "YES"},
|
|
},
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT * FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ ORDER BY `id` ASC LIMIT 1000 OFFSET 0": {
|
|
{"id": 1, "name": "Alice New"},
|
|
{"id": 2, "name": "Bob"},
|
|
},
|
|
"SELECT `id` FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ WHERE `id` IN (1, 3)": {
|
|
{"id": 1},
|
|
},
|
|
},
|
|
}
|
|
targetDB := &fakeQuerySyncTargetDB{
|
|
fakeMigrationDB: fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.users": {
|
|
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
|
|
{Name: "name", Type: "varchar(64)", Nullable: "YES"},
|
|
},
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT `id`, `name` FROM `app`.`users` WHERE `id` IN (1, 2)": {
|
|
{"id": 1, "name": "Alice Old"},
|
|
},
|
|
"SELECT `id` FROM `app`.`users` ORDER BY `id` ASC LIMIT 1000": {
|
|
{"id": 1},
|
|
{"id": 3, "name": "Carol"},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
oldFactory := newSyncDatabase
|
|
defer func() { newSyncDatabase = oldFactory }()
|
|
callCount := 0
|
|
newSyncDatabase = func(dbType string) (db.Database, error) {
|
|
callCount++
|
|
if callCount == 1 {
|
|
return sourceDB, nil
|
|
}
|
|
return targetDB, nil
|
|
}
|
|
|
|
engine := NewSyncEngine(Reporter{})
|
|
result := engine.RunSync(SyncConfig{
|
|
SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
Tables: []string{"users"},
|
|
Mode: "insert_update",
|
|
SourceQuery: "SELECT id, name FROM active_users",
|
|
TableOptions: map[string]TableOptions{
|
|
"users": {Insert: true, Update: true, Delete: true},
|
|
},
|
|
})
|
|
|
|
if !result.Success {
|
|
t.Fatalf("RunSync 返回失败: %+v", result)
|
|
}
|
|
if result.TablesSynced != 1 || result.RowsInserted != 1 || result.RowsUpdated != 1 || result.RowsDeleted != 1 {
|
|
t.Fatalf("unexpected sync result: %+v", result)
|
|
}
|
|
if targetDB.appliedTable != "users" {
|
|
t.Fatalf("expected applied table users, got %q", targetDB.appliedTable)
|
|
}
|
|
|
|
wantInserts := []map[string]interface{}{{"id": 2, "name": "Bob"}}
|
|
if !reflect.DeepEqual(targetDB.appliedChanges.Inserts, wantInserts) {
|
|
t.Fatalf("unexpected inserts: got=%v want=%v", targetDB.appliedChanges.Inserts, wantInserts)
|
|
}
|
|
|
|
wantUpdates := []connection.UpdateRow{{
|
|
Keys: map[string]interface{}{"id": 1},
|
|
Values: map[string]interface{}{"name": "Alice New"},
|
|
}}
|
|
if !reflect.DeepEqual(targetDB.appliedChanges.Updates, wantUpdates) {
|
|
t.Fatalf("unexpected updates: got=%v want=%v", targetDB.appliedChanges.Updates, wantUpdates)
|
|
}
|
|
|
|
wantDeletes := []map[string]interface{}{{"id": 3}}
|
|
if !reflect.DeepEqual(targetDB.appliedChanges.Deletes, wantDeletes) {
|
|
t.Fatalf("unexpected deletes: got=%v want=%v", targetDB.appliedChanges.Deletes, wantDeletes)
|
|
}
|
|
}
|
|
|
|
func TestRunSync_SourceQueryInsertUpdateUsesPagedQueries(t *testing.T) {
|
|
columns := []connection.ColumnDefinition{
|
|
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
|
|
{Name: "name", Type: "varchar(64)", Nullable: "YES"},
|
|
}
|
|
sourceDB := &fakeMigrationDB{
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT * FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ ORDER BY `id` ASC LIMIT 1000 OFFSET 0": {
|
|
{"id": 1, "name": "Alice New"},
|
|
{"id": 2, "name": "Bob"},
|
|
},
|
|
"SELECT `id` FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ WHERE `id` IN (1, 3)": {
|
|
{"id": 1},
|
|
},
|
|
},
|
|
}
|
|
targetDB := &fakeQuerySyncTargetDB{
|
|
fakeMigrationDB: fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.users": columns,
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT `id`, `name` FROM `app`.`users` WHERE `id` IN (1, 2)": {
|
|
{"id": 1, "name": "Alice Old"},
|
|
},
|
|
"SELECT `id` FROM `app`.`users` ORDER BY `id` ASC LIMIT 1000": {
|
|
{"id": 1},
|
|
{"id": 3},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
oldFactory := newSyncDatabase
|
|
defer func() { newSyncDatabase = oldFactory }()
|
|
callCount := 0
|
|
newSyncDatabase = func(dbType string) (db.Database, error) {
|
|
callCount++
|
|
if callCount == 1 {
|
|
return sourceDB, nil
|
|
}
|
|
return targetDB, nil
|
|
}
|
|
|
|
engine := NewSyncEngine(Reporter{})
|
|
result := engine.RunSync(SyncConfig{
|
|
SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
Tables: []string{"users"},
|
|
Mode: "insert_update",
|
|
SourceQuery: "SELECT id, name FROM active_users",
|
|
TableOptions: map[string]TableOptions{
|
|
"users": {Insert: true, Update: true, Delete: true},
|
|
},
|
|
})
|
|
|
|
if !result.Success {
|
|
t.Fatalf("RunSync 返回失败: %+v", result)
|
|
}
|
|
if result.RowsInserted != 1 || result.RowsUpdated != 1 || result.RowsDeleted != 1 {
|
|
t.Fatalf("unexpected sync result: %+v", result)
|
|
}
|
|
for _, query := range sourceDB.queryLog {
|
|
if query == "SELECT id, name FROM active_users" {
|
|
t.Fatalf("SQL 结果集分页同步不应全量执行原始查询,实际查询=%s", query)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestRunSync_BatchesLargeTableChanges(t *testing.T) {
|
|
sourceRows := make([]map[string]interface{}, 2501)
|
|
for i := range sourceRows {
|
|
sourceRows[i] = map[string]interface{}{
|
|
"id": i + 1,
|
|
"name": "event",
|
|
}
|
|
}
|
|
|
|
columns := []connection.ColumnDefinition{
|
|
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
|
|
{Name: "name", Type: "varchar(64)", Nullable: "YES"},
|
|
}
|
|
sourceDB := &fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.events": columns,
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 0": sourceRows[:1000],
|
|
"SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 1000": sourceRows[1000:2000],
|
|
"SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 2000": sourceRows[2000:],
|
|
},
|
|
}
|
|
targetDB := &fakeQuerySyncTargetDB{
|
|
fakeMigrationDB: fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.events": columns,
|
|
},
|
|
},
|
|
}
|
|
|
|
oldFactory := newSyncDatabase
|
|
defer func() { newSyncDatabase = oldFactory }()
|
|
callCount := 0
|
|
newSyncDatabase = func(dbType string) (db.Database, error) {
|
|
callCount++
|
|
if callCount == 1 {
|
|
return sourceDB, nil
|
|
}
|
|
return targetDB, nil
|
|
}
|
|
|
|
engine := NewSyncEngine(Reporter{})
|
|
result := engine.RunSync(SyncConfig{
|
|
SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
Tables: []string{"events"},
|
|
Mode: "insert_only",
|
|
})
|
|
|
|
if !result.Success {
|
|
t.Fatalf("RunSync 返回失败: %+v", result)
|
|
}
|
|
if result.RowsInserted != len(sourceRows) {
|
|
t.Fatalf("RowsInserted=%d, want %d", result.RowsInserted, len(sourceRows))
|
|
}
|
|
for _, query := range sourceDB.queryLog {
|
|
if strings.HasPrefix(query, "SELECT * FROM") {
|
|
t.Fatalf("期望分页流式导入不再全量读取源表,实际查询=%s", query)
|
|
}
|
|
}
|
|
if len(targetDB.appliedBatches) != 3 {
|
|
t.Fatalf("期望大表拆成 3 批提交,实际 %d 批", len(targetDB.appliedBatches))
|
|
}
|
|
wantBatchSizes := []int{1000, 1000, 501}
|
|
for idx, want := range wantBatchSizes {
|
|
if got := len(targetDB.appliedBatches[idx].Inserts); got != want {
|
|
t.Fatalf("batch %d inserts=%d, want %d", idx+1, got, want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestRunSync_DirectImportPagingKeepsSelectedPKFilter(t *testing.T) {
|
|
sourceRows := []map[string]interface{}{
|
|
{"id": 1, "name": "event-1"},
|
|
{"id": 2, "name": "event-2"},
|
|
{"id": 3, "name": "event-3"},
|
|
}
|
|
columns := []connection.ColumnDefinition{
|
|
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
|
|
{Name: "name", Type: "varchar(64)", Nullable: "YES"},
|
|
}
|
|
sourceDB := &fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.events": columns,
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 0": sourceRows,
|
|
},
|
|
}
|
|
targetDB := &fakeQuerySyncTargetDB{
|
|
fakeMigrationDB: fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.events": columns,
|
|
},
|
|
},
|
|
}
|
|
|
|
oldFactory := newSyncDatabase
|
|
defer func() { newSyncDatabase = oldFactory }()
|
|
callCount := 0
|
|
newSyncDatabase = func(dbType string) (db.Database, error) {
|
|
callCount++
|
|
if callCount == 1 {
|
|
return sourceDB, nil
|
|
}
|
|
return targetDB, nil
|
|
}
|
|
|
|
engine := NewSyncEngine(Reporter{})
|
|
result := engine.RunSync(SyncConfig{
|
|
SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
Tables: []string{"events"},
|
|
Mode: "insert_only",
|
|
TableOptions: map[string]TableOptions{
|
|
"events": {
|
|
Insert: true,
|
|
SelectedInsertPKs: []string{"2"},
|
|
},
|
|
},
|
|
})
|
|
|
|
if !result.Success {
|
|
t.Fatalf("RunSync 返回失败: %+v", result)
|
|
}
|
|
if result.RowsInserted != 1 {
|
|
t.Fatalf("RowsInserted=%d, want 1", result.RowsInserted)
|
|
}
|
|
if len(targetDB.appliedBatches) != 1 || len(targetDB.appliedBatches[0].Inserts) != 1 {
|
|
t.Fatalf("expected one selected insert batch, got %+v", targetDB.appliedBatches)
|
|
}
|
|
if got := targetDB.appliedBatches[0].Inserts[0]["id"]; got != 2 {
|
|
t.Fatalf("selected insert id=%v, want 2", got)
|
|
}
|
|
}
|
|
|
|
func TestRunSync_InsertUpdateDiffUsesPagedPKLookups(t *testing.T) {
|
|
sourceRows := []map[string]interface{}{
|
|
{"id": 1, "name": "one-new"},
|
|
{"id": 2, "name": "two"},
|
|
{"id": 3, "name": "three"},
|
|
}
|
|
columns := []connection.ColumnDefinition{
|
|
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
|
|
{Name: "name", Type: "varchar(64)", Nullable: "YES"},
|
|
}
|
|
sourceDB := &fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.events": columns,
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 0": sourceRows,
|
|
"SELECT `id` FROM `app`.`events` WHERE `id` IN (1, 4)": {
|
|
{"id": 1},
|
|
},
|
|
},
|
|
}
|
|
targetDB := &fakeQuerySyncTargetDB{
|
|
fakeMigrationDB: fakeMigrationDB{
|
|
columns: map[string][]connection.ColumnDefinition{
|
|
"app.events": columns,
|
|
},
|
|
queryData: map[string][]map[string]interface{}{
|
|
"SELECT `id`, `name` FROM `app`.`events` WHERE `id` IN (1, 2, 3)": {
|
|
{"id": 1, "name": "one-old"},
|
|
{"id": 2, "name": "two"},
|
|
},
|
|
"SELECT `id` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000": {
|
|
{"id": 1},
|
|
{"id": 4},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
oldFactory := newSyncDatabase
|
|
defer func() { newSyncDatabase = oldFactory }()
|
|
callCount := 0
|
|
newSyncDatabase = func(dbType string) (db.Database, error) {
|
|
callCount++
|
|
if callCount == 1 {
|
|
return sourceDB, nil
|
|
}
|
|
return targetDB, nil
|
|
}
|
|
|
|
engine := NewSyncEngine(Reporter{})
|
|
result := engine.RunSync(SyncConfig{
|
|
SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"},
|
|
Tables: []string{"events"},
|
|
Mode: "insert_update",
|
|
TableOptions: map[string]TableOptions{
|
|
"events": {Insert: true, Update: true, Delete: true},
|
|
},
|
|
})
|
|
|
|
if !result.Success {
|
|
t.Fatalf("RunSync 返回失败: %+v", result)
|
|
}
|
|
if result.RowsInserted != 1 || result.RowsUpdated != 1 || result.RowsDeleted != 1 {
|
|
t.Fatalf("unexpected sync result: %+v", result)
|
|
}
|
|
if len(targetDB.appliedBatches) != 2 {
|
|
t.Fatalf("expected source diff batch and delete batch, got %d", len(targetDB.appliedBatches))
|
|
}
|
|
firstBatch := targetDB.appliedBatches[0]
|
|
if !reflect.DeepEqual(firstBatch.Inserts, []map[string]interface{}{{"id": 3, "name": "three"}}) {
|
|
t.Fatalf("unexpected inserts: %+v", firstBatch.Inserts)
|
|
}
|
|
wantUpdates := []connection.UpdateRow{{
|
|
Keys: map[string]interface{}{"id": 1},
|
|
Values: map[string]interface{}{"name": "one-new"},
|
|
}}
|
|
if !reflect.DeepEqual(firstBatch.Updates, wantUpdates) {
|
|
t.Fatalf("unexpected updates: %+v", firstBatch.Updates)
|
|
}
|
|
if !reflect.DeepEqual(targetDB.appliedBatches[1].Deletes, []map[string]interface{}{{"id": 4}}) {
|
|
t.Fatalf("unexpected deletes: %+v", targetDB.appliedBatches[1].Deletes)
|
|
}
|
|
for _, query := range append(sourceDB.queryLog, targetDB.queryLog...) {
|
|
if strings.HasPrefix(query, "SELECT * FROM") {
|
|
t.Fatalf("分页差异同步不应全量读取表,实际查询=%s", query)
|
|
}
|
|
}
|
|
}
|