Как с нуля построить систему обработки событий

Здравствуйте!

Александр Шувалов и Юлиян Латыпов поделились с вами опытом создания системы обработки событий в потоке данных для обогащения информации и выявления аномалий.

Если вы ранее не были знакомы с приведенными ниже терминами, рекомендую прочесть следующие статьи.

Для начала стоит добавить немного контекста, мы занимаемся продуктом категории  UEBA. Данные это логи с устройств, на которых работают сотрудники компании.

К нам поступила следующая задача, перед записью данных в целевую таблицу в потоке проводить обогащение событий дополнительной информацией и выявлять аномалии в данных.

4b0c77893fba7c80cbbfa9601c62e377.png

Немного о потоке

Нам необходимо обрабатывать 80 тысяч сообщений в секунду, все это логи с разных устройств (Windows, Linux, Mac) и специализированного оборудования (сетевые маршрутизаторы).

Возможно, вы уже сталкивались с различными материалами по потоковой обработке, что не удивительно, так как тема популярна в эпоху Big Data. Однако, почти каждый такой материал, апеллирует своими критериями/требованиями к данным, скорости обработки и т.д. Из-за чего, сложно найти готовое описание решения под конкретную задачу, которую перед вами поставили. Именно с этим мы и столкнулись.

В нашем случае, ключевыми факторами потока было:

1) Обеспечить корректный таймлайн событий. Иначе часть сервисов обогащения будет давать неверный результат;

2) Обработка событий большого объема, что также уменьшает скорость consume/produce и нагружает канал передачи данных.

Перед нами стояли следующие задачи:

  1. Определение типа события и фильтрация:

  • 45 типов события, определяются по содержанию сообщения

  • отфильтрованные события попадают в обработку

  1. Обогащение данных:

  • определение зрелости по пользователю и хосту. В нашем случае, зрелость — это состояние пользователя/хоста, когда для него удалось набрать определенное число событий для каждого типа события

  • обогащение историческими данными

  • определение геолокации по IP-адресу

  • определение сетевой зоны по IP-адресу

  • определение группы устройств

  1. Определение аномалий:

  • первая активность пользователя

  • бездействующий пользователь. Для нас пользователь считается бездействующим, если от него не поступало событий достаточно длительный промежуток времени

Архитектура

Прежде чем перейти к рассказу об архитектуре, стоит объяснить, почему мы не использовали готовые решения, такие как Kafka Streams, Apache Flink или Spark Streaming:

  • Первое, новый компонент в системе потребовал бы дополнительных усилий на его освоение, настройку и поддержку. Это дополнительные ресурсы и время, что для нас не всегда приемлемо.

  • Гибкость возможностей реализации из-за активной стадии разработки продукта

Поэтому мы решили разрабатывать собственные решения, которые обеспечат нам гибкость в построении архитектуры и позволят контролировать всё, что происходит внутри сервисов.

На первой итерации мы разработали решения на монолитной архитектуре, так как такой подход обеспечил высокую скорость разработки и выдачи MVP версии, что нам и было нужно.

Процесс выглядел следующим образом:

4ea2db9719e4a2ef58fb095a57b949fd.png

Мы предполагали, что сможем легко масштабировать сервисы, добавив дополнительные инстансы, чтобы увеличить скорость обработки и справиться с любым потоком данных.

Однако возникли следующие сложности:

  • Нагрузка на различные виды задач была неравномерной, и, добавляя отдельный инстанс, мы могли задействовать больше ресурсов, чем требовалось.

  • Отказоустойчивость была на низком уровне: отслеживать работу всех процессов и перезапускать отдельные компоненты было сложно.

  • Можно было легко построить границы необходимых для работы модулей, причем было известно, что они не будут меняться со временем.

Поэтому мы решили перестроить архитектуру обогащатора на микросервисы, пока не стало слишком поздно.

025171ec7627a06cc35ee7e46490a997.png

Проблемы и их решения

Определение типа события и Фильтрация
Первоначальное решение было реализовано на Python, это работало довольно медленно из-за большого объема сообщений. Нам бы пришлось поднимать большое количество инстансов, что увеличивало нагрузку на Kafka и потребовало больше ресурсов.

e903ad6a6bbbd6a66f3e80ba3d93f5e1.png

Перенесли решение на Clickhouse вместо Kafka и выполняли задачу с помощью материализованных представлений, через простые правила. В результате мы смогли улучшить производительность системы без увеличения количества инстансов и избыточной нагрузки на инфраструктуру.

fd2e2f461325773483e9e01334cbf47d.png

Если есть способ производить вычисления в базе данных, то нужно это использовать.

Timeline
Как отмечалось ранее, в обработку должны поступать события формирующие корректный временной ряд. Одним из ключевых аспектов этого процесса является сортировка входящих данных.

Изначально для сортировки данных, поступающих к нам за различный период времени, мы решили использовать очередь с приоритетом. Мы разработали прототип и протестировали его на наших данных. Хотя решение работало, оно имело свои недостатки. Главным недостатком было большое использование оперативной памяти, которое росло с увеличением времени ожидания сообщений. Соответственно, чем дольше ожидание, тем больше требуется памяти и медленнее работает сервис. 

4c80a967faa56ad10583ea2f46688ec2.png

В итоге, мы остановились на достаточно простой в реализации идее. Поскольку данные сразу поступают в Clickhouse, мы можем запрашивать их в уже отсортированном виде «из прошлого», то есть те данные, произошедшие 2 минуты назад, и отправлять их в Kafka.

ae572759e23c89fd001f491cd060b4d9.png

Вывод такой же

1103cd81f76c1da61ba346f344c93d36.png

Размер сообщений
Большие размеры сообщений стали для нас серьезной проблемой, поскольку они препятствовали достижению необходимой скорости обработки потоков данных. Высокая нагрузка на сеть приводила к значительным задержкам.

Первоначально мы попытались решить проблему с помощью стандартных средств сжатия данных в Kafka, но это не дало ожидаемых результатов. 

После обратились к подходу Event Driven Architecture (EDA) для взаимодействия между сервисами, чисто event-ориентированный подход нам не подходил из-за высокой нагрузки на IO-операции с базой данных. Мы пришли к использованию дифференциальной передачи информации (diff). Этот метод позволяет передавать только полезную нагрузку события, нужную другим сервисам. 

Мы стали обрабатывать только ту часть данных события, которая необходима для наших аналитических задач. Это позволило сократить объём передаваемого сообщения до 200–300 байт. Хотя данное решение значительно улучшило ситуацию с трафиком, оно также породило новую проблему объединения данных, о которой мы расскажем далее.

30d65f35e3ed45b2f2aa096b1239f31a.png

Нужно найти баланс между производительностью сервиса и задержками на обработку сообщений, оценив максимальный полезный объем передаваемых данных и то, что можно оставить для получения сервисом из других источников, таких как БД.

Масштабирование и отказоустойчивость
Наши системы делятся на stateless и stateful  сервисы. Stateless сервисы работают легко и их также легко масштабировать, поскольку они не хранят состояние между запросами. Это делает их идеальными для распределенной обработки данных. Stateful сервисы выполняют куда более сложную задачу. Сервисы этого типа хранят состояние между запросами, что требует сохранения дампа состояний в долговременную память и эффективную синхронизацию между этими инстансами.

Проблема с синхронизацией состояния решается направлением потока в определенные инстансы по ключевым полям события — роутер. Этот компонент может разграничить потоки обработки данных на изолированные группы. Такой подход обеспечит более эффективное управление ресурсами и снизит риски возникновения проблем с синхронизацией состояния.

d54cf9802ef2a4e43af6fd67aa45fc1d.png

Для обеспечения отказоустойчивости можно поднимать два инстанса, первый из которых выполняет полезную задачу, а второй опрашивает первый о его состоянии, в случае сбоя первого, второй сервис создает процесс и занимает место закончившего работу сервиса.

70194217c874cfb3333115ec4b37918b.png

Проектируйте архитектуру сервиса с учетом возможности масштабирования и обеспечения отказоустойчивости.

Объединение diff

Итоговая вставка, у нас есть большое событие и diff — наши обогащенные данные в процессе анализа. Мы знали, что ClickHouse хорошо работает с MatView и плохо с JOIN. Многие решения работали медленно и забивали оперативную память. 

6d880d37791ebd5db5353f237d7b92c4.png

Слева обычный JOIN, при больших объемах таблицы B забьет оперативную память. Эту проблему удалось решить подзапросом, который уменьшает выборку данных из таблицы B для JOIN.

У нас это работает так. Есть Kafka таблица в ClickHouse. Теперь мы отправляем весь diff в очередь Kafka и настраиваем потребителя в ClickHouse, чтобы обрабатывать эти данные. Потребитель получает данные из очереди, определяет объемы для обработки и добавляет их в materialized view, которая выступает в роли триггера. В этой view происходит JOIN с использованием подзапросов по ID события, что уменьшает выборку большой таблицы, такие JOIN`ы работают быстро. Этот подход позволяет избежать прежних проблем с производительностью и перегрузкой памяти, предоставляя ClickHouse возможность оптимально распределять нагрузку и эффективнее использовать ресурсы.

9d9ce7bf13cd62e28d2246c863ec3561.png

«Также информация для справки: обнаружили, что Clickhouse при запросе резервирует на поле по 5 мб памяти, что при частых запросах убивает ОЗУ.»

Изучайте документацию, порой там скрыты решения ваших проблем, что значительно ускорит разработку

Архитектура

Архитектура

Видно, что все сервисы выстроены последовательно: каждый последующий должен отработать после предыдущего. Возможно, это не самый оптимальный подход, ведь некоторые сервисы можно запускать параллельно. Однако такой вариант позволил нам избежать необходимости в конечном сервисе синхронизации, который сопоставлял бы данные всех сервисов. Это создало бы дополнительную нагрузку и увеличило вероятность ошибок.

ca561e6a2ce5a3e0aafd489f3f923c82.png

Преимущества текущего подхода

Поскольку предложенный вариант позволяет обеспечить и поддерживать необходимый нам поток данных, мы остановились на нём. При этом мы предусмотрели возможность масштабирования каждого участка схемы. Если поток данных значительно возрастет, мы сможем перевести процесс в параллельную обработку, добавив сервис синхронизации.

Таким образом, последовательное построение сервисов позволяет нам избежать лишней сложности и ошибок, обеспечивая стабильную и эффективную обработку данных. Это решение гибко и может легко адаптироваться к изменениям в объеме и скорости поступающих данных.

18595ad5dc11158ece18be87588f440f.png

Подписывайтесь на телеграмм канал Crosstech Solutions Group, чтобы всегда быть первым в курсе событий!

© Habrahabr.ru