Доступ к таблицам из Си расширений для Postgres
В этот раз я расскажу не про использование Python или очередной трюк с CSS/HTML и, увы, не про то, как я 5 лет портировал Вангеры, а про один важный аспект написания расширений для замечательной СУБД PostgresSQL.
На самом деле, уже есть достаточно много статей о том, как написать расширение для Postgres на Си (к примеру, эта), в том числе и на русском языке. Но, как правило, в них описываются достаточно простые случаи. В этих статьях и инструкциях авторы реализуют функции, которые получают на вход данные, как-то их обрабатывают, а затем возвращают одно число, строку или пользовательский тип. В них нет пояснений, что делать, если из Си кода нужно пробежаться по обычной таблице, существующей в базе, или индексу.
К таблицам из Си можно получить доступ через хорошо описанный, но медленный SPI (Server Programming Interface), так же есть очень сложный способ, через буферы, а я расскажу про компромиссный вариант. Под катом я постарался дать примеры кода с подробными пояснениями.
Предполагаю, что свои простенькие функции вы уже писали, и видели, что они объявляются хитрым способом:
Datum to_regclass(PG_FUNCTION_ARGS);
и просто так вызвать такую функцию нельзя. Давайте лучше сразу разберём на примере функции to_regclass:
Datum my_index_oid_datum = DirectFunctionCall1(to_regclass, CStringGetDatum("my_index"));
Oid my_index_oid = DatumGetObjectId(my_index_oid_datum);
В этом коде я вызываю функцию to_regclass, с помощью макроса, чтобы преобразовать имя объекта базы данных (индекса, таблицы и т.д.) в его Oid (уникальный номер в каталоге). У этой функции только один аргумент, поэтому у макроса с говорящим названием DirectFunctionCall1 в конце стоит единица. В файле include/fmgr.h объявлены такие макросы вплоть до 9 аргументов. Сами аргументы всегда представлены универсальным типом Datum, именно поэтому Си строка «my_index» приводится к Datum посредством функции CStringGetDatum. Постгресовые функции в принципе общаются посредством Datum, поэтому и результатом работы нашего макроса будет значение типа Datum. После этого требуется преобразовать его к типу Oid при помощи макроса DatumGetObjectId. Все возможные варианты конвертации нужно смотреть тут: include/postgres.h.
Также поясню ещё одну вещь: в Си принято объявлять переменные в начале блока, но я для наглядности объявляю их там, где начинаю использовать. На практике так не пишут.
Сразу поясню почему SPI медленный. Дело в том, что запрос, выполненный с помощью SPI, проходит все этапы разбора и планирования. Кроме того, идти простым путём, где нет магии, мне кажется неинтересным.
Следующее, про что хочется сказать, это названия — в Postgres они путают! Из-за долгой истории проекта в коде осталось много странных названий для типов, методов и функций.
Прежде, чем читать дальше, желательно иметь базовые представления о MVCC Postgres. Все приведённые ниже примеры работают только в рамках уже созданной транзакции и, если вы вдруг залезли туда, где её ещё нет, то вам понадобится куда больше магии.
Итак, предположим, мы хотим просто пробежаться по таблице, которая содержит два поля: int id и text nickname, и вывести их в лог. Для начала нам надо открыть heap (таблицу) с определённой блокировкой:
RangeVar *table_rv = makeRangeVar("public", "my_table", -1);
Relation table_heap = heap_openrv(table_rv, AccessShareLock);
Вместо функции heap_openrv можно использовать heap_open, у которой первый аргумент это Oid таблицы (можно получить при помощи функции в первой части статьи). Думаю, назначение RangeVar интуитивно понятно, а вот на блокировках остановимся подробнее. Типы блокировок объявлены в файле include/storage/lockdefs.h с достаточно понятными комментариями. Вы можете увидеть эту информацию в таблице:
AccessShareLock | SELECT |
RowShareLock | SELECT FOR UPDATE/FOR SHARE |
RowExclusiveLock | INSERT, UPDATE, DELETE |
ShareUpdateExclusiveLock | VACUUM (non-FULL), ANALYZE, CREATE INDEX CONCURRENTLY |
ShareLock | CREATE INDEX (WITHOUT CONCURRENTLY) |
ShareRowExclusiveLock | like EXCLUSIVE MODE, but allows ROW SHARE |
ExclusiveLock | blocks ROW SHARE/SELECT…FOR UPDATE |
AccessExclusiveLock | ALTER TABLE, DROP TABLE, VACUUM FULL, and unqualified LOCK TABLE |
Так как мы хотели только пробежаться по табличке, то есть выполнить, SeqScan, то выбираем AccessShareLock. После того, как мы открыли heap, нам необходимо проинициализировать процесс сканирования таблицы:
HeapScanDesc heapScan = heap_beginscan(sr_plans_heap, SnapshotSelf, 0, (ScanKey) NULL);
Как и ожидалось, этой функции первым аргументом передаём наш heap, а вот SnapshotSelf требует пояснений. Работа MVCC в Postgres предполагает, что в каждый момент времени может существовать несколько версий одной строки таблицы, и именно в snapshot (моментальный снимок транзакции) записано, какие мы можем видеть, а какие нет. Кроме SnapshotSelf, т.е. текущего snapshot транзакции, есть, к примеру, SnapshotAny, подставив который, мы бы смогли увидеть также все удалённые и изменённые tuples (строки таблицы). Другие виды можете глянуть в include/utils/tqual.h. Следующие аргументы у heap_beginscan — это количество ключей поиска (ScanKey) и собственно сами ключи. Ключи поиска (ScanKey) — это по сути условия, т.е. то, что вы пишете в WHERE. Для работы с heap ключи поиска не очень нужны, т.к. вы всегда сами в своём коде можете сделать проверку условий. А вот при поиске по индексу мы от их инициализации и использования никуда не уйдём.
И теперь самое главное, явись цикл:
Datum values[2];
bool nulls[2];
for (;;)
{
HeapTuple local_tuple;
local_tuple = heap_getnext(heapScan, ForwardScanDirection);
if (local_tuple == NULL)
break;
heap_deform_tuple(local_tuple, table_heap->rd_att, values, nulls);
elog(WARNING,
"Test id:%i nick:%s",
DatumGetInt32(values[0]),
TextDatumGetCString(values[1])
);
}
В этом цикле мы вызываем функцию heap_getnext, с помощью которой получаем следующий tuple, пока нам не вернётся нулевой указатель. Функция heap_getnext получает наш HeapScanDesc и направление сканирования, для нас будут актуальны два: это прямое сканирование — ForwardScanDirection и обратное — BackwardScanDirection. Теперь нам осталось распаковать tuple и получить доступ к его полям, для этого мы вызываем heap_deform_tuple, куда передаём наш tuple, после чего его описание (которое берём из heap), и два массива (один для значений, а другой для определения NULL значений). Дальше, пользуясь уже знакомыми для нас функциями, преобразуем элементы массива values (состоящего из Datum) к обычным Си типам.
А теперь не забудем закрыть наше сканирование heap (таблицы) и закрыть сам heap:
heap_endscan(heapScan);
heap_close(sr_plans_heap, AccessShareLock);
Закрываем heap мы с таким же типом блокировки, с которым и открывали его.
API поиска по индексу будет аналогичным поиску по heap, но только потребует больше строк кода для инициализации. В коде постараемся выводить сообщения только для строк, где первый аргумент даёт ответ на главный вопрос жизни, вселенной и всего такого. Как и для heap, для начала приведём кусок кода со всеми подготовительными работами:
RangeVar *table_rv = makeRangeVar("public", "my_table", -1);
Relation table_heap = heap_openrv(table_rv, AccessShareLock);
table_idx_oid = DatumGetObjectId(DirectFunctionCall1(
to_regclass,
StringGetDatum("my_table_idx")
));
Relation table_idx_rel = index_open(table_idx_oid, AccessShareLock);
indexScan = index_beginscan(table_heap, table_idx_rel, SnapshotSelf, 1, 0);
ScanKeyData key;
ScanKeyInit(&key, 1, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(42));
index_rescan(indexScan, &key, 1, NULL, 0);
Итак, для поиска по индексу нам так же, как и в прошлом примере, надо открыть heap (таблицу) с которой связан этот индекс. При помощи to_regclass мы находим oid для нашего индекса my_table_idx, после открываем его с помощью index_open, находя нужный нам Relation. После чего инициализируем процесс сканирования индекса index_beginscan, тут главное отличие от heap_beginscan в том, что у нас будет 1 ключ поиска (ScanKey).
ScanKeyInit, как понятно из названия, инициализирует ключ для поиска. Первым аргументом идёт сам ключ (тип ScanKeyData), после мы указываем порядковый номер аргумента, по которому будет вестись поиск (нумерация с 1), дальше идёт стратегия поиска. По факту это похоже на оператор в условии (остальные стратегии можно посмотреть тут include/access/startnum.h), после мы указываем непосредственно oid функции, которая будет проводить нашу операцию сравнения (эти oid объявлены в файле include/utils/fmgroids.h). И наконец, последний наш аргумент — это Datum, который должен содержать значение, по которому должен производится поиск.
Дальше идёт ещё одна новая функция index_rescan, и служит она для того, чтобы запустить поиск по индексу. Одного index_beginscan тут недостаточно. На вход эта функция получает список ключей (у нас только один), количество этих ключей, после чего ключи для сортировки и количество ключей для сортировки (они используются для условия ORDER BY, которого в этом примере нет). Вроде все подготовки прошли, и можно показывать основной цикл, правда, он будет очень похож на то что было с heap:
for (;;)
{
HeapTuple local_tuple;
ItemPointer tid = index_getnext_tid(indexScan, ForwardScanDirection);
if (tid == NULL)
break;
local_tuple = index_fetch_heap(indexScan);
heap_deform_tuple(local_tuple, table_heap->rd_att, values, nulls);
elog(WARNING,
"Test id:%i nick:%s",
DatumGetInt32(values[0]),
DatumGetCString(PG_DETOAST_DATUM(values[1]))
);
}
Так как теперь мы бежим по индексу, а не самому heap, то получаем мы ItemPointer, специальный указатель на запись в индексе, (если вас интересуют подробности, обращайтесь к соответствующей документации www.postgresql.org/docs/9.5/static/storage-page-layout.html или сразу к файлу include/storage/bufpage.h) используя который мы ещё должны получить tuple из heap. В данном цикле index_getnext_tid функционально похож на heap_getnext и добавляется только index_fetch_heap, а остальное полностью аналогично.
Для окончания нашей операции, как можно догадаться, нам надо будет закрыть поиск по индексу, сам индекс и открытый heap:
index_endscan(indexScan);
index_close(table_idx_rel, heap_lock);
heap_close(table_heap, heap_lock);
Итак, мы научились делать SeqScan и IndexScan, т.е. искать по нашей табличке и даже использовать для этого индекс, но как теперь в неё что-то добавить? Для этого нам нужны будут функции simple_heap_insert и index_insert.
Прежде чем менять что-то в таблице и в связанных индексах, их надо открыть с нужными блокировками способом, показанным ранее, после чего можно выполнять вставку:
values[0] = Int32GetDatum(42);
values[1] = CStringGetDatum("First q");
tuple = heap_form_tuple(table_heap->rd_att, values, nulls);
simple_heap_insert(table_heap, tuple);
index_insert(
table_idx_rel,
values,
nulls,
&(tuple->t_self),
table_heap,
UNIQUE_CHECK_NO
);
Тут мы делаем обратную операцию, т.е. из массивов values и nulls формируем tuple
после чего добавляем его в heap и потом добавляем соответствующую запись в индекс. После предыдущих объяснений это код для вас должен быть понятен.
Для обновления tuple нужно сделать следующее:
values[0] = Int32GetDatum(42);
replaces[0] = true;
newtuple = heap_modify_tuple(
local_tuple,
RelationGetDescr(table_heap),
values,
nulls,
replaces
);
simple_heap_update(table_heap, &newtuple->t_self, newtuple);
У нас появляется массив булевых переменных replaces, где хранится информация, какое поле изменилось. После чего мы формируем новый tuple на базе старого, но с нашими правками при помощи heap_modify_tuple. И в самом конце выполняем само обновление simple_heap_update. Так как у нас появился новый tuple, а старый помечен как удаленный, то нужно также добавить запись в индекс для нового tuple способом, что был показан ранее.
Теперь удалить tuple не составит труда с функцией simple_heap_delete, и она не требует пояснений. Хочется отметить, что соответствующую запись в индексе удалять не нужно, это произойдёт автоматически при выполнении очистки операцией VACUUM.
Мы научились получать доступ к таблице из Си кода, в том числе по индексу. Я постарался максимально подробно описать каждую функцию и её назначение, но, если что-то оказалось непонятно, спрашивайте в комментариях, я постараюсь ответить и дополнить статью.