Пишем свой FDW для PostgreSQL

Привет, Хабр!
В этой статье быстро разберём, как устроен PostgreSQL — от хранения данных в Heap и работы с FSM/VM до создания своего FDW с компрессией на базе zlib.
Архитектура PostgreSQL
Heap: основа хранения данных в PostgreSQL
PostgreSQL использует Heap как основной механизм хранения данных. Каждая таблица представлена в виде набора блоков фиксированного размера (по умолчанию 8 КБ). Эти блоки размещаются в файлах внутри каталога данных PostgreSQL, а сами файлы именуются в соответствии с relfilenode.
Записи в таблицах (или «tuple») хранятся внутри блоков, а доступ к ним осуществляется через внутренние указатели. Когда происходит операция INSERT, UPDATE или DELETE, PostgreSQL решает, как разместить данные, минимизируя фрагментацию.
Каждый 8-КБ блок данных (страница, Page) в PostgreSQL имеет фиксированную структуру:
Компонент блока | Описание |
---|---|
PageHeaderData | Заголовок блока, содержит служебные данные |
ItemIdData | Таблица смещений, указывающая на начало каждой строки в блоке |
Tuple Data | Собственно данные (tuple) |
Free Space | Свободное место для новых записей |
Каждая строка в PostgreSQL состоит из заголовка и самих данных:
Поле | Описание |
---|---|
xmin | ID транзакции, которая создала запись |
xmax | ID транзакции, которая удалила/обновила запись |
ctid | Указатель на саму запись (или новую версию, если был UPDATE) |
data | Собственно данные записи |
PostgreSQL использует MVCC, что означает, что UPDATE и DELETE фактически не переписывают существующую строку, а создают новую версию. Это ведет к накоплению мусорных данных, что требует периодического выполнения VACUUM, чтобы освободить место.
Как узнать размеры блоков и страниц в PostgreSQL:
SELECT pg_relation_size('your_table'); -- размер таблицы в байтах
SELECT relpages FROM pg_class WHERE relname = 'your_table'; -- количество страниц
Free Space Map (FSM)
Free Space Map (FSM) — это механизм PostgreSQL, который помогает управлять свободным местом в блоках таблицы. Когда выполняется INSERT, PostgreSQL сначала обращается к FSM, чтобы найти, где есть место для записи, а не перебирает всю таблицу.
FSM — это битовая карта, где для каждого блока хранится информация о доступном пространстве:
Если в блоке достаточно места — FSM отмечает его как «свободный».
Если блок переполнен — он исключается из FSM.
VACUUM обновляет FSM, указывая, какие блоки теперь имеют свободное место.
Как посмотреть данные FSM:
SELECT pg_freespace('your_table');
Эта команда покажет, сколько свободного места осталось в каждом блоке таблицы.
Visibility Map (VM)
Visibility Map — это вспомогательная структура, которая хранит информацию о том, какие блоки данных не требуют дополнительной проверки при выполнении VACUUM и индексного сканирования.
Зачем нужен VM:
Оптимизация VACUUM: если все строки в блоке уже видимы для всех транзакций, VACUUM может пропустить этот блок, снижая нагрузку на диск.
Ускорение индексного сканирования: если блок полностью «видимый», PostgreSQL может пропустить проверку xmin/xmax, ускоряя SELECT‑запросы.
Как проверить статус Visibility Map:
SELECT relallvisible FROM pg_class WHERE relname = 'your_table';
Если relallvisible > 0
, значит в таблице есть блоки, которые не требуют VACUUM.
pg_filenode
Каждая таблица, индекс или другой объект базы данных представлен как файл в файловой системе. PostgreSQL управляет этим с помощью pg_filenode
.
Команда | Описание |
---|---|
oid | Уникальный идентификатор объекта |
relfilenode | Имя физического файла, в котором хранятся данные |
relpages | Количество страниц в файле |
Как узнать, в каком файле хранится таблица:
SELECT oid, relfilenode, relname FROM pg_class WHERE relname = 'your_table';
Этот запрос показывает pg_filenode таблицы, по которому можно найти соответствующий файл в каталоге базы данных.
Файлы с данными таблиц и индексов хранятся в каталоге PGDATA/base/
. Например:
ls $PGDATA/base/16384/
Если relfilenode
таблицы 12 345, то данные этой таблицы будут храниться в файле $PGDATA/base/16384/12345
.
Foreign Data Wrapper (FDW)
FDW — это механизм, позволяющий PostgreSQL работать с данными, которые хранятся за пределами стандартной системы хранения.
FDW — это набор хук‑функций, которые вызываются сервером PostgreSQL при выполнении следующих операций:
Чтение (SELECT)
Запись (INSERT, UPDATE, DELETE)
Сканирование (Foreign Scan)
Планирование запроса (Planner Hook)
FDW позволяет подменить стандартное хранение и реализовать альтернативный файловый backend, например:
Хранение данных в другом формате (например, с компрессией).
Подключение к внешним источникам (например, к NoSQL или облачному хранилищу).
Создание собственной структуры хранения (например, файловая система, оптимизированная под конкретные задачи).
Разработка собственного FDW
Создадим расширение на языке C, которое реализует интерфейс FDW. Расширение будет состоять из нескольких компонентов.
Для начала создадим базовую структуру расширения. Проект будет набором файлов:
custom_fdw.c
— основной код FDW.Makefile
— для сборки расширения.custom_fdw.control
— описание расширения для PostgreSQL.
Начнем с основного файла custom_fdw.c
:
#include "postgres.h"
#include "fmgr.h"
#include "access/reloptions.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "utils/elog.h"
#include "catalog/pg_type.h"
#include "commands/explain.h"
#include "executor/executor.h"
#include "utils/builtins.h"
#include "nodes/pg_list.h"
#include
#include
#include
/* Определяем структуру для хранения состояния FDW */
typedef struct CustomFdwState {
char *storage_path;
FILE *data_file;
} CustomFdwState;
/* Прототипы наших функций FDW */
extern Datum custom_fdw_handler(PG_FUNCTION_ARGS);
extern Datum custom_fdw_validator(PG_FUNCTION_ARGS);
static void customBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *customIterateForeignScan(ForeignScanState *node);
static void customEndForeignScan(ForeignScanState *node);
static void customReScanForeignScan(ForeignScanState *node);
/* Здесь также можно добавить прототипы для функций планирования, если они вам понадобятся */
static void customGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);
static void customGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);
static ForeignScan *customGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid,
ForeignPath *best_path, List *tlist, List *scan_clauses);
Самая первая функция, которую должен экспортировать FDW — это обработчик, возвращающий структуру FdwRoutine с указанием всех реализованных методов:
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(custom_fdw_handler);
PG_FUNCTION_INFO_V1(custom_fdw_validator);
Datum
custom_fdw_handler(PG_FUNCTION_ARGS)
{
FdwRoutine *fdw_routine = makeNode(FdwRoutine);
/* Привязываем наши функции к FDW */
fdw_routine->GetForeignRelSize = customGetForeignRelSize;
fdw_routine->GetForeignPaths = customGetForeignPaths;
fdw_routine->GetForeignPlan = customGetForeignPlan;
fdw_routine->BeginForeignScan = customBeginForeignScan;
fdw_routine->IterateForeignScan = customIterateForeignScan;
fdw_routine->ReScanForeignScan = customReScanForeignScan;
fdw_routine->EndForeignScan = customEndForeignScan;
PG_RETURN_POINTER(fdw_routine);
}
/* Простая функция-валидатор, чтобы проверить корректность переданных опций */
Datum
custom_fdw_validator(PG_FUNCTION_ARGS)
{
/* Здесь можно реализовать проверку опций, переданных в CREATE SERVER или FOREIGN TABLE */
PG_RETURN_VOID();
}
Теперь перейдем к основе нашего FDW — операциям чтения данных. Здесь реализуем функции, которые вызываются при выполнении операций сканирования.
Функция customBeginForeignScan
открывает файл, в котором хранится кастомный набор данных.
static void
customBeginForeignScan(ForeignScanState *node, int eflags)
{
CustomFdwState *state = palloc(sizeof(CustomFdwState));
/* Пусть путь к файлу хранится как опция, но для простоты зададим его статически */
state->storage_path = "data/custom_storage.dat";
state->data_file = fopen(state->storage_path, "rb");
if (state->data_file == NULL)
elog(ERROR, "Не удалось открыть файл кастомного хранилища: %s", state->storage_path);
/* Сохраняем состояние в структуре FDW */
node->fdw_state = (void *) state;
}
Функция customIterateForeignScan
отвечает за возврат следующей строки данных в виде tuple
. Тут начинается парсинг и преобразование сырых данных в формат, понятный PostgreSQL.
static TupleTableSlot *
customIterateForeignScan(ForeignScanState *node)
{
CustomFdwState *state = (CustomFdwState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
bool eof = false;
char buffer[1024];
/* Пробуем прочитать строку из файла */
if (fgets(buffer, sizeof(buffer), state->data_file) == NULL)
eof = true;
if (eof)
{
ExecClearTuple(slot);
return slot;
}
/* Представим, что данные разделены запятыми, и у нас два столбца:
первый – целочисленный идентификатор, второй – текстовое поле */
char *token = strtok(buffer, ",");
if (token == NULL)
{
elog(WARNING, "Неверный формат строки: %s", buffer);
ExecClearTuple(slot);
return slot;
}
int col1 = atoi(token);
token = strtok(NULL, ",");
char *col2 = token ? pstrdup(token) : NULL;
if (col2 && col2[strlen(col2)-1] == '\n')
col2[strlen(col2)-1] = '\0';
/* Создаем массив Datum и массив флагов isnull */
Datum values[2];
bool nulls[2] = {false, false};
values[0] = Int32GetDatum(col1);
values[1] = CStringGetTextDatum(col2);
/* Формируем tuple */
slot = ExecClearTuple(slot);
slot = ExecStoreTuple(heap_form_tuple(slot->tts_tupleDescriptor, values, nulls), slot, false);
return slot;
}
Не забываем о функции перезапуска — она нужна для повторного прохода по данным, и завершающая функция для корректного закрытия файлового дескриптора:
static void
customReScanForeignScan(ForeignScanState *node)
{
CustomFdwState *state = (CustomFdwState *) node->fdw_state;
rewind(state->data_file); // Перематываем файл в начало
}
static void
customEndForeignScan(ForeignScanState *node)
{
CustomFdwState *state = (CustomFdwState *) node->fdw_state;
if (state)
{
if (state->data_file)
fclose(state->data_file);
pfree(state);
}
}
Для полноты картины стоит добавить минимальные реализации функций планирования. Они могут быть заглушками, если вы не планируется оптимизировать запросы на уровне FDW:
static void
customGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
{
/* Здесь можно оценить количество строк, стоимость запроса и т.д.
Пока оставим пустышку */
}
static void
customGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
{
/* Генерируем простой план обхода данных */
add_path(baserel, (Path *) create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
baserel->rows,
10.0, /* произвольная стоимость */
0, /* startup cost */
baserel->rows, /* total cost */
NIL, /* no extra plan */
NULL, /* no fdw_private data */
NIL));
}
static ForeignScan *
customGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid,
ForeignPath *best_path, List *tlist, List *scan_clauses)
{
/* Преобразуем условия сканирования, убираем лишние фильтры */
List *fdw_private = NIL;
scan_clauses = extract_actual_clauses(scan_clauses, false);
return make_foreignscan(tlist, scan_clauses, baserel->relid, NIL, fdw_private,
NIL, NIL, NIL);
}
Интеграция с алгоритмом компрессии на базе zlib
Теперь усложним задачу — добавим сжатие данных.
Будем использовать библиотеку zlib. Функция, которая сжимает входную строку:
#include
#define CHUNK 16384
/* Функция compress_data сжимает данные из input и записывает сжатый результат в output */
int compress_data(const char *input, size_t input_len, char **output, size_t *output_len)
{
int ret;
int flush;
unsigned have;
z_stream strm;
unsigned char out[CHUNK];
size_t total_written = 0;
size_t allocated = CHUNK;
*output = palloc(allocated);
/* Инициализация zlib */
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
if (ret != Z_OK)
return ret;
strm.avail_in = input_len;
strm.next_in = (unsigned char *) input;
do {
strm.avail_out = CHUNK;
strm.next_out = out;
flush = (strm.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH;
ret = deflate(&strm, flush);
if (ret == Z_STREAM_ERROR) {
deflateEnd(&strm);
return ret;
}
have = CHUNK - strm.avail_out;
if (total_written + have > allocated)
{
allocated *= 2;
*output = repalloc(*output, allocated);
}
memcpy(*output + total_written, out, have);
total_written += have;
} while (flush != Z_FINISH);
deflateEnd(&strm);
*output_len = total_written;
return Z_OK;
}
Используем стандартный цикл сжатия из zlib. Он позволяет сжимать данные по блокам, динамически расширяя буфер, если требуется.
Допустим, хочется не только читать данные, но и писать их в файловую систему с компрессией. Для этого можно добавить функцию записи данных, которая будет вызываться, например, при выполнении INSERT через FDW:
static void
customInsertData(const char *data)
{
char *compressed;
size_t compressed_len;
if (compress_data(data, strlen(data), &compressed, &compressed_len) != Z_OK)
elog(ERROR, "Ошибка сжатия данных");
/* Открываем файл для добавления данных (append binary) */
FILE *fp = fopen("data/custom_storage.dat", "ab");
if (!fp)
elog(ERROR, "Не удалось открыть файл для записи: data/custom_storage.dat");
static void
customInsertData(const char *data)
{
char *compressed;
size_t compressed_len;
if (compress_data(data, strlen(data), &compressed, &compressed_len) != Z_OK)
elog(ERROR, "Ошибка сжатия данных");
/* Открываем файл для добавления данных (append binary) */
FILE *fp = fopen("data/custom_storage.dat", "ab");
if (!fp)
elog(ERROR, "Не удалось открыть файл для записи: data/custom_storage.dat");
if (fwrite(compressed, 1, compressed_len, fp) != compressed_len)
elog(ERROR, "Ошибка записи сжатых данных в файл");
fclose(fp);
pfree(compressed);
}
if (fwrite(compressed, 1, compressed_len, fp) != compressed_len)
elog(ERROR, "Ошибка записи сжатых данных в файл");
fclose(fp);
pfree(compressed);
}
Код можно интегрировать в обработку INSERT‑операций, чтобы любые данные, записываемые в нашу файловую систему, автоматически сжимались.
А как насчет вас? Использовали ли вы подобные подходы, и какой у вас опыт работы с этим? Делитесь в комментариях.
24 марта в Otus пройдет открытый урок «Безопасность в PostgreSQL». На нем вы сможете разобрать механизмы безопасности PostgreSQL: шифрование (SSL/TLS), контроль доступа (pg_hba.conf, роли, RLS) и аудит (pg_audit). Узнаете, как защититься от SQL-инъекций и DDoS-атак. Настроите безопасную конфигурацию на практике. Записаться можно по ссылке.