Синхронизация данных в приложениях реального времени c Theron

Иногда я рисую себе граф того, как должна выглядеть архитектура современных систем и нахожу те моменты процесса разработки, которые могут быть улучшены и те практики, которые могут быть применены для улучшения этих процессов. После очередной такой итерации я еще раз убедился, что существуют потрясающие фреймворки и методологии для разработки и серверной и клиентской частей, но синхронизация данных между клиентом, сервером и базой данных работает не так, как того требуют современные реалии: быстрое реагирование на изменение состояния системы, распределенность и асинхронность обработки данных, повторное использование раннее обработанных данных.

В последние годы требования к современным приложениям и методы их разработки значительно изменились. Большинство таких приложений используют асинхронную модель, состоящую из множества слабо связанных компонентов (микросервисов). Пользователи же хотят, чтобы приложение работало безотказно и всегда было в актуальном состоянии (данные должны быть синхронизированы в любой момент времени), проще говоря, пользователи чувствуют себя более комфортно, когда им не нужно каждый раз нажимать кнопку «Обновить» или полностью перезагружать приложение, если что-то пошло не так. Под катом немного теории и практики и полноценное приложением c открытым исходным кодом со cтеком разработки React, Redux/Saga, Node, TypeScript и нашим проектом Theron.

image
Rick and Morty. Рик открывает множество порталов.
Я использовал различные сервисы для синхронизации и хранения данных в реальном времени, большинство из которых упомянуто в этой статье. Но каждый раз, как только приложение развивалось в более сложный продукт, становилось очевидным, что система слишком сильно зависит от одного провайдера услуг и в ней нет той необходимой гибкости, которую дает создание своей микроархитектуры с множеством диверсифицированных сервисов-сателитов, использование классических баз данных (SQL и NoSQL) и написание кода, взамен конструкторам и панелям управления BaaS. Такой подход, действительно, более сложен на начальной стадии разработки прототипа, но он окупает себя в будущем.

Результатом моих исследований стал Theron. Theron — сервис для создания современных приложений реального времени. Реактивное хранилище данных Theron беспрерывно транслирует изменения, произошедшие в базе данных, исходя из запроса к ней. Чуть больше чем за четыре месяца небольшой командой из двух разработчиков мы реализовали базовую архитектуру, основные критерии которой:

  • Быстрое создание новых приложений и безболезненная миграция существующих на Theron.
  • Использование современных практик при создании асинхронных, распределенных и отказоустойчивых систем и изоморфизм компонентов системы.
  • Распределенная интеграция на низком уровне с базами данных, такими как Postgres и Mongo.
  • Легкая интеграция с современными фреймворками, такими как React, Angular и их друзьями: ReactiveX, Redux т.д.
  • Сфокусированность на решение задачи синхронизации данных, а не предоставление полного стека разработки и последующего «вендор локинга».
  • Основная логика приложений (в том числе аутентификация и права доступа) должна реализовываться разработчиками на их стороне.


Реактивные каналы


Мне понравился функциональный подход еще тогда, когда я познакомился с одним из старейших функциональных языков программирования, ориентированным на символьные вычисления Рефал. Позже, сам того не осознавая, я начал использовать реактивную парадигму программирования, и, со временем, большая часть моей работы строилась на этих подходах.

Theron построен на основе ReactiveX. Фундаментальный концепт в Theron —реактивные каналы, предоставляющие гибкий способ трансляции данных различным сегментам пользователей. Theron использует классический Pub/Sub шаблон проектирования. Для создания нового канала (количество неограниченно) и стриминга данных достаточно лишь создать новую подписку.

После установки (англ.), импортируйте Theron и создайте нового клиента:

import { Theron } from 'theron';

const theron = new Theron('https://therondb.com', { app: 'YOUR_APP_NAME' });


Создание нового клиента не устанавливает нового WebSocket подключения и не начинает синхронизацию данных. Подключение устанавливается только тогда, когда создается подписка, при условии того, что нет другого активного подключения. То есть в рамках реактивного программирования клиент Theron и каналы — это «cold observable» объекты.

Создайте новую подписку:

const channel = theron.join('the-world');

const subscription = channel.subscribe(
  message => {
    console.log(message.payload);
  },

  err => {
    console.log(err);
  },

  () => {
    console.log('done');
  }
);


Когда канал больше не нужен — отпишитесь:

subscription.unsubscribe();


Отправка данных клиентам, подписанных на этот канал, со стороны сервера (Node.js) также проста:

import { Theron } from 'theron';

const theron = new Theron('https://therondb.com', { app: 'YOUR_APP_NAME', secret: 'YOUR_SECRET_KEY' });

theron.publish('the-world', { message: 'Greatings from Cybertron!' }).subscribe(
  res => {
    console.log(res);
  },

  err => {
    console.log(err);
  },

  () => {
    console.log('done');
  },
);


Theron использует экспоненциальный бэкофф (включен по умолчанию) при потере соединения или при возникновении некритических ошибок (англ.): ошибки, когда возможна повторная подписка на канал.

Реализация многих алгоритмов в рамках реактивного программирования изящна и проста, например, экспоненциальный бэкофф в клиентской библиотеке Theron выглядит примерно так:

let attemp = 0; 

const rescueChannel = channel.retryWhen(errs =>
  errs.switchMap(() => Observable.timer(Math.pow(2, attemp + 1) * 500).do(() => attemp++))
).do(() => attemp = 0);


Интеграция с базой данных


Как было сказано выше, Theron — это реактивное хранилище данных: система уведомлений об изменениях, которая беспрерывно транслирует обновления по защищенным каналам для вашего приложения, исходя из обычного SQL запроса к базе данных. Theron анализирует запрос к базе данных и отправляет артефакты данных, с помощью которых можно воссоздать исходные данные.

Theron интегрирован на данный момент с Postgres; интеграция с Mongo в процессе разработки.


Рассмотрим, как это работает на примере жизненного цикла простого списка, состоящего из первых трех элементов, упорядоченного в алфавитном порядке:

image

Перед тем как мы продолжим, подключите базу данных к Theron, введя данные для доступа к ней в панели управления:

image

Внутреннее устройство захвата (locking) базы данных — большая тема для отдельной статьи в будущем. Theron — распределенная система, поэтому пул подключений к базе данных ограничен 10-ю (с возможностью увеличения до 20-и) общими подключениями.

1. Создание новой подписки

Theron работает с SQL запросами, поэтому ваш сервер должен возвращать не результат выполнения запроса, а исходный SQL запрос. Например, в нашем случае JSON ответ сервера может выглядеть так:

{ "query": "SELECT * FROM todos ORDER BY name LIMIT 3" }


На стороне клиента начнем трансляцию данных для нашего SQL запроса, создав новую подписку:

import { Theron } from 'theron';

const theron = new Theron('https://therondb.com', { app: 'YOUR_APP_NAME' });

const subscription = theron.watch('/todos').subscribe(
  action => {
    console.log(action); // Инструкции Theron'а
  },

  err => {
    console.log(err);
  },

  () => {
    console.log('complete');
  }
);


Theron отправит GET запрос '/todos' вашему серверу, проверит валидность возвращенного SQL запроса и начнет трансляцию начальных инструкций с необходимыми данными, если данный запрос не был ранее скэширован на стороне клиента.

Инструкция TheronRowArtefact — это обычный JavaScript объект с самими данными `payload` и типом инструкции `type`. Основные типы инструкций:

  • ROW_ADDED — добавлен новый элемент.
  • ROW_REMOVED — элемент был удален.
  • ROW_MOVED — элемент был изменен.
  • ROW_CHANGED — элемент был изменен.
  • BEGIN_TRANSACTION — новый блок синхронизации.
  • COMMIT_TRANSACTION — синхронизация завершена успешно.
  • ROLLBACK_TRANSACTION — при синхронизации возникла ошибка.


Предположим, что в базе данных уже существует несколько элементов A, B, C. Тогда изменение состояния клиента можно представить следующем образом (слева — было, справа — стало):

Id Name Id Name
1 A
2 B
3 C


Инструкции Theron для данного состояния:

  1. { type: BEGIN_TRANSACTION }
  2. { type: ROW_ADDED, payload: { row: { id: 1, name: 'A' }, prevRowId: null } }
  3. { type: ROW_ADDED, payload: { row: { id: 2, name: 'B' }, prevRowId: 1 }
  4. { type: ROW_ADDED, payload: { row: { id: 3, name: 'C' }, prevRowId: 2 } }
  5. { type: BEGIN_TRANSACTION }


Каждый блок синхронизации начинается и заканчиваются инструкциями BEGIN_TRANSACTION и COMMIT_TRANSACTION. Для корректной сортировки элементов на стороне клиента Theron дополнительно отправляет данные о предыдущем элементе.

2. Пользователь переименовывает элемент A (1) в D (1)

Предположим, что пользователь переименовывает элемент A (1) в D (1). Так как SQL запрос упорядочивает элементы в алфавитном порядке, то произойдет сортировка элементов, и состояние клиента изменится следующим образом:

Id Name Id Name
1 A 2 B
2 B 3 C
3 C 1 D


Инструкции Theron для данного состояния:

  1. { type: BEGIN_TRANSACTION }
  2. { type: ROW_CHANGED, payload: { row: { id: 1, name: 'D' }, prevRowId: 3 } }
  3. { type: ROW_MOVED, payload: { row: { id: 1, name: 'D' }, prevRowId: 3 } }
  4. { type: ROW_MOVED, payload: { row: { id: 2, name: 'B' }, prevRowId: null } }
  5. { type: ROW_MOVED, payload: { row: { id: 3, name: 'C' }, prevRowId: 2 } }
  6. { type: COMMIT_TRANSACTION }


3. Пользователь создает новый элемент A (4)

Предположим, что пользователь создает новый элемент A (4). Так как наш SQL запрос ограничивает данные первыми тремя элементами, то на стороне клиента произойдет удаление элемента D (1), и состояние клиента изменится следующим образом:

Id Name Id Name
2 B 4 A
3 C 2 B
1 D 3 C
1 D


Инструкции Theron для данного состояния:

  • { type: BEGIN_TRANSACTION }
  • { type: ROW_ADDED, payload: { row: { id: 4, name: 'A' }, prevRowId: null } }
  • { type: ROW_MOVED, payload: { row: { id: 2, name: 'B' }, prevRowId: 4 } }
  • { type: ROW_MOVED, payload: { row: { id: 3, name: 'C' }, prevRowId: 2 } }
  • { type: ROW_REMOVED, payload: { row: { id: 1, name: 'D' }, prevRowId: 3 } }
  • { type: COMMIT_TRANSACTION }


4. Пользователь удаляет элемент D (1)

Предположим, что пользователь удаляет элемент D (1) из базы данных. Theron в этом случае не отправит новых инструкций, так как это изменение в базе данных не влияет на данные, возвращаемые нашим SQL запросом, и соответственно не влияет на состояние клиента:

Id Name Id Name
4 A 4 A
2 B 2 B
3 C 3 C


Обработка инструкций на стороне клиента

Теперь, зная как Theron работает с данными, мы можем реализовать логику по воссозданию данных на стороне клиента. Алгоритм довольно простой: мы будем использовать тип инструкции и метаданные предыдущего элемента для корректного позиционирования элементов в массиве. В реальном приложении нужно использовать, например, библиотеку Immutable.js для работы с массивами и оператор scan — пример.

import { ROW_ADDED, ROW_CHANGED, ROW_MOVED, ROW_REMOVED } from 'theron';

let todos = [];

const subscription = theron.watch('/todos').subscribe(
  action => {
    switch (action.type) {
      case ROW_ADDED:
        const index = nextIndexForRow(rows, action.prevRowId)
        if (index !== -1) {
          rows.splice(index, 0, action.row);
        }
        break;

      case ROW_CHANGED:
        const index = indexForRow(rows, action.row.id);
        if (index !== -1) {
          rows[index] = action.row;
        }
        break;

      case ROW_MOVED:
        const index = indexForRow(rows, action.row.id);
        if (index !== -1) {
          const row = list.splice(curPos, 1)[0];
          const newIndex = nextIndexForRow(rows, action.prevRowId);
          rows.splice(newIndex, 0, row);
        }
        break;

      case ROW_REMOVED:
        const index = indexForRow(rows, action.row.id);
        if (index !== -1) {
          list.splice(index, 1);
        }
        break;
     }
  },

  err => {
    console.log(err);
  }
);

function indexForRow(rows, rowId) {
  return rows.findIndex(row => row.id === rowId);
}

function nextIndexForRow(rows, prevRowId) {
  if (prevRowId === null) {
    return 0;
  }

  const index = indexForRow(rows, prevRowId);

  if (index === -1) {
    return rows.length;
  } else {
    return index + 1;
  }
}


Время примеров


Изучать иногда лучше, основываясь на готовых примерах: поэтому вот обещанное приложение, опубликованное под лицензией MIT — https://github.com/therondb/figure. Figure — это сервис для работы с HTML формами в статичных сайтах; cтек разработки — React, Redux/Saga, Node, TypeScript и, конечно, Theron. Например, мы используем Figure для формирования листа подписчиков нашего блога и сайта документации (https://github.com/therondb/therondb.com):

image

Заключение


Помимо исправления гипотетической тонны ошибок и классического написания клиентских библиотек под популярные платформы, мы работаем над выделением в независимый компонент обратного прокси-сервера и балансировщика. Идея заключается в том, чтобы можно было создавать на стороне сервера API, к которому можно обращаться как через обычные запросы HTTP, так и через постоянное подключение WebSocket. В следующей статье про архитектуру Theron я напишу про это более подробно.

Команда у нас небольшая, но энергичная, и мы любим экспериментировать. Theron находится в активной разработке: есть множество идей и моментов, которые нужно реализовать и улучшить. С удовольствием выслушаем любую критику, примем советы и конструктивно это обсудим.

© Habrahabr.ru