Apache Pulsar как основа для системы очередей

Меня зовут Агалецкий Павел, я старший инженер в команде Архитектуры. Эта статья про новую для нас в Авито технологию — Apache Pulsar и построение системы очередей, Queues as a Service, на её основе.

Я расскажу:  

  1. Почему нам понадобился отдельный сервис для очередей.

  2. Почему мы выбрали Apache Pulsar.

  3. Про архитектуру Apache Pulsar.

  4. Об особенностях работы с сервисом Queues и Apache Pulsar для пользователей.

Система очередей — это не шина данных

На начало 2020 года в Авито была общая шина данных, Data Bus на основе Apache Kafka. Идея, лежащая в основе Data Bus, — асинхронное взаимодействие между сервисами, построенное на обмене сообщениями.

Под сообщениями можно понимать как команды, так и события. Отличия между ними двумя хорошо описывает в своём докладе Мартин Фаулер. Если совсем кратко, то команды указывают на действие, которое должно произойти, а события — на действие, которое уже случилось.

Команда

Событие

Отправь email

Email был успешно отправлен

Особенностью нашей шины данных является то, что как публиковать, так и читать сообщения определённого типа могут несколько сервисов сразу. Правда, в отношении публикации несколько продюсеров для одного и того же сообщения скорее антипаттерн, так как фактически это означает, что у события присутствует несколько источников правды.

ef464a7af01f30499f49ef3f138086e3.png

Особым случаем шины данных является ситуация, когда продюсер и консьюмер сообщения — один и тот же сервис. Сообщения при этом становятся способом выполнить некоторую отложенную или асинхронную работу, а сама шина данных исполняет роль сервиса очередей.

Основным путём реализации очередей в Авито было использование RabbitMQ. У каждого сервиса был свой экземпляр RabbitMQ, в котором сервис реализовывал очереди в требуемом виде.

У RabbitMQ был ряд недостатков:

  • Невысокая скорость работы.

  • Сложность с работой в нескольких дата-центрах.

  • Свой формат сообщений для каждого сервиса.

Нам же хотелось обеспечить своим пользователям такое же удобство по работе с сообщениями, какое уже было в Data Bus и синхронных обменах. Data Bus выглядел как наиболее близкое по функциональности решение, и именно он стал основой для построения сервиса очередей.

Сравним требования, предъявляемые нами к шине данных и к сервису очередей:

Требование

Data Bus

QaaS

Источник событий

Любой сервис

Только один сервис

Получатель событий

Любой сервис

Только источник

Хранение сообщений

Должны храниться некоторое время, чтобы их могли прочитать новые потребители

Могут храниться некоторое время для расследования проблем, но новых потребителей для прочитанных сообщений не ожидается

Возможность публикации отложенных сообщений

Не требуется

Требуется

Возможность отложить сообщение в случае невозможности его обработать

Не требуется

Требуется

Максимальное число консьюмеров на одного клиента

Не критично

Желательно не иметь лимита

Основная задача шины данных — асинхронный обмен информацией между сервисами, а задача системы очередей — это асинхронное или отложенное выполнение различных операций в рамках одного сервиса.

В нашем случае сервис Data Bus стал мощной точкой отказа, так как вбирал в себя все процессы асинхронного обмена данными как между сервисами, так и внутри отдельных сервисов, как система очередей. При этом используемая в Data Bus Kafka хорошо подходит для асинхронного обмена данными между разными сервисами, но плохо — для построения очередей внутри сервиса. 

По этим причинам мы решили отделить систему очередей от Data Bus и вынести её в отдельный сервис — Queues.

Почему Apache Pulsar

Новый сервис — новые возможности. Возник вопрос о том, какую технологию использовать для обеспечения функциональности очередей.

Разумеется, первой технологией-кандидатом была уже использующаяся Apache Kafka. Но, как я уже говорил, архитектура и изначальная идея Kafka несколько противоречит её удобному использованию для организации очередей:

  1. Максимальное число консьюмеров ограничивается числом партиций в топиках.

  2. Нет возможности публикации отложенных сообщений из коробки.

  3. Самочувствие Кафки ухудшается по мере роста числа топиков.

Наш взор упал на Pulsar — более молодую технологию, также находящуюся под крылом Apache. Pulsar был разработан в Yahoo для обработки сообщений на нескольких платформах: Flickr, Yahoo Mail, Yahoo Finance. В 2016 году он впервые стал доступен как open source решение.

Основными возможностями Pulsar, которые сделали его фаворитом в наших глазах в качестве основы для реализации системы сообщений, стали:

  1. Поддержка хранения бэклогов сообщений, подобно Apache Kafka.

  2. Поддержка на уровне самого Pulsar механики публикации отложенных сообщений.

  3. Отсутствие органичений на число консьюмеров.

  4. Высокая скорость работы.

  5. Поддержка более 1М топиков в кластере.

  6. Поддержка гео-репликации.

В результате мы подготовили новый сервис Queues, где реализовали полностью совместимое с Data Bus API для работы клиентов с очередями. Архитектурно Queues был построен таким образом, чтобы можно было заменить внутреннюю реализацию хранилища с Pulsar на другой вариант или вернуться на Apache Kafka.

Проведённые нагрузочные тесты показали хорошее поведение при большом объёме данных и низкое латенси на запись и чтение:  

56b835f277a209a3ccedbb9d4a0ee571.png

Мы приняли решение о переводе первых пользователей на новый сервис, проверили его в бою, а затем перевели и остальных пользователей тоже.

Архитектура Apache Pulsar

Начнём с того, что все подробности архитектуры Pulsar можно найти в его официальной документации, которая очень даже неплоха.

Верхнеуровневая схема

На самом верхнем уровне запущенная конфигурация Pulsar состоит из одного или нескольких кластеров, которые могут реплицировать данные друг с другом. Несколько кластеров стоит использовать в ситуации гео-распределённой работы системы.

323978be716f4f8617e1d6693d11f630.png

Мы используем один общий кластер, который физически разнесён по нашим дата-центрам. Отдельные кластеры были бы полезны, если бы латенси доступа до какого-то кластера был очень большим, что в нашем случае не так.

Следующим уровнем архитектуры является устройство кластера:

14f5a35e7cd1c19d7835006fd5cced5c.png

Продюсеры и консьюмеры сообщений используют механизм service discovery, чтобы найти брокер внутри кластера и подключиться к нему. Первым брокером для подключения клиента может быть любой из брокеров кластера. В нашем случае service discovery осуществляется на основе Consul. Все клиенты подключаются по адресу DNS, указанному в Сonsul, и уже он адресует их через round-robin к одному из брокеров Pulsar.

После подключения к брокеру, клиент посылает ему lookup-запрос на получение информации об интересующем его топике. Брокер может ответить на запрос, сразу вернув информацию о том, какой брокер обслуживает соответствующий топик, или вернуть информацию о том, что запрос надо повторить к другому брокеру. Второй ответ может вернуться, когда текущий брокер не обладает информации о топике по какой-то причине.

Если информация о топике вернулась, то клиент узнаёт из неё адрес брокера и подключается уже к нему. Далее, если клиент собирается публиковать сообщения, то он посылает запрос на создание продюсера CreateProducer. Если же он собирается получать события, то посылает запрос на добавление консьюмера в подписку — Subscribe.

7cf8c6b8203321a3bd507034cf52ba12.png

Брокеры и BookKeeper

Здесь надо рассказать о том, что представляет собой брокер Pulsar.

20ebb9162d784e07b12261722d5df0e2.png

В Pulsar брокер — это легковесный компонент, который отвечает только за взаимодействие с клиентами и реализацию протокола Pulsar. За хранение данных отвечает отдельный компонент — BookKeeper, через который осуществляется чтение/запись информации. Это означает несколько вещей.

Во-первых, мы можем масштабировать брокеры независимо от хранилища данных. И наоборот — один брокер может взаимодействовать с несколькими экземплярами BookKeeper. Иными словами, в Pulsar можно отдельно масштабировать компонент, отвечающий за хранение, и компонент, отвечающий за API и логику работы очередей.

Второй момент касается способа хранения и репликации сообщений. BookKeeper, согласно определению, представляет собой Write Ahead Log и изначально был спроектирован как система хранения такого типа лога для HDFS. Он хранит сообщения в виде отдельных файлов-логов, называемых ledger.

Каждый отдельный топик разбивается на несколько ledgers, при этом сами ledgers разделяются по инстансам BooKeeper, называемым bookie. Такой подход имеет ряд преимуществ. Во-первых, отказ одного bookie не приводит к потере или недоступности всех сообщений в топике. Во-вторых, каждый новый ledger открывается на bookie, который наименее загружен в данный момент.

9ec318e075be92676fcfc53cb48dd90b.png

Пара слов про репликацию сообщений при записи. На данный момент Pulsar осуществляет запись сообщений в три экземпляра bookie, при этом его настройки требуют, чтобы как минимум два bookie были в разных rack. Rack в BookKeeper абстрагирует физическое размещение серверов в дата-центре. В нашем случае это означает, что как минимум две копии сообщений будут расположены в разных дата-центрах.

Кроме BooKeeper Pulsar также использует ZooKeeper для хранения метаинформации о топиках, подписках и прочем.

Продюсеры и подписки

Как именно в Apache Pulsar реализована работа с публикацией и получением сообщений?

Продюсер с точки зрения Pulsar регистрируется для каждого топика и существует до тех пор, пока он либо не отправит команду на отключение от Pulsar, либо пока не порвётся коннект между клиентом и брокером. После регистрации продюсер может сразу начать слать сообщения в Pulsar.

Получение сообщений реализовано сложнее. Некоторая группа подписчиков объединяется в понятие Subscription (подписка). Подписки характеризуются:

  1. Названием.

  2. Типом.

Название служит только для идентификации подписки и должно быть уникальным в рамках топика.

Тип подписки определяет, как именно консьюмеры, входящие в одну подписку, получают сообщения.

Существует четыре типа подписок:

  1. Shared.

  2. Key_Shared.

  3. Exclusive.

  4. Failover. 

Shared — наиболее типичный для очередей режим работы. Все консьюмеры подписки будут получать сообщения по принципу round-robin. Именно этот тип подписки используем мы.

1a8209ac819f1ecec219c000f89e054c.png

Key_Shared — данный тип подписки аналогичен Shared, но если у сообщений в топике есть ключ, то события с одинаковым его значением будут доставляться только определённому консьюмеру.

2c15792c10f994dffca3d817784a417b.png

Exclusive — в такой тип подписки может в один момент времени подключиться только один консьюмер. Другие консьюмеры, которые будут пытаться подключиться, получат ошибку.

6c851468bf830e99b044f9213d37b8db.png

Failover — похож на Exclusive, но к подписке может быть подключено любое число консьюмеров. Pulsar выполнит их сортировку по имени и будет отдавать сообщение только первому. Если первый из консьюмеров отключится, то Pulsar начнёт отдавать сообщения следующему по очереди.

c26cd44b865f6fc6b518cbb9df884d52.png

Ack сообщений

Когда консьюмер получает и обрабатывает некоторое событие, он должен его акнуть, то есть сообщить Pulsar, что оно успешно обработано. В отличие от Kafka, в Pulsar акать сообщение можно индивидуально и независимо от остальных сообщений. 

Если ваш консьюмер вычитал сто сообщений, то можно акнуть, например, 45-е от начала. Это не будет означать, что сообщения до него тоже станут акнутыми. Нет, Pulsar трекает все сообщения, которые он сейчас отдал консьюмерам, и следит за аками этих сообщений тоже индивидуально. В Pulsar также есть механизм аккумулятивных аков, но он доступен, по сути, только для эксклюзивной подписки. 

В Pulsar есть лимит на количество отданных конкретному консьюмеру сообщений. Он довольно большой, и я думаю, что на практике достичь его сложно. Чтобы упереться в лимит, нужно создать несколько тысяч консьюмеров и вычитать из каждого из них по несколько сотен сообщений, при этом не акая ни одно. Но если лимит всё-таки будет достигнут, то Pulsar перестанет отдавать консьюмерам новые сообщения, чтобы не потратить всю память на хранение информации о них.

Почему так сделано? Дело в том, что помимо самих топиков в BookKeeper хранится и ещё информация о курсоре. Курсор указывает на место, до которого все сообщения из топика были акнуты соответствующий подпиской. Он ведётся независимо для каждой подписки по отдельности. 

d4838630ac4d714347a87f941699f1eb.png

Представим, что сейчас курсор указывает на какие-то два сообщения от начала, как на картинке. Что бы в такой ситуации ни произошло с нашим брокером, клиентом, Queues, и даже самим BookKeeper, сообщений до курсора отдано клиенту не будет. Исключение — если мы специально сдвинем курсор. А так мы зафиксировали, что клиент акнул два сообщения, в данном случае помеченных зелёным цветом. А вот сообщения розового и голубого цвета — те, что мы сейчас отдали клиенту. При этом помеченные розовым цветом сообщения клиентом ещё не акнуты, а голубое — акнуто.

Pulsar трекает у себя в памяти и записывает в ZooKeeper, что из новых сообщений, которые мы сейчас вычитали и отдали нашему консьюмеру, акнуто только одно. Но он не двигает курсор в топике, потому что он видит, что перед акнутым сообщением есть ещё какое-то количество неакнутых. В нашем примере такое сообщение одно. Когда же клиент акнет непрерывную последовательность событий, курсор сдвинется и запишется в BookKeeper. Тогда мы продолжим трекать только какие-то следующие сообщения, которые ещё не акнуты клиентом.

Эта работа требует определённой памяти в Pulsar. Нужная информация хранится в оперативной памяти, плюс Pulsar записывает её в ZooKeeper. Из-за этого возникает лимит на некое максимальное количество сообщений, которое можно отдать клиенту. Этот лимит настраиваемый, в реальности может трекаться много тысяч сообщений. Всё это работает в рамках подписки: одновременно у топика может быть любое количество подписок, каждая из них может работать в своём индивидуальном режиме, и у каждой подписки может быть индивидуальный курсор. Соответственно, и акать сообщения они могут полностью индивидуально.

Бэклог подписки

Что происходит с сообщением, когда оно попадает в топик? Это сильно зависит от того, какие параметры выставлены в настройках Pulsar. Если кратко, то есть пара основных вариантов.

Если у топика нет ни одной подписки или все подписки прочитали сообщение, то для него настаёт период ожидания, когда Pulsar выполнит его удаление. Удалять прочитанные сообщения можно либо по наступлению таймаута его нахождения в топике, либо по пределу занимаемой топиком памяти. 

Если у топика есть активная подписка, то сообщение попадает в её счетчик бэклога. Когда бэклог подписки достигает заданного значения, то, в зависимости от настроек, может случиться одна из следующих вещей:

  1. Pulsar начнёт удалять из подписки самые старые сообщения.

  2. Pulsar заблокирует возможность публикации новых сообщений в топик.

У нас контроль бэклога отключен, поэтому можно публиковать любое количество сообщений.

Партицированные топики

Один из способов повысить пропускную способность — добавить к топику партиции.

В Pulsar партиции реализованы через обычные топики, объединенные в общую группу — партицированный топик. Число партиций для топика задаётся на глобальном уровне в настройках.

Когда клиент-продюсер или консьюмер подключается к брокеру и получает информацию о топике, то он сначала запрашивает у него информацию о том, является ли топик партицированным. В ответ Pulsar возвращает число партиций или сообщение о том, что партиций нет.

450fc404941e1bd7eb317b91bcaa5e4e.png

Далее клиент создаёт отдельный продюсер или консьюмер для каждой партиции. Каждая партиция, будучи по сути просто топиком, обслуживается своим брокером. Порядок публикации сообщений в партиции определяется клиентом. Мы используем обычный алгоритм round-robin, то есть пишем каждое последующее сообщение в следующую по очереди партицию.

При чтении событий из партиций используется отдельный консьюмер для каждой партиции. В подключенные к сервису Queues клиенты мы отдаём сообщения из всех партиций. В этом случае также используется round-robin, только наоборот: мы вычитываем по очереди из каждой партиции и отдаём сообщения клиентам.

Тонкости работы с Apache Pulsar

Такая сложная система, как очереди, не может не иметь нюансов работы. В сервисе Queues мы стараемся скрыть детали подлежащей системы очередей и минимизировать её особенности. К сожалению, это невозможно сделать на все сто процентов, поэтому какие-то вещи всё равно протекают и влияют на работу клиентов, подключающихся к нашей системе.

Я расскажу про пару основных деталей, на которые мы обращаем внимание сотрудников Авито. Возможно, вам они тоже пригодятся. 

Батчи сообщений

Один из способов повышения пропускной способности в Apache Pulsar — использование отдельных партиций, поскольку в таком случае происходит распределение нагрузки по брокерам. Другой важный способ повышения производительности — использование батчей сообщений.

При публикации сообщений в Pulsar, клиент дожидается получения от него ack. Pulsar, в свою очередь, возвращает ack только после того, как сохранил сообщение во все сконфигурированные для него инстансы bookie. Среднее время публикации одного сообщения составляет 5–7 мс, и понятно, что если писать все сообщения по одному последовательно, то общее время публикации заметно вырастет.

Поэтому Pulsar предлагает механизм, называемый батчами. В этом случае клиент отсылает сообщения не по одному, а сразу группой. Наш сервис Queues получает сообщения от подключенных к нему клиентов и в рамках каждого отдельного пода накапливает батч до выполнения одного из трёх условий:

  1. Максимальное время наполнения батча не должно превышать 1 мс.

  2. Максимальный размер батча, отправляемого в Pulsar, не должен быть больше 1000 сообщений.

  3. Мы поместили в батч все сообщения, которые получили от подключенного к нам клиента.

После наступления одного из условий выше, Queues отсылает батч в Pulsar. В этом случае Pulsar возвращает один общий ack на все сообщения из батча.

При этом надо иметь в виду, что батч в Pulsar — это не просто группа сообщений, отсылаемых вместе через API. Отправленный батч также записывается в bookie, поэтому такая операция выполняется гораздо быстрее в сравнении с индивидуальными сообщениями. Например, отправка батча в 1000 сообщений занимает примерно 10 мс вместо 7 мс для одного сообщения.

Но важной особенностью для потребителей является то, что Pulsar также батчами отдаёт сообщения консьюмерам. То есть при получении следующего сообщения от Pulsar мы можем получить батч, размер которого будет таким же, каким он был при публикации.

Разумеется, подключенным к Queues клиентам мы отдаём сообщения по отдельности, в соответствии с запрашиваемым с их стороны размерам батчей.

Надо иметь в виду, что ack со стороны консьюмеров в этом случае осуществляется также на уровне всего батча: когда последнее сообщение из батча акается, мы посылаем ack в Pulsar. Это означает, что если на стороне Queues или Pulsar случится сбой до обработки всего батча, сообщения из него могут быть доставлены ещё раз. В Pulsar, начиная с версии 2.6.0, есть поддержка хранения информации об аках внутри батча.

Вторым следствием является то, что поскольку один консьюмер получает сразу целый батч сообщений, то есть шанс, что распределение сообщений по консьюмерам будет не совсем честным. Все сообщения из такого батча достанутся одному консьюмеру. С данной проблемой мы думаем побороться путём шаринга одного консьюмера Pulsar на несколько клиентских консьюмеров.

Выводы:

  1. Если ваш сервис пишет относительно мало сообщений, но обработка каждого из них тяжела — лучше писать маленькими порциями.

  2. Если вам надо очень много писать, то лучше писать батчами.

Балансировка нагрузки

Apache Pulsar умеет балансировать нагрузку внутри себя. Единицей баланса является бандл — совокупность топиков. 

Бандл представляет собой числовой диапазон от N, M, где N и M — целые числа, такие что N < M, и 0 < N, M < 2^32.

Попадание топика в бандл определяется как хэш функция от названия топика, которая также даёт результат от 0 до 2^32.

В Pulsar настраивается число бандлов и каждый топик в итоге попадает в один из них. Далее каждый бандл закрепляется за определённым брокером. Если нагрузка на брокер начинает расти, то он может избавиться от части топиков, разделив бандл на части. Эта логика полностью сокрыта внутри Pulsar.

В момент, когда Pulsar выполняет балансировку нагрузки, он отключает потребителей топиков того бандла, который сейчас балансируется, передает бандл другому брокеру и разрешает подключение снова. Соответственно, для продюсеров и консьюмеров это выглядит как кратковременная недоступность соответствующих топиков.

Что это означает на практике:

  1. Лучше всего резко не менять число продюсеров/консьюмеров.

  2. Когда происходит балансировка, топики могут быть недоступны на короткий период времени. Это не баг.

Заключение

На текущий момент мы переключили всех пользователей в Авито на сервис Queues. Переключение уже дало свои плоды: уменьшилось время, затрачиваемое на публикацию сообщений, многие сервисы получили возможность свободно скейлить консьюмеры и постепенно уйти от использования RabbitMQ.

Сейчас мы используем не все возможности Apache Pulsar. Во многом это связано с необходимостью сохранения обратной совместимости с Data Bus в части API.

Между тем, Pulsar даёт много интересных направлений развития. Например, можно дать возможность выполнять ack произвольных сообщений независимо от остальных. Можно использовать встроенный в Pulsar механизм работы с отложенными сообщениями так, чтобы не было потребности в отдельном специальном объявлении отложенных очередей. Решение по этим доработкам мы будем принимать на основе дальнейшего опыта использования системы.

© Habrahabr.ru