Files
MyGoNavi/internal/db/mongodb_impl.go
Syngnat 6ad690cffc release/0.5.6 (#210)
* 🐛 fix(data-viewer): 修复ClickHouse尾部分页异常并增强DuckDB复杂类型兼容

- DataViewer 新增 ClickHouse 反向分页策略,修复最后页与倒数页查询失败
- DuckDB 查询失败时按列类型生成安全 SELECT,复杂类型转 VARCHAR 重试
- 分页状态统一使用 currentPage 回填,避免页码与总数推导不一致
- 增强查询异常日志与重试路径,降低大表场景卡顿与误报

*  feat(frontend-driver): 驱动管理支持快速搜索并优化信息展示

- 新增搜索框,支持按 DuckDB/ClickHouse 等关键字快速定位驱动
- 显示“匹配 x / y”统计与无结果提示
- 优化头部区域排版,提升透明/暗色场景下的视觉对齐

* 🔧 fix(connection-modal): 修复多数据源URI导入解析并校正Oracle服务名校验

- 新增单主机URI解析映射,兼容 postgres/postgresql、sqlserver、redis、tdengine、dameng(dm)、kingbase、highgo、vastbase、clickhouse、oracle
- 抽取 parseSingleHostUri 复用逻辑,统一 host/port/user/password/database 回填行为
- Oracle 连接新增服务名必填校验,移除“服务名为空回退用户名”的隐式逻辑
- 连接弹窗补充 Oracle 服务名输入项与 URI 示例

* 🐛 fix(query-export): 修复查询结果导出卡住并统一按数据源能力控制导出路径

- 查询结果页导出增加稳定兜底,异常时确保 loading 关闭避免持续转圈
- DataGrid 导出逻辑按数据源能力分流,优先走后端 ExportQuery 并保留结果集导出降级
- QueryEditor 传递结果导出 SQL,保证查询结果导出范围与当前结果一致
- 后端补充 ExportData/ExportQuery 关键日志,提升导出链路可观测性

* 🐛 fix(precision): 修复查询链路与分页统计的大整数精度丢失

- 代理响应数据解码改为 UseNumber,避免默认 float64 吞精度
- 统一归一化 json.Number 与超界整数,超出 JS 安全范围转字符串
- 修复 DataViewer 总数解析,超大值不再误转 Number 参与分页
- refs #142

* 🐛 fix(driver-manager): 修复驱动管理网络告警重复并强化代理引导

- 新增下载链路域名探测,区分“GitHub可达但驱动下载链路不可达”
- 网络不可达场景仅保留红色强提醒,移除重复二级告警
- 强提醒增加“打开全局代理设置”入口,优先引导使用 GoNavi 全局代理
- 统一网络检测与目录说明提示图标尺寸,修复加载期视觉不一致
- refs #141

* ♻️ refactor(frontend-interaction): 统一标签拖拽与暗色主题交互实现

- 重构Tab拖拽排序实现,统一为可配置拖拽引擎
- 规范拖拽与点击事件边界,提升交互一致性
- 统一多组件暗色透明样式策略,减少硬编码色值
- 提升Redis/表格/连接面板在透明模式下的观感一致性
- refs #144

* ♻️ refactor(update-state): 重构在线更新状态流并按版本统一进度展示

- 重构更新检查与下载状态同步流程,减少前后端状态分叉
- 进度展示严格绑定 latestVersion,避免跨版本状态串用
- 优化 about 打开场景的静默检查状态回填逻辑
- 统一下载弹窗关闭/后台隐藏行为
- 保持现有安装流程并补齐目录打开能力

* 🎨 style(sidebar-log): 将SQL执行日志入口调整为悬浮胶囊样式

- 移除侧栏底部整条日志入口容器
- 新增悬浮按钮阴影/边框/透明背景并适配明暗主题
- 为树区域预留底部空间避免入口遮挡内容

*  feat(redis-cluster): 支持集群模式逻辑多库隔离与 0-15 库切换

- 前端恢复 Redis 集群场景下 db0-db15 的数据库选择与展示
- 后端新增集群逻辑库命名空间前缀映射,统一 key/pattern 读写隔离
- 覆盖扫描、读取、写入、删除、重命名等核心操作的键映射规则
- 集群命令通道支持 SELECT 逻辑切库与 FLUSHDB 逻辑库清空
- refs #145

*  feat(DataGrid): 大数据表虚拟滚动性能优化及UI一致性修复

- 启用动态虚拟滚动(数据量≥500行自动切换),解决万行数据表卡顿问题
- 虚拟模式下EditableCell改用div渲染,CSS选择器从元素级改为类级适配虚拟DOM
- 修复虚拟模式双水平滚动条:样式化rc-virtual-list内置滚动条为胶囊外观,禁用自定义外部滚动条
- 为rc-virtual-list水平滚动条添加鼠标滚轮支持(MutationObserver + marginLeft驱动)
- 修复白色主题透明模式下列名悬浮Tooltip对比度不足的问题
- 新增白色主题全局滚动条样式适配透明模式(App.css)
- App.tsx主题token与组件样式优化
- refs #147

* 🔧 chore(app): 清理 App.tsx 类型告警并收敛前端壳层实现

- 清除未使用代码和冗余状态
- 替换弃用 API 以消除 IDE 提示
- 显式处理浮动 Promise 避免告警
- 保持现有更新检查和代理设置行为不变

* 🔧 fix(ci): 修复 Windows AMD64 下 DuckDB 驱动构建链路

- 将 DuckDB 工具链准备切换为优先使用 MSYS2
- 增加 gcc 和 g++ 存在性校验与版本验证
- 在 MSYS2 异常时回退 Chocolatey 安装 MinGW
- 保持 Windows ARM64 跳过 DuckDB 构建与平台支持一致

* 🔧 fix(ci): 修复 Windows AMD64 下 DuckDB 驱动构建链路

- 将 DuckDB 工具链准备切换为优先使用 MSYS2
- 增加 gcc 和 g++ 存在性校验与版本验证
- 在 MSYS2 异常时回退 Chocolatey 安装 MinGW
- 保持 Windows ARM64 跳过 DuckDB 构建与平台支持一致

* 🔧 fix(ci): 修复 Windows AMD64 下 DuckDB 驱动构建工具链

- 将 DuckDB 编译链从 MINGW64 切换为 MSYS2 UCRT64
- 修正 Windows AMD64 的 gcc 和 g++ 探测路径
- 增加 DuckDB 编译器版本校验步骤

* 📝 docs(contributing): 补充中英文贡献指南并统一 README 入口

- 新增英文版 CONTRIBUTING.md 作为正式贡献文档
- 新增中文版 CONTRIBUTING.zh-CN.md 作为中文贡献说明
- 调整 README 和 README.zh-CN 的贡献入口指向对应语言文档

* - feat(connection,metadata,kingbase): 增强多数据源连接能力并修复金仓/达梦/Oracle/ClickHouse兼容性问题 (#188) (#190)

* feat(http-tunnel): 支持独立 HTTP 隧道连接并覆盖多数据源

refs #168

* fix(kingbase-data-grid): 修复金仓打开表卡顿并降低对象渲染开销

refs #178

* fix(kingbase-transaction): 修复金仓事务提交重复引号导致语法错误

refs #176

* fix(driver-agent): 修复老版本 Win10 升级后金仓驱动代理启动失败

refs #177

* chore(ci): 新增手动触发的 macOS 测试构建工作流

* chore(ci): 允许测试工作流在当前分支自动触发

* fix(query-editor): 修复 SQL 编辑中光标随机跳到末尾 refs #185

* feat(data-sync): 增加差异 SQL 预览能力便于审核 refs #174

* fix(clickhouse-connect): 自动识别并回退 HTTP/Native 协议连接 refs #181

* fix(oracle-metadata): 修复视图与函数加载按 schema 过滤异常 refs #155

* fix(dameng-databases): 修复显示全部库时数据库列表不完整 refs #154

* fix(connection,db-list): 统一处理空列表返回并修复达梦连接测试报错 refs #157

Co-authored-by: 辣条 <69459608+tianqijiuyun-latiao@users.noreply.github.com>

*  feat(release-notes): 支持自动生成 Release 更新说明并区分配置文件命名

* 🔁 chore(sync): 回灌 main 到 dev (#192)

* - feat(connection,metadata,kingbase): 增强多数据源连接能力并修复金仓/达梦/Oracle/ClickHouse兼容性问题 (#188)

* feat(http-tunnel): 支持独立 HTTP 隧道连接并覆盖多数据源

refs #168

* fix(kingbase-data-grid): 修复金仓打开表卡顿并降低对象渲染开销

refs #178

* fix(kingbase-transaction): 修复金仓事务提交重复引号导致语法错误

refs #176

* fix(driver-agent): 修复老版本 Win10 升级后金仓驱动代理启动失败

refs #177

* chore(ci): 新增手动触发的 macOS 测试构建工作流

* chore(ci): 允许测试工作流在当前分支自动触发

* fix(query-editor): 修复 SQL 编辑中光标随机跳到末尾 refs #185

* feat(data-sync): 增加差异 SQL 预览能力便于审核 refs #174

* fix(clickhouse-connect): 自动识别并回退 HTTP/Native 协议连接 refs #181

* fix(oracle-metadata): 修复视图与函数加载按 schema 过滤异常 refs #155

* fix(dameng-databases): 修复显示全部库时数据库列表不完整 refs #154

* fix(connection,db-list): 统一处理空列表返回并修复达梦连接测试报错 refs #157

* Release/0.5.3 (#191)

---------

Co-authored-by: 辣条 <69459608+tianqijiuyun-latiao@users.noreply.github.com>
Co-authored-by: Syngnat <92659908+Syngnat@users.noreply.github.com>

* 🐛 fix(branch-sync): 修复 main 回灌 dev 时 mergeable 异步计算导致漏开自动合并

- 增加 mergeable 状态轮询,避免新建同步 PR 后立即返回 UNKNOWN
- 在合并状态未稳定时输出中文告警与执行摘要
- 保持冲突分支、待计算分支与自动合并分支的处理路径清晰

* 🔁 chore(sync): 回灌 main 到 dev (#195)

* - feat(connection,metadata,kingbase): 增强多数据源连接能力并修复金仓/达梦/Oracle/ClickHouse兼容性问题 (#188)

* feat(http-tunnel): 支持独立 HTTP 隧道连接并覆盖多数据源

refs #168

* fix(kingbase-data-grid): 修复金仓打开表卡顿并降低对象渲染开销

refs #178

* fix(kingbase-transaction): 修复金仓事务提交重复引号导致语法错误

refs #176

* fix(driver-agent): 修复老版本 Win10 升级后金仓驱动代理启动失败

refs #177

* chore(ci): 新增手动触发的 macOS 测试构建工作流

* chore(ci): 允许测试工作流在当前分支自动触发

* fix(query-editor): 修复 SQL 编辑中光标随机跳到末尾 refs #185

* feat(data-sync): 增加差异 SQL 预览能力便于审核 refs #174

* fix(clickhouse-connect): 自动识别并回退 HTTP/Native 协议连接 refs #181

* fix(oracle-metadata): 修复视图与函数加载按 schema 过滤异常 refs #155

* fix(dameng-databases): 修复显示全部库时数据库列表不完整 refs #154

* fix(connection,db-list): 统一处理空列表返回并修复达梦连接测试报错 refs #157

* Release/0.5.3 (#191)

* - chore(ci): 新增全平台测试包手动构建工作流 tianqijiuyun-latiao 今天 下午4:26 (#194)

* feat(http-tunnel): 支持独立 HTTP 隧道连接并覆盖多数据源

refs #168

* fix(kingbase-data-grid): 修复金仓打开表卡顿并降低对象渲染开销

refs #178

* fix(kingbase-transaction): 修复金仓事务提交重复引号导致语法错误

refs #176

* fix(driver-agent): 修复老版本 Win10 升级后金仓驱动代理启动失败

refs #177

* chore(ci): 新增手动触发的 macOS 测试构建工作流

* chore(ci): 允许测试工作流在当前分支自动触发

* fix(query-editor): 修复 SQL 编辑中光标随机跳到末尾 refs #185

* feat(data-sync): 增加差异 SQL 预览能力便于审核 refs #174

* fix(clickhouse-connect): 自动识别并回退 HTTP/Native 协议连接 refs #181

* fix(oracle-metadata): 修复视图与函数加载按 schema 过滤异常 refs #155

* fix(dameng-databases): 修复显示全部库时数据库列表不完整 refs #154

* fix(connection,db-list): 统一处理空列表返回并修复达梦连接测试报错 refs #157

* fix(kingbase): 补齐主键识别并优化宽表卡顿 refs #176 refs #178

* fix(query-execution): 支持带前置注释的读查询结果识别

* chore(ci): 新增全平台测试包手动构建工作流

---------

Co-authored-by: 辣条 <69459608+tianqijiuyun-latiao@users.noreply.github.com>
Co-authored-by: Syngnat <92659908+Syngnat@users.noreply.github.com>

* ♻️ refactor(frontend-sync): 优化桌面交互细节并移除 main 回灌 dev 自动化

- 优化新建连接、主题设置、侧边栏工具区与 SQL 日志的界面表现
- 调整分页、筛选、透明模式与弹窗样式,统一整体交互层次
- 收口外观参数生效逻辑并补齐多组件适配
- 删除 sync-main-to-dev 工作流并同步维护者手动回灌说明

* feat: 统一筛选条件逻辑按钮宽度 (#201)

* 🐛 fix(oracle-query): 修复 Oracle 表数据分页 SQL 兼容问题 refs #196 (#202)

*  feat(datasource): 支持 DuckDB Parquet 文件模式并优化弹窗打开链路

- 统一 DuckDB 文件库与 Parquet 文件接入能力
- 补充 URI、文件选择、只读挂载与连接缓存键处理
- 去掉数据源卡片点击前的同步驱动查询,修复打开卡顿

*  feat(datasource): 支持 DuckDB Parquet 文件模式并优化弹窗打开链路

- 统一 DuckDB 文件库与 Parquet 文件接入能力
- 补充 URI、文件选择、只读挂载与连接缓存键处理
- 去掉数据源卡片点击前的同步驱动查询,修复打开卡顿
- refs #166

* 🐛 fix(dameng): 修复达梦连接成功后数据库列表为空问题

- 调整达梦数据库列表获取策略,优先回退查询当前 schema 与当前用户
- 保留可见用户与 owner 聚合逻辑,兼容低权限账号场景
- 补充前端空列表提示与后端单元测试,降低排查成本
- close #203

*  feat(data-sync): 扩展跨库迁移链路并优化数据同步交互

- 统一同库同步与跨库迁移入口,补充模式区分与风险提示
- 扩展 ClickHouse 与 PG-like 双向迁移,并新增 PG-like、ClickHouse、TDengine 到 MongoDB 的迁移路由
- 完善 TDengine 目标端建表规划、回归测试与需求追踪文档
- refs #51

* 🐛 fix(connection): 修复新建连接时标签切换导致表单数据丢失

- 在 SSH 标签页测试连接时,基础信息的 host 回退为默认值 localhost
- 在基础信息标签页保存时,SSH 配置丢失
- 保存结果仅包含当前选中标签页的字段
- refs #208

* 🐛 fix(mongodb): 修复单机模式连接副本集实例时地址被替换为内网地址

- getURI 在 topology=single 时未设置 directConnection=true
- 驱动连接目标地址后自动跟随副本集成员发现,切换到 localhost:27017
- 在 mongodb_impl.go 和 mongodb_impl_v1.go 中添加 directConnection=true
- 仅在 topology 非 replica、无 replicaSet、非 SRV 时生效
- refs #205

* 🐛 fix(DataGrid): 修复虚拟滚动模式下右键菜单失效

- 行级和单元格级右键菜单的启用条件互斥,虚拟滚动模式下两者同时失效
- enableLargeResultOptimizedEditing 关闭了内联编辑但未回退启用行级菜单
- 修改 useContextMenuRow 和 enableRowContextMenu 条件,虚拟模式下启用行级菜单
- 更新 dataContextValue 的 useMemo 依赖数组
- refs #209

* 🐛 fix(sqlserver): 修复 SQL Server 查看表数据时分页语法和标识符引用错误

- quoteIdentPart 缺少 sqlserver 分支,标识符使用双引号而非 [bracket]
- buildPaginatedSelectSQL 增加 mssql 别名兜底,避免 dbType 变体导致走 default 分支
- 修复后标识符使用 [bracket],分页使用 OFFSET FETCH NEXT 语法
- refs #204

*  feat(DataGrid): 统一表格右键菜单交互体验

- 彻底移除功能较少的行级右键菜单 ContextMenuRow,统一使用功能更丰富的单元格右键菜单
- 优化虚拟滚动模式和只读模式下的渲染,支持触发单元格右键菜单
- 菜单展示自适应:在只读或不可修改数据的场景下自动隐藏「设置为 NULL」与「填充到选中行」等编辑项
- refs #209

* 🔧 fix(DataGrid): 默认开启虚拟滚动并修复多选单元格高亮失效问题

- 移除根据数据量和列数动态判断是否开启虚拟滚动的阈值限制,改为在表格视图下默认全量开启,彻底解决卡顿问题
- 修复 `updateCellSelection` 在查找坐标节点时硬编码 `td` 选择器的问题,改为精确匹配 `.ant-table-cell`,兼容虚拟滚动时的 `div` 渲染模式
- 修复因透明窗口特性导致的 `transparent !important` 把高亮样式强行覆盖的问题,拔高了多选状态下背景与边框 CSS 的优先级
- 解决单元格内外多重属性嵌套导致的高亮右侧留白现象,使得高亮框完全贴合表格单元格边缘
- 适配主题色响应(暗黑模式使用黄色深色高亮,白昼模式使用默认蓝色高亮)

---------

Co-authored-by: Syngnat <yangguofeng919@gmail.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: 辣条 <69459608+tianqijiuyun-latiao@users.noreply.github.com>
Co-authored-by: TSS <266256496+Zencok@users.noreply.github.com>
2026-03-10 11:26:02 +08:00

1190 lines
29 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//go:build gonavi_full_drivers || gonavi_mongodb_driver
package db
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/url"
"sort"
"strconv"
"strings"
"time"
"GoNavi-Wails/internal/connection"
"GoNavi-Wails/internal/logger"
proxytunnel "GoNavi-Wails/internal/proxy"
"GoNavi-Wails/internal/ssh"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
)
type MongoDB struct {
client *mongo.Client
database string
pingTimeout time.Duration
forwarder *ssh.LocalForwarder
}
type mongoProxyDialer struct {
proxyConfig connection.ProxyConfig
}
func (d *mongoProxyDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
return proxytunnel.DialContext(ctx, d.proxyConfig, network, address)
}
const defaultMongoPort = 27017
func normalizeMongoAddress(host string, port int) string {
h := strings.TrimSpace(host)
if h == "" {
h = "localhost"
}
p := port
if p <= 0 {
p = defaultMongoPort
}
return fmt.Sprintf("%s:%d", h, p)
}
func normalizeMongoSeed(raw string, defaultPort int, useSRV bool) (string, bool) {
host, port, ok := parseHostPortWithDefault(raw, defaultPort)
if !ok {
return "", false
}
if useSRV {
normalized := strings.TrimSpace(host)
if normalized == "" {
return "", false
}
return normalized, true
}
return normalizeMongoAddress(host, port), true
}
func collectMongoSeeds(config connection.ConnectionConfig) []string {
defaultPort := config.Port
if defaultPort <= 0 {
defaultPort = defaultMongoPort
}
useSRV := config.MongoSRV
candidates := make([]string, 0, len(config.Hosts)+1)
if len(config.Hosts) > 0 {
candidates = append(candidates, config.Hosts...)
} else {
if useSRV {
candidates = append(candidates, strings.TrimSpace(config.Host))
} else {
candidates = append(candidates, normalizeMongoAddress(config.Host, defaultPort))
}
}
result := make([]string, 0, len(candidates))
seen := make(map[string]struct{}, len(candidates))
for _, entry := range candidates {
normalized, ok := normalizeMongoSeed(entry, defaultPort, useSRV)
if !ok {
continue
}
if _, exists := seen[normalized]; exists {
continue
}
seen[normalized] = struct{}{}
result = append(result, normalized)
}
return result
}
func applyMongoURI(config connection.ConnectionConfig) connection.ConnectionConfig {
uriText := strings.TrimSpace(config.URI)
if uriText == "" {
return config
}
lowerURI := strings.ToLower(uriText)
if strings.HasPrefix(lowerURI, "mongodb+srv://") {
config.MongoSRV = true
}
if !strings.HasPrefix(lowerURI, "mongodb://") && !strings.HasPrefix(lowerURI, "mongodb+srv://") {
return config
}
parsed, err := url.Parse(uriText)
if err != nil {
return config
}
if parsed.User != nil {
if config.User == "" {
config.User = parsed.User.Username()
}
if pass, ok := parsed.User.Password(); ok && config.Password == "" {
config.Password = pass
}
}
if dbName := strings.TrimPrefix(parsed.Path, "/"); dbName != "" && config.Database == "" {
config.Database = dbName
}
defaultPort := config.Port
if defaultPort <= 0 {
defaultPort = defaultMongoPort
}
hostsFromURI := make([]string, 0, 4)
hostText := strings.TrimSpace(parsed.Host)
if hostText != "" {
for _, entry := range strings.Split(hostText, ",") {
normalized, ok := normalizeMongoSeed(entry, defaultPort, config.MongoSRV)
if ok {
hostsFromURI = append(hostsFromURI, normalized)
}
}
}
if len(config.Hosts) == 0 && len(hostsFromURI) > 0 {
config.Hosts = hostsFromURI
}
if strings.TrimSpace(config.Host) == "" && len(hostsFromURI) > 0 {
host, port, ok := parseHostPortWithDefault(hostsFromURI[0], defaultPort)
if ok {
config.Host = host
config.Port = port
}
}
query := parsed.Query()
if config.AuthSource == "" {
config.AuthSource = strings.TrimSpace(query.Get("authSource"))
}
if config.ReadPreference == "" {
config.ReadPreference = strings.TrimSpace(query.Get("readPreference"))
}
if config.ReplicaSet == "" {
config.ReplicaSet = strings.TrimSpace(query.Get("replicaSet"))
}
if config.MongoAuthMechanism == "" {
config.MongoAuthMechanism = strings.TrimSpace(query.Get("authMechanism"))
}
if config.Topology == "" {
if len(config.Hosts) > 1 || strings.TrimSpace(config.ReplicaSet) != "" {
config.Topology = "replica"
} else {
config.Topology = "single"
}
}
return config
}
func (m *MongoDB) getURI(config connection.ConnectionConfig) string {
if strings.TrimSpace(config.URI) != "" {
return strings.TrimSpace(config.URI)
}
seeds := collectMongoSeeds(config)
if len(seeds) == 0 {
if config.MongoSRV {
seed := strings.TrimSpace(config.Host)
if seed == "" {
seed = "localhost"
}
seeds = append(seeds, seed)
} else {
seeds = append(seeds, normalizeMongoAddress(config.Host, config.Port))
}
}
scheme := "mongodb"
if config.MongoSRV {
scheme = "mongodb+srv"
}
hostText := strings.Join(seeds, ",")
uri := fmt.Sprintf("%s://%s", scheme, hostText)
if config.User != "" {
var userinfo *url.Userinfo
if config.Password != "" {
userinfo = url.UserPassword(config.User, config.Password)
} else {
userinfo = url.User(config.User)
}
uri = fmt.Sprintf("%s://%s@%s", scheme, userinfo.String(), hostText)
}
path := "/"
if strings.TrimSpace(config.Database) != "" {
path = "/" + url.PathEscape(strings.TrimSpace(config.Database))
}
uri += path
params := url.Values{}
timeout := getConnectTimeoutSeconds(config)
params.Set("connectTimeoutMS", strconv.Itoa(timeout*1000))
params.Set("serverSelectionTimeoutMS", strconv.Itoa(timeout*1000))
authSource := strings.TrimSpace(config.AuthSource)
if authSource == "" && strings.TrimSpace(config.Database) != "" {
authSource = strings.TrimSpace(config.Database)
}
if authSource == "" {
authSource = "admin"
}
params.Set("authSource", authSource)
if replicaSet := strings.TrimSpace(config.ReplicaSet); replicaSet != "" {
params.Set("replicaSet", replicaSet)
}
if readPreference := strings.TrimSpace(config.ReadPreference); readPreference != "" {
params.Set("readPreference", readPreference)
}
if authMechanism := strings.TrimSpace(config.MongoAuthMechanism); authMechanism != "" {
params.Set("authMechanism", authMechanism)
}
// 单机模式且未指定副本集名称时,启用 directConnection 避免驱动自动跟随副本集成员发现
if strings.TrimSpace(config.Topology) != "replica" && strings.TrimSpace(config.ReplicaSet) == "" && !config.MongoSRV {
params.Set("directConnection", "true")
}
if encoded := params.Encode(); encoded != "" {
uri += "?" + encoded
}
return uri
}
func buildMongoAuthAttempts(config connection.ConnectionConfig) []connection.ConnectionConfig {
attempts := []connection.ConnectionConfig{config}
replicaUser := strings.TrimSpace(config.MongoReplicaUser)
if replicaUser == "" {
return attempts
}
if replicaUser == strings.TrimSpace(config.User) && config.MongoReplicaPassword == config.Password {
return attempts
}
replicaConfig := config
replicaConfig.URI = ""
replicaConfig.User = replicaUser
replicaConfig.Password = config.MongoReplicaPassword
attempts = append(attempts, replicaConfig)
return attempts
}
func (m *MongoDB) Connect(config connection.ConnectionConfig) error {
runConfig := applyMongoURI(config)
connectConfig := runConfig
if runConfig.UseSSH && runConfig.MongoSRV {
return fmt.Errorf("MongoDB SRV 记录模式暂不支持 SSH 隧道")
}
if runConfig.UseSSH {
seeds := collectMongoSeeds(runConfig)
if len(seeds) == 0 {
seeds = append(seeds, normalizeMongoAddress(runConfig.Host, runConfig.Port))
}
targetHost, targetPort, ok := parseHostPortWithDefault(seeds[0], defaultMongoPort)
if !ok {
return fmt.Errorf("MongoDB 连接失败:无效地址 %s", seeds[0])
}
logger.Infof("MongoDB 使用 SSH 连接:地址=%s:%d", targetHost, targetPort)
forwarder, err := ssh.GetOrCreateLocalForwarder(runConfig.SSH, targetHost, targetPort)
if err != nil {
return fmt.Errorf("创建 SSH 隧道失败:%w", err)
}
m.forwarder = forwarder
host, portStr, err := net.SplitHostPort(forwarder.LocalAddr)
if err != nil {
return fmt.Errorf("解析本地转发地址失败:%w", err)
}
port, err := strconv.Atoi(portStr)
if err != nil {
return fmt.Errorf("解析本地端口失败:%w", err)
}
localConfig := runConfig
localConfig.Host = host
localConfig.Port = port
localConfig.UseSSH = false
localConfig.URI = ""
localConfig.Hosts = []string{normalizeMongoAddress(host, port)}
connectConfig = localConfig
logger.Infof("MongoDB 通过本地端口转发连接:%s -> %s:%d", forwarder.LocalAddr, targetHost, targetPort)
}
m.pingTimeout = getConnectTimeout(connectConfig)
m.database = connectConfig.Database
if m.database == "" {
m.database = "admin"
}
sslAttempts := []connection.ConnectionConfig{connectConfig}
if shouldTrySSLPreferredFallback(connectConfig) {
sslAttempts = append(sslAttempts, withSSLDisabled(connectConfig))
}
var errorDetails []string
for sslIndex, sslConfig := range sslAttempts {
sslLabel := "SSL"
if sslIndex > 0 {
sslLabel = "明文回退"
}
attemptConfigs := buildMongoAuthAttempts(sslConfig)
for index, attemptConfig := range attemptConfigs {
authLabel := "主库凭据"
if index > 0 {
authLabel = "从库凭据"
}
if sslIndex > 0 {
attemptConfig.URI = ""
}
uri := m.getURI(attemptConfig)
clientOpts := options.Client().ApplyURI(uri)
tlsEnabled, tlsInsecure := resolveMongoTLSSettings(attemptConfig)
if tlsEnabled {
clientOpts.SetTLSConfig(&tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: tlsInsecure,
})
}
if attemptConfig.UseProxy {
clientOpts.SetDialer(&mongoProxyDialer{proxyConfig: attemptConfig.Proxy})
}
client, err := mongo.Connect(clientOpts)
if err != nil {
errorDetails = append(errorDetails, fmt.Sprintf("%s %s连接失败: %v", sslLabel, authLabel, err))
continue
}
m.client = client
if err := m.Ping(); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
_ = client.Disconnect(ctx)
cancel()
m.client = nil
errorDetails = append(errorDetails, fmt.Sprintf("%s %s验证失败: %v", sslLabel, authLabel, err))
continue
}
if sslIndex > 0 {
logger.Warnf("MongoDB SSL 优先连接失败,已回退至明文连接")
}
return nil
}
}
if len(errorDetails) > 0 {
return fmt.Errorf("MongoDB 连接失败:%s", strings.Join(errorDetails, ""))
}
return fmt.Errorf("MongoDB 连接失败:无可用连接方案")
}
func (m *MongoDB) Close() error {
if m.forwarder != nil {
if err := m.forwarder.Close(); err != nil {
logger.Warnf("关闭 MongoDB SSH 端口转发失败:%v", err)
}
m.forwarder = nil
}
if m.client != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return m.client.Disconnect(ctx)
}
return nil
}
func (m *MongoDB) Ping() error {
if m.client == nil {
return fmt.Errorf("connection not open")
}
timeout := m.pingTimeout
if timeout <= 0 {
timeout = 5 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return m.client.Ping(ctx, readpref.Primary())
}
func asMongoStringList(raw interface{}) []string {
values, ok := raw.(bson.A)
if !ok {
return nil
}
result := make([]string, 0, len(values))
for _, entry := range values {
text := strings.TrimSpace(fmt.Sprintf("%v", entry))
if text != "" {
result = append(result, text)
}
}
return result
}
func asMongoString(raw interface{}) string {
if raw == nil {
return ""
}
if value, ok := raw.(string); ok {
return strings.TrimSpace(value)
}
return strings.TrimSpace(fmt.Sprintf("%v", raw))
}
func asMongoInt(raw interface{}) int {
switch value := raw.(type) {
case int:
return value
case int32:
return int(value)
case int64:
return int(value)
case float32:
return int(value)
case float64:
return int(value)
default:
return 0
}
}
func asMongoBool(raw interface{}) bool {
switch value := raw.(type) {
case bool:
return value
case int:
return value != 0
case int32:
return value != 0
case int64:
return value != 0
case float32:
return value != 0
case float64:
return value != 0
default:
return false
}
}
func asMongoInt64(raw interface{}) int64 {
switch value := raw.(type) {
case int:
return int64(value)
case int32:
return int64(value)
case int64:
return value
case float32:
return int64(value)
case float64:
return int64(value)
default:
return 0
}
}
func mongoStateByCode(code int) string {
switch code {
case 1:
return "PRIMARY"
case 2:
return "SECONDARY"
case 3:
return "RECOVERING"
case 5:
return "STARTUP2"
case 6:
return "UNKNOWN"
case 7:
return "ARBITER"
case 8:
return "DOWN"
case 9:
return "ROLLBACK"
case 10:
return "REMOVED"
default:
return "UNKNOWN"
}
}
func normalizeMongoStateLabel(state string, stateCode int) string {
normalized := strings.ToUpper(strings.TrimSpace(state))
if normalized != "" {
return normalized
}
return mongoStateByCode(stateCode)
}
func buildMembersFromReplStatus(raw bson.M) []connection.MongoMemberInfo {
items, ok := raw["members"].(bson.A)
if !ok {
return nil
}
members := make([]connection.MongoMemberInfo, 0, len(items))
for _, entry := range items {
member, ok := entry.(bson.M)
if !ok {
continue
}
host := asMongoString(member["name"])
if host == "" {
continue
}
stateCode := asMongoInt(member["state"])
state := normalizeMongoStateLabel(asMongoString(member["stateStr"]), stateCode)
members = append(members, connection.MongoMemberInfo{
Host: host,
Role: state,
State: state,
StateCode: stateCode,
Healthy: asMongoInt(member["health"]) > 0 || asMongoBool(member["health"]),
IsSelf: asMongoBool(member["self"]),
})
}
sort.Slice(members, func(i, j int) bool {
return members[i].Host < members[j].Host
})
return members
}
func buildMembersFromHello(raw bson.M) []connection.MongoMemberInfo {
hosts := asMongoStringList(raw["hosts"])
if len(hosts) == 0 {
return nil
}
primary := asMongoString(raw["primary"])
selfHost := asMongoString(raw["me"])
passiveSet := make(map[string]struct{})
for _, host := range asMongoStringList(raw["passives"]) {
passiveSet[host] = struct{}{}
}
arbiterSet := make(map[string]struct{})
for _, host := range asMongoStringList(raw["arbiters"]) {
arbiterSet[host] = struct{}{}
}
members := make([]connection.MongoMemberInfo, 0, len(hosts))
for _, host := range hosts {
state := "SECONDARY"
stateCode := 2
if host == primary {
state = "PRIMARY"
stateCode = 1
} else if _, ok := arbiterSet[host]; ok {
state = "ARBITER"
stateCode = 7
} else if _, ok := passiveSet[host]; ok {
state = "PASSIVE"
stateCode = 6
}
members = append(members, connection.MongoMemberInfo{
Host: host,
Role: state,
State: state,
StateCode: stateCode,
Healthy: true,
IsSelf: host == selfHost,
})
}
sort.Slice(members, func(i, j int) bool {
return members[i].Host < members[j].Host
})
return members
}
func (m *MongoDB) DiscoverMembers() (string, []connection.MongoMemberInfo, error) {
if m.client == nil {
return "", nil, fmt.Errorf("connection not open")
}
timeout := m.pingTimeout
if timeout <= 0 {
timeout = 10 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminDB := m.client.Database("admin")
var replStatus bson.M
replErr := adminDB.RunCommand(ctx, bson.D{{Key: "replSetGetStatus", Value: 1}}).Decode(&replStatus)
if replErr == nil {
replicaSet := asMongoString(replStatus["set"])
members := buildMembersFromReplStatus(replStatus)
if len(members) > 0 {
return replicaSet, members, nil
}
}
var helloResult bson.M
helloErr := adminDB.RunCommand(ctx, bson.D{{Key: "hello", Value: 1}}).Decode(&helloResult)
if helloErr != nil {
if err := adminDB.RunCommand(ctx, bson.D{{Key: "isMaster", Value: 1}}).Decode(&helloResult); err != nil {
if replErr != nil {
return "", nil, fmt.Errorf("成员发现失败replSetGetStatus=%vhello=%v", replErr, err)
}
return "", nil, fmt.Errorf("成员发现失败hello=%w", err)
}
}
replicaSet := asMongoString(helloResult["setName"])
members := buildMembersFromHello(helloResult)
if len(members) == 0 {
if replErr != nil {
return replicaSet, nil, fmt.Errorf("未获取到成员信息replSetGetStatus=%v", replErr)
}
return replicaSet, nil, fmt.Errorf("未获取到成员信息")
}
return replicaSet, members, nil
}
// Query executes a MongoDB command and returns results
// Supports JSON format commands like: {"find": "collection", "filter": {}}
func (m *MongoDB) Query(query string) ([]map[string]interface{}, []string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return m.queryWithContext(ctx, query)
}
// QueryContext executes a MongoDB command with the given context for timeout control
func (m *MongoDB) QueryContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error) {
return m.queryWithContext(ctx, query)
}
// sqlToMongoFind 将前端生成的简单 SQL 转换为 MongoDB find 命令 JSON。
// 支持SELECT * FROM "coll" LIMIT n OFFSET m / SELECT COUNT(*) as total FROM "coll"
func sqlToMongoFind(sql string) (string, bool) {
lower := strings.ToLower(strings.TrimSpace(sql))
// SELECT COUNT(*) as total FROM "coll" ...
if strings.HasPrefix(lower, "select count(") {
coll := extractCollectionFromSQL(sql)
if coll == "" {
return "", false
}
return fmt.Sprintf(`{"count":"%s","query":{}}`, coll), true
}
// SELECT * FROM "coll" ... LIMIT n OFFSET m
if !strings.HasPrefix(lower, "select") {
return "", false
}
coll := extractCollectionFromSQL(sql)
if coll == "" {
return "", false
}
limit := int64(0)
skip := int64(0)
// 提取 LIMIT
if idx := strings.Index(lower, "limit "); idx >= 0 {
after := strings.TrimSpace(lower[idx+6:])
parts := strings.Fields(after)
if len(parts) > 0 {
if n, err := strconv.ParseInt(parts[0], 10, 64); err == nil {
limit = n
}
}
}
// 提取 OFFSET
if idx := strings.Index(lower, "offset "); idx >= 0 {
after := strings.TrimSpace(lower[idx+7:])
parts := strings.Fields(after)
if len(parts) > 0 {
if n, err := strconv.ParseInt(parts[0], 10, 64); err == nil {
skip = n
}
}
}
cmd := fmt.Sprintf(`{"find":"%s","filter":{}`, coll)
if limit > 0 {
cmd += fmt.Sprintf(`,"limit":%d`, limit)
}
if skip > 0 {
cmd += fmt.Sprintf(`,"skip":%d`, skip)
}
cmd += "}"
return cmd, true
}
// extractCollectionFromSQL 从 SQL 中提取 FROM 后的 collection 名称。
func extractCollectionFromSQL(sql string) string {
lower := strings.ToLower(sql)
idx := strings.Index(lower, "from ")
if idx < 0 {
return ""
}
after := strings.TrimSpace(sql[idx+5:])
// 去掉引号包裹
var coll string
if len(after) > 0 && after[0] == '"' {
end := strings.Index(after[1:], "\"")
if end < 0 {
return ""
}
coll = after[1 : end+1]
} else if len(after) > 0 && after[0] == '`' {
end := strings.Index(after[1:], "`")
if end < 0 {
return ""
}
coll = after[1 : end+1]
} else {
parts := strings.Fields(after)
if len(parts) == 0 {
return ""
}
coll = parts[0]
}
return strings.TrimSpace(coll)
}
func (m *MongoDB) queryWithContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error) {
if m.client == nil {
return nil, nil, fmt.Errorf("connection not open")
}
query = strings.TrimSpace(query)
if query == "" {
return nil, nil, fmt.Errorf("empty query")
}
// 如果输入是 SQL 语句(前端 DataViewer 统一生成),自动转换为 MongoDB JSON 命令
lowerQuery := strings.ToLower(query)
if strings.HasPrefix(lowerQuery, "select") || strings.HasPrefix(lowerQuery, "show") {
if converted, ok := sqlToMongoFind(query); ok {
query = converted
}
}
// Parse JSON command
var cmd bson.D
if err := bson.UnmarshalExtJSON([]byte(query), true, &cmd); err != nil {
return nil, nil, fmt.Errorf("invalid JSON command: %w", err)
}
// 对 find 和 count 命令使用原生 driver API避免 RunCommand 的 firstBatch 限制
if len(cmd) > 0 {
switch cmd[0].Key {
case "find":
return m.execFind(ctx, cmd)
case "count":
return m.execCount(ctx, cmd)
}
}
// 其他命令走 RunCommand
db := m.client.Database(m.database)
var result bson.M
if err := db.RunCommand(ctx, cmd).Decode(&result); err != nil {
return nil, nil, err
}
// Handle COUNT result (e.g. delete/update returns "n")
if n, ok := result["n"]; ok {
if _, hasCursor := result["cursor"]; !hasCursor {
return []map[string]interface{}{{"total": n}}, []string{"total"}, nil
}
}
// Convert result to standard format
data := []map[string]interface{}{{"result": result}}
columns := []string{"result"}
// If result contains cursor with documents, extract them
if cursor, ok := result["cursor"].(bson.M); ok {
if batch, ok := cursor["firstBatch"].(bson.A); ok {
data = make([]map[string]interface{}, 0, len(batch))
columnSet := make(map[string]bool)
for _, doc := range batch {
if docMap, ok := doc.(bson.M); ok {
row := make(map[string]interface{})
for k, v := range docMap {
row[k] = v
columnSet[k] = true
}
data = append(data, row)
}
}
columns = make([]string, 0, len(columnSet))
for k := range columnSet {
columns = append(columns, k)
}
}
}
return data, columns, nil
}
// execFind 使用原生 Collection.Find() 执行查询,正确处理游标迭代
func (m *MongoDB) execFind(ctx context.Context, cmd bson.D) ([]map[string]interface{}, []string, error) {
var collName string
var filter interface{}
var limit int64
var skip int64
var sortDoc interface{}
var projection interface{}
for _, elem := range cmd {
switch elem.Key {
case "find":
collName = fmt.Sprintf("%v", elem.Value)
case "filter":
filter = elem.Value
case "limit":
limit = asMongoInt64(elem.Value)
case "skip":
skip = asMongoInt64(elem.Value)
case "sort":
sortDoc = elem.Value
case "projection":
projection = elem.Value
}
}
if collName == "" {
return nil, nil, fmt.Errorf("find command missing collection name")
}
if filter == nil {
filter = bson.D{}
}
collection := m.client.Database(m.database).Collection(collName)
opts := options.Find()
if limit > 0 {
opts.SetLimit(limit)
}
if skip > 0 {
opts.SetSkip(skip)
}
if sortDoc != nil {
opts.SetSort(sortDoc)
}
if projection != nil {
opts.SetProjection(projection)
}
cursor, err := collection.Find(ctx, filter, opts)
if err != nil {
return nil, nil, err
}
defer cursor.Close(ctx)
var data []map[string]interface{}
columnSet := make(map[string]bool)
for cursor.Next(ctx) {
var doc bson.M
if err := cursor.Decode(&doc); err != nil {
continue
}
row := make(map[string]interface{})
for k, v := range doc {
row[k] = convertBsonValue(v)
columnSet[k] = true
}
data = append(data, row)
}
if err := cursor.Err(); err != nil {
return nil, nil, err
}
columns := make([]string, 0, len(columnSet))
for k := range columnSet {
columns = append(columns, k)
}
sort.Strings(columns)
// 将 _id 列置首
for i, col := range columns {
if col == "_id" && i > 0 {
columns = append(columns[:i], columns[i+1:]...)
columns = append([]string{"_id"}, columns...)
break
}
}
return data, columns, nil
}
// execCount 使用原生 Collection.CountDocuments() 执行计数
func (m *MongoDB) execCount(ctx context.Context, cmd bson.D) ([]map[string]interface{}, []string, error) {
var collName string
var filter interface{}
for _, elem := range cmd {
switch elem.Key {
case "count":
collName = fmt.Sprintf("%v", elem.Value)
case "query":
filter = elem.Value
}
}
if collName == "" {
return nil, nil, fmt.Errorf("count command missing collection name")
}
if filter == nil {
filter = bson.D{}
}
collection := m.client.Database(m.database).Collection(collName)
n, err := collection.CountDocuments(ctx, filter)
if err != nil {
return nil, nil, err
}
return []map[string]interface{}{{"total": n}}, []string{"total"}, nil
}
// convertBsonValue 将 BSON 特殊类型转换为前端可读的 JSON 友好值
func convertBsonValue(v interface{}) interface{} {
switch val := v.(type) {
case bson.ObjectID:
return val.Hex()
case bson.M:
result := make(map[string]interface{}, len(val))
for k, v2 := range val {
result[k] = convertBsonValue(v2)
}
return result
case bson.D:
result := make(map[string]interface{}, len(val))
for _, elem := range val {
result[elem.Key] = convertBsonValue(elem.Value)
}
return result
case bson.A:
result := make([]interface{}, len(val))
for i, v2 := range val {
result[i] = convertBsonValue(v2)
}
return result
default:
return v
}
}
func (m *MongoDB) Exec(query string) (int64, error) {
_, _, err := m.Query(query)
if err != nil {
return 0, err
}
return 1, nil
}
// ExecContext executes a MongoDB command with the given context for timeout control
func (m *MongoDB) ExecContext(ctx context.Context, query string) (int64, error) {
_, _, err := m.QueryContext(ctx, query)
if err != nil {
return 0, err
}
return 1, nil
}
func (m *MongoDB) GetDatabases() ([]string, error) {
if m.client == nil {
return nil, fmt.Errorf("connection not open")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
dbs, err := m.client.ListDatabaseNames(ctx, bson.M{})
if err != nil {
return nil, err
}
return dbs, nil
}
func (m *MongoDB) GetTables(dbName string) ([]string, error) {
if m.client == nil {
return nil, fmt.Errorf("connection not open")
}
targetDB := dbName
if targetDB == "" {
targetDB = m.database
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
collections, err := m.client.Database(targetDB).ListCollectionNames(ctx, bson.M{})
if err != nil {
return nil, err
}
return collections, nil
}
func (m *MongoDB) GetCreateStatement(dbName, tableName string) (string, error) {
return fmt.Sprintf("// MongoDB collection: %s.%s\n// MongoDB is schemaless - no CREATE statement available", dbName, tableName), nil
}
// GetColumns returns empty for MongoDB (schemaless)
func (m *MongoDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) {
// MongoDB is schemaless, return empty
return []connection.ColumnDefinition{}, nil
}
// GetAllColumns returns empty for MongoDB (schemaless)
func (m *MongoDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) {
return []connection.ColumnDefinitionWithTable{}, nil
}
// GetIndexes returns indexes for a MongoDB collection
func (m *MongoDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) {
if m.client == nil {
return nil, fmt.Errorf("connection not open")
}
targetDB := dbName
if targetDB == "" {
targetDB = m.database
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
collection := m.client.Database(targetDB).Collection(tableName)
cursor, err := collection.Indexes().List(ctx)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
var indexes []connection.IndexDefinition
for cursor.Next(ctx) {
var idx bson.M
if err := cursor.Decode(&idx); err != nil {
continue
}
name := fmt.Sprintf("%v", idx["name"])
unique := false
if u, ok := idx["unique"].(bool); ok {
unique = u
}
// Extract key fields
if key, ok := idx["key"].(bson.M); ok {
seq := 1
for field := range key {
nonUnique := 1
if unique {
nonUnique = 0
}
indexes = append(indexes, connection.IndexDefinition{
Name: name,
ColumnName: field,
NonUnique: nonUnique,
SeqInIndex: seq,
IndexType: "BTREE",
})
seq++
}
}
}
return indexes, nil
}
func (m *MongoDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) {
// MongoDB doesn't have foreign keys
return []connection.ForeignKeyDefinition{}, nil
}
func (m *MongoDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) {
// MongoDB doesn't have triggers in the traditional sense
return []connection.TriggerDefinition{}, nil
}
// ApplyChanges implements batch changes for MongoDB
func (m *MongoDB) ApplyChanges(tableName string, changes connection.ChangeSet) error {
if m.client == nil {
return fmt.Errorf("connection not open")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
collection := m.client.Database(m.database).Collection(tableName)
// Process deletes
for _, pk := range changes.Deletes {
filter := bson.M{}
for k, v := range pk {
filter[k] = v
}
if len(filter) > 0 {
if _, err := collection.DeleteOne(ctx, filter); err != nil {
return fmt.Errorf("delete error: %v", err)
}
}
}
// Process updates
for _, update := range changes.Updates {
filter := bson.M{}
for k, v := range update.Keys {
filter[k] = v
}
if len(filter) == 0 {
return fmt.Errorf("update requires keys")
}
updateDoc := bson.M{"$set": bson.M{}}
for k, v := range update.Values {
updateDoc["$set"].(bson.M)[k] = v
}
if _, err := collection.UpdateOne(ctx, filter, updateDoc); err != nil {
return fmt.Errorf("update error: %v", err)
}
}
// Process inserts
for _, row := range changes.Inserts {
doc := bson.M{}
for k, v := range row {
doc[k] = v
}
if len(doc) > 0 {
if _, err := collection.InsertOne(ctx, doc); err != nil {
return fmt.Errorf("insert error: %v", err)
}
}
}
return nil
}