Асинхронное чтение строк из большого текстового файла на Node.js — Часть 1

Асинхронность node.js часто приводят как одно из достоинств платформы. Действительно, организация программы в виде небольших задач и последовательное их выполнение в рамках одного потока дает некоторое преимущество в распределении ресурсов и большей отзывчивостью приложения. Но это также приводит к необходимости организовывать код в нетривиальной манере. Какой именно, я покажу на примере библиотеки для чтения строк из файла большого объема (500Mb и более).Поскольку статья получается довольно большой, публикую в двух частях. В первой будет рассматриваться асинхронность, во второй — файловые операции.

Часть 1: От последовательного кода к асинхронномуЗадача чтения строки из текстового файла в кодировке UTF-8 не очень простая. Основная сложность — в правильной обработке очередного блока бинарных данных, граница которого может попасть в середину UTF-8 символа или разделителя строк.Давайте пока предположим, что эта задача успешно решена. Тогда последовательный алгоритм будет простым:

function processLineSync (line) { // … }

var line, reader = createTextReader (file); while (line = reader.readLineSync ()) processLineSync (line); reader.close (); Для тестирования сделаем простую реализацию createTextReader ():

function createTextReader () { var counter = 0; return { readLineSync: () => { counter += 1; return counter < 10000000 ? counter.toString() : null; }, readLine : done => { counter += 1; done (null, counter < 10000000 ? counter.toString() : null); }, close : () => { } }; } Нужно заметить, что цикл, код чтения очередной строки и её обработка — последовательные. Соответственно, их выполнение блокирует процесс node.js.

Для того чтобы написать правильный асинхронный код, надо понять, чем асинхронный отличается от последовательного. Для этого предположим, что у метода processLineSync () есть асинхронный аналог — processLine (line, err => { }). Если мы попробуем выполнить асинхронную операцию в цикле, то получим то или иное переполнение. Давайте посмотрим на примере.

function processLine (line, done) { setImmediate (() => { console.log ('ok'); done (); }); }

var line, reader = createTextReader (file); while (line = reader.readLineSync ()) processLine (line, () => { }); reader.close (); Через пару минут этот код падает с ошибкой «out of memory». При этом не выведя ни одного «ok».

А происходит вот что: while (line = reader.readLineSync ())…; вычитывает данные синхронно и создает (а не вызывает) обработчики строк. Естественно, создание обработчика приводит к выделению памяти для него, и через конечное число итераций память заканчивается. Обработчики создаются, так как асинхронная операция в node.js является некоторым объектом, содержащим функцию, который будет вызван после завершения текущего блока. А текущий блок все никак не завершается, что и приводит к переполнению.

Если цикл не годится, что же тогда делать? Получается, что тело цикла само по себе должно быть функцией. И вызывать саму себя для продолжения итераций. Этот код уже намного сложнее простого цикла и в самом простом случае представляет собой следующую конструкцию:

function asyncWhile (criteria, iteration, done) { if (criteria ()) { iteration (() => asyncWhile (criteria, iteration, done)); } else { done (); } } Ну и пример её использования:

var line, reader = createTextReader (file); asyncWhile ( () => { line = reader.readLineSync (); return line!= null; }, done => processLine (line, () => done ()), () => reader.close ()); Работает уже лучше, но пока выглядит немного неуклюже, так как используются глобальные переменные. Также отсутствует обработка ошибок и будет переполнение стека (но об этом чуть позже). Также было бы неплохо читать данные из файла асинхронно.

Чтобы уменьшить зависимость от глобальной переменной в теле цикла, перебросим её в виде параметра. А контекст цикла поместим в контекст функций. Получим следующее:

function asyncWhile (context, criteria, iteration, done) { var value = criteria.call (context); if (value) { iteration.call (context, value, () => asyncWhile (context, criteria, iteration, done)); } else { done.call (context); } }

asyncWhile (createTextReader (file), () => this.readLineSync (), (line, done) => processLine (line, () => done ()), () => this.close ()); Можно заметить, что параметры третьего аргумента совпадают с параметрами processLine. Аналогично для второго параметра processLine. Они оставлены, чтобы продемонстрировать подход, но, конечно, их можно сократить до передачи функции непосредственно:

var reader = createTextReader (file); asyncWhile (reader, reader.readLineSync, processLine, reader.close); Теперь надо добавить обработку ошибок. Для этого предположим, что ошибка возвращается первым параметром callback функции processLine. Такой метод возвращения ошибки является довольно стандартным для node.js.

Добавляем обработку ошибок и получаем:

function asyncWhile (context, criteria, iteration, done) { var value = criteria.call (context); if (value) { iteration (value, err => { if (err) { done.call (context, err); } else { asyncWhile (context, criteria, iteration, done); } }); } else { done.call (context); } } Использование почти не изменилось:

asyncWhile (createTextReader (file), () => this.readLineSync (), (line, done) => processLine (line, (err) => { if (err) console.log (err); done (); }), () => this.close ()); Теперь давайте подсчитаем количество строк в файле:

asyncWhile ({ reader: createTextReader (file), counter: 0 }, () => this.reader.readLineSync (), (line, done) => { this.counter += 1; done (); }, () => { console.log (this.counter); this.reader.close (); }); И почти сразу получаем «Maximum call stack size exceeded». Анализ стека показывает многократно повторяющийся шаблон asyncWhile () → Object. → asyncWhile () →… Это довольно стандартная проблема, которая обходится обновлением стека через вызов функции из основного цикла node.js в момент рекурсии:

function asyncWhile (context, criteria, iteration, done) { var value = criteria.call (context); if (value) { iteration.call (context, value, err => { if (err) { done.call (context, err); } else { setImmediate (() => asyncWhile (context, criteria, iteration, done)); } }); } else { done.call (context, null); } } setImmediate () помещает обработчик в очередь и продолжает выполнение. Этот обработчик будет вызван по завершению текущего блока.

Предпоследним штрихом заменим блокирующее чтение из файла на асинхронное (не забыв про обработку ошибок):

function asyncWhile (context, criteria, iteration, done) { criteria.call (context, (err, value) => { if (err) { done.call (context, err); } else { if (value) { iteration.call (context, value, err => { if (err) { done.call (context, err); } else { setImmediate (() => asyncWhile (context, criteria, iteration, done)); } }); } else { done.call (context, null); } } }); } И последним сделаем createTextReader тоже асинхронным:

function createTextReader (file, done) { var counter = 0; done (null, { readLine: (done) => { counter += 1; done (null, counter < 10000000 ? counter.toString() : null); }, close : () => { } }); } Используем:

createTextReader (file, (err, reader) => { asyncWhile (reader, done => this.readLine ((err, line) => done (err, line)), (line, done) => processLine (line, (err) => { if (err) console.log (err); done (); }), () => this.close ()); }); Как видно, асинхронный код намного сложнее синхронного. И намного медленнее — примерно в 40 раз. С другой стороны, он не блокирует поток, что позволяет запускать несколько таких обработчиков параллельно (в смысле node.js). Ну и чем более сложный обработчик для каждой записи, тем больше время обработки превышает расходы на поддержку асинхронности.

Об авторе: Александр Неткачев — старший разработчик на С# и F#. Поддерживает сайт alexatnet.com, проводит вебинары (Code&Coffe), помогает с кодом начинающим разработчикам (CodeReview4U).

© Habrahabr.ru