Files
BackupX/server/internal/agent/executor.go
Wu Qing f6bd185b9f feat: improve agent install release layout support
- fix bare-metal Agent install config and executor path handling
- support release package layout in deploy/install.sh and release workflow
- add regression tests for Agent execution and deploy install script behavior
2026-05-09 00:00:53 +08:00

461 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package agent
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"backupx/server/internal/backup"
"backupx/server/internal/storage"
storageRclone "backupx/server/internal/storage/rclone"
"backupx/server/pkg/compress"
)
// Executor runs commands locally on the Agent.
type Executor struct {
// client is the Master connection used to fetch specs and push record/restore updates.
client *MasterClient
// tempDir is the base directory for temporary files; ensureTempDir creates it on demand.
tempDir string
// backupRegistry resolves the backup runner for a task type.
backupRegistry *backup.Registry
// storageRegistry creates storage providers from a target type and its config.
storageRegistry *storage.Registry
}
// NewExecutor builds an Executor bound to the given Master client and
// temporary directory. The backup runner registry and the storage registry
// are populated up front so that later commands only need to look them up.
func NewExecutor(client *MasterClient, tempDir string) *Executor {
	executor := &Executor{client: client, tempDir: tempDir}

	// Backup runners available on the Agent.
	executor.backupRegistry = backup.NewRegistry(
		backup.NewFileRunner(),
		backup.NewSQLiteRunner(),
		backup.NewMySQLRunner(nil),
		backup.NewPostgreSQLRunner(nil),
		backup.NewSAPHANARunner(nil),
	)

	// Explicitly listed storage factories first, then every remaining backend
	// that RegisterAllBackends knows about.
	storages := storage.NewRegistry(
		storageRclone.NewLocalDiskFactory(),
		storageRclone.NewS3Factory(),
		storageRclone.NewWebDAVFactory(),
		storageRclone.NewGoogleDriveFactory(),
		storageRclone.NewAliyunOSSFactory(),
		storageRclone.NewTencentCOSFactory(),
		storageRclone.NewQiniuKodoFactory(),
		storageRclone.NewFTPFactory(),
		storageRclone.NewRcloneFactory(),
	)
	storageRclone.RegisterAllBackends(storages)
	executor.storageRegistry = storages

	return executor
}
// ExecuteRunTask handles the run_task command: fetch the task spec, run the
// backup runner, optionally gzip the artifact, upload it to every storage
// target and report the final record to the Master.
//
// Note: the Agent does not support Encrypt=true (by design the encryption key
// is not handed to Agents, to avoid spreading it). A task with encryption
// enabled is reported to the Master as failed and an error is returned.
//
// Each failing step reports a "failed" record update before returning its
// error. An error from the final success update is returned as-is.
func (e *Executor) ExecuteRunTask(ctx context.Context, taskID, recordID uint) error {
	if err := e.ensureTempDir(); err != nil {
		e.reportRecordFailure(ctx, recordID, err.Error())
		return err
	}

	// 1) Fetch the task spec.
	spec, err := e.client.GetTaskSpec(ctx, taskID)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("拉取任务规格失败: %v", err))
		return err
	}
	if spec.Encrypt {
		msg := "Agent 不支持加密备份(加密密钥仅在 Master 端持有)"
		e.reportRecordFailure(ctx, recordID, msg)
		return fmt.Errorf("%s", msg)
	}
	// Fail fast: without any storage target the artifact could never be
	// uploaded, so do not run the backup, compression and checksum at all.
	if len(spec.StorageTargets) == 0 {
		e.reportRecordFailure(ctx, recordID, "没有关联的存储目标")
		return fmt.Errorf("no storage targets")
	}
	e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 开始执行任务 %s (type=%s)\n", spec.Name, spec.Type))

	// 2) Build the backup.TaskSpec and look up the matching runner.
	startedAt := time.Now().UTC()
	backupSpec := buildBackupTaskSpec(spec, startedAt, e.tempDir)
	runner, err := e.backupRegistry.Runner(backupSpec.Type)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("不支持的备份类型: %v", err))
		return err
	}

	// 3) Run the runner; its log lines are forwarded to the Master record.
	logger := newRecordLogger(ctx, e.client, recordID)
	result, err := runner.Run(ctx, backupSpec, logger)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, err.Error())
		return err
	}
	// The runner's temp dir is removed on every exit path from here on.
	defer os.RemoveAll(result.TempDir)

	// 4) Optional gzip compression (skipped when the artifact already ends in .gz).
	finalPath := result.ArtifactPath
	if strings.EqualFold(spec.Compression, "gzip") && !strings.HasSuffix(strings.ToLower(finalPath), ".gz") {
		e.appendLog(ctx, recordID, "[agent] 开始压缩备份文件\n")
		compressedPath, compressErr := compress.GzipFile(finalPath)
		if compressErr != nil {
			e.reportRecordFailure(ctx, recordID, fmt.Sprintf("压缩失败: %v", compressErr))
			return compressErr
		}
		finalPath = compressedPath
	}

	info, err := os.Stat(finalPath)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("获取文件信息失败: %v", err))
		return err
	}
	fileName := filepath.Base(finalPath)
	fileSize := info.Size()
	storagePath := backup.BuildStorageKey(spec.Type, startedAt, fileName)

	// 5) Compute the SHA-256 checksum, then upload to every target. The first
	// failing upload aborts the task.
	checksum, err := computeFileSHA256(finalPath)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("计算 checksum 失败: %v", err))
		return err
	}
	for _, target := range spec.StorageTargets {
		if err := e.uploadToTarget(ctx, recordID, target, finalPath, storagePath, fileSize, spec.TaskID); err != nil {
			e.reportRecordFailure(ctx, recordID, fmt.Sprintf("上传到 %s 失败: %v", target.Name, err))
			return err
		}
		e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 已上传到存储目标 %s\n", target.Name))
	}

	// 6) Report final success.
	return e.client.UpdateRecord(ctx, recordID, RecordUpdate{
		Status:      "success",
		FileName:    fileName,
		FileSize:    fileSize,
		Checksum:    checksum,
		StoragePath: storagePath,
		LogAppend:   fmt.Sprintf("[agent] 任务完成,总计 %d 字节\n", fileSize),
	})
}
// uploadToTarget uploads the artifact at filePath to a single storage target
// under objectKey. No upload-level retry is performed here (the original note
// states that rclone already retries at a lower level).
//
// The target's JSON config, when present, is decoded into a map and passed to
// the storage registry to create the provider. The task and record IDs are
// attached to the upload as metadata.
func (e *Executor) uploadToTarget(ctx context.Context, recordID uint, target StorageTargetConfig, filePath, objectKey string, fileSize int64, taskID uint) error {
	var providerConfig map[string]any
	if len(target.Config) != 0 {
		decodeErr := jsonUnmarshalMap(target.Config, &providerConfig)
		if decodeErr != nil {
			return fmt.Errorf("parse storage config: %w", decodeErr)
		}
	}

	provider, createErr := e.storageRegistry.Create(ctx, target.Type, providerConfig)
	if createErr != nil {
		return fmt.Errorf("create provider: %w", createErr)
	}

	artifact, openErr := os.Open(filePath)
	if openErr != nil {
		return fmt.Errorf("open artifact: %w", openErr)
	}
	defer artifact.Close()

	metadata := make(map[string]string, 2)
	metadata["taskId"] = fmt.Sprintf("%d", taskID)
	metadata["recordId"] = fmt.Sprintf("%d", recordID)

	return provider.Upload(ctx, objectKey, artifact, fileSize, metadata)
}
// appendLog appends a log line to the Master-side record. It is best effort:
// a failed update is deliberately ignored so logging never interrupts the
// main flow.
func (e *Executor) appendLog(ctx context.Context, recordID uint, line string) {
	update := RecordUpdate{LogAppend: line}
	_ = e.client.UpdateRecord(ctx, recordID, update)
}
// reportRecordFailure marks the record as failed on the Master, storing msg as
// the error message and appending it to the record log. The update error is
// ignored.
func (e *Executor) reportRecordFailure(ctx context.Context, recordID uint, msg string) {
	var failure RecordUpdate
	failure.Status = "failed"
	failure.ErrorMessage = msg
	failure.LogAppend = fmt.Sprintf("[agent] 错误: %s\n", msg)
	_ = e.client.UpdateRecord(ctx, recordID, failure)
}
// buildBackupTaskSpec converts the Agent-side TaskSpec into a backup.TaskSpec.
// SourcePaths and ExcludePatterns are parsed with parseStringListField and the
// database name field is split into a list with splitCommaOrNewline.
func buildBackupTaskSpec(spec *TaskSpec, startedAt time.Time, tempDir string) backup.TaskSpec {
	database := backup.DatabaseSpec{
		Host:     spec.DBHost,
		Port:     spec.DBPort,
		User:     spec.DBUser,
		Password: spec.DBPassword,
		Path:     spec.DBPath,
		Names:    splitCommaOrNewline(spec.DBName),
	}

	return backup.TaskSpec{
		ID:              spec.TaskID,
		Name:            spec.Name,
		Type:            spec.Type,
		SourcePath:      spec.SourcePath,
		SourcePaths:     parseStringListField(spec.SourcePaths),
		ExcludePatterns: parseStringListField(spec.ExcludePatterns),
		Database:        database,
		Compression:     spec.Compression,
		Encrypt:         spec.Encrypt,
		StartedAt:       startedAt,
		TempDir:         tempDir,
	}
}
// ensureTempDir creates the Agent temp directory (and any missing parents)
// with mode 0o755. It returns a wrapped error when creation fails.
func (e *Executor) ensureTempDir() error {
	mkdirErr := os.MkdirAll(e.tempDir, 0o755)
	if mkdirErr != nil {
		return fmt.Errorf("create agent temp dir: %w", mkdirErr)
	}
	return nil
}
// parseStringListField decodes a string field that holds a list of values.
// An empty value or the literal "[]" yields nil. A value that parses as a JSON
// string array is used as such; anything else is split on line breaks. In both
// cases entries are trimmed and blank ones dropped.
func parseStringListField(value string) []string {
	raw := strings.TrimSpace(value)
	switch raw {
	case "", "[]":
		return nil
	}

	var decoded []string
	if json.Unmarshal([]byte(raw), &decoded) == nil {
		return compactStringList(decoded)
	}

	// Not a JSON string array: treat the value as one entry per line.
	isLineBreak := func(r rune) bool {
		return r == '\n' || r == '\r'
	}
	return compactStringList(strings.FieldsFunc(raw, isLineBreak))
}
// compactStringList returns a new slice holding the whitespace-trimmed,
// non-empty entries of items, in their original order. The result is never
// nil, even for nil or all-blank input.
func compactStringList(items []string) []string {
	kept := make([]string, 0, len(items))
	for i := range items {
		candidate := strings.TrimSpace(items[i])
		if candidate == "" {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
// recordLogger forwards runner log output to the Master-side backup record.
// It implements backup.LogWriter; every line is appended to record.log_content.
type recordLogger struct {
// ctx is the context used for every UpdateRecord call made by WriteLine.
ctx context.Context
// client is the Master connection the log lines are sent through.
client *MasterClient
// recordID identifies the backup record the lines are appended to.
recordID uint
}
// newRecordLogger returns a recordLogger that appends lines to the backup
// record recordID through client, using ctx for every update call.
func newRecordLogger(ctx context.Context, client *MasterClient, recordID uint) *recordLogger {
	logger := new(recordLogger)
	logger.ctx = ctx
	logger.client = client
	logger.recordID = recordID
	return logger
}
// WriteLine appends message plus a trailing newline to the record log on the
// Master. The update error is ignored.
func (l *recordLogger) WriteLine(message string) {
	entry := RecordUpdate{LogAppend: message + "\n"}
	_ = l.client.UpdateRecord(l.ctx, l.recordID, entry)
}
// restoreLogger forwards runner log output to the Master-side restore record.
type restoreLogger struct {
// ctx is the context used for every UpdateRestore call made by WriteLine.
ctx context.Context
// client is the Master connection the log lines are sent through.
client *MasterClient
// restoreID identifies the restore record the lines are appended to.
restoreID uint
}
// newRestoreLogger returns a restoreLogger that appends lines to the restore
// record restoreID through client, using ctx for every update call.
func newRestoreLogger(ctx context.Context, client *MasterClient, restoreID uint) *restoreLogger {
	logger := new(restoreLogger)
	logger.ctx = ctx
	logger.client = client
	logger.restoreID = restoreID
	return logger
}
// WriteLine appends message plus a trailing newline to the restore log on the
// Master. The update error is ignored.
func (l *restoreLogger) WriteLine(message string) {
	entry := RestoreUpdate{LogAppend: message + "\n"}
	_ = l.client.UpdateRestore(l.ctx, l.restoreID, entry)
}
// DeleteStorageObject deletes the object at storagePath from the Agent's own
// machine (per the original note, it is called for cross-node cleanup). A
// provider is created from targetType and targetConfig; a creation failure is
// returned wrapped, otherwise the result of the provider's Delete is returned.
func (e *Executor) DeleteStorageObject(ctx context.Context, targetType string, targetConfig map[string]any, storagePath string) error {
	provider, createErr := e.storageRegistry.Create(ctx, targetType, targetConfig)
	if createErr != nil {
		return fmt.Errorf("create provider: %w", createErr)
	}

	deleteErr := provider.Delete(ctx, storagePath)
	return deleteErr
}
// ExecuteRestore handles the restore_record command: fetch the restore spec,
// download the artifact, decompress it, run runner.Restore and report the
// result.
//
// It mirrors ExecuteRunTask in the opposite direction:
//   - download: a provider is created from spec.Storage, then Download(spec.StoragePath)
//   - decryption: not supported on the Agent (the key is not handed to Agents);
//     spec.Encrypt=true or an ".enc" artifact name fails the restore
//   - execution: backupRegistry.Runner(spec.Type).Restore
//   - reporting: UpdateRestore with status / logAppend
//
// Each failing step reports a "failed" restore update before returning its
// error. An error from the final success update is returned as-is.
func (e *Executor) ExecuteRestore(ctx context.Context, restoreRecordID uint) error {
	if err := e.ensureTempDir(); err != nil {
		e.reportRestoreFailure(ctx, restoreRecordID, err.Error())
		return err
	}
	spec, err := e.client.GetRestoreSpec(ctx, restoreRecordID)
	if err != nil {
		e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("拉取恢复规格失败: %v", err))
		return err
	}
	if spec.Encrypt {
		msg := "Agent 不支持加密恢复(加密密钥仅在 Master 端持有)"
		e.reportRestoreFailure(ctx, restoreRecordID, msg)
		return fmt.Errorf("%s", msg)
	}
	e.appendRestoreLog(ctx, restoreRecordID, fmt.Sprintf("[agent] 开始恢复 %s (type=%s)\n", spec.TaskName, spec.Type))

	// Per-restore scratch directory; removed on every exit path.
	tmpDir, err := os.MkdirTemp(e.tempDir, "restore-*")
	if err != nil {
		e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("创建恢复临时目录失败: %v", err))
		return err
	}
	defer os.RemoveAll(tmpDir)

	// 1) Create the storage provider.
	var rawConfig map[string]any
	if len(spec.Storage.Config) > 0 {
		if err := jsonUnmarshalMap(spec.Storage.Config, &rawConfig); err != nil {
			e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("解析存储配置失败: %v", err))
			return err
		}
	}
	provider, err := e.storageRegistry.Create(ctx, spec.Storage.Type, rawConfig)
	if err != nil {
		e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("创建存储客户端失败: %v", err))
		return err
	}

	// 2) Resolve the local artifact path. Only the base name is used, so the
	// file always lands directly inside tmpDir.
	fileName := spec.FileName
	if strings.TrimSpace(fileName) == "" {
		fileName = filepath.Base(spec.StoragePath)
	}
	artifactPath := filepath.Join(tmpDir, filepath.Base(fileName))

	// The Agent cannot decrypt, so reject an ".enc" artifact before spending
	// time and bandwidth downloading it.
	if strings.HasSuffix(strings.ToLower(artifactPath), ".enc") {
		msg := "检测到加密后缀Agent 不支持加密恢复"
		e.reportRestoreFailure(ctx, restoreRecordID, msg)
		return fmt.Errorf("%s", msg)
	}

	// 3) Download; writeReaderToLocal closes the reader.
	e.appendRestoreLog(ctx, restoreRecordID, fmt.Sprintf("[agent] 下载备份文件 %s\n", spec.StoragePath))
	reader, err := provider.Download(ctx, spec.StoragePath)
	if err != nil {
		e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("下载备份失败: %v", err))
		return err
	}
	if err := writeReaderToLocal(artifactPath, reader); err != nil {
		e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("写入备份文件失败: %v", err))
		return err
	}

	// 4) Decompress when the artifact name ends in .gz.
	preparedPath := artifactPath
	if strings.HasSuffix(strings.ToLower(preparedPath), ".gz") {
		e.appendRestoreLog(ctx, restoreRecordID, "[agent] 解压 gzip 压缩\n")
		decompressed, err := compress.GunzipFile(preparedPath)
		if err != nil {
			e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("解压失败: %v", err))
			return err
		}
		preparedPath = decompressed
	}

	// 5) Run runner.Restore; its log lines are forwarded to the restore record.
	taskSpec := buildRestoreBackupTaskSpec(spec, time.Now().UTC(), tmpDir)
	runner, err := e.backupRegistry.Runner(taskSpec.Type)
	if err != nil {
		e.reportRestoreFailure(ctx, restoreRecordID, fmt.Sprintf("不支持的备份类型: %v", err))
		return err
	}
	logger := newRestoreLogger(ctx, e.client, restoreRecordID)
	if err := runner.Restore(ctx, taskSpec, preparedPath, logger); err != nil {
		e.reportRestoreFailure(ctx, restoreRecordID, err.Error())
		return err
	}

	// 6) Report success.
	return e.client.UpdateRestore(ctx, restoreRecordID, RestoreUpdate{
		Status:    "success",
		LogAppend: "[agent] 恢复执行完成\n",
	})
}
// appendRestoreLog appends a log line to the Master-side restore record. The
// update error is ignored.
func (e *Executor) appendRestoreLog(ctx context.Context, restoreID uint, line string) {
	update := RestoreUpdate{LogAppend: line}
	_ = e.client.UpdateRestore(ctx, restoreID, update)
}
// reportRestoreFailure marks the restore record as failed on the Master,
// storing msg as the error message and appending it to the restore log. The
// update error is ignored.
func (e *Executor) reportRestoreFailure(ctx context.Context, restoreID uint, msg string) {
	var failure RestoreUpdate
	failure.Status = "failed"
	failure.ErrorMessage = msg
	failure.LogAppend = fmt.Sprintf("[agent] 错误: %s\n", msg)
	_ = e.client.UpdateRestore(ctx, restoreID, failure)
}
// buildRestoreBackupTaskSpec converts a RestoreSpec into a backup.TaskSpec.
// Exclude patterns are left nil, and the database name field is split into a
// list with splitCommaOrNewline.
func buildRestoreBackupTaskSpec(spec *RestoreSpec, startedAt time.Time, tempDir string) backup.TaskSpec {
	database := backup.DatabaseSpec{
		Host:     spec.DBHost,
		Port:     spec.DBPort,
		User:     spec.DBUser,
		Password: spec.DBPassword,
		Path:     spec.DBPath,
		Names:    splitCommaOrNewline(spec.DBName),
	}

	restoreSpec := backup.TaskSpec{
		ID:              spec.TaskID,
		Name:            spec.TaskName,
		Type:            spec.Type,
		SourcePath:      spec.SourcePath,
		SourcePaths:     spec.SourcePaths,
		ExcludePatterns: nil,
		Database:        database,
		Compression:     spec.Compression,
		Encrypt:         spec.Encrypt,
		StartedAt:       startedAt,
		TempDir:         tempDir,
	}
	return restoreSpec
}
// writeReaderToLocal streams reader into a newly created (or truncated) file
// at targetPath, creating missing parent directories with mode 0o755.
//
// The reader is always closed before returning. The file's Close error is
// returned when nothing failed earlier: on a file opened for writing, Close
// can be the first place a write failure surfaces, and ignoring it could
// report a truncated artifact as success.
func writeReaderToLocal(targetPath string, reader io.ReadCloser) (err error) {
	defer reader.Close()

	if err = os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil {
		return err
	}
	file, err := os.Create(targetPath)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the first error: a copy failure takes precedence over Close.
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	_, err = io.Copy(file, reader)
	return err
}
// 辅助函数
// computeFileSHA256 streams the file at path through SHA-256 and returns the
// digest as a lowercase hex string. On an open or read error it returns an
// empty string together with that error.
func computeFileSHA256(path string) (string, error) {
	file, openErr := os.Open(path)
	if openErr != nil {
		return "", openErr
	}
	defer file.Close()

	hasher := sha256.New()
	_, copyErr := io.Copy(hasher, file)
	if copyErr != nil {
		return "", copyErr
	}

	digest := hasher.Sum(nil)
	return hex.EncodeToString(digest), nil
}
// splitCommaOrNewline splits s on commas, semicolons and newlines, trims
// surrounding whitespace from every piece and drops blank pieces. It returns
// nil when nothing remains.
func splitCommaOrNewline(s string) []string {
	isSeparator := func(r rune) bool {
		switch r {
		case ',', '\n', ';':
			return true
		}
		return false
	}

	var names []string
	for _, field := range strings.FieldsFunc(s, isSeparator) {
		name := strings.TrimSpace(field)
		if name == "" {
			continue
		}
		names = append(names, name)
	}
	return names
}