Files
BackupX/server/internal/service/replication_service.go
Wu Qing 5021fe665e 功能: v2.1 可观测性与流控 (#47)
* 功能: v2.1 可观测性与流控 — Prometheus + 节点带宽 + 审计 Webhook

核心能力:
- Prometheus /metrics 端点:11 类指标(任务/存储/节点/SLA/验证/恢复/复制)
- 节点级带宽限速生效:model.Node.BandwidthLimit 覆盖全局默认
- 审计日志 Webhook 外输:HMAC-SHA256 签名,配合 SIEM 合规留档

实现:
- server/internal/metrics/  独立 Registry + 异步 Gauge Collector(30s)
- backup/restore/verify/replication 服务注入 metrics 钩子,nil 安全
- resolveProviderForNode() 按 task.NodeID 解析 BandwidthLimit
- AuditService.SetWebhook + 动态 settings 推送,无需重启

测试:
- metrics/registry_test.go: 注册/采集/nil safety/HTTP handler
- service/audit_service_webhook_test.go: 签名正确性/异步投递/禁用路径
- go test ./... 全部通过

* chore: 触发 CodeQL 扫描
2026-04-20 23:26:04 +08:00

384 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"backupx/server/internal/apperror"
"backupx/server/internal/metrics"
"backupx/server/internal/model"
"backupx/server/internal/repository"
"backupx/server/internal/storage"
"backupx/server/internal/storage/codec"
)
// ReplicationService 实现备份复制3-2-1 规则核心)。
// 语义:把源备份对象从 source storage target 镜像到 dest target保持 StoragePath。
//
// 触发路径:
// 1. 自动BackupExecutionService 备份成功后调用 TriggerAutoReplication
// 2. 手动:前端通过 BackupRecord 详情页触发 Start
//
// 执行模型:异步 + 节点无关(复制在 Master 本地 download → upload
// 跨节点 local_disk 场景不支持(与 Download/Delete 保护一致)。
type ReplicationService struct {
replications repository.ReplicationRecordRepository
records repository.BackupRecordRepository
targets repository.StorageTargetRepository
nodeRepo repository.NodeRepository
storageRegistry *storage.Registry
cipher *codec.ConfigCipher
eventDispatcher EventDispatcher
tempDir string
semaphore chan struct{}
async func(func())
now func() time.Time
metrics *metrics.Metrics
}
// SetMetrics 注入 Prometheus 采集器。
func (s *ReplicationService) SetMetrics(m *metrics.Metrics) {
s.metrics = m
}
func NewReplicationService(
replications repository.ReplicationRecordRepository,
records repository.BackupRecordRepository,
targets repository.StorageTargetRepository,
nodeRepo repository.NodeRepository,
storageRegistry *storage.Registry,
cipher *codec.ConfigCipher,
tempDir string,
maxConcurrent int,
) *ReplicationService {
if tempDir == "" {
tempDir = "/tmp/backupx-replicate"
}
if maxConcurrent <= 0 {
maxConcurrent = 2
}
return &ReplicationService{
replications: replications,
records: records,
targets: targets,
nodeRepo: nodeRepo,
storageRegistry: storageRegistry,
cipher: cipher,
tempDir: tempDir,
semaphore: make(chan struct{}, maxConcurrent),
async: func(job func()) { go job() },
now: func() time.Time { return time.Now().UTC() },
}
}
func (s *ReplicationService) SetEventDispatcher(dispatcher EventDispatcher) {
s.eventDispatcher = dispatcher
}
// ReplicationRecordSummary 列表项。
type ReplicationRecordSummary struct {
ID uint `json:"id"`
BackupRecordID uint `json:"backupRecordId"`
TaskID uint `json:"taskId"`
SourceTargetID uint `json:"sourceTargetId"`
SourceTargetName string `json:"sourceTargetName"`
DestTargetID uint `json:"destTargetId"`
DestTargetName string `json:"destTargetName"`
Status string `json:"status"`
StoragePath string `json:"storagePath"`
FileSize int64 `json:"fileSize"`
Checksum string `json:"checksum"`
ErrorMessage string `json:"errorMessage"`
DurationSeconds int `json:"durationSeconds"`
TriggeredBy string `json:"triggeredBy"`
StartedAt time.Time `json:"startedAt"`
CompletedAt *time.Time `json:"completedAt,omitempty"`
}
type ReplicationRecordListInput struct {
TaskID *uint
BackupRecordID *uint
DestTargetID *uint
Status string
DateFrom *time.Time
DateTo *time.Time
Limit int
Offset int
}
// TriggerAutoReplication 备份成功钩子:根据 task.ReplicationTargetIDs 自动派发复制。
// best-effort单个目标失败不影响其他。
func (s *ReplicationService) TriggerAutoReplication(ctx context.Context, task *model.BackupTask, record *model.BackupRecord) {
if task == nil || record == nil {
return
}
destIDs := parseUintCSV(task.ReplicationTargetIDs)
if len(destIDs) == 0 {
return
}
// 跨节点 local_disk 场景保护Master 无法访问远程节点本地文件
if err := s.validateClusterAccessible(ctx, record); err != nil {
return
}
for _, destID := range destIDs {
if destID == record.StorageTargetID {
continue // 源与目标相同,跳过
}
_, _ = s.Start(ctx, record.ID, destID, "system")
}
}
// Start 开始一次复制。同步创建 ReplicationRecord → 异步执行。
func (s *ReplicationService) Start(ctx context.Context, backupRecordID, destTargetID uint, triggeredBy string) (*ReplicationRecordSummary, error) {
record, err := s.records.FindByID(ctx, backupRecordID)
if err != nil {
return nil, apperror.Internal("BACKUP_RECORD_GET_FAILED", "无法获取备份记录", err)
}
if record == nil {
return nil, apperror.New(404, "BACKUP_RECORD_NOT_FOUND", "备份记录不存在", nil)
}
if record.Status != model.BackupRecordStatusSuccess {
return nil, apperror.BadRequest("REPLICATION_SOURCE_INVALID", "只能复制成功的备份记录", nil)
}
if destTargetID == 0 || destTargetID == record.StorageTargetID {
return nil, apperror.BadRequest("REPLICATION_DEST_INVALID", "目标存储无效或与源相同", nil)
}
if err := s.validateClusterAccessible(ctx, record); err != nil {
return nil, err
}
dest, err := s.targets.FindByID(ctx, destTargetID)
if err != nil || dest == nil {
return nil, apperror.BadRequest("REPLICATION_DEST_INVALID", "目标存储不存在", err)
}
if !dest.Enabled {
return nil, apperror.BadRequest("REPLICATION_DEST_DISABLED", "目标存储已禁用", nil)
}
startedAt := s.now()
rep := &model.ReplicationRecord{
BackupRecordID: backupRecordID,
TaskID: record.TaskID,
SourceTargetID: record.StorageTargetID,
DestTargetID: destTargetID,
Status: model.ReplicationStatusRunning,
StoragePath: record.StoragePath,
TriggeredBy: strings.TrimSpace(triggeredBy),
StartedAt: startedAt,
}
if err := s.replications.Create(ctx, rep); err != nil {
return nil, apperror.Internal("REPLICATION_CREATE_FAILED", "无法创建复制记录", err)
}
s.async(func() {
s.executeReplication(context.Background(), rep.ID)
})
summary := s.toSummary(rep, "", dest.Name)
return &summary, nil
}
// executeReplication 实际执行:下载源对象到本地临时文件 → 上传到目标存储。
func (s *ReplicationService) executeReplication(ctx context.Context, repID uint) {
s.semaphore <- struct{}{}
defer func() { <-s.semaphore }()
rep, err := s.replications.FindByID(ctx, repID)
if err != nil || rep == nil {
return
}
status := model.ReplicationStatusFailed
errMessage := ""
fileSize := int64(0)
defer func() {
completedAt := s.now()
rep.Status = status
rep.FileSize = fileSize
rep.ErrorMessage = strings.TrimSpace(errMessage)
rep.DurationSeconds = int(completedAt.Sub(rep.StartedAt).Seconds())
rep.CompletedAt = &completedAt
_ = s.replications.Update(ctx, rep)
s.metrics.ObserveReplication(status)
if status == model.ReplicationStatusFailed {
s.dispatchFailed(ctx, rep, errMessage)
}
}()
sourceProvider, err := s.resolveProvider(ctx, rep.SourceTargetID)
if err != nil {
errMessage = err.Error()
return
}
destProvider, err := s.resolveProvider(ctx, rep.DestTargetID)
if err != nil {
errMessage = err.Error()
return
}
if err := os.MkdirAll(s.tempDir, 0o755); err != nil {
errMessage = err.Error()
return
}
tempDir, err := os.MkdirTemp(s.tempDir, "replicate-*")
if err != nil {
errMessage = err.Error()
return
}
defer os.RemoveAll(tempDir)
reader, err := sourceProvider.Download(ctx, rep.StoragePath)
if err != nil {
errMessage = fmt.Sprintf("下载源对象失败: %v", err)
return
}
localPath := filepath.Join(tempDir, filepath.Base(rep.StoragePath))
if err := writeReaderToFile(localPath, reader); err != nil {
errMessage = fmt.Sprintf("写入临时文件失败: %v", err)
return
}
info, err := os.Stat(localPath)
if err != nil {
errMessage = err.Error()
return
}
fileSize = info.Size()
file, err := os.Open(localPath)
if err != nil {
errMessage = err.Error()
return
}
defer file.Close()
meta := map[string]string{
"replicationId": strconv.FormatUint(uint64(rep.ID), 10),
"sourceRecord": strconv.FormatUint(uint64(rep.BackupRecordID), 10),
}
if err := destProvider.Upload(ctx, rep.StoragePath, file, fileSize, meta); err != nil {
errMessage = fmt.Sprintf("上传到目标存储失败: %v", err)
return
}
rep.Checksum = "" // 可选:调用方可按需复算 SHA-256
status = model.ReplicationStatusSuccess
}
func (s *ReplicationService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) {
target, err := s.targets.FindByID(ctx, targetID)
if err != nil {
return nil, apperror.Internal("STORAGE_TARGET_GET_FAILED", "无法获取存储目标", err)
}
if target == nil {
return nil, apperror.BadRequest("STORAGE_TARGET_INVALID", "存储目标不存在", nil)
}
configMap := map[string]any{}
if err := s.cipher.DecryptJSON(target.ConfigCiphertext, &configMap); err != nil {
return nil, apperror.Internal("STORAGE_TARGET_DECRYPT_FAILED", "无法解密存储配置", err)
}
return s.storageRegistry.Create(ctx, target.Type, configMap)
}
// validateClusterAccessible 拒绝跨节点 local_disk 源Master 无法拉取)
func (s *ReplicationService) validateClusterAccessible(ctx context.Context, record *model.BackupRecord) error {
if record == nil || record.NodeID == 0 || s.nodeRepo == nil {
return nil
}
node, err := s.nodeRepo.FindByID(ctx, record.NodeID)
if err != nil || node == nil || node.IsLocal {
return nil
}
target, err := s.targets.FindByID(ctx, record.StorageTargetID)
if err != nil || target == nil {
return nil
}
if strings.EqualFold(target.Type, "local_disk") {
return apperror.BadRequest("REPLICATION_CROSS_NODE_LOCAL_DISK",
fmt.Sprintf("备份位于节点 %s 的本地磁盘local_diskMaster 无法跨节点复制。请改用云存储作为主备份。", node.Name),
nil)
}
return nil
}
func (s *ReplicationService) dispatchFailed(ctx context.Context, rep *model.ReplicationRecord, message string) {
if s.eventDispatcher == nil || rep == nil {
return
}
title := "BackupX 备份复制失败"
body := fmt.Sprintf("备份记录:#%d\n源 → 目标:#%d → #%d\n错误%s", rep.BackupRecordID, rep.SourceTargetID, rep.DestTargetID, message)
fields := map[string]any{
"replicationId": rep.ID,
"backupRecordId": rep.BackupRecordID,
"taskId": rep.TaskID,
"sourceTargetId": rep.SourceTargetID,
"destTargetId": rep.DestTargetID,
"error": message,
}
_ = s.eventDispatcher.DispatchEvent(ctx, model.NotificationEventReplicationFailed, title, body, fields)
}
// List / Get / toSummary
func (s *ReplicationService) List(ctx context.Context, input ReplicationRecordListInput) ([]ReplicationRecordSummary, error) {
items, err := s.replications.List(ctx, repository.ReplicationRecordListOptions{
TaskID: input.TaskID, BackupRecordID: input.BackupRecordID, DestTargetID: input.DestTargetID,
Status: strings.TrimSpace(input.Status), DateFrom: input.DateFrom, DateTo: input.DateTo,
Limit: input.Limit, Offset: input.Offset,
})
if err != nil {
return nil, apperror.Internal("REPLICATION_LIST_FAILED", "无法获取复制记录", err)
}
result := make([]ReplicationRecordSummary, 0, len(items))
for i := range items {
item := items[i]
result = append(result, s.toSummary(&item, item.SourceTarget.Name, item.DestTarget.Name))
}
return result, nil
}
func (s *ReplicationService) Get(ctx context.Context, id uint) (*ReplicationRecordSummary, error) {
item, err := s.replications.FindByID(ctx, id)
if err != nil {
return nil, apperror.Internal("REPLICATION_GET_FAILED", "无法获取复制记录", err)
}
if item == nil {
return nil, apperror.New(404, "REPLICATION_NOT_FOUND", "复制记录不存在", nil)
}
summary := s.toSummary(item, item.SourceTarget.Name, item.DestTarget.Name)
return &summary, nil
}
func (s *ReplicationService) toSummary(rep *model.ReplicationRecord, sourceName, destName string) ReplicationRecordSummary {
return ReplicationRecordSummary{
ID: rep.ID, BackupRecordID: rep.BackupRecordID, TaskID: rep.TaskID,
SourceTargetID: rep.SourceTargetID, SourceTargetName: sourceName,
DestTargetID: rep.DestTargetID, DestTargetName: destName,
Status: rep.Status, StoragePath: rep.StoragePath, FileSize: rep.FileSize,
Checksum: rep.Checksum, ErrorMessage: rep.ErrorMessage, DurationSeconds: rep.DurationSeconds,
TriggeredBy: rep.TriggeredBy, StartedAt: rep.StartedAt, CompletedAt: rep.CompletedAt,
}
}
// parseUintCSV 解析逗号分隔的 uint 列表,跳过非法项。
func parseUintCSV(value string) []uint {
if strings.TrimSpace(value) == "" {
return nil
}
parts := strings.Split(value, ",")
out := make([]uint, 0, len(parts))
seen := map[uint]bool{}
for _, p := range parts {
trimmed := strings.TrimSpace(p)
if trimmed == "" {
continue
}
parsed, err := strconv.ParseUint(trimmed, 10, 32)
if err != nil {
continue
}
id := uint(parsed)
if seen[id] {
continue
}
seen[id] = true
out = append(out, id)
}
return out
}