feat: Docker 集群增强 — Gateway 通讯API、像素兵种系统、互动组件、UI 优化

This commit is contained in:
晴天
2026-03-09 05:35:30 +08:00
parent 5b8a7ab76f
commit 727903f94b
18 changed files with 3017 additions and 174 deletions

View File

@@ -556,6 +556,9 @@ function isDockerAvailable() {
return fs.existsSync(DOCKER_SOCKET)
}
// === 镜像拉取进度追踪 ===
const _pullProgress = new Map()
// === 实例注册表 ===
const DEFAULT_LOCAL_INSTANCE = { id: 'local', name: '本机', type: 'local', endpoint: null, gatewayPort: 18789, addedAt: 0, note: '' }
@@ -648,7 +651,7 @@ const ALWAYS_LOCAL = new Set([
'instance_health_check', 'instance_health_all',
'docker_info', 'docker_list_containers', 'docker_create_container',
'docker_start_container', 'docker_stop_container', 'docker_restart_container',
'docker_remove_container', 'docker_container_logs', 'docker_pull_image',
'docker_remove_container', 'docker_container_logs', 'docker_container_exec', 'docker_gateway_chat', 'docker_pull_image', 'docker_pull_status',
'docker_list_images', 'docker_list_nodes', 'docker_add_node', 'docker_remove_node',
'docker_cluster_overview',
'auth_check', 'auth_login', 'auth_logout',
@@ -986,6 +989,128 @@ const handlers = {
return true
},
async docker_gateway_chat({ nodeId, containerId, message, timeout = 120000 } = {}) {
if (!containerId || !message) throw new Error('缺少 containerId 或 message')
// 1. 查找容器的 Gateway 端口
const nodes = readDockerNodes()
const node = nodeId ? nodes.find(n => n.id === nodeId) : nodes[0]
if (!node) throw new Error('节点不存在')
const resp = await dockerRequest('GET', `/containers/${containerId}/json`, null, node.endpoint)
if (resp.status >= 400) throw new Error('容器不存在或无法访问')
const ports = resp.data?.NetworkSettings?.Ports || {}
const gwBinding = ports['18789/tcp']
if (!gwBinding || !gwBinding[0]?.HostPort) throw new Error('该容器没有暴露 Gateway 端口 (18789)')
const gwPort = gwBinding[0].HostPort
// 2. 通过 WebSocket 连接 GatewayNode 22 内置 WebSocket
return new Promise((resolve, reject) => {
const wsUrl = `ws://127.0.0.1:${gwPort}/ws`
let ws
try { ws = new WebSocket(wsUrl) } catch (e) { return reject(new Error(`无法连接 Gateway: ${e.message}`)) }
let result = '', handshakeOk = false, sessionKey = 'agent:main:cluster-task', done = false
const timer = setTimeout(() => { if (!done) { done = true; ws.close(); reject(new Error('Gateway 通信超时')) } }, timeout)
const challengeTimer = setTimeout(() => { if (!handshakeOk) doConnect('') }, 3000)
function doConnect(nonce) {
try {
const frame = handlers.create_connect_frame({ nonce, gatewayToken: '' })
ws.send(JSON.stringify(frame))
} catch {
ws.send(JSON.stringify({ type: 'req', id: 'connect-1', method: 'connect', params: {} }))
}
}
function sendChat() {
const id = `chat-${Date.now().toString(36)}`
ws.send(JSON.stringify({
type: 'req', id, method: 'chat.send',
params: { sessionKey, message, deliver: false, idempotencyKey: id }
}))
}
ws.addEventListener('message', (evt) => {
let msg
try { msg = JSON.parse(typeof evt.data === 'string' ? evt.data : evt.data.toString()) } catch { return }
// connect.challenge
if (msg.type === 'event' && msg.event === 'connect.challenge') {
clearTimeout(challengeTimer)
doConnect(msg.payload?.nonce || '')
return
}
// connect 响应
if (msg.type === 'res' && msg.id?.startsWith('connect')) {
clearTimeout(challengeTimer)
if (msg.ok) {
handshakeOk = true
const defaults = msg.payload?.snapshot?.sessionDefaults
if (defaults?.mainSessionKey) sessionKey = defaults.mainSessionKey
else sessionKey = `agent:${defaults?.defaultAgentId || 'main'}:cluster-task`
sendChat()
} else {
done = true; clearTimeout(timer); ws.close()
reject(new Error(msg.error?.message || 'Gateway 握手失败'))
}
return
}
// chat 事件流
if (msg.type === 'event' && msg.event === 'chat') {
const p = msg.payload
if (p?.state === 'delta') {
const content = p.message?.content
if (typeof content === 'string' && content.length > result.length) result = content
}
if (p?.state === 'final') {
const content = p.message?.content
if (typeof content === 'string' && content) result = content
done = true; clearTimeout(timer); ws.close()
resolve({ ok: true, result })
}
return
}
// chat.send 确认
if (msg.type === 'res' && !msg.id?.startsWith('connect')) {
if (!msg.ok) {
done = true; clearTimeout(timer); ws.close()
reject(new Error(msg.error?.message || '任务发送失败'))
}
}
})
ws.addEventListener('error', () => {
if (!done) { done = true; clearTimeout(timer); clearTimeout(challengeTimer); reject(new Error(`无法连接 ${wsUrl}`)) }
})
ws.addEventListener('close', (e) => {
clearTimeout(timer); clearTimeout(challengeTimer)
if (!done) {
done = true
if (result) resolve({ ok: true, result })
else if (e.code === 4001 || e.code === 4003) reject(new Error('Gateway 认证失败'))
else resolve({ ok: true, result: result || '(无回复)' })
}
})
})
},
async docker_container_exec({ nodeId, containerId, cmd } = {}) {
const nodes = readDockerNodes()
const node = nodeId ? nodes.find(n => n.id === nodeId) : nodes[0]
if (!node) throw new Error('节点不存在')
if (!containerId) throw new Error('缺少 containerId')
if (!cmd || !Array.isArray(cmd)) throw new Error('cmd 必须是字符串数组')
// Step 1: 创建 exec 实例
const createResp = await dockerRequest('POST', `/containers/${containerId}/exec`, {
AttachStdout: true, AttachStderr: true, Cmd: cmd
}, node.endpoint)
if (createResp.status >= 400) throw new Error(`exec 创建失败: ${JSON.stringify(createResp.data)}`)
const execId = createResp.data?.Id
if (!execId) throw new Error('exec 创建失败: 无 ID')
// Step 2: 启动 exec
const startResp = await dockerRequest('POST', `/exec/${execId}/start`, { Detach: true }, node.endpoint)
if (startResp.status >= 400) throw new Error(`exec 启动失败: ${JSON.stringify(startResp.data)}`)
return { ok: true, execId }
},
async docker_container_logs({ nodeId, containerId, tail = 200 } = {}) {
const nodes = readDockerNodes()
const node = nodeId ? nodes.find(n => n.id === nodeId) : nodes[0]
@@ -998,14 +1123,97 @@ const handlers = {
return logs
},
async docker_pull_image({ nodeId, image, tag = 'latest' } = {}) {
async docker_pull_image({ nodeId, image, tag = 'latest', requestId } = {}) {
const nodes = readDockerNodes()
const node = nodeId ? nodes.find(n => n.id === nodeId) : nodes[0]
if (!node) throw new Error('节点不存在')
const imgFull = `${image || OPENCLAW_IMAGE}:${tag}`
const resp = await dockerRequest('POST', `/images/create?fromImage=${encodeURIComponent(image || OPENCLAW_IMAGE)}&tag=${tag}`, null, node.endpoint)
if (resp.status !== 200) throw new Error(resp.data?.message || '拉取镜像失败')
return `镜像 ${imgFull} 拉取完成`
const rid = requestId || `pull-${Date.now()}`
_pullProgress.set(rid, { status: 'connecting', image: imgFull, layers: {}, message: '连接 Docker...', percent: 0 })
const endpoint = node.endpoint
const apiPath = `/images/create?fromImage=${encodeURIComponent(image || OPENCLAW_IMAGE)}&tag=${tag}`
try {
await new Promise((resolve, reject) => {
const opts = { path: apiPath, method: 'POST', headers: { 'Content-Type': 'application/json' } }
if (endpoint && endpoint.startsWith('tcp://')) {
const url = new URL(endpoint.replace('tcp://', 'http://'))
opts.hostname = url.hostname
opts.port = parseInt(url.port) || 2375
} else {
opts.socketPath = endpoint || DOCKER_SOCKET
}
const req = http.request(opts, (res) => {
if (res.statusCode !== 200) {
let errData = ''
res.on('data', chunk => errData += chunk)
res.on('end', () => {
const err = (() => { try { return JSON.parse(errData).message } catch { return `HTTP ${res.statusCode}` } })()
_pullProgress.set(rid, { ..._pullProgress.get(rid), status: 'error', message: err })
reject(new Error(err))
})
return
}
_pullProgress.set(rid, { ..._pullProgress.get(rid), status: 'pulling', message: '正在拉取镜像层...' })
let lastError = null
res.on('data', (chunk) => {
const text = chunk.toString()
for (const line of text.split('\n').filter(Boolean)) {
try {
const obj = JSON.parse(line)
if (obj.error) { lastError = obj.error; continue }
const p = _pullProgress.get(rid)
if (obj.id && obj.progressDetail) {
p.layers[obj.id] = {
status: obj.status || '',
current: obj.progressDetail.current || 0,
total: obj.progressDetail.total || 0,
}
}
if (obj.status) p.message = obj.id ? `${obj.id}: ${obj.status}` : obj.status
// 计算总体进度
const layers = Object.values(p.layers)
if (layers.length > 0) {
const totalBytes = layers.reduce((s, l) => s + (l.total || 0), 0)
const currentBytes = layers.reduce((s, l) => s + (l.current || 0), 0)
p.percent = totalBytes > 0 ? Math.round((currentBytes / totalBytes) * 100) : 0
p.layerCount = layers.length
p.completedLayers = layers.filter(l => l.status === 'Pull complete' || l.status === 'Already exists').length
}
_pullProgress.set(rid, p)
} catch {}
}
})
res.on('end', () => {
if (lastError) {
_pullProgress.set(rid, { ..._pullProgress.get(rid), status: 'error', message: lastError })
reject(new Error(lastError))
} else {
_pullProgress.set(rid, { ..._pullProgress.get(rid), status: 'done', message: '拉取完成', percent: 100 })
resolve()
}
})
})
req.on('error', (e) => {
_pullProgress.set(rid, { ..._pullProgress.get(rid), status: 'error', message: e.message })
reject(new Error('Docker 连接失败: ' + e.message))
})
req.setTimeout(600000, () => {
_pullProgress.set(rid, { ..._pullProgress.get(rid), status: 'error', message: '超时' })
req.destroy()
reject(new Error('镜像拉取超时10分钟'))
})
req.end()
})
} finally {
// 30秒后清理进度数据
setTimeout(() => _pullProgress.delete(rid), 30000)
}
return { message: `镜像 ${imgFull} 拉取完成`, requestId: rid }
},
docker_pull_status({ requestId } = {}) {
if (!requestId) return { status: 'unknown' }
return _pullProgress.get(requestId) || { status: 'unknown' }
},
async docker_list_images({ nodeId } = {}) {