Как мы собираем данные для аналитики с помощью Apache NiFi

Привет, Хабр! Мы команда мониторинга и анализа данных биотехнологической компании BIOCAD. Хотим рассказать вам о том, как мы собираем данные для аналитики из практически всех сервисов компании и при этом вполне успешно справляемся без полноценного дата-инженера. Пост будет интересен как тем, кто только ищет решение для ETL, так и тем, кто уже работает с NiFi или другими аналогичными инструментами и желает познакомиться с наработками, идеями и опытом других команд.

611dabb53439c6b377148423e5a591af.jpg

О команде

Наверное, мы похожи на стандартную команду аналитиков данных (BI + DS) в IT-департаменте производственной компании. Работаем в Jira, делаем прототипы с заказчиком в Miro, ведём базу знаний в Confluence, используем Python и SQL для решения ежедневных задач, а итоговую отчётность предоставляем клиентам в Power BI. Исторически сложилось так, что мы сами общаемся с заказчиком, составляем ТЗ, подготавливаем данные, разрабатываем модели, внедряем ML в производство, ну и, конечно, строим дашборды (и не только их).

Этот аспект (вкупе с большим количеством бизнес-направлений) неизбежно приводит к постоянному нашему развитию в различных областях — со временем учишься работать с промышленными данными SCADA, собирать данные из облачного SalesForce, редактировать объекты через API Jira и ещё массе всего.

В итоге это привело к тому, что мы самостоятельно построили аналитическое хранилище данных (PostgreSQL + MongoDB) для предоставления BI отчётности, быстрой отработки различных ad-hoc запросов и решения Data Science задач. Но выделенного дата-инженера у нас нет, и команда самостоятельно выстраивает пайплайны сбора и обработки необходимых данных из 30+ различных сервисов. При таком подходе есть свои плюсы и минусы, нам нравится, как о них написали в блоге Skyeng

О задачах

Четыре задачи, которые являются самыми распространенными в нашей практике, и самостоятельно решаются любым аналитиком данных с помощью NiFi:

  • Забрать данные из MySQL (или любой другой реляционной БД);

  • Записать данные в PSQL;

  • Масштабировать цепочку для сбора-записи данных;

  • Инкрементально собрать данные с оборудования по REST API.

Мы используем NiFi версии 1.9 (1.9.0.1.0.0.0–90) — обратите на это внимание, если планируете опираться на этот пост при настройке собственной инфраструктуры. Разные версии инструмента имеют отличия, связанные с работой процессоров, Controller Service и их атрибутов.

В рамках этого материала мы не будем говорить о базовой функциональности Apache NiFi, поэтому, если вы не знакомы с инструментом, рекомендуем изучить другие материалы по теме.

Забрать данные из MySQL

Одним из самых распространенных источников данных являются различные реляционные БД. Чтобы извлечь из них информацию с помощью Apache NiFi, понадобится всего два процессора: GenerateFlowFile и ExecuteSQL.

image-loader.svg

Первый процессор — служит для запуска всей цепочки. Он формирует FlowFile, который поступает на вход следующего процессора и выполняет роль «триггера». GenerateFlowFile может запускаться по требованию или по расписанию, задаваемому выражением Сron. У нас это происходит в ночное время, чтобы не мешать работе наших сервисов днем. Например, на скриншоте ниже видно, что запуск задачи назначен на 02:50. Мы учитываем специфику работы БД, из которых будут загружаться данные. Такой подход позволяет нам равномерно распределять нагрузку и поддерживать стабильную работу всех задействованных систем.

image-loader.svg

Каждый FlowFile содержит данные (content) и метаданные (attributes), то есть полезную нагрузку для последующих процессоров. В качестве полезной нагрузки FlowFile«а может выступать необходимая нам информация.

Для нашего случая передадим в содержимом FlowFile SQL-скрипт, с помощью которого следующий процессор запросит в БД нужные данные.

SELECT id, title, code, status, result
 FROM lims_test_active;

Для этого используем поле CustomText во вкладке Properties в окне настроек процессора GenerateFlowFile.

image-loader.svg

Если данных становится слишком много для ежедневной перезаписи, следует организовать инкрементальный сбор (хорошо подходит для различных логов, событий, показаний датчиков и т. д). Тогда не придется каждый раз заново собирать все данные, а можно «дособирать» только свежие — например, появившиеся после определённой даты или отсортированные по последнему ключу.

Второй процессор — ExecuteSQL — исполняет передаваемый в него запрос и возвращает полученные данные в формате Avro (формат хранения файлов, применяемый во многих продуктах Apache). Стоит отметить, что вместо ExecuteSQL можно задействовать процессор ExecuteSQLRecord. В зависимости от выбранного Controller Service в качестве RecordWriter он может отдавать FlowFile с содержимым в форматах Avro, JSON, CSV и XML, однако его настройка будет несколько сложнее.

Чтобы процессор ExecuteSQL понял, куда он должен отправить запрос, нужно настроить специальный Controller Service для подключения к базе данных. В настройках (поле Compatible Controller Services) выберем Database Connection Pooling Service. Он управляет открытыми соединениями к БД, создает новые и закрывает старые.

image-loader.svg

В свойствах Controller Service указываем URL-адрес целевой БД (Connection URL), имя класса драйвера (Database Driver Class Name) и путь до драйвера в файловой системе (Database Driver Location). Понять, что нужно писать в поле Class Name, можно по различным инструкциям в сети. В нашем случае мы указали Class Name для JDBC-драйвера — com.mysql.jdbc.Driver — и прописали его расположение на сервере NiFi. Этот драйвер позволяет выполнять SQL-запросы к базе данных и зависит от её типа, поэтому его нужно предварительно загрузить с официального сайта разработчиков [вот ссылка для MySQL, а вот — для PSQL].

image-loader.svg

Далее, остается выбрать настроенный сервис в свойствах процессора и заполнить остальные обязательные (выделены жирным) атрибуты.

image-loader.svg

Далее, объединим процессоры, указав при этом тип соединения с помощью отношения success или failure. Так, процессор будет понимать, куда направлять FlowFile c результатами работы или ошибкой. Теперь осталось запустить цепочку, и если все сделано правильно, то на схеме красные квадратики сменятся на зеленые треугольники.

image-loader.svg

В итоге мы сформировали цепочку из двух процессоров, которые получают таблицу целиком из внешней базы данных MySQL. Остается записать эти данные в новую базу данных для дальнейшей обработки и использования.

Записать данные в PSQL

Теперь, когда у нас есть собранные данные в формате Avro, запишем их в нашу БД PostgreSQL. Сначала создадим таблицу:

CREATE TABLE lims_test_active (
	id 		  int,
	title 	text, 
	code 	  text, 
	status 	text,
	result 	int
);

Для случаев в будущем (когда наша цепочка уже записала данные в таблицу), нужно предусмотреть удаление старых данных, перед перезаписью используем процессор PutSQL для выполнения TRUNCATE lims_test_active.

image-loader.svg

Далее, обратимся к процессору PutDatabaseRecord для записи новых данных. Требуется создать еще один Database Connection Pooling Service для подключения к БД, в которую мы будем записывать данные. Также пару слов тут нужно сказать про Record Reader. Это Controller Service для получения схемы данных из FlowFile (его контента или атрибутов).

image-loader.svg

Перейдем в настройки Record Reader, где в качестве значения для свойства Schema Access Strategy выберем встроенную в Avro схему данных (Use Embedded Avro Schema) и пропишем строку ${avro.schema} в блок Schema Text. Мы выбираем Use Embedded Avro Schema, так как до этого использовали процессор, получающий данные в формате Avro. На выбор стратегии влияет то, куда эта схема записана в пришедшем FlowFile (почитать про настройку можно в официальной документации, например, в описании AvroReader, или на тематических площадках). В свою очередь, содержание строки Schema Text (или Schema Name) зависит от того, в какой из этих атрибутов ранее была помещена схема.

image-loader.svg

Соединяем процессоры между собой с помощью отношения success, не забывая при этом «выключить» другие отношения (auto terminate). Это нужно сделать для того, чтобы процессор не выдавал ошибку типа: «Realationship X is invalid because Relationship X is not connected to any component and is not auto-terminated» (не все активные соединения знают, что им делать).

Отключить отношения можно вот так:

image-loader.svg

Схема принимает такой вид:

image-loader.svg

Если все прошло успешно, то мы увидим в нашей БД таблицу с данными, готовыми для дальнейшей обработки и анализа.

Конечно, если таблиц и сервисов много, то не стоит делать такую цепочку под каждую таблицу, это займет много времени и будет очень затратно при поддержке. Поэтому далее расскажем, как можно масштабировать этот пайплайн, для работы с десятками и даже сотнями таблиц.

Масштабировать цепочку для сбора-записи данных

В рамках этого поста мы опустим ряд моментов, связанных с логированием, мониторингом, разделением и слиянием больших файлов, а также некоторыми настройками процессоров. Если у вас возникнут вопросы, мы постараемся ответить на них в комментариях.

Основная идея масштабирования состоит в том, чтобы держать список таблиц в одном месте, а при его обработке разделять и распараллеливать сборы. Мы храним список таблиц и запросов в XML, но подойдут и другие структуры данных вроде JSON или простого текста с разделителями. Вот так выглядит наш журнал для сбора таблиц из БД:

image-loader.svg

Структура документа следующая: теги задают начало и конец списка таблиц, а теги  — структуру одной таблицы. Теги определяют SQL скрипт, а теги хранят в себе наименование таблицы для сохранения данных, полученных в результате выполнения SQL запроса.

Двумя важными компонентами масштабируемого пайплайна являются процессоры SplitXMLиEvaluateXPath.

image-loader.svg

Первый нужен, чтобы разделить FlowFile с изначальным XML на несколько файлов — по одному на каждую таблицу (теги ). Второй выполняет XPath-запросы (язык запросов к элементам XML-документа) по отношению к содержимому FlowFile. Их результаты записывает в соответствующие атрибуты FlowFile. Другими словами, процессор нужен для того, чтобы определенную информацию из контента переложить в атрибуты collection и query, которые теперь можно использовать в любом месте.

image-loader.svg

После того как EvaluateXPath выполнил свою задачу, в дело вступает ExecuteSQL, который реализует полученный на предыдущем шаге SQL-запрос (атрибут query) для загрузки нужных данных из БД.

image-loader.svg

На этом шаге можно настроить количество параллельно исполняемых задач процессором. Это указывается во вкладке Scheduling в настройках процессора ExecuteSQL.

image-loader.svg

Далее, используем PutSQL для очистки старых данных в нужной таблице БД, исполняя через него команду TRUNCATE, ее передаем в атрибуте SQL Statement, используя Expression Language для подстановки нужного имени таблицы.

image-loader.svg

Для вставки полученных данных в нужную таблицу используем процессор PutDatabaseRecord с настройками INSERT и указанием целевой таблицы ${collection}.

image-loader.svg

В итоге мы получили масштабируемую EL-цепочку, которую можно использовать для сбора данных из множества таблиц.

image-loader.svg

Инкрементально собрать данные с оборудования по REST API

Основная идея инкрементального сбора заключается в том, чтобы собирать только новые данные и копить их у себя в БД. Например, ночью мы получаем данные за прошлый день и сохраняем их.

Каждый день мы обновляем список серий, ошибок, измерений, калибровок, событий и других параметров с различных сервисов, систем на производстве и в лабораториях. Для получения этих данных можно использовать различные промышленные протоколы например OPC UA, но благодаря усилиям коллег, занимающихся разработкой АСУ ТП, мы можем забирать их через REST API. Если интересно, то можно почитать их пост о системе управления биореакторами на базе openSCADA и отечественных контроллеров.

Далее — общий вид ETL-цепочки для этого кейса:

image-loader.svg

В этом случае GenerateFlowFile задает список параметров в XML — так же, как и в предыдущем кейсе.

image-loader.svg

Далее, в работу вступают уже знакомые процессоры SplitXML и EvaluateXPath. Мы разбиваем XML на разные FlowFile и записываем их содержимое в атрибуты device_name и parameter.

image-loader.svg

Следующий шаг — получить последнюю дату успешного сбора данных по конкретному датчику. Выполним этот запрос с помощью ExecuteSQLRecord и получим последнее значение end_time из нашей таблицы с логами.

image-loader.svg

Далее, передаем эту информацию в EvaluateJsonPath, который запишет её в атрибут start.

image-loader.svg

Значение даты в start пригодится нам для реализации нового запроса к данным. Для формирования его тела используем процессор ReplaceText.

image-loader.svg

Как вы можете видеть, в параметр end_time мы подставляем текущую дату, используя now () из Expression language.

POST-запрос в REST API исполняет процессор InvokeHTTP.

image-loader.svg

Чтобы привести полученные данные в необходимый вид перед записью в БД, используем процессор JoltTransformJSON. Отладить Jolt-трансформации удобно в песочнице на сайте Jolt Transform Demo.

image-loader.svg

Дальнейшие шаги по добавлению новых полей, логированию и записи в БД тут опустим, думаем, что принцип уже понятен. Такая ETL-цепочка позволяет нам ежедневно собирать данные в PSQL через REST API и довольно легко масштабируется на новые сервисы и датчики.

Приходилось ли вам решать аналогичные задачи? Если у вас есть предложения, как можно улучшить наши пайплайны или есть вопросы — напишите, пожалуйста, в комментарии, интересно узнать о вашем опыте работы с дата-инженерами в вашей команде.

Пост для вас готовили Василий Вологдин и Александр Тимаков.

О чем еще мы пишем в блоге BIOCAD на Хабре:

© Habrahabr.ru