Как перейти на многонодовую архитектуру без боли. Или почти без боли

67a9831c423c88bca533b9956260e154.jpg

Есть у нас флагманский продукт под названием «Единый клиент», с которым работают десятки энтерпрайз-клиентов, оперирующих в нем сотнями миллионов записей. Продукт массивный, обвешанный аналитикой и различными сложными сервисами. В какой-то момент большая часть клиентов захотела получить общий uptime в 99,9%, чего сложно достичь без резервирования решения. И мы начали погружаться в эту тему практически с нуля.

В этом материале хочу поделиться нашим опытом перехода с одной ноды на 10+, расскажу о технических нюансах и решениях, которые мы использовали, а также про всякие боли в процессе перехода.

Меня зовут Николай, я архитектор «Единого клиента». Начну с короткого экскурса в сам продукт, чтобы было детальное представление о сути вопроса и нашей стартовой позиции.

Стартовая позиция

«Единый клиент» — это система класса MDM (Master Data Management), предназначенная для хранения и управления персональными данными всех клиентов организации. Основные заказчики — крупные банки, страховые компании, телеком, то есть организации, которые имеют множество внутренних систем с данными о клиентах.

Вот из чего состоит «Единый клиент»:

Основных сервисов два — «Единый клиент» и «Фактор» (алгоритмическая шкатулка). Сам «Единый клиент» состоит из ядра, в котором содержится основная динамическая модель клиента и механизмы обновления и слияния, а также из множества подключаемых функциональных и инфраструктурных модулей. На схеме красным выделены модули, у которых есть локальные данные на сервере приложений

Основных сервисов два — «Единый клиент» и «Фактор» (алгоритмическая шкатулка). Сам «Единый клиент» состоит из ядра, в котором содержится основная динамическая модель клиента и механизмы обновления и слияния, а также из множества подключаемых функциональных и инфраструктурных модулей. На схеме красным выделены модули, у которых есть локальные данные на сервере приложений

Перед «Единым клиентом» стоит сразу несколько основных задач:

  • Сбор данных из различных источников: система загружает данные из всех имеющихся источников (AБС, CRM, ERP, CDP, кредитные конвейеры и другие системы).

  • Стандартизация данных: система проверяет качество данных и приводит их к стандарту различных справочников. Например, адреса приводятся к стандарту ФИАС, проверяются телефоны и определяются их операторы и часовые пояса для удобной и корректной коммуникации с клиентами.

  • Объединение данных: система автоматически ищет дубликаты и объединяет данные о клиенте из разных систем, создавая «золотую», эталонную карточку клиента.

  • Оценка качества данных: система выделяет значимые компоненты данных и размечает их на предмет качества. Это включает разбор и стандартизацию Ф.И. О., даты рождения, телефонов, имейлов, удостоверяющих личность документов и почтовых адресов.

С представлением закончили, теперь поговорим, зачем такому проекту многонодовость.

Для чего тут многонодовость

На старте проекта наша система работала в изоляции и использовалась только для батчевых офлайн-загрузок данных. Данные сначала накапливались в стейджинг-зоне и затем раз в сутки загружались в основную систему. 

Эта схема в целом работала, но имела свои ограничения, главным из которых была невысокая актуальность данных. Однако на том этапе это было приемлемо, так как система не играла критической роли в работе компании и могла останавливаться на час-другой для проведения технических работ.

Со временем все изменилось. Появились онлайн-интеграции: как только запись появлялась в системе-источнике, она сразу отправлялась к нам. Перед сохранением данные нужно было очистить и проверить на корректность — например, разобрать адреса и ФИО. Интенсивность онлайн-потока росла с увеличением числа заказчиков. В итоге мы не могли позволить себе длительные остановки системы — клиенты ожидали круглосуточной доступности вплоть до 99,9% времени.

Чтобы поддерживать постоянную работу, мы начали разрабатывать многонодовую архитектуру. Это был первый шаг к обеспечению нулевых технических остановок, чтобы можно было обновлять систему без прерываний в онлайн-интеграции и работе дата-стюардов, которые круглосуточно обрабатывают и проверяют качество данных вручную. Многонодовая архитектура также обеспечивала отказоустойчивость. При выходе из строя одного из серверов система должна продолжать обслуживать клиентов без перерыва.

С ростом числа крупных заказчиков потребовалось обрабатывать все больше онлайн-запросов. Начав с одной ноды, мы перешли к двум, а теперь можем добавлять их сколько угодно: 5, 10 и более. Это позволяет увеличивать мощность системы для обработки чтений, которые происходят намного чаще, чем записи. Данные клиентов обновляются редко, но читаются и анализируются почти постоянно.

Ну, а сам процесс проектирования и перехода начинался с разбора сложностей.

Сложность 1: синхронизация локальных данных

Первое, с чем пришлось разбираться, — это синхронизация локальных данных на сервере приложений. Каждый сервер хранит локальные файлы и индексы для полнотекстового поиска на Lucene. Эти локальные индексы — сердце системы, ведь без них поиск данных был бы медленным и неудобным для пользователей. Клиенты могут искать информацию по любым параметрам: «Иванов Иван Москва»,»89991234567», «Игорь 9991234567», «ivanov@ya.ru». Однако локальные индексы создают проблемы при масштабировании. 

Если бы у нас не было локального состояния, можно было бы просто добавить второй сервер и использовать балансировщик нагрузки. Но поскольку у каждого сервера свои локальные индексы, их нужно как-то синхронизировать между всеми нодами, чтобы данные оставались согласованными и результаты поиска не различались. Для этого мы использовали EDA (Event-driven architecture). Выглядит она так:

7598a7b244b99385040e18af845a3231.png

А работает по таким принципам:

  • Создание события: при изменении данных (например, обновление адреса клиента) создается событие с информацией об изменении.

  • Публикация события: событие отправляется в очередь сообщений. Мы сначала использовали встроенные очереди (ArtemisMQ), а потом перешли на Kafka при кратном увеличении числа нод.

  • Чтение события другими нодами: ноды подписываются на очередь сообщений и читают события оттуда. Каждая нода обновляет свои локальные индексы и данные.

  • Обновление локальных данных: после чтения события нода применяет изменения к своим локальным индексам и файлам, синхронизируясь с другими нодами.

Такой подход обеспечивает асинхронную синхронизацию данных между нодами. Даже если одна нода временно выходит из строя, она может прочитать накопленные события и обновить свои данные, как только вернется в строй. Это гарантирует, что все ноды, хоть и с некоторой задержкой, всегда имеют актуальные данные. Минимизация этой задержки — отдельный челлендж.

Сложность 2: миграция базы данных

У нас одна общая БД для всех нод, которая является основным хранилищем данных о клиентах. Когда у нас была одна нода, обновление структуры таблиц было простым. Мы могли добавлять или удалять колонки, переименовывать таблицы и так далее. Но с появлением нескольких нод ситуация усложнилась.

Когда старая и новая версии приложений работают одновременно на одной БД, возникают проблемы несовместимости изменений и согласованности данных. Обновление структуры таблиц может вызвать ошибки, если одна из нод работает с новой схемой, а другая — со старой. Обе версии приложения должны иметь доступ к актуальным данным без нарушений их целостности.

Для решения этой проблемы мы разработали специальный поэтапный подход к миграции базы данных:

  1. Добавление новых элементов без удаления старых: на первом этапе мы добавляем новые колонки, таблицы или индексы, необходимые для новой версии приложения, без удаления старых элементов. Это позволяет новым нодам использовать обновленную структуру, в то время как старые ноды продолжают работать со старой.

  2. Заполнение новых элементов данными: мы создаем миграционные скрипты, которые копируют данные из старых колонок или таблиц в новые. Это обеспечивает согласованность данных между старыми и новыми структурами.

  3. Обновление логики приложения: после того как данные скопированы и проверены, мы обновляем логику приложения, чтобы оно начинало использовать новые колонки и таблицы. При этом старые элементы еще не удаляются, чтобы избежать возможных сбоев.

  4. Удаление старых элементов: на последнем этапе, когда все ноды обновлены и работают с новой структурой, мы удаляем старые колонки, таблицы или индексы. Это делается осторожно, чтобы не нарушить целостность данных.

Такой подход позволяет избежать простоев. Система продолжает работать во время миграции, так как старые и новые структуры данных существуют одновременно. Попутно обеспечивается согласованность данных: все ноды имеют доступ к актуальным данным на всех этапах миграции. А постепенное обновление и тестирование минимизируют риски и вероятность ошибок.

Сложность 3: сессии пользователей и распределенные блокировки

Третья проблема связана с управлением сессиями пользователей и распределенными блокировками. При множестве процессов Java-примитивы для синхронизации не подходят и нужно внешнее хранилище для блокировок. Мы выбрали базу данных, чтобы не добавлять новые компоненты и не усложнять развертывание. Это важно для системы в закрытом периметре заказчика с ограниченными средствами мониторинга. Чем меньше дополнительных точек отказа, тем надежнее система.

Для хранения сессий мы взяли готовую библиотеку spring-session-jdbc, а для распределенных блокировок разработали свою. Это решение синхронизирует сессии между нодами и управляет распределенными блокировками. 

Сессии сохраняются в специальной таблице в БД. Каждая нода имеет доступ к этой таблице, что позволяет ей читать и записывать данные сессий. Когда пользователь логинится, его сессия сохраняется в таблице. При последующих запросах используется session ID для поиска сессии, что позволяет обработать запрос независимо от того, на какую ноду он попал. Это обеспечивает централизованное хранение сессий и не зависит от конкретной ноды. Система надежна, потому что БД гарантирует целостность и доступность данных.

Распределенные блокировки также реализованы с помощью базы данных. В БД есть специальная таблица для блокировок.

Механизм работает так:  

1. Когда процесс хочет получить эксклюзивный доступ к данным, например к данным в буферных таблицах, он записывает информацию о блокировке в таблицу. Эта запись включает название блокировки и идентификатор процесса. 

2. Другие процессы, пытаясь получить ту же блокировку, сталкиваются с ошибкой уникального ключа в таблице, что сигнализирует им о занятой блокировке. Они ждут, пока блокировка не освободится. 

3. По завершении процесса он удаляет запись из таблицы, освобождая блокировку. Если процесс завершился некорректно и не удалил запись, блокировка автоматически снимется спустя заданный интервал протухания. 

Для карточек клиентов мы используем обычный подход с пессимистичными блокировками на БД. Прежде чем изменить данные клиента, процесс блокирует соответствующую строку в таблице с помощью SQL-инструкции:

SELECT * FROM PHYSICAL_CLIENT WHERE ID = ? FOR UPDATE

Эта инструкция блокирует строку, сигнализируя БД, что она собирается изменяться. Если другая нода пытается изменить ту же строку, она выполняет аналогичный запрос и ждет, пока текущая блокировка не будет снята. Когда процесс завершает изменения и выполняет коммит, блокировка снимается — и следующий процесс может начать свои изменения.

Таким образом, мы получаем эксклюзивный доступ, автоматическое управление блокировками и синхронизацию между нодами. Все ноды имеют доступ к одной и той же БД, что гарантирует консистентность данных.

Опыт с ArtemisMQ

Еще на этапе разработки для передачи событий синхронизации мы решили использовать ArtemisMQ. Поскольку наш софт разворачивается в контуре заказчика, у нас ограничены возможности админить и мониторить всю систему. Встроенная в приложение очередь значительно упрощает эксплуатацию и гарантирует стабильность.

ArtemisMQ работает как часть процесса приложения, что исключает дополнительные точки отказа и убирает проблемы, связанные с сетевыми вызовами. Важно и то, что ArtemisMQ написан на Java, в котором у нас есть экспертиза, что позволяет легко разбираться с проблемами. Но у такого решения есть и минусы. Чтобы было нагляднее, покажу все в виде таблицы:

Плюсы

Минусы

Всегда доступна:
поскольку очередь встроена в приложение, она всегда доступна и готова к использованию

Нагрузка на сервер приложений:
поскольку очередь встроена, она увеличивает нагрузку на сервер приложений. Для решения этой проблемы мы используем более мощные серверы приложений, что нивелирует этот минус

Быстрый отклик:
отсутствие дополнительных сетевых вызовов обеспечивает быстрый отклик системы

Потеря глобального порядка событий:
при использовании локальной очереди могут возникать проблемы с порядком событий. Это решается хранением временной метки последнего примененного события для каждого клиента

Отсутствие дополнительных точек отказа:
Artemis MQ работает внутри процесса, что снижает число отказов

Настройка времени: для корректной работы этой схемы необходимо, чтобы на всех серверах приложения время было синхронизировано. Для этого используются NTP-протокол и механизмы проверки времени между серверами. Если время на нодах начинает отличаться больше допустимого (100 мс), система генерирует ошибку в логах и уведомляет службу поддержки

Простота развертывания и администрирования:
нет необходимости разворачивать и администрировать отдельный сервер очередей

Таким образом, использование ArtemisMQ имеет свои плюсы и минусы, но с учетом всех факторов мы посчитали, что это решение нам подходит.

Опыт с HazelCast

На этапе разработки мы решили попробовать использовать Hazelcast для управления сессиями и распределенными блокировками. В то время HazelCast был популярным и казался многообещающим решением для хранения локальных данных и их синхронизации между нодами. Однако на практике выяснилось, что хайповая технология не всегда оказывается подходящей.

HazelCast — открытая Java-библиотека для создания распределенных систем хранения данных в памяти. Она позволяет работать с распределенными объектами, кешировать данные, управлять транзакциями и очередями. Помогает строить масштабируемые и отказоустойчивые системы.

Изначально у нас было всего две ноды, для которых мы развернули HazelCast. Но это был опрометчивый шаг, потому что он не должен был стабильно работать в такой схеме. Это у нас и подтвердилось на практике. Когда разрывалось сетевое соединение между нодами, каждая из них не могла определить, которая из них главная по определенной части данных. Это приводило к куче ошибок из-за split-brain, когда кластер HazelCast делится на две части и каждая считает себя основной.

Решить проблему можно было бы, увеличив число нод до трех, что помогло бы HazelCast определять кворум. Но для наших заказчиков, которые разворачивают систему у себя и не могут быстро добавить ресурсы, это оказалось сложно и дорого. Более того, если Hazelcast разваливается, его трудно восстановить. Нужно перезагружать все ноды и много работать над восстановлением данных.

После нескольких таких сбоев мы поняли, что Hazelcast нам не подходит. Система должна быть максимально простой в эксплуатации и сопровождении.

Ситуацию усугубляет то, что наше решение поставляется on-premise и у нас нет прямого доступа к системам заказчика для быстрого устранения проблем. Когда что-то ломается, нам пишут, и мы запрашиваем детали. Часто это требует передачи диагностической информации, содержащей конфиденциальные данные, что усложняет процесс.

В итоге мы решили отказаться от Hazelcast в пользу БД. У нас уже есть общее хранилище для базы данных, и функции, которые мы использовали в HazelCast, можно реализовать на БД. Для нас это оказалось более стабильным и простым решением, особенно в отсутствии прямого доступа к системам у заказчиков.

Наступаем на грабли

Как и в любом крупном проекте на пути к многонодовой архитектуре мы слегка прошлись по граблям. Выше были описаны сложности, а теперь несколько основных наших факапов, которые в свое время попортили нам крови.

Первые грабли: количество очередей

Одна из первых ошибок была связана с использованием одной очереди для всех типов событий. Мы кидали в нее все подряд: изменения данных клиентов, системные события, управление задачами. В итоге при активных изменениях данных, например массовом обновлении данных клиентов, очередь переполнялась и важные задачи запускались с задержкой.

Чтобы избежать этого, мы разделили события по разным очередям. Например, системные события идут в одну очередь, изменения физических лиц — в другую, изменения юридических лиц — в третью. Это позволило разгрузить каждую очередь и ускорить обработку задач. Теперь массовые изменения данных по одной категории не блокируют обработку других важных событий.

Вторые грабли: массовые задачи

Другой крупный факап произошел, когда мы начали выполнять массовые задачи, генерирующие огромное количество событий. Сначала мы использовали Artemis MQ, который встроен в приложение и работает локально. Но при росте числа нод (три и более) коммуникация peer-to-peer стала неэффективной, и мы перешли на Kafka. Очередь изменений не справлялась с потоком, и синхронизация данных между нодами задерживалась. В итоге данные на одной из нод могли оказаться устаревшими, что приводило к некорректным результатам при запросах.

Нам помогло увеличение количества потоков для обработки очереди. Благодаря этому события начали обрабатываться параллельно, значительно ускорив процесс. В случае с Kafka мы использовали партиции для разделения топиков, что позволило обрабатывать события в нескольких потоках одновременно. Как результат, задержки уменьшились и данные стали более актуальными.

Если в своем проекте вы столкнулись с этой проблемой, следите за длиной очереди и увеличивайте количество потребителей, если один поток не справляется со всеми событиями.

Третьи грабли: проблемы с индексами БД

Создание индексов на больших объемах данных изначально занимало много времени и блокировало таблицы, что приводило к простоям всей системы.

Чтобы решить проблему, мы начали использовать конкурентное создание индексов. В PostgreSQL и Oracle есть специальные команды (CONCURRENTLY и ONLINE), которые позволяют создавать индексы асинхронно, не блокируя таблицы, и продолжать запись данных без простоев.

Итоговая архитектура и сценарии использования

Финальная схема «Единого клиента» на многонодовой архитектуре

Финальная схема «Единого клиента» на многонодовой архитектуре

После устранения вышеописанных сложностей у нас получилась отказоустойчивая многонодовая архитектура. Вот как она работает в различных сценариях.

Обработка запросов на чтение:

  • Ноды без выделенного мастера обрабатывают поисковые запросы.

  • При сбое одной ноды другие продолжают работу, используя локальные индексы.

Обработка запросов на запись:

  • Основная и резервная БД (мастер и стендбай) обеспечивают отказоустойчивость.

  • События изменений данных записываются в персистентную очередь и обрабатываются другими нодами.

Синхронизация данных:

  • Использование CQRS для разделения операций записи и чтения.

  • Двухэтапный подход записи данных: сначала в БД, затем в очередь сообщений.

Обеспечение целостности данных:

Такая архитектура и подходы помогают сделать систему стабильной, устойчивой к сбоям и легко масштабируемой под разные условия эксплуатации.

Ну и напоследок…

Семь заповедей многонодовости

Если, прочитав материал выше, вы решили, что вам тоже нужна многонодовость, — вот несколько моментов, проработка которых поможет начать и избежать некоторых ошибок. Да простят меня читатели, если в каких-то моментах я немного покапитанствую.

  1. Оценка нагрузки и требований

    Без них не обойтись, вот совсем. И опираться нужно только на цифры проекта, а не свое «Я уже сто раз так делал». Важно понять, сколько нод действительно нужно и какая будет нагрузка. Анализ преобладания записи или чтения помогает выбрать правильную архитектуру и избежать ненужных сложностей.

  2. Модные технологии — спорная штука 

    Опыт показывает, что использование хайповых технологий, таких как Hazelcast, может создавать больше проблем, чем решений, если они не подходят по конкретным требованиям.

  3. Проверенные решения — наше все

    Стандартные компоненты, такие как базы данных и балансировщики нагрузки, часто оказываются более чем достаточными. Например, персистентные очереди и базы данных для хранения сессий и распределенных блокировок обеспечивают стабильность и простоту эксплуатации.

  4. Резервирование и отказоустойчивость 

    Надежность системы является ключевым фактором. Резервирование для всех критичных компонентов, использование персистентных очередей, которые могут пережить выключение приложения и сбои, и настройка базы данных с горячим резервом обеспечивают устойчивость работы и сберегут вам немало нервных клеток.

  5. Излишняя сложность — не лучший путь

    Большое количество технологий и компонентов может создать дополнительные сложности. Каждая новая технология требует изучения, настройки и поддержки. Простота эксплуатации и минимально необходимые инструменты предпочтительны.

  6. Ориентация на бизнес-процессы

    Критически важная штука. Включение бизнес-пользователей в процесс с самого начала помогает в определении требований и проверке решений, несмотря на возможные трудности в коммуникации. Ведь отказоустойчивость системы нужна именно для поддержания важных бизнес-процессов, будь то быстрый и гарантированный поиск клиентов или внесение изменений в их данные.

  7. Безопасность данных пользователей — это основное

    Централизация данных в одной системе имеет свои плюсы и минусы. Единая точка аудита и контроля доступа — преимущество, но увеличивается риск утечки данных. Защита системы на всех уровнях и готовность к возможным компрометациям крайне важны.

А в целом, каждое решение должно быть обоснованным и соответствовать вашим конкретным нуждам и возможностям.

Спасибо, что прочитали! Поделитесь в комментариях, что думаете о многонодовой архитектуре и какие моменты или сложности возникали у вас при переходе на нее.

© Habrahabr.ru