diff --git a/internal/app/methods_db.go b/internal/app/methods_db.go index 14a8bb1..59c6d1c 100644 --- a/internal/app/methods_db.go +++ b/internal/app/methods_db.go @@ -581,6 +581,48 @@ func (a *App) DBQueryMulti(config connection.ConnectionConfig, dbName string, qu } } + // 全部为写操作且驱动支持批量 Exec → 一次性发送,大幅减少网络往返 + // 适用于 MySQL/MariaDB/Doris/PostgreSQL/SQLite/DuckDB 等支持多语句 Exec 的驱动 + if !allReadOnly { + allWrite := true + for _, stmt := range statements { + if strings.TrimSpace(stmt) != "" && isReadOnlySQLQuery(runConfig.Type, stmt) { + allWrite = false + break + } + } + if allWrite { + if batcher, ok := dbInst.(db.BatchWriteExecer); ok { + affected, batchErr := batcher.ExecBatchContext(ctx, query) + if batchErr != nil && shouldRefreshCachedConnection(batchErr) { + if a.invalidateCachedDatabase(runConfig, batchErr) { + retryInst, retryErr := a.getDatabaseForcePing(runConfig) + if retryErr != nil { + logger.Error(retryErr, "DBQueryMulti 批量写重建连接失败:%s", formatConnSummary(runConfig)) + return connection.QueryResult{Success: false, Message: retryErr.Error(), QueryID: queryID} + } + if retryBatcher, ok2 := retryInst.(db.BatchWriteExecer); ok2 { + affected, batchErr = retryBatcher.ExecBatchContext(ctx, query) + } + } + } + if batchErr != nil { + logger.Error(batchErr, "DBQueryMulti 批量写执行失败:%s SQL片段=%q", formatConnSummary(runConfig), sqlSnippet(query)) + return connection.QueryResult{Success: false, Message: batchErr.Error(), QueryID: queryID} + } + logger.Infof("DBQueryMulti 批量写执行成功:%s 语句数=%d affectedRows=%d", formatConnSummary(runConfig), len(statements), affected) + return connection.QueryResult{ + Success: true, + Data: []connection.ResultSetData{{ + Rows: []map[string]interface{}{{"affectedRows": affected}}, + Columns: []string{"affectedRows"}, + }}, + QueryID: queryID, + } + } + } + } + var resultSets []connection.ResultSetData for idx, stmt := range statements { stmt = strings.TrimSpace(stmt) diff --git a/internal/db/database.go b/internal/db/database.go index 038a48a..4e6f228 100644 --- a/internal/db/database.go +++ b/internal/db/database.go @@ -50,6 +50,13 @@ type MultiResultQuerierContext interface { QueryMultiContext(ctx context.Context, query string) ([]connection.ResultSetData, error) } +// BatchWriteExecer 是可选接口,支持将多条写语句一次性批量发送执行。 +// 驱动的底层连接需支持多语句协议(如 MySQL multiStatements=true、PostgreSQL 原生多语句)。 +// 实现此接口可大幅减少批量 INSERT/UPDATE/DELETE 的网络往返次数。 +type BatchWriteExecer interface { + ExecBatchContext(ctx context.Context, query string) (int64, error) +} + // BatchApplier 定义了批量变更提交接口。 // 支持批量编辑的驱动实现此接口,用于一次性提交前端 DataGrid 中的增删改操作。 type BatchApplier interface { diff --git a/internal/db/duckdb_impl.go b/internal/db/duckdb_impl.go index 843b49b..eeefdf2 100644 --- a/internal/db/duckdb_impl.go +++ b/internal/db/duckdb_impl.go @@ -90,6 +90,17 @@ func (d *DuckDB) Query(query string) ([]map[string]interface{}, []string, error) return scanRows(rows) } +func (d *DuckDB) ExecBatchContext(ctx context.Context, query string) (int64, error) { + if d.conn == nil { + return 0, fmt.Errorf("连接未打开") + } + res, err := d.conn.ExecContext(ctx, query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + func (d *DuckDB) ExecContext(ctx context.Context, query string) (int64, error) { if d.conn == nil { return 0, fmt.Errorf("连接未打开") diff --git a/internal/db/mariadb_impl.go b/internal/db/mariadb_impl.go index b21b5ed..c13e83b 100644 --- a/internal/db/mariadb_impl.go +++ b/internal/db/mariadb_impl.go @@ -135,6 +135,17 @@ func (m *MariaDB) Query(query string) ([]map[string]interface{}, []string, error return scanRows(rows) } +func (m *MariaDB) ExecBatchContext(ctx context.Context, query string) (int64, error) { + if m.conn == nil { + return 0, fmt.Errorf("连接未打开") + } + res, err := m.conn.ExecContext(ctx, query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + func (m *MariaDB) ExecContext(ctx context.Context, query string) (int64, error) { if m.conn == nil { return 0, fmt.Errorf("连接未打开") diff --git a/internal/db/mysql_impl.go b/internal/db/mysql_impl.go index baca324..d0459f9 100644 --- a/internal/db/mysql_impl.go +++ b/internal/db/mysql_impl.go @@ -329,6 +329,17 @@ func (m *MySQLDB) Query(query string) ([]map[string]interface{}, []string, error return scanRows(rows) } +func (m *MySQLDB) ExecBatchContext(ctx context.Context, query string) (int64, error) { + if m.conn == nil { + return 0, fmt.Errorf("连接未打开") + } + res, err := m.conn.ExecContext(ctx, query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + func (m *MySQLDB) ExecContext(ctx context.Context, query string) (int64, error) { if m.conn == nil { return 0, fmt.Errorf("连接未打开") diff --git a/internal/db/postgres_impl.go b/internal/db/postgres_impl.go index dc6a06b..727b3dc 100644 --- a/internal/db/postgres_impl.go +++ b/internal/db/postgres_impl.go @@ -233,6 +233,17 @@ func (p *PostgresDB) Query(query string) ([]map[string]interface{}, []string, er return scanRows(rows) } +func (p *PostgresDB) ExecBatchContext(ctx context.Context, query string) (int64, error) { + if p.conn == nil { + return 0, fmt.Errorf("连接未打开") + } + res, err := p.conn.ExecContext(ctx, query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + func (p *PostgresDB) ExecContext(ctx context.Context, query string) (int64, error) { if p.conn == nil { return 0, fmt.Errorf("连接未打开") diff --git a/internal/db/sqlite_impl.go b/internal/db/sqlite_impl.go index 4f76866..7d74610 100644 --- a/internal/db/sqlite_impl.go +++ b/internal/db/sqlite_impl.go @@ -222,6 +222,17 @@ func (s *SQLiteDB) Query(query string) ([]map[string]interface{}, []string, erro return scanRows(rows) } +func (s *SQLiteDB) ExecBatchContext(ctx context.Context, query string) (int64, error) { + if s.conn == nil { + return 0, fmt.Errorf("连接未打开") + } + res, err := s.conn.ExecContext(ctx, query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + func (s *SQLiteDB) ExecContext(ctx context.Context, query string) (int64, error) { if s.conn == nil { return 0, fmt.Errorf("连接未打开")