From fb70f1420ca17ab96de764f658ba65cdcb959675 Mon Sep 17 00:00:00 2001 From: Syngnat Date: Wed, 18 Mar 2026 15:33:37 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(sql-file):=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=A4=A7=20SQL=20=E6=96=87=E4=BB=B6=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E6=89=A7=E8=A1=8C=EF=BC=8C=E8=A7=A3=E5=86=B3?= =?UTF-8?q?=20WebView2=20=E5=B4=A9=E6=BA=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增流式 SQL 拆分器 sql_split_stream.go(逐行状态机) - OpenSQLFile 超过 50MB 返回文件路径而非内容 - 新增 ExecuteSQLFile 后端流式读取+拆分+逐条执行+事件推送进度 - 新增 CancelSQLFileExecution 支持中途取消 - 前端增加 SQL 文件执行进度 Modal(进度条/计数/取消/结果展示) - refs #238 --- frontend/src/components/Sidebar.tsx | 154 ++++++++++++++++++-- frontend/wailsjs/go/app/App.d.ts | 4 + frontend/wailsjs/go/app/App.js | 8 ++ internal/app/methods_file.go | 200 ++++++++++++++++++++++++++ internal/app/sql_split_stream.go | 209 ++++++++++++++++++++++++++++ 5 files changed, 567 insertions(+), 8 deletions(-) create mode 100644 internal/app/sql_split_stream.go diff --git a/frontend/src/components/Sidebar.tsx b/frontend/src/components/Sidebar.tsx index 45e8ec5..b7e7dec 100644 --- a/frontend/src/components/Sidebar.tsx +++ b/frontend/src/components/Sidebar.tsx @@ -1,5 +1,5 @@ import React, { useEffect, useState, useMemo, useRef } from 'react'; -import { Tree, message, Dropdown, MenuProps, Input, Button, Modal, Form, Badge, Checkbox, Space, Select, Popover, Tooltip } from 'antd'; +import { Tree, message, Dropdown, MenuProps, Input, Button, Modal, Form, Badge, Checkbox, Space, Select, Popover, Tooltip, Progress } from 'antd'; import { DatabaseOutlined, TableOutlined, @@ -35,7 +35,8 @@ import { Tree, message, Dropdown, MenuProps, Input, Button, Modal, Form, Badge, import { useStore } from '../store'; import { buildOverlayWorkbenchTheme } from '../utils/overlayWorkbenchTheme'; import { SavedConnection } from '../types'; - import { DBGetDatabases, DBGetTables, DBQuery, DBShowCreateTable, ExportTable, OpenSQLFile, CreateDatabase, RenameDatabase, DropDatabase, RenameTable, DropTable, DropView, DropFunction, RenameView } from '../../wailsjs/go/app/App'; + import { DBGetDatabases, DBGetTables, DBQuery, DBShowCreateTable, ExportTable, OpenSQLFile, ExecuteSQLFile, CancelSQLFileExecution, CreateDatabase, RenameDatabase, DropDatabase, RenameTable, DropTable, DropView, DropFunction, RenameView } from '../../wailsjs/go/app/App'; + import { EventsOn } from '../../wailsjs/runtime/runtime'; import { normalizeOpacityForPlatform, resolveAppearanceValues } from '../utils/appearance'; const { Search } = Input; @@ -2059,9 +2060,23 @@ const Sidebar: React.FC<{ onEditConnection?: (conn: SavedConnection) => void }> }; const handleRunSQLFile = async (node: any) => { - const res = await (window as any).go.app.App.OpenSQLFile(); + const res = await OpenSQLFile(); if (res.success) { - const sqlContent = res.data; + const data = res.data; + // 大文件:后端返回文件路径,走流式执行 + if (data && typeof data === 'object' && data.isLargeFile) { + const connId = node.type === 'connection' ? node.key : node.dataRef?.id; + const dbName = node.dataRef?.dbName || ''; + const conn = connections.find(c => c.id === connId); + if (!conn) { + message.error('未找到对应的连接配置'); + return; + } + startSQLFileExecution(conn.config, dbName, data.filePath, data.fileSizeMB); + return; + } + // 小文件:加载到编辑器 + const sqlContent = data; const { dbName, id } = node.dataRef; addTab({ id: `query-${Date.now()}`, @@ -2071,8 +2086,8 @@ const Sidebar: React.FC<{ onEditConnection?: (conn: SavedConnection) => void }> dbName: dbName, query: sqlContent }); - } else if (res.message !== "已取消") { - message.error("读取文件失败: " + res.message); + } else if (res.message !== '已取消') { + message.error('读取文件失败: ' + res.message); } }; @@ -2082,21 +2097,90 @@ const Sidebar: React.FC<{ onEditConnection?: (conn: SavedConnection) => void }> message.warning('请先选择一个连接或数据库'); return; } - const res = await (window as any).go.app.App.OpenSQLFile(); + const res = await OpenSQLFile(); if (res.success) { + const data = res.data; + // 大文件:后端流式执行 + if (data && typeof data === 'object' && data.isLargeFile) { + const conn = connections.find(c => c.id === ctx.connectionId); + if (!conn) { + message.error('未找到对应的连接配置'); + return; + } + startSQLFileExecution(conn.config, ctx.dbName || '', data.filePath, data.fileSizeMB); + return; + } + // 小文件 addTab({ id: `query-${Date.now()}`, title: `运行外部SQL文件`, type: 'query', connectionId: ctx.connectionId, dbName: ctx.dbName || undefined, - query: res.data + query: data }); } else if (res.message !== '已取消') { message.error('读取文件失败: ' + res.message); } }; + // SQL 文件流式执行状态 + const [sqlFileExecState, setSqlFileExecState] = useState<{ + open: boolean; + jobId: string; + fileSizeMB: string; + status: 'running' | 'done' | 'cancelled' | 'error'; + executed: number; + failed: number; + total: number; + percent: number; + currentSQL: string; + resultMessage: string; + }>({ + open: false, jobId: '', fileSizeMB: '', status: 'running', + executed: 0, failed: 0, total: 0, percent: 0, currentSQL: '', resultMessage: '' + }); + + const startSQLFileExecution = (config: any, dbName: string, filePath: string, fileSizeMB: string) => { + const jobId = `sqlfile-${Date.now()}`; + setSqlFileExecState({ + open: true, jobId, fileSizeMB, status: 'running', + executed: 0, failed: 0, total: 0, percent: 0, currentSQL: '', resultMessage: '' + }); + + // 监听进度事件 + const offProgress = EventsOn('sqlfile:progress', (event: any) => { + if (!event || event.jobId !== jobId) return; + setSqlFileExecState(prev => ({ + ...prev, + status: event.status || prev.status, + executed: typeof event.executed === 'number' ? event.executed : prev.executed, + failed: typeof event.failed === 'number' ? event.failed : prev.failed, + total: typeof event.total === 'number' ? event.total : prev.total, + percent: typeof event.percent === 'number' ? Math.min(100, event.percent) : prev.percent, + currentSQL: typeof event.currentSQL === 'string' ? event.currentSQL : prev.currentSQL, + })); + }); + + // 异步执行 + ExecuteSQLFile(config, dbName, filePath, jobId).then(res => { + offProgress(); + setSqlFileExecState(prev => ({ + ...prev, + status: res.success ? 'done' : (prev.status === 'cancelled' ? 'cancelled' : 'error'), + percent: 100, + resultMessage: res.message || '', + })); + }).catch(err => { + offProgress(); + setSqlFileExecState(prev => ({ + ...prev, + status: 'error', + resultMessage: String(err?.message || err), + })); + }); + }; + const handleCreateDatabase = async () => { try { const values = await createDbForm.validateFields(); @@ -4174,6 +4258,60 @@ const Sidebar: React.FC<{ onEditConnection?: (conn: SavedConnection) => void }> )} + + {/* SQL 文件流式执行进度 Modal */} + { + CancelSQLFileExecution(sqlFileExecState.jobId); + setSqlFileExecState(prev => ({ ...prev, status: 'cancelled' })); + }}> + 取消执行 + + ] : [ + + ]} + onCancel={() => { + if (sqlFileExecState.status !== 'running') { + setSqlFileExecState(prev => ({ ...prev, open: false })); + } + }} + styles={{ content: modalPanelStyle, header: { background: 'transparent', borderBottom: 'none' }, body: { paddingTop: 8 }, footer: { background: 'transparent', borderTop: 'none' } }} + > +
+ +
+
+
文件大小:{sqlFileExecState.fileSizeMB} MB
+
状态:{ + sqlFileExecState.status === 'running' ? '执行中...' : + sqlFileExecState.status === 'done' ? '✅ 完成' : + sqlFileExecState.status === 'cancelled' ? '⚠️ 已取消' : '❌ 出错' + }
+
已执行:{sqlFileExecState.executed} 条 | 失败: 0 ? '#ff4d4f' : undefined }}>{sqlFileExecState.failed}
+
+ {sqlFileExecState.currentSQL && sqlFileExecState.status === 'running' && ( +
+ {sqlFileExecState.currentSQL} +
+ )} + {sqlFileExecState.resultMessage && sqlFileExecState.status !== 'running' && ( +
+ {sqlFileExecState.resultMessage} +
+ )} +
); }; diff --git a/frontend/wailsjs/go/app/App.d.ts b/frontend/wailsjs/go/app/App.d.ts index ce28585..5b288e4 100755 --- a/frontend/wailsjs/go/app/App.d.ts +++ b/frontend/wailsjs/go/app/App.d.ts @@ -9,6 +9,8 @@ export function ApplyChanges(arg1:connection.ConnectionConfig,arg2:string,arg3:s export function CancelQuery(arg1:string):Promise; +export function CancelSQLFileExecution(arg1:string):Promise; + export function CheckDriverNetworkStatus():Promise; export function CheckForUpdates():Promise; @@ -65,6 +67,8 @@ export function DropTable(arg1:connection.ConnectionConfig,arg2:string,arg3:stri export function DropView(arg1:connection.ConnectionConfig,arg2:string,arg3:string):Promise; +export function ExecuteSQLFile(arg1:connection.ConnectionConfig,arg2:string,arg3:string,arg4:string):Promise; + export function ExportData(arg1:Array>,arg2:Array,arg3:string,arg4:string):Promise; export function ExportDatabaseSQL(arg1:connection.ConnectionConfig,arg2:string,arg3:boolean):Promise; diff --git a/frontend/wailsjs/go/app/App.js b/frontend/wailsjs/go/app/App.js index 7a5f409..dfee01f 100755 --- a/frontend/wailsjs/go/app/App.js +++ b/frontend/wailsjs/go/app/App.js @@ -10,6 +10,10 @@ export function CancelQuery(arg1) { return window['go']['app']['App']['CancelQuery'](arg1); } +export function CancelSQLFileExecution(arg1) { + return window['go']['app']['App']['CancelSQLFileExecution'](arg1); +} + export function CheckDriverNetworkStatus() { return window['go']['app']['App']['CheckDriverNetworkStatus'](); } @@ -122,6 +126,10 @@ export function DropView(arg1, arg2, arg3) { return window['go']['app']['App']['DropView'](arg1, arg2, arg3); } +export function ExecuteSQLFile(arg1, arg2, arg3, arg4) { + return window['go']['app']['App']['ExecuteSQLFile'](arg1, arg2, arg3, arg4); +} + export function ExportData(arg1, arg2, arg3, arg4) { return window['go']['app']['App']['ExportData'](arg1, arg2, arg3, arg4); } diff --git a/internal/app/methods_file.go b/internal/app/methods_file.go index 3e2c581..e484091 100644 --- a/internal/app/methods_file.go +++ b/internal/app/methods_file.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "html" + "io" "math" "os" "path/filepath" @@ -51,6 +52,27 @@ func (a *App) OpenSQLFile() connection.QueryResult { return connection.QueryResult{Success: false, Message: "已取消"} } + // 检查文件大小 + const maxSQLFileSize int64 = 50 * 1024 * 1024 // 50MB + fi, err := os.Stat(selection) + if err != nil { + return connection.QueryResult{Success: false, Message: fmt.Sprintf("无法读取文件信息: %v", err)} + } + + // 大文件:只返回文件路径和大小,不读取内容 + if fi.Size() > maxSQLFileSize { + sizeMB := float64(fi.Size()) / (1024 * 1024) + return connection.QueryResult{ + Success: true, + Data: map[string]interface{}{ + "isLargeFile": true, + "filePath": selection, + "fileSize": fi.Size(), + "fileSizeMB": fmt.Sprintf("%.1f", sizeMB), + }, + } + } + content, err := os.ReadFile(selection) if err != nil { return connection.QueryResult{Success: false, Message: err.Error()} @@ -59,6 +81,184 @@ func (a *App) OpenSQLFile() connection.QueryResult { return connection.QueryResult{Success: true, Data: string(content)} } +// ExecuteSQLFile 在后端流式读取并执行大 SQL 文件,通过事件推送进度。 +// 前端通过 EventsOn("sqlfile:progress", ...) 监听进度。 +func (a *App) ExecuteSQLFile(config connection.ConnectionConfig, dbName string, filePath string, jobID string) connection.QueryResult { + if strings.TrimSpace(filePath) == "" { + return connection.QueryResult{Success: false, Message: "文件路径为空"} + } + if strings.TrimSpace(jobID) == "" { + jobID = fmt.Sprintf("sqlfile-%d", time.Now().UnixMilli()) + } + + logger.Warnf("ExecuteSQLFile 开始:file=%s db=%s jobID=%s", filePath, dbName, jobID) + + // 获取数据库连接 + runConfig := normalizeRunConfig(config, dbName) + dbInst, err := a.getDatabase(runConfig) + if err != nil { + logger.Error(err, "ExecuteSQLFile 获取连接失败:%s", formatConnSummary(runConfig)) + return connection.QueryResult{Success: false, Message: err.Error()} + } + + // 打开文件 + f, err := os.Open(filePath) + if err != nil { + return connection.QueryResult{Success: false, Message: fmt.Sprintf("无法打开文件: %v", err)} + } + defer f.Close() + + // 获取文件大小用于计算进度 + fi, _ := f.Stat() + totalSize := fi.Size() + + // 设置取消上下文 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a.queryMu.Lock() + a.runningQueries[jobID] = queryContext{ + cancel: cancel, + started: time.Now(), + } + a.queryMu.Unlock() + defer func() { + a.queryMu.Lock() + delete(a.runningQueries, jobID) + a.queryMu.Unlock() + }() + + // 发送进度事件的辅助函数 + emitProgress := func(status string, executed, failed, total int, bytesRead int64, currentSQL string, errMsg string) { + percent := 0.0 + if totalSize > 0 { + percent = float64(bytesRead) / float64(totalSize) * 100 + if percent > 100 { + percent = 100 + } + } + runtime.EventsEmit(a.ctx, "sqlfile:progress", map[string]interface{}{ + "jobId": jobID, + "status": status, + "executed": executed, + "failed": failed, + "total": total, + "percent": percent, + "bytesRead": bytesRead, + "totalBytes": totalSize, + "currentSQL": currentSQL, + "error": errMsg, + }) + } + + emitProgress("running", 0, 0, 0, 0, "", "") + + // 使用 countingReader 追踪已读取字节数 + cr := &countingReader{r: f} + + var executedCount int + var failedCount int + var errorLogs []string + startTime := time.Now() + + _, streamErr := streamSQLFile(cr, func(index int, stmt string) error { + // 检查是否已取消 + select { + case <-ctx.Done(): + return fmt.Errorf("已取消") + default: + } + + // 执行语句 + _, execErr := dbInst.Exec(stmt) + if execErr != nil { + failedCount++ + snippet := stmt + if len(snippet) > 200 { + snippet = snippet[:200] + "..." + } + errLog := fmt.Sprintf("第 %d 条语句执行失败: %v\n SQL: %s", index+1, execErr, snippet) + errorLogs = append(errorLogs, errLog) + logger.Warnf("ExecuteSQLFile %s", errLog) + } else { + executedCount++ + } + + // 每条语句执行后推送进度(但限频:每 100 条或每秒推一次) + total := executedCount + failedCount + if total%100 == 0 || total <= 10 { + snippet := stmt + if len(snippet) > 100 { + snippet = snippet[:100] + "..." + } + emitProgress("running", executedCount, failedCount, total, cr.n, snippet, "") + } + + return nil + }) + + duration := time.Since(startTime) + + if streamErr != nil && streamErr.Error() == "已取消" { + emitProgress("cancelled", executedCount, failedCount, executedCount+failedCount, cr.n, "", "用户取消执行") + logger.Warnf("ExecuteSQLFile 已取消:executed=%d failed=%d duration=%v", executedCount, failedCount, duration) + return connection.QueryResult{ + Success: false, + Message: fmt.Sprintf("执行已取消。已执行 %d 条,失败 %d 条,耗时 %v。", executedCount, failedCount, duration.Round(time.Millisecond)), + } + } + + if streamErr != nil { + emitProgress("error", executedCount, failedCount, executedCount+failedCount, cr.n, "", streamErr.Error()) + return connection.QueryResult{ + Success: false, + Message: fmt.Sprintf("文件读取错误: %v。已执行 %d 条。", streamErr, executedCount), + } + } + + emitProgress("done", executedCount, failedCount, executedCount+failedCount, totalSize, "", "") + + summary := fmt.Sprintf("执行完成。成功 %d 条,失败 %d 条,耗时 %v。", executedCount, failedCount, duration.Round(time.Millisecond)) + if len(errorLogs) > 0 { + maxShow := 20 + if len(errorLogs) < maxShow { + maxShow = len(errorLogs) + } + summary += "\n\n错误详情(前 " + fmt.Sprintf("%d", maxShow) + " 条):\n" + strings.Join(errorLogs[:maxShow], "\n") + if len(errorLogs) > maxShow { + summary += fmt.Sprintf("\n...还有 %d 条错误未显示", len(errorLogs)-maxShow) + } + } + + logger.Warnf("ExecuteSQLFile 完成:executed=%d failed=%d duration=%v", executedCount, failedCount, duration) + return connection.QueryResult{Success: failedCount == 0, Message: summary} +} + +// CancelSQLFileExecution 取消正在执行的 SQL 文件任务。 +func (a *App) CancelSQLFileExecution(jobID string) connection.QueryResult { + a.queryMu.Lock() + defer a.queryMu.Unlock() + + if ctx, exists := a.runningQueries[jobID]; exists { + ctx.cancel() + delete(a.runningQueries, jobID) + return connection.QueryResult{Success: true, Message: "已发送取消请求"} + } + return connection.QueryResult{Success: false, Message: "未找到该任务"} +} + +// countingReader 包装 io.Reader,追踪已读取的字节数。 +type countingReader struct { + r io.Reader + n int64 +} + +func (cr *countingReader) Read(p []byte) (int, error) { + n, err := cr.r.Read(p) + cr.n += int64(n) + return n, err +} + func (a *App) ImportConfigFile() connection.QueryResult { selection, err := runtime.OpenFileDialog(a.ctx, runtime.OpenDialogOptions{ Title: "Select Config File", diff --git a/internal/app/sql_split_stream.go b/internal/app/sql_split_stream.go new file mode 100644 index 0000000..69a206b --- /dev/null +++ b/internal/app/sql_split_stream.go @@ -0,0 +1,209 @@ +package app + +import ( + "bufio" + "io" + "strings" +) + +// sqlStreamSplitter 是一个流式 SQL 语句拆分器,适用于处理大文件。 +// 调用方通过 Feed(chunk) 逐块喂入数据,通过 Flush() 获取最后一条残余语句。 +// 内部维护与 splitSQLStatements 完全一致的状态机逻辑。 +type sqlStreamSplitter struct { + cur strings.Builder + inSingle bool + inDouble bool + inBacktick bool + escaped bool + inLineComment bool + inBlockComment bool + dollarTag string +} + +// Feed 将一个 chunk 喂入拆分器,返回在此 chunk 中完成的 SQL 语句列表。 +func (s *sqlStreamSplitter) Feed(chunk []byte) []string { + var statements []string + text := string(chunk) + + for i := 0; i < len(text); i++ { + ch := text[i] + next := byte(0) + if i+1 < len(text) { + next = text[i+1] + } + + // 行注释 + if s.inLineComment { + if ch == '\n' { + s.inLineComment = false + } + s.cur.WriteByte(ch) + continue + } + + // 块注释 + if s.inBlockComment { + s.cur.WriteByte(ch) + if ch == '*' && next == '/' { + s.cur.WriteByte('/') + i++ + s.inBlockComment = false + } + continue + } + + // Dollar-quoting + if s.dollarTag != "" { + if strings.HasPrefix(text[i:], s.dollarTag) { + s.cur.WriteString(s.dollarTag) + i += len(s.dollarTag) - 1 + s.dollarTag = "" + } else { + s.cur.WriteByte(ch) + } + continue + } + + // 转义字符 + if s.escaped { + s.escaped = false + s.cur.WriteByte(ch) + continue + } + if (s.inSingle || s.inDouble) && ch == '\\' { + s.escaped = true + s.cur.WriteByte(ch) + continue + } + + // 字符串开闭 + if !s.inDouble && !s.inBacktick && ch == '\'' { + if s.inSingle && next == '\'' { + // SQL 标准转义:两个连续单引号 + s.cur.WriteByte(ch) + s.cur.WriteByte(next) + i++ + continue + } + s.inSingle = !s.inSingle + s.cur.WriteByte(ch) + continue + } + if !s.inSingle && !s.inBacktick && ch == '"' { + s.inDouble = !s.inDouble + s.cur.WriteByte(ch) + continue + } + if !s.inSingle && !s.inDouble && ch == '`' { + s.inBacktick = !s.inBacktick + s.cur.WriteByte(ch) + continue + } + + // 在引号/反引号内部不做任何判断 + if s.inSingle || s.inDouble || s.inBacktick { + s.cur.WriteByte(ch) + continue + } + + // 行注释开始 + if ch == '-' && next == '-' { + s.inLineComment = true + s.cur.WriteByte(ch) + continue + } + if ch == '#' { + s.inLineComment = true + s.cur.WriteByte(ch) + continue + } + + // 块注释开始 + if ch == '/' && next == '*' { + s.inBlockComment = true + s.cur.WriteString("/*") + i++ + continue + } + + // Dollar-quoting 开始 + if ch == '$' { + if tag := parseSQLDollarTag(text[i:]); tag != "" { + s.dollarTag = tag + s.cur.WriteString(tag) + i += len(tag) - 1 + continue + } + } + + // 分号分隔 + if ch == ';' { + stmt := strings.TrimSpace(s.cur.String()) + if stmt != "" { + statements = append(statements, stmt) + } + s.cur.Reset() + continue + } + // 全角分号 + if ch == 0xEF && i+2 < len(text) && text[i+1] == 0xBC && text[i+2] == 0x9B { + stmt := strings.TrimSpace(s.cur.String()) + if stmt != "" { + statements = append(statements, stmt) + } + s.cur.Reset() + i += 2 + continue + } + + s.cur.WriteByte(ch) + } + + return statements +} + +// Flush 返回缓冲区中剩余的不完整语句(文件结束时调用)。 +func (s *sqlStreamSplitter) Flush() string { + stmt := strings.TrimSpace(s.cur.String()) + s.cur.Reset() + return stmt +} + +// streamSQLFile 从 reader 中流式读取 SQL 并逐条回调。 +// onStatement 返回 error 时停止读取并返回该 error。 +// 返回总处理语句数和可能的错误。 +func streamSQLFile(reader io.Reader, onStatement func(index int, stmt string) error) (int, error) { + splitter := &sqlStreamSplitter{} + scanner := bufio.NewScanner(reader) + // 设置最大 token 为 4MB,处理超长单行 + const maxLineSize = 4 * 1024 * 1024 + scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize) + + count := 0 + for scanner.Scan() { + line := scanner.Bytes() + // 保持换行符,因为行注释依赖 \n 来结束 + lineWithNewline := append(line, '\n') + stmts := splitter.Feed(lineWithNewline) + for _, stmt := range stmts { + if err := onStatement(count, stmt); err != nil { + return count, err + } + count++ + } + } + + if err := scanner.Err(); err != nil { + return count, err + } + + // 处理文件末尾不以分号结尾的最后一条语句 + if last := splitter.Flush(); last != "" { + if err := onStatement(count, last); err != nil { + return count, err + } + count++ + } + + return count, nil +}