perf(query): 批量写语句走一次性 Exec 减少网络往返,修复大量 INSERT 执行慢问题

- 新增 BatchWriteExecer 可选接口(ExecBatchContext)
- MySQL/MariaDB/Doris/PostgreSQL/SQLite/DuckDB 实现该接口
- DBQueryMulti 检测到纯写操作时走批量路径,500 条 INSERT 从 500 次网络往返降至 1 次
- 混合语句(SELECT + INSERT)及不支持该接口的驱动继续走原有逐条执行路径
This commit is contained in:
tianqijiuyun-latiao
2026-03-29 12:17:37 +08:00
parent ed4a7b96d4
commit c85de27aac
7 changed files with 104 additions and 0 deletions

View File

@@ -581,6 +581,48 @@ func (a *App) DBQueryMulti(config connection.ConnectionConfig, dbName string, qu
}
}
// 全部为写操作且驱动支持批量 Exec → 一次性发送,大幅减少网络往返
// 适用于 MySQL/MariaDB/Doris/PostgreSQL/SQLite/DuckDB 等支持多语句 Exec 的驱动
if !allReadOnly {
allWrite := true
for _, stmt := range statements {
if strings.TrimSpace(stmt) != "" && isReadOnlySQLQuery(runConfig.Type, stmt) {
allWrite = false
break
}
}
if allWrite {
if batcher, ok := dbInst.(db.BatchWriteExecer); ok {
affected, batchErr := batcher.ExecBatchContext(ctx, query)
if batchErr != nil && shouldRefreshCachedConnection(batchErr) {
if a.invalidateCachedDatabase(runConfig, batchErr) {
retryInst, retryErr := a.getDatabaseForcePing(runConfig)
if retryErr != nil {
logger.Error(retryErr, "DBQueryMulti 批量写重建连接失败:%s", formatConnSummary(runConfig))
return connection.QueryResult{Success: false, Message: retryErr.Error(), QueryID: queryID}
}
if retryBatcher, ok2 := retryInst.(db.BatchWriteExecer); ok2 {
affected, batchErr = retryBatcher.ExecBatchContext(ctx, query)
}
}
}
if batchErr != nil {
logger.Error(batchErr, "DBQueryMulti 批量写执行失败:%s SQL片段=%q", formatConnSummary(runConfig), sqlSnippet(query))
return connection.QueryResult{Success: false, Message: batchErr.Error(), QueryID: queryID}
}
logger.Infof("DBQueryMulti 批量写执行成功:%s 语句数=%d affectedRows=%d", formatConnSummary(runConfig), len(statements), affected)
return connection.QueryResult{
Success: true,
Data: []connection.ResultSetData{{
Rows: []map[string]interface{}{{"affectedRows": affected}},
Columns: []string{"affectedRows"},
}},
QueryID: queryID,
}
}
}
}
var resultSets []connection.ResultSetData
for idx, stmt := range statements {
stmt = strings.TrimSpace(stmt)

View File

@@ -50,6 +50,13 @@ type MultiResultQuerierContext interface {
QueryMultiContext(ctx context.Context, query string) ([]connection.ResultSetData, error)
}
// BatchWriteExecer 是可选接口,支持将多条写语句一次性批量发送执行。
// 驱动的底层连接需支持多语句协议(如 MySQL multiStatements=true、PostgreSQL 原生多语句)。
// 实现此接口可大幅减少批量 INSERT/UPDATE/DELETE 的网络往返次数。
type BatchWriteExecer interface {
ExecBatchContext(ctx context.Context, query string) (int64, error)
}
// BatchApplier 定义了批量变更提交接口。
// 支持批量编辑的驱动实现此接口,用于一次性提交前端 DataGrid 中的增删改操作。
type BatchApplier interface {

View File

@@ -90,6 +90,17 @@ func (d *DuckDB) Query(query string) ([]map[string]interface{}, []string, error)
return scanRows(rows)
}
func (d *DuckDB) ExecBatchContext(ctx context.Context, query string) (int64, error) {
if d.conn == nil {
return 0, fmt.Errorf("连接未打开")
}
res, err := d.conn.ExecContext(ctx, query)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func (d *DuckDB) ExecContext(ctx context.Context, query string) (int64, error) {
if d.conn == nil {
return 0, fmt.Errorf("连接未打开")

View File

@@ -135,6 +135,17 @@ func (m *MariaDB) Query(query string) ([]map[string]interface{}, []string, error
return scanRows(rows)
}
func (m *MariaDB) ExecBatchContext(ctx context.Context, query string) (int64, error) {
if m.conn == nil {
return 0, fmt.Errorf("连接未打开")
}
res, err := m.conn.ExecContext(ctx, query)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func (m *MariaDB) ExecContext(ctx context.Context, query string) (int64, error) {
if m.conn == nil {
return 0, fmt.Errorf("连接未打开")

View File

@@ -329,6 +329,17 @@ func (m *MySQLDB) Query(query string) ([]map[string]interface{}, []string, error
return scanRows(rows)
}
func (m *MySQLDB) ExecBatchContext(ctx context.Context, query string) (int64, error) {
if m.conn == nil {
return 0, fmt.Errorf("连接未打开")
}
res, err := m.conn.ExecContext(ctx, query)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func (m *MySQLDB) ExecContext(ctx context.Context, query string) (int64, error) {
if m.conn == nil {
return 0, fmt.Errorf("连接未打开")

View File

@@ -233,6 +233,17 @@ func (p *PostgresDB) Query(query string) ([]map[string]interface{}, []string, er
return scanRows(rows)
}
func (p *PostgresDB) ExecBatchContext(ctx context.Context, query string) (int64, error) {
if p.conn == nil {
return 0, fmt.Errorf("连接未打开")
}
res, err := p.conn.ExecContext(ctx, query)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func (p *PostgresDB) ExecContext(ctx context.Context, query string) (int64, error) {
if p.conn == nil {
return 0, fmt.Errorf("连接未打开")

View File

@@ -222,6 +222,17 @@ func (s *SQLiteDB) Query(query string) ([]map[string]interface{}, []string, erro
return scanRows(rows)
}
func (s *SQLiteDB) ExecBatchContext(ctx context.Context, query string) (int64, error) {
if s.conn == nil {
return 0, fmt.Errorf("连接未打开")
}
res, err := s.conn.ExecContext(ctx, query)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func (s *SQLiteDB) ExecContext(ctx context.Context, query string) (int64, error) {
if s.conn == nil {
return 0, fmt.Errorf("连接未打开")