diff --git a/build-driver-agents.sh b/build-driver-agents.sh index d975e1e..729a719 100755 --- a/build-driver-agents.sh +++ b/build-driver-agents.sh @@ -5,7 +5,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" -DEFAULT_DRIVERS=(mariadb oceanbase doris starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine clickhouse elasticsearch) +DEFAULT_DRIVERS=(mariadb oceanbase doris starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine iotdb clickhouse elasticsearch) DEFAULT_PLATFORMS=(darwin/amd64 darwin/arm64 windows/amd64 windows/arm64 linux/amd64 linux/arm64) DUCKDB_WINDOWS_LIBRARY_VERSION="v1.4.4" DUCKDB_WINDOWS_LIBRARY_URL="https://github.com/duckdb/duckdb/releases/download/${DUCKDB_WINDOWS_LIBRARY_VERSION}/libduckdb-windows-amd64.zip" @@ -43,7 +43,7 @@ normalize_driver() { doris|diros) echo "doris" ;; open_gauss|open-gauss) echo "opengauss" ;; elasticsearch|elastic) echo "elasticsearch" ;; - mariadb|oceanbase|starrocks|sphinx|sqlserver|sqlite|duckdb|dameng|kingbase|highgo|vastbase|opengauss|iris|mongodb|tdengine|clickhouse) + mariadb|oceanbase|starrocks|sphinx|sqlserver|sqlite|duckdb|dameng|kingbase|highgo|vastbase|opengauss|iris|mongodb|tdengine|iotdb|clickhouse) echo "$name" ;; *) diff --git a/cmd/optional-driver-agent/provider_iotdb.go b/cmd/optional-driver-agent/provider_iotdb.go new file mode 100644 index 0000000..0beb13e --- /dev/null +++ b/cmd/optional-driver-agent/provider_iotdb.go @@ -0,0 +1,12 @@ +//go:build gonavi_iotdb_driver + +package main + +import "GoNavi-Wails/internal/db" + +func init() { + agentDriverType = "iotdb" + agentDatabaseFactory = func() db.Database { + return &db.IoTDBDB{} + } +} diff --git a/frontend/src/components/ConnectionModal.edit-password.test.tsx b/frontend/src/components/ConnectionModal.edit-password.test.tsx index 125bd70..f3cc7da 100644 --- a/frontend/src/components/ConnectionModal.edit-password.test.tsx +++ b/frontend/src/components/ConnectionModal.edit-password.test.tsx @@ -64,6 +64,18 @@ describe('ConnectionModal data source registry', () => { expect(source).toContain('return "http://127.0.0.1:6333";'); expect(source).toContain('return "apiKey=...";'); }); + + it('exposes Apache IoTDB in the create-connection picker with timeseries defaults', () => { + expect(source).toContain("case 'iotdb':"); + expect(source).toContain('return 6667;'); + expect(source).toContain('iotdb: ["iotdb"]'); + expect(source).toContain("key: 'iotdb'"); + expect(source).toContain("name: 'Apache IoTDB'"); + expect(source).toContain('dbType === "iotdb"'); + expect(source).toContain("return 'Storage Group / Device / Timeseries';"); + expect(source).toContain('return "iotdb://root:root@127.0.0.1:6667/root.sg";'); + expect(source).toContain('return "fetchSize=1024&timeZone=Asia%2FShanghai";'); + }); }); describe('ConnectionModal Redis Sentinel configuration', () => { diff --git a/frontend/src/components/ConnectionModal.tsx b/frontend/src/components/ConnectionModal.tsx index 876cfe3..b5a7aa3 100644 --- a/frontend/src/components/ConnectionModal.tsx +++ b/frontend/src/components/ConnectionModal.tsx @@ -1873,6 +1873,9 @@ const ConnectionModal: React.FC<{ if (dbType === "qdrant") { return "http://127.0.0.1:6333"; } + if (dbType === "iotdb") { + return "iotdb://root:root@127.0.0.1:6667/root.sg"; + } if (dbType === "redis") { return "redis://:pass@127.0.0.1:6379,127.0.0.2:6379/0?topology=cluster 或 redis://:pass@10.0.0.1:26379,10.0.0.2:26379/0?topology=sentinel&master=mymaster"; } @@ -1922,6 +1925,8 @@ const ConnectionModal: React.FC<{ return "schema=SYSDBA"; case "tdengine": return "timezone=Asia%2FShanghai"; + case "iotdb": + return "fetchSize=1024&timeZone=Asia%2FShanghai"; default: return "key=value&another=value"; } diff --git a/frontend/src/components/DataSyncModal.tsx b/frontend/src/components/DataSyncModal.tsx index 9c233ee..7efd45b 100644 --- a/frontend/src/components/DataSyncModal.tsx +++ b/frontend/src/components/DataSyncModal.tsx @@ -7,6 +7,7 @@ import { SavedConnection } from '../types'; import { EventsOn } from '../../wailsjs/runtime/runtime'; import { isMacLikePlatform, normalizeOpacityForPlatform, resolveAppearanceValues, resolveTextInputSafeBackdropFilter } from '../utils/appearance'; import { buildRpcConnectionConfig } from '../utils/connectionRpcConfig'; +import { quoteIdentPart, quoteQualifiedIdent } from '../utils/sql'; import { formatLocalDateTimeLiteral, normalizeTemporalLiteralText } from './dataGridCopyInsert'; import { buildDataSyncRequest, type SourceDatasetMode, validateDataSyncSelection } from './dataSyncRequest'; const { Title, Text } = Typography; @@ -46,26 +47,11 @@ type TableOps = { type WorkflowType = 'sync' | 'migration'; const quoteSqlIdent = (dbType: string, ident: string): string => { - const raw = String(ident || '').trim(); - if (!raw) return raw; - const t = String(dbType || '').toLowerCase(); - if (t === 'mysql' || t === 'mariadb' || t === 'oceanbase' || t === 'diros' || t === 'starrocks' || t === 'sphinx' || t === 'clickhouse' || t === 'tdengine') { - return `\`${raw.replace(/`/g, '``')}\``; - } - if (t === 'sqlserver') { - return `[${raw.replace(/]/g, ']]')}]`; - } - return `"${raw.replace(/"/g, '""')}"`; + return quoteIdentPart(dbType, String(ident || '').trim()); }; const quoteSqlTable = (dbType: string, tableName: string): string => { - const raw = String(tableName || '').trim(); - if (!raw) return raw; - if (!raw.includes('.')) return quoteSqlIdent(dbType, raw); - return raw - .split('.') - .map((part) => quoteSqlIdent(dbType, part)) - .join('.'); + return quoteQualifiedIdent(dbType, String(tableName || '').trim()); }; const toSqlLiteral = (value: any, dbType: string): string => { diff --git a/frontend/src/components/DatabaseIcons.test.tsx b/frontend/src/components/DatabaseIcons.test.tsx index b0ae56e..6f5f6d5 100644 --- a/frontend/src/components/DatabaseIcons.test.tsx +++ b/frontend/src/components/DatabaseIcons.test.tsx @@ -32,6 +32,13 @@ describe('DatabaseIcons', () => { expect(markup).toContain('>Qd'); }); + it('includes Apache IoTDB in the selectable database icons', () => { + expect(DB_ICON_TYPES).toContain('iotdb'); + expect(getDbIconLabel('iotdb')).toBe('Apache IoTDB'); + const markup = renderToStaticMarkup(<>{getDbIcon('iotdb', undefined, 22)}); + expect(markup).toContain('>Io'); + }); + it('wraps database icons in a consistent frame for sidebar sizing', () => { const mysqlMarkup = renderToStaticMarkup(<>{getDbIcon('mysql', undefined, 22)}); const jvmMarkup = renderToStaticMarkup(<>{getDbIcon('jvm', undefined, 22)}); diff --git a/frontend/src/components/DatabaseIcons.tsx b/frontend/src/components/DatabaseIcons.tsx index e224cf6..9a66348 100644 --- a/frontend/src/components/DatabaseIcons.tsx +++ b/frontend/src/components/DatabaseIcons.tsx @@ -49,6 +49,7 @@ const DB_DEFAULT_COLORS: Record = { highgo: '#00A86B', iris: '#1F6FEB', tdengine: '#2962FF', + iotdb: '#0F766E', chroma: '#7C3AED', qdrant: '#DC244C', diros: '#0050B3', @@ -180,6 +181,9 @@ const IrisIcon: React.FC = ({ size = 16, color }) => ( const TDengineIcon: React.FC = ({ size = 16, color }) => ( ); +const IoTDBIcon: React.FC = ({ size = 16, color }) => ( + +); const ChromaIcon: React.FC = ({ size = 16, color }) => ( ); @@ -239,6 +243,7 @@ const DB_ICON_MAP: Record> = { highgo: HighGoIcon, iris: IrisIcon, tdengine: TDengineIcon, + iotdb: IoTDBIcon, chroma: ChromaIcon, qdrant: QdrantIcon, elasticsearch: ElasticsearchIcon, @@ -249,7 +254,7 @@ const DB_ICON_MAP: Record> = { export const DB_ICON_TYPES: string[] = [ 'mysql', 'mariadb', 'oceanbase', 'postgres', 'redis', 'mongodb', 'jvm', 'oracle', 'sqlserver', 'sqlite', 'duckdb', 'clickhouse', 'starrocks', - 'kingbase', 'dameng', 'vastbase', 'opengauss', 'highgo', 'iris', 'tdengine', 'chroma', 'qdrant', 'elasticsearch', 'custom', + 'kingbase', 'dameng', 'vastbase', 'opengauss', 'highgo', 'iris', 'tdengine', 'iotdb', 'chroma', 'qdrant', 'elasticsearch', 'custom', ]; /** 该类型是否有品牌 SVG 文件 */ @@ -271,7 +276,7 @@ export const getDbIconLabel = (type: string): string => { sqlserver: 'SQL Server', clickhouse: 'ClickHouse', sqlite: 'SQLite', starrocks: 'StarRocks', duckdb: 'DuckDB', kingbase: '金仓', dameng: '达梦', - vastbase: 'VastBase', opengauss: 'OpenGauss', highgo: '瀚高', iris: 'InterSystems IRIS', tdengine: 'TDengine', + vastbase: 'VastBase', opengauss: 'OpenGauss', highgo: '瀚高', iris: 'InterSystems IRIS', tdengine: 'TDengine', iotdb: 'Apache IoTDB', chroma: 'Chroma', qdrant: 'Qdrant', elasticsearch: 'Elasticsearch', diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index 3762cda..cadadcd 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -2540,16 +2540,16 @@ const Sidebar: React.FC<{ } }; - const isNonRelationalDbType = (connectionId: string): boolean => { + const isStructureOnlyDbType = (connectionId: string): boolean => { const conn = connections.find(c => c.id === connectionId); if (!conn) return false; const dbType = resolveDataSourceType(conn.config); - return dbType === 'elasticsearch' || dbType === 'mongodb' || dbType === 'redis'; + return dbType === 'elasticsearch' || dbType === 'mongodb' || dbType === 'redis' || dbType === 'iotdb'; }; const openDesign = (node: any, initialTab: string, readOnly: boolean = false) => { const { tableName, dbName, id } = node.dataRef; - const forceReadOnly = readOnly || isNonRelationalDbType(id); + const forceReadOnly = readOnly || isStructureOnlyDbType(id); addTab({ id: `design-${id}-${dbName}-${tableName}`, title: `${forceReadOnly ? '表结构' : '设计表'} (${tableName})`, @@ -2564,6 +2564,10 @@ const Sidebar: React.FC<{ const openNewTableDesign = (node: any) => { const { dbName, id } = node.dataRef; + if (isStructureOnlyDbType(id)) { + message.warning('当前数据源暂不支持可视化新建表'); + return; + } addTab({ id: `new-table-${id}-${dbName}-${Date.now()}`, title: `新建表 - ${dbName}`, @@ -6405,14 +6409,15 @@ const Sidebar: React.FC<{ const groupData = node.dataRef; // { ...conn, dbName, groupKey } const sortPreferenceKey = `${groupData.id}-${groupData.dbName}`; const currentSort = tableSortPreference[sortPreferenceKey] || 'name'; + const canCreateTable = !isStructureOnlyDbType(String(groupData.id || '')); return [ - { + ...(canCreateTable ? [{ key: 'new-table', label: '新建表', icon: , onClick: () => openNewTableDesign(node) - }, + }] : []), { type: 'divider' }, { key: 'sort-by-name', @@ -6845,13 +6850,14 @@ const Sidebar: React.FC<{ const capabilities = getDataSourceCapabilities(databaseConn?.config); const isStarRocks = dialect === 'starrocks'; const supportsSchemaActions = isPostgresSchemaDialect(dialect); + const canCreateTable = !isStructureOnlyDbType(String(databaseConn?.id || '')); return [ - { + ...(canCreateTable ? [{ key: 'new-table', label: '新建表', icon: , onClick: () => openNewTableDesign(node) - }, + }] : []), ...(supportsSchemaActions ? [ { key: 'new-schema', @@ -7116,7 +7122,7 @@ const Sidebar: React.FC<{ { type: 'divider' }, { key: 'design-table', - label: '设计表', + label: isStructureOnlyDbType(String(node.dataRef?.id || '')) ? '表结构' : '设计表', icon: , onClick: () => openDesign(node, 'columns', false) }, diff --git a/frontend/src/components/TableOverview.tsx b/frontend/src/components/TableOverview.tsx index 6a5cc68..ff2f10d 100644 --- a/frontend/src/components/TableOverview.tsx +++ b/frontend/src/components/TableOverview.tsx @@ -132,6 +132,11 @@ const getMetadataDialect = (connType: string, driver?: string, oceanBaseProtocol const buildTableStatusSQL = (dialect: string, dbName: string, schemaName?: string): string => { const escapeLiteral = (s: string) => s.replace(/'/g, "''"); + const iotdbDevicePattern = (name: string) => { + const normalized = String(name || '').trim().replace(/[`"]/g, ''); + if (!normalized) return ''; + return normalized.endsWith('.**') ? normalized : `${normalized}.**`; + }; switch (dialect) { case 'mysql': case 'starrocks': @@ -190,6 +195,10 @@ ORDER BY s.name, t.name`; return `SELECT name AS table_name, comment AS table_comment, total_rows AS table_rows, total_bytes AS data_length, 0 AS index_length FROM system.tables WHERE database = '${escapeLiteral(dbName)}' AND engine NOT IN ('View', 'MaterializedView') ORDER BY name`; case 'tdengine': return `SHOW TABLES FROM \`${dbName.replace(/`/g, '``')}\``; + case 'iotdb': { + const pattern = iotdbDevicePattern(dbName); + return pattern ? `SHOW DEVICES ${pattern}` : 'SHOW DEVICES'; + } case 'dm': case 'oracle': { const owner = (schemaName || dbName).toUpperCase(); @@ -219,7 +228,7 @@ const parseTableStats = (dialect: string, rows: Record[]): TableSta }; return { - name: strVal(['Name', 'name', 'table_name', 'tablename', 'TABLE_NAME']), + name: strVal(['Name', 'name', 'table_name', 'tablename', 'TABLE_NAME', 'Device', 'device']), comment: strVal(['Comment', 'table_comment', 'TABLE_COMMENT', 'comments']), rows: numVal(['Rows', 'table_rows', 'TABLE_ROWS', 'num_rows', 'reltuples', 'total_rows']), dataSize: numVal(['Data_length', 'data_length', 'DATA_LENGTH', 'total_bytes']), @@ -263,6 +272,7 @@ const TableOverview: React.FC = ({ tab }) => { [connection?.config?.driver, connection?.config?.oceanBaseProtocol, connection?.config?.type] ); const schemaName = String((tab as any).schemaName || '').trim(); + const supportsDesignWrite = metadataDialect !== 'iotdb'; const autoFetchVisible = useAutoFetchVisibility(); const loadData = useCallback(async () => { @@ -415,17 +425,18 @@ const TableOverview: React.FC = ({ tab }) => { const openDesign = useCallback((tableName: string) => { if (!connection) return; setActiveContext({ connectionId: connection.id, dbName: tab.dbName || '' }); + const structureOnly = !supportsDesignWrite; addTab({ id: `design-${connection.id}-${tab.dbName}-${tableName}`, - title: `设计表 (${tableName})`, + title: `${structureOnly ? '表结构' : '设计表'} (${tableName})`, type: 'design', connectionId: connection.id, dbName: tab.dbName, tableName, initialTab: 'columns', - readOnly: false, + readOnly: structureOnly, }); - }, [connection, tab.dbName, addTab, setActiveContext]); + }, [connection, tab.dbName, addTab, setActiveContext, supportsDesignWrite]); const openTableDdl = useCallback((tableName: string) => { if (!connection) return; @@ -827,7 +838,7 @@ const TableOverview: React.FC = ({ tab }) => { const buildLegacyTableContextMenuItems = useCallback((table: TableStatRow): MenuProps['items'] => [ { key: 'new-query', label: '新建查询', icon: , onClick: () => openQueryForTable(table.name) }, { type: 'divider' }, - { key: 'design-table', label: '设计表', icon: , onClick: () => openDesign(table.name) }, + { key: 'design-table', label: supportsDesignWrite ? '设计表' : '表结构', icon: , onClick: () => openDesign(table.name) }, { key: 'copy-table-name', label: '复制表名', icon: , onClick: () => handleCopyTableName(table.name) }, { key: 'copy-structure', label: '复制表结构', icon: , onClick: () => handleCopyStructure(table.name) }, { key: 'backup-table', label: '备份表 (SQL)', icon: , onClick: () => handleExport(table.name, 'sql') }, @@ -855,6 +866,7 @@ const TableOverview: React.FC = ({ tab }) => { handleTableDataDangerAction, openDesign, openQueryForTable, + supportsDesignWrite, ]); const renderOverviewSectionTitle = (section: OverviewTableSection) => ( diff --git a/frontend/src/store.ts b/frontend/src/store.ts index 4a5428b..3025e4b 100644 --- a/frontend/src/store.ts +++ b/frontend/src/store.ts @@ -289,6 +289,7 @@ const SUPPORTED_CONNECTION_TYPES = new Set([ "postgres", "redis", "tdengine", + "iotdb", "oracle", "dameng", "kingbase", @@ -353,6 +354,8 @@ const getDefaultPortByType = (type: string): number => { return 6379; case "tdengine": return 6041; + case "iotdb": + return 6667; case "oracle": return 1521; case "dameng": diff --git a/frontend/src/utils/connectionDriverType.test.ts b/frontend/src/utils/connectionDriverType.test.ts index e1252dd..af2534b 100644 --- a/frontend/src/utils/connectionDriverType.test.ts +++ b/frontend/src/utils/connectionDriverType.test.ts @@ -16,6 +16,8 @@ describe('connectionDriverType', () => { expect(normalizeDriverType('chroma-db')).toBe('chroma'); expect(normalizeDriverType('qdrantdb')).toBe('qdrant'); expect(normalizeDriverType('qdrant-db')).toBe('qdrant'); + expect(normalizeDriverType('apache-iotdb')).toBe('iotdb'); + expect(normalizeDriverType('apache_iotdb')).toBe('iotdb'); expect(normalizeDriverType('doris')).toBe('diros'); expect(normalizeDriverType('open-gauss')).toBe('opengauss'); expect(normalizeDriverType('InterSystemsIRIS')).toBe('iris'); diff --git a/frontend/src/utils/connectionDriverType.ts b/frontend/src/utils/connectionDriverType.ts index 9cb0659..c07d149 100644 --- a/frontend/src/utils/connectionDriverType.ts +++ b/frontend/src/utils/connectionDriverType.ts @@ -17,6 +17,7 @@ export const normalizeDriverType = (value: string): string => { if (normalized === 'elastic') return 'elasticsearch'; if (normalized === 'chromadb' || normalized === 'chroma-db') return 'chroma'; if (normalized === 'qdrantdb' || normalized === 'qdrant-db') return 'qdrant'; + if (normalized === 'apache-iotdb' || normalized === 'apache_iotdb') return 'iotdb'; if (normalized === 'doris') return 'diros'; if ( normalized === 'open_gauss' || diff --git a/frontend/src/utils/connectionModalPresentation.test.ts b/frontend/src/utils/connectionModalPresentation.test.ts index 6a6f424..d6af201 100644 --- a/frontend/src/utils/connectionModalPresentation.test.ts +++ b/frontend/src/utils/connectionModalPresentation.test.ts @@ -91,6 +91,7 @@ describe('connectionModalPresentation', () => { 'qdrant', 'redis', 'tdengine', + 'iotdb', 'custom', 'jvm', ]; @@ -174,6 +175,14 @@ describe('connectionModalPresentation', () => { 'credentials', 'databaseScope', ]); + expect(resolveConnectionConfigLayout('iotdb').sections).toEqual([ + 'identity', + 'uri', + 'target', + 'service', + 'credentials', + 'databaseScope', + ]); }); it('uses localized labels for layout kinds shown in the modal', () => { @@ -181,5 +190,6 @@ describe('connectionModalPresentation', () => { expect(getConnectionConfigLayoutKindLabel('file')).toBe('文件型数据库'); expect(getConnectionConfigLayoutKindLabel('search')).toBe('搜索引擎'); expect(getConnectionConfigLayoutKindLabel('vector')).toBe('向量数据库'); + expect(getConnectionConfigLayoutKindLabel('timeseries')).toBe('时序数据库'); }); }); diff --git a/frontend/src/utils/connectionModalPresentation.ts b/frontend/src/utils/connectionModalPresentation.ts index 5ce552c..060f7c2 100644 --- a/frontend/src/utils/connectionModalPresentation.ts +++ b/frontend/src/utils/connectionModalPresentation.ts @@ -41,6 +41,7 @@ export type ConnectionConfigLayoutKind = | 'file' | 'search' | 'vector' + | 'timeseries' | 'custom' | 'jvm' | 'generic-sql'; @@ -163,6 +164,8 @@ export const getConnectionConfigLayoutKindLabel = ( return '搜索引擎'; case 'vector': return '向量数据库'; + case 'timeseries': + return '时序数据库'; case 'custom': return '自定义连接'; case 'jvm': @@ -265,6 +268,19 @@ export const resolveConnectionConfigLayout = ( ], }; } + if (type === 'iotdb') { + return { + kind: 'timeseries', + sections: [ + 'identity', + 'uri', + 'target', + 'service', + 'credentials', + 'databaseScope', + ], + }; + } if (postgresCompatibleTypes.has(type)) { return { kind: 'postgres-compatible', diff --git a/frontend/src/utils/connectionTypeCapabilities.test.ts b/frontend/src/utils/connectionTypeCapabilities.test.ts index 14ccd89..e95c853 100644 --- a/frontend/src/utils/connectionTypeCapabilities.test.ts +++ b/frontend/src/utils/connectionTypeCapabilities.test.ts @@ -19,6 +19,7 @@ describe('connectionTypeCapabilities', () => { expect(singleHostUriSchemesByType.elasticsearch).toEqual(['http', 'https']); expect(singleHostUriSchemesByType.chroma).toEqual(['http', 'https', 'chroma']); expect(singleHostUriSchemesByType.qdrant).toEqual(['http', 'https', 'qdrant']); + expect(singleHostUriSchemesByType.iotdb).toEqual(['iotdb']); expect(singleHostUriSchemesByType.redis).toEqual(['redis']); }); @@ -29,6 +30,7 @@ describe('connectionTypeCapabilities', () => { expect(supportsSSLForType('chroma')).toBe(true); expect(supportsSSLForType('qdrant')).toBe(true); expect(supportsSSLForType('tdengine')).toBe(true); + expect(supportsSSLForType('iotdb')).toBe(false); expect(supportsSSLForType('dameng')).toBe(true); expect(supportsSSLForType('sqlite')).toBe(false); }); @@ -70,6 +72,7 @@ describe('connectionTypeCapabilities', () => { expect(supportsConnectionParamsForType('mongodb')).toBe(true); expect(supportsConnectionParamsForType('dameng')).toBe(true); expect(supportsConnectionParamsForType('tdengine')).toBe(true); + expect(supportsConnectionParamsForType('iotdb')).toBe(true); expect(supportsConnectionParamsForType('elasticsearch')).toBe(true); expect(supportsConnectionParamsForType('chroma')).toBe(true); expect(supportsConnectionParamsForType('qdrant')).toBe(true); diff --git a/frontend/src/utils/connectionTypeCapabilities.ts b/frontend/src/utils/connectionTypeCapabilities.ts index 7f3032d..4eb0e36 100644 --- a/frontend/src/utils/connectionTypeCapabilities.ts +++ b/frontend/src/utils/connectionTypeCapabilities.ts @@ -7,6 +7,7 @@ export const singleHostUriSchemesByType: Record = { iris: ["iris", "intersystems"], redis: ["redis"], tdengine: ["tdengine"], + iotdb: ["iotdb"], dameng: ["dameng", "dm"], kingbase: ["kingbase"], highgo: ["highgo"], @@ -129,6 +130,7 @@ export const supportsConnectionParamsForType = (type: string) => type === "mongodb" || type === "dameng" || type === "tdengine" || + type === "iotdb" || type === "elasticsearch" || type === "chroma" || type === "qdrant"; diff --git a/frontend/src/utils/connectionTypeCatalog.test.ts b/frontend/src/utils/connectionTypeCatalog.test.ts index 6e357a1..00e230a 100644 --- a/frontend/src/utils/connectionTypeCatalog.test.ts +++ b/frontend/src/utils/connectionTypeCatalog.test.ts @@ -26,6 +26,7 @@ describe('connectionTypeCatalog', () => { expect(keys).toContain('elasticsearch'); expect(keys).toContain('chroma'); expect(keys).toContain('qdrant'); + expect(keys).toContain('iotdb'); expect(keys).toContain('jvm'); expect(keys).toContain('custom'); expect(new Set(keys).size).toBe(keys.length); @@ -42,6 +43,7 @@ describe('connectionTypeCatalog', () => { expect(getConnectionTypeDefaultPort('elasticsearch')).toBe(9200); expect(getConnectionTypeDefaultPort('chroma')).toBe(8000); expect(getConnectionTypeDefaultPort('qdrant')).toBe(6333); + expect(getConnectionTypeDefaultPort('iotdb')).toBe(6667); expect(getConnectionTypeDefaultPort('sqlite')).toBe(0); expect(getConnectionTypeDefaultPort('duckdb')).toBe(0); expect(getConnectionTypeDefaultPort('unknown')).toBe(3306); @@ -53,6 +55,7 @@ describe('connectionTypeCatalog', () => { expect(getConnectionTypeHint('elasticsearch')).toContain('Mapping'); expect(getConnectionTypeHint('chroma')).toContain('向量'); expect(getConnectionTypeHint('qdrant')).toContain('Payload'); + expect(getConnectionTypeHint('iotdb')).toContain('Timeseries'); expect(getConnectionTypeHint('oceanbase')).toBe('MySQL / Oracle 租户'); expect(getConnectionTypeHint('duckdb')).toBe('本地文件连接'); expect(getConnectionTypeHint('mysql')).toBe('标准连接配置'); diff --git a/frontend/src/utils/connectionTypeCatalog.ts b/frontend/src/utils/connectionTypeCatalog.ts index c32e0ea..4509228 100644 --- a/frontend/src/utils/connectionTypeCatalog.ts +++ b/frontend/src/utils/connectionTypeCatalog.ts @@ -56,6 +56,7 @@ export const CONNECTION_TYPE_GROUPS: ConnectionTypeCatalogGroup[] = [ label: '时序数据库', items: [ { key: 'tdengine', name: 'TDengine' }, + { key: 'iotdb', name: 'Apache IoTDB' }, ], }, { @@ -90,6 +91,8 @@ export const getConnectionTypeDefaultPort = (type: string): number => { return 6379; case 'tdengine': return 6041; + case 'iotdb': + return 6667; case 'oracle': return 1521; case 'dameng': @@ -138,6 +141,8 @@ export const getConnectionTypeHint = (type: string): string => { return 'Collection 浏览、向量检索和元数据过滤'; case 'qdrant': return 'Collection 浏览、向量搜索和 Payload 过滤'; + case 'iotdb': + return 'Storage Group / Device / Timeseries'; case 'oceanbase': return 'MySQL / Oracle 租户'; case 'sqlite': diff --git a/frontend/src/utils/dataSourceCapabilities.test.ts b/frontend/src/utils/dataSourceCapabilities.test.ts index d7f21f5..1d5cd57 100644 --- a/frontend/src/utils/dataSourceCapabilities.test.ts +++ b/frontend/src/utils/dataSourceCapabilities.test.ts @@ -108,6 +108,25 @@ describe('dataSourceCapabilities', () => { }); }); + it('treats Apache IoTDB as a queryable timeseries datasource with IoTDB-specific writes', () => { + expect(getDataSourceCapabilities({ type: 'iotdb' })).toMatchObject({ + type: 'iotdb', + supportsQueryEditor: true, + supportsSqlQueryExport: false, + supportsCopyInsert: false, + supportsCreateDatabase: false, + supportsRenameDatabase: false, + supportsDropDatabase: false, + forceReadOnlyQueryResult: true, + }); + expect(getDataSourceCapabilities({ type: 'custom', driver: 'apache-iotdb' })).toMatchObject({ + type: 'iotdb', + supportsQueryEditor: true, + supportsCopyInsert: false, + forceReadOnlyQueryResult: true, + }); + }); + it('treats OceanBase Oracle protocol as Oracle capabilities', () => { expect(getDataSourceCapabilities({ type: 'oceanbase', diff --git a/frontend/src/utils/dataSourceCapabilities.ts b/frontend/src/utils/dataSourceCapabilities.ts index f92025a..7e98191 100644 --- a/frontend/src/utils/dataSourceCapabilities.ts +++ b/frontend/src/utils/dataSourceCapabilities.ts @@ -27,6 +27,9 @@ const normalizeDataSourceToken = (raw: string): string => { case 'qdrantdb': case 'qdrant-db': return 'qdrant'; + case 'apache-iotdb': + case 'apache_iotdb': + return 'iotdb'; case 'intersystems': case 'intersystemsiris': case 'inter-systems': @@ -98,7 +101,7 @@ const COPY_INSERT_TYPES = new Set([ ]); const QUERY_EDITOR_DISABLED_TYPES = new Set(['redis']); -const FORCE_READ_ONLY_QUERY_TYPES = new Set(['tdengine', 'clickhouse']); +const FORCE_READ_ONLY_QUERY_TYPES = new Set(['tdengine', 'iotdb', 'clickhouse']); const MANUAL_TOTAL_COUNT_TYPES = new Set(['duckdb', 'oracle']); const APPROXIMATE_TABLE_COUNT_TYPES = new Set(['duckdb', 'oracle']); const APPROXIMATE_TOTAL_PAGE_TYPES = new Set(['duckdb']); diff --git a/frontend/src/utils/queryAutoLimit.test.ts b/frontend/src/utils/queryAutoLimit.test.ts index 5e05d63..54b04bd 100644 --- a/frontend/src/utils/queryAutoLimit.test.ts +++ b/frontend/src/utils/queryAutoLimit.test.ts @@ -25,6 +25,7 @@ describe('applyQueryAutoLimit', () => { 'duckdb', 'clickhouse', 'tdengine', + 'iotdb', ]; it.each(limitDialects)('adds generic LIMIT for %s connections', (dbType) => { diff --git a/frontend/src/utils/sql.test.ts b/frontend/src/utils/sql.test.ts index e435e93..1a2140c 100644 --- a/frontend/src/utils/sql.test.ts +++ b/frontend/src/utils/sql.test.ts @@ -54,6 +54,11 @@ describe('reverseOrderBySQL', () => { }); describe('quoteQualifiedIdent', () => { + it('quotes Apache IoTDB device paths with backticks per path segment', () => { + expect(quoteQualifiedIdent('iotdb', 'root.sg.d1')) + .toBe('`root`.`sg`.`d1`'); + }); + it('does not split dots inside quoted DuckDB identifiers', () => { expect(quoteQualifiedIdent('duckdb', '"daily.events"."2026.06"')) .toBe('"daily.events"."2026.06"'); diff --git a/frontend/src/utils/sql.ts b/frontend/src/utils/sql.ts index 4abfd59..d4c2b81 100644 --- a/frontend/src/utils/sql.ts +++ b/frontend/src/utils/sql.ts @@ -29,7 +29,7 @@ export const quoteIdentPart = (dbType: string, ident: string) => { if (!raw) return raw; const dbTypeLower = (dbType || '').toLowerCase(); - if (dbTypeLower === 'mysql' || dbTypeLower === 'mariadb' || dbTypeLower === 'oceanbase' || dbTypeLower === 'diros' || dbTypeLower === 'starrocks' || dbTypeLower === 'sphinx' || dbTypeLower === 'tdengine' || dbTypeLower === 'clickhouse') { + if (dbTypeLower === 'mysql' || dbTypeLower === 'mariadb' || dbTypeLower === 'oceanbase' || dbTypeLower === 'diros' || dbTypeLower === 'starrocks' || dbTypeLower === 'sphinx' || dbTypeLower === 'tdengine' || dbTypeLower === 'iotdb' || dbTypeLower === 'clickhouse') { return `\`${raw.replace(/`/g, '``')}\``; } diff --git a/frontend/src/utils/sqlDialect.test.ts b/frontend/src/utils/sqlDialect.test.ts index ec496a4..b1a43ce 100644 --- a/frontend/src/utils/sqlDialect.test.ts +++ b/frontend/src/utils/sqlDialect.test.ts @@ -34,6 +34,8 @@ describe('sqlDialect', () => { expect(resolveSqlDialect('custom', 'chroma-db')).toBe('chroma'); expect(resolveSqlDialect('QdrantDB')).toBe('qdrant'); expect(resolveSqlDialect('custom', 'qdrant-db')).toBe('qdrant'); + expect(resolveSqlDialect('Apache-IoTDB')).toBe('iotdb'); + expect(resolveSqlDialect('custom', 'apache_iotdb')).toBe('iotdb'); expect(resolveSqlDialect('OceanBase', '', { oceanBaseProtocol: 'oracle' })).toBe('oracle'); expect(resolveSqlDialect('custom', 'oceanbase', { oceanBaseProtocol: 'oracle' })).toBe('oracle'); expect(isMysqlFamilyDialect('mariadb')).toBe(true); @@ -56,9 +58,16 @@ describe('sqlDialect', () => { expect(values(resolveColumnTypeOptions('clickhouse'))).toContain('DateTime64(3)'); expect(values(resolveColumnTypeOptions('iris'))).toContain('varchar(255)'); expect(values(resolveColumnTypeOptions('tdengine'))).toContain('TIMESTAMP'); + expect(values(resolveColumnTypeOptions('iotdb'))).toContain('INT64'); expect(values(resolveColumnTypeOptions('duckdb'))).toContain('STRUCT'); }); + it('resolves Apache IoTDB completion keywords and functions independently', () => { + expect(resolveSqlKeywords('iotdb')).toEqual(expect.arrayContaining(['ALIGN BY DEVICE', 'SHOW TIMESERIES', 'WITH DATATYPE'])); + expect(names(resolveSqlFunctions('iotdb'))).toEqual(expect.arrayContaining(['DATE_BIN', 'DIFF', 'TOP_K'])); + expect(resolveSqlKeywords('iotdb')).not.toEqual(expect.arrayContaining(['TAGS', 'USING'])); + }); + it('resolves oracle completion keywords and functions without mysql-only suggestions', () => { expect(resolveSqlKeywords('oracle')).toEqual(expect.arrayContaining(['ROWNUM', 'FETCH', 'VARCHAR2', 'NUMBER'])); expect(resolveSqlKeywords('oracle')).not.toEqual(expect.arrayContaining(['AUTO_INCREMENT', 'CHANGE', 'LIMIT'])); diff --git a/frontend/src/utils/sqlDialect.ts b/frontend/src/utils/sqlDialect.ts index c8461a7..dc5408b 100644 --- a/frontend/src/utils/sqlDialect.ts +++ b/frontend/src/utils/sqlDialect.ts @@ -27,6 +27,7 @@ export type SqlDialect = | 'duckdb' | 'clickhouse' | 'tdengine' + | 'iotdb' | 'mongodb' | 'redis' | 'elasticsearch' @@ -111,6 +112,7 @@ export const resolveSqlDialect = ( case 'duckdb': case 'clickhouse': case 'tdengine': + case 'iotdb': case 'mongodb': case 'redis': case 'elasticsearch': @@ -125,6 +127,9 @@ export const resolveSqlDialect = ( case 'qdrant-db': case 'qdrant': return 'qdrant'; + case 'apache-iotdb': + case 'apache_iotdb': + return 'iotdb'; default: break; } @@ -147,6 +152,7 @@ export const resolveSqlDialect = ( if (source.includes('duckdb')) return 'duckdb'; if (source.includes('clickhouse')) return 'clickhouse'; if (source.includes('tdengine')) return 'tdengine'; + if (source.includes('iotdb')) return 'iotdb'; if (source.includes('sqlserver') || source.includes('mssql')) return 'sqlserver'; if (source.includes('iris') || source.includes('intersystems')) return 'iris'; if (source.includes('elastic')) return 'elasticsearch'; @@ -171,7 +177,7 @@ export const isOracleLikeDialect = (dbType: string): boolean => ( export const isSqlServerDialect = (dbType: string): boolean => resolveSqlDialect(dbType) === 'sqlserver'; export const isBacktickIdentifierDialect = (dbType: string): boolean => ( - isMysqlFamilyDialect(dbType) || ['clickhouse', 'tdengine'].includes(resolveSqlDialect(dbType)) + isMysqlFamilyDialect(dbType) || ['clickhouse', 'tdengine', 'iotdb'].includes(resolveSqlDialect(dbType)) ); const stripIdentifierQuotes = (part: string): string => { @@ -470,6 +476,19 @@ const TDENGINE_TYPES = optionValues([ 'GEOMETRY', ]); +const IOTDB_TYPES = optionValues([ + 'BOOLEAN', + 'INT32', + 'INT64', + 'FLOAT', + 'DOUBLE', + 'TEXT', + 'STRING', + 'BLOB', + 'TIMESTAMP', + 'DATE', +]); + const DUCKDB_TYPES = optionValues([ 'BOOLEAN', 'TINYINT', @@ -514,6 +533,7 @@ export const resolveColumnTypeOptions = (dbType: string): ColumnTypeOption[] => if (dialect === 'duckdb') return DUCKDB_TYPES; if (dialect === 'clickhouse') return CLICKHOUSE_TYPES; if (dialect === 'tdengine') return TDENGINE_TYPES; + if (dialect === 'iotdb') return IOTDB_TYPES; return COMMON_TYPES; }; @@ -565,6 +585,26 @@ const STARROCKS_KEYWORDS = [ const TDENGINE_KEYWORDS = ['LIMIT', 'SLIMIT', 'SOFFSET', 'TAGS', 'USING', 'INTERVAL', 'FILL', 'PARTITION BY']; +const IOTDB_KEYWORDS = [ + 'LIMIT', + 'OFFSET', + 'ALIGN BY DEVICE', + 'DISABLE ALIGN', + 'GROUP BY', + 'LEVEL', + 'FILL', + 'SLIMIT', + 'SOFFSET', + 'CREATE TIMESERIES', + 'SHOW TIMESERIES', + 'SHOW DEVICES', + 'SHOW DATABASES', + 'STORAGE GROUP', + 'WITH DATATYPE', + 'ENCODING', + 'COMPRESSION', +]; + export const resolveSqlKeywords = (dbType: string): string[] => { const dialect = resolveSqlDialect(dbType); if (dialect === 'starrocks') return unique([...COMMON_KEYWORDS, ...MYSQL_KEYWORDS, ...STARROCKS_KEYWORDS]); @@ -576,6 +616,7 @@ export const resolveSqlKeywords = (dbType: string): string[] => { if (dialect === 'duckdb') return unique([...COMMON_KEYWORDS, ...DUCKDB_KEYWORDS]); if (dialect === 'clickhouse') return unique([...COMMON_KEYWORDS, ...CLICKHOUSE_KEYWORDS]); if (dialect === 'tdengine') return unique([...COMMON_KEYWORDS, ...TDENGINE_KEYWORDS]); + if (dialect === 'iotdb') return unique([...COMMON_KEYWORDS, ...IOTDB_KEYWORDS]); return COMMON_KEYWORDS; }; @@ -793,6 +834,19 @@ const TDENGINE_FUNCTIONS = [ fn('IRATE', 'TDengine - 瞬时变化率'), ]; +const IOTDB_FUNCTIONS = [ + fn('NOW', 'IoTDB - 当前时间'), + fn('DATE_BIN', 'IoTDB - 时间分桶'), + fn('DIFF', 'IoTDB - 差分'), + fn('TIME_DIFFERENCE', 'IoTDB - 时间差'), + fn('DERIVATIVE', 'IoTDB - 导数'), + fn('NON_NEGATIVE_DERIVATIVE', 'IoTDB - 非负导数'), + fn('TOP_K', 'IoTDB - Top K'), + fn('BOTTOM_K', 'IoTDB - Bottom K'), + fn('M4', 'IoTDB - M4 降采样'), + fn('EQUAL_SIZE_BUCKET_RANDOM_SAMPLE', 'IoTDB - 等宽随机采样'), +]; + const mergeFunctions = (items: SqlFunctionCompletion[]): SqlFunctionCompletion[] => { const seen = new Set(); const result: SqlFunctionCompletion[] = []; @@ -816,5 +870,6 @@ export const resolveSqlFunctions = (dbType: string): SqlFunctionCompletion[] => if (dialect === 'duckdb') return mergeFunctions([...COMMON_FUNCTIONS, ...DUCKDB_FUNCTIONS]); if (dialect === 'clickhouse') return mergeFunctions([...COMMON_FUNCTIONS, ...CLICKHOUSE_FUNCTIONS]); if (dialect === 'tdengine') return mergeFunctions([...COMMON_FUNCTIONS, ...TDENGINE_FUNCTIONS]); + if (dialect === 'iotdb') return mergeFunctions([...COMMON_FUNCTIONS, ...IOTDB_FUNCTIONS]); return COMMON_FUNCTIONS; }; diff --git a/go.mod b/go.mod index bd99ab9..e2451ad 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( gitea.com/kingbase/gokb v0.0.0-20201021123113-29bd62a876c3 gitee.com/chunanyong/dm v1.8.22 github.com/ClickHouse/clickhouse-go/v2 v2.43.0 + github.com/apache/iotdb-client-go v1.3.7 github.com/caretdev/go-irisnative v0.2.1 github.com/duckdb/duckdb-go/v2 v2.5.5 github.com/elastic/go-elasticsearch/v8 v8.19.6 @@ -31,6 +32,7 @@ require ( ) require ( + github.com/apache/thrift v0.22.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index 64b9d88..9f96587 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,9 @@ github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwTo github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/apache/arrow-go/v18 v18.5.1 h1:yaQ6zxMGgf9YCYw4/oaeOU3AULySDlAYDOcnr4LdHdI= github.com/apache/arrow-go/v18 v18.5.1/go.mod h1:OCCJsmdq8AsRm8FkBSSmYTwL/s4zHW9CqxeBxEytkNE= +github.com/apache/iotdb-client-go v1.3.7 h1:NHEW0yysGfxFQkkJpFHTlww1a/RHCINbOXBfv2/aIQ0= +github.com/apache/iotdb-client-go v1.3.7/go.mod h1:3D6QYkqRmASS/4HsjU+U/3fscyc5M9xKRfywZsKuoZY= +github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= @@ -96,6 +99,7 @@ github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0kt github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= diff --git a/internal/app/db_context.go b/internal/app/db_context.go index 738742b..5ba6713 100644 --- a/internal/app/db_context.go +++ b/internal/app/db_context.go @@ -20,7 +20,7 @@ func normalizeRunConfig(config connection.ConnectionConfig, dbName string) conne if !isOceanBaseOracleProtocol(config) { runConfig.Database = name } - case "mysql", "mariadb", "diros", "starrocks", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver", "iris", "intersystems", "intersystemsiris", "inter-systems", "inter-systems-iris", "mongodb", "tdengine", "clickhouse": + case "mysql", "mariadb", "diros", "starrocks", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "sqlserver", "iris", "intersystems", "intersystemsiris", "inter-systems", "inter-systems-iris", "mongodb", "tdengine", "iotdb", "clickhouse": // 这些类型的 dbName 表示"数据库",需要写入连接配置以选择目标库。 runConfig.Database = name case "dameng": @@ -51,7 +51,7 @@ func normalizeSchemaAndTable(config connection.ConnectionConfig, dbName string, // Elasticsearch:索引名可能含多个点(如 iot_pro_biz_operate_log.index.20240626), // 不能按点分割,直接返回原始数据库名和完整表名。 - if dbType == "elasticsearch" { + if dbType == "elasticsearch" || dbType == "iotdb" { return rawDB, rawTable } diff --git a/internal/app/db_proxy.go b/internal/app/db_proxy.go index 4b3120f..6ec7b78 100644 --- a/internal/app/db_proxy.go +++ b/internal/app/db_proxy.go @@ -217,6 +217,8 @@ func defaultPortByType(driverType string) int { return 6379 case "tdengine": return 6041 + case "iotdb": + return 6667 case "oracle": return 1521 case "dameng": diff --git a/internal/app/methods_driver.go b/internal/app/methods_driver.go index eec5406..3f11d63 100644 --- a/internal/app/methods_driver.go +++ b/internal/app/methods_driver.go @@ -354,6 +354,7 @@ const builtinDriverManifestJSON = `{ "iris": { "engine": "go", "version": "0.2.1", "checksumPolicy": "off", "downloadUrl": "builtin://activate/iris" }, "mongodb": { "engine": "go", "version": "1.17.9", "checksumPolicy": "off", "downloadUrl": "builtin://activate/mongodb" }, "tdengine": { "engine": "go", "version": "3.7.8", "checksumPolicy": "off", "downloadUrl": "builtin://activate/tdengine" }, + "iotdb": { "engine": "go", "version": "1.3.7", "checksumPolicy": "off", "downloadUrl": "builtin://activate/iotdb" }, "clickhouse": { "engine": "go", "version": "2.43.1", "checksumPolicy": "off", "downloadUrl": "builtin://activate/clickhouse" }, "elasticsearch": { "engine": "go", "version": "8.19.6", "checksumPolicy": "off", "downloadUrl": "builtin://activate/elasticsearch" } } @@ -416,6 +417,7 @@ var latestDriverVersionMap = map[string]string{ "iris": "0.2.1", "mongodb": "2.5.0", "tdengine": "3.7.8", + "iotdb": "1.3.7", "clickhouse": "2.43.1", "elasticsearch": "8.19.6", "oracle": "2.9.0", @@ -440,6 +442,7 @@ var driverGoModulePathMap = map[string]string{ "iris": "github.com/caretdev/go-irisnative", "mongodb": "go.mongodb.org/mongo-driver/v2", "tdengine": "github.com/taosdata/driver-go/v3", + "iotdb": "github.com/apache/iotdb-client-go", "clickhouse": "github.com/ClickHouse/clickhouse-go/v2", "elasticsearch": "github.com/elastic/go-elasticsearch/v8", } @@ -1505,6 +1508,7 @@ func allDriverDefinitionsWithPackages(packages map[string]pinnedDriverPackage) [ buildOptionalGoDriverDefinition("iris", "InterSystems IRIS", packages), buildOptionalGoDriverDefinition("mongodb", "MongoDB", packages), buildOptionalGoDriverDefinition("tdengine", "TDengine", packages), + buildOptionalGoDriverDefinition("iotdb", "Apache IoTDB", packages), buildOptionalGoDriverDefinition("clickhouse", "ClickHouse", packages), buildOptionalGoDriverDefinition("elasticsearch", "Elasticsearch", packages), } @@ -4081,6 +4085,8 @@ func optionalDriverBuildTag(driverType string, selectedVersion string) (string, return "gonavi_mongodb_driver", nil case "tdengine": return "gonavi_tdengine_driver", nil + case "iotdb": + return "gonavi_iotdb_driver", nil case "clickhouse": return "gonavi_clickhouse_driver", nil case "elasticsearch": diff --git a/internal/app/methods_driver_agent_revision_test.go b/internal/app/methods_driver_agent_revision_test.go index 4029d74..d1ff60e 100644 --- a/internal/app/methods_driver_agent_revision_test.go +++ b/internal/app/methods_driver_agent_revision_test.go @@ -188,6 +188,7 @@ func optionalDriverAgentRevisionTestDrivers(t *testing.T) []string { "iris", "mongodb", "tdengine", + "iotdb", "clickhouse", "elasticsearch", } diff --git a/internal/app/methods_driver_version_test.go b/internal/app/methods_driver_version_test.go index ee861d7..f62c78d 100644 --- a/internal/app/methods_driver_version_test.go +++ b/internal/app/methods_driver_version_test.go @@ -469,6 +469,39 @@ func TestElasticsearchDriverDefinitionUsesOptionalAgent(t *testing.T) { } } +func TestIoTDBDriverDefinitionUsesOptionalAgent(t *testing.T) { + definition, ok := resolveDriverDefinition("iotdb") + if !ok { + t.Fatal("expected iotdb driver definition") + } + if definition.Name != "Apache IoTDB" { + t.Fatalf("unexpected iotdb driver name: %q", definition.Name) + } + if definition.BuiltIn { + t.Fatal("expected iotdb to be an optional driver agent") + } + if driverGoModulePathMap["iotdb"] != "github.com/apache/iotdb-client-go" { + t.Fatalf("unexpected iotdb go module path: %q", driverGoModulePathMap["iotdb"]) + } + if definition.PinnedVersion != "1.3.7" { + t.Fatalf("unexpected iotdb definition pinned version: %q", definition.PinnedVersion) + } + if definition.DefaultDownloadURL != "builtin://activate/iotdb" { + t.Fatalf("unexpected iotdb default download URL: %q", definition.DefaultDownloadURL) + } + if latestDriverVersionMap["iotdb"] != "1.3.7" { + t.Fatalf("unexpected iotdb pinned version: %q", latestDriverVersionMap["iotdb"]) + } + + tags, err := optionalDriverBuildTags("iotdb", "") + if err != nil { + t.Fatalf("resolve iotdb build tags failed: %v", err) + } + if tags != "gonavi_iotdb_driver" { + t.Fatalf("unexpected iotdb build tag: %q", tags) + } +} + func TestBuildOptionalDriverInstallPlanMessagePrefersDirectThenBundle(t *testing.T) { message := buildOptionalDriverInstallPlanMessage("SQL Server", "1.9.6", false, false, false, false, 1, 2) if !strings.Contains(message, "先尝试 1 个预编译直链") { diff --git a/internal/db/database_optional_factories_full.go b/internal/db/database_optional_factories_full.go index 41fc59b..5389950 100644 --- a/internal/db/database_optional_factories_full.go +++ b/internal/db/database_optional_factories_full.go @@ -19,6 +19,7 @@ func registerOptionalDatabaseFactories() { registerDatabaseFactory(newOptionalDriverAgentDatabase("iris"), "iris", "intersystems") registerDatabaseFactory(newOptionalDriverAgentDatabase("mongodb"), "mongodb") registerDatabaseFactory(newOptionalDriverAgentDatabase("tdengine"), "tdengine") + registerDatabaseFactory(newOptionalDriverAgentDatabase("iotdb"), "iotdb", "apache-iotdb", "apache_iotdb") registerDatabaseFactory(newOptionalDriverAgentDatabase("clickhouse"), "clickhouse") registerDatabaseFactory(newOptionalDriverAgentDatabase("elasticsearch"), "elasticsearch", "elastic") } diff --git a/internal/db/database_optional_factories_lite.go b/internal/db/database_optional_factories_lite.go index 3e7e64d..b964f79 100644 --- a/internal/db/database_optional_factories_lite.go +++ b/internal/db/database_optional_factories_lite.go @@ -19,6 +19,7 @@ func registerOptionalDatabaseFactories() { registerDatabaseFactory(newOptionalDriverAgentDatabase("iris"), "iris", "intersystems") registerDatabaseFactory(newOptionalDriverAgentDatabase("mongodb"), "mongodb") registerDatabaseFactory(newOptionalDriverAgentDatabase("tdengine"), "tdengine") + registerDatabaseFactory(newOptionalDriverAgentDatabase("iotdb"), "iotdb", "apache-iotdb", "apache_iotdb") registerDatabaseFactory(newOptionalDriverAgentDatabase("clickhouse"), "clickhouse") registerDatabaseFactory(newOptionalDriverAgentDatabase("elasticsearch"), "elasticsearch", "elastic") } diff --git a/internal/db/driver_agent_revisions_gen.go b/internal/db/driver_agent_revisions_gen.go index 27a2ee0..561a773 100644 --- a/internal/db/driver_agent_revisions_gen.go +++ b/internal/db/driver_agent_revisions_gen.go @@ -20,6 +20,7 @@ func init() { "iris": "src-1b072c57af08bec4", "mongodb": "src-57fdd8bfebdcd46e", "tdengine": "src-939715f94df1ec9c", + "iotdb": "src-473c39891f926db2", "clickhouse": "src-482d62ed565b3e69", "elasticsearch": "src-2fb00b94d7067c56", } diff --git a/internal/db/driver_support.go b/internal/db/driver_support.go index 0a9a688..ecf2b32 100644 --- a/internal/db/driver_support.go +++ b/internal/db/driver_support.go @@ -39,6 +39,7 @@ var optionalGoDrivers = map[string]struct{}{ "iris": {}, "mongodb": {}, "tdengine": {}, + "iotdb": {}, "clickhouse": {}, "elasticsearch": {}, } @@ -72,6 +73,8 @@ func normalizeRuntimeDriverType(driverType string) string { return "chroma" case "qdrantdb", "qdrant-db": return "qdrant" + case "apache-iotdb", "apache_iotdb", "iotdb": + return "iotdb" default: return normalized } @@ -119,6 +122,8 @@ func driverDisplayName(driverType string) string { return "MongoDB" case "tdengine": return "TDengine" + case "iotdb": + return "Apache IoTDB" case "clickhouse": return "ClickHouse" case "elasticsearch": diff --git a/internal/db/iotdb_impl.go b/internal/db/iotdb_impl.go new file mode 100644 index 0000000..decf3b3 --- /dev/null +++ b/internal/db/iotdb_impl.go @@ -0,0 +1,699 @@ +//go:build gonavi_full_drivers || gonavi_iotdb_driver + +package db + +import ( + "context" + "fmt" + "net" + "net/url" + "sort" + "strconv" + "strings" + "time" + + "GoNavi-Wails/internal/connection" + "GoNavi-Wails/internal/logger" + "GoNavi-Wails/internal/ssh" + + iotdbclient "github.com/apache/iotdb-client-go/client" +) + +const ( + defaultIoTDBPort = 6667 + defaultIoTDBUser = "root" + defaultIoTDBPassword = "root" + defaultIoTDBQueryTimeout = 30 * time.Second +) + +type iotdbDataSet interface { + Next() (bool, error) + Close() error + IsNull(columnName string) (bool, error) + GetObject(columnName string) (interface{}, error) + GetColumnNames() []string +} + +type iotdbSessionRunner interface { + Close() error + Query(ctx context.Context, sql string, timeoutMs *int64) (iotdbDataSet, error) + Exec(ctx context.Context, sql string) error +} + +type iotdbClientSession struct { + session *iotdbclient.Session +} + +func (s *iotdbClientSession) Close() error { + if s == nil || s.session == nil { + return nil + } + return s.session.Close() +} + +func (s *iotdbClientSession) Query(ctx context.Context, sql string, timeoutMs *int64) (iotdbDataSet, error) { + if s == nil || s.session == nil { + return nil, fmt.Errorf("连接未打开") + } + return s.session.ExecuteQueryStatement(sql, timeoutMs) +} + +func (s *iotdbClientSession) Exec(ctx context.Context, sql string) error { + if s == nil || s.session == nil { + return fmt.Errorf("连接未打开") + } + return s.session.ExecuteNonQueryStatement(sql) +} + +var newIoTDBSessionRunner = func(config connection.ConnectionConfig) (iotdbSessionRunner, error) { + params := iotdbConnectionParams(config) + user := strings.TrimSpace(config.User) + if user == "" { + user = defaultIoTDBUser + } + password := config.Password + if password == "" { + password = defaultIoTDBPassword + } + fetchSize := int32(intFromAny(params.Get("fetchSize"), iotdbclient.DefaultFetchSize)) + if fetchSize <= 0 { + fetchSize = iotdbclient.DefaultFetchSize + } + timeZone := strings.TrimSpace(firstNonEmpty(params.Get("timeZone"), params.Get("timezone"), params.Get("zoneId"))) + if timeZone == "" { + timeZone = iotdbclient.DefaultTimeZone + } + retryMax := intFromAny(firstNonEmpty(params.Get("connectRetryMax"), params.Get("retryMax")), iotdbclient.DefaultConnectRetryMax) + if retryMax <= 0 { + retryMax = iotdbclient.DefaultConnectRetryMax + } + enableCompression := getOrBool(map[string]interface{}{"rpcCompression": params.Get("rpcCompression")}, "rpcCompression") + cfg := &iotdbclient.Config{ + Host: strings.TrimSpace(config.Host), + Port: strconv.Itoa(config.Port), + UserName: user, + Password: password, + FetchSize: fetchSize, + TimeZone: timeZone, + ConnectRetryMax: retryMax, + } + session := iotdbclient.NewSession(cfg) + timeoutMs := getConnectTimeout(config).Milliseconds() + if timeoutMs < 0 { + timeoutMs = 0 + } + if err := session.Open(enableCompression, int(timeoutMs)); err != nil { + return nil, err + } + return &iotdbClientSession{session: &session}, nil +} + +// IoTDBDB implements Database for Apache IoTDB through the official Session API. +type IoTDBDB struct { + session iotdbSessionRunner + forwarder *ssh.LocalForwarder + pingTimeout time.Duration +} + +func (i *IoTDBDB) Connect(config connection.ConnectionConfig) error { + if i.forwarder != nil { + _ = i.forwarder.Close() + i.forwarder = nil + } + i.session = nil + + runConfig := normalizeIoTDBConfig(config) + if runConfig.UseSSH { + forwarder, err := ssh.GetOrCreateLocalForwarder(runConfig.SSH, runConfig.Host, runConfig.Port) + if err != nil { + return fmt.Errorf("创建 SSH 隧道失败:%w", err) + } + i.forwarder = forwarder + + host, portText, err := net.SplitHostPort(forwarder.LocalAddr) + if err != nil { + return fmt.Errorf("解析本地转发地址失败:%w", err) + } + port, err := strconv.Atoi(portText) + if err != nil { + return fmt.Errorf("解析本地端口失败:%w", err) + } + runConfig.Host = host + runConfig.Port = port + runConfig.UseSSH = false + logger.Infof("IoTDB 通过本地端口转发连接:%s -> %s:%d", forwarder.LocalAddr, config.Host, config.Port) + } + + session, err := newIoTDBSessionRunner(runConfig) + if err != nil { + _ = i.Close() + return err + } + i.session = session + i.pingTimeout = getConnectTimeout(runConfig) + if err := i.Ping(); err != nil { + _ = i.Close() + return err + } + return nil +} + +func (i *IoTDBDB) Close() error { + if i.forwarder != nil { + if err := i.forwarder.Close(); err != nil { + logger.Warnf("关闭 IoTDB SSH 端口转发失败:%v", err) + } + i.forwarder = nil + } + if i.session != nil { + err := i.session.Close() + i.session = nil + return err + } + return nil +} + +func (i *IoTDBDB) Ping() error { + if i.session == nil { + return fmt.Errorf("连接未打开") + } + ctx, cancel := context.WithTimeout(context.Background(), i.effectiveTimeout()) + defer cancel() + ds, err := i.session.Query(ctx, "SHOW VERSION", nil) + if err != nil { + return err + } + return ds.Close() +} + +func (i *IoTDBDB) Query(query string) ([]map[string]interface{}, []string, error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultIoTDBQueryTimeout) + defer cancel() + return i.QueryContext(ctx, query) +} + +func (i *IoTDBDB) QueryContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error) { + if i.session == nil { + return nil, nil, fmt.Errorf("连接未打开") + } + text := strings.TrimSpace(query) + if text == "" { + return nil, nil, fmt.Errorf("查询语句不能为空") + } + timeoutMs := int64(i.effectiveTimeout().Milliseconds()) + ds, err := i.session.Query(ctx, text, &timeoutMs) + if err != nil { + return nil, nil, err + } + return scanIoTDBDataSet(ds) +} + +func (i *IoTDBDB) Exec(query string) (int64, error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultIoTDBQueryTimeout) + defer cancel() + return i.ExecContext(ctx, query) +} + +func (i *IoTDBDB) ExecContext(ctx context.Context, query string) (int64, error) { + if i.session == nil { + return 0, fmt.Errorf("连接未打开") + } + text := strings.TrimSpace(query) + if text == "" { + return 0, fmt.Errorf("执行语句不能为空") + } + if err := i.session.Exec(ctx, text); err != nil { + return 0, err + } + return 0, nil +} + +func (i *IoTDBDB) GetDatabases() ([]string, error) { + queries := []string{"SHOW DATABASES", "SHOW STORAGE GROUPS", "SHOW STORAGE GROUP"} + var lastErr error + for _, query := range queries { + rows, _, err := i.Query(query) + if err != nil { + lastErr = err + continue + } + names := make([]string, 0, len(rows)) + for _, row := range rows { + name := firstRowString(row, "Database", "database", "Storage Group", "storage group", "storage_group", "name") + if name == "" { + name = firstAnyRowString(row) + } + if name != "" { + names = append(names, name) + } + } + if len(names) > 0 { + sort.Strings(names) + return names, nil + } + } + if lastErr != nil { + return nil, lastErr + } + return []string{}, nil +} + +func (i *IoTDBDB) GetTables(dbName string) ([]string, error) { + queries := []string{} + if pattern := iotdbDevicePattern(dbName); pattern != "" { + queries = append(queries, "SHOW DEVICES "+pattern) + } + queries = append(queries, "SHOW DEVICES") + + var lastErr error + seen := map[string]struct{}{} + tables := []string{} + for _, query := range queries { + rows, _, err := i.Query(query) + if err != nil { + lastErr = err + continue + } + for _, row := range rows { + name := firstRowString(row, "Device", "device", "devices", "Devices", "Path", "path") + if name == "" { + name = firstAnyRowString(row) + } + if name == "" { + continue + } + if _, exists := seen[name]; exists { + continue + } + seen[name] = struct{}{} + tables = append(tables, name) + } + if len(tables) > 0 { + sort.Strings(tables) + return tables, nil + } + } + if lastErr != nil { + return nil, lastErr + } + return []string{}, nil +} + +func (i *IoTDBDB) GetCreateStatement(dbName, tableName string) (string, error) { + device := resolveIoTDBDevicePath(dbName, tableName) + rows, _, err := i.Query("SHOW TIMESERIES " + iotdbTimeseriesPattern(device)) + if err != nil { + return "", err + } + statements := make([]string, 0, len(rows)) + for _, row := range rows { + path := firstRowString(row, "Timeseries", "timeseries", "Path", "path") + if path == "" { + path = firstAnyRowString(row) + } + if path == "" { + continue + } + dataType := firstRowString(row, "DataType", "dataType", "data_type", "Type", "type") + encoding := firstRowString(row, "Encoding", "encoding") + compression := firstRowString(row, "Compression", "compression", "Compressor", "compressor") + parts := []string{} + if dataType != "" { + parts = append(parts, "DATATYPE="+dataType) + } + if encoding != "" { + parts = append(parts, "ENCODING="+encoding) + } + if compression != "" { + parts = append(parts, "COMPRESSION="+compression) + } + if len(parts) == 0 { + statements = append(statements, "CREATE TIMESERIES "+path+";") + } else { + statements = append(statements, "CREATE TIMESERIES "+path+" WITH "+strings.Join(parts, ", ")+";") + } + } + if len(statements) == 0 { + return "", fmt.Errorf("未找到 IoTDB timeseries:%s", device) + } + return strings.Join(statements, "\n"), nil +} + +func (i *IoTDBDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) { + device := resolveIoTDBDevicePath(dbName, tableName) + rows, _, err := i.Query("SHOW TIMESERIES " + iotdbTimeseriesPattern(device)) + if err != nil { + return nil, err + } + columns := []connection.ColumnDefinition{{ + Name: "Time", + Type: "TIMESTAMP", + Nullable: "NO", + Key: "PRI", + Comment: "IoTDB timestamp column", + }} + for _, row := range rows { + path := firstRowString(row, "Timeseries", "timeseries", "Path", "path") + if path == "" { + path = firstAnyRowString(row) + } + if path == "" { + continue + } + name := strings.TrimPrefix(path, strings.TrimRight(device, ".")+".") + dataType := firstRowString(row, "DataType", "dataType", "data_type", "Type", "type") + encoding := firstRowString(row, "Encoding", "encoding") + compression := firstRowString(row, "Compression", "compression", "Compressor", "compressor") + commentParts := []string{} + if encoding != "" { + commentParts = append(commentParts, "encoding="+encoding) + } + if compression != "" { + commentParts = append(commentParts, "compression="+compression) + } + columns = append(columns, connection.ColumnDefinition{ + Name: name, + Type: dataType, + Nullable: "YES", + Comment: strings.Join(commentParts, "; "), + }) + } + return columns, nil +} + +func (i *IoTDBDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) { + tables, err := i.GetTables(dbName) + if err != nil { + return nil, err + } + var result []connection.ColumnDefinitionWithTable + for _, table := range tables { + cols, err := i.GetColumns(dbName, table) + if err != nil { + continue + } + for _, col := range cols { + result = append(result, connection.ColumnDefinitionWithTable{ + TableName: table, + Name: col.Name, + Type: col.Type, + Comment: col.Comment, + }) + } + } + return result, nil +} + +func (i *IoTDBDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) { + return []connection.IndexDefinition{{Name: "TIME", ColumnName: "Time", NonUnique: 0, SeqInIndex: 1, IndexType: "TIME"}}, nil +} + +func (i *IoTDBDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) { + return []connection.ForeignKeyDefinition{}, nil +} + +func (i *IoTDBDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) { + return []connection.TriggerDefinition{}, nil +} + +func (i *IoTDBDB) ApplyChanges(tableName string, changes connection.ChangeSet) error { + if i.session == nil { + return fmt.Errorf("连接未打开") + } + if strings.TrimSpace(tableName) == "" { + return fmt.Errorf("设备路径不能为空") + } + if len(changes.Updates) > 0 || len(changes.Deletes) > 0 { + return fmt.Errorf("IoTDB 目标端当前仅支持 INSERT 写入,暂不支持 UPDATE/DELETE 差异同步") + } + for _, row := range changes.Inserts { + sqlText, err := buildIoTDBInsertSQL(tableName, row) + if err != nil { + return err + } + if strings.TrimSpace(sqlText) == "" { + continue + } + if _, err := i.Exec(sqlText); err != nil { + return err + } + } + return nil +} + +func normalizeIoTDBConfig(config connection.ConnectionConfig) connection.ConnectionConfig { + runConfig := applyIoTDBURI(config) + if strings.TrimSpace(runConfig.Host) == "" { + runConfig.Host = "localhost" + } + if runConfig.Port <= 0 { + runConfig.Port = defaultIoTDBPort + } + if strings.TrimSpace(runConfig.User) == "" { + runConfig.User = defaultIoTDBUser + } + if runConfig.Password == "" { + runConfig.Password = defaultIoTDBPassword + } + return runConfig +} + +func applyIoTDBURI(config connection.ConnectionConfig) connection.ConnectionConfig { + uriText := strings.TrimSpace(config.URI) + if uriText == "" { + return config + } + parsed, err := url.Parse(uriText) + if err != nil { + return config + } + scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme)) + if scheme != "iotdb" { + return config + } + if parsed.User != nil { + if strings.TrimSpace(config.User) == "" { + config.User = parsed.User.Username() + } + if pass, ok := parsed.User.Password(); ok && config.Password == "" { + config.Password = pass + } + } + if host := strings.TrimSpace(parsed.Host); host != "" { + if h, port, ok := parseHostPortWithDefault(host, defaultIoTDBPort); ok { + config.Host = h + config.Port = port + } + } + if dbName := strings.Trim(strings.TrimSpace(parsed.Path), "/"); dbName != "" && strings.TrimSpace(config.Database) == "" { + config.Database = dbName + } + return config +} + +func iotdbConnectionParams(config connection.ConnectionConfig) url.Values { + params := url.Values{} + mergeConnectionParamValues(params, connectionParamsFromURI(config.URI, "iotdb")) + mergeConnectionParamValues(params, connectionParamsFromText(config.ConnectionParams)) + return params +} + +func (i *IoTDBDB) effectiveTimeout() time.Duration { + if i.pingTimeout > 0 { + return i.pingTimeout + } + return defaultIoTDBQueryTimeout +} + +func scanIoTDBDataSet(ds iotdbDataSet) ([]map[string]interface{}, []string, error) { + if ds == nil { + return nil, nil, nil + } + defer ds.Close() + columns := ds.GetColumnNames() + rows := make([]map[string]interface{}, 0) + for { + hasNext, err := ds.Next() + if err != nil { + return nil, nil, err + } + if !hasNext { + break + } + row := make(map[string]interface{}, len(columns)) + for _, column := range columns { + isNull, err := ds.IsNull(column) + if err == nil && isNull { + row[column] = nil + continue + } + value, err := ds.GetObject(column) + if err != nil { + row[column] = nil + continue + } + row[column] = normalizeIoTDBValue(value) + } + rows = append(rows, row) + } + return rows, columns, nil +} + +func normalizeIoTDBValue(value interface{}) interface{} { + switch v := value.(type) { + case time.Time: + return v.Format(time.RFC3339Nano) + case *iotdbclient.Binary: + if v == nil { + return nil + } + return v.GetStringValue() + case fmt.Stringer: + return v.String() + default: + return value + } +} + +func iotdbDevicePattern(dbName string) string { + db := strings.Trim(strings.TrimSpace(dbName), ".") + if db == "" { + return "" + } + if strings.HasSuffix(db, ".**") || strings.HasSuffix(db, ".*") { + return db + } + return db + ".**" +} + +func iotdbTimeseriesPattern(device string) string { + path := strings.Trim(strings.TrimSpace(device), ".") + if path == "" { + return "root.**" + } + if strings.HasSuffix(path, ".**") || strings.HasSuffix(path, ".*") { + return path + } + return path + ".*" +} + +func resolveIoTDBDevicePath(dbName, tableName string) string { + table := strings.Trim(strings.TrimSpace(tableName), ".") + if table == "" { + return strings.Trim(strings.TrimSpace(dbName), ".") + } + if strings.HasPrefix(strings.ToLower(table), "root.") || strings.TrimSpace(dbName) == "" { + return table + } + return strings.Trim(strings.TrimSpace(dbName), ".") + "." + table +} + +func firstRowString(row map[string]interface{}, keys ...string) string { + for _, key := range keys { + for actual, value := range row { + if strings.EqualFold(actual, key) { + text := strings.TrimSpace(fmt.Sprintf("%v", value)) + if text != "" && text != "" { + return text + } + } + } + } + return "" +} + +func firstAnyRowString(row map[string]interface{}) string { + keys := make([]string, 0, len(row)) + for key := range row { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + text := strings.TrimSpace(fmt.Sprintf("%v", row[key])) + if text != "" && text != "" { + return text + } + } + return "" +} + +func buildIoTDBInsertSQL(device string, row map[string]interface{}) (string, error) { + path := strings.TrimSpace(device) + if path == "" { + return "", fmt.Errorf("设备路径不能为空") + } + timestamp, ok := iotdbTimestampValue(row) + if !ok { + return "", fmt.Errorf("IoTDB INSERT 行缺少 Time/time/timestamp 字段") + } + measurements := make([]string, 0, len(row)) + for key := range row { + if strings.TrimSpace(key) == "" || isIoTDBTimestampColumn(key) { + continue + } + measurements = append(measurements, key) + } + if len(measurements) == 0 { + return "", nil + } + sort.Strings(measurements) + + columns := append([]string{"timestamp"}, measurements...) + values := []string{iotdbTimestampLiteral(timestamp)} + for _, measurement := range measurements { + values = append(values, iotdbLiteral(row[measurement])) + } + return fmt.Sprintf("INSERT INTO %s(%s) VALUES(%s)", path, strings.Join(columns, ", "), strings.Join(values, ", ")), nil +} + +func iotdbTimestampValue(row map[string]interface{}) (interface{}, bool) { + for key, value := range row { + if isIoTDBTimestampColumn(key) { + return value, true + } + } + return nil, false +} + +func isIoTDBTimestampColumn(column string) bool { + switch strings.ToLower(strings.TrimSpace(column)) { + case "time", "timestamp", "_time": + return true + default: + return false + } +} + +func iotdbTimestampLiteral(value interface{}) string { + switch v := value.(type) { + case time.Time: + return strconv.FormatInt(v.UnixMilli(), 10) + case string: + text := strings.TrimSpace(v) + if _, err := strconv.ParseInt(text, 10, 64); err == nil { + return text + } + return iotdbLiteral(text) + default: + return fmt.Sprintf("%v", value) + } +} + +func iotdbLiteral(value interface{}) string { + switch v := value.(type) { + case nil: + return "null" + case bool: + if v { + return "true" + } + return "false" + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64: + return fmt.Sprintf("%v", v) + case time.Time: + return strconv.FormatInt(v.UnixMilli(), 10) + default: + text := fmt.Sprintf("%v", v) + return "'" + strings.ReplaceAll(text, "'", "''") + "'" + } +} diff --git a/internal/db/iotdb_impl_test.go b/internal/db/iotdb_impl_test.go new file mode 100644 index 0000000..d648f01 --- /dev/null +++ b/internal/db/iotdb_impl_test.go @@ -0,0 +1,269 @@ +//go:build gonavi_full_drivers || gonavi_iotdb_driver + +package db + +import ( + "context" + "os" + "reflect" + "strconv" + "strings" + "testing" + + "GoNavi-Wails/internal/connection" + + iotdbclient "github.com/apache/iotdb-client-go/client" +) + +type fakeIoTDBSession struct { + queryResults map[string][]map[string]interface{} + execs []string +} + +func (f *fakeIoTDBSession) Close() error { return nil } + +func (f *fakeIoTDBSession) Query(_ context.Context, sql string, _ *int64) (iotdbDataSet, error) { + rows := f.queryResults[sql] + return &fakeIoTDBDataSet{rows: rows, columns: fakeIoTDBColumns(rows)}, nil +} + +func (f *fakeIoTDBSession) Exec(_ context.Context, sql string) error { + f.execs = append(f.execs, sql) + return nil +} + +type fakeIoTDBDataSet struct { + rows []map[string]interface{} + columns []string + index int +} + +func (f *fakeIoTDBDataSet) Next() (bool, error) { + if f.index >= len(f.rows) { + return false, nil + } + f.index++ + return true, nil +} + +func (f *fakeIoTDBDataSet) Close() error { return nil } + +func (f *fakeIoTDBDataSet) IsNull(columnName string) (bool, error) { + value, ok := f.currentRow()[columnName] + return !ok || value == nil, nil +} + +func (f *fakeIoTDBDataSet) GetObject(columnName string) (interface{}, error) { + return f.currentRow()[columnName], nil +} + +func (f *fakeIoTDBDataSet) GetColumnNames() []string { return append([]string(nil), f.columns...) } + +func (f *fakeIoTDBDataSet) currentRow() map[string]interface{} { + if f.index <= 0 || f.index > len(f.rows) { + return map[string]interface{}{} + } + return f.rows[f.index-1] +} + +func fakeIoTDBColumns(rows []map[string]interface{}) []string { + seen := map[string]struct{}{} + columns := []string{} + for _, row := range rows { + for key := range row { + if _, exists := seen[key]; exists { + continue + } + seen[key] = struct{}{} + columns = append(columns, key) + } + } + return columns +} + +func TestIoTDBMetadataMapsStorageGroupsDevicesAndTimeseries(t *testing.T) { + session := &fakeIoTDBSession{queryResults: map[string][]map[string]interface{}{ + "SHOW DATABASES": { + {"Database": "root.zeta"}, + {"Database": "root.sg"}, + }, + "SHOW DEVICES root.sg.**": { + {"Device": "root.sg.d2"}, + {"Device": "root.sg.d1"}, + }, + "SHOW TIMESERIES root.sg.d1.*": { + { + "Timeseries": "root.sg.d1.temperature", + "DataType": "DOUBLE", + "Encoding": "GORILLA", + "Compression": "SNAPPY", + }, + { + "Timeseries": "root.sg.d1.status", + "DataType": "TEXT", + "Encoding": "PLAIN", + "Compression": "SNAPPY", + }, + }, + }} + client := &IoTDBDB{session: session} + + databases, err := client.GetDatabases() + if err != nil { + t.Fatalf("GetDatabases: %v", err) + } + if !reflect.DeepEqual(databases, []string{"root.sg", "root.zeta"}) { + t.Fatalf("unexpected databases: %#v", databases) + } + + tables, err := client.GetTables("root.sg") + if err != nil { + t.Fatalf("GetTables: %v", err) + } + if !reflect.DeepEqual(tables, []string{"root.sg.d1", "root.sg.d2"}) { + t.Fatalf("unexpected tables: %#v", tables) + } + + columns, err := client.GetColumns("root.sg", "root.sg.d1") + if err != nil { + t.Fatalf("GetColumns: %v", err) + } + if len(columns) != 3 { + t.Fatalf("unexpected columns: %#v", columns) + } + if columns[0].Name != "Time" || columns[0].Type != "TIMESTAMP" || columns[0].Key != "PRI" { + t.Fatalf("unexpected time column: %#v", columns[0]) + } + if columns[1].Name != "temperature" || columns[1].Type != "DOUBLE" || !strings.Contains(columns[1].Comment, "encoding=GORILLA") { + t.Fatalf("unexpected measurement column: %#v", columns[1]) + } + + ddl, err := client.GetCreateStatement("root.sg", "root.sg.d1") + if err != nil { + t.Fatalf("GetCreateStatement: %v", err) + } + if !strings.Contains(ddl, "CREATE TIMESERIES root.sg.d1.temperature WITH DATATYPE=DOUBLE, ENCODING=GORILLA, COMPRESSION=SNAPPY;") { + t.Fatalf("unexpected DDL: %s", ddl) + } +} + +func TestIoTDBApplyChangesBuildsInsertAndRejectsMutatingDiffs(t *testing.T) { + session := &fakeIoTDBSession{} + client := &IoTDBDB{session: session} + + err := client.ApplyChanges("root.sg.d1", connection.ChangeSet{ + Inserts: []map[string]interface{}{ + { + "Time": int64(1700000000000), + "temperature": 23.5, + "status": "ok", + "active": true, + }, + }, + }) + if err != nil { + t.Fatalf("ApplyChanges insert: %v", err) + } + expected := "INSERT INTO root.sg.d1(timestamp, active, status, temperature) VALUES(1700000000000, true, 'ok', 23.5)" + if !reflect.DeepEqual(session.execs, []string{expected}) { + t.Fatalf("unexpected execs: %#v", session.execs) + } + + err = client.ApplyChanges("root.sg.d1", connection.ChangeSet{ + Updates: []connection.UpdateRow{{}}, + }) + if err == nil || !strings.Contains(err.Error(), "仅支持 INSERT") { + t.Fatalf("expected update rejection, got %v", err) + } + + err = client.ApplyChanges("root.sg.d1", connection.ChangeSet{ + Deletes: []map[string]interface{}{{"Time": int64(1700000000000)}}, + }) + if err == nil || !strings.Contains(err.Error(), "仅支持 INSERT") { + t.Fatalf("expected delete rejection, got %v", err) + } +} + +func TestIoTDBConfigParsesURIAndConnectionParams(t *testing.T) { + config := normalizeIoTDBConfig(connection.ConnectionConfig{ + URI: "iotdb://alice:secret@iotdb.local:16667/root.sg?fetchSize=2048&timeZone=Asia%2FShanghai", + }) + if config.Host != "iotdb.local" || config.Port != 16667 || config.User != "alice" || config.Password != "secret" || config.Database != "root.sg" { + t.Fatalf("unexpected config: %#v", config) + } + + params := iotdbConnectionParams(connection.ConnectionConfig{ + URI: config.URI, + ConnectionParams: "connectRetryMax=3&rpcCompression=true", + }) + if params.Get("fetchSize") != "2048" || params.Get("timeZone") != "Asia/Shanghai" || params.Get("connectRetryMax") != "3" || params.Get("rpcCompression") != "true" { + t.Fatalf("unexpected params: %#v", params) + } +} + +func TestNormalizeIoTDBValueConvertsBinaryText(t *testing.T) { + if got := normalizeIoTDBValue(iotdbclient.NewBinary([]byte("ok"))); got != "ok" { + t.Fatalf("expected binary text to become string, got %#v", got) + } +} + +func TestIoTDBLiveSmoke(t *testing.T) { + addr := strings.TrimSpace(os.Getenv("GONAVI_IOTDB_TEST_ADDR")) + if addr == "" { + t.Skip("set GONAVI_IOTDB_TEST_ADDR=host:port to run live IoTDB smoke test") + } + host, portText, ok := strings.Cut(addr, ":") + if !ok || strings.TrimSpace(host) == "" || strings.TrimSpace(portText) == "" { + t.Fatalf("invalid GONAVI_IOTDB_TEST_ADDR: %q", addr) + } + port, err := strconv.Atoi(strings.TrimSpace(portText)) + if err != nil { + t.Fatalf("invalid IoTDB port: %v", err) + } + + client := &IoTDBDB{} + if err := client.Connect(connection.ConnectionConfig{ + Type: "iotdb", + Host: strings.TrimSpace(host), + Port: port, + User: "root", + Password: "root", + Timeout: 15, + }); err != nil { + t.Fatalf("connect iotdb: %v", err) + } + defer client.Close() + + _, _ = client.ExecContext(context.Background(), "DELETE DATABASE root.gonavi_smoke") + _, _ = client.ExecContext(context.Background(), "DROP DATABASE root.gonavi_smoke") + + if _, err := client.Exec("CREATE DATABASE root.gonavi_smoke"); err != nil { + t.Fatalf("create database: %v", err) + } + defer func() { + _, _ = client.Exec("DELETE DATABASE root.gonavi_smoke") + _, _ = client.Exec("DROP DATABASE root.gonavi_smoke") + }() + + statements := []string{ + "CREATE TIMESERIES root.gonavi_smoke.d1.temperature WITH DATATYPE=DOUBLE, ENCODING=GORILLA, COMPRESSION=SNAPPY", + "CREATE TIMESERIES root.gonavi_smoke.d1.status WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSION=SNAPPY", + "INSERT INTO root.gonavi_smoke.d1(timestamp, temperature, status) VALUES(1700000000000, 21.5, 'ok')", + } + for _, stmt := range statements { + if _, err := client.Exec(stmt); err != nil { + t.Fatalf("exec %q: %v", stmt, err) + } + } + + rows, columns, err := client.Query("SELECT temperature, status FROM root.gonavi_smoke.d1 LIMIT 10") + if err != nil { + t.Fatalf("query smoke data: %v", err) + } + if len(rows) != 1 { + t.Fatalf("expected one row, got rows=%#v columns=%#v", rows, columns) + } + if got := rows[0]["root.gonavi_smoke.d1.status"]; got != "ok" { + t.Fatalf("unexpected status value: %#v rows=%#v columns=%#v", got, rows, columns) + } +} diff --git a/internal/sync/schema_migration.go b/internal/sync/schema_migration.go index 1e1adc3..9336c56 100644 --- a/internal/sync/schema_migration.go +++ b/internal/sync/schema_migration.go @@ -105,6 +105,8 @@ func buildSchemaMigrationPlanLegacy(config SyncConfig, tableName string, sourceD plan.PlannedAction = "使用已有目标表导入" if targetType == "tdengine" { plan.Warnings = append(plan.Warnings, "TDengine 目标端当前仅支持 INSERT 写入;若存在差异更新/删除,执行期会被拒绝,请优先使用仅插入或全量覆盖模式") + } else if targetType == "iotdb" { + plan.Warnings = append(plan.Warnings, "IoTDB 目标端当前仅支持 INSERT 写入;若存在差异更新/删除,执行期会被拒绝,请优先使用仅插入模式") } sourceCols, sourceExists, err := inspectTableColumns(sourceDB, plan.SourceSchema, plan.SourceTable) diff --git a/internal/sync/schema_migration_test.go b/internal/sync/schema_migration_test.go index 1539fce..e17141a 100644 --- a/internal/sync/schema_migration_test.go +++ b/internal/sync/schema_migration_test.go @@ -920,6 +920,41 @@ func TestBuildSchemaMigrationPlan_TDengineTargetWarnsInsertOnlyBoundary(t *testi } } +func TestBuildSchemaMigrationPlan_IoTDBTargetWarnsInsertOnlyBoundary(t *testing.T) { + t.Parallel() + + sourceDB := &fakeMigrationDB{ + columns: map[string][]connection.ColumnDefinition{ + "shop.metrics": { + {Name: "Time", Type: "timestamp", Nullable: "NO", Key: "PRI"}, + {Name: "value", Type: "double", Nullable: "YES"}, + }, + }, + } + targetDB := &fakeMigrationDB{ + columns: map[string][]connection.ColumnDefinition{ + "root.sg.metrics": { + {Name: "Time", Type: "timestamp", Nullable: "NO", Key: "PRI"}, + {Name: "value", Type: "double", Nullable: "YES"}, + }, + }, + } + cfg := SyncConfig{ + SourceConfig: connection.ConnectionConfig{Type: "mysql", Database: "shop"}, + TargetConfig: connection.ConnectionConfig{Type: "iotdb", Database: "root.sg"}, + Mode: "insert_update", + } + + plan, _, _, err := buildSchemaMigrationPlan(cfg, "metrics", sourceDB, targetDB) + if err != nil { + t.Fatalf("buildSchemaMigrationPlan returned error: %v", err) + } + warnings := strings.Join(plan.Warnings, " ") + if !strings.Contains(warnings, "仅支持 INSERT 写入") { + t.Fatalf("expected IoTDB target warning, got: %v", plan.Warnings) + } +} + func TestBuildMySQLLikeToTDenginePlan_AutoCreateWhenTargetMissing(t *testing.T) { t.Parallel() diff --git a/tools/complete-driver-release-assets.py b/tools/complete-driver-release-assets.py index 07ab366..c31ea2c 100644 --- a/tools/complete-driver-release-assets.py +++ b/tools/complete-driver-release-assets.py @@ -30,6 +30,7 @@ DRIVERS = [ "iris", "mongodb", "tdengine", + "iotdb", "clickhouse", "elasticsearch", ] diff --git a/tools/detect-changed-driver-agents.sh b/tools/detect-changed-driver-agents.sh index fedb152..75484be 100644 --- a/tools/detect-changed-driver-agents.sh +++ b/tools/detect-changed-driver-agents.sh @@ -7,7 +7,7 @@ cd "$SCRIPT_DIR" SCRIPT_DIR_WINDOWS="$(pwd -W 2>/dev/null || true)" SCRIPT_DIR_WINDOWS="${SCRIPT_DIR_WINDOWS//\\//}" -DEFAULT_DRIVERS=(mariadb oceanbase doris starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine clickhouse elasticsearch) +DEFAULT_DRIVERS=(mariadb oceanbase doris starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine iotdb clickhouse elasticsearch) TARGET_PLATFORMS=(darwin/amd64 darwin/arm64 windows/amd64 windows/arm64 linux/amd64) usage() { @@ -53,7 +53,7 @@ normalize_driver() { doris|diros) echo "doris" ;; open_gauss|open-gauss) echo "opengauss" ;; elastic|elasticsearch) echo "elasticsearch" ;; - mariadb|oceanbase|starrocks|sphinx|sqlserver|sqlite|duckdb|dameng|kingbase|highgo|vastbase|opengauss|iris|mongodb|tdengine|clickhouse) + mariadb|oceanbase|starrocks|sphinx|sqlserver|sqlite|duckdb|dameng|kingbase|highgo|vastbase|opengauss|iris|mongodb|tdengine|iotdb|clickhouse) echo "$value" ;; *) @@ -160,6 +160,7 @@ driver_tokens_from_text() { case "$text" in *iris*) emit_driver_token iris ;; esac case "$text" in *mongodb*) emit_driver_token mongodb ;; esac case "$text" in *tdengine*) emit_driver_token tdengine ;; esac + case "$text" in *iotdb*|*apache-iotdb*|*apache_iotdb*) emit_driver_token iotdb ;; esac case "$text" in *clickhouse*) emit_driver_token clickhouse ;; esac case "$text" in *elasticsearch*) emit_driver_token elasticsearch ;; esac @@ -187,6 +188,7 @@ driver_tokens_from_text() { case "$text" in *github.com/caretdev/go-irisnative*|*third_party/go-irisnative*) emit_driver_token iris ;; esac case "$text" in *go.mongodb.org/mongo-driver*|*go.mongodb.org/mongo-driver/v2*) emit_driver_token mongodb ;; esac case "$text" in *github.com/taosdata/driver-go/v3*) emit_driver_token tdengine ;; esac + case "$text" in *github.com/apache/iotdb-client-go*) emit_driver_token iotdb ;; esac case "$text" in *github.com/clickhouse/clickhouse-go/v2*|*github.com/clickhouse/ch-go*) emit_driver_token clickhouse ;; esac case "$text" in *github.com/elastic/go-elasticsearch/v8*) emit_driver_token elasticsearch ;; esac } diff --git a/tools/diff-driver-agent-revisions.sh b/tools/diff-driver-agent-revisions.sh index c88959d..6870b0e 100644 --- a/tools/diff-driver-agent-revisions.sh +++ b/tools/diff-driver-agent-revisions.sh @@ -5,7 +5,7 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" cd "$SCRIPT_DIR" -DEFAULT_DRIVERS=(mariadb oceanbase diros starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine clickhouse elasticsearch) +DEFAULT_DRIVERS=(mariadb oceanbase diros starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine iotdb clickhouse elasticsearch) usage() { cat <<'EOF' diff --git a/tools/generate-driver-agent-revisions.sh b/tools/generate-driver-agent-revisions.sh index 10b0ca5..5242404 100755 --- a/tools/generate-driver-agent-revisions.sh +++ b/tools/generate-driver-agent-revisions.sh @@ -7,7 +7,7 @@ cd "$SCRIPT_DIR" SCRIPT_DIR_WINDOWS="$(pwd -W 2>/dev/null || true)" SCRIPT_DIR_WINDOWS="${SCRIPT_DIR_WINDOWS//\\//}" -DEFAULT_DRIVERS=(mariadb oceanbase diros starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine clickhouse elasticsearch) +DEFAULT_DRIVERS=(mariadb oceanbase diros starrocks sphinx sqlserver sqlite duckdb dameng kingbase highgo vastbase opengauss iris mongodb tdengine iotdb clickhouse elasticsearch) OUTPUT_FILE="internal/db/driver_agent_revisions_gen.go" usage() { @@ -30,7 +30,7 @@ normalize_driver() { oceanbase) echo "oceanbase" ;; opengauss|open_gauss|open-gauss) echo "opengauss" ;; elasticsearch|elastic) echo "elasticsearch" ;; - mariadb|diros|starrocks|sphinx|sqlserver|sqlite|duckdb|dameng|kingbase|highgo|vastbase|iris|mongodb|tdengine|clickhouse) + mariadb|diros|starrocks|sphinx|sqlserver|sqlite|duckdb|dameng|kingbase|highgo|vastbase|iris|mongodb|tdengine|iotdb|clickhouse) echo "$value" ;; *) @@ -134,6 +134,7 @@ iris:internal/db/iris_impl.go|\ mongodb:internal/db/mongodb_impl.go|\ mongodb:internal/db/mongodb_impl_v1.go|\ tdengine:internal/db/tdengine_impl.go|\ +iotdb:internal/db/iotdb_impl.go|\ clickhouse:internal/db/clickhouse_impl.go|\ elasticsearch:internal/db/elasticsearch_impl.go|\ elasticsearch:internal/db/elasticsearch_helpers.go) diff --git a/tools/verify-driver-agent-revisions.sh b/tools/verify-driver-agent-revisions.sh index b723661..01a4faf 100755 --- a/tools/verify-driver-agent-revisions.sh +++ b/tools/verify-driver-agent-revisions.sh @@ -70,7 +70,7 @@ normalize_driver() { doris|diros) echo "diros" ;; opengauss|open_gauss|open-gauss) echo "opengauss" ;; elasticsearch|elastic) echo "elasticsearch" ;; - mariadb|oceanbase|starrocks|sphinx|sqlserver|sqlite|duckdb|dameng|kingbase|highgo|vastbase|iris|mongodb|tdengine|clickhouse) + mariadb|oceanbase|starrocks|sphinx|sqlserver|sqlite|duckdb|dameng|kingbase|highgo|vastbase|iris|mongodb|tdengine|iotdb|clickhouse) echo "$value" ;; *)