mirror of
https://github.com/Syngnat/GoNavi.git
synced 2026-05-12 12:19:47 +08:00
- 前端改用通用 DB API,避免强制走 MySQL 接口导致 PostgreSQL 等连接异常 - 后端统一各数据源 timeout(Ping 超时 + 连接参数注入) - DSN 生成兼容特殊字符密码(Postgres/Oracle/达梦/金仓) - 增加文件日志与错误链输出,连接失败提示日志路径便于排障
205 lines
6.7 KiB
Go
205 lines
6.7 KiB
Go
package sync
|
|
|
|
import (
|
|
"GoNavi-Wails/internal/connection"
|
|
"GoNavi-Wails/internal/db"
|
|
"GoNavi-Wails/internal/logger"
|
|
"fmt"
|
|
"strings"
|
|
)
|
|
|
|
// SyncConfig defines the parameters for a synchronization task
|
|
type SyncConfig struct {
|
|
SourceConfig connection.ConnectionConfig `json:"sourceConfig"`
|
|
TargetConfig connection.ConnectionConfig `json:"targetConfig"`
|
|
Tables []string `json:"tables"` // Tables to sync
|
|
Mode string `json:"mode"` // "insert_update", "full_overwrite"
|
|
}
|
|
|
|
// SyncResult holds the result of the sync operation
|
|
type SyncResult struct {
|
|
Success bool `json:"success"`
|
|
Message string `json:"message"`
|
|
Logs []string `json:"logs"`
|
|
TablesSynced int `json:"tablesSynced"`
|
|
RowsInserted int `json:"rowsInserted"`
|
|
RowsUpdated int `json:"rowsUpdated"`
|
|
RowsDeleted int `json:"rowsDeleted"`
|
|
}
|
|
|
|
type SyncEngine struct {
|
|
}
|
|
|
|
func NewSyncEngine() *SyncEngine {
|
|
return &SyncEngine{}
|
|
}
|
|
|
|
// CompareAndSync performs the synchronization
|
|
func (s *SyncEngine) RunSync(config SyncConfig) SyncResult {
|
|
result := SyncResult{Success: true, Logs: []string{}}
|
|
logger.Infof("开始数据同步:源=%s 目标=%s 表数量=%d", formatConnSummaryForSync(config.SourceConfig), formatConnSummaryForSync(config.TargetConfig), len(config.Tables))
|
|
|
|
sourceDB, err := db.NewDatabase(config.SourceConfig.Type)
|
|
if err != nil {
|
|
logger.Error(err, "初始化源数据库驱动失败:类型=%s", config.SourceConfig.Type)
|
|
return s.fail(result, "初始化源数据库驱动失败: "+err.Error())
|
|
}
|
|
if config.SourceConfig.Type == "custom" {
|
|
// Custom DB setup would go here if needed
|
|
}
|
|
|
|
targetDB, err := db.NewDatabase(config.TargetConfig.Type)
|
|
if err != nil {
|
|
logger.Error(err, "初始化目标数据库驱动失败:类型=%s", config.TargetConfig.Type)
|
|
return s.fail(result, "初始化目标数据库驱动失败: "+err.Error())
|
|
}
|
|
|
|
// Connect Source
|
|
result.Logs = append(result.Logs, fmt.Sprintf("正在连接源数据库: %s...", config.SourceConfig.Host))
|
|
if err := sourceDB.Connect(config.SourceConfig); err != nil {
|
|
logger.Error(err, "源数据库连接失败:%s", formatConnSummaryForSync(config.SourceConfig))
|
|
return s.fail(result, "源数据库连接失败: "+err.Error())
|
|
}
|
|
defer sourceDB.Close()
|
|
|
|
// Connect Target
|
|
result.Logs = append(result.Logs, fmt.Sprintf("正在连接目标数据库: %s...", config.TargetConfig.Host))
|
|
if err := targetDB.Connect(config.TargetConfig); err != nil {
|
|
logger.Error(err, "目标数据库连接失败:%s", formatConnSummaryForSync(config.TargetConfig))
|
|
return s.fail(result, "目标数据库连接失败: "+err.Error())
|
|
}
|
|
defer targetDB.Close()
|
|
|
|
// Iterate Tables
|
|
for _, tableName := range config.Tables {
|
|
result.Logs = append(result.Logs, fmt.Sprintf("正在同步表: %s", tableName))
|
|
|
|
// 1. Get Columns & PKs (Naive approach: assume same schema)
|
|
cols, err := sourceDB.GetColumns(config.SourceConfig.Database, tableName)
|
|
if err != nil {
|
|
logger.Error(err, "获取源表列信息失败:表=%s", tableName)
|
|
result.Logs = append(result.Logs, fmt.Sprintf("获取表 %s 的列信息失败: %v", tableName, err))
|
|
continue
|
|
}
|
|
|
|
pkCol := ""
|
|
for _, col := range cols {
|
|
if col.Key == "PRI" || col.Key == "PK" {
|
|
pkCol = col.Name
|
|
break
|
|
}
|
|
}
|
|
|
|
if pkCol == "" {
|
|
result.Logs = append(result.Logs, fmt.Sprintf("跳过表 %s: 未找到主键 (同步需要主键)", tableName))
|
|
continue
|
|
}
|
|
|
|
// 2. Fetch Data (MEMORY INTENSIVE - PROTOTYPE ONLY)
|
|
// TODO: Implement paging/streaming
|
|
sourceRows, _, err := sourceDB.Query(fmt.Sprintf("SELECT * FROM %s", tableName))
|
|
if err != nil {
|
|
logger.Error(err, "读取源表失败:表=%s", tableName)
|
|
result.Logs = append(result.Logs, fmt.Sprintf("读取源表 %s 失败: %v", tableName, err))
|
|
continue
|
|
}
|
|
|
|
targetRows, _, err := targetDB.Query(fmt.Sprintf("SELECT * FROM %s", tableName))
|
|
if err != nil {
|
|
logger.Error(err, "读取目标表失败:表=%s", tableName)
|
|
// Table might not exist in target?
|
|
// Check if error is "table not found" -> Try to Create?
|
|
// For now, assume table exists.
|
|
result.Logs = append(result.Logs, fmt.Sprintf("读取目标表 %s 失败: %v", tableName, err))
|
|
continue
|
|
}
|
|
|
|
// 3. Compare (In-Memory Hash Map)
|
|
targetMap := make(map[string]map[string]interface{})
|
|
for _, row := range targetRows {
|
|
pkVal := fmt.Sprintf("%v", row[pkCol])
|
|
targetMap[pkVal] = row
|
|
}
|
|
|
|
var inserts []map[string]interface{}
|
|
var updates []connection.UpdateRow
|
|
// var deletes []map[string]interface{} // Not implemented in "insert_update" mode usually
|
|
|
|
for _, sRow := range sourceRows {
|
|
pkVal := fmt.Sprintf("%v", sRow[pkCol])
|
|
|
|
if tRow, exists := targetMap[pkVal]; exists {
|
|
// Update? Compare values
|
|
// Simplified: Compare string representations or iterate keys
|
|
// For prototype: assume update if exists
|
|
// Optimization: Check diff
|
|
changes := make(map[string]interface{})
|
|
for k, v := range sRow {
|
|
if fmt.Sprintf("%v", v) != fmt.Sprintf("%v", tRow[k]) {
|
|
changes[k] = v
|
|
}
|
|
}
|
|
if len(changes) > 0 {
|
|
updates = append(updates, connection.UpdateRow{
|
|
Keys: map[string]interface{}{pkCol: pkVal},
|
|
Values: changes,
|
|
})
|
|
}
|
|
} else {
|
|
// Insert
|
|
inserts = append(inserts, sRow)
|
|
}
|
|
}
|
|
|
|
// 4. Apply Changes
|
|
changeSet := connection.ChangeSet{
|
|
Inserts: inserts,
|
|
Updates: updates,
|
|
}
|
|
|
|
if len(inserts) > 0 || len(updates) > 0 {
|
|
result.Logs = append(result.Logs, fmt.Sprintf(" -> 需插入: %d 行, 需更新: %d 行", len(inserts), len(updates)))
|
|
|
|
// We need a BatchApplier interface or assume Database implements ApplyChanges
|
|
if applier, ok := targetDB.(db.BatchApplier); ok {
|
|
if err := applier.ApplyChanges(tableName, changeSet); err != nil {
|
|
result.Logs = append(result.Logs, fmt.Sprintf(" -> 应用变更失败: %v", err))
|
|
} else {
|
|
result.RowsInserted += len(inserts)
|
|
result.RowsUpdated += len(updates)
|
|
}
|
|
} else {
|
|
result.Logs = append(result.Logs, " -> 目标驱动不支持应用数据变更 (ApplyChanges).")
|
|
}
|
|
} else {
|
|
result.Logs = append(result.Logs, " -> 数据一致,无需变更.")
|
|
}
|
|
|
|
result.TablesSynced++
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func formatConnSummaryForSync(config connection.ConnectionConfig) string {
|
|
timeoutSeconds := config.Timeout
|
|
if timeoutSeconds <= 0 {
|
|
timeoutSeconds = 30
|
|
}
|
|
|
|
dbName := strings.TrimSpace(config.Database)
|
|
if dbName == "" {
|
|
dbName = "(default)"
|
|
}
|
|
|
|
return fmt.Sprintf("类型=%s 地址=%s:%d 数据库=%s 用户=%s 超时=%ds",
|
|
config.Type, config.Host, config.Port, dbName, config.User, timeoutSeconds)
|
|
}
|
|
|
|
func (s *SyncEngine) fail(res SyncResult, msg string) SyncResult {
|
|
res.Success = false
|
|
res.Message = msg
|
|
res.Logs = append(res.Logs, "致命错误: "+msg)
|
|
return res
|
|
}
|