Как обрабатывать объекты Avro с помощью датасетов Spark 3.2 & Scala 2.12

image-loader.svg

Привет!

В этом посте разберем, как обрабатывать объекты JVM, сгенерированные из схем Avro, в датасетах Spark. Вместе с этим рассмотрим, как организовать код при помощи шаблона функционального программирования «типированный класс» (type class) на языке Scala.

Датафрейм в Spark — абстракция таблицы со столбцами и строками, покрывает большинство кейсов обработки данных. Тем не менее, бывает так, что данные имеют настолько специфичную структуру, что привести их к общему знаменателю в датафрейме достаточно сложно. В этом случае можно прибегнуть к программному интерфейсу Spark Dataset. Он позволяет манипулировать не абстрактными строками и столбцами, а конкретными объектами виртуальной машины Java.

Типичный кейс использования датасетов — чтение сообщений Kafka в формате Avro.

Процесс состоит из 4 этапов:

  1. Генерация классов-образцов (case class) исходя из схемы Avro-сообщения.

  2. Чтение Avro-сообщений в Dataset Spark, типированный классом-образцом.

  3. Реализация операции обработки сообщения. В частности, напишем функцию, которая приведет сообщение к плоской структуре.

С кодом можно ознакомиться здесь.

Генерация классов-образцов Scala

Представим, что нам необходимо обрабатывать данные клиента со следующей структурой:

Модель данных клиентаМодель данных клиента

Клиент (Customer) может иметь несколько аккаунтов. Аккаунт, в свою очередь, имеет несколько групп взаимодействия (InteractionGroups). Группа взаимодействия содержит в себе взаимодействия, совершенные клиентом в определенную дату. Взаимодействие имеет один из типов, обозначающий канал взаимодействия: сайт, твиттер, инстаграм или звонок.

Отступление

Наверняка, у вас возникли претензии к этой модели :) Это справедливо. Данные специально структурированы «криво», чтобы смоделировать реальный кейс. Зачастую необходимо обрабатывать данные из устаревших систем, структуру которых мы изменить не можем. Приходится работать с тем, что есть.

Схема сообщения Avro, соответствующая данной модели, будет выглядеть так:

{
  "type": "record",
  "name": "Customer",
  "namespace": "com.github.aleksandrachasch.avro",
  "fields": [
    {
      "name": "Id",
      "type": "string"
    },
    {
      "name": "Name",
      "type": "string"
    },
    {
      "name": "Surname",
      "type": "string"
    },
    {
      "name": "Accounts",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Account",
          "namespace": "com.github.aleksandrachasch.avro",
          "fields": [
            {
              "name": "AccountId",
              "type": "int"
            },
            {
              "name": "AccountType",
              "type": "string"
            },
            {
              "name": "InteractionGroups",
              "type": {
                "type": "array",
                "items": {
                  "type": "record",
                  "name": "InteractionGroup",
                  "namespace": "com.github.aleksandrachasch.avro",
                  "fields": [
                    {
                      "name": "Id",
                      "type": "string"
                    },
                    {
                      "name": "date",
                      "type": "string"
                    },
                    {
                      "name": "interactions",
                      "type": {
                        "type": "array",
                        "items": {
                          "type": "record",
                          "name": "Interaction",
                          "namespace": "com.github.aleksandrachasch.avro",
                          "fields": [
                            {
                              "name": "Id",
                              "type": "string"
                            },
                            {
                              "name": "type",
                              "type": {
                                "type": "enum",
                                "name": "InteractionType",
                                "namespace": "com.github.aleksandrachasch.avro",
                                "symbols": [
                                  "WEBSITE",
                                  "TWITTER",
                                  "CALL",
                                  "INSTAGRAM"
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            }
          ]
        }
      }
    }
  ]
}

Теперь из этой схемы можно сгенерировать классы-образцы Scala. Затем мы будем оперировать экземплярами этих классов.

В реальной жизни процесс генерации классов-образцов выглядит примерно так:

  1. Чтение схемы Avro. Например, из Confluent Schema Registry— регистра cхем топиков кластера Kafka, к которому можно обращаться через API.

  2. Генерация классов-образцов в фазе компиляции Maven или SBT . Сгенерированные классы при этом копируются прямо в src/ проекта. В случае обновления схемы в регистре мы увидим изменения в нашем проекте. Будет понятно уже в фазе компиляции, насколько совместимы эти изменения с существующим кодом.

В этом проекте будем использовать Maven и avrohugger-maven-plugin. Плагин использует библиотеку AvroHugger для генерации классов Scala.

В pom.xml добавляем плагин:

  
  at.makubi.maven.plugin  
  avrohugger-maven-plugin  
  1.6  
      
    ${basedir}/src/main/resources    
    ${basedir}/src/main/scala       
    STANDARD
    
      
          
      generate-sources      
              
        generate-scala-sources      
          
      
  

В конфигурации необходимо указать путь к файлу с схемой Avro (по умолчанию src/main/avro), а таже путь к сгенерированным классам (по умолчанию target/generated-sources/avro):

${basedir}/src/main/resources    
${basedir}/src/main/scala  

Внимание! сгенерированные файлы перепишут уже существующие классы-образцы в папке src/main/scala.

Также указываем тип записи Avro (SpecificRecord или GenericRecord).

По умолчанию плагин генерирует специфический тип записи (SpecificRecord). В этом случае все классы-образцы будут наследовать абстрактный класс org.apache.avro.specific.SpecificRecordBase. Помимо самого класса-образца и аргументов создадутся методы чтения и записи новых значений атрибутов. Рекомендуется использовать этот тип записи, если планируется обрабатывать данные специфичными Avro-функциями, использующими Avro Specific API.

В нашем случае мы сами будем писать функцию обработки, поэтому подойдет GenericRecord:

STANDARD

Этот тип записи генерирует обычные классы-образцы, без дополнительных методов и абстракций.

Выполняем команду Maven:

mvn avrohugger:generate-scala-sources

В папке src/main/scala появятся пять сгенерированных классов-образцов, соответствующих пяти объектам данных.

Сгенерированные классы-образцыСгенерированные классы-образцы

Классы принадлежат пакету com.github.aleksandrachasch.avro, который указан в поле namespace в схеме Avro.

Вот как выглядит, например, класс Customer:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.github.aleksandrachasch.avro

case class Customer(Id: String, Name: String, Surname: String, Accounts: Seq[Account])

Чтение Avro в Spark Dataset

Теперь создадим датасет Spark, типированный сгенерированным классом Customer.

В качестве примера создадим клиента с несколькими акканутами и взаимодействиями:

{
  "Id": "1",
  "Name": "John",
  "Surname": "Doe",
  "Accounts": [
    {
      "AccountId": 111,
      "AccountType": "private",
      "InteractionGroups": [
        {
          "Id": 222,
          "date": "20220102",
          "interactions": [
            {
              "Id": "1",
              "type": "WEBSITE"
            },
            {
              "Id": "2",
              "type": "INSTAGRAM"
            }
          ]
        },
        {
          "Id": 223,
          "date": "20220103",
          "interactions": [
            {
              "Id": "4",
              "type": "TWITTER"
            }
          ]
        }
      ]
    },
    {
      "AccountId": 112,
      "AccountType": "public",
      "InteractionGroups": [
        {
          "Id": 333,
          "date": "20220105",
          "interactions": [
            {
              "Id": "6",
              "type": "TWITTER"
            },
            {
              "Id": "7",
              "type": "INSTAGRAM"
            }
          ]
        }
      ]
    }
  ]
}

Avro — это бинарный формат данных. Именно так будет выглядеть сообщение, если его десереализовать, применив соответствующую cхему.

Создадим датасет:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

import com.github.aleksandrachasch.avro.Customer

import spark.implicits._

val ds = spark.read.format("json")
	.schema(ScalaReflection.schemaFor[Customer].dataType.asInstanceOf[StructType])
	.option("mode", "FAILFAST")
	.option("multiline", true)
	.load("src/main/resources/customer-data.json")
	.as[Customer]

Типом переменной ds будет Dataset[Customer].

Если входные данные не соответствуют классам-образцам, то появится ошибка во время выполнения (runtime). Например, если ввести неcуществующий тип взаимодействия InteractionType:

"interactions": [
  {
    "Id": "6",
    "type": "TWITTER"
  },
  {
    "Id": "7",
    "type": "VK"
  }
]
Caused by: java.util.NoSuchElementException: No value found for 'VK'
	at scala.Enumeration.withName$2(Enumeration.scala:125)
	at scala.Enumeration.withName(Enumeration.scala:125)
	at com.github.aleksandrachasch.avro.InteractionType.withName(InteractionType.scala)

Создание функции обработки класса-образца с помощью шаблона программирования type class

Допустим, нам необходимо привести информацию о взаимодействиях клиента к плоской структуре:

Модель данных FlattenedCustomerМодель данных FlattenedCustomer

Создадим класс-образец, соответствующий этой структуре:

case class FlattenedCustomer(CustomerId: String, Name: String, Surname: String,
                             AccountId: Int, AccountType: String,
                             InteractionGroupId: String, InteractionGroupDate: String,
                             InteractionId: String, InteractionType : String)

Задача сводится к тому, чтобы преобразовать Dataset[Customer] в Dataset[FlattenedCustomer]. Тогда соответствующая функция будет брать в качестве аргумента объект Customer и возвращать список элементов FlattenedCustomer— по одному элементу на каждое взаимодействие:

def customerFlatten(c : Customer): Seq[FlattenedCustomer] = ???

Возникает вопрос, где эту функцию реализовать. Можно было бы добавить ее в качестве метода к классу Customer:

case class Customer(Id: String, Name: String, Surname: String, Accounts: Seq[Account]) {
	def customerFlatten: Seq[FlattenedCustomer] = ???
}

Но, в связи с тем, что классы-образцы генерируются автоматически, каждая новая генерация удалит созданный вручную метод. Вообще, не рекомендуется изменять классы и файлы, созданные автоматически.

С другой стороны, можно реализовать функцию в отдельном объекте и импортировать ее при обработке датасета.

Однако, есть более лаконичный и функциональный способ — типированный класс (type class).

Type class — это шаблон функционального программирования, который позволяет создавать «ad hoc» полиморфизм на объектах, реализацию которых модифицировать нельзя. Например, если вы имеете дело со сторонней библиотекой, которую необходимо расширить вашим собственным функционалом.

В нашем случае «сторонняя библиотека» — это автоматически сгенерированные классы-образцы, которые мы не можем изменять.

Шаблон type class обеспечит четкое раделение кода, зависящего от внешних источников и программ (схемы Avro), и собственного ключевого функционала нашей программы.

К тому же шаблон позволяет легко добавлять методы обработки других объектов.

Шаблон type class состоит из нескольких элементов:

  1. Трейт с методом customerFlatten и параметрами входа (T) и выхода (C):

    trait DataProcessor[T, C] {
    	def customerFlatten(t: T): Seq[C]
    }
  1. Объект-компаньон со вспомогательными функциями:

    object DataProcessor {
      def apply[T, C](implicit processor: DataProcessor[T, C]): DataProcessor[T, C] =
        processor
      implicit class DataProcessorOps[T, C](val t : T) extends AnyVal {
        def customerFlatten(implicit processor: DataProcessor[T, C]): Seq[C] = {
          processor.customerFlatten(t)
        }
      }
    }

    Функция apply, в зависимости от типов T и С, возвращает соответствующий экземпляр DataProcessor. Это происходит за счет неявного (implicit) параметра processor. Например, при вызове функции apply[Customer, FlattenedCustomer] компилятор Scala автоматически найдет и вернет экземпляр класса DataProcessor[Customer, FlattenedCustomer].

    За счёт того же механизма неявных параметров неявный класс DataProcessorOps позволяeт вызывать функцию customerFlatten на экземплярах класса T, как если бы он был собственным методом класса T (например, t.customerFlatten).

  2. Наконец, реализация DataProcessor для конкретных объектов Customer и FlattenedCustomer:

    package com.github.aleksandrachasch.avro.ops
    
    object CustomerDataProcessor {
    
      implicit val customerDataProcessor: DataProcessor[Customer, FlattenedCustomer] =
        (t: Customer) => t.Accounts.flatMap(acc => acc.InteractionGroups.flatMap(
          group => group.interactions.map(
            interaction => FlattenedCustomer.apply(t.Id, t.Name, t.Surname, acc.AccountId, acc.AccountType,
              group.Id, group.date, interaction.Id, interaction.`type`.toString)
          )
        )
        )
    }

    Реализованная функция автоматически соотносится с функцией customerFlatten, т.к. типы параметра (Customer) и выхода (Seq[FlattenedCustomer]) совпадают.

    Как можно заметить, в самой функции оперируем атрибутами классов-образцов. В датафрейме это были бы названия столбцов и вложенных полей. В отличие от датафрейма, здесь ошибки будут видны уже в ходе компиляции.

Использование функции на датасете получается кратким и лаконичным:

import com.github.aleksandrachasch.avro.ops.CustomerDataProcessor._

val flattenedDs = ds.flatMap(_.customerFlatten)
flattenedDs.show(false)

flattenedDsflattenedDs

Как можно заметить, в коде нет декларации новых экземпляров классов (new). Механизм неявных параметров и классов (implicit) позволяет минимизировать количество экземпляров в JVM, а также различных переменных в коде. Нет необходимости собственноручно инициализировать классы — за нас это делает Scala.

Для обработки другого объекта достаточно реализовать DataProcessor[T, C] с соответствующими типами входа T и выхода C.

Надеюсь, этот пост был вам полезен.

Кстати, уже в конце февраля в OTUS стартует новый поток курса DataOps. В преддверии старта курса приглашаю всех желающих на бесплатный демоурок. Регистрация доступна по ссылке.

До встречи!

© Habrahabr.ru