[Перевод] Стриминговая аналитика с применением Apache Pulsar и структурированные потоки Spark

image

Эта статья написана в соавторстве Даниэлем и Джианнисом Полизосом, который ещё в 2017 году был одним из первых студентов Rock the JVM. Сейчас Джианнис — старший разработчик и контрибьютор Apache Pulsar, многообещающего нового инструментария для передачи распределённых сообщений и потоковых данных. В этой статье сочетаются два наших любимых технических инструмента: Apache Pulsar и Apache Spark.

Потоковая обработка — важный и необходимый аспект современных инфраструктур данных. Сегодня компании стремятся поставить себе на службу потоковую передачу и аналитику данных в реальном времени, чтобы быстрее предоставлять пользователям результаты, повышать удобство работы с ресурсом и, соответственно, поднимать его бизнес-ценность.

Примеров такого рода сколько угодно: представьте себе онлайн-сервис, предоставляющий пользователю рекомендации на основе того, какие действия пользователь совершает на веб-странице. Ещё можно представить IoT-компанию, желающую отслеживать показания сенсоров и своевременно реагировать на потенциальные сбои. К этой же категории относятся системы компьютерного зрения, которые должны в режиме реального времени анализировать видеозаписи или обнаруживать случаи мошенничества в банковских системах; этот список можно продолжать и продолжать.

Как правило, в конвейерах для потоковой обработки данных требуется уровень хранения потоков, например, Apache Pulsar или Apache Kafka. Далее для выполнения более тонких задач по обработке потоков нам потребуется движок потоковых вычислений, например, Apache Flink или Spark Structured Streaming.

Когда требуется обеспечить унифицированную пакетную обработку и работу с потоками в системах, развёрнутых в облаке, Apache Pulsar отлично подходит для полной технической поддержки таких вычислительных движков. Apache Pulsar предназначен для работы с облачной (cloud-native) инфраструктурой, а также сделан в расчёте на стратегии унифицированной пакетной обработки данных и работу с потоками.

1. Роль Apache Pulsar в конвейеризации потоковых данных


Apache Pulsar превосходно справляется с хранением потоков событий и с выполнением легковесных вычислительных задач, связанных с обработкой потоков. Он замечательно подходит для долгосрочного хранения данных, а также может применяться для сохранения результатов в каких-нибудь приложениях, расположенных ниже в конвейере.

Движок для унифицированной пакетной обработки требует поддержки со стороны унифицированного уровня хранилища для пакетных и потоковых операций — только так все возможности могут быть реализованы в полном объёме.

Иными словами, потоковая система должна не только перемещать данные в конвейерах, но и подолгу хранить данные в потоковой форме и служить единым источником истины для обрабатывающего движка.

В том хранилище с многоуровневой архитектурой, что применяется в Apache Pulsar, данные в потоковой форме можно хранить неопределённо долго.

Pulsar отлично подходит для долговременного хранения данных и может применяться для сохранения результатов в каком-либо приложении, расположенном ниже по конвейеру.

Apache Pulsar — это распределённый журнал, рассчитанный только на запись и легко поддающийся горизонтальному масштабированию. Топики или их сегменты можно распределять по множеству брокеров, поддерживая высокую пропускную способность благодаря параллельному чтению топиков на множестве машин и по множеству сегментов.

В топиках Pulsar также применяется архитектура, выстроенная на основе сегментов — то есть, в каждом из таких топиков также содержится последовательность сегментов, в которой отдельные сегменты периодически закрываются и становятся неизменяемыми.

Эти сегменты хранятся в особом механизме, поверх которого расположен Apache BookKeeper, но также могут выгружаться и в долгосрочные недорогие хранилища, например, в Amazon S3 — при этом активно используется многоуровневое хранилище Pulsar.

Благодаря такой гибкости, та архитектура хранилища данных, поверх которой расположен Pulsar, идеально подходит для пакетного доступа. Механизм пакетного доступа применим для доступа к архивным данным на выделенном уровне, а над этим уровнем расположится слой для должной потоковой обработки. Pulsar предлагает потоковые API, обеспечивающие как низкие задержки, так и высокую пропускную способность. В свою очередь, архитектура Pulsar приспособлена к поддержке современных инфраструктур данных и призвана унифицировать пакетную и потоковую обработку. Pulsar Functions — это легковесный фреймворк для бессерверной обработки потоков. Здесь обратите особое внимание на термин легковесный.

Итак, Apache Pulsar не относится к продвинутым движкам для потоковой обработки, но наделяет вас гибкостью, позволяющей реализовать много распространённых случаев потоковой обработки. Давайте немного пристальнее к ним присмотримся.

Pulsar Functions позволяет реализовать распространённые случаи работы с потоковыми данными, например:

  • Простые варианты агрегации данных, напр., счётчики.
  • Паттерны маршрутизации:


  1. Динамическая маршрутизация или маршрутизация на основе содержимого;
  2. Паттерн «Разветвитель» (Splitter).


  • Преобразование сообщений:


  1. Маскировка данных по соображениям безопасности;
  2. Насыщение данных из внешних систем;
  3. Валидация полезной нагрузки данных;
  4. Фильтрация содержимого.


  • Развёртывание и оценка моделей машинного обучения в режиме реального времени.


«Легковесность» Apache Pulsar также подразумевает, что он отлично подходит для граничной аналитики — в условиях ограниченности ресурсов такой фреймворк может быть очень полезен для работы с устройствами, а подобные аналитические возможности позволяют перейти прямо к сбору данных для решения простых аналитических задач — например, для одновариантного/многовариантного анализа временных рядов.

Но в тех практических случаях, когда требуются более замысловатые вычисления, например,

  • Группирование агрегатов
  • Объединение различных потоков данных
  • Продвинутая работа с временными окнами для выполнения вычислений
  • Поддержка продвинутых водяных знаков для обработки запаздывающих событий
  • Обработка больших объёмов состояния


приходится опираться на непростые движки для поддержки потоковых вычислений, например, Spark Structured Streaming и Apache Flink. Pulsar призван обеспечить отличную интеграцию с ними, так, чтобы можно было в полной мере использовать их продвинутые возможности.

2. Разбор практического случая: вовлечение пользователя в реальном времени


Теперь давайте подробнее (на примере) разберём, каковы роли всех этих различных компонентов в конвейере для потоковой обработки данных.
Представьте себе популярный интернет-магазин. Пользователь щёлкает мышью по различным товарам, добавляет позиции себе в корзину и (желательно) оформляет покупку тех или иных товаров. С точки зрения бизнеса, основанного на данных, можно представить несколько вещей, достижимых при активном использовании потоковой обработки:

  1. Если пользователь — это клиент-подписчик, собирающий баллы по программе лояльности. При совершении новой покупки мы, возможно, захотим рассчитать в режиме реального времени, сколько баллов лояльности собрал пользователь, а потом предложим ему большую скидку на следующую покупку.
  2. Если пользователь добавил в корзину несколько элементов, щёлкнул по корзине, но оформление заказа так и не завершил — то в ответ мы хотели бы послать электронное сообщение (может быть, с купоном на небольшую скидку, если данный пользователь не зарегистрирован) — и напомнить, что оформление заказа нужно завершить.


Вот примерное наглядное представление этого конвейера с данными:

image

У нас есть события, генерируемые на сайте в процессе работы, в том числе, информация о пользователе, информация о товаре, а также информация о пользовательских щелчках (соответствующих событиях). Обратите внимание, что топики с информацией о пользователе и о товаре компактифицируются. Это означает, что мы указываем ключ для топика, и такой ключ служит уникальным идентификатором каждого пользователя (или товара).

Нас интересуют только самые последние значения из этих записей, и именно это мы получаем из компактифицированного топика: последнее обновление для каждой записи, извлекаемое по ключу.

Рассмотрим задание Spark, объединяющее множество потоков данных. Теперь на секунду вообразите, что данные о пользователе или товаре мы будем хранить не в топиках Pulsar, а во внешней базе данных, например, в MySQL.

В таком сценарии, возможно, было бы целесообразнее применить Pulsar Function, которая читала бы записи и подыскивала во внешней системе необходимую информацию о пользователе и товаре, а затем подхватывала бы её.

3. Сочленение Apache Pulsar/Spark


Вернёмся к разбору нашего примера и подробнее рассмотрим реализацию корзины заказов, где пользователь забросил оформление покупки. При этом нам потребуется скомбинировать Apache Pulsar и Spark Structured Streaming.

Чтобы интегрировать Apache Pulsar и Apache Spark, воспользуемся библиотекой pulsar-spark connector, которую разработала и поддерживает StreamNative.

Первым делом мы должны настроить зависимости в рамках нашего проекта — так что давайте добавим в наш файл build.sbt следующий код:

lazy val sparkVersion       = "3.2.0"
lazy val circeVersion       = "0.14.1"
lazy val pulsar4sVersion    = "2.8.1"
lazy val pulsarSparkVersion = "3.1.1.3"

val ingestionDependencies = Seq(
  "com.clever-cloud.pulsar4s" % "pulsar4s-core_2.12"  % pulsar4sVersion,
  "com.clever-cloud.pulsar4s" % "pulsar4s-circe_2.12" % pulsar4sVersion
)

val analysisDependencies = Seq(
  "io.circe"                    %%  "circe-core"                  %   circeVersion,
  "io.circe"                    %%  "circe-generic"               %   circeVersion,
  "io.circe"                    %%  "circe-parser"                %   circeVersion,
  "org.apache.spark"            %%  "spark-sql"                   % sparkVersion,
  "io.streamnative.connectors"  %   "pulsar-spark-connector_2.12" % pulsarSparkVersion
)

libraryDependencies ++= ingestionDependencies ++ analysisDependencies


Допустим, внутри Pulsar у нас будут следующие события, связанные с щелчками мыши (для данного примера они сгенерированы синтетически):

----- got message -----
key:[536638900], properties:[], content:{"eventTime":1572559255,"eventType":"view","productId":1307519,"categoryId":2053013558920217191,"categoryCode":"computers.notebook","brand":"acer","price":"1209.55","userId":536638900,"userSession":"e1e8125d-da26-49ee-a6fa-78b3ff8dc341"}
----- got message -----
key:[532364121], properties:[], content:{"eventTime":1572559256,"eventType":"view","productId":12708937,"categoryId":2053013553559896355,"categoryCode":"","brand":"michelin","price":"72.72","userId":532364121,"userSession":"0a899268-31eb-46de-898d-09b2da950b24"}
----- got message -----
key:[513998949], properties:[], content:{"eventTime":1572559256,"eventType":"view","productId":50600085,"categoryId":2134905044833666047,"categoryCode":"auto.accessories.compressor","brand":"laston","price":"113.93","userId":513998949,"userSession":"a7b196d9-afe5-4dc8-9648-d578fef55abf"}
----- got message -----
key:[544501248], properties:[], content:{"eventTime":1572559256,"eventType":"view","productId":1003316,"categoryId":2053013555631882655,"categoryCode":"electronics.smartphone","brand":"apple","price":"928.38","userId":544501248,"userSession":"e330d051-37ad-4dc3-b1ee-ff16a28b7998"}
----- got message -----
key:[515240495], properties:[], content:{"eventTime":1572559256,"eventType":"view","productId":30000218,"categoryId":2127425436764865054,"categoryCode":"construction.tools.welding","brand":"magnetta","price":"254.78","userId":515240495,"userSession":"0253151d-5c84-4809-ba02-38ac405494e1"}
----- got message -----
key:[566280567], properties:[], content:{"eventTime":1572559258,"eventType":"view","productId":1004322,"categoryId":2053013555631882655,"categoryCode":"electronics.smartphone","brand":"huawei","price":"334.37","userId":566280567,"userSession":"8cd74350-34e7-423b-ab02-53108a89354b"}
----- got message -----
key:[559033632], properties:[], content:{"eventTime":1572559258,"eventType":"view","productId":22700084,"categoryId":2053013556168753601,"categoryCode":"","brand":"force","price":"244.28","userId":559033632,"userSession":"fe9544f7-0c09-4c85-a2f7-1d978a2710be"}
----- got message -----
key:[531148230], properties:[], content:{"eventTime":1572559259,"eventType":"view","productId":1004767,"categoryId":2053013555631882655,"categoryCode":"electronics.smartphone","brand":"samsung","price":"242.63","userId":531148230,"userSession":"57a91bce-a1fb-4609-bb26-d53430dc6618"}


На Scala это можно смоделировать в виде событий в следующей форме:

case class Event(userid: String,
                 eventTime: Long,
                 eventType: String,
                 productId: String,
                 categoryId: String,
                 categoryCode: String,
                 brand: String,
                 price: Double,
                 userSession: String)

object Event {
  def empty(): Event = Event("", 0L, "" , "", "", "", "", 0.0, "")
}


В нашей простой реализации, приводимой для примера, считаем, что процесс оформления заказа заброшен, если в рамках пользовательского сеанса у нас есть события cart, но нет событий purchase. Мы собираемся проталкивать события в Pulsar, а затем считывать и анализировать их при помощи.

Давайте разберём их по очереди.

3.1. Продьюсер: Pulsar


Сымитируем эти события так, как описано в следующей вводной статье по Pulsar. Сначала обустроим контейнер Docker, в котором будет выполняться Pulsar:

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --name pulsar \
  apachepulsar/pulsar:2.8.0 \
  bin/pulsar standalone

А в другом окне терминала создадим топик events, который будет принимать наши значения:

docker exec -it pulsar bash
bin/pulsar-admin topics create events 


Теперь в нашем приложении на Scala нужно настроить продьюсер Pulsar. Исходя из того, что мы сможем без труда выполнить синтаксический разбор наших данных (в данном случае — из файла с рандомизированными реалистичными данными), требуется предусмотреть необходимую обвязку для Pulsar:

import com.sksamuel.pulsar4s.{DefaultProducerMessage, EventTime, MessageId, ProducerConfig, PulsarClient, Topic}
import com.sksamuel.pulsar4s.circe._
import io.circe.generic.auto._
import io.circe.{Decoder, Encoder, Json, Printer}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.common.schema.{SchemaInfo, SchemaType}

lazy val topicName      = "events"
lazy val eventsFilePath = "src/main/resources/events.csv"
lazy val serviceUrl     = "pulsar://localhost:6650"
lazy val adminUrl       = "http://localhost:8080"
lazy val producerName   = "event-producer"
lazy val threadPoolSize = 4

// events parsed elsewhere
val events: List[Event] = loadEvents(eventsFilePath, withHeader = true)

val pulsarClient = PulsarClient(serviceUrl)
val topic = Topic(topicName)

val eventProducer = pulsarClient.producer[Event](
  ProducerConfig(
    topic,
    producerName = Some(producerName),
    enableBatching = Some(true),
    blockIfQueueFull = Some(true)
  )
)


После этого нам потребуется обработать все события из списка, асинхронно отправив их в Pulsar. Ниже в общих чертах показано, как примерно это будет выглядеть:

val messageIdFutures: Seq[Future[MessageId]] = events.map { event =>
  Thread.sleep(10)
  
  // marshal an event for Pulsar
  val message: DefaultProducerMessage[Event] = DefaultProducerMessage[Event](
    Some(event.userid),
    event,
    eventTime = Some(EventTime(event.eventTime)))
  
  //send it
  val messageIdFuture = eventProducer.sendAsync(message)

  // being quite verbose to track messages
  messageIdFuture.onComplete {
    case Success(id) => println(s"Ack received for message with id '$id'.")
    case Failure(exception) => println(s"Failed to sent message - Reason: $exception")
  }

  // keep the Future for analysis at the source, e.g. check message IDs
  messageIdFuture
}


Затем можно воспользоваться полученными в результате Futures для отслеживания ID сообщений и для проверки того, успешно ли прошла отправка, т. д. В данном демо-примере мы просто закроем приложение, как только эта работа будет завершена.

Future.sequence(messageIdFutures) // turn the Seq[Future[...]] into Future[Seq[...]]
  .andThen {
    case Success(_) => println("Producer finished sending event records.")
    case Failure(e) => println(s"A failure occurred. Check the logs and the stack trace: $e")
  }
  .andThen { 
    case _ => eventProducer.close()
  }


3.2. Потребитель: Spark Structured Streaming


Spark Structured Streaming — это движок для потоковых вычислений, предоставляющий более продвинутые возможности, которые пригодятся нам в нашем практическом случае:

  • Поддержка сеансовых окон — можно создавать сеансовые окна, основываясь на идентификаторах eventTime и userSession.
  • Поддержка агрегирования groupBy — мы хотим агрегировать eventTypes в пользовательских сеансовых окнах.
  • Поддержка водяных знаков — позволяет обрабатывать запаздывающие события.
  • Фильтрация по сложным типам данных — фильтруем строки в зависимости от фильтрационного предиката, выставленного для списка.


Когда все зависимости будут на месте, давайте сначала подключимся к Pulsar и начнём считывать события из топика events.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val clickEventsDF = spark
        .readStream
        .format("pulsar")
        .option("service.url", "pulsar://localhost:6650")
        .option("admin.url", "http://localhost:8080")
        .option("topic", "events")
        // the following two configs are optional for this demo but important if you're doing authenticated data access
        .option("pulsar.client.authPluginClassName","org.apache.pulsar.client.impl.auth.AuthenticationToken")
        .option("pulsar.client.authParams","token:you-token<>")
        .load()


Можно с лёгкостью подключиться к Pulsar при помощи Spark через соединительные URL, имя входного топика, а также воспользовавшись безопасным методом на ваш выбор, если активирован механизм безопасности. Полный список доступных конфигурационных опций приводится в Github коннектора Pulsar.

Когда соединение установлено, преобразуем данные в такой пользовательский тип, который удобно анализировать:

case class AnalysisEvent(userSession: String, userId: String, eventTime: Timestamp, eventType: String)

import spark.implicits._

val parsedEvents = clickEventsDF
  .selectExpr("CAST(value AS STRING)").as[String]
  .map { line =>
    val event = io.circe.parser.decode[Event](line).getOrElse(Event.empty())
    AnalysisEvent(event.userSession, event.userid, new Timestamp(event.eventTime), event.eventType)
  }.as[AnalysisEvent]


Имея такой Dataset, теперь можно примерно набросать, как будет выглядеть реализация:

  • Отбрасываем события старше 30 минут
  • Используем сеансовые окна длительностью по 80 минут, в Spark 3.2 такая возможность предоставляется автоматически
  • Агрегируем события по их типу
  • Ищем события, связанные с использованием корзины заказов
val checkoutEvents = parsedEvents
  .withWatermark("eventTime", "30 minutes")
  .groupBy(col("userSession"), col("userId"),  session_window(col("eventTime"), "80 minutes"))
  .agg(collect_list("eventType").as("eventTypes"))
  .filter(array_contains(col("eventTypes"),"cart"))
  .select(
    col("session_window.start").as("sessionWindowStart"),
    col("session_window.end").as("sessionWindowEnd"),
    col("userId"),
    col("userSession"),
    col("eventTypes")
  )


Вывод должен выглядеть примерно так:

+-------------------+-------------------+---------+------------------------------------+------------------------------------------------------------------------------------------------------------------------------+
|sessionWindowStart |sessionWindowEnd   |userId   |userSession                         |eventTypes                                                                                                                    |
+-------------------+-------------------+---------+------------------------------------+------------------------------------------------------------------------------------------------------------------------------+
|2019-11-01 02:21:03|2019-11-01 02:56:28|564133858|ef95cac1-6580-44f0-a10b-c7e836982154|[view, view, cart, cart, purchase, view, view, view, cart, view]                                                    |
|2019-11-01 00:59:55|2019-11-01 01:34:27|566286947|b86c0313-d0cd-4222-888f-2ed03a1d697f|[view, view, view, cart, purchase, view]                                                                                      |
|2019-11-01 02:22:39|2019-11-01 02:54:45|528271890|b425dd2d-dae4-4915-8e8f-b1d4d2717471|[view, cart, view, view]                                                                                                      |
|2019-11-01 01:49:20|2019-11-01 02:21:31|553414803|5006939e-fbe7-4e1c-809c-ffa3b16eb20c|[view, view, view, cart, view]                                                                                                |
|2019-11-01 01:54:59|2019-11-01 02:29:24|556325945|ad789442-8756-41d7-b05a-11b90124032d|[view, view, view, cart, purchase, view, cart, cart, cart, cart, cart, cart, cart, cart, cart, cart, cart]                    |
|2019-11-01 00:37:42|2019-11-01 01:14:30|549630515|8847ab0c-1f0b-42fb-9ff4-f44b9f523b4b|[view, view, view, view, view, cart, view, view, view, view, cart, view, view, view]                                          |
|2019-11-01 00:13:32|2019-11-01 00:44:49|563558500|e0729b6c-eafe-4b0f-9d66-6ee777d08488|[view, cart, view, view, view, view, view]                                                                                    |
|2019-11-01 02:15:30|2019-11-01 02:49:24|512411198|64a9195f-e4ee-448f-9241-9b4d23467f5d|[view, cart, view, view, cart, view]                                                                                          |
|2019-11-01 01:44:05|2019-11-01 02:15:30|515742799|ca33e1b9-ecf5-4b50-ba76-007248bad43d|[view, cart, cart, cart, cart, view]                                                                                          |
|2019-11-01 00:41:39|2019-11-01 01:12:36|557332447|70c0ccdf-9452-49cc-bfa5-568096d64680|[view, cart, view]                                                                                                            |
|2019-11-01 01:40:47|2019-11-01 02:34:50|520761071|6dad05da-a718-4f05-92bc-1ed9796404cd|[view, view, view, view, view, view, view, view, view, view, view, cart, purchase]                                            |
|2019-11-01 00:37:04|2019-11-01 01:08:52|562200921|ca5a71f1-33c8-4fcd-b793-5fcea6f260c0|[view, cart, purchase, view]                                                                                                  |
|2019-11-01 00:10:07|2019-11-01 00:46:10|516426931|ef4867dd-b922-4d92-abec-9a75acb2b769|[view, view, view, cart, purchase, view, cart, cart, view, view]                                                    |
|2019-11-01 02:07:33|2019-11-01 02:45:30|554459781|c43b43dd-dc54-4d1c-bfd8-3bcbdfb94870|[view, cart, purchase, view, view]                                                                                            |
|2019-11-01 02:11:34|2019-11-01 02:52:13|539100846|365b1088-8a4f-47e4-a6e9-e66d56b1b998|[view, cart, cart, cart, view, cart, view, view, view, view, view]                                                            |
|2019-11-01 01:34:14|2019-11-01 02:08:51|519456951|f2136bd5-a50d-4e05-8a90-27b288065459|[view, cart, cart, purchase]                                                                                                  |
|2019-11-01 01:47:49|2019-11-01 02:26:31|515561413|98a12974-b589-48fe-b6f5-a55cd844cdd8|[view, view, view, view, cart, view]                                                                                          |
|2019-11-01 01:29:06|2019-11-01 02:00:39|518913698|afa1ad69-55bb-4ef8-9d02-6648596ca5ec|[view, cart, purchase, view]                                                                                                  |
|2019-11-01 01:53:53|2019-11-01 02:36:26|556601011|45658f52-9e11-45fa-a8d6-9414d349aa4d|[view, view, view, view, view, view, cart, cart, view, view, view, view, view, view, view, view, view, view, view, view, view]|
|2019-11-01 01:01:28|2019-11-01 01:34:04|538178630|9cf447dc-7aa8-47f2-8c3b-44318cf5608a|[view, view, view, cart, cart, cart, purchase, view]                                                                          |
+-------------------+-------------------+---------+------------------------------------+------------------------------------------------------------------------------------------------------------------------------+


Некоторые замечания по поводу сеансовых окон:

  • Размер сеансового окна подбирается автоматически в зависимости от длины окна, которая коррелирует с длиной вводимой информации.
  • Сеансовое окно запускается с началом ввода и расширяется в случае, если в пределах заданного промежутка в него поступает новый ввод.
  • Когда такой промежуток задан статически, сеансовое окно закрывается, если заданный промежуток времени истёк, а нового ввода (с момента получения последнего ввода) так и не поступило.


При использовании сеансовых окон и группировании их по пользовательским сеансам, можно без труда проверить, есть ли у конкретного пользователя события работы с корзиной заказов при отсутствии событий покупки. Так можно легко отфильтровать события, трактуемые нами как незавершенная работа с корзиной и перенаправить их в нижележащий топик для дальнейшей обработки. Приводя пример действия, которое здесь было бы уместно, допустим, что пользователю можно было бы направить сообщение с напоминанием (например, через мобильное сообщение, если таковое уместно, или по электронной почте) — по истечении некоторого времени, например, через час.

3.3. Синтаксический разбор событий


Этот фрагмент определённо находится на периферии статьи, так как статья посвящена Pulsar и Spark, поэтому для вашего удобства привожу следующий простой код. С его помощью были разобраны события, сведённые в этом csv-файле.

import java.sql.Timestamp
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{ExecutionContext, Future}
import scala.io.BufferedSource
import scala.util.{Failure, Success, Try}

def loadEvents(path: String, withHeader: Boolean = false): List[Event] = {
  val r = Try(scala.io.Source.fromFile(path))
    .map(source => sourceHandler(source, withHeader))
    .fold(_ => List.empty, identity)

  val result = Try(scala.io.Source.fromFile(path)) match {
    case Success(source) =>
      Some(sourceHandler(source, withHeader))
    case Failure(exception) =>
      println(s"Failed to read file '$path' - Reason: $exception")
      None
  }

  result.getOrElse(List.empty)
}

private def sourceHandler(source: BufferedSource, withHeader: Boolean): List[Event] = {
  val events: List[Event] = source.getLines()
    .map(toEvent)
    .toList

  if (withHeader) events.drop(1) else events
}

private def toEvent(line: String): Event = {
  Try {
    val tokens = line.split(",")
    val eventTime = Timestamp.valueOf(tokens(0).replace(" UTC", ""))

    Event(
      tokens(7),
      eventTime.getTime,
      tokens(1),
      tokens(2),
      tokens(3),
      tokens(4),
      tokens(5),
      tokens(6).toDouble,
      tokens(8)
    )
  }.getOrElse(Event.empty())
}


4. Заключение


В этой статье мы рассмотрели Apache Pulsar в качестве «хребта» современной инфраструктуры данных, разобрали, какие варианты работы с потоковыми данными может поддерживать Pulsar, а также как его можно использовать в сочетании со Spark Structured Streaming для реализации сравнительно сложных вариантов потоковой обработки, требующих привлечения.
Наконец, мы разобрали реалистичный практический случай, в котором представили образец конвейера потоковых данных и поговорили о том, какие роли в этом конвейере играют Apache Pulsar и Spark Structured Streaming.

mxuanbovcusqgmqdgugvpnql8vq.jpeg

© Habrahabr.ru