Интеграция Apache NiFi и Atlas: Настройка в Docker и Создание Пользовательского Репортера28.08.2024 14:45
Ремарка
Обратите внимание, что методы и подходы, описанные в этой статье, являются одним из способов решения возникших проблем и могут не являться наилучшим или рекомендуемым вариантом для всех ситуаций. Автор не призывает следовать описанным шагам без учета возможных альтернатив и не гарантирует, что данный метод подойдет для всех пользователей или случаев. Учитывайте, что использование нестандартных решений может повлечь за собой риски или потребовать дополнительных усилий для поддержки и обновлений. Рекомендуется всегда исследовать и оценивать возможные варианты решений в контексте вашего проекта и требований.
Как запустить Apache Atlas в Docker
Первое, с чем я столкнулась, — это то, что большинство (возможно, не все, я, конечно, не проверяла) образов Apache Atlas не работают из коробки.
Каждый из них падал с следующей ошибкой:
14: curl#6 - "Could not resolve host: mirrorlist.centos.org; Unknown error"
One of the configured repositories failed (Unknown),
and yum doesn't have enough cached data to continue. At this point the only
safe thing yum can do is fail. There are a few ways to work "fix" this:
1. Contact the upstream for the repository and get them to fix the problem.
2. Reconfigure the baseurl/etc. for the repository, to point to a working
upstream. This is most often useful if you are using a newer
distribution release than is supported by the repository (and the
packages for the previous distribution release still work).
3. Run the command with the repository temporarily disabled
yum --disablerepo= ...
4. Disable the repository permanently, so yum won't use it by default. Yum
will then just ignore the repository until you permanently enable it
again or use --enablerepo for temporary usage:
yum-config-manager --disable
or
subscription-manager repos --disable=
5. Configure the failing repository to be skipped, if it is unavailable.
Note that yum will try to contact the repo. when it runs most commands,
so will have to try and fail each time (and thus. yum will be be much
slower). If it is a very temporary problem though, this is often a nice
compromise:
yum-config-manager --save --setopt=.skip_if_unavailable=true
Об этой ошибке чуть позже.
Я начала искать готовые Docker-файлы на GitHub.
Однако они также сталкивались с той же ошибкой.
Ошибка связана с тем, что Docker не смог найти или разрешить хост для репозитория mirrorlist.centos.org. Это привело к сбою команды yum, используемой для установки пакетов внутри контейнера Docker. Подобные ошибки могут возникать, если репозиторий больше не поддерживается или временно недоступен.
Остается только написать свои Docker-файлы. Я взяла за основу файлы из данного репозитория (уточню, что я использовала не последнюю версию, а на один коммит назад от версии на 27.08.2024, так как последнюю версию не удалось запустить в принципе), поскольку на него ссылались в данной статье.
Первое, что мы делаем — убираем все лишнее из docker-compose.yml.
В оригинале, помимо Atlas и сервисов, необходимых для его работы, также поднимались Spark, Hive, Hadoop (DataNode, NameNode). Все это оказалось ненужным, поэтому мы просто вычеркиваем это из файла, а также удаляем папки Spark и Hive. Оставляем только необходимое: сам Apache Atlas, Apache Kafka и Apache Zookeeper.
Далее мы переходим к Dockerfile, который собирает Apache Atlas в контейнере. Для устранения ошибки достаточно прописать архивное зеркало для CentOS сразу после строки FROM centos:7.
FROM centos:7
COPY --from=stage-atlas /apache-atlas.tar.gz /apache-atlas.tar.gz
#новые строчки с архивным зеркалом centos
RUN sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.repo \
&& sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo \
&& sed -i s/^mirrorlist=http/#mirrorlist=http/g /etc/yum.repos.d/*.repo
И можно запускать docker-compose up -d. Однако возникает следующая ошибка:
Command »/usr/bin/python3 -u -c «import setuptools, tokenize;__file__='/tmp/pip-build-f8uu6b5l/typed-ast/setup.py'; f=getattr (tokenize, 'open', open)(__file__); code=f.read ().replace ('\r\n', '\n'); f.close (); exec (compile (code, __file__, 'exec'))» install --record /tmp/pip-artr5er7-record/install-record.txt --single-version-externally-managed --compile» failed with error code 1 in /tmp/pip-build-f8uu6b5l/typed-ast/ ERROR: Service 'atlas-server' failed to build: The command '/bin/sh -c pip3 install amundsenatlastypes==1.1.0' returned a non-zero code: 1
Ошибка связана с неудачной установкой Python-пакета amundsenatlastypes. Решение еще проще: в Dockerfile в строке RUN pip3 install amundsenatlastypes==1.1.0 измените версию на 1.2.2, и снова запустите docker-compose up -d. На этот раз все будет работать. Можно перейти по адресу http://localhost:21000/ и открыть веб-интерфейс Apache Atlas. Отмечу, что запуск может занять несколько минут; в первый раз у меня это заняло около 10 минут, но последующие запуски происходят гораздо быстрее.
Apache Atlas Web UI
Apache Atlas заработал, готовые файлы для запуска можно взять тут
поднимает Apache NI-Fi
Здесь проблем нет. Создайте файл docker-compose.yml со следующим содержанием и запустите его командой docker-compose up -d:
После этого достаточно перейти по адресу http://localhost:8080, и должен открыться веб-интерфейс NiFi. Отмечу, что для запуска может понадобиться несколько минут.
Apache NI-Fi Web UI
Постановка задачи
Задача состоит в том, чтобы создать репортер между NiFi и Atlas. Для этой задачи существует стандартный репортер под названием ReportLineageToAtlas (в Docker-образе по умолчанию данного плагина нет; в конце будет небольшой абзац о том, как добавлять плагины). Создадим в NiFi небольшой DAG и настроим репортер в Atlas.
даг
настроеный репортер
результат в apache Atlas
Как мы видим, информация о DAG отображается в Atlas, но проблема в том, что отображается вся информация. Мне нужно скрыть определенные стадии DAG, чтобы их не было в Atlas. Таким образом, моя задача — создать репортер, который будет отделять, что нужно отображать, а что нет в Atlas. Мы введем правило, что отображаться будут только те стадии, в названиях которых в конце присутствует _to_atlas.
Все pom.xml файлы готовы, и остается только написать код репортера.
Создаю класс PicoReportLineageToAtlas и копирую в него код класса org.apache.nifi.atlas.reporting.ReportLineageToAtlas.
Меняю в нем:
package org.apache.nifi.atlas.reporting на package pico.habr.nifi.atlas.reporting
Название самого репортера с ReportLineageToAtlas на PicoReportLineageToAtlas
Немного обновляю теги.
// было
@Tags({"atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
" End-to-end lineages across NiFi environments and other systems can be reported if those are" +
" connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
" Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
" in addition to NiFi provenance events providing detailed event level lineage." +
" See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.", value = "hostname Regex patterns",
description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class ReportLineageToAtlas extends AbstractReportingTask {
//...
}
// стало
@Tags({"pico", "atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
" End-to-end lineages across NiFi environments and other systems can be reported if those are" +
" connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
" Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
" in addition to NiFi provenance events providing detailed event level lineage." +
" See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.", value = "hostname Regex patterns",
description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class PicoReportLineageToAtlas extends AbstractReportingTask {
//...
}
Далее задача — найти, где в коде ставится фильтрация по имени компонентов дага.
Как оказалось, за это отвечает другой класс, а именно org.apache.nifi.atlas.NiFiFlowAnalyzer. Поэтому создаем еще один класс PicoNiFiFlowAnalyzer и аналогично копируем в него код. После этого вносим несколько правок.
Остается только в PicoReportLineageToAtlas подменить один класс на другой.
import org.apache.nifi.atlas.NiFiFlowAnalyzer
//меняем на
import pico.habr.nifi.atlas.PicoNiFiFlowAnalyzer;
//и
final NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
//меняем на
final PicoNiFiFlowAnalyzer flowAnalyzer = new PicoNiFiFlowAnalyzer();
Остается только в resources создать файл org.apache.nifi.reporting.ReportingTask со следующим содержимым:
Он содержит в себе только путь до класса репортера. Этот файл используется для регистрации пользовательских реализаций интерфейса ReportingTask в NiFi. Он следует спецификации Java Service Provider Interface (SPI), которая позволяет динамически находить и загружать реализации интерфейсов или абстрактных классов во время выполнения.
Остается только собрать проект командой mvn clean install.
И тогда в модуле pico-reporting-task-atlas-nar появится .nar архив pico-reporting-task-atlas-nar-1.20.0.nar.
Чтобы подключить .nar архив в NIFI, достаточно прокинуть его в папку lib, которая должна находиться в корневой папке NIFI. В случае с Docker можно воспользоваться командой docker cp. После этого достаточно перезагрузить NIFI или контейнер целиком.
Результат
заходим в панель управления NIFI и находим там кастомный репортер. Настраиваем его в соответствии с требованиями. (настраиваеться так же как и оригинальный)
Для примера создам следующий даг
И смотрим что отобразилось в Atlas
И как видим в Атласе процессора D не появилось, поскольку в нем нету необходимого суффикса