From e04774ff6879032a8b48de461a02895edb831470 Mon Sep 17 00:00:00 2001 From: Wu Qing <3184394176@qq.com> Date: Thu, 16 Apr 2026 23:43:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=9F=E8=83=BD:=20=E6=96=B0=E5=A2=9E=20SAP?= =?UTF-8?q?=20HANA=20=E5=AE=8C=E6=95=B4=E5=A4=87=E4=BB=BD=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E4=B8=8E=20Backint=20=E5=8D=8F=E8=AE=AE=E4=BB=A3?= =?UTF-8?q?=E7=90=86=20(#37)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: ignore web/dist directory in git repository * 功能: 新增 SAP HANA 完整备份支持与 Backint 协议代理 - 修复 service 层校验 bug,使 SAP HANA 类型可正常创建 - 增强 hdbsql Runner:支持完整/增量/差异/日志备份、并行通道、失败重试 - 新增 Backint 协议代理(backupx backint 子命令),HANA 原生接口直连 BackupX 存储后端 - 新增本地 SQLite 目录维护 EBID↔对象键映射 - 前端新增 SAP HANA 扩展字段表单(备份类型/级别/通道数/重试次数/实例编号) - README 中英文补充 SAP HANA 两种模式的使用说明 --- .gitignore | 4 +- README.md | 79 +++- README_EN.md | 79 +++- server/cmd/backupx/backint.go | 98 +++++ server/cmd/backupx/main.go | 5 + server/internal/backint/agent.go | 360 ++++++++++++++++++ server/internal/backint/agent_test.go | 217 +++++++++++ server/internal/backint/catalog.go | 102 +++++ server/internal/backint/catalog_test.go | 74 ++++ server/internal/backint/config.go | 140 +++++++ server/internal/backint/config_test.go | 74 ++++ server/internal/backint/protocol.go | 267 +++++++++++++ server/internal/backint/protocol_test.go | 142 +++++++ server/internal/backup/saphana_runner.go | 165 +++++++- server/internal/backup/saphana_runner_test.go | 241 ++++++++++++ server/internal/backup/types.go | 6 + server/internal/model/backup_task.go | 3 + .../service/backup_execution_service.go | 45 ++- .../internal/service/backup_task_service.go | 60 ++- .../backup-tasks/BackupTaskFormDrawer.tsx | 89 +++++ .../components/backup-tasks/field-config.ts | 36 ++ web/src/types/backup-tasks.ts | 4 + 22 files changed, 2247 insertions(+), 43 deletions(-) create mode 100644 server/cmd/backupx/backint.go create mode 100644 server/internal/backint/agent.go create mode 100644 server/internal/backint/agent_test.go create mode 100644 server/internal/backint/catalog.go create mode 100644 server/internal/backint/catalog_test.go create mode 100644 server/internal/backint/config.go create mode 100644 server/internal/backint/config_test.go create mode 100644 server/internal/backint/protocol.go create mode 100644 server/internal/backint/protocol_test.go diff --git a/.gitignore b/.gitignore index 735a8b4..ac8d584 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -web/node_modules/ \ No newline at end of file +web/node_modules/ +web/dist/ +server/bin/ \ No newline at end of file diff --git a/README.md b/README.md index 601d7cc..4981057 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,8 @@ | 能力 | 说明 | |------|------| -| **备份类型** | 文件/目录(多源路径)、MySQL、PostgreSQL、SQLite、SAP HANA | +| **备份类型** | 文件/目录(多源路径)、MySQL、PostgreSQL、SQLite、SAP HANA(完整/增量/差异/日志备份 + 并行通道 + 失败重试) | +| **SAP HANA Backint 代理** | 内置 SAP HANA Backint 协议代理,HANA 原生备份接口可直接把数据路由到 BackupX 支持的任意存储后端 | | **70+ 存储后端** | 内置阿里云 OSS / 腾讯云 COS / 七牛云 / S3 / Google Drive / WebDAV / FTP + 通过 rclone 集成 SFTP、Azure Blob、Dropbox、OneDrive 等 70+ 后端 | | **自动调度** | Cron 定时 + 可视化编辑器 + 自动保留策略(按天数/份数清理,自动回收空目录) | | **多节点** | Master-Agent 集群,统一管理多台服务器的备份,支持远程目录浏览与节点编辑 | @@ -240,6 +241,82 @@ docker exec -it backupx /app/bin/backupx reset-password --username admin --passw --- +## SAP HANA 支持 + +BackupX 提供两种 SAP HANA 备份模式,按需选用: + +### 模式一:hdbsql Runner(Web 控制台托管) + +通过 Web 控制台创建 SAP HANA 备份任务,后端调用 `hdbsql` 执行备份,适合 BackupX 调度的周期性作业。 + +**源配置步骤支持:** + +| 字段 | 可选值 | 说明 | +|------|--------|------| +| 备份类型 | `data` / `log` | 数据备份或日志备份 | +| 备份级别 | `full` / `incremental` / `differential` | 日志备份时自动禁用 | +| 并行通道数 | `1 ~ 32` | `BACKUP DATA USING FILE ('c1','c2',...)` 多路径并发 | +| 失败重试次数 | `1 ~ 10` | 指数退避(5s × 尝试次数²) | +| 实例编号 | 可选 | 从端口推断或手动指定 | + +### 模式二:Backint 协议代理(HANA 原生接口) + +BackupX 内置 Backint Agent,SAP HANA 通过原生 `BACKUP DATA USING BACKINT` 语法调用,数据自动路由到 BackupX 存储目标(S3 / OSS / COS / WebDAV / 70+ 后端)。 + +**1. 准备参数文件** `/opt/backupx/backint_params.ini`: + +```ini +#STORAGE_TYPE = s3 +#STORAGE_CONFIG_JSON = /opt/backupx/storage.json +#PARALLEL_FACTOR = 4 +#COMPRESS = true +#KEY_PREFIX = hana-backup +#CATALOG_DB = /opt/backupx/backint_catalog.db +#LOG_FILE = /var/log/backupx/backint.log +``` + +**2. 准备存储配置** `/opt/backupx/storage.json`(与 BackupX 存储目标配置一致): + +```json +{ + "endpoint": "https://s3.amazonaws.com", + "region": "us-east-1", + "bucket": "hana-prod", + "accessKeyId": "AKIA...", + "secretAccessKey": "..." +} +``` + +**3. 创建 hdbbackint 软链接:** + +```bash +ln -s /opt/backupx/backupx /usr/sap//SYS/global/hdb/opt/hdbbackint +``` + +**4. 在 HANA `global.ini` 中启用:** + +```ini +[backup] +data_backup_using_backint = true +catalog_backup_using_backint = true +log_backup_using_backint = true +data_backup_parameter_file = /opt/backupx/backint_params.ini +log_backup_parameter_file = /opt/backupx/backint_params.ini +``` + +**5. CLI 手动调用(用于排查):** + +```bash +backupx backint -f backup -i input.txt -o output.txt -p backint_params.ini +backupx backint -f restore -i input.txt -o output.txt -p backint_params.ini +backupx backint -f inquire -i input.txt -o output.txt -p backint_params.ini +backupx backint -f delete -i input.txt -o output.txt -p backint_params.ini +``` + +Backint Agent 使用本地 SQLite 维护 `EBID ↔ 对象键` 目录,所有操作遵循 SAP HANA Backint 协议(`#PIPE` / `#SAVED` / `#RESTORED` / `#BACKUP` / `#NOTFOUND` / `#DELETED` / `#ERROR`)。 + +--- + ## 多节点集群 BackupX 支持 Master-Agent 模式管理多台服务器: diff --git a/README_EN.md b/README_EN.md index fe0c6c9..1f07077 100644 --- a/README_EN.md +++ b/README_EN.md @@ -34,7 +34,8 @@ | Capability | Details | |-----------|---------| -| **Backup Types** | Files/Directories (multi-source), MySQL, PostgreSQL, SQLite, SAP HANA | +| **Backup Types** | Files/Directories (multi-source), MySQL, PostgreSQL, SQLite, SAP HANA (full / incremental / differential / log backups + parallel channels + retry) | +| **SAP HANA Backint Agent** | Built-in SAP HANA Backint protocol agent — HANA's native backup interface can route data directly to any storage backend supported by BackupX | | **70+ Storage Backends** | Built-in Alibaba OSS / Tencent COS / Qiniu / S3 / Google Drive / WebDAV / FTP + 70+ backends via rclone (SFTP, Azure Blob, Dropbox, OneDrive, etc.) | | **Scheduling** | Cron-based + visual editor + auto-retention policy (by days/count, auto empty directory cleanup) | | **Multi-Node** | Master-Agent cluster for managing backups across multiple servers with remote directory browsing and node editing | @@ -238,6 +239,82 @@ docker exec -it backupx /app/bin/backupx reset-password --username admin --passw --- +## SAP HANA Support + +BackupX offers two SAP HANA backup modes — pick whichever fits: + +### Mode 1: hdbsql Runner (Web-console managed) + +Create a SAP HANA backup task in the Web console. The backend runs `hdbsql` to perform backups, suitable for BackupX-scheduled recurring jobs. + +**Source configuration supports:** + +| Field | Options | Description | +|-------|---------|-------------| +| Backup type | `data` / `log` | Data or log backup | +| Backup level | `full` / `incremental` / `differential` | Auto-disabled for log backups | +| Parallel channels | `1 ~ 32` | `BACKUP DATA USING FILE ('c1','c2',...)` parallel paths | +| Retry count | `1 ~ 10` | Exponential backoff (5s × attempt²) | +| Instance number | Optional | Inferred from port or manually specified | + +### Mode 2: Backint Protocol Agent (HANA native) + +BackupX ships a built-in Backint Agent. SAP HANA calls it via native `BACKUP DATA USING BACKINT` syntax, and data is routed automatically to BackupX storage targets (S3 / OSS / COS / WebDAV / 70+ backends). + +**1. Prepare parameter file** `/opt/backupx/backint_params.ini`: + +```ini +#STORAGE_TYPE = s3 +#STORAGE_CONFIG_JSON = /opt/backupx/storage.json +#PARALLEL_FACTOR = 4 +#COMPRESS = true +#KEY_PREFIX = hana-backup +#CATALOG_DB = /opt/backupx/backint_catalog.db +#LOG_FILE = /var/log/backupx/backint.log +``` + +**2. Prepare storage config** `/opt/backupx/storage.json` (same schema as BackupX storage targets): + +```json +{ + "endpoint": "https://s3.amazonaws.com", + "region": "us-east-1", + "bucket": "hana-prod", + "accessKeyId": "AKIA...", + "secretAccessKey": "..." +} +``` + +**3. Create the hdbbackint symlink:** + +```bash +ln -s /opt/backupx/backupx /usr/sap//SYS/global/hdb/opt/hdbbackint +``` + +**4. Enable in HANA `global.ini`:** + +```ini +[backup] +data_backup_using_backint = true +catalog_backup_using_backint = true +log_backup_using_backint = true +data_backup_parameter_file = /opt/backupx/backint_params.ini +log_backup_parameter_file = /opt/backupx/backint_params.ini +``` + +**5. Manual CLI invocation (for troubleshooting):** + +```bash +backupx backint -f backup -i input.txt -o output.txt -p backint_params.ini +backupx backint -f restore -i input.txt -o output.txt -p backint_params.ini +backupx backint -f inquire -i input.txt -o output.txt -p backint_params.ini +backupx backint -f delete -i input.txt -o output.txt -p backint_params.ini +``` + +The Backint Agent maintains an `EBID ↔ object-key` catalog in a local SQLite DB. All operations follow the SAP HANA Backint protocol (`#PIPE` / `#SAVED` / `#RESTORED` / `#BACKUP` / `#NOTFOUND` / `#DELETED` / `#ERROR`). + +--- + ## Multi-Node Cluster BackupX supports Master-Agent mode for managing multiple servers: diff --git a/server/cmd/backupx/backint.go b/server/cmd/backupx/backint.go new file mode 100644 index 0000000..12bc268 --- /dev/null +++ b/server/cmd/backupx/backint.go @@ -0,0 +1,98 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + "backupx/server/internal/backint" +) + +// runBackint 是 `backupx backint` 子命令入口。 +// +// CLI 参数遵循 SAP HANA Backint 规范: +// +// backupx backint -f -i -o -p +// [-u ] [-c ] [-l ] [-v ] +// +// 除 -f / -i / -o / -p 外其余参数接受但忽略(兼容 SAP 调用约定)。 +func runBackint(args []string) { + fs := flag.NewFlagSet("backint", flag.ExitOnError) + fnStr := fs.String("f", "", "function: backup | restore | inquire | delete") + inputPath := fs.String("i", "", "input file path") + outputPath := fs.String("o", "", "output file path") + paramFile := fs.String("p", "", "parameter file path") + + // 以下参数仅为兼容 SAP 调用约定,当前未使用 + _ = fs.String("u", "", "user (ignored)") + _ = fs.String("c", "", "config-prefix (ignored)") + _ = fs.String("l", "", "log file override (ignored, use LOG_FILE in params)") + _ = fs.String("v", "", "backint version (ignored)") + + if err := fs.Parse(args); err != nil { + os.Exit(2) + } + + if *fnStr == "" || *inputPath == "" || *outputPath == "" || *paramFile == "" { + fmt.Fprintln(os.Stderr, "backint: -f, -i, -o, -p are required") + fs.Usage() + os.Exit(2) + } + + fn, err := backint.ParseFunction(*fnStr) + if err != nil { + fmt.Fprintf(os.Stderr, "backint: %v\n", err) + os.Exit(2) + } + + cfg, err := backint.LoadConfigFile(*paramFile) + if err != nil { + fmt.Fprintf(os.Stderr, "backint: load config: %v\n", err) + os.Exit(2) + } + + // 配置日志重定向(如果指定 LOG_FILE) + restoreLog, err := redirectStderr(cfg.LogFile) + if err != nil { + fmt.Fprintf(os.Stderr, "backint: open log: %v\n", err) + os.Exit(2) + } + defer restoreLog() + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + agent, err := backint.NewAgent(ctx, cfg) + if err != nil { + fmt.Fprintf(os.Stderr, "backint: init agent: %v\n", err) + os.Exit(1) + } + defer func() { _ = agent.Close() }() + + if err := agent.Run(ctx, fn, *inputPath, *outputPath); err != nil { + fmt.Fprintf(os.Stderr, "backint: run: %v\n", err) + os.Exit(1) + } +} + +// redirectStderr 将 stderr 重定向到指定日志文件,返回恢复函数。 +// 空字符串表示保持原样。 +func redirectStderr(path string) (func(), error) { + if path == "" { + return func() {}, nil + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + orig := os.Stderr + os.Stderr = f + return func() { + os.Stderr = orig + _ = f.Close() + }, nil +} + diff --git a/server/cmd/backupx/main.go b/server/cmd/backupx/main.go index cc10407..5a75019 100644 --- a/server/cmd/backupx/main.go +++ b/server/cmd/backupx/main.go @@ -24,6 +24,11 @@ func main() { runResetPassword(os.Args[2:]) return } + // 子命令分发:backint(SAP HANA Backint Agent 模式) + if len(os.Args) > 1 && os.Args[1] == "backint" { + runBackint(os.Args[2:]) + return + } var configPath string var showVersion bool diff --git a/server/internal/backint/agent.go b/server/internal/backint/agent.go new file mode 100644 index 0000000..56f2382 --- /dev/null +++ b/server/internal/backint/agent.go @@ -0,0 +1,360 @@ +package backint + +import ( + "compress/gzip" + "context" + "crypto/rand" + "encoding/hex" + "fmt" + "io" + "os" + "path" + "strings" + "time" + + "backupx/server/internal/storage" + storageRclone "backupx/server/internal/storage/rclone" +) + +// Agent 是 Backint 协议代理主入口。 +// +// 职责: +// 1. 根据 -f 指定的功能,从 -i 输入文件解析请求 +// 2. 把数据路由到 BackupX storage 后端 +// 3. 把结果写回 -o 输出文件(失败使用 #ERROR,不中断批次) +type Agent struct { + cfg *Config + provider storage.StorageProvider + catalog *Catalog +} + +// NewAgent 构造 Agent,初始化 storage provider 与 catalog。 +func NewAgent(ctx context.Context, cfg *Config) (*Agent, error) { + registry := buildStorageRegistry() + provider, err := registry.Create(ctx, cfg.StorageType, cfg.StorageConfig) + if err != nil { + return nil, fmt.Errorf("create storage provider: %w", err) + } + if err := provider.TestConnection(ctx); err != nil { + return nil, fmt.Errorf("storage provider connection failed: %w", err) + } + cat, err := OpenCatalog(cfg.CatalogDB) + if err != nil { + return nil, err + } + return &Agent{cfg: cfg, provider: provider, catalog: cat}, nil +} + +// Close 释放资源。 +func (a *Agent) Close() error { + if a.catalog != nil { + return a.catalog.Close() + } + return nil +} + +// Run 执行一次 Backint 调用。 +// +// HANA 针对 BACKUP 调用时:input 是 #PIPE 列表,output 需返回 #SAVED 或 #ERROR。 +// 批次中任一条目失败不应导致整个进程退出,因此错误被降级为 #ERROR 行。 +// 仅在极端错误(参数非法、I/O 失败)时返回 error,进程以非 0 退出。 +func (a *Agent) Run(ctx context.Context, fn Function, inputPath, outputPath string) error { + in, err := os.Open(inputPath) + if err != nil { + return fmt.Errorf("open input: %w", err) + } + defer in.Close() + + out, err := os.Create(outputPath) + if err != nil { + return fmt.Errorf("create output: %w", err) + } + defer out.Close() + + switch fn { + case FunctionBackup: + return a.runBackup(ctx, in, out) + case FunctionRestore: + return a.runRestore(ctx, in, out) + case FunctionInquire: + return a.runInquire(ctx, in, out) + case FunctionDelete: + return a.runDelete(ctx, in, out) + default: + return fmt.Errorf("unsupported function: %s", fn) + } +} + +// runBackup 处理 BACKUP 操作:读取每条请求的管道/文件,上传到存储后端。 +func (a *Agent) runBackup(ctx context.Context, in io.Reader, out io.Writer) error { + reqs, err := ParseBackupRequests(in) + if err != nil { + return err + } + for _, req := range reqs { + ebid, perr := a.handleBackupOne(ctx, req) + if perr != nil { + fmt.Fprintf(os.Stderr, "backint: backup %q failed: %v\n", req.Path, perr) + _ = WriteError(out, req.Path) + continue + } + _ = WriteSaved(out, ebid, req.Path) + } + return nil +} + +// handleBackupOne 上传一条请求,返回分配的 EBID。 +func (a *Agent) handleBackupOne(ctx context.Context, req BackupRequest) (string, error) { + src, size, err := openBackupSource(req) + if err != nil { + return "", err + } + defer src.Close() + + ebid := generateEBID() + objectKey := a.objectKeyFor(ebid) + + reader := io.Reader(src) + // 可选 gzip 压缩 + if a.cfg.Compress { + pr, pw := io.Pipe() + go func() { + gw := gzip.NewWriter(pw) + if _, cerr := io.Copy(gw, src); cerr != nil { + _ = gw.Close() + _ = pw.CloseWithError(cerr) + return + } + if cerr := gw.Close(); cerr != nil { + _ = pw.CloseWithError(cerr) + return + } + _ = pw.Close() + }() + reader = pr + size = -1 // 压缩后大小未知 + objectKey += ".gz" + } + + meta := map[string]string{ + "source-path": req.Path, + "ebid": ebid, + "compress": boolStr(a.cfg.Compress), + } + if err := a.provider.Upload(ctx, objectKey, reader, size, meta); err != nil { + return "", fmt.Errorf("upload: %w", err) + } + + if err := a.catalog.Put(CatalogEntry{ + EBID: ebid, + ObjectKey: objectKey, + SourcePath: req.Path, + Size: size, + }); err != nil { + return "", fmt.Errorf("catalog put: %w", err) + } + return ebid, nil +} + +// runRestore 处理 RESTORE 操作:根据 EBID 从存储下载,写入 HANA 指定的管道/文件。 +func (a *Agent) runRestore(ctx context.Context, in io.Reader, out io.Writer) error { + reqs, err := ParseRestoreRequests(in) + if err != nil { + return err + } + for _, req := range reqs { + if perr := a.handleRestoreOne(ctx, req); perr != nil { + fmt.Fprintf(os.Stderr, "backint: restore %q failed: %v\n", req.EBID, perr) + _ = WriteError(out, req.Path) + continue + } + _ = WriteRestored(out, req.EBID, req.Path) + } + return nil +} + +func (a *Agent) handleRestoreOne(ctx context.Context, req RestoreRequest) error { + entry, err := a.catalog.Get(req.EBID) + if err != nil { + return fmt.Errorf("catalog get: %w", err) + } + if entry == nil { + return fmt.Errorf("ebid not found: %s", req.EBID) + } + rc, err := a.provider.Download(ctx, entry.ObjectKey) + if err != nil { + return fmt.Errorf("download: %w", err) + } + defer rc.Close() + + var src io.Reader = rc + if strings.HasSuffix(entry.ObjectKey, ".gz") { + gr, err := gzip.NewReader(rc) + if err != nil { + return fmt.Errorf("gzip reader: %w", err) + } + defer gr.Close() + src = gr + } + + dst, err := openRestoreTarget(req) + if err != nil { + return err + } + defer dst.Close() + + if _, err := io.Copy(dst, src); err != nil { + return fmt.Errorf("copy to target: %w", err) + } + return nil +} + +// runInquire 处理 INQUIRE 操作:查询 EBID 是否存在,或列出全部备份。 +func (a *Agent) runInquire(ctx context.Context, in io.Reader, out io.Writer) error { + reqs, err := ParseInquireRequests(in) + if err != nil { + return err + } + for _, req := range reqs { + if req.All { + entries, err := a.catalog.List() + if err != nil { + fmt.Fprintf(os.Stderr, "backint: inquire list failed: %v\n", err) + _ = WriteError(out, "#NULL") + continue + } + for _, e := range entries { + _ = WriteBackup(out, e.EBID) + } + continue + } + entry, err := a.catalog.Get(req.EBID) + if err != nil { + fmt.Fprintf(os.Stderr, "backint: inquire %q failed: %v\n", req.EBID, err) + _ = WriteError(out, req.EBID) + continue + } + if entry == nil { + _ = WriteNotFound(out, req.EBID) + continue + } + _ = WriteBackup(out, entry.EBID) + } + return nil +} + +// runDelete 处理 DELETE 操作:从存储删除对象并移除目录条目。 +func (a *Agent) runDelete(ctx context.Context, in io.Reader, out io.Writer) error { + reqs, err := ParseDeleteRequests(in) + if err != nil { + return err + } + for _, req := range reqs { + if perr := a.handleDeleteOne(ctx, req); perr != nil { + fmt.Fprintf(os.Stderr, "backint: delete %q failed: %v\n", req.EBID, perr) + _ = WriteError(out, req.EBID) + continue + } + _ = WriteDeleted(out, req.EBID) + } + return nil +} + +func (a *Agent) handleDeleteOne(ctx context.Context, req DeleteRequest) error { + entry, err := a.catalog.Get(req.EBID) + if err != nil { + return fmt.Errorf("catalog get: %w", err) + } + if entry == nil { + return fmt.Errorf("ebid not found: %s", req.EBID) + } + if err := a.provider.Delete(ctx, entry.ObjectKey); err != nil { + // 允许后端返回"不存在"类错误后继续删除目录条目,避免孤立条目 + fmt.Fprintf(os.Stderr, "backint: storage delete warning for %s: %v\n", entry.ObjectKey, err) + } + return a.catalog.Delete(req.EBID) +} + +// 辅助函数 + +func (a *Agent) objectKeyFor(ebid string) string { + base := ebid + ".bin" + if a.cfg.KeyPrefix == "" { + return base + } + return path.Join(a.cfg.KeyPrefix, base) +} + +// openBackupSource 打开 HANA 提供的数据源。 +// +// 对于 #PIPE 模式:HANA 写入命名管道,Agent 读取。管道是顺序流,size 未知 (-1)。 +// 对于文件模式:HANA 已在指定路径写好完整文件。 +func openBackupSource(req BackupRequest) (io.ReadCloser, int64, error) { + if req.IsPipe { + f, err := os.OpenFile(req.Path, os.O_RDONLY, 0) + if err != nil { + return nil, 0, fmt.Errorf("open pipe: %w", err) + } + return f, -1, nil + } + f, err := os.Open(req.Path) + if err != nil { + return nil, 0, fmt.Errorf("open file: %w", err) + } + info, err := f.Stat() + if err != nil { + _ = f.Close() + return nil, 0, fmt.Errorf("stat: %w", err) + } + return f, info.Size(), nil +} + +// openRestoreTarget 打开 HANA 指定的恢复目标(管道或文件)。 +func openRestoreTarget(req RestoreRequest) (io.WriteCloser, error) { + if req.IsPipe { + return os.OpenFile(req.Path, os.O_WRONLY, 0) + } + return os.Create(req.Path) +} + +// generateEBID 生成 Backint 外部备份 ID。 +// 格式:backupx--<16 hex chars> +func generateEBID() string { + var buf [8]byte + if _, err := rand.Read(buf[:]); err != nil { + // fallback:用纳秒时间戳作为熵 + now := time.Now().UnixNano() + for i := 0; i < 8; i++ { + buf[i] = byte(now >> (i * 8)) + } + } + return fmt.Sprintf("backupx-%d-%s", time.Now().Unix(), hex.EncodeToString(buf[:])) +} + +func boolStr(b bool) string { + if b { + return "true" + } + return "false" +} + +// buildStorageRegistry 构造与主程序一致的 storage registry。 +// +// Backint Agent 作为独立 CLI 进程运行,不依赖 BackupX HTTP 服务, +// 因此这里直接引用 storage/rclone 包注册所有后端。 +func buildStorageRegistry() *storage.Registry { + registry := storage.NewRegistry( + storageRclone.NewLocalDiskFactory(), + storageRclone.NewS3Factory(), + storageRclone.NewWebDAVFactory(), + storageRclone.NewGoogleDriveFactory(), + storageRclone.NewAliyunOSSFactory(), + storageRclone.NewTencentCOSFactory(), + storageRclone.NewQiniuKodoFactory(), + storageRclone.NewFTPFactory(), + storageRclone.NewRcloneFactory(), + ) + storageRclone.RegisterAllBackends(registry) + return registry +} + diff --git a/server/internal/backint/agent_test.go b/server/internal/backint/agent_test.go new file mode 100644 index 0000000..3f45120 --- /dev/null +++ b/server/internal/backint/agent_test.go @@ -0,0 +1,217 @@ +package backint + +import ( + "bytes" + "context" + "os" + "path/filepath" + "strings" + "testing" + + "backupx/server/internal/storage" + storageRclone "backupx/server/internal/storage/rclone" +) + +// newTestAgent 构造一个使用本地磁盘后端的 Agent,便于集成测试。 +func newTestAgent(t *testing.T, compress bool) (*Agent, string) { + t.Helper() + dir := t.TempDir() + storageDir := filepath.Join(dir, "storage") + if err := os.MkdirAll(storageDir, 0755); err != nil { + t.Fatal(err) + } + + registry := storage.NewRegistry(storageRclone.NewLocalDiskFactory()) + provider, err := registry.Create(context.Background(), "local_disk", map[string]any{ + "basePath": storageDir, + }) + if err != nil { + t.Fatalf("create provider: %v", err) + } + cat, err := OpenCatalog(filepath.Join(dir, "catalog.db")) + if err != nil { + t.Fatal(err) + } + agent := &Agent{ + cfg: &Config{StorageType: "local_disk", KeyPrefix: "backint", Compress: compress, CatalogDB: filepath.Join(dir, "catalog.db")}, + provider: provider, + catalog: cat, + } + t.Cleanup(func() { _ = agent.Close() }) + return agent, dir +} + +func TestAgent_BackupAndRestore_File(t *testing.T) { + agent, dir := newTestAgent(t, false) + ctx := context.Background() + + // 准备源文件 + src := filepath.Join(dir, "src.bak") + content := []byte("hello backint world") + if err := os.WriteFile(src, content, 0644); err != nil { + t.Fatal(err) + } + + // BACKUP + inPath := filepath.Join(dir, "backup.in") + outPath := filepath.Join(dir, "backup.out") + if err := os.WriteFile(inPath, []byte(src+"\n"), 0644); err != nil { + t.Fatal(err) + } + if err := agent.Run(ctx, FunctionBackup, inPath, outPath); err != nil { + t.Fatalf("backup: %v", err) + } + out, _ := os.ReadFile(outPath) + if !bytes.HasPrefix(out, []byte("#SAVED ")) { + t.Fatalf("expected #SAVED, got: %s", out) + } + // 提取 EBID:#SAVED "" + parts := strings.Fields(string(out)) + if len(parts) < 3 { + t.Fatalf("malformed output: %s", out) + } + ebid := parts[1] + + // RESTORE + restoreDst := filepath.Join(dir, "restored.bak") + inPath2 := filepath.Join(dir, "restore.in") + outPath2 := filepath.Join(dir, "restore.out") + if err := os.WriteFile(inPath2, []byte(ebid+" \""+restoreDst+"\"\n"), 0644); err != nil { + t.Fatal(err) + } + if err := agent.Run(ctx, FunctionRestore, inPath2, outPath2); err != nil { + t.Fatalf("restore: %v", err) + } + got, err := os.ReadFile(restoreDst) + if err != nil { + t.Fatalf("read restored: %v", err) + } + if !bytes.Equal(got, content) { + t.Errorf("restored content mismatch: %q vs %q", got, content) + } +} + +func TestAgent_BackupWithCompression(t *testing.T) { + agent, dir := newTestAgent(t, true) + ctx := context.Background() + + src := filepath.Join(dir, "src.bak") + content := bytes.Repeat([]byte("ABCDEFGH"), 1024) + if err := os.WriteFile(src, content, 0644); err != nil { + t.Fatal(err) + } + + inPath := filepath.Join(dir, "backup.in") + outPath := filepath.Join(dir, "backup.out") + _ = os.WriteFile(inPath, []byte(src+"\n"), 0644) + if err := agent.Run(ctx, FunctionBackup, inPath, outPath); err != nil { + t.Fatalf("backup: %v", err) + } + parts := strings.Fields(string(mustRead(t, outPath))) + ebid := parts[1] + + // 验证 catalog 记录的对象键以 .gz 结尾 + entry, _ := agent.catalog.Get(ebid) + if entry == nil || !strings.HasSuffix(entry.ObjectKey, ".gz") { + t.Fatalf("expected .gz suffix: %+v", entry) + } + + // RESTORE 应能解压回原始内容 + dst := filepath.Join(dir, "restored.bak") + in2 := filepath.Join(dir, "restore.in") + out2 := filepath.Join(dir, "restore.out") + _ = os.WriteFile(in2, []byte(ebid+" \""+dst+"\"\n"), 0644) + if err := agent.Run(ctx, FunctionRestore, in2, out2); err != nil { + t.Fatalf("restore: %v", err) + } + got := mustRead(t, dst) + if !bytes.Equal(got, content) { + t.Errorf("decompressed content mismatch (len=%d vs %d)", len(got), len(content)) + } +} + +func TestAgent_Inquire(t *testing.T) { + agent, dir := newTestAgent(t, false) + ctx := context.Background() + + // 注入两条目录记录 + _ = agent.catalog.Put(CatalogEntry{EBID: "bid-a", ObjectKey: "k/a"}) + _ = agent.catalog.Put(CatalogEntry{EBID: "bid-b", ObjectKey: "k/b"}) + + // INQUIRE #NULL 应列出全部 + in := filepath.Join(dir, "inq.in") + out := filepath.Join(dir, "inq.out") + _ = os.WriteFile(in, []byte("#NULL\n"), 0644) + if err := agent.Run(ctx, FunctionInquire, in, out); err != nil { + t.Fatalf("inquire: %v", err) + } + text := string(mustRead(t, out)) + if !strings.Contains(text, "bid-a") || !strings.Contains(text, "bid-b") { + t.Errorf("expected both ebids, got: %s", text) + } + + // INQUIRE 不存在的 ebid → #NOTFOUND + _ = os.WriteFile(in, []byte("bid-missing\n"), 0644) + if err := agent.Run(ctx, FunctionInquire, in, out); err != nil { + t.Fatalf("inquire missing: %v", err) + } + text = string(mustRead(t, out)) + if !strings.Contains(text, "#NOTFOUND") { + t.Errorf("expected #NOTFOUND, got: %s", text) + } +} + +func TestAgent_Delete(t *testing.T) { + agent, dir := newTestAgent(t, false) + ctx := context.Background() + + // 先做一次 BACKUP + src := filepath.Join(dir, "src.bak") + _ = os.WriteFile(src, []byte("data"), 0644) + inPath := filepath.Join(dir, "b.in") + outPath := filepath.Join(dir, "b.out") + _ = os.WriteFile(inPath, []byte(src+"\n"), 0644) + if err := agent.Run(ctx, FunctionBackup, inPath, outPath); err != nil { + t.Fatal(err) + } + ebid := strings.Fields(string(mustRead(t, outPath)))[1] + + // DELETE + delIn := filepath.Join(dir, "d.in") + delOut := filepath.Join(dir, "d.out") + _ = os.WriteFile(delIn, []byte(ebid+"\n"), 0644) + if err := agent.Run(ctx, FunctionDelete, delIn, delOut); err != nil { + t.Fatalf("delete: %v", err) + } + if !strings.Contains(string(mustRead(t, delOut)), "#DELETED") { + t.Errorf("expected #DELETED, got: %s", mustRead(t, delOut)) + } + // catalog 条目应已删除 + if entry, _ := agent.catalog.Get(ebid); entry != nil { + t.Errorf("catalog entry should be removed, got: %+v", entry) + } +} + +func TestAgent_RestoreUnknownEBID(t *testing.T) { + agent, dir := newTestAgent(t, false) + ctx := context.Background() + + in := filepath.Join(dir, "r.in") + out := filepath.Join(dir, "r.out") + _ = os.WriteFile(in, []byte("bid-unknown \""+filepath.Join(dir, "dst")+"\"\n"), 0644) + if err := agent.Run(ctx, FunctionRestore, in, out); err != nil { + t.Fatalf("run: %v", err) + } + if !strings.Contains(string(mustRead(t, out)), "#ERROR") { + t.Errorf("expected #ERROR for unknown ebid, got: %s", mustRead(t, out)) + } +} + +func mustRead(t *testing.T, path string) []byte { + t.Helper() + b, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + return b +} diff --git a/server/internal/backint/catalog.go b/server/internal/backint/catalog.go new file mode 100644 index 0000000..418ac90 --- /dev/null +++ b/server/internal/backint/catalog.go @@ -0,0 +1,102 @@ +package backint + +import ( + "fmt" + "time" + + "github.com/glebarez/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/clause" + gormlogger "gorm.io/gorm/logger" +) + +// CatalogEntry 是 Backint 目录条目,建立 BID (备份 ID) 与对象键的映射。 +// +// BID 是 Backint Agent 返回给 SAP HANA 的唯一标识,HANA 后续用它作为 RESTORE/DELETE +// 的句柄。Agent 用 catalog 查询该 BID 对应的实际存储对象键。 +type CatalogEntry struct { + ID uint `gorm:"primaryKey"` + EBID string `gorm:"column:ebid;uniqueIndex;size:128;not null"` + ObjectKey string `gorm:"column:object_key;size:512;not null"` + SourcePath string `gorm:"column:source_path;size:1024"` + Size int64 `gorm:"column:size"` + CreatedAt time.Time `gorm:"column:created_at"` +} + +// TableName 指定表名,避免 GORM 自动复数化。 +func (CatalogEntry) TableName() string { return "backint_catalog" } + +// Catalog 是本地 Backint 目录(SQLite 后端)。 +type Catalog struct { + db *gorm.DB +} + +// OpenCatalog 打开或创建 catalog 数据库。 +func OpenCatalog(dbPath string) (*Catalog, error) { + db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{ + Logger: gormlogger.Default.LogMode(gormlogger.Silent), + }) + if err != nil { + return nil, fmt.Errorf("open catalog: %w", err) + } + if err := db.AutoMigrate(&CatalogEntry{}); err != nil { + return nil, fmt.Errorf("migrate catalog: %w", err) + } + return &Catalog{db: db}, nil +} + +// Close 关闭底层连接。 +func (c *Catalog) Close() error { + if c.db == nil { + return nil + } + sqlDB, err := c.db.DB() + if err != nil { + return err + } + return sqlDB.Close() +} + +// Put 插入或更新一条记录。 +func (c *Catalog) Put(entry CatalogEntry) error { + if entry.EBID == "" { + return fmt.Errorf("ebid is required") + } + if entry.CreatedAt.IsZero() { + entry.CreatedAt = time.Now().UTC() + } + // Upsert:EBID 冲突时更新 object_key/size/source_path + return c.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "ebid"}}, + DoUpdates: clause.AssignmentColumns([]string{ + "object_key", "source_path", "size", "created_at", + }), + }).Create(&entry).Error +} + +// Get 通过 EBID 查询条目。未找到返回 (nil, nil)。 +func (c *Catalog) Get(ebid string) (*CatalogEntry, error) { + var entry CatalogEntry + err := c.db.Where("ebid = ?", ebid).First(&entry).Error + if err == gorm.ErrRecordNotFound { + return nil, nil + } + if err != nil { + return nil, err + } + return &entry, nil +} + +// Delete 删除一条记录。 +func (c *Catalog) Delete(ebid string) error { + return c.db.Where("ebid = ?", ebid).Delete(&CatalogEntry{}).Error +} + +// List 列出全部条目。 +func (c *Catalog) List() ([]CatalogEntry, error) { + var entries []CatalogEntry + if err := c.db.Order("created_at DESC").Find(&entries).Error; err != nil { + return nil, err + } + return entries, nil +} diff --git a/server/internal/backint/catalog_test.go b/server/internal/backint/catalog_test.go new file mode 100644 index 0000000..90c8624 --- /dev/null +++ b/server/internal/backint/catalog_test.go @@ -0,0 +1,74 @@ +package backint + +import ( + "path/filepath" + "testing" +) + +func TestCatalog_CRUD(t *testing.T) { + dir := t.TempDir() + cat, err := OpenCatalog(filepath.Join(dir, "test.db")) + if err != nil { + t.Fatalf("open: %v", err) + } + defer cat.Close() + + if err := cat.Put(CatalogEntry{EBID: "bid-1", ObjectKey: "k/1.bin", SourcePath: "/tmp/a", Size: 100}); err != nil { + t.Fatalf("put: %v", err) + } + if err := cat.Put(CatalogEntry{EBID: "bid-2", ObjectKey: "k/2.bin", Size: 200}); err != nil { + t.Fatalf("put: %v", err) + } + + got, err := cat.Get("bid-1") + if err != nil || got == nil { + t.Fatalf("get: %v %v", got, err) + } + if got.ObjectKey != "k/1.bin" || got.Size != 100 { + t.Errorf("mismatch: %+v", got) + } + + // 不存在的条目 + missing, err := cat.Get("bid-999") + if err != nil { + t.Fatalf("get missing: %v", err) + } + if missing != nil { + t.Errorf("expected nil, got %+v", missing) + } + + // List + all, err := cat.List() + if err != nil || len(all) != 2 { + t.Fatalf("list: %v %d", err, len(all)) + } + + // Delete + if err := cat.Delete("bid-1"); err != nil { + t.Fatalf("delete: %v", err) + } + got, _ = cat.Get("bid-1") + if got != nil { + t.Errorf("bid-1 should be deleted") + } +} + +func TestCatalog_UpsertSameEBID(t *testing.T) { + dir := t.TempDir() + cat, err := OpenCatalog(filepath.Join(dir, "test.db")) + if err != nil { + t.Fatalf("open: %v", err) + } + defer cat.Close() + + if err := cat.Put(CatalogEntry{EBID: "bid-x", ObjectKey: "v1"}); err != nil { + t.Fatal(err) + } + if err := cat.Put(CatalogEntry{EBID: "bid-x", ObjectKey: "v2"}); err != nil { + t.Fatal(err) + } + got, _ := cat.Get("bid-x") + if got == nil || got.ObjectKey != "v2" { + t.Errorf("upsert failed: %+v", got) + } +} diff --git a/server/internal/backint/config.go b/server/internal/backint/config.go new file mode 100644 index 0000000..184a667 --- /dev/null +++ b/server/internal/backint/config.go @@ -0,0 +1,140 @@ +package backint + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "strconv" + "strings" +) + +// Config 是 Backint Agent 的运行时配置。 +// +// SAP HANA 通过 -p 传入一个参数文件。BackupX Backint Agent 复用 SAP +// 的"#KEY = VALUE"风格(兼容原生 backint 参数文件习惯),不支持 section。 +// +// 必填字段: +// - STORAGE_TYPE:存储类型(s3/webdav/local_disk/...,与 BackupX storage registry 一致) +// - STORAGE_CONFIG_JSON:存储配置 JSON 文件路径(或直接 STORAGE_CONFIG = ) +// +// 可选字段: +// - PARALLEL_FACTOR:并行度(默认 1) +// - COMPRESS:是否 gzip 压缩(true/false,默认 false) +// - LOG_FILE:日志文件路径(默认 stderr) +// - CATALOG_DB:本地目录数据库路径(默认 ./backint_catalog.db) +// - KEY_PREFIX:对象键前缀(默认空,最终对象键 = /) +type Config struct { + StorageType string + StorageConfigJSON string // 存储配置 JSON 文件路径 + StorageConfigRaw []byte // 也支持直接内联(STORAGE_CONFIG) + StorageConfig map[string]any // 解析后的存储配置 + ParallelFactor int + Compress bool + LogFile string + CatalogDB string + KeyPrefix string +} + +// LoadConfigFile 从文件加载配置。 +func LoadConfigFile(path string) (*Config, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("open backint config: %w", err) + } + defer f.Close() + return ParseConfig(f) +} + +// ParseConfig 从 reader 解析配置。 +func ParseConfig(r io.Reader) (*Config, error) { + cfg := &Config{ParallelFactor: 1} + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), 4*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" || strings.HasPrefix(line, ";") { + continue + } + // 兼容可选的 "#" 前缀(SAP 约定) + line = strings.TrimPrefix(line, "#") + eq := strings.Index(line, "=") + if eq < 0 { + continue + } + key := strings.TrimSpace(line[:eq]) + value := strings.TrimSpace(line[eq+1:]) + if len(value) >= 2 && value[0] == '"' && value[len(value)-1] == '"' { + value = value[1 : len(value)-1] + } + switch strings.ToUpper(key) { + case "STORAGE_TYPE": + cfg.StorageType = value + case "STORAGE_CONFIG_JSON": + cfg.StorageConfigJSON = value + case "STORAGE_CONFIG": + cfg.StorageConfigRaw = []byte(value) + case "PARALLEL_FACTOR": + n, err := strconv.Atoi(value) + if err != nil || n <= 0 { + return nil, fmt.Errorf("invalid PARALLEL_FACTOR: %q", value) + } + cfg.ParallelFactor = n + case "COMPRESS": + cfg.Compress = parseBool(value) + case "LOG_FILE": + cfg.LogFile = value + case "CATALOG_DB": + cfg.CatalogDB = value + case "KEY_PREFIX": + cfg.KeyPrefix = strings.Trim(value, "/") + } + } + if err := scanner.Err(); err != nil { + return nil, err + } + if err := cfg.finalize(); err != nil { + return nil, err + } + return cfg, nil +} + +func (c *Config) finalize() error { + if c.StorageType == "" { + return errors.New("STORAGE_TYPE is required") + } + if c.CatalogDB == "" { + c.CatalogDB = "./backint_catalog.db" + } + // 加载存储配置 JSON + var raw []byte + switch { + case c.StorageConfigJSON != "": + data, err := os.ReadFile(c.StorageConfigJSON) + if err != nil { + return fmt.Errorf("read STORAGE_CONFIG_JSON: %w", err) + } + raw = data + case len(c.StorageConfigRaw) > 0: + raw = c.StorageConfigRaw + default: + return errors.New("STORAGE_CONFIG_JSON or STORAGE_CONFIG is required") + } + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + return fmt.Errorf("parse storage config JSON: %w", err) + } + c.StorageConfig = m + return nil +} + +func parseBool(v string) bool { + switch strings.ToLower(strings.TrimSpace(v)) { + case "1", "true", "yes", "on": + return true + default: + return false + } +} diff --git a/server/internal/backint/config_test.go b/server/internal/backint/config_test.go new file mode 100644 index 0000000..c96bdee --- /dev/null +++ b/server/internal/backint/config_test.go @@ -0,0 +1,74 @@ +package backint + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func TestParseConfig(t *testing.T) { + dir := t.TempDir() + storagePath := filepath.Join(dir, "storage.json") + if err := os.WriteFile(storagePath, []byte(`{"basePath":"/tmp/backup"}`), 0644); err != nil { + t.Fatal(err) + } + input := ` +; 注释 +#STORAGE_TYPE = local_disk +#STORAGE_CONFIG_JSON = ` + storagePath + ` +#PARALLEL_FACTOR = 4 +#COMPRESS = true +#KEY_PREFIX = /hana/backups/ +#CATALOG_DB = ` + filepath.Join(dir, "catalog.db") + ` +` + cfg, err := ParseConfig(strings.NewReader(input)) + if err != nil { + t.Fatalf("parse: %v", err) + } + if cfg.StorageType != "local_disk" { + t.Errorf("StorageType: %q", cfg.StorageType) + } + if cfg.ParallelFactor != 4 { + t.Errorf("ParallelFactor: %d", cfg.ParallelFactor) + } + if !cfg.Compress { + t.Errorf("Compress should be true") + } + if cfg.KeyPrefix != "hana/backups" { + t.Errorf("KeyPrefix should be trimmed: %q", cfg.KeyPrefix) + } + if cfg.StorageConfig["basePath"] != "/tmp/backup" { + t.Errorf("StorageConfig mismatch: %+v", cfg.StorageConfig) + } +} + +func TestParseConfig_MissingStorageType(t *testing.T) { + input := `PARALLEL_FACTOR = 1` + if _, err := ParseConfig(strings.NewReader(input)); err == nil { + t.Fatal("expected error for missing STORAGE_TYPE") + } +} + +func TestParseConfig_InlineStorageConfig(t *testing.T) { + input := `STORAGE_TYPE = local_disk +STORAGE_CONFIG = {"basePath":"/x"} +` + cfg, err := ParseConfig(strings.NewReader(input)) + if err != nil { + t.Fatalf("parse: %v", err) + } + if cfg.StorageConfig["basePath"] != "/x" { + t.Errorf("inline config not parsed: %+v", cfg.StorageConfig) + } +} + +func TestParseConfig_InvalidParallel(t *testing.T) { + input := `STORAGE_TYPE = local_disk +STORAGE_CONFIG = {} +PARALLEL_FACTOR = oops +` + if _, err := ParseConfig(strings.NewReader(input)); err == nil { + t.Fatal("expected error for invalid PARALLEL_FACTOR") + } +} diff --git a/server/internal/backint/protocol.go b/server/internal/backint/protocol.go new file mode 100644 index 0000000..b790f0f --- /dev/null +++ b/server/internal/backint/protocol.go @@ -0,0 +1,267 @@ +// Package backint 实现 SAP HANA Backint 协议代理。 +// +// Backint 协议是 SAP HANA 与第三方备份工具之间的管道/文件协议。 +// SAP HANA 通过 CLI 调用 Backint Agent,传入参数文件、输入文件、输出文件, +// Agent 根据输入文件中的 #PIPE / #EBID / #NULL 指令读取/写入数据, +// 并在输出文件中返回 #SAVED / #RESTORED / #BACKUP / #NOTFOUND / #DELETED / #ERROR。 +// +// 支持的功能:BACKUP / RESTORE / INQUIRE / DELETE +// 参考规范:SAP HANA Backint Interface for Backup Tools (OSS 1642148) +package backint + +import ( + "bufio" + "errors" + "fmt" + "io" + "strings" +) + +// Function 代表 Backint 操作类型,对应 CLI 的 -f 参数。 +type Function string + +const ( + FunctionBackup Function = "backup" + FunctionRestore Function = "restore" + FunctionInquire Function = "inquire" + FunctionDelete Function = "delete" +) + +// BackupRequest 是 BACKUP 操作的单条请求。 +// +// 两种形态: +// - Pipe: #PIPE (HANA 通过命名管道传输数据) +// - File: "" (HANA 指向一个已完成的临时文件) +type BackupRequest struct { + IsPipe bool + Path string +} + +// RestoreRequest 是 RESTORE 操作的单条请求。 +// +// 形态:#PIPE "" 或 "" +type RestoreRequest struct { + IsPipe bool + EBID string // 之前 BACKUP 返回的备份 ID + Path string +} + +// InquireRequest 是 INQUIRE 操作的单条请求。 +// +// 形态: +// - #NULL (列出所有备份) +// - "" (查询指定 ID 是否存在) +// - #EBID "" (带前缀的变体) +type InquireRequest struct { + All bool + EBID string +} + +// DeleteRequest 是 DELETE 操作的单条请求。 +// +// 形态: 或 #EBID +type DeleteRequest struct { + EBID string +} + +// ParseBackupRequests 解析 BACKUP 输入文件。 +func ParseBackupRequests(r io.Reader) ([]BackupRequest, error) { + var items []BackupRequest + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), 4*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + if strings.HasPrefix(line, "#PIPE") { + path := strings.TrimSpace(strings.TrimPrefix(line, "#PIPE")) + if path == "" { + return nil, fmt.Errorf("invalid #PIPE line: %q", line) + } + items = append(items, BackupRequest{IsPipe: true, Path: trimQuotes(path)}) + continue + } + items = append(items, BackupRequest{IsPipe: false, Path: trimQuotes(line)}) + } + if err := scanner.Err(); err != nil { + return nil, err + } + return items, nil +} + +// ParseRestoreRequests 解析 RESTORE 输入文件。 +func ParseRestoreRequests(r io.Reader) ([]RestoreRequest, error) { + var items []RestoreRequest + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), 4*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + isPipe := false + if strings.HasPrefix(line, "#PIPE") { + isPipe = true + line = strings.TrimSpace(strings.TrimPrefix(line, "#PIPE")) + } + if strings.HasPrefix(line, "#EBID") { + line = strings.TrimSpace(strings.TrimPrefix(line, "#EBID")) + } + ebid, rest := splitFirstField(line) + if ebid == "" || rest == "" { + return nil, fmt.Errorf("invalid restore line: %q", line) + } + items = append(items, RestoreRequest{ + IsPipe: isPipe, + EBID: trimQuotes(ebid), + Path: trimQuotes(rest), + }) + } + if err := scanner.Err(); err != nil { + return nil, err + } + return items, nil +} + +// ParseInquireRequests 解析 INQUIRE 输入文件。 +func ParseInquireRequests(r io.Reader) ([]InquireRequest, error) { + var items []InquireRequest + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), 4*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + if line == "#NULL" { + items = append(items, InquireRequest{All: true}) + continue + } + if strings.HasPrefix(line, "#EBID") { + line = strings.TrimSpace(strings.TrimPrefix(line, "#EBID")) + } + items = append(items, InquireRequest{EBID: trimQuotes(line)}) + } + if err := scanner.Err(); err != nil { + return nil, err + } + return items, nil +} + +// ParseDeleteRequests 解析 DELETE 输入文件。 +func ParseDeleteRequests(r io.Reader) ([]DeleteRequest, error) { + var items []DeleteRequest + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), 4*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + if strings.HasPrefix(line, "#EBID") { + line = strings.TrimSpace(strings.TrimPrefix(line, "#EBID")) + } + ebid := trimQuotes(strings.TrimSpace(line)) + if ebid == "" { + return nil, fmt.Errorf("invalid delete line: %q", line) + } + items = append(items, DeleteRequest{EBID: ebid}) + } + if err := scanner.Err(); err != nil { + return nil, err + } + return items, nil +} + +// 输出写入辅助 + +// WriteSaved 写入一条 BACKUP 成功响应:#SAVED "" +func WriteSaved(w io.Writer, ebid, path string) error { + _, err := fmt.Fprintf(w, "#SAVED %s %s\n", ebid, quote(path)) + return err +} + +// WriteRestored 写入一条 RESTORE 成功响应:#RESTORED "" "" +func WriteRestored(w io.Writer, ebid, path string) error { + _, err := fmt.Fprintf(w, "#RESTORED %s %s\n", quote(ebid), quote(path)) + return err +} + +// WriteBackup 写入一条 INQUIRE 命中响应:#BACKUP "" +func WriteBackup(w io.Writer, ebid string) error { + _, err := fmt.Fprintf(w, "#BACKUP %s\n", quote(ebid)) + return err +} + +// WriteNotFound 写入一条 INQUIRE/RESTORE 未命中响应:#NOTFOUND "" +func WriteNotFound(w io.Writer, identifier string) error { + _, err := fmt.Fprintf(w, "#NOTFOUND %s\n", quote(identifier)) + return err +} + +// WriteDeleted 写入一条 DELETE 成功响应:#DELETED "" +func WriteDeleted(w io.Writer, ebid string) error { + _, err := fmt.Fprintf(w, "#DELETED %s\n", quote(ebid)) + return err +} + +// WriteError 写入一条错误响应:#ERROR "" +// +// SAP HANA 会将 #ERROR 视为本条请求失败,但不会终止整个批次。 +// 在 stderr 输出错误详情便于排查。 +func WriteError(w io.Writer, identifier string) error { + _, err := fmt.Fprintf(w, "#ERROR %s\n", quote(identifier)) + return err +} + +// 内部工具函数 + +func trimQuotes(s string) string { + s = strings.TrimSpace(s) + if len(s) >= 2 && s[0] == '"' && s[len(s)-1] == '"' { + return s[1 : len(s)-1] + } + return s +} + +func quote(s string) string { + return `"` + strings.ReplaceAll(s, `"`, `\"`) + `"` +} + +// splitFirstField 把一行拆分为 "第一个字段" 和 "剩余部分"。 +// 支持带引号的字段:`"abc def" "path"` → `abc def` / `"path"`。 +func splitFirstField(line string) (first, rest string) { + line = strings.TrimSpace(line) + if line == "" { + return "", "" + } + if line[0] == '"' { + idx := strings.Index(line[1:], `"`) + if idx < 0 { + return line, "" + } + return line[1 : idx+1], strings.TrimSpace(line[idx+2:]) + } + idx := strings.IndexAny(line, " \t") + if idx < 0 { + return line, "" + } + return line[:idx], strings.TrimSpace(line[idx+1:]) +} + +// ParseFunction 将 CLI 的 -f 参数字符串规范化为 Function。 +func ParseFunction(s string) (Function, error) { + switch strings.ToLower(strings.TrimSpace(s)) { + case "backup": + return FunctionBackup, nil + case "restore": + return FunctionRestore, nil + case "inquire": + return FunctionInquire, nil + case "delete": + return FunctionDelete, nil + default: + return "", errors.New("unsupported backint function: " + s) + } +} diff --git a/server/internal/backint/protocol_test.go b/server/internal/backint/protocol_test.go new file mode 100644 index 0000000..43e2f46 --- /dev/null +++ b/server/internal/backint/protocol_test.go @@ -0,0 +1,142 @@ +package backint + +import ( + "bytes" + "strings" + "testing" +) + +func TestParseBackupRequests(t *testing.T) { + input := `#PIPE /tmp/pipe1 +#PIPE "/tmp/pipe two" +/tmp/file.bak +"/tmp/file two.bak" +` + reqs, err := ParseBackupRequests(strings.NewReader(input)) + if err != nil { + t.Fatalf("parse: %v", err) + } + if len(reqs) != 4 { + t.Fatalf("expected 4 requests, got %d", len(reqs)) + } + if !reqs[0].IsPipe || reqs[0].Path != "/tmp/pipe1" { + t.Errorf("req[0] mismatch: %+v", reqs[0]) + } + if !reqs[1].IsPipe || reqs[1].Path != "/tmp/pipe two" { + t.Errorf("req[1] mismatch: %+v", reqs[1]) + } + if reqs[2].IsPipe || reqs[2].Path != "/tmp/file.bak" { + t.Errorf("req[2] mismatch: %+v", reqs[2]) + } + if reqs[3].Path != "/tmp/file two.bak" { + t.Errorf("req[3] mismatch: %+v", reqs[3]) + } +} + +func TestParseRestoreRequests(t *testing.T) { + input := `#PIPE backupx-123 "/tmp/pipe1" +#EBID "backupx-456" "/tmp/file.bak" +backupx-789 /tmp/plain.bak +` + reqs, err := ParseRestoreRequests(strings.NewReader(input)) + if err != nil { + t.Fatalf("parse: %v", err) + } + if len(reqs) != 3 { + t.Fatalf("expected 3, got %d", len(reqs)) + } + if !reqs[0].IsPipe || reqs[0].EBID != "backupx-123" || reqs[0].Path != "/tmp/pipe1" { + t.Errorf("req[0] mismatch: %+v", reqs[0]) + } + if reqs[1].IsPipe || reqs[1].EBID != "backupx-456" { + t.Errorf("req[1] mismatch: %+v", reqs[1]) + } + if reqs[2].EBID != "backupx-789" || reqs[2].Path != "/tmp/plain.bak" { + t.Errorf("req[2] mismatch: %+v", reqs[2]) + } +} + +func TestParseInquireRequests(t *testing.T) { + input := "#NULL\nbackupx-abc\n#EBID \"backupx-xyz\"\n" + reqs, err := ParseInquireRequests(strings.NewReader(input)) + if err != nil { + t.Fatalf("parse: %v", err) + } + if len(reqs) != 3 { + t.Fatalf("expected 3, got %d", len(reqs)) + } + if !reqs[0].All { + t.Errorf("req[0] should be All") + } + if reqs[1].EBID != "backupx-abc" { + t.Errorf("req[1] mismatch: %+v", reqs[1]) + } + if reqs[2].EBID != "backupx-xyz" { + t.Errorf("req[2] mismatch: %+v", reqs[2]) + } +} + +func TestParseDeleteRequests(t *testing.T) { + input := "backupx-aaa\n#EBID \"backupx-bbb\"\n" + reqs, err := ParseDeleteRequests(strings.NewReader(input)) + if err != nil { + t.Fatalf("parse: %v", err) + } + if len(reqs) != 2 || reqs[0].EBID != "backupx-aaa" || reqs[1].EBID != "backupx-bbb" { + t.Fatalf("unexpected: %+v", reqs) + } +} + +func TestWriteResponses(t *testing.T) { + var buf bytes.Buffer + _ = WriteSaved(&buf, "backupx-1", "/tmp/x") + _ = WriteRestored(&buf, "backupx-2", "/tmp/y") + _ = WriteBackup(&buf, "backupx-3") + _ = WriteNotFound(&buf, "backupx-4") + _ = WriteDeleted(&buf, "backupx-5") + _ = WriteError(&buf, "/tmp/z") + want := "#SAVED backupx-1 \"/tmp/x\"\n" + + "#RESTORED \"backupx-2\" \"/tmp/y\"\n" + + "#BACKUP \"backupx-3\"\n" + + "#NOTFOUND \"backupx-4\"\n" + + "#DELETED \"backupx-5\"\n" + + "#ERROR \"/tmp/z\"\n" + if buf.String() != want { + t.Errorf("output mismatch:\n got: %q\nwant: %q", buf.String(), want) + } +} + +func TestParseFunction(t *testing.T) { + cases := map[string]Function{ + "backup": FunctionBackup, + "BACKUP": FunctionBackup, + "restore": FunctionRestore, + "inquire": FunctionInquire, + "delete": FunctionDelete, + } + for s, want := range cases { + got, err := ParseFunction(s) + if err != nil || got != want { + t.Errorf("ParseFunction(%q) = %v, %v; want %v", s, got, err, want) + } + } + if _, err := ParseFunction("bogus"); err == nil { + t.Errorf("expected error for bogus function") + } +} + +func TestSplitFirstField(t *testing.T) { + cases := []struct{ in, first, rest string }{ + {`abc def`, "abc", "def"}, + {`"abc def" ghi`, "abc def", "ghi"}, + {`"a b" "c d"`, "a b", `"c d"`}, + {`lone`, "lone", ""}, + {``, "", ""}, + } + for _, c := range cases { + f, r := splitFirstField(c.in) + if f != c.first || r != c.rest { + t.Errorf("splitFirstField(%q) = (%q, %q); want (%q, %q)", c.in, f, r, c.first, c.rest) + } + } +} diff --git a/server/internal/backup/saphana_runner.go b/server/internal/backup/saphana_runner.go index 5ed142e..a1b4669 100644 --- a/server/internal/backup/saphana_runner.go +++ b/server/internal/backup/saphana_runner.go @@ -35,6 +35,12 @@ func (r *SAPHANARunner) Type() string { // Run executes a SAP HANA data-level backup using hdbsql + BACKUP DATA USING FILE. // The backup files are written to a temporary directory, then packaged into a tar // archive as the artifact for BackupX to compress/encrypt/upload. +// +// 支持以下增强(通过 task.Database 字段配置): +// - BackupLevel: full / incremental / differential +// - BackupType: data / log +// - BackupChannels: 并行通道数(>1 时生成多路径 SQL) +// - MaxRetries: hdbsql 执行失败的重试次数 func (r *SAPHANARunner) Run(ctx context.Context, task TaskSpec, writer LogWriter) (*RunResult, error) { if _, err := r.executor.LookPath("hdbsql"); err != nil { return nil, fmt.Errorf("未找到 hdbsql 命令 (请确保服务器已安装 SAP HANA Client)") @@ -68,32 +74,46 @@ func (r *SAPHANARunner) Run(ctx context.Context, task TaskSpec, writer LogWriter port = 30015 } - writer.WriteLine(fmt.Sprintf("连接到 SAP HANA: %s:%d", task.Database.Host, port)) + backupLevel := normalizeBackupLevel(task.Database.BackupLevel) + backupType := normalizeBackupType(task.Database.BackupType) + channels := task.Database.BackupChannels + if channels < 1 { + channels = 1 + } + maxRetries := task.Database.MaxRetries + if maxRetries < 1 { + maxRetries = 3 + } + instance := task.Database.InstanceNumber + if strings.TrimSpace(instance) == "" { + instance = hanaInstanceNumber(port) + } + + writer.WriteLine(fmt.Sprintf("连接到 SAP HANA: %s:%d (实例 %s)", task.Database.Host, port, instance)) writer.WriteLine(fmt.Sprintf("备份数据库: %s", tenantDB)) + writer.WriteLine(fmt.Sprintf("备份配置: 类型=%s, 级别=%s, 通道数=%d, 最大重试=%d", backupType, backupLevel, channels, maxRetries)) // Build backup prefix — HANA will create files like _databackup__1. timestamp := startedAt.UTC().Format("20060102_150405") - backupPrefix := filepath.Join(backupDir, fmt.Sprintf("hana_%s_%s", strings.ToLower(tenantDB), timestamp)) - - // Build `BACKUP DATA USING FILE` SQL. - backupSQL := fmt.Sprintf(`BACKUP DATA USING FILE ('%s')`, backupPrefix) - if strings.ToUpper(tenantDB) != "SYSTEMDB" { - backupSQL = fmt.Sprintf(`BACKUP DATA FOR %s USING FILE ('%s')`, tenantDB, backupPrefix) + prefixes, err := buildBackupPrefixes(backupDir, tenantDB, timestamp, channels) + if err != nil { + return nil, err } + // Build SQL based on backup type and level. + backupSQL := buildBackupSQL(tenantDB, prefixes, backupType, backupLevel) + writer.WriteLine(fmt.Sprintf("生成 SQL: %s", backupSQL)) + // Construct hdbsql connection arguments. args := buildHdbsqlArgs(task.Database.Host, port, task.Database.User, task.Database.Password, tenantDB, backupSQL) - stderrWriter := newLogLineWriter(writer, "hdbsql") - writer.WriteLine("开始执行 SAP HANA BACKUP DATA USING FILE") + writer.WriteLine("开始执行 SAP HANA 备份命令") - if err := r.executor.Run(ctx, "hdbsql", args, CommandOptions{ - Stderr: stderrWriter, - }); err != nil { - return nil, fmt.Errorf("run hdbsql BACKUP DATA: %w: %s", err, stderrWriter.collected()) + if err := r.runHdbsqlWithRetry(ctx, "hdbsql", args, maxRetries, writer); err != nil { + return nil, fmt.Errorf("run hdbsql backup: %w", err) } - writer.WriteLine("SAP HANA BACKUP DATA 命令执行完成,开始打包备份文件") + writer.WriteLine("SAP HANA 备份命令执行完成,开始打包备份文件") // Package all generated backup files into a tar archive. if err := packageBackupFiles(backupDir, artifactPath, writer); err != nil { @@ -166,12 +186,12 @@ func (r *SAPHANARunner) Restore(ctx context.Context, task TaskSpec, artifactPath args := buildHdbsqlArgs(task.Database.Host, port, task.Database.User, task.Database.Password, tenantDB, recoverSQL) - stderrWriter := newLogLineWriter(writer, "hdbsql") - if err := r.executor.Run(ctx, "hdbsql", args, CommandOptions{ - Stderr: stderrWriter, - }); err != nil { - errMsg := stderrWriter.collected() - return fmt.Errorf("run hdbsql RECOVER DATA: %w: %s", err, strings.TrimSpace(errMsg)) + maxRetries := task.Database.MaxRetries + if maxRetries < 1 { + maxRetries = 3 + } + if err := r.runHdbsqlWithRetry(ctx, "hdbsql", args, maxRetries, writer); err != nil { + return fmt.Errorf("run hdbsql RECOVER DATA: %w", err) } writer.WriteLine("SAP HANA 恢复完成") @@ -188,6 +208,111 @@ func hanaInstanceNumber(port int) string { return "00" } +// normalizeBackupLevel 规范化备份级别值,无效或空值默认为 "full"。 +func normalizeBackupLevel(level string) string { + switch strings.ToLower(strings.TrimSpace(level)) { + case "incremental": + return "incremental" + case "differential": + return "differential" + default: + return "full" + } +} + +// normalizeBackupType 规范化备份类型,无效或空值默认为 "data"。 +func normalizeBackupType(t string) string { + switch strings.ToLower(strings.TrimSpace(t)) { + case "log": + return "log" + default: + return "data" + } +} + +// buildBackupPrefixes 为每个并行通道生成独立子目录和路径前缀。 +// 当 channels=1 时返回单个直接位于 backupDir 下的前缀; +// 当 channels>1 时为每个通道创建 chan_N/ 子目录。 +func buildBackupPrefixes(backupDir, tenantDB, timestamp string, channels int) ([]string, error) { + tenantLower := strings.ToLower(tenantDB) + if channels <= 1 { + return []string{filepath.Join(backupDir, fmt.Sprintf("hana_%s_%s", tenantLower, timestamp))}, nil + } + prefixes := make([]string, 0, channels) + for i := 0; i < channels; i++ { + chanDir := filepath.Join(backupDir, fmt.Sprintf("chan_%d", i)) + if err := os.MkdirAll(chanDir, 0o755); err != nil { + return nil, fmt.Errorf("create channel %d dir: %w", i, err) + } + prefixes = append(prefixes, filepath.Join(chanDir, fmt.Sprintf("hana_%s_%s", tenantLower, timestamp))) + } + return prefixes, nil +} + +// buildBackupSQL 根据备份类型和级别构建 SAP HANA BACKUP SQL 语句。 +// +// 支持的语法: +// +// 全量数据备份: BACKUP DATA [FOR ] USING FILE ('p1' [, 'p2', ...]) +// 增量数据备份: BACKUP DATA [FOR ] INCREMENTAL USING FILE ('...') +// 差异数据备份: BACKUP DATA [FOR ] DIFFERENTIAL USING FILE ('...') +// 日志备份: BACKUP LOG [FOR ] USING FILE ('...') +func buildBackupSQL(tenantDB string, prefixes []string, backupType, backupLevel string) string { + tenantClause := "" + if strings.ToUpper(tenantDB) != "SYSTEMDB" { + tenantClause = fmt.Sprintf(" FOR %s", tenantDB) + } + + // 多路径以 'p1', 'p2', ... 拼接(HANA 多通道并行语法) + quoted := make([]string, len(prefixes)) + for i, p := range prefixes { + quoted[i] = fmt.Sprintf("'%s'", p) + } + pathClause := strings.Join(quoted, ", ") + + if backupType == "log" { + // LOG 备份不支持 INCREMENTAL/DIFFERENTIAL 关键字 + return fmt.Sprintf("BACKUP LOG%s USING FILE (%s)", tenantClause, pathClause) + } + + levelClause := "" + switch backupLevel { + case "incremental": + levelClause = " INCREMENTAL" + case "differential": + levelClause = " DIFFERENTIAL" + } + return fmt.Sprintf("BACKUP DATA%s%s USING FILE (%s)", tenantClause, levelClause, pathClause) +} + +// runHdbsqlWithRetry 执行 hdbsql 命令并在失败时按指数退避重试。 +// 退避公式:5s × attempt²,并在 ctx 取消时立即返回。 +func (r *SAPHANARunner) runHdbsqlWithRetry(ctx context.Context, name string, args []string, maxAttempts int, writer LogWriter) error { + if maxAttempts < 1 { + maxAttempts = 1 + } + var lastErr error + for attempt := 1; attempt <= maxAttempts; attempt++ { + if attempt > 1 { + backoff := time.Duration(attempt*attempt) * 5 * time.Second + writer.WriteLine(fmt.Sprintf("hdbsql 第 %d 次重试(等待 %s)", attempt, backoff)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backoff): + } + } + stderrWriter := newLogLineWriter(writer, "hdbsql") + err := r.executor.Run(ctx, name, args, CommandOptions{Stderr: stderrWriter}) + if err == nil { + return nil + } + lastErr = fmt.Errorf("%w: %s", err, strings.TrimSpace(stderrWriter.collected())) + writer.WriteLine(fmt.Sprintf("hdbsql 执行失败(第 %d/%d 次): %v", attempt, maxAttempts, lastErr)) + } + return lastErr +} + // buildHdbsqlArgs constructs the common hdbsql CLI arguments. func buildHdbsqlArgs(host string, port int, user, password, database, sql string) []string { return []string{ diff --git a/server/internal/backup/saphana_runner_test.go b/server/internal/backup/saphana_runner_test.go index a8358bd..da21e75 100644 --- a/server/internal/backup/saphana_runner_test.go +++ b/server/internal/backup/saphana_runner_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strings" "testing" + "time" ) func TestSAPHANARunnerRun_BackupDataCommand(t *testing.T) { @@ -273,6 +274,246 @@ func TestSAPHANARunnerRestore_TenantRecoverCommand(t *testing.T) { } } +func TestBuildBackupSQL_FullSystemDB(t *testing.T) { + sql := buildBackupSQL("SYSTEMDB", []string{"/tmp/p1"}, "data", "full") + if sql != "BACKUP DATA USING FILE ('/tmp/p1')" { + t.Fatalf("unexpected SQL: %s", sql) + } +} + +func TestBuildBackupSQL_IncrementalTenant(t *testing.T) { + sql := buildBackupSQL("HDB", []string{"/tmp/p1"}, "data", "incremental") + expected := "BACKUP DATA FOR HDB INCREMENTAL USING FILE ('/tmp/p1')" + if sql != expected { + t.Fatalf("expected %q, got %q", expected, sql) + } +} + +func TestBuildBackupSQL_DifferentialTenant(t *testing.T) { + sql := buildBackupSQL("HDB", []string{"/tmp/p1"}, "data", "differential") + expected := "BACKUP DATA FOR HDB DIFFERENTIAL USING FILE ('/tmp/p1')" + if sql != expected { + t.Fatalf("expected %q, got %q", expected, sql) + } +} + +func TestBuildBackupSQL_LogBackup(t *testing.T) { + sql := buildBackupSQL("HDB", []string{"/tmp/log"}, "log", "full") + expected := "BACKUP LOG FOR HDB USING FILE ('/tmp/log')" + if sql != expected { + t.Fatalf("expected %q, got %q", expected, sql) + } +} + +func TestBuildBackupSQL_ParallelChannels(t *testing.T) { + sql := buildBackupSQL("SYSTEMDB", []string{"/tmp/c0/p", "/tmp/c1/p", "/tmp/c2/p"}, "data", "full") + expected := "BACKUP DATA USING FILE ('/tmp/c0/p', '/tmp/c1/p', '/tmp/c2/p')" + if sql != expected { + t.Fatalf("expected %q, got %q", expected, sql) + } +} + +func TestNormalizeBackupLevel(t *testing.T) { + cases := map[string]string{ + "": "full", + "FULL": "full", + "incremental": "incremental", + "DIFFERENTIAL": "differential", + "unknown": "full", + } + for in, want := range cases { + if got := normalizeBackupLevel(in); got != want { + t.Errorf("normalizeBackupLevel(%q) = %q, want %q", in, got, want) + } + } +} + +func TestNormalizeBackupType(t *testing.T) { + cases := map[string]string{ + "": "data", + "DATA": "data", + "log": "log", + "LOG": "log", + } + for in, want := range cases { + if got := normalizeBackupType(in); got != want { + t.Errorf("normalizeBackupType(%q) = %q, want %q", in, got, want) + } + } +} + +func TestSAPHANARunnerRun_IncrementalBackup(t *testing.T) { + var capturedSQL string + executor := &fakeCommandExecutor{ + runFunc: func(name string, args []string, options CommandOptions) error { + capturedSQL = args[len(args)-1] + startIdx := strings.Index(capturedSQL, "('") + 2 + endIdx := strings.Index(capturedSQL, "')") + if startIdx > 1 && endIdx > startIdx { + prefix := capturedSQL[startIdx:endIdx] + _ = os.MkdirAll(filepath.Dir(prefix), 0o755) + _ = os.WriteFile(prefix+"_databackup_0_1", []byte("incremental data"), 0o644) + } + return nil + }, + } + + runner := NewSAPHANARunner(executor) + result, err := runner.Run(context.Background(), TaskSpec{ + Name: "hana-incremental", + Type: "saphana", + Database: DatabaseSpec{ + Host: "10.0.0.1", + Port: 30015, + User: "SYSTEM", + Password: "secret", + Names: []string{"HDB"}, + BackupLevel: "incremental", + }, + }, NopLogWriter{}) + + if err != nil { + t.Fatalf("Run returned error: %v", err) + } + defer os.RemoveAll(result.TempDir) + + if !strings.Contains(capturedSQL, "INCREMENTAL USING FILE") { + t.Fatalf("expected INCREMENTAL in SQL, got: %s", capturedSQL) + } +} + +func TestSAPHANARunnerRun_LogBackup(t *testing.T) { + var capturedSQL string + executor := &fakeCommandExecutor{ + runFunc: func(name string, args []string, options CommandOptions) error { + capturedSQL = args[len(args)-1] + startIdx := strings.Index(capturedSQL, "('") + 2 + endIdx := strings.Index(capturedSQL, "')") + if startIdx > 1 && endIdx > startIdx { + prefix := capturedSQL[startIdx:endIdx] + _ = os.MkdirAll(filepath.Dir(prefix), 0o755) + _ = os.WriteFile(prefix+"_logbackup_0_1", []byte("log data"), 0o644) + } + return nil + }, + } + + runner := NewSAPHANARunner(executor) + result, err := runner.Run(context.Background(), TaskSpec{ + Name: "hana-log", + Type: "saphana", + Database: DatabaseSpec{ + Host: "10.0.0.1", Port: 30015, User: "SYSTEM", Password: "secret", + Names: []string{"HDB"}, + BackupType: "log", + }, + }, NopLogWriter{}) + + if err != nil { + t.Fatalf("Run returned error: %v", err) + } + defer os.RemoveAll(result.TempDir) + + if !strings.Contains(capturedSQL, "BACKUP LOG FOR HDB USING FILE") { + t.Fatalf("expected log backup SQL, got: %s", capturedSQL) + } +} + +func TestSAPHANARunnerRun_ParallelChannels(t *testing.T) { + var capturedSQL string + executor := &fakeCommandExecutor{ + runFunc: func(name string, args []string, options CommandOptions) error { + capturedSQL = args[len(args)-1] + // 模拟为每个通道生成备份文件 + parts := strings.Split(capturedSQL, "',") + for _, p := range parts { + p = strings.TrimSpace(p) + if idx := strings.Index(p, "'"); idx >= 0 { + prefix := p[idx+1:] + prefix = strings.TrimSuffix(prefix, "')") + prefix = strings.TrimSuffix(prefix, "'") + if prefix != "" { + _ = os.MkdirAll(filepath.Dir(prefix), 0o755) + _ = os.WriteFile(prefix+"_databackup_0_1", []byte("data"), 0o644) + } + } + } + return nil + }, + } + + runner := NewSAPHANARunner(executor) + result, err := runner.Run(context.Background(), TaskSpec{ + Name: "hana-parallel", + Type: "saphana", + Database: DatabaseSpec{ + Host: "10.0.0.1", Port: 30015, User: "SYSTEM", Password: "secret", + Names: []string{"SYSTEMDB"}, + BackupChannels: 3, + }, + }, NopLogWriter{}) + + if err != nil { + t.Fatalf("Run returned error: %v", err) + } + defer os.RemoveAll(result.TempDir) + + // 应该包含 3 个路径 + if strings.Count(capturedSQL, "'") != 6 { // 3 路径 × 2 引号 + t.Fatalf("expected 3 channels (6 quotes), got SQL: %s", capturedSQL) + } + if !strings.Contains(capturedSQL, "chan_0") || !strings.Contains(capturedSQL, "chan_2") { + t.Fatalf("expected channel directories in SQL, got: %s", capturedSQL) + } +} + +func TestSAPHANARunnerRun_RetryOnFailure(t *testing.T) { + attempts := 0 + executor := &fakeCommandExecutor{ + runFunc: func(name string, args []string, options CommandOptions) error { + attempts++ + if attempts < 2 { + return errors.New("transient failure") + } + // 第二次成功,写入备份文件 + sql := args[len(args)-1] + startIdx := strings.Index(sql, "('") + 2 + endIdx := strings.Index(sql, "')") + if startIdx > 1 && endIdx > startIdx { + prefix := sql[startIdx:endIdx] + _ = os.MkdirAll(filepath.Dir(prefix), 0o755) + _ = os.WriteFile(prefix+"_databackup_0_1", []byte("data"), 0o644) + } + return nil + }, + } + + // 使用极短的重试周期版本(这里通过 fake context 机制无法快进时间,所以直接验证 attempts) + // 设置 MaxRetries=2 以加快测试,不会真实等待 5s + runner := NewSAPHANARunner(executor) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + result, err := runner.Run(ctx, TaskSpec{ + Name: "hana-retry", + Type: "saphana", + Database: DatabaseSpec{ + Host: "10.0.0.1", Port: 30015, User: "SYSTEM", Password: "secret", + Names: []string{"SYSTEMDB"}, + MaxRetries: 2, + }, + }, NopLogWriter{}) + + if err != nil { + t.Fatalf("Run returned error after retry: %v", err) + } + defer os.RemoveAll(result.TempDir) + + if attempts != 2 { + t.Fatalf("expected 2 attempts, got %d", attempts) + } +} + func TestHanaInstanceNumber(t *testing.T) { tests := []struct { port int diff --git a/server/internal/backup/types.go b/server/internal/backup/types.go index 3c3dee4..74d9fea 100644 --- a/server/internal/backup/types.go +++ b/server/internal/backup/types.go @@ -12,6 +12,12 @@ type DatabaseSpec struct { Password string Names []string Path string + // SAP HANA 特有字段(其他类型忽略) + InstanceNumber string // 实例编号(从端口推断或手动指定) + BackupLevel string // "full"(默认) / "incremental" / "differential" + BackupType string // "data"(默认) / "log" + BackupChannels int // 并行通道数(默认 1) + MaxRetries int // 最大重试次数(默认 3) } type TaskSpec struct { diff --git a/server/internal/model/backup_task.go b/server/internal/model/backup_task.go index 356a66a..0eb6202 100644 --- a/server/internal/model/backup_task.go +++ b/server/internal/model/backup_task.go @@ -7,6 +7,7 @@ const ( BackupTaskTypeMySQL = "mysql" BackupTaskTypeSQLite = "sqlite" BackupTaskTypePostgreSQL = "postgresql" + BackupTaskTypeSAPHANA = "saphana" ) const ( @@ -31,6 +32,8 @@ type BackupTask struct { DBPasswordCiphertext string `gorm:"column:db_password_ciphertext;type:text" json:"-"` DBName string `gorm:"column:db_name;size:255" json:"dbName"` DBPath string `gorm:"column:db_path;size:500" json:"dbPath"` + // ExtraConfig 类型特有的扩展配置(JSON),如 SAP HANA 的 backupLevel / backupChannels 等 + ExtraConfig string `gorm:"column:extra_config;type:text" json:"extraConfig"` StorageTargetID uint `gorm:"column:storage_target_id;index;not null" json:"storageTargetId"` // deprecated: 保留兼容 StorageTarget StorageTarget `json:"storageTarget,omitempty"` // deprecated: 保留兼容 StorageTargets []StorageTarget `gorm:"many2many:backup_task_storage_targets" json:"storageTargets,omitempty"` diff --git a/server/internal/service/backup_execution_service.go b/server/internal/service/backup_execution_service.go index a514547..7dc323c 100644 --- a/server/internal/service/backup_execution_service.go +++ b/server/internal/service/backup_execution_service.go @@ -525,6 +525,22 @@ func (s *BackupExecutionService) buildTaskSpec(task *model.BackupTask, startedAt return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析源路径配置", err) } } + dbSpec := backup.DatabaseSpec{ + Host: task.DBHost, + Port: task.DBPort, + User: task.DBUser, + Password: password, + Names: []string{task.DBName}, + Path: task.DBPath, + } + // 解析 ExtraConfig 填充类型特有字段(目前主要用于 SAP HANA) + if strings.TrimSpace(task.ExtraConfig) != "" { + extra := map[string]any{} + if err := json.Unmarshal([]byte(task.ExtraConfig), &extra); err != nil { + return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析扩展配置", err) + } + applyHANAExtraConfig(&dbSpec, extra) + } return backup.TaskSpec{ ID: task.ID, Name: task.Name, @@ -540,17 +556,30 @@ func (s *BackupExecutionService) buildTaskSpec(task *model.BackupTask, startedAt MaxBackups: task.MaxBackups, StartedAt: startedAt, TempDir: s.tempDir, - Database: backup.DatabaseSpec{ - Host: task.DBHost, - Port: task.DBPort, - User: task.DBUser, - Password: password, - Names: []string{task.DBName}, - Path: task.DBPath, - }, + Database: dbSpec, }, nil } +// applyHANAExtraConfig 从 ExtraConfig map 中提取 SAP HANA 字段填入 DatabaseSpec。 +// 不识别的键被忽略,保持向后兼容。 +func applyHANAExtraConfig(spec *backup.DatabaseSpec, extra map[string]any) { + if v, ok := extra["instanceNumber"].(string); ok { + spec.InstanceNumber = strings.TrimSpace(v) + } + if v, ok := extra["backupLevel"].(string); ok { + spec.BackupLevel = strings.ToLower(strings.TrimSpace(v)) + } + if v, ok := extra["backupType"].(string); ok { + spec.BackupType = strings.ToLower(strings.TrimSpace(v)) + } + if v, ok := extra["backupChannels"].(float64); ok { + spec.BackupChannels = int(v) + } + if v, ok := extra["maxRetries"].(float64); ok { + spec.MaxRetries = int(v) + } +} + func (s *BackupExecutionService) loadRecordProvider(ctx context.Context, recordID uint) (*model.BackupRecord, storage.StorageProvider, error) { record, err := s.records.FindByID(ctx, recordID) if err != nil { diff --git a/server/internal/service/backup_task_service.go b/server/internal/service/backup_task_service.go index 194adcf..5554b8a 100644 --- a/server/internal/service/backup_task_service.go +++ b/server/internal/service/backup_task_service.go @@ -19,7 +19,7 @@ const backupTaskMaskedValue = "********" type BackupTaskUpsertInput struct { Name string `json:"name" binding:"required,min=1,max=100"` - Type string `json:"type" binding:"required,oneof=file mysql sqlite postgresql pgsql"` + Type string `json:"type" binding:"required,oneof=file mysql sqlite postgresql pgsql saphana"` Enabled bool `json:"enabled"` CronExpr string `json:"cronExpr" binding:"max=64"` SourcePath string `json:"sourcePath" binding:"max=500"` @@ -37,6 +37,8 @@ type BackupTaskUpsertInput struct { Compression string `json:"compression" binding:"omitempty,oneof=gzip none"` Encrypt bool `json:"encrypt"` MaxBackups int `json:"maxBackups"` + // ExtraConfig 类型特有扩展配置(如 SAP HANA 的 backupLevel/backupChannels) + ExtraConfig map[string]any `json:"extraConfig"` } type BackupTaskToggleInput struct { @@ -64,16 +66,17 @@ type BackupTaskSummary struct { type BackupTaskDetail struct { BackupTaskSummary - SourcePath string `json:"sourcePath"` - SourcePaths []string `json:"sourcePaths"` - ExcludePatterns []string `json:"excludePatterns"` - DBHost string `json:"dbHost"` - DBPort int `json:"dbPort"` - DBUser string `json:"dbUser"` - DBName string `json:"dbName"` - DBPath string `json:"dbPath"` - MaskedFields []string `json:"maskedFields,omitempty"` - CreatedAt time.Time `json:"createdAt"` + SourcePath string `json:"sourcePath"` + SourcePaths []string `json:"sourcePaths"` + ExcludePatterns []string `json:"excludePatterns"` + DBHost string `json:"dbHost"` + DBPort int `json:"dbPort"` + DBUser string `json:"dbUser"` + DBName string `json:"dbName"` + DBPath string `json:"dbPath"` + ExtraConfig map[string]any `json:"extraConfig,omitempty"` + MaskedFields []string `json:"maskedFields,omitempty"` + CreatedAt time.Time `json:"createdAt"` } type BackupTaskScheduler interface { @@ -346,7 +349,7 @@ func validateTaskTypeSpecificFields(input BackupTaskUpsertInput, passwordRequire if !hasSourcePaths { return apperror.BadRequest("BACKUP_TASK_INVALID", "文件备份必须填写源路径", nil) } - case "mysql", "postgresql": + case "mysql", "postgresql", "saphana": if strings.TrimSpace(input.DBHost) == "" { return apperror.BadRequest("BACKUP_TASK_INVALID", "数据库主机不能为空", nil) } @@ -417,6 +420,10 @@ func (s *BackupTaskService) buildTask(existing *model.BackupTask, input BackupTa if len(resolvedPaths) > 0 { primarySourcePath = resolvedPaths[0] } + extraConfigJSON, err := encodeExtraConfig(input.ExtraConfig) + if err != nil { + return nil, apperror.BadRequest("BACKUP_TASK_INVALID", "扩展配置格式不合法", err) + } item := &model.BackupTask{ Name: strings.TrimSpace(input.Name), Type: normalizeBackupTaskType(input.Type), @@ -431,6 +438,7 @@ func (s *BackupTaskService) buildTask(existing *model.BackupTask, input BackupTa DBPasswordCiphertext: passwordCiphertext, DBName: strings.TrimSpace(input.DBName), DBPath: strings.TrimSpace(input.DBPath), + ExtraConfig: extraConfigJSON, StorageTargetID: primaryTargetID, StorageTargets: storageTargets, RetentionDays: input.RetentionDays, @@ -456,6 +464,10 @@ func (s *BackupTaskService) toDetail(item *model.BackupTask) (*BackupTaskDetail, if err != nil { return nil, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析源路径配置", err) } + extraConfig, err := decodeExtraConfig(item.ExtraConfig) + if err != nil { + return nil, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析扩展配置", err) + } detail := &BackupTaskDetail{ BackupTaskSummary: toBackupTaskSummary(item), SourcePath: item.SourcePath, @@ -466,6 +478,7 @@ func (s *BackupTaskService) toDetail(item *model.BackupTask) (*BackupTaskDetail, DBUser: item.DBUser, DBName: item.DBName, DBPath: item.DBPath, + ExtraConfig: extraConfig, CreatedAt: item.CreatedAt, } if item.DBPasswordCiphertext != "" { @@ -580,6 +593,29 @@ func decodeSourcePaths(value string) ([]string, error) { return items, nil } +func encodeExtraConfig(value map[string]any) (string, error) { + if len(value) == 0 { + return "", nil + } + encoded, err := json.Marshal(value) + if err != nil { + return "", err + } + return string(encoded), nil +} + +func decodeExtraConfig(value string) (map[string]any, error) { + trimmed := strings.TrimSpace(value) + if trimmed == "" || trimmed == "{}" { + return nil, nil + } + result := map[string]any{} + if err := json.Unmarshal([]byte(trimmed), &result); err != nil { + return nil, err + } + return result, nil +} + func normalizeBackupTaskType(value string) string { normalized := strings.TrimSpace(strings.ToLower(value)) if normalized == "pgsql" { diff --git a/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx b/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx index e5a6eb6..09fa165 100644 --- a/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx +++ b/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx @@ -11,10 +11,15 @@ import { StorageTargetFormDrawer } from '../storage-targets/StorageTargetFormDra import { backupCompressionOptions, backupTaskTypeOptions, + defaultSapHanaExtraConfig, getDefaultPort, isDatabaseBackupTask, isFileBackupTask, + isSapHanaBackupTask, isSQLiteBackupTask, + sapHanaBackupLevelOptions, + sapHanaBackupTypeOptions, + type SapHanaExtraConfig, } from './field-config' interface BackupTaskFormDrawerProps { @@ -55,6 +60,7 @@ function createEmptyDraft(storageTargets?: StorageTargetSummary[]): BackupTaskPa compression: 'gzip', encrypt: false, maxBackups: 10, + extraConfig: undefined, } } @@ -114,6 +120,7 @@ export function BackupTaskFormDrawer({ visible, loading, initialValue, storageTa compression: initialValue.compression, encrypt: initialValue.encrypt, maxBackups: initialValue.maxBackups, + extraConfig: initialValue.extraConfig, }) setExcludePatternsText(initialValue.excludePatterns.join('\n')) setCurrentStep(0) @@ -152,12 +159,28 @@ export function BackupTaskFormDrawer({ visible, loading, initialValue, storageTa dbPassword: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbPassword : '', dbName: value === 'mysql' || value === 'postgresql' || value === 'saphana' ? current.dbName : '', dbPath: value === 'sqlite' ? current.dbPath : '', + // 切换到 SAP HANA 时初始化扩展配置;切换到其他类型时清空 + extraConfig: value === 'saphana' + ? ({ ...defaultSapHanaExtraConfig(), ...(current.extraConfig as SapHanaExtraConfig | undefined) } as unknown as Record) + : undefined, })) if (value !== 'file') { setExcludePatternsText('') } } + // 更新 SAP HANA 扩展配置的辅助函数 + function updateHanaExtraConfig(patch: Partial) { + setDraft((current) => { + const merged: SapHanaExtraConfig = { + ...defaultSapHanaExtraConfig(), + ...(current.extraConfig as SapHanaExtraConfig | undefined), + ...patch, + } + return { ...current, extraConfig: merged as unknown as Record } + }) + } + function validate(value: BackupTaskPayload) { if (!value.name.trim()) { return '请输入任务名称' @@ -368,12 +391,78 @@ export function BackupTaskFormDrawer({ visible, loading, initialValue, storageTa updateDraft({ dbName: value })} /> )} + {isSapHanaBackupTask(draft.type) ? renderSapHanaExtraFields() : null} ) : null} ) } + function renderSapHanaExtraFields() { + const hana: SapHanaExtraConfig = { + ...defaultSapHanaExtraConfig(), + ...(draft.extraConfig as SapHanaExtraConfig | undefined), + } + return ( + <> + + SAP HANA 扩展配置 + +
+ 备份类型 + updateHanaExtraConfig({ backupLevel: value as SapHanaExtraConfig['backupLevel'] })} + /> + {hana.backupType === 'log' ? ( + 日志备份不支持级别配置 + ) : null} +
+
+ 并行通道数 + updateHanaExtraConfig({ backupChannels: Number(value ?? 1) })} + /> + {'>'} 1 时启用 HANA 多路径并行备份 +
+
+ 失败重试次数 + updateHanaExtraConfig({ maxRetries: Number(value ?? 3) })} + /> +
+
+ 实例编号(可选) + updateHanaExtraConfig({ instanceNumber: value })} + /> +
+ + ) + } + async function handleQuickCreateSubmit(value: StorageTargetPayload) { if (!onCreateStorageTarget) return setQuickCreateLoading(true) diff --git a/web/src/components/backup-tasks/field-config.ts b/web/src/components/backup-tasks/field-config.ts index a3c2f0c..e5b291c 100644 --- a/web/src/components/backup-tasks/field-config.ts +++ b/web/src/components/backup-tasks/field-config.ts @@ -86,3 +86,39 @@ export function getDefaultPort(type: BackupTaskType) { export function getCompressionLabel(compression: BackupCompression) { return compression === 'gzip' ? 'Gzip' : '无' } + +/** SAP HANA 备份级别选项 */ +export const sapHanaBackupLevelOptions = [ + { label: '完整备份 (Full)', value: 'full' }, + { label: '增量备份 (Incremental)', value: 'incremental' }, + { label: '差异备份 (Differential)', value: 'differential' }, +] as const + +/** SAP HANA 备份类型选项 */ +export const sapHanaBackupTypeOptions = [ + { label: '数据备份 (Data)', value: 'data' }, + { label: '日志备份 (Log)', value: 'log' }, +] as const + +/** SAP HANA 扩展配置默认值 */ +export interface SapHanaExtraConfig { + backupType?: 'data' | 'log' + backupLevel?: 'full' | 'incremental' | 'differential' + backupChannels?: number + maxRetries?: number + instanceNumber?: string +} + +export function isSapHanaBackupTask(type: BackupTaskType) { + return type === 'saphana' +} + +export function defaultSapHanaExtraConfig(): SapHanaExtraConfig { + return { + backupType: 'data', + backupLevel: 'full', + backupChannels: 1, + maxRetries: 3, + instanceNumber: '', + } +} diff --git a/web/src/types/backup-tasks.ts b/web/src/types/backup-tasks.ts index 294cc43..fce3f65 100644 --- a/web/src/types/backup-tasks.ts +++ b/web/src/types/backup-tasks.ts @@ -33,6 +33,8 @@ export interface BackupTaskDetail extends BackupTaskSummary { dbUser: string dbName: string dbPath: string + /** 类型特有的扩展配置(如 SAP HANA 的 backupLevel/backupChannels 等) */ + extraConfig?: Record maskedFields?: string[] createdAt: string } @@ -59,6 +61,8 @@ export interface BackupTaskPayload { compression: BackupCompression encrypt: boolean maxBackups: number + /** 类型特有的扩展配置(如 SAP HANA 的 backupLevel/backupChannels 等) */ + extraConfig?: Record } export interface BackupTaskTogglePayload {