Григорий Кошелев – А вы Кафку пробовали

Apache Kafka — распределённый программный брокер сообщений, применяемый в обработке в реальном времени данных большого объёма. К отличительным особенностям Apache Kafka можно отнести: надёжность, масштабируемость и высокую производительность. В докладе разберём основные архитектурные особенности и сценарии использования Apache Kafka. Рассмотрим неочевидные моменты и грабли, которые мы собрали на пути Востока.

vpm7ko62n5acusry8shwqjnavfc.png


Всем привет! Меня зовут Григорий! И сегодня мы поговорим про Kafka.

План у нас будет такой:


  • Вначале я расскажу для чего нужна нам Kafka, что мы с ней делаем.
  • Потом потихоньку начнем разбираться, как она устроена, т. е. введение в Kafka.
  • Далее по архитектуре отдельных компонентов пробежимся.
  • И самая интересная часть доклада — это какие-то неочевидности. По-хорошему здесь надо было писать «грабли, боль и страдания», но оставим немного интриги.
  • А в конце подведем итоги. Сделаем выводы и поймем: надо ли с этим что-то делать и как вообще жить.

Содержание:

[TOC]


Зачем нам Kafka?

В какой-то момент у нас в компании появился проект Vostok.

Это тот инструмент, который позволяет масштабировать и разрабатывать микросервисную архитектуру с минимальными затратами. Т. е. он конкретно под проект Hercules заточен. Он нацелен на то, чтобы со всех приложений, со всех тысяч микросервисов уметь собирать логи, метрики, распределенные трассировки сетевого взаимодействия между сервисами.

И также мы хотим универсальное какое-то решение получить, чтобы наши пользователи, наши проекты могли передавать бизнес-события через такую трубу.

А также в нашей компании Kafka используется для поисковых и рекомендательных систем.

gxjxsqkd9kcipghkkltoffl9jqy.png

Т. е. у нас есть много источников различных интересных данных, по которым можно строить интересные штуки, рекомендательные системы. Kafka где-то это все агрегирует. На другом конце находится Spark, туда заливаются данные, вычисления происходят. А также поисковые движки могут стоят на другом конце.

Для первого проекта у нас используется Kafka версии 2 и выше, т. е. самую последнюю версию используем.

Почему мы это можем делать? Потому что у нас там Java stack.

Для рекомендательных систем используется Kafka версии 0.11, потому что нет клиента ни для Python, ни для DotNet для Kafka версии 1 и выше. У нас в компании основной stack — это DotNet, поэтому ребятам очень тяжело. И поэтому они старую Kafka использует.

В каждой версии Kafka есть свои баги, свои проблемы. И они либо перетекают из версии в версию, либо все-таки фиксятся. Может даже что-то меняться в архитектурном плане. И к этому надо быть готовым при апдейте своей версии.

tyhqjns1kkqk_8zsgch53cthy9a.png


Давайте разберемся, что такое Apache Kafka


Введение в Apache Kafka

Для тех, кто не поднимал сейчас руку, поясню, что у Kafka много компонентов. Первый из них — это producer. Producer — это тот, кто создает кучу сообщений и отправляет их куда-то.

ubg0ugd0m-sqivf_onuk2ujfcbm.png

На другом конце находится consumer. Они употребляют эти данные, что-то с ними делают.

nftugg6gqwaiyjs6upzcdsrceh4.png

Передача осуществляется через кластер. Там у меня было написано, что Apache Kafka — это распределенный брокер сообщений. Получается, что у нас есть в кластере куча брокеров.

t1xro7pxrytxinxqkkpbimmzvoo.png

У нас в кластере есть куча брокеров.

pjx9tbzyafuw3dshtf_txd0f6w0.png

Передача сообщений осуществляется через них.

teo3xbaqx_k7opfqei2fyslxuns.png

И там брокер выступает таким звеном, который позволяет от producer к consumer не напрямую данные передавать, а через такой топик. Опытный слушатель может сказать, что это обычный Message Queue. Обычный, но не совсем.

uvmtwrvobiujc82_73lwif9qhyq.png

Там используется Publish-Subscribe, т. е. у нас просто продюсер пихает сообщение в топик, а consumers могут подписываться на них. И одни и те же сообщения могут читаться разными consumers.

И важно, что для чтения используется poll-механика, т. е. брокер не должен говорить: «Вот, consumer, тебе новое сообщение. Забери его». Каждый consumer должен прийти к Kafka и сказать: «Есть что-то новое?». И она отдает данные.

Такой подход лучше масштабируется, чем, когда сам брокер начинает вливать данные в consumers.


Архитектура Apache Kafka


  • Topic
  • Broker
  • Producer
  • Consumer

Мы увидели 4 важные вещи. Это топик, брокер, producer и consumer. Давайте по ним и пойдем в таком порядке.


Архитектура Kafka Topic

lclvtvss9ze4lsby07cnf_mufpu.png

Топик — это логическая единица, которая связывает между собой producers, consumers. И есть какое-то физическое хранение. Каждый такой топик — это множество партиций.

В данном случае у нас топик с тремя партициями. В них записано сообщение. Что при этом важно?

xfwy3zapgfg5bqbgfktv5w8tywq.png


  • Сообщения всегда пишутся в конец партиции.

gpnedg_hcdyhlgxzmw23ejtkax0.png


  • Писать можно параллельно в несколько партиций одновременно. Они между собой никак не связаны, физически это разные вещи.

p_f3jhcfo6oze8bus5qinoufrzk.png

При этом у каждого сообщения есть свой номер, свой offset. Каждая партиция начинается с номера 0.

xeynd5qzs2yauubnaahyxvmqcee.pngИ там они увеличиваются на единичку. Offset = 0

j-iitfxwubvkrr4utrrfqmw-rwu.png

Все вроде бы просто.

_gg7wnqgh451dmnynvtmmxndsmc.png

Но партиция может стать очень большой. Она может начать весить десятки, сотни гигабайт. А видео одного такого файла не похоронить, поэтому каждая партиция делится на сегменты.

Вот у нас такая длинная-длинная партиция.

ex9f761cnm1xskbb9jckzp7egi8.png

Она делится на кусочки.

zi_1bhusjxfss4de2j6sbgjgvbq.png

Они примерно одинакового размера, т. е. 1 сегмент, 2 сегмент и т. д.

x7bpoxxqid7uv6dgz6kjzm503ju.png

И есть последний сегмент.

w_e25rztejxibtppojdzwo1r2kw.png

Последний сегмент еще по-другому называется активным сегментом. Это тот сегмент, в который можно писать данные. Благодаря этому сохраняется правило, что пишем всегда в конец партиции. Это логично.

mckshvwj4-x04la0kbtbdefxety.png

У нас каждый такой сегмент начинал с некоторого сообщения.

Вот такой базовый offset, с которого начинается каждый из сегментов.

rzsnapivie1eeaeoexbu65rpass.png

Он состоит из четырех вещей. Про базовый offset уже понятно.

gfimjd1pqvdl67ekvw3griftzei.png

Он используется для того, чтобы называть файлики, которые лежат в файловой системе. И также data, index, timeindex — это три файла.

shobnsawvfswghh37uhtwybhzka.png

Data — это наши данные, т. е. есть какой-то здоровый файл. И в нем лежат сообщения. Сообщения могут быть разного размера. Kafka все равно, складывайте, что хотите в нее.

kdqpsge229mnrmsw8o9804ehfvy.png

Что такое index? Если мы хотим найти какое-то сообщение по смещению, то надо уметь это делать быстро. Как раз index позволяет это делать.

ctxe5d792bkbogcsluydh6v7r_w.png

Он выглядит следующим образом. Каждая запись в index занимает в общей сложности 8 байт. Там два int: relative offset, positon.

Relative offset — это смещение от начала сегмента.

Берем offset сообщения, вычитаем базовый offset, получаем вот это число.

Сообщений в партиции могут быть миллиарды, соответственно базовый offset будет большой и не влезет в int. Но у нас сегмент небольшой, поэтому относительный offset влезет в int.

Тоже самое с position. Position — это просто физическое смещение данного сообщения в этом лог-файле.

up4uspjkjcg1dzjusgni97u1nsi.png

Получается, что у нас растет relative offset на единичку, позиция смещается относительно размера предыдущих сообщений.

g22ycll6yvax5ohnaytpsglffke.png

Он потихоньку растет-растет.

yxlb5p4rgcrvhbzgoeqebnzuj34.png

И так можно будет потом искать по этому индексу. Мы знаем номер сообщения, который мы хотим найти. Быстро через базовый offset находим relative offset и в индексе смотрим его позицию, и можно забирать.

Понятно, что там не все сообщения хранятся, а только какие-то выборочные. Там есть шаг, с котором их можно сохранять в индекс. Это все настраивается в Kafka.

3w5o5ofw4ljhtcf9gfeq-gpaiza.png

Похожим образом устроен timeindex. Для чего он нужен? Каждое сообщение в Kafka имеет какую-то метку времени. И неплохо бы уметь искать по этой ветке времени. Timeindex решает эту задачу.


Архитектура Kafka Broker

v6lsnljaa4v5f3hzo7n2z4va4h4.png

Вернемся к кластеру, к брокерам. Кластер — это множество брокеров.

Один из них отвечает за контроллер.

c3tdmivdhbpobugqqlgmf-hlqcq.png

Он координирует работу кластера. О его назначении мы поговорим чуть попозже.

fpveu6_gfolkpmheg94e9axg2ya.png

У нас есть топик, который состоит из нескольких партиций. В данном случае мы создали топик на кластере из трех брокеров. И партиции распределились по кластеру.

При этом Kafka надежная, у нее есть replication factor для каждого топика. И мы можем сказать, что он равен трем.

vymqsacqthjfnlp_reths2_g5wi.png

Это означает, что каждая партиция должна иметь три копии. У нас три копии, поэтому на каждом брокере будет по копии.

Вот у нас 4 партиции на одном, на втором и на третьем.

При этом Kafka позволяет добавлять новые партиции к топику.

rep2ujtlnlo-wodfyfgvyin1wkw.png

Поэтому, если у нас данных станет много, можно увеличить единицу параллелизации.

Партиция — это единица параллелизации, поэтому можно добавить еще одну. И там producers стало больше, которые могут писать с той же эффективностью.

Теперь поговорим о роли контроллера. Дело в том, что контроллер должен назначить лидера. Каждая партиция должна иметь своего лидера.

2hrulz42siengjdhwk2y7wnwioc.png

Лидер — это тот брокер, который отвечает за запись в конкретную партицию. Не может быть у одной партиции несколько лидеров. Иначе каждый из них что-то написал бы свое, а потом это никак нельзя было бы соединить в какую-то одну последовательную партицию.

yrdrbbza19dyi2ahmejiudhe5nk.png

И каждый брокер может стать лидером у некоторых партиций.

gzvcochiye6vaed-suph_mlriey.png

В данном случае получилось так, что у нас на одном брокере лидерство двух партиций. На одном всего одна.

Kafka старается это равномерно распределять по кластеру и балансировать, чтобы не получилось так, что один брокер пыхтит и в себя все пишет-пишет, а остальные отдыхают. Чтобы такого не было, она равномерно пытается распределить.

cjxk_uylpbmo_cik3qnrmu5fqmq.png

Что происходит дальше? Данные, которые пишутся в лидера, должны быть распределены по репликам. Те, кто в себя тащат данные из лидера, называются follower. Они следуют за ним и потихоньку подкачивают к себе данные.

jscrd4-pdeqoidsp0x5spuncuc0.png

После того, как они сохранили в себе все данные, они становятся крутыми, они становятся in sync replica. Это реплика, которая синхронизирована с лидером.

wt8xwdm75gtgeapug2iaoafkdi0.png

И в идеале у нас весь кластер должен быть синхронизирован, т. е. все реплики должны быть в списке in sync replica.

Понятно, что какая-то из реплик может выпасть. Если это follower, то ничего страшного? Почему? Потому что follower ходит в лидер за данными. И подумаешь, если один из них перестанет ходить.

w2uuij2wt7bg2b3ls88r9c4czsu.png

А что произойдет, если сам лидер пропадает?

У нас был лидер в брокер 3 на партиции 2, а потом пропал. В этот момент мы данные писать не можем, потому что лидера нет.

o3j6phntbx5vr6em7m_nohgkioi.png

Kafka не теряется, она выбирает нового лидера. И за это отвечает контроллер. Все, отлично.

r8vvfumalk2y_07fb4-ajk08zni.png

Теперь с него можно реплицировать данные по другим брокерам. Но в партиции старый лидер может ожить.

bpkzvsl_6-pte6qosfiifgs8dus.png

Ожил, у него все хорошо, но он успел отстать по данным, потому что лидерство сменилось, появились новые данные.

glojejr1fujhdhd_yjrbqvxoonm.png

И поэтому он вынужден уже реплицироваться с нового лидера. Он это сделал. Кластер восстановился. И все хорошо.

ic_bziozp1pbhk3mlpabhokb-h4.png

И потом в Kafka случается магия, которая — раз и возвращает лидера обратно.

yccxlptwojwmgsgscygylzhdauo.png

Для чего это сделано? Если мы последовательно будем перезапускать ноды, мы можем взять и согнать лидерство на одного брокера. Это не очень хорошо. Он будет отвечать за всю запись, а остальные брокеры будут отдыхать. И чтобы этого не было Kafka периодически умеет это дело перебалансировать. И у нее есть куча настроек, которые это дело регламентируют.


Архитектура Kafka Producer

ojeidus0mcdhhaha_oh3anbag8y.png

Теперь пойдем к продюсеру, т. е. поговорим о том, как данные пишутся.

Начнем с сообщения. Сообщение можно представлять, что это пара ключ, значение.

При этом для Kafka и ключ, и значение — это просто массив байтов, в который можно писать все, что угодно. Продюсер должен знать, что он туда сериализует. И consumer должен знать, что он туда десериализует.

vi9fxu5nyhjoucoofyf5mpwkzrg.png

Зачем нужен ключ? Ключ используется для определения номера партиции, куда положить данные. Используется MurmurHash в том случае, если ключ есть.

rxerhzrihazfqpmeq8h_vc7ip64.png

Если ключа нет, то используется round robin, когда продюсер перебирает партиции по кругу, т. е. дошел до конца, начинает с нулевой и т.д.

И при этом важный point: куда положить данные решает продюсер. Т. е. не брокер как-то там сам определяет и сохраняет, а именно продюсер. Продюсер должен всегда писать в лидера. Соответственно, он пишет в конкретного брокера, в конкретную партицию. Это важно.

Про ключ я уже сказал, что Kafka это интерпретирует как массив байтов и так же сохраняет в лог, как и другую метаинформацию. Другая метаинформация — это offset, timestamp и т. д.

hvzgfarfz4qxquejyxr3ipmzlqk.png

У нас есть одна партиция. И у нее три реплики. Они синхронизированы.

Одна из них является лидером. В каждой по 9 записей.

qblon2vban8p4sslscy_loztjzq.png

Теперь посмотрим, как работает продюсер. Есть такая штука в Kafka, как acknowledgement, т. е. подтверждение записи. Продюсер должен убедиться, что данные записались

eaklhtbkvltok8loh1qgp4nnwms.png

Нулевой уровень предоставляет нулевые гарантии. Как это работает?

m-sceq1qz_iobyz493gxz-elxta.png

Продюсер начинает писать сообщение в одной из партиции. Он передал все данные и говорит, что у него все хорошо.

tdchjts3i5acmisfcappb43ze1k.png

Ему не надо никакого подтверждения.

При этом данные могли записаться в Kafka, а могли не записаться. Там могла быть какая-то проблема. И лидер в реплике 0 мог не сохранить данные. И тут уже может быть потеря данных. Это нужно понимать.

xqc-arwtswus3sbp-irxxx0qyfc.png

Поднимаем уровень. Уровень гарантии стал 1.

lmq2948untc4q_dtmafwbytof0y.png

Что изменилось? Теперь продюсер пишет данные. Он передал все данные в Kafka. Kafka пишет. Он ждет.

jst4xncyzh9krnazzq9ifh6fzpe.png

Запись произошла успешно на брокера. Брокер вернул подтверждение, что сохранил.

2n9pitrga236ftjhi-n6cp7ifnu.png

После этого продюсер считает, что все хорошо.

Какой следующий шаг?

vmsyvipnoyodraqd2y4qjkbi58c.png

Followers приходят к лидеру и говорят: «У тебя есть новые данные?». Он говорит: «Да». И они это забирают. А могли и не забрать. Это важно. Об этом мы, может быть, позже подробней поговорим.

sd1jj1tbwfiitp0uqnniufxkdj8.png

И, наконец, максимальный уровень. Уровень all, когда идем на все.

tckgfj3mg2adn_tajv8kw93i2ng.png

Как это работает? У нас продюсер пишет данные в лидера.

zrjcdgzuek8xwjbnpfzjet19x9s.png

Лидер сохранил, но ничего пока не говорит продюсеру.

obr-_dgwbmc7y82ajzitadyffpw.png

Дальше followers фоном приходят за данными.

wc0kdza1zjucvpmzhutcg9plqbc.png

И начинают их фетчить с лидера, пишут к себе. Записал первый, записал второй.

ffru9kbx6g9zvyto-tsvkaex9r4.png

Брокер, который лидер, понимает, что данные записаны на всех и говорит продюсеру, что данные записались.

tvbmdeztf0oaol8dotpwieuvlz8.png

После этого продюсер считает, что все хорошо.

И тут появляется важная настройка появляется. Это min.insync.replicas.

yicesyc39kkngazcujprszzvrfs.png

Сейчас это равно 3, поэтому лидер уведомляет продюсера, что данные записались не меньше, чем в эти 3 реплики, т. е. в себе + еще 2.

Эту настройку можно понизить, тогда можно дожидаться не всех, а, допустим, все-1.


Архитектура Kafka Consumer

mxwhjpfs3kbqxhcmx4ewzq92w3g.png

Теперь посмотрим, как работает consumer. У нас есть несколько партиций. Пока consumer будет читать из одной партиции. Он еще ни разу не читал, он читает с самого начала. Вот он прочитал какое-то количество сообщений.

5sjzml1lufuely1wirfnflhtf0u.png

Потом перешел к следующему моменту, и так постепенно дочитывает.

fwyncawvzyoaszol3lvmg5igba8.png

3s55g5oti5ouva95hawwa-yucbw.png

gnc9tiwjvsciubx7yjtqvhayzqa.png

5f_tjlaibos9ktppf7r79j2cnsq.png

Также он может читать из нескольких партиций одновременно.

jbxoggslhvxppiqhjc_chdmqdpy.png

Как это выглядит?

mqqmvcra2nwmytdruemr1cj3nqm.png

Он на каком-то месте остановился и начинает забирать данные.

nh_krqm736hbvvgwcxoeeis04m8.png

Он забрал данные.

mhefjwlzsi7ocsxc4nkpfzfxoxu.png

Пошел дальше-дальше и дошел до конца.

1mfpfpvb0qfwwvwii613dpvlydc.png

tnv5hepdyfs42sbqtng6smzizgc.png

3_6agm4qxskrjdv7ruzadxhavqc.png

8qpf0uvwv7g1y9y6u3neg-hqpaa.png

А в version polled сообщений в Kafka еще нет. И он будет продолжать делать это до тех пор, пока данные новые не появятся, и он их не получит.

Но есть маленькая тонкость. Когда вот это он делал, он запоминал место, с которого надо читать дальше. Он мог и перезапуститься, мы могли прибить процесс. И эта информация потеряется. Это понимали и те, кто делали Kafka. И поэтому добавили такую фичу, как commit offset.

x8m3wlazxqbvoezriz7_d9sx5m8.png

Что это значит?

pzkxytdlspsk0q_vvebzb1cfkmm.png

У нас есть consumer, он прочитал несколько сообщений.

jnzbrcwkni1xhoxwzx9ldfehgri.png

И то значение, с которого надо начинать дальше, он коммитит. Он говорит Kafka: «Я дочитал до вот этого и с этого места я хочу потом продолжить». Соответственно, он потом продолжает и читает.

b-ho9qyn05fd62wt-ttrvyi3ev8.png

Потом у него могло произойти что-то и он отвалился.

veo5yyev88okguxihy2pqiklnho.png

И при этом он не успел закоммитить данные. Т. е. вот эти все сообщения считаются не закоммитченными.

5vqaan78klxnwdwxjn5j5yak4qs.png

И когда consumer вернется в работу, он начнет ровно с того момента, на который в последний раз закоммитил.

iuq4quuutm5lkzhki5bqr_u0o7u.png

И с него благополучно начнет читать.

xgbalp9_ba5drlulvsgbxunpiyy.png

Кажется, что все логично.

Мы в какой-то момент начинаем писать много данных и нам надо уметь масштабироваться.

ocg5c_vj6gp7cj7hj9hups6ddto.png

У нас есть consumer, который начинает уже не справляться с чтением всех данных.

3zoulavvkihrvmmqspmwiwsi8vq.png

Тогда мы поднимаем еще один. И как бы мы могли это сделать? Мы могли бы сделать так, что один consumer читает из первых двух партиций, второй consumer читает из другой партиции. Но этим очень трудно управлять ручками, поэтому в Kafka предусмотрена вот такая штука.

hprot1o3e05t_wjgjvaaykpf1ty.png

Можно взять и объединить набор consumers в группу. И у них там появляется общий идентификатор.

2rjy0ba8cfvvs7z8u-afxqrhsjg.png

И когда они работают, они между ними автоматически расбалансируют все партиции и начнут читать.

ljorzfl-egysxhmbkuin5qwori0.png

У них там был какой-то offset, с которого они должны начать.

m2ku3t5qzsj7dw-ndznr83s7t7w.png

Первый почитал, второй начал читать.

yzbx3lgwwj1cgd3fvabni_enwxe.png

И второй — бах и упал, и не докоммитил что-то.

urbntgwbwvv1iahndmucjuswg_i.png

Что произойдет? Во-первых, надо начать снова с первого сообщения.

ayfusb2eeaxchoewc2ln7qvf-ns.png

Произойдет перебалансировка в этой группе.

ozndsw1uug9vbrsulhufjn5m-qo.png

Единственный живой consumer в данном случае подхватит эту партицию.

a8uvvdg9hciblkrs9f4zsefvz6g.png

И уже сам начнет читать данные.

peakvc950rhf44ns645unofhqva.png

И так же коммитить.

60if7l20pvsyp1ejx03dwvytx0y.png

А потом consumer ожил, все у него хорошо. Он снова присоединяется к группе.

qed1q6eno0bnpm7ycvxtdljgte0.png

И получает какую-то партицию, с которой он может читать данные.

Кажется, что все очень просто, все хорошо работает. Там очень клевая архитектура, все очень быстро.


Неочевидности Kafka

Но потом мы начинаем все эксплуатировать и сталкиваемся с реальностью.

И поговорим о том, что мы пережили за год эксплуатации.

Но честно скажу, что это очень-очень краткое изложение.

Я постарался выбрать показательные примеры того, с чем приходится сталкиваться разработчикам и не только при работе с Kafka.


Настройки — Как разломать кластер

Давайте начнем с настроек. Как разломать кластер? Его можно разломать разными способами. Я предложу один.

Есть настройка в Kafka. Называется log.dirs. Как можно догадаться из названия, это папки, где лежат данные.

smdddjjno_uopl7kebcbgkj3tmm.png

У нас есть два брокера. У каждого из них по реплике, по какой-то партиции.

f-gjoyax9-em4ldidfjxyfgs2_k.pngОдин — не очень, он не успевает за лидером. Лидер в этот момент был нами, например, остановлен, перезагружен.

avck2uzyxq2diiahfdfr5ufdu-q.png

Что произойдет?

7sdezhivj7dbgptyjte1eu45d6o.png

А реплика 1 ожила, у нее все хорошо. GC прошел долгий.

И вот какая ситуация. У лидера данных было больше, чем у последователя. И в Kafka может произойти такая штука, как грязные перевыборы. Вроде бы ему нельзя быть лидером, потому что данные потеряются.

bnt-geijld9841bjznvkcdfvvlq.png

И есть вот такая настройка — unclean.leader.election.enable=false. Она по умолчанию такая с 0.11 версии.

KIP-106 — Change Default unclean.leader.election.enabled from True to False (0.11)

pvt55idvf3ht_0pqxvsuklj45sy.png

Теперь всё. Теперь партиция не доступна на запись. Надо ждать. Чего мы ждем? Мы ждем, когда вернется старый лидер.

А мы же эту настройку поменяли. И сказали, что теперь у тебя данные лежат в другом месте. Это что означает? Когда брокер поднимается, у него партиция пустая. И теперь кластер видит вот такую ситуацию. У реплики 1 какие-то данные есть, у реплики 0 их нет.

Но как мы помним, мы ждали возвращение реплики 0.

4j0sedmyhcdiduw7o8qililnlqc.png

И поэтому реплика 1 говорит, что я так не могу больше. И брокер упал.

А теперь представьте, что таких партиций было много на самом первом, который был перезагружен с новым каталогом. И получается, что все остальные брокеры упали, один остался.

zy3gagups6-glsrbymboxvgevt4.png

https://issues.apache.org/jira/browse/KAFKA-3410

В Kafka заведен не один баг по этой теме. И кажется, что он исправлен в версии 1.1.

По крайней мере в этой ситуации вроде бы брокеры уже не падают. Но это не точно.


Настройки — Настройки по умолчанию

z-cjysuo8lso8lkefuhfsib2jsi.png

Поехали дальше. Теперь мы поговорим про настройки по умолчанию. У нас по умолчанию default.replication.factor = 1.

И понятно, почему. Потому что, когда мы только установили Kafka и начинаем ее пробовать, мы поднимаем на своей dev-тачке всего одну ноду, поэтому, если бы default.replication.factor был каким-то другим, то мы не смогли бы ничего себе создать. Это логично.

Хорошо, всегда при создании топика будем указывать replication.factor, который мы хотим.

szf3d4sfsec9cansocvr4cd-rhw.png

Но есть второй подводный камень — это опция auto.create.topics.enable = true.

Это по умолчанию такая настройка. Что она означает? Если мы создаем топик со своими настройками, то все отлично, но если кто-то пытается писать к нам в несуществующий топик сообщение, то Kafka думает: «О, что-то новенькое». И создает нам топик. И создает топик с настройками по умолчанию, т. е. с replication.factor =1. И мы увидели у себя в production, что часть топиков с нормальным настройками replication.factor, а у некоторых топиков почему-то replication.factor = 1. И оказалось, что кто-то просто не дожидался создания и уже начинал писать данных. И из-за этого оно вот так ломалось.

Поэтому первым делом, когда вы развернули кластер, меняйте вот эти две настройки.

fh64pcim8bb5_yxsvoer_znos2g.png

Еще две истории с настройками. Я думаю, все согласны, что настройки у брокера, у consumer должны быть. Но они должны не просто быть, а еще должны быть согласованы.

Давайте посмотрим, что нам разработчики Kafka подложили в качестве дефолтных настроек.

01cc2tops6q33js8_sg1c-ypseg.png

Есть такая настройка, как message.max.bytes. На брокере она равна 1 000 012 байт.

Что она означает? Грубо говоря, мы с продюсера начинаем писать данные. И это максимальный размер сообщения с данными, который можно запихнуть в Kafka.

7xbn8oatxluajgtu2wq2nzva93k.png

Хорошо, на стороне брокера есть похожая настройка, только она равна мегабайту в другой системе. Мегабайт равен настоящему мегабайту, которому мы с вами привыкли. Для сравнения у consumer она точно такая же, как у продюсера. Дело в том, что consumer и продюсер, видимо, одни люди писали. А брокер кто-то другой делал. Клиент Kafka на Java написан, а брокер написан на Scala.

К чему это может привести? Если у нас на стороне продюсера есть большое-большое сообщение, то, естественно, мы это сообщение получили только в production. Оно чуть больше, чем один миллион байт, но меньше одного мегабайта. Т. е. с нашей стороны пуля вылетела, а брокер сообщение не сохраняет.

С этими настройками была еще одна забавная история.


Настройки — Умножение

q6loqfw4pu_a0rg2z-yynmm7rgk.png

Т. е. тут было большое сообщение, но мы в коде ограничение понизили. На стороне нашего кода мы не передаем большие сообщения. Соответственно, когда в Kafka упаковывается вроде бы должно быть все нормально.

Но в Kafka есть еще одна такая настройка, как batch.size. Она измеряется не в штуках, она измеряется в байтах. 16 килобайт — это пачка, которую Kafka-клиент может собрать перед тем, как отправить в брокер. Т. е. там не сразу первое же сообщение пуляет в Kafka, он набирает потихоньку, чтобы меньше сетевых запросов было. Кажется, логично. Все здорово.

Мы протестировали на своих данных. И оказалось, что вот это сообщение надо умножить на 10. Т. е. для нас 160 килобайт выдает нормальный performance.

Хорошо, все протестировали, все замечательно.

А потом мы выкатили это на production. И помнили, что это значение надо умножить на 10. И оказалось, что у нас уже 160 килобайт было, а мы еще на 10 умножить. И в production уехала эта настройка в чуть больше 1,5 мегабайт.

Что мы получили? Если вы думаете, что все развалилось, то нет. Все было следующим образом. Мы выкатываем, все хорошо работает. Никаких ошибок нет. У нас нагрузка потихоньку растет-растет и растет. И потом мы пересекаем какой-то невидимый порог и все разваливается. Производительность падает практически в ноль.

Что происходит?

4hvzaxcay-nfaudynn5aliue-hu.png

Дело в том, что в Kafka есть KIP-126 — Allow KafkaProducer to split and resend
oversized batches (0.11)

KIP (Kafka Improvement Proposals) — это такие предложения по тому, как улучшить Kafka, т. е. сделать ее прикольней.

И такой KIP в версии 0.11 выпущен. Kafka producer, если получает от брокера информацию о том, что какой-то большой пакет, он его разбивает и снова перенаправляет Kafka.

И когда нагрузка маленькая, то не успевают копиться пачки. Мы их отправляем, и они пролетают быстро. А когда нагрузка возрастает, пачки начинают расти. И мы отправляем пачку, которая чуть меньше одного мегабайта, потому что у продюсера такая настройка, но это больше миллиона байта, который может принять брокер.

Соответственно, брокер присылает ошибку. В логах ничего нет. Сам клиент решает переразбить. Он переразбивает данные и переотправляет.

И мы раз большую пачку отправили. Брокер отвечает: «Не пойдет». Мы разбили и еще отправили. Получается, что у нас сетевая активность очень сильно выросла. И мы по этой части тухли неожиданно.


API — Блокирующий send

co3ayfw8crx-yq-xmnhnympk8hi.png

Давайте перейдем к API, перейдем к блокирующему send. Почему я об этом говорю? Дело в том, что в Kafka есть крутой асинхронный API, т. е. все, что бы мы не делали с Kafka, она возвращает нам future. И код дальше продолжает выполняться. Но, как выяснилось, не всегда. Если метаданные не доступы, то продюсер send блокируется, т. е. он нам даже future не вернет, а просто остановится. Метаданные — это информация о том, какой брокер на каких топиках является лидером и у какой партиции. Это информация, которая нужна продюсеру, чтобы понять в какой брокер записать данные, т. е. в какую конкретно партицию. И как раз вот эта метаданная периодически обновляется. В кластере лидерство меняется, поэтому его надо периодически обновлять. Заблокироваться он может вплоть до 60 секунд.

Такое значение по умолчанию. Если каким-то с кластером были проблемы, то мы даже фьючу не сможем получить. И свой код, который написан с той точки зрения, что у нас от Kafka все асинхронно, просто встает на достаточно большой промежуток времени.

f56s9crwcps6v7lktnfdloesucm.png

KIP-286: producer.send () should not block on metadata update (discuss)

Кто считает, что так недолжно быть? Все верно, есть люди, которые считают, что это не нормально. И есть люди, которые сказали: «Давайте сделаем нормально». И этот KIP в статусе discuss. Это означает, что там какие-то обсуждения идут. И неизвестно, когда эта штука появится в Kafka и когда не появится. Но живем дальше, жизнь на этом не заканчивается.


API — бесконечная десериализация

ecwse6mnboj7lincg8hivze6uxs.png

Давайте посмотрим, как на другой стороне. У нас был продюсер, а сейчас посмотрим, как на стороне consumer. Я говорил, что у нас poll-механика, т. е. consumer приходит и просит у Kafka данные. Это означает, что типичный consumer выглядит примерно таким образом:

Есть такой бесконечный цикл, в котором мы берем и долбит Kafka. Получаем какие-то данные и дальше с ними что-то делаем. Все неинтересное,

© Habrahabr.ru