Кафка: сложная простота

07d7807019c40a9d60e467a634acdc1c.jpg

Привет! Это Сергей Калинец из Parimatch Tech и эта публикация будет про Кафку. 

У нас много данных, которые нужно быстро обрабатывать, много сервисов и команд, поэтому мы выбрали Кафку, как нашу основную платформу для обмена данными. За годы ее использования насобиралось много разных ситуаций, о некоторых из них хотим рассказать. 

Успешный успех мало кому интересен, но вот проблемы — другое дело. Поэтому этот пост про проблемы, недопонимания и (местами) героические решения. Попутно я проясню некоторую специфику и базовые принципы работы Кафки, так что даже если вы никогда с ней не работали, смело читайте — будет интересно, и ваш старт с Кафкой возможно будет проще. Я намеренно не буду указывать названий конфигурационных параметров, их все можно нагуглить. Но если вам эти параметры нужны — пишите в комментариях. 

DISCLAIMER: я в целом не люблю англицизмы, но эта нелюбовь несколько избирательна. Меня бесит фраза «заранить кверю», а вот «задеплоить билд» — вполне ок. Терминология Кафки в большинстве случаев в моей голове звучит именно по-английски, поэтому в статье этих самых англицизмов будет немало. Спасибо вам за терпение.

А еще интересно — Кафка это он или она? С писателем (в честь которого был назван продукт) понятно, это мужчина. А вот сам продукт всегда называют «она». Я не буду тут оригинальным и тоже буду использовать женский род. 

Bootstrap Servers

Начнем мы с начала, а именно с подключения к Кафке.

При подключении к Кафке нужно указать так называемые bootstrap servers (а вот и английские термины). Обычно указывают адреса всех брокеров, из которых состоит кластер. Но на самом деле там достаточно указать не всех, а только некоторых из брокеров. Почему?  

Для ответа давайте разберем, как происходит подключение клиентов к Kafka. При подключении клиент указывает топик и его партишен (про партишены детальнее будет дальше). Чтобы начать писать в / читать из этого партишена, нужно подключиться к его лидеру, которым является один из брокеров кластера. Авторы кафки сжалились над нами, разработчиками и избавили от необходимости самостоятельно искать лидера. Можно подключиться к любому из брокеров, а он уже переподключит на лидера. 

То есть, для успешного подключения к кластеру достаточно знать адрес всего одного брокера. Зачем же передавать список?  

А список позволяет повысить доступность кластера в случаях, если какие-то из брокеров недоступны. Клиент подключается к брокерам из списка по очереди, пока какой-то не ответит. Так что для локальной разработки и тестов можно использовать один адрес, а для прода три — это вполне надежно (допускается недоступность двух брокеров, а это крайне исключительная ситуация). Всех брокеров указывать можно, но нецелесообразно.

Retention

Кафка с одной стороны максимально простая, а с другой — невообразимо сложная штука. Казалось бы, это — просто сервис, который позволяет писать и читать байты. Но есть тысячи разных настроек, управляющие как передачей этих байтов, так и их хранением. 

Например, есть настройки, которые говорят, сколько нужно хранить сообщения в топике. Ведь в отличии от типичных брокеров сообщений, которые только передают данные, кафка еще и умеет их хранить. Вообще, по своей сути, Кафка — это commit log (такая структура, куда можно дописывать только в конец). Это значит, что после того, как сообщение было принято Кафкой, она будет его хранить столько, сколько нужно. 

Вот это «сколько нужно» определяется настройками ретешнена (retention), и там есть разные варианты. Можно указать, чтобы сообщения удалялись через какое-то время, или когда общий их объем достигнет определенной величины. 

Само удаление происходит не сразу, а когда Кафка решит. А из-за разных технических особенностей даже решения может быть недостаточно. Почему?  

Кафка хранит данные в файлах на диске, эти файлы называются сегменты, всегда есть один активный сегмент, куда данные пишутся, когда этот сегмент вырастает до определенного размера или возраста, он становится неактивным, а вместо него появляется новый активный сегмент. Так вот — данные удаляются только в неактивных сегментах. Поэтому вполне может быть ситуация, когда ставят ретеншен в один день (например), но в настройках сегментов ничего не меняют (а там по дефолту сегмент должен прожить неделю или дорасти до гигабайта) и потом удивляются, почему старые данные не пропадают.

Были неприятные ситуации, когда мы конфигурировали retention в топиках, допустим две недели, а потом, в случае нештатных ситуаций, вычитывали топики сначала и с ужасом обнаруживали там старые данные, которые повторно обрабатывались. 

Общее правило — никогда не стройте свою логику на доверии к механизму очистки данных в Кафке. Она просто не дает таких гарантий.

Compaction

А кроме простого удаления есть еще и так называемый компакшен (compaction). Это когда Кафка удаляет не просто старые сообщения, а все предыдущие сообщения с одинаковым ключем (про ключи будет дальше). Тут по сути у нас происходит удаление сообщений внутри топика. Зачем это нужно?

Компакшен позволяет сэкономить место для хранения данных, которые нам не нужны. Если мы записываем изменения какой-то сущности, и делаем это в виде снепшотов (актуального состояния сущности после изменения), то нам уже не нужны предыдущие версии, достаточно последнего снепшота. Компакшен это как раз про то, чтобы эти предыдущие версии удалялись.

Топики с компакшеном можно рассматривать как таблицы в реляционной базе данных, где для одного ключа будет всегда одно значение. Круто же? Разработчики дочитывают документацию примерно до этого места, потом пишут сервисы, где ожидается не больше одного сообщения на ключ, а потом… плачут. 

Реальное удаление данных происходит опять же в неактивных сегментах и при определенных обстоятельствах. Там есть ряд конфигурационных параметров, которые этим всем управляют, но суть в том, что данные долго не будут исчезать, и это нужно учитывать в ваших дизайнах.

50f4a267ccfe755575342e559b8784eb.jpg

Tombstones

Ну и напоследок еще интересное про компакшен. Публикация сообщения с существующим ключом в топик с компакшеном это по сути, операция UPDATE. А если мы можем менять, то должны уметь и удалять. Для удаления нужно послать сообщение с ключем и пустым телом (буквально передать NULL вместо тела). И такая комбинация называется tombstone (надгробие) — такой себе null terminator истории одной записи. Давайте будем называть эту комбинацию меткой удаления. 

Так вот, эти метки хранятся в топике, чтобы консьюмеры (consumers — сервисы, которые читают из Кафки), когда дойдут до них, понимали, что запись с таким-то ключем уже всё, и нужно этот факт обработать. Но кроме этого, они еще и сами удаляются через какое-то время, не оставляя внутри топика никаких следов исходной записи. Время удаления конфигурируется отдельно. И это время не стоит делать слишком коротким, особенно если вы не знаете всех потребителей вашего топика. Ведь если метка удалится до того, как какой-то неспешный консьюмер его прочитает, то для него запись и не удалиться вовсе, а останется навечно. 

Это все вроде неплохо придумано и понятно описано, в общем, ничего не предвещало беды. Мы придумали сервис, который вычитывал список актуальных событий из топика и хранил это все в памяти. Спортивных событий много, но все они рано или поздно заканчиваются, и тогда можно их удалять. Удаление мы делали через те самые метки в топике с настроенным компакшеном. 

Но через какое-то время мы заметили, что время старта новых экземпляров нашего сервиса занимает все больше и больше времени. Long story short, оказалось, что метки не удаляются, несмотря на все правильно поставленные конфигурационные параметры.

Есть даже KIP-534, который уже должен быть исправлен, но мы еще не обновили нашу кафку, так что пока живем с этим багом. Решением было добавить еще delete политику, чтобы записи удалялись спустя определенное время, а чтобы не потерять события из далекого будущего, по которым нет изменений, мы делали периодические фейковые обновления. 

Offsets And Commits

Выше я уже писал, что Кафка — это не совсем брокер сообщений. Да, там продюсеры публикуют сообщения, а консьюмеры на них подписываются и потом читают, и даже что-то коммитят, но есть важные отличия, которые вгоняют новичков в смуту. Нередко слышу вопросы «а как перечитать сообщение», «как удалить его после прочтения», «как консьюмеру уведомить продюсера об успешной обработке» или «почему я получаю следующее сообщение, если я не закоммитил предыдущее». Все это от того, что опыт работы со стандартными брокерами не очень хорошо натягивается на кафку. Дело в том, что это не те коммиты, к которым привыкли разработчики.

Чтобы разобраться в этом вопросе, проще представлять топик кафки, как поток (stream, а не thread). Файловый или буфер в памяти. И работа с таким потоком состоит в том, что мы подключаемся к нему, указываем позицию с которой хотим читать данные и потом в цикле читаем все по порядку. Причем нам не нужно никак сообщать Кафке, что мы что-то успешно прочитали. Если консьюмер что-то вернул во время текущего вызова, то во время следующего вызова нам вернется следующее сообщение. 

Начальная позиция, которую мы указываем при подключении, называется смещением (offset). В отличии от файлового потока, ее нельзя изменять после подключения, но задавать можно как абсолютную величину (100500), так и относительную (с начала или с конца). Поскольку большинство сценариев подразумевают, что после рестарта сервиса ему нужно читать с той же позиции, где он остановился прошлый раз (а для этого нужно передать последний прочитанный офсет при подключении), Кафка дает из коробки механизм для упрощения. 

Вводится понятие консьюмер групп, и (если в двух словах) они умеют хранить свои активные смещения. Вообще, основное предназначение консьюмер групп — это горизонтальное масштабирование, но это оставим на попозже.

И когда в группу добавляется новый консьюмер, он начинает читать сообщения со смещения, которое будет следующим за сохраненным. И коммит в контексте Кафки это не подтверждение того, что сообщение успешно прочитано и обработано, а просто сохранение текущего смещения для данной консьюмер группы. Еще раз — ничего общего с подтверждением там нет.

Exactly Once Consumers

На фоне таких новостей получается, что частота коммитов может не совпадать с частотой получения сообщений. Если у нас большая плотность сообщений, отправка коммитов на каждое может здорово подсадить производительность, и для высоконагруженных систем смещения комитятся реже. Например, раз в несколько секунд. 

Такой подход может привести к тому, что какие-то сообщения могут быть обработаны больше одного раза. Например, сервис перезапустился после обработки сообщения №10, а успел закоммитить только №5. В результате после рестарта он прочитает заново сообщения 6–10. 

Эту особенность нужно всегда иметь в виду и добавлять в сервисы обеспечение идемпотентности (сложное слово, которое означает, что повторное выполнение операции не должно ничего поменять). Некоторые разработчики пытаются добиться exactly once семантики (когда сообщение может быть прочитано только один раз) с помощью заигрываний с частотой коммитов и разных настроек кафки. Например, явно отправляя коммит для каждого сообщения. 

Однако такой подход мало того, что значительно снижает производительность, так еще и все равно не гарантирует exactly once. Сообщение может быть обработано, но, если сервис или инфраструктура упадет во время отправки коммита, что приведет к повторному чтению того же сообщения после рестарта сервиса. 

Поэтому лучше всегда при проектировании ваших сервисов исходить из того, что чтение из кафки имеет семантику at least once (будет прочитано один или более раз). Тут стоит отметить, что для более высокоуровневых API (Kafka Streams, ksqlDB) exactly once processing возможен из коробки, и в будущих версиях (которые может уже есть) Producer / Consumer API клиентов он тоже появится.

Consumer Groups for Assign

Как-то у нас было замечено нашествие странных консьюмер групп в кластере. Обычно консьюмер группы называются осознанно, там указывается название сервиса, продукта или команды, и потом по этому названию можно найти потребителей топика. А эти странные группы были пустыми (не хранили никаких офсетов), и назывались без особых изысков — просто бестолковые GUIDы. Откуда же они взялись?

Вообще, консьюмер группы это отличный механизм для беззаботного масштабирования чтения, когда Кафка прячет от разработчиков сложности перераспределения партишенов между консьюмерами в случае добавления или удаления из группы. Но для любителей держать все под контролем предусмотрена возможность ручного управления. 

Когда консьюмер подключается с помощью метода Assign (), а не Subscribe (),  он получает полный контроль над ситуацией и может указывать конкретно из каких партишенов хочет читать. В таком случае консьюмер группы не нужны, но по каким-то причинам, ее все равно нужно указывать при создании консьюмера, и она будет создаваться в кластере.

И наши потеряшки оказались консьюмер группами, создаваемые сервисом, который использовал Assign (). Но почему их много и откуда там GUID?

Оказалось, что в примере .NET клиента с официального репозитория для именования группы используется GUID. В подавляющем большинстве сценариев, где фигурирует GUID, нам нужен уникальный идентификатор. И в этом случае мы использовали код, который генерирует новый GUID (Guid.NewGuid () в .NET). В результате при каждом старте сервиса создавалась новая консьюмер группа, а старая никуда не девалась. Это все выглядело максимально странно, совершенно непохоже на замысел создателей.

Во время очередного изучения примеров консьюмеров с Assign (), мы внезапно осознали, что там используется конструктор new Guid (). А результат его работы будет не уникальный GUID, а дефолтное значение, состоящее из всех нулей. Получается, что в данном примере в качестве названия группы использовалась константа, которая не менялась при перезапуске сервисов. Более того, можно использовать эту константу для всех консьюмеров вообще, а не ограничиваться одним сервисом.

Так что используйте константы для консьюмер групп во всех сценариях — и Subscribe () и Assign ().

Client Libraries

Если начинать знакомство с Кафкой с книжки (а это один из самых лучших способов), то скорее всего работа клиентов там будет описана на примере Java. 

Там будет много всего интересного и правильного написано, например, то, что клиентский код консьюмера скрывает под капотом довольно сложный протокол. В котором, кроме самого чтения данных, скрывается множество деталей работы с консьюмер группами, балансировки и прочее. 

Наверное именно поэтому клиентских библиотек так мало. Их по сути две — out of the box, работающая под JVM и librdkafka, написанная на C, и используемая под капотом библиотек всех остальных языков. И в их работе есть одно значительное отличие, связанное с публикацией офсетов. Java клиенты делают все в одном потоке, и все взаимодействие происходит во время вызова метода poll (). Этот метод по сути читает сообщения из Кафки, но при этом делает и другую работу — публикацию офсетов, транзакции и тд. Все происходит в одном потоке и разработчик может быть уверен, что если он прочитал сообщение, потом закоммитил какой-то офсет, и сервис вылетел до вызова метода poll, то этот офсет стопроцентно не будет сохранен в кафке и при перезапуске сервиса это сообщение будет вычитано заново. 

А вот librdkafka работает по-другому. Там есть фоновый поток, который периодически шлет коммиты. Так что после вызова метода Commit коммит может долететь до Кафки, а может и не долететь. Что еще хуже, при дефолтных настройках коммит может записаться, а сообщение не обработаться (тут есть больше деталей). Поэтому в librdkafka в большинстве сценариев лучше делать вот такие настройки.

Default Partitioners

У нас основной стек был .NET, но как-то мы решили разбавить скучную жизнь, добавив в нее немного (как нам казалось) JVM –, а именно Scala. Зачем? Ну, потому что сама Kafka написана на джавоскале и именно на JVM доступны более высокоуровневое API — Kafka Streams. Разница в этих API такая же, как и между С и python для чтения файлов. В первом случае у вас будет открытие (и закрытие) файла, выделение (и освобождение) буфера, циклы и все остальные прелести низкоуровневого байтотреша. Ну, а во втором — простой однострочник. 

В кафка стримах топики предоставлены в виде, хм, стримов, из которых можно читать и, например, джойнить с другими стримами (топиками) по ключу. Или написать предикат и отфильтровать сообщение по критерию.

И вот написали мы какой-то код, запустили, а он не работает. Не ругается, но и не делает ничего. Стали копать и накопали интересное.

Чтобы по-полной познать и оценить интересность, давайте немного углубимся в такие понятия кафки, как ключи и партишены (partitions). Сообщения в кафке хранятся в топиках, а топики разбиваются на партишены. Каждый партишен это своего рода шард. Данные из одного топика могут разделяться по разным партишенам, которые могут быть на разных брокерах и, соответственно, обслуживать больше продюсеров и консьюмеров. 

Новички нередко путают партишены (шарды) с репликами (копии). Разница в том, что партишен хранит часть данных топика, а реплика — все данные топика. Эти две вещи не взаимоисключающие и в большинстве случаев у топиков несколько партишенов и несколько реплик. Партишены используются для повышения производительности, а реплики — надежности и доступности. Увеличение производительности достигается за счет горизонтального масштабирования консьюмеров, а при использовании рекомендованного подхода с консьюмер группами из одного партишена в каждый момент времени может читать только один консьюмер. Поэтому предел масштабирования — это количество партишенов. 

Логика партиционирования состоит в том, что данные по определенным признакам попадают в те или иные партишены. Можно, конечно, писать во все по очереди, как обычный балансировщик нагрузки, но такой подход не очень хорошо подходит под многие типичные сценарии, в которых нужно, чтобы сообщения, относящиеся к одной сущности (например, изменения заказа), обрабатывались всегда одним и тем же экземпляром консьюмера. Поэтому применяют разные хеш функции, чтобы по значению ключа определить партишен, куда оно должно писаться.

Это все начинает быть сложным, и тут снова спасибо разработчикам Кафки, потому что они все упростили. Да, при записи сообщения нужно указывать партишен, но в клиенты добавили механизм автоматического выбора.

Выбор этот делается с помощью так называемого partitioner. По сути это имплементация какой-то хеширующей функции. И есть даже partitioner по умолчанию, который просто работает.

Но вернемся к нашей проблеме. Оказалось, что в Scala и .NET клиентах разные дефолтные партишенеры. В нашем случае было два сервиса на разных технологиях, которые писали в один топик. И из-за этой разницы сообщения с одинаковыми ключами попадали в разные партишены.

Вывода тут два. Нужно проверять партишенеры по умолчанию, если у вас несколько сервисов пишут в один топик. А еще лучше проектировать системы так, чтобы в каждый топик писал только один сервис.

Timestamps

У каждого сообщения в Кафке есть поле timestamp. И логично было бы ожидать, что оно заполняется брокером в момент, когда сообщение добавляется. Но… не факт. 

Там есть возможность во-первых, задать это время явно, но еще и есть два варианта, что делать, если время не задано. Можно использовать время на стороне продюсера, когда он отправил сообщение, или же время на стороне брокера, когда это сообщение добавлялось в топик. 

Поэтому полагаться на timestamp сообщения в кафке нужно с осторожностью, особенно если продюсер топика не под вашим контролем. В таком случае лучше переложить сообщения в свой топик, и там уже устанавливать время как вам удобно.

Zookeeper vs Kafka

Кафка — довольно старый зрелый продукт (с 2011 года). За время ее развития некоторые API изменялись, а некоторые заменялись другими. 

Вот, например, для подключения вначале использовался адрес Zookeeper (который является необходимым компонентом Кафки до версии 2.8.0), а потом начали задавать адреса самих брокеров кафки (те самые bootstrap servers, о которых мы писали выше). Сейчас рекомендуется использовать именно bootstrap servers, но при этом подключение через zookeeper тоже работает и используется в некоторых утилитах. 

У нас была интересная проблема, когда консьюмер группа удалялась, но при этом по ней продолжали публиковаться метрики. Оказалось, что удаление группы происходило утилитой, которая подключалась к зукиперу, а метрики собирались экспортером, который подключался через bootstrap servers. И группа на самом деле и не удалась вовсе. 

Вывод — не используйте устаревшие протоколы, или как минимум не мешайте их с новыми.

Заключение

Вот такая получилась подборка фактов и заблуждений по Кафке. Очень надеемся, что статья поможет вам обойти грабли, на которые мы наступали. 

А с чем вы сталкивались? Напишите в коментах.

© Habrahabr.ru