Приручаем многопоточность в Node.js (часть 4: координатор против синхронного кода)

7fece6bd0bfedcf09e4f578ad654e209.jpeg

В предыдущей части мы научились эффективно передавать данные вспомогательным потокам из основного через разделяемую память, используя Atomics-операции и блокировки.

Но мы рассматривали все-таки идеальную ситуацию, когда основной поток больше ничем не занимался, кроме обмена с «подчиненными» уже заранее готовыми данными. В реальных же приложениях такое встречается достаточно редко — обычно эти самые данные приходится готовить непосредственно перед передачей. И, бывает, в этом участвует существенная доля синхронного кода, что для JavaScript крайне неприятно, но иногда неизбежно — например, при вычислении регулярных выражений.

Давайте оценим, насколько синхронные операции «роняют» производительность нашего тестового приложения. И узнаем, как можно в разы улучшить ее, «скрестив ужа с ежом», используя выделенный поток-координатор из позапрошлой части статьи совместно с разделяемой памятью.

Добавим немного синхронности

Возьмем тот же самый тест из предыдущей части про разделяемую память и вместо массива сохраним все «сообщения» во временный каталог:

  const fs = require('node:fs');
  const {tmpdir} = require('node:os');
  const {sep} = require('node:path');

  // создаем временную папку и в ней файлы со всеми "сообщениями"
  const dir = fs.mkdtempSync(tmpdir() + sep);
  messages.forEach((data, i) => {
    const fn = i.toString(16).padStart(3, '0');
    fs.writeFileSync(dir + sep + fn, data);
  });

А уже в ходе теста будем их синхронно вычитывать с помощью readFileSync, намеренно «притормаживая» EventLoop в основном потоке приложения:

      // получаем список всех сообщений в папке
      const fns = fs.readdirSync(dir)
        .sort()
        .map(fn => dir + sep + fn);

      remain = fns.length;
      tsh = hrtime();
      tfs = 0n; // длительность синхронных операций

      fns.forEach((fn, id) => {
        const ts = hrtime();
        const data = fs.readFileSync(fn); // тяжелый синхронный код
        tfs += hrtime() - ts;             // ... и его продолжительность
        pool.postMessage({id, data});
      });

Тут мы замеряем как общее время отработки теста, так и отдельно длительность синхронных операций. Их разность даст нам, конечно, не само время обработки (пока основной поток синхронно читает, остальные-то работают), но некоторую оценку общей потери производительности.

Полный код и результаты тестов

const {
  Worker
, isMainThread
, parentPort
, workerData
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

const THREAD_FREE = -1;

const EventEmitter = require('events');
class WorkersPool extends EventEmitter {
  #queue;
  #workersPool;

  constructor({queue, workersPool}) {
    super();

    this.#queue = queue;
    this.#workersPool = [...workersPool];
  }

  #shareMessage(worker, {id, data}) {
    worker.data.set(data, 0);
    worker.lock[0] = id;
    Atomics.notify(worker.lock, 0, 1);

    const lock = Atomics.waitAsync(worker.lock, 0, id);
    if (lock.value === 'not-equal') {
      this.#onMessage(worker);
    }
    else {
      lock.value.then(result => {
        this.#onMessage(worker);
      });
    }
  }

  #onMessage(worker) {
    const msg = this.#queue.shift();
    if (msg) {
      this.#shareMessage(worker, msg);
    }
    else {
      this.#workersPool.push(worker);
    }
  }

  postMessage(msg) {
    const worker = this.#workersPool.pop();
    if (worker) {
      this.#shareMessage(worker, msg);
    }
    else {
      this.#queue.push(msg);
    }
  }
}

if (isMainThread) {
  const taskSize = 1 << 16;
  
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));

  const fs = require('node:fs');
  const {tmpdir} = require('node:os');
  const {sep} = require('node:path');

  // создаем временную папку и в ней файлы со всеми "сообщениями"
  const dir = fs.mkdtempSync(tmpdir() + sep);
  messages.forEach((data, i) => {
    const fn = i.toString(16).padStart(3, '0');
    fs.writeFileSync(dir + sep + fn, data);
  });
  console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined);
  let remain;

  const workers = [];
  let active = 1;
  let tsh;
  let tfs;

  process
    .on('test:start', () => {
      hashes.fill();

      const Pow2Buffer = require('./Pow2Buffer');
      pool = new WorkersPool({
        queue : new Pow2Buffer(8, 16)
      , workersPool : workers.slice(0, active)
      });

      // получаем список всех сообщений в папке
      const fns = fs.readdirSync(dir)
        .sort()
        .map(fn => dir + sep + fn);

      remain = fns.length;
      tsh = hrtime();
      tfs = 0n; // длительность синхронных операций

      fns.forEach((fn, id) => {
        const ts = hrtime();
        const data = fs.readFileSync(fn); // тяжелый синхронный код
        tfs += hrtime() - ts;             // ... и его продолжительность
        pool.postMessage({id, data});
      });
    })
    .on('test:end', () => {
      const duration = hrtime() - tsh;

      console.log(
        'hashed ' + active.toString().padStart(2) + ':'
      , (Number(duration - tfs)/1e6 | 0).toString().padStart(4)
      , 'ms'
      , '| fs.read'
      , (Number(tfs)/1e6 | 0).toString().padStart(4)
      , 'ms'
      , '| total'
      , (Number(duration)/1e6 | 0).toString().padStart(4)
      , 'ms'
      );

      if (active < n) {
        active++;
        process.emit('test:start');
      }
      else {
        process.exit();
      }
    });

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => new Promise((resolve, reject) => {
      // выделяем сегменты разделяемой памяти
      const sharedBufferData = new SharedArrayBuffer(taskSize);
      const sharedBufferLock = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
      // инициализируем "пустое" состояние блокировки
      const lock = new Int32Array(sharedBufferLock);
      lock.fill(THREAD_FREE);
      // передаем в поток ссылки на сегменты разделяемой памяти
      const worker = new Worker(__filename,
        {
          workerData : {
            data : sharedBufferData
          , lock : sharedBufferLock
          }
        }
      );
      worker.data = new Uint8Array(sharedBufferData);
      worker.lock = lock;
      worker
        .on('online', () => resolve(worker))
        .on('message', ({id, hash}) => {
          hashes[id] = hash;
          if (!--remain) {
            process.emit('test:end');
          }
        });
    }))
  )
    .then(result => {
      workers.push(...result);
      process.emit('test:start');
    });
}
else {
  const {data, lock} = workerData;
  const sharedData = new Uint8Array(data);
  const sharedLock = new Int32Array(lock);

  do {
    const lock = Atomics.wait(sharedLock, 0, THREAD_FREE);
    parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')});
    sharedLock[0] = THREAD_FREE;
    Atomics.notify(sharedLock, 0, 1);
  }
  while (true);
}
generated: 1959 ms
hashed  1: 1237 ms | fs.read 4152 ms | total 5390 ms
hashed  2:  616 ms | fs.read  555 ms | total 1171 ms
hashed  3:  411 ms | fs.read  667 ms | total 1079 ms
hashed  4:  344 ms | fs.read  801 ms | total 1145 ms
hashed  5:  326 ms | fs.read  560 ms | total  887 ms
hashed  6:  307 ms | fs.read  619 ms | total  926 ms
hashed  7:  336 ms | fs.read  574 ms | total  911 ms
hashed  8:  315 ms | fs.read  572 ms | total  887 ms
hashed  9:  315 ms | fs.read  908 ms | total 1224 ms
hashed 10:  305 ms | fs.read  904 ms | total 1209 ms
hashed 11:  343 ms | fs.read  580 ms | total  924 ms
hashed 12:  300 ms | fs.read  598 ms | total  899 ms
hashed 13:  326 ms | fs.read  597 ms | total  924 ms
hashed 14:  308 ms | fs.read  584 ms | total  893 ms
hashed 15:  315 ms | fs.read  580 ms | total  895 ms
hashed 16:  310 ms | fs.read  579 ms | total  890 ms

Синхронный код основного потока снижает общую производительностьСинхронный код основного потока снижает общую производительность

В точке максимальной эффективности »4 потока на 4 ядрах», которую мы вычислили в прошлый раз, условно «чистое» время ухудшилось на 15% -, а ведь мы всего лишь на некоторое время заняли основной поток.

К сожалению, отреагировать на освобождение вспомогательного потока и дать ему следующую задачу основной может не раньше, чем завершит текущую синхронную операцию:

Ожидание вспомогательного потока из-за синхронного кода в основномОжидание вспомогательного потока из-за синхронного кода в основном

Поток-координатор

Но давайте тогда процесс «раздачи заданий» вытащим в отдельный поток, описанный во второй части нашего цикла — пусть в нем будет минимум синхронного кода и его реакции ничем не затормаживаются:

d7e75ce9bafe8df0a76c437d29fd4a14.png

Из основного потока передавать информацию в координатор мы будем «по ссылке», без копирования, с помощью transferList:

            fns.forEach((fn, id) => {
              const ts = hrtime();
              const data = fs.readFileSync(fn); // тяжелый синхронный код
              tfs += hrtime() - ts;             // ... и его продолжительность
              coordinator.postMessage(
                {id, data}
              , [data.buffer] // передача по ссылке через transferList
              );
            });

…, а уже между координатором и вспомогательными потоками — копировать через разделяемую память, которую передадим при увязке дочерних потоков:

// ...
        Promise.all(
          workers.slice(0, active)
            .flatMap(worker => {
              const shared = { // сегменты разделяемой памяти вспомогательного процесса
                data : worker.data
              , lock : worker.lock
              };
              return [{worker}, {worker : coordinator, shared}];
            })
            .map(adressee => new Promise((resolve, reject) => {
              const {worker, shared} = adressee;
              worker.signalPort.once('message', () => resolve(adressee));
              worker.signalPort.postMessage(shared); // shared попадет только в координатор
            }))
        )
// ...

Но если мы оставим в потоке бесконечный синхронный цикл ожидания блокировки, у нас пройдет только первый тест, и все встанет — поскольку «слушать» информацию от нового экземпляра координатора будет уже некому.

Поэтому переведем основной рабочий код тоже на Atomics.waitAsync, заодно сможем вернуть и оценку загрузки EventLoop:

// ...
    case 'worker':
      const {data, lock} = workerData;
      const sharedData = new Uint8Array(data);
      const sharedLock = new Int32Array(lock);

      const processMessage = () => {
        // обрабатываем сообщение
        parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')})
        // уведомляем координатор о своей доступности
        sharedLock[0] = THREAD_FREE;
        Atomics.notify(sharedLock, 0, 1);
        // ... и возвращаемся к ожиданию блокировки
        wait();
      };

      const wait = () => {
        const lock = Atomics.waitAsync(sharedLock, 0, THREAD_FREE);
        if (lock.value === 'not-equal') {
          // если значение изменилось, то поток уже обработал задачу, и реагируем сразу
          processMessage();
        }
        else {
          // иначе ждем разрешения Promise блокировки
          lock.value.then(result => {
            processMessage();
          });
        }
      };
      // сигнализируем готовность и начинаем ждать
      signalPort.on('message', () => {
        signalPort.postMessage(undefined);
        wait();
      });
      break;
// ...

Полный код и результаты тестов

const {
  Worker
, isMainThread
, parentPort
, workerData
, MessageChannel
} = require('node:worker_threads');

const {
  randomBytes
, createHash
} = require('node:crypto');

const hrtime = process.hrtime.bigint;

const THREAD_FREE = -1;

if (isMainThread) {
  const taskSize = 1 << 16;
  
  const tsg = hrtime();
  const messages = Array(1 << 12).fill().map(_ => randomBytes(taskSize));

  const fs = require('node:fs');
  const {tmpdir} = require('node:os');
  const {sep} = require('node:path');

  // создаем временную папку и в ней файлы со всеми "сообщениями"
  const dir = fs.mkdtempSync(tmpdir() + sep);
  messages.forEach((data, i) => {
    const fn = i.toString(16).padStart(3, '0');
    fs.writeFileSync(dir + sep + fn, data);
  });
  console.log('generated:', Number(hrtime() - tsg)/1e6 | 0, 'ms');

  const hashes = messages.map(() => undefined);
  let remain;

  const workers = [];
  let active = 1;
  let tsh;
  let tfs;

  let coordinator;
  process
    .on('test:start', () => {
      hashes.fill();

      const channel = new MessageChannel();
      coordinator = new Worker(__filename,
        {
          workerData   : {
            signalPort : channel.port1
          , workerType : 'coordinator'
          }
        , transferList : [channel.port1]
        }
      );
      coordinator.signalPort = channel.port2;
      coordinator.signalPort.setMaxListeners(0);

      coordinator.on('online', () => {
        Promise.all(
          workers.slice(0, active)
            .flatMap(worker => {
              const shared = { // сегменты разделяемой памяти вспомогательного процесса
                data : worker.data
              , lock : worker.lock
              };
              return [{worker}, {worker : coordinator, shared}];
            })
            .map(adressee => new Promise((resolve, reject) => {
              const {worker, shared} = adressee;
              worker.signalPort.once('message', () => resolve(adressee));
              worker.signalPort.postMessage(shared); // shared попадет только в координатор
            }))
        )
          .then(result => {
            // получаем список всех сообщений
            const fns = fs.readdirSync(dir)
              .sort()
              .map(fn => dir + sep + fn);
            remain = fns.length;

            [coordinator, ...workers].forEach(worker => worker.eLU = worker.performance.eventLoopUtilization());
            tsh = hrtime();
            tfs = 0n;

            fns.forEach((fn, id) => {
              const ts = hrtime();
              const data = fs.readFileSync(fn); // тяжелый синхронный код
              tfs += hrtime() - ts;             // ... и его продолжительность
              coordinator.postMessage(
                {id, data}
              , [data.buffer] // передача по ссылке через transferList
              );
            });
          });
      });
    })
    .on('test:end', () => {
      const duration = hrtime() - tsh;
      [coordinator, ...workers].forEach(worker => worker.util = worker.performance.eventLoopUtilization(worker.eLU).utilization);
      const avg = workers.slice(0, active).reduce((sum, worker) => sum + worker.util, 0)/active;

      console.log(
        'hashed ' + active.toString().padStart(2) + ':'
      , (Number(duration - tfs)/1e6 | 0).toString().padStart(4)
      , 'ms'
      , '| fs.read'
      , (Number(tfs)/1e6 | 0).toString().padStart(4)
      , 'ms'
      , '| total'
      , (Number(duration)/1e6 | 0).toString().padStart(4)
      , 'ms | ' + (avg * 100 | 0) + ' | '
      , (coordinator.util * 100 | 0).toString().padStart(3) + ' c'
      , workers.map(
          worker => (worker.util * 100 | 0).toString().padStart(3)
        ).join(' ')
      );

      if (active < n) {
        active++;
        process.emit('test:start');
      }
      else {
        process.exit();
      }
    });

  const n = 16;
  Promise.all(
    Array(n).fill().map(_ => new Promise((resolve, reject) => {
      // выделяем сегменты разделяемой памяти
      const sharedBufferData = new SharedArrayBuffer(taskSize);
      const sharedBufferLock = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
      // инициализируем "пустое" состояние блокировки
      const lock = new Int32Array(sharedBufferLock);
      lock.fill(THREAD_FREE);

      const channel = new MessageChannel();
      // передаем в поток ссылки на сегменты разделяемой памяти
      const worker = new Worker(__filename,
        {
          workerData : {
            signalPort : channel.port1
          , workerType : 'worker'
          , data : sharedBufferData
          , lock : sharedBufferLock
          }
        , transferList : [channel.port1]
        }
      );
      worker.signalPort = channel.port2;
      worker.data = new Uint8Array(sharedBufferData);
      worker.lock = lock;
      worker
        .on('online', () => resolve(worker))
        .on('message', ({id, hash}) => {
          hashes[id] = hash;
          if (!--remain) {
            process.emit('test:end');
          }
        });
    }))
  )
    .then(result => {
      workers.push(...result);
      process.emit('test:start');
    });
}
else {
  const {signalPort, workerType} = workerData;
  switch (workerType) {
    case 'worker':
      const {data, lock} = workerData;
      const sharedData = new Uint8Array(data);
      const sharedLock = new Int32Array(lock);

      const processMessage = () => {
        // обрабатываем сообщение
        parentPort.postMessage({id : sharedLock[0], hash : createHash('sha256').update(sharedData).digest('hex')})
        // уведомляем координатор о своей доступности
        sharedLock[0] = THREAD_FREE;
        Atomics.notify(sharedLock, 0, 1);
        // ... и возвращаемся к ожиданию блокировки
        wait();
      };

      const wait = () => {
        const lock = Atomics.waitAsync(sharedLock, 0, THREAD_FREE);
        if (lock.value === 'not-equal') {
          // если значение изменилось, то поток уже обработал задачу, и реагируем сразу
          processMessage();
        }
        else {
          // иначе ждем разрешения Promise блокировки
          lock.value.then(result => {
            processMessage();
          });
        }
      };
      // сигнализируем готовность и начинаем ждать
      signalPort.on('message', () => {
        signalPort.postMessage(undefined);
        wait();
      });
      break;
    case 'coordinator':
      const pool = [];
      const queue = new (require('./Pow2Buffer'))(8, 16);

      const shareMessage = (worker, {id, data}) => {
        worker.data.set(data, 0);
        worker.lock[0] = id;
        Atomics.notify(worker.lock, 0, 1);

        const lock = Atomics.waitAsync(worker.lock, 0, id);
        if (lock.value === 'not-equal') {
          // если значение изменилось, то поток уже обработал задачу, и реагируем сразу
          onMessage(worker);
        }
        else {
          // иначе ждем разрешения Promise блокировки
          lock.value.then(result => {
            onMessage(worker);
          });
        }
      }

      const onMessage = worker => {
        const msg = queue.shift();
        if (msg) {
          shareMessage(worker, msg);
        }
        else {
          pool.push(worker);
        }
      }

      // по сигнальному каналу передаем порт worker'а
      signalPort.on('message', worker => {
        // добавляем в пул и подписываемся на обработку сигнала готовности от worker'а
        pool.push(worker);
        signalPort.postMessage(undefined);
      });
      // обработка входящего сообщения координатору
      parentPort.on('message', message => {
        const worker = pool.pop();
        if (worker) {
          shareMessage(worker, message);
        }
        else {
          queue.push(message);
        }
      });
      break;
  }
}
generated: 2043 ms
hashed  1:  373 ms | fs.read 2387 ms | total 2761 ms | 42 |   10 c  42   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
hashed  2:  118 ms | fs.read  602 ms | total  720 ms | 89 |   39 c  98  79   0   0   0   0   0   0   0   0   0   0   0   0   0   0
hashed  3:  119 ms | fs.read  570 ms | total  690 ms | 80 |   33 c 100  99  40   0   0   0   0   0   0   0   0   0   0   0   0   0
hashed  4:   75 ms | fs.read  603 ms | total  679 ms | 77 |   23 c 100 100  96  12   0   0   0   0   0   0   0   0   0   0   0   0
hashed  5:  204 ms | fs.read  931 ms | total 1135 ms | 81 |   27 c 100 100 100  98  11   0   0   0   0   0   0   0   0   0   0   0
hashed  6:  146 ms | fs.read  994 ms | total 1141 ms | 82 |   12 c 100 100 100 100  88   9   0   0   0   0   0   0   0   0   0   0
hashed  7:  260 ms | fs.read  952 ms | total 1212 ms | 81 |   22 c 100 100 100 100 100  67   5   0   1   0   0   0   1   1   0   1
hashed  8:  142 ms | fs.read 1433 ms | total 1576 ms | 89 |   16 c 100 100 100 100 100 100 100  13   0   0   0   0   0   0   0   0
hashed  9:  253 ms | fs.read 1554 ms | total 1807 ms | 86 |    9 c 100 100 100 100 100 100 100  74   0   0   0   0   0   0   0   0
hashed 10:  196 ms | fs.read 1587 ms | total 1784 ms | 80 |    7 c 100 100 100 100 100 100 100 100   6   2   0   0   0   0   0   0
hashed 11:  311 ms | fs.read 1948 ms | total 2259 ms | 87 |   21 c 100 100 100 100 100 100 100 100  81  81   4   0   0   0   0   0
hashed 12:  237 ms | fs.read 1973 ms | total 2210 ms | 83 |    7 c 100 100 100 100 100 100 100 100 100 100   0   0   0   0   0   0
hashed 13:  362 ms | fs.read 2014 ms | total 2377 ms | 78 |   14 c 100 100 100 100 100 100 100 100 100 100   5   5   8   0   0   0
hashed 14:   73 ms | fs.read 2280 ms | total 2354 ms | 77 |    6 c 100 100 100 100 100 100 100 100 100 100   7  38  37   5   0   0
hashed 15:  413 ms | fs.read 2513 ms | total 2927 ms | 83 |   12 c 100 100 100 100 100 100 100 100 100 100  52 100 100   3   1   0
hashed 16:  241 ms | fs.read 2619 ms | total 2861 ms | 86 |   21 c 100 100 100 100 100 100 100 100 100 100 100 100 100  52  11  12

Координатор решает!Координатор решает!

Код нашего приложения уже совсем перестал быть простым, но зато мы все-таки еще почти в 4 раза уменьшили время обработки.

При этом по координатору еще остается запас для другой полезной работы — например, для динамического управления количеством вспомогательных потоков. Но про это — в следующей части.

© Habrahabr.ru