Как использовать JDBC коннектора Kafka для повышения производительности обработки и записи данных
Аналитик ГК Юзтех
Здравствуй, мир!
Коллеги, доброго дня и отличного настроения!
Меня зовут Ремизов Роман, я системный аналитик ГК Юзтех, и в этой статье я расскажу про использование JDBC коннектора Kafka для обработки нескольких сообщений в одном агрегате, с целью оптимизации производительности записи данных в базу.
Статья будет полезна тем системным аналитикам, на проектах которых в работе имеются высоконагруженные системы в микро-сервисной архитектуре, а в качестве очереди сообщений используется Kafka. В роли базы данных выступает PostgreSQL старше версии 15.
Область применения.
Мой реальный опыт использования JDBC коннектора Kafka был опробован:
на высоконагруженных маркетплейсах, где было необходимо из множества однотипных операций собирать финансовую отчётность;
на продуктах финтеха, где для главной книги собирается одна сводная сущность из множества однотипных аналитических проводков;
на медицинской системе, где годовые, пятилетние и десятилетние отчётные показатели собирались из ежемесячных региональных рапортов.
Общее описание проблемы.
Чтобы повысить количество RPS (запросов в секунду) для операции upsert в PostgreSQL при, например, 10 инстансах микро-сервиса без добавления аппаратной мощности возможно учесть следующие моменты:
Оптимизация индексов:
Убедитесь, что у Вас есть подходящие индексы на столбцах, используемых для поиска и обновления данных. Индексы помогут ускорить операции поиска и обновления.
— Здесь лучше отказаться от UQ (всех дополнительных уникальных индексов, кроме первичного ключа) на таблице агрегата, а уникальность строки проверять через uuid целевой сущности, генерируемом через передачу атрибутов ключей агрегирования в хеш-функцию (например, md5).
Пакетные операции:
Вместо выполнения каждой операции upsert отдельно, попробуйте использовать пакетные операции. Вы можете собрать несколько операций upsert в одну транзакцию и выполнить их одновременно. Это может существенно увеличить производительность.
— А коннектор JDBC, о котором речь пойдёт ниже, может выполнять первую итерацию агрегирования. Она же может быть достаточной. Только её стоит ограничить по объёму, например, до десяти сообщений.
Параллельное выполнение:
Если Ваша система позволяет, вы можете попробовать распараллелить операции upsert между инстансами микросервиса.
— Это может быть достигнуто, например, путем разделения данных на несколько групп и назначения каждой группе своего инстанса для обработки.
Описание решения.
Относительно вопроса о записи нескольких сообщений upsert из одного топика Kafka в PostgreSQL одной транзакцией без потери данных и с хорошей производительностью, — это вполне возможно. Для достижения этой цели предлагается использовать Kafka Connect с коннектором JDBC. Коннектор JDBC позволяет записывать данные из Kafka в PostgreSQL, а транзакционная поддержка PostgreSQL обеспечивает целостность данных.
Можно настроить коннектор JDBC для записи данных в PostgreSQL и настроить его для использования транзакций. Таким образом, все сообщения из топика Kafka будут записываться в PostgreSQL в рамках одной транзакции, что обеспечит целостность данных и хорошую производительность.
Kafka Connect с использованием JDBC коннектора позволяет интегрировать Kafka с реляционными базами данных для передачи данных между центром обработки данных и системой хранения. В нашем сценарии, где необходимо вычитывать несколько сообщений одним инстансом микро-сервиса и записывать их в БД в одну транзакцию, процесс работает следующим образом:
1. Чтение сообщений из Kafka:
Kafka Connect использует Kafka Consumer для чтения сообщений из определенной темы (topic).
Коннектор будет настроен на задачу вычитывания сообщений, выбирая их из темы и помещая их в очередь для обработки.
2. Обработка сообщений:
После получения нескольких сообщений (пакетов), микро-сервис или JDBC коннектор может обработать их в единое целое для выполнения upsert операции.
Наша задача — собрать данные из вычитанных сообщений в подходящий формат (например, объединить в одно сообщение).
3. Создание транзакции:
Когда приходит время записывать данные в БД, коннектор открывает транзакцию.
Все сообщения из собранного пакета будут записаны в базу именно в рамках этой транзакции, что гарантирует согласованность данных.
Как работает ack?
По принципу «Ack после обработки»:
До того, как данные будут отправлены в БД, Kafka не отдает подтверждение (ack) о том, что сообщения были успешно обработаны. Это означает, что сообщения остаются в offset и могут быть переработаны, если обработка окажется неуспешной.
Как только записи в БД проходят успешно и транзакция завершается, лишь тогда Kafka Connect отправляет ack на соответствующее смещение в партициях Kafka, указывая, что сообщения были успешно обработаны и могут быть «убраны» из очереди.
Что произойдет в случае сбоя?
1. Сбой после вычитки, но до записи:
Если микро-сервис упадет до завершения транзакции и записи данных в БД, Kafka Connect не зафиксирует ack и не обновит offset для этих сообщений.
В результате, при следующем запуске микро-сервиса или повторной активации коннектора он начнет обрабатывать сообщения с того же самого места, где остановился.
2. Восстановление и повторная обработка:
Система будет повторно обрабатывать те же сообщения, которые не были успешно записаны в БД, что позволяет избежать потери данных. Это свойство повторной обработки обеспечивается за счет хранения смещений и того факта, что ack не был отправлен.
3. Конфигурация:
Важно правильно настроить параметры коннектора и Kafka, чтобы контролировать уровень гарантии сообщений и процесс обработки ошибок. Например, сбор данных в одну транзакцию и их размеры должны быть адекватно настроены для обеспечения хорошей производительности и надежности.
Плюсы Kafka Connect с JDBC коннектором.
1. Простота интеграции: Позволяет легко интегрировать Kafka с реляционными базами данных, без необходимости создания сложных решений для передачи данных.
2. Масштабируемость: Kafka Connect поддерживает горизонтальное масштабирование. Можно добавлять дополнительные экземпляры коннекторов для увеличения пропускной способности и обработки сообщений.
3. Поддержка различных баз данных: JDBC коннектор может работать с широким спектром реляционных баз данных, таких как PostgreSQL, MySQL, Oracle, SQL Server и другими.
4. Управление схемами: поддержка схемы данных при использовании Kafka Schema Registry, что позволяет избежать ошибок, связанных с несовместимыми изменениями схем.
5. Пакетная обработка: возможность пакетной обработки сообщений позволяет оптимизировать производительность при записи в базу данных.
6. Поддержка транзакций: возможность обрабатывать данные в транзакциях, что обеспечивает согласованность при записи в БД.
7. Гибкость настроек: предоставляет возможность настроить параметры подключения к БД, а также конфигурацию обработки данных посредством настроек.
Минусы Kafka Connect с JDBC коннектором.
1. Задержка в обработке: в зависимости от конфигурации, существует возможность возникновения задержек между получением сообщений из Kafka и их записью в базу данных. Это может стать проблемой для приложений, требующих низкой задержки.
2. Производительность: Коннектор может иметь ограничения по производительности, особенно при большом объеме данных или высокой нагрузке на базу данных. Лучше всего использовать его для больших, но редких транзакций, а не для потокового ввода-вывода. Отсюда вытекает ранее обозначенное ограничение в десять сообщений. В вашем случае этот показатель может варьировать в зависимости от размера одного сообщения.
3. Проблемы с управлением состоянием: Корректное управление состоянием в данном контексте может требовать дополнительных усилий, особенно в отношении обработки ошибок и повторной обработки сообщений.
4. Сложность конфигурации: Хотя базовая настройка может быть простой, сложные сценарии могут требовать значительных усилий по конфигурации, особенно когда необходимо учитывать схемы данных, механизмы обработки ошибок и другие аспекты.
5. Эффективность работы с большими объемами данных: Kafka Connect в некоторых случаях может не справляться с высокими объемами данных так же эффективно, как специализированные решения для ETL (Extract, Transform, Load), особенно когда дело касается сложной обработки данных.
6. Обновление данных: Если коннектор настроен для обновления данных (например, при использовании upsert), это может потребовать дополнительной логики и осторожности, чтобы избежать конфликтов и несоответствий. Об этом я напишу в отдельной статье.
Этапы настройки.
Для считывания нескольких сообщений из Kafka и объединения их в одно перед записью в реляционную базу данных с использованием Kafka Connect и JDBC коннектора потребуется организовать несколько этапов настройки.
Этапы настройки
1. Определение архитектуры
Прежде всего, нужно определить архитектуру решения. Потребуется:
Топик Kafka для производимых сообщений.
Kafka Connect для извлечения данных из Kafka и записи их в базу данных.
JDBC коннектор для записи данных в реляционную базу данных.
2. Настройка Kafka Connect
Убедитесь, что у вас уже установлен и настроен Kafka Connect. Для этого нужно следующее:
Java Development Kit (JDK).
Apache Kafka.
Kafka Connect.
JDBC драйвер для вашей базы данных (например, PostgreSQL).
3. Создание и настройка коннектора
Создание коннектора JDBC происходит через определение JSON-конфигурации. Однако, для задачи по объединению нескольких сообщений вам нужно будет настроить промежуточный процесс, который может объединять данные перед записью в базу данных.
3.1. Кастомизация
Чтобы удовлетворить требования о сборке нескольких сообщений в одно, вероятно, потребуется разработать собственный обработчик/поток обработки данных между потреблением сообщений и записью в базу данных. Это может быть сделано с использованием Kafka Streams или другими средствами обработки данных.
Если все же не планируется кастомизация, используются следующие шаги:
— Создание коннектора для чтения данных из Kafka. Например:
{
"name": "source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"file": "/path/to/input/file",
"topic": "source-topic"
}
}
— Создание задач потока обработки для агрегации сообщений. Это может быть реализовано с помощью Kafka Streams. Пример could может выглядеть следующим образом на Java:
KStream
KTable
.groupByKey()
.reduce((aggValue, newValue) -> merge(aggValue, newValue));
Здесь merge — это указанная вами логика объединения сообщений.
3.2. Запись в базу данных
Настройте JDBC Sink Connector для записи в базу данных:
{
"name": "jdbc-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "aggregated-topic",
"connection.url": "jdbc:postgresql://localhost:5432/yourdb",
"connection.user": "youruser",
"connection.password": "yourpassword",
"auto.create": "true",
"insert.mode": "insert",
"pk.mode": "record_value",
"pk.fields": "your_primary_key_field"
}
}
Здесь:
topics определяет, где будет размещаться конечный поток данных.
Используйте настройки для соединения с вашей базой данных.
4. Запуск и мониторинг
После настройки коннектора и его запуска наблюдайте за производительностью и консистентностью данных. Убедитесь, что данные корректно обрабатываются и записываются в базу данных.
5. Транзакции и согласованность
Убедитесь, что вы настроили коннекторы так, чтобы они обрабатывали сообщения в рамках одной транзакции. Можно использовать поддерживаемые транзакционные режимы в Kafka Connect, чтобы гарантировать согласованность данных.
Напоследок.
В этой статье мы с вами вместе прошлись (или освежили знания) по опыту использования JDBC коннектора Kafka.
Если в вашей работе требуется найти элегантное решение для обработки нескольких сообщений в одном агрегате с целью оптимизации производительности записи данных в базу, то JDBC коннектор Kafka — это один из проверенных временем, и не только временем, вариантов.