From 0f30e7bf52f6a0d3f0cdaa0479507103907a0996 Mon Sep 17 00:00:00 2001 From: Wu Qing <3184394176@qq.com> Date: Tue, 26 May 2026 14:12:39 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E6=B6=88=E9=99=A4=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=E6=89=A7=E8=A1=8C=E6=9C=8D=E5=8A=A1=E5=86=97=E4=BD=99?= =?UTF-8?q?=E9=80=BB=E8=BE=91=20+=20=E4=BF=AE=E5=A4=8D=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=BB=9E=E5=90=8E=E7=BC=BA=E9=99=B7=20(#74)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 抽取备份/恢复/验证/复制四服务的复制粘贴逻辑到 execution_helpers.go(净减约 250 行);节点状态改为按 LastSeen 实时推导,消除过期 online 误判;Agent systemd 单元补齐 LimitNOFILE 与单机端一致。go build/test 全绿。 --- .../templates/agent-install.sh.tmpl | 2 + server/internal/model/node.go | 50 ++++-- server/internal/model/node_status_test.go | 58 +++++++ server/internal/scheduler/service.go | 3 +- .../service/backup_execution_service.go | 121 +------------ server/internal/service/execution_helpers.go | 163 ++++++++++++++++++ server/internal/service/node_service.go | 16 +- .../internal/service/replication_service.go | 78 +++------ server/internal/service/restore_service.go | 101 +---------- .../internal/service/verification_service.go | 48 +----- 10 files changed, 317 insertions(+), 323 deletions(-) create mode 100644 server/internal/model/node_status_test.go create mode 100644 server/internal/service/execution_helpers.go diff --git a/server/internal/installscript/templates/agent-install.sh.tmpl b/server/internal/installscript/templates/agent-install.sh.tmpl index fdee8dc..5d4adaf 100644 --- a/server/internal/installscript/templates/agent-install.sh.tmpl +++ b/server/internal/installscript/templates/agent-install.sh.tmpl @@ -70,6 +70,8 @@ Environment="BACKUPX_AGENT_TOKEN=${AGENT_TOKEN}" ExecStart=${INSTALL_PREFIX}/backupx agent --temp-dir /var/lib/backupx-agent/tmp Restart=on-failure RestartSec=10s +# Agent 需以 root 运行以读取任意源数据;与单机服务端保持一致的资源/句柄上限。 +LimitNOFILE=65535 [Install] WantedBy=multi-user.target diff --git a/server/internal/model/node.go b/server/internal/model/node.go index 1b24a5c..f230f09 100644 --- a/server/internal/model/node.go +++ b/server/internal/model/node.go @@ -10,22 +10,46 @@ const ( NodeStatusOffline = "offline" ) +// OfflineGracePeriod 节点心跳超时判定阈值:超过该时长未心跳的远程节点视为离线。 +// Agent 默认 15s 心跳一次,预留 3 次重试空间。 +const OfflineGracePeriod = 45 * time.Second + +// EffectiveStatus 返回节点的「实时」在线状态。 +// +// 存储字段 Status 由心跳置 online、由后台离线监控置 offline,二者之间存在最长一个 +// 监控周期的滞后窗口;期间 List/Get/调度器可能读到过期的 "online",进而把任务下发 +// 给一台刚刚失联的节点。本方法直接以 LastSeen 推导:远程节点若超过 OfflineGracePeriod +// 未心跳即视为 offline,消除该滞后导致的误判。本机节点恒以存储状态为准(它就是 Master +// 自身,不依赖心跳)。 +func (n *Node) EffectiveStatus(now time.Time) string { + if n == nil { + return NodeStatusOffline + } + if n.IsLocal { + return n.Status + } + if now.Sub(n.LastSeen) > OfflineGracePeriod { + return NodeStatusOffline + } + return n.Status +} + // Node represents a managed server node in the cluster. // The default "local" node is auto-created for single-machine backward compatibility. type Node struct { - ID uint `gorm:"primaryKey" json:"id"` - Name string `gorm:"size:128;uniqueIndex;not null" json:"name"` - Hostname string `gorm:"size:255" json:"hostname"` - IPAddress string `gorm:"column:ip_address;size:64" json:"ipAddress"` - Token string `gorm:"size:128;uniqueIndex;not null" json:"-"` - Status string `gorm:"size:20;not null;default:'offline'" json:"status"` - IsLocal bool `gorm:"not null;default:false" json:"isLocal"` - OS string `gorm:"size:64" json:"os"` - Arch string `gorm:"size:32" json:"arch"` - AgentVer string `gorm:"column:agent_version;size:32" json:"agentVersion"` - LastSeen time.Time `gorm:"column:last_seen" json:"lastSeen"` - PrevToken string `gorm:"size:128;index" json:"-"` - PrevTokenExpires *time.Time `gorm:"column:prev_token_expires" json:"-"` + ID uint `gorm:"primaryKey" json:"id"` + Name string `gorm:"size:128;uniqueIndex;not null" json:"name"` + Hostname string `gorm:"size:255" json:"hostname"` + IPAddress string `gorm:"column:ip_address;size:64" json:"ipAddress"` + Token string `gorm:"size:128;uniqueIndex;not null" json:"-"` + Status string `gorm:"size:20;not null;default:'offline'" json:"status"` + IsLocal bool `gorm:"not null;default:false" json:"isLocal"` + OS string `gorm:"size:64" json:"os"` + Arch string `gorm:"size:32" json:"arch"` + AgentVer string `gorm:"column:agent_version;size:32" json:"agentVersion"` + LastSeen time.Time `gorm:"column:last_seen" json:"lastSeen"` + PrevToken string `gorm:"size:128;index" json:"-"` + PrevTokenExpires *time.Time `gorm:"column:prev_token_expires" json:"-"` // MaxConcurrent 该节点允许的最大并发任务数(0=不限制,沿用全局 cfg.Backup.MaxConcurrent)。 // 用于大集群中限制单节点资源占用:例如小内存 Agent 节点可配 1,避免多个大备份同时跑挤爆。 MaxConcurrent int `gorm:"column:max_concurrent;not null;default:0" json:"maxConcurrent"` diff --git a/server/internal/model/node_status_test.go b/server/internal/model/node_status_test.go new file mode 100644 index 0000000..b7d5350 --- /dev/null +++ b/server/internal/model/node_status_test.go @@ -0,0 +1,58 @@ +package model + +import ( + "testing" + "time" +) + +func TestNodeEffectiveStatus(t *testing.T) { + now := time.Date(2026, 5, 26, 12, 0, 0, 0, time.UTC) + cases := []struct { + name string + node *Node + want string + }{ + { + name: "remote fresh heartbeat → stored online", + node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-10 * time.Second)}, + want: NodeStatusOnline, + }, + { + name: "remote stale heartbeat but stored online → derived offline", + node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-90 * time.Second)}, + want: NodeStatusOffline, + }, + { + name: "remote just past grace period → offline", + node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-(OfflineGracePeriod + time.Second))}, + want: NodeStatusOffline, + }, + { + name: "remote within grace period → online", + node: &Node{IsLocal: false, Status: NodeStatusOnline, LastSeen: now.Add(-(OfflineGracePeriod - time.Second))}, + want: NodeStatusOnline, + }, + { + name: "local node ignores LastSeen → stored online", + node: &Node{IsLocal: true, Status: NodeStatusOnline, LastSeen: now.Add(-24 * time.Hour)}, + want: NodeStatusOnline, + }, + { + name: "remote stored offline stays offline", + node: &Node{IsLocal: false, Status: NodeStatusOffline, LastSeen: now.Add(-5 * time.Second)}, + want: NodeStatusOffline, + }, + { + name: "nil node → offline", + node: nil, + want: NodeStatusOffline, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := tc.node.EffectiveStatus(now); got != tc.want { + t.Fatalf("EffectiveStatus = %q, want %q", got, tc.want) + } + }) + } +} diff --git a/server/internal/scheduler/service.go b/server/internal/scheduler/service.go index f24717a..6e64247 100644 --- a/server/internal/scheduler/service.go +++ b/server/internal/scheduler/service.go @@ -167,7 +167,8 @@ func (s *Service) syncTaskLocked(task *model.BackupTask) error { // 集群感知:若任务绑定了离线的远程节点,跳过本轮触发避免堆积 failed 记录 if taskNodeID > 0 && s.nodes != nil { node, err := s.nodes.FindByID(context.Background(), taskNodeID) - if err == nil && node != nil && !node.IsLocal && node.Status != model.NodeStatusOnline { + // 用实时推导的状态判定,避免后台监控刷新前把任务下发给刚失联的节点。 + if err == nil && node != nil && !node.IsLocal && node.EffectiveStatus(time.Now().UTC()) != model.NodeStatusOnline { if s.logger != nil { s.logger.Warn("skip scheduled run: target node offline", zap.Uint("task_id", taskID), zap.String("task_name", taskName), diff --git a/server/internal/service/backup_execution_service.go b/server/internal/service/backup_execution_service.go index daf7d71..71f25fc 100644 --- a/server/internal/service/backup_execution_service.go +++ b/server/internal/service/backup_execution_service.go @@ -332,28 +332,8 @@ func (s *BackupExecutionService) deleteRemoteLocalDiskObject(ctx context.Context // provider 指向的是 Master 本机的同名路径,访问会静默取错文件或 404。明确拒绝 // 让用户知情,避免假成功。 func (s *BackupExecutionService) validateClusterAccessible(ctx context.Context, record *model.BackupRecord) error { - if record == nil || record.NodeID == 0 { - return nil - } - // 检查是否为远程节点 - if s.nodeRepo == nil { - return nil - } - node, err := s.nodeRepo.FindByID(ctx, record.NodeID) - if err != nil || node == nil || node.IsLocal { - return nil - } - // 检查存储类型是否为 local_disk(跨节点不可达) - target, err := s.targets.FindByID(ctx, record.StorageTargetID) - if err != nil || target == nil { - return nil - } - if strings.EqualFold(target.Type, "local_disk") { - return apperror.BadRequest("BACKUP_RECORD_CROSS_NODE_LOCAL_DISK", - fmt.Sprintf("该备份位于节点 %s 的本地磁盘(local_disk),Master 无法跨节点访问。请登录该节点或改用云存储后再操作。", node.Name), - nil) - } - return nil + return validateCrossNodeLocalDisk(ctx, s.nodeRepo, s.targets, record, + "BACKUP_RECORD_CROSS_NODE_LOCAL_DISK", "访问。请登录该节点或改用云存储后再操作") } func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async bool) (*BackupRecordDetail, error) { @@ -593,14 +573,7 @@ func (s *BackupExecutionService) isRemoteNode(ctx context.Context, nodeID uint) // resolveRemoteNode 返回 NodeID 对应的远程节点指针,或 nil 表示本机执行。 // 相比 isRemoteNode,它让调用方能读取节点状态(在线/离线)做进一步判断。 func (s *BackupExecutionService) resolveRemoteNode(ctx context.Context, nodeID uint) *model.Node { - if s.nodeRepo == nil || s.agentDispatcher == nil || nodeID == 0 { - return nil - } - node, err := s.nodeRepo.FindByID(ctx, nodeID) - if err != nil || node == nil || node.IsLocal { - return nil - } - return node + return resolveRemoteExecutionNode(ctx, s.nodeRepo, s.agentDispatcher != nil, nodeID) } func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.BackupTask, recordID uint, startedAt time.Time) { @@ -952,78 +925,11 @@ func (s *BackupExecutionService) resolveProviderForNode(ctx context.Context, tar LowLevelRetries: s.retries, BandwidthLimit: s.effectiveBandwidth(ctx, nodeID), }) - target, err := s.targets.FindByID(ctx, targetID) - if err != nil { - return nil, apperror.Internal("BACKUP_STORAGE_TARGET_GET_FAILED", "无法获取存储目标详情", err) - } - if target == nil { - return nil, apperror.BadRequest("BACKUP_STORAGE_TARGET_INVALID", "关联的存储目标不存在", nil) - } - configMap := map[string]any{} - if err := s.cipher.DecryptJSON(target.ConfigCiphertext, &configMap); err != nil { - return nil, apperror.Internal("BACKUP_STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储目标配置", err) - } - provider, err := s.storageRegistry.Create(ctx, target.Type, configMap) - if err != nil { - return nil, err - } - return provider, nil + return resolveStorageProvider(ctx, s.targets, s.storageRegistry, s.cipher, targetID) } func (s *BackupExecutionService) buildTaskSpec(task *model.BackupTask, startedAt time.Time) (backup.TaskSpec, error) { - excludePatterns := []string{} - if strings.TrimSpace(task.ExcludePatterns) != "" { - if err := json.Unmarshal([]byte(task.ExcludePatterns), &excludePatterns); err != nil { - return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析排除规则", err) - } - } - password := "" - if strings.TrimSpace(task.DBPasswordCiphertext) != "" { - plain, err := s.cipher.Decrypt(task.DBPasswordCiphertext) - if err != nil { - return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECRYPT_FAILED", "无法解密数据库密码", err) - } - password = string(plain) - } - sourcePaths := []string{} - if strings.TrimSpace(task.SourcePaths) != "" { - if err := json.Unmarshal([]byte(task.SourcePaths), &sourcePaths); err != nil { - return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析源路径配置", err) - } - } - dbSpec := backup.DatabaseSpec{ - Host: task.DBHost, - Port: task.DBPort, - User: task.DBUser, - Password: password, - Names: []string{task.DBName}, - Path: task.DBPath, - } - // 解析 ExtraConfig 填充类型特有字段(目前主要用于 SAP HANA) - if strings.TrimSpace(task.ExtraConfig) != "" { - extra := map[string]any{} - if err := json.Unmarshal([]byte(task.ExtraConfig), &extra); err != nil { - return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析扩展配置", err) - } - applyHANAExtraConfig(&dbSpec, extra) - } - return backup.TaskSpec{ - ID: task.ID, - Name: task.Name, - Type: task.Type, - SourcePath: task.SourcePath, - SourcePaths: sourcePaths, - ExcludePatterns: excludePatterns, - StorageTargetID: task.StorageTargetID, - StorageTargetType: "", - Compression: task.Compression, - Encrypt: task.Encrypt, - RetentionDays: task.RetentionDays, - MaxBackups: task.MaxBackups, - StartedAt: startedAt, - TempDir: s.tempDir, - Database: dbSpec, - }, nil + return buildBackupTaskSpec(s.cipher, task, startedAt, s.tempDir) } // applyHANAExtraConfig 从 ExtraConfig map 中提取 SAP HANA 字段填入 DatabaseSpec。 @@ -1065,22 +971,7 @@ func (s *BackupExecutionService) loadRecordProvider(ctx context.Context, recordI } func (s *BackupExecutionService) prepareArtifactForRestore(artifactPath string) (string, error) { - currentPath := artifactPath - if strings.HasSuffix(strings.ToLower(currentPath), ".enc") { - decryptedPath, err := backupcrypto.DecryptFile(s.cipher.Key(), currentPath) - if err != nil { - return "", err - } - currentPath = decryptedPath - } - if strings.HasSuffix(strings.ToLower(currentPath), ".gz") { - decompressedPath, err := compress.GunzipFile(currentPath) - if err != nil { - return "", err - } - currentPath = decompressedPath - } - return currentPath, nil + return prepareBackupArtifact(s.cipher, artifactPath, nil) } func (s *BackupExecutionService) getRecordDetail(ctx context.Context, recordID uint) (*BackupRecordDetail, error) { diff --git a/server/internal/service/execution_helpers.go b/server/internal/service/execution_helpers.go new file mode 100644 index 0000000..17004f3 --- /dev/null +++ b/server/internal/service/execution_helpers.go @@ -0,0 +1,163 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "backupx/server/internal/apperror" + "backupx/server/internal/backup" + "backupx/server/internal/model" + "backupx/server/internal/repository" + "backupx/server/internal/storage" + "backupx/server/internal/storage/codec" + "backupx/server/pkg/compress" + backupcrypto "backupx/server/pkg/crypto" +) + +// 本文件集中放置「备份执行 / 恢复 / 验证 / 复制」四个执行服务共享的执行期辅助逻辑。 +// +// 历史上这些函数(解密存储配置创建 provider、按后缀解密解压归档、判定远程节点、 +// 跨节点 local_disk 保护、构建任务执行规格)在四个服务里各复制了一份,差异仅在 +// 字段名与少量错误码/日志文案。重复实现既增加维护成本,也容易出现"改了一处忘了 +// 另一处"的不一致缺陷。这里抽取为单一实现,各服务通过薄封装方法委托调用,调用方 +// 无需改动。 + +// resolveStorageProvider 查询存储目标、解密其配置并创建 provider。 +func resolveStorageProvider(ctx context.Context, targets repository.StorageTargetRepository, registry *storage.Registry, cipher *codec.ConfigCipher, targetID uint) (storage.StorageProvider, error) { + target, err := targets.FindByID(ctx, targetID) + if err != nil { + return nil, apperror.Internal("BACKUP_STORAGE_TARGET_GET_FAILED", "无法获取存储目标详情", err) + } + if target == nil { + return nil, apperror.BadRequest("BACKUP_STORAGE_TARGET_INVALID", "关联的存储目标不存在", nil) + } + configMap := map[string]any{} + if err := cipher.DecryptJSON(target.ConfigCiphertext, &configMap); err != nil { + return nil, apperror.Internal("BACKUP_STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储目标配置", err) + } + return registry.Create(ctx, target.Type, configMap) +} + +// prepareBackupArtifact 按文件后缀依次解密(.enc)与解压(.gz),返回最终可读路径。 +// logger 可为 nil(此时静默执行)。 +func prepareBackupArtifact(cipher *codec.ConfigCipher, artifactPath string, logger *backup.ExecutionLogger) (string, error) { + current := artifactPath + if strings.HasSuffix(strings.ToLower(current), ".enc") { + if logger != nil { + logger.Infof("检测到加密后缀,开始解密") + } + decrypted, err := backupcrypto.DecryptFile(cipher.Key(), current) + if err != nil { + return "", err + } + current = decrypted + } + if strings.HasSuffix(strings.ToLower(current), ".gz") { + if logger != nil { + logger.Infof("检测到 gzip 压缩,开始解压") + } + decompressed, err := compress.GunzipFile(current) + if err != nil { + return "", err + } + current = decompressed + } + return current, nil +} + +// resolveRemoteExecutionNode 返回远程(非本机)节点指针,用于判定任务应下发给 +// Agent 还是在 Master 本地执行。clusterEnabled 通常为「该服务是否注入了 Agent +// 下发能力」。本机 / 未启用集群 / nodeID=0 / 未找到时返回 nil(走本地执行)。 +func resolveRemoteExecutionNode(ctx context.Context, nodeRepo repository.NodeRepository, clusterEnabled bool, nodeID uint) *model.Node { + if nodeRepo == nil || !clusterEnabled || nodeID == 0 { + return nil + } + node, err := nodeRepo.FindByID(ctx, nodeID) + if err != nil || node == nil || node.IsLocal { + return nil + } + return node +} + +// validateCrossNodeLocalDisk 跨节点 local_disk 保护:若备份记录归属某远程节点, +// 且其存储目标是 local_disk(数据位于该节点本地磁盘),Master 无法跨节点访问, +// 直接返回错误。errCode/opName 由各服务定制,以给出贴合场景的提示文案。 +func validateCrossNodeLocalDisk(ctx context.Context, nodeRepo repository.NodeRepository, targets repository.StorageTargetRepository, record *model.BackupRecord, errCode, opName string) error { + if record == nil || record.NodeID == 0 || nodeRepo == nil { + return nil + } + node, err := nodeRepo.FindByID(ctx, record.NodeID) + if err != nil || node == nil || node.IsLocal { + return nil + } + target, err := targets.FindByID(ctx, record.StorageTargetID) + if err != nil || target == nil { + return nil + } + if strings.EqualFold(target.Type, "local_disk") { + return apperror.BadRequest(errCode, + fmt.Sprintf("备份位于节点 %s 的本地磁盘(local_disk),Master 无法跨节点%s。", node.Name, opName), + nil) + } + return nil +} + +// buildBackupTaskSpec 由备份任务构建执行规格:解析排除规则/源路径、解密 DB 密码、 +// 套用 ExtraConfig(SAP HANA 等类型特有字段)。被备份执行与恢复服务共享。 +func buildBackupTaskSpec(cipher *codec.ConfigCipher, task *model.BackupTask, startedAt time.Time, tempDir string) (backup.TaskSpec, error) { + excludePatterns := []string{} + if strings.TrimSpace(task.ExcludePatterns) != "" { + if err := json.Unmarshal([]byte(task.ExcludePatterns), &excludePatterns); err != nil { + return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析排除规则", err) + } + } + password := "" + if strings.TrimSpace(task.DBPasswordCiphertext) != "" { + plain, err := cipher.Decrypt(task.DBPasswordCiphertext) + if err != nil { + return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECRYPT_FAILED", "无法解密数据库密码", err) + } + password = string(plain) + } + sourcePaths := []string{} + if strings.TrimSpace(task.SourcePaths) != "" { + if err := json.Unmarshal([]byte(task.SourcePaths), &sourcePaths); err != nil { + return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析源路径配置", err) + } + } + dbSpec := backup.DatabaseSpec{ + Host: task.DBHost, + Port: task.DBPort, + User: task.DBUser, + Password: password, + Names: []string{task.DBName}, + Path: task.DBPath, + } + // 解析 ExtraConfig 填充类型特有字段(目前主要用于 SAP HANA) + if strings.TrimSpace(task.ExtraConfig) != "" { + extra := map[string]any{} + if err := json.Unmarshal([]byte(task.ExtraConfig), &extra); err != nil { + return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析扩展配置", err) + } + applyHANAExtraConfig(&dbSpec, extra) + } + return backup.TaskSpec{ + ID: task.ID, + Name: task.Name, + Type: task.Type, + SourcePath: task.SourcePath, + SourcePaths: sourcePaths, + ExcludePatterns: excludePatterns, + StorageTargetID: task.StorageTargetID, + Compression: task.Compression, + Encrypt: task.Encrypt, + RetentionDays: task.RetentionDays, + MaxBackups: task.MaxBackups, + StartedAt: startedAt, + TempDir: tempDir, + Database: dbSpec, + }, nil +} diff --git a/server/internal/service/node_service.go b/server/internal/service/node_service.go index 84dcfeb..57433f8 100644 --- a/server/internal/service/node_service.go +++ b/server/internal/service/node_service.go @@ -171,12 +171,14 @@ func (s *NodeService) loadQueueSummaries(ctx context.Context) map[uint]repositor } func (s *NodeService) toNodeSummary(node *model.Node, queue repository.AgentCommandQueueSummary) NodeSummary { + // 以 LastSeen 实时推导状态,避免读到后台监控尚未刷新的过期 "online"。 + effStatus := node.EffectiveStatus(time.Now().UTC()) summary := NodeSummary{ ID: node.ID, Name: node.Name, Hostname: node.Hostname, IPAddress: node.IPAddress, - Status: node.Status, + Status: effStatus, IsLocal: node.IsLocal, OS: node.OS, Arch: node.Arch, @@ -195,7 +197,7 @@ func (s *NodeService) toNodeSummary(node *model.Node, queue repository.AgentComm }, RunningTasks: queue.Running, LastError: queue.LastError, - Health: nodeHealth(node, queue), + Health: nodeHealth(effStatus, queue), } if queue.OldestActiveAt != nil { summary.Queue.OldestActiveAgeS = int(time.Since(*queue.OldestActiveAt).Seconds()) @@ -203,8 +205,8 @@ func (s *NodeService) toNodeSummary(node *model.Node, queue repository.AgentComm return summary } -func nodeHealth(node *model.Node, queue repository.AgentCommandQueueSummary) string { - if node.Status != model.NodeStatusOnline { +func nodeHealth(status string, queue repository.AgentCommandQueueSummary) string { + if status != model.NodeStatusOnline { return "offline" } if queue.Timeouts > 0 || strings.TrimSpace(queue.LastError) != "" { @@ -303,9 +305,9 @@ func (s *NodeService) ListDirectory(ctx context.Context, nodeID uint, path strin return result, nil } -// OfflineThreshold 节点被判定为离线的心跳超时阈值。 -// Agent 默认 15s 心跳一次;45s 未见视为离线,预留 3 次重试空间。 -const OfflineThreshold = 45 * time.Second +// OfflineThreshold 节点被判定为离线的心跳超时阈值,与 model.EffectiveStatus 共用同一阈值, +// 保证「后台监控持久化的 offline」与「读路径实时推导的 offline」判定一致。 +const OfflineThreshold = model.OfflineGracePeriod // StartOfflineMonitor 启动后台 goroutine,定期把超时未心跳的节点标记为离线。 // 传入的 ctx 被取消后退出。 diff --git a/server/internal/service/replication_service.go b/server/internal/service/replication_service.go index 9e8f83f..b811472 100644 --- a/server/internal/service/replication_service.go +++ b/server/internal/service/replication_service.go @@ -82,22 +82,22 @@ func (s *ReplicationService) SetEventDispatcher(dispatcher EventDispatcher) { // ReplicationRecordSummary 列表项。 type ReplicationRecordSummary struct { - ID uint `json:"id"` - BackupRecordID uint `json:"backupRecordId"` - TaskID uint `json:"taskId"` - SourceTargetID uint `json:"sourceTargetId"` - SourceTargetName string `json:"sourceTargetName"` - DestTargetID uint `json:"destTargetId"` - DestTargetName string `json:"destTargetName"` - Status string `json:"status"` - StoragePath string `json:"storagePath"` - FileSize int64 `json:"fileSize"` - Checksum string `json:"checksum"` - ErrorMessage string `json:"errorMessage"` - DurationSeconds int `json:"durationSeconds"` - TriggeredBy string `json:"triggeredBy"` - StartedAt time.Time `json:"startedAt"` - CompletedAt *time.Time `json:"completedAt,omitempty"` + ID uint `json:"id"` + BackupRecordID uint `json:"backupRecordId"` + TaskID uint `json:"taskId"` + SourceTargetID uint `json:"sourceTargetId"` + SourceTargetName string `json:"sourceTargetName"` + DestTargetID uint `json:"destTargetId"` + DestTargetName string `json:"destTargetName"` + Status string `json:"status"` + StoragePath string `json:"storagePath"` + FileSize int64 `json:"fileSize"` + Checksum string `json:"checksum"` + ErrorMessage string `json:"errorMessage"` + DurationSeconds int `json:"durationSeconds"` + TriggeredBy string `json:"triggeredBy"` + StartedAt time.Time `json:"startedAt"` + CompletedAt *time.Time `json:"completedAt,omitempty"` } type ReplicationRecordListInput struct { @@ -262,39 +262,13 @@ func (s *ReplicationService) executeReplication(ctx context.Context, repID uint) } func (s *ReplicationService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) { - target, err := s.targets.FindByID(ctx, targetID) - if err != nil { - return nil, apperror.Internal("STORAGE_TARGET_GET_FAILED", "无法获取存储目标", err) - } - if target == nil { - return nil, apperror.BadRequest("STORAGE_TARGET_INVALID", "存储目标不存在", nil) - } - configMap := map[string]any{} - if err := s.cipher.DecryptJSON(target.ConfigCiphertext, &configMap); err != nil { - return nil, apperror.Internal("STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储配置", err) - } - return s.storageRegistry.Create(ctx, target.Type, configMap) + return resolveStorageProvider(ctx, s.targets, s.storageRegistry, s.cipher, targetID) } -// validateClusterAccessible 拒绝跨节点 local_disk 源(Master 无法拉取) +// validateClusterAccessible 拒绝跨节点 local_disk 源(Master 无法拉取)。 func (s *ReplicationService) validateClusterAccessible(ctx context.Context, record *model.BackupRecord) error { - if record == nil || record.NodeID == 0 || s.nodeRepo == nil { - return nil - } - node, err := s.nodeRepo.FindByID(ctx, record.NodeID) - if err != nil || node == nil || node.IsLocal { - return nil - } - target, err := s.targets.FindByID(ctx, record.StorageTargetID) - if err != nil || target == nil { - return nil - } - if strings.EqualFold(target.Type, "local_disk") { - return apperror.BadRequest("REPLICATION_CROSS_NODE_LOCAL_DISK", - fmt.Sprintf("备份位于节点 %s 的本地磁盘(local_disk),Master 无法跨节点复制。请改用云存储作为主备份。", node.Name), - nil) - } - return nil + return validateCrossNodeLocalDisk(ctx, s.nodeRepo, s.targets, record, + "REPLICATION_CROSS_NODE_LOCAL_DISK", "复制。请改用云存储作为主备份") } func (s *ReplicationService) dispatchFailed(ctx context.Context, rep *model.ReplicationRecord, message string) { @@ -304,12 +278,12 @@ func (s *ReplicationService) dispatchFailed(ctx context.Context, rep *model.Repl title := "BackupX 备份复制失败" body := fmt.Sprintf("备份记录:#%d\n源 → 目标:#%d → #%d\n错误:%s", rep.BackupRecordID, rep.SourceTargetID, rep.DestTargetID, message) fields := map[string]any{ - "replicationId": rep.ID, - "backupRecordId": rep.BackupRecordID, - "taskId": rep.TaskID, - "sourceTargetId": rep.SourceTargetID, - "destTargetId": rep.DestTargetID, - "error": message, + "replicationId": rep.ID, + "backupRecordId": rep.BackupRecordID, + "taskId": rep.TaskID, + "sourceTargetId": rep.SourceTargetID, + "destTargetId": rep.DestTargetID, + "error": message, } _ = s.eventDispatcher.DispatchEvent(ctx, model.NotificationEventReplicationFailed, title, body, fields) } diff --git a/server/internal/service/restore_service.go b/server/internal/service/restore_service.go index f679d9b..c418cc1 100644 --- a/server/internal/service/restore_service.go +++ b/server/internal/service/restore_service.go @@ -16,8 +16,6 @@ import ( "backupx/server/internal/repository" "backupx/server/internal/storage" "backupx/server/internal/storage/codec" - "backupx/server/pkg/compress" - backupcrypto "backupx/server/pkg/crypto" ) // RestoreService 管理恢复记录生命周期并在集群中路由执行。 @@ -203,14 +201,7 @@ func (s *RestoreService) isRemoteNode(ctx context.Context, nodeID uint) bool { // resolveRemoteNode 返回远程节点指针(含 Status),用于离线判定。 func (s *RestoreService) resolveRemoteNode(ctx context.Context, nodeID uint) *model.Node { - if s.nodeRepo == nil || s.dispatcher == nil || nodeID == 0 { - return nil - } - node, err := s.nodeRepo.FindByID(ctx, nodeID) - if err != nil || node == nil || node.IsLocal { - return nil - } - return node + return resolveRemoteExecutionNode(ctx, s.nodeRepo, s.dispatcher != nil, nodeID) } // executeLocally 在 Master 本地执行恢复。 @@ -335,97 +326,19 @@ func (s *RestoreService) dispatchRestoreEvent(ctx context.Context, restoreID uin _ = s.eventDispatcher.DispatchEvent(ctx, eventType, title, body, fields) } -// resolveProvider 复用 BackupExecutionService 的逻辑(解密 → 创建 provider)。 +// resolveProvider 解密存储目标配置并创建 provider(共享实现)。 func (s *RestoreService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) { - target, err := s.targets.FindByID(ctx, targetID) - if err != nil { - return nil, apperror.Internal("BACKUP_STORAGE_TARGET_GET_FAILED", "无法获取存储目标详情", err) - } - if target == nil { - return nil, apperror.BadRequest("BACKUP_STORAGE_TARGET_INVALID", "关联的存储目标不存在", nil) - } - configMap := map[string]any{} - if err := s.cipher.DecryptJSON(target.ConfigCiphertext, &configMap); err != nil { - return nil, apperror.Internal("BACKUP_STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储目标配置", err) - } - return s.storageRegistry.Create(ctx, target.Type, configMap) + return resolveStorageProvider(ctx, s.targets, s.storageRegistry, s.cipher, targetID) } -// prepareArtifact 根据文件后缀依次解密、解压。 +// prepareArtifact 根据文件后缀依次解密、解压(共享实现)。 func (s *RestoreService) prepareArtifact(artifactPath string, logger *backup.ExecutionLogger) (string, error) { - currentPath := artifactPath - if strings.HasSuffix(strings.ToLower(currentPath), ".enc") { - logger.Infof("检测到加密后缀,开始解密") - decrypted, err := backupcrypto.DecryptFile(s.cipher.Key(), currentPath) - if err != nil { - return "", err - } - currentPath = decrypted - } - if strings.HasSuffix(strings.ToLower(currentPath), ".gz") { - logger.Infof("检测到 gzip 压缩,开始解压") - decompressed, err := compress.GunzipFile(currentPath) - if err != nil { - return "", err - } - currentPath = decompressed - } - return currentPath, nil + return prepareBackupArtifact(s.cipher, artifactPath, logger) } -// buildTaskSpec 复刻 BackupExecutionService.buildTaskSpec 的核心逻辑。 +// buildTaskSpec 由任务构建执行规格(共享实现)。 func (s *RestoreService) buildTaskSpec(task *model.BackupTask, startedAt time.Time) (backup.TaskSpec, error) { - excludePatterns := []string{} - if strings.TrimSpace(task.ExcludePatterns) != "" { - if err := json.Unmarshal([]byte(task.ExcludePatterns), &excludePatterns); err != nil { - return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析排除规则", err) - } - } - password := "" - if strings.TrimSpace(task.DBPasswordCiphertext) != "" { - plain, err := s.cipher.Decrypt(task.DBPasswordCiphertext) - if err != nil { - return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECRYPT_FAILED", "无法解密数据库密码", err) - } - password = string(plain) - } - sourcePaths := []string{} - if strings.TrimSpace(task.SourcePaths) != "" { - if err := json.Unmarshal([]byte(task.SourcePaths), &sourcePaths); err != nil { - return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析源路径配置", err) - } - } - dbSpec := backup.DatabaseSpec{ - Host: task.DBHost, - Port: task.DBPort, - User: task.DBUser, - Password: password, - Names: []string{task.DBName}, - Path: task.DBPath, - } - if strings.TrimSpace(task.ExtraConfig) != "" { - extra := map[string]any{} - if err := json.Unmarshal([]byte(task.ExtraConfig), &extra); err != nil { - return backup.TaskSpec{}, apperror.Internal("BACKUP_TASK_DECODE_FAILED", "无法解析扩展配置", err) - } - applyHANAExtraConfig(&dbSpec, extra) - } - return backup.TaskSpec{ - ID: task.ID, - Name: task.Name, - Type: task.Type, - SourcePath: task.SourcePath, - SourcePaths: sourcePaths, - ExcludePatterns: excludePatterns, - StorageTargetID: task.StorageTargetID, - Compression: task.Compression, - Encrypt: task.Encrypt, - RetentionDays: task.RetentionDays, - MaxBackups: task.MaxBackups, - StartedAt: startedAt, - TempDir: s.tempDir, - Database: dbSpec, - }, nil + return buildBackupTaskSpec(s.cipher, task, startedAt, s.tempDir) } // finalize 只更新状态和错误信息,不写 log(用于失败的 dispatch 路径)。 diff --git a/server/internal/service/verification_service.go b/server/internal/service/verification_service.go index b7e5912..d8cdcc2 100644 --- a/server/internal/service/verification_service.go +++ b/server/internal/service/verification_service.go @@ -15,8 +15,6 @@ import ( "backupx/server/internal/repository" "backupx/server/internal/storage" "backupx/server/internal/storage/codec" - "backupx/server/pkg/compress" - backupcrypto "backupx/server/pkg/crypto" ) // VerificationService 管理备份验证(恢复演练)记录生命周期。 @@ -92,11 +90,11 @@ func (v *VerificationEventNotifier) NotifyVerificationResult(ctx context.Context title := "BackupX 备份验证失败" body := fmt.Sprintf("任务:%s\n验证记录:#%d\n错误:%s", taskName, record.ID, record.ErrorMessage) fields := map[string]any{ - "taskId": record.TaskID, - "taskName": taskName, - "verifyId": record.ID, + "taskId": record.TaskID, + "taskName": taskName, + "verifyId": record.ID, "backupRecordId": record.BackupRecordID, - "error": record.ErrorMessage, + "error": record.ErrorMessage, } return v.dispatcher.DispatchEvent(ctx, model.NotificationEventVerifyFailed, title, body, fields) } @@ -243,23 +241,8 @@ func (s *VerificationService) Start(ctx context.Context, backupRecordID uint, mo // validateClusterAccessible 复刻 BackupExecutionService 的跨节点 local_disk 保护。 // 避免 Master 端在错误机器下载/校验到假数据。 func (s *VerificationService) validateClusterAccessible(ctx context.Context, record *model.BackupRecord) error { - if record == nil || record.NodeID == 0 || s.nodeRepo == nil { - return nil - } - node, err := s.nodeRepo.FindByID(ctx, record.NodeID) - if err != nil || node == nil || node.IsLocal { - return nil - } - target, err := s.targets.FindByID(ctx, record.StorageTargetID) - if err != nil || target == nil { - return nil - } - if strings.EqualFold(target.Type, "local_disk") { - return apperror.BadRequest("VERIFY_CROSS_NODE_LOCAL_DISK", - fmt.Sprintf("备份位于节点 %s 的本地磁盘(local_disk),Master 无法跨节点验证。", node.Name), - nil) - } - return nil + return validateCrossNodeLocalDisk(ctx, s.nodeRepo, s.targets, record, + "VERIFY_CROSS_NODE_LOCAL_DISK", "验证") } // executeLocally 异步执行验证:下载 → 解密 → 解压 → 按类型校验。 @@ -358,24 +341,7 @@ func (s *VerificationService) executeLocally(ctx context.Context, verID uint, ta // prepareArtifact 按后缀解密/解压,返回可读路径。 func (s *VerificationService) prepareArtifact(artifactPath string, logger *backup.ExecutionLogger) (string, error) { - current := artifactPath - if strings.HasSuffix(strings.ToLower(current), ".enc") { - logger.Infof("检测到加密后缀,开始解密") - decrypted, err := backupcrypto.DecryptFile(s.cipher.Key(), current) - if err != nil { - return "", err - } - current = decrypted - } - if strings.HasSuffix(strings.ToLower(current), ".gz") { - logger.Infof("检测到 gzip,解压") - decompressed, err := compress.GunzipFile(current) - if err != nil { - return "", err - } - current = decompressed - } - return current, nil + return prepareBackupArtifact(s.cipher, artifactPath, logger) } // verifyByType 按任务类型分派到对应 Verify* 策略。