Apache Kafka – Producer и Consumer. Простой пример Nodejs приложения
Привет! В продолжение темы изучения микросервисов решил разобраться с взаимодействием этих самых «сервисов», и написать простой пример взаимодействия двух сервисов между собой.
Перед чтением данной статьи, настоятельно рекомендую ознакомиться с данной статьей, по теме kafka (Kafka за 20 минут. Ментальная модель и как с ней работать)
Пример реализации можно найти тут…
И так, пример таков:
В сервисе пользователей есть метод для регистрации этих самых пользователей, где после регистрации, необходимо создать профиль для пользователя.
Конечно в представленном примере, отсутствует логика «ответов на вопросы» по типу:
Пример направлен исключительно на демонстрацию работы общения двух сервисов между собой, с помощью Apache Kafka.
Реализация
И так, в данном примере user‑service выступает в качестве producer — то есть отправителем событий, а profile‑service — выступает в качестве consumer — то есть слушающего входящие события.
Создадим 2 абсолютно одинаковых сервиса со следующими файлами
Создадим директорию microservices — куда поместим наши сервисы
Создадим файлы для сервиса профиля:
profile.service.js
Следует поместить их в отдельную директорию profileDockerfileimport Fastify from 'fastify' const fastify = Fastify({ logger: true, }) fastify.listen({ port: 3001, host: "0.0.0.0" }, (err, address) => { if (err) throw err console.log("Profile service: Start Up!") })
package.jsonFROM node:22 WORKDIR /profile-microservice COPY package.json . COPY yarn.lock . RUN yarn install COPY . . EXPOSE 3001 CMD ["node", "profile.service.js"]
{ "name": "microservice-kafka-learn_profile", "version": "1.0.0", "license": "MIT", "type": "module", "dependencies": { "@fastify/kafka": "^3.0.0", "fastify": "^5.0.0" } }
Далее по тому же принципу что и для сервиса profile, создадим директорию и файловую структуру для сервиса user
Далее, в корне наше проекта создадим docker‑compose в котором подымим наши ранее созданный сервисы, а так же, сразу поместим туда нашу Apache Kafka
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka_broker:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
user_microservice:
build: "./microservices/user"
ports:
- "3000:3000"
depends_on:
- kafka_broker
profile_microservice:
build: "./microservices/profile"
ports:
- "3001:3001"
depends_on:
- kafka_broker
На данном этапе, запуск docker‑compose должен привести вас к тому, что ваши сервисы user, profile, а так же kafka и zookeeper должны корректно работать.
Сценарий: «Пользователь успешно зарегистрирован»
Следующий шаг, давайте добавим в наш user.service.js файл подключение к Kafka
// ...
const fastify = Fastify({
logger: true,
})
fastify.register(kafka, {
producer: {
'metadata.broker.list': 'kafka:9092',
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'client.id': 'user-service',
'dr_cb': true
},
});
// ...
Мы успешно подключились к нашей kafka, теперь давайте добавим метод «регистрации» пользователей
fastify.post('/user/register', async (request, reply) => {
// .....логика регистрации пользователя.....
// Считаем что тут у нас логика создания пользователя, которая прошла успешно
// ...
/*
* Отправьте событие для создания профиля для успешно зарегистрированного пользователя.
* */
fastify.kafka.push({
topic: "user_register",
payload: JSON.stringify({
id: Date.now(),
email: "user@example.com",
username: "imbatman"
}),
key: 'user_register_key'
})
reply.send({
message: "User successfully created!"
})
})
Что тут происходит?
В данном коде, мы считаем что успешно зарегистрировали пользователя и приступаем к отправке сообщения в kafka
Создаем название topic'а, желательно вынести данные наименования топиков в константы, для дальнейшего использования их, к примеру, в других сервисах, как мы увидим в примере далее
Формируем payload который хотим передать потенциальному получателю нашего события
Формируем ключ нашего сообщения
Отправляем!
Отлично, ваши данные попали к брокеру!
Сценарий: «Создаем профиль пользователя»
Добавим слушателя нашего события для создания профиля в сервисе profile.service.js
Нам так же как и в сервисе user, необходимо подключиться к нашей Kafka в сервисе profile
import crypto from "node:crypto"
const groupId = crypto.randomBytes(20).toString('hex')
fastify.register(kafka, {
consumer: {
'metadata.broker.list': 'kafka:9092',
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'topic.metadata.refresh.interval.ms': 1000,
'group.id': groupId,
},
})
Перед подключением нам нужно сгенерировать идентификатор группы, он служит для идентификации группы потребителей (consumer). Все потребители с одинаковым group.id образуют одну группу и совместно потребляют сообщения из топиков, разделяя между собой партиции (Если вы прочли рекомендованную мной статью выше, понимаете о чем речь).
Далее, нам необходимо подписаться на наше событие регистрации пользователей, то есть — «user_register»
Добавим следующий код:
fastify.register(kafka, {
consumer: {
'metadata.broker.list': 'kafka:9092',
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'topic.metadata.refresh.interval.ms': 1000,
'group.id': groupId,
},
}).after((err) => {
if (err) throw err
fastify.kafka
.subscribe(["user_register"])
.on("user_register", (msg, commit) => {
const registeredUser = JSON.parse(msg.value.toString());
console.log(registeredUser) // Тут наш зарегистрированный юзверь
commit()
})
fastify.kafka.consume()
})
Разберем что тут происходит?
Для начала, мы подписываемся в нашем сервисе на все необходимые для данного сервиса события
.subscribe(["user_register"])
. В данном случае, этим событием является «user_register»Опять же, повторюсь, что ключи топиков, необходимо хранить вне всех сервисов, для их общего доступа. В данном случае, сделано ради простоты примера.
Далее, мы устанавливаем обработчик события
.on("user_register", (msg, commit) => {
В теле данного события мы получаем результат отправленного нами события в сервисе user, то есть:
{ id: "1726957487909", email: "user@example.com", username: "imbatman" }
Нам остается обработать в нашем profile сервисе данное событие, и создать профиль пользователя, основываясь на предоставленных данных
Далее, вызывая представленную в аргументе функцию commit, мы подтверждает получение и обработку сообщения Kafka, сигнализируя брокеру, что это сообщение может быть помечено как «прочитанное» в текущей группе потребителей, предотвращая его повторное получение
fastify.kafka.consume()
в свою очередь, активирует процесс потребления сообщений из подписанных тем, в данном случае «user_register». После вызова этого метода, Fastify начнет обрабатывать входящие сообщения и передавать их в соответствующие обработчики событий, такие как on("user_register", ...)
Заключение
Надеюсь, на таком незамысловатом примере, мне в первую очередь, как человеку только что познакомившись с данной технологией, и вам, как читателям данной статьи, удалось уловить первые простые шаги и понимание взаимодействие субъектов в виде сервисов (как в данном примере) между собой!