diff --git a/cmd/optional-driver-agent/provider_elasticsearch.go b/cmd/optional-driver-agent/provider_elasticsearch.go new file mode 100644 index 0000000..cd65b9f --- /dev/null +++ b/cmd/optional-driver-agent/provider_elasticsearch.go @@ -0,0 +1,12 @@ +//go:build gonavi_elasticsearch_driver + +package main + +import "GoNavi-Wails/internal/db" + +func init() { + agentDriverType = "elasticsearch" + agentDatabaseFactory = func() db.Database { + return &db.ElasticsearchDB{} + } +} diff --git a/docs/driver-manifest.json b/docs/driver-manifest.json index 7dd4802..81d826f 100644 --- a/docs/driver-manifest.json +++ b/docs/driver-manifest.json @@ -96,6 +96,12 @@ "version": "1.11.1", "checksumPolicy": "off", "downloadUrl": "builtin://activate/postgres" + }, + "elasticsearch": { + "engine": "go", + "version": "8.19.0", + "checksumPolicy": "off", + "downloadUrl": "builtin://activate/elasticsearch" } } } diff --git a/go.mod b/go.mod index daba08c..ea49a54 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/ClickHouse/clickhouse-go/v2 v2.43.0 github.com/caretdev/go-irisnative v0.2.1 github.com/duckdb/duckdb-go/v2 v2.5.5 + github.com/elastic/go-elasticsearch/v8 v8.19.6 github.com/go-sql-driver/mysql v1.9.3 github.com/google/uuid v1.6.0 github.com/highgo/pq-sm3 v0.0.0 @@ -29,9 +30,11 @@ require ( ) require ( - github.com/kr/pretty v0.3.1 // indirect - github.com/rogpeppe/go-internal v1.12.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + github.com/elastic/elastic-transport-go/v8 v8.9.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/metric v1.39.0 // indirect ) require ( diff --git a/go.sum b/go.sum index a93b158..b15bbc0 100644 --- a/go.sum +++ b/go.sum @@ -38,7 +38,6 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -65,10 +64,19 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= +github.com/elastic/elastic-transport-go/v8 v8.9.0 h1:KeT/2P54F0xS0S8Y3Pf+tFDg4HmBgReQMB+BMz8dDAs= +github.com/elastic/elastic-transport-go/v8 v8.9.0/go.mod h1:ssMTvNS2hwf7CaiGsRRsx4gQHFZ/jS/DkLcISxekWzc= +github.com/elastic/go-elasticsearch/v8 v8.19.6 h1:4qa7ecJkr5rLsoHKIVGbaqcFt2o57CnOHQJi9Pts/rk= +github.com/elastic/go-elasticsearch/v8 v8.19.6/go.mod h1:jeWebApE1oFEW/hKZqx/IRYmP/aa2+WMJkOfk+AduSI= github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= @@ -127,7 +135,6 @@ github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxh github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -186,7 +193,6 @@ github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0 github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -204,9 +210,8 @@ github.com/richardlehane/msoleps v1.0.4/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTK github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew= github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o= github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= @@ -276,8 +281,14 @@ go.mongodb.org/mongo-driver v1.17.9 h1:IexDdCuuNJ3BHrELgBlyaH9p60JXAvdzWR128q+U5 go.mongodb.org/mongo-driver v1.17.9/go.mod h1:LlOhpH5NUEfhxcAwG0UEkMqwYcc4JU18gtCdGudk/tQ= go.mongodb.org/mongo-driver/v2 v2.5.0 h1:yXUhImUjjAInNcpTcAlPHiT7bIXhshCTL3jVBkF3xaE= go.mongodb.org/mongo-driver/v2 v2.5.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= diff --git a/internal/db/database_optional_factories_full.go b/internal/db/database_optional_factories_full.go index 3147992..41fc59b 100644 --- a/internal/db/database_optional_factories_full.go +++ b/internal/db/database_optional_factories_full.go @@ -20,4 +20,5 @@ func registerOptionalDatabaseFactories() { registerDatabaseFactory(newOptionalDriverAgentDatabase("mongodb"), "mongodb") registerDatabaseFactory(newOptionalDriverAgentDatabase("tdengine"), "tdengine") registerDatabaseFactory(newOptionalDriverAgentDatabase("clickhouse"), "clickhouse") + registerDatabaseFactory(newOptionalDriverAgentDatabase("elasticsearch"), "elasticsearch", "elastic") } diff --git a/internal/db/database_optional_factories_lite.go b/internal/db/database_optional_factories_lite.go index d726c0f..3e7e64d 100644 --- a/internal/db/database_optional_factories_lite.go +++ b/internal/db/database_optional_factories_lite.go @@ -20,4 +20,5 @@ func registerOptionalDatabaseFactories() { registerDatabaseFactory(newOptionalDriverAgentDatabase("mongodb"), "mongodb") registerDatabaseFactory(newOptionalDriverAgentDatabase("tdengine"), "tdengine") registerDatabaseFactory(newOptionalDriverAgentDatabase("clickhouse"), "clickhouse") + registerDatabaseFactory(newOptionalDriverAgentDatabase("elasticsearch"), "elasticsearch", "elastic") } diff --git a/internal/db/driver_agent_revisions_gen.go b/internal/db/driver_agent_revisions_gen.go index 264e525..d1041fb 100644 --- a/internal/db/driver_agent_revisions_gen.go +++ b/internal/db/driver_agent_revisions_gen.go @@ -20,6 +20,7 @@ func init() { "iris": "src-1b072c57af08bec4", "mongodb": "src-57fdd8bfebdcd46e", "tdengine": "src-939715f94df1ec9c", - "clickhouse": "src-482d62ed565b3e69", + "clickhouse": "src-482d62ed565b3e69", + "elasticsearch": "src-local", } } diff --git a/internal/db/driver_support.go b/internal/db/driver_support.go index 05f942e..87b97f5 100644 --- a/internal/db/driver_support.go +++ b/internal/db/driver_support.go @@ -35,9 +35,10 @@ var optionalGoDrivers = map[string]struct{}{ "vastbase": {}, "opengauss": {}, "iris": {}, - "mongodb": {}, - "tdengine": {}, - "clickhouse": {}, + "mongodb": {}, + "tdengine": {}, + "clickhouse": {}, + "elasticsearch": {}, } // optionalDriverAgentRevisions 记录 GoNavi 对各可选 driver-agent 包装逻辑的兼容版本。 @@ -63,6 +64,8 @@ func normalizeRuntimeDriverType(driverType string) string { return "opengauss" case "intersystems", "intersystemsiris", "inter-systems-iris", "inter-systems": return "iris" + case "elastic": + return "elasticsearch" default: return normalized } @@ -112,6 +115,8 @@ func driverDisplayName(driverType string) string { return "TDengine" case "clickhouse": return "ClickHouse" + case "elasticsearch": + return "Elasticsearch" default: return strings.ToUpper(strings.TrimSpace(driverType)) } diff --git a/internal/db/elasticsearch_helpers.go b/internal/db/elasticsearch_helpers.go new file mode 100644 index 0000000..dbcc34e --- /dev/null +++ b/internal/db/elasticsearch_helpers.go @@ -0,0 +1,366 @@ +//go:build gonavi_full_drivers || gonavi_elasticsearch_driver + +package db + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + "time" + + "GoNavi-Wails/internal/connection" + proxytunnel "GoNavi-Wails/internal/proxy" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" +) + +const defaultEsPort = 9200 + +// ---- 配置规范化工具 ---- + +// normalizeElasticsearchConfig 规范化 Elasticsearch 连接配置。 +func normalizeElasticsearchConfig(config connection.ConnectionConfig) connection.ConnectionConfig { + runConfig := applyElasticsearchURI(config) + if strings.TrimSpace(runConfig.Host) == "" { + runConfig.Host = "localhost" + } + if runConfig.Port <= 0 { + runConfig.Port = defaultEsPort + } + return runConfig +} + +// applyElasticsearchURI 从 URI 中解析并回填连接参数。 +func applyElasticsearchURI(config connection.ConnectionConfig) connection.ConnectionConfig { + uriText := strings.TrimSpace(config.URI) + if uriText == "" { + return config + } + parsed, err := url.Parse(uriText) + if err != nil { + return config + } + scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme)) + if scheme != "http" && scheme != "https" { + return config + } + + if parsed.User != nil { + if strings.TrimSpace(config.User) == "" { + config.User = parsed.User.Username() + } + if pass, ok := parsed.User.Password(); ok && config.Password == "" { + config.Password = pass + } + } + + if scheme == "https" { + config.UseSSL = true + if strings.TrimSpace(config.SSLMode) == "" { + config.SSLMode = "required" + } + } + + if host := strings.TrimSpace(parsed.Host); host != "" { + if strings.TrimSpace(config.Host) == "" || config.Host == "localhost" { + h, port, ok := parseHostPortWithDefault(host, defaultEsPort) + if ok { + config.Host = h + config.Port = port + } + } + } + + return config +} + +// ---- 通用判断工具 ---- + +// isHiddenIndex 判断是否为 ES 隐藏索引(以 . 开头)。 +func isHiddenIndex(name string) bool { + return strings.HasPrefix(name, ".") +} + +// isJSONDSL 判断输入是否为 JSON DSL 格式。 +func isJSONDSL(query string) bool { + return strings.HasPrefix(query, "{") +} + +// resolveEsIndexName 从 dbName / tableName / 默认值中确定索引名。 +func resolveEsIndexName(dbName, tableName, defaultDB string) string { + if name := strings.TrimSpace(tableName); name != "" { + return name + } + if name := strings.TrimSpace(dbName); name != "" { + return name + } + return strings.TrimSpace(defaultDB) +} + +// ---- ES 客户端配置 ---- + +// esSSLAttemptLabel 返回连接尝试的模式标签。 +func esSSLAttemptLabel(config connection.ConnectionConfig, fallback bool) string { + if fallback { + return "明文回退" + } + if config.UseSSL { + return "SSL" + } + return "明文" +} + +// buildESClientConfig 从连接配置构建 ES 客户端配置。 +func buildESClientConfig(config connection.ConnectionConfig) elasticsearch.Config { + scheme := "http" + if config.UseSSL { + scheme = "https" + } + + cfg := elasticsearch.Config{ + Addresses: []string{ + fmt.Sprintf("%s://%s:%d", scheme, config.Host, config.Port), + }, + Username: strings.TrimSpace(config.User), + Password: config.Password, + } + + // TLS 配置 + tlsConfig, _ := resolveGenericTLSConfig(config) + if tlsConfig != nil { + cfg.Transport = &http.Transport{ + TLSClientConfig: tlsConfig, + } + } + + // 代理支持 + if config.UseProxy { + transport, ok := cfg.Transport.(*http.Transport) + if !ok { + transport = http.DefaultTransport.(*http.Transport).Clone() + } + proxyCfg := config.Proxy + transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + return proxytunnel.DialContext(ctx, proxyCfg, network, addr) + } + cfg.Transport = transport + } + + // 超时设置 + timeout := getConnectTimeout(config) + if cfg.Transport == nil { + cfg.Transport = http.DefaultTransport.(*http.Transport).Clone() + } + if transport, ok := cfg.Transport.(*http.Transport); ok { + transport.ResponseHeaderTimeout = timeout + } + + return cfg +} + +// ---- 查询响应解析 ---- + +// esIndexInfo 用于解析 Cat Indices JSON 响应。 +type esIndexInfo struct { + Index string `json:"index"` + Health string `json:"health"` + Status string `json:"status"` + DocsCount string `json:"docs.count"` + StoreSize string `json:"store.size"` +} + +// esSearchResponse 用于解析 _search API 响应。 +type esSearchResponse struct { + Hits struct { + Total struct { + Value int64 `json:"value"` + } `json:"total"` + Hits []struct { + Source map[string]interface{} `json:"_source"` + Index string `json:"_index"` + ID string `json:"_id"` + } `json:"hits"` + } `json:"hits"` +} + +// esQueryWithDSL 使用 JSON DSL 执行 _search 查询。 +func (e *ElasticsearchDB) esQueryWithDSL(ctx context.Context, dsl string) ([]map[string]interface{}, []string, error) { + indexName := e.database + if indexName == "" { + indexName = "*" + } + + res, err := e.client.Search( + e.client.Search.WithContext(ctx), + e.client.Search.WithIndex(indexName), + e.client.Search.WithBody(strings.NewReader(dsl)), + ) + if err != nil { + return nil, nil, fmt.Errorf("Elasticsearch DSL 查询失败:%w", err) + } + defer res.Body.Close() + + return e.parseSearchResponse(res) +} + +// esQueryWithString 使用 query_string 模式执行查询。 +func (e *ElasticsearchDB) esQueryWithString(ctx context.Context, queryStr string) ([]map[string]interface{}, []string, error) { + indexName := e.database + if indexName == "" { + indexName = "*" + } + + dsl := fmt.Sprintf(`{"query":{"query_string":{"query":"%s"}}}`, strings.ReplaceAll(queryStr, `"`, `\"`)) + + res, err := e.client.Search( + e.client.Search.WithContext(ctx), + e.client.Search.WithIndex(indexName), + e.client.Search.WithBody(strings.NewReader(dsl)), + ) + if err != nil { + return nil, nil, fmt.Errorf("Elasticsearch 查询失败:%w", err) + } + defer res.Body.Close() + + return e.parseSearchResponse(res) +} + +// parseSearchResponse 解析 ES _search 响应为标准行格式。 +func (e *ElasticsearchDB) parseSearchResponse(res *esapi.Response) ([]map[string]interface{}, []string, error) { + if res.IsError() { + body, _ := io.ReadAll(res.Body) + return nil, nil, fmt.Errorf("Elasticsearch 查询错误:%s", string(body)) + } + + var result esSearchResponse + if err := json.NewDecoder(res.Body).Decode(&result); err != nil { + return nil, nil, fmt.Errorf("解析查询结果失败:%w", err) + } + + columnSet := make(map[string]bool) + data := make([]map[string]interface{}, 0, len(result.Hits.Hits)) + + for _, hit := range result.Hits.Hits { + row := make(map[string]interface{}) + row["_index"] = hit.Index + row["_id"] = hit.ID + columnSet["_index"] = true + columnSet["_id"] = true + + for k, v := range hit.Source { + row[k] = v + columnSet[k] = true + } + data = append(data, row) + } + + columns := make([]string, 0, len(columnSet)) + for k := range columnSet { + columns = append(columns, k) + } + + return data, columns, nil +} + +// esFetchIndexMapping 获取索引的 mapping 定义。 +func (e *ElasticsearchDB) esFetchIndexMapping(indexName string) (map[string]interface{}, error) { + if e.client == nil { + return nil, fmt.Errorf("连接未打开") + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + res, err := e.client.Indices.GetMapping( + e.client.Indices.GetMapping.WithContext(ctx), + e.client.Indices.GetMapping.WithIndex(indexName), + ) + if err != nil { + return nil, fmt.Errorf("获取索引 mapping 失败:%w", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("获取索引 mapping 失败:%s", res.Status()) + } + + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("读取 mapping 响应失败:%w", err) + } + + var mappingResult map[string]interface{} + if err := json.Unmarshal(body, &mappingResult); err != nil { + return nil, fmt.Errorf("解析 mapping 失败:%w", err) + } + + return mappingResult, nil +} + +// ---- Mapping 字段提取 ---- + +// extractColumnsFromMapping 从 mapping JSON 中提取字段定义。 +func extractColumnsFromMapping(indexName string, mapping map[string]interface{}) []connection.ColumnDefinition { + indexData, ok := mapping[indexName].(map[string]interface{}) + if !ok { + for _, v := range mapping { + if data, ok := v.(map[string]interface{}); ok { + indexData = data + break + } + } + } + if indexData == nil { + return []connection.ColumnDefinition{} + } + + mappings, ok := indexData["mappings"].(map[string]interface{}) + if !ok { + return []connection.ColumnDefinition{} + } + + properties, ok := mappings["properties"].(map[string]interface{}) + if !ok { + return []connection.ColumnDefinition{} + } + + columns := make([]connection.ColumnDefinition, 0, len(properties)) + for name, prop := range properties { + colType := extractEsFieldType(prop) + comment := "" + if propMap, ok := prop.(map[string]interface{}); ok { + if desc, ok := propMap["description"].(string); ok { + comment = desc + } + } + columns = append(columns, connection.ColumnDefinition{ + Name: name, + Type: colType, + Nullable: "YES", + Comment: comment, + }) + } + return columns +} + +// extractEsFieldType 从字段属性中提取类型描述。 +func extractEsFieldType(prop interface{}) string { + propMap, ok := prop.(map[string]interface{}) + if !ok { + return "unknown" + } + fieldType, _ := propMap["type"].(string) + if fieldType == "" { + if _, ok := propMap["properties"]; ok { + return "object" + } + return "unknown" + } + return fieldType +} diff --git a/internal/db/elasticsearch_impl.go b/internal/db/elasticsearch_impl.go new file mode 100644 index 0000000..ac41a7d --- /dev/null +++ b/internal/db/elasticsearch_impl.go @@ -0,0 +1,366 @@ +//go:build gonavi_full_drivers || gonavi_elasticsearch_driver + +package db + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "strconv" + "strings" + "time" + + "GoNavi-Wails/internal/connection" + "GoNavi-Wails/internal/logger" + "GoNavi-Wails/internal/ssh" + + "github.com/elastic/go-elasticsearch/v8" +) + +const ( + defaultEsPingTimeout = 5 * time.Second + defaultEsQueryTimeout = 30 * time.Second +) + +// ElasticsearchDB 实现 Database 接口,提供 Elasticsearch 数据源连接能力。 +type ElasticsearchDB struct { + client *elasticsearch.Client + database string // 默认索引名 + pingTimeout time.Duration + forwarder *ssh.LocalForwarder +} + +// Connect 建立到 Elasticsearch 集群的连接。 +func (e *ElasticsearchDB) Connect(config connection.ConnectionConfig) error { + // 清理旧连接 + if e.forwarder != nil { + _ = e.forwarder.Close() + e.forwarder = nil + } + e.client = nil + + runConfig := normalizeElasticsearchConfig(config) + e.pingTimeout = getConnectTimeout(runConfig) + e.database = strings.TrimSpace(runConfig.Database) + + logger.Infof("Elasticsearch 连接准备:地址=%s:%d 用户=%s SSL=%t SSH=%t 超时=%s", + runConfig.Host, runConfig.Port, runConfig.User, runConfig.UseSSL, runConfig.UseSSH, e.pingTimeout) + + // SSH 隧道支持 + if runConfig.UseSSH { + logger.Infof("Elasticsearch 使用 SSH 连接:地址=%s:%d", runConfig.Host, runConfig.Port) + forwarder, err := ssh.GetOrCreateLocalForwarder(runConfig.SSH, runConfig.Host, runConfig.Port) + if err != nil { + return fmt.Errorf("创建 SSH 隧道失败:%w", err) + } + e.forwarder = forwarder + + host, portStr, err := net.SplitHostPort(forwarder.LocalAddr) + if err != nil { + return fmt.Errorf("解析本地转发地址失败:%w", err) + } + port, err := strconv.Atoi(portStr) + if err != nil { + return fmt.Errorf("解析本地端口失败:%w", err) + } + + runConfig.Host = host + runConfig.Port = port + runConfig.UseSSH = false + logger.Infof("Elasticsearch 通过本地端口转发连接:%s -> %s:%d", forwarder.LocalAddr, config.Host, config.Port) + } + + // SSL 回退尝试 + attempts := []connection.ConnectionConfig{runConfig} + if shouldTrySSLPreferredFallback(runConfig) { + attempts = append(attempts, withSSLDisabled(runConfig)) + } + + var lastErr error + for idx, attempt := range attempts { + sslLabel := esSSLAttemptLabel(attempt, idx > 0) + logger.Infof("Elasticsearch 连接尝试:%d/%d 模式=%s 地址=%s:%d", + idx+1, len(attempts), sslLabel, attempt.Host, attempt.Port) + + esCfg := buildESClientConfig(attempt) + client, err := elasticsearch.NewClient(esCfg) + if err != nil { + logger.Warnf("Elasticsearch 创建客户端失败:%d/%d 模式=%s 错误=%v", idx+1, len(attempts), sslLabel, err) + lastErr = err + continue + } + + e.client = client + if err := e.Ping(); err != nil { + e.client = nil + logger.Warnf("Elasticsearch 连接验证失败:%d/%d 模式=%s 错误=%v", idx+1, len(attempts), sslLabel, err) + lastErr = err + continue + } + + logger.Infof("Elasticsearch 连接成功:%d/%d 模式=%s", idx+1, len(attempts), sslLabel) + if idx > 0 { + logger.Warnf("Elasticsearch SSL 优先连接失败,已回退至明文连接") + } + return nil + } + + if lastErr != nil { + return fmt.Errorf("Elasticsearch 连接失败:%w", lastErr) + } + return fmt.Errorf("Elasticsearch 连接失败:无可用连接方案") +} + +// Close 关闭 Elasticsearch 连接。 +func (e *ElasticsearchDB) Close() error { + if e.forwarder != nil { + if err := e.forwarder.Close(); err != nil { + logger.Warnf("关闭 Elasticsearch SSH 端口转发失败:%v", err) + } + e.forwarder = nil + } + e.client = nil + return nil +} + +// Ping 检测 Elasticsearch 连通性。 +func (e *ElasticsearchDB) Ping() error { + if e.client == nil { + return fmt.Errorf("连接未打开") + } + timeout := e.pingTimeout + if timeout <= 0 { + timeout = defaultEsPingTimeout + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + res, err := e.client.Ping(e.client.Ping.WithContext(ctx)) + if err != nil { + return err + } + defer res.Body.Close() + + if res.IsError() { + return fmt.Errorf("Elasticsearch Ping 失败:%s", res.Status()) + } + return nil +} + +// Query 执行 Elasticsearch 查询,支持 JSON DSL 和 query_string 两种模式。 +func (e *ElasticsearchDB) Query(query string) ([]map[string]interface{}, []string, error) { + if e.client == nil { + return nil, nil, fmt.Errorf("连接未打开") + } + + query = strings.TrimSpace(query) + if query == "" { + return nil, nil, fmt.Errorf("查询语句不能为空") + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultEsQueryTimeout) + defer cancel() + + if isJSONDSL(query) { + return e.esQueryWithDSL(ctx, query) + } + return e.esQueryWithString(ctx, query) +} + +// Exec 不支持 Elasticsearch 非查询语句执行。 +func (e *ElasticsearchDB) Exec(query string) (int64, error) { + return 0, fmt.Errorf("Elasticsearch 不支持执行非查询语句") +} + +// GetDatabases 列出所有 Elasticsearch 索引(排除隐藏索引)。 +func (e *ElasticsearchDB) GetDatabases() ([]string, error) { + if e.client == nil { + return nil, fmt.Errorf("连接未打开") + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + res, err := e.client.Cat.Indices( + e.client.Cat.Indices.WithContext(ctx), + e.client.Cat.Indices.WithFormat("json"), + e.client.Cat.Indices.WithH("index"), + ) + if err != nil { + return nil, fmt.Errorf("获取索引列表失败:%w", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("获取索引列表失败:%s", res.Status()) + } + + var indices []struct { + Index string `json:"index"` + } + if err := json.NewDecoder(res.Body).Decode(&indices); err != nil { + return nil, fmt.Errorf("解析索引列表失败:%w", err) + } + + result := make([]string, 0, len(indices)) + for _, idx := range indices { + name := strings.TrimSpace(idx.Index) + if name != "" && !isHiddenIndex(name) { + result = append(result, name) + } + } + return result, nil +} + +// GetTables 对 ES 而言索引即表,返回索引自身名称。 +func (e *ElasticsearchDB) GetTables(dbName string) ([]string, error) { + target := strings.TrimSpace(dbName) + if target == "" { + target = e.database + } + if target == "" { + return nil, fmt.Errorf("未指定索引名") + } + return []string{target}, nil +} + +// GetCreateStatement 返回索引的 settings + mappings 组合 JSON。 +func (e *ElasticsearchDB) GetCreateStatement(dbName, tableName string) (string, error) { + if e.client == nil { + return "", fmt.Errorf("连接未打开") + } + + indexName := resolveEsIndexName(dbName, tableName, e.database) + if indexName == "" { + return "", fmt.Errorf("未指定索引名") + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + res, err := e.client.Indices.Get( + []string{indexName}, + e.client.Indices.Get.WithContext(ctx), + ) + if err != nil { + return "", fmt.Errorf("获取索引定义失败:%w", err) + } + defer res.Body.Close() + + if res.IsError() { + return "", fmt.Errorf("获取索引定义失败:%s", res.Status()) + } + + body, err := io.ReadAll(res.Body) + if err != nil { + return "", fmt.Errorf("读取索引定义失败:%w", err) + } + + var pretty map[string]interface{} + if err := json.Unmarshal(body, &pretty); err != nil { + return string(body), nil + } + formatted, _ := json.MarshalIndent(pretty, "", " ") + return string(formatted), nil +} + +// GetColumns 返回索引的 mapping 字段定义。 +func (e *ElasticsearchDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) { + indexName := resolveEsIndexName(dbName, tableName, e.database) + if indexName == "" { + return nil, fmt.Errorf("未指定索引名") + } + + mapping, err := e.esFetchIndexMapping(indexName) + if err != nil { + return nil, err + } + return extractColumnsFromMapping(indexName, mapping), nil +} + +// GetAllColumns 返回索引的全部字段定义(带表名标识)。 +func (e *ElasticsearchDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) { + target := strings.TrimSpace(dbName) + if target == "" { + target = e.database + } + if target == "" { + return nil, fmt.Errorf("未指定索引名") + } + + mapping, err := e.esFetchIndexMapping(target) + if err != nil { + return nil, err + } + + columns := extractColumnsFromMapping(target, mapping) + result := make([]connection.ColumnDefinitionWithTable, 0, len(columns)) + for _, col := range columns { + result = append(result, connection.ColumnDefinitionWithTable{ + TableName: target, + Name: col.Name, + Type: col.Type, + Comment: col.Comment, + }) + } + return result, nil +} + +// GetIndexes 返回索引的统计信息。 +func (e *ElasticsearchDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) { + if e.client == nil { + return nil, fmt.Errorf("连接未打开") + } + + indexName := resolveEsIndexName(dbName, tableName, e.database) + if indexName == "" { + return nil, fmt.Errorf("未指定索引名") + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + res, err := e.client.Cat.Indices( + e.client.Cat.Indices.WithContext(ctx), + e.client.Cat.Indices.WithIndex(indexName), + e.client.Cat.Indices.WithFormat("json"), + e.client.Cat.Indices.WithH("index", "health", "status", "docs.count", "store.size"), + ) + if err != nil { + return nil, fmt.Errorf("获取索引信息失败:%w", err) + } + defer res.Body.Close() + + if res.IsError() { + return nil, fmt.Errorf("获取索引信息失败:%s", res.Status()) + } + + var info []esIndexInfo + if err := json.NewDecoder(res.Body).Decode(&info); err != nil { + return nil, fmt.Errorf("解析索引信息失败:%w", err) + } + + result := make([]connection.IndexDefinition, 0, len(info)) + for _, idx := range info { + result = append(result, connection.IndexDefinition{ + Name: idx.Index, + ColumnName: fmt.Sprintf("health=%s status=%s docs=%s size=%s", idx.Health, idx.Status, idx.DocsCount, idx.StoreSize), + NonUnique: 0, + SeqInIndex: 1, + IndexType: "INDEX", + }) + } + return result, nil +} + +// GetForeignKeys ES 不支持外键,返回空列表。 +func (e *ElasticsearchDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) { + return []connection.ForeignKeyDefinition{}, nil +} + +// GetTriggers ES 不支持触发器,返回空列表。 +func (e *ElasticsearchDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) { + return []connection.TriggerDefinition{}, nil +} diff --git a/internal/db/elasticsearch_impl_test.go b/internal/db/elasticsearch_impl_test.go new file mode 100644 index 0000000..ea28ebf --- /dev/null +++ b/internal/db/elasticsearch_impl_test.go @@ -0,0 +1,1111 @@ +//go:build gonavi_full_drivers || gonavi_elasticsearch_driver + +package db + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "slices" + "strings" + "testing" + + "GoNavi-Wails/internal/connection" + + "github.com/elastic/go-elasticsearch/v8" +) + +// ---- 测试辅助函数 ---- + +// newMockESServer 创建模拟 Elasticsearch REST API 的 HTTP 测试服务器。 +// 自动为所有响应添加 go-elasticsearch v8 客户端要求的 X-Elastic-Product 头。 +func newMockESServer(t *testing.T, handler http.HandlerFunc) *httptest.Server { + t.Helper() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Elastic-Product", "Elasticsearch") + handler(w, r) + })) + t.Cleanup(server.Close) + return server +} + +// newTestESClient 创建连接到测试服务器的 ES 客户端。 +func newTestESClient(t *testing.T, serverURL string) *elasticsearch.Client { + t.Helper() + cfg := elasticsearch.Config{ + Addresses: []string{serverURL}, + } + client, err := elasticsearch.NewClient(cfg) + if err != nil { + t.Fatalf("创建测试 ES 客户端失败: %v", err) + } + return client +} + +// newTestESDB 创建连接到测试服务器的 ElasticsearchDB 实例。 +func newTestESDB(t *testing.T, serverURL, defaultIndex string) *ElasticsearchDB { + t.Helper() + return &ElasticsearchDB{ + client: newTestESClient(t, serverURL), + database: defaultIndex, + } +} + +// buildMockESMappingResponse 构造模拟的 mapping 响应 JSON。 +func buildMockESMappingResponse(indexName string, fields map[string]string) map[string]interface{} { + properties := make(map[string]interface{}) + for name, fieldType := range fields { + properties[name] = map[string]interface{}{"type": fieldType} + } + return map[string]interface{}{ + indexName: map[string]interface{}{ + "mappings": map[string]interface{}{ + "properties": properties, + }, + }, + } +} + +// writeJSON 将数据以 JSON 格式写入 HTTP 响应。 +func writeJSON(w http.ResponseWriter, data interface{}) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(data) +} + +// ---- 核心功能测试 ---- + +// TestElasticsearchPing 测试 Ping 成功和失败路径。 +func TestElasticsearchPing(t *testing.T) { + t.Run("ping 成功", func(t *testing.T) { + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + // Ping 使用 HEAD / 方法 + if r.Method == http.MethodHead && r.URL.Path == "/" { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "") + if err := db.Ping(); err != nil { + t.Fatalf("Ping 应成功,但返回错误:%v", err) + } + }) + + t.Run("ping 服务端返回错误", func(t *testing.T) { + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }) + + db := newTestESDB(t, server.URL, "") + err := db.Ping() + if err == nil { + t.Fatal("Ping 服务端 500 时应返回错误") + } + if !strings.Contains(err.Error(), "500") { + t.Fatalf("错误信息应包含状态码,实际:%v", err) + } + }) +} + +// TestElasticsearchGetDatabases 测试获取索引列表,验证隐藏索引过滤。 +func TestElasticsearchGetDatabases(t *testing.T) { + t.Run("正常获取并过滤隐藏索引", func(t *testing.T) { + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + if strings.HasPrefix(r.URL.Path, "/_cat/indices") && r.Method == http.MethodGet { + writeJSON(w, []map[string]string{ + {"index": "logs-2024"}, + {"index": "users"}, + {"index": ".security"}, + {"index": ".kibana_1"}, + {"index": "products"}, + }) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "") + databases, err := db.GetDatabases() + if err != nil { + t.Fatalf("GetDatabases 失败:%v", err) + } + + slices.Sort(databases) + expected := []string{"logs-2024", "products", "users"} + if len(databases) != len(expected) { + t.Fatalf("期望 %d 个索引,实际 %d:%v", len(expected), len(databases), databases) + } + for i, name := range expected { + if databases[i] != name { + t.Fatalf("索引 [%d] 期望 %q,实际 %q", i, name, databases[i]) + } + } + }) + + t.Run("连接未打开时返回错误", func(t *testing.T) { + db := &ElasticsearchDB{} + _, err := db.GetDatabases() + if err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("期望 '连接未打开' 错误,实际:%v", err) + } + }) +} + +// TestElasticsearchGetTables 测试 GetTables 返回索引名。 +func TestElasticsearchGetTables(t *testing.T) { + t.Run("指定索引名", func(t *testing.T) { + db := &ElasticsearchDB{database: "default-index"} + tables, err := db.GetTables("my-index") + if err != nil { + t.Fatalf("GetTables 失败:%v", err) + } + if len(tables) != 1 || tables[0] != "my-index" { + t.Fatalf("期望 [my-index],实际:%v", tables) + } + }) + + t.Run("回退到默认索引", func(t *testing.T) { + db := &ElasticsearchDB{database: "default-index"} + tables, err := db.GetTables("") + if err != nil { + t.Fatalf("GetTables 失败:%v", err) + } + if len(tables) != 1 || tables[0] != "default-index" { + t.Fatalf("期望 [default-index],实际:%v", tables) + } + }) + + t.Run("无索引名时报错", func(t *testing.T) { + db := &ElasticsearchDB{} + _, err := db.GetTables("") + if err == nil || !strings.Contains(err.Error(), "未指定索引名") { + t.Fatalf("期望 '未指定索引名' 错误,实际:%v", err) + } + }) +} + +// TestElasticsearchGetColumns 测试从 mapping 中提取字段定义。 +func TestElasticsearchGetColumns(t *testing.T) { + t.Run("正常提取字段", func(t *testing.T) { + fields := map[string]string{ + "title": "text", + "status": "keyword", + "price": "float", + "quantity": "integer", + } + mapping := buildMockESMappingResponse("test-index", fields) + + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/_mapping") && r.Method == http.MethodGet { + writeJSON(w, mapping) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "test-index") + columns, err := db.GetColumns("test-index", "") + if err != nil { + t.Fatalf("GetColumns 失败:%v", err) + } + if len(columns) != 4 { + t.Fatalf("期望 4 个字段,实际 %d", len(columns)) + } + + // 验证字段类型映射 + typeMap := make(map[string]string) + for _, col := range columns { + typeMap[col.Name] = col.Type + } + for name, expectedType := range fields { + if typeMap[name] != expectedType { + t.Fatalf("字段 %q 类型期望 %q,实际 %q", name, expectedType, typeMap[name]) + } + } + + // 验证所有字段标记为可空 + for _, col := range columns { + if col.Nullable != "YES" { + t.Fatalf("字段 %q Nullable 期望 YES,实际 %q", col.Name, col.Nullable) + } + } + }) + + t.Run("服务端返回错误", func(t *testing.T) { + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(`{"error":"index_not_found"}`)) + }) + + db := newTestESDB(t, server.URL, "test-index") + _, err := db.GetColumns("test-index", "") + if err == nil { + t.Fatal("GetColumns 服务端 404 时应返回错误") + } + }) + + t.Run("连接未打开时返回错误", func(t *testing.T) { + db := &ElasticsearchDB{} + _, err := db.GetColumns("test-index", "") + if err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("期望 '连接未打开' 错误,实际:%v", err) + } + }) +} + +// TestElasticsearchGetAllColumns 测试获取全部字段。 +func TestElasticsearchGetAllColumns(t *testing.T) { + fields := map[string]string{ + "name": "text", + "email": "keyword", + } + mapping := buildMockESMappingResponse("users", fields) + + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/_mapping") { + writeJSON(w, mapping) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "users") + columns, err := db.GetAllColumns("") + if err != nil { + t.Fatalf("GetAllColumns 失败:%v", err) + } + if len(columns) != 2 { + t.Fatalf("期望 2 个字段,实际 %d", len(columns)) + } + + // 验证每个字段都带有表名标识 + for _, col := range columns { + if col.TableName != "users" { + t.Fatalf("字段 %q 的 TableName 期望 users,实际 %s", col.Name, col.TableName) + } + } +} + +// TestElasticsearchQueryDSL 测试 JSON DSL 查询模式。 +func TestElasticsearchQueryDSL(t *testing.T) { + t.Run("指定索引的 DSL 查询", func(t *testing.T) { + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_search") { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "hits": { + "total": {"value": 1}, + "hits": [ + {"_index": "test-index", "_id": "1", "_source": {"title": "测试文档", "status": "active"}} + ] + } + }`)) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "test-index") + dsl := `{"query":{"match_all":{}}}` + rows, columns, err := db.Query(dsl) + if err != nil { + t.Fatalf("DSL 查询失败:%v", err) + } + if len(rows) != 1 { + t.Fatalf("期望 1 条结果,实际 %d", len(rows)) + } + + // 验证包含 _index 和 _id 元数据列 + colSet := make(map[string]bool) + for _, col := range columns { + colSet[col] = true + } + if !colSet["_index"] || !colSet["_id"] { + t.Fatalf("结果列应包含 _index 和 _id,实际:%v", columns) + } + + // 验证数据内容 + if rows[0]["title"] != "测试文档" { + t.Fatalf("期望 title=测试文档,实际:%v", rows[0]["title"]) + } + if rows[0]["_index"] != "test-index" { + t.Fatalf("期望 _index=test-index,实际:%v", rows[0]["_index"]) + } + }) + + t.Run("无默认索引时使用通配符", func(t *testing.T) { + var capturedPath string + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_search") { + capturedPath = r.URL.Path + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"hits":{"total":{"value":0},"hits":[]}}`)) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "") + _, _, err := db.Query(`{"query":{"match_all":{}}}`) + if err != nil { + t.Fatalf("DSL 查询失败:%v", err) + } + if !strings.HasPrefix(capturedPath, "/*/_search") { + t.Fatalf("无默认索引时应使用 * 通配符查询,实际路径:%s", capturedPath) + } + }) + + t.Run("查询服务端返回错误", func(t *testing.T) { + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"parsing_exception"}`)) + }) + + db := newTestESDB(t, server.URL, "test-index") + _, _, err := db.Query(`{"query":{"invalid":{}}}`) + if err == nil { + t.Fatal("DSL 查询服务端错误时应返回错误") + } + }) +} + +// TestElasticsearchQueryString 测试 query_string 查询模式。 +func TestElasticsearchQueryString(t *testing.T) { + t.Run("简单字符串查询", func(t *testing.T) { + var capturedBody string + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_search") { + buf := make([]byte, r.ContentLength) + _, _ = r.Body.Read(buf) + capturedBody = string(buf) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "hits": { + "total": {"value": 2}, + "hits": [ + {"_index": "test", "_id": "1", "_source": {"title": "匹配结果1"}}, + {"_index": "test", "_id": "2", "_source": {"title": "匹配结果2"}} + ] + } + }`)) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "test") + rows, _, err := db.Query("hello world") + if err != nil { + t.Fatalf("query_string 查询失败:%v", err) + } + if len(rows) != 2 { + t.Fatalf("期望 2 条结果,实际 %d", len(rows)) + } + + // 验证请求体包含 query_string 包装 + if !strings.Contains(capturedBody, "query_string") { + t.Fatalf("请求体应包含 query_string,实际:%s", capturedBody) + } + if !strings.Contains(capturedBody, "hello world") { + t.Fatalf("请求体应包含查询文本,实际:%s", capturedBody) + } + }) + + t.Run("查询语句为空时报错", func(t *testing.T) { + db := newTestESDB(t, "http://localhost:9200", "test") + _, _, err := db.Query(" ") + if err == nil || !strings.Contains(err.Error(), "查询语句不能为空") { + t.Fatalf("期望 '查询语句不能为空' 错误,实际:%v", err) + } + }) + + t.Run("连接未打开时返回错误", func(t *testing.T) { + db := &ElasticsearchDB{} + _, _, err := db.Query("test") + if err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("期望 '连接未打开' 错误,实际:%v", err) + } + }) +} + +// TestElasticsearchExecNotSupported 测试 Exec 返回不支持错误。 +func TestElasticsearchExecNotSupported(t *testing.T) { + db := &ElasticsearchDB{} + rowsAffected, err := db.Exec("DELETE FROM test") + if err == nil || !strings.Contains(err.Error(), "不支持执行非查询语句") { + t.Fatalf("期望 '不支持执行非查询语句' 错误,实际:%v", err) + } + if rowsAffected != 0 { + t.Fatalf("Exec 应返回 0 受影响行数,实际:%d", rowsAffected) + } +} + +// TestElasticsearchGetForeignKeys 测试返回空外键列表。 +func TestElasticsearchGetForeignKeys(t *testing.T) { + db := &ElasticsearchDB{} + fks, err := db.GetForeignKeys("test-index", "test-table") + if err != nil { + t.Fatalf("GetForeignKeys 不应返回错误:%v", err) + } + if len(fks) != 0 { + t.Fatalf("GetForeignKeys 应返回空列表,实际:%v", fks) + } +} + +// TestElasticsearchGetTriggers 测试返回空触发器列表。 +func TestElasticsearchGetTriggers(t *testing.T) { + db := &ElasticsearchDB{} + triggers, err := db.GetTriggers("test-index", "test-table") + if err != nil { + t.Fatalf("GetTriggers 不应返回错误:%v", err) + } + if len(triggers) != 0 { + t.Fatalf("GetTriggers 应返回空列表,实际:%v", triggers) + } +} + +// TestElasticsearchConnectNilClient 测试未连接时各操作返回错误。 +func TestElasticsearchConnectNilClient(t *testing.T) { + db := &ElasticsearchDB{} + + // Ping + if err := db.Ping(); err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("Ping: 期望 '连接未打开' 错误,实际:%v", err) + } + + // GetDatabases + if _, err := db.GetDatabases(); err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("GetDatabases: 期望 '连接未打开' 错误,实际:%v", err) + } + + // GetColumns + if _, err := db.GetColumns("idx", ""); err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("GetColumns: 期望 '连接未打开' 错误,实际:%v", err) + } + + // GetAllColumns + if _, err := db.GetAllColumns("idx"); err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("GetAllColumns: 期望 '连接未打开' 错误,实际:%v", err) + } + + // GetIndexes + if _, err := db.GetIndexes("idx", "tbl"); err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("GetIndexes: 期望 '连接未打开' 错误,实际:%v", err) + } + + // GetCreateStatement(间接通过 esFetchIndexMapping) + if _, err := db.GetCreateStatement("idx", "tbl"); err == nil || !strings.Contains(err.Error(), "连接未打开") { + t.Fatalf("GetCreateStatement: 期望 '连接未打开' 错误,实际:%v", err) + } +} + +// TestElasticsearchGetCreateStatement 测试获取索引 settings + mappings。 +func TestElasticsearchGetCreateStatement(t *testing.T) { + indexDef := map[string]interface{}{ + "test-index": map[string]interface{}{ + "settings": map[string]interface{}{ + "index": map[string]interface{}{ + "number_of_shards": "1", + "number_of_replicas": "0", + }, + }, + "mappings": map[string]interface{}{ + "properties": map[string]interface{}{ + "title": map[string]interface{}{"type": "text"}, + }, + }, + }, + } + + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + // GetCreateStatement 使用 Indices.Get 方法,路径为 / + if r.Method == http.MethodGet && r.URL.Path == "/test-index" && !strings.Contains(r.URL.Path, "_") { + writeJSON(w, indexDef) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "test-index") + stmt, err := db.GetCreateStatement("test-index", "") + if err != nil { + t.Fatalf("GetCreateStatement 失败:%v", err) + } + if !strings.Contains(stmt, "test-index") { + t.Fatalf("CreateStatement 应包含索引名,实际:%s", stmt) + } + if !strings.Contains(stmt, "number_of_shards") { + t.Fatalf("CreateStatement 应包含 settings,实际:%s", stmt) + } + if !strings.Contains(stmt, "mappings") { + t.Fatalf("CreateStatement 应包含 mappings,实际:%s", stmt) + } +} + +// TestElasticsearchGetIndexes 测试获取索引统计信息。 +func TestElasticsearchGetIndexes(t *testing.T) { + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + if strings.HasPrefix(r.URL.Path, "/_cat/indices") && r.Method == http.MethodGet { + writeJSON(w, []map[string]string{ + { + "index": "test-index", + "health": "green", + "status": "open", + "docs.count": "1000", + "store.size": "5mb", + }, + }) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + db := newTestESDB(t, server.URL, "test-index") + indexes, err := db.GetIndexes("test-index", "") + if err != nil { + t.Fatalf("GetIndexes 失败:%v", err) + } + if len(indexes) != 1 { + t.Fatalf("期望 1 个索引信息,实际 %d", len(indexes)) + } + + idx := indexes[0] + if idx.Name != "test-index" { + t.Fatalf("索引名期望 test-index,实际:%s", idx.Name) + } + if idx.IndexType != "INDEX" { + t.Fatalf("索引类型期望 INDEX,实际:%s", idx.IndexType) + } + if !strings.Contains(idx.ColumnName, "green") { + t.Fatalf("索引信息应包含 health=green,实际:%s", idx.ColumnName) + } + if !strings.Contains(idx.ColumnName, "1000") { + t.Fatalf("索引信息应包含 docs=1000,实际:%s", idx.ColumnName) + } +} + +// ---- 辅助函数测试 ---- + +// TestNormalizeElasticsearchConfig 测试配置规范化。 +func TestNormalizeElasticsearchConfig(t *testing.T) { + t.Run("设置默认值", func(t *testing.T) { + config := normalizeElasticsearchConfig(connection.ConnectionConfig{ + Type: "elasticsearch", + }) + if config.Host != "localhost" { + t.Fatalf("默认 Host 期望 localhost,实际:%q", config.Host) + } + if config.Port != defaultEsPort { + t.Fatalf("默认 Port 期望 %d,实际:%d", defaultEsPort, config.Port) + } + if config.User != "" { + t.Fatalf("默认 User 期望空字符串,实际:%q", config.User) + } + }) + + t.Run("保留用户设置", func(t *testing.T) { + config := normalizeElasticsearchConfig(connection.ConnectionConfig{ + Type: "elasticsearch", + Host: "es.example.com", + Port: 9201, + User: "admin", + Password: "secret", + }) + if config.Host != "es.example.com" { + t.Fatalf("Host 期望 es.example.com,实际:%q", config.Host) + } + if config.Port != 9201 { + t.Fatalf("Port 期望 9201,实际:%d", config.Port) + } + if config.User != "admin" { + t.Fatalf("User 期望 admin,实际:%q", config.User) + } + }) + + t.Run("从 URI 中提取配置", func(t *testing.T) { + config := normalizeElasticsearchConfig(connection.ConnectionConfig{ + Type: "elasticsearch", + URI: "http://uri-user:uri-pass@es-host:9202", + }) + if config.User != "uri-user" { + t.Fatalf("User 期望从 URI 提取 uri-user,实际:%q", config.User) + } + if config.Password != "uri-pass" { + t.Fatalf("Password 期望从 URI 提取 uri-pass,实际:%q", config.Password) + } + if config.Host != "es-host" { + t.Fatalf("Host 期望从 URI 提取 es-host,实际:%q", config.Host) + } + if config.Port != 9202 { + t.Fatalf("Port 期望从 URI 提取 9202,实际:%d", config.Port) + } + }) + + t.Run("已有 Host 时不从 URI 覆盖", func(t *testing.T) { + config := normalizeElasticsearchConfig(connection.ConnectionConfig{ + Type: "elasticsearch", + Host: "custom-host", + URI: "http://uri-user:uri-pass@uri-host:9200", + }) + if config.Host != "custom-host" { + t.Fatalf("已有 Host 时不应覆盖,期望 custom-host,实际:%q", config.Host) + } + }) +} + +// TestApplyElasticsearchURI 测试 URI 解析。 +func TestApplyElasticsearchURI(t *testing.T) { + t.Run("HTTPS URI 启用 SSL", func(t *testing.T) { + config := applyElasticsearchURI(connection.ConnectionConfig{ + URI: "https://user:pass@es.example.com:9200", + }) + if !config.UseSSL { + t.Fatal("HTTPS URI 应启用 SSL") + } + if config.SSLMode != "required" { + t.Fatalf("SSLMode 期望 required,实际:%q", config.SSLMode) + } + if config.Host != "es.example.com" { + t.Fatalf("Host 期望 es.example.com,实际:%q", config.Host) + } + }) + + t.Run("非 HTTP 协议忽略", func(t *testing.T) { + config := applyElasticsearchURI(connection.ConnectionConfig{ + URI: "tcp://localhost:9200", + }) + if config.Host != "" { + t.Fatalf("非 HTTP 协议不应设置 Host,实际:%q", config.Host) + } + }) + + t.Run("空 URI 不修改配置", func(t *testing.T) { + config := applyElasticsearchURI(connection.ConnectionConfig{ + Host: "original-host", + Port: 9300, + }) + if config.Host != "original-host" || config.Port != 9300 { + t.Fatal("空 URI 不应修改原有配置") + } + }) + + t.Run("已有用户凭证不被 URI 覆盖", func(t *testing.T) { + config := applyElasticsearchURI(connection.ConnectionConfig{ + User: "existing-user", + Password: "existing-pass", + URI: "http://uri-user:uri-pass@localhost:9200", + }) + if config.User != "existing-user" { + t.Fatalf("已有 User 不应覆盖,期望 existing-user,实际:%q", config.User) + } + if config.Password != "existing-pass" { + t.Fatalf("已有 Password 不应覆盖,期望 existing-pass,实际:%q", config.Password) + } + }) +} + +// TestExtractColumnsFromMapping 测试 mapping 字段提取。 +func TestExtractColumnsFromMapping(t *testing.T) { + t.Run("标准字段提取", func(t *testing.T) { + mapping := map[string]interface{}{ + "test-index": map[string]interface{}{ + "mappings": map[string]interface{}{ + "properties": map[string]interface{}{ + "title": map[string]interface{}{"type": "text"}, + "count": map[string]interface{}{"type": "long"}, + "tags": map[string]interface{}{"type": "keyword"}, + }, + }, + }, + } + + columns := extractColumnsFromMapping("test-index", mapping) + if len(columns) != 3 { + t.Fatalf("期望 3 个字段,实际 %d", len(columns)) + } + + typeMap := make(map[string]string) + for _, col := range columns { + typeMap[col.Name] = col.Type + } + expectedTypes := map[string]string{"title": "text", "count": "long", "tags": "keyword"} + for name, expectedType := range expectedTypes { + if typeMap[name] != expectedType { + t.Fatalf("字段 %q 类型期望 %q,实际 %q", name, expectedType, typeMap[name]) + } + } + }) + + t.Run("含 description 的字段提取注释", func(t *testing.T) { + mapping := map[string]interface{}{ + "idx": map[string]interface{}{ + "mappings": map[string]interface{}{ + "properties": map[string]interface{}{ + "email": map[string]interface{}{ + "type": "keyword", + "description": "用户邮箱地址", + }, + }, + }, + }, + } + + columns := extractColumnsFromMapping("idx", mapping) + if len(columns) != 1 { + t.Fatalf("期望 1 个字段,实际 %d", len(columns)) + } + if columns[0].Comment != "用户邮箱地址" { + t.Fatalf("期望注释 '用户邮箱地址',实际:%q", columns[0].Comment) + } + }) + + t.Run("空 mapping 返回空列表", func(t *testing.T) { + mapping := map[string]interface{}{} + columns := extractColumnsFromMapping("non-existent", mapping) + if len(columns) != 0 { + t.Fatalf("空 mapping 应返回空列表,实际 %d 个", len(columns)) + } + }) + + t.Run("索引数据无 mappings 字段", func(t *testing.T) { + mapping := map[string]interface{}{ + "idx": map[string]interface{}{ + "settings": map[string]interface{}{}, + }, + } + columns := extractColumnsFromMapping("idx", mapping) + if len(columns) != 0 { + t.Fatalf("无 mappings 时应返回空列表,实际 %d 个", len(columns)) + } + }) + + t.Run("从 mapping 响应中自动查找索引数据", func(t *testing.T) { + // 模拟 ES 返回的 mapping 响应,键名不完全匹配(如带日期后缀的索引别名) + mapping := map[string]interface{}{ + "logs-2024.01.01": map[string]interface{}{ + "mappings": map[string]interface{}{ + "properties": map[string]interface{}{ + "message": map[string]interface{}{"type": "text"}, + }, + }, + }, + } + columns := extractColumnsFromMapping("non-matching-key", mapping) + if len(columns) != 1 { + t.Fatalf("应自动查找 mapping 数据,期望 1 个字段,实际 %d 个", len(columns)) + } + }) +} + +// TestIsHiddenIndex 测试隐藏索引判断。 +func TestIsHiddenIndex(t *testing.T) { + tests := []struct { + name string + input string + expected bool + }{ + {"隐藏索引 .security", ".security", true}, + {"隐藏索引 .kibana_1", ".kibana_1", true}, + {"普通索引 logs-2024", "logs-2024", false}, + {"普通索引 users", "users", false}, + {"空字符串", "", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isHiddenIndex(tt.input); got != tt.expected { + t.Fatalf("isHiddenIndex(%q) = %v,期望 %v", tt.input, got, tt.expected) + } + }) + } +} + +// TestIsJSONDSL 测试 JSON DSL 检测。 +func TestIsJSONDSL(t *testing.T) { + tests := []struct { + name string + input string + expected bool + }{ + {"JSON DSL", `{"query":{"match_all":{}}}`, true}, + {"简单字符串", "hello world", false}, + {"空字符串", "", false}, + {"JSON 对象以空格开头", ` {"query":{}}`, false}, + {"非查询 JSON 前缀", `[1,2,3]`, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isJSONDSL(tt.input); got != tt.expected { + t.Fatalf("isJSONDSL(%q) = %v,期望 %v", tt.input, got, tt.expected) + } + }) + } +} + +// TestExtractEsFieldType 测试字段类型提取。 +func TestExtractEsFieldType(t *testing.T) { + tests := []struct { + name string + prop interface{} + expected string + }{ + { + name: "标准字段类型", + prop: map[string]interface{}{"type": "keyword"}, + expected: "keyword", + }, + { + name: "嵌套对象类型", + prop: map[string]interface{}{"properties": map[string]interface{}{}}, + expected: "object", + }, + { + name: "无 type 无 properties", + prop: map[string]interface{}{"enabled": true}, + expected: "unknown", + }, + { + name: "非 map 类型", + prop: "invalid", + expected: "unknown", + }, + { + name: "nil 值", + prop: nil, + expected: "unknown", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := extractEsFieldType(tt.prop); got != tt.expected { + t.Fatalf("extractEsFieldType() = %q,期望 %q", got, tt.expected) + } + }) + } +} + +// TestBuildESClientConfig 测试 ES 客户端配置构建。 +func TestBuildESClientConfig(t *testing.T) { + t.Run("HTTP 配置", func(t *testing.T) { + cfg := buildESClientConfig(connection.ConnectionConfig{ + Host: "localhost", + Port: 9200, + User: "elastic", + }) + if len(cfg.Addresses) != 1 || cfg.Addresses[0] != "http://localhost:9200" { + t.Fatalf("HTTP 地址期望 http://localhost:9200,实际:%v", cfg.Addresses) + } + if cfg.Username != "elastic" { + t.Fatalf("用户名期望 elastic,实际:%q", cfg.Username) + } + }) + + t.Run("HTTPS 配置", func(t *testing.T) { + cfg := buildESClientConfig(connection.ConnectionConfig{ + Host: "es.example.com", + Port: 9200, + UseSSL: true, + }) + if len(cfg.Addresses) != 1 || cfg.Addresses[0] != "https://es.example.com:9200" { + t.Fatalf("HTTPS 地址期望 https://es.example.com:9200,实际:%v", cfg.Addresses) + } + }) +} + +// TestResolveEsIndexName 测试索引名解析。 +func TestResolveEsIndexName(t *testing.T) { + tests := []struct { + name string + dbName string + tableName string + defaultDB string + expected string + }{ + { + name: "优先使用 tableName", + dbName: "db1", + tableName: "tbl1", + defaultDB: "default", + expected: "tbl1", + }, + { + name: "回退到 dbName", + dbName: "db1", + tableName: "", + defaultDB: "default", + expected: "db1", + }, + { + name: "回退到默认值", + dbName: "", + tableName: "", + defaultDB: "default", + expected: "default", + }, + { + name: "全部为空", + dbName: "", + tableName: "", + defaultDB: "", + expected: "", + }, + { + name: "空白字符等同于空", + dbName: " ", + tableName: " ", + defaultDB: "default", + expected: "default", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := resolveEsIndexName(tt.dbName, tt.tableName, tt.defaultDB) + if got != tt.expected { + t.Fatalf("resolveEsIndexName(%q, %q, %q) = %q,期望 %q", + tt.dbName, tt.tableName, tt.defaultDB, got, tt.expected) + } + }) + } +} + +// TestESMockIntegration 使用完整 mock 服务器的集成测试。 +func TestESMockIntegration(t *testing.T) { + // 构造完整的 mock mapping 响应 + mappingData := buildMockESMappingResponse("products", map[string]string{ + "name": "text", + "price": "float", + "in_stock": "boolean", + "created_at": "date", + "description": "text", + }) + + server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) { + path := r.URL.Path + + switch { + // Ping + case r.Method == http.MethodHead && path == "/": + w.WriteHeader(http.StatusOK) + + // Cat Indices + case strings.HasPrefix(path, "/_cat/indices") && r.Method == http.MethodGet: + writeJSON(w, []map[string]string{ + {"index": "products"}, + {"index": "orders"}, + {"index": ".internal"}, + }) + + // Mapping + case strings.HasSuffix(path, "/_mapping"): + writeJSON(w, mappingData) + + // GetCreateStatement + case r.Method == http.MethodGet && !strings.Contains(path, "_"): + writeJSON(w, map[string]interface{}{ + "products": map[string]interface{}{ + "settings": map[string]interface{}{"index": map[string]interface{}{"number_of_shards": "1"}}, + "mappings": map[string]interface{}{"properties": map[string]interface{}{"name": map[string]interface{}{"type": "text"}}}, + }, + }) + + // Search + case r.Method == http.MethodPost && strings.HasSuffix(path, "/_search"): + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "hits": { + "total": {"value": 3}, + "hits": [ + {"_index": "products", "_id": "1", "_source": {"name": "商品A", "price": 99.9}}, + {"_index": "products", "_id": "2", "_source": {"name": "商品B", "price": 199.9}}, + {"_index": "products", "_id": "3", "_source": {"name": "商品C", "price": 299.9}} + ] + } + }`)) + + default: + w.WriteHeader(http.StatusNotFound) + } + }) + + db := newTestESDB(t, server.URL, "products") + + // 验证 Ping + if err := db.Ping(); err != nil { + t.Fatalf("Ping 失败:%v", err) + } + + // 验证 GetDatabases(应过滤 .internal) + databases, err := db.GetDatabases() + if err != nil { + t.Fatalf("GetDatabases 失败:%v", err) + } + slices.Sort(databases) + if len(databases) != 2 || databases[0] != "orders" || databases[1] != "products" { + t.Fatalf("GetDatabases 期望 [orders, products],实际:%v", databases) + } + + // 验证 GetTables + tables, err := db.GetTables("") + if err != nil { + t.Fatalf("GetTables 失败:%v", err) + } + if len(tables) != 1 || tables[0] != "products" { + t.Fatalf("GetTables 期望 [products],实际:%v", tables) + } + + // 验证 GetColumns + columns, err := db.GetColumns("products", "") + if err != nil { + t.Fatalf("GetColumns 失败:%v", err) + } + if len(columns) != 5 { + t.Fatalf("GetColumns 期望 5 个字段,实际 %d", len(columns)) + } + + // 验证 DSL 查询 + rows, _, err := db.Query(`{"query":{"match_all":{}}}`) + if err != nil { + t.Fatalf("DSL 查询失败:%v", err) + } + if len(rows) != 3 { + t.Fatalf("DSL 查询期望 3 条结果,实际 %d", len(rows)) + } + + // 验证 query_string 查询 + rows, _, err = db.Query("商品") + if err != nil { + t.Fatalf("query_string 查询失败:%v", err) + } + if len(rows) != 3 { + t.Fatalf("query_string 查询期望 3 条结果,实际 %d", len(rows)) + } + + // 验证 GetCreateStatement + stmt, err := db.GetCreateStatement("products", "") + if err != nil { + t.Fatalf("GetCreateStatement 失败:%v", err) + } + if !strings.Contains(stmt, "products") { + t.Fatalf("GetCreateStatement 应包含索引名,实际:%s", stmt) + } + + // 验证 Exec 不支持 + _, err = db.Exec("DELETE products") + if err == nil || !strings.Contains(err.Error(), "不支持") { + t.Fatalf("Exec 应返回不支持错误,实际:%v", err) + } + + // 验证 GetForeignKeys / GetTriggers 返回空 + fks, _ := db.GetForeignKeys("products", "") + if len(fks) != 0 { + t.Fatalf("GetForeignKeys 应返回空,实际:%d", len(fks)) + } + triggers, _ := db.GetTriggers("products", "") + if len(triggers) != 0 { + t.Fatalf("GetTriggers 应返回空,实际:%d", len(triggers)) + } +} diff --git a/tools/generate-driver-agent-revisions.sh b/tools/generate-driver-agent-revisions.sh index 9fccef5..c417572 100755 --- a/tools/generate-driver-agent-revisions.sh +++ b/tools/generate-driver-agent-revisions.sh @@ -5,7 +5,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" cd "$SCRIPT_DIR" -DEFAULT_DRIVERS=(mariadb oceanbase diros starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine clickhouse) +DEFAULT_DRIVERS=(mariadb oceanbase diros starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine clickhouse elasticsearch) OUTPUT_FILE="internal/db/driver_agent_revisions_gen.go" usage() {