Библиотека для синхронизации состояния

59ec7c2e98f91240184168.jpeg


Так случилось, что на одном проекте потребовалось реформировать способ обмена данными между различными процессами. Исторически сложившаяся схема была довольно неприглядна. Один процесс периодически перезаписывал свои текущие настройки в виде XML-файла. Второй вычитывал этот файл раз в секунду, проверяя, что в нём поменялось с прошлого раза. Изменения файла вычислялись через множество сравнений текущего и прошлого его состояний, порождая некоторую цепочку действий. Читающий процесс писал в свою очередь другой XML-файл, который читался третьим процессом и т.п. Самое печальное то, что данная схема требовала громоздкого, из раза в раз повторяющегося кода сравнений, который наслаивался при добавлении новых данных.


Была предложена идея замены всего этого зоопарка XML-файлов на систему обмена сообщениями, поддерживающую pub/sub. Активно рассматривались три кандидатуры: NATS, Redis и ZeroMQ. Поскольку планировалось обмениваться не только метаданными, но и большим объёмом бинарных данных в реальном времени, во краю угла стала максимальная пропускная способность. По этой причине пришлось отсеять первые два кандидата, несмотря на их более высокоуровневый и удобный broker-based API (тесты показали, что NATS даёт фору Redis, но где-то на 20% проигрывает ZeroMQ).


Далее стал вопрос о способе синхронизации состояния между процессами. Логичной показалась следующая схема:


  1. Клиенты после подключения к серверу вычитывают его полное состояние.
  2. Далее при изменении состояния сервер публикует патчи (изменения), на которые подписаны клиенты.
  3. При получении патча клиент вызывает обработчики, соответствующие изменениям (событиям) в патче, а затем накладывает его на предыдущее состояние сервера.


В эту схему прекрасно укладывалось использование JSON Patch, что позволило не изобретать велосипед для генерации и наложения патчей. Таким образом, библиотека JSON, имеющая встроенную поддержку JSON Patch, стала идеальной основой для нашей библиотеки для синхронизации состояния.


Итак, после пары недель работы была написана небольшая библиотека, включавшая в себя следующие коммуникационные примитивы:


  1. Publisher — простая обёртка над PUB-сокетом.
  2. Subscriber — обёртка над SUB-сокетом, позволяющая асинхронно обрабатывать нотификации в выделенном потоке.
  3. Requester — обёртка над REQ-советом, позволяющая асинхронно отправить запрос и обработать ответ в выделенном потоке.
  4. Replier — обёртка над REP-сокетом, позволяющая обрабатывать входящие запросы в выделенном потоке.


На основе этих примитивов были реализованы Client и Server,  позволяющие синхронизировать состояние, а также назначать callbacks на конкретные его изменения.


Пример кода и его вывод
#include 
#include 
#include 
#include 

#include "syncer.h"

using namespace nlohmann;
using namespace std;
using namespace std::chrono;
using namespace syncer;

struct Site {
  int temperature;
  int pressure;
};

static inline void to_json(json& j, const Site& s) {
  j = json();
  j["temperature"] = s.temperature;
  j["pressure"] = s.pressure;
}

static inline void from_json(const json& j, Site& s) {
  s.temperature = j.at("temperature").get();
  s.pressure = j.at("pressure").get();
}

struct State {
  map sites;
  string forecast;
};

static inline void to_json(json& j, const State& s) {
  j = json();
  j["sites"] = s.sites;
  j["forecast"] = s.forecast;
}

static inline void from_json(const json& j, State& s) {
  s.sites = j.at("sites").get>();
  s.forecast = j.at("forecast").get();
}

PatchOpRouter CreateRouter() {
  PatchOpRouter router;

  router.AddCallback(R"(/sites/(\w+)/temperature)", PATCH_OP_ANY,
    [] (const State& old, const smatch& m, PatchOp op, int t) {
      cout << "Temperature in " << m[1].str() << " has changed: "
           << old.sites.at(m[1].str()).temperature << " -> " << t << endl;
    });

  router.AddCallback(R"(/sites/(\w+)$)", PATCH_OP_ADD,
    [] (const State&, const smatch& m, PatchOp op, const Site& s) {
      cout << "Site added: " << m[1].str()
           << " (temperature: " << s.temperature
           << ", pressure: " << s.pressure << ")" << endl;
    });

  router.AddCallback(R"(/sites/(\w+)$)", PATCH_OP_REMOVE,
    [] (const State&, const smatch& m, PatchOp op, const Site&) {
      cout << "Site removed: " << m[1].str() << endl;
    });

  return router;
}

int main() {
  State state;
  state.sites["forest"] = { 51, 29 };
  state.sites["lake"] = { 49, 31 };
  state.forecast = "cloudy and rainy";
  Server server("tcp://*:5000", "tcp://*:5001", state);

  Client client("tcp://localhost:5000",
                       "tcp://localhost:5001",
                       CreateRouter());

  this_thread::sleep_for(milliseconds(100));

  cout << "Forecast: " << client.data().forecast << endl;

  state.sites.erase("lake");
  state.sites["forest"] = { 50, 28 };
  state.sites["desert"] = { 55, 30 };
  state.forecast = "cloudy and rainy";
  server.Update(state);

  this_thread::sleep_for(milliseconds(100));

  return 0;
}

В результате выполнения этого кода будет:


Site added: forest (temperature: 51, pressure: 29)
Site added: lake (temperature: 49, pressure: 31)
Forecast: cloudy and rainy
Temperature in forest has changed: 51 → 50
Site removed: lake
Site added: desert (temperature: 55, pressure: 30)


Конечно, выбранный подход далёк от оптимальности с точки зрения производительности, поскольку щедро выделяет потоки под индивидуальные сокеты, вместо использования Epoll. Потому он будет плохо подходить для систем, требующих большого числа одновременных соединений. Будем надеяться, что для большинства случаев это некритично.


Итак появилась возможность сильно упростить большую часть межпроцессной коммуникации. Это будет не так просто сделать для legacy-кода, поскольку ручные проверки изменений сильно перемешаны с остальной функциональностью, а потому придётся резать «по-живому». С другой стороны, реализовывать синхронизацию для нового кода стало одним удовольствием.

© Habrahabr.ru