Превращение событий PostgreSQL в события Laravel
Краткая аннотация
В прошлый раз я описал, как можно поставить задачу (Job) в очередь Laravel из хранимой процедуры или триггера PostgreSQL.
В этой статье я расскажу, как можно преобразовать события, возникающие в PostgreSQL, в события Laravel.
Рабочий пример выложен на GitHub.
Вместо введения
Зачастую требуется знать, что происходит с данными в базе и оперативно реагировать на это.
Самый простой способ получить информацию об изменениях — регулярный опрос базы данных, с целью проверить, существуют ли изменения, на которые следует реагировать. И зачастую разработчики именно так и поступают. Это не самый эффективный метод, поскольку он может привести к излишней нагрузке на сервер и требует изменения существующих или создания новых таблиц.
Но PostgreSQL может оповещать клиентов через механизм под названием «LISTEN/NOTIFY». Документация находится здесь и здесь.
У этого механизма есть недостатки:
Требует долгосрочного открытого соединения с базой данных, что может быть непрактично.
Система уведомлений не гарантирует доставку или соблюдения порядка сообщений, не следует использовать эту технологию в качестве полнофункциональной очереди сообщений. «LISTEN/NOTIFY» следует использовать только для лёгкого взаимодействия между процессами.
Размер сообщения ограничен размером строки (8192 байта в PostgreSQL 13).
Для более глубокого понимания этого механизма можно обратиться к этому материалу.
Я же просто покажу, как создать команду Artisan в чем-то похожую на команду queue: work, которая будет делать своё дело и лучше в связке с Supervisor.
«Щупаем» технологию
Для быстрого ознакомления я создам docker-контейнер:
docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5433:5432 --name pgsql postgres
Обратите внимание, что я использую порт 5433, потому, что у меня на родном порту крутится «стационарная» СУБД.
Теперь запускаем три терминала, подав во всех команду:
docker exec -it pgsql psql -U test
В первом и втором терминале подаём команду:
LISTEN my_event;
Эта команда «подписывает» клиента на уведомления от сервера PostgreSQL через канал с именем my_event. Если какой-либо другой клиент выполняет команду NOTIFY my_event, то все клиенты, которые выполнили команду LISTEN my_event, получат уведомление.
В третьем терминале подаём команду:
NOTIFY my_event, 'Hello, PostgreSQL!';
Затем в первом и втором терминале ещё раз подаём команду:
LISTEN my_event;
Здесь у меня есть некоторое недопонимание. Я считал, что первый и второй терминалы должны получать оповещение автоматически, но они почему-то требуют повторного вызова «LISTEN my_event».
Можете поиграться, и убедиться, что без подписки получить оповещение невозможно, также, как и прочитать его два раза.
Работает!
Два первых терминала можно закрыть. Третий можно оставить для опытов.
Создаём приложение Laravel
composer create-project laravel/laravel listen_notify
cd listen_notify
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache
Настройки соединения с базой данных в .env
DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5433
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test
Удалите все существующие миграции. Они нам не понадобятся.
Теперь создадим заготовки для команды artisan, event и listener, слушающий этот event
php atrisan make:command ListenNotifyCommand
php artisan make:event PostgresNotificationReceived
php artisan make:listener LogPostgresNotification --event=PostgresNotificationReceived
Настроим EventServiceProvider, добавим в массив $listen следующее значение:
protected $listen = [
Registered::class => [
SendEmailVerificationNotification::class,
],
// Добавленное значение
PostgresNotificationReceived::class => [
LogPostgresNotification::class,
]
];
Каркас приложения готов
Создаём слушателя событий Laravel
Не буду придумывать сложной логики. Просто запишу полученные данные в лог-файл
Содержимое файла app/Listeners/LogPostgresNotification.php
notification);
}
}
Создаём событие Laravel
Тоже не буду мудрствовать, просто передам payload через конструктор в event
Содержимое файла app/Events/PostgresNotificationReceived.php
Пишем логику команды listen: notify
Пойдём от простого к сложному. Для начала просто получим сообщение от PostgreSQL.
Для этого дорабатываем метод handle, не забывая доработать поля $signature и $description
Содержимое файла app/Console/Commands/ListenNotifyCommand.php
getPdo();
// Listen to the 'my_channel' notifications
$pdo->exec("LISTEN my_event");
$this->info('Starting');
// Forever loop
while (true) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
}
}
}
Проверяем, работает ли наша команда. Запускаем в терминале команду: php artisan listen: notify и в третьем терминале, (мы его не закрывали) вновь подаём NOTIFY my_event, 'Hello, PostgreSQL!';
Отлично! Работает!
Начало положено. Приложение на Laravel получило событие из PostgreSQL.
При изменении кода команды, не забывайте перезапускать процесс.
Добавляем обработку сигналов
Немного о сигналах, гуру могут пропустить
Сигналы являются частью стандартов POSIX и служат для асинхронного уведомления процесса о каком-либо событии в операционных системах Unix и похожих на них, например, Linux. Приложения в этих системах могут обрабатывать поступающие сигналы, например, на остановку процесса (SIGTERM, SIGINT), перезагрузку (SIGHUP) и т.д.
Windows не поддерживает POSIX сигналы. Он использует собственные механизмы для управления процессами и потоками, включая функции Windows API для отправки и обработки сигналов управления, таких как Ctrl+C.
Однако в Windows есть некоторые среды, такие как Windows Subsystem for Linux (WSL), которые предоставляют совместимость со стандартами POSIX и поддерживают POSIX-сигналы.
В контексте PHP и командной строки нам интересны следующие сигналы:
SIGINT (сигнал прерывания). Этот сигнал обычно посылается при нажатии Ctrl+C в терминале. Он сообщает процессу о необходимости остановиться.
SIGTERM (закончить выполнение). Это стандартный сигнал для остановки процесса в Unix. Программы могут перехватить этот сигнал и выполнить любую необходимую работу перед завершением. Если программа не перехватывает SIGTERM, она завершится немедленно.
SIGKILL (убить процесс немедленно). Этот сигнал нельзя перехватить или игнорировать. Когда процесс получает SIGKILL, он немедленно останавливается.
Добавляем два поля ($hasPcntl, $running) типа bool в класс ListenNotifyCommand и инициализируем их. Пишем метод — обработчик сигналов
protected bool $hasPcntl = false;
protected bool $running = true;
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
Для обработки сигналов потребуется расширение pcntl. Данное расширение недоступно для Windows, тем не менее, вполне возможно написать кроссплатформенное решение.
Дорабатываем метод handle
public function handle(): int
{
// Проверка, что модуль pcntl подключён
$this->hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
// Если модуль pcntl подключён, назначаем обработчики сигналов
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
$this->info('iter');
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
if ($this->hasPcntl) {
// Если модуль pcntl подключён, вызываем обработчики сигналов
pcntl_signal_dispatch();
}
}
// Возвращаем 0, как код завершения
return 0;
}
Файл app/Console/Commands/ListenNotifyCommand.php
hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
$this->info('iter');
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
if ($this->hasPcntl) {
pcntl_signal_dispatch();
}
}
return 0;
}
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
}
Можете запустить команду, а также вызвать в соседней консоли NOTIFY, всё должно работать. Если команда запущена в Linux и модуль pcntl подключён, то при нажатии Ctrl+C будет выведено сообщение: Received stop signal, shutting down… Это значит, что скрипт корректно обрабатывает сигналы и останавливается, вместо того, чтобы быть принудительно остановленным.
Настраиваем Supervisor для наблюдения за скриптом
Supervisor — это удобный инструмент для управления фоновыми процессами в операционных системах, похожих на Unix. Он отслеживает работу скрипта, автоматически перезапускает его при сбое и даёт возможность управлять его состоянием, такими как запуск, остановка и перезагрузка.
Supervisor также совместим с сигналами Unix, что дает возможность настраивать поведение процесса в зависимости от разных сигналов. Mы настроили скрипт на обработку сигналов SIGINT и SIGTERM, чтобы правильно завершить его, что хорошо согласуется с Supervisor.
Когда Supervisor отправляет сигнал SIGTERM процессу, он ожидает, что процесс завершит свою работу и передаст управление обратно системе.
Если процесс успешно обработал SIGTERM и корректно завершил свою работу, обычно возвращается код завершения 0.
Если процесс не вернул управление за разумное время (по умолчанию 10 секунд), Supervisor посылает SIGKILL. Это время можно изменить в настройках, опция stopwaitsecs.
Примерный файл конфигурации Supervisor
[program:postrgres_laravel]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/laravel/artisan listen:notify
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/postrgres_laravel.log
Подробнее про Supervisor в документации Laravel.
Сериализуем полезную нагрузку
Аргумент для NOTIFY — всегда строка. Т.е. если мы хотим передать что-либо cложное, нам нужно это сложное — сериализовать. PostgreSQL умеет работать с JSON, давайте используем это умение.
Создадим хранимую функцию, которая принимает на вход json, и отправляет его в канал my_event.
Файл миграции 2024_03_05_125805_create_send_notify_function.php
Выполняем миграцию
php artisan migrate
У меня всё прошло успешно. Теперь нужно чуть-чуть доработать handle
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
}
Здесь я выделяю $payload и вывожу её в терминал.
Давайте проверим, всё ли у нас работает. Запустите команду php artisan listen: notify. На этот раз в терминале psql подадим следующую конструкцию:
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!'));
Смотрим в терминал, работает. Давайте передадим что-либо менее тривиальное:
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)))
И опять работает!
Главное, не увлекаться, и помнить об ограничении на 8192 байта.
Сводим воедино
Теперь осталось совсем немного. В нашей команде отправить событие, чтобы его могли слушать все слушатели, которые на него подписаны. Для этого добавляем всего одну строчку:
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
// Новая строка
Event::dispatch(new PostgresNotificationReceived($payload));
}
Полный листинг файла app/Console/Commands/ListenNotifyCommand.php
hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
Event::dispatch(new PostgresNotificationReceived($payload));
}
if ($this->hasPcntl) {
pcntl_signal_dispatch();
}
}
return 0;
}
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
}
Запускаем команду: php artisan listen: notify и подаём в соседнем терминале команду
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)));
По нашей задумке, слушатель события пишет полезную нагрузку в лог. Смотрим в лог.
В логе полезная нагрузка
Запускаю виртуальную машину и проверяю, как скрипт отрабатывает сигналы в Linux
Видно, что сигнал отработал и приложение нормально завершилось
Видно, что в ответ на Ctrl+C скрипт выводит сообщение: Received stop signal, shutting down…
Что можно улучшить?
Скорее всего, обработку полученного сообщения, следует тотчас же закинуть, как задачу, в асинхронную очередь Laravel, чтобы обработка сообщения не тормозила «бесконечный» цикл, что может привести к пропуску сообщений или аварийному завершению скрипта процессом Supervisor.
Дальнейшее принятие решений о диспетчеризации событий выполнять в этой задаче.
Заключение
Вы можете использовать функцию send_notify в других хранимых функциях или триггерах PostgreSQL, передавая из триггеров значения переменных TG_TABLE_NAME и TG_OP для принятия решения, что и как обработать. В статье показана всего лишь основа для получения событий из PostgreSQL. Как применять их на практике, зависит только от воображения разработчика.
Рабочее приложение можно найти на GitHub