Как мы сделали для разработчиков универсальную шину событий, не требующую знаний Kafka и прочих брокеров

d604c70e414879ebb4c40c555ab48f24.png

Привет!

Меня зовут Петр Коробейников, я техлид команды DBaaS for Redis в #CloudMTS.
Некоторое время назад я озадачился созданием общего набора инструментов для наших команд разработки. Цель была проста: разработчик не тратит время на погружение в логику работы конкретного инструмента, берет готовую инструкцию и просто делает свое дело — пишет код. Типовое окружение поможет переходить ребятам из команды в команду и быстро адаптироваться, а новичку — проще приступить к работе.

Сегодня я хочу рассказать про один из элементов такого типового окружения, который позволяет быстро начать работу с брокерами сообщений. Даже если разработчик Kafka и прочие брокеры до этого в глаза не видел. Речь пойдет о шине данных или событий (EventBus) и про то, как мы настроили ее кодогенерацию.

Что собой представляет шина данных и зачем она нужна?

В своей работе мы имеем дело с микросервисной архитектурой. Для асинхронного общения между сервисами мы используем шину событий. В данный момент под ее капотом Kafka (про нее писали здесь), но на ее месте может быть NSQ, Pulsar или даже Pub/Sub в Redis. В этом и суть, что разработчику для настройки обмена сообщениями не будет важно, что там внутри.

В целом схема работает следующим образом:

  • Разработчик описывает сообщение/событие по строгой protobuf-спецификации. Он прописывает, что, откуда и кому должно быть отправлено. Если разработчик решит написать что-то не по форме, то код просто не скомпилируется.

  • На основе созданной спецификации разработчик запускает автоматическую кодогенерацию кода для продьюсеров и консьюмеров. 

  • Разработчик не ломает голову, как оформить сообщение, и не тратит время на ручное создание продьюсеров и косьюмеров. Профит!

Теперь поговорим обо всем подробнее.

Спецификация

Строгую спецификацию мы создали на основе Protocol Buffers (protobuf). Хотя с этой технологией мы работаем давно, в основном в контексте gRPC, мы все же решили посмотреть на другие варианты.

Кратко расскажу, почему не выбрали инструменты ниже:

AsyncAPI. Достойная альтернатива, которая позволяет использовать любые средства доставки. Но спецификацию пришлось бы писать в виде развесистого yaml-файла, очень близкую к спецификации OpenAPI (Swagger). Это очередная история про программирование на yaml, которая всегда вызывает определенную неприязнь у разработчиков. Существуют инструменты, аналогичные Swagger, которые умеют генерировать спецификацию уже по написанному коду. Но это тоже большая боль — любой сгенерированный, например, по аннотациям swagger-файл всегда неточен. Мы же используем принцип — specification first.

Помимо этого, по субъективным ощущениям, документация у AsyncAPI неполная и отстает от реальности, хотя сообщество разработчиков достаточно часто проводит открытые встречи. Их записи можно посмотреть на официальном канале.

Cloudevents. Также поддерживает множество способов передачи сообщений: AMQP (RabbitMQ), Kafka, MQTT, NATS  и прочее.

Основная проблема — это отсутствие схемы самого сообщения, схемы полезной нагрузки (payload): по умолчанию данные могут быть записаны в произвольном формате без возможности специфицировать структуру. В целом Cloudevents больше про то, кто продьюсер, кому нужно доставить сообщение и когда оно было отправлено. 

oneof data {

    // Binary data
    bytes binary_data = 2;

    // String data
    string text_data = 3;

    // Protobuf Message data
    google.protobuf.Any proto_data = 4;
}

Источник

Кроме того, показалось, что порог входа в Cloudevents достаточно высокий. А по опыту адаптации в прошлом стало понятно, что и тогда, и сейчас это оверкилл.

В итоге мы написали отдельный плагин на go для protoc (protoc-gen-go-eventbus).
Плагин поддерживает опцию, в которой мы задаем топик для событий, место, куда события будут публиковаться и откуда читатели будут вычитывать его.

Для формирования имени топика мы используем следующий формат: сначала идет namespace проекта, потом название проекта и сущность или действие, которое выполняется.

syntax = "proto3";


package eventbus;


option go_package = "./;eventbus";


import "cloud.mts.ru/protobuf/eventbus/annotation.proto";
import "dbaas-redis.redis-hub-gateway.types.proto";


message ApplianceCreateClaimedEvent {
 option (cloud.mts.ru.protobuf.eventbus.topic) = "dbaas-redis.redis-hub-gateway.appliance-create-claimed";


 message Resource {
   int32 total_cpu = 1;
   IntegerUnit total_ram = 2;
   IntegerUnit disk_size = 3;
   string disk_policy = 4;
 }


 message ApplianceVersion {
   string id = 1;
   string name = 2;
 }


 string appliance_id = 1;
 string project_id = 2;
 // …
}

Пример спецификации

И сам получившийся плагин protoc-gen-go-eventbus, и protoc у нас запечён в отдельный docker-образ. Так мы ушли от необходимости устанавливать protoc на компьютеры разработчиков и поддерживать актуальность его версии: актуальная используемая версия со всеми необходимыми плагинами и аннотациями есть в образе, доступном из внутреннего реджистри.

Hidden text

О подходе с запеканием инструментов в докер-образы вы можете почитать в моем блоге.

Автоматическая кодогенерация

Ориентируясь на данные из спецификации, автоматически создается сам экземпляр продьюсера. У всех у них есть метод Produce, который принимает первым аргументом ctx, и событие, которое было сгенерировано по protobuf с помощью спецификации выше. В таком виде событие отправляется от продьюсера в шину событий.

if err := s.applianceCreateEventProducer.Produce(ctx, applianceRequest); err != nil {
  logging.LoggerFromContext(ctx).
     With(zap.String("applianceID", applianceRequest.ApplianceId)).
     Error("create appliance failed", zap.Error(&errCreateAppliance{err: err}))


  return err
}

Пример вызова продьюсера

У консьюмера схожий с продьюсером API. При генерации инстанса консьюмера также создается и метод потребления. Первым аргументом принимается ctx, а вторым — функция, в которую будут подаваться полученные события.
На примере ниже обрабатываем событие создания кластера.

Если обработка прошла без ошибок, мы возвращаем nil.

grp.Go(func() error {
  return applianceCreateClaimedEventConsumer.Consume(ctx, func(ctx context.Context, event *eventbus.ApplianceCreateClaimedEvent) error {
     // do some work
     return err
  })
})

Пример вызова консьюмера

Если возвращаем с ошибкой, то это сообщение нужно будет вычитать еще раз.

Немного про мониторинг

Ни одно наше решение не работает без мониторинга. И шина событий — не исключение. Наиболее подходящим для нас паттерном снятия метрик в данной ситуации оказался RED (Rate, Errors, Duration). Все метрики снимаются автоматически внутри сгенерированного кода. Разработчику не нужно ничего писать самостоятельно.

А дальше снятые метрики проходят по всем известному механизму:

  1. Попадают в endpoint/metrics сервиса;

  2. Оттуда при очередном «скрейпе» попадают в VictoriaMetrics;

  3. И оказываются на дашборде в grafana.

Для работы с grafana у нас тоже есть набор инструментов, позволяющий отлаживать в локальной среде на собственном ноутбуке дашборды, описанные с помощью grafonnet. Внутри — общедоступные инструменты с открытым исходным кодом, ну, а мы предоставляем к ним простой и понятный интерфейс командной строки. Про это обязательно расскажу в одной из будущих статей.

Что в итоге имеем

При относительной простоте (разработка самого инструмента заняла 4 дня, а на внедрение ушел один спринт) шина данных приносит много пользы в моменте и в перспективе.

  • Низкий порог входа для разработчиков. Разработчику не нужно уметь в Kafka или другой брокер сообщений. Он даже может и не знать, что там внутри. У нас есть шина событий, в которую понятно, как отправлять события автоматически, без регистраций и СМС. 

  • Универсальность. В этой схеме Kafka можно заменить на что угодно.

  • Защита от человеческой ошибки. Благодаря типизации схемы.

  • Упрощение модульных тестов. Типизированная спецификация позволяет нам воспроизводить различные сценарии (в том числе и с ошибками) с помощью моков. При этом мы не будем вызывать реальный код, а данные в шину не записываются.  

  • Простой API. Разработчику не нужно долго разбираться, как этим всем пользоваться: описал, сгенерировал и вперед.

На этом пока всё. Подключайтесь к конструктивной дискуссии в комментариях. May the source be with you

© Habrahabr.ru