add @geekgeekrun/pm

This commit is contained in:
geekgeekrun
2025-12-29 23:38:37 +08:00
parent 418ca66a30
commit f4ca2ebbbd
4 changed files with 719 additions and 0 deletions

432
packages/pm/daemon.js Normal file
View File

@@ -0,0 +1,432 @@
// 如果是通过 Electron 运行,禁用 GUI 和 Dock 图标
if (typeof require !== 'undefined') {
try {
const { app } = require('electron');
if (app) {
// 隐藏 Dock 图标macOS
if (process.platform === 'darwin') {
app.dock?.hide();
}
// 防止显示窗口
app.on('ready', () => {
// 不创建任何窗口,保持后台运行
});
// 防止在没有窗口时退出
app.on('window-all-closed', (e) => {
e.preventDefault();
// 不退出应用,保持后台运行
});
}
} catch (e) {
// 不在 Electron 环境中,忽略
}
}
const net = require('net');
const { spawn } = require('child_process');
const path = require('path');
const split2 = require('split2');
const PORT = 12345;
const workers = new Map(); // workerId -> { process, status, restartCount, socket }
const guiClients = new Set(); // GUI客户端连接集合
const stoppedWorkers = new Set(); // 被用户主动停止的workerId集合用于防止竞态条件
// 创建TCP服务器
const server = net.createServer((socket) => {
console.log('客户端已连接');
// 使用 split2 按行分割流式数据,处理 JSONL 格式(每行一个 JSON
// split2 会自动处理 TCP 分包问题,确保每条完整的消息(以换行符结尾)才会触发
const splitStream = split2();
socket.pipe(splitStream).on('data', (line) => {
const trimmedLine = line.toString().trim();
if (!trimmedLine) {
return; // 跳过空行
}
try {
const message = JSON.parse(trimmedLine);
handleMessage(socket, message);
} catch (parseError) {
console.error('解析JSON消息失败:', parseError.message);
console.error('原始数据:', trimmedLine.substring(0, 100)); // 只打印前100个字符
sendResponse(socket, { error: '无效的JSON格式', details: parseError.message });
}
});
splitStream.on('error', (err) => {
console.error('split2 流处理错误:', err);
sendResponse(socket, { error: '流处理失败' });
});
socket.on('error', (err) => {
console.error('Socket错误:', err);
});
socket.on('close', () => {
console.log('客户端已断开连接');
// 清理GUI客户端连接
guiClients.delete(socket);
// 清理工具进程连接
for (const [workerId, workerInfo] of workers.entries()) {
if (workerInfo.socket === socket) {
console.log(`工具进程 ${workerId} 的连接已断开`);
workerInfo.socket = null;
}
}
});
});
// 处理消息
function handleMessage(socket, message) {
console.log('收到消息:', message);
// 工具进程注册消息
if (message.type === 'worker-register') {
const workerId = message.workerId;
// 检查是否在停止列表中(防止竞态条件)
if (stoppedWorkers.has(workerId)) {
console.log(`工具进程 ${workerId} 尝试注册,但已被标记为停止,拒绝注册`);
sendResponse(socket, {
error: `工具进程 ${workerId} 已被停止`,
shouldExit: true // 通知子进程应该退出
});
return;
}
const workerInfo = workers.get(workerId);
if (workerInfo) {
workerInfo.socket = socket;
console.log(`工具进程 ${workerId} 已注册TCP连接`);
sendResponse(socket, {
success: true,
type: 'worker-registered',
message: `工具进程 ${workerId} 连接已注册`
});
// 通知GUI客户端
broadcastToGUI({
type: 'worker-connected',
workerId: workerId,
message: `工具进程 ${workerId} 已连接`
});
} else {
// 如果workerInfo不存在但不在停止列表中可能是进程启动但还未完全初始化
// 这种情况下也拒绝注册,让进程退出
if (!stoppedWorkers.has(workerId)) {
console.log(`工具进程 ${workerId} 尝试注册但workerInfo不存在`);
}
sendResponse(socket, {
error: `工具进程 ${workerId} 未找到`,
shouldExit: true
});
}
return;
}
// 工具进程查询是否应该退出
if (message.type === 'check-should-exit') {
const workerId = message.workerId;
const shouldExit = stoppedWorkers.has(workerId) || !workers.has(workerId);
sendResponse(socket, {
type: 'check-should-exit-response',
workerId: workerId,
shouldExit: shouldExit
});
if (shouldExit) {
console.log(`工具进程 ${workerId} 查询是否应该退出,返回: 是`);
}
return;
}
// 工具进程发送的消息(数据、心跳等)
if (message.type === 'worker-message' || message.type === 'worker-heartbeat' || message.type === 'worker-data') {
const workerId = message.workerId;
const workerInfo = workers.get(workerId);
if (workerInfo && workerInfo.socket === socket) {
// 转发工具进程消息到GUI客户端
broadcastToGUI({
type: 'worker-message',
workerId: workerId,
data: message.data || message,
timestamp: Date.now()
});
// 如果是心跳,更新最后心跳时间
if (message.type === 'worker-heartbeat') {
workerInfo.lastHeartbeat = Date.now();
}
} else {
sendResponse(socket, { error: '未注册的工具进程连接' });
}
return;
}
// GUI客户端的控制消息
// 标记为GUI客户端
if (!guiClients.has(socket)) {
guiClients.add(socket);
}
switch (message.type) {
case 'start-worker':
startWorker(message.workerId);
sendResponse(socket, {
success: true,
message: `工具进程 ${message.workerId} 已启动`,
workerId: message.workerId
});
break;
case 'stop-worker':
stopWorker(message.workerId);
sendResponse(socket, {
success: true,
message: `工具进程 ${message.workerId} 已停止`,
workerId: message.workerId
});
break;
case 'get-status':
const status = getWorkersStatus();
sendResponse(socket, {
success: true,
type: 'status',
workers: status
});
break;
default:
sendResponse(socket, { error: '未知的消息类型' });
}
}
// 启动工具进程
function startWorker(workerId, restartCount = 0) {
if (workers.has(workerId)) {
console.log(`工具进程 ${workerId} 已在运行`);
return;
}
console.log(`启动工具进程: ${workerId}${restartCount > 0 ? ` (重启第${restartCount}次)` : ''}`);
// 使用 Electron 可执行程序路径,从环境变量获取,如果没有则回退到 node
const electronPath = process.env.ELECTRON_EXEC_PATH || 'node';
console.log(`使用执行程序路径: ${electronPath}`);
// 添加参数使工具进程在后台运行,不显示 UI
const workerProcess = spawn(electronPath, [
'--no-sandbox',
'--disable-gpu',
'--disable-dev-shm-usage',
path.join(__dirname, 'worker.js'),
`--worker-id=${workerId}`,
`--restart-count=${restartCount.toString()}`
], {
stdio: ['ignore', 'pipe', 'pipe'],
env: {
...process.env,
ELECTRON_EXEC_PATH: electronPath // 继续传递给子进程(如果需要)
}
});
let output = '';
workerProcess.stdout.on('data', (data) => {
output += data.toString();
console.log(`工具进程 ${workerId} 输出:`, data.toString().trim());
});
workerProcess.stderr.on('data', (data) => {
console.error(`工具进程 ${workerId} 错误:`, data.toString().trim());
});
workerProcess.on('exit', (code, signal) => {
console.log(`工具进程 ${workerId} 退出,代码: ${code}, 信号: ${signal}`);
const workerInfo = workers.get(workerId);
if (workerInfo) {
// 关闭工具进程的TCP连接
if (workerInfo.socket) {
workerInfo.socket.destroy();
}
const shouldRestart = code !== 0 // && code !== null;
// 使用当前的 restartCount 加1而不是从 workerInfo 中取(因为可能已经被删除)
const restartCount = (workerInfo.restartCount || 0) + 1;
workers.delete(workerId);
// 通知GUI客户端工具进程已退出
broadcastToGUI({
type: 'worker-exited',
workerId: workerId,
code: code,
signal: signal,
restarting: shouldRestart && !stoppedWorkers.has(workerId),
restartCount: restartCount
});
// 如果进程意外退出(非正常停止),且不在停止列表中,自动重启
if (shouldRestart && !stoppedWorkers.has(workerId)) {
console.log(`工具进程 ${workerId} 意外退出,准备重启 (第${restartCount}次)`);
// 延迟重启,避免频繁重启
setTimeout(() => {
// 再次检查确保worker不在停止列表中且当前没有运行
if (!workers.has(workerId) && !stoppedWorkers.has(workerId)) {
startWorker(workerId, restartCount);
} else if (stoppedWorkers.has(workerId)) {
console.log(`工具进程 ${workerId} 在重启前被标记为停止,取消重启`);
// 从停止列表中移除,因为已经处理完毕
stoppedWorkers.delete(workerId);
}
}, 2000);
} else if (stoppedWorkers.has(workerId)) {
// 如果是在停止列表中,清理标记
console.log(`工具进程 ${workerId} 已停止,清理停止标记`);
stoppedWorkers.delete(workerId);
}
} else {
// 如果workerInfo不存在可能是已经被stopWorker删除
// 检查停止列表,如果在则清理
if (stoppedWorkers.has(workerId)) {
stoppedWorkers.delete(workerId);
}
}
});
workers.set(workerId, {
process: workerProcess,
status: 'running',
startTime: Date.now(),
restartCount, // 使用传入的重启次数
socket: null, // 工具进程的TCP连接稍后由工具进程注册
lastHeartbeat: null
});
// 定期发送状态更新
broadcastStatus();
}
// 停止工具进程
function stopWorker(workerId) {
const workerInfo = workers.get(workerId);
// 无论workerInfo是否存在都添加到停止列表防止竞态条件
stoppedWorkers.add(workerId);
console.log(`停止工具进程: ${workerId} (已添加到停止列表)`);
if (!workerInfo) {
console.log(`工具进程 ${workerId} 不存在,但已标记为停止(防止重启)`);
// 通知GUI客户端
broadcastToGUI({
type: 'worker-disconnected',
workerId: workerId,
message: `工具进程 ${workerId} 已停止`
});
// 延迟发送状态更新
setTimeout(() => broadcastStatus(), 500);
return;
}
// 关闭工具进程的TCP连接
if (workerInfo.socket) {
workerInfo.socket.destroy();
}
workerInfo.process.kill('SIGTERM');
workers.delete(workerId);
// 通知GUI客户端
broadcastToGUI({
type: 'worker-disconnected',
workerId: workerId,
message: `工具进程 ${workerId} 已断开`
});
// 延迟发送状态更新
setTimeout(() => broadcastStatus(), 500);
}
// 获取所有工具进程状态
function getWorkersStatus() {
const status = [];
for (const [workerId, workerInfo] of workers.entries()) {
status.push({
workerId,
status: workerInfo.status,
uptime: Date.now() - workerInfo.startTime,
restartCount: workerInfo.restartCount || 0,
connected: workerInfo.socket !== null && !workerInfo.socket.destroyed,
lastHeartbeat: workerInfo.lastHeartbeat
});
}
return status;
}
// 广播状态更新给所有GUI客户端
function broadcastStatus() {
const status = getWorkersStatus();
broadcastToGUI({
type: 'status',
workers: status
});
}
// 广播消息给所有GUI客户端
function broadcastToGUI(message) {
guiClients.forEach(socket => {
if (!socket.destroyed) {
try {
sendResponse(socket, message);
} catch (e) {
console.error('广播消息失败:', e);
guiClients.delete(socket);
}
}
});
}
// 发送响应
function sendResponse(socket, response) {
try {
socket.write(JSON.stringify(response) + '\n');
} catch (e) {
console.error('发送响应失败:', e);
}
}
// 启动服务器
server.listen(PORT, () => {
console.log(`守护进程服务器运行在端口 ${PORT}`);
});
// 优雅关闭
process.on('SIGTERM', () => {
console.log('收到SIGTERM信号正在关闭所有工具进程...');
for (const [workerId, workerInfo] of workers.entries()) {
workerInfo.process.kill('SIGTERM');
}
server.close(() => {
console.log('守护进程已关闭');
process.exit(0);
});
});
process.on('SIGINT', () => {
console.log('收到SIGINT信号正在关闭所有工具进程...');
for (const [workerId, workerInfo] of workers.entries()) {
workerInfo.process.kill('SIGTERM');
}
server.close(() => {
console.log('守护进程已关闭');
process.exit(0);
});
});

25
packages/pm/package.json Normal file
View File

@@ -0,0 +1,25 @@
{
"name": "@geekgeekrun/pm",
"version": "0.0.1",
"description": "Electron多进程架构",
"main": "main.js",
"scripts": {
"start:daemon": "electron ./daemon",
"dev:daemon": "electron ./daemon --dev",
"start:worker": "electron ./worker",
"dev:worker": "electron ./worker --dev"
},
"keywords": [
"electron",
"multi-process"
],
"author": "",
"license": "MIT",
"devDependencies": {
"electron": "28.2.0"
},
"dependencies": {
"JSONStream": "^1.3.5",
"split2": "^4.2.0"
}
}

244
packages/pm/worker.js Normal file
View File

@@ -0,0 +1,244 @@
// 如果是通过 Electron 运行,禁用 GUI 和 Dock 图标
if (typeof require !== 'undefined') {
try {
const { app } = require('electron');
if (app) {
// 隐藏 Dock 图标macOS
if (process.platform === 'darwin') {
app.dock?.hide();
}
// 防止显示窗口
app.on('ready', () => {
// 不创建任何窗口,保持后台运行
});
// 防止在没有窗口时退出
app.on('window-all-closed', (e) => {
e.preventDefault();
// 不退出应用,保持后台运行
});
}
} catch (e) {
// 不在 Electron 环境中,忽略
}
}
// 工具进程示例
const net = require('net');
const split2 = require('split2');
// 解析命令行参数
let workerId = 'unknown';
let restartCount = 0;
for (let i = 2; i < process.argv.length; i++) {
const arg = process.argv[i];
if (arg.startsWith('--worker-id=')) {
workerId = arg.split('=')[1] || 'unknown';
} else if (arg.startsWith('--restart-count=')) {
restartCount = parseInt(arg.split('=')[1] || '0', 10);
}
}
console.log(`工具进程 ${workerId} 已启动 (PID: ${process.pid})${restartCount > 0 ? `,这是第${restartCount}次重启` : ''}`);
const DAEMON_PORT = 12345;
let daemonSocket = null;
let reconnectTimer = null;
let isShuttingDown = false;
// 连接到守护进程
function connectToDaemon() {
if (isShuttingDown) return;
daemonSocket = new net.Socket();
daemonSocket.connect(DAEMON_PORT, 'localhost', () => {
console.log(`[工具进程 ${workerId}] 已连接到守护进程`);
// 注册工具进程连接
sendToDaemon({
type: 'worker-register',
workerId: workerId
});
// 连接成功后立即检查是否应该退出
setTimeout(() => {
checkShouldIExit();
}, 500);
});
// 使用 split2 按行分割流式数据,处理 JSONL 格式(每行一个 JSON
// split2 会自动处理 TCP 分包问题,确保每条完整的消息(以换行符结尾)才会触发
const splitStream = split2();
daemonSocket.pipe(splitStream).on('data', (line) => {
const trimmedLine = line.toString().trim();
if (!trimmedLine) {
return; // 跳过空行
}
try {
const message = JSON.parse(trimmedLine);
handleDaemonMessage(message);
} catch (parseError) {
console.error(`[工具进程 ${workerId}] 解析JSON消息失败:`, parseError.message);
console.error('原始数据:', trimmedLine.substring(0, 100)); // 只打印前100个字符
}
});
splitStream.on('error', (err) => {
console.error(`[工具进程 ${workerId}] split2 流处理错误:`, err);
});
daemonSocket.on('error', (err) => {
console.error(`[工具进程 ${workerId}] 守护进程连接错误:`, err.message);
daemonSocket = null;
// 尝试重连(如果不是正在关闭)
if (!isShuttingDown) {
if (reconnectTimer) clearTimeout(reconnectTimer);
reconnectTimer = setTimeout(() => {
console.log(`[工具进程 ${workerId}] 尝试重新连接守护进程...`);
connectToDaemon();
}, 2000);
}
});
daemonSocket.on('close', () => {
console.log(`[工具进程 ${workerId}] 守护进程连接已关闭`);
daemonSocket = null;
// 尝试重连(如果不是正在关闭)
if (!isShuttingDown) {
if (reconnectTimer) clearTimeout(reconnectTimer);
reconnectTimer = setTimeout(() => {
console.log(`[工具进程 ${workerId}] 尝试重新连接守护进程...`);
connectToDaemon();
}, 2000);
}
});
}
// 发送消息到守护进程
function sendToDaemon(message) {
if (daemonSocket && !daemonSocket.destroyed) {
try {
daemonSocket.write(JSON.stringify(message) + '\n');
} catch (e) {
console.error(`[工具进程 ${workerId}] 发送消息失败:`, e);
}
} else {
console.warn(`[工具进程 ${workerId}] 守护进程未连接,消息已丢弃:`, message.type);
}
}
// 处理守护进程消息
function handleDaemonMessage(message) {
if (message.type === 'worker-registered') {
console.log(`[工具进程 ${workerId}] 连接已注册到守护进程`);
} else if (message.type === 'check-should-exit-response') {
// 处理是否应该退出的查询响应
if (message.shouldExit) {
console.log(`[工具进程 ${workerId}] 守护进程指示应该退出,正在退出...`);
shutdown();
}
} else if (message.error) {
console.error(`[工具进程 ${workerId}] 守护进程错误:`, message.error);
// 如果守护进程要求退出(例如:已被标记为停止)
if (message.shouldExit) {
console.log(`[工具进程 ${workerId}] 守护进程要求退出,正在退出...`);
shutdown();
}
}
}
// 检查是否应该退出
function checkShouldIExit() {
if (isShuttingDown) return;
if (!daemonSocket || daemonSocket.destroyed) {
// 如果守护进程未连接,暂时不检查,等待重连
return;
}
sendToDaemon({
type: 'check-should-exit',
workerId: workerId
});
}
// 初始连接
connectToDaemon();
// 模拟工作负载
let counter = 0;
const interval = setInterval(() => {
counter++;
console.log(`[工具进程 ${workerId}] 运行中... 计数: ${counter}`);
// 发送工作数据到守护进程
sendToDaemon({
type: 'worker-data',
workerId: workerId,
data: {
counter: counter,
timestamp: Date.now(),
message: `工具进程 ${workerId} 工作数据`
}
});
// 模拟随机错误(用于测试自动重启功能)
// 取消下面的注释来测试自动重启
// if (Math.random() < 0.001) {
// console.error(`[工具进程 ${workerId}] 模拟错误,退出`);
// process.exit(1);
// }
}, 2000);
// 处理退出信号
function shutdown() {
if (isShuttingDown) return;
isShuttingDown = true;
clearInterval(interval);
if (reconnectTimer) clearTimeout(reconnectTimer);
if (daemonSocket && !daemonSocket.destroyed) {
daemonSocket.destroy();
}
console.log(`[工具进程 ${workerId}] 正在退出...`);
process.exit(0);
}
process.on('SIGTERM', () => {
console.log(`[工具进程 ${workerId}] 收到SIGTERM信号正在退出...`);
shutdown();
});
process.on('SIGINT', () => {
console.log(`[工具进程 ${workerId}] 收到SIGINT信号正在退出...`);
shutdown();
});
// 处理未捕获的异常
process.on('uncaughtException', (error) => {
console.error(`[工具进程 ${workerId}] 未捕获的异常:`, error);
clearInterval(interval);
if (daemonSocket && !daemonSocket.destroyed) {
daemonSocket.destroy();
}
process.exit(1);
});
// 定期发送心跳到守护进程,并检查是否应该退出
setInterval(() => {
sendToDaemon({
type: 'worker-heartbeat',
workerId: workerId,
timestamp: Date.now()
});
// 每次心跳时也检查是否应该退出
checkShouldIExit();
}, 5000);

18
pnpm-lock.yaml generated
View File

@@ -63,6 +63,19 @@ importers:
specifier: ^3.3.2
version: 3.3.2
packages/pm:
dependencies:
JSONStream:
specifier: ^1.3.5
version: 1.3.5
split2:
specifier: ^4.2.0
version: 4.2.0
devDependencies:
electron:
specifier: 28.2.0
version: 28.2.0
packages/run-core-of-geek-auto-start-chat-with-boss:
dependencies:
'@geekgeekrun/dingtalk-plugin':
@@ -5947,6 +5960,11 @@ packages:
engines: {node: '>=0.10.0'}
dev: false
/split2@4.2.0:
resolution: {integrity: sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==}
engines: {node: '>= 10.x'}
dev: false
/sprintf-js@1.1.3:
resolution: {integrity: sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==}
requiresBuild: true