功能: 修复并实现多节点集群部署 (#38)

基础修复:
- 新增节点离线检测:每 15s 扫描,超 45s 未心跳的远程节点自动置离线
- 节点删除前检查关联任务,避免孤立备份任务
- BackupTaskRepository 新增 CountByNodeID/ListByNodeID

Master 端 Agent 协议:
- 新增 AgentCommand 模型与命令队列仓储(pending/dispatched/succeeded/failed/timeout)
- 新增 AgentService:任务下发、命令轮询、结果回收、超时扫描
- 新增专用 Agent HTTP API(X-Agent-Token 认证):
  /api/agent/heartbeat
  /api/agent/commands/poll
  /api/agent/commands/:id/result
  /api/agent/tasks/:id
  /api/agent/records/:id
- BackupExecutionService 支持 node 路由:task.NodeID 指向远程节点时自动入队派发

Agent CLI(backupx agent 子命令):
- 配置:YAML 文件 / 环境变量 / CLI 参数,优先级 CLI > 文件 > 环境
- 心跳循环 + 命令轮询循环 + 优雅退出
- 本地复用 BackupRunner 与 storage registry 执行备份并直接上传
- 支持 run_task 和 list_dir 两种命令

远程目录浏览:
- NodeService 支持通过 Agent RPC 列出远程节点目录(15s 超时)

前端:
- NodesPage 添加节点后展示 Agent 启动命令和环境变量配置

文档:
- README 中英文重写"多节点集群"章节,含架构图、步骤、限制、CLI 参考
This commit is contained in:
Wu Qing
2026-04-17 12:29:08 +08:00
committed by GitHub
parent e04774ff68
commit 757b0fa5ed
27 changed files with 2224 additions and 24 deletions

View File

@@ -0,0 +1,203 @@
package agent
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"os"
"runtime"
"strings"
"sync"
"time"
)
// Agent is the main controller of the agent process. It bundles the runtime
// configuration, the client used to talk to Master and the local executor.
type Agent struct {
	cfg      *Config
	client   *MasterClient
	executor *Executor
	// version is reported to Master as the agent version in every heartbeat.
	version string

	// mu guards started, which records whether Run has already been called.
	mu      sync.Mutex
	started bool
}
// New builds an Agent from cfg.
//
// cfg is validated first and the validation error is returned unchanged.
// On success the Agent owns a MasterClient created from cfg.Master, cfg.Token
// and cfg.InsecureSkipTLSVerify, plus an Executor that uses cfg.TempDir.
func New(cfg *Config, version string) (*Agent, error) {
	err := cfg.Validate()
	if err != nil {
		return nil, err
	}

	agent := &Agent{cfg: cfg, version: version}
	agent.client = NewMasterClient(cfg.Master, cfg.Token, cfg.InsecureSkipTLSVerify)
	agent.executor = NewExecutor(agent.client, cfg.TempDir)
	return agent, nil
}
// Run starts the agent main loops and blocks until ctx is cancelled.
//
// Run may be called only once per Agent; later calls return an error.
// Before the loops start, a single heartbeat is sent as a handshake to confirm
// that the token is accepted; if it fails, Run returns that error wrapped.
// Otherwise the heartbeat loop and the command poll loop run concurrently and
// Run returns ctx.Err() once both have stopped.
func (a *Agent) Run(ctx context.Context) error {
	a.mu.Lock()
	alreadyStarted := a.started
	a.started = true
	a.mu.Unlock()
	if alreadyStarted {
		return fmt.Errorf("agent already started")
	}

	heartbeatEvery := parseDuration(a.cfg.HeartbeatInterval, 15*time.Second)
	pollEvery := parseDuration(a.cfg.PollInterval, 5*time.Second)

	// Handshake: one heartbeat up front verifies the token before looping.
	if err := a.heartbeatOnce(ctx); err != nil {
		return fmt.Errorf("initial heartbeat failed: %w", err)
	}
	log.Printf("[agent] connected to master %s", a.cfg.Master)

	var loops sync.WaitGroup
	spawn := func(loop func()) {
		loops.Add(1)
		go func() {
			defer loops.Done()
			loop()
		}()
	}
	spawn(func() { a.heartbeatLoop(ctx, heartbeatEvery) })
	spawn(func() { a.pollLoop(ctx, pollEvery) })
	loops.Wait()

	return ctx.Err()
}
// heartbeatLoop sends a heartbeat every interval until ctx is cancelled.
// A failed heartbeat is logged and the loop keeps running.
func (a *Agent) heartbeatLoop(ctx context.Context, interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	stop := ctx.Done()
	for {
		select {
		case <-tick.C:
			err := a.heartbeatOnce(ctx)
			if err != nil {
				log.Printf("[agent] heartbeat failed: %v", err)
			}
		case <-stop:
			return
		}
	}
}
// heartbeatOnce sends a single heartbeat carrying the token, hostname, a
// detected local IPv4 address, the agent version and the OS/architecture.
// The heartbeat response is discarded; only the error is returned.
func (a *Agent) heartbeatOnce(ctx context.Context) error {
	// The hostname lookup error is ignored on purpose: the heartbeat is still
	// sent with whatever value os.Hostname returned.
	host, _ := os.Hostname()

	_, err := a.client.Heartbeat(ctx, HeartbeatRequest{
		Token:        a.cfg.Token,
		Hostname:     host,
		IPAddress:    detectLocalIP(),
		AgentVersion: a.version,
		OS:           runtime.GOOS,
		Arch:         runtime.GOARCH,
	})
	return err
}
// pollLoop polls Master for a pending command every interval and handles it,
// until ctx is cancelled. Commands are handled on this goroutine, so the next
// poll happens only after the current command has finished.
func (a *Agent) pollLoop(ctx context.Context, interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	stop := ctx.Done()
	for {
		select {
		case <-tick.C:
			a.pollAndHandleOnce(ctx)
		case <-stop:
			return
		}
	}
}
// pollAndHandleOnce fetches at most one command from Master and dispatches it
// by type. Poll failures are logged and otherwise ignored; a nil command means
// nothing is pending. Unknown command types are reported back to Master as
// failed, and a failure to deliver that report is logged instead of being
// dropped silently.
func (a *Agent) pollAndHandleOnce(ctx context.Context) {
	cmd, err := a.client.PollCommand(ctx)
	if err != nil {
		log.Printf("[agent] poll command failed: %v", err)
		return
	}
	if cmd == nil {
		return
	}
	log.Printf("[agent] received command #%d type=%s", cmd.ID, cmd.Type)

	switch cmd.Type {
	case "run_task":
		a.handleRunTask(ctx, cmd)
	case "list_dir":
		a.handleListDir(ctx, cmd)
	default:
		msg := fmt.Sprintf("unknown command type: %s", cmd.Type)
		log.Printf("[agent] %s", msg)
		if err := a.client.SubmitCommandResult(ctx, cmd.ID, false, msg, nil); err != nil {
			log.Printf("[agent] submit result for command #%d failed: %v", cmd.ID, err)
		}
	}
}
// handleRunTask handles a run_task command.
//
// The command payload is decoded into a task ID and a record ID, the task is
// executed through the executor, and the outcome is reported to Master. On
// success the result carries the same taskId/recordId pair. Reporting is
// best-effort, but a failed report is logged so that it can be diagnosed on
// the agent side.
func (a *Agent) handleRunTask(ctx context.Context, cmd *CommandPayload) {
	// report delivers the command outcome to Master and logs delivery errors.
	report := func(success bool, errMsg string, result any) {
		if err := a.client.SubmitCommandResult(ctx, cmd.ID, success, errMsg, result); err != nil {
			log.Printf("[agent] submit result for command #%d failed: %v", cmd.ID, err)
		}
	}

	var payload struct {
		TaskID   uint `json:"taskId"`
		RecordID uint `json:"recordId"`
	}
	if err := json.Unmarshal(cmd.Payload, &payload); err != nil {
		report(false, "invalid payload: "+err.Error(), nil)
		return
	}
	if err := a.executor.ExecuteRunTask(ctx, payload.TaskID, payload.RecordID); err != nil {
		report(false, err.Error(), nil)
		return
	}
	report(true, "", map[string]any{
		"taskId":   payload.TaskID,
		"recordId": payload.RecordID,
	})
}
// handleListDir handles a list_dir command (implemented in phase four).
//
// The payload carries the path to list on the agent's machine. The directory
// entries are returned to Master under the "entries" key. Reporting is
// best-effort, but a failed report is logged so that it can be diagnosed on
// the agent side.
func (a *Agent) handleListDir(ctx context.Context, cmd *CommandPayload) {
	// report delivers the command outcome to Master and logs delivery errors.
	report := func(success bool, errMsg string, result any) {
		if err := a.client.SubmitCommandResult(ctx, cmd.ID, success, errMsg, result); err != nil {
			log.Printf("[agent] submit result for command #%d failed: %v", cmd.ID, err)
		}
	}

	var payload struct {
		Path string `json:"path"`
	}
	if err := json.Unmarshal(cmd.Payload, &payload); err != nil {
		report(false, "invalid payload: "+err.Error(), nil)
		return
	}
	entries, err := listLocalDir(payload.Path)
	if err != nil {
		report(false, err.Error(), nil)
		return
	}
	report(true, "", map[string]any{"entries": entries})
}
// Helper functions.

// parseDuration parses s with time.ParseDuration and returns fallback when s
// is blank, cannot be parsed, or is not strictly positive.
//
// The positivity check matters because the result is fed to time.NewTicker,
// which panics for intervals <= 0; a misconfigured "0s" or "-5s" must not
// crash the agent. Surrounding whitespace in s is ignored.
func parseDuration(s string, fallback time.Duration) time.Duration {
	s = strings.TrimSpace(s)
	if s == "" {
		return fallback
	}
	d, err := time.ParseDuration(s)
	if err != nil || d <= 0 {
		return fallback
	}
	return d
}
// detectLocalIP returns the first non-loopback IPv4 address found among the
// machine's interface addresses, or "" when the addresses cannot be listed or
// no such address exists.
func detectLocalIP() string {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}
	for _, addr := range addrs {
		ipNet, isIPNet := addr.(*net.IPNet)
		if !isIPNet || ipNet.IP.IsLoopback() {
			continue
		}
		if v4 := ipNet.IP.To4(); v4 != nil {
			return ipNet.IP.String()
		}
	}
	return ""
}

View File

@@ -0,0 +1,208 @@
package agent
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// MasterClient wraps the HTTP API that Master exposes to agents.
type MasterClient struct {
	baseURL    string
	token      string
	httpClient *http.Client
}

// NewMasterClient builds a client for the Master at baseURL.
//
// Trailing slashes are stripped from baseURL. token is sent with every
// request. When insecureTLS is true, TLS certificate verification is skipped.
// Every request is bounded by a 120 second overall timeout.
func NewMasterClient(baseURL, token string, insecureTLS bool) *MasterClient {
	return &MasterClient{
		baseURL: strings.TrimRight(baseURL, "/"),
		token:   token,
		httpClient: &http.Client{
			Timeout:   120 * time.Second,
			Transport: newMasterTransport(insecureTLS),
		},
	}
}

// newMasterTransport returns the transport used for Master requests.
//
// It starts from a clone of http.DefaultTransport rather than a zero
// http.Transport, so the standard defaults (proxy from the environment, dial
// and TLS handshake timeouts, idle connection limits) are kept. The clone is
// private to this client, so setting InsecureSkipVerify on it does not affect
// other users of the default transport.
func newMasterTransport(insecureTLS bool) *http.Transport {
	var transport *http.Transport
	if base, ok := http.DefaultTransport.(*http.Transport); ok {
		transport = base.Clone()
	} else {
		// DefaultTransport was replaced by something else; fall back to a
		// minimal transport that at least honours the proxy environment.
		transport = &http.Transport{Proxy: http.ProxyFromEnvironment}
	}
	if insecureTLS {
		if transport.TLSClientConfig == nil {
			transport.TLSClientConfig = &tls.Config{}
		}
		transport.TLSClientConfig.InsecureSkipVerify = true
	}
	return transport
}
// HeartbeatRequest is the body an agent posts to the heartbeat endpoint.
// Every field except Token is omitted from the JSON when empty.
type HeartbeatRequest struct {
	// Token is the node authentication token.
	Token string `json:"token"`
	// Hostname and IPAddress describe the machine the agent runs on.
	Hostname  string `json:"hostname,omitempty"`
	IPAddress string `json:"ipAddress,omitempty"`
	// AgentVersion, OS and Arch describe the agent build and platform.
	AgentVersion string `json:"agentVersion,omitempty"`
	OS           string `json:"os,omitempty"`
	Arch         string `json:"arch,omitempty"`
}
// HeartbeatResponse is what Master returns for a heartbeat: a status string
// plus the ID and name of the node.
type HeartbeatResponse struct {
	Status string `json:"status"`
	NodeID uint   `json:"nodeId"`
	Name   string `json:"name"`
}
// Heartbeat posts req to /api/agent/heartbeat and returns the node metadata
// sent back by Master. On error the returned response is nil.
func (c *MasterClient) Heartbeat(ctx context.Context, req HeartbeatRequest) (*HeartbeatResponse, error) {
	resp := new(HeartbeatResponse)
	err := c.do(ctx, http.MethodPost, "/api/agent/heartbeat", req, resp)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
// CommandPayload is a command handed to the agent; it is kept aligned with
// service.AgentCommandPayload.
type CommandPayload struct {
	ID   uint   `json:"id"`
	Type string `json:"type"`
	// Payload is the type-specific body, left undecoded for the handler.
	Payload json.RawMessage `json:"payload,omitempty"`
}

// PollCommandResponse is the poll reply. Command is nil when no command is
// pending.
type PollCommandResponse struct {
	Command *CommandPayload `json:"command"`
}
// PollCommand asks Master for the next pending command by posting to
// /api/agent/commands/poll. It returns the command from the response, which
// is nil when nothing is pending.
func (c *MasterClient) PollCommand(ctx context.Context) (*CommandPayload, error) {
	polled := new(PollCommandResponse)
	err := c.do(ctx, http.MethodPost, "/api/agent/commands/poll", nil, polled)
	if err != nil {
		return nil, err
	}
	return polled.Command, nil
}
// SubmitCommandResult reports the outcome of command cmdID to Master.
//
// The request body always contains "success" and "errorMessage". When result
// is non-nil it is JSON-encoded and attached as "result"; an encoding failure
// is returned before any request is sent.
func (c *MasterClient) SubmitCommandResult(ctx context.Context, cmdID uint, success bool, errorMsg string, result any) error {
	body := map[string]any{
		"success":      success,
		"errorMessage": errorMsg,
	}
	if result != nil {
		encoded, err := json.Marshal(result)
		if err != nil {
			return fmt.Errorf("marshal result: %w", err)
		}
		if encoded != nil {
			// Stored as RawMessage so it is embedded as JSON, not base64.
			body["result"] = json.RawMessage(encoded)
		}
	}
	endpoint := fmt.Sprintf("/api/agent/commands/%d/result", cmdID)
	return c.do(ctx, http.MethodPost, endpoint, body, nil)
}
// TaskSpec is the task definition fetched from Master; it is kept aligned with
// service.AgentTaskSpec.
type TaskSpec struct {
	TaskID uint   `json:"taskId"`
	Name   string `json:"name"`
	Type   string `json:"type"`

	// Source selection. SourcePaths and ExcludePatterns are single strings
	// that this package splits on newlines.
	SourcePath      string `json:"sourcePath"`
	SourcePaths     string `json:"sourcePaths"`
	ExcludePatterns string `json:"excludePatterns"`

	// Database connection settings. DBName may list several names separated
	// by commas, semicolons or newlines.
	DBHost     string `json:"dbHost"`
	DBPort     int    `json:"dbPort"`
	DBUser     string `json:"dbUser"`
	DBPassword string `json:"dbPassword"`
	DBName     string `json:"dbName"`
	DBPath     string `json:"dbPath"`

	ExtraConfig string `json:"extraConfig"`
	Compression string `json:"compression"`
	Encrypt     bool   `json:"encrypt"`

	StorageTargets []StorageTargetConfig `json:"storageTargets"`
}

// StorageTargetConfig describes one upload destination; it is kept aligned
// with service.AgentStorageTargetConfig. Config holds the provider-specific
// settings as raw JSON.
type StorageTargetConfig struct {
	ID     uint            `json:"id"`
	Type   string          `json:"type"`
	Name   string          `json:"name"`
	Config json.RawMessage `json:"config"`
}
// GetTaskSpec fetches the specification of task taskID from
// /api/agent/tasks/{id}. On error the returned spec is nil.
func (c *MasterClient) GetTaskSpec(ctx context.Context, taskID uint) (*TaskSpec, error) {
	fetched := new(TaskSpec)
	endpoint := fmt.Sprintf("/api/agent/tasks/%d", taskID)
	err := c.do(ctx, http.MethodGet, endpoint, nil, fetched)
	if err != nil {
		return nil, err
	}
	return fetched, nil
}
// RecordUpdate is a partial update of a backup record; it is kept aligned with
// service.AgentRecordUpdate. Every field is omitted from the JSON when it
// holds its zero value.
type RecordUpdate struct {
	Status string `json:"status,omitempty"`

	// Artifact details.
	FileName    string `json:"fileName,omitempty"`
	FileSize    int64  `json:"fileSize,omitempty"`
	Checksum    string `json:"checksum,omitempty"`
	StoragePath string `json:"storagePath,omitempty"`

	ErrorMessage string `json:"errorMessage,omitempty"`
	// LogAppend is log text to be appended to the record.
	LogAppend string `json:"logAppend,omitempty"`
}
// UpdateRecord posts a status and/or log update for backup record recordID to
// /api/agent/records/{id}.
func (c *MasterClient) UpdateRecord(ctx context.Context, recordID uint, update RecordUpdate) error {
	endpoint := fmt.Sprintf("/api/agent/records/%d", recordID)
	err := c.do(ctx, http.MethodPost, endpoint, update, nil)
	return err
}
// do performs one HTTP call against the Master API. Every agent endpoint goes
// through it, using JSON bodies and the X-Agent-Token header.
//
// body, when non-nil, is JSON-encoded as the request body. Any status outside
// 2xx is returned as an error that includes the response body. When out is
// non-nil the response is decoded into it: the {code, data, message} envelope
// is unwrapped if present, otherwise the whole body is decoded directly.
func (c *MasterClient) do(ctx context.Context, method, path string, body any, out any) error {
	var payload io.Reader
	if body != nil {
		encoded, err := json.Marshal(body)
		if err != nil {
			return fmt.Errorf("marshal request: %w", err)
		}
		payload = bytes.NewReader(encoded)
	}

	req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, payload)
	if err != nil {
		return err
	}
	req.Header.Set("X-Agent-Token", c.token)
	if body != nil {
		req.Header.Set("Content-Type", "application/json")
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("%s %s: %w", method, path, err)
	}
	defer resp.Body.Close()

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read response: %w", err)
	}
	if ok := resp.StatusCode >= 200 && resp.StatusCode < 300; !ok {
		return fmt.Errorf("%s %s: http %d: %s", method, path, resp.StatusCode, string(raw))
	}
	if out == nil {
		return nil
	}

	// BackupX wraps API responses as {code, data, message}; the useful part
	// is the data field.
	var envelope struct {
		Code    string          `json:"code"`
		Data    json.RawMessage `json:"data"`
		Message string          `json:"message"`
	}
	envelopeErr := json.Unmarshal(raw, &envelope)
	if envelopeErr != nil || envelope.Data == nil {
		// Not an envelope: accept a response that is the object itself.
		return json.Unmarshal(raw, out)
	}
	if err := json.Unmarshal(envelope.Data, out); err != nil {
		return fmt.Errorf("decode data: %w", err)
	}
	return nil
}

View File

@@ -0,0 +1,105 @@
// Package agent 实现 BackupX 远程 Agent。
//
// Agent 是一个独立的 Go 进程,部署在远程服务器上,通过 HTTP 轮询的方式
// 与 Master 通信:定期上报心跳、拉取 Master 下发的命令、本地执行备份、
// 把执行结果和日志回报给 Master。
//
// 通信协议见 server/internal/http/agent_handler.go。
package agent
import (
"errors"
"fmt"
"os"
"strings"
"gopkg.in/yaml.v3"
)
// Config holds the agent's runtime configuration.
type Config struct {
	// Master is the HTTP base address of the BackupX Master,
	// for example http://master.example.com:8340.
	Master string `yaml:"master"`

	// Token is the node authentication token, generated when the node is
	// created on Master.
	Token string `yaml:"token"`

	// HeartbeatInterval is the heartbeat period; defaults to 15s.
	HeartbeatInterval string `yaml:"heartbeatInterval"`

	// PollInterval is the command polling period; defaults to 5s.
	PollInterval string `yaml:"pollInterval"`

	// TempDir is the scratch directory for backups; defaults to
	// /tmp/backupx-agent.
	TempDir string `yaml:"tempDir"`

	// InsecureSkipTLSVerify allows skipping TLS certificate verification in
	// test environments.
	InsecureSkipTLSVerify bool `yaml:"insecureSkipTlsVerify"`
}
// LoadConfigFile reads the YAML file at path into a Config and fills in the
// defaults for any unset optional field. Read and parse failures are returned
// wrapped.
func LoadConfigFile(path string) (*Config, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("read agent config: %w", err)
	}

	cfg := new(Config)
	if err = yaml.Unmarshal(raw, cfg); err != nil {
		return nil, fmt.Errorf("parse agent config: %w", err)
	}
	return applyConfigDefaults(cfg)
}
// LoadConfigFromEnv builds a Config from environment variables and fills in
// the defaults for any unset optional field. It has lower priority than the
// --config file.
//
// Supported variables (surrounding whitespace is ignored for all of them):
//   - BACKUPX_AGENT_MASTER        Master URL
//   - BACKUPX_AGENT_TOKEN         node authentication token
//   - BACKUPX_AGENT_HEARTBEAT     heartbeat interval (e.g. 15s)
//   - BACKUPX_AGENT_POLL          command polling interval (e.g. 5s)
//   - BACKUPX_AGENT_TEMP_DIR      temporary directory
//   - BACKUPX_AGENT_INSECURE_TLS  "true" (any case) or "1" skips TLS verification
func LoadConfigFromEnv() (*Config, error) {
	env := func(key string) string { return strings.TrimSpace(os.Getenv(key)) }

	// Trimmed like every other variable, so that " true" is not silently
	// read as false.
	insecure := env("BACKUPX_AGENT_INSECURE_TLS")

	cfg := &Config{
		Master:                env("BACKUPX_AGENT_MASTER"),
		Token:                 env("BACKUPX_AGENT_TOKEN"),
		HeartbeatInterval:     env("BACKUPX_AGENT_HEARTBEAT"),
		PollInterval:          env("BACKUPX_AGENT_POLL"),
		TempDir:               env("BACKUPX_AGENT_TEMP_DIR"),
		InsecureSkipTLSVerify: strings.EqualFold(insecure, "true") || insecure == "1",
	}
	return applyConfigDefaults(cfg)
}
// MergeWithFlags overlays command-line values onto the configuration. A flag
// overrides the existing value only when it is non-blank.
//
// The stored values are trimmed of surrounding whitespace, and trailing
// slashes are removed from master, so flag-supplied values are normalised the
// same way as values loaded from a file or the environment.
func (c *Config) MergeWithFlags(master, token, tempDir string) {
	if v := strings.TrimSpace(master); v != "" {
		c.Master = strings.TrimRight(v, "/")
	}
	if v := strings.TrimSpace(token); v != "" {
		c.Token = v
	}
	if v := strings.TrimSpace(tempDir); v != "" {
		c.TempDir = v
	}
}
// Validate checks the required fields. It returns an error when Master or
// Token is blank (Master is checked first) and nil otherwise.
func (c *Config) Validate() error {
	switch {
	case strings.TrimSpace(c.Master) == "":
		return errors.New("master url is required (set via --master, BACKUPX_AGENT_MASTER or config file)")
	case strings.TrimSpace(c.Token) == "":
		return errors.New("token is required (set via --token, BACKUPX_AGENT_TOKEN or config file)")
	default:
		return nil
	}
}
// applyConfigDefaults fills empty optional fields with their defaults
// (15s heartbeat, 5s poll, /tmp/backupx-agent temp dir) and normalises Master
// by trimming whitespace and trailing slashes. It modifies cfg in place and
// returns it; the returned error is always nil.
func applyConfigDefaults(cfg *Config) (*Config, error) {
	defaults := []struct {
		field *string
		value string
	}{
		{&cfg.HeartbeatInterval, "15s"},
		{&cfg.PollInterval, "5s"},
		{&cfg.TempDir, "/tmp/backupx-agent"},
	}
	for _, d := range defaults {
		if *d.field == "" {
			*d.field = d.value
		}
	}

	master := strings.TrimSpace(cfg.Master)
	cfg.Master = strings.TrimRight(master, "/")
	return cfg, nil
}

View File

@@ -0,0 +1,101 @@
package agent
import (
"os"
"path/filepath"
"testing"
)
// TestLoadConfigFile checks that a fully populated YAML file is loaded
// field by field and that the trailing slash of master is trimmed.
func TestLoadConfigFile(t *testing.T) {
	const yamlBody = `master: http://master.example.com:8340/
token: abc123
heartbeatInterval: 20s
pollInterval: 3s
tempDir: /var/backupx-agent
insecureSkipTlsVerify: true
`
	cfgPath := filepath.Join(t.TempDir(), "agent.yaml")
	if err := os.WriteFile(cfgPath, []byte(yamlBody), 0644); err != nil {
		t.Fatal(err)
	}

	cfg, err := LoadConfigFile(cfgPath)
	if err != nil {
		t.Fatalf("load: %v", err)
	}

	if got := cfg.Master; got != "http://master.example.com:8340" {
		t.Errorf("trailing slash should be trimmed: %q", got)
	}
	if got := cfg.Token; got != "abc123" {
		t.Errorf("token: %q", got)
	}
	if !(cfg.HeartbeatInterval == "20s" && cfg.PollInterval == "3s") {
		t.Errorf("intervals: %+v", cfg)
	}
	if !cfg.InsecureSkipTLSVerify {
		t.Errorf("insecure should be true")
	}
}
// TestLoadConfigDefaults checks that a minimal config file gets the default
// intervals and the default temp directory.
func TestLoadConfigDefaults(t *testing.T) {
	cfgPath := filepath.Join(t.TempDir(), "agent.yaml")
	minimal := []byte("master: http://m\ntoken: t\n")
	if err := os.WriteFile(cfgPath, minimal, 0644); err != nil {
		t.Fatal(err)
	}

	cfg, err := LoadConfigFile(cfgPath)
	if err != nil {
		t.Fatal(err)
	}

	if !(cfg.HeartbeatInterval == "15s" && cfg.PollInterval == "5s") {
		t.Errorf("default intervals not applied: %+v", cfg)
	}
	if got := cfg.TempDir; got != "/tmp/backupx-agent" {
		t.Errorf("default tempdir: %q", got)
	}
}
// TestConfigValidate checks that Validate accepts a complete config and
// rejects one that lacks the master URL or the token.
func TestConfigValidate(t *testing.T) {
	type validateCase struct {
		name    string
		cfg     Config
		wantErr bool
	}
	cases := []validateCase{
		{name: "valid", cfg: Config{Master: "http://m", Token: "t"}, wantErr: false},
		{name: "missing master", cfg: Config{Token: "t"}, wantErr: true},
		{name: "missing token", cfg: Config{Master: "http://m"}, wantErr: true},
	}

	for i := range cases {
		tc := cases[i]
		err := tc.cfg.Validate()
		gotErr := err != nil
		if gotErr != tc.wantErr {
			t.Errorf("%s: err=%v wantErr=%v", tc.name, err, tc.wantErr)
		}
	}
}
// TestMergeWithFlags checks that non-empty flags override the config and that
// an empty flag leaves the existing value untouched.
func TestMergeWithFlags(t *testing.T) {
	cfg := &Config{Master: "http://old", Token: "old"}

	cfg.MergeWithFlags("http://new", "", "/tmp/x")

	if got := cfg.Master; got != "http://new" {
		t.Errorf("master not overridden: %q", got)
	}
	if got := cfg.Token; got != "old" {
		t.Errorf("empty flag should not override: %q", got)
	}
	if got := cfg.TempDir; got != "/tmp/x" {
		t.Errorf("tempDir: %q", got)
	}
}
// TestLoadConfigFromEnv checks that the master URL, the token and the
// insecure-TLS switch are picked up from the environment.
func TestLoadConfigFromEnv(t *testing.T) {
	environment := map[string]string{
		"BACKUPX_AGENT_MASTER":       "http://env-master",
		"BACKUPX_AGENT_TOKEN":        "env-token",
		"BACKUPX_AGENT_INSECURE_TLS": "true",
	}
	for key, value := range environment {
		t.Setenv(key, value)
	}

	cfg, err := LoadConfigFromEnv()
	if err != nil {
		t.Fatal(err)
	}

	loaded := cfg.Master == "http://env-master" && cfg.Token == "env-token" && cfg.InsecureSkipTLSVerify
	if !loaded {
		t.Errorf("env not picked up: %+v", cfg)
	}
}

View File

@@ -0,0 +1,266 @@
package agent
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"backupx/server/internal/backup"
"backupx/server/internal/storage"
storageRclone "backupx/server/internal/storage/rclone"
"backupx/server/pkg/compress"
)
// Executor runs commands locally on the agent's machine.
type Executor struct {
	// client is used to fetch task specs and to report record updates.
	client *MasterClient
	// tempDir is the scratch directory handed to backup runners.
	tempDir string

	// Registries used to look up the backup runner and to create the storage
	// provider for a task.
	backupRegistry  *backup.Registry
	storageRegistry *storage.Registry
}
// NewExecutor builds an Executor. The backup runner registry and the storage
// registry are created up front: the backup registry gets the file, SQLite,
// MySQL, PostgreSQL and SAP HANA runners, the storage registry gets the
// rclone-based factories, after which RegisterAllBackends is applied to it.
func NewExecutor(client *MasterClient, tempDir string) *Executor {
	exec := &Executor{client: client, tempDir: tempDir}

	exec.backupRegistry = backup.NewRegistry(
		backup.NewFileRunner(),
		backup.NewSQLiteRunner(),
		backup.NewMySQLRunner(nil),
		backup.NewPostgreSQLRunner(nil),
		backup.NewSAPHANARunner(nil),
	)

	exec.storageRegistry = storage.NewRegistry(
		storageRclone.NewLocalDiskFactory(),
		storageRclone.NewS3Factory(),
		storageRclone.NewWebDAVFactory(),
		storageRclone.NewGoogleDriveFactory(),
		storageRclone.NewAliyunOSSFactory(),
		storageRclone.NewTencentCOSFactory(),
		storageRclone.NewQiniuKodoFactory(),
		storageRclone.NewFTPFactory(),
		storageRclone.NewRcloneFactory(),
	)
	storageRclone.RegisterAllBackends(exec.storageRegistry)

	return exec
}
// ExecuteRunTask handles a run_task command: fetch the task spec, run the
// backup runner, optionally compress, upload to every storage target and
// report the record to Master.
//
// Agents currently do not support Encrypt=true: the encryption key is not
// distributed to agents, to avoid spreading key material. A task with
// encryption enabled is reported to Master as failed and an error is returned.
//
// A task without storage targets is rejected before the backup is run, so no
// work is done for an artifact that could never be uploaded.
//
// Every failure is reported on the record (best-effort) and also returned.
func (e *Executor) ExecuteRunTask(ctx context.Context, taskID, recordID uint) error {
	// 1) Fetch the task spec and reject tasks that cannot succeed.
	spec, err := e.client.GetTaskSpec(ctx, taskID)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("拉取任务规格失败: %v", err))
		return err
	}
	if spec.Encrypt {
		msg := "Agent 不支持加密备份(加密密钥仅在 Master 端持有)"
		e.reportRecordFailure(ctx, recordID, msg)
		return fmt.Errorf("%s", msg)
	}
	if len(spec.StorageTargets) == 0 {
		e.reportRecordFailure(ctx, recordID, "没有关联的存储目标")
		return fmt.Errorf("no storage targets")
	}
	e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 开始执行任务 %s (type=%s)\n", spec.Name, spec.Type))

	// 2) Build the backup.TaskSpec and look up the matching runner.
	startedAt := time.Now().UTC()
	if err := os.MkdirAll(e.tempDir, 0o755); err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("创建临时目录失败: %v", err))
		return err
	}
	backupSpec := buildBackupTaskSpec(spec, startedAt, e.tempDir)
	runner, err := e.backupRegistry.Runner(backupSpec.Type)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("不支持的备份类型: %v", err))
		return err
	}

	// 3) Run the runner; its log lines are streamed to the record.
	logger := newRecordLogger(ctx, e.client, recordID)
	result, err := runner.Run(ctx, backupSpec, logger)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, err.Error())
		return err
	}
	defer os.RemoveAll(result.TempDir)

	// 4) Optional gzip compression, skipped when the artifact is already .gz.
	finalPath := result.ArtifactPath
	if strings.EqualFold(spec.Compression, "gzip") && !strings.HasSuffix(strings.ToLower(finalPath), ".gz") {
		e.appendLog(ctx, recordID, "[agent] 开始压缩备份文件\n")
		compressedPath, compressErr := compress.GzipFile(finalPath)
		if compressErr != nil {
			e.reportRecordFailure(ctx, recordID, fmt.Sprintf("压缩失败: %v", compressErr))
			return compressErr
		}
		finalPath = compressedPath
	}
	info, err := os.Stat(finalPath)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("获取文件信息失败: %v", err))
		return err
	}
	fileName := filepath.Base(finalPath)
	fileSize := info.Size()
	storagePath := backup.BuildStorageKey(spec.Type, startedAt, fileName)

	// 5) Compute the checksum, then upload to every target. The first failed
	// upload aborts the task.
	checksum, err := computeFileSHA256(finalPath)
	if err != nil {
		e.reportRecordFailure(ctx, recordID, fmt.Sprintf("计算 checksum 失败: %v", err))
		return err
	}
	for _, target := range spec.StorageTargets {
		if err := e.uploadToTarget(ctx, recordID, target, finalPath, storagePath, fileSize, spec.TaskID); err != nil {
			e.reportRecordFailure(ctx, recordID, fmt.Sprintf("上传到 %s 失败: %v", target.Name, err))
			return err
		}
		e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 已上传到存储目标 %s\n", target.Name))
	}

	// 6) Report final success.
	return e.client.UpdateRecord(ctx, recordID, RecordUpdate{
		Status:      "success",
		FileName:    fileName,
		FileSize:    fileSize,
		Checksum:    checksum,
		StoragePath: storagePath,
		LogAppend:   fmt.Sprintf("[agent] 任务完成,总计 %d 字节\n", fileSize),
	})
}
// uploadToTarget uploads the artifact at filePath to a single storage target
// under objectKey. To stay simple it performs no upload-level retry (rclone
// already retries at a lower level). The task and record IDs are attached to
// the upload as metadata.
func (e *Executor) uploadToTarget(ctx context.Context, recordID uint, target StorageTargetConfig, filePath, objectKey string, fileSize int64, taskID uint) error {
	// The provider config arrives as raw JSON; an empty config is passed on
	// as a nil map.
	var providerConfig map[string]any
	if len(target.Config) != 0 {
		err := jsonUnmarshalMap(target.Config, &providerConfig)
		if err != nil {
			return fmt.Errorf("parse storage config: %w", err)
		}
	}

	provider, err := e.storageRegistry.Create(ctx, target.Type, providerConfig)
	if err != nil {
		return fmt.Errorf("create provider: %w", err)
	}

	artifact, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("open artifact: %w", err)
	}
	defer artifact.Close()

	metadata := map[string]string{
		"taskId":   fmt.Sprint(taskID),
		"recordId": fmt.Sprint(recordID),
	}
	return provider.Upload(ctx, objectKey, artifact, fileSize, metadata)
}
// appendLog appends line to the record's log on Master. It is best-effort:
// a failed update is ignored so that logging never interrupts the main flow.
func (e *Executor) appendLog(ctx context.Context, recordID uint, line string) {
	update := RecordUpdate{LogAppend: line}
	_ = e.client.UpdateRecord(ctx, recordID, update)
}
// reportRecordFailure marks the record as failed on Master, storing msg as
// the error message and appending it to the record log.
//
// If ctx is already done (for example the agent is shutting down while a task
// is running), the report is sent with a short-lived detached context instead;
// otherwise the request would fail immediately and the failure would never
// reach Master. The report stays best-effort: its error is ignored.
func (e *Executor) reportRecordFailure(ctx context.Context, recordID uint, msg string) {
	if ctx.Err() != nil {
		detached, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		ctx = detached
	}
	_ = e.client.UpdateRecord(ctx, recordID, RecordUpdate{
		Status:       "failed",
		ErrorMessage: msg,
		LogAppend:    fmt.Sprintf("[agent] 错误: %s\n", msg),
	})
}
// buildBackupTaskSpec converts the agent-side TaskSpec into a backup.TaskSpec.
// SourcePaths and ExcludePatterns are split on newlines (blank entries are
// dropped) and DBName is split with splitCommaOrNewline.
func buildBackupTaskSpec(spec *TaskSpec, startedAt time.Time, tempDir string) backup.TaskSpec {
	// splitLines returns the trimmed, non-empty lines of raw, or nil if none.
	splitLines := func(raw string) []string {
		var lines []string
		for _, line := range strings.Split(raw, "\n") {
			if line = strings.TrimSpace(line); line != "" {
				lines = append(lines, line)
			}
		}
		return lines
	}

	database := backup.DatabaseSpec{
		Host:     spec.DBHost,
		Port:     spec.DBPort,
		User:     spec.DBUser,
		Password: spec.DBPassword,
		Path:     spec.DBPath,
		Names:    splitCommaOrNewline(spec.DBName),
	}

	return backup.TaskSpec{
		ID:              spec.TaskID,
		Name:            spec.Name,
		Type:            spec.Type,
		SourcePath:      spec.SourcePath,
		SourcePaths:     splitLines(spec.SourcePaths),
		ExcludePatterns: splitLines(spec.ExcludePatterns),
		Database:        database,
		Compression:     spec.Compression,
		Encrypt:         spec.Encrypt,
		StartedAt:       startedAt,
		TempDir:         tempDir,
	}
}
// recordLogger forwards runner log output to the record on Master.
// It implements backup.LogWriter; each line is appended to
// record.log_content.
type recordLogger struct {
	// ctx is the context used for every log upload.
	ctx context.Context
	// client and recordID identify where the lines are sent.
	client   *MasterClient
	recordID uint
}
// newRecordLogger returns a recordLogger that sends log lines for recordID
// through client, using ctx for each request.
func newRecordLogger(ctx context.Context, client *MasterClient, recordID uint) *recordLogger {
	logger := new(recordLogger)
	logger.ctx = ctx
	logger.client = client
	logger.recordID = recordID
	return logger
}
// WriteLine appends message, followed by a newline, to the record log on
// Master. Upload errors are ignored so that logging cannot fail the backup.
func (l *recordLogger) WriteLine(message string) {
	update := RecordUpdate{LogAppend: message + "\n"}
	_ = l.client.UpdateRecord(l.ctx, l.recordID, update)
}
// Helper functions.

// computeFileSHA256 streams the file at path through SHA-256 and returns the
// digest as a lowercase hex string. Open and read errors are returned as-is.
func computeFileSHA256(path string) (string, error) {
	file, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer file.Close()

	hasher := sha256.New()
	_, err = io.Copy(hasher, file)
	if err != nil {
		return "", err
	}
	digest := hasher.Sum(nil)
	return hex.EncodeToString(digest), nil
}
// splitCommaOrNewline splits s on commas, semicolons and newlines, trims each
// piece and drops the empty ones. It returns nil when nothing remains.
func splitCommaOrNewline(s string) []string {
	isSeparator := func(r rune) bool {
		switch r {
		case ',', '\n', ';':
			return true
		}
		return false
	}

	var pieces []string
	for _, field := range strings.FieldsFunc(s, isSeparator) {
		field = strings.TrimSpace(field)
		if field == "" {
			continue
		}
		pieces = append(pieces, field)
	}
	return pieces
}

View File

@@ -0,0 +1,49 @@
package agent
import (
"fmt"
"os"
"path/filepath"
"sort"
)
// DirEntry is one directory entry returned by the agent to Master.
type DirEntry struct {
	Name  string `json:"name"`
	Path  string `json:"path"`
	IsDir bool   `json:"isDir"`
	// Size is the file size in bytes; it is 0 for directories and for entries
	// whose file info could not be read.
	Size int64 `json:"size"`
}

// listLocalDir lists the directory at path on the agent's machine.
//
// An empty path means the filesystem root "/". Directories are sorted before
// files, and entries of the same kind are sorted by name. Each entry's Path is
// the cleaned directory joined with the entry name.
func listLocalDir(path string) ([]DirEntry, error) {
	// The default must be applied before cleaning: filepath.Clean("") returns
	// ".", which would list the agent's working directory instead.
	if path == "" {
		path = "/"
	}
	cleaned := filepath.Clean(path)

	entries, err := os.ReadDir(cleaned)
	if err != nil {
		return nil, fmt.Errorf("read dir: %w", err)
	}

	result := make([]DirEntry, 0, len(entries))
	for _, entry := range entries {
		// A stat failure (e.g. the entry vanished meanwhile) only loses the
		// size; the entry itself is still listed.
		info, _ := entry.Info()
		size := int64(0)
		if info != nil && !entry.IsDir() {
			size = info.Size()
		}
		result = append(result, DirEntry{
			Name:  entry.Name(),
			Path:  filepath.Join(cleaned, entry.Name()),
			IsDir: entry.IsDir(),
			Size:  size,
		})
	}

	sort.Slice(result, func(i, j int) bool {
		if result[i].IsDir != result[j].IsDir {
			return result[i].IsDir
		}
		return result[i].Name < result[j].Name
	})
	return result, nil
}

View File

@@ -0,0 +1,61 @@
package agent
import (
"os"
"path/filepath"
"testing"
)
// TestListLocalDir checks that listLocalDir returns every entry, sorts
// directories first and reports file sizes. Fixture creation errors abort the
// test immediately, so a broken setup is not mistaken for a listing bug.
func TestListLocalDir(t *testing.T) {
	dir := t.TempDir()
	if err := os.WriteFile(filepath.Join(dir, "a.txt"), []byte("hello"), 0644); err != nil {
		t.Fatalf("write a.txt: %v", err)
	}
	if err := os.Mkdir(filepath.Join(dir, "sub"), 0755); err != nil {
		t.Fatalf("mkdir sub: %v", err)
	}
	if err := os.WriteFile(filepath.Join(dir, "b.txt"), []byte("world!"), 0644); err != nil {
		t.Fatalf("write b.txt: %v", err)
	}

	entries, err := listLocalDir(dir)
	if err != nil {
		t.Fatalf("list: %v", err)
	}
	if len(entries) != 3 {
		t.Fatalf("expected 3 entries, got %d", len(entries))
	}

	// Directories sort first.
	if !entries[0].IsDir || entries[0].Name != "sub" {
		t.Errorf("directories should sort first: %+v", entries)
	}

	// File sizes are reported correctly.
	var a *DirEntry
	for i := range entries {
		if entries[i].Name == "a.txt" {
			a = &entries[i]
			break
		}
	}
	if a == nil || a.Size != 5 {
		t.Errorf("file size: %+v", a)
	}
}
// TestSplitCommaOrNewline checks splitting on commas, newlines and semicolons,
// including trimming and the empty-input case.
func TestSplitCommaOrNewline(t *testing.T) {
	type splitCase struct {
		in  string
		out []string
	}
	cases := []splitCase{
		{in: "", out: nil},
		{in: "a,b,c", out: []string{"a", "b", "c"}},
		{in: "a\nb\nc", out: []string{"a", "b", "c"}},
		{in: "a; b ,\nc\n", out: []string{"a", "b", "c"}},
	}

	for _, tc := range cases {
		got := splitCommaOrNewline(tc.in)
		if len(got) != len(tc.out) {
			t.Errorf("%q: got %v want %v", tc.in, got, tc.out)
			continue
		}
		for i, want := range tc.out {
			if got[i] != want {
				t.Errorf("%q[%d]: %q vs %q", tc.in, i, got[i], want)
			}
		}
	}
}

View File

@@ -0,0 +1,12 @@
package agent
import "encoding/json"
// jsonUnmarshalMap decodes data (a []byte or json.RawMessage) into *out.
// Empty input is not an error: *out is set to an empty, non-nil map.
func jsonUnmarshalMap(data []byte, out *map[string]any) error {
	if len(data) > 0 {
		return json.Unmarshal(data, out)
	}
	*out = map[string]any{}
	return nil
}