Лепим микросервис

Подкинули задачу сделать микросервис, который получает данные от RabbitMQ, обрабатывает, и отправляет данные дальше по этапу в RabbitMQ. После отправки задания, я посмотрел на то что поучилось. Оказалось, что этот набор компонентов можно использовать для быстрого прототипирования pipeline архитектуры


Используемые компоненты:


  • REACT-CPP;
  • AMQP-CPP;
  • RapidJSON;
  • LevelDB;
  • Easylogging++.

Для примера буду делать микросервис для выдачи рейтинга игроков. От ядра системы в микросервис приходят следующие сообщения:


  • player_registered (id, name);
  • player_renamed (id, name);
  • player_won (id, points).

Сервис раз в минуту должен отсылать сообщение с содержимым рейтинга.Рейтинг сортируется по набранным очкам за календарную неделю.


REACT-CPP


REACT-CPP — это обертка над libev на C++11. Эта библиотека нужна для организации цикла обработка событий (event loop).
Т.к. кроме работы с сокетом потребуются таймеры и обработчики unix сигналов.


class Application
{
public:

    Application();
    ~Application();

    using IntervalWatcherPtr = std::shared_ptr;

    void run();
    void shutdown();
    //...

private:

    bool onMinute();
    //...    

private:

    React::MainLoop m_loop;
    IntervalWatcherPtr m_minuteTimer;
    //...
};

void Application::run()
{
    m_minuteTimer = m_loop.onInterval(5.0, 60.0, std::bind(&Application::onMinute, this));

    m_loop.onSignal(SIGTERM, [this]() -> bool
    {
        shutdown();
        return false;
    });

    m_loop.onSignal(SIGUSR1, [this]()->bool{
        cleanRating();
        return true;
    });

    //...
    m_loop.run();
}

bool Application::onMinute()
{
    calculateRating();
    sendRating();
    return true;
}

Тут создаю таймер который стартует через 5 секунд и который будет вызывать обработчик каждые 60 секунд.
Любой приличный демон/сервис должен иметь обработчик SIGTERM, что бы из вне попросить его корректно завершится.
Что касается обработчика SIGUSR1 тут можно самостоятельно вычислять начало/конец недели через Boost.Date_Time, но мне тупо лень, когда в GNU/Linux есть cron+pkill.


AMQP-CPP


С тех пор как опубликовал RabbitMQ tutorials на C++ AMQP-CPP обзавелась реализацией обработчика на libev и libuv.


Подключение и обработка сообщения:


void Application::createChannel(AMQP::TcpConnection &connection)
{
    m_channel = std::make_unique(&connection);

    m_channel->declareQueue(m_cfg.source().name, AMQP::durable)
        .onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount)
                   {
                       LOG(INFO) << "Declared queue "
                                 << name
                                 << ", message count: "
                                 << messagecount;

                       m_channel->consume(m_cfg.source().name)
                           .onReceived([&](const AMQP::Message &message,
                                           uint64_t deliveryTag,
                                           bool redelivered)
                                       {
                                           onMessage(message, deliveryTag, redelivered);
                                       })
                           .onError([](const char *message)
                                    {
                                        LOG(ERROR) << "Error consume:" << message;
                                        APP->shutdown();
                                    });
                   })
        .onError([&](const char *message)
                 {
                     LOG(ERROR) << "Error declare queue:" << message;
                     shutdown();
                 });
}

void Application::onMessage(const AMQP::Message &message,
                            uint64_t deliveryTag,
                            bool redelivered)
{
    parseMessage(message);
    m_channel->ack(deliveryTag);
}

Публикация сообщения:


AMQP::Envelope env(s.GetString());

m_channel->publish("", m_cfg.destination().name, env);

LevelDB


Может потребоваться локальное хранилище данных. Взял LelevDB, я о нем писал в Использование LevelDB. Сделал лишь небольшую RAII обертку:


Код обертки
class DataBase
{
public:

    DataBase();

    bool open(const std::string &path2base, bool compression = true);

    bool put(const std::string &key, const ByteArray &value, bool sync = false);
    ByteArray get(const std::string &key);

    Snapshot snapshot();

    Iterator iterator();

private:

    std::shared_ptr m_backend;
};

class Snapshot
{
public:

    Snapshot();

    ~Snapshot();

    ByteArray get(const std::string &key);

    Iterator iterator();

private:

    Snapshot(const std::weak_ptr &backend, const leveldb::Snapshot *snapshot);

private:

    friend class DataBase;

    std::weak_ptr m_backend;
    const leveldb::Snapshot *m_shapshot;
};

class Iterator
{
public:

    Iterator(std::unique_ptr rawIterator);
    Iterator(Iterator &&iter);

    /*!
     * Create empty iterator
     */
    Iterator() = default;

    ~Iterator();

    bool isValid() const noexcept;

    void next();

    void prev();

    std::string key();
    ByteArray value();

    /*!
     * Seek to first
     */
    void toFirst();

    /*!
     * Seek to last
     */
    void toLast();

    Iterator(const Iterator &) = delete;
    Iterator &operator=(const Iterator &) = delete;

private:

    std::unique_ptr m_iterator;
};

LevelDB используется для сохранения/востановления состояния.


void Application::loadFromLocalStorage()
{
    auto snapshot = m_localStorage->snapshot();
    auto iter = snapshot.iterator();
    iter.toFirst();
    while (iter.isValid()) {
        auto player = new Player(iter.value());
        m_id2player[player->id] = player;
        m_players.push_back(player);
        iter.next();
    }
}

void Application::updatePlayerInBD(const Player *player)
{
    if (!m_localStorage->put(std::to_string(player->id), player->serialize())) {
        LOG(ERROR) << "[" << player->id << ", "
                   << player->name
                   << "] is not updated in the database";
    }
}

Логика сервиса


Данные приходят в формате JSON.


Разбирает json используя RapidJSON, ищу подходящий метод, вызываю нужный обработчик:


void Application::parseMessage(const AMQP::Message &message)
{
    /*
     * Схемка имеет вид
     * {
     *   "method":"player_registered",
     *   "params":{
     *   ...
     *   }
     * }
     */
    rapidjson::Document doc;
    doc.Parse(message.body(), message.bodySize());

    const std::string method = doc["method"].GetString();
    auto iter = m_handlers.find(method);
    if (iter != m_handlers.end()) {
        iter->second(*this, doc["params"]);
    }
    else {
        LOG(WARNING) << "Unknown method:" << method;
    }
}

Сами методы простые:


void Application::onPlayerRegistered(const JValue ¶ms)
{
    auto obj = params.GetObject();
    const uint64_t playerId = obj["id"].GetUint64();
    if (!isRegistred(playerId)) {
        auto player = new Player;
        player->id = playerId;
        player->name = obj["name"].GetString();
        m_players.push_back(player);
        m_id2player[playerId] = player;
        updatePlayerInBD(player);
    }
}

void Application::onPlayerRenamed(const JValue ¶ms)
{
    auto obj = params.GetObject();
    const uint64_t playerId = obj["id"].GetUint64();
    if (isRegistred(playerId)) {
        auto player = m_id2player[playerId];
        player->name = obj["name"].GetString();
        updatePlayerInBD(player);
    }
    else {
        LOG(WARNING) << "Renaming an unknown user[" << playerId << "]";
    }
}

void Application::onPlayerWon(const JValue ¶ms)
{
    auto obj = params.GetObject();
    const uint64_t playerId = obj["id"].GetUint64();
    if (isRegistred(playerId)) {
        auto player = m_id2player[playerId];
        player->points += obj["points"].GetInt64();
        updatePlayerInBD(player);
    }
    else {
        LOG(WARNING) << "Unknown player[" << playerId << "]";
    }
}

Раз в минуту сортируем игроков и отправляем рейтинг:


bool Application::onMinute()
{
    calculateRating();
    sendRating();
    return true;
}

void Application::calculateRating()
{
    std::sort(m_players.begin(), m_players.end(), [](const Player *a, const Player *b)
    {
        return a->points > b->points;
    });
}

void Application::sendRating()
{
    using namespace rapidjson;

    StringBuffer s;
    Writer writer(s);
    writer.StartArray();

    const size_t count = std::min(m_players.size(), size_t(10));
    for (size_t i = 0;
         i < count;
         ++i) {
        writer.StartObject();

        writer.Key("id");
        writer.Uint64(m_players[i]->id);

        writer.Key("name");
        writer.String(m_players[i]->name.c_str());

        writer.Key("points");
        writer.Int64(m_players[i]->points);

        writer.EndObject();
    }

    writer.EndArray();
    AMQP::Envelope env(s.GetString());

    m_channel->publish("", m_cfg.destination().name, env);
}

Весь код доступен на GitHub’e. Исходники библиотек поставляются вместе с сервисом и собираются автоматически на GNU/Linux с gcc.


Подведем итоги, что имеем:


  • event loop с таймерами, обработчиками сигналов и всеми остальными плюшками libev;
  • работа с RabbitMQ;
  • встроенное key-value хранилище;
  • поддержка json.

Комментарии (0)

© Habrahabr.ru