Files
geekgeekrun/packages/pm/daemon.js

508 lines
15 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// When launched through Electron, run headless: hide the macOS Dock icon and
// register app listeners so the process keeps running without any window.
if (typeof require !== 'undefined') {
  try {
    const electronApp = require('electron').app;
    if (electronApp) {
      // macOS only: hide the Dock icon (`dock` is accessed optionally).
      if (process.platform === 'darwin') {
        electronApp.dock?.hide();
      }
      // Intentionally empty: no window is created once the app is ready.
      electronApp.on('ready', () => {});
      // Meant to keep the app running in the background when no window is open.
      electronApp.on('window-all-closed', (event) => {
        event.preventDefault();
      });
    }
  } catch (e) {
    // Not running inside Electron; nothing to configure.
  }
}
const net = require('net');
const { spawn } = require('child_process');
const path = require('path');
const split2 = require('split2');
const fs = require('fs')
const { tmpdir } = require('os')
const { randomUUID } = require('crypto')
// Write stream on file descriptor 3; the startup code at the end of this file
// uses it to report DAEMON_READY / DAEMON_FATAL (presumably to the process that
// spawned the daemon - confirm against the launcher).
const ipcWritePipe = fs.createWriteStream(null, { fd: 3 })

// Socket name: taken from GEEKGEEKRUND_PIPE_NAME, or generated and written back
// into process.env when the variable is missing or empty.
if (!process.env.GEEKGEEKRUND_PIPE_NAME) {
  process.env.GEEKGEEKRUND_PIPE_NAME = `geekgeekrun-d_${randomUUID()}`
}
let ipcSocketName = process.env.GEEKGEEKRUND_PIPE_NAME

// Named pipe on Windows, socket file in the OS temp directory elsewhere.
const ipcSocketPath = (() => {
  if (process.platform === 'win32') {
    return `\\\\.\\pipe\\${ipcSocketName}`
  }
  return path.join(tmpdir(), `${ipcSocketName}.sock`)
})()

// workerId -> { process, status, restartCount, socket, latestScreenshot, latestScreenshotAt }
const workers = new Map();
// Sockets of registered GUI clients.
const userProcessClients = new Set();
// workerIds the user explicitly stopped; consulted to avoid restart races.
const stoppedWorkers = new Set();
// Child pid -> worker info record.
const pidToProcessInfoMap = new Map()
// Client socket -> Set of workerIds started through that socket.
const socketToWorkerIdSetMap = new WeakMap()
// Create the IPC server (the startup code makes it listen on `ipcSocketPath`).
// Clients speak JSONL: one JSON object per newline-terminated line.
const server = net.createServer((socket) => {
  console.log('客户端已连接');
  // split2 re-chunks the byte stream into complete lines, so a message that
  // arrives in several packets is only delivered once its newline is seen.
  const splitStream = split2();
  socket.pipe(splitStream).on('data', (line) => {
    const trimmedLine = line.toString().trim();
    if (!trimmedLine) {
      return; // skip blank lines
    }

    // Step 1: decode. Only genuine decoding problems are reported as invalid JSON.
    let message;
    try {
      message = JSON.parse(trimmedLine);
    } catch (parseError) {
      console.error('解析JSON消息失败:', parseError.message);
      console.error('原始数据:', trimmedLine.substring(0, 100)); // first 100 characters only
      sendResponse(socket, undefined, { error: '无效的JSON格式', details: parseError.message });
      return;
    }

    // Step 2: `null` and primitives are valid JSON but cannot carry a message.
    if (message === null || typeof message !== 'object') {
      console.error('原始数据:', trimmedLine.substring(0, 100));
      sendResponse(socket, undefined, { error: '无效的JSON格式', details: '消息必须是JSON对象' });
      return;
    }

    // Step 3: dispatch. A failure inside the handler is reported as such
    // (with the caller's _callbackUuid) instead of being mislabelled as bad JSON.
    try {
      handleMessage(socket, message);
    } catch (handleError) {
      console.error('处理消息失败:', handleError);
      sendResponse(socket, message._callbackUuid, {
        error: '处理消息失败',
        details: handleError?.message ?? String(handleError)
      });
    }
  });
  splitStream.on('error', (err) => {
    console.error('split2 流处理错误:', err);
    sendResponse(socket, null, { error: '流处理失败' });
  });
  socket.on('error', (err) => {
    console.error('Socket错误:', err);
  });
  socket.on('close', () => {
    console.log('客户端已断开连接');
    // Forget the socket as a GUI client.
    userProcessClients.delete(socket);
    // Detach the socket from any worker record that references it.
    for (const [workerId, workerInfo] of workers.entries()) {
      if (workerInfo.socket === socket) {
        console.log(`工具进程 ${workerId} 的连接已断开`);
        workerInfo.socket = null;
      }
    }
    // Stop every worker that was started through this connection. Iterate over
    // a copy so the loop does not depend on what stopWorker does to shared state.
    const workerIdSet = socketToWorkerIdSetMap.get(socket) || new Set();
    for (const workerId of [...workerIdSet]) {
      stopWorker(workerId);
    }
  });
});
/**
 * Dispatches one decoded message received from a connected client.
 *
 * `ping` and `user-process-register` are handled first. The remaining types are
 * worker-originated messages (`check-should-exit`, `worker-to-gui-message`,
 * `worker-screenshot`) and control messages (`start-worker`, `stop-worker`,
 * `get-status`). Any other type is answered with an error response.
 *
 * @param {object} socket  Connection the message arrived on; replies are written to it.
 * @param {object} message Parsed message. `type` selects the action; `_callbackUuid`,
 *                         when present, is echoed back in the reply.
 */
function handleMessage(socket, message) {
  // get-status is not logged, unlike every other message type.
  if (message.type !== 'get-status') {
    console.log('收到消息:', message);
  }
  const _callbackUuid = message._callbackUuid;

  if (message.type === 'ping') {
    sendResponse(socket, _callbackUuid, {
      success: true,
      message: 'pong'
    });
    return;
  }

  if (message.type === 'user-process-register') {
    // Mark this connection as a GUI client so it receives broadcasts.
    userProcessClients.add(socket);
    sendResponse(socket, _callbackUuid, {
      success: true,
    });
    return;
  }

  const workerId = message.workerId;
  const workerInfo = workers.get(workerId);

  switch (message.type) {
    // ---- Messages sent by worker processes ----
    case 'check-should-exit': {
      // A worker should exit when it was explicitly stopped or is no longer tracked.
      const shouldExit = stoppedWorkers.has(workerId) || !workers.has(workerId);
      sendResponse(socket, _callbackUuid, {
        workerId: workerId,
        shouldExit: shouldExit
      });
      if (shouldExit) {
        console.log(`工具进程 ${workerId} 查询是否应该退出,返回: 是`);
      }
      return;
    }
    case 'worker-to-gui-message': {
      // Record prerequisite step status in the worker's runtimeStorage so it is
      // also available through get-status.
      if (
        workerInfo &&
        message.data?.type === 'prerequisite-step-by-step-check' &&
        message.data?.step?.id
      ) {
        workerInfo.runtimeStorage = workerInfo.runtimeStorage || {};
        workerInfo.runtimeStorage.stepStatusMapByStepId =
          workerInfo.runtimeStorage.stepStatusMapByStepId || {};
        workerInfo.runtimeStorage.stepStatusMapByStepId[message.data.step.id] = {
          step: message.data.step,
          runRecordId: message.data.runRecordId
        };
      }
      // Forward the worker's message to every GUI client.
      broadcastToGUI({
        type: 'worker-to-gui-message',
        workerId: workerId,
        data: message.data || message,
        timestamp: Date.now()
      });
      return;
    }
    case 'worker-screenshot': {
      // Cache the latest screenshot in the daemon so get-status can return it.
      if (workerInfo && message.data && message.data.screenshot) {
        workerInfo.latestScreenshot = message.data.screenshot;
        workerInfo.latestScreenshotAt = Date.now();
      }
      return;
    }

    // ---- Control messages sent by GUI processes ----
    case 'start-worker': {
      const { command, args, env } = message;

      // Reject requests that cannot be spawned, rather than letting spawn()
      // throw out of the handler.
      if (workerId === undefined || workerId === null || typeof command !== 'string' || command === '') {
        sendResponse(socket, _callbackUuid, {
          success: false,
          error: 'start-worker 消息缺少 workerId 或 command',
          workerId: workerId
        });
        return;
      }

      if (workers.has(workerId)) {
        console.log(`工具进程 ${workerId} 已在运行`);
        // Always answer, otherwise a caller waiting on _callbackUuid hangs forever.
        sendResponse(socket, _callbackUuid, {
          success: true,
          alreadyRunning: true,
          message: `工具进程 ${workerId} 已在运行`,
          workerId: workerId
        });
        return;
      }

      // stopWorker() marks an id as stopped even when no such worker exists, and
      // that marker is only cleared when a process with the id exits. An explicit
      // start supersedes such a leftover marker; without this the new worker would
      // be told to exit by check-should-exit. The marker is kept while an earlier
      // process with the same id is still exiting, because it belongs to that one.
      const hasExitingProcess = [...pidToProcessInfoMap.values()].some(
        (info) => info && info.workerId === workerId
      );
      if (!hasExitingProcess) {
        stoppedWorkers.delete(workerId);
      }

      try {
        startWorker({
          workerId,
          command,
          args,
          env: env ?? {}
        });
      } catch (startError) {
        console.error(`启动工具进程 ${workerId} 失败:`, startError);
        sendResponse(socket, _callbackUuid, {
          success: false,
          error: '启动工具进程失败',
          details: startError?.message ?? String(startError),
          workerId: workerId
        });
        return;
      }

      sendResponse(socket, _callbackUuid, {
        success: true,
        message: `工具进程 ${message.workerId} 已启动`,
        workerId: message.workerId
      });

      // Remember which connection started the worker; the server's close
      // handler stops these workers when the connection goes away.
      let socketToWorkerIdSet = socketToWorkerIdSetMap.get(socket);
      if (!(socketToWorkerIdSet instanceof Set)) {
        socketToWorkerIdSet = new Set();
        socketToWorkerIdSetMap.set(socket, socketToWorkerIdSet);
      }
      socketToWorkerIdSet.add(workerId);
      return;
    }
    case 'stop-worker': {
      stopWorker(message.workerId);
      sendResponse(socket, _callbackUuid, {
        success: true,
        message: `工具进程 ${message.workerId} 已停止`,
        workerId: message.workerId
      });
      return;
    }
    case 'get-status': {
      const status = getWorkersStatus();
      sendResponse(socket, _callbackUuid, {
        success: true,
        workers: status
      });
      return;
    }
    default: {
      sendResponse(socket, _callbackUuid, { error: '未知的消息类型' });
    }
  }
}
/**
 * Spawns a worker process, registers it in `workers` / `pidToProcessInfoMap`,
 * and restarts it after an unexpected exit.
 *
 * Exit code 0, plus any codes listed (comma separated, base 10) in
 * `env.GEEKGEEKRUND_NO_AUTO_RESTART_EXIT_CODE`, do not trigger a restart. Other
 * exits restart the worker after 2 seconds unless its id is in `stoppedWorkers`.
 * A process that could not be spawned at all is unregistered and reported to the
 * GUI, and is not retried.
 *
 * @param {object} options
 * @param {*} options.workerId        Identifier of the worker.
 * @param {string} options.command    Executable to spawn.
 * @param {string[]} [options.args]   Arguments passed to the executable.
 * @param {object} [options.env]      Extra environment variables (optional).
 * @param {number} [restartCount=0]   How many times this worker has been restarted.
 */
function startWorker({ workerId, command, args, env }, restartCount = 0) {
  // `env` may be omitted by the caller; treat that as "no extra variables".
  const workerEnv = env ?? {};

  const noAutoRestartExitCodeSet = new Set([0]);
  String(workerEnv.GEEKGEEKRUND_NO_AUTO_RESTART_EXIT_CODE ?? '')
    .split(',')
    .map((rawCode) => Number.parseInt(rawCode, 10))
    .filter((exitCode) => !Number.isNaN(exitCode)) // ignore empty / malformed entries
    .forEach((exitCode) => noAutoRestartExitCodeSet.add(exitCode));

  console.log(`启动工具进程: ${workerId}${restartCount > 0 ? ` (重启第${restartCount}次)` : ''}`);

  const workerProcess = spawn(command, args, {
    stdio: ['ignore', 'pipe', 'pipe'],
    env: {
      ...process.env,
      ...workerEnv,
      GEEKGEEKRUND_WORKER_ID: workerId,
      GEEKGEEKRUND_RESTART_COUNT: restartCount.toString(),
    }
  });

  const workerInfo = {
    process: workerProcess,
    status: 'running',
    startTime: Date.now(),
    restartCount, // restart count passed in by the caller
    runtimeStorage: {
      stepStatusMapByStepId: {}
    },
    command,
    args,
    env: workerEnv,
    workerId,
  };

  // Output is only logged. It is deliberately not buffered, because a
  // long-running worker would otherwise grow the daemon's memory without bound.
  workerProcess.stdout.on('data', (data) => {
    console.log(`工具进程 ${workerId} 输出:`, data.toString().trim());
  });
  workerProcess.stderr.on('data', (data) => {
    console.error(`工具进程 ${workerId} 错误:`, data.toString().trim());
  });

  workerProcess.on('exit', (code, signal) => {
    console.log(`工具进程 ${workerId} 退出,代码: ${code}, 信号: ${signal}`);
    const exitedInfo = pidToProcessInfoMap.get(workerProcess.pid);
    if (!exitedInfo) {
      // Not tracked any more: only clear a pending stop marker.
      if (stoppedWorkers.has(workerId)) {
        stoppedWorkers.delete(workerId);
      }
      return;
    }
    pidToProcessInfoMap.delete(workerProcess.pid);

    // Close the connection associated with this worker, if any.
    if (exitedInfo.socket) {
      exitedInfo.socket.destroy();
    }

    const shouldRestart = !noAutoRestartExitCodeSet.has(code);
    const nextRestartCount = (exitedInfo.restartCount || 0) + 1;

    // Only unregister our own record: the id may already belong to a newer
    // worker that was started after this one was stopped.
    if (workers.get(workerId) === exitedInfo) {
      workers.delete(workerId);
    }

    // Tell the GUI clients that the worker has exited.
    broadcastToGUI({
      type: 'worker-exited',
      workerId: workerId,
      code: code,
      signal: signal,
      restarting: shouldRestart && !stoppedWorkers.has(workerId),
      restartCount: nextRestartCount
    });

    if (shouldRestart && !stoppedWorkers.has(workerId)) {
      console.log(`工具进程 ${workerId} 意外退出,准备重启 (第${nextRestartCount}次)`);
      // Delay the restart to avoid a tight crash loop.
      setTimeout(() => {
        // Re-check: the worker may have been stopped or started again meanwhile.
        if (!workers.has(workerId) && !stoppedWorkers.has(workerId)) {
          startWorker({ workerId, command, args, env }, nextRestartCount);
        } else if (stoppedWorkers.has(workerId)) {
          console.log(`工具进程 ${workerId} 在重启前被标记为停止,取消重启`);
          // The stop request has now been honoured; drop the marker.
          stoppedWorkers.delete(workerId);
          broadcastToGUI({
            type: 'worker-exited',
            workerId: workerId,
            code: code,
            signal: signal,
            restarting: false,
            restartCount: nextRestartCount
          });
        }
      }, 2000);
    } else if (stoppedWorkers.has(workerId)) {
      // Stopped on purpose: clear the marker.
      console.log(`工具进程 ${workerId} 已停止,清理停止标记`);
      stoppedWorkers.delete(workerId);
    }
  });

  workerProcess.on('error', (err) => {
    console.log(err);
    // A process that failed to spawn has no pid and will not emit 'exit', so
    // clean up here; otherwise the worker would be listed as running forever.
    if (workerProcess.pid === undefined) {
      if (workers.get(workerId) === workerInfo) {
        workers.delete(workerId);
      }
      stoppedWorkers.delete(workerId);
      broadcastToGUI({
        type: 'worker-exited',
        workerId: workerId,
        code: null,
        signal: null,
        restarting: false, // retrying a command that cannot be spawned would loop
        restartCount: restartCount,
        error: err?.message ?? String(err)
      });
    }
  });

  workers.set(workerId, workerInfo);
  // Only index by pid when the process actually started.
  if (workerProcess.pid !== undefined) {
    pidToProcessInfoMap.set(workerProcess.pid, workerInfo);
  }
  // Push the updated worker list to the GUI clients.
  broadcastStatus();
}
/**
 * Stops a worker on request.
 *
 * The id is always added to `stoppedWorkers`, even when no such worker is
 * registered, so that a restart which is already scheduled gets cancelled.
 * GUI clients receive a `worker-disconnected` message, followed by a status
 * broadcast 500 ms later.
 *
 * @param {*} workerId Identifier of the worker to stop.
 */
function stopWorker(workerId) {
  const target = workers.get(workerId);
  // Mark first, regardless of whether the worker exists (guards against races).
  stoppedWorkers.add(workerId);
  console.log(`停止工具进程: ${workerId} (已添加到停止列表)`);

  // Shared tail of both paths: notify the GUI now, refresh the status a bit later.
  const announce = (text) => {
    broadcastToGUI({
      type: 'worker-disconnected',
      workerId: workerId,
      message: text
    });
    setTimeout(() => broadcastStatus(), 500);
  };

  if (target) {
    // Close the worker's connection, then terminate and unregister the process.
    if (target.socket) {
      target.socket.destroy();
    }
    target.process.kill('SIGTERM');
    workers.delete(workerId);
    announce(`工具进程 ${workerId} 已断开`);
    return;
  }

  console.log(`工具进程 ${workerId} 不存在,但已标记为停止(防止重启)`);
  announce(`工具进程 ${workerId} 已停止`);
}
/**
 * Builds a status snapshot for every registered worker.
 *
 * @returns {object[]} One entry per worker, in registration order, with its id,
 *   status, uptime in milliseconds, restart count, runtime storage, command,
 *   arguments, pid and the most recently cached screenshot (or null).
 */
function getWorkersStatus() {
  return Array.from(workers.entries(), ([workerId, info]) => ({
    workerId,
    status: info.status,
    uptime: Date.now() - info.startTime,
    restartCount: info.restartCount || 0,
    runtimeStorage: info.runtimeStorage || {},
    command: info.command,
    args: info.args,
    pid: info.process?.pid,
    // Latest screenshot (typically a data URL or base64 string) and when it was stored.
    screenshot: info.latestScreenshot ?? null,
    screenshotAt: info.latestScreenshotAt ?? null,
  }));
}
/**
 * Sends the current worker status list to every GUI client as a `status` message.
 */
function broadcastStatus() {
  broadcastToGUI({ type: 'status', workers: getWorkersStatus() });
}
/**
 * Delivers a message to every registered GUI client whose socket is not destroyed.
 * A client whose delivery throws is removed from the client set.
 *
 * @param {object} message Payload to send; it goes out with a null `_callbackUuid`.
 */
function broadcastToGUI(message) {
  for (const client of userProcessClients) {
    if (client.destroyed) {
      continue;
    }
    try {
      sendResponse(client, null, message);
    } catch (e) {
      console.error('广播消息失败:', e);
      userProcessClients.delete(client);
    }
  }
}
/**
 * Writes one JSONL line to a client: the response fields plus `_callbackUuid`.
 * Failures are logged and never thrown to the caller.
 *
 * @param {object} socket        Destination connection (anything with `write`).
 * @param {*} _callbackUuid      Correlation id; overrides any `_callbackUuid` in `response`.
 * @param {object} response      Fields to send.
 */
function sendResponse(socket, _callbackUuid, response) {
  try {
    const payload = { ...response, _callbackUuid };
    const jsonLine = `${JSON.stringify(payload)}\n`;
    socket.write(jsonLine);
  } catch (e) {
    console.error('发送响应失败:', e);
  }
}
// Start listening on the IPC socket path and report the outcome over the fd-3
// pipe: DAEMON_READY once listening, DAEMON_FATAL if the server emits an error.
new Promise((resolve, reject) => {
  server.once('error', (err) => {
    // `message` and `name` are not enumerable on Error instances, so a plain
    // JSON.stringify(err) would drop them; copy them explicitly alongside the
    // enumerable fields (code, errno, syscall, address, ...).
    const serializedError = { ...err, name: err?.name, message: err?.message };
    ipcWritePipe.write(
      JSON.stringify({ type: 'DAEMON_FATAL', error: serializedError }),
      // Reject only after the write has been handed off: the rejection is not
      // handled anywhere, and the process may terminate because of it before an
      // in-flight write completes.
      () => reject(err)
    )
  })
  // The callback is registered as a 'listening' listener and receives no
  // arguments; failures are delivered through the 'error' event above.
  server.listen(ipcSocketPath, () => {
    console.log(`守护进程服务器运行在端口 ${ipcSocketPath}`);
    ipcWritePipe.write(
      JSON.stringify({ type: 'DAEMON_READY' }),
      (err) => void err
    )
    resolve(true)
  });
})
/**
 * Graceful shutdown shared by the SIGTERM and SIGINT handlers: terminate every
 * worker, close the server, then exit with code 0.
 *
 * @param {string} signalName Name of the received signal (used in the log line).
 */
function shutdownDaemon(signalName) {
  console.log(`收到${signalName}信号正在关闭所有工具进程...`);
  for (const [workerId, workerInfo] of workers.entries()) {
    // Mark the worker as stopped first; otherwise its exit handler would treat
    // the termination as a crash and schedule an automatic restart.
    stoppedWorkers.add(workerId);
    try {
      workerInfo.process.kill('SIGTERM');
    } catch (e) {
      // One failing worker must not prevent the others from being terminated.
      console.error(`停止工具进程 ${workerId} 失败:`, e);
    }
  }
  // server.close() only completes after every connection has ended, so drop
  // the GUI connections instead of waiting for the clients to disconnect.
  for (const client of userProcessClients) {
    client.destroy();
  }
  // Safety net for connections that are not tracked here: exit anyway after
  // 5 seconds. The timer is unref'd so it never keeps the process alive itself.
  const forceExitTimer = setTimeout(() => process.exit(0), 5000);
  forceExitTimer.unref?.();
  server.close(() => {
    console.log('守护进程已关闭');
    process.exit(0);
  });
}
process.on('SIGTERM', () => shutdownDaemon('SIGTERM'));
process.on('SIGINT', () => shutdownDaemon('SIGINT'));