Создание плагина для логической репликации в PostgreSQL 9.4+
Как многие интересующися знают, в PostgreSQL в версии 9.4 появилась (наконец-то) логическая репликация. Теперь, чтобы сделать свою репликацию, необязательно разбираться с форматом бинарных wal файлов или писать триггеры (может были еще способы), а преобразовать данные в удобный для себя формат. Для этого достаточно написать плагин к PostgreSQL, который будет этим заниматься. В статье описывается плагин, который преобразует данные в JSON.Код плагина находится на гитхабе — github.com/ildus/decoder_json. Приветсвуются pull-requests с улучшениями (особенно по части улучшения поддержки типов), багфиксами и просто косметическими улучшениями. JSON был выбран за простоту. Это не окончательный вариант, возможно после тестирования на реальных данных окажется что нужен более производительный формат, и придется переделать. В статье я не буду приводить весь код плагина, а только части про которые мне кажется нужно рассказать.
Необходимые требования для создания плагина: знание С, установленные средства сборки (gcc, cmake), установленные пакеты (в debian-системах) postgresql-9.4, postgresql-server-dev-9.4 и аналогичные в других системах. После установки postgresql, в postgresql.conf надо установить значение max_replication_slots = 1 (или больше) и wal_level = logical.
Сам плагин представляет собой подключаемую библиотеку на C, из которой вызываются callback функции на события postgresql. При инициализации вызывается функция _PG_output_plugin_init со структурой, полям которой полям нужно назначить свои функции:
startup_cb — функция, вызываемая при инициализации плагина begin_cb — начало транзакции change_cb — запись данных commit_cb — потверждение транзакции shutdown_cb — деинициализация плагина Функция которая заполняет структуру: void _PG_output_plugin_init (OutputPluginCallbacks *cb) { cb→startup_cb = decoder_json_startup; cb→begin_cb = decoder_json_begin_txn; cb→change_cb = decoder_json_change; cb→commit_cb = decoder_json_commit_txn; cb→shutdown_cb = decoder_json_shutdown; } Теперь осталось определить эти пять функций. decoder_json_startup вызывается в начале декодирования и используется для задания опций декодирования и создания своего контекста памяти: Функция decoder_json_startup /* initialize this plugin */ static void decoder_json_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { ListCell *option; DecoderRawData *data;
data = palloc (sizeof (DecoderRawData)); data→context = AllocSetContextCreate (ctx→context, «Raw decoder context», ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); data→include_transaction = false; data→sort_keys = false;
ctx→output_plugin_private = data;
/* Default output format */ opt→output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
foreach (option, ctx→output_plugin_options) { DefElem *elem = lfirst (option);
Assert (elem→arg == NULL || IsA (elem→arg, String));
if (strcmp (elem→defname, «include_transaction») == 0) { /* if option does not provide a value, it means its value is true */ if (elem→arg == NULL) data→include_transaction = true; else if (! parse_bool (strVal (elem→arg), &data→include_transaction)) ereport (ERROR, (errcode (ERRCODE_INVALID_PARAMETER_VALUE), errmsg («could not parse value \»%s\» for parameter \»%s\», strVal (elem→arg), elem→defname))); } else if (strcmp (elem→defname, «sort_keys») == 0) { /* if option does not provide a value, it means its value is true */ if (elem→arg == NULL) data→sort_keys = true; else if (! parse_bool (strVal (elem→arg), &data→sort_keys)) ereport (ERROR, (errcode (ERRCODE_INVALID_PARAMETER_VALUE), errmsg («could not parse value \»%s\» for parameter \»%s\», strVal (elem→arg), elem→defname))); } else { ereport (ERROR, (errcode (ERRCODE_INVALID_PARAMETER_VALUE), errmsg («option \»%s\» = \»%s\» is unknown», elem→defname, elem→arg? strVal (elem→arg) :»(null)»))); } } } Здесь парсятся параметры, переданные в плагин и сохраняются в структуру. Созданный контекст памяти используется потом в decoder_json_change чтобы корректно почистить используемые ресурсы. Важные моменты: свои данные сохраняются в ctx→output_plugin_private opt→output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT — так задается что вывод плагина будет текстовый decoder_json_shutdown вызывается в конце декодирования, и используется для чистки ресурсов.Функция decoder_json_shutdown /* cleanup this plugin’s resources */ static void decoder_json_shutdown (LogicalDecodingContext *ctx) { DecoderRawData *data = ctx→output_plugin_private;
/* cleanup our own resources via memory context reset */ MemoryContextDelete (data→context); } Дальше самое интересное. Надо определить функции decoder_json_begin_txn, decoder_json_commit_txn и decoder_json_change которые собственно и генерируют строки, получаемые командами pg_logical_slot_peek_changes и pg_logical_slot_get_changes. Сгенеренную строку надо добавить в слот, это делается командами: OutputPluginPrepareWrite (ctx, true); appendStringInfoString (ctx→out, «some string»); OutputPluginWrite (ctx, true); Функции decoder_json_begin_txn и decoder_json_commit_txn пишут (или просто пропускают, если есть такое условие) команды начала и конца транзакции в слот — строки 'begin' и 'commit' соответсвенно.Функция decoder_json_change вызывается на событие изменения данных. В этой функции определяется какое именно событие произошло (INSERT, UPDATE, DELETE) и для каждого из них создается своя структура. Для UPDATE и DELETE важно наличие уникального (not null) или первичного ключа в таблице, иначе просто невозможно определить изменяемую (удаляемую) строку. Это зависит от значения параметра REPLICA IDENTITY для таблицы.
Эта функция принимает 4 параметра:
LogicalDecodingContext *ctx — контекст ReorderBufferTXN *txn Relation relation — сведения о изменяемой таблице ReorderBufferChange *change — сведения о данных Кратко про функцию можно сказать что тип операции определяется через change→action. Далее по данным в change (change→data.tp.newtuple и change→data.tp.oldtuple) создается JSON структура. JSON генерируется с помощью библиотеки libjansson.Вот здесь то и начинаются сложности. Если REPLICA IDENTITY для таблицы установлен в NOTHING или DEFAULT при отсутствующем первичном ключе, то невозможно определить изменяемые строки и в лог попадут только записи добавления. При обновлении или удалении данных с таблицы с DEFAULT, FULL, INDEX и при наличии уникального ключа, то его значение берется из newtuple или из oldtuple (если значение ключа изменяется запросом). При отсутствии уникального ключа и если FULL, то для идентификации используются все значения из oldtuple.
В результате строится JSON структура, вида {«a»: 0, «r»: «public.some_table», «c»: {«id»: 1}, «d»: {«a»: 2}}, где a — это тип действия, r — название таблицы, c — значения для идентификации строки, d — собственно данные.
Проверим работу. Сборка плагина и запуск тестов:
git clone https://github.com/ildus/decoder_json.git cd decoder_json # разрешаем запись всем в папку с библиотеками постгреса — ни в коем случае нельзя делать на продакшене sudo chmod a+rw `pg_config --pkglibdir` chmod a+rwx ./ # скачиваем и собираем libjansson, для генерации JSON make deps # переключаемся под юзера postgres sudo su postgres make make test Тестирование плагина вручную: # переключаемся под юзера postgres, создаем тестовую бд и открываем консоль работы с бд sudo su postgres createdb test_db psql test_db
# psql консоль test_db=# create table test1 (id serial primary key, name varchar); test_db=# SELECT * FROM pg_create_logical_replication_slot ('custom_slot', 'decoder_json'); slot_name | xlog_position -------------±-------------- custom_slot | 0/4D9F870 (1 row) Здесь мы указываем название слота и подключаемый плагин. В ответе мы видим название слота, и место (xlog позиция) с которого собственно начинается запись данных в слот. То что мы указали наш плагин, не означает что он уже работает, само декодирование начинается только когда мы забираем данные. Для этого используются функции pg_logical_slot_peek_changes и pg_logical_slot_get_changes. Они отличаются тем что get функция после получения данных чистит очередь.Добавление данных:
test_db=# insert into test1 values (1, 'bb'); INSERT 0 1 test_db=# insert into test1 values (2, 'bb'); INSERT 0 1 test_db=# select * from pg_logical_slot_get_changes ('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------±------±---------------------------------------------------- 0/BAB0968×48328 | begin 0/BAB0968×48328 | {«a»:0, «r»: «public.test1», «d»:{«id»:1, «name»: «bb»}} 0/BAB09F0×48328 | commit 0/BAB09F0×48329 | begin 0/BAB09F0×48329 | {«a»:0, «r»: «public.test1», «d»:{«id»:2, «name»: «bb»}} 0/BAB0A78×48329 | commit (6 rows) Изменение данных test_db=# update test1 set name = 'dd' where id=2; UPDATE 1 test_db=# select * from pg_logical_slot_get_changes ('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------±------±----------------------------------------------------------------- 0/BB4C700×48338 | begin 0/BB4C700×48338 | {«c»:{«id»:2}, «a»:1, «r»: «public.test1», «d»:{«id»:2, «name»: «dd»}} 0/BB4C798×48338 | commit (3 rows) Удаление данных: test_db=# delete from test1 where id=2; DELETE 1 test_db=# select * from pg_logical_slot_get_changes ('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------±------±---------------------------------------- 0/BB4C8A8×48339 | begin 0/BB4C8A8×48339 | {«c»:{«id»:2}, «a»:2, «r»: «public.test1»} 0/BB4C9C8×48339 | commit (3 rows) Использованные и полезные ресурсы: документация PostgresqlSQL — www.postgresql.org/docs/9.4/static/logicaldecoding.html пример плагина из PostgreSQL (https://github.com/postgres/postgres/tree/master/contrib/test_decoding) michael.otacoo.com/ — очень полезный блог, плагин decoder_raw автора этого блога использовался как основа для моего плагина. github.com/xstevens/decoderbufs — плагин который использует google protocol buffers как выходной формат.