Учимся общаться между микросервисами на Node.js через RabbitMQ

Это продолжение статьи «Пишем первый микросервис на Node.js с общением через RabbitMQ», которая была неплохо принята пользователями хабра.

В этой статье я расскажу о том, как нужно правильно общаться между микросервисами, чтобы микросервисы оставались изолированными.


Как делать не стоит

Зачем нужно общаться между микросервисами? Используешь одну базу данных, читаешь оттуда что хочешь — делов-то!

Нет, так делать нельзя. Концепция микросервисов заключается в том, что они изолированы друг от друга, никто ни о ком ничего не знает (практически). Скорее всего, в будущем, когда система начнет разрастаться, вы захотите расширять функционал и вам понадобится общаться между микросервисами: например, пользователь купил товар, значит, нужно отправить уведомление о продаже продавцу.


Преимущества изоляции


Надежность

Предположим, имеется монолитное приложение, в котором есть несколько контроллеров:


  1. Товары
  2. Скидки
  3. Блог
  4. Пользователи

В один прекрасный день у нас падает база данных: теперь мы не можем получить ни товары, ни скидки, ни статьи для блога, ни пользователей. Сайт полностью недоступен, клиенты не могут зайти, бизнес теряет прибыль.

Что будет в микросервисной архитектуре?

В другой вселенной, в этот же день падает база данных микросервиса пользователей, он становится недоступным: пользователи не могут выйти из аккаунта, зарегистрироваться и залогиниться. Казалось бы, что все плохо и бизнес так же теряет прибыль, но нет: потенциальные покупатели могут посмотреть имеющиеся товары, почитать блог компании, находить скидки.

Благодаря тому, что у каждого микросервиса своя база данных, сайд-эффектов становится куда меньше.

Это называется постепенная деградация.


Абстракция

В большом приложении очень сложно сосредоточиться на одной задаче, поскольку поменяв какую-нибудь небольшую миддлвару, можно сломать какой-то контроллер. Захотите использовать новый клиент для redis — нет, нельзя, тот контроллер, который мы написали три года назад, использует версию 0.1.0. Хотите наконец-то использовать новые возможности Node.js 10? А может 12? Извините, но в монолите используется 6 версия.


Как общаться

Раз уж мы заговорили о примере «пользователь купил товар, отправляем уведомление о продаже продавцу», тогда его и реализуем.

Схема следующая:


  1. Пользователь отправляет запрос в микросервис market на покупку вещи по ссылке /market/buy/: id
  2. В базе записывается флаг, что товар продан
  3. Из микросервиса market отправляется запрос в микросервис notifications, к которому подключены клиенты через WebSocket
  4. Микросервис notifications отправляет сообщение продавцу о продаже вещи


Пишем gateway

const Gateway = require('micromq/gateway');

// создаем гейтвей
const gateway = new Gateway({
  microservices: ['market'],
  rabbit: {
    url: process.env.RABBIT_URL,
  },
});

// создаем эндпоинт и делегируем его в микросервис market
gateway.post('/market/buy/:id', (req, res) => res.delegate('market'));

// слушаем порт и принимаем запросы
gateway.listen(process.env.PORT);

Гейтвей у нас состоит лишь из одного эндпоинта, но этого хватит для примера и тренировок.


Пишем микросервис notifications

const MicroMQ = require('micromq');
const WebSocket = require('ws');

// создаем микросервис
const app = new MicroMQ({
  name: 'notifications',
  rabbit: {
    url: process.env.RABBIT_URL,
  },
});

// поднимаем сервер для принятия запросов по сокетам
const ws = new WebSocket.Server({
  port: process.env.PORT,
});

// здесь будем хранить клиентов
const clients = new Map();

// ловим событие коннекта
ws.on('connection', (connection) => {
  // ловим все входящие сообщения
  connection.on('message', (message) => {
    // парсим сообщение, чтобы извлечь оттуда тип события и параметры.
    // не забудьте в продакшене добавить try/catch, когда будете парсить json!
    const { event, data } = JSON.parse(message);

    // на событие 'authorize' сохраняем коннект пользователя и связываем его с айди
    if (event === 'authorize' && data.userId) {
      // сохраняем коннект и связываем его с айди пользователя
      clients.set(data.userId, connection);
    }
  });
});

// не забудьте реализовать удаление клиента после дисконнекта,
// иначе получите утечку памяти!
ws.on('close', ...);

// создаем действие notify, которое могут вызывать другие микросервисы
app.action('notify', (meta) => {
  // если нет айди пользователя или текста, тогда возвращаем статус 400
  if (!meta.userId || !meta.text) {
    return [400, { error: 'Bad data' }];
  }

  // получаем коннект конкретного пользователя
  const connection = clients.get(meta.userId);

  // если не удалось найти коннект, тогда возвращаем статус 404
  if (!connection) {
    return [404, { error: 'User not found' }];
  }

  // отправляем сообщение клиенту
  connection.send(meta.text);

  // возвращаем 200 и ответ
  return { ok: true };
});

// запускаем микросервис
app.start();

Здесь мы поднимаем вебсокет-сервер и микросервис одновременно, чтобы принимать запросы и по вебсокетам, и по RabbitMQ.

Схема следующая:


  1. Пользователь устанавливает соединение с нашим вебсокет-сервером
  2. Пользователь авторизовывается, отправляя событие authorize со своим userId внутри
  3. Мы сохраняем соединение пользователя, чтобы в дальнейшем можно было отправлять ему уведомления
  4. Приходит событие по RabbitMQ о том, что нужно отправить уведомление пользователю
  5. Проверяем валидность входящих данных
  6. Получаем соединение пользователя
  7. Отправляем уведомление


Пишем микросервис market

const MicroMQ = require('micromq');
const { Items } = require('./api/mongodb');

// создаем микросервис
const app = new MicroMQ({
  name: 'market',
  rabbit: {
    url: process.env.RABBIT_URL,
  },
});

// создаем эндпоинт для покупки предмета
app.post('/market/buy/:id', async (req, res) => {
  const { id } = req.params;

  // ищем предмет с нужным айди
  const item = await Items.findOne({ id, isSold: false });

  // если предмета нет, возвращаем 404
  if (!item) {
    res.status(404).json({
      error: 'Item not found',
    });

    return;
  }

  // обновляем предмет, ставя флаг о том, что он продан
  await Items.updateOne({
    id,
  }, {
    $set: {
      isSold: true,
    },
  });

  // отправляем уведомление продавцу о том, что предмет продан
  req.app.ask('notifications', {
    server: {
      action: 'notify',
      meta: {
        userId: item.sellerId,
        text: JSON.stringify({
          event: 'notification',
          data: {
            text: `Item #${id} was sold!`,
          },
        }),
      },
    },
  })
    // обрабатываем ошибку, если не получилось отправить уведомление
    .catch(err => console.log('Cannot send message via notifications microservice', err));

  // отвечаем пользователю о том, что все прошло успешно
  res.json({
    ok: true,
  });
});

// запускаем микросервис
app.start();

Схема следующая:


  1. Получаем запрос пользователя на покупку предмета
  2. Ищем предмет с нужным айди и убеждаемся, что он еще не продан
  3. Помечаем предмет как проданный
  4. Отправляем в бэкграунде уведомление продавцу о продаже
  5. Отвечаем клиенту


Проверяем


  1. Запускаем 3 процесса
  2. Отправялем POST /market/buy/1
  3. Получаем в ответ { ok: true }
  4. Продавец получает уведомление
$ PORT=9000 node ./src/gateway.js
$ PORT=9001 node ./src/notifications.js
$ MONGODB_URL=mongodb://localhost:27017/my-super-microservice node ./src/market.js

94lftpbwx-90pp29qhwsdv4bx98.png


© Habrahabr.ru