️ perf(export): 重构大结果集导出链路并支持流式写入

- 新增 ExportFileOptions 统一承载导出格式、进度任务和 XLSX sheet 行数上限
- 查询导出改为流式写入文件,避免一次性缓存整批结果导致高内存占用
- 增加值数组快速路径并复用扫描与写入缓冲,减少逐行 map 分配开销
- 为 ClickHouse、自定义驱动、达梦、SQLServer 和 TDengine 补齐 StreamQuery 支持
- 导出时间字符串仅在形似时间时再解析,避免普通文本被误判改写
- 补充 XLSX 分 sheet、流式导出和基准测试覆盖
This commit is contained in:
Syngnat
2026-06-17 14:18:41 +08:00
parent d7ad83f0d5
commit b3c321be67
10 changed files with 1637 additions and 171 deletions

View File

@@ -0,0 +1,50 @@
package app
import "strings"
const (
maxXLSXRowsPerSheet = 1048575
defaultXLSXRowsPerSheet = maxXLSXRowsPerSheet
)
type ExportFileOptions struct {
Format string `json:"format"`
XLSXMaxRowsPerSheet int `json:"xlsxMaxRowsPerSheet,omitempty"`
JobID string `json:"jobId,omitempty"`
TotalRowsHint int64 `json:"totalRowsHint,omitempty"`
TotalRowsKnown bool `json:"totalRowsKnown,omitempty"`
}
func normalizeExportFileOptions(format string, options ExportFileOptions) ExportFileOptions {
resolvedFormat := strings.ToLower(strings.TrimSpace(format))
if explicitFormat := strings.ToLower(strings.TrimSpace(options.Format)); explicitFormat != "" {
resolvedFormat = explicitFormat
}
return ExportFileOptions{
Format: resolvedFormat,
XLSXMaxRowsPerSheet: normalizeXLSXRowsPerSheet(options.XLSXMaxRowsPerSheet),
JobID: strings.TrimSpace(options.JobID),
TotalRowsHint: normalizeExportTotalRowsHint(options.TotalRowsHint, options.TotalRowsKnown),
TotalRowsKnown: options.TotalRowsKnown,
}
}
func normalizeXLSXRowsPerSheet(value int) int {
if value <= 0 {
return defaultXLSXRowsPerSheet
}
if value > maxXLSXRowsPerSheet {
return maxXLSXRowsPerSheet
}
return value
}
func normalizeExportTotalRowsHint(value int64, known bool) int64 {
if !known {
return 0
}
if value < 0 {
return 0
}
return value
}

File diff suppressed because it is too large Load Diff

View File

@@ -11,6 +11,8 @@ import (
"time"
"GoNavi-Wails/internal/connection"
"GoNavi-Wails/internal/db"
"github.com/xuri/excelize/v2"
)
type fakeExportQueryDB struct {
@@ -24,6 +26,23 @@ type fakeExportQueryDB struct {
hasContextDeadline bool
}
type fakeStreamExportDB struct {
fakeExportQueryDB
streamData []map[string]interface{}
streamCols []string
streamHits int
queryHits int
}
type fakeValueStreamExportDB struct {
fakeExportQueryDB
streamCols []string
streamValues [][]interface{}
streamHits int
queryHits int
valueHits int
}
func (f *fakeExportQueryDB) Connect(config connection.ConnectionConfig) error { return nil }
func (f *fakeExportQueryDB) Close() error { return nil }
func (f *fakeExportQueryDB) Ping() error { return nil }
@@ -63,6 +82,77 @@ func (f *fakeExportQueryDB) GetTriggers(dbName, tableName string) ([]connection.
return nil, nil
}
func (f *fakeStreamExportDB) Query(query string) ([]map[string]interface{}, []string, error) {
f.queryHits++
return f.fakeExportQueryDB.Query(query)
}
func (f *fakeStreamExportDB) QueryContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error) {
f.queryHits++
return f.fakeExportQueryDB.QueryContext(ctx, query)
}
func (f *fakeStreamExportDB) StreamQuery(query string, consumer db.QueryStreamConsumer) error {
return f.StreamQueryContext(context.Background(), query, consumer)
}
func (f *fakeStreamExportDB) StreamQueryContext(_ context.Context, query string, consumer db.QueryStreamConsumer) error {
f.streamHits++
f.lastQuery = query
if err := consumer.SetColumns(f.streamCols); err != nil {
return err
}
for _, row := range f.streamData {
if err := consumer.ConsumeRow(row); err != nil {
return err
}
}
return nil
}
func (f *fakeValueStreamExportDB) Query(query string) ([]map[string]interface{}, []string, error) {
f.queryHits++
return f.fakeExportQueryDB.Query(query)
}
func (f *fakeValueStreamExportDB) QueryContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error) {
f.queryHits++
return f.fakeExportQueryDB.QueryContext(ctx, query)
}
func (f *fakeValueStreamExportDB) StreamQuery(query string, consumer db.QueryStreamConsumer) error {
return f.StreamQueryContext(context.Background(), query, consumer)
}
func (f *fakeValueStreamExportDB) StreamQueryContext(_ context.Context, query string, consumer db.QueryStreamConsumer) error {
f.streamHits++
f.lastQuery = query
if err := consumer.SetColumns(f.streamCols); err != nil {
return err
}
if valueConsumer, ok := consumer.(db.QueryStreamValueConsumer); ok {
for _, row := range f.streamValues {
f.valueHits++
if err := valueConsumer.ConsumeRowValues(row); err != nil {
return err
}
}
return nil
}
for _, row := range f.streamValues {
entry := make(map[string]interface{}, len(f.streamCols))
for idx, column := range f.streamCols {
if idx < len(row) {
entry[column] = row[idx]
}
}
if err := consumer.ConsumeRow(entry); err != nil {
return err
}
}
return nil
}
func TestFormatExportCellText_FloatNoScientificNotation(t *testing.T) {
got := formatExportCellText(1.445663e+06)
if strings.Contains(strings.ToLower(got), "e+") || strings.Contains(strings.ToLower(got), "e-") {
@@ -86,7 +176,7 @@ func TestWriteRowsToFile_Markdown_NumberKeepPlainText(t *testing.T) {
}
columns := []string{"id"}
if err := writeRowsToFile(f, data, columns, "md"); err != nil {
if err := writeRowsToFile(f, data, columns, ExportFileOptions{Format: "md"}); err != nil {
t.Fatalf("写入 md 失败: %v", err)
}
@@ -116,7 +206,7 @@ func TestWriteRowsToFile_JSON_NumberKeepPlainText(t *testing.T) {
}
columns := []string{"id"}
if err := writeRowsToFile(f, data, columns, "json"); err != nil {
if err := writeRowsToFile(f, data, columns, ExportFileOptions{Format: "json"}); err != nil {
t.Fatalf("写入 json 失败: %v", err)
}
@@ -166,6 +256,24 @@ func TestFormatExportCellText_TimeValue_KeepWallClock(t *testing.T) {
}
}
func TestFormatExportCellText_StringRFC3339_KeepWallClock(t *testing.T) {
originalLocal := time.Local
time.Local = time.FixedZone("UTC+8", 8*60*60)
defer func() { time.Local = originalLocal }()
got := formatExportCellText("2026-04-07T10:44:32Z")
if got != "2026-04-07 10:44:32" {
t.Fatalf("字符串时间导出应保持原始钟表时间want=%q got=%q", "2026-04-07 10:44:32", got)
}
}
func TestFormatExportCellText_PlainString_Untouched(t *testing.T) {
got := formatExportCellText("plain export payload without timezone marker")
if got != "plain export payload without timezone marker" {
t.Fatalf("普通字符串不应被改写got=%q", got)
}
}
func TestParseTemporalString_LocalDateTime_NoTimezoneShift(t *testing.T) {
originalLocal := time.Local
time.Local = time.FixedZone("UTC+8", 8*60*60)
@@ -248,6 +356,105 @@ func TestQueryDataForExport_UsesLargerConfiguredTimeout(t *testing.T) {
}
}
func TestExportQueryResultToFile_UsesStreamQueryPath(t *testing.T) {
f, err := os.CreateTemp("", "gonavi-export-stream-*.csv")
if err != nil {
t.Fatalf("创建临时文件失败: %v", err)
}
defer os.Remove(f.Name())
defer f.Close()
fake := &fakeStreamExportDB{
fakeExportQueryDB: fakeExportQueryDB{
err: context.DeadlineExceeded,
data: []map[string]interface{}{{"id": 999}},
cols: []string{"id"},
},
streamCols: []string{"id", "name"},
streamData: []map[string]interface{}{
{"id": 1, "name": "alice"},
{"id": 2, "name": "bob"},
},
}
rowCount, columns, err := exportQueryResultToFile(
f,
fake,
connection.ConnectionConfig{Type: "mysql", Timeout: 10},
"SELECT id, name FROM users",
ExportFileOptions{Format: "csv"},
nil,
)
if err != nil {
t.Fatalf("exportQueryResultToFile 返回错误: %v", err)
}
if fake.streamHits != 1 {
t.Fatalf("应优先使用流式查询streamHits=%d", fake.streamHits)
}
if fake.queryHits != 0 {
t.Fatalf("不应回退到缓冲查询queryHits=%d", fake.queryHits)
}
if rowCount != 2 {
t.Fatalf("导出行数异常want=2 got=%d", rowCount)
}
if len(columns) != 2 || columns[0] != "id" || columns[1] != "name" {
t.Fatalf("导出列异常got=%v", columns)
}
contentBytes, err := os.ReadFile(f.Name())
if err != nil {
t.Fatalf("读取导出文件失败: %v", err)
}
content := string(contentBytes)
if !strings.Contains(content, "alice") || !strings.Contains(content, "bob") {
t.Fatalf("流式导出内容异常: %s", content)
}
}
func TestExportQueryResultToFile_UsesValueStreamPathWhenAvailable(t *testing.T) {
f, err := os.CreateTemp("", "gonavi-export-stream-values-*.csv")
if err != nil {
t.Fatalf("创建临时文件失败: %v", err)
}
defer os.Remove(f.Name())
defer f.Close()
fake := &fakeValueStreamExportDB{
streamCols: []string{"id", "name"},
streamValues: [][]interface{}{
{1, "alice"},
{2, "bob"},
},
}
rowCount, columns, err := exportQueryResultToFile(
f,
fake,
connection.ConnectionConfig{Type: "mysql", Timeout: 10},
"SELECT id, name FROM users",
ExportFileOptions{Format: "csv"},
nil,
)
if err != nil {
t.Fatalf("exportQueryResultToFile 返回错误: %v", err)
}
if fake.streamHits != 1 {
t.Fatalf("应优先使用流式查询streamHits=%d", fake.streamHits)
}
if fake.valueHits != 2 {
t.Fatalf("应走值数组流式路径valueHits=%d", fake.valueHits)
}
if fake.queryHits != 0 {
t.Fatalf("不应回退到缓冲查询queryHits=%d", fake.queryHits)
}
if rowCount != 2 {
t.Fatalf("导出行数异常want=2 got=%d", rowCount)
}
if len(columns) != 2 || columns[0] != "id" || columns[1] != "name" {
t.Fatalf("导出列异常got=%v", columns)
}
}
func TestGetExportQueryTimeout_ClickHouseUsesLongerMinimum(t *testing.T) {
timeout := getExportQueryTimeout(connection.ConnectionConfig{
Type: "clickhouse",
@@ -300,7 +507,7 @@ func TestWriteRowsToFile_HTML_EscapeAndStyle(t *testing.T) {
}
columns := []string{"name", "note", "nullable"}
if err := writeRowsToFile(f, data, columns, "html"); err != nil {
if err := writeRowsToFile(f, data, columns, ExportFileOptions{Format: "html"}); err != nil {
t.Fatalf("写入 html 失败: %v", err)
}
@@ -343,7 +550,7 @@ func TestWriteRowsToFile_HTML_EscapeHeader(t *testing.T) {
columnName := "<b>name</b>"
data := []map[string]interface{}{{columnName: "ok"}}
if err := writeRowsToFile(f, data, []string{columnName}, "html"); err != nil {
if err := writeRowsToFile(f, data, []string{columnName}, ExportFileOptions{Format: "html"}); err != nil {
t.Fatalf("写入 html 失败: %v", err)
}
contentBytes, _ := os.ReadFile(f.Name())
@@ -353,6 +560,175 @@ func TestWriteRowsToFile_HTML_EscapeHeader(t *testing.T) {
}
}
func TestWriteRowsToFile_XLSX_SplitsByMaxRowsPerSheet(t *testing.T) {
f, err := os.CreateTemp("", "gonavi-export-*.xlsx")
if err != nil {
t.Fatalf("创建临时文件失败: %v", err)
}
defer os.Remove(f.Name())
defer f.Close()
data := []map[string]interface{}{
{"id": 1, "name": "alice"},
{"id": 2, "name": "bob"},
{"id": 3, "name": "carol"},
}
columns := []string{"id", "name"}
if err := writeRowsToFile(f, data, columns, ExportFileOptions{
Format: "xlsx",
XLSXMaxRowsPerSheet: 2,
}); err != nil {
t.Fatalf("写入 xlsx 失败: %v", err)
}
workbook, err := excelize.OpenFile(f.Name())
if err != nil {
t.Fatalf("打开 xlsx 失败: %v", err)
}
defer workbook.Close()
sheets := workbook.GetSheetList()
if len(sheets) != 2 {
t.Fatalf("sheet 数量异常want=2 got=%d (%v)", len(sheets), sheets)
}
rows1, err := workbook.GetRows("Sheet1")
if err != nil {
t.Fatalf("读取 Sheet1 失败: %v", err)
}
if len(rows1) != 3 {
t.Fatalf("Sheet1 行数异常want=3 got=%d", len(rows1))
}
rows2, err := workbook.GetRows("Sheet2")
if err != nil {
t.Fatalf("读取 Sheet2 失败: %v", err)
}
if len(rows2) != 2 {
t.Fatalf("Sheet2 行数异常want=2 got=%d", len(rows2))
}
if rows2[1][1] != "carol" {
t.Fatalf("Sheet2 数据异常want=%q got=%q", "carol", rows2[1][1])
}
}
func benchmarkExportRows(rowCount int) ([]map[string]interface{}, []string) {
columns := []string{"id", "name", "note", "created_at", "status"}
rows := make([]map[string]interface{}, rowCount)
for i := 0; i < rowCount; i++ {
rows[i] = map[string]interface{}{
"id": i + 1,
"name": "benchmark-user",
"note": "plain export payload without timezone marker",
"created_at": "2026-06-17 12:34:56",
"status": "enabled",
}
}
return rows, columns
}
func benchmarkExportRowValues(rowCount int) ([][]interface{}, []string) {
columns := []string{"id", "name", "note", "created_at", "status"}
rows := make([][]interface{}, rowCount)
for i := 0; i < rowCount; i++ {
rows[i] = []interface{}{
i + 1,
"benchmark-user",
"plain export payload without timezone marker",
"2026-06-17 12:34:56",
"enabled",
}
}
return rows, columns
}
func BenchmarkFormatExportCellText_PlainString(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_ = formatExportCellText("plain export payload without timezone marker")
}
}
func BenchmarkWriteRowsToFile_XLSX_20000Rows(b *testing.B) {
rows, columns := benchmarkExportRows(20000)
b.ReportAllocs()
for i := 0; i < b.N; i++ {
f, err := os.CreateTemp("", "gonavi-export-bench-*.xlsx")
if err != nil {
b.Fatalf("创建临时文件失败: %v", err)
}
name := f.Name()
if err := writeRowsToFile(f, rows, columns, ExportFileOptions{Format: "xlsx"}); err != nil {
_ = os.Remove(name)
b.Fatalf("写入 xlsx 失败: %v", err)
}
if err := os.Remove(name); err != nil {
b.Fatalf("删除临时文件失败: %v", err)
}
}
}
func BenchmarkExportQueryResultToFile_XLSX_StreamMap_20000Rows(b *testing.B) {
rows, columns := benchmarkExportRows(20000)
streamDB := &fakeStreamExportDB{
streamCols: columns,
streamData: rows,
}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
f, err := os.CreateTemp("", "gonavi-export-stream-map-*.xlsx")
if err != nil {
b.Fatalf("创建临时文件失败: %v", err)
}
name := f.Name()
if _, _, err := exportQueryResultToFile(
f,
streamDB,
connection.ConnectionConfig{Type: "mysql", Timeout: 10},
"SELECT * FROM users",
ExportFileOptions{Format: "xlsx"},
nil,
); err != nil {
_ = os.Remove(name)
b.Fatalf("流式 map 导出失败: %v", err)
}
if err := os.Remove(name); err != nil {
b.Fatalf("删除临时文件失败: %v", err)
}
}
}
func BenchmarkExportQueryResultToFile_XLSX_StreamValues_20000Rows(b *testing.B) {
rows, columns := benchmarkExportRowValues(20000)
streamDB := &fakeValueStreamExportDB{
streamCols: columns,
streamValues: rows,
}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
f, err := os.CreateTemp("", "gonavi-export-stream-values-*.xlsx")
if err != nil {
b.Fatalf("创建临时文件失败: %v", err)
}
name := f.Name()
if _, _, err := exportQueryResultToFile(
f,
streamDB,
connection.ConnectionConfig{Type: "mysql", Timeout: 10},
"SELECT * FROM users",
ExportFileOptions{Format: "xlsx"},
nil,
); err != nil {
_ = os.Remove(name)
b.Fatalf("流式值数组导出失败: %v", err)
}
if err := os.Remove(name); err != nil {
b.Fatalf("删除临时文件失败: %v", err)
}
}
}
func TestFormatImportSQLValue_NormalizesTimestampWithoutTimezone(t *testing.T) {
got := formatImportSQLValue("postgres", "timestamp without time zone", "2026-01-21T18:32:26+08:00")
if got != "'2026-01-21 18:32:26'" {

View File

@@ -777,6 +777,22 @@ func (c *ClickHouseDB) Query(query string) ([]map[string]interface{}, []string,
return scanRows(rows)
}
func (c *ClickHouseDB) StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error {
if c.conn == nil {
return fmt.Errorf("连接未打开")
}
rows, err := c.conn.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
return streamRows(rows, consumer)
}
func (c *ClickHouseDB) StreamQuery(query string, consumer QueryStreamConsumer) error {
return c.StreamQueryContext(context.Background(), query, consumer)
}
func (c *ClickHouseDB) ExecContext(ctx context.Context, query string) (int64, error) {
if c.conn == nil {
return 0, fmt.Errorf("连接未打开")

View File

@@ -111,6 +111,24 @@ func (c *CustomDB) Query(query string) ([]map[string]interface{}, []string, erro
return scanRowsForDialect(rows, c.scanDialect())
}
func (c *CustomDB) StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error {
if c.conn == nil {
return fmt.Errorf("连接未打开")
}
rows, err := c.conn.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
return streamRowsForDialect(rows, c.scanDialect(), consumer)
}
func (c *CustomDB) StreamQuery(query string, consumer QueryStreamConsumer) error {
return c.StreamQueryContext(context.Background(), query, consumer)
}
func (c *CustomDB) scanDialect() string {
if strings.EqualFold(strings.TrimSpace(c.driver), "mysql") {
return "mysql"

View File

@@ -182,6 +182,24 @@ func (d *DamengDB) Query(query string) ([]map[string]interface{}, []string, erro
return scanRows(rows)
}
func (d *DamengDB) StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error {
if d.conn == nil {
return fmt.Errorf("连接未打开")
}
rows, err := d.conn.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
return streamRows(rows, consumer)
}
func (d *DamengDB) StreamQuery(query string, consumer QueryStreamConsumer) error {
return d.StreamQueryContext(context.Background(), query, consumer)
}
func (d *DamengDB) ExecContext(ctx context.Context, query string) (int64, error) {
if d.conn == nil {
return 0, fmt.Errorf("连接未打开")

View File

@@ -76,6 +76,28 @@ type StatementQueryExecer interface {
QueryContext(ctx context.Context, query string) ([]map[string]interface{}, []string, error)
}
// QueryStreamConsumer receives query metadata and rows incrementally.
// Implementations can stream rows directly to files to avoid buffering entire result sets in memory.
type QueryStreamConsumer interface {
SetColumns(columns []string) error
ConsumeRow(row map[string]interface{}) error
}
// QueryStreamValueConsumer is an optional fast path for stream consumers that
// can consume normalized row values in column order without requiring a
// map[string]interface{} allocation per row.
type QueryStreamValueConsumer interface {
SetColumns(columns []string) error
ConsumeRowValues(values []interface{}) error
}
// StreamQueryExecer is an optional interface for drivers or pinned sessions that can
// stream query rows incrementally instead of materializing []map rows in memory.
type StreamQueryExecer interface {
StreamQuery(query string, consumer QueryStreamConsumer) error
StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error
}
// StatementQueryMessageExecer can run queries on a pinned session and return
// extra server messages/notices alongside rows.
type StatementQueryMessageExecer interface {
@@ -178,6 +200,22 @@ func (e *sqlConnStatementExecer) Query(query string) ([]map[string]interface{},
return e.QueryContext(context.Background(), query)
}
func (e *sqlConnStatementExecer) StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error {
if e == nil || e.conn == nil {
return fmt.Errorf("连接未打开")
}
rows, err := e.conn.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
return streamRowsForDialect(rows, e.scanDialect, consumer)
}
func (e *sqlConnStatementExecer) StreamQuery(query string, consumer QueryStreamConsumer) error {
return e.StreamQueryContext(context.Background(), query, consumer)
}
func (e *sqlConnStatementExecer) QueryMultiContext(ctx context.Context, query string) ([]connection.ResultSetData, error) {
if e == nil || e.conn == nil {
return nil, fmt.Errorf("连接未打开")
@@ -275,6 +313,23 @@ func (e *sqlConnTransactionExecer) Query(query string) ([]map[string]interface{}
return e.QueryContext(context.Background(), query)
}
func (e *sqlConnTransactionExecer) StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error {
conn, err := e.activeConn()
if err != nil {
return err
}
rows, err := conn.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
return streamRowsForDialect(rows, e.scanDialect, consumer)
}
func (e *sqlConnTransactionExecer) StreamQuery(query string, consumer QueryStreamConsumer) error {
return e.StreamQueryContext(context.Background(), query, consumer)
}
func (e *sqlConnTransactionExecer) QueryMultiContext(ctx context.Context, query string) ([]connection.ResultSetData, error) {
conn, err := e.activeConn()
if err != nil {
@@ -401,6 +456,23 @@ func (e *sqlTxStatementExecer) Query(query string) ([]map[string]interface{}, []
return e.QueryContext(context.Background(), query)
}
func (e *sqlTxStatementExecer) StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error {
tx, err := e.activeTx()
if err != nil {
return err
}
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
return streamRows(rows, consumer)
}
func (e *sqlTxStatementExecer) StreamQuery(query string, consumer QueryStreamConsumer) error {
return e.StreamQueryContext(context.Background(), query, consumer)
}
func (e *sqlTxStatementExecer) QueryMultiContext(ctx context.Context, query string) ([]connection.ResultSetData, error) {
tx, err := e.activeTx()
if err != nil {

View File

@@ -11,6 +11,19 @@ func scanRows(rows *sql.Rows) ([]map[string]interface{}, []string, error) {
return scanRowsForDialect(rows, "")
}
func streamRows(rows *sql.Rows, consumer QueryStreamConsumer) error {
return streamRowsForDialect(rows, "", consumer)
}
type queryRowScanner struct {
columns []string
dbTypeNames []string
dialect string
values []interface{}
normalized []interface{}
valuePtrs []interface{}
}
func scanRowsForDialect(rows *sql.Rows, dialect string) ([]map[string]interface{}, []string, error) {
columns, err := rows.Columns()
if err != nil {
@@ -23,27 +36,14 @@ func scanRowsForDialect(rows *sql.Rows, dialect string) ([]map[string]interface{
colTypes = nil
}
scanner := newQueryRowScanner(columns, colTypes, dialect)
resultData := make([]map[string]interface{}, 0)
for rows.Next() {
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range columns {
valuePtrs[i] = &values[i]
}
if err := rows.Scan(valuePtrs...); err != nil {
entry, err := scanner.scanCurrentRow(rows)
if err != nil {
continue
}
entry := make(map[string]interface{}, len(columns))
for i, col := range columns {
dbTypeName := ""
if colTypes != nil && i < len(colTypes) && colTypes[i] != nil {
dbTypeName = colTypes[i].DatabaseTypeName()
}
entry[col] = normalizeQueryValueWithDBTypeAndDialect(values[i], dbTypeName, dialect)
}
resultData = append(resultData, entry)
}
@@ -53,6 +53,95 @@ func scanRowsForDialect(rows *sql.Rows, dialect string) ([]map[string]interface{
return resultData, columns, nil
}
func streamRowsForDialect(rows *sql.Rows, dialect string, consumer QueryStreamConsumer) error {
if consumer == nil {
return fmt.Errorf("query stream consumer required")
}
columns, err := rows.Columns()
if err != nil {
return err
}
columns = ensureUniqueQueryColumnNames(columns)
colTypes, err := rows.ColumnTypes()
if err != nil || len(colTypes) != len(columns) {
colTypes = nil
}
scanner := newQueryRowScanner(columns, colTypes, dialect)
if err := consumer.SetColumns(columns); err != nil {
return err
}
valueConsumer, useValueConsumer := consumer.(QueryStreamValueConsumer)
for rows.Next() {
if useValueConsumer {
values, err := scanner.scanCurrentRowValues(rows)
if err != nil {
continue
}
if err := valueConsumer.ConsumeRowValues(values); err != nil {
return err
}
continue
}
entry, err := scanner.scanCurrentRow(rows)
if err != nil {
continue
}
if err := consumer.ConsumeRow(entry); err != nil {
return err
}
}
return rows.Err()
}
func newQueryRowScanner(columns []string, colTypes []*sql.ColumnType, dialect string) *queryRowScanner {
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range columns {
valuePtrs[i] = &values[i]
}
dbTypeNames := make([]string, len(columns))
for i := range columns {
if colTypes != nil && i < len(colTypes) && colTypes[i] != nil {
dbTypeNames[i] = colTypes[i].DatabaseTypeName()
}
}
return &queryRowScanner{
columns: columns,
dbTypeNames: dbTypeNames,
dialect: dialect,
values: values,
normalized: make([]interface{}, len(columns)),
valuePtrs: valuePtrs,
}
}
func (s *queryRowScanner) scanCurrentRowValues(rows *sql.Rows) ([]interface{}, error) {
if err := rows.Scan(s.valuePtrs...); err != nil {
return nil, err
}
for i := range s.columns {
s.normalized[i] = normalizeQueryValueWithDBTypeAndDialect(s.values[i], s.dbTypeNames[i], s.dialect)
}
return s.normalized, nil
}
func (s *queryRowScanner) scanCurrentRow(rows *sql.Rows) (map[string]interface{}, error) {
normalized, err := s.scanCurrentRowValues(rows)
if err != nil {
return nil, err
}
entry := make(map[string]interface{}, len(s.columns))
for i, col := range s.columns {
entry[col] = normalized[i]
}
return entry, nil
}
func ensureUniqueQueryColumnNames(columns []string) []string {
if len(columns) == 0 {
return columns

View File

@@ -385,6 +385,23 @@ func (e *sqlServerSessionExecer) QueryContext(ctx context.Context, query string)
return rows, columns, err
}
func (e *sqlServerSessionExecer) StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error {
if e == nil || e.conn == nil {
return fmt.Errorf("连接未打开")
}
retmsg := &sqlexp.ReturnMessage{}
rows, err := e.conn.QueryContext(ctx, query, retmsg)
if err != nil {
return err
}
defer rows.Close()
return streamRows(rows, consumer)
}
func (e *sqlServerSessionExecer) StreamQuery(query string, consumer QueryStreamConsumer) error {
return e.StreamQueryContext(context.Background(), query, consumer)
}
func (e *sqlServerSessionExecer) QueryWithMessages(query string) ([]map[string]interface{}, []string, []string, error) {
return e.QueryContextWithMessages(context.Background(), query)
}

View File

@@ -168,6 +168,24 @@ func (t *TDengineDB) Query(query string) ([]map[string]interface{}, []string, er
return scanRows(rows)
}
func (t *TDengineDB) StreamQueryContext(ctx context.Context, query string, consumer QueryStreamConsumer) error {
if t.conn == nil {
return fmt.Errorf("连接未打开")
}
rows, err := t.conn.QueryContext(ctx, query)
if err != nil {
return err
}
defer rows.Close()
return streamRows(rows, consumer)
}
func (t *TDengineDB) StreamQuery(query string, consumer QueryStreamConsumer) error {
return t.StreamQueryContext(context.Background(), query, consumer)
}
func (t *TDengineDB) ExecContext(ctx context.Context, query string) (int64, error) {
if t.conn == nil {
return 0, fmt.Errorf("连接未打开")