Как мы распиливаем монолит без даунтайма

image

Всем привет!

На связи Михаил, и я продолжаю делиться историями про рефакторинг одного из сервисов облачной платформы #CloudMTS. В прошлый раз я рассказывал о том, как мы аккуратно раскладывали по папочкам код в соответствии с принципами чистой архитектуры. Сегодня поговорим о решении, которое позволяет нам распиливать монолит по кусочкам без простоев.

Вместо дисклеймера

Переход от монолита к микросервисной архитектуре — задача непростая. Особенно когда приложение уже в продуктиве. Пускаться в эту историю, потому что микросервисы — это стильно и молодежно, плохая затея. Стартуйте только тогда, когда преимущества трансформации будут очевидны и перевесят возможные издержки.


Наши причины перехода были следующими:

  1. В монолите концентрировалось большое количество бизнес-процессов, которые охватывали сразу несколько потребителей: пользователей облачной платформы, сейлз-менеджеров (через CRM-систему), администраторов, обработчиков метрик. Получилась такая одна большая точка отказа сразу для 4 групп бизнес-процессов.
  2. Каждый бизнес-процесс потребляет свой объем ресурсов. Например, для обработки метрик нужно 5 подов (чтобы запараллелить и ускорить обработку), для администрирования хватит и одного. Так как у нас все в одном сервисе, при масштабировании монолита мы будем ориентироваться на самый «прожорливый» бизнес-процесс. Часть ресурсов будет просто простаивать.
  3. Хотелось добиться гранулярности, чтобы независимо писать и деплоить код для каждого бизнес-процесса. И не переживать, что какие-то изменения в одном бизнес-процессе неожиданно отрикошетят в соседний.


Сестра, скальпель


Вытащить какой-то из доменов целиком за раз было нереально: они большие, связаны сразу с несколькими бизнес-процессами и перенос даже одного домена занял бы много времени. Потратить столько времени на рефакторинг мы не могли себе позволить. Поэтому мы ели слона по частям — отрезали домен по маленьким кусочкам (по одному endpoint, http-хендлеру) и добавляли его на фронтенд. Такой подход также упрощал тестирование перенесенных бизнес-процессов.

Получилось так, что часть бизнес-логики остается на старом сервисе, а часть уже живет на новом. Оба сервиса пишут данные в свои базы, и сервисам нужно как-то синхронизироваться.
Так у нас появилась задача синхронизации данных между двумя сервисами. Вот как мы ее решали.

Сервис А (монолит) в качестве базы данных использует MongoDB, сервис Б (микросервис) — PostgreSQL. Обмен изменениями происходит через Apache Kafka.

  1. На стороне сервиса А мы подключаемся к change stream коллекции нужной нам сущности.
  2. Изменение из change stream перекладываем в отдельную коллекцию и оттуда отправляем в Kafka. Отдельная коллекция нужна для аудита изменений и отправки в Kafka тех событий, которые не были отправлены по какой-либо причине.
  3. Consumer на стороне сервиса Б забирает данные из Kafka и перекладывает их в свое хранилище, PostgreSQL. Таким образом происходит передача данных для синхронизации из сервиса А в сервис Б.


Похожий процесс происходит и в обратном направлении, разница лишь в том, что там не MongoDB, а PostgreSQL и другой инструментарий.

  1. Для каждой таблицы в PostgreSQL, которой нужна синхронизация, мы добавляем триггер:
    Код под спойлером
    create function on_table_row_change() returns trigger as
    $$
    declare
       changed_fields text[];
       changesCnt     numeric;
    begin
       if TG_OP = 'INSERT'
       then
           insert into audit (table_name, val_id, operation, new_val) values (TG_RELNAME, new.id::text, TG_OP, row_to_json(new)::jsonb);
           return new;
       elsif TG_OP = 'UPDATE'
       then
           changed_fields = akeys(hstore(new.*) - hstore(old.*));
           changesCnt = coalesce(array_length(changed_fields, 1), 0);
           if changesCnt = 0 or (changesCnt = 1 and coalesce(array_position(changed_fields, 'updated_at'), 0) > 0)
           then
               return new;
           end if;
    
    
           insert into audit (table_name, val_id, operation, old_val, new_val, changed_fields)
           values (TG_RELNAME, new.id::text, TG_OP, row_to_json(old)::jsonb, row_to_json(new)::jsonb, changed_fields);
           return new;
       elsif TG_OP = 'DELETE'
       then
           insert into audit (table_name, val_id, operation, old_val) values (TG_RELNAME, old.id::text, TG_OP, row_to_json(old)::jsonb);
           return old;
       end if;
    end;
    $$ language 'plpgsql';
    


    Этот триггер записывает в отдельную таблицу audit все изменения, включая информацию о сделанной операции и предыдущем значении.
  2. В самих бизнес-процессах есть точки, в которых мы триггерим перекладывание изменений из таблички audit в топик Kafka. Например, такой триггерной точкой является успешное завершение CRUD-операций над бизнес-сущностями.
  3. На стороне сервиса А есть Consumer, который слушает топик Kafka с изменениями из сервиса Б и перекладывает эти изменения в MongoDB.
    Оба сервиса отправляют и получают изменения из одного и того же топика. Чтобы сервис А не получал свои же сообщения, у сообщения есть поле source.


Общая схема движения данных из сервиса А в Б

image

Общая схема движения данных из сервиса Б в А

image

Последовательность упражнений, которые мы проделываем до начала переноса бизнес-логики.

  1. Описываем контракт топика Kafka, по которому будем передавать изменения. У всех сущностей он примерно одинаковый.
    Код под спойлером
    syntax = "proto3";
    
    
    import "google/protobuf/timestamp.proto";
    
    
    message ChangeEvent {
     // Тип события
     enum EventType {
       UNKNOWN_EVENT_TYPE = 0;  // Неизвестный тип
       CREATED = 1; // Создание сущности
       UPDATED = 2; // Изменение сущности
       DELETED = 3; // Удаление сущности
     }
    
    
     // Измененное поле
     enum Field {
       UNKNOWN_FIELD = 0; // Неизвестное поле
       FIELD_1 = 1; // Какое-то поле 1
       FIELD_2 = 2; // Какое-то поле 2
     }
    
    
     // Источник события
     enum EventSource {
       UNKNOWN_EVENT_SOURCE = 0; // Неизвестный источник
       SERVICE_A = 1; // Сервис А
       SERVICE_B = 2; // Сервис Б
     }
    
    
     // Бизнес модель
     message Model {
       string field_1 = 1; // Какое-то поле 1
       string field_2 = 2; // Какое-то поле 2
     }
    
    
     string id = 1; // ИД бизнес сущности
     EventType type = 2; // Тип события
     Model new_val = 3; // Состояние после события
     repeated Field changed_fields = 4; // Список полей который изменился
     EventSource source = 5; // Источник события
     google.protobuf.Timestamp changed_at = 6; // Время возникновения события
    }
    

  2. В сервисе Б создаем миграцию на добавление таблички. В ней будем хранить данные, которые переливаем в сервис А. Описываем все, как в конечном варианте: используем все необходимые constraint, foreign key, enum. На табличку вешаем триггер, который будет сохранять изменения в отдельной табличке audit.
  3. В сервисе Б пишем producer и consumer на топик с изменениями.
  4. В сервисе А пишем producer и consumer на топик с изменениями.
  5. Пишем интеграционные тесты на синхронизацию в обе стороны.
  6. Деплоим на прод, следим за алертами, consumer lag.


Когда убеждаемся, что синхронизация работает как часы, начинаем переносить бизнес-логику в сервис Б. Следом переключаем трафик на новый сервис.

Важные моменты


  1. Прежде чем браться за перенос бизнес-логики в новый сервис, нужно тщательно настроить и протестировать транспорт между сервисами через Kafka, настроить мониторинг и алертинг для ошибок. Речь идет об ошибках на уровне бизнес-логики, например, нет зависимой сущности, на которую имеется foreign key. Они обычно не исчезают сами по себе после ретраев, как ошибки, связанные с инфраструктурой (потеря связи с БД, например).

    Такие ошибки могут вызвать скопление большого количества необработанных сообщений в топике Kafka и стать причиной рассинхронизации данных. Чтобы этого не произошло, настраиваем алертинг. Например, у нас настроено так, что Grafana присылает алерт в Telegram, если consumer lag больше 1 от 30 секунд до 1 минуты.

  2. Consumer и Producer у нас реализованы на Go. У Go есть дефолтные значения (zero value) определенных типов данных. Например, для строки — это пустая строка, для слайса -nil. Возникает следующая небольшая, но сложность: как понять — мы имеем тут дело с zero value или это изменение с какого-то значения на пустую строку. Consumer сложно отличить первую ситуацию от второй. Чтобы Consumer понимал, с чем он имеет дело, мы передаем дополнительный массив строк, в которых мы указываем, какие поля изменились. Благодаря этому списку Consumer отличает zero value от значения, которое выставил пользователь.
  3. Нужно синхронизировать последовательность передаваемых изменений, чтобы не получилось так, что Consumer обработал изменение, которое произошло позже, раньше. Для этого ключом топика Kafka мы выбираем ID сущности.
  4. После возникновения изменения и до отправки его в Kafka мы иногда сталкиваемся со следующими ситуациями:
    • мы получили изменение из change stream, но не смогли переложить его в коллекцию для аудита;
    • мы переложили изменение в коллекцию для аудита, но не смогли отправить в Kafkа.

    Чтобы преодолеть их, мы делаем следующее: когда сохраняем изменение в коллекцию для аудита, мы сохраняем сразу со статусом «изменение успешно отправлено в Kafkа» и в той же горутине отправляем изменение в Kafka. Если в Kafka отправить не получилось, то сбрасываем признак. Если сбросить признак не получилось, то это исключительная ситуация, алерт, на который необходимо реагировать.

    Отдельно есть worker, который с определенным интервалом отправляет все неотправленные изменения после последнего успешного. Мы пока отказались от транзакций и более сложных реализаций гарантированной доставки в пользу более быстрой доставки изменений.


Какие еще варианты рассматривали для синхронизации данных?


Использовать готовые CDC. Мы писали свое решение на Go, но есть готовые инструменты, которые позволяют подключаться к БД, создавать события по триггеру и перекладывать их в Kafka. Например, Debezium. У него есть расширения для PostgreSQL и MongoDB. Также можно было бы использовать Confluent CLI как PostgreSQL Kafka Connector.

Любое из этих решений создало бы несколько проблем:

  • Усложняется архитектура решения. При смене главного узла в кластере PostgreSQL нам придется дополнительно следить за Debezium: какие данные туда уходят, работает ли репликация, работает ли репликация после switchover/failover. Для этого придется допиливать мониторинг СРК, смотреть, как это влияет на производительность.
  • Сложности при обновлении PostgreSQL. При обновлении баз данных, при смене мажорной версии протокол репликации и формат WAL-файлов могут меняться. Вплоть до полной несовместимости.


Ходить в базу напрямую. Мы создаем новый сервис, переносим туда бизнес-логику, но этот сервис продолжает ходить в БД старого сервиса, монолита. Это самый быстрый, но самый ненадежный вариант, так как связываем сервисы общей БД. У БД получается два потребителя. Если мы поменяем что-то у основного владельца этой базы (например, схему данных), то второй сервис перестанет работать. Получается, надо поддерживать любые изменения сразу в двух сервисах.

Заключение


Получилось рабочее решение. С одной стороны, у нас появилась история изменения данных, которая помогает в разборе инцидентов. С другой, есть двухсторонняя синхронизация данных между сервисами, которая помогает по более мелким кусочкам переносить бизнес-логику. Но также есть и проблема, которую мы пока не решили и не уверены, что будем решать: изменение из сервиса Б снова возвращается в сервис Б. Рабочей схеме оно никак не мешает, а сама конструкция — все-таки временное решение на период переноса бизнес-логики из монолита в микросервисы.

cj_4_8cxcdqtjzqqzudwqtlt7fk.png

Top.Mail.Ru

© Habrahabr.ru