Как мы переехали с Greenplum на Delta Table

У нас было 2 кластера Clickhouse, 1 кластер GreenPlum«a, 1 кластер Kubernetes«a, DataVault 2.0, гора dbt витрин и всего такого, а еще Dagster. Не то чтобы это все было нужно в архитектуре, но раз начал коллекционировать сервисы, то иди в своем увлечении до конца. Единственное, что нас беспокоило — это стоимость. 

Решение о полной переделке архитектуры пришло внезапно, как галлюцинация в ночной пустыне — мы поняли, что больше нельзя ждать. Теперь наши данные обрабатываются быстрее, чем мысли в голове на ЛСД, и мы можем персонализировать клиентский опыт так, что он становится почти реальным. 

Всем привет! На связи Артем, и если вы готовы погрузиться в мир цифрового безумия и увидеть, как мы превратили хаос в порядок, то держитесь крепче и читайте дальше.

Когда-то мы уже писали о том, как построить DataVault на Greenplum, со статьей можно ознакомиться здесь. Кратко расскажем о причинах, подтолкнувших нас пересмотреть подход — это оптимизация расходов и отсутствие экспертов в администрировании GreenPlum.

В первом случае нам бы хотелось платить только за время фактического использования ресурсов, а во втором, несмотря на то, что наш кластер был управляемым, мы регулярно сталкивались с проблемами поддержки и масштабирования системы, что также приводило к непредсказуемым затратам.

Выбор нового решения

При выборе нового решения мы учитывали несколько ограничений:

  • Dagster: для управления задачами

  • Kubernetes: для оркестрации контейнеров

  • Yandex Cloud: как основная облачная платформа

Основные компоненты нового решения:

  • Yandex Object Storage: для хранения

  • Delta Table: для управления

  • Apache Spark: для обработки

Почему именно эти технологии?

Переход с архитектуры Data Vault 2.0 на архитектуру Delta Lake (S3 + Delta Table) может предложить ряд существенных преимуществ:

  • Транзакционная целостность и ACID-свойства — Delta Lake обеспечивает надежность и целостность данных при параллельных операциях

  • Производительность и оптимизация запросов — структура Data Vault 2.0 может приводить к сложным и менее оптимизированным запросам, требующим значительных усилий для оптимизации

  • Гибкость и масштабируемость — Data Vault 2.0 хорошо подходит для интеграции из различных источников, но может потребовать значительных ресурсов для масштабирования под высокие нагрузки, тогда как Delta Lake спроектирован для работы с большими объемами данных в облачной среде и легко масштабируется в зависимости от нагрузки без серьезных затрат

  • Упрощенное управление схемой данных — delta table поддерживают автоматическую эволюцию схемы и принудительное соблюдение схемы, что упрощает внесение изменений в структуру данных и управление ими

  • Экосистема и интеграция с другими инструментами — delta table легко интегрируется с apache spark, что упрощает разработку и эксплуатацию аналитических приложений

Apache Spark

Одним из преимуществ Apache Spark является возможность его локального развертывания, что особенно удобно для разработки и тестирования. На текущий момент обработка наших данных происходит именно таким образом.

Как это работает?

В самом простом виде нашу архитектуру можно представить следующим образом:

Архитектура данных

Архитектура данных

Данные хранятся в Yandex Object Storage в формате Delta Table. Это позволяет нам воспользоваться преимуществами версионирования данных и транзакционной целостности. Помимо этого, мы используем S3 для хранения артефактов ML моделей.

В работе с форматом Delta Table правильное распределение данных по партициям играет ключевую роль. Учитывая разнообразие наших источников данных, мы внедрили партиционирование не только по дате, но и по источнику. Это значительно повышает скорость и эффективность чтения данных. При грамотном выборе партиции Spark применяет partition pruning, избегая необходимости читать все файлы в таблице, что оптимизирует производительность системы.

Для нашего MVP мы выбрали использование кластера Spark на одном узле, настроив SparkSession следующим образом:

SparkSession.builder.appName().master(f"local[*]")

Полная конфигурация Spark«a для работы с S3 может выглядеть следующим образом:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

packages = [
    "io.delta:delta-core_2.12:2.0.0",
    "org.apache.hadoop:hadoop-aws:3.3.1",
]

builder = (SparkSession.builder.appName().master(f"local[*]")
    .config("spark.driver.memory", "16g")
    .config("spark.hadoop.fs.s3a.bucket..access.key", "")
    .config("spark.hadoop.fs.s3a.bucket..secret.key", "")
    .config("spark.hadoop.fs.s3a.bucket..endpoint", "")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.sql.ui.explainMode", "extended")
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.driver.extraJavaOptions", _spark_with_newer_jvm_compatibility_options)
)

spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Такой подход позволяет запускать Spark локально на той же машине, где создается сессия. Благодаря этому, мы можем запускать Spark непосредственно в подах Dagster и удобно отлаживать запросы локально.

В работе с Dagster мы используем ассеты и менеджеры. Вся бизнес-логика размещается внутри ассета, а управление вводом-выводом передается менеджеру. Таким образом, нам было достаточно написать Spark менеджер и адаптировать запросы в коде для работы со Spark. 

Для экономии ресурсов во время простоя мы внедрили автоскейл группы прерываемых узлов в Managed Service for Kubernetes от Яндекс Облака. Развернули с помощью Terraform, используя готовый модуль. Мы настроили группы так, чтобы они могли масштабироваться до нуля узлов при отсутствии нагрузки. Для подов Dagster у нас выделена отдельная группа узлов, где всегда работает хотя бы один узел. Таким образом, система практически не потребляет ресурсы в периоды простоя.

При необходимости также можно запросить чуть больше ресурсов для конкретных задач, настроив конфигурацию для каждого задания Dagster:

@job(    
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
            },
    },
)
def my_job():
    my_op()

Либо, при использовании ассетов:
my_job = define_asset_job(
	name=’my_job’,
	selection=AssetSelection.assets(my_asset),
	tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
            },
    },
)

Для записи данных в Clickhouse мы используем библиотеку clickhouse-connect, так как наш исходный код уже был написан с ее помощью. Однако стоит отметить, что для Spark есть драйвер для работы с Clickhouse, позволяя использовать Spark SQL для выполнения запросов и обработки данных в Clickhouse.

Важно отметить, что для контроля расходов на использование хранилища необходимо регулярно обслуживать все таблицы с помощью команд VACUUM и OPTIMIZE. Команда VACUUM удаляет старые, неиспользуемые данные и освобождает пространство, что помогает снизить затраты на хранение. Команда OPTIMIZE, в свою очередь, не только уменьшает размер таблиц, но и значительно улучшает производительность запросов, ускоряя чтение данных. 

Важное замечание: Spark + Delta Table по умолчанию поддерживают механизм оптимистичных блокировок для write-запросов, выполняемых в одном Spark кластере, но не для write-запросов из разных Spark кластеров. Чтобы включить такую поддержку, необходимо добавить в конфигурацию DynamoDB. В Яндекс Облаке YDB реализует интерфейс DynamoDB, но его интеграция оказалась для нас сложной задачей. Поэтому мы решили использовать механизм пессимистичных блокировок, встроенный в Dagster, через функцию tag_concurrency_limits. Все джобы, записывающие данные в таблицу, мы помечаем тегом с названием этой таблицы. Dagster, в свою очередь, предотвращает одновременное выполнение таких джоб, что обеспечивает корректное управление блокировками и исключает конфликтные write-запросы. 

Таким образом, конфигурация вашего dagsterDaemon будет выглядеть как-то так:

dagsterDaemon:
 enabled: true


 image:
   repository: "docker.io/dagster/dagster-celery-k8s"
   tag: ~
   pullPolicy: Always


 heartbeatTolerance: 300


 runCoordinator:
   enabled: true
   type: QueuedRunCoordinator
   config:
     queuedRunCoordinator:
       maxConcurrentRuns: 10
       tagConcurrencyLimits:
         - key: "single-thread"
           value:
             applyLimitPerUniqueValue: true
           limit: 1

После чего в настройки вашего задания можно добавить:

my_job = define_asset_job(
	name=’my_job’,
	selection=AssetSelection.assets(my_asset),
	tags={
	        "single-thread”: "my_job”,
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "250m", "memory": "64Mi"},
                    "limits": {"cpu": "500m", "memory": "2560Mi"},
                },
            },
    },
)

Преимущества нового подхода:

Мы платим только за фактическое время работы, что позволяет эффективно управлять бюджетом и уменьшить издержки.

Мы легко адаптируемся под изменяющиеся нагрузки, обеспечивая высокую производительность и надежность

Мы по-прежнему поставляем готовые витрины в Clickhouse, что позволяет сохранить существующие процессы для аналитиков и избежать значительных изменений.

Заключение

Переход на новую архитектуру оказался успешным и принес значительную экономию расходов, улучшение гибкости и масштабируемости. Наш опыт показывает, что такие изменения могут быть менее сложными, чем ожидается, и приносить значительные выгоды. 

А в данный момент мы подготавливаем инфраструктуру для внедрения горизонтально масштабируемого Spark«a и поддержки Hive Metastore, о чем постараемся рассказать в следующей статье.

Ну вот и все!) Благодарю за внимание и буду рад услышать ваше мнение) Поделитесь своим опытом, задавайте вопросы и расскажите, как вы справляетесь с подобными задачами)

© Habrahabr.ru