Потоки (Stream) в NodeJS — реки, в которые войдешь дважды
Поток — это концепция, которая была сначала реализована в UNIX системах для передачи данных из одной программы в другую в операциях ввода/вывода. Это позволяет каждой программе быть очень специализированной в том, что она делает — быть независимым модулем. Сочетание таких простых программ помогает в создании более сложных систем путем «объединения» их в цепочку вызовов.
Потоки позволяют обмениваться данными небольшими частями, что в свою очередь дает возможность в своей работе не расходовать много памяти. Конечно, это зависит от того, как вы реализуется внутренний функционал потока.
Распространенная задача — парсинг файла большого объема. Например, в текстовом файле с данными логов нужно найти строку, содержащую определенный текст. Вместо того, чтобы файл полностью загрузить в память, и потом начать разбирать в нем строки в поисках нужной, мы можем его считывать небольшими порциями. Тем самым не занимаем память сверх необходимого, а лишь столько памяти, сколько нужно для буферизации считанных данных. Как только найдем требуемую запись, сразу прекратим дальнейшую работу. Или можем передать найденную запись в другой поток по цепочке, например, для преобразование в другой формат, или сохранения в другой файл.
Модуль stream предоставляет базовый API по работе с потоками в Node.JS. Документации Node.JS вполне достаточно, чтобы разобраться в данном вопросе, но мы попытаемся составить что-то вроде шпаргалки с пояснениями некоторых моментов.
Виды потоков
Есть четыре вида потоков:
- Readable — поток, который предоставляет данные на чтение;
- Writable — поток, в который данные можно записывать;
- Duplex — поток, из которого можно как читать данные (Readable), так и записывать в него (Writable), при этом процесс чтения и записи просиходит независимо друго от друга;
- Transform — разновидность Duplex потоков, которые могут изменять данные при их записи и чтении в/из потока (чаще используется как промежуточное звено в цепочке передачи данных).
Stream instanceof EventEmitter
Все потоки являются экземплярами EventEmitter, то есть можно генерировать события StreamClass.emit ('eventName', data), и обрабатывать их StreamClass.on ('eventName', (data)=>{});
Метод pipe
Чтобы передать данные из одного потока в другой, самый простой способ вызвать над потоками метод pipe:
Readable.pipe(Writable);//например, по "схеме" DataBase -> File
Readable.pipe(Transform).pipe(Writable);//DataBase -> преобразовать в JSON формат -> сохранить JSON в File
Duplex.pipe(Transform).pipe(Duplex);//прочитать из DataBase -> обработать -> записать обратно в DataBase результат
Последняя цепочка вызовов показывает, что реализовывать свои классы потоков лучше таким образом, чтобы каждый их них решал свою задачу.
Как видно — метод pipe возвращает экземпляр потока, который был передан в него, что и позволяет потоки объединять между собой.
Метод pipe, реализован таким образом, что он решает задачу контроля «скорости» передачи данных из одного потока в другой (превышение объема внутреннего буфера потока). Например, Writable поток работает на запись медленнее, чем их передает источник данных Readable. В этом случае передача данных «приостанавливается» до тех пор, пока Writable «не сообщит» (внутренний буфер очистится), что он готов принимать следующую порцию данных.
Buffering
Потоки хранят данные в своем внутреннем буфере. Размер буфера можно указать через параметр highWaterMark, который можно задать в конструкторе класса.
Физический смысл значение highWaterMark зависит от другой опции — objectMode.
new StreamObject({objectMode: false, highWaterMark: кол_во_байт}); //по умолчанию 16384 (16kb)
new StreamObject({objectMode: true, highWaterMark: кол_во_объектов});//по умолчанию 16
В Readable потоке данные буферизируются, когда над ним вызвается метод push (data), и остаются в буфере до тех пор, пока их не прочитают, вызвав метод read (). Как только общий размер внутреннего буфера Readable потока достигнет порогового значения, указанного в highWaterMark, поток временно прекратит чтение данных.
Для Writable буферизация происходит во время вызова над ним метода write (data). Метод вернет true, пока размер буфера не достиг значения highWaterMark, и false, когда буфер переполнен.
При использовании метода pipe (), как раз в этот момент он «останавливает» чтение данных, ожидает событие «drain», после чего передача данных возобновляется.
Object Mode
По умолчанию, потоки работают с данными в виде буфера, но так же могут работать как со строками, так и с другими объектами JavaScript (например, {«user»:{«name»: «Ivan», «last_name»: «Petrov»}}), за исключением null-объекта, который играет отдельную роль при передаче данных (если поток получает null, это является сигналом, что данных для обработки больше нет, и чтение или запись данных завершена). Как установить тот или иной режим потока при его инициализации покажем в примерах ниже.
Состояние flowing или paused потока Readable
- flowing — данные поступают непрерывно и как можно быстро для процесса, который их считывает;
- paused — режим по умолчанию для всех типов потоков, данные передаются только если их явно запросили — явный вызов метода read () (метод read () неявно вызывается «внутри» метода pipe ()).
Состояние flowing === true — автоматически если:
- данные передаются другим потокам через метод pipe ();
- и/или у него есть обработчик события 'data';
- и/или над ним вызван метод resume ().
Из состояния flowing в paused можно переключиться (flowing === false):
- если «разорвем» связь между источником данных и их потребителем (Readable.pipe (Writable); Readable.unpipe(Writable)), и/или удалим обработчик события 'data';
- или вызовем метод Readable.pause ().
На момент инициализации класса Readable flowing === null, то есть еще не реализован механизм чтения данных, и данные не генерируются.
Readable Streams — потоки как источник данных
Readable потоки работают в одном из двух состояний: flowing и paused. В состоянии paused для считывания данных необходимо явно вызывать метод read (). Когда вы передаете данные из одного потока в другой (R.pipe (W)), метод read () вызывается автоматически.
Весь текущий буфер данных можно получить с помощью свойства Readable._readableState.buffer.
'use strict';
const { Readable } = require('stream');
/**
* чтобы реализовать свой класс Readable потока, необходимо имплементировать метод _read().
* именно с нижним подчеркиванием перед именем
* сравните состояние потока (_readableState) во время инициализации, и по окончании чтения данных (on('end', ()=>{}))
*/
class Source extends Readable
{
constructor(array_of_data = [], opt = {})
{
super(opt);
this._array_of_data = array_of_data;
console.log('objectMode ', this._readableState.objectMode);//false по умолчанию, если не задано явно другое
console.log('highWaterMark ', this._readableState.highWaterMark);//16384
console.log('buffer ', this._readableState.buffer);//[] - пустой массив
console.log('length ', this._readableState.length);//0 - кол-во буфер объектов
console.log('flowing ', this._readableState.flowing);//null
//для краткости примеров, добавим обработчики событий в конструкторе
this.on('data', (chunk)=>
{
//при обработке события 'data' - данные считываются из буфера и удаляются из него
console.log('\n---');
console.log('Readable on data ');
//здесь chunk данные в виде буфера
console.log(`chunk = ${chunk} chunk isBuffer ${Buffer.isBuffer(chunk)} and chunk.length is ${chunk.length}`);
//кол-во данных в текущем буфере (кол-во буфер объектов)
console.log('buffer.length ', this._readableState.buffer.length);
console.log('данные: ', chunk.toString(), ' buffer of chunk ', this._readableState.buffer, ' buffer of chunk как строка ', this._readableState.buffer.toString());
})
.on('error',(err)=>
{
console.log('Readable on error ', err);
})
.on('end',()=>
{
console.log('Readable on end ');
console.log('objectMode ', this._readableState.objectMode);//false
console.log('highWaterMark ', this._readableState.highWaterMark);//16384
console.log('buffer ', this._readableState.buffer);//[] - пустой массив
console.log('buffer.length ', this._readableState.buffer.length);//0
console.log('flowing ', this._readableState.flowing);//true !!!так как у нас есть обработчик события 'data'
})
.on('close',()=>
{
console.log('Readable on close не все реализации генерируют это событие');
});
}
_read()
{
let data = this._array_of_data.shift()
if (!data) {
//сообщаем, что данные закончились
this.push(null);
} else {
this.push(data);
}
}
}
/*значение именно как строки, т.к. по умолчанию потоки работают либо со строками, либо с буфером. иначе будет выброшено сообщение об ошибке по время this.push(data) Readable on error TypeError: Invalid non-string/buffer chunk */
let array_of_data = ['1', '2', '3', '4', '5'];
let opts = {/* значения свойств по умолчанию */};
const R = new Source(array_of_data, opts);
array_of_data = ['1', '2', '3', '4', '5'];
opts = {
objectMode: false,
highWaterMark: 1//1 байт лимит для буферизации данных _readableState.buffer.length будет === 1
};
const R2 = new Source(array_of_data, opts);
array_of_data = ['1', '2', '3', '4', '5'];
opts = {
objectMode: false
, encoding: 'utf8'//если задать кодировку (поддерживаемую NodeJS), то поток будет работать с данными как со строками, а не как с буфером
};
const R3 = new Source(array_of_data, opts);//кодировку так же можно задать с помощью метода .setEncoding('utf8')
array_of_data = [1, 2, 3, 4, 5];
/*при таких "настройках" потока будет ошибка. если objectMode: true то не надо указывать кодировку - ни в параметрах, ни через метод Readable.setEncoding('utf8')*/
opts = {
objectMode: true
, encoding: 'utf8'
};
const R4 = new Source(array_of_data, opts);
//при objectMode: true можно передать как строки, или как числа (Number)
array_of_data = [1, 2, 3, 4, 5];
opts = {
objectMode: true
};
const R5 = new Source(array_of_data, opts); //highWaterMark 16 - значение по умолчанию для объектов
/*имитируем задержку при чтении данных (подобное может происходить при Writable.write(someData) === false). пример ниже взят из документации Node.JS.
выполните код, и увидите как данные прекращаются считываться, они накапливаются в буфере, а потом продолжают считываться*/
array_of_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
opts = {
objectMode: true
};
const R6 = new Source(array_of_data, opts);
R6.on('data', (chunk) => {
//приостанавливаем передачу данных на 1 секунду
R6.pause();
setTimeout(() => {
R6.resume();//возобновим работу потока
}, 1000);
});
Writable Streams — потоки для записи данных
Весь текущий буфер данных можно получить с помощью метода writable._writableState.getBuffer ().
'use strict';
const Source = require('./readable.js');
const { Writable } = require('stream');
class Writer extends Writable
{
constructor(opt = {})
{
super(opt);
console.log('objectMode ', this._writableState.objectMode);//false по умолчанию, если не задано явно true
console.log('highWaterMark ', this._writableState.highWaterMark);//16384
console.log('decodeStrings ', this._writableState.decodeStrings);//true по умолчанию; пеобразовывать ли в Buffer данные, до их передачи в метод _write()
console.log('buffer ', this._writableState.getBuffer());//[] - пустой массив
this.on('drain', ()=>
{
console.log('\n------ writable on drain');
})
.on('error', (err)=>
{
console.log('\n------ writable on error', err);
})
.on('finish', ()=>
{
console.log('\n------ writable on finish');
console.log('_writableState.getBuffer()', this._writableState.getBuffer());
});
}
/**
* @param chunk - строка|буфер|объект
* @param encoding - кодировка поступающих данных. если objectMode === true, значение encoding будет игнорироваться
* @param done - callback ф-ция. ее удобнее именовать именно так, потому что вы ее вызываете, когда по логике
* вашего метода _write, нужно сообщить, что завершили запись текущей части данных chunk, и готовы принять на запись
* следующую часть: done(err) - можно передать объект ошибки new Error(...)
* @private
*/
_write(chunk, encoding, done)
{
console.log('_writableState.getBuffer()', this._writableState.getBuffer());
console.log(typeof chunk );
//для пример с потоком Transform см ниже
if (typeof chunk === 'object') {
console.log('chunk = ', chunk.get(), chunk.get() +' in pow '+ chunk.get() +' = '+ chunk.inPow(chunk.get()));
} else {
console.log(`chunk = ${chunk}; isBuffer ${Buffer.isBuffer(chunk)}; chunk.length is ${chunk.length}; encoding = ${encoding}`);
}
/* Пример с ошибкой оставим закомментированным.
Добавим, что:
1) всегда добавляйте обработчик ошибок on('error', (err)=>{...})
2) если выбрасывается ошибка, то поток данных Readable не прекращает свою работу.
в этом слувае вам надо обрабатывать эту ситуацию - например, вызывать Readable.emit('error', err);
и прекращать читать данные Readable.puse(), после обработки ошибки продолжить работу Readable.remuse().
Это в общем случае, и все зависит от ваших задач при работе с потоками
//if (chunk > 3) return done(new Error('chunk > 3'));*/
done();
}
}
let array_of_data = ['1', '2', '3', '4', '5'];
let r_opts = {/* значения по умолчанию */};
const R = new Source(array_of_data, r_opts);
let w_opts = {/* значения по умолчанию */};
const W = new Writer(w_opts);
R.pipe(W);
array_of_data = ['1', '2', '3', '4', '5'];
r_opts = {encoding: 'utf8'};
const R1 = new Source(array_of_data, r_opts);
w_opts = {
decodeStrings: false//данные в _write будут строками в кодировке 'utf8', так как данные из источника - строки ( см r_opts),
};
const W1 = new Writer(w_opts);
R1.pipe(W1);
array_of_data = [1, 2, 3, 4, 5];
r_opts = {objectMode: true};
const R2 = new Source(array_of_data, r_opts);
w_opts = {
objectMode: true//если false, то при записи данных как объектов (см r_opts), будет ошибка "TypeError: Invalid non-string/buffer chunk"
};
const W2 = new Writer(w_opts);
R2.pipe(W2);
array_of_data = [1, 2, 3, 4, 5];
r_opts = {objectMode: true};
const R3 = new Source(array_of_data, r_opts);
w_opts = {
objectMode: true//если false, то при записи данных как объектов (см r_opts), будет ошибка "TypeError: Invalid non-string/buffer chunk"
, highWaterMark: 1 //ограничем буфер; при таком маленьком значении каждый раз будет вызываться событие 'drain'
};
const W3 = new Writer(w_opts);
R3.pipe(W3);
//Вариант без pipe()
const R3_1 = new Source(array_of_data, r_opts);
const W3_1 = new Writer(w_opts);
R3_1.on('data', (chunk)=> {
//R3_1._readableState.flowing === true
console.log('R3_1 in flowing mode', R3_1._readableState.flowing, 'R3_1 _readableState.buffer', R3_1._readableState.buffer);
toWriteOrNotToWriteThatIsTheQuestion(chunk, onDrain);
});
function onDrain() {
//R3_1._readableState.flowing === false, так как был вызван метод R3_1.pause() см toWriteOrNotToWriteThatIsTheQuestion
console.log('R3_1 in flowing mode', R3_1._readableState.flowing);
R3_1.resume();
}
/**
* если на данный момент не можем больше писать в поток Writable, нужно оставноить и получение данных из Readable (R3_1.pause())
* как только буфер очистится (событие 'drain'), мы продолжаем читать данные из источника Readable (см cb R3_1.resume(); ), и записывать в Writable
* @param data
* @param cb
*/
function toWriteOrNotToWriteThatIsTheQuestion(data, cb)
{
//во "внешнем коде" записывать данные через метод write(...), а не через _write(...)
if (!W3_1.write(data)) {
R3_1.pause();
W3_1.once('drain', cb);
} else {
process.nextTick(cb);
}
}
Transform Streams — потоки изменения данных
Transform — разновидность Duplex потоков. Решили сначала показать пример с ним.
'use strict';
const Readable = require('./readable.js');
const Writable = require('./writable.js');
const {Transform} = require('stream');
/*для примера того, что можем передавать не только строки, буфер, простые JS объекты,
но и экземпляры классов*/
class Chunk
{
constructor(chunk)
{
this.set(chunk);
}
set(chunk)
{
this._chunk = chunk;
}
get()
{
return this._chunk;
}
inPow(pow = 2)
{
return Math.pow(this.get(), pow);
}
}
class Transformer extends Transform
{
constructor(opt = {})
{
super(opt);
console.log('\n -------- Transform in constructor');
console.log('objectMode ', this._writableState.objectMode);//false по умолчанию, если не задано явно true
console.log('highWaterMark ', this._writableState.highWaterMark);//16384
console.log('decodeStrings ', this._writableState.decodeStrings);//true по умолчанию; пеобразовывать ли в Buffer данные, до их передачи в метод _write()
console.log('buffer ', this._writableState.getBuffer());//[] - пустой массив
this.on('close', ()=>
{
console.log('\n------ Transform on close');
})
.on('drain', ()=>
{
console.log('\n------ Transform on drain');
})
.on('error', (err)=>
{
console.log('\n------ Transform on error', err);
})
.on('finish', ()=>
{
console.log('\n------ Transform on finish');
})
.on('end', ()=>
{
console.log('\n------ Transform on end');
})
.on('pipe', ()=>
{
console.log('\n------ Transform on pipe');
});
}
/**
* метод, реализующий в себе запись данных (chunk поступают в поток Transform),
* и чтение данных - когда другой поток читает из Transform
* @param chunk
* @param encoding
* @param done - в общем случае done(err, chunk)
* @private
*/
_transform(chunk, encoding, done)
{
/*завершить обработку текущих данных chunk, и передать дальше на чтение можно двумя вариантами
done(null, chunk);
done(err, chunk); - в этом случае будет вызвано событие error
или так, что то же самое:
this.push(chunk);
done();
this.push(chunk);
done(err);*/
//преобразовали выходные данные в экземпляр класса Chunk (см. пример writable.js)
this.push(new Chunk(chunk));
done();
}
/**
* Кастомные transform потоки могут реализовать метод _flush.
Он будет вызван, когда нет больше данных на запись, но перед событием 'end' потока Readable (имеется ввиду Transform, так как это поток и на запись, и на чтение данных).
* @param done - done(err) можно передать объект ошибки Error
* @private
*/
_flush(done)
{
//TODO ... что-нибудь сделали дополнительно перед завершением работы потока
done();
}
}
let array_of_data = ['1', '2', '3', '4', '5'];
let r_opts = {
encoding: 'utf8'
};
const R = new Readable(array_of_data, r_opts);
let t_opts = {
readableObjectMode: true //читать из потока Transform будут объекты
, writableObjectMode: false//записывать в поток Transform можно либо строки или буфер
, decodeStrings: false
};
const T = new Transformer(t_opts);
let w_opts = {
objectMode: true//если false, будет выброшена ошибка
};
const W = new Writable(w_opts);
R.pipe(T).pipe(W);
Duplex Streams — потоки на запись и на чтение
Duplex реализуют в себе как Readable, таки Writable потоки. При этом их «работа» происходит независимо друг от друга.
Если вы заинтересовались темой потоков, предлагаем поэкспериментировать над реализацией своих Duplex потоков самостоятельно.
options Object передается в конструкторы Writable и Readable потока Duplex.
- allowHalfOpen boolean по умолчанию true. Если false, то когда поток на чтение завершает свою работу, то автоматически завершает работу и поток на запись;
- readableObjectMode boolean по умолчанию false. режим objectMode для потока readable. значение свойства игнорируется, если свойство objectMode = true;
- writableObjectMode boolean по умолчанию false. режим objectMode для потока writable. значение свойства игнорируется, если свойство objectMode = true.
Обработка ошибок
Когда в каком-то из звеньев было вызвано событие 'error', и если нужно об этом уведомить «предыдущие» потоки в цепочке, для них так же нужно вызвать событие 'error': StreamClass.emit («error», err), и обработать ситуацию. Или воспользоваться модулем pump (https://github.com/mafintosh/pump), с помощью которого можно решить данный вопрос.
Подводя итоги
С помощью потоков можно решать практически любую задачу:
- сохранение/чтение данных в/из файла или базы данных;
- архивация файлов;
- шифрование данных;
- передачу данных по сети (модули обмена сообщениями в реальном времени, реализовать трансляцию видео, аудио файлов)
Как говорится — на любой вкус.