Приручаем многопоточность в Node.js (часть 2: очередь, каналы и координатор)

Multithreading как он естьMultithreading как он есть

В первой части статьи мы остановились на моменте, когда с помощью распределения задач между потоками по алгоритму Round-robin мы добились-таки ускорения работы приложения за счет многопоточности.

Но вот неприятность: такой алгоритм очень неравномерно нагружает потоки и не полностью утилизирует их возможности — пока кто-то простаивает, другой уже копит очередь. Как это можно обойти?

Пул потоков и очередь задач

Давайте попробуем вместо «запихивания» задачи в очередь каждого конкретного потока сформировать общую очередь для некоторого пула потоков:

  • при получении задачи отдаем ее первому свободному потоку или ставим в общую очередь, когда таких нет;

  • при освобождении потока сразу даем ему следующую задачу из очереди.

В качестве такой очереди можно использовать и обычный массив. Но лучше не стоит из-за проблем производительности Array.shift(), описанных в прошлой статье — так что возьмем реализацию на основе кольцевого буфера, приведенную там же.

Схема работы с общей очередьюСхема работы с общей очередью

Основным отличием от схемы с Round-robin, приведенной в прошлой части, является попадание следующей задачи в поток только когда он уже готов ей заниматься, причем сразу при завершении предыдущей.

Модификации в нашем коде будут достаточно невелики — само описание класса WorkersPool:

// ...
const EventEmitter = require('events');
class WorkersPool extends EventEmitter {
  #queue;
  #workersPool;
  #currentWorker;
  #onMessageHandler = Symbol('handler');

  constructor({queue, workersPool}) {
    super();

    this.#queue = queue;
    this.#workersPool = [...workersPool];

    // навешиваем свой обработчик в самое начало цепочки
    this.#workersPool.forEach(worker => {
      worker[this.#onMessageHandler] = this.#onMessage.bind(this, worker);
      worker.prependListener('message', worker[this.#onMessageHandler]);
    });
  }

  destructor() {
    //  гасим свои обработчики
    this.#workersPool.forEach(worker => {
      worker.off('message', worker[this.#onMessageHandler]);
      delete worker[this.#onMessageHandler];
    });
  }

  #onMessage(worker) {
    // если в очереди что-то есть - передаем вызвавшему
    const msg = this.#queue.shift();
    if (msg) {
      worker.postMessage(msg);
    }
    // если нет - оставляем его ждать в пуле
    else {
      this.#workersPool.push(worker);
    }
  }

  postMessage(msg) {
    // достаем из пула свободный воркер
    const worker = this.#workersPool.pop();
    if (worker) {
      worker.postMessage(msg);
    }
    else {
      // если его нет - оставляем сообщение в очереди
      this.#queue.push(msg);
    }
  }
}
// ...

… и пара мест его использования:

// ...
if (isMainThread) {
  // ...
  let pool; // это наш пул
  process
    .on('test:start', () => {
      // ...
      // пул потоков с эффективной очередью
      const Pow2Buffer = require('./Pow2Buffer');
      pool = new WorkersPool({
        queue : new Pow2Buffer(8, 16)
      , workersPool : workers.slice(0, active)
      });
      // ...
      messages.forEach((data, id) => {
        pool.postMessage({id, data}); // отправляем все в пул
      });
    })
    .on('test:end', () => {
      // ...
      if (active < n) {
        pool.destructor(); // зачищаем потоки пула от наших подписок
        active++;
        process.emit('test:start');
      }
      else {
        process.exit();
      }
    });
// ...

Полный код и результаты тестов

const {
  Worker
, isMainThread
, parentPort
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

const EventEmitter = require('events');
class WorkersPool extends EventEmitter {
  #queue;
  #workersPool;
  #currentWorker;
  #onMessageHandler = Symbol('handler');

  constructor({queue, workersPool}) {
    super();

    this.#queue = queue;
    this.#workersPool = [...workersPool];

    // навешиваем свой обработчик в самое начало цепочки
    this.#workersPool.forEach(worker => {
      worker[this.#onMessageHandler] = this.#onMessage.bind(this, worker);
      worker.prependListener('message', worker[this.#onMessageHandler]);
    });
  }

  destructor() {
    //  гасим свои обработчики
    this.#workersPool.forEach(worker => {
      worker.off('message', worker[this.#onMessageHandler]);
      delete worker[this.#onMessageHandler];
    });
  }

  #onMessage(worker) {
    // если в очереди что-то есть - передаем вызвавшему
    const msg = this.#queue.shift();
    if (msg) {
      worker.postMessage(msg);
    }
    // если нет - оставляем его ждать в пуле
    else {
      this.#workersPool.push(worker);
    }
  }

  postMessage(msg) {
    // достаем из пула свободный воркер
    const worker = this.#workersPool.pop();
    if (worker) {
      worker.postMessage(msg);
    }
    else {
      // если его нет - оставляем в очереди
      this.#queue.push(msg);
    }
  }
}

if (isMainThread) {
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(1 << 16));
  console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined);
  let remain;

  const workers = [];
  let active = 1;
  let tsh;

  let pool; // это наш пул
  process
    .on('test:start', () => {
      hashes.fill();
      remain = hashes.length;
    
      // пул потоков с эффективной очередью
      const Pow2Buffer = require('./Pow2Buffer');
      pool = new WorkersPool({
        queue : new Pow2Buffer(8, 16)
      , workersPool : workers.slice(0, active)
      });

      // фиксируем состояние нагрузки на начало теста
      workers.forEach(worker => worker.eLU = worker.performance.eventLoopUtilization());
      tsh = hrtime();
      messages.forEach((data, id) => {
        pool.postMessage({id, data});
      });
    })
    .on('test:end', () => {
      const duration = hrtime() - tsh;
      workers.forEach(worker => worker.util = worker.performance.eventLoopUtilization(worker.eLU).utilization);
      const avg = workers.slice(0, active).reduce((sum, worker) => sum + worker.util, 0)/active;

      console.log(
        'hashed ' + active.toString().padStart(2) + ':'
      , (Number(duration)/1e6 | 0).toString().padStart(4)
      , 'ms | ' + (avg * 100 | 0) + ' | '
      , workers.map(
          worker => (worker.util * 100 | 0).toString().padStart(2)
        ).join(' ')
      );

      if (active < n) {
        pool.destructor(); // зачищаем пул
        active++;
        process.emit('test:start');
      }
      else {
        process.exit();
      }
    });

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => {
      return new Promise((resolve, reject) => {
        const worker = new Worker(__filename);
        worker
          .on('online', () => resolve(worker))
          .on('message', ({id, hash}) => {
            hashes[id] = hash;
            if (!--remain) {
              process.emit('test:end');
            }
          });
      });    
    })
  )
    .then((result) => {
      workers.push(...result);
      process.emit('test:start');
    });
}
else {
  parentPort.on('message', ({id, data}) => {
    parentPort.postMessage({id, hash : createHash('sha256').update(data).digest('hex')});
  });
}
generated: 278 ms
hashed  1: 1449 ms | 82 |  82  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  2:  701 ms | 82 |  81 84  0  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  3:  485 ms | 81 |  78 79 85  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  4:  370 ms | 79 |  77 79 78 81  0  0  0  0  0  0  0  0  0  0  0  0
hashed  5:  347 ms | 68 |  66 64 70 69 72  0  0  0  0  0  0  0  0  0  0  0
hashed  6:  360 ms | 57 |  56 59 53 52 57 62  0  0  0  0  0  0  0  0  0  0
hashed  7:  310 ms | 54 |  54 50 54 50 56 55 58  0  0  0  0  0  0  0  0  0
hashed  8:  312 ms | 46 |  40 45 47 50 43 45 44 50  0  0  0  0  0  0  0  0
hashed  9:  311 ms | 42 |  35 42 42 44 38 41 44 57 38  0  0  0  0  0  0  0
hashed 10:  301 ms | 36 |  35 36 31 34 40 38 37 38 36 39  0  0  0  0  0  0
hashed 11:  312 ms | 32 |  27 31 32 32 29 29 37 32 35 35 30  0  0  0  0  0
hashed 12:  338 ms | 30 |  26 34 24 38 27 31 39 28 25 27 30 31  0  0  0  0
hashed 13:  321 ms | 27 |  23 24 23 26 26 24 32 30 26 28 28 31 25  0  0  0
hashed 14:  316 ms | 29 |  28 21 27 39 26 29 30 18 25 36 29 37 25 33  0  0
hashed 15:  314 ms | 26 |  24 29 28 28 18 28 28 37 17 17 30 25 27 28 32  0
hashed 16:  344 ms | 28 |  23 24 27 28 26 35 23 36 22 37 26 43 22 23 25 23

Queue vs Round-RobinQueue vs Round-Robin

При количестве активных потоков не меньше количества ядер процессора реализация с общей очередью выигрывает по времени при более равномерной и существенно меньшей средней утилизации потоков.

Каналы данных и сигналов

Впрочем, в этой радужной картинке есть и толика печали. Средняя нагрузка на потоках уменьшилась ровно за счет того, что мы не можем их теперь эффективно загрузить — особенно хорошо это видно при малом их количестве.

Это объясняется тем, что нагрузка на основной поток заметно выросла из-за необходимости не только реагировать на прикладную активность, но и заниматься обработкой очереди — как минимум, реагировать на каждый ответ, отмечая служебный поток как свободный.

Но ведь мы запросто можем разделить каналы обмена с потоком, на канал данных и канал сигналов как отдельные экземпляры MessageChannel. И будем передавать сигнал освобождения потока только однократно при обработке сразу пачки задач, получаемых через receiveMessageOnPort.

Структура MessageChannelСтруктура MessageChannel

Схематично MessageChannel может быть представлен, как два порта, один из которых принадлежит основному (или любому другому) потоку, а другой — вспомогательному. В каждый из них можно как неограниченно писать, поскольку за ним стоит очередь, так и читать из него — получая события или вычитывая самостоятельно.

Собственно, доступный нам на стороне потока parentPort является ровно такой же «половинкой» встроенного по умолчанию в Worker экземпляра MessageChannel.

Схема работы с разделенными каналами данных и сигналовСхема работы с разделенными каналами данных и сигналов

В отличие от предыдущего варианта, очередь отправляется в дата-порт конкретного потока сразу блоками из нескольких задач, а основной поток формирует следующую порцию не на каждую отдельную завершенную задачу, а при поступлении через сигнал-порт сообщения об окончании обработки всего блока.

Понятно, что если задачи не идеально одинаковы, отправляя сразу блоками, мы снова можем какие-то из потоков перегрузить. Поэтому будем стараться отдавать потоку не больше 1/N от остающейся длины очереди.

Соответственно, теперь нам достаточно вместо пула потоков иметь пул дата-каналов (точнее, их портов) для отправки данных и подписку на сигнальный порт:

class WorkersPool extends EventEmitter {
  #queue;
  #workers;
  #portPool;

  constructor({queue, workersPool}) {
    super();

    this.#queue = queue;
    this.#workers = workersPool.length;
    this.#portPool = [];

    workersPool.forEach(worker => {
      const channelS = new MessageChannel();
      const channelD = new MessageChannel();
      // передаем каналы данных и сигналов потоку
      worker.postMessage(
        {
          portS : channelS.port1
        , portD : channelD.port1
        }
      , [ // массив transferList - без этого работать не будет
          channelS.port1
        , channelD.port1
        ]
      );
      // добавляем канал данных в пул
      this.#portPool.push(channelD.port2);
      // при получении сигнала об освобождении потока, запускаем передачу данных ему
      channelS.port2.on('message', this.#onMessage.bind(this, channelD.port2));
    });
  }

  #onMessage(port) {
    // передаем текущему потоку от 1 записи до 1/N всей очереди
    const part = Math.ceil(this.#queue.length / this.#workers);
    if (part) {
      for (let i = 0; i < part; i++) {
        port.postMessage(this.#queue.shift());
      }
    }
    else {
      this.#portPool.push(port);
    }
  }

  postMessage(msg) {
    // достаем из пула свободный канал
    const port = this.#portPool.pop();
    if (port) {
      port.postMessage(msg);
    }
    else {
      // если его нет - оставляем в очереди
      this.#queue.push(msg);
    }
  }
}

Для передачи порта обмена в поток мы воспользовались такой структурой как transferList. В отличие от остальных элементов сообщения, упомянутые в нем объекты не копируются, а передаются потоку в готовом виде как структура в разделяемой памяти. Чтобы исключить конфликты доступа, на стороне передающего они при этом разрушаются и в дальнейшем не могут быть использованы.

Остальной код никак не меняется, разве что деструктор для очистки подписок теперь звать не обязательно.

Полный код и результаты тестов

const {
  Worker
, isMainThread
, parentPort
, MessageChannel       // канал обмена с потоком
, receiveMessageOnPort // синхронная обработка очереди
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

const EventEmitter = require('events');
class WorkersPool extends EventEmitter {
  #queue;
  #workers;
  #portPool;

  constructor({queue, workersPool}) {
    super();

    this.#queue = queue;
    this.#workers = workersPool.length;
    this.#portPool = [];

    workersPool.forEach(worker => {
      const channelS = new MessageChannel();
      const channelD = new MessageChannel();
      // передаем каналы данных и сигналов потоку
      worker.postMessage(
        {
          portS : channelS.port1
        , portD : channelD.port1
        }
      , [ // transferList
          channelS.port1
        , channelD.port1
        ]
      );
      // добавляем канал данных в пул
      this.#portPool.push(channelD.port2);
      // при получении сигнала об освобождении потока, запускаем передачу данных ему
      channelS.port2.on('message', this.#onMessage.bind(this, channelD.port2));
    });
  }

  #onMessage(port) {
    // передаем текущему потоку от 1 записи до 1/N всей очереди
    const part = Math.ceil(this.#queue.length/this.#workers);
    if (part) {
      for (let i = 0; i < part; i++) {
        port.postMessage(this.#queue.shift());
      }
    }
    else {
      this.#portPool.push(port);
    }
  }

  postMessage(msg) {
    // достаем из пула свободный канал
    const port = this.#portPool.pop();
    if (port) {
      port.postMessage(msg);
    }
    else {
      // если его нет - оставляем в очереди
      this.#queue.push(msg);
    }
  }
}

if (isMainThread) {
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(1 << 16));
  console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined);
  let remain;

  const workers = [];
  let active = 1;
  let tsh;

  let pool;
  process
    .on('test:start', () => {
      hashes.fill();
      remain = hashes.length;
    
      const Pow2Buffer = require('./Pow2Buffer');
      pool = new WorkersPool({
        queue : new Pow2Buffer(8, 16)
      , workersPool : workers.slice(0, active)
      });

      workers.forEach(worker => worker.eLU = worker.performance.eventLoopUtilization());
      tsh = hrtime();
      messages.forEach((data, id) => {
        pool.postMessage({id, data});
      });
    })
    .on('test:end', () => {
      const duration = hrtime() - tsh;
      workers.forEach(worker => worker.util = worker.performance.eventLoopUtilization(worker.eLU).utilization);
      const avg = workers.slice(0, active).reduce((sum, worker) => sum + worker.util, 0)/active;

      console.log(
        'hashed ' + active.toString().padStart(2) + ':'
      , (Number(duration)/1e6 | 0).toString().padStart(4)
      , 'ms | ' + (avg * 100 | 0) + ' | '
      , workers.map(
          worker => (worker.util * 100 | 0).toString().padStart(2)
        ).join(' ')
      );

      if (active < n) {
        active++;
        process.emit('test:start');
      }
      else {
        process.exit();
      }
    });

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => {
      return new Promise((resolve, reject) => {
        const worker = new Worker(__filename);
        worker
          .on('online', () => resolve(worker))
          .on('message', ({id, hash}) => {
            hashes[id] = hash;
            if (!--remain) {
              process.emit('test:end');
            }
          });
      });    
    })
  )
    .then((result) => {
      workers.push(...result);
      process.emit('test:start');
    });
}
else {
  // обработка атомарного сообщения
  const processMessage = ({id, data}) => parentPort.postMessage({id, hash : createHash('sha256').update(data).digest('hex')});
  // обработка стартовой установки портов
  parentPort.on('message', ({portS, portD}) => {
    portD.on('message', message => {
      // при получении новых данных
      processMessage(message);
      // ... извлекаем всю остальную очередь канала данных до конца
      do {
        const recv = receiveMessageOnPort(portD);
        if (!recv) {
          break;
        }
        processMessage(recv.message);
      }
      while (true);
      // все обработали - сообщаем в сигнальный канал, что поток свободен
      portS.postMessage(undefined);
    });
  });
}
generated: 278 ms
hashed  1: 1120 ms | 99 |  99  0  0  0  0  0  0  0  0  0  0  0  0  0  1  0
hashed  2:  596 ms | 93 |  99 87  0  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  3:  452 ms | 90 |  99 89 82  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  4:  403 ms | 85 |  80 87 98 75  0  0  0  0  0  0  0  0  0  0  0  0
hashed  5:  483 ms | 57 |  47 86 59 49 43  0  0  0  0  0  0  0  0  0  0  0
hashed  6:  427 ms | 62 |  44 46 63 92 73 56  0  0  0  0  0  0  0  0  0  0
hashed  7:  334 ms | 74 |  67 68 76 78 90 84 54  0  0  0  0  0  0  0  0  0
hashed  8:  519 ms | 36 |  17 16 41 14 41 50 95 15  0  0  0  0  0  0  0  0
hashed  9:  332 ms | 70 |  51 45 65 67 88 79 79 88 65  0  0  0  0  0  0  0
hashed 10:  448 ms | 41 |  17 35 49 34 35 37 96 52 32 26  0  0  0  0  0  0
hashed 11:  317 ms | 70 |  42 59 63 76 54 79 67 89 69 87 82  0  0  0  0  0
hashed 12:  314 ms | 71 |  53 55 84 53 63 91 60 82 80 84 82 65  0  0  0  0
hashed 13:  350 ms | 52 |  42 39 41 55 30 47 66 50 70 72 67 57 42  0  0  0
hashed 14:  325 ms | 64 |  46 47 45 59 27 76 67 66 60 82 76 88 84 73  0  0
hashed 15:  316 ms | 51 |  27 65 31 28 64 18 51 53 68 76 58 67 74 49 41  0
hashed 16:  321 ms | 48 |  17 77 32 41 21 41 25 76 82 45 39 58 54 83 44 40

MessageChannel против всехMessageChannel против всех

На малом количестве потоков почти вышли на показатели Round-robin, а вот на большом иногда существенно «штормит» от неравномерности распределения очереди. В общем, не можем признать данный опыт однозначно положительным.

Поток-координатор

Давайте попробуем объединить оба предыдущих метода.

В работе с очередью нас не устраивали задержки обработки готовности задач на стороне основного потока — так давайте ее вообще вынесем в другой, специализированный, поток - координатор, который сам будет распределять задачи между рабочими потоками.

Схема увязки рабочих потоков и координатораСхема увязки рабочих потоков и координатора

В коде мы сразу через workerData передаем сигнальный канал и тип потока. Для этого придется немного модифицировать код запуска рабочих потоков:

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => new Promise((resolve, reject) => {
      // формируем вспомогательный поток и служебный канал общения с ним
      const channel = new MessageChannel();
      const worker = new Worker(__filename,
        {
          workerData   : {
            signalPort : channel.port1
          , workerType : 'worker'
          }
        , transferList : [channel.port1]
        }
      );
      worker.signalPort = channel.port2;
      worker
        .on('online', () => resolve(worker))
        .on('message', ({id, hash}) => {
          hashes[id] = hash;
          if (!--remain) {
            process.emit('test:end');
          }
        });
    }))
  )
// ...

А потом уже через этот сигнальный канал передаем порт обмена между координатором и worker’ом:

// ...
  process
    .on('test:start', () => {
      // ...
      // формируем поток-координатор и служебный канал общения с ним
      const channel = new MessageChannel();
      const coordinator = new Worker(__filename,
        {
          workerData   : {
            signalPort : channel.port1
          , workerType : 'coordinator'
          }
        , transferList : [channel.port1]
        }
      );
      coordinator.signalPort = channel.port2;
      coordinator.signalPort.setMaxListeners(0);

      coordinator.on('online', () => {
        Promise.all(
          workers.slice(0, active)
            // для каждого активного потока формируем канал обмена информацией
            // один порт вспомогательному потоку, второй - координатору
            .flatMap(worker => {
              const {port1, port2} = new MessageChannel();
              return [{worker, port : port1}, {worker : coordinator, port : port2}];
            })
            .map(adressee => new Promise((resolve, reject) => {
              const {worker, port} = adressee;
              worker.signalPort.once('message', () => resolve(adressee));
              worker.signalPort.postMessage(port, [port]);
            }))
        )
          .then(ports => {
            // запуск теста
            workers.forEach(worker => worker.eLU = worker.performance.eventLoopUtilization());
            tsh = hrtime();
            messages.forEach((data, id) => {
              coordinator.postMessage({id, data}); // отправляем все координатору
            });
          });
      });
// ...

А вот код самих потоков поменяется намного сильнее. Обратите внимание на конструкцию switch-case — в многопоточных приложениях приходится применять ее достаточно часто как для разделения логики различных типов потоков, так и при обработке разных типов сообщений:

// ...
else {
  const {signalPort, workerType} = workerData;
  switch (workerType) {
    case 'worker':
      const processMessage = ({id, data}) => parentPort.postMessage({id, hash : createHash('sha256').update(data).digest('hex')});
      // по сигнальному каналу передаем порт координатора
      signalPort.on('message', port => {
        port.on('message', message => {
          processMessage(message);
          port.postMessage(undefined); // отправляем 'ready for task'
        });
        signalPort.postMessage(undefined); // отправляем 'online'
      });
      break;
    case 'coordinator':
      const pool = [];
      const queue = new (require('./Pow2Buffer'))(8, 16);
      // по сигнальному каналу передаем порт worker'а
      signalPort.on('message', port => {
        // добавляем в пул и подписываемся на обработку сигнала готовности от worker'а
        pool.push(port);
        port.on('message', () => {
          const message = queue.shift();
          if (message) {
            port.postMessage(message);
          }
          else {
            pool.push(port);
          }
        });
        signalPort.postMessage(undefined);
      });
      // обработка входящего сообщения координатору
      parentPort.on('message', message => {
        const port = pool.pop();
        if (port) {
          port.postMessage(message);
        }
        else {
          queue.push(message);
        }
      });
      break;
  }
}

Здесь мы организуем все подписки, чтобы задача, попавшая в координатор, была выдана первому же свободному (или освободившемуся) потоку, который потом сам скинет результат в parentPort основному потоку, сообщив координатору о своей доступности:

Схема обработки задачи через координаторСхема обработки задачи через координаторПолный код и результаты тестов

const {
  Worker
, isMainThread
, parentPort
, workerData
, MessageChannel
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

if (isMainThread) {
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(1 << 16));
  console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined);
  let remain;

  const workers = [];
  let active = 1;
  let tsh;

  process
    .on('test:start', () => {
      hashes.fill();
      remain = hashes.length;

      // формируем поток-координатор и служебный канал общения с ним
      const channel = new MessageChannel();
      const coordinator = new Worker(__filename,
        {
          workerData   : {
            signalPort : channel.port1
          , workerType : 'coordinator'
          }
        , transferList : [channel.port1]
        }
      );
      coordinator.signalPort = channel.port2;
      coordinator.signalPort.setMaxListeners(0);

      coordinator.on('online', () => {
        Promise.all(
          workers.slice(0, active)
            // для каждого активного потока формируем канал обмена информацией
            // один порт вспомогательному потоку, второй - координатору
            .flatMap(worker => {
              const {port1, port2} = new MessageChannel();
              return [{worker, port : port1}, {worker : coordinator, port : port2}];
            })
            .map(adressee => new Promise((resolve, reject) => {
              const {worker, port} = adressee;
              worker.signalPort.once('message', () => resolve(adressee));
              worker.signalPort.postMessage(port, [port]);
            }))
        )
          .then(ports => {
            // запуск теста
            workers.forEach(worker => worker.eLU = worker.performance.eventLoopUtilization());
            tsh = hrtime();
            messages.forEach((data, id) => {
              coordinator.postMessage({id, data}); // отправляем все координатору
            });
          });
      });
    })
    .on('test:end', () => {
      const duration = hrtime() - tsh;
      workers.forEach(worker => worker.util = worker.performance.eventLoopUtilization(worker.eLU).utilization);
      const avg = workers.slice(0, active).reduce((sum, worker) => sum + worker.util, 0)/active;

      console.log(
        'hashed ' + active.toString().padStart(2) + ':'
      , (Number(duration)/1e6 | 0).toString().padStart(4)
      , 'ms | ' + (avg * 100 | 0) + ' | '
      , workers.map(
          worker => (worker.util * 100 | 0).toString().padStart(2)
        ).join(' ')
      );

      if (active < n) {
        active++;
        process.emit('test:start');
      }
      else {
        process.exit();
      }
    });

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => new Promise((resolve, reject) => {
      // формируем вспомогательный поток и служебный канал общения с ним
      const channel = new MessageChannel();
      const worker = new Worker(__filename,
        {
          workerData   : {
            signalPort : channel.port1
          , workerType : 'worker'
          }
        , transferList : [channel.port1]
        }
      );
      worker.signalPort = channel.port2;
      worker
        .on('online', () => resolve(worker))
        .on('message', ({id, hash}) => {
          hashes[id] = hash;
          if (!--remain) {
            process.emit('test:end');
          }
        });
    }))
  )
    .then(result => {
      workers.push(...result);
      process.emit('test:start');
    });
}
else {
  const {signalPort, workerType} = workerData;
  switch (workerType) {
    case 'worker':
      const processMessage = ({id, data}) => parentPort.postMessage({id, hash : createHash('sha256').update(data).digest('hex')});
      // по сигнальному каналу передаем порт координатора
      signalPort.on('message', port => {
        port.on('message', message => {
          processMessage(message);
          port.postMessage(undefined); // отправляем 'ready for task'
        });
        signalPort.postMessage(undefined); // отправляем 'online'
      });
      break;
    case 'coordinator':
      const pool = [];
      const queue = new (require('./Pow2Buffer'))(8, 16);
      // по сигнальному каналу передаем порт worker'а
      signalPort.on('message', port => {
        // добавляем в пул и подписываемся на обработку сигнала готовности от worker'а
        pool.push(port);
        port.on('message', () => {
          const message = queue.shift();
          if (message) {
            port.postMessage(message);
          }
          else {
            pool.push(port);
          }
        });
        signalPort.postMessage(undefined);
      });
      // обработка входящего сообщения координатору
      parentPort.on('message', message => {
        const port = pool.pop();
        if (port) {
          port.postMessage(message);
        }
        else {
          queue.push(message);
        }
      });
      break;
  }
}
generated: 281 ms
hashed  1: 1396 ms | 81 |  81  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  2:  804 ms | 75 |  74 76  0  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  3:  604 ms | 66 |  65 67 67  0  0  0  0  0  0  0  0  0  0  0  0  0
hashed  4:  478 ms | 62 |  62 61 63 63  0  0  0  0  0  0  0  0  0  0  0  0
hashed  5:  483 ms | 51 |  50 50 48 54 52  0  0  0  0  0  0  0  0  0  0  0
hashed  6:  495 ms | 49 |  48 51 45 50 49 48  0  0  0  0  0  0  0  0  0  0
hashed  7:  500 ms | 40 |  40 39 40 43 37 41 41  0  0  0  0  0  0  0  0  0
hashed  8:  549 ms | 32 |  33 33 33 23 24 42 32 34  0  0  0  0  0  0  0  0
hashed  9:  602 ms | 28 |  21 35 25 27 35 20 26 30 30  0  0  0  0  0  0  0
hashed 10:  518 ms | 27 |  22 29 17 32 28 24 33 35 33 22  0  0  0  0  0  0
hashed 11:  522 ms | 28 |  27 22 24 26 34 29 39 33 29 21 24  0  0  0  0  0
hashed 12:  543 ms | 25 |  31 19 19 25 24 31 30 27 25 28 16 22  0  1  1  0
hashed 13:  541 ms | 23 |  21 20 19 24 20 28 32 21 27 28 21 17 21  1  2  1
hashed 14:  549 ms | 22 |  17 22 21 20 25 19 18 25 26 29 25 21 23 20  0  0
hashed 15:  557 ms | 23 |  31 33 32 27 17 25 20 20 21 23 20 17 23 18 19  0
hashed 16:  570 ms | 22 |  24 23 19 21 19 29 23 21 21 13 26 22 21 24 24 26

Очередь в отдельном потоке-координатореОчередь в отдельном потоке-координаторе

И… результаты хоть и радуют равномерностью распределения нагрузки между потоками, вообще не впечатляют с точки зрения общей производительности.

То есть такой подход вполне допустим, если само формирование задачи для потока или обработка ее выполнения являются вычислительно-сложными — тогда их имеет смысл выносить в координатор. Но в большинстве типовых случаев такой подход показывает себя не лучшим образом.

А все потому, что, с одной стороны, мы сначала копируем контент задачи из основного потока в координатор, потом из координатора в рабочий поток, а с другой — worker бесцельно ждет, когда мы отреагируем на его готовность. И чем больше активных потоков участвует в такой схеме, тем большую долю времени они вынужденно простаивают.

Ожидание потоков в схеме с координаторомОжидание потоков в схеме с координатором

А нельзя ли как-то сделать, чтобы потоки забирали и распределяли задачи между собой «сами», не ожидая бесцельно, пока им кто-то это скомандует? Оказывается, можно, но про это — в следующей части.

© Habrahabr.ru