🐛 fix(iris): 修复 InterSystems IRIS 连接后表元数据为空

- 兼容 IRIS INFORMATION_SCHEMA 返回的紧凑列名格式
- 修复表、列、索引元数据读取时字段取值为空的问题
- 保持系统 schema 过滤逻辑,避免误展示内置对象
- 补充 IRIS metadata 回归测试覆盖表列表与列索引解析
- Refs #505
This commit is contained in:
Syngnat
2026-05-31 14:18:40 +08:00
parent b8053ff368
commit 6f132db328
2 changed files with 103 additions and 26 deletions

View File

@@ -273,7 +273,7 @@ func (i *IrisDB) GetDatabases() ([]string, error) {
var namespaces []string
seen := map[string]struct{}{}
for _, row := range data {
name := strings.TrimSpace(rowString(row, "TABLE_CATALOG", "table_catalog"))
name := strings.TrimSpace(rowString(row, "TABLE_CATALOG", "table_catalog", "TABLECATALOG", "tablecatalog"))
if name == "" {
continue
}
@@ -295,12 +295,12 @@ func (i *IrisDB) GetTables(dbName string) ([]string, error) {
var tables []string
seen := map[string]struct{}{}
for _, row := range data {
tableType := strings.ToUpper(strings.TrimSpace(rowString(row, "TABLE_TYPE", "table_type")))
tableType := strings.ToUpper(strings.TrimSpace(rowString(row, "TABLE_TYPE", "table_type", "TABLETYPE", "tabletype")))
if tableType != "" && tableType != "TABLE" && tableType != "BASE TABLE" {
continue
}
schema := strings.TrimSpace(rowString(row, "TABLE_SCHEMA", "table_schema", "SCHEMA_NAME", "schema_name"))
table := strings.TrimSpace(rowString(row, "TABLE_NAME", "table_name"))
schema := strings.TrimSpace(rowString(row, "TABLE_SCHEMA", "table_schema", "SCHEMA_NAME", "schema_name", "TABLESCHEMA", "tableschema", "SCHEMANAME", "schemaname"))
table := strings.TrimSpace(rowString(row, "TABLE_NAME", "table_name", "TABLENAME", "tablename"))
if table == "" || isIRISSystemSchema(schema) {
continue
}
@@ -351,27 +351,27 @@ func (i *IrisDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefini
columns := make([]connection.ColumnDefinition, 0, len(data))
for _, row := range data {
name := strings.TrimSpace(rowString(row, "COLUMN_NAME", "column_name"))
name := strings.TrimSpace(rowString(row, "COLUMN_NAME", "column_name", "COLUMNNAME", "columnname"))
if name == "" {
continue
}
key := keyByColumn[name]
if primary, ok := irisBoolFromRow(row, "PRIMARY_KEY", "primary_key"); ok && primary {
if primary, ok := irisBoolFromRow(row, "PRIMARY_KEY", "primary_key", "PRIMARYKEY", "primarykey"); ok && primary {
key = "PRI"
} else if key == "" {
if unique, ok := irisBoolFromRow(row, "UNIQUE_COLUMN", "unique_column", "IS_UNIQUE", "is_unique", "UNIQUE", "unique"); ok && unique {
if unique, ok := irisBoolFromRow(row, "UNIQUE_COLUMN", "unique_column", "UNIQUECOLUMN", "uniquecolumn", "IS_UNIQUE", "is_unique", "ISUNIQUE", "isunique", "UNIQUE", "unique"); ok && unique {
key = "UNI"
}
}
col := connection.ColumnDefinition{
Name: name,
Type: buildIRISColumnType(row),
Nullable: normalizeIRISNullable(rowString(row, "IS_NULLABLE", "is_nullable")),
Nullable: normalizeIRISNullable(rowString(row, "IS_NULLABLE", "is_nullable", "ISNULLABLE", "isnullable")),
Key: key,
Extra: "",
Comment: rowString(row, "DESCRIPTION", "description", "COMMENT", "comment"),
}
if rawDefault, ok := rowValue(row, "COLUMN_DEFAULT", "column_default"); ok && rawDefault != nil {
if rawDefault, ok := rowValue(row, "COLUMN_DEFAULT", "column_default", "COLUMNDEFAULT", "columndefault"); ok && rawDefault != nil {
def := strings.TrimSpace(fmt.Sprintf("%v", rawDefault))
if def != "" {
col.Default = &def
@@ -392,9 +392,9 @@ func (i *IrisDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWith
}
cols := make([]connection.ColumnDefinitionWithTable, 0, len(data))
for _, row := range data {
schema := strings.TrimSpace(rowString(row, "TABLE_SCHEMA", "table_schema"))
table := strings.TrimSpace(rowString(row, "TABLE_NAME", "table_name"))
name := strings.TrimSpace(rowString(row, "COLUMN_NAME", "column_name"))
schema := strings.TrimSpace(rowString(row, "TABLE_SCHEMA", "table_schema", "TABLESCHEMA", "tableschema"))
table := strings.TrimSpace(rowString(row, "TABLE_NAME", "table_name", "TABLENAME", "tablename"))
name := strings.TrimSpace(rowString(row, "COLUMN_NAME", "column_name", "COLUMNNAME", "columnname"))
if table == "" || name == "" || isIRISSystemSchema(schema) {
continue
}
@@ -429,16 +429,16 @@ func (i *IrisDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinit
}
indexes := make([]connection.IndexDefinition, 0, len(data))
for _, row := range data {
name := strings.TrimSpace(rowString(row, "INDEX_NAME", "index_name", "KEY_NAME", "key_name", "CONSTRAINT_NAME", "constraint_name"))
column := strings.TrimSpace(rowString(row, "COLUMN_NAME", "column_name"))
primary, hasPrimaryFlag := irisBoolFromRow(row, "PRIMARY_KEY", "primary_key")
name := strings.TrimSpace(rowString(row, "INDEX_NAME", "index_name", "INDEXNAME", "indexname", "KEY_NAME", "key_name", "KEYNAME", "keyname", "CONSTRAINT_NAME", "constraint_name", "CONSTRAINTNAME", "constraintname"))
column := strings.TrimSpace(rowString(row, "COLUMN_NAME", "column_name", "COLUMNNAME", "columnname"))
primary, hasPrimaryFlag := irisBoolFromRow(row, "PRIMARY_KEY", "primary_key", "PRIMARYKEY", "primarykey")
if name == "" && hasPrimaryFlag && primary {
name = "PRIMARY"
}
if name == "" || column == "" {
continue
}
indexType := normalizeIRISIndexType(rowString(row, "INDEX_TYPE", "index_type", "TYPE", "type"))
indexType := normalizeIRISIndexType(rowString(row, "INDEX_TYPE", "index_type", "INDEXTYPE", "indextype", "TYPE", "type"))
if hasPrimaryFlag && primary {
indexType = "PRIMARY"
}
@@ -447,7 +447,7 @@ func (i *IrisDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinit
Name: name,
ColumnName: column,
NonUnique: nonUnique,
SeqInIndex: parseIRISInt(rowValueAny(row, "ORDINAL_POSITION", "ordinal_position", "SEQ_IN_INDEX", "seq_in_index", "KEY_SEQ", "key_seq")),
SeqInIndex: parseIRISInt(rowValueAny(row, "ORDINAL_POSITION", "ordinal_position", "ORDINALPOSITION", "ordinalposition", "SEQ_IN_INDEX", "seq_in_index", "SEQININDEX", "seqinindex", "KEY_SEQ", "key_seq", "KEYSEQ", "keyseq")),
IndexType: indexType,
})
}
@@ -686,10 +686,10 @@ func irisBoolFromRow(row map[string]interface{}, keys ...string) (bool, bool) {
}
func parseIRISNonUnique(row map[string]interface{}) int {
if primary, ok := irisBoolFromRow(row, "PRIMARY_KEY", "primary_key"); ok && primary {
if primary, ok := irisBoolFromRow(row, "PRIMARY_KEY", "primary_key", "PRIMARYKEY", "primarykey"); ok && primary {
return 0
}
if value, ok := rowValue(row, "NON_UNIQUE", "non_unique"); ok {
if value, ok := rowValue(row, "NON_UNIQUE", "non_unique", "NONUNIQUE", "nonunique"); ok {
if enabled, ok := parseIRISBool(value); ok {
if enabled {
return 1
@@ -707,7 +707,7 @@ func parseIRISNonUnique(row map[string]interface{}) int {
return 0
}
}
if unique, ok := irisBoolFromRow(row, "UNIQUE_COLUMN", "unique_column"); ok && unique {
if unique, ok := irisBoolFromRow(row, "UNIQUE_COLUMN", "unique_column", "UNIQUECOLUMN", "uniquecolumn"); ok && unique {
return 0
}
return 1
@@ -731,14 +731,14 @@ func normalizeIRISNullable(raw string) string {
}
func buildIRISColumnType(row map[string]interface{}) string {
dataType := strings.TrimSpace(rowString(row, "DATA_TYPE", "data_type", "TYPE_NAME", "type_name"))
dataType := strings.TrimSpace(rowString(row, "DATA_TYPE", "data_type", "DATATYPE", "datatype", "TYPE_NAME", "type_name", "TYPENAME", "typename"))
if dataType == "" {
dataType = "VARCHAR"
}
upper := strings.ToUpper(dataType)
charLength := parseIRISInt(rowValueAny(row, "CHARACTER_MAXIMUM_LENGTH", "character_maximum_length", "CHARACTER_MAX_LENGTH", "character_max_length"))
precision := parseIRISInt(rowValueAny(row, "NUMERIC_PRECISION", "numeric_precision"))
scale := parseIRISInt(rowValueAny(row, "NUMERIC_SCALE", "numeric_scale"))
charLength := parseIRISInt(rowValueAny(row, "CHARACTER_MAXIMUM_LENGTH", "character_maximum_length", "CHARACTERMAXIMUMLENGTH", "charactermaximumlength", "CHARACTER_MAX_LENGTH", "character_max_length", "CHARACTERMAXLENGTH", "charactermaxlength"))
precision := parseIRISInt(rowValueAny(row, "NUMERIC_PRECISION", "numeric_precision", "NUMERICPRECISION", "numericprecision"))
scale := parseIRISInt(rowValueAny(row, "NUMERIC_SCALE", "numeric_scale", "NUMERICSCALE", "numericscale"))
if charLength > 0 && (strings.Contains(upper, "CHAR") || strings.Contains(upper, "VARCHAR")) && !strings.Contains(dataType, "(") {
return fmt.Sprintf("%s(%d)", dataType, charLength)
}
@@ -753,8 +753,8 @@ func buildIRISColumnType(row map[string]interface{}) string {
func rowOrdinal(rows []map[string]interface{}, columnName string) int {
for idx, row := range rows {
if strings.EqualFold(rowString(row, "COLUMN_NAME", "column_name"), columnName) {
ordinal := parseIRISInt(rowValueAny(row, "ORDINAL_POSITION", "ordinal_position"))
if strings.EqualFold(rowString(row, "COLUMN_NAME", "column_name", "COLUMNNAME", "columnname"), columnName) {
ordinal := parseIRISInt(rowValueAny(row, "ORDINAL_POSITION", "ordinal_position", "ORDINALPOSITION", "ordinalposition"))
if ordinal > 0 {
return ordinal
}

View File

@@ -200,6 +200,83 @@ func TestIrisMetadataMapsColumnsAndIndexes(t *testing.T) {
}
}
func TestIrisGetTablesReadsCompactInfoSchemaColumnNames(t *testing.T) {
t.Parallel()
dbConn, state := openOracleRecordingDB(t)
state.mu.Lock()
state.queryResults[`SELECT * FROM INFORMATION_SCHEMA.TABLES`] = oracleRecordingQueryResult{
columns: []string{"TABLECATALOG", "TABLESCHEMA", "TABLENAME", "TABLETYPE"},
rows: [][]driver.Value{
{"USER", "Sample", "Person", "TABLE"},
{"USER", "UserApp", "AuditLog", "BASE TABLE"},
{"USER", "Sample", "PersonView", "VIEW"},
{"USER", "%Library", "ClassDefinition", "TABLE"},
},
}
state.mu.Unlock()
iris := &IrisDB{conn: dbConn, namespace: "USER"}
tables, err := iris.GetTables("USER")
if err != nil {
t.Fatalf("GetTables 返回错误: %v", err)
}
want := []string{"Sample.Person", "UserApp.AuditLog"}
if !reflect.DeepEqual(tables, want) {
t.Fatalf("期望读取 IRIS 紧凑列名并过滤系统对象want=%v got=%v", want, tables)
}
}
func TestIrisMetadataMapsCompactColumnsAndIndexes(t *testing.T) {
t.Parallel()
dbConn, state := openOracleRecordingDB(t)
iris := &IrisDB{conn: dbConn}
columnsQuery := buildIRISInfoSchemaWhereQuery("INFORMATION_SCHEMA.COLUMNS", irisTableRef{Schema: "Sample", Table: "Person"})
indexesQuery := buildIRISInfoSchemaWhereQuery("INFORMATION_SCHEMA.INDEXES", irisTableRef{Schema: "Sample", Table: "Person"})
state.mu.Lock()
state.queryResults[columnsQuery] = oracleRecordingQueryResult{
columns: []string{"TABLESCHEMA", "TABLENAME", "COLUMNNAME", "DATATYPE", "CHARACTERMAXIMUMLENGTH", "ISNULLABLE", "COLUMNDEFAULT", "ORDINALPOSITION", "DESCRIPTION", "PRIMARYKEY", "UNIQUECOLUMN"},
rows: [][]driver.Value{
{"Sample", "Person", "id", "INTEGER", nil, "NO", nil, int64(1), "identifier", true, false},
{"Sample", "Person", "name", "VARCHAR", int64(80), "YES", "'anonymous'", int64(2), "display name", false, true},
},
}
state.queryResults[indexesQuery] = oracleRecordingQueryResult{
columns: []string{"INDEXNAME", "COLUMNNAME", "NONUNIQUE", "ORDINALPOSITION", "INDEXTYPE", "PRIMARYKEY"},
rows: [][]driver.Value{
{"app_person_pk", "id", int64(0), int64(1), "bitmap", true},
{"idx_person_name", "name", int64(0), int64(1), "", false},
},
}
state.mu.Unlock()
columns, err := iris.GetColumns("Sample", "Person")
if err != nil {
t.Fatalf("GetColumns 返回错误: %v", err)
}
if len(columns) != 2 {
t.Fatalf("columns len = %d", len(columns))
}
if columns[0].Name != "id" || columns[0].Key != "PRI" || columns[0].Nullable != "NO" {
t.Fatalf("unexpected compact id column: %#v", columns[0])
}
if columns[1].Type != "VARCHAR(80)" || columns[1].Key != "UNI" {
t.Fatalf("unexpected compact name column: %#v", columns[1])
}
indexes, err := iris.GetIndexes("Sample", "Person")
if err != nil {
t.Fatalf("GetIndexes 返回错误: %v", err)
}
if len(indexes) != 2 || indexes[0].Name != "app_person_pk" || indexes[0].IndexType != "PRIMARY" || indexes[0].NonUnique != 0 {
t.Fatalf("unexpected compact indexes: %#v", indexes)
}
}
func TestBuildIRISApplyChangesSQL(t *testing.T) {
deleteSQL, deleteArgs, ok := buildIRISDeleteSQL("Sample.Person", map[string]interface{}{"id": 1})
if !ok {