From eff48342c8af9cfd16c6ca0c8c764baee0594c88 Mon Sep 17 00:00:00 2001 From: Wu Qing <3184394176@qq.com> Date: Tue, 21 Apr 2026 14:05:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=9F=E8=83=BD:=20v2.2=20=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=B1=A0=E8=B0=83=E5=BA=A6=20+=20Grafana=20Dashboard=20+=20?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E6=BC=82=E7=A7=BB=20UI=20(#49)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 节点池动态调度(企业集群核心需求): - model.Node 新增 Labels CSV;Node.HasLabel / LabelSet 辅助方法 - model.BackupTask 新增 NodePoolTag;与 NodeID 互斥(校验层拒绝同时设置) - BackupExecutionService.selectPoolNode:匹配标签的在线节点中选"运行中任务最少" 并列按 ID 升序稳定;空池返回 NODE_POOL_EMPTY 让用户立即感知 - 选中节点仅写 BackupRecord,不回写 task.NodeID —— 每次执行重选实现真轮转均衡 Grafana Dashboard(v2.1 指标的可视化闭环): - deploy/grafana/backupx-dashboard.json:11 个面板覆盖概览/时序/容量/集群 - deploy/grafana/README.md:Prometheus 抓取配置 + 告警建议 - release workflow 打包 grafana/ + nginx.conf 到 tar.gz 前端: - 节点列表:Agent 版本 vs Master 不一致时橙红 Tag + Tooltip 提示升级 - 节点列表新增"标签/节点池"列,支持 CSV 编辑 + 并发/带宽一起改 - 任务表单新增 NodePoolTag 输入框,与节点选择器互斥禁用 测试: - model/node_label_test.go:HasLabel / LabelSet / nil 安全 - service/node_pool_scheduler_test.go:负载最低优先 / 空池错误 / nil repo 降级 - go test ./... 
+ npm run build 全绿 --- .github/workflows/release.yml | 5 + deploy/grafana/README.md | 34 +++ deploy/grafana/backupx-dashboard.json | 193 ++++++++++++++++++ .../2026-04-21-node-pool-grafana-design.md | 85 ++++++++ server/internal/model/backup_task.go | 4 + server/internal/model/node.go | 43 +++- server/internal/model/node_label_test.go | 47 +++++ .../service/backup_execution_service.go | 73 ++++++- .../internal/service/backup_task_service.go | 12 +- .../service/node_pool_scheduler_test.go | 83 ++++++++ server/internal/service/node_service.go | 25 +++ .../backup-tasks/BackupTaskFormDrawer.tsx | 20 +- web/src/pages/nodes/NodesPage.tsx | 75 ++++++- web/src/services/nodes.ts | 9 +- web/src/types/backup-tasks.ts | 4 + web/src/types/nodes.ts | 4 + 16 files changed, 701 insertions(+), 15 deletions(-) create mode 100644 deploy/grafana/README.md create mode 100644 deploy/grafana/backupx-dashboard.json create mode 100644 docs/superpowers/specs/2026-04-21-node-pool-grafana-design.md create mode 100644 server/internal/model/node_label_test.go create mode 100644 server/internal/service/node_pool_scheduler_test.go diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8a27443..f3e4fcc 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -110,6 +110,11 @@ jobs: cp -r web/dist "${ARCHIVE_NAME}/web" cp server/config.example.yaml "${ARCHIVE_NAME}/" cp deploy/install.sh "${ARCHIVE_NAME}/" 2>/dev/null || true + # v2.2+: 随发布包提供 Grafana dashboard 与 nginx.conf 模板 + if [ -d deploy/grafana ]; then + cp -r deploy/grafana "${ARCHIVE_NAME}/grafana" + fi + cp deploy/nginx.conf "${ARCHIVE_NAME}/nginx.conf" 2>/dev/null || true tar czf "${ARCHIVE_NAME}.tar.gz" "${ARCHIVE_NAME}" - name: Upload to GitHub Release diff --git a/deploy/grafana/README.md b/deploy/grafana/README.md new file mode 100644 index 0000000..4127a94 --- /dev/null +++ b/deploy/grafana/README.md @@ -0,0 +1,34 @@ +# BackupX Grafana Dashboard + +对接 BackupX v2.1+ 暴露的 Prometheus 
`/metrics` 端点。 + +## 导入步骤 + +1. 在 Grafana 配置 Prometheus 数据源指向你的 Prometheus(例如 `http://prometheus:9090`) +2. 在 Prometheus 配置抓取 BackupX: + +```yaml +scrape_configs: + - job_name: 'backupx' + scrape_interval: 30s + static_configs: + - targets: ['backupx-master:8340'] +``` + +3. Grafana → Dashboards → Import → 上传 `backupx-dashboard.json` → 选 Prometheus 数据源 → Import + +## 面板内容 + +- 当前运行任务数 / SLA 违约数 / 在线节点 / 24h 成功率 / 应用版本 +- 任务执行速率(按 success/failed 堆叠) +- 任务耗时 P50/P95/P99(按任务类型) +- 任务产出字节速率 +- 存储目标用量 TopN 柱状图 +- 节点在线状态表(红/绿标色) +- 验证 / 恢复 / 复制的成功率时间线 + +## 自定义建议 + +- 将 `backupx_sla_breach_tasks > 0` 配为 AlertManager 告警 +- `sum(backupx_node_online) < N` 触发集群容量告警(N 为你集群的最少节点数) +- P99 任务耗时突变可用于发现慢任务和资源压力 diff --git a/deploy/grafana/backupx-dashboard.json b/deploy/grafana/backupx-dashboard.json new file mode 100644 index 0000000..5f5c6fe --- /dev/null +++ b/deploy/grafana/backupx-dashboard.json @@ -0,0 +1,193 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": {"type": "grafana", "uid": "-- Grafana --"}, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "description": "BackupX v2.1+ 核心指标面板。对接 /metrics 端点,抓取周期建议 30s(与服务端 Gauge collector 同步)。", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [ + { + "title": "BackupX 文档", + "url": "https://awuqing.github.io/BackupX/", + "type": "link", + "targetBlank": true + } + ], + "liveNow": false, + "panels": [ + { + "type": "stat", + "title": "正在运行的任务", + "gridPos": {"h": 4, "w": 4, "x": 0, "y": 0}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{"expr": "backupx_task_running", "refId": "A"}], + "fieldConfig": { + "defaults": { + "unit": "short", + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}, {"color": "yellow", "value": 5}]} + } + }, + "options": {"colorMode": "value", "graphMode": "area", "textMode": "auto"} 
+ }, + { + "type": "stat", + "title": "SLA 违约任务数", + "gridPos": {"h": 4, "w": 4, "x": 4, "y": 0}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{"expr": "backupx_sla_breach_tasks", "refId": "A"}], + "fieldConfig": { + "defaults": { + "unit": "short", + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}, {"color": "red", "value": 1}]} + } + } + }, + { + "type": "stat", + "title": "在线节点", + "gridPos": {"h": 4, "w": 4, "x": 8, "y": 0}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{"expr": "sum(backupx_node_online)", "refId": "A"}], + "fieldConfig": { + "defaults": {"unit": "short", "color": {"mode": "thresholds"}, "thresholds": {"steps": [{"color": "red", "value": null}, {"color": "green", "value": 1}]}} + } + }, + { + "type": "stat", + "title": "24h 任务成功率", + "gridPos": {"h": 4, "w": 6, "x": 12, "y": 0}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{ + "expr": "sum(rate(backupx_task_run_total{status=\"success\"}[24h])) / sum(rate(backupx_task_run_total[24h])) * 100", + "refId": "A" + }], + "fieldConfig": { + "defaults": { + "unit": "percent", "decimals": 2, + "thresholds": {"mode": "absolute", "steps": [{"color": "red", "value": null}, {"color": "yellow", "value": 95}, {"color": "green", "value": 99}]} + } + } + }, + { + "type": "stat", + "title": "应用版本", + "gridPos": {"h": 4, "w": 6, "x": 18, "y": 0}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{"expr": "backupx_app_info", "refId": "A", "format": "table", "instant": true}], + "options": {"textMode": "value_and_name", "reduceOptions": {"calcs": ["last"], "fields": "/^version$/"}} + }, + { + "type": "timeseries", + "title": "任务执行速率(按状态)", + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 4}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{ + "expr": "sum by (status) (rate(backupx_task_run_total[5m]))", + "refId": "A", + 
"legendFormat": "{{status}}" + }], + "fieldConfig": { + "defaults": { + "unit": "ops", + "custom": {"drawStyle": "line", "lineInterpolation": "smooth", "fillOpacity": 10, "stacking": {"mode": "normal"}} + }, + "overrides": [ + {"matcher": {"id": "byName", "options": "success"}, "properties": [{"id": "color", "value": {"mode": "fixed", "fixedColor": "green"}}]}, + {"matcher": {"id": "byName", "options": "failed"}, "properties": [{"id": "color", "value": {"mode": "fixed", "fixedColor": "red"}}]} + ] + } + }, + { + "type": "timeseries", + "title": "任务耗时 P50 / P95 / P99", + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 4}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [ + {"expr": "histogram_quantile(0.50, sum(rate(backupx_task_run_duration_seconds_bucket[10m])) by (le, task_type))", "refId": "A", "legendFormat": "P50 {{task_type}}"}, + {"expr": "histogram_quantile(0.95, sum(rate(backupx_task_run_duration_seconds_bucket[10m])) by (le, task_type))", "refId": "B", "legendFormat": "P95 {{task_type}}"}, + {"expr": "histogram_quantile(0.99, sum(rate(backupx_task_run_duration_seconds_bucket[10m])) by (le, task_type))", "refId": "C", "legendFormat": "P99 {{task_type}}"} + ], + "fieldConfig": {"defaults": {"unit": "s"}} + }, + { + "type": "timeseries", + "title": "任务产出字节速率", + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 12}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{"expr": "sum by (task_type) (rate(backupx_task_bytes_total[5m]))", "refId": "A", "legendFormat": "{{task_type}}"}], + "fieldConfig": {"defaults": {"unit": "Bps"}} + }, + { + "type": "bargauge", + "title": "存储目标用量 TopN", + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 12}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{"expr": "topk(10, backupx_storage_used_bytes)", "refId": "A", "legendFormat": "{{target_name}} ({{target_type}})"}], + "fieldConfig": {"defaults": {"unit": "bytes"}}, + "options": {"orientation": 
"horizontal", "displayMode": "gradient"} + }, + { + "type": "table", + "title": "节点在线状态", + "gridPos": {"h": 8, "w": 12, "x": 0, "y": 20}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [{"expr": "backupx_node_online", "refId": "A", "format": "table", "instant": true}], + "transformations": [ + {"id": "organize", "options": {"excludeByName": {"Time": true, "__name__": true, "job": true, "instance": true}, "indexByName": {"node_name": 0, "role": 1, "Value": 2}, "renameByName": {"Value": "online"}}} + ], + "fieldConfig": { + "overrides": [{ + "matcher": {"id": "byName", "options": "online"}, + "properties": [{"id": "mappings", "value": [{"type": "value", "options": {"0": {"text": "离线", "color": "red"}, "1": {"text": "在线", "color": "green"}}}]}] + }] + } + }, + { + "type": "timeseries", + "title": "验证 / 恢复 / 复制成功率", + "gridPos": {"h": 8, "w": 12, "x": 12, "y": 20}, + "datasource": {"type": "prometheus", "uid": "${DS_PROMETHEUS}"}, + "targets": [ + {"expr": "sum by (status) (rate(backupx_verify_run_total[15m]))", "refId": "A", "legendFormat": "verify {{status}}"}, + {"expr": "sum by (status) (rate(backupx_restore_run_total[15m]))", "refId": "B", "legendFormat": "restore {{status}}"}, + {"expr": "sum by (status) (rate(backupx_replication_run_total[15m]))", "refId": "C", "legendFormat": "replication {{status}}"} + ], + "fieldConfig": {"defaults": {"unit": "ops"}} + } + ], + "refresh": "30s", + "schemaVersion": 39, + "tags": ["backupx", "backup", "sre"], + "templating": { + "list": [ + { + "current": {"selected": false, "text": "Prometheus", "value": "Prometheus"}, + "label": "Datasource", + "name": "DS_PROMETHEUS", + "query": "prometheus", + "refresh": 1, + "regex": "", + "type": "datasource" + } + ] + }, + "time": {"from": "now-6h", "to": "now"}, + "timepicker": {}, + "timezone": "", + "title": "BackupX Overview", + "uid": "backupx-overview", + "version": 1, + "weekStart": "" +} diff --git 
a/docs/superpowers/specs/2026-04-21-node-pool-grafana-design.md b/docs/superpowers/specs/2026-04-21-node-pool-grafana-design.md new file mode 100644 index 0000000..8d18a44 --- /dev/null +++ b/docs/superpowers/specs/2026-04-21-node-pool-grafana-design.md @@ -0,0 +1,85 @@ +# v2.2.0 节点池与可视化运维闭环 (2026-04-21) + +## 背景 + +v2.1 暴露了 Prometheus `/metrics` + 节点级带宽限速,SRE 已经拿到"看"的能力。本轮补齐"调度"和"可视化"的闭环: + +1. **节点池**:任务不再只能绑定固定节点,还可以按标签动态调度 +2. **Grafana Dashboard**:v2.1 指标从"裸数据"升级为开箱即用的运维视图 +3. **Agent 版本漂移 UI**:节点列表一眼看出哪台 Agent 落后于 Master + +## 范围 + +- `model.Node.Labels` (CSV) + `model.BackupTask.NodePoolTag` +- `BackupExecutionService.selectPoolNode()` — 标签匹配 + 当前运行任务数最少原则 +- `deploy/grafana/backupx-dashboard.json` — 11 面板对接 v2.1 指标 +- 前端节点列表显示版本漂移 + 标签/池;任务表单支持节点池输入 + +## 架构 + +### 1. 节点池调度 + +``` +task.NodeID == 0 && task.NodePoolTag != "" + ↓ +selectPoolNode(ctx, tag): + 1. nodeRepo.List() 过滤 status=online AND HasLabel(tag) + 2. 无候选 → 返回 NODE_POOL_EMPTY 错误(任务失败,用户立即感知) + 3. 按 countRunningOnNode(id) 升序选最小负载者 + 4. 并列按 ID 稳定(可预期) + ↓ +record.NodeID = chosen.ID (写入本次 BackupRecord,用于审计追溯) +task.NodeID = chosen.ID (仅改内存副本,供后续 route/agent 路由逻辑使用;不持久化) +``` + +**互斥规则**:`NodeID > 0` 与 `NodePoolTag != ""` 同时设置时,在 Create/Update 校验中被拒绝(`BACKUP_TASK_INVALID`)。固定节点 = 显式路由,节点池 = 动态路由,两者语义互斥。 + +**调度不回写 task**:池选出的节点 ID 仅持久化到 BackupRecord(审计追溯),数据库中的 task.NodeID 仍为 0。这样下次执行会**重新选**负载最低者,实现真正的轮转均衡。 + +### 2. Grafana Dashboard + +11 个面板,按语义分组: + +| 区域 | 面板 | +|------|------| +| 概览(5 stat) | 运行中任务数 / SLA 违约数 / 在线节点数 / 24h 成功率 / 应用版本 | +| 时序(4) | 任务执行速率(按状态堆叠)、P50/P95/P99 耗时、产出字节速率、验证/恢复/复制成功率 | +| 容量(1) | 存储目标用量 TopN 柱状图 | +| 集群(1) | 节点在线状态表(值 0/1 → 红/绿色文本映射) | + +设计要点: +- `DS_PROMETHEUS` 为 template variable,导入时让用户选数据源 +- 默认 refresh `30s`,与服务端 collector 采样周期一致 +- SLA 违约 stat 阈值 ≥1 即红色,直接可接 AlertManager + +### 3. 
Agent 版本漂移 UI + +``` +renderAgentVersion(agentVer, masterVer): + 空 agentVer → "-"(未上报) + agentVer == masterVer → 原样显示 + 不同 → 橙红 Tag "agentVer ≠ masterVer" + Tooltip 建议升级 +``` + +`masterVer` 通过 `/api/system/info` 已有接口获得,前端无需新增 API。 + +## 测试 + +- `model/node_label_test.go` — HasLabel / LabelSet / nil 安全 +- `service/node_pool_scheduler_test.go` — 负载最低 / 空池报错 / nil repo 降级 +- 前端 `npm run build` 通过 + +## 风险与应对 + +| 风险 | 应对 | +|------|------| +| 节点池在所有节点离线时任务失败 | 明确返回 `NODE_POOL_EMPTY`,用户立即感知并切换固定节点 | +| 运行任务数统计成瓶颈 | countRunningOnNode 走 BackupRecord.List({status:running}),规模大时可引入节点级 semaphore 计数器 | +| Labels 格式笔误(重复、空格) | `normalizeLabels` 规整 CSV 再入库;前端 Tag 渲染自动 trim | +| 版本漂移 UI 误报 dev 分支 | Master 版本取自 main.version ldflags;dev 构建显示 "dev",不会匹配任何 Agent 版本,纯显示意义 | + +## 下轮候选 + +- Agent 二进制自更新(远程下发 + 签名验证) +- 任务运行前的"节点可达性预检"(TCP/HTTP 探针) +- Grafana Loki 集成:把 backup 日志流接入 Loki,配合 Tempo 做端到端追踪 diff --git a/server/internal/model/backup_task.go b/server/internal/model/backup_task.go index daa5f14..1e77e75 100644 --- a/server/internal/model/backup_task.go +++ b/server/internal/model/backup_task.go @@ -39,6 +39,10 @@ type BackupTask struct { StorageTargets []StorageTarget `gorm:"many2many:backup_task_storage_targets" json:"storageTargets,omitempty"` NodeID uint `gorm:"column:node_id;index;default:0" json:"nodeId"` Node Node `json:"node,omitempty"` + // NodePoolTag 节点池标签(可选)。非空且 NodeID=0 时,调度器会从 Node.Labels 包含该 tag + // 的在线节点中动态挑选一台执行(按运行中任务数最少原则);池内无在线节点时任务以 NODE_POOL_EMPTY 失败,当前不做候选切换重试。 + // 典型场景:NodePoolTag="db" 让 MySQL 备份任务在任意标有 "db" 的数据库节点执行。 + NodePoolTag string `gorm:"column:node_pool_tag;size:64;index" json:"nodePoolTag"` Tags string `gorm:"column:tags;size:500" json:"tags"` RetentionDays int `gorm:"column:retention_days;not null;default:30" json:"retentionDays"` Compression string `gorm:"size:10;not null;default:'gzip'" json:"compression"` diff --git a/server/internal/model/node.go b/server/internal/model/node.go index 8552635..1b24a5c 100644 --- a/server/internal/model/node.go +++ 
b/server/internal/model/node.go @@ -1,6 +1,9 @@ package model -import "time" +import ( + "strings" + "time" +) const ( NodeStatusOnline = "online" @@ -29,8 +32,42 @@ type Node struct { // BandwidthLimit 该节点上传带宽上限(rclone 可识别格式:10M / 1G / 0=不限)。 // 对集群感知的上传场景有效(Master 本地与 Agent 运行时均会应用)。 BandwidthLimit string `gorm:"column:bandwidth_limit;size:32" json:"bandwidthLimit"` - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` + // Labels 节点标签(CSV,如 "prod,db-host,high-mem")。 + // 用于任务调度的节点池选择:任务配置 NodePoolTag 时,调度器会从 Labels 包含该 tag 的 + // 在线节点中自动挑选一台执行(按当前运行中任务数升序)。单节点可属多个池。 + Labels string `gorm:"column:labels;size:500" json:"labels"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +// LabelSet 把 CSV Labels 解析为 set,便于做成员判定。 +// 空白与空 token 自动忽略。 +func (n *Node) LabelSet() map[string]struct{} { + if n == nil { + return nil + } + out := make(map[string]struct{}) + for _, raw := range strings.Split(n.Labels, ",") { + label := strings.TrimSpace(raw) + if label != "" { + out[label] = struct{}{} + } + } + return out +} + +// HasLabel 判断节点是否属于指定池。nil/空 tag 返回 false。 +func (n *Node) HasLabel(tag string) bool { + tag = strings.TrimSpace(tag) + if n == nil || tag == "" { + return false + } + for _, raw := range strings.Split(n.Labels, ",") { + if strings.TrimSpace(raw) == tag { + return true + } + } + return false } func (Node) TableName() string { diff --git a/server/internal/model/node_label_test.go b/server/internal/model/node_label_test.go new file mode 100644 index 0000000..4874663 --- /dev/null +++ b/server/internal/model/node_label_test.go @@ -0,0 +1,47 @@ +package model + +import "testing" + +func TestNodeHasLabel(t *testing.T) { + cases := []struct { + labels string + tag string + want bool + }{ + {"prod,db,high-mem", "prod", true}, + {"prod,db,high-mem", "db", true}, + {"prod,db,high-mem", "backup", false}, + {" prod , db ", "db", true}, // trim 空白 + {"", "prod", false}, + {"prod", "", false}, // 空 tag 
不匹配 + } + for _, c := range cases { + n := &Node{Labels: c.labels} + if got := n.HasLabel(c.tag); got != c.want { + t.Errorf("labels=%q tag=%q want %v got %v", c.labels, c.tag, c.want, got) + } + } +} + +func TestNodeLabelSet(t *testing.T) { + n := &Node{Labels: "prod, db ,,high-mem,prod"} + set := n.LabelSet() + for _, want := range []string{"prod", "db", "high-mem"} { + if _, ok := set[want]; !ok { + t.Errorf("expected label %q in set", want) + } + } + if len(set) != 3 { + t.Errorf("duplicates not deduped, got %v", set) + } +} + +func TestNilNodeHasLabelSafe(t *testing.T) { + var n *Node + if n.HasLabel("anything") { + t.Error("nil node should never match any label") + } + if s := n.LabelSet(); s != nil { + t.Errorf("nil node LabelSet should be nil, got %v", s) + } +} diff --git a/server/internal/service/backup_execution_service.go b/server/internal/service/backup_execution_service.go index 096f940..56c5a54 100644 --- a/server/internal/service/backup_execution_service.go +++ b/server/internal/service/backup_execution_service.go @@ -335,16 +335,29 @@ func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async b nil) } } + // 节点池动态选择:task.NodeID=0 且 NodePoolTag 非空时,从匹配的在线节点中挑一台。 + // 选择策略:正在运行任务数最少者优先;并列时按 ID 升序稳定。 + // 选中节点仅影响本次运行(task.NodeID 不持久化改动),保证任务在池内轮转。 + resolvedNodeID := task.NodeID + if task.NodeID == 0 && strings.TrimSpace(task.NodePoolTag) != "" { + if pooled, perr := s.selectPoolNode(ctx, task.NodePoolTag); perr == nil && pooled != nil { + resolvedNodeID = pooled.ID + } else if perr != nil { + return nil, perr + } + } startedAt := s.now() // 取第一个存储目标 ID 做兼容 primaryTargetID := task.StorageTargetID if tids := collectTargetIDs(task); len(tids) > 0 { primaryTargetID = tids[0] } - record := &model.BackupRecord{TaskID: task.ID, StorageTargetID: primaryTargetID, NodeID: task.NodeID, Status: "running", StartedAt: startedAt} + record := &model.BackupRecord{TaskID: task.ID, StorageTargetID: primaryTargetID, NodeID: resolvedNodeID, Status: 
"running", StartedAt: startedAt} if err := s.records.Create(ctx, record); err != nil { return nil, apperror.Internal("BACKUP_RECORD_CREATE_FAILED", "无法创建备份记录", err) } + // 用池选出的节点 ID 复写 task 副本,使后续路由/执行沿用。NOTE(review): 紧随其后的 s.tasks.Update 会保存这份副本——需确认它不会把 node_id 写回数据库,否则与上文"不持久化"矛盾,任务会被固定到该节点且与 NodePoolTag 互斥校验冲突。 + task.NodeID = resolvedNodeID task.LastRunAt = &startedAt task.LastStatus = "running" if err := s.tasks.Update(ctx, task); err != nil { @@ -414,6 +427,64 @@ func (s *BackupExecutionService) shouldNotify(ctx context.Context, task *model.B return true } +// selectPoolNode 从所有 Labels 包含 poolTag 的在线节点中选择"当前运行中任务最少"的一台。 +// 返回 (nil, error) 表示仓储访问失败(NODE_LIST_FAILED)或池内无在线匹配节点(NODE_POOL_EMPTY);(nil, nil) 仅在未注入 nodeRepo 时出现(调用方退化走本机 Master)。 +// 本方法不修改任何持久化状态,仅做选择。 +func (s *BackupExecutionService) selectPoolNode(ctx context.Context, poolTag string) (*model.Node, error) { + if s.nodeRepo == nil { + // 没接入集群依赖时,降级为让调用方走本机 Master + return nil, nil + } + nodes, err := s.nodeRepo.List(ctx) + if err != nil { + return nil, apperror.Internal("NODE_LIST_FAILED", "无法枚举节点池", err) + } + candidates := make([]*model.Node, 0) + for i := range nodes { + n := &nodes[i] + if n.Status != model.NodeStatusOnline { + continue + } + if !n.HasLabel(poolTag) { + continue + } + candidates = append(candidates, n) + } + if len(candidates) == 0 { + return nil, apperror.BadRequest("NODE_POOL_EMPTY", + fmt.Sprintf("节点池 %q 下无在线节点,任务无法调度", poolTag), nil) + } + // 运行中记录数越少越优先。并列按 ID 升序(稳定、可预期)。 + best := candidates[0] + bestLoad := s.countRunningOnNode(ctx, best.ID) + for _, n := range candidates[1:] { + load := s.countRunningOnNode(ctx, n.ID) + if load < bestLoad || (load == bestLoad && n.ID < best.ID) { + best = n + bestLoad = load + } + } + return best, nil +} + +// countRunningOnNode 近似返回节点当前 running 记录数。失败按 0 处理(不影响功能,仅退化调度精度)。 +func (s *BackupExecutionService) countRunningOnNode(ctx context.Context, nodeID uint) int { + if s.records == nil { + return 0 + } + items, err := s.records.List(ctx, repository.BackupRecordListOptions{Status: model.BackupRecordStatusRunning}) + if err != nil { + return 0 + } + count := 0 + for i := range items { + if 
items[i].NodeID == nodeID { + count++ + } + } + return count +} + // effectiveBandwidth 返回当前上下文应用的带宽限速字符串。 // 优先级:Node.BandwidthLimit(非空) > 全局 s.bandwidthLimit。 func (s *BackupExecutionService) effectiveBandwidth(ctx context.Context, nodeID uint) string { diff --git a/server/internal/service/backup_task_service.go b/server/internal/service/backup_task_service.go index 4a9924e..b8bc607 100644 --- a/server/internal/service/backup_task_service.go +++ b/server/internal/service/backup_task_service.go @@ -35,7 +35,9 @@ type BackupTaskUpsertInput struct { DBPath string `json:"dbPath" binding:"max=500"` StorageTargetID uint `json:"storageTargetId"` // deprecated: 向后兼容 StorageTargetIDs []uint `json:"storageTargetIds"` // 新增:多存储目标 - NodeID uint `json:"nodeId"` // 执行节点(0 = 本机 Master) + NodeID uint `json:"nodeId"` // 执行节点(0 = 本机 Master 或节点池) + // NodePoolTag 节点池标签。NodeID=0 且本字段非空时,调度器动态从 Labels 命中的在线节点中选负载最低者。 + NodePoolTag string `json:"nodePoolTag" binding:"max=64"` Tags string `json:"tags" binding:"max=500"` // 逗号分隔标签 RetentionDays int `json:"retentionDays"` Compression string `json:"compression" binding:"omitempty,oneof=gzip none"` @@ -74,6 +76,7 @@ type BackupTaskSummary struct { StorageTargetNames []string `json:"storageTargetNames"` NodeID uint `json:"nodeId"` NodeName string `json:"nodeName,omitempty"` + NodePoolTag string `json:"nodePoolTag,omitempty"` Tags string `json:"tags"` RetentionDays int `json:"retentionDays"` Compression string `json:"compression"` @@ -494,6 +497,11 @@ func (s *BackupTaskService) validateInput(ctx context.Context, existing *model.B return apperror.BadRequest("BACKUP_TASK_INVALID", "所选执行节点不存在", nil) } } + // 节点池与固定节点互斥:固定节点已确定执行位置,不再动态调度 + if input.NodeID > 0 && strings.TrimSpace(input.NodePoolTag) != "" { + return apperror.BadRequest("BACKUP_TASK_INVALID", + "固定执行节点与节点池标签只能选其一", nil) + } if input.RetentionDays < 0 { return apperror.BadRequest("BACKUP_TASK_INVALID", "保留天数不能小于 0", nil) } @@ -648,6 +656,7 @@ func (s *BackupTaskService) 
buildTask(existing *model.BackupTask, input BackupTa StorageTargetID: primaryTargetID, StorageTargets: storageTargets, NodeID: input.NodeID, + NodePoolTag: strings.TrimSpace(input.NodePoolTag), Tags: strings.TrimSpace(input.Tags), RetentionDays: input.RetentionDays, Compression: compression, @@ -738,6 +747,7 @@ func toBackupTaskSummary(item *model.BackupTask) BackupTaskSummary { StorageTargetNames: targetNames, NodeID: item.NodeID, NodeName: item.Node.Name, + NodePoolTag: item.NodePoolTag, Tags: item.Tags, RetentionDays: item.RetentionDays, Compression: item.Compression, diff --git a/server/internal/service/node_pool_scheduler_test.go b/server/internal/service/node_pool_scheduler_test.go new file mode 100644 index 0000000..8ddbf3c --- /dev/null +++ b/server/internal/service/node_pool_scheduler_test.go @@ -0,0 +1,83 @@ +package service + +import ( + "context" + "errors" + "testing" + "time" + + "backupx/server/internal/apperror" + "backupx/server/internal/model" +) + +// nodeRepoStub 返回预设节点切片;仅关注 List/FindByID。 +// 其余方法返回零值,避免在调度路径被调用到。 +type nodeRepoStub struct { + nodes []model.Node +} + +func (s *nodeRepoStub) List(context.Context) ([]model.Node, error) { return s.nodes, nil } +func (s *nodeRepoStub) FindByID(_ context.Context, id uint) (*model.Node, error) { + for i := range s.nodes { + if s.nodes[i].ID == id { + return &s.nodes[i], nil + } + } + return nil, nil +} +func (s *nodeRepoStub) FindByToken(context.Context, string) (*model.Node, error) { return nil, nil } +func (s *nodeRepoStub) FindLocal(context.Context) (*model.Node, error) { return nil, nil } +func (s *nodeRepoStub) Create(context.Context, *model.Node) error { return nil } +func (s *nodeRepoStub) BatchCreate(context.Context, []*model.Node) error { return nil } +func (s *nodeRepoStub) Update(context.Context, *model.Node) error { return nil } +func (s *nodeRepoStub) Delete(context.Context, uint) error { return nil } +func (s *nodeRepoStub) MarkStaleOffline(context.Context, time.Time) (int64, error) { 
+ return 0, nil +} + +func TestSelectPoolNode_PicksLeastLoaded(t *testing.T) { + nodes := []model.Node{ + {ID: 1, Name: "node-a", Status: model.NodeStatusOnline, Labels: "prod,db"}, + {ID: 2, Name: "node-b", Status: model.NodeStatusOnline, Labels: "prod,db"}, + {ID: 3, Name: "node-offline", Status: model.NodeStatusOffline, Labels: "prod,db"}, + {ID: 4, Name: "node-other-pool", Status: model.NodeStatusOnline, Labels: "staging"}, + } + svc := &BackupExecutionService{ + nodeRepo: &nodeRepoStub{nodes: nodes}, + records: nil, // 触发 countRunningOnNode 返回 0,节点并列时按 ID 升序 + } + chosen, err := svc.selectPoolNode(context.Background(), "db") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if chosen == nil || chosen.ID != 1 { + t.Fatalf("expected node-a (ID=1), got %#v", chosen) + } +} + +func TestSelectPoolNode_EmptyPoolReturnsError(t *testing.T) { + svc := &BackupExecutionService{ + nodeRepo: &nodeRepoStub{nodes: []model.Node{ + {ID: 1, Status: model.NodeStatusOnline, Labels: "prod"}, + }}, + } + _, err := svc.selectPoolNode(context.Background(), "missing-pool") + if err == nil { + t.Fatal("expected empty-pool error") + } + var apperr *apperror.AppError + if !errors.As(err, &apperr) || apperr.Code != "NODE_POOL_EMPTY" { + t.Errorf("expected NODE_POOL_EMPTY, got %v", err) + } +} + +func TestSelectPoolNode_NilRepoDegradesGracefully(t *testing.T) { + svc := &BackupExecutionService{} + got, err := svc.selectPoolNode(context.Background(), "any") + if err != nil { + t.Errorf("nil repo should degrade silently, got err %v", err) + } + if got != nil { + t.Errorf("nil repo should return nil node, got %v", got) + } +} diff --git a/server/internal/service/node_service.go b/server/internal/service/node_service.go index 1264221..20c36e1 100644 --- a/server/internal/service/node_service.go +++ b/server/internal/service/node_service.go @@ -34,6 +34,7 @@ type NodeSummary struct { LastSeen time.Time `json:"lastSeen"` MaxConcurrent int `json:"maxConcurrent"` BandwidthLimit string 
`json:"bandwidthLimit"` + Labels string `json:"labels"` CreatedAt time.Time `json:"createdAt"` } @@ -47,6 +48,8 @@ type NodeUpdateInput struct { Name string `json:"name" binding:"required"` MaxConcurrent int `json:"maxConcurrent"` BandwidthLimit string `json:"bandwidthLimit" binding:"max=32"` + // Labels CSV;同时作为调度器的节点池标签(task.NodePoolTag 对齐的值)。 + Labels string `json:"labels" binding:"max=500"` } // NodeService manages the cluster nodes. @@ -132,6 +135,7 @@ func (s *NodeService) List(ctx context.Context) ([]NodeSummary, error) { LastSeen: n.LastSeen, MaxConcurrent: n.MaxConcurrent, BandwidthLimit: n.BandwidthLimit, + Labels: n.Labels, CreatedAt: n.CreatedAt, } } @@ -159,6 +163,7 @@ func (s *NodeService) Get(ctx context.Context, id uint) (*NodeSummary, error) { LastSeen: node.LastSeen, MaxConcurrent: node.MaxConcurrent, BandwidthLimit: node.BandwidthLimit, + Labels: node.Labels, CreatedAt: node.CreatedAt, }, nil } @@ -320,12 +325,32 @@ func (s *NodeService) Update(ctx context.Context, id uint, input NodeUpdateInput } node.MaxConcurrent = input.MaxConcurrent node.BandwidthLimit = strings.TrimSpace(input.BandwidthLimit) + node.Labels = normalizeLabels(input.Labels) if err := s.repo.Update(ctx, node); err != nil { return nil, err } return s.Get(ctx, id) } +// normalizeLabels 规整 CSV labels:去空白、去空 token、去重、保持首次出现顺序。 +// 输入 " prod, db , prod ,high-mem " → "prod,db,high-mem" +func normalizeLabels(raw string) string { + seen := make(map[string]struct{}) + out := make([]string, 0) + for _, token := range strings.Split(raw, ",") { + label := strings.TrimSpace(token) + if label == "" { + continue + } + if _, dup := seen[label]; dup { + continue + } + seen[label] = struct{}{} + out = append(out, label) + } + return strings.Join(out, ",") +} + // DirEntry represents a file or directory in a node's file system. 
type DirEntry struct { Name string `json:"name"` diff --git a/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx b/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx index c4e10ff..d0a2233 100644 --- a/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx +++ b/web/src/components/backup-tasks/BackupTaskFormDrawer.tsx @@ -59,6 +59,7 @@ function createEmptyDraft(storageTargets?: StorageTargetSummary[]): BackupTaskPa storageTargetId: defaultIds[0] ?? 0, storageTargetIds: defaultIds, nodeId: 0, + nodePoolTag: '', tags: '', retentionDays: 30, compression: 'gzip', @@ -127,6 +128,7 @@ export function BackupTaskFormDrawer({ visible, loading, initialValue, storageTa storageTargetId: editTargetIds[0] ?? 0, storageTargetIds: editTargetIds, nodeId: (initialValue as any).nodeId ?? 0, + nodePoolTag: (initialValue as any).nodePoolTag ?? '', tags: initialValue.tags ?? '', retentionDays: initialValue.retentionDays, compression: initialValue.compression, @@ -297,12 +299,28 @@ export function BackupTaskFormDrawer({ visible, loading, initialValue, storageTa 0} + onChange={(value) => updateDraft({ nodePoolTag: value })} + /> + + 执行节点选"本机 / 未指定"时可启用;从节点 Labels 命中此 tag 的在线节点中按当前运行任务数最少的挑选一台执行。 + +
Cron 表达式 updateDraft({ cronExpr: value })} /> diff --git a/web/src/pages/nodes/NodesPage.tsx b/web/src/pages/nodes/NodesPage.tsx index bccff3e..c4e3383 100644 --- a/web/src/pages/nodes/NodesPage.tsx +++ b/web/src/pages/nodes/NodesPage.tsx @@ -1,7 +1,7 @@ import React, { useEffect, useState, useCallback } from 'react' import { Table, Button, Space, Tag, Typography, PageHeader, Modal, Input, Message, Badge, Popconfirm, Card, - Empty, Dropdown, Menu, + Empty, Dropdown, Menu, Tooltip, InputNumber, } from '@arco-design/web-react' import { IconPlus, IconDelete, IconDesktop, IconCloudDownload, IconEdit, IconMore, @@ -25,6 +25,9 @@ export default function NodesPage() { const [editVisible, setEditVisible] = useState(false) const [editNode, setEditNode] = useState(null) const [editName, setEditName] = useState('') + const [editLabels, setEditLabels] = useState('') + const [editMaxConcurrent, setEditMaxConcurrent] = useState(0) + const [editBandwidthLimit, setEditBandwidthLimit] = useState('') const fetchNodes = useCallback(async () => { setLoading(true) @@ -63,7 +66,12 @@ export default function NodesPage() { return } try { - await updateNode(editNode.id, { name: editName.trim() }) + await updateNode(editNode.id, { + name: editName.trim(), + labels: editLabels.trim(), + maxConcurrent: editMaxConcurrent, + bandwidthLimit: editBandwidthLimit.trim(), + }) Message.success('节点更新成功') setEditVisible(false) fetchNodes() @@ -117,7 +125,18 @@ export default function NodesPage() { render: (_: string, record: NodeSummary) => record.os ? 
{record.os}/{record.arch} : '-', }, - { title: 'Agent 版本', dataIndex: 'agentVersion', width: 100, render: (v: string) => v || '-' }, + { + title: 'Agent 版本', dataIndex: 'agentVersion', width: 140, + render: (v: string) => renderAgentVersion(v, masterVersion), + }, + { + title: '标签 / 节点池', dataIndex: 'labels', width: 180, + render: (v: string) => { + const tags = (v || '').split(',').map(s => s.trim()).filter(Boolean) + if (tags.length === 0) return - + return {tags.map(tag => {tag})} + }, + }, { title: '最后活跃', dataIndex: 'lastSeen', width: 170, render: (v: string) => v ? new Date(v).toLocaleString('zh-CN') : '-', @@ -127,7 +146,13 @@ export default function NodesPage() { render: (_: unknown, record: NodeSummary) => (
) } + +/** + * 渲染 Agent 版本 + 与 Master 的漂移状态。 + * 空版本 → "-"(未上报) + * 与 Master 相同 → 原样显示 + * 不同(且非本机) → 红色 Tag + 提示升级 + */ +function renderAgentVersion(agentVer: string, masterVer: string | null): React.ReactNode { + if (!agentVer) return - + if (!masterVer) return agentVer + if (agentVer === masterVer) return agentVer + return ( + + {agentVer} ≠ {masterVer} + + ) +} diff --git a/web/src/services/nodes.ts b/web/src/services/nodes.ts index aa7f53b..2786375 100644 --- a/web/src/services/nodes.ts +++ b/web/src/services/nodes.ts @@ -16,7 +16,14 @@ export async function createNode(name: string) { return unwrapApiEnvelope(response.data) } -export async function updateNode(id: number, data: { name: string }) { +export interface NodeUpdateInput { + name: string + labels?: string + maxConcurrent?: number + bandwidthLimit?: string +} + +export async function updateNode(id: number, data: NodeUpdateInput) { const response = await http.put>(`/nodes/${id}`, data) return unwrapApiEnvelope(response.data) } diff --git a/web/src/types/backup-tasks.ts b/web/src/types/backup-tasks.ts index cb3e74d..26526bf 100644 --- a/web/src/types/backup-tasks.ts +++ b/web/src/types/backup-tasks.ts @@ -14,6 +14,8 @@ export interface BackupTaskSummary { storageTargetNames: string[] nodeId: number nodeName?: string + /** 节点池标签(summary):当任务绑定节点池而非固定节点时显示 */ + nodePoolTag?: string tags: string retentionDays: number compression: BackupCompression @@ -64,6 +66,8 @@ export interface BackupTaskPayload { storageTargetId: number storageTargetIds: number[] nodeId: number + /** 节点池标签(创建/更新)。与 nodeId 互斥:nodeId=0 且本字段非空时触发动态调度。 */ + nodePoolTag?: string tags: string retentionDays: number compression: BackupCompression diff --git a/web/src/types/nodes.ts b/web/src/types/nodes.ts index 3a51138..aa2753d 100644 --- a/web/src/types/nodes.ts +++ b/web/src/types/nodes.ts @@ -9,6 +9,10 @@ export interface NodeSummary { arch: string agentVersion: string lastSeen: string + maxConcurrent?: number + bandwidthLimit?: string + 
/** CSV 节点标签;任务的 NodePoolTag 命中这里任一即会被调度到本节点 */ + labels?: string createdAt: string }