Разгоняем JS-парсер с помощью WebAssembly (часть 3: SIMD)

В предыдущей статье мы остановились на варианте, который с помощью SWAR-хинта превращает 8 последовательных цифр в одно числовое 32bit-значение. Но что если мы предположим, что все значения у нас, в основном, невелики — до 3 цифр? Тогда нам вполне достаточно использовать всего лишь 32bit-инструкции, а SWAR будет выполнен за 2 операции вместо 3 — сплошной выигрыш!

Давайте перепишем наш код так, чтобы первый блок из 4 символов обрабатывался 32bit-инструкциями, а если понадобится второй блок из 8 символов — уже 64bit-инструкциями.

И… вместо 26ms получаем 28ms! Значит, наше предположение относительно длины чисел не оправдалось, и в первом блоке выгоднее обрабатывать сразу побольше символов.

большие - но по 5, маленькие - но по 3большие -, но по 5, маленькие -, но по 3

То есть больше размерность регистра — лучше? И такие регистры есть — это 128-битные SSE-регистры XMM — в WebAssembly они доступны нам как переменные с типом v128 и суперскалярные операции над ними.

Обратите внимание, что пока не все эти операции реализованы в разных JavaScript-движках.

В зависимости от используемой инструкции v128-регистр интерпретируется либо как единый блок (например, для битовых логических операций and, or, xor, not), либо как набор блоков меньшей размерности:

67c90c73e04cb7ad861416ca29266bc6.png

Как включить SIMD в Node.js

Для начала нам придется немного доработать наш «компилятор» compile-test.js: и включить дополнительные расширения:

const wasmModule = wabt.parseWat(
  inputWat
, readFileSync(inputWat, 'utf8')
, {simd : true} // включаем поддержку SIMD-feature
);

Посмотреть, какие существуют другие дополнительные «фичи» компиляции в WASM можно на демо-странице wat2wasm.

Поскольку в Node.js пока поддержка SIMD проходит в качестве экспериментальной, запускать наш парсер придется с включенными соответствующими V8-опциями:

  --experimental-wasm-simd (enable prototype SIMD opcodes for wasm)
        type: bool  default: false
  --wasm-simd-post-mvp (allow experimental SIMD operations for prototyping that are not included in the current proposal)
        type: bool  default: false

Теперь команда на запуск выглядит вот так:

node --experimental-wasm-simd --wasm-simd-post-mvp --expose-gc buffers-test.js

SIMD-парсинг натуральных чисел на WASM

Собственно, нам необходимо решить те же задачи, что и для SWAR-варианта, но уже с учетом возможностей v128-инструкций. За основу возьмем алгоритм, приведенный на StackOverflow.

Узнаем длину числа

  1. вычитаем «нулевой» символ 0x30 из каждого, с переходом через 0 — это означает, что наши стоп-символы 0x0A, 0x20 и 0x2C превратятся в 0xDA, 0xF0 и 0xFC соответственно

  2. вычисляем для каждого байта, что он <= 0x09 — то есть исходно находился в диапазоне 0x30..0x39, что соответствует символам '0'..'9'

  3. находим первый false-байт с помощью ctz(not(bitmask)) — это и есть позиция нашего стоп-символа

Для реализации этого в коде нам понадобятся две глобальные переменные-«константы»: «все нули» и «все девятки»:

  (global $all_zero v128
    (v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30)
  )
  (global $all_nine v128
    (v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09)
  )
  ;; разбор числа - SIMD
  (func $parseNumber
    (local $ch i32)
    (local $xword v128)
    (local $xn i32)

    ;; xword = line[off..+15]
    (local.set $xword
      (v128.load align=4
        (global.get $off)
      )
    )

    ;; xn = num digits
    (local.set $xn
      (i32.ctz                         ;;        * = 5
        (i32.xor                       ;; 0b...  1  0  0  0  0  0
          (i8x16.bitmask               ;; 0b...  0  1  1  1  1  1
            (i8x16.le_u                ;; 0x... 00 FF FF FF FF FF
              (i8x16.sub               ;; 0x... F0 05 04 03 02 01
                (local.get $xword)     ;; 0x... 20 35 34 33 32 31
                (global.get $all_zero) ;; 0x... 30 30 30 30 30 30 
              )
              (global.get $all_nine)   ;; 0x... 09 09 09 09 09 09
            )
          )
          (i32.const 0xFFFFFFFF)       ;; 0b...  1  1  1  1  1  1
        )
      )
    )

Определяем стоп-символ

8b55d7abd922a4710f36c081a46db616.jpeg

Просто так взять xn-й байт из v128-значения нельзя — нет соответствующей инструкции. Зато есть i8x16.extract_lane, которая позволяет взять заранее указанный байт (нас вполне устроит #0).

Но чтобы в нулевом байте оказался xn-й, нужно «переложить» байты в соответствии с маской с помощью swizzle.

А соответствующую маску, состоящую из единственного повторяющегося значения xn, нам позволит сделать инструкция splat.

    ;; ch = xword[xn]
    (local.set $ch           ;; ch = 0x20
      (i8x16.extract_lane_u  ;; xmm[0]
        0                    ;; константа
        (i8x16.swizzle       ;; 0x... 20 20 20 20 20 20
          (local.get $xword) ;; 0x... 20 35 34 33 32 31
          (i8x16.splat       ;; 0x... 05 05 05 05 05 05
            (local.get $xn)  ;; xn = 5
          )
        )
      )
    )

Нормализуем число

Аналогичным образом, с помощью swizzle «развернем» число, чтобы младший разряд оказался в младшем байте, и сразу зачистим от не-цифр за счет свойства этой операции «занулять» байты, которым соответствуют индексы больше 0x0F.

  (global $shuf_reverse v128
    (v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00)
  )
  (global $all_digit v128
    (v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F)
  )
    ;; clean & reverse
    (local.set $xword
      (v128.and                        ;; 0x... 00 01 02 03 04 05
        (i8x16.swizzle                 ;; 0x... 00 31 32 33 34 35
          (local.get $xword)           ;; 0x... 20 35 34 33 32 31
          (i8x16.sub                   ;; 0x... FF 00 01 02 03 04
            (global.get $shuf_reverse) ;; 0x... 0A 0B 0C 0D 0E 0F
            (i8x16.splat               ;; 0x... 0B 0B 0B 0B 0B 0B
              (i32.sub                 ;; = 0x0B
                (i32.const 16)
                (local.get $xn)        ;; xn = 5
              )
            )
          )
        )
        (global.get $all_digit)        ;; 0x... 0F 0F 0F 0F 0F 0F
      )
    )

SIMD-конвертация

Итак, мы получили состояние переменной (читай, регистра), когда в каждом из 16 байтов находятся значения цифр, позиционно соответствующие десятичным разрядам числа, которое мы хотим получить — в младшем байте младший разряд:

P O N M L K J I H G F E D C B A == A * 10^0 + B * 10^1 + C * 10^2 + D * 10^3 + ...

В идеале, для реализации алгоритма нам подошли бы инструкции расширенного умножения типа i16x8.extmul_[low|high]_i8x16_u и расширенного попарного сложения i16x8.extadd_pairwise_i8x16_u.

Первая из этих инструкций умножает нижние/верхние половины пары XMM-регистров, воспринимая их как набор 8bit-чисел на входе, расширяя выход до 16bit-чисел:

0x...   FC   FD   FE   FF
         x    x    x    x
0x...   04   03   02   01
         =    =    =    =
0x... 03F0 02F7 01FC 00FF = i16x8.extmul_low_i8x16_u

Вторая попарно складывает соседние «числа» в регистре:

0x... FC + FD   FE + FF
0x...    01F9      01FD = i16x8.extadd_pairwise_i8x16_u

Поскольку они пока не поддерживаются, нам придется их эмулировать с помощью доступных инструкций.

Итак, сначала мы выделим из нашего регистра с помощью i16x8.widen_low_i8x16_u нижнюю половину, расширив 8bit-числа до 16bit и перемножим с маской степеней 10:

  (global $stepA_pow10 v128
    (v128.const i16x8 1 10 100 1000 1 10 100 1000)
  )
;; ...
    ;; i16x8.extmul_low_i8x16_u - not implemented
    (local.set $xtempL
      (i16x8.mul                  ;; H000 G00  F0   E  D000 C00  B0   A
        (i16x8.widen_low_i8x16_u  ;;  0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A
          (local.get $xword)      ;;                ... H G F E D C B A
        )
        (global.get $stepA_pow10) ;; 1000 100  10   1  1000 100  10   1
      )
    )

Степени выбраны с таким умыслом, чтобы ни в одной из 16bit-ячеек результат не мог превзойти 2^16.

Затем эмулируем extadd_pairwise с помощью битового сдвига, сложения и, когда необходимо, наложения битовой маски:

    ;; i32x4.extadd_pairwise_i16x8_u - not implemented
    (local.set $xtempL
      (i16x8.add                  ;; H000 HG00   F0   FE D000 DC00   B0   BA
        (i32x4.shr_u              ;;    0 H000    0   F0    0 D000    0   B0
          (local.get $xtempL)     ;; H000  G00   F0    E D000  C00   B0    A
          (i32.const 16)
        )
        (local.get $xtempL)       ;; H000  G00   F0    E D000  C00   B0    A
      )
    )

То же самое проделаем со старшими байтами регистра:

    (local.set $xtempH
      (i16x8.mul                  ;; P000 O00  N0   M  L000 K00  J0   I
        (i16x8.widen_high_i8x16_u ;;  0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I
          (local.get $xword)      ;;  P O N M L K J I ...
        )                                                              
        (global.get $stepA_pow10) ;; 1000 100  10   1  1000 100  10   1
      )                           
    )

    (local.set $xtempH
      (i16x8.add                  ;; P000 PO00   N0   NM L000 LK00   J0   JI
        (i32x4.shr_u              ;;    0 P000    0   N0    0 L000    0   J0
          (local.get $xtempH)     ;; P000  O00   N0    M L000  K00   J0    I
          (i32.const 16)
        )
        (local.get $xtempH)       ;; P000  O00   N0    M L000  K00   J0    I
      )
    )

Упакуем оба полученных значения обратно в один регистр, отбросив лишние блоки с помощью shuffle:

    ;; shuffle
    (local.set $xword
      (i8x16.shuffle              ;; PO00   NM LK00   JI HG00   FE DC00   BA
        0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D 
        (local.get $xtempL)       ;; H000 HG00   F0   FE D000 DC00   B0   BA
        (local.get $xtempH)       ;; P000 PO00   N0   NM L000 LK00   J0   JI
      )
    )

Повторим сложение-умножение соседних блоков еще дважды — для i16×8 и i32×4, уже с наложением маски:

  (global $stepB_pow10 v128
    (v128.const i32x4 1 10000 100000000 0)
  )
  (global $stepB_mask v128
    (v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000)
  )
;; ...
    (local.set $xword
      (i16x8.add                  ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
        (i32x4.shr_u              ;;    0 PO00    0 LK00    0 HG00    0 DC00
          (local.get $xword)      ;; PO00   NM LK00   JI HG00   FE DC00   BA
          (i32.const 16)
        )
        (local.get $xword)        ;; PO00   NM LK00   JI HG00   FE DC00   BA
      )
    )

    (local.set $xword
      (v128.and                   ;;    0 PONM    0 LKJI    0 HGFE    0 DCBA
        (local.get $xword)        ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
        (global.get $stepA_mask)  ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x...
      )
    )

    (local.set $xword
      (i32x4.mul                  ;;         0 LKJI00000000  HGFE0000     DCBA
        (local.get $xword)        ;;    0 PONM       0 LKJI    0 HGFE   0 DCBA
        (global.get $stepB_pow10) ;;         0    100000000     10000        1
      )                           
    )

    (local.set $xword
      (i32x4.add                  ;;         0 LKJI00000000  HGFE0000 HGFEDCBA
        (i64x2.shr_u              ;;         0            0         0 HGFE0000
          (local.get $xword)      ;;         0 LKJI00000000  HGFE0000     DCBA
          (i32.const 32)
        )
        (local.get $xword)        ;;         0 LKJI00000000  HGFE0000     DCBA
      )
    )

    (local.set $xword
      (v128.and                   ;;         0 LKJI00000000         0 HGFEDCBA
        (local.get $xword)        ;;         0 LKJI00000000  HGFE0000 HGFEDCBA
        (global.get $stepB_mask)  ;;  00000000     FFFFFFFF  00000000 FFFFFFFF 0x...
      )
    )

Заметим, что самый старший блок цифр 'PONM' мы потеряли при умножениях, поскольку он не влез в размерность i32. Если все-таки предполагаются значения такой длины, данный сегмент можно вытащить отдельно через extract_lane.

Полный код SIMD-версии
(module
  (import "js" "memory" (memory 1))

  ;; текущее смещение "курсора"
  (global $off (import "js" "off") (mut i32))
  ;; текущее значение
  (global $val (mut i64)
    (i64.const 0)
  )
  (global $mask (mut i32)
    (i32.const 0)
  )
  (global $state (mut i32)
    (i32.const 0)
  )
  (global $key (mut i32)
    (i32.const 0)
  )

  (global $stepA_pow10 v128
    (v128.const i16x8 1 10 100 1000 1 10 100 1000)
  )
  (global $stepB_pow10 v128
    (v128.const i32x4 1 10000 100000000 0)
  )

  (global $stepA_mask v128
    (v128.const i16x8 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000)
  )
  (global $stepB_mask v128
    (v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000)
  )
  (global $all_zero v128
    (v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30)
  )
  (global $all_nine v128
    (v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09)
  )
  (global $shuf_reverse v128
    (v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00)
  )
  (global $all_digit v128
    (v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F)
  )

  ;; таблица косвенной адресации функций
  (table 128 anyfunc)
  (elem (i32.const 0)
    $null ;; 00
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 10
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 20
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 30
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 40
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 50
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $null ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 60
    $null ;;  1
    $null ;;  2
    $null ;;  3
    $step_d ;;  4
    $null ;;  5
    $null ;;  6
    $null ;;  7
    $step_h ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $step_l ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
    $null ;; 70
    $null ;;  1
    $step_r ;;  2
    $step_s ;;  3
    $step_t ;;  4
    $null ;;  5
    $null ;;  6
    $step_w ;;  7
    $null ;;  8
    $null ;;  9
    $null ;;  A
    $null ;;  B
    $null ;;  C
    $null ;;  D
    $null ;;  E
    $null ;;  F
  )

  ;; обновляем маску атрибутов и значение
  (func $setData
    ;; buf[key << 3] = val
    (i64.store
      (i32.shl
        (global.get $key)
        (i32.const 3)
      )
      (global.get $val)
    )
    ;; mask |= 1 << key
    (global.set $mask
      (i32.or
        (global.get $mask)
        (i32.shl
          (i32.const 1)
          (global.get $key)
        )
      )
    )
  )

  (func $step (param $off_v i32)
    ;; $off += $off_v
    (global.set $off
      (i32.add
        (global.get $off)
        (local.get $off_v)
      )
    )
  )

  ;; разбор числа - SIMD
  (func $parseNumber
    (local $ch i32)
    (local $xword v128)
    (local $xn i32)
    (local $xtempL v128)
    (local $xtempH v128)

    ;; xword = line[off..+15]
    (local.set $xword
      (v128.load align=4
        (global.get $off)
      )
    )

    ;; xn = num digits
    (local.set $xn
      (i32.ctz
        (i32.xor
          (i8x16.bitmask
            (i8x16.le_u
              (i8x16.sub
                (local.get $xword)
                (global.get $all_zero)
              )
              (global.get $all_nine)
            )
          )
          (i32.const 0xFFFFFFFF)
        )
      )
    )

    ;; ch = xword[xn]
    (local.set $ch           ;; ch = 0x20
      (i8x16.extract_lane_u  ;; xmm[0]
        0                    ;; константа
        (i8x16.swizzle       ;; 0x... 20 20 20 20 20 20
          (local.get $xword) ;; 0x... 20 35 34 33 32 31
          (i8x16.splat       ;; 0x... 05 05 05 05 05 05
            (local.get $xn)  ;; xn = 5
          )
        )
      )
    )

    ;; clean & reverse
    (local.set $xword
      (v128.and                        ;; 0x... 00 01 02 03 04 05
        (i8x16.swizzle                 ;; 0x... 00 31 32 33 34 35
          (local.get $xword)           ;; 0x... 20 35 34 33 32 31
          (i8x16.sub                   ;; 0x... FF 00 01 02 03 04
            (global.get $shuf_reverse) ;; 0x... 0A 0B 0C 0D 0E 0F
            (i8x16.splat               ;; 0x... 0B 0B 0B 0B 0B 0B
              (i32.sub                 ;; = 0x0B
                (i32.const 16)
                (local.get $xn)        ;; xn = 5
              )
            )
          )
        )
        (global.get $all_digit)        ;; 0x... 0F 0F 0F 0F 0F 0F
      )
    )

    ;; i16x8.extmul_low_i8x16_u - not implemented
    (local.set $xtempL
      (i16x8.mul                  ;; H000 G00  F0   E  D000 C00  B0   A
        (i16x8.widen_low_i8x16_u  ;;  0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A
          (local.get $xword)      ;;                ... H G F E D C B A
        )
        (global.get $stepA_pow10) ;; 1000 100  10   1  1000 100  10   1
      )
    )

    ;; i32x4.extadd_pairwise_i16x8_u - not implemented
    (local.set $xtempL
      (i16x8.add                  ;; H000 HG00   F0   FE D000 DC00   B0   BA
        (i32x4.shr_u              ;;    0 H000    0   F0    0 D000    0   B0
          (local.get $xtempL)     ;; H000  G00   F0    E D000  C00   B0    A
          (i32.const 16)
        )
        (local.get $xtempL)       ;; H000  G00   F0    E D000  C00   B0    A
      )
    )
    
    (local.set $xtempH
      (i16x8.mul                  ;; P000 O00  N0   M  L000 K00  J0   I
        (i16x8.widen_high_i8x16_u ;;  0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I
          (local.get $xword)      ;;  P O N M L K J I ...
        )                                                              
        (global.get $stepA_pow10) ;; 1000 100  10   1  1000 100  10   1
      )                           
    )

    (local.set $xtempH
      (i16x8.add                  ;; P000 PO00   N0   NM L000 LK00   J0   JI
        (i32x4.shr_u              ;;    0 P000    0   N0    0 L000    0   J0
          (local.get $xtempH)     ;; P000  O00   N0    M L000  K00   J0    I
          (i32.const 16)
        )
        (local.get $xtempH)       ;; P000  O00   N0    M L000  K00   J0    I
      )
    )

    ;; shuffle
    (local.set $xword
      (i8x16.shuffle              ;; PO00   NM LK00   JI HG00   FE DC00   BA
        0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D 
        (local.get $xtempL)       ;; H000 HG00   F0   FE D000 DC00   B0   BA
        (local.get $xtempH)       ;; P000 PO00   N0   NM L000 LK00   J0   JI
      )
    )

    (local.set $xword
      (i16x8.add                  ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
        (i32x4.shr_u              ;;    0 PO00    0 LK00    0 HG00    0 DC00
          (local.get $xword)      ;; PO00   NM LK00   JI HG00   FE DC00   BA
          (i32.const 16)
        )
        (local.get $xword)        ;; PO00   NM LK00   JI HG00   FE DC00   BA
      )
    )

    (local.set $xword
      (v128.and                   ;;    0 PONM    0 LKJI    0 HGFE    0 DCBA
        (local.get $xword)        ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
        (global.get $stepA_mask)  ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x...
      )
    )

    (local.set $xword
      (i32x4.mul                  ;;         0 LKJI00000000  HGFE0000     DCBA
        (local.get $xword)        ;;    0 PONM       0 LKJI    0 HGFE   0 DCBA
        (global.get $stepB_pow10) ;;         0    100000000     10000        1
      )                           
    )

    (local.set $xword
      (i32x4.add                  ;;         0 LKJI00000000  HGFE0000 HGFEDCBA
        (i64x2.shr_u              ;;         0            0         0 HGFE0000
          (local.get $xword)      ;;         0 LKJI00000000  HGFE0000     DCBA
          (i32.const 32)
        )
        (local.get $xword)        ;;         0 LKJI00000000  HGFE0000     DCBA
      )
    )

    (local.set $xword
      (v128.and                   ;;         0 LKJI00000000         0 HGFEDCBA
        (local.get $xword)        ;;         0 LKJI00000000  HGFE0000 HGFEDCBA
        (global.get $stepB_mask)  ;;  00000000     FFFFFFFF  00000000 FFFFFFFF 0x...
      )
    )

    ;; return
    (global.set $val
      (i64.add
        (i64x2.extract_lane
          0
          (local.get $xword)
        )
        (i64x2.extract_lane
          1
          (local.get $xword)
        )
      )
    )

    ;; шагаем на n символов
    ;; ch == '\n' || ch == ' ' => +1
    ;; ch == ','               => +2
    (call $step
      (i32.add
        (local.get $xn)
        (i32.add
          (i32.const 1)
          (i32.eq
            (local.get $ch)
            (i32.const 0x2C)
          )
        )
      )
    )
  )

  ;; [state, off] = [state_v, off + off_v]
  (func $iterate0 (param $state_v i32) (param $off_v i32)
    ;; state = state_v
    (global.set $state
      (local.get $state_v)
    )
    ;; off += off_v
    (call $step
      (local.get $off_v)
    )
  )

  ;; [key, off] = [state + state_v, off + off_v]
  (func $iterate1 (param $state_v i32) (param $off_v i32)
    ;; key = state + state_v
    (global.set $key
      (i32.add
        (global.get $state)
        (local.get $state_v)
      )
    )
    ;; off += off_v
    (call $step
      (local.get $off_v)
    )
  )

  (func $null (result i32)
    i32.const 0
  )

  (func $step_s (result i32) ;; shared
    ;; $state = 0
    ;; $off += 7
    (call $iterate0
      (i32.const 0)
      (i32.const 7)
    )

    i32.const 1
  )

  (func $step_l (result i32) ;; local
    ;; $state = 4
    ;; $off += 6
    (call $iterate0
      (i32.const 4)
      (i32.const 6)
    )

    i32.const 1
  )

  (func $step_t (result i32) ;; temp
    ;; $state = 8
    ;; $off += 5
    (call $iterate0
      (i32.const 8)
      (i32.const 5)
    )

    i32.const 1
  )

  (func $step_h (result i32) ;; hit
    ;; key = state + 0;
    ;; $off += 4
    (call $iterate1
      (i32.const 0)
      (i32.const 4)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_r (result i32) ;; read
    ;; key = state + 1;
    ;; $off += 5
    (call $iterate1
      (i32.const 1)
      (i32.const 5)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_d (result i32) ;; dirtied
    ;; key = state + 2;
    ;; $off += 8
    (call $iterate1
      (i32.const 2)
      (i32.const 8)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func $step_w (result i32) ;; written
    ;; key = state + 3;
    ;; $off += 8
    (call $iterate1
      (i32.const 3)
      (i32.const 8)
    )
    ;;
    call $parseNumber
    call $setData

    i32.const 1
  )

  (func (export "parseBuffers") (result i32)
    ;; mask = 0
    (global.set $mask
      (i32.const 0)
    )
    ;; off += 'Buffers: '.length
    (global.set $off
      (i32.add
        (global.get $off)
        (i32.const 9)
      )
    )
    ;; for (...)
    (block
      (loop
        ;; if (table[line[off]]()) continue;
        (br_if 0
          (call_indirect (result i32)
            (i32.load8_u
              (global.get $off)
            )
          )
        )
        ;; break, end loop
      )
    )

    ;; сохраняем текущее смещение
    (i32.store
      (i32.const 0xFFFC) ;; 16383 * 4
      (global.get $off)
    )
    ;; возвращаем маску
    global.get $mask
  )
)

Запускаем! Увы, наши ожидания не оправдались — время ухудшилось с 26ms до 30ms. Неужели мы написали такую плохую реализацию конвертации чисел?

Давайте выключим все эти вычисления, оставив лишь вычисление количества цифр xn и стоп-символа ch — и все равно уже 27ms!

Так что основные потери у нас возникают уже на чтении слишком больших блоков, когда нам это вовсе не требуется. Представьте — мы прочитали 16 байт, обнаружили там пусть даже 7 цифр плюс стоп-символ, но остальные-то 8 байт попросту выкинули и перечитаем их заново.

Так что SIMD — полезно, но не всегда!

Заметки на полях

Можно ли ускорить приведенный выше алгоритм? Конечно!

  1. Число можно парсить в v128, даже если оно находится не в самом начале, если использовать i8x16.add_sat_u для добавления смещения.

  2. Вычленить ключевые символы (s, l, t, h, r, d, w) можно прямо в xword, если сначала получить битовую маску «послепробельных» символов, а затем конвертировать ее в позиционное представление нужных i8-блоков.

  3. Вместо таблицы функций можно использовать таблицу соответствий «ключевой символ» => {тип, длина, значение}.

Полный код buffers-test.js
const { readFileSync } = require('fs');

const fn = 'buffers-test';

const run = async () => {
  const buffer = readFileSync(`${fn}.wasm`);
  const module = await WebAssembly.compile(buffer);
  
  const memory = new WebAssembly.Memory({initial : 1});
  
  const data = readFileSync('buffers.txt');
  memory.grow((data.byteLength >> 16) + 8);
  const prj32 = new Uint32Array(memory.buffer);
  const prj8 = new Uint8Array(memory.buffer);

  // единственный раз передадим исходное смещение через переменную
  const off = new WebAssembly.Global({value : 'i32', mutable : true}, 0x80000);

  data.copy(prj8, off.value);
  // дописываем отсутствующий '\n' у последней строки
  prj8[off.value + data.length] = 0x0A;

  const importObject = {
    js : {
      memory
    , off
    }
  };

  // 10^N для домножения первого сегмента
  let pow10 = 1;
  for (let i = 0; i < 8; i++) {
    prj32[(i << 1) + 32] = pow10;
    pow10 *= 10;
  }

  // таблица соответствий ключевых символов
  let keys = {
    's' : 0x010700 // 'shared '
  , 'l' : 0x010604 // 'local '
  , 't' : 0x010508 // 'temp '
  , 'h' : 0x000400 // 'hit='
  , 'r' : 0x000501 // 'read='
  , 'd' : 0x000802 // 'dirtied='
  , 'w' : 0x000803 // 'written='
  };
  Object.entries(keys).forEach(([key, val]) => prj32[48 + key.charCodeAt()] = val);

  // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/clz32
  const clz = Math.clz32;
  const ctz = integer => {
    integer |= integer << 16;
    integer |= integer << 8;
    integer |= integer << 4;
    integer |= integer << 2;
    integer |= integer << 1;
    return 32 - clz(~integer) | 0;
  };

  // https://stackoverflow.com/questions/43122082/efficiently-count-the-number-of-bits-in-an-integer-in-javascript/43122214
  const popcnt = n => {
    n = n - ((n >> 1) & 0x55555555)
    n = (n & 0x33333333) + ((n >> 2) & 0x33333333)
    return ((n + (n >> 4) & 0xF0F0F0F) * 0x1010101) >> 24
  };

  // обратная битовая маска
  for (let i = 0; i < (1 << 16); i++) {
    if (popcnt(i) < 5) {
      let r = 0xFFFFFFFF;
      for (let v = i; v; ) {
        let bp = 31 - clz(v);
        r <<= 8;
        r |= bp;
        v ^= 1 << bp;
      }
      prj32[16384 + i] = r;
    }
  }

  const instance = await WebAssembly.instantiate(module, importObject);
  const parseBuffersWASM = instance.exports.parseBuffers;

  const buffersKeys = ['shared-hit', 'shared-read', 'shared-dirtied', 'shared-written', 'local-hit', 'local-read', 'local-dirtied', 'local-written', 'temp-hit', 'temp-read', 'temp-dirtied', 'temp-written'];
  const m32to64 = 0x100000000;

  const parseBuffers = () => {
    let mask = parseBuffersWASM();
    switch (mask) {
      case 1:
        return {'shared-hit'  : prj32[0] + prj32[1] * m32to64};
      case 2:
        return {'shared-read' : prj32[2] + prj32[3] * m32to64};
      case 3:
        return {
          'shared-hit'  : prj32[0] + prj32[1] * m32to64
        , 'shared-read' : prj32[2] + prj32[3] * m32to64
        };
      default:
        let rv = {};
        for (let i = 0; mask; mask >>= 1, i++) {
          if (mask & 1) {
            let off = i << 1;
            rv[buffersKeys[i]] = prj32[off] + prj32[off + 1] * m32to64;
          }
        }
        return rv;
    }
  };

  for (let lim = data.length + off.value; prj32[16383] < lim; ) {
    let obj = parseBuffers();
  }
  off.value = 0x80000;
  prj32[16383] = off.value;

  const hrb = process.hrtime();
  // -- 8< --
  for (let lim = data.length + off.value; prj32[16383] < lim; ) {
    let obj = parseBuffers();
  }
  // -- 8< --
  const hre = process.hrtime(hrb);
  const usec = hre[0] * 1e+9 + hre[1];
  console.log(usec);
};

run();
Полный код buffers-test.wat
(module
  (import "js" "func" (func $js_func (param i32)))
  (import "js" "memory" (memory 1))

  ;; текущее смещение "курсора"
  (global $off (import "js" "off") (mut i32))
  ;; текущее значение
  (global $val (mut i64)
    (i64.const 0)
  )

  (global $stepA_pow10 v128
    (v128.const i16x8 1 10 100 1000 1 10 100 1000)
  )
  (global $stepB_pow10 v128
    (v128.const i32x4 1 10000 100000000 0)
  )

  (global $stepA_mask v128
    (v128.const i16x8 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000 0xFFFF 0x0000)
  )
  (global $stepB_mask v128
    (v128.const i32x4 0xFFFFFFFF 0x00000000 0xFFFFFFFF 0x00000000)
  )
  (global $all_zero v128
    (v128.const i8x16 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30 0x30)
  )
  (global $all_nine v128
    (v128.const i8x16 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09 0x09)
  )
  (global $shuf_straight v128
    (v128.const i8x16 0x00 0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08 0x09 0x0A 0x0B 0x0C 0x0D 0x0E 0x0F)
  )
  (global $shuf_reverse v128
    (v128.const i8x16 0x0F 0x0E 0x0D 0x0C 0x0B 0x0A 0x09 0x08 0x07 0x06 0x05 0x04 0x03 0x02 0x01 0x00)
  )
  (global $all_digit v128
    (v128.const i8x16 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F 0x0F)
  )
  (global $all_space v128
    (v128.const i8x16 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20 0x20)
  )
  (global $all_0A v128
    (v128.const i8x16 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A 0x0A)
  )

  ;; обновляем маску атрибутов и значение в памяти
  (func $setData (param $key i32) (param $mask i32) (result i32)
    ;; buf[key << 3] = val
    (i64.store
      (i32.shl
        (local.get $key)
        (i32.const 3)
      )
      (global.get $val)
    )
    ;; mask |= 1 << key
    (i32.or
      (local.get $mask)
      (i32.shl
        (i32.const 1)
        (local.get $key)
      )
    )
  )

  ;; разбор числа - SIMD
  (func $parseNumber (param $xword v128) (param $off i32) (result i32)
    (local $xn i32)
    (local $xtempL v128)
    (local $xtempH v128)

    ;; xn = num digits
    (local.set $xn
      (i32.ctz                           ;;           ^ = 5
        (i32.xor                         ;; 0b...     1  0  0  0  0  0
          (i32.shr_u                     ;; 0b...     0  1  1  1  1  1
            (i8x16.bitmask               ;; 0b...  0  1  1  1  1  1  0
              (i8x16.le_u                ;; 0x... 00 FF FF FF FF FF 00
                (i8x16.sub               ;; 0x... F0 05 04 03 02 01 0D
                  (local.get $xword)     ;; 0x... 20 35 34 33 32 31 3D
                  (global.get $all_zero) ;; 0x... 30 30 30 30 30 30 30
                )
                (global.get $all_nine)   ;; 0x... 09 09 09 09 09 09 09
              )
            )
            (local.get $off)             ;; off = 1
          )
          (i32.const 0xFFFFFFFF)         ;; 0b...  1  1  1  1  1  1  1
        )
      )
    )

    ;; clean & reverse
    (local.set $xword
      (v128.and                          ;; 0x... 00 00 01 02 03 04 05
        (i8x16.swizzle                   ;; 0x... 00 00 31 32 33 34 35
          (local.get $xword)             ;; 0x... 20 35 34 33 32 31 3D
          (i8x16.add_sat_u               ;; 0x... FF FF 01 02 03 04 05
            (i8x16.sub                   ;; 0x... FF FF 00 01 02 03 04
              (global.get $shuf_reverse) ;; 0x... 09 0A 0B 0C 0D 0E 0F
              (i8x16.splat               ;; 0x... 0B 0B 0B 0B 0B 0B 0B
                (i32.sub                 ;; = 0x0B
                  (i32.const 16)
                  (local.get $xn)        ;; xn = 5
                )
              )
            )
            (i8x16.splat                 ;; 0x... 01 01 01 01 01 01 01
              (local.get $off)           ;; off = 1
            )
          )
        )
        (global.get $all_digit)          ;; 0x... 0F 0F 0F 0F 0F 0F 0F
      )
    )

    ;; i16x8.extmul_low_i8x16_u - not implemented
    (local.set $xtempL
      (i16x8.mul                  ;; H000 G00  F0   E  D000 C00  B0   A
        (i16x8.widen_low_i8x16_u  ;;  0 H|0 G|0 F|0 E | 0 D|0 C|0 B|0 A
          (local.get $xword)      ;;                ... H G F E D C B A
        )
        (global.get $stepA_pow10) ;; 1000 100  10   1  1000 100  10   1
      )
    )

    ;; i32x4.extadd_pairwise_i16x8_u - not implemented
    (local.set $xtempL
      (i16x8.add                  ;; H000 HG00   F0   FE D000 DC00   B0   BA
        (i32x4.shr_u              ;;    0 H000    0   F0    0 D000    0   B0
          (local.get $xtempL)     ;; H000  G00   F0    E D000  C00   B0    A
          (i32.const 16)
        )
        (local.get $xtempL)       ;; H000  G00   F0    E D000  C00   B0    A
      )
    )
    
    (local.set $xtempH
      (i16x8.mul                  ;; P000 O00  N0   M  L000 K00  J0   I
        (i16x8.widen_high_i8x16_u ;;  0 P|0 O|0 N|0 M | 0 L|0 K|0 J|0 I
          (local.get $xword)      ;;  P O N M L K J I ...
        )                                                              
        (global.get $stepA_pow10) ;; 1000 100  10   1  1000 100  10   1
      )                           
    )

    (local.set $xtempH
      (i16x8.add                  ;; P000 PO00   N0   NM L000 LK00   J0   JI
        (i32x4.shr_u              ;;    0 P000    0   N0    0 L000    0   J0
          (local.get $xtempH)     ;; P000  O00   N0    M L000  K00   J0    I
          (i32.const 16)
        )
        (local.get $xtempH)       ;; P000  O00   N0    M L000  K00   J0    I
      )
    )

    ;; shuffle
    (local.set $xword
      (i8x16.shuffle              ;; PO00   NM LK00   JI HG00   FE DC00   BA
        0x00 0x01 0x04 0x05 0x08 0x09 0x0C 0x0D 0x10 0x11 0x14 0x15 0x18 0x19 0x1C 0x1D 
        (local.get $xtempL)       ;; H000 HG00   F0   FE D000 DC00   B0   BA
        (local.get $xtempH)       ;; P000 PO00   N0   NM L000 LK00   J0   JI
      )
    )

    (local.set $xword
      (i16x8.add                  ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
        (i32x4.shr_u              ;;    0 PO00    0 LK00    0 HG00    0 DC00
          (local.get $xword)      ;; PO00   NM LK00   JI HG00   FE DC00   BA
          (i32.const 16)
        )
        (local.get $xword)        ;; PO00   NM LK00   JI HG00   FE DC00   BA
      )
    )

    (local.set $xword
      (v128.and                   ;;    0 PONM    0 LKJI    0 HGFE    0 DCBA
        (local.get $xword)        ;; PO00 PONM LK00 LKJI HG00 HGFE DC00 DCBA
        (global.get $stepA_mask)  ;; 0000 FFFF 0000 FFFF 0000 FFFF 0000 FFFF 0x...
      )
    )

    (local.set $xword
      (i32x4.mul                  ;;         0 LKJI00000000  HGFE0000     DCBA
        (local.get $xword)        ;;    0 PONM       0 LKJI    0 HGFE   0 DCBA
        (global.get $stepB_pow10) ;;         0    100000000     10000        1
      )                           
    )

    (local.set $xword
      (i32x4.add                  ;;         0 LKJI00000000  HGFE0000 HGFEDCBA
        (i64x2.shr_u              ;;         0            0         0 HGFE0000
          (local.get $xword)      ;;         0 LKJI00000000  HGFE0000     DCBA
          (i32.const 32)
        )
        (local.get $xword)        ;;         0 LKJI00000000  HGFE0000     DCBA
      )
    )

    (local.set $xword
      (v128.and                   ;;         0 LKJI00000000         0 HGFEDCBA
        (local.get $xword)        ;;         0 LKJI00000000  HGFE0000 HGFEDCBA
        (global.get $stepB_mask)  ;;  00000000     FFFFFFFF  00000000 FFFFFFFF 0x...
      )
    )

    (global.set $val
      (i64.add
        (i64x2.extract_lane
          0
          (local.get $xword)
        )
        (i64x2.extract_lane
          1
          (local.get $xword)
        )
      )
    )
    
    ;; return
    (local.get $xn)
  )

  (func (export "parseBuffers") (result i32)
    (local $ch i32)

    (local $chp i32)
    (local $chv i32)
    (local $chm i32)
    (local $chi i32)

    (local $ln i32)
    (local $chpos i32)
    (local $xword v128)
    (local $xn i32)
    (local $xbp i32)
    (local $eol i32)
    (local $esp i32)
    (local $vtemp i64)

    (local $mask i32)
    (local $state i32)
    (local $key i32)

    ;; mask = 0
    (local.set $mask
      (i32.const 0)
    )
    ;; val = 0
    (global.set $val
      (i64.const 0)
    )
    ;; off += 'Buffers: '.length
    (global.set $off
      (i32.add
        (global.get $off)
        (i32.const 9)
      )
    )
    ;; первый символ всегда "послепробельный"
    (local.set $esp
      (i32.const 1)
    )
    (local.set $xbp
      (i32.const -1)
    )
    ;; for (...)
    (block
      (loop
        ;; xword = line[off..+15]
        (local.set $xword
          (v128.load align=4
            (global.get $off)
          )
        )

        ;; xbp == -1 -> не парсим сейчас число
        ;; xbp >=  0 -> начиная отсюда и парсим
        (if
          (i32.ne
            (local.get $xbp)
            (i32.const -1)
          )
          (then
            ;; store temp
            (local.set $vtemp
              (global.get $val)
            )

            ;; xn = num digits
            (local.set $xn
              (call $parseNumber
                (local.get $xword)
                (local.get $xbp)
              )
            )

            (if
              (i32.eqz
                (local.get $xbp)
              )
              (then
                ;; val = val * 10 ^ memory[128 + (n << 3)]:i32 + vtemp
                (global.set $val
                  (i64.add
                    (i64.mul
                      (local.get $vtemp)
                      (i64.load align=4
                        (i32.add
                          (i32.shl
                            (local.get $xn)
                            (i32.const 3)
                          )
                          (i32.const 128)
                        )
                      )
                    )
                    (global.get $val)
                  )
                )
              )
            )

            ;; store value
            (local.set $mask
              (call $setData
                (local.get $key)
                (local.get $mask)
              )
            )

            ;; xbp = -1
            (local.set $xbp
              (i32.const -1)
            )
          )
        )

        ;; off += 16
        (global.set $off
          (i32.add
            (global.get $off)
            (i32.const 16)
          )
        )

        ;; формируем битовую маску "послепробельных" символов
        (local.set $chm
          (i32.and
            (i32.add
              (i32.shl
                ;; space mask
                (i8x16.bitmask
                  (i8x16.eq
                    (local.get $xword)
                    (global.get $all_space)
                  )
                )
                (i32.const 1)
              )
              ;; выставляем для первого символа, что он "послепробельный",
              ;; если пробел был последним символом предыдущего блока
              (local.get $esp)
            )
            (i32.const 0xFFFF)
          )
        )

        ;; по таблице превращаем битовую маску в позиции битов
        ;; shared hit=1 read
        ;; 1000000100000100 => 0x FF 0D 07 00
        ;; chp = memory[0x10000 + (chm << 2)]:i32
        (local.set $chp
          (i32.load align=4
            (i32.add
              (i32.const 0x10000)
              (i32.shl
                (local.get $chm)
                (i32.const 2)
              )
            )
          )
        )
        ;; превращаем позиции символов в их значения
        (local.set $chv
          (i32x4.extract_lane
            0
            (i8x16.swizzle
              (local.get $xword)
              (i32x4.replace_lane
                0
                (i8x16.splat
                  (i32.const 0xFF)
                )
                (local.get $chp)
              )
            )
          )
        )

        ;; перебираем символы согласно выставленным битам маски, пока она не обнулится
        (block
          (loop
            ;; if (chv == 0) break
            (br_if 1
              (i32.eqz
                (local.get $chv)
              )
            )

            (local.set $chpos
              (i32.and
                (local.get $chp)
                (i32.const 0xFF)
              )
            )

            ;; ch = xword[chpos]
            (local.set $ch
              (i32.and
                (local.get $chv)
                (i32.const 0xFF)
              )
            )

            ;; ch = memory[0xC0 + ch]
            (local.set $ch
              (i32.load align=4
                (i32.add
                  (i32.shl
                    (local.get $ch)
                    (i32.const 2)
                  )
                  (i32.const 0xC0)
                )
              )
            )

            ;; ln = ch >> 8
            (local.set $ln
              (i32.shr_u
                (local.get $ch)
                (i32.const 8)
              )
            )

            ;; позиция начала числа, смещение хранится в старшем байте ch
            (local.set $xbp
              (i32.and
                (i32.add
                  (local.get $chpos)
                  (local.get $ln)
                )
                (i32.const 0xFF)
              )
            )

            ;; ch & 0x10000
            (if
              (i32.eqz
                (i32.and
                  (local.get $ch)
                  (i32.const 0x10000)
                )
              )
              ;; key
              (then
                (local.set $key
                  (i32.and
                    (i32.add
                      (local.get $state)
                      (local.get $ch)
                    )
                    (i32.const 0x0F)
                  )
                )

                ;; если число начинается в следующем блоке - переходим к нему
                (if
                  (i32.ge_u
                    (local.get $xbp)
                    (i32.const 0x10)
                  )
                  (then
                    ;; val = 0
                    (global.set $val
                      (i64.const 0)
                    )
                    ;; xbp -= 16
                    (local.set $xbp
                      (i32.and
                        (local.get $xbp)
                        (i32.const 0x0F)
                      )
                    )
                    (br 1)
                  )
                )

                ;; xn = num digits
                (local.set $xn
                  (call $parseNumber
                    (local.get $xword)
                    (local.get $xbp)
                  )
                )

                ;; xbp = xbp + xn == 0x10 ? 0 : -1
                (local.set $xbp
                  (if (result i32)
                    (i32.eq
                      (i32.add
                        (local.get $xbp)
                        (local.get $xn)
                      )
                      (i32.const 0x10)
                    )
                    (then
                      (i32.const 0)
                    )
                    (else
                      ;; store value
                      (local.set $mask
                        (call $setData
                          (local.get $key)
                          (local.get $mask)
                        )
                      )
                      (i32.const -1)
                    )
                  )
                )
              )
              ;; state
              (else
                (local.set $state
                  (i32.and
                    (local.get $ch)
                    (i32.const 0x0F)
                  )
                )
              )
            )

            ;; chv >>= 8
            (local.set $chv
              (i32.shr_u
                (local.get $chv)
                (i32.const 8)
              )
            )
            ;; chp >>= 8
            (local.set $chp
              (i32.shr_u
                (local.get $chp)
                (i32.const 8)
              )
            )
            
            ;; do loop
            (br 0)
          )
        )

        ;; esp = xword[15] == 0x20
        (local.set $esp
          (i32.eq
            (i8x16.extract_lane_u
              15
              (local.get $xword)
            )
            (i32.const 0x20)
          )
        )

        ;; check EOL
        (local.set $eol
          (i8x16.bitmask
            (i8x16.eq
              (local.get $xword)
              (global.get $all_0A)
            )
          )
        )

        ;; if (!eol) break
        (br_if 0
          (i32.eqz
            (local.get $eol)
          )
        )

        ;; break, end loop

        ;; store value
        (local.set $mask
          (call $setData
            (local.get $key)
            (local.get $mask)
          )
        )

        ;; определяем позицию перевода строки
        (local.set $eol
          (i32.ctz
            (local.get $eol)
          )
        )

        ;; встаем на начало следующей строки
        ;; off += 15 - eol
        (global.set $off
          (i32.sub
            (i32.add
              (global.get $off)
              (local.get $eol)
            )
            (i32.const 15)
          )
        )

        ;; break
        (br 1)
      )
    )

    ;; сохраняем текущее смещение
    (i32.store
      (i32.const 0xFFFC) ;; 16383 * 4
      (global.get $off)
    )
    ;; возвращаем маску
    (local.get $mask)
  )
)

Увы, все эти оптимизации не позволяют обогнать SWAR-код из прошлой части, а лишь добиться паритета с ним.

В целом, для нашей конкретной задачи использование SIMD оказывается малоэффективным, поскольку расстояния в памяти между значимой информацией достаточно велики, а сама она мала. Например, для полного декодирования такой строки длиной 47 символов достаточно прочитать всего 11 из них:

Buffers: shared hit=123 read=45, temp written=6
         ^      ^   *** ^    **  ^    ^       *

Материалы:

© Habrahabr.ru