From 7eb42aca6299278f66a47cb0e95de02003eb4612 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E5=9B=BD=E9=94=8B?= Date: Mon, 2 Feb 2026 19:57:41 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E2=9C=A8=20feat(core):=20=E6=89=A9?= =?UTF-8?q?=E5=B1=95=E5=A4=9A=E6=BA=90=E6=95=B0=E6=8D=AE=E5=BA=93=E9=A9=B1?= =?UTF-8?q?=E5=8A=A8=E5=B9=B6=E5=AE=9E=E7=8E=B0=E6=95=B0=E6=8D=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=BC=95=E6=93=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 集成 go-ora, dm, gokb 驱动,封装统一的 Database 接口实现,支持自定义 DSN 连接 - 新增 SyncEngine 同步引擎,支持基于主键的增量数据比对 (Insert/Update) - 新增 DataSyncModal 组件,实现三步走同步向导逻辑,修复 Transfer 组件空状态显示问题 - 优化 ConnectionModal 交互逻辑,支持驱动参数动态显隐 - 引入 antd/locale/zh_CN,统一应用界面的中文本地化显示 --- frontend/src/App.tsx | 14 + frontend/src/components/ConnectionModal.tsx | 167 ++++++--- frontend/src/components/DataSyncModal.tsx | 226 ++++++++++++ frontend/wailsjs/go/app/App.d.ts | 3 + frontend/wailsjs/go/app/App.js | 4 + frontend/wailsjs/go/models.ts | 69 ++++ go.mod | 4 + go.sum | 10 + internal/app/methods_sync.go | 11 + internal/connection/types.go | 12 +- internal/db/custom_impl.go | 234 +++++++++++++ internal/db/dameng_impl.go | 340 ++++++++++++++++++ internal/db/database.go | 10 +- internal/db/kingbase_impl.go | 363 ++++++++++++++++++++ internal/db/oracle_impl.go | 350 +++++++++++++++++++ internal/sync/sync_engine.go | 179 ++++++++++ 16 files changed, 1950 insertions(+), 46 deletions(-) create mode 100644 frontend/src/components/DataSyncModal.tsx create mode 100644 internal/app/methods_sync.go create mode 100644 internal/db/custom_impl.go create mode 100644 internal/db/dameng_impl.go create mode 100644 internal/db/kingbase_impl.go create mode 100644 internal/db/oracle_impl.go create mode 100644 internal/sync/sync_engine.go diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index fda74d6..f4feaf2 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -1,9 +1,11 @@ import React, { useState, useEffect } from 'react'; import { Layout, Button, ConfigProvider, theme, Dropdown, MenuProps, message } from 'antd'; +import zhCN from 'antd/locale/zh_CN'; import { PlusOutlined, BulbOutlined, BulbFilled, ConsoleSqlOutlined, BugOutlined, SettingOutlined, UploadOutlined, DownloadOutlined } from '@ant-design/icons'; import Sidebar from './components/Sidebar'; import TabManager from './components/TabManager'; import ConnectionModal from './components/ConnectionModal'; +import DataSyncModal from './components/DataSyncModal'; import LogPanel from './components/LogPanel'; import { useStore } from './store'; import { SavedConnection } from './types'; @@ -13,6 +15,7 @@ const { Sider, Content } = Layout; function App() { const [isModalOpen, setIsModalOpen] = useState(false); + const [isSyncModalOpen, setIsSyncModalOpen] = useState(false); const [editingConnection, setEditingConnection] = useState(null); const { darkMode, toggleDarkMode, addTab, activeContext, connections, addConnection, tabs, activeTabId } = useStore(); @@ -77,6 +80,12 @@ function App() { }; const settingsMenu: MenuProps['items'] = [ + { + key: 'sync', + label: '数据同步', + icon: , + onClick: () => setIsSyncModalOpen(true) + }, { key: 'import', label: '导入连接配置', @@ -216,6 +225,7 @@ function App() { return ( + setIsSyncModalOpen(false)} + /> {/* Ghost Resize Line for Sidebar */}
void; initialValues?: SavedConnection | null }> = ({ open, onClose, initialValues }) => { const [form] = Form.useForm(); const [loading, setLoading] = useState(false); const [useSSH, setUseSSH] = useState(false); const [dbType, setDbType] = useState('mysql'); + const [step, setStep] = useState(1); // 1: Select Type, 2: Configure const [testResult, setTestResult] = useState<{ type: 'success' | 'error', message: string } | null>(null); const [dbList, setDbList] = useState([]); const addConnection = useStore((state) => state.addConnection); @@ -19,6 +24,8 @@ const ConnectionModal: React.FC<{ open: boolean; onClose: () => void; initialVal setTestResult(null); // Reset test result setDbList([]); if (initialValues) { + // Edit mode: Go directly to step 2 + setStep(2); form.setFieldsValue({ type: initialValues.config.type, name: initialValues.name, @@ -34,10 +41,14 @@ const ConnectionModal: React.FC<{ open: boolean; onClose: () => void; initialVal sshUser: initialValues.config.ssh?.user, sshPassword: initialValues.config.ssh?.password, sshKeyPath: initialValues.config.ssh?.keyPath, + driver: (initialValues.config as any).driver, + dsn: (initialValues.config as any).dsn }); setUseSSH(initialValues.config.useSSH || false); setDbType(initialValues.config.type); } else { + // Create mode: Start at step 1 + setStep(1); form.resetFields(); setUseSSH(false); setDbType('mysql'); @@ -52,7 +63,6 @@ const ConnectionModal: React.FC<{ open: boolean; onClose: () => void; initialVal const config = await buildConfig(values); - // Use Connect to verify before saving const res = await MySQLConnect(config as any); setLoading(false); @@ -75,6 +85,7 @@ const ConnectionModal: React.FC<{ open: boolean; onClose: () => void; initialVal form.resetFields(); setUseSSH(false); setDbType('mysql'); + setStep(1); onClose(); } else { message.error('连接失败: ' + res.message); @@ -88,13 +99,12 @@ const ConnectionModal: React.FC<{ open: boolean; onClose: () => void; initialVal try { const values = await form.validateFields(); setLoading(true); - setTestResult(null); // Clear previous result + setTestResult(null); const config = await buildConfig(values); const res = await (window as any).go.app.App.TestConnection(config); setLoading(false); if (res.success) { setTestResult({ type: 'success', message: res.message }); - // Fetch DB List on success const dbRes = await MySQLGetDatabases(config as any); if (dbRes.success) { const dbs = (dbRes.data as any[]).map((row: any) => row.Database || row.database); @@ -119,35 +129,70 @@ const ConnectionModal: React.FC<{ open: boolean; onClose: () => void; initialVal return { type: values.type, - host: values.host, + host: values.host || "", port: Number(values.port || 0), user: values.user || "", password: values.password || "", database: values.database || "", useSSH: !!values.useSSH, - ssh: sshConfig + ssh: sshConfig, + driver: values.driver, + dsn: values.dsn }; }; - const isSqlite = dbType === 'sqlite'; + const handleTypeSelect = (type: string) => { + setDbType(type); + form.setFieldsValue({ type: type }); + + // Auto-fill default port + let defaultPort = 3306; + switch (type) { + case 'mysql': defaultPort = 3306; break; + case 'postgres': defaultPort = 5432; break; + case 'oracle': defaultPort = 1521; break; + case 'dameng': defaultPort = 5236; break; + case 'kingbase': defaultPort = 54321; break; + default: defaultPort = 3306; + } + if (type !== 'sqlite' && type !== 'custom') { + form.setFieldsValue({ port: defaultPort }); + } - return ( - 测试连接, - , - - ]} - width={600} - zIndex={10001} // Increase z-index - destroyOnHidden // Reset on close - maskClosable={false} // Prevent accidental close by clicking mask, user must click X or Cancel - > + setStep(2); + }; + + const isSqlite = dbType === 'sqlite'; + const isCustom = dbType === 'custom'; + + const dbTypes = [ + { key: 'mysql', name: 'MySQL', icon: }, + { key: 'postgres', name: 'PostgreSQL', icon: }, + { key: 'sqlite', name: 'SQLite', icon: }, + { key: 'oracle', name: 'Oracle', icon: }, + { key: 'dameng', name: 'Dameng (达梦)', icon: }, + { key: 'kingbase', name: 'Kingbase (人大金仓)', icon: }, + { key: 'custom', name: 'Custom (自定义)', icon: }, + ]; + + const renderStep1 = () => ( + + {dbTypes.map(item => ( + + handleTypeSelect(item.key)} + style={{ textAlign: 'center', cursor: 'pointer' }} + > +
{item.icon}
+ {item.name} +
+ + ))} +
+ ); + + const renderStep2 = () => (
void; initialVal onValuesChange={(changed) => { if (testResult) setTestResult(null); // Clear result on change if (changed.useSSH !== undefined) setUseSSH(changed.useSSH); + // Type change handled by step 1, but keep sync if select changes (hidden now) if (changed.type !== undefined) setDbType(changed.type); }} > -
- - - - - - -
+ {/* Hidden Type Field to keep form value synced */} + + + + + + {isCustom ? ( + <> + + + + + + + + ) : ( + <>
@@ -233,16 +284,52 @@ const ConnectionModal: React.FC<{ open: boolean; onClose: () => void; initialVal )} )} - - - {testResult && ( + + )} + + {testResult && ( - )} + )} + + ); + + const getFooter = () => { + if (step === 1) { + return [ + + ]; + } + return [ + !initialValues && , + , + , + + ]; + }; + + const getTitle = () => { + if (step === 1) return "选择数据源类型"; + const typeName = dbTypes.find(t => t.key === dbType)?.name || dbType; + return initialValues ? "编辑连接" : `新建 ${typeName} 连接`; + }; + + return ( + + {step === 1 ? renderStep1() : renderStep2()} ); }; diff --git a/frontend/src/components/DataSyncModal.tsx b/frontend/src/components/DataSyncModal.tsx new file mode 100644 index 0000000..3d4b97d --- /dev/null +++ b/frontend/src/components/DataSyncModal.tsx @@ -0,0 +1,226 @@ +import React, { useState, useEffect } from 'react'; +import { Modal, Form, Select, Button, message, Steps, Transfer, Card, Alert, Divider, Typography } from 'antd'; +import { useStore } from '../store'; +import { DBGetDatabases, DBGetTables, DataSync } from '../../wailsjs/go/app/App'; +import { SavedConnection } from '../types'; +import { connection } from '../../wailsjs/go/models'; + +const { Title, Text } = Typography; +const { Step } = Steps; +const { Option } = Select; + +const DataSyncModal: React.FC<{ open: boolean; onClose: () => void }> = ({ open, onClose }) => { + const connections = useStore((state) => state.connections); + const [currentStep, setCurrentStep] = useState(0); + const [loading, setLoading] = useState(false); + + // Step 1: Config + const [sourceConnId, setSourceConnId] = useState(''); + const [targetConnId, setTargetConnId] = useState(''); + const [sourceDb, setSourceDb] = useState(''); + const [targetDb, setTargetDb] = useState(''); + + const [sourceDbs, setSourceDbs] = useState([]); + const [targetDbs, setTargetDbs] = useState([]); + + // Step 2: Tables + const [allTables, setAllTables] = useState([]); + const [selectedTables, setSelectedTables] = useState([]); + + // Step 3: Result + const [syncResult, setSyncResult] = useState(null); + + useEffect(() => { + if (open) { + setCurrentStep(0); + setSourceConnId(''); + setTargetConnId(''); + setSourceDb(''); + setTargetDb(''); + setSelectedTables([]); + setSyncResult(null); + } + }, [open]); + + const handleSourceConnChange = async (connId: string) => { + setSourceConnId(connId); + setSourceDb(''); + const conn = connections.find(c => c.id === connId); + if (conn) { + setLoading(true); + try { + const res = await DBGetDatabases(conn.config as any); + if (res.success) { + setSourceDbs((res.data as any[]).map((r: any) => r.Database || r.database || r.username)); + } + } catch(e) { message.error("Failed to fetch source databases"); } + setLoading(false); + } + }; + + const handleTargetConnChange = async (connId: string) => { + setTargetConnId(connId); + setTargetDb(''); + const conn = connections.find(c => c.id === connId); + if (conn) { + setLoading(true); + try { + const res = await DBGetDatabases(conn.config as any); + if (res.success) { + setTargetDbs((res.data as any[]).map((r: any) => r.Database || r.database || r.username)); + } + } catch(e) { message.error("Failed to fetch target databases"); } + setLoading(false); + } + }; + + const nextToTables = async () => { + if (!sourceConnId || !targetConnId) return message.error("Select connections first"); + if (!sourceDb) return message.error("Select source database"); + if (!targetDb) return message.error("Select target database"); + + setLoading(true); + try { + const conn = connections.find(c => c.id === sourceConnId); + if (conn) { + const config = { ...conn.config, database: sourceDb }; + const res = await DBGetTables(config as any, sourceDb); + if (res.success) { + // DBGetTables returns [{Table: "name"}, ...] + const tables = (res.data as any[]).map((row: any) => row.Table || row.table || row.TABLE_NAME || Object.values(row)[0]); + setAllTables(tables as string[]); + setCurrentStep(1); + } else { + message.error(res.message); + } + } + } catch (e) { message.error("Failed to fetch tables"); } + setLoading(false); + }; + + const runSync = async () => { + setLoading(true); + const sConn = connections.find(c => c.id === sourceConnId)!; + const tConn = connections.find(c => c.id === targetConnId)!; + + const config = { + sourceConfig: { ...sConn.config, database: sourceDb }, + targetConfig: { ...tConn.config, database: targetDb }, + tables: selectedTables, + mode: "insert_update" + }; + + try { + const res = await DataSync(config as any); + setSyncResult(res); + setCurrentStep(2); + } catch (e) { + message.error("Sync execution failed"); + } + setLoading(false); + }; + + return ( + + + + + + + + {/* STEP 1: CONFIG */} + {currentStep === 0 && ( +
+ +
+ + + + + + +
+
+
+ +
+ + + + + + +
+
+
+ )} + + {/* STEP 2: TABLES */} + {currentStep === 1 && ( +
+ 请选择需要同步的表: + ({ key: t, title: t }))} + titles={['源表', '已选表']} + targetKeys={selectedTables} + onChange={(keys) => setSelectedTables(keys as string[])} + render={item => item.title} + listStyle={{ width: 350, height: 350, marginTop: 12 }} + locale={{ itemUnit: '项', itemsUnit: '项', searchPlaceholder: '搜索表', notFoundContent: '暂无数据' }} + /> +
+ )} + + {/* STEP 3: RESULT */} + {currentStep === 2 && syncResult && ( +
+ + 日志 +
+ {syncResult.logs.map((log: string, i: number) =>
{log}
)} +
+
+ )} + +
+ {currentStep === 0 && ( + + )} + {currentStep === 1 && ( + <> + + + + )} + {currentStep === 2 && ( + <> + + + + )} +
+
+ ); +}; + +export default DataSyncModal; diff --git a/frontend/wailsjs/go/app/App.d.ts b/frontend/wailsjs/go/app/App.d.ts index 3647c32..e118dc1 100755 --- a/frontend/wailsjs/go/app/App.d.ts +++ b/frontend/wailsjs/go/app/App.d.ts @@ -1,6 +1,7 @@ // Cynhyrchwyd y ffeil hon yn awtomatig. PEIDIWCH Â MODIWL // This file is automatically generated. DO NOT EDIT import {connection} from '../models'; +import {sync} from '../models'; export function ApplyChanges(arg1:connection.ConnectionConfig,arg2:string,arg3:string,arg4:connection.ChangeSet):Promise; @@ -26,6 +27,8 @@ export function DBQuery(arg1:connection.ConnectionConfig,arg2:string,arg3:string export function DBShowCreateTable(arg1:connection.ConnectionConfig,arg2:string,arg3:string):Promise; +export function DataSync(arg1:sync.SyncConfig):Promise; + export function ExportData(arg1:Array>,arg2:Array,arg3:string,arg4:string):Promise; export function ExportTable(arg1:connection.ConnectionConfig,arg2:string,arg3:string,arg4:string):Promise; diff --git a/frontend/wailsjs/go/app/App.js b/frontend/wailsjs/go/app/App.js index 404a845..2c1d675 100755 --- a/frontend/wailsjs/go/app/App.js +++ b/frontend/wailsjs/go/app/App.js @@ -50,6 +50,10 @@ export function DBShowCreateTable(arg1, arg2, arg3) { return window['go']['app']['App']['DBShowCreateTable'](arg1, arg2, arg3); } +export function DataSync(arg1) { + return window['go']['app']['App']['DataSync'](arg1); +} + export function ExportData(arg1, arg2, arg3, arg4) { return window['go']['app']['App']['ExportData'](arg1, arg2, arg3, arg4); } diff --git a/frontend/wailsjs/go/models.ts b/frontend/wailsjs/go/models.ts index e2c4ae3..fcf5511 100755 --- a/frontend/wailsjs/go/models.ts +++ b/frontend/wailsjs/go/models.ts @@ -77,6 +77,8 @@ export namespace connection { database: string; useSSH: boolean; ssh: SSHConfig; + driver?: string; + dsn?: string; static createFrom(source: any = {}) { return new ConnectionConfig(source); @@ -92,6 +94,8 @@ export namespace connection { this.database = source["database"]; this.useSSH = source["useSSH"]; this.ssh = this.convertValues(source["ssh"], SSHConfig); + this.driver = source["driver"]; + this.dsn = source["dsn"]; } convertValues(a: any, classs: any, asMap: boolean = false): any { @@ -134,3 +138,68 @@ export namespace connection { } +export namespace sync { + + export class SyncConfig { + sourceConfig: connection.ConnectionConfig; + targetConfig: connection.ConnectionConfig; + tables: string[]; + mode: string; + + static createFrom(source: any = {}) { + return new SyncConfig(source); + } + + constructor(source: any = {}) { + if ('string' === typeof source) source = JSON.parse(source); + this.sourceConfig = this.convertValues(source["sourceConfig"], connection.ConnectionConfig); + this.targetConfig = this.convertValues(source["targetConfig"], connection.ConnectionConfig); + this.tables = source["tables"]; + this.mode = source["mode"]; + } + + convertValues(a: any, classs: any, asMap: boolean = false): any { + if (!a) { + return a; + } + if (a.slice && a.map) { + return (a as any[]).map(elem => this.convertValues(elem, classs)); + } else if ("object" === typeof a) { + if (asMap) { + for (const key of Object.keys(a)) { + a[key] = new classs(a[key]); + } + return a; + } + return new classs(a); + } + return a; + } + } + export class SyncResult { + success: boolean; + message: string; + logs: string[]; + tablesSynced: number; + rowsInserted: number; + rowsUpdated: number; + rowsDeleted: number; + + static createFrom(source: any = {}) { + return new SyncResult(source); + } + + constructor(source: any = {}) { + if ('string' === typeof source) source = JSON.parse(source); + this.success = source["success"]; + this.message = source["message"]; + this.logs = source["logs"]; + this.tablesSynced = source["tablesSynced"]; + this.rowsInserted = source["rowsInserted"]; + this.rowsUpdated = source["rowsUpdated"]; + this.rowsDeleted = source["rowsDeleted"]; + } + } + +} + diff --git a/go.mod b/go.mod index 81e9a14..e355ad6 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,11 @@ module GoNavi-Wails go 1.24.3 require ( + gitea.com/kingbase/gokb v0.0.0-20201021123113-29bd62a876c3 + gitee.com/chunanyong/dm v1.8.22 github.com/go-sql-driver/mysql v1.9.3 github.com/lib/pq v1.11.1 + github.com/sijms/go-ora/v2 v2.9.0 github.com/wailsapp/wails/v2 v2.11.0 golang.org/x/crypto v0.47.0 modernc.org/sqlite v1.44.3 @@ -16,6 +19,7 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e // indirect diff --git a/go.sum b/go.sum index 35d90ad..71fb963 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +gitea.com/kingbase/gokb v0.0.0-20201021123113-29bd62a876c3 h1:QjslQNaH5Nuap5i4nijS0OYV6GMk5kqrAmgU90zBKd4= +gitea.com/kingbase/gokb v0.0.0-20201021123113-29bd62a876c3/go.mod h1:7lH5A1jzCXD9Nl16DzaBUOfDAT8NPrDmZwKu1p5wf94= +gitee.com/chunanyong/dm v1.8.22 h1:H7fsrnUIvEA0jlDWew7vwELry1ff+tLMIu2Fk2cIBSg= +gitee.com/chunanyong/dm v1.8.22/go.mod h1:EPRJnuPFgbyOFgJ0TRYCTGzhq+ZT4wdyaj/GW/LLcNg= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= github.com/bep/debounce v1.2.1/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -12,6 +16,9 @@ github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1 github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -61,6 +68,8 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew= github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o= +github.com/sijms/go-ora/v2 v2.9.0 h1:+iQbUeTeCOFMb5BsOMgUhV8KWyrv9yjKpcK4x7+MFrg= +github.com/sijms/go-ora/v2 v2.9.0/go.mod h1:QgFInVi3ZWyqAiJwzBQA+nbKYKH77tdp1PYoCqhR2dU= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tkrajina/go-reflector v0.5.8 h1:yPADHrwmUbMq4RGEyaOUpz2H90sRsETNVpjzo3DLVQQ= @@ -97,6 +106,7 @@ golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= diff --git a/internal/app/methods_sync.go b/internal/app/methods_sync.go new file mode 100644 index 0000000..e3927d5 --- /dev/null +++ b/internal/app/methods_sync.go @@ -0,0 +1,11 @@ +package app + +import ( + "GoNavi-Wails/internal/sync" +) + +// DataSync executes a data synchronization task +func (a *App) DataSync(config sync.SyncConfig) sync.SyncResult { + engine := sync.NewSyncEngine() + return engine.RunSync(config) +} diff --git a/internal/connection/types.go b/internal/connection/types.go index d7b1956..6cf65ac 100644 --- a/internal/connection/types.go +++ b/internal/connection/types.go @@ -19,14 +19,16 @@ type ConnectionConfig struct { Database string `json:"database"` UseSSH bool `json:"useSSH"` SSH SSHConfig `json:"ssh"` + Driver string `json:"driver,omitempty"` // For custom connection + DSN string `json:"dsn,omitempty"` // For custom connection } // QueryResult is the standard response format for Wails methods type QueryResult struct { - Success bool `json:"success"` - Message string `json:"message"` - Data interface{} `json:"data"` - Fields []string `json:"fields,omitempty"` + Success bool `json:"success"` + Message string `json:"message"` + Data interface{} `json:"data"` + Fields []string `json:"fields,omitempty"` } // ColumnDefinition represents a table column @@ -36,7 +38,7 @@ type ColumnDefinition struct { Nullable string `json:"nullable"` // YES/NO Key string `json:"key"` // PRI, UNI, MUL Default *string `json:"default"` - Extra string `json:"extra"` // auto_increment + Extra string `json:"extra"` // auto_increment Comment string `json:"comment"` } diff --git a/internal/db/custom_impl.go b/internal/db/custom_impl.go new file mode 100644 index 0000000..881aa4c --- /dev/null +++ b/internal/db/custom_impl.go @@ -0,0 +1,234 @@ +package db + +import ( + "database/sql" + "fmt" + "strings" + "time" + + "GoNavi-Wails/internal/connection" + "GoNavi-Wails/internal/utils" +) + +type CustomDB struct { + conn *sql.DB + driver string +} + +func (c *CustomDB) Connect(config connection.ConnectionConfig) error { + if config.Driver == "" || config.DSN == "" { + return fmt.Errorf("driver and dsn are required for custom connection") + } + + // Verify driver is registered (implicit check by sql.Open) + // We might not need explicit check, sql.Open will fail or Ping will fail if driver not found. + + db, err := sql.Open(config.Driver, config.DSN) + if err != nil { + return err + } + c.conn = db + c.driver = config.Driver + return c.Ping() +} + +func (c *CustomDB) Close() error { + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +func (c *CustomDB) Ping() error { + if c.conn == nil { + return fmt.Errorf("connection not open") + } + ctx, cancel := utils.ContextWithTimeout(5 * time.Second) + defer cancel() + return c.conn.PingContext(ctx) +} + +func (c *CustomDB) Query(query string) ([]map[string]interface{}, []string, error) { + if c.conn == nil { + return nil, nil, fmt.Errorf("connection not open") + } + + rows, err := c.conn.Query(query) + if err != nil { + return nil, nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, nil, err + } + + var resultData []map[string]interface{} + + for rows.Next() { + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + for i := range columns { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + continue + } + + entry := make(map[string]interface{}) + for i, col := range columns { + var v interface{} + val := values[i] + b, ok := val.([]byte) + if ok { + v = string(b) + } else { + v = val + } + entry[col] = v + } + resultData = append(resultData, entry) + } + + return resultData, columns, nil +} + +func (c *CustomDB) Exec(query string) (int64, error) { + if c.conn == nil { + return 0, fmt.Errorf("connection not open") + } + res, err := c.conn.Exec(query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +func (c *CustomDB) GetDatabases() ([]string, error) { + // Try standard information_schema or some known patterns if we can't guess + // For "custom", we can't easily know. + // But many DBs support SHOW DATABASES or SELECT datname FROM pg_database + // We'll try a generic query or return empty. + // Users using custom might know their DB context is single. + + // Best effort: + return []string{}, nil +} + +func (c *CustomDB) GetTables(dbName string) ([]string, error) { + // ANSI Standard + query := "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'" + // If mysql-like + if c.driver == "mysql" { + query = "SHOW TABLES" + if dbName != "" { + query = fmt.Sprintf("SHOW TABLES FROM `%s`", dbName) + } + } else if c.driver == "postgres" || c.driver == "kingbase" { + if dbName != "" && dbName != "public" { + query = fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s'", dbName) + } + } else if c.driver == "sqlite" { + query = "SELECT name FROM sqlite_master WHERE type='table'" + } else if c.driver == "oracle" || c.driver == "dm" { + query = "SELECT table_name FROM user_tables" + } + + // Fallback generic execution + data, _, err := c.Query(query) + if err != nil { + return nil, fmt.Errorf("failed to get tables for custom driver %s: %v", c.driver, err) + } + + var tables []string + for _, row := range data { + // iterate keys to find likely column + for k, v := range row { + if strings.Contains(strings.ToLower(k), "name") || strings.Contains(strings.ToLower(k), "table") { + tables = append(tables, fmt.Sprintf("%v", v)) + break + } + } + } + return tables, nil +} + +func (c *CustomDB) GetCreateStatement(dbName, tableName string) (string, error) { + return "Not supported for custom connections yet", nil +} + +func (c *CustomDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) { + // ANSI Standard + // SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_name = '...' + + schema := "public" + if dbName != "" { + schema = dbName + } + + query := fmt.Sprintf(`SELECT column_name, data_type, is_nullable, column_default + FROM information_schema.columns + WHERE table_name = '%s'`, tableName) + + // Adjust for schema if likely supported + if c.driver == "postgres" || c.driver == "kingbase" { + query += fmt.Sprintf(" AND table_schema = '%s'", schema) + } else if c.driver == "mysql" { + query = fmt.Sprintf("SHOW FULL COLUMNS FROM `%s`", tableName) + if dbName != "" { + query = fmt.Sprintf("SHOW FULL COLUMNS FROM `%s`.`%s`", dbName, tableName) + } + } + + data, _, err := c.Query(query) + if err != nil { + return nil, err + } + + var columns []connection.ColumnDefinition + for _, row := range data { + col := connection.ColumnDefinition{} + // flexible mapping + for k, v := range row { + kl := strings.ToLower(k) + val := fmt.Sprintf("%v", v) + if strings.Contains(kl, "field") || strings.Contains(kl, "column_name") { + col.Name = val + } else if strings.Contains(kl, "type") { + col.Type = val + } else if strings.Contains(kl, "null") || strings.Contains(kl, "nullable") { + col.Nullable = val + } else if strings.Contains(kl, "default") { + col.Default = &val + } else if strings.Contains(kl, "key") { + col.Key = val + } else if strings.Contains(kl, "comment") { + col.Comment = val + } + } + columns = append(columns, col) + } + return columns, nil +} + +func (c *CustomDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) { + return nil, fmt.Errorf("not implemented for custom") +} + +func (c *CustomDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) { + return nil, fmt.Errorf("not implemented for custom") +} + +func (c *CustomDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) { + return nil, fmt.Errorf("not implemented for custom") +} + +func (c *CustomDB) ApplyChanges(tableName string, changes connection.ChangeSet) error { + return fmt.Errorf("read-only mode for custom") +} + +func (c *CustomDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) { + return nil, fmt.Errorf("not implemented for custom") +} diff --git a/internal/db/dameng_impl.go b/internal/db/dameng_impl.go new file mode 100644 index 0000000..5d19b07 --- /dev/null +++ b/internal/db/dameng_impl.go @@ -0,0 +1,340 @@ +package db + +import ( + "database/sql" + "fmt" + "strings" + "time" + + "GoNavi-Wails/internal/connection" + "GoNavi-Wails/internal/ssh" + "GoNavi-Wails/internal/utils" + + _ "gitee.com/chunanyong/dm" +) + +type DamengDB struct { + conn *sql.DB +} + +func (d *DamengDB) getDSN(config connection.ConnectionConfig) string { + // dm://user:password@host:port?schema=... + // or dm://user:password@host:port + + address := fmt.Sprintf("%s:%d", config.Host, config.Port) + if config.UseSSH { + // SSH logic similar to others, assumes port forwarding + _, err := ssh.RegisterSSHNetwork(config.SSH) + if err == nil { + // DM driver likely uses standard net.Dial, so we might need a local listener + // or assume port forwarding is handled externally or implicitly via "tcp" override if driver allows. + // Similar to Oracle, we skip complex custom dialer injection for now. + } + } + + dsn := fmt.Sprintf("dm://%s:%s@%s", config.User, config.Password, address) + if config.Database != "" { + dsn += fmt.Sprintf("?schema=%s", config.Database) + } + return dsn +} + +func (d *DamengDB) Connect(config connection.ConnectionConfig) error { + dsn := d.getDSN(config) + db, err := sql.Open("dm", dsn) + if err != nil { + return err + } + d.conn = db + return d.Ping() +} + +func (d *DamengDB) Close() error { + if d.conn != nil { + return d.conn.Close() + } + return nil +} + +func (d *DamengDB) Ping() error { + if d.conn == nil { + return fmt.Errorf("connection not open") + } + ctx, cancel := utils.ContextWithTimeout(5 * time.Second) + defer cancel() + return d.conn.PingContext(ctx) +} + +func (d *DamengDB) Query(query string) ([]map[string]interface{}, []string, error) { + if d.conn == nil { + return nil, nil, fmt.Errorf("connection not open") + } + + rows, err := d.conn.Query(query) + if err != nil { + return nil, nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, nil, err + } + + var resultData []map[string]interface{} + + for rows.Next() { + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + for i := range columns { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + continue + } + + entry := make(map[string]interface{}) + for i, col := range columns { + var v interface{} + val := values[i] + b, ok := val.([]byte) + if ok { + v = string(b) + } else { + v = val + } + entry[col] = v + } + resultData = append(resultData, entry) + } + + return resultData, columns, nil +} + +func (d *DamengDB) Exec(query string) (int64, error) { + if d.conn == nil { + return 0, fmt.Errorf("connection not open") + } + res, err := d.conn.Exec(query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +func (d *DamengDB) GetDatabases() ([]string, error) { + // DM: List Users/Schemas + data, _, err := d.Query("SELECT username FROM dba_users") + if err != nil { + // Fallback if dba_users not accessible + data, _, err = d.Query("SELECT username FROM all_users") + if err != nil { + return nil, err + } + } + var dbs []string + for _, row := range data { + if val, ok := row["USERNAME"]; ok { + dbs = append(dbs, fmt.Sprintf("%v", val)) + } + } + return dbs, nil +} + +func (d *DamengDB) GetTables(dbName string) ([]string, error) { + query := fmt.Sprintf("SELECT table_name FROM all_tables WHERE owner = '%s'", strings.ToUpper(dbName)) + if dbName == "" { + query = "SELECT table_name FROM user_tables" + } + + data, _, err := d.Query(query) + if err != nil { + return nil, err + } + + var tables []string + for _, row := range data { + if val, ok := row["TABLE_NAME"]; ok { + tables = append(tables, fmt.Sprintf("%v", val)) + } + } + return tables, nil +} + +func (d *DamengDB) GetCreateStatement(dbName, tableName string) (string, error) { + // DM: SP_TABLEDEF usually returns definition + // Or standard Oracle way if supported. + // We'll try a common DM approach. + // SELECT DBMS_METADATA.GET_DDL('TABLE', 'TABLE_NAME', 'OWNER') FROM DUAL; + + query := fmt.Sprintf("SELECT DBMS_METADATA.GET_DDL('TABLE', '%s', '%s') as ddl FROM DUAL", + strings.ToUpper(tableName), strings.ToUpper(dbName)) + + if dbName == "" { + query = fmt.Sprintf("SELECT DBMS_METADATA.GET_DDL('TABLE', '%s') as ddl FROM DUAL", strings.ToUpper(tableName)) + } + + data, _, err := d.Query(query) + if err != nil { + return "", err + } + + if len(data) > 0 { + if val, ok := data[0]["DDL"]; ok { + return fmt.Sprintf("%v", val), nil + } + } + return "", fmt.Errorf("create statement not found") +} + +func (d *DamengDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) { + query := fmt.Sprintf(`SELECT column_name, data_type, nullable, data_default + FROM all_tab_columns + WHERE owner = '%s' AND table_name = '%s'`, + strings.ToUpper(dbName), strings.ToUpper(tableName)) + + if dbName == "" { + query = fmt.Sprintf(`SELECT column_name, data_type, nullable, data_default + FROM user_tab_columns + WHERE table_name = '%s'`, strings.ToUpper(tableName)) + } + + data, _, err := d.Query(query) + if err != nil { + return nil, err + } + + var columns []connection.ColumnDefinition + for _, row := range data { + col := connection.ColumnDefinition{ + Name: fmt.Sprintf("%v", row["COLUMN_NAME"]), + Type: fmt.Sprintf("%v", row["DATA_TYPE"]), + Nullable: fmt.Sprintf("%v", row["NULLABLE"]), + } + + if row["DATA_DEFAULT"] != nil { + def := fmt.Sprintf("%v", row["DATA_DEFAULT"]) + col.Default = &def + } + + columns = append(columns, col) + } + return columns, nil +} + +func (d *DamengDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) { + query := fmt.Sprintf(`SELECT index_name, column_name, uniqueness + FROM all_ind_columns + JOIN all_indexes USING (index_name, owner) + WHERE table_owner = '%s' AND table_name = '%s'`, + strings.ToUpper(dbName), strings.ToUpper(tableName)) + + if dbName == "" { + query = fmt.Sprintf(`SELECT index_name, column_name, uniqueness + FROM user_ind_columns + JOIN user_indexes USING (index_name) + WHERE table_name = '%s'`, strings.ToUpper(tableName)) + } + + data, _, err := d.Query(query) + if err != nil { + return nil, err + } + + var indexes []connection.IndexDefinition + for _, row := range data { + unique := 1 + if val, ok := row["UNIQUENESS"]; ok && val == "UNIQUE" { + unique = 0 + } + + idx := connection.IndexDefinition{ + Name: fmt.Sprintf("%v", row["INDEX_NAME"]), + ColumnName: fmt.Sprintf("%v", row["COLUMN_NAME"]), + NonUnique: unique, + IndexType: "BTREE", + } + indexes = append(indexes, idx) + } + return indexes, nil +} + +func (d *DamengDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) { + // Reusing Oracle style query as DM is highly compatible + query := fmt.Sprintf(`SELECT a.constraint_name, a.column_name, c_pk.table_name r_table_name, b.column_name r_column_name + FROM all_cons_columns a + JOIN all_constraints c ON a.owner = c.owner AND a.constraint_name = c.constraint_name + JOIN all_constraints c_pk ON c.r_owner = c_pk.owner AND c.r_constraint_name = c_pk.constraint_name + JOIN all_cons_columns b ON c_pk.owner = b.owner AND c_pk.constraint_name = b.constraint_name AND a.position = b.position + WHERE c.constraint_type = 'R' AND a.owner = '%s' AND a.table_name = '%s'`, + strings.ToUpper(dbName), strings.ToUpper(tableName)) + + data, _, err := d.Query(query) + if err != nil { + return nil, err + } + + var fks []connection.ForeignKeyDefinition + for _, row := range data { + fk := connection.ForeignKeyDefinition{ + Name: fmt.Sprintf("%v", row["CONSTRAINT_NAME"]), + ColumnName: fmt.Sprintf("%v", row["COLUMN_NAME"]), + RefTableName: fmt.Sprintf("%v", row["R_TABLE_NAME"]), + RefColumnName: fmt.Sprintf("%v", row["R_COLUMN_NAME"]), + ConstraintName: fmt.Sprintf("%v", row["CONSTRAINT_NAME"]), + } + fks = append(fks, fk) + } + return fks, nil +} + +func (d *DamengDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) { + query := fmt.Sprintf(`SELECT trigger_name, trigger_type, triggering_event + FROM all_triggers + WHERE table_owner = '%s' AND table_name = '%s'`, + strings.ToUpper(dbName), strings.ToUpper(tableName)) + + data, _, err := d.Query(query) + if err != nil { + return nil, err + } + + var triggers []connection.TriggerDefinition + for _, row := range data { + trig := connection.TriggerDefinition{ + Name: fmt.Sprintf("%v", row["TRIGGER_NAME"]), + Timing: fmt.Sprintf("%v", row["TRIGGER_TYPE"]), + Event: fmt.Sprintf("%v", row["TRIGGERING_EVENT"]), + Statement: "SOURCE HIDDEN", + } + triggers = append(triggers, trig) + } + return triggers, nil +} + +func (d *DamengDB) ApplyChanges(tableName string, changes connection.ChangeSet) error { + return fmt.Errorf("read-only mode implemented for Dameng so far") +} + +func (d *DamengDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) { + query := fmt.Sprintf(`SELECT table_name, column_name, data_type + FROM all_tab_columns + WHERE owner = '%s'`, strings.ToUpper(dbName)) + + data, _, err := d.Query(query) + if err != nil { + return nil, err + } + + var cols []connection.ColumnDefinitionWithTable + for _, row := range data { + col := connection.ColumnDefinitionWithTable{ + TableName: fmt.Sprintf("%v", row["TABLE_NAME"]), + Name: fmt.Sprintf("%v", row["COLUMN_NAME"]), + Type: fmt.Sprintf("%v", row["DATA_TYPE"]), + } + cols = append(cols, col) + } + return cols, nil +} diff --git a/internal/db/database.go b/internal/db/database.go index 1e4a0e1..f2a3f10 100644 --- a/internal/db/database.go +++ b/internal/db/database.go @@ -1,8 +1,8 @@ package db import ( - "fmt" "GoNavi-Wails/internal/connection" + "fmt" ) type Database interface { @@ -34,6 +34,14 @@ func NewDatabase(dbType string) (Database, error) { return &PostgresDB{}, nil case "sqlite": return &SQLiteDB{}, nil + case "oracle": + return &OracleDB{}, nil + case "dameng": + return &DamengDB{}, nil + case "kingbase": + return &KingbaseDB{}, nil + case "custom": + return &CustomDB{}, nil default: // Default to MySQL for backward compatibility if empty if dbType == "" { diff --git a/internal/db/kingbase_impl.go b/internal/db/kingbase_impl.go new file mode 100644 index 0000000..3f59f97 --- /dev/null +++ b/internal/db/kingbase_impl.go @@ -0,0 +1,363 @@ +package db + +import ( + "database/sql" + "fmt" + "strings" + "time" + + "GoNavi-Wails/internal/connection" + "GoNavi-Wails/internal/ssh" + "GoNavi-Wails/internal/utils" + + _ "gitea.com/kingbase/gokb" // Registers "kingbase" driver +) + +type KingbaseDB struct { + conn *sql.DB +} + +func (k *KingbaseDB) getDSN(config connection.ConnectionConfig) string { + // Kingbase DSN usually similar to Postgres: + // host=localhost port=54321 user=system password=... dbname=TEST sslmode=disable + + address := config.Host + port := config.Port + + if config.UseSSH { + netName, err := ssh.RegisterSSHNetwork(config.SSH) + if err == nil { + // Kingbase/Postgres lib/pq allows custom dialer via "host" if using unix socket, + // but for custom network it's harder. + // Ideally we use a local forwarder. + // For now, we assume standard TCP or handle SSH externally. + // If we implement the net.Dial override for "kingbase" driver (which might use lib/pq internally), + // we might need to check if it supports "cloudsql" style or similar custom dialers. + // Similar to others, skipping SSH deep integration here for now. + _ = netName + } + } + + // Construct DSN + dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", + address, port, config.User, config.Password, config.Database) + + return dsn +} + +func (k *KingbaseDB) Connect(config connection.ConnectionConfig) error { + dsn := k.getDSN(config) + // Open using "kingbase" driver + db, err := sql.Open("kingbase", dsn) + if err != nil { + return err + } + k.conn = db + return k.Ping() +} + +func (k *KingbaseDB) Close() error { + if k.conn != nil { + return k.conn.Close() + } + return nil +} + +func (k *KingbaseDB) Ping() error { + if k.conn == nil { + return fmt.Errorf("connection not open") + } + ctx, cancel := utils.ContextWithTimeout(5 * time.Second) + defer cancel() + return k.conn.PingContext(ctx) +} + +func (k *KingbaseDB) Query(query string) ([]map[string]interface{}, []string, error) { + if k.conn == nil { + return nil, nil, fmt.Errorf("connection not open") + } + + rows, err := k.conn.Query(query) + if err != nil { + return nil, nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, nil, err + } + + var resultData []map[string]interface{} + + for rows.Next() { + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + for i := range columns { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + continue + } + + entry := make(map[string]interface{}) + for i, col := range columns { + var v interface{} + val := values[i] + b, ok := val.([]byte) + if ok { + v = string(b) + } else { + v = val + } + entry[col] = v + } + resultData = append(resultData, entry) + } + + return resultData, columns, nil +} + +func (k *KingbaseDB) Exec(query string) (int64, error) { + if k.conn == nil { + return 0, fmt.Errorf("connection not open") + } + res, err := k.conn.Exec(query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +func (k *KingbaseDB) GetDatabases() ([]string, error) { + // Postgres/Kingbase style + data, _, err := k.Query("SELECT datname FROM pg_database WHERE datistemplate = false") + if err != nil { + return nil, err + } + var dbs []string + for _, row := range data { + if val, ok := row["datname"]; ok { + dbs = append(dbs, fmt.Sprintf("%v", val)) + } + } + return dbs, nil +} + +func (k *KingbaseDB) GetTables(dbName string) ([]string, error) { + // Usually restricted to current database connection in PG/Kingbase + // dbName param is often Schema in PG context, or ignored if we are connected to a specific DB. + // But in PG, cross-database queries are not standard without dblink. + // We assume dbName here might mean Schema (public, etc.) + + query := "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'" + if dbName != "" && dbName != "public" { + query = fmt.Sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s'", dbName) + } + + data, _, err := k.Query(query) + if err != nil { + return nil, err + } + + var tables []string + for _, row := range data { + if val, ok := row["table_name"]; ok { + tables = append(tables, fmt.Sprintf("%v", val)) + } + } + return tables, nil +} + +func (k *KingbaseDB) GetCreateStatement(dbName, tableName string) (string, error) { + // Kingbase doesn't have "SHOW CREATE TABLE". + // We can try pg_dump logic or use a query to reconstruction. + // A simple approach is just returning basic info or "Not Supported". + // Or we can query information_schema to build it. + return "SHOW CREATE TABLE not directly supported in Kingbase/Postgres via SQL", nil +} + +func (k *KingbaseDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) { + schema := "public" + if dbName != "" { + schema = dbName + } + + query := fmt.Sprintf(`SELECT column_name, data_type, is_nullable, column_default + FROM information_schema.columns + WHERE table_schema = '%s' AND table_name = '%s' + ORDER BY ordinal_position`, schema, tableName) + + data, _, err := k.Query(query) + if err != nil { + return nil, err + } + + var columns []connection.ColumnDefinition + for _, row := range data { + col := connection.ColumnDefinition{ + Name: fmt.Sprintf("%v", row["column_name"]), + Type: fmt.Sprintf("%v", row["data_type"]), + Nullable: fmt.Sprintf("%v", row["is_nullable"]), + } + + if row["column_default"] != nil { + def := fmt.Sprintf("%v", row["column_default"]) + col.Default = &def + } + + columns = append(columns, col) + } + return columns, nil +} + +func (k *KingbaseDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) { + // Postgres/Kingbase index query + query := fmt.Sprintf(` + SELECT + i.relname as index_name, + a.attname as column_name, + ix.indisunique as is_unique + FROM + pg_class t, + pg_class i, + pg_index ix, + pg_attribute a, + pg_namespace n + WHERE + t.oid = ix.indrelid + AND i.oid = ix.indexrelid + AND a.attrelid = t.oid + AND a.attnum = ANY(ix.indkey) + AND t.relkind = 'r' + AND t.relname = '%s' + AND n.oid = t.relnamespace + AND n.nspname = '%s' + `, tableName, "public") // Default to public if dbName (schema) not clear. + + if dbName != "" { + // Update query to use dbName as schema + query = strings.Replace(query, "'public'", fmt.Sprintf("'%s'", dbName), 1) + } + + data, _, err := k.Query(query) + if err != nil { + return nil, err + } + + var indexes []connection.IndexDefinition + for _, row := range data { + nonUnique := 1 + if val, ok := row["is_unique"]; ok { + if b, ok := val.(bool); ok && b { + nonUnique = 0 + } + } + + idx := connection.IndexDefinition{ + Name: fmt.Sprintf("%v", row["index_name"]), + ColumnName: fmt.Sprintf("%v", row["column_name"]), + NonUnique: nonUnique, + IndexType: "BTREE", // Default + } + indexes = append(indexes, idx) + } + return indexes, nil +} + +func (k *KingbaseDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) { + schema := "public" + if dbName != "" { + schema = dbName + } + + query := fmt.Sprintf(` + SELECT + tc.constraint_name, + kcu.column_name, + ccu.table_name AS foreign_table_name, + ccu.column_name AS foreign_column_name + FROM + information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + JOIN information_schema.constraint_column_usage AS ccu + ON ccu.constraint_name = tc.constraint_name + AND ccu.table_schema = tc.table_schema + WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name='%s' AND tc.table_schema='%s'`, + tableName, schema) + + data, _, err := k.Query(query) + if err != nil { + return nil, err + } + + var fks []connection.ForeignKeyDefinition + for _, row := range data { + fk := connection.ForeignKeyDefinition{ + Name: fmt.Sprintf("%v", row["constraint_name"]), + ColumnName: fmt.Sprintf("%v", row["column_name"]), + RefTableName: fmt.Sprintf("%v", row["foreign_table_name"]), + RefColumnName: fmt.Sprintf("%v", row["foreign_column_name"]), + ConstraintName: fmt.Sprintf("%v", row["constraint_name"]), + } + fks = append(fks, fk) + } + return fks, nil +} + +func (k *KingbaseDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) { + query := fmt.Sprintf(`SELECT trigger_name, action_timing, event_manipulation + FROM information_schema.triggers + WHERE event_object_table = '%s'`, tableName) + + data, _, err := k.Query(query) + if err != nil { + return nil, err + } + + var triggers []connection.TriggerDefinition + for _, row := range data { + trig := connection.TriggerDefinition{ + Name: fmt.Sprintf("%v", row["trigger_name"]), + Timing: fmt.Sprintf("%v", row["action_timing"]), + Event: fmt.Sprintf("%v", row["event_manipulation"]), + Statement: "SOURCE HIDDEN", + } + triggers = append(triggers, trig) + } + return triggers, nil +} + +func (k *KingbaseDB) ApplyChanges(tableName string, changes connection.ChangeSet) error { + return fmt.Errorf("read-only mode implemented for Kingbase so far") +} + +func (k *KingbaseDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) { + schema := "public" + if dbName != "" { + schema = dbName + } + + query := fmt.Sprintf(`SELECT table_name, column_name, data_type + FROM information_schema.columns + WHERE table_schema = '%s'`, schema) + + data, _, err := k.Query(query) + if err != nil { + return nil, err + } + + var cols []connection.ColumnDefinitionWithTable + for _, row := range data { + col := connection.ColumnDefinitionWithTable{ + TableName: fmt.Sprintf("%v", row["table_name"]), + Name: fmt.Sprintf("%v", row["column_name"]), + Type: fmt.Sprintf("%v", row["data_type"]), + } + cols = append(cols, col) + } + return cols, nil +} diff --git a/internal/db/oracle_impl.go b/internal/db/oracle_impl.go new file mode 100644 index 0000000..a17a539 --- /dev/null +++ b/internal/db/oracle_impl.go @@ -0,0 +1,350 @@ +package db + +import ( + "database/sql" + "fmt" + "strings" + "time" + + "GoNavi-Wails/internal/connection" + "GoNavi-Wails/internal/ssh" + "GoNavi-Wails/internal/utils" + + _ "github.com/sijms/go-ora/v2" +) + +type OracleDB struct { + conn *sql.DB +} + +func (o *OracleDB) getDSN(config connection.ConnectionConfig) string { + // oracle://user:pass@host:port/service_name + database := config.Database + if database == "" { + database = config.User // Default to user service/schema if empty? + } + + address := fmt.Sprintf("%s:%d", config.Host, config.Port) + if config.UseSSH { + _, err := ssh.RegisterSSHNetwork(config.SSH) + if err == nil { + // Oracle driver might not support custom dialer via DSN easily without extra config + // But go-ora v2 supports some advanced options. + // For simplicity, we assume standard TCP or we might need a workaround for SSH. + // go-ora v2 is pure Go, so we can potentially use a custom dialer if we manually open. + // But for now, let's just use the address. + // SSH tunneling via net.Dialer override is complex in sql.Open("oracle", ...). + // We might need to forward a local port if using SSH. + // Since ssh.RegisterSSHNetwork creates a custom network "ssh-via-...", + // we need to see if go-ora supports custom networks. + // Checking go-ora docs (simulated): It supports "unix" and "tcp". + // We might need to map the custom network to a local proxy. + // For now, we will assume direct connection or handle SSH separately later. + // We'll leave the protocol implementation as is in MySQL for now, hoping go-ora uses standard net.Dial. + // Note: go-ora connection string: oracle://user:pass@host:port/service + // It parses host/port. It doesn't easily take a custom "network" parameter in URL. + // We will proceed with standard TCP string. + } + } + + // go-ora url structure: oracle://user:password@address:port/service_name + return fmt.Sprintf("oracle://%s:%s@%s/%s", + config.User, config.Password, address, database) +} + +func (o *OracleDB) Connect(config connection.ConnectionConfig) error { + dsn := o.getDSN(config) + db, err := sql.Open("oracle", dsn) + if err != nil { + return err + } + o.conn = db + return o.Ping() +} + +func (o *OracleDB) Close() error { + if o.conn != nil { + return o.conn.Close() + } + return nil +} + +func (o *OracleDB) Ping() error { + if o.conn == nil { + return fmt.Errorf("connection not open") + } + ctx, cancel := utils.ContextWithTimeout(5 * time.Second) + defer cancel() + return o.conn.PingContext(ctx) +} + +func (o *OracleDB) Query(query string) ([]map[string]interface{}, []string, error) { + if o.conn == nil { + return nil, nil, fmt.Errorf("connection not open") + } + + rows, err := o.conn.Query(query) + if err != nil { + return nil, nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, nil, err + } + + var resultData []map[string]interface{} + + for rows.Next() { + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + for i := range columns { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + continue + } + + entry := make(map[string]interface{}) + for i, col := range columns { + var v interface{} + val := values[i] + b, ok := val.([]byte) + if ok { + v = string(b) + } else { + v = val + } + entry[col] = v + } + resultData = append(resultData, entry) + } + + return resultData, columns, nil +} + +func (o *OracleDB) Exec(query string) (int64, error) { + if o.conn == nil { + return 0, fmt.Errorf("connection not open") + } + res, err := o.conn.Exec(query) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +func (o *OracleDB) GetDatabases() ([]string, error) { + // Oracle treats Users/Schemas as "Databases" in this context + data, _, err := o.Query("SELECT username FROM all_users ORDER BY username") + if err != nil { + return nil, err + } + var dbs []string + for _, row := range data { + if val, ok := row["USERNAME"]; ok { + dbs = append(dbs, fmt.Sprintf("%v", val)) + } + } + return dbs, nil +} + +func (o *OracleDB) GetTables(dbName string) ([]string, error) { + // dbName is Schema/Owner + query := "SELECT table_name FROM user_tables" + if dbName != "" { + query = fmt.Sprintf("SELECT table_name FROM all_tables WHERE owner = '%s'", strings.ToUpper(dbName)) + } + + data, _, err := o.Query(query) + if err != nil { + return nil, err + } + + var tables []string + for _, row := range data { + if val, ok := row["TABLE_NAME"]; ok { + tables = append(tables, fmt.Sprintf("%v", val)) + } + } + return tables, nil +} + +func (o *OracleDB) GetCreateStatement(dbName, tableName string) (string, error) { + // Oracle provides DBMS_METADATA.GET_DDL + // Note: LONG type might be tricky, but basic string scan should work for smaller DDLs + query := fmt.Sprintf("SELECT DBMS_METADATA.GET_DDL('TABLE', '%s', '%s') as ddl FROM DUAL", + strings.ToUpper(tableName), strings.ToUpper(dbName)) + + if dbName == "" { + query = fmt.Sprintf("SELECT DBMS_METADATA.GET_DDL('TABLE', '%s') as ddl FROM DUAL", strings.ToUpper(tableName)) + } + + data, _, err := o.Query(query) + if err != nil { + return "", err + } + + if len(data) > 0 { + if val, ok := data[0]["DDL"]; ok { + return fmt.Sprintf("%v", val), nil + } + } + return "", fmt.Errorf("create statement not found") +} + +func (o *OracleDB) GetColumns(dbName, tableName string) ([]connection.ColumnDefinition, error) { + query := fmt.Sprintf(`SELECT column_name, data_type, nullable, data_default + FROM all_tab_columns + WHERE owner = '%s' AND table_name = '%s' + ORDER BY column_id`, strings.ToUpper(dbName), strings.ToUpper(tableName)) + + if dbName == "" { + query = fmt.Sprintf(`SELECT column_name, data_type, nullable, data_default + FROM user_tab_columns + WHERE table_name = '%s' + ORDER BY column_id`, strings.ToUpper(tableName)) + } + + data, _, err := o.Query(query) + if err != nil { + return nil, err + } + + var columns []connection.ColumnDefinition + for _, row := range data { + col := connection.ColumnDefinition{ + Name: fmt.Sprintf("%v", row["COLUMN_NAME"]), + Type: fmt.Sprintf("%v", row["DATA_TYPE"]), + Nullable: fmt.Sprintf("%v", row["NULLABLE"]), + } + + if row["DATA_DEFAULT"] != nil { + d := fmt.Sprintf("%v", row["DATA_DEFAULT"]) + col.Default = &d + } + + columns = append(columns, col) + } + return columns, nil +} + +func (o *OracleDB) GetIndexes(dbName, tableName string) ([]connection.IndexDefinition, error) { + query := fmt.Sprintf(`SELECT index_name, column_name, uniqueness + FROM all_ind_columns + JOIN all_indexes USING (index_name, owner) + WHERE table_owner = '%s' AND table_name = '%s'`, + strings.ToUpper(dbName), strings.ToUpper(tableName)) + + if dbName == "" { + query = fmt.Sprintf(`SELECT index_name, column_name, uniqueness + FROM user_ind_columns + JOIN user_indexes USING (index_name) + WHERE table_name = '%s'`, strings.ToUpper(tableName)) + } + + data, _, err := o.Query(query) + if err != nil { + return nil, err + } + + var indexes []connection.IndexDefinition + for _, row := range data { + unique := 1 + if val, ok := row["UNIQUENESS"]; ok && val == "UNIQUE" { + unique = 0 + } + + idx := connection.IndexDefinition{ + Name: fmt.Sprintf("%v", row["INDEX_NAME"]), + ColumnName: fmt.Sprintf("%v", row["COLUMN_NAME"]), + NonUnique: unique, + // SeqInIndex is harder to get in simple join, omitting or estimating + IndexType: "BTREE", // Default assumption + } + indexes = append(indexes, idx) + } + return indexes, nil +} + +func (o *OracleDB) GetForeignKeys(dbName, tableName string) ([]connection.ForeignKeyDefinition, error) { + // Simplified query for FKs + query := fmt.Sprintf(`SELECT a.constraint_name, a.column_name, c_pk.table_name r_table_name, b.column_name r_column_name + FROM all_cons_columns a + JOIN all_constraints c ON a.owner = c.owner AND a.constraint_name = c.constraint_name + JOIN all_constraints c_pk ON c.r_owner = c_pk.owner AND c.r_constraint_name = c_pk.constraint_name + JOIN all_cons_columns b ON c_pk.owner = b.owner AND c_pk.constraint_name = b.constraint_name AND a.position = b.position + WHERE c.constraint_type = 'R' AND a.owner = '%s' AND a.table_name = '%s'`, + strings.ToUpper(dbName), strings.ToUpper(tableName)) + + data, _, err := o.Query(query) + if err != nil { + return nil, err + } + + var fks []connection.ForeignKeyDefinition + for _, row := range data { + fk := connection.ForeignKeyDefinition{ + Name: fmt.Sprintf("%v", row["CONSTRAINT_NAME"]), + ColumnName: fmt.Sprintf("%v", row["COLUMN_NAME"]), + RefTableName: fmt.Sprintf("%v", row["R_TABLE_NAME"]), + RefColumnName: fmt.Sprintf("%v", row["R_COLUMN_NAME"]), + ConstraintName: fmt.Sprintf("%v", row["CONSTRAINT_NAME"]), + } + fks = append(fks, fk) + } + return fks, nil +} + +func (o *OracleDB) GetTriggers(dbName, tableName string) ([]connection.TriggerDefinition, error) { + query := fmt.Sprintf(`SELECT trigger_name, trigger_type, triggering_event + FROM all_triggers + WHERE table_owner = '%s' AND table_name = '%s'`, + strings.ToUpper(dbName), strings.ToUpper(tableName)) + + data, _, err := o.Query(query) + if err != nil { + return nil, err + } + + var triggers []connection.TriggerDefinition + for _, row := range data { + trig := connection.TriggerDefinition{ + Name: fmt.Sprintf("%v", row["TRIGGER_NAME"]), + Timing: fmt.Sprintf("%v", row["TRIGGER_TYPE"]), + Event: fmt.Sprintf("%v", row["TRIGGERING_EVENT"]), + Statement: "SOURCE HIDDEN", // Requires more complex query to get body + } + triggers = append(triggers, trig) + } + return triggers, nil +} + +func (o *OracleDB) ApplyChanges(tableName string, changes connection.ChangeSet) error { + // TODO: Implement batch application for Oracle using correct syntax + return fmt.Errorf("read-only mode implemented for Oracle so far") +} + +func (o *OracleDB) GetAllColumns(dbName string) ([]connection.ColumnDefinitionWithTable, error) { + query := fmt.Sprintf(`SELECT table_name, column_name, data_type + FROM all_tab_columns + WHERE owner = '%s'`, strings.ToUpper(dbName)) + + data, _, err := o.Query(query) + if err != nil { + return nil, err + } + + var cols []connection.ColumnDefinitionWithTable + for _, row := range data { + col := connection.ColumnDefinitionWithTable{ + TableName: fmt.Sprintf("%v", row["TABLE_NAME"]), + Name: fmt.Sprintf("%v", row["COLUMN_NAME"]), + Type: fmt.Sprintf("%v", row["DATA_TYPE"]), + } + cols = append(cols, col) + } + return cols, nil +} diff --git a/internal/sync/sync_engine.go b/internal/sync/sync_engine.go new file mode 100644 index 0000000..d630ed2 --- /dev/null +++ b/internal/sync/sync_engine.go @@ -0,0 +1,179 @@ +package sync + +import ( + "GoNavi-Wails/internal/connection" + "GoNavi-Wails/internal/db" + "fmt" +) + +// SyncConfig defines the parameters for a synchronization task +type SyncConfig struct { + SourceConfig connection.ConnectionConfig `json:"sourceConfig"` + TargetConfig connection.ConnectionConfig `json:"targetConfig"` + Tables []string `json:"tables"` // Tables to sync + Mode string `json:"mode"` // "insert_update", "full_overwrite" +} + +// SyncResult holds the result of the sync operation +type SyncResult struct { + Success bool `json:"success"` + Message string `json:"message"` + Logs []string `json:"logs"` + TablesSynced int `json:"tablesSynced"` + RowsInserted int `json:"rowsInserted"` + RowsUpdated int `json:"rowsUpdated"` + RowsDeleted int `json:"rowsDeleted"` +} + +type SyncEngine struct { +} + +func NewSyncEngine() *SyncEngine { + return &SyncEngine{} +} + +// CompareAndSync performs the synchronization +func (s *SyncEngine) RunSync(config SyncConfig) SyncResult { + result := SyncResult{Success: true, Logs: []string{}} + + sourceDB, err := db.NewDatabase(config.SourceConfig.Type) + if err != nil { + return s.fail(result, "初始化源数据库驱动失败: "+err.Error()) + } + if config.SourceConfig.Type == "custom" { + // Custom DB setup would go here if needed + } + + targetDB, err := db.NewDatabase(config.TargetConfig.Type) + if err != nil { + return s.fail(result, "初始化目标数据库驱动失败: "+err.Error()) + } + + // Connect Source + result.Logs = append(result.Logs, fmt.Sprintf("正在连接源数据库: %s...", config.SourceConfig.Host)) + if err := sourceDB.Connect(config.SourceConfig); err != nil { + return s.fail(result, "源数据库连接失败: "+err.Error()) + } + defer sourceDB.Close() + + // Connect Target + result.Logs = append(result.Logs, fmt.Sprintf("正在连接目标数据库: %s...", config.TargetConfig.Host)) + if err := targetDB.Connect(config.TargetConfig); err != nil { + return s.fail(result, "目标数据库连接失败: "+err.Error()) + } + defer targetDB.Close() + + // Iterate Tables + for _, tableName := range config.Tables { + result.Logs = append(result.Logs, fmt.Sprintf("正在同步表: %s", tableName)) + + // 1. Get Columns & PKs (Naive approach: assume same schema) + cols, err := sourceDB.GetColumns(config.SourceConfig.Database, tableName) + if err != nil { + result.Logs = append(result.Logs, fmt.Sprintf("获取表 %s 的列信息失败: %v", tableName, err)) + continue + } + + pkCol := "" + for _, col := range cols { + if col.Key == "PRI" || col.Key == "PK" { + pkCol = col.Name + break + } + } + + if pkCol == "" { + result.Logs = append(result.Logs, fmt.Sprintf("跳过表 %s: 未找到主键 (同步需要主键)", tableName)) + continue + } + + // 2. Fetch Data (MEMORY INTENSIVE - PROTOTYPE ONLY) + // TODO: Implement paging/streaming + sourceRows, _, err := sourceDB.Query(fmt.Sprintf("SELECT * FROM %s", tableName)) + if err != nil { + result.Logs = append(result.Logs, fmt.Sprintf("读取源表 %s 失败: %v", tableName, err)) + continue + } + + targetRows, _, err := targetDB.Query(fmt.Sprintf("SELECT * FROM %s", tableName)) + if err != nil { + // Table might not exist in target? + // Check if error is "table not found" -> Try to Create? + // For now, assume table exists. + result.Logs = append(result.Logs, fmt.Sprintf("读取目标表 %s 失败: %v", tableName, err)) + continue + } + + // 3. Compare (In-Memory Hash Map) + targetMap := make(map[string]map[string]interface{}) + for _, row := range targetRows { + pkVal := fmt.Sprintf("%v", row[pkCol]) + targetMap[pkVal] = row + } + + var inserts []map[string]interface{} + var updates []connection.UpdateRow + // var deletes []map[string]interface{} // Not implemented in "insert_update" mode usually + + for _, sRow := range sourceRows { + pkVal := fmt.Sprintf("%v", sRow[pkCol]) + + if tRow, exists := targetMap[pkVal]; exists { + // Update? Compare values + // Simplified: Compare string representations or iterate keys + // For prototype: assume update if exists + // Optimization: Check diff + changes := make(map[string]interface{}) + for k, v := range sRow { + if fmt.Sprintf("%v", v) != fmt.Sprintf("%v", tRow[k]) { + changes[k] = v + } + } + if len(changes) > 0 { + updates = append(updates, connection.UpdateRow{ + Keys: map[string]interface{}{pkCol: pkVal}, + Values: changes, + }) + } + } else { + // Insert + inserts = append(inserts, sRow) + } + } + + // 4. Apply Changes + changeSet := connection.ChangeSet{ + Inserts: inserts, + Updates: updates, + } + + if len(inserts) > 0 || len(updates) > 0 { + result.Logs = append(result.Logs, fmt.Sprintf(" -> 需插入: %d 行, 需更新: %d 行", len(inserts), len(updates))) + + // We need a BatchApplier interface or assume Database implements ApplyChanges + if applier, ok := targetDB.(db.BatchApplier); ok { + if err := applier.ApplyChanges(tableName, changeSet); err != nil { + result.Logs = append(result.Logs, fmt.Sprintf(" -> 应用变更失败: %v", err)) + } else { + result.RowsInserted += len(inserts) + result.RowsUpdated += len(updates) + } + } else { + result.Logs = append(result.Logs, " -> 目标驱动不支持应用数据变更 (ApplyChanges).") + } + } else { + result.Logs = append(result.Logs, " -> 数据一致,无需变更.") + } + + result.TablesSynced++ + } + + return result +} + +func (s *SyncEngine) fail(res SyncResult, msg string) SyncResult { + res.Success = false + res.Message = msg + res.Logs = append(res.Logs, "致命错误: "+msg) + return res +} From 2626ce198ce782030d985ba41936dfcdd43d6eee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E5=9B=BD=E9=94=8B?= Date: Mon, 2 Feb 2026 19:58:10 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E2=9C=A8=20feat(core):=20=E5=BF=BD?= =?UTF-8?q?=E7=95=A5.exe=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 77af73e..d7c20d7 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,5 @@ node_modules/ dist/ .DS_Store .gemini-clipboard -GoNavi-Wails \ No newline at end of file +GoNavi-Wails +GoNavi-Wails.exe \ No newline at end of file