[Из песочницы] Балансировщик запросов к базе на node.js

В этой статье я опишу две абстракции-классы, написанные средствами nodejs, которые предоставляет функционал распределения запросов по открытым каналам (tcp-socket). При этом учитывается общая загруженность системы и, если каналов не хватает, открываются новые, по мере уменьшения общего количества запросов — «лишние» каналы закрываются.Этот балансировщик можно использовать для распределения запросов по каналам, которые представляют собой по сути net.Socket. Для этого нужно внести изменения в метод по открытию и закрытию канала, добавлению запроса в канал.

В примере, который я опишу, используется библиотека pg, предоставляющая функционал по открытию сокетов к серверу с базой данных. При этом дефолтовое управление пулом коннектов, предоставляемое библиотекой, никак не используется.Для начала рассмотрим класс Connect, с помощью которого будет осуществляться управление одной сущностью — коннектом:

/* Конструктор класса коннект, в качестве аргумента строка формата «pg://USER: PASSWORD@HOST: PORT/DATABASE» */ function Connect (connString) { // сохраняем параметры в свойстве объекта this._connString = connString;

// свойство отвечающее, за запуск обработки запросов this._isRun = false;

// максимальное количество запросов помещенных в сокет, после которого будет вызвано событие «maxCount» this._maxQueryCount = 100;

// служебное свойство, используемое в методе _nextTick this._worked = false;

// количество запросов, висящих на коннекте this._queryCount = 0;

// «движок» класса this._emitter = new (require ('events').EventEmitter);

// делаем «селфи» var self = this;

// на открытие коннекта создаем обработчик «open», в котором регистрируем массив коннектов this._emitter.on ('open', function () { self._arrayQuery = []; });

// на событие ошибки будет сгенерирована ошибка, которая если не навесить обработчик, повалит выполнение скрипта this._emitter.on ('error', function (err) { throw err; });

// на событие достижения лимита этого коннекта, пометим его флагом this._emitter.on ('maxCount', function (message) { self._setMax = true; });

// при создании экземпляра класса открываем коннект до базы, здесь может быть открытие любого коннекта, // который представляет собой по сути net.Socket pg.connect (this._connString, function (err, client, done) { if (err) { return self._emitter.emit ('error', err); }

// запишем в «внутреннее» свойство ссылку на клиент, который общается с базой self._client = client;

// «мягкое закрытие» клиента self._done = done;

// вызываем событие готовности (передаем событие далее по цепочке) self._emitter.emit ('open'); }); }

/* метод, который предоставляет функционал по «навешиванию» обработчиков на события */ Connect.prototype.on = function (typeEvent, func) { if (typeEvent == 'error') { // если это обработчик на ошибки подменяем стандартный обработчик пользовательским this._emitter.removeAllListeners ('error'); }

this._emitter.addListener (typeEvent, func); };

/* метод, которые запускает работу по обработке запросов */ Connect.prototype.start = function () { this._isRun = true; this._nextTick (); };

/* метод, которые останавливает работу по обработке запросов */ Connect.prototype.stop = function () { this._isRun = false; };

/* метод, возвращающий состоянии коннекта (заполнен оли он) */ Connect.prototype.isFull = function () { return this._setMax; };

/* метод, закрывающий мягко коннект (т.е. если на коннекте висят запросы, программа дождется их выполнения и закроет коннект) */ Connect.prototype.close = function () { if (this._done) { this._emitter.emit ('close'); this._done (); } else { this._emitter.emit ('error', new Error ('connect is not active')); } };

/* метод, возвращающий массив обрабатываемых запросов */ Connect.prototype.queryQueue = function () { return this._arrayQuery; };

/* главный рабочий метод класса — добавление запроса. В качестве аргументов сам запрос в виде строки, параметры запроса, коллбэк на завершении запроса */ Connect.prototype.addQuery = function (query, params, cb) { if (!(typeof query == 'string')) { return this._emitter.emit ('error', new Error ('not valid query')); }

if (!(typeof params == «object») || !(params instanceof Array)) { return this._emitter.emit ('error', new Error ('not valid argument')); }

this._queryCount++; this._arrayQuery.push ({ query: query, params: params, callback: cb });

if (this._queryCount > this._maxQueryCount) { this._emitter.emit ('maxCount', 'in queue added too many requests, the waiting time increases'); }

this._nextTick (); };

/* метод по манипулированию максимальным количеством запросов в коннекте */ Connect.prototype.maxQueryCount = function (count) { if (count) { this._maxQueryCount = count; } else { return this._maxQueryCount; } };

/* возвращает количество обрабатываемых запросов */ Connect.prototype.queryCount = function () { return this._queryCount; };

/* внутренний метод класса, ответственный за выполнение запросов, в данном случае реализован вариант, когда все запросы сразу отправляются к базе, возможна реализация в случае с последовательным выполнением запросы хранятся во внутреннем буффере (массиве _arrayQuery) и отправляются к базе по мере выполнения предыдущего */ Connect.prototype._nextTick = function () { var self = this; if (this._worked) { return; }

while (this._isRun && this._arrayQuery.length>0) { this._worked = true; var el = this._arrayQuery.shift ();

// здесь используется синтаксис библиотеки pg, к которой мы привязаны this._client.query (el.query, el.params, function (err, result) { self._queryCount--; if (err) { return el.callback (err); } el.callback (null, result);

if (self._queryCount==0) { self._emitter.emit ('drain'); self._setMax = false; }

}) }

this._worked = false; }; Теперь непосредственно класс Balanser, который будет управлять нашими коннектами: открывать новые, закрывать лишние, распределять между ними запросы, предоставлять единый вход для сервиса

/* конструктор класса балансировщика, который будет распределять запросы */ function Balanser (minCountConnect, maxCountConnect) { // записываем в свойство максимальный предел открытых коннектов до базы this._maxCountConnect = maxCountConnect;

// записываем в свойство минимальный предел открытых коннектов до базы this._minCountConnect = minCountConnect;

// массив коннектов this._connectArray = [];

// закрываемые коннекты this._closedConnect = [];

// массив задач this._taskArray = [];

// служебный флаг this._run = false;

// движок класса this._emitter = new (require ('events').EventEmitter);

// запускаем инициализацию this._init (); }

/* метод инициализации класса, открывающий коннекты последовательно, один за другим */ Balanser.prototype._init = function () { this._cursor = 0; this.activQuery = 0; var self = this;

var i=0;

// рекурсивный вызов функции, добавляющей новый коннект var cycle = function () { i++; if (i

this._addNewConnect (cycle); };

/* собственно метод, открывающий соединение, используем класс коннекта */ Balanser.prototype._addNewConnect = function (cb) { var self = this;

var connect = new GPSconnect (connString); connect.on ('open', function () { self._connectArray.push (connect); cb (); }); };

/* метод, по проверке «загруженности» коннекта и возвращающий индекс коннекта */ Balanser.prototype._cycle = function (pos) { for (var i=pos; i

/* метод, заполняющий коннект запросами */ Balanser.prototype._next = function (connect, el) { connect.addQuery (el.query, el.params, el.cb); connect.start (); this._distribution (); };

/* Главный метод класса — распределяет запросы между коннектами. Распределение проходит по принципу «Round-robin» с проверкой на загруженность коннекта. Это нужно в случае, если какой то запрос оказался «тяжелым», чтобы снять нагрузку с этого коннекта и перераспределить запросы на другие коннекты код оформлен конечно криво, надеюсь в скором времени поправить */ Balanser.prototype._distribution = function () { if (this._taskArray.length>0) { var el = this._taskArray.shift (); this._cursor = this._cycle (this._cursor); var self = this;

if (this._cursor

} else { this._cursor = this._cycle (0);

if (this._cursor this._connectArray[i].queryCount ()) { break; } } if (i==this._connectArray.length) { i = 0; } this._cursor = i;

var connect = this._connectArray[this._cursor]; this._next (connect, el); } } } else { this._run = false; } };

/* метод, который предоставляет функционал по «навешиванию» обработчиков на события */ Balanser.prototype.on = function (typeEvent, func) { this._emitter.addListener (typeEvent, func); };

/* метод, вызываемый для проверки количества открытых коннектов, и если необходимости в таком количестве нет «лишние» коннекты исключается из системы распределения */ Balanser.prototype._removeLoad = function () { var self = this;

var temp = this._connectArray[0].maxQueryCount ().toFixed (); var currentCount = (this.activQuery/temp < this._minCountConnect) ? this._minCountConnect : temp;

if (currentCount< this._connectArray.length ) { while( this._connectArray.length != currentCount ) { var poppedConnect = this._connectArray.pop(); if(poppedConnect.queryCount()==0) { poppedConnect.close(); } else { poppedConnect.index = self._closedConnect.length; poppedConnect.on('drain', function() { poppedConnect.close(); self._closedConnect.slice(poppedConnect.index, 1); }); self._closedConnect.push(poppedConnect); } } } };

/* Cобственно метод, который предоставляет вход-трубу, через который добавляются все запросы. Параметр tube, возможно использовать для дифференсации запросов между собой, пока он никак не используется. */ Balanser.prototype.addQuery = function (tube, query, params, cb) { this.activQuery++; var self = this;

this._removeLoad (); var wrappCb = function () { self.activQuery--; cb.apply (this, arguments); };

this._taskArray.push ({ query: query, params: params, cb: wrappCb }); if (! this._run) { this._run = true; this._distribution (); } }; Как все это проверить? Для тестирования я использую запрос «select pg_sleep (1)», который выполняется 1 секунду и имитирует запрос к базе.

10 000 таких запросов обрабатывались балансировщиком ~101590 ms, при максимальном количестве запросов на коннект равным 100 и границах общего количества каналов=сокетов от 10 до 100.

Использованный скрипт:

var balancer = new Balanser (10,100); balancer.on ('ready', function () {

var y=0; var time = +new Date (); for (var i=0; i<10000; i++) { balancer.addQuery('gps', 'select pg_sleep(1)', [], function(err, result) { if(err) console.log(err); y++; if(y==10000) { console.log(balancer._connectArray.length); console.log(+new Date()-time); } }); } }); Все исходники доступны на гитхабе.Балансировщик еще, конечно, сырой, многое нужно допилить/переписать, так что прошу сильно не ругать. Если нужно, могу заняться им плотнее.

© Habrahabr.ru