Распределенная трассировка с Jaeger и Clickhouse

Привет! Меня зовут Филипп Бочаров, я руководитель центра мониторинга и наблюдаемости в МТС Digital. Мы делаем распределённую трассировку, чтобы контролировать качество наших сервисов и предотвращать аварии. В этой статье разберём, как добиться понятной и прозрачной работы от сложных распределённых систем.

За время, прошедшее с прошлого доклада, количество обрабатываемых в единицу времени спанов выросло в несколько раз. Рассмотрим, какие архитектурные решения начали «поджимать» и как команда МТС их исправляла.

Эта статья написана на основе доклада для Highload, который  стал продолжением доклада для Highload 2022. Там мы строили сервис на кластере Elasticsearch и разгоняли его до нагрузки 50k спанов/с. За год у нас многое изменилось как в архитектуре, так и по нагрузке. 

Трассировка экосистемы

МТС — довольно большая экосистема. Когда пользователь выполняет какую-то важную бизнес-операцию, в этом часто участвует не один продукт. Например, если пользователь оформляет подписку на услугу, в процессе будут участвовать целых четыре продукта:  

1830029066b7e9c47c5dbe94fcbe89cf.jpg

Один продукт занимается коммуникацией с пользователем, отправляет ему SMS. Второй отвечает за биллинг: считает стоимость услуги. Третий сохраняет профиль в БД. 

Представим, что между продуктами A и B возникла какая-то проблема, процесс затягивается. Но ни A, ни B в этом не виноваты, а виноват следующий в цепочке продукт — С. Продукт B обращается к продукту C, но постоянно получает ошибки, происходит переповтор, в результате пользователь не получает услугу, которую так хотел.

Чтобы автоматически получать картинку бизнес-операций, и существует сервис распределённой трассировки. Это древовидная структура данных. Она показывает, как во времени общаются распределённые сервисы между собой: кто кого вызывает, сколько времени это занимает и есть ли там ошибки. Такую картину мы хотим видеть по всем бизнес-функциям во всей экосистеме.

Старая архитектура

В 2022 году у нас была такая архитектура:

969add6de177f8ea7440153775b11e3a.jpg

У нас было множество инструментированных продуктов и сервисов. Это значит, что в каждый сервис встроен SDK, который собирает распределённую трассировку с продукта. Например, регистрирует все входящие и исходящие запросы к сервису. Вся эта трассировка собирается и отправляется на бэкенд платформы, где её подхватывают множество экземпляров Jaeger-коллектора. Коллектор кладёт весь этот поток на промежуточное хранение в очередь Apache Kafka. Это нужно, чтобы сглаживать различные пики нагрузки. Jaeger Ingester достаёт всё это из Apache Kafka и кладёт в кластер Elasticsearch уже на постоянное хранение.

Чтобы работать с распределённой трассировкой, у нас было два интерфейса:

  1. Jaeger UI — родной интерфейс Jaeger, который позволял делать различную операционную работу, то есть искать конкретные трейсы по параметрам по определённой проблеме.

  2. Kibana, в которой мы делали различные аналитические дашборды. Так мы могли проанализировать не один трейс, а их совокупность.

Также мы научились вычислять долгосрочные тренды по трассировке и так называемые APM-метрики (метрики производительности приложений). Мы написали для этого отдельный сервис. Он обрабатывал тот же самый поток трассировки, высчитывал по каждому спану ту или иную метрику. Сервис понимал, что спан — это вызов БД или входящий запрос к нашему сервису, вычислял соответствующие метрики и клал это всё в Time Series БД. У нас также была Grafana, в которой мы могли наслаждаться дашбордами. Например, смотреть как менялась производительность разных методов за 6 месяцев.

Вся эта схема нас устраивала. На 2022 год она обрабатывала 50 000 спанов/с. Но время не стоит на месте, появились проблемы.

Проблема №1: x4 рост нагрузки

На сегодняшний день нагрузка уже выросла в четыре раза. В дневные часы она достигает 200 тысяч спанов/с, также есть выбросы до 300 тысяч.

3ae907e08d12ffa7f78c16211f52112b.jpg

Наши пользователи начали жаловаться на медленный и нестабильный поиск трейсов в Jaeger UI. Но самое плохое, что подход горизонтального масштабирования кластера Elasticsearch перестал нас радовать своей производительностью. Мы закидывали проблемы железом, увеличивали  количество нод. Но производительность не росла линейно, а лишь увеличивалась на небольшой процент. Мы начали понимать, что если нагрузка продолжит расти такими же темпами, то никакого железа нам не хватит.

Проблема №2: аналитика

В Kibana мы могли строить аналитические дашборды. Но они нас не до конца устраивали по своей функциональности. Например, вот сложный SQL-запрос, который бы мы хотели выполнять по нашему хранилищу:

58fafb4d737b64738cfc73d90768d48e.jpg

Если читать его из наиболее вложенного запроса к верхнему, то там происходит следующее:

  • Находим все трейсы, которые содержат определённую операцию (в данном случае OrderFulfillment, операция заказов с ордером).

  • Дальше для каждого трейса вычисляем его полную длительность, то есть время от первого спана до последнего, полную длительность нашей бизнес-операции.

  • Наконец, в самом верхнем запросе мы высчитываем 95 перцентиль. Трактовать этот запрос можно как время, в которое укладывается 95% выполнения той или иной бизнес-операции например, подписки.

Этот запрос выполнить по Elasticsearch в принципе невозможно, так как в нём  нет подзапросов, джойнов, синтаксис ограничен и работает это достаточно медленно.

Кроме того, тяжёлые аналитические запросы с этих дашбордов могли дестабилизировать наш кластер Elasticsearch. В Elasticsearch есть механизм защиты, который называется circuit breaker. Когда вы выполняете сложный запрос, кластер Elasticsearch перегружается и может обрубить его, а заодно и запросы ваших коллег. И это приведёт к тому, что кластер начнёт работать медленнее.

RND хранилища на Clickhouse

Обе проблемы привели нас к поиску альтернативного хранилища для наших трейсов, которое и проблему с нагрузкой решит, и аналитику позволит делать.

Jaeger из коробки поддерживает 3 типа хранилищ:

  1. In-memory хранилище — чисто тестовая вещь;

  2. Elasticsearch;

  3. Cassandra.

Любые другие хранилища тоже можно поддерживать с помощью механизма плагинов. Вы можете расширить Jaeger, написать свой плагин и поддержать там любой storage, который вам нравится. Наш выбор пал на Clickhouse. Для него уже был плагин. Поэтому мы начинали не с нуля, а брали готовую разработку.

5b15fcfc568c6c492b57733c782f741e.jpg

Первые же результаты RND были вдохновляющими:

  • Jaeger UI стал отвечать на 20% быстрее при той же нагрузке, быстрее искать данные;

  • Хранение стало в 2–4 раза компактнее;

  • Аналитические запросы также стали выполняться на порядок быстрее, к чему мы и стремились.

Поэтому мы решили завести это в прод.

gRPC для Clickhouse

Поддержка Clickhouse в Jaeger реализована через плагин, отдельный бинарный файл, который, как и Jaeger, написан на Go. Его можно передать в качестве параметра Jaeger, тот запустит его как отдельный дочерний процесс и будет с ним общаться по gRPC.

80fa908cfceae4ec1b2b4491a113e445.jpg

Если вы хотите реализовать свой storage, просто напишите свой gRPC-сервер, реализующий определённое API. Jaeger с ним общается и абстрагируется от конкретного storage.

Single node для stage-контура

Это контур, на который шла трассировка от непродуктивных стендов наших продуктов. Например, от тестовых стендов. Мы взяли минимально возможную конфигурацию — никаких кластеров, просто обычная single нода Clickhouse. Наши сервисы Jaeger мы запустили с плагином, нацелили их на Clickhouse, а для того, чтобы заменить Kibana с аналитикой, взяли Grafana. В ней есть замечательный Clickhouse-плагин, который позволяет обращаться к Clickhouse, визуализировать результаты запросов и строить аналитику.

71ac545131b4c887af8de888c4e10e6f.jpg

На тот момент на stage-контуре нагрузка была 25k спанов/с, Clickhouse её прекрасно выдержал. Мы решили использовать это значение как референсное, как некую схему масштабирования — на каждые 25–30k спанов/с стали делать отдельную ноду Clickhouse. 

Кластер для prod-контура

Нагрузка была 75k спанов/с, мы поделили её на 25, и поняли, что нам нужны 3 ноды Clickhouse. Развернули кластер из 3 шардов. Наученные опытом работы с Elasticsearch мы также добавили балансировку между всеми компонентами и нашим кластером, чтобы запросы на чтение и запись равномерно распределялись по нодам.

83398500ff3c027329f531c95b9bee25.jpg

Проблема

3 ноды у нас были, а трёхкратной скорости не было. Она выросла только в 1,5 раза. Кроме того резко выросла метрика Delayed Inserts (отложенная вставка):

a970cc362dd99427b5d1ee306f6178e8.jpg

Мы выяснили, что это связано с механизмом distributed-таблиц в Clickhouse. 

Дело в том, что Jaeger создаёт таблицы двух типов:

409ffb8242df718db6ff7f823f647232.jpg

  1. Локальные таблицы, которые физически хранят данные (показаны зелёным).

  2. Distributed-таблицы — некие прокси, которые позволяют вычитывать данные со связанных с ней таблиц со всех нод. То есть результаты запроса со всех нод объединяются.

Оказалось, Jaeger использует distributed-таблицы не только для того, чтобы читать, но и чтобы записывать данные. Когда мы записываем данные через distributed-таблицы, начинает работать механизм шардирования. Clickhouse старается разложить наши данные по ключу шардирования по нодам. Мы чётко знаем, на какую ноду какой трейс должен лечь. В качестве ключа шардирования у нас используется traceID, то есть трейс целиком попадает на ту ноду, которой предназначен.

Если мы попытаемся записать данные не на ту ноду, которая должна эти данные содержать по-внутреннему маппингу, то ничего страшного не произойдет. Clickhouse их запишет и потом фоном будет пытаться передать эти данные на ту ноду, которой эти трейсы предназначены. То есть у нас появляются фоновые процессы передачи данных между нодами. Метрика Delayed Inserts как раз говорит о том, что этот механизм запаздывает, что эти данные не успевают передаваться в реальном времени и лаг репликации растёт.

Решение

Мы решили всё поломать: отказаться от механизма distributed-таблиц и писать сразу в локальные таблицы. Сделать это очень легко: нужно подменять конфиг Jaeger Ingester, чтобы он забыл про кластеры, как будто работает с обычной single нодой.

6867c336fee8e69103644eee78b37e06.jpg

В этом случае все запросы распределяет наш балансировщик, в нашем случае nginx.  Он равномерно round robin«ом раскидывает данные, записывает их сразу в локальную таблицу без использования ключей шардирования. Данные остаются там, куда они записались.

При этом чтение мы не трогаем. Jaeger UI, Grafana, как читали из distributed-таблиц, так и продолжают.

Что изменилось?

Естественно, были положительные эффекты:

→ Линейное масштабирование

Мы вышли на расчётную производительность 75k спанов/c, к которой так стремились.

→ Решили проблему с Delayed Insert.

Мы перестали использовать distributed-таблицы. Но, как говорится, у медали две стороны.

Если раньше мы чётко знали, что трейс лежит целиком на конкретной ноде, то теперь наши данные равномерно размазываются по всем нодам. В результате не знаем, где какие части трейса лежат. Чтобы его целиком собрать, нам нужно запрашивать все ноды. Это здорово для записи, но не очень удобно для аналитики.

Для нас это хороший trade-off. Ведь мы пишем много и каждую секунду, а читаем относительно редко, только когда у нас случается какая-то проблема и инженерам нужно провести расследование.

Примерно так у нас стала выглядеть картина с балансировкой нагрузки на запись:

2bf54ffffcd453d73190021f92048447.jpg

Nginx раскидывал запросы по нодам Clickhouse более-менее равномерно. Каждой ноде доставалось примерно по 2 запроса в секунду, записывающих по 20–30k строк/с в Clickhouse.

Ускоряем Jaeger Collector

Естественно, Clickhouse был не единственным узким местом. Jaeger тоже нуждался в тюнинге. Мы сделали следующее:

→ KAFKA_PRODUCER_COMPRESSION = gzip

Очень большой прирост производительности нам дало включение сжатия на Jaeger. Это до сих пор экспериментальный параметр, который задаёт сжатие потока спанов, которые записываются и читаются из Kafka.

→ KAFKA_PRODUCER_BATCH_SIZE = 10485760

Есть рекомендация: писать в Clickhouse лучше раз в секунду, но большими батчами. Исходя из вашего трафика, вы можете подобрать размер батча, который вам нужен, чтобы раз в секунду обращаться  к Clickhouse.

→ COLLECTOR_QUEUE_SIZE = 10000

Мы установили размер очереди коллектора, при переполнении будут дропы.

→ COLLECTOR_NUM_WORKER = 50

Также мы ограничили количество потоков на отправку данных.

Оптимизация удаления данных

Как по расписанию с 6:00 до 10:00 утра мы видели такую замечательную картину со свечками:

c0e8084e93419379c6c40994443046bf.jpg

У нас росло потребление CPU на нодах. Это была постоянная воспроизводимая в одно и то же время проблема. Поэтому мы решили, что дело в какой-то фоновой операции в Clickhouse. И оказались правы.

Причиной стало фоновое удаление старых данных в таблицах по TTL построчно. Естественно, данные трассировки мы не храним бесконечно. У нас есть ограничение в 14 дней хранения. Оно реализовано с помощью механизма Time To Live в Clickhouse. На таблицу вешается это ограничение, и Clickhouse сам эти данные удаляет в фоне, причём по дефолту построчно. Это приводило к очень высокой нагрузке на сам Clickhouse.

К счастью, есть замечательный параметр ttl_only_drop_parts, который инструктирует Clickhouse удалять не построчно, а целыми частями.

30fe2f0d3de7365fced7c2537b7fddb9.jpg

Когда мы записываем какие-то данные в Clickhouse, они складываются в части и объединяются между собой в большие части. Естественно, удаление данных частями (parts) более эффективно, чем удаление построчно. Когда мы включили параметр, утренние пики нагрузки ушли, а мы продолжили по утрам спокойно пить кофе.

Тяжёлые аналитические запросы

Пример дашборда в Grafana по данным Clickhouse:

a02879ff18deb870616f4aa96e2a43d8.jpg

Здесь мы считаем длительность наших бизнес-операций. Но мы столкнулись с той же самой проблемой, что и с Elasticsearch, а именно — тяжелые аналитические запросы могли приводить к дестабилизации кластера Clickhouse и выпадению нод из него.

Чтобы справиться с этой проблемой, мы решили разделить нагрузку на чтение и запись. У нас будут выделенные сервера Clickhouse, которые нагружены записью и реплики, выделенные под аналитику, с которых будет идти чтение. К тому моменту нагрузка на наш стенд уже достигла 200k спанов/с. Поэтому мы масштабировали наш кластер, сделали 10 шардов и 10 реплик, каждому шарду соответствовала своя реплика, и всё чтение шло с реплик.

54b4d80f46680bee958ac40320b94700.jpg

Но, как вы знаете, пользователи — люди талантливые, могут написать такой запрос, который выведет из строя даже железо, выделенное конкретно под аналитику. Поэтому в любом случае мы нуждались в механизме ограничений. Мы должны были наложить какие-то квоты и ограничения на аналитические запросы, чтобы наши ноды не страдали. Именно так в нашей схеме появилось нечто под названием chproxy между репликами и Grafana.

Chproxy

Chproxy — это HTTP-балансировщик. Он умеет настраивать ограничения на аналитические запросы, в том числе:

  • ограничение количества запросов;

  • ограничение конкурирующих запросов;

  • ограничение времени запроса (таймауты);

  • кэширование ответов для любителей постоянно нажимать на рефреш на наших дашбордах.

Теперь дашборды Grafana не могут перегрузить реплики Clickhouse. Закручивая эти гайки, мы добились, что тяжёлые аналитические запросы с дашбордов не приводили к проблемам на самих нодах Clickhouse. Максимум у пользователя не загружался дашборд и он понимал, что ему надо оптимизировать свой запрос или уменьшать диапазон дат на дашборде для снижения нагрузки.

Запросы

Простенький запрос — поиск спанов по тегу db.statement. Мы здесь ищем все спаны, которые содержат значение GetAsync:

f897443cc68698fffd62aeff27882ce2.jpg

Тут какая-то сложная комбинация. Сначала мы проверяем, есть ли такой тег, потом громоздкой конструкцией пытаемся приравнять его с GetAsync. Получилось сложно. А всё из-за того, что Jaeger хранит свои теги как два массива — отдельно ключи, отдельно значения. Очень удобно для записи, но отвратительно для аналитики и написания запросов.

К счастью, поправить это очень легко: мы просто сделали отдельную вычисляемую колонку tags с типом map, то есть словарь «ключ-значение»:

22d7b3426100b8cd9f33f8eab4444c51.jpg

Колонка вычисляется при вставке в неё данных, данные хранятся сразу на диске, что облегчает к ним доступ. Наш запрос сразу становится гораздо чище, интуитивно понятней, кроме того, не нужно проверять наличие ключа:

d923e3b2b9a53466a1e41fce018a25da.jpg

Согласитесь, так писать запросы гораздо удобней.

Баг в плагине 

Когда Jaeger загружается, то с бэкенда тащит уникальные значения сервисов и операций, чтобы сделать выпадающие фильтры. По сути, делает select distinct. Чтобы  не делать select distinct налету, у Jaeger есть специальное материализованное представление operations, куда эти данные складываются прямо во время вставки. Мы залезли в эту таблицу и увидели там десятки миллионов строк, что было очень странно, потому что у нас полторы тысячи сервисов, там десятки тысяч операций, но никак не миллионы. Естественно, всё это были дубли, которые приводили к медленной загрузке Jaeger UI и ошибкам таймаута загрузки.

Как оказалось, был небольшой баг в самом плагине. Когда вы создаёте таблицу в Clickhouse, вы можете выбирать из целого семейства доступных вам движков. Там есть ReplicatedMergeTree, который удаляет дубликаты, и SummingMergeTree, который может эти дубликаты агрегировать, посчитать количество, положить в отдельную колонку.

У нас по-умолчанию использовался обычный MergeTree, который с дубликатами ничего не делал, поэтому дубли стали постоянно набираться в таблице и в конце концов всё стало тормозить и не открываться.

Понятно, что мы заменили движок в таблице, дополнительно кэшировали запросы на список сервисов и операций, чтобы всё это ускорить и наш Jaeger снова взлетел. Это баг уже поправлен на GitHub, поэтому вы с ним не столкнётесь.

Финальная архитектура 

У нас по-прежнему множество продуктов, инструментированных либо OpenTracing«ом, либо OpenTelemetry, которые отправляют трассировку на бэкенд. Там её подхватывают множество экземпляров Jaeger-коллектора или OpenTelemetry-коллектора, который мы сейчас используем. Также всё проходит через Kafka.

Главное изменение произошло в области хранилища. Теперь Jaeger Ingester уже с плагином работает с Clickhouse, записывает данные на шарды. Чтение идёт с реплик. Jaeger UI читает через балансировщик nginx и обеспечивает операционную деятельность. Grafana обращается через chproxy, чтобы сделать дополнительные умные ограничения и позволяет выполнять аналитические запросы.

6103578eafe3c06326bf811efcc73f34.jpg

Как мониторить?

Понятно, что нам нужно мониторить много параметров.

В Clickhouse:

  • место на дисках нод, которое имеет свойство кончаться неожиданно;

  • количество соединений Clickhouse;

  • отставание репликации;

  • количество запросов на запись. В идеале нужно стремиться свести их к одному запросу в секунду. У нас получилось чуть больше.

В Kafka:

Один из самых важных показателей в Kafka — это лаг (отставание) обработки спанов от реалтайма. Если лаг копится, это значит, что пропускная способность нашей системы ниже, чем нагрузка, которую она воспринимает. Если это кратковременно, то будет  просто всплеск, но если это происходит долго, значит, система не справляется.

В Chproxy:

  • распределение запросов по нодам;

  • количество конкурирующих запросов;

  • количество убитых запросов;

  • время выполнения запросов.

Chproxyдаётмножество интересных метрик, связанных с тем, как аналитические запросы выполняются через Grafana — сколько там пользователей, насколько долго эти запросы выполняются, насколько успешно.

В Jaeger Collector:

По Jaeger мы хотим мониторить различные дропы. Если Jaeger что-то куда-то не может записать, то он не копит эти данные во внутреннем буфере, а данные сбрасывает, то есть они могут пропасть. Поэтому если у вас возникают дропы — это повод искать узкое место.

В Jaeger UI по классике мы должны контролировать 4 золотых сигнала:

  • время ответа (latency);

  • коды ответа;

  • Black box: возможность получения списка сервисов;

  • Black box: поиск трейсов.

Причём как с серверной стороны, так и глазами пользователя. То есть нужно настроить black box-мониторинг Jaeger UI.

К счастью, большинство компонентов нашей схемы (Jaeger, chproxy, Clickhouse) умеют выставлять метрики в формате Prometheus прямо из коробки. Чтобы реализовать black box-мониторинг, можно взять Blackbox Exporter Prometheus и заставить его периодически запрашивать список сервисов и последние трейсы, чтобы убедиться, что и пользователь может это сделать. Так мы смотрим на сервис глазами пользователя.

425759c42ca3621d1e5c7f6569d7f58e.jpg

Все эти метрики собираются в кластер Victoria Metrics. В Grafana у нас построено множество дашбордов по каждому компоненту. Они тоже есть уже готовые в каталоге Grafana, ничего изобретать не нужно. Естественно, у нас есть алертинг. Когда какая-то ключевая метрика выходит за пороговое значение, мы об этом узнаем по тревожному звону телефона.

Выводы

Сначала похвастаюсь метриками:

  • Наш сервис сейчас обрабатывает в среднем 150k спанов/с с пиками до 300 k спанов/с. Эти данные приходят от более чем 1500 подключенных к нам сервисов продуктов.

  • Сейчас в хранилище (на 60 TB с учётом реплик) 180 млрд спанов, данные хранятся 14 дней.

  • Поиск трейсов по тегам занимает ~6 секунд — это не мгновенно, но в разы лучше, чем было с Elasticsearch.

  • Поиск конкретного трейса, если вы знаете его ID, занимает ~200 мс.

Если просуммировать наш опыт, можно дать следующие рекомендации:

→ Используйте Clickhouse для нагрузки больше 50k спанов/с

Если вы строите сервис распредёленной трассировки и рассчитываете на поток 50k спанов/с и больше, логично сразу выбирать Clickhouse, чтобы потом не переходить с одной технологии на другую.

→ Запись в distributed-таблицы может быть менее эффективной, чем запись в локальную таблицу

Если вы пишете через distributed-таблицы, попробуйте без них, возможно, будет лучше.

→ Используйте chproxy для ограничения нагрузки на чтение

Если вам нужно ограничить запросы к Clickhouse на чтение, ввести какие-то квоты и ограничения, chproxy — ваш лучший друг.

→ Удаляйте старые данные частями, а не построчно

→ Разделяйте нагрузку на запись и чтение

Если ваш профиль нагрузки похож на тот, что увидели у нас, возможно, стоит подумать о разделении нагрузки на чтение и запись.

© Habrahabr.ru