mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-05-22 17:00:21 +08:00
✨ feat(starrocks): 新增 StarRocks 数据源与高级对象能力
- 后端接入:新增独立 starrocks 可选驱动,复用 MySQL wire 协议并支持默认 9030 端口 - 驱动管理:补齐 manifest、build tag、revision、driver-agent provider 和构建脚本 - 前端接入:新增 StarRocks 连接类型、图标、能力矩阵、URI 解析、保存回显和 SQL 自动 LIMIT - 方言增强:新增 StarRocks 类型、关键字、函数补全和专属建表 SQL 生成 - 高级对象:支持物化视图对象浏览、Rollup 模板、外部 Catalog 模板和高级表设计器参数 - CI 发布:将 StarRocks driver-agent 纳入 dev/release 构建与 release 资产校验
This commit is contained in:
@@ -20,7 +20,7 @@ func normalizeRunConfig(config connection.ConnectionConfig, dbName string) conne
|
||||
if !isOceanBaseOracleProtocol(config) {
|
||||
runConfig.Database = name
|
||||
}
|
||||
case "mysql", "mariadb", "diros", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver", "mongodb", "tdengine", "clickhouse":
|
||||
case "mysql", "mariadb", "diros", "starrocks", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver", "mongodb", "tdengine", "clickhouse":
|
||||
// 这些类型的 dbName 表示"数据库",需要写入连接配置以选择目标库。
|
||||
runConfig.Database = name
|
||||
case "dameng":
|
||||
|
||||
@@ -77,6 +77,19 @@ func TestNormalizeRunConfig_OceanBaseOracleKeepsServiceName(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeRunConfig_StarRocksUsesDatabaseFromTree(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
runConfig := normalizeRunConfig(connection.ConnectionConfig{
|
||||
Type: "starrocks",
|
||||
Database: "default_cluster",
|
||||
}, "analytics")
|
||||
|
||||
if runConfig.Database != "analytics" {
|
||||
t.Fatalf("expected StarRocks database from tree, got %q", runConfig.Database)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeSchemaAndTable_OceanBaseOracleUsesSchemaFromDatabaseTree(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
@@ -207,6 +207,8 @@ func defaultPortByType(driverType string) int {
|
||||
return 2881
|
||||
case "diros":
|
||||
return 9030
|
||||
case "starrocks":
|
||||
return 9030
|
||||
case "sphinx":
|
||||
return 9306
|
||||
case "postgres", "vastbase", "opengauss":
|
||||
|
||||
@@ -131,6 +131,8 @@ func (a *App) CreateDatabase(config connection.ConnectionConfig, dbName string)
|
||||
query = fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", quoteIdentByType(dbType, dbName))
|
||||
} else if dbType == "clickhouse" {
|
||||
query = fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", quoteIdentByType(dbType, dbName))
|
||||
} else if dbType == "starrocks" {
|
||||
query = fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", quoteIdentByType(dbType, dbName))
|
||||
} else if dbType == "mariadb" || dbType == "diros" || dbType == "oceanbase" {
|
||||
// MariaDB uses same syntax as MySQL
|
||||
} else if dbType == "sphinx" {
|
||||
@@ -184,6 +186,8 @@ func resolveDDLDBType(config connection.ConnectionConfig) string {
|
||||
return "sqlserver"
|
||||
case "diros", "doris":
|
||||
return "diros"
|
||||
case "starrocks":
|
||||
return "starrocks"
|
||||
case "kingbase", "kingbase8", "kingbasees", "kingbasev8":
|
||||
return "kingbase"
|
||||
case "highgo":
|
||||
@@ -213,6 +217,8 @@ func resolveDDLDBType(config connection.ConnectionConfig) string {
|
||||
return "sqlserver"
|
||||
case strings.Contains(driver, "diros"), strings.Contains(driver, "doris"):
|
||||
return "diros"
|
||||
case strings.Contains(driver, "starrocks"):
|
||||
return "starrocks"
|
||||
case strings.Contains(driver, "oceanbase"):
|
||||
return "oceanbase"
|
||||
default:
|
||||
@@ -277,7 +283,7 @@ func buildRunConfigForDDL(config connection.ConnectionConfig, dbType string, dbN
|
||||
if strings.EqualFold(strings.TrimSpace(config.Type), "custom") {
|
||||
// custom 连接的 dbName 语义依赖 driver,尽量在常见驱动上对齐内置类型行为。
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "dameng", "sqlserver", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "dameng", "sqlserver", "clickhouse":
|
||||
if strings.TrimSpace(dbName) != "" {
|
||||
runConfig.Database = strings.TrimSpace(dbName)
|
||||
}
|
||||
@@ -312,8 +318,8 @@ func (a *App) RenameDatabase(config connection.ConnectionConfig, oldName string,
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
}
|
||||
return connection.QueryResult{Success: true, Message: "数据库重命名成功"}
|
||||
case "mysql", "mariadb", "oceanbase", "sphinx":
|
||||
return connection.QueryResult{Success: false, Message: "MySQL/MariaDB/OceanBase/Sphinx 不支持直接重命名数据库,请新建库后迁移数据"}
|
||||
case "mysql", "mariadb", "oceanbase", "starrocks", "sphinx":
|
||||
return connection.QueryResult{Success: false, Message: "MySQL/MariaDB/OceanBase/StarRocks/Sphinx 不支持直接重命名数据库,请新建库后迁移数据"}
|
||||
case "postgres", "kingbase", "highgo", "vastbase", "opengauss":
|
||||
if strings.EqualFold(strings.TrimSpace(config.Database), oldName) {
|
||||
return connection.QueryResult{Success: false, Message: "当前连接正在使用目标数据库,请先连接到其他数据库后再重命名"}
|
||||
@@ -345,7 +351,7 @@ func (a *App) DropDatabase(config connection.ConnectionConfig, dbName string) co
|
||||
sql string
|
||||
)
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "tdengine", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "tdengine", "clickhouse":
|
||||
runConfig = config
|
||||
runConfig.Database = ""
|
||||
sql = fmt.Sprintf("DROP DATABASE %s", quoteIdentByType(dbType, dbName))
|
||||
@@ -384,7 +390,7 @@ func (a *App) RenameTable(config connection.ConnectionConfig, dbName string, old
|
||||
|
||||
dbType := resolveDDLDBType(config)
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "postgres", "kingbase", "sqlite", "duckdb", "oracle", "dameng", "highgo", "vastbase", "opengauss", "sqlserver", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "postgres", "kingbase", "sqlite", "duckdb", "oracle", "dameng", "highgo", "vastbase", "opengauss", "sqlserver", "clickhouse":
|
||||
default:
|
||||
return connection.QueryResult{Success: false, Message: fmt.Sprintf("当前数据源(%s)暂不支持重命名表", dbType)}
|
||||
}
|
||||
@@ -398,7 +404,7 @@ func (a *App) RenameTable(config connection.ConnectionConfig, dbName string, old
|
||||
|
||||
var sql string
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "clickhouse":
|
||||
newQualifiedTable := quoteTableIdentByType(dbType, schemaName, newTableName)
|
||||
sql = fmt.Sprintf("RENAME TABLE %s TO %s", oldQualifiedTable, newQualifiedTable)
|
||||
case "sqlserver":
|
||||
@@ -430,7 +436,7 @@ func (a *App) DropTable(config connection.ConnectionConfig, dbName string, table
|
||||
|
||||
dbType := resolveDDLDBType(config)
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "postgres", "kingbase", "sqlite", "duckdb", "oracle", "dameng", "highgo", "vastbase", "opengauss", "sqlserver", "tdengine", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "postgres", "kingbase", "sqlite", "duckdb", "oracle", "dameng", "highgo", "vastbase", "opengauss", "sqlserver", "tdengine", "clickhouse":
|
||||
default:
|
||||
return connection.QueryResult{Success: false, Message: fmt.Sprintf("当前数据源(%s)暂不支持删除表", dbType)}
|
||||
}
|
||||
@@ -1047,7 +1053,7 @@ func supportsCreateStatementFallback(dbType string) bool {
|
||||
|
||||
func supportsViewCreateStatementLookup(dbType string) bool {
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver", "oracle", "dameng", "sqlite", "duckdb", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver", "oracle", "dameng", "sqlite", "duckdb", "clickhouse":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
@@ -1246,7 +1252,7 @@ func (a *App) DropView(config connection.ConnectionConfig, dbName string, viewNa
|
||||
|
||||
dbType := resolveDDLDBType(config)
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "postgres", "kingbase", "sqlite", "duckdb", "oracle", "dameng", "highgo", "vastbase", "opengauss", "sqlserver", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "postgres", "kingbase", "sqlite", "duckdb", "oracle", "dameng", "highgo", "vastbase", "opengauss", "sqlserver", "clickhouse":
|
||||
default:
|
||||
return connection.QueryResult{Success: false, Message: fmt.Sprintf("当前数据源(%s)暂不支持删除视图", dbType)}
|
||||
}
|
||||
@@ -1281,7 +1287,7 @@ func (a *App) DropFunction(config connection.ConnectionConfig, dbName string, ro
|
||||
|
||||
dbType := resolveDDLDBType(config)
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "postgres", "kingbase", "oracle", "dameng", "highgo", "vastbase", "opengauss", "sqlserver", "duckdb":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "postgres", "kingbase", "oracle", "dameng", "highgo", "vastbase", "opengauss", "sqlserver", "duckdb":
|
||||
default:
|
||||
return connection.QueryResult{Success: false, Message: fmt.Sprintf("当前数据源(%s)暂不支持删除函数/存储过程", dbType)}
|
||||
}
|
||||
@@ -1335,7 +1341,7 @@ func (a *App) RenameView(config connection.ConnectionConfig, dbName string, oldN
|
||||
|
||||
var sql string
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "clickhouse":
|
||||
newQualified := quoteTableIdentByType(dbType, schemaName, newName)
|
||||
sql = fmt.Sprintf("RENAME TABLE %s TO %s", oldQualified, newQualified)
|
||||
case "postgres", "kingbase", "highgo", "vastbase", "opengauss":
|
||||
|
||||
@@ -336,6 +336,7 @@ const builtinDriverManifestJSON = `{
|
||||
"mariadb": { "engine": "go", "version": "1.9.3", "checksumPolicy": "off", "downloadUrl": "builtin://activate/mariadb" },
|
||||
"oceanbase": { "engine": "go", "version": "1.9.3", "checksumPolicy": "off", "downloadUrl": "builtin://activate/oceanbase" },
|
||||
"doris": { "engine": "go", "version": "1.9.3", "checksumPolicy": "off", "downloadUrl": "builtin://activate/doris" },
|
||||
"starrocks": { "engine": "go", "version": "1.9.3", "checksumPolicy": "off", "downloadUrl": "builtin://activate/starrocks" },
|
||||
"sphinx": { "engine": "go", "version": "1.9.3", "checksumPolicy": "off", "downloadUrl": "builtin://activate/sphinx" },
|
||||
"sqlserver": { "engine": "go", "version": "1.9.6", "checksumPolicy": "off", "downloadUrl": "builtin://activate/sqlserver" },
|
||||
"sqlite": { "engine": "go", "version": "1.44.3", "checksumPolicy": "off", "downloadUrl": "builtin://activate/sqlite" },
|
||||
@@ -391,6 +392,7 @@ var latestDriverVersionMap = map[string]string{
|
||||
"mariadb": "1.9.3",
|
||||
"oceanbase": "1.9.3",
|
||||
"diros": "1.9.3",
|
||||
"starrocks": "1.9.3",
|
||||
"sphinx": "1.9.3",
|
||||
"sqlserver": "1.9.6",
|
||||
"sqlite": "1.46.1",
|
||||
@@ -412,6 +414,7 @@ var driverGoModulePathMap = map[string]string{
|
||||
"mariadb": "github.com/go-sql-driver/mysql",
|
||||
"oceanbase": "github.com/go-sql-driver/mysql",
|
||||
"diros": "github.com/go-sql-driver/mysql",
|
||||
"starrocks": "github.com/go-sql-driver/mysql",
|
||||
"sphinx": "github.com/go-sql-driver/mysql",
|
||||
"sqlserver": "github.com/microsoft/go-mssqldb",
|
||||
"sqlite": "modernc.org/sqlite",
|
||||
@@ -1472,6 +1475,7 @@ func allDriverDefinitionsWithPackages(packages map[string]pinnedDriverPackage) [
|
||||
buildOptionalGoDriverDefinition("mariadb", "MariaDB", packages),
|
||||
buildOptionalGoDriverDefinition("oceanbase", "OceanBase", packages),
|
||||
buildOptionalGoDriverDefinition("diros", "Doris", packages),
|
||||
buildOptionalGoDriverDefinition("starrocks", "StarRocks", packages),
|
||||
buildOptionalGoDriverDefinition("sphinx", "Sphinx", packages),
|
||||
buildOptionalGoDriverDefinition("sqlserver", "SQL Server", packages),
|
||||
buildOptionalGoDriverDefinition("sqlite", "SQLite", packages),
|
||||
@@ -3780,6 +3784,8 @@ func optionalDriverBuildTag(driverType string, selectedVersion string) (string,
|
||||
return "gonavi_oceanbase_driver", nil
|
||||
case "diros":
|
||||
return "gonavi_diros_driver", nil
|
||||
case "starrocks":
|
||||
return "gonavi_starrocks_driver", nil
|
||||
case "sphinx":
|
||||
return "gonavi_sphinx_driver", nil
|
||||
case "sqlserver":
|
||||
|
||||
@@ -128,6 +128,7 @@ func optionalDriverAgentRevisionTestDrivers(t *testing.T) []string {
|
||||
"mariadb",
|
||||
"oceanbase",
|
||||
"diros",
|
||||
"starrocks",
|
||||
"sphinx",
|
||||
"sqlserver",
|
||||
"sqlite",
|
||||
|
||||
@@ -1251,7 +1251,7 @@ const (
|
||||
|
||||
func supportsTruncateTableForDBType(dbType string) bool {
|
||||
switch strings.ToLower(strings.TrimSpace(dbType)) {
|
||||
case "mysql", "mariadb", "oceanbase", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver", "oracle", "dameng", "clickhouse", "duckdb":
|
||||
case "mysql", "mariadb", "oceanbase", "starrocks", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver", "oracle", "dameng", "clickhouse", "duckdb":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
@@ -1386,7 +1386,7 @@ func quoteIdentByType(dbType string, ident string) string {
|
||||
|
||||
dbType = resolveDDLDBType(connection.ConnectionConfig{Type: dbType})
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "tdengine", "clickhouse":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "tdengine", "clickhouse":
|
||||
return "`" + strings.ReplaceAll(ident, "`", "``") + "`"
|
||||
case "kingbase":
|
||||
return db.QuoteKingbaseIdentifier(ident)
|
||||
@@ -1608,7 +1608,7 @@ func buildListViewQueries(config connection.ConnectionConfig, dbName string) []s
|
||||
dbType := resolveDDLDBType(config)
|
||||
escapedDbName := escapeSQLLiteral(dbName)
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx":
|
||||
queries := []string{
|
||||
fmt.Sprintf(`SELECT TABLE_SCHEMA AS schema_name, TABLE_NAME AS object_name, TABLE_TYPE AS table_type FROM information_schema.tables WHERE TABLE_TYPE='VIEW' AND TABLE_SCHEMA='%s' ORDER BY TABLE_NAME`, escapedDbName),
|
||||
}
|
||||
@@ -1711,7 +1711,7 @@ func buildViewCreateQueries(config connection.ConnectionConfig, dbName, schemaNa
|
||||
escapedDB := escapeSQLLiteral(dbName)
|
||||
|
||||
switch dbType {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx":
|
||||
if safeSchema == "" {
|
||||
safeSchema = strings.TrimSpace(dbName)
|
||||
}
|
||||
@@ -1991,7 +1991,7 @@ func formatSQLValue(dbType string, v interface{}) string {
|
||||
return "'" + val.Format("2006-01-02 15:04:05") + "'"
|
||||
case string:
|
||||
normalizedType := strings.ToLower(strings.TrimSpace(dbType))
|
||||
if (normalizedType == "mysql" || normalizedType == "oceanbase" || normalizedType == "diros") && isMySQLHexLiteral(val) {
|
||||
if (normalizedType == "mysql" || normalizedType == "oceanbase" || normalizedType == "diros" || normalizedType == "starrocks") && isMySQLHexLiteral(val) {
|
||||
return val
|
||||
}
|
||||
escaped := strings.ReplaceAll(val, "'", "''")
|
||||
|
||||
@@ -6,6 +6,7 @@ func registerOptionalDatabaseFactories() {
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("mariadb"), "mariadb")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("oceanbase"), "oceanbase")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("diros"), "diros", "doris")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("starrocks"), "starrocks")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("sphinx"), "sphinx")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("sqlserver"), "sqlserver")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("sqlite"), "sqlite")
|
||||
|
||||
@@ -6,6 +6,7 @@ func registerOptionalDatabaseFactories() {
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("mariadb"), "mariadb")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("oceanbase"), "oceanbase")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("diros"), "diros", "doris")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("starrocks"), "starrocks")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("sphinx"), "sphinx")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("sqlserver"), "sqlserver")
|
||||
registerDatabaseFactory(newOptionalDriverAgentDatabase("sqlite"), "sqlite")
|
||||
|
||||
@@ -7,6 +7,7 @@ func init() {
|
||||
"mariadb": "src-4e1ec648c70c87ea",
|
||||
"oceanbase": "src-8e445fc4899d850f",
|
||||
"diros": "src-74927b3809258666",
|
||||
"starrocks": "src-3b5aad8a32f79b61",
|
||||
"sphinx": "src-269bd60a34df47d3",
|
||||
"sqlserver": "src-84553484c72e7253",
|
||||
"sqlite": "src-762863d48f653b89",
|
||||
|
||||
@@ -24,6 +24,7 @@ var optionalGoDrivers = map[string]struct{}{
|
||||
"mariadb": {},
|
||||
"oceanbase": {},
|
||||
"diros": {},
|
||||
"starrocks": {},
|
||||
"sphinx": {},
|
||||
"sqlserver": {},
|
||||
"sqlite": {},
|
||||
@@ -78,6 +79,8 @@ func driverDisplayName(driverType string) string {
|
||||
return "OceanBase"
|
||||
case "diros":
|
||||
return "Doris"
|
||||
case "starrocks":
|
||||
return "StarRocks"
|
||||
case "sphinx":
|
||||
return "Sphinx"
|
||||
case "postgres":
|
||||
|
||||
@@ -113,7 +113,7 @@ func TestNewCompatibleDriversAreOptionalAgentDrivers(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
SetExternalDriverDownloadDirectory(tmpDir)
|
||||
|
||||
for _, driverType := range []string{"oceanbase", "opengauss", "open_gauss"} {
|
||||
for _, driverType := range []string{"oceanbase", "opengauss", "open_gauss", "starrocks"} {
|
||||
if IsBuiltinDriver(driverType) {
|
||||
t.Fatalf("%s 不应是免安装内置驱动", driverType)
|
||||
}
|
||||
|
||||
210
internal/db/starrocks_impl.go
Normal file
210
internal/db/starrocks_impl.go
Normal file
@@ -0,0 +1,210 @@
|
||||
//go:build gonavi_full_drivers || gonavi_starrocks_driver
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"GoNavi-Wails/internal/connection"
|
||||
"GoNavi-Wails/internal/ssh"
|
||||
"GoNavi-Wails/internal/utils"
|
||||
|
||||
mysqlDriver "github.com/go-sql-driver/mysql"
|
||||
)
|
||||
|
||||
const (
|
||||
starRocksDriverName = "starrocks"
|
||||
defaultStarRocksPort = 9030
|
||||
)
|
||||
|
||||
// StarRocksDB 使用独立 driver 名称接入,底层协议兼容 MySQL。
|
||||
type StarRocksDB struct {
|
||||
MySQLDB
|
||||
}
|
||||
|
||||
func init() {
|
||||
for _, name := range sql.Drivers() {
|
||||
if name == starRocksDriverName {
|
||||
return
|
||||
}
|
||||
}
|
||||
sql.Register(starRocksDriverName, &mysqlDriver.MySQLDriver{})
|
||||
}
|
||||
|
||||
func applyStarRocksURI(config connection.ConnectionConfig) connection.ConnectionConfig {
|
||||
uriText := strings.TrimSpace(config.URI)
|
||||
if uriText == "" {
|
||||
return config
|
||||
}
|
||||
|
||||
parsed, ok := parseMySQLCompatibleURI(uriText, "starrocks", "mysql")
|
||||
if !ok {
|
||||
return config
|
||||
}
|
||||
|
||||
if parsed.User != nil {
|
||||
if config.User == "" {
|
||||
config.User = parsed.User.Username()
|
||||
}
|
||||
if pass, ok := parsed.User.Password(); ok && config.Password == "" {
|
||||
config.Password = pass
|
||||
}
|
||||
}
|
||||
|
||||
if dbName := strings.TrimPrefix(parsed.Path, "/"); dbName != "" && config.Database == "" {
|
||||
config.Database = dbName
|
||||
}
|
||||
|
||||
defaultPort := config.Port
|
||||
if defaultPort <= 0 {
|
||||
defaultPort = defaultStarRocksPort
|
||||
}
|
||||
|
||||
hostsFromURI := make([]string, 0, 4)
|
||||
hostText := strings.TrimSpace(parsed.Host)
|
||||
if hostText != "" {
|
||||
for _, entry := range strings.Split(hostText, ",") {
|
||||
host, port, ok := parseHostPortWithDefault(entry, defaultPort)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
hostsFromURI = append(hostsFromURI, normalizeMySQLAddress(host, port))
|
||||
}
|
||||
}
|
||||
|
||||
if len(config.Hosts) == 0 && len(hostsFromURI) > 0 {
|
||||
config.Hosts = hostsFromURI
|
||||
}
|
||||
if strings.TrimSpace(config.Host) == "" && len(hostsFromURI) > 0 {
|
||||
host, port, ok := parseHostPortWithDefault(hostsFromURI[0], defaultPort)
|
||||
if ok {
|
||||
config.Host = host
|
||||
config.Port = port
|
||||
}
|
||||
}
|
||||
|
||||
if config.Topology == "" {
|
||||
topology := strings.TrimSpace(parsed.Query().Get("topology"))
|
||||
if topology != "" {
|
||||
config.Topology = strings.ToLower(topology)
|
||||
}
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
func collectStarRocksAddresses(config connection.ConnectionConfig) []string {
|
||||
defaultPort := config.Port
|
||||
if defaultPort <= 0 {
|
||||
defaultPort = defaultStarRocksPort
|
||||
}
|
||||
|
||||
candidates := make([]string, 0, len(config.Hosts)+1)
|
||||
if len(config.Hosts) > 0 {
|
||||
candidates = append(candidates, config.Hosts...)
|
||||
} else {
|
||||
candidates = append(candidates, normalizeMySQLAddress(config.Host, defaultPort))
|
||||
}
|
||||
|
||||
result := make([]string, 0, len(candidates))
|
||||
seen := make(map[string]struct{}, len(candidates))
|
||||
for _, entry := range candidates {
|
||||
host, port, ok := parseHostPortWithDefault(entry, defaultPort)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
normalized := normalizeMySQLAddress(host, port)
|
||||
if _, exists := seen[normalized]; exists {
|
||||
continue
|
||||
}
|
||||
seen[normalized] = struct{}{}
|
||||
result = append(result, normalized)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *StarRocksDB) getDSN(config connection.ConnectionConfig) (string, error) {
|
||||
database := config.Database
|
||||
protocol := "tcp"
|
||||
address := normalizeMySQLAddress(config.Host, config.Port)
|
||||
|
||||
if config.UseSSH {
|
||||
netName, err := ssh.RegisterSSHNetwork(config.SSH)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("创建 SSH 隧道失败:%w", err)
|
||||
}
|
||||
protocol = netName
|
||||
}
|
||||
|
||||
return buildMySQLCompatibleDSN(config, protocol, address, database), nil
|
||||
}
|
||||
|
||||
func resolveStarRocksCredential(config connection.ConnectionConfig, addressIndex int) (string, string) {
|
||||
primaryUser := strings.TrimSpace(config.User)
|
||||
primaryPassword := config.Password
|
||||
replicaUser := strings.TrimSpace(config.MySQLReplicaUser)
|
||||
replicaPassword := config.MySQLReplicaPassword
|
||||
|
||||
if addressIndex > 0 && replicaUser != "" {
|
||||
return replicaUser, replicaPassword
|
||||
}
|
||||
|
||||
if primaryUser == "" && replicaUser != "" {
|
||||
return replicaUser, replicaPassword
|
||||
}
|
||||
|
||||
return config.User, primaryPassword
|
||||
}
|
||||
|
||||
func (s *StarRocksDB) Connect(config connection.ConnectionConfig) error {
|
||||
runConfig := applyStarRocksURI(config)
|
||||
addresses := collectStarRocksAddresses(runConfig)
|
||||
if len(addresses) == 0 {
|
||||
return fmt.Errorf("连接建立后验证失败:未找到可用的 StarRocks 地址")
|
||||
}
|
||||
|
||||
var errorDetails []string
|
||||
for index, address := range addresses {
|
||||
candidateConfig := runConfig
|
||||
host, port, ok := parseHostPortWithDefault(address, defaultStarRocksPort)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
candidateConfig.Host = host
|
||||
candidateConfig.Port = port
|
||||
candidateConfig.User, candidateConfig.Password = resolveStarRocksCredential(runConfig, index)
|
||||
|
||||
dsn, err := s.getDSN(candidateConfig)
|
||||
if err != nil {
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 生成连接串失败: %v", address, err))
|
||||
continue
|
||||
}
|
||||
db, err := sql.Open(starRocksDriverName, dsn)
|
||||
if err != nil {
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 打开失败: %v", address, err))
|
||||
continue
|
||||
}
|
||||
|
||||
timeout := getConnectTimeout(candidateConfig)
|
||||
ctx, cancel := utils.ContextWithTimeout(timeout)
|
||||
pingErr := db.PingContext(ctx)
|
||||
cancel()
|
||||
if pingErr != nil {
|
||||
_ = db.Close()
|
||||
errorDetails = append(errorDetails, fmt.Sprintf("%s 验证失败: %v", address, pingErr))
|
||||
continue
|
||||
}
|
||||
|
||||
s.conn = db
|
||||
s.pingTimeout = timeout
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(errorDetails) == 0 {
|
||||
return fmt.Errorf("连接建立后验证失败:未找到可用的 StarRocks 地址")
|
||||
}
|
||||
return fmt.Errorf("连接建立后验证失败:%s", strings.Join(errorDetails, ";"))
|
||||
}
|
||||
@@ -111,7 +111,7 @@ func classifyMigrationDataModel(dbType string) MigrationDataModel {
|
||||
return MigrationDataModelRelational
|
||||
case "mongodb":
|
||||
return MigrationDataModelDocument
|
||||
case "clickhouse", "diros", "sphinx":
|
||||
case "clickhouse", "diros", "starrocks", "sphinx":
|
||||
return MigrationDataModelColumnar
|
||||
case "tdengine":
|
||||
return MigrationDataModelTimeSeries
|
||||
|
||||
@@ -15,6 +15,7 @@ func TestClassifyMigrationDataModel(t *testing.T) {
|
||||
"kingbase": MigrationDataModelRelational,
|
||||
"mongodb": MigrationDataModelDocument,
|
||||
"clickhouse": MigrationDataModelColumnar,
|
||||
"starrocks": MigrationDataModelColumnar,
|
||||
"tdengine": MigrationDataModelTimeSeries,
|
||||
"redis": MigrationDataModelKeyValue,
|
||||
"custom": MigrationDataModelCustom,
|
||||
|
||||
@@ -45,6 +45,8 @@ func resolveMigrationDBType(config connection.ConnectionConfig) string {
|
||||
return "sphinx"
|
||||
case "diros", "doris":
|
||||
return "diros"
|
||||
case "starrocks":
|
||||
return "starrocks"
|
||||
case "kingbase", "kingbase8", "kingbasees", "kingbasev8":
|
||||
return "kingbase"
|
||||
case "highgo":
|
||||
@@ -76,6 +78,8 @@ func resolveMigrationDBType(config connection.ConnectionConfig) string {
|
||||
return "sphinx"
|
||||
case strings.Contains(driver, "diros"), strings.Contains(driver, "doris"):
|
||||
return "diros"
|
||||
case strings.Contains(driver, "starrocks"):
|
||||
return "starrocks"
|
||||
case strings.Contains(driver, "maria"):
|
||||
return "mariadb"
|
||||
case strings.Contains(driver, "oceanbase"):
|
||||
@@ -91,7 +95,7 @@ func resolveMigrationDBType(config connection.ConnectionConfig) string {
|
||||
|
||||
func isMySQLCoreType(dbType string) bool {
|
||||
switch normalizeMigrationDBType(dbType) {
|
||||
case "mysql", "mariadb", "oceanbase", "diros":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
||||
@@ -26,7 +26,7 @@ func quoteIdentByType(dbType string, ident string) string {
|
||||
}
|
||||
|
||||
switch normalizeMigrationDBType(dbType) {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "clickhouse", "tdengine":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "clickhouse", "tdengine":
|
||||
return "`" + strings.ReplaceAll(ident, "`", "``") + "`"
|
||||
case "kingbase":
|
||||
return db.QuoteKingbaseIdentifier(ident)
|
||||
@@ -140,7 +140,7 @@ func qualifiedNameForQuery(dbType string, schema string, table string, original
|
||||
return raw
|
||||
}
|
||||
return s + "." + table
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "sphinx", "clickhouse", "tdengine":
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "clickhouse", "tdengine":
|
||||
s := strings.TrimSpace(schema)
|
||||
if s == "" || table == "" {
|
||||
return table
|
||||
|
||||
Reference in New Issue
Block a user