Что нового в Apache Kafka 4.0?

602cb7717f1049c70628bfce16273c0d.jpg

Не за горами выход релиза 4.0 Apache Kafka. Согласно графику релиза, 15 января состоится code freeze, а через пару недель или позже, после стабилизации, версия 4.0 увидит свет. Самое время присмотреться, что же в неё вошло.

Развитие Apache Kafka происходит в рамках процесса процесса работы с KIP’ами, Kafka Improvement Proposals. В релиз 4.0 их вошло 37 штук. Из них наиболее интересной мне показалась доработка, связанная с введением новой концепции, которую разработчики смело сравнили с очередями. Давайте посмотрим, что у них получилось. Остальные доработки, наверное, важны не менее.

Очереди!

«Очередям» в релизе посвящено 2 KIP’а:

KIP-932

Queues for Kafka

Очереди в kafka

Разработчики наконец признали важность полноценной реализации в Kafka сценария коллективной и конкурентной обработки потребителями событий, то есть без необходимости эксклюзивного назначения доступа к партиции топика. Для реализации данной функциональности, в дополнение к хорошо известным группам потребителей добавляются разделяемые группы (share group) — число потребителей в таких группах может превышать число партиций. Защита доступа к записи обеспечивается механизмом блокировки (по умолчанию, но 30с, может быть переопределено свойствомgroup.share.record.lock.duration.ms).

Для доступа к топикам посредством разделяемых групп вводится новый интерфейс KafkaShareConsumer по аналогии со старым KafkaConsumer.

KIP-1043

Administration of groups

Администрирование групп

В продолжение предыдущего KIP’а, в связи с расширением номенклатуры типов групп, которое продолжится и далее (см. KIP-1071), был добавлен новый способ получения информации о группах посредством утилиты kafka-groups.sh .

Как обычно, новая функциональность заявлена как экспериментальная, изменяющаяся и не рекомендуется к применению в продуктиве. Использование нового класса, в целом, аналогично старому.

Поддерживаются различные сценарии подтверждения, неявный:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaShareConsumer consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays. asList("foo"));
while (true) {
  ConsumerRecords records = consumer.poll(Duration. ofMillis(100));
  for (ConsumerRecord record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    doProcessing(record);
  }
}  

Альтернативно, можно использовать consumer.commitSync() или consumer.commitAsync() для передачи подтверждений, но это менее эффективно, поскольку приводит к дополнительным вызовам. Возможно также подтверждение на уровне отдельных записей:

Properties props = new Properties();
props.setProperty("bootstrap. servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaShareConsumer consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays. asList("foo"));
while (true) {
  ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord record : records) {
    try {
      doProcessing(record);
      consumer.acknowledge(record, AcknowledgeType.ACCEPT);
    } catch (Exception e) {
      consumer.acknowledge(record, AcknowledgeType.REJECT);
      break;
    }
  }
  consumer.commitSync();
}

Транзакционное поведение групп определяется свойством group.share.isolation.level. Оно применяется ко всей группе, а не к отдельному потребителю. Свойство может принимать значение read_committed или read_uncommitted (по-умолчанию).

В рамках данного KIP также добавлена новая утилита командной строки kafka-console-share-consumer.sh. Помимо вышеперечисленных, добавлено множество новых свойств, изменено несколько старых, появились новые API. К сожалению, новой -perf.sh утилиты пока нет.

В общем, это новая, обширная и интересная тема, которая заслуживает подробного разбора.

Что ещё интересного?

Вот ещё несколько улучшений релиза:

KIP-1112

allow custom processor wrapping

Возможность пользовательских расширений потоковых процессоров

Добавлена возможность, наподобие аспектов, добавлять обёртку вокруг процессоров в Streams. Например, для добавления логирования или отладки. В поставке готовых содержательных врапперов нет, но идея интересная. Для включения опции надо задать конфигурацию processor.wrapper.class .

KIP-1104

Allow Foreign Key Extraction from Both Key and Value in KTable Joins

Возможность join’ов на основе ключа и значения

Утверждается, что таким образом в некоторых потоковых сценариях можно предотвратить оверхед.

KIP-1106

Add duration based offset reset option for consumer clients

Добавление возможность указания сдвига на основе смещения по времени

В релизе 3.6 была введена возможность долговременного (многоуровневого) хранения данных (KIP-405, см. подробно тут). С учётом, что теперь данные в кластере могут храниться годы, прежних возможностей по указанию смещения (earliest и latest) уже не достаточно. В этом релизе появилась возможность указания сдвига по времени через опцию by_duration: . duration может принимать значение в обычном формате ISO8601, например, 30d. Аналогичные опции доступны для shared group и streams.

KIP-1105

Make remote log manager thread-pool configs dynamic

Динамическая настройка RemoteLogManager

Если вы уже пользуетесь долговременным хранением, то вас заинтересует возможность налету менять настройки пула потоков RemoteLogManager, системного механизма, находящегося под капотом данной фичи.

KIP-1089

Allow disabling heartbeats replication in MirrorSourceConnector

Возможность отключения репликации heartbeat-топиков в MirrorSourceConnector

Данное улучшение закрывает потенциально возможную проблему, когда при настройке репликации MirrorMaker 2.0 вы создаёте несколько коннекторов (например, для топиков различными настройками сжатия). Дело в том, что ранее в таком случае каждый из коннекторов безусловно добавлял себе в обработку служебные heartbeat-топики. Теперь их можно явно исключить из репликации.

KIP-1074

Allow the replication of user internal topics

Возможность репликации internal-топиков

До релиза 4.0 топики, название которых заканчивалось на «internal» исключались из репликации. Как оказалось, так могут называться и пользовательские топики с данными, поэтому условие исключение было дополнено, и теперь нереплицируемые топики должны дополнительно начинаться на «mm2».

KIP-724 удалил совместимость с клиентами и брокерами версии 2.1 и ниже. Обновляйтесь!

Также в релизе в модулях broker и tools удалена поддержка Java 11.

Важными с точки зрения совместимости кода являются обновления библиотек:

© Habrahabr.ru