Files
MyGoNavi/internal/sync/schema_migration_test.go
Syngnat 0daf702d25 feat(data-sync): 扩展跨库迁移链路并优化数据同步交互
- 统一同库同步与跨库迁移入口,补充模式区分与风险提示
- 扩展 ClickHouse 与 PG-like 双向迁移,并新增 PG-like、ClickHouse、TDengine 到 MongoDB 的迁移路由
- 完善 TDengine 目标端建表规划、回归测试与需求追踪文档
- refs #51
2026-03-09 17:22:26 +08:00

958 lines
36 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"GoNavi-Wails/internal/connection"
"context"
"reflect"
"strings"
"testing"
)
// fakeMigrationDB is an in-memory test double handed to the migration plan
// builders in this file. Column and index metadata are served from maps keyed
// by "<db>.<table>", canned query results are keyed by the exact query text,
// and every other operation is a no-op that reports success.
type fakeMigrationDB struct {
	columns   map[string][]connection.ColumnDefinition
	indexes   map[string][]connection.IndexDefinition
	queryData map[string][]map[string]interface{}
	queryCols map[string][]string
}

// Connect ignores the configuration and always succeeds.
func (f *fakeMigrationDB) Connect(config connection.ConnectionConfig) error {
	return nil
}

// Close is a no-op.
func (f *fakeMigrationDB) Close() error {
	return nil
}

// Ping is a no-op.
func (f *fakeMigrationDB) Ping() error {
	return nil
}

// Query returns the rows and column names registered for the exact query
// text. An unregistered query yields nil rows, nil columns and no error.
func (f *fakeMigrationDB) Query(query string) ([]map[string]interface{}, []string, error) {
	rows, found := f.queryData[query]
	if !found {
		return nil, nil, nil
	}
	return rows, f.queryCols[query], nil
}

// Exec discards the statement and reports zero affected rows.
func (f *fakeMigrationDB) Exec(query string) (int64, error) {
	return 0, nil
}

// GetDatabases reports no databases.
func (f *fakeMigrationDB) GetDatabases() ([]string, error) {
	return nil, nil
}

// GetTables reports no tables.
func (f *fakeMigrationDB) GetTables(dbName string) ([]string, error) {
	return nil, nil
}

// GetCreateStatement reports an empty statement.
func (f *fakeMigrationDB) GetCreateStatement(dbName, tableName string) (string, error) {
	return "", nil
}

// GetColumns returns the columns registered under "<dbName>.<tableName>".
// An unknown table yields an empty, non-nil slice.
func (f *fakeMigrationDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) {
	defs, found := f.columns[dbName+"."+tableName]
	if !found {
		return []connection.ColumnDefinition{}, nil
	}
	return defs, nil
}

// GetAllColumns reports no columns.
func (f *fakeMigrationDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) {
	return nil, nil
}

// GetIndexes returns the indexes registered under "<dbName>.<tableName>".
// An unknown table yields a nil slice.
func (f *fakeMigrationDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) {
	defs, found := f.indexes[dbName+"."+tableName]
	if !found {
		return nil, nil
	}
	return defs, nil
}

// GetForeignKeys reports no foreign keys.
func (f *fakeMigrationDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) {
	return nil, nil
}

// GetTriggers reports no triggers.
func (f *fakeMigrationDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) {
	return nil, nil
}

// QueryContext ignores ctx and behaves exactly like Query.
func (f *fakeMigrationDB) QueryContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error) {
	return f.Query(query)
}

// ExecContext ignores ctx and behaves exactly like Exec.
func (f *fakeMigrationDB) ExecContext(ctx context.Context, query string) (int64, error) {
	return f.Exec(query)
}
// TestBuildMySQLToKingbaseColumnDefinition_AutoIncrementAndBoolean covers two
// column conversions. An unsigned auto-increment integer must be rendered as a
// NOT NULL bigint identity column, and a tinyint(1) with default "1" must be
// rendered as a boolean defaulting to TRUE. Neither may produce warnings.
func TestBuildMySQLToKingbaseColumnDefinition_AutoIncrementAndBoolean(t *testing.T) {
	t.Parallel()

	idDef, idWarnings := buildMySQLToKingbaseColumnDefinition(connection.ColumnDefinition{
		Name:     "id",
		Type:     "int unsigned",
		Nullable: "NO",
		Extra:    "auto_increment",
	})
	for _, fragment := range []string{"bigint", "GENERATED BY DEFAULT AS IDENTITY", "NOT NULL"} {
		if !strings.Contains(idDef, fragment) {
			t.Fatalf("unexpected definition: %s", idDef)
		}
	}
	if len(idWarnings) > 0 {
		t.Fatalf("unexpected warnings: %v", idWarnings)
	}

	boolDef, boolWarnings := buildMySQLToKingbaseColumnDefinition(connection.ColumnDefinition{
		Name:     "enabled",
		Type:     "tinyint(1)",
		Nullable: "YES",
		Default:  stringPtr("1"),
	})
	for _, fragment := range []string{"boolean", "DEFAULT TRUE"} {
		if !strings.Contains(boolDef, fragment) {
			t.Fatalf("unexpected boolean definition: %s", boolDef)
		}
	}
	if len(boolWarnings) > 0 {
		t.Fatalf("unexpected warnings for boolean: %v", boolWarnings)
	}
}
func TestBuildMySQLToKingbaseCreateTablePlan_GeneratesAndSkipsIndexes(t *testing.T) {
t.Parallel()
sourceDB := &fakeMigrationDB{
indexes: map[string][]connection.IndexDefinition{
"shop.orders": {
{Name: "PRIMARY", ColumnName: "id", NonUnique: 0, SeqInIndex: 1, IndexType: "BTREE"},
{Name: "idx_user_status", ColumnName: "user_id", NonUnique: 1, SeqInIndex: 1, IndexType: "BTREE"},
{Name: "idx_user_status", ColumnName: "status", NonUnique: 1, SeqInIndex: 2, IndexType: "BTREE"},
{Name: "idx_name_prefix", ColumnName: "name", NonUnique: 1, SeqInIndex: 1, IndexType: "BTREE", SubPart: 12},
{Name: "idx_fulltext_note", ColumnName: "note", NonUnique: 1, SeqInIndex: 1, IndexType: "FULLTEXT"},
},
},
}
cols := []connection.ColumnDefinition{
{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI", Extra: "auto_increment"},
{Name: "user_id", Type: "bigint", Nullable: "NO"},
{Name: "status", Type: "varchar(32)", Nullable: "YES"},
{Name: "name", Type: "varchar(128)", Nullable: "YES"},
{Name: "note", Type: "text", Nullable: "YES"},
}
cfg := SyncConfig{CreateIndexes: true}
createSQL, postSQL, warnings, unsupported, idxCreate, idxSkip, err := buildMySQLToKingbaseCreateTablePlan(cfg, "public.orders", cols, sourceDB, "shop", "orders")
if err != nil {
t.Fatalf("buildMySQLToKingbaseCreateTablePlan returned error: %v", err)
}
if !strings.Contains(createSQL, `CREATE TABLE "public"."orders"`) {
t.Fatalf("unexpected create SQL: %s", createSQL)
}
if !strings.Contains(createSQL, `PRIMARY KEY ("id")`) {
t.Fatalf("create SQL missing primary key: %s", createSQL)
}
if idxCreate != 1 || idxSkip != 2 {
t.Fatalf("unexpected index summary: create=%d skip=%d", idxCreate, idxSkip)
}
if len(postSQL) != 1 || !strings.Contains(postSQL[0], `CREATE INDEX "idx_user_status"`) {
t.Fatalf("unexpected post SQL: %v", postSQL)
}
if len(warnings) != 0 {
t.Fatalf("unexpected warnings: %v", warnings)
}
wantUnsupported := []string{
"索引 idx_name_prefix 使用前缀长度,当前暂不支持迁移",
"索引 idx_fulltext_note 类型=FULLTEXT当前暂不支持自动迁移",
}
if !reflect.DeepEqual(unsupported, wantUnsupported) {
t.Fatalf("unexpected unsupported objects: got=%v want=%v", unsupported, wantUnsupported)
}
}
// TestBuildSchemaMigrationPlan_AutoCreateWhenTargetMissing plans a MySQL to
// Kingbase migration where the fake target knows no columns for the table.
// The plan must report the target table as missing, enable auto-create,
// mention auto-creation in the planned action, and carry a CREATE TABLE
// statement for "public"."orders".
func TestBuildSchemaMigrationPlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"shop.orders": {
				{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI", Extra: "auto_increment"},
				{Name: "name", Type: "varchar(128)", Nullable: "YES"},
			},
		},
		indexes: map[string][]connection.IndexDefinition{},
	}
	dst := &fakeMigrationDB{columns: map[string][]connection.ColumnDefinition{}}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "mysql", Database: "shop"},
		TargetConfig:        connection.ConnectionConfig{Type: "kingbase", Database: "demo"},
		TargetTableStrategy: "smart",
		CreateIndexes:       true,
	}

	plan, srcCols, dstCols, err := buildSchemaMigrationPlan(syncCfg, "orders", src, dst)
	if err != nil {
		t.Fatalf("buildSchemaMigrationPlan returned error: %v", err)
	}
	if nSrc, nDst := len(srcCols), len(dstCols); nSrc != 2 || nDst != 0 {
		t.Fatalf("unexpected columns lengths: source=%d target=%d", nSrc, nDst)
	}

	// Checks run in a fixed order; the first failing one aborts the test.
	switch {
	case plan.TargetTableExists:
		t.Fatal("expected target table missing")
	case !plan.AutoCreate:
		t.Fatal("expected auto create enabled")
	case !strings.Contains(plan.PlannedAction, "自动建表"):
		t.Fatalf("unexpected planned action: %s", plan.PlannedAction)
	case !strings.Contains(plan.CreateTableSQL, `CREATE TABLE "public"."orders"`):
		t.Fatalf("unexpected create table SQL: %s", plan.CreateTableSQL)
	}
}
// stringPtr returns a pointer to a fresh copy of v, for populating optional
// string fields in test fixtures.
func stringPtr(v string) *string {
	boxed := new(string)
	*boxed = v
	return boxed
}
// TestBuildPGLikeToMySQLCreateTablePlan_GeneratesMySQLDDL converts a PG-style
// table with one unique and one non-unique index into MySQL DDL. It expects a
// backtick-quoted CREATE TABLE, an AUTO_INCREMENT int primary column, jsonb
// mapped to json, both indexes created, and no warnings or unsupported items.
func TestBuildPGLikeToMySQLCreateTablePlan_GeneratesMySQLDDL(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		indexes: map[string][]connection.IndexDefinition{
			"public.users": {
				{Name: "users_email_key", ColumnName: "email", NonUnique: 0, SeqInIndex: 1, IndexType: "BTREE"},
				{Name: "idx_users_name", ColumnName: "name", NonUnique: 1, SeqInIndex: 1, IndexType: "BTREE"},
			},
		},
	}
	userCols := []connection.ColumnDefinition{
		{Name: "id", Type: "integer", Nullable: "NO", Key: "PRI", Extra: "auto_increment"},
		{Name: "email", Type: "character varying(120)", Nullable: "NO"},
		{Name: "name", Type: "text", Nullable: "YES"},
		{Name: "profile", Type: "jsonb", Nullable: "YES"},
	}

	createSQL, postSQL, warnings, unsupported, created, skipped, err := buildPGLikeToMySQLCreateTablePlan(
		SyncConfig{CreateIndexes: true}, "app.users", userCols, src, "public", "users",
	)
	if err != nil {
		t.Fatalf("buildPGLikeToMySQLCreateTablePlan returned error: %v", err)
	}

	// Each expected DDL fragment is paired with the failure format used
	// when it is missing.
	for _, want := range []struct{ fragment, failure string }{
		{"CREATE TABLE `app`.`users`", "unexpected create SQL: %s"},
		{"`id` int AUTO_INCREMENT NOT NULL", "unexpected id definition: %s"},
		{"`profile` json", "unexpected json definition: %s"},
	} {
		if !strings.Contains(createSQL, want.fragment) {
			t.Fatalf(want.failure, createSQL)
		}
	}

	switch {
	case created != 2 || skipped != 0:
		t.Fatalf("unexpected index summary: create=%d skip=%d", created, skipped)
	case len(postSQL) != 2:
		t.Fatalf("unexpected post sql length: %v", postSQL)
	case len(warnings) != 0:
		t.Fatalf("unexpected warnings: %v", warnings)
	case len(unsupported) != 0:
		t.Fatalf("unexpected unsupported: %v", unsupported)
	}
}
// TestBuildPGLikeToMySQLPlan_AutoCreateWhenTargetMissing plans a Kingbase to
// MySQL migration against a target with no known columns. The plan must flag
// the target table as missing, enable auto-create, and contain a CREATE TABLE
// statement for `app`.`orders`.
func TestBuildPGLikeToMySQLPlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"public.orders": {
				{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI", Extra: "auto_increment"},
				{Name: "amount", Type: "numeric(10,2)", Nullable: "NO"},
			},
		},
		indexes: map[string][]connection.IndexDefinition{},
	}
	dst := &fakeMigrationDB{columns: map[string][]connection.ColumnDefinition{}}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "kingbase", Database: "public"},
		TargetConfig:        connection.ConnectionConfig{Type: "mysql", Database: "app"},
		TargetTableStrategy: "smart",
		CreateIndexes:       true,
	}

	plan, srcCols, dstCols, err := buildPGLikeToMySQLPlan(syncCfg, "orders", src, dst)
	if err != nil {
		t.Fatalf("buildPGLikeToMySQLPlan returned error: %v", err)
	}
	if nSrc, nDst := len(srcCols), len(dstCols); nSrc != 2 || nDst != 0 {
		t.Fatalf("unexpected columns lengths: source=%d target=%d", nSrc, nDst)
	}

	// Checks run in a fixed order; the first failing one aborts the test.
	switch {
	case plan.TargetTableExists:
		t.Fatal("expected target table missing")
	case !plan.AutoCreate:
		t.Fatal("expected auto create enabled")
	case !strings.Contains(plan.CreateTableSQL, "CREATE TABLE `app`.`orders`"):
		t.Fatalf("unexpected create table SQL: %s", plan.CreateTableSQL)
	}
}
// TestBuildMySQLToPGLikeCreateTablePlan_GeneratesPostgresDDL converts a MySQL
// table with one composite index into postgres DDL. It expects a
// double-quoted CREATE TABLE, an identity column for the auto-increment key,
// json mapped to jsonb, one CREATE INDEX statement, and no warnings or
// unsupported items.
func TestBuildMySQLToPGLikeCreateTablePlan_GeneratesPostgresDDL(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		indexes: map[string][]connection.IndexDefinition{
			"shop.orders": {
				{Name: "idx_orders_user", ColumnName: "user_id", NonUnique: 1, SeqInIndex: 1, IndexType: "BTREE"},
				{Name: "idx_orders_user", ColumnName: "status", NonUnique: 1, SeqInIndex: 2, IndexType: "BTREE"},
			},
		},
	}
	orderCols := []connection.ColumnDefinition{
		{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI", Extra: "auto_increment"},
		{Name: "user_id", Type: "bigint", Nullable: "NO"},
		{Name: "status", Type: "varchar(32)", Nullable: "YES"},
		{Name: "payload", Type: "json", Nullable: "YES"},
	}

	createSQL, postSQL, warnings, unsupported, created, skipped, err := buildMySQLToPGLikeCreateTablePlan(
		"postgres", SyncConfig{CreateIndexes: true}, "public.orders", orderCols, src, "shop", "orders",
	)
	if err != nil {
		t.Fatalf("buildMySQLToPGLikeCreateTablePlan returned error: %v", err)
	}

	// Each expected DDL fragment is paired with the failure format used
	// when it is missing.
	for _, want := range []struct{ fragment, failure string }{
		{`CREATE TABLE "public"."orders"`, "unexpected create SQL: %s"},
		{`GENERATED BY DEFAULT AS IDENTITY`, "missing identity mapping: %s"},
		{`jsonb`, "missing jsonb mapping: %s"},
	} {
		if !strings.Contains(createSQL, want.fragment) {
			t.Fatalf(want.failure, createSQL)
		}
	}

	if created != 1 || skipped != 0 {
		t.Fatalf("unexpected index summary: create=%d skip=%d", created, skipped)
	}
	if len(postSQL) != 1 {
		t.Fatalf("unexpected post SQL: %v", postSQL)
	}
	if !strings.Contains(postSQL[0], `CREATE INDEX "idx_orders_user"`) {
		t.Fatalf("unexpected post SQL: %v", postSQL)
	}
	if len(warnings)+len(unsupported) != 0 {
		t.Fatalf("unexpected warnings/unsupported: warnings=%v unsupported=%v", warnings, unsupported)
	}
}
func TestBuildMySQLToClickHouseCreateTableSQL_GeneratesMergeTree(t *testing.T) {
t.Parallel()
cols := []connection.ColumnDefinition{
{Name: "id", Type: "bigint unsigned", Nullable: "NO", Key: "PRI"},
{Name: "name", Type: "varchar(128)", Nullable: "YES"},
{Name: "payload", Type: "json", Nullable: "YES"},
}
createSQL, warnings, unsupported := buildMySQLToClickHouseCreateTableSQL("analytics.orders", cols)
if !strings.Contains(createSQL, "ENGINE = MergeTree()") {
t.Fatalf("unexpected create SQL: %s", createSQL)
}
if !strings.Contains(createSQL, "ORDER BY (`id`)") {
t.Fatalf("unexpected order by: %s", createSQL)
}
if !strings.Contains(createSQL, "`payload` Nullable(String)") {
t.Fatalf("unexpected json mapping: %s", createSQL)
}
if len(warnings) == 0 {
t.Fatalf("expected warnings for clickhouse semantics")
}
if len(unsupported) != 0 {
t.Fatalf("unexpected unsupported: %v", unsupported)
}
}
func TestBuildClickHouseToMySQLCreateTableSQL_GeneratesMySQLDDL(t *testing.T) {
t.Parallel()
cols := []connection.ColumnDefinition{
{Name: "id", Type: "UInt64", Nullable: "NO", Key: "PRI"},
{Name: "event_time", Type: "DateTime", Nullable: "NO"},
{Name: "payload", Type: "Map(String, String)", Nullable: "YES"},
}
createSQL, warnings := buildClickHouseToMySQLCreateTableSQL("app.metrics", cols)
if !strings.Contains(createSQL, "CREATE TABLE `app`.`metrics`") {
t.Fatalf("unexpected create SQL: %s", createSQL)
}
if !strings.Contains(createSQL, "`id` bigint unsigned NOT NULL") {
t.Fatalf("unexpected uint64 mapping: %s", createSQL)
}
if !strings.Contains(createSQL, "`payload` json") {
t.Fatalf("unexpected complex type mapping: %s", createSQL)
}
if len(warnings) == 0 {
t.Fatalf("expected warning for limited clickhouse reverse semantics")
}
}
// TestBuildMySQLToMongoPlan_AutoCreateCollection plans a MySQL to MongoDB
// migration with index creation enabled. It expects nil target columns, an
// auto-create plan whose first pre-data command creates the "users"
// collection, and exactly one post-data createIndexes command.
func TestBuildMySQLToMongoPlan_AutoCreateCollection(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"shop.users": {
				{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
				{Name: "name", Type: "varchar(64)", Nullable: "YES"},
			},
		},
		indexes: map[string][]connection.IndexDefinition{
			"shop.users": {
				{Name: "idx_users_name", ColumnName: "name", NonUnique: 1, SeqInIndex: 1, IndexType: "BTREE"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "mysql", Database: "shop"},
		TargetConfig:        connection.ConnectionConfig{Type: "mongodb", Database: "app"},
		TargetTableStrategy: "smart",
		CreateIndexes:       true,
	}

	plan, srcCols, dstCols, err := buildMySQLToMongoPlan(syncCfg, "users", src, dst)
	if err != nil {
		t.Fatalf("buildMySQLToMongoPlan returned error: %v", err)
	}

	// Cases are evaluated top to bottom, so the element-0 accesses only run
	// after the corresponding length has been validated.
	switch {
	case len(srcCols) != 2 || dstCols != nil:
		t.Fatalf("unexpected source/target columns: %d / %v", len(srcCols), dstCols)
	case !plan.AutoCreate || len(plan.PreDataSQL) == 0:
		t.Fatalf("expected auto create collection command: %+v", plan)
	case !strings.Contains(plan.PreDataSQL[0], `"create":"users"`):
		t.Fatalf("unexpected create collection command: %v", plan.PreDataSQL)
	case len(plan.PostDataSQL) != 1 || !strings.Contains(plan.PostDataSQL[0], `"createIndexes":"users"`):
		t.Fatalf("unexpected index commands: %v", plan.PostDataSQL)
	}
}
// TestBuildPGLikeToMongoPlan_AutoCreateCollection plans a postgres to MongoDB
// migration with index creation enabled. It expects nil target columns, an
// auto-create plan whose first pre-data command creates the "orders"
// collection, and exactly one post-data createIndexes command.
func TestBuildPGLikeToMongoPlan_AutoCreateCollection(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"public.orders": {
				{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
				{Name: "name", Type: "varchar(64)", Nullable: "YES"},
			},
		},
		indexes: map[string][]connection.IndexDefinition{
			"public.orders": {
				{Name: "idx_orders_name", ColumnName: "name", NonUnique: 1, SeqInIndex: 1, IndexType: "BTREE"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "postgres", Database: "public"},
		TargetConfig:        connection.ConnectionConfig{Type: "mongodb", Database: "app"},
		TargetTableStrategy: "smart",
		CreateIndexes:       true,
	}

	plan, srcCols, dstCols, err := buildPGLikeToMongoPlan(syncCfg, "orders", src, dst)
	if err != nil {
		t.Fatalf("buildPGLikeToMongoPlan returned error: %v", err)
	}
	if len(srcCols) != 2 || dstCols != nil {
		t.Fatalf("unexpected source/target columns: %d / %v", len(srcCols), dstCols)
	}

	preCmds, postCmds := plan.PreDataSQL, plan.PostDataSQL
	if !plan.AutoCreate || len(preCmds) == 0 {
		t.Fatalf("expected auto create collection command: %+v", plan)
	}
	if !strings.Contains(preCmds[0], `"create":"orders"`) {
		t.Fatalf("unexpected create collection command: %v", preCmds)
	}
	if len(postCmds) != 1 || !strings.Contains(postCmds[0], `"createIndexes":"orders"`) {
		t.Fatalf("unexpected index commands: %v", postCmds)
	}
}
// TestBuildClickHouseToMongoPlan_AutoCreateCollection plans a ClickHouse to
// MongoDB migration. It expects nil target columns and an auto-create plan
// whose first pre-data command creates the "metrics" collection.
func TestBuildClickHouseToMongoPlan_AutoCreateCollection(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"analytics.metrics": {
				{Name: "id", Type: "UInt64", Nullable: "NO", Key: "PRI"},
				{Name: "host", Type: "String", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "clickhouse", Database: "analytics"},
		TargetConfig:        connection.ConnectionConfig{Type: "mongodb", Database: "app"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildClickHouseToMongoPlan(syncCfg, "metrics", src, dst)
	if err != nil {
		t.Fatalf("buildClickHouseToMongoPlan returned error: %v", err)
	}

	// Cases are evaluated top to bottom, so PreDataSQL[0] is only read once
	// the slice is known to be non-empty.
	switch {
	case len(srcCols) != 2 || dstCols != nil:
		t.Fatalf("unexpected source/target columns: %d / %v", len(srcCols), dstCols)
	case !plan.AutoCreate || len(plan.PreDataSQL) == 0:
		t.Fatalf("expected auto create collection command: %+v", plan)
	case !strings.Contains(plan.PreDataSQL[0], `"create":"metrics"`):
		t.Fatalf("unexpected create collection command: %v", plan.PreDataSQL)
	}
}
// TestBuildTDengineToMongoPlan_AutoCreateCollection plans a TDengine to
// MongoDB migration. It expects nil target columns and an auto-create plan
// whose first pre-data command creates the "cpu" collection.
func TestBuildTDengineToMongoPlan_AutoCreateCollection(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"src.cpu": {
				{Name: "ts", Type: "TIMESTAMP", Nullable: "NO"},
				{Name: "host", Type: "NCHAR(64)", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "src"},
		TargetConfig:        connection.ConnectionConfig{Type: "mongodb", Database: "app"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildTDengineToMongoPlan(syncCfg, "cpu", src, dst)
	if err != nil {
		t.Fatalf("buildTDengineToMongoPlan returned error: %v", err)
	}
	if len(srcCols) != 2 || dstCols != nil {
		t.Fatalf("unexpected source/target columns: %d / %v", len(srcCols), dstCols)
	}

	preCmds := plan.PreDataSQL
	if !plan.AutoCreate || len(preCmds) == 0 {
		t.Fatalf("expected auto create collection command: %+v", plan)
	}
	if first := preCmds[0]; !strings.Contains(first, `"create":"cpu"`) {
		t.Fatalf("unexpected create collection command: %v", preCmds)
	}
}
// TestBuildMongoToMySQLPlan_InfersColumnsAndCreatesTable serves two sample
// documents through the fake's canned find query and plans a MongoDB to MySQL
// migration. It expects source columns to be inferred, an auto-created
// `app`.`users` table with an `_id` column (text NOT NULL or varchar), the
// nested `profile` field mapped to json, and exactly one post-data index
// statement.
func TestBuildMongoToMySQLPlan_InfersColumnsAndCreatesTable(t *testing.T) {
	t.Parallel()

	// The fake only answers this exact query text.
	const sampleQuery = `{"find":"users","filter":{},"limit":200}`
	sampleDocs := []map[string]interface{}{
		{"_id": "a1", "name": "alice", "age": int64(18), "profile": map[string]interface{}{"city": "shanghai"}},
		{"_id": "b2", "name": "bob", "profile": map[string]interface{}{"city": "beijing"}},
	}
	src := &fakeMigrationDB{
		queryData: map[string][]map[string]interface{}{sampleQuery: sampleDocs},
		queryCols: map[string][]string{sampleQuery: {"_id", "name", "age", "profile"}},
		indexes: map[string][]connection.IndexDefinition{
			"crm.users": {{Name: "email_1", ColumnName: "name", NonUnique: 1, SeqInIndex: 1, IndexType: "BTREE"}},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "mongodb", Database: "crm"},
		TargetConfig:        connection.ConnectionConfig{Type: "mysql", Database: "app"},
		TargetTableStrategy: "smart",
		CreateIndexes:       true,
	}

	plan, srcCols, _, err := buildMongoToMySQLPlan(syncCfg, "users", src, dst)
	if err != nil {
		t.Fatalf("buildMongoToMySQLPlan returned error: %v", err)
	}
	if len(srcCols) == 0 {
		t.Fatal("expected inferred source cols")
	}

	ddl := plan.CreateTableSQL
	if !(plan.AutoCreate && strings.Contains(ddl, "CREATE TABLE `app`.`users`")) {
		t.Fatalf("unexpected create table sql: %s", ddl)
	}
	// Either rendering of the inferred _id column is acceptable.
	if !(strings.Contains(ddl, "`_id` text NOT NULL") || strings.Contains(ddl, "`_id` varchar")) {
		t.Fatalf("missing inferred _id column: %s", ddl)
	}
	if !strings.Contains(ddl, "`profile` json") {
		t.Fatalf("expected nested field degrade to json: %s", ddl)
	}
	if len(plan.PostDataSQL) != 1 {
		t.Fatalf("expected one post index sql, got=%v", plan.PostDataSQL)
	}
}
// TestBuildTDengineToMySQLPlan_AutoCreateWhenTargetMissing plans a TDengine to
// MySQL migration for a table with one TAG column. It expects auto-create, a
// CREATE TABLE for `app`.`cpu`, TIMESTAMP mapped to datetime, NCHAR(64) mapped
// to varchar(64), and a warning that mentions TAG.
func TestBuildTDengineToMySQLPlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"metrics.cpu": {
				{Name: "ts", Type: "TIMESTAMP", Nullable: "NO"},
				{Name: "host", Type: "NCHAR(64)", Nullable: "YES", Key: "TAG", Extra: "TAG"},
				{Name: "usage", Type: "DOUBLE", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "metrics"},
		TargetConfig:        connection.ConnectionConfig{Type: "mysql", Database: "app"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildTDengineToMySQLPlan(syncCfg, "cpu", src, dst)
	if err != nil {
		t.Fatalf("buildTDengineToMySQLPlan returned error: %v", err)
	}
	if nSrc, nDst := len(srcCols), len(dstCols); nSrc != 3 || nDst != 0 {
		t.Fatalf("unexpected columns lengths: source=%d target=%d", nSrc, nDst)
	}
	if !plan.AutoCreate {
		t.Fatal("expected auto create enabled")
	}

	// Each expected DDL fragment is paired with the failure format used
	// when it is missing.
	ddl := plan.CreateTableSQL
	for _, want := range []struct{ fragment, failure string }{
		{"CREATE TABLE `app`.`cpu`", "unexpected create table sql: %s"},
		{"`ts` datetime", "expected timestamp mapping, got: %s"},
		{"`host` varchar(64)", "expected nchar mapping, got: %s"},
	} {
		if !strings.Contains(ddl, want.fragment) {
			t.Fatalf(want.failure, ddl)
		}
	}

	// An empty warning list joins to "", which cannot contain "TAG", so the
	// single substring check also covers the "no warnings" case.
	if joined := strings.Join(plan.Warnings, " "); !strings.Contains(joined, "TAG") {
		t.Fatalf("expected TAG warning, got: %v", plan.Warnings)
	}
}
// TestBuildTDengineToPGLikePlan_AutoCreateWhenTargetMissing plans a TDengine
// to Kingbase migration for a table with one TAG column. It expects
// auto-create, a CREATE TABLE for "public"."cpu" (although the target config
// names a different database), TIMESTAMP mapped to timestamp, JSON mapped to
// jsonb, and a warning that mentions TAG.
func TestBuildTDengineToPGLikePlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"metrics.cpu": {
				{Name: "ts", Type: "TIMESTAMP", Nullable: "NO"},
				{Name: "payload", Type: "JSON", Nullable: "YES"},
				{Name: "host", Type: "BINARY(32)", Nullable: "YES", Key: "TAG", Extra: "TAG"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "metrics"},
		TargetConfig:        connection.ConnectionConfig{Type: "kingbase", Database: "ignored"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildTDengineToPGLikePlan(syncCfg, "cpu", src, dst)
	if err != nil {
		t.Fatalf("buildTDengineToPGLikePlan returned error: %v", err)
	}

	// Checks run in a fixed order; the first failing one aborts the test.
	ddl := plan.CreateTableSQL
	switch {
	case len(srcCols) != 3 || len(dstCols) != 0:
		t.Fatalf("unexpected columns lengths: source=%d target=%d", len(srcCols), len(dstCols))
	case !plan.AutoCreate:
		t.Fatal("expected auto create enabled")
	case !strings.Contains(ddl, `CREATE TABLE "public"."cpu"`):
		t.Fatalf("unexpected create table sql: %s", ddl)
	case !strings.Contains(ddl, `"ts" timestamp`):
		t.Fatalf("expected timestamp mapping, got: %s", ddl)
	case !strings.Contains(ddl, `"payload" jsonb`):
		t.Fatalf("expected json mapping, got: %s", ddl)
	case len(plan.Warnings) == 0 || !strings.Contains(strings.Join(plan.Warnings, " "), "TAG"):
		t.Fatalf("expected TAG warning, got: %v", plan.Warnings)
	}
}
// TestBuildSchemaMigrationPlan_TDengineTargetWarnsInsertOnlyBoundary plans a
// MySQL to TDengine migration in "insert_update" mode against a target table
// that already has columns. It expects the plan warnings to include the
// INSERT-only notice.
func TestBuildSchemaMigrationPlan_TDengineTargetWarnsInsertOnlyBoundary(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"shop.metrics": {
				{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
				{Name: "ts", Type: "datetime", Nullable: "NO"},
				{Name: "value", Type: "double", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"taos.metrics": {
				{Name: "id", Type: "bigint", Nullable: "NO"},
				{Name: "ts", Type: "timestamp", Nullable: "NO"},
				{Name: "value", Type: "double", Nullable: "YES"},
			},
		},
	}
	syncCfg := SyncConfig{
		SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "shop"},
		TargetConfig: connection.ConnectionConfig{Type: "tdengine", Database: "taos"},
		Mode:         "insert_update",
	}

	plan, _, _, err := buildSchemaMigrationPlan(syncCfg, "metrics", src, dst)
	if err != nil {
		t.Fatalf("buildSchemaMigrationPlan returned error: %v", err)
	}
	if joined := strings.Join(plan.Warnings, " "); !strings.Contains(joined, "仅支持 INSERT 写入") {
		t.Fatalf("expected TDengine target warning, got: %v", plan.Warnings)
	}
}
// TestBuildMySQLLikeToTDenginePlan_AutoCreateWhenTargetMissing plans a MySQL
// to TDengine migration. It expects auto-create, a CREATE TABLE for
// `taos`.`metrics`, the datetime column mapped to TIMESTAMP, json mapped to
// VARCHAR, and a warning containing "insert-only" or "INSERT".
func TestBuildMySQLLikeToTDenginePlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"shop.metrics": {
				{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI", Extra: "auto_increment"},
				{Name: "ts", Type: "datetime", Nullable: "NO"},
				{Name: "payload", Type: "json", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "mysql", Database: "shop"},
		TargetConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "taos"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildMySQLLikeToTDenginePlan(syncCfg, "metrics", src, dst)
	if err != nil {
		t.Fatalf("buildMySQLLikeToTDenginePlan returned error: %v", err)
	}
	if nSrc, nDst := len(srcCols), len(dstCols); nSrc != 3 || nDst != 0 {
		t.Fatalf("unexpected columns lengths: source=%d target=%d", nSrc, nDst)
	}
	if !plan.AutoCreate {
		t.Fatal("expected auto create enabled")
	}

	// Each expected DDL fragment is paired with the failure format used
	// when it is missing.
	ddl := plan.CreateTableSQL
	for _, want := range []struct{ fragment, failure string }{
		{"CREATE TABLE `taos`.`metrics`", "unexpected create sql: %s"},
		{"`ts` TIMESTAMP", "expected ts first column mapped to TIMESTAMP, got: %s"},
		{"`payload` VARCHAR(", "expected json degrade to VARCHAR, got: %s"},
	} {
		if !strings.Contains(ddl, want.fragment) {
			t.Fatalf(want.failure, ddl)
		}
	}

	// Either wording of the insert-only notice is accepted.
	joined := strings.Join(plan.Warnings, " ")
	if !(strings.Contains(joined, "insert-only") || strings.Contains(joined, "INSERT")) {
		t.Fatalf("expected tdengine target warning, got: %v", plan.Warnings)
	}
}
// TestBuildPGLikeToTDenginePlan_AutoCreateWhenTargetMissing plans a postgres
// to TDengine migration; the source columns are registered under
// "public.metrics" although the source config names a different database. It
// expects auto-create, a CREATE TABLE for `taos`.`metrics`, the timestamp
// column mapped to TIMESTAMP, and jsonb mapped to VARCHAR.
func TestBuildPGLikeToTDenginePlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"public.metrics": {
				{Name: "event_time", Type: "timestamp without time zone", Nullable: "NO"},
				{Name: "name", Type: "character varying(64)", Nullable: "YES"},
				{Name: "meta", Type: "jsonb", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "postgres", Database: "ignored"},
		TargetConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "taos"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildPGLikeToTDenginePlan(syncCfg, "metrics", src, dst)
	if err != nil {
		t.Fatalf("buildPGLikeToTDenginePlan returned error: %v", err)
	}

	// Checks run in a fixed order; the first failing one aborts the test.
	ddl := plan.CreateTableSQL
	switch {
	case len(srcCols) != 3 || len(dstCols) != 0:
		t.Fatalf("unexpected columns lengths: source=%d target=%d", len(srcCols), len(dstCols))
	case !plan.AutoCreate:
		t.Fatal("expected auto create enabled")
	case !strings.Contains(ddl, "CREATE TABLE `taos`.`metrics`"):
		t.Fatalf("unexpected create sql: %s", ddl)
	case !strings.Contains(ddl, "`event_time` TIMESTAMP"):
		t.Fatalf("expected timestamp mapping, got: %s", ddl)
	case !strings.Contains(ddl, "`meta` VARCHAR("):
		t.Fatalf("expected jsonb degrade to VARCHAR, got: %s", ddl)
	}
}
// TestBuildMySQLLikeToTDenginePlan_RejectsAutoCreateWithoutTimestampColumn
// plans a MySQL to TDengine migration for a source table that has only a
// bigint and a varchar column. It expects auto-create to be disabled, and both
// the planned action and the warnings to mention the missing time column.
func TestBuildMySQLLikeToTDenginePlan_RejectsAutoCreateWithoutTimestampColumn(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"shop.metrics": {
				{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
				{Name: "name", Type: "varchar(64)", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "mysql", Database: "shop"},
		TargetConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "taos"},
		TargetTableStrategy: "smart",
	}

	plan, _, _, err := buildMySQLLikeToTDenginePlan(syncCfg, "metrics", src, dst)
	if err != nil {
		t.Fatalf("buildMySQLLikeToTDenginePlan returned error: %v", err)
	}

	// The same marker text is expected in the action and in the warnings.
	const marker = "时间列"
	switch {
	case plan.AutoCreate:
		t.Fatal("expected auto create disabled when source has no timestamp column")
	case !strings.Contains(plan.PlannedAction, marker):
		t.Fatalf("unexpected planned action: %s", plan.PlannedAction)
	case !strings.Contains(strings.Join(plan.Warnings, " "), marker):
		t.Fatalf("expected missing timestamp warning, got: %v", plan.Warnings)
	}
}
// TestBuildClickHouseToTDenginePlan_AutoCreateWhenTargetMissing plans a
// ClickHouse to TDengine migration. It expects auto-create, a CREATE TABLE for
// `taos`.`metrics`, DateTime64(3) mapped to TIMESTAMP, FixedString(64) mapped
// to VARCHAR(64), and a Map column mapped to VARCHAR.
func TestBuildClickHouseToTDenginePlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"analytics.metrics": {
				{Name: "event_time", Type: "DateTime64(3)", Nullable: "NO"},
				{Name: "host", Type: "FixedString(64)", Nullable: "YES"},
				{Name: "payload", Type: "Map(String,String)", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "clickhouse", Database: "analytics"},
		TargetConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "taos"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildClickHouseToTDenginePlan(syncCfg, "metrics", src, dst)
	if err != nil {
		t.Fatalf("buildClickHouseToTDenginePlan returned error: %v", err)
	}
	if nSrc, nDst := len(srcCols), len(dstCols); nSrc != 3 || nDst != 0 {
		t.Fatalf("unexpected columns lengths: source=%d target=%d", nSrc, nDst)
	}
	if !plan.AutoCreate {
		t.Fatal("expected auto create enabled")
	}

	// Each expected DDL fragment is paired with the failure format used
	// when it is missing.
	ddl := plan.CreateTableSQL
	for _, want := range []struct{ fragment, failure string }{
		{"CREATE TABLE `taos`.`metrics`", "unexpected create sql: %s"},
		{"`event_time` TIMESTAMP", "expected datetime64 mapping, got: %s"},
		{"`host` VARCHAR(64)", "expected fixedstring mapping, got: %s"},
		{"`payload` VARCHAR(", "expected complex type degrade to VARCHAR, got: %s"},
	} {
		if !strings.Contains(ddl, want.fragment) {
			t.Fatalf(want.failure, ddl)
		}
	}
}
// TestBuildClickHouseToPGLikePlan_AutoCreateWhenTargetMissing plans a
// ClickHouse to postgres migration. It expects auto-create, a CREATE TABLE for
// "public"."metrics", UInt64 mapped to numeric(20,0), DateTime64(3) mapped to
// timestamp, FixedString(64) mapped to varchar(64), a Map column mapped to
// jsonb, and the primary key preserved.
func TestBuildClickHouseToPGLikePlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"analytics.metrics": {
				{Name: "id", Type: "UInt64", Nullable: "NO", Key: "PRI"},
				{Name: "event_time", Type: "DateTime64(3)", Nullable: "NO"},
				{Name: "host", Type: "FixedString(64)", Nullable: "YES"},
				{Name: "payload", Type: "Map(String,String)", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "clickhouse", Database: "analytics"},
		TargetConfig:        connection.ConnectionConfig{Type: "postgres", Database: "public"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildClickHouseToPGLikePlan(syncCfg, "metrics", src, dst)
	if err != nil {
		t.Fatalf("buildClickHouseToPGLikePlan returned error: %v", err)
	}
	if nSrc, nDst := len(srcCols), len(dstCols); nSrc != 4 || nDst != 0 {
		t.Fatalf("unexpected columns lengths: source=%d target=%d", nSrc, nDst)
	}
	if !plan.AutoCreate {
		t.Fatal("expected auto create enabled")
	}

	// Each expected DDL fragment is paired with the failure format used
	// when it is missing.
	ddl := plan.CreateTableSQL
	for _, want := range []struct{ fragment, failure string }{
		{`CREATE TABLE "public"."metrics"`, "unexpected create sql: %s"},
		{`"id" numeric(20,0)`, "expected uint64 safeguard mapping, got: %s"},
		{`"event_time" timestamp`, "expected datetime64 mapping, got: %s"},
		{`"host" varchar(64)`, "expected fixedstring mapping, got: %s"},
		{`"payload" jsonb`, "expected complex type degrade to jsonb, got: %s"},
		{`PRIMARY KEY ("id")`, "expected primary key preservation, got: %s"},
	} {
		if !strings.Contains(ddl, want.fragment) {
			t.Fatalf(want.failure, ddl)
		}
	}
}
// TestBuildPGLikeToClickHousePlan_AutoCreateWhenTargetMissing plans a postgres
// to ClickHouse migration. It expects auto-create, a CREATE TABLE for
// `analytics`.`orders`, the timestamp column mapped to DateTime, jsonb mapped
// to Nullable(String), and an ORDER BY on the primary key column.
func TestBuildPGLikeToClickHousePlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"public.orders": {
				{Name: "id", Type: "bigint", Nullable: "NO", Key: "PRI"},
				{Name: "created_at", Type: "timestamp without time zone", Nullable: "NO"},
				{Name: "profile", Type: "jsonb", Nullable: "YES"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "postgres", Database: "public"},
		TargetConfig:        connection.ConnectionConfig{Type: "clickhouse", Database: "analytics"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildPGLikeToClickHousePlan(syncCfg, "orders", src, dst)
	if err != nil {
		t.Fatalf("buildPGLikeToClickHousePlan returned error: %v", err)
	}

	// Checks run in a fixed order; the first failing one aborts the test.
	ddl := plan.CreateTableSQL
	switch {
	case len(srcCols) != 3 || len(dstCols) != 0:
		t.Fatalf("unexpected columns lengths: source=%d target=%d", len(srcCols), len(dstCols))
	case !plan.AutoCreate:
		t.Fatal("expected auto create enabled")
	case !strings.Contains(ddl, "CREATE TABLE `analytics`.`orders`"):
		t.Fatalf("unexpected create sql: %s", ddl)
	case !strings.Contains(ddl, "`created_at` DateTime"):
		t.Fatalf("expected timestamp mapping, got: %s", ddl)
	case !strings.Contains(ddl, "`profile` Nullable(String)"):
		t.Fatalf("expected jsonb degrade to Nullable(String), got: %s", ddl)
	case !strings.Contains(ddl, "ORDER BY (`id`)"):
		t.Fatalf("expected primary key order by, got: %s", ddl)
	}
}
// TestBuildTDengineToTDenginePlan_AutoCreateWhenTargetMissing plans a TDengine
// to TDengine migration for a table with one TAG column. It expects
// auto-create, a CREATE TABLE for `dst`.`cpu`, the TIMESTAMP column preserved,
// the TAG column emitted as a plain NCHAR(32) column, and a warning that
// mentions TAG.
func TestBuildTDengineToTDenginePlan_AutoCreateWhenTargetMissing(t *testing.T) {
	t.Parallel()

	src := &fakeMigrationDB{
		columns: map[string][]connection.ColumnDefinition{
			"src.cpu": {
				{Name: "ts", Type: "TIMESTAMP", Nullable: "NO"},
				{Name: "host", Type: "NCHAR(64)", Nullable: "YES"},
				{Name: "region", Type: "NCHAR(32)", Nullable: "YES", Key: "TAG"},
			},
		},
	}
	dst := &fakeMigrationDB{}
	syncCfg := SyncConfig{
		SourceConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "src"},
		TargetConfig:        connection.ConnectionConfig{Type: "tdengine", Database: "dst"},
		TargetTableStrategy: "smart",
	}

	plan, srcCols, dstCols, err := buildTDengineToTDenginePlan(syncCfg, "cpu", src, dst)
	if err != nil {
		t.Fatalf("buildTDengineToTDenginePlan returned error: %v", err)
	}
	if nSrc, nDst := len(srcCols), len(dstCols); nSrc != 3 || nDst != 0 {
		t.Fatalf("unexpected columns lengths: source=%d target=%d", nSrc, nDst)
	}
	if !plan.AutoCreate {
		t.Fatal("expected auto create enabled")
	}

	// Each expected DDL fragment is paired with the failure format used
	// when it is missing.
	ddl := plan.CreateTableSQL
	for _, want := range []struct{ fragment, failure string }{
		{"CREATE TABLE `dst`.`cpu`", "unexpected create sql: %s"},
		{"`ts` TIMESTAMP", "expected timestamp preserved, got: %s"},
		{"`region` NCHAR(32)", "expected tag degrade to regular nchar column, got: %s"},
	} {
		if !strings.Contains(ddl, want.fragment) {
			t.Fatalf(want.failure, ddl)
		}
	}

	if joined := strings.Join(plan.Warnings, " "); !strings.Contains(joined, "TAG") {
		t.Fatalf("expected TAG degrade warning, got: %v", plan.Warnings)
	}
}