Постановка задачи (Job) в очередь Laravel из хранимой процедуры или триггера PostgreSQL
Зачем это понадобилось?
Есть приложение A, которое крутится где-то там, и к которому у меня нет доступа. Это приложение использует API реализованное с помощью хранимых процедур. Есть приложение B, которое настраивает данные для приложения А, а также строит отчёты по работе приложения А. К приложению B и хранимым процедурам у меня доступ есть.
Со временем приложение B обрасло телеграм-ботом. Получать оповещения в режиме реального времени всем понравилось, и тут поступила хотелка: когда в приложении А происходит событие X, оповещать пользователей приложения B через телеграм-бота.
Задача была бы несложной, если можно было бы договориться о том, чтобы приложение A уведомляло приложение B о наступлении события X.
Но договориться не удалось.
Анализ системы показал, что при наступлении события X, приложение A вызывает хранимую процедуру Y. Немного подумав, я решил запустить задание (Job) в приложении B, банально вставив строчку в таблицу jobs. А уж это задание отправит оповещение через телеграм-бота.
Что нужно для успеха?
Драйвер очереди должен быть выбран, как database. Должны быть настроены и запущены воркеры.
Реализация
Лучше один раз увидеть и пощупать, чем сто раз прочитать теорию, поэтому реализацию описываю в виде туториала для тестового приложения.
Кто считает это излишним, переходите сразу к этому разделу. Вернуться к началу никогда не поздно.
К сожалению, элегантного решения для общего случая я не нашёл, задача решается «в лоб» и для каждого задания решение будет уникальным.
Но общую идею я опишу.
Наше тестовое приложение будет отправлять письмо и эта отправка будет инициирована хранимой процедурой.
Создание тестового приложения
composer create-project laravel/laravel pgsql_job
cd pgsql_job
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache
Настройте соединение с базой данных. Если базы нет, можно запустить
docker-контейнер
docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5432:5432 --name pgsql postgres
После этого обращаться к базе. Настройки в .env
DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5432
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test
В качестве драйвера очереди укажите database (QUEUE_CONNECTION=database).
Опыты будем ставить на отправке писем. Чтобы не настраивать соединение, поставьте MAIL_MAILER=log. Пусть письма падают в логи.
Создание уведомления
make:php artisan make:notification TestNotify
Заходим в app/Notifications/TestNotify.php и изменяем конструктор
public function __construct(public $subject, public $id)
{
//
}
и метод toMail ()
public function toMail(object $notifiable): MailMessage
{
return (new MailMessage)
->subject($this->subject)
->line('ID: ' . $this->id);
}
В результате должно получиться
subject($this->subject)
->line('ID: ' . $this->id);
}
public function toArray(object $notifiable): array
{
return [
//
];
}
}
Давайте проверим, работает ли наше уведомление.
Заходим в tinker
php artisan tinker
После появления приглашения вводим код
use App\Notifications\TestNotify;
Notification::route('mail', 'test@mail.ru')->notify(new TestNotify('Test Subject', 123456));
Смотрим в лог
Отлично! Всё на месте. И тема письма, и ID и почтовый адрес. Лог можно удалить, чтобы не искать факт повторной отправки письма.
Создание задания
php artisan make:job TestJob
Заходим в app/Jobs/TestJob.php и изменяем конструктор
public function __construct(public $email, public $subject, public $id)
{
//
}
и метод handle ()
public function handle(): void
{
Notification::route('mail', $this->email)->notify(new TestNotify($this->subject, $this->id));
}
В результате должно получиться
email)->notify(new TestNotify($this->subject, $this->id));
}
}
Временно настроим драйвер очереди на sync (QUEUE_CONNECTION=sync) и проверим работу нашего задания в tinker. Если вы ещё из него не вышли — выйдите. Запущенный tinker ничего не знает о нашем новом задании. Заходим в tinker и после появления приглашения вводим
use App\Jobs\TestJob;
\Bus::dispatch(new TestJob('tset@liam.ur', 'Subject Test', 654321));
Почему так странно отправили задание? Ответ находится здесь.
Смотрим в лог
Опять всё на месте. Удаляем лог, менем драйвер очереди на database.
Мы получили уведомление и задание, которое это уведомление отправляет. Настала пора «засунуть» отправку задания в хранимую функцию. Но для начала поймём, как должна выглядеть запись о постановке задания в очередь.
Декодирование записи о задании из таблицы jobs
Драйвер очереди переключили? Выйдите и войдите в тинкер. Иначе тинкер ничего не будет знать об этом переключении. Давайте ещё раз с помощью тинкер отправим задание в очередь. В таблице jobs должна появиться запись.
Запись из таблицы jobs
Поля available_at и created_at равны результату выполнения функции time () на момент вставки записи в таблицу. Это количество секунд, прошедших с начала эпохи Unix (1 января 1970 00:00:00 GMT) до текущего времени.
Поле payload содержит в себе json, с которым мы сейчас разберёмся.
Заходим в тинкер, выполняем
json_decode(\DB::table('jobs')->first()->payload);
Результат выполнения
> json_decode(\DB::table('jobs')->first()->payload);
= {#6275
+"uuid": "0c2c7167-804a-4bed-9991-8827571f6be8",
+"displayName": "App\Jobs\TestJob",
+"job": "Illuminate\Queue\CallQueuedHandler@call",
+"maxTries": null,
+"maxExceptions": null,
+"failOnTimeout": false,
+"backoff": null,
+"timeout": null,
+"retryUntil": null,
+"data": {#6298
+"commandName": "App\Jobs\TestJob",
+"command": "O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";s:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}",
},
}
Собственно, создать такой json в PostgreSQL почти не проблема. Единственное затруднение — это содержимое data→command, в котором хранится сериализованный объект App\Jobs\TestJob
Подаём команду в tinker
unserialize(json_decode(\DB::table('jobs')->first()->payload)->data->command);
Получаем
> unserialize(json_decode(\DB::table('jobs')->first()->payload)->data->command);
= App\Jobs\TestJob {#6318
+email: "tset@liam.ur",
+subject: "Subject Test",
+id: 654321,
+job: null,
+connection: null,
+queue: null,
+chainConnection: null,
+chainQueue: null,
+chainCatchCallbacks: null,
+delay: null,
+afterCommit: null,
+middleware: [],
+chained: [],
}
Проблема в том, что PostgreSQL ничего не знает про объекты PHP и об алгоритме сериализации. Поэтому я принял решение, работать с этим значением, как со строкой.
Теперь можно приступать к работе с PostgreSQL
Вставка записи о задании в таблицу jobs средствами PostgreSQL
Чтобы вставить запись о задании в таблицу jobs нам нужно:
сгенерировать uuid
сосчитать количество прошедших с начала эпохи Unix (1 января 1970 00:00:00 GMT) до текущего времени
сформировать json, в который следует вставить необходимые для выполнения задания данные
Учим PostgreSQL генерировать uuid
Чтобы сгенерировать uuid в PostgreSQL нужны функции, содержащиеся в модуле uuid-ossp. Зачастую этот модуль отключён и его нужно включить. Я покажу, как проверить, подключены ли нужные функции на примере докер-контейнера, описанного выше.
Подключаюсь к запущенному контейнеру
docker container exec -it pgsql bash
В контейнере подключаюсь к терминальному клиенту для работы с PostgreSQL — psql
# psql -U test
Проверяю, какие функции подключены
test=# \df
List of functions
Schema | Name | Result data type | Argument data types | Type
--------+------+------------------+---------------------+------
(0 rows)
Видно, что функций для работы с uuid нет. Пробуем их добавить.
test=# CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION
test-# \df
List of functions
Schema | Name | Result data type | Argument data types | Type
--------+--------------------+------------------+---------------------------+------
public | uuid_generate_v1 | uuid | | func
public | uuid_generate_v1mc | uuid | | func
public | uuid_generate_v3 | uuid | namespace uuid, name text | func
public | uuid_generate_v4 | uuid | | func
public | uuid_generate_v5 | uuid | namespace uuid, name text | func
public | uuid_nil | uuid | | func
public | uuid_ns_dns | uuid | | func
public | uuid_ns_oid | uuid | | func
public | uuid_ns_url | uuid | | func
public | uuid_ns_x500 | uuid | | func
(10 rows)
Нам нужна функция uuid_generate_v4. Пробуем ею воспользоваться.
test=# select uuid_generate_v4();
uuid_generate_v4
--------------------------------------
8a7a8be0-e4cd-4c0a-8275-dcf2ed58c5a9
(1 row)
Отлично, работает!
Если в вашем случае функции не добавились, обратитесь к документации по PostgreSQL.
В прошлом этот модуль зависел от библиотеки OSSP UUID, что отразилось в его имени. Хотя библиотеку OSSP UUID всё ещё можно найти по адресу http://www.ossp.org/pkg/lib/uuid/, она плохо поддерживается и её становится всё сложнее портировать на новые платформы. Поэтому модуль
uuid-ossp
теперь на некоторых платформах можно собирать без библиотеки OSSP. Во FreeBSD и некоторых других ОС на базе BSD подходящие функции формирования UUID включены в системную библиотекуlibc
. В Linux, macOS и некоторых других платформах подходящие функции предоставляются библиотекойlibuuid
, которая изначально пришла из проектаe2fsprogs
(хотя в современных дистрибутивах Linux она является частью пакетаutil-linux-ng
). Вызываяconfigure
, передайте ключ--with-uuid=bsd
, чтобы использовать функции BSD, либо--with-uuid=e2fs
, чтобы использоватьlibuuid
изe2fsprogs
, либо ключ--with-uuid=ossp
, чтобы использовать библиотеку OSSP UUID. В конкретной системе может быть установлено сразу несколько библиотек, поэтомуconfigure
не выбирает библиотеку автоматически.
Учим PostgreSQL считать секунды
В PostgreSQL отсутствует функция, аналогичная функции time () в PHP. Поэтому получить нужное количество секунд можно с помощью такой конструкции
test=# select extract(epoch from current_timestamp)::integer;
extract
------------
1697631498
(1 row)
Собираем json
Самое простое, взять за основу уже готовую запись валидного задания. Берём за основу результат поданной в тинкер команды: json_decode (\DB: table ('jobs')→first ()→payload) и на основе полученного результата собираем json.
select json_build_object(
'uuid', uuid_generate_v4(),
'displayName', 'App\Jobs\TestJob',
'job', 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries', null,
'maxExceptions', null,
'failOnTimeout', false,
'backoff', null,
'timeout', null,
'retryUntil', null,
'data', (json_build_object('commandName', 'App\Jobs\TestJob',
'command', 'O:16:"App\Jobs\TestJob":3:{s:5:"email";s:12:"tset@liam.ur";s:7:"subject";s:12:"Subject Test";s:2:"id";i:654321;}'))
);
Обратите внимание, что вместо передачи известного uuid генерируется новый.
Данный запрос возвращает валидный payload для задачи, но нам нужно не просто поставить в очередь задачу, а передать ей данные.
Разберём поле command. Я передаю этот параметр строкой. В сериализованной строке передаётся тип данных и их размерность, если это требуется. Мы передаём в задачу три параметра: два строковых — email и subject и один целочисленный. Который отображается в теле письма. Найдём эти параметры в строке.
email: s:12: «tset@liam.ur». s — строка, 12 — длина строки, «tset@liam.ur» — содержимое строки, экранированное двойными кавычками.
Аналогично для subject.
В случае id: i:654321 i — целое число, 654321 — его значение.
Таким образом, надо подменить следующие параметры: размерность и содержимое.
Следует помнить, что в PostgreSQL с вычислением длины строк, закодированных в utf-8, ровно такая же беда, как и в PHP. Поэтому длину строковых переменных следует вычислять с помощью функции octet_length ();
Функция PostgreSQL
Создаём миграцию
php artisan make:migration create_set_job_function
Содержимое миграции
Выполняем миграцию
php artisan migrate
Переходим в терминал контейнера и пробуем вызвать эту функцию
test=# select set_job('e@mail.ru', 'Кирилица в заголовке', 09876);
set_job
---------
(1 row)
В терминале запускаем воркер
php artisan queue:work --once
И смотрим в лог
Упс. Почти всё правильно. Домашнее задание для читателей: найти ошибку и исправить код хранимой функции, чтобы она работала правильно при любом наборе данных.
Эпилог
Вот таким нехитрым способом можно заставить Laravel реагировать на события, происходящие в базе данных: хранимых функциях и триггерах.