[Перевод] Синхронный Запрос-Ответ с использованием Apache Kafka

Архитектуры, управляемые событиями (Event Driven Architecture), в целом, и Apache Kafka, в частности, привлекли в последнее время большое внимание. Для реализации всех преимуществ архитектуры, управляемой событиями, механизм делегирования событий должен быть по своей сути асинхронным. Тем не менее, могут существовать некоторые особые сценарии/потоки использования, в которых требуется семантика Синхронного Запроса-Ответа. В этом выпуске показано, как реализовать «Запрос-Ответ» с помощью Apache Kafka.

Перевел @middle_java
Дата оригинальной статьи: 26 October 2018

Apache Kafka по своей природе асинхронна. Следовательно, семантика «Запрос-Ответ» для Apache Kafka не является естественной. Тем не менее эта задача не нова. Паттерн Интеграции Корпоративных приложений (Enterprise Integration Pattern) Request-Reply обеспечивает проверенный механизм синхронного обмена сообщениями по асинхронным каналам:

wwcy0hvrj6iie7amwxpk1twfbui.gif

Паттерн Return Address (Адрес Возврата) дополняет паттерн Request-Reply механизмом указания запрашивающей стороной адреса, на который должен быть отправлен ответ:

rwndqqxh42kt82xadcbvgjb76wa.gif

Недавно в Spring Kafka 2.1.3 была добавлена поддержка из коробки паттерна «Request Reply», а в версии 2.2 были отполированы некоторые из его шероховатостей. Рассмотрим, как работает эта поддержка:

На стороне клиента: ReplyingKafkaTemplate


Хорошо известная абстракция Template (Шаблон) формирует в Spring основу для клиентской части механизма Request-Reply.

 @Bean
 public ReplyingKafkaTemplate < String, Request, Reply > replyKafkaTemplate(
     ProducerFactory < String, Request > pf,
     KafkaMessageListenerContainer < String, Reply > lc) {
     return new ReplyingKafkaTemplate < > (pf, lc);
 }


Здесь все довольно прямолинейно: мы настраиваем ReplyingKafkaTemplate, который отправляет сообщения-запросы со String ключами и получает сообщения-ответы со String ключами. Вместе с тем ReplyingKafkaTemplate должен быть основан на ProducerFactory Запроса, ConsumerFactory Ответа и MessageListenerContainer с соответствующими конфигурациями консюмера и продюсера. Следовательно, необходимая конфигурация довольно развесиста:

 @Value("${kafka.topic.car.reply}")
 private String replyTopic;

 @Bean
 public Map < String, Object > consumerConfigs() {
     Map < String, Object > props = new HashMap < > ();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

     return props;
 }

 @Bean
 public Map < String, Object > producerConfigs() {
     Map < String, Object > props = new HashMap < > ();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
     return props;
 }

 @Bean
 public ProducerFactory < String, Request > requestProducerFactory() {
     return new DefaultKafkaProducerFactory < > (producerConfigs());
 }

 @Bean
 public ConsumerFactory < String, Reply > replyConsumerFactory() {
     return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),
         new JsonSerializer < Reply > ());
 }

 @Bean
 public KafkaMessageListenerContainer < String, Reply > replyListenerContainer() {
     ContainerProperties containerProperties = new ContainerProperties(replyTopic);
     return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties);
 }


В этом случае использование replyKafkaTemplate для отправки синхронного запроса и получения ответа выглядит следующим образом:

@Value("${kafka.topic.car.request}")
private String requestTopic;

@Value("${kafka.topic.car.reply}")
private String replyTopic;

@Autowired
private ReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate;

...
RequestReply request = RequestReply.request(...);
//создаем producer record
ProducerRecord < String, Request > record = new ProducerRecord < String, Request > (requestTopic, request);
// устанавливаем топик для ответа в заголовке
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
// отправляем запрос в топик Kafka и асинхронно получаем ответ в указанный топик для ответа
RequestReplyFuture < String, Request, Reply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
sendAndReceive.addCallback(new ListenableFutureCallback < ConsumerRecord < String, Reply >> () {
    @Override
    public void onSuccess(ConsumerRecord < String, Reply > result) {
        // получаем значение consumer record
        Reply reply = result.value();
        System.out.println("Reply: " + reply.toString());
    }
});


Здесь также много бойлерплейта и низкоуровневого API, да еще этот устаревший API ListenableFuture вместо современной CompletableFuture.

requestReplyKafkaTemplate заботится о генерации и установке заголовка KafkaHeaders.CORRELATION_ID, но мы должны явно задать заголовок KafkaHeaders.REPLY_TOPIC для запроса. Обратите также внимание, что этот же топик для ответа был излишне заинжектен выше в replyListenerContainer. Гадость какая-то. Не совсем то, чего я ожидал от абстракции Spring.

Серверная сторона: @SendTo


На стороне сервера обычный KafkaListener, прослушивающий топик для запроса, дополнительно декорирован аннотацией @SendTo, чтобы предоставить сообщение-ответ. Объект, возвращаемый методом слушателя, автоматически оборачивается (wrapped) в ответное сообщение, добавляется CORRELATION_ID и ответ публикуется в топике, указанном в заголовке REPLY_TOPIC.

 @Bean
 public Map < String, Object > consumerConfigs() {
     Map < String, Object > props = new HashMap < > ();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

     return props;
 }

 @Bean
 public Map < String, Object > producerConfigs() {
     Map < String, Object > props = new HashMap < > ();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

     return props;
 }

 @Bean
 public ConsumerFactory < String, Request > requestConsumerFactory() {
     return new DefaultKafkaConsumerFactory < > (consumerConfigs(), new StringDeserializer(),
         new JsonSerializer < Request > ());
 }

 @Bean
 public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, Request >> requestListenerContainerFactory() {
     ConcurrentKafkaListenerContainerFactory < String, Request > factory =
         new ConcurrentKafkaListenerContainerFactory < > ();
     factory.setConsumerFactory(requestConsumerFactory());
     factory.setReplyTemplate(replyTemplate());
     return factory;
 }

 @Bean
 public ProducerFactory < String, Reply > replyProducerFactory() {
     return new DefaultKafkaProducerFactory < > (producerConfigs());
 }

 @Bean
 public KafkaTemplate < String, Reply > replyTemplate() {
     return new KafkaTemplate < > (replyProducerFactory());
 }


Здесь также требуется некоторая конфигурация, но конфигурация слушателя проще:

 @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory")
 @SendTo()
 public Reply receive(Request request) {
     Reply reply = ...;
     return reply;
 }


Но как насчет нескольких экземпляров консюмера?


Все вроде работает, пока мы не используем несколько экземпляров консюмера. Если у нас есть несколько экземпляров клиента, мы должны убедиться, что ответ отправляется в корректный экземпляр клиента. Документация Spring Kafka предполагает, что каждый консюмер может использовать уникальный топик или, что с запросом отправляется дополнительное значение заголовка KafkaHeaders.RESPONSE_PARTITION — четырехбайтовое поле, содержащее BIG-ENDIAN-представление целочисленного номера раздела. Использование раздельных топиков для разных клиентов явно не очень гибко, поэтому мы выбираем явную настройку REPLY_PARTITION. Тогда клиент должен знать, на какую партицию он назначен. В документации предлагается использовать явную конфигурацию для выбора конкретной партиции. Давайте добавим ее к нашему примеру:

 @Value("${kafka.topic.car.reply.partition}")
 private int replyPartition;

 ...

 @Bean
 public KafkaMessageListenerContainer < String, RequestReply > replyListenerContainer() {
     ContainerProperties containerProperties = new ContainerProperties(replyTopic);
     TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition);
     return new KafkaMessageListenerContainer < > (replyConsumerFactory(), containerProperties, initialOffset);
 }
 private static byte[] intToBytesBigEndian(final int data) {
         return new byte[] {
             (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff),
             (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff),
         };
     }

     ...
     record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
 record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition)));
 RequestReplyFuture < String, RequestReply, RequestReply > sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
 ...


Не очень красиво, но это работает. Требуемая конфигурация обширна и API выглядит низкоуровнево. Необходимость явной настройки партиций усложняет процесс динамического масштабирования количества клиентов. Очевидно, можно сделать лучше.

Инкапсулирование обработки топика для ответа и партиции


Давайте начнем с инкапсуляции паттерна Return Address, передавая вместе топик для ответа и партицию. Топик для ответа должен быть заинжектен в RequestReplyTemplate и, следовательно, вообще не должен присутствовать в API. Когда речь идет о партициях для ответа, сделаем наоборот: извлечем партицию (партиции), назначенную слушателю топика для ответа, и передадим эту партицию автоматически. Это избавляет клиента от необходимости заботиться об этих заголовках.
При этом, давайте также сделаем API таким, чтобы он напоминал стандартный KafkaTemplate (перегрузим метод sendAndReceive () упрощенными параметрами и добавим соответствующие перегруженные методы, использующие настроенный по умолчанию топик):

public class PartitionAwareReplyingKafkaTemplate < K, V, R > extends ReplyingKafkaTemplate < K, V, R > {

    public PartitionAwareReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory,
        GenericMessageListenerContainer < K, R > replyContainer) {
        super(producerFactory, replyContainer);
    }

    private TopicPartition getFirstAssignedReplyTopicPartition() {
        if (getAssignedReplyTopicPartitions() != null &&
            getAssignedReplyTopicPartitions().iterator().hasNext()) {
            TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Using partition " + replyPartition.partition());
            }
            return replyPartition;
        } else {
            throw new KafkaException("Illegal state: No reply partition is assigned to this instance");
        }
    }

    private static byte[] intToBytesBigEndian(final int data) {
        return new byte[] {
            (byte)((data >> 24) & 0xff), (byte)((data >> 16) & 0xff),
            (byte)((data >> 8) & 0xff), (byte)((data >> 0) & 0xff),
        };
    }

    public RequestReplyFuture < K,
    V,
    R > sendAndReceiveDefault(@Nullable V data) {
        return sendAndReceive(getDefaultTopic(), data);
    }

    public RequestReplyFuture < K,
    V,
    R > sendAndReceiveDefault(K key, @Nullable V data) {
        return sendAndReceive(getDefaultTopic(), key, data);
    }

    ...

    public RequestReplyFuture < K,
    V,
    R > sendAndReceive(String topic, @Nullable V data) {
        ProducerRecord < K, V > record = new ProducerRecord < > (topic, data);
        return doSendAndReceive(record);
    }

    public RequestReplyFuture < K,
    V,
    R > sendAndReceive(String topic, K key, @Nullable V data) {
        ProducerRecord < K, V > record = new ProducerRecord < > (topic, key, data);
        return doSendAndReceive(record);
    }

    ...

    @Override
    public RequestReplyFuture < K,
    V,
    R > sendAndReceive(ProducerRecord < K, V > record) {
        return doSendAndReceive(record);
    }

    protected RequestReplyFuture < K,
    V,
    R > doSendAndReceive(ProducerRecord < K, V > record) {
        TopicPartition replyPartition = getFirstAssignedReplyTopicPartition();
        record.headers()
            .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes()))
            .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,
                intToBytesBigEndian(replyPartition.partition())));
        return super.sendAndReceive(record);
    }
}


Следующий шаг: Адаптируем ListenableFuture к более современной CompletableFuture.

public class CompletableFutureReplyingKafkaTemplate < K, V, R > extends PartitionAwareReplyingKafkaTemplate < K, V, R > {

    public CompletableFutureReplyingKafkaTemplate(ProducerFactory < K, V > producerFactory,
        GenericMessageListenerContainer < K, R > replyContainer) {
        super(producerFactory, replyContainer);
    }

    public CompletableFuture < R > requestReplyDefault(V value) {
        return adapt(sendAndReceiveDefault(value));
    }

    public CompletableFuture < R > requestReplyDefault(K key, V value) {
        return adapt(sendAndReceiveDefault(key, value));
    }

    ...

    public CompletableFuture < R > requestReply(String topic, V value) {
        return adapt(sendAndReceive(topic, value));
    }

    public CompletableFuture < R > requestReply(String topic, K key, V value) {
        return adapt(sendAndReceive(topic, key, value));
    }

    ...

    private CompletableFuture < R > adapt(RequestReplyFuture < K, V, R > requestReplyFuture) {
        CompletableFuture < R > completableResult = new CompletableFuture < R > () {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean result = requestReplyFuture.cancel(mayInterruptIfRunning);
                super.cancel(mayInterruptIfRunning);
                return result;
            }
        };
        // добавим коллбек к результату отправки запроса
        requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback < SendResult < K, V >> () {
            @Override
            public void onSuccess(SendResult < K, V > sendResult) {
                // NOOP
            }
            @Override
            public void onFailure(Throwable t) {
                completableResult.completeExceptionally(t);
            }
        });
        // добавим коллбек к ответу
        requestReplyFuture.addCallback(new ListenableFutureCallback < ConsumerRecord < K, R >> () {
            @Override
            public void onSuccess(ConsumerRecord < K, R > result) {
                completableResult.complete(result.value());
            }
            @Override
            public void onFailure(Throwable t) {
                completableResult.completeExceptionally(t);
            }
        });
        return completableResult;
    }
}


Упакуем это в утилитную библиотеку и теперь у нас есть API, который намного больше соответствует основной философии проектирования Spring «Соглашение над Конфигурацией» («Convention over Configuration»). Вот итоговый код клиента:

 @Autowired
 private CompletableFutureReplyingKafkaTemplate < String, Request, Reply > requestReplyKafkaTemplate;

 ...

 requestReplyKafkaTemplate.requestReply(request).thenAccept(reply - >
     System.out.println("Reply: " + reply.toString());
 );


Подводим итоги


Подводя итог, Spring для Kafka 2.2 обеспечивает полностью функциональную реализацию паттерна Request-Reply в Apache Kafka, но API по-прежнему имеет некоторые шероховатости. В этом выпуске мы увидели, что некоторые дополнительные абстракции и адаптации API могут дать более логичный высокоуровневый API.

Предупреждение 1:
Одним из главных преимуществ архитектуры, управляемой событиями, является разделение (decoupling) продюсеров и консюмеров событий, что позволяет создавать гораздо более гибкие и эволюционирующие системы. Использование синхронной семантики «Запрос-Ответ» является полной противоположностью, когда запрашивающая и отвечающая стороны сильно связаны между собой. Следовательно, ее следует использовать только в случае необходимости.

Предупреждение 2:
Если требуется синхронный Запрос-Ответ, то протокол на основе HTTP намного проще и эффективнее, чем использование асинхронного канала типа Apache Kafka.
Тем не менее, могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл. Разумно выбирайте лучший инструмент для работы.

Полностью рабочий пример можно найти на сайте github.com/callistaenterprise/blog-synchronous-kafka.

Комментарии


Federico • 7 месяцев назад
А когда у нас есть гибридные потребности, допустим, в 50% кейсов нам нужен Запрос-Ответ и в 50% нам нужно событийное управление? Как нам это сделать? Конфигурация, необходимая Spring Kafka, выглядит довольно ужасно.

Jehanzeb Qayyum • 6 месяцев назад
Теперь Spring имеет дефолтную поддержку с использованием партиций в одном общем топике для ответа.

Начиная с версии 2.2, шаблон пытается определить топик для ответа или партицию из сконфигурированного контейнера ответа (reply container).

https://docs.spring.io/spring-kafka/reference/html/#replying-template

nir rozenberg • 8 месяцев назад
Привет,
В последнем абзаце вы написали, что могут быть сценарии, когда синхронный Запрос-Ответ через Kafka имеет смысл по сравнению с HTTP.
Можно привести примеры таких сценариев?
Спасибо,
Nir

Janne Keskitalo nir rozenberg • 8 месяцев назад
Мы собираемся разделить систему обработки транзакций большого объема на несколько микросервисов и у меня есть идея использовать обмен сообщениями Kafka «Запрос-Ответ» для достижения похожих способов обработки (processing affinity). В основном Kafka используется для маршрутизации всех вызовов одного клиента в один и тот же процесс обработчика транзакций, который затем последовательно их выполняет по одному. Такой вид обработки гарантирует линеаризируемость (https://stackoverflow.com/a/19515375/7430325), причинно-следственную связность, а также позволяет эффективное кэширование. По сути, усилия по координации были бы перенесены из базы данных в Kafka и мы могли бы запустить базу данных в строгом режиме изоляции Serializable.
Мне еще предстоит углубиться в детали нашей семантики транзакций, чтобы увидеть, где здесь не дотягивает, так что это пока просто идея.

Перевел @middle_java

© Habrahabr.ru