[Перевод] Генерация конвейеров обработки данных в Dataflow

muiiwo2vpdxfh2d-vjay2wsixlq.png


Эта статья посвящена всем практикующим специалистам по данным, заинтересованным в освоении запуска, стандартизации и автоматизации пакетных конвейеров данных в Netflix.

О Dataflow мы писали в статье под названием Data pipeline asset management with Dataflow. Та статья представляла подробное знакомство с одним из наиболее технических аспектов Dataflow, но сам этот инструмент толком не описывала. На сей раз мы оправдаем заявленное вступление, после чего сосредоточимся на одной из основных возможностей Dataflow — образцах рабочих потоков. Для начала же мы коротко разберём Dataflow в общем.

Dataflow


Dataflow — это утилита командной строки, предназначенная для оптимизации рабочего процесса и упрощения разработки конвейеров данных в Netflix. Взгляните на этот высокоуровневый вывод команды help:

$ dataflow --help
Usage: dataflow [OPTIONS] COMMAND [ARGS]...

Options:
  --docker-image TEXT  Url of the docker image to run in.
  --run-in-docker      Run dataflow in a docker container.
  -v, --verbose        Enables verbose mode.
  --version            Show the version and exit.
  --help               Show this message and exit.

Commands:
  migration  Manage schema migration.
  mock       Generate or validate mock datasets.
  project    Manage a Dataflow project.
  sample     Generate fully functional sample workflows.


Как видите, интерфейс командной строки Dataflow разделён на четыре основных области (или команды). Наиболее используемая из них — это project, которая помогает разработчикам управлять репозиториями конвейеров данных посредством создания, тестирования, развёртывания и некоторых других действий.

Команда migration является особой функцией, разработанной Стивеном Хеннеке для полноценной автоматизации взаимодействия, а также отслеживания изменений в таблицах хранилищ данных. Благодаря внутренней системе data lineage (линия преемственности данных), разработанной Гиришом Лингаппой, миграция помогает определить нижестоящие точки использования интересующей таблицы. Помимо этого, она позволяет создавать сообщение для всех владельцев этих зависимостей. После запуска миграции Dataflow будет также отслеживать её прогресс и поможет вам взаимодействовать с нижестоящими пользователями.

Команда mock является ещё одной самостоятельной функцией, позволяющей создавать мок-файлы данных в формате YAML для выбранных таблиц, столбцов и строк данных из хранилища Netflix. Основная цель этой команды — упростить модульное тестирование конвейеров данных, но технически её также можно использоваться в любых других ситуациях для получения читаемого формата небольших наборов данных.

Все перечисленные команды мы, скорее всего, опишем в следующих статьях, а в этой сосредоточимся именно на команде sample.

Образцы рабочих потоков


Образцы рабочих потоков — это набор шаблонов, которые можно использовать для создания собственного конвейера данных. Придуманы они были в основном ради предоставления возможности опробовать продакшен-код ETL (извлечения, преобразования и загрузки), который встречается в экосистеме данных Netflix.

Весь код, который вы получаете в подобных образцах рабочих потоков, полностью функционален, подстроен под вашу среду и изолирован от рабочих потоков, сгенерированных другими разработчиками. Этот конвейер можно безопасно запускать, как только он окажется у вас в каталоге. При этом он не только создаст красивую сводную таблицу и заполнит её реальными данными, но также представит вам полный набор рекомендуемых компонентов:

  • чистый DDL-код;
  • подходящие установки метаданных таблицы;
  • задачу преобразования (на предпочтительном языке), обёрнутую в выбранный паттерн WAP (Write-Audit-Publish);
  • пробный набор результатов аудита сгенерированных данных;
  • полностью функциональные модульные тесты для логики преобразования.


Не менее важно и то, что эти пробные рабочие потоки непрерывно тестируются как часть протокола изменения кода Dataflow, так что в их работоспособности можно быть уверенным. Это один из способов укрепления доверия со стороны внутренней базы пользователей.
Далее мы разберём бизнес-логику этих образцов потоков.

▍ Бизнес-логика


В Dataflow есть несколько вариантов образцов рабочих потоков, но бизнес-логика используется в них одна. Это было сознательное решение с целью открыто показать отличия между разными языками, на которых может быть написан ETL. Понятно, что инструменты создаются под разные случаи использования, поэтому мы планируем дополнительно добавить образцы кода для других задач обработки данных (помимо классической пакетной ETL), например, для построения и оценки моделей машинного обучения.

Бизнес-логика, используемая в нашем шаблоне, вычисляет топ-100 фильмов/телешоу в каждой стране, где Netflix работает на постоянной основе. Это не фактический продакшен-конвейер, работающий в Netflix, поскольку данный код сильно упрощён, но при этом он хорошо демонстрирует пакетную ETL-задачу с различными стадиями преобразования. Разберём эти стадии.

Шаг 1: время просмотра всех фильмов и телешоу в стране инкрементально суммируется на ежедневной основе:

WITH STEP_1 AS (
   SELECT
       title_id
       , country_code
       , SUM(view_hours) AS view_hours
   FROM some_db.source_table
   WHERE playback_date = CURRENT_DATE
   GROUP BY
       title_id
       , country_code
)


Шаг 2: все тайтлы каждой страны ранжируются в порядке убывания от наиболее до наименее просматриваемых:

WITH STEP_2 AS (
   SELECT
       title_id
       , country_code
       , view_hours
       , RANK() OVER (
          PARTITION BY country_code
          ORDER BY view_hours DESC
       ) AS title_rank
   FROM STEP_1
)


Шаг 1: все тайтлы отфильтровываются до топ-100:

WITH STEP_3 AS (
   SELECT
       title_id
       , country_code
       , view_hours
       , title_rank
   FROM STEP_2
   WHERE title_rank <= 100
)


Теперь с помощью описанного выше трехэтапного преобразования мы создадим данные, которые можно будет записать в следующую таблицу Iceberg:

CREATE TABLE IF NOT EXISTS ${TARGET_DB}.dataflow_sample_results (
  title_id INT COMMENT "Title ID of the movie or show."
  , country_code STRING COMMENT "Country code of the playback session."
  , title_rank INT COMMENT "Rank of a given title in a given country."
  , view_hours DOUBLE COMMENT "Total viewing hours of a given title in a given country."
)
COMMENT
  "Example dataset brought to you by Dataflow. For more information on this
   and other examples please visit the Dataflow documentation page."
PARTITIONED BY (
  date DATE COMMENT "Playback date."
)
STORED AS ICEBERG;


Из структуры таблицы видно, что мы ежедневно собираемся загружать в неё около 19,000 строк, которые будут выглядеть так:

 sql> SELECT * FROM foo.dataflow_sample_results
      WHERE date = 20220101 and country_code = 'US'
      ORDER BY title_rank LIMIT 5;

 title_id | country_code | title_rank | view_hours | date
----------+--------------+------------+------------+----------
 11111111 | US           |          1 |   123      | 20220101
 44444444 | US           |          2 |   111      | 20220101
 33333333 | US           |          3 |   98       | 20220101
 55555555 | US           |          4 |   55       | 20220101
 22222222 | US           |          5 |   11       | 20220101
(5 rows)


Разобравшись с бизнес-логикой, можно переходить к разбору компонентов, или шаблонного-кода, наших пробных рабочих потоков.

▍ Компоненты


Рассмотрим наиболее распространённые компоненты рабочих потоков, которые мы используем в Netflix. Они вписываются не во все случаи ETL, но используются достаточно часто для включения в каждый образец рабочего потока. Всё же именно автор потока решает, хочет ли он использовать все эти шаблоны или же оставить лишь некоторые. В любом случае они есть в готовом виде, и их можно при необходимости задействовать.

▍ Определения рабочих потоков


Ниже вы видите типичную файловую структуру пакета пробных рабочих потоков, написанную на SparkSQL:

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py


Три верхних файла определяют серию шагов (то есть задач), их темп и последовательность, в которой они должны выполняться.

Это один из способов, которым можно связывать компоненты в единый рабочий поток. В каждом пакете пробных потоков присутствует три файла определения, работающих совместно для обеспечения гибкой функциональности. Код образца рабочего потока предполагает паттерн ежедневного использования, но его также легко подстроить для выполнения с иной периодичностью. Оркестровку рабочих потоков мы реализуем с помощью собственного планировщика Netflix под названием Maestro.

Файл определения main содержит логику одного выполнения, которая в нашем случае описывает сбор данных за день. Состоит эта логика из DDL-кода, метаданных таблицы, шага преобразования и некоторых шагов аудита. Построена она для выполнения в одну дату и должна вызываться из рабочих потоков daily или backfill. Поток main также можно вызывать вручную во время разработки с произвольными параметрами среды выполнения, чтобы пронаблюдать его в действии.

Поток daily выполняет main на ежедневной основе в течение установленного количества прошедших дней. Иногда это необходимо для восполнения данных, поступающих с задержкой. В этом случае мы определяем расписание срабатывания, схемы уведомлений и обновляем «последние» (high-water mark) временные метки в целевой таблице.

Рабочий поток backfill выполняет main в течение заданного диапазона дней. Это пригождается для повторного утверждения данных, чаще всего ввиду изменения логики преобразования, но иногда и в качестве ответа на вышестоящие изменения данных.

▍ DDL


Зачастую первым шагом в конвейере данных идёт определение структуры целевой таблицы и метаданных столбцов через утверждение DDL. Мы понимаем, что некоторые специалисты предпочитают, чтобы их итоговая схема являлась неявным результатом самого кода преобразования, но явное утверждение выходной схемы не только пригождается для добавления комментариев уровня таблицы (и столбца), но также служит для оценки логики преобразования.

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py


Как правило, мы предпочитаем выполнять команды DDL в составе самого рабочего потока, а не вне расписания, поскольку это упрощает процесс разработки. Ниже показан пример привязывания SQL-файла создания таблицы к определению рабочего потока main.

      - job:
          id: ddl
          type: Spark
          spark:
              script: $S3{./ddl/dataflow_sparksql_sample.sql}
              parameters:
                  TARGET_DB: ${TARGET_DB}


▍ Метаданные


Этап определения метаданных обеспечивает контекст для самой выходной таблицы, а также её данных. Атрибуты устанавливаются через Metacat — внутреннюю платформу управления метаданными в Netflix. Ниже приведён пример внедрения этого этапа в определение рабочего потока main:

 - job:
     id: metadata
     type: Metadata
     metacat:
         tables:
           - ${CATALOG}/${TARGET_DB}/${TARGET_TABLE}
         owner: ${username}
         tags:
           - dataflow
           - sample
         lifetime: 123
         column_types:
           date: pk
           country_code: pk
           rank: pk


▍ Преобразование


Этап (или этапы) преобразования разработчик может выполнять на предпочитаемом языке. В примере ниже используется SparkSQL.

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py


Как вариант, на этом этапе можно использовать паттерн Write-Audit-Publish, чтобы убедиться в верности данных до того, как они станут доступными остальной компании. Вот пример:

 - template:
     id: wap
     type: wap
     tables:
         - ${CATALOG}/${DATABASE}/${TABLE}
     write_jobs:
       - job:
           id: write
           type: Spark
           spark:
               script: $S3{./src/sparksql_write.sql}


▍ Аудит


Этапы аудита определяются для оценки качества данных. Если «блокирующий» аудит провалится, задача остановится, и этап write не состоится, что исключит представление пользователям ошибочных данных. Это необязательный шаг, который, к тому же можно настроить. Вот частичный пример аудита из рабочего потока main:

         data_auditor:
            audits:
              - function: columns_should_not_have_nulls
                blocking: true
                params:
                    table: ${TARGET_TABLE}
                    columns:
                      - title_id
                      …


▍ Последняя временная метка


Успешная запись обычно сопровождается вызовом метаданных для установки актуального времени (или high-water mark) датасета. Это позволяет другим использующим нашу таблицу процессам получать уведомление и начинать действовать. Вот пример задачи high-water mark из main:

      - job:
         id: hwm
         type: HWM
         metacat:
           table: ${CATALOG}/${TARGET_DB}/${TARGET_TABLE}
           hwm_datetime: ${EXECUTION_DATE}
           hwm_timezone: ${EXECUTION_TIMEZONE}


▍ Модульные тесты


Артефакты модульных тестов также генерируются как часть структуры пробного рабочего потока. Они состоят из мок-данных, фактического кода теста и простой схемы выполнения, зависящей от языка рабочего потока. В примере ниже это файлы dataflow_pyspark_sample.yaml, some_db.source_table.yaml и test_sparksql_write.py.

.
├── backfill.sch.yaml
├── daily.sch.yaml
├── main.sch.yaml
├── ddl
│   └── dataflow_sparksql_sample.sql
└── src
    ├── mocks
    │   ├── dataflow_pyspark_sample.yaml
    │   └── some_db.source_table.yaml
    ├── sparksql_write.sql
    └── test_sparksql_write.py


Эти модульные тесты должны проверять один «модуль» преобразования данных. Их можно выполнять во время разработки для быстрого перехвата опечаток или синтаксических ошибок в коде, а также на стадии автоматизированного тестирования/развёртывания, чтобы убедиться, что изменения кода не приведут к сбою каких-либо тестов.

Модульные тесты должны выполняться быстро, обеспечивая оперативную обратную связь и возможность быстро проводить итерации во время цикла разработки. Выполнение кода для базы данных продакшена может происходить медленно, особенно с учётом нагрузки, необходимой для систем обработки распределённых данных вроде Apache Spark. Имитация данных позволяет выполнять тесты локально на их небольшой выборке для оценки кода преобразования.

Языки


С течением времени задачи по извлечению данных из исходных систем Netflix оказались востребованы широким спектром конечных пользователей, таких как инженеры, аналитики данных, маркетологи и другие. Делая акцент на удобстве, Dataflow позволяет этим разным категориям людей реализовывать свои задачи с лёгкостью. Многие пользователи данных задействуют SparkSQL, PySpark и Scala. Небольшое, но растущее число учёных и аналитиков также используют R в сочетании с интерфейсом Sparklyr или прочими инструментами обработки данных вроде Metaflow.

Осознавая, что ландшафт данных и используемые конечными пользователями технологии не однородны, Dataflow выстраивает адаптивный путь. Эта платформа обеспечивает различные средства, или шаблоны, для извлечения данных, и в этом разделе мы познакомимся с несколькими из них. Первыми мы разберём SparkSQL и подход Python, а закончим случаями применения Scala и R.

Следующая команда поможет пользователю понять, как и с чего начинать после установки Dataflow:

$ dataflow sample workflow --help                                                         
Dataflow (0.6.16)

Usage: dataflow sample workflow [OPTIONS] RECIPE [TARGET_PATH]

Create a sample workflow based on selected RECIPE and land it in the
specified TARGET_PATH.

Currently supported workflow RECIPEs are: spark-sql, pyspark,
scala and sparklyr.

  If TARGET_PATH:
  - if not specified, current directory is assumed
  - points to a directory, it will be used as the target location

Options:
  --source-path TEXT         Source path of the sample workflows.
  --workflow-shortname TEXT  Workflow short name.
  --workflow-id TEXT         Workflow ID.
  --skip-info                Skip the info about the workflow sample.
  --help                     Show this message and exit.


Опять же, мы предположим, что у нас есть каталог strange-data, в котором пользователь создаёт шаблоны рабочих потоков на всех четырёх предлагаемых Dataflow языках. Чтобы лучше проиллюстрировать генерацию образцов потоков при помощи Dataflow, мы рассмотрим полноценную команду, используемую для создания одного из них:

$ cd stranger-data
$ dataflow sample workflow spark-sql ./sparksql-workflow


Повторяя эту команду для каждого типа языка преобразования, можно получить следующую структуру каталогов:

.
├── pyspark-workflow
│   ├── main.sch.yaml
│   ├── daily.sch.yaml
│   ├── backfill.sch.yaml
│   ├── ddl
│   │   └── ...
│   ├── src
│   │   └── ...
│   └── tox.ini
├── scala-workflow
│   ├── build.gradle
│   └── ...
├── sparklyR-workflow
│   └── ...
└── sparksql-workflow
    └── ...


Ранее мы уже разобрали бизнес-логику этих образцов потоков и показали соответствующую SparkSQL-версию преобразования данных. Теперь мы разберём иные подходы к написанию данных на других языках.

▍ PySpark


Приведённый ниже частичный код PySpark имеет ту же функциональность, что и пример SparkSQL выше, но при этом задействует интерфейс датафреймов Python.

def main(args, spark):
   
    source_table_df = spark.table(f"{some_db}.{source_table})

    viewing_by_title_country = (
        source_table_df.select("title_id", "country_code",      
        "view_hours")
        .filter(col("date") == date)
        .filter("title_id IS NOT NULL AND view_hours > 0")
        .groupBy("title_id", "country_code")
        .agg(F.sum("view_hours").alias("view_hours"))
    )

    window = Window.partitionBy(
        "country_code"
    ).orderBy(col("view_hours").desc())

    ranked_viewing_by_title_country = viewing_by_title_country.withColumn(
        "title_rank", rank().over(window)
    )

    ranked_viewing_by_title_country.filter(
        col("title_rank") <= 100
    ).withColumn(
        "date", lit(int(date))
    ).select(
        "title_id",
        "country_code",
        "title_rank",
        "view_hours",
        "date",
    ).repartition(1).write.byName().insertInto(
        target_table, overwrite=True
    )


▍ Scala


Scala является ещё одним поддерживаемым Dataflow способом, предлагающим ту же готовую бизнес-логику для образца потока.

package com.netflix.spark

object ExampleApp {
  import spark.implicits._

  def readSourceTable(sourceDb: String, dataDate: String): DataFrame =
    spark
      .table(s"$someDb.source_table")
      .filter($"playback_start_date" === dataDate)

  def viewingByTitleCountry(sourceTableDF: DataFrame): DataFrame = {
    sourceTableDF
      .select($"title_id", $"country_code", $"view_hours")
      .filter($"title_id".isNotNull)
      .filter($"view_hours" > 0)
      .groupBy($"title_id", $"country_code")
      .agg(F.sum($"view_hours").as("view_hours"))
  }

  def addTitleRank(viewingDF: DataFrame): DataFrame = {
    viewingDF.withColumn(
      "title_rank", F.rank().over(
        Window.partitionBy($"country_code").orderBy($"view_hours".desc)
      )
    )
  }

  def writeViewing(viewingDF: DataFrame, targetTable: String, dataDate: String): Unit = {
    viewingDF
      .select($"title_id", $"country_code", $"title_rank", $"view_hours")
      .filter($"title_rank" <= 100)
      .repartition(1)
      .withColumn("date", F.lit(dataDate.toInt))
      .writeTo(targetTable)
      .overwritePartitions()
  }

def main():
    sourceTableDF = readSourceTable("some_db", "source_table", 20200101)
    viewingDf = viewingByTitleCountry(sourceTableDF)
    titleRankedDf = addTitleRank(viewingDF)
    writeViewing(titleRankedDf)


▍ R / sparklyR


Поскольку в Netflix неуклонно растёт число пользователей R, этот язык также входит в набор инструментов Dataflow:

suppressPackageStartupMessages({
  library(sparklyr)
  library(dplyr)
})

...

main <- function(args, spark) {
  title_df <- tbl(spark, g("{some_db}.{source_table}"))

  title_activity_by_country <- title_df |>
    filter(title_date == date) |>
    filter(!is.null(title_id) & event_count > 0) |>
    select(title_id, country_code, event_type) |>
    group_by(title_id, country_code) |>
    summarize(event_count = sum(event_type, na.rm = TRUE))

  ranked_title_activity_by_country <- title_activity_by_country  |>
    group_by(country_code) |>
    mutate(title_rank = rank(desc(event_count)))

  top_25_title_by_country <- ranked_title_activity_by_country |>
    ungroup() |>
    filter(title_rank <= 25) |>
    mutate(date = as.integer(date)) |>
    select(
      title_id,
      country_code,
      title_rank,
      event_count,
      date
    )

    top_25_title_by_country |>
      sdf_repartition(partitions = 1) |>
      spark_insert_table(target_table, mode = "overwrite")
}
  main(args = args, spark = spark)
}


Заключение


Как видите, мы в Netflix стремимся облегчить жизнь дата-инженеров, предлагая уже протоптанные пути и рекомендации по структуризации кода и стараясь сохранять достаточно широкий выбор вариантов под разные задачи.

Использование грамотного набора предустановок при создании конвейера данных в Netflix упрощает онбординг, а также обеспечивает лучшие практики стандартизации и централизации. Немного скажем о них.

▍ Онбординг


Налаживание работы новой команды или вертикали бизнеса всегда требует определённых усилий, особенно в рамках культуры «строгой согласованности и слабой связанности». Наличие хорошо задокументированной отправной точки — избавляет от ряда сложностей, возникающих при начале с нуля, и значительно ускоряет первую итерацию цикла разработки.

▍ Стандартизация


Стандартизация упрощает жизнь новым участникам команды, а также тем, кто уже знаком с областью и стеком используемых технологий.

Некоторая передача работы между людьми и командами неизбежна. В этом случае наличие схемы и шаблонов стандартизации устраняет возникающие в ходе такого обмена трения. Кроме того, с код-ревью и рекомендациями проще работать, отталкиваясь от общей основы.
Стандартизация также делает схему проекта более интуитивной и минимизирует риск человеческих ошибок по мере роста базы кода.

Централизация лучших практик


Инфраструктура данных непрерывно развивается. Удобство доступа к централизованному набору продуманных предустановок является критически важным моментом, обеспечивая развитие лучших практик параллельно с технологиями и предоставляя пользователям информацию о последних новшествах в технологическом стеке.

Что ещё лучше, Dataflow предлагает лучшие практики, которые представляют все эти концепции в контексте фактических случаев использования. Вместо чтения документации — вы можете инициализировать «реальный» проект, по необходимости его изменить и использовать в качестве отправной точки.

ymoc6_v0doy8yrm1y4xsrjlxotc.jpeg

© Habrahabr.ru