Эффективные надежные микросервисы

d7-fp2y6q3qoz7vap3xaaxjlsao.png

В Одноклассниках запросы пользователей обслуживает более 200 видов уникальных типов сервисов. Многие из них совмещают в одном JVM-процессе бизнес-логику и распределенную отказоустойчивую базу данных Cassandra, превращая обычный микросервис в микросервис с состоянием. Это позволяет нам строить высоконагруженные сервисы, управляющие сотнями миллиардов записей с миллионами операций в секунду на них.

Какие преимущества появляются при совмещении бизнес-логики и БД? Какие нюансы надо учесть, прибегая к такому подходу? Что с надёжностью и доступностью сервисов? Расскажем подробно об этом всём.

Те, кому удобнее видеоформат, могут посмотреть запись, а для всех остальных дальше идёт текстовая версия.


Меня зовут Олег Анастаcьев, я работаю главным инженером в Одноклассниках. А кроме меня, в Одноклассниках есть более 6 000 машин, на которых работает 15 000 задач, как просто на железных серверах, так и в виде контейнеров — в наших собственных четырёх облаках, о которых я ранее уже рассказывал.

Эти задачи предоставляют более двух сотен различных сервисов. Сегодня мы вместе с вами попытаемся построить один из таких микросервисов, связанных с задачей мессенджинга.

Итак, в мессенджере каждый пользователь имеет какие-то чаты. Может создать чат один на один с пользователем, а может сразу с несколькими.

s_akkgtgd2hj7rlloohhmcuddqy.png

Вариантом чата с несколькими пользователями являются каналы (здесь просто есть ограничение, могут ли пользователи писать или только читать).

Очевидно, что средний пользователь участвует в ограниченном количестве чатов одновременно. По нашим данным, 95% активности пользователя приходится на 5% чатов.

Основное, что хранят чаты — это сообщения. В 80% случаев мы видим в чате не более 13 последних сообщений. На примере выше видно только три и часть четвёртого, но для этой иллюстрации окно браузера было уменьшено — при его обычном размере мы видим приблизительно 13 сообщений.

Но эти цифры не работают, когда нужно получить последние сообщения для отображения в списке чатов:

hdrksj0tdm_sygl2jiwo-tvrszo.png

Маловероятно, что эти сообщения будут востребованы второй раз. В web-клиенте такие сообщения нужно достать только для пары десятков видимых чатов, а для мобильных устройств их может понадобиться значительно больше.

Мы будем строить сервис для хранения сообщений, который сам по себе достаточно масштабный и нагруженный:


  • 600 миллиардов сообщений
  • 5 миллиардов чатов
  • Около 100 терабайт чистых данных без учёта репликации
  • Около 120 тысяч запросов в секунду на чтение
  • Около 8 тысяч запросов в секунду на запись

Как и любой современный мессенджер, наш должен поддерживать полнотекстовый поиск. Введя запрос в поле поиска, можно получить все чаты с соответствующими сообщениями. А это значит, что сообщения нужно периодически обходить для индексирования. Есть и другие задачи, связанные с обходом и анализом полного набора данных, вроде антиспама.

Сами сообщения — это не только текст, но и множество другой необходимой информации:

1sm0hzyof90eouhpnqgtyhkc7vg.jpeg

В разных колонках этой таблицы — ID чата, ID сообщения в чате (я обозначил его временем создания этого сообщения), автор, тип сообщения (оно может быть текстом, видеозвонком, может быть помеченным как спам), далее текст или какие-то данные, относящиеся к сообщению определённого типа, и прикреплённые файлы.

То есть у нас вот такая табличка передаёт структуру этих сообщений:

CREATE TABLE Messages (
    chatId, msgId

    user, type, text, attachments[], terminal, deletedBy[], replyTo…

    PRIMARY KEY ( chatId, msgId )
)

И на эти данные у нас поступают следующие операции:


  • getMessages (viewer, chat, from, to), который для указанного пользователя в указанном чате получает список последних сообщений за определённый промежуток времени. Разные пользователи могут видеть разный набор сообщений (например, не видеть спам).
  • getLastMessages (viewer, chats) получает список последних сообщений для большого списка чатов. Это самый тяжёлый запрос в системе, поскольку он идёт по списку чатов.
  • Естественно, для мессенджинга у нас есть операция добавления сообщения add (chat, message), операция поиска search (viewer, text) и операция индексации сообщений для поиска indexMessages ().

Давайте попробуем реализовать это всё с использованием микросервисной архитектуры.



Современные микросервисы

Пользователи с мобилки или с веба (либо напрямую, либо через какие-то фронты) работают с нашим сервисом сообщений, в котором реализована бизнес-логика, через какой-нибудь наш любимый протокол.

Данные же мы храним отдельно в постоянном хранилище. Отказоустойчивом и надёжном, естественно. Мы используем Cassandra.

Для администрирования этой самой Кассандры мы наняли солидного бородатого дядьку DBA, а мы, нашей хипстерской персоной в кепке, отвечаем только за свой сервис:

bm5wy1tocp_c0xmfluf30l_lct8.jpeg

Всё, пора запускаться. Попробуем для начала реализовать самый частый запрос getMessages ().

Для этого нам нужно выполнить приблизительно вот такой запрос в базу данных:

SELECT FROM Messages
    WHERE chatId = ? AND
    msgId BETWEEN :from AND :to

Есть одна небольшая проблема: таких запросов поступает более ста тысяч в секунду. Есть некоторая нагрузка, и сервис у нас начинает ложиться.

Ну что ж. Масштабирование — это то, что мы умеем в микросервисах, правильно? Добавляем инстансов нашего сервиса. И обнаруживаем, что нагрузка проваливается в базу и ложится теперь уже она:

vitde-nlrpc_dqwoqyktzkbqswu.jpeg

Что дальше? Мы можем либо масштабировать базу, либо воспользоваться знанием, что на 5% активных чатов приходится 95% активности. Будем кешировать.

Вот и бородатый DBA подсказывает, что в нашей базе данных есть кеш часто используемых данных. Пробуем, помогает, но немного.

Оказывается, есть тонкости в других методах сервиса. Вызовы getLastMessages () и indexMessages () работают не только с активными, но с абсолютно всеми чатами. И хоть вызовов мало, они приводят к тому, что в кеш попадают те 95% записей, которые нам в общем-то не нужны, и вытесняют оттуда нужные нам 5%. Встроенные алгоритмы кеширования БД не могут учитывать специфику приложения и чаще всего ограничены каким-нибудь общим алгоритмом, как, например, LRU. То есть в нашем сервисе происходит замусоривание кеша ненужными данными.

Чтобы не увеличивать на порядок количество памяти, расходуемой под кеш базы данных, мы должны управлять кешами явно из прикладного кода, помещая туда только те сообщения и тогда, когда это нужно приложению.

Для этого мы можем использовать любое внешнее хранилище ключей-значений в оперативной памяти, например, Memcached, Redis или Tarantool, полностью сняв кеширование с базы данных. При росте нагрузки можем масштабировать кластер memcache независимо.

Вот мы и пришли к типовой архитектуре, к которой приходят многие при построении нагруженного сервиса: прикладной микросервис без состояния, реализующий прикладную логику и использующий внешнюю БД для постоянного хранения состояния и внешний кеш для кеширования:

2fmqotaaww_b50yv7dr2iijtaqc.jpeg

Давайте проанализируем потери, возникающие в такой архитектуре.



Потери типовых микросервисов

Первая проблема — затрата ресурсов CPU на маршалинг и демаршалинг. Маршалинг — это процесс преобразования данных, хранящихся в оперативной памяти, в формат, пригодный для хранения или передачи. Прикинем, где он происходит в нашей схеме.

Наше приложение, обращаясь к memcache, должно замаршаллировать данные и запрос. Кеш должен демаршаллировать эти данные, понять, что это за запрос, обработать, замаршаллировать ответ. Приложение же, получив этот ответ, должно его демаршаллировать на своей стороне.

Те же самые операции маршалинга и демаршалинга будут происходить и при взаимодействии приложения с БД:

pxdm6seji3s8y3flkva_gdypvdi.jpeg

Насколько велики потери при этом? Интуитивно кажется, что немного.

Есть недавно проведенные исследования. Atul Adya, Robert Grandl, Daniel Myers
из Google и Henry Qin из Стэнфордского университета, переписывая одну из своих систем с подобной архитектуры, замерили это и обнаружили, что маршалинг и демаршалинг увеличивают нагрузку на процессор на 85%.

veykvesmiuwdz3baz9oqndx4tlw.jpeg

И это даёт плюс 27% к медианной задержке, что не очень хорошо, если мы строим low-latency сервис.

Вторая проблема — чрезмерные чтения и запись. Они возникают, когда приложение читает больше данных из memcache, чем это необходимо.

В нашем примере с хранением последних 13 сообщений чата в memcache, чтобы получить последнее сообщение из каждого чата, нам пришлось бы прочитать все 13, сериализовать, передать их по сети, десериализовать на стороне сервиса, после чего отбросить 12 из 13, оставив только одно самое свежее сообщение.

Согласно тому же исследованию, если запись содержит только 10% необходимой информации, то мы тратим на 46% больше ресурсов CPU и на 86% больше ресурсов сети, чем это действительно необходимо.

Третья проблема — сетевые задержки. Тут мы теряем время как просто на удалённые походы на memcache и базу данных, так и на время передачи нужного объёма данных в запросе-ответе, особенно в сочетании с излишними чтениями.

Но хуже всего, что эти потери растут прямо пропорционально тому, сколько обращений в кеш и базу данных необходимо для обслуживания единственного запроса от клиента.

lyir0vzphax_chgheuh-baxm34w.jpeg

Это число N может быть достаточно большим, оно сильно зависит от креативности разработчика. В нашем примере с получением последних сообщений оно легко может достигать десятков.

Кто виноват — разобрались. Теперь давайте разберёмся, что с этим делать.



Чтожеделать

Поскольку большинство нагрузки попадает на memcache и его связь с прикладной логикой, в индустрии в последнее время большинство усилий было направлено на оптимизацию именно этого: memcache и этой связи.

Некоторые компании концентрируют усилия на оптимизации перцентильных задержек memcache до 99.99.

Redis и Tarantool пытаются уменьшить чрезмерные чтения путём поддержки структуры данных на стороне memcache, что позволяет выбирать не всё значение под ключом, а только часть.

NetCache — решение, которое пытается уменьшить сетевые задержки путём построения memcache прямо на инфраструктуре сетевых свитчей.

KV-Direct борется с демаршаллингом путем переноса задач маршалинга и демаршалинга на сетевые карты.

Тут легко заметить, что большинство проблем возникает именно из-за факта удалённости необходимых данных от приложений. И таким образом, мы можем решить проблемы, просто двинув данные к нашей логике. Так мы приходим к микросервису с состоянием.



Микросервис с состоянием

Микросервис с состоянием объединяет прикладную логику сервиса с кешем в памяти одного и того же процесса. Это специализированный кеш, в нём реализуются операции, необходимые для прикладной логики.

s72c2lzvc83wqllfz_jnloqlsnq.jpeg

Это решает все перечисленные выше проблемы:

Во-первых, позволяет полностью избавиться от потерь, связанных с сетевыми задержками. Теперь между этими компонентами нет сети.

Во-вторых, прикладной интерфейс встроенного кеша позволяет реализовать нужную функциональность без чрезмерных операций чтения/записи.

В-третьих, так как теперь прикладная логика и кеш обмениваются данными в памяти одного процесса, мы можем выбрать приемлемый для обоих формат данных, что позволяет полностью убрать или сильно снизить затраты на демаршалинг.

Совмещение базы данных в одном процессе с сервисом решает абсолютно те же самые проблемы, но просто на второй «ноге» нашей архитектуры. И позволяет просто интегрировать все компоненты в случае необходимости.

Ну и вредный дядька DBA нам больше не нужен. Теперь мы сами себе девопсы и отвечаем за все компоненты системы.

Таким образом, микросервис с состоянием будет всегда быстрее при одинаковой реализации приложения за счёт экономии почти в два раза на потерях, которых в этом случае просто нет. Но не может же быть всё хорошо, правильно? Может, оно будет глючить, если не будет тормозить?

Давайте посмотрим, что там с отказоустойчивостью.



Отказоустойчивость

Давайте вспомним, какие проблемы возможны в простейшей распределённой системе, состоящей из двух машин — клиента и сервера. Клиент посылает запрос серверу, сервер его обрабатывает и возвращает. Что может пойти не так?

1, 2 Это внезапная пропажа участника (сервера или клиента) из сети: остановка приложения, падение, ребут сервера.
3, 4: Потеря исходящего и входящего сообщения.
5 Таймаут. Участник отвечает вне указанных временных рамок.
6 Неправильный ответ — участник отвечает таким значением, которое другой участник не может понять. Например, неправильный формат данных в memcache.
7 Произвольный отказ, он ещё называется византийским. Участник отправляет произвольные сообщения в произвольное время. Это могут быть какие-то умышленные действия, типа различных атак, а могут быть retries, вышедшие из-под контроля.

Итак, давайте посчитаем, какие сценарии отказов будут у нашей системы, построенной как микросервис без состояния. Запрос поступает на наш сервис, в рамках обработки которого он обращается в memcache и базу данных.

qbgdggwkctugayqqxjuncdt1ucq.jpeg

Что в этой схеме может отказать? Может ли отказать memcache? Допустим, memcache пишут волшебные эльфы, и его софт не отказывает. Но может отказать машина или сеть (мы пока не рассматриваем репликацию).

База данных? Ещё более волшебные эльфы пишут ещё более волшебно базу данных, она вообще никогда не крэшится. Но сеть и машина всё равно могут отказать.
И memcache и БД могут отказать либо по-отдельности, либо одновременно, в любой последовательности. Естественно, по закону Мёрфи они будут отказывать в самой неблагоприятной.

Таким образом, на обеих «ногах» архитектуры у нас получается по семь сценариев отказа. И наш микросервис тоже может отказать, там уже волшебных эльфов не водилось, и с этим тоже надо как-то иметь дело.

u84p9c-7vm5zdrtq1v52sp6w5os.jpeg

Интересно здесь то, что все эти отказы связаны между собой, они не независимые.

Поэтому для того, чтобы правильно запрограммировать нужную обработку, нам необходимо отдельно рассматривать все отказы и комбинации между ними. Что не делает наш код проще, а вероятность ошибиться и накосячить в таких случаях всегда не равна нулю.

Давайте посмотрим, что будет с отказами в нашей системе, построенной как микросервис с состоянием. Те же самые три машины, организованные по варианту сервиса с состоянием. Здесь схема шардирования будет немного другой. Если в первом случае все три машины отвечали за множество ключей K, то во втором случае каждая из машин будет отвечать только за диапазон равный ⅓ K, которые не пересекаются друг с другом.

Соответственно, коммуникация будет идти только в прикладную логику, ведь кэш и база данных более не требуют сетевой коммуникации. На каждой из этих связей тоже возможны те же самые сценарии отказа.

p47rakv676smel7_km7cyzohahs.jpeg

Но действия, которые необходимо предпринять разработчику при отказе первой трети и остальных одинаковы. Если вы обработали отказы по первой трети, остальные надо обрабатывать точно также. Это потому, что эти отказы никак не связаны между собой в рамках обработки одной операции. А значит, комбинации отказов рассматривать не надо и код становится проще.

Давайте теперь посчитаем вероятность отказа машины. Сначала для микросервиса без состояния. Пока предположим для простоты, что репликации у нас нет.

Что будет, если откажет база данных? Будет ли работать у нас сервис по всем ключам? Не будет. Если откажет memcache или сервис, тоже не будет. То есть при отказе любого из компонентов мы получим недоступность сервиса по всем ключам из множества K.

Значит, если вероятность отказа любой конкретной машины обозначим p, то общая вероятность отказа P будет приблизительно вот такой:

P (K) = 1 — (1 — p)3

В варианте же сервиса с состоянием при отказе любой машины из трех мы получим недоступность только тех ключей, за которые отвечает эта машина. То есть такой будет вероятность недоступности не всех ключей, а только трети:

P (⅓ K) = 1 — (1 — p)3

А чтобы получить отказ по всему множеству K, должны отказать все три машины. То есть формула вероятности отказа для всех ключей сразу будет приблизительно такой:

P (K) = p3

По формулам не очень очевидно, что лучше, для наглядности подставим цифры.

Допустим, у нас вероятность отказа конкретной машины p составляет 0.1 (то есть 10%), это очень много, но здесь мы возьмем эту величину для наглядности:

Для сервиса без состояния получим
P (K) = 1 — (1 — p)3 = 1 — (1 — 0.1)3 = 0.271

А для сервиса с состоянием:
P (K) = p3 = 0.13 = 0.001

Так получается, что архитектура сервиса с состоянием на порядки надежнее.

Конечно, в реальности никто не ставит по одной реплике. Каждая реплика, добавляемая к БД, memcache или микросервису в идеальном случае снижает вероятность отказа каждого компонента на порядок. Но это непринципиально: взаимосвязи компонентов никуда не делись, а поэтому формулы, по которым мы считаем вероятности тоже — мы просто будем подставлять вместо вероятности отказа компонента p = 0.01 для 2 реплик, p = 0.001 для трех, и т.п. Но точно так же мы можем реплицировать и сервис с состоянием, реплицировав каждую уникальную ⅓ K в две (получив вероятность недоступности каждой трети ключей p = 0.01), три (p = 0.001) реплики.

Получается, что сервис с состоянием всегда будет надёжнее при одинаковом и даже меньшем количестве машин.



Вперёд, к реализации!

С чего начнём? Мы хотим:


  • Чтобы наш сервис был высокодоступным. Значит, должна быть обеспечена репликация и определены какие-то гарантии консистентности данных между репликами.
  • Чтобы он был масштабируемым, и при увеличении нагрузки мы могли на ходу добавлять мощности. А поскольку сервис теперь совмещён с данными, решардинг, то есть перераспределение данных по масштабируемым нодам, тоже должен поддерживаться.

Конечно, это можно реализовать самим, это интересная задача, работы тут примерно на несколько лет, но намного быстрее взять что-то готовое и допилить. Слова «репликация», «консистентность» и «решардинг» намекают, что мы можем взять что-то в современных БД, поэтому в первую очередь надо определиться с ней.

У нас есть еще одно важное требование к базе данных — она должна быть на языке приложения. Это позволит нам минимизировать демаршалинг и упростить интеграцию с нашим приложением. У нас в Одноклассниках почти всё пишется на Java, соответственно, мы хотим иметь базу данных на Java. Также БД должна иметь открытый код: мы собираемся её использовать нестандартным способом —, а значит, будем что то допиливать.

Итак: open source, на Java, высокодоступное масштабирование — исходя из всего этого, в качестве БД мы и выбрали Cassandra.

Теперь её нужно встроить в наш сервис. Если посмотреть скрипты запуска Cassandra, становится понятно, что запуском занимается класс CassandraDaemon.

А значит, всё, что нам нужно сделать — включить необходимые библиотеки в classpath нашего сервиса (-cp cassandra/lib/*.jar) и вызвать Кассандра-демона вот таким заклинанием:

System.setProperty( "cassandra.config", "file://whatever/cassandra.yaml" );
CassandraDaemon.instance.activate();

В первой строчке мы устанавливаем, откуда CassandraDaemon будет брать конфигурацию (тот, кто запускал Cassandra, знает, что cassandra.yaml — это её конфигурационный файл). Во второй строчке делается вызов activate (), которым мы и запускаем ноду базы данных.

Файл сassandra.yaml использовать необязательно, есть интерфейс ConfigurationLoader, который можно реализовать, чтобы интегрировать ноду Cassandra с системой централизованного управления.

Вот и все — мы встроили дазу данных в наш сервис —, а значит теперь у нашего сервиса есть состояние. Давайте посмотрим, как это меняет маршрутизацию клиентских запросов.


Маршрутизация запросов

Предположим, у нас есть пользователь Лёха. Он хочет попереписываться в чате, ключ которого попадает в определённое множество ключей B. Для этого Лёха использует какой-то фронт, который должен решить, куда отправить запрос Лёхи для обработки. Для сервиса без состояния фронту всё равно, какой конкретно работоспособный инстанс сервиса выбрать, последовательные запросы Лёхи могут в итоге попасть на каждый из них:

n_rj1tmkglpnej6logbufrm-k_a.jpeg

Со всех этих инстансов цена обслуживания запроса будет одинаковой. Каждому нужно будет отправлять запросы за необходимыми для обработки запросов данными по сети на кеши и на базу данных.

А у микросервисов с состоянием каждый из инстансов сервиса имеет какую-то часть данных локально — один из интервалов ключей, которые я обозначил A, B и C. И теперь инстансу, владеющему интервалом ключей В, незачем куда-то ходить по сети: все данные для выполнения запросов доступны либо в памяти процесса, либо на локальных дисках.

У остальных же инстансов нет данных нужного Лёхе интервала B, и им придётся доставать эти данные по сети. А значит, цена обработки запроса сразу увеличится на демаршалинг и сеть.

efztm1afczxnzwoe_7cfjwegiz4.jpeg

Для того, чтобы запрос по интервалу ключей обрабатывался эффективно, мы не должны обращаться к нелокальным данным. А сделать мы можем это только в случае, если клиенты будут вызывать сразу нужный инстанс сервиса B. Нам нужна маршрутизация запросов, учитывающая топологию кластера, то есть информацию о распределении данных по нодам.

Мы можем реализовать такой роутинг на фронте в виде библиотеки. Она делала бы запросы по ключам сразу на нужные ноды, и эти ноды могли бы работать только с локальными данными.

Распределение данных диктует база данных. Мы только что встроили Cassandra в наш сервис. Давайте посмотрим, как она распределяет данные.

В Cassandra каждый ряд имеет ключ, составленный из двух компонентов:
Partition Key (chatId) — на основании него выбирается нода.
Clustering Key (msgId), который в распределении данных по нодам не участвует, но определяет порядок, в котором записи будут упорядочены внутри партиции.

CREATE TABLE Messages (
    chatId, msgId

    user, type, text, attachments[], terminal, deletedBy[], replyTo…

    PRIMARY KEY ( chatId, msgId )
) 

Записи с разным Partition Key попадают в разные партиции и могут быть распределены на разные ноды. А записи, где различается только Clustering Key, находятся в одной партиции и никак не могут быть распределены по разным нодам — то есть всегда будут находиться вместе. Подробнее об этом можно почитать тут.

Поскольку в мессенджере порядок следования сообщений в чате важен, а порядок разных чатов между собой — нет, то для нас в качестве Partition Key правильно будет выбрать ID чата, а Clustering Key — ID сообщения.

5kvuvl99bx3vsqbczu8_2wzibbc.jpeg

С точки зрения кода в распределении данных в Cassandra участвуют следующие компоненты:


  • Partitioner — на основании значения Partition Key вычисляет токен, все доступные значения которого принято отображать в виде кольца. Partitioner задаётся один раз на весь кластер и не может быть изменён впоследствии. Среди всех вариантов Partitioner наиболее общеприменим тот, который получает токен путем применения hash-функции murmur3 к значению ключа.
  • TokenMetadata — глобальная структура, в которой хранится разбиение кольца на интервалы значений токенов и соответствие этих интервалов нодам, на которых хранится соответствующая токену партиция.
  • Replication Strategy определяет, как организуется хранение реплик партиции и сколько их вообще должно быть. Можно указать разные Replication Strategy для разных таблиц, поместив их в разные keyspace.

Этой информации достаточно для того, чтобы в три строчки составить полную карту нахождения всех данных и их реплик в нашем кластере.

SortedMap> endpointMap = …

AbstractReplicationStrategy replication = …

for (Token token : tokenMetadata.sortedTokens() ) {
    endpointMap.put( token, replication.getNaturalEndpoints( token ) );
}

В приведённом примере endpointMap будет содержать соответствие между максимальным токеном интервала и адресами всех реплик попадающих в интервал ключей. Передав этот endpointMap клиенту, легко сделать нужную нам маршрутизацию. Стоит иметь в виду, что топология кластера меняется время от времени, ноды добавляются и уходят, поэтому эту информацию нужно периодически обновлять.

Мы, например, обновляем её раз в 30 секунд. Также стоит подумать о том, что делать, если клиент вызывает сервис, пользуясь устаревшей топологией. Например, вскоре после расширения кластера, запросы могут уйти не на те ноды. Тут могут быть несколько вполне очевидных стратегий: и редирект, и push-нотификации с новыми нодами. Стоит подумать, какая конкретно подходит под задачу.

С клиентами теперь всё нормально, давайте разберёмся, как нам работать с данными.


Работаем со встроенной БД

У нас есть вот такая таблица:

CREATE TABLE Messages (
    chatId, msgId

    user, type, text, attachments[], terminal, deletedBy[], replyTo…

    PRIMARY KEY ( chatId, msgId )
) 

И нам нужно получить список сообщений, то есть выполнить вот такой запрос:

SELECT FROM Messages
    WHERE chatId = ? AND
    msgId BETWEEN :from AND :to

Возможно, отфильтровав ненужные записи потом.

Как мы сделали бы это через драйвера базы данных? Вот пример, который я взял прямо из мануала.

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;

try (CqlSession session = CqlSession.builder().build()) { // (1)
    ResultSet rs = session.execute("select release_version from system.local"); // (2)
    Row row = rs.one();
    System.out.println(row.getString("release_version")); // (3)
}

Что тут происходит (в строках, помеченных числами 1–3):


  1. Строим сессию с кластером, устанавливаем соединение с кластером
  2. Выполняем запрос, получаем ResultSet
  3. Используем его

На самом деле всё немного сложнее, потому что нужно где-то хранить конфигурацию куда на какой кластер ходить, работать с сессиями и прочее.

Для работы же со встроенной в наше приложение Cassandra мы будем работать с классом QueryProcessor, у которого есть метод execute, который достаточно вызвать для выполнения запроса:

package org.apache.cassandra.cql3;

import java.nio.ByteBuffer;

public class QueryProcessor
{
    public static UntypedResultSet execute(String query,
        ConsistencyLevel cl, Object… values)
        throws RequestExecutionException
}

Обратите внимание, что он — статический. Почему? Дело в том, что нам не нужна никакая сессия ни с каким кластером. Наша локальная нода и есть уже член кластера, ей никуда не нужно устанавливать соединение. Поэтому этот метод статический, вы его вызываете и, естественно, вызываете в контексте своего собственного кластера.

Соответственно, для getMessages () получится примерно такой код выполнения этого запроса:

UntypedResultSet rs = QueryProcessor.execute(
        "SELECT * FROM Messages "
        + "WHERE chatId = ? AND msgId < ? AND msgId > ?"
        ConsistencyLevel.QUORUM, chatId, from, to );

rs.forEach ( row -> {} );

Аналогичным способом для add () мы должны выполнить запрос добавления записи приблизительно таким же образом:

QueryProcessor.execute( "INSERT INTO Messages VALUES (?,?,?,...)", cl, values );

Всё просто. Если покопаться в QueryProcessor, можно найти и более эффективный способ получения записей из базы данных. Он не так просто выглядит, поэтому здесь я его не показал, но даже этот вариант будет быстрее стандартного драйвера за счет экономии на маршалинге и сетевой задержке.



Кеш сообщений

Как мы определились ранее, скорости БД нам не хватает и мы решили реализовывать прикладной кеш сообщений. Для начала давайте посчитаем, сколько данных он будет хранить. Итак, у нас всего:


  • 600 миллиардов сообщений
  • 100 терабайт данных
  • 5 миллиардов чатов
  • Из этих чатов только 5% активных
  • И из каждого мы хотим хранить в кеше 13 последних сообщений

Все исходные данные есть, можем посчитать, что в память у нас попадает довольно большое количество сообщений и данных:


  • 3+ миллиарда сообщений
  • 250 миллионов чатов
  • 500 гигабайт данных

Это без учёта репликации. Нужно их ещё поделить на количество нод в кластере и умножить на replication factor.

Такой кеш сообщений должен обладать прикладным интерфейсом, соответственно, мы должны реализовать какие-то методы из нашего списка (getMessages (), getLastMessages (), add (), search (), indexMessages ()). На практике нужны только первые три, потому что мы хотим, чтобы поиск и индексация никогда ходили в кеш.

Начнём с реализации getMessages () на примере чатика Лизы с Лёхой:


  • Пользователь Лиза открывает ok.ru и выбирает чат с Лёхой. То есть, мы через веб-фронт получаем на некоторый инстанс запрос getMessages ().
  • Далее прикладной код проверяет, есть ли этот чат у нас в кеше.
  • Обнаруживает, что нет, вызывает QueryProcessor, получает данные из БД.
  • Помещает его в кеш.
  • Наконец, возвращает актуальное состояние чата Лизе.

Вот эти пять шагов:

cb3gxnhr2ozu9nqextckx5uyisq.jpeg

Лёха через мобильное приложение делает то же самое и удачно попадает на другую реплику.

Выполняет тот же самый запрос, точно так же в кеше другой реплики данных нужного чата нет, загружаем его, получаем актуальное состояние чата:

s2fpfod8xxjlyou-tuewrh7ruk4.jpeg

Заботясь о состоянии Лёхи, Лиза предлагает ему сходить пообедать, то есть вызывает метод add (). Нода помещает запись о новом сообщении и в базу данных, и в кеш:

o2xjewjnms1q78ar-m4vnm6zhre.jpeg

Теперь в кеше этой реплики у нас три сообщения, а в кеше Лёхиной по-прежнему два:

wvl3ss3gs-orykbpctubexcmf04.jpeg

Лёха, получив пуш на телефон, спрашивает getMessages (), обращается в кеш. А там уже есть данные, но они устаревшие. Мы получаем их, и Лёха остаётся голодным:

kthdbhkv2jtnev9v2kfqnkodl2w.jpeg


Актуальность данных

Очевидно, мы должны как-то сообщать другим репликам, что появилось новое сообщение.

Мы можем использовать информацию о топологии кластера: точно так же, как мы это делали на клиенте, вычислить все реплики и нотифицировать их все с новым сообщением. Например, вызвав какой-то спецметод added ():

db0ictcyag0wkm1scunwy0pyu5s.jpeg

И тогда мы вставим новое сообщение кеши всех остальных реплик, и на всех репликах всегда будет актуальное состояние кешей. Или нет?

Проблема в том, что этот запрос пойдёт через сеть на другую реплику, а, следовательно, он может не пройти:

tkurdnzg7wy3srnvqrzbxh0o_u0.jpeg

И что на самом деле произошло с репликой, до которой не удалось доставить это сообщение, мы не знаем. Может быть, она крэшнулась? У нее вылетели диски? Может, какие-то нарушения в сети произошли?

Время восстановления удаленной реплики спрогнозировать невозможно, а ждать до бесконечности завершения операции add () мы не можем. Однозначно то, что при такой схеме не все нотификации будут достигать всех реплик, поэтому для некоторых записей состояние кешей будет устаревшим. А такая несогласованность имеет тенденцию к накапливанию со временем.

Давайте посмотрим, как эту же ситуацию будет отрабатывать Cassandra. При выполнении INSERT она тоже определит необходимые реплики данных, которые нужно изменить, сформирует для них требования об изменении (мутацию) и попытается послать это требование по сети.

vcinicuremfgsakfmtmvdfquxas.jpeg

Поскольку наш кеш и нода желтой реплики — это один процесс, то такая мутация не может быть доставлена точно в силу тех же самых сетевых и прочих проблем.

drr734bk0t3o0zgepsiiyxuawgi.jpeg

В этом случае Cassandra сохраняет такие мутации локально и персистентно.

Когда связь с желтой репликой снова восстановится, такие пропущенные мутации будут отосланы ей при первой же возможности. Это называется Hint.

04zxzd39y2lgme-e1ll9pogosfo.jpeg

Обычно такие пропущенные мутации сохраняются некоторое ограниченное время, иначе место на дисках работоспособных нод могло бы закончиться из-за одной-единственной достаточно долго не работающей ноды.

Для восстановления согласованности в случаях долгой неработоспособности ноды служит Read Repair — сравнение реплик при чтении. В результате обнаружения расхождения версий формируется мутация на устаревшие реплики.

tm3bz2c6nrwdmohqtg1zmeo0enu.jpeg

И, наконец, Streaming Repair — это пакетный фоновый процесс обхода и сравнения всех или какой-то части данных всех реплик.

В результате такого процесса формируется файл, содержащий несовпадающие между репликами данные, который передаётся на устаревшую ноду целиком через отдельное соединение. Это называется Streaming.

cvrr1euhqasnjwqkxjenhe70f3g.jpeg

Очевидно, что мы могли бы всё вот это реализовать сами. Это ещё одна интересная задача на несколько лет.

Но мы с вами люди занятые, давайте посмотрим, что мы х

© Habrahabr.ru