Node.js Cluster with Workers
JS
S
JavaScript — Creating a new V8 instance on every processor core lets the application take advantage of all the available processing power. Each process in the cluster runs as either the master or a worker. Because workers are created through Node's fork() API, a communication (IPC) channel is available between the master process and each worker.
1const cluster = require('cluster');
2const os = require('os');
// Every process (the master and each forked worker) executes this file, so
// label the PID with the role of the process that is actually printing it.
console.log(`${cluster.isMaster ? 'Master' : 'Worker'} PID: ${process.pid}`);

/*
 Availability Strategy
 kill -SIGUSR2 PID // send the signal to the cluster (master) process
*/

// Worker Processes Fork Factory
// --------------------------------------------
// Count of 'listening' events received from workers so far.
let workers = 0;
// Number of CPU entries reported by the OS; one worker is forked per entry.
const cpus = os.cpus().length;
// Counter handed out (and incremented) by numberOfUsersInDB().
let totalUsers = 50;
// Array snapshot of cluster.workers, used for index-based rolling restarts.
let workersArray = [];
15
if (cluster.isMaster) {
  // Master: fork one worker per CPU entry.
  console.log(`Forking for ${cpus} CPUs`);
  for (let i = 0; i < cpus; i++) {
    cluster.fork();
  }

  // Availability strategy: replace a worker that exited abnormally.
  // Workers that exited after an intentional disconnect are NOT replaced
  // here, so a deliberate disconnect does not trigger an extra fork.
  cluster.on('exit', (worker, code) => {
    if (code !== 0 && !worker.exitedAfterDisconnect) {
      console.log(`Worker ${worker.id} crashed. Starting a new worker...`);
      cluster.fork();
    }
  });
} else {
  // Worker: run the actual server code.
  require('./server');
}
32
// Master - Workers Communication Hub
// Wait until all workers are ready (listening)
// --------------------------------------------
cluster.on('listening', () => {
  workers += 1;

  // Compare (strictly) against `cpus`, the same count that was used to fork
  // the workers. The branch below runs only when the counter hits it exactly.
  if (workers !== cpus) return;

  // Copy the worker references into an array (easy indexing).
  workersArray = Object.values(cluster.workers);
  broadCastToAllWorkers();

  // Push the cached value to the workers every 5 seconds.
  setInterval(() => {
    updateWorkersWithCachedValue();
  }, 5000);
});
46
/**
 * Send a `Worker <pid>` greeting string to every connected worker.
 *
 * Workers whose IPC channel is already closed are skipped: a worker can
 * still be listed in `cluster.workers` after it has disconnected, and
 * sending to it would raise an IPC error in the master.
 */
const broadCastToAllWorkers = function () {
  Object.values(cluster.workers)
    .filter((worker) => worker.isConnected())
    .forEach((worker) => {
      worker.send(`Worker ${worker.process.pid}`);
    });
};
52
/**
 * Return the current value of `totalUsers`, then increment it.
 * (Presumably a placeholder for a real database count query.)
 *
 * @returns {number} the value `totalUsers` held before this call.
 */
const numberOfUsersInDB = function () {
  const current = totalUsers;
  totalUsers += 1;
  return current;
};
56
/**
 * Fetch the users count once in the master and push it to every connected
 * worker as a `{ usersCount }` message.
 *
 * Workers whose IPC channel is already closed are skipped, because sending
 * to them would raise an IPC error in the master.
 */
const updateWorkersWithCachedValue = () => {
  const usersCount = numberOfUsersInDB();
  Object.values(cluster.workers)
    .filter((worker) => worker.isConnected())
    .forEach((worker) => {
      worker.send({ usersCount });
    });
};
63
64
// Intentional Restart / Server Code Update with 0 Downtime
// (1 worker at a time, to keep 100% availability)
// --------------------------------------------
process.on('SIGUSR2', () => {
  // Workers execute this file too; only the master owns the worker list.
  if (!cluster.isMaster) return;

  // Re-snapshot the current workers. The array captured at startup goes stale
  // as soon as a worker is replaced (crash respawn or a previous rolling
  // restart), and restarting from stale entries would stall the chain.
  workersArray = Object.values(cluster.workers);

  console.log('SIGUSR2 RECEIVED', workersArray);
  restartWorker(0); // start with the first indexed worker
});
72
/**
 * Recursively restart the workers in `workersArray` one by one.
 *
 * The worker at `workerIndex` is disconnected; once it has exited, a
 * replacement is forked, and only when that replacement is listening does
 * the function move on to the next index. The recursion ends at the first
 * index that has no entry.
 *
 * @param {number} workerIndex - index into `workersArray` of the worker to restart.
 */
const restartWorker = (workerIndex) => {
  const worker = workersArray[workerIndex];
  if (!worker) return;

  // An already-dead worker will never emit 'exit' again, so waiting on it
  // would stall the whole chain. Skip it and continue with the next one.
  if (worker.isDead()) {
    restartWorker(workerIndex + 1);
    return;
  }

  // Current worker disconnection successful
  worker.once('exit', () => {
    if (!worker.exitedAfterDisconnect) return; // Guard: not caused by SIGUSR2
    console.log(`>> Exited process ${worker.process.pid}`);

    // Create a new worker and, once it is listening, restart the next one.
    cluster.fork().once('listening', () => {
      restartWorker(workerIndex + 1);
    });
  });

  worker.disconnect(); // disconnect the existing worker
};
// Created on 2/27/2019