[Из песочницы] Логическая репликация из PostgreSQL в Erlang
Довольно типичная схема при разработке системы, когда основная логика обработки сосредоточена в приложении (в нашем случае Erlang), а данные для работы этого приложения (настройки, профили пользователей и т. д.) в базе данных (PostgreSQL). Приложение Erlang кэширует настройки в ETS для ускорения обработки и снижения нагрузки на БД путём отказа от постоянных запросов. При этом изменение этих данных происходит через отдельный (возможно, внешний) сервис.
В таких ситуациях встаёт задача поддержания закэшированных данных в актуальном состоянии. Есть разные подходы для решения этой задачи. Один из них — это логическая репликация PostgreSQL. О нем и пойдёт речь ниже.
Логическая репликация использует протокол потоковой репликации PostgreSQL для получения изменения данных в таблицах PostgreSQL путём чтения WAL логов, фильтрации нужный таблиц и отправки этих изменений подписчику. Этот механизм аналогичный тому, который используется для физической репликации для создания standby БД.
Логическая репликация предоставляет следующие преимущества:
- получение изменения без задержек в реальном времени;
- фильтрация получаемых изменений по таблицам и операциям (INSERT/DELETE/UPDATE);
- полнота и целостность данных, получаемых подписчиком. Подписчик получает изменения в том же порядке, как они происходили в БД;
- нет потери данных в случае временной остановки подписчика. PostgreSQL запоминает, где остановилась репликация;
Для работы с логической репликацией необходим плагин, который декодирует WAL записи от сервера в более удобный формат.
До версии PostgreSQL 10 можно использовать расширение/extension pglogical_output plugin.
Начиная с PostgreSQL 10 pgoutput plugin.
В этой статье будем рассматривать pgoutput plugin.
На стороне PostgreSQL необходимо выполнить следующие шаги:
Выставить параметры для поддержки логической репликации в
postgresql.confwal_level = 'logical' max_replication_slots = 5 max_wal_senders = 5
Создать роль, которая будет использоваться для репликации. Роль должна иметь атрибут
REPLICATION
илиSUPERUSER
.CREATE USER epgl_test WITH REPLICATION PASSWORD 'epgl_test';
Разрешить доступ для этой роли в pg_hba.conf c
database = replication
host replication epgl_test 127.0.0.1/32 trust
Создать публикацию/publication. При создании публикации мы указываем таблицы, которые мы планируем получать в приложении Erlang
CREATE PUBLICATION epgl_test FOR TABLE public.test_table1, public.test_table3; ALTER PUBLICATION epgl_test ADD TABLE public.test_table2; -- добавить таблицу в уже существующую публикацию
Не так давно поддержка протокола потоковой репликации была добавлена в популярную Erlang библиотеку для работы с PostgreSQL EPGSQL. На основе этой библиотеки мы и будем строить логику получения изменений в Erlang.
Так как формат непосредственно данных в сообщении XlogData
протокола зависит от того, какой плагин используется для слота репликации, библиотека EPGSQL
не декодирует данные, а вызывает Callback-метод или посылает сообщение процессу асинхронно.
Подключение к БД
Должно быть создано специальное репликационное соединение с БД, для этого надо передать флаг replication
.
В рамках репликационного соединение к БД можно выполнять только репликационные команды (например DROP_REPLICATION_SLOT, CREATE_REPLICATION_SLOT).
Выполнить обычный запрос через это соединение нельзя.
Создание репликационного слота
Репликационный слот используются для отслеживания текущей позиции переданного WAL-лога.
При создании репликационного слота задаётся плагин для декодирования.
С версии PostgreSQL 10 появилась возможность создавать временные репликационные слоты, которые автоматически удаляются при закрытии репликационного соединения.
Если приложение считывает начальное состояние таблиц каждый раз при старте, то я рекомендую использовать временные репликационные слоты, в этом случае не надо будет заботиться об удалении созданных репликационных слотов (DROP_REPLICATION_SLOT). Удалять старые/не используемые репликационные слоты крайне важно, потому что PostgreSQL не удаляет WAL логи пока подписчики всех репликационных слотов не получат изменения. Если остался не активный репликационный слот, то WAL логи начнут накапливаться и рано или поздно произойдёт переполнение файловой системы.
Получение начального состояния таблиц
При создании репликационного слота (см. предыдущий шаг), автоматически создаётся snapshot, который показывает состояние базы данных на момент создания слота. Этот snapshot может быть использован для загрузки начального состояния таблиц, которое было на начало репликации.
Snapshot доступен только пока репликационное соединение, в котором была выполнена команда CREATE_REPLICATION_SLOT
не закрыто.
Для загрузки начальных данных должно быть создано новое обычное/не репликационное соединение к БД, так как выполнить SELECT в репликационном соединении нельзя. В этом соединении устанавливаем snapshot SET TRANSACTION SNAPSHOT SnapshotName
и извлекаем нужные данные.
Запуск репликации
Запускаем репликацию для созданного репликационного слота. При запуске репликации передаём дополнительные параметры для плагина, для pgoutput это имя созданной публикации.
Все шаги вместе
start_replication() ->
%% Создание репликационного соединения
{ok, ReplConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}, {replication, "database"}]),
%% Создание репликационного слота
{ok, _, [{_, _, SnapshotName}|_]} = epgsql:squery(ReplConn,
"CREATE_REPLICATION_SLOT epgl_repl_slot TEMPORARY LOGICAL pgoutput").
%% Получение начального состояния таблиц
{ok, NormalConn} = epgsql:connect(Host, User, Password, [{database, DBName}, {port, Port}]),
{ok, _, _} = epgsql:squery(NormalConn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"),
{ok, _, _} = epgsql:squery(NormalConn, ["SET TRANSACTION SNAPSHOT '", SnapshotName, "'"]),
%% select/load data epgsql:equery(NormalConn,...
epgsql:close(NormalConn),
%% Запуск репликации
ReplSlot = "epgl_repl_slot",
Callback = ?MODULE,
CbInitState = #{},
WALPosition = "0/0",
PluginOpts = "proto_version '1', publication_names '\"epgl_test\"'",
ok = epgsql:start_replication(ReplConn, ReplSlot, Callback, CbInitState, WALPosition, PluginOpts).
handle_x_log_data(StartLSN, EndLSN, Data, CbState) ->
io:format("~p~n", [{StartLSN, EndLSN, Data}]),
{ok, EndLSN, EndLSN, CbState}.
Есть два варианта взаимодействия с библиотекой EPGSQL
:
Синхронный. В качестве Callback передаётся имя модуля. Библиотека для полученных данных будет вызывать функцию
CallbackModule:handle_x_log_data
. Функция должна возвращать LastFlushedLSN, LastAppliedLSN, который посылается в ответ PostgreSQL, чтобы отслеживать текущее положение репликационного слота. В своих проектах мы используем только этот вариант;Асинхронный. В качестве Callback передаётся pid процесса, который будет получать сообщения вида
{epgsql, self(), {x_log_data, StartLSN, EndLSN, WALRecord}}
. После обработки процесс должен сообщить обработанный LSN через вызовepgsql:standby_status_update(Conn, FlushedLSN, AppliedLSN)
;
Дополнительно, чтобы использовать описанный подход, необходимо реализовать декодирование сообщений из формата плагина репликационного слота в более привычные для Erlang структуры. Или воспользоваться библиотекой с GitHub, которая реализует декодирование для двух плагинов и упрощает выполнение репликационных команд.