[Из песочницы] Как я свой мессенджер писал
Одним вечером, после очередного расстраивающего дня, наполненного попытками наладить баланс в своей игре, я решил, что мне срочно требуется отдых. Переключусь на другой проект, быстренько его сделаю, верну на место скатившуюся за время разработки игры самоооценку и с новыми силами возьму игру штурмом! Главное выбрать проект nice and relaxing… Написать свой месседжер? Ха! How hard can it be?
Код можно посмотреть здесь.
Краткая предыстория
До начала работы над мессенджером почти год корпел над мультиплеерной онлайн Line Tower Wars игрой. Программирование шло хорошо, всё остальное (баланс и визуал в особенности) — не очень. Внезапно оказалось, что сделать игру и сделать увлекательную игру (увлекательную для кого-то помимо самого себя) — две разные вещи. После года мытарств мне нужно было отвлечься, поэтому я решил попробовать свои силы в чём-то другом. Выбор пал на мобильную разработку, а именно, Flutter. Слышал множество хороших вещей про Flutter, да и дарт после недолгих экспериментов мне понравился. Решил написать свой собственный мессенджер. Во-первых, хорошая практика по реализации и клиента, и сервера. Во-вторых, будет что-то весомое положить в портфолио для поиска работы, я как раз нахожусь в процессе.
Запланированный функционал
- Личные и групповые чаты
- Отправка текста, изображений и видео
- Аудио и видео-звонки
- Подтверждение получения и прочтения (галочки из Вотсапа)
- «Печатает…»
- Уведомления
- Поиск по QR-коду и геолокации
Забегая вперёд, могу с гордостью (и с облегчением) сказать, что почти всё запланированное было реализовано, а что ещё не реализовано — будет реализовано в ближайшее время.
Выбор языка
С выбором языка долго не думал. Сначала был соблазн использовать дарт и для клиента, и для сервера, но более детальная инспекция показала, что доступных драйверов для дарт не очень много, а те что есть не внушают особого доверия. Хотя не поручусь говорить о текущем моменте, возможно ситуация улучшилась. Так что выбор мой пал на C#, с которым я работал в Unity.
Архитектура
Начал с продумывания архитектуры. Конечно, учитывая что моим мессенджером скорее всего будут пользоваться 3 с половиной человека, можно было бы не заморачиваться с архитектурой вообще. Берёшь и делаешь как в бесчисленных туториалах. Вот нода, вот монго, вот вебсокеты. Готово. И Firebase где-то тут. Но так не интересно. Я решил делать мессенджер, способный легко горизонтально скейлиться, будто ожидаю миллионы одновременных клиентов. Однако так как опыта в этой сфере у меня не было никакого, пришлось всё познавать на практике методом ошибок и снова ошибок.
Я не утвержаю, что такая архитектура супер крута и надёжна, но она жизнеспособна и в теории должна выдерживать большие нагрузки и скейлиться горизонтально, но я не очень понимаю, как проверить. И я надеюсь, что не упустил какой-то очевидный момент, который известен всем, кроме меня.
Ниже будет подробное описание отдельных компонентов.
Frontend Server
Ещё до того как я взялся делать игру, меня увлекла концепция асинхронного однопоточного сервера. Эффективно и без потенциальных race’ов — о чем ещё можно просить. С целью разобраться, как такие сервера устроены, я стал копаться в модуле asyncio
языка python. Увиденное решение показалось мне очень изящным. Если кратко, то решение на псевдокоде выглядит так.
// Есть сокет, из которого мы ожидаем получить байты, но мы не знаем
// пришли ли они уже или ещё нет. Вместо того чтобы сразу вызывать socket.Receive
// и потенциально блокировать весь поток, делаем:
var bytesReceived = Completer
С помощью такой нехитрой техники мы можем обслуживать большое число сокетов одним потоком. Мы никогда не блокируем поток в ожидании пока байты будут получены или отправлены. Поток всегда занят полезной работой. Concurrency, одним словом.
Frontend сервера реализованы именно так. Они все однопоточные и асинхронные. Поэтому для максимальной производительности нужно запускать столько серверов на одной машине сколько у неё имеется ядер (4 на картинке).
Frontend сервер читает сообщение от клиента и, основываясь на коде сообщения, отправляет его в один из топиков кафки.
Зачем такие сложности? Зачем делить топик на партишн? Партишн служит в качестве единицы параллелизации. Несколько потребителей (consumer) могут подписаться на один и тот же топик (образуя группу consumer’ов), и тогда кафка (по умолчанию) распределит все партишн равномерно между ними. Если, скажем, у нас топик с двумя партишн, на который подписано 2 клиента, кафка распределит каждому клиенту по одному партишн. Если партишн 3 — одному из клиентов достанется 2. Кафка умеет детектить добавление новых партишн и новых клиентов и автоматически перераспределять партишн в случае необходимости.
Frontend сервер отправляет сообщение в кафку без ключа (когда нет ключа, кафка просто отправляет сообщения в партишн по очереди). Из топика сообщение вытаскивает один из соответствующих backend серверов. Сервер обрабатывает сообщение и… что дальше? А что дальше зависит от типа сообщения.
В самом банальном случае происходит цикл запрос-ответ. Например, на запрос о регистрации нам нужно просто дать клиенту ответ (Success
, EmailAlreadyInUse
, и тп). Но на сообщение, содержащем приглашение в существующий чат новых членов (Васю, Эмиля и Юлю), нам нужно ответить сразу тремя разными типами сообщений. Первый тип — нужно уведомить приглашающего об исходе операции (вдруг произошла серверная ошибка). Второй тип — нужно уведомить всех текущих членов чата, что в чате теперь такие-то новые члены. Третий — отправить приглашения Васе, Эмилю и Юле.
Окей, звучит не очень сложно, но для того чтобы отправить сообщение какому-либо клиенту нам нужно: 1) узнать с каким frontend сервером этот клиент соединён (ведь мы не выбираем с каким конкретно сервером клиент соединятся, за нас решает балансировщик); 2) передать сообщение от backend сервера нужному frontend серверу; 3) собственно, отправить сообщение клиенту.
Для реализации пунктов 1 и 2 я решил использовать отдельный топик («frontend servers» топик). Разделение authentication, session и call топиков на партишн служит как механизм параллелизации. Видим что session сервера сильно загружены? Просто добавляем парочку новых партишн и session серверов, и кафка сделает перераспределение нагрузки за нас, разгружая имеющиеся session сервера. Разделение же «frontend servers» топика на партишн служит как механизм маршрутизации.
Каждому frontend серверу соответствует один партишн «frontend servers» топика (с таким же индексом, что и сам сервер). То есть серверу 0 — партишн 0 и тд. Кафка даёт возможность подписаться не только на определённый топик, но и на определённый партишн определённого топика. Все frontend сервера на стартапе подписываются на соответствующий партишн. Таким образом backend сервер получает возможность отправить сообщение конкретному frontend серверу, отправив сообщение в определённый партишн.
Окей, теперь, когда клиент присоединяется, нужно просто сохранять где-то пару UserId — Frontend Server Index. При дисконнекте — удалять. Для этих целей подойдёт любое из многих in-memory key-value бд. Я выбрал редис.
Как всё выглядит на практике. Первым делом после установки соединения клиент Андрей отправляет серверу сообщение Join
. Frontend сервер получает сообщение и пересылает его в топик session, предварительно добавляя заголовок «Frontend Server»: {index}. Один из backend session серверов получит сообщение, прочитает токен авторизации, определит что это за юзер присоединился, прочитает добавленный frontend сервером индекс и сделает запись UserId — Index в редис. С этого момента клиент считается онлайн, и теперь мы знаем через какой frontend сервер (и, соответственно, через какой партишн «frontend servers» топика) мы можем до него «достучаться», когда другие клиенты будут отправлять Андрею сообщения.
* На самом деле процесс чуть сложнее чем я описал. Можете ознакомиться в исходном коде.
Псевдокод frontend сервера
// Frontend Server 6
while (true) {
// Consume from "Frontend Servers" topic, partition 6
var messageToClient = consumer.Consume();
if (message != null) {
relayMessageToClient(messageToClient);
}
var callbacks = selector.Poll();
while (callbacks.TryDequeue(out callback)) {
callback();
}
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
while (!callAtQueue.IsEmpty && callAtQueue.PeekPriority() <= now) {
callAtQueue.Dequeue()();
}
while (messagesToRelayToBackendServers.TryDequeue(out messageFromClient)) {
// choose topic
producer.Produce(topic, messageFromClient);
}
}
Здесь есть несколько трюков.
1) relayMessageToClient
. Будет ошибкой просто взять нужный сокет и сразу начать отправлять в него сообщение, потому что, возможно, мы уже отправляем клиенту какое-то другое сообщение. Если мы начнем посылать байты, не проверив не занят ли сокет в данный момент, сообщения будут перемешаны. Как и во многих других местах, где требуется упорядоченная обработка данных, трюк заключается в использовании очереди, а именно, очереди из Completer’ов (TaskCompletionSource
в C#).
void async relayMessageToClient(message) {
// find client
await client.ReadyToSend();
await sendMessage(client, message);
client.CompleteSend();
}
class Client {
// ...
sendMessageQueue = new LinkedList>();
async Future ReadyToSend() {
var sendMessage = Completer
Если очередь не пуста, значит, в данный момент сокет уже занят. Создаём новый completer
, добавляем его в очередь и await
'им предыдущийcompleter
. Таким образом, когда предыдущее сообщение будет отправлено, CompleteSend
завершит completer
, что приведет к тому, что сервер начнёт отправлять следующее сообщение. Такая очередь так же позволяет гладко propagate исключения. Допустим, во время отправки клиенту какого-то сообщения произошла ошибка. В таком случае нам нужно завершить с исключением отправку не только этого сообщения, но и всех сообщений, которые в данный момент ожидают своего часа в очереди (висят на await
'ах). Если мы этого не сделаем, то они так и продолжат висеть, и мы получим утечку памяти. Для краткости, код, который занимается этим, здесь не приведён.
2) selector.Poll
. Собственно, даже не трюк, а просто попытка сгладить недостатки реализации метода Socket.Select
(selector
— просто обертка над этим методом). В зависимости от ОС этот метод под капотом использует либо select
, либо poll
. Но важно здесь не это. Важно то, как этот метод работает со списками, которые мы подаём ему на вход (список сокетов на чтение, на запись, на проверку ошибки). Этот метод берёт списки, опрашивает сокеты и оставляет в списках только те сокеты, которые готовы выполнить требуемую операцию. Все остальные сокеты выкидываются из списков. «Выкидывание» происходит через RemoveAt
(то есть все последуюшие элементы сдвигаются, что неэффективно). Плюс к этому, так как нам нужно опрашивать все зарегистрированные сокеты каждую итерацию цикла, такое «очищение» вообще приносит вред, приходится каждый раз заново наполнять списки. Мы можем обойти все эти проблемы, используя кастомный List
, метод RemoveAt
которого не удаляет элемент из списка, а просто помечает его как удалённый. Класс ListForPolling
и есть моя реализация такого списка. ListForPolling
работает только с методом Socket.Select
и не годится ни для чего другого.
3) callAtQueue
. В большинстве случаев frontend сервер, переслав клиентское сообщение backend серверу, ожидает ответ (подтверждение, что операция прошла успешно, или ошибка, если что-то пошло не так). Если он не дожидается ответа в течение какого-то конфигурируемого промежутка времени, он отправляет клиенту ошибку, чтобы тот не ждал ответа, который никогда не придёт. callAtQueue
— это priority queue. Сразу после того, как сервер отправляет сообщение в кафку, он делает примерно следущее:
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
callAtQueue.Enqueue(callback, now + config.WaitForReplyMSec);
В коллбэке ожидание ответа отменяется и начинается отправка серверной ошибки. Если же ответ от backend сервера получен, коллбэк ничего не делает.
Использовать await Task.WhenAny(answerReceivedTask, Task.Delay(x))
нет возможности, так как код после Task.Delay
выполняется на потоке из пула.
Вот, собственно, всё, что касается frontend серверов. Здесь требуется небольшая поправка. На самом деле, сервер не полностью однопоточный. Конечно, кафка под капотом использует потоки, но я имею в виду код приложения. Дело в том, что отправка сообщения в топик кафки (produce) может и не преуспеть. Кафка в случае провала сам повторяет отправку определённое конфигурируемое количество раз, но, если и повторные отправления проваливаются, кафка бросает это дело как безнадёжное. Проверить, было ли сообщение успешно отправлено или нет, можно в deliveryHandler
, который мы передаём в метод Produce
. Кафка вызывает этот хэндлер в I/O потоке producer’а (поток, который занимается отправкой сообщений). Мы должны удостовериться, что сообщение отправлено успешно, и, если нет, отменить ожидание ответа от backend сервера (ответ не придёт, потому что запрос не был отправлен) и отправить клиенту ошибку. То есть нам никак не избежать взаимодействия с другим потоком.
* При написании статьи я вдруг осознал, что мы можем не передавать deliveryHandler
в метод Produce
или просто игнорировать все ошибки кафки (клиенту всё равно будет отправлена ошибка по таймауту, который я описал ранее) — тогда весь наш код будет однопоточным. Теперь думаю, как лучше сделать.
Учитывая, что я использую кафку в качестве брокера сообщений, может возникнуть вопрос, а почему, собственно, не использовать RabbitMQ? Весь нужный мне функционал есть и в рэббите. И я, действительно, сначала использовал его. Так почему перешёл на кафку? Рэббит проще в использовании, но в моем случае он существенно усложнял код frontend серверов. При использовании рэббита, чтобы мониторить сообщения от backend серверов, мы подписываемся на определенную очередь сообщений, и рэббит вызывает предоставленный нами коллбек при каждом новом сообщении. Проблема заключается в том, что коллбеки вызываются на потоках из пула, что ломает мою однопоточную модель. Приходится использовать мютексы, что сразу делает код сложным и error-prone. О том, что рэббит также предоставляет basicGet
механизм, который делает именно то, что мне нужно, мне было невдомёк в то время. Поэтому я перешёл на кафку. Если бы я знал про basicGet
, скорее всего остался бы на рэббите, но о переходе на кафку не жалею. Кафка легче кластеризуется и в теории обладает большей пропускной способностью.
Backend Server
По сравнению с frontend сервером, интересных моментов здесь практически нет. Все backend сервера работают одинаково. На стартапе сервер подписывается на топик (authentication, session или call в зависимости от роли), и кафка назначает ему один или более партишн. Сервер получает сообщение из кафки, обрабатывает и обычно посылает в ответ одно или более сообщений. Почти реальный код:
void Run() {
long lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
while (true) {
var consumeResult = consumer.Consume(
TimeSpan.FromMilliseconds(config.Consumer.PollTimeoutMSec)
);
if (consumeResult != null) {
var workUnit = new WorkUnit() {
ConsumeResult = consumeResult,
};
LinkedList workUnits;
if (partitionToWorkUnits.ContainsKey(consumeResult.Partition)) {
workUnits = partitionToWorkUnits[consumeResult.Partition];
} else {
workUnits = partitionToWorkUnits[consumeResult.Partition] =
new LinkedList();
}
workUnits.AddLast(workUnit);
handleWorkUnit(workUnit);
}
if (
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - lastCommitTime >=
config.Consumer.CommitIntervalMSec
) {
commitOffsets();
lastCommitTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
}
}
TopicPartitionOffset
. Когда мы читаем (consume) сообщение из кафки, мы получаем ConsumeResult
, который, помимо самого сообщения, также содержит TopicPartitionOffset
. Зачем нам нужна эта информация? Кафка гарантирует at least once delivery, что означает, что сообщения не будут потеряны и будут доставлены в большинстве случаев один раз (иногда возможна повторная доставка сообщения). Это достигается за счёт того, что кафка для каждого партишн каждого топика хранит последний подтвержённый (commited) оффсет. Скажем, один consumer вытащил из назначенного ему партишн сообщение с оффсетом 16, обработал его, закоммитил 16й оффсет, вытащил следующее сообщение, но во время обработки вдруг умер, не сделав коммит. Кафка назначит его партишн какому-то другому consumer’у из той же группы consumer’ов и начнёт доставлять ему сообщения из данного партишн, начиная с оффсета 16 + 1 (последний подтверждённый оффсет + 1). Таким образом сообщение 17 не будет потеряно. Кафка может либо коммитить оффсеты автоматически каждые N миллисекунд, либо полностью передать контроль над коммитами пользователю.
Я отключил авто-коммит и занимаюсь коммитами самостоятельно. Это необходимо так как handleWorkUnit
, где собственно и осуществляется обработка сообщения, — это async void
метод, поэтому нет никаких гарантий, что сообщение 5 будет обработано раньше сообщения 6. Кафка хранит только один commited оффсет (а не набор оффсетов), соответственно, перед тем как коммитить оффсет 6, нам нужно убедиться, что все предыдущие сообщения тоже были обработаны. Помимо этого, один backend сервер может потреблять сообщения из нескольких партишн одновременно, и, значит, должен следить за тем чтобы коммитить правильный оффсет в соответствующий партишн. Для этого мы используем hash map вида partition: work units. Вот как выглядит код commitOffsets
(настоящий код на этот раз):
private void commitOffsets() {
foreach (LinkedList workUnits in partitionToWorkUnits.Values) {
WorkUnit lastFinishedWorkUnit = null;
LinkedListNode workUnit;
while ((workUnit = workUnits.First) != null && workUnit.Value.IsFinished) {
lastFinishedWorkUnit = workUnit.Value;
workUnits.RemoveFirst();
}
if (lastFinishedWorkUnit != null) {
offsets.Add(lastFinishedWorkUnit.ConsumeResult.TopicPartitionOffset);
}
}
if (offsets.Count > 0) {
consumer.Commit(offsets);
foreach (var offset in offsets) {
logger.Debug(
"{Identifier}: Commited offset {TopicPartitionOffset}",
identifier,
offset
);
}
offsets.Clear();
}
}
Как видно, мы итерируем по ворк юнитам, находим последний завершённый к данному моменту юнит, после которого нет незавершённых, и коммитим соответствующий ему оффсет. Такой цикл позволяет нам избежать «дырявых» коммитов. Например, если у нас в данный момент 4 ворк юнита (0: Finished, 1: Not Finished, 2: Finished, 3: Finished
), мы можем закоммитить только 0й юнит, так как, если закоммитим сразу 3й, это может привести к потенциальной потере 1 го, если вдруг сервер умрёт прямо сейчас.
class WorkUnit {
public ConsumeResult ConsumeResult { get; set; }
private int finished = 0;
public bool IsFinished => finished == 1;
public void Finish() {
Interlocked.Increment(ref finished);
}
}
handleWorkUnit
, как было сказано, async void
метод, и он, соответственно, полностью обёрнут в try-catch-finally
. В try
он вызывает нужный сервис, а в finally
— workUnit.Finish()
.
Сервисы довольно тривиальны. Вот, например, какой код выполняется, когда юзер отправляет новое сообщение:
private async Task createShareItem(CreateShareItemMessage msg) {
byte[] message;
byte[] messageToPals1 = null;
int?[] partitions1 = null;
// Вытаскиваем UserId из токена.
long? userId = hashService.ValidateSessionIdentifier(msg.SessionIdentifier);
if (userId != null) {
var shareItem = new ShareItemModel(
requestIdentifier: msg.RequestIdentifier,
roomIdentifier: msg.RoomIdentifier,
creatorId: userId,
timeOfCreation: null,
type: msg.ShareItemType,
content: msg.Content
);
// Создаём новое сообщение или возвращаем null,
// если такой комнаты не существует.
long? timeOfCreation = await storageService.CreateShareItem(shareItem);
if (timeOfCreation != null) {
// Ищем всех членов комнаты в кэше.
List pals = await inMemoryStorageService.GetRoomPals(
msg.RoomIdentifier
);
if (pals == null) {
// Если нет в кэше - вытаскиваем из бд и сохраняем в кэш.
pals = await storageService.GetRoomPals(msg.RoomIdentifier);
await inMemoryStorageService.SaveRoomPals(msg.RoomIdentifier, pals);
}
// Хотим отправить сообщение всем, кроме отправителя.
pals.Remove(userId.Value);
if (pals.Count > 0) {
// Создаём ack, чтобы отслеживать, кто не получил и
// кто не прочитал сообщение.
await storageService.CreateAck(
msg.RequestIdentifier, userId.Value, msg.RoomIdentifier,
timeOfCreation.Value, pals
);
// in - список UserId, out - список индексов frontend серверов,
// к которым юзеры подключены. Если какой-то юзер офлайн -
// индекс будет null.
partitions1 = await inMemoryStorageService.GetUserPartitions(pals);
List onlinePals = getOnlinePals(pals, partitions1);
// Если никого нет онлайн, то и слать сообщение никому не нужно.
// Офлайн юзеры получат сообщение при следующем заходе в приложение.
if (onlinePals.Count > 0) {
messageToPals1 = converterService.EncodeNewShareItemMessage(
userId.Value, timeOfCreation.Value, onlinePals, shareItem
);
nullRepeatedPartitions(partitions1);
// Какие-то юзеры могут быть подключены к одному и тому же
// frontend серверу, поэтому здесь мы null'им дупликаты.
}
}
message = converterService.EncodeSuccessfulShareItemCreationMessage(
msg.RequestIdentifier, timeOfCreation.Value
);
} else {
message = converterService.EncodeMessage(
MessageCode.RoomNotFound, msg.RequestIdentifier
);
}
} else {
message = converterService.EncodeMessage(
MessageCode.UserNotFound, msg.RequestIdentifier
);
}
return new ServiceResult(
message: message, // Это сообщение уйдёт отправителю.
messageToPals1: messageToPals1, // Это - всем остальным членам комнаты.
partitions1: partitions1
);
}
База данных
Большая часть функционала сервисов, вызываемых backend серверами, — это просто добавление новых данных в бд и обработка уже имеющихся. Очевидно, как база данных устроена и как мы ей оперируем играет очень важное значение для мессенджера, и тут мне бы хотелось сказать, что я подошёл к вопросу выбора бд очень тщательно после внимательного изучения всех вариантов, но это не так. Я просто выбрал CockroachDb, потому что он обещает много при минимуме усилий и имеет совместимый с postgres синтаксис (я работал с постгрес раньше). Были мысли использовать Кассандру, но в конце концов решил остановиться на чём-то знакомом. Я никогда раньше не работал ни с кафкой, ни с рэббитом, ни с Flutter и дарт, ни с WebRtc, поэтому решил не тащить ещё и Кассандру, так как боялся утонуть во всём множестве новых для меня технологий.
Из всех частей моего проекта дизайн базы данных — вещь, в которой я сомневаюсь больше всего. Я не уверен, что решения, которые я принял, действительно, хорошие решения. Всё работает, но можно было сделать лучше. Например, есть таблицы ShareRooms (так я называю чаты) и ShareItems (так я называю сообщения). Так вот все юзеры, входящие в какую-то комнату, записаны в jsonb поле этой комнаты. Это удобно, но явно очень медленно, так что скорее всего переделаю на использование внешних ключей. Или, например, таблица ShareItems хранит все сообщения. Что тоже удобно, но так как ShareItems является одной из самых нагруженных таблиц (постоянные select
и insert
), возможно стоит создавать новую таблицу для каждой комнаты или что-то в этом роде. Кокроач раскидывает записи по разным нодам, соответственно, нужно тщательно продумывать куда какая запись пойдёт, чтобы добиться максимальной производительности, а я этого не делал. В общем, как можно понять из всего вышесказанного, базы данных не самое моё сильное место. Прямо сейчас я вообще тестирую всё на постгрес, а не кокроач, потому что так меньше нагрузки на мою рабочую машину, она и так бедная от нагрузок скоро взлетит. Благо код для постгрес и кокроач разнится совсем немного, так что переключаться не составляет труда.
Сейчас я нахожусь в процессе изучения, как, собственно, кокроач работает (как происходит mapping между SQL и key-value (кокроач использует RocksDb под капотом), как он распределяет данные между нодами, реплицирует и тд). Стоило, конечно, изучить кокроач перед тем как использовать его, но лучше поздно чем никогда.
Я думаю, что база претерпит большие изменения, когда я стану лучше разбираться в этом вопросе. Прямо сейчас мне не даёт покоя таблица Acks. В этой таблице я храню данные о том, кто ещё не получил и кто ещё не прочитал сообщение (чтобы показывать юзеру галочки). Легко уведомить юзера, что его сообщение прочитано, если юзер сейчас онлайн, но если нет, нам нужно сохранять эту информацию, чтобы уведомить юзера позже. И так как доступны групповые чаты, недостаточно просто хранить флаг, нужны данные про отдельных юзеров. Так вот здесь прямо просится использование битовых строк (одна строка на ещё не получивших юзеров, вторая — на ещё не прочитавших). Тем более кокроач поддерживат bit
и bit varying
. Однако я так и не придумал, как это дело реализовать, учитывая, что состав комнат может постоянно меняться. Чтобы битовые строки сохраняли свой смысл, юзеры в комнате должны оставаться в том же порядке, что довольно затрудительно сделать, когда, например, какой-то юзер покидает комнату. Здесь есть варианты. Возможно стоит записывать -1 вместо того чтобы удалять юзера из jsonb поля, чтобы сохранялся порядок, или использовать какой-то способ версионирования, чтобы мы знали, что вот эта вот битовая строка ссылается на порядок юзеров, который был тогда-то тогда-то, а не на нынешний порядок юзеров. Я всё ещё в процессе продумывания, как это дело лучше реализовать, а пока ещё не получившие и ещё не прочитавшие юзеры — это тоже просто jsonb поля. Учитывая, что запись в таблицу Acks делается при каждом сообщении, объём данных получается большим. Хотя запись, конечно, удаляется, когда сообщение получено и прочитано всеми.
Flutter
Долгое время я работал над серверной частью и использовал простые консольные клиенты для теста, так что даже не создавал Flutter проект. А когда создал, думал, что серверная часть была сложной частью, а приложение это так, фигня, за пару дней разберусь. Пока работал над сервером, пару раз создавал Hello World’ы на флаттер, чтобы прочувствовать фреймворк, и, так как мессенджеру не требуется какой-то замысловатый UI, думал, что полностью готов. Так вот UI, действительно, фигня, но реализация функционала доставила мне проблем (и ещё доставит, так как не всё готово).
State management
Самая популярная тема. Есть тысяча способов управлять состоянием, и рекомендуемый подход меняется раз в полгода. Сейчас мэйнстримом является provider. Лично я для себя выбрал 2 способа: bloc и redux. Bloc (Business Logic Component) для управления локальным состоянием и redux для управления глобальным.
Bloc — это не какая-то библиотека (хотя, конечно, есть и библиотека, уменьшающая бойлерплейт, но я ей не пользуюсь). Bloc — это подход, основанный на стримах. Вообще дарт довольно приятный язык, а стримы так вообще конфетка. Суть этого подхода заключается в том, что мы распихиваем всю бизнес-логику по сервисам, а коммуникацию между UI и сервисами осуществляем посредством контроллёра, который предоставляет нам различные стримы. Пользователь нажал кнопку «найти контакт»? Используя sink
(другой конец стрима) отправляем в контроллёр событие SearchContactsEvent
, контроллёр вызовет нужный сервис, дождётся результата и вернёт список юзеров обратно UI тоже посредством стрима. UI ждёт результаты, используя StreamBuilder
(виджет, который ребилдится каждый раз когда в стрим, на который он подписан, поступают новые данные). Вот, собственно, и всё. В некоторых случаях нам нужно обновлять UI безо всякого участия юзера (например, когда пришло новое сообщение), но это тоже легко делается посредством стримов. Фактически, простой MVC со стримами, никакой магии.
По сравнению с некоторыми другими подходами, bloc требует больше бойлерплейта, но, по моему мнению, лучше использовать родные решения без участия сторонних библиотек, если только использование стороннего решения не даёт какие-то существенные преимущества. Чем больше сверху абстракций, тем сложнее понять, в чём ошибка, когда ошибка случается. Преимущества провайдера я для себя не считаю достаточно существенными, чтобы переходить на него. Но опыта у меня в этой сфере немного, так что вполне вероятно я сменю лагерь в будущем.
Ну, а про redux и так все всё знают, так что и говорить нечего. Тем более я его вырезал из приложения :) Использовал для управление аккаунтом, но затем, поняв что в данном случае преимуществ над блоком особых нет, вырезал, чтобы не тащить лишнее. Но в целом считаю redux полезной вещью для управления глобальным состоянием.
Самая мучительная часть
Что делать, если юзер отправил сообщение, но перед тем, как оно было послано, пропало интернет соединение? Что делать, если юзеру пришло подтверждение прочтения, но он закрыл приложение перед тем, как соответствующая запись в базе данных была обновлена? Что делать, если юзер пригласил в комнату своего друга, но перед тем как приглашение было отправлено, у него умерла батарея? Вы когда-нибудь задавались подобными вопросами? Вот и я нет. Раньше. А вот в процессе разработки стал задаваться. Так как соединение может в любой момент пропасть, а телефон в любой момент выключиться, необходимо подтверждать всё. Not fun. Поэтому самое первое сообщение, которое клиент отправляет серверу (Join
, если помните) — это не просто «Hello I am online», это «Hello I am online and here are unconfirmed rooms, here are unconfirmed messages, here are unconfirmed acks, here are unconfirmed room membership operations, and here are last received messages per room». И сервер отвечает похожей простынёй: «Пока ты был офлайн такие-то твои сообщения были прочитаны такими-то юзерами, а ещё в эту комнату пригласили Петю, а ещё из этой комнаты ушла Света, а ещё тебя пригласили в эту комнату, а вот в этих двух комнатах по 40 новых сообщений». Я бы очень хотел знать, как подобные вещи делаются в других мессенджерах, потому что моя реализация не блещет изяществом.
Изображения
В данный момент можно отправлять текст, текст + изображения и просто изображения. Отправка видео ещё не реализована. Изображения немного ужимаются и сохраняются в Firebase storage. В самом сообщении передаются ссылки. По получении сообщения клиент скачивает изображения, генерирует миниатюры и сохраняет всё на файловую систему. В базу записываются пути к файлам. Кстати, генерация миниатюр — единственный код, выполняемый на отдельном треде, так как это compute-heavy операция. Я просто запускаю один воркер-поток, скармливаю ему изображение и в ответ получаю миниатюру. Код предельно прост, так как дарт даёт удобные абстракции для работы с потоками.
class ThumbnailGeneratorService {
SendPort _sendPort;
final Queue> _completerQueue =
Queue>();
ThumbnailGeneratorService() {
var receivePort = ReceivePort();
Isolate.spawn(startWorker, receivePort.sendPort);
receivePort.listen((data) {
if (data is SendPort) {
_sendPort = data;
} else {
var completer = _completerQueue.removeFirst();
completer.complete(data);
}
});
}
static void startWorker(SendPort sendPort) async {
var receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
receivePort.listen((imageBytes) {
Image image = decodeImage(imageBytes);
Image thumbnail = copyResize(image, width: min(image.width, 200));
sendPort.send(Uint8List.fromList(encodePng(thumbnail)));
});
}
Future generate(Uint8List imageBytes) {
var completer = Completer();
_completerQueue.add(completer);
_sendPort.send(imageBytes);
return completer.future;
}
}
Также используется Firebase auth, но только для авторизации доступа к Firebase storage (чтобы юзер не мог, скажем, залить профильную картинку кому-то другому). Вся остальная авторизация осуществляется через мои серверы.
Формат сообщений
Здесь вы наверное ужаснётесь, так как я использую обычные массивы байтов. Json отпадает, потому что требуется эффективность, а про protobuf я не знал, когда начинал. Использование массивов требует большой аккуратности, потому что один неправильный индекс и всё пойдёт наперекосяк.
Первые 4 байта — длина сообщения.
Следующий байт — код сообщения.
Следующие 16 байт — идентификатор запроса (uuid).
Следующие 40 байт — токен авторизации.
Остальная часть сообщения.
Длина сообщения требуется, так как я не использую http или вебсокеты, или какой-то другой протокол, который обеспечивает разделение одного сообщения от другого. Мои frontend сервера видят только потоки байтов, и они должны знать, где одно сообщение заканчивается, и начинается другое. Есть несколько способов разделять сообщения (например, использовать какой-то никогда не встречающийся в сообщениях символ в качестве разделителя), но я предпочёл указывать длину, так как этот способ самый простой, хоть он и влечёт за собой оверхед, так как большинству сообщений хватает и одного байта для указания длины.
Код сообщения это просто один из членов enum’а MessageCode
. Routing осуществляется по коду, и, так как мы можем вытащить код из массива без предварит