mirror of
https://github.com/Awuqing/BackupX.git
synced 2026-06-05 17:59:34 +08:00
功能: 新增 SAP HANA 完整备份支持与 Backint 协议代理 (#37)
* chore: ignore web/dist directory in git repository * 功能: 新增 SAP HANA 完整备份支持与 Backint 协议代理 - 修复 service 层校验 bug,使 SAP HANA 类型可正常创建 - 增强 hdbsql Runner:支持完整/增量/差异/日志备份、并行通道、失败重试 - 新增 Backint 协议代理(backupx backint 子命令),HANA 原生接口直连 BackupX 存储后端 - 新增本地 SQLite 目录维护 EBID↔对象键映射 - 前端新增 SAP HANA 扩展字段表单(备份类型/级别/通道数/重试次数/实例编号) - README 中英文补充 SAP HANA 两种模式的使用说明
This commit is contained in:
360
server/internal/backint/agent.go
Normal file
360
server/internal/backint/agent.go
Normal file
@@ -0,0 +1,360 @@
|
||||
package backint
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
storageRclone "backupx/server/internal/storage/rclone"
|
||||
)
|
||||
|
||||
// Agent 是 Backint 协议代理主入口。
|
||||
//
|
||||
// 职责:
|
||||
// 1. 根据 -f 指定的功能,从 -i 输入文件解析请求
|
||||
// 2. 把数据路由到 BackupX storage 后端
|
||||
// 3. 把结果写回 -o 输出文件(失败使用 #ERROR,不中断批次)
|
||||
type Agent struct {
|
||||
cfg *Config
|
||||
provider storage.StorageProvider
|
||||
catalog *Catalog
|
||||
}
|
||||
|
||||
// NewAgent 构造 Agent,初始化 storage provider 与 catalog。
|
||||
func NewAgent(ctx context.Context, cfg *Config) (*Agent, error) {
|
||||
registry := buildStorageRegistry()
|
||||
provider, err := registry.Create(ctx, cfg.StorageType, cfg.StorageConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create storage provider: %w", err)
|
||||
}
|
||||
if err := provider.TestConnection(ctx); err != nil {
|
||||
return nil, fmt.Errorf("storage provider connection failed: %w", err)
|
||||
}
|
||||
cat, err := OpenCatalog(cfg.CatalogDB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Agent{cfg: cfg, provider: provider, catalog: cat}, nil
|
||||
}
|
||||
|
||||
// Close 释放资源。
|
||||
func (a *Agent) Close() error {
|
||||
if a.catalog != nil {
|
||||
return a.catalog.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run 执行一次 Backint 调用。
|
||||
//
|
||||
// HANA 针对 BACKUP 调用时:input 是 #PIPE 列表,output 需返回 #SAVED 或 #ERROR。
|
||||
// 批次中任一条目失败不应导致整个进程退出,因此错误被降级为 #ERROR 行。
|
||||
// 仅在极端错误(参数非法、I/O 失败)时返回 error,进程以非 0 退出。
|
||||
func (a *Agent) Run(ctx context.Context, fn Function, inputPath, outputPath string) error {
|
||||
in, err := os.Open(inputPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open input: %w", err)
|
||||
}
|
||||
defer in.Close()
|
||||
|
||||
out, err := os.Create(outputPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create output: %w", err)
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
switch fn {
|
||||
case FunctionBackup:
|
||||
return a.runBackup(ctx, in, out)
|
||||
case FunctionRestore:
|
||||
return a.runRestore(ctx, in, out)
|
||||
case FunctionInquire:
|
||||
return a.runInquire(ctx, in, out)
|
||||
case FunctionDelete:
|
||||
return a.runDelete(ctx, in, out)
|
||||
default:
|
||||
return fmt.Errorf("unsupported function: %s", fn)
|
||||
}
|
||||
}
|
||||
|
||||
// runBackup 处理 BACKUP 操作:读取每条请求的管道/文件,上传到存储后端。
|
||||
func (a *Agent) runBackup(ctx context.Context, in io.Reader, out io.Writer) error {
|
||||
reqs, err := ParseBackupRequests(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, req := range reqs {
|
||||
ebid, perr := a.handleBackupOne(ctx, req)
|
||||
if perr != nil {
|
||||
fmt.Fprintf(os.Stderr, "backint: backup %q failed: %v\n", req.Path, perr)
|
||||
_ = WriteError(out, req.Path)
|
||||
continue
|
||||
}
|
||||
_ = WriteSaved(out, ebid, req.Path)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleBackupOne 上传一条请求,返回分配的 EBID。
|
||||
func (a *Agent) handleBackupOne(ctx context.Context, req BackupRequest) (string, error) {
|
||||
src, size, err := openBackupSource(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer src.Close()
|
||||
|
||||
ebid := generateEBID()
|
||||
objectKey := a.objectKeyFor(ebid)
|
||||
|
||||
reader := io.Reader(src)
|
||||
// 可选 gzip 压缩
|
||||
if a.cfg.Compress {
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
gw := gzip.NewWriter(pw)
|
||||
if _, cerr := io.Copy(gw, src); cerr != nil {
|
||||
_ = gw.Close()
|
||||
_ = pw.CloseWithError(cerr)
|
||||
return
|
||||
}
|
||||
if cerr := gw.Close(); cerr != nil {
|
||||
_ = pw.CloseWithError(cerr)
|
||||
return
|
||||
}
|
||||
_ = pw.Close()
|
||||
}()
|
||||
reader = pr
|
||||
size = -1 // 压缩后大小未知
|
||||
objectKey += ".gz"
|
||||
}
|
||||
|
||||
meta := map[string]string{
|
||||
"source-path": req.Path,
|
||||
"ebid": ebid,
|
||||
"compress": boolStr(a.cfg.Compress),
|
||||
}
|
||||
if err := a.provider.Upload(ctx, objectKey, reader, size, meta); err != nil {
|
||||
return "", fmt.Errorf("upload: %w", err)
|
||||
}
|
||||
|
||||
if err := a.catalog.Put(CatalogEntry{
|
||||
EBID: ebid,
|
||||
ObjectKey: objectKey,
|
||||
SourcePath: req.Path,
|
||||
Size: size,
|
||||
}); err != nil {
|
||||
return "", fmt.Errorf("catalog put: %w", err)
|
||||
}
|
||||
return ebid, nil
|
||||
}
|
||||
|
||||
// runRestore 处理 RESTORE 操作:根据 EBID 从存储下载,写入 HANA 指定的管道/文件。
|
||||
func (a *Agent) runRestore(ctx context.Context, in io.Reader, out io.Writer) error {
|
||||
reqs, err := ParseRestoreRequests(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, req := range reqs {
|
||||
if perr := a.handleRestoreOne(ctx, req); perr != nil {
|
||||
fmt.Fprintf(os.Stderr, "backint: restore %q failed: %v\n", req.EBID, perr)
|
||||
_ = WriteError(out, req.Path)
|
||||
continue
|
||||
}
|
||||
_ = WriteRestored(out, req.EBID, req.Path)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) handleRestoreOne(ctx context.Context, req RestoreRequest) error {
|
||||
entry, err := a.catalog.Get(req.EBID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("catalog get: %w", err)
|
||||
}
|
||||
if entry == nil {
|
||||
return fmt.Errorf("ebid not found: %s", req.EBID)
|
||||
}
|
||||
rc, err := a.provider.Download(ctx, entry.ObjectKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("download: %w", err)
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
var src io.Reader = rc
|
||||
if strings.HasSuffix(entry.ObjectKey, ".gz") {
|
||||
gr, err := gzip.NewReader(rc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("gzip reader: %w", err)
|
||||
}
|
||||
defer gr.Close()
|
||||
src = gr
|
||||
}
|
||||
|
||||
dst, err := openRestoreTarget(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dst.Close()
|
||||
|
||||
if _, err := io.Copy(dst, src); err != nil {
|
||||
return fmt.Errorf("copy to target: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// runInquire 处理 INQUIRE 操作:查询 EBID 是否存在,或列出全部备份。
|
||||
func (a *Agent) runInquire(ctx context.Context, in io.Reader, out io.Writer) error {
|
||||
reqs, err := ParseInquireRequests(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, req := range reqs {
|
||||
if req.All {
|
||||
entries, err := a.catalog.List()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "backint: inquire list failed: %v\n", err)
|
||||
_ = WriteError(out, "#NULL")
|
||||
continue
|
||||
}
|
||||
for _, e := range entries {
|
||||
_ = WriteBackup(out, e.EBID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
entry, err := a.catalog.Get(req.EBID)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "backint: inquire %q failed: %v\n", req.EBID, err)
|
||||
_ = WriteError(out, req.EBID)
|
||||
continue
|
||||
}
|
||||
if entry == nil {
|
||||
_ = WriteNotFound(out, req.EBID)
|
||||
continue
|
||||
}
|
||||
_ = WriteBackup(out, entry.EBID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// runDelete 处理 DELETE 操作:从存储删除对象并移除目录条目。
|
||||
func (a *Agent) runDelete(ctx context.Context, in io.Reader, out io.Writer) error {
|
||||
reqs, err := ParseDeleteRequests(in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, req := range reqs {
|
||||
if perr := a.handleDeleteOne(ctx, req); perr != nil {
|
||||
fmt.Fprintf(os.Stderr, "backint: delete %q failed: %v\n", req.EBID, perr)
|
||||
_ = WriteError(out, req.EBID)
|
||||
continue
|
||||
}
|
||||
_ = WriteDeleted(out, req.EBID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) handleDeleteOne(ctx context.Context, req DeleteRequest) error {
|
||||
entry, err := a.catalog.Get(req.EBID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("catalog get: %w", err)
|
||||
}
|
||||
if entry == nil {
|
||||
return fmt.Errorf("ebid not found: %s", req.EBID)
|
||||
}
|
||||
if err := a.provider.Delete(ctx, entry.ObjectKey); err != nil {
|
||||
// 允许后端返回"不存在"类错误后继续删除目录条目,避免孤立条目
|
||||
fmt.Fprintf(os.Stderr, "backint: storage delete warning for %s: %v\n", entry.ObjectKey, err)
|
||||
}
|
||||
return a.catalog.Delete(req.EBID)
|
||||
}
|
||||
|
||||
// 辅助函数
|
||||
|
||||
func (a *Agent) objectKeyFor(ebid string) string {
|
||||
base := ebid + ".bin"
|
||||
if a.cfg.KeyPrefix == "" {
|
||||
return base
|
||||
}
|
||||
return path.Join(a.cfg.KeyPrefix, base)
|
||||
}
|
||||
|
||||
// openBackupSource 打开 HANA 提供的数据源。
|
||||
//
|
||||
// 对于 #PIPE 模式:HANA 写入命名管道,Agent 读取。管道是顺序流,size 未知 (-1)。
|
||||
// 对于文件模式:HANA 已在指定路径写好完整文件。
|
||||
func openBackupSource(req BackupRequest) (io.ReadCloser, int64, error) {
|
||||
if req.IsPipe {
|
||||
f, err := os.OpenFile(req.Path, os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("open pipe: %w", err)
|
||||
}
|
||||
return f, -1, nil
|
||||
}
|
||||
f, err := os.Open(req.Path)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("open file: %w", err)
|
||||
}
|
||||
info, err := f.Stat()
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
return nil, 0, fmt.Errorf("stat: %w", err)
|
||||
}
|
||||
return f, info.Size(), nil
|
||||
}
|
||||
|
||||
// openRestoreTarget 打开 HANA 指定的恢复目标(管道或文件)。
|
||||
func openRestoreTarget(req RestoreRequest) (io.WriteCloser, error) {
|
||||
if req.IsPipe {
|
||||
return os.OpenFile(req.Path, os.O_WRONLY, 0)
|
||||
}
|
||||
return os.Create(req.Path)
|
||||
}
|
||||
|
||||
// generateEBID 生成 Backint 外部备份 ID。
|
||||
// 格式:backupx-<timestamp>-<16 hex chars>
|
||||
func generateEBID() string {
|
||||
var buf [8]byte
|
||||
if _, err := rand.Read(buf[:]); err != nil {
|
||||
// fallback:用纳秒时间戳作为熵
|
||||
now := time.Now().UnixNano()
|
||||
for i := 0; i < 8; i++ {
|
||||
buf[i] = byte(now >> (i * 8))
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("backupx-%d-%s", time.Now().Unix(), hex.EncodeToString(buf[:]))
|
||||
}
|
||||
|
||||
func boolStr(b bool) string {
|
||||
if b {
|
||||
return "true"
|
||||
}
|
||||
return "false"
|
||||
}
|
||||
|
||||
// buildStorageRegistry 构造与主程序一致的 storage registry。
|
||||
//
|
||||
// Backint Agent 作为独立 CLI 进程运行,不依赖 BackupX HTTP 服务,
|
||||
// 因此这里直接引用 storage/rclone 包注册所有后端。
|
||||
func buildStorageRegistry() *storage.Registry {
|
||||
registry := storage.NewRegistry(
|
||||
storageRclone.NewLocalDiskFactory(),
|
||||
storageRclone.NewS3Factory(),
|
||||
storageRclone.NewWebDAVFactory(),
|
||||
storageRclone.NewGoogleDriveFactory(),
|
||||
storageRclone.NewAliyunOSSFactory(),
|
||||
storageRclone.NewTencentCOSFactory(),
|
||||
storageRclone.NewQiniuKodoFactory(),
|
||||
storageRclone.NewFTPFactory(),
|
||||
storageRclone.NewRcloneFactory(),
|
||||
)
|
||||
storageRclone.RegisterAllBackends(registry)
|
||||
return registry
|
||||
}
|
||||
|
||||
217
server/internal/backint/agent_test.go
Normal file
217
server/internal/backint/agent_test.go
Normal file
@@ -0,0 +1,217 @@
|
||||
package backint
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
storageRclone "backupx/server/internal/storage/rclone"
|
||||
)
|
||||
|
||||
// newTestAgent 构造一个使用本地磁盘后端的 Agent,便于集成测试。
|
||||
func newTestAgent(t *testing.T, compress bool) (*Agent, string) {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
storageDir := filepath.Join(dir, "storage")
|
||||
if err := os.MkdirAll(storageDir, 0755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
registry := storage.NewRegistry(storageRclone.NewLocalDiskFactory())
|
||||
provider, err := registry.Create(context.Background(), "local_disk", map[string]any{
|
||||
"basePath": storageDir,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create provider: %v", err)
|
||||
}
|
||||
cat, err := OpenCatalog(filepath.Join(dir, "catalog.db"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
agent := &Agent{
|
||||
cfg: &Config{StorageType: "local_disk", KeyPrefix: "backint", Compress: compress, CatalogDB: filepath.Join(dir, "catalog.db")},
|
||||
provider: provider,
|
||||
catalog: cat,
|
||||
}
|
||||
t.Cleanup(func() { _ = agent.Close() })
|
||||
return agent, dir
|
||||
}
|
||||
|
||||
func TestAgent_BackupAndRestore_File(t *testing.T) {
|
||||
agent, dir := newTestAgent(t, false)
|
||||
ctx := context.Background()
|
||||
|
||||
// 准备源文件
|
||||
src := filepath.Join(dir, "src.bak")
|
||||
content := []byte("hello backint world")
|
||||
if err := os.WriteFile(src, content, 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// BACKUP
|
||||
inPath := filepath.Join(dir, "backup.in")
|
||||
outPath := filepath.Join(dir, "backup.out")
|
||||
if err := os.WriteFile(inPath, []byte(src+"\n"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := agent.Run(ctx, FunctionBackup, inPath, outPath); err != nil {
|
||||
t.Fatalf("backup: %v", err)
|
||||
}
|
||||
out, _ := os.ReadFile(outPath)
|
||||
if !bytes.HasPrefix(out, []byte("#SAVED ")) {
|
||||
t.Fatalf("expected #SAVED, got: %s", out)
|
||||
}
|
||||
// 提取 EBID:#SAVED <ebid> "<path>"
|
||||
parts := strings.Fields(string(out))
|
||||
if len(parts) < 3 {
|
||||
t.Fatalf("malformed output: %s", out)
|
||||
}
|
||||
ebid := parts[1]
|
||||
|
||||
// RESTORE
|
||||
restoreDst := filepath.Join(dir, "restored.bak")
|
||||
inPath2 := filepath.Join(dir, "restore.in")
|
||||
outPath2 := filepath.Join(dir, "restore.out")
|
||||
if err := os.WriteFile(inPath2, []byte(ebid+" \""+restoreDst+"\"\n"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := agent.Run(ctx, FunctionRestore, inPath2, outPath2); err != nil {
|
||||
t.Fatalf("restore: %v", err)
|
||||
}
|
||||
got, err := os.ReadFile(restoreDst)
|
||||
if err != nil {
|
||||
t.Fatalf("read restored: %v", err)
|
||||
}
|
||||
if !bytes.Equal(got, content) {
|
||||
t.Errorf("restored content mismatch: %q vs %q", got, content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_BackupWithCompression(t *testing.T) {
|
||||
agent, dir := newTestAgent(t, true)
|
||||
ctx := context.Background()
|
||||
|
||||
src := filepath.Join(dir, "src.bak")
|
||||
content := bytes.Repeat([]byte("ABCDEFGH"), 1024)
|
||||
if err := os.WriteFile(src, content, 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
inPath := filepath.Join(dir, "backup.in")
|
||||
outPath := filepath.Join(dir, "backup.out")
|
||||
_ = os.WriteFile(inPath, []byte(src+"\n"), 0644)
|
||||
if err := agent.Run(ctx, FunctionBackup, inPath, outPath); err != nil {
|
||||
t.Fatalf("backup: %v", err)
|
||||
}
|
||||
parts := strings.Fields(string(mustRead(t, outPath)))
|
||||
ebid := parts[1]
|
||||
|
||||
// 验证 catalog 记录的对象键以 .gz 结尾
|
||||
entry, _ := agent.catalog.Get(ebid)
|
||||
if entry == nil || !strings.HasSuffix(entry.ObjectKey, ".gz") {
|
||||
t.Fatalf("expected .gz suffix: %+v", entry)
|
||||
}
|
||||
|
||||
// RESTORE 应能解压回原始内容
|
||||
dst := filepath.Join(dir, "restored.bak")
|
||||
in2 := filepath.Join(dir, "restore.in")
|
||||
out2 := filepath.Join(dir, "restore.out")
|
||||
_ = os.WriteFile(in2, []byte(ebid+" \""+dst+"\"\n"), 0644)
|
||||
if err := agent.Run(ctx, FunctionRestore, in2, out2); err != nil {
|
||||
t.Fatalf("restore: %v", err)
|
||||
}
|
||||
got := mustRead(t, dst)
|
||||
if !bytes.Equal(got, content) {
|
||||
t.Errorf("decompressed content mismatch (len=%d vs %d)", len(got), len(content))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_Inquire(t *testing.T) {
|
||||
agent, dir := newTestAgent(t, false)
|
||||
ctx := context.Background()
|
||||
|
||||
// 注入两条目录记录
|
||||
_ = agent.catalog.Put(CatalogEntry{EBID: "bid-a", ObjectKey: "k/a"})
|
||||
_ = agent.catalog.Put(CatalogEntry{EBID: "bid-b", ObjectKey: "k/b"})
|
||||
|
||||
// INQUIRE #NULL 应列出全部
|
||||
in := filepath.Join(dir, "inq.in")
|
||||
out := filepath.Join(dir, "inq.out")
|
||||
_ = os.WriteFile(in, []byte("#NULL\n"), 0644)
|
||||
if err := agent.Run(ctx, FunctionInquire, in, out); err != nil {
|
||||
t.Fatalf("inquire: %v", err)
|
||||
}
|
||||
text := string(mustRead(t, out))
|
||||
if !strings.Contains(text, "bid-a") || !strings.Contains(text, "bid-b") {
|
||||
t.Errorf("expected both ebids, got: %s", text)
|
||||
}
|
||||
|
||||
// INQUIRE 不存在的 ebid → #NOTFOUND
|
||||
_ = os.WriteFile(in, []byte("bid-missing\n"), 0644)
|
||||
if err := agent.Run(ctx, FunctionInquire, in, out); err != nil {
|
||||
t.Fatalf("inquire missing: %v", err)
|
||||
}
|
||||
text = string(mustRead(t, out))
|
||||
if !strings.Contains(text, "#NOTFOUND") {
|
||||
t.Errorf("expected #NOTFOUND, got: %s", text)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_Delete(t *testing.T) {
|
||||
agent, dir := newTestAgent(t, false)
|
||||
ctx := context.Background()
|
||||
|
||||
// 先做一次 BACKUP
|
||||
src := filepath.Join(dir, "src.bak")
|
||||
_ = os.WriteFile(src, []byte("data"), 0644)
|
||||
inPath := filepath.Join(dir, "b.in")
|
||||
outPath := filepath.Join(dir, "b.out")
|
||||
_ = os.WriteFile(inPath, []byte(src+"\n"), 0644)
|
||||
if err := agent.Run(ctx, FunctionBackup, inPath, outPath); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ebid := strings.Fields(string(mustRead(t, outPath)))[1]
|
||||
|
||||
// DELETE
|
||||
delIn := filepath.Join(dir, "d.in")
|
||||
delOut := filepath.Join(dir, "d.out")
|
||||
_ = os.WriteFile(delIn, []byte(ebid+"\n"), 0644)
|
||||
if err := agent.Run(ctx, FunctionDelete, delIn, delOut); err != nil {
|
||||
t.Fatalf("delete: %v", err)
|
||||
}
|
||||
if !strings.Contains(string(mustRead(t, delOut)), "#DELETED") {
|
||||
t.Errorf("expected #DELETED, got: %s", mustRead(t, delOut))
|
||||
}
|
||||
// catalog 条目应已删除
|
||||
if entry, _ := agent.catalog.Get(ebid); entry != nil {
|
||||
t.Errorf("catalog entry should be removed, got: %+v", entry)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_RestoreUnknownEBID(t *testing.T) {
|
||||
agent, dir := newTestAgent(t, false)
|
||||
ctx := context.Background()
|
||||
|
||||
in := filepath.Join(dir, "r.in")
|
||||
out := filepath.Join(dir, "r.out")
|
||||
_ = os.WriteFile(in, []byte("bid-unknown \""+filepath.Join(dir, "dst")+"\"\n"), 0644)
|
||||
if err := agent.Run(ctx, FunctionRestore, in, out); err != nil {
|
||||
t.Fatalf("run: %v", err)
|
||||
}
|
||||
if !strings.Contains(string(mustRead(t, out)), "#ERROR") {
|
||||
t.Errorf("expected #ERROR for unknown ebid, got: %s", mustRead(t, out))
|
||||
}
|
||||
}
|
||||
|
||||
func mustRead(t *testing.T, path string) []byte {
|
||||
t.Helper()
|
||||
b, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
t.Fatalf("read %s: %v", path, err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
102
server/internal/backint/catalog.go
Normal file
102
server/internal/backint/catalog.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package backint
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/glebarez/sqlite"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
gormlogger "gorm.io/gorm/logger"
|
||||
)
|
||||
|
||||
// CatalogEntry 是 Backint 目录条目,建立 BID (备份 ID) 与对象键的映射。
|
||||
//
|
||||
// BID 是 Backint Agent 返回给 SAP HANA 的唯一标识,HANA 后续用它作为 RESTORE/DELETE
|
||||
// 的句柄。Agent 用 catalog 查询该 BID 对应的实际存储对象键。
|
||||
type CatalogEntry struct {
|
||||
ID uint `gorm:"primaryKey"`
|
||||
EBID string `gorm:"column:ebid;uniqueIndex;size:128;not null"`
|
||||
ObjectKey string `gorm:"column:object_key;size:512;not null"`
|
||||
SourcePath string `gorm:"column:source_path;size:1024"`
|
||||
Size int64 `gorm:"column:size"`
|
||||
CreatedAt time.Time `gorm:"column:created_at"`
|
||||
}
|
||||
|
||||
// TableName 指定表名,避免 GORM 自动复数化。
|
||||
func (CatalogEntry) TableName() string { return "backint_catalog" }
|
||||
|
||||
// Catalog 是本地 Backint 目录(SQLite 后端)。
|
||||
type Catalog struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
// OpenCatalog 打开或创建 catalog 数据库。
|
||||
func OpenCatalog(dbPath string) (*Catalog, error) {
|
||||
db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{
|
||||
Logger: gormlogger.Default.LogMode(gormlogger.Silent),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open catalog: %w", err)
|
||||
}
|
||||
if err := db.AutoMigrate(&CatalogEntry{}); err != nil {
|
||||
return nil, fmt.Errorf("migrate catalog: %w", err)
|
||||
}
|
||||
return &Catalog{db: db}, nil
|
||||
}
|
||||
|
||||
// Close 关闭底层连接。
|
||||
func (c *Catalog) Close() error {
|
||||
if c.db == nil {
|
||||
return nil
|
||||
}
|
||||
sqlDB, err := c.db.DB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sqlDB.Close()
|
||||
}
|
||||
|
||||
// Put 插入或更新一条记录。
|
||||
func (c *Catalog) Put(entry CatalogEntry) error {
|
||||
if entry.EBID == "" {
|
||||
return fmt.Errorf("ebid is required")
|
||||
}
|
||||
if entry.CreatedAt.IsZero() {
|
||||
entry.CreatedAt = time.Now().UTC()
|
||||
}
|
||||
// Upsert:EBID 冲突时更新 object_key/size/source_path
|
||||
return c.db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "ebid"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{
|
||||
"object_key", "source_path", "size", "created_at",
|
||||
}),
|
||||
}).Create(&entry).Error
|
||||
}
|
||||
|
||||
// Get 通过 EBID 查询条目。未找到返回 (nil, nil)。
|
||||
func (c *Catalog) Get(ebid string) (*CatalogEntry, error) {
|
||||
var entry CatalogEntry
|
||||
err := c.db.Where("ebid = ?", ebid).First(&entry).Error
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &entry, nil
|
||||
}
|
||||
|
||||
// Delete 删除一条记录。
|
||||
func (c *Catalog) Delete(ebid string) error {
|
||||
return c.db.Where("ebid = ?", ebid).Delete(&CatalogEntry{}).Error
|
||||
}
|
||||
|
||||
// List 列出全部条目。
|
||||
func (c *Catalog) List() ([]CatalogEntry, error) {
|
||||
var entries []CatalogEntry
|
||||
if err := c.db.Order("created_at DESC").Find(&entries).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
74
server/internal/backint/catalog_test.go
Normal file
74
server/internal/backint/catalog_test.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package backint
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCatalog_CRUD(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
cat, err := OpenCatalog(filepath.Join(dir, "test.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
defer cat.Close()
|
||||
|
||||
if err := cat.Put(CatalogEntry{EBID: "bid-1", ObjectKey: "k/1.bin", SourcePath: "/tmp/a", Size: 100}); err != nil {
|
||||
t.Fatalf("put: %v", err)
|
||||
}
|
||||
if err := cat.Put(CatalogEntry{EBID: "bid-2", ObjectKey: "k/2.bin", Size: 200}); err != nil {
|
||||
t.Fatalf("put: %v", err)
|
||||
}
|
||||
|
||||
got, err := cat.Get("bid-1")
|
||||
if err != nil || got == nil {
|
||||
t.Fatalf("get: %v %v", got, err)
|
||||
}
|
||||
if got.ObjectKey != "k/1.bin" || got.Size != 100 {
|
||||
t.Errorf("mismatch: %+v", got)
|
||||
}
|
||||
|
||||
// 不存在的条目
|
||||
missing, err := cat.Get("bid-999")
|
||||
if err != nil {
|
||||
t.Fatalf("get missing: %v", err)
|
||||
}
|
||||
if missing != nil {
|
||||
t.Errorf("expected nil, got %+v", missing)
|
||||
}
|
||||
|
||||
// List
|
||||
all, err := cat.List()
|
||||
if err != nil || len(all) != 2 {
|
||||
t.Fatalf("list: %v %d", err, len(all))
|
||||
}
|
||||
|
||||
// Delete
|
||||
if err := cat.Delete("bid-1"); err != nil {
|
||||
t.Fatalf("delete: %v", err)
|
||||
}
|
||||
got, _ = cat.Get("bid-1")
|
||||
if got != nil {
|
||||
t.Errorf("bid-1 should be deleted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalog_UpsertSameEBID(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
cat, err := OpenCatalog(filepath.Join(dir, "test.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
defer cat.Close()
|
||||
|
||||
if err := cat.Put(CatalogEntry{EBID: "bid-x", ObjectKey: "v1"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := cat.Put(CatalogEntry{EBID: "bid-x", ObjectKey: "v2"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
got, _ := cat.Get("bid-x")
|
||||
if got == nil || got.ObjectKey != "v2" {
|
||||
t.Errorf("upsert failed: %+v", got)
|
||||
}
|
||||
}
|
||||
140
server/internal/backint/config.go
Normal file
140
server/internal/backint/config.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package backint
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Config 是 Backint Agent 的运行时配置。
|
||||
//
|
||||
// SAP HANA 通过 -p <paramfile> 传入一个参数文件。BackupX Backint Agent 复用 SAP
|
||||
// 的"#KEY = VALUE"风格(兼容原生 backint 参数文件习惯),不支持 section。
|
||||
//
|
||||
// 必填字段:
|
||||
// - STORAGE_TYPE:存储类型(s3/webdav/local_disk/...,与 BackupX storage registry 一致)
|
||||
// - STORAGE_CONFIG_JSON:存储配置 JSON 文件路径(或直接 STORAGE_CONFIG = <json>)
|
||||
//
|
||||
// 可选字段:
|
||||
// - PARALLEL_FACTOR:并行度(默认 1)
|
||||
// - COMPRESS:是否 gzip 压缩(true/false,默认 false)
|
||||
// - LOG_FILE:日志文件路径(默认 stderr)
|
||||
// - CATALOG_DB:本地目录数据库路径(默认 ./backint_catalog.db)
|
||||
// - KEY_PREFIX:对象键前缀(默认空,最终对象键 = <prefix>/<ebid>)
|
||||
type Config struct {
|
||||
StorageType string
|
||||
StorageConfigJSON string // 存储配置 JSON 文件路径
|
||||
StorageConfigRaw []byte // 也支持直接内联(STORAGE_CONFIG)
|
||||
StorageConfig map[string]any // 解析后的存储配置
|
||||
ParallelFactor int
|
||||
Compress bool
|
||||
LogFile string
|
||||
CatalogDB string
|
||||
KeyPrefix string
|
||||
}
|
||||
|
||||
// LoadConfigFile 从文件加载配置。
|
||||
func LoadConfigFile(path string) (*Config, error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open backint config: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
return ParseConfig(f)
|
||||
}
|
||||
|
||||
// ParseConfig 从 reader 解析配置。
|
||||
func ParseConfig(r io.Reader) (*Config, error) {
|
||||
cfg := &Config{ParallelFactor: 1}
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 64*1024), 4*1024*1024)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" || strings.HasPrefix(line, ";") {
|
||||
continue
|
||||
}
|
||||
// 兼容可选的 "#" 前缀(SAP 约定)
|
||||
line = strings.TrimPrefix(line, "#")
|
||||
eq := strings.Index(line, "=")
|
||||
if eq < 0 {
|
||||
continue
|
||||
}
|
||||
key := strings.TrimSpace(line[:eq])
|
||||
value := strings.TrimSpace(line[eq+1:])
|
||||
if len(value) >= 2 && value[0] == '"' && value[len(value)-1] == '"' {
|
||||
value = value[1 : len(value)-1]
|
||||
}
|
||||
switch strings.ToUpper(key) {
|
||||
case "STORAGE_TYPE":
|
||||
cfg.StorageType = value
|
||||
case "STORAGE_CONFIG_JSON":
|
||||
cfg.StorageConfigJSON = value
|
||||
case "STORAGE_CONFIG":
|
||||
cfg.StorageConfigRaw = []byte(value)
|
||||
case "PARALLEL_FACTOR":
|
||||
n, err := strconv.Atoi(value)
|
||||
if err != nil || n <= 0 {
|
||||
return nil, fmt.Errorf("invalid PARALLEL_FACTOR: %q", value)
|
||||
}
|
||||
cfg.ParallelFactor = n
|
||||
case "COMPRESS":
|
||||
cfg.Compress = parseBool(value)
|
||||
case "LOG_FILE":
|
||||
cfg.LogFile = value
|
||||
case "CATALOG_DB":
|
||||
cfg.CatalogDB = value
|
||||
case "KEY_PREFIX":
|
||||
cfg.KeyPrefix = strings.Trim(value, "/")
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := cfg.finalize(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (c *Config) finalize() error {
|
||||
if c.StorageType == "" {
|
||||
return errors.New("STORAGE_TYPE is required")
|
||||
}
|
||||
if c.CatalogDB == "" {
|
||||
c.CatalogDB = "./backint_catalog.db"
|
||||
}
|
||||
// 加载存储配置 JSON
|
||||
var raw []byte
|
||||
switch {
|
||||
case c.StorageConfigJSON != "":
|
||||
data, err := os.ReadFile(c.StorageConfigJSON)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read STORAGE_CONFIG_JSON: %w", err)
|
||||
}
|
||||
raw = data
|
||||
case len(c.StorageConfigRaw) > 0:
|
||||
raw = c.StorageConfigRaw
|
||||
default:
|
||||
return errors.New("STORAGE_CONFIG_JSON or STORAGE_CONFIG is required")
|
||||
}
|
||||
var m map[string]any
|
||||
if err := json.Unmarshal(raw, &m); err != nil {
|
||||
return fmt.Errorf("parse storage config JSON: %w", err)
|
||||
}
|
||||
c.StorageConfig = m
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseBool(v string) bool {
|
||||
switch strings.ToLower(strings.TrimSpace(v)) {
|
||||
case "1", "true", "yes", "on":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
74
server/internal/backint/config_test.go
Normal file
74
server/internal/backint/config_test.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package backint
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseConfig(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
storagePath := filepath.Join(dir, "storage.json")
|
||||
if err := os.WriteFile(storagePath, []byte(`{"basePath":"/tmp/backup"}`), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
input := `
|
||||
; 注释
|
||||
#STORAGE_TYPE = local_disk
|
||||
#STORAGE_CONFIG_JSON = ` + storagePath + `
|
||||
#PARALLEL_FACTOR = 4
|
||||
#COMPRESS = true
|
||||
#KEY_PREFIX = /hana/backups/
|
||||
#CATALOG_DB = ` + filepath.Join(dir, "catalog.db") + `
|
||||
`
|
||||
cfg, err := ParseConfig(strings.NewReader(input))
|
||||
if err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if cfg.StorageType != "local_disk" {
|
||||
t.Errorf("StorageType: %q", cfg.StorageType)
|
||||
}
|
||||
if cfg.ParallelFactor != 4 {
|
||||
t.Errorf("ParallelFactor: %d", cfg.ParallelFactor)
|
||||
}
|
||||
if !cfg.Compress {
|
||||
t.Errorf("Compress should be true")
|
||||
}
|
||||
if cfg.KeyPrefix != "hana/backups" {
|
||||
t.Errorf("KeyPrefix should be trimmed: %q", cfg.KeyPrefix)
|
||||
}
|
||||
if cfg.StorageConfig["basePath"] != "/tmp/backup" {
|
||||
t.Errorf("StorageConfig mismatch: %+v", cfg.StorageConfig)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConfig_MissingStorageType(t *testing.T) {
|
||||
input := `PARALLEL_FACTOR = 1`
|
||||
if _, err := ParseConfig(strings.NewReader(input)); err == nil {
|
||||
t.Fatal("expected error for missing STORAGE_TYPE")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConfig_InlineStorageConfig(t *testing.T) {
|
||||
input := `STORAGE_TYPE = local_disk
|
||||
STORAGE_CONFIG = {"basePath":"/x"}
|
||||
`
|
||||
cfg, err := ParseConfig(strings.NewReader(input))
|
||||
if err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if cfg.StorageConfig["basePath"] != "/x" {
|
||||
t.Errorf("inline config not parsed: %+v", cfg.StorageConfig)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseConfig_InvalidParallel(t *testing.T) {
|
||||
input := `STORAGE_TYPE = local_disk
|
||||
STORAGE_CONFIG = {}
|
||||
PARALLEL_FACTOR = oops
|
||||
`
|
||||
if _, err := ParseConfig(strings.NewReader(input)); err == nil {
|
||||
t.Fatal("expected error for invalid PARALLEL_FACTOR")
|
||||
}
|
||||
}
|
||||
267
server/internal/backint/protocol.go
Normal file
267
server/internal/backint/protocol.go
Normal file
@@ -0,0 +1,267 @@
|
||||
// Package backint 实现 SAP HANA Backint 协议代理。
|
||||
//
|
||||
// Backint 协议是 SAP HANA 与第三方备份工具之间的管道/文件协议。
|
||||
// SAP HANA 通过 CLI 调用 Backint Agent,传入参数文件、输入文件、输出文件,
|
||||
// Agent 根据输入文件中的 #PIPE / #EBID / #NULL 指令读取/写入数据,
|
||||
// 并在输出文件中返回 #SAVED / #RESTORED / #BACKUP / #NOTFOUND / #DELETED / #ERROR。
|
||||
//
|
||||
// 支持的功能:BACKUP / RESTORE / INQUIRE / DELETE
|
||||
// 参考规范:SAP HANA Backint Interface for Backup Tools (OSS 1642148)
|
||||
package backint
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Function 代表 Backint 操作类型,对应 CLI 的 -f 参数。
|
||||
type Function string
|
||||
|
||||
const (
|
||||
FunctionBackup Function = "backup"
|
||||
FunctionRestore Function = "restore"
|
||||
FunctionInquire Function = "inquire"
|
||||
FunctionDelete Function = "delete"
|
||||
)
|
||||
|
||||
// BackupRequest 是 BACKUP 操作的单条请求。
|
||||
//
|
||||
// 两种形态:
|
||||
// - Pipe: #PIPE <path> (HANA 通过命名管道传输数据)
|
||||
// - File: "<path>" (HANA 指向一个已完成的临时文件)
|
||||
type BackupRequest struct {
|
||||
IsPipe bool
|
||||
Path string
|
||||
}
|
||||
|
||||
// RestoreRequest 是 RESTORE 操作的单条请求。
|
||||
//
|
||||
// 形态:#PIPE <ebid> "<path>" 或 <ebid> "<path>"
|
||||
type RestoreRequest struct {
|
||||
IsPipe bool
|
||||
EBID string // 之前 BACKUP 返回的备份 ID
|
||||
Path string
|
||||
}
|
||||
|
||||
// InquireRequest 是 INQUIRE 操作的单条请求。
|
||||
//
|
||||
// 形态:
|
||||
// - #NULL (列出所有备份)
|
||||
// - "<ebid>" (查询指定 ID 是否存在)
|
||||
// - #EBID "<ebid>" (带前缀的变体)
|
||||
type InquireRequest struct {
|
||||
All bool
|
||||
EBID string
|
||||
}
|
||||
|
||||
// DeleteRequest 是 DELETE 操作的单条请求。
|
||||
//
|
||||
// 形态:<ebid> 或 #EBID <ebid>
|
||||
type DeleteRequest struct {
|
||||
EBID string
|
||||
}
|
||||
|
||||
// ParseBackupRequests 解析 BACKUP 输入文件。
|
||||
func ParseBackupRequests(r io.Reader) ([]BackupRequest, error) {
|
||||
var items []BackupRequest
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 64*1024), 4*1024*1024)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(line, "#PIPE") {
|
||||
path := strings.TrimSpace(strings.TrimPrefix(line, "#PIPE"))
|
||||
if path == "" {
|
||||
return nil, fmt.Errorf("invalid #PIPE line: %q", line)
|
||||
}
|
||||
items = append(items, BackupRequest{IsPipe: true, Path: trimQuotes(path)})
|
||||
continue
|
||||
}
|
||||
items = append(items, BackupRequest{IsPipe: false, Path: trimQuotes(line)})
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// ParseRestoreRequests 解析 RESTORE 输入文件。
|
||||
func ParseRestoreRequests(r io.Reader) ([]RestoreRequest, error) {
|
||||
var items []RestoreRequest
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 64*1024), 4*1024*1024)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
isPipe := false
|
||||
if strings.HasPrefix(line, "#PIPE") {
|
||||
isPipe = true
|
||||
line = strings.TrimSpace(strings.TrimPrefix(line, "#PIPE"))
|
||||
}
|
||||
if strings.HasPrefix(line, "#EBID") {
|
||||
line = strings.TrimSpace(strings.TrimPrefix(line, "#EBID"))
|
||||
}
|
||||
ebid, rest := splitFirstField(line)
|
||||
if ebid == "" || rest == "" {
|
||||
return nil, fmt.Errorf("invalid restore line: %q", line)
|
||||
}
|
||||
items = append(items, RestoreRequest{
|
||||
IsPipe: isPipe,
|
||||
EBID: trimQuotes(ebid),
|
||||
Path: trimQuotes(rest),
|
||||
})
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// ParseInquireRequests 解析 INQUIRE 输入文件。
|
||||
func ParseInquireRequests(r io.Reader) ([]InquireRequest, error) {
|
||||
var items []InquireRequest
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 64*1024), 4*1024*1024)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
if line == "#NULL" {
|
||||
items = append(items, InquireRequest{All: true})
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(line, "#EBID") {
|
||||
line = strings.TrimSpace(strings.TrimPrefix(line, "#EBID"))
|
||||
}
|
||||
items = append(items, InquireRequest{EBID: trimQuotes(line)})
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// ParseDeleteRequests 解析 DELETE 输入文件。
|
||||
func ParseDeleteRequests(r io.Reader) ([]DeleteRequest, error) {
|
||||
var items []DeleteRequest
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 64*1024), 4*1024*1024)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(line, "#EBID") {
|
||||
line = strings.TrimSpace(strings.TrimPrefix(line, "#EBID"))
|
||||
}
|
||||
ebid := trimQuotes(strings.TrimSpace(line))
|
||||
if ebid == "" {
|
||||
return nil, fmt.Errorf("invalid delete line: %q", line)
|
||||
}
|
||||
items = append(items, DeleteRequest{EBID: ebid})
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// 输出写入辅助
|
||||
|
||||
// WriteSaved 写入一条 BACKUP 成功响应:#SAVED <ebid> "<path>"
|
||||
func WriteSaved(w io.Writer, ebid, path string) error {
|
||||
_, err := fmt.Fprintf(w, "#SAVED %s %s\n", ebid, quote(path))
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteRestored 写入一条 RESTORE 成功响应:#RESTORED "<ebid>" "<path>"
|
||||
func WriteRestored(w io.Writer, ebid, path string) error {
|
||||
_, err := fmt.Fprintf(w, "#RESTORED %s %s\n", quote(ebid), quote(path))
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteBackup 写入一条 INQUIRE 命中响应:#BACKUP "<ebid>"
|
||||
func WriteBackup(w io.Writer, ebid string) error {
|
||||
_, err := fmt.Fprintf(w, "#BACKUP %s\n", quote(ebid))
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteNotFound 写入一条 INQUIRE/RESTORE 未命中响应:#NOTFOUND "<path-or-ebid>"
|
||||
func WriteNotFound(w io.Writer, identifier string) error {
|
||||
_, err := fmt.Fprintf(w, "#NOTFOUND %s\n", quote(identifier))
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteDeleted 写入一条 DELETE 成功响应:#DELETED "<ebid>"
|
||||
func WriteDeleted(w io.Writer, ebid string) error {
|
||||
_, err := fmt.Fprintf(w, "#DELETED %s\n", quote(ebid))
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteError 写入一条错误响应:#ERROR "<path-or-ebid>"
|
||||
//
|
||||
// SAP HANA 会将 #ERROR 视为本条请求失败,但不会终止整个批次。
|
||||
// 在 stderr 输出错误详情便于排查。
|
||||
func WriteError(w io.Writer, identifier string) error {
|
||||
_, err := fmt.Fprintf(w, "#ERROR %s\n", quote(identifier))
|
||||
return err
|
||||
}
|
||||
|
||||
// 内部工具函数
|
||||
|
||||
func trimQuotes(s string) string {
|
||||
s = strings.TrimSpace(s)
|
||||
if len(s) >= 2 && s[0] == '"' && s[len(s)-1] == '"' {
|
||||
return s[1 : len(s)-1]
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func quote(s string) string {
|
||||
return `"` + strings.ReplaceAll(s, `"`, `\"`) + `"`
|
||||
}
|
||||
|
||||
// splitFirstField 把一行拆分为 "第一个字段" 和 "剩余部分"。
|
||||
// 支持带引号的字段:`"abc def" "path"` → `abc def` / `"path"`。
|
||||
func splitFirstField(line string) (first, rest string) {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
return "", ""
|
||||
}
|
||||
if line[0] == '"' {
|
||||
idx := strings.Index(line[1:], `"`)
|
||||
if idx < 0 {
|
||||
return line, ""
|
||||
}
|
||||
return line[1 : idx+1], strings.TrimSpace(line[idx+2:])
|
||||
}
|
||||
idx := strings.IndexAny(line, " \t")
|
||||
if idx < 0 {
|
||||
return line, ""
|
||||
}
|
||||
return line[:idx], strings.TrimSpace(line[idx+1:])
|
||||
}
|
||||
|
||||
// ParseFunction 将 CLI 的 -f 参数字符串规范化为 Function。
|
||||
func ParseFunction(s string) (Function, error) {
|
||||
switch strings.ToLower(strings.TrimSpace(s)) {
|
||||
case "backup":
|
||||
return FunctionBackup, nil
|
||||
case "restore":
|
||||
return FunctionRestore, nil
|
||||
case "inquire":
|
||||
return FunctionInquire, nil
|
||||
case "delete":
|
||||
return FunctionDelete, nil
|
||||
default:
|
||||
return "", errors.New("unsupported backint function: " + s)
|
||||
}
|
||||
}
|
||||
142
server/internal/backint/protocol_test.go
Normal file
142
server/internal/backint/protocol_test.go
Normal file
@@ -0,0 +1,142 @@
|
||||
package backint
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseBackupRequests(t *testing.T) {
|
||||
input := `#PIPE /tmp/pipe1
|
||||
#PIPE "/tmp/pipe two"
|
||||
/tmp/file.bak
|
||||
"/tmp/file two.bak"
|
||||
`
|
||||
reqs, err := ParseBackupRequests(strings.NewReader(input))
|
||||
if err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if len(reqs) != 4 {
|
||||
t.Fatalf("expected 4 requests, got %d", len(reqs))
|
||||
}
|
||||
if !reqs[0].IsPipe || reqs[0].Path != "/tmp/pipe1" {
|
||||
t.Errorf("req[0] mismatch: %+v", reqs[0])
|
||||
}
|
||||
if !reqs[1].IsPipe || reqs[1].Path != "/tmp/pipe two" {
|
||||
t.Errorf("req[1] mismatch: %+v", reqs[1])
|
||||
}
|
||||
if reqs[2].IsPipe || reqs[2].Path != "/tmp/file.bak" {
|
||||
t.Errorf("req[2] mismatch: %+v", reqs[2])
|
||||
}
|
||||
if reqs[3].Path != "/tmp/file two.bak" {
|
||||
t.Errorf("req[3] mismatch: %+v", reqs[3])
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseRestoreRequests(t *testing.T) {
|
||||
input := `#PIPE backupx-123 "/tmp/pipe1"
|
||||
#EBID "backupx-456" "/tmp/file.bak"
|
||||
backupx-789 /tmp/plain.bak
|
||||
`
|
||||
reqs, err := ParseRestoreRequests(strings.NewReader(input))
|
||||
if err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if len(reqs) != 3 {
|
||||
t.Fatalf("expected 3, got %d", len(reqs))
|
||||
}
|
||||
if !reqs[0].IsPipe || reqs[0].EBID != "backupx-123" || reqs[0].Path != "/tmp/pipe1" {
|
||||
t.Errorf("req[0] mismatch: %+v", reqs[0])
|
||||
}
|
||||
if reqs[1].IsPipe || reqs[1].EBID != "backupx-456" {
|
||||
t.Errorf("req[1] mismatch: %+v", reqs[1])
|
||||
}
|
||||
if reqs[2].EBID != "backupx-789" || reqs[2].Path != "/tmp/plain.bak" {
|
||||
t.Errorf("req[2] mismatch: %+v", reqs[2])
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseInquireRequests(t *testing.T) {
|
||||
input := "#NULL\nbackupx-abc\n#EBID \"backupx-xyz\"\n"
|
||||
reqs, err := ParseInquireRequests(strings.NewReader(input))
|
||||
if err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if len(reqs) != 3 {
|
||||
t.Fatalf("expected 3, got %d", len(reqs))
|
||||
}
|
||||
if !reqs[0].All {
|
||||
t.Errorf("req[0] should be All")
|
||||
}
|
||||
if reqs[1].EBID != "backupx-abc" {
|
||||
t.Errorf("req[1] mismatch: %+v", reqs[1])
|
||||
}
|
||||
if reqs[2].EBID != "backupx-xyz" {
|
||||
t.Errorf("req[2] mismatch: %+v", reqs[2])
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDeleteRequests(t *testing.T) {
|
||||
input := "backupx-aaa\n#EBID \"backupx-bbb\"\n"
|
||||
reqs, err := ParseDeleteRequests(strings.NewReader(input))
|
||||
if err != nil {
|
||||
t.Fatalf("parse: %v", err)
|
||||
}
|
||||
if len(reqs) != 2 || reqs[0].EBID != "backupx-aaa" || reqs[1].EBID != "backupx-bbb" {
|
||||
t.Fatalf("unexpected: %+v", reqs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteResponses(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
_ = WriteSaved(&buf, "backupx-1", "/tmp/x")
|
||||
_ = WriteRestored(&buf, "backupx-2", "/tmp/y")
|
||||
_ = WriteBackup(&buf, "backupx-3")
|
||||
_ = WriteNotFound(&buf, "backupx-4")
|
||||
_ = WriteDeleted(&buf, "backupx-5")
|
||||
_ = WriteError(&buf, "/tmp/z")
|
||||
want := "#SAVED backupx-1 \"/tmp/x\"\n" +
|
||||
"#RESTORED \"backupx-2\" \"/tmp/y\"\n" +
|
||||
"#BACKUP \"backupx-3\"\n" +
|
||||
"#NOTFOUND \"backupx-4\"\n" +
|
||||
"#DELETED \"backupx-5\"\n" +
|
||||
"#ERROR \"/tmp/z\"\n"
|
||||
if buf.String() != want {
|
||||
t.Errorf("output mismatch:\n got: %q\nwant: %q", buf.String(), want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseFunction(t *testing.T) {
|
||||
cases := map[string]Function{
|
||||
"backup": FunctionBackup,
|
||||
"BACKUP": FunctionBackup,
|
||||
"restore": FunctionRestore,
|
||||
"inquire": FunctionInquire,
|
||||
"delete": FunctionDelete,
|
||||
}
|
||||
for s, want := range cases {
|
||||
got, err := ParseFunction(s)
|
||||
if err != nil || got != want {
|
||||
t.Errorf("ParseFunction(%q) = %v, %v; want %v", s, got, err, want)
|
||||
}
|
||||
}
|
||||
if _, err := ParseFunction("bogus"); err == nil {
|
||||
t.Errorf("expected error for bogus function")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitFirstField(t *testing.T) {
|
||||
cases := []struct{ in, first, rest string }{
|
||||
{`abc def`, "abc", "def"},
|
||||
{`"abc def" ghi`, "abc def", "ghi"},
|
||||
{`"a b" "c d"`, "a b", `"c d"`},
|
||||
{`lone`, "lone", ""},
|
||||
{``, "", ""},
|
||||
}
|
||||
for _, c := range cases {
|
||||
f, r := splitFirstField(c.in)
|
||||
if f != c.first || r != c.rest {
|
||||
t.Errorf("splitFirstField(%q) = (%q, %q); want (%q, %q)", c.in, f, r, c.first, c.rest)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user