Как использовать JDBC коннектора Kafka для повышения производительности обработки и записи данных

d627cffd4c1ee7006419b740a02440e8.jpgРемизов Роман

Аналитик ГК Юзтех

Здравствуй, мир!

Коллеги, доброго дня и отличного настроения!

Меня зовут Ремизов Роман, я системный аналитик ГК Юзтех, и в этой статье я расскажу про использование JDBC коннектора Kafka для обработки нескольких сообщений в одном агрегате, с целью оптимизации производительности записи данных в базу.

Статья будет полезна тем системным аналитикам, на проектах которых в работе имеются высоконагруженные системы в микро-сервисной архитектуре, а в качестве очереди сообщений используется Kafka. В роли базы данных выступает PostgreSQL старше версии 15.

Область применения.

Мой реальный опыт использования JDBC коннектора Kafka был опробован:

  • на высоконагруженных маркетплейсах, где было необходимо из множества однотипных операций собирать финансовую отчётность;

  • на продуктах финтеха, где для главной книги собирается одна сводная сущность из множества однотипных аналитических проводков;

  • на медицинской системе, где годовые, пятилетние и десятилетние отчётные показатели собирались из ежемесячных региональных рапортов.

Общее описание проблемы.

Чтобы повысить количество RPS (запросов в секунду) для операции upsert в PostgreSQL при, например, 10 инстансах микро-сервиса без добавления аппаратной мощности возможно учесть следующие моменты:

  1. Оптимизация индексов:

Убедитесь, что у Вас есть подходящие индексы на столбцах, используемых для поиска и обновления данных. Индексы помогут ускорить операции поиска и обновления.

Здесь лучше отказаться от UQ (всех дополнительных уникальных индексов, кроме первичного ключа) на таблице агрегата, а уникальность строки проверять через uuid целевой сущности, генерируемом через передачу атрибутов ключей агрегирования в хеш-функцию (например, md5).

  1. Пакетные операции:

Вместо выполнения каждой операции upsert отдельно, попробуйте использовать пакетные операции. Вы можете собрать несколько операций upsert в одну транзакцию и выполнить их одновременно. Это может существенно увеличить производительность.

— А коннектор JDBC, о котором речь пойдёт ниже, может выполнять первую итерацию агрегирования. Она же может быть достаточной. Только её стоит ограничить по объёму, например, до десяти сообщений.

  1. Параллельное выполнение:

Если Ваша система позволяет, вы можете попробовать распараллелить операции upsert между инстансами микросервиса.

— Это может быть достигнуто, например, путем разделения данных на несколько групп и назначения каждой группе своего инстанса для обработки.

Описание решения.

Относительно вопроса о записи нескольких сообщений upsert из одного топика Kafka в PostgreSQL одной транзакцией без потери данных и с хорошей производительностью, — это вполне возможно. Для достижения этой цели предлагается использовать Kafka Connect с коннектором JDBC. Коннектор JDBC позволяет записывать данные из Kafka в PostgreSQL, а транзакционная поддержка PostgreSQL обеспечивает целостность данных.

Можно настроить коннектор JDBC для записи данных в PostgreSQL и настроить его для использования транзакций. Таким образом, все сообщения из топика Kafka будут записываться в PostgreSQL в рамках одной транзакции, что обеспечит целостность данных и хорошую производительность.

Kafka Connect с использованием JDBC коннектора позволяет интегрировать Kafka с реляционными базами данных для передачи данных между центром обработки данных и системой хранения. В нашем сценарии, где необходимо вычитывать несколько сообщений одним инстансом микро-сервиса и записывать их в БД в одну транзакцию, процесс работает следующим образом:

1. Чтение сообщений из Kafka:

  • Kafka Connect использует Kafka Consumer для чтения сообщений из определенной темы (topic).

  • Коннектор будет настроен на задачу вычитывания сообщений, выбирая их из темы и помещая их в очередь для обработки.

2. Обработка сообщений:

  • После получения нескольких сообщений (пакетов), микро-сервис или JDBC коннектор может обработать их в единое целое для выполнения upsert операции.

  • Наша задача — собрать данные из вычитанных сообщений в подходящий формат (например, объединить в одно сообщение).

3. Создание транзакции:

  • Когда приходит время записывать данные в БД, коннектор открывает транзакцию.

  • Все сообщения из собранного пакета будут записаны в базу именно в рамках этой транзакции, что гарантирует согласованность данных.

Как работает ack?

По принципу «Ack после обработки»:

  • До того, как данные будут отправлены в БД, Kafka не отдает подтверждение (ack) о том, что сообщения были успешно обработаны. Это означает, что сообщения остаются в offset и могут быть переработаны, если обработка окажется неуспешной.

  • Как только записи в БД проходят успешно и транзакция завершается, лишь тогда Kafka Connect отправляет ack на соответствующее смещение в партициях Kafka, указывая, что сообщения были успешно обработаны и могут быть «убраны» из очереди.

Что произойдет в случае сбоя?

1. Сбой после вычитки, но до записи:

  • Если микро-сервис упадет до завершения транзакции и записи данных в БД, Kafka Connect не зафиксирует ack и не обновит offset для этих сообщений.

  • В результате, при следующем запуске микро-сервиса или повторной активации коннектора он начнет обрабатывать сообщения с того же самого места, где остановился.

2. Восстановление и повторная обработка:

  • Система будет повторно обрабатывать те же сообщения, которые не были успешно записаны в БД, что позволяет избежать потери данных. Это свойство повторной обработки обеспечивается за счет хранения смещений и того факта, что ack не был отправлен.

3. Конфигурация:

  • Важно правильно настроить параметры коннектора и Kafka, чтобы контролировать уровень гарантии сообщений и процесс обработки ошибок. Например, сбор данных в одну транзакцию и их размеры должны быть адекватно настроены для обеспечения хорошей производительности и надежности.

Плюсы Kafka Connect с JDBC коннектором.

1. Простота интеграции: Позволяет легко интегрировать Kafka с реляционными базами данных, без необходимости создания сложных решений для передачи данных.

2. Масштабируемость: Kafka Connect поддерживает горизонтальное масштабирование. Можно добавлять дополнительные экземпляры коннекторов для увеличения пропускной способности и обработки сообщений.

3. Поддержка различных баз данных: JDBC коннектор может работать с широким спектром реляционных баз данных, таких как PostgreSQL, MySQL, Oracle, SQL Server и другими.

4. Управление схемами: поддержка схемы данных при использовании Kafka Schema Registry, что позволяет избежать ошибок, связанных с несовместимыми изменениями схем.

5. Пакетная обработка: возможность пакетной обработки сообщений позволяет оптимизировать производительность при записи в базу данных.

6. Поддержка транзакций: возможность обрабатывать данные в транзакциях, что обеспечивает согласованность при записи в БД.

7. Гибкость настроек: предоставляет возможность настроить параметры подключения к БД, а также конфигурацию обработки данных посредством настроек.

Минусы Kafka Connect с JDBC коннектором.

1. Задержка в обработке: в зависимости от конфигурации, существует возможность возникновения задержек между получением сообщений из Kafka и их записью в базу данных. Это может стать проблемой для приложений, требующих низкой задержки.

2. Производительность: Коннектор может иметь ограничения по производительности, особенно при большом объеме данных или высокой нагрузке на базу данных. Лучше всего использовать его для больших, но редких транзакций, а не для потокового ввода-вывода. Отсюда вытекает ранее обозначенное ограничение в десять сообщений. В вашем случае этот показатель может варьировать в зависимости от размера одного сообщения.

3. Проблемы с управлением состоянием: Корректное управление состоянием в данном контексте может требовать дополнительных усилий, особенно в отношении обработки ошибок и повторной обработки сообщений.

4. Сложность конфигурации: Хотя базовая настройка может быть простой, сложные сценарии могут требовать значительных усилий по конфигурации, особенно когда необходимо учитывать схемы данных, механизмы обработки ошибок и другие аспекты.

5. Эффективность работы с большими объемами данных: Kafka Connect в некоторых случаях может не справляться с высокими объемами данных так же эффективно, как специализированные решения для ETL (Extract, Transform, Load), особенно когда дело касается сложной обработки данных.

6. Обновление данных:   Если коннектор настроен для обновления данных (например, при использовании upsert), это может потребовать дополнительной логики и осторожности, чтобы избежать конфликтов и несоответствий. Об этом я напишу в отдельной статье.

Этапы настройки.

Для считывания нескольких сообщений из Kafka и объединения их в одно перед записью в реляционную базу данных с использованием Kafka Connect и JDBC коннектора потребуется организовать несколько этапов настройки.

Этапы настройки

1. Определение архитектуры

  • Прежде всего, нужно определить архитектуру решения. Потребуется:

  • Топик Kafka для производимых сообщений.

  • Kafka Connect для извлечения данных из Kafka и записи их в базу данных.

  • JDBC коннектор для записи данных в реляционную базу данных.

2. Настройка Kafka Connect

Убедитесь, что у вас уже установлен и настроен Kafka Connect. Для этого нужно следующее:

  • Java Development Kit (JDK).

  • Apache Kafka.

  • Kafka Connect.

  • JDBC драйвер для вашей базы данных (например, PostgreSQL).

3. Создание и настройка коннектора

Создание коннектора JDBC происходит через определение JSON-конфигурации. Однако, для задачи по объединению нескольких сообщений вам нужно будет настроить промежуточный процесс, который может объединять данные перед записью в базу данных.

3.1. Кастомизация

Чтобы удовлетворить требования о сборке нескольких сообщений в одно, вероятно, потребуется разработать собственный обработчик/поток обработки данных между потреблением сообщений и записью в базу данных. Это может быть сделано с использованием Kafka Streams или другими средствами обработки данных.

Если все же не планируется кастомизация, используются следующие шаги:

— Создание коннектора для чтения данных из Kafka. Например:

{

"name": "source-connector",

"config": {

 "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",

 "tasks.max": "1",

 "file": "/path/to/input/file",

 "topic": "source-topic"

}

}

— Создание задач потока обработки для агрегации сообщений. Это может быть реализовано с помощью Kafka Streams. Пример could может выглядеть следующим образом на Java:

KStream sourceStream = builder.stream("source-topic");

KTable aggregatedStream = sourceStream

 .groupByKey()

 .reduce((aggValue, newValue) -> merge(aggValue, newValue));

Здесь merge — это указанная вами логика объединения сообщений.

3.2. Запись в базу данных

Настройте JDBC Sink Connector для записи в базу данных:

{

"name": "jdbc-sink-connector",

"config": {

 "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

 "tasks.max": "1",

 "topics": "aggregated-topic",

 "connection.url": "jdbc:postgresql://localhost:5432/yourdb",

 "connection.user": "youruser",

 "connection.password": "yourpassword",

 "auto.create": "true",

 "insert.mode": "insert",

 "pk.mode": "record_value",

 "pk.fields": "your_primary_key_field"

}

}

Здесь:

  • topics определяет, где будет размещаться конечный поток данных.

  • Используйте настройки для соединения с вашей базой данных.

4. Запуск и мониторинг

После настройки коннектора и его запуска наблюдайте за производительностью и консистентностью данных. Убедитесь, что данные корректно обрабатываются и записываются в базу данных.

5. Транзакции и согласованность

Убедитесь, что вы настроили коннекторы так, чтобы они обрабатывали сообщения в рамках одной транзакции. Можно использовать поддерживаемые транзакционные режимы в Kafka Connect, чтобы гарантировать согласованность данных.

Напоследок.

В этой статье мы с вами вместе прошлись (или освежили знания) по опыту использования JDBC коннектора Kafka.

Если в вашей работе требуется найти элегантное решение для обработки нескольких сообщений в одном агрегате с целью оптимизации производительности записи данных в базу, то JDBC коннектор Kafka — это один из проверенных временем, и не только временем, вариантов.

© Habrahabr.ru