Управление схемами в Kafka с использованием Schema Registry

f1e7a19a33536085096b790e12f15939

Приветствую читателей! Меня зовут Темирлан, и на протяжении последних нескольких лет я активно использовал Apache Kafka в проектах в сферах финансовых технологий FinTech. Этот опыт позволил мне не только глубоко изучить возможности и преимущества Kafka, но и столкнуться с уникальными вызовами, связанными с обработкой и управлением большими потоками данных в критически важных системах. В этой статье я хочу поделиться своими знаниями и опытом работы с Schema Registry, ключевым компонентом для управления схемами данных в Apache Kafka.

Apache Kafka является мощным инструментом для обработки и передачи потоковых данных в реальном времени, который находит широкое применение в различных индустриях для обработки огромных объемов данных с низкой задержкой. В центре этой платформы лежит способность эффективно распределять данные между множеством производителей (producers) и потребителей (consumers), при этом поддерживая высокую пропускную способность и масштабируемость. Однако, с увеличением количества и разнообразия данных, возникает необходимость в управлении структурами этих данных, что обеспечивает Schema Registry. Этот компонент является критически важным для поддержания согласованности данных в Kafka, поскольку он управляет схемами сообщений и обеспечивает совместимость между различными версиями схем, что позволяет системам бесперебойно обмениваться данными даже при изменении структуры сообщений.

Schema Registry — это централизованное хранилище для схем сообщений, используемых в Apache Kafka, обеспечивающее управление и контроль версий схем данных. Основная задача Schema Registry — обеспечить, чтобы все сообщения, отправляемые в Kafka, соответствовали определенной схеме, что предотвращает возможные ошибки данных, вызванные несоответствием или изменением структуры данных. Schema Registry поддерживает проверку схем и управление версиями, позволяя разработчикам безопасно модифицировать схему данных без риска нарушения обработки или хранения данных.

Интеграция Schema Registry с Kafka осуществляется через использование специальных сериализаторов и десериализаторов. Например, при использовании Apache Kafka для отправки сообщений, производители данных могут автоматически регистрировать схемы с помощью следующего кода на Java:

import org.apache.kafka.common.serialization.Serializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

var props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

var producer = new KafkaProducer(props);

Этот код настраивает производителя Kafka для использования KafkaAvroSerializer, который взаимодействует с Schema Registry для регистрации схем и обеспечивает, что все участники системы работают с согласованными и верифицированными схемами, что критически важно для эффективности и надежности потоков данных.

Управление схемами играет ключевую роль в поддержании качества и согласованности данных в распределенных системах, таких как Apache Kafka. Без эффективного управления схемами, системы могут столкнуться с серьезными проблемами, включая schema drift, когда изменения в структуре данных не согласованы между производителями и потребителями данных. Это может привести к ошибкам в обработке данных, потере данных или даже полному сбою системы при обработке несоответствующих сообщений.

Централизованное управление схемами, предоставляемое решениями вроде Schema Registry, предлагает множество преимуществ:

  1. Согласованность и совместимость схем: управление версиями схем и обеспечение обратной и вперёд совместимости позволяет системам эффективно адаптироваться к изменениям без риска нарушения работы приложений.

  2. Упрощение разработки и обслуживания: разработчики могут вносить изменения в схемы без страха нарушить процессы обработки данных, что сокращает время на тестирование и внедрение новых функций.

  3. Улучшенная надежность данных: гарантия, что все сообщения соответствуют зарегистрированным схемам, снижает вероятность ошибок данных и улучшает общую надежность системы.

Примером практического применения централизованного управления схемами может служить сценарий, когда производитель данных вносит изменение в структуру отправляемых сообщений. Благодаря Schema Registry, изменение схемы регистрируется и верифицируется заранее, а потребители данных получают уведомление о новой версии схемы. Это позволяет потребителям данных подготовиться к обработке новой структуры данных без перебоев в их сервисах. Пример кода, иллюстрирующий это:

Consumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-awesome-topic"));
try {
    while (true) {
        ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord record : records) {
            GenericRecord value = record.value();
            // Обработка полученных данных с учетом актуальной схемы
        }
    }
} finally {
    consumer.close();
}

Этот код демонстрирует, как потребитель Kafka использует KafkaAvroDeserializer для десериализации сообщений в соответствии с актуальной схемой, предоставленной через Schema Registry. Таким образом, централизованное управление схемами не только упрощает обслуживание и разработку, но и повышает общую устойчивость и надежность системы обработки данных.

Теперь о каждом пункте подробнее!

Совместимость схем: Schema Registry поддерживает как обратную, так и вперёд совместимость схем. Это означает, что система может обрабатывать как старые, так и новые форматы данных без сбоев. Пример кода на Java показывает, как можно настроить сериализатор для отправки данных, учитывая обратную совместимость:

var producer = new KafkaProducer(props);
// Предположим, schema - это ваша Avro схема
var avroRecord = new GenericData.Record(schema);
// Заполнение avroRecord данными
producer.send(new ProducerRecord("your-awesome-topic", avroRecord));
producer.close();

Интеграция с API и сериализаторы/десериализаторы Kafka: Schema Registry тесно интегрирован с Kafka через API, которое позволяет сериализаторам и десериализаторам автоматически извлекать схемы для кодирования и декодирования сообщений. Ниже пример кода для десериализатора:

var consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("topic"));
try {
    while (true) {
        ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord record : records) {
            System.out.println(record.value());
        }
    }
} finally {
    consumer.close();
}

Версионирование схем: в основе управления схемами в Schema Registry лежит принцип версионирования. Каждый раз, когда производится изменение в схеме, оно регистрируется как новая версия в Registry. Это позволяет не только сохранять исторические версии схем, но и обеспечивать управление совместимостью между ними. Schema Registry поддерживает четыре режима совместимости: NONE, BACKWARD, FORWARD, и FULL. Например, режим гарантирует, что новые данные могут быть прочитаны старыми схемами, что критически важно для обеспечения стабильности систем при разработке и масштабировании.


var schemaString = "{\\"namespace\\": \\"example.avro\\", \\"type\\": \\"record\\", " +
                       "\\"name\\": \\"User\\", \\"fields\\": [{\\"name\\": \\"name\\", \\"type\\": \\"string\\"}]}";
var parser = new Schema.Parser();
var avroSchema = parser.parse(schemaString);
var client = new CachedSchemaRegistryClient("", 10);
int registeredSchemaId = client.register("user-value", new AvroSchema(avroSchema));
// Установка режима совместимости
client.updateCompatibility("user-value", "BACKWARD");
System.out.println("Schema registered with ID: " + registeredSchemaId);

В этом примере мы создаем новую схему для записей пользователя, регистрируем ее в Schema Registry и устанавливаем режим совместимости на BACKWARD . Это обеспечивает, что все последующие версии схемы будут совместимы с предыдущими версиями, что позволяет старым приложениям корректно обрабатывать данные, произведенные новыми версиями приложений.

В данной статье мы рассмотрели ключевую роль Schema Registry в экосистеме Apache Kafka, начиная с обеспечения совместимости и версионирования схем до интеграции с API и сериализаторами/десериализаторами. Schema Registry служит важным инструментом для управления схемами сообщений, что необходимо для поддержания целостности и надежности данных в системах реального времени. Он предоставляет механизмы для обеспечения обратной и вперёд совместимости схем, что позволяет системам легко адаптироваться к изменениям без риска сбоев. Централизованное управление схемами через Schema Registry упрощает разработку и обслуживание распределенных систем, повышая их устойчивость и эффективность.

© Habrahabr.ru