mirror of
https://github.com/Awuqing/BackupX.git
synced 2026-06-25 03:23:41 +08:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a699da8d6 | ||
|
|
1b73f19eb1 |
@@ -46,6 +46,9 @@
|
||||
| **Multi-Node Cluster** | Master-Agent mode via HTTP long-polling — Agents run tasks locally, upload straight to storage, no reverse connectivity required |
|
||||
| **Security** | JWT + bcrypt + AES-256-GCM encrypted config + optional backup encryption + full audit log |
|
||||
| **Notifications** | Email / Webhook / Telegram on success or failure |
|
||||
| **Observability** | Prometheus `/metrics` endpoint + `/health` + `/ready` probes + SLA breach gauge |
|
||||
| **Audit Webhook** | HMAC-SHA256 signed forwarding to SIEM / WORM storage for compliance (SOC2 / GDPR) |
|
||||
| **Flow Control** | Per-node bandwidth cap + per-node concurrency limit — tune big/small nodes independently |
|
||||
| **Deployment** | Single binary + embedded SQLite, Docker one-click, zero external dependencies |
|
||||
|
||||
## Quick Start
|
||||
|
||||
@@ -46,6 +46,9 @@
|
||||
| **多节点集群** | Master-Agent 模式,基于 HTTP 长轮询跨多台服务器管理备份。Agent 本地执行任务并直接上传到存储,无需反向连通性 |
|
||||
| **安全** | JWT + bcrypt + AES-256-GCM 加密配置 + 可选备份文件加密 + 完整审计日志 |
|
||||
| **通知** | 邮件 / Webhook / Telegram,备份成功或失败时自动推送 |
|
||||
| **可观测性** | Prometheus `/metrics` 端点 + `/health` + `/ready` 探针 + SLA 违约监控 |
|
||||
| **审计外输** | HMAC-SHA256 签名 Webhook,对接 SIEM / WORM 存储满足 SOC2 / GDPR 合规 |
|
||||
| **流控** | 节点级带宽限速 + 节点级并发控制,大小节点分别配置,避免小内存 Agent 被挤爆 |
|
||||
| **部署** | 单二进制 + 内嵌 SQLite,Docker 一键启动,零外部依赖 |
|
||||
|
||||
## 快速开始
|
||||
|
||||
@@ -18,6 +18,22 @@ server {
|
||||
proxy_read_timeout 3600s;
|
||||
}
|
||||
|
||||
# Agent 一键安装脚本路径(兼容 v2.0 及之前生成的命令)。
|
||||
# v2.1+ 新生成的命令走 /api/install/... 自动命中上面的 /api/ 代理。
|
||||
location /install/ {
|
||||
proxy_pass http://127.0.0.1:8340/install/;
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
}
|
||||
|
||||
# 健康检查端点同样不走 SPA fallback。
|
||||
location = /health { proxy_pass http://127.0.0.1:8340/health; }
|
||||
location = /ready { proxy_pass http://127.0.0.1:8340/ready; }
|
||||
location = /metrics { proxy_pass http://127.0.0.1:8340/metrics; }
|
||||
|
||||
location / {
|
||||
try_files $uri $uri/ /index.html;
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ require (
|
||||
github.com/glebarez/sqlite v1.11.0
|
||||
github.com/golang-jwt/jwt/v5 v5.3.0
|
||||
github.com/natefinch/lumberjack v2.0.0+incompatible
|
||||
github.com/prometheus/client_golang v1.23.2
|
||||
github.com/rclone/rclone v1.73.3
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/spf13/viper v1.20.0
|
||||
@@ -181,7 +182,6 @@ require (
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
|
||||
github.com/pquerna/otp v1.5.0 // indirect
|
||||
github.com/prometheus/client_golang v1.23.2 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.67.2 // indirect
|
||||
github.com/prometheus/procfs v0.19.2 // indirect
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"backupx/server/internal/database"
|
||||
aphttp "backupx/server/internal/http"
|
||||
"backupx/server/internal/logger"
|
||||
"backupx/server/internal/metrics"
|
||||
"backupx/server/internal/notify"
|
||||
"backupx/server/internal/repository"
|
||||
"backupx/server/internal/scheduler"
|
||||
@@ -109,6 +110,8 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application,
|
||||
auditService := service.NewAuditService(auditLogRepo)
|
||||
authService.SetAuditService(auditService)
|
||||
schedulerService.SetAuditRecorder(auditService)
|
||||
// 审计日志外输:启动时用当前 settings 初始化 webhook,后续前端修改立即生效
|
||||
settingsService.SetAuditWebhookConfigurer(ctx, auditService)
|
||||
|
||||
// Database discovery(集群依赖在 agentService 创建后注入)
|
||||
databaseDiscoveryService := service.NewDatabaseDiscoveryService(backup.NewOSCommandExecutor())
|
||||
@@ -226,6 +229,21 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application,
|
||||
// Dashboard 集群概览依赖注入
|
||||
dashboardService.SetClusterDependencies(nodeRepo, version)
|
||||
|
||||
// Prometheus 指标采集:Counter/Histogram 由业务服务实时写入;
|
||||
// Gauge 类(存储用量、节点在线、SLA 违约)由 Collector 每 30s 异步刷新,
|
||||
// 避免 /metrics 请求路径做慢 IO。
|
||||
appMetrics := metrics.New(version)
|
||||
backupExecutionService.SetMetrics(appMetrics)
|
||||
restoreService.SetMetrics(appMetrics)
|
||||
verificationService.SetMetrics(appMetrics)
|
||||
replicationService.SetMetrics(appMetrics)
|
||||
metricsCollector := metrics.NewCollector(
|
||||
appMetrics,
|
||||
metrics.NewRepoSource(storageTargetRepo, backupRecordRepo, nodeRepo, backupTaskRepo),
|
||||
30*time.Second,
|
||||
)
|
||||
metricsCollector.Start(ctx)
|
||||
|
||||
router := aphttp.NewRouter(aphttp.RouterDependencies{
|
||||
Context: ctx,
|
||||
Config: cfg,
|
||||
@@ -259,6 +277,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application,
|
||||
InstallTokenService: installTokenService,
|
||||
MasterExternalURL: "", // 如需覆盖 URL,可扩展 cfg.Server 增字段;目前留空依赖 X-Forwarded-* / Request.Host
|
||||
DB: db,
|
||||
Metrics: appMetrics,
|
||||
})
|
||||
|
||||
httpServer := &stdhttp.Server{
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -171,6 +172,22 @@ func TestOneClickInstallFlow(t *testing.T) {
|
||||
if !strings.Contains(scriptRec.Body.String(), "systemctl enable --now backupx-agent") {
|
||||
t.Fatalf("script missing systemctl enable:\n%s", scriptRec.Body.String())
|
||||
}
|
||||
// Issue #46 防嗅探 headers:text/plain + nosniff + no-store + Content-Disposition
|
||||
if ct := scriptRec.Header().Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") {
|
||||
t.Errorf("script Content-Type should be text/plain*, got %q", ct)
|
||||
}
|
||||
if nosniff := scriptRec.Header().Get("X-Content-Type-Options"); nosniff != "nosniff" {
|
||||
t.Errorf("missing X-Content-Type-Options: nosniff (got %q)", nosniff)
|
||||
}
|
||||
if cc := scriptRec.Header().Get("Cache-Control"); !strings.Contains(cc, "no-store") {
|
||||
t.Errorf("missing Cache-Control: no-store (got %q)", cc)
|
||||
}
|
||||
if cd := scriptRec.Header().Get("Content-Disposition"); !strings.Contains(cd, "backupx-agent-install.sh") {
|
||||
t.Errorf("Content-Disposition should name the script file (got %q)", cd)
|
||||
}
|
||||
if !strings.Contains(scriptRec.Body.String(), "BACKUPX_AGENT_INSTALL_V1") {
|
||||
t.Errorf("script missing magic marker BACKUPX_AGENT_INSTALL_V1")
|
||||
}
|
||||
|
||||
// 4. 再次消费应 410
|
||||
scriptReq2 := httptest.NewRequest(http.MethodGet, "/install/"+genResp.Data.InstallToken, nil)
|
||||
@@ -181,6 +198,73 @@ func TestOneClickInstallFlow(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestInstallScriptAliasUnderAPI 验证 /api/install/:token 别名路径可用,
|
||||
// 这是 Issue #46 的根本修复:让 install 端点自动命中反向代理的 /api/ 转发规则,
|
||||
// 避免 nginx SPA fallback 把请求当前端路由返回 index.html。
|
||||
func TestInstallScriptAliasUnderAPI(t *testing.T) {
|
||||
router, token := setupInstallFlowRouter(t)
|
||||
|
||||
// 1. 创建一个节点,生成 install token
|
||||
batchBody, _ := json.Marshal(map[string][]string{"names": {"alias-node"}})
|
||||
batchReq := httptest.NewRequest(http.MethodPost, "/api/nodes/batch", bytes.NewReader(batchBody))
|
||||
batchReq.Header.Set("Content-Type", "application/json")
|
||||
batchReq.Header.Set("Authorization", "Bearer "+token)
|
||||
batchRec := httptest.NewRecorder()
|
||||
router.ServeHTTP(batchRec, batchReq)
|
||||
if batchRec.Code != 200 {
|
||||
t.Fatalf("batch create failed: %d %s", batchRec.Code, batchRec.Body.String())
|
||||
}
|
||||
var batchResp struct {
|
||||
Data []struct {
|
||||
ID uint `json:"id"`
|
||||
} `json:"data"`
|
||||
}
|
||||
_ = json.Unmarshal(batchRec.Body.Bytes(), &batchResp)
|
||||
if len(batchResp.Data) == 0 {
|
||||
t.Fatalf("batch create returned no nodes: %s", batchRec.Body.String())
|
||||
}
|
||||
nodeID := batchResp.Data[0].ID
|
||||
|
||||
genBody, _ := json.Marshal(map[string]any{
|
||||
"mode": "systemd", "arch": "auto", "agentVersion": "v1.7.0", "downloadSrc": "github", "ttlSeconds": 600,
|
||||
})
|
||||
genReq := httptest.NewRequest(http.MethodPost,
|
||||
"/api/nodes/"+strconv.FormatUint(uint64(nodeID), 10)+"/install-tokens", bytes.NewReader(genBody))
|
||||
genReq.Header.Set("Content-Type", "application/json")
|
||||
genReq.Header.Set("Authorization", "Bearer "+token)
|
||||
genRec := httptest.NewRecorder()
|
||||
router.ServeHTTP(genRec, genReq)
|
||||
if genRec.Code != 200 {
|
||||
t.Fatalf("gen install token failed: %d %s", genRec.Code, genRec.Body.String())
|
||||
}
|
||||
var genResp struct {
|
||||
Data struct {
|
||||
InstallToken string `json:"installToken"`
|
||||
URL string `json:"url"`
|
||||
} `json:"data"`
|
||||
}
|
||||
_ = json.Unmarshal(genRec.Body.Bytes(), &genResp)
|
||||
|
||||
// 2. 新生成的 url 应指向 /api/install/... —— 让反向代理的 /api/ 转发规则自动接管
|
||||
if !strings.Contains(genResp.Data.URL, "/api/install/") {
|
||||
t.Errorf("new install URL should use /api/install/ prefix, got %s", genResp.Data.URL)
|
||||
}
|
||||
|
||||
// 3. /api/install/:token 必须可消费(与 /install/:token 等价)
|
||||
aliasReq := httptest.NewRequest(http.MethodGet, "/api/install/"+genResp.Data.InstallToken, nil)
|
||||
aliasRec := httptest.NewRecorder()
|
||||
router.ServeHTTP(aliasRec, aliasReq)
|
||||
if aliasRec.Code != 200 {
|
||||
t.Fatalf("/api/install alias failed: %d %s", aliasRec.Code, aliasRec.Body.String())
|
||||
}
|
||||
if !strings.Contains(aliasRec.Body.String(), "systemctl enable --now backupx-agent") {
|
||||
t.Errorf("alias should return rendered script, got:\n%s", aliasRec.Body.String())
|
||||
}
|
||||
if ct := aliasRec.Header().Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") {
|
||||
t.Errorf("alias Content-Type should be text/plain*, got %q", ct)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInstallTokenRateLimit(t *testing.T) {
|
||||
router, jwt := setupInstallFlowRouter(t)
|
||||
|
||||
|
||||
@@ -36,6 +36,13 @@ func NewInstallHandler(gcCtx context.Context, tokenService *service.InstallToken
|
||||
}
|
||||
|
||||
// Script 消费 install token 并返回 shell 脚本;Mode 由 token 存储决定(systemd/docker/foreground 均返回 shell)。
|
||||
//
|
||||
// 响应头策略(issue #46 教训):
|
||||
// - Content-Type 用 text/plain 而非 text/x-shellscript:避免 Cloudflare/反向代理把
|
||||
// 脚本内容按特殊类型识别并触发 minify/HTML rewrite,导致 `curl | sh` 收到非脚本内容
|
||||
// - X-Content-Type-Options: nosniff:禁止浏览器/中间层按内容嗅探改写 MIME
|
||||
// - Cache-Control: no-store:token 一次性消费,禁止任何缓存层留存旧脚本
|
||||
// - Content-Disposition: inline; filename=...:部分代理会跳过带文件名的响应
|
||||
func (h *InstallHandler) Script(c *gin.Context) {
|
||||
if !h.limiter.allow(c.ClientIP()) {
|
||||
c.String(stdhttp.StatusTooManyRequests, "请求过于频繁,请稍后再试\n")
|
||||
@@ -66,7 +73,10 @@ func (h *InstallHandler) Script(c *gin.Context) {
|
||||
c.String(stdhttp.StatusInternalServerError, "render error\n")
|
||||
return
|
||||
}
|
||||
c.Data(stdhttp.StatusOK, "text/x-shellscript; charset=utf-8", []byte(script))
|
||||
c.Header("X-Content-Type-Options", "nosniff")
|
||||
c.Header("Cache-Control", "no-store")
|
||||
c.Header("Content-Disposition", `inline; filename="backupx-agent-install.sh"`)
|
||||
c.Data(stdhttp.StatusOK, "text/plain; charset=utf-8", []byte(script))
|
||||
}
|
||||
|
||||
// Compose 消费 install token 并返回 docker-compose YAML,仅 Mode=docker 有效。
|
||||
|
||||
@@ -262,14 +262,16 @@ func (h *NodeHandler) CreateInstallToken(c *gin.Context) {
|
||||
fmt.Sprintf("生成 %s/%s install token TTL=%ds", input.Mode, input.Arch, input.TTLSeconds))
|
||||
|
||||
masterURL := resolveMasterURL(c, h.externalURL)
|
||||
// 使用 /api/install/... 而非 /install/... —— 让反向代理的 /api/ 转发规则
|
||||
// 自动接管,避免 SPA fallback 把请求当成前端路由返回 index.html(issue #46)。
|
||||
body := gin.H{
|
||||
"installToken": out.Token,
|
||||
"expiresAt": out.ExpiresAt,
|
||||
"url": masterURL + "/install/" + out.Token,
|
||||
"url": masterURL + "/api/install/" + out.Token,
|
||||
"composeUrl": "",
|
||||
}
|
||||
if input.Mode == "docker" {
|
||||
body["composeUrl"] = masterURL + "/install/" + out.Token + "/compose.yml"
|
||||
body["composeUrl"] = masterURL + "/api/install/" + out.Token + "/compose.yml"
|
||||
}
|
||||
response.Success(c, body)
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"backupx/server/internal/apperror"
|
||||
"backupx/server/internal/config"
|
||||
"backupx/server/internal/metrics"
|
||||
"backupx/server/internal/repository"
|
||||
"backupx/server/internal/security"
|
||||
"backupx/server/internal/service"
|
||||
@@ -52,6 +53,8 @@ type RouterDependencies struct {
|
||||
MasterExternalURL string
|
||||
// DB 注入给健康检查端点做 liveness/readiness 探测。
|
||||
DB *gorm.DB
|
||||
// Metrics 注入给 /metrics 端点;为 nil 时端点返回 503。
|
||||
Metrics *metrics.Metrics
|
||||
}
|
||||
|
||||
func NewRouter(deps RouterDependencies) *gin.Engine {
|
||||
@@ -311,7 +314,19 @@ func NewRouter(deps RouterDependencies) *gin.Engine {
|
||||
engine.GET("/api/health", healthHandler.Live)
|
||||
engine.GET("/api/ready", healthHandler.Ready)
|
||||
|
||||
// 公开安装路由(不走 JWT 中间件)
|
||||
// Prometheus /metrics 端点(公开、无认证;内网/反向代理授权即可)。
|
||||
// 业内通行做法:/metrics 通常由 Prometheus pull 抓取,不走 API Key。
|
||||
if deps.Metrics != nil {
|
||||
engine.GET("/metrics", gin.WrapH(deps.Metrics.Handler()))
|
||||
}
|
||||
|
||||
// 公开安装路由(不走 JWT 中间件)。
|
||||
// 同时注册到 / 和 /api 前缀下:
|
||||
// - /install/:token 保留历史 URL,兼容旧 nginx 部署
|
||||
// - /api/install/:token 新 URL,自动走反向代理的 /api/ 转发规则
|
||||
//
|
||||
// Issue #46:用户的 nginx 只转发 /api/,/install/* 被 SPA fallback 到 index.html,
|
||||
// 返回 HTML 被 sh 解释成 "Syntax error"。使用 /api/install/ 可避开此问题。
|
||||
if deps.InstallTokenService != nil {
|
||||
gcCtx := deps.Context
|
||||
if gcCtx == nil {
|
||||
@@ -320,6 +335,8 @@ func NewRouter(deps RouterDependencies) *gin.Engine {
|
||||
installHandler := NewInstallHandler(gcCtx, deps.InstallTokenService, deps.AuditService, deps.MasterExternalURL)
|
||||
engine.GET("/install/:token", installHandler.Script)
|
||||
engine.GET("/install/:token/compose.yml", installHandler.Compose)
|
||||
engine.GET("/api/install/:token", installHandler.Script)
|
||||
engine.GET("/api/install/:token/compose.yml", installHandler.Compose)
|
||||
}
|
||||
|
||||
engine.NoRoute(func(c *gin.Context) {
|
||||
|
||||
38
server/internal/installscript/issue46_test.go
Normal file
38
server/internal/installscript/issue46_test.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package installscript
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"backupx/server/internal/model"
|
||||
)
|
||||
|
||||
// TestRenderScriptIncludesMagicMarker 渲染脚本必须包含 Issue #46 引入的魔数注释,
|
||||
// 方便用户通过 `head -3 脚本` 自查是否被中间层改写。
|
||||
func TestRenderScriptIncludesMagicMarker(t *testing.T) {
|
||||
for _, mode := range []string{model.InstallModeSystemd, model.InstallModeDocker, model.InstallModeForeground} {
|
||||
ctx := testCtx
|
||||
ctx.Mode = mode
|
||||
got, err := RenderScript(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("render err (%s): %v", mode, err)
|
||||
}
|
||||
if !strings.Contains(got, "BACKUPX_AGENT_INSTALL_V1") {
|
||||
t.Errorf("mode=%s: script missing magic marker:\n%s", mode, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRenderScriptBashBootstrap 脚本顶部必须有 bash 自举段,文件执行时跳到 bash。
|
||||
func TestRenderScriptBashBootstrap(t *testing.T) {
|
||||
got, err := RenderScript(testCtx)
|
||||
if err != nil {
|
||||
t.Fatalf("render err: %v", err)
|
||||
}
|
||||
if !strings.Contains(got, `[ -z "${BASH_VERSION:-}" ]`) {
|
||||
t.Errorf("script missing bash bootstrap guard:\n%s", got)
|
||||
}
|
||||
if !strings.Contains(got, `exec bash "$0" "$@"`) {
|
||||
t.Errorf("script missing exec bash fallback:\n%s", got)
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,16 @@
|
||||
#!/bin/sh
|
||||
# BackupX Agent 一键安装脚本(由 Master 动态渲染)
|
||||
# Magic: BACKUPX_AGENT_INSTALL_V1 —— 若 `head -3 脚本` 看不到此行,说明反向代理/CDN 改写了响应
|
||||
# 模式: {{.Mode}} | 架构: {{.Arch}} | 版本: {{.AgentVersion}}
|
||||
set -eu
|
||||
|
||||
# 自举到 bash(文件执行模式下生效;管道模式 $0 不是文件,exec 会静默失败,继续用 sh)。
|
||||
# 动机:部分 Debian/Ubuntu 用户通过 `curl | sudo sh` 触发时,dash 对本脚本报语法错误;
|
||||
# 若目标机装有 bash,优先切换到 bash 获得更一致的行为。
|
||||
if [ -z "${BASH_VERSION:-}" ] && command -v bash >/dev/null 2>&1 && [ -f "$0" ]; then
|
||||
exec bash "$0" "$@"
|
||||
fi
|
||||
|
||||
MASTER_URL="{{.MasterURL}}"
|
||||
AGENT_TOKEN="{{.AgentToken}}"
|
||||
AGENT_VERSION="{{.AgentVersion}}"
|
||||
|
||||
152
server/internal/metrics/collector.go
Normal file
152
server/internal/metrics/collector.go
Normal file
@@ -0,0 +1,152 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/model"
|
||||
"backupx/server/internal/repository"
|
||||
)
|
||||
|
||||
// SampleSource 抽象 Collector 需要的仓储访问,便于单测替换。
|
||||
type SampleSource interface {
|
||||
ListStorageTargets(ctx context.Context) ([]model.StorageTarget, error)
|
||||
StorageUsage(ctx context.Context) ([]repository.BackupStorageUsageItem, error)
|
||||
ListNodes(ctx context.Context) ([]model.Node, error)
|
||||
CountSLABreach(ctx context.Context) (int, error)
|
||||
}
|
||||
|
||||
// repoSource 把 repository 适配到 SampleSource。
|
||||
type repoSource struct {
|
||||
targets repository.StorageTargetRepository
|
||||
records repository.BackupRecordRepository
|
||||
nodes repository.NodeRepository
|
||||
tasks repository.BackupTaskRepository
|
||||
now func() time.Time
|
||||
}
|
||||
|
||||
// NewRepoSource 用仓储实例构造 SampleSource。
|
||||
func NewRepoSource(
|
||||
targets repository.StorageTargetRepository,
|
||||
records repository.BackupRecordRepository,
|
||||
nodes repository.NodeRepository,
|
||||
tasks repository.BackupTaskRepository,
|
||||
) SampleSource {
|
||||
return &repoSource{
|
||||
targets: targets,
|
||||
records: records,
|
||||
nodes: nodes,
|
||||
tasks: tasks,
|
||||
now: func() time.Time { return time.Now().UTC() },
|
||||
}
|
||||
}
|
||||
|
||||
func (s *repoSource) ListStorageTargets(ctx context.Context) ([]model.StorageTarget, error) {
|
||||
return s.targets.List(ctx)
|
||||
}
|
||||
|
||||
func (s *repoSource) StorageUsage(ctx context.Context) ([]repository.BackupStorageUsageItem, error) {
|
||||
return s.records.StorageUsage(ctx)
|
||||
}
|
||||
|
||||
func (s *repoSource) ListNodes(ctx context.Context) ([]model.Node, error) {
|
||||
return s.nodes.List(ctx)
|
||||
}
|
||||
|
||||
// CountSLABreach 统计当前违反 RPO 的任务:
|
||||
// - 任务启用且配置了 SLAHoursRPO > 0
|
||||
// - 最近一次成功备份距今超出 SLA 时间窗,或从未成功过
|
||||
func (s *repoSource) CountSLABreach(ctx context.Context) (int, error) {
|
||||
tasks, err := s.tasks.List(ctx, repository.BackupTaskListOptions{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
now := s.now()
|
||||
count := 0
|
||||
for i := range tasks {
|
||||
task := &tasks[i]
|
||||
if task.SLAHoursRPO <= 0 || !task.Enabled {
|
||||
continue
|
||||
}
|
||||
threshold := now.Add(-time.Duration(task.SLAHoursRPO) * time.Hour)
|
||||
if task.LastRunAt == nil || task.LastRunAt.Before(threshold) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// Collector 周期性采集 gauge 类指标(存储用量、节点在线、SLA 违约)。
|
||||
// 用后台 goroutine 驱动,避免在 /metrics 请求路径做慢 IO。
|
||||
type Collector struct {
|
||||
metrics *Metrics
|
||||
source SampleSource
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
// NewCollector 创建周期采集器。interval=0 走默认 30s。
|
||||
func NewCollector(m *Metrics, source SampleSource, interval time.Duration) *Collector {
|
||||
if interval <= 0 {
|
||||
interval = 30 * time.Second
|
||||
}
|
||||
return &Collector{metrics: m, source: source, interval: interval}
|
||||
}
|
||||
|
||||
// Start 在后台运行采集循环;随 ctx 取消而终止。
|
||||
// 启动时立即采一次,之后按 interval 轮询。
|
||||
func (c *Collector) Start(ctx context.Context) {
|
||||
if c == nil || c.metrics == nil || c.source == nil {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
c.collect(ctx)
|
||||
ticker := time.NewTicker(c.interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.collect(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// collect 执行一次采样;单轮失败不影响下次。
|
||||
func (c *Collector) collect(ctx context.Context) {
|
||||
// 存储用量:按 StorageTargetID 聚合 file_size,对应 target name/type
|
||||
if targets, err := c.source.ListStorageTargets(ctx); err == nil {
|
||||
nameByID := make(map[uint]string, len(targets))
|
||||
typeByID := make(map[uint]string, len(targets))
|
||||
for i := range targets {
|
||||
nameByID[targets[i].ID] = targets[i].Name
|
||||
typeByID[targets[i].ID] = targets[i].Type
|
||||
}
|
||||
if usage, uerr := c.source.StorageUsage(ctx); uerr == nil {
|
||||
c.metrics.ResetStorageUsed()
|
||||
for _, item := range usage {
|
||||
name := nameByID[item.StorageTargetID]
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
c.metrics.SetStorageUsed(name, typeByID[item.StorageTargetID], item.TotalSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
// 节点在线状态:role 约定为 master / agent
|
||||
if nodes, err := c.source.ListNodes(ctx); err == nil {
|
||||
c.metrics.ResetNodeOnline()
|
||||
for i := range nodes {
|
||||
n := &nodes[i]
|
||||
role := "agent"
|
||||
if n.IsLocal {
|
||||
role = "master"
|
||||
}
|
||||
c.metrics.SetNodeOnline(n.Name, role, n.Status == model.NodeStatusOnline)
|
||||
}
|
||||
}
|
||||
if breach, err := c.source.CountSLABreach(ctx); err == nil {
|
||||
c.metrics.SetSLABreach(breach)
|
||||
}
|
||||
}
|
||||
225
server/internal/metrics/registry.go
Normal file
225
server/internal/metrics/registry.go
Normal file
@@ -0,0 +1,225 @@
|
||||
// Package metrics 暴露 BackupX 的 Prometheus 采集器。
|
||||
//
|
||||
// 设计要点:
|
||||
// - 使用独立 Registry,避免与 default registry 中的 Go runtime metrics 混淆
|
||||
// - Counter/Gauge/Histogram 全部以 backupx_ 为前缀,遵循 Prometheus 命名规范
|
||||
// - 所有指标都支持零值:未注入时调用方法是 no-op,不会 panic
|
||||
// - 组件只依赖本包,不反向引用 service/repository,避免循环
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
// Metrics 聚合所有采集器,由 app 层组装一次并按需注入到 service。
|
||||
type Metrics struct {
|
||||
registry *prometheus.Registry
|
||||
|
||||
// 任务执行计数(labels: status, task_type)
|
||||
TaskRunTotal *prometheus.CounterVec
|
||||
// 任务耗时分布(labels: task_type)
|
||||
TaskRunDuration *prometheus.HistogramVec
|
||||
// 任务产出字节数(labels: task_type)
|
||||
TaskBytesTotal *prometheus.CounterVec
|
||||
// 正在运行的任务数
|
||||
TaskRunningGauge prometheus.Gauge
|
||||
// 存储目标用量(labels: target_name, target_type)
|
||||
StorageUsedBytes *prometheus.GaugeVec
|
||||
// 节点在线状态(labels: node_name, role;value: 0/1)
|
||||
NodeOnline *prometheus.GaugeVec
|
||||
// 验证演练结果(labels: status)
|
||||
VerifyRunTotal *prometheus.CounterVec
|
||||
// 恢复操作结果(labels: status)
|
||||
RestoreRunTotal *prometheus.CounterVec
|
||||
// 副本复制结果(labels: status)
|
||||
ReplicationRunTotal *prometheus.CounterVec
|
||||
// SLA 违约数(gauge)
|
||||
SLABreachGauge prometheus.Gauge
|
||||
// 应用信息(label: version)
|
||||
AppInfo *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
// New 构造并注册所有采集器。
|
||||
// 失败时 panic:采集器注册失败属于启动期编程错误,没有合理 fallback。
|
||||
func New(version string) *Metrics {
|
||||
reg := prometheus.NewRegistry()
|
||||
// 注入标准 Go runtime + process 指标
|
||||
reg.MustRegister(collectors.NewGoCollector())
|
||||
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
|
||||
|
||||
m := &Metrics{
|
||||
registry: reg,
|
||||
TaskRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "backupx_task_run_total",
|
||||
Help: "备份任务执行总数,按状态和任务类型细分",
|
||||
}, []string{"status", "task_type"}),
|
||||
TaskRunDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "backupx_task_run_duration_seconds",
|
||||
Help: "备份任务耗时分布",
|
||||
Buckets: []float64{1, 5, 15, 30, 60, 120, 300, 600, 1800, 3600, 7200},
|
||||
}, []string{"task_type"}),
|
||||
TaskBytesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "backupx_task_bytes_total",
|
||||
Help: "备份任务累计产出字节数",
|
||||
}, []string{"task_type"}),
|
||||
TaskRunningGauge: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "backupx_task_running",
|
||||
Help: "当前正在执行的备份任务数",
|
||||
}),
|
||||
StorageUsedBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "backupx_storage_used_bytes",
|
||||
Help: "存储目标已用字节数",
|
||||
}, []string{"target_name", "target_type"}),
|
||||
NodeOnline: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "backupx_node_online",
|
||||
Help: "集群节点在线状态(1 在线 / 0 离线)",
|
||||
}, []string{"node_name", "role"}),
|
||||
VerifyRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "backupx_verify_run_total",
|
||||
Help: "备份验证演练执行总数",
|
||||
}, []string{"status"}),
|
||||
RestoreRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "backupx_restore_run_total",
|
||||
Help: "恢复操作执行总数",
|
||||
}, []string{"status"}),
|
||||
ReplicationRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "backupx_replication_run_total",
|
||||
Help: "备份副本复制执行总数",
|
||||
}, []string{"status"}),
|
||||
SLABreachGauge: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "backupx_sla_breach_tasks",
|
||||
Help: "当前违反 SLA/RPO 的任务数",
|
||||
}),
|
||||
AppInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "backupx_app_info",
|
||||
Help: "BackupX 应用元信息(恒为 1,通过 label 暴露版本号)",
|
||||
}, []string{"version"}),
|
||||
}
|
||||
reg.MustRegister(
|
||||
m.TaskRunTotal,
|
||||
m.TaskRunDuration,
|
||||
m.TaskBytesTotal,
|
||||
m.TaskRunningGauge,
|
||||
m.StorageUsedBytes,
|
||||
m.NodeOnline,
|
||||
m.VerifyRunTotal,
|
||||
m.RestoreRunTotal,
|
||||
m.ReplicationRunTotal,
|
||||
m.SLABreachGauge,
|
||||
m.AppInfo,
|
||||
)
|
||||
m.AppInfo.WithLabelValues(version).Set(1)
|
||||
return m
|
||||
}
|
||||
|
||||
// Handler 返回 /metrics 的 HTTP handler。
|
||||
// 使用本包专属 registry,避免混入其他组件的默认 metrics。
|
||||
func (m *Metrics) Handler() http.Handler {
|
||||
if m == nil {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
http.Error(w, "metrics disabled", http.StatusServiceUnavailable)
|
||||
})
|
||||
}
|
||||
return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
|
||||
EnableOpenMetrics: false,
|
||||
})
|
||||
}
|
||||
|
||||
// ObserveTaskRun 记录一次任务执行结果。
|
||||
// status 常用值:success / failed / cancelled。nil 接收器安全。
|
||||
func (m *Metrics) ObserveTaskRun(taskType, status string, durationSec float64, bytes int64) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.TaskRunTotal.WithLabelValues(status, taskType).Inc()
|
||||
m.TaskRunDuration.WithLabelValues(taskType).Observe(durationSec)
|
||||
if bytes > 0 {
|
||||
m.TaskBytesTotal.WithLabelValues(taskType).Add(float64(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
// IncTaskRunning / DecTaskRunning 配套使用,反映并发中任务数。
|
||||
func (m *Metrics) IncTaskRunning() {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.TaskRunningGauge.Inc()
|
||||
}
|
||||
|
||||
func (m *Metrics) DecTaskRunning() {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.TaskRunningGauge.Dec()
|
||||
}
|
||||
|
||||
// ObserveRestore / ObserveVerify / ObserveReplication 记录子动作结果。
|
||||
// 所有方法对 nil 接收器安全:未注入 Metrics 时静默降级,不 panic。
|
||||
func (m *Metrics) ObserveRestore(status string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.RestoreRunTotal.WithLabelValues(status).Inc()
|
||||
}
|
||||
|
||||
func (m *Metrics) ObserveVerify(status string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.VerifyRunTotal.WithLabelValues(status).Inc()
|
||||
}
|
||||
|
||||
func (m *Metrics) ObserveReplication(status string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.ReplicationRunTotal.WithLabelValues(status).Inc()
|
||||
}
|
||||
|
||||
// SetStorageUsed 刷新某存储目标的用量。调用方负责周期采集。
|
||||
func (m *Metrics) SetStorageUsed(name, targetType string, bytes int64) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.StorageUsedBytes.WithLabelValues(name, targetType).Set(float64(bytes))
|
||||
}
|
||||
|
||||
// SetNodeOnline 刷新节点在线状态。
|
||||
func (m *Metrics) SetNodeOnline(name, role string, online bool) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
val := 0.0
|
||||
if online {
|
||||
val = 1
|
||||
}
|
||||
m.NodeOnline.WithLabelValues(name, role).Set(val)
|
||||
}
|
||||
|
||||
// ResetNodeOnline 清空节点 gauge(当节点被删除时避免残留指标)。
|
||||
func (m *Metrics) ResetNodeOnline() {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.NodeOnline.Reset()
|
||||
}
|
||||
|
||||
// ResetStorageUsed 清空存储目标 gauge。
|
||||
func (m *Metrics) ResetStorageUsed() {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.StorageUsedBytes.Reset()
|
||||
}
|
||||
|
||||
// SetSLABreach 刷新 SLA 违约任务数。
|
||||
func (m *Metrics) SetSLABreach(count int) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.SLABreachGauge.Set(float64(count))
|
||||
}
|
||||
76
server/internal/metrics/registry_test.go
Normal file
76
server/internal/metrics/registry_test.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
)
|
||||
|
||||
func TestNew_AppInfoVersionLabel(t *testing.T) {
|
||||
m := New("2.1.0")
|
||||
if got := testutil.ToFloat64(m.AppInfo.WithLabelValues("2.1.0")); got != 1 {
|
||||
t.Fatalf("app_info(version=2.1.0) expected 1, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserveTaskRun_IncrementsCounterAndHistogram(t *testing.T) {
|
||||
m := New("test")
|
||||
m.ObserveTaskRun("mysql", "success", 12.5, 1024)
|
||||
m.ObserveTaskRun("mysql", "failed", 3.0, 0)
|
||||
if got := testutil.ToFloat64(m.TaskRunTotal.WithLabelValues("success", "mysql")); got != 1 {
|
||||
t.Fatalf("task_run_total{status=success,task_type=mysql}: expected 1, got %v", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.TaskRunTotal.WithLabelValues("failed", "mysql")); got != 1 {
|
||||
t.Fatalf("task_run_total{status=failed,task_type=mysql}: expected 1, got %v", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.TaskBytesTotal.WithLabelValues("mysql")); got != 1024 {
|
||||
t.Fatalf("task_bytes_total{task_type=mysql}: expected 1024, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestObserveTaskRun_NilReceiverIsSafe(t *testing.T) {
|
||||
var m *Metrics // nil
|
||||
m.ObserveTaskRun("file", "success", 1, 1)
|
||||
m.ObserveRestore("success")
|
||||
m.ObserveVerify("failed")
|
||||
m.ObserveReplication("success")
|
||||
m.IncTaskRunning()
|
||||
m.DecTaskRunning()
|
||||
m.SetStorageUsed("a", "s3", 1)
|
||||
m.SetNodeOnline("n1", "master", true)
|
||||
m.SetSLABreach(3)
|
||||
m.ResetNodeOnline()
|
||||
m.ResetStorageUsed()
|
||||
// no panic -> pass
|
||||
}
|
||||
|
||||
func TestHandler_ExposesBackupxMetrics(t *testing.T) {
|
||||
m := New("0.0.0-test")
|
||||
m.ObserveTaskRun("file", "success", 1.0, 2048)
|
||||
m.SetNodeOnline("n1", "master", true)
|
||||
m.SetSLABreach(1)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
req := httptest.NewRequest("GET", "/metrics", nil)
|
||||
m.Handler().ServeHTTP(recorder, req)
|
||||
|
||||
body, err := io.ReadAll(recorder.Result().Body)
|
||||
if err != nil {
|
||||
t.Fatalf("read body: %v", err)
|
||||
}
|
||||
content := string(body)
|
||||
for _, keyword := range []string{
|
||||
"backupx_task_run_total",
|
||||
"backupx_task_run_duration_seconds",
|
||||
"backupx_node_online",
|
||||
"backupx_sla_breach_tasks",
|
||||
"backupx_app_info",
|
||||
} {
|
||||
if !strings.Contains(content, keyword) {
|
||||
t.Errorf("expected /metrics to contain %q", keyword)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,18 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/apperror"
|
||||
"backupx/server/internal/model"
|
||||
@@ -25,10 +34,39 @@ type AuditEntry struct {
|
||||
|
||||
type AuditService struct {
|
||||
repo repository.AuditLogRepository
|
||||
|
||||
// webhook 外输配置(可选)
|
||||
webhookMu sync.RWMutex
|
||||
webhookURL string
|
||||
webhookSecret string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
func NewAuditService(repo repository.AuditLogRepository) *AuditService {
|
||||
return &AuditService{repo: repo}
|
||||
return &AuditService{
|
||||
repo: repo,
|
||||
httpClient: &http.Client{
|
||||
Timeout: 3 * time.Second, // 短超时:审计 webhook 不应拖慢业务
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// SetWebhook 动态配置审计事件转发 URL 与签名密钥。
|
||||
// - url 为空字符串时禁用转发
|
||||
// - secret 非空时对 payload 计算 HMAC-SHA256,作为 X-BackupX-Signature header
|
||||
//
|
||||
// 适用场景:
|
||||
// - 企业 SIEM 集成(Splunk HEC、ELK、Loki)
|
||||
// - 安全审计留痕到第三方 WORM 存储
|
||||
// - 合规日志归档(GDPR / SOC2)
|
||||
func (s *AuditService) SetWebhook(url, secret string) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s.webhookMu.Lock()
|
||||
defer s.webhookMu.Unlock()
|
||||
s.webhookURL = strings.TrimSpace(url)
|
||||
s.webhookSecret = strings.TrimSpace(secret)
|
||||
}
|
||||
|
||||
// Record 异步 fire-and-forget 写入审计日志,不阻塞业务逻辑
|
||||
@@ -51,9 +89,65 @@ func (s *AuditService) Record(entry AuditEntry) {
|
||||
if err := s.repo.Create(context.Background(), record); err != nil {
|
||||
log.Printf("[audit] failed to write audit log: %v", err)
|
||||
}
|
||||
s.fireWebhook(record)
|
||||
}()
|
||||
}
|
||||
|
||||
// fireWebhook 异步向外部系统转发审计事件。失败降级到本地日志,永不影响主流程。
|
||||
func (s *AuditService) fireWebhook(record *model.AuditLog) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s.webhookMu.RLock()
|
||||
url := s.webhookURL
|
||||
secret := s.webhookSecret
|
||||
s.webhookMu.RUnlock()
|
||||
if url == "" {
|
||||
return
|
||||
}
|
||||
payload := map[string]any{
|
||||
"eventType": "audit.log",
|
||||
"occurredAt": record.CreatedAt.UTC().Format(time.RFC3339),
|
||||
"actor": map[string]any{
|
||||
"userId": record.UserID,
|
||||
"username": record.Username,
|
||||
},
|
||||
"category": record.Category,
|
||||
"action": record.Action,
|
||||
"targetType": record.TargetType,
|
||||
"targetId": record.TargetID,
|
||||
"targetName": record.TargetName,
|
||||
"detail": record.Detail,
|
||||
"clientIp": record.ClientIP,
|
||||
}
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
log.Printf("[audit] webhook marshal failed: %v", err)
|
||||
return
|
||||
}
|
||||
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
log.Printf("[audit] webhook build request failed: %v", err)
|
||||
return
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", "BackupX-Audit/1.0")
|
||||
if secret != "" {
|
||||
mac := hmac.New(sha256.New, []byte(secret))
|
||||
mac.Write(body)
|
||||
req.Header.Set("X-BackupX-Signature", "sha256="+hex.EncodeToString(mac.Sum(nil)))
|
||||
}
|
||||
resp, err := s.httpClient.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("[audit] webhook POST failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 400 {
|
||||
log.Printf("[audit] webhook returned status %d", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// List 分页查询审计日志
|
||||
func (s *AuditService) List(ctx context.Context, category string, limit, offset int) (*repository.AuditLogListResult, error) {
|
||||
result, err := s.repo.List(ctx, repository.AuditLogListOptions{
|
||||
|
||||
129
server/internal/service/audit_service_webhook_test.go
Normal file
129
server/internal/service/audit_service_webhook_test.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/model"
|
||||
"backupx/server/internal/repository"
|
||||
)
|
||||
|
||||
// fakeAuditRepo 用通道同步等待异步写入,避免 sleep。
|
||||
type fakeAuditRepo struct {
|
||||
mu sync.Mutex
|
||||
logs []model.AuditLog
|
||||
created chan struct{}
|
||||
}
|
||||
|
||||
func newFakeAuditRepo() *fakeAuditRepo {
|
||||
return &fakeAuditRepo{created: make(chan struct{}, 4)}
|
||||
}
|
||||
|
||||
func (r *fakeAuditRepo) Create(_ context.Context, log *model.AuditLog) error {
|
||||
r.mu.Lock()
|
||||
log.CreatedAt = time.Now().UTC()
|
||||
r.logs = append(r.logs, *log)
|
||||
r.mu.Unlock()
|
||||
r.created <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *fakeAuditRepo) List(context.Context, repository.AuditLogListOptions) (*repository.AuditLogListResult, error) {
|
||||
return &repository.AuditLogListResult{}, nil
|
||||
}
|
||||
|
||||
func (r *fakeAuditRepo) ListAll(context.Context, repository.AuditLogListOptions) ([]model.AuditLog, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestAuditService_WebhookDeliversSignedPayload(t *testing.T) {
|
||||
var hits atomic.Int32
|
||||
var got struct {
|
||||
sig string
|
||||
payload map[string]any
|
||||
received chan struct{}
|
||||
}
|
||||
got.received = make(chan struct{}, 1)
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hits.Add(1)
|
||||
body, _ := io.ReadAll(r.Body)
|
||||
got.sig = r.Header.Get("X-BackupX-Signature")
|
||||
_ = json.Unmarshal(body, &got.payload)
|
||||
|
||||
// 验证 HMAC 正确
|
||||
mac := hmac.New(sha256.New, []byte("s3cret"))
|
||||
mac.Write(body)
|
||||
expected := "sha256=" + hex.EncodeToString(mac.Sum(nil))
|
||||
if got.sig != expected {
|
||||
t.Errorf("signature mismatch: expected %s, got %s", expected, got.sig)
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
got.received <- struct{}{}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
repo := newFakeAuditRepo()
|
||||
svc := NewAuditService(repo)
|
||||
svc.SetWebhook(server.URL, "s3cret")
|
||||
|
||||
svc.Record(AuditEntry{
|
||||
Username: "alice",
|
||||
Category: "auth",
|
||||
Action: "login_success",
|
||||
ClientIP: "10.0.0.1",
|
||||
Detail: "admin login",
|
||||
})
|
||||
|
||||
// 等待异步写入 + webhook
|
||||
select {
|
||||
case <-repo.created:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("audit log not written within 1s")
|
||||
}
|
||||
select {
|
||||
case <-got.received:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("webhook not invoked within 1s")
|
||||
}
|
||||
|
||||
if hits.Load() != 1 {
|
||||
t.Fatalf("expected 1 webhook hit, got %d", hits.Load())
|
||||
}
|
||||
if got.payload["eventType"] != "audit.log" {
|
||||
t.Errorf("eventType wrong: %v", got.payload["eventType"])
|
||||
}
|
||||
actor, ok := got.payload["actor"].(map[string]any)
|
||||
if !ok || actor["username"] != "alice" {
|
||||
t.Errorf("actor.username mismatch: %v", got.payload["actor"])
|
||||
}
|
||||
if got.payload["action"] != "login_success" {
|
||||
t.Errorf("action mismatch: %v", got.payload["action"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestAuditService_WebhookDisabledWhenURLEmpty(t *testing.T) {
|
||||
repo := newFakeAuditRepo()
|
||||
svc := NewAuditService(repo)
|
||||
// 不调用 SetWebhook:应该不发送任何请求
|
||||
svc.Record(AuditEntry{Username: "bob", Action: "logout"})
|
||||
|
||||
select {
|
||||
case <-repo.created:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("audit log not written within 1s")
|
||||
}
|
||||
// 给 webhook 一些时间(即便它不会被调用)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// 无显式断言:能不 panic 即算通过
|
||||
}
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"backupx/server/internal/apperror"
|
||||
"backupx/server/internal/backup"
|
||||
backupretention "backupx/server/internal/backup/retention"
|
||||
"backupx/server/internal/metrics"
|
||||
"backupx/server/internal/model"
|
||||
"backupx/server/internal/repository"
|
||||
"backupx/server/internal/storage"
|
||||
@@ -92,8 +93,14 @@ type BackupExecutionService struct {
|
||||
// nodeSemaphores 节点级并发限制(按 NodeID 映射)。
|
||||
// 没命中的 NodeID 走全局 semaphore,节点配置 MaxConcurrent>0 时按该节点独立排队。
|
||||
nodeSemaphores sync.Map
|
||||
retries int // rclone 底层重试次数
|
||||
bandwidthLimit string // rclone 带宽限制
|
||||
retries int // rclone 底层重试次数
|
||||
bandwidthLimit string // rclone 带宽限制(全局默认,节点配置可覆盖)
|
||||
metrics *metrics.Metrics
|
||||
}
|
||||
|
||||
// SetMetrics 注入 Prometheus 采集器。nil 时所有埋点退化为 no-op。
|
||||
func (s *BackupExecutionService) SetMetrics(m *metrics.Metrics) {
|
||||
s.metrics = m
|
||||
}
|
||||
|
||||
// ReplicationTrigger 抽象备份成功后的副本派发(实现者:ReplicationService)。
|
||||
@@ -407,6 +414,22 @@ func (s *BackupExecutionService) shouldNotify(ctx context.Context, task *model.B
|
||||
return true
|
||||
}
|
||||
|
||||
// effectiveBandwidth 返回当前上下文应用的带宽限速字符串。
|
||||
// 优先级:Node.BandwidthLimit(非空) > 全局 s.bandwidthLimit。
|
||||
func (s *BackupExecutionService) effectiveBandwidth(ctx context.Context, nodeID uint) string {
|
||||
if nodeID == 0 || s.nodeRepo == nil {
|
||||
return s.bandwidthLimit
|
||||
}
|
||||
node, err := s.nodeRepo.FindByID(ctx, nodeID)
|
||||
if err != nil || node == nil {
|
||||
return s.bandwidthLimit
|
||||
}
|
||||
if strings.TrimSpace(node.BandwidthLimit) != "" {
|
||||
return node.BandwidthLimit
|
||||
}
|
||||
return s.bandwidthLimit
|
||||
}
|
||||
|
||||
// acquireNodeSemaphore 返回节点级并发通道。懒初始化:第一次为某节点排队时创建。
|
||||
// 如果节点未配置 MaxConcurrent 或 nodeRepo 未注入,返回 nil(调用方走全局 semaphore)。
|
||||
// 节点容量仅在首次创建时采用,后续变更需重启服务才生效(避免运行时 resize 通道的复杂度)。
|
||||
@@ -456,6 +479,10 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
|
||||
s.semaphore <- struct{}{}
|
||||
defer func() { <-s.semaphore }()
|
||||
|
||||
// Prometheus: running gauge + 完成时 observe 耗时/字节/状态
|
||||
s.metrics.IncTaskRunning()
|
||||
defer s.metrics.DecTaskRunning()
|
||||
|
||||
logger := backup.NewExecutionLogger(recordID, s.logHub)
|
||||
status := "failed"
|
||||
errMessage := ""
|
||||
@@ -468,6 +495,8 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
|
||||
if finalizeErr := s.finalizeRecord(ctx, task, recordID, startedAt, status, errMessage, logger.String(), fileName, fileSize, checksum, storagePath); finalizeErr != nil {
|
||||
logger.Errorf("写回备份记录失败:%v", finalizeErr)
|
||||
}
|
||||
// 采集任务执行结果到 Prometheus(耗时 + 产出字节 + 状态计数)
|
||||
s.metrics.ObserveTaskRun(task.Type, status, time.Since(startedAt).Seconds(), fileSize)
|
||||
// 写入多目标上传结果
|
||||
if len(uploadResults) > 0 {
|
||||
if resultsJSON, marshalErr := json.Marshal(uploadResults); marshalErr == nil {
|
||||
@@ -559,7 +588,8 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
|
||||
if findErr == nil && target != nil {
|
||||
targetName = target.Name
|
||||
}
|
||||
provider, resolveErr := s.resolveProvider(ctx, targetID)
|
||||
// 节点级带宽覆盖:若 task 绑定节点并配置了 BandwidthLimit,覆盖全局限速
|
||||
provider, resolveErr := s.resolveProviderForNode(ctx, targetID, task.NodeID)
|
||||
if resolveErr != nil {
|
||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: resolveErr.Error()}
|
||||
logger.Warnf("存储目标 %s 创建客户端失败:%v", targetName, resolveErr)
|
||||
@@ -742,10 +772,17 @@ func (s *BackupExecutionService) finalizeRecord(ctx context.Context, task *model
|
||||
}
|
||||
|
||||
func (s *BackupExecutionService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) {
|
||||
// 注入 rclone 传输配置(重试、带宽限制)
|
||||
return s.resolveProviderForNode(ctx, targetID, 0)
|
||||
}
|
||||
|
||||
// resolveProviderForNode 根据节点的 BandwidthLimit 覆盖全局默认。
|
||||
// nodeID=0 或节点未配置时退化为全局默认。
|
||||
// 仅在 Master 本地执行生效;Agent 会收到自身 Node 配置,并在独立 runtime 中应用。
|
||||
func (s *BackupExecutionService) resolveProviderForNode(ctx context.Context, targetID uint, nodeID uint) (storage.StorageProvider, error) {
|
||||
// 注入 rclone 传输配置(重试、节点级带宽覆盖全局)
|
||||
ctx = rclone.ConfiguredContext(ctx, rclone.TransferConfig{
|
||||
LowLevelRetries: s.retries,
|
||||
BandwidthLimit: s.bandwidthLimit,
|
||||
BandwidthLimit: s.effectiveBandwidth(ctx, nodeID),
|
||||
})
|
||||
target, err := s.targets.FindByID(ctx, targetID)
|
||||
if err != nil {
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/apperror"
|
||||
"backupx/server/internal/metrics"
|
||||
"backupx/server/internal/model"
|
||||
"backupx/server/internal/repository"
|
||||
"backupx/server/internal/storage"
|
||||
@@ -37,6 +38,12 @@ type ReplicationService struct {
|
||||
semaphore chan struct{}
|
||||
async func(func())
|
||||
now func() time.Time
|
||||
metrics *metrics.Metrics
|
||||
}
|
||||
|
||||
// SetMetrics 注入 Prometheus 采集器。
|
||||
func (s *ReplicationService) SetMetrics(m *metrics.Metrics) {
|
||||
s.metrics = m
|
||||
}
|
||||
|
||||
func NewReplicationService(
|
||||
@@ -193,6 +200,7 @@ func (s *ReplicationService) executeReplication(ctx context.Context, repID uint)
|
||||
rep.DurationSeconds = int(completedAt.Sub(rep.StartedAt).Seconds())
|
||||
rep.CompletedAt = &completedAt
|
||||
_ = s.replications.Update(ctx, rep)
|
||||
s.metrics.ObserveReplication(status)
|
||||
if status == model.ReplicationStatusFailed {
|
||||
s.dispatchFailed(ctx, rep, errMessage)
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"backupx/server/internal/apperror"
|
||||
"backupx/server/internal/backup"
|
||||
"backupx/server/internal/metrics"
|
||||
"backupx/server/internal/model"
|
||||
"backupx/server/internal/repository"
|
||||
"backupx/server/internal/storage"
|
||||
@@ -41,6 +42,12 @@ type RestoreService struct {
|
||||
semaphore chan struct{}
|
||||
async func(func())
|
||||
now func() time.Time
|
||||
metrics *metrics.Metrics
|
||||
}
|
||||
|
||||
// SetMetrics 注入 Prometheus 采集器。
|
||||
func (s *RestoreService) SetMetrics(m *metrics.Metrics) {
|
||||
s.metrics = m
|
||||
}
|
||||
|
||||
// NewRestoreService 构造恢复服务。maxConcurrent 控制本地并发恢复数。
|
||||
@@ -432,6 +439,7 @@ func (s *RestoreService) finalizeWithLog(ctx context.Context, restoreID uint, st
|
||||
}
|
||||
record.DurationSeconds = int(completedAt.Sub(record.StartedAt).Seconds())
|
||||
record.CompletedAt = &completedAt
|
||||
s.metrics.ObserveRestore(status)
|
||||
return s.restores.Update(ctx, record)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,21 +8,55 @@ import (
|
||||
"backupx/server/internal/repository"
|
||||
)
|
||||
|
||||
// AuditWebhookConfigurer 抽象审计 webhook 配置接口,由 AuditService 实现。
|
||||
// 用接口解耦避免 settings_service 直接依赖 AuditService 具体类型。
|
||||
type AuditWebhookConfigurer interface {
|
||||
SetWebhook(url, secret string)
|
||||
}
|
||||
|
||||
type SettingsService struct {
|
||||
configs repository.SystemConfigRepository
|
||||
configs repository.SystemConfigRepository
|
||||
auditWebhook AuditWebhookConfigurer
|
||||
}
|
||||
|
||||
func NewSettingsService(configs repository.SystemConfigRepository) *SettingsService {
|
||||
return &SettingsService{configs: configs}
|
||||
}
|
||||
|
||||
// settingsKeys lists all user-editable setting keys.
|
||||
// SetAuditWebhookConfigurer 注入 audit webhook 配置接收方。
|
||||
// 启动时立即用当前 DB 中的设置调用一次,后续每次 Update 变更 webhook key 时同步推送。
|
||||
func (s *SettingsService) SetAuditWebhookConfigurer(ctx context.Context, configurer AuditWebhookConfigurer) {
|
||||
if s == nil || configurer == nil {
|
||||
return
|
||||
}
|
||||
s.auditWebhook = configurer
|
||||
// 启动时同步一次,保证重启后配置不丢失
|
||||
all, err := s.GetAll(ctx)
|
||||
if err == nil {
|
||||
configurer.SetWebhook(all[SettingKeyAuditWebhookURL], all[SettingKeyAuditWebhookSecret])
|
||||
}
|
||||
}
|
||||
|
||||
// 可被前端写入的系统设置键。新增键必须同步加入此清单,
|
||||
// 否则 Update 会忽略(安全原则:显式 allow-list)。
|
||||
const (
|
||||
SettingKeySiteName = "site_name"
|
||||
SettingKeyLanguage = "language"
|
||||
SettingKeyTimezone = "timezone"
|
||||
SettingKeyBackupNotificationEnabled = "backup_notification_enabled"
|
||||
SettingKeyBandwidthLimit = "bandwidth_limit"
|
||||
SettingKeyAuditWebhookURL = "audit_webhook_url"
|
||||
SettingKeyAuditWebhookSecret = "audit_webhook_secret"
|
||||
)
|
||||
|
||||
var settingsKeys = []string{
|
||||
"site_name",
|
||||
"language",
|
||||
"timezone",
|
||||
"backup_notification_enabled",
|
||||
"bandwidth_limit",
|
||||
SettingKeySiteName,
|
||||
SettingKeyLanguage,
|
||||
SettingKeyTimezone,
|
||||
SettingKeyBackupNotificationEnabled,
|
||||
SettingKeyBandwidthLimit,
|
||||
SettingKeyAuditWebhookURL,
|
||||
SettingKeyAuditWebhookSecret,
|
||||
}
|
||||
|
||||
func (s *SettingsService) GetAll(ctx context.Context) (map[string]string, error) {
|
||||
@@ -42,6 +76,7 @@ func (s *SettingsService) Update(ctx context.Context, settings map[string]string
|
||||
for _, key := range settingsKeys {
|
||||
allowed[key] = true
|
||||
}
|
||||
auditWebhookTouched := false
|
||||
for key, value := range settings {
|
||||
if !allowed[key] {
|
||||
continue
|
||||
@@ -50,6 +85,14 @@ func (s *SettingsService) Update(ctx context.Context, settings map[string]string
|
||||
if err := s.configs.Upsert(ctx, item); err != nil {
|
||||
return nil, apperror.Internal("SETTINGS_UPDATE_FAILED", "无法更新系统设置", err)
|
||||
}
|
||||
if key == SettingKeyAuditWebhookURL || key == SettingKeyAuditWebhookSecret {
|
||||
auditWebhookTouched = true
|
||||
}
|
||||
}
|
||||
// audit webhook 配置变化:立即同步到 AuditService,避免重启才生效
|
||||
if auditWebhookTouched && s.auditWebhook != nil {
|
||||
all, _ := s.GetAll(ctx)
|
||||
s.auditWebhook.SetWebhook(all[SettingKeyAuditWebhookURL], all[SettingKeyAuditWebhookSecret])
|
||||
}
|
||||
return s.GetAll(ctx)
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"backupx/server/internal/apperror"
|
||||
"backupx/server/internal/backup"
|
||||
"backupx/server/internal/metrics"
|
||||
"backupx/server/internal/model"
|
||||
"backupx/server/internal/repository"
|
||||
"backupx/server/internal/storage"
|
||||
@@ -42,6 +43,12 @@ type VerificationService struct {
|
||||
semaphore chan struct{}
|
||||
async func(func())
|
||||
now func() time.Time
|
||||
metrics *metrics.Metrics
|
||||
}
|
||||
|
||||
// SetMetrics 注入 Prometheus 采集器。
|
||||
func (s *VerificationService) SetMetrics(m *metrics.Metrics) {
|
||||
s.metrics = m
|
||||
}
|
||||
|
||||
// VerificationNotifier 给用户推送验证完成/失败通知。
|
||||
@@ -413,6 +420,7 @@ func (s *VerificationService) finalize(ctx context.Context, verID uint, status,
|
||||
}
|
||||
record.DurationSeconds = int(completedAt.Sub(record.StartedAt).Seconds())
|
||||
record.CompletedAt = &completedAt
|
||||
s.metrics.ObserveVerify(status)
|
||||
return s.verifications.Update(ctx, record)
|
||||
}
|
||||
|
||||
|
||||
@@ -162,7 +162,7 @@ export function AgentInstallWizard({ visible, onClose, onSuccess, masterVersion,
|
||||
const rows: BatchCommandRow[] = tokens.map(({ c, tok }) => ({
|
||||
nodeId: c.id,
|
||||
nodeName: c.name,
|
||||
command: `curl -fsSL ${tok.url} | sudo sh`,
|
||||
command: `curl -fsSL ${tok.url} | sudo bash`,
|
||||
expiresAt: tok.expiresAt,
|
||||
}))
|
||||
if (mountedRef.current) setBatchRows(rows)
|
||||
|
||||
@@ -29,7 +29,11 @@ export function Step3CommandPreview({ nodeId, nodeName, token, mode, previewPara
|
||||
}, [token.expiresAt])
|
||||
|
||||
const expired = remaining === 0
|
||||
const command = `curl -fsSL ${token.url} | sudo sh`
|
||||
// 使用 bash 管道执行:避开 Debian/Ubuntu 默认 /bin/sh=dash 的差异,
|
||||
// 同时让反向代理 / CDN 不再按 "sh" 的脚本类型做内容识别(issue #46)。
|
||||
const command = `curl -fsSL ${token.url} | sudo bash`
|
||||
// 备用命令:若当前机器无 bash,或中间代理过滤了管道响应,可先落盘再执行。
|
||||
const fallbackCommand = `curl -fsSL ${token.url} -o /tmp/bx-agent-install.sh && sudo sh /tmp/bx-agent-install.sh`
|
||||
const dockerComposeCmd = mode === 'docker' && token.composeUrl
|
||||
? `curl -fsSL ${token.composeUrl} -o docker-compose.yml && docker-compose up -d`
|
||||
: null
|
||||
@@ -76,6 +80,21 @@ export function Step3CommandPreview({ nodeId, nodeName, token, mode, previewPara
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div style={{ background: 'var(--color-fill-2)', padding: '12px 14px', borderRadius: 6, marginBottom: 12 }}>
|
||||
<Text type="secondary" style={{ fontSize: 12, display: 'block', marginBottom: 4 }}>
|
||||
或先下载再执行(当目标机无 bash / 反向代理过滤管道响应时):
|
||||
</Text>
|
||||
<Text style={{
|
||||
fontFamily: 'monospace', fontSize: 13, wordBreak: 'break-all',
|
||||
opacity: expired ? 0.4 : 1, userSelect: 'all',
|
||||
}}>
|
||||
{fallbackCommand}
|
||||
</Text>
|
||||
<div style={{ marginTop: 8 }}>
|
||||
<Button size="small" icon={<IconCopy />} disabled={expired} onClick={() => copy(fallbackCommand)}>复制</Button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{dockerComposeCmd && (
|
||||
<div style={{ background: 'var(--color-fill-2)', padding: '12px 14px', borderRadius: 6, marginBottom: 12 }}>
|
||||
<Text type="secondary" style={{ fontSize: 12, display: 'block', marginBottom: 4 }}>
|
||||
|
||||
Reference in New Issue
Block a user