Прозрачный переход PgQ -> RabbitMQ
Дорогой хаброчитатель, я хочу поделиться опытом по поводу прозрачного для приложения перехода с очереди PgQ на amqp. Возможно это покажется велосипедом, возможно какие-то мысли пригодятся. Статья предполагает знакомство с основами PgQ и rabbitmq.
ПредпосылкиТак исторически сложилось, что в нашем проекте очень активно используется PgQ. При всех своих недостатках, у PgQ есть неоспоримое преимущество — транзакционность с базой данных, чем активно пользовался наш код. То есть, можно быть уверенным, что и в очередь событие попадет, и база обновится. Или ни того, ни другого не случится. И это преимущество должно быть перенесено на новый движок очередей.Подробно описывать причины ухода с PgQ я тут не буду, это тема для другой статьи. Остановлюсь только конкретно на самом переходе.
Ход размышлений, pg_amqp Гугление наводит на расширение для PostgreSQL — pg_amqp. Оно предоставляет хранимые процедуры в PostgreSQL для отправки в amqp. Расширение отлично работает на уровне логики приложения: откатив транзакцию в PostgreSQL — данные в amqp не попадут. А если закомитим — попадут. BEGIN; INSERT INTO some_table (…) VALUES (…); SELECT amqp.publish (broker_id, 'amqp.direct', 'foo', 'bar'); ROLLBACK; //Данные и в базу не вставились, и в amqp не отправились На самом деле расширение не гарантирует то что сообщение попадет в amqp. Внутри всего лишь последовательный коммит транзакции сначала в PostgreSQL, а потом в amqp. И если пропадет соединение с amqp между двумя комитами — сообщение пропадет. Несмотря на то что вероятность такого события маленькая, потерянные пакеты будут. А учитывая, что мы работаем с реальными деньгами и торговыми счетами, это недопустимо.Для тех, кому потеря 0.01% пакетов допустима — остаток статьи можно не читать. Просто используйте pg_amqp, если хотите уйти с PgQ на amqp.
Начинаем строить велосипед * Далее вместо абстрактного amqp будет конкретный rabbitmq.Но ведь у нас остается PostgreSQL, внутри которого транзакции полноценные. И мы можем транзакционно вставлять все пакеты в какую-то таблицу, а потом как-то досылать в amqp то что туда не дошло.
Сказано — сделано.
Вся работа с PgQ у нас в приложении делалось с помощью одной хранимой процедуры, которую я мог свободно менять.
Я создал таблицу
amqp.message ( id bigint default nextval ('amqp_message_id_sequence') primary key, pid bigint, queue varchar (128), message text ) и тригер, который при вставке в таблицу отправлял эти данные в amqp. А хранимку вставки в pgq заменил на вставку данных в эту таблицу. Overhead при этом только в отправке данных в amqp, так как в PgQ тоже при каждом событии происходит вставка в таблицу. Зачем нужен pid объясню позже.Теперь сообщения есть и в таблице, и у получателя из rabbitmq. В таблицу сообщения пишутся гарантированно в рамках транзакции PostgreSQL, а в amqp отправляются почти все сообщения, с помощью pg_amqp. Но как понять какие сообщения пришли, а какие нет? И как держать эту таблицу во вменяемых размерах (желательно десятки или сотни строк), чтобы не потерять производительность?
Тут на помощь приходит сам rabbitmq. Ведь он умеет сообщения дублировать в несколько очередей
Так давайте одну очередь отдадим нашему бизнес-коду, а вторую будем использовать для подтверждения получения пакета? Сказано — сделано. Создаем exchange, 2 очереди и досыльщик, который просто удаляет из таблицы amqp.message полученное сообщение.
В итоге, есть таблица, в которой хранятся только те сообщения, которые «в пути». Размер таблицы всегда небольшой, так как сообщения сразу же после вставки удаляются. Размер таблицы можно поставить на мониторинг. А бизнес-код нашего приложения теперь работает только с rabbitmq и ничего не знает о магии под капотом.
Вот как выглядит итоговая схема работы Но теперь появляется важный вопрос:, а как понять, что какой-то пакет не пришел? Ведь строка в таблице amqp.message ещё не гарантирует, что сообщение пропало — оно может быть просто «в пути». Нам нужно быть в этом уверенным, чтобы досылать пакет, иначе мы можем создать дубль пакета, и кому-то зачислим 200$ вместо 100$ :) И в то же время определить, что пакет не пришел и дослать его нужно максимально быстро, чтобы минимально нарушать порядок следования пакетов в очереди.
Вот тут начинается основное шаманство Все наши пакеты пронумерованы по возрастанию, но ведь система многопоточна, и пакеты не обязаны приходить в rabbitmq в том порядке, в котором они лежат в таблице. А вот в рамках одного процесса, который отправляет сообщения в amqp, они должны быть строго упорядочены. PostgreSQL предоставляет возможность посмотреть pid текущего процесса (pg_backend_pid ()). И мы можем ожидать, что в рамках одного pg_backend_pid () пакеты будут строго упорядочены по возрастанию (напоминаю, что id пакета мы генерируем с помощью nextval). А следовательно, при получении пакета с id N, все пакеты от этого же pg_backend_pid с id ниже N являются не доставленными и их нужно дослать.Итого, нам нужно сделать досыльщик очереди, который делает всего 2 вещи:
Слушает очередь «Очередь досыльщика» Проверяет, нет ли в таблице amqp.message сообщений с текущим pid и id меньшим чем текущий. Если есть — досылает их (досылает опять таки транзакционно, через таблицу amqp.message) Удаляет из таблицы amqp.message сообщения по id Профит! У нас все сообщения доходят до адресата, и при этом мы полностью избавились от PgQ. Код основного приложения практически не изменился.
Накладные расходы на все:
Системная таблица amqp.message, в которую мы вставляем каждое сообщение, а потом удаляем Досыльщик, который удаляет строки из amqp.message и досылает сообщения Обращу внимание на то, что логика досыльщика совершенно устойчива к падениям. Его в любой момент можно убить, он снова запустится и продолжит работу без каких-либо проблем.
Система не учитывает случай, когда процесс postgres отправил пакет в amqp, который не дошел, и больше пакеты не отправляет. Буду благодарен, если кто-то подскажет, как автоматически обработать эту ситуцию. Сейчас это решается просто мониторингом, но ни одного такого события пока не было. Вообще факт досылки сообщения — очень редкое событие. У нас используется pgbouncer, что уменьшает множество различных pg_backend_pid.
Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.