Apache Kafka – Producer и Consumer. Простой пример Nodejs приложения

Привет! В продолжение темы изучения микросервисов решил разобраться с взаимодействием этих самых «сервисов», и написать простой пример взаимодействия двух сервисов между собой.

Перед чтением данной статьи, настоятельно рекомендую ознакомиться с данной статьей, по теме kafka (Kafka за 20 минут. Ментальная модель и как с ней работать)

Пример реализации можно найти тут…

И так, пример таков:

c009e5335da72f4688ca8b008324098f.png

В сервисе пользователей есть метод для регистрации этих самых пользователей, где после регистрации, необходимо создать профиль для пользователя.

Конечно в представленном примере, отсутствует логика «ответов на вопросы» по типу:

Пример направлен исключительно на демонстрацию работы общения двух сервисов между собой, с помощью Apache Kafka.

Реализация

И так, в данном примере user‑service выступает в качестве producer — то есть отправителем событий, а profile‑service — выступает в качестве consumer — то есть слушающего входящие события.

Создадим 2 абсолютно одинаковых сервиса со следующими файлами

  1. Создадим директорию microservices — куда поместим наши сервисы

  2. Создадим файлы для сервиса профиля:
    Следует поместить их в отдельную директорию profile

    profile.service.js
    import Fastify from 'fastify'
    
    const fastify = Fastify({
        logger: true,
    })
    
    fastify.listen({ port: 3001, host: "0.0.0.0" }, (err, address) => {
        if (err) throw err
        console.log("Profile service: Start Up!")
    })
    Dockerfile
    FROM node:22
    
    WORKDIR /profile-microservice
    
    COPY package.json .
    COPY yarn.lock .
    
    RUN yarn install
    
    COPY . .
    
    EXPOSE 3001
    
    CMD ["node", "profile.service.js"]
    package.json
    {
      "name": "microservice-kafka-learn_profile",
      "version": "1.0.0",
      "license": "MIT",
      "type": "module",
      "dependencies": {
        "@fastify/kafka": "^3.0.0",
        "fastify": "^5.0.0"
      }
    }
  3. Далее по тому же принципу что и для сервиса profile, создадим директорию и файловую структуру для сервиса user

Далее, в корне наше проекта создадим docker‑compose в котором подымим наши ранее созданный сервисы, а так же, сразу поместим туда нашу Apache Kafka

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka_broker:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  user_microservice:
    build: "./microservices/user"
    ports:
      - "3000:3000"
    depends_on:
      - kafka_broker

  profile_microservice:
    build: "./microservices/profile"
    ports:
      - "3001:3001"
    depends_on:
      - kafka_broker

На данном этапе, запуск docker‑compose должен привести вас к тому, что ваши сервисы user, profile, а так же kafka и zookeeper должны корректно работать.

Сценарий: «Пользователь успешно зарегистрирован»

Следующий шаг, давайте добавим в наш user.service.js файл подключение к Kafka

// ...

const fastify = Fastify({
    logger: true,
})

fastify.register(kafka, {
    producer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'client.id': 'user-service',
        'dr_cb': true
    },
});

// ...

Мы успешно подключились к нашей kafka, теперь давайте добавим метод «регистрации» пользователей

fastify.post('/user/register', async (request, reply) => {
    // .....логика регистрации пользователя.....
    // Считаем что тут у нас логика создания пользователя, которая прошла успешно
    // ...

    /*
    * Отправьте событие для создания профиля для успешно зарегистрированного пользователя.
    * */    
    fastify.kafka.push({
        topic: "user_register",
        payload: JSON.stringify({
            id: Date.now(),
            email: "user@example.com",
            username: "imbatman"
        }),
        key: 'user_register_key'
    })

    reply.send({
        message: "User successfully created!"
    })
})

Что тут происходит?

  1. В данном коде, мы считаем что успешно зарегистрировали пользователя и приступаем к отправке сообщения в kafka

  2. Создаем название topic'а, желательно вынести данные наименования топиков в константы, для дальнейшего использования их, к примеру, в других сервисах, как мы увидим в примере далее

  3. Формируем payload который хотим передать потенциальному получателю нашего события

  4. Формируем ключ нашего сообщения

  5. Отправляем!

  6. Отлично, ваши данные попали к брокеру!

Сценарий: «Создаем профиль пользователя»

Добавим слушателя нашего события для создания профиля в сервисе profile.service.js

Нам так же как и в сервисе user, необходимо подключиться к нашей Kafka в сервисе profile

import crypto from "node:crypto"

const groupId = crypto.randomBytes(20).toString('hex')

fastify.register(kafka, {
    consumer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'topic.metadata.refresh.interval.ms': 1000,
        'group.id': groupId,
    },
})

Перед подключением нам нужно сгенерировать идентификатор группы, он служит для идентификации группы потребителей (consumer). Все потребители с одинаковым group.id образуют одну группу и совместно потребляют сообщения из топиков, разделяя между собой партиции (Если вы прочли рекомендованную мной статью выше, понимаете о чем речь).

Далее, нам необходимо подписаться на наше событие регистрации пользователей, то есть — «user_register»

Добавим следующий код:

fastify.register(kafka, {
    consumer: {
        'metadata.broker.list': 'kafka:9092',
        'fetch.wait.max.ms': 10,
        'fetch.error.backoff.ms': 50,
        'topic.metadata.refresh.interval.ms': 1000,
        'group.id': groupId,
    },
}).after((err) => {
    if (err) throw err

    fastify.kafka
        .subscribe(["user_register"])
        .on("user_register", (msg, commit) => {
            const registeredUser = JSON.parse(msg.value.toString());

            console.log(registeredUser) // Тут наш зарегистрированный юзверь
            commit()
        })

    fastify.kafka.consume()
})

Разберем что тут происходит?

  1. Для начала, мы подписываемся в нашем сервисе на все необходимые для данного сервиса события .subscribe(["user_register"]). В данном случае, этим событием является «user_register»

    Опять же, повторюсь, что ключи топиков, необходимо хранить вне всех сервисов, для их общего доступа. В данном случае, сделано ради простоты примера.

  2. Далее, мы устанавливаем обработчик события
    .on("user_register", (msg, commit) => {

  3. В теле данного события мы получаем результат отправленного нами события в сервисе user, то есть:

    {
      id: "1726957487909",
      email: "user@example.com",
      username: "imbatman"
    }
  1. Нам остается обработать в нашем profile сервисе данное событие, и создать профиль пользователя, основываясь на предоставленных данных

  2. Далее, вызывая представленную в аргументе функцию commit, мы подтверждает получение и обработку сообщения Kafka, сигнализируя брокеру, что это сообщение может быть помечено как «прочитанное» в текущей группе потребителей, предотвращая его повторное получение

fastify.kafka.consume() в свою очередь, активирует процесс потребления сообщений из подписанных тем, в данном случае «user_register». После вызова этого метода, Fastify начнет обрабатывать входящие сообщения и передавать их в соответствующие обработчики событий, такие как on("user_register", ...)

Заключение

Надеюсь, на таком незамысловатом примере, мне в первую очередь, как человеку только что познакомившись с данной технологией, и вам, как читателям данной статьи, удалось уловить первые простые шаги и понимание взаимодействие субъектов в виде сервисов (как в данном примере) между собой!

© Habrahabr.ru