mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-06-23 06:53:52 +08:00
fix(datasource): ClickHouse 22.8 HTTP 握手兼容 displayName 缺失
clickhouse-go 在 HTTP 握手阶段执行 SELECT displayName(), version(), revision(), timezone(),而 ClickHouse 22.8 没有 displayName() 函数, 返回 Code 46 UNKNOWN_FUNCTION,导致即便已移除 client_protocol_version 的兼容重试路径仍然连接失败。 扩展现有 HTTP 兼容脚手架:新增 displayName Code 46 检测,复用同一条 兼容重试分支;兼容模式下的 RoundTripper 改写握手探测请求体,将 displayName() 替换为各版本通用的 hostName(),其余请求体原样放行。 Refs #479
This commit is contained in:
@@ -3,15 +3,18 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
@@ -208,6 +211,10 @@ func (c *ClickHouseDB) buildClickHouseOptionsWithHTTPCompatibility(config connec
|
||||
|
||||
type clickHouseHTTPClientProtocolVersionStripper struct {
|
||||
next http.RoundTripper
|
||||
// serverHelloRewritten 保证只对每个连接的首个握手探测请求改写一次,
|
||||
// 避免连接建立之后误改写恰好相同的用户查询(clickhouse-go 的 queryHello
|
||||
// 始终是连接上的第一个 HTTP 请求)。
|
||||
serverHelloRewritten *atomic.Bool
|
||||
}
|
||||
|
||||
func (rt clickHouseHTTPClientProtocolVersionStripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
@@ -218,19 +225,81 @@ func (rt clickHouseHTTPClientProtocolVersionStripper) RoundTrip(req *http.Reques
|
||||
if req == nil || req.URL == nil {
|
||||
return next.RoundTrip(req)
|
||||
}
|
||||
|
||||
query := req.URL.Query()
|
||||
if _, ok := query["client_protocol_version"]; !ok {
|
||||
stripParam := false
|
||||
if _, ok := query["client_protocol_version"]; ok {
|
||||
stripParam = true
|
||||
}
|
||||
|
||||
var (
|
||||
rewrittenBody []byte
|
||||
hadServerInfoQuery bool
|
||||
err error
|
||||
)
|
||||
// 仅在握手阶段(首个匹配请求)改写探测查询;后续用户查询一律放行。
|
||||
if rt.serverHelloRewritten == nil || !rt.serverHelloRewritten.Load() {
|
||||
rewrittenBody, hadServerInfoQuery, err = rewriteClickHouseServerHelloRequestBody(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if hadServerInfoQuery && rt.serverHelloRewritten != nil {
|
||||
rt.serverHelloRewritten.Store(true)
|
||||
}
|
||||
}
|
||||
|
||||
if !stripParam && !hadServerInfoQuery {
|
||||
return next.RoundTrip(req)
|
||||
}
|
||||
|
||||
cloned := req.Clone(req.Context())
|
||||
clonedURL := *req.URL
|
||||
query.Del("client_protocol_version")
|
||||
clonedURL.RawQuery = query.Encode()
|
||||
cloned.URL = &clonedURL
|
||||
if stripParam {
|
||||
clonedURL := *req.URL
|
||||
query.Del("client_protocol_version")
|
||||
clonedURL.RawQuery = query.Encode()
|
||||
cloned.URL = &clonedURL
|
||||
}
|
||||
if hadServerInfoQuery {
|
||||
cloned.Body = io.NopCloser(bytes.NewReader(rewrittenBody))
|
||||
cloned.ContentLength = int64(len(rewrittenBody))
|
||||
cloned.GetBody = func() (io.ReadCloser, error) {
|
||||
return io.NopCloser(bytes.NewReader(rewrittenBody)), nil
|
||||
}
|
||||
}
|
||||
return next.RoundTrip(cloned)
|
||||
}
|
||||
|
||||
// clickHouseServerHelloQuery 是 clickhouse-go HTTP 驱动在握手阶段发送的服务端信息探测语句。
|
||||
// 旧版本服务端(如 ClickHouse 22.8)没有 displayName() 函数,会直接返回 UNKNOWN_FUNCTION。
|
||||
const clickHouseServerHelloQuery = "SELECT displayName(), version(), revision(), timezone()"
|
||||
|
||||
// clickHouseServerHelloCompatQuery 使用 hostName() 替换不存在的 displayName()。
|
||||
// hostName() 在所有受支持的 ClickHouse 版本上都可用,并返回服务端主机名,
|
||||
// 足以填充驱动握手所需的显示名称字段,其余 version()/revision()/timezone() 保持不变。
|
||||
const clickHouseServerHelloCompatQuery = "SELECT hostName(), version(), revision(), timezone()"
|
||||
|
||||
// rewriteClickHouseServerHelloRequestBody 检测并改写握手探测请求体,将 displayName() 替换为
|
||||
// hostName()。仅当请求体恰好是驱动的握手探测语句时才改写,其它请求体一律原样放行。
|
||||
func rewriteClickHouseServerHelloRequestBody(req *http.Request) ([]byte, bool, error) {
|
||||
if req == nil || req.Body == nil || req.Body == http.NoBody {
|
||||
return nil, false, nil
|
||||
}
|
||||
body, err := io.ReadAll(req.Body)
|
||||
closeErr := req.Body.Close()
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if closeErr != nil {
|
||||
return nil, false, closeErr
|
||||
}
|
||||
// 恢复原始请求体,保证非握手请求不受影响。
|
||||
req.Body = io.NopCloser(bytes.NewReader(body))
|
||||
if strings.TrimSpace(string(body)) != clickHouseServerHelloQuery {
|
||||
return nil, false, nil
|
||||
}
|
||||
return []byte(clickHouseServerHelloCompatQuery), true, nil
|
||||
}
|
||||
|
||||
func installClickHouseHTTPClientProtocolVersionStripper(opts *clickhouse.Options) {
|
||||
if opts == nil {
|
||||
return
|
||||
@@ -247,7 +316,10 @@ func installClickHouseHTTPClientProtocolVersionStripper(opts *clickhouse.Options
|
||||
next = wrapped
|
||||
}
|
||||
}
|
||||
return clickHouseHTTPClientProtocolVersionStripper{next: next}, nil
|
||||
return clickHouseHTTPClientProtocolVersionStripper{
|
||||
next: next,
|
||||
serverHelloRewritten: &atomic.Bool{},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -462,9 +534,32 @@ func isClickHouseHTTPClientProtocolVersionUnsupported(err error) bool {
|
||||
strings.Contains(text, "code: 115")
|
||||
}
|
||||
|
||||
// isClickHouseHTTPServerInfoFunctionUnsupported 识别 clickhouse-go 在 HTTP 握手阶段
|
||||
// 执行 "SELECT displayName(), version(), revision(), timezone()" 时,旧版本服务端
|
||||
// (如 ClickHouse 22.8)因不存在 displayName() 函数而返回的 Code 46 / UNKNOWN_FUNCTION 错误。
|
||||
func isClickHouseHTTPServerInfoFunctionUnsupported(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
text := strings.ToLower(strings.TrimSpace(err.Error()))
|
||||
if text == "" || !strings.Contains(text, "displayname") {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(text, "unknown function") ||
|
||||
strings.Contains(text, "unknown_function") ||
|
||||
strings.Contains(text, "code: 46")
|
||||
}
|
||||
|
||||
// shouldRetryClickHouseHTTPCompatibility 判断 HTTP 协议下的失败是否可以通过
|
||||
// HTTP 兼容模式(移除 client_protocol_version 并改写握手探测查询)重试解决。
|
||||
func shouldRetryClickHouseHTTPCompatibility(err error) bool {
|
||||
return isClickHouseHTTPClientProtocolVersionUnsupported(err) ||
|
||||
isClickHouseHTTPServerInfoFunctionUnsupported(err)
|
||||
}
|
||||
|
||||
func shouldTryNextClickHouseProtocol(protocol clickhouse.Protocol, err error) bool {
|
||||
return isClickHouseProtocolMismatch(err) ||
|
||||
(protocol == clickhouse.HTTP && isClickHouseHTTPClientProtocolVersionUnsupported(err))
|
||||
(protocol == clickhouse.HTTP && shouldRetryClickHouseHTTPCompatibility(err))
|
||||
}
|
||||
|
||||
func clickHouseProtocolName(protocol clickhouse.Protocol) string {
|
||||
@@ -510,6 +605,9 @@ func clickHouseAttemptFailureMessage(protocol clickhouse.Protocol, err error) st
|
||||
if protocol == clickhouse.HTTP && isClickHouseHTTPClientProtocolVersionUnsupported(err) {
|
||||
return "当前 ClickHouse HTTP 端口不支持 client_protocol_version(常见于 ClickHouse 22.8),将使用 HTTP 兼容模式重试;如仍失败请确认连接协议和端口"
|
||||
}
|
||||
if protocol == clickhouse.HTTP && isClickHouseHTTPServerInfoFunctionUnsupported(err) {
|
||||
return "当前 ClickHouse HTTP 端口不支持 displayName() 握手探测函数(常见于 ClickHouse 22.8),将使用 HTTP 兼容模式重试;如仍失败请确认连接协议和端口"
|
||||
}
|
||||
if isClickHouseProtocolMismatch(err) {
|
||||
if protocol == clickhouse.Native {
|
||||
return "服务端响应不像 Native 握手,当前端口更像 HTTP/HTTPS 端口;请选择 HTTP 协议,或确认 ClickHouse Native 端口"
|
||||
@@ -656,9 +754,13 @@ func (c *ClickHouseDB) Connect(config connection.ConnectionConfig) error {
|
||||
}
|
||||
if protocol == clickhouse.HTTP &&
|
||||
!stripHTTPClientProtocolVersion &&
|
||||
isClickHouseHTTPClientProtocolVersionUnsupported(err) &&
|
||||
shouldRetryClickHouseHTTPCompatibility(err) &&
|
||||
compatIdx+1 < len(compatibilityModes) {
|
||||
logger.Warnf("ClickHouse HTTP 端口不支持 client_protocol_version,改用 HTTP 兼容模式重试")
|
||||
if isClickHouseHTTPServerInfoFunctionUnsupported(err) {
|
||||
logger.Warnf("ClickHouse HTTP 端口不支持 displayName() 握手探测函数,改用 HTTP 兼容模式重试")
|
||||
} else {
|
||||
logger.Warnf("ClickHouse HTTP 端口不支持 client_protocol_version,改用 HTTP 兼容模式重试")
|
||||
}
|
||||
continue
|
||||
}
|
||||
break
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -355,6 +356,170 @@ func TestClickHouseHTTPClientProtocolVersionStripperRemovesDriverQueryParam(t *t
|
||||
}
|
||||
}
|
||||
|
||||
func TestClickHouseHTTPServerInfoFunctionUnsupportedEnablesCompatibilityRetry(t *testing.T) {
|
||||
err := errors.New(`failed to query server hello: failed to query server hello info: sendQuery: [HTTP 404] response body: "Code: 46. DB::Exception: Unknown function displayName: While processing displayName(), version(), revision(), timezone(). (UNKNOWN_FUNCTION)"`)
|
||||
if !isClickHouseHTTPServerInfoFunctionUnsupported(err) {
|
||||
t.Fatalf("expected displayName unknown function to be treated as HTTP server-info compatibility issue")
|
||||
}
|
||||
if !shouldRetryClickHouseHTTPCompatibility(err) {
|
||||
t.Fatalf("expected displayName unknown function to permit HTTP compatibility retry")
|
||||
}
|
||||
if !shouldTryNextClickHouseProtocol(clickhouse.HTTP, err) {
|
||||
t.Fatalf("expected HTTP displayName issue to permit protocol fallback")
|
||||
}
|
||||
if shouldTryNextClickHouseProtocol(clickhouse.Native, err) {
|
||||
t.Fatalf("native protocol should not treat HTTP displayName issue as retryable")
|
||||
}
|
||||
|
||||
message := clickHouseAttemptFailureMessage(clickhouse.HTTP, err)
|
||||
if !strings.Contains(message, "displayName") || !strings.Contains(message, "兼容模式") {
|
||||
t.Fatalf("expected displayName compatibility retry hint, got %q", message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsClickHouseHTTPServerInfoFunctionUnsupportedIgnoresUnrelatedErrors(t *testing.T) {
|
||||
if isClickHouseHTTPServerInfoFunctionUnsupported(nil) {
|
||||
t.Fatal("nil error should not be treated as server-info function issue")
|
||||
}
|
||||
if isClickHouseHTTPServerInfoFunctionUnsupported(errors.New("[HTTP 404] page not found")) {
|
||||
t.Fatal("plain 404 without displayName signal should not be treated as server-info function issue")
|
||||
}
|
||||
if isClickHouseHTTPServerInfoFunctionUnsupported(errors.New("Code: 60. DB::Exception: Unknown function someOtherFn")) {
|
||||
t.Fatal("unknown function error without displayName should not be treated as server-info function issue")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClickHouseHTTPCompatibilityStripperRewritesServerHelloQuery(t *testing.T) {
|
||||
var seenBody string
|
||||
stripper := clickHouseHTTPClientProtocolVersionStripper{
|
||||
next: roundTripFunc(func(req *http.Request) (*http.Response, error) {
|
||||
if req.Body != nil {
|
||||
data, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
seenBody = string(data)
|
||||
}
|
||||
return &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader("")),
|
||||
Header: make(http.Header),
|
||||
}, nil
|
||||
}),
|
||||
}
|
||||
req, err := http.NewRequest(
|
||||
http.MethodPost,
|
||||
"http://clickhouse.local:8123/?database=default",
|
||||
strings.NewReader(clickHouseServerHelloQuery),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("new request: %v", err)
|
||||
}
|
||||
|
||||
res, err := stripper.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Fatalf("round trip: %v", err)
|
||||
}
|
||||
if res != nil && res.Body != nil {
|
||||
res.Body.Close()
|
||||
}
|
||||
if strings.Contains(seenBody, "displayName()") {
|
||||
t.Fatalf("expected displayName() rewritten out of server hello query, got %q", seenBody)
|
||||
}
|
||||
if seenBody != clickHouseServerHelloCompatQuery {
|
||||
t.Fatalf("expected compatibility server hello query, got %q", seenBody)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClickHouseHTTPCompatibilityStripperRewritesServerHelloOnlyOnce(t *testing.T) {
|
||||
var seenBodies []string
|
||||
stripper := clickHouseHTTPClientProtocolVersionStripper{
|
||||
serverHelloRewritten: &atomic.Bool{},
|
||||
next: roundTripFunc(func(req *http.Request) (*http.Response, error) {
|
||||
if req.Body != nil {
|
||||
data, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
seenBodies = append(seenBodies, string(data))
|
||||
}
|
||||
return &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader("")),
|
||||
Header: make(http.Header),
|
||||
}, nil
|
||||
}),
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
req, err := http.NewRequest(
|
||||
http.MethodPost,
|
||||
"http://clickhouse.local:8123/?database=default",
|
||||
strings.NewReader(clickHouseServerHelloQuery),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("new request %d: %v", i, err)
|
||||
}
|
||||
res, err := stripper.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Fatalf("round trip %d: %v", i, err)
|
||||
}
|
||||
if res != nil && res.Body != nil {
|
||||
res.Body.Close()
|
||||
}
|
||||
}
|
||||
|
||||
if len(seenBodies) != 2 {
|
||||
t.Fatalf("expected two forwarded requests, got %d", len(seenBodies))
|
||||
}
|
||||
if seenBodies[0] != clickHouseServerHelloCompatQuery {
|
||||
t.Fatalf("expected first (handshake) request rewritten, got %q", seenBodies[0])
|
||||
}
|
||||
if seenBodies[1] != clickHouseServerHelloQuery {
|
||||
t.Fatalf("expected second identical query left unchanged after handshake, got %q", seenBodies[1])
|
||||
}
|
||||
}
|
||||
|
||||
func TestClickHouseHTTPCompatibilityStripperLeavesOtherBodiesUnchanged(t *testing.T) {
|
||||
const userQuery = "SELECT count() FROM system.tables"
|
||||
var seenBody string
|
||||
stripper := clickHouseHTTPClientProtocolVersionStripper{
|
||||
next: roundTripFunc(func(req *http.Request) (*http.Response, error) {
|
||||
if req.Body != nil {
|
||||
data, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
seenBody = string(data)
|
||||
}
|
||||
return &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader("")),
|
||||
Header: make(http.Header),
|
||||
}, nil
|
||||
}),
|
||||
}
|
||||
req, err := http.NewRequest(
|
||||
http.MethodPost,
|
||||
"http://clickhouse.local:8123/?database=default",
|
||||
strings.NewReader(userQuery),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("new request: %v", err)
|
||||
}
|
||||
|
||||
res, err := stripper.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Fatalf("round trip: %v", err)
|
||||
}
|
||||
if res != nil && res.Body != nil {
|
||||
res.Body.Close()
|
||||
}
|
||||
if seenBody != userQuery {
|
||||
t.Fatalf("expected user query body untouched, got %q", seenBody)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithClickHouseProtocolForcesProtocolSelection(t *testing.T) {
|
||||
httpConfig := withClickHouseProtocol(connection.ConnectionConfig{
|
||||
Type: "clickhouse",
|
||||
|
||||
Reference in New Issue
Block a user