mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-06-05 14:09:36 +08:00
* feat: SQL执行中时,增加取消执行功能 (#172)
Co-authored-by: liujie <469282686@qq.com>
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
"GoNavi-Wails/internal/db"
|
||||
"GoNavi-Wails/internal/logger"
|
||||
proxytunnel "GoNavi-Wails/internal/proxy"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const dbCachePingInterval = 30 * time.Second
|
||||
@@ -25,19 +26,27 @@ type cachedDatabase struct {
|
||||
lastPing time.Time
|
||||
}
|
||||
|
||||
type queryContext struct {
|
||||
cancel context.CancelFunc
|
||||
started time.Time
|
||||
}
|
||||
|
||||
// App struct
|
||||
type App struct {
|
||||
ctx context.Context
|
||||
dbCache map[string]cachedDatabase // Cache for DB connections
|
||||
mu sync.RWMutex // Mutex for cache access
|
||||
updateMu sync.Mutex
|
||||
updateState updateState
|
||||
ctx context.Context
|
||||
dbCache map[string]cachedDatabase // Cache for DB connections
|
||||
mu sync.RWMutex // Mutex for cache access
|
||||
updateMu sync.Mutex
|
||||
updateState updateState
|
||||
queryMu sync.RWMutex
|
||||
runningQueries map[string]queryContext // queryID -> cancelFunc and start time
|
||||
}
|
||||
|
||||
// NewApp creates a new App application struct
|
||||
func NewApp() *App {
|
||||
return &App{
|
||||
dbCache: make(map[string]cachedDatabase),
|
||||
dbCache: make(map[string]cachedDatabase),
|
||||
runningQueries: make(map[string]queryContext),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -408,3 +417,43 @@ func (a *App) getDatabaseWithPing(config connection.ConnectionConfig, forcePing
|
||||
logger.Infof("数据库连接成功并写入缓存:%s 缓存Key=%s", formatConnSummary(effectiveConfig), shortKey)
|
||||
return dbInst, nil
|
||||
}
|
||||
|
||||
// generateQueryID generates a unique ID for a query using UUID v4
|
||||
func generateQueryID() string {
|
||||
return "query-" + uuid.New().String()
|
||||
}
|
||||
|
||||
// CancelQuery cancels a running query by its ID
|
||||
func (a *App) CancelQuery(queryID string) connection.QueryResult {
|
||||
a.queryMu.Lock()
|
||||
defer a.queryMu.Unlock()
|
||||
|
||||
if ctx, exists := a.runningQueries[queryID]; exists {
|
||||
ctx.cancel()
|
||||
delete(a.runningQueries, queryID)
|
||||
logger.Infof("查询已取消:queryID=%s", queryID)
|
||||
return connection.QueryResult{Success: true, Message: "查询已取消"}
|
||||
}
|
||||
logger.Warnf("取消查询失败:queryID=%s 不存在或已完成", queryID)
|
||||
return connection.QueryResult{Success: false, Message: "查询不存在或已完成"}
|
||||
}
|
||||
|
||||
// CleanupStaleQueries removes queries older than maxAge
|
||||
func (a *App) CleanupStaleQueries(maxAge time.Duration) {
|
||||
a.queryMu.Lock()
|
||||
defer a.queryMu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for id, ctx := range a.runningQueries {
|
||||
if now.Sub(ctx.started) > maxAge {
|
||||
// Query likely finished or stuck, remove from tracking
|
||||
delete(a.runningQueries, id)
|
||||
// Query expired, silently remove
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateQueryID generates a unique query ID for cancellation tracking
|
||||
func (a *App) GenerateQueryID() string {
|
||||
return generateQueryID()
|
||||
}
|
||||
|
||||
@@ -376,12 +376,21 @@ func (a *App) MySQLShowCreateTable(config connection.ConnectionConfig, dbName st
|
||||
}
|
||||
|
||||
func (a *App) DBQuery(config connection.ConnectionConfig, dbName string, query string) connection.QueryResult {
|
||||
return a.DBQueryWithCancel(config, dbName, query, "")
|
||||
}
|
||||
|
||||
func (a *App) DBQueryWithCancel(config connection.ConnectionConfig, dbName string, query string, queryID string) connection.QueryResult {
|
||||
runConfig := normalizeRunConfig(config, dbName)
|
||||
|
||||
// Generate query ID if not provided
|
||||
if queryID == "" {
|
||||
queryID = generateQueryID()
|
||||
}
|
||||
|
||||
dbInst, err := a.getDatabase(runConfig)
|
||||
if err != nil {
|
||||
logger.Error(err, "DBQuery 获取连接失败:%s", formatConnSummary(runConfig))
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
return connection.QueryResult{Success: false, Message: err.Error(), QueryID: queryID}
|
||||
}
|
||||
|
||||
query = sanitizeSQLForPgLike(runConfig.Type, query)
|
||||
@@ -392,6 +401,21 @@ func (a *App) DBQuery(config connection.ConnectionConfig, dbName string, query s
|
||||
ctx, cancel := utils.ContextWithTimeout(time.Duration(timeoutSeconds) * time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Store cancel function for potential manual cancellation
|
||||
a.queryMu.Lock()
|
||||
a.runningQueries[queryID] = queryContext{
|
||||
cancel: cancel,
|
||||
started: time.Now(),
|
||||
}
|
||||
a.queryMu.Unlock()
|
||||
|
||||
// Ensure query is removed from tracking when done
|
||||
defer func() {
|
||||
a.queryMu.Lock()
|
||||
delete(a.runningQueries, queryID)
|
||||
a.queryMu.Unlock()
|
||||
}()
|
||||
|
||||
lowerQuery := strings.TrimSpace(strings.ToLower(query))
|
||||
isReadQuery := strings.HasPrefix(lowerQuery, "select") || strings.HasPrefix(lowerQuery, "show") || strings.HasPrefix(lowerQuery, "describe") || strings.HasPrefix(lowerQuery, "explain")
|
||||
// MongoDB JSON 命令中的 find/count/aggregate 也属于读查询
|
||||
@@ -410,9 +434,9 @@ func (a *App) DBQuery(config connection.ConnectionConfig, dbName string, query s
|
||||
}
|
||||
if err != nil {
|
||||
logger.Error(err, "DBQuery 查询失败:%s SQL片段=%q", formatConnSummary(runConfig), sqlSnippet(query))
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
return connection.QueryResult{Success: false, Message: err.Error(), QueryID: queryID}
|
||||
}
|
||||
return connection.QueryResult{Success: true, Data: data, Fields: columns}
|
||||
return connection.QueryResult{Success: true, Data: data, Fields: columns, QueryID: queryID}
|
||||
} else {
|
||||
var affected int64
|
||||
if e, ok := dbInst.(interface {
|
||||
@@ -424,9 +448,9 @@ func (a *App) DBQuery(config connection.ConnectionConfig, dbName string, query s
|
||||
}
|
||||
if err != nil {
|
||||
logger.Error(err, "DBQuery 执行失败:%s SQL片段=%q", formatConnSummary(runConfig), sqlSnippet(query))
|
||||
return connection.QueryResult{Success: false, Message: err.Error()}
|
||||
return connection.QueryResult{Success: false, Message: err.Error(), QueryID: queryID}
|
||||
}
|
||||
return connection.QueryResult{Success: true, Data: map[string]int64{"affectedRows": affected}}
|
||||
return connection.QueryResult{Success: true, Data: map[string]int64{"affectedRows": affected}, QueryID: queryID}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
149
internal/app/methods_db_cancel_test.go
Normal file
149
internal/app/methods_db_cancel_test.go
Normal file
@@ -0,0 +1,149 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"GoNavi-Wails/internal/connection"
|
||||
)
|
||||
|
||||
func TestGenerateQueryID(t *testing.T) {
|
||||
app := NewApp()
|
||||
id := app.GenerateQueryID()
|
||||
if id == "" {
|
||||
t.Fatal("GenerateQueryID returned empty string")
|
||||
}
|
||||
// Should start with "query-"
|
||||
if !strings.HasPrefix(id, "query-") {
|
||||
t.Fatalf("Expected query ID to start with 'query-', got: %s", id)
|
||||
}
|
||||
// Should be reasonably unique (not equal to another generated ID)
|
||||
id2 := app.GenerateQueryID()
|
||||
if id == id2 {
|
||||
t.Fatal("Two consecutive GenerateQueryID calls returned identical IDs")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancelQuery_NonExistent(t *testing.T) {
|
||||
app := NewApp()
|
||||
res := app.CancelQuery("non-existent-query-id")
|
||||
if res.Success {
|
||||
t.Fatal("CancelQuery should fail for non-existent query ID")
|
||||
}
|
||||
if !strings.Contains(res.Message, "不存在") && !strings.Contains(res.Message, "not exist") {
|
||||
t.Fatalf("Expected error message about query not existing, got: %s", res.Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancelQuery_ValidQuery(t *testing.T) {
|
||||
app := NewApp()
|
||||
|
||||
// First, generate a query ID and simulate a running query
|
||||
queryID := app.GenerateQueryID()
|
||||
|
||||
// Store a cancel function in runningQueries map
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
app.queryMu.Lock()
|
||||
app.runningQueries[queryID] = queryContext{
|
||||
cancel: cancel,
|
||||
started: time.Now(),
|
||||
}
|
||||
app.queryMu.Unlock()
|
||||
|
||||
// Ensure cleanup after test
|
||||
defer func() {
|
||||
app.queryMu.Lock()
|
||||
delete(app.runningQueries, queryID)
|
||||
app.queryMu.Unlock()
|
||||
}()
|
||||
|
||||
// Cancel the query
|
||||
res := app.CancelQuery(queryID)
|
||||
if !res.Success {
|
||||
t.Fatalf("CancelQuery should succeed for valid query ID, got: %s", res.Message)
|
||||
}
|
||||
|
||||
// Verify query removed from map
|
||||
app.queryMu.Lock()
|
||||
_, exists := app.runningQueries[queryID]
|
||||
app.queryMu.Unlock()
|
||||
if exists {
|
||||
t.Fatal("Query should be removed from runningQueries after cancellation")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupStaleQueries(t *testing.T) {
|
||||
app := NewApp()
|
||||
|
||||
// Add a stale query (started 2 hours ago)
|
||||
queryID := app.GenerateQueryID()
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
app.queryMu.Lock()
|
||||
app.runningQueries[queryID] = queryContext{
|
||||
cancel: cancel,
|
||||
started: time.Now().Add(-2 * time.Hour),
|
||||
}
|
||||
app.queryMu.Unlock()
|
||||
|
||||
// Cleanup queries older than 1 hour
|
||||
app.CleanupStaleQueries(1 * time.Hour)
|
||||
|
||||
// Verify stale query was removed
|
||||
app.queryMu.Lock()
|
||||
_, exists := app.runningQueries[queryID]
|
||||
app.queryMu.Unlock()
|
||||
if exists {
|
||||
t.Fatal("Stale query should be removed by CleanupStaleQueries")
|
||||
}
|
||||
|
||||
// Add a fresh query (started 30 minutes ago)
|
||||
freshID := app.GenerateQueryID()
|
||||
_, cancel2 := context.WithCancel(context.Background())
|
||||
app.queryMu.Lock()
|
||||
app.runningQueries[freshID] = queryContext{
|
||||
cancel: cancel2,
|
||||
started: time.Now().Add(-30 * time.Minute),
|
||||
}
|
||||
app.queryMu.Unlock()
|
||||
defer cancel2()
|
||||
|
||||
// Cleanup queries older than 1 hour
|
||||
app.CleanupStaleQueries(1 * time.Hour)
|
||||
|
||||
// Verify fresh query still exists
|
||||
app.queryMu.Lock()
|
||||
_, exists = app.runningQueries[freshID]
|
||||
app.queryMu.Unlock()
|
||||
if !exists {
|
||||
t.Fatal("Fresh query should not be removed by CleanupStaleQueries")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
app.queryMu.Lock()
|
||||
delete(app.runningQueries, freshID)
|
||||
app.queryMu.Unlock()
|
||||
}
|
||||
|
||||
func TestDBQueryWithCancel_QueryIDPropagation(t *testing.T) {
|
||||
// This test verifies that query ID is properly propagated in QueryResult
|
||||
// Since we can't easily mock database connections, we'll test the integration
|
||||
// by checking that DBQueryWithCancel returns a QueryResult with QueryID field
|
||||
|
||||
app := NewApp()
|
||||
|
||||
// Create a minimal config for a database type that doesn't require actual connection
|
||||
config := connection.ConnectionConfig{
|
||||
Type: "duckdb",
|
||||
Host: ":memory:", // In-memory duckdb for testing
|
||||
}
|
||||
|
||||
// This will fail because we can't actually connect, but we can test the error path
|
||||
result := app.DBQueryWithCancel(config, "", "SELECT 1", "test-query-id")
|
||||
|
||||
// The query should fail (no actual database), but QueryID should be present
|
||||
if result.QueryID != "test-query-id" {
|
||||
t.Fatalf("Expected QueryID 'test-query-id' in result, got: %s", result.QueryID)
|
||||
}
|
||||
}
|
||||
@@ -696,11 +696,11 @@ func (a *App) CheckDriverNetworkStatus() connection.QueryResult {
|
||||
}
|
||||
|
||||
data := map[string]interface{}{
|
||||
"reachable": allReachable,
|
||||
"summary": summary,
|
||||
"recommendedProxy": !allReachable,
|
||||
"proxyConfigured": proxyConfigured,
|
||||
"proxyEnv": proxyEnv,
|
||||
"reachable": allReachable,
|
||||
"summary": summary,
|
||||
"recommendedProxy": !allReachable,
|
||||
"proxyConfigured": proxyConfigured,
|
||||
"proxyEnv": proxyEnv,
|
||||
"downloadChainReachable": downloadChainReachable,
|
||||
"downloadRequiredHosts": []string{
|
||||
"github.com",
|
||||
@@ -709,8 +709,8 @@ func (a *App) CheckDriverNetworkStatus() connection.QueryResult {
|
||||
"objects.githubusercontent.com",
|
||||
"raw.githubusercontent.com",
|
||||
},
|
||||
"checkedAt": time.Now().Format(time.RFC3339),
|
||||
"checks": checks,
|
||||
"checkedAt": time.Now().Format(time.RFC3339),
|
||||
"checks": checks,
|
||||
}
|
||||
if logPath := strings.TrimSpace(logger.Path()); logPath != "" {
|
||||
data["logPath"] = logPath
|
||||
|
||||
@@ -55,6 +55,7 @@ type QueryResult struct {
|
||||
Message string `json:"message"`
|
||||
Data interface{} `json:"data"`
|
||||
Fields []string `json:"fields,omitempty"`
|
||||
QueryID string `json:"queryId,omitempty"` // Unique ID for query cancellation
|
||||
}
|
||||
|
||||
// ColumnDefinition represents a table column
|
||||
|
||||
Reference in New Issue
Block a user