Создание баз данных за выходные
В этой статье я расскажу вам, как использовать DataFusion для создания собственного опыта разработки баз данных.
Базы данных являются одними из самых сложных частей программного обеспечения, задуманных с момента появления вычислительной эры более полувека назад. [1] Почти каждая часть технологии в конечном итоге касается базы данных в той или иной форме. Несмотря на повсеместность баз данных в программном стеке, подавляющее большинство разработчиков были приучены относиться к базам данных как к более или менее черному ящику — сложным плотным чарам программного обеспечения, созданным волшебниками и знатоками, укрывшимися в элитных рядах компаний, занимающихся базами данных, или в таких местах, как Google. Поговорка для остальных из нас, как она есть, — никогда не пытайтесь написать свою собственную базу данных .
Тем не менее, несмотря на их долговечность, мы наблюдаем постоянные инновации в этой области, которая впервые началась с появлением Hadoop около 2 десятилетий назад. На сайте ClickBench теперь перечислено более 50 баз данных в его наборе тестов [2]. И это только аналитические движки. С учетом последних тенденций переписывания всех больших данных на Rust [3] не проходит и месяца, чтобы интересный новый проект не оказался в тренде Hacker News. В этой статье мы рассмотрим, насколько легко (или сложно) создавать базы данных с помощью Apache Datafusion и можете ли вы, будучи простым смертным, на самом деле реально создать собственную базу данных и внедрить инновации вокруг опыта разработчика.
Большинство современных баз данных можно разбить на вычислительные и хранилищные слои, со сложными механизмами запросов, отвечающими за «вычислительную» часть базы данных. Механизм запросов обычно состоит из анализатора запросов, генерации логического плана и затем генерации физического плана для запуска вычислений на механизме выполнения. Запрос обычно проходит через несколько фаз оптимизации при генерации логического плана, а также генерации физического плана. Независимо от того, каков целевой вариант использования конечной системы, механизм запросов более или менее следует этой модели.
Учитывая десятилетия исследований баз данных, которые были проведены в каждом из этих отдельных слоев, планка для написания функционального движка запросов с функциями ставок таблиц остается поразительно высокой. И вам нужно прибить все это, прежде чем вы сможете приступить к написанию функций, специфичных для вашего варианта использования. Хотя существует множество проектов, которые помогают вам писать некоторые из этих слоев по отдельности, Apache DataFusion остается единственной игрой в городе, которая помогает вам со всем спектром.
Вы можете подумать о DataFusion extensible database development toolkit. На самом базовом уровне вы можете использовать его как механизм запросов a la DuckDB с его встроенными фронтендами SQL и Dataframe, в то же время вы можете расширять или даже полностью заменять различные слои, чтобы полностью построить свой собственный опыт.
В оставшейся части этой статьи мы рассмотрим, как расширить DataFusion, добавив собственные операторы в его механизм выполнения, а затем пропустить его через Физический и Логический планировщики и представить его на внешнем интерфейсе.
Создание топового DataFusion
Архитектура DataFusion
В Denormalized мы создаем Duck DB, как одноузловой опыт для приложений потоковой обработки. Хотя DataFusion и поддерживает неограниченные вычисления, у него нет потокового оконного оператора. Windows лежит в основе приложений потоковой обработки, они предоставляют простой способ объединения бесконечных потоков данных в конечные блоки, чтобы мы могли применять к ним агрегации.
Для этого урока мы реализуем простой оконный оператор для бесконечных потоков. Наш оператор будет иметь следующую сигнатуру --
pub fn window(
self
group_expr: Vec
Написание плана выполнения
ExecutionPlan представляет собой узел в физическом плане DataFusion. Это то, куда будет помещен фактический код с нашими пользовательскими вычислениями. Модель выполнения DataFusions основана на pull, что означает, что выполнение начинается с приемников и продвигается вверх по физическому плану. Вызов метода execute для этого признака создает асинхронный SendableRecordBatchStream пакетов записей путем инкрементного получения раздела выходных данных путем выполнения вычислений над входными данными Execution Plan.
В нашем случае использования метод execute () ExecutionPlan возвращает struct GroupedWindowAggStream , который реализует RecordBatchStream, обертку вокруг свойства futures: Stream. Фактические вычисления должны быть реализованы в poll_next () реализации Stream.
impl RecordBatchStream for GroupedWindowAggStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for GroupedWindowAggStream {
type Item = Result
В нашем примере вызов poll_next_inner для окна потоковой передачи решает эту проблему.
Обработка входящих данных.
Накопление рядов в открытых окнах.
Обновление водяного знака (который находится за мьютексом)
Открытие новых окон при необходимости.
Закрытие всех запускающих окон и создание из них выходных RecordBatches.
Подключение к Physical Planner
Создав наш собственный Execution Plan, нам нужно сообщить Physical Planner о его существовании. Реализация ExtensionPlanner для нашего ExtensionPlan — это все, что нам нужно сделать здесь.
Расширение логического плана
Теперь, когда у нас есть реализованный вместе с ExtensionPlanner пользовательский план выполнения, нам нужно добавить сопутствующий узел в логический план. Это не только позволяет нам выставлять его на фронтенды SQL/DataFrame, но и подключаться к логическим оптимизаторам для оптимизаций, таких как выталкивание предикатов.
В DataFusion нам сначала необходимо реализовать определяемый пользователем узел логического плана , а затем добавить LogicalPlanExtension в конструктор логического плана, который предоставляет его интерфейсам SQL/DataFrame.
Логический план к физическому плану
Последняя часть головоломки — это точка соприкосновения, где логический план преобразуется в физический план. Для этого мы реализуем пользовательский QueryPlanner , который гарантирует, что физический планировщик инициализируется с пользовательскими расширениями, которые мы написали для нашего ExecutionPlan.
Пользовательские правила оптимизации
Поскольку наш оператор реализовал группу путем агрегации, нам нужно гарантировать, что все строки для определенной группы окажутся в одном разделе. Для этого мы добавим новое физическое правило оптимизации , чтобы добавить оператор HashPartition к нашим групповым ключам.
Собираем все вместе
Наконец, все, что нам нужно сделать, это создать сеанс DataFusion с помощью пользовательского QueryPlanner, который мы написали выше, а также с дополнительным правилом физического оптимизатора, которое мы добавили, и вуаля, теперь у нас есть собственная расширенная версия DataFusion.
let state = SessionStateBuilder::new() .with_default_features() .with_config(config) .with_query_planner(Arc::new(StreamingQueryPlanner {})) .with_optimizer_rules(get_default_optimizer_rules()).with_physical_optimizer_rule(Arc::new(EnsureHashPartititionOnGroupByForStreamingAggregates::new(), )) .build();