diff --git a/frontend/src/components/ConnectionModal.edit-password.test.tsx b/frontend/src/components/ConnectionModal.edit-password.test.tsx
index 429b214..1cd1f0a 100644
--- a/frontend/src/components/ConnectionModal.edit-password.test.tsx
+++ b/frontend/src/components/ConnectionModal.edit-password.test.tsx
@@ -33,10 +33,10 @@ describe('ConnectionModal data source registry', () => {
expect(source).toContain('type === "elasticsearch"');
expect(source).toContain("return '支持索引浏览、Mapping 检查、JSON DSL 和 query_string 查询';");
expect(source).toContain(
- 'type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka" || type === "rabbitmq") ? "" : "root";',
+ 'type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "mqtt" || type === "kafka" || type === "rabbitmq") ? "" : "root";',
);
expect(source).toContain(
- 'placeholder={(dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka" || dbType === "rabbitmq") ? "未开启认证可留空" : undefined}',
+ 'placeholder={(dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "mqtt" || dbType === "kafka" || dbType === "rabbitmq") ? "未开启认证可留空" : undefined}',
);
expect(source).toContain('label="显示数据库 (留空显示全部)"');
});
@@ -77,6 +77,19 @@ describe('ConnectionModal data source registry', () => {
expect(source).toContain('return "fetchSize=1024&timeZone=Asia%2FShanghai";');
});
+ it('exposes MQTT in the create-connection picker with broker and topic-filter defaults', () => {
+ expect(source).toContain("case 'mqtt':");
+ expect(source).toContain('return 1883;');
+ expect(source).toContain('mqtt: ["mqtt", "mqtts", "tcp", "ssl", "tls"]');
+ expect(source).toContain("key: 'mqtt'");
+ expect(source).toContain("name: 'MQTT'");
+ expect(source).toContain('dbType === "mqtt"');
+ expect(source).toContain("return 'Broker / Topic Filter / QoS';");
+ expect(source).toContain('return "mqtt://user:pass@127.0.0.1:1883/devices%2F%2B%2Ftelemetry?topology=cluster&clientId=gonavi-desktop&qos=1";');
+ expect(source).toContain('return "topics=devices%2F%2B%2Ftelemetry,%24SYS%2F%23&clientId=gonavi-desktop&qos=1&cleanSession=true&fetchWaitMs=4000";');
+ expect(source).toContain('label="默认 Topic / Filter(可选)"');
+ });
+
it('exposes Kafka in the create-connection picker with broker and topic defaults', () => {
expect(source).toContain("case 'kafka':");
expect(source).toContain('return 9092;');
diff --git a/frontend/src/components/ConnectionModal.tsx b/frontend/src/components/ConnectionModal.tsx
index 538a2d9..e106664 100644
--- a/frontend/src/components/ConnectionModal.tsx
+++ b/frontend/src/components/ConnectionModal.tsx
@@ -385,6 +385,7 @@ const ConnectionModal: React.FC<{
);
const disableLocalBackdropFilter = isMacLikePlatform();
const mysqlTopology = Form.useWatch("mysqlTopology", form) || "single";
+ const mqttTopology = Form.useWatch("mqttTopology", form) || "single";
const kafkaTopology = Form.useWatch("kafkaTopology", form) || "single";
const mongoTopology = Form.useWatch("mongoTopology", form) || "single";
const mongoSrv = Form.useWatch("mongoSrv", form) || false;
@@ -419,6 +420,7 @@ const ConnectionModal: React.FC<{
);
const isOceanBaseOracle = dbType === "oceanbase" && oceanBaseProtocol === "oracle";
const isMySQLLike = isMySQLCompatibleType(dbType) && !isOceanBaseOracle;
+ const isMQTT = dbType === "mqtt";
const isKafka = dbType === "kafka";
const isRabbitMQ = dbType === "rabbitmq";
const supportsConnectionParams = supportsConnectionParamsForType(dbType);
@@ -1691,6 +1693,69 @@ const ConnectionModal: React.FC<{
};
}
+ if (type === "mqtt") {
+ const defaultPort = getDefaultPortByType(type);
+ const parsed =
+ parseMultiHostUri(trimmedUri, "mqtt") ||
+ parseMultiHostUri(trimmedUri, "mqtts") ||
+ parseMultiHostUri(trimmedUri, "tcp") ||
+ parseMultiHostUri(trimmedUri, "ssl") ||
+ parseMultiHostUri(trimmedUri, "tls");
+ if (!parsed) {
+ return null;
+ }
+ if (!parsed.hosts.length || parsed.hosts.length > MAX_URI_HOSTS) {
+ return null;
+ }
+ if (parsed.hosts.some((entry) => !isValidUriHostEntry(entry))) {
+ return null;
+ }
+ const hostList = normalizeAddressList(parsed.hosts, defaultPort);
+ if (!hostList.length) {
+ return null;
+ }
+ const primary = parseHostPort(
+ hostList[0] || `localhost:${defaultPort}`,
+ defaultPort,
+ );
+ const lowerUri = trimmedUri.toLowerCase();
+ const tlsEnabled =
+ lowerUri.startsWith("mqtts://") ||
+ lowerUri.startsWith("ssl://") ||
+ lowerUri.startsWith("tls://") ||
+ normalizeUriBool(
+ parsed.params.get("tls") ||
+ parsed.params.get("ssl") ||
+ parsed.params.get("useSSL") ||
+ parsed.params.get("use_ssl"),
+ );
+ const skipVerify = normalizeUriBool(
+ parsed.params.get("skip_verify") || parsed.params.get("skipVerify"),
+ );
+ const topology = String(parsed.params.get("topology") || "")
+ .trim()
+ .toLowerCase();
+ const timeoutValue = Number(parsed.params.get("timeout"));
+ return {
+ host: primary?.host || "localhost",
+ port: primary?.port || defaultPort,
+ user: parsed.username,
+ password: parsed.password,
+ database: parsed.database || "",
+ useSSL: tlsEnabled,
+ sslMode: tlsEnabled ? (skipVerify ? "skip-verify" : "required") : "disable",
+ ...extractSSLPathValuesFromParams(parsed.params, type),
+ mqttTopology:
+ topology === "cluster" || hostList.length > 1 ? "cluster" : "single",
+ mqttHosts: hostList.slice(1),
+ connectionParams: serializeConnectionParams(parsed.params),
+ timeout:
+ Number.isFinite(timeoutValue) && timeoutValue > 0
+ ? Math.min(MAX_TIMEOUT_SECONDS, Math.trunc(timeoutValue))
+ : undefined,
+ };
+ }
+
if (type === "rabbitmq") {
const defaultPort = getDefaultPortByType(type);
const parsed = parseSingleHostUri(
@@ -1979,6 +2044,9 @@ const ConnectionModal: React.FC<{
if (dbType === "iotdb") {
return "iotdb://root:root@127.0.0.1:6667/root.sg";
}
+ if (dbType === "mqtt") {
+ return "mqtt://user:pass@127.0.0.1:1883/devices%2F%2B%2Ftelemetry?topology=cluster&clientId=gonavi-desktop&qos=1";
+ }
if (dbType === "kafka") {
return "kafka://user:pass@127.0.0.1:9092,127.0.0.2:9092/orders.events?topology=cluster&groupId=analytics&mechanism=scram-sha-256";
}
@@ -2040,6 +2108,8 @@ const ConnectionModal: React.FC<{
return "timezone=Asia%2FShanghai";
case "iotdb":
return "fetchSize=1024&timeZone=Asia%2FShanghai";
+ case "mqtt":
+ return "topics=devices%2F%2B%2Ftelemetry,%24SYS%2F%23&clientId=gonavi-desktop&qos=1&cleanSession=true&fetchWaitMs=4000";
case "kafka":
return "groupId=gonavi&mechanism=scram-sha-256&clientId=gonavi-desktop&startOffset=latest";
case "rabbitmq":
@@ -2136,6 +2206,36 @@ const ConnectionModal: React.FC<{
return `kafka://${encodedAuth}${allBrokers.join(",")}${topicPath}${query ? `?${query}` : ""}`;
}
+ if (type === "mqtt") {
+ const primary = toAddress(host, port, defaultPort);
+ const brokers =
+ values.mqttTopology === "cluster"
+ ? normalizeAddressList(values.mqttHosts, defaultPort)
+ : [];
+ const allBrokers = normalizeAddressList([primary, ...brokers], defaultPort);
+ const params = new URLSearchParams();
+ if (allBrokers.length > 1 || values.mqttTopology === "cluster") {
+ params.set("topology", "cluster");
+ }
+ if (values.useSSL) {
+ const mode = String(values.sslMode || "preferred")
+ .trim()
+ .toLowerCase();
+ params.set("tls", "true");
+ if (mode === "skip-verify" || mode === "preferred") {
+ params.set("skip_verify", "true");
+ }
+ appendSSLPathParamsForUri(params, type, values);
+ }
+ if (Number.isFinite(timeout) && timeout > 0) {
+ params.set("timeout", String(timeout));
+ }
+ mergeConnectionParams(params, values.connectionParams);
+ const topicPath = database ? `/${encodeURIComponent(database)}` : "";
+ const query = params.toString();
+ return `mqtt://${encodedAuth}${allBrokers.join(",")}${topicPath}${query ? `?${query}` : ""}`;
+ }
+
if (type === "rabbitmq") {
const address = toAddress(host, port, defaultPort);
const params = new URLSearchParams();
@@ -2529,6 +2629,8 @@ const ConnectionModal: React.FC<{
configType === "sphinx"
? normalizedHosts.slice(1)
: [];
+ const mqttHosts =
+ configType === "mqtt" ? normalizedHosts.slice(1) : [];
const kafkaHosts =
configType === "kafka" ? normalizedHosts.slice(1) : [];
const mongoHosts =
@@ -2538,6 +2640,9 @@ const ConnectionModal: React.FC<{
const mysqlIsReplica =
String(config.topology || "").toLowerCase() === "replica" ||
mysqlReplicaHosts.length > 0;
+ const mqttIsCluster =
+ String(config.topology || "").toLowerCase() === "cluster" ||
+ mqttHosts.length > 0;
const kafkaIsCluster =
String(config.topology || "").toLowerCase() === "cluster" ||
kafkaHosts.length > 0;
@@ -2613,6 +2718,8 @@ const ConnectionModal: React.FC<{
timeout: resolvedJvmTimeout,
mysqlTopology: mysqlIsReplica ? "replica" : "single",
mysqlReplicaHosts: mysqlReplicaHosts,
+ mqttTopology: mqttIsCluster ? "cluster" : "single",
+ mqttHosts: mqttHosts,
kafkaTopology: kafkaIsCluster ? "cluster" : "single",
kafkaHosts: kafkaHosts,
mysqlReplicaUser: config.mysqlReplicaUser || "",
@@ -3590,6 +3697,23 @@ const ConnectionModal: React.FC<{
}
}
+ if (type === "mqtt") {
+ const brokers =
+ mergedValues.mqttTopology === "cluster"
+ ? normalizeAddressList(mergedValues.mqttHosts, defaultPort)
+ : [];
+ const allHosts = normalizeAddressList(
+ [`${primaryHost}:${primaryPort}`, ...brokers],
+ defaultPort,
+ );
+ if (mergedValues.mqttTopology === "cluster" || allHosts.length > 1) {
+ hosts = allHosts;
+ topology = "cluster";
+ } else {
+ topology = "single";
+ }
+ }
+
if (type === "mongodb") {
mongoSrvEnabled = !!mergedValues.mongoSrv;
const extraHosts =
@@ -3826,6 +3950,7 @@ const ConnectionModal: React.FC<{
includeDatabases: undefined,
includeRedisDatabases: undefined,
mysqlTopology: "single",
+ mqttTopology: "single",
kafkaTopology: "single",
redisTopology: "single",
mongoTopology: "single",
@@ -3836,6 +3961,7 @@ const ConnectionModal: React.FC<{
mongoAuthMechanism: "",
savePassword: true,
mysqlReplicaHosts: [],
+ mqttHosts: [],
kafkaHosts: [],
redisHosts: [],
redisSentinelMaster: "",
@@ -3915,7 +4041,7 @@ const ConnectionModal: React.FC<{
});
} else if (type !== "custom") {
const defaultUser =
- type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka" || type === "rabbitmq") ? "" : "root";
+ type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "mqtt" || type === "kafka" || type === "rabbitmq") ? "" : "root";
const sslCapableType = supportsSSLForType(type);
setUseSSL(false);
setUseHttpTunnel(false);
@@ -3934,6 +4060,7 @@ const ConnectionModal: React.FC<{
httpTunnelUser: "",
httpTunnelPassword: "",
mysqlTopology: "single",
+ mqttTopology: "single",
kafkaTopology: "single",
redisTopology: "single",
mongoTopology: "single",
@@ -3944,6 +4071,7 @@ const ConnectionModal: React.FC<{
mongoAuthMechanism: "",
savePassword: true,
mysqlReplicaHosts: [],
+ mqttHosts: [],
kafkaHosts: [],
redisHosts: [],
redisSentinelMaster: "",
@@ -5061,6 +5189,22 @@ const ConnectionModal: React.FC<{
),
})}
+ {dbType === "mqtt" &&
+ renderConfigSectionCard({
+ sectionKey: "service",
+ icon: ,
+ children: (
+
+
+
+ ),
+ })}
+
{dbType === "rabbitmq" &&
renderConfigSectionCard({
sectionKey: "service",
@@ -5151,6 +5295,28 @@ const ConnectionModal: React.FC<{
}),
})}
+ {isMQTT &&
+ renderConfigSectionCard({
+ sectionKey: "connectionMode",
+ icon: ,
+ children: renderChoiceCards({
+ fieldName: "mqttTopology",
+ value: String(mqttTopology),
+ options: [
+ {
+ value: "single",
+ label: "单 Broker",
+ description: "只配置一个 broker,适合本地或简单环境。",
+ },
+ {
+ value: "cluster",
+ label: "集群模式",
+ description: "配置多个 broker,提高连接发现与故障切换成功率。",
+ },
+ ],
+ }),
+ })}
+
{isKafka &&
kafkaTopology === "cluster" &&
renderConfigSectionCard({
@@ -5171,6 +5337,26 @@ const ConnectionModal: React.FC<{
),
})}
+ {isMQTT &&
+ mqttTopology === "cluster" &&
+ renderConfigSectionCard({
+ sectionKey: "replica",
+ icon: ,
+ children: (
+
+
+
+ ),
+ })}
+
{isMySQLLike &&
mysqlTopology === "replica" &&
renderConfigSectionCard({
@@ -5294,7 +5480,7 @@ const ConnectionModal: React.FC<{
}
style={{ marginBottom: 0 }}
>
-
+
{
expect(markup).toContain('>Io');
});
+ it('includes MQTT in the selectable database icons', () => {
+ expect(DB_ICON_TYPES).toContain('mqtt');
+ expect(getDbIconLabel('mqtt')).toBe('MQTT');
+ const markup = renderToStaticMarkup(<>{getDbIcon('mqtt', undefined, 22)}>);
+ expect(markup).toContain('>Mq');
+ });
+
it('includes Kafka in the selectable database icons', () => {
expect(DB_ICON_TYPES).toContain('kafka');
expect(getDbIconLabel('kafka')).toBe('Kafka');
diff --git a/frontend/src/components/DatabaseIcons.tsx b/frontend/src/components/DatabaseIcons.tsx
index 2cb472c..bf55eb3 100644
--- a/frontend/src/components/DatabaseIcons.tsx
+++ b/frontend/src/components/DatabaseIcons.tsx
@@ -52,6 +52,7 @@ const DB_DEFAULT_COLORS: Record = {
iris: '#1F6FEB',
tdengine: '#2962FF',
iotdb: '#0F766E',
+ mqtt: '#0EA5A4',
kafka: '#F97316',
rabbitmq: '#FF6B35',
chroma: '#7C3AED',
@@ -194,6 +195,9 @@ const TDengineIcon: React.FC = ({ size = 16, color }) => (
const IoTDBIcon: React.FC = ({ size = 16, color }) => (
);
+const MQTTIcon: React.FC = ({ size = 16, color }) => (
+
+);
const KafkaIcon: React.FC = ({ size = 16, color }) => (
);
@@ -262,6 +266,7 @@ const DB_ICON_MAP: Record> = {
iris: IrisIcon,
tdengine: TDengineIcon,
iotdb: IoTDBIcon,
+ mqtt: MQTTIcon,
kafka: KafkaIcon,
rabbitmq: RabbitMQIcon,
chroma: ChromaIcon,
@@ -274,7 +279,7 @@ const DB_ICON_MAP: Record> = {
export const DB_ICON_TYPES: string[] = [
'mysql', 'mariadb', 'oceanbase', 'postgres', 'redis', 'mongodb', 'jvm',
'oracle', 'sqlserver', 'sqlite', 'duckdb', 'clickhouse', 'starrocks',
- 'kingbase', 'dameng', 'vastbase', 'opengauss', 'gaussdb', 'goldendb', 'highgo', 'iris', 'tdengine', 'iotdb', 'kafka', 'rabbitmq', 'chroma', 'qdrant', 'elasticsearch', 'custom',
+ 'kingbase', 'dameng', 'vastbase', 'opengauss', 'gaussdb', 'goldendb', 'highgo', 'iris', 'tdengine', 'iotdb', 'mqtt', 'kafka', 'rabbitmq', 'chroma', 'qdrant', 'elasticsearch', 'custom',
];
/** 该类型是否有品牌 SVG 文件 */
@@ -296,7 +301,7 @@ export const getDbIconLabel = (type: string): string => {
sqlserver: 'SQL Server', clickhouse: 'ClickHouse', sqlite: 'SQLite',
starrocks: 'StarRocks',
duckdb: 'DuckDB', kingbase: '金仓', dameng: '达梦',
- vastbase: 'VastBase', opengauss: 'OpenGauss', gaussdb: 'GaussDB', goldendb: 'GoldenDB', highgo: '瀚高', iris: 'InterSystems IRIS', tdengine: 'TDengine', iotdb: 'Apache IoTDB', kafka: 'Kafka', rabbitmq: 'RabbitMQ',
+ vastbase: 'VastBase', opengauss: 'OpenGauss', gaussdb: 'GaussDB', goldendb: 'GoldenDB', highgo: '瀚高', iris: 'InterSystems IRIS', tdengine: 'TDengine', iotdb: 'Apache IoTDB', mqtt: 'MQTT', kafka: 'Kafka', rabbitmq: 'RabbitMQ',
chroma: 'Chroma',
qdrant: 'Qdrant',
elasticsearch: 'Elasticsearch',
diff --git a/frontend/src/components/MessagePublishModal.tsx b/frontend/src/components/MessagePublishModal.tsx
index a5e2504..e9b379f 100644
--- a/frontend/src/components/MessagePublishModal.tsx
+++ b/frontend/src/components/MessagePublishModal.tsx
@@ -1,5 +1,5 @@
import React, { useEffect, useMemo, useState } from 'react';
-import { Alert, Form, Input, Modal, Select, Space, Typography, message } from 'antd';
+import { Alert, Checkbox, Form, Input, Modal, Select, Space, Typography, message } from 'antd';
import { DBQuery } from '../../wailsjs/go/app/App';
import type { SavedConnection } from '../types';
@@ -149,6 +149,28 @@ const MessagePublishModal: React.FC = ({
)}
+ {presentation.showQos && (
+
+
+
+ )}
+
+ {presentation.showRetain && (
+
+ Retain 消息
+
+ )}
+
{presentation.showKey && (
diff --git a/frontend/src/utils/connectionDriverType.ts b/frontend/src/utils/connectionDriverType.ts
index 744ee9e..0b50842 100644
--- a/frontend/src/utils/connectionDriverType.ts
+++ b/frontend/src/utils/connectionDriverType.ts
@@ -18,6 +18,7 @@ export const normalizeDriverType = (value: string): string => {
if (normalized === 'chromadb' || normalized === 'chroma-db') return 'chroma';
if (normalized === 'qdrantdb' || normalized === 'qdrant-db') return 'qdrant';
if (normalized === 'apache-iotdb' || normalized === 'apache_iotdb') return 'iotdb';
+ if (normalized === 'mqtts') return 'mqtt';
if (normalized === 'apache-kafka' || normalized === 'apache_kafka') return 'kafka';
if (normalized === 'rabbit-mq' || normalized === 'rabbit_mq') return 'rabbitmq';
if (normalized === 'doris') return 'diros';
diff --git a/frontend/src/utils/connectionModalPresentation.ts b/frontend/src/utils/connectionModalPresentation.ts
index 5d61170..3bfc3be 100644
--- a/frontend/src/utils/connectionModalPresentation.ts
+++ b/frontend/src/utils/connectionModalPresentation.ts
@@ -283,6 +283,20 @@ export const resolveConnectionConfigLayout = (
],
};
}
+ if (type === 'mqtt') {
+ return {
+ kind: 'generic-sql',
+ sections: [
+ 'identity',
+ 'uri',
+ 'target',
+ 'connectionMode',
+ 'replica',
+ 'service',
+ 'credentials',
+ ],
+ };
+ }
if (type === 'kafka') {
return {
kind: 'generic-sql',
diff --git a/frontend/src/utils/connectionTypeCapabilities.ts b/frontend/src/utils/connectionTypeCapabilities.ts
index 21fb00b..c38f28d 100644
--- a/frontend/src/utils/connectionTypeCapabilities.ts
+++ b/frontend/src/utils/connectionTypeCapabilities.ts
@@ -16,6 +16,7 @@ export const singleHostUriSchemesByType: Record = {
elasticsearch: ["http", "https"],
chroma: ["http", "https", "chroma"],
qdrant: ["http", "https", "qdrant"],
+ mqtt: ["mqtt", "mqtts", "tcp", "ssl", "tls"],
rabbitmq: ["rabbitmq", "http", "https"],
};
@@ -29,6 +30,8 @@ const normalizeConnectionType = (type: string) =>
case "greatdb":
case "gdb":
return "goldendb";
+ case "mqtts":
+ return "mqtt";
default:
return normalized;
}
@@ -59,6 +62,7 @@ const sslSupportedTypes = new Set([
"elasticsearch",
"chroma",
"qdrant",
+ "mqtt",
"kafka",
"rabbitmq",
]);
@@ -87,6 +91,7 @@ const sslCAPathSupportedTypes = new Set([
"elasticsearch",
"chroma",
"qdrant",
+ "mqtt",
"kafka",
"rabbitmq",
]);
@@ -109,6 +114,7 @@ const sslClientCertificateSupportedTypes = new Set([
"gaussdb",
"mongodb",
"redis",
+ "mqtt",
"kafka",
"rabbitmq",
]);
@@ -161,5 +167,6 @@ export const supportsConnectionParamsForType = (type: string) =>
type === "elasticsearch" ||
type === "chroma" ||
type === "qdrant" ||
+ type === "mqtt" ||
type === "kafka" ||
type === "rabbitmq";
diff --git a/frontend/src/utils/connectionTypeCatalog.ts b/frontend/src/utils/connectionTypeCatalog.ts
index ebddeca..8cbd997 100644
--- a/frontend/src/utils/connectionTypeCatalog.ts
+++ b/frontend/src/utils/connectionTypeCatalog.ts
@@ -64,6 +64,7 @@ export const CONNECTION_TYPE_GROUPS: ConnectionTypeCatalogGroup[] = [
{
label: '消息队列',
items: [
+ { key: 'mqtt', name: 'MQTT' },
{ key: 'kafka', name: 'Kafka' },
{ key: 'rabbitmq', name: 'RabbitMQ' },
],
@@ -123,6 +124,8 @@ export const getConnectionTypeDefaultPort = (type: string): number => {
return 8000;
case 'qdrant':
return 6333;
+ case 'mqtt':
+ return 1883;
case 'kafka':
return 9092;
case 'rabbitmq':
@@ -159,6 +162,8 @@ export const getConnectionTypeHint = (type: string): string => {
return 'Collection 浏览、向量搜索和 Payload 过滤';
case 'iotdb':
return 'Storage Group / Device / Timeseries';
+ case 'mqtt':
+ return 'Broker / Topic Filter / QoS';
case 'kafka':
return 'Broker / Topic / Consumer Group';
case 'rabbitmq':
diff --git a/frontend/src/utils/dataSourceCapabilities.test.ts b/frontend/src/utils/dataSourceCapabilities.test.ts
index 5395984..da63fa3 100644
--- a/frontend/src/utils/dataSourceCapabilities.test.ts
+++ b/frontend/src/utils/dataSourceCapabilities.test.ts
@@ -164,6 +164,27 @@ describe('dataSourceCapabilities', () => {
});
});
+ it('treats MQTT as a queryable messaging datasource with manual total count and publish support', () => {
+ expect(getDataSourceCapabilities({ type: 'mqtt' })).toMatchObject({
+ type: 'mqtt',
+ supportsQueryEditor: true,
+ supportsSqlQueryExport: false,
+ supportsCopyInsert: false,
+ supportsCreateDatabase: false,
+ supportsRenameDatabase: false,
+ supportsDropDatabase: false,
+ supportsMessagePublish: true,
+ forceReadOnlyQueryResult: true,
+ preferManualTotalCount: true,
+ });
+ expect(getDataSourceCapabilities({ type: 'custom', driver: 'mqtts' })).toMatchObject({
+ type: 'mqtt',
+ supportsQueryEditor: true,
+ supportsMessagePublish: true,
+ preferManualTotalCount: true,
+ });
+ });
+
it('treats Kafka as a queryable read-only messaging datasource', () => {
expect(getDataSourceCapabilities({ type: 'kafka' })).toMatchObject({
type: 'kafka',
diff --git a/frontend/src/utils/dataSourceCapabilities.ts b/frontend/src/utils/dataSourceCapabilities.ts
index 3bba2d4..3234997 100644
--- a/frontend/src/utils/dataSourceCapabilities.ts
+++ b/frontend/src/utils/dataSourceCapabilities.ts
@@ -35,6 +35,9 @@ const normalizeDataSourceToken = (raw: string): string => {
case 'qdrantdb':
case 'qdrant-db':
return 'qdrant';
+ case 'mqtt':
+ case 'mqtts':
+ return 'mqtt';
case 'apache-iotdb':
case 'apache_iotdb':
return 'iotdb';
@@ -121,9 +124,9 @@ const COPY_INSERT_TYPES = new Set([
]);
const QUERY_EDITOR_DISABLED_TYPES = new Set(['redis']);
-const FORCE_READ_ONLY_QUERY_TYPES = new Set(['tdengine', 'iotdb', 'clickhouse', 'kafka', 'rabbitmq']);
-const MESSAGE_PUBLISH_TYPES = new Set(['kafka', 'rabbitmq']);
-const MANUAL_TOTAL_COUNT_TYPES = new Set(['duckdb', 'oracle']);
+const FORCE_READ_ONLY_QUERY_TYPES = new Set(['tdengine', 'iotdb', 'clickhouse', 'mqtt', 'kafka', 'rabbitmq']);
+const MESSAGE_PUBLISH_TYPES = new Set(['mqtt', 'kafka', 'rabbitmq']);
+const MANUAL_TOTAL_COUNT_TYPES = new Set(['duckdb', 'oracle', 'mqtt']);
const APPROXIMATE_TABLE_COUNT_TYPES = new Set(['duckdb', 'oracle']);
const APPROXIMATE_TOTAL_PAGE_TYPES = new Set(['duckdb']);
diff --git a/frontend/src/utils/messagePublish.test.ts b/frontend/src/utils/messagePublish.test.ts
index c56a971..94b32db 100644
--- a/frontend/src/utils/messagePublish.test.ts
+++ b/frontend/src/utils/messagePublish.test.ts
@@ -64,6 +64,37 @@ describe('messagePublish', () => {
});
});
+ it('builds an MQTT publish JSON command with qos and retain flags', () => {
+ const result = buildMessagePublishCommand(
+ { type: 'mqtt' },
+ {
+ destination: 'devices/device-001/telemetry',
+ qos: 1,
+ retain: true,
+ bodyMode: 'json',
+ body: '{"id":1,"event":"created"}',
+ },
+ );
+
+ expect(result.transportLabel).toBe('MQTT Topic');
+ expect(result.destinationLabel).toBe('devices/device-001/telemetry');
+ expect(result.commandText).toContain('"publish": "devices/device-001/telemetry"');
+ expect(result.commandText).toContain('"qos": 1');
+ expect(result.commandText).toContain('"retain": true');
+ });
+
+ it('seeds MQTT default publish draft with connection qos and retain defaults', () => {
+ expect(createDefaultMessagePublishDraft(
+ { type: 'mqtt', database: 'devices/+/telemetry', connectionParams: 'qos=1&retain=true' },
+ '',
+ )).toMatchObject({
+ destination: 'devices/+/telemetry',
+ qos: 1,
+ retain: true,
+ bodyMode: 'json',
+ });
+ });
+
it('builds a RabbitMQ publish JSON command with routing and properties', () => {
const result = buildMessagePublishCommand(
{ type: 'rabbitmq', connectionParams: 'defaultQueue=orders.queue&exchange=events.topic' },
diff --git a/frontend/src/utils/messagePublish.ts b/frontend/src/utils/messagePublish.ts
index 4ff7a34..b789a8a 100644
--- a/frontend/src/utils/messagePublish.ts
+++ b/frontend/src/utils/messagePublish.ts
@@ -15,6 +15,8 @@ export type MessagePublishDraft = {
destination: string;
exchange?: string;
routingKey?: string;
+ qos?: number;
+ retain?: boolean;
keyMode?: MessagePublishValueMode;
key?: string;
bodyMode?: MessagePublishValueMode;
@@ -40,6 +42,8 @@ export type MessagePublishPresentation = {
showExchange: boolean;
showRoutingKey: boolean;
showProperties: boolean;
+ showQos: boolean;
+ showRetain: boolean;
};
const normalizeMode = (value: unknown, fallback: MessagePublishValueMode): MessagePublishValueMode => {
@@ -138,6 +142,9 @@ const resolveDefaultDestination = (config: ConnectionLike, explicitDestination:
if (resolvedType === 'kafka') {
return String(config?.database || '').trim();
}
+ if (resolvedType === 'mqtt') {
+ return String(config?.database || params.get('defaultTopic') || params.get('topic') || '').trim();
+ }
if (resolvedType === 'rabbitmq') {
return String(params.get('defaultQueue') || params.get('queue') || '').trim();
}
@@ -161,6 +168,25 @@ export const getMessagePublishPresentation = (
showExchange: true,
showRoutingKey: true,
showProperties: true,
+ showQos: false,
+ showRetain: false,
+ };
+ }
+
+ if (resolvedType === 'mqtt') {
+ return {
+ transportLabel: 'MQTT Topic',
+ destinationLabel: 'Topic',
+ destinationPlaceholder: '例如:devices/device-001/telemetry',
+ destinationRequiredMessage: '请输入 Topic',
+ alertMessage: '当前表单会自动拼装 MQTT publish JSON 命令,并直接通过 broker 执行测试发送。',
+ successHint: 'QoS 与 retain 可单独指定;未填写时沿用当前连接中的默认参数。',
+ showKey: false,
+ showExchange: false,
+ showRoutingKey: false,
+ showProperties: false,
+ showQos: true,
+ showRetain: true,
};
}
@@ -175,6 +201,8 @@ export const getMessagePublishPresentation = (
showExchange: false,
showRoutingKey: false,
showProperties: false,
+ showQos: false,
+ showRetain: false,
};
};
@@ -198,6 +226,18 @@ export const createDefaultMessagePublishDraft = (
};
}
+ if (resolvedType === 'mqtt') {
+ const qosValue = Number(params.get('qos'));
+ return {
+ destination: resolvedDestination,
+ qos: Number.isFinite(qosValue) ? Math.min(2, Math.max(0, Math.trunc(qosValue))) : 0,
+ retain: ['1', 'true', 'yes', 'on'].includes(String(params.get('retain') || '').trim().toLowerCase()),
+ bodyMode: 'json',
+ body: '{\n "event": "test",\n "source": "gonavi"\n}',
+ headers: '',
+ };
+ }
+
return {
destination: resolvedDestination,
keyMode: 'text',
@@ -218,6 +258,27 @@ export const buildMessagePublishCommand = (
throw new Error('请输入目标 Topic / Queue');
}
+ if (resolvedType === 'mqtt') {
+ if (/[#+]/.test(destination)) {
+ throw new Error('MQTT 发送 Topic 不能包含 + 或 # 通配符');
+ }
+ const bodyMode = normalizeMode(draft.bodyMode, 'json');
+ const qosValue = Number(draft.qos);
+ const qos = Number.isFinite(qosValue) ? Math.min(2, Math.max(0, Math.trunc(qosValue))) : 0;
+ const command: Record = {
+ publish: destination,
+ payload: parseRequiredPayload(draft.body, bodyMode, '消息体'),
+ qos,
+ retain: !!draft.retain,
+ };
+
+ return {
+ commandText: JSON.stringify(command, null, 2),
+ destinationLabel: destination,
+ transportLabel: 'MQTT Topic',
+ };
+ }
+
if (resolvedType === 'rabbitmq') {
const params = resolveConnectionParams(config);
const bodyMode = normalizeMode(draft.bodyMode, 'json');
diff --git a/frontend/src/utils/objectQueryTemplates.test.ts b/frontend/src/utils/objectQueryTemplates.test.ts
index 0c02e6a..f7abe21 100644
--- a/frontend/src/utils/objectQueryTemplates.test.ts
+++ b/frontend/src/utils/objectQueryTemplates.test.ts
@@ -11,6 +11,10 @@ describe('buildTableSelectQuery', () => {
expect(buildTableSelectQuery('kafka', 'logs.app-1')).toBe('SELECT * FROM "logs.app-1" LIMIT 100;');
});
+ it('adds a preview limit for MQTT topic browsing', () => {
+ expect(buildTableSelectQuery('mqtt', 'devices/+/telemetry')).toBe('SELECT * FROM "devices/+/telemetry" LIMIT 100;');
+ });
+
it('adds a preview limit for RabbitMQ queue browsing', () => {
expect(buildTableSelectQuery('rabbitmq', 'orders.events.v1')).toBe('SELECT * FROM "orders.events.v1" LIMIT 100;');
});
diff --git a/frontend/src/utils/objectQueryTemplates.ts b/frontend/src/utils/objectQueryTemplates.ts
index 11678be..a8589da 100644
--- a/frontend/src/utils/objectQueryTemplates.ts
+++ b/frontend/src/utils/objectQueryTemplates.ts
@@ -5,7 +5,7 @@ export const buildTableSelectQuery = (dbType: string, tableName: string): string
if (!normalizedTableName) {
return 'SELECT * FROM ';
}
- if (['kafka', 'rabbitmq'].includes(String(dbType || '').trim().toLowerCase())) {
+ if (['mqtt', 'kafka', 'rabbitmq'].includes(String(dbType || '').trim().toLowerCase())) {
return `SELECT * FROM ${quoteQualifiedIdent(dbType, normalizedTableName)} LIMIT 100;`;
}
return `SELECT * FROM ${quoteQualifiedIdent(dbType, normalizedTableName)};`;
diff --git a/frontend/src/utils/sql.test.ts b/frontend/src/utils/sql.test.ts
index 0515129..f72b2cb 100644
--- a/frontend/src/utils/sql.test.ts
+++ b/frontend/src/utils/sql.test.ts
@@ -59,6 +59,11 @@ describe('quoteQualifiedIdent', () => {
.toBe('`root`.`sg`.`d1`');
});
+ it('keeps MQTT topic filters as one quoted identifier', () => {
+ expect(quoteQualifiedIdent('mqtt', 'devices/+/telemetry.v1'))
+ .toBe('"devices/+/telemetry.v1"');
+ });
+
it('keeps Kafka topic names as one quoted identifier', () => {
expect(quoteQualifiedIdent('kafka', 'logs.app-1'))
.toBe('"logs.app-1"');
diff --git a/frontend/src/utils/sql.ts b/frontend/src/utils/sql.ts
index a1f122d..2376d95 100644
--- a/frontend/src/utils/sql.ts
+++ b/frontend/src/utils/sql.ts
@@ -54,7 +54,7 @@ export const quoteIdentPart = (dbType: string, ident: string) => {
export const quoteQualifiedIdent = (dbType: string, ident: string) => {
const raw = (ident || '').trim();
if (!raw) return raw;
- if (['kafka', 'rabbitmq'].includes((dbType || '').trim().toLowerCase())) {
+ if (['mqtt', 'kafka', 'rabbitmq'].includes((dbType || '').trim().toLowerCase())) {
return quoteIdentPart(dbType, raw);
}
const parts = splitQualifiedNameSegments(raw).filter(Boolean);
diff --git a/frontend/src/utils/sqlDialect.test.ts b/frontend/src/utils/sqlDialect.test.ts
index 0efaa9f..33a5f2d 100644
--- a/frontend/src/utils/sqlDialect.test.ts
+++ b/frontend/src/utils/sqlDialect.test.ts
@@ -38,6 +38,8 @@ describe('sqlDialect', () => {
expect(resolveSqlDialect('custom', 'qdrant-db')).toBe('qdrant');
expect(resolveSqlDialect('Apache-IoTDB')).toBe('iotdb');
expect(resolveSqlDialect('custom', 'apache_iotdb')).toBe('iotdb');
+ expect(resolveSqlDialect('MQTTS')).toBe('mqtt');
+ expect(resolveSqlDialect('custom', 'mqtts')).toBe('mqtt');
expect(resolveSqlDialect('Apache-Kafka')).toBe('kafka');
expect(resolveSqlDialect('custom', 'apache_kafka')).toBe('kafka');
expect(resolveSqlDialect('Rabbit-MQ')).toBe('rabbitmq');
@@ -75,6 +77,11 @@ describe('sqlDialect', () => {
expect(resolveSqlKeywords('iotdb')).not.toEqual(expect.arrayContaining(['TAGS', 'USING']));
});
+ it('resolves MQTT completion keywords for topic discovery and consume syntax', () => {
+ expect(resolveSqlKeywords('mqtt')).toEqual(expect.arrayContaining(['SHOW TOPICS', 'DESCRIBE TOPIC', 'CONSUME']));
+ expect(resolveSqlKeywords('mqtt')).not.toEqual(expect.arrayContaining(['ALIGN BY DEVICE', 'AUTO_INCREMENT']));
+ });
+
it('resolves Kafka completion keywords for topic discovery and consume syntax', () => {
expect(resolveSqlKeywords('kafka')).toEqual(expect.arrayContaining(['SHOW TOPICS', 'DESCRIBE TOPIC', 'CONSUME']));
expect(resolveSqlKeywords('kafka')).not.toEqual(expect.arrayContaining(['ALIGN BY DEVICE', 'AUTO_INCREMENT']));
diff --git a/frontend/src/utils/sqlDialect.ts b/frontend/src/utils/sqlDialect.ts
index df8ffa1..e123bfb 100644
--- a/frontend/src/utils/sqlDialect.ts
+++ b/frontend/src/utils/sqlDialect.ts
@@ -29,6 +29,7 @@ export type SqlDialect =
| 'clickhouse'
| 'tdengine'
| 'iotdb'
+ | 'mqtt'
| 'kafka'
| 'rabbitmq'
| 'mongodb'
@@ -137,6 +138,9 @@ export const resolveSqlDialect = (
case 'apache-iotdb':
case 'apache_iotdb':
return 'iotdb';
+ case 'mqtt':
+ case 'mqtts':
+ return 'mqtt';
case 'kafka':
case 'apache-kafka':
case 'apache_kafka':
@@ -169,6 +173,7 @@ export const resolveSqlDialect = (
if (source.includes('clickhouse')) return 'clickhouse';
if (source.includes('tdengine')) return 'tdengine';
if (source.includes('iotdb')) return 'iotdb';
+ if (source.includes('mqtt')) return 'mqtt';
if (source.includes('kafka')) return 'kafka';
if (source.includes('rabbitmq') || source.includes('rabbit-mq') || source.includes('rabbit_mq')) return 'rabbitmq';
if (source.includes('sqlserver') || source.includes('mssql')) return 'sqlserver';
@@ -623,6 +628,15 @@ const IOTDB_KEYWORDS = [
'COMPRESSION',
];
+const MQTT_KEYWORDS = [
+ 'SHOW TOPICS',
+ 'DESCRIBE TOPIC',
+ 'CONSUME',
+ 'FROM',
+ 'LIMIT',
+ 'OFFSET',
+];
+
const KAFKA_KEYWORDS = [
'SHOW TOPICS',
'SHOW TOPIC',
@@ -658,6 +672,7 @@ export const resolveSqlKeywords = (dbType: string): string[] => {
if (dialect === 'clickhouse') return unique([...COMMON_KEYWORDS, ...CLICKHOUSE_KEYWORDS]);
if (dialect === 'tdengine') return unique([...COMMON_KEYWORDS, ...TDENGINE_KEYWORDS]);
if (dialect === 'iotdb') return unique([...COMMON_KEYWORDS, ...IOTDB_KEYWORDS]);
+ if (dialect === 'mqtt') return unique([...COMMON_KEYWORDS, ...MQTT_KEYWORDS]);
if (dialect === 'kafka') return unique([...COMMON_KEYWORDS, ...KAFKA_KEYWORDS]);
if (dialect === 'rabbitmq') return unique([...COMMON_KEYWORDS, ...RABBITMQ_KEYWORDS]);
return COMMON_KEYWORDS;
diff --git a/go.mod b/go.mod
index af00d0d..3a06bac 100644
--- a/go.mod
+++ b/go.mod
@@ -6,10 +6,12 @@ require (
gitea.com/kingbase/gokb v0.0.0-20201021123113-29bd62a876c3
gitee.com/chunanyong/dm v1.8.22
github.com/ClickHouse/clickhouse-go/v2 v2.43.0
+ github.com/HuaweiCloudDeveloper/gaussdb-go v1.0.0-rc1
github.com/apache/iotdb-client-go v1.3.7
github.com/caretdev/go-irisnative v0.2.1
github.com/duckdb/duckdb-go/v2 v2.5.5
github.com/elastic/go-elasticsearch/v8 v8.19.6
+ github.com/eclipse/paho.mqtt.golang v1.5.1
github.com/go-sql-driver/mysql v1.9.3
github.com/google/uuid v1.6.0
github.com/highgo/pq-sm3 v0.0.0
@@ -17,6 +19,7 @@ require (
github.com/microsoft/go-mssqldb v1.9.6
github.com/modelcontextprotocol/go-sdk v1.6.1
github.com/redis/go-redis/v9 v9.17.3
+ github.com/segmentio/kafka-go v0.4.51
github.com/sijms/go-ora/v2 v2.9.0
github.com/taosdata/driver-go/v3 v3.7.8
github.com/wailsapp/wails/v2 v2.11.0
@@ -32,7 +35,6 @@ require (
)
require (
- github.com/HuaweiCloudDeveloper/gaussdb-go v1.0.0-rc1 // indirect
github.com/apache/thrift v0.22.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.9.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
@@ -42,7 +44,6 @@ require (
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/segmentio/encoding v0.5.4 // indirect
- github.com/segmentio/kafka-go v0.4.51 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
diff --git a/go.sum b/go.sum
index a74058a..78d9524 100644
--- a/go.sum
+++ b/go.sum
@@ -74,6 +74,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM=
github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
+github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
+github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
github.com/elastic/elastic-transport-go/v8 v8.9.0 h1:KeT/2P54F0xS0S8Y3Pf+tFDg4HmBgReQMB+BMz8dDAs=
github.com/elastic/elastic-transport-go/v8 v8.9.0/go.mod h1:ssMTvNS2hwf7CaiGsRRsx4gQHFZ/jS/DkLcISxekWzc=
github.com/elastic/go-elasticsearch/v8 v8.19.6 h1:4qa7ecJkr5rLsoHKIVGbaqcFt2o57CnOHQJi9Pts/rk=
diff --git a/internal/app/db_context.go b/internal/app/db_context.go
index 24f3e1f..f57307f 100644
--- a/internal/app/db_context.go
+++ b/internal/app/db_context.go
@@ -16,6 +16,8 @@ func normalizeRunConfig(config connection.ConnectionConfig, dbName string) conne
}
switch strings.ToLower(strings.TrimSpace(config.Type)) {
+ case "mqtt", "mqtts":
+ // MQTT 的 Database 字段表示默认 Topic,不能被树上的 synthetic database(topics) 覆盖。
case "kafka", "apache-kafka", "apache_kafka":
// Kafka 的 Database 字段表示默认 Topic,不能被树上的 synthetic database(topics) 覆盖。
case "oceanbase":
@@ -53,7 +55,7 @@ func normalizeSchemaAndTable(config connection.ConnectionConfig, dbName string,
// Elasticsearch:索引名可能含多个点(如 iot_pro_biz_operate_log.index.20240626),
// 不能按点分割,直接返回原始数据库名和完整表名。
- if dbType == "elasticsearch" || dbType == "iotdb" || dbType == "kafka" || dbType == "rabbitmq" {
+ if dbType == "elasticsearch" || dbType == "iotdb" || dbType == "mqtt" || dbType == "kafka" || dbType == "rabbitmq" {
return rawDB, rawTable
}
@@ -112,7 +114,7 @@ func normalizeSchemaAndTable(config connection.ConnectionConfig, dbName string,
func normalizeMetadataSchemaAndTable(config connection.ConnectionConfig, dbName string, tableName string) (string, string) {
schema, table := normalizeSchemaAndTable(config, dbName, tableName)
switch resolveDDLDBType(config) {
- case "kafka", "rabbitmq":
+ case "mqtt", "kafka", "rabbitmq":
return schema, table
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb":
rawTable := strings.TrimSpace(tableName)
diff --git a/internal/app/db_context_test.go b/internal/app/db_context_test.go
index d98e560..2aaa292 100644
--- a/internal/app/db_context_test.go
+++ b/internal/app/db_context_test.go
@@ -332,6 +332,18 @@ func TestNormalizeSchemaAndTable_KafkaPreservesDottedTopicName(t *testing.T) {
}
}
+func TestNormalizeSchemaAndTable_MQTTPreservesTopicFilter(t *testing.T) {
+ t.Parallel()
+
+ schemaOrDb, table := normalizeSchemaAndTable(connection.ConnectionConfig{
+ Type: "mqtt",
+ }, "topics", "devices/floor1.sensor.v1")
+
+ if schemaOrDb != "topics" || table != "devices/floor1.sensor.v1" {
+ t.Fatalf("expected mqtt topic filter to stay intact, got %q.%q", schemaOrDb, table)
+ }
+}
+
func TestNormalizeSchemaAndTable_RabbitMQPreservesDottedQueueName(t *testing.T) {
t.Parallel()
@@ -356,6 +368,18 @@ func TestNormalizeMetadataSchemaAndTable_KafkaPreservesDottedTopicName(t *testin
}
}
+func TestNormalizeMetadataSchemaAndTable_MQTTPreservesTopicFilter(t *testing.T) {
+ t.Parallel()
+
+ schemaOrDb, table := normalizeMetadataSchemaAndTable(connection.ConnectionConfig{
+ Type: "mqtt",
+ }, "topics", "devices/floor1.sensor.v1")
+
+ if schemaOrDb != "topics" || table != "devices/floor1.sensor.v1" {
+ t.Fatalf("expected mqtt metadata topic filter to stay intact, got %q.%q", schemaOrDb, table)
+ }
+}
+
func TestNormalizeMetadataSchemaAndTable_RabbitMQPreservesDottedQueueName(t *testing.T) {
t.Parallel()
diff --git a/internal/app/methods_db.go b/internal/app/methods_db.go
index 2796b95..cf2fd66 100644
--- a/internal/app/methods_db.go
+++ b/internal/app/methods_db.go
@@ -315,8 +315,8 @@ func normalizeSchemaAndTableByType(dbType string, dbName string, tableName strin
return rawDB, rawTable
}
- // Elasticsearch / RabbitMQ / Kafka:对象名可能含多个点,不能按点分割
- if dbType == "elasticsearch" || dbType == "kafka" || dbType == "rabbitmq" {
+ // Elasticsearch / MQTT / RabbitMQ / Kafka:对象名可能含多个点或路径,不能按点分割
+ if dbType == "elasticsearch" || dbType == "mqtt" || dbType == "kafka" || dbType == "rabbitmq" {
return rawDB, rawTable
}
diff --git a/internal/app/methods_db_create_statement_test.go b/internal/app/methods_db_create_statement_test.go
index 08c73bb..4c9599c 100644
--- a/internal/app/methods_db_create_statement_test.go
+++ b/internal/app/methods_db_create_statement_test.go
@@ -166,6 +166,15 @@ func TestNormalizeSchemaAndTableByType_KafkaPreservesDottedTopicName(t *testing.
}
}
+func TestNormalizeSchemaAndTableByType_MQTTPreservesTopicFilter(t *testing.T) {
+ t.Parallel()
+
+ schema, table := normalizeSchemaAndTableByType("mqtt", "topics", "devices/floor1.sensor.v1")
+ if schema != "topics" || table != "devices/floor1.sensor.v1" {
+ t.Fatalf("expected mqtt topic filter to stay intact, got %q.%q", schema, table)
+ }
+}
+
func TestNormalizeSchemaAndTableByType_RabbitMQPreservesDottedQueueName(t *testing.T) {
t.Parallel()
diff --git a/internal/app/methods_driver.go b/internal/app/methods_driver.go
index f0c575f..254f549 100644
--- a/internal/app/methods_driver.go
+++ b/internal/app/methods_driver.go
@@ -1432,6 +1432,8 @@ func normalizeDriverType(driverType string) string {
return "gaussdb"
case "goldendb", "greatdb", "gdb":
return "goldendb"
+ case "mqtt", "mqtts":
+ return "mqtt"
case "kafka", "apache-kafka", "apache_kafka":
return "kafka"
case "rabbitmq", "rabbit-mq", "rabbit_mq":
@@ -1505,6 +1507,7 @@ func allDriverDefinitionsWithPackages(packages map[string]pinnedDriverPackage) [
{Type: "oracle", Name: "Oracle", Engine: driverEngineGo, BuiltIn: true},
{Type: "redis", Name: "Redis", Engine: driverEngineGo, BuiltIn: true},
{Type: "postgres", Name: "PostgreSQL", Engine: driverEngineGo, BuiltIn: true},
+ {Type: "mqtt", Name: "MQTT", Engine: driverEngineGo, BuiltIn: true},
{Type: "kafka", Name: "Kafka", Engine: driverEngineGo, BuiltIn: true},
{Type: "rabbitmq", Name: "RabbitMQ", Engine: driverEngineGo, BuiltIn: true},
diff --git a/internal/app/methods_driver_version_test.go b/internal/app/methods_driver_version_test.go
index 7181286..8c73e62 100644
--- a/internal/app/methods_driver_version_test.go
+++ b/internal/app/methods_driver_version_test.go
@@ -518,6 +518,22 @@ func TestKafkaDriverDefinitionIsBuiltIn(t *testing.T) {
}
}
+func TestMQTTDriverDefinitionIsBuiltIn(t *testing.T) {
+ definition, ok := resolveDriverDefinition("mqtts")
+ if !ok {
+ t.Fatal("expected mqtt driver definition")
+ }
+ if definition.Name != "MQTT" {
+ t.Fatalf("unexpected mqtt driver name: %q", definition.Name)
+ }
+ if !definition.BuiltIn {
+ t.Fatal("expected mqtt to be a built-in driver")
+ }
+ if definition.PinnedVersion != "" || definition.DefaultDownloadURL != "" {
+ t.Fatalf("expected mqtt builtin definition to omit optional-agent metadata: %#v", definition)
+ }
+}
+
func TestRabbitMQDriverDefinitionIsBuiltIn(t *testing.T) {
definition, ok := resolveDriverDefinition("rabbit-mq")
if !ok {
diff --git a/internal/db/database.go b/internal/db/database.go
index ee24391..a5aacda 100644
--- a/internal/db/database.go
+++ b/internal/db/database.go
@@ -489,6 +489,9 @@ var databaseFactories = map[string]databaseFactory{
"qdrant": func() Database {
return &QdrantDB{}
},
+ "mqtt": func() Database {
+ return &MQTTDB{}
+ },
"kafka": func() Database {
return &KafkaDB{}
},
@@ -535,6 +538,8 @@ func normalizeDatabaseType(dbType string) string {
return "chroma"
case "qdrantdb", "qdrant-db":
return "qdrant"
+ case "mqtt", "mqtts":
+ return "mqtt"
case "kafka", "apache-kafka", "apache_kafka":
return "kafka"
case "rabbitmq", "rabbit-mq", "rabbit_mq":
diff --git a/internal/db/driver_support.go b/internal/db/driver_support.go
index 0744af6..cb90897 100644
--- a/internal/db/driver_support.go
+++ b/internal/db/driver_support.go
@@ -19,6 +19,7 @@ var coreBuiltinDrivers = map[string]struct{}{
"postgres": {},
"chroma": {},
"qdrant": {},
+ "mqtt": {},
"kafka": {},
"rabbitmq": {},
}
@@ -81,6 +82,8 @@ func normalizeRuntimeDriverType(driverType string) string {
return "chroma"
case "qdrantdb", "qdrant-db":
return "qdrant"
+ case "mqtt", "mqtts":
+ return "mqtt"
case "apache-iotdb", "apache_iotdb", "iotdb":
return "iotdb"
case "kafka", "apache-kafka", "apache_kafka":
@@ -148,6 +151,8 @@ func driverDisplayName(driverType string) string {
return "Chroma"
case "qdrant":
return "Qdrant"
+ case "mqtt":
+ return "MQTT"
case "kafka":
return "Kafka"
case "rabbitmq":
diff --git a/internal/db/mqtt_impl.go b/internal/db/mqtt_impl.go
new file mode 100644
index 0000000..ecc2103
--- /dev/null
+++ b/internal/db/mqtt_impl.go
@@ -0,0 +1,1191 @@
+package db
+
+import (
+ "context"
+ "crypto/tls"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/url"
+ "regexp"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+ "unicode/utf8"
+
+ "GoNavi-Wails/internal/connection"
+ "GoNavi-Wails/internal/logger"
+ proxytunnel "GoNavi-Wails/internal/proxy"
+ "GoNavi-Wails/internal/ssh"
+
+ pahomqtt "github.com/eclipse/paho.mqtt.golang"
+)
+
+const (
+ defaultMQTTPort = 1883
+ defaultMQTTQueryTimeout = 30 * time.Second
+ defaultMQTTPreviewLimit = 100
+ defaultMQTTFetchWait = 4 * time.Second
+ maxMQTTFetchWait = 30 * time.Second
+ mqttSyntheticDatabase = "topics"
+ mqttDefaultClientID = "GoNavi"
+)
+
+type mqttRuntime interface {
+ Close() error
+ Ping(ctx context.Context) error
+ FetchMessages(ctx context.Context, request mqttFetchRequest) ([]mqttMessageRecord, error)
+ Publish(ctx context.Context, command mqttPublishCommand) (int64, error)
+}
+
+type mqttFetchRequest struct {
+ Topic string
+ Limit int
+ Offset int
+ QoS byte
+ Wait time.Duration
+}
+
+type mqttPublishCommand struct {
+ Topic string
+ Payload interface{}
+ QoS byte
+ Retain bool
+}
+
+type mqttMessageRecord struct {
+ Topic string
+ QoS byte
+ Retained bool
+ Duplicate bool
+ MessageID uint16
+ Payload []byte
+ Decoded interface{}
+ Encoding string
+ ReceivedAt time.Time
+}
+
+type mqttTopicDescriptor struct {
+ Filter string
+ Default bool
+ Wildcard bool
+ Source string
+}
+
+type pahoMQTTRuntime struct {
+ client pahomqtt.Client
+ timeout time.Duration
+}
+
+var newMQTTRuntime = func(config connection.ConnectionConfig) (mqttRuntime, error) {
+ return newPahoMQTTRuntime(config)
+}
+
+type MQTTDB struct {
+ runtime mqttRuntime
+ forwarders []*ssh.LocalForwarder
+ brokers []string
+ defaultTopic string
+ topics []mqttTopicDescriptor
+ defaultQoS byte
+ defaultRetain bool
+ cleanSession bool
+ fetchWait time.Duration
+}
+
+func (m *MQTTDB) Connect(config connection.ConnectionConfig) error {
+ _ = m.Close()
+
+ runConfig := normalizeMQTTConfig(config)
+ if runConfig.UseSSH {
+ sshConfig, brokers, forwarders, err := mqttForwardBrokersOverSSH(runConfig)
+ if err != nil {
+ return err
+ }
+ m.forwarders = forwarders
+ runConfig = sshConfig
+ runConfig.Hosts = brokers[1:]
+ host, port, ok := parseHostPortWithDefault(brokers[0], defaultMQTTPort)
+ if !ok {
+ _ = m.Close()
+ return fmt.Errorf("解析 MQTT SSH 转发地址失败:%s", brokers[0])
+ }
+ runConfig.Host = host
+ runConfig.Port = port
+ runConfig.UseSSH = false
+ logger.Infof("MQTT 通过 SSH 端口转发连接:brokers=%s", strings.Join(brokers, ","))
+ }
+
+ runtime, err := newMQTTRuntime(runConfig)
+ if err != nil {
+ _ = m.Close()
+ return err
+ }
+ m.runtime = runtime
+ m.defaultTopic = mqttDefaultTopic(runConfig)
+ m.topics = mqttConfiguredTopics(runConfig, m.defaultTopic)
+ m.defaultQoS = mqttDefaultQoS(runConfig)
+ m.defaultRetain = mqttDefaultRetain(runConfig)
+ m.cleanSession = mqttCleanSession(runConfig)
+ m.fetchWait = mqttFetchWait(runConfig)
+ m.brokers, _ = mqttBrokerAddresses(runConfig)
+
+ if err := m.Ping(); err != nil {
+ _ = m.Close()
+ return err
+ }
+ return nil
+}
+
+func (m *MQTTDB) Close() error {
+ var firstErr error
+ if m.runtime != nil {
+ if err := m.runtime.Close(); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ m.runtime = nil
+ }
+ for _, forwarder := range m.forwarders {
+ if forwarder == nil {
+ continue
+ }
+ if err := forwarder.Close(); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ m.forwarders = nil
+ m.brokers = nil
+ m.defaultTopic = ""
+ m.topics = nil
+ m.defaultQoS = 0
+ m.defaultRetain = false
+ m.cleanSession = false
+ m.fetchWait = 0
+ return firstErr
+}
+
+func (m *MQTTDB) Ping() error {
+ if m.runtime == nil {
+ return fmt.Errorf("连接未打开")
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ return m.runtime.Ping(ctx)
+}
+
+func (m *MQTTDB) Query(query string) ([]map[string]interface{}, []string, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), defaultMQTTQueryTimeout)
+ defer cancel()
+ return m.QueryContext(ctx, query)
+}
+
+func (m *MQTTDB) QueryContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error) {
+ if m.runtime == nil {
+ return nil, nil, fmt.Errorf("连接未打开")
+ }
+ text := strings.TrimSpace(query)
+ if text == "" {
+ return nil, nil, fmt.Errorf("查询语句不能为空")
+ }
+ parsed, ok := parseMQTTSQL(text)
+ if !ok {
+ return nil, nil, fmt.Errorf("MQTT 查询仅支持 SHOW TOPICS、DESCRIBE TOPIC、SELECT * FROM topic 与 CONSUME FROM topic")
+ }
+
+ switch parsed.Action {
+ case "show_topics":
+ rows := mqttTopicRows(m.topics, m.defaultQoS, m.defaultRetain)
+ if parsed.Limit > 0 && len(rows) > parsed.Limit {
+ rows = rows[:parsed.Limit]
+ }
+ return rows, collectColumns(rows), nil
+ case "describe_topic":
+ topic := mqttResolveTopic(parsed.Topic, m.defaultTopic)
+ if topic == "" {
+ return nil, nil, fmt.Errorf("MQTT topic 不能为空")
+ }
+ rows := []map[string]interface{}{mqttDescribeTopicRow(topic, m.topics, m.defaultQoS, m.defaultRetain, m.cleanSession, m.fetchWait, m.brokers)}
+ return rows, collectColumns(rows), nil
+ case "select", "consume":
+ if parsed.Count {
+ return nil, nil, fmt.Errorf("MQTT 不支持 COUNT(*) 总量统计;请使用 SELECT * FROM topic LIMIT n 预览实时消息")
+ }
+ topic := mqttResolveTopic(parsed.Topic, m.defaultTopic)
+ if topic == "" {
+ return nil, nil, fmt.Errorf("MQTT topic 不能为空")
+ }
+ records, err := m.runtime.FetchMessages(ctx, mqttFetchRequest{
+ Topic: topic,
+ Limit: parsed.Limit,
+ Offset: parsed.Offset,
+ QoS: m.defaultQoS,
+ Wait: m.fetchWait,
+ })
+ if err != nil {
+ return nil, nil, err
+ }
+ rows := mqttMessageRows(records)
+ return rows, collectColumns(rows), nil
+ default:
+ return nil, nil, fmt.Errorf("未实现的 MQTT 查询类型:%s", parsed.Action)
+ }
+}
+
+func (m *MQTTDB) Exec(query string) (int64, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), defaultMQTTQueryTimeout)
+ defer cancel()
+ return m.ExecContext(ctx, query)
+}
+
+func (m *MQTTDB) ExecContext(ctx context.Context, query string) (int64, error) {
+ if m.runtime == nil {
+ return 0, fmt.Errorf("连接未打开")
+ }
+ var cmd map[string]interface{}
+ if err := decodeJSONWithUseNumber([]byte(strings.TrimSpace(query)), &cmd); err != nil {
+ return 0, fmt.Errorf("MQTT 写入命令必须是 JSON:%w", err)
+ }
+
+ topic := mqttResolveTopic(firstStringValue(cmd, "publish", "topic", "destination"), m.defaultTopic)
+ if err := mqttValidatePublishTopic(topic); err != nil {
+ return 0, err
+ }
+ if !hasAnyKey(cmd, "payload", "value", "body", "message") {
+ return 0, fmt.Errorf("MQTT publish 命令缺少 payload")
+ }
+ qos, err := mqttQoSFromAny(firstExisting(cmd, "qos"), m.defaultQoS)
+ if err != nil {
+ return 0, err
+ }
+ retain := mqttBoolFromAny(firstExisting(cmd, "retain", "retained"), m.defaultRetain)
+
+ return m.runtime.Publish(ctx, mqttPublishCommand{
+ Topic: topic,
+ Payload: firstExisting(cmd, "payload", "value", "body", "message"),
+ QoS: qos,
+ Retain: retain,
+ })
+}
+
+func (m *MQTTDB) GetDatabases() ([]string, error) {
+ if m.runtime == nil {
+ return nil, fmt.Errorf("连接未打开")
+ }
+ return []string{mqttSyntheticDatabase}, nil
+}
+
+func (m *MQTTDB) GetTables(dbName string) ([]string, error) {
+ if m.runtime == nil {
+ return nil, fmt.Errorf("连接未打开")
+ }
+ names := make([]string, 0, len(m.topics))
+ for _, topic := range m.topics {
+ if strings.TrimSpace(topic.Filter) != "" {
+ names = append(names, topic.Filter)
+ }
+ }
+ sort.Strings(names)
+ return names, nil
+}
+
+func (m *MQTTDB) GetCreateStatement(dbName, tableName string) (string, error) {
+ if m.runtime == nil {
+ return "", fmt.Errorf("连接未打开")
+ }
+ topic := mqttResolveTopic(tableName, m.defaultTopic)
+ if topic == "" {
+ return "", fmt.Errorf("MQTT topic 不能为空")
+ }
+ payload, _ := json.MarshalIndent(
+ mqttDescribeTopicRow(topic, m.topics, m.defaultQoS, m.defaultRetain, m.cleanSession, m.fetchWait, m.brokers),
+ "",
+ " ",
+ )
+ return fmt.Sprintf("// MQTT topic filter: %s\n%s", topic, string(payload)), nil
+}
+
+func (m *MQTTDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) {
+ if m.runtime == nil {
+ return nil, fmt.Errorf("连接未打开")
+ }
+ topic := mqttResolveTopic(tableName, m.defaultTopic)
+ if topic == "" {
+ return nil, fmt.Errorf("MQTT topic 不能为空")
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ records, err := m.runtime.FetchMessages(ctx, mqttFetchRequest{
+ Topic: topic,
+ Limit: 20,
+ QoS: m.defaultQoS,
+ Wait: m.fetchWait,
+ })
+ if err != nil {
+ return nil, err
+ }
+ rows := mqttMessageRows(records)
+ columns := []connection.ColumnDefinition{
+ {Name: "topic", Type: "string", Nullable: "NO", Comment: "MQTT topic"},
+ {Name: "qos", Type: "tinyint", Nullable: "NO", Comment: "MQTT QoS level"},
+ {Name: "retained", Type: "bool", Nullable: "YES", Comment: "Whether the message is retained"},
+ {Name: "duplicate", Type: "bool", Nullable: "YES", Comment: "Whether the message is marked as duplicate"},
+ {Name: "message_id", Type: "int", Nullable: "YES", Comment: "MQTT message id"},
+ {Name: "payload", Type: "json", Nullable: "YES", Comment: "Decoded MQTT payload"},
+ {Name: "payload_encoding", Type: "string", Nullable: "YES", Comment: "json / text / base64"},
+ {Name: "payload_bytes", Type: "int", Nullable: "YES", Comment: "Payload size in bytes"},
+ {Name: "received_at", Type: "timestamp", Nullable: "YES", Comment: "Client receive timestamp"},
+ }
+ seen := map[string]struct{}{
+ "topic": {}, "qos": {}, "retained": {}, "duplicate": {}, "message_id": {},
+ "payload": {}, "payload_encoding": {}, "payload_bytes": {}, "received_at": {},
+ }
+ for _, row := range rows {
+ for key, value := range row {
+ if _, exists := seen[key]; exists {
+ continue
+ }
+ if !strings.HasPrefix(key, "payload.") {
+ continue
+ }
+ seen[key] = struct{}{}
+ columns = append(columns, connection.ColumnDefinition{
+ Name: key,
+ Type: inferChromaValueType(value),
+ Nullable: "YES",
+ Comment: "Derived MQTT payload field",
+ })
+ }
+ }
+ return columns, nil
+}
+
+func (m *MQTTDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) {
+ tables, err := m.GetTables(dbName)
+ if err != nil {
+ return nil, err
+ }
+ var result []connection.ColumnDefinitionWithTable
+ for _, table := range tables {
+ cols, err := m.GetColumns(dbName, table)
+ if err != nil {
+ continue
+ }
+ for _, col := range cols {
+ result = append(result, connection.ColumnDefinitionWithTable{
+ TableName: table,
+ Name: col.Name,
+ Type: col.Type,
+ Comment: col.Comment,
+ })
+ }
+ }
+ return result, nil
+}
+
+func (m *MQTTDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) {
+ return []connection.IndexDefinition{
+ {Name: "TOPIC_RECEIVED_AT", ColumnName: "topic", NonUnique: 1, SeqInIndex: 1, IndexType: "SUBSCRIPTION"},
+ {Name: "TOPIC_RECEIVED_AT", ColumnName: "received_at", NonUnique: 1, SeqInIndex: 2, IndexType: "SUBSCRIPTION"},
+ }, nil
+}
+
+func (m *MQTTDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) {
+ return []connection.ForeignKeyDefinition{}, nil
+}
+
+func (m *MQTTDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) {
+ return []connection.TriggerDefinition{}, nil
+}
+
+func (m *MQTTDB) ApplyChanges(tableName string, changes connection.ChangeSet) error {
+ if len(changes.Inserts) == 0 && len(changes.Updates) == 0 && len(changes.Deletes) == 0 {
+ return nil
+ }
+ return fmt.Errorf("MQTT 结果集仅支持只读预览;如需写入请在 SQL 编辑器执行 JSON publish 命令")
+}
+
+func normalizeMQTTConfig(config connection.ConnectionConfig) connection.ConnectionConfig {
+ runConfig := applyMQTTURI(config)
+ if strings.TrimSpace(runConfig.Host) == "" && len(runConfig.Hosts) == 0 {
+ runConfig.Host = "localhost"
+ }
+ if runConfig.Port <= 0 {
+ runConfig.Port = defaultMQTTPort
+ }
+ params := mqttConnectionParams(runConfig)
+ transport := mqttTransportScheme(runConfig)
+ if transport == "ssl" || transport == "wss" || mqttBoolValue(firstNonEmpty(params.Get("ssl"), params.Get("tls"), params.Get("useSSL"), params.Get("use_ssl"))) {
+ runConfig.UseSSL = true
+ }
+ if strings.TrimSpace(runConfig.SSLMode) == "" && runConfig.UseSSL {
+ if mqttBoolValue(firstNonEmpty(params.Get("skip_verify"), params.Get("skipVerify"), params.Get("insecure"))) {
+ runConfig.SSLMode = "skip-verify"
+ } else {
+ runConfig.SSLMode = "required"
+ }
+ }
+ return runConfig
+}
+
+func applyMQTTURI(config connection.ConnectionConfig) connection.ConnectionConfig {
+ uriText := strings.TrimSpace(config.URI)
+ if uriText == "" {
+ return config
+ }
+ parsed, err := url.Parse(uriText)
+ if err != nil {
+ return config
+ }
+ scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme))
+ switch scheme {
+ case "mqtt", "mqtts", "tcp", "ssl", "tls", "ws", "wss":
+ default:
+ return config
+ }
+
+ if parsed.User != nil {
+ if strings.TrimSpace(config.User) == "" {
+ config.User = parsed.User.Username()
+ }
+ if pass, ok := parsed.User.Password(); ok && config.Password == "" {
+ config.Password = pass
+ }
+ }
+
+ hosts := make([]string, 0, 4)
+ for _, entry := range strings.Split(strings.TrimSpace(parsed.Host), ",") {
+ host, port, ok := parseHostPortWithDefault(strings.TrimSpace(entry), defaultMQTTPort)
+ if !ok {
+ continue
+ }
+ hosts = append(hosts, mqttFormatHostPort(host, port))
+ }
+ if len(hosts) > 0 {
+ host, port, ok := parseHostPortWithDefault(hosts[0], defaultMQTTPort)
+ if ok {
+ config.Host = host
+ config.Port = port
+ }
+ if len(hosts) > 1 {
+ config.Hosts = append([]string(nil), hosts[1:]...)
+ }
+ }
+ if topic := strings.Trim(strings.TrimSpace(parsed.Path), "/"); topic != "" && strings.TrimSpace(config.Database) == "" {
+ config.Database = topic
+ }
+ params := parsed.Query()
+ if strings.TrimSpace(config.Topology) == "" {
+ if topology := strings.ToLower(strings.TrimSpace(firstNonEmpty(params.Get("topology"), params.Get("mode")))); topology != "" {
+ config.Topology = topology
+ } else if len(hosts) > 1 {
+ config.Topology = "cluster"
+ }
+ }
+ if scheme == "ssl" || scheme == "tls" || scheme == "mqtts" || scheme == "wss" {
+ config.UseSSL = true
+ if strings.TrimSpace(config.SSLMode) == "" {
+ config.SSLMode = "required"
+ }
+ }
+ return config
+}
+
+func mqttConnectionParams(config connection.ConnectionConfig) url.Values {
+ params := url.Values{}
+ mergeConnectionParamValues(params, connectionParamsFromURI(config.URI, "mqtt", "mqtts", "tcp", "ssl", "tls", "ws", "wss"))
+ mergeConnectionParamValues(params, connectionParamsFromText(config.ConnectionParams))
+ return params
+}
+
+func mqttDefaultTopic(config connection.ConnectionConfig) string {
+ if topic := strings.TrimSpace(config.Database); topic != "" {
+ return topic
+ }
+ params := mqttConnectionParams(config)
+ return strings.TrimSpace(firstNonEmpty(params.Get("defaultTopic"), params.Get("default_topic"), params.Get("topic")))
+}
+
+func mqttConfiguredTopics(config connection.ConnectionConfig, defaultTopic string) []mqttTopicDescriptor {
+ seen := make(map[string]struct{})
+ topics := make([]mqttTopicDescriptor, 0, 8)
+ appendTopic := func(raw string, isDefault bool, source string) {
+ filter := strings.TrimSpace(raw)
+ if filter == "" {
+ return
+ }
+ if _, ok := seen[filter]; ok {
+ if isDefault {
+ for index := range topics {
+ if topics[index].Filter == filter {
+ topics[index].Default = true
+ }
+ }
+ }
+ return
+ }
+ seen[filter] = struct{}{}
+ topics = append(topics, mqttTopicDescriptor{
+ Filter: filter,
+ Default: isDefault,
+ Wildcard: strings.ContainsAny(filter, "#+"),
+ Source: source,
+ })
+ }
+
+ appendTopic(defaultTopic, defaultTopic != "", "default")
+
+ params := mqttConnectionParams(config)
+ for _, key := range []string{"topics", "topicFilters", "topic_filters", "subscriptions", "subscription", "subscribe"} {
+ for _, value := range params[key] {
+ for _, part := range splitMQTTTopicList(value) {
+ appendTopic(part, false, key)
+ }
+ }
+ }
+
+ sort.SliceStable(topics, func(i, j int) bool {
+ if topics[i].Default != topics[j].Default {
+ return topics[i].Default
+ }
+ return topics[i].Filter < topics[j].Filter
+ })
+ return topics
+}
+
+func splitMQTTTopicList(raw string) []string {
+ fields := strings.FieldsFunc(raw, func(r rune) bool {
+ return r == ',' || r == ';' || r == '\n' || r == '\r'
+ })
+ result := make([]string, 0, len(fields))
+ for _, field := range fields {
+ if text := strings.TrimSpace(field); text != "" {
+ result = append(result, text)
+ }
+ }
+ return result
+}
+
+func mqttDefaultQoS(config connection.ConnectionConfig) byte {
+ value, err := mqttQoSFromAny(firstNonEmpty(mqttConnectionParams(config).Get("qos"), "0"), 0)
+ if err != nil {
+ return 0
+ }
+ return value
+}
+
+func mqttDefaultRetain(config connection.ConnectionConfig) bool {
+ params := mqttConnectionParams(config)
+ return mqttBoolValue(firstNonEmpty(params.Get("retain"), params.Get("retained")))
+}
+
+func mqttCleanSession(config connection.ConnectionConfig) bool {
+ params := mqttConnectionParams(config)
+ value := strings.TrimSpace(firstNonEmpty(params.Get("cleanSession"), params.Get("clean_session")))
+ if value == "" {
+ return true
+ }
+ return mqttBoolValue(value)
+}
+
+func mqttFetchWait(config connection.ConnectionConfig) time.Duration {
+ params := mqttConnectionParams(config)
+ for _, key := range []string{"fetchWaitMs", "fetch_wait_ms", "waitMs", "wait_ms"} {
+ if value := strings.TrimSpace(params.Get(key)); value != "" {
+ if ms, err := strconv.Atoi(value); err == nil && ms > 0 {
+ wait := time.Duration(ms) * time.Millisecond
+ if wait > maxMQTTFetchWait {
+ return maxMQTTFetchWait
+ }
+ return wait
+ }
+ }
+ }
+ for _, key := range []string{"fetchWait", "wait"} {
+ if value := strings.TrimSpace(params.Get(key)); value != "" {
+ if seconds, err := strconv.Atoi(value); err == nil && seconds > 0 {
+ wait := time.Duration(seconds) * time.Second
+ if wait > maxMQTTFetchWait {
+ return maxMQTTFetchWait
+ }
+ return wait
+ }
+ }
+ }
+ return defaultMQTTFetchWait
+}
+
+func mqttClientID(config connection.ConnectionConfig) string {
+ params := mqttConnectionParams(config)
+ if clientID := strings.TrimSpace(firstNonEmpty(params.Get("clientId"), params.Get("client_id"))); clientID != "" {
+ return clientID
+ }
+ if id := strings.TrimSpace(config.ID); id != "" {
+ return mqttDefaultClientID + "-" + id
+ }
+ return fmt.Sprintf("%s-%d", mqttDefaultClientID, time.Now().UnixNano())
+}
+
+func mqttTransportScheme(config connection.ConnectionConfig) string {
+ if parsed, err := url.Parse(strings.TrimSpace(config.URI)); err == nil {
+ switch strings.ToLower(strings.TrimSpace(parsed.Scheme)) {
+ case "ssl", "tls", "mqtts":
+ return "ssl"
+ case "wss":
+ return "wss"
+ case "ws":
+ return "ws"
+ case "tcp", "mqtt":
+ return "tcp"
+ }
+ }
+ params := mqttConnectionParams(config)
+ switch strings.ToLower(strings.TrimSpace(firstNonEmpty(params.Get("transport"), params.Get("scheme")))) {
+ case "ssl", "tls", "mqtts":
+ return "ssl"
+ case "wss":
+ return "wss"
+ case "ws":
+ return "ws"
+ }
+ if config.UseSSL {
+ return "ssl"
+ }
+ return "tcp"
+}
+
+func mqttBrokerAddresses(config connection.ConnectionConfig) ([]string, error) {
+ hosts := make([]string, 0, 4)
+ if host, port, ok := parseHostPortWithDefault(net.JoinHostPort(strings.TrimSpace(config.Host), strconv.Itoa(config.Port)), defaultMQTTPort); ok && strings.TrimSpace(host) != "" {
+ hosts = append(hosts, mqttFormatHostPort(host, port))
+ }
+ for _, entry := range config.Hosts {
+ host, port, ok := parseHostPortWithDefault(strings.TrimSpace(entry), defaultMQTTPort)
+ if !ok {
+ continue
+ }
+ hosts = append(hosts, mqttFormatHostPort(host, port))
+ }
+ hosts = uniqueStringsPreserveOrder(hosts)
+ if len(hosts) == 0 {
+ return nil, fmt.Errorf("MQTT 至少需要一个 broker 地址")
+ }
+ return hosts, nil
+}
+
+func mqttFormatHostPort(host string, port int) string {
+ return net.JoinHostPort(strings.TrimSpace(host), strconv.Itoa(port))
+}
+
+func mqttForwardBrokersOverSSH(config connection.ConnectionConfig) (connection.ConnectionConfig, []string, []*ssh.LocalForwarder, error) {
+ brokers, err := mqttBrokerAddresses(config)
+ if err != nil {
+ return connection.ConnectionConfig{}, nil, nil, err
+ }
+ runConfig := config
+ forwarders := make([]*ssh.LocalForwarder, 0, len(brokers))
+ rewritten := make([]string, 0, len(brokers))
+ for _, broker := range brokers {
+ host, port, ok := parseHostPortWithDefault(broker, defaultMQTTPort)
+ if !ok {
+ return connection.ConnectionConfig{}, nil, nil, fmt.Errorf("解析 MQTT broker 地址失败:%s", broker)
+ }
+ forwarder, err := ssh.GetOrCreateLocalForwarder(config.SSH, host, port)
+ if err != nil {
+ return connection.ConnectionConfig{}, nil, nil, fmt.Errorf("创建 MQTT SSH 隧道失败:%w", err)
+ }
+ forwarders = append(forwarders, forwarder)
+ rewritten = append(rewritten, forwarder.LocalAddr)
+ }
+ return runConfig, rewritten, forwarders, nil
+}
+
+func newPahoMQTTRuntime(config connection.ConnectionConfig) (mqttRuntime, error) {
+ brokers, err := mqttBrokerAddresses(config)
+ if err != nil {
+ return nil, err
+ }
+ timeout := getConnectTimeout(config)
+ if timeout <= 0 {
+ timeout = 10 * time.Second
+ }
+ transport := mqttTransportScheme(config)
+ if config.UseProxy && (transport == "ws" || transport == "wss") {
+ return nil, fmt.Errorf("MQTT 当前暂不支持通过代理建立 WebSocket 连接,请改用 tcp/ssl")
+ }
+ tlsConfig, err := resolveGenericTLSConfig(config)
+ if err != nil {
+ return nil, err
+ }
+
+ options := pahomqtt.NewClientOptions().
+ SetClientID(mqttClientID(config)).
+ SetCleanSession(mqttCleanSession(config)).
+ SetOrderMatters(false).
+ SetAutoReconnect(false).
+ SetConnectRetry(false).
+ SetConnectTimeout(timeout).
+ SetWriteTimeout(timeout)
+
+ if user := strings.TrimSpace(config.User); user != "" {
+ options.SetUsername(user)
+ options.SetPassword(config.Password)
+ }
+ if transport == "ssl" || transport == "wss" {
+ options.SetTLSConfig(tlsConfig)
+ }
+ for _, broker := range brokers {
+ options.AddBroker(fmt.Sprintf("%s://%s", transport, broker))
+ }
+ if config.UseProxy {
+ options.SetCustomOpenConnectionFn(mqttProxyOpenConnectionFn(config.Proxy, timeout, tlsConfig))
+ }
+
+ client := pahomqtt.NewClient(options)
+ token := client.Connect()
+ if !token.WaitTimeout(timeout + 5*time.Second) {
+ return nil, fmt.Errorf("MQTT 连接超时")
+ }
+ if err := token.Error(); err != nil {
+ return nil, err
+ }
+ return &pahoMQTTRuntime{
+ client: client,
+ timeout: timeout,
+ }, nil
+}
+
+func mqttProxyOpenConnectionFn(proxyConfig connection.ProxyConfig, timeout time.Duration, tlsConfig *tls.Config) func(uri *url.URL, options pahomqtt.ClientOptions) (net.Conn, error) {
+ return func(uri *url.URL, options pahomqtt.ClientOptions) (net.Conn, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ conn, err := proxytunnel.DialContext(ctx, proxyConfig, "tcp", uri.Host)
+ if err != nil {
+ return nil, err
+ }
+ if uri.Scheme != "ssl" && uri.Scheme != "wss" {
+ return conn, nil
+ }
+
+ effectiveTLS := tlsConfig
+ if effectiveTLS == nil {
+ effectiveTLS = options.TLSConfig
+ }
+ if effectiveTLS == nil {
+ effectiveTLS = &tls.Config{}
+ }
+ cloned := effectiveTLS.Clone()
+ if cloned.ServerName == "" {
+ host, _, splitErr := net.SplitHostPort(uri.Host)
+ if splitErr == nil {
+ cloned.ServerName = host
+ } else {
+ cloned.ServerName = uri.Host
+ }
+ }
+
+ tlsConn := tls.Client(conn, cloned)
+ if err := tlsConn.HandshakeContext(ctx); err != nil {
+ _ = conn.Close()
+ return nil, err
+ }
+ return tlsConn, nil
+ }
+}
+
+func (r *pahoMQTTRuntime) Close() error {
+ if r == nil || r.client == nil {
+ return nil
+ }
+ r.client.Disconnect(250)
+ r.client = nil
+ return nil
+}
+
+func (r *pahoMQTTRuntime) Ping(ctx context.Context) error {
+ if r == nil || r.client == nil {
+ return fmt.Errorf("连接未打开")
+ }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+ if !r.client.IsConnectionOpen() {
+ return fmt.Errorf("MQTT 连接已断开")
+ }
+ return nil
+}
+
+func (r *pahoMQTTRuntime) FetchMessages(ctx context.Context, request mqttFetchRequest) ([]mqttMessageRecord, error) {
+ if r == nil || r.client == nil {
+ return nil, fmt.Errorf("连接未打开")
+ }
+ if !r.client.IsConnectionOpen() {
+ return nil, fmt.Errorf("MQTT 连接已断开")
+ }
+
+ limit := request.Limit
+ if limit <= 0 {
+ limit = defaultMQTTPreviewLimit
+ }
+ offset := request.Offset
+ if offset < 0 {
+ offset = 0
+ }
+ wait := request.Wait
+ if wait <= 0 {
+ wait = defaultMQTTFetchWait
+ }
+ if wait > maxMQTTFetchWait {
+ wait = maxMQTTFetchWait
+ }
+
+ bufferSize := limit + offset + 8
+ if bufferSize < 8 {
+ bufferSize = 8
+ }
+ if bufferSize > 1024 {
+ bufferSize = 1024
+ }
+ messageCh := make(chan mqttMessageRecord, bufferSize)
+ callback := func(_ pahomqtt.Client, msg pahomqtt.Message) {
+ record := mqttRecordFromMessage(msg)
+ select {
+ case messageCh <- record:
+ default:
+ }
+ }
+
+ token := r.client.Subscribe(request.Topic, request.QoS, callback)
+ if !token.WaitTimeout(r.timeout) {
+ return nil, fmt.Errorf("MQTT 订阅超时")
+ }
+ if err := token.Error(); err != nil {
+ return nil, fmt.Errorf("MQTT 订阅失败:%w", err)
+ }
+ defer func() {
+ unsub := r.client.Unsubscribe(request.Topic)
+ if !unsub.WaitTimeout(r.timeout) {
+ logger.Warnf("MQTT 取消订阅超时:%s", request.Topic)
+ return
+ }
+ if err := unsub.Error(); err != nil {
+ logger.Warnf("MQTT 取消订阅失败:topic=%s err=%v", request.Topic, err)
+ }
+ }()
+
+ timer := time.NewTimer(wait)
+ defer timer.Stop()
+
+ result := make([]mqttMessageRecord, 0, limit)
+ for len(result) < limit {
+ select {
+ case <-ctx.Done():
+ if len(result) > 0 {
+ return result, nil
+ }
+ return nil, ctx.Err()
+ case <-timer.C:
+ return result, nil
+ case record := <-messageCh:
+ if offset > 0 {
+ offset--
+ continue
+ }
+ result = append(result, record)
+ }
+ }
+ return result, nil
+}
+
+func (r *pahoMQTTRuntime) Publish(ctx context.Context, command mqttPublishCommand) (int64, error) {
+ if r == nil || r.client == nil {
+ return 0, fmt.Errorf("连接未打开")
+ }
+ if !r.client.IsConnectionOpen() {
+ return 0, fmt.Errorf("MQTT 连接已断开")
+ }
+ payload, err := mqttEncodePayload(command.Payload)
+ if err != nil {
+ return 0, err
+ }
+ token := r.client.Publish(command.Topic, command.QoS, command.Retain, payload)
+ wait := r.timeout
+ if deadline, ok := ctx.Deadline(); ok {
+ if remaining := time.Until(deadline); remaining > 0 && remaining < wait {
+ wait = remaining
+ }
+ }
+ if !token.WaitTimeout(wait) {
+ return 0, fmt.Errorf("MQTT 发布超时")
+ }
+ if err := token.Error(); err != nil {
+ return 0, err
+ }
+ return 1, nil
+}
+
+func mqttEncodePayload(payload interface{}) ([]byte, error) {
+ switch typed := payload.(type) {
+ case nil:
+ return []byte{}, nil
+ case []byte:
+ return typed, nil
+ case string:
+ return []byte(typed), nil
+ default:
+ return json.Marshal(typed)
+ }
+}
+
+func mqttRecordFromMessage(message pahomqtt.Message) mqttMessageRecord {
+ decoded, encoding := mqttDecodePayload(message.Payload())
+ return mqttMessageRecord{
+ Topic: message.Topic(),
+ QoS: message.Qos(),
+ Retained: message.Retained(),
+ Duplicate: message.Duplicate(),
+ MessageID: message.MessageID(),
+ Payload: append([]byte(nil), message.Payload()...),
+ Decoded: decoded,
+ Encoding: encoding,
+ ReceivedAt: time.Now(),
+ }
+}
+
+func mqttDecodePayload(payload []byte) (interface{}, string) {
+ if payload == nil {
+ return nil, "text"
+ }
+ var decoded interface{}
+ if err := decodeJSONWithUseNumber(payload, &decoded); err == nil {
+ return decoded, "json"
+ }
+ if utf8.Valid(payload) {
+ return string(payload), "text"
+ }
+ return base64.StdEncoding.EncodeToString(payload), "base64"
+}
+
+type mqttParsedSQL struct {
+ Action string
+ Topic string
+ Limit int
+ Offset int
+ Count bool
+}
+
+var (
+ mqttSQLFromRE = regexp.MustCompile(`(?i)\bFROM\s+(?:"([^"]+)"|` + "`" + `([^` + "`" + `]+)` + "`" + `|([^\s;]+))`)
+ mqttSQLLimitRE = regexp.MustCompile(`(?i)\bLIMIT\s+(\d+)`)
+ mqttSQLOffsetRE = regexp.MustCompile(`(?i)\bOFFSET\s+(\d+)`)
+ mqttShowTopicsRE = regexp.MustCompile(`(?i)^\s*SHOW\s+TOPICS(?:\s+LIMIT\s+(\d+))?\s*;?\s*$`)
+ mqttDescribeTopicRE = regexp.MustCompile(`(?i)^\s*(?:SHOW|DESCRIBE)\s+TOPIC\s+(?:"([^"]+)"|` + "`" + `([^` + "`" + `]+)` + "`" + `|([^\s;]+))\s*;?\s*$`)
+ mqttConsumeTopicRE = regexp.MustCompile(`(?i)^\s*CONSUME\s+FROM\s+(?:"([^"]+)"|` + "`" + `([^` + "`" + `]+)` + "`" + `|([^\s;]+))`)
+)
+
+func parseMQTTSQL(sqlText string) (mqttParsedSQL, bool) {
+ text := strings.TrimSpace(sqlText)
+ if text == "" {
+ return mqttParsedSQL{}, false
+ }
+ if matches := mqttShowTopicsRE.FindStringSubmatch(text); len(matches) > 0 {
+ parsed := mqttParsedSQL{Action: "show_topics"}
+ if len(matches) > 1 && strings.TrimSpace(matches[1]) != "" {
+ parsed.Limit, _ = strconv.Atoi(matches[1])
+ }
+ return parsed, true
+ }
+ if matches := mqttDescribeTopicRE.FindStringSubmatch(text); len(matches) > 0 {
+ return mqttParsedSQL{
+ Action: "describe_topic",
+ Topic: mqttTrimIdentifier(firstNonEmpty(matches[1], matches[2], matches[3])),
+ }, true
+ }
+ if matches := mqttConsumeTopicRE.FindStringSubmatch(text); len(matches) > 0 {
+ parsed := mqttParsedSQL{
+ Action: "consume",
+ Topic: mqttTrimIdentifier(firstNonEmpty(matches[1], matches[2], matches[3])),
+ Limit: defaultMQTTPreviewLimit,
+ }
+ if limitMatch := mqttSQLLimitRE.FindStringSubmatch(text); len(limitMatch) > 1 {
+ parsed.Limit, _ = strconv.Atoi(limitMatch[1])
+ }
+ if offsetMatch := mqttSQLOffsetRE.FindStringSubmatch(text); len(offsetMatch) > 1 {
+ parsed.Offset, _ = strconv.Atoi(offsetMatch[1])
+ }
+ return parsed, true
+ }
+ if !strings.HasPrefix(strings.ToLower(text), "select") {
+ return mqttParsedSQL{}, false
+ }
+ matches := mqttSQLFromRE.FindStringSubmatch(text)
+ if len(matches) == 0 {
+ return mqttParsedSQL{}, false
+ }
+ parsed := mqttParsedSQL{
+ Action: "select",
+ Topic: mqttTrimIdentifier(firstNonEmpty(matches[1], matches[2], matches[3])),
+ Limit: defaultMQTTPreviewLimit,
+ Count: strings.Contains(strings.ToLower(text), "count("),
+ }
+ if limitMatch := mqttSQLLimitRE.FindStringSubmatch(text); len(limitMatch) > 1 {
+ parsed.Limit, _ = strconv.Atoi(limitMatch[1])
+ }
+ if offsetMatch := mqttSQLOffsetRE.FindStringSubmatch(text); len(offsetMatch) > 1 {
+ parsed.Offset, _ = strconv.Atoi(offsetMatch[1])
+ }
+ return parsed, true
+}
+
+func mqttTrimIdentifier(value string) string {
+ return strings.TrimSuffix(strings.TrimSpace(value), ";")
+}
+
+func mqttResolveTopic(raw string, fallback string) string {
+ return strings.TrimSpace(firstNonEmpty(raw, fallback))
+}
+
+func mqttValidatePublishTopic(topic string) error {
+ text := strings.TrimSpace(topic)
+ if text == "" {
+ return fmt.Errorf("MQTT publish 命令缺少 topic")
+ }
+ if strings.ContainsAny(text, "#+") {
+ return fmt.Errorf("MQTT publish topic 不能包含通配符:%s", text)
+ }
+ return nil
+}
+
+func mqttQoSFromAny(value interface{}, fallback byte) (byte, error) {
+ if value == nil {
+ return fallback, nil
+ }
+ qosValue := intFromAny(value, int(fallback))
+ if qosValue < 0 || qosValue > 2 {
+ return 0, fmt.Errorf("MQTT QoS 仅支持 0、1、2")
+ }
+ return byte(qosValue), nil
+}
+
+func mqttBoolFromAny(value interface{}, fallback bool) bool {
+ if value == nil {
+ return fallback
+ }
+ switch typed := value.(type) {
+ case bool:
+ return typed
+ case string:
+ return mqttBoolValue(typed)
+ default:
+ return mqttBoolValue(fmt.Sprintf("%v", value))
+ }
+}
+
+func mqttBoolValue(value string) bool {
+ switch strings.ToLower(strings.TrimSpace(value)) {
+ case "1", "true", "yes", "on", "required":
+ return true
+ default:
+ return false
+ }
+}
+
+func mqttTopicRows(topics []mqttTopicDescriptor, defaultQoS byte, defaultRetain bool) []map[string]interface{} {
+ rows := make([]map[string]interface{}, 0, len(topics))
+ for _, topic := range topics {
+ rows = append(rows, map[string]interface{}{
+ "topic": topic.Filter,
+ "default": topic.Default,
+ "wildcard": topic.Wildcard,
+ "default_qos": int(defaultQoS),
+ "retain": defaultRetain,
+ "source": topic.Source,
+ })
+ }
+ return rows
+}
+
+func mqttDescribeTopicRow(topic string, topics []mqttTopicDescriptor, defaultQoS byte, defaultRetain bool, cleanSession bool, fetchWait time.Duration, brokers []string) map[string]interface{} {
+ configured := false
+ isDefault := false
+ wildcard := strings.ContainsAny(topic, "#+")
+ source := ""
+ for _, entry := range topics {
+ if entry.Filter == topic {
+ configured = true
+ isDefault = entry.Default
+ wildcard = entry.Wildcard
+ source = entry.Source
+ break
+ }
+ }
+ return map[string]interface{}{
+ "topic": topic,
+ "configured": configured,
+ "default": isDefault,
+ "wildcard": wildcard,
+ "source": source,
+ "default_qos": int(defaultQoS),
+ "default_retain": defaultRetain,
+ "clean_session": cleanSession,
+ "fetch_wait_ms": fetchWait.Milliseconds(),
+ "broker_count": len(brokers),
+ "brokers": append([]string(nil), brokers...),
+ }
+}
+
+func mqttMessageRows(records []mqttMessageRecord) []map[string]interface{} {
+ rows := make([]map[string]interface{}, 0, len(records))
+ for _, record := range records {
+ row := map[string]interface{}{
+ "topic": record.Topic,
+ "qos": int(record.QoS),
+ "retained": record.Retained,
+ "duplicate": record.Duplicate,
+ "message_id": int(record.MessageID),
+ "payload": record.Decoded,
+ "payload_encoding": record.Encoding,
+ "payload_bytes": len(record.Payload),
+ "received_at": record.ReceivedAt.Format(time.RFC3339Nano),
+ }
+ if payloadMap, ok := record.Decoded.(map[string]interface{}); ok {
+ flattenMQTTMap("payload", payloadMap, row)
+ }
+ rows = append(rows, row)
+ }
+ return rows
+}
+
+func flattenMQTTMap(prefix string, values map[string]interface{}, row map[string]interface{}) {
+ for key, value := range values {
+ if strings.TrimSpace(key) == "" {
+ continue
+ }
+ name := prefix + "." + key
+ row[name] = value
+ if nested, ok := value.(map[string]interface{}); ok {
+ flattenMQTTMap(name, nested, row)
+ }
+ }
+}
+
+func uniqueStringsPreserveOrder(values []string) []string {
+ seen := make(map[string]struct{}, len(values))
+ result := make([]string, 0, len(values))
+ for _, value := range values {
+ key := strings.TrimSpace(value)
+ if key == "" {
+ continue
+ }
+ if _, ok := seen[key]; ok {
+ continue
+ }
+ seen[key] = struct{}{}
+ result = append(result, key)
+ }
+ return result
+}
diff --git a/internal/db/mqtt_impl_test.go b/internal/db/mqtt_impl_test.go
new file mode 100644
index 0000000..158e456
--- /dev/null
+++ b/internal/db/mqtt_impl_test.go
@@ -0,0 +1,211 @@
+package db
+
+import (
+ "context"
+ "reflect"
+ "strings"
+ "testing"
+ "time"
+
+ "GoNavi-Wails/internal/connection"
+)
+
+func TestNormalizeMQTTConfigParsesURIAndParams(t *testing.T) {
+ config := normalizeMQTTConfig(connection.ConnectionConfig{
+ URI: "mqtt://user:secret@127.0.0.1:1883/devices%2F%2B%2Ftelemetry?topology=cluster&tls=true&skip_verify=true",
+ ConnectionParams: "topics=devices%2F%2B%2Ftelemetry,%24SYS%2F%23&qos=1&retain=false&cleanSession=false&fetchWaitMs=3500",
+ })
+
+ if config.Host != "127.0.0.1" || config.Port != 1883 {
+ t.Fatalf("unexpected mqtt host/port: %#v", config)
+ }
+ if config.User != "user" || config.Password != "secret" {
+ t.Fatalf("unexpected mqtt credentials: %#v", config)
+ }
+ if config.Database != "devices/+/telemetry" {
+ t.Fatalf("unexpected mqtt default topic: %q", config.Database)
+ }
+ if !config.UseSSL || config.SSLMode != "skip-verify" {
+ t.Fatalf("unexpected mqtt tls settings: %#v", config)
+ }
+ if config.Topology != "cluster" {
+ t.Fatalf("unexpected mqtt topology: %q", config.Topology)
+ }
+
+ params := mqttConnectionParams(config)
+ if params.Get("topics") != "devices/+/telemetry,$SYS/#" {
+ t.Fatalf("unexpected mqtt topics param: %#v", params)
+ }
+ if params.Get("qos") != "1" || params.Get("fetchWaitMs") != "3500" {
+ t.Fatalf("unexpected mqtt params: %#v", params)
+ }
+}
+
+func TestMQTTQueryExecAndColumns(t *testing.T) {
+ fakeRuntime := &fakeMQTTRuntime{
+ fetchResponses: map[string][]mqttMessageRecord{
+ "devices/+/telemetry": {
+ {
+ Topic: "devices/device-001/telemetry",
+ QoS: 1,
+ Retained: false,
+ Duplicate: false,
+ MessageID: 12,
+ Payload: []byte(`{"event":"created","meta":{"source":"sensor"}}`),
+ Decoded: map[string]interface{}{"event": "created", "meta": map[string]interface{}{"source": "sensor"}},
+ Encoding: "json",
+ ReceivedAt: time.Date(2026, 6, 14, 11, 0, 0, 0, time.UTC),
+ },
+ {
+ Topic: "devices/device-002/telemetry",
+ QoS: 1,
+ Retained: true,
+ Duplicate: false,
+ MessageID: 13,
+ Payload: []byte("plain-text"),
+ Decoded: "plain-text",
+ Encoding: "text",
+ ReceivedAt: time.Date(2026, 6, 14, 11, 0, 1, 0, time.UTC),
+ },
+ },
+ },
+ }
+
+ originalFactory := newMQTTRuntime
+ newMQTTRuntime = func(config connection.ConnectionConfig) (mqttRuntime, error) {
+ return fakeRuntime, nil
+ }
+ defer func() {
+ newMQTTRuntime = originalFactory
+ }()
+
+ client := &MQTTDB{}
+ if err := client.Connect(connection.ConnectionConfig{
+ Type: "mqtt",
+ Host: "127.0.0.1",
+ Port: 1883,
+ Database: "devices/+/telemetry",
+ ConnectionParams: "topics=devices%2F%2B%2Ftelemetry,%24SYS%2F%23&qos=1&fetchWaitMs=2500",
+ }); err != nil {
+ t.Fatalf("Connect failed: %v", err)
+ }
+ defer client.Close()
+
+ rows, columns, err := client.Query(`SHOW TOPICS LIMIT 2`)
+ if err != nil {
+ t.Fatalf("SHOW TOPICS failed: %v", err)
+ }
+ if len(rows) != 2 || rows[0]["topic"] != "devices/+/telemetry" {
+ t.Fatalf("unexpected mqtt topic rows: %#v", rows)
+ }
+ if !containsString(columns, "wildcard") {
+ t.Fatalf("expected wildcard column, got %v", columns)
+ }
+
+ rows, _, err = client.Query(`DESCRIBE TOPIC "devices/+/telemetry"`)
+ if err != nil {
+ t.Fatalf("DESCRIBE TOPIC failed: %v", err)
+ }
+ if len(rows) != 1 || rows[0]["configured"] != true || rows[0]["default_qos"] != 1 {
+ t.Fatalf("unexpected mqtt describe rows: %#v", rows)
+ }
+
+ rows, columns, err = client.Query(`SELECT * FROM "devices/+/telemetry" LIMIT 1 OFFSET 1`)
+ if err != nil {
+ t.Fatalf("SELECT topic failed: %v", err)
+ }
+ if len(fakeRuntime.fetchRequests) == 0 || fakeRuntime.fetchRequests[len(fakeRuntime.fetchRequests)-1].Offset != 1 {
+ t.Fatalf("expected mqtt fetch offset 1, got %#v", fakeRuntime.fetchRequests)
+ }
+ if len(rows) != 1 || rows[0]["payload"] != "plain-text" || rows[0]["payload_encoding"] != "text" {
+ t.Fatalf("unexpected mqtt message rows: %#v", rows)
+ }
+ if !containsString(columns, "payload_encoding") {
+ t.Fatalf("expected payload_encoding column, got %v", columns)
+ }
+
+ affected, err := client.Exec(`{"publish":"devices/device-001/telemetry","payload":{"id":1},"qos":2,"retain":true}`)
+ if err != nil {
+ t.Fatalf("mqtt publish failed: %v", err)
+ }
+ if affected != 1 {
+ t.Fatalf("unexpected affected rows: %d", affected)
+ }
+ if len(fakeRuntime.published) != 1 {
+ t.Fatalf("expected one mqtt publish call, got %#v", fakeRuntime.published)
+ }
+ if fakeRuntime.published[0].Topic != "devices/device-001/telemetry" || fakeRuntime.published[0].QoS != 2 || !fakeRuntime.published[0].Retain {
+ t.Fatalf("unexpected mqtt publish command: %#v", fakeRuntime.published[0])
+ }
+
+ columnDefs, err := client.GetColumns(mqttSyntheticDatabase, "devices/+/telemetry")
+ if err != nil {
+ t.Fatalf("GetColumns failed: %v", err)
+ }
+ names := make([]string, 0, len(columnDefs))
+ for _, col := range columnDefs {
+ names = append(names, col.Name)
+ }
+ joined := strings.Join(names, ",")
+ for _, want := range []string{"topic", "payload.meta.source", "payload_encoding"} {
+ if !strings.Contains(joined, want) {
+ t.Fatalf("expected mqtt column %q in %s", want, joined)
+ }
+ }
+
+ databases, err := client.GetDatabases()
+ if err != nil {
+ t.Fatalf("GetDatabases failed: %v", err)
+ }
+ if !reflect.DeepEqual(databases, []string{mqttSyntheticDatabase}) {
+ t.Fatalf("unexpected mqtt database list: %#v", databases)
+ }
+
+ tables, err := client.GetTables(mqttSyntheticDatabase)
+ if err != nil {
+ t.Fatalf("GetTables failed: %v", err)
+ }
+ if !reflect.DeepEqual(tables, []string{"$SYS/#", "devices/+/telemetry"}) {
+ t.Fatalf("unexpected mqtt topic list: %#v", tables)
+ }
+
+ if _, _, err := client.Query(`SELECT COUNT(*) FROM "devices/+/telemetry"`); err == nil || !strings.Contains(err.Error(), "COUNT(*)") {
+ t.Fatalf("expected COUNT(*) to be rejected, got %v", err)
+ }
+}
+
+type fakeMQTTRuntime struct {
+ fetchResponses map[string][]mqttMessageRecord
+ fetchRequests []mqttFetchRequest
+ published []mqttPublishCommand
+ closed bool
+}
+
+func (f *fakeMQTTRuntime) Close() error {
+ f.closed = true
+ return nil
+}
+
+func (f *fakeMQTTRuntime) Ping(ctx context.Context) error {
+ return nil
+}
+
+func (f *fakeMQTTRuntime) FetchMessages(ctx context.Context, request mqttFetchRequest) ([]mqttMessageRecord, error) {
+ f.fetchRequests = append(f.fetchRequests, request)
+ items := append([]mqttMessageRecord(nil), f.fetchResponses[request.Topic]...)
+ if request.Offset > 0 {
+ if request.Offset >= len(items) {
+ return []mqttMessageRecord{}, nil
+ }
+ items = items[request.Offset:]
+ }
+ if request.Limit > 0 && len(items) > request.Limit {
+ items = items[:request.Limit]
+ }
+ return items, nil
+}
+
+func (f *fakeMQTTRuntime) Publish(ctx context.Context, command mqttPublishCommand) (int64, error) {
+ f.published = append(f.published, command)
+ return 1, nil
+}