Миграция данных Cassandra с помощью Cassandra Data Migrator

Привет, Хабр! Меня зовут Александр, я системный инженер и занимаюсь администрированием и внедрением платформы ZIIoT компании «Цифра», которая разрабатывает и внедряет промышленные цифровые решения на базе собственной платформы ZIIoT, предоставляет доступ к среде разработки платформенных приложений, а также развивает индустрию роботизированного промышленного транспорта.
Я хочу рассказать, как мы решаем задачи миграции данных кластеров Cassandra с помощью инструмента Cassandra Data Migrator (CDM). Выбор этой темы продиктован продуктовыми задачами по корректной миграции данных между кластерами Cassandra сложной топологии. Статей с обзором этого решения в русскоязычном сегменте я не нашел и решил описать сам, так как оно будет полезно всем администраторам СУБД Cassandra.
Чем продиктован выбор именно этого решения для миграции данных?
Однажды мы столкнули на проекте с задачей простой миграции данных между кластерами Cassandra, в которой было требование какого-то иного решения, чем простое перетягивание снапшотов и восстановление из них через sstableloader. Задача так же осложнялись тем, что на одном из проектов не было как ssh доступа к самим нодам кластеров Cassandra, так и возможности затащить и установить что-то «совсем opensource-ное» вроде Medusa. Требовалось решение, которое будет поставляться самим разработчиком Cassandra, либо вовлеченными в разработку компаниями, и которое можно будет использовать, завернув в контейнер. На удивление такое решение было найдено в виде готовой, мощной, живой и поддерживаемой разработки от DataStax, компании, C# драйвера, от которой по умолчанию и устанавливаются при развертывании Cassanda. Cassandra Data Migrator.
Сложности, которые решает данный инструмент
Начнем сначала, у нас есть простая задача миграции данных между двумя кластерами Cassandra с пятью нодами и replication_factor=3. Как это сделать штатными средствами, если данные неравномерно размазаны по пяти нодам кластера? Для резервной копии можно использовать стандартную связку bash-скриптов, s3 хранилище либо другое, nodetool snapshot и стандартные инструменты перемещения данных вроде rcync. Всё просто и описано на многих форумах, даже в официальных рекомендациях от некоторых разработчиков. Но вот что касается восстановления всё интереснее: я не нашел ни одного внятного описания на форумах, как восстанавливать данные на больших кластерах с replication_factor отличным от 1, не говоря уже об официальной документации. И если мы сравним данные при выгрузке с разных нод, то увидим файлы БД с идентичными названиями, но разными объемом. Понятно, что восстановление через sstableloader работает и есть некая «магия» Cassandra, которая позволит восстановить поочередно каждую ноду и скорей всего мы получим корректные данные в таблицах. Но, это не точно и нигде не описано, нужно какое-то надежное решение. Cassandra Data Migrator позволит с легкостью мигрировать даже данные из кластера, к примеру, с пятью нодами и replication_factor=3 в кластер, состоящий из одной ноды.

Практическое использование
Ссылки и документация GitHub
GitHub
Документация с описанием установки и использования Cassandra Data Migrator
DockerHub
Описание и установка
По сути это Spark приложение, поставляемое в виде *.jar файла. Можно установить необходимые зависимости (Java 11 и Spark 3.5.1) и запустить процесс миграции локально. Либо выгрузить docker образ из официального репозитория с DockerHub и запустить в Docker либо в K8s. Кстати, в контейнере полно всего, даже vim имеется :) Все подробные инструкции по установке доступны по ссылкам выше.
Использование
Рассмотрим использование приложения в контейнере. Загрузка docker образа, запуск и подключение к контейнеру стандартны:
docker image pull datastax/cassandra-data-migrator:4.3.3
docker container run --name cassandra-data-migrator-433 --detach datastax/cassandra-data-migrator:4.3.3
docker container exec -it cassandra-data-migrator-433 /bin/bash
Далее переходим в директорию с самим приложением, и на основании имеющегося файла конфигурации cdm.properties создаем свой, в котором мы укажем источник данных и приемник. Причем отдельные параметры конфигурации можно указать даже не в файле, а в аргументах при последующем запуске миграции:
cd /assets/
vim my-cdm.properties
В файле будет указано следующее:
# origin details spark.cdm.connect.origin.host 172.16.1.50
spark.cdm.connect.origin.port 9042
spark.cdm.connect.origin.username cassandra
spark.cdm.connect.origin.password cassandra
spark.cdm.schema.origin.keyspaceTable ziiot.example_table
# target details
spark.cdm.connect.target.host 172.16.1.55
spark.cdm.connect.target.port 9042
spark.cdm.connect.target.username cassandra
spark.cdm.connect.target.password cassandra
spark.cdm.schema.target.keyspaceTable ziiot.example_table
# other details
spark.cdm.autocorrect.missing false
spark.cdm.autocorrect.mismatch false
Всё, параметры подключений к необходимым keyspace/table у источника и приемника данных Cassandra мы перечислили, можно запускать процесс миграции:
spark-submit \ --properties-file
my-cdm.properties
\
--master "local[*]" \
--driver-memory 8G \
--executor-memory 8G \
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.3.3.jar &> \
/tmp/cassandra-data-migrator_$(date +%Y%m%d_%H_%M).txt
где:
--properties-file — указание расположения файла с конфигурацией
--master — URL кластера Spark для подключения, в нашем случает он локальный
--driver-memory — объем памяти, который будет выделен для драйвера
--executor-memory — объем памяти, который будет выделен для исполнителей
--class — класс приложения.
Как и упоминалось ранее, таблицы можно указать через аргумент »--conf» в командной сроке, а не в файле конфигурации. Например, так:
…
--executor-memory 8G \
--conf spark.cdm.schema.origin.keyspaceTable=«ziiot.example_table»
--conf spark.cdm.schema.target.keyspaceTable=«ziiot.example_table »
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.3.3.jar &> \
…
где в конфигурации:
spark.cdm.autocorrect.missing — Если задано значение true, данные, отсутствующие в целевом кластере, но обнаруженные в исходном кластере, повторно переносятся в целевой кластер.
spark.cdm.autocorrect.mismatch — Если значение true, данные, которые различаются между исходным и целевым кластерами, согласовываются.
Надежность передачи данных
Что касается корректности передаваемых данных, то это можно проверить как подсчетом строк в таблице, так и суммой значений какого-нибудь столбца, если таблица не превышает примерно пары гигабайт.SELECT sum(dv) FROM ziiot.example_table;
SELECT COUNT(*) FROM ziiot.example_table;
Если таблица больше, то скорей всего придется править конфиг Cassandra, увеличивая время таймаута. Для теста можно поиграться со следующими значениями файла: cassandra.yaml:
read_request_timeout:
range_request_timeout:
write_request_timeout:
counter_write_request_timeout:
cas_contention_timeout:
truncate_request_timeout:
request_timeout:
Практическое применение здесь сомнительно, так как таблицы, как правило, значительно больше 2Гб, и при попытке просчитать сумму значений одного столбца либо количество строк, произойдет обрыв по таймауту. Этим можно просто проверить корректность работы инструмента
Скорость передачи данных
Скорость. Мягко говоря, она невысока, и зависит как от инфраструктуры, так и от того, что и куда мы мигрируем. Если вы мигрируете данные из тестового кластера, состоящего из одной ноды, в кластер с десятком нод, то всё пройдет относительно быстро, можно говорить о гигабайтах за 10 минут. Если же наоборот, то будет долго и нужно тестировать каждую ситуацию отдельно.

Вывод
Резюмируя, можно сказать, что есть задачи, которые можно успешно выполнить с помощью Cassandra Data Migrator, что дает возможность мигрировать данные между кластерами Cassandra разных топологий. Это позволило компании «Цифра» расширить используемый инструментарий, выработать наиболее эффективное решение для выполнения одного из проектов.