[recovery mode] Cluster + EXPRESS + socket.io без REDIS
Это конечно все хорошо, но не стоит забывать о том, что Node.js работает в одном потоке. Для этого у нас есть отличный инструмент Cluset.
Передо мной встал вопрос, а можно ли сделать приложения с кластеризацией, но не использовать для этого REDIS? Также для удобства используем EXPRESS.
Создадим простой пример (пока без кластера): Пример из официальной документации.
var app = require('express')();
var server = require('http').Server(app);
var io = require('socket.io')(server);
server.listen(3038);
app.get('/', function (req, res) {
res.sendfile(__dirname + '/index.html');
});
io.on('connection', function (socket) {
socket.emit('news', { hello: 'world' });
socket.on('my other event', function (data) {
console.log(data);
});
});
Пока ничего сложного, все понятно, вешаем сервер на порт 3038 с express и soket.io. Клиентский код будет неизменным:
Test socket.io
Немного пояснений по клиентскому коду. Создаем объект soket с настройками (порт 3038, таймаут подключения 10 мс). В случае ошибок подключения более 5 раз, отключаем работу soket, если менее, то обнуляем счетчик. А также подключаемся к комнате news.
Следующим этапом будет создание кластера. Т.к. сокет нельзя создать на одном порту, то каждый новый worker будет создавать soket.io на новом порту:
var cluster = require('cluster');//Включаем cluster
var cpuCount = require('os').cpus().length;//Количество ядер процессора
var io = [];
//В мастере создаем worker'ов равное количеству ядер процессоров
if (cluster.isMaster) {
for (var i = 0; i < cpuCount; i += 1) {
var worker = cluster.fork();
}
}
//В воркере
if (cluster.isWorker) {
var worker_id = cluster.worker.id;
var express = require('express');
var app = express();
var server = require('http').Server(app);
io[worker_id] = require('socket.io')(server);
server.listen(3030+worker_id);
io[worker_id].on('connection', function (socket) {
console.log( socket.id );
console.log( "WORKER ID :"+worker_id );
socket.emit('news', { hello: 'world' });
socket.on('my other event', function (data) {
console.log(data);
});
});
var app_express = express();
app_express.listen(8081);
app_express.use(express.static('public'));//отдаем статичные данные (см. код клиента)
app_express.get('/', function (request, response) {
response.send('Hello from Worker '+worker_id);
console.log( '------' );
});
app_express.get('/get_port', function (request, response) {
response.send(3030+worker_id);
console.log( 'get_port' );
});
}
app_express.get('/api', function (request, response) {
response.setHeader('Content-Type', 'application/json');
var id = request.param('id');
var msg = request.param('msg');
var port = request.param('port');
var JSON_DATA = {
"worker_id":worker_id
,"id":id
,"msg":msg
,"port":port
};
});
Схематично покажем как все это выглядит:
Когда мы открываем в браузере localhost:8081 cluster сервирует нас на worker по своему правилу. У нас есть вызов:
app_express.get('/api', function (request, response){...}
В этом вызове при открытии localhost:8081/api? msg=test&port=3032&id=100500 мы хотим передать клиенту подключенному по порту 3032 id=100500 сообщение с msg=teest.
Самый простой способ, создать клиента и открыть soket.io по определенному порту
var http = require('http');
var client = http.createClient(3032 , "localhost");
request = client.request();
request.on('response', function( res ) {
res.on('data', function( data ) {
console.log( data );
} );
} );
request.end();
Но к примеру нам нужно отправить broadcast сообщение, и нам неизвестен порт на котором сервируется клиента. Проблема в следующем, мы достоверно не знаем что при открытии localhost:8081/api? msg=test&port=3032&id=100500 мы будем сервироваться на worker 2.
На рисунке показано что при отправки broadcast сообщения с worker1 мы не имеем возможности отправить на прямую клиентам soket.io 2, soket.io 3, soket.io 4, soket.io…
Для решения этой задачи мы можем воспользоваться методами в cluster. В cluster’e мы можем передавать сообщения из master → worker и из worker → master. Для наглядности изобразим это схематично.
Пример работы с отправкой master → worker и worker → master представлен ниже:
// Кластерное приложение
var cluster = require('cluster');
var io = [];
var cpuCount = require('os').cpus().length;
var workers = [];
if (cluster.isMaster) {
// Count the machine's CPUs
// Create a worker for each CPU
for (var i = 0; i < cpuCount; i += 1) {
var worker = cluster.fork();
//Слушаем сообщение от workera
worker.on('message', function(data) {
//отправляем всем worker'ам сообщение
for (var j in workers) {workers[j].send(data);}
});
//Добавляем объект worker в массив
workers.push(worker);
}
}
if (cluster.isWorker) {
//------------------------------------------------------------------------------------//
var worker_id = cluster.worker.id;
var express = require('express');
var app = express();
var server = require('http').Server(app);
io[worker_id] = require('socket.io')(server);
server.listen(3030+worker_id);
io[worker_id].on('connection', function (socket) {
console.log( socket.id );
console.log( "WORKER ID :"+worker_id );
socket.emit('news', { hello: 'world' });
socket.on('my other event', function (data) {
console.log(data);
});
});
//------------------------------------------------------------------------------------//
var app_express = express();
app_express.listen(8081);
app_express.use(express.static('public'));//отдаем статичные данные
app_express.get('/', function (request, response) {
response.send('Hello from Worker '+worker_id);
console.log( '------' );
});
app_express.get('/get_port', function (request, response) {
response.send(3030+worker_id);
console.log( 'get_port' );
});
app_express.get('/api', function (request, response) {
//process.send("----------------------");
response.setHeader('Content-Type', 'application/json');
var id = request.param('id');
var msg = request.param('msg');
var JSON_DATA = {
"worker_id":worker_id
,"id":id
,"msg":msg
};
io[port-3030].to(msg.id).emit('news', msg.msg);
response.send( JSON.stringify(JSON_DATA) );
//Отправляем всем процессам данные
process.send(JSON_DATA);
});
//Обработка сообщений от worker//
process.on('message', function(msg){
console.log(worker_id);
console.log(msg.id);
console.log(msg.msg);
io[worker_id].to(msg.id).emit('news', msg.msg);
});
}
Надеюсь статья будет полезна для новичков, чтобы познакомиться с cluster и общением между master → worker и worker → master. Кратко рассмотрен soket.io. И самое главное не используется redis.