Files
BackupX/server/internal/service/replication_service.go
Wu Qing 0f30e7bf52 refactor: 消除集群执行服务冗余逻辑 + 修复节点状态滞后缺陷 (#74)
抽取备份/恢复/验证/复制四服务的复制粘贴逻辑到 execution_helpers.go(净减约 250 行);节点状态改为按 LastSeen 实时推导,消除过期 online 误判;Agent systemd 单元补齐 LimitNOFILE 与单机端一致。go build/test 全绿。
2026-05-26 14:12:39 +08:00

358 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"backupx/server/internal/apperror"
"backupx/server/internal/metrics"
"backupx/server/internal/model"
"backupx/server/internal/repository"
"backupx/server/internal/storage"
"backupx/server/internal/storage/codec"
)
// ReplicationService 实现备份复制3-2-1 规则核心)。
// 语义:把源备份对象从 source storage target 镜像到 dest target保持 StoragePath。
//
// 触发路径:
// 1. 自动BackupExecutionService 备份成功后调用 TriggerAutoReplication
// 2. 手动:前端通过 BackupRecord 详情页触发 Start
//
// 执行模型:异步 + 节点无关(复制在 Master 本地 download → upload
// 跨节点 local_disk 场景不支持(与 Download/Delete 保护一致)。
type ReplicationService struct {
replications repository.ReplicationRecordRepository
records repository.BackupRecordRepository
targets repository.StorageTargetRepository
nodeRepo repository.NodeRepository
storageRegistry *storage.Registry
cipher *codec.ConfigCipher
eventDispatcher EventDispatcher
tempDir string
semaphore chan struct{}
async func(func())
now func() time.Time
metrics *metrics.Metrics
}
// SetMetrics 注入 Prometheus 采集器。
func (s *ReplicationService) SetMetrics(m *metrics.Metrics) {
s.metrics = m
}
func NewReplicationService(
replications repository.ReplicationRecordRepository,
records repository.BackupRecordRepository,
targets repository.StorageTargetRepository,
nodeRepo repository.NodeRepository,
storageRegistry *storage.Registry,
cipher *codec.ConfigCipher,
tempDir string,
maxConcurrent int,
) *ReplicationService {
if tempDir == "" {
tempDir = "/tmp/backupx-replicate"
}
if maxConcurrent <= 0 {
maxConcurrent = 2
}
return &ReplicationService{
replications: replications,
records: records,
targets: targets,
nodeRepo: nodeRepo,
storageRegistry: storageRegistry,
cipher: cipher,
tempDir: tempDir,
semaphore: make(chan struct{}, maxConcurrent),
async: func(job func()) { go job() },
now: func() time.Time { return time.Now().UTC() },
}
}
func (s *ReplicationService) SetEventDispatcher(dispatcher EventDispatcher) {
s.eventDispatcher = dispatcher
}
// ReplicationRecordSummary 列表项。
type ReplicationRecordSummary struct {
ID uint `json:"id"`
BackupRecordID uint `json:"backupRecordId"`
TaskID uint `json:"taskId"`
SourceTargetID uint `json:"sourceTargetId"`
SourceTargetName string `json:"sourceTargetName"`
DestTargetID uint `json:"destTargetId"`
DestTargetName string `json:"destTargetName"`
Status string `json:"status"`
StoragePath string `json:"storagePath"`
FileSize int64 `json:"fileSize"`
Checksum string `json:"checksum"`
ErrorMessage string `json:"errorMessage"`
DurationSeconds int `json:"durationSeconds"`
TriggeredBy string `json:"triggeredBy"`
StartedAt time.Time `json:"startedAt"`
CompletedAt *time.Time `json:"completedAt,omitempty"`
}
type ReplicationRecordListInput struct {
TaskID *uint
BackupRecordID *uint
DestTargetID *uint
Status string
DateFrom *time.Time
DateTo *time.Time
Limit int
Offset int
}
// TriggerAutoReplication 备份成功钩子:根据 task.ReplicationTargetIDs 自动派发复制。
// best-effort单个目标失败不影响其他。
func (s *ReplicationService) TriggerAutoReplication(ctx context.Context, task *model.BackupTask, record *model.BackupRecord) {
if task == nil || record == nil {
return
}
destIDs := parseUintCSV(task.ReplicationTargetIDs)
if len(destIDs) == 0 {
return
}
// 跨节点 local_disk 场景保护Master 无法访问远程节点本地文件
if err := s.validateClusterAccessible(ctx, record); err != nil {
return
}
for _, destID := range destIDs {
if destID == record.StorageTargetID {
continue // 源与目标相同,跳过
}
_, _ = s.Start(ctx, record.ID, destID, "system")
}
}
// Start 开始一次复制。同步创建 ReplicationRecord → 异步执行。
func (s *ReplicationService) Start(ctx context.Context, backupRecordID, destTargetID uint, triggeredBy string) (*ReplicationRecordSummary, error) {
record, err := s.records.FindByID(ctx, backupRecordID)
if err != nil {
return nil, apperror.Internal("BACKUP_RECORD_GET_FAILED", "无法获取备份记录", err)
}
if record == nil {
return nil, apperror.New(404, "BACKUP_RECORD_NOT_FOUND", "备份记录不存在", nil)
}
if record.Status != model.BackupRecordStatusSuccess {
return nil, apperror.BadRequest("REPLICATION_SOURCE_INVALID", "只能复制成功的备份记录", nil)
}
if destTargetID == 0 || destTargetID == record.StorageTargetID {
return nil, apperror.BadRequest("REPLICATION_DEST_INVALID", "目标存储无效或与源相同", nil)
}
if err := s.validateClusterAccessible(ctx, record); err != nil {
return nil, err
}
dest, err := s.targets.FindByID(ctx, destTargetID)
if err != nil || dest == nil {
return nil, apperror.BadRequest("REPLICATION_DEST_INVALID", "目标存储不存在", err)
}
if !dest.Enabled {
return nil, apperror.BadRequest("REPLICATION_DEST_DISABLED", "目标存储已禁用", nil)
}
startedAt := s.now()
rep := &model.ReplicationRecord{
BackupRecordID: backupRecordID,
TaskID: record.TaskID,
SourceTargetID: record.StorageTargetID,
DestTargetID: destTargetID,
Status: model.ReplicationStatusRunning,
StoragePath: record.StoragePath,
TriggeredBy: strings.TrimSpace(triggeredBy),
StartedAt: startedAt,
}
if err := s.replications.Create(ctx, rep); err != nil {
return nil, apperror.Internal("REPLICATION_CREATE_FAILED", "无法创建复制记录", err)
}
s.async(func() {
s.executeReplication(context.Background(), rep.ID)
})
summary := s.toSummary(rep, "", dest.Name)
return &summary, nil
}
// executeReplication 实际执行:下载源对象到本地临时文件 → 上传到目标存储。
func (s *ReplicationService) executeReplication(ctx context.Context, repID uint) {
s.semaphore <- struct{}{}
defer func() { <-s.semaphore }()
rep, err := s.replications.FindByID(ctx, repID)
if err != nil || rep == nil {
return
}
status := model.ReplicationStatusFailed
errMessage := ""
fileSize := int64(0)
defer func() {
completedAt := s.now()
rep.Status = status
rep.FileSize = fileSize
rep.ErrorMessage = strings.TrimSpace(errMessage)
rep.DurationSeconds = int(completedAt.Sub(rep.StartedAt).Seconds())
rep.CompletedAt = &completedAt
_ = s.replications.Update(ctx, rep)
s.metrics.ObserveReplication(status)
if status == model.ReplicationStatusFailed {
s.dispatchFailed(ctx, rep, errMessage)
}
}()
sourceProvider, err := s.resolveProvider(ctx, rep.SourceTargetID)
if err != nil {
errMessage = err.Error()
return
}
destProvider, err := s.resolveProvider(ctx, rep.DestTargetID)
if err != nil {
errMessage = err.Error()
return
}
if err := os.MkdirAll(s.tempDir, 0o755); err != nil {
errMessage = err.Error()
return
}
tempDir, err := os.MkdirTemp(s.tempDir, "replicate-*")
if err != nil {
errMessage = err.Error()
return
}
defer os.RemoveAll(tempDir)
reader, err := sourceProvider.Download(ctx, rep.StoragePath)
if err != nil {
errMessage = fmt.Sprintf("下载源对象失败: %v", err)
return
}
localPath := filepath.Join(tempDir, filepath.Base(rep.StoragePath))
if err := writeReaderToFile(localPath, reader); err != nil {
errMessage = fmt.Sprintf("写入临时文件失败: %v", err)
return
}
info, err := os.Stat(localPath)
if err != nil {
errMessage = err.Error()
return
}
fileSize = info.Size()
file, err := os.Open(localPath)
if err != nil {
errMessage = err.Error()
return
}
defer file.Close()
meta := map[string]string{
"replicationId": strconv.FormatUint(uint64(rep.ID), 10),
"sourceRecord": strconv.FormatUint(uint64(rep.BackupRecordID), 10),
}
if err := destProvider.Upload(ctx, rep.StoragePath, file, fileSize, meta); err != nil {
errMessage = fmt.Sprintf("上传到目标存储失败: %v", err)
return
}
rep.Checksum = "" // 可选:调用方可按需复算 SHA-256
status = model.ReplicationStatusSuccess
}
func (s *ReplicationService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) {
return resolveStorageProvider(ctx, s.targets, s.storageRegistry, s.cipher, targetID)
}
// validateClusterAccessible 拒绝跨节点 local_disk 源Master 无法拉取)。
func (s *ReplicationService) validateClusterAccessible(ctx context.Context, record *model.BackupRecord) error {
return validateCrossNodeLocalDisk(ctx, s.nodeRepo, s.targets, record,
"REPLICATION_CROSS_NODE_LOCAL_DISK", "复制。请改用云存储作为主备份")
}
func (s *ReplicationService) dispatchFailed(ctx context.Context, rep *model.ReplicationRecord, message string) {
if s.eventDispatcher == nil || rep == nil {
return
}
title := "BackupX 备份复制失败"
body := fmt.Sprintf("备份记录:#%d\n源 → 目标:#%d → #%d\n错误%s", rep.BackupRecordID, rep.SourceTargetID, rep.DestTargetID, message)
fields := map[string]any{
"replicationId": rep.ID,
"backupRecordId": rep.BackupRecordID,
"taskId": rep.TaskID,
"sourceTargetId": rep.SourceTargetID,
"destTargetId": rep.DestTargetID,
"error": message,
}
_ = s.eventDispatcher.DispatchEvent(ctx, model.NotificationEventReplicationFailed, title, body, fields)
}
// List / Get / toSummary
func (s *ReplicationService) List(ctx context.Context, input ReplicationRecordListInput) ([]ReplicationRecordSummary, error) {
items, err := s.replications.List(ctx, repository.ReplicationRecordListOptions{
TaskID: input.TaskID, BackupRecordID: input.BackupRecordID, DestTargetID: input.DestTargetID,
Status: strings.TrimSpace(input.Status), DateFrom: input.DateFrom, DateTo: input.DateTo,
Limit: input.Limit, Offset: input.Offset,
})
if err != nil {
return nil, apperror.Internal("REPLICATION_LIST_FAILED", "无法获取复制记录", err)
}
result := make([]ReplicationRecordSummary, 0, len(items))
for i := range items {
item := items[i]
result = append(result, s.toSummary(&item, item.SourceTarget.Name, item.DestTarget.Name))
}
return result, nil
}
func (s *ReplicationService) Get(ctx context.Context, id uint) (*ReplicationRecordSummary, error) {
item, err := s.replications.FindByID(ctx, id)
if err != nil {
return nil, apperror.Internal("REPLICATION_GET_FAILED", "无法获取复制记录", err)
}
if item == nil {
return nil, apperror.New(404, "REPLICATION_NOT_FOUND", "复制记录不存在", nil)
}
summary := s.toSummary(item, item.SourceTarget.Name, item.DestTarget.Name)
return &summary, nil
}
func (s *ReplicationService) toSummary(rep *model.ReplicationRecord, sourceName, destName string) ReplicationRecordSummary {
return ReplicationRecordSummary{
ID: rep.ID, BackupRecordID: rep.BackupRecordID, TaskID: rep.TaskID,
SourceTargetID: rep.SourceTargetID, SourceTargetName: sourceName,
DestTargetID: rep.DestTargetID, DestTargetName: destName,
Status: rep.Status, StoragePath: rep.StoragePath, FileSize: rep.FileSize,
Checksum: rep.Checksum, ErrorMessage: rep.ErrorMessage, DurationSeconds: rep.DurationSeconds,
TriggeredBy: rep.TriggeredBy, StartedAt: rep.StartedAt, CompletedAt: rep.CompletedAt,
}
}
// parseUintCSV 解析逗号分隔的 uint 列表,跳过非法项。
func parseUintCSV(value string) []uint {
if strings.TrimSpace(value) == "" {
return nil
}
parts := strings.Split(value, ",")
out := make([]uint, 0, len(parts))
seen := map[uint]bool{}
for _, p := range parts {
trimmed := strings.TrimSpace(p)
if trimmed == "" {
continue
}
parsed, err := strconv.ParseUint(trimmed, 10, 32)
if err != nil {
continue
}
id := uint(parsed)
if seen[id] {
continue
}
seen[id] = true
out = append(out, id)
}
return out
}