From f584a0802a2b391decbf72e4d8101a430d914195 Mon Sep 17 00:00:00 2001 From: Wu Qing <3184394176@qq.com> Date: Wed, 27 May 2026 18:35:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(backup):=20=E6=96=B0=E5=A2=9E=20MongoDB=20?= =?UTF-8?q?=E5=A4=87=E4=BB=BD=E4=B8=8E=E6=81=A2=E5=A4=8D=E6=94=AF=E6=8C=81?= =?UTF-8?q?=20(#87)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 通过 mongodump/mongorestore --archive 流式管线接入 MongoDB 数据源,与现有数据库运行器架构一致;注册到 Master 与 Agent,含任务校验、默认端口与前端表单/恢复确认。5 个单测覆盖参数构造、全库、空产物与缺工具分支。 --- server/internal/agent/executor.go | 1 + server/internal/app/app.go | 2 +- server/internal/backup/mongodb_runner.go | 119 ++++++++++++++++++ server/internal/backup/mongodb_runner_test.go | 102 +++++++++++++++ server/internal/model/backup_task.go | 1 + .../internal/service/backup_task_service.go | 4 +- .../backup-tasks/BackupTaskFormDrawer.tsx | 10 +- .../components/backup-tasks/field-config.ts | 7 +- .../restore-records/RestoreConfirmModal.tsx | 2 +- web/src/types/backup-tasks.ts | 2 +- 10 files changed, 239 insertions(+), 11 deletions(-) create mode 100644 server/internal/backup/mongodb_runner.go create mode 100644 server/internal/backup/mongodb_runner_test.go diff --git a/server/internal/agent/executor.go b/server/internal/agent/executor.go index 24d606d..d782519 100644 --- a/server/internal/agent/executor.go +++ b/server/internal/agent/executor.go @@ -34,6 +34,7 @@ func NewExecutor(client *MasterClient, tempDir string) *Executor { backup.NewMySQLRunner(nil), backup.NewPostgreSQLRunner(nil), backup.NewSAPHANARunner(nil), + backup.NewMongoDBRunner(nil), ) storageRegistry := storage.NewRegistry( storageRclone.NewLocalDiskFactory(), diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 2795778..88deb5f 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -82,7 +82,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application, backupTaskService := service.NewBackupTaskService(backupTaskRepo, storageTargetRepo, configCipher) backupTaskService.SetRecordsAndStorage(backupRecordRepo, storageRegistry) // nodeRepo 在下方 Cluster 节点管理区块才实例化,这里延后注入 - backupRunnerRegistry := backup.NewRegistry(backup.NewFileRunner(), backup.NewSQLiteRunner(), backup.NewMySQLRunner(nil), backup.NewPostgreSQLRunner(nil), backup.NewSAPHANARunner(nil)) + backupRunnerRegistry := backup.NewRegistry(backup.NewFileRunner(), backup.NewSQLiteRunner(), backup.NewMySQLRunner(nil), backup.NewPostgreSQLRunner(nil), backup.NewSAPHANARunner(nil), backup.NewMongoDBRunner(nil)) logHub := backup.NewLogHub() retentionService := backupretention.NewService(backupRecordRepo) notifyRegistry := notify.NewRegistry(notify.NewEmailNotifier(), notify.NewWebhookNotifier(), notify.NewTelegramNotifier()) diff --git a/server/internal/backup/mongodb_runner.go b/server/internal/backup/mongodb_runner.go new file mode 100644 index 0000000..12a99c9 --- /dev/null +++ b/server/internal/backup/mongodb_runner.go @@ -0,0 +1,119 @@ +package backup + +import ( + "bytes" + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "time" +) + +// MongoDBRunner 通过 mongodump/mongorestore 备份与恢复 MongoDB 数据库。 +// 采用 --archive 流式模式(dump 写 stdout、restore 读 stdin),与 MySQLRunner +// 的 mysqldump/mysql 管线保持一致;产物为未压缩的 mongo archive,由备份管线统一压缩/加密。 +type MongoDBRunner struct { + executor CommandExecutor +} + +func NewMongoDBRunner(executor CommandExecutor) *MongoDBRunner { + if executor == nil { + executor = NewOSCommandExecutor() + } + return &MongoDBRunner{executor: executor} +} + +func (r *MongoDBRunner) Type() string { + return "mongodb" +} + +func (r *MongoDBRunner) Run(ctx context.Context, task TaskSpec, writer LogWriter) (*RunResult, error) { + if _, err := r.executor.LookPath("mongodump"); err != nil { + return nil, fmt.Errorf("未找到 mongodump 命令 (请确保服务器已安装 mongodb-database-tools)") + } + startedAt := task.StartedAt + if startedAt.IsZero() { + startedAt = time.Now().UTC() + } + tempDir, err := CreateTaskTempDir(task.Name, startedAt) + if err != nil { + return nil, err + } + fileName := BuildArtifactName(task.Name, startedAt, "archive") + artifactPath := filepath.Join(tempDir, fileName) + file, err := os.Create(artifactPath) + if err != nil { + return nil, fmt.Errorf("create mongodump archive file: %w", err) + } + defer file.Close() + + args := mongoConnArgs(task.Database) + dbNames := normalizeDatabaseNames(task.Database.Names) + if len(dbNames) == 1 { + args = append(args, "--db", dbNames[0]) + writer.WriteLine(fmt.Sprintf("备份数据库: %s", dbNames[0])) + } else { + writer.WriteLine("备份全部数据库") + } + args = append(args, "--archive") // 归档流式写入 stdout + + writer.WriteLine(fmt.Sprintf("连接到 MongoDB: %s:%d", task.Database.Host, task.Database.Port)) + stderrWriter := newLogLineWriter(writer, "mongodump") + writer.WriteLine("开始执行 mongodump") + if err := r.executor.Run(ctx, "mongodump", args, CommandOptions{Stdout: file, Stderr: stderrWriter}); err != nil { + return nil, fmt.Errorf("run mongodump: %w: %s", err, stderrWriter.collected()) + } + info, err := file.Stat() + if err != nil { + return nil, fmt.Errorf("stat mongodump archive: %w", err) + } + if info.Size() == 0 { + return nil, fmt.Errorf("mongodump 产物为空,请检查数据库连接与权限") + } + writer.WriteLine(fmt.Sprintf("MongoDB 导出完成(文件大小: %s)", formatFileSize(info.Size()))) + return &RunResult{ArtifactPath: artifactPath, FileName: fileName, TempDir: tempDir, Size: info.Size(), StorageKey: BuildStorageKey("mongodb", startedAt, fileName)}, nil +} + +func (r *MongoDBRunner) Restore(ctx context.Context, task TaskSpec, artifactPath string, writer LogWriter) error { + if _, err := r.executor.LookPath("mongorestore"); err != nil { + return fmt.Errorf("未找到 mongorestore 命令 (请确保服务器已安装 mongodb-database-tools)") + } + input, err := os.Open(filepath.Clean(artifactPath)) + if err != nil { + return fmt.Errorf("open mongodb restore archive: %w", err) + } + defer input.Close() + + args := mongoConnArgs(task.Database) + // --drop:恢复前删除同名集合,保证恢复后与归档一致(与 mysql 恢复的整库覆盖语义对齐)。 + args = append(args, "--drop", "--archive") + stderr := &bytes.Buffer{} + writer.WriteLine("开始执行 mongorestore") + if err := r.executor.Run(ctx, "mongorestore", args, CommandOptions{Stdin: input, Stderr: stderr}); err != nil { + return fmt.Errorf("run mongorestore: %w: %s", err, strings.TrimSpace(stderr.String())) + } + writer.WriteLine("MongoDB 恢复完成") + return nil +} + +// mongoConnArgs 构造 mongodump/mongorestore 的连接与认证参数。 +// 注意:mongodb-database-tools 无类似 MYSQL_PWD 的密码环境变量,密码只能经 --password 传入; +// 认证库默认 admin(绝大多数部署的管理账号所在库)。 +func mongoConnArgs(db DatabaseSpec) []string { + args := make([]string, 0, 8) + if strings.TrimSpace(db.Host) != "" { + args = append(args, "--host", db.Host) + } + if db.Port > 0 { + args = append(args, "--port", strconv.Itoa(db.Port)) + } + if strings.TrimSpace(db.User) != "" { + args = append(args, "--username", db.User, "--authenticationDatabase", "admin") + if strings.TrimSpace(db.Password) != "" { + args = append(args, "--password", db.Password) + } + } + return args +} diff --git a/server/internal/backup/mongodb_runner_test.go b/server/internal/backup/mongodb_runner_test.go new file mode 100644 index 0000000..1fdd0ef --- /dev/null +++ b/server/internal/backup/mongodb_runner_test.go @@ -0,0 +1,102 @@ +package backup + +import ( + "context" + "errors" + "io" + "os" + "strings" + "testing" +) + +func argIndex(args []string, target string) int { + for i, a := range args { + if a == target { + return i + } + } + return -1 +} + +func TestMongoDBRunnerRunUsesMongodump(t *testing.T) { + executor := &fakeCommandExecutor{runFunc: func(name string, args []string, options CommandOptions) error { + if options.Stdout != nil { + _, _ = io.WriteString(options.Stdout, "mongo archive bytes") + } + return nil + }} + runner := NewMongoDBRunner(executor) + result, err := runner.Run(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017, User: "admin", Password: "secret", Names: []string{"app"}}}, NopLogWriter{}) + if err != nil { + t.Fatalf("Run returned error: %v", err) + } + if executor.lastName != "mongodump" { + t.Fatalf("expected mongodump, got %s", executor.lastName) + } + args := executor.lastArgs + if argIndex(args, "--archive") < 0 { + t.Fatalf("expected --archive flag, got %#v", args) + } + if i := argIndex(args, "--db"); i < 0 || i+1 >= len(args) || args[i+1] != "app" { + t.Fatalf("expected --db app, got %#v", args) + } + if i := argIndex(args, "--username"); i < 0 || args[i+1] != "admin" { + t.Fatalf("expected --username admin, got %#v", args) + } + if argIndex(args, "--authenticationDatabase") < 0 || argIndex(args, "--password") < 0 { + t.Fatalf("expected auth args, got %#v", args) + } + if _, err := os.Stat(result.ArtifactPath); err != nil { + t.Fatalf("artifact file missing: %v", err) + } + if result.StorageKey == "" || !strings.HasSuffix(result.FileName, ".archive") { + t.Fatalf("unexpected result metadata: %#v", result) + } +} + +func TestMongoDBRunnerRunBackupsAllWhenNoDatabase(t *testing.T) { + executor := &fakeCommandExecutor{runFunc: func(name string, args []string, options CommandOptions) error { + _, _ = io.WriteString(options.Stdout, "all dbs") + return nil + }} + runner := NewMongoDBRunner(executor) + _, err := runner.Run(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017}}, NopLogWriter{}) + if err != nil { + t.Fatalf("Run returned error: %v", err) + } + if argIndex(executor.lastArgs, "--db") >= 0 { + t.Fatalf("expected no --db when backing up all databases, got %#v", executor.lastArgs) + } +} + +func TestMongoDBRunnerRunRejectsEmptyOutput(t *testing.T) { + executor := &fakeCommandExecutor{} // runFunc nil → writes nothing + runner := NewMongoDBRunner(executor) + _, err := runner.Run(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017, Names: []string{"app"}}}, NopLogWriter{}) + if err == nil { + t.Fatal("expected error for empty mongodump output") + } +} + +func TestMongoDBRunnerRestoreUsesMongorestore(t *testing.T) { + executor := &fakeCommandExecutor{} + runner := NewMongoDBRunner(executor) + artifact := filepathJoinTempFile(t, "dump.archive", "mongo archive bytes") + if err := runner.Restore(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017, User: "admin", Password: "secret"}}, artifact, NopLogWriter{}); err != nil { + t.Fatalf("Restore returned error: %v", err) + } + if executor.lastName != "mongorestore" { + t.Fatalf("expected mongorestore, got %s", executor.lastName) + } + if argIndex(executor.lastArgs, "--drop") < 0 || argIndex(executor.lastArgs, "--archive") < 0 { + t.Fatalf("expected --drop --archive, got %#v", executor.lastArgs) + } +} + +func TestMongoDBRunnerRunReturnsLookupError(t *testing.T) { + runner := NewMongoDBRunner(&fakeCommandExecutor{lookupErr: errors.New("missing")}) + _, err := runner.Run(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017, Names: []string{"app"}}}, NopLogWriter{}) + if err == nil { + t.Fatal("expected error when mongodump is missing") + } +} diff --git a/server/internal/model/backup_task.go b/server/internal/model/backup_task.go index 5305ed9..78945a5 100644 --- a/server/internal/model/backup_task.go +++ b/server/internal/model/backup_task.go @@ -8,6 +8,7 @@ const ( BackupTaskTypeSQLite = "sqlite" BackupTaskTypePostgreSQL = "postgresql" BackupTaskTypeSAPHANA = "saphana" + BackupTaskTypeMongoDB = "mongodb" ) const ( diff --git a/server/internal/service/backup_task_service.go b/server/internal/service/backup_task_service.go index 4b31711..24b4fdc 100644 --- a/server/internal/service/backup_task_service.go +++ b/server/internal/service/backup_task_service.go @@ -21,7 +21,7 @@ const backupTaskMaskedValue = "********" type BackupTaskUpsertInput struct { Name string `json:"name" binding:"required,min=1,max=100"` - Type string `json:"type" binding:"required,oneof=file mysql sqlite postgresql pgsql saphana"` + Type string `json:"type" binding:"required,oneof=file mysql sqlite postgresql pgsql saphana mongodb"` Enabled bool `json:"enabled"` CronExpr string `json:"cronExpr" binding:"max=64"` SourcePath string `json:"sourcePath" binding:"max=500"` @@ -578,7 +578,7 @@ func validateTaskTypeSpecificFields(input BackupTaskUpsertInput, passwordRequire if !hasSourcePaths { return apperror.BadRequest("BACKUP_TASK_INVALID", "文件备份必须填写源路径", nil) } - case "mysql", "postgresql", "saphana": + case "mysql", "postgresql", "saphana", "mongodb": if strings.TrimSpace(input.DBHost) == "" { return apperror.BadRequest("BACKUP_TASK_INVALID", "数据库主机不能为空", nil) } diff --git a/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx b/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx index 1e30293..b68f00e 100644 --- a/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx +++ b/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx @@ -198,11 +198,11 @@ export function BackupTaskFormDrawer({ visible, loading, initialValue, storageTa sourcePath: value === 'file' ? current.sourcePath : '', sourcePaths: value === 'file' ? current.sourcePaths : [''], excludePatterns: value === 'file' ? current.excludePatterns : [], - dbHost: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbHost : '', - dbPort: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbPort || getDefaultPort(value) : 0, - dbUser: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbUser : '', - dbPassword: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbPassword : '', - dbName: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbName : '', + dbHost: isDatabaseBackupTask(value) ? current.dbHost : '', + dbPort: isDatabaseBackupTask(value) ? current.dbPort || getDefaultPort(value) : 0, + dbUser: isDatabaseBackupTask(value) ? current.dbUser : '', + dbPassword: isDatabaseBackupTask(value) ? current.dbPassword : '', + dbName: isDatabaseBackupTask(value) ? current.dbName : '', dbPath: value === 'sqlite' ? current.dbPath : '', // 切换到 SAP HANA 时初始化扩展配置;切换到其他类型时清空 extraConfig: value === 'saphana' diff --git a/web/src/components/backup-tasks/field-config.ts b/web/src/components/backup-tasks/field-config.ts index e5b291c..341935a 100644 --- a/web/src/components/backup-tasks/field-config.ts +++ b/web/src/components/backup-tasks/field-config.ts @@ -6,6 +6,7 @@ export const backupTaskTypeOptions = [ { label: 'SQLite', value: 'sqlite' }, { label: 'PostgreSQL', value: 'postgresql' }, { label: 'SAP HANA', value: 'saphana' }, + { label: 'MongoDB', value: 'mongodb' }, ] as const export const backupCompressionOptions = [ @@ -25,6 +26,8 @@ export function getBackupTaskTypeLabel(type: BackupTaskType) { return 'PostgreSQL' case 'saphana': return 'SAP HANA' + case 'mongodb': + return 'MongoDB' default: return type } @@ -67,7 +70,7 @@ export function isSQLiteBackupTask(type: BackupTaskType) { } export function isDatabaseBackupTask(type: BackupTaskType) { - return type === 'mysql' || type === 'postgresql' || type === 'saphana' + return type === 'mysql' || type === 'postgresql' || type === 'saphana' || type === 'mongodb' } export function getDefaultPort(type: BackupTaskType) { @@ -78,6 +81,8 @@ export function getDefaultPort(type: BackupTaskType) { return 5432 case 'saphana': return 30015 + case 'mongodb': + return 27017 default: return 0 } diff --git a/web/src/components/restore-records/RestoreConfirmModal.tsx b/web/src/components/restore-records/RestoreConfirmModal.tsx index 569d7b5..daaee4f 100644 --- a/web/src/components/restore-records/RestoreConfirmModal.tsx +++ b/web/src/components/restore-records/RestoreConfirmModal.tsx @@ -80,7 +80,7 @@ function renderRestoreTarget(task: BackupTaskDetail) { if (task.type === 'sqlite') { return {task.dbPath || '-'} } - if (task.type === 'mysql' || task.type === 'postgresql' || task.type === 'saphana') { + if (task.type === 'mysql' || task.type === 'postgresql' || task.type === 'saphana' || task.type === 'mongodb') { return ( {task.dbUser}@{task.dbHost}:{task.dbPort} / {task.dbName || '-'} diff --git a/web/src/types/backup-tasks.ts b/web/src/types/backup-tasks.ts index 75595d3..e22bfca 100644 --- a/web/src/types/backup-tasks.ts +++ b/web/src/types/backup-tasks.ts @@ -1,4 +1,4 @@ -export type BackupTaskType = 'file' | 'mysql' | 'sqlite' | 'postgresql' | 'saphana' +export type BackupTaskType = 'file' | 'mysql' | 'sqlite' | 'postgresql' | 'saphana' | 'mongodb' export type BackupTaskStatus = 'idle' | 'running' | 'success' | 'failed' export type BackupCompression = 'gzip' | 'none'