mirror of
https://github.com/Awuqing/BackupX.git
synced 2026-06-12 21:29:35 +08:00
feat(backup): 新增 MongoDB 备份与恢复支持 (#87)
通过 mongodump/mongorestore --archive 流式管线接入 MongoDB 数据源,与现有数据库运行器架构一致;注册到 Master 与 Agent,含任务校验、默认端口与前端表单/恢复确认。5 个单测覆盖参数构造、全库、空产物与缺工具分支。
This commit is contained in:
@@ -34,6 +34,7 @@ func NewExecutor(client *MasterClient, tempDir string) *Executor {
|
||||
backup.NewMySQLRunner(nil),
|
||||
backup.NewPostgreSQLRunner(nil),
|
||||
backup.NewSAPHANARunner(nil),
|
||||
backup.NewMongoDBRunner(nil),
|
||||
)
|
||||
storageRegistry := storage.NewRegistry(
|
||||
storageRclone.NewLocalDiskFactory(),
|
||||
|
||||
@@ -82,7 +82,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application,
|
||||
backupTaskService := service.NewBackupTaskService(backupTaskRepo, storageTargetRepo, configCipher)
|
||||
backupTaskService.SetRecordsAndStorage(backupRecordRepo, storageRegistry)
|
||||
// nodeRepo 在下方 Cluster 节点管理区块才实例化,这里延后注入
|
||||
backupRunnerRegistry := backup.NewRegistry(backup.NewFileRunner(), backup.NewSQLiteRunner(), backup.NewMySQLRunner(nil), backup.NewPostgreSQLRunner(nil), backup.NewSAPHANARunner(nil))
|
||||
backupRunnerRegistry := backup.NewRegistry(backup.NewFileRunner(), backup.NewSQLiteRunner(), backup.NewMySQLRunner(nil), backup.NewPostgreSQLRunner(nil), backup.NewSAPHANARunner(nil), backup.NewMongoDBRunner(nil))
|
||||
logHub := backup.NewLogHub()
|
||||
retentionService := backupretention.NewService(backupRecordRepo)
|
||||
notifyRegistry := notify.NewRegistry(notify.NewEmailNotifier(), notify.NewWebhookNotifier(), notify.NewTelegramNotifier())
|
||||
|
||||
119
server/internal/backup/mongodb_runner.go
Normal file
119
server/internal/backup/mongodb_runner.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// MongoDBRunner 通过 mongodump/mongorestore 备份与恢复 MongoDB 数据库。
|
||||
// 采用 --archive 流式模式(dump 写 stdout、restore 读 stdin),与 MySQLRunner
|
||||
// 的 mysqldump/mysql 管线保持一致;产物为未压缩的 mongo archive,由备份管线统一压缩/加密。
|
||||
type MongoDBRunner struct {
|
||||
executor CommandExecutor
|
||||
}
|
||||
|
||||
func NewMongoDBRunner(executor CommandExecutor) *MongoDBRunner {
|
||||
if executor == nil {
|
||||
executor = NewOSCommandExecutor()
|
||||
}
|
||||
return &MongoDBRunner{executor: executor}
|
||||
}
|
||||
|
||||
func (r *MongoDBRunner) Type() string {
|
||||
return "mongodb"
|
||||
}
|
||||
|
||||
func (r *MongoDBRunner) Run(ctx context.Context, task TaskSpec, writer LogWriter) (*RunResult, error) {
|
||||
if _, err := r.executor.LookPath("mongodump"); err != nil {
|
||||
return nil, fmt.Errorf("未找到 mongodump 命令 (请确保服务器已安装 mongodb-database-tools)")
|
||||
}
|
||||
startedAt := task.StartedAt
|
||||
if startedAt.IsZero() {
|
||||
startedAt = time.Now().UTC()
|
||||
}
|
||||
tempDir, err := CreateTaskTempDir(task.Name, startedAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fileName := BuildArtifactName(task.Name, startedAt, "archive")
|
||||
artifactPath := filepath.Join(tempDir, fileName)
|
||||
file, err := os.Create(artifactPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create mongodump archive file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
args := mongoConnArgs(task.Database)
|
||||
dbNames := normalizeDatabaseNames(task.Database.Names)
|
||||
if len(dbNames) == 1 {
|
||||
args = append(args, "--db", dbNames[0])
|
||||
writer.WriteLine(fmt.Sprintf("备份数据库: %s", dbNames[0]))
|
||||
} else {
|
||||
writer.WriteLine("备份全部数据库")
|
||||
}
|
||||
args = append(args, "--archive") // 归档流式写入 stdout
|
||||
|
||||
writer.WriteLine(fmt.Sprintf("连接到 MongoDB: %s:%d", task.Database.Host, task.Database.Port))
|
||||
stderrWriter := newLogLineWriter(writer, "mongodump")
|
||||
writer.WriteLine("开始执行 mongodump")
|
||||
if err := r.executor.Run(ctx, "mongodump", args, CommandOptions{Stdout: file, Stderr: stderrWriter}); err != nil {
|
||||
return nil, fmt.Errorf("run mongodump: %w: %s", err, stderrWriter.collected())
|
||||
}
|
||||
info, err := file.Stat()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stat mongodump archive: %w", err)
|
||||
}
|
||||
if info.Size() == 0 {
|
||||
return nil, fmt.Errorf("mongodump 产物为空,请检查数据库连接与权限")
|
||||
}
|
||||
writer.WriteLine(fmt.Sprintf("MongoDB 导出完成(文件大小: %s)", formatFileSize(info.Size())))
|
||||
return &RunResult{ArtifactPath: artifactPath, FileName: fileName, TempDir: tempDir, Size: info.Size(), StorageKey: BuildStorageKey("mongodb", startedAt, fileName)}, nil
|
||||
}
|
||||
|
||||
func (r *MongoDBRunner) Restore(ctx context.Context, task TaskSpec, artifactPath string, writer LogWriter) error {
|
||||
if _, err := r.executor.LookPath("mongorestore"); err != nil {
|
||||
return fmt.Errorf("未找到 mongorestore 命令 (请确保服务器已安装 mongodb-database-tools)")
|
||||
}
|
||||
input, err := os.Open(filepath.Clean(artifactPath))
|
||||
if err != nil {
|
||||
return fmt.Errorf("open mongodb restore archive: %w", err)
|
||||
}
|
||||
defer input.Close()
|
||||
|
||||
args := mongoConnArgs(task.Database)
|
||||
// --drop:恢复前删除同名集合,保证恢复后与归档一致(与 mysql 恢复的整库覆盖语义对齐)。
|
||||
args = append(args, "--drop", "--archive")
|
||||
stderr := &bytes.Buffer{}
|
||||
writer.WriteLine("开始执行 mongorestore")
|
||||
if err := r.executor.Run(ctx, "mongorestore", args, CommandOptions{Stdin: input, Stderr: stderr}); err != nil {
|
||||
return fmt.Errorf("run mongorestore: %w: %s", err, strings.TrimSpace(stderr.String()))
|
||||
}
|
||||
writer.WriteLine("MongoDB 恢复完成")
|
||||
return nil
|
||||
}
|
||||
|
||||
// mongoConnArgs 构造 mongodump/mongorestore 的连接与认证参数。
|
||||
// 注意:mongodb-database-tools 无类似 MYSQL_PWD 的密码环境变量,密码只能经 --password 传入;
|
||||
// 认证库默认 admin(绝大多数部署的管理账号所在库)。
|
||||
func mongoConnArgs(db DatabaseSpec) []string {
|
||||
args := make([]string, 0, 8)
|
||||
if strings.TrimSpace(db.Host) != "" {
|
||||
args = append(args, "--host", db.Host)
|
||||
}
|
||||
if db.Port > 0 {
|
||||
args = append(args, "--port", strconv.Itoa(db.Port))
|
||||
}
|
||||
if strings.TrimSpace(db.User) != "" {
|
||||
args = append(args, "--username", db.User, "--authenticationDatabase", "admin")
|
||||
if strings.TrimSpace(db.Password) != "" {
|
||||
args = append(args, "--password", db.Password)
|
||||
}
|
||||
}
|
||||
return args
|
||||
}
|
||||
102
server/internal/backup/mongodb_runner_test.go
Normal file
102
server/internal/backup/mongodb_runner_test.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func argIndex(args []string, target string) int {
|
||||
for i, a := range args {
|
||||
if a == target {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func TestMongoDBRunnerRunUsesMongodump(t *testing.T) {
|
||||
executor := &fakeCommandExecutor{runFunc: func(name string, args []string, options CommandOptions) error {
|
||||
if options.Stdout != nil {
|
||||
_, _ = io.WriteString(options.Stdout, "mongo archive bytes")
|
||||
}
|
||||
return nil
|
||||
}}
|
||||
runner := NewMongoDBRunner(executor)
|
||||
result, err := runner.Run(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017, User: "admin", Password: "secret", Names: []string{"app"}}}, NopLogWriter{})
|
||||
if err != nil {
|
||||
t.Fatalf("Run returned error: %v", err)
|
||||
}
|
||||
if executor.lastName != "mongodump" {
|
||||
t.Fatalf("expected mongodump, got %s", executor.lastName)
|
||||
}
|
||||
args := executor.lastArgs
|
||||
if argIndex(args, "--archive") < 0 {
|
||||
t.Fatalf("expected --archive flag, got %#v", args)
|
||||
}
|
||||
if i := argIndex(args, "--db"); i < 0 || i+1 >= len(args) || args[i+1] != "app" {
|
||||
t.Fatalf("expected --db app, got %#v", args)
|
||||
}
|
||||
if i := argIndex(args, "--username"); i < 0 || args[i+1] != "admin" {
|
||||
t.Fatalf("expected --username admin, got %#v", args)
|
||||
}
|
||||
if argIndex(args, "--authenticationDatabase") < 0 || argIndex(args, "--password") < 0 {
|
||||
t.Fatalf("expected auth args, got %#v", args)
|
||||
}
|
||||
if _, err := os.Stat(result.ArtifactPath); err != nil {
|
||||
t.Fatalf("artifact file missing: %v", err)
|
||||
}
|
||||
if result.StorageKey == "" || !strings.HasSuffix(result.FileName, ".archive") {
|
||||
t.Fatalf("unexpected result metadata: %#v", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMongoDBRunnerRunBackupsAllWhenNoDatabase(t *testing.T) {
|
||||
executor := &fakeCommandExecutor{runFunc: func(name string, args []string, options CommandOptions) error {
|
||||
_, _ = io.WriteString(options.Stdout, "all dbs")
|
||||
return nil
|
||||
}}
|
||||
runner := NewMongoDBRunner(executor)
|
||||
_, err := runner.Run(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017}}, NopLogWriter{})
|
||||
if err != nil {
|
||||
t.Fatalf("Run returned error: %v", err)
|
||||
}
|
||||
if argIndex(executor.lastArgs, "--db") >= 0 {
|
||||
t.Fatalf("expected no --db when backing up all databases, got %#v", executor.lastArgs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMongoDBRunnerRunRejectsEmptyOutput(t *testing.T) {
|
||||
executor := &fakeCommandExecutor{} // runFunc nil → writes nothing
|
||||
runner := NewMongoDBRunner(executor)
|
||||
_, err := runner.Run(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017, Names: []string{"app"}}}, NopLogWriter{})
|
||||
if err == nil {
|
||||
t.Fatal("expected error for empty mongodump output")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMongoDBRunnerRestoreUsesMongorestore(t *testing.T) {
|
||||
executor := &fakeCommandExecutor{}
|
||||
runner := NewMongoDBRunner(executor)
|
||||
artifact := filepathJoinTempFile(t, "dump.archive", "mongo archive bytes")
|
||||
if err := runner.Restore(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017, User: "admin", Password: "secret"}}, artifact, NopLogWriter{}); err != nil {
|
||||
t.Fatalf("Restore returned error: %v", err)
|
||||
}
|
||||
if executor.lastName != "mongorestore" {
|
||||
t.Fatalf("expected mongorestore, got %s", executor.lastName)
|
||||
}
|
||||
if argIndex(executor.lastArgs, "--drop") < 0 || argIndex(executor.lastArgs, "--archive") < 0 {
|
||||
t.Fatalf("expected --drop --archive, got %#v", executor.lastArgs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMongoDBRunnerRunReturnsLookupError(t *testing.T) {
|
||||
runner := NewMongoDBRunner(&fakeCommandExecutor{lookupErr: errors.New("missing")})
|
||||
_, err := runner.Run(context.Background(), TaskSpec{Name: "mongo", Type: "mongodb", Database: DatabaseSpec{Host: "127.0.0.1", Port: 27017, Names: []string{"app"}}}, NopLogWriter{})
|
||||
if err == nil {
|
||||
t.Fatal("expected error when mongodump is missing")
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ const (
|
||||
BackupTaskTypeSQLite = "sqlite"
|
||||
BackupTaskTypePostgreSQL = "postgresql"
|
||||
BackupTaskTypeSAPHANA = "saphana"
|
||||
BackupTaskTypeMongoDB = "mongodb"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -21,7 +21,7 @@ const backupTaskMaskedValue = "********"
|
||||
|
||||
type BackupTaskUpsertInput struct {
|
||||
Name string `json:"name" binding:"required,min=1,max=100"`
|
||||
Type string `json:"type" binding:"required,oneof=file mysql sqlite postgresql pgsql saphana"`
|
||||
Type string `json:"type" binding:"required,oneof=file mysql sqlite postgresql pgsql saphana mongodb"`
|
||||
Enabled bool `json:"enabled"`
|
||||
CronExpr string `json:"cronExpr" binding:"max=64"`
|
||||
SourcePath string `json:"sourcePath" binding:"max=500"`
|
||||
@@ -578,7 +578,7 @@ func validateTaskTypeSpecificFields(input BackupTaskUpsertInput, passwordRequire
|
||||
if !hasSourcePaths {
|
||||
return apperror.BadRequest("BACKUP_TASK_INVALID", "文件备份必须填写源路径", nil)
|
||||
}
|
||||
case "mysql", "postgresql", "saphana":
|
||||
case "mysql", "postgresql", "saphana", "mongodb":
|
||||
if strings.TrimSpace(input.DBHost) == "" {
|
||||
return apperror.BadRequest("BACKUP_TASK_INVALID", "数据库主机不能为空", nil)
|
||||
}
|
||||
|
||||
@@ -198,11 +198,11 @@ export function BackupTaskFormDrawer({ visible, loading, initialValue, storageTa
|
||||
sourcePath: value === 'file' ? current.sourcePath : '',
|
||||
sourcePaths: value === 'file' ? current.sourcePaths : [''],
|
||||
excludePatterns: value === 'file' ? current.excludePatterns : [],
|
||||
dbHost: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbHost : '',
|
||||
dbPort: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbPort || getDefaultPort(value) : 0,
|
||||
dbUser: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbUser : '',
|
||||
dbPassword: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbPassword : '',
|
||||
dbName: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbName : '',
|
||||
dbHost: isDatabaseBackupTask(value) ? current.dbHost : '',
|
||||
dbPort: isDatabaseBackupTask(value) ? current.dbPort || getDefaultPort(value) : 0,
|
||||
dbUser: isDatabaseBackupTask(value) ? current.dbUser : '',
|
||||
dbPassword: isDatabaseBackupTask(value) ? current.dbPassword : '',
|
||||
dbName: isDatabaseBackupTask(value) ? current.dbName : '',
|
||||
dbPath: value === 'sqlite' ? current.dbPath : '',
|
||||
// 切换到 SAP HANA 时初始化扩展配置;切换到其他类型时清空
|
||||
extraConfig: value === 'saphana'
|
||||
|
||||
@@ -6,6 +6,7 @@ export const backupTaskTypeOptions = [
|
||||
{ label: 'SQLite', value: 'sqlite' },
|
||||
{ label: 'PostgreSQL', value: 'postgresql' },
|
||||
{ label: 'SAP HANA', value: 'saphana' },
|
||||
{ label: 'MongoDB', value: 'mongodb' },
|
||||
] as const
|
||||
|
||||
export const backupCompressionOptions = [
|
||||
@@ -25,6 +26,8 @@ export function getBackupTaskTypeLabel(type: BackupTaskType) {
|
||||
return 'PostgreSQL'
|
||||
case 'saphana':
|
||||
return 'SAP HANA'
|
||||
case 'mongodb':
|
||||
return 'MongoDB'
|
||||
default:
|
||||
return type
|
||||
}
|
||||
@@ -67,7 +70,7 @@ export function isSQLiteBackupTask(type: BackupTaskType) {
|
||||
}
|
||||
|
||||
export function isDatabaseBackupTask(type: BackupTaskType) {
|
||||
return type === 'mysql' || type === 'postgresql' || type === 'saphana'
|
||||
return type === 'mysql' || type === 'postgresql' || type === 'saphana' || type === 'mongodb'
|
||||
}
|
||||
|
||||
export function getDefaultPort(type: BackupTaskType) {
|
||||
@@ -78,6 +81,8 @@ export function getDefaultPort(type: BackupTaskType) {
|
||||
return 5432
|
||||
case 'saphana':
|
||||
return 30015
|
||||
case 'mongodb':
|
||||
return 27017
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ function renderRestoreTarget(task: BackupTaskDetail) {
|
||||
if (task.type === 'sqlite') {
|
||||
return <Typography.Text code>{task.dbPath || '-'}</Typography.Text>
|
||||
}
|
||||
if (task.type === 'mysql' || task.type === 'postgresql' || task.type === 'saphana') {
|
||||
if (task.type === 'mysql' || task.type === 'postgresql' || task.type === 'saphana' || task.type === 'mongodb') {
|
||||
return (
|
||||
<Typography.Text>
|
||||
{task.dbUser}@{task.dbHost}:{task.dbPort} / <Typography.Text code>{task.dbName || '-'}</Typography.Text>
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
export type BackupTaskType = 'file' | 'mysql' | 'sqlite' | 'postgresql' | 'saphana'
|
||||
export type BackupTaskType = 'file' | 'mysql' | 'sqlite' | 'postgresql' | 'saphana' | 'mongodb'
|
||||
export type BackupTaskStatus = 'idle' | 'running' | 'success' | 'failed'
|
||||
export type BackupCompression = 'gzip' | 'none'
|
||||
|
||||
|
||||
Reference in New Issue
Block a user