diff --git a/README.md b/README.md index 4981057..24e5a1d 100644 --- a/README.md +++ b/README.md @@ -319,16 +319,81 @@ Backint Agent 使用本地 SQLite 维护 `EBID ↔ 对象键` 目录,所有操 ## 多节点集群 -BackupX 支持 Master-Agent 模式管理多台服务器: +BackupX 支持 Master-Agent 模式管理多台服务器:备份任务可以指定在哪个节点执行,Agent 在本地完成备份并直接上传到存储后端。 -1. Web 控制台 → **节点管理** → **添加节点**,系统生成 Token -2. 在远程服务器部署 Agent 并使用 Token 连接 Master -3. 创建备份任务时选择对应节点,Master 自动下发任务 +### 架构概览 -- 本机节点自动检测 IP 地址和版本信息 -- 远程节点通过 Agent 心跳上报系统信息(主机名、IP、OS、架构、版本) -- 支持在控制台直接编辑节点名称 -- 创建文件备份任务时可通过目录浏览器远程选择 Agent 节点上的目录 +``` +[Web 控制台] ←── JWT ──→ [Master (backupx)] + ↑ ↓ + │ │ HTTP 长轮询 (token 认证) + │ ↓ + [Agent (backupx agent)] ← 运行在远程服务器 + ↓ + [70+ 存储后端] +``` + +- **通信协议**:HTTP 长轮询,Agent 主动发起所有连接,无需 Master 反向访问 +- **心跳**:Agent 每 15s 上报一次;Master 每 15s 扫描,超过 45s 未心跳判为离线 +- **任务下发**:Master 通过数据库命令队列派发 `run_task`,Agent 轮询拉取 +- **执行**:Agent 本地复用 BackupRunner(file / mysql / postgresql / sqlite / saphana)并直接上传到存储 +- **安全**:每个节点独立 Token;Agent 不持有 Master 的 JWT 密钥和加密密钥 + +### 使用步骤 + +**1. 在 Master 创建节点并获取 Token** + +Web 控制台 → **节点管理** → **添加节点**,填写节点名称并保存。界面会显示一个 64 字节十六进制令牌(仅显示一次,请妥善保存)。 + +**2. 在远程服务器部署 Agent** + +把 BackupX 二进制上传到目标服务器(与 Master 同一个文件),然后用以下任一方式启动: + +```bash +# 方式 A:CLI 参数 +backupx agent --master http://master.example.com:8340 --token + +# 方式 B:配置文件 +cat > /etc/backupx/agent.yaml < +heartbeatInterval: 15s +pollInterval: 5s +tempDir: /var/lib/backupx-agent +EOF +backupx agent --config /etc/backupx/agent.yaml + +# 方式 C:环境变量(适合 Docker / systemd) +BACKUPX_AGENT_MASTER=http://master.example.com:8340 \ +BACKUPX_AGENT_TOKEN= \ +backupx agent +``` + +启动成功后,Master 的节点列表会把该节点标记为**在线**。 + +**3. 创建路由到该节点的备份任务** + +在 **备份任务** 页面新建任务时选择对应节点。任务被触发后: + +- 本机节点或未指定节点(`nodeId=0`):由 Master 进程本地执行 +- 远程节点:Master 写入命令队列 → Agent 轮询拉取 → 本地执行并上传 → 上报记录 + +### 限制说明 + +- **不支持加密备份**:Agent 不持有 Master 的 AES-256 加密密钥,启用 `encrypt: true` 的任务会路由到 Agent 时失败 +- **目录浏览超时**:远程目录浏览通过命令队列做同步 RPC,默认 15s 超时,网络慢时可能失败 +- **命令超时**:Agent 领取但未完成的命令超过 10min 会被标记为超时 + +### CLI 参考 + +```bash +backupx agent --help + -master string Master URL + -token string Agent 认证令牌 + -config string YAML 配置文件路径(优先级高于环境变量) + -temp-dir string 本地临时目录(默认 /tmp/backupx-agent) + -insecure-tls 跳过 TLS 证书校验(仅测试用) +``` --- diff --git a/README_EN.md b/README_EN.md index 1f07077..9f6fc6b 100644 --- a/README_EN.md +++ b/README_EN.md @@ -317,16 +317,81 @@ The Backint Agent maintains an `EBID ↔ object-key` catalog in a local SQLite D ## Multi-Node Cluster -BackupX supports Master-Agent mode for managing multiple servers: +BackupX supports Master-Agent mode for managing multiple servers. Backup tasks can be routed to specific nodes — the Agent runs the backup locally and uploads straight to storage backends. -1. Web Console → **Node Management** → **Add Node** — system generates a Token -2. Deploy Agent on remote server, connect using the Token -3. Create backup tasks and assign to specific nodes — Master dispatches automatically +### Architecture -- Local node auto-detects IP address and version -- Remote nodes report system info via Agent heartbeat (hostname, IP, OS, architecture, version) -- Node names can be edited directly from the console -- Visual directory browser lets you pick directories on remote Agent nodes +``` +[Web Console] ←── JWT ──→ [Master (backupx)] + ↑ ↓ + │ │ HTTP long-poll (token auth) + │ ↓ + [Agent (backupx agent)] ← runs on remote host + ↓ + [70+ Storage Backends] +``` + +- **Protocol**: HTTP long-polling; the Agent initiates all connections — Master never needs reverse access +- **Heartbeat**: Agent reports every 15s; Master marks nodes offline after 45s of silence +- **Dispatch**: Master persists `run_task` commands to a queue; Agent polls and claims them +- **Execution**: Agent reuses the same BackupRunner (file / mysql / postgresql / sqlite / saphana) and uploads directly to storage +- **Security**: Each node gets its own token; the Agent never holds the Master's JWT secret or encryption key + +### Walkthrough + +**1. Create a node on Master and copy the token** + +Web Console → **Node Management** → **Add Node**. The dialog shows a 64-byte hex token once — keep it safe. + +**2. Deploy the Agent on a remote host** + +Upload the BackupX binary (same file as Master) to the target host, then start the Agent: + +```bash +# Option A: CLI flags +backupx agent --master http://master.example.com:8340 --token + +# Option B: config file +cat > /etc/backupx/agent.yaml < +heartbeatInterval: 15s +pollInterval: 5s +tempDir: /var/lib/backupx-agent +EOF +backupx agent --config /etc/backupx/agent.yaml + +# Option C: environment variables (Docker / systemd-friendly) +BACKUPX_AGENT_MASTER=http://master.example.com:8340 \ +BACKUPX_AGENT_TOKEN= \ +backupx agent +``` + +Once connected, the node appears as **online** in the list. + +**3. Create a task routed to that node** + +In the **Backup Tasks** page, pick the target node when creating the task. When triggered: + +- Local / unassigned (`nodeId=0`) tasks run in-process on Master +- Remote-node tasks are enqueued → Agent claims → Agent runs locally → uploads → reports back + +### Limitations + +- **No encrypted backups via Agent**: the Agent doesn't hold Master's AES-256 key. Tasks with `encrypt: true` will fail if routed to an Agent +- **Directory browse timeout**: remote dir listing is a synchronous RPC through the queue; default 15s timeout +- **Command timeout**: claimed-but-unfinished commands are marked timed out after 10 minutes + +### CLI Reference + +```bash +backupx agent --help + -master string Master URL + -token string Agent auth token + -config string YAML config path (takes precedence over env) + -temp-dir string Local temp directory (default /tmp/backupx-agent) + -insecure-tls Skip TLS verification (testing only) +``` --- diff --git a/server/cmd/backupx/agent.go b/server/cmd/backupx/agent.go new file mode 100644 index 0000000..2cccee9 --- /dev/null +++ b/server/cmd/backupx/agent.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + "backupx/server/internal/agent" +) + +// runAgent 是 `backupx agent` 子命令入口。 +// +// 用法: +// +// backupx agent --master http://master:8340 --token +// backupx agent --config /etc/backupx-agent.yaml +// +// 配置优先级:CLI 参数 > 配置文件 > 环境变量 +func runAgent(args []string) { + fs := flag.NewFlagSet("agent", flag.ExitOnError) + configPath := fs.String("config", "", "path to agent config YAML (optional)") + master := fs.String("master", "", "master URL, e.g. http://master.example.com:8340") + token := fs.String("token", "", "agent authentication token") + tempDir := fs.String("temp-dir", "", "local temp directory for backup artifacts") + insecureTLS := fs.Bool("insecure-tls", false, "skip TLS verification (testing only)") + + if err := fs.Parse(args); err != nil { + os.Exit(2) + } + + cfg, err := loadAgentConfig(*configPath) + if err != nil { + fmt.Fprintf(os.Stderr, "agent: load config: %v\n", err) + os.Exit(2) + } + cfg.MergeWithFlags(*master, *token, *tempDir) + if *insecureTLS { + cfg.InsecureSkipTLSVerify = true + } + if err := cfg.Validate(); err != nil { + fmt.Fprintf(os.Stderr, "agent: %v\n", err) + os.Exit(2) + } + + a, err := agent.New(cfg, version) + if err != nil { + fmt.Fprintf(os.Stderr, "agent: init: %v\n", err) + os.Exit(1) + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + fmt.Fprintf(os.Stderr, "backupx agent %s starting (master=%s)\n", version, cfg.Master) + if err := a.Run(ctx); err != nil && err != context.Canceled { + fmt.Fprintf(os.Stderr, "agent: %v\n", err) + os.Exit(1) + } +} + +// loadAgentConfig 按优先级加载配置:如果提供了 --config 就用文件,否则走环境变量。 +func loadAgentConfig(configPath string) (*agent.Config, error) { + if configPath != "" { + return agent.LoadConfigFile(configPath) + } + return agent.LoadConfigFromEnv() +} diff --git a/server/cmd/backupx/main.go b/server/cmd/backupx/main.go index 5a75019..2a72212 100644 --- a/server/cmd/backupx/main.go +++ b/server/cmd/backupx/main.go @@ -29,6 +29,11 @@ func main() { runBackint(os.Args[2:]) return } + // 子命令分发:agent(远程节点 Agent 模式) + if len(os.Args) > 1 && os.Args[1] == "agent" { + runAgent(os.Args[2:]) + return + } var configPath string var showVersion bool diff --git a/server/go.mod b/server/go.mod index be0de08..b659eed 100644 --- a/server/go.mod +++ b/server/go.mod @@ -14,6 +14,7 @@ require ( golang.org/x/crypto v0.48.0 golang.org/x/oauth2 v0.34.0 google.golang.org/api v0.255.0 + gopkg.in/yaml.v3 v3.0.1 gorm.io/gorm v1.25.12 ) @@ -245,7 +246,6 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/validator.v2 v2.0.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect modernc.org/libc v1.22.5 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.5.0 // indirect diff --git a/server/internal/agent/agent.go b/server/internal/agent/agent.go new file mode 100644 index 0000000..843b3a9 --- /dev/null +++ b/server/internal/agent/agent.go @@ -0,0 +1,203 @@ +package agent + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net" + "os" + "runtime" + "strings" + "sync" + "time" +) + +// Agent 是 Agent 进程的主控制器。 +type Agent struct { + cfg *Config + client *MasterClient + executor *Executor + version string + + mu sync.Mutex + started bool +} + +// New 构造 Agent。 +func New(cfg *Config, version string) (*Agent, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + client := NewMasterClient(cfg.Master, cfg.Token, cfg.InsecureSkipTLSVerify) + executor := NewExecutor(client, cfg.TempDir) + return &Agent{ + cfg: cfg, + client: client, + executor: executor, + version: version, + }, nil +} + +// Run 启动 Agent 主循环,阻塞直到 ctx 被取消。 +func (a *Agent) Run(ctx context.Context) error { + a.mu.Lock() + if a.started { + a.mu.Unlock() + return fmt.Errorf("agent already started") + } + a.started = true + a.mu.Unlock() + + hbInterval := parseDuration(a.cfg.HeartbeatInterval, 15*time.Second) + pollInterval := parseDuration(a.cfg.PollInterval, 5*time.Second) + + // 首次握手:通过一次心跳确认 token 有效 + if err := a.heartbeatOnce(ctx); err != nil { + return fmt.Errorf("initial heartbeat failed: %w", err) + } + log.Printf("[agent] connected to master %s", a.cfg.Master) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + a.heartbeatLoop(ctx, hbInterval) + }() + go func() { + defer wg.Done() + a.pollLoop(ctx, pollInterval) + }() + wg.Wait() + return ctx.Err() +} + +// heartbeatLoop 定期发送心跳。 +func (a *Agent) heartbeatLoop(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := a.heartbeatOnce(ctx); err != nil { + log.Printf("[agent] heartbeat failed: %v", err) + } + } + } +} + +func (a *Agent) heartbeatOnce(ctx context.Context) error { + hostname, _ := os.Hostname() + req := HeartbeatRequest{ + Token: a.cfg.Token, + Hostname: hostname, + IPAddress: detectLocalIP(), + AgentVersion: a.version, + OS: runtime.GOOS, + Arch: runtime.GOARCH, + } + _, err := a.client.Heartbeat(ctx, req) + return err +} + +// pollLoop 定期拉取并处理待执行命令。 +func (a *Agent) pollLoop(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + a.pollAndHandleOnce(ctx) + } + } +} + +func (a *Agent) pollAndHandleOnce(ctx context.Context) { + cmd, err := a.client.PollCommand(ctx) + if err != nil { + log.Printf("[agent] poll command failed: %v", err) + return + } + if cmd == nil { + return + } + log.Printf("[agent] received command #%d type=%s", cmd.ID, cmd.Type) + switch cmd.Type { + case "run_task": + a.handleRunTask(ctx, cmd) + case "list_dir": + a.handleListDir(ctx, cmd) + default: + msg := fmt.Sprintf("unknown command type: %s", cmd.Type) + log.Printf("[agent] %s", msg) + _ = a.client.SubmitCommandResult(ctx, cmd.ID, false, msg, nil) + } +} + +// handleRunTask 处理 run_task 命令 +func (a *Agent) handleRunTask(ctx context.Context, cmd *CommandPayload) { + var payload struct { + TaskID uint `json:"taskId"` + RecordID uint `json:"recordId"` + } + if err := json.Unmarshal(cmd.Payload, &payload); err != nil { + _ = a.client.SubmitCommandResult(ctx, cmd.ID, false, "invalid payload: "+err.Error(), nil) + return + } + if err := a.executor.ExecuteRunTask(ctx, payload.TaskID, payload.RecordID); err != nil { + _ = a.client.SubmitCommandResult(ctx, cmd.ID, false, err.Error(), nil) + return + } + _ = a.client.SubmitCommandResult(ctx, cmd.ID, true, "", map[string]any{ + "taskId": payload.TaskID, + "recordId": payload.RecordID, + }) +} + +// handleListDir 处理 list_dir 命令(阶段四实现) +func (a *Agent) handleListDir(ctx context.Context, cmd *CommandPayload) { + var payload struct { + Path string `json:"path"` + } + if err := json.Unmarshal(cmd.Payload, &payload); err != nil { + _ = a.client.SubmitCommandResult(ctx, cmd.ID, false, "invalid payload: "+err.Error(), nil) + return + } + entries, err := listLocalDir(payload.Path) + if err != nil { + _ = a.client.SubmitCommandResult(ctx, cmd.ID, false, err.Error(), nil) + return + } + _ = a.client.SubmitCommandResult(ctx, cmd.ID, true, "", map[string]any{"entries": entries}) +} + +// 辅助函数 + +func parseDuration(s string, fallback time.Duration) time.Duration { + if strings.TrimSpace(s) == "" { + return fallback + } + if d, err := time.ParseDuration(s); err == nil { + return d + } + return fallback +} + +func detectLocalIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "" + } + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + if ipNet.IP.To4() != nil { + return ipNet.IP.String() + } + } + } + return "" +} diff --git a/server/internal/agent/client.go b/server/internal/agent/client.go new file mode 100644 index 0000000..5b9da08 --- /dev/null +++ b/server/internal/agent/client.go @@ -0,0 +1,208 @@ +package agent + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// MasterClient 是 Agent 调用 Master HTTP API 的封装。 +type MasterClient struct { + baseURL string + token string + httpClient *http.Client +} + +// NewMasterClient 构造 Master 客户端。 +func NewMasterClient(baseURL, token string, insecureTLS bool) *MasterClient { + transport := &http.Transport{} + if insecureTLS { + transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + return &MasterClient{ + baseURL: strings.TrimRight(baseURL, "/"), + token: token, + httpClient: &http.Client{ + Timeout: 120 * time.Second, + Transport: transport, + }, + } +} + +// HeartbeatRequest Agent 上报心跳的请求 +type HeartbeatRequest struct { + Token string `json:"token"` + Hostname string `json:"hostname,omitempty"` + IPAddress string `json:"ipAddress,omitempty"` + AgentVersion string `json:"agentVersion,omitempty"` + OS string `json:"os,omitempty"` + Arch string `json:"arch,omitempty"` +} + +// HeartbeatResponse Master 返回的心跳响应 +type HeartbeatResponse struct { + Status string `json:"status"` + NodeID uint `json:"nodeId"` + Name string `json:"name"` +} + +// Heartbeat 上报心跳并获取节点元信息 +func (c *MasterClient) Heartbeat(ctx context.Context, req HeartbeatRequest) (*HeartbeatResponse, error) { + var resp HeartbeatResponse + if err := c.do(ctx, http.MethodPost, "/api/agent/heartbeat", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +// CommandPayload 与 service.AgentCommandPayload 对齐 +type CommandPayload struct { + ID uint `json:"id"` + Type string `json:"type"` + Payload json.RawMessage `json:"payload,omitempty"` +} + +// PollCommandResponse 轮询响应:无命令时 Command 为 nil +type PollCommandResponse struct { + Command *CommandPayload `json:"command"` +} + +// PollCommand 拉取下一条待执行命令 +func (c *MasterClient) PollCommand(ctx context.Context) (*CommandPayload, error) { + var resp PollCommandResponse + if err := c.do(ctx, http.MethodPost, "/api/agent/commands/poll", nil, &resp); err != nil { + return nil, err + } + return resp.Command, nil +} + +// SubmitCommandResult 上报命令执行结果 +func (c *MasterClient) SubmitCommandResult(ctx context.Context, cmdID uint, success bool, errorMsg string, result any) error { + var resultJSON json.RawMessage + if result != nil { + data, err := json.Marshal(result) + if err != nil { + return fmt.Errorf("marshal result: %w", err) + } + resultJSON = data + } + payload := map[string]any{ + "success": success, + "errorMessage": errorMsg, + } + if resultJSON != nil { + payload["result"] = resultJSON + } + path := fmt.Sprintf("/api/agent/commands/%d/result", cmdID) + return c.do(ctx, http.MethodPost, path, payload, nil) +} + +// TaskSpec 与 service.AgentTaskSpec 对齐 +type TaskSpec struct { + TaskID uint `json:"taskId"` + Name string `json:"name"` + Type string `json:"type"` + SourcePath string `json:"sourcePath"` + SourcePaths string `json:"sourcePaths"` + ExcludePatterns string `json:"excludePatterns"` + DBHost string `json:"dbHost"` + DBPort int `json:"dbPort"` + DBUser string `json:"dbUser"` + DBPassword string `json:"dbPassword"` + DBName string `json:"dbName"` + DBPath string `json:"dbPath"` + ExtraConfig string `json:"extraConfig"` + Compression string `json:"compression"` + Encrypt bool `json:"encrypt"` + StorageTargets []StorageTargetConfig `json:"storageTargets"` +} + +// StorageTargetConfig 与 service.AgentStorageTargetConfig 对齐 +type StorageTargetConfig struct { + ID uint `json:"id"` + Type string `json:"type"` + Name string `json:"name"` + Config json.RawMessage `json:"config"` +} + +// GetTaskSpec 拉取任务规格 +func (c *MasterClient) GetTaskSpec(ctx context.Context, taskID uint) (*TaskSpec, error) { + var spec TaskSpec + path := fmt.Sprintf("/api/agent/tasks/%d", taskID) + if err := c.do(ctx, http.MethodGet, path, nil, &spec); err != nil { + return nil, err + } + return &spec, nil +} + +// RecordUpdate 与 service.AgentRecordUpdate 对齐 +type RecordUpdate struct { + Status string `json:"status,omitempty"` + FileName string `json:"fileName,omitempty"` + FileSize int64 `json:"fileSize,omitempty"` + Checksum string `json:"checksum,omitempty"` + StoragePath string `json:"storagePath,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + LogAppend string `json:"logAppend,omitempty"` +} + +// UpdateRecord 上报备份记录的状态/日志 +func (c *MasterClient) UpdateRecord(ctx context.Context, recordID uint, update RecordUpdate) error { + path := fmt.Sprintf("/api/agent/records/%d", recordID) + return c.do(ctx, http.MethodPost, path, update, nil) +} + +// do 是通用 HTTP 调用。所有 Agent API 都统一走 JSON + X-Agent-Token。 +func (c *MasterClient) do(ctx context.Context, method, path string, body any, out any) error { + var reqBody io.Reader + if body != nil { + data, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("marshal request: %w", err) + } + reqBody = bytes.NewReader(data) + } + req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, reqBody) + if err != nil { + return err + } + req.Header.Set("X-Agent-Token", c.token) + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("%s %s: %w", method, path, err) + } + defer resp.Body.Close() + data, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("read response: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("%s %s: http %d: %s", method, path, resp.StatusCode, string(data)) + } + if out == nil { + return nil + } + // BackupX API 统一封装成 {code, data, message} 形式,需要解出 data 字段 + var envelope struct { + Code string `json:"code"` + Data json.RawMessage `json:"data"` + Message string `json:"message"` + } + if err := json.Unmarshal(data, &envelope); err == nil && envelope.Data != nil { + if err := json.Unmarshal(envelope.Data, out); err != nil { + return fmt.Errorf("decode data: %w", err) + } + return nil + } + // 兼容直接返回对象的情况 + return json.Unmarshal(data, out) +} diff --git a/server/internal/agent/config.go b/server/internal/agent/config.go new file mode 100644 index 0000000..82d4873 --- /dev/null +++ b/server/internal/agent/config.go @@ -0,0 +1,105 @@ +// Package agent 实现 BackupX 远程 Agent。 +// +// Agent 是一个独立的 Go 进程,部署在远程服务器上,通过 HTTP 轮询的方式 +// 与 Master 通信:定期上报心跳、拉取 Master 下发的命令、本地执行备份、 +// 把执行结果和日志回报给 Master。 +// +// 通信协议见 server/internal/http/agent_handler.go。 +package agent + +import ( + "errors" + "fmt" + "os" + "strings" + + "gopkg.in/yaml.v3" +) + +// Config 是 Agent 的运行时配置。 +type Config struct { + // Master BackupX Master 的 HTTP 基础地址,例如 http://master.example.com:8340 + Master string `yaml:"master"` + // Token 节点认证令牌(在 Master 创建节点时生成) + Token string `yaml:"token"` + // HeartbeatInterval 心跳间隔,默认 15s + HeartbeatInterval string `yaml:"heartbeatInterval"` + // PollInterval 命令轮询间隔,默认 5s + PollInterval string `yaml:"pollInterval"` + // TempDir 备份临时目录,默认 /tmp/backupx-agent + TempDir string `yaml:"tempDir"` + // InsecureSkipTLSVerify 测试环境允许跳过 TLS 证书校验 + InsecureSkipTLSVerify bool `yaml:"insecureSkipTlsVerify"` +} + +// LoadConfigFile 从 YAML 文件加载 Agent 配置。 +func LoadConfigFile(path string) (*Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read agent config: %w", err) + } + var cfg Config + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("parse agent config: %w", err) + } + return applyConfigDefaults(&cfg) +} + +// LoadConfigFromEnv 从环境变量加载 Agent 配置。优先级低于 --config 文件。 +// +// 支持的环境变量: +// - BACKUPX_AGENT_MASTER Master URL +// - BACKUPX_AGENT_TOKEN 节点认证令牌 +// - BACKUPX_AGENT_HEARTBEAT 心跳间隔(如 15s) +// - BACKUPX_AGENT_POLL 命令轮询间隔(如 5s) +// - BACKUPX_AGENT_TEMP_DIR 临时目录 +// - BACKUPX_AGENT_INSECURE_TLS true / 1 跳过 TLS 校验 +func LoadConfigFromEnv() (*Config, error) { + cfg := &Config{ + Master: strings.TrimSpace(os.Getenv("BACKUPX_AGENT_MASTER")), + Token: strings.TrimSpace(os.Getenv("BACKUPX_AGENT_TOKEN")), + HeartbeatInterval: strings.TrimSpace(os.Getenv("BACKUPX_AGENT_HEARTBEAT")), + PollInterval: strings.TrimSpace(os.Getenv("BACKUPX_AGENT_POLL")), + TempDir: strings.TrimSpace(os.Getenv("BACKUPX_AGENT_TEMP_DIR")), + InsecureSkipTLSVerify: strings.EqualFold(os.Getenv("BACKUPX_AGENT_INSECURE_TLS"), "true") || os.Getenv("BACKUPX_AGENT_INSECURE_TLS") == "1", + } + return applyConfigDefaults(cfg) +} + +// MergeWithFlags 把命令行覆盖值合并入配置(非空覆盖)。 +func (c *Config) MergeWithFlags(master, token, tempDir string) { + if strings.TrimSpace(master) != "" { + c.Master = master + } + if strings.TrimSpace(token) != "" { + c.Token = token + } + if strings.TrimSpace(tempDir) != "" { + c.TempDir = tempDir + } +} + +// Validate 校验必填字段。 +func (c *Config) Validate() error { + if strings.TrimSpace(c.Master) == "" { + return errors.New("master url is required (set via --master, BACKUPX_AGENT_MASTER or config file)") + } + if strings.TrimSpace(c.Token) == "" { + return errors.New("token is required (set via --token, BACKUPX_AGENT_TOKEN or config file)") + } + return nil +} + +func applyConfigDefaults(cfg *Config) (*Config, error) { + if cfg.HeartbeatInterval == "" { + cfg.HeartbeatInterval = "15s" + } + if cfg.PollInterval == "" { + cfg.PollInterval = "5s" + } + if cfg.TempDir == "" { + cfg.TempDir = "/tmp/backupx-agent" + } + cfg.Master = strings.TrimRight(strings.TrimSpace(cfg.Master), "/") + return cfg, nil +} diff --git a/server/internal/agent/config_test.go b/server/internal/agent/config_test.go new file mode 100644 index 0000000..838a38d --- /dev/null +++ b/server/internal/agent/config_test.go @@ -0,0 +1,101 @@ +package agent + +import ( + "os" + "path/filepath" + "testing" +) + +func TestLoadConfigFile(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "agent.yaml") + content := `master: http://master.example.com:8340/ +token: abc123 +heartbeatInterval: 20s +pollInterval: 3s +tempDir: /var/backupx-agent +insecureSkipTlsVerify: true +` + if err := os.WriteFile(path, []byte(content), 0644); err != nil { + t.Fatal(err) + } + cfg, err := LoadConfigFile(path) + if err != nil { + t.Fatalf("load: %v", err) + } + if cfg.Master != "http://master.example.com:8340" { + t.Errorf("trailing slash should be trimmed: %q", cfg.Master) + } + if cfg.Token != "abc123" { + t.Errorf("token: %q", cfg.Token) + } + if cfg.HeartbeatInterval != "20s" || cfg.PollInterval != "3s" { + t.Errorf("intervals: %+v", cfg) + } + if !cfg.InsecureSkipTLSVerify { + t.Errorf("insecure should be true") + } +} + +func TestLoadConfigDefaults(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "agent.yaml") + if err := os.WriteFile(path, []byte("master: http://m\ntoken: t\n"), 0644); err != nil { + t.Fatal(err) + } + cfg, err := LoadConfigFile(path) + if err != nil { + t.Fatal(err) + } + if cfg.HeartbeatInterval != "15s" || cfg.PollInterval != "5s" { + t.Errorf("default intervals not applied: %+v", cfg) + } + if cfg.TempDir != "/tmp/backupx-agent" { + t.Errorf("default tempdir: %q", cfg.TempDir) + } +} + +func TestConfigValidate(t *testing.T) { + cases := []struct { + name string + cfg Config + wantErr bool + }{ + {"valid", Config{Master: "http://m", Token: "t"}, false}, + {"missing master", Config{Token: "t"}, true}, + {"missing token", Config{Master: "http://m"}, true}, + } + for _, c := range cases { + err := c.cfg.Validate() + if (err != nil) != c.wantErr { + t.Errorf("%s: err=%v wantErr=%v", c.name, err, c.wantErr) + } + } +} + +func TestMergeWithFlags(t *testing.T) { + cfg := &Config{Master: "http://old", Token: "old"} + cfg.MergeWithFlags("http://new", "", "/tmp/x") + if cfg.Master != "http://new" { + t.Errorf("master not overridden: %q", cfg.Master) + } + if cfg.Token != "old" { + t.Errorf("empty flag should not override: %q", cfg.Token) + } + if cfg.TempDir != "/tmp/x" { + t.Errorf("tempDir: %q", cfg.TempDir) + } +} + +func TestLoadConfigFromEnv(t *testing.T) { + t.Setenv("BACKUPX_AGENT_MASTER", "http://env-master") + t.Setenv("BACKUPX_AGENT_TOKEN", "env-token") + t.Setenv("BACKUPX_AGENT_INSECURE_TLS", "true") + cfg, err := LoadConfigFromEnv() + if err != nil { + t.Fatal(err) + } + if cfg.Master != "http://env-master" || cfg.Token != "env-token" || !cfg.InsecureSkipTLSVerify { + t.Errorf("env not picked up: %+v", cfg) + } +} diff --git a/server/internal/agent/executor.go b/server/internal/agent/executor.go new file mode 100644 index 0000000..80cb33e --- /dev/null +++ b/server/internal/agent/executor.go @@ -0,0 +1,266 @@ +package agent + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + "backupx/server/internal/backup" + "backupx/server/internal/storage" + storageRclone "backupx/server/internal/storage/rclone" + "backupx/server/pkg/compress" +) + +// Executor 负责在 Agent 本地执行命令。 +type Executor struct { + client *MasterClient + tempDir string + backupRegistry *backup.Registry + storageRegistry *storage.Registry +} + +// NewExecutor 构造执行器。预先初始化 backup runner 与 storage registry。 +func NewExecutor(client *MasterClient, tempDir string) *Executor { + backupRegistry := backup.NewRegistry( + backup.NewFileRunner(), + backup.NewSQLiteRunner(), + backup.NewMySQLRunner(nil), + backup.NewPostgreSQLRunner(nil), + backup.NewSAPHANARunner(nil), + ) + storageRegistry := storage.NewRegistry( + storageRclone.NewLocalDiskFactory(), + storageRclone.NewS3Factory(), + storageRclone.NewWebDAVFactory(), + storageRclone.NewGoogleDriveFactory(), + storageRclone.NewAliyunOSSFactory(), + storageRclone.NewTencentCOSFactory(), + storageRclone.NewQiniuKodoFactory(), + storageRclone.NewFTPFactory(), + storageRclone.NewRcloneFactory(), + ) + storageRclone.RegisterAllBackends(storageRegistry) + return &Executor{ + client: client, + tempDir: tempDir, + backupRegistry: backupRegistry, + storageRegistry: storageRegistry, + } +} + +// ExecuteRunTask 处理 run_task 命令:拉规格 → 执行 runner → 压缩 → 上传 → 上报记录。 +// +// 注意:Agent 当前不支持 Encrypt=true(加密密钥不下发到 Agent,避免密钥扩散)。 +// 遇到启用加密的任务会向 Master 上报失败并返回错误。 +func (e *Executor) ExecuteRunTask(ctx context.Context, taskID, recordID uint) error { + // 1) 拉取任务规格 + spec, err := e.client.GetTaskSpec(ctx, taskID) + if err != nil { + e.reportRecordFailure(ctx, recordID, fmt.Sprintf("拉取任务规格失败: %v", err)) + return err + } + if spec.Encrypt { + msg := "Agent 不支持加密备份(加密密钥仅在 Master 端持有)" + e.reportRecordFailure(ctx, recordID, msg) + return fmt.Errorf("%s", msg) + } + e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 开始执行任务 %s (type=%s)\n", spec.Name, spec.Type)) + + // 2) 构造 backup.TaskSpec 并找对应 runner + startedAt := time.Now().UTC() + if err := os.MkdirAll(e.tempDir, 0o755); err != nil { + e.reportRecordFailure(ctx, recordID, fmt.Sprintf("创建临时目录失败: %v", err)) + return err + } + backupSpec := buildBackupTaskSpec(spec, startedAt, e.tempDir) + runner, err := e.backupRegistry.Runner(backupSpec.Type) + if err != nil { + e.reportRecordFailure(ctx, recordID, fmt.Sprintf("不支持的备份类型: %v", err)) + return err + } + + // 3) 运行 runner + logger := newRecordLogger(ctx, e.client, recordID) + result, err := runner.Run(ctx, backupSpec, logger) + if err != nil { + e.reportRecordFailure(ctx, recordID, err.Error()) + return err + } + defer os.RemoveAll(result.TempDir) + + // 4) 可选 gzip 压缩 + finalPath := result.ArtifactPath + if strings.EqualFold(spec.Compression, "gzip") && !strings.HasSuffix(strings.ToLower(finalPath), ".gz") { + e.appendLog(ctx, recordID, "[agent] 开始压缩备份文件\n") + compressedPath, compressErr := compress.GzipFile(finalPath) + if compressErr != nil { + e.reportRecordFailure(ctx, recordID, fmt.Sprintf("压缩失败: %v", compressErr)) + return compressErr + } + finalPath = compressedPath + } + info, err := os.Stat(finalPath) + if err != nil { + e.reportRecordFailure(ctx, recordID, fmt.Sprintf("获取文件信息失败: %v", err)) + return err + } + fileName := filepath.Base(finalPath) + fileSize := info.Size() + storagePath := backup.BuildStorageKey(spec.Type, startedAt, fileName) + + // 5) 计算 checksum(一次读一次)并上传到所有目标 + checksum, err := computeFileSHA256(finalPath) + if err != nil { + e.reportRecordFailure(ctx, recordID, fmt.Sprintf("计算 checksum 失败: %v", err)) + return err + } + if len(spec.StorageTargets) == 0 { + e.reportRecordFailure(ctx, recordID, "没有关联的存储目标") + return fmt.Errorf("no storage targets") + } + for _, target := range spec.StorageTargets { + if err := e.uploadToTarget(ctx, recordID, target, finalPath, storagePath, fileSize, spec.TaskID); err != nil { + e.reportRecordFailure(ctx, recordID, fmt.Sprintf("上传到 %s 失败: %v", target.Name, err)) + return err + } + e.appendLog(ctx, recordID, fmt.Sprintf("[agent] 已上传到存储目标 %s\n", target.Name)) + } + + // 6) 上报最终成功 + return e.client.UpdateRecord(ctx, recordID, RecordUpdate{ + Status: "success", + FileName: fileName, + FileSize: fileSize, + Checksum: checksum, + StoragePath: storagePath, + LogAppend: fmt.Sprintf("[agent] 任务完成,总计 %d 字节\n", fileSize), + }) +} + +// uploadToTarget 上传单个目标。为保持简化不做上传级重试(rclone 本身已有 low-level 重试)。 +func (e *Executor) uploadToTarget(ctx context.Context, recordID uint, target StorageTargetConfig, filePath, objectKey string, fileSize int64, taskID uint) error { + var rawConfig map[string]any + if len(target.Config) > 0 { + // DecodeRawConfig 通过 json 解析 + if err := jsonUnmarshalMap(target.Config, &rawConfig); err != nil { + return fmt.Errorf("parse storage config: %w", err) + } + } + provider, err := e.storageRegistry.Create(ctx, target.Type, rawConfig) + if err != nil { + return fmt.Errorf("create provider: %w", err) + } + f, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("open artifact: %w", err) + } + defer f.Close() + meta := map[string]string{ + "taskId": fmt.Sprintf("%d", taskID), + "recordId": fmt.Sprintf("%d", recordID), + } + return provider.Upload(ctx, objectKey, f, fileSize, meta) +} + +// appendLog 追加日志到 Master 记录(尽力而为,失败不中断主流程) +func (e *Executor) appendLog(ctx context.Context, recordID uint, line string) { + _ = e.client.UpdateRecord(ctx, recordID, RecordUpdate{LogAppend: line}) +} + +// reportRecordFailure 上报失败状态 +func (e *Executor) reportRecordFailure(ctx context.Context, recordID uint, msg string) { + _ = e.client.UpdateRecord(ctx, recordID, RecordUpdate{ + Status: "failed", + ErrorMessage: msg, + LogAppend: fmt.Sprintf("[agent] 错误: %s\n", msg), + }) +} + +// buildBackupTaskSpec 把 AgentTaskSpec 转换为 backup.TaskSpec。 +func buildBackupTaskSpec(spec *TaskSpec, startedAt time.Time, tempDir string) backup.TaskSpec { + var sourcePaths []string + if strings.TrimSpace(spec.SourcePaths) != "" { + for _, p := range strings.Split(spec.SourcePaths, "\n") { + if p = strings.TrimSpace(p); p != "" { + sourcePaths = append(sourcePaths, p) + } + } + } + var excludes []string + if strings.TrimSpace(spec.ExcludePatterns) != "" { + for _, p := range strings.Split(spec.ExcludePatterns, "\n") { + if p = strings.TrimSpace(p); p != "" { + excludes = append(excludes, p) + } + } + } + return backup.TaskSpec{ + ID: spec.TaskID, + Name: spec.Name, + Type: spec.Type, + SourcePath: spec.SourcePath, + SourcePaths: sourcePaths, + ExcludePatterns: excludes, + Database: backup.DatabaseSpec{ + Host: spec.DBHost, + Port: spec.DBPort, + User: spec.DBUser, + Password: spec.DBPassword, + Path: spec.DBPath, + Names: splitCommaOrNewline(spec.DBName), + }, + Compression: spec.Compression, + Encrypt: spec.Encrypt, + StartedAt: startedAt, + TempDir: tempDir, + } +} + +// recordLogger 把 runner 日志回传到 Master 记录。 +// 实现 backup.LogWriter,每条日志追加到 record.log_content。 +type recordLogger struct { + ctx context.Context + client *MasterClient + recordID uint +} + +func newRecordLogger(ctx context.Context, client *MasterClient, recordID uint) *recordLogger { + return &recordLogger{ctx: ctx, client: client, recordID: recordID} +} + +func (l *recordLogger) WriteLine(message string) { + _ = l.client.UpdateRecord(l.ctx, l.recordID, RecordUpdate{LogAppend: message + "\n"}) +} + +// 辅助函数 + +func computeFileSHA256(path string) (string, error) { + f, err := os.Open(path) + if err != nil { + return "", err + } + defer f.Close() + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + return "", err + } + return hex.EncodeToString(h.Sum(nil)), nil +} + +func splitCommaOrNewline(s string) []string { + var result []string + for _, part := range strings.FieldsFunc(s, func(r rune) bool { + return r == ',' || r == '\n' || r == ';' + }) { + if p := strings.TrimSpace(part); p != "" { + result = append(result, p) + } + } + return result +} diff --git a/server/internal/agent/fs.go b/server/internal/agent/fs.go new file mode 100644 index 0000000..135dea3 --- /dev/null +++ b/server/internal/agent/fs.go @@ -0,0 +1,49 @@ +package agent + +import ( + "fmt" + "os" + "path/filepath" + "sort" +) + +// DirEntry Agent 返回给 Master 的目录项。 +type DirEntry struct { + Name string `json:"name"` + Path string `json:"path"` + IsDir bool `json:"isDir"` + Size int64 `json:"size"` +} + +// listLocalDir 列出 Agent 所在机器的指定路径。 +func listLocalDir(path string) ([]DirEntry, error) { + cleaned := filepath.Clean(path) + if cleaned == "" { + cleaned = "/" + } + entries, err := os.ReadDir(cleaned) + if err != nil { + return nil, fmt.Errorf("read dir: %w", err) + } + result := make([]DirEntry, 0, len(entries)) + for _, entry := range entries { + info, _ := entry.Info() + size := int64(0) + if info != nil && !entry.IsDir() { + size = info.Size() + } + result = append(result, DirEntry{ + Name: entry.Name(), + Path: filepath.Join(cleaned, entry.Name()), + IsDir: entry.IsDir(), + Size: size, + }) + } + sort.Slice(result, func(i, j int) bool { + if result[i].IsDir != result[j].IsDir { + return result[i].IsDir + } + return result[i].Name < result[j].Name + }) + return result, nil +} diff --git a/server/internal/agent/fs_test.go b/server/internal/agent/fs_test.go new file mode 100644 index 0000000..fe3229b --- /dev/null +++ b/server/internal/agent/fs_test.go @@ -0,0 +1,61 @@ +package agent + +import ( + "os" + "path/filepath" + "testing" +) + +func TestListLocalDir(t *testing.T) { + dir := t.TempDir() + _ = os.WriteFile(filepath.Join(dir, "a.txt"), []byte("hello"), 0644) + _ = os.Mkdir(filepath.Join(dir, "sub"), 0755) + _ = os.WriteFile(filepath.Join(dir, "b.txt"), []byte("world!"), 0644) + + entries, err := listLocalDir(dir) + if err != nil { + t.Fatalf("list: %v", err) + } + if len(entries) != 3 { + t.Fatalf("expected 3 entries, got %d", len(entries)) + } + // 目录排序靠前 + if !entries[0].IsDir || entries[0].Name != "sub" { + t.Errorf("directories should sort first: %+v", entries) + } + // 文件大小正确 + var a *DirEntry + for i := range entries { + if entries[i].Name == "a.txt" { + a = &entries[i] + break + } + } + if a == nil || a.Size != 5 { + t.Errorf("file size: %+v", a) + } +} + +func TestSplitCommaOrNewline(t *testing.T) { + cases := []struct { + in string + out []string + }{ + {"", nil}, + {"a,b,c", []string{"a", "b", "c"}}, + {"a\nb\nc", []string{"a", "b", "c"}}, + {"a; b ,\nc\n", []string{"a", "b", "c"}}, + } + for _, c := range cases { + got := splitCommaOrNewline(c.in) + if len(got) != len(c.out) { + t.Errorf("%q: got %v want %v", c.in, got, c.out) + continue + } + for i := range got { + if got[i] != c.out[i] { + t.Errorf("%q[%d]: %q vs %q", c.in, i, got[i], c.out[i]) + } + } + } +} diff --git a/server/internal/agent/json_util.go b/server/internal/agent/json_util.go new file mode 100644 index 0000000..5281f06 --- /dev/null +++ b/server/internal/agent/json_util.go @@ -0,0 +1,12 @@ +package agent + +import "encoding/json" + +// jsonUnmarshalMap 把 []byte 或 json.RawMessage 解为 map[string]any。 +func jsonUnmarshalMap(data []byte, out *map[string]any) error { + if len(data) == 0 { + *out = map[string]any{} + return nil + } + return json.Unmarshal(data, out) +} diff --git a/server/internal/app/app.go b/server/internal/app/app.go index 6542e4f..fa913f1 100644 --- a/server/internal/app/app.go +++ b/server/internal/app/app.go @@ -112,9 +112,21 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application, // Cluster: Node management nodeRepo := repository.NewNodeRepository(db) nodeService := service.NewNodeService(nodeRepo, version) + nodeService.SetTaskRepository(backupTaskRepo) if err := nodeService.EnsureLocalNode(ctx); err != nil { appLogger.Warn("failed to ensure local node", zap.Error(err)) } + // 启动离线检测:每 15s 扫描一次,超过 45s 未心跳的远程节点标记为离线 + nodeService.StartOfflineMonitor(ctx, 15*time.Second) + + // Agent 协议服务:命令队列 + 任务下发 + 记录上报 + agentCmdRepo := repository.NewAgentCommandRepository(db) + agentService := service.NewAgentService(nodeRepo, backupTaskRepo, backupRecordRepo, storageTargetRepo, agentCmdRepo, configCipher) + agentService.StartCommandTimeoutMonitor(ctx, 30*time.Second, 10*time.Minute) + // 把 Agent 下发能力注入到备份执行服务,实现多节点路由 + backupExecutionService.SetClusterDependencies(nodeRepo, agentService) + // 启用远程目录浏览:NodeService 通过 AgentService 做同步 RPC + nodeService.SetAgentRPC(agentService) router := aphttp.NewRouter(aphttp.RouterDependencies{ Config: cfg, @@ -130,6 +142,7 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application, DashboardService: dashboardService, SettingsService: settingsService, NodeService: nodeService, + AgentService: agentService, DatabaseDiscoveryService: databaseDiscoveryService, AuditService: auditService, JWTManager: jwtManager, diff --git a/server/internal/database/database.go b/server/internal/database/database.go index 1f4c906..030e1f5 100644 --- a/server/internal/database/database.go +++ b/server/internal/database/database.go @@ -23,7 +23,7 @@ func Open(cfg config.DatabaseConfig, logger *zap.Logger) (*gorm.DB, error) { return nil, fmt.Errorf("open sqlite: %w", err) } - if err := db.AutoMigrate(&model.User{}, &model.SystemConfig{}, &model.StorageTarget{}, &model.OAuthSession{}, &model.BackupTask{}, &model.BackupRecord{}, &model.Notification{}, &model.Node{}, &model.BackupTaskStorageTarget{}, &model.AuditLog{}); err != nil { + if err := db.AutoMigrate(&model.User{}, &model.SystemConfig{}, &model.StorageTarget{}, &model.OAuthSession{}, &model.BackupTask{}, &model.BackupRecord{}, &model.Notification{}, &model.Node{}, &model.BackupTaskStorageTarget{}, &model.AuditLog{}, &model.AgentCommand{}); err != nil { return nil, fmt.Errorf("migrate schema: %w", err) } diff --git a/server/internal/http/agent_handler.go b/server/internal/http/agent_handler.go new file mode 100644 index 0000000..b0195eb --- /dev/null +++ b/server/internal/http/agent_handler.go @@ -0,0 +1,156 @@ +package http + +import ( + stdhttp "net/http" + "strconv" + "strings" + + "backupx/server/internal/service" + "backupx/server/pkg/response" + + "github.com/gin-gonic/gin" +) + +// AgentHandler 实现 Agent 调用 Master 的 HTTP API。 +// 全部端点通过 X-Agent-Token 头做节点认证,不使用 JWT。 +type AgentHandler struct { + agentService *service.AgentService + nodeService *service.NodeService +} + +func NewAgentHandler(agentService *service.AgentService, nodeService *service.NodeService) *AgentHandler { + return &AgentHandler{agentService: agentService, nodeService: nodeService} +} + +// extractToken 从请求头或 JSON body 中提取 Agent Token。 +func extractToken(c *gin.Context) string { + if t := strings.TrimSpace(c.GetHeader("X-Agent-Token")); t != "" { + return t + } + // Authorization: Bearer + if auth := c.GetHeader("Authorization"); strings.HasPrefix(auth, "Bearer ") { + return strings.TrimSpace(strings.TrimPrefix(auth, "Bearer ")) + } + return "" +} + +// Heartbeat 扩展原有 heartbeat:除上报状态外,返回节点 ID 给 Agent 做后续调用。 +func (h *AgentHandler) Heartbeat(c *gin.Context) { + var input struct { + Token string `json:"token"` + Hostname string `json:"hostname"` + IPAddress string `json:"ipAddress"` + AgentVersion string `json:"agentVersion"` + OS string `json:"os"` + Arch string `json:"arch"` + } + _ = c.ShouldBindJSON(&input) + // token 优先走 body(向后兼容),否则从 header 读 + token := input.Token + if token == "" { + token = extractToken(c) + } + if token == "" { + c.JSON(stdhttp.StatusBadRequest, gin.H{"code": "INVALID_INPUT", "message": "missing token"}) + return + } + if err := h.nodeService.Heartbeat(c.Request.Context(), token, input.Hostname, input.IPAddress, input.AgentVersion, input.OS, input.Arch); err != nil { + response.Error(c, err) + return + } + // 返回节点元信息给 Agent(node_id 用于后续 API 路径) + node, err := h.agentService.AuthenticatedNode(c.Request.Context(), token) + if err != nil { + response.Error(c, err) + return + } + response.Success(c, gin.H{ + "status": "ok", + "nodeId": node.ID, + "name": node.Name, + }) +} + +// Poll Agent 长轮询获取下一条待执行命令。 +// 无命令时返回 {command: null}。 +func (h *AgentHandler) Poll(c *gin.Context) { + node, err := h.agentService.AuthenticatedNode(c.Request.Context(), extractToken(c)) + if err != nil { + response.Error(c, err) + return + } + cmd, err := h.agentService.PollCommand(c.Request.Context(), node) + if err != nil { + response.Error(c, err) + return + } + response.Success(c, gin.H{"command": cmd}) +} + +// SubmitCommandResult Agent 上报命令执行结果。 +func (h *AgentHandler) SubmitCommandResult(c *gin.Context) { + node, err := h.agentService.AuthenticatedNode(c.Request.Context(), extractToken(c)) + if err != nil { + response.Error(c, err) + return + } + id, err := strconv.ParseUint(c.Param("id"), 10, 32) + if err != nil { + response.Error(c, err) + return + } + var input service.AgentCommandResult + if err := c.ShouldBindJSON(&input); err != nil { + c.JSON(stdhttp.StatusBadRequest, gin.H{"code": "INVALID_INPUT", "message": err.Error()}) + return + } + if err := h.agentService.SubmitCommandResult(c.Request.Context(), node, uint(id), input); err != nil { + response.Error(c, err) + return + } + response.Success(c, gin.H{"status": "ok"}) +} + +// GetTaskSpec Agent 拉取任务规格(含解密后的存储配置)。 +func (h *AgentHandler) GetTaskSpec(c *gin.Context) { + node, err := h.agentService.AuthenticatedNode(c.Request.Context(), extractToken(c)) + if err != nil { + response.Error(c, err) + return + } + id, err := strconv.ParseUint(c.Param("id"), 10, 32) + if err != nil { + response.Error(c, err) + return + } + spec, err := h.agentService.GetTaskSpec(c.Request.Context(), node, uint(id)) + if err != nil { + response.Error(c, err) + return + } + response.Success(c, spec) +} + +// UpdateRecord Agent 更新备份记录(进度/完成状态/日志)。 +func (h *AgentHandler) UpdateRecord(c *gin.Context) { + node, err := h.agentService.AuthenticatedNode(c.Request.Context(), extractToken(c)) + if err != nil { + response.Error(c, err) + return + } + id, err := strconv.ParseUint(c.Param("id"), 10, 32) + if err != nil { + response.Error(c, err) + return + } + var input service.AgentRecordUpdate + if err := c.ShouldBindJSON(&input); err != nil { + c.JSON(stdhttp.StatusBadRequest, gin.H{"code": "INVALID_INPUT", "message": err.Error()}) + return + } + if err := h.agentService.UpdateRecord(c.Request.Context(), node, uint(id), input); err != nil { + response.Error(c, err) + return + } + response.Success(c, gin.H{"status": "ok"}) +} diff --git a/server/internal/http/router.go b/server/internal/http/router.go index 21b55fa..47445ed 100644 --- a/server/internal/http/router.go +++ b/server/internal/http/router.go @@ -28,6 +28,7 @@ type RouterDependencies struct { DashboardService *service.DashboardService SettingsService *service.SettingsService NodeService *service.NodeService + AgentService *service.AgentService DatabaseDiscoveryService *service.DatabaseDiscoveryService AuditService *service.AuditService JWTManager *security.JWTManager @@ -150,8 +151,19 @@ func NewRouter(deps RouterDependencies) *gin.Engine { nodes.DELETE("/:id", nodeHandler.Delete) nodes.GET("/:id/fs/list", nodeHandler.ListDirectory) - // Agent heartbeat (public, token-authenticated) - api.POST("/agent/heartbeat", nodeHandler.Heartbeat) + // Agent API(token 认证,无需 JWT) + if deps.AgentService != nil { + agentHandler := NewAgentHandler(deps.AgentService, deps.NodeService) + agent := api.Group("/agent") + agent.POST("/heartbeat", agentHandler.Heartbeat) + agent.POST("/commands/poll", agentHandler.Poll) + agent.POST("/commands/:id/result", agentHandler.SubmitCommandResult) + agent.GET("/tasks/:id", agentHandler.GetTaskSpec) + agent.POST("/records/:id", agentHandler.UpdateRecord) + } else { + // 未启用 Agent 服务时,保留原有 heartbeat 端点以兼容 + api.POST("/agent/heartbeat", nodeHandler.Heartbeat) + } } engine.NoRoute(func(c *gin.Context) { diff --git a/server/internal/model/agent_command.go b/server/internal/model/agent_command.go new file mode 100644 index 0000000..078d6b6 --- /dev/null +++ b/server/internal/model/agent_command.go @@ -0,0 +1,44 @@ +package model + +import "time" + +// AgentCommand 状态常量 +const ( + AgentCommandStatusPending = "pending" // 待 Agent 拉取 + AgentCommandStatusDispatched = "dispatched" // Agent 已领取,正在执行 + AgentCommandStatusSucceeded = "succeeded" // 执行成功 + AgentCommandStatusFailed = "failed" // 执行失败 + AgentCommandStatusTimeout = "timeout" // 超时未完成 +) + +// AgentCommand 类型常量 +const ( + // AgentCommandTypeRunTask 运行指定备份任务 + // Payload: {"taskId": 123, "recordId": 456} + AgentCommandTypeRunTask = "run_task" + // AgentCommandTypeListDir 远程列目录(用于文件备份时的目录浏览器) + // Payload: {"path": "/var/log"} + // Result: {"entries": [{"name":"...", "path":"...", "isDir":true, "size":0}]} + AgentCommandTypeListDir = "list_dir" +) + +// AgentCommand 代表 Master 发给某个 Agent 节点的待执行命令。 +// 使用简单的数据库队列实现:Agent 通过 token 长轮询拉取本节点 pending 命令, +// 执行后回写状态与结果。Master 侧通过定时检查把超时的命令标记为 timeout。 +type AgentCommand struct { + ID uint `gorm:"primaryKey" json:"id"` + NodeID uint `gorm:"column:node_id;index;not null" json:"nodeId"` + Type string `gorm:"size:32;index;not null" json:"type"` + Status string `gorm:"size:20;index;not null;default:'pending'" json:"status"` + Payload string `gorm:"type:text" json:"payload"` // JSON + Result string `gorm:"type:text" json:"result"` // JSON(成功结果) + ErrorMessage string `gorm:"column:error_message;type:text" json:"errorMessage"` + DispatchedAt *time.Time `gorm:"column:dispatched_at" json:"dispatchedAt,omitempty"` + CompletedAt *time.Time `gorm:"column:completed_at" json:"completedAt,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +func (AgentCommand) TableName() string { + return "agent_commands" +} diff --git a/server/internal/repository/agent_command_repository.go b/server/internal/repository/agent_command_repository.go new file mode 100644 index 0000000..0e07e51 --- /dev/null +++ b/server/internal/repository/agent_command_repository.go @@ -0,0 +1,101 @@ +package repository + +import ( + "context" + "errors" + "time" + + "backupx/server/internal/model" + "gorm.io/gorm" +) + +// AgentCommandRepository 维护 Agent 命令队列。 +type AgentCommandRepository interface { + Create(ctx context.Context, cmd *model.AgentCommand) error + FindByID(ctx context.Context, id uint) (*model.AgentCommand, error) + // ClaimPending 以原子方式把该节点一条 pending 命令置为 dispatched, + // 并返回领取到的命令。无命令时返回 (nil, nil)。 + ClaimPending(ctx context.Context, nodeID uint) (*model.AgentCommand, error) + Update(ctx context.Context, cmd *model.AgentCommand) error + // MarkStaleTimeout 把 dispatched 状态但超时未完成的命令标记为 timeout。 + MarkStaleTimeout(ctx context.Context, threshold time.Time) (int64, error) +} + +type GormAgentCommandRepository struct { + db *gorm.DB +} + +func NewAgentCommandRepository(db *gorm.DB) *GormAgentCommandRepository { + return &GormAgentCommandRepository{db: db} +} + +func (r *GormAgentCommandRepository) Create(ctx context.Context, cmd *model.AgentCommand) error { + return r.db.WithContext(ctx).Create(cmd).Error +} + +func (r *GormAgentCommandRepository) FindByID(ctx context.Context, id uint) (*model.AgentCommand, error) { + var item model.AgentCommand + if err := r.db.WithContext(ctx).First(&item, id).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return nil, err + } + return &item, nil +} + +// ClaimPending 使用 UPDATE...WHERE id=(SELECT...) 的两步方式实现原子领取。 +// SQLite 不支持 SELECT FOR UPDATE,这里用事务 + 乐观锁。 +func (r *GormAgentCommandRepository) ClaimPending(ctx context.Context, nodeID uint) (*model.AgentCommand, error) { + var claimed *model.AgentCommand + err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var item model.AgentCommand + err := tx.Where("node_id = ? AND status = ?", nodeID, model.AgentCommandStatusPending). + Order("id asc").First(&item).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err + } + now := time.Now().UTC() + result := tx.Model(&model.AgentCommand{}). + Where("id = ? AND status = ?", item.ID, model.AgentCommandStatusPending). + Updates(map[string]any{ + "status": model.AgentCommandStatusDispatched, + "dispatched_at": &now, + }) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + // 被其它 worker 抢占,放弃 + return nil + } + item.Status = model.AgentCommandStatusDispatched + item.DispatchedAt = &now + claimed = &item + return nil + }) + if err != nil { + return nil, err + } + return claimed, nil +} + +func (r *GormAgentCommandRepository) Update(ctx context.Context, cmd *model.AgentCommand) error { + return r.db.WithContext(ctx).Save(cmd).Error +} + +func (r *GormAgentCommandRepository) MarkStaleTimeout(ctx context.Context, threshold time.Time) (int64, error) { + result := r.db.WithContext(ctx).Model(&model.AgentCommand{}). + Where("status = ? AND dispatched_at < ?", model.AgentCommandStatusDispatched, threshold). + Updates(map[string]any{ + "status": model.AgentCommandStatusTimeout, + "error_message": "agent did not report result before timeout", + }) + if result.Error != nil { + return 0, result.Error + } + return result.RowsAffected, nil +} diff --git a/server/internal/repository/agent_command_repository_test.go b/server/internal/repository/agent_command_repository_test.go new file mode 100644 index 0000000..7d53e8e --- /dev/null +++ b/server/internal/repository/agent_command_repository_test.go @@ -0,0 +1,120 @@ +package repository + +import ( + "context" + "testing" + "time" + + "backupx/server/internal/model" + "github.com/glebarez/sqlite" + "gorm.io/gorm" + gormlogger "gorm.io/gorm/logger" +) + +func newTestDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{Logger: gormlogger.Default.LogMode(gormlogger.Silent)}) + if err != nil { + t.Fatalf("open: %v", err) + } + if err := db.AutoMigrate(&model.AgentCommand{}); err != nil { + t.Fatalf("migrate: %v", err) + } + return db +} + +func TestAgentCommandRepository_ClaimPending(t *testing.T) { + db := newTestDB(t) + repo := NewAgentCommandRepository(db) + ctx := context.Background() + + // 插入两条 pending 命令 + c1 := &model.AgentCommand{NodeID: 5, Type: "run_task", Status: model.AgentCommandStatusPending, Payload: `{"taskId":1}`} + c2 := &model.AgentCommand{NodeID: 5, Type: "list_dir", Status: model.AgentCommandStatusPending, Payload: `{"path":"/"}`} + c3 := &model.AgentCommand{NodeID: 99, Type: "run_task", Status: model.AgentCommandStatusPending} + for _, c := range []*model.AgentCommand{c1, c2, c3} { + if err := repo.Create(ctx, c); err != nil { + t.Fatal(err) + } + } + + // 第一次 Claim 应拿到 c1 + claimed, err := repo.ClaimPending(ctx, 5) + if err != nil { + t.Fatalf("claim: %v", err) + } + if claimed == nil || claimed.ID != c1.ID || claimed.Status != model.AgentCommandStatusDispatched { + t.Fatalf("expected c1 dispatched: %+v", claimed) + } + + // 第二次应拿到 c2 + claimed2, err := repo.ClaimPending(ctx, 5) + if err != nil || claimed2 == nil || claimed2.ID != c2.ID { + t.Fatalf("expected c2: %+v %v", claimed2, err) + } + + // 第三次无 pending,返回 nil + claimed3, err := repo.ClaimPending(ctx, 5) + if err != nil || claimed3 != nil { + t.Fatalf("expected nil, got %+v", claimed3) + } + + // 不同 node 的命令不应被抢到 + other, err := repo.ClaimPending(ctx, 5) + if err != nil || other != nil { + t.Fatalf("expected nil: %+v", other) + } +} + +func TestAgentCommandRepository_Update(t *testing.T) { + db := newTestDB(t) + repo := NewAgentCommandRepository(db) + ctx := context.Background() + cmd := &model.AgentCommand{NodeID: 1, Type: "run_task", Status: model.AgentCommandStatusPending} + _ = repo.Create(ctx, cmd) + + cmd.Status = model.AgentCommandStatusSucceeded + cmd.Result = `{"ok":true}` + now := time.Now().UTC() + cmd.CompletedAt = &now + if err := repo.Update(ctx, cmd); err != nil { + t.Fatal(err) + } + + got, err := repo.FindByID(ctx, cmd.ID) + if err != nil || got == nil { + t.Fatal(err) + } + if got.Status != model.AgentCommandStatusSucceeded || got.Result != `{"ok":true}` { + t.Errorf("mismatch: %+v", got) + } +} + +func TestAgentCommandRepository_MarkStaleTimeout(t *testing.T) { + db := newTestDB(t) + repo := NewAgentCommandRepository(db) + ctx := context.Background() + old := time.Now().Add(-time.Hour) + recent := time.Now() + // 两条 dispatched:一条旧、一条新 + oldCmd := &model.AgentCommand{NodeID: 1, Type: "run_task", Status: model.AgentCommandStatusDispatched, DispatchedAt: &old} + newCmd := &model.AgentCommand{NodeID: 1, Type: "run_task", Status: model.AgentCommandStatusDispatched, DispatchedAt: &recent} + _ = repo.Create(ctx, oldCmd) + _ = repo.Create(ctx, newCmd) + + n, err := repo.MarkStaleTimeout(ctx, time.Now().Add(-30*time.Minute)) + if err != nil { + t.Fatal(err) + } + if n != 1 { + t.Errorf("expected 1 row, got %d", n) + } + oldGot, _ := repo.FindByID(ctx, oldCmd.ID) + newGot, _ := repo.FindByID(ctx, newCmd.ID) + if oldGot.Status != model.AgentCommandStatusTimeout { + t.Errorf("old should be timeout: %+v", oldGot) + } + if newGot.Status != model.AgentCommandStatusDispatched { + t.Errorf("new should stay dispatched: %+v", newGot) + } +} diff --git a/server/internal/repository/backup_task_repository.go b/server/internal/repository/backup_task_repository.go index 2d3467b..2f01d13 100644 --- a/server/internal/repository/backup_task_repository.go +++ b/server/internal/repository/backup_task_repository.go @@ -21,6 +21,8 @@ type BackupTaskRepository interface { Count(context.Context) (int64, error) CountEnabled(context.Context) (int64, error) CountByStorageTargetID(context.Context, uint) (int64, error) + CountByNodeID(context.Context, uint) (int64, error) + ListByNodeID(context.Context, uint) ([]model.BackupTask, error) Create(context.Context, *model.BackupTask) error Update(context.Context, *model.BackupTask) error Delete(context.Context, uint) error @@ -103,6 +105,24 @@ func (r *GormBackupTaskRepository) CountByStorageTargetID(ctx context.Context, s return count, nil } +// CountByNodeID 统计绑定到指定节点的任务数。用于删除节点前的引用检查。 +func (r *GormBackupTaskRepository) CountByNodeID(ctx context.Context, nodeID uint) (int64, error) { + var count int64 + if err := r.db.WithContext(ctx).Model(&model.BackupTask{}).Where("node_id = ?", nodeID).Count(&count).Error; err != nil { + return 0, err + } + return count, nil +} + +// ListByNodeID 列出绑定到指定节点的任务。用于 Agent 拉取本节点待执行任务。 +func (r *GormBackupTaskRepository) ListByNodeID(ctx context.Context, nodeID uint) ([]model.BackupTask, error) { + var items []model.BackupTask + if err := r.db.WithContext(ctx).Preload("StorageTarget").Preload("StorageTargets").Where("node_id = ?", nodeID).Order("id asc").Find(&items).Error; err != nil { + return nil, err + } + return items, nil +} + func (r *GormBackupTaskRepository) Create(ctx context.Context, item *model.BackupTask) error { if err := r.db.WithContext(ctx).Create(item).Error; err != nil { return err diff --git a/server/internal/repository/node_repository.go b/server/internal/repository/node_repository.go index 87ec957..b497b26 100644 --- a/server/internal/repository/node_repository.go +++ b/server/internal/repository/node_repository.go @@ -3,6 +3,7 @@ package repository import ( "context" "errors" + "time" "backupx/server/internal/model" "gorm.io/gorm" @@ -16,6 +17,7 @@ type NodeRepository interface { Create(context.Context, *model.Node) error Update(context.Context, *model.Node) error Delete(context.Context, uint) error + MarkStaleOffline(ctx context.Context, threshold time.Time) (int64, error) } type GormNodeRepository struct { @@ -78,3 +80,16 @@ func (r *GormNodeRepository) Update(ctx context.Context, item *model.Node) error func (r *GormNodeRepository) Delete(ctx context.Context, id uint) error { return r.db.WithContext(ctx).Delete(&model.Node{}, id).Error } + +// MarkStaleOffline 把最近心跳早于 threshold 的在线远程节点标记为离线。 +// 本机节点 (is_local=true) 不受影响,由主程序自己维护 online 状态。 +// 返回受影响行数。 +func (r *GormNodeRepository) MarkStaleOffline(ctx context.Context, threshold time.Time) (int64, error) { + result := r.db.WithContext(ctx).Model(&model.Node{}). + Where("is_local = ? AND status = ? AND last_seen < ?", false, model.NodeStatusOnline, threshold). + Update("status", model.NodeStatusOffline) + if result.Error != nil { + return 0, result.Error + } + return result.RowsAffected, nil +} diff --git a/server/internal/scheduler/service_test.go b/server/internal/scheduler/service_test.go index 074bb97..66b1b4e 100644 --- a/server/internal/scheduler/service_test.go +++ b/server/internal/scheduler/service_test.go @@ -31,6 +31,12 @@ func (r *fakeTaskRepository) CountEnabled(context.Context) (int64, error) { retu func (r *fakeTaskRepository) CountByStorageTargetID(context.Context, uint) (int64, error) { return 0, nil } +func (r *fakeTaskRepository) CountByNodeID(context.Context, uint) (int64, error) { + return 0, nil +} +func (r *fakeTaskRepository) ListByNodeID(context.Context, uint) ([]model.BackupTask, error) { + return nil, nil +} func (r *fakeTaskRepository) Create(context.Context, *model.BackupTask) error { return nil } func (r *fakeTaskRepository) Update(context.Context, *model.BackupTask) error { return nil } func (r *fakeTaskRepository) Delete(context.Context, uint) error { return nil } diff --git a/server/internal/service/agent_service.go b/server/internal/service/agent_service.go new file mode 100644 index 0000000..17e74b5 --- /dev/null +++ b/server/internal/service/agent_service.go @@ -0,0 +1,348 @@ +package service + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "backupx/server/internal/apperror" + "backupx/server/internal/model" + "backupx/server/internal/repository" + "backupx/server/internal/storage/codec" +) + +// AgentService 实现 Master 端 Agent 协议,提供给远程 Agent 通过 HTTP 调用。 +// 所有方法使用 Agent Token 进行节点认证,避免暴露 JWT 给 Agent。 +type AgentService struct { + nodeRepo repository.NodeRepository + taskRepo repository.BackupTaskRepository + recordRepo repository.BackupRecordRepository + storageRepo repository.StorageTargetRepository + cmdRepo repository.AgentCommandRepository + cipher *codec.ConfigCipher +} + +func NewAgentService( + nodeRepo repository.NodeRepository, + taskRepo repository.BackupTaskRepository, + recordRepo repository.BackupRecordRepository, + storageRepo repository.StorageTargetRepository, + cmdRepo repository.AgentCommandRepository, + cipher *codec.ConfigCipher, +) *AgentService { + return &AgentService{ + nodeRepo: nodeRepo, + taskRepo: taskRepo, + recordRepo: recordRepo, + storageRepo: storageRepo, + cmdRepo: cmdRepo, + cipher: cipher, + } +} + +// AuthenticatedNode 通过 token 解析并返回节点。失败返回 401。 +func (s *AgentService) AuthenticatedNode(ctx context.Context, token string) (*model.Node, error) { + if strings.TrimSpace(token) == "" { + return nil, apperror.Unauthorized("NODE_INVALID_TOKEN", "缺少认证令牌", nil) + } + node, err := s.nodeRepo.FindByToken(ctx, token) + if err != nil { + return nil, err + } + if node == nil { + return nil, apperror.Unauthorized("NODE_INVALID_TOKEN", "无效的节点认证令牌", nil) + } + return node, nil +} + +// AgentCommandPayload 给 Agent 返回的命令描述 +type AgentCommandPayload struct { + ID uint `json:"id"` + Type string `json:"type"` + Payload json.RawMessage `json:"payload,omitempty"` +} + +// PollCommand 为指定节点拉取一条 pending 命令;无命令时返回 (nil, nil)。 +func (s *AgentService) PollCommand(ctx context.Context, node *model.Node) (*AgentCommandPayload, error) { + cmd, err := s.cmdRepo.ClaimPending(ctx, node.ID) + if err != nil { + return nil, err + } + if cmd == nil { + return nil, nil + } + return &AgentCommandPayload{ + ID: cmd.ID, + Type: cmd.Type, + Payload: json.RawMessage(cmd.Payload), + }, nil +} + +// AgentCommandResult Agent 上报命令执行结果 +type AgentCommandResult struct { + Success bool `json:"success"` + ErrorMessage string `json:"errorMessage,omitempty"` + Result json.RawMessage `json:"result,omitempty"` +} + +// SubmitCommandResult 接收 Agent 上报的命令结果。 +func (s *AgentService) SubmitCommandResult(ctx context.Context, node *model.Node, cmdID uint, result AgentCommandResult) error { + cmd, err := s.cmdRepo.FindByID(ctx, cmdID) + if err != nil { + return err + } + if cmd == nil { + return apperror.New(404, "AGENT_COMMAND_NOT_FOUND", "命令不存在", fmt.Errorf("command %d not found", cmdID)) + } + if cmd.NodeID != node.ID { + return apperror.Unauthorized("AGENT_COMMAND_FORBIDDEN", "命令不属于当前节点", nil) + } + now := time.Now().UTC() + if result.Success { + cmd.Status = model.AgentCommandStatusSucceeded + } else { + cmd.Status = model.AgentCommandStatusFailed + } + cmd.ErrorMessage = result.ErrorMessage + if len(result.Result) > 0 { + cmd.Result = string(result.Result) + } + cmd.CompletedAt = &now + return s.cmdRepo.Update(ctx, cmd) +} + +// AgentTaskSpec 给 Agent 返回的任务规格,包含解密后的存储配置,供 Agent 直接执行。 +// 敏感信息:此接口仅供 Agent 调用(token 认证),避免通过公共 API 泄露。 +type AgentTaskSpec struct { + TaskID uint `json:"taskId"` + Name string `json:"name"` + Type string `json:"type"` + SourcePath string `json:"sourcePath,omitempty"` + SourcePaths string `json:"sourcePaths,omitempty"` + ExcludePatterns string `json:"excludePatterns,omitempty"` + DBHost string `json:"dbHost,omitempty"` + DBPort int `json:"dbPort,omitempty"` + DBUser string `json:"dbUser,omitempty"` + DBPassword string `json:"dbPassword,omitempty"` + DBName string `json:"dbName,omitempty"` + DBPath string `json:"dbPath,omitempty"` + ExtraConfig string `json:"extraConfig,omitempty"` + Compression string `json:"compression"` + Encrypt bool `json:"encrypt"` + StorageTargets []AgentStorageTargetConfig `json:"storageTargets"` +} + +// AgentStorageTargetConfig 存储目标配置(已解密) +type AgentStorageTargetConfig struct { + ID uint `json:"id"` + Type string `json:"type"` + Name string `json:"name"` + Config json.RawMessage `json:"config"` +} + +// GetTaskSpec 返回 Agent 执行任务所需的完整规格。 +func (s *AgentService) GetTaskSpec(ctx context.Context, node *model.Node, taskID uint) (*AgentTaskSpec, error) { + task, err := s.taskRepo.FindByID(ctx, taskID) + if err != nil { + return nil, err + } + if task == nil { + return nil, apperror.New(404, "BACKUP_TASK_NOT_FOUND", "任务不存在", nil) + } + if task.NodeID != node.ID { + return nil, apperror.Unauthorized("BACKUP_TASK_FORBIDDEN", "任务不属于当前节点", nil) + } + // 解密数据库密码(若有) + dbPassword := "" + if task.DBPasswordCiphertext != "" { + plain, decErr := s.cipher.Decrypt(task.DBPasswordCiphertext) + if decErr != nil { + return nil, fmt.Errorf("decrypt db password: %w", decErr) + } + dbPassword = string(plain) + } + // 解密存储目标配置 + targets := collectTargetIDs(task) + storageTargets := make([]AgentStorageTargetConfig, 0, len(targets)) + for _, tid := range targets { + target, err := s.storageRepo.FindByID(ctx, tid) + if err != nil { + return nil, err + } + if target == nil { + continue + } + configRaw, err := s.cipher.Decrypt(target.ConfigCiphertext) + if err != nil { + return nil, fmt.Errorf("decrypt storage config: %w", err) + } + storageTargets = append(storageTargets, AgentStorageTargetConfig{ + ID: target.ID, + Type: target.Type, + Name: target.Name, + Config: json.RawMessage(configRaw), + }) + } + return &AgentTaskSpec{ + TaskID: task.ID, + Name: task.Name, + Type: task.Type, + SourcePath: task.SourcePath, + SourcePaths: task.SourcePaths, + ExcludePatterns: task.ExcludePatterns, + DBHost: task.DBHost, + DBPort: task.DBPort, + DBUser: task.DBUser, + DBPassword: dbPassword, + DBName: task.DBName, + DBPath: task.DBPath, + ExtraConfig: task.ExtraConfig, + Compression: task.Compression, + Encrypt: task.Encrypt, + StorageTargets: storageTargets, + }, nil +} + +// AgentRecordUpdate Agent 上报备份记录的最终状态。 +type AgentRecordUpdate struct { + Status string `json:"status"` // running | success | failed + FileName string `json:"fileName,omitempty"` + FileSize int64 `json:"fileSize,omitempty"` + Checksum string `json:"checksum,omitempty"` + StoragePath string `json:"storagePath,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + LogAppend string `json:"logAppend,omitempty"` // 增量日志,追加到 record.log_content +} + +// UpdateRecord 更新备份记录的状态/日志。Agent 在执行过程中可多次调用。 +func (s *AgentService) UpdateRecord(ctx context.Context, node *model.Node, recordID uint, update AgentRecordUpdate) error { + record, err := s.recordRepo.FindByID(ctx, recordID) + if err != nil { + return err + } + if record == nil { + return apperror.New(404, "BACKUP_RECORD_NOT_FOUND", "记录不存在", nil) + } + // 通过 task.NodeID 判断是否属于当前 agent + task, err := s.taskRepo.FindByID(ctx, record.TaskID) + if err != nil { + return err + } + if task == nil || task.NodeID != node.ID { + return apperror.Unauthorized("BACKUP_RECORD_FORBIDDEN", "记录不属于当前节点", nil) + } + if update.Status != "" { + record.Status = update.Status + } + if update.FileName != "" { + record.FileName = update.FileName + } + if update.FileSize > 0 { + record.FileSize = update.FileSize + } + if update.Checksum != "" { + record.Checksum = update.Checksum + } + if update.StoragePath != "" { + record.StoragePath = update.StoragePath + } + if update.ErrorMessage != "" { + record.ErrorMessage = update.ErrorMessage + } + if update.LogAppend != "" { + if record.LogContent == "" { + record.LogContent = update.LogAppend + } else { + record.LogContent += update.LogAppend + } + } + if update.Status == model.BackupRecordStatusSuccess || update.Status == model.BackupRecordStatusFailed { + now := time.Now().UTC() + record.CompletedAt = &now + record.DurationSeconds = int(now.Sub(record.StartedAt).Seconds()) + } + if err := s.recordRepo.Update(ctx, record); err != nil { + return err + } + // 同步更新任务的 last_status + if update.Status == model.BackupRecordStatusSuccess || update.Status == model.BackupRecordStatusFailed { + task.LastStatus = update.Status + _ = s.taskRepo.Update(ctx, task) + } + return nil +} + +// EnqueueCommand Master 端调用:给指定节点插入一条待执行命令。 +// 返回命令 ID。 +func (s *AgentService) EnqueueCommand(ctx context.Context, nodeID uint, cmdType string, payload any) (uint, error) { + if nodeID == 0 { + return 0, errors.New("nodeID is required") + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return 0, fmt.Errorf("marshal payload: %w", err) + } + cmd := &model.AgentCommand{ + NodeID: nodeID, + Type: cmdType, + Status: model.AgentCommandStatusPending, + Payload: string(payloadBytes), + } + if err := s.cmdRepo.Create(ctx, cmd); err != nil { + return 0, err + } + return cmd.ID, nil +} + +// WaitForCommandResult 同步等待指定命令完成(用于 list_dir 这类 RPC 式调用)。 +// timeout 为 0 表示不限,建议传 10~30s。 +func (s *AgentService) WaitForCommandResult(ctx context.Context, cmdID uint, timeout time.Duration) (*model.AgentCommand, error) { + deadline := time.Now().Add(timeout) + for { + cmd, err := s.cmdRepo.FindByID(ctx, cmdID) + if err != nil { + return nil, err + } + if cmd == nil { + return nil, apperror.New(404, "AGENT_COMMAND_NOT_FOUND", "命令不存在", nil) + } + switch cmd.Status { + case model.AgentCommandStatusSucceeded, model.AgentCommandStatusFailed, model.AgentCommandStatusTimeout: + return cmd, nil + } + if timeout > 0 && time.Now().After(deadline) { + return nil, apperror.New(504, "AGENT_COMMAND_TIMEOUT", "等待 Agent 响应超时", nil) + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(300 * time.Millisecond): + } + } +} + +// StartCommandTimeoutMonitor 启动后台定时任务,把超时命令标记为 timeout。 +func (s *AgentService) StartCommandTimeoutMonitor(ctx context.Context, interval time.Duration, timeout time.Duration) { + if interval <= 0 { + interval = 30 * time.Second + } + if timeout <= 0 { + timeout = 10 * time.Minute + } + ticker := time.NewTicker(interval) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + threshold := time.Now().UTC().Add(-timeout) + _, _ = s.cmdRepo.MarkStaleTimeout(ctx, threshold) + } + } + }() +} diff --git a/server/internal/service/backup_execution_service.go b/server/internal/service/backup_execution_service.go index 7dc323c..b1e5925 100644 --- a/server/internal/service/backup_execution_service.go +++ b/server/internal/service/backup_execution_service.go @@ -75,12 +75,14 @@ type BackupExecutionService struct { tasks repository.BackupTaskRepository records repository.BackupRecordRepository targets repository.StorageTargetRepository + nodeRepo repository.NodeRepository storageRegistry *storage.Registry runnerRegistry *backup.Registry logHub *backup.LogHub retention *backupretention.Service cipher *codec.ConfigCipher notifier BackupResultNotifier + agentDispatcher AgentDispatcher async func(func()) now func() time.Time tempDir string @@ -89,6 +91,18 @@ type BackupExecutionService struct { bandwidthLimit string // rclone 带宽限制 } +// AgentDispatcher 抽象把任务下发给 Agent 的能力,由 AgentService 实现。 +// 用接口避免 execution service ↔ agent service 的循环依赖风险。 +type AgentDispatcher interface { + EnqueueCommand(ctx context.Context, nodeID uint, cmdType string, payload any) (uint, error) +} + +// SetClusterDependencies 注入集群相关的依赖,使备份执行时可把任务路由到远程节点。 +func (s *BackupExecutionService) SetClusterDependencies(nodeRepo repository.NodeRepository, dispatcher AgentDispatcher) { + s.nodeRepo = nodeRepo + s.agentDispatcher = dispatcher +} + func NewBackupExecutionService( tasks repository.BackupTaskRepository, records repository.BackupRecordRepository, @@ -243,6 +257,20 @@ func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async b if err := s.tasks.Update(ctx, task); err != nil { return nil, apperror.Internal("BACKUP_TASK_UPDATE_FAILED", "无法更新任务状态", err) } + // 多节点路由:task.NodeID 指向远程节点时,把执行任务入队给 Agent; + // NodeID=0 或本机节点时由 Master 直接执行。 + if s.isRemoteNode(ctx, task.NodeID) { + if _, enqueueErr := s.agentDispatcher.EnqueueCommand(ctx, task.NodeID, model.AgentCommandTypeRunTask, map[string]any{ + "taskId": task.ID, + "recordId": record.ID, + }); enqueueErr != nil { + // 入队失败 → 在记录中标记失败,继续返回详情 + _ = s.finalizeRecord(ctx, task, record.ID, startedAt, model.BackupRecordStatusFailed, + "无法下发任务到远程节点: "+enqueueErr.Error(), "", "", 0, "", "") + return nil, apperror.Internal("AGENT_COMMAND_ENQUEUE_FAILED", "无法下发任务到远程节点", enqueueErr) + } + return s.getRecordDetail(ctx, record.ID) + } run := func() { s.executeTask(context.Background(), task, record.ID, startedAt) } @@ -254,6 +282,19 @@ func (s *BackupExecutionService) startTask(ctx context.Context, id uint, async b return s.getRecordDetail(ctx, record.ID) } +// isRemoteNode 判断 NodeID 是否指向一个有效的远程(非本机)节点。 +// 当未注入集群依赖、nodeID 为 0、或节点为本机时,均返回 false(走本地执行)。 +func (s *BackupExecutionService) isRemoteNode(ctx context.Context, nodeID uint) bool { + if s.nodeRepo == nil || s.agentDispatcher == nil || nodeID == 0 { + return false + } + node, err := s.nodeRepo.FindByID(ctx, nodeID) + if err != nil || node == nil { + return false + } + return !node.IsLocal +} + func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.BackupTask, recordID uint, startedAt time.Time) { s.semaphore <- struct{}{} defer func() { <-s.semaphore }() diff --git a/server/internal/service/node_service.go b/server/internal/service/node_service.go index 260d45a..6dee41a 100644 --- a/server/internal/service/node_service.go +++ b/server/internal/service/node_service.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "encoding/json" "fmt" "net" "net/http" @@ -46,14 +47,33 @@ type NodeUpdateInput struct { // NodeService manages the cluster nodes. type NodeService struct { - repo repository.NodeRepository - version string + repo repository.NodeRepository + taskRepo repository.BackupTaskRepository + agentRPC NodeAgentRPC + version string +} + +// NodeAgentRPC 抽象 Agent 远程调用能力(避免 service 内循环依赖)。 +// 由 AgentService 实现;当 Agent 未启用时可不注入,远程目录浏览返回提示。 +type NodeAgentRPC interface { + EnqueueCommand(ctx context.Context, nodeID uint, cmdType string, payload any) (uint, error) + WaitForCommandResult(ctx context.Context, cmdID uint, timeout time.Duration) (*model.AgentCommand, error) } func NewNodeService(repo repository.NodeRepository, version string) *NodeService { return &NodeService{repo: repo, version: version} } +// SetTaskRepository 注入任务仓储以支持删除前引用检查。可选注入,便于测试。 +func (s *NodeService) SetTaskRepository(taskRepo repository.BackupTaskRepository) { + s.taskRepo = taskRepo +} + +// SetAgentRPC 注入 Agent RPC 能力,启用远程目录浏览。 +func (s *NodeService) SetAgentRPC(rpc NodeAgentRPC) { + s.agentRPC = rpc +} + // EnsureLocalNode creates the default "local" node if it does not exist. func (s *NodeService) EnsureLocalNode(ctx context.Context) error { existing, err := s.repo.FindLocal(ctx) @@ -165,6 +185,20 @@ func (s *NodeService) Delete(ctx context.Context, id uint) error { if node.IsLocal { return apperror.BadRequest("NODE_DELETE_LOCAL", "无法删除本机节点", nil) } + // 删除前检查是否有关联备份任务,避免孤立任务 + if s.taskRepo != nil { + count, err := s.taskRepo.CountByNodeID(ctx, id) + if err != nil { + return err + } + if count > 0 { + return apperror.BadRequest( + "NODE_HAS_TASKS", + fmt.Sprintf("无法删除:该节点上还有 %d 个备份任务,请先删除或迁移", count), + nil, + ) + } + } return s.repo.Delete(ctx, id) } @@ -178,7 +212,8 @@ func (s *NodeService) ListDirectory(ctx context.Context, nodeID uint, path strin return nil, apperror.New(http.StatusNotFound, "NODE_NOT_FOUND", "节点不存在", nil) } if !node.IsLocal { - return nil, apperror.BadRequest("NODE_REMOTE_FS_NOT_SUPPORTED", "远程节点的目录浏览需要 Agent 在线连接(即将支持)", nil) + // 远程节点:通过 Agent 命令队列做同步 RPC + return s.remoteListDirectory(ctx, node, path) } cleanPath := filepath.Clean(path) @@ -210,6 +245,31 @@ func (s *NodeService) ListDirectory(ctx context.Context, nodeID uint, path strin return result, nil } +// OfflineThreshold 节点被判定为离线的心跳超时阈值。 +// Agent 默认 15s 心跳一次;45s 未见视为离线,预留 3 次重试空间。 +const OfflineThreshold = 45 * time.Second + +// StartOfflineMonitor 启动后台 goroutine,定期把超时未心跳的节点标记为离线。 +// 传入的 ctx 被取消后退出。 +func (s *NodeService) StartOfflineMonitor(ctx context.Context, interval time.Duration) { + if interval <= 0 { + interval = 15 * time.Second + } + ticker := time.NewTicker(interval) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + threshold := time.Now().UTC().Add(-OfflineThreshold) + _, _ = s.repo.MarkStaleOffline(ctx, threshold) + } + } + }() +} + // Heartbeat updates the node status when an agent reports in. func (s *NodeService) Heartbeat(ctx context.Context, token string, hostname string, ip string, agentVer string, osName string, archName string) error { node, err := s.repo.FindByToken(ctx, token) @@ -261,6 +321,42 @@ type DirEntry struct { Size int64 `json:"size"` } +// remoteListDirectory 通过命令队列下发 list_dir 给 Agent 并同步等待结果。 +// Agent 必须在线,且响应需在 15s 内返回,否则返回超时错误。 +func (s *NodeService) remoteListDirectory(ctx context.Context, node *model.Node, path string) ([]DirEntry, error) { + if s.agentRPC == nil { + return nil, apperror.BadRequest("NODE_REMOTE_FS_NOT_SUPPORTED", "远程目录浏览未启用,需要 Master 启用 Agent 服务", nil) + } + if node.Status != model.NodeStatusOnline { + return nil, apperror.BadRequest("NODE_OFFLINE", "节点当前不在线,无法浏览其目录", nil) + } + if strings.TrimSpace(path) == "" { + path = "/" + } + cmdID, err := s.agentRPC.EnqueueCommand(ctx, node.ID, model.AgentCommandTypeListDir, map[string]any{"path": path}) + if err != nil { + return nil, apperror.Internal("AGENT_COMMAND_ENQUEUE_FAILED", "下发目录浏览命令失败", err) + } + cmd, err := s.agentRPC.WaitForCommandResult(ctx, cmdID, 15*time.Second) + if err != nil { + return nil, err + } + if cmd.Status != model.AgentCommandStatusSucceeded { + msg := cmd.ErrorMessage + if msg == "" { + msg = fmt.Sprintf("command status: %s", cmd.Status) + } + return nil, apperror.BadRequest("NODE_FS_READ_ERROR", fmt.Sprintf("远程目录浏览失败: %s", msg), nil) + } + var result struct { + Entries []DirEntry `json:"entries"` + } + if err := json.Unmarshal([]byte(cmd.Result), &result); err != nil { + return nil, apperror.Internal("AGENT_RESULT_INVALID", "Agent 返回结果格式错误", err) + } + return result.Entries, nil +} + // detectLocalIP 获取本机第一个非回环 IPv4 地址。 func detectLocalIP() string { addrs, err := net.InterfaceAddrs() diff --git a/web/src/pages/nodes/NodesPage.tsx b/web/src/pages/nodes/NodesPage.tsx index 1fbe828..44e6688 100644 --- a/web/src/pages/nodes/NodesPage.tsx +++ b/web/src/pages/nodes/NodesPage.tsx @@ -177,6 +177,7 @@ export default function NodesPage() { title="添加远程节点" visible={createVisible} onCancel={() => setCreateVisible(false)} + style={{ width: 640 }} footer={newToken ? ( ) : undefined} @@ -197,7 +198,24 @@ export default function NodesPage() { ]} />
- 请将此令牌配置到远程服务器的 Agent 启动参数中。令牌仅显示一次,请妥善保存。 + 令牌仅显示一次,请妥善保存。将其配置到远程服务器后,Agent 会自动连接 Master。 + +
+
+ Agent 部署步骤 +
    +
  1. 把 BackupX 二进制上传到目标服务器(与 Master 同一个可执行文件)
  2. +
  3. 通过以下命令启动 Agent(替换 MASTER_URL):
  4. +
+
+ + {`backupx agent --master ${window.location.origin} --token ${newToken}`} + +
+ + 或使用配置文件 / 环境变量:
+ BACKUPX_AGENT_MASTER={window.location.origin}
+ BACKUPX_AGENT_TOKEN={newToken}