Многопоточность JavaScript с SharedArrayBuffer и Atomics: основы

b233aec713d40d7a43ec913361859e7e.png

Привет, Хабр!

JavaScript по традиции известен как однопоточный язык. Т.е код выполняется последовательно, и одновременное выполнение нескольких задач может быть проблематичным. Если код сталкивается с тяжелыми вычислительными задачами, это может привести к задержкам и замедлению интерфейса юзера. Поэтому один поток не для каких-либо интенсивных вычислений или обработки больших объемов данных.

Чтобы обойти эти ограничения, были введены Web Workers — они позволяют выполнять JS-код в фоновом потоке, параллельно с основным. Однако, все сложилось так, что простой обмен данными между основным потоком и воркерами через postMessage имеет свои ограничения и может быть недостаточно хорошим для некоторых задач.

Здесь помогают SharedArrayBuffer и Atomics.

SharedArrayBuffer

SharedArrayBuffer — это особый тип буфера, который позволяет нескольким потокам разделять один и тот же блок памяти. В отличие от того же дефолтного ArrayBuffer, который доступен только одному потоку, SharedArrayBuffer дает общую область памяти, к которой могут обращаться несколько рабочих потоков.

Внутренне SharedArrayBuffer представляет собой блок памяти фикс. размера, который можно использовать для хранения бинарных данных. Он используется в связке с типизированными массивами, такими как Uint8Array или Int32Array.

Но перед тем как работать с SharedArrayBuffer очень важно использовать определенные требования по настройке среды — настройку заголовков HTTP.

COOP заголовок позволяет защитить сайт от атак типа cross-origin. Он должен быть установлен на значение same-origin:

Cross-Origin-Opener-Policy: same-origin

COEP заголовок предотвращает встраивание сайта в другие документы. Он должен быть установлен на значение require-corp или credentialless:

Cross-Origin-Embedder-Policy: require-corp

Теперь рассмотрим как создать и передать SharedArrayBuffer между потоками.

Создание SharedArrayBuffer

Конструктор SharedArrayBuffer используется для создания буфера определенного размера в байтах. Он принимает два параметра:

  • length: размер буфера в байтах.

  • options (необязательный): объект с параметрами, например как maxByteLength, который определяет максимальный размер буфера.

const sab = new SharedArrayBuffer(1024);
const growableSab = new SharedArrayBuffer(8, { maxByteLength: 16 });

Методы SharedArrayBuffer

grow позволяет увеличивать размер SharedArrayBuffer, если он был создан с параметром maxByteLength. Новый размер должен быть меньше или равен maxByteLength:

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });

if (buffer.growable) {
  buffer.grow(12);
}

slice возвращает новый SharedArrayBuffer, содержащий копию байтов из оригинального буфера от индекса start (включительно) до end (исключительно).

const sab = new SharedArrayBuffer(1024);
const slicedSab = sab.slice(2, 100);

Так можно создавать буфеы, основанные на подмножествах данных из исходного буфера, без изменения оригинального.

Свойства SharedArrayBuffer

byteLength возвращает длину SharedArrayBuffer в байтах. Это значение устанавливается при создании буфера и не может быть изменено.

const sab = new SharedArrayBuffer(1024);
console.log(sab.byteLength); // 1024

Свойство growable указывает, может ли SharedArrayBuffer изменять свой размер. Если буфер был создан с параметром maxByteLength, это свойство будет равно true.

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });
console.log(buffer.growable); // true

maxByteLength возвращает максимальный размер буфера в байтах, который был установлен при создании буфера.

const buffer = new SharedArrayBuffer(8, { maxByteLength: 16 });
console.log(buffer.maxByteLength); // 16

Atomics

Atomics — это встроенный объект в JS, который предоставляет набор статических методов для выполнения атомарных операций. Атомарные операции выполняются как единое, неделимое действие, именно так atomics позволяет избегать гонок данных.

Основные методы Atomics:

Atomics.add (typedArray, index, value)
Атомарно добавляет значение к элементу массива по заданному индексу и возвращает предыдущее значение:

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
Atomics.add(int32, 0, 5); // добавляет 5 к int32[0]

Atomics.sub (typedArray, index, value)
Атомарно вычитает значение из элемента массива по заданному индексу и возвращает предыдущее значение:

Atomics.sub(int32, 0, 2); // вычитает 2 из int32[0]

Atomics.load (typedArray, index)
Атомарно считывает значение элемента массива по заданному индексу:

let value = Atomics.load(int32, 0); // считывает значение int32[0]

Atomics.store (typedArray, index, value)
Атомарно записывает значение в элемент массива по заданному индексу:

Atomics.store(int32, 0, 10); // Записывает 10 в int32[0]

Atomics.compareExchange (typedArray, index, expectedValue, replacementValue)
Атомарно сравнивает текущее значение элемента массива по заданному индексу с ожидаемым значением. Если они равны, то записывает новое значение и возвращает старое:

Atomics.compareExchange(int32, 0, 10, 20); // если int32[0] равно 10, то записывает 20 и возвращает старое значение

Atomics.exchange (typedArray, index, value)
Атомарно записывает значение в элемент массива по заданному индексу и возвращает старое значение:

Atomics.exchange(int32, 0, 15); // записывает 15 в int32[0] и возвращает старое значение

Atomics.wait (typedArray, index, value, timeout)
Проверяет значение элемента массива и ожидает его изменения до заданного значения или истечения времени тайм-аута. Возвращает «ok», «not-equal» или «timed-out»:

let result = Atomics.wait(int32, 0, 10, 1000); // ожидает изменения int32[0] до 10 или 1000 мс

Atomics.notify (typedArray, index, count)
Уведомляет потоки, ожидающие изменения элемента массива по заданному индексу. Возвращает количество уведомленных потоков:

let notified = Atomics.notify(int32, 0, 1); // уведомляет один поток, ожидающий изменения int32[0]

Atomics можно юзать для синхронных операций. Например, для синхронизации работы потоков:

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);

// в главном потоке
const worker = new Worker('worker.js');
worker.postMessage(sab);

Atomics.store(int32, 0, 0);
Atomics.wait(int32, 0, 0); // ожидание изменения int32[0]

// В рабочем потоке (worker.js)
self.onmessage = function(event) {
    const int32 = new Int32Array(event.data);
    Atomics.store(int32, 0, 1);
    Atomics.notify(int32, 0, 1); // уведомление главного потока
};

Объединяем Atomics и SharedArrayBuffer

Счетчик запросов в реал тайме

Допустим, есть веб-сервис, который обрабатывает большое количество запросов, и нам нужно следить за кол-вом активных запросов в реальном времени:

// главный поток
const buffer = new SharedArrayBuffer(4); // Создаем буфер на 4 байта
const counter = new Int32Array(buffer);

const worker = new Worker('worker.js');
worker.postMessage(buffer);

// обрабатываем новый запрос
function handleRequest() {
    Atomics.add(counter, 0, 1); // увеличиваем счетчик
    console.log(`Активные запросы: ${Atomics.load(counter, 0)}`);
    
    // симулируем завершение запроса через 2 секунды
    setTimeout(() => {
        Atomics.sub(counter, 0, 1); // уменьшаем счетчик
        console.log(`Активные запросы: ${Atomics.load(counter, 0)}`);
    }, 2000);
}

// рабочий поток (worker.js)
self.onmessage = function(event) {
    const counter = new Int32Array(event.data);
    setInterval(() => {
        console.log(`[Worker] Активные запросы: ${Atomics.load(counter, 0)}`);
    }, 1000);
};

Используем SharedArrayBuffer для хранения счетчика активных запросов, а Atomics дает безопасное увеличение и уменьшение счетчика из разных потоков.

Параллельная обработка данных

Рассмотрим задачу параллельной обработки большого массива данных, например, применение функции к каждому элементу массива:

/// главный поток
const buffer = new SharedArrayBuffer(1024 * 4); // буфер для 1024 чисел
const data = new Int32Array(buffer);

// инициализируем массив случайными числами
for (let i = 0; i < data.length; i++) {
    data[i] = Math.floor(Math.random() * 100);
}

const worker = new Worker('worker.js');
worker.postMessage(buffer);

function processData() {
    for (let i = 0; i < data.length; i++) {
        data[i] *= 2; // применяем функцию к каждому элементу
    }
    console.log('Обработка данных завершена');
}

processData();

// рабочий поток (worker.js)
self.onmessage = function(event) {
    const data = new Int32Array(event.data);
    for (let i = 0; i < data.length; i++) {
        Atomics.store(data, i, Atomics.load(data, i) * 2); // атомарное умножение на 2
    }
    console.log('Рабочий поток: обработка данных завершена');
};

Синхронизация состояния между потоками

Предположим, есть некая игрушка, где несколько потоков должны синхронизировать свое состояние:

// главный поток
const buffer = new SharedArrayBuffer(4); // буфер на 4 байта для хранения состояния
const state = new Int32Array(buffer);

const worker = new Worker('worker.js');
worker.postMessage(buffer);

function updateState(newState) {
    Atomics.store(state, 0, newState); // обовляем состояние
    Atomics.notify(state, 0); // уведомляем рабочий поток об изменении состояния
    console.log(`Состояние обновлено: ${newState}`);
}

updateState(1);

// рабочий поток (worker.js)
self.onmessage = function(event) {
    const state = new Int32Array(event.data);

    function waitForStateChange() {
        Atomics.wait(state, 0, Atomics.load(state, 0)); // ожидаем изменения состояния
        console.log(`Рабочий поток: состояние изменено на ${Atomics.load(state, 0)}`);
        waitForStateChange(); // рекурсивно продолжаем ожидание изменений
    }

    waitForStateChange();
};

Про востребованные языки программирования и практические инструменты мои коллеги из OTUS рассказывают в рамках онлайн-курсов. По этой ссылке вы можете ознакомиться с полным каталогом курсов, а также записаться на открытые уроки.

Habrahabr.ru прочитано 2041 раз