Kafka в условиях повышенной нагрузки. Артём Выборнов (2017)

kfufphgwbcht75siqrdkjoxmwz8.png

Kafka — распределённый брокер сообщений, нашедший широкое применение как универсальная шина для больших данных. Kafka позволяет как реализовать realtime-обработку большого числа событий, так и построить батчевый pipeline по доставке логов.

Почему мы используем Kafka? Если коротко — унификация. А если чуть подробнее — десятки поставщиков, терабайты логов каждый день, онлайн- и офлайн-pipeline’ы — без единой высокопроизводительной шины данных с этим крайне сложно совладать.

Из доклада вы узнаете о том, почему мы перешли на Kafka, и как она вписалась в наш pipeline. Поймёте, как обеспечить exactly once доставку данных. Узнаете о том, как из-за одной опечатки в несколько раз выросла нагрузка на Kafka, и что мы из этого выяснили. Выясните, какие метрики Kafka стоит мониторить и как по ним понять, что что-то идёт не так.

Видео:

Доклад 2017 года. Версия Kafka 0.8.2. Сейчас реалии уже могут немного отличаться. Но основные концепции остались такие же.

Когда я кому-то говорю, что работаю в Rambler, люди отвечают довольно однотипно. Вторым по популярности ответом является: «О, я знаю Rambler. Это поиск». На самом деле Rambler — это уже давно не поиск. Мы сейчас конкурируем за пол процента аудитории с Bing и это вообще не интересно.

lcoqzp7q9neexqpnl8j_ujqx9hc.png

На этом графике более интересно то, что Google, наконец-то, обогнал Яндекс.

Какой, по вашему мнению, самый популярный ответ на то, что я работаю в Rambler? Самый популярный ответ: «А Rambler еще жив?». На самом деле Rambler жив.

Сейчас Rambler Group это крупнейший в России интернет медиахолдинг. В него входят такие известные ресурсы, как: Rambler, Лента, Газета, Афиша, Чемпионат и много-много других. Аудитория составляет более 40 000 000 человек.

Преимущественно это медиаресурсы, на них заходят пользователи чтобы узнать новости, почитать статьи и это надо как-то монетизировать. Монетизируется это все за счет рекламы. Для того чтобы делать это наиболее эффективным образом, Rambler несколько лет назад создал подразделение рекламных технологий, в котором я и работаю.

kcqunuxjeut_opmddlsqiogr9pe.png

Мы собираем данные со всех этих сайтов с целью извлечь полезную информацию, чтобы как итог больше заработать на рекламе.

Когда мы собрали информацию, то, что мы с ней делаем?


  • Мы занимаемся сегментацией аудитории. К примеру, классифицируем пользователей по полу, возрасту. Определяем пользовательские интересы. Например, какие люди интересуются автомобилем Ford или кто собирается поехать в отпуск в следующем месяце. Находим аудиторию, похожую на аудиторию какого-то более маленького сайта.
  • Мы прогнозируем трафик.
  • Предсказываем CTR.
  • Строим рекомендации.
  • Делаем аналитику.
  • И многое другое.

Но прежде чем все эти полезные действия совершить, нам нужно получить данные.

dgifam-uphefx7b_b6f6ujm-nru.png

Для этого раньше у нас был целый зоопарк способов, на слайде приведены самые популярные из них.

bmicebcqlprq0ctv9rw3ccnbs70.png

Основой получения данных был worker. На самом деле их было много, просто здесь на рисунке изображен один. Он забирал на себя данные, локально их сохранял, а затем перекладывал в Hadoop.

tfwznhle0anme9z5ibnvsy1ohe4.png

Данные мы храним в HDFS и обрабатываем с помощью Spark и Hive. После того как они обработаны, нужно полученной информацией поделиться с окружающим миром — под это мы используем key-value базу Aerospike.

dudkcce-au7im-yxpiyek9zj1gw.png

Вроде бы все хорошо, но на самом деле в этом подходе кроется много проблем:


  • Во-первых, это зоопарк. Добавление каждого нового партнера — это часть решения абсолютно новой задачи как со стороны этого партнера (источника данных), так и со стороны нас (потребителя).
  • Во-вторых, worker здесь выглядит как один сервер. На самом деле это какой-то набор воркеров и каждый файл должен сначала пройти через одну из нод, где будет сохранен на диск, а потом уже выгружен в Hadoop. Worker — это узкое место.
  • В-третьих, таким способом нельзя реализовать real-time pipeline обработки данных. Предположим какой-то партнер стал присылать данные микробатчами по одной минуте. Мы их забрали и сохранили на сервере, переложили в HDFS, на HDFS обработали, выгрузили в Aerospike. Если всё пройдет идеально, то получим задержки в десятки минут, — ни о каком real-time речь уже не идет.

6fwbtuiahk4mxcowmxe0qszukxq.png

Что мы сделали? Все, что находится слева, выбросили и заменили на Kafka. Kafka — это брокер сообщений для больших данных. Он не обладает такой крутой логикой доставки сообщений, как RabbitMQ, он более простой, но при этом он более производительный при большой нагрузке. В Kafka могут писать много поставщиков и из неё могут читать много потребителей.

Но данные из Kafka надо каким-то образом доставить до HDFS, для этого мы используем Gobblin.

icoma2l-a6te5rkxjvqrngstsem.png

Это решение от создателей Kafka компании LinkedIn. По сути это Map-Reduce задача, которая крутится на Hadoop и перекладывает данные из Kafka в HDFS.

В дополнение мы получили возможность приблизиться к real time.

vgkq9rowjk_wdovwfmz0zageppe.png

Мы его реализуем с помощью Spark Streaming. Это такая задача на Spark, которая постоянно работает на кластере Hadoop (LLAP). От Kafka Spark Streaming откусывает микробатч информации, обрабатывает и складывает результат в key-value базу.

Самое главное преимущество такого подхода в том, что мы полностью минуем стадии записи и чтения из HDFS, все происходит в памяти. Мы с Kafka считали данные, обработали и положили результат в Aerospike.

Такой подход позволил нам получить задержку между наступлением события и обновлением соответствующей информации в Aerospike порядка одной минуты.

1_b1-xwib1simwojr3jtgbahyyg.png

Про что сегодня расскажу?


  • Во-первых, я расскажу, как приготовить Kafka и обеспечить с помощью нее семантику exactly once.
  • Во-вторых, я расскажу, как из-за нашей ошибки у нас выросла нагрузка на Kafka в 4 раза и о том, что мы из этого интересного вынесли.
  • В-третьих, я подытожу, рассказав, на какие метрики Kafka стоит обратить отдельное внимание.

olxspd-fhyfvxjstq4b_gxnkdhw.png

Для начала посмотрим, как устроена Kafka.

Kafka состоит из набора брокеров (каждый брокер — это отдельная машина) и ZooKeeper для хранения метаданных. На самих брокерах хранятся сообщения. В Kafka пишут producers, из Kafka читают consumers.

3ah_wm8_7zgi6ic-bhy4skngpq8.png

Базовым элементом Kafka является поток сообщений. Он называется topic. Топик состоит из партиций.

bpsepos4wrsyuxvu2x04rzcy1om.png

Каждую партицию можно представить как array list. Это последовательность сообщений, где все сообщения по порядку пронумерованы, причем чем сообщение свежее, тем больше у него номер.

rfwzhtk0h0pw-xwmaqgmpltwc-4.png

В партиции данные дописывает продюсер, в процессе записи сообщения шардируются.

5sv5qhnb82t-xasnczqtmgk_hec.png

Если вы ранее слышали про Kafka, то наверняка знаете, что одним из бонусов Kafka и отличием от других популярных решений является то, что она хранит данные на диске и их реплицирует.

Репликация в Kafka происходит с точностью до партиций. Репликация асинхронная: master-slave. Master в терминах Kafka называется лидер.

luwx0zfeso85_lzxhzk6brs7ddc.png

Consumers, producers всегда работают только с лидером, т. е. запись и чтение происходит только с лидера (master партиции). Затем данные с лидера aсинхронно доезжают до всех остальных реплик.

rglq7cgjnc3prieoznbvh6oz2vw.png

Все бы хорошо, но у асинхронной репликации есть типичная проблема — slave-партиция может отставать от мастера (на некую дельту). Это в целом не столь страшно, но стоит обратить на внимание на процесс переизбрания лидеров.

К примеру, наш мастер выпал, т. е. нода, на которой лежал мастер этой партиции, теперь недоступна. Kafka производит переизбрание лидеров и лидером становится какая-то slave реплика. Мы получаем потерю данных в размере дельты отставания, которая в общем случае больше нуля.

4af8hefy6hrwjvlfr1cztfyxp4c.png

В документации по Kafka есть 2 вида переизбрания лидеров: это clean и unclean. С первого взгляда кажется, что unclean — это какой-то плохой способ, и при нём теряем данные, а clean — это хороший, и данные не теряем. Но на самом деле это не так. Единственное между ними различие — размер этой дельты.

u3hm_czwyhctvn1hzwzvmvlhmek.png

Если slave в момент переключения отставал от мастера больше, чем на заранее заданное число сообщений, то мы говорим, что это unclean, а если меньше, то говорим, что это clean.

bnrkyk4i8fle5x4zpcxlmkcwsuw.png

Отсюда мы получаем важный инсайд — при переизбрании лидеров мы всегда теряем данные.

А также есть еще один неочевидный нюанс, который не описан в документации: переизбрание может происходить, когда у вас все хорошо. Кластер Kafka работает стабильно, нагрузка распределена равномерно, время ответа небольшое, а Kafka взяла и переизбрала лидеров, и следовательно мы потеряли данные.

Это грустно, а что же с этим делать?

ux7sakt7-jevklz0msstlbdbcee.png

Для этого пойдем дальше. Есть такой стандартный механизм в любой системе, в которую происходит запись данных -это возможность явно подтвердить факт успешной операции записи. Такая же функциональность есть и в Kafka.

Рассмотрим на примере. Если продюсер пишет в Kafka и этот параметр на продюсере был выставлен в ноль, то никакого подтверждения не будет получено.

9zfdh1hectn40lmqyswyz_sdona.png

Если параметр будет установлен в единичку, тогда вам придет подтверждение после успешной записи одной реплики, т. е. после записи в лидер (как мы помним продюсер всегда пишет напрямую в лидер).

ig7tfihm0czrctupbrcg2eluss8.png

Интересный момент. Если значение этого параметра -1, то Kafka вышлет подтверждение при успешной записи во все синхронные партиции. А что Kafka понимает под синхронными партициями — конфигурируется отдельным ключом на самой Kafka. Для каждого топика можно задать число партиций, которые Kafka должна держать в in sync. Если в insync стоит 1, то это никак не отличается от предыдущего примера.

wd5iw3udoje_wxsxrur6t6iuyt0.png

А если вы поставить insync.replicas = 2, тогда Kafka отправит продюсеру подтверждение, когда обе реплики будут успешно записаны.

Вроде бы все хорошо. В таком случае, если будет переизбрание лидеров мы не потерям данные, потому что по факту всё будет работать как синхронная репликация. Да, бесспорно, будет какой-то дополнительный overhead на продюсер, т. к. он будет дольше ждать ответа от Kafka, но это не столь страшно и в целом решаемая задача.

Более серьёзная проблема заключается в том, что если в данной ситуации одна из реплик выпадает, то мы вообще не сможем продолжить запись данных в топик. Это происходит из-за того, что Kafka ждет пока не будут записаны две реплики, а доступна лишь одна.

4xpesttbddpc1oclduxyl3el5ie.png

Это довольно серьёзная проблема. Сейчас я опишу подход, к которому мы пришли, насколько он хорош — уже вам решать. Мы держим для важных топиков insync.replicas = 2, а replication factor = 3. При таких настройках если у нас одна из insync реплик выпадет, то оставшаяся реплика за конечное время догонит лидера, и продюсер сможет продолжить запись в Kafka.

stfoitlhsyv4ehrltbnjcptw_jq.png

Одним из фундаментальных понятий Kafka являются отступы.

Как вы помните, партиция представляет собой набор пронумерованных сообщений. Отступ это номер какого-то сообщения. Среди отступов есть два особенных:

Latest offset — это номер самого последнего добавленного в партицию сообщения.

Earliest offset — это номер самого старого доступного сообщения.

Kafka хранит данные не все время, а за какой-то период, который вы задаёте в настройках, и в фоне удаляёт слишком старые данные. Поэтому earliest offset постоянно движется вперед. По сути, между earliest и latest offset, мы получаем окно, которое постоянно движется вперёд.

fmdeihpdeo07tiq6kpjbb4uobcu.png

Предположим у нас ранее отработал consumer и зафетчил данные вплоть до какого-то отступа, т. е. данные от earliest offset до offset у нас уже были зафетчены и куда-то сохранены. При следующем запуске нашего consumer требуется забрать выделенный розовым кусочек данных с offset до latest offset и обновить отступы (то есть значение отступа до которого мы выкачали данные в предыдущую итерацию).

Здесь есть важный нюанс в том, как мы производим процесс обновления отступов. От этого зависит с какого момента, при следующем запуске, consumer начнет работу.

c1zinwvadfzuttgntan4f5nukfu.png

Для этого существует несколько способов, если вы выбираете consumer для Kafka — это первое на что стоит обратить внимание.

Подход по умолчанию — автоматическое сохранение. Но на самом деле он самый ненадёжный.

Как он работает? Consumer фетчит данные из Kafka. Kafka периодически до него стучится с целью проверить: жив он или не жив. Если он жив, то Kafka сохраняет соответствующий отступ.

В чем здесь проблема? Рассмотрим пример: consumer зафетчил данные, Kafka убедилась в том что он жив и сохранила отступ, а он, не успев записать данные на диск, упал. Мы получили потерю данных.

Может быть и другая ситуация. К примеру, consumer успел записать данные на диск и упал, Kafka пришла проверить жив ли он, а он уже мертв. Kafka отступ не сохранит, а мы получим дубли в данных. Т. е. такой подход не способен обеспечить никакую семантику доставки.

yd1py1sxbbl5jbeidltdoujkwty.png

Следующий способ — это сохранение вручную. Он работаем следующим образом: consumer фетчит данные из Kafka, после скачивания очередного батча, он явно отправляет следующее сообщение: «Kafka, я этот кусочек данных успешно скачал, сохрани отступы». Выглядит надёжнее чем предыдущий способ, но это обеспечит только at least once семантику.

At least once — это такая семантика, при которой мы данные не теряем, но могут быть дубли.

Почему такое может произойти? Рассмотрим аналогичную ситуацию: предположим, мы сохранили батч данных, но, когда consumer начал отправлять уведомление об этом Kafka, кто-то дернул kill -9 и consumer благополучно упал, не успев сообщить о том, что нужно было сохранить отступы. Получили дубли данных.

На самом деле эта семантика не такая плохая. Большинство pipeline по доставке данных работают как раз в at least once семантике с последующей дедупликацией.

Но мы гонимся за пресловутой семантикой exactly once. Как её обеспечить?

x6jee3efhw5urlk0lbflnwbwfoo.png

Единственный путь обеспечить exactly once на чтение — это хранить данные и отступы вне Kafka. При этом сохранение данных и отступов должно быть атомарно. Если оно будет не атомарно, то мы можем получить аналогичную предыдущему примеру проблему: мы сохранили сколько-то данных, а отступы потерялись. И на следующей итерации мы повторно эти данные выкачаем.

Если у вас в качестве целевой базы данных выступает полноценная база с транзакциями, то атомарность обеспечить не сложно. В нашем случае — это HDFS, а там транзакций нет.

Я вам расскажу решение этой проблемы от кампании LinkedIn. Оно используется в Gobblin для обеспечения exactly once семантики при фетче в HDFS.

sn7fqqf7l98gygxl-ixdyxcmj0s.png

Предположим, у нас отработал consumer и сохранил 2 файла с данными и файл с отступами в какую-то временную папку. Затем их надо переместить в production директорию. Важно отметить что move (mv) одного файла в Hadoop атомарный.

Это перемещение разбивается на набор атомарных операций и сonsumer пробует их по очереди выполнить.

Предположим, первый отработал успешно, а на втором мы упали. Тогда при следующем запуске consumer«а, он первым делом посмотрит в эту временную директорию и узнает какие файлы еще не были перемещены. Также у него есть информация о том, что должно было быть перемещено и куда. На основе этих вводных он попытается завершить процесс.

Если у него это получилось — значит мы закончили предыдущий fetch данных, правда в начале следующего fetch«а. Если не получилось завершить, то вся необходимая информация для полного отката изменений также присутствует. Таким образом можно получить exactly once даже при сохранении в базах данных без транзакций.

gtksqod7lkvxgk9bbfjandaijd8.png

Вроде бы все хорошо:


  • Producer получает ответ об успешной записи данных.
  • Kafka реплицирует данные и данные внутри себя не теряет.
  • Мы фетчим данные в HDFS с семантикой exactly once. Exactly once — это семантика, когда мы забираем данные без потерь и без дублей.

rdghzezdf0opjyi_7zgqhmozxzo.png

Но у нас начались проблемы. Поговорим о том, как они проявили себя.

Во-первых, они начали проявляться в запаздывании логов. Большая часть pipeline — это обработка дневных логов: мы накапливаем данные за день и затем (на следующий день) запускаем их обработку на Hadoop.

Если данные за вчера доезжают сразу после полуночи (в 1–2 ночи — на слайде зеленая отметка), то все зависящие от них pipeline успешно отрабатывают.
Но если они доезжают с бОльшим опозданием, например, после обеда или тем более с опозданием на день, то у нас начинаются проблемы.
Мы не успеваем вовремя посчитать требуемые pipeline, метрики, аналитику и прочее. В общем, данные стали к нам приезжать с большИм опозданием.

Во-вторых, у нас появились дубли сообщений. Эта пресловутая семантика Exactly once, которой мы так дорожим, оказалась нарушена.

nrjmd3f0ywjejxsjvudww_84mpw.png

Кроме этого, мы заметили очень медленную работу consumers на микробатчах (в real-time пайплайнах): минутный батч данных из Kafka фетчился по 2–3 минуты.

Также consumers стали падать с ошибками, при этом все из них были довольно однотипны: я не могу достучаться до Kafka, я не могу получить важную информацию от Kafka, etc.

ctjywahlsej9xtlqbyjpdjbooda.png

Для последних двух проблем существовала общая причина: Kafka стала очень медленно отвечать на простейшие запросы — по несколько минут. Точнее отдельные ноды Kafka стали очень медленно отвечать на эти запросы.

Если фетчить информацию большим батчем (к примеру раз в полчаса), то эти пару минут задержки будут просто не заметны. А если фетчить каждую минуту, то эти две минуты приведут к серьезной деградации pipeline.

zpxepat7itqwcntmtkwnjbmt-fc.png

Первым делом мы посмотрели в мониторинг и увидели, что нагрузка на чтение из Kafka выросла в 4 раза.

Нижняя красная линия — это пиковая нагрузка до этих проблем. Верхняя красная линия — нагрузка когда мы эти проблемы обнаружили. Желтый график — это график на чтение.

gkjfll3wdidopqzhik-e3icb-ne.png

А еще более интересную картину мы увидели, когда посмотрели на эту же информацию (нагрузка на чтение) в разрезе нод и увидели какую-то лапшу которую сложно разобрать.

Ключевое, что можно вынести из этого графика, — Kafka недозагружена, в отдельные моменты времени чтение происходит только с двух-трех нод кластера из пяти машин. В добавок на этой картинке отлично видно постоянное переизбрание лидеров: Kafka лихорадочно пытается переизбрать лидеров, чтобы сбалансировать нагрузку и у нее ничего не получается.

syfhlbmh1-18lynjerngajekxgi.png

В чем же была причина? Причина такого неравномерного распределения по нодам оказалась в двух вещах:


  • Во-первых, Kafka неравномерно распределяла партиции по нодам.
  • Во-вторых, Kafka неравномерно распределяла лидеров по нодам.

Конечно, у меня на слайде изображен гротескный случай. Но на самом деле он крайне близок к истине, примерно так все и было.

lz8kyod0albnvcbwn8mpnqlat18.png

Важно отметить, что даже если у вас все хорошо и трафик не вырос в 4 раза — то Kafka все равно может ошибаться и неккоректно распределять лидеров партиций по нодам.

cfduh8v3qjqxzxesm8oxrczf9cq.png

Как же мы решили эту проблему? Более-менее автоматизированного способа решения мы не нашли и единственное решение, которое нам помогло — это ручное распределение. В Kafka можно скормить конфиг, который описывает на каких нодах какие партиции должны лежать и какие из них должны быть лидерами. Это позволяет построить картинку ближе к идеалу.

Понятное дело, все, о чем я сейчас говорю, верно лишь для самых толстых топиков, через которые проходит наибольшее количество информации.

7o2v-a11xcosspqx5nivvwk-xse.png

Другой элемент решения совсем не очевиден — повышаем репликацию. Хотя казалось бы: трафик на чтение увеличился в 4 раза, зачем его еще повышать допонительным чтением данных на репликацию?

Сейчас попытаюсь объяснить, почему это помогает. Вот снова наша гротескная ситуация:

daqba77lawjjhe1cqsl4haiqpoy.png

Весь трафик пришелся на первый брокер, на нем все лидеры, он красненький, ему очень плохо.

Что делает Kafka в этом случае? По какому-то таймауту брокер не ответил, Kafka решает: «Ему плохо, дай-ка я сниму нагрузку». И получаем следующее:

x6i3oum516abbb5mmzub_ds1mgo.png

Весь трафик переезжает на другой брокер и плохо уже ему. Именно эту картину вы и видели ранее на графике нагрузки в разрезе по нодам.

А если бы у нас был фактор репликации, к примеру, 3, то мы бы могли увидеть после этого процесса следующую картину:

cylrmednadbo5lxpqmghmpug4zi.png

По сути мы помогли Kafka меньше ошибаться при распределении нагрузки по нодам — чем больше реплик, тем ей сложнее ошибиться.

Все, что я описываю верно для Kafka 0.8.2, надеюсь, в новых релизах её сделали чуть умнее. Для всех старых версий эта проблема очень актуальна.

r9afmsehudibgjfcsoff7r6gt_m.png

Также повышение нагрузки выявило довольно классические проблемы, которые, наверное, бывают во всех сервисах обработки данных.

Мы уперлись в сеть. У нас часть нод в стойке оказались с uplink в 1 Gb. Это примерно 125 мегабайт в секунду. На графике у нас на ноду приходилось более 150 мегабайт в секунду. Понятное дело — сеть была загружена.

Мы внезапно узнали, что Kafka не дружит с RAID 5. Почему внезапно? Основная причина — документация читается в последний момент. Уже из практики мы узнали что RAID 5 хорошо работает с Kafka до первого сбоя. В момент, когда у вас на RAID 5 теряется блок данных и начинается восстановление, производительность Kafka сильно деградирует. Для Kafka рекомендуется использовать RAID 10.

uv6778pvl9ksgbeoqzpn0kum2bs.png

По итогу мы поправили все описанные выше проблемы:


  • Самое главное, мы равномерно размазали всю нагрузку.
  • Разобрались с сетью.
  • Разобрались с дисками.
  • Kafka стала отвечать на базовые запросы за адекватное время — вместо минут за секунды.

Но нагрузка осталась такой же высокой.

and7a6ggorpxnz2276wkr_j_y9e.png

В чем же была проблема с избыточной нагрузкой?
Как уже нетрудно догадаться, проблема оказалась в повторном скачивании данных — у нас был at least once.

Корень этой проблемы заключался в следующем. Мы использовали батчевый consumer данных с Kafka — Camus, это прародитель Gobblin.

bytbdi6snb37k1k6evsiv-yx2ce.png

Он сохраняет отступы для топиков в директорию под названием history. И из-за ошибки в конфиге отступы всех топиков стали попадать в одну папку. На слайде приведен фикс, исправляющий проблему.

Это стало возможным из-за того, что Camus был заточен под фетч несольких топиков за один запуск. Мы иногда теряли отступы после успешного фетча, а иногда они успешно обновлялись. Из-за этого было крайне сложно понять, что проблема именно в этом месте.

Искомый фикс вкатили. Так было:

ttk8gowyy7oasihb1k6iou0yojk.png

Так стало:

8u6um3pmfyygpnw728wyp-kva_u.png

На слайде видим нагрузку на Kafka в нормальном режиме работы. Пиковый трафик упал в 4 раза, а периодические пики, которые вы видите, это запуск нашего consumer раз в полчаса. У нас на Hadoop работают задачи, которые каждые полчаса просыпаются, фетчат новые данные и обратно засыпают.

ijajgu1srdkjiib9n-eie3vzn1m.png

Немного цифр. К чему мы сейчас пришли?

Сейчас Kafka состоит из тех же 5 серверов, каждый день в нее записывается порядка 3 млрд событий. Это более 5 терабайт информации. Пиковая нагрузка на запись порядка гигабита.

Каждый день из Kafka вычитывается порядка 18 терабайт информации. И пиковая нагрузка порядка 5 гигабит. Это на 5 серверов.

Стоит отметить, что в данный момент мы держим нагрузку большую, чем та которая вызвала описанные выше проблемы, но сейчас на том же железе мы с ней успешно справляемся, растём дальше.

bfcxzqj1tjbn6-ycofz0hjzfxus.png

На последок пара слов про мониторинг.

В первую очередь стоит мониторить узкие места:


  • Сеть
  • Диски
  • Распределение нагрузки

Вы должны за этим следить, Kafka может ошибаться, и ей будет требоваться ваша помощь.

Вот, к примеру, график распределения нагрузки по нодам, когда у вас все хорошо:

hvvh2x5qbvkvbtblv5k6-f0gdao.png

Здесь можно увидеть, что 4 ноды работают более-менее в унисон. Одна немного отстает, но это не критично. При этом на графике ближе к концу заметно, как Kafka сделала leader reelection, и баланс немножко изменился.

m0viq1gvzxx5vcbhi-zfpx4fdxa.png

Во-вторых, стоит мониторить переизбрание лидеров. Да, вы на это повлиять никак не можете и данные все равно потеряете, но круто иметь количественную метрику того, насколько часто и насколько много данных вы теряете.

buznf_tgujlmrqwk72f9g4wfltu.png

В-третьих, стоит мониторить рассинхрон партиций. В любой системе с асинхронной репликацией на это стоит смотреть. Метрик для этого целое множество. Из тех, которые нравятся мне, это:


  • Число несинхронных партиций из тех, которые должны быть in sync
  • Максимальный лаг репликации

Это график максимального лага репликации с точностью до сервера показывающий на сколько сообщений мы отстаем:

8xnjtfnudaaldbmj6xmfqggjbem.png

Что можно увидеть на этом графике? В целом все хорошо, лаг около нуля с отдельными пиками — это нормальная ситуация. Стоит начать беспокоиться, когда у вас лаг стабильно больше нуля, т. е. ваша система не успевает вовремя реплицировать всю поступающую в нее информацию.

hoo7-z8cz0blxmrzcrm7cs9kymc.png

И, в-четвертых, время ответа на простейшие запросы. Это, наверное, самая главная метрика. Сама по себе она мало что значит, если только вы не хотите с помощью Kafka делать около real-time обработку данных. Но её повышение является очень важным симптомом проблем с вашей Kafka.

Здесь изображено время ответа на запросы consumer«а:

7pyivrxqd35zdddoe63ribmu5p8.png

И красной линией выделена отсечка в 10 секунд, которую мы нашли эмпирическим образом и сейчас используем в продакшн. Если Kafka отвечает дольше, чем за 10 секунд, кричим: «Караул!».

На этом графике видно два пика. По ним сразу видно, что голубенькой ноде очень плохо и пора начать разбираться её переодическими долгими ответами. Но проблемы не только на ней, на других нодах они тоже заметны.

8frr02-wgnwrtbhrnf331o7uad0.png

В качестве итога:


  • Если вы хотите обеспечить exactly once семантику доставки данных на базе Kafka, то это возможно. Но вы должны обращать внимание на все элементы pipeline. Вы должны понимать как это работает и внимательно контролировать и producer’ы, и consumer’ы и саму Kafka. Когда у вас много поставщиков данных, то крайне сложно за всеми уследить и обеспечить exactly once для всех. Рекомендую придирчиво относится только к тем данным, где это действительно необходимо.


  • Нужно мониторить узкие места. В нашем случае при повышении нагрузки это оказались — сеть, диски и баланс нагрузки по нодам.


  • Также стоит мониторить множество дополнительных метрик. Самая главная — это время ответа Kafka на запросы, причем как продюсера, так и консьюмера. И бесспорно стоит мониторить классические метрики аля cpu, memory, etc.


По вопросам доклада можно обратиться к Артёму Выборнову

© Habrahabr.ru