[Перевод] Ox: Императивный подход к параллельной потоковой обработке данных

e8956c1770b924a8eea62f2281aace26.png

Ox, библиотека Scala для безопасного параллелизма и отказоустойчивости в императивном стиле (direct‑style) на JVM, получила новую реализацию параллельной потоковой обработки данных (concurrent streaming). Она позволяет определять конвейеры обработки данных с помощью функционального API, императивного API или сразу обоих вариантов одновременно.

Потоковая обработка данных в Ox была и раньше: предыдущая реализация была основана исключительно на каналах (channels). Хоть она и работала, но все‑таки имела свои недостатки: каждый этап преобразования вводил асинхронную границу. В некоторых ситуациях это может быть неэффективно: если вы оперируете всего лишь несколькими неблокирующими и не требующими больших затрат CPU этапами, такими как .filter, .mapStateful или .interleave, асинхронные границы просто не нужны. Следовательно, такой подход приводил к избыточному параллелизму.

Чтобы улучшить эту ситуацию, мы ввели flow API. Каналы по‑прежнему остаются основным структурным элементом, предлагающим императивный способ определения преобразований данных. Flow‑потоки («flow» — букв. «поток», но чтобы не возникало путаницы с thread«ами и stream«ами, мы будем использовать эту форму) поверх каналов предлагают функциональный API с рядом высокоуровневых комбинаторов, хорошо известных по другим потоковым библиотекам, таким как Akka Streams или fs2.

Давайте же разберемся, как flow‑потоки работают и взаимодействуют с каналами!

Так же как и каналы в Ox были вдохновлены алгоритмом из Kotlin, асинхронные flow‑потоки тоже уже были там реализованы.

Пример flow-потока

Вот небольшой пример flow API в действии, запускаемый с помощью Scala CLI:

//> using dep "com.softwaremill.ox::core:0.5.0"

import ox.flow.Flow
import java.net.URI

@main def capitals(): Unit =
  Flow
    .fromValues("Poland", "Argentina", "Italy", "Germany", 
      "Republic of India",  "France", "Japan", "United Kingdom", 
      "Australia", "USA")
    .map(_.toLowerCase().replace(" ", "%20"))
    .mapPar(3)(country =>
      val getCapital = new URI(
        s"https://restcountries.com/v3.1/name/$country?fields=capital")
      Flow
        .fromInputStream(getCapital.toURL().openStream())
        .linesUtf8
        .take(1)
        .runToList()
    )
    .runForeach(println)

Первая особенность flow‑потоков заключается в том, что они вычисляются лениво. Вызывая методы .map, .filter и т. д., мы лишь описываем, как должны быть преобразованы данные. Ничего не будет происходить до фактического запуска flow‑потока.

Второй важный момент заключается в том, что параллелизм скрыт. Вы описываете то, что вам нужно, с помощью предоставленных высокоуровневых методов — нет необходимости использовать каналы напрямую, не говоря уже о других примитивах параллелизма.

Наконец, мы можем использовать эффективные блокирующие операции без ущерба для выразительности и производительности. Это стало возможным благодаря виртуальным потокам (virtual threads) и структурированному параллелизму Java, реализованным в Java 21+ и Ox.

Flow-поток под микроскопом

Чтобы лучше понять, как работают flow‑потоки, давайте рассмотрим комбинаторы (combinators), используемые в примере выше. Во‑первых, у нас есть Flow.fromValues(...). Он создает Flow[String]: описание flow‑потока, которое при запуске выдает некоторое количество строк. В данном случае это конечный список стран, но потенциально flow‑потоки могут быть и бесконечными.

Flow
  .fromValues(...)
  .map(country => ...)

Затем мы создаем.map для этого flow‑потока, приводя к нижнему регистру имена и делая простенькое URL‑кодирование (не делайте так в продакшене;)). И снова никакие данные не обрабатываются, поскольку мы еще не запустили flow‑поток. Мы просто получили еще один экземпляр Flow[String], который содержит два этапа: один производит значения, а другой преобразует их. Если бы в этот момент был запущен flow‑поток, оба этапа выполнялись бы в вызывающем thread‑потоке без какого‑либо параллелизма.

Flow
  .fromValues(...)
  .map(country => ...)
  .mapPar(3)(country =>
      ...
  )

Все становится еще интереснее, когда мы доходим до .mapPar. Этот комбинатор производит преобразование (mapping) каждого элемент входящего flow‑потока, выполняя до 3 одновременных вызовов функции преобразования (mapping function). Это полезно, если маппинг‑функция производит какие‑либо эффекты или, в противном случае, блокирует. Так и здесь: мы выполняем HTTP GET‑запрос для каждой страны, чтобы преобразовать название страны в название столицы.

При запуске .mapPar запускает определенный ранее конвейер (который выдает столицы) в фоновом форке (fork). Любые элементы, выдаваемые конвейером, отправляются в канал, который по умолчанию имеет емкость в 16 элементов (ее можно настроить). Другой фоновый форк получает элементы из этого канала (названия столиц в нижнем регистре) и создает дополнительные форки, которые выполняют HTTP‑запросы. Наконец, в вызывающем thread‑потоке мы собираем ответы, следя за сохранением первоначального порядка.

Весь этот процесс (к счастью) скрыт от конечного пользователя, и в нем используются форки и concurrency scopes Ox. Это означает, что если что‑то пойдет не так, например, HTTP‑запрос выбросит IOException, все будет очищено, и только после этого произойдет распространение исключения. «Очистка» в данном контексте означает прерывание и ожидание завершения всех форков, которые еще выполняются.

Flow
  .fromValues(...)
  .map(_.toLowerCase().replace(" ", "%20"))
  .mapPar(3)(country =>
      ...
  )
  .runForeach(println)

Через пару мгновений мы вернемся к реализации функции преобразования, а пока давайте быстро рассмотрим последнюю операцию основного flow‑потока: .runForeach(println). Этот метод фактически запускает flow‑поток, блокируя вызывающий thread‑поток до тех пор, пока не будут обработаны все элементы. Только после вызова этого метода начинается обработка данных и происходят любые эффекты, являющиеся частью потока (stream).

Наконец, давайте посмотрим на реализацию функции преобразования стран в столицы:

val getCapital = new URI(s"https://restcountries.com/v3.1/name/$country?fields=capital")
Flow
  .fromInputStream(getCapital.toURL().openStream())
  .linesUtf8
  .take(1)
  .runToList()

Скорее для демонстрации, нежели по реальной необходимости, в ней также используется Flow. Сначала мы открываем InputStream (используя URL Java — опять же, не делайте этого в продакшене, используйте вместо этого sttp‑клиент). Он используется для создания Flow[Chunk[Byte]] с помощью Flow.fromInputStream. Этот метод выдает фрагменты байтов, считанные из входного потока, и гарантирует, что InputStream будет закрыт, независимо от того, завершился ли flow‑поток успешно или с ошибкой.

Затем мы используем вспомогательный метод .linesUtf8, который разбирает байтовые блоки на строки. Нас интересует только первая строка (так как в ней обычно одна столица), и мы запускаем flow‑поток, собирая все выданные элементы в список с помощью .runToList(). Здесь не хватает парсинга JSON, так как это стандартный формат ответов, но мы оставим это как упражнение для читателя :).

Доступные комбинаторы

Flow API содержит ряд комбинаторов, хорошо известных любому пользователю библиотеки коллекций Scala — преобразование, фильтрация, сбор и т. д. Есть также некоторые специфические для потока комбинаторы, такие как троттлинг, чередование/перемешивание (interleaving/interspersing), группировка элементов на основе количества элементов или времени, преобразование с учетом состояния и другие.

Аналогично, объект‑компаньон Flow содержит ряд способов удобного создания flow‑потоков, таких как итерация по предоставленным значениям, развертка по функциям или чтение из InputStream, как описано выше. Если этих способов недостаточно, то для гибкого определения потоков может пригодиться метод .usingEmit:

import ox.flow.Flow

def isNoon(): Boolean = ???

val intFlow = Flow.usingEmit: emit =>
  emit(1)
  for i <- 4 to 50 do emit(i)
  if isNoon() then emit(42)

Также можно ввести явные асинхронные границы с помощью .async(). Это может быть полезно, если создание следующего элемента и потребление предыдущего должны выполняться параллельно, или если время обработки потребителя меняется, и производитель должен буферизировать элементы. Под капотом здесь используются каналы.

Императивный API

Как уже говорилось в начале, функциональный API — это только одна сторона поддержки потоковой передачи в Ox. Каналы предлагают императивный API, который обеспечивает максимальную гибкость, цена которой заключается в больше подробности описания. При таком подходе вы можете использовать блокирующие операции .send() и .receive() для получения, преобразования и передачи данных на последующие этапы конвейера.

Flow‑потоки и каналы полностью совместимы: вы можете создать flow‑поток из канала и запустить flow‑поток в канал. Как было описано выше, параллелизм внутри этапов flow‑потока реализуется с помощью каналов.

В качестве небольшого примера приведем метод, преобразующий канал строк в канал целых чисел:

import ox.Ox
import ox.channels.Source
import ox.flow.Flow

def transformChannel(ch: Source[String])(using Ox): Source[Int] =
  Flow.fromSource(ch)
    .mapConcat(_.split(" "))
    .mapConcat(_.toIntOption)
    .filter(_ % 2 == 0)
    .runToChannel()

Обратите внимание, что мы должны запускать этот метод в рамках concurrency scope (используя Ox), поскольку запуск flow‑потока на канал создает фоновый форк, который запускает flow‑поток и отправляет все испускаемые элементы на возвращенный канал. Эта concurrency scope также является областью, в которой обрабатываются ошибки.

Расширение flow API 

Вы можете создавать собственные этапы обработки flow‑потока, реализуя пользовательский FlowStage:

trait FlowStage[+T]:
  def run(emit: FlowEmit[T]): Unit

trait FlowEmit[-T]:
  /** Передает значение для последующей обработки. */
  def apply(t: T): Unit

Метод run должен реализовать логику обработки данных с помощью ранее определенного конвейера (доступного в виде экземпляра FlowStage). Как будет выполняться предыдущий конвейер, синхронно или асинхронно, полностью зависит от реализации этапа flow‑потока.

Ox содержит одно расширение flow API, ориентированное на производителей и потребителей Kafka, доступное как часть модуля kafka. Мы надеемся увидеть гораздо больше комбинаторов, доступных как в рамках основного API, так и в рамках интеграций внутри и за пределами проекта Ox! Если вы хотите принять участие, форум сообщества — отличное отправная точка.

Попробуйте Ox уже сегодня!

Ox лицензирован под Apache2. Исходники доступны на Maven Central. Все, что вам нужно, чтобы начать экспериментировать со структурированным параллелизмом и блокированием потоков данных, — это добавить следующую зависимость в вашу сборку:

// зависимость для sbt
"com.softwaremill.ox”%% "core”% "0.5.0”

// зависимость для scala‑cli
//> using dep com.softwaremill.ox::core:0.5.0

Мы будем благодарны за любые ваши отзывы — как о функционале потоковой передачи, так и о наборе функций Ox в целом. Чего вам не хватает для написания приложений на Scala в императивном стиле? Форум сообщества и проблемы на GitHub — отличное место, чтобы оставлять свои комментарии/багрепорты.

Ближайшие открытые уроки по Scala, которые пройдут в рамках курса Otus:

  • 16 декабря — Эффекты в Scala. Обсудим, что такое эффекты и какие они бывают; узнаем о понятии функционального эффекта и какие задачи они могут решать; реализуем свой функциональный эффект. Записаться

  • 23 декабря — Основы и особенности языка Scala. Попрактикуемся в написании функций и методов, создании иммутабельных конструкций, получим представление о возможностях композиции. Записаться

© Habrahabr.ru