feat(backup): 新增 zstd 压缩选项 (#89)

备份压缩在 gzip 之外新增 zstd(更高压缩率、更快解压)。pkg/compress 新增 ZstdFile/UnzstdFile,Master 与 Agent 压缩/解压按后缀分流,任务校验与前端下拉同步;往返单测覆盖。
This commit is contained in:
Wu Qing
2026-05-27 19:15:06 +08:00
committed by GitHub
parent 90b58d58d6
commit 65cf3a04d4
9 changed files with 153 additions and 4 deletions

View File

@@ -6,6 +6,7 @@ require (
github.com/gin-gonic/gin v1.10.1
github.com/glebarez/sqlite v1.11.0
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/klauspost/compress v1.18.1
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pquerna/otp v1.5.0
github.com/prometheus/client_golang v1.23.2
@@ -149,7 +150,6 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolio/noiseconn v0.0.0-20231127013910-f6d9ecbf1de7 // indirect
github.com/jzelinskie/whirlpool v0.0.0-20201016144138-0675e54bb004 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/koofr/go-httpclient v0.0.0-20240520111329-e20f8f203988 // indirect
github.com/koofr/go-koofrclient v0.0.0-20221207135200-cbd7fc9ad6a6 // indirect

View File

@@ -107,6 +107,14 @@ func (e *Executor) ExecuteRunTask(ctx context.Context, taskID, recordID uint) er
return compressErr
}
finalPath = compressedPath
} else if strings.EqualFold(spec.Compression, "zstd") && !strings.HasSuffix(strings.ToLower(finalPath), ".zst") {
e.appendLog(ctx, recordID, "[agent] 开始压缩备份文件zstd\n")
compressedPath, compressErr := compress.ZstdFile(finalPath)
if compressErr != nil {
e.reportRecordFailure(ctx, recordID, fmt.Sprintf("压缩失败: %v", compressErr))
return compressErr
}
finalPath = compressedPath
}
info, err := os.Stat(finalPath)
if err != nil {
@@ -414,6 +422,15 @@ func (e *Executor) ExecuteRestore(ctx context.Context, restoreRecordID uint) err
}
preparedPath = decompressed
}
if strings.HasSuffix(strings.ToLower(preparedPath), ".zst") {
e.appendRestoreLog(ctx, restoreRecordID, "[agent] 解压 zstd 压缩\n")
decompressed, err := compress.UnzstdFile(preparedPath)
if err != nil {
e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("解压失败: %v", err))
return err
}
preparedPath = decompressed
}
// 4) 运行 runner.Restore
taskSpec := buildRestoreBackupTaskSpec(spec, time.Now().UTC(), tmpDir)

View File

@@ -726,6 +726,15 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
return
}
finalPath = compressedPath
} else if strings.EqualFold(task.Compression, "zstd") && !strings.HasSuffix(strings.ToLower(finalPath), ".zst") {
logger.Infof("开始压缩备份文件zstd")
compressedPath, compressErr := compress.ZstdFile(finalPath)
if compressErr != nil {
errMessage = compressErr.Error()
logger.Errorf("压缩备份文件失败:%v", compressErr)
return
}
finalPath = compressedPath
}
if task.Encrypt {
logger.Infof("开始加密备份文件")

View File

@@ -40,7 +40,7 @@ type BackupTaskUpsertInput struct {
NodePoolTag string `json:"nodePoolTag" binding:"max=64"`
Tags string `json:"tags" binding:"max=500"` // 逗号分隔标签
RetentionDays int `json:"retentionDays"`
Compression string `json:"compression" binding:"omitempty,oneof=gzip none"`
Compression string `json:"compression" binding:"omitempty,oneof=gzip zstd none"`
Encrypt bool `json:"encrypt"`
MaxBackups int `json:"maxBackups"`
// ExtraConfig 类型特有扩展配置(如 SAP HANA 的 backupLevel/backupChannels

View File

@@ -106,6 +106,16 @@ func prepareBackupArtifact(cipher *codec.ConfigCipher, artifactPath string, logg
}
current = decompressed
}
if strings.HasSuffix(strings.ToLower(current), ".zst") {
if logger != nil {
logger.Infof("检测到 zstd 压缩,开始解压")
}
decompressed, err := compress.UnzstdFile(current)
if err != nil {
return "", err
}
current = decompressed
}
return current, nil
}

View File

@@ -0,0 +1,65 @@
package compress
import (
"fmt"
"io"
"os"
"strings"
"github.com/klauspost/compress/zstd"
)
// ZstdFile 将文件压缩为 .zstzstd返回压缩产物路径。
// 相比 gzipzstd 在相近 CPU 开销下提供更高压缩率与显著更快的解压速度。
func ZstdFile(sourcePath string) (string, error) {
source, err := os.Open(sourcePath)
if err != nil {
return "", fmt.Errorf("open source file: %w", err)
}
defer source.Close()
targetPath := sourcePath + ".zst"
target, err := os.Create(targetPath)
if err != nil {
return "", fmt.Errorf("create zstd file: %w", err)
}
defer target.Close()
writer, err := zstd.NewWriter(target)
if err != nil {
return "", fmt.Errorf("create zstd writer: %w", err)
}
if _, err := io.Copy(writer, source); err != nil {
_ = writer.Close()
return "", fmt.Errorf("zstd source file: %w", err)
}
if err := writer.Close(); err != nil {
return "", fmt.Errorf("close zstd writer: %w", err)
}
return targetPath, nil
}
// UnzstdFile 解压 .zst 文件,返回解压产物路径。
func UnzstdFile(sourcePath string) (string, error) {
source, err := os.Open(sourcePath)
if err != nil {
return "", fmt.Errorf("open zstd file: %w", err)
}
defer source.Close()
reader, err := zstd.NewReader(source)
if err != nil {
return "", fmt.Errorf("create zstd reader: %w", err)
}
defer reader.Close()
targetPath := strings.TrimSuffix(sourcePath, ".zst")
if targetPath == sourcePath {
targetPath += ".out"
}
target, err := os.Create(targetPath)
if err != nil {
return "", fmt.Errorf("create target file: %w", err)
}
defer target.Close()
if _, err := io.Copy(target, reader); err != nil {
return "", fmt.Errorf("unzstd file: %w", err)
}
return targetPath, nil
}

View File

@@ -0,0 +1,40 @@
package compress
import (
"bytes"
"os"
"path/filepath"
"strings"
"testing"
)
func TestZstdRoundTrip(t *testing.T) {
dir := t.TempDir()
src := filepath.Join(dir, "data.txt")
content := []byte("hello zstd roundtrip 差异压缩测试 " + strings.Repeat("payload-", 2000))
if err := os.WriteFile(src, content, 0o644); err != nil {
t.Fatalf("write source: %v", err)
}
compressed, err := ZstdFile(src)
if err != nil {
t.Fatalf("ZstdFile: %v", err)
}
if !strings.HasSuffix(compressed, ".zst") {
t.Fatalf("expected .zst suffix, got %s", compressed)
}
// 删除原文件,确保后续读到的是解压结果而非残留原文件。
if err := os.Remove(src); err != nil {
t.Fatalf("remove source: %v", err)
}
out, err := UnzstdFile(compressed)
if err != nil {
t.Fatalf("UnzstdFile: %v", err)
}
got, err := os.ReadFile(out)
if err != nil {
t.Fatalf("read decompressed: %v", err)
}
if !bytes.Equal(got, content) {
t.Fatalf("roundtrip mismatch: got %d bytes, want %d bytes", len(got), len(content))
}
}

View File

@@ -11,6 +11,7 @@ export const backupTaskTypeOptions = [
export const backupCompressionOptions = [
{ label: 'Gzip 压缩', value: 'gzip' },
{ label: 'Zstd 压缩(更快/更小)', value: 'zstd' },
{ label: '不压缩', value: 'none' },
] as const
@@ -89,7 +90,14 @@ export function getDefaultPort(type: BackupTaskType) {
}
export function getCompressionLabel(compression: BackupCompression) {
return compression === 'gzip' ? 'Gzip' : '无'
switch (compression) {
case 'gzip':
return 'Gzip'
case 'zstd':
return 'Zstd'
default:
return '无'
}
}
/** SAP HANA 备份级别选项 */

View File

@@ -1,6 +1,6 @@
export type BackupTaskType = 'file' | 'mysql' | 'sqlite' | 'postgresql' | 'saphana' | 'mongodb'
export type BackupTaskStatus = 'idle' | 'running' | 'success' | 'failed'
export type BackupCompression = 'gzip' | 'none'
export type BackupCompression = 'gzip' | 'zstd' | 'none'
export type BackupMode = 'full' | 'differential'
export interface BackupTaskSummary {