package sync import ( "GoNavi-Wails/internal/connection" "GoNavi-Wails/internal/db" "GoNavi-Wails/shared/i18n" "errors" "fmt" "os" "reflect" "strings" "testing" ) type fakeQuerySyncTargetDB struct { fakeMigrationDB appliedTable string appliedChanges connection.ChangeSet appliedBatches []connection.ChangeSet } func (f *fakeQuerySyncTargetDB) ApplyChanges(tableName string, changes connection.ChangeSet) error { f.appliedTable = tableName f.appliedChanges.Inserts = append(f.appliedChanges.Inserts, changes.Inserts...) f.appliedChanges.Updates = append(f.appliedChanges.Updates, changes.Updates...) f.appliedChanges.Deletes = append(f.appliedChanges.Deletes, changes.Deletes...) f.appliedBatches = append(f.appliedBatches, changes) return nil } var _ db.BatchApplier = (*fakeQuerySyncTargetDB)(nil) type errorMigrationDB struct { fakeMigrationDB getColumnsErr error queryErrors map[string]error } type connectErrorMigrationDB struct { fakeMigrationDB connectErr error } type syncDatabaseFactoryStep struct { db db.Database err error } func (f *errorMigrationDB) Query(query string) ([]map[string]interface{}, []string, error) { if err, ok := f.queryErrors[query]; ok { f.queryLog = append(f.queryLog, query) return nil, nil, err } return f.fakeMigrationDB.Query(query) } func (f *errorMigrationDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) { if f.getColumnsErr != nil { return nil, f.getColumnsErr } return f.fakeMigrationDB.GetColumns(dbName, tableName) } func (f *connectErrorMigrationDB) Connect(config connection.ConnectionConfig) error { return f.connectErr } func useSyncDatabaseFactorySequence(t *testing.T, steps ...syncDatabaseFactoryStep) { t.Helper() oldFactory := newSyncDatabase index := 0 newSyncDatabase = func(dbType string) (db.Database, error) { if index >= len(steps) { t.Fatalf("unexpected newSyncDatabase call %d for %s", index+1, dbType) } step := steps[index] index++ return step.db, step.err } t.Cleanup(func() { newSyncDatabase = oldFactory }) } func baseSourceQuerySyncConfig() SyncConfig { return SyncConfig{ SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, SourceQuery: "SELECT id, name FROM active_users", Tables: []string{"users"}, Mode: "insert_update", } } func localizedSyncTestText(t *testing.T, language i18n.Language, key string, params map[string]any) string { t.Helper() localizer, err := i18n.NewLocalizer(language) if err != nil { t.Fatalf("NewLocalizer(%s) error = %v", language, err) } return localizer.T(key, params) } func assertNoLegacySourceQueryChinese(t *testing.T, text string) { t.Helper() legacyFragments := []string{ "源查询 SQL 不能为空", "SQL 结果集同步当前仅支持", "SQL 结果集同步要求且仅允许选择一个目标表", "目标表不能为空", "目标表无主键,不支持基于 SQL 结果集的差异分析", "目标表为复合主键", "获取目标表字段失败", "不存在或未读取到字段定义", "执行源查询失败", "读取目标表失败", "\u521d\u59cb\u5316\u6e90\u6570\u636e\u5e93\u9a71\u52a8\u5931\u8d25", "\u521d\u59cb\u5316\u76ee\u6807\u6570\u636e\u5e93\u9a71\u52a8\u5931\u8d25", "\u6e90\u6570\u636e\u5e93\u8fde\u63a5\u5931\u8d25", "\u76ee\u6807\u6570\u636e\u5e93\u8fde\u63a5\u5931\u8d25", "已完成 1 个目标表的差异分析", "SQL 结果集差异分析完成", "SQL 结果集同步预览", "差异分析开始", "差异分析完成", "开始同步", "同步来源:SQL 结果集 -> 目标表", } for _, fragment := range legacyFragments { if strings.Contains(text, fragment) { t.Fatalf("expected localized message without legacy Chinese fragment %q, got %q", fragment, text) } } } func TestSourceQuerySyncUsesLocalizedBackendTextSourceGuard(t *testing.T) { sourceBytes, err := os.ReadFile("source_query_sync.go") if err != nil { t.Fatalf("read source_query_sync.go: %v", err) } source := string(sourceBytes) legacyMessages := []string{ `fmt.Errorf("源查询 SQL 不能为空")`, `fmt.Errorf("SQL 结果集同步当前仅支持“仅同步数据”")`, `fmt.Errorf("SQL 结果集同步要求且仅允许选择一个目标表")`, `fmt.Errorf("目标表不能为空")`, `fmt.Errorf("目标表无主键,不支持基于 SQL 结果集的差异分析")`, `fmt.Errorf("目标表为复合主键(%s),暂不支持基于 SQL 结果集的差异分析", strings.Join(pkCols, ","))`, `fmt.Errorf("获取目标表字段失败: %w", err)`, `fmt.Errorf("目标表 %s 不存在或未读取到字段定义", tableName)`, `fmt.Errorf("执行源查询失败: %w", err)`, `fmt.Errorf("读取目标表失败: %w", err)`, "\u521d\u59cb\u5316\u6e90\u6570\u636e\u5e93\u9a71\u52a8\u5931\u8d25: ", "\u521d\u59cb\u5316\u76ee\u6807\u6570\u636e\u5e93\u9a71\u52a8\u5931\u8d25: ", "\u6e90\u6570\u636e\u5e93\u8fde\u63a5\u5931\u8d25: ", "\u76ee\u6807\u6570\u636e\u5e93\u8fde\u63a5\u5931\u8d25: ", "已完成 1 个目标表的差异分析", "SQL 结果集差异分析完成", "SQL 结果集同步预览", "差异分析开始", "差异分析完成", "开始同步", "同步来源:SQL 结果集 -> 目标表", } for _, legacy := range legacyMessages { if strings.Contains(source, legacy) { t.Fatalf("source_query_sync.go still contains legacy raw user-visible message %q", legacy) } } requiredKeys := []string{ "data_sync.backend.validation.source_query_required", "data_sync.backend.validation.query_mode_data_only", "data_sync.backend.validation.single_target_table_required", "data_sync.backend.validation.target_table_required", "data_sync.backend.error.target_pk_required_for_query_diff", "data_sync.backend.error.target_composite_pk_query_diff_unsupported", "data_sync.backend.error.load_target_columns_failed", "data_sync.backend.error.target_table_columns_missing", "data_sync.backend.error.execute_source_query_failed", "data_sync.backend.error.read_target_table_failed", "data_sync.backend.error.init_source_driver_failed", "data_sync.backend.error.init_target_driver_failed", "data_sync.backend.error.connect_source_failed", "data_sync.backend.error.connect_target_failed", "data_sync.backend.result.analyzed_target_tables", "data_sync.backend.summary.source_query_diff_completed", "data_sync.plan.source_query_preview", "data_sync.progress.stage.analysis_started", "data_sync.progress.stage.analysis_completed", "data_sync.progress.stage.sync_started", "data_sync.backend.log.source_query_sync_source", } for _, key := range requiredKeys { if !strings.Contains(source, key) { t.Fatalf("source_query_sync.go should reference localized key %q", key) } } } func TestSourceQuerySyncCatalogKeysExist(t *testing.T) { catalogs, err := i18n.LoadCatalogs() if err != nil { t.Fatalf("LoadCatalogs() error = %v", err) } keys := []string{ "data_sync.backend.validation.source_query_required", "data_sync.backend.validation.query_mode_data_only", "data_sync.backend.validation.single_target_table_required", "data_sync.backend.validation.target_table_required", "data_sync.backend.error.target_pk_required_for_query_diff", "data_sync.backend.error.target_composite_pk_query_diff_unsupported", "data_sync.backend.error.load_target_columns_failed", "data_sync.backend.error.target_table_columns_missing", "data_sync.backend.error.execute_source_query_failed", "data_sync.backend.error.read_target_table_failed", "data_sync.backend.error.init_source_driver_failed", "data_sync.backend.error.init_target_driver_failed", "data_sync.backend.error.connect_source_failed", "data_sync.backend.error.connect_target_failed", "data_sync.backend.result.analyzed_target_tables", "data_sync.backend.summary.source_query_diff_completed", "data_sync.plan.source_query_preview", "data_sync.progress.stage.analysis_started", "data_sync.progress.stage.analysis_completed", "data_sync.progress.stage.sync_started", "data_sync.backend.log.source_query_sync_source", } for _, language := range i18n.SupportedLanguages() { catalog := catalogs[language] for _, key := range keys { if strings.TrimSpace(catalog[key]) == "" { t.Fatalf("%s catalog missing source query sync key %q", language, key) } } } } func TestValidateSourceQuerySyncConfigUsesCurrentLanguageForValidationErrors(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) cases := []struct { name string config SyncConfig key string params map[string]any }{ { name: "source query required", config: SyncConfig{ SourceQuery: " ", Tables: []string{"users"}, }, key: "data_sync.backend.validation.source_query_required", }, { name: "query mode data only", config: SyncConfig{ SourceQuery: "SELECT id FROM active_users", Content: "schema", Tables: []string{"users"}, }, key: "data_sync.backend.validation.query_mode_data_only", }, { name: "single target table required", config: SyncConfig{ SourceQuery: "SELECT id FROM active_users", Tables: []string{"users", "orders"}, }, key: "data_sync.backend.validation.single_target_table_required", }, { name: "target table required", config: SyncConfig{ SourceQuery: "SELECT id FROM active_users", Tables: []string{" "}, }, key: "data_sync.backend.validation.target_table_required", }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { _, err := validateSourceQuerySyncConfig(tc.config) if err == nil { t.Fatalf("expected validation error for %s", tc.name) } want := localizedSyncTestText(t, i18n.LanguageEnUS, tc.key, tc.params) if err.Error() != want { t.Fatalf("expected localized validation message %q, got %q", want, err.Error()) } assertNoLegacySourceQueryChinese(t, err.Error()) }) } } func TestResolveSinglePKColumnUsesCurrentLanguageForQueryDiffErrors(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) t.Run("target primary key required", func(t *testing.T) { _, err := resolveSinglePKColumn([]connection.ColumnDefinition{ {Name: "id", Type: "bigint", Nullable: "NO"}, }) if err == nil { t.Fatal("expected missing primary key error") } want := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.error.target_pk_required_for_query_diff", nil) if err.Error() != want { t.Fatalf("expected localized PK required message %q, got %q", want, err.Error()) } assertNoLegacySourceQueryChinese(t, err.Error()) }) t.Run("composite primary key unsupported", func(t *testing.T) { _, err := resolveSinglePKColumn([]connection.ColumnDefinition{ {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "tenant_id", Type: "bigint", Nullable: "NO", Key: "PRI"}, }) if err == nil { t.Fatal("expected composite primary key error") } want := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.error.target_composite_pk_query_diff_unsupported", map[string]any{ "columns": "id,tenant_id", }) if err.Error() != want { t.Fatalf("expected localized composite PK message %q, got %q", want, err.Error()) } assertNoLegacySourceQueryChinese(t, err.Error()) }) } func TestLoadSourceQuerySyncContextUsesCurrentLanguageForStructuredErrors(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) baseConfig := SyncConfig{ SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, SourceQuery: "SELECT id, name FROM active_users", Tables: []string{"users"}, } targetColumns := []connection.ColumnDefinition{ {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, } targetType, _, _, targetQueryTable := resolveTargetQueryTable(baseConfig, "users") targetQuery := fmt.Sprintf("SELECT * FROM %s", quoteQualifiedIdentByType(targetType, targetQueryTable)) t.Run("load target columns failed", func(t *testing.T) { loadErr := errors.New("target columns boom") _, err := loadSourceQuerySyncContext( baseConfig, &errorMigrationDB{}, &errorMigrationDB{getColumnsErr: loadErr}, false, false, false, ) if err == nil { t.Fatal("expected target columns load error") } want := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.error.load_target_columns_failed", map[string]any{ "detail": loadErr.Error(), }) if err.Error() != want { t.Fatalf("expected localized load target columns message %q, got %q", want, err.Error()) } assertNoLegacySourceQueryChinese(t, err.Error()) }) t.Run("target table columns missing", func(t *testing.T) { _, err := loadSourceQuerySyncContext( baseConfig, &errorMigrationDB{}, &errorMigrationDB{}, false, false, false, ) if err == nil { t.Fatal("expected target table columns missing error") } want := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.error.target_table_columns_missing", map[string]any{ "table": "users", }) if err.Error() != want { t.Fatalf("expected localized missing target columns message %q, got %q", want, err.Error()) } assertNoLegacySourceQueryChinese(t, err.Error()) }) t.Run("execute source query failed", func(t *testing.T) { queryErr := errors.New("source query boom") _, err := loadSourceQuerySyncContext( baseConfig, &errorMigrationDB{ queryErrors: map[string]error{ "SELECT id, name FROM active_users": queryErr, }, }, &errorMigrationDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": targetColumns, }, }, }, true, false, false, ) if err == nil { t.Fatal("expected execute source query error") } want := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.error.execute_source_query_failed", map[string]any{ "detail": queryErr.Error(), }) if err.Error() != want { t.Fatalf("expected localized source query execution message %q, got %q", want, err.Error()) } if !errors.Is(err, queryErr) { t.Fatalf("expected wrapped source query error %v, got %v", queryErr, err) } assertNoLegacySourceQueryChinese(t, err.Error()) }) t.Run("read target table failed", func(t *testing.T) { queryErr := errors.New("target query boom") _, err := loadSourceQuerySyncContext( baseConfig, &errorMigrationDB{}, &errorMigrationDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": targetColumns, }, }, queryErrors: map[string]error{ targetQuery: queryErr, }, }, false, true, false, ) if err == nil { t.Fatal("expected read target table error") } want := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.error.read_target_table_failed", map[string]any{ "detail": queryErr.Error(), }) if err.Error() != want { t.Fatalf("expected localized target table read message %q, got %q", want, err.Error()) } if !errors.Is(err, queryErr) { t.Fatalf("expected wrapped target table error %v, got %v", queryErr, err) } assertNoLegacySourceQueryChinese(t, err.Error()) }) } func TestAnalyzeSourceQueryUsesCurrentLanguageForInitAndConnectFailures(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) sourceDriverErr := errors.New("source driver boom") targetDriverErr := errors.New("target driver boom") sourceConnectErr := errors.New("source connect boom") targetConnectErr := errors.New("target connect boom") cases := []struct { name string steps []syncDatabaseFactoryStep key string detail error }{ { name: "init source driver failed", steps: []syncDatabaseFactoryStep{ {err: sourceDriverErr}, }, key: "data_sync.backend.error.init_source_driver_failed", detail: sourceDriverErr, }, { name: "init target driver failed", steps: []syncDatabaseFactoryStep{ {db: &fakeMigrationDB{}}, {err: targetDriverErr}, }, key: "data_sync.backend.error.init_target_driver_failed", detail: targetDriverErr, }, { name: "connect source failed", steps: []syncDatabaseFactoryStep{ {db: &connectErrorMigrationDB{connectErr: sourceConnectErr}}, {db: &fakeMigrationDB{}}, }, key: "data_sync.backend.error.connect_source_failed", detail: sourceConnectErr, }, { name: "connect target failed", steps: []syncDatabaseFactoryStep{ {db: &fakeMigrationDB{}}, {db: &connectErrorMigrationDB{connectErr: targetConnectErr}}, }, key: "data_sync.backend.error.connect_target_failed", detail: targetConnectErr, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { useSyncDatabaseFactorySequence(t, tc.steps...) result := NewSyncEngine(Reporter{}).Analyze(baseSourceQuerySyncConfig()) if result.Success { t.Fatalf("expected Analyze failure for %s, got %+v", tc.name, result) } want := localizedSyncTestText(t, i18n.LanguageEnUS, tc.key, map[string]any{ "detail": tc.detail.Error(), }) if result.Message != want { t.Fatalf("expected localized Analyze failure message %q, got %q", want, result.Message) } assertNoLegacySourceQueryChinese(t, result.Message) }) } } func TestPreviewSourceQueryUsesCurrentLanguageForInitAndConnectFailures(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) sourceDriverErr := errors.New("source driver boom") targetDriverErr := errors.New("target driver boom") sourceConnectErr := errors.New("source connect boom") targetConnectErr := errors.New("target connect boom") cases := []struct { name string steps []syncDatabaseFactoryStep key string detail error }{ { name: "init source driver failed", steps: []syncDatabaseFactoryStep{ {err: sourceDriverErr}, }, key: "data_sync.backend.error.init_source_driver_failed", detail: sourceDriverErr, }, { name: "init target driver failed", steps: []syncDatabaseFactoryStep{ {db: &fakeMigrationDB{}}, {err: targetDriverErr}, }, key: "data_sync.backend.error.init_target_driver_failed", detail: targetDriverErr, }, { name: "connect source failed", steps: []syncDatabaseFactoryStep{ {db: &connectErrorMigrationDB{connectErr: sourceConnectErr}}, {db: &fakeMigrationDB{}}, }, key: "data_sync.backend.error.connect_source_failed", detail: sourceConnectErr, }, { name: "connect target failed", steps: []syncDatabaseFactoryStep{ {db: &fakeMigrationDB{}}, {db: &connectErrorMigrationDB{connectErr: targetConnectErr}}, }, key: "data_sync.backend.error.connect_target_failed", detail: targetConnectErr, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { useSyncDatabaseFactorySequence(t, tc.steps...) _, err := NewSyncEngine(Reporter{}).previewSourceQuery(baseSourceQuerySyncConfig(), 20) if err == nil { t.Fatalf("expected previewSourceQuery failure for %s", tc.name) } want := localizedSyncTestText(t, i18n.LanguageEnUS, tc.key, map[string]any{ "detail": tc.detail.Error(), }) if err.Error() != want { t.Fatalf("expected localized preview failure message %q, got %q", want, err.Error()) } if !errors.Is(err, tc.detail) { t.Fatalf("expected preview error to wrap %v, got %v", tc.detail, err) } assertNoLegacySourceQueryChinese(t, err.Error()) }) } } func TestRunSourceQuerySyncUsesCurrentLanguageForInitAndConnectFailures(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) sourceDriverErr := errors.New("source driver boom") targetDriverErr := errors.New("target driver boom") sourceConnectErr := errors.New("source connect boom") targetConnectErr := errors.New("target connect boom") cases := []struct { name string steps []syncDatabaseFactoryStep key string detail error }{ { name: "init source driver failed", steps: []syncDatabaseFactoryStep{ {err: sourceDriverErr}, }, key: "data_sync.backend.error.init_source_driver_failed", detail: sourceDriverErr, }, { name: "init target driver failed", steps: []syncDatabaseFactoryStep{ {db: &fakeMigrationDB{}}, {err: targetDriverErr}, }, key: "data_sync.backend.error.init_target_driver_failed", detail: targetDriverErr, }, { name: "connect source failed", steps: []syncDatabaseFactoryStep{ {db: &connectErrorMigrationDB{connectErr: sourceConnectErr}}, {db: &fakeMigrationDB{}}, }, key: "data_sync.backend.error.connect_source_failed", detail: sourceConnectErr, }, { name: "connect target failed", steps: []syncDatabaseFactoryStep{ {db: &fakeMigrationDB{}}, {db: &connectErrorMigrationDB{connectErr: targetConnectErr}}, }, key: "data_sync.backend.error.connect_target_failed", detail: targetConnectErr, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { useSyncDatabaseFactorySequence(t, tc.steps...) result := NewSyncEngine(Reporter{}).RunSync(baseSourceQuerySyncConfig()) if result.Success { t.Fatalf("expected RunSync failure for %s, got %+v", tc.name, result) } want := localizedSyncTestText(t, i18n.LanguageEnUS, tc.key, map[string]any{ "detail": tc.detail.Error(), }) if result.Message != want { t.Fatalf("expected localized RunSync failure message %q, got %q", want, result.Message) } assertNoLegacySourceQueryChinese(t, result.Message) }) } } func TestAnalyzeSourceQueryUsesCurrentLanguageForResultSummaryAndProgress(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) sourceDB := &fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": { {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, }, }, queryData: map[string][]map[string]interface{}{ "SELECT * FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ ORDER BY `id` ASC LIMIT 1000 OFFSET 0": { {"id": 1, "name": "Alice New"}, {"id": 2, "name": "Bob"}, }, "SELECT `id` FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ WHERE `id` IN (1, 3)": { {"id": 1}, }, }, } targetDB := &fakeQuerySyncTargetDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": { {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, }, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`users` WHERE `id` IN (1, 2)": { {"id": 1, "name": "Alice Old"}, }, "SELECT `id` FROM `app`.`users` ORDER BY `id` ASC LIMIT 1000": { {"id": 1}, {"id": 3, "name": "Carol"}, }, }, }, } oldFactory := newSyncDatabase defer func() { newSyncDatabase = oldFactory }() callCount := 0 newSyncDatabase = func(dbType string) (db.Database, error) { callCount++ if callCount == 1 { return sourceDB, nil } return targetDB, nil } progressEvents := make([]SyncProgressEvent, 0, 2) engine := NewSyncEngine(Reporter{ OnProgress: func(event SyncProgressEvent) { progressEvents = append(progressEvents, event) }, }) config := baseSourceQuerySyncConfig() config.JobID = "job-source-query-analyze" result := engine.Analyze(config) if !result.Success { t.Fatalf("Analyze returned failure: %+v", result) } if len(result.Tables) != 1 { t.Fatalf("expected one table summary, got %d", len(result.Tables)) } wantResult := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.result.analyzed_target_tables", map[string]any{ "count": 1, }) if result.Message != wantResult { t.Fatalf("expected localized Analyze result message %q, got %q", wantResult, result.Message) } assertNoLegacySourceQueryChinese(t, result.Message) wantSummary := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.summary.source_query_diff_completed", nil) if result.Tables[0].Message != wantSummary { t.Fatalf("expected localized Analyze summary message %q, got %q", wantSummary, result.Tables[0].Message) } assertNoLegacySourceQueryChinese(t, result.Tables[0].Message) if len(progressEvents) < 2 { t.Fatalf("expected at least two progress events, got %d", len(progressEvents)) } wantStarted := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.progress.stage.analysis_started", nil) if progressEvents[0].Stage != wantStarted { t.Fatalf("expected localized analysis start stage %q, got %q", wantStarted, progressEvents[0].Stage) } assertNoLegacySourceQueryChinese(t, progressEvents[0].Stage) wantCompleted := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.progress.stage.analysis_completed", nil) gotCompleted := progressEvents[len(progressEvents)-1].Stage if gotCompleted != wantCompleted { t.Fatalf("expected localized analysis completed stage %q, got %q", wantCompleted, gotCompleted) } assertNoLegacySourceQueryChinese(t, gotCompleted) } func TestPreviewSourceQueryUsesCurrentLanguageForSchemaSummary(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) sourceDB := &fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": { {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, }, }, queryData: map[string][]map[string]interface{}{ "SELECT * FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ ORDER BY `id` ASC LIMIT 1000 OFFSET 0": { {"id": 1, "name": "Alice New"}, {"id": 2, "name": "Bob"}, }, "SELECT `id` FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ WHERE `id` IN (1, 3)": { {"id": 1}, }, }, } targetDB := &fakeQuerySyncTargetDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": { {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, }, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`users` WHERE `id` IN (1, 2)": { {"id": 1, "name": "Alice Old"}, }, "SELECT `id` FROM `app`.`users` ORDER BY `id` ASC LIMIT 1000": { {"id": 1}, {"id": 3, "name": "Carol"}, }, }, }, } oldFactory := newSyncDatabase defer func() { newSyncDatabase = oldFactory }() callCount := 0 newSyncDatabase = func(dbType string) (db.Database, error) { callCount++ if callCount == 1 { return sourceDB, nil } return targetDB, nil } preview, err := NewSyncEngine(Reporter{}).previewSourceQuery(baseSourceQuerySyncConfig(), 20) if err != nil { t.Fatalf("previewSourceQuery returned error: %v", err) } want := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.plan.source_query_preview", nil) if preview.SchemaSummary != want { t.Fatalf("expected localized preview schema summary %q, got %q", want, preview.SchemaSummary) } assertNoLegacySourceQueryChinese(t, preview.SchemaSummary) } func TestRunSourceQuerySyncUsesCurrentLanguageForStartProgressAndSourceLog(t *testing.T) { SetBackendLanguage(i18n.LanguageEnUS) t.Cleanup(func() { SetBackendLanguage(i18n.LanguageZhCN) }) useSyncDatabaseFactorySequence(t, syncDatabaseFactoryStep{err: errors.New("source driver boom")}) progressEvents := make([]SyncProgressEvent, 0, 1) logEvents := make([]SyncLogEvent, 0, 2) engine := NewSyncEngine(Reporter{ OnProgress: func(event SyncProgressEvent) { progressEvents = append(progressEvents, event) }, OnLog: func(event SyncLogEvent) { logEvents = append(logEvents, event) }, }) config := baseSourceQuerySyncConfig() config.JobID = "job-source-query-run" result := engine.RunSync(config) if result.Success { t.Fatalf("expected RunSync failure, got %+v", result) } if len(progressEvents) == 0 { t.Fatalf("expected sync start progress event") } wantStage := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.progress.stage.sync_started", nil) if progressEvents[0].Stage != wantStage { t.Fatalf("expected localized sync start stage %q, got %q", wantStage, progressEvents[0].Stage) } assertNoLegacySourceQueryChinese(t, progressEvents[0].Stage) if len(logEvents) == 0 { t.Fatalf("expected source query sync source log event") } wantLog := localizedSyncTestText(t, i18n.LanguageEnUS, "data_sync.backend.log.source_query_sync_source", map[string]any{ "table": "users", "mode": "insert_update", }) if logEvents[0].Message != wantLog { t.Fatalf("expected localized source query sync log %q, got %q", wantLog, logEvents[0].Message) } assertNoLegacySourceQueryChinese(t, logEvents[0].Message) } func TestAnalyze_SourceQueryUsesQueryResultAsSourceDataset(t *testing.T) { sourceDB := &fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": { {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, }, }, queryData: map[string][]map[string]interface{}{ "SELECT * FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ ORDER BY `id` ASC LIMIT 1000 OFFSET 0": { {"id": 1, "name": "Alice New"}, {"id": 2, "name": "Bob"}, }, "SELECT `id` FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ WHERE `id` IN (1, 3)": { {"id": 1}, }, }, } targetDB := &fakeQuerySyncTargetDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": { {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, }, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`users` WHERE `id` IN (1, 2)": { {"id": 1, "name": "Alice Old"}, }, "SELECT `id` FROM `app`.`users` ORDER BY `id` ASC LIMIT 1000": { {"id": 1}, {"id": 3, "name": "Carol"}, }, }, }, } oldFactory := newSyncDatabase defer func() { newSyncDatabase = oldFactory }() callCount := 0 newSyncDatabase = func(dbType string) (db.Database, error) { callCount++ if callCount == 1 { return sourceDB, nil } return targetDB, nil } engine := NewSyncEngine(Reporter{}) result := engine.Analyze(SyncConfig{ SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, Tables: []string{"users"}, Mode: "insert_update", SourceQuery: "SELECT id, name FROM active_users", }) if !result.Success { t.Fatalf("Analyze 返回失败: %+v", result) } if len(result.Tables) != 1 { t.Fatalf("expected one table summary, got %d", len(result.Tables)) } summary := result.Tables[0] if summary.PKColumn != "id" { t.Fatalf("expected PKColumn=id, got %q", summary.PKColumn) } if !summary.CanSync { t.Fatalf("expected summary can sync, got %+v", summary) } if summary.Inserts != 1 || summary.Updates != 1 || summary.Deletes != 1 { t.Fatalf("unexpected diff summary: %+v", summary) } } func TestRunSync_SourceQueryAppliesDiffAgainstTargetTable(t *testing.T) { sourceDB := &fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": { {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, }, }, queryData: map[string][]map[string]interface{}{ "SELECT * FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ ORDER BY `id` ASC LIMIT 1000 OFFSET 0": { {"id": 1, "name": "Alice New"}, {"id": 2, "name": "Bob"}, }, "SELECT `id` FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ WHERE `id` IN (1, 3)": { {"id": 1}, }, }, } targetDB := &fakeQuerySyncTargetDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": { {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, }, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`users` WHERE `id` IN (1, 2)": { {"id": 1, "name": "Alice Old"}, }, "SELECT `id` FROM `app`.`users` ORDER BY `id` ASC LIMIT 1000": { {"id": 1}, {"id": 3, "name": "Carol"}, }, }, }, } oldFactory := newSyncDatabase defer func() { newSyncDatabase = oldFactory }() callCount := 0 newSyncDatabase = func(dbType string) (db.Database, error) { callCount++ if callCount == 1 { return sourceDB, nil } return targetDB, nil } engine := NewSyncEngine(Reporter{}) result := engine.RunSync(SyncConfig{ SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, Tables: []string{"users"}, Mode: "insert_update", SourceQuery: "SELECT id, name FROM active_users", TableOptions: map[string]TableOptions{ "users": {Insert: true, Update: true, Delete: true}, }, }) if !result.Success { t.Fatalf("RunSync 返回失败: %+v", result) } if result.TablesSynced != 1 || result.RowsInserted != 1 || result.RowsUpdated != 1 || result.RowsDeleted != 1 { t.Fatalf("unexpected sync result: %+v", result) } if targetDB.appliedTable != "users" { t.Fatalf("expected applied table users, got %q", targetDB.appliedTable) } wantInserts := []map[string]interface{}{{"id": 2, "name": "Bob"}} if !reflect.DeepEqual(targetDB.appliedChanges.Inserts, wantInserts) { t.Fatalf("unexpected inserts: got=%v want=%v", targetDB.appliedChanges.Inserts, wantInserts) } wantUpdates := []connection.UpdateRow{{ Keys: map[string]interface{}{"id": 1}, Values: map[string]interface{}{"name": "Alice New"}, }} if !reflect.DeepEqual(targetDB.appliedChanges.Updates, wantUpdates) { t.Fatalf("unexpected updates: got=%v want=%v", targetDB.appliedChanges.Updates, wantUpdates) } wantDeletes := []map[string]interface{}{{"id": 3}} if !reflect.DeepEqual(targetDB.appliedChanges.Deletes, wantDeletes) { t.Fatalf("unexpected deletes: got=%v want=%v", targetDB.appliedChanges.Deletes, wantDeletes) } } func TestRunSync_SourceQueryInsertUpdateUsesPagedQueries(t *testing.T) { columns := []connection.ColumnDefinition{ {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, } sourceDB := &fakeMigrationDB{ queryData: map[string][]map[string]interface{}{ "SELECT * FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ ORDER BY `id` ASC LIMIT 1000 OFFSET 0": { {"id": 1, "name": "Alice New"}, {"id": 2, "name": "Bob"}, }, "SELECT `id` FROM (SELECT id, name FROM active_users) AS __gonavi_source_query__ WHERE `id` IN (1, 3)": { {"id": 1}, }, }, } targetDB := &fakeQuerySyncTargetDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.users": columns, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`users` WHERE `id` IN (1, 2)": { {"id": 1, "name": "Alice Old"}, }, "SELECT `id` FROM `app`.`users` ORDER BY `id` ASC LIMIT 1000": { {"id": 1}, {"id": 3}, }, }, }, } oldFactory := newSyncDatabase defer func() { newSyncDatabase = oldFactory }() callCount := 0 newSyncDatabase = func(dbType string) (db.Database, error) { callCount++ if callCount == 1 { return sourceDB, nil } return targetDB, nil } engine := NewSyncEngine(Reporter{}) result := engine.RunSync(SyncConfig{ SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, Tables: []string{"users"}, Mode: "insert_update", SourceQuery: "SELECT id, name FROM active_users", TableOptions: map[string]TableOptions{ "users": {Insert: true, Update: true, Delete: true}, }, }) if !result.Success { t.Fatalf("RunSync 返回失败: %+v", result) } if result.RowsInserted != 1 || result.RowsUpdated != 1 || result.RowsDeleted != 1 { t.Fatalf("unexpected sync result: %+v", result) } for _, query := range sourceDB.queryLog { if query == "SELECT id, name FROM active_users" { t.Fatalf("SQL 结果集分页同步不应全量执行原始查询,实际查询=%s", query) } } } func TestRunSync_BatchesLargeTableChanges(t *testing.T) { sourceRows := make([]map[string]interface{}, 2501) for i := range sourceRows { sourceRows[i] = map[string]interface{}{ "id": i + 1, "name": "event", } } columns := []connection.ColumnDefinition{ {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, } sourceDB := &fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.events": columns, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 0": sourceRows[:1000], "SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 1000": sourceRows[1000:2000], "SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 2000": sourceRows[2000:], }, } targetDB := &fakeQuerySyncTargetDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.events": columns, }, }, } oldFactory := newSyncDatabase defer func() { newSyncDatabase = oldFactory }() callCount := 0 newSyncDatabase = func(dbType string) (db.Database, error) { callCount++ if callCount == 1 { return sourceDB, nil } return targetDB, nil } engine := NewSyncEngine(Reporter{}) result := engine.RunSync(SyncConfig{ SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, Tables: []string{"events"}, Mode: "insert_only", }) if !result.Success { t.Fatalf("RunSync 返回失败: %+v", result) } if result.RowsInserted != len(sourceRows) { t.Fatalf("RowsInserted=%d, want %d", result.RowsInserted, len(sourceRows)) } for _, query := range sourceDB.queryLog { if strings.HasPrefix(query, "SELECT * FROM") { t.Fatalf("期望分页流式导入不再全量读取源表,实际查询=%s", query) } } if len(targetDB.appliedBatches) != 3 { t.Fatalf("期望大表拆成 3 批提交,实际 %d 批", len(targetDB.appliedBatches)) } wantBatchSizes := []int{1000, 1000, 501} for idx, want := range wantBatchSizes { if got := len(targetDB.appliedBatches[idx].Inserts); got != want { t.Fatalf("batch %d inserts=%d, want %d", idx+1, got, want) } } } func TestRunSync_DirectImportPagingKeepsSelectedPKFilter(t *testing.T) { sourceRows := []map[string]interface{}{ {"id": 1, "name": "event-1"}, {"id": 2, "name": "event-2"}, {"id": 3, "name": "event-3"}, } columns := []connection.ColumnDefinition{ {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, } sourceDB := &fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.events": columns, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 0": sourceRows, }, } targetDB := &fakeQuerySyncTargetDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.events": columns, }, }, } oldFactory := newSyncDatabase defer func() { newSyncDatabase = oldFactory }() callCount := 0 newSyncDatabase = func(dbType string) (db.Database, error) { callCount++ if callCount == 1 { return sourceDB, nil } return targetDB, nil } engine := NewSyncEngine(Reporter{}) result := engine.RunSync(SyncConfig{ SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, Tables: []string{"events"}, Mode: "insert_only", TableOptions: map[string]TableOptions{ "events": { Insert: true, SelectedInsertPKs: []string{"2"}, }, }, }) if !result.Success { t.Fatalf("RunSync 返回失败: %+v", result) } if result.RowsInserted != 1 { t.Fatalf("RowsInserted=%d, want 1", result.RowsInserted) } if len(targetDB.appliedBatches) != 1 || len(targetDB.appliedBatches[0].Inserts) != 1 { t.Fatalf("expected one selected insert batch, got %+v", targetDB.appliedBatches) } if got := targetDB.appliedBatches[0].Inserts[0]["id"]; got != 2 { t.Fatalf("selected insert id=%v, want 2", got) } } func TestRunSync_InsertUpdateDiffUsesPagedPKLookups(t *testing.T) { sourceRows := []map[string]interface{}{ {"id": 1, "name": "one-new"}, {"id": 2, "name": "two"}, {"id": 3, "name": "three"}, } columns := []connection.ColumnDefinition{ {Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"}, {Name: "name", Type: "varchar(64)", Nullable: "YES"}, } sourceDB := &fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.events": columns, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000 OFFSET 0": sourceRows, "SELECT `id` FROM `app`.`events` WHERE `id` IN (1, 4)": { {"id": 1}, }, }, } targetDB := &fakeQuerySyncTargetDB{ fakeMigrationDB: fakeMigrationDB{ columns: map[string][]connection.ColumnDefinition{ "app.events": columns, }, queryData: map[string][]map[string]interface{}{ "SELECT `id`, `name` FROM `app`.`events` WHERE `id` IN (1, 2, 3)": { {"id": 1, "name": "one-old"}, {"id": 2, "name": "two"}, }, "SELECT `id` FROM `app`.`events` ORDER BY `id` ASC LIMIT 1000": { {"id": 1}, {"id": 4}, }, }, }, } oldFactory := newSyncDatabase defer func() { newSyncDatabase = oldFactory }() callCount := 0 newSyncDatabase = func(dbType string) (db.Database, error) { callCount++ if callCount == 1 { return sourceDB, nil } return targetDB, nil } engine := NewSyncEngine(Reporter{}) result := engine.RunSync(SyncConfig{ SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, TargetConfig: connection.ConnectionConfig{Type: "mysql", Database: "app"}, Tables: []string{"events"}, Mode: "insert_update", TableOptions: map[string]TableOptions{ "events": {Insert: true, Update: true, Delete: true}, }, }) if !result.Success { t.Fatalf("RunSync 返回失败: %+v", result) } if result.RowsInserted != 1 || result.RowsUpdated != 1 || result.RowsDeleted != 1 { t.Fatalf("unexpected sync result: %+v", result) } if len(targetDB.appliedBatches) != 2 { t.Fatalf("expected source diff batch and delete batch, got %d", len(targetDB.appliedBatches)) } firstBatch := targetDB.appliedBatches[0] if !reflect.DeepEqual(firstBatch.Inserts, []map[string]interface{}{{"id": 3, "name": "three"}}) { t.Fatalf("unexpected inserts: %+v", firstBatch.Inserts) } wantUpdates := []connection.UpdateRow{{ Keys: map[string]interface{}{"id": 1}, Values: map[string]interface{}{"name": "one-new"}, }} if !reflect.DeepEqual(firstBatch.Updates, wantUpdates) { t.Fatalf("unexpected updates: %+v", firstBatch.Updates) } if !reflect.DeepEqual(targetDB.appliedBatches[1].Deletes, []map[string]interface{}{{"id": 4}}) { t.Fatalf("unexpected deletes: %+v", targetDB.appliedBatches[1].Deletes) } for _, query := range append(sourceDB.queryLog, targetDB.queryLog...) { if strings.HasPrefix(query, "SELECT * FROM") { t.Fatalf("分页差异同步不应全量读取表,实际查询=%s", query) } } }