Коннектор ADB-TO-ADB

Foreign Data Wrapper коннектор ADB-TO-ADB

Foreign Data Wrapper коннектор ADB-TO-ADB

Введение

По опыту нашей продуктовой команды разработки и поддержки — пользователи, оперирующие большими объемами данных компании, часто используют несколько разрозненных кластеров Greenplum. 

Мотивация такого решения может быть разной: организационная — разные команды-владельцы бизнес-данных выстраивают свои модели данных, обрабатывают их нужным для них образом; техническая — распределенные по различным дата-центрам кластеры и т.п. Однако рано или поздно возникает задача использовать данные из «соседних» хранилищ. Это могут быть как разовые сценарии единичных запросов, так и организация более сложных ETL-процессов. Реализация подобных механизмов опять-таки может быть различной со своими достоинствами и недостатками, исходя из существующих возможностей и ограничений.

В этой статье рассматриваются детали предлагаемой нами реализации коннектора для выполнения так называемых гетерогенных запросов в рамках разных кластеров Arenadata DB и/или Greenplum — задача, которой наша команда разработки занималась в 2023 году. Этот коннектор позволяет объединять в запросах разные кластеры ADB, но при этом пользоваться возможностями установки соединений между сегментами. 

Но обо всём по порядку.

Существующие подходы и решения

Суть предоставления возможности выполнения гетерогенных запросов — дать пользователю инструментарий для осуществления доступа к данным таким образом, чтобы с точки зрения использования данных создавалось впечатление, будто бы они размещены в той же системе управления данными. Причём делать это, по возможности, максимально эффективно и прозрачно.

Какой же инструментарий есть в нашем распоряжении?

Platform Extension Framework (PXF)

Первое, что приходит на ум — это распространённый среди пользователей PXF-сервис, фреймворк для объединения разных источников данных.

К его достоинствами можно отнести:

  1. Проверенное временем решение на базе открытого исходного кода c возможностью доработки под свои потребности.

  2. Доступный «из коробки» набор коннекторов к популярным источникам данных (Hadoop-стек, доступные через JDBC источники данных, облачные хранилища).

  3. Возможность настройки мониторинга.

Но можно выделить и ряд недостатков:

  1. Необходимость поддерживать обособленное решение на базе своего стека.

  2. Выделение ресурсов, как правило, на тех же серверах, где развернута сама СУБД.

  3. Множественное преобразование и перекладывание одних и тех же данных на пути от представления в СУБД к типам, которыми оперирует сам PXF. 

Расширение postgres_fdw

Расширение postgres_fdw — фундамент или, можно сказать, источник вдохновения многих других существующих коннекторов.

Коннектор физически представляет собой расширение PostgreSQL, в случае Greenplum устанавливаемый на мастер-хост.

Изначально расширение ориентировано на работу с удалённым PostgreSQL, доработанное в рамках апстрима Greenplum для функционирования в распределённой СУБД. Взаимодействие организовано через стандартный протокол и библиотеку libpq, что позволяет прозрачно работать и с Greenplum (разумеется, в стиле «мастер-мастер»).

Взаимодействие вида «мастер-мастер» в postgres_fdw

Взаимодействие вида «мастер-мастер» в postgres_fdw

На схеме выше локальный кластер (обозначенный синим цветом) представлен мастером и парой сегментов, удаленный кластер — мастером (обозначен зеленым).

Для SELECT-запроса схематично процесс взаимодействия выглядит примерно так:

  1. Пользователь формирует SELECT-запрос.

  2. Запрос проходит стандартные этапы парсинга, анализа и планирования.

  3. Расширение устанавливает соединение с удалённым мастером, открывает транзакцию и отправляет запрос (с учётом некоторых нюансов, что не все условия, к примеру, можно отправить на удалённый кластер). 

  4. Обрабатывается ответ, и либо он возвращается пользователю, либо, в случае более сложных запросов, результат обрабатывается как-то иначе.

Отличительной особенностью этой схемы является работа в стиле «мастер-мастер». Сегменты в этом случае никак не задействованы.

В случае INSERT-запросов всё так же идёт через мастер, но при этом задействуется механизм prepared-выражений.

Обработка INSERT в postgres_fdw

Обработка INSERT в postgres_fdw

В чём плюсы для тех, кто решит воспользоваться для организации связи кластера расширением postgres_fdw:

  1. Решение с открытым исходным кодом — собрал и поставил.

  2. Работа в стиле «мастер-мастер» стабильна. Далее я поясню на примере greenplum_fdw, что под этим понимается.

  3. Свежие версии коннектора, которые поддерживают новый интерфейс FDW, умеют частично поддерживать push-down более сложных конструкций в виде соединений и т.п. Этой теме я уделю время в конце статьи.

При этом есть очевидные минусы:

  1. Поддержка параллельной работы (имеется ввиду взаимодействие сегментов между собой напрямую) отсутствует.

  2. Если попробовать вызвать DELETE или UPDATE для таблицы на удалённом кластере, то коннектор будет возвращать ошибки.

Расширение greenplum_fdw

От postgres_fdw перейдём к тяжёлой артиллерии — к greemplum_fdw, входящему в платную enterprise-версию Greenplum. И тут я хочу отметить, что в деле удалось поисследовать только бета-версию, и, возможно, какие-то проблемы, присущие бете, были исправлены в последующих версиях.

Что же не так с greenplum_fdw?

Первое неудобство — это жёсткое задание количества обработчиков. К примеру, локальный кластер развёрнут на 3 primary-сегментах.

Состав локального кластера

Состав локального кластера

Удаленный (remote) Greenplum-кластер при этом не совпадает по количеству сегментов — там их 4.

Состав удалённого кластера

Состав удалённого кластера

Если при объявлении внешней таблицы на локальном кластере не задать нужное количество обработчиков (либо установить его равным такому числу, которое не совпадает с числом ремоут-кластера), то запрос завершается ошибкой.

Ошибка при несоответствии количества сегментов

Ошибка при несоответствии количества сегментов

Задать нужное количество обработчиков — проблема сама по себе не такая большая. Хотя запрашивать у владельцев другого кластера эту информацию не всегда может быть удобным. Хуже, если конфигурация кластера поменяется и не будет произведено переконфигурирование — запросы перестанут работать до момента, пока на локальном кластере не вызвать ALTER SERVER и не настроить нужное число обработчиков.

Ещё более неприятная ситуация может возникнуть в случае, если мы захотим перенести какие-то данные с локального кластера на удалённый путём вставки в foreign-таблицу.

Допустим, на локальном кластере есть таблица, в которой 1100 записей, и мы запрашиваем вставку в таблицу на удалённом кластере.

Для такого запроса вставки в foreign-таблицу планировщик создаст план примерно следующего вида, в котором внизу расположен узел плана Seq Scan (последовательное чтение данных из таблицы), который получает данные локальной таблицы. Далее выполняется Redistribute Motion  данных между сегментами, следующим шагом идёт вставка с каждого сегмента по отдельности.

План выполнения для INSERT

План выполнения для INSERT

В целях эксперимента добавим отображение (сообщение уровня NOTICE) шага коммита транзакции на уровне сегментов.

NOTICE-сообщения отображают событие коммита транзакции для данного сегмента

NOTICE-сообщения отображают событие коммита транзакции для данного сегмента

Как мы видим ниже, с двух сегментов базе данных удалось закоммитить транзакцию, но с третьей произошла некая ошибочная ситуация.

Транзакция для процесса pid=114235 получила ошибку

Транзакция для процесса pid=114235 получила ошибку

В то время, как мы делали один INSERT, который по ожиданиям пользователя реляционной СУБД с поддержкой ACID должен быть атомарным, мы получаем 742 записи в удалённой таблице.

Нарушение принципа атомарности в greenplum_fdw

Нарушение принципа атомарности в greenplum_fdw

Предлагаю разобраться, в чём причина такой ситуации.

Смоделируем её следующим образом. В нашем случае у нас есть локальный кластер из 3 сегментов + мастер и удалённый кластер из мастера и 2 сегментов.

Схема INSERT для сценария с ошибкой вставки в greenplum_fdw

Схема INSERT для сценария с ошибкой вставки в greenplum_fdw

  1. От пользователя приходит INSERT -запрос.

  2. Он проходит стадии обработки запроса, устанавливает соединение и открывает отдельные транзакции с сегментов.

  3. Эти транзакции работают с мастером на удалённом кластере.

Тут важно понимать, что такая схема заложена в архитектуру postgres_fdw, поддерживающего пул соединений, в рамках которых при установлении соединения открывается транзакция. Она же и коммитится отдельно, не имея информации о статусе соседних. В какой-то момент одна из транзакций получает ошибку — и мы получаем приведённый выше результат.

Как итог, к достоинствам бета-версии greenplum_fdw можно отнести поддержку SELECT в MPP-стиле, но возможные проблемы с консистентностью данных при вставке ставят под сомнения возможность использования коннектора как двунаправленного. 

При этом у пользователя нет возможности управлять этими рисками: при выборе для вставки стратегии «мастер к мастеру» для SELECT-запросов также будет аналогичная стратегия. Это сведёт на нет выгоду от использования коннектора, поддерживающего параллельную работу через сегменты.

Также SELECT-запросы довольно жёстко привязаны к конфигурациям по числу сегментов.

Parallel Retrieve Cursors

Для понимания внутреннего устройства adb_fdw-коннектора нужно рассмотреть лежащий в его основе механизм параллельных курсоров. Курсоры, как известно, используются для получения данных построчно, в процедурном стиле. Параллельные курсоры в этом плане ничем не отличаются от своих собратьев.  Они объявляются на координаторе запросов (в терминах Greenplum — GP_ROLE_DISPATCH), однако эндпойнты (о них далее) создаются на сегментах. Их размещение зависит от запроса, а точнее, от результата его планирования. Ещё важно добавить, что для обслуживания эндпойнта — той точки, через которую происходит обмен данными — поднимается отдельный процесс бэкенда.

Рассмотрим три простых примера объявления параллельного курсора, которые приводят к разным конфигурациям размещения эндпойнтов.

Получение данных с мастера

К такой конфигурации эндпойнтов приводит необходимость собрать данные на мастере, например, как в данном случае для сортировки (ORDER BY C1).

Сбор данных на мастере

Сбор данных на мастере

Получение данных с одного из сегментов

Второй вариант — это получение данных от какого-то одного сегмента. Например, в случае если планировщик определяет, что в соответствии с ключом дистрибуции нужно задиспатчить план на какой-то определенный сегмент (WHERE C1=1).

Получение данных с одного из сегментов напрямую

Получение данных с одного из сегментов напрямую

Получение данных от всех сегментов

И третий случай — это получение данных от всех сегментов. Например, получение всех колонок без какой-либо фильтрации WHERE-условиями. Эндпойнты поднимаются на каждом сегменте, и данные могут быть возвращены напрямую с сегментов.

Получение данных со всех сегментов напрямую

Получение данных со всех сегментов напрямую

Чуть подробнее погрузимся в механику процесса эндпойнтов на сегментах. Например, у нас есть пользовательская сессия, в рамках которой на мастер пришел запрос. Он проходит обычную стадию парсинга и планирования.

Как устроены эндпойнты параллельных курсоров

Как устроены эндпойнты параллельных курсоров

Следующим шагом план или какая-то часть плана диспетчеризируется на сегмент и обрабатывается в рамках процесса бэкенда.

В нашем случае, так как это запрос на создание параллельного курсора для лежащего в его основе SELECT-запроса, бэкенд создаёт необходимую обвязку для обслуживания канала получения данных клиентом.

Для этого, в частности, в общей памяти поднимается очередь сообщений для обмена данным с тем самым отдельным процессом бэкенда, который используется клиентом для получения данных.  С этой целью клиент поднимает специальное utility-соединение, в рамках которого допускается только извлечение данных параллельного курсора. На схеме выше это соединение обозначено настройкой gp_retrieve_conn = true .

Именно этот механизм лежит в основе adb_fdw-коннектора.

Коннектор ADB-TO-ADB (adb_fdw)

Для создания конкурентных преимуществ на текущем этапе развития  в нашем adb_fdw-коннекторе были реализованы некоторые альтернативные технические решения.

Для начала посмотрим на него с пользовательской точки зрения.

Точкой входа во внешний кластер выступает определение SERVER — это мастер удалённого кластера. Ключевое слово mpp_execute, заданное как master, не означает, что все преимущества MPP для выполнения SELECT-запросов  сведены на нет. Наш коннектор сам определяет нужную конфигурацию обработчиков. Значение master необходимо, чтобы вставка работала только в режиме «мастер-мастер».

Также можно задать число обработчиков либо запросить коннектор самостоятельно определять число обработчиков на основе планирования запроса на удалённом кластере. Что является довольно удобным.

Далее, как обычно для FDW-коннекторов, задаётся пользовательский маппинг, и объявляется внешняя таблица.

Объявление SERVER, FOREIGN TABLE и пользовательского маппинга

Объявление SERVER, FOREIGN TABLE и пользовательского маппинга

В чем особенности работы нашего коннектора?

SELECT-запросы

Обработка SELECT-запросов в adb_fdw

Обработка SELECT-запросов в adb_fdw

Всё та же конфигурация кластера: 3 локальных сегмента и 2 удалённых. Пользователь делает запрос на SELECT трёх колонок без фильтров. Запрос в виде создания параллельного курсора прилетает на удалённый мастер. Удалённый мастер создаёт эндпойнты и возвращает локальному мастеру при вызове им функции gp_get_endpoints() служебную информацию по этим эндпойнтам.

Так как мы получили информацию, что эндпойнтов два, то нам будет достаточно двух обработчиков, которые и поднимаются в виде процессов бэкенда на локальном кластере (белые прямоугольники внутри зелёных на схеме выше). Сегменты поднимают служебное соединение — так называемую retrieve-сессию — и начинают параллельно получать данные напрямую с сегментов, отдавая их мастеру, так как в нашем случае этого требует запрос.

INSERT-запросы

На текущий момент не существует надёжного механизма, чтобы обеспечить целостность вставки в режиме параллельной работы коннектора. В будущих версиях ядра можно ожидать такую поддержку, но готового механизма всё ещё нет.

Обработка INSERT-запросов в adb_fdw

Обработка INSERT-запросов в adb_fdw

В нашем коннекторе целостность поставлена во главу угла — поддерживается вставка в стиле postgres_fdw в режиме «мастер-мастер».

Дальнейшие планы

В свежих версиях PostgreSQL и, как следствие, в Greenplum 7 интерфейс доступа к внешним источникам данных FDW расширился как несколькими новыми функциями, так и новыми параметрами существующих функций.

Суть этих изменений — в предоставлении возможности поддержки push-down как соединений, так и агрегатных функций. Это выводит подобные коннекторы на новый уровень в плане возможности строить более оптимальные планы, сокращать объёмы передаваемых данных и, в целом, ускорять выполнение запросов. 

Рассмотрение этой интересной и объёмной темы — предмет отдельной статьи. Сейчас хочется вкратце затронуть тему сложностей и подводных камней этих ожидаемых фич, которые мы планируем реализовать в следующих версиях коннектора в рамках Arenadata DB версии 7X.

Ограничения предикатов

Не все предикаты можно передать на удалённый кластер. Есть ограничения по возможности передачи выражения с заданными правилами сортировки (COLLATION). Мотивацией такого типа ограничений является то, что на удалённом кластере нельзя рассчитывать на такое же окружение, чтобы правила сортировки «не по умолчанию» сработали именно так, как на локальном кластере.

При явно заданных правилах сортировки (т.е. отличных от default collation с OID = 100) для отправки на удалённый кластер допускаются только выражения, которые связаны с объявлением для внешней таблицы (FOREIGN TABLE). Таким образом, если правило сортировки для колонки явно задано при объявлении внешней таблицы, то это условие считается безопасным к отправке условием. Напротив, если для выражения выводится некоторое правило сортировки, отличное от default, то это считается небезопасным и обрабатывается локально. Например, для данного выражения сравнения колонки внешней таблицы с константой, для которой задано правило сортировки, фильтр применяется локально, уже после вычитывания всей таблицы:

adb=# explain select count(c1) from foreign_table as ft where ft.c1 like 'foo' COLLATE "POSIX";
                            QUERY PLAN                             
-------------------------------------------------------------------
 Aggregate  (cost=3897.97..3897.98 rows=1 width=8)
   ->  Foreign Scan on ft  (cost=100.00..3897.72 rows=100 width=16)
         Filter: (c1 ~~ 'foo'::text COLLATE "POSIX")
 Optimizer: Postgres-based planner
(4 rows)

В случае выведения правила по умолчанию происходит push-down на удаленный кластер:

adb=# explain select count(ft.c1) from foreign_table ft where ft.c1 like 'foo';
                      QUERY PLAN                      
------------------------------------------------------
 Foreign Scan  (cost=1894.20..1894.23 rows=1 width=8)
   Relations: Aggregate on (public.ft)
 Optimizer: Postgres-based planner
(3 rows)

Также не допускаются к отправке на удалённый кластер выражения с волатильными функциями (VOLATILE). Простейшие примеры функций:  random() или now(). Для таких запросов условие обрабатывается локально:

adb=# explain select * from foreign_table as ft where ft.c1 >= random();
                          QUERY PLAN                           
---------------------------------------------------------------
 Foreign Scan on ft  (cost=100.00..4398.60 rows=33392 width=16)
   Filter: ((c1)::double precision >= random())
 Optimizer: Postgres-based planner

Joins push-down

Возможность передать задачу соединений таблиц во внешний кластер (так называемый join push-down), если они фигурируют в запросе, выглядит крайне полезной. Для начала по основным ограничениям:

  1. Базовое требование — таблицы должны быть связаны с одним и тем же внешним сервером (имеется в виду описание SERVER).

  2. Join push-down на текущий момент поддерживается только для внутренних (JOIN_INNER), левых и правых внешних соединений (JOIN_LEFT,  JOIN_RIGHT), а также полных соединений (JOIN_FULL). Дальнейшее развитие предполагает SEMI- и ANTI- соединения для поддержки SQL-конструкций вида NOT IN и ANY.

  3. Внутренняя и внешняя таблицы соединения должны позволять передавать предикаты соединения, предикаты-фильтры выборки во внешний кластер. Если предикат не может быть передан во внешний кластер, то push-down невозможен. Например, тут push-down возможен:

adb=# explain select count(*) from ft_a inner join ft_b on ft_a.id = ft_b.id where ft_a.ts >= current_date::timestamp;
                          QUERY PLAN                          
--------------------------------------------------------------
 Foreign Scan  (cost=350.53..350.56 rows=1 width=8)
   Relations: Aggregate on ((public.ft_a) INNER JOIN (public.ft_b))
 Optimizer: Postgres-based planner
(3 rows)

В данном случае выражение ft_a.ts >= current_date::timestamp может быть преобразовано в константу, передано в таком виде на удалённый кластер и выступать условием выборки.

Однако в случае такого запроса условие ft_a.ts >= current_timestamp не может быть передано, так как функция current_timestamp не является IMMUTABLE (точнее, является STABLE). Что приводит к двум отдельным Foreign Scan, локальному соединению и применению условия (Filter) для Foreign Scan:

adb=# explain select count(*) from ft_a inner join ft_b on ft_a.id = ft_b.id where ft_a.ts >= current_timestamp;
                                       QUERY PLAN                                        
-----------------------------------------------------------------------------------------
 Aggregate  (cost=664.92..664.93 rows=1 width=8)
   ->  Hash Join  (cost=239.04..658.24 rows=2670 width=0)
         Hash Cond: (ft_a.id = ft_b.id)
         ->  Foreign Scan on ft_a  (cost=100.00..480.00 rows=3333 width=12)
               Filter: (ts >= '2024-04-07 16:49:38.585287+03'::timestamp with time zone)
         ->  Hash  (cost=129.03..129.03 rows=801 width=4)
               ->  Foreign Scan on ft_b  (cost=100.00..129.03 rows=801 width=4)
 Optimizer: Postgres-based planner
(8 rows)

Aggregates push-down

Для того чтобы агрегатная функция могла быть передана для выполнения на удалённом кластере, должно выполняться несколько условий:

  • Если в выборке используется предикат, который не может быть отправлен на удалённый кластер, то такая агрегация может быть осуществлена только локально, так как условие предиката должно быть применено только в процессе выборки, а не после. Например, условие ft_a.id >= random() не позволяет сделать push-down агрегатной функции count(*), в то время как ft_a.ts >= current_date::timestamp позволяет.

  • Это должна быть либо простая агрегатная функция, например count,  min,  max и т.п., либо функция, поддерживающая частичную агрегацию (partial aggregation). Также с точки зрения контекста выполнения функции частичной агрегации это должна быть начальная фаза частичной агрегации.

  • Для второго случая частичной агрегации должны выполняться условия:  

    • Агрегатная функция не должна быть сортирующей (Ordered-Set Aggregate Functions не поддерживаются). В эту категорию попадают процентили (percentile), так как для их вычисления требуется упорядоченный набор значений.

    • Для агрегатной функции должно отсутствовать определение финальной функции (aggfinalfn): таким образом её применение не потребует фазы вычисления результирующего значения.

    • Если все-таки aggfinalfn объявлена вместе с функцией aggcombinefn, то поддерживается только определенный набор таких функций (в основном это вариации avg и sum).

  • Также не поддерживаются выражения c GROUPING SETS.

В чем же смысл подобных ограничений? Для распределённых СУБД вычисление простых агрегатов не представляет особых сложностей. Отдельные значения можно подсчитать на сегментах и в результирующей функции произвести финальный подсчёт. Несложно прикинуть такое разбиение на подзадачи для min,  max,  count и других подобных функций.

С функцией avg уже интереснее: для avg при возврате данных от сегмента помимо вычисленного среднего значения требуется и общее число строк, для которых это среднее было рассчитано. Только так финальная функция сможет рассчитать итоговое значение. По этой причине avg может быть диспетчеризирована на сегменты в виде array[count(), sum()]-выражения (для чисел с плавающей точкой выражение выглядит чуть сложнее).

Итоги

По сравнению с конкурирующими решениями, у коннектора adb_fdw есть ряд преимуществ, которые выгодно отличают его от рассмотренных альтернатив:

  1. Как и greenplum_fdw, наше решение поддерживает параллельную работу через сегменты в SELECT-запросах там, где запросы это позволяют делать, но осуществляет это в более удобном для пользователя варианте: отсутствует необходимость отслеживать число обработчиков.

  2. Взаимодействие «мастер-мастер» при INSERT-запросах позволяет нам не натыкаться на те проблемы целостности данных, которые могут встречаться в greenplum_fdw.

  3. Также в adb-fdw более аккуратно обрабатывается ситуация с отсутствием на текущий момент поддержки DELETE и UPDATE-запросов.

Внедрение поддержи push-down функций агрегатов и соединений в следующей версии коннектора adb_fdw ещё больше расширит области его применения, сократит объёмы передаваемых данных и увеличит скорость взаимодействия в наиболее распространённых сценариях использования.

© Habrahabr.ru