Быстрая обработка данных в data lake с помощью SQL
Кому пришла в голову идея отправлять SQL запросы в data lake? Оказывается, это позволяет компаниям более гибко и эффективно анализировать свои данные за счёт уменьшения потребности в ETL и снижения нагрузки на корпоративное хранилище. Рассмотрим, какие популярные SQL-движки умеют это делать и как им это удаётся.
Меня зовут Владимир Озеров, я руковожу компанией Querify Labs. Мы уже порядка 10 лет занимаемся распределённым SQL, делаем всевозможные SQL-движки, в частности CedrusData — коммерческий движок на основе опенсорс проекта Trino. Сегодня поговорим про то, каким образом устроен ряд SQL-движков, которые обрабатывают данные от data lake.
SQL & Data lake
Если посмотреть на путь развития аналитических систем в различных компаниях, можно увидеть, что на протяжении многих лет мы сталкиваемся с одной и той же проблемой. Сначала данных не очень много. Потом их количество увеличивается. А бизнес говорит, что надо принимать решения на их основе. Тогда все поголовно бегут в Data Warehouse и запускают запросы. Это приводит к увеличению количества пользователей и появлению разнородных нагрузок.
Когда данных мало, справляются и монолитные системы. Когда данных становится больше, мы часто используем системы на основе Shared-nothing архитектуры (Greenplum, Vertica, Teradata и прочее), которая обеспечивает горизонтальную масштабируемость.
По мере того, как увеличивается нагрузка, она также становится разноплановой. Поэтому в современном мире чаще используется другой подход — disaggregated storage. Он заключается в отделении вычислительных узлов от данных.
Spark, Trino, Presto являются проявлением такого подхода. Многие вендоры классических Data Warehouse идут в эту же сторону. Например, Vertica Eon, Clickhouse S3.
Disaggregated storage
Shared-nothing архитектура обладает рядом недостатков:
Тяжело масштабировать.
Не справляется с разнородными нагрузками.
Не подходит для облака.
Vendor lock.
Большая часть из них вытекает из того, что вычисление и хранение объединены. Это значит, что если вы хотите изменить размер кластера, вам нужно перемещать данные. Это довольно сложные процессы. При их выполнении ваши КХД могут терять устойчивость. И непонятно, как переносить данные в облако и обеспечить scale up/scale down. Из-за этого данные хранятся в форматах конкретных вендоров: Greenplum-данные, Teradata-данные, которые не могут быть прочитаны никем другим. По мере привлечения всё новых пользователей этим всё труднее управлять.
Из этого и вытекают плюсы подхода с отделением compute от storage:
Эластичная масштабируемость.
Гибкое управление нагрузкой: разные движки, множественные кластеры.
Любое окружение.
Открытые форматы хранения.
Вы можете очень быстро удалять и добавлять новые узлы в кластер. Накручивать туда Kubernetes с автоскейлингом и вообще делать всё что угодно.
Этот подход позволяет стартовать множество кластеров к одним и тем же данным под каждую нагрузку. Например, вы можете выделить разные кластера для отделов продаж и маркетинга. Или просматривать одни и те же данные с помощью разных продуктов. Например, положить в data lake Parquet и просмотреть эти данные сначала Spark«ом для батч-процессинга, а потом Flink«ом, и отдельно Trino. Так можно организовывать достаточно гибкие архитектуры под любое окружение и требования.
LakeHouse
Lake house — это концепция, когда вы строите слоистую инфраструктуру. Внизу расположен классический data lake (HDFS, Ozone, S3-совместимые storage, облачные, on-premise и прочее). А сверху вы можете добавлять любые форматы хранения от простейшего CSV и до Parquet/ORC. А ещё транзакции и схемы с помощью Iceberg. Наконец, можно подключать различные движки. Например, Spark, Trino, Flink в зависимости от того, с какой нагрузкой сталкиваетесь.
Звучит привлекательно, но среди всех этих продуктов можно выделить группу движков, которые направлены на интерактивный анализ данных в data lake.
Распределённые SQL-движки
Самые популярные опенсорсные движки, на текущем этапе — Trino и Dremio.
Эти продукты появились примерно в одно время в недрах больших корпораций, а потом вышли в опенсорс. В результате их используют многие коммерческие вендоры, и мы в том числе. Эти движки отлично справляются с быстрой интерактивной обработкой данных в data lake.
Вот их основные характеристики:
Отделение compute от storage.
Работают с открытыми форматами данных.
Распределённые.
Массивно-параллельные.
Обеспечивают вычисления в памяти (преимущественно).
Колоночные.
Компилируемые.
Движки только обрабатывают данные, а сами данные находятся в вашем data lake. Но учитывая, что мы работаем с открытыми форматами данных вроде Parquet/ORC, которые нуждаются в декодировании и переформатировании, возникают вопросы:
Насколько быстро в принципе такие движки могут работать?
Можно ли их рассматривать, как полноценную замену КХД?
На самом деле эти технологии активно развиваются и есть первые истории успеха. Коллеги с Авито рассказывали, как они планомерно мигрируют с Vertica на комбинацию Trino (для обработки данных) и CEPH (для хранения данных).
Это вполне себе история успешного переезда на архитектуру с отделённым SQL-движком, который заходит в data lake.
Data skipping
У таких движков, как правило, внутри мощные оптимизаторы запросов. Они представляют из себя запрос в виде реляционного дерева, с которого можно выводить большое количество всевозможных оптимизаций.
Рассмотрим самые важные оптимизации для data lake.
Pruning неиспользованных колонок
В аналитических нагрузках часто бывают широкие таблицы, но конкретные запросы затрагивают лишь небольшое количество колонок. А мы хотим понять, какие колонки задействованы в запросе и обращаться только к ним. Для этого оптимизатор просматривает планы и понимает, что требуется только небольшой набор колонок.
Поэтому если вы используете такого рода движки, придётся хранить данные в data lake в колоночном формате. Тогда вы получите хорошую производительность. На практике компаниям бывает интересно попробовать такие подходы, но начать пилотный проект всё равно решают с JSON (или CSV). А в таком случае на производительность рассчитывать не стоит, потому что колонки не будут пруниться.
Фильтры pushdown
Filter pushdown используется во многих движках в разных ипостасях. Когда мы работаем с data lake, фильтр pushdown направлен в первую очередь на то, чтобы сканировать как можно меньше данных.
В запросе можно задать фильтры конкретным колонкам, потом преобразовать и последовательно опустить вниз к операторам сканирования. Причём какие-то фильтры могут быть тривиальными, как «data = number», а другие завёрнуты в функции, из которых ещё предстоит вывести конкретные ограничения.
Например, мы используем фильтр по колонке day, где указываем конкретный год:
В этом случае движки часто не понимают этот фильтр. Поэтому такого рода функции предварительно разворачивают в более понятные выражения. На картинке приведён тривиальный пример, но на самом деле для развёртывания используют довольно много операторов: like, truncate, >, < по колонкам и прочее.
Получив понятный фильтр, который относится к колонкам партиционирования или бакетирования данных, мы можем его использовать, чтобы отбрасывать partitions и buckets, то есть не обращаться к ним вовсе.
Также это означает, что если вы партиционируете или бакетируете таблицы, то в идеале фильтры должны быть понятны движку. Какие-то движки могут делать продвинутые трансформации над не очень понятными фильтрами, а какие-то с этим не справятся. Поэтому важно понимать, что чем понятнее вы запишите фильтр, тем с большей вероятностью он будет использован. Это очень похоже на подход sargable. Часто говорят о sargable-аргументах или sargable-запросах в контексте SQL сервера, когда для них используется индекс.
На практике возможности движков по переписыванию фильтров сильно разнятся. Поэтому придётся экспериментировать.
Динамические фильтры
Ещё одна суперважная оптимизация в такого рода движках — рантайм или динамические фильтры.
Допустим, есть большая таблица фактов, которую вы джойните и пишете фильтры по dimensions. Обычная ситуация — схема «звезда», когда вы пишете фильтры листьевым оператором, а явных фильтров на таблице фактов нет.
Запрос буквально получает продажи за один день. Это может быть доля процента от всех данных в таблице. Если мы будем исполнять такой запрос в лоб, то из таблицы продаж притянем почти 3 миллиарда записей только для того, чтобы большая их часть была сброшена в процессе выполнения оператора join.
Вместо этого можно отсканировать dimension. Посмотреть в операторе join чему равны значения в этом условии. Таким образом сформировать в runtime дополнительный фильтр, который будет направлен к оператору сканирования большой таблицы. Так вы прочитаете только небольшое количество записей. В моем случае из 3 миллиардов записей после оптимизации осталось 9 миллионов. В результате время выполнения запроса радикально сократилось.
У всех движков, которые работают с data lake, такая оптимизация в том или ином виде есть:
Без этого никак! Иначе придётся бесконечно сканировать большие куски данных.
Динамические фильтры устроены довольно сложно. Вы сначала сканируете правую часть join, потом собираете статистики, потом их ещё надо разослать на другие узлы. Оказывается, движки не всё из этого умеют. В каких-то случаях вы собираете динамический фильтр справа, а он оказывается не селективным или таблица справа оказывается большой. Из-за этого в крайних случаях запрос даже может замедляться. Поэтому в идеале решение — ставить ли динамические фильтры, надо принимать на основе оценок стоимости статистик и прочего. Но обычно этого не происходит. Они или ставятся или нет, в зависимости от того, что вы указали в конфигурации.
Поэтому, чтобы не получить проблемы, надо понимать, как это устроено, и читать документацию. Там может быть написано, что оптимизации отключаются автоматически и их надо включить вручную. Или вы можете сконфигурировать таймауты. А ожидание сканирования таблицы, займёт не более 3 секунд. А иногда фильтры получаются слишком большие, из-за кучи уникальных значений. Поэтому движок может отказаться их пробрасывать.
Поэкспериментируйте с параметрами и при необходимости произведите тонкую настройку. Как и с любой сложной системой без экспериментов уверенности не почувствуете. Естественно, системы типа Trino стремятся выбрать адекватные дефолты, но лучше проверить.
Параллельные чтения
Если в кратком описании вашего движка нет слова массивно-параллельный, никто с вами серьёзно разговаривать не будет. Поэтому важно понимать, как в таких движках обеспечивается параллелизм.
Сплиты
Чтобы эффективно и параллельно читать данные, data lake:
i. делит таблицу на partiotion/bucket;
ii. в процессе записи таблицы через Spark указывает небольшие размеры row групп.
Кроме того, у файлов, структурированных как Parquet/ORC, есть своя внутрення структура, которая описывает офсеты row групп и других элементов.
Например, Trino устроен таким образом.
Мы делаем листинг директории в data lake, который описывает текущую таблицу, и понимаем, какие там файлы. Если файлы не очень большие, то один будет прочитан в один поток. Если же большие, понадобится несколько потоков. В дальнейшем задача на чтение тех или иных кусков файлов, которые мы называем сплитами, распределяются по worker-узлам — workers далее.
Workers начинают читать отдельные части вашей таблицы независимо друг от друга. Потом они обмениваются данными, чтобы исполнять промежуточные операторы.
Стоит помнить, что файловая структура таблиц должна соотносится с возможностями кластера. Если за огромной таблицей записать один огромный файл с одной огромной row-группой, никакого параллелизма не будет, и работать всё будет не очень хорошо. Если же разбить таблицу на слишком маленькие partition по несколько килобайт, то больше потеряете на накладных расходах. Ведь пока вы сделаете сетевой вызов, получите небольшое количество данных.
Хорошая практика — отталкиваться от базовых рекомендаций. Например, создавать относительно небольшие row-группы (64 Мб, 128 Мб, 256 Мб). Чем больше узлов в кластере, тем более мелкие row-группы дадут вам больше параллелизм. Но только до определённого предела, потому что потом вы будете получать больше накладных расходов. Поэтому проще всего отталкиваться от общепринятых дефолтов.
Если у вас много таких сплитов, движки могут принимать консервативные стратегии по их распределению на узлы. Например, в Presto какой бы мощный worker вы не поставили, в рамках одного worker«а по умолчанию будет сканироваться не более 16 потоков. Допустим, у вас 128 ядер и колоссальный сетевой канал. Но если в Presto стоит дефолтное значение, он будет тянуть данные понемногу.
Когда у вас очень большие таблицы, встанет вопрос о правильной конфигурации скедулинга. Вот несколько примеров для Trino:
Но у каждого движка свои особенности. Обычно сигнал о том, что таблицы плохо побиты или скедулер неправильно работает — недостаточная утилизация сети или CPU в процессе сканирования. Но из коробки, как правило, это срабатывает адекватно.
Группировка сплитов
Так как у нас, в отличие от классических КХД, compute от storage отделён мы не обязаны сканировать partition на том же узле, где он находится. У нас вообще нет понятия partition, но мы можем делать такое партиционирование виртуально — получить список всех наших bucket и по какому-то принципу сгруппировать их по узлам.
Например, можно сделать так, чтобы все сплиты, которые относятся к partition за дату 22 марта, пойдут на один и тот же узел. Это делается потому, что многим операторам в распределённых системах требуется шаффлить данные под собой. Это относится к агрегатам, потому что все записи с одним и тем же ключом агрегации должны оказаться в одном worker. Это относится к join, к window-функциям, и к большому количеству операторов.
В нашем случае, если мы сканируем таблицу sales, которая партиционирована по дате, то по дате и делаем агрегацию. Но сплиты группировать не будем. Иначе получится, что мы сначала отсканируем таблицу, потом сделаем предагрегацию, потом — шаффл и только в конце посчитаем агрегаты. Потому что именно после шаффла все записи с одной и той же датой окажутся на одном узле.
Но если заранее сгруппировать сплиты, можно избежать шаффла, потому что все записи с одной и той же датой и так находятся на одном узле. В конечном счёте это приводит к тому, что структура дерева операторов упрощается, меньше данных передаётся по сети между workers, и запрос выполняется быстрее.
Но так работает не всегда. Потому что как только мы жёстко привязываем конкретные части данных к конкретным узлам, получаем тоже, что и в классических КХД — горящие partition, data skew и прочее. Оказывается, что если один partition больше другого, и он поступил на конкретный узел, скорость, latency запроса будет упираться в самый медленный partition.
Поэтому решение лучше принимать на основе затрат, хотя в реальности его принимают либо вообще без учёта затрат, либо по их поверхностной оценке. Например, в Spark есть оптимизация горячих partition. Поэтому можно сделать реоптимизацию в runtime или принять другое решение по количеству bucket. В Trino, Dremio, Presto вообще почти ничего не заимплементировано. Поэтому придётсяконфигурировать его самостоятельно. И хотя движок вас немного подстраховывает, рассчитывать на это не стоит.
Если вы работаете с partition-таблицами и у вас данные более или менее равномерно распределены, а partition больше, чем узлов, то для многих запросов вы скорее всего получите ускорение. Ведь не будет паразитных шаффлов. Но если есть data skew, то, наоборот, можно получить замедление.
По сравнению с классическими КХД, здесь, по крайней мере, есть свобода выбора. Хотим — используем, хотим — не используем. А если используем Greenplum, то просто разбиваем его на partition. Поэтому такого рода оптимизации лучше использовать только после изучения документации.
Кэширование
Кэширование очень актуально при работе с data lake. Да и вообще с аналитическими системами, потому что мы, как правило, работаем с медленно изменяемыми данными. Например, записали partition с продажами за последний день. В data lake или даже в КХД, они, скорее всего, уже не изменятся. Поэтому когда будем многократно отправлять к ним SQL-запросы, будем делать одни и те же вычисления и получать те же самые данные.
Поэтому мы можем ускорить подобные вычисления за счёт кэширования метаинформации и самих данных.
Кэширование: метаданные
Обработка файлов в data lake начинается с выяснения размера файла, положение его блоков, модификации данных.
Если мы работаем с Parquet/ORC, то у них есть footer, по которым можно прочитать метаинформацию о внутренней структуре файла. Для этого сначала получаем статус, потом идём в data lake за кусочком footer, анализируем его, и только после этого понимаем, где находятся конкретные колонки, индексы, Блум-фильтры и прочее. Тогда и начинаем непосредственно читать данные. При этом все эти шаги обычно выдают одну и ту же информацию, потому что данные изменяются медленно.
Чтобы ускорить процесс — кэшируем метаданные. Каждый раз мы идём в data lake и смотрим, что файл не изменился с прошедшего чтения. Если это так, то мы переиспользуем закэшированную метаинформацию внутри worker. Она, как правило, не занимает много места, поэтому её можно хранить в памяти, тем самым уменьшив количество сетевых вызовов к вашему data lake.
Такого рода оптимизации есть и в Presto и в движке CedrusData — они позволяют уменьшить latency.
Можно использовать более агрессивные стратегии и кэшировать даже информацию о статусах файлов или листинге директорий. Для этого однократно идём в data lake, видим там какой-то файл и предполагаем, что он всегда будет в неизменном виде. Это более агрессивная стратегия, потому что файл может измениться. Например, пользователи говорят, что ваш Iceberg разбит на маленькие кусочки и медленно работает, оптимизируйте! Администраторы оптимизировали, а ваш движок по-прежнему думает, что там старые файлы, поэтому оптимизация не срабатывает.
Такую стратегию надо использовать с опаской. Она доступна, например, в Presto, как и кэширование статусов файлов.
Кэширование: данные
При чтении данных обычно возникает 2 вызова:
Передача данных по сети.
Декодинг данных из формата data lake в формат движка.
Первое связано с тем, что данные находятся удалённо. Значит, читать их дорого. По крайней мере, значительно дороже, чем если бы данные были просто у вас под ногами как в Shared-nothing системах.
Вторая проблема менее очевидная. Если мы работаем с открытыми форматами вроде Parquet/ORC, то данные сначала надо декодировать. Условно, Parquet надо разжать, может быть, расшифровать, а потом ещё походить по его row-группам, посмотреть на repetition level и прочее. Только тогда мы получим колонку значений, которую будет обрабатывать движок. Если вы профилируете, то увидите, что на это тратится значительная доля CPU.
Одно из решений этой проблемы заключается в том, чтобы кэшировать данные на worker-узлах, не вникая в их внутреннюю структуру.
Есть проект Alluxio, который реализует фактически кэширующую файловую систему. Допустим, у вас есть Hadoop, вы с ним работаете, как с файловой системой. Вы можете поставить Alluxio сверху, как обёртку любой файловой системы Hadoop. Alluxio будет брать данные и прозрачно кэшировать их на узлах по границам каких-то своих внутренних блоков или по какому-то фиксированному размеру, но не вдаваясь в подробности, что за данные под ним находятся.
Если мы используем такой подход, то это помогает избегать сетевых вызовов, потому что данные теперь под ногами. Но проблема с декодингом остаётся, потому что мы по-прежнему ничего не понимаем про структуру этих данных.
Чтобы избавиться от декодинга, можно углубиться в структуру конкретных данных и однократно декодировать тот же Parquet/ORC. А уже готовые к употреблению движком вектора сохранить. Этот подход есть в популярном проекте, который раньше назывался Varada, сейчас Starburst Warp Speed. Это израильский стартап, который прозрачно для Trino сохраняет закэшированные данные на worker-узлах уже в декодированном формате.
Так мы получаем следующие преимущества:
Не тратим CPU на декодинг.
Можно добавлять дополнительные структуры данных (например, индексы).
Когда вы понимаете, что именно кэшируете, то можете строить дополнительную структуру данных. Например, индексы и Блум-фильтры. В Varada даже умеют строить индексы Lucene по строкам и сразу вместе с ними сохранять. Или вы можете эти данные отсортировать и только часть из них читать. Но есть и большая проблема. Данные в Parquet/ORC классно сжимаются, а если вы храните просто вектора, то их размер может быть на самом деле очень большим. Для примера, вы берёте TPC-DS 1 терабайт, если сгенерировали данные, он у вас и будет 1 Тб примерно. Но если вы их перегоните в Parquet и сожмёте каким-нибудь Snappy, у вас получится где-то 300 Гб. А потом, если вы их прочитаете и закэшируете просто в лоб, то снова будет 1 Тб, потому что вы потеряли сжатие, всякие оптимизации Parquet и прочее.
Поэтому если вы такие данные храните локально, в идеале их надо еще сжимать, а на это тоже придётся потратить CPU. Но подобрав нормальный алгоритм, можно сэкономить. Например, мы в CedrusData сжимаем данные с помощью Zstd. С теми же расходами CPU сжимается на 20–30% больше, чем при сжатии с помощью Parquet.
Когда вы начинаете кэшировать данные на worker-узлах, то начинаете думать как сделать кэш эффективным. Например, читайте файл с того узла, где он уже закэширован. А если не будете это делать, то после большого количества запусков окажется, что у вас полные датасеты на каждом worker-узле и вам не хватит дисков.
Чтобы запросы приходили на один и тот же узел на чтение одного и того же файла, вам придется снова привязывать compute к storage.
Обычно движки используют стратегии, которые позволяют привязывать сканы конкретных файлов или частей файлов к конкретному worker-узлу. При этом эта привязка не строгая. Например, движок может сказать: если этот worker не загружен, а данные в нём — закэшированы, то отправьте туда запрос. А если worker перегружен, то в другое место. В этом случае мы всё равно пойдём в сетку, и будем декодировать Parquet, но узлы в нём не будут перегружены и другие запросы не пострадают. Поэтому несмотря на то, что проблема совмещения появляется снова, мы не обязаны слепо ей следовать. Мы можем динамически на уровне движка принимать решение, хотим ли использовать закэшированные данные. Но это даёт нам не только гибкость, но и сложности конфигурирования.
Стоит использовать алгоритмы, чтобы маппинг с конкретного файла на узел не поплыл слишком сильно, если у вас изменилась топология кластера. Потому что если мы закэшировали имя файла и взяли остаток от деления, то один узел ушёл, и привязка всех файлов изменилась. Тогда весь кэш становится бесполезным и всё кэшируется заново. А если использовать алгоритмы, в которых даже при изменении количества узлов в кластере привязка файла к узлу остаётся, то тогда такого рода спецэффектов не будет происходить.
Пример таких алгоритмов — Consistent hashing или другие со схожими свойствами. Важно понимать, что совмещение compute и storage в этом случае тоже чревато проблемами.
Кэширование: промежуточные результаты
Можно кэшировать не только файлы, но и промежуточные результаты запросов. Так, например, делает Presto в некоторых случаях.
Допустим, вы сканируете какой-то файл, добавляете фильтрацию, а потом считаете агрегат. Если агрегаты хорошо схлопываются, чтобы всё постоянно не пересчитывать, можно сохранить результат расчёта этого агрегата прямо в памяти.
Для этого берём наш план с уникальной сигнатурой, которая описывает, что план делает. Если есть файл, который мы хотим прочитать, объединяем всё в ключ для кэша. Тогда value этого ключа — то, что получилось после запуска операции. Получается, что вы многократно делаете одни и те же агрегаты, но можете сохранять их в памяти и отдавать готовый результат, даже не сканируя файл и не прогоняя агрегации заново. Конечно, в этом случае тоже нужны перепроверки, что файлы не изменились.
Проблема Presto в том, что она кэширует только очень простые планы. Данные должны быть денормализованные, потому что join не поддерживаются. Но по roadmap продуктов видно, что скоро там появятся кэши такого рода.
Можно кэшировать более сложные части поддеревьев, анализировать нагрузку, которая приходит от пользователей, выделять повторяющиеся части и кэшировать именно их, потому что всё в кэш не влезет.
Но правильно подобрать для них эвристики. Конечно, это не тривиальная задача, но теоретически это возможно.
Кэширование: материализованные представления
В OLTP, как и в обычных КХД можно использовать материализованные вьюшки. У вас под ногами data lake с таблицами. Вы можете прочитать эти данные, в виде материализованных представлений и там же сохранить , чтобы потом использовать. Эта техника позволяет сэкономить на повторяющихся вычислениях. Ещё в data lake можно отслеживать изменения в файлах.
Если вы хотите, чтобы материализованная вьюшка всегда давала актуальные данные, то можете каждый раз перепроверять файлы, на основе которых она была построена: совпадают ли они с файлами текущей таблицы. Если используете табличные форматы вроде data lake или Iceberg, то можете сохранить ID снапшотов Iceberg, по которым была построена материализованная вьюшка. Это позволит перепроверять, что они по-прежнему не изменились. Так можно не только материализовывать вычисления, но и гарантировать, что полученный ответ актуален. Такого рода вещи поддерживаются практически во всех движках типа Trino, Presto, Dremio.