RabbitMQ и Kotlin: делаем свою event-driven Jira на Spring
В этой статье мы рассмотрим, как можно в один клик развернуть RabbitMQ в облаке. Также напишем простое приложение на Spring Boot и Kotlin, которое будет взаимодействовать с этим брокером сообщений. Пример проекта вы можете посмотреть на github.

Представьте, что в один прекрасный день к вам пришёл заказчик и говорит: «в рамках импортозамещения мы должны отказаться от Jira и вместо неё разработать собственный инструмент для управления задачами». Конкретно вам поручено разработать небольшой сервис. Этот сервис оперирует статусами задач и подзадач, которые в неё входят.
На этапе MVP статусная модель содержит всего 3 значения: NEW
, IN_PROGRESS
и COMPLETED
. Статус основной задачи определяется в зависимости от статусов подзадач. Основная мысль такая: достаточно взять одну задачу в работу — и вся задача автоматом тоже берётся в работу. Но для завершения основной задачи должны быть выполнены все сабтаски. Более формально это определяется набором следующих правил:
если все подзадачи в статусе
NEW
— родительская задача также в статусеNEW
если хотя бы одна подзадача в статусе
IN_PROGRESS
илиNEW
— родительская задача переходит вIN_PROGRESS
если все подзадачи в статусе
COMPLETED
— задача также переходит вCOMPLETED

И ещё требуется отправлять уведомления (например, по email) при смене статусов задач. Причём этот функционал можно включать и выключать.
И тут мы должны принять важное архитектурное решение. Можно конечно при смене статусов сабтасок ещё и обрабатывать статусы родительских тасок. И если они меняются, тут же ещё и встраивать логику по отправке email. Но это делает логику не расширяемой и сложной в поддержке. Если завтра появится ещё несколько статусов, мы получим спагетти-код.
Чтобы этого избежать, лучше сразу сделать компоненты системы менее связанными и обеспечить их взаимодействие через события. То есть если меняется статус сабтаски (а только это действие напрямую исходит от пользователя) — мы генерим соответствующий эвент. Далее компонент обработки статуса получает это событие, выполняет проверку остальных сабтасок по указанной выше логике и при необходимости меняет статус. В это же самое время компонент, отвечающий за уведомление, параллельно перехватывает событие и отправляет email.
При таком подходе эти три компонента работают независимо, что упрощает их логику и тестирование.
При построении системы можно использовать разные брокеры сообщений, но мы рассмотрим наиболее популярный из них — RabbitMQ.
RabbitMQ и основные понятия
RabbitMQ (в обиходе «кролик») — это брокер сообщений, основанный на протоколе AMQP (Advanced Message Queuing Protocol). Это открытый протокол передачи сообщений, который нужен для общения разных частей системы между собой. Многие популярные фреймворки (в том числе и Spring) поддерживают этот протокол.
При работе с кроликом нам нужно рассмотреть три основные сущности: Queue, Exchange и Binding. В базовом варианте событие публикуется в обменник (Exchange). Затем на основе правил привязки (Binding) и ключа маршрутизации (Routing Key) это событие попадает в одну или несколько очередей (Queue). И уже из очереди сообщение вычитается соответствующим потребителем (Listener).
В Rabbit поддерживается несколько видов Exchange:
Fanout Exchange — независимо от ключа маршрутизации просто отправляем сообщение во все привязанные очереди. По сути это broadcast. Причём ещё и с минимальной задержкой, т.к. нет необходимости выполнять какие-либо проверки.
Direct Exchange — основной тип обменника, где привязка к очереди осуществляется по ключу. Здесь ключом может быть произвольная строка.
Topic Exchange — похож на Direct, но ключ можно задавать с помощью шаблона. При создании шаблона используются 0 или более слов (буквы A-Z и a-z и цифры 0–9), разделенных точкой, а также символы * и #. Использование шаблона позволяет более гибко настраивать маршрутизацию.
Topic Exchange работает чуть медленее, но он лучше всего подходит для дальнейшего расширения логики благодаря своей гибкости. К тому же вряд ли у нас задачи будут генериться тысячами каждую секунду.
Есть ещё и другие виды обменников. Более подробно cм. RabbitMQ. Часть 2. Разбираемся с Exchanges.
Разворачиваем RabbitMQ в облаке
Платформа dockhost.ru позволяет разворачивать различные брокеры сообщений и базы данных буквально в один клик. Для этого перейдём в «Панель Управления» и найдём пункт меню «Приложения». Среди приложений выберем RabbitMQ и наиболее подходящую конфигурацию для него. От количества потребляемых ресурсов зависит стоимость хостинга для кролика. В тестовых целях подойдёт и минимальный конфиг.

После того как брокер будет установлен, всё в том же разделе «Приложения» появится новая вкладка «Установлено». Переходим в неё и увидим новый инстанс с выбранной нами конфигурацией. Чтобы перейти в админку кролика, нажимаем кнопку «Web». Логин и пароль можно посмотреть в меню в разделе «Переменные окружения». Однако для наших целей можно вообще не заходить в админку. Все необходимые сущности мы создадим программно.
Чтобы сделать эндпоинт самого брокера доступным извне, нужно открыть порт. Для этого выберем в меню «Сетевые сервисы» — «Порты». Нажимаем «Добавить», затем выбираем контейнер кролика и даём порту какое-то понятное имя.

После этого Dockhost сгенерит бесплатный домен третьего уровня, по которому мы сможем обращаться к rabbit.
Делаем заготовку проекта
Теперь перейдём к написанию самого приложения на Spring. Заготовку проекта проще всего сделать с помощью Spring Initializr. В качестве языка выбираем Kotlin, тип проекта — Gradle — Kotlin, версию JDK выбираем 21.
Затем добавим 4 зависимости:
Spring Web — функционал Rest-контроллеров.
Spring Data JPA — работа с базой данных.
H2 Database — драйвер H2. Это база, которая работает в оперативной памяти и не требует установки, что очень удобно для демонстрации. Минусом является то, что после перезапуска приложения данные теряются. Но вы можете потом переключиться на другую СУБД (например, Postgres). Код при этом править не потребуется. Только параметры подключения.
Spring for RabbitMQ — собственно, компонент для интеграции с кроликом.
После добавления зависимостей нажимаем «Generate», скачиваем проект и открываем в IDE.
В проекте сразу найдём файл application.properties
и для удобства переименуем его в application.yml
. Затем вставим следующий конфиг:
spring:
application:
name: spring-rabbitmq-example
rabbitmq:
host: ${RABBIT_HOST}
port: ${RABBIT_PORT}
virtual-host: /
username: ${RABBIT_USER}
password: ${RABBIT_PASSWORD}
datasource:
url: jdbc:h2:mem:mydb
username: sa
password: password
driver-class-name: org.h2.Driver
В секции rabbitmq мы прописываем параметры подключения к брокеру сообщений. В целях безопасности мы не хардкодим их, а передаём через переменные окружения. Хост и порт прописываем те, которые нам сгенерил Dockhost при открытии порта. Логин и пароль можно использовать те же, что и при входе в админку, но лучше сгенерить отдельные (там же, в админке rabbit).
В секции datasource мы прописываем подключение к H2. Параметры подключения к ней хардкодим, т.к. тут БД по сути является частью приложения.
Структура хранения тасок и сабтасок
Теперь реализуем слой взаимодействия с БД с помощью Spring Data JPA.
Определим сущность сабтаски. В ней связь с родительской таской осуществляется по полю taskId.
@Entity
@Table(name = "subtask")
class SubtaskEntity (
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
var id: Int = 0,
@Column(name = "task_id")
var taskId: Int,
var status: Status,
var description: String,
)
В Kotlin для сущностей приходится использовать обычный class и делать все поля с модификатором var, т.е. изменяемыми. Хотя логичнее было бы делать неизменяемый data class. Такие ограничения накладывает Spring Data JPA. Если всё-таки использовать data-класс, то ошибки вы не получите, но на каждую модификацию строки в БД он будет создавать новый объект, что на больших масштабах может заметно сказаться на производительности.
Сама статусная модель представлена как обычный enum:
enum class Status {
NEW,
IN_PROGRESS,
COMPLETED,
}
Теперь определим интерфейс репозитория для сабтаски:
@Repository
interface SubtaskRepository : CrudRepository
Похожим образом определим родительскую сущность для таски. Однако здесь будем использовать связь «один-ко-многим», чтобы легко подгружать все дочерние сабтаски. Для этого используем аннотацию @OneToMany
с параметром FetchType.EAGER
(чтобы сразу подгружать связанные сущности).
@Entity
@Table(name = "task")
class TaskEntity (
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
var id: Int = 0,
var description: String,
var status: Status,
@OneToMany(targetEntity = SubtaskEntity::class, fetch = FetchType.EAGER)
@JoinColumn(name = "task_id")
var subtasks: List,
)
Репозиторий тасок также определяется в одну строку:
@Repository
interface TaskRepository : CrudRepository
После того как мы разметили сущности, при каждом старте приложения Spring будет автоматически создавать структуру таблиц в H2.
Создаём сущности RabbitMQ
Для работы с RabbitMQ в Spring используется RabbitTemplate. Однако если мы хотим отправлять эвенты в формате json, это надо настроить в явном виде. Создадим конфигурационный бин RabbitConfig
.
@Configuration
class RabbitConfig {
@Bean
fun rabbitTemplate(connectionFactory: ConnectionFactory): RabbitTemplate {
val template = RabbitTemplate(connectionFactory)
template.messageConverter = jsonConverter()
return template
}
@Bean
fun jsonConverter(): MessageConverter {
return Jackson2JsonMessageConverter()
}
}
Теперь сконфигурируем объекты Queue
, Exchange
и Binding
. У нас будет два таких набора: один для статусов сабтасок, а второй — для отправки уведомлений. Имена этих объектов удобно хранить как константы в классах-синглтонах (ключевое слово object):
object Exchanges {
const val TASK_STATUS_EXCHANGE = "task.status.exchange"
const val SUBTASK_STATUS_EXCHANGE = "subtask.status.exchange"
}
object Queues {
const val SUBTASK_STATUS_QUEUE = "subtask.status.queue"
const val NOTIFICATION_QUEUE = "notification.queue"
}
object RoutingKeys {
const val TASK_STATUS = "task.status"
const val SUBTASK_STATUS = "subtask.status"
}
Далее добавим в бин RabbitConfig
очередь для статусов сабтасок, обменник и связь между ними.
@Bean
fun subtaskStatusQueue() = Queue(Queues.SUBTASK_STATUS_QUEUE, true)
@Bean
fun subtaskStatusExchange() = TopicExchange(Exchanges.SUBTASK_STATUS_EXCHANGE, true, false)
@Bean
fun subtaskStatusBinding(subtaskStatusQueue: Queue, subtaskStatusExchange: Exchange) = BindingBuilder
.bind(subtaskStatusQueue)
.to(subtaskStatusExchange)
.with(RoutingKeys.SUBTASK_STATUS)
.noargs()
Вторым параметром при объявлении очереди и обменника идёт durable
, т.е. флаг, указывающий на то, что объект сохраняется при перезапуске самого брокера сообщений. Ставим в true
, т.к. мы не хотим просто так терять сообщения.
Третий бин получает на вход через параметры первые два бина. Причём имена параметров должны соответствовать именам методов, в которых определяются эти бины, иначе Spring не сможет автоматически их сопоставить.
Здесь же сразу объявим Exchange для статусов родительских задач, а также очередь для уведомлений по email и связку между ними.
@Bean
fun taskStatusExchange() = TopicExchange(Exchanges.TASK_STATUS_EXCHANGE, true, false)
@Bean
fun notificationQueue() = Queue(Queues.NOTIFICATION_QUEUE, true)
@Bean
fun notificationBinding(notificationQueue: Queue, taskStatusExchange: Exchange) = BindingBuilder
.bind(notificationQueue)
.to(taskStatusExchange)
.with(RoutingKeys.TASK_STATUS)
.noargs()
В итоге взаимосвязь между объектами Rabbit можно представить следующим образом:

Все эти объекты Spring автоматически создаст при старте приложения в RabbitMQ. Однако их можно создать и руками в админке. Но чтобы приложение «знало» про них, бины всё равно должны быть определены в контексте Spring.
Пишем rest-контроллер
Для создания таски напишем rest-контроллер. В реальном приложении эта логика должна находиться в сервисном слое, но я для краткости его делать не буду.
@RestController
@RequestMapping("/tasks")
class TaskController(
private val taskRepository: TaskRepository,
private val subtaskRepository: SubtaskRepository,
private val rabbitTemplate: RabbitTemplate,
) {
@PostMapping
fun createTask(@RequestBody task: TaskDto): TaskEntity {
val entity = TaskEntity(
status = Status.NEW,
subtasks = emptyList(),
description = task.description,
)
return taskRepository.save(entity)
.also { logger.info { "New task with id=${it.id} created." } }
}
}
Здесь мы делаем обработчик POST-запроса, который принимает объект TaskDto
:
data class TaskDto(
val description: String,
)
В нём есть только одно поле с описанием задачи. В реальном проекте здесь полей было бы гораздо больше. Например, заголовок, исполнитель, оценка в стори-поинтах, проект и т.п.
На основании полученного описания задачи мы создаём объект TaskEntity
со статусом NEW
и пока без подзадач. После вызова метода save()
на уровне БД сгенерится её уникальный идентификатор, который мы увидим в ответе на запрос.
Далее в этом же контроллере создадим второй метод для создания сабтаски. В body обработчик POST-запроса также принимает только описание подзадачи, а дополнительным параметром из @PathVariable
берём id родительской задачи.
@PostMapping("/{taskId}/subtasks")
fun createSubtask(@PathVariable taskId: Int, @RequestBody subtask: TaskDto): SubtaskEntity {
val entity = SubtaskEntity(
taskId = taskId,
status = Status.NEW,
description = subtask.description,
)
val modified = subtaskRepository.save(entity)
logger.info { "New subtask with id=${modified.id} created." }
rabbitTemplate.convertAndSend(
Exchanges.SUBTASK_STATUS_EXCHANGE,
RoutingKeys.SUBTASK_STATUS,
RabbitEvent(modified.id)
)
return modified
}
После сохранения сабтаски со статусом NEW
мы отправляем событие в очередь с помощью rabbitTemplate
. Сам объект события представляет собой обобщённый data-класс с единственным полем payload
:
data class RabbitEvent(
val payload: T,
)
В этом поле мы будем передавать только id целевого объекта, а не всю сущность целиком. В любой момент мы можем подгрузить актуальную информацию из БД. При необходимости сюда можно добавлять дополнительную мета-информацию. Например, тип события: создание, обновление, удаление и т.п.
Для наглядности добавим третий метод в контроллер, который будет возвращать информацию о задаче. Благодаря связи «один-ко-многим» вместе с ней будет возвращаться информация обо всех связанных подзадачах.
@GetMapping("/{taskId}")
fun getTaskWithSubtasks(@PathVariable taskId: Int): TaskEntity {
return taskRepository.findById(taskId).get()
}
Ну и, наконец, четвёртый метод для изменения статуса сабтаски.
@PatchMapping("/{taskId}/subtasks/{subtaskId}")
fun changeSubtaskStatus(
@PathVariable taskId: Int,
@PathVariable subtaskId: Int,
@RequestBody dto: StatusDto,
): SubtaskEntity {
val subtask = subtaskRepository.findById(subtaskId).get()
subtask.status = dto.status
val modified = subtaskRepository.save(subtask)
rabbitTemplate.convertAndSend(
Exchanges.SUBTASK_STATUS_EXCHANGE,
RoutingKeys.SUBTASK_STATUS,
RabbitEvent(modified.id)
)
return modified
}
Данный PATCH-метод вызывается вручную пользователем нашего таск-трекера, когда он взял задачу в работу или завершил её. Именно этот метод в основном будет инициировать смену родительской задачи и отправку уведомления. Поэтому здесь так же публикуем событие в очередь статусов подзадач.
Метод принимает на вход объект StatusDto
с единственным поле status
, а также id
подзадачи, в которой нужно этот статус установить.
data class StatusDto(
val status: Status,
)
Обработчик очереди статусов сабтасок
Теперь напишем компонент, который будет получать эвент каждый раз при изменении статуса подзадачи. Назовём его SubtaskStatusListener
.
@Component
class SubtaskStatusListener(
private val taskRepository: TaskRepository,
private val subtaskRepository: SubtaskRepository,
private val rabbitTemplate: RabbitTemplate,
) {
@RabbitListener(queues = [Queues.SUBTASK_STATUS_QUEUE])
fun onSubtaskStatusChanged(event: RabbitEvent) {
try {
val subtaskId = event.payload
logger.info { "Subtask status change event received (subtaskId=$subtaskId)." }
val subtask = subtaskRepository.findById(subtaskId).get()
val task = taskRepository.findById(subtask.taskId).get()
val actualTaskStatus = getActualTaskStatus(task)
if (task.status != actualTaskStatus) {
task.status = actualTaskStatus
val modifiedTask = taskRepository.save(task)
logger.info { "New task status is $actualTaskStatus for taskId=${task.id}." }
rabbitTemplate.convertAndSend(
Exchanges.TASK_STATUS_EXCHANGE,
RoutingKeys.TASK_STATUS,
RabbitEvent(modifiedTask.id)
)
}
} catch (e: Exception) {
logger.info { "Error while receiving subtask status change event: $e" }
}
}
}
Мы создаём метод onSubtaskStatusChanged()
, который принимает RabbitEvent
, типизированный целым числом. На этот метод вешаем аннотацию @RabbitListener
и указываем, какую очередь он «слушает».
Внутри метода подгружаем сначала сабтаску по id, затем ищем связанную с ней таску и вычисляем актуальный статус родительской задачи на основе статусов всех связанных подзадач и той логики, которую я приводил в начале статьи:
private fun getActualTaskStatus(task: TaskEntity) = when {
task.subtasks.all { it.status == Status.NEW } -> Status.NEW
task.subtasks.any { it.status in setOf(Status.NEW, Status.IN_PROGRESS) } -> Status.IN_PROGRESS
task.subtasks.all { it.status == Status.COMPLETED } -> Status.COMPLETED
else -> task.status
}
В конце обработчика проверяем, не изменился ли статус задачи? И если изменился, генерим событие и отправляем во второй обменник. Это событие уже может быть получено обработчиком уведомлений по email.
Всю обработку события мы обязательно оборачиваем в конструкцию try-catch
, т.к. если при обработке произойдёт ошибка и мы её не перехватим, то рискуем уйти в бесконечный цикл. Опционально в секцию catch
можно добавить отправку в отдельную очередь dlq (dead letter queue — «очередь недоставленных писем») и повесить на неё мониторинг, чтобы сразу узнать о проблеме.
Обработчик очереди уведомлений
Похожим образом определяется второй обработчик, NotificationQueue
.
@Component
class NotificationListener(
private val taskRepository: TaskRepository,
) {
@RabbitListener(queues = [Queues.NOTIFICATION_QUEUE])
fun onNotificationEvent(event: RabbitEvent) {
try {
logger.info { "Task status change event received: ${event.payload}." }
val task = taskRepository.findById(event.payload).get()
// notification logic
logger.info { "New status for task '${task.description}' is ${task.status}." }
logger.info { "Task status change notification sent." }
} catch (e: Exception) {
logger.info { "Error while receiving task status change event: $e" }
}
}
}
Сюда прилетает эвент всякий раз, когда меняется статус родительской задачи. Мы также получаем RabbitEvent
, но уже из очереди уведомлений. И id объекта, который в нём содержится — это id родительской таски. Поэтому затем идём в БД и подгружаем актуальную информацию по ней.
Для каждого такого события мы отправляем уведомление (например, по email). Поскольку работа с email — это отдельная тема, то рассматривать её здесь не будем.
Тестируем логику таск-менеджера
Теперь запускаем наше приложение и отправляем сначала POST-запрос на создание родительской задачи.
При постановке задачи указываем максимально исчерпывающее описание.
// POST /tasks
{
"description": "Сделать всё как положено."
}
// Ответ:
{
"id": 1,
"description": "Сделать всё как положено.",
"status": "NEW",
"subtasks": []
}
Далее эта задача попадает в команду разработки и происходит её декомпозиция. Каждый участник процесса создаёт свою сабтаску.
// POST /tasks/1/subtasks
{
"description": "Запилить на бэке."
}
// POST /tasks/1/subtasks
{
"description": "Запилить на фронте."
}
Затем самый шустрый разработчик берёт первую сабтаску в работу:
// PATCH /tasks/1/subtasks/1
{
"status": "IN_PROGRESS"
}
Это инициирует отправку события и изменение статуса основной задачи. В итоге статусы задач примут следующий вид:
// GET /tasks/1
{
"id": 1,
"description": "Сделать всё как положено.",
"status": "IN_PROGRESS",
"subtasks": [
{
"id": 1,
"taskId": 1,
"status": "IN_PROGRESS",
"description": "Запилить на бэке."
},
{
"id": 2,
"taskId": 1,
"status": "NEW",
"description": "Запилить на фронте."
}
]
}
То есть вместе с первой сабтаской в работу автоматически перешла родительская задача.
А если мы посмотрим в логи, то увидим также запись о том, что было отправлено уведомление:
NotificationListener : Task status change event received: 1.
NotificationListener : New status for task 'Сделать всё как положено.' is IN_PROGRESS.
NotificationListener : Task status change notification sent.
То есть цепочка объектов в RabbitMQ, настроенная нами, работает корректно.
Создание и удаление объектов в RabbitMQ динамически
Если вернуться к изначальной постановке задачи, у нас было ещё одно условие: уведомления можно включать и отключать. Самый простой способ — это сделать где-то в приложении флаг и проверять его через if
в NotificationListener
. Однако при работе с очередями более правильно просто удалить связку между taskStatusExchange
и notificationQueue
. Поскольку связка notificationBinding
— это объект в RabbitMQ, то для управления объектами динамически мы можем использовать rabbitAdmin
.
Создадим новый бин в RabbitConfig
.
@Bean
fun rabbitAdmin(connectionFactory: ConnectionFactory): RabbitAdmin {
return RabbitAdmin(connectionFactory)
}
Затем пропишем его в rest-контроллер вместе с бином notificationBinding
. А также добавим ещё пару методов:
class TaskController(
private val taskRepository: TaskRepository,
private val subtaskRepository: SubtaskRepository,
private val rabbitTemplate: RabbitTemplate,
private val rabbitAdmin: RabbitAdmin, // динамическое управление объектами RabbitMQ
private val notificationBinding: Binding, // связка между exchange и очередью
) {
// ...
@DeleteMapping("/notifications")
fun deactivateNotifications() {
rabbitAdmin.removeBinding(notificationBinding)
logger.info { "Task status notifications deactivated." }
}
@PutMapping("/notifications")
fun activateNotifications() {
rabbitAdmin.declareBinding(notificationBinding)
logger.info { "Task status notifications activated." }
}
}
В DELETE-методе мы используем removeBinding()
для удаления связки между очередью и обменником. То есть этот метод будет отключать уведомления. А в методе PUT мы используем declareBinding()
для того, чтобы включить уведомления обратно. RabbitAdmin
также содержит подобные пары методов для создания и удаления очередей и обменников.
Заключение
Мы убедились, что компонент Spring for RabbitMQ позволяет легко взаимодействовать с брокером сообщений через RabbitTemplate
. Независимо от того, как вы создаёте объекты (вручную или программно), они в любом случае должны быть описаны в конфигурации Spring. Чтобы деактивировать очередь, достаточно убрать связь между очередью и обменником. Чтобы сделать это динамически можно использовать RabbitAdmin
.
Пример проекта содержит Dockerfile
, поэтому вы сможете легко развернуть его в любом облачном хостинге. Но проще всего это сделать в dockhost.ru.
Ещё больше статей и видеогайдов по разработке на Java, Kotlin и Spring вы найдёте на моём сайте devmark.ru.