Разгоняем JS-парсер с помощью WebAssembly (часть 3: SIMD)
В предыдущей статье мы остановились на варианте, который с помощью SWAR-хинта превращает 8 последовательных цифр в одно числовое 32bit-значение. Но что если мы предположим, что все значения у нас, в основном, невелики — до 3 цифр? Тогда нам вполне достаточно использовать всего лишь 32bit-инструкции, а SWAR будет выполнен за 2 операции вместо 3 — сплошной выигрыш!
Давайте перепишем наш код так, чтобы первый блок из 4 символов обрабатывался 32bit-инструкциями, а если понадобится второй блок из 8 символов — уже 64bit-инструкциями.
И… вместо 26ms получаем 28ms! Значит, наше предположение относительно длины чисел не оправдалось, и в первом блоке выгоднее обрабатывать сразу побольше символов.
большие -, но по 5, маленькие -, но по 3То есть больше размерность регистра — лучше? И такие регистры есть — это 128-битные SSE-регистры XMM — в WebAssembly они доступны нам как переменные с типом v128
и суперскалярные операции над ними.
Обратите внимание, что пока не все эти операции реализованы в разных JavaScript-движках.
В зависимости от используемой инструкции v128
-регистр интерпретируется либо как единый блок (например, для битовых логических операций and, or, xor, not
), либо как набор блоков меньшей размерности:
Как включить SIMD в Node.js
Для начала нам придется немного доработать наш «компилятор» compile-test.js
: и включить дополнительные расширения:
const wasmModule = wabt.parseWat(
inputWat
, readFileSync(inputWat, 'utf8')
, {simd : true} // включаем поддержку SIMD-feature
);
Посмотреть, какие существуют другие дополнительные «фичи» компиляции в WASM можно на демо-странице wat2wasm.
Поскольку в Node.js пока поддержка SIMD проходит в качестве экспериментальной, запускать наш парсер придется с включенными соответствующими V8-опциями:
--experimental-wasm-simd (enable prototype SIMD opcodes for wasm)
type: bool default: false
--wasm-simd-post-mvp (allow experimental SIMD operations for prototyping that are not included in the current proposal)
type: bool default: false
Теперь команда на запуск выглядит вот так:
node --experimental-wasm-simd --wasm-simd-post-mvp --expose-gc buffers-test.js
SIMD-парсинг натуральных чисел на WASM
Собственно, нам необходимо решить те же задачи, что и для SWAR-варианта, но уже с учетом возможностей v128
-инструкций. За основу возьмем алгоритм, приведенный на StackOverflow.
Узнаем длину числа
вычитаем «нулевой» символ
0x30
из каждого, с переходом через 0 — это означает, что наши стоп-символы0x0A, 0x20 и 0x2C
превратятся в0xDA, 0xF0 и 0xFC
соответственновычисляем для каждого байта, что он
<= 0x09
— то есть исходно находился в диапазоне0x30..0x39
, что соответствует символам'0'..'9'
находим первый
false
-байт с помощьюctz(not(bitmask))
— это и есть позиция нашего стоп-символа
Для реализации этого в коде нам понадобятся две глобальные переменные-«константы»: «все нули» и «все девятки»:
(global $all_zero v128
(v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30)
)
(global $all_nine v128
(v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09)
)
;; разбор числа - SIMD
(func $parseNumber
(local $ch i32)
(local $xword v128)
(local $xn i32)
;; xword = line[off..+15]
(local.set $xword
(v128.load align=4
(global.get $off)
)
)
;; xn = num digits
(local.set $xn
(i32.ctz ;; * = 5
(i32.xor ;; 0b... 1 0 0 0 0 0
(i8x16.bitmask ;; 0b... 0 1 1 1 1 1
(i8x16.le_u ;; 0x... 00 FF FF FF FF FF
(i8x16.sub ;; 0x... F0 05 04 03 02 01
(local.get $xword) ;; 0x... 20 35 34 33 32 31
(global.get $all_zero) ;; 0x... 30 30 30 30 30 30
)
(global.get $all_nine) ;; 0x... 09 09 09 09 09 09
)
)
(i32.const 0xFFFFFFFF) ;; 0b... 1 1 1 1 1 1
)
)
)
Определяем стоп-символ
Просто так взять xn
-й байт из v128-значения нельзя — нет соответствующей инструкции. Зато есть i8x16.extract_lane
, которая позволяет взять заранее указанный байт (нас вполне устроит #0
).
Но чтобы в нулевом байте оказался xn
-й, нужно «переложить» байты в соответствии с маской с помощью swizzle
.
А соответствующую маску, состоящую из единственного повторяющегося значения xn
, нам позволит сделать инструкция splat
.
;; ch = xword[xn]
(local.set $ch ;; ch = 0x20
(i8x16.extract_lane_u ;; xmm[0]
0 ;; константа
(i8x16.swizzle ;; 0x... 20 20 20 20 20 20
(local.get $xword) ;; 0x... 20 35 34 33 32 31
(i8x16.splat ;; 0x... 05 05 05 05 05 05
(local.get $xn) ;; xn = 5
)
)
)
)
Нормализуем число
Аналогичным образом, с помощью swizzle
«развернем» число, чтобы младший разряд оказался в младшем байте, и сразу зачистим от не-цифр за счет свойства этой операции «занулять» байты, которым соответствуют индексы больше 0x0F
.
(global $shuf_reverse v128
(v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00)
)
(global $all_digit v128
(v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F)
)
;; clean & reverse
(local.set $xword
(v128.and ;; 0x... 00 01 02 03 04 05
(i8x16.swizzle ;; 0x... 00 31 32 33 34 35
(local.get $xword) ;; 0x... 20 35 34 33 32 31
(i8x16.sub ;; 0x... FF 00 01 02 03 04
(global.get $shuf_reverse) ;; 0x... 0A 0B 0C 0D 0E 0F
(i8x16.splat ;; 0x... 0B 0B 0B 0B 0B 0B
(i32.sub ;; = 0x0B
(i32.const 16)
(local.get $xn) ;; xn = 5
)
)
)
)
(global.get $all_digit) ;; 0x... 0F 0F 0F 0F 0F 0F
)
)
SIMD-конвертация
Итак, мы получили состояние переменной (читай, регистра), когда в каждом из 16 байтов находятся значения цифр, позиционно соответствующие десятичным разрядам числа, которое мы хотим получить — в младшем байте младший разряд:
P O N M L K J I H G F E D C B A == A * 10^0 + B * 10^1 + C * 10^2 + D * 10^3 + ...
В идеале, для реализации алгоритма нам подошли бы инструкции расширенного умножения типа i16x8.extmul_[low|high]_i8x16_u
и расширенного попарного сложения i16x8.extadd_pairwise_i8x16_u
.
Первая из этих инструкций умножает нижние/верхние половины пары XMM-регистров, воспринимая их как набор 8bit-чисел на входе, расширяя выход до 16bit-чисел:
0x... FC FD FE FF
x x x x
0x... 04 03 02 01
= = = =
0x... 03F0 02F7 01FC 00FF = i16x8.extmul_low_i8x16_u
Вторая попарно складывает соседние «числа» в регистре:
0x... FC + FD FE + FF
0x... 01F9 01FD = i16x8.extadd_pairwise_i8x16_u
Поскольку они пока не поддерживаются, нам придется их эмулировать с помощью доступных инструкций.
Итак, сначала мы выделим из нашего регистра с помощью i16x8.widen_low_i8x16_u
нижнюю половину, расширив 8bit-числа до 16bit и перемножим с маской степеней 10:
(global $stepA_pow10 v128
(v128.const i16x8 1 10 100 1000 1 10 100 1000)
)
;; ...
;; i16x8.extmul_low_i8x16_u - not implemented
(local.set $xtempL
(i16x8.mul ;; H000 G00 F0 E D000 C00 B0 A
(i16x8.widen_low_i8x16_u ;; 0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A
(local.get $xword) ;; ... H G F E D C B A
)
(global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1
)
)
Степени выбраны с таким умыслом, чтобы ни в одной из 16bit-ячеек результат не мог превзойти 2^16.
Затем эмулируем extadd_pairwise
с помощью битового сдвига, сложения и, когда необходимо, наложения битовой маски:
;; i32x4.extadd_pairwise_i16x8_u - not implemented
(local.set $xtempL
(i16x8.add ;; H000 HG00 F0 FE D000 DC00 B0 BA
(i32x4.shr_u ;; 0 H000 0 F0 0 D000 0 B0
(local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A
(i32.const 16)
)
(local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A
)
)
То же самое проделаем со старшими байтами регистра:
(local.set $xtempH
(i16x8.mul ;; P000 O00 N0 M L000 K00 J0 I
(i16x8.widen_high_i8x16_u ;; 0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I
(local.get $xword) ;; P O N M L K J I ...
)
(global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1
)
)
(local.set $xtempH
(i16x8.add ;; P000 PO00 N0 NM L000 LK00 J0 JI
(i32x4.shr_u ;; 0 P000 0 N0 0 L000 0 J0
(local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I
(i32.const 16)
)
(local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I
)
)
Упакуем оба полученных значения обратно в один регистр, отбросив лишние блоки с помощью shuffle
:
;; shuffle
(local.set $xword
(i8x16.shuffle ;; PO00 NM LK00 JI HG00 FE DC00 BA
0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D
(local.get $xtempL) ;; H000 HG00 F0 FE D000 DC00 B0 BA
(local.get $xtempH) ;; P000 PO00 N0 NM L000 LK00 J0 JI
)
)
Повторим сложение-умножение соседних блоков еще дважды — для i16×8 и i32×4, уже с наложением маски:
(global $stepB_pow10 v128
(v128.const i32x4 1 10000 100000000 0)
)
(global $stepB_mask v128
(v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000)
)
;; ...
(local.set $xword
(i16x8.add ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
(i32x4.shr_u ;; 0 PO00 0 LK00 0 HG00 0 DC00
(local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA
(i32.const 16)
)
(local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA
)
)
(local.set $xword
(v128.and ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA
(local.get $xword) ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
(global.get $stepA_mask) ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x...
)
)
(local.set $xword
(i32x4.mul ;; 0 LKJI00000000 HGFE0000 DCBA
(local.get $xword) ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA
(global.get $stepB_pow10) ;; 0 100000000 10000 1
)
)
(local.set $xword
(i32x4.add ;; 0 LKJI00000000 HGFE0000 HGFEDCBA
(i64x2.shr_u ;; 0 0 0 HGFE0000
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA
(i32.const 32)
)
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA
)
)
(local.set $xword
(v128.and ;; 0 LKJI00000000 0 HGFEDCBA
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 HGFEDCBA
(global.get $stepB_mask) ;; 00000000 FFFFFFFF 00000000 FFFFFFFF 0x...
)
)
Заметим, что самый старший блок цифр 'PONM'
мы потеряли при умножениях, поскольку он не влез в размерность i32. Если все-таки предполагаются значения такой длины, данный сегмент можно вытащить отдельно через extract_lane
.
(module
(import "js" "memory" (memory 1))
;; текущее смещение "курсора"
(global $off (import "js" "off") (mut i32))
;; текущее значение
(global $val (mut i64)
(i64.const 0)
)
(global $mask (mut i32)
(i32.const 0)
)
(global $state (mut i32)
(i32.const 0)
)
(global $key (mut i32)
(i32.const 0)
)
(global $stepA_pow10 v128
(v128.const i16x8 1 10 100 1000 1 10 100 1000)
)
(global $stepB_pow10 v128
(v128.const i32x4 1 10000 100000000 0)
)
(global $stepA_mask v128
(v128.const i16x8 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000)
)
(global $stepB_mask v128
(v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000)
)
(global $all_zero v128
(v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30)
)
(global $all_nine v128
(v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09)
)
(global $shuf_reverse v128
(v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00)
)
(global $all_digit v128
(v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F)
)
;; таблица косвенной адресации функций
(table 128 anyfunc)
(elem (i32.const 0)
$null ;; 00
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 10
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 20
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 30
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 40
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 50
$null ;; 1
$null ;; 2
$null ;; 3
$null ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 60
$null ;; 1
$null ;; 2
$null ;; 3
$step_d ;; 4
$null ;; 5
$null ;; 6
$null ;; 7
$step_h ;; 8
$null ;; 9
$null ;; A
$null ;; B
$step_l ;; C
$null ;; D
$null ;; E
$null ;; F
$null ;; 70
$null ;; 1
$step_r ;; 2
$step_s ;; 3
$step_t ;; 4
$null ;; 5
$null ;; 6
$step_w ;; 7
$null ;; 8
$null ;; 9
$null ;; A
$null ;; B
$null ;; C
$null ;; D
$null ;; E
$null ;; F
)
;; обновляем маску атрибутов и значение
(func $setData
;; buf[key << 3] = val
(i64.store
(i32.shl
(global.get $key)
(i32.const 3)
)
(global.get $val)
)
;; mask |= 1 << key
(global.set $mask
(i32.or
(global.get $mask)
(i32.shl
(i32.const 1)
(global.get $key)
)
)
)
)
(func $step (param $off_v i32)
;; $off += $off_v
(global.set $off
(i32.add
(global.get $off)
(local.get $off_v)
)
)
)
;; разбор числа - SIMD
(func $parseNumber
(local $ch i32)
(local $xword v128)
(local $xn i32)
(local $xtempL v128)
(local $xtempH v128)
;; xword = line[off..+15]
(local.set $xword
(v128.load align=4
(global.get $off)
)
)
;; xn = num digits
(local.set $xn
(i32.ctz
(i32.xor
(i8x16.bitmask
(i8x16.le_u
(i8x16.sub
(local.get $xword)
(global.get $all_zero)
)
(global.get $all_nine)
)
)
(i32.const 0xFFFFFFFF)
)
)
)
;; ch = xword[xn]
(local.set $ch ;; ch = 0x20
(i8x16.extract_lane_u ;; xmm[0]
0 ;; константа
(i8x16.swizzle ;; 0x... 20 20 20 20 20 20
(local.get $xword) ;; 0x... 20 35 34 33 32 31
(i8x16.splat ;; 0x... 05 05 05 05 05 05
(local.get $xn) ;; xn = 5
)
)
)
)
;; clean & reverse
(local.set $xword
(v128.and ;; 0x... 00 01 02 03 04 05
(i8x16.swizzle ;; 0x... 00 31 32 33 34 35
(local.get $xword) ;; 0x... 20 35 34 33 32 31
(i8x16.sub ;; 0x... FF 00 01 02 03 04
(global.get $shuf_reverse) ;; 0x... 0A 0B 0C 0D 0E 0F
(i8x16.splat ;; 0x... 0B 0B 0B 0B 0B 0B
(i32.sub ;; = 0x0B
(i32.const 16)
(local.get $xn) ;; xn = 5
)
)
)
)
(global.get $all_digit) ;; 0x... 0F 0F 0F 0F 0F 0F
)
)
;; i16x8.extmul_low_i8x16_u - not implemented
(local.set $xtempL
(i16x8.mul ;; H000 G00 F0 E D000 C00 B0 A
(i16x8.widen_low_i8x16_u ;; 0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A
(local.get $xword) ;; ... H G F E D C B A
)
(global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1
)
)
;; i32x4.extadd_pairwise_i16x8_u - not implemented
(local.set $xtempL
(i16x8.add ;; H000 HG00 F0 FE D000 DC00 B0 BA
(i32x4.shr_u ;; 0 H000 0 F0 0 D000 0 B0
(local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A
(i32.const 16)
)
(local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A
)
)
(local.set $xtempH
(i16x8.mul ;; P000 O00 N0 M L000 K00 J0 I
(i16x8.widen_high_i8x16_u ;; 0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I
(local.get $xword) ;; P O N M L K J I ...
)
(global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1
)
)
(local.set $xtempH
(i16x8.add ;; P000 PO00 N0 NM L000 LK00 J0 JI
(i32x4.shr_u ;; 0 P000 0 N0 0 L000 0 J0
(local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I
(i32.const 16)
)
(local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I
)
)
;; shuffle
(local.set $xword
(i8x16.shuffle ;; PO00 NM LK00 JI HG00 FE DC00 BA
0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D
(local.get $xtempL) ;; H000 HG00 F0 FE D000 DC00 B0 BA
(local.get $xtempH) ;; P000 PO00 N0 NM L000 LK00 J0 JI
)
)
(local.set $xword
(i16x8.add ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
(i32x4.shr_u ;; 0 PO00 0 LK00 0 HG00 0 DC00
(local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA
(i32.const 16)
)
(local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA
)
)
(local.set $xword
(v128.and ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA
(local.get $xword) ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
(global.get $stepA_mask) ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x...
)
)
(local.set $xword
(i32x4.mul ;; 0 LKJI00000000 HGFE0000 DCBA
(local.get $xword) ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA
(global.get $stepB_pow10) ;; 0 100000000 10000 1
)
)
(local.set $xword
(i32x4.add ;; 0 LKJI00000000 HGFE0000 HGFEDCBA
(i64x2.shr_u ;; 0 0 0 HGFE0000
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA
(i32.const 32)
)
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA
)
)
(local.set $xword
(v128.and ;; 0 LKJI00000000 0 HGFEDCBA
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 HGFEDCBA
(global.get $stepB_mask) ;; 00000000 FFFFFFFF 00000000 FFFFFFFF 0x...
)
)
;; return
(global.set $val
(i64.add
(i64x2.extract_lane
0
(local.get $xword)
)
(i64x2.extract_lane
1
(local.get $xword)
)
)
)
;; шагаем на n символов
;; ch == '\n' || ch == ' ' => +1
;; ch == ',' => +2
(call $step
(i32.add
(local.get $xn)
(i32.add
(i32.const 1)
(i32.eq
(local.get $ch)
(i32.const 0x2C)
)
)
)
)
)
;; [state, off] = [state_v, off + off_v]
(func $iterate0 (param $state_v i32) (param $off_v i32)
;; state = state_v
(global.set $state
(local.get $state_v)
)
;; off += off_v
(call $step
(local.get $off_v)
)
)
;; [key, off] = [state + state_v, off + off_v]
(func $iterate1 (param $state_v i32) (param $off_v i32)
;; key = state + state_v
(global.set $key
(i32.add
(global.get $state)
(local.get $state_v)
)
)
;; off += off_v
(call $step
(local.get $off_v)
)
)
(func $null (result i32)
i32.const 0
)
(func $step_s (result i32) ;; shared
;; $state = 0
;; $off += 7
(call $iterate0
(i32.const 0)
(i32.const 7)
)
i32.const 1
)
(func $step_l (result i32) ;; local
;; $state = 4
;; $off += 6
(call $iterate0
(i32.const 4)
(i32.const 6)
)
i32.const 1
)
(func $step_t (result i32) ;; temp
;; $state = 8
;; $off += 5
(call $iterate0
(i32.const 8)
(i32.const 5)
)
i32.const 1
)
(func $step_h (result i32) ;; hit
;; key = state + 0;
;; $off += 4
(call $iterate1
(i32.const 0)
(i32.const 4)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_r (result i32) ;; read
;; key = state + 1;
;; $off += 5
(call $iterate1
(i32.const 1)
(i32.const 5)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_d (result i32) ;; dirtied
;; key = state + 2;
;; $off += 8
(call $iterate1
(i32.const 2)
(i32.const 8)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func $step_w (result i32) ;; written
;; key = state + 3;
;; $off += 8
(call $iterate1
(i32.const 3)
(i32.const 8)
)
;;
call $parseNumber
call $setData
i32.const 1
)
(func (export "parseBuffers") (result i32)
;; mask = 0
(global.set $mask
(i32.const 0)
)
;; off += 'Buffers: '.length
(global.set $off
(i32.add
(global.get $off)
(i32.const 9)
)
)
;; for (...)
(block
(loop
;; if (table[line[off]]()) continue;
(br_if 0
(call_indirect (result i32)
(i32.load8_u
(global.get $off)
)
)
)
;; break, end loop
)
)
;; сохраняем текущее смещение
(i32.store
(i32.const 0xFFFC) ;; 16383 * 4
(global.get $off)
)
;; возвращаем маску
global.get $mask
)
)
Запускаем! Увы, наши ожидания не оправдались — время ухудшилось с 26ms до 30ms. Неужели мы написали такую плохую реализацию конвертации чисел?
Давайте выключим все эти вычисления, оставив лишь вычисление количества цифр xn
и стоп-символа ch
— и все равно уже 27ms!
Так что основные потери у нас возникают уже на чтении слишком больших блоков, когда нам это вовсе не требуется. Представьте — мы прочитали 16 байт, обнаружили там пусть даже 7 цифр плюс стоп-символ, но остальные-то 8 байт попросту выкинули и перечитаем их заново.
Так что SIMD — полезно, но не всегда!
Заметки на полях
Можно ли ускорить приведенный выше алгоритм? Конечно!
Число можно парсить в
v128
, даже если оно находится не в самом начале, если использоватьi8x16.add_sat_u
для добавления смещения.Вычленить ключевые символы (
s, l, t, h, r, d, w
) можно прямо вxword
, если сначала получить битовую маску «послепробельных» символов, а затем конвертировать ее в позиционное представление нужныхi8
-блоков.Вместо таблицы функций можно использовать таблицу соответствий «ключевой символ» =>
{тип, длина, значение}
.
const { readFileSync } = require('fs');
const fn = 'buffers-test';
const run = async () => {
const buffer = readFileSync(`${fn}.wasm`);
const module = await WebAssembly.compile(buffer);
const memory = new WebAssembly.Memory({initial : 1});
const data = readFileSync('buffers.txt');
memory.grow((data.byteLength >> 16) + 8);
const prj32 = new Uint32Array(memory.buffer);
const prj8 = new Uint8Array(memory.buffer);
// единственный раз передадим исходное смещение через переменную
const off = new WebAssembly.Global({value : 'i32', mutable : true}, 0x80000);
data.copy(prj8, off.value);
// дописываем отсутствующий '\n' у последней строки
prj8[off.value + data.length] = 0x0A;
const importObject = {
js : {
memory
, off
}
};
// 10^N для домножения первого сегмента
let pow10 = 1;
for (let i = 0; i < 8; i++) {
prj32[(i << 1) + 32] = pow10;
pow10 *= 10;
}
// таблица соответствий ключевых символов
let keys = {
's' : 0x010700 // 'shared '
, 'l' : 0x010604 // 'local '
, 't' : 0x010508 // 'temp '
, 'h' : 0x000400 // 'hit='
, 'r' : 0x000501 // 'read='
, 'd' : 0x000802 // 'dirtied='
, 'w' : 0x000803 // 'written='
};
Object.entries(keys).forEach(([key, val]) => prj32[48 + key.charCodeAt()] = val);
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/clz32
const clz = Math.clz32;
const ctz = integer => {
integer |= integer << 16;
integer |= integer << 8;
integer |= integer << 4;
integer |= integer << 2;
integer |= integer << 1;
return 32 - clz(~integer) | 0;
};
// https://stackoverflow.com/questions/43122082/efficiently-count-the-number-of-bits-in-an-integer-in-javascript/43122214
const popcnt = n => {
n = n - ((n >> 1) & 0x55555555)
n = (n & 0x33333333) + ((n >> 2) & 0x33333333)
return ((n + (n >> 4) & 0xF0F0F0F) * 0x1010101) >> 24
};
// обратная битовая маска
for (let i = 0; i < (1 << 16); i++) {
if (popcnt(i) < 5) {
let r = 0xFFFFFFFF;
for (let v = i; v; ) {
let bp = 31 - clz(v);
r <<= 8;
r |= bp;
v ^= 1 << bp;
}
prj32[16384 + i] = r;
}
}
const instance = await WebAssembly.instantiate(module, importObject);
const parseBuffersWASM = instance.exports.parseBuffers;
const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written'];
const m32to64 = 0x100000000;
const parseBuffers = () => {
let mask = parseBuffersWASM();
switch (mask) {
case 1:
return {'shared-hit' : prj32[0] + prj32[1] * m32to64};
case 2:
return {'shared-read' : prj32[2] + prj32[3] * m32to64};
case 3:
return {
'shared-hit' : prj32[0] + prj32[1] * m32to64
, 'shared-read' : prj32[2] + prj32[3] * m32to64
};
default:
let rv = {};
for (let i = 0; mask; mask >>= 1, i++) {
if (mask & 1) {
let off = i << 1;
rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * m32to64;
}
}
return rv;
}
};
for (let lim = data.length + off.value; prj32[16383] < lim; ) {
let obj = parseBuffers();
}
off.value = 0x80000;
prj32[16383] = off.value;
const hrb = process.hrtime();
// -- 8< --
for (let lim = data.length + off.value; prj32[16383] < lim; ) {
let obj = parseBuffers();
}
// -- 8< --
const hre = process.hrtime(hrb);
const usec = hre[0] * 1e+9 + hre[1];
console.log(usec);
};
run();
(module
(import "js" "func" (func $js_func (param i32)))
(import "js" "memory" (memory 1))
;; текущее смещение "курсора"
(global $off (import "js" "off") (mut i32))
;; текущее значение
(global $val (mut i64)
(i64.const 0)
)
(global $stepA_pow10 v128
(v128.const i16x8 1 10 100 1000 1 10 100 1000)
)
(global $stepB_pow10 v128
(v128.const i32x4 1 10000 100000000 0)
)
(global $stepA_mask v128
(v128.const i16x8 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000)
)
(global $stepB_mask v128
(v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000)
)
(global $all_zero v128
(v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30)
)
(global $all_nine v128
(v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09)
)
(global $shuf_straight v128
(v128.const i8x16 0x00 0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08 0x09 0x0A 0x0B 0x0C 0x0D 0x0E 0x0F)
)
(global $shuf_reverse v128
(v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00)
)
(global $all_digit v128
(v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F)
)
(global $all_space v128
(v128.const i8x16 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20)
)
(global $all_0A v128
(v128.const i8x16 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A)
)
;; обновляем маску атрибутов и значение в памяти
(func $setData (param $key i32) (param $mask i32) (result i32)
;; buf[key << 3] = val
(i64.store
(i32.shl
(local.get $key)
(i32.const 3)
)
(global.get $val)
)
;; mask |= 1 << key
(i32.or
(local.get $mask)
(i32.shl
(i32.const 1)
(local.get $key)
)
)
)
;; разбор числа - SIMD
(func $parseNumber (param $xword v128) (param $off i32) (result i32)
(local $xn i32)
(local $xtempL v128)
(local $xtempH v128)
;; xn = num digits
(local.set $xn
(i32.ctz ;; ^ = 5
(i32.xor ;; 0b... 1 0 0 0 0 0
(i32.shr_u ;; 0b... 0 1 1 1 1 1
(i8x16.bitmask ;; 0b... 0 1 1 1 1 1 0
(i8x16.le_u ;; 0x... 00 FF FF FF FF FF 00
(i8x16.sub ;; 0x... F0 05 04 03 02 01 0D
(local.get $xword) ;; 0x... 20 35 34 33 32 31 3D
(global.get $all_zero) ;; 0x... 30 30 30 30 30 30 30
)
(global.get $all_nine) ;; 0x... 09 09 09 09 09 09 09
)
)
(local.get $off) ;; off = 1
)
(i32.const 0xFFFFFFFF) ;; 0b... 1 1 1 1 1 1 1
)
)
)
;; clean & reverse
(local.set $xword
(v128.and ;; 0x... 00 00 01 02 03 04 05
(i8x16.swizzle ;; 0x... 00 00 31 32 33 34 35
(local.get $xword) ;; 0x... 20 35 34 33 32 31 3D
(i8x16.add_sat_u ;; 0x... FF FF 01 02 03 04 05
(i8x16.sub ;; 0x... FF FF 00 01 02 03 04
(global.get $shuf_reverse) ;; 0x... 09 0A 0B 0C 0D 0E 0F
(i8x16.splat ;; 0x... 0B 0B 0B 0B 0B 0B 0B
(i32.sub ;; = 0x0B
(i32.const 16)
(local.get $xn) ;; xn = 5
)
)
)
(i8x16.splat ;; 0x... 01 01 01 01 01 01 01
(local.get $off) ;; off = 1
)
)
)
(global.get $all_digit) ;; 0x... 0F 0F 0F 0F 0F 0F 0F
)
)
;; i16x8.extmul_low_i8x16_u - not implemented
(local.set $xtempL
(i16x8.mul ;; H000 G00 F0 E D000 C00 B0 A
(i16x8.widen_low_i8x16_u ;; 0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A
(local.get $xword) ;; ... H G F E D C B A
)
(global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1
)
)
;; i32x4.extadd_pairwise_i16x8_u - not implemented
(local.set $xtempL
(i16x8.add ;; H000 HG00 F0 FE D000 DC00 B0 BA
(i32x4.shr_u ;; 0 H000 0 F0 0 D000 0 B0
(local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A
(i32.const 16)
)
(local.get $xtempL) ;; H000 G00 F0 E D000 C00 B0 A
)
)
(local.set $xtempH
(i16x8.mul ;; P000 O00 N0 M L000 K00 J0 I
(i16x8.widen_high_i8x16_u ;; 0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I
(local.get $xword) ;; P O N M L K J I ...
)
(global.get $stepA_pow10) ;; 1000 100 10 1 1000 100 10 1
)
)
(local.set $xtempH
(i16x8.add ;; P000 PO00 N0 NM L000 LK00 J0 JI
(i32x4.shr_u ;; 0 P000 0 N0 0 L000 0 J0
(local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I
(i32.const 16)
)
(local.get $xtempH) ;; P000 O00 N0 M L000 K00 J0 I
)
)
;; shuffle
(local.set $xword
(i8x16.shuffle ;; PO00 NM LK00 JI HG00 FE DC00 BA
0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D
(local.get $xtempL) ;; H000 HG00 F0 FE D000 DC00 B0 BA
(local.get $xtempH) ;; P000 PO00 N0 NM L000 LK00 J0 JI
)
)
(local.set $xword
(i16x8.add ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
(i32x4.shr_u ;; 0 PO00 0 LK00 0 HG00 0 DC00
(local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA
(i32.const 16)
)
(local.get $xword) ;; PO00 NM LK00 JI HG00 FE DC00 BA
)
)
(local.set $xword
(v128.and ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA
(local.get $xword) ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
(global.get $stepA_mask) ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x...
)
)
(local.set $xword
(i32x4.mul ;; 0 LKJI00000000 HGFE0000 DCBA
(local.get $xword) ;; 0 PONM 0 LKJI 0 HGFE 0 DCBA
(global.get $stepB_pow10) ;; 0 100000000 10000 1
)
)
(local.set $xword
(i32x4.add ;; 0 LKJI00000000 HGFE0000 HGFEDCBA
(i64x2.shr_u ;; 0 0 0 HGFE0000
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA
(i32.const 32)
)
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 DCBA
)
)
(local.set $xword
(v128.and ;; 0 LKJI00000000 0 HGFEDCBA
(local.get $xword) ;; 0 LKJI00000000 HGFE0000 HGFEDCBA
(global.get $stepB_mask) ;; 00000000 FFFFFFFF 00000000 FFFFFFFF 0x...
)
)
(global.set $val
(i64.add
(i64x2.extract_lane
0
(local.get $xword)
)
(i64x2.extract_lane
1
(local.get $xword)
)
)
)
;; return
(local.get $xn)
)
(func (export "parseBuffers") (result i32)
(local $ch i32)
(local $chp i32)
(local $chv i32)
(local $chm i32)
(local $chi i32)
(local $ln i32)
(local $chpos i32)
(local $xword v128)
(local $xn i32)
(local $xbp i32)
(local $eol i32)
(local $esp i32)
(local $vtemp i64)
(local $mask i32)
(local $state i32)
(local $key i32)
;; mask = 0
(local.set $mask
(i32.const 0)
)
;; val = 0
(global.set $val
(i64.const 0)
)
;; off += 'Buffers: '.length
(global.set $off
(i32.add
(global.get $off)
(i32.const 9)
)
)
;; первый символ всегда "послепробельный"
(local.set $esp
(i32.const 1)
)
(local.set $xbp
(i32.const -1)
)
;; for (...)
(block
(loop
;; xword = line[off..+15]
(local.set $xword
(v128.load align=4
(global.get $off)
)
)
;; xbp == -1 -> не парсим сейчас число
;; xbp >= 0 -> начиная отсюда и парсим
(if
(i32.ne
(local.get $xbp)
(i32.const -1)
)
(then
;; store temp
(local.set $vtemp
(global.get $val)
)
;; xn = num digits
(local.set $xn
(call $parseNumber
(local.get $xword)
(local.get $xbp)
)
)
(if
(i32.eqz
(local.get $xbp)
)
(then
;; val = val * 10 ^ memory[128 + (n << 3)]:i32 + vtemp
(global.set $val
(i64.add
(i64.mul
(local.get $vtemp)
(i64.load align=4
(i32.add
(i32.shl
(local.get $xn)
(i32.const 3)
)
(i32.const 128)
)
)
)
(global.get $val)
)
)
)
)
;; store value
(local.set $mask
(call $setData
(local.get $key)
(local.get $mask)
)
)
;; xbp = -1
(local.set $xbp
(i32.const -1)
)
)
)
;; off += 16
(global.set $off
(i32.add
(global.get $off)
(i32.const 16)
)
)
;; формируем битовую маску "послепробельных" символов
(local.set $chm
(i32.and
(i32.add
(i32.shl
;; space mask
(i8x16.bitmask
(i8x16.eq
(local.get $xword)
(global.get $all_space)
)
)
(i32.const 1)
)
;; выставляем для первого символа, что он "послепробельный",
;; если пробел был последним символом предыдущего блока
(local.get $esp)
)
(i32.const 0xFFFF)
)
)
;; по таблице превращаем битовую маску в позиции битов
;; shared hit=1 read
;; 1000000100000100 => 0x FF 0D 07 00
;; chp = memory[0x10000 + (chm << 2)]:i32
(local.set $chp
(i32.load align=4
(i32.add
(i32.const 0x10000)
(i32.shl
(local.get $chm)
(i32.const 2)
)
)
)
)
;; превращаем позиции символов в их значения
(local.set $chv
(i32x4.extract_lane
0
(i8x16.swizzle
(local.get $xword)
(i32x4.replace_lane
0
(i8x16.splat
(i32.const 0xFF)
)
(local.get $chp)
)
)
)
)
;; перебираем символы согласно выставленным битам маски, пока она не обнулится
(block
(loop
;; if (chv == 0) break
(br_if 1
(i32.eqz
(local.get $chv)
)
)
(local.set $chpos
(i32.and
(local.get $chp)
(i32.const 0xFF)
)
)
;; ch = xword[chpos]
(local.set $ch
(i32.and
(local.get $chv)
(i32.const 0xFF)
)
)
;; ch = memory[0xC0 + ch]
(local.set $ch
(i32.load align=4
(i32.add
(i32.shl
(local.get $ch)
(i32.const 2)
)
(i32.const 0xC0)
)
)
)
;; ln = ch >> 8
(local.set $ln
(i32.shr_u
(local.get $ch)
(i32.const 8)
)
)
;; позиция начала числа, смещение хранится в старшем байте ch
(local.set $xbp
(i32.and
(i32.add
(local.get $chpos)
(local.get $ln)
)
(i32.const 0xFF)
)
)
;; ch & 0x10000
(if
(i32.eqz
(i32.and
(local.get $ch)
(i32.const 0x10000)
)
)
;; key
(then
(local.set $key
(i32.and
(i32.add
(local.get $state)
(local.get $ch)
)
(i32.const 0x0F)
)
)
;; если число начинается в следующем блоке - переходим к нему
(if
(i32.ge_u
(local.get $xbp)
(i32.const 0x10)
)
(then
;; val = 0
(global.set $val
(i64.const 0)
)
;; xbp -= 16
(local.set $xbp
(i32.and
(local.get $xbp)
(i32.const 0x0F)
)
)
(br 1)
)
)
;; xn = num digits
(local.set $xn
(call $parseNumber
(local.get $xword)
(local.get $xbp)
)
)
;; xbp = xbp + xn == 0x10 ? 0 : -1
(local.set $xbp
(if (result i32)
(i32.eq
(i32.add
(local.get $xbp)
(local.get $xn)
)
(i32.const 0x10)
)
(then
(i32.const 0)
)
(else
;; store value
(local.set $mask
(call $setData
(local.get $key)
(local.get $mask)
)
)
(i32.const -1)
)
)
)
)
;; state
(else
(local.set $state
(i32.and
(local.get $ch)
(i32.const 0x0F)
)
)
)
)
;; chv >>= 8
(local.set $chv
(i32.shr_u
(local.get $chv)
(i32.const 8)
)
)
;; chp >>= 8
(local.set $chp
(i32.shr_u
(local.get $chp)
(i32.const 8)
)
)
;; do loop
(br 0)
)
)
;; esp = xword[15] == 0x20
(local.set $esp
(i32.eq
(i8x16.extract_lane_u
15
(local.get $xword)
)
(i32.const 0x20)
)
)
;; check EOL
(local.set $eol
(i8x16.bitmask
(i8x16.eq
(local.get $xword)
(global.get $all_0A)
)
)
)
;; if (!eol) break
(br_if 0
(i32.eqz
(local.get $eol)
)
)
;; break, end loop
;; store value
(local.set $mask
(call $setData
(local.get $key)
(local.get $mask)
)
)
;; определяем позицию перевода строки
(local.set $eol
(i32.ctz
(local.get $eol)
)
)
;; встаем на начало следующей строки
;; off += 15 - eol
(global.set $off
(i32.sub
(i32.add
(global.get $off)
(local.get $eol)
)
(i32.const 15)
)
)
;; break
(br 1)
)
)
;; сохраняем текущее смещение
(i32.store
(i32.const 0xFFFC) ;; 16383 * 4
(global.get $off)
)
;; возвращаем маску
(local.get $mask)
)
)
Увы, все эти оптимизации не позволяют обогнать SWAR-код из прошлой части, а лишь добиться паритета с ним.
В целом, для нашей конкретной задачи использование SIMD оказывается малоэффективным, поскольку расстояния в памяти между значимой информацией достаточно велики, а сама она мала. Например, для полного декодирования такой строки длиной 47 символов достаточно прочитать всего 11 из них:
Buffers: shared hit=123 read=45, temp written=6
^ ^ *** ^ ** ^ ^ *
Материалы: