diff --git a/frontend/src/components/ConnectionModal.edit-password.test.tsx b/frontend/src/components/ConnectionModal.edit-password.test.tsx
index d97172c..429b214 100644
--- a/frontend/src/components/ConnectionModal.edit-password.test.tsx
+++ b/frontend/src/components/ConnectionModal.edit-password.test.tsx
@@ -33,10 +33,10 @@ describe('ConnectionModal data source registry', () => {
expect(source).toContain('type === "elasticsearch"');
expect(source).toContain("return '支持索引浏览、Mapping 检查、JSON DSL 和 query_string 查询';");
expect(source).toContain(
- 'type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka") ? "" : "root";',
+ 'type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka" || type === "rabbitmq") ? "" : "root";',
);
expect(source).toContain(
- 'placeholder={(dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka") ? "未开启认证可留空" : undefined}',
+ 'placeholder={(dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka" || dbType === "rabbitmq") ? "未开启认证可留空" : undefined}',
);
expect(source).toContain('label="显示数据库 (留空显示全部)"');
});
@@ -89,6 +89,19 @@ describe('ConnectionModal data source registry', () => {
expect(source).toContain('label="默认 Topic(可选)"');
});
+ it('exposes RabbitMQ in the create-connection picker with management-api and vhost defaults', () => {
+ expect(source).toContain("case 'rabbitmq':");
+ expect(source).toContain('return 15672;');
+ expect(source).toContain('rabbitmq: ["rabbitmq", "http", "https"]');
+ expect(source).toContain("key: 'rabbitmq'");
+ expect(source).toContain("name: 'RabbitMQ'");
+ expect(source).toContain('dbType === "rabbitmq"');
+ expect(source).toContain("return 'Management API / Virtual Host / Queue';");
+ expect(source).toContain('return "rabbitmq://guest:guest@127.0.0.1:15672/%2F?defaultQueue=orders.queue&exchange=events.topic&timeout=30";');
+ expect(source).toContain('return "defaultQueue=orders.queue&exchange=events.topic&managementPathPrefix=/rabbitmq";');
+ expect(source).toContain('label="默认 Virtual Host(可选)"');
+ });
+
it('exposes GaussDB in the create-connection picker with PostgreSQL-family defaults', () => {
expect(source).toContain("case 'gaussdb':");
expect(source).toContain('return 5432;');
diff --git a/frontend/src/components/ConnectionModal.tsx b/frontend/src/components/ConnectionModal.tsx
index f9f4f1c..538a2d9 100644
--- a/frontend/src/components/ConnectionModal.tsx
+++ b/frontend/src/components/ConnectionModal.tsx
@@ -420,6 +420,7 @@ const ConnectionModal: React.FC<{
const isOceanBaseOracle = dbType === "oceanbase" && oceanBaseProtocol === "oracle";
const isMySQLLike = isMySQLCompatibleType(dbType) && !isOceanBaseOracle;
const isKafka = dbType === "kafka";
+ const isRabbitMQ = dbType === "rabbitmq";
const supportsConnectionParams = supportsConnectionParamsForType(dbType);
const isSSLType = supportsSSLForType(dbType);
const supportsSSLCAPath = supportsSSLCAPathForType(dbType);
@@ -1690,6 +1691,46 @@ const ConnectionModal: React.FC<{
};
}
+ if (type === "rabbitmq") {
+ const defaultPort = getDefaultPortByType(type);
+ const parsed = parseSingleHostUri(
+ trimmedUri,
+ ["rabbitmq", "http", "https"],
+ defaultPort,
+ );
+ if (!parsed) {
+ return null;
+ }
+ const lowerUri = trimmedUri.toLowerCase();
+ const tlsEnabled =
+ lowerUri.startsWith("https://") ||
+ normalizeUriBool(
+ parsed.params.get("tls") ||
+ parsed.params.get("ssl") ||
+ parsed.params.get("useSSL") ||
+ parsed.params.get("use_ssl"),
+ );
+ const skipVerify = normalizeUriBool(
+ parsed.params.get("skip_verify") || parsed.params.get("skipVerify"),
+ );
+ const timeoutValue = Number(parsed.params.get("timeout"));
+ return {
+ host: parsed.host,
+ port: parsed.port,
+ user: parsed.username,
+ password: parsed.password,
+ database: parsed.database || "",
+ useSSL: tlsEnabled,
+ sslMode: tlsEnabled ? (skipVerify ? "skip-verify" : "required") : "disable",
+ ...extractSSLPathValuesFromParams(parsed.params, type),
+ connectionParams: serializeConnectionParams(parsed.params),
+ timeout:
+ Number.isFinite(timeoutValue) && timeoutValue > 0
+ ? Math.min(MAX_TIMEOUT_SECONDS, Math.trunc(timeoutValue))
+ : undefined,
+ };
+ }
+
if (type === "clickhouse") {
const httpValues = parseClickHouseHTTPUriToValues(trimmedUri);
if (httpValues) {
@@ -1941,6 +1982,9 @@ const ConnectionModal: React.FC<{
if (dbType === "kafka") {
return "kafka://user:pass@127.0.0.1:9092,127.0.0.2:9092/orders.events?topology=cluster&groupId=analytics&mechanism=scram-sha-256";
}
+ if (dbType === "rabbitmq") {
+ return "rabbitmq://guest:guest@127.0.0.1:15672/%2F?defaultQueue=orders.queue&exchange=events.topic&timeout=30";
+ }
if (dbType === "redis") {
return "redis://:pass@127.0.0.1:6379,127.0.0.2:6379/0?topology=cluster 或 redis://:pass@10.0.0.1:26379,10.0.0.2:26379/0?topology=sentinel&master=mymaster";
}
@@ -1998,6 +2042,8 @@ const ConnectionModal: React.FC<{
return "fetchSize=1024&timeZone=Asia%2FShanghai";
case "kafka":
return "groupId=gonavi&mechanism=scram-sha-256&clientId=gonavi-desktop&startOffset=latest";
+ case "rabbitmq":
+ return "defaultQueue=orders.queue&exchange=events.topic&managementPathPrefix=/rabbitmq";
default:
return "key=value&another=value";
}
@@ -2090,6 +2136,28 @@ const ConnectionModal: React.FC<{
return `kafka://${encodedAuth}${allBrokers.join(",")}${topicPath}${query ? `?${query}` : ""}`;
}
+ if (type === "rabbitmq") {
+ const address = toAddress(host, port, defaultPort);
+ const params = new URLSearchParams();
+ if (values.useSSL) {
+ const mode = String(values.sslMode || "preferred")
+ .trim()
+ .toLowerCase();
+ params.set("tls", "true");
+ if (mode === "skip-verify" || mode === "preferred") {
+ params.set("skip_verify", "true");
+ }
+ appendSSLPathParamsForUri(params, type, values);
+ }
+ if (Number.isFinite(timeout) && timeout > 0) {
+ params.set("timeout", String(timeout));
+ }
+ mergeConnectionParams(params, values.connectionParams);
+ const vhostPath = database ? `/${encodeURIComponent(database)}` : "";
+ const query = params.toString();
+ return `rabbitmq://${encodedAuth}${address}${vhostPath}${query ? `?${query}` : ""}`;
+ }
+
if (type === "redis") {
return buildRedisUriFromValues(values);
}
@@ -3847,7 +3915,7 @@ const ConnectionModal: React.FC<{
});
} else if (type !== "custom") {
const defaultUser =
- type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka") ? "" : "root";
+ type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka" || type === "rabbitmq") ? "" : "root";
const sslCapableType = supportsSSLForType(type);
setUseSSL(false);
setUseHttpTunnel(false);
@@ -4993,6 +5061,22 @@ const ConnectionModal: React.FC<{
),
})}
+ {dbType === "rabbitmq" &&
+ renderConfigSectionCard({
+ sectionKey: "service",
+ icon: ,
+ children: (
+
+
+
+ ),
+ })}
+
{(dbType === "oracle" || isOceanBaseOracle) &&
renderConfigSectionCard({
sectionKey: "service",
@@ -5204,13 +5288,13 @@ const ConnectionModal: React.FC<{
name="user"
label="用户名"
rules={
- (dbType === "mongodb" || dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka")
+ (dbType === "mongodb" || dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka" || dbType === "rabbitmq")
? []
: [createUriAwareRequiredRule("请输入用户名")]
}
style={{ marginBottom: 0 }}
>
-
+
{
expect(markup).toContain('>Kf');
});
+ it('includes RabbitMQ in the selectable database icons', () => {
+ expect(DB_ICON_TYPES).toContain('rabbitmq');
+ expect(getDbIconLabel('rabbitmq')).toBe('RabbitMQ');
+ const markup = renderToStaticMarkup(<>{getDbIcon('rabbitmq', undefined, 22)}>);
+ expect(markup).toContain('>RM');
+ });
+
it('includes GaussDB in the selectable database icons', () => {
expect(DB_ICON_TYPES).toContain('gaussdb');
expect(getDbIconLabel('gaussdb')).toBe('GaussDB');
diff --git a/frontend/src/components/DatabaseIcons.tsx b/frontend/src/components/DatabaseIcons.tsx
index bc14bce..2cb472c 100644
--- a/frontend/src/components/DatabaseIcons.tsx
+++ b/frontend/src/components/DatabaseIcons.tsx
@@ -53,6 +53,7 @@ const DB_DEFAULT_COLORS: Record = {
tdengine: '#2962FF',
iotdb: '#0F766E',
kafka: '#F97316',
+ rabbitmq: '#FF6B35',
chroma: '#7C3AED',
qdrant: '#DC244C',
diros: '#0050B3',
@@ -196,6 +197,9 @@ const IoTDBIcon: React.FC = ({ size = 16, color }) => (
const KafkaIcon: React.FC = ({ size = 16, color }) => (
);
+const RabbitMQIcon: React.FC = ({ size = 16, color }) => (
+
+);
const ChromaIcon: React.FC = ({ size = 16, color }) => (
);
@@ -259,6 +263,7 @@ const DB_ICON_MAP: Record> = {
tdengine: TDengineIcon,
iotdb: IoTDBIcon,
kafka: KafkaIcon,
+ rabbitmq: RabbitMQIcon,
chroma: ChromaIcon,
qdrant: QdrantIcon,
elasticsearch: ElasticsearchIcon,
@@ -269,7 +274,7 @@ const DB_ICON_MAP: Record> = {
export const DB_ICON_TYPES: string[] = [
'mysql', 'mariadb', 'oceanbase', 'postgres', 'redis', 'mongodb', 'jvm',
'oracle', 'sqlserver', 'sqlite', 'duckdb', 'clickhouse', 'starrocks',
- 'kingbase', 'dameng', 'vastbase', 'opengauss', 'gaussdb', 'goldendb', 'highgo', 'iris', 'tdengine', 'iotdb', 'kafka', 'chroma', 'qdrant', 'elasticsearch', 'custom',
+ 'kingbase', 'dameng', 'vastbase', 'opengauss', 'gaussdb', 'goldendb', 'highgo', 'iris', 'tdengine', 'iotdb', 'kafka', 'rabbitmq', 'chroma', 'qdrant', 'elasticsearch', 'custom',
];
/** 该类型是否有品牌 SVG 文件 */
@@ -291,7 +296,7 @@ export const getDbIconLabel = (type: string): string => {
sqlserver: 'SQL Server', clickhouse: 'ClickHouse', sqlite: 'SQLite',
starrocks: 'StarRocks',
duckdb: 'DuckDB', kingbase: '金仓', dameng: '达梦',
- vastbase: 'VastBase', opengauss: 'OpenGauss', gaussdb: 'GaussDB', goldendb: 'GoldenDB', highgo: '瀚高', iris: 'InterSystems IRIS', tdengine: 'TDengine', iotdb: 'Apache IoTDB', kafka: 'Kafka',
+ vastbase: 'VastBase', opengauss: 'OpenGauss', gaussdb: 'GaussDB', goldendb: 'GoldenDB', highgo: '瀚高', iris: 'InterSystems IRIS', tdengine: 'TDengine', iotdb: 'Apache IoTDB', kafka: 'Kafka', rabbitmq: 'RabbitMQ',
chroma: 'Chroma',
qdrant: 'Qdrant',
elasticsearch: 'Elasticsearch',
diff --git a/frontend/src/components/MessagePublishModal.tsx b/frontend/src/components/MessagePublishModal.tsx
new file mode 100644
index 0000000..a5e2504
--- /dev/null
+++ b/frontend/src/components/MessagePublishModal.tsx
@@ -0,0 +1,216 @@
+import React, { useEffect, useMemo, useState } from 'react';
+import { Alert, Form, Input, Modal, Select, Space, Typography, message } from 'antd';
+
+import { DBQuery } from '../../wailsjs/go/app/App';
+import type { SavedConnection } from '../types';
+import { buildRpcConnectionConfig } from '../utils/connectionRpcConfig';
+import {
+ buildMessagePublishCommand,
+ createDefaultMessagePublishDraft,
+ getMessagePublishPresentation,
+ type MessagePublishDraft,
+} from '../utils/messagePublish';
+
+const { Text } = Typography;
+const { TextArea } = Input;
+
+export type MessagePublishModalProps = {
+ open: boolean;
+ connection: SavedConnection | null;
+ executionDbName?: string;
+ defaultDestination?: string;
+ onCancel: () => void;
+ onSuccess?: (result: { destination: string; affectedRows: number; commandText: string }) => void;
+};
+
+const MessagePublishModal: React.FC = ({
+ open,
+ connection,
+ executionDbName = '',
+ defaultDestination = '',
+ onCancel,
+ onSuccess,
+}) => {
+ const [form] = Form.useForm();
+ const [submitting, setSubmitting] = useState(false);
+ const presentation = useMemo(
+ () => getMessagePublishPresentation(connection?.config),
+ [connection],
+ );
+
+ useEffect(() => {
+ if (!open || !connection) return;
+ form.setFieldsValue(
+ createDefaultMessagePublishDraft(
+ connection.config,
+ defaultDestination,
+ ),
+ );
+ }, [connection, defaultDestination, form, open]);
+
+ useEffect(() => {
+ if (open) return;
+ form.resetFields();
+ setSubmitting(false);
+ }, [form, open]);
+
+ const handleSubmit = async () => {
+ if (!connection) return;
+
+ let values: MessagePublishDraft;
+ try {
+ values = await form.validateFields();
+ } catch {
+ return;
+ }
+
+ let command;
+ try {
+ command = buildMessagePublishCommand(connection.config, values);
+ } catch (error: any) {
+ void message.error(error?.message || '构造发送命令失败');
+ return;
+ }
+
+ setSubmitting(true);
+ try {
+ const res = await DBQuery(
+ buildRpcConnectionConfig(connection.config) as any,
+ executionDbName,
+ command.commandText,
+ );
+ if (!res?.success) {
+ void message.error(`发送失败: ${res?.message || '未知错误'}`);
+ return;
+ }
+
+ const affectedRows = Number((res.data as any)?.affectedRows);
+ onSuccess?.({
+ destination: command.destinationLabel,
+ affectedRows: Number.isFinite(affectedRows) ? affectedRows : 0,
+ commandText: command.commandText,
+ });
+ } catch (error: any) {
+ void message.error(`发送失败: ${error?.message || String(error)}`);
+ } finally {
+ setSubmitting(false);
+ }
+ };
+
+ return (
+ { void handleSubmit(); }}
+ okText="发送"
+ confirmLoading={submitting}
+ width={720}
+ destroyOnHidden
+ maskClosable={!submitting}
+ >
+
+
+
+
+
+
+
+ {presentation.showExchange && (
+
+
+
+ )}
+
+ {presentation.showRoutingKey && (
+
+
+
+ )}
+
+ {presentation.showKey && (
+
+
+
+
+
+
+
+
+
+
+ )}
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {presentation.showProperties && (
+
+
+
+ )}
+
+
+
+ {presentation.successHint} 发送成功后会返回 affectedRows,用于确认本次测试消息是否已提交。
+
+
+
+ );
+};
+
+export default MessagePublishModal;
diff --git a/frontend/src/components/Sidebar.message-publish.test.tsx b/frontend/src/components/Sidebar.message-publish.test.tsx
new file mode 100644
index 0000000..25595ff
--- /dev/null
+++ b/frontend/src/components/Sidebar.message-publish.test.tsx
@@ -0,0 +1,23 @@
+import { describe, expect, it } from 'vitest';
+import { readFileSync } from 'node:fs';
+
+const sidebarSource = readFileSync(new URL('./Sidebar.tsx', import.meta.url), 'utf8');
+const contextMenuSource = readFileSync(new URL('./V2TableContextMenu.tsx', import.meta.url), 'utf8');
+const modalSource = readFileSync(new URL('./MessagePublishModal.tsx', import.meta.url), 'utf8');
+
+describe('Sidebar Kafka publish entry', () => {
+ it('adds a Kafka topic publish action in both legacy and v2 table menus', () => {
+ expect(sidebarSource).toContain("key: 'publish-message'");
+ expect(sidebarSource).toContain("label: '测试发送消息'");
+ expect(sidebarSource).toContain('openMessagePublishModal(node)');
+ expect(contextMenuSource).toContain("| 'publish-message'");
+ expect(contextMenuSource).toContain("title: '测试发送消息'");
+ });
+
+ it('renders the dedicated message publish modal and executes DBQuery through the encoder', () => {
+ expect(sidebarSource).toContain('(null);
+ const [messagePublishTarget, setMessagePublishTarget] = useState(null);
const [isRenameViewModalOpen, setIsRenameViewModalOpen] = useState(false);
const [renameViewForm] = Form.useForm();
const [renameViewTarget, setRenameViewTarget] = useState(null);
@@ -4719,6 +4727,37 @@ const Sidebar: React.FC<{
});
};
+ const resolveMessagePublishTarget = (node: any): SidebarMessagePublishTarget | null => {
+ const connectionId = String(node?.dataRef?.id || '').trim();
+ const liveConnection = connections.find((item) => item.id === connectionId);
+ const sourceConnection = (liveConnection || node?.dataRef) as SavedConnection | undefined;
+ if (!sourceConnection?.config) return null;
+ const capabilities = getDataSourceCapabilities(sourceConnection.config);
+ if (!capabilities.supportsMessagePublish) return null;
+
+ return {
+ connection: sourceConnection,
+ executionDbName: String(node?.dataRef?.dbName || ''),
+ destination: String(node?.dataRef?.tableName || node?.title || '').trim(),
+ };
+ };
+
+ const openMessagePublishModal = (node: any) => {
+ const target = resolveMessagePublishTarget(node);
+ if (!target) {
+ message.warning('当前对象不支持测试发送消息');
+ return;
+ }
+ setMessagePublishTarget(target);
+ };
+
+ const handleMessagePublishSuccess = (result: { destination: string; affectedRows: number }) => {
+ const destination = String(result.destination || '').trim();
+ const suffix = result.affectedRows > 0 ? `(已提交 ${result.affectedRows} 条)` : '';
+ message.success(`测试消息已发送到 ${destination || '目标'}${suffix}`);
+ setMessagePublishTarget(null);
+ };
+
const handleV2TableContextMenuAction = (node: any, action: V2TableContextMenuActionKey) => {
switch (action) {
case 'pin-table':
@@ -4746,6 +4785,9 @@ const Sidebar: React.FC<{
});
return;
}
+ case 'publish-message':
+ openMessagePublishModal(node);
+ return;
case 'view-ddl':
openTableDdlInDesigner(node);
return;
@@ -5873,6 +5915,7 @@ const Sidebar: React.FC<{
const statsKey = getV2TableContextMenuStatsKey(node);
const stats = v2TableContextMenuStats[statsKey];
const isStarRocks = getMetadataDialect(node.dataRef as SavedConnection) === 'starrocks';
+ const supportsMessagePublish = Boolean(resolveMessagePublishTarget(node));
const isPinned = isSidebarTablePinned(
pinnedSidebarTables,
String(node?.dataRef?.id || ''),
@@ -5888,6 +5931,7 @@ const Sidebar: React.FC<{
isPinned={isPinned}
supportsTruncate={supportsTableTruncateAction(node.dataRef?.config?.type, node.dataRef?.config?.driver)}
supportsStarRocksRollup={isStarRocks}
+ supportsMessagePublish={supportsMessagePublish}
onAction={(action) => {
setContextMenu(null);
handleV2TableContextMenuAction(node, action);
@@ -7109,6 +7153,7 @@ const Sidebar: React.FC<{
];
} else if (node.type === 'table') {
const isStarRocks = getMetadataDialect(node.dataRef as SavedConnection) === 'starrocks';
+ const messagePublishTarget = resolveMessagePublishTarget(node);
return [
{
key: 'new-query',
@@ -7127,6 +7172,12 @@ const Sidebar: React.FC<{
});
}
},
+ ...(messagePublishTarget ? [{
+ key: 'publish-message',
+ label: '测试发送消息',
+ icon: ,
+ onClick: () => openMessagePublishModal(node),
+ }] : []),
{ type: 'divider' },
{
key: 'design-table',
@@ -8928,6 +8979,14 @@ const Sidebar: React.FC<{
connectionId={findInDbContext.connectionId}
dbName={findInDbContext.dbName}
/>
+ setMessagePublishTarget(null)}
+ onSuccess={handleMessagePublishSuccess}
+ />
);
});
diff --git a/frontend/src/components/V2TableContextMenu.tsx b/frontend/src/components/V2TableContextMenu.tsx
index 95d7113..38747f9 100644
--- a/frontend/src/components/V2TableContextMenu.tsx
+++ b/frontend/src/components/V2TableContextMenu.tsx
@@ -11,6 +11,7 @@ import {
FolderOpenOutlined,
FolderOutlined,
SaveOutlined,
+ SendOutlined,
LinkOutlined,
ReloadOutlined,
TableOutlined,
@@ -39,6 +40,7 @@ export type V2TableContextMenuActionKey =
| 'design-table'
| 'open-new-tab'
| 'new-query'
+ | 'publish-message'
| 'view-ddl'
| 'view-er'
| 'copy-table-name'
@@ -159,6 +161,7 @@ export const V2TableContextMenuView: React.FC<{
isPinned?: boolean;
supportsTruncate?: boolean;
supportsStarRocksRollup?: boolean;
+ supportsMessagePublish?: boolean;
onAction?: (action: V2TableContextMenuActionKey) => void;
}> = ({
tableName,
@@ -167,6 +170,7 @@ export const V2TableContextMenuView: React.FC<{
isPinned = false,
supportsTruncate = true,
supportsStarRocksRollup = false,
+ supportsMessagePublish = false,
onAction,
}) => {
const renderItems = (items: V2TableContextMenuItemConfig[]) => renderV2ContextMenuItems(
@@ -202,6 +206,7 @@ export const V2TableContextMenuView: React.FC<{
{ action: 'design-table', icon: , title: '设计表 · 字段 / 索引 / 外键', kbd: primaryShortcut('D', shortcutPlatform) },
{ action: 'open-new-tab', icon: , title: '在新标签打开', kbd: primaryShortcut('Enter', shortcutPlatform) },
{ action: 'new-query', icon: , title: '新建查询' },
+ ...(supportsMessagePublish ? [{ action: 'publish-message' as const, icon: , title: '测试发送消息' }] : []),
])}
元信息
diff --git a/frontend/src/components/ai/aiConnectionCapabilitiesInsights.test.ts b/frontend/src/components/ai/aiConnectionCapabilitiesInsights.test.ts
index 1e7a65f..f532b9f 100644
--- a/frontend/src/components/ai/aiConnectionCapabilitiesInsights.test.ts
+++ b/frontend/src/components/ai/aiConnectionCapabilitiesInsights.test.ts
@@ -79,4 +79,31 @@ describe('aiConnectionCapabilitiesInsights', () => {
expect(snapshot.restrictions).toContain('create_database_hidden');
expect(snapshot.restrictions).toContain('drop_database_hidden');
});
+
+ it('includes publish_message when the datasource exposes message publish capability', () => {
+ const connections: SavedConnection[] = [{
+ id: 'conn-kafka',
+ name: '订单事件总线',
+ config: {
+ type: 'kafka',
+ host: '127.0.0.1',
+ port: 9092,
+ user: '',
+ database: 'orders.events',
+ },
+ }];
+
+ const snapshot = buildConnectionCapabilitiesSnapshot({
+ connectionId: 'conn-kafka',
+ connections,
+ });
+
+ if (!snapshot.hasConnection || !snapshot.capabilities) {
+ throw new Error('expected kafka connection snapshot');
+ }
+
+ expect(snapshot.capabilities.supportsMessagePublish).toBe(true);
+ expect(snapshot.supportedActions).toContain('publish_message');
+ expect(snapshot.uiHints.join(' ')).toContain('测试发送消息入口');
+ });
});
diff --git a/frontend/src/components/ai/aiConnectionCapabilitiesInsights.ts b/frontend/src/components/ai/aiConnectionCapabilitiesInsights.ts
index fc0fefc..8c68fdc 100644
--- a/frontend/src/components/ai/aiConnectionCapabilitiesInsights.ts
+++ b/frontend/src/components/ai/aiConnectionCapabilitiesInsights.ts
@@ -53,6 +53,7 @@ export const buildConnectionCapabilitiesSnapshot = (params: {
capabilities.supportsCreateDatabase ? 'create_database' : '',
capabilities.supportsRenameDatabase ? 'rename_database' : '',
capabilities.supportsDropDatabase ? 'drop_database' : '',
+ capabilities.supportsMessagePublish ? 'publish_message' : '',
capabilities.supportsApproximateTableCount ? 'approximate_table_count' : '',
capabilities.supportsApproximateTotalPages ? 'approximate_total_pages' : '',
].filter(Boolean);
@@ -75,6 +76,9 @@ export const buildConnectionCapabilitiesSnapshot = (params: {
capabilities.supportsApproximateTableCount
? '表浏览场景允许显示近似行数,减少大表统计开销。'
: '表浏览场景默认不使用近似行数。',
+ capabilities.supportsMessagePublish
+ ? '当前数据源提供测试发送消息入口,适合做 Topic/Queue 的联调验证。'
+ : '当前数据源未暴露测试发送消息入口。',
];
return {
diff --git a/frontend/src/utils/connectionDriverType.ts b/frontend/src/utils/connectionDriverType.ts
index b53ef8f..744ee9e 100644
--- a/frontend/src/utils/connectionDriverType.ts
+++ b/frontend/src/utils/connectionDriverType.ts
@@ -19,6 +19,7 @@ export const normalizeDriverType = (value: string): string => {
if (normalized === 'qdrantdb' || normalized === 'qdrant-db') return 'qdrant';
if (normalized === 'apache-iotdb' || normalized === 'apache_iotdb') return 'iotdb';
if (normalized === 'apache-kafka' || normalized === 'apache_kafka') return 'kafka';
+ if (normalized === 'rabbit-mq' || normalized === 'rabbit_mq') return 'rabbitmq';
if (normalized === 'doris') return 'diros';
if (
normalized === 'open_gauss' ||
diff --git a/frontend/src/utils/connectionModalPresentation.ts b/frontend/src/utils/connectionModalPresentation.ts
index 36e1d01..5d61170 100644
--- a/frontend/src/utils/connectionModalPresentation.ts
+++ b/frontend/src/utils/connectionModalPresentation.ts
@@ -297,6 +297,19 @@ export const resolveConnectionConfigLayout = (
],
};
}
+ if (type === 'rabbitmq') {
+ return {
+ kind: 'generic-sql',
+ sections: [
+ 'identity',
+ 'uri',
+ 'target',
+ 'service',
+ 'credentials',
+ 'databaseScope',
+ ],
+ };
+ }
if (postgresCompatibleTypes.has(type)) {
return {
kind: 'postgres-compatible',
diff --git a/frontend/src/utils/connectionTypeCapabilities.ts b/frontend/src/utils/connectionTypeCapabilities.ts
index 5f43194..21fb00b 100644
--- a/frontend/src/utils/connectionTypeCapabilities.ts
+++ b/frontend/src/utils/connectionTypeCapabilities.ts
@@ -16,6 +16,7 @@ export const singleHostUriSchemesByType: Record = {
elasticsearch: ["http", "https"],
chroma: ["http", "https", "chroma"],
qdrant: ["http", "https", "qdrant"],
+ rabbitmq: ["rabbitmq", "http", "https"],
};
const normalizeConnectionType = (type: string) =>
@@ -59,6 +60,7 @@ const sslSupportedTypes = new Set([
"chroma",
"qdrant",
"kafka",
+ "rabbitmq",
]);
export const supportsSSLForType = (type: string) =>
@@ -86,6 +88,7 @@ const sslCAPathSupportedTypes = new Set([
"chroma",
"qdrant",
"kafka",
+ "rabbitmq",
]);
const sslClientCertificateSupportedTypes = new Set([
@@ -107,6 +110,7 @@ const sslClientCertificateSupportedTypes = new Set([
"mongodb",
"redis",
"kafka",
+ "rabbitmq",
]);
export const supportsSSLCAPathForType = (type: string) =>
@@ -157,4 +161,5 @@ export const supportsConnectionParamsForType = (type: string) =>
type === "elasticsearch" ||
type === "chroma" ||
type === "qdrant" ||
- type === "kafka";
+ type === "kafka" ||
+ type === "rabbitmq";
diff --git a/frontend/src/utils/connectionTypeCatalog.ts b/frontend/src/utils/connectionTypeCatalog.ts
index 0871b03..ebddeca 100644
--- a/frontend/src/utils/connectionTypeCatalog.ts
+++ b/frontend/src/utils/connectionTypeCatalog.ts
@@ -65,6 +65,7 @@ export const CONNECTION_TYPE_GROUPS: ConnectionTypeCatalogGroup[] = [
label: '消息队列',
items: [
{ key: 'kafka', name: 'Kafka' },
+ { key: 'rabbitmq', name: 'RabbitMQ' },
],
},
{
@@ -124,6 +125,8 @@ export const getConnectionTypeDefaultPort = (type: string): number => {
return 6333;
case 'kafka':
return 9092;
+ case 'rabbitmq':
+ return 15672;
case 'highgo':
return 5866;
case 'mariadb':
@@ -158,6 +161,8 @@ export const getConnectionTypeHint = (type: string): string => {
return 'Storage Group / Device / Timeseries';
case 'kafka':
return 'Broker / Topic / Consumer Group';
+ case 'rabbitmq':
+ return 'Management API / Virtual Host / Queue';
case 'oceanbase':
return 'MySQL / Oracle 租户';
case 'goldendb':
diff --git a/frontend/src/utils/dataSourceCapabilities.test.ts b/frontend/src/utils/dataSourceCapabilities.test.ts
index ffc59db..5395984 100644
--- a/frontend/src/utils/dataSourceCapabilities.test.ts
+++ b/frontend/src/utils/dataSourceCapabilities.test.ts
@@ -173,11 +173,33 @@ describe('dataSourceCapabilities', () => {
supportsCreateDatabase: false,
supportsRenameDatabase: false,
supportsDropDatabase: false,
+ supportsMessagePublish: true,
forceReadOnlyQueryResult: true,
});
expect(getDataSourceCapabilities({ type: 'custom', driver: 'apache-kafka' })).toMatchObject({
type: 'kafka',
supportsQueryEditor: true,
+ supportsMessagePublish: true,
+ forceReadOnlyQueryResult: true,
+ });
+ });
+
+ it('treats RabbitMQ as a queryable messaging datasource with publish support', () => {
+ expect(getDataSourceCapabilities({ type: 'rabbitmq' })).toMatchObject({
+ type: 'rabbitmq',
+ supportsQueryEditor: true,
+ supportsSqlQueryExport: false,
+ supportsCopyInsert: false,
+ supportsCreateDatabase: false,
+ supportsRenameDatabase: false,
+ supportsDropDatabase: false,
+ supportsMessagePublish: true,
+ forceReadOnlyQueryResult: true,
+ });
+ expect(getDataSourceCapabilities({ type: 'custom', driver: 'rabbit-mq' })).toMatchObject({
+ type: 'rabbitmq',
+ supportsQueryEditor: true,
+ supportsMessagePublish: true,
forceReadOnlyQueryResult: true,
});
});
diff --git a/frontend/src/utils/dataSourceCapabilities.ts b/frontend/src/utils/dataSourceCapabilities.ts
index 9bc3a65..3bba2d4 100644
--- a/frontend/src/utils/dataSourceCapabilities.ts
+++ b/frontend/src/utils/dataSourceCapabilities.ts
@@ -42,6 +42,10 @@ const normalizeDataSourceToken = (raw: string): string => {
case 'apache-kafka':
case 'apache_kafka':
return 'kafka';
+ case 'rabbitmq':
+ case 'rabbit-mq':
+ case 'rabbit_mq':
+ return 'rabbitmq';
case 'intersystems':
case 'intersystemsiris':
case 'inter-systems':
@@ -117,7 +121,8 @@ const COPY_INSERT_TYPES = new Set([
]);
const QUERY_EDITOR_DISABLED_TYPES = new Set(['redis']);
-const FORCE_READ_ONLY_QUERY_TYPES = new Set(['tdengine', 'iotdb', 'clickhouse', 'kafka']);
+const FORCE_READ_ONLY_QUERY_TYPES = new Set(['tdengine', 'iotdb', 'clickhouse', 'kafka', 'rabbitmq']);
+const MESSAGE_PUBLISH_TYPES = new Set(['kafka', 'rabbitmq']);
const MANUAL_TOTAL_COUNT_TYPES = new Set(['duckdb', 'oracle']);
const APPROXIMATE_TABLE_COUNT_TYPES = new Set(['duckdb', 'oracle']);
const APPROXIMATE_TOTAL_PAGE_TYPES = new Set(['duckdb']);
@@ -130,6 +135,7 @@ export type DataSourceCapabilities = {
supportsCreateDatabase: boolean;
supportsRenameDatabase: boolean;
supportsDropDatabase: boolean;
+ supportsMessagePublish: boolean;
forceReadOnlyQueryResult: boolean;
preferManualTotalCount: boolean;
supportsApproximateTableCount: boolean;
@@ -191,6 +197,7 @@ export const getDataSourceCapabilities = (config: ConnectionLike): DataSourceCap
supportsCreateDatabase: CREATE_DATABASE_TYPES.has(type),
supportsRenameDatabase: RENAME_DATABASE_TYPES.has(type),
supportsDropDatabase: DROP_DATABASE_TYPES.has(type),
+ supportsMessagePublish: MESSAGE_PUBLISH_TYPES.has(type),
forceReadOnlyQueryResult: FORCE_READ_ONLY_QUERY_TYPES.has(type),
preferManualTotalCount: MANUAL_TOTAL_COUNT_TYPES.has(type),
supportsApproximateTableCount: APPROXIMATE_TABLE_COUNT_TYPES.has(type),
diff --git a/frontend/src/utils/messagePublish.test.ts b/frontend/src/utils/messagePublish.test.ts
new file mode 100644
index 0000000..c56a971
--- /dev/null
+++ b/frontend/src/utils/messagePublish.test.ts
@@ -0,0 +1,100 @@
+import { describe, expect, it } from 'vitest';
+
+import {
+ buildMessagePublishCommand,
+ createDefaultMessagePublishDraft,
+} from './messagePublish';
+
+describe('messagePublish', () => {
+ it('builds a Kafka publish JSON command from JSON payload inputs', () => {
+ const result = buildMessagePublishCommand(
+ { type: 'kafka' },
+ {
+ destination: 'orders.events',
+ keyMode: 'json',
+ key: '{"tenant":"a"}',
+ bodyMode: 'json',
+ body: '{"id":1,"event":"created"}',
+ headers: '{"x-env":"dev"}',
+ },
+ );
+
+ expect(result.transportLabel).toBe('Kafka Topic');
+ expect(result.destinationLabel).toBe('orders.events');
+ expect(result.commandText).toContain('"publish": "orders.events"');
+ expect(result.commandText).toContain('"tenant": "a"');
+ expect(result.commandText).toContain('"id": 1');
+ expect(result.commandText).toContain('"x-env": "dev"');
+ });
+
+ it('keeps Kafka text payloads as plain strings', () => {
+ const result = buildMessagePublishCommand(
+ { type: 'kafka' },
+ {
+ destination: 'logs.app',
+ keyMode: 'text',
+ key: 'tenant-a',
+ bodyMode: 'text',
+ body: 'hello gonavi',
+ headers: '',
+ },
+ );
+
+ expect(result.commandText).toContain('"key": "tenant-a"');
+ expect(result.commandText).toContain('"value": "hello gonavi"');
+ });
+
+ it('rejects non-object Kafka headers', () => {
+ expect(() => buildMessagePublishCommand(
+ { type: 'kafka' },
+ {
+ destination: 'logs.app',
+ bodyMode: 'json',
+ body: '{"ok":true}',
+ headers: '["bad"]',
+ },
+ )).toThrow('Headers 必须是 JSON 对象');
+ });
+
+ it('seeds Kafka default publish draft with a JSON body example', () => {
+ expect(createDefaultMessagePublishDraft({ type: 'kafka' }, 'orders.events')).toMatchObject({
+ destination: 'orders.events',
+ keyMode: 'text',
+ bodyMode: 'json',
+ });
+ });
+
+ it('builds a RabbitMQ publish JSON command with routing and properties', () => {
+ const result = buildMessagePublishCommand(
+ { type: 'rabbitmq', connectionParams: 'defaultQueue=orders.queue&exchange=events.topic' },
+ {
+ destination: 'orders.queue',
+ exchange: '',
+ routingKey: '',
+ bodyMode: 'json',
+ body: '{"id":1,"event":"created"}',
+ headers: '{"x-env":"dev"}',
+ properties: '{"content_type":"application/json"}',
+ },
+ );
+
+ expect(result.transportLabel).toBe('RabbitMQ Queue');
+ expect(result.destinationLabel).toBe('orders.queue');
+ expect(result.commandText).toContain('"publish": "orders.queue"');
+ expect(result.commandText).toContain('"exchange": "events.topic"');
+ expect(result.commandText).toContain('"routing_key": "orders.queue"');
+ expect(result.commandText).toContain('"content_type": "application/json"');
+ });
+
+ it('seeds RabbitMQ default publish draft with defaultQueue and exchange', () => {
+ expect(createDefaultMessagePublishDraft(
+ { type: 'rabbitmq', connectionParams: 'defaultQueue=orders.queue&exchange=events.topic' },
+ '',
+ )).toMatchObject({
+ destination: 'orders.queue',
+ exchange: 'events.topic',
+ routingKey: 'orders.queue',
+ bodyMode: 'json',
+ });
+ });
+});
diff --git a/frontend/src/utils/messagePublish.ts b/frontend/src/utils/messagePublish.ts
new file mode 100644
index 0000000..4ff7a34
--- /dev/null
+++ b/frontend/src/utils/messagePublish.ts
@@ -0,0 +1,274 @@
+import { resolveDataSourceType } from './dataSourceCapabilities';
+
+type ConnectionLike = {
+ type?: string;
+ driver?: string;
+ oceanBaseProtocol?: string;
+ database?: string;
+ uri?: string;
+ connectionParams?: string;
+} | null | undefined;
+
+export type MessagePublishValueMode = 'text' | 'json';
+
+export type MessagePublishDraft = {
+ destination: string;
+ exchange?: string;
+ routingKey?: string;
+ keyMode?: MessagePublishValueMode;
+ key?: string;
+ bodyMode?: MessagePublishValueMode;
+ body: string;
+ headers?: string;
+ properties?: string;
+};
+
+export type MessagePublishCommand = {
+ commandText: string;
+ destinationLabel: string;
+ transportLabel: string;
+};
+
+export type MessagePublishPresentation = {
+ transportLabel: string;
+ destinationLabel: string;
+ destinationPlaceholder: string;
+ destinationRequiredMessage: string;
+ alertMessage: string;
+ successHint: string;
+ showKey: boolean;
+ showExchange: boolean;
+ showRoutingKey: boolean;
+ showProperties: boolean;
+};
+
+const normalizeMode = (value: unknown, fallback: MessagePublishValueMode): MessagePublishValueMode => {
+ const normalized = String(value || '').trim().toLowerCase();
+ if (normalized === 'text') return 'text';
+ if (normalized === 'json') return 'json';
+ return fallback;
+};
+
+const parseRequiredPayload = (
+ rawValue: unknown,
+ mode: MessagePublishValueMode,
+ fieldLabel: string,
+): string | number | boolean | Record | Array => {
+ const text = String(rawValue ?? '');
+ if (!text.trim()) {
+ throw new Error(`请输入${fieldLabel}`);
+ }
+ if (mode === 'text') {
+ return text;
+ }
+ try {
+ return JSON.parse(text);
+ } catch (error: any) {
+ throw new Error(`${fieldLabel}不是合法 JSON:${error?.message || String(error)}`);
+ }
+};
+
+const parseOptionalPayload = (
+ rawValue: unknown,
+ mode: MessagePublishValueMode,
+ fieldLabel: string,
+): string | number | boolean | Record | Array | undefined => {
+ const text = String(rawValue ?? '');
+ if (!text.trim()) {
+ return undefined;
+ }
+ return parseRequiredPayload(text, mode, fieldLabel);
+};
+
+const parseOptionalJSONObject = (
+ rawValue: unknown,
+ fieldLabel: string,
+): Record | undefined => {
+ const text = String(rawValue ?? '');
+ if (!text.trim()) {
+ return undefined;
+ }
+ let parsed: unknown;
+ try {
+ parsed = JSON.parse(text);
+ } catch (error: any) {
+ throw new Error(`${fieldLabel}不是合法 JSON:${error?.message || String(error)}`);
+ }
+ if (!parsed || Array.isArray(parsed) || typeof parsed !== 'object') {
+ throw new Error(`${fieldLabel} 必须是 JSON 对象`);
+ }
+ return parsed as Record;
+};
+
+const mergeSearchParams = (target: URLSearchParams, sourceText: unknown) => {
+ const text = String(sourceText ?? '').trim();
+ if (!text) return;
+ const raw = text.includes('?') ? text.slice(text.indexOf('?') + 1) : text;
+ const params = new URLSearchParams(raw.replace(/^\?/, ''));
+ params.forEach((value, key) => {
+ if (String(key || '').trim()) {
+ target.set(key, value);
+ }
+ });
+};
+
+const resolveConnectionParams = (config: ConnectionLike): URLSearchParams => {
+ const params = new URLSearchParams();
+ if (!config) return params;
+ mergeSearchParams(params, config.uri);
+ mergeSearchParams(params, config.connectionParams);
+ return params;
+};
+
+const normalizeRabbitMQExchange = (value: unknown): string => {
+ const normalized = String(value ?? '').trim();
+ if (normalized === 'amq.default' || normalized === '(default)') {
+ return '';
+ }
+ return normalized;
+};
+
+const resolveDefaultDestination = (config: ConnectionLike, explicitDestination: string): string => {
+ const destination = String(explicitDestination || '').trim();
+ if (destination) return destination;
+
+ const resolvedType = resolveDataSourceType(config as any);
+ const params = resolveConnectionParams(config);
+
+ if (resolvedType === 'kafka') {
+ return String(config?.database || '').trim();
+ }
+ if (resolvedType === 'rabbitmq') {
+ return String(params.get('defaultQueue') || params.get('queue') || '').trim();
+ }
+ return '';
+};
+
+export const getMessagePublishPresentation = (
+ config: ConnectionLike,
+): MessagePublishPresentation => {
+ const resolvedType = resolveDataSourceType(config as any);
+
+ if (resolvedType === 'rabbitmq') {
+ return {
+ transportLabel: 'RabbitMQ Queue',
+ destinationLabel: 'Queue',
+ destinationPlaceholder: '例如:orders.queue',
+ destinationRequiredMessage: '请输入 Queue',
+ alertMessage: '当前表单会自动拼装 RabbitMQ publish JSON 命令,并通过 Management API 执行测试发送。',
+ successHint: '留空 Exchange 时会使用默认交换机并按 Queue 名作为 routing key。',
+ showKey: false,
+ showExchange: true,
+ showRoutingKey: true,
+ showProperties: true,
+ };
+ }
+
+ return {
+ transportLabel: 'Kafka Topic',
+ destinationLabel: 'Topic',
+ destinationPlaceholder: '例如:orders.events',
+ destinationRequiredMessage: '请输入 Topic',
+ alertMessage: '当前表单会自动拼装 Kafka publish JSON 命令,并直接调用后端执行测试发送。',
+ successHint: 'Headers 会作为 Kafka Record Headers 一并发送。',
+ showKey: true,
+ showExchange: false,
+ showRoutingKey: false,
+ showProperties: false,
+ };
+};
+
+export const createDefaultMessagePublishDraft = (
+ config: ConnectionLike,
+ destination = '',
+): MessagePublishDraft => {
+ const resolvedType = resolveDataSourceType(config as any);
+ const resolvedDestination = resolveDefaultDestination(config, destination);
+ const params = resolveConnectionParams(config);
+
+ if (resolvedType === 'rabbitmq') {
+ return {
+ destination: resolvedDestination,
+ exchange: normalizeRabbitMQExchange(params.get('defaultExchange') || params.get('exchange') || ''),
+ routingKey: resolvedDestination,
+ bodyMode: 'json',
+ body: '{\n "event": "test",\n "source": "gonavi"\n}',
+ headers: '{\n "x-source": "gonavi"\n}',
+ properties: '{\n "content_type": "application/json"\n}',
+ };
+ }
+
+ return {
+ destination: resolvedDestination,
+ keyMode: 'text',
+ key: '',
+ bodyMode: 'json',
+ body: '{\n "event": "test",\n "source": "gonavi"\n}',
+ headers: '{\n "x-source": "gonavi"\n}',
+ };
+};
+
+export const buildMessagePublishCommand = (
+ config: ConnectionLike,
+ draft: MessagePublishDraft,
+): MessagePublishCommand => {
+ const resolvedType = resolveDataSourceType(config as any);
+ const destination = String(draft.destination || '').trim();
+ if (!destination) {
+ throw new Error('请输入目标 Topic / Queue');
+ }
+
+ if (resolvedType === 'rabbitmq') {
+ const params = resolveConnectionParams(config);
+ const bodyMode = normalizeMode(draft.bodyMode, 'json');
+ const command: Record = {
+ publish: destination,
+ payload: parseRequiredPayload(draft.body, bodyMode, '消息体'),
+ exchange: normalizeRabbitMQExchange(draft.exchange || params.get('defaultExchange') || params.get('exchange') || ''),
+ routing_key: String(draft.routingKey || '').trim() || destination,
+ };
+
+ const headers = parseOptionalJSONObject(draft.headers, 'Headers');
+ if (headers && Object.keys(headers).length > 0) {
+ command.headers = headers;
+ }
+
+ const properties = parseOptionalJSONObject(draft.properties, 'Properties');
+ if (properties && Object.keys(properties).length > 0) {
+ command.properties = properties;
+ }
+
+ return {
+ commandText: JSON.stringify(command, null, 2),
+ destinationLabel: destination,
+ transportLabel: 'RabbitMQ Queue',
+ };
+ }
+
+ if (resolvedType === 'kafka') {
+ const keyMode = normalizeMode(draft.keyMode, 'text');
+ const bodyMode = normalizeMode(draft.bodyMode, 'json');
+ const command: Record = {
+ publish: destination,
+ value: parseRequiredPayload(draft.body, bodyMode, '消息体'),
+ };
+
+ const keyPayload = parseOptionalPayload(draft.key, keyMode, '消息 Key');
+ if (keyPayload !== undefined) {
+ command.key = keyPayload;
+ }
+
+ const headers = parseOptionalJSONObject(draft.headers, 'Headers');
+ if (headers && Object.keys(headers).length > 0) {
+ command.headers = headers;
+ }
+
+ return {
+ commandText: JSON.stringify(command, null, 2),
+ destinationLabel: destination,
+ transportLabel: 'Kafka Topic',
+ };
+ }
+
+ throw new Error(`当前数据源暂不支持测试发送消息:${resolvedType || 'unknown'}`);
+};
diff --git a/frontend/src/utils/objectQueryTemplates.test.ts b/frontend/src/utils/objectQueryTemplates.test.ts
index ffb3207..0c02e6a 100644
--- a/frontend/src/utils/objectQueryTemplates.test.ts
+++ b/frontend/src/utils/objectQueryTemplates.test.ts
@@ -10,4 +10,8 @@ describe('buildTableSelectQuery', () => {
it('adds a preview limit for Kafka topic browsing', () => {
expect(buildTableSelectQuery('kafka', 'logs.app-1')).toBe('SELECT * FROM "logs.app-1" LIMIT 100;');
});
+
+ it('adds a preview limit for RabbitMQ queue browsing', () => {
+ expect(buildTableSelectQuery('rabbitmq', 'orders.events.v1')).toBe('SELECT * FROM "orders.events.v1" LIMIT 100;');
+ });
});
diff --git a/frontend/src/utils/objectQueryTemplates.ts b/frontend/src/utils/objectQueryTemplates.ts
index e19e4b7..11678be 100644
--- a/frontend/src/utils/objectQueryTemplates.ts
+++ b/frontend/src/utils/objectQueryTemplates.ts
@@ -5,7 +5,7 @@ export const buildTableSelectQuery = (dbType: string, tableName: string): string
if (!normalizedTableName) {
return 'SELECT * FROM ';
}
- if (String(dbType || '').trim().toLowerCase() === 'kafka') {
+ if (['kafka', 'rabbitmq'].includes(String(dbType || '').trim().toLowerCase())) {
return `SELECT * FROM ${quoteQualifiedIdent(dbType, normalizedTableName)} LIMIT 100;`;
}
return `SELECT * FROM ${quoteQualifiedIdent(dbType, normalizedTableName)};`;
diff --git a/frontend/src/utils/sql.test.ts b/frontend/src/utils/sql.test.ts
index 08cc1b2..0515129 100644
--- a/frontend/src/utils/sql.test.ts
+++ b/frontend/src/utils/sql.test.ts
@@ -64,6 +64,11 @@ describe('quoteQualifiedIdent', () => {
.toBe('"logs.app-1"');
});
+ it('keeps RabbitMQ queue names as one quoted identifier', () => {
+ expect(quoteQualifiedIdent('rabbitmq', 'orders.events.v1'))
+ .toBe('"orders.events.v1"');
+ });
+
it('quotes GoldenDB identifiers with MySQL-style backticks', () => {
expect(quoteQualifiedIdent('goldendb', 'ledger.entries'))
.toBe('`ledger`.`entries`');
diff --git a/frontend/src/utils/sql.ts b/frontend/src/utils/sql.ts
index 1281ea3..a1f122d 100644
--- a/frontend/src/utils/sql.ts
+++ b/frontend/src/utils/sql.ts
@@ -54,7 +54,7 @@ export const quoteIdentPart = (dbType: string, ident: string) => {
export const quoteQualifiedIdent = (dbType: string, ident: string) => {
const raw = (ident || '').trim();
if (!raw) return raw;
- if ((dbType || '').trim().toLowerCase() === 'kafka') {
+ if (['kafka', 'rabbitmq'].includes((dbType || '').trim().toLowerCase())) {
return quoteIdentPart(dbType, raw);
}
const parts = splitQualifiedNameSegments(raw).filter(Boolean);
diff --git a/frontend/src/utils/sqlDialect.test.ts b/frontend/src/utils/sqlDialect.test.ts
index 6fd849e..0efaa9f 100644
--- a/frontend/src/utils/sqlDialect.test.ts
+++ b/frontend/src/utils/sqlDialect.test.ts
@@ -40,6 +40,8 @@ describe('sqlDialect', () => {
expect(resolveSqlDialect('custom', 'apache_iotdb')).toBe('iotdb');
expect(resolveSqlDialect('Apache-Kafka')).toBe('kafka');
expect(resolveSqlDialect('custom', 'apache_kafka')).toBe('kafka');
+ expect(resolveSqlDialect('Rabbit-MQ')).toBe('rabbitmq');
+ expect(resolveSqlDialect('custom', 'rabbit_mq')).toBe('rabbitmq');
expect(resolveSqlDialect('OceanBase', '', { oceanBaseProtocol: 'oracle' })).toBe('oracle');
expect(resolveSqlDialect('custom', 'oceanbase', { oceanBaseProtocol: 'oracle' })).toBe('oracle');
expect(isMysqlFamilyDialect('mariadb')).toBe(true);
@@ -78,6 +80,11 @@ describe('sqlDialect', () => {
expect(resolveSqlKeywords('kafka')).not.toEqual(expect.arrayContaining(['ALIGN BY DEVICE', 'AUTO_INCREMENT']));
});
+ it('resolves RabbitMQ completion keywords for queue and exchange discovery', () => {
+ expect(resolveSqlKeywords('rabbitmq')).toEqual(expect.arrayContaining(['SHOW VHOSTS', 'SHOW QUEUES', 'SHOW EXCHANGES', 'DESCRIBE QUEUE']));
+ expect(resolveSqlKeywords('rabbitmq')).not.toEqual(expect.arrayContaining(['ALIGN BY DEVICE', 'AUTO_INCREMENT']));
+ });
+
it('resolves GaussDB completion keywords and functions as a PostgreSQL-like dialect', () => {
expect(resolveSqlKeywords('gaussdb')).toEqual(expect.arrayContaining(['RETURNING', 'SERIAL', 'JSONB']));
expect(names(resolveSqlFunctions('gaussdb'))).toEqual(expect.arrayContaining(['STRING_AGG', 'TO_CHAR', 'CURRENT_DATABASE']));
diff --git a/frontend/src/utils/sqlDialect.ts b/frontend/src/utils/sqlDialect.ts
index 05c9c90..df8ffa1 100644
--- a/frontend/src/utils/sqlDialect.ts
+++ b/frontend/src/utils/sqlDialect.ts
@@ -30,6 +30,7 @@ export type SqlDialect =
| 'tdengine'
| 'iotdb'
| 'kafka'
+ | 'rabbitmq'
| 'mongodb'
| 'redis'
| 'elasticsearch'
@@ -140,6 +141,10 @@ export const resolveSqlDialect = (
case 'apache-kafka':
case 'apache_kafka':
return 'kafka';
+ case 'rabbitmq':
+ case 'rabbit-mq':
+ case 'rabbit_mq':
+ return 'rabbitmq';
default:
break;
}
@@ -165,6 +170,7 @@ export const resolveSqlDialect = (
if (source.includes('tdengine')) return 'tdengine';
if (source.includes('iotdb')) return 'iotdb';
if (source.includes('kafka')) return 'kafka';
+ if (source.includes('rabbitmq') || source.includes('rabbit-mq') || source.includes('rabbit_mq')) return 'rabbitmq';
if (source.includes('sqlserver') || source.includes('mssql')) return 'sqlserver';
if (source.includes('iris') || source.includes('intersystems')) return 'iris';
if (source.includes('elastic')) return 'elasticsearch';
@@ -628,6 +634,18 @@ const KAFKA_KEYWORDS = [
'OFFSET',
];
+const RABBITMQ_KEYWORDS = [
+ 'SHOW VHOSTS',
+ 'SHOW QUEUES',
+ 'SHOW EXCHANGES',
+ 'DESCRIBE QUEUE',
+ 'DESCRIBE EXCHANGE',
+ 'CONSUME',
+ 'FROM',
+ 'LIMIT',
+ 'OFFSET',
+];
+
export const resolveSqlKeywords = (dbType: string): string[] => {
const dialect = resolveSqlDialect(dbType);
if (dialect === 'starrocks') return unique([...COMMON_KEYWORDS, ...MYSQL_KEYWORDS, ...STARROCKS_KEYWORDS]);
@@ -641,6 +659,7 @@ export const resolveSqlKeywords = (dbType: string): string[] => {
if (dialect === 'tdengine') return unique([...COMMON_KEYWORDS, ...TDENGINE_KEYWORDS]);
if (dialect === 'iotdb') return unique([...COMMON_KEYWORDS, ...IOTDB_KEYWORDS]);
if (dialect === 'kafka') return unique([...COMMON_KEYWORDS, ...KAFKA_KEYWORDS]);
+ if (dialect === 'rabbitmq') return unique([...COMMON_KEYWORDS, ...RABBITMQ_KEYWORDS]);
return COMMON_KEYWORDS;
};
diff --git a/internal/app/db_context.go b/internal/app/db_context.go
index 7335062..24f3e1f 100644
--- a/internal/app/db_context.go
+++ b/internal/app/db_context.go
@@ -22,7 +22,7 @@ func normalizeRunConfig(config connection.ConnectionConfig, dbName string) conne
if !isOceanBaseOracleProtocol(config) {
runConfig.Database = name
}
- case "mysql", "mariadb", "goldendb", "greatdb", "gdb", "diros", "starrocks", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb", "sqlserver", "iris", "intersystems", "intersystemsiris", "inter-systems", "inter-systems-iris", "mongodb", "tdengine", "iotdb", "clickhouse":
+ case "mysql", "mariadb", "goldendb", "greatdb", "gdb", "diros", "starrocks", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb", "sqlserver", "iris", "intersystems", "intersystemsiris", "inter-systems", "inter-systems-iris", "mongodb", "tdengine", "iotdb", "clickhouse", "rabbitmq", "rabbit-mq", "rabbit_mq":
// 这些类型的 dbName 表示"数据库",需要写入连接配置以选择目标库。
runConfig.Database = name
case "dameng":
@@ -53,7 +53,7 @@ func normalizeSchemaAndTable(config connection.ConnectionConfig, dbName string,
// Elasticsearch:索引名可能含多个点(如 iot_pro_biz_operate_log.index.20240626),
// 不能按点分割,直接返回原始数据库名和完整表名。
- if dbType == "elasticsearch" || dbType == "iotdb" || dbType == "kafka" {
+ if dbType == "elasticsearch" || dbType == "iotdb" || dbType == "kafka" || dbType == "rabbitmq" {
return rawDB, rawTable
}
@@ -112,7 +112,7 @@ func normalizeSchemaAndTable(config connection.ConnectionConfig, dbName string,
func normalizeMetadataSchemaAndTable(config connection.ConnectionConfig, dbName string, tableName string) (string, string) {
schema, table := normalizeSchemaAndTable(config, dbName, tableName)
switch resolveDDLDBType(config) {
- case "kafka":
+ case "kafka", "rabbitmq":
return schema, table
case "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb":
rawTable := strings.TrimSpace(tableName)
diff --git a/internal/app/db_context_test.go b/internal/app/db_context_test.go
index 8b786bd..d98e560 100644
--- a/internal/app/db_context_test.go
+++ b/internal/app/db_context_test.go
@@ -332,6 +332,18 @@ func TestNormalizeSchemaAndTable_KafkaPreservesDottedTopicName(t *testing.T) {
}
}
+func TestNormalizeSchemaAndTable_RabbitMQPreservesDottedQueueName(t *testing.T) {
+ t.Parallel()
+
+ schemaOrDb, table := normalizeSchemaAndTable(connection.ConnectionConfig{
+ Type: "rabbitmq",
+ }, "/", "orders.events.v1")
+
+ if schemaOrDb != "/" || table != "orders.events.v1" {
+ t.Fatalf("expected rabbitmq queue to stay intact, got %q.%q", schemaOrDb, table)
+ }
+}
+
func TestNormalizeMetadataSchemaAndTable_KafkaPreservesDottedTopicName(t *testing.T) {
t.Parallel()
@@ -344,6 +356,18 @@ func TestNormalizeMetadataSchemaAndTable_KafkaPreservesDottedTopicName(t *testin
}
}
+func TestNormalizeMetadataSchemaAndTable_RabbitMQPreservesDottedQueueName(t *testing.T) {
+ t.Parallel()
+
+ schemaOrDb, table := normalizeMetadataSchemaAndTable(connection.ConnectionConfig{
+ Type: "rabbitmq",
+ }, "/", "logs.app-1")
+
+ if schemaOrDb != "/" || table != "logs.app-1" {
+ t.Fatalf("expected rabbitmq metadata queue to stay intact, got %q.%q", schemaOrDb, table)
+ }
+}
+
func TestQuoteTableIdentByType_KingbaseNormalizesQuotedQualifiedTable(t *testing.T) {
t.Parallel()
diff --git a/internal/app/methods_db.go b/internal/app/methods_db.go
index 55764e5..2796b95 100644
--- a/internal/app/methods_db.go
+++ b/internal/app/methods_db.go
@@ -315,8 +315,8 @@ func normalizeSchemaAndTableByType(dbType string, dbName string, tableName strin
return rawDB, rawTable
}
- // Elasticsearch:索引名可能含多个点,不能按点分割
- if dbType == "elasticsearch" || dbType == "kafka" {
+ // Elasticsearch / RabbitMQ / Kafka:对象名可能含多个点,不能按点分割
+ if dbType == "elasticsearch" || dbType == "kafka" || dbType == "rabbitmq" {
return rawDB, rawTable
}
diff --git a/internal/app/methods_db_create_statement_test.go b/internal/app/methods_db_create_statement_test.go
index 6488db3..08c73bb 100644
--- a/internal/app/methods_db_create_statement_test.go
+++ b/internal/app/methods_db_create_statement_test.go
@@ -166,6 +166,15 @@ func TestNormalizeSchemaAndTableByType_KafkaPreservesDottedTopicName(t *testing.
}
}
+func TestNormalizeSchemaAndTableByType_RabbitMQPreservesDottedQueueName(t *testing.T) {
+ t.Parallel()
+
+ schema, table := normalizeSchemaAndTableByType("rabbitmq", "/", "orders.events.v1")
+ if schema != "/" || table != "orders.events.v1" {
+ t.Fatalf("expected rabbitmq queue to stay intact, got %q.%q", schema, table)
+ }
+}
+
func TestBuildRunConfigForDDL_CustomHighGoUsesDatabase(t *testing.T) {
t.Parallel()
diff --git a/internal/app/methods_driver.go b/internal/app/methods_driver.go
index 73b505f..f0c575f 100644
--- a/internal/app/methods_driver.go
+++ b/internal/app/methods_driver.go
@@ -1434,6 +1434,8 @@ func normalizeDriverType(driverType string) string {
return "goldendb"
case "kafka", "apache-kafka", "apache_kafka":
return "kafka"
+ case "rabbitmq", "rabbit-mq", "rabbit_mq":
+ return "rabbitmq"
case "intersystems", "intersystemsiris", "inter-systems-iris", "inter-systems":
return "iris"
default:
@@ -1504,6 +1506,7 @@ func allDriverDefinitionsWithPackages(packages map[string]pinnedDriverPackage) [
{Type: "redis", Name: "Redis", Engine: driverEngineGo, BuiltIn: true},
{Type: "postgres", Name: "PostgreSQL", Engine: driverEngineGo, BuiltIn: true},
{Type: "kafka", Name: "Kafka", Engine: driverEngineGo, BuiltIn: true},
+ {Type: "rabbitmq", Name: "RabbitMQ", Engine: driverEngineGo, BuiltIn: true},
// 其他数据源需要先在驱动管理中“安装启用”。
buildOptionalGoDriverDefinition("mariadb", "MariaDB", packages),
diff --git a/internal/app/methods_driver_version_test.go b/internal/app/methods_driver_version_test.go
index f13fd38..7181286 100644
--- a/internal/app/methods_driver_version_test.go
+++ b/internal/app/methods_driver_version_test.go
@@ -518,6 +518,22 @@ func TestKafkaDriverDefinitionIsBuiltIn(t *testing.T) {
}
}
+func TestRabbitMQDriverDefinitionIsBuiltIn(t *testing.T) {
+ definition, ok := resolveDriverDefinition("rabbit-mq")
+ if !ok {
+ t.Fatal("expected rabbitmq driver definition")
+ }
+ if definition.Name != "RabbitMQ" {
+ t.Fatalf("unexpected rabbitmq driver name: %q", definition.Name)
+ }
+ if !definition.BuiltIn {
+ t.Fatal("expected rabbitmq to be a built-in driver")
+ }
+ if definition.PinnedVersion != "" || definition.DefaultDownloadURL != "" {
+ t.Fatalf("expected rabbitmq builtin definition to omit optional-agent metadata: %#v", definition)
+ }
+}
+
func TestGoldenDBDriverDefinitionIsBuiltIn(t *testing.T) {
definition, ok := resolveDriverDefinition("greatdb")
if !ok {
diff --git a/internal/db/database.go b/internal/db/database.go
index c71c68e..ee24391 100644
--- a/internal/db/database.go
+++ b/internal/db/database.go
@@ -492,6 +492,9 @@ var databaseFactories = map[string]databaseFactory{
"kafka": func() Database {
return &KafkaDB{}
},
+ "rabbitmq": func() Database {
+ return &RabbitMQDB{}
+ },
}
func init() {
@@ -534,6 +537,8 @@ func normalizeDatabaseType(dbType string) string {
return "qdrant"
case "kafka", "apache-kafka", "apache_kafka":
return "kafka"
+ case "rabbitmq", "rabbit-mq", "rabbit_mq":
+ return "rabbitmq"
default:
return normalized
}
diff --git a/internal/db/driver_support.go b/internal/db/driver_support.go
index f48283e..0744af6 100644
--- a/internal/db/driver_support.go
+++ b/internal/db/driver_support.go
@@ -20,6 +20,7 @@ var coreBuiltinDrivers = map[string]struct{}{
"chroma": {},
"qdrant": {},
"kafka": {},
+ "rabbitmq": {},
}
// optionalGoDrivers 表示需要用户“安装启用”后才能使用的纯 Go 驱动。
@@ -84,6 +85,8 @@ func normalizeRuntimeDriverType(driverType string) string {
return "iotdb"
case "kafka", "apache-kafka", "apache_kafka":
return "kafka"
+ case "rabbitmq", "rabbit-mq", "rabbit_mq":
+ return "rabbitmq"
default:
return normalized
}
@@ -147,6 +150,8 @@ func driverDisplayName(driverType string) string {
return "Qdrant"
case "kafka":
return "Kafka"
+ case "rabbitmq":
+ return "RabbitMQ"
default:
return strings.ToUpper(strings.TrimSpace(driverType))
}
diff --git a/internal/db/rabbitmq_impl.go b/internal/db/rabbitmq_impl.go
new file mode 100644
index 0000000..cdbff52
--- /dev/null
+++ b/internal/db/rabbitmq_impl.go
@@ -0,0 +1,1155 @@
+package db
+
+import (
+ "context"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "regexp"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "GoNavi-Wails/internal/connection"
+ "GoNavi-Wails/internal/logger"
+ proxytunnel "GoNavi-Wails/internal/proxy"
+ "GoNavi-Wails/internal/ssh"
+)
+
+const (
+ defaultRabbitMQPort = 15672
+ defaultRabbitMQQueryTimeout = 30 * time.Second
+ defaultRabbitMQPreviewLimit = 100
+ defaultRabbitMQPageSize = 200
+ maxRabbitMQPageSize = 500
+ rabbitMQDefaultVHost = "/"
+)
+
+type RabbitMQDB struct {
+ client *http.Client
+ baseURL string
+ defaultVHost string
+ defaultQueue string
+ defaultExchange string
+ pageSize int
+ authHeaders map[string]string
+ forwarder *ssh.LocalForwarder
+}
+
+func (r *RabbitMQDB) Connect(config connection.ConnectionConfig) error {
+ if r.forwarder != nil {
+ _ = r.forwarder.Close()
+ r.forwarder = nil
+ }
+ r.client = nil
+
+ runConfig := normalizeRabbitMQConfig(config)
+ if runConfig.UseSSH {
+ forwarder, err := ssh.GetOrCreateLocalForwarder(runConfig.SSH, runConfig.Host, runConfig.Port)
+ if err != nil {
+ return fmt.Errorf("创建 SSH 隧道失败:%w", err)
+ }
+ r.forwarder = forwarder
+
+ host, portText, err := net.SplitHostPort(forwarder.LocalAddr)
+ if err != nil {
+ return fmt.Errorf("解析本地转发地址失败:%w", err)
+ }
+ port, err := strconv.Atoi(portText)
+ if err != nil {
+ return fmt.Errorf("解析本地端口失败:%w", err)
+ }
+ runConfig.Host = host
+ runConfig.Port = port
+ runConfig.UseSSH = false
+ logger.Infof("RabbitMQ 通过本地端口转发连接:%s -> %s:%d", forwarder.LocalAddr, config.Host, config.Port)
+ }
+
+ params := rabbitmqConnectionParams(runConfig)
+ r.baseURL = buildRabbitMQBaseURL(runConfig)
+ r.defaultVHost = rabbitmqResolveVHost(runConfig.Database, "")
+ r.defaultQueue = strings.TrimSpace(firstNonEmpty(params.Get("defaultQueue"), params.Get("queue")))
+ r.defaultExchange = rabbitmqNormalizeExchangeName(firstNonEmpty(params.Get("defaultExchange"), params.Get("exchange")), "")
+ r.pageSize = rabbitmqPageSize(params)
+ r.authHeaders = rabbitmqAuthHeaders(runConfig)
+ r.client = buildRabbitMQHTTPClient(runConfig)
+
+ if err := r.Ping(); err != nil {
+ _ = r.Close()
+ return err
+ }
+ return nil
+}
+
+func (r *RabbitMQDB) Close() error {
+ if r.forwarder != nil {
+ if err := r.forwarder.Close(); err != nil {
+ logger.Warnf("关闭 RabbitMQ SSH 端口转发失败:%v", err)
+ }
+ r.forwarder = nil
+ }
+ r.client = nil
+ r.baseURL = ""
+ r.defaultVHost = ""
+ r.defaultQueue = ""
+ r.defaultExchange = ""
+ r.pageSize = 0
+ r.authHeaders = nil
+ return nil
+}
+
+func (r *RabbitMQDB) Ping() error {
+ if r.client == nil {
+ return fmt.Errorf("连接未打开")
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ return r.doJSON(ctx, http.MethodGet, "/api/overview", nil, nil)
+}
+
+func (r *RabbitMQDB) Query(query string) ([]map[string]interface{}, []string, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), defaultRabbitMQQueryTimeout)
+ defer cancel()
+ return r.QueryContext(ctx, query)
+}
+
+func (r *RabbitMQDB) QueryContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error) {
+ if r.client == nil {
+ return nil, nil, fmt.Errorf("连接未打开")
+ }
+ text := strings.TrimSpace(query)
+ if text == "" {
+ return nil, nil, fmt.Errorf("查询语句不能为空")
+ }
+
+ parsed, ok := parseRabbitMQSQL(text)
+ if !ok {
+ return nil, nil, fmt.Errorf("RabbitMQ 查询仅支持 SHOW VHOSTS、SHOW QUEUES、SHOW EXCHANGES、DESCRIBE QUEUE、DESCRIBE EXCHANGE、SELECT * FROM queue 与 CONSUME FROM queue")
+ }
+
+ switch parsed.Action {
+ case "show_vhosts":
+ items, err := r.listVHosts(ctx, parsed.Limit)
+ if err != nil {
+ return nil, nil, err
+ }
+ rows := rabbitmqVHostRows(items)
+ return rows, collectColumns(rows), nil
+ case "show_queues":
+ vhost := rabbitmqResolveVHost(parsed.VHost, r.defaultVHost)
+ items, err := r.listQueues(ctx, vhost, parsed.Limit)
+ if err != nil {
+ return nil, nil, err
+ }
+ rows := rabbitmqQueueRows(items)
+ return rows, collectColumns(rows), nil
+ case "show_exchanges":
+ vhost := rabbitmqResolveVHost(parsed.VHost, r.defaultVHost)
+ items, err := r.listExchanges(ctx, vhost, parsed.Limit)
+ if err != nil {
+ return nil, nil, err
+ }
+ rows := rabbitmqExchangeRows(items)
+ return rows, collectColumns(rows), nil
+ case "describe_queue":
+ vhost := rabbitmqResolveVHost(parsed.VHost, r.defaultVHost)
+ queue := rabbitmqResolveQueue(parsed.Name, r.defaultQueue)
+ if queue == "" {
+ return nil, nil, fmt.Errorf("RabbitMQ queue 不能为空")
+ }
+ info, err := r.getQueueInfo(ctx, vhost, queue)
+ if err != nil {
+ return nil, nil, err
+ }
+ rows := []map[string]interface{}{rabbitmqQueueRow(info)}
+ return rows, collectColumns(rows), nil
+ case "describe_exchange":
+ vhost := rabbitmqResolveVHost(parsed.VHost, r.defaultVHost)
+ exchange := rabbitmqNormalizeExchangeName(parsed.Name, r.defaultExchange)
+ info, err := r.getExchangeInfo(ctx, vhost, exchange)
+ if err != nil {
+ return nil, nil, err
+ }
+ rows := []map[string]interface{}{rabbitmqExchangeRow(info)}
+ return rows, collectColumns(rows), nil
+ case "select", "consume":
+ vhost := rabbitmqResolveVHost(parsed.VHost, r.defaultVHost)
+ queue := rabbitmqResolveQueue(parsed.Name, r.defaultQueue)
+ if queue == "" {
+ return nil, nil, fmt.Errorf("RabbitMQ queue 不能为空")
+ }
+ if parsed.Count {
+ info, err := r.getQueueInfo(ctx, vhost, queue)
+ if err != nil {
+ return nil, nil, err
+ }
+ return []map[string]interface{}{{
+ "vhost": vhost,
+ "queue": queue,
+ "total": intFromAny(info["messages"], 0),
+ "ready": intFromAny(info["messages_ready"], 0),
+ "unacked": intFromAny(info["messages_unacknowledged"], 0),
+ }}, []string{"vhost", "queue", "total", "ready", "unacked"}, nil
+ }
+
+ fetchLimit := parsed.Limit + parsed.Offset
+ if fetchLimit <= 0 {
+ fetchLimit = defaultRabbitMQPreviewLimit
+ }
+ items, err := r.getQueueMessages(ctx, vhost, queue, fetchLimit)
+ if err != nil {
+ return nil, nil, err
+ }
+ if parsed.Offset > 0 {
+ if parsed.Offset >= len(items) {
+ items = nil
+ } else {
+ items = items[parsed.Offset:]
+ }
+ }
+ if parsed.Limit > 0 && len(items) > parsed.Limit {
+ items = items[:parsed.Limit]
+ }
+ rows := rabbitmqMessageRows(vhost, queue, items)
+ return rows, collectColumns(rows), nil
+ default:
+ return nil, nil, fmt.Errorf("未实现的 RabbitMQ 查询类型:%s", parsed.Action)
+ }
+}
+
+func (r *RabbitMQDB) Exec(query string) (int64, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), defaultRabbitMQQueryTimeout)
+ defer cancel()
+ return r.ExecContext(ctx, query)
+}
+
+func (r *RabbitMQDB) ExecContext(ctx context.Context, query string) (int64, error) {
+ if r.client == nil {
+ return 0, fmt.Errorf("连接未打开")
+ }
+ var cmd map[string]interface{}
+ if err := decodeJSONWithUseNumber([]byte(strings.TrimSpace(query)), &cmd); err != nil {
+ return 0, fmt.Errorf("RabbitMQ 写入命令必须是 JSON:%w", err)
+ }
+ if !hasAnyKey(cmd, "publish", "queue", "destination") {
+ return 0, fmt.Errorf("RabbitMQ JSON 写入命令仅支持 publish/queue/destination 形式的消息发送")
+ }
+
+ vhost := rabbitmqResolveVHost(firstStringValue(cmd, "vhost", "database"), r.defaultVHost)
+ queue := rabbitmqResolveQueue(firstStringValue(cmd, "publish", "queue", "destination"), r.defaultQueue)
+ if queue == "" {
+ return 0, fmt.Errorf("RabbitMQ publish 命令缺少 queue")
+ }
+ exchange := rabbitmqNormalizeExchangeName(firstStringValue(cmd, "exchange"), r.defaultExchange)
+ routingKey := strings.TrimSpace(firstNonEmpty(firstStringValue(cmd, "routing_key", "routingKey", "route", "routing"), queue))
+ if routingKey == "" {
+ return 0, fmt.Errorf("RabbitMQ publish 命令缺少 routing_key")
+ }
+ if !hasAnyKey(cmd, "payload", "value", "body", "message") {
+ return 0, fmt.Errorf("RabbitMQ publish 命令缺少 payload")
+ }
+ payload := firstExisting(cmd, "payload", "value", "body", "message")
+
+ properties, err := rabbitmqMapPayload(firstExisting(cmd, "properties", "props"))
+ if err != nil {
+ return 0, fmt.Errorf("RabbitMQ properties 必须是 JSON 对象:%w", err)
+ }
+ headers, err := rabbitmqMapPayload(firstExisting(cmd, "headers"))
+ if err != nil {
+ return 0, fmt.Errorf("RabbitMQ headers 必须是 JSON 对象:%w", err)
+ }
+ if len(headers) > 0 {
+ if properties == nil {
+ properties = map[string]interface{}{}
+ }
+ existingHeaders, err := rabbitmqMapPayload(firstExisting(properties, "headers"))
+ if err != nil {
+ return 0, fmt.Errorf("RabbitMQ properties.headers 必须是 JSON 对象:%w", err)
+ }
+ if existingHeaders == nil {
+ existingHeaders = map[string]interface{}{}
+ }
+ for key, value := range headers {
+ existingHeaders[key] = value
+ }
+ properties["headers"] = existingHeaders
+ }
+
+ return r.publishMessage(ctx, vhost, exchange, routingKey, payload, properties)
+}
+
+func (r *RabbitMQDB) GetDatabases() ([]string, error) {
+ if r.client == nil {
+ return nil, fmt.Errorf("连接未打开")
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ items, err := r.listVHosts(ctx, 0)
+ if err != nil {
+ return nil, err
+ }
+ names := make([]string, 0, len(items))
+ for _, item := range items {
+ if name := mapString(item, "name"); name != "" {
+ names = append(names, name)
+ }
+ }
+ if len(names) == 0 {
+ names = append(names, rabbitmqResolveVHost("", r.defaultVHost))
+ }
+ sort.Strings(names)
+ return names, nil
+}
+
+func (r *RabbitMQDB) GetTables(dbName string) ([]string, error) {
+ if r.client == nil {
+ return nil, fmt.Errorf("连接未打开")
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ vhost := rabbitmqResolveVHost(dbName, r.defaultVHost)
+ items, err := r.listQueues(ctx, vhost, 0)
+ if err != nil {
+ return nil, err
+ }
+ names := make([]string, 0, len(items))
+ for _, item := range items {
+ if name := mapString(item, "name"); name != "" {
+ names = append(names, name)
+ }
+ }
+ sort.Strings(names)
+ return names, nil
+}
+
+func (r *RabbitMQDB) GetCreateStatement(dbName, tableName string) (string, error) {
+ if r.client == nil {
+ return "", fmt.Errorf("连接未打开")
+ }
+ vhost := rabbitmqResolveVHost(dbName, r.defaultVHost)
+ queue := rabbitmqResolveQueue(tableName, r.defaultQueue)
+ if queue == "" {
+ return "", fmt.Errorf("RabbitMQ queue 不能为空")
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ info, err := r.getQueueInfo(ctx, vhost, queue)
+ if err != nil {
+ return "", err
+ }
+ payload, _ := json.MarshalIndent(info, "", " ")
+ return fmt.Sprintf("// RabbitMQ queue: %s @ %s\n%s", queue, vhost, string(payload)), nil
+}
+
+func (r *RabbitMQDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) {
+ if r.client == nil {
+ return nil, fmt.Errorf("连接未打开")
+ }
+ vhost := rabbitmqResolveVHost(dbName, r.defaultVHost)
+ queue := rabbitmqResolveQueue(tableName, r.defaultQueue)
+ if queue == "" {
+ return nil, fmt.Errorf("RabbitMQ queue 不能为空")
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ items, err := r.getQueueMessages(ctx, vhost, queue, 20)
+ if err != nil {
+ return nil, err
+ }
+ rows := rabbitmqMessageRows(vhost, queue, items)
+ columns := []connection.ColumnDefinition{
+ {Name: "vhost", Type: "string", Nullable: "NO", Comment: "RabbitMQ virtual host"},
+ {Name: "queue", Type: "string", Nullable: "NO", Key: "PRI", Comment: "RabbitMQ queue"},
+ {Name: "exchange", Type: "string", Nullable: "YES", Comment: "Exchange used for routing"},
+ {Name: "routing_key", Type: "string", Nullable: "YES", Comment: "RabbitMQ routing key"},
+ {Name: "redelivered", Type: "bool", Nullable: "YES", Comment: "Whether the message was redelivered"},
+ {Name: "message_count", Type: "int", Nullable: "YES", Comment: "Remaining messages after this delivery"},
+ {Name: "payload", Type: "json", Nullable: "YES", Comment: "Message payload"},
+ {Name: "payload_encoding", Type: "string", Nullable: "YES", Comment: "RabbitMQ payload encoding"},
+ {Name: "payload_bytes", Type: "int", Nullable: "YES", Comment: "Payload size in bytes"},
+ {Name: "properties", Type: "json", Nullable: "YES", Comment: "AMQP properties"},
+ {Name: "headers", Type: "json", Nullable: "YES", Comment: "AMQP headers"},
+ }
+ seen := map[string]struct{}{
+ "vhost": {}, "queue": {}, "exchange": {}, "routing_key": {}, "redelivered": {},
+ "message_count": {}, "payload": {}, "payload_encoding": {}, "payload_bytes": {},
+ "properties": {}, "headers": {},
+ }
+ for _, row := range rows {
+ for key, value := range row {
+ if _, exists := seen[key]; exists {
+ continue
+ }
+ if !strings.HasPrefix(key, "payload.") && !strings.HasPrefix(key, "properties.") && !strings.HasPrefix(key, "headers.") {
+ continue
+ }
+ seen[key] = struct{}{}
+ columns = append(columns, connection.ColumnDefinition{
+ Name: key,
+ Type: inferChromaValueType(value),
+ Nullable: "YES",
+ Comment: "Derived RabbitMQ field",
+ })
+ }
+ }
+ return columns, nil
+}
+
+func (r *RabbitMQDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) {
+ tables, err := r.GetTables(dbName)
+ if err != nil {
+ return nil, err
+ }
+ var result []connection.ColumnDefinitionWithTable
+ for _, table := range tables {
+ columns, err := r.GetColumns(dbName, table)
+ if err != nil {
+ return nil, err
+ }
+ for _, column := range columns {
+ result = append(result, connection.ColumnDefinitionWithTable{
+ TableName: table,
+ Name: column.Name,
+ Type: column.Type,
+ Comment: column.Comment,
+ })
+ }
+ }
+ return result, nil
+}
+
+func (r *RabbitMQDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) {
+ return []connection.IndexDefinition{}, nil
+}
+
+func (r *RabbitMQDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) {
+ return []connection.ForeignKeyDefinition{}, nil
+}
+
+func (r *RabbitMQDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) {
+ return []connection.TriggerDefinition{}, nil
+}
+
+func (r *RabbitMQDB) ApplyChanges(tableName string, changes connection.ChangeSet) error {
+ if len(changes.Inserts) == 0 && len(changes.Updates) == 0 && len(changes.Deletes) == 0 {
+ return nil
+ }
+ return fmt.Errorf("RabbitMQ 结果集仅支持只读预览;如需写入请在 SQL 编辑器执行 JSON publish 命令")
+}
+
+func normalizeRabbitMQConfig(config connection.ConnectionConfig) connection.ConnectionConfig {
+ runConfig := applyRabbitMQURI(config)
+ if strings.TrimSpace(runConfig.Host) == "" {
+ runConfig.Host = "localhost"
+ }
+ if runConfig.Port <= 0 {
+ runConfig.Port = defaultRabbitMQPort
+ }
+ params := rabbitmqConnectionParams(runConfig)
+ if rabbitmqBoolValue(firstNonEmpty(params.Get("ssl"), params.Get("tls"), params.Get("useSSL"), params.Get("use_ssl"))) {
+ runConfig.UseSSL = true
+ }
+ if strings.TrimSpace(runConfig.SSLMode) == "" && runConfig.UseSSL {
+ if rabbitmqBoolValue(firstNonEmpty(params.Get("skip_verify"), params.Get("skipVerify"), params.Get("insecure"))) {
+ runConfig.SSLMode = "skip-verify"
+ } else {
+ runConfig.SSLMode = "required"
+ }
+ }
+ return runConfig
+}
+
+func applyRabbitMQURI(config connection.ConnectionConfig) connection.ConnectionConfig {
+ uriText := strings.TrimSpace(config.URI)
+ if uriText == "" {
+ return config
+ }
+ parsed, err := url.Parse(uriText)
+ if err != nil {
+ return config
+ }
+ scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme))
+ if scheme != "rabbitmq" && scheme != "http" && scheme != "https" {
+ return config
+ }
+ if parsed.User != nil {
+ if strings.TrimSpace(config.User) == "" {
+ config.User = parsed.User.Username()
+ }
+ if pass, ok := parsed.User.Password(); ok && config.Password == "" {
+ config.Password = pass
+ }
+ }
+ host, port, ok := parseHostPortWithDefault(parsed.Host, defaultRabbitMQPort)
+ if ok {
+ config.Host = host
+ config.Port = port
+ }
+ if vhost := rabbitmqDecodePathValue(parsed.Path); vhost != "" && strings.TrimSpace(config.Database) == "" {
+ config.Database = vhost
+ }
+ if scheme == "https" {
+ config.UseSSL = true
+ if strings.TrimSpace(config.SSLMode) == "" {
+ config.SSLMode = "required"
+ }
+ }
+ return config
+}
+
+func rabbitmqConnectionParams(config connection.ConnectionConfig) url.Values {
+ params := url.Values{}
+ mergeConnectionParamValues(params, connectionParamsFromURI(config.URI, "rabbitmq", "http", "https"))
+ mergeConnectionParamValues(params, connectionParamsFromText(config.ConnectionParams))
+ return params
+}
+
+func rabbitmqDecodePathValue(path string) string {
+ trimmed := strings.TrimPrefix(strings.TrimSpace(path), "/")
+ if trimmed == "" {
+ return ""
+ }
+ decoded, err := url.PathUnescape(trimmed)
+ if err != nil {
+ return trimmed
+ }
+ return decoded
+}
+
+func rabbitmqResolveVHost(raw string, fallback string) string {
+ if text := strings.TrimSpace(raw); text != "" {
+ return text
+ }
+ if text := strings.TrimSpace(fallback); text != "" {
+ return text
+ }
+ return rabbitMQDefaultVHost
+}
+
+func rabbitmqResolveQueue(raw string, fallback string) string {
+ if text := strings.TrimSpace(raw); text != "" {
+ return text
+ }
+ return strings.TrimSpace(fallback)
+}
+
+func rabbitmqNormalizeExchangeName(raw string, fallback string) string {
+ text := strings.TrimSpace(firstNonEmpty(raw, fallback))
+ switch text {
+ case "(default)", "amq.default":
+ return ""
+ default:
+ return text
+ }
+}
+
+func rabbitmqPageSize(params url.Values) int {
+ size := intFromAny(firstNonEmpty(params.Get("pageSize"), params.Get("page_size")), defaultRabbitMQPageSize)
+ if size <= 0 {
+ size = defaultRabbitMQPageSize
+ }
+ if size > maxRabbitMQPageSize {
+ size = maxRabbitMQPageSize
+ }
+ return size
+}
+
+func buildRabbitMQBaseURL(config connection.ConnectionConfig) string {
+ scheme := "http"
+ if config.UseSSL {
+ scheme = "https"
+ }
+ params := rabbitmqConnectionParams(config)
+ prefix := strings.TrimSpace(firstNonEmpty(params.Get("managementPathPrefix"), params.Get("pathPrefix")))
+ if prefix != "" {
+ prefix = "/" + strings.Trim(strings.TrimSpace(prefix), "/")
+ }
+ return (&url.URL{
+ Scheme: scheme,
+ Host: net.JoinHostPort(strings.TrimSpace(config.Host), strconv.Itoa(config.Port)),
+ Path: prefix,
+ }).String()
+}
+
+func buildRabbitMQHTTPClient(config connection.ConnectionConfig) *http.Client {
+ transport := http.DefaultTransport.(*http.Transport).Clone()
+ if tlsConfig, err := resolveGenericTLSConfig(config); err == nil && tlsConfig != nil {
+ transport.TLSClientConfig = tlsConfig
+ }
+ if config.UseProxy {
+ proxyCfg := config.Proxy
+ transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
+ return proxytunnel.DialContext(ctx, proxyCfg, network, addr)
+ }
+ }
+ return &http.Client{Transport: transport, Timeout: getConnectTimeout(config)}
+}
+
+func rabbitmqAuthHeaders(config connection.ConnectionConfig) map[string]string {
+ headers := map[string]string{}
+ if user := strings.TrimSpace(config.User); user != "" {
+ raw := user + ":" + config.Password
+ headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(raw))
+ }
+ params := rabbitmqConnectionParams(config)
+ if headerName := strings.TrimSpace(params.Get("authHeader")); headerName != "" {
+ if headerValue := strings.TrimSpace(params.Get("authHeaderValue")); headerValue != "" && isSafeConnectionParamKey(headerName) {
+ headers[headerName] = headerValue
+ }
+ }
+ return headers
+}
+
+func (r *RabbitMQDB) doJSON(ctx context.Context, method, path string, body interface{}, out interface{}) error {
+ if r.client == nil {
+ return fmt.Errorf("连接未打开")
+ }
+ var payload io.Reader
+ if body != nil {
+ data, err := json.Marshal(body)
+ if err != nil {
+ return err
+ }
+ payload = strings.NewReader(string(data))
+ }
+ req, err := http.NewRequestWithContext(ctx, method, strings.TrimRight(r.baseURL, "/")+path, payload)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Accept", "application/json")
+ if body != nil {
+ req.Header.Set("Content-Type", "application/json")
+ }
+ for key, value := range r.authHeaders {
+ req.Header.Set(key, value)
+ }
+ resp, err := r.client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ data, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+ message := strings.TrimSpace(string(data))
+ var errBody map[string]interface{}
+ if decodeJSONWithUseNumber(data, &errBody) == nil {
+ message = strings.TrimSpace(firstNonEmpty(
+ mapString(errBody, "error"),
+ mapString(errBody, "reason"),
+ mapString(errBody, "message"),
+ message,
+ ))
+ }
+ if message == "" {
+ message = resp.Status
+ }
+ return fmt.Errorf("RabbitMQ HTTP API %s %s 失败:%s", method, path, message)
+ }
+ if out == nil || len(data) == 0 {
+ return nil
+ }
+ if err := decodeJSONWithUseNumber(data, out); err != nil {
+ return fmt.Errorf("解析 RabbitMQ HTTP API 响应失败:%w", err)
+ }
+ return nil
+}
+
+func (r *RabbitMQDB) listVHosts(ctx context.Context, limit int) ([]map[string]interface{}, error) {
+ return r.listCollection(ctx, "/api/vhosts", limit, nil)
+}
+
+func (r *RabbitMQDB) listQueues(ctx context.Context, vhost string, limit int) ([]map[string]interface{}, error) {
+ params := url.Values{}
+ params.Set("disable_stats", "true")
+ params.Set("enable_queue_totals", "true")
+ return r.listCollection(ctx, fmt.Sprintf("/api/queues/%s", url.PathEscape(vhost)), limit, params)
+}
+
+func (r *RabbitMQDB) listExchanges(ctx context.Context, vhost string, limit int) ([]map[string]interface{}, error) {
+ return r.listCollection(ctx, fmt.Sprintf("/api/exchanges/%s", url.PathEscape(vhost)), limit, nil)
+}
+
+func (r *RabbitMQDB) listCollection(ctx context.Context, path string, limit int, extraParams url.Values) ([]map[string]interface{}, error) {
+ pageSize := r.pageSize
+ if pageSize <= 0 {
+ pageSize = defaultRabbitMQPageSize
+ }
+ if limit > 0 && limit < pageSize {
+ pageSize = limit
+ }
+ if pageSize <= 0 {
+ pageSize = defaultRabbitMQPageSize
+ }
+ if pageSize > maxRabbitMQPageSize {
+ pageSize = maxRabbitMQPageSize
+ }
+
+ var result []map[string]interface{}
+ for page := 1; ; page++ {
+ query := url.Values{}
+ for key, values := range extraParams {
+ for _, value := range values {
+ query.Add(key, value)
+ }
+ }
+ query.Set("page", strconv.Itoa(page))
+ query.Set("page_size", strconv.Itoa(pageSize))
+ query.Set("pagination", "true")
+
+ requestPath := path
+ if encoded := query.Encode(); encoded != "" {
+ requestPath += "?" + encoded
+ }
+
+ var raw interface{}
+ if err := r.doJSON(ctx, http.MethodGet, requestPath, nil, &raw); err != nil {
+ return nil, err
+ }
+ items, pageCount, err := rabbitmqItemsFromResponse(raw)
+ if err != nil {
+ return nil, err
+ }
+ result = append(result, items...)
+ if limit > 0 && len(result) >= limit {
+ return result[:limit], nil
+ }
+ if pageCount <= page || len(items) == 0 {
+ break
+ }
+ }
+ return result, nil
+}
+
+func rabbitmqItemsFromResponse(raw interface{}) ([]map[string]interface{}, int, error) {
+ switch typed := raw.(type) {
+ case []interface{}:
+ return rabbitmqMapSlice(typed)
+ case []map[string]interface{}:
+ return typed, 1, nil
+ case map[string]interface{}:
+ itemsRaw, ok := typed["items"]
+ if !ok {
+ return nil, 0, fmt.Errorf("RabbitMQ 列表响应缺少 items 字段")
+ }
+ items, _, err := rabbitmqItemsFromResponse(itemsRaw)
+ if err != nil {
+ return nil, 0, err
+ }
+ return items, intFromAny(typed["page_count"], 1), nil
+ default:
+ return nil, 0, fmt.Errorf("无法解析 RabbitMQ 列表响应")
+ }
+}
+
+func rabbitmqMapSlice(raw []interface{}) ([]map[string]interface{}, int, error) {
+ result := make([]map[string]interface{}, 0, len(raw))
+ for _, item := range raw {
+ row, ok := item.(map[string]interface{})
+ if !ok {
+ return nil, 0, fmt.Errorf("RabbitMQ 列表项不是对象")
+ }
+ result = append(result, row)
+ }
+ return result, 1, nil
+}
+
+func (r *RabbitMQDB) getQueueInfo(ctx context.Context, vhost string, queue string) (map[string]interface{}, error) {
+ params := url.Values{}
+ params.Set("disable_stats", "true")
+ params.Set("enable_queue_totals", "true")
+ path := fmt.Sprintf("/api/queues/%s/%s?%s", url.PathEscape(vhost), url.PathEscape(queue), params.Encode())
+ var info map[string]interface{}
+ if err := r.doJSON(ctx, http.MethodGet, path, nil, &info); err != nil {
+ return nil, err
+ }
+ return info, nil
+}
+
+func (r *RabbitMQDB) getExchangeInfo(ctx context.Context, vhost string, exchange string) (map[string]interface{}, error) {
+ path := fmt.Sprintf("/api/exchanges/%s/%s", url.PathEscape(vhost), url.PathEscape(exchange))
+ var info map[string]interface{}
+ if err := r.doJSON(ctx, http.MethodGet, path, nil, &info); err != nil {
+ return nil, err
+ }
+ return info, nil
+}
+
+func (r *RabbitMQDB) getQueueMessages(ctx context.Context, vhost string, queue string, limit int) ([]map[string]interface{}, error) {
+ if limit <= 0 {
+ limit = defaultRabbitMQPreviewLimit
+ }
+ body := map[string]interface{}{
+ "count": limit,
+ "ackmode": "ack_requeue_true",
+ "encoding": "auto",
+ "truncate": 50000,
+ }
+ path := fmt.Sprintf("/api/queues/%s/%s/get", url.PathEscape(vhost), url.PathEscape(queue))
+ var result []map[string]interface{}
+ if err := r.doJSON(ctx, http.MethodPost, path, body, &result); err != nil {
+ return nil, err
+ }
+ return result, nil
+}
+
+func (r *RabbitMQDB) publishMessage(ctx context.Context, vhost string, exchange string, routingKey string, payload interface{}, properties map[string]interface{}) (int64, error) {
+ payloadText, encoding, err := rabbitmqEncodePayload(payload)
+ if err != nil {
+ return 0, err
+ }
+ if properties == nil {
+ properties = map[string]interface{}{}
+ }
+ if _, exists := properties["content_type"]; !exists {
+ switch payload.(type) {
+ case map[string]interface{}, []interface{}:
+ properties["content_type"] = "application/json"
+ }
+ }
+ body := map[string]interface{}{
+ "properties": properties,
+ "routing_key": routingKey,
+ "payload": payloadText,
+ "payload_encoding": encoding,
+ }
+ path := fmt.Sprintf("/api/exchanges/%s/%s/publish", url.PathEscape(vhost), url.PathEscape(exchange))
+ var result map[string]interface{}
+ if err := r.doJSON(ctx, http.MethodPut, path, body, &result); err != nil {
+ return 0, err
+ }
+ if !rabbitmqBoolAny(result["routed"]) {
+ return 0, fmt.Errorf("RabbitMQ publish 未路由到任何队列")
+ }
+ return 1, nil
+}
+
+type rabbitmqParsedSQL struct {
+ Action string
+ VHost string
+ Name string
+ Limit int
+ Offset int
+ Count bool
+}
+
+var (
+ rabbitmqSQLFromRE = regexp.MustCompile(`(?i)\bFROM\s+(?:"([^"]*)"|` + "`" + `([^` + "`" + `]*)` + "`" + `|([^\s;]+))`)
+ rabbitmqSQLLimitRE = regexp.MustCompile(`(?i)\bLIMIT\s+(\d+)`)
+ rabbitmqSQLOffsetRE = regexp.MustCompile(`(?i)\bOFFSET\s+(\d+)`)
+ rabbitmqShowVHostsRE = regexp.MustCompile(`(?i)^\s*SHOW\s+VHOSTS(?:\s+LIMIT\s+(\d+))?\s*$`)
+ rabbitmqShowQueuesRE = regexp.MustCompile(`(?i)^\s*SHOW\s+QUEUES(?:\s+LIMIT\s+(\d+))?\s*$`)
+ rabbitmqShowExchangesRE = regexp.MustCompile(`(?i)^\s*SHOW\s+EXCHANGES(?:\s+LIMIT\s+(\d+))?\s*$`)
+ rabbitmqDescribeQueueRE = regexp.MustCompile(`(?i)^\s*(?:SHOW|DESCRIBE)\s+QUEUE\s+(?:"([^"]*)"|` + "`" + `([^` + "`" + `]*)` + "`" + `|([^\s;]+))\s*$`)
+ rabbitmqDescribeExchangeRE = regexp.MustCompile(`(?i)^\s*(?:SHOW|DESCRIBE)\s+EXCHANGE\s+(?:"([^"]*)"|` + "`" + `([^` + "`" + `]*)` + "`" + `|([^\s;]+))\s*$`)
+ rabbitmqConsumeQueueRE = regexp.MustCompile(`(?i)^\s*CONSUME\s+FROM\s+(?:"([^"]*)"|` + "`" + `([^` + "`" + `]*)` + "`" + `|([^\s;]+))`)
+)
+
+func parseRabbitMQSQL(sqlText string) (rabbitmqParsedSQL, bool) {
+ text := strings.TrimSpace(sqlText)
+ if text == "" {
+ return rabbitmqParsedSQL{}, false
+ }
+ if matches := rabbitmqShowVHostsRE.FindStringSubmatch(text); len(matches) > 0 {
+ return rabbitmqParsedSQL{Action: "show_vhosts", Limit: rabbitmqMatchLimit(matches, 1)}, true
+ }
+ if matches := rabbitmqShowQueuesRE.FindStringSubmatch(text); len(matches) > 0 {
+ return rabbitmqParsedSQL{Action: "show_queues", Limit: rabbitmqMatchLimit(matches, 1)}, true
+ }
+ if matches := rabbitmqShowExchangesRE.FindStringSubmatch(text); len(matches) > 0 {
+ return rabbitmqParsedSQL{Action: "show_exchanges", Limit: rabbitmqMatchLimit(matches, 1)}, true
+ }
+ if matches := rabbitmqDescribeQueueRE.FindStringSubmatch(text); len(matches) > 0 {
+ return rabbitmqParsedSQL{
+ Action: "describe_queue",
+ Name: firstNonEmpty(matches[1], matches[2], matches[3]),
+ }, true
+ }
+ if matches := rabbitmqDescribeExchangeRE.FindStringSubmatch(text); len(matches) > 0 {
+ return rabbitmqParsedSQL{
+ Action: "describe_exchange",
+ Name: firstNonEmpty(matches[1], matches[2], matches[3]),
+ }, true
+ }
+ if matches := rabbitmqConsumeQueueRE.FindStringSubmatch(text); len(matches) > 0 {
+ parsed := rabbitmqParsedSQL{
+ Action: "consume",
+ Name: firstNonEmpty(matches[1], matches[2], matches[3]),
+ Limit: defaultRabbitMQPreviewLimit,
+ }
+ if limitMatch := rabbitmqSQLLimitRE.FindStringSubmatch(text); len(limitMatch) > 1 {
+ parsed.Limit, _ = strconv.Atoi(limitMatch[1])
+ }
+ if offsetMatch := rabbitmqSQLOffsetRE.FindStringSubmatch(text); len(offsetMatch) > 1 {
+ parsed.Offset, _ = strconv.Atoi(offsetMatch[1])
+ }
+ return parsed, true
+ }
+ if !strings.HasPrefix(strings.ToLower(text), "select") {
+ return rabbitmqParsedSQL{}, false
+ }
+ matches := rabbitmqSQLFromRE.FindStringSubmatch(text)
+ if len(matches) == 0 {
+ return rabbitmqParsedSQL{}, false
+ }
+ parsed := rabbitmqParsedSQL{
+ Action: "select",
+ Name: firstNonEmpty(matches[1], matches[2], matches[3]),
+ Limit: defaultRabbitMQPreviewLimit,
+ Count: strings.Contains(strings.ToLower(text), "count("),
+ }
+ if limitMatch := rabbitmqSQLLimitRE.FindStringSubmatch(text); len(limitMatch) > 1 {
+ parsed.Limit, _ = strconv.Atoi(limitMatch[1])
+ }
+ if offsetMatch := rabbitmqSQLOffsetRE.FindStringSubmatch(text); len(offsetMatch) > 1 {
+ parsed.Offset, _ = strconv.Atoi(offsetMatch[1])
+ }
+ return parsed, true
+}
+
+func rabbitmqMatchLimit(matches []string, index int) int {
+ if index >= len(matches) || strings.TrimSpace(matches[index]) == "" {
+ return 0
+ }
+ limit, _ := strconv.Atoi(matches[index])
+ return limit
+}
+
+func rabbitmqVHostRows(items []map[string]interface{}) []map[string]interface{} {
+ rows := make([]map[string]interface{}, 0, len(items))
+ for _, item := range items {
+ row := map[string]interface{}{
+ "vhost": mapString(item, "name"),
+ "tracing": rabbitmqBoolAny(item["tracing"]),
+ }
+ if desc := mapString(item, "description"); desc != "" {
+ row["description"] = desc
+ }
+ if tags := mapString(item, "tags"); tags != "" {
+ row["tags"] = tags
+ }
+ if value := mapString(item, "default_queue_type"); value != "" {
+ row["default_queue_type"] = value
+ }
+ if state, ok := item["cluster_state"].(map[string]interface{}); ok && len(state) > 0 {
+ row["cluster_state"] = state
+ }
+ rows = append(rows, row)
+ }
+ return rows
+}
+
+func rabbitmqQueueRows(items []map[string]interface{}) []map[string]interface{} {
+ rows := make([]map[string]interface{}, 0, len(items))
+ for _, item := range items {
+ rows = append(rows, rabbitmqQueueRow(item))
+ }
+ return rows
+}
+
+func rabbitmqQueueRow(item map[string]interface{}) map[string]interface{} {
+ row := map[string]interface{}{
+ "vhost": mapString(item, "vhost"),
+ "queue": mapString(item, "name"),
+ "durable": rabbitmqBoolAny(item["durable"]),
+ "auto_delete": rabbitmqBoolAny(item["auto_delete"]),
+ "exclusive": rabbitmqBoolAny(item["exclusive"]),
+ "consumers": intFromAny(item["consumers"], 0),
+ "messages": intFromAny(item["messages"], 0),
+ "messages_ready": intFromAny(item["messages_ready"], 0),
+ "messages_unacknowledged": intFromAny(item["messages_unacknowledged"], 0),
+ }
+ if node := mapString(item, "node"); node != "" {
+ row["node"] = node
+ }
+ if state := mapString(item, "state"); state != "" {
+ row["state"] = state
+ }
+ if queueType := mapString(item, "type"); queueType != "" {
+ row["type"] = queueType
+ }
+ if args, ok := item["arguments"].(map[string]interface{}); ok && len(args) > 0 {
+ row["arguments"] = args
+ }
+ return row
+}
+
+func rabbitmqExchangeRows(items []map[string]interface{}) []map[string]interface{} {
+ rows := make([]map[string]interface{}, 0, len(items))
+ for _, item := range items {
+ rows = append(rows, rabbitmqExchangeRow(item))
+ }
+ return rows
+}
+
+func rabbitmqExchangeRow(item map[string]interface{}) map[string]interface{} {
+ name := mapString(item, "name")
+ row := map[string]interface{}{
+ "vhost": mapString(item, "vhost"),
+ "exchange": name,
+ "durable": rabbitmqBoolAny(item["durable"]),
+ "auto_delete": rabbitmqBoolAny(item["auto_delete"]),
+ "internal": rabbitmqBoolAny(item["internal"]),
+ }
+ if name == "" {
+ row["exchange_display"] = "(default)"
+ }
+ if exchangeType := mapString(item, "type"); exchangeType != "" {
+ row["type"] = exchangeType
+ }
+ if args, ok := item["arguments"].(map[string]interface{}); ok && len(args) > 0 {
+ row["arguments"] = args
+ }
+ return row
+}
+
+func rabbitmqMessageRows(vhost string, queue string, items []map[string]interface{}) []map[string]interface{} {
+ rows := make([]map[string]interface{}, 0, len(items))
+ for _, item := range items {
+ row := map[string]interface{}{
+ "vhost": vhost,
+ "queue": queue,
+ "exchange": mapString(item, "exchange"),
+ "routing_key": mapString(item, "routing_key"),
+ "redelivered": rabbitmqBoolAny(item["redelivered"]),
+ "message_count": intFromAny(item["message_count"], 0),
+ "payload_bytes": intFromAny(item["payload_bytes"], 0),
+ "payload_encoding": mapString(item, "payload_encoding"),
+ }
+ payload := rabbitmqDecodePayload(item["payload"], row["payload_encoding"])
+ if payload != nil {
+ row["payload"] = payload
+ if payloadMap, ok := payload.(map[string]interface{}); ok {
+ flattenRabbitMQMap("payload", payloadMap, row)
+ }
+ }
+ if properties, ok := item["properties"].(map[string]interface{}); ok && len(properties) > 0 {
+ row["properties"] = properties
+ flattenRabbitMQMap("properties", properties, row)
+ if headers, ok := properties["headers"].(map[string]interface{}); ok && len(headers) > 0 {
+ row["headers"] = headers
+ flattenRabbitMQMap("headers", headers, row)
+ }
+ }
+ rows = append(rows, row)
+ }
+ return rows
+}
+
+func flattenRabbitMQMap(prefix string, values map[string]interface{}, row map[string]interface{}) {
+ for key, value := range values {
+ if strings.TrimSpace(key) == "" {
+ continue
+ }
+ name := prefix + "." + key
+ row[name] = value
+ if nested, ok := value.(map[string]interface{}); ok {
+ flattenRabbitMQMap(name, nested, row)
+ }
+ }
+}
+
+func rabbitmqDecodePayload(raw interface{}, encodingValue interface{}) interface{} {
+ switch value := raw.(type) {
+ case nil:
+ return nil
+ case string:
+ encoding := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%v", encodingValue)))
+ if encoding == "base64" {
+ data, err := base64.StdEncoding.DecodeString(value)
+ if err == nil {
+ var decoded interface{}
+ if decodeJSONWithUseNumber(data, &decoded) == nil {
+ return decoded
+ }
+ return bytesToDisplayValue(data, "")
+ }
+ }
+ var decoded interface{}
+ if decodeJSONWithUseNumber([]byte(value), &decoded) == nil {
+ return decoded
+ }
+ return value
+ default:
+ return value
+ }
+}
+
+func rabbitmqEncodePayload(payload interface{}) (string, string, error) {
+ switch typed := payload.(type) {
+ case nil:
+ return "", "string", nil
+ case string:
+ return typed, "string", nil
+ case []byte:
+ return base64.StdEncoding.EncodeToString(typed), "base64", nil
+ case json.Number:
+ return typed.String(), "string", nil
+ case bool, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
+ return fmt.Sprintf("%v", typed), "string", nil
+ case map[string]interface{}, []interface{}:
+ data, err := json.Marshal(typed)
+ if err != nil {
+ return "", "", err
+ }
+ return string(data), "string", nil
+ default:
+ data, err := json.Marshal(typed)
+ if err != nil {
+ return "", "", err
+ }
+ return string(data), "string", nil
+ }
+}
+
+func rabbitmqMapPayload(raw interface{}) (map[string]interface{}, error) {
+ if raw == nil {
+ return nil, nil
+ }
+ value, ok := raw.(map[string]interface{})
+ if !ok {
+ return nil, fmt.Errorf("不是对象")
+ }
+ return value, nil
+}
+
+func rabbitmqBoolValue(raw string) bool {
+ switch strings.ToLower(strings.TrimSpace(raw)) {
+ case "1", "true", "yes", "on", "required":
+ return true
+ default:
+ return false
+ }
+}
+
+func rabbitmqBoolAny(raw interface{}) bool {
+ switch value := raw.(type) {
+ case bool:
+ return value
+ case json.Number:
+ n, err := value.Int64()
+ return err == nil && n != 0
+ case float64:
+ return value != 0
+ case int:
+ return value != 0
+ case int64:
+ return value != 0
+ case string:
+ return rabbitmqBoolValue(value)
+ default:
+ return false
+ }
+}
diff --git a/internal/db/rabbitmq_impl_test.go b/internal/db/rabbitmq_impl_test.go
new file mode 100644
index 0000000..bde9b46
--- /dev/null
+++ b/internal/db/rabbitmq_impl_test.go
@@ -0,0 +1,258 @@
+package db
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "reflect"
+ "strings"
+ "testing"
+
+ "GoNavi-Wails/internal/connection"
+)
+
+func TestNormalizeRabbitMQConfigParsesURIAndParams(t *testing.T) {
+ config := normalizeRabbitMQConfig(connection.ConnectionConfig{
+ URI: "rabbitmq://guest:secret@127.0.0.1:15672/%2F?tls=true&skip_verify=true",
+ ConnectionParams: "defaultQueue=orders.events&exchange=events.topic&pageSize=500",
+ })
+
+ if config.Host != "127.0.0.1" || config.Port != 15672 {
+ t.Fatalf("unexpected rabbitmq host/port: %#v", config)
+ }
+ if config.User != "guest" || config.Password != "secret" {
+ t.Fatalf("unexpected rabbitmq credentials: %#v", config)
+ }
+ if config.Database != "/" {
+ t.Fatalf("expected default vhost '/', got %q", config.Database)
+ }
+ if !config.UseSSL || config.SSLMode != "skip-verify" {
+ t.Fatalf("unexpected rabbitmq tls settings: %#v", config)
+ }
+
+ params := rabbitmqConnectionParams(config)
+ if params.Get("defaultQueue") != "orders.events" || params.Get("exchange") != "events.topic" {
+ t.Fatalf("unexpected rabbitmq params: %#v", params)
+ }
+}
+
+func TestRabbitMQQueryExecAndColumns(t *testing.T) {
+ var lastGetCount int
+ var lastPublishBody map[string]interface{}
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ escapedPath := req.URL.EscapedPath()
+ switch {
+ case req.Method == http.MethodGet && escapedPath == "/api/vhosts":
+ _ = json.NewEncoder(w).Encode(map[string]interface{}{
+ "items": []map[string]interface{}{{"name": "/", "tracing": false}},
+ "page": 1,
+ "page_count": 1,
+ })
+ case req.Method == http.MethodGet && escapedPath == "/api/queues/%2F":
+ _ = json.NewEncoder(w).Encode(map[string]interface{}{
+ "items": []map[string]interface{}{
+ {
+ "vhost": "/",
+ "name": "orders.events.v1",
+ "durable": true,
+ "messages": 8,
+ "messages_ready": 5,
+ "messages_unacknowledged": 3,
+ "consumers": 2,
+ },
+ },
+ "page": 1,
+ "page_count": 1,
+ })
+ case req.Method == http.MethodGet && escapedPath == "/api/queues/%2F/orders.events.v1":
+ _ = json.NewEncoder(w).Encode(map[string]interface{}{
+ "vhost": "/",
+ "name": "orders.events.v1",
+ "durable": true,
+ "messages": 8,
+ "messages_ready": 5,
+ "messages_unacknowledged": 3,
+ "consumers": 2,
+ "node": "rabbit@node1",
+ })
+ case req.Method == http.MethodGet && escapedPath == "/api/exchanges/%2F":
+ _ = json.NewEncoder(w).Encode(map[string]interface{}{
+ "items": []map[string]interface{}{
+ {
+ "vhost": "/",
+ "name": "events.topic",
+ "type": "topic",
+ "durable": true,
+ "internal": false,
+ },
+ },
+ "page": 1,
+ "page_count": 1,
+ })
+ case req.Method == http.MethodPost && escapedPath == "/api/queues/%2F/orders.events.v1/get":
+ var body map[string]interface{}
+ if err := json.NewDecoder(req.Body).Decode(&body); err != nil {
+ t.Fatalf("decode get body failed: %v", err)
+ }
+ lastGetCount = intFromAny(body["count"], 0)
+ _ = json.NewEncoder(w).Encode([]map[string]interface{}{
+ {
+ "exchange": "events.topic",
+ "routing_key": "orders.events.v1",
+ "payload": `{"event":"created","meta":{"ip":"127.0.0.1"}}`,
+ "payload_bytes": 46,
+ "payload_encoding": "string",
+ "redelivered": false,
+ "message_count": 7,
+ "properties": map[string]interface{}{
+ "content_type": "application/json",
+ "headers": map[string]interface{}{
+ "x-env": "dev",
+ },
+ },
+ },
+ {
+ "exchange": "events.topic",
+ "routing_key": "orders.events.v1",
+ "payload": "plain-text",
+ "payload_bytes": 10,
+ "payload_encoding": "string",
+ "redelivered": true,
+ "message_count": 6,
+ "properties": map[string]interface{}{
+ "headers": map[string]interface{}{
+ "x-env": "qa",
+ },
+ },
+ },
+ })
+ case req.Method == http.MethodPut && escapedPath == "/api/exchanges/%2F/events.topic/publish":
+ if err := json.NewDecoder(req.Body).Decode(&lastPublishBody); err != nil {
+ t.Fatalf("decode publish body failed: %v", err)
+ }
+ _ = json.NewEncoder(w).Encode(map[string]interface{}{"routed": true})
+ default:
+ t.Fatalf("unexpected rabbitmq request: %s %s?%s", req.Method, escapedPath, req.URL.RawQuery)
+ }
+ }))
+ defer server.Close()
+
+ client := &RabbitMQDB{
+ client: server.Client(),
+ baseURL: server.URL,
+ defaultVHost: "/",
+ defaultQueue: "orders.events.v1",
+ defaultExchange: "events.topic",
+ pageSize: 50,
+ }
+
+ rows, columns, err := client.Query(`SHOW VHOSTS LIMIT 1`)
+ if err != nil {
+ t.Fatalf("SHOW VHOSTS failed: %v", err)
+ }
+ if len(rows) != 1 || rows[0]["vhost"] != "/" {
+ t.Fatalf("unexpected vhost rows: %#v", rows)
+ }
+ if !containsString(columns, "tracing") {
+ t.Fatalf("expected tracing column, got %v", columns)
+ }
+
+ rows, columns, err = client.Query(`SHOW QUEUES LIMIT 1`)
+ if err != nil {
+ t.Fatalf("SHOW QUEUES failed: %v", err)
+ }
+ if len(rows) != 1 || rows[0]["queue"] != "orders.events.v1" {
+ t.Fatalf("unexpected queue rows: %#v", rows)
+ }
+ if !containsString(columns, "messages_ready") {
+ t.Fatalf("expected messages_ready column, got %v", columns)
+ }
+
+ rows, _, err = client.Query(`DESCRIBE QUEUE "orders.events.v1"`)
+ if err != nil {
+ t.Fatalf("DESCRIBE QUEUE failed: %v", err)
+ }
+ if len(rows) != 1 || rows[0]["node"] != "rabbit@node1" {
+ t.Fatalf("unexpected describe queue rows: %#v", rows)
+ }
+
+ rows, columns, err = client.Query(`SHOW EXCHANGES LIMIT 1`)
+ if err != nil {
+ t.Fatalf("SHOW EXCHANGES failed: %v", err)
+ }
+ if len(rows) != 1 || rows[0]["exchange"] != "events.topic" {
+ t.Fatalf("unexpected exchange rows: %#v", rows)
+ }
+ if !containsString(columns, "type") {
+ t.Fatalf("expected exchange type column, got %v", columns)
+ }
+
+ rows, columns, err = client.Query(`SELECT * FROM "orders.events.v1" LIMIT 1 OFFSET 1`)
+ if err != nil {
+ t.Fatalf("SELECT queue failed: %v", err)
+ }
+ if lastGetCount != 2 {
+ t.Fatalf("expected fetch count 2 for offset emulation, got %d", lastGetCount)
+ }
+ if len(rows) != 1 || rows[0]["payload"] != "plain-text" || rows[0]["headers.x-env"] != "qa" {
+ t.Fatalf("unexpected rabbitmq message rows: %#v", rows)
+ }
+ if !containsString(columns, "headers.x-env") {
+ t.Fatalf("expected derived header column, got %v", columns)
+ }
+
+ affected, err := client.Exec(`{"queue":"orders.events.v1","exchange":"events.topic","routing_key":"orders.events.v1","payload":{"id":1},"headers":{"x-env":"dev"}}`)
+ if err != nil {
+ t.Fatalf("rabbitmq publish failed: %v", err)
+ }
+ if affected != 1 {
+ t.Fatalf("unexpected affected rows: %d", affected)
+ }
+ if lastPublishBody["routing_key"] != "orders.events.v1" {
+ t.Fatalf("unexpected publish routing key: %#v", lastPublishBody)
+ }
+ properties, ok := lastPublishBody["properties"].(map[string]interface{})
+ if !ok {
+ t.Fatalf("expected publish properties object, got %#v", lastPublishBody["properties"])
+ }
+ headers, ok := properties["headers"].(map[string]interface{})
+ if !ok || headers["x-env"] != "dev" {
+ t.Fatalf("unexpected publish headers: %#v", properties["headers"])
+ }
+ if lastPublishBody["payload_encoding"] != "string" || !strings.Contains(lastPublishBody["payload"].(string), `"id":1`) {
+ t.Fatalf("unexpected publish payload: %#v", lastPublishBody)
+ }
+
+ columnDefs, err := client.GetColumns("/", "orders.events.v1")
+ if err != nil {
+ t.Fatalf("GetColumns failed: %v", err)
+ }
+ names := make([]string, 0, len(columnDefs))
+ for _, col := range columnDefs {
+ names = append(names, col.Name)
+ }
+ joined := strings.Join(names, ",")
+ for _, want := range []string{"queue", "payload.meta.ip", "headers.x-env", "properties.content_type"} {
+ if !strings.Contains(joined, want) {
+ t.Fatalf("expected derived rabbitmq column %q in %s", want, joined)
+ }
+ }
+
+ databases, err := client.GetDatabases()
+ if err != nil {
+ t.Fatalf("GetDatabases failed: %v", err)
+ }
+ if !reflect.DeepEqual(databases, []string{"/"}) {
+ t.Fatalf("unexpected vhost list: %#v", databases)
+ }
+
+ tables, err := client.GetTables("/")
+ if err != nil {
+ t.Fatalf("GetTables failed: %v", err)
+ }
+ if !reflect.DeepEqual(tables, []string{"orders.events.v1"}) {
+ t.Fatalf("unexpected queue list: %#v", tables)
+ }
+}