Data Lineage из топора

Статья навеяна удачной реализацией Data Lineage «на коленке». Рассматривается случай, когда в окружающем корпоративном ландшафте Apache Atlas, Datahub или Amundsen еще не подвезли (и неизвестно, будет ли, и если будет, то когда) —, а посмотреть от таблицы назад к источниками или вперед к потребителям от конкретной таблицы хочется прямо сейчас. Условия, в которых это удалось сделать, могут не повториться в других случаях, но сам кейс наверняка будет интересен.

Прежде всего речь идет об инфраструктуре Hadoop/Hive/Spark, в которых потоки oozie запускают последовательно и параллельно модули с набором SQL-трансформаций. Для разработки потоков используется внутренний инструмент кодогенерации. В нем разработчики в UI заполняют и модифицируют в табличном виде метаданные потоков, таблиц, представлений, и других артефактов, получая на выходе дистрибутив патча.

Из его БД можно взять поля:

Среди трансформаций данных кроме SQL также встречаются другие языки (прежде всего java), но шагов SQL подавляющее большинство. Если же в трансформации сознательным разработчиком заполнены не только Target table name, но и список её источников (SQL source list), то эта информация будет использована при построении связей. Если нет — увы, часть графа будет упущена и самое неприятное — упускаются все дальнейшие связи по направлению к следующим источникам или потребителям. Альтернатива — установка на нодах кластера агентов Apache Atlas, который в инфраструктуре Apache Spark анализирует в Spark History Server вставки фреймов в целевые таблицы и их таблицы-источники для большинства запросов. Дополнительно на уровне таких инструментов в коробке обеспечивается column level lineage.

Первый шаг перед сборкой data lineage — получение всех источников каждой трансформации и каждого представления. Требование указывать в коде полное имя любой таблицы, с именем схемы через точку помогает отсечь псевдонимы таблиц и блоков CTE. Функция несложная, примеров в сети много. Внизу приведен вариант на Posgtre PLPgSQL, который разбирает запрос и возвращает таблицы и представления — источники или параметризированные имена (типа #src_table# или #src_schema#.#src_table#)

CREATE OR REPLACE FUNCTION slp.get_sources(
        ssql text)
  RETURNS character varying
  LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
    ssql1 text;
    ssql2 text;
    s_src_list varchar(4096) := '';
BEGIN
  -- remove multiline comments /* ... */
  SELECT regexp_replace( ssql, '/\*[^*]*\*+(?:[^*/][^*]*\*+)*/', '' )
    INTO ssql1;
  -- replace {% %} conditionals with the word from
  SELECT regexp_replace( ssql, '\{\% [^\%\}]* \%\}', '' )
      INTO ssql1;

  -- first, remove trailing comment --,
  -- second,  remove ENTERs, (, ), = , *
  -- then split by ENTERs  -- then join by SPACEs to ssql2
  SELECT regexp_replace( ssql1, '\t', ' ' , 'g')
    INTO ssql1;
  SELECT regexp_replace( ssql1, '\{\% [^\%\}]* \%\}', ' from ', 'g' )
      INTO ssql1;
  SELECT lower(
      regexp_replace(
           regexp_replace(ssql1, '--(\S*)', ' '), E'[\\n\\r\\u2028\\u2029\\u000B\\u0085\(\)\*\=]', ' ', 'g')
      )
  INTO ssql2;

  select regexp_replace(ssql2, '(\s+)', ' ', 'g')
  into ssql2;

  SELECT string_agg ( DISTINCT tt.chunk, ';' )
    INTO s_src_list
   FROM (
      SELECT q.chunk, q.prev
      FROM (
           SELECT t.chunk, LAG(t.chunk) over () as prev
           FROM (
               SELECT regexp_split_to_table(ssql2, ' ') as chunk
             ) t
        ) q
      WHERE q.prev in ('from', 'join')
    AND q.chunk != 'select'
    AND position('.' IN q.chunk) >0
    ) tt;

  return regexp_replace(s_src_list, '\t', '');
end;
$BODY$;

Рекурсивные запросы с небольшой цепочкой with-блоков CTE вытаскивают data lineage «вверх» к источникам или «вниз» к потребителям. Условимся, что данные текут сверху вниз, как вода по каскаду водопадов. Результатом такого запроса с аргументом — полным именем рассматриваемой таблицы оказывается набор данных из 2 полей (source, target). Фактически Data Lineage таблицы здесь представлен списком смежности вершин, развернутым в таблицу из 2 полей.
Пример table lineage к источникам.

CREATE OR REPLACE FUNCTION slp.lineage_up( like_target text)
    RETURNS TABLE(source text, target text)
    LANGUAGE 'sql'
AS $BODY$
--  logging
INSERT INTO slp.t_lineage_log (function_name, log_timestamp, function_argument)
VALUES( 'slp.lineage_up', current_timestamp, like_target);

--  from bottom to TOP  (UP)
WITH RECURSIVE
a as (
SELECT source_table, target_table
    FROM slp.mv_source_target mvst
    WHERE mvst.target_table like  like_target
UNION
SELECT mvst.source_table, mvst.target_table
    FROM slp.mv_source_target mvst
      INNER JOIN a ar ON mvst.target_table = ar.source_table
)

SELECT  a.source_table as source, a.target_table target
FROM a
 WHERE source_table <> target_table;
$BODY$;

Аналогичная функция строит lineage «вниз», к потребителям.

Отдельно с представлениями. Обращение в коде SQL-запроса к вьюхе просто приводит к разворачиванию ее источников, потому что она с точки зрения потока данных просто разновидность SQL-трансформации.

Аналогично параметризованные имена таблиц типа #param_db#.#param_tab# разворачивается в реальное имя таблицы, взятой из значений параметров потока по умолчанию. Таким образом на lineage-диаграмме показывается и вершина с параметризированным именем и дефолтное значение как его источник.

Аргумент к lineage-запросу может быть также параметризирован, например рассматриваться как SQL-LIKE выражение. Например, для 'schema_name%.t_table_name%' получается объединенный в один список lineage нескольких таблиц и схем, которые может быть удобно изучить на одной диаграмме.

Масштабирование

Имея информацию, что любая атомарная трансформация данных относится к конкретному потоку, а поток — к витрине, можно от table lineage переходить к schema lineage, workflow lineage и datamart lineage. Грубо говоря, если пары source → target заменить на source schema → target schema, потом на source wf → target wf и source datamart → target datamart, а потом убрать дубли, то можно строить покрывающие lineages на более крупных уровнях. При этом граф связей относится только к зависимостям по запрашиваемой одной или нескольким таблицам, а не ко всем возможным связям между агрегирующими их объектами (потоками, их модулями, системами (АС)). Ниже после описания yEd будут приведены отрисовки несколько примеров агрегирующих уровней.

yEd и другие возможности визуализации

Подходящим инструментом визуализации оказался yEd. Бесплатного и условно бесплатного софта для отрисовки графов немало, но только в yEd оказались подходящая коллекция компоновок (layouts) визуализации графов, набор форматов файлов от простейшего колоночного tgf до xml-ного graphml, возможности экспорта в графические форматы, pdf и svg и множества других полезных фич. Можно посмотреть ролик от фирмы-разработчика yEd за 90 секунд. Причем среди многообразия некоммерческих редакторов — визуализаторов графов ближайшим к yEd оказался древний Graphviz. У него есть встраиваемая в python версия в виде библиотеки, но алгоритмы компоновок не вытягивают сколь-нибудь серьезный граф, а документация в pdf ссылается на научные публикации IEEE по визуализации графов 1980х годов. Также не подошел известный визуализатор графов Gephi, библиотека networkx и некоторые другие. Характерная фишка этих редакторов-визуализаторов — «биологические» компоновки, которые выглядят красиво, но совсем не подходят для рассматриваемой задачи. Если вам известны другие подходящие программы или библиотеки кроме yEd, прошу поделиться в комментариях.

yEd встроить не получится, фактически это исполняемый jar-ник с безнадежно обфусцированными классами, но сам редактор как инструмент пока что для меня выглядит непревзойдённым. При этом по наглядности и возможностей анализа подграфов он на мой взгляд превосходит диаграммы Apache Atlas, не могу судить о других аналогичных системах. В целом из всего дистрибутива yWorks yEd достаточно одного yed.jar, можно написать для него bat-стартер с вызовом java.exe из JDK.

В большой сложной диаграмме yEd можно выделить от узла (таблицы) подграф источников (Predecessors) или, наоборот, потребителей (Successors) или же ближайших соседей сверху и снизу (Neighbourhood) — в отдельный документ-закладку c тем же видом компоновки. Это делается правой кнопкой на одной из четырех закладок слева.
Пожалуй в yEd мне не хватило функции поиска дорог между вершинами, хотя бы одной или всех. Для большой загроможденной диаграммы это иногда нужно. Но всегда вместо этого можно построить 2 lineage — от одной таблицы вниз, к потребителям и от другой вверх, к источникам. Глядя на них обоих на одном экране обычно маршрут находится очень быстро.

image
Табличный Data Lineage к потребителям в редакторе yEd, компоновка BPMN.

Во всех рисунках метаданные обфусцированы, то есть подстроки имен колонок, таблиц, схем массово заменены на другие аббревиатуры.
Ниже приводится пример агрегации на уровень oozie-потоков, которая оборачивает табличный уровень только для таблиц, которые в нем участвуют. Диаграмма обрезана, слишком большое количество объектов на ней делают ее плохо читаемой. Но как я уже говорил, могут помочь выделения иерархии predecessors, successors и Neighbourhood.

image
Lineage на уровне потоков.

Далее можно выйти на уровень агрегации АС или витрин (datamarts). Опять же, это оборачивание изначального табличного уровня, только вместо таблицы подставляется витрина, в которой она находится. К диаграмме связей витрин подмешиваются связи простых таблиц из табличного уровня агрегации, которые не относятся к известным витринам (например, находятся за пределами кластера), а также параметризованные таблицы. Если бы их не было и остались только связи между витринами, это затруднило бы анализ и диаграмма была бы малополезной. Связывающие таблицы как бы пунктиром показывают движение данных между витринами.

image
Диаграмма уровня витрин.

Еще полезным оказался простая диаграмма прямых входов и выходов потока. Одна таблица может быть как его входом, так и выходом, то есть поток и читает из нее и в какой-то момент пишет обратно. Поэтому на диаграмме между потоком и такой таблицей проведены две встречных стрелки. Для вьюх и параметризованных таблиц здесь добавляется еще один уровень раскрытия их источников — до первой реальной таблицы (не в этом примере).

image
Прямые источники и приемники потока данных. Поток — комплексный набор упорядоченных трансформаций данных, каждая из которых может читать из многих, но пишет в одну таблицу.

Еще немного о yEd и его интерфейсе.

TGF — 2-колоночный файл с разделителем tab, source и target. Может быть третья колонка — комментарий на стрелке между вершинами (узлами) графа, но опыт показал, что пользы от них мало, а диаграмму загромождают сильно. Избежать ручных действий не получится, но для построения диаграммы их требуется немного.

После загрузки TGF в редактор надо сразу сделать действия Tools → Fit Node to Label и Layout → (Выбрать разновидность layout и настроить его параметры в форме). Для изображения lineage из подходящих layout-ов yEd больше всего подошли BPMN с разными настройками, Flowchart, изредка разновидности Orthogonal Layout. Обычно настройки ограничиваются направление потока диаграммы: сверху вниз или слева направо. Также можно настраивать вид стрелок, прямоугольные повороты или гибкие кривые, минимальное расстояние между прямоугольными нодами. Перед сохранением в graphml или экспортом в svg/jpg можно вручную раскрасить фон отдельных нод другими цветами.

После построения графа можно по Ctrl-S сохранить результат уже в формат graphml, который в xml запоминает все построение и все ручные изменения. Ручных изменений может быть много: раскрашивание узлов в свои цвета, сдвиги узлов и их групп вместе со связями — любые изменения, которые позволяют диаграммные редакторы вроде Visio.

Ctrl-E — можно экспортировать диаграмму в другие форматы, прежде всего в векторный браузерный svg. Файл svg просто перетаскивается в любой браузер, кроме IE и просматривается как векторная диаграмма, с поиском по Ctrl-F в тексте всех элементов диаграммы.

Ctrl-A, Right Click→Copy to System Clipboard — копирует диаграмму в буфер с возможностью вставить в Outoook или Word в графическом виде.

Немного о Column Lineage

С ним все сильно сложнее, но частично можно построить и его. Посмотрите на развитие проекта https://github.com/reata/sqllineage Пропустить через него два вида артефактов кода — трансформации данных INSERT as SELECT и представления CREATE VIEW, немного допилить напильником и вставить в таблицу колоночного lineage с полями source_column и target_column с полной записью schema_name.table_or_view_name.column_name — и можно применить тот же подход рекурсивный запросов от объекта (ов) к источникам или потребителям. Проект еще сильно сырой, исправляется множество багов, метаданные полей таблиц и вьюх или не используются, или используются по минимуму. Нужно учесть, что атрибутов на 1–2 порядка больше, чем таблиц и вьюх, так что с запросами надо аккуратнее — просить показать только по 1–2 поля за раз.
Здесь появляется разные проблемы. Одна из главных — рассматривать ли в lineage только атрибуты, значения которых непосредственно попадают из таблицы источника в целевую таблицу или же добавлять к ним управляющие: из фильтров WHERE, HAVING, в логических условиях типа CASE WHEN, диалектных NVL (), NVL2(), IF () и так далее. Часто может оказаться, что для аналитиков важен второй вариант, и он очень трудно реализуем в таком самопальном инструменте. Всевозможные варианты коррелированных подзапросов, латеральных соединений никогда не получится проанализировать качественно, но, к счастью, сами возможности SQL Spark и Hive/TEZ не очень развиты по сравнению с ANSI или зрелых РСУБД. Полностью все вытащить можно лишь из построенного плана запроса.

image
Пример ограниченного column lineage.

Есть возможность самостоятельного анализа логов приложений Spark HistoryServer, то есть вытаскивать из текстовых логов планов запросов как таблицы\вьюхи, так и их полей. Но такая задача никак не проще примера, описанного выше. Кроме этого полученные связи источник-потребитель надо доставить из пром-области в dev. Лучше, конечно, если есть возможность анализировать план. Это возможно, если на промышленном кластере есть доступ к папкам hdfs spark-history. Описанный выше метод построения lineage не зацикливается только на spark с стеке движков Hadoop. Можно строить на любой кластерной или серверно-реляционной БД, главное иметь табличную упорядоченную базу метаинформации, как минимум все DDL.

Для бизнес-аналитики потоков данных часто бывает необходимо посмотреть lineage-зависимости на уровне таблиц, в которых учитывается связь управляющих полей — то есть таких, которые не светятся явно в списке SELECT для переноса данных в таблицу-потребитель, но участвуют в связках JOIN, в WHERE, NVL в том же SELECТ в подзапросах и в других конструкциях. Apache Atlas позволяет такое учитывать, но описанный выше алгоритм c парсером get_sources — пока нет.

Приведенный пример создания инструмента на коленке ни в коем случае не заменяет собой коробочный продукт, но может сильно облегчить анализ, когда больше ничего нет. Я бы предложил назвать его DIY Data Lineage или как в заголовке статьи, но можете предложить свой вариант.

© Habrahabr.ru