Типы совместимости в Schema Registry для Apache Kafka

37384f868fde2da75c8c6b6cd7d5121f

В прошлой статье я писал о том, что такое Schema Registry и как используется в Apache Kafka. Сегодня я хочу углубиться в тему и описать поведение системы при различных типах совместимости. Правильное понимание и применение этих типов совместимости поможет обеспечить стабильность и гибкость системы при изменениях структуры данных.

Для удобства, ниже представлена таблица, которая обобщает информацию о различных типах совместимости схем в Confluent Schema Registry:

Тип совместимости

Разрешенные изменения

Проверка с какими схемами

Обновлять в первую очередь

BACKWARD

— Удаление полей
— Добавление опциональных полей

Последняя версия

Consumer

BACKWARD_TRANSITIVE

— Удаление полей
— Добавление опциональных полей

Все предыдущие версии

Consumer

FORWARD

— Добавление полей
— Удаление опциональных полей

Последняя версия

Producer

FORWARD_TRANSITIVE

— Добавление полей
— Удаление опциональных полей

Все предыдущие версии

Producer

FULL

— Добавление опциональных полей
— Удаление опциональных полей

Последняя версия

В любом порядке

FULL_TRANSITIVE

— Добавление опциональных полей
— Удаление опциональных полей

Все предыдущие версии

В любом порядке

NONE

— Все изменения разрешены

Проверка совместимости отключена

Зависит от ситуации

Теперь давайте рассмотрим более детальнее типы совместимости, которые помогут поддерживать стабильность ваших данных и сервисов.

Confluent Schema Registry поддерживает несколько типов совместимости, включая:

  • None

  • Backward Compatibility

  • Forward Compatibility

  • Full Compatibility

  • Backward Transitive Compatibility

  • Forward Transitive Compatibility

  • Full Transitive Compatibility

Разберем этих 7 самураев совместимости и посмотрим, как каждый из них работает на практике.

Начнем с самого простого и неинтересного тривиального типа — None. Этот тип совместимости означает, что схема может быть изменена без каких-либо ограничений. При использовании None Schema Registry не проверяет новые схемы на совместимость с предыдущими версиями. Если consumer и producer данных ожидают различные структуры данных, то не трудно догадаться какие последствия могут возникнуть.

Теперь разберем более сложные и интересные типы совместимости, которые помогут вам поддерживать стабильность ваших данных и сервисов.

Backward Compatibility

Обратная совместимость означает, что новые версии схем должны быть совместимы с предыдущими версиями. Это означает, что данные, записанные с использованием старых версий схем, могут быть прочитаны новыми версиями схем без ошибок. Это полезно для ситуаций, когда у вас есть старые данные, которые должны быть доступны для новых приложений.

  • Producer: Новые данные, которые записываются с использованием новой версии схемы, должны быть совместимы с предыдущими версиями. Producer использует схему, зарегистрированную в Schema Registry, и если новая схема несовместима с предыдущими, Producer получит ошибку при попытке отправить данные.

  • Consumer: Потребители могут продолжать читать данные, используя новую версию схемы, даже если данные были записаны с использованием старой версии. Consumer может столкнуться с ситуацией, когда новые поля, добавленные в схему, отсутствуют в старых данных.

// Старая версия схемы
var oldSchema = "{\\"type\\":\\"record\\",\\"name\\":\\"User\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\"}]}";

// Новая версия схемы с добавленным полем "age"
var newSchema = "{\\"type\\":\\"record\\",\\"name\\":\\"User\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\"},{\\"name\\":\\"age\\",\\"type\\":\\"int\\",\\"default\\":0}]}";

// Producer с новой схемой
var producer = new KafkaProducer(producerProps);
var schema = new Schema.Parser().parse(newSchema);
var user1 = new GenericData.Record(schema);
user1.put("name", "Alice");
user1.put("age", 30); // Новое поле "age"
producer.send(new ProducerRecord<>("users", "key1", user1));

// Consumer с новой схемой, читает данные, записанные со старой схемой
var consumer = new KafkaConsumer(consumerProps);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
    var records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord record : records) {
        GenericRecord user = record.value();
        var name = user.get("name").toString();
        
        // Старые данные могут не содержать "age", поэтому проверяем наличие
        var age = user.get("age") != null ? (Integer) user.get("age") : 0; // Устанавливаем значение по умолчанию для старых данных
    }
}

Producer:

  • ➕ Возможность добавлять новые поля без нарушения существующих данных.

  • ➕ Позволяет расширять схему с минимальными изменениями в коде.

  • ➖ Невозможность удалить поля без предоставления значения по умолчанию.

  • ➖ Необходимо тщательное управление версионированием схем, чтобы избежать ошибок совместимости.

Consumer:

  • ➕ Гарантия того, что данные, записанные старыми версиями схемы, будут читаться без ошибок.

  • ➕ Упрощает процесс миграции на новые версии схемы без изменения логики обработки данных.

  • ➖ Старые данные могут не содержать новые поля, что требует дополнительных проверок и обработки.

  • ➖ Ограниченная гибкость при изменении структуры данных, поскольку новые поля должны быть совместимы с предыдущими версиями.

Forward Compatibility

Прямая совместимость означает, что старые версии схем должны быть совместимы с новыми версиями. Это означает, что данные, записанные с использованием новых версий схем, могут быть прочитаны старыми версиями схем без ошибок.

  • Producer: Producer может добавлять новые поля и удалять опциональные поля. Если схема, зарегистрированная в Schema Registry, несовместима с предыдущими, Producer получит ошибку при попытке отправить данные.

  • Consumer: Потребители могут продолжать читать данные, используя старую версию схемы, даже если данные были записаны с использованием новой версии.

// Старая версия схемы
var oldSchema = "{\\"type\\":\\"record\\",\\"name\\":\\"User\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\"}]}";

// Новая версия схемы с добавленным полем "age"
var newSchema = "{\\"type\\":\\"record\\",\\"name\\":\\"User\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\"},{\\"name\\":\\"age\\",\\"type\\":\\"int\\"}]}";

// Producer с новой схемой
var producer = new KafkaProducer(producerProps);
var schema = new Schema.Parser().parse(newSchema);
var user1 = new GenericData.Record(schema);
user1.put("name", "Alice");
user1.put("age", 30); // Новое поле "age"
producer.send(new ProducerRecord<>("users", "key1", user1));

// Consumer со старой схемой, читает данные, записанные с новой схемой
var consumer = new KafkaConsumer(consumerProps);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
    var records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord record : records) {
        GenericRecord user = record.value();
        String name = user.get("name").toString();
        
        // Поскольку consumer использует старую схему, он игнорирует поле "age"
        System.out.println("User: " + name);
    }
}

Producer:

  • ➕ Возможность добавлять новые поля и удалять опциональные поля.

  • ➕ Позволяет расширять схему с минимальными изменениями в коде.

  • ➖ Producer должен гарантировать, что новые данные могут быть прочитаны старыми потребителями.

  • ➖ Необходимо тщательное управление версионированием схем, чтобы избежать ошибок совместимости.

Consumer:

  • ➕ Гарантия того, что старые версии схем могут читать новые данные без ошибок.

  • ➕ Упрощает процесс миграции на новые версии схемы без изменения логики обработки данных.

  • ➖ Старые потребители могут игнорировать новые поля, что может требовать дополнительных проверок и обработки.

  • ➖ Ограниченная гибкость при изменении структуры данных, поскольку новые данные должны быть совместимы со старыми версиями схем.

Full Compatibility

Полная совместимость означает, что новая версия схемы должна быть совместима как с предыдущими версиями (backward compatibility), так и с будущими версиями (forward compatibility). Это обеспечивает максимальную гибкость и стабильность при работе с данными, так как гарантирует, что данные, записанные как старыми, так и новыми версиями схем, могут быть прочитаны без ошибок любой версией схемы. Самый любимый и часто используемый мною вариант!

  • Producer: Новая схема должна позволять запись данных, которые могут быть прочитаны как старыми, так и новыми потребителями. Producer может добавлять опциональные поля и удалять опциональные поля. Если схема, зарегистрированная в Schema Registry, несовместима с предыдущими и будущими версиями, Producer получит ошибку при попытке отправить данные.

  • Consumer: Потребители могут читать данные, записанные как старыми, так и новыми версиями схем. Это требует тщательной проверки, чтобы новые поля были опциональными и имели значения по умолчанию, чтобы избежать ошибок.

var oldSchema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
var newSchema = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"null\", \"int\"], \"default\":null}]}";

// Producer с новой схемой
var producer = new KafkaProducer(producerProps);
var schema = new Schema.Parser().parse(newSchema);
var user1 = new GenericData.Record(schema);
user1.put("name", "Alice");
// Новое поле "age" может быть не указано, так как оно опциональное
producer.send(new ProducerRecord<>("users", "key1", user1));

// Consumer с новой схемой, читает данные, записанные с использованием старой схемы
var consumer = new KafkaConsumer(consumerProps);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
    var records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord record : records) {
        var user = record.value();
        var name = user.get("name").toString();
        
        // Старые данные могут не содержать "age", поэтому проверяем наличие
        var age = user.get("age") != null ? (Integer) user.get("age") : null; // Устанавливаем значение по умолчанию для старых данных
        System.out.println("User: " + name + ", Age: " + (age != null ? age : "N/A"));
    }
}

Producer:

  • ➕ Возможность добавлять опциональные поля и удалять опциональные поля.

  • ➕ Гарантия, что данные будут совместимы как с предыдущими, так и с будущими версиями схем.

  • ➖ Ограничения на изменения, чтобы гарантировать совместимость с предыдущими и будущими версиями.

Consumer:

  • ➕ Гарантия, что данные, записанные старыми и новыми версиями схем, будут читаться без ошибок.

  • ➕ Упрощает процесс миграции на новые версии схемы без изменения логики обработки данных.

  • ➖ Требуется проверка и обработка опциональных полей, чтобы избежать ошибок при чтении данных.

  • ➖ Ограниченная гибкость при изменении структуры данных, поскольку новые данные должны быть совместимы как с предыдущими, так и с будущими версиями схем.

Все типы совместимости выше не учитывают историчность схем, как бы смотрим на шаг назад или вперед поэтому следующие форматы на мой взгляд более серьезные которые требует тщательного анализа прежде чем их тащить к себе в production! Ниже я не буду описывать все транзитивные зависимости, а лучше опишу самую на мой взгляд осторожную и безопасную стратегию, остальные думаю будут очевидны складывая материал далее и что уже есть в статье!

Full Transitive Compatibility

Полная транзитивная совместимость — это наиболее строгий тип совместимости схем. Он требует, чтобы новая версия схемы была совместима со всеми предыдущими и будущими версиями. Это означает, что данные, записанные любой версией схемы, могут быть прочитаны любой другой версией схемы без ошибок. Этот тип совместимости обеспечивает максимальную стабильность данных.

  • Producer: новая схема должна позволять запись данных, которые могут быть прочитаны как старыми, так и новыми потребителями. Producer может добавлять опциональные поля и удалять опциональные поля. Если схема, зарегистрированная в Schema Registry, несовместима с любой из предыдущих и будущих версий, Producer получит ошибку при попытке отправить данные.

  • Consumer: могут читать данные, записанные как старыми, так и новыми версиями схем. Это требует тщательной проверки, чтобы новые поля были опциональными и имели значения по умолчанию, чтобы избежать ошибок.

var schemaV1 = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
var schemaV2 = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"null\", \"int\"],\"default\":null}]}";
var schemaV3 = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"null\", \"int\"],\"default\":null},{\"name\":\"email\",\"type\":[\"null\", \"string\"],\"default\":null}]}";

// Producer с третьей версией схемы
var producer = new KafkaProducer(producerProps);
var schema = new Schema.Parser().parse(schemaV3);
var user1 = new GenericData.Record(schema);
user1.put("name", "Alice");
user1.put("age", 30); // Опциональное поле "age"
user1.put("email", "alice@example.com"); // Опциональное поле "email"
producer.send(new ProducerRecord<>("users", "key1", user1));

// Consumer с третьей версией схемы, читает данные, записанные со всеми предыдущими версиями схемы
var consumer = new KafkaConsumer(consumerProps);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
    var records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord record : records) {
        var user = record.value();
        var name = user.get("name").toString();
        
        // Старые данные могут не содержать "age" и "email", поэтому проверяем наличие
        var age = user.get("age") != null ? (Integer) user.get("age") : null;
        var email = user.get("email") != null ? user.get("email").toString() : "N/A";
        System.out.println("User: " + name + ", Age: " + (age != null ? age : "N/A") + ", Email: " + email);
    }
}

Producer:

  • ➕ Гарантия, что данные, записанные с использованием новой версии схемы, будут совместимы со всеми предыдущими и будущими версиями.

  • ➕ Обеспечивает максимальную стабильность и совместимость данных.

  • ➖ Ограничения на изменения, чтобы гарантировать совместимость со всеми предыдущими и будущими версиями схем.

  • ➖ Требует тщательного управления версиями схем и их изменениями.

Для Consumer:

  • ➕ Гарантия, что данные, записанные любой предыдущей и будущей версией схемы, будут читаться без ошибок.

  • ➕ Упрощает процесс миграции на новые версии схемы без изменения логики обработки данных.

  • ➖ Необходимость проверки и обработки данных, чтобы избежать ошибок при чтении данных с разных версий схем.

  • ➖ Ограниченная гибкость при изменении структуры данных, поскольку новые данные должны быть совместимы со всеми предыдущими и будущими версиями схем.

Каждая стратегия эволюции схем в Confluent Schema Registry имеет свои плюсы и минусы. Выбор подходящей стратегии зависит от ваших потребностей, стабильности данных и требуемой гибкости! Вот краткое резюме рассмотренных стратегий:

  • Backward Compatibility: Обеспечивает совместимость с предыдущими версиями схемы. Отлично подходит, когда нужно минимизировать изменения в данных.

  • Forward Compatibility: Обеспечивает совместимость с будущими версиями схемы. Полезно при частых обновлениях схем.

  • Full Compatibility: Объединяет плюсы Backward и Forward Compatibility, обеспечивая совместимость с предыдущими и будущими версиями схемы. Максимум гибкости и стабильности!

  • Full Transitive Compatibility: Максимально строгий и надежный тип совместимости, обеспечивающий, что данные любой версии схемы могут быть прочитаны любой другой версией схемы. Абсолютный чемпион по совместимости!

Чтобы облегчить работу с Confluent Schema Registry, вот несколько полезных ресурсов и инструментов:

Habrahabr.ru прочитано 3021 раз