feat(kafka): 新增 Kafka 数据源连接支持

Refs #387
This commit is contained in:
Syngnat
2026-06-13 21:11:08 +08:00
parent d2f68acae8
commit 0ff17dc27c
37 changed files with 1989 additions and 12 deletions

View File

@@ -16,6 +16,8 @@ func normalizeRunConfig(config connection.ConnectionConfig, dbName string) conne
}
switch strings.ToLower(strings.TrimSpace(config.Type)) {
case "kafka", "apache-kafka", "apache_kafka":
// Kafka 的 Database 字段表示默认 Topic不能被树上的 synthetic database(topics) 覆盖。
case "oceanbase":
if !isOceanBaseOracleProtocol(config) {
runConfig.Database = name
@@ -51,7 +53,7 @@ func normalizeSchemaAndTable(config connection.ConnectionConfig, dbName string,
// Elasticsearch索引名可能含多个点如 iot_pro_biz_operate_log.index.20240626
// 不能按点分割,直接返回原始数据库名和完整表名。
if dbType == "elasticsearch" || dbType == "iotdb" {
if dbType == "elasticsearch" || dbType == "iotdb" || dbType == "kafka" {
return rawDB, rawTable
}
@@ -110,6 +112,8 @@ func normalizeSchemaAndTable(config connection.ConnectionConfig, dbName string,
func normalizeMetadataSchemaAndTable(config connection.ConnectionConfig, dbName string, tableName string) (string, string) {
schema, table := normalizeSchemaAndTable(config, dbName, tableName)
switch resolveDDLDBType(config) {
case "kafka":
return schema, table
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb":
rawTable := strings.TrimSpace(tableName)
if rawTable == "" {

View File

@@ -242,6 +242,19 @@ func TestNormalizeRunConfig_RedisAllowsDatabaseIndexAboveDefault(t *testing.T) {
}
}
func TestNormalizeRunConfig_KafkaKeepsDefaultTopic(t *testing.T) {
t.Parallel()
runConfig := normalizeRunConfig(connection.ConnectionConfig{
Type: "kafka",
Database: "orders.events",
}, "topics")
if runConfig.Database != "orders.events" {
t.Fatalf("expected Kafka default topic to stay orders.events, got %q", runConfig.Database)
}
}
func TestNormalizeSchemaAndTable_IRISDoesNotTreatNamespaceAsSchema(t *testing.T) {
t.Parallel()
@@ -294,6 +307,30 @@ func TestNormalizeSchemaAndTable_DuckDBPreservesQuotedQualifiedName(t *testing.T
}
}
func TestNormalizeSchemaAndTable_KafkaPreservesDottedTopicName(t *testing.T) {
t.Parallel()
schemaOrDb, table := normalizeSchemaAndTable(connection.ConnectionConfig{
Type: "kafka",
}, "topics", "orders.events.v1")
if schemaOrDb != "topics" || table != "orders.events.v1" {
t.Fatalf("expected kafka topic to stay intact, got %q.%q", schemaOrDb, table)
}
}
func TestNormalizeMetadataSchemaAndTable_KafkaPreservesDottedTopicName(t *testing.T) {
t.Parallel()
schemaOrDb, table := normalizeMetadataSchemaAndTable(connection.ConnectionConfig{
Type: "kafka",
}, "topics", "logs.app-1")
if schemaOrDb != "topics" || table != "logs.app-1" {
t.Fatalf("expected kafka metadata topic to stay intact, got %q.%q", schemaOrDb, table)
}
}
func TestQuoteTableIdentByType_KingbaseNormalizesQuotedQualifiedTable(t *testing.T) {
t.Parallel()

View File

@@ -239,6 +239,8 @@ func defaultPortByType(driverType string) int {
return 8000
case "qdrant":
return 6333
case "kafka":
return 9092
default:
return 0
}

View File

@@ -309,7 +309,7 @@ func normalizeSchemaAndTableByType(dbType string, dbName string, tableName strin
}
// Elasticsearch索引名可能含多个点不能按点分割
if dbType == "elasticsearch" {
if dbType == "elasticsearch" || dbType == "kafka" {
return rawDB, rawTable
}

View File

@@ -143,6 +143,15 @@ func TestNormalizeSchemaAndTableByType_PGLikeQuotedQualifiedName(t *testing.T) {
}
}
func TestNormalizeSchemaAndTableByType_KafkaPreservesDottedTopicName(t *testing.T) {
t.Parallel()
schema, table := normalizeSchemaAndTableByType("kafka", "topics", "orders.events.v1")
if schema != "topics" || table != "orders.events.v1" {
t.Fatalf("expected kafka topic to stay intact, got %q.%q", schema, table)
}
}
func TestBuildRunConfigForDDL_CustomHighGoUsesDatabase(t *testing.T) {
t.Parallel()

View File

@@ -1427,6 +1427,8 @@ func normalizeDriverType(driverType string) string {
return "opengauss"
case "gaussdb", "gauss_db", "gauss-db":
return "gaussdb"
case "kafka", "apache-kafka", "apache_kafka":
return "kafka"
case "intersystems", "intersystemsiris", "inter-systems-iris", "inter-systems":
return "iris"
default:
@@ -1495,6 +1497,7 @@ func allDriverDefinitionsWithPackages(packages map[string]pinnedDriverPackage) [
{Type: "oracle", Name: "Oracle", Engine: driverEngineGo, BuiltIn: true},
{Type: "redis", Name: "Redis", Engine: driverEngineGo, BuiltIn: true},
{Type: "postgres", Name: "PostgreSQL", Engine: driverEngineGo, BuiltIn: true},
{Type: "kafka", Name: "Kafka", Engine: driverEngineGo, BuiltIn: true},
// 其他数据源需要先在驱动管理中“安装启用”。
buildOptionalGoDriverDefinition("mariadb", "MariaDB", packages),

View File

@@ -502,6 +502,22 @@ func TestIoTDBDriverDefinitionUsesOptionalAgent(t *testing.T) {
}
}
func TestKafkaDriverDefinitionIsBuiltIn(t *testing.T) {
definition, ok := resolveDriverDefinition("apache-kafka")
if !ok {
t.Fatal("expected kafka driver definition")
}
if definition.Name != "Kafka" {
t.Fatalf("unexpected kafka driver name: %q", definition.Name)
}
if !definition.BuiltIn {
t.Fatal("expected kafka to be a built-in driver")
}
if definition.PinnedVersion != "" || definition.DefaultDownloadURL != "" {
t.Fatalf("expected kafka builtin definition to omit optional-agent metadata: %#v", definition)
}
}
func TestGaussDBDriverDefinitionUsesOptionalAgent(t *testing.T) {
definition, ok := resolveDriverDefinition("gaussdb")
if !ok {

View File

@@ -346,7 +346,7 @@ func isReadOnlySQLQuery(dbType string, query string) bool {
return false
}
switch keyword {
case "select", "with", "show", "describe", "desc", "explain", "pragma", "values":
case "select", "with", "show", "describe", "desc", "explain", "pragma", "values", "consume":
return true
default:
return false

View File

@@ -86,6 +86,12 @@ func TestIsReadOnlySQLQuery_ClassifiesWithByTopLevelOperation(t *testing.T) {
}
}
func TestIsReadOnlySQLQuery_TreatsKafkaConsumeAsReadOnly(t *testing.T) {
if !isReadOnlySQLQuery("kafka", `CONSUME GROUP "analytics" FROM "orders.events" LIMIT 20`) {
t.Fatal("Kafka CONSUME should be treated as read-only")
}
}
func TestIsBatchableWriteSQLStatement_OnlyMatchesRealWriteStatements(t *testing.T) {
if !isBatchableWriteSQLStatement("mysql", "INSERT INTO demo(id) VALUES (1)") {
t.Fatal("expected INSERT to be treated as batchable write")

View File

@@ -486,6 +486,9 @@ var databaseFactories = map[string]databaseFactory{
"qdrant": func() Database {
return &QdrantDB{}
},
"kafka": func() Database {
return &KafkaDB{}
},
}
func init() {
@@ -524,6 +527,8 @@ func normalizeDatabaseType(dbType string) string {
return "chroma"
case "qdrantdb", "qdrant-db":
return "qdrant"
case "kafka", "apache-kafka", "apache_kafka":
return "kafka"
default:
return normalized
}

View File

@@ -18,6 +18,7 @@ var coreBuiltinDrivers = map[string]struct{}{
"postgres": {},
"chroma": {},
"qdrant": {},
"kafka": {},
}
// optionalGoDrivers 表示需要用户“安装启用”后才能使用的纯 Go 驱动。
@@ -78,6 +79,8 @@ func normalizeRuntimeDriverType(driverType string) string {
return "qdrant"
case "apache-iotdb", "apache_iotdb", "iotdb":
return "iotdb"
case "kafka", "apache-kafka", "apache_kafka":
return "kafka"
default:
return normalized
}
@@ -137,6 +140,8 @@ func driverDisplayName(driverType string) string {
return "Chroma"
case "qdrant":
return "Qdrant"
case "kafka":
return "Kafka"
default:
return strings.ToUpper(strings.TrimSpace(driverType))
}

View File

@@ -29,6 +29,11 @@ func TestBuiltinLikeDriversRemainAvailable(t *testing.T) {
if !supported {
t.Fatalf("redis 应始终可用reason=%s", reason)
}
supported, reason = DriverRuntimeSupportStatus("kafka")
if !supported {
t.Fatalf("kafka 应始终可用reason=%s", reason)
}
}
func TestOptionalDriverAgentRevisionsGeneratedForOptionalDrivers(t *testing.T) {

1341
internal/db/kafka_impl.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,215 @@
package db
import (
"context"
"reflect"
"strings"
"testing"
"GoNavi-Wails/internal/connection"
kafka "github.com/segmentio/kafka-go"
)
type fakeKafkaRuntime struct {
listTopicsResult []kafkaTopicInfo
describeResult kafkaTopicDescription
fetchResult []kafkaMessageRecord
publishAffected int64
lastDescribeTopic string
lastFetchRequest kafkaFetchRequest
lastPublishCommand kafkaPublishCommand
}
func (f *fakeKafkaRuntime) Close() error { return nil }
func (f *fakeKafkaRuntime) Ping(ctx context.Context) error { return nil }
func (f *fakeKafkaRuntime) ListTopics(ctx context.Context, includeInternal bool) ([]kafkaTopicInfo, error) {
return append([]kafkaTopicInfo(nil), f.listTopicsResult...), nil
}
func (f *fakeKafkaRuntime) DescribeTopic(ctx context.Context, topic string) (kafkaTopicDescription, error) {
f.lastDescribeTopic = topic
return f.describeResult, nil
}
func (f *fakeKafkaRuntime) FetchMessages(ctx context.Context, request kafkaFetchRequest) ([]kafkaMessageRecord, error) {
f.lastFetchRequest = request
return append([]kafkaMessageRecord(nil), f.fetchResult...), nil
}
func (f *fakeKafkaRuntime) Publish(ctx context.Context, command kafkaPublishCommand) (int64, error) {
f.lastPublishCommand = command
return f.publishAffected, nil
}
func TestNormalizeKafkaConfigParsesURIAndParams(t *testing.T) {
config := normalizeKafkaConfig(connection.ConnectionConfig{
URI: "kafka://alice:secret@127.0.0.1:9092,127.0.0.2:9093/orders.events?topology=cluster&tls=true&skip_verify=true",
ConnectionParams: "groupId=analytics&mechanism=scram-sha-256",
})
if config.Host != "127.0.0.1" || config.Port != 9092 {
t.Fatalf("unexpected primary broker: %#v", config)
}
if !reflect.DeepEqual(config.Hosts, []string{"127.0.0.2:9093"}) {
t.Fatalf("unexpected extra brokers: %#v", config.Hosts)
}
if config.User != "alice" || config.Password != "secret" {
t.Fatalf("unexpected credentials: %#v", config)
}
if config.Database != "orders.events" || config.Topology != "cluster" {
t.Fatalf("unexpected topic/topology: %#v", config)
}
if !config.UseSSL || config.SSLMode != "skip-verify" {
t.Fatalf("unexpected tls settings: %#v", config)
}
params := kafkaConnectionParams(config)
if params.Get("groupId") != "analytics" || params.Get("mechanism") != "scram-sha-256" {
t.Fatalf("unexpected kafka params: %#v", params)
}
}
func TestKafkaQueryShowTopicsAndDescribeTopic(t *testing.T) {
runtime := &fakeKafkaRuntime{
listTopicsResult: []kafkaTopicInfo{
{Name: "logs.app", Partitions: []kafka.Partition{{}, {}}},
{Name: "orders-events", Partitions: []kafka.Partition{{}}},
},
describeResult: kafkaTopicDescription{
Name: "logs.app",
Partitions: []kafkaTopicPartition{{
ID: 0,
Leader: kafka.Broker{Host: "127.0.0.1", Port: 9092},
EarliestOffset: 1,
LatestOffset: 9,
ApproximateCount: 8,
}},
},
}
client := &KafkaDB{runtime: runtime}
rows, columns, err := client.Query(`SHOW TOPICS LIMIT 1`)
if err != nil {
t.Fatalf("SHOW TOPICS failed: %v", err)
}
if len(rows) != 1 || rows[0]["topic"] != "logs.app" {
t.Fatalf("unexpected topic rows: %#v", rows)
}
if !containsString(columns, "partition_count") {
t.Fatalf("expected partition_count column, got %v", columns)
}
rows, columns, err = client.Query(`DESCRIBE TOPIC "logs.app"`)
if err != nil {
t.Fatalf("DESCRIBE TOPIC failed: %v", err)
}
if runtime.lastDescribeTopic != "logs.app" {
t.Fatalf("expected describe topic logs.app, got %q", runtime.lastDescribeTopic)
}
if len(rows) != 1 || rows[0]["leader"] != "127.0.0.1:9092" {
t.Fatalf("unexpected describe rows: %#v", rows)
}
if !containsString(columns, "approximate_count") {
t.Fatalf("expected approximate_count column, got %v", columns)
}
}
func TestKafkaQuerySelectAndConsumeKeepTopicNameIntact(t *testing.T) {
runtime := &fakeKafkaRuntime{
fetchResult: []kafkaMessageRecord{{
Message: kafka.Message{
Topic: "logs.app-1",
Partition: 2,
Offset: 42,
HighWaterMark: 100,
Key: []byte(`{"tenant":"a"}`),
Value: []byte(`{"event":"login","meta":{"ip":"127.0.0.1"}}`),
},
Key: map[string]interface{}{"tenant": "a"},
Value: map[string]interface{}{
"event": "login",
"meta": map[string]interface{}{"ip": "127.0.0.1"},
},
Headers: map[string]interface{}{"x-trace-id": "trace-1"},
}},
}
client := &KafkaDB{
runtime: runtime,
defaultGroup: "gonavi",
startLatest: false,
}
rows, columns, err := client.Query(`SELECT * FROM "logs.app-1" LIMIT 5 OFFSET 2`)
if err != nil {
t.Fatalf("SELECT failed: %v", err)
}
if runtime.lastFetchRequest.Topic != "logs.app-1" || runtime.lastFetchRequest.Limit != 5 || runtime.lastFetchRequest.Offset != 2 {
t.Fatalf("unexpected select fetch request: %#v", runtime.lastFetchRequest)
}
if len(rows) != 1 || rows[0]["value.meta.ip"] != "127.0.0.1" || rows[0]["headers.x-trace-id"] != "trace-1" {
t.Fatalf("unexpected select rows: %#v", rows)
}
if !containsString(columns, "value.meta.ip") || !containsString(columns, "headers.x-trace-id") {
t.Fatalf("unexpected columns: %v", columns)
}
_, _, err = client.Query(`CONSUME FROM "logs.app-1" LIMIT 3`)
if err != nil {
t.Fatalf("CONSUME failed: %v", err)
}
if runtime.lastFetchRequest.Topic != "logs.app-1" || runtime.lastFetchRequest.GroupID != "gonavi" || !runtime.lastFetchRequest.Latest {
t.Fatalf("unexpected consume request: %#v", runtime.lastFetchRequest)
}
}
func TestKafkaExecPublishesJSONCommand(t *testing.T) {
runtime := &fakeKafkaRuntime{publishAffected: 1}
client := &KafkaDB{runtime: runtime, defaultTopic: "orders.events"}
affected, err := client.Exec(`{"key":{"tenant":"a"},"value":{"id":1},"headers":{"x-env":"dev"}}`)
if err != nil {
t.Fatalf("Exec failed: %v", err)
}
if affected != 1 {
t.Fatalf("unexpected affected rows: %d", affected)
}
if runtime.lastPublishCommand.Topic != "orders.events" {
t.Fatalf("expected default topic publish, got %#v", runtime.lastPublishCommand)
}
if valueMap, ok := runtime.lastPublishCommand.Value.(map[string]interface{}); !ok || valueMap["id"] == nil {
t.Fatalf("unexpected publish value: %#v", runtime.lastPublishCommand.Value)
}
}
func TestKafkaGetColumnsIncludesDerivedFields(t *testing.T) {
runtime := &fakeKafkaRuntime{
fetchResult: []kafkaMessageRecord{{
Message: kafka.Message{Topic: "orders.events"},
Value: map[string]interface{}{
"meta": map[string]interface{}{
"ip": "127.0.0.1",
},
},
Headers: map[string]interface{}{"x-request-id": "req-1"},
}},
}
client := &KafkaDB{runtime: runtime}
columns, err := client.GetColumns("topics", "orders.events")
if err != nil {
t.Fatalf("GetColumns failed: %v", err)
}
names := make([]string, 0, len(columns))
for _, col := range columns {
names = append(names, col.Name)
}
joined := strings.Join(names, ",")
for _, want := range []string{"topic", "partition", "offset", "value.meta.ip", "headers.x-request-id"} {
if !strings.Contains(joined, want) {
t.Fatalf("expected derived column %q in %s", want, joined)
}
}
}