mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-05-22 08:50:17 +08:00
🐛 fix(oceanbase): 新增 OBClient capability 注入打通 Oracle 租户连接
- 双轨路由:Oracle 协议路径按 mysql wire 端口预探测自动选择,OB MySQL wire 端口走 OBClient capability 注入(复刻 Navicat),其他端口走标准 Oracle TNS - 默认注入 4 组 OBClient capability attribute(_client_name=OceanBase Connector/J、_client_version、__ob_client_attribute_capability_flag、ob_capability_flag),用户在 ConnectionParams 设置的同名键优先级更高 - 恢复 applyOracleChangesMySQLWire:OBClient 路径写操作使用 mysql "?" 占位符 + Oracle 双引号引用标识符,配合 sql_mode='ANSI_QUOTES' 让服务端按 Oracle 解析 - 删除旧的 errOceanBaseMySQLWireOnOracleRoute fail-fast 死路提示,重写文件头注释固化反转决策(基于用户报告 Navicat 用 OceanBase 数据源同端口连通的真实证据) - 前端 ConnectionModal 文案对齐:去掉「必须 OBProxy Oracle listener」的误导,改为说明自动路由 + connectionAttributes 调试入口 - 新增 5 个单元测试覆盖默认注入、用户覆盖、DSN 透传、mysql wire 占位符;刷新 OceanBase agent revision
This commit is contained in:
@@ -4758,7 +4758,13 @@ const ConnectionModal: React.FC<{
|
||||
<Form.Item
|
||||
name="oceanBaseProtocol"
|
||||
label="OceanBase 协议"
|
||||
help="MySQL 租户选择 MySQL;Oracle 租户选择 Oracle。OceanBase 租户兼容模式不包含 Native,该选择会同时影响连接测试、浏览表结构和 SQL 方言。"
|
||||
help={
|
||||
<span>
|
||||
MySQL 租户选择 MySQL;Oracle 租户选择 Oracle。GoNavi 会根据端口自动选择:OB MySQL wire 端口走 OBClient capability 注入(与 Navicat 相同路径),OBProxy Oracle listener 端口走标准 TNS。
|
||||
<br />
|
||||
如果 Oracle 租户连接报「Error 1235」或 OBClient 握手失败,可在「连接参数」字段通过 <code>connectionAttributes=key1:value1,key2:value2</code> 覆盖 GoNavi 默认注入的 OBClient capability。
|
||||
</span>
|
||||
}
|
||||
style={{ marginBottom: 0 }}
|
||||
>
|
||||
<Select
|
||||
|
||||
@@ -4,20 +4,20 @@ package db
|
||||
|
||||
func init() {
|
||||
optionalDriverAgentRevisions = map[string]string{
|
||||
"mariadb": "src-1a1cc64f8f92d92b",
|
||||
"oceanbase": "src-b10df5902bf60a23",
|
||||
"diros": "src-bcc78fa43671ade5",
|
||||
"sphinx": "src-404765c2fda68c5f",
|
||||
"sqlserver": "src-d9fba1eca0a27c49",
|
||||
"sqlite": "src-0c26dc1106aace56",
|
||||
"duckdb": "src-70005eca35bb25c7",
|
||||
"dameng": "src-b2748e843ec2fcbf",
|
||||
"kingbase": "src-f826a940f40212f2",
|
||||
"highgo": "src-b9ef687ba9a056c9",
|
||||
"vastbase": "src-43e1328091959345",
|
||||
"opengauss": "src-87f992c30e0035e7",
|
||||
"mongodb": "src-eaec5eeb4a94f0ed",
|
||||
"tdengine": "src-bce489d4e3cf967b",
|
||||
"clickhouse": "src-794edadecce4a328",
|
||||
"mariadb": "src-4e1ec648c70c87ea",
|
||||
"oceanbase": "src-f08c1fb112767bbf",
|
||||
"diros": "src-74927b3809258666",
|
||||
"sphinx": "src-269bd60a34df47d3",
|
||||
"sqlserver": "src-84553484c72e7253",
|
||||
"sqlite": "src-762863d48f653b89",
|
||||
"duckdb": "src-3e551d777ae96d8d",
|
||||
"dameng": "src-596bebeaa016fc74",
|
||||
"kingbase": "src-2e5a1337b0405c57",
|
||||
"highgo": "src-5a29a1d3685eb6b4",
|
||||
"vastbase": "src-e3cfef65512feb23",
|
||||
"opengauss": "src-58227ba3bc1ec894",
|
||||
"mongodb": "src-57fdd8bfebdcd46e",
|
||||
"tdengine": "src-939715f94df1ec9c",
|
||||
"clickhouse": "src-482d62ed565b3e69",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,7 +111,10 @@ var mysqlSupportedDriverParamNames = map[string]string{
|
||||
"collation": "collation",
|
||||
"columnswithalias": "columnsWithAlias",
|
||||
"compress": "compress",
|
||||
"connectionattributes": "connectionAttributes",
|
||||
// connectionAttributes 透传 mysql CLIENT_CONNECT_ATTRS(key1:value1,key2:value2 格式)。
|
||||
// OceanBase Oracle 租户 MySQL wire 路径用它注入 OBClient 私有 capability attribute;
|
||||
// 普通 mysql/mariadb 用户也能在此声明 program_name 等元数据。
|
||||
"connectionattributes": "connectionAttributes",
|
||||
"interpolateparams": "interpolateParams",
|
||||
"loc": "loc",
|
||||
"maxallowedpacket": "maxAllowedPacket",
|
||||
|
||||
@@ -1,11 +1,51 @@
|
||||
//go:build gonavi_full_drivers || gonavi_oceanbase_driver
|
||||
|
||||
// Package db 中的 OceanBase 实现说明(请在调整 Oracle 路径前阅读,避免方向摇摆):
|
||||
//
|
||||
// OceanBase 有两类入口:
|
||||
// 1. OBServer 直连 / OBProxy MySQL listener —— MySQL wire 协议(OBClient 协议扩展)
|
||||
// 2. OBProxy Oracle listener —— 标准 Oracle TNS 网络协议
|
||||
//
|
||||
// Navicat 的"OceanBase"数据源经实测能在 OB MySQL wire 端口上直接连接 Oracle 租户,
|
||||
// 证明 OB 服务端识别 OBClient 客户端的关键是 CLIENT_CONNECT_ATTRS 中的特定 attribute
|
||||
// 组合,而不是 capability bit 0-31 的扩展(这些 bit 是 MySQL 协议标准定义的)。
|
||||
// go-sql-driver/mysql v1.9+ 通过 DSN 参数 connectionAttributes 透传 CLIENT_CONNECT_ATTRS,
|
||||
// 因此 **不需要 fork mysql driver** 即可复刻 Navicat 的连接路径。
|
||||
//
|
||||
// GoNavi 当前路由(按 OceanBase 协议字段选择决定):
|
||||
// - 协议=MySQL:走 go-sql-driver/mysql,连 MySQL 租户。OB 服务端在 Oracle 租户上返回
|
||||
// "Error 1235 (0A000): Oracle tenant for current client driver is not supported"
|
||||
// 时,错误信息提示用户切换到 Oracle 协议。
|
||||
// - 协议=Oracle:先做 mysql wire 端口预探测(probeOceanBaseMySQLWireHandshake):
|
||||
// * 端口是 OB MySQL wire → 走 mysql wire + OBClient capability 注入路径
|
||||
// (ensureOceanBaseOBClientAttributes + ensureOceanBaseOracleANSIQuotes),
|
||||
// 元数据查询通过 OracleDB wrapper 复用 Oracle 方言 SQL,
|
||||
// ApplyChanges 用 applyOracleChangesMySQLWire("?" 占位符 + 双引号引用)。
|
||||
// * 端口非 OB MySQL wire → 走 sijms/go-ora 连接 OBProxy 的 Oracle listener。
|
||||
//
|
||||
// OBClient capability attribute 候选清单(基于 OceanBase 公开 connector-j 资料 +
|
||||
// 社区经验,**未在本仓库联调验证 Navicat 用的具体组合**):
|
||||
// - _client_name=OceanBase Connector/J ← OB connector-j 标准
|
||||
// - _client_version=2.4.5
|
||||
// - __ob_client_attribute_capability_flag=1
|
||||
// - ob_capability_flag=1
|
||||
//
|
||||
// 默认注入完整候选清单(mysql server 忽略未知 attribute 是安全行为)。用户/DBA 通过
|
||||
// ConnectionParams 设置 connectionAttributes 时,会与默认注入合并(用户值优先)。
|
||||
//
|
||||
// 历史教训:d2dad751 / 17331ddb / 5/14 两次反转都没在真实 OB Oracle 租户集群上联调,
|
||||
// 多次方向摇摆。本次反转有 Navicat 真实工作证据(用户报告:Navicat 用 OceanBase 数据源
|
||||
// 类型连同一端口 60014 成功)。后续若收到"OBClient 默认注入仍失败"反馈,需要 Wireshark
|
||||
// 抓 Navicat 握手包对照 attribute 组合,不要再盲改方向。
|
||||
package db
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -250,9 +290,9 @@ func withoutOceanBaseProtocolParams(config connection.ConnectionConfig) connecti
|
||||
}
|
||||
|
||||
// ensureOceanBaseOracleANSIQuotes 在 ConnectionParams 中注入 sql_mode='ANSI_QUOTES',
|
||||
// 让 OceanBase Oracle 租户通过 MySQL wire 连接时,把双引号当作标识符引用(Oracle 语义),
|
||||
// 否则元数据查询的列别名 `AS "OWNER"` 和 ApplyChanges 的 `"schema"."table"` 会被当作字符串字面量。
|
||||
// 用户已显式设置 sql_mode 时,追加 ANSI_QUOTES,保留其它 mode。
|
||||
// 让 OceanBase Oracle 租户通过 MySQL wire 连接时,把双引号识别为标识符引用(Oracle 语义),
|
||||
// 否则元数据查询的列别名 `AS "OWNER"` 和 ApplyChanges 的 `"schema"."table"` 会被当字符串字面量。
|
||||
// 用户已显式设置 sql_mode 时追加 ANSI_QUOTES,保留其它 mode。
|
||||
func ensureOceanBaseOracleANSIQuotes(raw string) string {
|
||||
values := connectionParamsFromText(raw)
|
||||
if values == nil {
|
||||
@@ -271,6 +311,120 @@ func ensureOceanBaseOracleANSIQuotes(raw string) string {
|
||||
return values.Encode()
|
||||
}
|
||||
|
||||
// defaultOceanBaseOBClientAttributes 是 GoNavi 在 OceanBase Oracle 租户连接路径上默认注入的
|
||||
// CLIENT_CONNECT_ATTRS 列表,用于声明 OBClient 客户端身份让 OB 服务端放行 Oracle 租户。
|
||||
// 这些 key/value 基于公开 OceanBase Connector/J 资料整理,未经本仓库真实环境验证。
|
||||
// 用户通过 ConnectionParams 中的 connectionAttributes 设置的 attribute 优先级更高。
|
||||
var defaultOceanBaseOBClientAttributes = []struct{ Key, Value string }{
|
||||
{Key: "_client_name", Value: "OceanBase Connector/J"},
|
||||
{Key: "_client_version", Value: "2.4.5"},
|
||||
{Key: "__ob_client_attribute_capability_flag", Value: "1"},
|
||||
{Key: "ob_capability_flag", Value: "1"},
|
||||
}
|
||||
|
||||
// parseMySQLConnectionAttributes 解析 "key1:value1,key2:value2" 格式的 attribute 串。
|
||||
// 兼容 mysql DSN 中 connectionAttributes 参数的格式。
|
||||
func parseMySQLConnectionAttributes(raw string) map[string]string {
|
||||
result := map[string]string{}
|
||||
text := strings.TrimSpace(raw)
|
||||
if text == "" {
|
||||
return result
|
||||
}
|
||||
for _, item := range strings.Split(text, ",") {
|
||||
entry := strings.TrimSpace(item)
|
||||
if entry == "" {
|
||||
continue
|
||||
}
|
||||
colon := strings.Index(entry, ":")
|
||||
if colon < 0 {
|
||||
continue
|
||||
}
|
||||
key := strings.TrimSpace(entry[:colon])
|
||||
value := strings.TrimSpace(entry[colon+1:])
|
||||
if key == "" {
|
||||
continue
|
||||
}
|
||||
result[key] = value
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// serializeMySQLConnectionAttributes 把 map 序列化回 mysql DSN 期望的 "key1:value1,key2:value2"。
|
||||
// 输出按 key 字典序排序以保证可重现。
|
||||
func serializeMySQLConnectionAttributes(attrs map[string]string) string {
|
||||
if len(attrs) == 0 {
|
||||
return ""
|
||||
}
|
||||
keys := make([]string, 0, len(attrs))
|
||||
for k := range attrs {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
// 字典序排序:测试可重现 + 用户视角一致
|
||||
for i := 1; i < len(keys); i++ {
|
||||
for j := i; j > 0 && keys[j-1] > keys[j]; j-- {
|
||||
keys[j-1], keys[j] = keys[j], keys[j-1]
|
||||
}
|
||||
}
|
||||
var b strings.Builder
|
||||
for i, k := range keys {
|
||||
if i > 0 {
|
||||
b.WriteByte(',')
|
||||
}
|
||||
b.WriteString(k)
|
||||
b.WriteByte(':')
|
||||
b.WriteString(attrs[k])
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// ensureOceanBaseOBClientAttributes 把 GoNavi 的默认 OBClient capability attribute 合并到
|
||||
// ConnectionParams 的 connectionAttributes 中。用户已设置的 attribute 优先(不覆盖)。
|
||||
func ensureOceanBaseOBClientAttributes(rawConnectionParams string) string {
|
||||
values := connectionParamsFromText(rawConnectionParams)
|
||||
if values == nil {
|
||||
values = url.Values{}
|
||||
}
|
||||
existing := parseMySQLConnectionAttributes(values.Get("connectionAttributes"))
|
||||
for _, attr := range defaultOceanBaseOBClientAttributes {
|
||||
if _, ok := existing[attr.Key]; !ok {
|
||||
existing[attr.Key] = attr.Value
|
||||
}
|
||||
}
|
||||
values.Set("connectionAttributes", serializeMySQLConnectionAttributes(existing))
|
||||
return values.Encode()
|
||||
}
|
||||
|
||||
// promoteOceanBaseOracleURIParams 把 oceanbase:// URI 中的 Oracle 业务参数提升到 ConnectionParams,
|
||||
// 让 OracleDB.Connect 在不解析 oceanbase URI 的情况下仍能拿到 PREFETCH_ROWS 等参数。
|
||||
func promoteOceanBaseOracleURIParams(config connection.ConnectionConfig) connection.ConnectionConfig {
|
||||
uriParams := connectionParamsFromURI(config.URI, "oceanbase", "mysql")
|
||||
if len(uriParams) == 0 {
|
||||
return config
|
||||
}
|
||||
for _, key := range []string{"protocol", "oceanBaseProtocol", "oceanbaseProtocol", "tenantMode", "compatMode", "mode"} {
|
||||
uriParams.Del(key)
|
||||
}
|
||||
if len(uriParams) == 0 {
|
||||
return config
|
||||
}
|
||||
merged := url.Values{}
|
||||
mergeConnectionParamValuesWithAllowlist(merged, uriParams, oracleConnectionParamNames)
|
||||
mergeConnectionParamValuesWithAllowlist(merged, connectionParamsFromText(config.ConnectionParams), oracleConnectionParamNames)
|
||||
config.ConnectionParams = merged.Encode()
|
||||
return config
|
||||
}
|
||||
|
||||
func prepareOceanBaseOracleConfig(config connection.ConnectionConfig) connection.ConnectionConfig {
|
||||
runConfig := withoutOceanBaseProtocolParams(applyOceanBaseURI(config))
|
||||
runConfig = promoteOceanBaseOracleURIParams(runConfig)
|
||||
runConfig.Type = "oracle"
|
||||
runConfig.URI = ""
|
||||
return runConfig
|
||||
}
|
||||
|
||||
// isOceanBaseOracleTenantMySQLDriverError 识别 OceanBase 服务端在 MySQL wire 上拒绝 Oracle 租户的错误
|
||||
// (Error 1235 / SQLSTATE 0A000:Oracle tenant for current client driver is not supported)。
|
||||
// 当用户错选 MySQL 协议但实际是 Oracle 租户时给出明确切换建议。
|
||||
func isOceanBaseOracleTenantMySQLDriverError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
@@ -281,18 +435,190 @@ func isOceanBaseOracleTenantMySQLDriverError(err error) bool {
|
||||
|
||||
func formatOceanBaseMySQLAttemptError(address string, err error) string {
|
||||
if isOceanBaseOracleTenantMySQLDriverError(err) {
|
||||
return fmt.Sprintf("%s 验证失败: 当前选择的是 OceanBase MySQL 协议,但服务端返回 Oracle 租户不支持 MySQL 客户端驱动;请在连接配置中将 OceanBase 协议切换为 Oracle,并填写服务名 (Service Name)", address)
|
||||
return fmt.Sprintf("%s 验证失败:当前选择的是 OceanBase MySQL 协议,但服务端返回 Oracle 租户不支持 MySQL 客户端驱动(OB Error 1235);请在连接配置中将 OceanBase 协议切换为 Oracle,并填写 OBProxy 暴露的 Oracle 协议端口与服务名(Service Name)", address)
|
||||
}
|
||||
return fmt.Sprintf("%s 验证失败: %v", address, err)
|
||||
return fmt.Sprintf("%s 验证失败:%v", address, err)
|
||||
}
|
||||
|
||||
func formatOceanBaseAttemptError(address string, protocol string, err error) string {
|
||||
if protocol == oceanBaseProtocolMySQL {
|
||||
return formatOceanBaseMySQLAttemptError(address, err)
|
||||
// annotateOceanBaseOracleConnectError 把 go-ora 返回的底层错误转换为 OceanBase Oracle 租户友好诊断,
|
||||
// 帮助用户区分「端口不通」「端口非 Oracle 协议」「认证失败」三类常见问题。
|
||||
func annotateOceanBaseOracleConnectError(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return fmt.Sprintf("%s 验证失败: %v", address, err)
|
||||
lower := strings.ToLower(err.Error())
|
||||
switch {
|
||||
case strings.Contains(lower, "connection refused"),
|
||||
strings.Contains(lower, "no route to host"),
|
||||
strings.Contains(lower, "i/o timeout"),
|
||||
strings.Contains(lower, "deadline exceeded"):
|
||||
return fmt.Errorf("%w(OceanBase Oracle 协议连接失败:目标地址未响应。请确认 OBProxy 已启用 Oracle 协议监听端口,并检查网络与防火墙)", err)
|
||||
case strings.Contains(lower, "tns"),
|
||||
strings.Contains(lower, "protocol error"),
|
||||
strings.Contains(lower, "unexpected packet"),
|
||||
strings.Contains(lower, "got packets out of order"),
|
||||
strings.Contains(lower, "use of closed network connection"):
|
||||
return fmt.Errorf("%w(OceanBase Oracle 协议握手失败:当前端口可能是 OBServer 的 MySQL 协议端口(OBClient 协议)而非 OBProxy 的 Oracle 协议端口;GoNavi 暂未实现 OBClient 协议,请将连接端口改为 OBProxy 暴露的 Oracle 协议端口)", err)
|
||||
case strings.Contains(lower, "ora-"):
|
||||
return fmt.Errorf("%w(OceanBase Oracle 租户认证或服务名失败:请确认服务名(Service Name)、用户名(如 SYS@oracle_tenant#cluster_name)与权限配置)", err)
|
||||
}
|
||||
return fmt.Errorf("%w(OceanBase Oracle 协议连接失败)", err)
|
||||
}
|
||||
|
||||
// probeOceanBaseMySQLWireHandshake 通过读取目标端口的 MySQL initial handshake packet
|
||||
// 判断该端口背后是否是 OceanBase 的 MySQL wire 协议端口。
|
||||
//
|
||||
// 在 Oracle 路径连接前主动探测,是为了避免用户在 mysql wire 协议(OB Error 1235)和
|
||||
// Oracle TNS 协议(use of closed network connection)之间反复方向摇摆。
|
||||
//
|
||||
// 探测过程:
|
||||
// 1. TCP 建连(带 timeout)
|
||||
// 2. 读 4 字节 packet header(3 字节 payload length + 1 字节 sequence id)
|
||||
// 3. 读 payload;payload[0] 为 protocol version(MySQL 历史上 9 或 10)
|
||||
// 4. server_version 是从 payload[1] 开始的 null-terminated 字符串
|
||||
// 5. server_version 中包含 "oceanbase" / "ob" 关键字时判定为 OB MySQL wire
|
||||
//
|
||||
// 返回值:(isOBMySQLWire, probeSucceeded)。probeSucceeded=false 表示连建连/读包都失败,
|
||||
// 此时让上层正常走 go-ora 路径(不要因为探测失败就阻止真正的尝试)。
|
||||
func probeOceanBaseMySQLWireHandshake(host string, port int, timeout time.Duration) (bool, bool) {
|
||||
if timeout <= 0 {
|
||||
timeout = 2 * time.Second
|
||||
}
|
||||
addr := normalizeMySQLAddress(host, port)
|
||||
dialer := net.Dialer{Timeout: timeout}
|
||||
conn, err := dialer.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
return false, false
|
||||
}
|
||||
defer conn.Close()
|
||||
_ = conn.SetDeadline(time.Now().Add(timeout))
|
||||
|
||||
header := make([]byte, 4)
|
||||
if _, err := io.ReadFull(conn, header); err != nil {
|
||||
return false, false
|
||||
}
|
||||
payloadLen := int(header[0]) | int(header[1])<<8 | int(header[2])<<16
|
||||
// 合理的 MySQL initial handshake payload 长度在几十~几百字节之间,超出范围视为非 MySQL 协议
|
||||
if payloadLen < 1 || payloadLen > 1024 {
|
||||
return false, true
|
||||
}
|
||||
payload := make([]byte, payloadLen)
|
||||
if _, err := io.ReadFull(conn, payload); err != nil {
|
||||
return false, false
|
||||
}
|
||||
|
||||
protocolVersion := payload[0]
|
||||
if protocolVersion != 10 && protocolVersion != 9 {
|
||||
// 不是 MySQL initial handshake 格式(可能是 TNS 或其他协议)
|
||||
return false, true
|
||||
}
|
||||
|
||||
nullIdx := bytes.IndexByte(payload[1:], 0)
|
||||
if nullIdx < 0 {
|
||||
// 没有 server_version 终止符,格式不符
|
||||
return false, true
|
||||
}
|
||||
serverVersion := strings.ToLower(string(payload[1 : 1+nullIdx]))
|
||||
if serverVersion == "" {
|
||||
return false, true
|
||||
}
|
||||
if strings.Contains(serverVersion, "oceanbase") || strings.Contains(serverVersion, "obproxy") {
|
||||
return true, true
|
||||
}
|
||||
// MySQL server_version 通常形如 "5.7.25-OceanBase-v4.x" 或 "5.7.25-OB",
|
||||
// 用 "-ob" 后缀做兜底匹配(社区版有些版本只在 server_version 里加 -OB 后缀)
|
||||
if strings.Contains(serverVersion, "-ob") {
|
||||
return true, true
|
||||
}
|
||||
return false, true
|
||||
}
|
||||
|
||||
// connectOracleViaTNS 走 sijms/go-ora,连 OBProxy 暴露的 Oracle listener 端口(标准 TNS)。
|
||||
// 用于端口非 OB MySQL wire 的情况。
|
||||
func (o *OceanBaseDB) connectOracleViaTNS(config connection.ConnectionConfig) error {
|
||||
runConfig := prepareOceanBaseOracleConfig(config)
|
||||
if strings.TrimSpace(runConfig.Database) == "" {
|
||||
return fmt.Errorf("OceanBase Oracle 协议(TNS 路径)需要填写服务名(Service Name),请在连接配置中填写租户监听的服务名(例如 ORCL / tenant_oracle 等)")
|
||||
}
|
||||
oracleDB := &OracleDB{}
|
||||
if err := oracleDB.Connect(runConfig); err != nil {
|
||||
return annotateOceanBaseOracleConnectError(err)
|
||||
}
|
||||
o.oracle = oracleDB
|
||||
o.protocol = oceanBaseProtocolOracle
|
||||
return nil
|
||||
}
|
||||
|
||||
// connectOracleViaOBClient 走 mysql wire + OBClient capability attribute 注入,连 OceanBase
|
||||
// MySQL wire 端口上的 Oracle 租户(复刻 Navicat OceanBase 数据源的连接路径)。
|
||||
// 用于端口预探测识别为 OB MySQL wire 的情况。
|
||||
func (o *OceanBaseDB) connectOracleViaOBClient(config connection.ConnectionConfig) error {
|
||||
addresses := collectOceanBaseAddresses(config)
|
||||
if len(addresses) == 0 {
|
||||
return fmt.Errorf("OceanBase Oracle (OBClient 路径) 连接建立后验证失败:未找到可用地址")
|
||||
}
|
||||
|
||||
var errorDetails []string
|
||||
for index, address := range addresses {
|
||||
candidateConfig := config
|
||||
host, port, ok := parseHostPortWithDefault(address, defaultOceanBasePort)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
candidateConfig.Host = host
|
||||
candidateConfig.Port = port
|
||||
candidateConfig.User, candidateConfig.Password = resolveMySQLCredential(config, index)
|
||||
// 注入 OBClient capability attribute,让 OB 服务端识别为 OBClient 客户端而放行 Oracle 租户。
|
||||
// 同时确保 sql_mode='ANSI_QUOTES',让后续 Oracle 元数据查询里的双引号被识别为标识符引用。
|
||||
candidateConfig.ConnectionParams = ensureOceanBaseOBClientAttributes(candidateConfig.ConnectionParams)
|
||||
candidateConfig.ConnectionParams = ensureOceanBaseOracleANSIQuotes(candidateConfig.ConnectionParams)
|
||||
|
||||
dsn, err := o.getDSN(candidateConfig)
|
||||
if err != nil {
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 生成连接串失败:%v", address, err))
|
||||
continue
|
||||
}
|
||||
db, err := sql.Open(oceanbaseDriverName, dsn)
|
||||
if err != nil {
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 打开失败:%v", address, err))
|
||||
continue
|
||||
}
|
||||
|
||||
timeout := getConnectTimeout(candidateConfig)
|
||||
ctx, cancel := utils.ContextWithTimeout(timeout)
|
||||
pingErr := db.PingContext(ctx)
|
||||
cancel()
|
||||
if pingErr != nil {
|
||||
_ = db.Close()
|
||||
errorDetails = append(errorDetails, formatOceanBaseOBClientAttemptError(address, pingErr))
|
||||
continue
|
||||
}
|
||||
|
||||
o.bindConnectedDatabase(db, timeout, oceanBaseProtocolOracle)
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(errorDetails) == 0 {
|
||||
return fmt.Errorf("OceanBase Oracle (OBClient 路径) 连接建立后验证失败:未找到可用地址")
|
||||
}
|
||||
return fmt.Errorf("OceanBase Oracle (OBClient 路径) 连接建立后验证失败:%s", strings.Join(errorDetails, ";"))
|
||||
}
|
||||
|
||||
// formatOceanBaseOBClientAttemptError 给 OBClient 路径下的握手失败添加针对 attribute 调试的提示。
|
||||
func formatOceanBaseOBClientAttemptError(address string, err error) string {
|
||||
if isOceanBaseOracleTenantMySQLDriverError(err) {
|
||||
return fmt.Sprintf("%s 验证失败:OceanBase 服务端仍返回 Error 1235 拒绝当前 client driver。"+
|
||||
"GoNavi 已默认注入 OBClient capability attribute(_client_name=OceanBase Connector/J 等),"+
|
||||
"但该组合未能让服务端放行 Oracle 租户。请用 Wireshark 抓 Navicat 连接此 OB 集群的 mysql 握手包,"+
|
||||
"对照 Client Login Request → Connection Attributes 部分确认服务端期望的 key/value,"+
|
||||
"然后在 GoNavi 连接配置的 ConnectionParams 里通过 connectionAttributes=key1:value1,key2:value2 覆盖。"+
|
||||
"详细错误:%v", address, err)
|
||||
}
|
||||
return fmt.Sprintf("%s 验证失败:%v", address, err)
|
||||
}
|
||||
|
||||
// bindConnectedDatabase 把已经握手成功的 *sql.DB 绑定到 OceanBaseDB 的合适字段:
|
||||
// Oracle 协议时通过 OracleDB wrapper 复用 Oracle 方言 SQL;MySQL 协议时直接绑定 MySQLDB。
|
||||
func (o *OceanBaseDB) bindConnectedDatabase(db *sql.DB, timeout time.Duration, protocol string) {
|
||||
o.oracle = nil
|
||||
o.conn = nil
|
||||
@@ -317,8 +643,26 @@ func (o *OceanBaseDB) Connect(config connection.ConnectionConfig) error {
|
||||
return err
|
||||
}
|
||||
runConfig := withoutOceanBaseProtocolParams(appliedConfig)
|
||||
|
||||
if protocol == oceanBaseProtocolOracle {
|
||||
logger.Infof("OceanBase 使用 Oracle 租户模式连接:地址=%s:%d 用户=%s(连接层使用 OceanBase MySQL 兼容协议)", runConfig.Host, runConfig.Port, runConfig.User)
|
||||
// 预探测目标端口的实际协议,决定走哪条 Oracle 连接路径。
|
||||
probeTimeout := getConnectTimeout(runConfig)
|
||||
if probeTimeout > 3*time.Second {
|
||||
probeTimeout = 3 * time.Second
|
||||
}
|
||||
isOBMySQLWire, probed := probeOceanBaseMySQLWireHandshake(runConfig.Host, runConfig.Port, probeTimeout)
|
||||
switch {
|
||||
case probed && isOBMySQLWire:
|
||||
logger.Infof("OceanBase 协议=Oracle 预探测:%s:%d 是 OB MySQL wire 端口,走 OBClient capability 注入路径连接 Oracle 租户", runConfig.Host, runConfig.Port)
|
||||
return o.connectOracleViaOBClient(runConfig)
|
||||
case probed:
|
||||
logger.Infof("OceanBase 协议=Oracle 预探测:%s:%d 不是 OB MySQL wire,走标准 Oracle TNS 协议(OBProxy Oracle listener)", runConfig.Host, runConfig.Port)
|
||||
return o.connectOracleViaTNS(runConfig)
|
||||
default:
|
||||
// 探测失败(端口不通 / 网络问题)—— 让 go-ora 走一遍把真实错误暴露出来
|
||||
logger.Warnf("OceanBase 协议=Oracle 预探测失败(端口不通或无响应),回退到 Oracle TNS 路径让 go-ora 报告真实错误:%s:%d", runConfig.Host, runConfig.Port)
|
||||
return o.connectOracleViaTNS(runConfig)
|
||||
}
|
||||
}
|
||||
|
||||
addresses := collectOceanBaseAddresses(runConfig)
|
||||
@@ -336,18 +680,15 @@ func (o *OceanBaseDB) Connect(config connection.ConnectionConfig) error {
|
||||
candidateConfig.Host = host
|
||||
candidateConfig.Port = port
|
||||
candidateConfig.User, candidateConfig.Password = resolveMySQLCredential(runConfig, index)
|
||||
if protocol == oceanBaseProtocolOracle {
|
||||
candidateConfig.ConnectionParams = ensureOceanBaseOracleANSIQuotes(candidateConfig.ConnectionParams)
|
||||
}
|
||||
|
||||
dsn, err := o.getDSN(candidateConfig)
|
||||
if err != nil {
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 生成连接串失败: %v", address, err))
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 生成连接串失败:%v", address, err))
|
||||
continue
|
||||
}
|
||||
db, err := sql.Open(oceanbaseDriverName, dsn)
|
||||
if err != nil {
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 打开失败: %v", address, err))
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 打开失败:%v", address, err))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -357,11 +698,13 @@ func (o *OceanBaseDB) Connect(config connection.ConnectionConfig) error {
|
||||
cancel()
|
||||
if pingErr != nil {
|
||||
_ = db.Close()
|
||||
errorDetails = append(errorDetails, formatOceanBaseAttemptError(address, protocol, pingErr))
|
||||
errorDetails = append(errorDetails, formatOceanBaseMySQLAttemptError(address, pingErr))
|
||||
continue
|
||||
}
|
||||
|
||||
o.bindConnectedDatabase(db, timeout, protocol)
|
||||
o.conn = db
|
||||
o.pingTimeout = timeout
|
||||
o.protocol = oceanBaseProtocolMySQL
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -479,6 +822,9 @@ func (o *OceanBaseDB) GetTriggers(dbName, tableName string) ([]connection.Trigge
|
||||
}
|
||||
|
||||
func (o *OceanBaseDB) ApplyChanges(tableName string, changes connection.ChangeSet) error {
|
||||
// Oracle 协议走 OBClient 路径时,o.oracle.conn 实际上是 mysql wire 的 *sql.DB,
|
||||
// Oracle 风格 SQL(双引号引用 + ROWID)由 OceanBase 服务端按 Oracle 解析器处理,
|
||||
// 但占位符必须是 mysql 风格的 "?",不能用 OracleDB.ApplyChanges 的 ":1" Oracle bind 风格。
|
||||
if o.protocol == oceanBaseProtocolOracle && o.oracle != nil {
|
||||
return o.applyOracleChangesMySQLWire(tableName, changes)
|
||||
}
|
||||
@@ -488,6 +834,9 @@ func (o *OceanBaseDB) ApplyChanges(tableName string, changes connection.ChangeSe
|
||||
return fmt.Errorf("当前 OceanBase %s 协议不支持 ApplyChanges", o.protocol)
|
||||
}
|
||||
|
||||
// applyOracleChangesMySQLWire 在 OceanBase Oracle 租户的 mysql wire 连接上执行
|
||||
// DELETE/UPDATE/INSERT,使用 Oracle 风格双引号引用标识符 + mysql wire 风格 "?" 占位符。
|
||||
// 需要事先确保 sql_mode='ANSI_QUOTES'(由 ensureOceanBaseOracleANSIQuotes 在 DSN 中注入)。
|
||||
func (o *OceanBaseDB) applyOracleChangesMySQLWire(tableName string, changes connection.ChangeSet) error {
|
||||
if o.oracle == nil || o.oracle.conn == nil {
|
||||
return fmt.Errorf("连接未打开")
|
||||
|
||||
@@ -3,10 +3,13 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"errors"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"GoNavi-Wails/internal/connection"
|
||||
|
||||
@@ -151,74 +154,85 @@ func TestWithoutOceanBaseProtocolParamsStripsDriverMeta(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOceanBaseOracleProtocolUsesMySQLWireConnection(t *testing.T) {
|
||||
// OceanBase Oracle 租户实际通过 OBProxy 暴露的 Oracle 网络协议端口连接(走 go-ora),
|
||||
// 锁定 prepareOceanBaseOracleConfig 把 oceanbase:// URI 的业务参数提升到 ConnectionParams,
|
||||
// 并清理 protocol 关键字,避免泄漏到 OracleDB.getDSN。
|
||||
func TestPrepareOceanBaseOracleConfigPromotesURIParams(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbConn, state := openOracleRecordingDB(t)
|
||||
state.queryResults["SELECT username FROM all_users ORDER BY username"] = oracleRecordingQueryResult{
|
||||
columns: []string{"USERNAME"},
|
||||
rows: [][]driver.Value{{"SYS"}},
|
||||
}
|
||||
config := prepareOceanBaseOracleConfig(connection.ConnectionConfig{
|
||||
Type: "oceanbase",
|
||||
OceanBaseProtocol: "oracle",
|
||||
Host: "127.0.0.1",
|
||||
Port: 60014,
|
||||
User: "SYS@oracle_tenant#cluster",
|
||||
Database: "ORCL",
|
||||
URI: "oceanbase://SYS%40oracle_tenant%23cluster:p@127.0.0.1:60014/ORCL?protocol=oracle&PREFETCH_ROWS=5000",
|
||||
})
|
||||
|
||||
oceanbaseDB := &OceanBaseDB{}
|
||||
oceanbaseDB.bindConnectedDatabase(dbConn, 0, oceanBaseProtocolOracle)
|
||||
|
||||
if oceanbaseDB.oracle == nil {
|
||||
t.Fatal("expected Oracle metadata wrapper for OceanBase Oracle tenant")
|
||||
if config.Type != "oracle" {
|
||||
t.Fatalf("expected Type rewritten to oracle (for OracleDB.Connect), got %q", config.Type)
|
||||
}
|
||||
if oceanbaseDB.conn != nil {
|
||||
t.Fatal("expected MySQLDB connection slot to stay empty for Oracle tenant wrapper")
|
||||
if config.URI != "" {
|
||||
t.Fatalf("expected URI cleared so OracleDB does not try to reparse oceanbase scheme, got %q", config.URI)
|
||||
}
|
||||
if oceanbaseDB.protocol != oceanBaseProtocolOracle {
|
||||
t.Fatalf("expected protocol oracle, got %q", oceanbaseDB.protocol)
|
||||
if strings.Contains(config.ConnectionParams, "protocol=") {
|
||||
t.Fatalf("expected protocol param stripped, got %q", config.ConnectionParams)
|
||||
}
|
||||
|
||||
databases, err := oceanbaseDB.GetDatabases()
|
||||
if err != nil {
|
||||
t.Fatalf("GetDatabases() unexpected error: %v", err)
|
||||
}
|
||||
if len(databases) != 1 || databases[0] != "SYS" {
|
||||
t.Fatalf("GetDatabases() = %#v, want [SYS]", databases)
|
||||
if !strings.Contains(config.ConnectionParams, "PREFETCH_ROWS=5000") {
|
||||
t.Fatalf("expected Oracle business param PREFETCH_ROWS promoted to ConnectionParams, got %q", config.ConnectionParams)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOceanBaseOracleApplyChangesUsesMySQLWirePlaceholders(t *testing.T) {
|
||||
// 验证 go-ora 错误信息按三类常见根因分别给出可操作的诊断提示,
|
||||
// 避免用户在「mysql wire 路径」与「go-ora 路径」之间方向摇摆。
|
||||
func TestAnnotateOceanBaseOracleConnectErrorClassifies(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbConn, state := openOracleRecordingDB(t)
|
||||
oceanbaseDB := &OceanBaseDB{}
|
||||
oceanbaseDB.bindConnectedDatabase(dbConn, 0, oceanBaseProtocolOracle)
|
||||
|
||||
changes := connection.ChangeSet{
|
||||
Updates: []connection.UpdateRow{{
|
||||
Keys: map[string]interface{}{
|
||||
"ID": 7,
|
||||
},
|
||||
Values: map[string]interface{}{
|
||||
"NAME": "new-name",
|
||||
},
|
||||
}},
|
||||
tests := []struct {
|
||||
name string
|
||||
raw error
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "port unreachable",
|
||||
raw: errors.New("dial tcp 172.16.1.155:60014: connect: connection refused"),
|
||||
want: "目标地址未响应",
|
||||
},
|
||||
{
|
||||
name: "non-oracle protocol on port (e.g. mysql wire)",
|
||||
raw: errors.New("TNS: protocol error - got unexpected packet from server"),
|
||||
want: "MySQL 协议端口",
|
||||
},
|
||||
{
|
||||
name: "ora authentication error",
|
||||
raw: errors.New("ORA-01017: invalid username/password; logon denied"),
|
||||
want: "服务名(Service Name)",
|
||||
},
|
||||
{
|
||||
name: "fallback generic wrapping",
|
||||
raw: errors.New("some unexpected go-ora error"),
|
||||
want: "OceanBase Oracle 协议连接失败",
|
||||
},
|
||||
}
|
||||
|
||||
if err := oceanbaseDB.ApplyChanges("APP.USERS", changes); err != nil {
|
||||
t.Fatalf("ApplyChanges() unexpected error: %v", err)
|
||||
}
|
||||
|
||||
queries := state.snapshotExecQueries()
|
||||
if len(queries) != 1 {
|
||||
t.Fatalf("expected one exec query, got %#v", queries)
|
||||
}
|
||||
if strings.Contains(queries[0], ":1") {
|
||||
t.Fatalf("expected MySQL wire placeholder style, got %q", queries[0])
|
||||
}
|
||||
if !strings.Contains(queries[0], `"NAME" = ?`) || !strings.Contains(queries[0], `"ID" = ?`) {
|
||||
t.Fatalf("expected question mark placeholders, got %q", queries[0])
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
got := annotateOceanBaseOracleConnectError(tt.raw)
|
||||
if got == nil {
|
||||
t.Fatal("expected wrapped error, got nil")
|
||||
}
|
||||
if !strings.Contains(got.Error(), tt.want) {
|
||||
t.Fatalf("expected hint to contain %q, got %v", tt.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// OceanBase Oracle 租户用户名形如 SYS@oracle001#cluster_name,密码也可能含 @ 等保留字符。
|
||||
// 锁定 mysql driver ParseDSN 能正确切分 user/password,避免未来重构 buildMySQLCompatibleDSN 时
|
||||
// 误引入 url.QueryEscape 等会破坏认证的"修复"。
|
||||
// 任何 mysql 兼容数据源中含 @/#/: 的复合用户名/密码都依赖 go-sql-driver/mysql ParseDSN
|
||||
// 的特殊切分算法(从右向左找最后一个 @,从左向右找首个 :)。锁定该 invariant 防止未来
|
||||
// 重构 buildMySQLCompatibleDSN 时误加 url.QueryEscape 破坏认证。
|
||||
func TestOceanBaseOracleDSNParsesTenantCredentials(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@@ -274,98 +288,277 @@ func TestOceanBaseOracleDSNParsesTenantCredentials(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureOceanBaseOracleANSIQuotesInjectsSqlMode(t *testing.T) {
|
||||
// buildMySQLHandshakePacket 构造一个最小化的 MySQL initial handshake packet(protocol v10),
|
||||
// 用于 mock OceanBase / 通用 MySQL / OBProxy 各种 server_version 场景。
|
||||
// 实际字段顺序按 MySQL 协议规范:
|
||||
//
|
||||
// 4 字节 header (3 字节 payload length + 1 字节 sequence id)
|
||||
// payload[0] protocol_version (10)
|
||||
// payload[1..N] server_version (null-terminated)
|
||||
// ... (后续字段对协议探测无关,可省略)
|
||||
func buildMySQLHandshakePacket(serverVersion string) []byte {
|
||||
payload := []byte{10}
|
||||
payload = append(payload, []byte(serverVersion)...)
|
||||
payload = append(payload, 0)
|
||||
// 追加几个占位字节,让 packet 看起来更像真实 handshake(探测代码并不解析这些字段)
|
||||
payload = append(payload, []byte{0x01, 0x00, 0x00, 0x00}...)
|
||||
payloadLen := len(payload)
|
||||
header := []byte{byte(payloadLen), byte(payloadLen >> 8), byte(payloadLen >> 16), 0}
|
||||
return append(header, payload...)
|
||||
}
|
||||
|
||||
// startMockHandshakeServer 启动一个本地 TCP server,在 Accept 后立即写入一个 handshake packet,
|
||||
// 然后等待客户端关闭连接。返回 server 地址(host, port)和 cleanup 函数。
|
||||
func startMockHandshakeServer(t *testing.T, packet []byte) (string, int, func()) {
|
||||
t.Helper()
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("listen failed: %v", err)
|
||||
}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_ = conn.SetDeadline(time.Now().Add(2 * time.Second))
|
||||
if packet != nil {
|
||||
_, _ = conn.Write(packet)
|
||||
}
|
||||
// 让客户端有机会读完后主动关闭
|
||||
buf := make([]byte, 16)
|
||||
_, _ = conn.Read(buf)
|
||||
_ = conn.Close()
|
||||
}
|
||||
}()
|
||||
host, portStr, _ := net.SplitHostPort(ln.Addr().String())
|
||||
port, _ := strconv.Atoi(portStr)
|
||||
cleanup := func() {
|
||||
_ = ln.Close()
|
||||
<-done
|
||||
}
|
||||
return host, port, cleanup
|
||||
}
|
||||
|
||||
func TestProbeOceanBaseMySQLWireDetectsOceanBaseHandshake(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
expect string
|
||||
name string
|
||||
serverVersion string
|
||||
wantIsOB bool
|
||||
}{
|
||||
{
|
||||
name: "empty params",
|
||||
input: "",
|
||||
expect: "sql_mode=%27ANSI_QUOTES%27",
|
||||
},
|
||||
{
|
||||
name: "existing params without sql_mode",
|
||||
input: "PREFETCH_ROWS=5000",
|
||||
expect: "sql_mode=%27ANSI_QUOTES%27",
|
||||
},
|
||||
{
|
||||
name: "preserve user sql_mode and append ANSI_QUOTES",
|
||||
input: "sql_mode='STRICT_TRANS_TABLES'",
|
||||
expect: "sql_mode=%27STRICT_TRANS_TABLES%2CANSI_QUOTES%27",
|
||||
},
|
||||
{
|
||||
name: "no-op when user already includes ANSI_QUOTES",
|
||||
input: "sql_mode='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'",
|
||||
expect: "sql_mode=%27ANSI_QUOTES%2CNO_AUTO_VALUE_ON_ZERO%27",
|
||||
},
|
||||
{name: "ob server version", serverVersion: "5.7.25-OceanBase-v4.2.1.0", wantIsOB: true},
|
||||
{name: "obproxy server version", serverVersion: "5.6.25-OBProxy-3.2.0", wantIsOB: true},
|
||||
{name: "community ob suffix", serverVersion: "5.7.25-OB", wantIsOB: true},
|
||||
{name: "regular mysql is not flagged", serverVersion: "8.0.36", wantIsOB: false},
|
||||
{name: "mariadb is not flagged", serverVersion: "10.6.12-MariaDB", wantIsOB: false},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
got := ensureOceanBaseOracleANSIQuotes(tt.input)
|
||||
if !strings.Contains(got, tt.expect) {
|
||||
t.Fatalf("ensureOceanBaseOracleANSIQuotes(%q) = %q, want substring %q", tt.input, got, tt.expect)
|
||||
host, port, cleanup := startMockHandshakeServer(t, buildMySQLHandshakePacket(tt.serverVersion))
|
||||
defer cleanup()
|
||||
gotIsOB, probed := probeOceanBaseMySQLWireHandshake(host, port, time.Second)
|
||||
if !probed {
|
||||
t.Fatal("expected probe to succeed against mock server, got probed=false")
|
||||
}
|
||||
if gotIsOB != tt.wantIsOB {
|
||||
t.Fatalf("server_version=%q expected isOB=%v got %v", tt.serverVersion, tt.wantIsOB, gotIsOB)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOceanBaseOracleDSNContainsANSIQuotesSysVar(t *testing.T) {
|
||||
func TestProbeOceanBaseMySQLWireHandshakeReturnsFalseOnUnreachable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// 用一个不可达端口(监听后立即关闭),探测应返回 probed=false 让上层继续走 go-ora 路径
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("listen failed: %v", err)
|
||||
}
|
||||
host, portStr, _ := net.SplitHostPort(ln.Addr().String())
|
||||
port, _ := strconv.Atoi(portStr)
|
||||
_ = ln.Close()
|
||||
|
||||
gotIsOB, probed := probeOceanBaseMySQLWireHandshake(host, port, 200*time.Millisecond)
|
||||
if gotIsOB {
|
||||
t.Fatal("expected unreachable port not flagged as OB")
|
||||
}
|
||||
if probed {
|
||||
t.Fatal("expected probed=false on unreachable port so upper layer falls back to go-ora")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProbeOceanBaseMySQLWireHandshakeIgnoresNonMySQLProtocol(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// 模拟一个 Oracle TNS 端口:返回非 MySQL 协议格式的字节,探测应判定为非 OB MySQL wire
|
||||
host, port, cleanup := startMockHandshakeServer(t, []byte{0x00, 0x20, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00})
|
||||
defer cleanup()
|
||||
|
||||
gotIsOB, probed := probeOceanBaseMySQLWireHandshake(host, port, time.Second)
|
||||
if gotIsOB {
|
||||
t.Fatal("expected non-MySQL packet not flagged as OB")
|
||||
}
|
||||
if !probed {
|
||||
t.Fatal("expected probe to complete the read")
|
||||
}
|
||||
}
|
||||
|
||||
// decodeConnectionAttributesFromConnectionParams 把 connectionAttributes 从 url-encoded 的
|
||||
// ConnectionParams 中取出来并解析成 map,便于测试用解码后的值断言。
|
||||
func decodeConnectionAttributesFromConnectionParams(t *testing.T, raw string) map[string]string {
|
||||
t.Helper()
|
||||
values, err := url.ParseQuery(raw)
|
||||
if err != nil {
|
||||
t.Fatalf("ParseQuery(%q) failed: %v", raw, err)
|
||||
}
|
||||
return parseMySQLConnectionAttributes(values.Get("connectionAttributes"))
|
||||
}
|
||||
|
||||
// ensureOceanBaseOBClientAttributes 必须默认注入 OBClient capability 候选 attribute,
|
||||
// 并且用户在 ConnectionParams 里已设置的 attribute 优先级更高(不被默认值覆盖)。
|
||||
func TestEnsureOceanBaseOBClientAttributesInjectsDefaults(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
attrs := decodeConnectionAttributesFromConnectionParams(t, ensureOceanBaseOBClientAttributes(""))
|
||||
want := map[string]string{
|
||||
"_client_name": "OceanBase Connector/J",
|
||||
"_client_version": "2.4.5",
|
||||
"__ob_client_attribute_capability_flag": "1",
|
||||
"ob_capability_flag": "1",
|
||||
}
|
||||
for k, v := range want {
|
||||
if attrs[k] != v {
|
||||
t.Fatalf("expected default attribute %s=%q, got %q (all=%v)", k, v, attrs[k], attrs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureOceanBaseOBClientAttributesPreservesUserOverrides(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
attrs := decodeConnectionAttributesFromConnectionParams(t,
|
||||
ensureOceanBaseOBClientAttributes("connectionAttributes=_client_name:libobclient,_pid:9527"))
|
||||
|
||||
if attrs["_client_name"] != "libobclient" {
|
||||
t.Fatalf("expected user-supplied _client_name preserved, got %q", attrs["_client_name"])
|
||||
}
|
||||
if attrs["_pid"] != "9527" {
|
||||
t.Fatalf("expected user extra attribute _pid preserved, got %q", attrs["_pid"])
|
||||
}
|
||||
// 仍应补齐默认值中用户未提供的部分
|
||||
if attrs["ob_capability_flag"] != "1" {
|
||||
t.Fatalf("expected default ob_capability_flag still injected when user did not set it, got %q (all=%v)", attrs["ob_capability_flag"], attrs)
|
||||
}
|
||||
}
|
||||
|
||||
// 锁定 Oracle 协议路径下,OBClient capability attribute 会被注入到生成的 mysql DSN 中。
|
||||
func TestOceanBaseOracleOBClientDSNCarriesCapabilityAttributes(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cfg := connection.ConnectionConfig{
|
||||
Type: "oceanbase",
|
||||
Host: "127.0.0.1",
|
||||
Port: 2881,
|
||||
User: "SYS@oracle001#cluster",
|
||||
Password: "p@ss",
|
||||
Database: "ORCL",
|
||||
OceanBaseProtocol: "oracle",
|
||||
Type: "oceanbase",
|
||||
Host: "127.0.0.1",
|
||||
Port: 2881,
|
||||
User: "SYS@oracle_tenant#cluster",
|
||||
Password: "x",
|
||||
Database: "ORCL",
|
||||
}
|
||||
cfg.ConnectionParams = ensureOceanBaseOBClientAttributes(cfg.ConnectionParams)
|
||||
cfg.ConnectionParams = ensureOceanBaseOracleANSIQuotes(cfg.ConnectionParams)
|
||||
ob := &OceanBaseDB{}
|
||||
dsn, err := ob.getDSN(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("getDSN error: %v", err)
|
||||
}
|
||||
parsed, err := mysqlDriver.ParseDSN(dsn)
|
||||
if err != nil {
|
||||
t.Fatalf("ParseDSN error: %v", err)
|
||||
}
|
||||
if !strings.Contains(parsed.ConnectionAttributes, "_client_name:OceanBase Connector/J") {
|
||||
t.Fatalf("expected default _client_name in DSN, got %q", parsed.ConnectionAttributes)
|
||||
}
|
||||
if !strings.Contains(parsed.ConnectionAttributes, "ob_capability_flag:1") {
|
||||
t.Fatalf("expected default ob_capability_flag in DSN, got %q", parsed.ConnectionAttributes)
|
||||
}
|
||||
if !strings.Contains(dsn, "sql_mode=%27ANSI_QUOTES%27") {
|
||||
t.Fatalf("expected DSN to carry sql_mode='ANSI_QUOTES', got %q", dsn)
|
||||
t.Fatalf("expected ANSI_QUOTES sys var in DSN, got %q", dsn)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOceanBaseOracleApplyChangesFailsLoudOnColumnMetadataError(t *testing.T) {
|
||||
// OBClient 路径写操作仍然使用 mysql wire 风格 "?" 占位符 + Oracle 风格双引号引用标识符。
|
||||
// 注意 bindConnectedDatabase 直接绑 OracleDB wrapper(OracleDB.conn 实际是 mysql wire conn),
|
||||
// ApplyChanges 会走 applyOracleChangesMySQLWire。
|
||||
func TestOceanBaseOracleOBClientApplyChangesUsesMySQLWirePlaceholders(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbConn, state := openOracleRecordingDB(t)
|
||||
state.queryError = errors.New("ORA-00942: table or view does not exist")
|
||||
|
||||
oceanbaseDB := &OceanBaseDB{}
|
||||
oceanbaseDB.bindConnectedDatabase(dbConn, 0, oceanBaseProtocolOracle)
|
||||
|
||||
changes := connection.ChangeSet{
|
||||
Updates: []connection.UpdateRow{{
|
||||
Keys: map[string]interface{}{"ID": 7},
|
||||
Values: map[string]interface{}{"NAME": "x"},
|
||||
Values: map[string]interface{}{"NAME": "new-name"},
|
||||
}},
|
||||
}
|
||||
|
||||
err := oceanbaseDB.ApplyChanges("APP.USERS", changes)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when column metadata load fails, got nil")
|
||||
if err := oceanbaseDB.ApplyChanges("APP.USERS", changes); err != nil {
|
||||
t.Fatalf("ApplyChanges() unexpected error: %v", err)
|
||||
}
|
||||
if !strings.Contains(err.Error(), "加载列元数据失败") {
|
||||
t.Fatalf("expected error message to mention column metadata, got %v", err)
|
||||
|
||||
queries := state.snapshotExecQueries()
|
||||
if len(queries) != 1 {
|
||||
t.Fatalf("expected one exec query, got %#v", queries)
|
||||
}
|
||||
if !strings.Contains(err.Error(), "ORA-00942") {
|
||||
t.Fatalf("expected error to wrap underlying ORA-00942, got %v", err)
|
||||
if strings.Contains(queries[0], ":1") {
|
||||
t.Fatalf("expected MySQL wire placeholder style, got %q", queries[0])
|
||||
}
|
||||
if !strings.Contains(queries[0], `"NAME" = ?`) || !strings.Contains(queries[0], `"ID" = ?`) {
|
||||
t.Fatalf("expected question mark placeholders + double-quoted identifiers, got %q", queries[0])
|
||||
}
|
||||
}
|
||||
|
||||
// 用户通过 ConnectionParams 设置 connectionAttributes 时,OceanBase MySQL wire 路径必须把
|
||||
// 这些 attribute 透传到 go-sql-driver/mysql DSN,让 driver 在握手响应里发 CLIENT_CONNECT_ATTRS。
|
||||
// 这是 OBClient 协议握手探索的入口:高级用户/DBA 可以试错不同 attribute 组合而不需要改 GoNavi 代码。
|
||||
func TestOceanBaseMySQLDSNPassesThroughConnectionAttributes(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cfg := connection.ConnectionConfig{
|
||||
Type: "oceanbase",
|
||||
Host: "127.0.0.1",
|
||||
Port: 2881,
|
||||
User: "root@mysql_tenant",
|
||||
Password: "root",
|
||||
Database: "test",
|
||||
ConnectionParams: "connectionAttributes=_client_name:OceanBase Connector/J,_client_version:2.4.5",
|
||||
}
|
||||
ob := &OceanBaseDB{}
|
||||
dsn, err := ob.getDSN(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("getDSN error: %v", err)
|
||||
}
|
||||
parsed, err := mysqlDriver.ParseDSN(dsn)
|
||||
if err != nil {
|
||||
t.Fatalf("mysql ParseDSN failed: %v", err)
|
||||
}
|
||||
if !strings.Contains(parsed.ConnectionAttributes, "_client_name:OceanBase Connector/J") {
|
||||
t.Fatalf("expected _client_name attribute in DSN, got %q", parsed.ConnectionAttributes)
|
||||
}
|
||||
if !strings.Contains(parsed.ConnectionAttributes, "_client_version:2.4.5") {
|
||||
t.Fatalf("expected _client_version attribute in DSN, got %q", parsed.ConnectionAttributes)
|
||||
}
|
||||
}
|
||||
|
||||
// 当用户错选 MySQL 协议但租户实际是 Oracle 模式时,OceanBase 服务端返回 Error 1235,
|
||||
// 我们必须在错误消息里明确指引用户切换协议,避免方向摇摆。
|
||||
func TestFormatOceanBaseMySQLAttemptErrorHintsOracleProtocol(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@@ -376,4 +569,8 @@ func TestFormatOceanBaseMySQLAttemptErrorHintsOracleProtocol(t *testing.T) {
|
||||
if !strings.Contains(got, "切换为 Oracle") {
|
||||
t.Fatalf("expected Oracle protocol hint, got %q", got)
|
||||
}
|
||||
if !strings.Contains(got, "OBProxy") {
|
||||
t.Fatalf("expected hint to mention OBProxy Oracle protocol port, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user