Распределенный SQL в Picodata
Недавно мы выпустили новую значительную версию Picodata — распределенной in-memory СУБД с открытым исходным кодом. Это продукт на основе Tarantool c поддержкой плагинов на Rust и некоторыми другими интересными особенностями, о которых можно почитать в статье Picodata: простое масштабирование Tarantool.
Одно из главных улучшений в новом релизе Picodata 23.12 — возможность выполнять распределенные (кластерные) SQL-запросы непосредственно из консоли Picodata, без дополнительных настроек. Можно управлять глобальными и шардированными таблицами (DDL), модифицировать данные в них (DML) и, разумеется, читать из них (DQL). Также, мы теперь поддерживаем централизованное управление пользователями, ролями и привилегиями на основе списков контроля доступа (ACL), опять же — в рамках всего кластера.
В этой статье я сосредоточусь на нескольких примерах простых SQL-запросов и покажу, как они выполняются для таблиц, распределенных по нескольким шардам. Это позволит лучше понять, как устроены такие таблицы и какие задачи мы решаем для работы с ними.
Что такое распределенный SQL?
Практически все реляционные СУБД позволяют выполнять SQL-запросы локально, в рамках одного узла. Однако, когда таблицы с данными шардированы, т. е. распределены по нескольким узлам, то работать с ними уже сложнее. Шардирование имеет много преимуществ: оно многократно увеличивает возможности СУБД как в плане объема хранимых данных (большие таблицы могут не помещаться на один узел), так и в скорости доступа к ним. Но оно же и порождает ряд задач, которые нужно как-то решать.
В некотором смысле, мы в Picodata «стоим на плечах гигантов», так как вместе с Tarantool мы получили в наследство Vshard — библиотеку шардирования (сегментирования)), которая уже предоставляет некий базовый фреймворк для создания кластерной логики. Vshard позволяет решать конкретные задачи, учитывая предполагаемые запросы к данным и выбранную стратегию шардирования, и для этого можно написать специализированный код. Но это будет императивный подход, так как изначально нужно знать, как именно и откуда потребуется извлечь данные. Это хорошо и полезно, но этого мало.
Зачем нам собственная реализация?
Итак, Vshard абстрагирован от того, какие функции вызывать на конкретном реплицированном узле кластера. Это низкоуровневый инструмент для написания императивного кода под конкретный процесс обработки данных, и мы считаем, что ему не хватает универсальности. С помощью Vshard можно организовать логику, задействующую основные SQL-операции — «прочитать» (SELECT), «записать» (INSERT) и «удалить» (DELETE) — для строк, размещение которых в кластере заранее известно. Но мы решили предложить пользователям нечто большее, а именно декларативный фреймворк, который позволит писать произвольные SQL-запросы с поддержкой более сложных операций, включая соединение, сортировку, группировку, а также возможность комбинировать их, включать в подзапросы и т. д. Всё то, что привычно использовать в локальных SQL-запросах, должно автоматически и прозрачно для пользователя выполняться и распределенно. У нас это получилось: консоль Picodata уже сейчас умеет исполнять распределенный SQL. Не без ограничений и некоторых оговорок, но основная функциональность уже здесь, и ее можно попробовать. Ядром Picodata SQL выступает наша внутренняя библиотека, умеющая решать множество нетривиальных задач: разбивать пользовательский запрос на части, оптимизировать его, выяснять сценарии перемещения данных, определять, на каких репликасетах хранятся нужные части таблицы, консолидировать полученные от узлов хранения промежуточные результаты и возвращать пользователю ответ.
Внутреннее устройство
Основа Picodata SQL — это Rust-библиотека, работающая на каждом узле кластера. Несмотря на то, что в Picodata каждый узел является и роутером, и узлом хранения, мы все же разделяем их по ролям: для нас роутером выступает узел, на который пришел запрос, а в роли storage выступает узел хранения с целевыми данными. На первом строится общий план запроса и координируется его выполнение по частям, на втором — происходит выполнение этих частей (локальных SQL-запросов), полученных от роутера.
Таким образом, задача роутера состоит в обработке исходного SQL-запроса, нарезании его на части, отправки этих частей через RPC на узлы хранения и сбор результата. Под «обработкой» скрывается логика, о которой и пойдет речь ниже.
Сперва мы обрабатываем запрос с помощью библиотеки грамматического разбора, использующей PEG (Parsing Expression Grammar). Результатом разбора является абстрактное синтаксическое дерево (AST), которое подтверждает валидность запроса и его соответствие нашей грамматике, т.е. множеству поддерживаемых SQL-запросов. Рассмотрим его на примере конкретного SQL-запроса.
Синтаксическое дерево
Возьмем для примера простой запрос на чтение данных из таблицы с некоторым условием:
select b from t where a = 1
После синтаксического анализа дерево будет иметь следующий вид:
Дерево синтаксического разбора, в данном случае, выглядит исходно так
Основу AST составляют следующие три компонента:
проекция (Projection)
выборка (Selection)
сканирование (Scan)
Наша задача — превратить это дерево в пригодный для исполнения план запроса (IR, Intermediate Representation). Это происходит в несколько этапов. Сначала мы реорганизуем его узлы для более естественного порядка исполнения и расстановки ссылок (references). На верхнем уровне получается ориентированный граф:
Базовая схема, к которой мы будем добавлять детали
Это основа для будущего IR, который также будет читаться снизу вверх. Здесь, на примере нашего тестового запроса, предполагается такая логика: сначала произойдет сканирование таблицы t
, затем проверится фильтр (a = 1
) и далее мы сформируем проекцию, содержащую лишь искомую колонку b
.
Но на текущем этапе для AST требуется расставить ссылки на реляционные операторы для того, чтобы в дальнейшем знать, в каких операторах искать эти ссылки:
Добавляем ссылки (references)
Например, на схеме видно, что ссылка b
ссылается на результат выборки, полученной ниже по плану и уже включающей в себя ссылку a
.
Также для перед формированием плана запроса нужно выяснить по метаданным запрошенных таблиц, есть ли у них упомянутые в запросе колонки и определить типы данных. Для удобства будем считать, что в нашем примере используется один тип данных.
Формирование плана запроса
Превращение абстрактного дерева в конкретный план запроса (IR) происходит, опять же, в несколько этапов. Сначала требуется реорганизовать узлы. Каждому узлу в AST будет соответствовать несколько узлов плана IR, в котором к каждому родительскому реляционному узлу добавятся expression-узлы. Построение плана происходит снизу вверх, так что сначала формируется часть плана, относящаяся к сканированию:
IR начинается с самого нижнего узла, в данном случае это Scan
Expression-узлы помогают лучше понять семантику, содержащуюся за реляционными узлами. На схеме выше видно, что в сканирование попадают две колонки таблицы t
(а
и b
) посредством узла output tuple, описывающего формат выходных данных. У каждого узла имеется свойство Distribution, которое отвечает за распределение данных и изначально у всех узлов проинициализировано как None. Оно будет вычислено позже, при расстановке motion-узлов (об этом чуть ниже).
Остальные элементы из AST, относящиеся к сканированию, также попадают в план запроса:
Добавляем в план проверку равенства
Имея блок сканирования и относящуюся к нему проверку равенства (a=1
), мы теперь можем объединить их под вышестоящей выборкой (Selection) и показать на плане результат выборки:
Блок выборки из таблицы t сформирован
Оставшаяся часть синтаксического дерева также под конец переносится в план запроса:
Теперь есть и проекция
Как и требуется, в проекцию попадает только колонка b
.
Посмотрим, как план запроса будет выглядеть в собранном виде:
План запроса (IR) в собранном виде
Выполнение такого плана основано на обходе дерева и вычисления бакетов, на которых нужно выполнить его отдельные элементы (локальные SQL-запросы) в определенном порядке. Это требует понимания того, как устроен Vshard и предлагаемый им уровень абстракции для хранения данных.
Шардирование и бакеты
В концепции Vshard между данными и собственно устройством хранения предусмотрена дополнительная виртуальная сущность — «бакет» (bucket). Все данные хранятся в таких пронумерованных бакетах, которые атомарны и локальны; т.е. можно работать с бакетами, не задумываясь о том, на каком физическом узле они находятся. У каждого кортежа в таблице есть свой номер бакета (bucket_id), который можно посчитать, зная ключ шардирования (распределения) таблицы и общее число бакетов. Из каждого кортежа таблицы мы можем извлечь значения колонок шардирования (ключа) и по специальной формуле вычислить bucket_id этого кортежа. Поскольку при DML-операциях используется та же самая формула, мы можем быть уверены в том, что однажды записав (INSERT) через Picodata данные в таблицу, мы сможем гарантированно прочитать их (SELECT), обратившись к тому же bucket_id.
Ключ шардирования таблицы содержит одну или несколько колонок, но важны не их имена, а порядок и используемый тип данных. Поэтому, например, следующие две таблицы имеют одинаковое распределение по первой колонке несмотря на их разные имена (будем считать, что тип данных один и тот же):
Ключ распределения — первая колонка
Посмотрим теперь, как будет выглядеть итоговый план IR для запроса select b from t where a = 1
с учетом распределения таблицы по колонке a
, обозначенного желтыми output-узлами:
Output-узлы показывают, какие части таблицы затронуты в каждом случае
Если действия внутри отдельных реляционных операторов не накладывают каких-либо ограничений на данные (например, Scan "t”
), то предполагается, что данная часть дерева будет выполнена на всех бакетах. Если же ограничение присутствует (a = 1
внутри Selection), то будут задействованы только бакеты, содержащие отфильтрованные строки.
На уровне Selection становится ясно, что нам достаточно использовать набор бакетов, соответствующих фильтру, т.к. выполнение запроса на всех бакетах избыточно.
Верхний узел (Projection) не накладывает собственных ограничений на бакеты, поэтому он использует всё тот же набор, полученный для Selection.
Итак, план запроса готов, и, если для его выполнения не требуется перемещение данных, работа с ним завершена. Но если оно всё же нужно, ситуация становится сложнее, так как в план нужно добавить т.н. motion-узлы. Посмотрим, что это, и для чего они могут понадобиться.
Перемещение данных
Перемещение данных — это подгрузка недостающих частей таблицы с других узлов хранения для того, чтобы результат запроса был корректен. Перемещение данных происходит в тех случаях, когда реляционный оператор пытается обработать таблицы из нескольких дочерних узлов (вне зависимости от того, разные там таблицы или одинаковые) или, если оператор занимается агрегацией (SUM
, COUNT
…). Перемещение данных происходит по следующей схеме:
на роутере собираются запрошенные данные со всех узлов хранения
собранные данные объединяются в виртуальную таблицу с новым распределением (ключом шардирования)
роутер отправляет на узлы хранения только нужные им строки из этой виртуальной таблицы
Таким образом, перемещение обеспечивает корректность выполнения локальных запросов за счет копирования недостающих данных на каждый узел хранения в кластере. Мы различаем полное перемещение (копирование все таблицы) и частичное (копирование только недостающих колонок).
Решение о том, что нам требуется перемещение данных, принимается на стадии доработки IR путем добавления в него motion-узлов. Они добавляются только по необходимости, если запрос действительно требует докачки данных. Такая необходимость возникает, если при обходе дерева IR выясняется, что семантика реляционного узла (Projection, Selection) и распределение его детей не позволяют «выполнить» данный реляционный узел корректно без перекачки данных. То есть, временная таблица, полученная из поддерева под motion-узлом, должна быть перераспределена для вышестоящего реляционного узла.
Так же как и у реляционных узлов, у каждого motion-узла есть признак distribution, который отражает распределение лежащих под ним данных. В основном, мы имеем дело с двумя значениями Distribution:
Distribution::Any
— неизвестное распределениеDistribution::Segment
— распределение по набору колонок (ключу шардирования).
Motion-узлы в плане запроса
Рассмотрим теперь другой пример SQL-запроса, в котором используется подзапрос:
select a from t1 where a in (select b from t2)
На этот раз мы будем учитывать ключ шардирования, который задает разделение таблиц по узлам хранения. Пусть первая таблица распределена по a
, а вторая — по b
. Тогда план запроса в общем виде будет выглядеть так:
Для удобства восприятия обозначим на плане подзапрос (SQ) без лишних деталей
Здесь нам нужно получить все строки колонки a
из t1
, в которых a
совпадает с b
из подзапроса (SQ). Если хотя бы одна строка из SQ лежит не на том же узле, где лежит строчка из t1
с такими значениями колонок, то потребуется перемещение данных. Однако, это не так: строки из таблицы t1
и строки из подзапроса, у которых a
= b
, лежат на одном узле и имеют одинаковые bucket_id, а это значит, что перемещение не требуется.
Но, стоит немного изменить запрос:
select a from t1 where a not in (select b from t2)
,
как ситуация изменится:
Если в запросе есть NOT in, то без пермещения данных не обойтись
Здесь тоже требуется сравнить строки из t1
со всеми строками t2
, но прежнее условие больше не выполняется: часть нужных нам строк может лежать на других узлах.
Перемещение не обязательно бывает всегда полным. Если мы вернёмся к примеру select a from t1 where a in (select b from t2)
, но предположим, что t2
шардирована по другой колонке (например, c
), то перемещение будет частичным, а значение distribution для проекции будет Any:
Ключ таблицы не совпадаетс запрошенной колонкой, а это значит, что требуется частичное перемещение
На каждый узел будут перемещены только те данные, которые удовлетворяют равенству (условию подзапроса).
Выполнение запроса с перемещением данных
Наличие motion-узлов разбивает общее дерево IR на поддеревья. При выполнении распределенного запроса наша библиотека будет обходить их снизу вверх. Сначала она выясняет, на каких бакетах нужно выполнить поддерево. Затем для каждого поддерева будут выполняться функции map и reduce:
Промежуточный запрос с локальным SQL отправляется на несколько узлов (map)
Результаты предыдущего шага объединяются с помощью функции reduce.
Результат выполнения поддерева в виде временной таблицы линкуется к таблице вышестоящего узла:
Схема выполнения поддерева запроса
Соответственно, результат нескольких таких поддеревьях в итоге формирует родительскую таблицу:
Объединение результатов от нескольких поддеревьев
Если между поддеревьями нет никаких связей (например, references), то они могут выполняться параллельно. При отсутствии в IR motion-узлов, Picodata выполнит весь запрос как одно дерево, не разбивая его на части.
Запросы на изменение данных
В примерах выше мы показывали разбор SELECT, однако мы поддерживаем также и DML-запросы в масштабе кластера. Отличие состоит в том, что в таком случае мы не используем локальный SQL, а вместо этого задействуем Tarantool API c помощью нашего Rust-крейта для Tarantool.
Для DML-запросов точно так же нужны motion-узлы для докачки недостающих частей таблицы, и точно так же это зависит от того, какое распределение у детей реляционного узла в каждом случае. Например, мы хотим вставить в таблицу t1
результат читающего запроса из t2
:
insert into t1 select * from t2
Посмотрим на IR:
Пример плана запроса для вставки данных
Так как таблица t2
распределена по второй колонке (d
), потребуется частичное перемещение. Поддерево motion-узла после исполнения сформирует промежуточную таблицу с колонками c
и d
, а роутер для вычисления bucket_id перешардирует ее по колонке c
. В соответствие с bucket_id части этой таблицы отправятся на соответствующие узлы хранения, и лишь после этого можно будет выполнить INSERT. Таким образом, Motion(Segment(c))
отражает новый ключ распределения для временной таблицы для данного поддерева.
Исполнение плана на узлах хранения
Описанные выше действия относятся к работе роутера, который маршрутизирует выполнение запроса. Части запроса рассылаются по RPC на узлы хранения в виде плана, закодированного в бинарном виде. На storage присланное сообщение декодируется в IR и затем десериализуется обратно в SQL:
Исполнение фрагмента плана на узле хранения
Для повышения скорости процесс десериализации использует кэш локального Tarantool.
Вопросы производительности
Получение данных из отдельных шардов неизбежно означает, что в распределенном SQL в принципе невозможно достичь скоростей, характерных для локальных БД. Но мы делаем всё возможное, чтобы исправить «узкие места» там, где это получается сделать, и задать разумные ограничения в остальных случаях.
Мы активно кэшируем запросы, используя параметризацию и шаблоны SQL. Смысл здесь в том, что однажды разобранное внутреннее представление с проверенными метаданными можно переиспользовать: взять из кэша, подставить в него новые константы и сэкономить таким образом на разборе IR каждый раз с нуля.
Например, используя для запроса Lua-обертку, можно делать параметризированные запросы, вроде такого:
pico.sql([[select "name" from "characters" where "id" = ?]], {1})
Следующий подобный запрос, в котором вместо 1
будет другое значение, сможет использовать ранее кэшированный IR. Учитывая то, что компиляция IR>SQL на узлах хранения обходится дорого, кэш очень выручает.
Продолжая тему производительности, нельзя обойти стороной и другую проблему, которая неизбежно возникает при эксплуатации БД: сбор из шардов очень больших объёмов данных. Достаточно сказать, что по естественным причинам разбор массивных запросов в принципе не сулит хорошей производительности. Узкими местами являются как глубокая однопоточность самого Tarantool, так и небесконечность ОЗУ. Чтобы обеспечить хотя бы базовую защиту от подобного DdoS, мы в Picodata добавили ограничения на количество кортежей во временной таблице и операций при исполнении запроса на storage. Благодаря этому, наш распределенный SQL научился вежливо отказываться от непомерно больших запросов/результатов вместо того, чтобы падать.
Мы рекомендуем использовать наш распределенный SQL с запросами разумного размера, особенно, если промежуточные данные помещаются в кэш отдельных узлов, и в целом для запросов с низкой селективностью. Особенно хорошо Picodata работает в тех случаях, когда запрос «попадает» в ключи шардирования и избавляет нас от необходимости обходить все узлы хранения.
Что ещё
Мы стремимся к тому, чтобы сделать распределенный SQL в Picodata максимально дружелюбным и похожим на локальный. Но в нашем недавнем релизе есть и другие интересные нововведения: управление привилегиями и ролями (кластерный ACL), разные способы ведения журнала аудита и даже webui, который уже сейчас очень удобен для мониторинга кластера.
У нас есть «джентльменский набор» для быстрого старта: неплохой выбор пакетов под разные дистрибутивы Linux и сайт документации, где есть справочники и руководства по Picodata.
Мы всегда рады обратной связи! Попробуйте Picodata в деле и поделитесь вашим мнением в нашем Telegram-канале!