Как работать со словарями данных и оптимизировать запросы в ClickHouse
Приветствуем! На связи вновь Глеб Кононенко и Алексей Диков — разработчики из Лиги Цифровой Экономики. Ранее мы уже немного рассказывали про наш опыт работы с распределенными таблицами в ClickHouse в этой статье.
Сегодня хотим поделиться опытом оптимизации запросов и работы со словарями данных. Используемая версия ClickHouse: 23.8.7.24
Напомним характеристики нашего проекта:
Данные грузятся каждые 15 минут
Постоянно приходит дублирующая информация
Необходимо хранить данные в течение 5 лет
В среднем в сутки приходит 150 млн строк (пик — до 13 млрд/сут)
В базе 1266 млрд строк, в сжатом виде 61 Тб, в несжатом — 585 Тб
Оглавление
Создание тестовых таблиц
Джойн. Первые шаги, важность порядка
Настройки для запросов
Пример оптимизации в ClickHouse
Работа со словарями
Создание тестовых таблиц
Создадим две таблицы на движке MergeTree. Первая таблица — факты, в которой поля типа String будем сразу по умолчанию наполнять строками разной длины:
create table fct_contract(
org_id UUID,
contract_num String
DEFAULT randomPrintableASCII(randUniform(10, 15)),
name String
DEFAULT randomPrintableASCII(randUniform(15, 30)),
value UInt32,
create_date Date
)
ENGINE = MergeTree
ORDER BY (org_id,contract_num);
Вторая таблица — справочник, поле ИНН генерируется случайным образом в пределах 10 тысяч значений:
create table dim_name(
org_id UUID,
name String,
inn UInt64 DEFAULT randUniform(100000000000, 100000010000)
)
ENGINE MergeTree
ORDER BY (org_id, name);
Вставим в первую таблицу 15 млн записей:
insert into fct_contract (org_id, value, create_date)
SELECT
UUIDNumToString(murmurHash3_128(toString(rand()%10000))) as org_id,
value,
now()::date - randUniform(1, 10000)
FROM generateRandom('value UInt32')
LIMIT 15000000;
Джойн. Первые шаги, важность порядка
Пока у нас есть одна наполненная и одна пустая таблицы. Даже в этом случае операция джойн может не выполниться из-за нехватки памяти, если мы неправильно выберем порядок соединения таблиц.
Попробуем. Для этого ограничим память для выполнения запросов в ~1 гигабайт:
select org_id, contract_num, name, value, create_date, inn
from dim_name
join fct_contract
on fct_contract.org_id = dim_name.org_id
and fct_contract.name = dim_name.name
settings max_memory_usage = 1000000000;
Памяти не хватает, и мы получаем ошибку:
Received exception from server (version 23.8.7):
Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 955.75 MiB (attempt to allocate chunk of 4244058 bytes), maximum: 953.67 MiB.: (while reading column name): (while reading from part /var/lib/clickhouse/store/d30/d30dbd5e-bad0-4bb1-859c-7f26f955c2cc/all_186_186_0/ in table default.fct_contract (d30dbd5e-bad0-4bb1-859c-7f26f955c2cc) located on disk default of type local, from mark 62 with max_rows_to_read = 8192): While executing MergeTreeThread. (MEMORY_LIMIT_EXCEEDED)
А теперь поменяем таблицы в запросе местами и запустим снова:
select org_id, contract_num, name, value, create_date, inn
from fct_contract
join dim_name
on fct_contract.org_id = dim_name.org_id
and fct_contract.name = dim_name.name
settings max_memory_usage = 1000000000;
0 rows in set. Elapsed: 0.011 sec. Processed 262.14 thousand rows, 19.39 MB (23.49 million rows/s., 1.74 GB/s.)
Peak memory usage: 24.86 MiB.
Запрос отрабатывает, ничего не возвращая, потому что справочник в настоящий момент не заполнен. Дело в том, что ClickHouse при операции соединения в первую очередь загружает в оперативную память правую таблицу из запроса.
Наполним наш справочник:
insert into dim_name (org_id, name)
select org_id, name from fct_contract;
Попробуем опять выполнить запрос:
select org_id, contract_num, name, value, create_date, inn
from fct_contract
join dim_name
on fct_contract.org_id = dim_name.org_id
and fct_contract.name = dim_name.name
settings max_memory_usage = 1000000000;
Received exception from server (version 23.8.7):
Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 1.38 GiB (attempt to allocate chunk of 1342177280 bytes), maximum: 953.67 MiB.: While executing FillingRightJoinSide. (MEMORY_LIMIT_EXCEEDED)
Теперь и ему не хватает памяти… Попробуем это исправить.
Настройки для запросов
У ClickHouse есть разные алгоритмы соединения таблиц. По умолчанию используется hash-алгоритм, который записывает хеш-значения для соединяемых полей правой таблицы в оперативную память. Затем он ищет среди хешей совпадения для данных из левой таблицы. Такой процесс достаточно эффективен по скорости выполнения, но не очень — в плане потребления ресурсов.
Медленный, но менее ресурсозатратный алгоритм — partial_merge. При его использовании сначала сортируются ключи правой таблицы и затем поблочно (partial_merge_join_rows_in_right_blocks) записываются в оперативную память вместе с индексами каждого блока, до тех пор пока корзина (выделенный на запрос объем оперативной памяти) не заполнится. Затем данные сбрасываются на диск. Далее такая же операция происходит с левой таблицей, и уже затем данные сопоставляются с использование min-max индексов.
Подробнее про алгоритмы вы можете прочитать в этой статье.
Включим этот алгоритм:
select org_id, contract_num, name, value, create_date, inn
from fct_contract
join dim_name
on fct_contract.org_id = dim_name.org_id
and fct_contract.name = dim_name.name
settings max_memory_usage = 1000000000,
join_algorithm = 'partial_merge',
default_max_bytes_in_join= 500000000;
15000000 rows in set. Elapsed: 22.969 sec. Processed 30.00 million rows, 1.94 GB (1.31 million rows/s., 84.24 MB/s.)
Peak memory usage: 927.08 MiB.
Отлично! Наш запрос отработан за 23 секунды.
На какие еще настройки нужно обратить внимание?
Рассмотрим ситуацию, в которой нам надо упорядочить таблицу fct_contract не по заданной при создании таблицы сортировке, а по произвольному полю:
select org_id, contract_num, name, value, create_date
from fct_contract
order by name
settings max_memory_usage = 1000000000;
Received exception from server (version 23.8.7):
Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 954.38 MiB (attempt to allocate chunk of 4614592 bytes), maximum: 953.67 MiB.: (avg_value_size_hint = 31.0965576171875, avg_chars_size = 27.715869140625, limit = 16384): (while reading column name): (while reading from part /var/lib/clickhouse/store/d30/d30dbd5e-bad0-4bb1-859c-7f26f955c2cc/all_186_186_0/ in table default.fct_contract (d30dbd5e-bad0-4bb1-859c-7f26f955c2cc) located on disk default of type local, from mark 86 with max_rows_to_read = 16384): While executing MergeTreeThread. (MEMORY_LIMIT_EXCEEDED)
Скрипт не отрабатывает. Чтобы это исправить, добавим настройку max_bytes_before_external_sort. Она будет сбрасывать на диск отсортированные данные при достижении указанного значения. Выделим на сортировку 500 Мб. Вот только этого может не хватить, чтобы избежать ошибки с недостатком памяти. Дело в том, что ClickHouse умеет запускать несколько потоков для выполнения запроса, и для каждого будет работать наше ограничение в 500 Мб.
Чтобы этого избежать, требуется ограничить количество потоков. Это можно сделать с помощью настройки max_threads:
select org_id, contract_num, name, value, create_date
from fct_contract
order by name
settings max_memory_usage = 1000000000,
max_bytes_before_external_sort = 500000000,
max_threads = 1;
15000000 rows in set. Elapsed: 19.259 sec. Processed 15.28 million rows, 1.13 GB (793.45 thousand rows/s., 58.72 MB/s.)
Peak memory usage: 539.32 MiB.
Готово!
А теперь мы захотели сгруппировать таблицу по полю, не входящему в значения «order by». По логике архитектуры в ClickHouse у нас не должно быть таких запросов, они явно сигнализируют о том, что мы неправильно задали ключ сортировки. Но будем исходить из того, что это разовая история.
select name, sum(value)
from fct_contract
group by name
settings max_memory_usage = 1000000000;
Received exception from server (version 23.8.7):
Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 958.75 MiB (attempt to allocate chunk of 0 bytes), maximum: 953.67 MiB.: While executing AggregatingTransform. (MEMORY_LIMIT_EXCEEDED)
Добавим настройку для группировки, которая будет также сбрасывать данные на диск после достижения выделенного объема. Теперь отсортируем результат и все сделаем в одном потоке.
select name, sum(value)
from fct_contract
group by name
order by sum(value) desc
settings max_memory_usage = 1000000000,
max_bytes_before_external_sort = 500000000,
max_bytes_before_external_group_by = 500000000,
max_threads = 1;
15000000 rows in set. Elapsed: 14.964 sec. Processed 35.51 million rows, 1.44 GB (2.37 million rows/s., 96.55 MB/s.)
Peak memory usage: 523.50 MiB.
Следующая задача: нам надо соединить таблицы и просуммировать значения.
select inn, sum(value)
from fct_contract
join dim_name
on fct_contract.org_id = dim_name.org_id
and fct_contract.name = dim_name.name
group by inn
settings max_memory_usage = 1000000000,
join_algorithm = 'partial_merge',
default_max_bytes_in_join= 500000000,
max_bytes_before_external_group_by= 500000000,
max_threads = 1;
Received exception from server (version 23.8.7):
Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 955.32 MiB (attempt to allocate chunk of 5077241 bytes), maximum: 953.67 MiB.: While executing SourceFromNativeStream. (MEMORY_LIMIT_EXCEEDED)
Мы снова получаем ошибку из-за ограничений памяти. На текущий момент при соединениях настройки сортировки и группировки не отрабатывают должным образом.
Пример оптимизации в ClickHouse
Мы решили проанализировать наш предыдущий скрипт и пришли к выводу, что нам достаточно суммировать значения по конкретному org_id. Давайте попробуем запустить скрипт без каких-либо дополнительных настроек, предварительно выбрав любой org_id из сгенерированных в нашей таблице:
select inn, sum(value)
from fct_contract as contract
join dim_name
on contract.org_id = dim_name.org_id
and contract.name = dim_name.name
where org_id = '9f98b98b-1935-2faf-0002-f45721ee9f37'
group by inn
settings max_memory_usage = 1000000000;
Received exception from server (version 23.8.7):
Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (for query) exceeded: would use 1.38 GiB (attempt to allocate chunk of 1342177280 bytes), maximum: 953.67 MiB.: While executing FillingRightJoinSide. (MEMORY_LIMIT_EXCEEDED)
Вполне ожидаемо: наш запрос не помещается в оперативную память. Добавим настройки:
select inn, sum(value)
from fct_contract as contract
join dim_name
on contract.org_id = dim_name.org_id
and contract.name = dim_name.name
where org_id = '9f98b98b-1935-2faf-0002-f45721ee9f37'
group by inn
settings max_memory_usage = 1000000000,
join_algorithm = 'partial_merge',
default_max_bytes_in_join= 500000000;
1376 rows in set. Elapsed: 5.245 sec. Processed 15.05 million rows, 827.51 MB (2.87 million rows/s., 157.77 MB/s.)
Peak memory usage: 857.55 MiB.
Запрос выполнился, но насколько оптимально?
При соединении таблиц в ClickHouse крайне важна предварительная выборка данных. Давайте из обеих таблиц отфильтруем данные в подзапросах до джойна и посмотрим, что получится:
select inn, sum(value)
from (
select org_id, name, value
from fct_contract
where org_id = '9f98b98b-1935-2faf-0002-f45721ee9f37'
) as contract
join (
select org_id, name, inn
from dim_name
where org_id = '9f98b98b-1935-2faf-0002-f45721ee9f37'
) as dim
on contract.org_id = dim.org_id
and contract.name = dim.name
group by inn
settings max_memory_usage = 1000000000;
1376 rows in set. Elapsed: 0.010 sec. Processed 90.11 thousand rows, 1.55 MB (9.43 million rows/s., 162.36 MB/s.)
Peak memory usage: 10.35 MiB.
Мы видим, что по итогам предварительного отбора данных вместо чтения 15 миллионов строк было прочитано 90 тысяч, пик потребления памяти уменьшился более чем в 80 раз. Скорость выполнения выросла в сотни раз.
Работа со словарями
Но если мы не можем отобрать данные из таблиц и нам целиком надо их соединить и агрегировать?
Есть в ClickHouse такая сущность, как словарь. Словари бывают внутренние и внешние (хранят данные из внешних источников). Они могут быть использованы для оптимизации запросов как альтернативы операциям соединения (join-ам). Словари хранят данные в парадигме ключ-значение частично или полностью в оперативной памяти. Ключ для значения указывается в секции Primary Key, перечисление атрибутов в теле запроса обязательно должно начинаться с ключа.
Неверно:
CREATE DICTIONARY dic_name (name String, id UInt64) PRIMARY KEY id..
Верно:
CREATE DICTIONARY dic_name (id UInt64, name String) PRIMARY KEY id...
Кроме того, при создании словаря нам необходимо указывать следующие секции:
Source — источник, которым может быть как таблица внутри самого ClickHouse, так и информация из внешних источников (другая СУБД, внешний файл или http-источник).
Lifetime — периодичность автоматического обновления словаря из источника.
Layout — тип размещения словаря в памяти.
Создадим на основе нашей таблицы-справочника словарь. В качестве типа размещения выберем complex_key_hashed_array, так как соединение словаря с таблицей будет происходить по нескольким полям. Такой тип более экономичен по потреблению памяти по сравнению с рекомендуемым в документации complex_key_hashed.
Подробнее про типы хранения словарей в памяти можно почитать в официальной документации.
CREATE DICTIONARY dic_dim_name
(
org_id UUID,
name String,
inn UInt64
)
PRIMARY KEY org_id, name
SOURCE(CLICKHOUSE(DB 'default' TABLE 'dim_name'))
LIFETIME(0)
LAYOUT(complex_key_hashed_array());
Запустим запрос словарю, чтобы он прогрузился, и проверим объем занимаемой оперативной памяти:
select *
from dic_dim_name;
select name, formatReadableSize(bytes_allocated)
from system.dictionaries
where name = 'dic_dim_name';
┌─name─────────┬─formatReadableSize(bytes_allocated)─┐
│ dic_dim_name │ 910.30 MiB │
└──────────────┴─────────────────────────────────────┘
В нашем случае словарь занял 910.30 MiB. Теперь попробуем сделать нашу агрегацию, но вместо операции соединения мы извлекаем значения из словаря с помощью функции dictGet. В ней указываем наименование словаря, нужные поля и ключ соединения. Если ключ соединения составной, как в нашем случае, его необходимо собрать в кортеж, обернув в функцию tuple ().
select
dictGet('dic_dim_name', 'inn', tuple(org_id, name)) as inn,
sum(value)
from fct_contract
group by inn;
10000 rows in set. Elapsed: 0.888 sec. Processed 15.00 million rows, 765.01 MB (16.89 million rows/s., 861.45 MB/s.)
Peak memory usage: 20.56 MiB.
Мы не только получили верный результат, наш запрос отработал за 888 миллисекунд! В то время как предыдущий запрос без агрегации, но с джойном двух таблиц по алгоритму partial_merge отрабатывал за 23 секунды.
У функции dictGet есть альтернативы. Для добавления значений по умолчанию при отсутствии совпадения можно использовать dictGetOrDefault, это аналог Left Join. С помощью dictHas в секции where мы может отрезать часть данных таблицы, для которых нет соответствующего ключа в словаре. В документации описаны и другие функции.
Словари значительно эффективнее джойнов: с их помощью можно увеличить скорость расчетов, а также извлекать данные из разных источников (другие базы, файлы и даже URL). ClickHouse прекрасно работает со словарями и позволяет размещать в них десятки миллионов значений.
Но есть и минусы:
1) Данные могут устаревать, и словарям надо обновляться. Мы можем настроить период жизни данных, через который словарь сделает это автоматически, но для больших словарей это может занять довольно длительное время.
2) Словарь загружается в память и хранится там на постоянной основе:
при обращении к нему командой select * from dict_name;
командой system reload dictionary dict_name;
при старте сервера, если параметр dictionaries_lazy_load выставлен в false.
Словари могут забирать много оперативной памяти и сами ее не освобождают. При dictionaries_lazy_load=false не поможет даже перезагрузка сервера. Если словарь постоянно используется и есть выгода в скорости обработки данных, то все хорошо. Но если он применяется однократно в каком-то большом расчете, стоит вытолкнуть его из оперативной памяти. Для этого можно открепить словарь (detach), при этом СУБД «забудет» о его существовании, и он будет вытеснен из памяти, но все метаданные будут сохранены. После этого подсоединяем словарь обратно.
Открепляем словарь:
detach dictionary dic_dim_name;
Возвращаем на место:
attach dictionary dic_dim_name;
Проверим занимаемое место:
select name, formatReadableSize(bytes_allocated)
from system.dictionaries
where name = 'dic_dim_name';
┌─name─────────┬─formatReadableSize(bytes_allocated)─┐
│ dic_dim_name │ 0.00 B │
└──────────────┴─────────────────────────────────────┘
Как видим, теперь словарь не занимает оперативной памяти.
Итак, в этом материале мы рассказали про важные настройки в ClickHouse, которые позволяют успешно работать с базой данных:
join_algorithm — если мы используем джойны в запросах, мы должны контролировать алгоритм выполнения;
max_bytes_before_external_sort — для нетипичной сортировки таблицы;
max_bytes_before_external_group_by — для объемных агрегаций;
max_memory_usage — позволяет контролировать память, доступную запросу.
Еще мы наглядно продемонстрировали, как важно заниматься предварительным отбором данных перед соединением таблиц, сократив время выполнения скрипта в сотню раз! А также показали, как успешно применять словари в запросах для избегания операций соединения, и научились освобождать их из памяти после использования.
В следующей статье планируем рассказать, как мы запилили бэкап распределенной СУБД.