Иструменты Node.js разработчика. Очереди заданий (job queue)
При реализации бэка веб-приложений и мобильных приложений, даже самых простых, уже стало привычным использование таких инструментов как: базы данных, почтовый (smtp) сервер, redis-сервер. Набор используемых инструментов постоянно расширяется. Например, очереди сообщений, судя по количеству установок пакета amqplib (650 тыс. установок в неделю), используется наравне с реляционными базами данных (пакет mysql 460 тыс. установок в неделю и pg 800 тыс. установок в неделю).
Сегодня я хочу рассказать об очередях заданий (job queue), которые пока используются на порядок реже, хотя необходимость в них возникает, практически, во всех реальных проектах
Итак, очереди заданий позволяют асинхронно выполнить некоторую задачу, фактически, выполнить функцию с заданными входными параметрами и в установленное время.
В зависимости от параметров, задание может выполняться:
- сразу после добавления в очередь заданий;
- однократно в установленное время;
- многократно по расписанию.
Очереди заданий позволяют передать выполняемому заданию параметры, отследить и повторно выполнить задания, закончившиеся с ошибкой, установить ограничение на количество одновременно выполняемых заданий.
Подавляющее большинство приложений на Node.js связаны с разработкой REST-API для веб-приложений и мобильных приложений. Сократить время выполнения REST-API — важно для комфортной работы пользователя с приложением. В то же время, вызов REST-API может инициировать длительные и/или ресурсоёмкие операции. Например, после совершения покупки необходимо отправить пользователю пуш-сообщение на мобильное приложение, или же отправить запрос о совершении покупки на REST-API CRM. Эти запросы можно выполнить асинхронно. Как сделать это правильно, если у Вас нет инструмента для работы с очередями заданий? Например, можно отправить сообщение в очередь сообщений, запустить worker, который будет читать эти сообщения и выполнять на основании этих сообщений необходимую работу.
Фактически, примерно это и делают очереди заданий. Однако, если присмотреться внимательно, то очереди заданий имеют несколько принципиальных отличий от очереди сообщений. Во-первых, в очередь сообщений кладут сообщения (статику), а очереди заданий подразумевают выполнение какой-то работы (вызов функции). Во-вторых, очередь заданий подразумевают наличие какого-то процессора (воркера), который будет выполнять заданную работу. При этом нужен дополнительный функционал. Количество процессоров-воркеров должно прозрачно масштабироваться в случае повышения нагрузки. С другой стороны необходимо ограничивать количество одновременно работающих заданий на одном процессоре-воркере, чтобы сгладить пиковые нагрузки и не допустить отказов в обслуживании. Это показывает что есть необходимость в инструменте, который мог бы запускать асинхронные задания, настраивая различные параметры, так же просто как мы делаем запрос по REST-API (а лучше если еще проще).
При помощи очередей сообщений относительно просто реализовать очередь заданий, которые выполняются немедленно после постановки задания в очередь. Но часто требуются выполнить задание однократно в установленное время или же по расписанию. Для этих задач широко используют ряд пакетов, которые реализуют логику работы cron в linux. Чтобы не быть голословным, скажу, что пакет node-cron имеет 480 тыс. установок в неделю, node-schedule — 170 тыс. установок в неделю.
Использовать node-cron это, конечно, удобнее, чем аскетичный setInterval (), но лично я сталкивался с целым рядом проблем при его использовании. Если выразить общий недостаток — это отсутствие контроля за количеством одновременно выполняемых заданий (это стимулирует пиковые нагрузки: повышение нагрузки замедляет работу заданий, замедление работы заданий увеличивает количество одновременно выполняемых заданий, а это в свою очередь еще больше грузит систему), невозможность для повышения производительности запустить node-cron на нескольких ядрах (в этом случае все задания независимо выполняются на каждом ядре) и отсутствие средств для отслеживания и перезапуска заданий, закончившихся с ошибкой.
Я надеюсь, что показал, что необходимость в таком инструменте, как очередь заданий есть наравне с такими инструментами как базы данных. И такие средства появились, хотя еще недостаточно широко применяются. Перечислю наиболее популярные из них:
Я сегодня буду рассматривать применение пакета bull, с которым работаю сам. Почему я выбрал именно этот пакет (хотя не навязываю свой выбор другим). На тот момент, когда я начал искать удобную реализацию очереди сообщений, проект bee-queue был уже остановлен. Реализация kue, по бенчмаркам приведенным в репозитарии bee-queue, сильно отставала от других реализаций и, кроме того, не содержала средств для запуска периодически выполняемых заданий. Проект agenda реализует очереди с сохранением в базе данных mongodb. Это для некоторых кейсов большой плюс, если нужно сверх-надежность при постановке заданий в очередь. Однако не только это решающий фактор. Я, естественно, испытывал все варианты библиотек на выносливость, генерируя большое количество заданий в очереди, и так и не смог добиться от agenda бесперебойной работы. При превышении какого-то количества заданий, agenda останавливалась и прекращала ставить задания в работу.
Поэтому я остановился на bull который реализует удобный API, при достаточном быстродействии с возможностью масштабирования, так как в качестве бэка пакет bull использует redis-сервер. В том числе, можно использовать кластер серверов redis.
При создании очереди очень важно выбрать оптимальные параметры очереди заданий. Параметров много, и значение некоторых из них дошло до меня не сразу. После многочисленных экспериментов я остановился на таких параметрах:
const Bull = require('bull');
const redis = {
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null,
connectTimeout: 180000
};
const defaultJobOptions = {
removeOnComplete: true,
removeOnFail: false,
};
const limiter = {
max: 10000,
duration: 1000,
bounceBack: false,
};
const settings = {
lockDuration: 600000, // Key expiration time for job locks.
stalledInterval: 5000, // How often check for stalled jobs (use 0 for never checking).
maxStalledCount: 2, // Max amount of times a stalled job will be re-processed.
guardInterval: 5000, // Poll interval for delayed jobs and added jobs.
retryProcessDelay: 30000, // delay before processing next job in case of internal error.
drainDelay: 5, // A timeout for when the queue is in drained state (empty waiting for jobs).
};
const bull = new Bull('my_queue', { redis, defaultJobOptions, settings, limiter });
module.exports = { bull };
В тривиальных случаях нет необходимости создавать много очередей, так как в каждой очереди можно задавать имена для разных заданий, и с каждым именем связывать свой процессор-воркер:
const { bull } = require('../bull');
bull.process('push:news', 1, `${__dirname}/push-news.js`);
bull.process('push:status', 2, `${__dirname}/push-status.js`);
...
bull.process('some:job', function(...args) { ... });
Я использую возможность, которая идет в bull «из коробки» — распараллеливать процессоры-воркеры на нескольких ядрах. Для этого вторым параметром задается количество ядер на которым будет запущен процессор-воркер, а в третьем параметре имя файла с определением функции обработки задания. Если такая фича не нужна, в качестве второго параметра можно просто передать callback-функцию.
Задание в очередь ставиться вызовом метода add (), которому в параметрах передается имя очереди и объект, который в последующем будет передаваться обработчику задания. Например, в хуке ORM после создания записи с новой новостью, я могу асинхронно отправить всем клиентам пуш сообщение:
afterCreate(instance) {
bull.add('push:news', _.pick(instance, 'id', 'title', 'message'), options);
}
Обработчик события принимает в параметрах объект задания с параметрами, переданными в метод add () и функцию done (), которую необходимо вызвать для подтверждения выполнения задания или же для того чтобы сообщить, что задание закончилось с ошибкой:
const { firebase: { admin } } = require('../firebase');
const { makePayload } = require('./makePayload');
module.exports = (job, done) => {
const { id, title, message } = job.data;
const data = {
id: String(id),
type: 'news',
};
const payloadRu = makePayload(title.ru, message.ru, data);
const payloadEn = makePayload(title.en, message.en, data);
return Promise.all([
admin.messaging().send({ ...payloadRu, condition: "'news' in topics && 'ru' in topics" }),
admin.messaging().send({ ...payloadEn, condition: "'news' in topics && 'en' in topics" }),
])
.then(response => done(null, response))
.catch(done);
};
Для просмотра состояния очереди задания можно воспользоваться средством arena-bull:
const Arena = require('bull-arena');
const redis = {
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null,
connectTimeout: 180000
};
const arena = Arena({
queues: [
{
name: 'my_gueue',
hostId: 'My Queue',
redis,
},
],
},
{
basePath: '/',
disableListen: true,
});
module.exports = { arena };
И напоследок маленький лайфхак. Как я уже говорил, bull использует redis-сервер в качестве бэка. Вероятность того что при рестарте redis-сервера задания пропадут весьма мала. Но зная тот факт что сисадмины иногда могут просто «почистить кэш редиса», при этом удалив все задания в частности, я был обеспокоен прежде всего периодически выполняемыми заданиями, которые в этом случае остановились навсегда. В связи с этим я нашело возможность как возобновлять такие периодические задания:
const cron = '*/10 * * * * *';
const { bull } = require('./app/services/bull');
bull.getRepeatableJobs()
.then(jobs => Promise.all(_.map(jobs, (job) => {
const [name, cron] = job.key.split(/:{2,}/);
return bull.removeRepeatable(name, { cron });
})))
.then(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }));
setInterval(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }), 60000);
То есть задание сначала исключается из очереди, а затем ставится вновь, и все это (увы) по setInterval (). Собственно без такого вот лайфхака я бы возможно не решился юзать периодические таски на bull.
apapacy@gmail.com
3 июля 2019 года