Как мы проверяли качество данных после завершения миграции с Teradata на Greenplum

0fc666f54aba70cb5578a1352e8da269.png

Привет, Хабр! Мы завершаем серию статей о миграции аналитического хранилища данных с платформы Teradata на GreenPlum. В предыдущих статьях мы рассказали о нашем опыте и результатах автоматизированного переписывания SQL‑скриптов с помощью реализованных сервисов миграции кода и переноса архива данных. В этот раз мы расскажем вам о нашем опыте и результатах кросс‑платформенной проверки качества данных во время и после миграции, а также о трудностях и решениях, связанных с этим процессом.

Завершая нашу серию, мы подходим к ключевому аспекту миграции данных — проверке и обеспечению качества данных после переноса. Теперь, когда перед нами стоят два параллельно функционирующих хранилища, возникает вопрос о точности и согласованности данных между ними.

Постановка задачи

После миграции хранилища с Teradata на Greenplum мы получили два независимых хранилища, содержащих загружаемые данные из одних и тех же систем‑источников и с одинаковым набором рассчитываемых витрин данных. Прежде чем переключить потребителей на новое хранилище,  было важно сравнить результаты загрузок и расчётов витрин между обеими платформами. Это сравнение необходимо было выполнить не однократно для каждого источника, а регулярно, чтобы гарантировать постоянное соответствие данных. Полная сверка всех данных Teradata и Greenplum не имела смысла, поскольку большая часть данных попала в Greenplum как часть архивных данных. Акцент был сделан на проверку недавно загруженных данных и точности последующих расчётов витрин.

Архитектура решения по сверке данных

В нашем решении для сверки данных использовался Hadoop-кластер в качестве основной платформы для хранения сравниваемых данных и результатов проведённого сравнения. Для переноса сверяемых наборов данных из Teradata в кластер Hadoop использовался сервис Teradata QueryGrid и его коннектор Teradata‑Hive. Для переноса данных из GreenPlum в Hadoop задействовали фреймворк PXF. Данные сверяли с помощью Spark‑приложения, реализованного на Scala.

Требования к реализации решения

В процессе разработки решения для сверки данных мы определили следующие ключевые требования:

  1. Добавить возможность упрощённого добавления новых таблиц для сверки.

  2. Определить методику, позволяющую соотнести между собой записи между Greenplum и Teradata, потому что внутренние ключи для одного и того же внешнего идентификатора записи в системе‑источнике для новых записей различались между платформами.

  3. Обеспечить детализацию расхождений. Отображать в качестве результата не только строки, в которых выявлены расхождения, но и точное указание атрибутов.

Глобальные межплатформенные идентификаторы (UID)

До перехода к описанию решения рассмотрим суть и назначение уникальных идентификаторов, которые могли бы использоваться различными платформами, — так называемых глобальных межплатформенных UID. В процессе загрузки данных в хранилище обычно применяется методика присвоения новым записям внутренних суррогатных ключей на основе определённых метаданных. В нашем случае в наборе К‑таблиц в слое технических метаданных сохранялся исходный набор данных, а именно первичный ключ записи в системе‑источнике и сгенерированный суррогатный ключ. Таким образом, суррогатные ключи синхронизировались только на момент их миграции. При последующих загрузках данных эти ключи для одинаковых внешних идентификаторов в разных хранилищах начинали различаться. Для решения этой проблемы внедрили механизм формирования таблицы соответствия внутренних суррогатных ключей детального слоя единому глобальному межплатформенному UID.

Значение UID определялось как результат функции хеширования MD5, которая принимает конкатенацию:

  • наименования K‑таблицы, в которой хранится внутренний ключ (k_tbl_name);

  • условное обозначение системы источника (info_system_type_cd + info_system_inst_cd);

  • значение полей, формирующих первичный ключ записи в системе‑источника (nk_column_name_1, nk_column_name_2 … nk_column_name_N).

UID = md5(«'^'||||^'||||'^'||< info_system_inst_cd>||'^'||||'^'||||'^'...||»)

Результат формирования UID сохранялся в отдельной таблице k_uid_map с такой структурой:

344592f56821dd58d3d7d6336be300ff.png

Это таблица позволила соотнести между собой записи из двух платформ.

Реализация решения

Метаданные решения

Для реализации процесса сверки данных, в соответствии с нашей архитектурой,  мы внедрили регулярную выгрузку данных из хранилищ Teradata и Greenplum в кластер Hadoop. Для упрощения добавления новых таблиц в процесс сверки реализовали такую схему. На стороне Teradata и Greenplum создали по две таблицы с метаданными для выгрузки и последующей сверки. Первая таблица, именуемая comparison_obj, содержала параметры для выгрузки:

  • источник данных для выгрузки (info_system_type_cd + info_system_inst_cd + table_name);

  • поле, позволяющее разделить сверяемые данные на блоки, например для выгрузки по различным критериям (block);

  • условие фильтрации для выгрузки данных (filter);

  • список полей, составляющий первичный ключ записи (key_list);

  • список полей, содержащих данные о начале и окончании действия записи для исторических таблиц (history_fields_list);

  • правила трансформации данных при выгрузке в виде SQL‑выражений (transformation_rule).

Трансформации потребовались для устранения расхождений, связанных с особенностями платформ или спецификой миграции отдельных таблиц. Например, для удаления пробелов в конце строк или приведения данных к единому регистру. Пример такой настройки:

2d0c79e2d62b3b9c5e13760bd8ae7931.png

Вторая таблица, comparison_obj_x_uid_map, обеспечивала выполнение второго требования к процессу сверки, содержа в себе правила для сопоставления внутренних ключей (ID), которые различались между Teradata и Greenplum, с глобальными межплатформенными идентификаторами (UID), общими для обеих систем. Это позволяло унифицировать идентификацию данных в процессе сверки. Пример настройки:

c28383d31ec6d2e75d4367927ba8e9ac.png

В указанной таблице, поля info_system_type_cd, info_system_inst_cd, block и table_name определяют объект, подлежащий сверке. Поле attribute_name обозначает имя атрибута, значение которого следует заменить на UID. k_table_name указывает на наименование K‑таблицы, используемой для генерации ключей. Поле k_table_info_system_type_cd содержит код системы‑источника, связанный с таблицей межсистемных идентификаторов, и заполняется в случаях, когда для объекта сверки требуется использовать таблицу k_uid_map из другого источника данных, например, при сверке данных витрины.

Выгрузка данных в Hadoop

Для выполнения выгрузки в Hive создали в кластере Hadoop внешние таблицы для каждого выгружаемого объекта сверки. Шаблон имени включал в себя идентификаторы платформы данных (td или gp), а также метаданные сверяемого объекта. Например, td_083_000 001_1234_t_cust и gp_083_000 001_1234_t_cust. Таблицы создавали партицироваными по идентификатору выгрузки (workflow_run_id_part), что позволяло хранить каждую выгрузку данных в отдельной партиции.

Процесс выгрузки автоматизировали с помощью сервисных процедур в Teradata и функций в Greenplum, принимающих в качестве параметров идентификаторы системы и объекта выгрузки. Динамический SQL‑запрос, сформированный на основе таблицы с метаданными (comparison_obj) и словаря базы данных, запускал процедуру записи данных в Hadoop.

В случае с Teradata данные вставляли в Hive‑таблицы на стороне Hadoop через сервер QueryGrid. Сформированный запрос выглядел так:

INSERT INTO custom_t_dmcomparison_src.td_083_000001_1234_t_cust@QGForeigServer
(
  start_dt,
  end_dt,
  cust_id,
  okfs_id,
  vsp_id,
  tb_id,
  osb_id,
  /* ... остальные поля таблицы, выгружаемые без изменений */
)
SELECT 
  /* Поля, определяющие период действия записи и поля, содержащие идентификаторы */
  TO_CHAR(start_dt),
  end_dt,
  cust_id,
  okfs_id,
  /* Поля с заданными правилами преобразования transformation_rule из настройки comparison_obj */
  CASE WHEN vsp_id = -1 THEN 1 ELSE 0 END AS vsp_id,
  -1 AS tb_id,
  -1 AS osb_id,
  /* ... остальные поля таблицы, выгружаемые без изменений */
  3454354 AS workflow_run_id_part /* Идентификатор выгрузки данных для новой партиции */
FROM 
  /* Выгружаемая таблица, определяется настройкой info_system_type_cd+info_system_inst_cd+table_name в comparison_obj */
  prd3_1_083_db_dwh.t_cust 
WHERE 
  /* Применяется filter из настройки comparison_obj */
  end_dt >= DATE '2023-05-15' - INTERVAL '10' DAY

Шаг 1. На стороне Greenplum необходимо было предварительно пересоздать внешнюю таблицу, так как для выгрузки в Hadoop через фреймворк PXF используется HDFS: profile и необходимо было указать новый путь в HDFS для сохранения данных. Пример SQL‑команд для этого шага:

DROP EXTERNAL TABLE IF EXISTS s_grnplm_as_t_didsd_083_db_stg.gp_083_000001_1234_t_cust_ext_wr;

CREATE WRITABLE EXTERNAL TABLE s_grnplm_as_t_didsd_083_db_stg.gp_083_000001_1234_t_cust_ext_wr
(
  start_dt DATE,
  end_dt DATE,
  cust_id BIGINT,
  okfs_id BIGINT,
  vsp_id BIGINT,
  tb_id BIGINT,
  osb_id BIGINT,
  /* ... остальные поля таблицы */
)
LOCATION ('pxf:///data/custom/t/comparison/src/gp/083/000001/1234/t_cust/workflow_run_id_part=453533?profile=hdfs:parquet&SERVER=sdpbdm_comparison')
FORMAT 'custom' (formatter = 'pxfwritable_export');

Шаг 2. После пересоздания внешней таблицы данные могут быть записаны в Parquet-файлы в Hadoop. Пример SQL‑команды для этого шага:

INSERT INTO s_grnplm_as_t_didsd_083_db_stg.gp_083_000001_1234_t_cust_ext_wr
(
  start_dt,
  end_dt,
  cust_id,
  okfs_id,
  vsp_id,
  tb_id,
  osb_id,
  /* ... остальные поля таблицы, выгружаемые без изменений */
)
SELECT 
  TO_CHAR(start_dt),
  end_dt,
  cust_id,
  okfs_id,
  /* Поля с заданными правилами преобразования из настройки comparison_obj */
  CASE WHEN vsp_id = -1 THEN 1 ELSE 0 END AS vsp_id,
  -1 AS tb_id,
  -1 AS osb_id,
  /* ... остальные поля таблицы, выгружаемые без изменений */
FROM 
  s_grnplm_as_t_didsd_083_db_dwh.t_cust 
WHERE  
  end_dt >= DATE '2023-05-15' - INTERVAL '10' DAY;

Данные для сверки выгружались из Teradata и Hadoop регулярно. На каждой из платформ одновременно параллельно запускали нескольких экземпляров процедуры и функции выгрузки с параметрами для выгрузки необходимых объектов. Для этого использовали собственный механизм параллельного запуска SQL‑команд с возможностью указания количества одновременно выполняемых потоков.

Подготовительные работы по выгрузке данных

Описанная выше реализация решения по выгрузке существенно упростила необходимые работы. В основном, требовалось только создать серию внешних таблиц Hive и заполнить соответствующие метаданные для выгрузки.

Для создания DDL‑скриптов Hive‑таблиц мы использовали немного модифицированную функцию, ранее использованную для создания таблиц в процессе миграции данных, таким образом, чтобы она выдавала два набора таблиц, соответствующих двум платформам выгрузки.

Метаданные для сверки сгенерировали на основе source‑to‑target-описаний (s2t) загрузок данных, которые создавали в Excel. В s2t были необходимые данные для формирования метаданных, используемых в процессе сверки.

Spark-приложение для сверки данных в кластере Hadoop

Как было отмечено выше, данные между двумя платформами сверяли через приложение на Spark, написанное на Scala. Этот процесс включал работу в себя с объектами, определёнными в таблице метаданных comparison_obj. Важно отметить, что помимо данных, подлежащих сверке, таблицы с метаданными (comparison_obj и comparison_obj_x_uid_map) из Greenplum загружались в Hadoop ежедневно. Метаданные из Greenplum служили основой и являлись мастер‑данными для приложения сверки.

Описание шагов выполнения алгоритма сверки данных:

  1. Выбираем актуальные данные из последних доступных партиций Hive‑таблиц, содержащих данные, загруженные из Teradata и Greenplum. При этом исключаются все технические поля, такие как идентификаторы загрузки и признаки изменений записей.

  2. Объединяем с глобальными межплатформенными идентификаторами UID. Далее, данные, полученные на предыдущем этапе, объединяем с таблицами глобальных идентификаторов (k_uid_map) согласно метаданным (comparison_obj_x_uid_map), что позволяет заменить суррогатные ключи платформ на универсальные глобальные идентификаторы.

  3. Учёт историчности. Если для сверяемого объекта указаны поля историчности (comparison_obj.history_fields_list), то соединяем с календарём на заданную глубину для перехода от периодов к срезам на конкретную дату. В случае отсутствия таких полей соединяем с календарём на текущую дату. Таким образом, исторические таблицы мы сравнивали на каждую дату внутри интервалов сверки, которые по умолчанию составляли 10 дней.

  4. Рассчитывали хеш-значения по всем полям наборов данных, за исключением полей, являющихся ключами для соединения наборов данных, полей историчности (если такие указаны).

  5. Полностью внешне соединяли (FULL OUTER JOIN) наборы данных из п.4 согласно настройке comparison_obj.key_list.

  6. Финальные результаты сверки фиксировали в трёх таблицах. Первая содержит подробные результаты по каждому объекту сверки, включая строки с расхождениями хеш‑значений или отсутствием ключей. Для новых сверяемых объектов таблицу детального результата сверки создавали при первом запуске сверки, а при повторных запусках результат сверки записывали в новую партицию. Следующая таблица (comparison_agg_result) содержала агрегированную информацию о результате сверки. В таблицу сохраняли следующие показатели сверки:

59b2d8c3cf832795d57a6ca7cda4302b.png

Пример агрегированного результата сверки данных исторической таблицы за последние 10 дней до даты начала сверки.

917a3643f9805dedac79b7c367ec27bd.png

Другой пример агрегированного вывода результатов сверки содержит данные о несоответствиях по атрибутам.

e2d6da2379af036e8b00e97a5a901d50.png

Этот вид агрегированного результата сверки был очень полезен для анализа расхождений в детальных данных, так как он наглядно отображал, какой именно атрибут приводил к расхождениям между платформами. Вот пример такого агрегированного результата сверки по атрибутам таблицы:

adc70b3469dffc1af8e0a17276f8d929.png

Результаты эксплуатации решения

Реализованное решение прежде всего позволило нам быстро выявить и устранить все несоответствия в процессах загрузки, возникшие в результате миграции хранилища с Teradata на Greenplum. Ежедневно мы сверяли около 10 Тб данных, что позволяло сверять практически все данные, загружаемые в каждый новый инкремент на обеих платформах. Так мы продемонстрировали пользователям, что качество данных в новом хранилище Greenplum соответствует уровню старого хранилища Teradata.

Заключение

Наш проект по миграции данных из Teradata в GreenPlum успешно завершён. Проведённая сверка дала нам уверенность в том, что качество данных в новой системе Greenplum находится на уровне проверенного временем хранилища Teradata, что мы смогли подтвердить нашим пользователям. Проект миграции обеспечил непрерывность и надёжность бизнес‑процессов на новой платформе.

© Habrahabr.ru