mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-07-05 10:01:30 +08:00
✨ feat(rabbitmq): 新增 RabbitMQ 数据源连接与测试发消息支持
- 新增 RabbitMQ 管理 API 数据源实现,支持 vhost、queue、exchange 浏览与队列预览 - 统一消息发送弹窗,支持 Kafka Topic 与 RabbitMQ Queue 的测试发送命令生成 - 补齐连接表单、能力矩阵、SQL 方言、图标与前后端回归测试覆盖
This commit is contained in:
@@ -33,10 +33,10 @@ describe('ConnectionModal data source registry', () => {
|
||||
expect(source).toContain('type === "elasticsearch"');
|
||||
expect(source).toContain("return '支持索引浏览、Mapping 检查、JSON DSL 和 query_string 查询';");
|
||||
expect(source).toContain(
|
||||
'type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka") ? "" : "root";',
|
||||
'type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka" || type === "rabbitmq") ? "" : "root";',
|
||||
);
|
||||
expect(source).toContain(
|
||||
'placeholder={(dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka") ? "未开启认证可留空" : undefined}',
|
||||
'placeholder={(dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka" || dbType === "rabbitmq") ? "未开启认证可留空" : undefined}',
|
||||
);
|
||||
expect(source).toContain('label="显示数据库 (留空显示全部)"');
|
||||
});
|
||||
@@ -89,6 +89,19 @@ describe('ConnectionModal data source registry', () => {
|
||||
expect(source).toContain('label="默认 Topic(可选)"');
|
||||
});
|
||||
|
||||
it('exposes RabbitMQ in the create-connection picker with management-api and vhost defaults', () => {
|
||||
expect(source).toContain("case 'rabbitmq':");
|
||||
expect(source).toContain('return 15672;');
|
||||
expect(source).toContain('rabbitmq: ["rabbitmq", "http", "https"]');
|
||||
expect(source).toContain("key: 'rabbitmq'");
|
||||
expect(source).toContain("name: 'RabbitMQ'");
|
||||
expect(source).toContain('dbType === "rabbitmq"');
|
||||
expect(source).toContain("return 'Management API / Virtual Host / Queue';");
|
||||
expect(source).toContain('return "rabbitmq://guest:guest@127.0.0.1:15672/%2F?defaultQueue=orders.queue&exchange=events.topic&timeout=30";');
|
||||
expect(source).toContain('return "defaultQueue=orders.queue&exchange=events.topic&managementPathPrefix=/rabbitmq";');
|
||||
expect(source).toContain('label="默认 Virtual Host(可选)"');
|
||||
});
|
||||
|
||||
it('exposes GaussDB in the create-connection picker with PostgreSQL-family defaults', () => {
|
||||
expect(source).toContain("case 'gaussdb':");
|
||||
expect(source).toContain('return 5432;');
|
||||
|
||||
@@ -420,6 +420,7 @@ const ConnectionModal: React.FC<{
|
||||
const isOceanBaseOracle = dbType === "oceanbase" && oceanBaseProtocol === "oracle";
|
||||
const isMySQLLike = isMySQLCompatibleType(dbType) && !isOceanBaseOracle;
|
||||
const isKafka = dbType === "kafka";
|
||||
const isRabbitMQ = dbType === "rabbitmq";
|
||||
const supportsConnectionParams = supportsConnectionParamsForType(dbType);
|
||||
const isSSLType = supportsSSLForType(dbType);
|
||||
const supportsSSLCAPath = supportsSSLCAPathForType(dbType);
|
||||
@@ -1690,6 +1691,46 @@ const ConnectionModal: React.FC<{
|
||||
};
|
||||
}
|
||||
|
||||
if (type === "rabbitmq") {
|
||||
const defaultPort = getDefaultPortByType(type);
|
||||
const parsed = parseSingleHostUri(
|
||||
trimmedUri,
|
||||
["rabbitmq", "http", "https"],
|
||||
defaultPort,
|
||||
);
|
||||
if (!parsed) {
|
||||
return null;
|
||||
}
|
||||
const lowerUri = trimmedUri.toLowerCase();
|
||||
const tlsEnabled =
|
||||
lowerUri.startsWith("https://") ||
|
||||
normalizeUriBool(
|
||||
parsed.params.get("tls") ||
|
||||
parsed.params.get("ssl") ||
|
||||
parsed.params.get("useSSL") ||
|
||||
parsed.params.get("use_ssl"),
|
||||
);
|
||||
const skipVerify = normalizeUriBool(
|
||||
parsed.params.get("skip_verify") || parsed.params.get("skipVerify"),
|
||||
);
|
||||
const timeoutValue = Number(parsed.params.get("timeout"));
|
||||
return {
|
||||
host: parsed.host,
|
||||
port: parsed.port,
|
||||
user: parsed.username,
|
||||
password: parsed.password,
|
||||
database: parsed.database || "",
|
||||
useSSL: tlsEnabled,
|
||||
sslMode: tlsEnabled ? (skipVerify ? "skip-verify" : "required") : "disable",
|
||||
...extractSSLPathValuesFromParams(parsed.params, type),
|
||||
connectionParams: serializeConnectionParams(parsed.params),
|
||||
timeout:
|
||||
Number.isFinite(timeoutValue) && timeoutValue > 0
|
||||
? Math.min(MAX_TIMEOUT_SECONDS, Math.trunc(timeoutValue))
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
if (type === "clickhouse") {
|
||||
const httpValues = parseClickHouseHTTPUriToValues(trimmedUri);
|
||||
if (httpValues) {
|
||||
@@ -1941,6 +1982,9 @@ const ConnectionModal: React.FC<{
|
||||
if (dbType === "kafka") {
|
||||
return "kafka://user:pass@127.0.0.1:9092,127.0.0.2:9092/orders.events?topology=cluster&groupId=analytics&mechanism=scram-sha-256";
|
||||
}
|
||||
if (dbType === "rabbitmq") {
|
||||
return "rabbitmq://guest:guest@127.0.0.1:15672/%2F?defaultQueue=orders.queue&exchange=events.topic&timeout=30";
|
||||
}
|
||||
if (dbType === "redis") {
|
||||
return "redis://:pass@127.0.0.1:6379,127.0.0.2:6379/0?topology=cluster 或 redis://:pass@10.0.0.1:26379,10.0.0.2:26379/0?topology=sentinel&master=mymaster";
|
||||
}
|
||||
@@ -1998,6 +2042,8 @@ const ConnectionModal: React.FC<{
|
||||
return "fetchSize=1024&timeZone=Asia%2FShanghai";
|
||||
case "kafka":
|
||||
return "groupId=gonavi&mechanism=scram-sha-256&clientId=gonavi-desktop&startOffset=latest";
|
||||
case "rabbitmq":
|
||||
return "defaultQueue=orders.queue&exchange=events.topic&managementPathPrefix=/rabbitmq";
|
||||
default:
|
||||
return "key=value&another=value";
|
||||
}
|
||||
@@ -2090,6 +2136,28 @@ const ConnectionModal: React.FC<{
|
||||
return `kafka://${encodedAuth}${allBrokers.join(",")}${topicPath}${query ? `?${query}` : ""}`;
|
||||
}
|
||||
|
||||
if (type === "rabbitmq") {
|
||||
const address = toAddress(host, port, defaultPort);
|
||||
const params = new URLSearchParams();
|
||||
if (values.useSSL) {
|
||||
const mode = String(values.sslMode || "preferred")
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
params.set("tls", "true");
|
||||
if (mode === "skip-verify" || mode === "preferred") {
|
||||
params.set("skip_verify", "true");
|
||||
}
|
||||
appendSSLPathParamsForUri(params, type, values);
|
||||
}
|
||||
if (Number.isFinite(timeout) && timeout > 0) {
|
||||
params.set("timeout", String(timeout));
|
||||
}
|
||||
mergeConnectionParams(params, values.connectionParams);
|
||||
const vhostPath = database ? `/${encodeURIComponent(database)}` : "";
|
||||
const query = params.toString();
|
||||
return `rabbitmq://${encodedAuth}${address}${vhostPath}${query ? `?${query}` : ""}`;
|
||||
}
|
||||
|
||||
if (type === "redis") {
|
||||
return buildRedisUriFromValues(values);
|
||||
}
|
||||
@@ -3847,7 +3915,7 @@ const ConnectionModal: React.FC<{
|
||||
});
|
||||
} else if (type !== "custom") {
|
||||
const defaultUser =
|
||||
type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka") ? "" : "root";
|
||||
type === "clickhouse" ? "default" : (type === "redis" || type === "elasticsearch" || type === "chroma" || type === "qdrant" || type === "kafka" || type === "rabbitmq") ? "" : "root";
|
||||
const sslCapableType = supportsSSLForType(type);
|
||||
setUseSSL(false);
|
||||
setUseHttpTunnel(false);
|
||||
@@ -4993,6 +5061,22 @@ const ConnectionModal: React.FC<{
|
||||
),
|
||||
})}
|
||||
|
||||
{dbType === "rabbitmq" &&
|
||||
renderConfigSectionCard({
|
||||
sectionKey: "service",
|
||||
icon: <DatabaseOutlined />,
|
||||
children: (
|
||||
<Form.Item
|
||||
name="database"
|
||||
label="默认 Virtual Host(可选)"
|
||||
help="留空默认使用 /;填写后查询编辑器会以当前 vhost 作为 Queue 浏览与测试发送上下文。"
|
||||
style={{ marginBottom: 0 }}
|
||||
>
|
||||
<Input {...noAutoCapInputProps} placeholder="例如:/ 或 orders-vhost" />
|
||||
</Form.Item>
|
||||
),
|
||||
})}
|
||||
|
||||
{(dbType === "oracle" || isOceanBaseOracle) &&
|
||||
renderConfigSectionCard({
|
||||
sectionKey: "service",
|
||||
@@ -5204,13 +5288,13 @@ const ConnectionModal: React.FC<{
|
||||
name="user"
|
||||
label="用户名"
|
||||
rules={
|
||||
(dbType === "mongodb" || dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka")
|
||||
(dbType === "mongodb" || dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka" || dbType === "rabbitmq")
|
||||
? []
|
||||
: [createUriAwareRequiredRule("请输入用户名")]
|
||||
}
|
||||
style={{ marginBottom: 0 }}
|
||||
>
|
||||
<Input {...noAutoCapInputProps} placeholder={(dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka") ? "未开启认证可留空" : undefined} />
|
||||
<Input {...noAutoCapInputProps} placeholder={(dbType === "elasticsearch" || dbType === "chroma" || dbType === "qdrant" || dbType === "kafka" || dbType === "rabbitmq") ? "未开启认证可留空" : undefined} />
|
||||
</Form.Item>
|
||||
<Form.Item
|
||||
name="password"
|
||||
|
||||
@@ -46,6 +46,13 @@ describe('DatabaseIcons', () => {
|
||||
expect(markup).toContain('>Kf</text>');
|
||||
});
|
||||
|
||||
it('includes RabbitMQ in the selectable database icons', () => {
|
||||
expect(DB_ICON_TYPES).toContain('rabbitmq');
|
||||
expect(getDbIconLabel('rabbitmq')).toBe('RabbitMQ');
|
||||
const markup = renderToStaticMarkup(<>{getDbIcon('rabbitmq', undefined, 22)}</>);
|
||||
expect(markup).toContain('>RM</text>');
|
||||
});
|
||||
|
||||
it('includes GaussDB in the selectable database icons', () => {
|
||||
expect(DB_ICON_TYPES).toContain('gaussdb');
|
||||
expect(getDbIconLabel('gaussdb')).toBe('GaussDB');
|
||||
|
||||
@@ -53,6 +53,7 @@ const DB_DEFAULT_COLORS: Record<string, string> = {
|
||||
tdengine: '#2962FF',
|
||||
iotdb: '#0F766E',
|
||||
kafka: '#F97316',
|
||||
rabbitmq: '#FF6B35',
|
||||
chroma: '#7C3AED',
|
||||
qdrant: '#DC244C',
|
||||
diros: '#0050B3',
|
||||
@@ -196,6 +197,9 @@ const IoTDBIcon: React.FC<DbIconProps> = ({ size = 16, color }) => (
|
||||
const KafkaIcon: React.FC<DbIconProps> = ({ size = 16, color }) => (
|
||||
<ColorBadge size={size} color={color || DB_DEFAULT_COLORS.kafka} label="Kf" />
|
||||
);
|
||||
const RabbitMQIcon: React.FC<DbIconProps> = ({ size = 16, color }) => (
|
||||
<ColorBadge size={size} color={color || DB_DEFAULT_COLORS.rabbitmq} label="RM" />
|
||||
);
|
||||
const ChromaIcon: React.FC<DbIconProps> = ({ size = 16, color }) => (
|
||||
<ColorBadge size={size} color={color || DB_DEFAULT_COLORS.chroma} label="Ch" />
|
||||
);
|
||||
@@ -259,6 +263,7 @@ const DB_ICON_MAP: Record<string, React.FC<DbIconProps>> = {
|
||||
tdengine: TDengineIcon,
|
||||
iotdb: IoTDBIcon,
|
||||
kafka: KafkaIcon,
|
||||
rabbitmq: RabbitMQIcon,
|
||||
chroma: ChromaIcon,
|
||||
qdrant: QdrantIcon,
|
||||
elasticsearch: ElasticsearchIcon,
|
||||
@@ -269,7 +274,7 @@ const DB_ICON_MAP: Record<string, React.FC<DbIconProps>> = {
|
||||
export const DB_ICON_TYPES: string[] = [
|
||||
'mysql', 'mariadb', 'oceanbase', 'postgres', 'redis', 'mongodb', 'jvm',
|
||||
'oracle', 'sqlserver', 'sqlite', 'duckdb', 'clickhouse', 'starrocks',
|
||||
'kingbase', 'dameng', 'vastbase', 'opengauss', 'gaussdb', 'goldendb', 'highgo', 'iris', 'tdengine', 'iotdb', 'kafka', 'chroma', 'qdrant', 'elasticsearch', 'custom',
|
||||
'kingbase', 'dameng', 'vastbase', 'opengauss', 'gaussdb', 'goldendb', 'highgo', 'iris', 'tdengine', 'iotdb', 'kafka', 'rabbitmq', 'chroma', 'qdrant', 'elasticsearch', 'custom',
|
||||
];
|
||||
|
||||
/** 该类型是否有品牌 SVG 文件 */
|
||||
@@ -291,7 +296,7 @@ export const getDbIconLabel = (type: string): string => {
|
||||
sqlserver: 'SQL Server', clickhouse: 'ClickHouse', sqlite: 'SQLite',
|
||||
starrocks: 'StarRocks',
|
||||
duckdb: 'DuckDB', kingbase: '金仓', dameng: '达梦',
|
||||
vastbase: 'VastBase', opengauss: 'OpenGauss', gaussdb: 'GaussDB', goldendb: 'GoldenDB', highgo: '瀚高', iris: 'InterSystems IRIS', tdengine: 'TDengine', iotdb: 'Apache IoTDB', kafka: 'Kafka',
|
||||
vastbase: 'VastBase', opengauss: 'OpenGauss', gaussdb: 'GaussDB', goldendb: 'GoldenDB', highgo: '瀚高', iris: 'InterSystems IRIS', tdengine: 'TDengine', iotdb: 'Apache IoTDB', kafka: 'Kafka', rabbitmq: 'RabbitMQ',
|
||||
chroma: 'Chroma',
|
||||
qdrant: 'Qdrant',
|
||||
elasticsearch: 'Elasticsearch',
|
||||
|
||||
216
frontend/src/components/MessagePublishModal.tsx
Normal file
216
frontend/src/components/MessagePublishModal.tsx
Normal file
@@ -0,0 +1,216 @@
|
||||
import React, { useEffect, useMemo, useState } from 'react';
|
||||
import { Alert, Form, Input, Modal, Select, Space, Typography, message } from 'antd';
|
||||
|
||||
import { DBQuery } from '../../wailsjs/go/app/App';
|
||||
import type { SavedConnection } from '../types';
|
||||
import { buildRpcConnectionConfig } from '../utils/connectionRpcConfig';
|
||||
import {
|
||||
buildMessagePublishCommand,
|
||||
createDefaultMessagePublishDraft,
|
||||
getMessagePublishPresentation,
|
||||
type MessagePublishDraft,
|
||||
} from '../utils/messagePublish';
|
||||
|
||||
const { Text } = Typography;
|
||||
const { TextArea } = Input;
|
||||
|
||||
export type MessagePublishModalProps = {
|
||||
open: boolean;
|
||||
connection: SavedConnection | null;
|
||||
executionDbName?: string;
|
||||
defaultDestination?: string;
|
||||
onCancel: () => void;
|
||||
onSuccess?: (result: { destination: string; affectedRows: number; commandText: string }) => void;
|
||||
};
|
||||
|
||||
const MessagePublishModal: React.FC<MessagePublishModalProps> = ({
|
||||
open,
|
||||
connection,
|
||||
executionDbName = '',
|
||||
defaultDestination = '',
|
||||
onCancel,
|
||||
onSuccess,
|
||||
}) => {
|
||||
const [form] = Form.useForm<MessagePublishDraft>();
|
||||
const [submitting, setSubmitting] = useState(false);
|
||||
const presentation = useMemo(
|
||||
() => getMessagePublishPresentation(connection?.config),
|
||||
[connection],
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
if (!open || !connection) return;
|
||||
form.setFieldsValue(
|
||||
createDefaultMessagePublishDraft(
|
||||
connection.config,
|
||||
defaultDestination,
|
||||
),
|
||||
);
|
||||
}, [connection, defaultDestination, form, open]);
|
||||
|
||||
useEffect(() => {
|
||||
if (open) return;
|
||||
form.resetFields();
|
||||
setSubmitting(false);
|
||||
}, [form, open]);
|
||||
|
||||
const handleSubmit = async () => {
|
||||
if (!connection) return;
|
||||
|
||||
let values: MessagePublishDraft;
|
||||
try {
|
||||
values = await form.validateFields();
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
let command;
|
||||
try {
|
||||
command = buildMessagePublishCommand(connection.config, values);
|
||||
} catch (error: any) {
|
||||
void message.error(error?.message || '构造发送命令失败');
|
||||
return;
|
||||
}
|
||||
|
||||
setSubmitting(true);
|
||||
try {
|
||||
const res = await DBQuery(
|
||||
buildRpcConnectionConfig(connection.config) as any,
|
||||
executionDbName,
|
||||
command.commandText,
|
||||
);
|
||||
if (!res?.success) {
|
||||
void message.error(`发送失败: ${res?.message || '未知错误'}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const affectedRows = Number((res.data as any)?.affectedRows);
|
||||
onSuccess?.({
|
||||
destination: command.destinationLabel,
|
||||
affectedRows: Number.isFinite(affectedRows) ? affectedRows : 0,
|
||||
commandText: command.commandText,
|
||||
});
|
||||
} catch (error: any) {
|
||||
void message.error(`发送失败: ${error?.message || String(error)}`);
|
||||
} finally {
|
||||
setSubmitting(false);
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<Modal
|
||||
title={`测试发送消息${connection?.name ? ` · ${connection.name}` : ''}`}
|
||||
open={open}
|
||||
onCancel={onCancel}
|
||||
onOk={() => { void handleSubmit(); }}
|
||||
okText="发送"
|
||||
confirmLoading={submitting}
|
||||
width={720}
|
||||
destroyOnHidden
|
||||
maskClosable={!submitting}
|
||||
>
|
||||
<Space direction="vertical" size={12} style={{ width: '100%' }}>
|
||||
<Alert
|
||||
type="info"
|
||||
showIcon
|
||||
message={presentation.alertMessage}
|
||||
/>
|
||||
|
||||
<Form<MessagePublishDraft>
|
||||
form={form}
|
||||
layout="vertical"
|
||||
initialValues={createDefaultMessagePublishDraft(connection?.config, defaultDestination)}
|
||||
>
|
||||
<Form.Item
|
||||
label={presentation.destinationLabel}
|
||||
name="destination"
|
||||
rules={[{ required: true, message: presentation.destinationRequiredMessage }]}
|
||||
>
|
||||
<Input placeholder={presentation.destinationPlaceholder} />
|
||||
</Form.Item>
|
||||
|
||||
{presentation.showExchange && (
|
||||
<Form.Item
|
||||
label="Exchange(可选)"
|
||||
name="exchange"
|
||||
extra="留空使用默认交换机;若填写自定义交换机,请确认目标 Queue 已建立 binding。"
|
||||
>
|
||||
<Input placeholder="例如:events.topic" />
|
||||
</Form.Item>
|
||||
)}
|
||||
|
||||
{presentation.showRoutingKey && (
|
||||
<Form.Item
|
||||
label="Routing Key(可选)"
|
||||
name="routingKey"
|
||||
extra="留空时默认使用当前 Queue 名。"
|
||||
>
|
||||
<Input placeholder="例如:orders.queue" />
|
||||
</Form.Item>
|
||||
)}
|
||||
|
||||
{presentation.showKey && (
|
||||
<Form.Item label="消息 Key(可选)">
|
||||
<Space.Compact style={{ width: '100%' }}>
|
||||
<Form.Item name="keyMode" noStyle>
|
||||
<Select
|
||||
style={{ width: 120 }}
|
||||
options={[
|
||||
{ label: '文本', value: 'text' },
|
||||
{ label: 'JSON', value: 'json' },
|
||||
]}
|
||||
/>
|
||||
</Form.Item>
|
||||
<Form.Item name="key" noStyle>
|
||||
<Input placeholder="可留空;JSON 模式请输入一行合法 JSON" />
|
||||
</Form.Item>
|
||||
</Space.Compact>
|
||||
</Form.Item>
|
||||
)}
|
||||
|
||||
<Form.Item label="消息体类型" name="bodyMode">
|
||||
<Select
|
||||
options={[
|
||||
{ label: 'JSON', value: 'json' },
|
||||
{ label: '文本', value: 'text' },
|
||||
]}
|
||||
/>
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item
|
||||
label="消息体"
|
||||
name="body"
|
||||
rules={[{ required: true, message: '请输入消息体' }]}
|
||||
extra="JSON 模式下需输入合法 JSON;文本模式按原样发送。"
|
||||
>
|
||||
<TextArea rows={8} placeholder="请输入消息体" />
|
||||
</Form.Item>
|
||||
|
||||
<Form.Item
|
||||
label="Headers(可选)"
|
||||
name="headers"
|
||||
extra={'需为 JSON 对象,例如 {"x-source":"gonavi"}。'}
|
||||
>
|
||||
<TextArea rows={5} placeholder='{"x-source":"gonavi"}' />
|
||||
</Form.Item>
|
||||
|
||||
{presentation.showProperties && (
|
||||
<Form.Item
|
||||
label="Properties(可选)"
|
||||
name="properties"
|
||||
extra='需为 JSON 对象,例如 {"content_type":"application/json"}。'
|
||||
>
|
||||
<TextArea rows={5} placeholder='{"content_type":"application/json"}' />
|
||||
</Form.Item>
|
||||
)}
|
||||
</Form>
|
||||
|
||||
<Text type="secondary">
|
||||
{presentation.successHint} 发送成功后会返回 <Text code>affectedRows</Text>,用于确认本次测试消息是否已提交。
|
||||
</Text>
|
||||
</Space>
|
||||
</Modal>
|
||||
);
|
||||
};
|
||||
|
||||
export default MessagePublishModal;
|
||||
23
frontend/src/components/Sidebar.message-publish.test.tsx
Normal file
23
frontend/src/components/Sidebar.message-publish.test.tsx
Normal file
@@ -0,0 +1,23 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import { readFileSync } from 'node:fs';
|
||||
|
||||
const sidebarSource = readFileSync(new URL('./Sidebar.tsx', import.meta.url), 'utf8');
|
||||
const contextMenuSource = readFileSync(new URL('./V2TableContextMenu.tsx', import.meta.url), 'utf8');
|
||||
const modalSource = readFileSync(new URL('./MessagePublishModal.tsx', import.meta.url), 'utf8');
|
||||
|
||||
describe('Sidebar Kafka publish entry', () => {
|
||||
it('adds a Kafka topic publish action in both legacy and v2 table menus', () => {
|
||||
expect(sidebarSource).toContain("key: 'publish-message'");
|
||||
expect(sidebarSource).toContain("label: '测试发送消息'");
|
||||
expect(sidebarSource).toContain('openMessagePublishModal(node)');
|
||||
expect(contextMenuSource).toContain("| 'publish-message'");
|
||||
expect(contextMenuSource).toContain("title: '测试发送消息'");
|
||||
});
|
||||
|
||||
it('renders the dedicated message publish modal and executes DBQuery through the encoder', () => {
|
||||
expect(sidebarSource).toContain('<MessagePublishModal');
|
||||
expect(modalSource).toContain('buildMessagePublishCommand');
|
||||
expect(modalSource).toContain('DBQuery(');
|
||||
expect(modalSource).toContain('消息体');
|
||||
});
|
||||
});
|
||||
@@ -25,6 +25,7 @@ import { Tree, message, Dropdown, MenuProps, Input, Button, Modal, Form, Badge,
|
||||
FileAddOutlined,
|
||||
PlusOutlined,
|
||||
ReloadOutlined,
|
||||
SendOutlined,
|
||||
DeleteOutlined,
|
||||
DisconnectOutlined,
|
||||
CloudOutlined,
|
||||
@@ -92,6 +93,7 @@ import { buildExternalSQLDirectoryId, buildExternalSQLRootNode, buildExternalSQL
|
||||
import { SIDEBAR_SQL_EDITOR_DRAG_MIME, encodeSidebarSqlEditorDragPayload } from '../utils/sidebarSqlDrag';
|
||||
import type { DriverStatusSnapshot } from '../utils/connectionDriverType';
|
||||
import JVMModeBadge from './jvm/JVMModeBadge';
|
||||
import MessagePublishModal from './MessagePublishModal';
|
||||
import {
|
||||
SEARCH_SCOPE_LABEL_MAP,
|
||||
SEARCH_SCOPE_OPTIONS,
|
||||
@@ -229,6 +231,11 @@ type BatchTableExportMode = 'schema' | 'backup' | 'dataOnly';
|
||||
type BatchObjectType = 'table' | 'view';
|
||||
type BatchObjectFilterType = 'all' | BatchObjectType;
|
||||
type BatchSelectionScope = 'filtered' | 'all';
|
||||
type SidebarMessagePublishTarget = {
|
||||
connection: SavedConnection;
|
||||
executionDbName: string;
|
||||
destination: string;
|
||||
};
|
||||
|
||||
interface BatchObjectItem {
|
||||
title: string;
|
||||
@@ -607,6 +614,7 @@ const Sidebar: React.FC<{
|
||||
const [isRenameTableModalOpen, setIsRenameTableModalOpen] = useState(false);
|
||||
const [renameTableForm] = Form.useForm();
|
||||
const [renameTableTarget, setRenameTableTarget] = useState<any>(null);
|
||||
const [messagePublishTarget, setMessagePublishTarget] = useState<SidebarMessagePublishTarget | null>(null);
|
||||
const [isRenameViewModalOpen, setIsRenameViewModalOpen] = useState(false);
|
||||
const [renameViewForm] = Form.useForm();
|
||||
const [renameViewTarget, setRenameViewTarget] = useState<any>(null);
|
||||
@@ -4719,6 +4727,37 @@ const Sidebar: React.FC<{
|
||||
});
|
||||
};
|
||||
|
||||
const resolveMessagePublishTarget = (node: any): SidebarMessagePublishTarget | null => {
|
||||
const connectionId = String(node?.dataRef?.id || '').trim();
|
||||
const liveConnection = connections.find((item) => item.id === connectionId);
|
||||
const sourceConnection = (liveConnection || node?.dataRef) as SavedConnection | undefined;
|
||||
if (!sourceConnection?.config) return null;
|
||||
const capabilities = getDataSourceCapabilities(sourceConnection.config);
|
||||
if (!capabilities.supportsMessagePublish) return null;
|
||||
|
||||
return {
|
||||
connection: sourceConnection,
|
||||
executionDbName: String(node?.dataRef?.dbName || ''),
|
||||
destination: String(node?.dataRef?.tableName || node?.title || '').trim(),
|
||||
};
|
||||
};
|
||||
|
||||
const openMessagePublishModal = (node: any) => {
|
||||
const target = resolveMessagePublishTarget(node);
|
||||
if (!target) {
|
||||
message.warning('当前对象不支持测试发送消息');
|
||||
return;
|
||||
}
|
||||
setMessagePublishTarget(target);
|
||||
};
|
||||
|
||||
const handleMessagePublishSuccess = (result: { destination: string; affectedRows: number }) => {
|
||||
const destination = String(result.destination || '').trim();
|
||||
const suffix = result.affectedRows > 0 ? `(已提交 ${result.affectedRows} 条)` : '';
|
||||
message.success(`测试消息已发送到 ${destination || '目标'}${suffix}`);
|
||||
setMessagePublishTarget(null);
|
||||
};
|
||||
|
||||
const handleV2TableContextMenuAction = (node: any, action: V2TableContextMenuActionKey) => {
|
||||
switch (action) {
|
||||
case 'pin-table':
|
||||
@@ -4746,6 +4785,9 @@ const Sidebar: React.FC<{
|
||||
});
|
||||
return;
|
||||
}
|
||||
case 'publish-message':
|
||||
openMessagePublishModal(node);
|
||||
return;
|
||||
case 'view-ddl':
|
||||
openTableDdlInDesigner(node);
|
||||
return;
|
||||
@@ -5873,6 +5915,7 @@ const Sidebar: React.FC<{
|
||||
const statsKey = getV2TableContextMenuStatsKey(node);
|
||||
const stats = v2TableContextMenuStats[statsKey];
|
||||
const isStarRocks = getMetadataDialect(node.dataRef as SavedConnection) === 'starrocks';
|
||||
const supportsMessagePublish = Boolean(resolveMessagePublishTarget(node));
|
||||
const isPinned = isSidebarTablePinned(
|
||||
pinnedSidebarTables,
|
||||
String(node?.dataRef?.id || ''),
|
||||
@@ -5888,6 +5931,7 @@ const Sidebar: React.FC<{
|
||||
isPinned={isPinned}
|
||||
supportsTruncate={supportsTableTruncateAction(node.dataRef?.config?.type, node.dataRef?.config?.driver)}
|
||||
supportsStarRocksRollup={isStarRocks}
|
||||
supportsMessagePublish={supportsMessagePublish}
|
||||
onAction={(action) => {
|
||||
setContextMenu(null);
|
||||
handleV2TableContextMenuAction(node, action);
|
||||
@@ -7109,6 +7153,7 @@ const Sidebar: React.FC<{
|
||||
];
|
||||
} else if (node.type === 'table') {
|
||||
const isStarRocks = getMetadataDialect(node.dataRef as SavedConnection) === 'starrocks';
|
||||
const messagePublishTarget = resolveMessagePublishTarget(node);
|
||||
return [
|
||||
{
|
||||
key: 'new-query',
|
||||
@@ -7127,6 +7172,12 @@ const Sidebar: React.FC<{
|
||||
});
|
||||
}
|
||||
},
|
||||
...(messagePublishTarget ? [{
|
||||
key: 'publish-message',
|
||||
label: '测试发送消息',
|
||||
icon: <SendOutlined />,
|
||||
onClick: () => openMessagePublishModal(node),
|
||||
}] : []),
|
||||
{ type: 'divider' },
|
||||
{
|
||||
key: 'design-table',
|
||||
@@ -8928,6 +8979,14 @@ const Sidebar: React.FC<{
|
||||
connectionId={findInDbContext.connectionId}
|
||||
dbName={findInDbContext.dbName}
|
||||
/>
|
||||
<MessagePublishModal
|
||||
open={Boolean(messagePublishTarget)}
|
||||
connection={messagePublishTarget?.connection || null}
|
||||
executionDbName={messagePublishTarget?.executionDbName || ''}
|
||||
defaultDestination={messagePublishTarget?.destination || ''}
|
||||
onCancel={() => setMessagePublishTarget(null)}
|
||||
onSuccess={handleMessagePublishSuccess}
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
});
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
FolderOpenOutlined,
|
||||
FolderOutlined,
|
||||
SaveOutlined,
|
||||
SendOutlined,
|
||||
LinkOutlined,
|
||||
ReloadOutlined,
|
||||
TableOutlined,
|
||||
@@ -39,6 +40,7 @@ export type V2TableContextMenuActionKey =
|
||||
| 'design-table'
|
||||
| 'open-new-tab'
|
||||
| 'new-query'
|
||||
| 'publish-message'
|
||||
| 'view-ddl'
|
||||
| 'view-er'
|
||||
| 'copy-table-name'
|
||||
@@ -159,6 +161,7 @@ export const V2TableContextMenuView: React.FC<{
|
||||
isPinned?: boolean;
|
||||
supportsTruncate?: boolean;
|
||||
supportsStarRocksRollup?: boolean;
|
||||
supportsMessagePublish?: boolean;
|
||||
onAction?: (action: V2TableContextMenuActionKey) => void;
|
||||
}> = ({
|
||||
tableName,
|
||||
@@ -167,6 +170,7 @@ export const V2TableContextMenuView: React.FC<{
|
||||
isPinned = false,
|
||||
supportsTruncate = true,
|
||||
supportsStarRocksRollup = false,
|
||||
supportsMessagePublish = false,
|
||||
onAction,
|
||||
}) => {
|
||||
const renderItems = (items: V2TableContextMenuItemConfig[]) => renderV2ContextMenuItems(
|
||||
@@ -202,6 +206,7 @@ export const V2TableContextMenuView: React.FC<{
|
||||
{ action: 'design-table', icon: <EditOutlined />, title: '设计表 · 字段 / 索引 / 外键', kbd: primaryShortcut('D', shortcutPlatform) },
|
||||
{ action: 'open-new-tab', icon: <FileAddOutlined />, title: '在新标签打开', kbd: primaryShortcut('Enter', shortcutPlatform) },
|
||||
{ action: 'new-query', icon: <ConsoleSqlOutlined />, title: '新建查询' },
|
||||
...(supportsMessagePublish ? [{ action: 'publish-message' as const, icon: <SendOutlined />, title: '测试发送消息' }] : []),
|
||||
])}
|
||||
|
||||
<div className="gn-v2-context-menu-section-title">元信息</div>
|
||||
|
||||
@@ -79,4 +79,31 @@ describe('aiConnectionCapabilitiesInsights', () => {
|
||||
expect(snapshot.restrictions).toContain('create_database_hidden');
|
||||
expect(snapshot.restrictions).toContain('drop_database_hidden');
|
||||
});
|
||||
|
||||
it('includes publish_message when the datasource exposes message publish capability', () => {
|
||||
const connections: SavedConnection[] = [{
|
||||
id: 'conn-kafka',
|
||||
name: '订单事件总线',
|
||||
config: {
|
||||
type: 'kafka',
|
||||
host: '127.0.0.1',
|
||||
port: 9092,
|
||||
user: '',
|
||||
database: 'orders.events',
|
||||
},
|
||||
}];
|
||||
|
||||
const snapshot = buildConnectionCapabilitiesSnapshot({
|
||||
connectionId: 'conn-kafka',
|
||||
connections,
|
||||
});
|
||||
|
||||
if (!snapshot.hasConnection || !snapshot.capabilities) {
|
||||
throw new Error('expected kafka connection snapshot');
|
||||
}
|
||||
|
||||
expect(snapshot.capabilities.supportsMessagePublish).toBe(true);
|
||||
expect(snapshot.supportedActions).toContain('publish_message');
|
||||
expect(snapshot.uiHints.join(' ')).toContain('测试发送消息入口');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -53,6 +53,7 @@ export const buildConnectionCapabilitiesSnapshot = (params: {
|
||||
capabilities.supportsCreateDatabase ? 'create_database' : '',
|
||||
capabilities.supportsRenameDatabase ? 'rename_database' : '',
|
||||
capabilities.supportsDropDatabase ? 'drop_database' : '',
|
||||
capabilities.supportsMessagePublish ? 'publish_message' : '',
|
||||
capabilities.supportsApproximateTableCount ? 'approximate_table_count' : '',
|
||||
capabilities.supportsApproximateTotalPages ? 'approximate_total_pages' : '',
|
||||
].filter(Boolean);
|
||||
@@ -75,6 +76,9 @@ export const buildConnectionCapabilitiesSnapshot = (params: {
|
||||
capabilities.supportsApproximateTableCount
|
||||
? '表浏览场景允许显示近似行数,减少大表统计开销。'
|
||||
: '表浏览场景默认不使用近似行数。',
|
||||
capabilities.supportsMessagePublish
|
||||
? '当前数据源提供测试发送消息入口,适合做 Topic/Queue 的联调验证。'
|
||||
: '当前数据源未暴露测试发送消息入口。',
|
||||
];
|
||||
|
||||
return {
|
||||
|
||||
@@ -19,6 +19,7 @@ export const normalizeDriverType = (value: string): string => {
|
||||
if (normalized === 'qdrantdb' || normalized === 'qdrant-db') return 'qdrant';
|
||||
if (normalized === 'apache-iotdb' || normalized === 'apache_iotdb') return 'iotdb';
|
||||
if (normalized === 'apache-kafka' || normalized === 'apache_kafka') return 'kafka';
|
||||
if (normalized === 'rabbit-mq' || normalized === 'rabbit_mq') return 'rabbitmq';
|
||||
if (normalized === 'doris') return 'diros';
|
||||
if (
|
||||
normalized === 'open_gauss' ||
|
||||
|
||||
@@ -297,6 +297,19 @@ export const resolveConnectionConfigLayout = (
|
||||
],
|
||||
};
|
||||
}
|
||||
if (type === 'rabbitmq') {
|
||||
return {
|
||||
kind: 'generic-sql',
|
||||
sections: [
|
||||
'identity',
|
||||
'uri',
|
||||
'target',
|
||||
'service',
|
||||
'credentials',
|
||||
'databaseScope',
|
||||
],
|
||||
};
|
||||
}
|
||||
if (postgresCompatibleTypes.has(type)) {
|
||||
return {
|
||||
kind: 'postgres-compatible',
|
||||
|
||||
@@ -16,6 +16,7 @@ export const singleHostUriSchemesByType: Record<string, string[]> = {
|
||||
elasticsearch: ["http", "https"],
|
||||
chroma: ["http", "https", "chroma"],
|
||||
qdrant: ["http", "https", "qdrant"],
|
||||
rabbitmq: ["rabbitmq", "http", "https"],
|
||||
};
|
||||
|
||||
const normalizeConnectionType = (type: string) =>
|
||||
@@ -59,6 +60,7 @@ const sslSupportedTypes = new Set([
|
||||
"chroma",
|
||||
"qdrant",
|
||||
"kafka",
|
||||
"rabbitmq",
|
||||
]);
|
||||
|
||||
export const supportsSSLForType = (type: string) =>
|
||||
@@ -86,6 +88,7 @@ const sslCAPathSupportedTypes = new Set([
|
||||
"chroma",
|
||||
"qdrant",
|
||||
"kafka",
|
||||
"rabbitmq",
|
||||
]);
|
||||
|
||||
const sslClientCertificateSupportedTypes = new Set([
|
||||
@@ -107,6 +110,7 @@ const sslClientCertificateSupportedTypes = new Set([
|
||||
"mongodb",
|
||||
"redis",
|
||||
"kafka",
|
||||
"rabbitmq",
|
||||
]);
|
||||
|
||||
export const supportsSSLCAPathForType = (type: string) =>
|
||||
@@ -157,4 +161,5 @@ export const supportsConnectionParamsForType = (type: string) =>
|
||||
type === "elasticsearch" ||
|
||||
type === "chroma" ||
|
||||
type === "qdrant" ||
|
||||
type === "kafka";
|
||||
type === "kafka" ||
|
||||
type === "rabbitmq";
|
||||
|
||||
@@ -65,6 +65,7 @@ export const CONNECTION_TYPE_GROUPS: ConnectionTypeCatalogGroup[] = [
|
||||
label: '消息队列',
|
||||
items: [
|
||||
{ key: 'kafka', name: 'Kafka' },
|
||||
{ key: 'rabbitmq', name: 'RabbitMQ' },
|
||||
],
|
||||
},
|
||||
{
|
||||
@@ -124,6 +125,8 @@ export const getConnectionTypeDefaultPort = (type: string): number => {
|
||||
return 6333;
|
||||
case 'kafka':
|
||||
return 9092;
|
||||
case 'rabbitmq':
|
||||
return 15672;
|
||||
case 'highgo':
|
||||
return 5866;
|
||||
case 'mariadb':
|
||||
@@ -158,6 +161,8 @@ export const getConnectionTypeHint = (type: string): string => {
|
||||
return 'Storage Group / Device / Timeseries';
|
||||
case 'kafka':
|
||||
return 'Broker / Topic / Consumer Group';
|
||||
case 'rabbitmq':
|
||||
return 'Management API / Virtual Host / Queue';
|
||||
case 'oceanbase':
|
||||
return 'MySQL / Oracle 租户';
|
||||
case 'goldendb':
|
||||
|
||||
@@ -173,11 +173,33 @@ describe('dataSourceCapabilities', () => {
|
||||
supportsCreateDatabase: false,
|
||||
supportsRenameDatabase: false,
|
||||
supportsDropDatabase: false,
|
||||
supportsMessagePublish: true,
|
||||
forceReadOnlyQueryResult: true,
|
||||
});
|
||||
expect(getDataSourceCapabilities({ type: 'custom', driver: 'apache-kafka' })).toMatchObject({
|
||||
type: 'kafka',
|
||||
supportsQueryEditor: true,
|
||||
supportsMessagePublish: true,
|
||||
forceReadOnlyQueryResult: true,
|
||||
});
|
||||
});
|
||||
|
||||
it('treats RabbitMQ as a queryable messaging datasource with publish support', () => {
|
||||
expect(getDataSourceCapabilities({ type: 'rabbitmq' })).toMatchObject({
|
||||
type: 'rabbitmq',
|
||||
supportsQueryEditor: true,
|
||||
supportsSqlQueryExport: false,
|
||||
supportsCopyInsert: false,
|
||||
supportsCreateDatabase: false,
|
||||
supportsRenameDatabase: false,
|
||||
supportsDropDatabase: false,
|
||||
supportsMessagePublish: true,
|
||||
forceReadOnlyQueryResult: true,
|
||||
});
|
||||
expect(getDataSourceCapabilities({ type: 'custom', driver: 'rabbit-mq' })).toMatchObject({
|
||||
type: 'rabbitmq',
|
||||
supportsQueryEditor: true,
|
||||
supportsMessagePublish: true,
|
||||
forceReadOnlyQueryResult: true,
|
||||
});
|
||||
});
|
||||
|
||||
@@ -42,6 +42,10 @@ const normalizeDataSourceToken = (raw: string): string => {
|
||||
case 'apache-kafka':
|
||||
case 'apache_kafka':
|
||||
return 'kafka';
|
||||
case 'rabbitmq':
|
||||
case 'rabbit-mq':
|
||||
case 'rabbit_mq':
|
||||
return 'rabbitmq';
|
||||
case 'intersystems':
|
||||
case 'intersystemsiris':
|
||||
case 'inter-systems':
|
||||
@@ -117,7 +121,8 @@ const COPY_INSERT_TYPES = new Set([
|
||||
]);
|
||||
|
||||
const QUERY_EDITOR_DISABLED_TYPES = new Set(['redis']);
|
||||
const FORCE_READ_ONLY_QUERY_TYPES = new Set(['tdengine', 'iotdb', 'clickhouse', 'kafka']);
|
||||
const FORCE_READ_ONLY_QUERY_TYPES = new Set(['tdengine', 'iotdb', 'clickhouse', 'kafka', 'rabbitmq']);
|
||||
const MESSAGE_PUBLISH_TYPES = new Set(['kafka', 'rabbitmq']);
|
||||
const MANUAL_TOTAL_COUNT_TYPES = new Set(['duckdb', 'oracle']);
|
||||
const APPROXIMATE_TABLE_COUNT_TYPES = new Set(['duckdb', 'oracle']);
|
||||
const APPROXIMATE_TOTAL_PAGE_TYPES = new Set(['duckdb']);
|
||||
@@ -130,6 +135,7 @@ export type DataSourceCapabilities = {
|
||||
supportsCreateDatabase: boolean;
|
||||
supportsRenameDatabase: boolean;
|
||||
supportsDropDatabase: boolean;
|
||||
supportsMessagePublish: boolean;
|
||||
forceReadOnlyQueryResult: boolean;
|
||||
preferManualTotalCount: boolean;
|
||||
supportsApproximateTableCount: boolean;
|
||||
@@ -191,6 +197,7 @@ export const getDataSourceCapabilities = (config: ConnectionLike): DataSourceCap
|
||||
supportsCreateDatabase: CREATE_DATABASE_TYPES.has(type),
|
||||
supportsRenameDatabase: RENAME_DATABASE_TYPES.has(type),
|
||||
supportsDropDatabase: DROP_DATABASE_TYPES.has(type),
|
||||
supportsMessagePublish: MESSAGE_PUBLISH_TYPES.has(type),
|
||||
forceReadOnlyQueryResult: FORCE_READ_ONLY_QUERY_TYPES.has(type),
|
||||
preferManualTotalCount: MANUAL_TOTAL_COUNT_TYPES.has(type),
|
||||
supportsApproximateTableCount: APPROXIMATE_TABLE_COUNT_TYPES.has(type),
|
||||
|
||||
100
frontend/src/utils/messagePublish.test.ts
Normal file
100
frontend/src/utils/messagePublish.test.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import {
|
||||
buildMessagePublishCommand,
|
||||
createDefaultMessagePublishDraft,
|
||||
} from './messagePublish';
|
||||
|
||||
describe('messagePublish', () => {
|
||||
it('builds a Kafka publish JSON command from JSON payload inputs', () => {
|
||||
const result = buildMessagePublishCommand(
|
||||
{ type: 'kafka' },
|
||||
{
|
||||
destination: 'orders.events',
|
||||
keyMode: 'json',
|
||||
key: '{"tenant":"a"}',
|
||||
bodyMode: 'json',
|
||||
body: '{"id":1,"event":"created"}',
|
||||
headers: '{"x-env":"dev"}',
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.transportLabel).toBe('Kafka Topic');
|
||||
expect(result.destinationLabel).toBe('orders.events');
|
||||
expect(result.commandText).toContain('"publish": "orders.events"');
|
||||
expect(result.commandText).toContain('"tenant": "a"');
|
||||
expect(result.commandText).toContain('"id": 1');
|
||||
expect(result.commandText).toContain('"x-env": "dev"');
|
||||
});
|
||||
|
||||
it('keeps Kafka text payloads as plain strings', () => {
|
||||
const result = buildMessagePublishCommand(
|
||||
{ type: 'kafka' },
|
||||
{
|
||||
destination: 'logs.app',
|
||||
keyMode: 'text',
|
||||
key: 'tenant-a',
|
||||
bodyMode: 'text',
|
||||
body: 'hello gonavi',
|
||||
headers: '',
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.commandText).toContain('"key": "tenant-a"');
|
||||
expect(result.commandText).toContain('"value": "hello gonavi"');
|
||||
});
|
||||
|
||||
it('rejects non-object Kafka headers', () => {
|
||||
expect(() => buildMessagePublishCommand(
|
||||
{ type: 'kafka' },
|
||||
{
|
||||
destination: 'logs.app',
|
||||
bodyMode: 'json',
|
||||
body: '{"ok":true}',
|
||||
headers: '["bad"]',
|
||||
},
|
||||
)).toThrow('Headers 必须是 JSON 对象');
|
||||
});
|
||||
|
||||
it('seeds Kafka default publish draft with a JSON body example', () => {
|
||||
expect(createDefaultMessagePublishDraft({ type: 'kafka' }, 'orders.events')).toMatchObject({
|
||||
destination: 'orders.events',
|
||||
keyMode: 'text',
|
||||
bodyMode: 'json',
|
||||
});
|
||||
});
|
||||
|
||||
it('builds a RabbitMQ publish JSON command with routing and properties', () => {
|
||||
const result = buildMessagePublishCommand(
|
||||
{ type: 'rabbitmq', connectionParams: 'defaultQueue=orders.queue&exchange=events.topic' },
|
||||
{
|
||||
destination: 'orders.queue',
|
||||
exchange: '',
|
||||
routingKey: '',
|
||||
bodyMode: 'json',
|
||||
body: '{"id":1,"event":"created"}',
|
||||
headers: '{"x-env":"dev"}',
|
||||
properties: '{"content_type":"application/json"}',
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.transportLabel).toBe('RabbitMQ Queue');
|
||||
expect(result.destinationLabel).toBe('orders.queue');
|
||||
expect(result.commandText).toContain('"publish": "orders.queue"');
|
||||
expect(result.commandText).toContain('"exchange": "events.topic"');
|
||||
expect(result.commandText).toContain('"routing_key": "orders.queue"');
|
||||
expect(result.commandText).toContain('"content_type": "application/json"');
|
||||
});
|
||||
|
||||
it('seeds RabbitMQ default publish draft with defaultQueue and exchange', () => {
|
||||
expect(createDefaultMessagePublishDraft(
|
||||
{ type: 'rabbitmq', connectionParams: 'defaultQueue=orders.queue&exchange=events.topic' },
|
||||
'',
|
||||
)).toMatchObject({
|
||||
destination: 'orders.queue',
|
||||
exchange: 'events.topic',
|
||||
routingKey: 'orders.queue',
|
||||
bodyMode: 'json',
|
||||
});
|
||||
});
|
||||
});
|
||||
274
frontend/src/utils/messagePublish.ts
Normal file
274
frontend/src/utils/messagePublish.ts
Normal file
@@ -0,0 +1,274 @@
|
||||
import { resolveDataSourceType } from './dataSourceCapabilities';
|
||||
|
||||
type ConnectionLike = {
|
||||
type?: string;
|
||||
driver?: string;
|
||||
oceanBaseProtocol?: string;
|
||||
database?: string;
|
||||
uri?: string;
|
||||
connectionParams?: string;
|
||||
} | null | undefined;
|
||||
|
||||
export type MessagePublishValueMode = 'text' | 'json';
|
||||
|
||||
export type MessagePublishDraft = {
|
||||
destination: string;
|
||||
exchange?: string;
|
||||
routingKey?: string;
|
||||
keyMode?: MessagePublishValueMode;
|
||||
key?: string;
|
||||
bodyMode?: MessagePublishValueMode;
|
||||
body: string;
|
||||
headers?: string;
|
||||
properties?: string;
|
||||
};
|
||||
|
||||
export type MessagePublishCommand = {
|
||||
commandText: string;
|
||||
destinationLabel: string;
|
||||
transportLabel: string;
|
||||
};
|
||||
|
||||
export type MessagePublishPresentation = {
|
||||
transportLabel: string;
|
||||
destinationLabel: string;
|
||||
destinationPlaceholder: string;
|
||||
destinationRequiredMessage: string;
|
||||
alertMessage: string;
|
||||
successHint: string;
|
||||
showKey: boolean;
|
||||
showExchange: boolean;
|
||||
showRoutingKey: boolean;
|
||||
showProperties: boolean;
|
||||
};
|
||||
|
||||
const normalizeMode = (value: unknown, fallback: MessagePublishValueMode): MessagePublishValueMode => {
|
||||
const normalized = String(value || '').trim().toLowerCase();
|
||||
if (normalized === 'text') return 'text';
|
||||
if (normalized === 'json') return 'json';
|
||||
return fallback;
|
||||
};
|
||||
|
||||
const parseRequiredPayload = (
|
||||
rawValue: unknown,
|
||||
mode: MessagePublishValueMode,
|
||||
fieldLabel: string,
|
||||
): string | number | boolean | Record<string, any> | Array<any> => {
|
||||
const text = String(rawValue ?? '');
|
||||
if (!text.trim()) {
|
||||
throw new Error(`请输入${fieldLabel}`);
|
||||
}
|
||||
if (mode === 'text') {
|
||||
return text;
|
||||
}
|
||||
try {
|
||||
return JSON.parse(text);
|
||||
} catch (error: any) {
|
||||
throw new Error(`${fieldLabel}不是合法 JSON:${error?.message || String(error)}`);
|
||||
}
|
||||
};
|
||||
|
||||
const parseOptionalPayload = (
|
||||
rawValue: unknown,
|
||||
mode: MessagePublishValueMode,
|
||||
fieldLabel: string,
|
||||
): string | number | boolean | Record<string, any> | Array<any> | undefined => {
|
||||
const text = String(rawValue ?? '');
|
||||
if (!text.trim()) {
|
||||
return undefined;
|
||||
}
|
||||
return parseRequiredPayload(text, mode, fieldLabel);
|
||||
};
|
||||
|
||||
const parseOptionalJSONObject = (
|
||||
rawValue: unknown,
|
||||
fieldLabel: string,
|
||||
): Record<string, any> | undefined => {
|
||||
const text = String(rawValue ?? '');
|
||||
if (!text.trim()) {
|
||||
return undefined;
|
||||
}
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(text);
|
||||
} catch (error: any) {
|
||||
throw new Error(`${fieldLabel}不是合法 JSON:${error?.message || String(error)}`);
|
||||
}
|
||||
if (!parsed || Array.isArray(parsed) || typeof parsed !== 'object') {
|
||||
throw new Error(`${fieldLabel} 必须是 JSON 对象`);
|
||||
}
|
||||
return parsed as Record<string, any>;
|
||||
};
|
||||
|
||||
const mergeSearchParams = (target: URLSearchParams, sourceText: unknown) => {
|
||||
const text = String(sourceText ?? '').trim();
|
||||
if (!text) return;
|
||||
const raw = text.includes('?') ? text.slice(text.indexOf('?') + 1) : text;
|
||||
const params = new URLSearchParams(raw.replace(/^\?/, ''));
|
||||
params.forEach((value, key) => {
|
||||
if (String(key || '').trim()) {
|
||||
target.set(key, value);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const resolveConnectionParams = (config: ConnectionLike): URLSearchParams => {
|
||||
const params = new URLSearchParams();
|
||||
if (!config) return params;
|
||||
mergeSearchParams(params, config.uri);
|
||||
mergeSearchParams(params, config.connectionParams);
|
||||
return params;
|
||||
};
|
||||
|
||||
const normalizeRabbitMQExchange = (value: unknown): string => {
|
||||
const normalized = String(value ?? '').trim();
|
||||
if (normalized === 'amq.default' || normalized === '(default)') {
|
||||
return '';
|
||||
}
|
||||
return normalized;
|
||||
};
|
||||
|
||||
const resolveDefaultDestination = (config: ConnectionLike, explicitDestination: string): string => {
|
||||
const destination = String(explicitDestination || '').trim();
|
||||
if (destination) return destination;
|
||||
|
||||
const resolvedType = resolveDataSourceType(config as any);
|
||||
const params = resolveConnectionParams(config);
|
||||
|
||||
if (resolvedType === 'kafka') {
|
||||
return String(config?.database || '').trim();
|
||||
}
|
||||
if (resolvedType === 'rabbitmq') {
|
||||
return String(params.get('defaultQueue') || params.get('queue') || '').trim();
|
||||
}
|
||||
return '';
|
||||
};
|
||||
|
||||
export const getMessagePublishPresentation = (
|
||||
config: ConnectionLike,
|
||||
): MessagePublishPresentation => {
|
||||
const resolvedType = resolveDataSourceType(config as any);
|
||||
|
||||
if (resolvedType === 'rabbitmq') {
|
||||
return {
|
||||
transportLabel: 'RabbitMQ Queue',
|
||||
destinationLabel: 'Queue',
|
||||
destinationPlaceholder: '例如:orders.queue',
|
||||
destinationRequiredMessage: '请输入 Queue',
|
||||
alertMessage: '当前表单会自动拼装 RabbitMQ publish JSON 命令,并通过 Management API 执行测试发送。',
|
||||
successHint: '留空 Exchange 时会使用默认交换机并按 Queue 名作为 routing key。',
|
||||
showKey: false,
|
||||
showExchange: true,
|
||||
showRoutingKey: true,
|
||||
showProperties: true,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
transportLabel: 'Kafka Topic',
|
||||
destinationLabel: 'Topic',
|
||||
destinationPlaceholder: '例如:orders.events',
|
||||
destinationRequiredMessage: '请输入 Topic',
|
||||
alertMessage: '当前表单会自动拼装 Kafka publish JSON 命令,并直接调用后端执行测试发送。',
|
||||
successHint: 'Headers 会作为 Kafka Record Headers 一并发送。',
|
||||
showKey: true,
|
||||
showExchange: false,
|
||||
showRoutingKey: false,
|
||||
showProperties: false,
|
||||
};
|
||||
};
|
||||
|
||||
export const createDefaultMessagePublishDraft = (
|
||||
config: ConnectionLike,
|
||||
destination = '',
|
||||
): MessagePublishDraft => {
|
||||
const resolvedType = resolveDataSourceType(config as any);
|
||||
const resolvedDestination = resolveDefaultDestination(config, destination);
|
||||
const params = resolveConnectionParams(config);
|
||||
|
||||
if (resolvedType === 'rabbitmq') {
|
||||
return {
|
||||
destination: resolvedDestination,
|
||||
exchange: normalizeRabbitMQExchange(params.get('defaultExchange') || params.get('exchange') || ''),
|
||||
routingKey: resolvedDestination,
|
||||
bodyMode: 'json',
|
||||
body: '{\n "event": "test",\n "source": "gonavi"\n}',
|
||||
headers: '{\n "x-source": "gonavi"\n}',
|
||||
properties: '{\n "content_type": "application/json"\n}',
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
destination: resolvedDestination,
|
||||
keyMode: 'text',
|
||||
key: '',
|
||||
bodyMode: 'json',
|
||||
body: '{\n "event": "test",\n "source": "gonavi"\n}',
|
||||
headers: '{\n "x-source": "gonavi"\n}',
|
||||
};
|
||||
};
|
||||
|
||||
export const buildMessagePublishCommand = (
|
||||
config: ConnectionLike,
|
||||
draft: MessagePublishDraft,
|
||||
): MessagePublishCommand => {
|
||||
const resolvedType = resolveDataSourceType(config as any);
|
||||
const destination = String(draft.destination || '').trim();
|
||||
if (!destination) {
|
||||
throw new Error('请输入目标 Topic / Queue');
|
||||
}
|
||||
|
||||
if (resolvedType === 'rabbitmq') {
|
||||
const params = resolveConnectionParams(config);
|
||||
const bodyMode = normalizeMode(draft.bodyMode, 'json');
|
||||
const command: Record<string, unknown> = {
|
||||
publish: destination,
|
||||
payload: parseRequiredPayload(draft.body, bodyMode, '消息体'),
|
||||
exchange: normalizeRabbitMQExchange(draft.exchange || params.get('defaultExchange') || params.get('exchange') || ''),
|
||||
routing_key: String(draft.routingKey || '').trim() || destination,
|
||||
};
|
||||
|
||||
const headers = parseOptionalJSONObject(draft.headers, 'Headers');
|
||||
if (headers && Object.keys(headers).length > 0) {
|
||||
command.headers = headers;
|
||||
}
|
||||
|
||||
const properties = parseOptionalJSONObject(draft.properties, 'Properties');
|
||||
if (properties && Object.keys(properties).length > 0) {
|
||||
command.properties = properties;
|
||||
}
|
||||
|
||||
return {
|
||||
commandText: JSON.stringify(command, null, 2),
|
||||
destinationLabel: destination,
|
||||
transportLabel: 'RabbitMQ Queue',
|
||||
};
|
||||
}
|
||||
|
||||
if (resolvedType === 'kafka') {
|
||||
const keyMode = normalizeMode(draft.keyMode, 'text');
|
||||
const bodyMode = normalizeMode(draft.bodyMode, 'json');
|
||||
const command: Record<string, unknown> = {
|
||||
publish: destination,
|
||||
value: parseRequiredPayload(draft.body, bodyMode, '消息体'),
|
||||
};
|
||||
|
||||
const keyPayload = parseOptionalPayload(draft.key, keyMode, '消息 Key');
|
||||
if (keyPayload !== undefined) {
|
||||
command.key = keyPayload;
|
||||
}
|
||||
|
||||
const headers = parseOptionalJSONObject(draft.headers, 'Headers');
|
||||
if (headers && Object.keys(headers).length > 0) {
|
||||
command.headers = headers;
|
||||
}
|
||||
|
||||
return {
|
||||
commandText: JSON.stringify(command, null, 2),
|
||||
destinationLabel: destination,
|
||||
transportLabel: 'Kafka Topic',
|
||||
};
|
||||
}
|
||||
|
||||
throw new Error(`当前数据源暂不支持测试发送消息:${resolvedType || 'unknown'}`);
|
||||
};
|
||||
@@ -10,4 +10,8 @@ describe('buildTableSelectQuery', () => {
|
||||
it('adds a preview limit for Kafka topic browsing', () => {
|
||||
expect(buildTableSelectQuery('kafka', 'logs.app-1')).toBe('SELECT * FROM "logs.app-1" LIMIT 100;');
|
||||
});
|
||||
|
||||
it('adds a preview limit for RabbitMQ queue browsing', () => {
|
||||
expect(buildTableSelectQuery('rabbitmq', 'orders.events.v1')).toBe('SELECT * FROM "orders.events.v1" LIMIT 100;');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -5,7 +5,7 @@ export const buildTableSelectQuery = (dbType: string, tableName: string): string
|
||||
if (!normalizedTableName) {
|
||||
return 'SELECT * FROM ';
|
||||
}
|
||||
if (String(dbType || '').trim().toLowerCase() === 'kafka') {
|
||||
if (['kafka', 'rabbitmq'].includes(String(dbType || '').trim().toLowerCase())) {
|
||||
return `SELECT * FROM ${quoteQualifiedIdent(dbType, normalizedTableName)} LIMIT 100;`;
|
||||
}
|
||||
return `SELECT * FROM ${quoteQualifiedIdent(dbType, normalizedTableName)};`;
|
||||
|
||||
@@ -64,6 +64,11 @@ describe('quoteQualifiedIdent', () => {
|
||||
.toBe('"logs.app-1"');
|
||||
});
|
||||
|
||||
it('keeps RabbitMQ queue names as one quoted identifier', () => {
|
||||
expect(quoteQualifiedIdent('rabbitmq', 'orders.events.v1'))
|
||||
.toBe('"orders.events.v1"');
|
||||
});
|
||||
|
||||
it('quotes GoldenDB identifiers with MySQL-style backticks', () => {
|
||||
expect(quoteQualifiedIdent('goldendb', 'ledger.entries'))
|
||||
.toBe('`ledger`.`entries`');
|
||||
|
||||
@@ -54,7 +54,7 @@ export const quoteIdentPart = (dbType: string, ident: string) => {
|
||||
export const quoteQualifiedIdent = (dbType: string, ident: string) => {
|
||||
const raw = (ident || '').trim();
|
||||
if (!raw) return raw;
|
||||
if ((dbType || '').trim().toLowerCase() === 'kafka') {
|
||||
if (['kafka', 'rabbitmq'].includes((dbType || '').trim().toLowerCase())) {
|
||||
return quoteIdentPart(dbType, raw);
|
||||
}
|
||||
const parts = splitQualifiedNameSegments(raw).filter(Boolean);
|
||||
|
||||
@@ -40,6 +40,8 @@ describe('sqlDialect', () => {
|
||||
expect(resolveSqlDialect('custom', 'apache_iotdb')).toBe('iotdb');
|
||||
expect(resolveSqlDialect('Apache-Kafka')).toBe('kafka');
|
||||
expect(resolveSqlDialect('custom', 'apache_kafka')).toBe('kafka');
|
||||
expect(resolveSqlDialect('Rabbit-MQ')).toBe('rabbitmq');
|
||||
expect(resolveSqlDialect('custom', 'rabbit_mq')).toBe('rabbitmq');
|
||||
expect(resolveSqlDialect('OceanBase', '', { oceanBaseProtocol: 'oracle' })).toBe('oracle');
|
||||
expect(resolveSqlDialect('custom', 'oceanbase', { oceanBaseProtocol: 'oracle' })).toBe('oracle');
|
||||
expect(isMysqlFamilyDialect('mariadb')).toBe(true);
|
||||
@@ -78,6 +80,11 @@ describe('sqlDialect', () => {
|
||||
expect(resolveSqlKeywords('kafka')).not.toEqual(expect.arrayContaining(['ALIGN BY DEVICE', 'AUTO_INCREMENT']));
|
||||
});
|
||||
|
||||
it('resolves RabbitMQ completion keywords for queue and exchange discovery', () => {
|
||||
expect(resolveSqlKeywords('rabbitmq')).toEqual(expect.arrayContaining(['SHOW VHOSTS', 'SHOW QUEUES', 'SHOW EXCHANGES', 'DESCRIBE QUEUE']));
|
||||
expect(resolveSqlKeywords('rabbitmq')).not.toEqual(expect.arrayContaining(['ALIGN BY DEVICE', 'AUTO_INCREMENT']));
|
||||
});
|
||||
|
||||
it('resolves GaussDB completion keywords and functions as a PostgreSQL-like dialect', () => {
|
||||
expect(resolveSqlKeywords('gaussdb')).toEqual(expect.arrayContaining(['RETURNING', 'SERIAL', 'JSONB']));
|
||||
expect(names(resolveSqlFunctions('gaussdb'))).toEqual(expect.arrayContaining(['STRING_AGG', 'TO_CHAR', 'CURRENT_DATABASE']));
|
||||
|
||||
@@ -30,6 +30,7 @@ export type SqlDialect =
|
||||
| 'tdengine'
|
||||
| 'iotdb'
|
||||
| 'kafka'
|
||||
| 'rabbitmq'
|
||||
| 'mongodb'
|
||||
| 'redis'
|
||||
| 'elasticsearch'
|
||||
@@ -140,6 +141,10 @@ export const resolveSqlDialect = (
|
||||
case 'apache-kafka':
|
||||
case 'apache_kafka':
|
||||
return 'kafka';
|
||||
case 'rabbitmq':
|
||||
case 'rabbit-mq':
|
||||
case 'rabbit_mq':
|
||||
return 'rabbitmq';
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@@ -165,6 +170,7 @@ export const resolveSqlDialect = (
|
||||
if (source.includes('tdengine')) return 'tdengine';
|
||||
if (source.includes('iotdb')) return 'iotdb';
|
||||
if (source.includes('kafka')) return 'kafka';
|
||||
if (source.includes('rabbitmq') || source.includes('rabbit-mq') || source.includes('rabbit_mq')) return 'rabbitmq';
|
||||
if (source.includes('sqlserver') || source.includes('mssql')) return 'sqlserver';
|
||||
if (source.includes('iris') || source.includes('intersystems')) return 'iris';
|
||||
if (source.includes('elastic')) return 'elasticsearch';
|
||||
@@ -628,6 +634,18 @@ const KAFKA_KEYWORDS = [
|
||||
'OFFSET',
|
||||
];
|
||||
|
||||
const RABBITMQ_KEYWORDS = [
|
||||
'SHOW VHOSTS',
|
||||
'SHOW QUEUES',
|
||||
'SHOW EXCHANGES',
|
||||
'DESCRIBE QUEUE',
|
||||
'DESCRIBE EXCHANGE',
|
||||
'CONSUME',
|
||||
'FROM',
|
||||
'LIMIT',
|
||||
'OFFSET',
|
||||
];
|
||||
|
||||
export const resolveSqlKeywords = (dbType: string): string[] => {
|
||||
const dialect = resolveSqlDialect(dbType);
|
||||
if (dialect === 'starrocks') return unique([...COMMON_KEYWORDS, ...MYSQL_KEYWORDS, ...STARROCKS_KEYWORDS]);
|
||||
@@ -641,6 +659,7 @@ export const resolveSqlKeywords = (dbType: string): string[] => {
|
||||
if (dialect === 'tdengine') return unique([...COMMON_KEYWORDS, ...TDENGINE_KEYWORDS]);
|
||||
if (dialect === 'iotdb') return unique([...COMMON_KEYWORDS, ...IOTDB_KEYWORDS]);
|
||||
if (dialect === 'kafka') return unique([...COMMON_KEYWORDS, ...KAFKA_KEYWORDS]);
|
||||
if (dialect === 'rabbitmq') return unique([...COMMON_KEYWORDS, ...RABBITMQ_KEYWORDS]);
|
||||
return COMMON_KEYWORDS;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user