From 5021fe665eec03a397d78f09d4c0e092cd0195da Mon Sep 17 00:00:00 2001 From: Wu Qing <3184394176@qq.com> Date: Mon, 20 Apr 2026 23:26:04 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=9F=E8=83=BD:=20v2.1=20=E5=8F=AF=E8=A7=82?= =?UTF-8?q?=E6=B5=8B=E6=80=A7=E4=B8=8E=E6=B5=81=E6=8E=A7=20(#47)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 功能: v2.1 可观测性与流控 — Prometheus + 节点带宽 + 审计 Webhook 核心能力: - Prometheus /metrics 端点:11 类指标(任务/存储/节点/SLA/验证/恢复/复制) - 节点级带宽限速生效:model.Node.BandwidthLimit 覆盖全局默认 - 审计日志 Webhook 外输:HMAC-SHA256 签名,配合 SIEM 合规留档 实现: - server/internal/metrics/ 独立 Registry + 异步 Gauge Collector(30s) - backup/restore/verify/replication 服务注入 metrics 钩子,nil 安全 - resolveProviderForNode() 按 task.NodeID 解析 BandwidthLimit - AuditService.SetWebhook + 动态 settings 推送,无需重启 测试: - metrics/registry_test.go: 注册/采集/nil safety/HTTP handler - service/audit_service_webhook_test.go: 签名正确性/异步投递/禁用路径 - go test ./... 全部通过 * chore: 触发 CodeQL 扫描 --- README.md | 3 + README.zh-CN.md | 3 + ...04-19-observability-flow-control-design.md | 142 +++++++++++ server/go.mod | 2 +- server/internal/app/app.go | 19 ++ server/internal/http/router.go | 9 + server/internal/metrics/collector.go | 152 ++++++++++++ server/internal/metrics/registry.go | 225 ++++++++++++++++++ server/internal/metrics/registry_test.go | 76 ++++++ server/internal/service/audit_service.go | 96 +++++++- .../service/audit_service_webhook_test.go | 129 ++++++++++ .../service/backup_execution_service.go | 47 +++- .../internal/service/replication_service.go | 8 + server/internal/service/restore_service.go | 8 + server/internal/service/settings_service.go | 57 ++++- .../internal/service/verification_service.go | 8 + 16 files changed, 970 insertions(+), 14 deletions(-) create mode 100644 docs/superpowers/specs/2026-04-19-observability-flow-control-design.md create mode 100644 server/internal/metrics/collector.go create mode 100644 server/internal/metrics/registry.go create mode 100644 
server/internal/metrics/registry_test.go create mode 100644 server/internal/service/audit_service_webhook_test.go diff --git a/README.md b/README.md index ae5a335..c344eee 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,9 @@ | **Multi-Node Cluster** | Master-Agent mode via HTTP long-polling — Agents run tasks locally, upload straight to storage, no reverse connectivity required | | **Security** | JWT + bcrypt + AES-256-GCM encrypted config + optional backup encryption + full audit log | | **Notifications** | Email / Webhook / Telegram on success or failure | +| **Observability** | Prometheus `/metrics` endpoint + `/health` + `/ready` probes + SLA breach gauge | +| **Audit Webhook** | HMAC-SHA256 signed forwarding to SIEM / WORM storage for compliance (SOC2 / GDPR) | +| **Flow Control** | Per-node bandwidth cap + per-node concurrency limit — tune big/small nodes independently | | **Deployment** | Single binary + embedded SQLite, Docker one-click, zero external dependencies | ## Quick Start diff --git a/README.zh-CN.md b/README.zh-CN.md index b575023..b2c5363 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -46,6 +46,9 @@ | **多节点集群** | Master-Agent 模式,基于 HTTP 长轮询跨多台服务器管理备份。Agent 本地执行任务并直接上传到存储,无需反向连通性 | | **安全** | JWT + bcrypt + AES-256-GCM 加密配置 + 可选备份文件加密 + 完整审计日志 | | **通知** | 邮件 / Webhook / Telegram,备份成功或失败时自动推送 | +| **可观测性** | Prometheus `/metrics` 端点 + `/health` + `/ready` 探针 + SLA 违约监控 | +| **审计外输** | HMAC-SHA256 签名 Webhook,对接 SIEM / WORM 存储满足 SOC2 / GDPR 合规 | +| **流控** | 节点级带宽限速 + 节点级并发控制,大小节点分别配置,避免小内存 Agent 被挤爆 | | **部署** | 单二进制 + 内嵌 SQLite,Docker 一键启动,零外部依赖 | ## 快速开始 diff --git a/docs/superpowers/specs/2026-04-19-observability-flow-control-design.md b/docs/superpowers/specs/2026-04-19-observability-flow-control-design.md new file mode 100644 index 0000000..70c7de0 --- /dev/null +++ b/docs/superpowers/specs/2026-04-19-observability-flow-control-design.md @@ -0,0 +1,142 @@ +# v2.1.0 可观测性与流控设计 (2026-04-19) + +## 背景 + +v2.0.0 交付了 11 项企业能力(RBAC / API Key 
/ 多节点集群 / 3-2-1 复制 / 验证演练等),产品具备"企业级备份管理平台"的完整能力。v2.1.0 聚焦 "**投入生产后运维团队**" 的三类刚需: + +1. **可观测性**:SRE 要能把 BackupX 接入 Prometheus/Grafana 做容量规划与告警。 +2. **流控精细化**:不同节点的带宽/并发应该能各自配置,而不是一刀切。 +3. **审计外输**:合规团队需要把审计事件送到 SIEM / WORM 存储,实现集中留档。 + +## 范围 + +**In scope:** + +- `/metrics` Prometheus 端点(10+ 核心指标) +- 节点级带宽限速生效(`model.Node.BandwidthLimit` 已存在但未落地) +- 审计日志 Webhook 外输(HMAC-SHA256 签名) + +**Out of scope(放入后续迭代):** + +- Prometheus 鉴权(企业生产可用反向代理做) +- Grafana Dashboard JSON 预置 +- 节点级并发已在 v2.0 完成,不再重复 +- 审计事件的 Syslog/Kafka 渠道(Webhook 已能衔接 Fluent Bit) +- 前端 Settings 页 UI(可 API 配置,UI 后续补) + +## 架构 + +### 1. Prometheus /metrics + +``` +业务服务 metrics.Metrics /metrics HTTP +───────── ──────────────── ────────────── +BackupExec ─ObserveRun──► Counter+Histogram ◄─Scrape── Prometheus +Restore ─ObserveRun──► +Verify ─ObserveRun──► +Replication ─ObserveRun──► + Gauge (storage/node/SLA) +Collector(30s) ─update───► ▲ + │ + repo.StorageUsage / Node.List / Task.List +``` + +- **独立 Registry**:避免与 default registry 中的默认 metrics 混淆,只暴露 backupx_ + go_ + process_ +- **零值安全**:`*Metrics` nil 时所有方法静默退化,不影响未注入 metrics 的单测 +- **Gauge 异步刷新**:30s 后台 goroutine 采集慢查询数据,避免阻塞 /metrics 请求 +- **Counter/Histogram 同步**:任务完成时直接 Observe,延迟 < 1μs + +指标清单: + +| 指标 | 类型 | 标签 | 含义 | +|------|------|------|------| +| `backupx_task_run_total` | Counter | status, task_type | 备份任务运行计数 | +| `backupx_task_run_duration_seconds` | Histogram | task_type | 任务耗时分布 | +| `backupx_task_bytes_total` | Counter | task_type | 累计产出字节数 | +| `backupx_task_running` | Gauge | - | 正在运行任务数 | +| `backupx_storage_used_bytes` | Gauge | target_name, target_type | 存储目标用量 | +| `backupx_node_online` | Gauge | node_name, role | 节点在线状态 | +| `backupx_verify_run_total` | Counter | status | 验证演练计数 | +| `backupx_restore_run_total` | Counter | status | 恢复操作计数 | +| `backupx_replication_run_total` | Counter | status | 副本复制计数 | +| `backupx_sla_breach_tasks` | Gauge | - | 违反 SLA 任务数 | +| `backupx_app_info` | Gauge | version | 应用版本(恒为 1) | + +### 2. 
节点级带宽限速 + +现状:`BackupExecutionService` 在 `resolveProvider()` 中用全局 `s.bandwidthLimit`(来自 `cfg.Backup.BandwidthLimit`)注入 rclone TransferConfig。 + +改进:新增 `resolveProviderForNode(ctx, targetID, nodeID)`,实际生效的限速由其调用的 `effectiveBandwidth` 解析: + +```go +func (s *BackupExecutionService) effectiveBandwidth(ctx context.Context, nodeID uint) string { + if nodeID == 0 || s.nodeRepo == nil { + return s.bandwidthLimit + } + node, err := s.nodeRepo.FindByID(ctx, nodeID) + if err != nil || node == nil { + return s.bandwidthLimit + } + if strings.TrimSpace(node.BandwidthLimit) != "" { + return node.BandwidthLimit + } + return s.bandwidthLimit +} +``` + +优先级:`Node.BandwidthLimit` > 全局默认。仅 Master 本地执行生效;Agent 使用自身 Node 配置(在 Agent runtime 中独立应用)。 + +### 3. 审计 Webhook + +``` +AuditService.Record(entry) + │ + ├─> repo.Create (写 DB) [fire-and-forget] + └─> fireWebhook(record) [fire-and-forget] + │ + ├─ HTTP POST JSON to webhookURL + ├─ Header: X-BackupX-Signature: sha256=<hex> + └─ 失败: log.Printf,不阻塞主流程 +``` + +Payload schema: + +```json +{ + "eventType": "audit.log", + "occurredAt": "2026-04-19T10:30:00Z", + "actor": { "userId": 1, "username": "alice" }, + "category": "auth", + "action": "login_success", + "targetType": "user", + "targetId": "1", + "targetName": "alice", + "detail": "admin login", + "clientIp": "10.0.0.1" +} +``` + +签名:`HMAC-SHA256(secret, raw_json_body)`,接收方需要验证以防伪造。 + +配置路径:前端通过 `PUT /api/settings` 写入 `audit_webhook_url` / `audit_webhook_secret`,SettingsService 保存后立即通过 `AuditWebhookConfigurer` 接口同步到 AuditService,无需重启。 + +## 测试 + +- `metrics/registry_test.go` — 注册、采集、nil safety、HTTP handler 端到端 +- `service/audit_service_webhook_test.go` — 签名正确性、异步发送、禁用路径 +- 所有现有测试保持通过(backup_execution_service_test / restore_service_test / verification_service_test) + +## 风险与应对 + +| 风险 | 应对 | +|------|------| +| Prometheus 采集阻塞 | Gauge 走后台 Collector + Counter/Histogram 是内存操作,无 IO | +| Webhook 打爆业务 | 3s 超时 + fire-and-forget goroutine,单次 panic 也不影响主流程 | +| 指标基数爆炸 | task_name 不作为 label(仅 task_type),避免 Prometheus series 失控 | +| 
节点带宽配置错误 | 走 rclone.BwTimetable.Set 校验,解析失败静默沿用全局默认 | + +## 部署建议 + +- Prometheus 抓取配置:`scrape_interval: 30s`,匹配 Collector 间隔 +- Grafana alert 示例:`sum(backupx_sla_breach_tasks) > 0` 触发 +- Webhook 接收侧建议:Fluent Bit HTTP input → Elasticsearch / Loki diff --git a/server/go.mod b/server/go.mod index b659eed..4f912e2 100644 --- a/server/go.mod +++ b/server/go.mod @@ -7,6 +7,7 @@ require ( github.com/glebarez/sqlite v1.11.0 github.com/golang-jwt/jwt/v5 v5.3.0 github.com/natefinch/lumberjack v2.0.0+incompatible + github.com/prometheus/client_golang v1.23.2 github.com/rclone/rclone v1.73.3 github.com/robfig/cron/v3 v3.0.1 github.com/spf13/viper v1.20.0 @@ -181,7 +182,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/pquerna/otp v1.5.0 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.2 // indirect github.com/prometheus/procfs v0.19.2 // indirect diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 9b26d67..762afb6 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -13,6 +13,7 @@ import ( "backupx/server/internal/database" aphttp "backupx/server/internal/http" "backupx/server/internal/logger" + "backupx/server/internal/metrics" "backupx/server/internal/notify" "backupx/server/internal/repository" "backupx/server/internal/scheduler" @@ -109,6 +110,8 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application, auditService := service.NewAuditService(auditLogRepo) authService.SetAuditService(auditService) schedulerService.SetAuditRecorder(auditService) + // 审计日志外输:启动时用当前 settings 初始化 webhook,后续前端修改立即生效 + settingsService.SetAuditWebhookConfigurer(ctx, auditService) // Database discovery(集群依赖在 agentService 创建后注入) databaseDiscoveryService := 
service.NewDatabaseDiscoveryService(backup.NewOSCommandExecutor()) @@ -226,6 +229,21 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application, // Dashboard 集群概览依赖注入 dashboardService.SetClusterDependencies(nodeRepo, version) + // Prometheus 指标采集:Counter/Histogram 由业务服务实时写入; + // Gauge 类(存储用量、节点在线、SLA 违约)由 Collector 每 30s 异步刷新, + // 避免 /metrics 请求路径做慢 IO。 + appMetrics := metrics.New(version) + backupExecutionService.SetMetrics(appMetrics) + restoreService.SetMetrics(appMetrics) + verificationService.SetMetrics(appMetrics) + replicationService.SetMetrics(appMetrics) + metricsCollector := metrics.NewCollector( + appMetrics, + metrics.NewRepoSource(storageTargetRepo, backupRecordRepo, nodeRepo, backupTaskRepo), + 30*time.Second, + ) + metricsCollector.Start(ctx) + router := aphttp.NewRouter(aphttp.RouterDependencies{ Context: ctx, Config: cfg, @@ -259,6 +277,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application, InstallTokenService: installTokenService, MasterExternalURL: "", // 如需覆盖 URL,可扩展 cfg.Server 增字段;目前留空依赖 X-Forwarded-* / Request.Host DB: db, + Metrics: appMetrics, }) httpServer := &stdhttp.Server{ diff --git a/server/internal/http/router.go b/server/internal/http/router.go index 241f61c..599bf47 100644 --- a/server/internal/http/router.go +++ b/server/internal/http/router.go @@ -7,6 +7,7 @@ import ( "backupx/server/internal/apperror" "backupx/server/internal/config" + "backupx/server/internal/metrics" "backupx/server/internal/repository" "backupx/server/internal/security" "backupx/server/internal/service" @@ -52,6 +53,8 @@ type RouterDependencies struct { MasterExternalURL string // DB 注入给健康检查端点做 liveness/readiness 探测。 DB *gorm.DB + // Metrics 注入给 /metrics 端点;为 nil 时端点返回 503。 + Metrics *metrics.Metrics } func NewRouter(deps RouterDependencies) *gin.Engine { @@ -311,6 +314,12 @@ func NewRouter(deps RouterDependencies) *gin.Engine { engine.GET("/api/health", healthHandler.Live) engine.GET("/api/ready", 
healthHandler.Ready) + // Prometheus /metrics 端点(公开、无认证;内网/反向代理授权即可)。 + // 业内通行做法:/metrics 通常由 Prometheus pull 抓取,不走 API Key。 + if deps.Metrics != nil { + engine.GET("/metrics", gin.WrapH(deps.Metrics.Handler())) + } + // 公开安装路由(不走 JWT 中间件) if deps.InstallTokenService != nil { gcCtx := deps.Context diff --git a/server/internal/metrics/collector.go b/server/internal/metrics/collector.go new file mode 100644 index 0000000..fa546e7 --- /dev/null +++ b/server/internal/metrics/collector.go @@ -0,0 +1,152 @@ +package metrics + +import ( + "context" + "time" + + "backupx/server/internal/model" + "backupx/server/internal/repository" +) + +// SampleSource 抽象 Collector 需要的仓储访问,便于单测替换。 +type SampleSource interface { + ListStorageTargets(ctx context.Context) ([]model.StorageTarget, error) + StorageUsage(ctx context.Context) ([]repository.BackupStorageUsageItem, error) + ListNodes(ctx context.Context) ([]model.Node, error) + CountSLABreach(ctx context.Context) (int, error) +} + +// repoSource 把 repository 适配到 SampleSource。 +type repoSource struct { + targets repository.StorageTargetRepository + records repository.BackupRecordRepository + nodes repository.NodeRepository + tasks repository.BackupTaskRepository + now func() time.Time +} + +// NewRepoSource 用仓储实例构造 SampleSource。 +func NewRepoSource( + targets repository.StorageTargetRepository, + records repository.BackupRecordRepository, + nodes repository.NodeRepository, + tasks repository.BackupTaskRepository, +) SampleSource { + return &repoSource{ + targets: targets, + records: records, + nodes: nodes, + tasks: tasks, + now: func() time.Time { return time.Now().UTC() }, + } +} + +func (s *repoSource) ListStorageTargets(ctx context.Context) ([]model.StorageTarget, error) { + return s.targets.List(ctx) +} + +func (s *repoSource) StorageUsage(ctx context.Context) ([]repository.BackupStorageUsageItem, error) { + return s.records.StorageUsage(ctx) +} + +func (s *repoSource) ListNodes(ctx context.Context) ([]model.Node, error) { + 
return s.nodes.List(ctx) +} + +// CountSLABreach 统计当前违反 RPO 的任务: +// - 任务启用且配置了 SLAHoursRPO > 0 +// - 最近一次成功备份距今超出 SLA 时间窗,或从未成功过 +func (s *repoSource) CountSLABreach(ctx context.Context) (int, error) { + tasks, err := s.tasks.List(ctx, repository.BackupTaskListOptions{}) + if err != nil { + return 0, err + } + now := s.now() + count := 0 + for i := range tasks { + task := &tasks[i] + if task.SLAHoursRPO <= 0 || !task.Enabled { + continue + } + threshold := now.Add(-time.Duration(task.SLAHoursRPO) * time.Hour) + if task.LastRunAt == nil || task.LastRunAt.Before(threshold) { + count++ + } + } + return count, nil +} + +// Collector 周期性采集 gauge 类指标(存储用量、节点在线、SLA 违约)。 +// 用后台 goroutine 驱动,避免在 /metrics 请求路径做慢 IO。 +type Collector struct { + metrics *Metrics + source SampleSource + interval time.Duration +} + +// NewCollector 创建周期采集器。interval=0 走默认 30s。 +func NewCollector(m *Metrics, source SampleSource, interval time.Duration) *Collector { + if interval <= 0 { + interval = 30 * time.Second + } + return &Collector{metrics: m, source: source, interval: interval} +} + +// Start 在后台运行采集循环;随 ctx 取消而终止。 +// 启动时立即采一次,之后按 interval 轮询。 +func (c *Collector) Start(ctx context.Context) { + if c == nil || c.metrics == nil || c.source == nil { + return + } + go func() { + c.collect(ctx) + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.collect(ctx) + } + } + }() +} + +// collect 执行一次采样;单轮失败不影响下次。 +func (c *Collector) collect(ctx context.Context) { + // 存储用量:按 StorageTargetID 聚合 file_size,对应 target name/type + if targets, err := c.source.ListStorageTargets(ctx); err == nil { + nameByID := make(map[uint]string, len(targets)) + typeByID := make(map[uint]string, len(targets)) + for i := range targets { + nameByID[targets[i].ID] = targets[i].Name + typeByID[targets[i].ID] = targets[i].Type + } + if usage, uerr := c.source.StorageUsage(ctx); uerr == nil { + c.metrics.ResetStorageUsed() + for _, item := range 
usage { + name := nameByID[item.StorageTargetID] + if name == "" { + continue + } + c.metrics.SetStorageUsed(name, typeByID[item.StorageTargetID], item.TotalSize) + } + } + } + // 节点在线状态:role 约定为 master / agent + if nodes, err := c.source.ListNodes(ctx); err == nil { + c.metrics.ResetNodeOnline() + for i := range nodes { + n := &nodes[i] + role := "agent" + if n.IsLocal { + role = "master" + } + c.metrics.SetNodeOnline(n.Name, role, n.Status == model.NodeStatusOnline) + } + } + if breach, err := c.source.CountSLABreach(ctx); err == nil { + c.metrics.SetSLABreach(breach) + } +} diff --git a/server/internal/metrics/registry.go b/server/internal/metrics/registry.go new file mode 100644 index 0000000..2fe4692 --- /dev/null +++ b/server/internal/metrics/registry.go @@ -0,0 +1,225 @@ +// Package metrics 暴露 BackupX 的 Prometheus 采集器。 +// +// 设计要点: +// - 使用独立 Registry,避免与 default registry 中的 Go runtime metrics 混淆 +// - Counter/Gauge/Histogram 全部以 backupx_ 为前缀,遵循 Prometheus 命名规范 +// - 所有指标都支持零值:未注入时调用方法是 no-op,不会 panic +// - 组件只依赖本包,不反向引用 service/repository,避免循环 +package metrics + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// Metrics 聚合所有采集器,由 app 层组装一次并按需注入到 service。 +type Metrics struct { + registry *prometheus.Registry + + // 任务执行计数(labels: status, task_type) + TaskRunTotal *prometheus.CounterVec + // 任务耗时分布(labels: task_type) + TaskRunDuration *prometheus.HistogramVec + // 任务产出字节数(labels: task_type) + TaskBytesTotal *prometheus.CounterVec + // 正在运行的任务数 + TaskRunningGauge prometheus.Gauge + // 存储目标用量(labels: target_name, target_type) + StorageUsedBytes *prometheus.GaugeVec + // 节点在线状态(labels: node_name, role;value: 0/1) + NodeOnline *prometheus.GaugeVec + // 验证演练结果(labels: status) + VerifyRunTotal *prometheus.CounterVec + // 恢复操作结果(labels: status) + RestoreRunTotal *prometheus.CounterVec + // 副本复制结果(labels: 
status) + ReplicationRunTotal *prometheus.CounterVec + // SLA 违约数(gauge) + SLABreachGauge prometheus.Gauge + // 应用信息(label: version) + AppInfo *prometheus.GaugeVec +} + +// New 构造并注册所有采集器。 +// 失败时 panic:采集器注册失败属于启动期编程错误,没有合理 fallback。 +func New(version string) *Metrics { + reg := prometheus.NewRegistry() + // 注入标准 Go runtime + process 指标 + reg.MustRegister(collectors.NewGoCollector()) + reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + + m := &Metrics{ + registry: reg, + TaskRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "backupx_task_run_total", + Help: "备份任务执行总数,按状态和任务类型细分", + }, []string{"status", "task_type"}), + TaskRunDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "backupx_task_run_duration_seconds", + Help: "备份任务耗时分布", + Buckets: []float64{1, 5, 15, 30, 60, 120, 300, 600, 1800, 3600, 7200}, + }, []string{"task_type"}), + TaskBytesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "backupx_task_bytes_total", + Help: "备份任务累计产出字节数", + }, []string{"task_type"}), + TaskRunningGauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "backupx_task_running", + Help: "当前正在执行的备份任务数", + }), + StorageUsedBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "backupx_storage_used_bytes", + Help: "存储目标已用字节数", + }, []string{"target_name", "target_type"}), + NodeOnline: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "backupx_node_online", + Help: "集群节点在线状态(1 在线 / 0 离线)", + }, []string{"node_name", "role"}), + VerifyRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "backupx_verify_run_total", + Help: "备份验证演练执行总数", + }, []string{"status"}), + RestoreRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "backupx_restore_run_total", + Help: "恢复操作执行总数", + }, []string{"status"}), + ReplicationRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "backupx_replication_run_total", + Help: "备份副本复制执行总数", + }, []string{"status"}), + 
SLABreachGauge: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "backupx_sla_breach_tasks", + Help: "当前违反 SLA/RPO 的任务数", + }), + AppInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "backupx_app_info", + Help: "BackupX 应用元信息(恒为 1,通过 label 暴露版本号)", + }, []string{"version"}), + } + reg.MustRegister( + m.TaskRunTotal, + m.TaskRunDuration, + m.TaskBytesTotal, + m.TaskRunningGauge, + m.StorageUsedBytes, + m.NodeOnline, + m.VerifyRunTotal, + m.RestoreRunTotal, + m.ReplicationRunTotal, + m.SLABreachGauge, + m.AppInfo, + ) + m.AppInfo.WithLabelValues(version).Set(1) + return m +} + +// Handler 返回 /metrics 的 HTTP handler。 +// 使用本包专属 registry,避免混入其他组件的默认 metrics。 +func (m *Metrics) Handler() http.Handler { + if m == nil { + return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "metrics disabled", http.StatusServiceUnavailable) + }) + } + return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{ + EnableOpenMetrics: false, + }) +} + +// ObserveTaskRun 记录一次任务执行结果。 +// status 常用值:success / failed / cancelled。nil 接收器安全。 +func (m *Metrics) ObserveTaskRun(taskType, status string, durationSec float64, bytes int64) { + if m == nil { + return + } + m.TaskRunTotal.WithLabelValues(status, taskType).Inc() + m.TaskRunDuration.WithLabelValues(taskType).Observe(durationSec) + if bytes > 0 { + m.TaskBytesTotal.WithLabelValues(taskType).Add(float64(bytes)) + } +} + +// IncTaskRunning / DecTaskRunning 配套使用,反映并发中任务数。 +func (m *Metrics) IncTaskRunning() { + if m == nil { + return + } + m.TaskRunningGauge.Inc() +} + +func (m *Metrics) DecTaskRunning() { + if m == nil { + return + } + m.TaskRunningGauge.Dec() +} + +// ObserveRestore / ObserveVerify / ObserveReplication 记录子动作结果。 +// 所有方法对 nil 接收器安全:未注入 Metrics 时静默降级,不 panic。 +func (m *Metrics) ObserveRestore(status string) { + if m == nil { + return + } + m.RestoreRunTotal.WithLabelValues(status).Inc() +} + +func (m *Metrics) ObserveVerify(status string) { + if m == nil { + return + } + 
m.VerifyRunTotal.WithLabelValues(status).Inc() +} + +func (m *Metrics) ObserveReplication(status string) { + if m == nil { + return + } + m.ReplicationRunTotal.WithLabelValues(status).Inc() +} + +// SetStorageUsed 刷新某存储目标的用量。调用方负责周期采集。 +func (m *Metrics) SetStorageUsed(name, targetType string, bytes int64) { + if m == nil { + return + } + m.StorageUsedBytes.WithLabelValues(name, targetType).Set(float64(bytes)) +} + +// SetNodeOnline 刷新节点在线状态。 +func (m *Metrics) SetNodeOnline(name, role string, online bool) { + if m == nil { + return + } + val := 0.0 + if online { + val = 1 + } + m.NodeOnline.WithLabelValues(name, role).Set(val) +} + +// ResetNodeOnline 清空节点 gauge(当节点被删除时避免残留指标)。 +func (m *Metrics) ResetNodeOnline() { + if m == nil { + return + } + m.NodeOnline.Reset() +} + +// ResetStorageUsed 清空存储目标 gauge。 +func (m *Metrics) ResetStorageUsed() { + if m == nil { + return + } + m.StorageUsedBytes.Reset() +} + +// SetSLABreach 刷新 SLA 违约任务数。 +func (m *Metrics) SetSLABreach(count int) { + if m == nil { + return + } + m.SLABreachGauge.Set(float64(count)) +} diff --git a/server/internal/metrics/registry_test.go b/server/internal/metrics/registry_test.go new file mode 100644 index 0000000..fed64e1 --- /dev/null +++ b/server/internal/metrics/registry_test.go @@ -0,0 +1,76 @@ +package metrics + +import ( + "io" + "net/http/httptest" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus/testutil" +) + +func TestNew_AppInfoVersionLabel(t *testing.T) { + m := New("2.1.0") + if got := testutil.ToFloat64(m.AppInfo.WithLabelValues("2.1.0")); got != 1 { + t.Fatalf("app_info(version=2.1.0) expected 1, got %v", got) + } +} + +func TestObserveTaskRun_IncrementsCounterAndHistogram(t *testing.T) { + m := New("test") + m.ObserveTaskRun("mysql", "success", 12.5, 1024) + m.ObserveTaskRun("mysql", "failed", 3.0, 0) + if got := testutil.ToFloat64(m.TaskRunTotal.WithLabelValues("success", "mysql")); got != 1 { + t.Fatalf("task_run_total{status=success,task_type=mysql}: 
expected 1, got %v", got) + } + if got := testutil.ToFloat64(m.TaskRunTotal.WithLabelValues("failed", "mysql")); got != 1 { + t.Fatalf("task_run_total{status=failed,task_type=mysql}: expected 1, got %v", got) + } + if got := testutil.ToFloat64(m.TaskBytesTotal.WithLabelValues("mysql")); got != 1024 { + t.Fatalf("task_bytes_total{task_type=mysql}: expected 1024, got %v", got) + } +} + +func TestObserveTaskRun_NilReceiverIsSafe(t *testing.T) { + var m *Metrics // nil + m.ObserveTaskRun("file", "success", 1, 1) + m.ObserveRestore("success") + m.ObserveVerify("failed") + m.ObserveReplication("success") + m.IncTaskRunning() + m.DecTaskRunning() + m.SetStorageUsed("a", "s3", 1) + m.SetNodeOnline("n1", "master", true) + m.SetSLABreach(3) + m.ResetNodeOnline() + m.ResetStorageUsed() + // no panic -> pass +} + +func TestHandler_ExposesBackupxMetrics(t *testing.T) { + m := New("0.0.0-test") + m.ObserveTaskRun("file", "success", 1.0, 2048) + m.SetNodeOnline("n1", "master", true) + m.SetSLABreach(1) + + recorder := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/metrics", nil) + m.Handler().ServeHTTP(recorder, req) + + body, err := io.ReadAll(recorder.Result().Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + content := string(body) + for _, keyword := range []string{ + "backupx_task_run_total", + "backupx_task_run_duration_seconds", + "backupx_node_online", + "backupx_sla_breach_tasks", + "backupx_app_info", + } { + if !strings.Contains(content, keyword) { + t.Errorf("expected /metrics to contain %q", keyword) + } + } +} diff --git a/server/internal/service/audit_service.go b/server/internal/service/audit_service.go index 94f115a..52a3f46 100644 --- a/server/internal/service/audit_service.go +++ b/server/internal/service/audit_service.go @@ -1,9 +1,18 @@ package service import ( + "bytes" "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" "fmt" "log" + "net/http" + "strings" + "sync" + "time" 
"backupx/server/internal/apperror" "backupx/server/internal/model" @@ -25,10 +34,39 @@ type AuditEntry struct { type AuditService struct { repo repository.AuditLogRepository + + // webhook 外输配置(可选) + webhookMu sync.RWMutex + webhookURL string + webhookSecret string + httpClient *http.Client } func NewAuditService(repo repository.AuditLogRepository) *AuditService { - return &AuditService{repo: repo} + return &AuditService{ + repo: repo, + httpClient: &http.Client{ + Timeout: 3 * time.Second, // 短超时:审计 webhook 不应拖慢业务 + }, + } +} + +// SetWebhook 动态配置审计事件转发 URL 与签名密钥。 +// - url 为空字符串时禁用转发 +// - secret 非空时对 payload 计算 HMAC-SHA256,作为 X-BackupX-Signature header +// +// 适用场景: +// - 企业 SIEM 集成(Splunk HEC、ELK、Loki) +// - 安全审计留痕到第三方 WORM 存储 +// - 合规日志归档(GDPR / SOC2) +func (s *AuditService) SetWebhook(url, secret string) { + if s == nil { + return + } + s.webhookMu.Lock() + defer s.webhookMu.Unlock() + s.webhookURL = strings.TrimSpace(url) + s.webhookSecret = strings.TrimSpace(secret) } // Record 异步 fire-and-forget 写入审计日志,不阻塞业务逻辑 @@ -51,9 +89,65 @@ func (s *AuditService) Record(entry AuditEntry) { if err := s.repo.Create(context.Background(), record); err != nil { log.Printf("[audit] failed to write audit log: %v", err) } + s.fireWebhook(record) }() } +// fireWebhook 异步向外部系统转发审计事件。失败降级到本地日志,永不影响主流程。 +func (s *AuditService) fireWebhook(record *model.AuditLog) { + if s == nil { + return + } + s.webhookMu.RLock() + url := s.webhookURL + secret := s.webhookSecret + s.webhookMu.RUnlock() + if url == "" { + return + } + payload := map[string]any{ + "eventType": "audit.log", + "occurredAt": record.CreatedAt.UTC().Format(time.RFC3339), + "actor": map[string]any{ + "userId": record.UserID, + "username": record.Username, + }, + "category": record.Category, + "action": record.Action, + "targetType": record.TargetType, + "targetId": record.TargetID, + "targetName": record.TargetName, + "detail": record.Detail, + "clientIp": record.ClientIP, + } + body, err := json.Marshal(payload) + if 
err != nil { + log.Printf("[audit] webhook marshal failed: %v", err) + return + } + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + log.Printf("[audit] webhook build request failed: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "BackupX-Audit/1.0") + if secret != "" { + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write(body) + req.Header.Set("X-BackupX-Signature", "sha256="+hex.EncodeToString(mac.Sum(nil))) + } + resp, err := s.httpClient.Do(req) + if err != nil { + log.Printf("[audit] webhook POST failed: %v", err) + return + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + log.Printf("[audit] webhook returned status %d", resp.StatusCode) + } +} + // List 分页查询审计日志 func (s *AuditService) List(ctx context.Context, category string, limit, offset int) (*repository.AuditLogListResult, error) { result, err := s.repo.List(ctx, repository.AuditLogListOptions{ diff --git a/server/internal/service/audit_service_webhook_test.go b/server/internal/service/audit_service_webhook_test.go new file mode 100644 index 0000000..5896151 --- /dev/null +++ b/server/internal/service/audit_service_webhook_test.go @@ -0,0 +1,129 @@ +package service + +import ( + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "backupx/server/internal/model" + "backupx/server/internal/repository" +) + +// fakeAuditRepo 用通道同步等待异步写入,避免 sleep。 +type fakeAuditRepo struct { + mu sync.Mutex + logs []model.AuditLog + created chan struct{} +} + +func newFakeAuditRepo() *fakeAuditRepo { + return &fakeAuditRepo{created: make(chan struct{}, 4)} +} + +func (r *fakeAuditRepo) Create(_ context.Context, log *model.AuditLog) error { + r.mu.Lock() + log.CreatedAt = time.Now().UTC() + r.logs = append(r.logs, *log) + r.mu.Unlock() + r.created <- 
struct{}{} + return nil +} + +func (r *fakeAuditRepo) List(context.Context, repository.AuditLogListOptions) (*repository.AuditLogListResult, error) { + return &repository.AuditLogListResult{}, nil +} + +func (r *fakeAuditRepo) ListAll(context.Context, repository.AuditLogListOptions) ([]model.AuditLog, error) { + return nil, nil +} + +func TestAuditService_WebhookDeliversSignedPayload(t *testing.T) { + var hits atomic.Int32 + var got struct { + sig string + payload map[string]any + received chan struct{} + } + got.received = make(chan struct{}, 1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hits.Add(1) + body, _ := io.ReadAll(r.Body) + got.sig = r.Header.Get("X-BackupX-Signature") + _ = json.Unmarshal(body, &got.payload) + + // 验证 HMAC 正确 + mac := hmac.New(sha256.New, []byte("s3cret")) + mac.Write(body) + expected := "sha256=" + hex.EncodeToString(mac.Sum(nil)) + if got.sig != expected { + t.Errorf("signature mismatch: expected %s, got %s", expected, got.sig) + } + w.WriteHeader(http.StatusOK) + got.received <- struct{}{} + })) + defer server.Close() + + repo := newFakeAuditRepo() + svc := NewAuditService(repo) + svc.SetWebhook(server.URL, "s3cret") + + svc.Record(AuditEntry{ + Username: "alice", + Category: "auth", + Action: "login_success", + ClientIP: "10.0.0.1", + Detail: "admin login", + }) + + // 等待异步写入 + webhook + select { + case <-repo.created: + case <-time.After(time.Second): + t.Fatal("audit log not written within 1s") + } + select { + case <-got.received: + case <-time.After(time.Second): + t.Fatal("webhook not invoked within 1s") + } + + if hits.Load() != 1 { + t.Fatalf("expected 1 webhook hit, got %d", hits.Load()) + } + if got.payload["eventType"] != "audit.log" { + t.Errorf("eventType wrong: %v", got.payload["eventType"]) + } + actor, ok := got.payload["actor"].(map[string]any) + if !ok || actor["username"] != "alice" { + t.Errorf("actor.username mismatch: %v", got.payload["actor"]) + } + if 
got.payload["action"] != "login_success" { + t.Errorf("action mismatch: %v", got.payload["action"]) + } +} + +func TestAuditService_WebhookDisabledWhenURLEmpty(t *testing.T) { + repo := newFakeAuditRepo() + svc := NewAuditService(repo) + // 不调用 SetWebhook:应该不发送任何请求 + svc.Record(AuditEntry{Username: "bob", Action: "logout"}) + + select { + case <-repo.created: + case <-time.After(time.Second): + t.Fatal("audit log not written within 1s") + } + // 给 webhook 一些时间(即便它不会被调用) + time.Sleep(100 * time.Millisecond) + // 无显式断言:能不 panic 即算通过 +} diff --git a/server/internal/service/backup_execution_service.go b/server/internal/service/backup_execution_service.go index f0d7f04..096f940 100644 --- a/server/internal/service/backup_execution_service.go +++ b/server/internal/service/backup_execution_service.go @@ -17,6 +17,7 @@ import ( "backupx/server/internal/apperror" "backupx/server/internal/backup" backupretention "backupx/server/internal/backup/retention" + "backupx/server/internal/metrics" "backupx/server/internal/model" "backupx/server/internal/repository" "backupx/server/internal/storage" @@ -92,8 +93,14 @@ type BackupExecutionService struct { // nodeSemaphores 节点级并发限制(按 NodeID 映射)。 // 没命中的 NodeID 走全局 semaphore,节点配置 MaxConcurrent>0 时按该节点独立排队。 nodeSemaphores sync.Map - retries int // rclone 底层重试次数 - bandwidthLimit string // rclone 带宽限制 + retries int // rclone 底层重试次数 + bandwidthLimit string // rclone 带宽限制(全局默认,节点配置可覆盖) + metrics *metrics.Metrics +} + +// SetMetrics 注入 Prometheus 采集器。nil 时所有埋点退化为 no-op。 +func (s *BackupExecutionService) SetMetrics(m *metrics.Metrics) { + s.metrics = m } // ReplicationTrigger 抽象备份成功后的副本派发(实现者:ReplicationService)。 @@ -407,6 +414,22 @@ func (s *BackupExecutionService) shouldNotify(ctx context.Context, task *model.B return true } +// effectiveBandwidth 返回当前上下文应用的带宽限速字符串。 +// 优先级:Node.BandwidthLimit(非空) > 全局 s.bandwidthLimit。 +func (s *BackupExecutionService) effectiveBandwidth(ctx context.Context, nodeID uint) string { + if nodeID == 0 || 
s.nodeRepo == nil { + return s.bandwidthLimit + } + node, err := s.nodeRepo.FindByID(ctx, nodeID) + if err != nil || node == nil { + return s.bandwidthLimit + } + if strings.TrimSpace(node.BandwidthLimit) != "" { + return node.BandwidthLimit + } + return s.bandwidthLimit +} + // acquireNodeSemaphore 返回节点级并发通道。懒初始化:第一次为某节点排队时创建。 // 如果节点未配置 MaxConcurrent 或 nodeRepo 未注入,返回 nil(调用方走全局 semaphore)。 // 节点容量仅在首次创建时采用,后续变更需重启服务才生效(避免运行时 resize 通道的复杂度)。 @@ -456,6 +479,10 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba s.semaphore <- struct{}{} defer func() { <-s.semaphore }() + // Prometheus: running gauge + 完成时 observe 耗时/字节/状态 + s.metrics.IncTaskRunning() + defer s.metrics.DecTaskRunning() + logger := backup.NewExecutionLogger(recordID, s.logHub) status := "failed" errMessage := "" @@ -468,6 +495,8 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba if finalizeErr := s.finalizeRecord(ctx, task, recordID, startedAt, status, errMessage, logger.String(), fileName, fileSize, checksum, storagePath); finalizeErr != nil { logger.Errorf("写回备份记录失败:%v", finalizeErr) } + // 采集任务执行结果到 Prometheus(耗时 + 产出字节 + 状态计数) + s.metrics.ObserveTaskRun(task.Type, status, time.Since(startedAt).Seconds(), fileSize) // 写入多目标上传结果 if len(uploadResults) > 0 { if resultsJSON, marshalErr := json.Marshal(uploadResults); marshalErr == nil { @@ -559,7 +588,8 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba if findErr == nil && target != nil { targetName = target.Name } - provider, resolveErr := s.resolveProvider(ctx, targetID) + // 节点级带宽覆盖:若 task 绑定节点并配置了 BandwidthLimit,覆盖全局限速 + provider, resolveErr := s.resolveProviderForNode(ctx, targetID, task.NodeID) if resolveErr != nil { uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: resolveErr.Error()} logger.Warnf("存储目标 %s 创建客户端失败:%v", targetName, resolveErr) @@ -742,10 +772,17 @@ 
func (s *BackupExecutionService) finalizeRecord(ctx context.Context, task *model } func (s *BackupExecutionService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) { - // 注入 rclone 传输配置(重试、带宽限制) + return s.resolveProviderForNode(ctx, targetID, 0) +} + +// resolveProviderForNode 根据节点的 BandwidthLimit 覆盖全局默认。 +// nodeID=0 或节点未配置时退化为全局默认。 +// 仅在 Master 本地执行生效;Agent 会收到自身 Node 配置,并在独立 runtime 中应用。 +func (s *BackupExecutionService) resolveProviderForNode(ctx context.Context, targetID uint, nodeID uint) (storage.StorageProvider, error) { + // 注入 rclone 传输配置(重试、节点级带宽覆盖全局) ctx = rclone.ConfiguredContext(ctx, rclone.TransferConfig{ LowLevelRetries: s.retries, - BandwidthLimit: s.bandwidthLimit, + BandwidthLimit: s.effectiveBandwidth(ctx, nodeID), }) target, err := s.targets.FindByID(ctx, targetID) if err != nil { diff --git a/server/internal/service/replication_service.go b/server/internal/service/replication_service.go index 6fce207..9e8f83f 100644 --- a/server/internal/service/replication_service.go +++ b/server/internal/service/replication_service.go @@ -10,6 +10,7 @@ import ( "time" "backupx/server/internal/apperror" + "backupx/server/internal/metrics" "backupx/server/internal/model" "backupx/server/internal/repository" "backupx/server/internal/storage" @@ -37,6 +38,12 @@ type ReplicationService struct { semaphore chan struct{} async func(func()) now func() time.Time + metrics *metrics.Metrics +} + +// SetMetrics 注入 Prometheus 采集器。 +func (s *ReplicationService) SetMetrics(m *metrics.Metrics) { + s.metrics = m } func NewReplicationService( @@ -193,6 +200,7 @@ func (s *ReplicationService) executeReplication(ctx context.Context, repID uint) rep.DurationSeconds = int(completedAt.Sub(rep.StartedAt).Seconds()) rep.CompletedAt = &completedAt _ = s.replications.Update(ctx, rep) + s.metrics.ObserveReplication(status) if status == model.ReplicationStatusFailed { s.dispatchFailed(ctx, rep, errMessage) } diff --git 
a/server/internal/service/restore_service.go b/server/internal/service/restore_service.go index 55135f2..cc3f0c4 100644 --- a/server/internal/service/restore_service.go +++ b/server/internal/service/restore_service.go @@ -11,6 +11,7 @@ import ( "backupx/server/internal/apperror" "backupx/server/internal/backup" + "backupx/server/internal/metrics" "backupx/server/internal/model" "backupx/server/internal/repository" "backupx/server/internal/storage" @@ -41,6 +42,12 @@ type RestoreService struct { semaphore chan struct{} async func(func()) now func() time.Time + metrics *metrics.Metrics +} + +// SetMetrics 注入 Prometheus 采集器。 +func (s *RestoreService) SetMetrics(m *metrics.Metrics) { + s.metrics = m } // NewRestoreService 构造恢复服务。maxConcurrent 控制本地并发恢复数。 @@ -432,6 +439,7 @@ func (s *RestoreService) finalizeWithLog(ctx context.Context, restoreID uint, st } record.DurationSeconds = int(completedAt.Sub(record.StartedAt).Seconds()) record.CompletedAt = &completedAt + s.metrics.ObserveRestore(status) return s.restores.Update(ctx, record) } diff --git a/server/internal/service/settings_service.go b/server/internal/service/settings_service.go index 780fec5..154709e 100644 --- a/server/internal/service/settings_service.go +++ b/server/internal/service/settings_service.go @@ -8,21 +8,55 @@ import ( "backupx/server/internal/repository" ) +// AuditWebhookConfigurer 抽象审计 webhook 配置接口,由 AuditService 实现。 +// 用接口解耦避免 settings_service 直接依赖 AuditService 具体类型。 +type AuditWebhookConfigurer interface { + SetWebhook(url, secret string) +} + type SettingsService struct { - configs repository.SystemConfigRepository + configs repository.SystemConfigRepository + auditWebhook AuditWebhookConfigurer } func NewSettingsService(configs repository.SystemConfigRepository) *SettingsService { return &SettingsService{configs: configs} } -// settingsKeys lists all user-editable setting keys. 
+// SetAuditWebhookConfigurer 注入 audit webhook 配置接收方。 +// 启动时立即用当前 DB 中的设置调用一次,后续每次 Update 变更 webhook key 时同步推送。 +func (s *SettingsService) SetAuditWebhookConfigurer(ctx context.Context, configurer AuditWebhookConfigurer) { + if s == nil || configurer == nil { + return + } + s.auditWebhook = configurer + // 启动时同步一次,保证重启后配置不丢失 + all, err := s.GetAll(ctx) + if err == nil { + configurer.SetWebhook(all[SettingKeyAuditWebhookURL], all[SettingKeyAuditWebhookSecret]) + } +} + +// 可被前端写入的系统设置键。新增键必须同步加入此清单, +// 否则 Update 会忽略(安全原则:显式 allow-list)。 +const ( + SettingKeySiteName = "site_name" + SettingKeyLanguage = "language" + SettingKeyTimezone = "timezone" + SettingKeyBackupNotificationEnabled = "backup_notification_enabled" + SettingKeyBandwidthLimit = "bandwidth_limit" + SettingKeyAuditWebhookURL = "audit_webhook_url" + SettingKeyAuditWebhookSecret = "audit_webhook_secret" +) + var settingsKeys = []string{ - "site_name", - "language", - "timezone", - "backup_notification_enabled", - "bandwidth_limit", + SettingKeySiteName, + SettingKeyLanguage, + SettingKeyTimezone, + SettingKeyBackupNotificationEnabled, + SettingKeyBandwidthLimit, + SettingKeyAuditWebhookURL, + SettingKeyAuditWebhookSecret, } func (s *SettingsService) GetAll(ctx context.Context) (map[string]string, error) { @@ -42,6 +76,7 @@ func (s *SettingsService) Update(ctx context.Context, settings map[string]string for _, key := range settingsKeys { allowed[key] = true } + auditWebhookTouched := false for key, value := range settings { if !allowed[key] { continue @@ -50,6 +85,14 @@ func (s *SettingsService) Update(ctx context.Context, settings map[string]string if err := s.configs.Upsert(ctx, item); err != nil { return nil, apperror.Internal("SETTINGS_UPDATE_FAILED", "无法更新系统设置", err) } + if key == SettingKeyAuditWebhookURL || key == SettingKeyAuditWebhookSecret { + auditWebhookTouched = true + } + } + // audit webhook 配置变化:立即同步到 AuditService,避免重启才生效 + if auditWebhookTouched && s.auditWebhook != nil { + all, _ := 
s.GetAll(ctx) + s.auditWebhook.SetWebhook(all[SettingKeyAuditWebhookURL], all[SettingKeyAuditWebhookSecret]) } return s.GetAll(ctx) } diff --git a/server/internal/service/verification_service.go b/server/internal/service/verification_service.go index 220faf3..b7e5912 100644 --- a/server/internal/service/verification_service.go +++ b/server/internal/service/verification_service.go @@ -10,6 +10,7 @@ import ( "backupx/server/internal/apperror" "backupx/server/internal/backup" + "backupx/server/internal/metrics" "backupx/server/internal/model" "backupx/server/internal/repository" "backupx/server/internal/storage" @@ -42,6 +43,12 @@ type VerificationService struct { semaphore chan struct{} async func(func()) now func() time.Time + metrics *metrics.Metrics +} + +// SetMetrics 注入 Prometheus 采集器。 +func (s *VerificationService) SetMetrics(m *metrics.Metrics) { + s.metrics = m } // VerificationNotifier 给用户推送验证完成/失败通知。 @@ -413,6 +420,7 @@ func (s *VerificationService) finalize(ctx context.Context, verID uint, status, } record.DurationSeconds = int(completedAt.Sub(record.StartedAt).Seconds()) record.CompletedAt = &completedAt + s.metrics.ObserveVerify(status) return s.verifications.Update(ctx, record) }