PubSub почти бесплатно: особенности NOTIFY в PostgreSQL

Если ваши микросервисы уже используют общую базу PostgreSQL для хранения данных, или ей пользуются несколько экземпляров одного сервиса на разных серверах, можно относительно «дешево» получить возможность обмена сообщениями (PubSub) между ними без интеграции в архитектуру Redis, RabbitMQ-кластера или встройки в код приложения другой MQ-системы.

Для этого мы не будем писать сообщения в таблицы БД, поскольку это вызывает слишком большие накладные расходы сначала на запись передаваемого, а потом еще и на зачистку от уже прочитанного.

Передавать и получать данные мы станем с помощью механизма NOTIFY/LISTEN, а модельную реализацию соберем для Node.js.

ere_dvhj_y3sjkypbqeezzg_xn4.png

Но на этом пути лежат грабли, которые придется аккуратно обойти.

Особенности протокола


LISTEN

LISTEN канал
Приложение, использующее библиотеку libpq, выполняет команду LISTEN как обычную команду SQL, а затем оно должно периодически вызывать функцию PQnotifies, чтобы проверить, не поступили ли новые уведомления.

Если вы пишете не библиотеку для работы с PG, а уже конкретное приложение, в большинстве случаев, вы не будете иметь доступа к вызову этой функции.

Но если такую библиотеку уже написали для вас в соответствии с рекомендациями по обработке асинхронных запросов и уведомлений, вы автоматически получите сообщение в прикладном коде. Если нет — можно просто периодически выполнять SELECT 1 на соединении, тогда уведомление придет вместе с результатом запроса:

В очень старых выпусках libpq обеспечить своевременное получения сообщений от команды NOTIFY можно было только одним способом — постоянно отправлять команды, пусть даже пустые, а затем проверять PQnotifies после каждого вызова PQexec. Хотя этот метод всё ещё работает, он считается устаревшим ввиду неэффективного использования процессора.

С точки зрения, например, psql это выглядит вот так:

_tmp=# LISTEN test;
LISTEN
_tmp=# SELECT 1;
 ?column?
----------
        1
(1 row)

Asynchronous notification "test" with payload "abc123" received from server process with PID 63991.


Если для прикладной задачи мы можем согласиться на максимальную задержку доставки сообщения в пределах 1 сек — с таким интервалом и выполняем запрос. Заодно, этот способ помогает мониторить «живость» соединения, убеждаясь, что никто случайно не оборвал его со стороны сервера через pg_terminate_backend, или не произошел вообще внезапный «крэш» PG без всяких уведомлений клиентов.

NOTIFY

NOTIFY канал [ , сообщение ]
Команда NOTIFY отправляет событие уведомления вместе с дополнительной строкой «сообщения» всем клиентским приложениям, которые до этого выполнили в текущей базе данных LISTEN канал с указанным именем канала.

Строка «сообщения», которая будет передана вместе с уведомлением, … должна задаваться простой текстовой константой. В стандартной конфигурации её длина должна быть меньше 8000 байт.

То есть если наше «сообщение» внезапно содержит что-то сильно отличное от ASCII, то нам его придется экранировать, а если превысит размер в 8000 байт (не символов!) — то резать на блоки и потом склеивать. При этом нам стоит поберечь как пропускную способность канала, так и ресурсы сервера на обработку передачи таких блоков — то есть добавить к полезному контенту как можно меньше служебной «обвязки», но при этом и не «задушить» клиентское приложение, заставляя его заниматься упаковкой с gzip -9.

Из дополнительных плюсов механизма можно так же отметить привязку к «источнику» сообщения…

… дополнительной работы можно избежать, если проверить, не совпадает ли PID сигнализирующего процесса (указанный в данных события) с собственным PID сеанса (его можно узнать, обратившись к libpq). Если они совпадают, значит сеанс получил уведомление о собственных действиях, так что его можно игнорировать.

… и гарантированность порядка доставки:

Не считая фильтрации последующих экземпляров дублирующихся уведомлений, NOTIFY гарантирует, что уведомления от одной транзакции всегда поступают в том же порядке, в каком были отправлены. Также гарантируется, что сообщения от разных транзакций поступают в порядке фиксации этих транзакций.

Мы не станем ничего специально объединять, поэтому каждый наш запрос как раз будет соответствовать отдельной транзакции.

Но помните, что если на используемом для обмена соединении есть еще и прикладная активность, наш NOTIFY может оказаться внутри транзакции не по своей воле, поэтому могут возникнуть сайд-эффекты:

Транзакции оказывают значительное влияние на работу NOTIFY. Во-первых, если NOTIFY выполняется внутри транзакции, уведомления доставляются получателям после фиксирования транзакции и только в этом случае. Это разумно, так как в случае прерывания транзакции действие всех команд в ней аннулируется, включая NOTIFY.

Поэтому лучше использовать соединение, где заведомо не будет транзакций и длинных запросов.

AccessExclusiveLock on object 0 of class 1262 of database 0


Если внезапно ваши NOTIFY начали подтупливать и выдавать в лог ожидание такой блокировки, значит, вы все-таки уже «выросли из коротких штанишек», и пора задуматься о «взрослой» MQ.

Ведь очередь уведомлений хоть и достаточно велика (8GB в стандартных сборках), но все-таки конечна. Согласно ответу Tom Lane:

This lock is held while inserting the transaction’s notify message (s), after which the transaction commits and releases the lock.

То есть вариантов обхода останется не слишком много:

  • отправлять, но реже
    То есть агрегировать отправляемые показатели, если это какие-то счетчики, на более длинном интервале.
  • отправлять меньший объем
    Например, удалять из передаваемых JSON «дефолтные» с точки зрения приложения значения ключей.
  • отправлять только сигнал, вообще без контента
    Как вариант — завести несколько каналов, имя каждого уже будет нести само по себе какой-то прикладной смысл.
  • все-таки вынести отправку из БД


Передача «сложных» сообщений


Кодирование «тела»


В общем случае, мы можем захотеть передавать в сообщении не только разрешенные символы, но и русские буквы, и «всякую бинарщину» — поэтому было бы удобно использовать конвертацию в hex-представление для формирования передаваемой строки. И да, такой способ вполне работает:

NOTIFY test, E'\x20\x21'
Asynchronous notification "test" with payload " !" received from server process with PID 63991.


Но обратимся снова к документации:

Вы должны позаботиться, чтобы байтовые последовательности, которые вы создаёте таким образом, особенно в восьмеричной и шестнадцатеричной записи, образовывали допустимые символы в серверной кодировке. Когда сервер работает с кодировкой UTF-8, вместо такой записи байт следует использовать спецпоследовательности Unicode или альтернативный синтаксис Unicode, описанный в Подразделе 4.1.2.3. (В противном случае придётся кодировать символы UTF-8 вручную и выписывать их по байтам, что очень неудобно.)

Поэтому даже с банальным символом кавычки-лапки из win1251 мы хлебнем горя:

NOTIFY test, E'\x98'
-- ERROR:  invalid byte sequence for encoding "UTF8": 0x98


Поскольку »кодировать символы UTF-8 вручную и выписывать их по байтам» мы не хотим, сразу договоримся передавать тело сообщения упакованным в base64 при наличии в нем любых символов за пределами диапазона \x20-\x7E или при необходимости разбиения на сегменты. С одной стороны, такой метод упаковки не слишком сильно увеличивает избыточность (коэффициент 4:3), с другой — в любом языке реализован на уровне системных библиотек, и обеспечит минимальную дополнительную нагрузку.

Но даже если у нас нет «странных» символов, и сообщение умещается в один сегмент, все равно остается одна особенность — экранирование апострофа:

Чтобы включить апостроф в строку, напишите в ней два апострофа рядом, например: 'Жанна д''Арк'. Заметьте, это не то же самое, что двойная кавычка (»).


Идентификация сегментов


Следующая задача — корректно «порезать» сообщение на разрешенные к передаче блоки по 7999 байт, если его размер вдруг превысил это значение. Причем так, чтобы на получателе можно было собрать его без нарушения порядка или попадания в цепочку «чужих» сегментов. Для этого каждый из них надо как-то проидентифицировать.

Собственно, две «координаты» нам уже известны — это PID процесса-отправителя и имя канала, приходящие в каждом уведомлении. А порядок поступления сегментов нам гарантирует сам протокол.

Соседи-писатели

Мы не будем рассматривать случай, когда на одном соединении с БД (то есть заведомо в рамках одного прикладного процесса) активны одновременно несколько писателей в адрес одного и того же канала. Технически, это можно поддержать передачей дополнительного идентификатора в заголовке сегмента —, но лучше «расшарить» единственный PubSub-объект внутри своего приложения.


Ограничение контейнера


Чтобы собрать цельный контейнер из нескольких сегментов, нам надо знать момент его окончания. Для этого есть два типовых способа:

  • передача целевого размера (в байтах или сегментах) в первом из них
  • передача признака [не]последнего сегмента в каждом из них


Поскольку мы пишем все-таки PubSub, большинство сообщений у нас будут короткими и резервировать много байт под передачу размера невыгодно. Поэтому воспользуемся вторым способом, зарезервировав первый символ данных сегмента в качестве флага продолжения/окончания контейнера.

Передача объектов


Чтобы передавать в качестве «сообщения» как обычные текстовые строки, так и JSON-объекты, добавим еще один символ-признак для обратного преобразования на стороне получателя.

Поскольку мы приняли решение кодировать «неформат» в base64, для флагов можно взять любые разрешенные символы, не входящие в этот набор.

Итого, у нас получились следующие варианты передаваемых сегментов:

-- "короткая" текстовая строка
!simple string
-- "короткий и простой" объект
@{"a":1}
-- нефинальный сегмент в base64
#
-- завершающий сегмент в base64
$

Как видно, достаточно проанализировать при получении сегмента всего лишь первый символ, чтобы понять, что нужно с ним дальше делать.

Пишем реализацию PubSub


Наше приложение будет на Node.js, поэтому для работы с PostgreSQL воспользуемся модулем node-postgres.

Пишем стартовый каркас
Для начала создадим PubSub как наследника EventEmitter, чтобы иметь возможность генерировать события в адрес тех, кто подписался на конкретные каналы:
const util = require('util');
const EventEmitter = require('events').EventEmitter;

const PubSub = function(connection, interval, skipSelf) {
  // используем уже существующее соединение
  this.connection = connection;

  // подписываемся своей обработкой на получение всех уведомлений
  this.connection.on('notification', p._onmessage.bind(this));

  // не принимать уведомления от собственного соединения с БД
  this.skipSelf = skipSelf;

  // запускаем "антифриз"
  setInterval(() => {
    this.connection.query('SELECT 1');
  }, interval);

  // тут будем хранить сегменты "недополученных" сообщений
  this.slices = {};
};

util.inherits(PubSub, EventEmitter);

const p = PubSub.prototype;


Работаем с каналами
Поскольку LISTEN/UNLISTEN никак не ругаются при повторной подписке на канал или отписке от того, на что мы не были подписаны, то и усложнять ничего не будем.
// если в имени канала какой-то "неформат", его надо заключить в кавычки
// сам символ двойных кавычек - и так не допускается в имени канала
const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`;

p.subscribe = function(channel) {
  this.connection.query(`LISTEN ${quot(channel)}`);
  return this;
};

p.unsubscribe = function(channel) {
  this.connection.query(`UNLISTEN ${quot(channel)}`);
  return this;
};


Передача и прием сообщения
const PAYLOAD_LIMIT  = 8000 - 1;
const PAYLOAD_FL_STR = '!';
const PAYLOAD_FL_OBJ = '@';
const PAYLOAD_FL_SEQ = '#';
const PAYLOAD_FL_FIN = '$';
const PAYLOAD_SZ_HEAD = 1;
const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD;

// только "простые" символы
const reASCII = /^[\x20-\x7E]*$/;

// отправка
p.publish = function(channel, payload) {
  let query = `NOTIFY ${quot(channel)}`;
  if (payload !== null && payload !== undefined) {
    // кодируем тип сообщения - строка или объект
    let str = typeof payload == 'string'
      ? PAYLOAD_FL_STR + payload
      : PAYLOAD_FL_OBJ + JSON.stringify(payload);
    if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) {
      // отправляем сегменты base64-представления
      const b64 = Buffer.from(str).toString('base64');
      for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) {
        let fin = pos + PAYLOAD_SZ_DATA;
        let seg = fin >= len
          ? PAYLOAD_FL_FIN + b64.slice(pos)
          : PAYLOAD_FL_SEQ + b64.slice(pos, fin);
        this.connection.query(`${query}, '${seg}'`);
      }
    }
    else {
      // все уместилось в один сегмент с допустимыми символами?
      // не забываем экранировать апостроф
      str = str.replace(/'/g, "''");
      this.connection.query(`${query}, '${str}'`);
    }
  }
  else {
    // простой сигнал в канал без сообщения
    this.connection.query(query);
  }
  return this;
};

// прием и сборка
p._onmessage = function(msg) {
  const {processId, channel, payload} = msg;

  // пропускаем "свои"
  if (processId == this.connection.processID && this.skipSelf) {
    return;
  }

  // "координаты" источника
  const id = `${processId}:${channel}`;

  let rv;
  // тип сегмента
  let fl = payload.charAt(0);

  if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) {
    // base64
    const str = payload.slice(PAYLOAD_SZ_HEAD);
    const slices = this.slices;

    let b64;
    if (fl == PAYLOAD_FL_FIN) {
      // собираем контейнер
      if (slices[id]) {
        slices[id].push(str);
        b64 = slices[id].join('');
        delete slices[id];
      }
      else {
        b64 = str;
      }
    }
    else {
      // дописываем сегмент в массив
      if (slices[id]) {
        slices[id].push(str);
      }
      else {
        slices[id] = [str];
      }
    }

    if (b64) {
      rv = Buffer.from(b64, 'base64').toString();
      fl = rv.charAt(0);
    }
  }
  else {
    // простая строка/объект
    rv = payload;
  }

  if (rv !== undefined) { // может быть ''
    let res = {
      processId
    , channel
    };
    if (rv) {
      // распаковка сообщения в соответствии с типом
      let data = rv.slice(1);
      res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data;
    }

    this.emit(channel, res);
  }
};


Немного тестов
const pg = require('pg');

const pgsql = new pg.Client({
  host : 'example-db'
, port : 5432
, user : 'postgres'
, password : 'postgres'
, database : '_tmp'
});

pgsql.connect(err => {
  let psA = new PubSub(pgsql, 1000);
  let psB = new PubSub(pgsql, 1000);

  let chA = 'channel:A';
  let chB = 'channel:B';
  psA.subscribe(chA);
  psB.subscribe(chB);

  psA.on(chA, (msg) => {
    console.log('A:rcv', msg);
  });
  psB.on(chB, (msg) => {
    console.log('B:rcv', msg);
  });

  psB.publish(chA);
  psB.publish(chA, 'simple string');
  psB.publish(chA, 'мама мыла раму');
  psB.publish(chA, {a : 1});
  psA.publish(chB, 'мама мыла раму 100 раз '.repeat(100));
});


Все достаточно просто, поэтому можно легко реализовать на любом другом ЯП, используемом в вашем проекте, взяв за основу примеры работы с асинхронными уведомлениями:

© Habrahabr.ru