diff --git a/deploy/docker/entrypoint.sh b/deploy/docker/entrypoint.sh index fd92b64..35c7114 100644 --- a/deploy/docker/entrypoint.sh +++ b/deploy/docker/entrypoint.sh @@ -1,6 +1,10 @@ #!/bin/sh set -e +if [ "${1:-}" = "agent" ]; then + exec /app/bin/backupx "$@" +fi + # Backend listens on internal port 8341, Nginx exposes 8340 export BACKUPX_SERVER_PORT="${BACKUPX_SERVER_PORT_INTERNAL:-8341}" diff --git a/docs-site/docs/features/backup-types.md b/docs-site/docs/features/backup-types.md index f309419..5285c20 100644 --- a/docs-site/docs/features/backup-types.md +++ b/docs-site/docs/features/backup-types.md @@ -8,6 +8,8 @@ description: File, MySQL, PostgreSQL, SQLite and SAP HANA — what they back up BackupX supports five built-in backup types. Type determines which runner executes the job. +When a task is routed to a remote Agent, the source tools and paths are resolved on that Agent host. Multi-target uploads are still tracked per storage target; if at least one target succeeds, the backup record is marked successful and the per-target result table shows partial failures. + ## File / Directory Tars (and optionally gzips) one or more filesystem paths. diff --git a/docs-site/docs/features/multi-node.md b/docs-site/docs/features/multi-node.md index 0bd6fdb..0cfc477 100644 --- a/docs-site/docs/features/multi-node.md +++ b/docs-site/docs/features/multi-node.md @@ -62,6 +62,8 @@ The script runs automatically and: 5. Runs `systemctl enable --now backupx-agent` 6. Polls `/api/v1/agent/self` until the master confirms `status: online` (up to 30 s) +Docker mode uses the same `BACKUPX_AGENT_MASTER`, `BACKUPX_AGENT_TOKEN`, and `BACKUPX_AGENT_TEMP_DIR=/var/lib/backupx-agent/tmp` environment contract. After starting the container, the installer also probes `/api/v1/agent/self`; if the node does not come online, it prints `docker ps` and `docker logs --tail=100 backupx-agent` diagnostics before exiting non-zero. + If you choose the URL-based fallback command and `curl` prints HTML or the shell reports `Syntax error: newline unexpected`, the install URL is being served by the web console instead of the backend. Ensure either `/api/install/` or `/install/` is forwarded to the BackupX backend, or use the embedded command generated by the console. Reruns are idempotent — to upgrade or re-provision, simply generate a new install command and run it again. The one-time install link expires after its TTL or after first consumption, whichever is sooner. @@ -81,9 +83,15 @@ In the **Backup Tasks** page, pick the target node when creating the task. When - Local (`nodeId=0`) → Master executes in-process - Remote node → Master enqueues the command → Agent claims → Agent runs locally → uploads → reports back +The node table shows the Agent health and command queue state: pending/dispatched depth, running long commands, timeouts, oldest active command age, and the latest Agent-side error. The same queue depth, running-command, and timeout snapshots are exported as Prometheus metrics: + +- `backupx_agent_command_queue_depth` +- `backupx_agent_command_running` +- `backupx_agent_command_timeout_total` + ## Known limitations -- **Encrypted backups don't work via Agent** — the Agent doesn't hold Master's AES-256 key. Tasks with `encrypt: true` will fail if routed to an Agent +- **Encrypted backups are Master-only** — the Agent doesn't hold Master's AES-256 key. Creating or updating a task with `encrypt: true` and a remote node or node pool is rejected up front - **Directory browser timeout** — remote dir listing is a synchronous RPC through the queue (15s default) - **Dispatched command timeout** — claimed-but-unfinished commands are marked `timeout` after 10 minutes diff --git a/docs-site/docs/getting-started/quick-start.md b/docs-site/docs/getting-started/quick-start.md index 9db8ac0..001f7b9 100644 --- a/docs-site/docs/getting-started/quick-start.md +++ b/docs-site/docs/getting-started/quick-start.md @@ -42,6 +42,8 @@ Go to **Backup Tasks → New**. Three steps: 2. **Source** — paths for file backup (multi-source supported), or connection info for databases 3. **Storage & policy** — pick target(s), compression, retention days, encryption on/off +For Agent-routed tasks, encryption must stay off because the Agent never receives the Master's encryption key. BackupX rejects remote-node or node-pool tasks with encryption enabled during create/update. + Save, then click **Run Now** to trigger a test. Live logs stream on the **Backup Records** page. :::note diff --git a/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/features/backup-types.md b/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/features/backup-types.md index 8a30dda..df97237 100644 --- a/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/features/backup-types.md +++ b/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/features/backup-types.md @@ -8,6 +8,8 @@ description: 文件、MySQL、PostgreSQL、SQLite 和 SAP HANA — 各自的能 BackupX 支持五种内置备份类型,类型决定了用哪个 runner 执行。 +当任务路由到远程 Agent 时,源路径和外部工具都会在该 Agent 主机上解析。多存储目标上传仍会逐目标记录结果;只要至少一个目标上传成功,备份记录即为成功,详情中的目标结果表会展示部分失败。 + ## 文件 / 目录 打包(可选 gzip)一个或多个文件系统路径。 diff --git a/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/features/multi-node.md b/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/features/multi-node.md index 22804cf..9b5a540 100644 --- a/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/features/multi-node.md +++ b/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/features/multi-node.md @@ -62,6 +62,8 @@ Web 控制台 → **节点管理** → **添加节点**,打开三步向导: 5. 执行 `systemctl enable --now backupx-agent` 6. 轮询 `/api/v1/agent/self`,直到 Master 确认 `status: online`(最多 30 秒) +Docker 模式使用同一组环境变量约定:`BACKUPX_AGENT_MASTER`、`BACKUPX_AGENT_TOKEN` 和 `BACKUPX_AGENT_TEMP_DIR=/var/lib/backupx-agent/tmp`。容器启动后,安装脚本同样会探测 `/api/v1/agent/self`;如果节点没有上线,会输出 `docker ps` 与 `docker logs --tail=100 backupx-agent` 排查命令,并以非零状态退出。 + 如果使用 URL 备用命令时 `curl` 输出 HTML,或 shell 报 `Syntax error: newline unexpected`,说明安装 URL 被 Web 控制台接管而不是转发到后端。需要确保 `/api/install/` 或 `/install/` 至少一个路径能转发到 BackupX 后端,或改用控制台生成的嵌入式命令。 脚本是幂等的:升级或重装只需重新生成一条安装命令再跑一次。一次性安装链接在 TTL 到期或被首次消费后立即作废。 @@ -81,9 +83,15 @@ Web 控制台 → **节点管理** → **添加节点**,打开三步向导: - 本机 / 未指定(`nodeId=0`):Master 进程内直接执行 - 远程节点:Master 写入命令队列 → Agent 拉取 → Agent 本地执行 → 上传 → 回报 +节点列表会展示 Agent 健康与命令队列状态:pending/dispatched 深度、运行中的长任务、超时数、最旧活跃命令年龄和最近 Agent 错误。同样的队列深度、运行中命令数和超时快照会导出为 Prometheus 指标: + +- `backupx_agent_command_queue_depth` +- `backupx_agent_command_running` +- `backupx_agent_command_timeout_total` + ## 已知限制 -- **Agent 不支持加密备份**:Agent 不持有 Master 的 AES-256 密钥。`encrypt: true` 的任务路由到 Agent 时会直接上报失败 +- **加密备份仅支持 Master 本机执行**:Agent 不持有 Master 的 AES-256 密钥。创建或更新任务时,如果 `encrypt: true` 且选择了远程节点或节点池,会在入口直接拒绝 - **目录浏览超时**:远程目录浏览通过命令队列做同步 RPC,默认 15s 超时 - **派发命令超时**:Agent 领取但未完成的命令超过 10 分钟会被置 `timeout` diff --git a/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/getting-started/quick-start.md b/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/getting-started/quick-start.md index 583c6d3..aee774d 100644 --- a/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/getting-started/quick-start.md +++ b/docs-site/i18n/zh-CN/docusaurus-plugin-content-docs/current/getting-started/quick-start.md @@ -42,6 +42,8 @@ description: 部署 BackupX、添加存储目标、创建第一个备份任务 2. **源配置** — 文件备份选择源路径(支持多个),数据库备份填写连接信息 3. **存储与策略** — 选择存储目标(支持多个)、压缩策略、保留天数、是否加密 +对于路由到 Agent 的任务,加密必须关闭,因为 Agent 不会拿到 Master 的加密密钥。BackupX 会在创建/更新阶段拒绝开启加密的远程节点或节点池任务。 + 保存后可点击 **立即执行** 测试,**备份记录** 页面实时查看执行日志。 :::note diff --git a/server/internal/agent/client.go b/server/internal/agent/client.go index 6b22c86..7864b24 100644 --- a/server/internal/agent/client.go +++ b/server/internal/agent/client.go @@ -143,13 +143,24 @@ func (c *MasterClient) GetTaskSpec(ctx context.Context, taskID uint) (*TaskSpec, // RecordUpdate 与 service.AgentRecordUpdate 对齐 type RecordUpdate struct { - Status string `json:"status,omitempty"` - FileName string `json:"fileName,omitempty"` - FileSize int64 `json:"fileSize,omitempty"` - Checksum string `json:"checksum,omitempty"` - StoragePath string `json:"storagePath,omitempty"` - ErrorMessage string `json:"errorMessage,omitempty"` - LogAppend string `json:"logAppend,omitempty"` + Status string `json:"status,omitempty"` + FileName string `json:"fileName,omitempty"` + FileSize int64 `json:"fileSize,omitempty"` + Checksum string `json:"checksum,omitempty"` + StoragePath string `json:"storagePath,omitempty"` + StorageTargetID uint `json:"storageTargetId,omitempty"` + StorageUploadResults []StorageResultItem `json:"storageUploadResults,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + LogAppend string `json:"logAppend,omitempty"` +} + +type StorageResultItem struct { + StorageTargetID uint `json:"storageTargetId"` + StorageTargetName string `json:"storageTargetName"` + Status string `json:"status"` + StoragePath string `json:"storagePath,omitempty"` + FileSize int64 `json:"fileSize,omitempty"` + Error string `json:"error,omitempty"` } // UpdateRecord 上报备份记录的状态/日志 diff --git a/server/internal/agent/executor.go b/server/internal/agent/executor.go index ffef445..a6ff6f0 100644 --- a/server/internal/agent/executor.go +++ b/server/internal/agent/executor.go @@ -126,22 +126,52 @@ func (e *Executor) ExecuteRunTask(ctx context.Context, taskID, recordID uint) er e.reportRecordFailure(ctx, recordID, "没有关联的存储目标") return fmt.Errorf("no storage targets") } + uploadResults := make([]StorageResultItem, 0, len(spec.StorageTargets)) + selectedStorageTargetID := uint(0) + var uploadErrors []string for _, target := range spec.StorageTargets { if err := e.uploadToTarget(ctx, recordID, target, finalPath, storagePath, fileSize, spec.TaskID); err != nil { - e.reportRecordFailure(ctx, recordID, fmt.Sprintf("上传到 %s 失败: %v", target.Name, err)) - return err + uploadResults = append(uploadResults, StorageResultItem{ + StorageTargetID: target.ID, + StorageTargetName: target.Name, + Status: "failed", + Error: err.Error(), + }) + uploadErrors = append(uploadErrors, fmt.Sprintf("%s: %v", target.Name, err)) + e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 上传到存储目标 %s 失败: %v\n", target.Name, err)) + continue } + if selectedStorageTargetID == 0 { + selectedStorageTargetID = target.ID + } + uploadResults = append(uploadResults, StorageResultItem{ + StorageTargetID: target.ID, + StorageTargetName: target.Name, + Status: "success", + StoragePath: storagePath, + FileSize: fileSize, + }) e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 已上传到存储目标 %s\n", target.Name)) } + if selectedStorageTargetID == 0 { + msg := strings.Join(uploadErrors, "; ") + if msg == "" { + msg = "所有存储目标上传均失败" + } + e.reportRecordFailureWithUploadResults(ctx, recordID, msg, uploadResults) + return fmt.Errorf("%s", msg) + } // 6) 上报最终成功 return e.client.UpdateRecord(ctx, recordID, RecordUpdate{ - Status: "success", - FileName: fileName, - FileSize: fileSize, - Checksum: checksum, - StoragePath: storagePath, - LogAppend: fmt.Sprintf("[agent] 任务完成,总计 %d 字节\n", fileSize), + Status: "success", + FileName: fileName, + FileSize: fileSize, + Checksum: checksum, + StoragePath: storagePath, + StorageTargetID: selectedStorageTargetID, + StorageUploadResults: uploadResults, + LogAppend: fmt.Sprintf("[agent] 任务完成,总计 %d 字节\n", fileSize), }) } @@ -177,10 +207,15 @@ func (e *Executor) appendLog(ctx context.Context, recordID uint, line string) { // reportRecordFailure 上报失败状态 func (e *Executor) reportRecordFailure(ctx context.Context, recordID uint, msg string) { + e.reportRecordFailureWithUploadResults(ctx, recordID, msg, nil) +} + +func (e *Executor) reportRecordFailureWithUploadResults(ctx context.Context, recordID uint, msg string, uploadResults []StorageResultItem) { _ = e.client.UpdateRecord(ctx, recordID, RecordUpdate{ - Status: "failed", - ErrorMessage: msg, - LogAppend: fmt.Sprintf("[agent] 错误: %s\n", msg), + Status: "failed", + ErrorMessage: msg, + StorageUploadResults: uploadResults, + LogAppend: fmt.Sprintf("[agent] 错误: %s\n", msg), }) } diff --git a/server/internal/agent/executor_test.go b/server/internal/agent/executor_test.go index ee40adf..d4f35ff 100644 --- a/server/internal/agent/executor_test.go +++ b/server/internal/agent/executor_test.go @@ -1,9 +1,20 @@ package agent import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" "reflect" + "strings" "testing" "time" + + "backupx/server/internal/storage" ) func TestBuildBackupTaskSpecParsesJSONSourcePaths(t *testing.T) { @@ -32,3 +43,191 @@ func TestParseStringListFieldKeepsLegacyLineFormat(t *testing.T) { t.Fatalf("paths = %#v, want %#v", got, want) } } + +func TestExecuteRunTaskRecordsPerTargetUploadResults(t *testing.T) { + sourceDir := t.TempDir() + if err := os.WriteFile(filepath.Join(sourceDir, "index.html"), []byte("hello"), 0o644); err != nil { + t.Fatalf("WriteFile returned error: %v", err) + } + var finalUpdate RecordUpdate + var updates []RecordUpdate + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/agent/tasks/1": + writeAgentEnvelope(t, w, TaskSpec{ + TaskID: 1, + Name: "site", + Type: "file", + SourcePath: sourceDir, + Compression: "gzip", + StorageTargets: []StorageTargetConfig{ + {ID: 11, Name: "broken", Type: "agent_test_storage", Config: json.RawMessage(`{"name":"broken"}`)}, + {ID: 12, Name: "ok", Type: "agent_test_storage", Config: json.RawMessage(`{"name":"ok"}`)}, + }, + }) + case r.Method == http.MethodPost && r.URL.Path == "/api/agent/records/99": + var update RecordUpdate + if err := json.NewDecoder(r.Body).Decode(&update); err != nil { + t.Fatalf("Decode update returned error: %v", err) + } + updates = append(updates, update) + if update.Status != "" { + finalUpdate = update + } + writeAgentEnvelope(t, w, map[string]string{"status": "ok"}) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + executor := NewExecutor(NewMasterClient(server.URL, "token", false), filepath.Join(t.TempDir(), "tmp")) + executor.storageRegistry = storage.NewRegistry(&agentTestStorageFactory{ + providers: map[string]*agentTestStorageProvider{ + "broken": {name: "broken", failUpload: true}, + "ok": {name: "ok", objects: map[string][]byte{}}, + }, + }) + + if err := executor.ExecuteRunTask(context.Background(), 1, 99); err != nil { + t.Fatalf("ExecuteRunTask returned error: %v", err) + } + if len(updates) == 0 || finalUpdate.Status != "success" { + t.Fatalf("expected final success update, got updates=%#v final=%#v", updates, finalUpdate) + } + if finalUpdate.StorageTargetID != 12 { + t.Fatalf("expected first successful target 12, got %d", finalUpdate.StorageTargetID) + } + if len(finalUpdate.StorageUploadResults) != 2 { + t.Fatalf("expected two upload results, got %#v", finalUpdate.StorageUploadResults) + } + if finalUpdate.StorageUploadResults[0].Status != "failed" || finalUpdate.StorageUploadResults[1].Status != "success" { + t.Fatalf("unexpected upload results: %#v", finalUpdate.StorageUploadResults) + } + if finalUpdate.StoragePath == "" || finalUpdate.FileSize <= 0 || finalUpdate.Checksum == "" { + t.Fatalf("expected artifact metadata in final update, got %#v", finalUpdate) + } +} + +func TestExecuteRunTaskReportsPerTargetUploadResultsWhenAllTargetsFail(t *testing.T) { + sourceDir := t.TempDir() + if err := os.WriteFile(filepath.Join(sourceDir, "index.html"), []byte("hello"), 0o644); err != nil { + t.Fatalf("WriteFile returned error: %v", err) + } + var finalUpdate RecordUpdate + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/agent/tasks/1": + writeAgentEnvelope(t, w, TaskSpec{ + TaskID: 1, + Name: "site", + Type: "file", + SourcePath: sourceDir, + Compression: "gzip", + StorageTargets: []StorageTargetConfig{ + {ID: 11, Name: "broken-a", Type: "agent_test_storage", Config: json.RawMessage(`{"name":"broken-a"}`)}, + {ID: 12, Name: "broken-b", Type: "agent_test_storage", Config: json.RawMessage(`{"name":"broken-b"}`)}, + }, + }) + case r.Method == http.MethodPost && r.URL.Path == "/api/agent/records/99": + var update RecordUpdate + if err := json.NewDecoder(r.Body).Decode(&update); err != nil { + t.Fatalf("Decode update returned error: %v", err) + } + if update.Status != "" { + finalUpdate = update + } + writeAgentEnvelope(t, w, map[string]string{"status": "ok"}) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + executor := NewExecutor(NewMasterClient(server.URL, "token", false), filepath.Join(t.TempDir(), "tmp")) + executor.storageRegistry = storage.NewRegistry(&agentTestStorageFactory{ + providers: map[string]*agentTestStorageProvider{ + "broken-a": {name: "broken-a", failUpload: true}, + "broken-b": {name: "broken-b", failUpload: true}, + }, + }) + + if err := executor.ExecuteRunTask(context.Background(), 1, 99); err == nil { + t.Fatal("expected ExecuteRunTask to return upload failure") + } + if finalUpdate.Status != "failed" { + t.Fatalf("expected final failed update, got %#v", finalUpdate) + } + if len(finalUpdate.StorageUploadResults) != 2 { + t.Fatalf("expected failed update to keep per-target results, got %#v", finalUpdate.StorageUploadResults) + } + for _, item := range finalUpdate.StorageUploadResults { + if item.Status != "failed" || item.Error == "" { + t.Fatalf("unexpected upload result: %#v", item) + } + } +} + +type agentTestStorageFactory struct { + providers map[string]*agentTestStorageProvider +} + +func (f *agentTestStorageFactory) Type() storage.ProviderType { + return "agent_test_storage" +} + +func (f *agentTestStorageFactory) New(_ context.Context, config map[string]any) (storage.StorageProvider, error) { + name, _ := config["name"].(string) + provider := f.providers[name] + if provider == nil { + return nil, fmt.Errorf("unknown provider %q", name) + } + return provider, nil +} + +type agentTestStorageProvider struct { + name string + failUpload bool + objects map[string][]byte +} + +func (p *agentTestStorageProvider) Type() storage.ProviderType { return "agent_test_storage" } +func (p *agentTestStorageProvider) TestConnection(context.Context) error { + return nil +} +func (p *agentTestStorageProvider) Upload(_ context.Context, objectKey string, reader io.Reader, _ int64, _ map[string]string) error { + if p.failUpload { + return fmt.Errorf("upload failed for %s", p.name) + } + data, err := io.ReadAll(reader) + if err != nil { + return err + } + if p.objects == nil { + p.objects = map[string][]byte{} + } + p.objects[objectKey] = data + return nil +} +func (p *agentTestStorageProvider) Download(_ context.Context, objectKey string) (io.ReadCloser, error) { + data, ok := p.objects[objectKey] + if !ok { + return nil, fmt.Errorf("object %s not found", objectKey) + } + return io.NopCloser(strings.NewReader(string(data))), nil +} +func (p *agentTestStorageProvider) Delete(_ context.Context, objectKey string) error { + delete(p.objects, objectKey) + return nil +} +func (p *agentTestStorageProvider) List(context.Context, string) ([]storage.ObjectInfo, error) { + return nil, nil +} + +func writeAgentEnvelope(t *testing.T, w http.ResponseWriter, data any) { + t.Helper() + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{"code": "OK", "data": data}); err != nil { + t.Fatalf("Encode response returned error: %v", err) + } +} diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 8b90960..336d0c0 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -131,6 +131,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application, // Agent 协议服务:命令队列 + 任务下发 + 记录上报 agentCmdRepo := repository.NewAgentCommandRepository(db) + nodeService.SetAgentCommandRepository(agentCmdRepo) agentService := service.NewAgentService(nodeRepo, backupTaskRepo, backupRecordRepo, storageTargetRepo, agentCmdRepo, configCipher) agentService.SetRestoreRepository(restoreRecordRepo) agentService.StartCommandTimeoutMonitor(ctx, 30*time.Second, 10*time.Minute) @@ -240,7 +241,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application, replicationService.SetMetrics(appMetrics) metricsCollector := metrics.NewCollector( appMetrics, - metrics.NewRepoSource(storageTargetRepo, backupRecordRepo, nodeRepo, backupTaskRepo), + metrics.NewRepoSource(storageTargetRepo, backupRecordRepo, nodeRepo, backupTaskRepo, agentCmdRepo), 30*time.Second, ) metricsCollector.Start(ctx) diff --git a/server/internal/installscript/renderer_test.go b/server/internal/installscript/renderer_test.go index 079c648..91f0601 100644 --- a/server/internal/installscript/renderer_test.go +++ b/server/internal/installscript/renderer_test.go @@ -1,6 +1,8 @@ package installscript import ( + "os" + "path/filepath" "strings" "testing" @@ -85,11 +87,38 @@ func TestRenderScriptDocker(t *testing.T) { if !strings.Contains(got, "awuqing/backupx:${AGENT_VERSION}") { t.Errorf("docker script missing image tag reference:\n%s", got) } + if !strings.Contains(got, `"awuqing/backupx:${AGENT_VERSION}" agent`) { + t.Errorf("docker script must start image in agent mode:\n%s", got) + } + if !strings.Contains(got, `-e "BACKUPX_AGENT_TEMP_DIR=/var/lib/backupx-agent/tmp"`) { + t.Errorf("docker script missing temp dir env:\n%s", got) + } + if !strings.Contains(got, `docker logs --tail=100 backupx-agent`) { + t.Errorf("docker script missing diagnostic log command:\n%s", got) + } + if !strings.Contains(got, `grep -q '"status":"online"'`) { + t.Errorf("docker script missing online probe:\n%s", got) + } if strings.Contains(got, "systemctl daemon-reload") { t.Errorf("docker script should not reference systemctl:\n%s", got) } } +func TestDockerEntrypointForwardsAgentSubcommand(t *testing.T) { + entrypointPath := filepath.Join("..", "..", "..", "deploy", "docker", "entrypoint.sh") + got, err := os.ReadFile(entrypointPath) + if err != nil { + t.Fatalf("read docker entrypoint: %v", err) + } + script := string(got) + if !strings.Contains(script, `"${1:-}" = "agent"`) { + t.Fatalf("entrypoint must detect the agent subcommand before starting server:\n%s", script) + } + if !strings.Contains(script, `exec /app/bin/backupx "$@"`) { + t.Fatalf("entrypoint must exec backupx with forwarded args:\n%s", script) + } +} + func TestRenderComposeYaml(t *testing.T) { ctx := testCtx ctx.Mode = model.InstallModeDocker @@ -100,9 +129,15 @@ func TestRenderComposeYaml(t *testing.T) { if !strings.Contains(got, "image: awuqing/backupx:v1.7.0") { t.Errorf("compose missing image:\n%s", got) } + if !strings.Contains(got, `command: ["agent"]`) { + t.Errorf("compose must start image in agent mode:\n%s", got) + } if !strings.Contains(got, `BACKUPX_AGENT_TOKEN: "deadbeefcafebabe0123456789abcdef0123456789abcdef0123456789abcdef"`) { t.Errorf("compose missing token env:\n%s", got) } + if !strings.Contains(got, `BACKUPX_AGENT_TEMP_DIR: "/var/lib/backupx-agent/tmp"`) { + t.Errorf("compose missing temp dir env:\n%s", got) + } if !strings.Contains(got, "/var/lib/backupx-agent:/var/lib/backupx-agent") { t.Errorf("compose missing agent data volume:\n%s", got) } diff --git a/server/internal/installscript/templates/agent-compose.yml.tmpl b/server/internal/installscript/templates/agent-compose.yml.tmpl index 2acad59..a49b072 100644 --- a/server/internal/installscript/templates/agent-compose.yml.tmpl +++ b/server/internal/installscript/templates/agent-compose.yml.tmpl @@ -9,5 +9,6 @@ services: environment: BACKUPX_AGENT_MASTER: "{{.MasterURL}}" BACKUPX_AGENT_TOKEN: "{{.AgentToken}}" + BACKUPX_AGENT_TEMP_DIR: "/var/lib/backupx-agent/tmp" volumes: - /var/lib/backupx-agent:/var/lib/backupx-agent diff --git a/server/internal/installscript/templates/agent-install.sh.tmpl b/server/internal/installscript/templates/agent-install.sh.tmpl index 84292f0..fdee8dc 100644 --- a/server/internal/installscript/templates/agent-install.sh.tmpl +++ b/server/internal/installscript/templates/agent-install.sh.tmpl @@ -109,7 +109,20 @@ docker rm -f backupx-agent >/dev/null 2>&1 || true docker run -d --name backupx-agent --restart=unless-stopped \ -e "BACKUPX_AGENT_MASTER=${MASTER_URL}" \ -e "BACKUPX_AGENT_TOKEN=${AGENT_TOKEN}" \ + -e "BACKUPX_AGENT_TEMP_DIR=/var/lib/backupx-agent/tmp" \ -v /var/lib/backupx-agent:/var/lib/backupx-agent \ "awuqing/backupx:${AGENT_VERSION}" agent -echo "✓ 容器已启动" +echo "✓ 容器已启动,等待节点上线" +for i in $(seq 1 15); do + sleep 2 + if curl -fsSL -H "X-Agent-Token: ${AGENT_TOKEN}" "${MASTER_URL}/api/v1/agent/self" 2>/dev/null \ + | grep -q '"status":"online"'; then + echo "✓ 节点已上线" + exit 0 + fi +done +echo "⚠ 30s 内未收到上线心跳,请检查容器状态、网络与 Master URL。" +echo "排查命令:docker ps -a --filter name=backupx-agent" +echo "排查命令:docker logs --tail=100 backupx-agent" +exit 2 {{end}} diff --git a/server/internal/metrics/collector.go b/server/internal/metrics/collector.go index fa546e7..68a2dcd 100644 --- a/server/internal/metrics/collector.go +++ b/server/internal/metrics/collector.go @@ -13,16 +13,18 @@ type SampleSource interface { ListStorageTargets(ctx context.Context) ([]model.StorageTarget, error) StorageUsage(ctx context.Context) ([]repository.BackupStorageUsageItem, error) ListNodes(ctx context.Context) ([]model.Node, error) + AgentQueueSummaries(ctx context.Context) (map[uint]repository.AgentCommandQueueSummary, error) CountSLABreach(ctx context.Context) (int, error) } // repoSource 把 repository 适配到 SampleSource。 type repoSource struct { - targets repository.StorageTargetRepository - records repository.BackupRecordRepository - nodes repository.NodeRepository - tasks repository.BackupTaskRepository - now func() time.Time + targets repository.StorageTargetRepository + records repository.BackupRecordRepository + nodes repository.NodeRepository + tasks repository.BackupTaskRepository + commands repository.AgentCommandRepository + now func() time.Time } // NewRepoSource 用仓储实例构造 SampleSource。 @@ -31,13 +33,15 @@ func NewRepoSource( records repository.BackupRecordRepository, nodes repository.NodeRepository, tasks repository.BackupTaskRepository, + commands repository.AgentCommandRepository, ) SampleSource { return &repoSource{ - targets: targets, - records: records, - nodes: nodes, - tasks: tasks, - now: func() time.Time { return time.Now().UTC() }, + targets: targets, + records: records, + nodes: nodes, + tasks: tasks, + commands: commands, + now: func() time.Time { return time.Now().UTC() }, } } @@ -53,6 +57,13 @@ func (s *repoSource) ListNodes(ctx context.Context) ([]model.Node, error) { return s.nodes.List(ctx) } +func (s *repoSource) AgentQueueSummaries(ctx context.Context) (map[uint]repository.AgentCommandQueueSummary, error) { + if s.commands == nil { + return nil, nil + } + return s.commands.NodeQueueSummaries(ctx) +} + // CountSLABreach 统计当前违反 RPO 的任务: // - 任务启用且配置了 SLAHoursRPO > 0 // - 最近一次成功备份距今超出 SLA 时间窗,或从未成功过 @@ -136,7 +147,9 @@ func (c *Collector) collect(ctx context.Context) { } // 节点在线状态:role 约定为 master / agent if nodes, err := c.source.ListNodes(ctx); err == nil { + queueByNode, _ := c.source.AgentQueueSummaries(ctx) c.metrics.ResetNodeOnline() + c.metrics.ResetAgentQueue() for i := range nodes { n := &nodes[i] role := "agent" @@ -144,6 +157,8 @@ func (c *Collector) collect(ctx context.Context) { role = "master" } c.metrics.SetNodeOnline(n.Name, role, n.Status == model.NodeStatusOnline) + queue := queueByNode[n.ID] + c.metrics.SetAgentQueue(n.Name, role, queue.Depth, queue.Running, queue.Timeouts) } } if breach, err := c.source.CountSLABreach(ctx); err == nil { diff --git a/server/internal/metrics/registry.go b/server/internal/metrics/registry.go index 2fe4692..cb38d0e 100644 --- a/server/internal/metrics/registry.go +++ b/server/internal/metrics/registry.go @@ -31,6 +31,12 @@ type Metrics struct { StorageUsedBytes *prometheus.GaugeVec // 节点在线状态(labels: node_name, role;value: 0/1) NodeOnline *prometheus.GaugeVec + // Agent 命令队列深度(labels: node_name, role) + AgentCommandQueueDepth *prometheus.GaugeVec + // Agent 正在执行的长命令数(labels: node_name, role) + AgentCommandRunning *prometheus.GaugeVec + // Agent 命令超时累计数快照(labels: node_name, role) + AgentCommandTimeoutTotal *prometheus.GaugeVec // 验证演练结果(labels: status) VerifyRunTotal *prometheus.CounterVec // 恢复操作结果(labels: status) @@ -78,6 +84,18 @@ func New(version string) *Metrics { Name: "backupx_node_online", Help: "集群节点在线状态(1 在线 / 0 离线)", }, []string{"node_name", "role"}), + AgentCommandQueueDepth: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "backupx_agent_command_queue_depth", + Help: "Agent 当前 pending/dispatched 命令总数", + }, []string{"node_name", "role"}), + AgentCommandRunning: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "backupx_agent_command_running", + Help: "Agent 当前正在执行的长命令数", + }, []string{"node_name", "role"}), + AgentCommandTimeoutTotal: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "backupx_agent_command_timeout_total", + Help: "Agent 已超时命令数快照", + }, []string{"node_name", "role"}), VerifyRunTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "backupx_verify_run_total", Help: "备份验证演练执行总数", @@ -106,6 +124,9 @@ func New(version string) *Metrics { m.TaskRunningGauge, m.StorageUsedBytes, m.NodeOnline, + m.AgentCommandQueueDepth, + m.AgentCommandRunning, + m.AgentCommandTimeoutTotal, m.VerifyRunTotal, m.RestoreRunTotal, m.ReplicationRunTotal, @@ -208,6 +229,24 @@ func (m *Metrics) ResetNodeOnline() { m.NodeOnline.Reset() } +func (m *Metrics) SetAgentQueue(name, role string, depth, running, timeoutCount int) { + if m == nil { + return + } + m.AgentCommandQueueDepth.WithLabelValues(name, role).Set(float64(depth)) + m.AgentCommandRunning.WithLabelValues(name, role).Set(float64(running)) + m.AgentCommandTimeoutTotal.WithLabelValues(name, role).Set(float64(timeoutCount)) +} + +func (m *Metrics) ResetAgentQueue() { + if m == nil { + return + } + m.AgentCommandQueueDepth.Reset() + m.AgentCommandRunning.Reset() + m.AgentCommandTimeoutTotal.Reset() +} + // ResetStorageUsed 清空存储目标 gauge。 func (m *Metrics) ResetStorageUsed() { if m == nil { diff --git a/server/internal/metrics/registry_test.go b/server/internal/metrics/registry_test.go index fed64e1..048c69f 100644 --- a/server/internal/metrics/registry_test.go +++ b/server/internal/metrics/registry_test.go @@ -41,9 +41,11 @@ func TestObserveTaskRun_NilReceiverIsSafe(t *testing.T) { m.DecTaskRunning() m.SetStorageUsed("a", "s3", 1) m.SetNodeOnline("n1", "master", true) + m.SetAgentQueue("n1", "agent", 2, 1, 3) m.SetSLABreach(3) m.ResetNodeOnline() m.ResetStorageUsed() + m.ResetAgentQueue() // no panic -> pass } @@ -51,6 +53,7 @@ func TestHandler_ExposesBackupxMetrics(t *testing.T) { m := New("0.0.0-test") m.ObserveTaskRun("file", "success", 1.0, 2048) m.SetNodeOnline("n1", "master", true) + m.SetAgentQueue("edge-a", "agent", 3, 1, 2) m.SetSLABreach(1) recorder := httptest.NewRecorder() @@ -66,6 +69,9 @@ func TestHandler_ExposesBackupxMetrics(t *testing.T) { "backupx_task_run_total", "backupx_task_run_duration_seconds", "backupx_node_online", + "backupx_agent_command_queue_depth", + "backupx_agent_command_running", + "backupx_agent_command_timeout_total", "backupx_sla_breach_tasks", "backupx_app_info", } { diff --git a/server/internal/repository/agent_command_repository.go b/server/internal/repository/agent_command_repository.go index c5fbb96..6c03266 100644 --- a/server/internal/repository/agent_command_repository.go +++ b/server/internal/repository/agent_command_repository.go @@ -35,6 +35,28 @@ type AgentCommandRepository interface { // ListPendingByNode 列出某节点下的所有 pending/dispatched 命令。 // 用于删除节点或节点离线时的清理。 ListPendingByNode(ctx context.Context, nodeID uint) ([]model.AgentCommand, error) + NodeQueueSummaries(ctx context.Context) (map[uint]AgentCommandQueueSummary, error) +} + +type AgentCommandQueueSummary struct { + NodeID uint `json:"nodeId"` + Pending int `json:"pending"` + Dispatched int `json:"dispatched"` + Running int `json:"running"` + Depth int `json:"depth"` + Timeouts int `json:"timeouts"` + LastError string `json:"lastError,omitempty"` + OldestActiveAt *time.Time `json:"oldestActiveAt,omitempty"` +} + +type agentCommandTimeoutCount struct { + NodeID uint + Count int +} + +type agentCommandLastError struct { + NodeID uint + ErrorMessage string } type GormAgentCommandRepository struct { @@ -186,3 +208,114 @@ func (r *GormAgentCommandRepository) ListPendingByNode(ctx context.Context, node } return items, nil } + +func (r *GormAgentCommandRepository) NodeQueueSummaries(ctx context.Context) (map[uint]AgentCommandQueueSummary, error) { + summaries, err := r.activeQueueSummaries(ctx) + if err != nil { + return nil, err + } + if err := r.applyTerminalQueueStats(ctx, summaries); err != nil { + return nil, err + } + return summaries, nil +} + +func (r *GormAgentCommandRepository) activeQueueSummaries(ctx context.Context) (map[uint]AgentCommandQueueSummary, error) { + var items []model.AgentCommand + if err := r.db.WithContext(ctx). + Where("status IN ?", []string{ + model.AgentCommandStatusPending, + model.AgentCommandStatusDispatched, + }). + Order("node_id asc, id asc"). + Find(&items).Error; err != nil { + return nil, err + } + summaries := make(map[uint]AgentCommandQueueSummary) + for i := range items { + cmd := &items[i] + summary := summaries[cmd.NodeID] + summary.NodeID = cmd.NodeID + switch cmd.Status { + case model.AgentCommandStatusPending: + summary.Pending++ + summary.Depth++ + summary.OldestActiveAt = oldestTime(summary.OldestActiveAt, &cmd.CreatedAt) + case model.AgentCommandStatusDispatched: + summary.Dispatched++ + summary.Depth++ + if isLongRunningAgentCommand(cmd.Type) { + summary.Running++ + } + summary.OldestActiveAt = oldestTime(summary.OldestActiveAt, cmd.DispatchedAt) + } + summaries[cmd.NodeID] = summary + } + return summaries, nil +} + +func (r *GormAgentCommandRepository) applyTerminalQueueStats(ctx context.Context, summaries map[uint]AgentCommandQueueSummary) error { + var timeoutCounts []agentCommandTimeoutCount + if err := r.db.WithContext(ctx). + Model(&model.AgentCommand{}). + Select("node_id, COUNT(*) AS count"). + Where("status = ?", model.AgentCommandStatusTimeout). + Group("node_id"). + Scan(&timeoutCounts).Error; err != nil { + return err + } + for _, item := range timeoutCounts { + summary := summaries[item.NodeID] + summary.NodeID = item.NodeID + summary.Timeouts = item.Count + summaries[item.NodeID] = summary + } + + terminalStatuses := []string{ + model.AgentCommandStatusFailed, + model.AgentCommandStatusTimeout, + } + latestByNode := r.db.WithContext(ctx). + Model(&model.AgentCommand{}). + Select("node_id, MAX(COALESCE(completed_at, updated_at, created_at)) AS last_error_at"). + Where("status IN ? AND error_message <> ''", terminalStatuses). + Group("node_id") + + var lastErrors []agentCommandLastError + if err := r.db.WithContext(ctx). + Table("agent_commands AS cmd"). + Select("cmd.node_id, cmd.error_message"). + Joins("JOIN (?) latest ON latest.node_id = cmd.node_id AND latest.last_error_at = COALESCE(cmd.completed_at, cmd.updated_at, cmd.created_at)", latestByNode). + Where("cmd.status IN ? AND cmd.error_message <> ''", terminalStatuses). + Order("cmd.node_id asc, cmd.id desc"). + Scan(&lastErrors).Error; err != nil { + return err + } + seenLastError := make(map[uint]struct{}, len(lastErrors)) + for _, item := range lastErrors { + if _, ok := seenLastError[item.NodeID]; ok { + continue + } + summary := summaries[item.NodeID] + summary.NodeID = item.NodeID + summary.LastError = item.ErrorMessage + summaries[item.NodeID] = summary + seenLastError[item.NodeID] = struct{}{} + } + return nil +} + +func oldestTime(current *time.Time, candidate *time.Time) *time.Time { + if candidate == nil { + return current + } + if current == nil || candidate.Before(*current) { + value := *candidate + return &value + } + return current +} + +func isLongRunningAgentCommand(commandType string) bool { + return commandType == model.AgentCommandTypeRunTask || commandType == model.AgentCommandTypeRestoreRecord +} diff --git a/server/internal/repository/agent_command_repository_test.go b/server/internal/repository/agent_command_repository_test.go index 6f68913..b9689e9 100644 --- a/server/internal/repository/agent_command_repository_test.go +++ b/server/internal/repository/agent_command_repository_test.go @@ -218,3 +218,44 @@ func TestAgentCommandRepository_ListStaleActiveIncludesPendingAndDispatched(t *t t.Fatalf("unexpected stale active order/items: %#v", items) } } + +func TestAgentCommandRepository_NodeQueueSummaries(t *testing.T) { + db := newTestDB(t) + repo := NewAgentCommandRepository(db) + ctx := context.Background() + old := time.Now().UTC().Add(-20 * time.Minute) + recent := time.Now().UTC().Add(-2 * time.Minute) + dispatchedAt := time.Now().UTC().Add(-5 * time.Minute) + completedAt := time.Now().UTC().Add(-1 * time.Minute) + commands := []*model.AgentCommand{ + {NodeID: 1, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusPending, CreatedAt: old}, + {NodeID: 1, Type: model.AgentCommandTypeRestoreRecord, Status: model.AgentCommandStatusPending, CreatedAt: recent}, + {NodeID: 1, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusDispatched, DispatchedAt: &dispatchedAt}, + {NodeID: 1, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusFailed, ErrorMessage: "boom", CompletedAt: &completedAt}, + {NodeID: 1, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusTimeout, ErrorMessage: "late", CompletedAt: &recent}, + {NodeID: 2, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusPending, CreatedAt: old}, + } + for _, cmd := range commands { + if err := repo.Create(ctx, cmd); err != nil { + t.Fatalf("Create returned error: %v", err) + } + } + + summaries, err := repo.NodeQueueSummaries(ctx) + if err != nil { + t.Fatalf("NodeQueueSummaries returned error: %v", err) + } + nodeOne := summaries[1] + if nodeOne.Pending != 2 || nodeOne.Dispatched != 1 || nodeOne.Running != 1 || nodeOne.Depth != 3 { + t.Fatalf("unexpected node 1 summary: %#v", nodeOne) + } + if nodeOne.Timeouts != 1 || nodeOne.LastError != "boom" { + t.Fatalf("expected terminal timeout and latest error in summary, got %#v", nodeOne) + } + if nodeOne.OldestActiveAt == nil || !nodeOne.OldestActiveAt.Equal(old) { + t.Fatalf("expected oldest active at %s, got %#v", old, nodeOne.OldestActiveAt) + } + if nodeTwo := summaries[2]; nodeTwo.Pending != 1 || nodeTwo.Depth != 1 || nodeTwo.Timeouts != 0 || nodeTwo.LastError != "" { + t.Fatalf("unexpected node 2 summary: %#v", nodeTwo) + } +} diff --git a/server/internal/service/agent_service.go b/server/internal/service/agent_service.go index b3f551f..7723325 100644 --- a/server/internal/service/agent_service.go +++ b/server/internal/service/agent_service.go @@ -230,13 +230,15 @@ func (s *AgentService) ensureTaskSpecAccess(ctx context.Context, node *model.Nod // AgentRecordUpdate Agent 上报备份记录的最终状态。 type AgentRecordUpdate struct { - Status string `json:"status"` // running | success | failed - FileName string `json:"fileName,omitempty"` - FileSize int64 `json:"fileSize,omitempty"` - Checksum string `json:"checksum,omitempty"` - StoragePath string `json:"storagePath,omitempty"` - ErrorMessage string `json:"errorMessage,omitempty"` - LogAppend string `json:"logAppend,omitempty"` // 增量日志,追加到 record.log_content + Status string `json:"status"` // running | success | failed + FileName string `json:"fileName,omitempty"` + FileSize int64 `json:"fileSize,omitempty"` + Checksum string `json:"checksum,omitempty"` + StoragePath string `json:"storagePath,omitempty"` + StorageTargetID uint `json:"storageTargetId,omitempty"` + StorageUploadResults []StorageUploadResultItem `json:"storageUploadResults,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + LogAppend string `json:"logAppend,omitempty"` // 增量日志,追加到 record.log_content } // UpdateRecord 更新备份记录的状态/日志。Agent 在执行过程中可多次调用。 @@ -273,6 +275,14 @@ func (s *AgentService) UpdateRecord(ctx context.Context, node *model.Node, recor if update.StoragePath != "" { record.StoragePath = update.StoragePath } + if update.StorageTargetID > 0 { + record.StorageTargetID = update.StorageTargetID + } + if len(update.StorageUploadResults) > 0 { + if resultsJSON, marshalErr := json.Marshal(update.StorageUploadResults); marshalErr == nil { + record.StorageUploadResults = string(resultsJSON) + } + } if update.ErrorMessage != "" { record.ErrorMessage = update.ErrorMessage } @@ -294,7 +304,10 @@ func (s *AgentService) UpdateRecord(ctx context.Context, node *model.Node, recor // 同步更新任务的 last_status if update.Status == model.BackupRecordStatusSuccess || update.Status == model.BackupRecordStatusFailed { task.LastStatus = update.Status - _ = s.taskRepo.Update(ctx, task) + task.LastRunAt = &record.StartedAt + if err := s.taskRepo.Update(ctx, task); err != nil { + return fmt.Errorf("update backup task summary: %w", err) + } } return nil } diff --git a/server/internal/service/agent_service_test.go b/server/internal/service/agent_service_test.go index 7f6af2b..ccda6f4 100644 --- a/server/internal/service/agent_service_test.go +++ b/server/internal/service/agent_service_test.go @@ -2,7 +2,9 @@ package service import ( "context" + "errors" "path/filepath" + "strings" "testing" "time" @@ -93,10 +95,15 @@ func TestAgentServicePooledTaskUsesRecordNodeForSpecAndRecordUpdates(t *testing. } if err := svc.UpdateRecord(ctx, owner, 1, AgentRecordUpdate{ - Status: model.BackupRecordStatusSuccess, - FileName: "backup.tar.gz", - FileSize: 123, - StoragePath: "tasks/1/backup.tar.gz", + Status: model.BackupRecordStatusSuccess, + FileName: "backup.tar.gz", + FileSize: 123, + StoragePath: "tasks/1/backup.tar.gz", + StorageTargetID: 2, + StorageUploadResults: []StorageUploadResultItem{ + {StorageTargetID: 1, StorageTargetName: "first", Status: "failed", Error: "boom"}, + {StorageTargetID: 2, StorageTargetName: "second", Status: "success", StoragePath: "tasks/1/backup.tar.gz", FileSize: 123}, + }, }); err != nil { t.Fatalf("owner UpdateRecord returned error: %v", err) } @@ -107,11 +114,60 @@ func TestAgentServicePooledTaskUsesRecordNodeForSpecAndRecordUpdates(t *testing. if updated.Status != model.BackupRecordStatusSuccess || updated.NodeID != owner.ID { t.Fatalf("unexpected updated record: %#v", updated) } + if updated.StorageTargetID != 2 { + t.Fatalf("expected successful storage target id 2, got %d", updated.StorageTargetID) + } + if !strings.Contains(updated.StorageUploadResults, `"storageTargetName":"second"`) { + t.Fatalf("expected upload results to be persisted, got %q", updated.StorageUploadResults) + } if err := svc.UpdateRecord(ctx, other, 1, AgentRecordUpdate{LogAppend: "bad"}); err == nil { t.Fatal("expected non-owner node to be forbidden from record update") } } +func TestAgentServiceUpdateRecordRefreshesTaskSummaryOnTerminalStatus(t *testing.T) { + for _, status := range []string{model.BackupRecordStatusSuccess, model.BackupRecordStatusFailed} { + t.Run(status, func(t *testing.T) { + svc, _, records, _, owner, _ := newAgentServicePoolTestHarness(t) + ctx := context.Background() + record, err := records.FindByID(ctx, 1) + if err != nil { + t.Fatalf("FindByID record returned error: %v", err) + } + + if err := svc.UpdateRecord(ctx, owner, record.ID, AgentRecordUpdate{Status: status}); err != nil { + t.Fatalf("UpdateRecord returned error: %v", err) + } + + task, err := svc.taskRepo.FindByID(ctx, record.TaskID) + if err != nil { + t.Fatalf("FindByID task returned error: %v", err) + } + if task.LastStatus != status { + t.Fatalf("expected task LastStatus %q, got %q", status, task.LastStatus) + } + if task.LastRunAt == nil || !task.LastRunAt.Equal(record.StartedAt) { + t.Fatalf("expected task LastRunAt to match record startedAt %s, got %#v", record.StartedAt, task.LastRunAt) + } + }) + } +} + +func TestAgentServiceUpdateRecordReturnsTaskSummaryUpdateError(t *testing.T) { + svc, _, _, _, owner, _ := newAgentServicePoolTestHarness(t) + ctx := context.Background() + expectedErr := errors.New("task update failed") + svc.taskRepo = &failingUpdateTaskRepo{ + BackupTaskRepository: svc.taskRepo, + err: expectedErr, + } + + err := svc.UpdateRecord(ctx, owner, 1, AgentRecordUpdate{Status: model.BackupRecordStatusSuccess}) + if !errors.Is(err, expectedErr) { + t.Fatalf("expected task update error %v, got %v", expectedErr, err) + } +} + func TestAgentServiceProcessStaleCommandsFailsPendingRunTaskRecord(t *testing.T) { svc, _, records, commands, owner, _ := newAgentServicePoolTestHarness(t) ctx := context.Background() @@ -587,3 +643,12 @@ func setBackupRecordUpdatedAt(db *gorm.DB, id uint, updatedAt time.Time) error { func setRestoreRecordUpdatedAt(db *gorm.DB, id uint, updatedAt time.Time) error { return db.Model(&model.RestoreRecord{}).Where("id = ?", id).UpdateColumn("updated_at", updatedAt).Error } + +type failingUpdateTaskRepo struct { + repository.BackupTaskRepository + err error +} + +func (r *failingUpdateTaskRepo) Update(context.Context, *model.BackupTask) error { + return r.err +} diff --git a/server/internal/service/backup_execution_service.go b/server/internal/service/backup_execution_service.go index 52cbcc8..daf7d71 100644 --- a/server/internal/service/backup_execution_service.go +++ b/server/internal/service/backup_execution_service.go @@ -52,6 +52,11 @@ type StorageUploadResultItem struct { Error string `json:"error,omitempty"` } +const ( + uploadMaxAttempts = 3 + uploadRetryBackoff = 10 * time.Second +) + type DownloadedArtifact struct { FileName string Reader io.ReadCloser @@ -96,6 +101,7 @@ type BackupExecutionService struct { retries int // rclone 底层重试次数 bandwidthLimit string // rclone 带宽限制(全局默认,节点配置可覆盖) metrics *metrics.Metrics + taskLocks sync.Map } // SetMetrics 注入 Prometheus 采集器。nil 时所有埋点退化为 no-op。 @@ -358,6 +364,11 @@ func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async b if task == nil { return nil, apperror.New(404, "BACKUP_TASK_NOT_FOUND", "备份任务不存在", fmt.Errorf("backup task %d not found", id)) } + unlock := s.acquireTaskStartLock(task.ID) + defer unlock() + if err := s.ensureTaskNotRunning(ctx, task); err != nil { + return nil, err + } // 维护窗口校验:手动执行同样尊重窗口,避免业务高峰期误触发。 if strings.TrimSpace(task.MaintenanceWindows) != "" { windows := backup.ParseMaintenanceWindows(task.MaintenanceWindows) @@ -427,6 +438,27 @@ func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async b return s.getRecordDetail(ctx, record.ID) } +func (s *BackupExecutionService) acquireTaskStartLock(taskID uint) func() { + value, _ := s.taskLocks.LoadOrStore(taskID, &sync.Mutex{}) + mu := value.(*sync.Mutex) + mu.Lock() + return mu.Unlock +} + +func (s *BackupExecutionService) ensureTaskNotRunning(ctx context.Context, task *model.BackupTask) error { + taskID := task.ID + items, err := s.records.List(ctx, repository.BackupRecordListOptions{TaskID: &taskID, Status: model.BackupRecordStatusRunning}) + if err != nil { + return apperror.Internal("BACKUP_RECORD_LIST_FAILED", "无法检查任务运行状态", err) + } + if len(items) == 0 { + return nil + } + return apperror.BadRequest("BACKUP_TASK_ALREADY_RUNNING", + fmt.Sprintf("任务「%s」正在运行(记录 #%d),请等待完成后再触发。", task.Name, items[0].ID), + nil) +} + // shouldNotify 按任务的告警策略决定是否发送本次通知。 // 成功结果:始终发送(方便用户确认备份状态)。 // 失败结果:仅当"最近 N 条记录(含本次)均为 failed"时发送,N = AlertOnConsecutiveFails。 @@ -678,6 +710,11 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba logger.Errorf("没有关联的存储目标") return } + storageUsage, err := s.storageUsageSnapshot(ctx) + if err != nil { + logger.Warnf("读取存储目标用量失败,跳过本次软配额校验:%v", err) + storageUsage = map[uint]int64{} + } // 并行上传到所有目标 uploadResults = make([]StorageUploadResultItem, len(targetIDs)) @@ -701,15 +738,7 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba } // 软限额校验:QuotaBytes > 0 时,已累计 + 本次 > 配额 → 拒绝上传 if target != nil && target.QuotaBytes > 0 { - currentUsed := int64(0) - if items, err := s.records.StorageUsage(ctx); err == nil { - for _, it := range items { - if it.StorageTargetID == targetID { - currentUsed = it.TotalSize - break - } - } - } + currentUsed := storageUsage[targetID] if currentUsed+fileSize > target.QuotaBytes { quotaMsg := fmt.Sprintf("超出存储目标 %s 的配额(%d + %d > %d)", targetName, currentUsed, fileSize, target.QuotaBytes) uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: quotaMsg} @@ -718,15 +747,18 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba } } logger.Infof("开始上传备份到存储目标:%s", targetName) - // 上传级重试:最多 3 次,指数退避(10s, 30s, 90s) - maxAttempts := 3 + // 上传级重试:最多 3 次,等待时间随 context 取消及时退出。 var lastUploadErr error var hr *hashingReader - for attempt := 1; attempt <= maxAttempts; attempt++ { + for attempt := 1; attempt <= uploadMaxAttempts; attempt++ { if attempt > 1 { - backoff := time.Duration(attempt*attempt) * 10 * time.Second + backoff := time.Duration(attempt-1) * uploadRetryBackoff logger.Warnf("存储目标 %s 第 %d 次重试(等待 %v):%v", targetName, attempt, backoff, lastUploadErr) - time.Sleep(backoff) + if waitErr := waitForUploadRetry(ctx, backoff); waitErr != nil { + uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: waitErr.Error()} + logger.Warnf("存储目标 %s 上传重试已取消:%v", targetName, waitErr) + return + } } artifact, openErr := os.Open(finalPath) if openErr != nil { @@ -756,7 +788,7 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba } if lastUploadErr != nil { uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: lastUploadErr.Error()} - logger.Warnf("存储目标 %s 上传失败(已重试 %d 次):%v", targetName, maxAttempts, lastUploadErr) + logger.Warnf("存储目标 %s 上传失败(已重试 %d 次):%v", targetName, uploadMaxAttempts, lastUploadErr) return } // 完整性校验:对比实际传输字节数 @@ -881,6 +913,32 @@ func (s *BackupExecutionService) finalizeRecord(ctx context.Context, task *model return s.tasks.Update(ctx, task) } +func (s *BackupExecutionService) storageUsageSnapshot(ctx context.Context) (map[uint]int64, error) { + items, err := s.records.StorageUsage(ctx) + if err != nil { + return nil, fmt.Errorf("storage usage snapshot: %w", err) + } + usage := make(map[uint]int64, len(items)) + for _, item := range items { + usage[item.StorageTargetID] = item.TotalSize + } + return usage, nil +} + +func waitForUploadRetry(ctx context.Context, delay time.Duration) error { + if delay <= 0 { + return nil + } + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + func (s *BackupExecutionService) resolveProvider(ctx context.Context, targetID uint) (storage.StorageProvider, error) { return s.resolveProviderForNode(ctx, targetID, 0) } diff --git a/server/internal/service/backup_execution_service_test.go b/server/internal/service/backup_execution_service_test.go index ca4eb8b..5f9ec15 100644 --- a/server/internal/service/backup_execution_service_test.go +++ b/server/internal/service/backup_execution_service_test.go @@ -2,11 +2,13 @@ package service import ( "context" + "errors" "fmt" "io" "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -40,9 +42,11 @@ func (f *testStorageFactory) New(_ context.Context, config map[string]any) (stor } type testStorageProvider struct { - name string - failUpload bool - objects map[string][]byte + name string + failUpload bool + blockUpload <-chan struct{} + onUpload func() + objects map[string][]byte } func (p *testStorageProvider) Type() storage.ProviderType { return "test_storage" } @@ -50,6 +54,12 @@ func (p *testStorageProvider) TestConnection(context.Context) error { return nil } func (p *testStorageProvider) Upload(_ context.Context, objectKey string, reader io.Reader, _ int64, _ map[string]string) error { + if p.blockUpload != nil { + <-p.blockUpload + } + if p.onUpload != nil { + p.onUpload() + } if p.failUpload { return fmt.Errorf("upload failed for %s", p.name) } @@ -193,6 +203,39 @@ func TestBackupExecutionServiceNodePoolSelectionDoesNotPersistTaskNodeID(t *test } } +func TestBackupExecutionServiceRejectsDuplicateRunningTask(t *testing.T) { + executionService, _, tasks, _, records, _, _ := newExecutionTestServices(t) + ctx := context.Background() + + task, err := tasks.FindByID(ctx, 1) + if err != nil { + t.Fatalf("FindByID task returned error: %v", err) + } + startedAt := time.Now().UTC() + running := &model.BackupRecord{ + TaskID: task.ID, + StorageTargetID: task.StorageTargetID, + NodeID: 0, + Status: model.BackupRecordStatusRunning, + StartedAt: startedAt, + } + if err := records.Create(ctx, running); err != nil { + t.Fatalf("Create running record returned error: %v", err) + } + + _, err = executionService.RunTaskByIDSync(ctx, task.ID) + if err == nil || !strings.Contains(err.Error(), "正在运行") { + t.Fatalf("expected duplicate running task to be rejected, got %v", err) + } + items, err := records.List(ctx, repository.BackupRecordListOptions{Status: model.BackupRecordStatusRunning}) + if err != nil { + t.Fatalf("List running records returned error: %v", err) + } + if len(items) != 1 || items[0].ID != running.ID { + t.Fatalf("expected only the original running record, got %#v", items) + } +} + func TestBackupExecutionServiceDeleteRecordDispatchesRemoteLocalDiskCleanup(t *testing.T) { executionService, _, tasks, _, records, _, _ := newExecutionTestServices(t) ctx := context.Background() @@ -334,6 +377,155 @@ func TestBackupExecutionServiceRecordsFirstSuccessfulStorageTarget(t *testing.T) } } +func TestBackupExecutionServiceUploadRetryStopsWhenContextCancelled(t *testing.T) { + executionService, _, tasks, targets, records, _, _ := newExecutionTestServices(t) + ctx, cancel := context.WithCancel(context.Background()) + var cancelOnce sync.Once + failing := &testStorageProvider{ + name: "failing", + failUpload: true, + onUpload: func() { + cancelOnce.Do(cancel) + }, + } + executionService.storageRegistry = storage.NewRegistry(&testStorageFactory{providers: map[string]*testStorageProvider{ + "failing": failing, + }}) + cipher := codec.NewConfigCipher("execution-secret") + failingConfig, err := cipher.EncryptJSON(map[string]any{"name": "failing"}) + if err != nil { + t.Fatalf("EncryptJSON returned error: %v", err) + } + if err := targets.Update(ctx, &model.StorageTarget{ + ID: 1, + Name: "local", + Type: "test_storage", + Enabled: true, + ConfigCiphertext: failingConfig, + ConfigVersion: 1, + LastTestStatus: "unknown", + }); err != nil { + t.Fatalf("Update target returned error: %v", err) + } + task, err := tasks.FindByID(ctx, 1) + if err != nil { + t.Fatalf("FindByID task returned error: %v", err) + } + startedAt := time.Now().UTC() + record := &model.BackupRecord{ + TaskID: task.ID, + StorageTargetID: task.StorageTargetID, + Status: model.BackupRecordStatusRunning, + StartedAt: startedAt, + } + if err := records.Create(ctx, record); err != nil { + t.Fatalf("Create record returned error: %v", err) + } + + done := make(chan struct{}) + go func() { + executionService.executeTask(ctx, task, record.ID, startedAt) + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("expected cancelled upload retry to stop without waiting for backoff sleep") + } +} + +func TestBackupExecutionServiceReadsStorageUsageOnceForMultiTargetQuotaChecks(t *testing.T) { + executionService, _, tasks, targets, records, _, _ := newExecutionTestServices(t) + ctx := context.Background() + first := &testStorageProvider{name: "first", objects: map[string][]byte{}} + second := &testStorageProvider{name: "second", objects: map[string][]byte{}} + executionService.storageRegistry = storage.NewRegistry(&testStorageFactory{providers: map[string]*testStorageProvider{ + "first": first, + "second": second, + }}) + cipher := codec.NewConfigCipher("execution-secret") + firstConfig, err := cipher.EncryptJSON(map[string]any{"name": "first"}) + if err != nil { + t.Fatalf("EncryptJSON first returned error: %v", err) + } + secondConfig, err := cipher.EncryptJSON(map[string]any{"name": "second"}) + if err != nil { + t.Fatalf("EncryptJSON second returned error: %v", err) + } + if err := targets.Update(ctx, &model.StorageTarget{ID: 1, Name: "local", Type: "test_storage", Enabled: true, ConfigCiphertext: firstConfig, ConfigVersion: 1, LastTestStatus: "unknown", QuotaBytes: 1 << 30}); err != nil { + t.Fatalf("Update first target returned error: %v", err) + } + if err := targets.Create(ctx, &model.StorageTarget{Name: "second", Type: "test_storage", Enabled: true, ConfigCiphertext: secondConfig, ConfigVersion: 1, LastTestStatus: "unknown", QuotaBytes: 1 << 30}); err != nil { + t.Fatalf("Create second target returned error: %v", err) + } + task, err := tasks.FindByID(ctx, 1) + if err != nil { + t.Fatalf("FindByID task returned error: %v", err) + } + task.StorageTargets = []model.StorageTarget{{ID: 1}, {ID: 2}} + if err := tasks.Update(ctx, task); err != nil { + t.Fatalf("Update task returned error: %v", err) + } + executionService.records = &storageUsageCountingRecordRepo{BackupRecordRepository: records} + + detail, err := executionService.RunTaskByIDSync(ctx, task.ID) + if err != nil { + t.Fatalf("RunTaskByIDSync returned error: %v", err) + } + if detail.Status != model.BackupRecordStatusSuccess { + t.Fatalf("expected success, got %#v", detail) + } + countingRepo := executionService.records.(*storageUsageCountingRecordRepo) + if countingRepo.usageCalls != 1 { + t.Fatalf("expected StorageUsage to be called once for quota snapshot, got %d", countingRepo.usageCalls) + } + if len(first.objects) != 1 || len(second.objects) != 1 { + t.Fatalf("expected both targets to receive upload, got first=%d second=%d", len(first.objects), len(second.objects)) + } +} + +func TestBackupExecutionServiceContinuesWhenStorageUsageSnapshotFails(t *testing.T) { + executionService, _, _, targets, records, _, _ := newExecutionTestServices(t) + ctx := context.Background() + provider := &testStorageProvider{name: "primary", objects: map[string][]byte{}} + executionService.storageRegistry = storage.NewRegistry(&testStorageFactory{providers: map[string]*testStorageProvider{ + "primary": provider, + }}) + cipher := codec.NewConfigCipher("execution-secret") + configCiphertext, err := cipher.EncryptJSON(map[string]any{"name": "primary"}) + if err != nil { + t.Fatalf("EncryptJSON returned error: %v", err) + } + if err := targets.Update(ctx, &model.StorageTarget{ + ID: 1, + Name: "local", + Type: "test_storage", + Enabled: true, + ConfigCiphertext: configCiphertext, + ConfigVersion: 1, + LastTestStatus: "unknown", + QuotaBytes: 1 << 30, + }); err != nil { + t.Fatalf("Update target returned error: %v", err) + } + executionService.records = &storageUsageFailingRecordRepo{ + BackupRecordRepository: records, + err: errStorageUsageFailed, + } + + detail, err := executionService.RunTaskByIDSync(ctx, 1) + if err != nil { + t.Fatalf("RunTaskByIDSync returned error: %v", err) + } + if detail.Status != model.BackupRecordStatusSuccess { + t.Fatalf("expected success despite soft quota usage snapshot error, got %#v", detail) + } + if len(provider.objects) != 1 { + t.Fatalf("expected upload to proceed, got %d uploaded objects", len(provider.objects)) + } +} + func TestBackupRecordServiceRestore(t *testing.T) { executionService, recordService, _, _, _, sourceDir, _ := newExecutionTestServices(t) detail, err := executionService.RunTaskByIDSync(context.Background(), 1) @@ -354,3 +546,27 @@ func TestBackupRecordServiceRestore(t *testing.T) { t.Fatalf("unexpected restored content: %s", string(content)) } } + +type storageUsageCountingRecordRepo struct { + repository.BackupRecordRepository + mu sync.Mutex + usageCalls int +} + +func (r *storageUsageCountingRecordRepo) StorageUsage(ctx context.Context) ([]repository.BackupStorageUsageItem, error) { + r.mu.Lock() + r.usageCalls++ + r.mu.Unlock() + return r.BackupRecordRepository.StorageUsage(ctx) +} + +type storageUsageFailingRecordRepo struct { + repository.BackupRecordRepository + err error +} + +func (r *storageUsageFailingRecordRepo) StorageUsage(context.Context) ([]repository.BackupStorageUsageItem, error) { + return nil, r.err +} + +var errStorageUsageFailed = errors.New("storage usage failed") diff --git a/server/internal/service/backup_task_service.go b/server/internal/service/backup_task_service.go index b8bc607..1f40f2d 100644 --- a/server/internal/service/backup_task_service.go +++ b/server/internal/service/backup_task_service.go @@ -33,16 +33,16 @@ type BackupTaskUpsertInput struct { DBPassword string `json:"dbPassword" binding:"max=255"` DBName string `json:"dbName" binding:"max=255"` DBPath string `json:"dbPath" binding:"max=500"` - StorageTargetID uint `json:"storageTargetId"` // deprecated: 向后兼容 - StorageTargetIDs []uint `json:"storageTargetIds"` // 新增:多存储目标 - NodeID uint `json:"nodeId"` // 执行节点(0 = 本机 Master 或节点池) + StorageTargetID uint `json:"storageTargetId"` // deprecated: 向后兼容 + StorageTargetIDs []uint `json:"storageTargetIds"` // 新增:多存储目标 + NodeID uint `json:"nodeId"` // 执行节点(0 = 本机 Master 或节点池) // NodePoolTag 节点池标签。NodeID=0 且本字段非空时,调度器动态从 Labels 命中的在线节点中选负载最低者。 - NodePoolTag string `json:"nodePoolTag" binding:"max=64"` - Tags string `json:"tags" binding:"max=500"` // 逗号分隔标签 - RetentionDays int `json:"retentionDays"` - Compression string `json:"compression" binding:"omitempty,oneof=gzip none"` - Encrypt bool `json:"encrypt"` - MaxBackups int `json:"maxBackups"` + NodePoolTag string `json:"nodePoolTag" binding:"max=64"` + Tags string `json:"tags" binding:"max=500"` // 逗号分隔标签 + RetentionDays int `json:"retentionDays"` + Compression string `json:"compression" binding:"omitempty,oneof=gzip none"` + Encrypt bool `json:"encrypt"` + MaxBackups int `json:"maxBackups"` // ExtraConfig 类型特有扩展配置(如 SAP HANA 的 backupLevel/backupChannels) ExtraConfig map[string]any `json:"extraConfig"` // 验证(恢复演练)配置 @@ -70,8 +70,8 @@ type BackupTaskSummary struct { Type string `json:"type"` Enabled bool `json:"enabled"` CronExpr string `json:"cronExpr"` - StorageTargetID uint `json:"storageTargetId"` // deprecated: 取第一个 - StorageTargetName string `json:"storageTargetName"` // deprecated: 取第一个 + StorageTargetID uint `json:"storageTargetId"` // deprecated: 取第一个 + StorageTargetName string `json:"storageTargetName"` // deprecated: 取第一个 StorageTargetIDs []uint `json:"storageTargetIds"` StorageTargetNames []string `json:"storageTargetNames"` NodeID uint `json:"nodeId"` @@ -91,10 +91,10 @@ type BackupTaskSummary struct { SLAHoursRPO int `json:"slaHoursRpo"` AlertOnConsecutiveFails int `json:"alertOnConsecutiveFails"` // 备份复制目标(3-2-1) - ReplicationTargetIDs []uint `json:"replicationTargetIds"` - MaintenanceWindows string `json:"maintenanceWindows"` - DependsOnTaskIDs []uint `json:"dependsOnTaskIds"` - UpdatedAt time.Time `json:"updatedAt"` + ReplicationTargetIDs []uint `json:"replicationTargetIds"` + MaintenanceWindows string `json:"maintenanceWindows"` + DependsOnTaskIDs []uint `json:"dependsOnTaskIds"` + UpdatedAt time.Time `json:"updatedAt"` } type BackupTaskDetail struct { @@ -488,6 +488,7 @@ func (s *BackupTaskService) validateInput(ctx context.Context, existing *model.B return apperror.BadRequest("BACKUP_STORAGE_TARGET_INVALID", fmt.Sprintf("关联的存储目标 %d 不存在", tid), nil) } } + var fixedNode *model.Node if input.NodeID > 0 && s.nodes != nil { node, err := s.nodes.FindByID(ctx, input.NodeID) if err != nil { @@ -496,12 +497,17 @@ func (s *BackupTaskService) validateInput(ctx context.Context, existing *model.B if node == nil { return apperror.BadRequest("BACKUP_TASK_INVALID", "所选执行节点不存在", nil) } + fixedNode = node } // 节点池与固定节点互斥:固定节点已确定执行位置,不再动态调度 if input.NodeID > 0 && strings.TrimSpace(input.NodePoolTag) != "" { return apperror.BadRequest("BACKUP_TASK_INVALID", "固定执行节点与节点池标签只能选其一", nil) } + if input.Encrypt && (strings.TrimSpace(input.NodePoolTag) != "" || (fixedNode != nil && !fixedNode.IsLocal)) { + return apperror.BadRequest("BACKUP_TASK_REMOTE_ENCRYPT_UNSUPPORTED", + "远程节点暂不支持加密备份。请关闭加密,或将任务固定在 Master 本机执行。", nil) + } if input.RetentionDays < 0 { return apperror.BadRequest("BACKUP_TASK_INVALID", "保留天数不能小于 0", nil) } @@ -639,38 +645,38 @@ func (s *BackupTaskService) buildTask(existing *model.BackupTask, input BackupTa return nil, apperror.BadRequest("BACKUP_TASK_INVALID", "扩展配置格式不合法", err) } item := &model.BackupTask{ - Name: strings.TrimSpace(input.Name), - Type: normalizeBackupTaskType(input.Type), - Enabled: input.Enabled, - CronExpr: strings.TrimSpace(input.CronExpr), - SourcePath: primarySourcePath, - SourcePaths: sourcePathsJSON, - ExcludePatterns: excludePatterns, - DBHost: strings.TrimSpace(input.DBHost), - DBPort: input.DBPort, - DBUser: strings.TrimSpace(input.DBUser), - DBPasswordCiphertext: passwordCiphertext, - DBName: strings.TrimSpace(input.DBName), - DBPath: strings.TrimSpace(input.DBPath), - ExtraConfig: extraConfigJSON, - StorageTargetID: primaryTargetID, - StorageTargets: storageTargets, - NodeID: input.NodeID, - NodePoolTag: strings.TrimSpace(input.NodePoolTag), - Tags: strings.TrimSpace(input.Tags), - RetentionDays: input.RetentionDays, - Compression: compression, - Encrypt: input.Encrypt, - MaxBackups: maxBackups, - LastStatus: "idle", - VerifyEnabled: input.VerifyEnabled, - VerifyCronExpr: strings.TrimSpace(input.VerifyCronExpr), - VerifyMode: normalizeVerifyMode(input.VerifyMode), - SLAHoursRPO: maxInt(0, input.SLAHoursRPO), + Name: strings.TrimSpace(input.Name), + Type: normalizeBackupTaskType(input.Type), + Enabled: input.Enabled, + CronExpr: strings.TrimSpace(input.CronExpr), + SourcePath: primarySourcePath, + SourcePaths: sourcePathsJSON, + ExcludePatterns: excludePatterns, + DBHost: strings.TrimSpace(input.DBHost), + DBPort: input.DBPort, + DBUser: strings.TrimSpace(input.DBUser), + DBPasswordCiphertext: passwordCiphertext, + DBName: strings.TrimSpace(input.DBName), + DBPath: strings.TrimSpace(input.DBPath), + ExtraConfig: extraConfigJSON, + StorageTargetID: primaryTargetID, + StorageTargets: storageTargets, + NodeID: input.NodeID, + NodePoolTag: strings.TrimSpace(input.NodePoolTag), + Tags: strings.TrimSpace(input.Tags), + RetentionDays: input.RetentionDays, + Compression: compression, + Encrypt: input.Encrypt, + MaxBackups: maxBackups, + LastStatus: "idle", + VerifyEnabled: input.VerifyEnabled, + VerifyCronExpr: strings.TrimSpace(input.VerifyCronExpr), + VerifyMode: normalizeVerifyMode(input.VerifyMode), + SLAHoursRPO: maxInt(0, input.SLAHoursRPO), AlertOnConsecutiveFails: alertThreshold(input.AlertOnConsecutiveFails), - ReplicationTargetIDs: encodeUintCSV(input.ReplicationTargetIDs), - MaintenanceWindows: strings.TrimSpace(input.MaintenanceWindows), - DependsOnTaskIDs: encodeUintCSV(input.DependsOnTaskIDs), + ReplicationTargetIDs: encodeUintCSV(input.ReplicationTargetIDs), + MaintenanceWindows: strings.TrimSpace(input.MaintenanceWindows), + DependsOnTaskIDs: encodeUintCSV(input.DependsOnTaskIDs), } if existing != nil { item.LastRunAt = existing.LastRunAt @@ -736,25 +742,25 @@ func toBackupTaskSummary(item *model.BackupTask) BackupTaskSummary { primaryName = targetNames[0] } return BackupTaskSummary{ - ID: item.ID, - Name: item.Name, - Type: normalizeBackupTaskType(item.Type), - Enabled: item.Enabled, - CronExpr: item.CronExpr, - StorageTargetID: primaryID, - StorageTargetName: primaryName, - StorageTargetIDs: targetIDs, - StorageTargetNames: targetNames, - NodeID: item.NodeID, - NodeName: item.Node.Name, - NodePoolTag: item.NodePoolTag, - Tags: item.Tags, - RetentionDays: item.RetentionDays, - Compression: item.Compression, - Encrypt: item.Encrypt, - MaxBackups: item.MaxBackups, - LastRunAt: item.LastRunAt, - LastStatus: item.LastStatus, + ID: item.ID, + Name: item.Name, + Type: normalizeBackupTaskType(item.Type), + Enabled: item.Enabled, + CronExpr: item.CronExpr, + StorageTargetID: primaryID, + StorageTargetName: primaryName, + StorageTargetIDs: targetIDs, + StorageTargetNames: targetNames, + NodeID: item.NodeID, + NodeName: item.Node.Name, + NodePoolTag: item.NodePoolTag, + Tags: item.Tags, + RetentionDays: item.RetentionDays, + Compression: item.Compression, + Encrypt: item.Encrypt, + MaxBackups: item.MaxBackups, + LastRunAt: item.LastRunAt, + LastStatus: item.LastStatus, VerifyEnabled: item.VerifyEnabled, VerifyCronExpr: item.VerifyCronExpr, VerifyMode: item.VerifyMode, @@ -763,7 +769,7 @@ func toBackupTaskSummary(item *model.BackupTask) BackupTaskSummary { ReplicationTargetIDs: parseUintCSV(item.ReplicationTargetIDs), MaintenanceWindows: item.MaintenanceWindows, DependsOnTaskIDs: parseUintCSV(item.DependsOnTaskIDs), - UpdatedAt: item.UpdatedAt, + UpdatedAt: item.UpdatedAt, } } diff --git a/server/internal/service/backup_task_service_test.go b/server/internal/service/backup_task_service_test.go index 5bdbdd8..a62420b 100644 --- a/server/internal/service/backup_task_service_test.go +++ b/server/internal/service/backup_task_service_test.go @@ -3,6 +3,7 @@ package service import ( "context" "path/filepath" + "strings" "testing" "backupx/server/internal/config" @@ -29,6 +30,82 @@ func newBackupTaskServiceForTest(t *testing.T) (*BackupTaskService, repository.S return service, targets, tasks } +func TestBackupTaskServiceRejectsEncryptedRemoteTasks(t *testing.T) { + ctx := context.Background() + service, targets, _ := newBackupTaskServiceForTest(t) + service.SetNodeRepository(&nodeRepoStub{nodes: []model.Node{ + {ID: 41, Name: "master", Token: "master-token", Status: model.NodeStatusOnline, IsLocal: true}, + {ID: 42, Name: "edge", Token: "edge-token", Status: model.NodeStatusOnline, IsLocal: false}, + }}) + if err := targets.Create(ctx, &model.StorageTarget{Name: "local", Type: "local_disk", Enabled: true, ConfigCiphertext: "ciphertext", ConfigVersion: 1, LastTestStatus: "unknown"}); err != nil { + t.Fatalf("seed storage target error: %v", err) + } + + _, err := service.Create(ctx, BackupTaskUpsertInput{ + Name: "encrypted-node-pool", + Type: "file", + Enabled: true, + SourcePath: "/srv/site", + StorageTargetID: 1, + NodePoolTag: "db", + RetentionDays: 30, + Compression: "gzip", + MaxBackups: 10, + Encrypt: true, + }) + if err == nil || !strings.Contains(err.Error(), "远程节点暂不支持加密备份") { + t.Fatalf("expected encrypted node-pool task to be rejected, got %v", err) + } + + created, err := service.Create(ctx, BackupTaskUpsertInput{ + Name: "local-encrypted", + Type: "file", + Enabled: true, + SourcePath: "/srv/site", + StorageTargetID: 1, + RetentionDays: 30, + Compression: "gzip", + MaxBackups: 10, + Encrypt: true, + }) + if err != nil { + t.Fatalf("Create local encrypted task returned error: %v", err) + } + localNodeTask, err := service.Create(ctx, BackupTaskUpsertInput{ + Name: "local-node-encrypted", + Type: "file", + Enabled: true, + SourcePath: "/srv/site", + StorageTargetID: 1, + NodeID: 41, + RetentionDays: 30, + Compression: "gzip", + MaxBackups: 10, + Encrypt: true, + }) + if err != nil { + t.Fatalf("Create encrypted task pinned to local node returned error: %v", err) + } + if localNodeTask.NodeID != 41 || !localNodeTask.Encrypt { + t.Fatalf("expected encrypted task to keep local node, got %#v", localNodeTask) + } + _, err = service.Update(ctx, created.ID, BackupTaskUpsertInput{ + Name: created.Name, + Type: created.Type, + Enabled: true, + SourcePath: "/srv/site", + StorageTargetID: 1, + NodeID: 42, + RetentionDays: 30, + Compression: "gzip", + MaxBackups: 10, + Encrypt: true, + }) + if err == nil || !strings.Contains(err.Error(), "远程节点暂不支持加密备份") { + t.Fatalf("expected encrypted fixed-node update to be rejected, got %v", err) + } +} + func TestBackupTaskServiceCreateAndGet(t *testing.T) { ctx := context.Background() service, targets, _ := newBackupTaskServiceForTest(t) diff --git a/server/internal/service/node_service.go b/server/internal/service/node_service.go index 20c36e1..84dcfeb 100644 --- a/server/internal/service/node_service.go +++ b/server/internal/service/node_service.go @@ -36,6 +36,19 @@ type NodeSummary struct { BandwidthLimit string `json:"bandwidthLimit"` Labels string `json:"labels"` CreatedAt time.Time `json:"createdAt"` + Queue NodeQueue `json:"queue"` + RunningTasks int `json:"runningTasks"` + LastError string `json:"lastError,omitempty"` + Health string `json:"health"` +} + +type NodeQueue struct { + Pending int `json:"pending"` + Dispatched int `json:"dispatched"` + Depth int `json:"depth"` + Timeouts int `json:"timeouts"` + OldestActiveAt *time.Time `json:"oldestActiveAt,omitempty"` + OldestActiveAgeS int `json:"oldestActiveAgeSeconds"` } // NodeCreateInput is the input for creating a new remote node. @@ -54,10 +67,11 @@ type NodeUpdateInput struct { // NodeService manages the cluster nodes. type NodeService struct { - repo repository.NodeRepository - taskRepo repository.BackupTaskRepository - agentRPC NodeAgentRPC - version string + repo repository.NodeRepository + taskRepo repository.BackupTaskRepository + agentRPC NodeAgentRPC + cmdRepo repository.AgentCommandRepository + version string } // NodeAgentRPC 抽象 Agent 远程调用能力(避免 service 内循环依赖)。 @@ -81,6 +95,10 @@ func (s *NodeService) SetAgentRPC(rpc NodeAgentRPC) { s.agentRPC = rpc } +func (s *NodeService) SetAgentCommandRepository(cmdRepo repository.AgentCommandRepository) { + s.cmdRepo = cmdRepo +} + // EnsureLocalNode creates the default "local" node if it does not exist. func (s *NodeService) EnsureLocalNode(ctx context.Context) error { existing, err := s.repo.FindLocal(ctx) @@ -120,24 +138,10 @@ func (s *NodeService) List(ctx context.Context) ([]NodeSummary, error) { if err != nil { return nil, err } + queueByNode := s.loadQueueSummaries(ctx) result := make([]NodeSummary, len(nodes)) for i, n := range nodes { - result[i] = NodeSummary{ - ID: n.ID, - Name: n.Name, - Hostname: n.Hostname, - IPAddress: n.IPAddress, - Status: n.Status, - IsLocal: n.IsLocal, - OS: n.OS, - Arch: n.Arch, - AgentVersion: n.AgentVer, - LastSeen: n.LastSeen, - MaxConcurrent: n.MaxConcurrent, - BandwidthLimit: n.BandwidthLimit, - Labels: n.Labels, - CreatedAt: n.CreatedAt, - } + result[i] = s.toNodeSummary(&n, queueByNode[n.ID]) } return result, nil } @@ -150,7 +154,24 @@ func (s *NodeService) Get(ctx context.Context, id uint) (*NodeSummary, error) { if node == nil { return nil, apperror.New(http.StatusNotFound, "NODE_NOT_FOUND", "节点不存在", nil) } - return &NodeSummary{ + queueByNode := s.loadQueueSummaries(ctx) + summary := s.toNodeSummary(node, queueByNode[node.ID]) + return &summary, nil +} + +func (s *NodeService) loadQueueSummaries(ctx context.Context) map[uint]repository.AgentCommandQueueSummary { + if s.cmdRepo == nil { + return nil + } + summaries, err := s.cmdRepo.NodeQueueSummaries(ctx) + if err != nil { + return nil + } + return summaries +} + +func (s *NodeService) toNodeSummary(node *model.Node, queue repository.AgentCommandQueueSummary) NodeSummary { + summary := NodeSummary{ ID: node.ID, Name: node.Name, Hostname: node.Hostname, @@ -165,7 +186,31 @@ func (s *NodeService) Get(ctx context.Context, id uint) (*NodeSummary, error) { BandwidthLimit: node.BandwidthLimit, Labels: node.Labels, CreatedAt: node.CreatedAt, - }, nil + Queue: NodeQueue{ + Pending: queue.Pending, + Dispatched: queue.Dispatched, + Depth: queue.Depth, + Timeouts: queue.Timeouts, + OldestActiveAt: queue.OldestActiveAt, + }, + RunningTasks: queue.Running, + LastError: queue.LastError, + Health: nodeHealth(node, queue), + } + if queue.OldestActiveAt != nil { + summary.Queue.OldestActiveAgeS = int(time.Since(*queue.OldestActiveAt).Seconds()) + } + return summary +} + +func nodeHealth(node *model.Node, queue repository.AgentCommandQueueSummary) string { + if node.Status != model.NodeStatusOnline { + return "offline" + } + if queue.Timeouts > 0 || strings.TrimSpace(queue.LastError) != "" { + return "degraded" + } + return "healthy" } // Create registers a new remote node and returns its authentication token. diff --git a/server/internal/service/node_service_test.go b/server/internal/service/node_service_test.go index 51cca16..7896b0e 100644 --- a/server/internal/service/node_service_test.go +++ b/server/internal/service/node_service_test.go @@ -23,6 +23,9 @@ func openNodeServiceDB(t *testing.T) *gorm.DB { if err := db.AutoMigrate(&model.Node{}); err != nil { t.Fatalf("migrate: %v", err) } + if err := db.AutoMigrate(&model.AgentCommand{}); err != nil { + t.Fatalf("migrate agent commands: %v", err) + } return db } @@ -157,3 +160,48 @@ func TestRotateTokenNotFound(t *testing.T) { t.Fatalf("expected not found error") } } + +func TestNodeServiceListIncludesQueueHealthSummary(t *testing.T) { + db := openNodeServiceDB(t) + nodeRepo := repository.NewNodeRepository(db) + cmdRepo := repository.NewAgentCommandRepository(db) + svc := NewNodeService(nodeRepo, "test") + svc.SetAgentCommandRepository(cmdRepo) + ctx := context.Background() + node := &model.Node{ + Name: "edge-a", + Token: "edge-token", + Status: model.NodeStatusOnline, + IsLocal: false, + LastSeen: time.Now().UTC(), + } + if err := nodeRepo.Create(ctx, node); err != nil { + t.Fatalf("Create node returned error: %v", err) + } + old := time.Now().UTC().Add(-time.Minute) + if err := cmdRepo.Create(ctx, &model.AgentCommand{NodeID: node.ID, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusPending, CreatedAt: old}); err != nil { + t.Fatalf("Create pending command returned error: %v", err) + } + completedAt := time.Now().UTC() + if err := cmdRepo.Create(ctx, &model.AgentCommand{NodeID: node.ID, Type: model.AgentCommandTypeRunTask, Status: model.AgentCommandStatusTimeout, ErrorMessage: "agent timeout", CompletedAt: &completedAt}); err != nil { + t.Fatalf("Create timeout command returned error: %v", err) + } + + items, err := svc.List(ctx) + if err != nil { + t.Fatalf("List returned error: %v", err) + } + if len(items) != 1 { + t.Fatalf("expected one node, got %#v", items) + } + got := items[0] + if got.Queue.Pending != 1 || got.Queue.Depth != 1 || got.Queue.Timeouts != 1 { + t.Fatalf("unexpected queue summary: %#v", got.Queue) + } + if got.Health != "degraded" || got.LastError != "agent timeout" { + t.Fatalf("expected terminal command errors to degrade healthy node, got %#v", got) + } + if got.Queue.OldestActiveAt == nil || got.Queue.OldestActiveAgeS <= 0 { + t.Fatalf("expected oldest active metadata, got %#v", got.Queue) + } +} diff --git a/web/src/pages/nodes/NodesPage.test.ts b/web/src/pages/nodes/NodesPage.test.ts index 51a3fa5..6d8d105 100644 --- a/web/src/pages/nodes/NodesPage.test.ts +++ b/web/src/pages/nodes/NodesPage.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it } from 'vitest' import type { UserInfo } from '../../services/auth' -import { canManageNodes } from './NodesPage' +import { canManageNodes, formatQueueAge, getNodeHealthView } from './NodesPage' +import type { NodeSummary } from '../../types/nodes' function user(role: string): UserInfo { return { @@ -19,3 +20,58 @@ describe('canManageNodes', () => { expect(canManageNodes(null)).toBe(false) }) }) + +describe('node diagnostics helpers', () => { + it('formats queue age and health status from backend summaries', () => { + const node: NodeSummary = { + id: 1, + name: 'edge-a', + hostname: '', + ipAddress: '', + status: 'online', + isLocal: false, + os: 'linux', + arch: 'amd64', + agentVersion: 'v1', + lastSeen: '2026-05-12T00:00:00Z', + createdAt: '2026-05-12T00:00:00Z', + health: 'degraded', + lastError: 'agent timeout', + runningTasks: 1, + queue: { + pending: 2, + dispatched: 1, + depth: 3, + timeouts: 1, + oldestActiveAgeSeconds: 125, + }, + } + + expect(formatQueueAge(node.queue?.oldestActiveAgeSeconds)).toBe('2m') + expect(getNodeHealthView(node)).toEqual({ + text: '异常', + badgeStatus: 'warning', + tagColor: 'orangered', + tooltip: 'agent timeout', + }) + }) + + it('treats offline nodes as offline even without queue errors', () => { + const node = { + id: 2, + name: 'edge-b', + hostname: '', + ipAddress: '', + status: 'offline', + isLocal: false, + os: '', + arch: '', + agentVersion: '', + lastSeen: '', + createdAt: '', + } satisfies NodeSummary + + expect(formatQueueAge(0)).toBe('-') + expect(getNodeHealthView(node).text).toBe('离线') + }) +}) diff --git a/web/src/pages/nodes/NodesPage.tsx b/web/src/pages/nodes/NodesPage.tsx index b32cd12..9587078 100644 --- a/web/src/pages/nodes/NodesPage.tsx +++ b/web/src/pages/nodes/NodesPage.tsx @@ -20,6 +20,28 @@ export function canManageNodes(user: UserInfo | null | undefined): boolean { return isAdmin(user) } +export function formatQueueAge(seconds?: number): string { + if (!seconds || seconds <= 0) return '-' + if (seconds < 60) return `${seconds}s` + if (seconds < 3600) return `${Math.floor(seconds / 60)}m` + return `${Math.floor(seconds / 3600)}h` +} + +export function getNodeHealthView(node: NodeSummary) { + if (node.status !== 'online' || node.health === 'offline') { + return { text: '离线', badgeStatus: 'default' as const, tagColor: 'gray', tooltip: '节点未在线' } + } + if (node.health === 'degraded' || node.queue?.timeouts || node.lastError) { + return { + text: '异常', + badgeStatus: 'warning' as const, + tagColor: 'orangered', + tooltip: node.lastError || '存在超时或失败的 Agent 命令', + } + } + return { text: '健康', badgeStatus: 'success' as const, tagColor: 'green', tooltip: 'Agent 心跳与队列状态正常' } +} + export default function NodesPage() { const [nodes, setNodes] = useState([]) const [loading, setLoading] = useState(false) @@ -122,10 +144,18 @@ export default function NodesPage() { ), }, { - title: '状态', dataIndex: 'status', width: 100, - render: (status: string) => status === 'online' - ? - : , + title: '健康', dataIndex: 'health', width: 150, + render: (_: string, record: NodeSummary) => { + const health = getNodeHealthView(record) + return ( + + + + {health.text} + + + ) + }, }, { title: '主机名', dataIndex: 'hostname', render: (v: string) => v || '-' }, { title: 'IP 地址', dataIndex: 'ipAddress', render: (v: string) => v || '-' }, @@ -138,6 +168,27 @@ export default function NodesPage() { title: 'Agent 版本', dataIndex: 'agentVersion', width: 140, render: (v: string) => renderAgentVersion(v, masterVersion), }, + { + title: '队列', dataIndex: 'queue', width: 160, + render: (_: unknown, record: NodeSummary) => { + const queue = record.queue + if (!queue || queue.depth === 0) { + return 空闲 + } + return ( + + + 深度 {queue.depth} + {queue.timeouts > 0 && 超时 {queue.timeouts}} + + + ) + }, + }, + { + title: '运行中', dataIndex: 'runningTasks', width: 90, + render: (v: number | undefined) => v && v > 0 ? {v} : 0, + }, { title: '标签 / 节点池', dataIndex: 'labels', width: 180, render: (v: string) => { diff --git a/web/src/types/nodes.ts b/web/src/types/nodes.ts index ded4552..9a8b1a9 100644 --- a/web/src/types/nodes.ts +++ b/web/src/types/nodes.ts @@ -14,6 +14,19 @@ export interface NodeSummary { /** CSV 节点标签;任务的 NodePoolTag 命中这里任一即会被调度到本节点 */ labels?: string createdAt: string + queue?: NodeQueueSummary + runningTasks?: number + lastError?: string + health?: 'healthy' | 'degraded' | 'offline' +} + +export interface NodeQueueSummary { + pending: number + dispatched: number + depth: number + timeouts: number + oldestActiveAt?: string + oldestActiveAgeSeconds?: number } export interface DirEntry {