Приручаем многопоточность в Node.js (часть 4: координатор против синхронного кода)
В предыдущей части мы научились эффективно передавать данные вспомогательным потокам из основного через разделяемую память, используя Atomics
-операции и блокировки.
Но мы рассматривали все-таки идеальную ситуацию, когда основной поток больше ничем не занимался, кроме обмена с «подчиненными» уже заранее готовыми данными. В реальных же приложениях такое встречается достаточно редко — обычно эти самые данные приходится готовить непосредственно перед передачей. И, бывает, в этом участвует существенная доля синхронного кода, что для JavaScript крайне неприятно, но иногда неизбежно — например, при вычислении регулярных выражений.
Давайте оценим, насколько синхронные операции «роняют» производительность нашего тестового приложения. И узнаем, как можно в разы улучшить ее, «скрестив ужа с ежом», используя выделенный поток-координатор из позапрошлой части статьи совместно с разделяемой памятью.
Добавим немного синхронности
Возьмем тот же самый тест из предыдущей части про разделяемую память и вместо массива сохраним все «сообщения» во временный каталог:
const fs = require('node:fs');
const {tmpdir} = require('node:os');
const {sep} = require('node:path');
// создаем временную папку и в ней файлы со всеми "сообщениями"
const dir = fs.mkdtempSync(tmpdir() + sep);
messages.forEach((data, i) => {
const fn = i.toString(16).padStart(3, '0');
fs.writeFileSync(dir + sep + fn, data);
});
А уже в ходе теста будем их синхронно вычитывать с помощью readFileSync
, намеренно «притормаживая» EventLoop в основном потоке приложения:
// получаем список всех сообщений в папке
const fns = fs.readdirSync(dir)
.sort()
.map(fn => dir + sep + fn);
remain = fns.length;
tsh = hrtime();
tfs = 0n; // длительность синхронных операций
fns.forEach((fn, id) => {
const ts = hrtime();
const data = fs.readFileSync(fn); // тяжелый синхронный код
tfs += hrtime() - ts; // ... и его продолжительность
pool.postMessage({id, data});
});
Тут мы замеряем как общее время отработки теста, так и отдельно длительность синхронных операций. Их разность даст нам, конечно, не само время обработки (пока основной поток синхронно читает, остальные-то работают), но некоторую оценку общей потери производительности.
Полный код и результаты тестов
const {
Worker
, isMainThread
, parentPort
, workerData
} = require('node:worker_threads');
const {
randomBytes
, createHash
} = require('node:crypto');
const hrtime = process.hrtime.bigint;
const THREAD_FREE = -1;
const EventEmitter = require('events');
class WorkersPool extends EventEmitter {
#queue;
#workersPool;
constructor({queue, workersPool}) {
super();
this.#queue = queue;
this.#workersPool = [...workersPool];
}
#shareMessage(worker, {id, data}) {
worker.data.set(data, 0);
worker.lock[0] = id;
Atomics.notify(worker.lock, 0, 1);
const lock = Atomics.waitAsync(worker.lock, 0, id);
if (lock.value === 'not-equal') {
this.#onMessage(worker);
}
else {
lock.value.then(result => {
this.#onMessage(worker);
});
}
}
#onMessage(worker) {
const msg = this.#queue.shift();
if (msg) {
this.#shareMessage(worker, msg);
}
else {
this.#workersPool.push(worker);
}
}
postMessage(msg) {
const worker = this.#workersPool.pop();
if (worker) {
this.#shareMessage(worker, msg);
}
else {
this.#queue.push(msg);
}
}
}
if (isMainThread) {
const taskSize = 1 << 16;
const tsg = hrtime();
const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));
const fs = require('node:fs');
const {tmpdir} = require('node:os');
const {sep} = require('node:path');
// создаем временную папку и в ней файлы со всеми "сообщениями"
const dir = fs.mkdtempSync(tmpdir() + sep);
messages.forEach((data, i) => {
const fn = i.toString(16).padStart(3, '0');
fs.writeFileSync(dir + sep + fn, data);
});
console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');
const hashes = messages.map(() => undefined);
let remain;
const workers = [];
let active = 1;
let tsh;
let tfs;
process
.on('test:start', () => {
hashes.fill();
const Pow2Buffer = require('./Pow2Buffer');
pool = new WorkersPool({
queue : new Pow2Buffer(8, 16)
, workersPool : workers.slice(0, active)
});
// получаем список всех сообщений в папке
const fns = fs.readdirSync(dir)
.sort()
.map(fn => dir + sep + fn);
remain = fns.length;
tsh = hrtime();
tfs = 0n; // длительность синхронных операций
fns.forEach((fn, id) => {
const ts = hrtime();
const data = fs.readFileSync(fn); // тяжелый синхронный код
tfs += hrtime() - ts; // ... и его продолжительность
pool.postMessage({id, data});
});
})
.on('test:end', () => {
const duration = hrtime() - tsh;
console.log(
'hashed ' + active.toString().padStart(2) + ':'
, (Number(duration - tfs)/1e6 | 0).toString().padStart(4)
, 'ms'
, '| fs.read'
, (Number(tfs)/1e6 | 0).toString().padStart(4)
, 'ms'
, '| total'
, (Number(duration)/1e6 | 0).toString().padStart(4)
, 'ms'
);
if (active < n) {
active++;
process.emit('test:start');
}
else {
process.exit();
}
});
const n = 16;
Promise.all(
Array(n).fill().map(_ => new Promise((resolve, reject) => {
// выделяем сегменты разделяемой памяти
const sharedBufferData = new SharedArrayBuffer(taskSize);
const sharedBufferLock = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
// инициализируем "пустое" состояние блокировки
const lock = new Int32Array(sharedBufferLock);
lock.fill(THREAD_FREE);
// передаем в поток ссылки на сегменты разделяемой памяти
const worker = new Worker(__filename,
{
workerData : {
data : sharedBufferData
, lock : sharedBufferLock
}
}
);
worker.data = new Uint8Array(sharedBufferData);
worker.lock = lock;
worker
.on('online', () => resolve(worker))
.on('message', ({id, hash}) => {
hashes[id] = hash;
if (!--remain) {
process.emit('test:end');
}
});
}))
)
.then(result => {
workers.push(...result);
process.emit('test:start');
});
}
else {
const {data, lock} = workerData;
const sharedData = new Uint8Array(data);
const sharedLock = new Int32Array(lock);
do {
const lock = Atomics.wait(sharedLock, 0, THREAD_FREE);
parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')});
sharedLock[0] = THREAD_FREE;
Atomics.notify(sharedLock, 0, 1);
}
while (true);
}
generated: 1959 ms
hashed 1: 1237 ms | fs.read 4152 ms | total 5390 ms
hashed 2: 616 ms | fs.read 555 ms | total 1171 ms
hashed 3: 411 ms | fs.read 667 ms | total 1079 ms
hashed 4: 344 ms | fs.read 801 ms | total 1145 ms
hashed 5: 326 ms | fs.read 560 ms | total 887 ms
hashed 6: 307 ms | fs.read 619 ms | total 926 ms
hashed 7: 336 ms | fs.read 574 ms | total 911 ms
hashed 8: 315 ms | fs.read 572 ms | total 887 ms
hashed 9: 315 ms | fs.read 908 ms | total 1224 ms
hashed 10: 305 ms | fs.read 904 ms | total 1209 ms
hashed 11: 343 ms | fs.read 580 ms | total 924 ms
hashed 12: 300 ms | fs.read 598 ms | total 899 ms
hashed 13: 326 ms | fs.read 597 ms | total 924 ms
hashed 14: 308 ms | fs.read 584 ms | total 893 ms
hashed 15: 315 ms | fs.read 580 ms | total 895 ms
hashed 16: 310 ms | fs.read 579 ms | total 890 ms
Синхронный код основного потока снижает общую производительность
В точке максимальной эффективности »4 потока на 4 ядрах», которую мы вычислили в прошлый раз, условно «чистое» время ухудшилось на 15% -, а ведь мы всего лишь на некоторое время заняли основной поток.
К сожалению, отреагировать на освобождение вспомогательного потока и дать ему следующую задачу основной может не раньше, чем завершит текущую синхронную операцию:
Ожидание вспомогательного потока из-за синхронного кода в основном
Поток-координатор
Но давайте тогда процесс «раздачи заданий» вытащим в отдельный поток, описанный во второй части нашего цикла — пусть в нем будет минимум синхронного кода и его реакции ничем не затормаживаются:
Из основного потока передавать информацию в координатор мы будем «по ссылке», без копирования, с помощью transferList:
fns.forEach((fn, id) => {
const ts = hrtime();
const data = fs.readFileSync(fn); // тяжелый синхронный код
tfs += hrtime() - ts; // ... и его продолжительность
coordinator.postMessage(
{id, data}
, [data.buffer] // передача по ссылке через transferList
);
});
…, а уже между координатором и вспомогательными потоками — копировать через разделяемую память, которую передадим при увязке дочерних потоков:
// ...
Promise.all(
workers.slice(0, active)
.flatMap(worker => {
const shared = { // сегменты разделяемой памяти вспомогательного процесса
data : worker.data
, lock : worker.lock
};
return [{worker}, {worker : coordinator, shared}];
})
.map(adressee => new Promise((resolve, reject) => {
const {worker, shared} = adressee;
worker.signalPort.once('message', () => resolve(adressee));
worker.signalPort.postMessage(shared); // shared попадет только в координатор
}))
)
// ...
Но если мы оставим в потоке бесконечный синхронный цикл ожидания блокировки, у нас пройдет только первый тест, и все встанет — поскольку «слушать» информацию от нового экземпляра координатора будет уже некому.
Поэтому переведем основной рабочий код тоже на Atomics.waitAsync, заодно сможем вернуть и оценку загрузки EventLoop:
// ...
case 'worker':
const {data, lock} = workerData;
const sharedData = new Uint8Array(data);
const sharedLock = new Int32Array(lock);
const processMessage = () => {
// обрабатываем сообщение
parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')})
// уведомляем координатор о своей доступности
sharedLock[0] = THREAD_FREE;
Atomics.notify(sharedLock, 0, 1);
// ... и возвращаемся к ожиданию блокировки
wait();
};
const wait = () => {
const lock = Atomics.waitAsync(sharedLock, 0, THREAD_FREE);
if (lock.value === 'not-equal') {
// если значение изменилось, то поток уже обработал задачу, и реагируем сразу
processMessage();
}
else {
// иначе ждем разрешения Promise блокировки
lock.value.then(result => {
processMessage();
});
}
};
// сигнализируем готовность и начинаем ждать
signalPort.on('message', () => {
signalPort.postMessage(undefined);
wait();
});
break;
// ...
Полный код и результаты тестов
const {
Worker
, isMainThread
, parentPort
, workerData
, MessageChannel
} = require('node:worker_threads');
const {
randomBytes
, createHash
} = require('node:crypto');
const hrtime = process.hrtime.bigint;
const THREAD_FREE = -1;
if (isMainThread) {
const taskSize = 1 << 16;
const tsg = hrtime();
const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));
const fs = require('node:fs');
const {tmpdir} = require('node:os');
const {sep} = require('node:path');
// создаем временную папку и в ней файлы со всеми "сообщениями"
const dir = fs.mkdtempSync(tmpdir() + sep);
messages.forEach((data, i) => {
const fn = i.toString(16).padStart(3, '0');
fs.writeFileSync(dir + sep + fn, data);
});
console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');
const hashes = messages.map(() => undefined);
let remain;
const workers = [];
let active = 1;
let tsh;
let tfs;
let coordinator;
process
.on('test:start', () => {
hashes.fill();
const channel = new MessageChannel();
coordinator = new Worker(__filename,
{
workerData : {
signalPort : channel.port1
, workerType : 'coordinator'
}
, transferList : [channel.port1]
}
);
coordinator.signalPort = channel.port2;
coordinator.signalPort.setMaxListeners(0);
coordinator.on('online', () => {
Promise.all(
workers.slice(0, active)
.flatMap(worker => {
const shared = { // сегменты разделяемой памяти вспомогательного процесса
data : worker.data
, lock : worker.lock
};
return [{worker}, {worker : coordinator, shared}];
})
.map(adressee => new Promise((resolve, reject) => {
const {worker, shared} = adressee;
worker.signalPort.once('message', () => resolve(adressee));
worker.signalPort.postMessage(shared); // shared попадет только в координатор
}))
)
.then(result => {
// получаем список всех сообщений
const fns = fs.readdirSync(dir)
.sort()
.map(fn => dir + sep + fn);
remain = fns.length;
[coordinator, ...workers].forEach(worker => worker.eLU = worker.performance.eventLoopUtilization());
tsh = hrtime();
tfs = 0n;
fns.forEach((fn, id) => {
const ts = hrtime();
const data = fs.readFileSync(fn); // тяжелый синхронный код
tfs += hrtime() - ts; // ... и его продолжительность
coordinator.postMessage(
{id, data}
, [data.buffer] // передача по ссылке через transferList
);
});
});
});
})
.on('test:end', () => {
const duration = hrtime() - tsh;
[coordinator, ...workers].forEach(worker => worker.util = worker.performance.eventLoopUtilization(worker.eLU).utilization);
const avg = workers.slice(0, active).reduce((sum, worker) => sum + worker.util, 0)/active;
console.log(
'hashed ' + active.toString().padStart(2) + ':'
, (Number(duration - tfs)/1e6 | 0).toString().padStart(4)
, 'ms'
, '| fs.read'
, (Number(tfs)/1e6 | 0).toString().padStart(4)
, 'ms'
, '| total'
, (Number(duration)/1e6 | 0).toString().padStart(4)
, 'ms | ' + (avg * 100 | 0) + ' | '
, (coordinator.util * 100 | 0).toString().padStart(3) + ' c'
, workers.map(
worker => (worker.util * 100 | 0).toString().padStart(3)
).join(' ')
);
if (active < n) {
active++;
process.emit('test:start');
}
else {
process.exit();
}
});
const n = 16;
Promise.all(
Array(n).fill().map(_ => new Promise((resolve, reject) => {
// выделяем сегменты разделяемой памяти
const sharedBufferData = new SharedArrayBuffer(taskSize);
const sharedBufferLock = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
// инициализируем "пустое" состояние блокировки
const lock = new Int32Array(sharedBufferLock);
lock.fill(THREAD_FREE);
const channel = new MessageChannel();
// передаем в поток ссылки на сегменты разделяемой памяти
const worker = new Worker(__filename,
{
workerData : {
signalPort : channel.port1
, workerType : 'worker'
, data : sharedBufferData
, lock : sharedBufferLock
}
, transferList : [channel.port1]
}
);
worker.signalPort = channel.port2;
worker.data = new Uint8Array(sharedBufferData);
worker.lock = lock;
worker
.on('online', () => resolve(worker))
.on('message', ({id, hash}) => {
hashes[id] = hash;
if (!--remain) {
process.emit('test:end');
}
});
}))
)
.then(result => {
workers.push(...result);
process.emit('test:start');
});
}
else {
const {signalPort, workerType} = workerData;
switch (workerType) {
case 'worker':
const {data, lock} = workerData;
const sharedData = new Uint8Array(data);
const sharedLock = new Int32Array(lock);
const processMessage = () => {
// обрабатываем сообщение
parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')})
// уведомляем координатор о своей доступности
sharedLock[0] = THREAD_FREE;
Atomics.notify(sharedLock, 0, 1);
// ... и возвращаемся к ожиданию блокировки
wait();
};
const wait = () => {
const lock = Atomics.waitAsync(sharedLock, 0, THREAD_FREE);
if (lock.value === 'not-equal') {
// если значение изменилось, то поток уже обработал задачу, и реагируем сразу
processMessage();
}
else {
// иначе ждем разрешения Promise блокировки
lock.value.then(result => {
processMessage();
});
}
};
// сигнализируем готовность и начинаем ждать
signalPort.on('message', () => {
signalPort.postMessage(undefined);
wait();
});
break;
case 'coordinator':
const pool = [];
const queue = new (require('./Pow2Buffer'))(8, 16);
const shareMessage = (worker, {id, data}) => {
worker.data.set(data, 0);
worker.lock[0] = id;
Atomics.notify(worker.lock, 0, 1);
const lock = Atomics.waitAsync(worker.lock, 0, id);
if (lock.value === 'not-equal') {
// если значение изменилось, то поток уже обработал задачу, и реагируем сразу
onMessage(worker);
}
else {
// иначе ждем разрешения Promise блокировки
lock.value.then(result => {
onMessage(worker);
});
}
}
const onMessage = worker => {
const msg = queue.shift();
if (msg) {
shareMessage(worker, msg);
}
else {
pool.push(worker);
}
}
// по сигнальному каналу передаем порт worker'а
signalPort.on('message', worker => {
// добавляем в пул и подписываемся на обработку сигнала готовности от worker'а
pool.push(worker);
signalPort.postMessage(undefined);
});
// обработка входящего сообщения координатору
parentPort.on('message', message => {
const worker = pool.pop();
if (worker) {
shareMessage(worker, message);
}
else {
queue.push(message);
}
});
break;
}
}
generated: 2043 ms
hashed 1: 373 ms | fs.read 2387 ms | total 2761 ms | 42 | 10 c 42 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
hashed 2: 118 ms | fs.read 602 ms | total 720 ms | 89 | 39 c 98 79 0 0 0 0 0 0 0 0 0 0 0 0 0 0
hashed 3: 119 ms | fs.read 570 ms | total 690 ms | 80 | 33 c 100 99 40 0 0 0 0 0 0 0 0 0 0 0 0 0
hashed 4: 75 ms | fs.read 603 ms | total 679 ms | 77 | 23 c 100 100 96 12 0 0 0 0 0 0 0 0 0 0 0 0
hashed 5: 204 ms | fs.read 931 ms | total 1135 ms | 81 | 27 c 100 100 100 98 11 0 0 0 0 0 0 0 0 0 0 0
hashed 6: 146 ms | fs.read 994 ms | total 1141 ms | 82 | 12 c 100 100 100 100 88 9 0 0 0 0 0 0 0 0 0 0
hashed 7: 260 ms | fs.read 952 ms | total 1212 ms | 81 | 22 c 100 100 100 100 100 67 5 0 1 0 0 0 1 1 0 1
hashed 8: 142 ms | fs.read 1433 ms | total 1576 ms | 89 | 16 c 100 100 100 100 100 100 100 13 0 0 0 0 0 0 0 0
hashed 9: 253 ms | fs.read 1554 ms | total 1807 ms | 86 | 9 c 100 100 100 100 100 100 100 74 0 0 0 0 0 0 0 0
hashed 10: 196 ms | fs.read 1587 ms | total 1784 ms | 80 | 7 c 100 100 100 100 100 100 100 100 6 2 0 0 0 0 0 0
hashed 11: 311 ms | fs.read 1948 ms | total 2259 ms | 87 | 21 c 100 100 100 100 100 100 100 100 81 81 4 0 0 0 0 0
hashed 12: 237 ms | fs.read 1973 ms | total 2210 ms | 83 | 7 c 100 100 100 100 100 100 100 100 100 100 0 0 0 0 0 0
hashed 13: 362 ms | fs.read 2014 ms | total 2377 ms | 78 | 14 c 100 100 100 100 100 100 100 100 100 100 5 5 8 0 0 0
hashed 14: 73 ms | fs.read 2280 ms | total 2354 ms | 77 | 6 c 100 100 100 100 100 100 100 100 100 100 7 38 37 5 0 0
hashed 15: 413 ms | fs.read 2513 ms | total 2927 ms | 83 | 12 c 100 100 100 100 100 100 100 100 100 100 52 100 100 3 1 0
hashed 16: 241 ms | fs.read 2619 ms | total 2861 ms | 86 | 21 c 100 100 100 100 100 100 100 100 100 100 100 100 100 52 11 12
Координатор решает!
Код нашего приложения уже совсем перестал быть простым, но зато мы все-таки еще почти в 4 раза уменьшили время обработки.
При этом по координатору еще остается запас для другой полезной работы — например, для динамического управления количеством вспомогательных потоков. Но про это — в следующей части.