refactor: 消除集群执行服务冗余逻辑 + 修复节点状态滞后缺陷 (#74)

抽取备份/恢复/验证/复制四服务的复制粘贴逻辑到 execution_helpers.go(净减约 250 行);节点状态改为按 LastSeen 实时推导,消除过期 online 误判;Agent systemd 单元补齐 LimitNOFILE 与单机端一致。go build/test 全绿。
This commit is contained in:
Wu Qing
2026-05-26 14:12:39 +08:00
committed by GitHub
parent e4c52fd8f4
commit 0f30e7bf52
10 changed files with 317 additions and 323 deletions

View File

@@ -70,6 +70,8 @@ Environment="BACKUPX_AGENT_TOKEN=${AGENT_TOKEN}"
ExecStart=${INSTALL_PREFIX}/backupx agent --temp-dir /var/lib/backupx-agent/tmp
Restart=on-failure
RestartSec=10s
# Agent 需以 root 运行以读取任意源数据;与单机服务端保持一致的资源/句柄上限。
LimitNOFILE=65535
[Install]
WantedBy=multi-user.target

View File

@@ -10,22 +10,46 @@ const (
NodeStatusOffline = "offline"
)
// OfflineGracePeriod is the heartbeat timeout threshold: a remote node that has
// not sent a heartbeat for longer than this duration is treated as offline.
// Agents heartbeat every 15s by default, so 45s leaves room for three retries.
const OfflineGracePeriod = 45 * time.Second
// EffectiveStatus reports the node's live online status as of now.
//
// The stored Status field is set to online by heartbeats and to offline by the
// background offline monitor, so between the two there is a lag window of up
// to one monitor cycle. During that window List/Get/the scheduler could read a
// stale "online" and dispatch work to a node that has just dropped off. This
// method derives the answer from LastSeen instead: a remote node whose
// LastSeen is more than OfflineGracePeriod before now is reported offline,
// whatever the stored value says. Local nodes (the Master itself, which does
// not depend on heartbeats) and remote nodes still inside the grace period
// return the stored Status unchanged. A nil receiver is reported offline.
func (n *Node) EffectiveStatus(now time.Time) string {
	switch {
	case n == nil:
		return NodeStatusOffline
	case !n.IsLocal && now.Sub(n.LastSeen) > OfflineGracePeriod:
		// Remote node whose heartbeat has gone stale: override the stored value.
		return NodeStatusOffline
	default:
		return n.Status
	}
}
// Node represents a managed server node in the cluster.
// The default "local" node is auto-created for single-machine backward compatibility.
type Node struct {
ID uint `gorm:"primaryKey" json:"id"`
Name string `gorm:"size:128;uniqueIndex;not null" json:"name"`
Hostname string `gorm:"size:255" json:"hostname"`
IPAddress string `gorm:"column:ip_address;size:64" json:"ipAddress"`
Token string `gorm:"size:128;uniqueIndex;not null" json:"-"`
Status string `gorm:"size:20;not null;default:'offline'" json:"status"`
IsLocal bool `gorm:"not null;default:false" json:"isLocal"`
OS string `gorm:"size:64" json:"os"`
Arch string `gorm:"size:32" json:"arch"`
AgentVer string `gorm:"column:agent_version;size:32" json:"agentVersion"`
LastSeen time.Time `gorm:"column:last_seen" json:"lastSeen"`
PrevToken string `gorm:"size:128;index" json:"-"`
PrevTokenExpires *time.Time `gorm:"column:prev_token_expires" json:"-"`
ID uint `gorm:"primaryKey" json:"id"`
Name string `gorm:"size:128;uniqueIndex;not null" json:"name"`
Hostname string `gorm:"size:255" json:"hostname"`
IPAddress string `gorm:"column:ip_address;size:64" json:"ipAddress"`
Token string `gorm:"size:128;uniqueIndex;not null" json:"-"`
Status string `gorm:"size:20;not null;default:'offline'" json:"status"`
IsLocal bool `gorm:"not null;default:false" json:"isLocal"`
OS string `gorm:"size:64" json:"os"`
Arch string `gorm:"size:32" json:"arch"`
AgentVer string `gorm:"column:agent_version;size:32" json:"agentVersion"`
LastSeen time.Time `gorm:"column:last_seen" json:"lastSeen"`
PrevToken string `gorm:"size:128;index" json:"-"`
PrevTokenExpires *time.Time `gorm:"column:prev_token_expires" json:"-"`
// MaxConcurrent 该节点允许的最大并发任务数0=不限制,沿用全局 cfg.Backup.MaxConcurrent
// 用于大集群中限制单节点资源占用:例如小内存 Agent 节点可配 1避免多个大备份同时跑挤爆。
MaxConcurrent int `gorm:"column:max_concurrent;not null;default:0" json:"maxConcurrent"`

View File

@@ -0,0 +1,58 @@
package model
import (
"testing"
"time"
)
// TestNodeEffectiveStatus pins the contract of (*Node).EffectiveStatus with a
// fixed "now", covering fresh/stale remote heartbeats, both sides of the grace
// period and the exact boundary, never-seen and future LastSeen values, local
// nodes, and the nil receiver.
func TestNodeEffectiveStatus(t *testing.T) {
	now := time.Date(2026, 5, 26, 12, 0, 0, 0, time.UTC)
	cases := []struct {
		name string
		node *Node
		want string
	}{
		{
			name: "remote fresh heartbeat → stored online",
			node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-10 * time.Second)},
			want: NodeStatusOnline,
		},
		{
			name: "remote stale heartbeat but stored online → derived offline",
			node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-90 * time.Second)},
			want: NodeStatusOffline,
		},
		{
			name: "remote just past grace period → offline",
			node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-(OfflineGracePeriod + time.Second))},
			want: NodeStatusOffline,
		},
		{
			name: "remote within grace period → online",
			node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-(OfflineGracePeriod - time.Second))},
			want: NodeStatusOnline,
		},
		{
			// The comparison is strictly "greater than", so a node seen exactly
			// one grace period ago still counts as online.
			name: "remote exactly at grace period → online",
			node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-OfflineGracePeriod)},
			want: NodeStatusOnline,
		},
		{
			// A node that never sent a heartbeat has the zero LastSeen; Time.Sub
			// saturates to the maximum Duration, which exceeds the grace period.
			name: "remote never seen (zero LastSeen) → offline",
			node: &Node{IsLocal: false, Status: NodeStatusOnline},
			want: NodeStatusOffline,
		},
		{
			// Clock skew can put LastSeen slightly in the future; the elapsed
			// time is then negative and must not be mistaken for a timeout.
			name: "remote LastSeen in the future → stored online",
			node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(10 * time.Second)},
			want: NodeStatusOnline,
		},
		{
			name: "local node ignores LastSeen → stored online",
			node: &Node{IsLocal: true, Status: NodeStatusOnline, LastSeen: now.Add(-24 * time.Hour)},
			want: NodeStatusOnline,
		},
		{
			name: "local node stored offline stays offline",
			node: &Node{IsLocal: true, Status: NodeStatusOffline, LastSeen: now},
			want: NodeStatusOffline,
		},
		{
			name: "remote stored offline stays offline",
			node: &Node{IsLocal: false, Status: NodeStatusOffline, LastSeen: now.Add(-5 * time.Second)},
			want: NodeStatusOffline,
		},
		{
			name: "nil node → offline",
			node: nil,
			want: NodeStatusOffline,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := tc.node.EffectiveStatus(now); got != tc.want {
				t.Fatalf("EffectiveStatus = %q, want %q", got, tc.want)
			}
		})
	}
}

View File

@@ -167,7 +167,8 @@ func (s *Service) syncTaskLocked(task *model.BackupTask) error {
// 集群感知:若任务绑定了离线的远程节点,跳过本轮触发避免堆积 failed 记录
if taskNodeID > 0 && s.nodes != nil {
node, err := s.nodes.FindByID(context.Background(), taskNodeID)
if err == nil && node != nil && !node.IsLocal && node.Status != model.NodeStatusOnline {
// 用实时推导的状态判定,避免后台监控刷新前把任务下发给刚失联的节点。
if err == nil && node != nil && !node.IsLocal && node.EffectiveStatus(time.Now().UTC()) != model.NodeStatusOnline {
if s.logger != nil {
s.logger.Warn("skip scheduled run: target node offline",
zap.Uint("task_id", taskID), zap.String("task_name", taskName),

View File

@@ -332,28 +332,8 @@ func (s *BackupExecutionService) deleteRemoteLocalDiskObject(ctx context.Context
// provider 指向的是 Master 本机的同名路径,访问会静默取错文件或 404。明确拒绝
// 让用户知情,避免假成功。
func (s *BackupExecutionService) validateClusterAccessible(ctx context.Context, record *model.BackupRecord) error {
if record == nil || record.NodeID == 0 {
return nil
}
// 检查是否为远程节点
if s.nodeRepo == nil {
return nil
}
node, err := s.nodeRepo.FindByID(ctx, record.NodeID)
if err != nil || node == nil || node.IsLocal {
return nil
}
// 检查存储类型是否为 local_disk跨节点不可达
target, err := s.targets.FindByID(ctx, record.StorageTargetID)
if err != nil || target == nil {
return nil
}
if strings.EqualFold(target.Type, "local_disk") {
return apperror.BadRequest("BACKUP_RECORD_CROSS_NODE_LOCAL_DISK",
fmt.Sprintf("该备份位于节点 %s 的本地磁盘local_diskMaster 无法跨节点访问。请登录该节点或改用云存储后再操作。", node.Name),
nil)
}
return nil
return validateCrossNodeLocalDisk(ctx, s.nodeRepo, s.targets, record,
"BACKUP_RECORD_CROSS_NODE_LOCAL_DISK", "访问。请登录该节点或改用云存储后再操作")
}
func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async bool) (*BackupRecordDetail, error) {
@@ -593,14 +573,7 @@ func (s *BackupExecutionService) isRemoteNode(ctx context.Context, nodeID uint)
// resolveRemoteNode 返回 NodeID 对应的远程节点指针,或 nil 表示本机执行。
// 相比 isRemoteNode它让调用方能读取节点状态在线/离线)做进一步判断。
func (s *BackupExecutionService) resolveRemoteNode(ctx context.Context, nodeID uint) *model.Node {
if s.nodeRepo == nil || s.agentDispatcher == nil || nodeID == 0 {
return nil
}
node, err := s.nodeRepo.FindByID(ctx, nodeID)
if err != nil || node == nil || node.IsLocal {
return nil
}
return node
return resolveRemoteExecutionNode(ctx, s.nodeRepo, s.agentDispatcher != nil, nodeID)
}
func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.BackupTask, recordID uint, startedAt time.Time) {
@@ -952,78 +925,11 @@ func (s *BackupExecutionService) resolveProviderForNode(ctx context.Context, tar
LowLevelRetries: s.retries,
BandwidthLimit: s.effectiveBandwidth(ctx, nodeID),
})
target, err := s.targets.FindByID(ctx, targetID)
if err != nil {
return nil, apperror.Internal("BACKUP_STORAGE_TARGET_GET_FAILED", "无法获取存储目标详情", err)
}
if target == nil {
return nil, apperror.BadRequest("BACKUP_STORAGE_TARGET_INVALID", "关联的存储目标不存在", nil)
}
configMap := map[string]any{}
if err := s.cipher.DecryptJSON(target.ConfigCiphertext, &configMap); err != nil {
return nil, apperror.Internal("BACKUP_STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储目标配置", err)
}
provider, err := s.storageRegistry.Create(ctx, target.Type, configMap)
if err != nil {
return nil, err
}
return provider, nil
return resolveStorageProvider(ctx, s.targets, s.storageRegistry, s.cipher, targetID)
}
func (s *BackupExecutionService) buildTaskSpec(task *model.BackupTask, startedAt time.Time) (backup.TaskSpec, error) {
excludePatterns := []string{}
if strings.TrimSpace(task.ExcludePatterns) != "" {
if err := json.Unmarshal([]byte(task.ExcludePatterns), &excludePatterns); err != nil {
return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析排除规则", err)
}
}
password := ""
if strings.TrimSpace(task.DBPasswordCiphertext) != "" {
plain, err := s.cipher.Decrypt(task.DBPasswordCiphertext)
if err != nil {
return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECRYPT_FAILED", "无法解密数据库密码", err)
}
password = string(plain)
}
sourcePaths := []string{}
if strings.TrimSpace(task.SourcePaths) != "" {
if err := json.Unmarshal([]byte(task.SourcePaths), &sourcePaths); err != nil {
return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析源路径配置", err)
}
}
dbSpec := backup.DatabaseSpec{
Host: task.DBHost,
Port: task.DBPort,
User: task.DBUser,
Password: password,
Names: []string{task.DBName},
Path: task.DBPath,
}
// 解析 ExtraConfig 填充类型特有字段(目前主要用于 SAP HANA
if strings.TrimSpace(task.ExtraConfig) != "" {
extra := map[string]any{}
if err := json.Unmarshal([]byte(task.ExtraConfig), &extra); err != nil {
return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析扩展配置", err)
}
applyHANAExtraConfig(&dbSpec, extra)
}
return backup.TaskSpec{
ID: task.ID,
Name: task.Name,
Type: task.Type,
SourcePath: task.SourcePath,
SourcePaths: sourcePaths,
ExcludePatterns: excludePatterns,
StorageTargetID: task.StorageTargetID,
StorageTargetType: "",
Compression: task.Compression,
Encrypt: task.Encrypt,
RetentionDays: task.RetentionDays,
MaxBackups: task.MaxBackups,
StartedAt: startedAt,
TempDir: s.tempDir,
Database: dbSpec,
}, nil
return buildBackupTaskSpec(s.cipher, task, startedAt, s.tempDir)
}
// applyHANAExtraConfig 从 ExtraConfig map 中提取 SAP HANA 字段填入 DatabaseSpec。
@@ -1065,22 +971,7 @@ func (s *BackupExecutionService) loadRecordProvider(ctx context.Context, recordI
}
func (s *BackupExecutionService) prepareArtifactForRestore(artifactPath string) (string, error) {
currentPath := artifactPath
if strings.HasSuffix(strings.ToLower(currentPath), ".enc") {
decryptedPath, err := backupcrypto.DecryptFile(s.cipher.Key(), currentPath)
if err != nil {
return "", err
}
currentPath = decryptedPath
}
if strings.HasSuffix(strings.ToLower(currentPath), ".gz") {
decompressedPath, err := compress.GunzipFile(currentPath)
if err != nil {
return "", err
}
currentPath = decompressedPath
}
return currentPath, nil
return prepareBackupArtifact(s.cipher, artifactPath, nil)
}
func (s *BackupExecutionService) getRecordDetail(ctx context.Context, recordID uint) (*BackupRecordDetail, error) {

View File

@@ -0,0 +1,163 @@
package service
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"backupx/server/internal/apperror"
"backupx/server/internal/backup"
"backupx/server/internal/model"
"backupx/server/internal/repository"
"backupx/server/internal/storage"
"backupx/server/internal/storage/codec"
"backupx/server/pkg/compress"
backupcrypto "backupx/server/pkg/crypto"
)
// 本文件集中放置「备份执行 / 恢复 / 验证 / 复制」四个执行服务共享的执行期辅助逻辑。
//
// 历史上这些函数(解密存储配置创建 provider、按后缀解密解压归档、判定远程节点、
// 跨节点 local_disk 保护、构建任务执行规格)在四个服务里各复制了一份,差异仅在
// 字段名与少量错误码/日志文案。重复实现既增加维护成本,也容易出现"改了一处忘了
// 另一处"的不一致缺陷。这里抽取为单一实现,各服务通过薄封装方法委托调用,调用方
// 无需改动。
// resolveStorageProvider looks up the storage target by ID, decrypts its
// stored configuration into a generic map, and asks the registry to create a
// provider of the target's type.
//
// It returns an Internal error when the lookup or the decryption fails, a
// BadRequest error when the target does not exist, and otherwise whatever
// registry.Create returns.
func resolveStorageProvider(ctx context.Context, targets repository.StorageTargetRepository, registry *storage.Registry, cipher *codec.ConfigCipher, targetID uint) (storage.StorageProvider, error) {
	target, lookupErr := targets.FindByID(ctx, targetID)
	switch {
	case lookupErr != nil:
		return nil, apperror.Internal("BACKUP_STORAGE_TARGET_GET_FAILED", "无法获取存储目标详情", lookupErr)
	case target == nil:
		return nil, apperror.BadRequest("BACKUP_STORAGE_TARGET_INVALID", "关联的存储目标不存在", nil)
	}

	settings := make(map[string]any)
	decryptErr := cipher.DecryptJSON(target.ConfigCiphertext, &settings)
	if decryptErr != nil {
		return nil, apperror.Internal("BACKUP_STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储目标配置", decryptErr)
	}

	return registry.Create(ctx, target.Type, settings)
}
// prepareBackupArtifact peels the artifact by file suffix: a ".enc" suffix
// (case-insensitive) is decrypted first, then a ".gz" suffix on the resulting
// path is gunzipped. It returns the path of the final readable file, or the
// input path unchanged when neither suffix applies.
//
// logger may be nil, in which case the steps run without progress messages.
func prepareBackupArtifact(cipher *codec.ConfigCipher, artifactPath string, logger *backup.ExecutionLogger) (string, error) {
	// endsWith compares the suffix against the lower-cased path.
	endsWith := func(path, suffix string) bool {
		return strings.HasSuffix(strings.ToLower(path), suffix)
	}

	path := artifactPath

	if endsWith(path, ".enc") {
		if logger != nil {
			logger.Infof("检测到加密后缀,开始解密")
		}
		plainPath, err := backupcrypto.DecryptFile(cipher.Key(), path)
		if err != nil {
			return "", err
		}
		path = plainPath
	}

	// Checked against the post-decryption path, so "x.gz.enc" is handled too.
	if endsWith(path, ".gz") {
		if logger != nil {
			logger.Infof("检测到 gzip 压缩,开始解压")
		}
		unpackedPath, err := compress.GunzipFile(path)
		if err != nil {
			return "", err
		}
		path = unpackedPath
	}

	return path, nil
}
// resolveRemoteExecutionNode returns the remote (non-local) node for nodeID,
// letting callers decide whether work should be dispatched to an Agent or run
// on the Master. clusterEnabled is normally "does this service have an Agent
// dispatcher injected".
//
// It returns nil (meaning: execute locally) when nodeRepo is nil, clustering is
// disabled, nodeID is 0, the lookup fails or finds nothing, or the node is the
// local one.
func resolveRemoteExecutionNode(ctx context.Context, nodeRepo repository.NodeRepository, clusterEnabled bool, nodeID uint) *model.Node {
	if !clusterEnabled || nodeID == 0 || nodeRepo == nil {
		return nil
	}
	candidate, err := nodeRepo.FindByID(ctx, nodeID)
	if err == nil && candidate != nil && !candidate.IsLocal {
		return candidate
	}
	return nil
}
// validateCrossNodeLocalDisk is the cross-node local_disk guard. When the
// backup record belongs to a remote node and its storage target is of type
// "local_disk" (compared case-insensitively), the data lives on that node's
// own disk where the Master cannot reach it, so a BadRequest error is returned.
//
// errCode is the error code to report. opName is spliced into the message
// right after "cannot cross nodes to" and before the closing full stop, so
// callers pass the operation verb, optionally followed by further advice.
//
// The check is best-effort: a nil record, NodeID 0, a nil nodeRepo, a failed
// or empty node/target lookup, or a local node all yield nil.
func validateCrossNodeLocalDisk(ctx context.Context, nodeRepo repository.NodeRepository, targets repository.StorageTargetRepository, record *model.BackupRecord, errCode, opName string) error {
	if record == nil || nodeRepo == nil || record.NodeID == 0 {
		return nil
	}

	owner, err := nodeRepo.FindByID(ctx, record.NodeID)
	if err != nil || owner == nil || owner.IsLocal {
		return nil
	}

	target, err := targets.FindByID(ctx, record.StorageTargetID)
	if err != nil || target == nil {
		return nil
	}
	if !strings.EqualFold(target.Type, "local_disk") {
		return nil
	}

	message := fmt.Sprintf("备份位于节点 %s 的本地磁盘local_diskMaster 无法跨节点%s。", owner.Name, opName)
	return apperror.BadRequest(errCode, message, nil)
}
// buildBackupTaskSpec turns a backup task into an execution spec. In order, it
// decodes the JSON exclude patterns, decrypts the database password, decodes
// the JSON source paths, and applies ExtraConfig (type-specific fields,
// currently SAP HANA) to the database spec. Blank (whitespace-only) fields are
// skipped. It is shared by the backup execution and restore services.
//
// Any decode or decrypt failure is returned as an Internal error together with
// a zero TaskSpec.
func buildBackupTaskSpec(cipher *codec.ConfigCipher, task *model.BackupTask, startedAt time.Time, tempDir string) (backup.TaskSpec, error) {
	var empty backup.TaskSpec
	isSet := func(raw string) bool { return strings.TrimSpace(raw) != "" }

	excludes := []string{}
	if isSet(task.ExcludePatterns) {
		if err := json.Unmarshal([]byte(task.ExcludePatterns), &excludes); err != nil {
			return empty, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析排除规则", err)
		}
	}

	var dbPassword string
	if isSet(task.DBPasswordCiphertext) {
		decrypted, err := cipher.Decrypt(task.DBPasswordCiphertext)
		if err != nil {
			return empty, apperror.Internal("BACKUP_TASK_DECRYPT_FAILED", "无法解密数据库密码", err)
		}
		dbPassword = string(decrypted)
	}

	sources := []string{}
	if isSet(task.SourcePaths) {
		if err := json.Unmarshal([]byte(task.SourcePaths), &sources); err != nil {
			return empty, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析源路径配置", err)
		}
	}

	database := backup.DatabaseSpec{
		Host:     task.DBHost,
		Port:     task.DBPort,
		User:     task.DBUser,
		Password: dbPassword,
		Names:    []string{task.DBName},
		Path:     task.DBPath,
	}

	// ExtraConfig carries type-specific fields (currently mainly SAP HANA).
	if isSet(task.ExtraConfig) {
		extras := map[string]any{}
		if err := json.Unmarshal([]byte(task.ExtraConfig), &extras); err != nil {
			return empty, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析扩展配置", err)
		}
		applyHANAExtraConfig(&database, extras)
	}

	spec := backup.TaskSpec{
		ID:              task.ID,
		Name:            task.Name,
		Type:            task.Type,
		SourcePath:      task.SourcePath,
		SourcePaths:     sources,
		ExcludePatterns: excludes,
		StorageTargetID: task.StorageTargetID,
		Compression:     task.Compression,
		Encrypt:         task.Encrypt,
		RetentionDays:   task.RetentionDays,
		MaxBackups:      task.MaxBackups,
		StartedAt:       startedAt,
		TempDir:         tempDir,
		Database:        database,
	}
	return spec, nil
}

View File

@@ -171,12 +171,14 @@ func (s *NodeService) loadQueueSummaries(ctx context.Context) map[uint]repositor
}
func (s *NodeService) toNodeSummary(node *model.Node, queue repository.AgentCommandQueueSummary) NodeSummary {
// 以 LastSeen 实时推导状态,避免读到后台监控尚未刷新的过期 "online"。
effStatus := node.EffectiveStatus(time.Now().UTC())
summary := NodeSummary{
ID: node.ID,
Name: node.Name,
Hostname: node.Hostname,
IPAddress: node.IPAddress,
Status: node.Status,
Status: effStatus,
IsLocal: node.IsLocal,
OS: node.OS,
Arch: node.Arch,
@@ -195,7 +197,7 @@ func (s *NodeService) toNodeSummary(node *model.Node, queue repository.AgentComm
},
RunningTasks: queue.Running,
LastError: queue.LastError,
Health: nodeHealth(node, queue),
Health: nodeHealth(effStatus, queue),
}
if queue.OldestActiveAt != nil {
summary.Queue.OldestActiveAgeS = int(time.Since(*queue.OldestActiveAt).Seconds())
@@ -203,8 +205,8 @@ func (s *NodeService) toNodeSummary(node *model.Node, queue repository.AgentComm
return summary
}
func nodeHealth(node *model.Node, queue repository.AgentCommandQueueSummary) string {
if node.Status != model.NodeStatusOnline {
func nodeHealth(status string, queue repository.AgentCommandQueueSummary) string {
if status != model.NodeStatusOnline {
return "offline"
}
if queue.Timeouts > 0 || strings.TrimSpace(queue.LastError) != "" {
@@ -303,9 +305,9 @@ func (s *NodeService) ListDirectory(ctx context.Context, nodeID uint, path strin
return result, nil
}
// OfflineThreshold 节点被判定为离线的心跳超时阈值
// Agent 默认 15s 心跳一次45s 未见视为离线,预留 3 次重试空间
const OfflineThreshold = 45 * time.Second
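// OfflineThreshold is the heartbeat timeout after which a node is considered offline.
// It aliases model.OfflineGracePeriod, the threshold used by (*model.Node).EffectiveStatus,
// so the offline state persisted by the background monitor and the offline state derived
// on read paths are decided by the same value.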
const OfflineThreshold = model.OfflineGracePeriod
// StartOfflineMonitor 启动后台 goroutine定期把超时未心跳的节点标记为离线。
// 传入的 ctx 被取消后退出。

View File

@@ -82,22 +82,22 @@ func (s *ReplicationService) SetEventDispatcher(dispatcher EventDispatcher) {
// ReplicationRecordSummary 列表项。
type ReplicationRecordSummary struct {
ID uint `json:"id"`
BackupRecordID uint `json:"backupRecordId"`
TaskID uint `json:"taskId"`
SourceTargetID uint `json:"sourceTargetId"`
SourceTargetName string `json:"sourceTargetName"`
DestTargetID uint `json:"destTargetId"`
DestTargetName string `json:"destTargetName"`
Status string `json:"status"`
StoragePath string `json:"storagePath"`
FileSize int64 `json:"fileSize"`
Checksum string `json:"checksum"`
ErrorMessage string `json:"errorMessage"`
DurationSeconds int `json:"durationSeconds"`
TriggeredBy string `json:"triggeredBy"`
StartedAt time.Time `json:"startedAt"`
CompletedAt *time.Time `json:"completedAt,omitempty"`
ID uint `json:"id"`
BackupRecordID uint `json:"backupRecordId"`
TaskID uint `json:"taskId"`
SourceTargetID uint `json:"sourceTargetId"`
SourceTargetName string `json:"sourceTargetName"`
DestTargetID uint `json:"destTargetId"`
DestTargetName string `json:"destTargetName"`
Status string `json:"status"`
StoragePath string `json:"storagePath"`
FileSize int64 `json:"fileSize"`
Checksum string `json:"checksum"`
ErrorMessage string `json:"errorMessage"`
DurationSeconds int `json:"durationSeconds"`
TriggeredBy string `json:"triggeredBy"`
StartedAt time.Time `json:"startedAt"`
CompletedAt *time.Time `json:"completedAt,omitempty"`
}
type ReplicationRecordListInput struct {
@@ -262,39 +262,13 @@ func (s *ReplicationService) executeReplication(ctx context.Context, repID uint)
}
func (s *ReplicationService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) {
target, err := s.targets.FindByID(ctx, targetID)
if err != nil {
return nil, apperror.Internal("STORAGE_TARGET_GET_FAILED", "无法获取存储目标", err)
}
if target == nil {
return nil, apperror.BadRequest("STORAGE_TARGET_INVALID", "存储目标不存在", nil)
}
configMap := map[string]any{}
if err := s.cipher.DecryptJSON(target.ConfigCiphertext, &configMap); err != nil {
return nil, apperror.Internal("STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储配置", err)
}
return s.storageRegistry.Create(ctx, target.Type, configMap)
return resolveStorageProvider(ctx, s.targets, s.storageRegistry, s.cipher, targetID)
}
// validateClusterAccessible 拒绝跨节点 local_disk 源Master 无法拉取)
// validateClusterAccessible 拒绝跨节点 local_disk 源Master 无法拉取)
func (s *ReplicationService) validateClusterAccessible(ctx context.Context, record *model.BackupRecord) error {
if record == nil || record.NodeID == 0 || s.nodeRepo == nil {
return nil
}
node, err := s.nodeRepo.FindByID(ctx, record.NodeID)
if err != nil || node == nil || node.IsLocal {
return nil
}
target, err := s.targets.FindByID(ctx, record.StorageTargetID)
if err != nil || target == nil {
return nil
}
if strings.EqualFold(target.Type, "local_disk") {
return apperror.BadRequest("REPLICATION_CROSS_NODE_LOCAL_DISK",
fmt.Sprintf("备份位于节点 %s 的本地磁盘local_diskMaster 无法跨节点复制。请改用云存储作为主备份。", node.Name),
nil)
}
return nil
return validateCrossNodeLocalDisk(ctx, s.nodeRepo, s.targets, record,
"REPLICATION_CROSS_NODE_LOCAL_DISK", "复制。请改用云存储作为主备份")
}
func (s *ReplicationService) dispatchFailed(ctx context.Context, rep *model.ReplicationRecord, message string) {
@@ -304,12 +278,12 @@ func (s *ReplicationService) dispatchFailed(ctx context.Context, rep *model.Repl
title := "BackupX 备份复制失败"
body := fmt.Sprintf("备份记录:#%d\n源 → 目标:#%d → #%d\n错误%s", rep.BackupRecordID, rep.SourceTargetID, rep.DestTargetID, message)
fields := map[string]any{
"replicationId": rep.ID,
"backupRecordId": rep.BackupRecordID,
"taskId": rep.TaskID,
"sourceTargetId": rep.SourceTargetID,
"destTargetId": rep.DestTargetID,
"error": message,
"replicationId": rep.ID,
"backupRecordId": rep.BackupRecordID,
"taskId": rep.TaskID,
"sourceTargetId": rep.SourceTargetID,
"destTargetId": rep.DestTargetID,
"error": message,
}
_ = s.eventDispatcher.DispatchEvent(ctx, model.NotificationEventReplicationFailed, title, body, fields)
}

View File

@@ -16,8 +16,6 @@ import (
"backupx/server/internal/repository"
"backupx/server/internal/storage"
"backupx/server/internal/storage/codec"
"backupx/server/pkg/compress"
backupcrypto "backupx/server/pkg/crypto"
)
// RestoreService 管理恢复记录生命周期并在集群中路由执行。
@@ -203,14 +201,7 @@ func (s *RestoreService) isRemoteNode(ctx context.Context, nodeID uint) bool {
// resolveRemoteNode 返回远程节点指针(含 Status用于离线判定。
func (s *RestoreService) resolveRemoteNode(ctx context.Context, nodeID uint) *model.Node {
if s.nodeRepo == nil || s.dispatcher == nil || nodeID == 0 {
return nil
}
node, err := s.nodeRepo.FindByID(ctx, nodeID)
if err != nil || node == nil || node.IsLocal {
return nil
}
return node
return resolveRemoteExecutionNode(ctx, s.nodeRepo, s.dispatcher != nil, nodeID)
}
// executeLocally 在 Master 本地执行恢复。
@@ -335,97 +326,19 @@ func (s *RestoreService) dispatchRestoreEvent(ctx context.Context, restoreID uin
_ = s.eventDispatcher.DispatchEvent(ctx, eventType, title, body, fields)
}
// resolveProvider 复用 BackupExecutionService 的逻辑(解密 → 创建 provider
// resolveProvider 解密存储目标配置并创建 provider(共享实现)。
func (s *RestoreService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) {
target, err := s.targets.FindByID(ctx, targetID)
if err != nil {
return nil, apperror.Internal("BACKUP_STORAGE_TARGET_GET_FAILED", "无法获取存储目标详情", err)
}
if target == nil {
return nil, apperror.BadRequest("BACKUP_STORAGE_TARGET_INVALID", "关联的存储目标不存在", nil)
}
configMap := map[string]any{}
if err := s.cipher.DecryptJSON(target.ConfigCiphertext, &configMap); err != nil {
return nil, apperror.Internal("BACKUP_STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储目标配置", err)
}
return s.storageRegistry.Create(ctx, target.Type, configMap)
return resolveStorageProvider(ctx, s.targets, s.storageRegistry, s.cipher, targetID)
}
// prepareArtifact 根据文件后缀依次解密、解压。
// prepareArtifact 根据文件后缀依次解密、解压(共享实现)
func (s *RestoreService) prepareArtifact(artifactPath string, logger *backup.ExecutionLogger) (string, error) {
currentPath := artifactPath
if strings.HasSuffix(strings.ToLower(currentPath), ".enc") {
logger.Infof("检测到加密后缀,开始解密")
decrypted, err := backupcrypto.DecryptFile(s.cipher.Key(), currentPath)
if err != nil {
return "", err
}
currentPath = decrypted
}
if strings.HasSuffix(strings.ToLower(currentPath), ".gz") {
logger.Infof("检测到 gzip 压缩,开始解压")
decompressed, err := compress.GunzipFile(currentPath)
if err != nil {
return "", err
}
currentPath = decompressed
}
return currentPath, nil
return prepareBackupArtifact(s.cipher, artifactPath, logger)
}
// buildTaskSpec 复刻 BackupExecutionService.buildTaskSpec 的核心逻辑
// buildTaskSpec 由任务构建执行规格(共享实现)
func (s *RestoreService) buildTaskSpec(task *model.BackupTask, startedAt time.Time) (backup.TaskSpec, error) {
excludePatterns := []string{}
if strings.TrimSpace(task.ExcludePatterns) != "" {
if err := json.Unmarshal([]byte(task.ExcludePatterns), &excludePatterns); err != nil {
return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析排除规则", err)
}
}
password := ""
if strings.TrimSpace(task.DBPasswordCiphertext) != "" {
plain, err := s.cipher.Decrypt(task.DBPasswordCiphertext)
if err != nil {
return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECRYPT_FAILED", "无法解密数据库密码", err)
}
password = string(plain)
}
sourcePaths := []string{}
if strings.TrimSpace(task.SourcePaths) != "" {
if err := json.Unmarshal([]byte(task.SourcePaths), &sourcePaths); err != nil {
return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析源路径配置", err)
}
}
dbSpec := backup.DatabaseSpec{
Host: task.DBHost,
Port: task.DBPort,
User: task.DBUser,
Password: password,
Names: []string{task.DBName},
Path: task.DBPath,
}
if strings.TrimSpace(task.ExtraConfig) != "" {
extra := map[string]any{}
if err := json.Unmarshal([]byte(task.ExtraConfig), &extra); err != nil {
return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析扩展配置", err)
}
applyHANAExtraConfig(&dbSpec, extra)
}
return backup.TaskSpec{
ID: task.ID,
Name: task.Name,
Type: task.Type,
SourcePath: task.SourcePath,
SourcePaths: sourcePaths,
ExcludePatterns: excludePatterns,
StorageTargetID: task.StorageTargetID,
Compression: task.Compression,
Encrypt: task.Encrypt,
RetentionDays: task.RetentionDays,
MaxBackups: task.MaxBackups,
StartedAt: startedAt,
TempDir: s.tempDir,
Database: dbSpec,
}, nil
return buildBackupTaskSpec(s.cipher, task, startedAt, s.tempDir)
}
// finalize 只更新状态和错误信息,不写 log用于失败的 dispatch 路径)。

View File

@@ -15,8 +15,6 @@ import (
"backupx/server/internal/repository"
"backupx/server/internal/storage"
"backupx/server/internal/storage/codec"
"backupx/server/pkg/compress"
backupcrypto "backupx/server/pkg/crypto"
)
// VerificationService 管理备份验证(恢复演练)记录生命周期。
@@ -92,11 +90,11 @@ func (v *VerificationEventNotifier) NotifyVerificationResult(ctx context.Context
title := "BackupX 备份验证失败"
body := fmt.Sprintf("任务:%s\n验证记录#%d\n错误%s", taskName, record.ID, record.ErrorMessage)
fields := map[string]any{
"taskId": record.TaskID,
"taskName": taskName,
"verifyId": record.ID,
"taskId": record.TaskID,
"taskName": taskName,
"verifyId": record.ID,
"backupRecordId": record.BackupRecordID,
"error": record.ErrorMessage,
"error": record.ErrorMessage,
}
return v.dispatcher.DispatchEvent(ctx, model.NotificationEventVerifyFailed, title, body, fields)
}
@@ -243,23 +241,8 @@ func (s *VerificationService) Start(ctx context.Context, backupRecordID uint, mo
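// validateClusterAccessible applies the shared cross-node local_disk guard
// (validateCrossNodeLocalDisk) with the verification-specific error code, so the
// Master does not download/verify bogus data from the wrong machine.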
func (s *VerificationService) validateClusterAccessible(ctx context.Context, record *model.BackupRecord) error {
if record == nil || record.NodeID == 0 || s.nodeRepo == nil {
return nil
}
node, err := s.nodeRepo.FindByID(ctx, record.NodeID)
if err != nil || node == nil || node.IsLocal {
return nil
}
target, err := s.targets.FindByID(ctx, record.StorageTargetID)
if err != nil || target == nil {
return nil
}
if strings.EqualFold(target.Type, "local_disk") {
return apperror.BadRequest("VERIFY_CROSS_NODE_LOCAL_DISK",
fmt.Sprintf("备份位于节点 %s 的本地磁盘local_diskMaster 无法跨节点验证。", node.Name),
nil)
}
return nil
return validateCrossNodeLocalDisk(ctx, s.nodeRepo, s.targets, record,
"VERIFY_CROSS_NODE_LOCAL_DISK", "验证")
}
// executeLocally 异步执行验证:下载 → 解密 → 解压 → 按类型校验。
@@ -358,24 +341,7 @@ func (s *VerificationService) executeLocally(ctx context.Context, verID uint, ta
// prepareArtifact 按后缀解密/解压,返回可读路径。
func (s *VerificationService) prepareArtifact(artifactPath string, logger *backup.ExecutionLogger) (string, error) {
current := artifactPath
if strings.HasSuffix(strings.ToLower(current), ".enc") {
logger.Infof("检测到加密后缀,开始解密")
decrypted, err := backupcrypto.DecryptFile(s.cipher.Key(), current)
if err != nil {
return "", err
}
current = decrypted
}
if strings.HasSuffix(strings.ToLower(current), ".gz") {
logger.Infof("检测到 gzip解压")
decompressed, err := compress.GunzipFile(current)
if err != nil {
return "", err
}
current = decompressed
}
return current, nil
return prepareBackupArtifact(s.cipher, artifactPath, logger)
}
// verifyByType 按任务类型分派到对应 Verify* 策略。