Введение в Clickhouse движок AggregatingMergeTree
В процессе разработки витрин данных часто возникает задача предоставления клиентам данных в агрегированном виде. Если данных в хранилище немного, то их можно агрегировать «на лету», но это плохая практика так как, чем больше будет копиться данных, тем дольше будут выполняться запросы, и тем больше Clickhouse будет съедать ресурсов. В этих случаях логично хранить данные в заранее агрегированном виде, вопрос лишь в том, как реализовать расчет данных агрегированных значений.
В интернете существуют много однотипных статей иллюстрирующих базовое использование материализованных представлений (далее — матвью) на движке AggregatingMergeTree, но если ваша задача выходит за рамки »1 нода, 1 метрика, 1 параметр агрегации» эти статьи вам мало чем помогут. Я посчитал, что моим коллегам может пригодиться своего рода гайд о том, как пользоваться данными представлениями для более сложных задач.
Гайд выполнен в виде шагов, иллюстрирующих мой путь в понимании данной концепции. Если я совершил какую-либо ошибку в процессе, и вы ее заметили, или у вас есть предложение по улучшению / дополнению данного гайда, прошу написать об этом в комментариях, уверен всем от этого будет только лучше.
В рамках моей задачи хранилище данных (далее — DWH) реализовано в виде реплицированного кластера состоящего из 3 нод, данные на ноды распределяются равномерно в соответствии с ключом сортировки таблиц. Существует исходная таблица source, которая содержит столбцы id, timecode_1, metric_data — данные представляют собой временной ряд утилизации ресурсов с гранулярностью 1 минута. Данные поступают блоками каждые 2 минуты.
Пользователи могут запрашивать данные с разной гранулярностью, поэтому наша задача — агрегировать и сохранить данные с разными периодами агрегации (2, 3, 5, 10 минут и т.д.).
Шаг 1. Матвью на 1 ноде с движком AggregatingMergeTree
Начнем с создания исходной таблицы source, в которой будут храниться данные нашего временного ряда. В таблице будут следующие столбцы:
id — идентификатор объекта;
timecode_1 — метка времени каждой точки данных с гранулярностью 1 минута;
metric_data — значение метрики для точки временного ряда
SQL запрос для создания source таблицы (вопросов оптимизации сейчас не касаемся):
CREATE TABLE source
(
id UInt32,
timecode_1 DateTime,
metric_data Float32
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timecode_1)
ORDER BY (id, timecode_1);
Теперь, когда у нас есть исходная таблица, мы можем создать матвью с движком AggregatingMergeTree, которая будет агрегировать данные по каждому объекту с 2-минутными интервалами:
CREATE MATERIALIZED VIEW source_aggregated_mv
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(timecode)
ORDER BY (id, timecode)
AS
SELECT id,
toStartOfInterval(timecode_1, toIntervalMinute(2)) AS timecode,
avgState(metric_data) AS metric_data
FROM source
GROUP BY id, timecode;
SELECT вычисляет время начала 2-минутного интервала для каждой точки данных и агрегирует данные с помощью функции avgState (). GROUP BY группирует данные по идентификатору и 2-минутным интервалам.
Важно отметить, что поля в GROUP BY в запросе материализованного представления и в ORDER BY в определении материализованного представления должны совпадать. Это связано с тем, что механизм AggregatingMergeTree использует ORDER BY для задания первичного ключа, который используется для сортировки данных перед их агрегированием и для правильного объединения данных.
Теперь, когда мы создали исходную таблицу и материализованное представление, вставим некоторые данные в исходную таблицу, чтобы увидеть, корректно ли работает материализованное представление.
Вставим первую группу данных:
INSERT INTO source (id, timecode_1, metric_data)
VALUES (1, '2023-03-05 00:00:00', 10),
(2, '2023-03-05 00:00:00', 15);
Сделаем запрос в матвью source_aggregated_mv, чтобы извлечь агрегированные данные. Помните, что мы должны использовать функцию avgMerge (), чтобы объединить ранее вычисленные состояния и сгруппировать их.
SELECT id,
timecode,
avgMerge(metric_data) AS metric_data
FROM source_aggregated_mv
GROUP BY id, timecode;
Он возвращает:
id | timecode | metric_data |
1 | 2023–03–05 00:00:00 | 10 |
2 | 2023–03–05 00:00:00 | 15 |
Вставим вторую группу данных:
INSERT INTO source (id, timecode_1, metric_data)
VALUES (1, '2023-03-05 00:01:00', 20),
(2, '2023-03-05 00:01:00', 25),
(1, '2023-03-05 00:02:00', 30);
Запрос возвращает:
id | timecode | metric_data |
1 | 2023–03–05 00:00:00 | 15 |
1 | 2023–03–05 00:02:00 | 30 |
2 | 2023–03–05 00:00:00 | 20 |
Как видите, матвью успешно справилась с задачей, корректно рассчитав агрегированные значения метрики.
Шаг 2. Матвью с передачей данных в таблицу AggregatedMergeTree на 1 ноде
Итак, мы успешно создали материализованное представление с движком AggregatingMergeTree, которое агрегирует данные из нашей исходной таблицы source с 2-минутными интервалами.
Забегу немного в будущее. Напомню, наша задача — агрегировать данные по нескольким временным интервалам. В обычной вьюхе эту задачу можно было бы решить с помощью оператора UNION ALL, но матвью в Clickhouse не поддерживает данный оператор. Будем решать задачу с помощью набора матвьюх, каждая из которых будут агрегировать данные со своим интервалом, а результат будем складывать в одну таблицу. Об этом подробно поговорим на шаге 3, а пока разберемся как сохранить агрегированные данные 1 матвью в таблицу.
Создадим целевую таблицу target с тем же движком. Мы также настроим процесс, который будет автоматически передавать данные из source в target с помощью матвью source_aggregated_mv. Обратите внимание, что операторы PARTITION BY и ORDER BY, используемые для целевой таблицы, идентичны операторам в матвью source_aggregated_mv из 1 шага.
Также будьте внимательны при задании типа данных, агрегационная функция и тип обрабатываемых данных должны быть заданы корректно, в обратном случае процесс работать не будет. Например, вы хотите получить последнее значение в столбце типа Decimal (9, 2), но в целевой таблице указали тип столбца AggregateFunction (argMax, UInt32, DateTime), тогда вы получите ошибку:
Code: 70. DB: Exception: Conversion from AggregateFunction (argMax, Decimal (9, 2), DateTime) to AggregateFunction (argMax, UInt32, DateTime) is not supported: while converting source column metric_data to destination column metric_data (CANNOT_CONVERT_TYPE)
Рекомендую перед созданием таблицы и матвью запустить SELECT на ограниченном объеме данных, чтобы выяснить типы.
SQL для создания target таблицы и матвью:
CREATE TABLE target
(
id UInt32,
timecode DateTime,
metric_data AggregateFunction(avg, Float32)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(timecode)
ORDER BY (id, timecode);
CREATE MATERIALIZED VIEW source_aggregated_mv TO target
AS
SELECT id,
toStartOfInterval(timecode_1, toIntervalMinute(2)) AS timecode,
avgState(metric_data) AS metric_data
FROM source
GROUP BY id, timecode;
Протестируем ETL процесс, вставив некоторые данные в source таблицу и проверив, правильно ли агрегируются и передаются данные в target таблицу. Вставим тестовые данные теми же блоками, как и в предыдущей задаче:
INSERT INTO source (id, timecode_1, metric_data)
VALUES (1, '2023-03-05 00:00:00', 10),
(2, '2023-03-05 00:00:00', 15);
Запросим данные из целевой таблицы:
SELECT id,
timecode,
avgMerge(metric_data) AS metric_data
FROM target
GROUP BY id, timecode;
Запрос вернет:
id | timecode | metric_data |
1 | 2023–03–05 00:00:00 | 10 |
2 | 2023–03–05 00:00:00 | 15 |
Вставим в таблицу source вторую пачку данных:
INSERT INTO source (id, timecode_1, metric_data)
VALUES (1, '2023-03-05 00:01:00', 20),
(2, '2023-03-05 00:01:00', 25),
(1, '2023-03-05 00:02:00', 30);
Запрос в target вернет:
id | timecode | metric_data |
1 | 2023–03–05 00:00:00 | 15 |
1 | 2023–03–05 00:02:00 | 30 |
2 | 2023–03–05 00:00:00 | 20 |
Как мы видим, данные были правильно агрегированы и переданы в целевую таблицу с помощью материализованного представления.
Шаг 3. Агрегируем данные с разными периодами
Как я сказал ранее, пользователи хотят запрашивать данные, агрегированные на разных временных интервалах. Нам надо эти агрегированные данные заранее подготовить, и эту задачу мы будем решать с помощью нескольких матвьюх, передающих результаты в одну таблицу.
Таким образом, следующий пункт — разработать процесс, который агрегирует данные с различными временными интервалами и сохраняет результат в целевой таблице.
Первый шаг — добавить столбец timespan в запрос CREATE для target таблицы:
CREATE TABLE target
(
id UInt32,
timespan UInt32,
timecode DateTime,
metric_data AggregateFunction(avg, Float32)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(timecode)
ORDER BY (id, timespan, timecode);
Следующим шагом является создание материализованных представлений с функциями AggregatingMergeTree для агрегирования данных. Матвью source_aggregated_2_mv агрегирует данные с интервалом в 2 минуты, source_aggregated_3_mv делает то же самое, но с интервалом в 3 минуты:
CREATE MATERIALIZED VIEW source_aggregated_2_mv TO target
AS
SELECT id,
2 AS timespan,
toStartOfInterval(timecode, INTERVAL 2 MINUTE) AS timecode,
avgState(metric_data) AS metric_data
FROM source
GROUP BY id,
timespan,
toStartOfInterval(timecode, INTERVAL 2 MINUTE);
CREATE MATERIALIZED VIEW source_aggregated_3_mv TO target
AS
SELECT id,
3 AS timespan,
toStartOfInterval(timecode, INTERVAL 3 MINUTE) AS timecode,
avgState(metric_data) AS metric_data
FROM source
GROUP BY id,
timespan,
toStartOfInterval(timecode, INTERVAL 3 MINUTE);
Как видите, материализованные представления в основном такие же, как и раньше, но с некоторыми изменениями:
Мы добавили столбец timespan после столбца id. Значение этого столбца равно 2 для первой матвьюхи и 3 для второй, и это постоянное значение, которое не рассчитывается на основе данных.
Мы также добавили временной интервал в качестве группирующего столбца в предложении GROUP BY, чтобы он был включен в группировку вместе с идентификатором и временным кодом.
Проверим работоспособность. Вставим данные:
INSERT INTO source (id, timecode_1, metric_data)
VALUES (1, '2023-03-05 00:00:00', 10),
(2, '2023-03-05 00:00:00', 15);
Затем мы обратимся к target таблице, используя следующий запрос:
SELECT id,
timespan,
timecode,
avgMerge(metric_data) AS metric_data
FROM target
GROUP BY id, timespan, timecode;
Получили следующее:
id | timespan | timecode | metric_data |
1 | 2 | 2023–03–05 00:00:00 | 10 |
1 | 3 | 2023–03–05 00:00:00 | 10 |
2 | 2 | 2023–03–05 00:00:00 | 15 |
2 | 3 | 2023–03–05 00:00:00 | 15 |
Вставим второй набор данных в source таблицу:
INSERT INTO source (id, timecode_1, metric_data)
VALUES (1, '2023-03-05 00:01:00', 20),
(2, '2023-03-05 00:01:00', 25),
(1, '2023-03-05 00:02:00', 30);
Запустим тот же запрос и получим:
id | timespan | timecode | metric_data |
1 | 2 | 2023–03–05 00:00:00 | 15 |
1 | 2 | 2023–03–05 00:02:00 | 30 |
1 | 3 | 2023–03–05 00:00:00 | 20 |
2 | 2 | 2023–03–05 00:00:00 | 20 |
2 | 3 | 2023–03–05 00:00:00 | 20 |
Как видите, данные были точно агрегированы с различными интервалами и перемещены в target таблицу.
Шаг 4. Масштабируем решение на кластер
Мы разобрались как создавать инфраструктуру, которая агрегирует данные на разных периодах и складывает их в одну таблицу. Теперь пользователь может выбирать гранулярность данных, запрашивать их и очень быстро получать ответ. Никакой агрегации в запросе, всё «летает». Но все действия мы производили в рамках одной ноды. Теперь же наша задача масштабировать наше решение на реплицированный кластер, состоящий из 3 нод.
Не буду показывать как создавать источник данных, лишь уточню, что теперь он состоит из таблицы source с движком Distributed и таблицы source_shard с движком ReplicatedMergeTree.
Целевая таблица теперь также состоит из двух, покажем создание этих таблиц:
CREATE TABLE target ON CLUSTER ‘cluster’
(
id UInt32,
timespan UInt32,
timecode DateTime,
metric_data AggregateFunction(avg, Float32)
)
ENGINE = Distributed ('cluster', ‘target_shard’, id);
CREATE TABLE target_shard ON CLUSTER ‘cluster’
(
id UInt32,
timespan UInt32,
timecode DateTime,
metric_data AggregateFunction(avg, Float32)
)
ENGINE = ReplicatedAggregatingMergeTree(/clickhouse/tables‘/{shard}/target ’, ‘{replica}’)
PARTITION BY toYYYYMM(timecode)
ORDER BY (id, timespan, timecode);
Последний шаг — создать матвьюхи, которые будут агрегировать данные с разными интервалами. Важно заметить, первая таблица, к которой обращается матвью должна быть локальной. В документации написано, что матвью может обращаться и к локальной, и к распределенной таблице, но моя практика показала, что если обратиться к Distributed таблице, матвью не обработает данные. Плюс разработчики clickhouse рекомендуют при работе с матвью пользоваться схемой «from local to local», так как это эффективнее, снижает нагрузку на сеть, а значит быстрее. Вставлять данные из матвью также желательно в локальную таблицу на той же ноде, причину описал выше.
CREATE MATERIALIZED VIEW source_aggregated_2_mv ON CLUSTER ‘cluster’ TO target_shard
AS
SELECT id,
2 AS timespan,
toStartOfInterval(timecode, INTERVAL 2 MINUTE) AS timecode,
avgState(metric_data) AS metric_data
FROM source_shard
GROUP BY id,
timespan,
toStartOfInterval(timecode, INTERVAL 2 MINUTE);
CREATE MATERIALIZED VIEW source_aggregated_3_mv ON CLUSTER ‘cluster’ TO target_shard
AS
SELECT id,
3 AS timespan,
toStartOfInterval(timecode, INTERVAL 3 MINUTE) AS timecode,
avgState(metric_data) AS metric_data
FROM source_shard
GROUP BY id,
timespan,
toStartOfInterval(timecode, INTERVAL 3 MINUTE);
Проверим работоспособность. Вставим данные в распределенную таблицу:
INSERT INTO source (id, timecode_1, metric_data)
VALUES (1, '2023-03-05 00:00:00', 10),
(2, '2023-03-05 00:00:00', 15);
Затем мы обратимся к распределенной target таблице, используя следующий запрос:
SELECT id,
timespan,
timecode,
avgMerge(metric_data) AS metric_data
FROM target
GROUP BY id, timespan, timecode;
Получили следующее:
id | timespan | timecode | metric_data |
1 | 2 | 2023–03–05 00:00:00 | 10 |
1 | 3 | 2023–03–05 00:00:00 | 10 |
2 | 2 | 2023–03–05 00:00:00 | 15 |
2 | 3 | 2023–03–05 00:00:00 | 15 |
Вставим второй набор данных в source таблицу:
INSERT INTO source (id, timecode_1, metric_data)
VALUES (1, '2023-03-05 00:01:00', 20),
(2, '2023-03-05 00:01:00', 25),
(1, '2023-03-05 00:02:00', 30);
Запустим тот же запрос и получим:
id | timespan | timecode | metric_data |
1 | 2 | 2023–03–05 00:00:00 | 15 |
1 | 2 | 2023–03–05 00:02:00 | 30 |
1 | 3 | 2023–03–05 00:00:00 | 20 |
2 | 2 | 2023–03–05 00:00:00 | 20 |
2 | 3 | 2023–03–05 00:00:00 | 20 |
Как видите, данные были точно агрегированы с различными интервалами и перемещены в target таблицу.
Подведем итог:
Теперь наши данные хранятся в заранее агрегированном виде, распределены между 3 нодами, реплицированы; при запросе пользователем данных с определенной гранулярностью Clickhouse не придется ничего агрегировать с нуля, а значит данные вернутся быстрее. Также данный подход позволит «размазать» нагрузку на DWH, которая бы возникла при обычной агрегации, во времени. Как видите, выигрывает и клиент, и DWH.
PS:
Помните, AggregatingMergeTree — это не серебряная пуля в решении задачи агрегации и хранения агрегированных данных. Данные хранятся в предагрегированном виде, и при запросе их все равно придется преобразовывать. Поэтому, если вы можете организовать агрегацию и хранение данных вне таблиц с AggregatingMergeTree, делайте это.