Управление схемами в Kafka с использованием Schema Registry
Приветствую читателей! Меня зовут Темирлан, и на протяжении последних нескольких лет я активно использовал Apache Kafka в проектах в сферах финансовых технологий FinTech. Этот опыт позволил мне не только глубоко изучить возможности и преимущества Kafka, но и столкнуться с уникальными вызовами, связанными с обработкой и управлением большими потоками данных в критически важных системах. В этой статье я хочу поделиться своими знаниями и опытом работы с Schema Registry, ключевым компонентом для управления схемами данных в Apache Kafka.
Apache Kafka является мощным инструментом для обработки и передачи потоковых данных в реальном времени, который находит широкое применение в различных индустриях для обработки огромных объемов данных с низкой задержкой. В центре этой платформы лежит способность эффективно распределять данные между множеством производителей (producers) и потребителей (consumers), при этом поддерживая высокую пропускную способность и масштабируемость. Однако, с увеличением количества и разнообразия данных, возникает необходимость в управлении структурами этих данных, что обеспечивает Schema Registry. Этот компонент является критически важным для поддержания согласованности данных в Kafka, поскольку он управляет схемами сообщений и обеспечивает совместимость между различными версиями схем, что позволяет системам бесперебойно обмениваться данными даже при изменении структуры сообщений.
Schema Registry — это централизованное хранилище для схем сообщений, используемых в Apache Kafka, обеспечивающее управление и контроль версий схем данных. Основная задача Schema Registry — обеспечить, чтобы все сообщения, отправляемые в Kafka, соответствовали определенной схеме, что предотвращает возможные ошибки данных, вызванные несоответствием или изменением структуры данных. Schema Registry поддерживает проверку схем и управление версиями, позволяя разработчикам безопасно модифицировать схему данных без риска нарушения обработки или хранения данных.
Интеграция Schema Registry с Kafka осуществляется через использование специальных сериализаторов и десериализаторов. Например, при использовании Apache Kafka для отправки сообщений, производители данных могут автоматически регистрировать схемы с помощью следующего кода на Java:
import org.apache.kafka.common.serialization.Serializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
var props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
var producer = new KafkaProducer(props);
Этот код настраивает производителя Kafka для использования KafkaAvroSerializer
, который взаимодействует с Schema Registry для регистрации схем и обеспечивает, что все участники системы работают с согласованными и верифицированными схемами, что критически важно для эффективности и надежности потоков данных.
Управление схемами играет ключевую роль в поддержании качества и согласованности данных в распределенных системах, таких как Apache Kafka. Без эффективного управления схемами, системы могут столкнуться с серьезными проблемами, включая schema drift, когда изменения в структуре данных не согласованы между производителями и потребителями данных. Это может привести к ошибкам в обработке данных, потере данных или даже полному сбою системы при обработке несоответствующих сообщений.
Централизованное управление схемами, предоставляемое решениями вроде Schema Registry, предлагает множество преимуществ:
Согласованность и совместимость схем: управление версиями схем и обеспечение обратной и вперёд совместимости позволяет системам эффективно адаптироваться к изменениям без риска нарушения работы приложений.
Упрощение разработки и обслуживания: разработчики могут вносить изменения в схемы без страха нарушить процессы обработки данных, что сокращает время на тестирование и внедрение новых функций.
Улучшенная надежность данных: гарантия, что все сообщения соответствуют зарегистрированным схемам, снижает вероятность ошибок данных и улучшает общую надежность системы.
Примером практического применения централизованного управления схемами может служить сценарий, когда производитель данных вносит изменение в структуру отправляемых сообщений. Благодаря Schema Registry, изменение схемы регистрируется и верифицируется заранее, а потребители данных получают уведомление о новой версии схемы. Это позволяет потребителям данных подготовиться к обработке новой структуры данных без перебоев в их сервисах. Пример кода, иллюстрирующий это:
Consumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-awesome-topic"));
try {
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
GenericRecord value = record.value();
// Обработка полученных данных с учетом актуальной схемы
}
}
} finally {
consumer.close();
}
Этот код демонстрирует, как потребитель Kafka использует KafkaAvroDeserializer
для десериализации сообщений в соответствии с актуальной схемой, предоставленной через Schema Registry. Таким образом, централизованное управление схемами не только упрощает обслуживание и разработку, но и повышает общую устойчивость и надежность системы обработки данных.
Теперь о каждом пункте подробнее!
Совместимость схем: Schema Registry поддерживает как обратную, так и вперёд совместимость схем. Это означает, что система может обрабатывать как старые, так и новые форматы данных без сбоев. Пример кода на Java показывает, как можно настроить сериализатор для отправки данных, учитывая обратную совместимость:
var producer = new KafkaProducer(props);
// Предположим, schema - это ваша Avro схема
var avroRecord = new GenericData.Record(schema);
// Заполнение avroRecord данными
producer.send(new ProducerRecord("your-awesome-topic", avroRecord));
producer.close();
Интеграция с API и сериализаторы/десериализаторы Kafka: Schema Registry тесно интегрирован с Kafka через API, которое позволяет сериализаторам и десериализаторам автоматически извлекать схемы для кодирования и декодирования сообщений. Ниже пример кода для десериализатора:
var consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("topic"));
try {
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.println(record.value());
}
}
} finally {
consumer.close();
}
Версионирование схем: в основе управления схемами в Schema Registry лежит принцип версионирования. Каждый раз, когда производится изменение в схеме, оно регистрируется как новая версия в Registry. Это позволяет не только сохранять исторические версии схем, но и обеспечивать управление совместимостью между ними. Schema Registry поддерживает четыре режима совместимости: NONE
, BACKWARD
, FORWARD
, и FULL
. Например, режим гарантирует, что новые данные могут быть прочитаны старыми схемами, что критически важно для обеспечения стабильности систем при разработке и масштабировании.
var schemaString = "{\\"namespace\\": \\"example.avro\\", \\"type\\": \\"record\\", " +
"\\"name\\": \\"User\\", \\"fields\\": [{\\"name\\": \\"name\\", \\"type\\": \\"string\\"}]}";
var parser = new Schema.Parser();
var avroSchema = parser.parse(schemaString);
var client = new CachedSchemaRegistryClient("", 10);
int registeredSchemaId = client.register("user-value", new AvroSchema(avroSchema));
// Установка режима совместимости
client.updateCompatibility("user-value", "BACKWARD");
System.out.println("Schema registered with ID: " + registeredSchemaId);
В этом примере мы создаем новую схему для записей пользователя, регистрируем ее в Schema Registry и устанавливаем режим совместимости на BACKWARD
. Это обеспечивает, что все последующие версии схемы будут совместимы с предыдущими версиями, что позволяет старым приложениям корректно обрабатывать данные, произведенные новыми версиями приложений.
В данной статье мы рассмотрели ключевую роль Schema Registry в экосистеме Apache Kafka, начиная с обеспечения совместимости и версионирования схем до интеграции с API и сериализаторами/десериализаторами. Schema Registry служит важным инструментом для управления схемами сообщений, что необходимо для поддержания целостности и надежности данных в системах реального времени. Он предоставляет механизмы для обеспечения обратной и вперёд совместимости схем, что позволяет системам легко адаптироваться к изменениям без риска сбоев. Централизованное управление схемами через Schema Registry упрощает разработку и обслуживание распределенных систем, повышая их устойчивость и эффективность.