mirror of
https://github.com/Awuqing/BackupX.git
synced 2026-05-13 07:00:16 +08:00
基础修复: - 新增节点离线检测:每 15s 扫描,超 45s 未心跳的远程节点自动置离线 - 节点删除前检查关联任务,避免孤立备份任务 - BackupTaskRepository 新增 CountByNodeID/ListByNodeID Master 端 Agent 协议: - 新增 AgentCommand 模型与命令队列仓储(pending/dispatched/succeeded/failed/timeout) - 新增 AgentService:任务下发、命令轮询、结果回收、超时扫描 - 新增专用 Agent HTTP API(X-Agent-Token 认证): /api/agent/heartbeat /api/agent/commands/poll /api/agent/commands/:id/result /api/agent/tasks/:id /api/agent/records/:id - BackupExecutionService 支持 node 路由:task.NodeID 指向远程节点时自动入队派发 Agent CLI(backupx agent 子命令): - 配置:YAML 文件 / 环境变量 / CLI 参数,优先级 CLI > 文件 > 环境 - 心跳循环 + 命令轮询循环 + 优雅退出 - 本地复用 BackupRunner 与 storage registry 执行备份并直接上传 - 支持 run_task 和 list_dir 两种命令 远程目录浏览: - NodeService 支持通过 Agent RPC 列出远程节点目录(15s 超时) 前端: - NodesPage 添加节点后展示 Agent 启动命令和环境变量配置 文档: - README 中英文重写"多节点集群"章节,含架构图、步骤、限制、CLI 参考
204 lines
4.7 KiB
Go
204 lines
4.7 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Agent 是 Agent 进程的主控制器。
|
|
type Agent struct {
|
|
cfg *Config
|
|
client *MasterClient
|
|
executor *Executor
|
|
version string
|
|
|
|
mu sync.Mutex
|
|
started bool
|
|
}
|
|
|
|
// New 构造 Agent。
|
|
func New(cfg *Config, version string) (*Agent, error) {
|
|
if err := cfg.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
client := NewMasterClient(cfg.Master, cfg.Token, cfg.InsecureSkipTLSVerify)
|
|
executor := NewExecutor(client, cfg.TempDir)
|
|
return &Agent{
|
|
cfg: cfg,
|
|
client: client,
|
|
executor: executor,
|
|
version: version,
|
|
}, nil
|
|
}
|
|
|
|
// Run 启动 Agent 主循环,阻塞直到 ctx 被取消。
|
|
func (a *Agent) Run(ctx context.Context) error {
|
|
a.mu.Lock()
|
|
if a.started {
|
|
a.mu.Unlock()
|
|
return fmt.Errorf("agent already started")
|
|
}
|
|
a.started = true
|
|
a.mu.Unlock()
|
|
|
|
hbInterval := parseDuration(a.cfg.HeartbeatInterval, 15*time.Second)
|
|
pollInterval := parseDuration(a.cfg.PollInterval, 5*time.Second)
|
|
|
|
// 首次握手:通过一次心跳确认 token 有效
|
|
if err := a.heartbeatOnce(ctx); err != nil {
|
|
return fmt.Errorf("initial heartbeat failed: %w", err)
|
|
}
|
|
log.Printf("[agent] connected to master %s", a.cfg.Master)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
a.heartbeatLoop(ctx, hbInterval)
|
|
}()
|
|
go func() {
|
|
defer wg.Done()
|
|
a.pollLoop(ctx, pollInterval)
|
|
}()
|
|
wg.Wait()
|
|
return ctx.Err()
|
|
}
|
|
|
|
// heartbeatLoop 定期发送心跳。
|
|
func (a *Agent) heartbeatLoop(ctx context.Context, interval time.Duration) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if err := a.heartbeatOnce(ctx); err != nil {
|
|
log.Printf("[agent] heartbeat failed: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Agent) heartbeatOnce(ctx context.Context) error {
|
|
hostname, _ := os.Hostname()
|
|
req := HeartbeatRequest{
|
|
Token: a.cfg.Token,
|
|
Hostname: hostname,
|
|
IPAddress: detectLocalIP(),
|
|
AgentVersion: a.version,
|
|
OS: runtime.GOOS,
|
|
Arch: runtime.GOARCH,
|
|
}
|
|
_, err := a.client.Heartbeat(ctx, req)
|
|
return err
|
|
}
|
|
|
|
// pollLoop 定期拉取并处理待执行命令。
|
|
func (a *Agent) pollLoop(ctx context.Context, interval time.Duration) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
a.pollAndHandleOnce(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *Agent) pollAndHandleOnce(ctx context.Context) {
|
|
cmd, err := a.client.PollCommand(ctx)
|
|
if err != nil {
|
|
log.Printf("[agent] poll command failed: %v", err)
|
|
return
|
|
}
|
|
if cmd == nil {
|
|
return
|
|
}
|
|
log.Printf("[agent] received command #%d type=%s", cmd.ID, cmd.Type)
|
|
switch cmd.Type {
|
|
case "run_task":
|
|
a.handleRunTask(ctx, cmd)
|
|
case "list_dir":
|
|
a.handleListDir(ctx, cmd)
|
|
default:
|
|
msg := fmt.Sprintf("unknown command type: %s", cmd.Type)
|
|
log.Printf("[agent] %s", msg)
|
|
_ = a.client.SubmitCommandResult(ctx, cmd.ID, false, msg, nil)
|
|
}
|
|
}
|
|
|
|
// handleRunTask 处理 run_task 命令
|
|
func (a *Agent) handleRunTask(ctx context.Context, cmd *CommandPayload) {
|
|
var payload struct {
|
|
TaskID uint `json:"taskId"`
|
|
RecordID uint `json:"recordId"`
|
|
}
|
|
if err := json.Unmarshal(cmd.Payload, &payload); err != nil {
|
|
_ = a.client.SubmitCommandResult(ctx, cmd.ID, false, "invalid payload: "+err.Error(), nil)
|
|
return
|
|
}
|
|
if err := a.executor.ExecuteRunTask(ctx, payload.TaskID, payload.RecordID); err != nil {
|
|
_ = a.client.SubmitCommandResult(ctx, cmd.ID, false, err.Error(), nil)
|
|
return
|
|
}
|
|
_ = a.client.SubmitCommandResult(ctx, cmd.ID, true, "", map[string]any{
|
|
"taskId": payload.TaskID,
|
|
"recordId": payload.RecordID,
|
|
})
|
|
}
|
|
|
|
// handleListDir 处理 list_dir 命令(阶段四实现)
|
|
func (a *Agent) handleListDir(ctx context.Context, cmd *CommandPayload) {
|
|
var payload struct {
|
|
Path string `json:"path"`
|
|
}
|
|
if err := json.Unmarshal(cmd.Payload, &payload); err != nil {
|
|
_ = a.client.SubmitCommandResult(ctx, cmd.ID, false, "invalid payload: "+err.Error(), nil)
|
|
return
|
|
}
|
|
entries, err := listLocalDir(payload.Path)
|
|
if err != nil {
|
|
_ = a.client.SubmitCommandResult(ctx, cmd.ID, false, err.Error(), nil)
|
|
return
|
|
}
|
|
_ = a.client.SubmitCommandResult(ctx, cmd.ID, true, "", map[string]any{"entries": entries})
|
|
}
|
|
|
|
// 辅助函数
|
|
|
|
// parseDuration converts s to a time.Duration using time.ParseDuration
// syntax (for example "15s" or "1m30s"). Leading and trailing whitespace is
// ignored.
//
// It returns fallback when s is blank, cannot be parsed, or parses to a zero
// or negative duration. Rejecting non-positive values keeps callers that
// feed the result to time.NewTicker from panicking.
func parseDuration(s string, fallback time.Duration) time.Duration {
	trimmed := strings.TrimSpace(s)
	if trimmed == "" {
		return fallback
	}
	d, err := time.ParseDuration(trimmed)
	if err != nil || d <= 0 {
		return fallback
	}
	return d
}
|
|
|
|
// detectLocalIP returns the first non-loopback IPv4 address reported by
// net.InterfaceAddrs, formatted as a string. It returns "" when the address
// list cannot be read or contains no such address.
func detectLocalIP() string {
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}
	for _, candidate := range ifaceAddrs {
		ipNet, isIPNet := candidate.(*net.IPNet)
		if !isIPNet {
			continue
		}
		ip := ipNet.IP
		if ip.IsLoopback() || ip.To4() == nil {
			continue
		}
		return ip.String()
	}
	return ""
}
|