[Из песочницы] Балансировщик запросов к базе на node.js
В этой статье я опишу две абстракции-классы, написанные средствами nodejs, которые предоставляет функционал распределения запросов по открытым каналам (tcp-socket). При этом учитывается общая загруженность системы и, если каналов не хватает, открываются новые, по мере уменьшения общего количества запросов — «лишние» каналы закрываются.Этот балансировщик можно использовать для распределения запросов по каналам, которые представляют собой по сути net.Socket. Для этого нужно внести изменения в метод по открытию и закрытию канала, добавлению запроса в канал.
В примере, который я опишу, используется библиотека pg, предоставляющая функционал по открытию сокетов к серверу с базой данных. При этом дефолтовое управление пулом коннектов, предоставляемое библиотекой, никак не используется.Для начала рассмотрим класс Connect, с помощью которого будет осуществляться управление одной сущностью — коннектом:
/* Конструктор класса коннект, в качестве аргумента строка формата «pg://USER: PASSWORD@HOST: PORT/DATABASE» */ function Connect (connString) { // сохраняем параметры в свойстве объекта this._connString = connString;
// свойство отвечающее, за запуск обработки запросов this._isRun = false;
// максимальное количество запросов помещенных в сокет, после которого будет вызвано событие «maxCount» this._maxQueryCount = 100;
// служебное свойство, используемое в методе _nextTick this._worked = false;
// количество запросов, висящих на коннекте this._queryCount = 0;
// «движок» класса this._emitter = new (require ('events').EventEmitter);
// делаем «селфи» var self = this;
// на открытие коннекта создаем обработчик «open», в котором регистрируем массив коннектов this._emitter.on ('open', function () { self._arrayQuery = []; });
// на событие ошибки будет сгенерирована ошибка, которая если не навесить обработчик, повалит выполнение скрипта this._emitter.on ('error', function (err) { throw err; });
// на событие достижения лимита этого коннекта, пометим его флагом this._emitter.on ('maxCount', function (message) { self._setMax = true; });
// при создании экземпляра класса открываем коннект до базы, здесь может быть открытие любого коннекта, // который представляет собой по сути net.Socket pg.connect (this._connString, function (err, client, done) { if (err) { return self._emitter.emit ('error', err); }
// запишем в «внутреннее» свойство ссылку на клиент, который общается с базой self._client = client;
// «мягкое закрытие» клиента self._done = done;
// вызываем событие готовности (передаем событие далее по цепочке) self._emitter.emit ('open'); }); }
/* метод, который предоставляет функционал по «навешиванию» обработчиков на события */ Connect.prototype.on = function (typeEvent, func) { if (typeEvent == 'error') { // если это обработчик на ошибки подменяем стандартный обработчик пользовательским this._emitter.removeAllListeners ('error'); }
this._emitter.addListener (typeEvent, func); };
/* метод, которые запускает работу по обработке запросов */ Connect.prototype.start = function () { this._isRun = true; this._nextTick (); };
/* метод, которые останавливает работу по обработке запросов */ Connect.prototype.stop = function () { this._isRun = false; };
/* метод, возвращающий состоянии коннекта (заполнен оли он) */ Connect.prototype.isFull = function () { return this._setMax; };
/* метод, закрывающий мягко коннект (т.е. если на коннекте висят запросы, программа дождется их выполнения и закроет коннект) */ Connect.prototype.close = function () { if (this._done) { this._emitter.emit ('close'); this._done (); } else { this._emitter.emit ('error', new Error ('connect is not active')); } };
/* метод, возвращающий массив обрабатываемых запросов */ Connect.prototype.queryQueue = function () { return this._arrayQuery; };
/* главный рабочий метод класса — добавление запроса. В качестве аргументов сам запрос в виде строки, параметры запроса, коллбэк на завершении запроса */ Connect.prototype.addQuery = function (query, params, cb) { if (!(typeof query == 'string')) { return this._emitter.emit ('error', new Error ('not valid query')); }
if (!(typeof params == «object») || !(params instanceof Array)) { return this._emitter.emit ('error', new Error ('not valid argument')); }
this._queryCount++; this._arrayQuery.push ({ query: query, params: params, callback: cb });
if (this._queryCount > this._maxQueryCount) { this._emitter.emit ('maxCount', 'in queue added too many requests, the waiting time increases'); }
this._nextTick (); };
/* метод по манипулированию максимальным количеством запросов в коннекте */ Connect.prototype.maxQueryCount = function (count) { if (count) { this._maxQueryCount = count; } else { return this._maxQueryCount; } };
/* возвращает количество обрабатываемых запросов */ Connect.prototype.queryCount = function () { return this._queryCount; };
/* внутренний метод класса, ответственный за выполнение запросов, в данном случае реализован вариант, когда все запросы сразу отправляются к базе, возможна реализация в случае с последовательным выполнением запросы хранятся во внутреннем буффере (массиве _arrayQuery) и отправляются к базе по мере выполнения предыдущего */ Connect.prototype._nextTick = function () { var self = this; if (this._worked) { return; }
while (this._isRun && this._arrayQuery.length>0) { this._worked = true; var el = this._arrayQuery.shift ();
// здесь используется синтаксис библиотеки pg, к которой мы привязаны this._client.query (el.query, el.params, function (err, result) { self._queryCount--; if (err) { return el.callback (err); } el.callback (null, result);
if (self._queryCount==0) { self._emitter.emit ('drain'); self._setMax = false; }
}) }
this._worked = false; }; Теперь непосредственно класс Balanser, который будет управлять нашими коннектами: открывать новые, закрывать лишние, распределять между ними запросы, предоставлять единый вход для сервиса
/* конструктор класса балансировщика, который будет распределять запросы */ function Balanser (minCountConnect, maxCountConnect) { // записываем в свойство максимальный предел открытых коннектов до базы this._maxCountConnect = maxCountConnect;
// записываем в свойство минимальный предел открытых коннектов до базы this._minCountConnect = minCountConnect;
// массив коннектов this._connectArray = [];
// закрываемые коннекты this._closedConnect = [];
// массив задач this._taskArray = [];
// служебный флаг this._run = false;
// движок класса this._emitter = new (require ('events').EventEmitter);
// запускаем инициализацию this._init (); }
/* метод инициализации класса, открывающий коннекты последовательно, один за другим */ Balanser.prototype._init = function () { this._cursor = 0; this.activQuery = 0; var self = this;
var i=0;
// рекурсивный вызов функции, добавляющей новый коннект
var cycle = function () {
i++;
if (i this._addNewConnect (cycle);
}; /* собственно метод, открывающий соединение, используем класс коннекта */
Balanser.prototype._addNewConnect = function (cb) {
var self = this; var connect = new GPSconnect (connString);
connect.on ('open', function () {
self._connectArray.push (connect);
cb ();
});
}; /* метод, по проверке «загруженности» коннекта и возвращающий индекс коннекта */
Balanser.prototype._cycle = function (pos) {
for (var i=pos; i /* метод, заполняющий коннект запросами */
Balanser.prototype._next = function (connect, el) {
connect.addQuery (el.query, el.params, el.cb);
connect.start ();
this._distribution ();
};
/*
Главный метод класса — распределяет запросы между коннектами.
Распределение проходит по принципу «Round-robin» с проверкой на загруженность коннекта.
Это нужно в случае, если какой то запрос оказался «тяжелым»,
чтобы снять нагрузку с этого коннекта и перераспределить запросы на другие коннекты
код оформлен конечно криво, надеюсь в скором времени поправить
*/
Balanser.prototype._distribution = function () {
if (this._taskArray.length>0) {
var el = this._taskArray.shift ();
this._cursor = this._cycle (this._cursor);
var self = this; if (this._cursor } else {
this._cursor = this._cycle (0); if (this._cursor var connect = this._connectArray[this._cursor];
this._next (connect, el);
}
}
} else {
this._run = false;
}
}; /* метод, который предоставляет функционал по «навешиванию» обработчиков на события */
Balanser.prototype.on = function (typeEvent, func) {
this._emitter.addListener (typeEvent, func);
};
/*
метод, вызываемый для проверки количества открытых коннектов, и если необходимости в таком количестве нет
«лишние» коннекты исключается из системы распределения
*/
Balanser.prototype._removeLoad = function () {
var self = this; var temp = this._connectArray[0].maxQueryCount ().toFixed ();
var currentCount = (this.activQuery/temp < this._minCountConnect) ? this._minCountConnect : temp; if (currentCount< this._connectArray.length ) {
while( this._connectArray.length != currentCount ) {
var poppedConnect = this._connectArray.pop();
if(poppedConnect.queryCount()==0) {
poppedConnect.close();
} else {
poppedConnect.index = self._closedConnect.length;
poppedConnect.on('drain', function() {
poppedConnect.close();
self._closedConnect.slice(poppedConnect.index, 1);
});
self._closedConnect.push(poppedConnect);
}
}
}
}; /*
Cобственно метод, который предоставляет вход-трубу, через который добавляются все запросы.
Параметр tube, возможно использовать для дифференсации запросов между собой,
пока он никак не используется.
*/
Balanser.prototype.addQuery = function (tube, query, params, cb) {
this.activQuery++;
var self = this; this._removeLoad ();
var wrappCb = function () {
self.activQuery--;
cb.apply (this, arguments);
}; this._taskArray.push ({ query: query, params: params, cb: wrappCb });
if (! this._run) {
this._run = true;
this._distribution ();
}
};
Как все это проверить? Для тестирования я использую запрос «select pg_sleep (1)», который выполняется 1 секунду и имитирует запрос к базе. 10 000 таких запросов обрабатывались балансировщиком ~101590 ms, при максимальном количестве запросов на коннект равным 100 и границах общего количества каналов=сокетов от 10 до 100. Использованный скрипт:
var balancer = new Balanser (10,100);
balancer.on ('ready', function () { var y=0;
var time = +new Date ();
for (var i=0; i<10000; i++) {
balancer.addQuery('gps', 'select pg_sleep(1)', [], function(err, result) {
if(err) console.log(err);
y++;
if(y==10000) {
console.log(balancer._connectArray.length);
console.log(+new Date()-time);
}
});
}
});
Все исходники доступны на гитхабе.Балансировщик еще, конечно, сырой, многое нужно допилить/переписать, так что прошу сильно не ругать. Если нужно, могу заняться им плотнее.