Как обрабатывать объекты Avro с помощью датасетов Spark 3.2 & Scala 2.12
Привет!
В этом посте разберем, как обрабатывать объекты JVM, сгенерированные из схем Avro, в датасетах Spark. Вместе с этим рассмотрим, как организовать код при помощи шаблона функционального программирования «типированный класс» (type class) на языке Scala.
Датафрейм в Spark — абстракция таблицы со столбцами и строками, покрывает большинство кейсов обработки данных. Тем не менее, бывает так, что данные имеют настолько специфичную структуру, что привести их к общему знаменателю в датафрейме достаточно сложно. В этом случае можно прибегнуть к программному интерфейсу Spark Dataset. Он позволяет манипулировать не абстрактными строками и столбцами, а конкретными объектами виртуальной машины Java.
Типичный кейс использования датасетов — чтение сообщений Kafka в формате Avro.
Процесс состоит из 4 этапов:
Генерация классов-образцов (case class) исходя из схемы Avro-сообщения.
Чтение Avro-сообщений в Dataset Spark, типированный классом-образцом.
Реализация операции обработки сообщения. В частности, напишем функцию, которая приведет сообщение к плоской структуре.
С кодом можно ознакомиться здесь.
Генерация классов-образцов Scala
Представим, что нам необходимо обрабатывать данные клиента со следующей структурой:
Модель данных клиента
Клиент (Customer
) может иметь несколько аккаунтов. Аккаунт, в свою очередь, имеет несколько групп взаимодействия (InteractionGroups
). Группа взаимодействия содержит в себе взаимодействия, совершенные клиентом в определенную дату. Взаимодействие имеет один из типов, обозначающий канал взаимодействия: сайт, твиттер, инстаграм или звонок.
Отступление
Наверняка, у вас возникли претензии к этой модели :) Это справедливо. Данные специально структурированы «криво», чтобы смоделировать реальный кейс. Зачастую необходимо обрабатывать данные из устаревших систем, структуру которых мы изменить не можем. Приходится работать с тем, что есть.
Схема сообщения Avro, соответствующая данной модели, будет выглядеть так:
{
"type": "record",
"name": "Customer",
"namespace": "com.github.aleksandrachasch.avro",
"fields": [
{
"name": "Id",
"type": "string"
},
{
"name": "Name",
"type": "string"
},
{
"name": "Surname",
"type": "string"
},
{
"name": "Accounts",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "Account",
"namespace": "com.github.aleksandrachasch.avro",
"fields": [
{
"name": "AccountId",
"type": "int"
},
{
"name": "AccountType",
"type": "string"
},
{
"name": "InteractionGroups",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "InteractionGroup",
"namespace": "com.github.aleksandrachasch.avro",
"fields": [
{
"name": "Id",
"type": "string"
},
{
"name": "date",
"type": "string"
},
{
"name": "interactions",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "Interaction",
"namespace": "com.github.aleksandrachasch.avro",
"fields": [
{
"name": "Id",
"type": "string"
},
{
"name": "type",
"type": {
"type": "enum",
"name": "InteractionType",
"namespace": "com.github.aleksandrachasch.avro",
"symbols": [
"WEBSITE",
"TWITTER",
"CALL",
"INSTAGRAM"
]
}
}
]
}
}
}
]
}
}
}
]
}
}
}
]
}
Теперь из этой схемы можно сгенерировать классы-образцы Scala. Затем мы будем оперировать экземплярами этих классов.
В реальной жизни процесс генерации классов-образцов выглядит примерно так:
Чтение схемы Avro. Например, из
Confluent Schema Registry
— регистра cхем топиков кластера Kafka, к которому можно обращаться через API.Генерация классов-образцов в фазе компиляции
Maven
илиSBT
. Сгенерированные классы при этом копируются прямо вsrc/
проекта. В случае обновления схемы в регистре мы увидим изменения в нашем проекте. Будет понятно уже в фазе компиляции, насколько совместимы эти изменения с существующим кодом.
В этом проекте будем использовать Maven
и avrohugger-maven-plugin
. Плагин использует библиотеку AvroHugger для генерации классов Scala.
В pom.xml
добавляем плагин:
at.makubi.maven.plugin
avrohugger-maven-plugin
1.6
${basedir}/src/main/resources
${basedir}/src/main/scala
STANDARD
generate-sources
generate-scala-sources
В конфигурации необходимо указать путь к файлу с схемой Avro (по умолчанию src/main/avro
), а таже путь к сгенерированным классам (по умолчанию target/generated-sources/avro
):
${basedir}/src/main/resources
${basedir}/src/main/scala
Внимание! сгенерированные файлы перепишут уже существующие классы-образцы в папке src/main/scala.
Также указываем тип записи Avro (SpecificRecord
или GenericRecord
).
По умолчанию плагин генерирует специфический тип записи (SpecificRecord
). В этом случае все классы-образцы будут наследовать абстрактный класс org.apache.avro.specific.SpecificRecordBase
. Помимо самого класса-образца и аргументов создадутся методы чтения и записи новых значений атрибутов. Рекомендуется использовать этот тип записи, если планируется обрабатывать данные специфичными Avro-функциями, использующими Avro Specific API
.
В нашем случае мы сами будем писать функцию обработки, поэтому подойдет GenericRecord
:
STANDARD
Этот тип записи генерирует обычные классы-образцы, без дополнительных методов и абстракций.
Выполняем команду Maven
:
mvn avrohugger:generate-scala-sources
В папке src/main/scala
появятся пять сгенерированных классов-образцов, соответствующих пяти объектам данных.
Сгенерированные классы-образцы
Классы принадлежат пакету com.github.aleksandrachasch.avro
, который указан в поле namespace
в схеме Avro.
Вот как выглядит, например, класс Customer
:
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.github.aleksandrachasch.avro
case class Customer(Id: String, Name: String, Surname: String, Accounts: Seq[Account])
Чтение Avro в Spark Dataset
Теперь создадим датасет Spark, типированный сгенерированным классом Customer
.
В качестве примера создадим клиента с несколькими акканутами и взаимодействиями:
{
"Id": "1",
"Name": "John",
"Surname": "Doe",
"Accounts": [
{
"AccountId": 111,
"AccountType": "private",
"InteractionGroups": [
{
"Id": 222,
"date": "20220102",
"interactions": [
{
"Id": "1",
"type": "WEBSITE"
},
{
"Id": "2",
"type": "INSTAGRAM"
}
]
},
{
"Id": 223,
"date": "20220103",
"interactions": [
{
"Id": "4",
"type": "TWITTER"
}
]
}
]
},
{
"AccountId": 112,
"AccountType": "public",
"InteractionGroups": [
{
"Id": 333,
"date": "20220105",
"interactions": [
{
"Id": "6",
"type": "TWITTER"
},
{
"Id": "7",
"type": "INSTAGRAM"
}
]
}
]
}
]
}
Avro
— это бинарный формат данных. Именно так будет выглядеть сообщение, если его десереализовать, применив соответствующую cхему.
Создадим датасет:
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import com.github.aleksandrachasch.avro.Customer
import spark.implicits._
val ds = spark.read.format("json")
.schema(ScalaReflection.schemaFor[Customer].dataType.asInstanceOf[StructType])
.option("mode", "FAILFAST")
.option("multiline", true)
.load("src/main/resources/customer-data.json")
.as[Customer]
Типом переменной ds
будет Dataset[Customer]
.
Если входные данные не соответствуют классам-образцам, то появится ошибка во время выполнения (runtime). Например, если ввести неcуществующий тип взаимодействия InteractionType
:
"interactions": [
{
"Id": "6",
"type": "TWITTER"
},
{
"Id": "7",
"type": "VK"
}
]
Caused by: java.util.NoSuchElementException: No value found for 'VK'
at scala.Enumeration.withName$2(Enumeration.scala:125)
at scala.Enumeration.withName(Enumeration.scala:125)
at com.github.aleksandrachasch.avro.InteractionType.withName(InteractionType.scala)
Создание функции обработки класса-образца с помощью шаблона программирования type class
Допустим, нам необходимо привести информацию о взаимодействиях клиента к плоской структуре:
Модель данных FlattenedCustomer
Создадим класс-образец, соответствующий этой структуре:
case class FlattenedCustomer(CustomerId: String, Name: String, Surname: String,
AccountId: Int, AccountType: String,
InteractionGroupId: String, InteractionGroupDate: String,
InteractionId: String, InteractionType : String)
Задача сводится к тому, чтобы преобразовать Dataset[Customer]
в Dataset[FlattenedCustomer]
. Тогда соответствующая функция будет брать в качестве аргумента объект Customer
и возвращать список элементов FlattenedCustomer
— по одному элементу на каждое взаимодействие:
def customerFlatten(c : Customer): Seq[FlattenedCustomer] = ???
Возникает вопрос, где эту функцию реализовать. Можно было бы добавить ее в качестве метода к классу Customer
:
case class Customer(Id: String, Name: String, Surname: String, Accounts: Seq[Account]) {
def customerFlatten: Seq[FlattenedCustomer] = ???
}
Но, в связи с тем, что классы-образцы генерируются автоматически, каждая новая генерация удалит созданный вручную метод. Вообще, не рекомендуется изменять классы и файлы, созданные автоматически.
С другой стороны, можно реализовать функцию в отдельном объекте и импортировать ее при обработке датасета.
Однако, есть более лаконичный и функциональный способ — типированный класс (type class).
Type class — это шаблон функционального программирования, который позволяет создавать «ad hoc» полиморфизм на объектах, реализацию которых модифицировать нельзя. Например, если вы имеете дело со сторонней библиотекой, которую необходимо расширить вашим собственным функционалом.
В нашем случае «сторонняя библиотека» — это автоматически сгенерированные классы-образцы, которые мы не можем изменять.
Шаблон type class обеспечит четкое раделение кода, зависящего от внешних источников и программ (схемы Avro), и собственного ключевого функционала нашей программы.
К тому же шаблон позволяет легко добавлять методы обработки других объектов.
Шаблон type class состоит из нескольких элементов:
Трейт с методом
customerFlatten
и параметрами входа (T
) и выхода (C
):trait DataProcessor[T, C] { def customerFlatten(t: T): Seq[C] }
Объект-компаньон со вспомогательными функциями:
object DataProcessor { def apply[T, C](implicit processor: DataProcessor[T, C]): DataProcessor[T, C] = processor implicit class DataProcessorOps[T, C](val t : T) extends AnyVal { def customerFlatten(implicit processor: DataProcessor[T, C]): Seq[C] = { processor.customerFlatten(t) } } }
Функция
apply
, в зависимости от типовT
иС
, возвращает соответствующий экземплярDataProcessor
. Это происходит за счет неявного (implicit
) параметраprocessor
. Например, при вызове функцииapply[Customer, FlattenedCustomer]
компилятор Scala автоматически найдет и вернет экземпляр классаDataProcessor[Customer, FlattenedCustomer]
.За счёт того же механизма неявных параметров неявный класс
DataProcessorOps
позволяeт вызывать функциюcustomerFlatten
на экземплярах классаT
, как если бы он был собственным методом классаT
(например,t.customerFlatten
).Наконец, реализация
DataProcessor
для конкретных объектовCustomer
иFlattenedCustomer
:package com.github.aleksandrachasch.avro.ops object CustomerDataProcessor { implicit val customerDataProcessor: DataProcessor[Customer, FlattenedCustomer] = (t: Customer) => t.Accounts.flatMap(acc => acc.InteractionGroups.flatMap( group => group.interactions.map( interaction => FlattenedCustomer.apply(t.Id, t.Name, t.Surname, acc.AccountId, acc.AccountType, group.Id, group.date, interaction.Id, interaction.`type`.toString) ) ) ) }
Реализованная функция автоматически соотносится с функцией
customerFlatten,
т.к. типы параметра (Customer
) и выхода (Seq[FlattenedCustomer]
) совпадают.Как можно заметить, в самой функции оперируем атрибутами классов-образцов. В датафрейме это были бы названия столбцов и вложенных полей. В отличие от датафрейма, здесь ошибки будут видны уже в ходе компиляции.
Использование функции на датасете получается кратким и лаконичным:
import com.github.aleksandrachasch.avro.ops.CustomerDataProcessor._
val flattenedDs = ds.flatMap(_.customerFlatten)
flattenedDs.show(false)
flattenedDs
Как можно заметить, в коде нет декларации новых экземпляров классов (new
). Механизм неявных параметров и классов (implicit
) позволяет минимизировать количество экземпляров в JVM, а также различных переменных в коде. Нет необходимости собственноручно инициализировать классы — за нас это делает Scala.
Для обработки другого объекта достаточно реализовать DataProcessor[T, C]
с соответствующими типами входа T
и выхода C
.
Надеюсь, этот пост был вам полезен.
Кстати, уже в конце февраля в OTUS стартует новый поток курса DataOps. В преддверии старта курса приглашаю всех желающих на бесплатный демоурок. Регистрация доступна по ссылке.
До встречи!