♻️ refactor(elasticsearch): 改用轻量 REST 客户端

- 使用标准库 HTTP 客户端实现 ES ping、索引、mapping 和查询请求

- 保留代理、TLS、超时和 BasicAuth 配置能力

- 移除 go-elasticsearch SDK 及间接依赖,降低 dev 构建下载风险

- 更新 Elasticsearch 后端单测适配 REST 客户端
This commit is contained in:
Syngnat
2026-06-02 15:30:32 +08:00
parent 05d1bc22c6
commit 864ad8a371
5 changed files with 128 additions and 104 deletions

View File

@@ -15,9 +15,6 @@ import (
"GoNavi-Wails/internal/connection"
proxytunnel "GoNavi-Wails/internal/proxy"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
const defaultEsPort = 9200
@@ -116,17 +113,31 @@ func esSSLAttemptLabel(config connection.ConnectionConfig, fallback bool) string
return "明文"
}
type esHTTPClientConfig struct {
BaseURL string
Username string
Password string
HTTPClient *http.Client
}
type esRESTClient struct {
baseURL string
username string
password string
httpClient *http.Client
}
// buildESClientConfig 从连接配置构建 ES 客户端配置。
func buildESClientConfig(config connection.ConnectionConfig) elasticsearch.Config {
func buildESClientConfig(config connection.ConnectionConfig) esHTTPClientConfig {
scheme := "http"
if config.UseSSL {
scheme = "https"
}
cfg := elasticsearch.Config{
Addresses: []string{
fmt.Sprintf("%s://%s:%d", scheme, config.Host, config.Port),
},
transport := http.DefaultTransport.(*http.Transport).Clone()
cfg := esHTTPClientConfig{
BaseURL: fmt.Sprintf("%s://%s:%d", scheme, config.Host, config.Port),
Username: strings.TrimSpace(config.User),
Password: config.Password,
}
@@ -134,36 +145,85 @@ func buildESClientConfig(config connection.ConnectionConfig) elasticsearch.Confi
// TLS 配置
tlsConfig, _ := resolveGenericTLSConfig(config)
if tlsConfig != nil {
cfg.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
transport.TLSClientConfig = tlsConfig
}
// 代理支持
if config.UseProxy {
transport, ok := cfg.Transport.(*http.Transport)
if !ok {
transport = http.DefaultTransport.(*http.Transport).Clone()
}
proxyCfg := config.Proxy
transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
return proxytunnel.DialContext(ctx, proxyCfg, network, addr)
}
cfg.Transport = transport
}
// 超时设置
timeout := getConnectTimeout(config)
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
}
if transport, ok := cfg.Transport.(*http.Transport); ok {
transport.ResponseHeaderTimeout = timeout
}
transport.ResponseHeaderTimeout = timeout
cfg.HTTPClient = &http.Client{Transport: transport}
return cfg
}
func newESRESTClient(config esHTTPClientConfig) (*esRESTClient, error) {
baseURL := strings.TrimRight(strings.TrimSpace(config.BaseURL), "/")
if baseURL == "" {
return nil, fmt.Errorf("Elasticsearch 地址不能为空")
}
if _, err := url.ParseRequestURI(baseURL); err != nil {
return nil, fmt.Errorf("Elasticsearch 地址无效:%w", err)
}
httpClient := config.HTTPClient
if httpClient == nil {
httpClient = http.DefaultClient
}
return &esRESTClient{
baseURL: baseURL,
username: strings.TrimSpace(config.Username),
password: config.Password,
httpClient: httpClient,
}, nil
}
func (c *esRESTClient) do(ctx context.Context, method string, path string, query url.Values, body io.Reader) (*http.Response, error) {
if c == nil || c.httpClient == nil {
return nil, fmt.Errorf("连接未打开")
}
requestURL := c.baseURL + path
if len(query) > 0 {
requestURL += "?" + query.Encode()
}
req, err := http.NewRequestWithContext(ctx, method, requestURL, body)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/json")
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
if c.username != "" || c.password != "" {
req.SetBasicAuth(c.username, c.password)
}
return c.httpClient.Do(req)
}
func esPathSegment(value string) string {
if value == "*" {
return "*"
}
return url.PathEscape(value)
}
func esResponseIsError(res *http.Response) bool {
return res == nil || res.StatusCode >= http.StatusBadRequest
}
func esResponseStatus(res *http.Response) string {
if res == nil {
return ""
}
return res.Status
}
// ---- 查询响应解析 ----
// esIndexInfo 用于解析 Cat Indices JSON 响应。
@@ -196,11 +256,7 @@ func (e *ElasticsearchDB) esQueryWithDSL(ctx context.Context, dsl string) ([]map
indexName = "*"
}
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(indexName),
e.client.Search.WithBody(strings.NewReader(dsl)),
)
res, err := e.client.do(ctx, http.MethodPost, "/"+esPathSegment(indexName)+"/_search", nil, strings.NewReader(dsl))
if err != nil {
return nil, nil, fmt.Errorf("Elasticsearch DSL 查询失败:%w", err)
}
@@ -218,11 +274,7 @@ func (e *ElasticsearchDB) esQueryWithString(ctx context.Context, queryStr string
dsl := fmt.Sprintf(`{"query":{"query_string":{"query":"%s"}}}`, strings.ReplaceAll(queryStr, `"`, `\"`))
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
e.client.Search.WithIndex(indexName),
e.client.Search.WithBody(strings.NewReader(dsl)),
)
res, err := e.client.do(ctx, http.MethodPost, "/"+esPathSegment(indexName)+"/_search", nil, strings.NewReader(dsl))
if err != nil {
return nil, nil, fmt.Errorf("Elasticsearch 查询失败:%w", err)
}
@@ -232,8 +284,8 @@ func (e *ElasticsearchDB) esQueryWithString(ctx context.Context, queryStr string
}
// parseSearchResponse 解析 ES _search 响应为标准行格式。
func (e *ElasticsearchDB) parseSearchResponse(res *esapi.Response) ([]map[string]interface{}, []string, error) {
if res.IsError() {
func (e *ElasticsearchDB) parseSearchResponse(res *http.Response) ([]map[string]interface{}, []string, error) {
if esResponseIsError(res) {
body, _ := io.ReadAll(res.Body)
return nil, nil, fmt.Errorf("Elasticsearch 查询错误:%s", string(body))
}
@@ -277,17 +329,14 @@ func (e *ElasticsearchDB) esFetchIndexMapping(indexName string) (map[string]inte
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
res, err := e.client.Indices.GetMapping(
e.client.Indices.GetMapping.WithContext(ctx),
e.client.Indices.GetMapping.WithIndex(indexName),
)
res, err := e.client.do(ctx, http.MethodGet, "/"+esPathSegment(indexName)+"/_mapping", nil, nil)
if err != nil {
return nil, fmt.Errorf("获取索引 mapping 失败:%w", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("获取索引 mapping 失败:%s", res.Status())
if esResponseIsError(res) {
return nil, fmt.Errorf("获取索引 mapping 失败:%s", esResponseStatus(res))
}
body, err := io.ReadAll(res.Body)

View File

@@ -8,6 +8,8 @@ import (
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
@@ -15,8 +17,6 @@ import (
"GoNavi-Wails/internal/connection"
"GoNavi-Wails/internal/logger"
"GoNavi-Wails/internal/ssh"
"github.com/elastic/go-elasticsearch/v8"
)
const (
@@ -26,7 +26,7 @@ const (
// ElasticsearchDB 实现 Database 接口,提供 Elasticsearch 数据源连接能力。
type ElasticsearchDB struct {
client *elasticsearch.Client
client *esRESTClient
database string // 默认索引名
pingTimeout time.Duration
forwarder *ssh.LocalForwarder
@@ -85,7 +85,7 @@ func (e *ElasticsearchDB) Connect(config connection.ConnectionConfig) error {
idx+1, len(attempts), sslLabel, attempt.Host, attempt.Port)
esCfg := buildESClientConfig(attempt)
client, err := elasticsearch.NewClient(esCfg)
client, err := newESRESTClient(esCfg)
if err != nil {
logger.Warnf("Elasticsearch 创建客户端失败:%d/%d 模式=%s 错误=%v", idx+1, len(attempts), sslLabel, err)
lastErr = err
@@ -137,14 +137,14 @@ func (e *ElasticsearchDB) Ping() error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
res, err := e.client.Ping(e.client.Ping.WithContext(ctx))
res, err := e.client.do(ctx, http.MethodHead, "/", nil, nil)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("Elasticsearch Ping 失败:%s", res.Status())
if esResponseIsError(res) {
return fmt.Errorf("Elasticsearch Ping 失败:%s", esResponseStatus(res))
}
return nil
}
@@ -183,18 +183,17 @@ func (e *ElasticsearchDB) GetDatabases() ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
res, err := e.client.Cat.Indices(
e.client.Cat.Indices.WithContext(ctx),
e.client.Cat.Indices.WithFormat("json"),
e.client.Cat.Indices.WithH("index"),
)
query := url.Values{}
query.Set("format", "json")
query.Set("h", "index")
res, err := e.client.do(ctx, http.MethodGet, "/_cat/indices", query, nil)
if err != nil {
return nil, fmt.Errorf("获取索引列表失败:%w", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("获取索引列表失败:%s", res.Status())
if esResponseIsError(res) {
return nil, fmt.Errorf("获取索引列表失败:%s", esResponseStatus(res))
}
var indices []struct {
@@ -240,17 +239,14 @@ func (e *ElasticsearchDB) GetCreateStatement(dbName, tableName string) (string,
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
res, err := e.client.Indices.Get(
[]string{indexName},
e.client.Indices.Get.WithContext(ctx),
)
res, err := e.client.do(ctx, http.MethodGet, "/"+esPathSegment(indexName), nil, nil)
if err != nil {
return "", fmt.Errorf("获取索引定义失败:%w", err)
}
defer res.Body.Close()
if res.IsError() {
return "", fmt.Errorf("获取索引定义失败:%s", res.Status())
if esResponseIsError(res) {
return "", fmt.Errorf("获取索引定义失败:%s", esResponseStatus(res))
}
body, err := io.ReadAll(res.Body)
@@ -322,19 +318,17 @@ func (e *ElasticsearchDB) GetIndexes(dbName, tableName string) ([]connection.Ind
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
res, err := e.client.Cat.Indices(
e.client.Cat.Indices.WithContext(ctx),
e.client.Cat.Indices.WithIndex(indexName),
e.client.Cat.Indices.WithFormat("json"),
e.client.Cat.Indices.WithH("index", "health", "status", "docs.count", "store.size"),
)
query := url.Values{}
query.Set("format", "json")
query.Set("h", "index,health,status,docs.count,store.size")
res, err := e.client.do(ctx, http.MethodGet, "/_cat/indices/"+esPathSegment(indexName), query, nil)
if err != nil {
return nil, fmt.Errorf("获取索引信息失败:%w", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("获取索引信息失败:%s", res.Status())
if esResponseIsError(res) {
return nil, fmt.Errorf("获取索引信息失败:%s", esResponseStatus(res))
}
var info []esIndexInfo

View File

@@ -11,8 +11,6 @@ import (
"testing"
"GoNavi-Wails/internal/connection"
"github.com/elastic/go-elasticsearch/v8"
)
// ---- 测试辅助函数 ----
@@ -29,13 +27,10 @@ func newMockESServer(t *testing.T, handler http.HandlerFunc) *httptest.Server {
return server
}
// newTestESClient 创建连接到测试服务器的 ES 客户端。
func newTestESClient(t *testing.T, serverURL string) *elasticsearch.Client {
// newTestESClient 创建连接到测试服务器的 ES REST 客户端。
func newTestESClient(t *testing.T, serverURL string) *esRESTClient {
t.Helper()
cfg := elasticsearch.Config{
Addresses: []string{serverURL},
}
client, err := elasticsearch.NewClient(cfg)
client, err := newESRESTClient(esHTTPClientConfig{BaseURL: serverURL})
if err != nil {
t.Fatalf("创建测试 ES 客户端失败: %v", err)
}
@@ -292,7 +287,7 @@ func TestElasticsearchQueryDSL(t *testing.T) {
t.Run("指定索引的 DSL 查询", func(t *testing.T) {
server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_search") {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"hits": {
"total": {"value": 1},
@@ -339,7 +334,7 @@ func TestElasticsearchQueryDSL(t *testing.T) {
server := newMockESServer(t, func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_search") {
capturedPath = r.URL.Path
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"hits":{"total":{"value":0},"hits":[]}}`))
return
}
@@ -379,7 +374,7 @@ func TestElasticsearchQueryString(t *testing.T) {
buf := make([]byte, r.ContentLength)
_, _ = r.Body.Read(buf)
capturedBody = string(buf)
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"hits": {
"total": {"value": 2},
@@ -895,8 +890,8 @@ func TestBuildESClientConfig(t *testing.T) {
Port: 9200,
User: "elastic",
})
if len(cfg.Addresses) != 1 || cfg.Addresses[0] != "http://localhost:9200" {
t.Fatalf("HTTP 地址期望 http://localhost:9200实际%v", cfg.Addresses)
if cfg.BaseURL != "http://localhost:9200" {
t.Fatalf("HTTP 地址期望 http://localhost:9200实际%v", cfg.BaseURL)
}
if cfg.Username != "elastic" {
t.Fatalf("用户名期望 elastic实际%q", cfg.Username)
@@ -909,8 +904,8 @@ func TestBuildESClientConfig(t *testing.T) {
Port: 9200,
UseSSL: true,
})
if len(cfg.Addresses) != 1 || cfg.Addresses[0] != "https://es.example.com:9200" {
t.Fatalf("HTTPS 地址期望 https://es.example.com:9200实际%v", cfg.Addresses)
if cfg.BaseURL != "https://es.example.com:9200" {
t.Fatalf("HTTPS 地址期望 https://es.example.com:9200实际%v", cfg.BaseURL)
}
})
}