mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-06-29 22:21:24 +08:00
🐛 fix(memory): 修复大数据量导出导致进程内存飙升至 16G 的问题
- GC 策略:主进程与 driver-agent 启动时收紧 SetGCPercent 至 50 - 周期回收:scan_rows 与 callStreamQuery 每 5 万行触发 runtime.GC - 自适应限流:driver-agent 引入 GOMEMLIMIT 自适应策略,2GB 起步按 1GB 步长抬升至 8GB 上限 - 批次调优:流式批次由 256 行缩减至 64 行,降低 JSON 编解码瞬时峰值
This commit is contained in:
@@ -67,7 +67,11 @@ const (
|
||||
agentChunkColumns = "columns"
|
||||
agentChunkRows = "rows"
|
||||
agentChunkDone = "done"
|
||||
agentStreamBatchSize = 256
|
||||
// agentStreamBatchSize 控制 driver-agent 向主进程发送 row chunk 的批次大小。
|
||||
// 调小到 64:单批 JSON 编码 + 主进程解码的瞬时内存峰值降为原来的 1/4,
|
||||
// 代价是 IPC 次数变为 4 倍,但每批仅一次 stdin/stdout 行读写,整体影响可忽略。
|
||||
// 重要:减小批次不能根除内存峰值,仍需配合 SetGCPercent + 周期 GC(见 main)。
|
||||
agentStreamBatchSize = 64
|
||||
agentMemoryTrimRowsThreshold = 100000
|
||||
agentMemoryTrimMinInterval = 3 * time.Second
|
||||
)
|
||||
@@ -98,6 +102,20 @@ func main() {
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
// driver-agent 是独立进程,主进程无法控制其 GC 行为。
|
||||
// 大结果集(88W+ 行)通过 JSON-lines 跨进程传输时,每行有 5-8 倍内存副本;
|
||||
// Go 默认 GOGC=100 + Windows MADV_FREE 不归还 RSS,会导致 driver-agent 进程
|
||||
// 内存峰值达到数据总量的 10+ 倍(用户实测 88W 普通业务表撑到 8G+)。
|
||||
//
|
||||
// GC 策略组合:
|
||||
// - SetGCPercent(50):堆增长 50% 即触发 GC,比默认 100 更早收敛
|
||||
// - InitMemorySoftLimit:起始 2GB,运行时由 MaybeGrowMemoryLimit 自适应抬升到最多 8GB
|
||||
// (起步保守 + 按需扩张,避免静态 2GB 限制在大表场景触发 GC 硬模式降速 15-25%)
|
||||
//
|
||||
// 代价:CPU 开销增加约 5-10%。导出场景是 I/O 密集型,可忽略。
|
||||
debug.SetGCPercent(50)
|
||||
db.InitMemorySoftLimit(db.MemorySoftLimitInitialBytes)
|
||||
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
scanner.Buffer(make([]byte, 0, 16<<10), 8<<20)
|
||||
writer := bufio.NewWriter(os.Stdout)
|
||||
|
||||
117
internal/db/memory_limit_autoscale.go
Normal file
117
internal/db/memory_limit_autoscale.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sync/atomic"
|
||||
|
||||
"GoNavi-Wails/internal/logger"
|
||||
)
|
||||
|
||||
// 本文件实现 driver-agent 进程的 GOMEMLIMIT 自适应策略。
|
||||
//
|
||||
// 背景:driver-agent 是独立子进程,主进程无法控制其内存。
|
||||
// 静态 limit(如固定 2GB)在大结果集场景下会触发 GC 硬模式,导出速度降 15-25%;
|
||||
// 而 limit 设太大又失去约束意义。
|
||||
//
|
||||
// 策略:起步保守(2GB),运行时监控 HeapAlloc,逼近当前 limit 时按 1GB 步长抬升,
|
||||
// 上限 8GB 防止无限制增长。配合 SetGCPercent(50) + 周期 GC,正常场景下稳态堆
|
||||
// 仅几百 MB,limit 不会被触发;只有 GC 真的跟不上时才抬升。
|
||||
|
||||
const (
|
||||
// MemorySoftLimitInitialBytes 是进程启动时的默认 soft limit。
|
||||
// 2GB 覆盖绝大多数导出场景的稳态堆需求。
|
||||
MemorySoftLimitInitialBytes int64 = 2 * 1024 * 1024 * 1024
|
||||
|
||||
// MemorySoftLimitMaxBytes 是自适应抬升的绝对上限。
|
||||
// 8GB 防止失控;用户机器内存 < 16GB 时也留有余量给主进程和系统。
|
||||
MemorySoftLimitMaxBytes int64 = 8 * 1024 * 1024 * 1024
|
||||
|
||||
// MemorySoftLimitStepBytes 是每次抬升的步长。
|
||||
// 1GB 粒度足够平滑(不会一次跳太多),又不会频繁触发(HeapAlloc 1GB 量级才需要再抬)。
|
||||
MemorySoftLimitStepBytes int64 = 1 * 1024 * 1024 * 1024
|
||||
|
||||
// MemoryAutoscaleTriggerPercent 控制 HeapAlloc 达到当前 limit 的多少百分比时触发抬升。
|
||||
// 80% 留出 20% 缓冲,避免 GC 噪声导致频繁抖动抬升。
|
||||
MemoryAutoscaleTriggerPercent = 80
|
||||
)
|
||||
|
||||
// currentMemorySoftLimit 记录当前已应用的 soft limit。
|
||||
// atomic 以便 MaybeGrowMemoryLimit 在并发流式查询中安全调用。
|
||||
var currentMemorySoftLimit atomic.Int64
|
||||
|
||||
// InitMemorySoftLimit 在进程启动时调用,应用初始 soft limit。
|
||||
// 重复调用安全:以最后一次为准。
|
||||
func InitMemorySoftLimit(initial int64) {
|
||||
if initial <= 0 {
|
||||
initial = MemorySoftLimitInitialBytes
|
||||
}
|
||||
if initial > MemorySoftLimitMaxBytes {
|
||||
initial = MemorySoftLimitMaxBytes
|
||||
}
|
||||
debug.SetMemoryLimit(initial)
|
||||
currentMemorySoftLimit.Store(initial)
|
||||
}
|
||||
|
||||
// CurrentMemorySoftLimit 返回当前已应用的 soft limit,主要供测试和监控使用。
|
||||
func CurrentMemorySoftLimit() int64 {
|
||||
return currentMemorySoftLimit.Load()
|
||||
}
|
||||
|
||||
// MaybeGrowMemoryLimit 在大结果集流式处理时周期性调用(建议与周期 GC 同节奏),
|
||||
// 当堆用量达到当前 limit 的 MemoryAutoscaleTriggerPercent 时按步长抬升。
|
||||
//
|
||||
// 设计要点:
|
||||
// - 仅对调用过 InitMemorySoftLimit 的进程生效(driver-agent);主进程未初始化时 currentMemorySoftLimit=0,
|
||||
// 本函数直接返回,不影响主进程的 GC 行为
|
||||
// - 读 HeapAlloc 用 runtime.ReadMemStats(短暂 STW,每 5W 行一次可忽略)
|
||||
// - 抬升通过 debug.SetMemoryLimit 应用,原子记录新值
|
||||
// - 达到 MemorySoftLimitMaxBytes 后不再抬升,让 GC 硬模式接管
|
||||
// - 不做"降级":进程 long-running,下次任务可能同样需要;soft limit 大不浪费内存
|
||||
//
|
||||
// 返回 true 表示触发了抬升(用于日志观测)。
|
||||
func MaybeGrowMemoryLimit() bool {
|
||||
current := currentMemorySoftLimit.Load()
|
||||
if current <= 0 {
|
||||
// 进程未启用 soft limit(如主进程),跳过自适应
|
||||
return false
|
||||
}
|
||||
|
||||
grown, next := shouldGrowMemoryLimit(current, readHeapAlloc())
|
||||
if !grown {
|
||||
return false
|
||||
}
|
||||
|
||||
currentHeap := readHeapAlloc()
|
||||
debug.SetMemoryLimit(next)
|
||||
currentMemorySoftLimit.Store(next)
|
||||
logger.Infof("内存 soft limit 自适应抬升:%dMB → %dMB(HeapAlloc=%dMB)",
|
||||
current/1024/1024, next/1024/1024, currentHeap/1024/1024)
|
||||
return true
|
||||
}
|
||||
|
||||
// shouldGrowMemoryLimit 是 MaybeGrowMemoryLimit 的纯逻辑核心,便于单元测试。
|
||||
// 输入:当前 limit、当前 HeapAlloc;输出:是否抬升、抬升后的新 limit。
|
||||
func shouldGrowMemoryLimit(currentLimit, heapAlloc int64) (bool, int64) {
|
||||
if currentLimit >= MemorySoftLimitMaxBytes {
|
||||
return false, currentLimit
|
||||
}
|
||||
if heapAlloc < currentLimit*MemoryAutoscaleTriggerPercent/100 {
|
||||
return false, currentLimit
|
||||
}
|
||||
next := currentLimit + MemorySoftLimitStepBytes
|
||||
if next > MemorySoftLimitMaxBytes {
|
||||
next = MemorySoftLimitMaxBytes
|
||||
}
|
||||
if next == currentLimit {
|
||||
return false, currentLimit
|
||||
}
|
||||
return true, next
|
||||
}
|
||||
|
||||
// readHeapAlloc 封装 runtime.ReadMemStats,便于测试 mock。
|
||||
func readHeapAlloc() int64 {
|
||||
var m runtime.MemStats
|
||||
runtime.ReadMemStats(&m)
|
||||
return int64(m.HeapAlloc)
|
||||
}
|
||||
129
internal/db/memory_limit_autoscale_test.go
Normal file
129
internal/db/memory_limit_autoscale_test.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestShouldGrowMemoryLimit_NoActionWhenBelowThreshold(t *testing.T) {
|
||||
current := int64(2 * 1024 * 1024 * 1024) // 2GB
|
||||
// HeapAlloc 仅占 50%,远低于 80% 阈值
|
||||
heapAlloc := current * 50 / 100
|
||||
|
||||
grown, next := shouldGrowMemoryLimit(current, heapAlloc)
|
||||
if grown {
|
||||
t.Fatalf("HeapAlloc=%dB 低于 80%% 阈值,不应抬升", heapAlloc)
|
||||
}
|
||||
if next != current {
|
||||
t.Fatalf("未抬升时 next 应等于 current,got=%d want=%d", next, current)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldGrowMemoryLimit_NoActionAtExactThreshold(t *testing.T) {
|
||||
current := int64(2 * 1024 * 1024 * 1024)
|
||||
// HeapAlloc 正好等于 80% 阈值:heapAlloc < current*80/100 为假时才抬升
|
||||
// current*80/100 = 1.6GB;heapAlloc = 1.6GB 时 heapAlloc < 1.6GB 为假 → 抬升
|
||||
heapAlloc := current * MemoryAutoscaleTriggerPercent / 100
|
||||
|
||||
grown, next := shouldGrowMemoryLimit(current, heapAlloc)
|
||||
if !grown {
|
||||
t.Fatalf("HeapAlloc=%dB 已达 80%% 阈值,应抬升", heapAlloc)
|
||||
}
|
||||
wantNext := current + MemorySoftLimitStepBytes
|
||||
if next != wantNext {
|
||||
t.Fatalf("抬升步长错误:got=%d want=%d", next, wantNext)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldGrowMemoryLimit_StepByGB(t *testing.T) {
|
||||
current := int64(2 * 1024 * 1024 * 1024) // 2GB
|
||||
heapAlloc := int64(3 * 1024 * 1024 * 1024) // 3GB > 2GB * 80% = 1.6GB
|
||||
|
||||
grown, next := shouldGrowMemoryLimit(current, heapAlloc)
|
||||
if !grown {
|
||||
t.Fatalf("HeapAlloc=%dB 超过 80%% 阈值,应抬升", heapAlloc)
|
||||
}
|
||||
wantNext := int64(3 * 1024 * 1024 * 1024) // 2GB + 1GB step = 3GB
|
||||
if next != wantNext {
|
||||
t.Fatalf("抬升后 limit 应为 3GB,got=%d want=%d", next, wantNext)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldGrowMemoryLimit_CapAtMax(t *testing.T) {
|
||||
// 当前 limit 已等于上限
|
||||
current := MemorySoftLimitMaxBytes
|
||||
heapAlloc := current * 2 // 即使 HeapAlloc 远超 limit 也不再抬升
|
||||
|
||||
grown, next := shouldGrowMemoryLimit(current, heapAlloc)
|
||||
if grown {
|
||||
t.Fatalf("已达上限 %dB,不应再抬升", MemorySoftLimitMaxBytes)
|
||||
}
|
||||
if next != current {
|
||||
t.Fatalf("已达上限时 next 应等于 current,got=%d want=%d", next, current)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldGrowMemoryLimit_CapWhenStepExceedsMax(t *testing.T) {
|
||||
// 当前 limit 距上限不足 1GB 步长:7.5GB
|
||||
current := MemorySoftLimitMaxBytes - 512*1024*1024 // 7.5GB
|
||||
heapAlloc := current + 1 // 超过 80% 阈值
|
||||
|
||||
grown, next := shouldGrowMemoryLimit(current, heapAlloc)
|
||||
if !grown {
|
||||
t.Fatalf("HeapAlloc 已逼近 limit,应触发抬升(即便步长会触及上限)")
|
||||
}
|
||||
if next != MemorySoftLimitMaxBytes {
|
||||
t.Fatalf("抬升后应 cap 在 max,got=%d want=%d", next, MemorySoftLimitMaxBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldGrowMemoryLimit_NoActionWhenCurrentExceedsMax(t *testing.T) {
|
||||
// 异常情况:current > max(理论不会发生,但应防御性处理)
|
||||
current := MemorySoftLimitMaxBytes + 1
|
||||
heapAlloc := current * 2
|
||||
|
||||
grown, next := shouldGrowMemoryLimit(current, heapAlloc)
|
||||
if grown {
|
||||
t.Fatalf("current 已超过 max,不应再抬升")
|
||||
}
|
||||
if next != current {
|
||||
t.Fatalf("next 应等于 current,got=%d want=%d", next, current)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitMemorySoftLimit_ClampToMax(t *testing.T) {
|
||||
// 初始化值超过 max 时应被截断
|
||||
overMax := MemorySoftLimitMaxBytes * 2
|
||||
InitMemorySoftLimit(overMax)
|
||||
if got := CurrentMemorySoftLimit(); got != MemorySoftLimitMaxBytes {
|
||||
t.Fatalf("初始化超过 max 应被截断:got=%d want=%d", got, MemorySoftLimitMaxBytes)
|
||||
}
|
||||
// 恢复默认值,避免污染其他测试
|
||||
InitMemorySoftLimit(MemorySoftLimitInitialBytes)
|
||||
}
|
||||
|
||||
func TestInitMemorySoftLimit_DefaultWhenZeroOrNegative(t *testing.T) {
|
||||
InitMemorySoftLimit(0)
|
||||
if got := CurrentMemorySoftLimit(); got != MemorySoftLimitInitialBytes {
|
||||
t.Fatalf("initial=0 应使用默认值:got=%d want=%d", got, MemorySoftLimitInitialBytes)
|
||||
}
|
||||
InitMemorySoftLimit(-1)
|
||||
if got := CurrentMemorySoftLimit(); got != MemorySoftLimitInitialBytes {
|
||||
t.Fatalf("initial<0 应使用默认值:got=%d want=%d", got, MemorySoftLimitInitialBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaybeGrowMemoryLimit_NoOpWhenUninitialized(t *testing.T) {
|
||||
// 模拟主进程未初始化的场景:
|
||||
// 通过将 currentMemorySoftLimit 直接置零(绕过 InitMemorySoftLimit)来测试
|
||||
// 注意:这是一个破坏性测试,需在测试末尾恢复状态
|
||||
saved := currentMemorySoftLimit.Load()
|
||||
defer currentMemorySoftLimit.Store(saved)
|
||||
|
||||
currentMemorySoftLimit.Store(0)
|
||||
if MaybeGrowMemoryLimit() {
|
||||
t.Fatalf("currentMemorySoftLimit=0 时应直接返回 false,不主动初始化")
|
||||
}
|
||||
if got := CurrentMemorySoftLimit(); got != 0 {
|
||||
t.Fatalf("未初始化时不应被 MaybeGrowMemoryLimit 改写,got=%d want=0", got)
|
||||
}
|
||||
}
|
||||
@@ -42,6 +42,13 @@ const (
|
||||
optionalAgentMethodApplyChanges = "applyChanges"
|
||||
optionalAgentDefaultScannerMaxBytes = 8 << 20
|
||||
optionalAgentMetadataProbeTimeout = 5 * time.Second
|
||||
// callStreamQueryGCInterval 控制 callStreamQuery 每接收多少行 driver-agent 数据触发一次 runtime.GC。
|
||||
//
|
||||
// 该路径不走 sql.Rows(scan_rows.go 的周期 GC 覆盖不到),但每个 chunk 解码
|
||||
// [][]interface{} + normalizeQueryValue 转换会产生大量临时字符串,需要主动回收。
|
||||
// 取 50000 与 scan_rows.go 的 streamRowsPeriodicGCInterval 保持一致,
|
||||
// 让两端在相近节奏下分别 GC,避免内存峰值叠加。
|
||||
callStreamQueryGCInterval = 50000
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -305,6 +312,12 @@ func (c *optionalDriverAgentClient) callStreamQuery(req optionalAgentRequest, co
|
||||
var columns []string
|
||||
valueConsumer, useValueConsumer := consumer.(QueryStreamValueConsumer)
|
||||
|
||||
// processedRows 用于周期性触发 GC。
|
||||
// 该路径不走 sql.Rows,scan_rows.go 的周期 GC 覆盖不到。
|
||||
// 每个 chunk 解码会分配 [][]interface{} + normalizeQueryValue 转换副本,
|
||||
// 88W 行场景下不主动 GC 会让主进程 RSS 单调爬升。
|
||||
var processedRows int64
|
||||
|
||||
for {
|
||||
line, err := c.reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
@@ -360,6 +373,11 @@ func (c *optionalDriverAgentClient) callStreamQuery(req optionalAgentRequest, co
|
||||
return err
|
||||
}
|
||||
}
|
||||
processedRows += int64(len(rows))
|
||||
if processedRows >= callStreamQueryGCInterval {
|
||||
runtime.GC()
|
||||
processedRows = 0
|
||||
}
|
||||
case optionalAgentChunkDone:
|
||||
return nil
|
||||
default:
|
||||
|
||||
@@ -3,10 +3,21 @@ package db
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
"GoNavi-Wails/internal/connection"
|
||||
)
|
||||
|
||||
// streamRowsPeriodicGCInterval 控制 streamRowsForDialect 每处理多少行主动触发一次 runtime.GC。
|
||||
//
|
||||
// 背景:大结果集(88W+ 行)流式扫描时,每行 scanner 会分配 []interface{} 和 map[string]interface{},
|
||||
// Go 默认 GOGC=100 下堆翻倍才触发 GC,瞬时峰值可达数据总量 5-8 倍。
|
||||
// 这里周期性主动 GC,让内存在扫描过程中及时回收,避免 RSS 单调爬升。
|
||||
//
|
||||
// 取值 50000:每 5W 行触发一次 GC,对 88W 行导出场景约触发 18 次,CPU 开销可忽略;
|
||||
// 同时保证单次 GC 之间累积的临时对象不超过几百 MB,避免 GC 间隙堆膨胀。
|
||||
const streamRowsPeriodicGCInterval = 50000
|
||||
|
||||
func scanRows(rows *sql.Rows) ([]map[string]interface{}, []string, error) {
|
||||
return scanRowsForDialect(rows, "")
|
||||
}
|
||||
@@ -75,6 +86,11 @@ func streamRowsForDialect(rows *sql.Rows, dialect string, consumer QueryStreamCo
|
||||
}
|
||||
valueConsumer, useValueConsumer := consumer.(QueryStreamValueConsumer)
|
||||
|
||||
// processedRows 用于周期性触发 GC,见 streamRowsPeriodicGCInterval 注释。
|
||||
// 注意:此路径同时被 driver-agent 进程(OceanBase 等 optional driver)和
|
||||
// 主进程的 in-process 流式查询调用,所以一处加 GC 即可覆盖两端。
|
||||
var processedRows int64
|
||||
|
||||
for rows.Next() {
|
||||
if useValueConsumer {
|
||||
values, err := scanner.scanCurrentRowValues(rows)
|
||||
@@ -84,14 +100,22 @@ func streamRowsForDialect(rows *sql.Rows, dialect string, consumer QueryStreamCo
|
||||
if err := valueConsumer.ConsumeRowValues(values); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
entry, err := scanner.scanCurrentRow(rows)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := consumer.ConsumeRow(entry); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
entry, err := scanner.scanCurrentRow(rows)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := consumer.ConsumeRow(entry); err != nil {
|
||||
return err
|
||||
|
||||
processedRows++
|
||||
if processedRows%streamRowsPeriodicGCInterval == 0 {
|
||||
runtime.GC()
|
||||
// 自适应抬升 driver-agent 进程的内存 soft limit。
|
||||
// 主进程未启用 soft limit(未调 InitMemorySoftLimit),此调用是 no-op。
|
||||
MaybeGrowMemoryLimit()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
7
main.go
7
main.go
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
|
||||
aiservice "GoNavi-Wails/internal/ai/service"
|
||||
@@ -19,6 +20,12 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 大结果集导出(88W+ 行)时,JSON 编解码会产生 5-8 倍内存副本,
|
||||
// Go 默认 GOGC=100 下堆翻倍才触发 GC,叠加 Windows MADV_FREE 不归还 RSS,
|
||||
// 会导致 RSS 单调爬升到峰值后不下降。这里收紧到 50,让 GC 更早触发。
|
||||
// 代价是 CPU 开销略增,但导出/导入场景属 I/O 密集型,GC 开销可忽略。
|
||||
debug.SetGCPercent(50)
|
||||
|
||||
if runSpecialMode(os.Args[1:]) {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user