diff --git a/internal/db/clickhouse_impl.go b/internal/db/clickhouse_impl.go index d0952b8..473d822 100644 --- a/internal/db/clickhouse_impl.go +++ b/internal/db/clickhouse_impl.go @@ -3,15 +3,18 @@ package db import ( + "bytes" "context" "database/sql" "fmt" + "io" "net" "net/http" "net/url" "sort" "strconv" "strings" + "sync/atomic" "time" "unicode" "unicode/utf8" @@ -208,6 +211,10 @@ func (c *ClickHouseDB) buildClickHouseOptionsWithHTTPCompatibility(config connec type clickHouseHTTPClientProtocolVersionStripper struct { next http.RoundTripper + // serverHelloRewritten 保证只对每个连接的首个握手探测请求改写一次, + // 避免连接建立之后误改写恰好相同的用户查询(clickhouse-go 的 queryHello + // 始终是连接上的第一个 HTTP 请求)。 + serverHelloRewritten *atomic.Bool } func (rt clickHouseHTTPClientProtocolVersionStripper) RoundTrip(req *http.Request) (*http.Response, error) { @@ -218,19 +225,81 @@ func (rt clickHouseHTTPClientProtocolVersionStripper) RoundTrip(req *http.Reques if req == nil || req.URL == nil { return next.RoundTrip(req) } + query := req.URL.Query() - if _, ok := query["client_protocol_version"]; !ok { + stripParam := false + if _, ok := query["client_protocol_version"]; ok { + stripParam = true + } + + var ( + rewrittenBody []byte + hadServerInfoQuery bool + err error + ) + // 仅在握手阶段(首个匹配请求)改写探测查询;后续用户查询一律放行。 + if rt.serverHelloRewritten == nil || !rt.serverHelloRewritten.Load() { + rewrittenBody, hadServerInfoQuery, err = rewriteClickHouseServerHelloRequestBody(req) + if err != nil { + return nil, err + } + if hadServerInfoQuery && rt.serverHelloRewritten != nil { + rt.serverHelloRewritten.Store(true) + } + } + + if !stripParam && !hadServerInfoQuery { return next.RoundTrip(req) } cloned := req.Clone(req.Context()) - clonedURL := *req.URL - query.Del("client_protocol_version") - clonedURL.RawQuery = query.Encode() - cloned.URL = &clonedURL + if stripParam { + clonedURL := *req.URL + query.Del("client_protocol_version") + clonedURL.RawQuery = query.Encode() + cloned.URL = &clonedURL + } + if hadServerInfoQuery { + cloned.Body = io.NopCloser(bytes.NewReader(rewrittenBody)) + cloned.ContentLength = int64(len(rewrittenBody)) + cloned.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(rewrittenBody)), nil + } + } return next.RoundTrip(cloned) } +// clickHouseServerHelloQuery 是 clickhouse-go HTTP 驱动在握手阶段发送的服务端信息探测语句。 +// 旧版本服务端(如 ClickHouse 22.8)没有 displayName() 函数,会直接返回 UNKNOWN_FUNCTION。 +const clickHouseServerHelloQuery = "SELECT displayName(), version(), revision(), timezone()" + +// clickHouseServerHelloCompatQuery 使用 hostName() 替换不存在的 displayName()。 +// hostName() 在所有受支持的 ClickHouse 版本上都可用,并返回服务端主机名, +// 足以填充驱动握手所需的显示名称字段,其余 version()/revision()/timezone() 保持不变。 +const clickHouseServerHelloCompatQuery = "SELECT hostName(), version(), revision(), timezone()" + +// rewriteClickHouseServerHelloRequestBody 检测并改写握手探测请求体,将 displayName() 替换为 +// hostName()。仅当请求体恰好是驱动的握手探测语句时才改写,其它请求体一律原样放行。 +func rewriteClickHouseServerHelloRequestBody(req *http.Request) ([]byte, bool, error) { + if req == nil || req.Body == nil || req.Body == http.NoBody { + return nil, false, nil + } + body, err := io.ReadAll(req.Body) + closeErr := req.Body.Close() + if err != nil { + return nil, false, err + } + if closeErr != nil { + return nil, false, closeErr + } + // 恢复原始请求体,保证非握手请求不受影响。 + req.Body = io.NopCloser(bytes.NewReader(body)) + if strings.TrimSpace(string(body)) != clickHouseServerHelloQuery { + return nil, false, nil + } + return []byte(clickHouseServerHelloCompatQuery), true, nil +} + func installClickHouseHTTPClientProtocolVersionStripper(opts *clickhouse.Options) { if opts == nil { return @@ -247,7 +316,10 @@ func installClickHouseHTTPClientProtocolVersionStripper(opts *clickhouse.Options next = wrapped } } - return clickHouseHTTPClientProtocolVersionStripper{next: next}, nil + return clickHouseHTTPClientProtocolVersionStripper{ + next: next, + serverHelloRewritten: &atomic.Bool{}, + }, nil } } @@ -462,9 +534,32 @@ func isClickHouseHTTPClientProtocolVersionUnsupported(err error) bool { strings.Contains(text, "code: 115") } +// isClickHouseHTTPServerInfoFunctionUnsupported 识别 clickhouse-go 在 HTTP 握手阶段 +// 执行 "SELECT displayName(), version(), revision(), timezone()" 时,旧版本服务端 +// (如 ClickHouse 22.8)因不存在 displayName() 函数而返回的 Code 46 / UNKNOWN_FUNCTION 错误。 +func isClickHouseHTTPServerInfoFunctionUnsupported(err error) bool { + if err == nil { + return false + } + text := strings.ToLower(strings.TrimSpace(err.Error())) + if text == "" || !strings.Contains(text, "displayname") { + return false + } + return strings.Contains(text, "unknown function") || + strings.Contains(text, "unknown_function") || + strings.Contains(text, "code: 46") +} + +// shouldRetryClickHouseHTTPCompatibility 判断 HTTP 协议下的失败是否可以通过 +// HTTP 兼容模式(移除 client_protocol_version 并改写握手探测查询)重试解决。 +func shouldRetryClickHouseHTTPCompatibility(err error) bool { + return isClickHouseHTTPClientProtocolVersionUnsupported(err) || + isClickHouseHTTPServerInfoFunctionUnsupported(err) +} + func shouldTryNextClickHouseProtocol(protocol clickhouse.Protocol, err error) bool { return isClickHouseProtocolMismatch(err) || - (protocol == clickhouse.HTTP && isClickHouseHTTPClientProtocolVersionUnsupported(err)) + (protocol == clickhouse.HTTP && shouldRetryClickHouseHTTPCompatibility(err)) } func clickHouseProtocolName(protocol clickhouse.Protocol) string { @@ -510,6 +605,9 @@ func clickHouseAttemptFailureMessage(protocol clickhouse.Protocol, err error) st if protocol == clickhouse.HTTP && isClickHouseHTTPClientProtocolVersionUnsupported(err) { return "当前 ClickHouse HTTP 端口不支持 client_protocol_version(常见于 ClickHouse 22.8),将使用 HTTP 兼容模式重试;如仍失败请确认连接协议和端口" } + if protocol == clickhouse.HTTP && isClickHouseHTTPServerInfoFunctionUnsupported(err) { + return "当前 ClickHouse HTTP 端口不支持 displayName() 握手探测函数(常见于 ClickHouse 22.8),将使用 HTTP 兼容模式重试;如仍失败请确认连接协议和端口" + } if isClickHouseProtocolMismatch(err) { if protocol == clickhouse.Native { return "服务端响应不像 Native 握手,当前端口更像 HTTP/HTTPS 端口;请选择 HTTP 协议,或确认 ClickHouse Native 端口" @@ -656,9 +754,13 @@ func (c *ClickHouseDB) Connect(config connection.ConnectionConfig) error { } if protocol == clickhouse.HTTP && !stripHTTPClientProtocolVersion && - isClickHouseHTTPClientProtocolVersionUnsupported(err) && + shouldRetryClickHouseHTTPCompatibility(err) && compatIdx+1 < len(compatibilityModes) { - logger.Warnf("ClickHouse HTTP 端口不支持 client_protocol_version,改用 HTTP 兼容模式重试") + if isClickHouseHTTPServerInfoFunctionUnsupported(err) { + logger.Warnf("ClickHouse HTTP 端口不支持 displayName() 握手探测函数,改用 HTTP 兼容模式重试") + } else { + logger.Warnf("ClickHouse HTTP 端口不支持 client_protocol_version,改用 HTTP 兼容模式重试") + } continue } break diff --git a/internal/db/clickhouse_impl_test.go b/internal/db/clickhouse_impl_test.go index db19a5e..edf9e56 100644 --- a/internal/db/clickhouse_impl_test.go +++ b/internal/db/clickhouse_impl_test.go @@ -11,6 +11,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "testing" "time" @@ -355,6 +356,170 @@ func TestClickHouseHTTPClientProtocolVersionStripperRemovesDriverQueryParam(t *t } } +func TestClickHouseHTTPServerInfoFunctionUnsupportedEnablesCompatibilityRetry(t *testing.T) { + err := errors.New(`failed to query server hello: failed to query server hello info: sendQuery: [HTTP 404] response body: "Code: 46. DB::Exception: Unknown function displayName: While processing displayName(), version(), revision(), timezone(). (UNKNOWN_FUNCTION)"`) + if !isClickHouseHTTPServerInfoFunctionUnsupported(err) { + t.Fatalf("expected displayName unknown function to be treated as HTTP server-info compatibility issue") + } + if !shouldRetryClickHouseHTTPCompatibility(err) { + t.Fatalf("expected displayName unknown function to permit HTTP compatibility retry") + } + if !shouldTryNextClickHouseProtocol(clickhouse.HTTP, err) { + t.Fatalf("expected HTTP displayName issue to permit protocol fallback") + } + if shouldTryNextClickHouseProtocol(clickhouse.Native, err) { + t.Fatalf("native protocol should not treat HTTP displayName issue as retryable") + } + + message := clickHouseAttemptFailureMessage(clickhouse.HTTP, err) + if !strings.Contains(message, "displayName") || !strings.Contains(message, "兼容模式") { + t.Fatalf("expected displayName compatibility retry hint, got %q", message) + } +} + +func TestIsClickHouseHTTPServerInfoFunctionUnsupportedIgnoresUnrelatedErrors(t *testing.T) { + if isClickHouseHTTPServerInfoFunctionUnsupported(nil) { + t.Fatal("nil error should not be treated as server-info function issue") + } + if isClickHouseHTTPServerInfoFunctionUnsupported(errors.New("[HTTP 404] page not found")) { + t.Fatal("plain 404 without displayName signal should not be treated as server-info function issue") + } + if isClickHouseHTTPServerInfoFunctionUnsupported(errors.New("Code: 60. DB::Exception: Unknown function someOtherFn")) { + t.Fatal("unknown function error without displayName should not be treated as server-info function issue") + } +} + +func TestClickHouseHTTPCompatibilityStripperRewritesServerHelloQuery(t *testing.T) { + var seenBody string + stripper := clickHouseHTTPClientProtocolVersionStripper{ + next: roundTripFunc(func(req *http.Request) (*http.Response, error) { + if req.Body != nil { + data, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + seenBody = string(data) + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("")), + Header: make(http.Header), + }, nil + }), + } + req, err := http.NewRequest( + http.MethodPost, + "http://clickhouse.local:8123/?database=default", + strings.NewReader(clickHouseServerHelloQuery), + ) + if err != nil { + t.Fatalf("new request: %v", err) + } + + res, err := stripper.RoundTrip(req) + if err != nil { + t.Fatalf("round trip: %v", err) + } + if res != nil && res.Body != nil { + res.Body.Close() + } + if strings.Contains(seenBody, "displayName()") { + t.Fatalf("expected displayName() rewritten out of server hello query, got %q", seenBody) + } + if seenBody != clickHouseServerHelloCompatQuery { + t.Fatalf("expected compatibility server hello query, got %q", seenBody) + } +} + +func TestClickHouseHTTPCompatibilityStripperRewritesServerHelloOnlyOnce(t *testing.T) { + var seenBodies []string + stripper := clickHouseHTTPClientProtocolVersionStripper{ + serverHelloRewritten: &atomic.Bool{}, + next: roundTripFunc(func(req *http.Request) (*http.Response, error) { + if req.Body != nil { + data, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + seenBodies = append(seenBodies, string(data)) + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("")), + Header: make(http.Header), + }, nil + }), + } + + for i := 0; i < 2; i++ { + req, err := http.NewRequest( + http.MethodPost, + "http://clickhouse.local:8123/?database=default", + strings.NewReader(clickHouseServerHelloQuery), + ) + if err != nil { + t.Fatalf("new request %d: %v", i, err) + } + res, err := stripper.RoundTrip(req) + if err != nil { + t.Fatalf("round trip %d: %v", i, err) + } + if res != nil && res.Body != nil { + res.Body.Close() + } + } + + if len(seenBodies) != 2 { + t.Fatalf("expected two forwarded requests, got %d", len(seenBodies)) + } + if seenBodies[0] != clickHouseServerHelloCompatQuery { + t.Fatalf("expected first (handshake) request rewritten, got %q", seenBodies[0]) + } + if seenBodies[1] != clickHouseServerHelloQuery { + t.Fatalf("expected second identical query left unchanged after handshake, got %q", seenBodies[1]) + } +} + +func TestClickHouseHTTPCompatibilityStripperLeavesOtherBodiesUnchanged(t *testing.T) { + const userQuery = "SELECT count() FROM system.tables" + var seenBody string + stripper := clickHouseHTTPClientProtocolVersionStripper{ + next: roundTripFunc(func(req *http.Request) (*http.Response, error) { + if req.Body != nil { + data, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + seenBody = string(data) + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("")), + Header: make(http.Header), + }, nil + }), + } + req, err := http.NewRequest( + http.MethodPost, + "http://clickhouse.local:8123/?database=default", + strings.NewReader(userQuery), + ) + if err != nil { + t.Fatalf("new request: %v", err) + } + + res, err := stripper.RoundTrip(req) + if err != nil { + t.Fatalf("round trip: %v", err) + } + if res != nil && res.Body != nil { + res.Body.Close() + } + if seenBody != userQuery { + t.Fatalf("expected user query body untouched, got %q", seenBody) + } +} + func TestWithClickHouseProtocolForcesProtocolSelection(t *testing.T) { httpConfig := withClickHouseProtocol(connection.ConnectionConfig{ Type: "clickhouse",