Files
BackupX/server/internal/http/install_handler.go
Awuqing 34106556be 修复: 后端审查发现的 5 项问题
根据 Spec + Code Quality 双审查修复:

1. BatchCreate 事务保护(node_service.go/node_repository.go)
   原循环 Create 在 DB 约束失败时会残留半截数据。改为预先构造所有 Node
   再走 repo.BatchCreate 单一事务,任一失败整体回滚。

2. Peek 语义与 Consume 对齐(agent_install_token_repository.go)
   FindByToken 无条件返回任意记录,导致已消费/已过期的僵尸 token
   可通过 compose 端点的 mode 检查但必然 Consume 失败,出现 410 假错。
   新增 FindValidByToken,Peek 改用之。

3. MasterURL / AgentToken / AgentVersion 渲染前校验(installscript/renderer.go)
   防止 YAML 注入(换行/引号逃逸 compose 配置)、shell 注入($(...))、
   非法字符。加 TestRenderScriptRejects* 系列测试覆盖。

4. ipLimiter 无界增长修复(install_handler.go)
   新增 gc 方法 + startGC 后台协程,每 window 周期清理过期 IP 条目。
   RouterDependencies.Context 控制生命周期;app 传入 ctx,测试 t.Cleanup 取消。

5. CreateInstallToken 的 CreatedByID 从 JWT subject 解析(node_handler.go)
   原硬编码 0 导致审计不可追溯。新增 resolveCurrentUserID helper,
   借助 UserRepository 把 JWT subject(用户名)→ user.ID;失败退回 0。
2026-04-19 17:14:17 +08:00

222 lines
6.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package http
import (
"context"
stdhttp "net/http"
"strconv"
"strings"
"sync"
"time"
"backupx/server/internal/installscript"
"backupx/server/internal/model"
"backupx/server/internal/service"
"github.com/gin-gonic/gin"
)
// InstallHandler 公开路由(不走 JWT 中间件):/install/:token 与 /install/:token/compose.yml。
type InstallHandler struct {
tokenService *service.InstallTokenService
auditService *service.AuditService
externalURL string
limiter *ipLimiter
}
// NewInstallHandler 构造 handler 并启动限流器的后台 GC 协程。
// gcCtx 控制 GC 协程生命周期,建议传入 app context。
func NewInstallHandler(gcCtx context.Context, tokenService *service.InstallTokenService, auditService *service.AuditService, externalURL string) *InstallHandler {
limiter := newIPLimiter(20, time.Minute)
limiter.startGC(gcCtx)
return &InstallHandler{
tokenService: tokenService,
auditService: auditService,
externalURL: externalURL,
limiter: limiter,
}
}
// Script 消费 install token 并返回 shell 脚本Mode 由 token 存储决定systemd/docker/foreground 均返回 shell
func (h *InstallHandler) Script(c *gin.Context) {
if !h.limiter.allow(c.ClientIP()) {
c.String(stdhttp.StatusTooManyRequests, "请求过于频繁,请稍后再试\n")
return
}
token := strings.TrimSpace(c.Param("token"))
consumed, err := h.tokenService.Consume(c.Request.Context(), token)
if err != nil {
c.String(stdhttp.StatusInternalServerError, "server error\n")
return
}
if consumed == nil {
c.String(stdhttp.StatusGone, "install token 不存在、已过期或已消费\n")
return
}
h.recordConsumeAudit(c, consumed, "script")
script, err := installscript.RenderScript(installscript.Context{
MasterURL: resolveMasterURL(c, h.externalURL),
AgentToken: consumed.Node.Token,
AgentVersion: consumed.Record.AgentVer,
Mode: consumed.Record.Mode,
Arch: consumed.Record.Arch,
DownloadBase: installscript.DownloadBaseFor(consumed.Record.DownloadSrc),
InstallPrefix: "/opt/backupx-agent",
NodeID: consumed.Node.ID,
})
if err != nil {
c.String(stdhttp.StatusInternalServerError, "render error\n")
return
}
c.Data(stdhttp.StatusOK, "text/x-shellscript; charset=utf-8", []byte(script))
}
// Compose 消费 install token 并返回 docker-compose YAML仅 Mode=docker 有效。
// 注意:/install/:token 与 /install/:token/compose.yml 共享同一 token 的消费状态,任一首次命中即消费。
func (h *InstallHandler) Compose(c *gin.Context) {
if !h.limiter.allow(c.ClientIP()) {
c.String(stdhttp.StatusTooManyRequests, "请求过于频繁,请稍后再试\n")
return
}
token := strings.TrimSpace(c.Param("token"))
// 先 Peek 看 Mode不消费若非 docker 直接 400
record, err := h.tokenService.Peek(c.Request.Context(), token)
if err != nil {
c.String(stdhttp.StatusInternalServerError, "server error\n")
return
}
if record == nil {
c.String(stdhttp.StatusGone, "install token 不存在\n")
return
}
if record.Mode != model.InstallModeDocker {
c.String(stdhttp.StatusBadRequest, "该 install token 的模式不是 docker\n")
return
}
// 消费
consumed, err := h.tokenService.Consume(c.Request.Context(), token)
if err != nil {
c.String(stdhttp.StatusInternalServerError, "server error\n")
return
}
if consumed == nil {
c.String(stdhttp.StatusGone, "install token 已过期或已消费\n")
return
}
h.recordConsumeAudit(c, consumed, "compose")
yaml, err := installscript.RenderComposeYaml(installscript.Context{
MasterURL: resolveMasterURL(c, h.externalURL),
AgentToken: consumed.Node.Token,
AgentVersion: consumed.Record.AgentVer,
Mode: model.InstallModeDocker,
NodeID: consumed.Node.ID,
})
if err != nil {
c.String(stdhttp.StatusInternalServerError, "render error\n")
return
}
c.Data(stdhttp.StatusOK, "text/yaml; charset=utf-8", []byte(yaml))
}
func (h *InstallHandler) recordConsumeAudit(c *gin.Context, consumed *service.ConsumedInstallToken, kind string) {
if h.auditService == nil {
return
}
h.auditService.Record(service.AuditEntry{
Category: "install_token",
Action: "consume",
TargetType: "node",
TargetID: strconv.FormatUint(uint64(consumed.Node.ID), 10),
TargetName: consumed.Node.Name,
Detail: "install token 消费 (" + kind + ")",
ClientIP: c.ClientIP(),
})
}
// resolveMasterURL 按优先级推导 Master URL外部配置 > X-Forwarded-* > Request.Host。
// 此为包级 helper供 install_handler 和 node_handler 共用。
func resolveMasterURL(c *gin.Context, externalURL string) string {
if strings.TrimSpace(externalURL) != "" {
return strings.TrimRight(externalURL, "/")
}
scheme := strings.TrimSpace(c.GetHeader("X-Forwarded-Proto"))
if scheme == "" {
if c.Request.TLS != nil {
scheme = "https"
} else {
scheme = "http"
}
}
host := strings.TrimSpace(c.GetHeader("X-Forwarded-Host"))
if host == "" {
host = c.Request.Host
}
return scheme + "://" + host
}
// ipLimiter 简单内存滑动窗口限流,按 client IP 维度。
type ipLimiter struct {
mu sync.Mutex
events map[string][]time.Time
limit int
window time.Duration
}
func newIPLimiter(limit int, window time.Duration) *ipLimiter {
return &ipLimiter{events: make(map[string][]time.Time), limit: limit, window: window}
}
func (l *ipLimiter) allow(ip string) bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
cutoff := now.Add(-l.window)
keep := l.events[ip][:0]
for _, t := range l.events[ip] {
if t.After(cutoff) {
keep = append(keep, t)
}
}
if len(keep) >= l.limit {
l.events[ip] = keep
return false
}
l.events[ip] = append(keep, now)
return true
}
// gc 清理窗口外所有过期的 IP 条目,防止公网扫描导致 map 无界增长。
// 由后台 goroutine 周期性调用。
func (l *ipLimiter) gc(now time.Time) {
l.mu.Lock()
defer l.mu.Unlock()
cutoff := now.Add(-l.window)
for k, v := range l.events {
stale := true
for _, t := range v {
if t.After(cutoff) {
stale = false
break
}
}
if stale {
delete(l.events, k)
}
}
}
// startGC 启动后台清理协程,每 window 周期清扫一次 map。
// ctx 取消时协程退出。
func (l *ipLimiter) startGC(ctx context.Context) {
go func() {
ticker := time.NewTicker(l.window)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-ticker.C:
l.gc(t)
}
}
}()
}