feat(BackupX): harden agent cluster backup workflow

Squash merge PR #61
This commit is contained in:
Wu Qing
2026-05-13 14:24:45 +08:00
committed by GitHub
parent 7a6ffd4ddd
commit 7084d47c4b
30 changed files with 1360 additions and 155 deletions

View File

@@ -1,6 +1,10 @@
#!/bin/sh
set -e
if [ "${1:-}" = "agent" ]; then
exec /app/bin/backupx "$@"
fi
# Backend listens on internal port 8341, Nginx exposes 8340
export BACKUPX_SERVER_PORT="${BACKUPX_SERVER_PORT_INTERNAL:-8341}"

View File

@@ -8,6 +8,8 @@ description: File, MySQL, PostgreSQL, SQLite and SAP HANA — what they back up
BackupX supports five built-in backup types. Type determines which runner executes the job.
When a task is routed to a remote Agent, the source tools and paths are resolved on that Agent host. Multi-target uploads are still tracked per storage target; if at least one target succeeds, the backup record is marked successful and the per-target result table shows partial failures.
## File / Directory
Tars (and optionally gzips) one or more filesystem paths.

View File

@@ -62,6 +62,8 @@ The script runs automatically and:
5. Runs `systemctl enable --now backupx-agent`
6. Polls `/api/v1/agent/self` until the master confirms `status: online` (up to 30 s)
Docker mode uses the same `BACKUPX_AGENT_MASTER`, `BACKUPX_AGENT_TOKEN`, and `BACKUPX_AGENT_TEMP_DIR=/var/lib/backupx-agent/tmp` environment contract. After starting the container, the installer also probes `/api/v1/agent/self`; if the node does not come online, it prints `docker ps` and `docker logs --tail=100 backupx-agent` diagnostics before exiting non-zero.
If you choose the URL-based fallback command and `curl` prints HTML or the shell reports `Syntax error: newline unexpected`, the install URL is being served by the web console instead of the backend. Ensure either `/api/install/` or `/install/` is forwarded to the BackupX backend, or use the embedded command generated by the console.
Reruns are idempotent — to upgrade or re-provision, simply generate a new install command and run it again. The one-time install link expires after its TTL or after first consumption, whichever is sooner.
@@ -81,9 +83,15 @@ In the **Backup Tasks** page, pick the target node when creating the task. When
- Local (`nodeId=0`) → Master executes in-process
- Remote node → Master enqueues the command → Agent claims → Agent runs locally → uploads → reports back
The node table shows the Agent health and command queue state: pending/dispatched depth, running long commands, timeouts, oldest active command age, and the latest Agent-side error. The same queue depth, running-command, and timeout snapshots are exported as Prometheus metrics:
- `backupx_agent_command_queue_depth`
- `backupx_agent_command_running`
- `backupx_agent_command_timeout_total`
## Known limitations
- **Encrypted backups don't work via Agent** — the Agent doesn't hold Master's AES-256 key. Tasks with `encrypt: true` will fail if routed to an Agent
- **Encrypted backups are Master-only** — the Agent doesn't hold Master's AES-256 key. Creating or updating a task with `encrypt: true` and a remote node or node pool is rejected up front
- **Directory browser timeout** — remote dir listing is a synchronous RPC through the queue (15s default)
- **Dispatched command timeout** — claimed-but-unfinished commands are marked `timeout` after 10 minutes

View File

@@ -42,6 +42,8 @@ Go to **Backup Tasks → New**. Three steps:
2. **Source** — paths for file backup (multi-source supported), or connection info for databases
3. **Storage & policy** — pick target(s), compression, retention days, encryption on/off
For Agent-routed tasks, encryption must stay off because the Agent never receives the Master's encryption key. BackupX rejects remote-node or node-pool tasks with encryption enabled during create/update.
Save, then click **Run Now** to trigger a test. Live logs stream on the **Backup Records** page.
:::note

View File

@@ -8,6 +8,8 @@ description: 文件、MySQL、PostgreSQL、SQLite 和 SAP HANA — 各自的能
BackupX 支持五种内置备份类型,类型决定了用哪个 runner 执行。
当任务路由到远程 Agent 时,源路径和外部工具都会在该 Agent 主机上解析。多存储目标上传仍会逐目标记录结果;只要至少一个目标上传成功,备份记录即为成功,详情中的目标结果表会展示部分失败。
## 文件 / 目录
打包(可选 gzip一个或多个文件系统路径。

View File

@@ -62,6 +62,8 @@ Web 控制台 → **节点管理** → **添加节点**,打开三步向导:
5. 执行 `systemctl enable --now backupx-agent`
6. 轮询 `/api/v1/agent/self`,直到 Master 确认 `status: online`(最多 30 秒)
Docker 模式使用同一组环境变量约定:`BACKUPX_AGENT_MASTER`、`BACKUPX_AGENT_TOKEN` 和 `BACKUPX_AGENT_TEMP_DIR=/var/lib/backupx-agent/tmp`。容器启动后,安装脚本同样会探测 `/api/v1/agent/self`;如果节点没有上线,会输出 `docker ps` 与 `docker logs --tail=100 backupx-agent` 排查命令,并以非零状态退出。
如果使用 URL 备用命令时 `curl` 输出 HTML或 shell 报 `Syntax error: newline unexpected`,说明安装 URL 被 Web 控制台接管而不是转发到后端。需要确保 `/api/install/` 或 `/install/` 至少一个路径能转发到 BackupX 后端,或改用控制台生成的嵌入式命令。
脚本是幂等的:升级或重装只需重新生成一条安装命令再跑一次。一次性安装链接在 TTL 到期或被首次消费后立即作废。
@@ -81,9 +83,15 @@ Web 控制台 → **节点管理** → **添加节点**,打开三步向导:
- 本机 / 未指定(`nodeId=0`Master 进程内直接执行
- 远程节点Master 写入命令队列 → Agent 拉取 → Agent 本地执行 → 上传 → 回报
节点列表会展示 Agent 健康与命令队列状态pending/dispatched 深度、运行中的长任务、超时数、最旧活跃命令年龄和最近 Agent 错误。同样的队列深度、运行中命令数和超时快照会导出为 Prometheus 指标:
- `backupx_agent_command_queue_depth`
- `backupx_agent_command_running`
- `backupx_agent_command_timeout_total`
## 已知限制
- **Agent 不支持加密备份**Agent 不持有 Master 的 AES-256 密钥。`encrypt: true` 的任务路由到 Agent 时会直接上报失败
- **加密备份仅支持 Master 本机执行**Agent 不持有 Master 的 AES-256 密钥。创建或更新任务时,如果 `encrypt: true` 且选择了远程节点或节点池,会在入口直接拒绝
- **目录浏览超时**:远程目录浏览通过命令队列做同步 RPC默认 15s 超时
- **派发命令超时**Agent 领取但未完成的命令超过 10 分钟会被置 `timeout`

View File

@@ -42,6 +42,8 @@ description: 部署 BackupX、添加存储目标、创建第一个备份任务
2. **源配置** — 文件备份选择源路径(支持多个),数据库备份填写连接信息
3. **存储与策略** — 选择存储目标(支持多个)、压缩策略、保留天数、是否加密
对于路由到 Agent 的任务,加密必须关闭,因为 Agent 不会拿到 Master 的加密密钥。BackupX 会在创建/更新阶段拒绝开启加密的远程节点或节点池任务。
保存后可点击 **立即执行** 测试,**备份记录** 页面实时查看执行日志。
:::note

View File

@@ -143,13 +143,24 @@ func (c *MasterClient) GetTaskSpec(ctx context.Context, taskID uint) (*TaskSpec,
// RecordUpdate 与 service.AgentRecordUpdate 对齐
type RecordUpdate struct {
Status string `json:"status,omitempty"`
FileName string `json:"fileName,omitempty"`
FileSize int64 `json:"fileSize,omitempty"`
Checksum string `json:"checksum,omitempty"`
StoragePath string `json:"storagePath,omitempty"`
ErrorMessage string `json:"errorMessage,omitempty"`
LogAppend string `json:"logAppend,omitempty"`
Status string `json:"status,omitempty"`
FileName string `json:"fileName,omitempty"`
FileSize int64 `json:"fileSize,omitempty"`
Checksum string `json:"checksum,omitempty"`
StoragePath string `json:"storagePath,omitempty"`
StorageTargetID uint `json:"storageTargetId,omitempty"`
StorageUploadResults []StorageResultItem `json:"storageUploadResults,omitempty"`
ErrorMessage string `json:"errorMessage,omitempty"`
LogAppend string `json:"logAppend,omitempty"`
}
type StorageResultItem struct {
StorageTargetID uint `json:"storageTargetId"`
StorageTargetName string `json:"storageTargetName"`
Status string `json:"status"`
StoragePath string `json:"storagePath,omitempty"`
FileSize int64 `json:"fileSize,omitempty"`
Error string `json:"error,omitempty"`
}
// UpdateRecord 上报备份记录的状态/日志

View File

@@ -126,22 +126,52 @@ func (e *Executor) ExecuteRunTask(ctx context.Context, taskID, recordID uint) er
e.reportRecordFailure(ctx, recordID, "没有关联的存储目标")
return fmt.Errorf("no storage targets")
}
uploadResults := make([]StorageResultItem, 0, len(spec.StorageTargets))
selectedStorageTargetID := uint(0)
var uploadErrors []string
for _, target := range spec.StorageTargets {
if err := e.uploadToTarget(ctx, recordID, target, finalPath, storagePath, fileSize, spec.TaskID); err != nil {
e.reportRecordFailure(ctx, recordID, fmt.Sprintf("上传到 %s 失败: %v", target.Name, err))
return err
uploadResults = append(uploadResults, StorageResultItem{
StorageTargetID: target.ID,
StorageTargetName: target.Name,
Status: "failed",
Error: err.Error(),
})
uploadErrors = append(uploadErrors, fmt.Sprintf("%s: %v", target.Name, err))
e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 上传到存储目标 %s 失败: %v\n", target.Name, err))
continue
}
if selectedStorageTargetID == 0 {
selectedStorageTargetID = target.ID
}
uploadResults = append(uploadResults, StorageResultItem{
StorageTargetID: target.ID,
StorageTargetName: target.Name,
Status: "success",
StoragePath: storagePath,
FileSize: fileSize,
})
e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 已上传到存储目标 %s\n", target.Name))
}
if selectedStorageTargetID == 0 {
msg := strings.Join(uploadErrors, "; ")
if msg == "" {
msg = "所有存储目标上传均失败"
}
e.reportRecordFailureWithUploadResults(ctx, recordID, msg, uploadResults)
return fmt.Errorf("%s", msg)
}
// 6) 上报最终成功
return e.client.UpdateRecord(ctx, recordID, RecordUpdate{
Status: "success",
FileName: fileName,
FileSize: fileSize,
Checksum: checksum,
StoragePath: storagePath,
LogAppend: fmt.Sprintf("[agent] 任务完成,总计 %d 字节\n", fileSize),
Status: "success",
FileName: fileName,
FileSize: fileSize,
Checksum: checksum,
StoragePath: storagePath,
StorageTargetID: selectedStorageTargetID,
StorageUploadResults: uploadResults,
LogAppend: fmt.Sprintf("[agent] 任务完成,总计 %d 字节\n", fileSize),
})
}
@@ -177,10 +207,15 @@ func (e *Executor) appendLog(ctx context.Context, recordID uint, line string) {
// reportRecordFailure 上报失败状态
func (e *Executor) reportRecordFailure(ctx context.Context, recordID uint, msg string) {
e.reportRecordFailureWithUploadResults(ctx, recordID, msg, nil)
}
func (e *Executor) reportRecordFailureWithUploadResults(ctx context.Context, recordID uint, msg string, uploadResults []StorageResultItem) {
_ = e.client.UpdateRecord(ctx, recordID, RecordUpdate{
Status: "failed",
ErrorMessage: msg,
LogAppend: fmt.Sprintf("[agent] 错误: %s\n", msg),
Status: "failed",
ErrorMessage: msg,
StorageUploadResults: uploadResults,
LogAppend: fmt.Sprintf("[agent] 错误: %s\n", msg),
})
}

View File

@@ -1,9 +1,20 @@
package agent
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"
"backupx/server/internal/storage"
)
func TestBuildBackupTaskSpecParsesJSONSourcePaths(t *testing.T) {
@@ -32,3 +43,191 @@ func TestParseStringListFieldKeepsLegacyLineFormat(t *testing.T) {
t.Fatalf("paths = %#v, want %#v", got, want)
}
}
func TestExecuteRunTaskRecordsPerTargetUploadResults(t *testing.T) {
sourceDir := t.TempDir()
if err := os.WriteFile(filepath.Join(sourceDir, "index.html"), []byte("hello"), 0o644); err != nil {
t.Fatalf("WriteFile returned error: %v", err)
}
var finalUpdate RecordUpdate
var updates []RecordUpdate
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && r.URL.Path == "/api/agent/tasks/1":
writeAgentEnvelope(t, w, TaskSpec{
TaskID: 1,
Name: "site",
Type: "file",
SourcePath: sourceDir,
Compression: "gzip",
StorageTargets: []StorageTargetConfig{
{ID: 11, Name: "broken", Type: "agent_test_storage", Config: json.RawMessage(`{"name":"broken"}`)},
{ID: 12, Name: "ok", Type: "agent_test_storage", Config: json.RawMessage(`{"name":"ok"}`)},
},
})
case r.Method == http.MethodPost && r.URL.Path == "/api/agent/records/99":
var update RecordUpdate
if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
t.Fatalf("Decode update returned error: %v", err)
}
updates = append(updates, update)
if update.Status != "" {
finalUpdate = update
}
writeAgentEnvelope(t, w, map[string]string{"status": "ok"})
default:
http.NotFound(w, r)
}
}))
defer server.Close()
executor := NewExecutor(NewMasterClient(server.URL, "token", false), filepath.Join(t.TempDir(), "tmp"))
executor.storageRegistry = storage.NewRegistry(&agentTestStorageFactory{
providers: map[string]*agentTestStorageProvider{
"broken": {name: "broken", failUpload: true},
"ok": {name: "ok", objects: map[string][]byte{}},
},
})
if err := executor.ExecuteRunTask(context.Background(), 1, 99); err != nil {
t.Fatalf("ExecuteRunTask returned error: %v", err)
}
if len(updates) == 0 || finalUpdate.Status != "success" {
t.Fatalf("expected final success update, got updates=%#v final=%#v", updates, finalUpdate)
}
if finalUpdate.StorageTargetID != 12 {
t.Fatalf("expected first successful target 12, got %d", finalUpdate.StorageTargetID)
}
if len(finalUpdate.StorageUploadResults) != 2 {
t.Fatalf("expected two upload results, got %#v", finalUpdate.StorageUploadResults)
}
if finalUpdate.StorageUploadResults[0].Status != "failed" || finalUpdate.StorageUploadResults[1].Status != "success" {
t.Fatalf("unexpected upload results: %#v", finalUpdate.StorageUploadResults)
}
if finalUpdate.StoragePath == "" || finalUpdate.FileSize <= 0 || finalUpdate.Checksum == "" {
t.Fatalf("expected artifact metadata in final update, got %#v", finalUpdate)
}
}
func TestExecuteRunTaskReportsPerTargetUploadResultsWhenAllTargetsFail(t *testing.T) {
sourceDir := t.TempDir()
if err := os.WriteFile(filepath.Join(sourceDir, "index.html"), []byte("hello"), 0o644); err != nil {
t.Fatalf("WriteFile returned error: %v", err)
}
var finalUpdate RecordUpdate
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && r.URL.Path == "/api/agent/tasks/1":
writeAgentEnvelope(t, w, TaskSpec{
TaskID: 1,
Name: "site",
Type: "file",
SourcePath: sourceDir,
Compression: "gzip",
StorageTargets: []StorageTargetConfig{
{ID: 11, Name: "broken-a", Type: "agent_test_storage", Config: json.RawMessage(`{"name":"broken-a"}`)},
{ID: 12, Name: "broken-b", Type: "agent_test_storage", Config: json.RawMessage(`{"name":"broken-b"}`)},
},
})
case r.Method == http.MethodPost && r.URL.Path == "/api/agent/records/99":
var update RecordUpdate
if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
t.Fatalf("Decode update returned error: %v", err)
}
if update.Status != "" {
finalUpdate = update
}
writeAgentEnvelope(t, w, map[string]string{"status": "ok"})
default:
http.NotFound(w, r)
}
}))
defer server.Close()
executor := NewExecutor(NewMasterClient(server.URL, "token", false), filepath.Join(t.TempDir(), "tmp"))
executor.storageRegistry = storage.NewRegistry(&agentTestStorageFactory{
providers: map[string]*agentTestStorageProvider{
"broken-a": {name: "broken-a", failUpload: true},
"broken-b": {name: "broken-b", failUpload: true},
},
})
if err := executor.ExecuteRunTask(context.Background(), 1, 99); err == nil {
t.Fatal("expected ExecuteRunTask to return upload failure")
}
if finalUpdate.Status != "failed" {
t.Fatalf("expected final failed update, got %#v", finalUpdate)
}
if len(finalUpdate.StorageUploadResults) != 2 {
t.Fatalf("expected failed update to keep per-target results, got %#v", finalUpdate.StorageUploadResults)
}
for _, item := range finalUpdate.StorageUploadResults {
if item.Status != "failed" || item.Error == "" {
t.Fatalf("unexpected upload result: %#v", item)
}
}
}
type agentTestStorageFactory struct {
providers map[string]*agentTestStorageProvider
}
func (f *agentTestStorageFactory) Type() storage.ProviderType {
return "agent_test_storage"
}
func (f *agentTestStorageFactory) New(_ context.Context, config map[string]any) (storage.StorageProvider, error) {
name, _ := config["name"].(string)
provider := f.providers[name]
if provider == nil {
return nil, fmt.Errorf("unknown provider %q", name)
}
return provider, nil
}
type agentTestStorageProvider struct {
name string
failUpload bool
objects map[string][]byte
}
func (p *agentTestStorageProvider) Type() storage.ProviderType { return "agent_test_storage" }
func (p *agentTestStorageProvider) TestConnection(context.Context) error {
return nil
}
func (p *agentTestStorageProvider) Upload(_ context.Context, objectKey string, reader io.Reader, _ int64, _ map[string]string) error {
if p.failUpload {
return fmt.Errorf("upload failed for %s", p.name)
}
data, err := io.ReadAll(reader)
if err != nil {
return err
}
if p.objects == nil {
p.objects = map[string][]byte{}
}
p.objects[objectKey] = data
return nil
}
func (p *agentTestStorageProvider) Download(_ context.Context, objectKey string) (io.ReadCloser, error) {
data, ok := p.objects[objectKey]
if !ok {
return nil, fmt.Errorf("object %s not found", objectKey)
}
return io.NopCloser(strings.NewReader(string(data))), nil
}
func (p *agentTestStorageProvider) Delete(_ context.Context, objectKey string) error {
delete(p.objects, objectKey)
return nil
}
func (p *agentTestStorageProvider) List(context.Context, string) ([]storage.ObjectInfo, error) {
return nil, nil
}
func writeAgentEnvelope(t *testing.T, w http.ResponseWriter, data any) {
t.Helper()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(map[string]any{"code": "OK", "data": data}); err != nil {
t.Fatalf("Encode response returned error: %v", err)
}
}

View File

@@ -131,6 +131,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application,
// Agent 协议服务:命令队列 + 任务下发 + 记录上报
agentCmdRepo := repository.NewAgentCommandRepository(db)
nodeService.SetAgentCommandRepository(agentCmdRepo)
agentService := service.NewAgentService(nodeRepo, backupTaskRepo, backupRecordRepo, storageTargetRepo, agentCmdRepo, configCipher)
agentService.SetRestoreRepository(restoreRecordRepo)
agentService.StartCommandTimeoutMonitor(ctx, 30*time.Second, 10*time.Minute)
@@ -240,7 +241,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application,
replicationService.SetMetrics(appMetrics)
metricsCollector := metrics.NewCollector(
appMetrics,
metrics.NewRepoSource(storageTargetRepo, backupRecordRepo, nodeRepo, backupTaskRepo),
metrics.NewRepoSource(storageTargetRepo, backupRecordRepo, nodeRepo, backupTaskRepo, agentCmdRepo),
30*time.Second,
)
metricsCollector.Start(ctx)

View File

@@ -1,6 +1,8 @@
package installscript
import (
"os"
"path/filepath"
"strings"
"testing"
@@ -85,11 +87,38 @@ func TestRenderScriptDocker(t *testing.T) {
if !strings.Contains(got, "awuqing/backupx:${AGENT_VERSION}") {
t.Errorf("docker script missing image tag reference:\n%s", got)
}
if !strings.Contains(got, `"awuqing/backupx:${AGENT_VERSION}" agent`) {
t.Errorf("docker script must start image in agent mode:\n%s", got)
}
if !strings.Contains(got, `-e "BACKUPX_AGENT_TEMP_DIR=/var/lib/backupx-agent/tmp"`) {
t.Errorf("docker script missing temp dir env:\n%s", got)
}
if !strings.Contains(got, `docker logs --tail=100 backupx-agent`) {
t.Errorf("docker script missing diagnostic log command:\n%s", got)
}
if !strings.Contains(got, `grep -q '"status":"online"'`) {
t.Errorf("docker script missing online probe:\n%s", got)
}
if strings.Contains(got, "systemctl daemon-reload") {
t.Errorf("docker script should not reference systemctl:\n%s", got)
}
}
func TestDockerEntrypointForwardsAgentSubcommand(t *testing.T) {
entrypointPath := filepath.Join("..", "..", "..", "deploy", "docker", "entrypoint.sh")
got, err := os.ReadFile(entrypointPath)
if err != nil {
t.Fatalf("read docker entrypoint: %v", err)
}
script := string(got)
if !strings.Contains(script, `"${1:-}" = "agent"`) {
t.Fatalf("entrypoint must detect the agent subcommand before starting server:\n%s", script)
}
if !strings.Contains(script, `exec /app/bin/backupx "$@"`) {
t.Fatalf("entrypoint must exec backupx with forwarded args:\n%s", script)
}
}
func TestRenderComposeYaml(t *testing.T) {
ctx := testCtx
ctx.Mode = model.InstallModeDocker
@@ -100,9 +129,15 @@ func TestRenderComposeYaml(t *testing.T) {
if !strings.Contains(got, "image: awuqing/backupx:v1.7.0") {
t.Errorf("compose missing image:\n%s", got)
}
if !strings.Contains(got, `command: ["agent"]`) {
t.Errorf("compose must start image in agent mode:\n%s", got)
}
if !strings.Contains(got, `BACKUPX_AGENT_TOKEN: "deadbeefcafebabe0123456789abcdef0123456789abcdef0123456789abcdef"`) {
t.Errorf("compose missing token env:\n%s", got)
}
if !strings.Contains(got, `BACKUPX_AGENT_TEMP_DIR: "/var/lib/backupx-agent/tmp"`) {
t.Errorf("compose missing temp dir env:\n%s", got)
}
if !strings.Contains(got, "/var/lib/backupx-agent:/var/lib/backupx-agent") {
t.Errorf("compose missing agent data volume:\n%s", got)
}

View File

@@ -9,5 +9,6 @@ services:
environment:
BACKUPX_AGENT_MASTER: "{{.MasterURL}}"
BACKUPX_AGENT_TOKEN: "{{.AgentToken}}"
BACKUPX_AGENT_TEMP_DIR: "/var/lib/backupx-agent/tmp"
volumes:
- /var/lib/backupx-agent:/var/lib/backupx-agent

View File

@@ -109,7 +109,20 @@ docker rm -f backupx-agent >/dev/null 2>&1 || true
docker run -d --name backupx-agent --restart=unless-stopped \
-e "BACKUPX_AGENT_MASTER=${MASTER_URL}" \
-e "BACKUPX_AGENT_TOKEN=${AGENT_TOKEN}" \
-e "BACKUPX_AGENT_TEMP_DIR=/var/lib/backupx-agent/tmp" \
-v /var/lib/backupx-agent:/var/lib/backupx-agent \
"awuqing/backupx:${AGENT_VERSION}" agent
echo "✓ 容器已启动"
echo "✓ 容器已启动,等待节点上线"
for i in $(seq 1 15); do
sleep 2
if curl -fsSL -H "X-Agent-Token: ${AGENT_TOKEN}" "${MASTER_URL}/api/v1/agent/self" 2>/dev/null \
| grep -q '"status":"online"'; then
echo "✓ 节点已上线"
exit 0
fi
done
echo "⚠ 30s 内未收到上线心跳,请检查容器状态、网络与 Master URL。"
echo "排查命令docker ps -a --filter name=backupx-agent"
echo "排查命令docker logs --tail=100 backupx-agent"
exit 2
{{end}}

View File

@@ -13,16 +13,18 @@ type SampleSource interface {
ListStorageTargets(ctx context.Context) ([]model.StorageTarget, error)
StorageUsage(ctx context.Context) ([]repository.BackupStorageUsageItem, error)
ListNodes(ctx context.Context) ([]model.Node, error)
AgentQueueSummaries(ctx context.Context) (map[uint]repository.AgentCommandQueueSummary, error)
CountSLABreach(ctx context.Context) (int, error)
}
// repoSource 把 repository 适配到 SampleSource。
type repoSource struct {
targets repository.StorageTargetRepository
records repository.BackupRecordRepository
nodes repository.NodeRepository
tasks repository.BackupTaskRepository
now func() time.Time
targets repository.StorageTargetRepository
records repository.BackupRecordRepository
nodes repository.NodeRepository
tasks repository.BackupTaskRepository
commands repository.AgentCommandRepository
now func() time.Time
}
// NewRepoSource 用仓储实例构造 SampleSource。
@@ -31,13 +33,15 @@ func NewRepoSource(
records repository.BackupRecordRepository,
nodes repository.NodeRepository,
tasks repository.BackupTaskRepository,
commands repository.AgentCommandRepository,
) SampleSource {
return &repoSource{
targets: targets,
records: records,
nodes: nodes,
tasks: tasks,
now: func() time.Time { return time.Now().UTC() },
targets: targets,
records: records,
nodes: nodes,
tasks: tasks,
commands: commands,
now: func() time.Time { return time.Now().UTC() },
}
}
@@ -53,6 +57,13 @@ func (s *repoSource) ListNodes(ctx context.Context) ([]model.Node, error) {
return s.nodes.List(ctx)
}
func (s *repoSource) AgentQueueSummaries(ctx context.Context) (map[uint]repository.AgentCommandQueueSummary, error) {
if s.commands == nil {
return nil, nil
}
return s.commands.NodeQueueSummaries(ctx)
}
// CountSLABreach 统计当前违反 RPO 的任务:
// - 任务启用且配置了 SLAHoursRPO > 0
// - 最近一次成功备份距今超出 SLA 时间窗,或从未成功过
@@ -136,7 +147,9 @@ func (c *Collector) collect(ctx context.Context) {
}
// 节点在线状态role 约定为 master / agent
if nodes, err := c.source.ListNodes(ctx); err == nil {
queueByNode, _ := c.source.AgentQueueSummaries(ctx)
c.metrics.ResetNodeOnline()
c.metrics.ResetAgentQueue()
for i := range nodes {
n := &nodes[i]
role := "agent"
@@ -144,6 +157,8 @@ func (c *Collector) collect(ctx context.Context) {
role = "master"
}
c.metrics.SetNodeOnline(n.Name, role, n.Status == model.NodeStatusOnline)
queue := queueByNode[n.ID]
c.metrics.SetAgentQueue(n.Name, role, queue.Depth, queue.Running, queue.Timeouts)
}
}
if breach, err := c.source.CountSLABreach(ctx); err == nil {

View File

@@ -31,6 +31,12 @@ type Metrics struct {
StorageUsedBytes *prometheus.GaugeVec
// 节点在线状态labels: node_name, rolevalue: 0/1
NodeOnline *prometheus.GaugeVec
// Agent 命令队列深度labels: node_name, role
AgentCommandQueueDepth *prometheus.GaugeVec
// Agent 正在执行的长命令数labels: node_name, role
AgentCommandRunning *prometheus.GaugeVec
// Agent 命令超时累计数快照labels: node_name, role
AgentCommandTimeoutTotal *prometheus.GaugeVec
// 验证演练结果labels: status
VerifyRunTotal *prometheus.CounterVec
// 恢复操作结果labels: status
@@ -78,6 +84,18 @@ func New(version string) *Metrics {
Name: "backupx_node_online",
Help: "集群节点在线状态1 在线 / 0 离线)",
}, []string{"node_name", "role"}),
AgentCommandQueueDepth: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "backupx_agent_command_queue_depth",
Help: "Agent 当前 pending/dispatched 命令总数",
}, []string{"node_name", "role"}),
AgentCommandRunning: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "backupx_agent_command_running",
Help: "Agent 当前正在执行的长命令数",
}, []string{"node_name", "role"}),
AgentCommandTimeoutTotal: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "backupx_agent_command_timeout_total",
Help: "Agent 已超时命令数快照",
}, []string{"node_name", "role"}),
VerifyRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "backupx_verify_run_total",
Help: "备份验证演练执行总数",
@@ -106,6 +124,9 @@ func New(version string) *Metrics {
m.TaskRunningGauge,
m.StorageUsedBytes,
m.NodeOnline,
m.AgentCommandQueueDepth,
m.AgentCommandRunning,
m.AgentCommandTimeoutTotal,
m.VerifyRunTotal,
m.RestoreRunTotal,
m.ReplicationRunTotal,
@@ -208,6 +229,24 @@ func (m *Metrics) ResetNodeOnline() {
m.NodeOnline.Reset()
}
func (m *Metrics) SetAgentQueue(name, role string, depth, running, timeoutCount int) {
if m == nil {
return
}
m.AgentCommandQueueDepth.WithLabelValues(name, role).Set(float64(depth))
m.AgentCommandRunning.WithLabelValues(name, role).Set(float64(running))
m.AgentCommandTimeoutTotal.WithLabelValues(name, role).Set(float64(timeoutCount))
}
func (m *Metrics) ResetAgentQueue() {
if m == nil {
return
}
m.AgentCommandQueueDepth.Reset()
m.AgentCommandRunning.Reset()
m.AgentCommandTimeoutTotal.Reset()
}
// ResetStorageUsed 清空存储目标 gauge。
func (m *Metrics) ResetStorageUsed() {
if m == nil {

View File

@@ -41,9 +41,11 @@ func TestObserveTaskRun_NilReceiverIsSafe(t *testing.T) {
m.DecTaskRunning()
m.SetStorageUsed("a", "s3", 1)
m.SetNodeOnline("n1", "master", true)
m.SetAgentQueue("n1", "agent", 2, 1, 3)
m.SetSLABreach(3)
m.ResetNodeOnline()
m.ResetStorageUsed()
m.ResetAgentQueue()
// no panic -> pass
}
@@ -51,6 +53,7 @@ func TestHandler_ExposesBackupxMetrics(t *testing.T) {
m := New("0.0.0-test")
m.ObserveTaskRun("file", "success", 1.0, 2048)
m.SetNodeOnline("n1", "master", true)
m.SetAgentQueue("edge-a", "agent", 3, 1, 2)
m.SetSLABreach(1)
recorder := httptest.NewRecorder()
@@ -66,6 +69,9 @@ func TestHandler_ExposesBackupxMetrics(t *testing.T) {
"backupx_task_run_total",
"backupx_task_run_duration_seconds",
"backupx_node_online",
"backupx_agent_command_queue_depth",
"backupx_agent_command_running",
"backupx_agent_command_timeout_total",
"backupx_sla_breach_tasks",
"backupx_app_info",
} {

View File

@@ -35,6 +35,28 @@ type AgentCommandRepository interface {
// ListPendingByNode 列出某节点下的所有 pending/dispatched 命令。
// 用于删除节点或节点离线时的清理。
ListPendingByNode(ctx context.Context, nodeID uint) ([]model.AgentCommand, error)
NodeQueueSummaries(ctx context.Context) (map[uint]AgentCommandQueueSummary, error)
}
type AgentCommandQueueSummary struct {
NodeID uint `json:"nodeId"`
Pending int `json:"pending"`
Dispatched int `json:"dispatched"`
Running int `json:"running"`
Depth int `json:"depth"`
Timeouts int `json:"timeouts"`
LastError string `json:"lastError,omitempty"`
OldestActiveAt *time.Time `json:"oldestActiveAt,omitempty"`
}
type agentCommandTimeoutCount struct {
NodeID uint
Count int
}
type agentCommandLastError struct {
NodeID uint
ErrorMessage string
}
type GormAgentCommandRepository struct {
@@ -186,3 +208,114 @@ func (r *GormAgentCommandRepository) ListPendingByNode(ctx context.Context, node
}
return items, nil
}
func (r *GormAgentCommandRepository) NodeQueueSummaries(ctx context.Context) (map[uint]AgentCommandQueueSummary, error) {
summaries, err := r.activeQueueSummaries(ctx)
if err != nil {
return nil, err
}
if err := r.applyTerminalQueueStats(ctx, summaries); err != nil {
return nil, err
}
return summaries, nil
}
func (r *GormAgentCommandRepository) activeQueueSummaries(ctx context.Context) (map[uint]AgentCommandQueueSummary, error) {
var items []model.AgentCommand
if err := r.db.WithContext(ctx).
Where("status IN ?", []string{
model.AgentCommandStatusPending,
model.AgentCommandStatusDispatched,
}).
Order("node_id asc, id asc").
Find(&items).Error; err != nil {
return nil, err
}
summaries := make(map[uint]AgentCommandQueueSummary)
for i := range items {
cmd := &items[i]
summary := summaries[cmd.NodeID]
summary.NodeID = cmd.NodeID
switch cmd.Status {
case model.AgentCommandStatusPending:
summary.Pending++
summary.Depth++
summary.OldestActiveAt = oldestTime(summary.OldestActiveAt, &cmd.CreatedAt)
case model.AgentCommandStatusDispatched:
summary.Dispatched++
summary.Depth++
if isLongRunningAgentCommand(cmd.Type) {
summary.Running++
}
summary.OldestActiveAt = oldestTime(summary.OldestActiveAt, cmd.DispatchedAt)
}
summaries[cmd.NodeID] = summary
}
return summaries, nil
}
func (r *GormAgentCommandRepository) applyTerminalQueueStats(ctx context.Context, summaries map[uint]AgentCommandQueueSummary) error {
var timeoutCounts []agentCommandTimeoutCount
if err := r.db.WithContext(ctx).
Model(&model.AgentCommand{}).
Select("node_id, COUNT(*) AS count").
Where("status = ?", model.AgentCommandStatusTimeout).
Group("node_id").
Scan(&timeoutCounts).Error; err != nil {
return err
}
for _, item := range timeoutCounts {
summary := summaries[item.NodeID]
summary.NodeID = item.NodeID
summary.Timeouts = item.Count
summaries[item.NodeID] = summary
}
terminalStatuses := []string{
model.AgentCommandStatusFailed,
model.AgentCommandStatusTimeout,
}
latestByNode := r.db.WithContext(ctx).
Model(&model.AgentCommand{}).
Select("node_id, MAX(COALESCE(completed_at, updated_at, created_at)) AS last_error_at").
Where("status IN ? AND error_message <> ''", terminalStatuses).
Group("node_id")
var lastErrors []agentCommandLastError
if err := r.db.WithContext(ctx).
Table("agent_commands AS cmd").
Select("cmd.node_id, cmd.error_message").
Joins("JOIN (?) latest ON latest.node_id = cmd.node_id AND latest.last_error_at = COALESCE(cmd.completed_at, cmd.updated_at, cmd.created_at)", latestByNode).
Where("cmd.status IN ? AND cmd.error_message <> ''", terminalStatuses).
Order("cmd.node_id asc, cmd.id desc").
Scan(&lastErrors).Error; err != nil {
return err
}
seenLastError := make(map[uint]struct{}, len(lastErrors))
for _, item := range lastErrors {
if _, ok := seenLastError[item.NodeID]; ok {
continue
}
summary := summaries[item.NodeID]
summary.NodeID = item.NodeID
summary.LastError = item.ErrorMessage
summaries[item.NodeID] = summary
seenLastError[item.NodeID] = struct{}{}
}
return nil
}
func oldestTime(current *time.Time, candidate *time.Time) *time.Time {
if candidate == nil {
return current
}
if current == nil || candidate.Before(*current) {
value := *candidate
return &value
}
return current
}
func isLongRunningAgentCommand(commandType string) bool {
return commandType == model.AgentCommandTypeRunTask || commandType == model.AgentCommandTypeRestoreRecord
}

View File

@@ -218,3 +218,44 @@ func TestAgentCommandRepository_ListStaleActiveIncludesPendingAndDispatched(t *t
t.Fatalf("unexpected stale active order/items: %#v", items)
}
}
func TestAgentCommandRepository_NodeQueueSummaries(t *testing.T) {
db := newTestDB(t)
repo := NewAgentCommandRepository(db)
ctx := context.Background()
old := time.Now().UTC().Add(-20 * time.Minute)
recent := time.Now().UTC().Add(-2 * time.Minute)
dispatchedAt := time.Now().UTC().Add(-5 * time.Minute)
completedAt := time.Now().UTC().Add(-1 * time.Minute)
commands := []*model.AgentCommand{
{NodeID: 1, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusPending, CreatedAt: old},
{NodeID: 1, Type: model.AgentCommandTypeRestoreRecord, Status: model.AgentCommandStatusPending, CreatedAt: recent},
{NodeID: 1, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusDispatched, DispatchedAt: &dispatchedAt},
{NodeID: 1, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusFailed, ErrorMessage: "boom", CompletedAt: &completedAt},
{NodeID: 1, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusTimeout, ErrorMessage: "late", CompletedAt: &recent},
{NodeID: 2, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusPending, CreatedAt: old},
}
for _, cmd := range commands {
if err := repo.Create(ctx, cmd); err != nil {
t.Fatalf("Create returned error: %v", err)
}
}
summaries, err := repo.NodeQueueSummaries(ctx)
if err != nil {
t.Fatalf("NodeQueueSummaries returned error: %v", err)
}
nodeOne := summaries[1]
if nodeOne.Pending != 2 || nodeOne.Dispatched != 1 || nodeOne.Running != 1 || nodeOne.Depth != 3 {
t.Fatalf("unexpected node 1 summary: %#v", nodeOne)
}
if nodeOne.Timeouts != 1 || nodeOne.LastError != "boom" {
t.Fatalf("expected terminal timeout and latest error in summary, got %#v", nodeOne)
}
if nodeOne.OldestActiveAt == nil || !nodeOne.OldestActiveAt.Equal(old) {
t.Fatalf("expected oldest active at %s, got %#v", old, nodeOne.OldestActiveAt)
}
if nodeTwo := summaries[2]; nodeTwo.Pending != 1 || nodeTwo.Depth != 1 || nodeTwo.Timeouts != 0 || nodeTwo.LastError != "" {
t.Fatalf("unexpected node 2 summary: %#v", nodeTwo)
}
}

View File

@@ -230,13 +230,15 @@ func (s *AgentService) ensureTaskSpecAccess(ctx context.Context, node *model.Nod
// AgentRecordUpdate Agent 上报备份记录的最终状态。
type AgentRecordUpdate struct {
Status string `json:"status"` // running | success | failed
FileName string `json:"fileName,omitempty"`
FileSize int64 `json:"fileSize,omitempty"`
Checksum string `json:"checksum,omitempty"`
StoragePath string `json:"storagePath,omitempty"`
ErrorMessage string `json:"errorMessage,omitempty"`
LogAppend string `json:"logAppend,omitempty"` // 增量日志,追加到 record.log_content
Status string `json:"status"` // running | success | failed
FileName string `json:"fileName,omitempty"`
FileSize int64 `json:"fileSize,omitempty"`
Checksum string `json:"checksum,omitempty"`
StoragePath string `json:"storagePath,omitempty"`
StorageTargetID uint `json:"storageTargetId,omitempty"`
StorageUploadResults []StorageUploadResultItem `json:"storageUploadResults,omitempty"`
ErrorMessage string `json:"errorMessage,omitempty"`
LogAppend string `json:"logAppend,omitempty"` // 增量日志,追加到 record.log_content
}
// UpdateRecord 更新备份记录的状态/日志。Agent 在执行过程中可多次调用。
@@ -273,6 +275,14 @@ func (s *AgentService) UpdateRecord(ctx context.Context, node *model.Node, recor
if update.StoragePath != "" {
record.StoragePath = update.StoragePath
}
if update.StorageTargetID > 0 {
record.StorageTargetID = update.StorageTargetID
}
if len(update.StorageUploadResults) > 0 {
if resultsJSON, marshalErr := json.Marshal(update.StorageUploadResults); marshalErr == nil {
record.StorageUploadResults = string(resultsJSON)
}
}
if update.ErrorMessage != "" {
record.ErrorMessage = update.ErrorMessage
}
@@ -294,7 +304,10 @@ func (s *AgentService) UpdateRecord(ctx context.Context, node *model.Node, recor
// 同步更新任务的 last_status
if update.Status == model.BackupRecordStatusSuccess || update.Status == model.BackupRecordStatusFailed {
task.LastStatus = update.Status
_ = s.taskRepo.Update(ctx, task)
task.LastRunAt = &record.StartedAt
if err := s.taskRepo.Update(ctx, task); err != nil {
return fmt.Errorf("update backup task summary: %w", err)
}
}
return nil
}

View File

@@ -2,7 +2,9 @@ package service
import (
"context"
"errors"
"path/filepath"
"strings"
"testing"
"time"
@@ -93,10 +95,15 @@ func TestAgentServicePooledTaskUsesRecordNodeForSpecAndRecordUpdates(t *testing.
}
if err := svc.UpdateRecord(ctx, owner, 1, AgentRecordUpdate{
Status: model.BackupRecordStatusSuccess,
FileName: "backup.tar.gz",
FileSize: 123,
StoragePath: "tasks/1/backup.tar.gz",
Status: model.BackupRecordStatusSuccess,
FileName: "backup.tar.gz",
FileSize: 123,
StoragePath: "tasks/1/backup.tar.gz",
StorageTargetID: 2,
StorageUploadResults: []StorageUploadResultItem{
{StorageTargetID: 1, StorageTargetName: "first", Status: "failed", Error: "boom"},
{StorageTargetID: 2, StorageTargetName: "second", Status: "success", StoragePath: "tasks/1/backup.tar.gz", FileSize: 123},
},
}); err != nil {
t.Fatalf("owner UpdateRecord returned error: %v", err)
}
@@ -107,11 +114,60 @@ func TestAgentServicePooledTaskUsesRecordNodeForSpecAndRecordUpdates(t *testing.
if updated.Status != model.BackupRecordStatusSuccess || updated.NodeID != owner.ID {
t.Fatalf("unexpected updated record: %#v", updated)
}
if updated.StorageTargetID != 2 {
t.Fatalf("expected successful storage target id 2, got %d", updated.StorageTargetID)
}
if !strings.Contains(updated.StorageUploadResults, `"storageTargetName":"second"`) {
t.Fatalf("expected upload results to be persisted, got %q", updated.StorageUploadResults)
}
if err := svc.UpdateRecord(ctx, other, 1, AgentRecordUpdate{LogAppend: "bad"}); err == nil {
t.Fatal("expected non-owner node to be forbidden from record update")
}
}
func TestAgentServiceUpdateRecordRefreshesTaskSummaryOnTerminalStatus(t *testing.T) {
for _, status := range []string{model.BackupRecordStatusSuccess, model.BackupRecordStatusFailed} {
t.Run(status, func(t *testing.T) {
svc, _, records, _, owner, _ := newAgentServicePoolTestHarness(t)
ctx := context.Background()
record, err := records.FindByID(ctx, 1)
if err != nil {
t.Fatalf("FindByID record returned error: %v", err)
}
if err := svc.UpdateRecord(ctx, owner, record.ID, AgentRecordUpdate{Status: status}); err != nil {
t.Fatalf("UpdateRecord returned error: %v", err)
}
task, err := svc.taskRepo.FindByID(ctx, record.TaskID)
if err != nil {
t.Fatalf("FindByID task returned error: %v", err)
}
if task.LastStatus != status {
t.Fatalf("expected task LastStatus %q, got %q", status, task.LastStatus)
}
if task.LastRunAt == nil || !task.LastRunAt.Equal(record.StartedAt) {
t.Fatalf("expected task LastRunAt to match record startedAt %s, got %#v", record.StartedAt, task.LastRunAt)
}
})
}
}
func TestAgentServiceUpdateRecordReturnsTaskSummaryUpdateError(t *testing.T) {
svc, _, _, _, owner, _ := newAgentServicePoolTestHarness(t)
ctx := context.Background()
expectedErr := errors.New("task update failed")
svc.taskRepo = &failingUpdateTaskRepo{
BackupTaskRepository: svc.taskRepo,
err: expectedErr,
}
err := svc.UpdateRecord(ctx, owner, 1, AgentRecordUpdate{Status: model.BackupRecordStatusSuccess})
if !errors.Is(err, expectedErr) {
t.Fatalf("expected task update error %v, got %v", expectedErr, err)
}
}
func TestAgentServiceProcessStaleCommandsFailsPendingRunTaskRecord(t *testing.T) {
svc, _, records, commands, owner, _ := newAgentServicePoolTestHarness(t)
ctx := context.Background()
@@ -587,3 +643,12 @@ func setBackupRecordUpdatedAt(db *gorm.DB, id uint, updatedAt time.Time) error {
func setRestoreRecordUpdatedAt(db *gorm.DB, id uint, updatedAt time.Time) error {
return db.Model(&model.RestoreRecord{}).Where("id = ?", id).UpdateColumn("updated_at", updatedAt).Error
}
type failingUpdateTaskRepo struct {
repository.BackupTaskRepository
err error
}
func (r *failingUpdateTaskRepo) Update(context.Context, *model.BackupTask) error {
return r.err
}

View File

@@ -52,6 +52,11 @@ type StorageUploadResultItem struct {
Error string `json:"error,omitempty"`
}
const (
uploadMaxAttempts = 3
uploadRetryBackoff = 10 * time.Second
)
type DownloadedArtifact struct {
FileName string
Reader io.ReadCloser
@@ -96,6 +101,7 @@ type BackupExecutionService struct {
retries int // rclone 底层重试次数
bandwidthLimit string // rclone 带宽限制(全局默认,节点配置可覆盖)
metrics *metrics.Metrics
taskLocks sync.Map
}
// SetMetrics 注入 Prometheus 采集器。nil 时所有埋点退化为 no-op。
@@ -358,6 +364,11 @@ func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async b
if task == nil {
return nil, apperror.New(404, "BACKUP_TASK_NOT_FOUND", "备份任务不存在", fmt.Errorf("backup task %d not found", id))
}
unlock := s.acquireTaskStartLock(task.ID)
defer unlock()
if err := s.ensureTaskNotRunning(ctx, task); err != nil {
return nil, err
}
// 维护窗口校验:手动执行同样尊重窗口,避免业务高峰期误触发。
if strings.TrimSpace(task.MaintenanceWindows) != "" {
windows := backup.ParseMaintenanceWindows(task.MaintenanceWindows)
@@ -427,6 +438,27 @@ func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async b
return s.getRecordDetail(ctx, record.ID)
}
func (s *BackupExecutionService) acquireTaskStartLock(taskID uint) func() {
value, _ := s.taskLocks.LoadOrStore(taskID, &sync.Mutex{})
mu := value.(*sync.Mutex)
mu.Lock()
return mu.Unlock
}
func (s *BackupExecutionService) ensureTaskNotRunning(ctx context.Context, task *model.BackupTask) error {
taskID := task.ID
items, err := s.records.List(ctx, repository.BackupRecordListOptions{TaskID: &taskID, Status: model.BackupRecordStatusRunning})
if err != nil {
return apperror.Internal("BACKUP_RECORD_LIST_FAILED", "无法检查任务运行状态", err)
}
if len(items) == 0 {
return nil
}
return apperror.BadRequest("BACKUP_TASK_ALREADY_RUNNING",
fmt.Sprintf("任务「%s」正在运行记录 #%d请等待完成后再触发。", task.Name, items[0].ID),
nil)
}
// shouldNotify 按任务的告警策略决定是否发送本次通知。
// 成功结果:始终发送(方便用户确认备份状态)。
// 失败结果:仅当"最近 N 条记录(含本次)均为 failed"时发送N = AlertOnConsecutiveFails。
@@ -678,6 +710,11 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
logger.Errorf("没有关联的存储目标")
return
}
storageUsage, err := s.storageUsageSnapshot(ctx)
if err != nil {
logger.Warnf("读取存储目标用量失败,跳过本次软配额校验:%v", err)
storageUsage = map[uint]int64{}
}
// 并行上传到所有目标
uploadResults = make([]StorageUploadResultItem, len(targetIDs))
@@ -701,15 +738,7 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
}
// 软限额校验QuotaBytes > 0 时,已累计 + 本次 > 配额 → 拒绝上传
if target != nil && target.QuotaBytes > 0 {
currentUsed := int64(0)
if items, err := s.records.StorageUsage(ctx); err == nil {
for _, it := range items {
if it.StorageTargetID == targetID {
currentUsed = it.TotalSize
break
}
}
}
currentUsed := storageUsage[targetID]
if currentUsed+fileSize > target.QuotaBytes {
quotaMsg := fmt.Sprintf("超出存储目标 %s 的配额(%d + %d > %d", targetName, currentUsed, fileSize, target.QuotaBytes)
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: quotaMsg}
@@ -718,15 +747,18 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
}
}
logger.Infof("开始上传备份到存储目标:%s", targetName)
// 上传级重试:最多 3 次,指数退避10s, 30s, 90s
maxAttempts := 3
// 上传级重试:最多 3 次,等待时间随 context 取消及时退出。
var lastUploadErr error
var hr *hashingReader
for attempt := 1; attempt <= maxAttempts; attempt++ {
for attempt := 1; attempt <= uploadMaxAttempts; attempt++ {
if attempt > 1 {
backoff := time.Duration(attempt*attempt) * 10 * time.Second
backoff := time.Duration(attempt-1) * uploadRetryBackoff
logger.Warnf("存储目标 %s 第 %d 次重试(等待 %v%v", targetName, attempt, backoff, lastUploadErr)
time.Sleep(backoff)
if waitErr := waitForUploadRetry(ctx, backoff); waitErr != nil {
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: waitErr.Error()}
logger.Warnf("存储目标 %s 上传重试已取消:%v", targetName, waitErr)
return
}
}
artifact, openErr := os.Open(finalPath)
if openErr != nil {
@@ -756,7 +788,7 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
}
if lastUploadErr != nil {
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: lastUploadErr.Error()}
logger.Warnf("存储目标 %s 上传失败(已重试 %d 次):%v", targetName, maxAttempts, lastUploadErr)
logger.Warnf("存储目标 %s 上传失败(已重试 %d 次):%v", targetName, uploadMaxAttempts, lastUploadErr)
return
}
// 完整性校验:对比实际传输字节数
@@ -881,6 +913,32 @@ func (s *BackupExecutionService) finalizeRecord(ctx context.Context, task *model
return s.tasks.Update(ctx, task)
}
func (s *BackupExecutionService) storageUsageSnapshot(ctx context.Context) (map[uint]int64, error) {
items, err := s.records.StorageUsage(ctx)
if err != nil {
return nil, fmt.Errorf("storage usage snapshot: %w", err)
}
usage := make(map[uint]int64, len(items))
for _, item := range items {
usage[item.StorageTargetID] = item.TotalSize
}
return usage, nil
}
func waitForUploadRetry(ctx context.Context, delay time.Duration) error {
if delay <= 0 {
return nil
}
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
func (s *BackupExecutionService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) {
return s.resolveProviderForNode(ctx, targetID, 0)
}

View File

@@ -2,11 +2,13 @@ package service
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@@ -40,9 +42,11 @@ func (f *testStorageFactory) New(_ context.Context, config map[string]any) (stor
}
type testStorageProvider struct {
name string
failUpload bool
objects map[string][]byte
name string
failUpload bool
blockUpload <-chan struct{}
onUpload func()
objects map[string][]byte
}
func (p *testStorageProvider) Type() storage.ProviderType { return "test_storage" }
@@ -50,6 +54,12 @@ func (p *testStorageProvider) TestConnection(context.Context) error {
return nil
}
func (p *testStorageProvider) Upload(_ context.Context, objectKey string, reader io.Reader, _ int64, _ map[string]string) error {
if p.blockUpload != nil {
<-p.blockUpload
}
if p.onUpload != nil {
p.onUpload()
}
if p.failUpload {
return fmt.Errorf("upload failed for %s", p.name)
}
@@ -193,6 +203,39 @@ func TestBackupExecutionServiceNodePoolSelectionDoesNotPersistTaskNodeID(t *test
}
}
func TestBackupExecutionServiceRejectsDuplicateRunningTask(t *testing.T) {
executionService, _, tasks, _, records, _, _ := newExecutionTestServices(t)
ctx := context.Background()
task, err := tasks.FindByID(ctx, 1)
if err != nil {
t.Fatalf("FindByID task returned error: %v", err)
}
startedAt := time.Now().UTC()
running := &model.BackupRecord{
TaskID: task.ID,
StorageTargetID: task.StorageTargetID,
NodeID: 0,
Status: model.BackupRecordStatusRunning,
StartedAt: startedAt,
}
if err := records.Create(ctx, running); err != nil {
t.Fatalf("Create running record returned error: %v", err)
}
_, err = executionService.RunTaskByIDSync(ctx, task.ID)
if err == nil || !strings.Contains(err.Error(), "正在运行") {
t.Fatalf("expected duplicate running task to be rejected, got %v", err)
}
items, err := records.List(ctx, repository.BackupRecordListOptions{Status: model.BackupRecordStatusRunning})
if err != nil {
t.Fatalf("List running records returned error: %v", err)
}
if len(items) != 1 || items[0].ID != running.ID {
t.Fatalf("expected only the original running record, got %#v", items)
}
}
func TestBackupExecutionServiceDeleteRecordDispatchesRemoteLocalDiskCleanup(t *testing.T) {
executionService, _, tasks, _, records, _, _ := newExecutionTestServices(t)
ctx := context.Background()
@@ -334,6 +377,155 @@ func TestBackupExecutionServiceRecordsFirstSuccessfulStorageTarget(t *testing.T)
}
}
func TestBackupExecutionServiceUploadRetryStopsWhenContextCancelled(t *testing.T) {
executionService, _, tasks, targets, records, _, _ := newExecutionTestServices(t)
ctx, cancel := context.WithCancel(context.Background())
var cancelOnce sync.Once
failing := &testStorageProvider{
name: "failing",
failUpload: true,
onUpload: func() {
cancelOnce.Do(cancel)
},
}
executionService.storageRegistry = storage.NewRegistry(&testStorageFactory{providers: map[string]*testStorageProvider{
"failing": failing,
}})
cipher := codec.NewConfigCipher("execution-secret")
failingConfig, err := cipher.EncryptJSON(map[string]any{"name": "failing"})
if err != nil {
t.Fatalf("EncryptJSON returned error: %v", err)
}
if err := targets.Update(ctx, &model.StorageTarget{
ID: 1,
Name: "local",
Type: "test_storage",
Enabled: true,
ConfigCiphertext: failingConfig,
ConfigVersion: 1,
LastTestStatus: "unknown",
}); err != nil {
t.Fatalf("Update target returned error: %v", err)
}
task, err := tasks.FindByID(ctx, 1)
if err != nil {
t.Fatalf("FindByID task returned error: %v", err)
}
startedAt := time.Now().UTC()
record := &model.BackupRecord{
TaskID: task.ID,
StorageTargetID: task.StorageTargetID,
Status: model.BackupRecordStatusRunning,
StartedAt: startedAt,
}
if err := records.Create(ctx, record); err != nil {
t.Fatalf("Create record returned error: %v", err)
}
done := make(chan struct{})
go func() {
executionService.executeTask(ctx, task, record.ID, startedAt)
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("expected cancelled upload retry to stop without waiting for backoff sleep")
}
}
func TestBackupExecutionServiceReadsStorageUsageOnceForMultiTargetQuotaChecks(t *testing.T) {
executionService, _, tasks, targets, records, _, _ := newExecutionTestServices(t)
ctx := context.Background()
first := &testStorageProvider{name: "first", objects: map[string][]byte{}}
second := &testStorageProvider{name: "second", objects: map[string][]byte{}}
executionService.storageRegistry = storage.NewRegistry(&testStorageFactory{providers: map[string]*testStorageProvider{
"first": first,
"second": second,
}})
cipher := codec.NewConfigCipher("execution-secret")
firstConfig, err := cipher.EncryptJSON(map[string]any{"name": "first"})
if err != nil {
t.Fatalf("EncryptJSON first returned error: %v", err)
}
secondConfig, err := cipher.EncryptJSON(map[string]any{"name": "second"})
if err != nil {
t.Fatalf("EncryptJSON second returned error: %v", err)
}
if err := targets.Update(ctx, &model.StorageTarget{ID: 1, Name: "local", Type: "test_storage", Enabled: true, ConfigCiphertext: firstConfig, ConfigVersion: 1, LastTestStatus: "unknown", QuotaBytes: 1 << 30}); err != nil {
t.Fatalf("Update first target returned error: %v", err)
}
if err := targets.Create(ctx, &model.StorageTarget{Name: "second", Type: "test_storage", Enabled: true, ConfigCiphertext: secondConfig, ConfigVersion: 1, LastTestStatus: "unknown", QuotaBytes: 1 << 30}); err != nil {
t.Fatalf("Create second target returned error: %v", err)
}
task, err := tasks.FindByID(ctx, 1)
if err != nil {
t.Fatalf("FindByID task returned error: %v", err)
}
task.StorageTargets = []model.StorageTarget{{ID: 1}, {ID: 2}}
if err := tasks.Update(ctx, task); err != nil {
t.Fatalf("Update task returned error: %v", err)
}
executionService.records = &storageUsageCountingRecordRepo{BackupRecordRepository: records}
detail, err := executionService.RunTaskByIDSync(ctx, task.ID)
if err != nil {
t.Fatalf("RunTaskByIDSync returned error: %v", err)
}
if detail.Status != model.BackupRecordStatusSuccess {
t.Fatalf("expected success, got %#v", detail)
}
countingRepo := executionService.records.(*storageUsageCountingRecordRepo)
if countingRepo.usageCalls != 1 {
t.Fatalf("expected StorageUsage to be called once for quota snapshot, got %d", countingRepo.usageCalls)
}
if len(first.objects) != 1 || len(second.objects) != 1 {
t.Fatalf("expected both targets to receive upload, got first=%d second=%d", len(first.objects), len(second.objects))
}
}
func TestBackupExecutionServiceContinuesWhenStorageUsageSnapshotFails(t *testing.T) {
executionService, _, _, targets, records, _, _ := newExecutionTestServices(t)
ctx := context.Background()
provider := &testStorageProvider{name: "primary", objects: map[string][]byte{}}
executionService.storageRegistry = storage.NewRegistry(&testStorageFactory{providers: map[string]*testStorageProvider{
"primary": provider,
}})
cipher := codec.NewConfigCipher("execution-secret")
configCiphertext, err := cipher.EncryptJSON(map[string]any{"name": "primary"})
if err != nil {
t.Fatalf("EncryptJSON returned error: %v", err)
}
if err := targets.Update(ctx, &model.StorageTarget{
ID: 1,
Name: "local",
Type: "test_storage",
Enabled: true,
ConfigCiphertext: configCiphertext,
ConfigVersion: 1,
LastTestStatus: "unknown",
QuotaBytes: 1 << 30,
}); err != nil {
t.Fatalf("Update target returned error: %v", err)
}
executionService.records = &storageUsageFailingRecordRepo{
BackupRecordRepository: records,
err: errStorageUsageFailed,
}
detail, err := executionService.RunTaskByIDSync(ctx, 1)
if err != nil {
t.Fatalf("RunTaskByIDSync returned error: %v", err)
}
if detail.Status != model.BackupRecordStatusSuccess {
t.Fatalf("expected success despite soft quota usage snapshot error, got %#v", detail)
}
if len(provider.objects) != 1 {
t.Fatalf("expected upload to proceed, got %d uploaded objects", len(provider.objects))
}
}
func TestBackupRecordServiceRestore(t *testing.T) {
executionService, recordService, _, _, _, sourceDir, _ := newExecutionTestServices(t)
detail, err := executionService.RunTaskByIDSync(context.Background(), 1)
@@ -354,3 +546,27 @@ func TestBackupRecordServiceRestore(t *testing.T) {
t.Fatalf("unexpected restored content: %s", string(content))
}
}
type storageUsageCountingRecordRepo struct {
repository.BackupRecordRepository
mu sync.Mutex
usageCalls int
}
func (r *storageUsageCountingRecordRepo) StorageUsage(ctx context.Context) ([]repository.BackupStorageUsageItem, error) {
r.mu.Lock()
r.usageCalls++
r.mu.Unlock()
return r.BackupRecordRepository.StorageUsage(ctx)
}
type storageUsageFailingRecordRepo struct {
repository.BackupRecordRepository
err error
}
func (r *storageUsageFailingRecordRepo) StorageUsage(context.Context) ([]repository.BackupStorageUsageItem, error) {
return nil, r.err
}
var errStorageUsageFailed = errors.New("storage usage failed")

View File

@@ -33,16 +33,16 @@ type BackupTaskUpsertInput struct {
DBPassword string `json:"dbPassword" binding:"max=255"`
DBName string `json:"dbName" binding:"max=255"`
DBPath string `json:"dbPath" binding:"max=500"`
StorageTargetID uint `json:"storageTargetId"` // deprecated: 向后兼容
StorageTargetIDs []uint `json:"storageTargetIds"` // 新增:多存储目标
NodeID uint `json:"nodeId"` // 执行节点0 = 本机 Master 或节点池)
StorageTargetID uint `json:"storageTargetId"` // deprecated: 向后兼容
StorageTargetIDs []uint `json:"storageTargetIds"` // 新增:多存储目标
NodeID uint `json:"nodeId"` // 执行节点0 = 本机 Master 或节点池)
// NodePoolTag 节点池标签。NodeID=0 且本字段非空时,调度器动态从 Labels 命中的在线节点中选负载最低者。
NodePoolTag string `json:"nodePoolTag" binding:"max=64"`
Tags string `json:"tags" binding:"max=500"` // 逗号分隔标签
RetentionDays int `json:"retentionDays"`
Compression string `json:"compression" binding:"omitempty,oneof=gzip none"`
Encrypt bool `json:"encrypt"`
MaxBackups int `json:"maxBackups"`
NodePoolTag string `json:"nodePoolTag" binding:"max=64"`
Tags string `json:"tags" binding:"max=500"` // 逗号分隔标签
RetentionDays int `json:"retentionDays"`
Compression string `json:"compression" binding:"omitempty,oneof=gzip none"`
Encrypt bool `json:"encrypt"`
MaxBackups int `json:"maxBackups"`
// ExtraConfig 类型特有扩展配置(如 SAP HANA 的 backupLevel/backupChannels
ExtraConfig map[string]any `json:"extraConfig"`
// 验证(恢复演练)配置
@@ -70,8 +70,8 @@ type BackupTaskSummary struct {
Type string `json:"type"`
Enabled bool `json:"enabled"`
CronExpr string `json:"cronExpr"`
StorageTargetID uint `json:"storageTargetId"` // deprecated: 取第一个
StorageTargetName string `json:"storageTargetName"` // deprecated: 取第一个
StorageTargetID uint `json:"storageTargetId"` // deprecated: 取第一个
StorageTargetName string `json:"storageTargetName"` // deprecated: 取第一个
StorageTargetIDs []uint `json:"storageTargetIds"`
StorageTargetNames []string `json:"storageTargetNames"`
NodeID uint `json:"nodeId"`
@@ -91,10 +91,10 @@ type BackupTaskSummary struct {
SLAHoursRPO int `json:"slaHoursRpo"`
AlertOnConsecutiveFails int `json:"alertOnConsecutiveFails"`
// 备份复制目标3-2-1
ReplicationTargetIDs []uint `json:"replicationTargetIds"`
MaintenanceWindows string `json:"maintenanceWindows"`
DependsOnTaskIDs []uint `json:"dependsOnTaskIds"`
UpdatedAt time.Time `json:"updatedAt"`
ReplicationTargetIDs []uint `json:"replicationTargetIds"`
MaintenanceWindows string `json:"maintenanceWindows"`
DependsOnTaskIDs []uint `json:"dependsOnTaskIds"`
UpdatedAt time.Time `json:"updatedAt"`
}
type BackupTaskDetail struct {
@@ -488,6 +488,7 @@ func (s *BackupTaskService) validateInput(ctx context.Context, existing *model.B
return apperror.BadRequest("BACKUP_STORAGE_TARGET_INVALID", fmt.Sprintf("关联的存储目标 %d 不存在", tid), nil)
}
}
var fixedNode *model.Node
if input.NodeID > 0 && s.nodes != nil {
node, err := s.nodes.FindByID(ctx, input.NodeID)
if err != nil {
@@ -496,12 +497,17 @@ func (s *BackupTaskService) validateInput(ctx context.Context, existing *model.B
if node == nil {
return apperror.BadRequest("BACKUP_TASK_INVALID", "所选执行节点不存在", nil)
}
fixedNode = node
}
// 节点池与固定节点互斥:固定节点已确定执行位置,不再动态调度
if input.NodeID > 0 && strings.TrimSpace(input.NodePoolTag) != "" {
return apperror.BadRequest("BACKUP_TASK_INVALID",
"固定执行节点与节点池标签只能选其一", nil)
}
if input.Encrypt && (strings.TrimSpace(input.NodePoolTag) != "" || (fixedNode != nil && !fixedNode.IsLocal)) {
return apperror.BadRequest("BACKUP_TASK_REMOTE_ENCRYPT_UNSUPPORTED",
"远程节点暂不支持加密备份。请关闭加密,或将任务固定在 Master 本机执行。", nil)
}
if input.RetentionDays < 0 {
return apperror.BadRequest("BACKUP_TASK_INVALID", "保留天数不能小于 0", nil)
}
@@ -639,38 +645,38 @@ func (s *BackupTaskService) buildTask(existing *model.BackupTask, input BackupTa
return nil, apperror.BadRequest("BACKUP_TASK_INVALID", "扩展配置格式不合法", err)
}
item := &model.BackupTask{
Name: strings.TrimSpace(input.Name),
Type: normalizeBackupTaskType(input.Type),
Enabled: input.Enabled,
CronExpr: strings.TrimSpace(input.CronExpr),
SourcePath: primarySourcePath,
SourcePaths: sourcePathsJSON,
ExcludePatterns: excludePatterns,
DBHost: strings.TrimSpace(input.DBHost),
DBPort: input.DBPort,
DBUser: strings.TrimSpace(input.DBUser),
DBPasswordCiphertext: passwordCiphertext,
DBName: strings.TrimSpace(input.DBName),
DBPath: strings.TrimSpace(input.DBPath),
ExtraConfig: extraConfigJSON,
StorageTargetID: primaryTargetID,
StorageTargets: storageTargets,
NodeID: input.NodeID,
NodePoolTag: strings.TrimSpace(input.NodePoolTag),
Tags: strings.TrimSpace(input.Tags),
RetentionDays: input.RetentionDays,
Compression: compression,
Encrypt: input.Encrypt,
MaxBackups: maxBackups,
LastStatus: "idle",
VerifyEnabled: input.VerifyEnabled,
VerifyCronExpr: strings.TrimSpace(input.VerifyCronExpr),
VerifyMode: normalizeVerifyMode(input.VerifyMode),
SLAHoursRPO: maxInt(0, input.SLAHoursRPO),
Name: strings.TrimSpace(input.Name),
Type: normalizeBackupTaskType(input.Type),
Enabled: input.Enabled,
CronExpr: strings.TrimSpace(input.CronExpr),
SourcePath: primarySourcePath,
SourcePaths: sourcePathsJSON,
ExcludePatterns: excludePatterns,
DBHost: strings.TrimSpace(input.DBHost),
DBPort: input.DBPort,
DBUser: strings.TrimSpace(input.DBUser),
DBPasswordCiphertext: passwordCiphertext,
DBName: strings.TrimSpace(input.DBName),
DBPath: strings.TrimSpace(input.DBPath),
ExtraConfig: extraConfigJSON,
StorageTargetID: primaryTargetID,
StorageTargets: storageTargets,
NodeID: input.NodeID,
NodePoolTag: strings.TrimSpace(input.NodePoolTag),
Tags: strings.TrimSpace(input.Tags),
RetentionDays: input.RetentionDays,
Compression: compression,
Encrypt: input.Encrypt,
MaxBackups: maxBackups,
LastStatus: "idle",
VerifyEnabled: input.VerifyEnabled,
VerifyCronExpr: strings.TrimSpace(input.VerifyCronExpr),
VerifyMode: normalizeVerifyMode(input.VerifyMode),
SLAHoursRPO: maxInt(0, input.SLAHoursRPO),
AlertOnConsecutiveFails: alertThreshold(input.AlertOnConsecutiveFails),
ReplicationTargetIDs: encodeUintCSV(input.ReplicationTargetIDs),
MaintenanceWindows: strings.TrimSpace(input.MaintenanceWindows),
DependsOnTaskIDs: encodeUintCSV(input.DependsOnTaskIDs),
ReplicationTargetIDs: encodeUintCSV(input.ReplicationTargetIDs),
MaintenanceWindows: strings.TrimSpace(input.MaintenanceWindows),
DependsOnTaskIDs: encodeUintCSV(input.DependsOnTaskIDs),
}
if existing != nil {
item.LastRunAt = existing.LastRunAt
@@ -736,25 +742,25 @@ func toBackupTaskSummary(item *model.BackupTask) BackupTaskSummary {
primaryName = targetNames[0]
}
return BackupTaskSummary{
ID: item.ID,
Name: item.Name,
Type: normalizeBackupTaskType(item.Type),
Enabled: item.Enabled,
CronExpr: item.CronExpr,
StorageTargetID: primaryID,
StorageTargetName: primaryName,
StorageTargetIDs: targetIDs,
StorageTargetNames: targetNames,
NodeID: item.NodeID,
NodeName: item.Node.Name,
NodePoolTag: item.NodePoolTag,
Tags: item.Tags,
RetentionDays: item.RetentionDays,
Compression: item.Compression,
Encrypt: item.Encrypt,
MaxBackups: item.MaxBackups,
LastRunAt: item.LastRunAt,
LastStatus: item.LastStatus,
ID: item.ID,
Name: item.Name,
Type: normalizeBackupTaskType(item.Type),
Enabled: item.Enabled,
CronExpr: item.CronExpr,
StorageTargetID: primaryID,
StorageTargetName: primaryName,
StorageTargetIDs: targetIDs,
StorageTargetNames: targetNames,
NodeID: item.NodeID,
NodeName: item.Node.Name,
NodePoolTag: item.NodePoolTag,
Tags: item.Tags,
RetentionDays: item.RetentionDays,
Compression: item.Compression,
Encrypt: item.Encrypt,
MaxBackups: item.MaxBackups,
LastRunAt: item.LastRunAt,
LastStatus: item.LastStatus,
VerifyEnabled: item.VerifyEnabled,
VerifyCronExpr: item.VerifyCronExpr,
VerifyMode: item.VerifyMode,
@@ -763,7 +769,7 @@ func toBackupTaskSummary(item *model.BackupTask) BackupTaskSummary {
ReplicationTargetIDs: parseUintCSV(item.ReplicationTargetIDs),
MaintenanceWindows: item.MaintenanceWindows,
DependsOnTaskIDs: parseUintCSV(item.DependsOnTaskIDs),
UpdatedAt: item.UpdatedAt,
UpdatedAt: item.UpdatedAt,
}
}

View File

@@ -3,6 +3,7 @@ package service
import (
"context"
"path/filepath"
"strings"
"testing"
"backupx/server/internal/config"
@@ -29,6 +30,82 @@ func newBackupTaskServiceForTest(t *testing.T) (*BackupTaskService, repository.S
return service, targets, tasks
}
func TestBackupTaskServiceRejectsEncryptedRemoteTasks(t *testing.T) {
ctx := context.Background()
service, targets, _ := newBackupTaskServiceForTest(t)
service.SetNodeRepository(&nodeRepoStub{nodes: []model.Node{
{ID: 41, Name: "master", Token: "master-token", Status: model.NodeStatusOnline, IsLocal: true},
{ID: 42, Name: "edge", Token: "edge-token", Status: model.NodeStatusOnline, IsLocal: false},
}})
if err := targets.Create(ctx, &model.StorageTarget{Name: "local", Type: "local_disk", Enabled: true, ConfigCiphertext: "ciphertext", ConfigVersion: 1, LastTestStatus: "unknown"}); err != nil {
t.Fatalf("seed storage target error: %v", err)
}
_, err := service.Create(ctx, BackupTaskUpsertInput{
Name: "encrypted-node-pool",
Type: "file",
Enabled: true,
SourcePath: "/srv/site",
StorageTargetID: 1,
NodePoolTag: "db",
RetentionDays: 30,
Compression: "gzip",
MaxBackups: 10,
Encrypt: true,
})
if err == nil || !strings.Contains(err.Error(), "远程节点暂不支持加密备份") {
t.Fatalf("expected encrypted node-pool task to be rejected, got %v", err)
}
created, err := service.Create(ctx, BackupTaskUpsertInput{
Name: "local-encrypted",
Type: "file",
Enabled: true,
SourcePath: "/srv/site",
StorageTargetID: 1,
RetentionDays: 30,
Compression: "gzip",
MaxBackups: 10,
Encrypt: true,
})
if err != nil {
t.Fatalf("Create local encrypted task returned error: %v", err)
}
localNodeTask, err := service.Create(ctx, BackupTaskUpsertInput{
Name: "local-node-encrypted",
Type: "file",
Enabled: true,
SourcePath: "/srv/site",
StorageTargetID: 1,
NodeID: 41,
RetentionDays: 30,
Compression: "gzip",
MaxBackups: 10,
Encrypt: true,
})
if err != nil {
t.Fatalf("Create encrypted task pinned to local node returned error: %v", err)
}
if localNodeTask.NodeID != 41 || !localNodeTask.Encrypt {
t.Fatalf("expected encrypted task to keep local node, got %#v", localNodeTask)
}
_, err = service.Update(ctx, created.ID, BackupTaskUpsertInput{
Name: created.Name,
Type: created.Type,
Enabled: true,
SourcePath: "/srv/site",
StorageTargetID: 1,
NodeID: 42,
RetentionDays: 30,
Compression: "gzip",
MaxBackups: 10,
Encrypt: true,
})
if err == nil || !strings.Contains(err.Error(), "远程节点暂不支持加密备份") {
t.Fatalf("expected encrypted fixed-node update to be rejected, got %v", err)
}
}
func TestBackupTaskServiceCreateAndGet(t *testing.T) {
ctx := context.Background()
service, targets, _ := newBackupTaskServiceForTest(t)

View File

@@ -36,6 +36,19 @@ type NodeSummary struct {
BandwidthLimit string `json:"bandwidthLimit"`
Labels string `json:"labels"`
CreatedAt time.Time `json:"createdAt"`
Queue NodeQueue `json:"queue"`
RunningTasks int `json:"runningTasks"`
LastError string `json:"lastError,omitempty"`
Health string `json:"health"`
}
type NodeQueue struct {
Pending int `json:"pending"`
Dispatched int `json:"dispatched"`
Depth int `json:"depth"`
Timeouts int `json:"timeouts"`
OldestActiveAt *time.Time `json:"oldestActiveAt,omitempty"`
OldestActiveAgeS int `json:"oldestActiveAgeSeconds"`
}
// NodeCreateInput is the input for creating a new remote node.
@@ -54,10 +67,11 @@ type NodeUpdateInput struct {
// NodeService manages the cluster nodes.
type NodeService struct {
repo repository.NodeRepository
taskRepo repository.BackupTaskRepository
agentRPC NodeAgentRPC
version string
repo repository.NodeRepository
taskRepo repository.BackupTaskRepository
agentRPC NodeAgentRPC
cmdRepo repository.AgentCommandRepository
version string
}
// NodeAgentRPC 抽象 Agent 远程调用能力(避免 service 内循环依赖)。
@@ -81,6 +95,10 @@ func (s *NodeService) SetAgentRPC(rpc NodeAgentRPC) {
s.agentRPC = rpc
}
func (s *NodeService) SetAgentCommandRepository(cmdRepo repository.AgentCommandRepository) {
s.cmdRepo = cmdRepo
}
// EnsureLocalNode creates the default "local" node if it does not exist.
func (s *NodeService) EnsureLocalNode(ctx context.Context) error {
existing, err := s.repo.FindLocal(ctx)
@@ -120,24 +138,10 @@ func (s *NodeService) List(ctx context.Context) ([]NodeSummary, error) {
if err != nil {
return nil, err
}
queueByNode := s.loadQueueSummaries(ctx)
result := make([]NodeSummary, len(nodes))
for i, n := range nodes {
result[i] = NodeSummary{
ID: n.ID,
Name: n.Name,
Hostname: n.Hostname,
IPAddress: n.IPAddress,
Status: n.Status,
IsLocal: n.IsLocal,
OS: n.OS,
Arch: n.Arch,
AgentVersion: n.AgentVer,
LastSeen: n.LastSeen,
MaxConcurrent: n.MaxConcurrent,
BandwidthLimit: n.BandwidthLimit,
Labels: n.Labels,
CreatedAt: n.CreatedAt,
}
result[i] = s.toNodeSummary(&n, queueByNode[n.ID])
}
return result, nil
}
@@ -150,7 +154,24 @@ func (s *NodeService) Get(ctx context.Context, id uint) (*NodeSummary, error) {
if node == nil {
return nil, apperror.New(http.StatusNotFound, "NODE_NOT_FOUND", "节点不存在", nil)
}
return &NodeSummary{
queueByNode := s.loadQueueSummaries(ctx)
summary := s.toNodeSummary(node, queueByNode[node.ID])
return &summary, nil
}
func (s *NodeService) loadQueueSummaries(ctx context.Context) map[uint]repository.AgentCommandQueueSummary {
if s.cmdRepo == nil {
return nil
}
summaries, err := s.cmdRepo.NodeQueueSummaries(ctx)
if err != nil {
return nil
}
return summaries
}
func (s *NodeService) toNodeSummary(node *model.Node, queue repository.AgentCommandQueueSummary) NodeSummary {
summary := NodeSummary{
ID: node.ID,
Name: node.Name,
Hostname: node.Hostname,
@@ -165,7 +186,31 @@ func (s *NodeService) Get(ctx context.Context, id uint) (*NodeSummary, error) {
BandwidthLimit: node.BandwidthLimit,
Labels: node.Labels,
CreatedAt: node.CreatedAt,
}, nil
Queue: NodeQueue{
Pending: queue.Pending,
Dispatched: queue.Dispatched,
Depth: queue.Depth,
Timeouts: queue.Timeouts,
OldestActiveAt: queue.OldestActiveAt,
},
RunningTasks: queue.Running,
LastError: queue.LastError,
Health: nodeHealth(node, queue),
}
if queue.OldestActiveAt != nil {
summary.Queue.OldestActiveAgeS = int(time.Since(*queue.OldestActiveAt).Seconds())
}
return summary
}
func nodeHealth(node *model.Node, queue repository.AgentCommandQueueSummary) string {
if node.Status != model.NodeStatusOnline {
return "offline"
}
if queue.Timeouts > 0 || strings.TrimSpace(queue.LastError) != "" {
return "degraded"
}
return "healthy"
}
// Create registers a new remote node and returns its authentication token.

View File

@@ -23,6 +23,9 @@ func openNodeServiceDB(t *testing.T) *gorm.DB {
if err := db.AutoMigrate(&model.Node{}); err != nil {
t.Fatalf("migrate: %v", err)
}
if err := db.AutoMigrate(&model.AgentCommand{}); err != nil {
t.Fatalf("migrate agent commands: %v", err)
}
return db
}
@@ -157,3 +160,48 @@ func TestRotateTokenNotFound(t *testing.T) {
t.Fatalf("expected not found error")
}
}
func TestNodeServiceListIncludesQueueHealthSummary(t *testing.T) {
db := openNodeServiceDB(t)
nodeRepo := repository.NewNodeRepository(db)
cmdRepo := repository.NewAgentCommandRepository(db)
svc := NewNodeService(nodeRepo, "test")
svc.SetAgentCommandRepository(cmdRepo)
ctx := context.Background()
node := &model.Node{
Name: "edge-a",
Token: "edge-token",
Status: model.NodeStatusOnline,
IsLocal: false,
LastSeen: time.Now().UTC(),
}
if err := nodeRepo.Create(ctx, node); err != nil {
t.Fatalf("Create node returned error: %v", err)
}
old := time.Now().UTC().Add(-time.Minute)
if err := cmdRepo.Create(ctx, &model.AgentCommand{NodeID: node.ID, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusPending, CreatedAt: old}); err != nil {
t.Fatalf("Create pending command returned error: %v", err)
}
completedAt := time.Now().UTC()
if err := cmdRepo.Create(ctx, &model.AgentCommand{NodeID: node.ID, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusTimeout, ErrorMessage: "agent timeout", CompletedAt: &completedAt}); err != nil {
t.Fatalf("Create timeout command returned error: %v", err)
}
items, err := svc.List(ctx)
if err != nil {
t.Fatalf("List returned error: %v", err)
}
if len(items) != 1 {
t.Fatalf("expected one node, got %#v", items)
}
got := items[0]
if got.Queue.Pending != 1 || got.Queue.Depth != 1 || got.Queue.Timeouts != 1 {
t.Fatalf("unexpected queue summary: %#v", got.Queue)
}
if got.Health != "degraded" || got.LastError != "agent timeout" {
t.Fatalf("expected terminal command errors to degrade healthy node, got %#v", got)
}
if got.Queue.OldestActiveAt == nil || got.Queue.OldestActiveAgeS <= 0 {
t.Fatalf("expected oldest active metadata, got %#v", got.Queue)
}
}

View File

@@ -1,6 +1,7 @@
import { describe, expect, it } from 'vitest'
import type { UserInfo } from '../../services/auth'
import { canManageNodes } from './NodesPage'
import { canManageNodes, formatQueueAge, getNodeHealthView } from './NodesPage'
import type { NodeSummary } from '../../types/nodes'
function user(role: string): UserInfo {
return {
@@ -19,3 +20,58 @@ describe('canManageNodes', () => {
expect(canManageNodes(null)).toBe(false)
})
})
describe('node diagnostics helpers', () => {
it('formats queue age and health status from backend summaries', () => {
const node: NodeSummary = {
id: 1,
name: 'edge-a',
hostname: '',
ipAddress: '',
status: 'online',
isLocal: false,
os: 'linux',
arch: 'amd64',
agentVersion: 'v1',
lastSeen: '2026-05-12T00:00:00Z',
createdAt: '2026-05-12T00:00:00Z',
health: 'degraded',
lastError: 'agent timeout',
runningTasks: 1,
queue: {
pending: 2,
dispatched: 1,
depth: 3,
timeouts: 1,
oldestActiveAgeSeconds: 125,
},
}
expect(formatQueueAge(node.queue?.oldestActiveAgeSeconds)).toBe('2m')
expect(getNodeHealthView(node)).toEqual({
text: '异常',
badgeStatus: 'warning',
tagColor: 'orangered',
tooltip: 'agent timeout',
})
})
it('treats offline nodes as offline even without queue errors', () => {
const node = {
id: 2,
name: 'edge-b',
hostname: '',
ipAddress: '',
status: 'offline',
isLocal: false,
os: '',
arch: '',
agentVersion: '',
lastSeen: '',
createdAt: '',
} satisfies NodeSummary
expect(formatQueueAge(0)).toBe('-')
expect(getNodeHealthView(node).text).toBe('离线')
})
})

View File

@@ -20,6 +20,28 @@ export function canManageNodes(user: UserInfo | null | undefined): boolean {
return isAdmin(user)
}
export function formatQueueAge(seconds?: number): string {
if (!seconds || seconds <= 0) return '-'
if (seconds < 60) return `${seconds}s`
if (seconds < 3600) return `${Math.floor(seconds / 60)}m`
return `${Math.floor(seconds / 3600)}h`
}
export function getNodeHealthView(node: NodeSummary) {
if (node.status !== 'online' || node.health === 'offline') {
return { text: '离线', badgeStatus: 'default' as const, tagColor: 'gray', tooltip: '节点未在线' }
}
if (node.health === 'degraded' || node.queue?.timeouts || node.lastError) {
return {
text: '异常',
badgeStatus: 'warning' as const,
tagColor: 'orangered',
tooltip: node.lastError || '存在超时或失败的 Agent 命令',
}
}
return { text: '健康', badgeStatus: 'success' as const, tagColor: 'green', tooltip: 'Agent 心跳与队列状态正常' }
}
export default function NodesPage() {
const [nodes, setNodes] = useState<NodeSummary[]>([])
const [loading, setLoading] = useState(false)
@@ -122,10 +144,18 @@ export default function NodesPage() {
),
},
{
title: '状态', dataIndex: 'status', width: 100,
render: (status: string) => status === 'online'
? <Badge status="success" text="在线" />
: <Badge status="default" text="离线" />,
title: '健康', dataIndex: 'health', width: 150,
render: (_: string, record: NodeSummary) => {
const health = getNodeHealthView(record)
return (
<Tooltip content={health.tooltip}>
<Space size={6}>
<Badge status={health.badgeStatus} />
<Tag color={health.tagColor}>{health.text}</Tag>
</Space>
</Tooltip>
)
},
},
{ title: '主机名', dataIndex: 'hostname', render: (v: string) => v || '-' },
{ title: 'IP 地址', dataIndex: 'ipAddress', render: (v: string) => v || '-' },
@@ -138,6 +168,27 @@ export default function NodesPage() {
title: 'Agent 版本', dataIndex: 'agentVersion', width: 140,
render: (v: string) => renderAgentVersion(v, masterVersion),
},
{
title: '队列', dataIndex: 'queue', width: 160,
render: (_: unknown, record: NodeSummary) => {
const queue = record.queue
if (!queue || queue.depth === 0) {
return <Text type="secondary"></Text>
}
return (
<Tooltip content={`pending ${queue.pending} / dispatched ${queue.dispatched} / oldest ${formatQueueAge(queue.oldestActiveAgeSeconds)}`}>
<Space size={4}>
<Tag color="arcoblue"> {queue.depth}</Tag>
{queue.timeouts > 0 && <Tag color="orangered"> {queue.timeouts}</Tag>}
</Space>
</Tooltip>
)
},
},
{
title: '运行中', dataIndex: 'runningTasks', width: 90,
render: (v: number | undefined) => v && v > 0 ? <Tag color="green">{v}</Tag> : <Text type="secondary">0</Text>,
},
{
title: '标签 / 节点池', dataIndex: 'labels', width: 180,
render: (v: string) => {

View File

@@ -14,6 +14,19 @@ export interface NodeSummary {
/** CSV 节点标签;任务的 NodePoolTag 命中这里任一即会被调度到本节点 */
labels?: string
createdAt: string
queue?: NodeQueueSummary
runningTasks?: number
lastError?: string
health?: 'healthy' | 'degraded' | 'offline'
}
export interface NodeQueueSummary {
pending: number
dispatched: number
depth: number
timeouts: number
oldestActiveAt?: string
oldestActiveAgeSeconds?: number
}
export interface DirEntry {