mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-06-26 00:11:43 +08:00
- GC 策略:主进程与 driver-agent 启动时收紧 SetGCPercent 至 50 - 周期回收:scan_rows 与 callStreamQuery 每 5 万行触发 runtime.GC - 自适应限流:driver-agent 引入 GOMEMLIMIT 自适应策略,2GB 起步按 1GB 步长抬升至 8GB 上限 - 批次调优:流式批次由 256 行缩减至 64 行,降低 JSON 编解码瞬时峰值
247 lines
6.5 KiB
Go
247 lines
6.5 KiB
Go
package db
|
||
|
||
import (
|
||
"database/sql"
|
||
"fmt"
|
||
"runtime"
|
||
|
||
"GoNavi-Wails/internal/connection"
|
||
)
|
||
|
||
// streamRowsPeriodicGCInterval 控制 streamRowsForDialect 每处理多少行主动触发一次 runtime.GC。
|
||
//
|
||
// 背景:大结果集(88W+ 行)流式扫描时,每行 scanner 会分配 []interface{} 和 map[string]interface{},
|
||
// Go 默认 GOGC=100 下堆翻倍才触发 GC,瞬时峰值可达数据总量 5-8 倍。
|
||
// 这里周期性主动 GC,让内存在扫描过程中及时回收,避免 RSS 单调爬升。
|
||
//
|
||
// 取值 50000:每 5W 行触发一次 GC,对 88W 行导出场景约触发 18 次,CPU 开销可忽略;
|
||
// 同时保证单次 GC 之间累积的临时对象不超过几百 MB,避免 GC 间隙堆膨胀。
|
||
const streamRowsPeriodicGCInterval = 50000
|
||
|
||
func scanRows(rows *sql.Rows) ([]map[string]interface{}, []string, error) {
|
||
return scanRowsForDialect(rows, "")
|
||
}
|
||
|
||
func streamRows(rows *sql.Rows, consumer QueryStreamConsumer) error {
|
||
return streamRowsForDialect(rows, "", consumer)
|
||
}
|
||
|
||
type queryRowScanner struct {
|
||
columns []string
|
||
dbTypeNames []string
|
||
dialect string
|
||
values []interface{}
|
||
normalized []interface{}
|
||
valuePtrs []interface{}
|
||
}
|
||
|
||
func scanRowsForDialect(rows *sql.Rows, dialect string) ([]map[string]interface{}, []string, error) {
|
||
columns, err := rows.Columns()
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
columns = ensureUniqueQueryColumnNames(columns)
|
||
|
||
colTypes, err := rows.ColumnTypes()
|
||
if err != nil || len(colTypes) != len(columns) {
|
||
colTypes = nil
|
||
}
|
||
|
||
scanner := newQueryRowScanner(columns, colTypes, dialect)
|
||
resultData := make([]map[string]interface{}, 0)
|
||
|
||
for rows.Next() {
|
||
entry, err := scanner.scanCurrentRow(rows)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
resultData = append(resultData, entry)
|
||
}
|
||
|
||
if err := rows.Err(); err != nil {
|
||
return resultData, columns, err
|
||
}
|
||
return resultData, columns, nil
|
||
}
|
||
|
||
func streamRowsForDialect(rows *sql.Rows, dialect string, consumer QueryStreamConsumer) error {
|
||
if consumer == nil {
|
||
return fmt.Errorf("query stream consumer required")
|
||
}
|
||
|
||
columns, err := rows.Columns()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
columns = ensureUniqueQueryColumnNames(columns)
|
||
|
||
colTypes, err := rows.ColumnTypes()
|
||
if err != nil || len(colTypes) != len(columns) {
|
||
colTypes = nil
|
||
}
|
||
|
||
scanner := newQueryRowScanner(columns, colTypes, dialect)
|
||
if err := consumer.SetColumns(columns); err != nil {
|
||
return err
|
||
}
|
||
valueConsumer, useValueConsumer := consumer.(QueryStreamValueConsumer)
|
||
|
||
// processedRows 用于周期性触发 GC,见 streamRowsPeriodicGCInterval 注释。
|
||
// 注意:此路径同时被 driver-agent 进程(OceanBase 等 optional driver)和
|
||
// 主进程的 in-process 流式查询调用,所以一处加 GC 即可覆盖两端。
|
||
var processedRows int64
|
||
|
||
for rows.Next() {
|
||
if useValueConsumer {
|
||
values, err := scanner.scanCurrentRowValues(rows)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
if err := valueConsumer.ConsumeRowValues(values); err != nil {
|
||
return err
|
||
}
|
||
} else {
|
||
entry, err := scanner.scanCurrentRow(rows)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
if err := consumer.ConsumeRow(entry); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
processedRows++
|
||
if processedRows%streamRowsPeriodicGCInterval == 0 {
|
||
runtime.GC()
|
||
// 自适应抬升 driver-agent 进程的内存 soft limit。
|
||
// 主进程未启用 soft limit(未调 InitMemorySoftLimit),此调用是 no-op。
|
||
MaybeGrowMemoryLimit()
|
||
}
|
||
}
|
||
|
||
return rows.Err()
|
||
}
|
||
|
||
func newQueryRowScanner(columns []string, colTypes []*sql.ColumnType, dialect string) *queryRowScanner {
|
||
values := make([]interface{}, len(columns))
|
||
valuePtrs := make([]interface{}, len(columns))
|
||
for i := range columns {
|
||
valuePtrs[i] = &values[i]
|
||
}
|
||
dbTypeNames := make([]string, len(columns))
|
||
for i := range columns {
|
||
if colTypes != nil && i < len(colTypes) && colTypes[i] != nil {
|
||
dbTypeNames[i] = colTypes[i].DatabaseTypeName()
|
||
}
|
||
}
|
||
return &queryRowScanner{
|
||
columns: columns,
|
||
dbTypeNames: dbTypeNames,
|
||
dialect: dialect,
|
||
values: values,
|
||
normalized: make([]interface{}, len(columns)),
|
||
valuePtrs: valuePtrs,
|
||
}
|
||
}
|
||
|
||
func (s *queryRowScanner) scanCurrentRowValues(rows *sql.Rows) ([]interface{}, error) {
|
||
if err := rows.Scan(s.valuePtrs...); err != nil {
|
||
return nil, err
|
||
}
|
||
for i := range s.columns {
|
||
s.normalized[i] = normalizeQueryValueWithDBTypeAndDialect(s.values[i], s.dbTypeNames[i], s.dialect)
|
||
}
|
||
return s.normalized, nil
|
||
}
|
||
|
||
func (s *queryRowScanner) scanCurrentRow(rows *sql.Rows) (map[string]interface{}, error) {
|
||
normalized, err := s.scanCurrentRowValues(rows)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
entry := make(map[string]interface{}, len(s.columns))
|
||
for i, col := range s.columns {
|
||
entry[col] = normalized[i]
|
||
}
|
||
return entry, nil
|
||
}
|
||
|
||
func ensureUniqueQueryColumnNames(columns []string) []string {
|
||
if len(columns) == 0 {
|
||
return columns
|
||
}
|
||
|
||
uniqueColumns := make([]string, len(columns))
|
||
taken := make(map[string]struct{}, len(columns))
|
||
nextSuffix := make(map[string]int, len(columns))
|
||
|
||
for idx, column := range columns {
|
||
base := column
|
||
if base == "" {
|
||
base = fmt.Sprintf("column_%d", idx+1)
|
||
}
|
||
|
||
candidate := base
|
||
if _, exists := taken[candidate]; exists {
|
||
suffix := nextSuffix[base]
|
||
if suffix < 2 {
|
||
suffix = 2
|
||
}
|
||
for {
|
||
candidate = fmt.Sprintf("%s_%d", base, suffix)
|
||
if _, exists := taken[candidate]; !exists {
|
||
break
|
||
}
|
||
suffix++
|
||
}
|
||
nextSuffix[base] = suffix + 1
|
||
} else {
|
||
nextSuffix[base] = 2
|
||
}
|
||
|
||
uniqueColumns[idx] = candidate
|
||
taken[candidate] = struct{}{}
|
||
}
|
||
|
||
return uniqueColumns
|
||
}
|
||
|
||
// scanMultiRows 遍历 sql.Rows 中的所有结果集,将每个结果集作为 ResultSetData 返回。
|
||
// 利用 rows.NextResultSet() 支持一次 query 返回多个结果集的场景。
|
||
func scanMultiRows(rows *sql.Rows) ([]connection.ResultSetData, error) {
|
||
return scanMultiRowsForDialect(rows, "")
|
||
}
|
||
|
||
func scanMultiRowsForDialect(rows *sql.Rows, dialect string) ([]connection.ResultSetData, error) {
|
||
var results []connection.ResultSetData
|
||
for {
|
||
data, cols, err := scanRowsForDialect(rows, dialect)
|
||
if err != nil {
|
||
return results, err
|
||
}
|
||
if data == nil {
|
||
data = make([]map[string]interface{}, 0)
|
||
}
|
||
if cols == nil {
|
||
cols = []string{}
|
||
}
|
||
results = append(results, connection.ResultSetData{
|
||
Rows: data,
|
||
Columns: cols,
|
||
})
|
||
if !rows.NextResultSet() {
|
||
break
|
||
}
|
||
}
|
||
if len(results) == 0 {
|
||
results = []connection.ResultSetData{{
|
||
Rows: make([]map[string]interface{}, 0),
|
||
Columns: []string{},
|
||
}}
|
||
}
|
||
if err := rows.Err(); err != nil {
|
||
return results, err
|
||
}
|
||
return results, nil
|
||
}
|