🐛 fix(data-sync): 完善多种目标库的 schema 同步链路

- 扩展数据同步目标端 schema 选择与元数据加载,覆盖 SQL Server、IRIS、DuckDB 等独立 schema 场景
- 修正同步链路中的目标表 schema 归一化与 query/apply 表名解析,避免落到错误模式
- 补充前后端回归测试与多语言文案,覆盖 schema 选择、别名识别和结果预览路径

Fixes #571
This commit is contained in:
Syngnat
2026-06-21 22:46:57 +08:00
parent 36233ba9aa
commit 4999fd544d
26 changed files with 4142 additions and 2182 deletions

View File

@@ -1,67 +1,102 @@
import { readFileSync } from 'node:fs';
import { describe, expect, it } from 'vitest';
import { readFileSync } from "node:fs";
import { describe, expect, it } from "vitest";
const source = readFileSync(new URL('./DataSyncModal.tsx', import.meta.url), 'utf8');
const source = readFileSync(
new URL("./DataSyncModal.tsx", import.meta.url),
"utf8",
);
describe('DataSyncModal i18n', () => {
it('localizes fixed workflow chrome while preserving raw table and SQL details as params', () => {
describe("DataSyncModal i18n", () => {
it("localizes fixed workflow chrome while preserving raw table and SQL details as params", () => {
[
'差异分析完成',
'确认全量覆盖',
'全量覆盖会清空目标表数据后再插入,请确认已备份目标库。',
'跨库迁移工作台',
'数据同步工作台',
'请选择需要同步的表:',
'差异预览:',
'SQL 已复制',
'复制失败,请手动复制',
'复制 SQL',
"差异分析完成",
"确认全量覆盖",
"全量覆盖会清空目标表数据后再插入,请确认已备份目标库。",
"跨库迁移工作台",
"数据同步工作台",
"请选择需要同步的表:",
"差异预览:",
"SQL 已复制",
"复制失败,请手动复制",
"复制 SQL",
].forEach((snippet) => {
expect(source).not.toContain(snippet);
});
expect(source).toContain('useOptionalI18n()');
expect(source).toContain("tr('data_sync.message.analysis_complete')");
expect(source).toContain("tr('data_sync.modal.full_overwrite_title')");
expect(source).toContain("tr('data_sync.preview.title', { table: previewTable })");
expect(source).toContain("tr('data_sync.preview.message.sql_copied')");
expect(source).toContain("tr('data_sync.preview.message.copy_failed')");
expect(source).toContain("useOptionalI18n()");
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.message\.analysis_complete\1\s*\)/,
);
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.modal\.full_overwrite_title\1\s*\)/,
);
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.preview\.title\1,\s*\{\s*table:\s*previewTable\s*\}\s*\)/,
);
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.preview\.message\.sql_copied\1\s*\)/,
);
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.preview\.message\.copy_failed\1\s*\)/,
);
});
it('wraps backend details in localized shells without translating raw detail values', () => {
expect(source).not.toContain('message.error(res.message || "差异分析失败")');
expect(source).not.toContain('message.error("差异分析失败: " + (e?.message || ""))');
expect(source).not.toContain('message.error(res.message || "加载差异预览失败")');
it("wraps backend details in localized shells without translating raw detail values", () => {
expect(source).not.toContain(
'message.error(res.message || "差异分析失败")',
);
expect(source).not.toContain(
'message.error("差异分析失败: " + (e?.message || ""))',
);
expect(source).not.toContain(
'message.error(res.message || "加载差异预览失败")',
);
expect(source).toContain("tr('data_sync.message.analysis_failed_detail', { detail:");
expect(source).toContain("tr('data_sync.message.preview_load_failed_detail', { detail:");
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.message\.analysis_failed_detail\1,\s*\{\s*detail:/,
);
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.message\.preview_load_failed_detail\1,\s*\{\s*detail:/,
);
expect(source).toMatch(/tr\(\s*(['"])data_sync\.field\.schema\1\s*\)/);
expect(source).toMatch(
/t\(\s*(['"])data_sync\.message\.fetch_target_schemas_failed_detail\1,\s*\{\s*detail:/,
);
});
it('localizes compare-entry only chrome without translating SQL preview or raw table names', () => {
it("localizes compare-entry only chrome without translating SQL preview or raw table names", () => {
[
'当前入口只做差异分析和预览',
'按表比对',
'按 SQL 结果集比对',
'当前为“表结构比对”入口',
'当前为“数据比对”入口',
'生成目标表缺失字段的兼容变更 SQL',
'正在比对',
'比对完成',
'比对失败',
'当前阶段:',
'成功比对 ',
'分析日志',
'返回比对',
'行选择只影响 SQL 预览范围',
'SQL 预览会按当前勾选的插入/更新/删除',
'SQL 预览展示结构差异建议语句',
"当前入口只做差异分析和预览",
"按表比对",
"按 SQL 结果集比对",
"当前为“表结构比对”入口",
"当前为“数据比对”入口",
"生成目标表缺失字段的兼容变更 SQL",
"正在比对",
"比对完成",
"比对失败",
"当前阶段:",
"成功比对 ",
"分析日志",
"返回比对",
"行选择只影响 SQL 预览范围",
"SQL 预览会按当前勾选的插入/更新/删除",
"SQL 预览展示结构差异建议语句",
].forEach((snippet) => {
expect(source).not.toContain(snippet);
});
expect(source).toContain("tr('data_sync.compare_entry.workflow_help')");
expect(source).toContain("tr('data_sync.compare_entry.option.source_dataset.table')");
expect(source).toContain("tr('data_sync.compare_entry.result.running_description'");
expect(source).toContain("tr('data_sync.compare_entry.preview.sql.data_help')");
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.compare_entry\.workflow_help\1\s*\)/,
);
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.compare_entry\.option\.source_dataset\.table\1/,
);
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.compare_entry\.result\.running_description\1/,
);
expect(source).toMatch(
/tr\(\s*(['"])data_sync\.compare_entry\.preview\.sql\.data_help\1/,
);
});
});

File diff suppressed because it is too large Load Diff

View File

@@ -42,6 +42,7 @@ describe('buildDataSyncRequest', () => {
targetConfig: { type: 'mysql' },
sourceDatabase: ' app ',
targetDatabase: ' warehouse ',
targetSchema: ' reporting ',
selectedTables: ['users'],
sourceDatasetMode: 'query',
sourceQuery: ' SELECT id, name FROM active_users ',
@@ -65,6 +66,7 @@ describe('buildDataSyncRequest', () => {
createIndexes: false,
sourceDatabase: 'app',
targetDatabase: 'warehouse',
targetSchema: 'reporting',
jobId: 'job-1',
});
});

View File

@@ -8,6 +8,7 @@ type BuildDataSyncRequestParams = {
targetConfig: any;
sourceDatabase?: string;
targetDatabase?: string;
targetSchema?: string;
selectedTables: string[];
sourceDatasetMode: SourceDatasetMode;
sourceQuery: string;
@@ -64,6 +65,7 @@ export const buildDataSyncRequest = ({
targetConfig,
sourceDatabase,
targetDatabase,
targetSchema,
selectedTables,
sourceDatasetMode,
sourceQuery,
@@ -83,6 +85,7 @@ export const buildDataSyncRequest = ({
targetConfig,
sourceDatabase: String(sourceDatabase || '').trim(),
targetDatabase: String(targetDatabase || '').trim(),
targetSchema: String(targetSchema || '').trim(),
tables: selectedTables,
sourceQuery: isQueryMode ? String(sourceQuery || '').trim() : undefined,
content: isQueryMode ? 'data' : syncContent,

View File

@@ -0,0 +1,28 @@
import { describe, expect, it, vi } from "vitest";
vi.mock("../../../wailsjs/go/app/App", () => ({
DBQuery: vi.fn(),
}));
import { buildSchemasMetadataQuerySpecs } from "./sidebarMetadataLoaders";
describe("buildSchemasMetadataQuerySpecs", () => {
it("returns schema queries for independent-schema targets", () => {
expect(
buildSchemasMetadataQuerySpecs("sqlserver", "app_db")[0]?.sql,
).toContain(".sys.schemas");
expect(
buildSchemasMetadataQuerySpecs("iris", "USER")[0]?.sql.toLowerCase(),
).toContain("information_schema.schemata");
expect(
buildSchemasMetadataQuerySpecs(
"duckdb",
"analytics",
)[0]?.sql.toLowerCase(),
).toContain("information_schema.schemata");
});
it("keeps unsupported dialects empty", () => {
expect(buildSchemasMetadataQuerySpecs("mysql", "app")).toEqual([]);
});
});

File diff suppressed because it is too large Load Diff

View File

@@ -1,57 +1,71 @@
import { describe, expect, it } from 'vitest';
import { describe, expect, it } from "vitest";
import {
isPostgresSchemaDialect,
normalizeDriverType,
resolveConnectionDriverType,
resolveSavedConnectionDriverType,
} from './connectionDriverType';
supportsIndependentSchemaSelection,
} from "./connectionDriverType";
describe('connectionDriverType', () => {
it('normalizes built-in driver aliases shared by connection modal and sidebar', () => {
expect(normalizeDriverType('postgresql')).toBe('postgres');
expect(normalizeDriverType('pgx')).toBe('postgres');
expect(normalizeDriverType('elastic')).toBe('elasticsearch');
expect(normalizeDriverType('chromadb')).toBe('chroma');
expect(normalizeDriverType('chroma-db')).toBe('chroma');
expect(normalizeDriverType('qdrantdb')).toBe('qdrant');
expect(normalizeDriverType('qdrant-db')).toBe('qdrant');
expect(normalizeDriverType('apache-iotdb')).toBe('iotdb');
expect(normalizeDriverType('apache_iotdb')).toBe('iotdb');
expect(normalizeDriverType('apache-kafka')).toBe('kafka');
expect(normalizeDriverType('apache_kafka')).toBe('kafka');
expect(normalizeDriverType('doris')).toBe('diros');
expect(normalizeDriverType('open-gauss')).toBe('opengauss');
expect(normalizeDriverType('gauss-db')).toBe('gaussdb');
expect(normalizeDriverType('greatdb')).toBe('goldendb');
expect(normalizeDriverType('gdb')).toBe('goldendb');
expect(normalizeDriverType('InterSystemsIRIS')).toBe('iris');
describe("connectionDriverType", () => {
it("normalizes built-in driver aliases shared by connection modal and sidebar", () => {
expect(normalizeDriverType("postgresql")).toBe("postgres");
expect(normalizeDriverType("pgx")).toBe("postgres");
expect(normalizeDriverType("elastic")).toBe("elasticsearch");
expect(normalizeDriverType("chromadb")).toBe("chroma");
expect(normalizeDriverType("chroma-db")).toBe("chroma");
expect(normalizeDriverType("qdrantdb")).toBe("qdrant");
expect(normalizeDriverType("qdrant-db")).toBe("qdrant");
expect(normalizeDriverType("apache-iotdb")).toBe("iotdb");
expect(normalizeDriverType("apache_iotdb")).toBe("iotdb");
expect(normalizeDriverType("apache-kafka")).toBe("kafka");
expect(normalizeDriverType("apache_kafka")).toBe("kafka");
expect(normalizeDriverType("doris")).toBe("diros");
expect(normalizeDriverType("open-gauss")).toBe("opengauss");
expect(normalizeDriverType("gauss-db")).toBe("gaussdb");
expect(normalizeDriverType("greatdb")).toBe("goldendb");
expect(normalizeDriverType("gdb")).toBe("goldendb");
expect(normalizeDriverType("InterSystemsIRIS")).toBe("iris");
});
it('resolves custom connection driver types from the selected driver field', () => {
expect(resolveConnectionDriverType('mysql', 'postgresql')).toBe('mysql');
expect(resolveConnectionDriverType('custom', 'postgresql')).toBe('postgres');
expect(resolveConnectionDriverType('custom', 'open_gauss')).toBe('opengauss');
expect(resolveConnectionDriverType('custom', 'gauss_db')).toBe('gaussdb');
expect(resolveConnectionDriverType('custom', 'goldendb')).toBe('goldendb');
expect(resolveConnectionDriverType('custom', '')).toBe('');
it("resolves custom connection driver types from the selected driver field", () => {
expect(resolveConnectionDriverType("mysql", "postgresql")).toBe("mysql");
expect(resolveConnectionDriverType("custom", "postgresql")).toBe(
"postgres",
);
expect(resolveConnectionDriverType("custom", "open_gauss")).toBe(
"opengauss",
);
expect(resolveConnectionDriverType("custom", "gauss_db")).toBe("gaussdb");
expect(resolveConnectionDriverType("custom", "goldendb")).toBe("goldendb");
expect(resolveConnectionDriverType("custom", "")).toBe("");
});
it('resolves saved custom connections using the same driver aliases', () => {
it("resolves saved custom connections using the same driver aliases", () => {
const conn = {
config: {
type: 'custom',
driver: 'pg',
type: "custom",
driver: "pg",
},
} as any;
expect(resolveSavedConnectionDriverType(conn)).toBe('postgres');
expect(resolveSavedConnectionDriverType(conn)).toBe("postgres");
});
it('detects postgres-compatible schema dialects', () => {
expect(isPostgresSchemaDialect('postgres')).toBe(true);
expect(isPostgresSchemaDialect('kingbase')).toBe(true);
expect(isPostgresSchemaDialect('open-gauss')).toBe(true);
expect(isPostgresSchemaDialect('gauss-db')).toBe(true);
expect(isPostgresSchemaDialect('mysql')).toBe(false);
it("detects postgres-compatible schema dialects", () => {
expect(isPostgresSchemaDialect("postgres")).toBe(true);
expect(isPostgresSchemaDialect("kingbase")).toBe(true);
expect(isPostgresSchemaDialect("open-gauss")).toBe(true);
expect(isPostgresSchemaDialect("gauss-db")).toBe(true);
expect(isPostgresSchemaDialect("mysql")).toBe(false);
});
it("detects dialects that need independent schema selection", () => {
expect(supportsIndependentSchemaSelection("postgres")).toBe(true);
expect(supportsIndependentSchemaSelection("sqlserver")).toBe(true);
expect(supportsIndependentSchemaSelection("iris")).toBe(true);
expect(supportsIndependentSchemaSelection("duckdb")).toBe(true);
expect(supportsIndependentSchemaSelection("oracle")).toBe(false);
expect(supportsIndependentSchemaSelection("mysql")).toBe(false);
});
});

View File

@@ -1,4 +1,4 @@
import type { SavedConnection } from '../types';
import type { SavedConnection } from "../types";
export type DriverStatusSnapshot = {
type: string;
@@ -12,53 +12,102 @@ export type DriverStatusSnapshot = {
};
export const normalizeDriverType = (value: string): string => {
const normalized = String(value || '').trim().toLowerCase();
if (normalized === 'postgresql' || normalized === 'pg' || normalized === 'pq' || normalized === 'pgx') return 'postgres';
if (normalized === 'elastic') return 'elasticsearch';
if (normalized === 'chromadb' || normalized === 'chroma-db') return 'chroma';
if (normalized === 'qdrantdb' || normalized === 'qdrant-db') return 'qdrant';
if (normalized === 'rocket-mq' || normalized === 'rocket_mq' || normalized === 'apache-rocketmq' || normalized === 'apache_rocketmq' || normalized === 'rmq') return 'rocketmq';
if (normalized === 'apache-iotdb' || normalized === 'apache_iotdb') return 'iotdb';
if (normalized === 'mqtts') return 'mqtt';
if (normalized === 'apache-kafka' || normalized === 'apache_kafka') return 'kafka';
if (normalized === 'rabbit-mq' || normalized === 'rabbit_mq') return 'rabbitmq';
if (normalized === 'doris') return 'diros';
const normalized = String(value || "")
.trim()
.toLowerCase();
if (
normalized === 'open_gauss' ||
normalized === 'open-gauss' ||
normalized === 'opengauss'
) return 'opengauss';
normalized === "postgresql" ||
normalized === "pg" ||
normalized === "pq" ||
normalized === "pgx"
)
return "postgres";
if (normalized === "elastic") return "elasticsearch";
if (normalized === "chromadb" || normalized === "chroma-db") return "chroma";
if (normalized === "qdrantdb" || normalized === "qdrant-db") return "qdrant";
if (
normalized === 'gaussdb' ||
normalized === 'gauss_db' ||
normalized === 'gauss-db'
) return 'gaussdb';
normalized === "rocket-mq" ||
normalized === "rocket_mq" ||
normalized === "apache-rocketmq" ||
normalized === "apache_rocketmq" ||
normalized === "rmq"
)
return "rocketmq";
if (normalized === "apache-iotdb" || normalized === "apache_iotdb")
return "iotdb";
if (normalized === "mqtts") return "mqtt";
if (normalized === "apache-kafka" || normalized === "apache_kafka")
return "kafka";
if (normalized === "rabbit-mq" || normalized === "rabbit_mq")
return "rabbitmq";
if (normalized === "doris") return "diros";
if (
normalized === 'goldendb' ||
normalized === 'greatdb' ||
normalized === 'gdb'
) return 'goldendb';
normalized === "open_gauss" ||
normalized === "open-gauss" ||
normalized === "opengauss"
)
return "opengauss";
if (
normalized === 'intersystems' ||
normalized === 'intersystemsiris' ||
normalized === 'inter-systems' ||
normalized === 'inter-systems-iris'
) return 'iris';
normalized === "gaussdb" ||
normalized === "gauss_db" ||
normalized === "gauss-db"
)
return "gaussdb";
if (
normalized === "goldendb" ||
normalized === "greatdb" ||
normalized === "gdb"
)
return "goldendb";
if (
normalized === "intersystems" ||
normalized === "intersystemsiris" ||
normalized === "inter-systems" ||
normalized === "inter-systems-iris"
)
return "iris";
return normalized;
};
export const resolveConnectionDriverType = (type: string, driver?: string): string => {
export const resolveConnectionDriverType = (
type: string,
driver?: string,
): string => {
const normalizedType = normalizeDriverType(type);
if (normalizedType !== 'custom') {
if (normalizedType !== "custom") {
return normalizedType;
}
return normalizeDriverType(driver || '');
return normalizeDriverType(driver || "");
};
export const resolveSavedConnectionDriverType = (conn: SavedConnection | undefined): string => {
return resolveConnectionDriverType(conn?.config?.type || '', conn?.config?.driver || '');
export const resolveSavedConnectionDriverType = (
conn: SavedConnection | undefined,
): string => {
return resolveConnectionDriverType(
conn?.config?.type || "",
conn?.config?.driver || "",
);
};
export const isPostgresSchemaDialect = (dialect: string): boolean => (
['postgres', 'kingbase', 'highgo', 'vastbase', 'opengauss', 'gaussdb'].includes(normalizeDriverType(dialect))
);
export const isPostgresSchemaDialect = (dialect: string): boolean =>
[
"postgres",
"kingbase",
"highgo",
"vastbase",
"opengauss",
"gaussdb",
].includes(normalizeDriverType(dialect));
export const supportsIndependentSchemaSelection = (dialect: string): boolean =>
[
"postgres",
"kingbase",
"highgo",
"vastbase",
"opengauss",
"gaussdb",
"sqlserver",
"iris",
"duckdb",
].includes(normalizeDriverType(dialect));

View File

@@ -10,8 +10,8 @@ import (
func buildMySQLToClickHousePlan(config SyncConfig, tableName string, sourceDB db.Database, targetDB db.Database) (SchemaMigrationPlan, []connection.ColumnDefinition, []connection.ColumnDefinition, error) {
plan := SchemaMigrationPlan{}
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(config.SourceConfig.Type, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(config.TargetConfig.Type, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(config.SourceConfig.Type, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(config.TargetConfig.Type, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -70,8 +70,8 @@ func buildPGLikeToClickHousePlan(config SyncConfig, tableName string, sourceDB d
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -128,8 +128,8 @@ func buildPGLikeToClickHousePlan(config SyncConfig, tableName string, sourceDB d
func buildClickHouseToMySQLPlan(config SyncConfig, tableName string, sourceDB db.Database, targetDB db.Database) (SchemaMigrationPlan, []connection.ColumnDefinition, []connection.ColumnDefinition, error) {
plan := SchemaMigrationPlan{}
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(config.SourceConfig.Type, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(config.TargetConfig.Type, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(config.SourceConfig.Type, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(config.TargetConfig.Type, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -187,8 +187,8 @@ func buildClickHouseToPGLikePlan(config SyncConfig, tableName string, sourceDB d
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -247,8 +247,8 @@ func buildClickHouseToClickHousePlan(config SyncConfig, tableName string, source
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"

View File

@@ -434,8 +434,8 @@ func (mongoToRelationalPlanner) BuildPlan(ctx MigrationBuildContext) (SchemaMigr
return SchemaMigrationPlan{}, nil, nil, err
}
plan := SchemaMigrationPlan{}
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(ctx.Config), ctx.TableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(ctx.Config), ctx.TableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(ctx.Config, ctx.TableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(ctx.Config, ctx.TableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, ctx.TableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, ctx.TableName)
plan.PlannedAction = "当前库对已进入迁移内核规划阶段,等待 schema 推断与目标方言生成器落地"

View File

@@ -30,8 +30,8 @@ func buildTabularToMongoPlan(config SyncConfig, tableName string, sourceDB db.Da
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标集合导入"
@@ -91,8 +91,8 @@ func buildMongoToMongoPlan(config SyncConfig, tableName string, sourceDB db.Data
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标集合导入"
@@ -154,8 +154,8 @@ func buildMongoToMongoPlan(config SyncConfig, tableName string, sourceDB db.Data
func buildMongoToMySQLPlan(config SyncConfig, tableName string, sourceDB db.Database, targetDB db.Database) (SchemaMigrationPlan, []connection.ColumnDefinition, []connection.ColumnDefinition, error) {
plan := SchemaMigrationPlan{}
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(config.SourceConfig.Type, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(config.TargetConfig.Type, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(config.SourceConfig.Type, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(config.TargetConfig.Type, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -497,8 +497,8 @@ func moveStringToFront(items []string, target string) []string {
func buildMongoToPGLikePlan(config SyncConfig, tableName string, sourceDB db.Database, targetDB db.Database) (SchemaMigrationPlan, []connection.ColumnDefinition, []connection.ColumnDefinition, error) {
plan := SchemaMigrationPlan{}
targetType := strings.ToLower(strings.TrimSpace(config.TargetConfig.Type))
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(config.SourceConfig.Type, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(config.TargetConfig.Type, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(config.SourceConfig.Type, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(config.TargetConfig.Type, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"

View File

@@ -12,8 +12,8 @@ func buildTDengineToMySQLPlan(config SyncConfig, tableName string, sourceDB db.D
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -67,8 +67,8 @@ func buildTDengineToPGLikePlan(config SyncConfig, tableName string, sourceDB db.
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"

View File

@@ -100,8 +100,8 @@ func buildSourceToTDenginePlan(config SyncConfig, tableName string, sourceDB db.
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"

View File

@@ -12,12 +12,16 @@ func normalizeMigrationDBType(dbType string) string {
return "diros"
case "postgresql":
return "postgres"
case "mssql", "sql_server", "sql-server":
return "sqlserver"
case "kingbase8", "kingbasees", "kingbasev8":
return "kingbase"
case "opengauss", "open_gauss", "open-gauss":
return "opengauss"
case "gaussdb", "gauss_db", "gauss-db":
return "gaussdb"
case "intersystems", "intersystemsiris", "inter-systems", "inter-systems-iris":
return "iris"
case "dm", "dm8":
return "dameng"
case "sqlite3":
@@ -37,10 +41,14 @@ func resolveMigrationDBType(config connection.ConnectionConfig) string {
switch driver {
case "postgresql", "postgres", "pg", "pq", "pgx":
return "postgres"
case "mssql", "sqlserver", "sql_server", "sql-server":
return "sqlserver"
case "opengauss", "open_gauss", "open-gauss":
return "opengauss"
case "gaussdb", "gauss_db", "gauss-db":
return "gaussdb"
case "intersystems", "intersystemsiris", "inter-systems", "inter-systems-iris", "iris":
return "iris"
case "dm", "dameng", "dm8":
return "dameng"
case "sqlite3", "sqlite":

View File

@@ -98,8 +98,8 @@ func buildSchemaMigrationPlanLegacy(config SyncConfig, tableName string, sourceD
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -595,8 +595,8 @@ func buildMySQLToMySQLPlan(config SyncConfig, tableName string, sourceDB db.Data
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -834,8 +834,8 @@ func buildPGLikeToPGLikePlan(config SyncConfig, tableName string, sourceDB db.Da
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -1080,8 +1080,8 @@ func buildPGLikeToMySQLPlan(config SyncConfig, tableName string, sourceDB db.Dat
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"
@@ -1371,8 +1371,8 @@ func buildMySQLToPGLikePlan(config SyncConfig, tableName string, sourceDB db.Dat
plan := SchemaMigrationPlan{}
sourceType := resolveMigrationDBType(config.SourceConfig)
targetType := resolveMigrationDBType(config.TargetConfig)
plan.SourceSchema, plan.SourceTable = normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
plan.TargetSchema, plan.TargetTable = normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
plan.SourceSchema, plan.SourceTable = normalizeSyncSourceSchemaAndTable(config, tableName)
plan.TargetSchema, plan.TargetTable = normalizeSyncTargetSchemaAndTable(config, tableName)
plan.SourceQueryTable = qualifiedNameForQuery(sourceType, plan.SourceSchema, plan.SourceTable, tableName)
plan.TargetQueryTable = qualifiedNameForQuery(targetType, plan.TargetSchema, plan.TargetTable, tableName)
plan.PlannedAction = "使用已有目标表导入"

View File

@@ -14,8 +14,8 @@ func (s *SyncEngine) syncTableSchema(config SyncConfig, res *SyncResult, sourceD
}
sourceType := resolveMigrationDBType(config.SourceConfig)
sourceSchema, sourceTable := normalizeSchemaAndTable(sourceType, selectedSyncSourceDatabase(config), tableName)
targetSchema, targetTable := normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
sourceSchema, sourceTable := normalizeSyncSourceSchemaAndTable(config, tableName)
targetSchema, targetTable := normalizeSyncTargetSchemaAndTable(config, tableName)
targetQueryTable := qualifiedNameForQuery(targetType, targetSchema, targetTable, tableName)
// 1) 获取源表字段

View File

@@ -47,7 +47,7 @@ func validateSourceQuerySyncConfig(config SyncConfig) (string, error) {
func resolveTargetQueryTable(config SyncConfig, tableName string) (string, string, string, string) {
targetType := resolveMigrationDBType(config.TargetConfig)
targetSchema, targetTable := normalizeSchemaAndTable(targetType, selectedSyncTargetDatabase(config), tableName)
targetSchema, targetTable := normalizeSyncTargetSchemaAndTable(config, tableName)
targetQueryTable := qualifiedNameForQuery(targetType, targetSchema, targetTable, tableName)
return targetType, targetSchema, targetTable, targetQueryTable
}
@@ -504,8 +504,7 @@ func (s *SyncEngine) runSourceQuerySync(config SyncConfig) SyncResult {
updates := make([]connection.UpdateRow, 0)
deletes := make([]map[string]interface{}, 0)
applyTableName := ctx.TargetTable
switch ctx.TargetType {
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb", "sqlserver":
if shouldUseQualifiedSyncApplyTable(config.TargetConfig) {
applyTableName = ctx.TargetQueryTable
}

View File

@@ -140,6 +140,26 @@ func quoteQualifiedIdentByType(dbType string, ident string) string {
return strings.Join(quotedParts, ".")
}
func qualifySchemaTableName(schema string, table string) string {
rawSchema := strings.TrimSpace(schema)
rawTable := strings.TrimSpace(table)
if rawTable == "" || rawSchema == "" || strings.Contains(rawTable, ".") {
return rawTable
}
return rawSchema + "." + rawTable
}
func lastSyncTableIdentifier(tableName string) string {
parts := strings.Split(strings.TrimSpace(tableName), ".")
for i := len(parts) - 1; i >= 0; i-- {
part := strings.TrimSpace(parts[i])
if part != "" {
return part
}
}
return strings.TrimSpace(tableName)
}
func normalizeSchemaAndTable(dbType string, dbName string, tableName string) (string, string) {
rawTable := strings.TrimSpace(tableName)
rawDB := strings.TrimSpace(dbName)
@@ -148,6 +168,10 @@ func normalizeSchemaAndTable(dbType string, dbName string, tableName string) (st
}
normalizedType := normalizeMigrationDBType(dbType)
switch normalizedType {
case "sqlserver", "duckdb":
return rawDB, rawTable
}
if normalizedType == "kingbase" {
schema, table := db.SplitKingbaseQualifiedName(rawTable)
if schema != "" && table != "" {
@@ -176,14 +200,86 @@ func normalizeSchemaAndTable(dbType string, dbName string, tableName string) (st
}
}
func normalizeSchemaAndTableWithDefaultSchema(dbType string, dbName string, defaultSchema string, tableName string) (string, string) {
rawDefaultSchema := strings.TrimSpace(defaultSchema)
if rawDefaultSchema == "" {
return normalizeSchemaAndTable(dbType, dbName, tableName)
}
rawTable := strings.TrimSpace(tableName)
if rawTable == "" {
switch normalizeMigrationDBType(dbType) {
case "sqlserver", "duckdb":
return strings.TrimSpace(dbName), rawTable
default:
return rawDefaultSchema, rawTable
}
}
switch normalizeMigrationDBType(dbType) {
case "sqlserver", "duckdb":
return strings.TrimSpace(dbName), qualifySchemaTableName(rawDefaultSchema, rawTable)
}
if parts := strings.SplitN(rawTable, ".", 2); len(parts) == 2 {
schema := strings.TrimSpace(parts[0])
table := strings.TrimSpace(parts[1])
if schema != "" && table != "" {
return schema, table
}
}
switch normalizeMigrationDBType(dbType) {
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb", "iris":
return rawDefaultSchema, rawTable
default:
return normalizeSchemaAndTable(dbType, dbName, tableName)
}
}
func normalizeSyncSourceSchemaAndTable(config SyncConfig, tableName string) (string, string) {
return normalizeSchemaAndTableWithDefaultSchema(config.SourceConfig.Type, selectedSyncSourceDatabase(config), "", tableName)
}
func normalizeSyncTargetSchemaAndTable(config SyncConfig, tableName string) (string, string) {
targetSchema := strings.TrimSpace(config.TargetSchema)
if targetSchema == "" || strings.TrimSpace(config.SourceQuery) != "" {
return normalizeSchemaAndTableWithDefaultSchema(config.TargetConfig.Type, selectedSyncTargetDatabase(config), targetSchema, tableName)
}
return normalizeSchemaAndTableWithDefaultSchema(
config.TargetConfig.Type,
selectedSyncTargetDatabase(config),
targetSchema,
lastSyncTableIdentifier(tableName),
)
}
func shouldUseQualifiedSyncApplyTable(config connection.ConnectionConfig) bool {
switch resolveMigrationDBType(config) {
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb", "sqlserver", "oracle", "dameng", "iris", "duckdb":
return true
case "oceanbase":
return isOceanBaseOracleSyncConnection(config)
default:
return false
}
}
func qualifiedNameForQuery(dbType string, schema string, table string, original string) string {
raw := strings.TrimSpace(original)
rawTable := strings.TrimSpace(table)
if raw == "" {
return raw
}
if strings.Contains(raw, ".") {
return raw
}
if rawTable == "" {
return raw
}
if strings.Contains(rawTable, ".") {
return rawTable
}
switch normalizeMigrationDBType(dbType) {
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb":
@@ -191,26 +287,24 @@ func qualifiedNameForQuery(dbType string, schema string, table string, original
if s == "" {
s = "public"
}
if table == "" {
return raw
}
return s + "." + table
return s + "." + rawTable
case "duckdb":
return "main." + rawTable
case "sqlserver":
return "dbo." + rawTable
case "oracle", "dameng", "iris":
s := strings.TrimSpace(schema)
if s == "" {
s = "main"
return rawTable
}
if table == "" {
return raw
}
return s + "." + table
return s + "." + rawTable
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "clickhouse", "tdengine":
s := strings.TrimSpace(schema)
if s == "" || table == "" {
return table
if s == "" {
return rawTable
}
return s + "." + table
return s + "." + rawTable
default:
return table
return rawTable
}
}

View File

@@ -52,6 +52,82 @@ func TestNormalizeSchemaAndTable_KingbaseNormalizesEscapedQualifiedName(t *testi
}
}
func TestNormalizeSyncTargetSchemaAndTable_UsesExplicitSchemaForPGLikeTargets(t *testing.T) {
t.Parallel()
config := SyncConfig{
TargetConfig: connection.ConnectionConfig{Type: "opengauss", Database: "analytics"},
TargetDatabase: "analytics",
TargetSchema: "reporting",
}
schema, table := normalizeSyncTargetSchemaAndTable(config, "orders")
if schema != "reporting" || table != "orders" {
t.Fatalf("unexpected opengauss target schema/table: %q.%q", schema, table)
}
}
func TestNormalizeSyncTargetSchemaAndTable_UsesTargetSchemaWhenSourceTableIsQualified(t *testing.T) {
t.Parallel()
config := SyncConfig{
TargetConfig: connection.ConnectionConfig{Type: "opengauss", Database: "analytics"},
TargetDatabase: "analytics",
TargetSchema: "reporting",
}
schema, table := normalizeSyncTargetSchemaAndTable(config, "archive.orders")
if schema != "reporting" || table != "orders" {
t.Fatalf("unexpected qualified source table override result: %q.%q", schema, table)
}
}
func TestNormalizeSyncTargetSchemaAndTable_KeepsQualifiedTargetTableInSourceQueryMode(t *testing.T) {
t.Parallel()
config := SyncConfig{
TargetConfig: connection.ConnectionConfig{Type: "opengauss", Database: "analytics"},
TargetDatabase: "analytics",
TargetSchema: "reporting",
SourceQuery: "select * from archive.orders",
}
schema, table := normalizeSyncTargetSchemaAndTable(config, "archive.orders")
if schema != "archive" || table != "orders" {
t.Fatalf("unexpected qualified target schema/table: %q.%q", schema, table)
}
}
func TestNormalizeSyncTargetSchemaAndTable_UsesQualifiedTableForSQLServerExplicitSchema(t *testing.T) {
t.Parallel()
config := SyncConfig{
TargetConfig: connection.ConnectionConfig{Type: "sqlserver", Database: "warehouse"},
TargetDatabase: "warehouse",
TargetSchema: "sales",
}
schema, table := normalizeSyncTargetSchemaAndTable(config, "archive.orders")
if schema != "warehouse" || table != "sales.orders" {
t.Fatalf("unexpected sqlserver target schema/table: %q.%q", schema, table)
}
}
func TestNormalizeSyncTargetSchemaAndTable_UsesQualifiedTableForDuckDBExplicitSchema(t *testing.T) {
t.Parallel()
config := SyncConfig{
TargetConfig: connection.ConnectionConfig{Type: "duckdb", Database: "analytics"},
TargetDatabase: "analytics",
TargetSchema: "reporting",
}
schema, table := normalizeSyncTargetSchemaAndTable(config, "orders")
if schema != "analytics" || table != "reporting.orders" {
t.Fatalf("unexpected duckdb target schema/table: %q.%q", schema, table)
}
}
func TestNormalizeSyncConnectionDatabasesKeepsOracleServiceName(t *testing.T) {
t.Parallel()
@@ -105,6 +181,57 @@ func TestNormalizeMigrationDBType_KingbaseAliases(t *testing.T) {
}
}
func TestNormalizeMigrationDBType_SQLServerAndIRISAliases(t *testing.T) {
t.Parallel()
if got := normalizeMigrationDBType("mssql"); got != "sqlserver" {
t.Fatalf("normalizeMigrationDBType(%q)=%q, want sqlserver", "mssql", got)
}
if got := resolveMigrationDBType(connection.ConnectionConfig{Type: "custom", Driver: "inter-systems-iris"}); got != "iris" {
t.Fatalf("resolveMigrationDBType(custom iris)=%q, want iris", got)
}
}
func TestQualifiedNameForQuery_UsesSchemaAwareTargets(t *testing.T) {
t.Parallel()
if got := qualifiedNameForQuery("sqlserver", "warehouse", "orders", "orders"); got != "dbo.orders" {
t.Fatalf("unexpected sqlserver qualified name: %s", got)
}
if got := qualifiedNameForQuery("sqlserver", "warehouse", "sales.orders", "orders"); got != "sales.orders" {
t.Fatalf("unexpected sqlserver explicit qualified name: %s", got)
}
if got := qualifiedNameForQuery("oracle", "APP_SCHEMA", "orders", "orders"); got != "APP_SCHEMA.orders" {
t.Fatalf("unexpected oracle qualified name: %s", got)
}
if got := qualifiedNameForQuery("dameng", "APP_SCHEMA", "orders", "orders"); got != "APP_SCHEMA.orders" {
t.Fatalf("unexpected dameng qualified name: %s", got)
}
if got := qualifiedNameForQuery("iris", "APP_SCHEMA", "orders", "orders"); got != "APP_SCHEMA.orders" {
t.Fatalf("unexpected iris qualified name: %s", got)
}
if got := qualifiedNameForQuery("duckdb", "analytics", "reporting.orders", "orders"); got != "reporting.orders" {
t.Fatalf("unexpected duckdb qualified name: %s", got)
}
}
func TestShouldUseQualifiedSyncApplyTable(t *testing.T) {
t.Parallel()
if !shouldUseQualifiedSyncApplyTable(connection.ConnectionConfig{Type: "oracle"}) {
t.Fatal("oracle should apply against qualified target table")
}
if !shouldUseQualifiedSyncApplyTable(connection.ConnectionConfig{Type: "duckdb"}) {
t.Fatal("duckdb should apply against qualified target table")
}
if !shouldUseQualifiedSyncApplyTable(connection.ConnectionConfig{Type: "oceanbase", OceanBaseProtocol: "oracle"}) {
t.Fatal("oceanbase oracle should apply against qualified target table")
}
if shouldUseQualifiedSyncApplyTable(connection.ConnectionConfig{Type: "mysql"}) {
t.Fatal("mysql should keep raw target table for apply")
}
}
func TestBuildPagedSourceTableQuery_MySQLUsesStablePKPagination(t *testing.T) {
t.Parallel()

View File

@@ -19,6 +19,7 @@ type SyncConfig struct {
TargetConfig connection.ConnectionConfig `json:"targetConfig"`
SourceDatabase string `json:"sourceDatabase,omitempty"`
TargetDatabase string `json:"targetDatabase,omitempty"`
TargetSchema string `json:"targetSchema,omitempty"`
Tables []string `json:"tables"`
SourceQuery string `json:"sourceQuery,omitempty"`
Content string `json:"content,omitempty"` // "data", "schema", "both"
@@ -213,8 +214,7 @@ func (s *SyncEngine) RunSync(config SyncConfig) SyncResult {
targetTable := plan.TargetTable
sourceQueryTable, targetQueryTable := plan.SourceQueryTable, plan.TargetQueryTable
applyTableName := targetTable
switch targetType {
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb", "sqlserver":
if shouldUseQualifiedSyncApplyTable(config.TargetConfig) {
applyTableName = targetQueryTable
}

View File

@@ -4847,5 +4847,8 @@
"sidebar.v2_database_menu.export_backup_section": "Export und Sicherung",
"sidebar.v2_database_menu.export_all_table_schema_sql": "Schemas aller Tabellen exportieren · SQL",
"sidebar.v2_database_menu.backup_all_tables_sql": "Alle Tabellen sichern · Schema + Daten SQL",
"ai_settings.message.load_provider_failed": "Anbieterkonfiguration konnte nicht gelesen werden"
"ai_settings.message.load_provider_failed": "Anbieterkonfiguration konnte nicht gelesen werden",
"data_sync.field.schema": "Schema",
"data_sync.message.fetch_target_schemas_failed_detail": "Zielschemas konnten nicht geladen werden: {{detail}}",
"data_sync.message.select_target_schema": "Zielschema auswählen"
}

View File

@@ -4847,5 +4847,8 @@
"app.theme.data_table.font_size": "Data Table Font Size",
"app.theme.data_table.sidebar_tree_font_size": "Sidebar Schema Tree Font Size",
"app.theme.data_table.follow_global": "Follow Global",
"ai_settings.message.load_provider_failed": "Failed to read provider configuration"
"ai_settings.message.load_provider_failed": "Failed to read provider configuration",
"data_sync.field.schema": "Schema",
"data_sync.message.fetch_target_schemas_failed_detail": "Failed to fetch target schemas: {{detail}}",
"data_sync.message.select_target_schema": "Select target schema"
}

View File

@@ -4847,5 +4847,8 @@
"sidebar.v2_database_menu.export_backup_section": "エクスポートとバックアップ",
"sidebar.v2_database_menu.export_all_table_schema_sql": "すべてのテーブル構造をエクスポート · SQL",
"sidebar.v2_database_menu.backup_all_tables_sql": "すべてのテーブルをバックアップ · スキーマ + データ SQL",
"ai_settings.message.load_provider_failed": "プロバイダー設定の読み込みに失敗しました"
"ai_settings.message.load_provider_failed": "プロバイダー設定の読み込みに失敗しました",
"data_sync.field.schema": "スキーマ",
"data_sync.message.fetch_target_schemas_failed_detail": "ターゲットスキーマの取得に失敗しました: {{detail}}",
"data_sync.message.select_target_schema": "ターゲットスキーマを選択してください"
}

View File

@@ -4847,5 +4847,8 @@
"sidebar.v2_database_menu.export_backup_section": "Экспорт и резервное копирование",
"sidebar.v2_database_menu.export_all_table_schema_sql": "Экспортировать схемы всех таблиц · SQL",
"sidebar.v2_database_menu.backup_all_tables_sql": "Резервная копия всех таблиц · схема + данные SQL",
"ai_settings.message.load_provider_failed": "Не удалось прочитать конфигурацию поставщика"
"ai_settings.message.load_provider_failed": "Не удалось прочитать конфигурацию поставщика",
"data_sync.field.schema": "Схема",
"data_sync.message.fetch_target_schemas_failed_detail": "Не удалось получить целевые схемы: {{detail}}",
"data_sync.message.select_target_schema": "Выберите целевую схему"
}

View File

@@ -4847,5 +4847,8 @@
"app.theme.data_table.font_size": "数据表字体大小",
"app.theme.data_table.sidebar_tree_font_size": "左侧库表字体大小",
"app.theme.data_table.follow_global": "跟随全局",
"ai_settings.message.load_provider_failed": "读取供应商配置失败"
"ai_settings.message.load_provider_failed": "读取供应商配置失败",
"data_sync.field.schema": "模式",
"data_sync.message.fetch_target_schemas_failed_detail": "获取目标模式失败: {{detail}}",
"data_sync.message.select_target_schema": "请选择目标模式"
}

View File

@@ -4847,5 +4847,8 @@
"sidebar.v2_database_menu.export_backup_section": "匯出與備份",
"sidebar.v2_database_menu.export_all_table_schema_sql": "匯出全部資料表結構 · SQL",
"sidebar.v2_database_menu.backup_all_tables_sql": "備份全部資料表 · 結構 + 資料 SQL",
"ai_settings.message.load_provider_failed": "讀取供應商設定失敗"
"ai_settings.message.load_provider_failed": "讀取供應商設定失敗",
"data_sync.field.schema": "模式",
"data_sync.message.fetch_target_schemas_failed_detail": "取得目標模式失敗: {{detail}}",
"data_sync.message.select_target_schema": "請選擇目標模式"
}