mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-06-29 09:41:22 +08:00
✨ feat(export-workbench): 支持批量导出工作台并优化 SQL 导出性能
- 侧边栏批量表/批量库入口改为直接打开导出工作台,统一导出配置与进度视图 - 导出工作台新增 batch-tables / batch-databases 模式,支持连接、数据库、对象选择与独立历史记录键 - 连接、数据库、对象下拉项补齐完整名展示与悬浮提示,避免长名称被截断后不可识别 - 后端新增批量对象/批量库导出 WithOptions 链路,统一返回输出文件/目录与进度信息 - SQL dump 数据导出改为按方言批量写入,MySQL/PG 等使用多值 VALUES,Oracle/达梦使用 INSERT ALL - 补充导出工作台与 SQL dump 的回归测试和 benchmark,覆盖批量模式与批量写入语义
This commit is contained in:
@@ -39,6 +39,8 @@ const sqlFileProgressTimeInterval = time.Second
|
||||
const exportProgressEvent = "export:progress"
|
||||
const exportProgressRowInterval int64 = 1000
|
||||
const exportProgressTimeInterval = 500 * time.Millisecond
|
||||
const sqlExportInsertBatchMaxRows = 200
|
||||
const sqlExportInsertBatchMaxBytes = 256 * 1024
|
||||
const defaultAppLogTailLineLimit = 80
|
||||
const maxAppLogTailLineLimit = 200
|
||||
const appLogTailReadWindowBytes int64 = 256 * 1024
|
||||
@@ -220,6 +222,89 @@ func (r *exportProgressReporter) Error(current int64, message string) {
|
||||
r.emit("error", "导出失败", current, message, true)
|
||||
}
|
||||
|
||||
var exportFileNameSanitizer = strings.NewReplacer(
|
||||
"/", "_",
|
||||
"\\", "_",
|
||||
":", "_",
|
||||
"*", "_",
|
||||
"?", "_",
|
||||
"\"", "_",
|
||||
"<", "_",
|
||||
">", "_",
|
||||
"|", "_",
|
||||
)
|
||||
|
||||
func sanitizeExportFileStem(raw string) string {
|
||||
value := strings.TrimSpace(raw)
|
||||
if value == "" {
|
||||
return "export"
|
||||
}
|
||||
value = exportFileNameSanitizer.Replace(value)
|
||||
value = strings.Trim(value, ". ")
|
||||
if value == "" {
|
||||
return "export"
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func resolveSQLExportSuffix(includeSchema bool, includeData bool) string {
|
||||
if includeSchema && includeData {
|
||||
return "backup"
|
||||
}
|
||||
if includeData {
|
||||
return "data"
|
||||
}
|
||||
return "schema"
|
||||
}
|
||||
|
||||
func normalizeExportNameList(names []string) []string {
|
||||
normalized := make([]string, 0, len(names))
|
||||
seen := make(map[string]struct{}, len(names))
|
||||
for _, name := range names {
|
||||
safeName := strings.TrimSpace(name)
|
||||
if safeName == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[safeName]; ok {
|
||||
continue
|
||||
}
|
||||
seen[safeName] = struct{}{}
|
||||
normalized = append(normalized, safeName)
|
||||
}
|
||||
return normalized
|
||||
}
|
||||
|
||||
func buildTablesExportDefaultFilename(dbName string, objectNames []string, includeSchema bool, includeData bool) string {
|
||||
suffix := resolveSQLExportSuffix(includeSchema, includeData)
|
||||
if len(objectNames) == 1 {
|
||||
return fmt.Sprintf("%s_%s.sql", sanitizeExportFileStem(objectNames[0]), suffix)
|
||||
}
|
||||
safeDbName := strings.TrimSpace(dbName)
|
||||
if safeDbName == "" {
|
||||
safeDbName = "export"
|
||||
}
|
||||
return fmt.Sprintf("%s_%s_%dtables.sql", sanitizeExportFileStem(safeDbName), suffix, len(objectNames))
|
||||
}
|
||||
|
||||
func buildDatabaseExportDefaultFilename(dbName string, includeData bool) string {
|
||||
suffix := "schema"
|
||||
if includeData {
|
||||
suffix = "backup"
|
||||
}
|
||||
return fmt.Sprintf("%s_%s.sql", sanitizeExportFileStem(dbName), suffix)
|
||||
}
|
||||
|
||||
func resolveBatchObjectsTargetName(dbName string, objectNames []string) string {
|
||||
if len(objectNames) == 1 {
|
||||
return objectNames[0]
|
||||
}
|
||||
safeDbName := strings.TrimSpace(dbName)
|
||||
if safeDbName == "" {
|
||||
safeDbName = "当前数据库"
|
||||
}
|
||||
return fmt.Sprintf("%s · %d 个对象", safeDbName, len(objectNames))
|
||||
}
|
||||
|
||||
func normalizeSQLDirectoryPath(directoryPath string) (string, error) {
|
||||
target := strings.TrimSpace(directoryPath)
|
||||
if target == "" {
|
||||
@@ -2246,58 +2331,85 @@ func (a *App) ExportTablesDataSQL(config connection.ConnectionConfig, dbName str
|
||||
return a.exportTablesSQL(config, dbName, tableNames, false, true)
|
||||
}
|
||||
|
||||
func (a *App) exportTablesSQL(config connection.ConnectionConfig, dbName string, tableNames []string, includeSchema bool, includeData bool) connection.QueryResult {
|
||||
func (a *App) ExportTablesSQLWithOptions(
|
||||
config connection.ConnectionConfig,
|
||||
dbName string,
|
||||
tableNames []string,
|
||||
includeSchema bool,
|
||||
includeData bool,
|
||||
options ExportFileOptions,
|
||||
) connection.QueryResult {
|
||||
if !includeSchema && !includeData {
|
||||
return connection.QueryResult{Success: false, Message: "无效的导出模式"}
|
||||
}
|
||||
|
||||
safeDbName := strings.TrimSpace(dbName)
|
||||
if safeDbName == "" {
|
||||
safeDbName = "export"
|
||||
}
|
||||
suffix := "schema"
|
||||
if includeSchema && includeData {
|
||||
suffix = "backup"
|
||||
} else if !includeSchema && includeData {
|
||||
suffix = "data"
|
||||
}
|
||||
defaultFilename := fmt.Sprintf("%s_%s_%dtables.sql", safeDbName, suffix, len(tableNames))
|
||||
if len(tableNames) == 1 && strings.TrimSpace(tableNames[0]) != "" {
|
||||
defaultFilename = fmt.Sprintf("%s_%s.sql", strings.TrimSpace(tableNames[0]), suffix)
|
||||
}
|
||||
objects := normalizeExportNameList(tableNames)
|
||||
options = normalizeExportFileOptions("sql", options)
|
||||
options.TotalRowsHint = int64(len(objects))
|
||||
options.TotalRowsKnown = true
|
||||
|
||||
filename, err := runtime.SaveFileDialog(a.ctx, runtime.SaveDialogOptions{
|
||||
Title: "Export Tables (SQL)",
|
||||
DefaultFilename: defaultFilename,
|
||||
DefaultFilename: buildTablesExportDefaultFilename(dbName, objects, includeSchema, includeData),
|
||||
})
|
||||
if err != nil || filename == "" {
|
||||
return connection.QueryResult{Success: false, Message: "已取消"}
|
||||
}
|
||||
|
||||
reporter := newExportProgressReporter(a, options, resolveBatchObjectsTargetName(dbName, objects), filename)
|
||||
if reporter != nil {
|
||||
reporter.Start("正在准备批量对象导出")
|
||||
}
|
||||
return a.exportTablesSQLToFile(config, dbName, objects, includeSchema, includeData, filename, reporter)
|
||||
}
|
||||
|
||||
func (a *App) exportTablesSQL(config connection.ConnectionConfig, dbName string, tableNames []string, includeSchema bool, includeData bool) connection.QueryResult {
|
||||
if !includeSchema && !includeData {
|
||||
return connection.QueryResult{Success: false, Message: "无效的导出模式"}
|
||||
}
|
||||
objects := normalizeExportNameList(tableNames)
|
||||
|
||||
filename, err := runtime.SaveFileDialog(a.ctx, runtime.SaveDialogOptions{
|
||||
Title: "Export Tables (SQL)",
|
||||
DefaultFilename: buildTablesExportDefaultFilename(dbName, objects, includeSchema, includeData),
|
||||
})
|
||||
if err != nil || filename == "" {
|
||||
return connection.QueryResult{Success: false, Message: "已取消"}
|
||||
}
|
||||
|
||||
return a.exportTablesSQLToFile(config, dbName, objects, includeSchema, includeData, filename, nil)
|
||||
}
|
||||
|
||||
func (a *App) exportTablesSQLToFile(
|
||||
config connection.ConnectionConfig,
|
||||
dbName string,
|
||||
tableNames []string,
|
||||
includeSchema bool,
|
||||
includeData bool,
|
||||
filename string,
|
||||
reporter *exportProgressReporter,
|
||||
) connection.QueryResult {
|
||||
if !includeSchema && !includeData {
|
||||
return connection.QueryResult{Success: false, Message: "无效的导出模式"}
|
||||
}
|
||||
|
||||
runConfig := normalizeRunConfig(config, dbName)
|
||||
dbInst, err := a.getDatabase(runConfig)
|
||||
if err != nil {
|
||||
if reporter != nil {
|
||||
reporter.Error(0, err.Error())
|
||||
}
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
}
|
||||
|
||||
objects := make([]string, 0, len(tableNames))
|
||||
seen := make(map[string]struct{}, len(tableNames))
|
||||
for _, t := range tableNames {
|
||||
t = strings.TrimSpace(t)
|
||||
if t == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[t]; ok {
|
||||
continue
|
||||
}
|
||||
seen[t] = struct{}{}
|
||||
objects = append(objects, t)
|
||||
}
|
||||
viewLookup := listViewNameLookup(dbInst, runConfig, dbName)
|
||||
objects = buildExportObjectOrder(runConfig, dbName, objects, viewLookup, false)
|
||||
objects := buildExportObjectOrder(runConfig, dbName, normalizeExportNameList(tableNames), viewLookup, false)
|
||||
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
if reporter != nil {
|
||||
reporter.Error(0, err.Error())
|
||||
}
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
}
|
||||
defer f.Close()
|
||||
@@ -2306,18 +2418,44 @@ func (a *App) exportTablesSQL(config connection.ConnectionConfig, dbName string,
|
||||
defer w.Flush()
|
||||
|
||||
if err := writeSQLHeader(w, runConfig, dbName); err != nil {
|
||||
if reporter != nil {
|
||||
reporter.Error(0, err.Error())
|
||||
}
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
}
|
||||
for _, objectName := range objects {
|
||||
for index, objectName := range objects {
|
||||
if reporter != nil {
|
||||
reporter.ForceRunning(int64(index), fmt.Sprintf("正在导出 %s (%d/%d)", objectName, index+1, len(objects)))
|
||||
}
|
||||
if err := dumpTableSQL(w, dbInst, runConfig, dbName, objectName, includeSchema, includeData, viewLookup); err != nil {
|
||||
if reporter != nil {
|
||||
reporter.Error(int64(index), err.Error())
|
||||
}
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
}
|
||||
if reporter != nil {
|
||||
reporter.ForceRunning(int64(index+1), fmt.Sprintf("正在导出 %s (%d/%d)", objectName, index+1, len(objects)))
|
||||
}
|
||||
}
|
||||
if err := writeSQLFooter(w, runConfig); err != nil {
|
||||
if reporter != nil {
|
||||
reporter.Error(int64(len(objects)), err.Error())
|
||||
}
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
}
|
||||
|
||||
return connection.QueryResult{Success: true, Message: "导出完成"}
|
||||
if reporter != nil {
|
||||
reporter.Finalizing(int64(len(objects)))
|
||||
reporter.Done(int64(len(objects)))
|
||||
}
|
||||
return connection.QueryResult{
|
||||
Success: true,
|
||||
Message: "导出完成",
|
||||
Data: map[string]interface{}{
|
||||
"filePath": filename,
|
||||
"objectCount": len(objects),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) ExportDatabaseSQL(config connection.ConnectionConfig, dbName string, includeData bool) connection.QueryResult {
|
||||
@@ -2325,19 +2463,87 @@ func (a *App) ExportDatabaseSQL(config connection.ConnectionConfig, dbName strin
|
||||
if safeDbName == "" {
|
||||
return connection.QueryResult{Success: false, Message: "数据库名称不能为空"}
|
||||
}
|
||||
suffix := "schema"
|
||||
if includeData {
|
||||
suffix = "backup"
|
||||
}
|
||||
|
||||
filename, err := runtime.SaveFileDialog(a.ctx, runtime.SaveDialogOptions{
|
||||
Title: fmt.Sprintf("Export %s (SQL)", safeDbName),
|
||||
DefaultFilename: fmt.Sprintf("%s_%s.sql", safeDbName, suffix),
|
||||
DefaultFilename: buildDatabaseExportDefaultFilename(safeDbName, includeData),
|
||||
})
|
||||
if err != nil || filename == "" {
|
||||
return connection.QueryResult{Success: false, Message: "已取消"}
|
||||
}
|
||||
|
||||
return a.exportDatabaseSQLToFile(config, safeDbName, includeData, filename)
|
||||
}
|
||||
|
||||
func (a *App) ExportDatabasesSQLWithOptions(
|
||||
config connection.ConnectionConfig,
|
||||
dbNames []string,
|
||||
includeData bool,
|
||||
options ExportFileOptions,
|
||||
) connection.QueryResult {
|
||||
normalizedDbNames := normalizeExportNameList(dbNames)
|
||||
if len(normalizedDbNames) == 0 {
|
||||
return connection.QueryResult{Success: false, Message: "请至少选择一个数据库"}
|
||||
}
|
||||
|
||||
directory, err := runtime.OpenDirectoryDialog(a.ctx, runtime.OpenDialogOptions{
|
||||
Title: "选择批量导出目录",
|
||||
DefaultDirectory: normalizeDirectoryDialogPath(""),
|
||||
})
|
||||
if err != nil || strings.TrimSpace(directory) == "" {
|
||||
return connection.QueryResult{Success: false, Message: "已取消"}
|
||||
}
|
||||
|
||||
options = normalizeExportFileOptions("sql", options)
|
||||
options.TotalRowsHint = int64(len(normalizedDbNames))
|
||||
options.TotalRowsKnown = true
|
||||
reporter := newExportProgressReporter(a, options, fmt.Sprintf("%d 个数据库", len(normalizedDbNames)), directory)
|
||||
if reporter != nil {
|
||||
reporter.Start("正在准备批量库导出")
|
||||
}
|
||||
|
||||
for index, name := range normalizedDbNames {
|
||||
if reporter != nil {
|
||||
reporter.ForceRunning(int64(index), fmt.Sprintf("正在导出 %s (%d/%d)", name, index+1, len(normalizedDbNames)))
|
||||
}
|
||||
targetFile := filepath.Join(directory, buildDatabaseExportDefaultFilename(name, includeData))
|
||||
result := a.exportDatabaseSQLToFile(config, name, includeData, targetFile)
|
||||
if !result.Success {
|
||||
if reporter != nil {
|
||||
reporter.Error(int64(index), result.Message)
|
||||
}
|
||||
return result
|
||||
}
|
||||
if reporter != nil {
|
||||
reporter.ForceRunning(int64(index+1), fmt.Sprintf("正在导出 %s (%d/%d)", name, index+1, len(normalizedDbNames)))
|
||||
}
|
||||
}
|
||||
|
||||
if reporter != nil {
|
||||
reporter.Finalizing(int64(len(normalizedDbNames)))
|
||||
reporter.Done(int64(len(normalizedDbNames)))
|
||||
}
|
||||
return connection.QueryResult{
|
||||
Success: true,
|
||||
Message: "导出完成",
|
||||
Data: map[string]interface{}{
|
||||
"directoryPath": directory,
|
||||
"fileCount": len(normalizedDbNames),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) exportDatabaseSQLToFile(
|
||||
config connection.ConnectionConfig,
|
||||
dbName string,
|
||||
includeData bool,
|
||||
filename string,
|
||||
) connection.QueryResult {
|
||||
safeDbName := strings.TrimSpace(dbName)
|
||||
if safeDbName == "" {
|
||||
return connection.QueryResult{Success: false, Message: "数据库名称不能为空"}
|
||||
}
|
||||
|
||||
runConfig := normalizeRunConfig(config, dbName)
|
||||
dbInst, err := a.getDatabase(runConfig)
|
||||
if err != nil {
|
||||
@@ -2372,7 +2578,13 @@ func (a *App) ExportDatabaseSQL(config connection.ConnectionConfig, dbName strin
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
}
|
||||
|
||||
return connection.QueryResult{Success: true, Message: "导出完成"}
|
||||
return connection.QueryResult{
|
||||
Success: true,
|
||||
Message: "导出完成",
|
||||
Data: map[string]interface{}{
|
||||
"filePath": filename,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) ExportSchemaSQL(config connection.ConnectionConfig, dbName string, schemaName string, includeData bool) connection.QueryResult {
|
||||
@@ -3350,6 +3562,12 @@ func dumpTableSQL(
|
||||
columnTypeMap: columnTypeMap,
|
||||
}
|
||||
if err := streamQueryDataForExport(dbInst, config, selectSQL, insertConsumer); err != nil {
|
||||
if flushErr := insertConsumer.Flush(); flushErr != nil {
|
||||
return flushErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err := insertConsumer.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
if insertConsumer.rowCount == 0 {
|
||||
@@ -4039,9 +4257,32 @@ type sqlInsertExportConsumer struct {
|
||||
columnTypeMap map[string]string
|
||||
columns []string
|
||||
quotedCols []string
|
||||
columnList string
|
||||
columnTypes []string
|
||||
valueBuf []string
|
||||
rowCount int64
|
||||
mode sqlInsertExportMode
|
||||
pendingRows int
|
||||
statementBuf strings.Builder
|
||||
}
|
||||
|
||||
type sqlInsertExportMode int
|
||||
|
||||
const (
|
||||
sqlInsertExportModeSingle sqlInsertExportMode = iota
|
||||
sqlInsertExportModeMultiValues
|
||||
sqlInsertExportModeInsertAll
|
||||
)
|
||||
|
||||
func resolveSQLInsertExportMode(dbType string) sqlInsertExportMode {
|
||||
switch strings.ToLower(strings.TrimSpace(dbType)) {
|
||||
case "mysql", "mariadb", "oceanbase", "diros", "starrocks", "sphinx", "postgres", "kingbase", "highgo", "vastbase", "opengauss", "gaussdb", "sqlserver", "sqlite", "duckdb", "clickhouse", "iris":
|
||||
return sqlInsertExportModeMultiValues
|
||||
case "oracle", "dameng":
|
||||
return sqlInsertExportModeInsertAll
|
||||
default:
|
||||
return sqlInsertExportModeSingle
|
||||
}
|
||||
}
|
||||
|
||||
func (c *sqlInsertExportConsumer) SetColumns(columns []string) error {
|
||||
@@ -4055,19 +4296,16 @@ func (c *sqlInsertExportConsumer) SetColumns(columns []string) error {
|
||||
for i, column := range columns {
|
||||
c.columnTypes[i] = c.columnTypeMap[normalizeColumnName(column)]
|
||||
}
|
||||
c.columnList = strings.Join(c.quotedCols, ", ")
|
||||
c.mode = resolveSQLInsertExportMode(c.dbType)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *sqlInsertExportConsumer) ConsumeRow(row map[string]interface{}) error {
|
||||
values := make([]string, 0, len(c.columns))
|
||||
for _, column := range c.columns {
|
||||
values = append(values, formatImportSQLValue(c.dbType, c.columnTypeMap[normalizeColumnName(column)], row[column]))
|
||||
for i, column := range c.columns {
|
||||
c.valueBuf[i] = formatImportSQLValue(c.dbType, c.columnTypeMap[normalizeColumnName(column)], row[column])
|
||||
}
|
||||
if _, err := c.w.WriteString(fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s);\n", c.quotedTable, strings.Join(c.quotedCols, ", "), strings.Join(values, ", "))); err != nil {
|
||||
return err
|
||||
}
|
||||
c.rowCount++
|
||||
return nil
|
||||
return c.consumeValueBuf()
|
||||
}
|
||||
|
||||
func (c *sqlInsertExportConsumer) ConsumeRowValues(values []interface{}) error {
|
||||
@@ -4078,10 +4316,92 @@ func (c *sqlInsertExportConsumer) ConsumeRowValues(values []interface{}) error {
|
||||
}
|
||||
c.valueBuf[i] = formatImportSQLValue(c.dbType, c.columnTypes[i], value)
|
||||
}
|
||||
if _, err := c.w.WriteString(fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s);\n", c.quotedTable, strings.Join(c.quotedCols, ", "), strings.Join(c.valueBuf, ", "))); err != nil {
|
||||
return c.consumeValueBuf()
|
||||
}
|
||||
|
||||
func (c *sqlInsertExportConsumer) consumeValueBuf() error {
|
||||
rowValues := "(" + strings.Join(c.valueBuf, ", ") + ")"
|
||||
switch c.mode {
|
||||
case sqlInsertExportModeMultiValues, sqlInsertExportModeInsertAll:
|
||||
return c.appendBatchRow(rowValues)
|
||||
default:
|
||||
if _, err := c.w.WriteString(fmt.Sprintf("INSERT INTO %s (%s) VALUES %s;\n", c.quotedTable, c.columnList, rowValues)); err != nil {
|
||||
return err
|
||||
}
|
||||
c.rowCount++
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *sqlInsertExportConsumer) appendBatchRow(rowValues string) error {
|
||||
if c.pendingRows > 0 {
|
||||
separatorLen := 2
|
||||
if c.mode == sqlInsertExportModeInsertAll {
|
||||
separatorLen = 3
|
||||
}
|
||||
if c.pendingRows >= sqlExportInsertBatchMaxRows || c.statementBuf.Len()+len(rowValues)+separatorLen >= sqlExportInsertBatchMaxBytes {
|
||||
if err := c.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch c.mode {
|
||||
case sqlInsertExportModeMultiValues:
|
||||
if c.pendingRows == 0 {
|
||||
c.statementBuf.WriteString("INSERT INTO ")
|
||||
c.statementBuf.WriteString(c.quotedTable)
|
||||
c.statementBuf.WriteString(" (")
|
||||
c.statementBuf.WriteString(c.columnList)
|
||||
c.statementBuf.WriteString(") VALUES ")
|
||||
} else {
|
||||
c.statementBuf.WriteString(",\n")
|
||||
}
|
||||
c.statementBuf.WriteString(rowValues)
|
||||
case sqlInsertExportModeInsertAll:
|
||||
if c.pendingRows == 0 {
|
||||
c.statementBuf.WriteString("INSERT ALL\n")
|
||||
}
|
||||
c.statementBuf.WriteString(" INTO ")
|
||||
c.statementBuf.WriteString(c.quotedTable)
|
||||
c.statementBuf.WriteString(" (")
|
||||
c.statementBuf.WriteString(c.columnList)
|
||||
c.statementBuf.WriteString(") VALUES ")
|
||||
c.statementBuf.WriteString(rowValues)
|
||||
c.statementBuf.WriteByte('\n')
|
||||
default:
|
||||
if _, err := c.w.WriteString(fmt.Sprintf("INSERT INTO %s (%s) VALUES %s;\n", c.quotedTable, c.columnList, rowValues)); err != nil {
|
||||
return err
|
||||
}
|
||||
c.rowCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
c.pendingRows++
|
||||
if c.pendingRows >= sqlExportInsertBatchMaxRows || c.statementBuf.Len() >= sqlExportInsertBatchMaxBytes {
|
||||
return c.Flush()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *sqlInsertExportConsumer) Flush() error {
|
||||
if c == nil || c.pendingRows == 0 {
|
||||
return nil
|
||||
}
|
||||
switch c.mode {
|
||||
case sqlInsertExportModeMultiValues:
|
||||
c.statementBuf.WriteString(";\n")
|
||||
case sqlInsertExportModeInsertAll:
|
||||
c.statementBuf.WriteString("SELECT 1 FROM DUAL;\n")
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
if _, err := c.w.WriteString(c.statementBuf.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
c.rowCount++
|
||||
c.rowCount += int64(c.pendingRows)
|
||||
c.pendingRows = 0
|
||||
c.statementBuf.Reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
@@ -729,6 +730,60 @@ func BenchmarkExportQueryResultToFile_XLSX_StreamValues_20000Rows(b *testing.B)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDumpTableSQL_SQLBackup_StreamMap_20000Rows(b *testing.B) {
|
||||
rows, columns := benchmarkExportRows(20000)
|
||||
streamDB := &fakeStreamExportDB{
|
||||
streamCols: columns,
|
||||
streamData: rows,
|
||||
}
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
writer := bufio.NewWriterSize(io.Discard, 1024*1024)
|
||||
if err := dumpTableSQL(
|
||||
writer,
|
||||
streamDB,
|
||||
connection.ConnectionConfig{Type: "mysql"},
|
||||
"app",
|
||||
"users",
|
||||
false,
|
||||
true,
|
||||
map[string]string{},
|
||||
); err != nil {
|
||||
b.Fatalf("SQL 备份导出失败: %v", err)
|
||||
}
|
||||
if err := writer.Flush(); err != nil {
|
||||
b.Fatalf("flush SQL 备份失败: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDumpTableSQL_SQLBackup_StreamValues_20000Rows(b *testing.B) {
|
||||
rows, columns := benchmarkExportRowValues(20000)
|
||||
streamDB := &fakeValueStreamExportDB{
|
||||
streamCols: columns,
|
||||
streamValues: rows,
|
||||
}
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
writer := bufio.NewWriterSize(io.Discard, 1024*1024)
|
||||
if err := dumpTableSQL(
|
||||
writer,
|
||||
streamDB,
|
||||
connection.ConnectionConfig{Type: "mysql"},
|
||||
"app",
|
||||
"users",
|
||||
false,
|
||||
true,
|
||||
map[string]string{},
|
||||
); err != nil {
|
||||
b.Fatalf("SQL 备份导出失败: %v", err)
|
||||
}
|
||||
if err := writer.Flush(); err != nil {
|
||||
b.Fatalf("flush SQL 备份失败: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFormatImportSQLValue_NormalizesTimestampWithoutTimezone(t *testing.T) {
|
||||
got := formatImportSQLValue("postgres", "timestamp without time zone", "2026-01-21T18:32:26+08:00")
|
||||
if got != "'2026-01-21 18:32:26'" {
|
||||
@@ -808,6 +863,81 @@ func TestDumpTableSQL_PostgresBooleanBackupUsesBooleanLiterals(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDumpTableSQL_MySQLBackupBatchesRowsIntoMultiValueInsert(t *testing.T) {
|
||||
fake := &fakeValueStreamExportDB{
|
||||
streamCols: []string{"id", "name"},
|
||||
streamValues: [][]interface{}{
|
||||
{1, "alice"},
|
||||
{2, "bob"},
|
||||
{3, "carol"},
|
||||
},
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
writer := bufio.NewWriter(&buf)
|
||||
|
||||
err := dumpTableSQL(
|
||||
writer,
|
||||
fake,
|
||||
connection.ConnectionConfig{Type: "mysql"},
|
||||
"app",
|
||||
"users",
|
||||
false,
|
||||
true,
|
||||
map[string]string{},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("dumpTableSQL 返回错误: %v", err)
|
||||
}
|
||||
if err := writer.Flush(); err != nil {
|
||||
t.Fatalf("flush 导出 SQL 失败: %v", err)
|
||||
}
|
||||
|
||||
content := buf.String()
|
||||
if strings.Count(content, "INSERT INTO `app`.`users`") != 1 {
|
||||
t.Fatalf("MySQL 备份应合并为单条批量 INSERT,content=%s", content)
|
||||
}
|
||||
if !strings.Contains(content, "VALUES (1, 'alice'),\n(2, 'bob'),\n(3, 'carol');") {
|
||||
t.Fatalf("MySQL 批量 INSERT 内容异常,content=%s", content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDumpTableSQL_OracleBackupBatchesRowsIntoInsertAll(t *testing.T) {
|
||||
fake := &fakeValueStreamExportDB{
|
||||
streamCols: []string{"id", "name"},
|
||||
streamValues: [][]interface{}{
|
||||
{1, "alice"},
|
||||
{2, "bob"},
|
||||
},
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
writer := bufio.NewWriter(&buf)
|
||||
|
||||
err := dumpTableSQL(
|
||||
writer,
|
||||
fake,
|
||||
connection.ConnectionConfig{Type: "oracle"},
|
||||
"APP",
|
||||
"USERS",
|
||||
false,
|
||||
true,
|
||||
map[string]string{},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("dumpTableSQL 返回错误: %v", err)
|
||||
}
|
||||
if err := writer.Flush(); err != nil {
|
||||
t.Fatalf("flush 导出 SQL 失败: %v", err)
|
||||
}
|
||||
|
||||
content := buf.String()
|
||||
if strings.Count(content, "INSERT ALL") != 1 {
|
||||
t.Fatalf("Oracle 备份应合并为单条 INSERT ALL,content=%s", content)
|
||||
}
|
||||
if !strings.Contains(content, "INTO \"APP\".\"USERS\" (\"id\", \"name\") VALUES (1, 'alice')\n INTO \"APP\".\"USERS\" (\"id\", \"name\") VALUES (2, 'bob')\nSELECT 1 FROM DUAL;") {
|
||||
t.Fatalf("Oracle INSERT ALL 内容异常,content=%s", content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterExportObjectsBySchema_PostgresQualifiedObjectsOnly(t *testing.T) {
|
||||
got := filterExportObjectsBySchema(
|
||||
connection.ConnectionConfig{Type: "postgres"},
|
||||
|
||||
Reference in New Issue
Block a user