NodeJS Basic Load Balancer (using NodeJS Cluster Module)

JS
S
JavaScript

This is a plain, simple example of the internals of PM2 / ForeverJS / similar Node modules. It uses a round-robin strategy to distribute the load across several workers (one per CPU).

1// load_balancer.js
2// ------------------------------------------------------------------------------------------
3const cluster = require('cluster');
4const os = require('os');
// This file is executed by the master and by every forked worker, so only
// the master may announce itself as such.
if (cluster.isMaster) {
    console.log(`Master PID: ${process.pid}`);
}
/*
    A new V8 instance (worker process) is created for every processor core
    to take advantage of all the available processing power.
    Each process in the cluster runs as either the master or a worker.
    Because workers are created through Node's fork() API, a communication
    channel is available between the master process and each worker.

    Availability Strategy
    kill -SIGUSR2 PID // send the signal to the cluster (master) process
*/
15
// Worker Processes Fork Factory
// --------------------------------------------
let workers = 0; // count of 'listening' events received from workers
const cpus = os.cpus().length;
let totalUsers = 50; // simulated value handed out by numberOfUsersInDB()
let workersArray = []; // worker references used by the rolling restart

if (cluster.isMaster) {
    console.log(`Forking for ${cpus} CPUs`);
    for (let i = 0; i < cpus; i++) {
        cluster.fork();
    }

    // Availability Strategy: replace any worker that dies unexpectedly.
    cluster.on('exit', (worker, code) => {
        // A worker the cluster disconnected on purpose (exitedAfterDisconnect)
        // or one that exited cleanly (code 0) must not be replaced here.
        if (code !== 0 && !worker.exitedAfterDisconnect) {
            console.log(`Worker ${worker.id} crashed. Starting a new worker...`);
            cluster.fork();
        }
    });
} else {
    // Worker processes run the actual HTTP server.
    require('./server');
}
39
// Master - Workers Communication Hub
// Wait until all workers are ready (listening)
// --------------------------------------------
cluster.addListener('listening', () => {
    workers++;
    // Strict comparison against the CPU count already cached in `cpus`.
    if (workers === cpus) {
        workersArray = Object.values(cluster.workers); // copy references to array (easy indexing)
        broadCastToAllWorkers();
        // Push a fresh cached value to every worker every 5 seconds.
        setInterval(() => {
            updateWorkersWithCachedValue();
        }, 5000);
    }
});
53
/**
 * Sends every worker currently registered in `cluster.workers` a string
 * message of the form "Worker <pid>", where <pid> is that worker's own
 * process id.
 */
const broadCastToAllWorkers = () => {
    for (const worker of Object.values(cluster.workers)) {
        worker.send(`Worker ${worker.process.pid}`);
    }
};
59
/**
 * Returns the current value of `totalUsers` and then increments it, so each
 * call yields a value one higher than the previous call.
 *
 * @returns {number} the value of `totalUsers` before the increment
 */
const numberOfUsersInDB = () => {
    const current = totalUsers;
    totalUsers += 1;
    return current;
};
63
/**
 * Reads one value from numberOfUsersInDB() and sends it to every worker in
 * `cluster.workers` as a `{ usersCount }` message, so all workers receive
 * the same number for this round.
 */
const updateWorkersWithCachedValue = () => {
    const usersCount = numberOfUsersInDB();
    for (const worker of Object.values(cluster.workers)) {
        worker.send({ usersCount });
    }
};
70
71
// Intentional Restart / Server Code Update with 0 Downtime
// (workers are cycled one at a time to keep 100% availability)
// --------------------------------------------
process.on('SIGUSR2', () => {
    // cluster.workers only exists in the master process; a worker that
    // receives the signal has nothing to restart.
    if (!cluster.isMaster) return;

    // Take a fresh snapshot of the workers alive right now. The array filled
    // at startup goes stale after a crash respawn or a previous rolling
    // restart, and disconnecting an already-exited worker never emits 'exit',
    // which would stall the restart chain.
    workersArray = Object.values(cluster.workers);
    console.log('SIGUSR2 RECEIVED', workersArray);
    restartWorker(0); // start with the first indexed worker
});

/**
 * Restarts the workers in `workersArray` one by one: disconnects the worker
 * at `workerIndex`, forks a replacement once it has exited, and moves on to
 * the next index only after the replacement is listening.
 *
 * @param {number} workerIndex - index into `workersArray` of the worker to restart
 */
const restartWorker = (workerIndex) => {
    const worker = workersArray[workerIndex];
    if (!worker) return; // past the end of the snapshot: all workers handled

    // A worker that died after the snapshot was taken will never emit 'exit'
    // again, so skip it instead of waiting forever.
    if (worker.isDead()) {
        restartWorker(workerIndex + 1);
        return;
    }

    // Current worker disconnection successful
    worker.once('exit', () => {
        if (!worker.exitedAfterDisconnect) return; // Guard: not caused by SIGUSR2
        console.log(`>> Exited process ${worker.process.pid}`);

        // Create a new worker and, once it is listening, restart the next one
        cluster.fork().once('listening', () => {
            restartWorker(workerIndex + 1);
        });
    });

    worker.disconnect(); // disconnect the existing worker
};
98
99// server.js
100// ------------------------------------------------------------------------------------------
101const http = require('http');
102const pid = process.pid;
103
// Last value received from the master; stays undefined until the first
// message carrying `usersCount` arrives.
let usersCount;

// Every request burns some CPU, then reports which process handled it and
// the value currently cached in `usersCount`.
http.createServer((req, res) => {
    for (let i = 0; i < 1e7; i++) {
        // simulate CPU work (intentionally empty)
    }
    res.write(`Handled by process ${pid}.`);
    res.end(`Cached value: ${usersCount}`);
}).listen(7000, () => {
    console.log(`Started process ${pid}`);
});
112
// Communication Channel (from the fork() API for cluster processes)
// Logs every message and stores `usersCount` when the message carries one.
process.on('message', (msg) => {
    console.log(`Message received from master: ${JSON.stringify(msg)}`);
    // Explicit checks instead of truthiness: a count of 0 is a valid value
    // and must be cached, and a null message must not throw.
    if (msg !== null && typeof msg === 'object' && msg.usersCount !== undefined) {
        usersCount = msg.usersCount;
    }
});
121
122// Crash Simulation (Availability Simulation)
123// setTimeout(()=> {
124//     process.exit(1);
125// },3000);
126
127
128

Created on 8/3/2017