// load_balancer.js
// ------------------------------------------------------------------------------------------
const cluster = require('cluster');
const os = require('os');

// This file is evaluated by the master and by every forked worker (see the
// master/worker branch below), so only the master announces itself as such.
if (cluster.isMaster) {
  console.log(`Master PID: ${process.pid}`);
}
/*
 Creates a new V8 instance (a worker process) on every processor core
 to take advantage of all the available processing power.
 Each process in the cluster runs either as the master or as a worker.
 Because workers are created with Node's fork() API, a communication channel is available between the master process and each worker.

 Availability Strategy
 kill -SIGUSR2 PID // send the signal to the cluster (master) process
*/
15
// Worker Processes Fork Factory
// --------------------------------------------
// Number of 'listening' events seen so far from workers.
let workers = 0;
// One worker is forked per entry reported by os.cpus().
const cpus = os.cpus().length;
// Stand-in for a database count; incremented every time it is read.
let totalUsers = 50;
// Snapshot of the cluster's worker objects, used for index-based rolling restarts.
let workersArray = [];
22
if (cluster.isMaster) {
  // Master: fork one worker per CPU.
  console.log(`Forking for ${cpus} CPUs`);
  for (let i = 0; i < cpus; i++) {
    cluster.fork();
  }

  // Availability Strategy: replace any worker that exits unexpectedly.
  cluster.on('exit', (worker, code) => {
    // A zero exit code, or an exit that follows a disconnect requested by the
    // cluster, is intentional: do not fork a replacement for those.
    if (code !== 0 && !worker.exitedAfterDisconnect) {
      console.log(`Worker ${worker.id} crashed. Starting a new worker...`);
      cluster.fork();
    }
  });
} else {
  // Worker: load the server module.
  require('./server');
}
39
// Master - Workers Communication Hub
// Wait until all workers are ready (listening)
// --------------------------------------------
// Counts 'listening' events. When the count reaches the CPU count, it snapshots
// the current workers, greets each of them, and starts pushing the cached value
// every 5 seconds. The strict equality makes this happen only once: later
// 'listening' events push the counter past `cpus`.
cluster.on('listening', () => {
  workers++;
  if (workers === cpus) {
    workersArray = Object.values(cluster.workers); // copy references to array (easy indexing)
    broadCastToAllWorkers();
    setInterval(() => {
      updateWorkersWithCachedValue();
    }, 5000);
  }
});
53
// Sends every connected worker a greeting string containing its own PID.
const broadCastToAllWorkers = function () {
  Object.values(cluster.workers).forEach((worker) => {
    // A worker whose IPC channel is already closed (e.g. one that is being
    // restarted) cannot receive messages, so it is skipped.
    if (!worker.isConnected()) return;
    worker.send(`Worker ${worker.process.pid}`);
  });
};
59
// Stand-in for a database query: returns the current `totalUsers` value and
// then increments it, so each call reports one more user than the previous one.
const numberOfUsersInDB = function () {
  return totalUsers++;
};
63
// Reads the users count once and pushes it to every connected worker as
// `{ usersCount }`, so workers can serve the value without computing it.
const updateWorkersWithCachedValue = () => {
  const usersCount = numberOfUsersInDB();
  Object.values(cluster.workers).forEach((worker) => {
    // This runs on a timer and can fire while a worker is being restarted;
    // a worker whose IPC channel is already closed is skipped.
    if (!worker.isConnected()) return;
    worker.send({ usersCount });
  });
};
70
71
// Intentional Restart / Server Code Update with 0 Downtime
// (1 at a time to have 100% availability)
// --------------------------------------------
process.on('SIGUSR2', () => {
  // Re-read the live workers on every signal: an earlier restart or a crash
  // replacement leaves a previously taken snapshot pointing at exited workers.
  workersArray = Object.values(cluster.workers);
  console.log('SIGUSR2 RECEIVED', workersArray);
  restartWorker(0); // start with the first indexed worker
});
79
// Recursive function to restart all workers one by one.
// `workerIndex` is the position in `workersArray` of the worker to restart;
// the chain ends when the index runs past the end of the array.
const restartWorker = (workerIndex) => {
  const worker = workersArray[workerIndex];
  if (!worker) return;

  // A worker that is no longer connected (already exited or disconnected)
  // cannot be disconnected again; move on to the next one.
  if (!worker.isConnected()) {
    restartWorker(workerIndex + 1);
    return;
  }

  // Current worker disconnection successful
  worker.once('exit', () => {
    if (!worker.exitedAfterDisconnect) return; // Guard: Not caused by -SIGUSR2
    console.log(`>> Exited process ${worker.process.pid}`);

    // Create a new worker and, once it is listening, restart the next one
    cluster.fork().once('listening', () => {
      restartWorker(workerIndex + 1);
    });
  });

  worker.disconnect(); // disconnect the existing worker
};
98
// server.js
// ------------------------------------------------------------------------------------------
const http = require('http');
const pid = process.pid;

// Last users count received through the 'message' handler below;
// undefined until the first such message arrives.
let usersCount;

// Every request burns some CPU, then reports which process handled it
// together with the cached users count.
http.createServer((req, res) => {
  for (let i = 0; i < 1e7; i++); // simulate CPU work
  res.write(`Handled by process ${pid}.`);
  res.end(`Cached value: ${usersCount}`);
}).listen(7000, () => {
  console.log(`Started process ${pid}`);
});
112
// Communication Channel (from the fork() API for cluster processes)
// Logs every message and, when the message is an object carrying a
// `usersCount` field, stores that value as the cached users count.
process.on('message', (msg) => {
  const stringified = JSON.stringify(msg);
  console.log(`Message received from master: ${stringified}`);
  // Messages may also be plain strings or null. The explicit `undefined` check
  // (rather than truthiness) keeps a legitimate count of 0 from being dropped.
  if (msg !== null && typeof msg === 'object' && msg.usersCount !== undefined) {
    usersCount = msg.usersCount;
  }
});
121
// Crash Simulation (Availability Simulation)
// setTimeout(() => {
//   process.exit(1);
// }, 3000);
126
127
128
// Created on 8/3/2017