Интеграция Apache NiFi и Atlas: Настройка в Docker и Создание Пользовательского Репортера

Ремарка

Обратите внимание, что методы и подходы, описанные в этой статье, являются одним из способов решения возникших проблем и могут не являться наилучшим или рекомендуемым вариантом для всех ситуаций. Автор не призывает следовать описанным шагам без учета возможных альтернатив и не гарантирует, что данный метод подойдет для всех пользователей или случаев. Учитывайте, что использование нестандартных решений может повлечь за собой риски или потребовать дополнительных усилий для поддержки и обновлений. Рекомендуется всегда исследовать и оценивать возможные варианты решений в контексте вашего проекта и требований.

Как запустить Apache Atlas в Docker

Первое, с чем я столкнулась, — это то, что большинство (возможно, не все, я, конечно, не проверяла) образов Apache Atlas не работают из коробки.

685ca4ece48a7e54f138506ee276a271.png

Каждый из них падал с следующей ошибкой:

14: curl#6 - "Could not resolve host: mirrorlist.centos.org; Unknown error"


 One of the configured repositories failed (Unknown),
 and yum doesn't have enough cached data to continue. At this point the only
 safe thing yum can do is fail. There are a few ways to work "fix" this:

     1. Contact the upstream for the repository and get them to fix the problem.

     2. Reconfigure the baseurl/etc. for the repository, to point to a working
        upstream. This is most often useful if you are using a newer
        distribution release than is supported by the repository (and the
        packages for the previous distribution release still work).

     3. Run the command with the repository temporarily disabled
            yum --disablerepo= ...

     4. Disable the repository permanently, so yum won't use it by default. Yum
        will then just ignore the repository until you permanently enable it
        again or use --enablerepo for temporary usage:

            yum-config-manager --disable 
        or
            subscription-manager repos --disable=

     5. Configure the failing repository to be skipped, if it is unavailable.
        Note that yum will try to contact the repo. when it runs most commands,
        so will have to try and fail each time (and thus. yum will be be much
        slower). If it is a very temporary problem though, this is often a nice
        compromise:

            yum-config-manager --save --setopt=.skip_if_unavailable=true

Об этой ошибке чуть позже.

Я начала искать готовые Docker-файлы на GitHub.

191d1ab0677767e67eef585a3b867376.png

Однако они также сталкивались с той же ошибкой.


Ошибка связана с тем, что Docker не смог найти или разрешить хост для репозитория mirrorlist.centos.org. Это привело к сбою команды yum, используемой для установки пакетов внутри контейнера Docker. Подобные ошибки могут возникать, если репозиторий больше не поддерживается или временно недоступен.

Остается только написать свои Docker-файлы. Я взяла за основу файлы из данного репозитория (уточню, что я использовала не последнюю версию, а на один коммит назад от версии на 27.08.2024, так как последнюю версию не удалось запустить в принципе), поскольку на него ссылались в данной статье.

Первое, что мы делаем — убираем все лишнее из docker-compose.yml.

В оригинале, помимо Atlas и сервисов, необходимых для его работы, также поднимались Spark, Hive, Hadoop (DataNode, NameNode). Все это оказалось ненужным, поэтому мы просто вычеркиваем это из файла, а также удаляем папки Spark и Hive. Оставляем только необходимое: сам Apache Atlas, Apache Kafka и Apache Zookeeper.

Итого получаем:

version: "2"

services:

  atlas-server:
    build: ./atlas
    image: pico/apache-atlas
    volumes:
      - ./atlas/resources/1000-Hadoop:/opt/atlas/models/1000-Hadoop
    ports:
      - "21000:21000"
    depends_on:
      - "zookeeper"
      - "kafka" 

  zookeeper:
    image: wurstmeister/zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"  

  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    hostname: kafka
    environment:
      KAFKA_CREATE_TOPICS: "create_events:1:1,delete_events:1:1,ATLAS_HOOK:1:1"
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper  

Далее мы переходим к Dockerfile, который собирает Apache Atlas в контейнере. Для устранения ошибки достаточно прописать архивное зеркало для CentOS сразу после строки FROM centos:7.

FROM centos:7

COPY --from=stage-atlas /apache-atlas.tar.gz /apache-atlas.tar.gz

#новые строчки с архивным зеркалом centos
RUN sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.repo \
	&& sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo \
	&& sed -i s/^mirrorlist=http/#mirrorlist=http/g /etc/yum.repos.d/*.repo

И можно запускать docker-compose up -d. Однако возникает следующая ошибка:

Command »/usr/bin/python3 -u -c «import setuptools, tokenize;__file__='/tmp/pip-build-f8uu6b5l/typed-ast/setup.py'; f=getattr (tokenize, 'open', open)(__file__); code=f.read ().replace ('\r\n', '\n'); f.close (); exec (compile (code, __file__, 'exec'))» install --record /tmp/pip-artr5er7-record/install-record.txt --single-version-externally-managed --compile» failed with error code 1 in /tmp/pip-build-f8uu6b5l/typed-ast/ ERROR: Service 'atlas-server' failed to build: The command '/bin/sh -c pip3 install amundsenatlastypes==1.1.0' returned a non-zero code: 1

Ошибка связана с неудачной установкой Python-пакета amundsenatlastypes. Решение еще проще: в Dockerfile в строке RUN pip3 install amundsenatlastypes==1.1.0 измените версию на 1.2.2, и снова запустите docker-compose up -d. На этот раз все будет работать. Можно перейти по адресу http://localhost:21000/ и открыть веб-интерфейс Apache Atlas. Отмечу, что запуск может занять несколько минут; в первый раз у меня это заняло около 10 минут, но последующие запуски происходят гораздо быстрее.

Apache Atlas Web UI

Apache Atlas Web UI

Apache Atlas заработал, готовые файлы для запуска можно взять тут

поднимает Apache NI-Fi

Здесь проблем нет. Создайте файл docker-compose.yml со следующим содержанием и запустите его командой docker-compose up -d:

version: '3'

services:

  nifi:
    image: apache/nifi:1.20.0
    container_name: nifi
    ports:
      - "8080:8080"
      - "8443:8443"
      - "8081:8081"
    volumes:
      - nifi-data:/opt/nifi/nifi-current/logs
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_WEB_HTTPS_PORT=8443
      - NIFI_WEB_HTTP_HOST=0.0.0.0
      - NIFI_WEB_PROXY_HOST=localhost
    restart: unless-stopped

volumes:
  nifi-data:
    driver: local

После этого достаточно перейти по адресу http://localhost:8080, и должен открыться веб-интерфейс NiFi. Отмечу, что для запуска может понадобиться несколько минут.

Apache NI-Fi Web UI

Apache NI-Fi Web UI

Постановка задачи

Задача состоит в том, чтобы создать репортер между NiFi и Atlas. Для этой задачи существует стандартный репортер под названием ReportLineageToAtlas (в Docker-образе по умолчанию данного плагина нет; в конце будет небольшой абзац о том, как добавлять плагины). Создадим в NiFi небольшой DAG и настроим репортер в Atlas.

даг

даг

настроеный репортер

настроеный репортер

результат в apache Atlas

результат в apache Atlas

Как мы видим, информация о DAG отображается в Atlas, но проблема в том, что отображается вся информация. Мне нужно скрыть определенные стадии DAG, чтобы их не было в Atlas. Таким образом, моя задача — создать репортер, который будет отделять, что нужно отображать, а что нет в Atlas. Мы введем правило, что отображаться будут только те стадии, в названиях которых в конце присутствует _to_atlas.

Создание плагина

Создадим файл pom.xml со следующим содержимым:

    Ni-Fi
    pack
    
    pom
    1.20.0

    
    
        
        1.20.0

        
        2.9
    


    
    
        
        reporting-task-atlas

        
        reporting-task-atlas-nar
    

Определим зависемости

    
    
        
            
            
                org.slf4j
                slf4j-api
                1.7.12
            

            
            
                org.apache.nifi
                nifi-api
                ${nifi.version}
                provided
            

            
            
                org.apache.nifi
                nifi-processor-utils
                1.15.3
            

            
            
                org.apache.nifi
                nifi-reporting-utils
                ${nifi.version}
            

            
            
                org.apache.commons
                commons-io
                1.3.2
            
        
    

Компоненты Apache NiFi следует упаковывать в .nar архивы, и для этого необходимо подключить несколько плагинов.

    
    
        
            
            
                org.apache.nifi
                nifi-nar-maven-plugin
                1.5.0
                
                true
            

            
            
                org.apache.maven.plugins
                maven-surefire-plugin
                2.15
            

            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    
                    17
                    
                    17
                
            
        
    

Теперь можно собирать .nar архивы. Переходим в модуль pico-reporting-task-atlas-nar и создаем там файл pom.xml со следующим содержимым:




    pico.habr
    PicoReportLineageToAtlas
    1.20.0



pico-reporting-task-atlas-nar


nar


reporting-task-atlas-nar



    UTF-8




    
    
        pico.habr
        reporting-task-atlas
        1.20.0
        compile
    
    
    
    
        org.apache.nifi
        nifi-standard-services-api-nar
        1.20.0
        nar
    

В этом модуле больше ничего трогать не нужно, и при запуске сборки проекта в этой директории появится .nar архив.

Переходим в модуль reporting-task-atlas и создаем там файл pom.xml со следующим содержимым:



    pico.habr
    PicoReportLineageToAtlas
    1.20.0



reporting-task-atlas


1.20.0


jar




    
    
        org.apache.nifi
        nifi-processor-utils
    

    
    
        org.slf4j
        slf4j-api
    

    
    
        org.apache.nifi
        nifi-api
    

    
    
        org.apache.nifi
        nifi-reporting-utils
    

    
    
        org.apache.nifi
        nifi-atlas-reporting-task
        ${nifi.version}
    

    
    
        org.apache.nifi
        nifi-ssl-context-service-api
        ${nifi.version}
    

    
    
        org.apache.nifi
        nifi-kerberos-credentials-service-api
        ${nifi.version}
        compile
    

    
    
        org.apache.nifi
        nifi-persistent-provenance-repository
        ${nifi.version}
    

    
    
        org.jetbrains
        annotations
        RELEASE
        compile
    

    
    
        org.projectlombok
        lombok
        1.18.26
        provided
    

Все pom.xml файлы готовы, и остается только написать код репортера.

Создаю класс PicoReportLineageToAtlas и копирую в него код класса org.apache.nifi.atlas.reporting.ReportLineageToAtlas.

Меняю в нем:

  • package org.apache.nifi.atlas.reporting на package pico.habr.nifi.atlas.reporting

  • Название самого репортера с ReportLineageToAtlas на PicoReportLineageToAtlas

  • Немного обновляю теги.

// было 
@Tags({"atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
        " End-to-end lineages across NiFi environments and other systems can be reported if those are" +
        " connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
        " Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
        " in addition to NiFi provenance events providing detailed event level lineage." +
        " See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.", value = "hostname Regex patterns",
                 description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class ReportLineageToAtlas extends AbstractReportingTask {

  //...

}

// стало
@Tags({"pico", "atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
        " End-to-end lineages across NiFi environments and other systems can be reported if those are" +
        " connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
        " Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
        " in addition to NiFi provenance events providing detailed event level lineage." +
        " See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.", value = "hostname Regex patterns",
                 description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class PicoReportLineageToAtlas extends AbstractReportingTask {

  //...

}

Далее задача — найти, где в коде ставится фильтрация по имени компонентов дага.

Как оказалось, за это отвечает другой класс, а именно org.apache.nifi.atlas.NiFiFlowAnalyzer. Поэтому создаем еще один класс PicoNiFiFlowAnalyzer и аналогично копируем в него код. После этого вносим несколько правок.

//было
package org.apache.nifi.atlas;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class NiFiFlowAnalyzer { 
  //...
  }

//меняем на 
package pico.habr.nifi.atlas; //замена групы

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.nifi.atlas.NiFiFlow; //добовляю недостающию зависемость
import org.apache.nifi.atlas.NiFiFlowPath; //добовляю недостающию зависемость
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class NiFiFlowAnalyzer { //меняю название класса
  //...
  }

Находим в коде метод analyzeProcessGroup и вносим правки.

// было
private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {

        processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
        processGroupStatus.getProcessorStatus().forEach(p -> nifiFlow.addProcessor(p));
        processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
        processGroupStatus.getInputPortStatus().forEach(p -> nifiFlow.addInputPort(p));
        processGroupStatus.getOutputPortStatus().forEach(p -> nifiFlow.addOutputPort(p));

        // Analyze child ProcessGroups recursively.
        for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
            analyzeProcessGroup(child, nifiFlow);
        }

    }

// правки
private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {

        processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
        processGroupStatus.getProcessorStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addProcessor(p));
        processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
        processGroupStatus.getInputPortStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addInputPort(p));
        processGroupStatus.getOutputPortStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addOutputPort(p));

        // Analyze child ProcessGroups recursively.
        for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
            analyzeProcessGroup(child, nifiFlow);
        }

    }

Остается только в PicoReportLineageToAtlas подменить один класс на другой.

import org.apache.nifi.atlas.NiFiFlowAnalyzer
//меняем на 
import pico.habr.nifi.atlas.PicoNiFiFlowAnalyzer;

//и

final NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
//меняем на 
final PicoNiFiFlowAnalyzer flowAnalyzer = new PicoNiFiFlowAnalyzer();

Остается только в resources создать файл org.apache.nifi.reporting.ReportingTask со следующим содержимым:

pico.habr.nifi.atlas.reporting.PicoReportLineageToAtlas

Он содержит в себе только путь до класса репортера. Этот файл используется для регистрации пользовательских реализаций интерфейса ReportingTask в NiFi. Он следует спецификации Java Service Provider Interface (SPI), которая позволяет динамически находить и загружать реализации интерфейсов или абстрактных классов во время выполнения.

Остается только собрать проект командой mvn clean install.

И тогда в модуле pico-reporting-task-atlas-nar появится .nar архив pico-reporting-task-atlas-nar-1.20.0.nar.

Итого, имеем следующую структуру проекта:

ls -R
.:
pom.xml  reporting-task-atlas  reporting-task-atlas-nar

./reporting-task-atlas:
pom.xml  src

./reporting-task-atlas/src:
main

./reporting-task-atlas/src/main:
java  resources

./reporting-task-atlas/src/main/java:
pico

./reporting-task-atlas/src/main/java/pico:
habr

./reporting-task-atlas/src/main/java/pico/habr:
nifi

./reporting-task-atlas/src/main/java/pico/habr/nifi:
atlas

./reporting-task-atlas/src/main/java/pico/habr/nifi/atlas:
PicoNiFiFlowAnalyzer.java  reporting

./reporting-task-atlas/src/main/java/pico/habr/nifi/atlas/reporting:
PicoReportLineageToAtlas.java

./reporting-task-atlas/src/main/resources:
META-INF

./reporting-task-atlas/src/main/resources/META-INF:
services

./reporting-task-atlas/src/main/resources/META-INF/services:
org.apache.nifi.reporting.ReportingTask

./reporting-task-atlas-nar:
pom.xml

как проникунуть .nar в NIFI

Чтобы подключить .nar архив в NIFI, достаточно прокинуть его в папку lib, которая должна находиться в корневой папке NIFI. В случае с Docker можно воспользоваться командой docker cp. После этого достаточно перезагрузить NIFI или контейнер целиком.

Результат

заходим в панель управления NIFI и находим там кастомный репортер. Настраиваем его в соответствии с требованиями. (настраиваеться так же как и оригинальный)

23687e75f41d30ed192ac6d0277968d0.png

Для примера создам следующий даг

15f1935d1453829b4feecb52eec7760f.png

И смотрим что отобразилось в Atlas

8b52db4d983e351c2e077c88022fdfaa.png

И как видим в Атласе процессора D не появилось, поскольку в нем нету необходимого суффикса

P.S. код можно глянуть сдесь

© Habrahabr.ru