Интеграция Apache NiFi и Atlas: Настройка в Docker и Создание Пользовательского Репортера
Ремарка
Обратите внимание, что методы и подходы, описанные в этой статье, являются одним из способов решения возникших проблем и могут не являться наилучшим или рекомендуемым вариантом для всех ситуаций. Автор не призывает следовать описанным шагам без учета возможных альтернатив и не гарантирует, что данный метод подойдет для всех пользователей или случаев. Учитывайте, что использование нестандартных решений может повлечь за собой риски или потребовать дополнительных усилий для поддержки и обновлений. Рекомендуется всегда исследовать и оценивать возможные варианты решений в контексте вашего проекта и требований.
Как запустить Apache Atlas в Docker
Первое, с чем я столкнулась, — это то, что большинство (возможно, не все, я, конечно, не проверяла) образов Apache Atlas не работают из коробки.
Каждый из них падал с следующей ошибкой:
14: curl#6 - "Could not resolve host: mirrorlist.centos.org; Unknown error"
One of the configured repositories failed (Unknown),
and yum doesn't have enough cached data to continue. At this point the only
safe thing yum can do is fail. There are a few ways to work "fix" this:
1. Contact the upstream for the repository and get them to fix the problem.
2. Reconfigure the baseurl/etc. for the repository, to point to a working
upstream. This is most often useful if you are using a newer
distribution release than is supported by the repository (and the
packages for the previous distribution release still work).
3. Run the command with the repository temporarily disabled
yum --disablerepo= ...
4. Disable the repository permanently, so yum won't use it by default. Yum
will then just ignore the repository until you permanently enable it
again or use --enablerepo for temporary usage:
yum-config-manager --disable
or
subscription-manager repos --disable=
5. Configure the failing repository to be skipped, if it is unavailable.
Note that yum will try to contact the repo. when it runs most commands,
so will have to try and fail each time (and thus. yum will be be much
slower). If it is a very temporary problem though, this is often a nice
compromise:
yum-config-manager --save --setopt=.skip_if_unavailable=true
Об этой ошибке чуть позже.
Я начала искать готовые Docker-файлы на GitHub.
Однако они также сталкивались с той же ошибкой.
Ошибка связана с тем, что Docker не смог найти или разрешить хост для репозитория mirrorlist.centos.org
. Это привело к сбою команды yum
, используемой для установки пакетов внутри контейнера Docker. Подобные ошибки могут возникать, если репозиторий больше не поддерживается или временно недоступен.
Остается только написать свои Docker-файлы. Я взяла за основу файлы из данного репозитория (уточню, что я использовала не последнюю версию, а на один коммит назад от версии на 27.08.2024, так как последнюю версию не удалось запустить в принципе), поскольку на него ссылались в данной статье.
Первое, что мы делаем — убираем все лишнее из docker-compose.yml
.
В оригинале, помимо Atlas и сервисов, необходимых для его работы, также поднимались Spark, Hive, Hadoop (DataNode, NameNode). Все это оказалось ненужным, поэтому мы просто вычеркиваем это из файла, а также удаляем папки Spark и Hive. Оставляем только необходимое: сам Apache Atlas, Apache Kafka и Apache Zookeeper.
Итого получаем:
version: "2"
services:
atlas-server:
build: ./atlas
image: pico/apache-atlas
volumes:
- ./atlas/resources/1000-Hadoop:/opt/atlas/models/1000-Hadoop
ports:
- "21000:21000"
depends_on:
- "zookeeper"
- "kafka"
zookeeper:
image: wurstmeister/zookeeper
hostname: zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
ports:
- "9092:9092"
hostname: kafka
environment:
KAFKA_CREATE_TOPICS: "create_events:1:1,delete_events:1:1,ATLAS_HOOK:1:1"
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
Далее мы переходим к Dockerfile
, который собирает Apache Atlas в контейнере. Для устранения ошибки достаточно прописать архивное зеркало для CentOS сразу после строки FROM centos:7
.
FROM centos:7
COPY --from=stage-atlas /apache-atlas.tar.gz /apache-atlas.tar.gz
#новые строчки с архивным зеркалом centos
RUN sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.repo \
&& sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo \
&& sed -i s/^mirrorlist=http/#mirrorlist=http/g /etc/yum.repos.d/*.repo
И можно запускать docker-compose up -d
. Однако возникает следующая ошибка:
Command »/usr/bin/python3 -u -c «import setuptools, tokenize;__file__='/tmp/pip-build-f8uu6b5l/typed-ast/setup.py'; f=getattr (tokenize, 'open', open)(__file__); code=f.read ().replace ('\r\n', '\n'); f.close (); exec (compile (code, __file__, 'exec'))» install --record /tmp/pip-artr5er7-record/install-record.txt --single-version-externally-managed --compile» failed with error code 1 in /tmp/pip-build-f8uu6b5l/typed-ast/ ERROR: Service 'atlas-server' failed to build: The command '/bin/sh -c pip3 install amundsenatlastypes==1.1.0' returned a non-zero code: 1
Ошибка связана с неудачной установкой Python-пакета amundsenatlastypes
. Решение еще проще: в Dockerfile
в строке RUN pip3 install amundsenatlastypes==1.1.0
измените версию на 1.2.2
, и снова запустите docker-compose up -d
. На этот раз все будет работать. Можно перейти по адресу http://localhost:21000/ и открыть веб-интерфейс Apache Atlas. Отмечу, что запуск может занять несколько минут; в первый раз у меня это заняло около 10 минут, но последующие запуски происходят гораздо быстрее.
Apache Atlas Web UI
Apache Atlas заработал, готовые файлы для запуска можно взять тут
поднимает Apache NI-Fi
Здесь проблем нет. Создайте файл docker-compose.yml
со следующим содержанием и запустите его командой docker-compose up -d
:
version: '3'
services:
nifi:
image: apache/nifi:1.20.0
container_name: nifi
ports:
- "8080:8080"
- "8443:8443"
- "8081:8081"
volumes:
- nifi-data:/opt/nifi/nifi-current/logs
environment:
- NIFI_WEB_HTTP_PORT=8080
- NIFI_WEB_HTTPS_PORT=8443
- NIFI_WEB_HTTP_HOST=0.0.0.0
- NIFI_WEB_PROXY_HOST=localhost
restart: unless-stopped
volumes:
nifi-data:
driver: local
После этого достаточно перейти по адресу http://localhost:8080, и должен открыться веб-интерфейс NiFi
. Отмечу, что для запуска может понадобиться несколько минут.
Apache NI-Fi Web UI
Постановка задачи
Задача состоит в том, чтобы создать репортер между NiFi
и Atlas. Для этой задачи существует стандартный репортер под названием ReportLineageToAtlas
(в Docker-образе по умолчанию данного плагина нет; в конце будет небольшой абзац о том, как добавлять плагины). Создадим в NiFi
небольшой DAG и настроим репортер в Atlas.
даг
настроеный репортер
результат в apache Atlas
Как мы видим, информация о DAG отображается в Atlas, но проблема в том, что отображается вся информация. Мне нужно скрыть определенные стадии DAG, чтобы их не было в Atlas. Таким образом, моя задача — создать репортер, который будет отделять, что нужно отображать, а что нет в Atlas. Мы введем правило, что отображаться будут только те стадии, в названиях которых в конце присутствует _to_atlas
.
Создание плагина
Создадим файл pom.xml
со следующим содержимым:
Ni-Fi
pack
pom
1.20.0
1.20.0
2.9
reporting-task-atlas
reporting-task-atlas-nar
Определим зависемости
org.slf4j
slf4j-api
1.7.12
org.apache.nifi
nifi-api
${nifi.version}
provided
org.apache.nifi
nifi-processor-utils
1.15.3
org.apache.nifi
nifi-reporting-utils
${nifi.version}
org.apache.commons
commons-io
1.3.2
Компоненты Apache NiFi
следует упаковывать в .nar
архивы, и для этого необходимо подключить несколько плагинов.
org.apache.nifi
nifi-nar-maven-plugin
1.5.0
true
org.apache.maven.plugins
maven-surefire-plugin
2.15
org.apache.maven.plugins
maven-compiler-plugin
17
Теперь можно собирать .nar
архивы. Переходим в модуль pico-reporting-task-atlas-nar и создаем там файл pom.xml
со следующим содержимым:
pico.habr
PicoReportLineageToAtlas
1.20.0
pico-reporting-task-atlas-nar
nar
reporting-task-atlas-nar
UTF-8
pico.habr
reporting-task-atlas
1.20.0
compile
org.apache.nifi
nifi-standard-services-api-nar
1.20.0
nar
В этом модуле больше ничего трогать не нужно, и при запуске сборки проекта в этой директории появится .nar
архив.
Переходим в модуль reporting-task-atlas
и создаем там файл pom.xml
со следующим содержимым:
pico.habr
PicoReportLineageToAtlas
1.20.0
reporting-task-atlas
1.20.0
jar
org.apache.nifi
nifi-processor-utils
org.slf4j
slf4j-api
org.apache.nifi
nifi-api
org.apache.nifi
nifi-reporting-utils
org.apache.nifi
nifi-atlas-reporting-task
${nifi.version}
org.apache.nifi
nifi-ssl-context-service-api
${nifi.version}
org.apache.nifi
nifi-kerberos-credentials-service-api
${nifi.version}
compile
org.apache.nifi
nifi-persistent-provenance-repository
${nifi.version}
org.jetbrains
annotations
RELEASE
compile
org.projectlombok
lombok
1.18.26
provided
Все pom.xml
файлы готовы, и остается только написать код репортера.
Создаю класс PicoReportLineageToAtlas и копирую в него код класса org.apache.nifi.atlas.reporting.ReportLineageToAtlas
.
Меняю в нем:
package org.apache.nifi.atlas.reporting
наpackage pico.habr.nifi.atlas.reporting
Название самого репортера с
ReportLineageToAtlas
на PicoReportLineageToAtlasНемного обновляю теги.
// было
@Tags({"atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
" End-to-end lineages across NiFi environments and other systems can be reported if those are" +
" connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
" Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
" in addition to NiFi provenance events providing detailed event level lineage." +
" See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.", value = "hostname Regex patterns",
description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class ReportLineageToAtlas extends AbstractReportingTask {
//...
}
// стало
@Tags({"pico", "atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
" End-to-end lineages across NiFi environments and other systems can be reported if those are" +
" connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
" Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
" in addition to NiFi provenance events providing detailed event level lineage." +
" See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.", value = "hostname Regex patterns",
description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class PicoReportLineageToAtlas extends AbstractReportingTask {
//...
}
Далее задача — найти, где в коде ставится фильтрация по имени компонентов дага.
Как оказалось, за это отвечает другой класс, а именно org.apache.nifi.atlas.NiFiFlowAnalyzer
. Поэтому создаем еще один класс PicoNiFiFlowAnalyzer и аналогично копируем в него код. После этого вносим несколько правок.
//было
package org.apache.nifi.atlas;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class NiFiFlowAnalyzer {
//...
}
//меняем на
package pico.habr.nifi.atlas; //замена групы
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.nifi.atlas.NiFiFlow; //добовляю недостающию зависемость
import org.apache.nifi.atlas.NiFiFlowPath; //добовляю недостающию зависемость
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class NiFiFlowAnalyzer { //меняю название класса
//...
}
Находим в коде метод analyzeProcessGroup
и вносим правки.
// было
private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {
processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
processGroupStatus.getProcessorStatus().forEach(p -> nifiFlow.addProcessor(p));
processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
processGroupStatus.getInputPortStatus().forEach(p -> nifiFlow.addInputPort(p));
processGroupStatus.getOutputPortStatus().forEach(p -> nifiFlow.addOutputPort(p));
// Analyze child ProcessGroups recursively.
for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
analyzeProcessGroup(child, nifiFlow);
}
}
// правки
private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {
processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
processGroupStatus.getProcessorStatus()
.stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
.forEach(p -> nifiFlow.addProcessor(p));
processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
processGroupStatus.getInputPortStatus()
.stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
.forEach(p -> nifiFlow.addInputPort(p));
processGroupStatus.getOutputPortStatus()
.stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
.forEach(p -> nifiFlow.addOutputPort(p));
// Analyze child ProcessGroups recursively.
for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
analyzeProcessGroup(child, nifiFlow);
}
}
Остается только в PicoReportLineageToAtlas подменить один класс на другой.
import org.apache.nifi.atlas.NiFiFlowAnalyzer
//меняем на
import pico.habr.nifi.atlas.PicoNiFiFlowAnalyzer;
//и
final NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
//меняем на
final PicoNiFiFlowAnalyzer flowAnalyzer = new PicoNiFiFlowAnalyzer();
Остается только в resources создать файл org.apache.nifi.reporting.ReportingTask
со следующим содержимым:
pico.habr.nifi.atlas.reporting.PicoReportLineageToAtlas
Он содержит в себе только путь до класса репортера. Этот файл используется для регистрации пользовательских реализаций интерфейса ReportingTask в NiFi. Он следует спецификации Java Service Provider Interface (SPI), которая позволяет динамически находить и загружать реализации интерфейсов или абстрактных классов во время выполнения.
Остается только собрать проект командой mvn clean install
.
И тогда в модуле pico-reporting-task-atlas-nar появится .nar
архив pico-reporting-task-atlas-nar-1.20.0.nar
.
Итого, имеем следующую структуру проекта:
ls -R
.:
pom.xml reporting-task-atlas reporting-task-atlas-nar
./reporting-task-atlas:
pom.xml src
./reporting-task-atlas/src:
main
./reporting-task-atlas/src/main:
java resources
./reporting-task-atlas/src/main/java:
pico
./reporting-task-atlas/src/main/java/pico:
habr
./reporting-task-atlas/src/main/java/pico/habr:
nifi
./reporting-task-atlas/src/main/java/pico/habr/nifi:
atlas
./reporting-task-atlas/src/main/java/pico/habr/nifi/atlas:
PicoNiFiFlowAnalyzer.java reporting
./reporting-task-atlas/src/main/java/pico/habr/nifi/atlas/reporting:
PicoReportLineageToAtlas.java
./reporting-task-atlas/src/main/resources:
META-INF
./reporting-task-atlas/src/main/resources/META-INF:
services
./reporting-task-atlas/src/main/resources/META-INF/services:
org.apache.nifi.reporting.ReportingTask
./reporting-task-atlas-nar:
pom.xml
как проникунуть .nar в NIFI
Чтобы подключить .nar
архив в NIFI, достаточно прокинуть его в папку lib
, которая должна находиться в корневой папке NIFI. В случае с Docker можно воспользоваться командой docker cp
. После этого достаточно перезагрузить NIFI или контейнер целиком.
Результат
заходим в панель управления NIFI и находим там кастомный репортер. Настраиваем его в соответствии с требованиями. (настраиваеться так же как и оригинальный)
Для примера создам следующий даг
И смотрим что отобразилось в Atlas
И как видим в Атласе процессора D
не появилось, поскольку в нем нету необходимого суффикса
P.S. код можно глянуть сдесь