Горизонтальное масштабирование websocket-ов на Ruby

Не так давно вышла статья, в которой автор описывал свой framework для написания приложений с использованием Ruby, Sinatra и websoсket. Но в том решении не был затронут вопрос горизонтального масштабирования. Так при подключении к одному из узлов, пользователи могут получать уведомления/данные только о событиях/изменениях, вызванных пользователями этого же узла, а при изменениях, внесенных через другой, они не узнают. Для решения данной задачи необходимо организовать общую шину данных. Рассматривать данную задачу буду в контексте обмена сообщениями клиент-клиент.Шина данныхТребования, которые будем предъявлять к шине следующие: простота работа; передача в «реальном времени»; производительность. Организовать шину можно через хранилище с периодическим опросом, либо через сервер очередей.Первый вариант не удовлетворяет второму условию, т.к. задержка в передаче будет ровна периоду опроса хранилища. Уменьшение периода приведет к росту нагрузки на него. Поэтому этот вариант отметаем сразу.Второй вариант подходит лучше всего. В данном случае можно воспользоваться специализированными решениями на подобии RabbitMQ, ActiveMQ. Оба этих продукта представляют из себя серьезные решения, со множеством функций, хорошим масштабированием. Можно использовать и их, но нужно оценить, не будет ли это пушкой по воробьям. Кроме подобных решений функционал очередей предоставляет и Redis, в добавок получаем key-value хранилище, которое нам тоже понадобится.

Redis предоставляет простейший механизм Pub-Sub, которого достаточно для нашей задачи. Он достаточно быстр, прост в работе и имеет малые задержки при передаче.

Решение Наша система будет иметь следующую схему.5295e92b407d4c5e92d845721a812f13.png

Сообщения между пользователями одного узла передаются напрямую, а сообщения между узлами через шину.Для этого:

узел генерирует уникальное имя; подписывается по нему на сообщения в Redis; все клиенты подключенные к этому узлу записывают пару ключ-значение в виде идентификатора клиента и идентификатора узла, к которому он подключен; при отправке сообщения другому клиенту, узнаем имя узла и передаем сообщения в его очередь для обработки. А теперь реализуем В качестве библиотеки для websocket выбран faye-websocket-ruby. Для работы с Redis стандартный гем redis (hiredis) + код примера для PubSub через EventMachine, так как реализация из гема работает в блокирующем режиме, а при работе в одном потоке с web-сервером это не допустимо. module App class << self def configuration yield(config) if block_given? config.sessions = Metriks.counter('total_sessions') config.active = Metriks.counter('active_sessions') end def config @config ||= OpenStruct.new( redis: nil, root: nil ) end def id @instance_id ||= SecureRandom.hex end def logger @logger ||= Logger.new $stderr end

def register config.redis.multi do config.redis.set «node_#{App.id}», true config.redis.expire «node_#{App.id}», 60×10 end if config.redis

EM.next_tick do config.sub = PubSub.connect config.sub.subscribe App.id do |type, channel, message| case type when 'message' begin json = Oj.load (message, mode: : compat) WS: Base.remote_messsage json rescue => ex App.logger.error «ERROR: #{message.class} #{message} #{ex.to_s}» end else App.logger.debug »(#{type}) #{channel}:: #{message}» end end @pingpong = EM.add_periodic_timer (30) do App.config.redis.expire «node_#{App.id}», 60 end end rescue config.redis = nil end end end Основная работа этого модуля заключается в методе register, который регистрирует себя на шине и ожидает входящие сообщения. Для мониторинга создается ключ вида node_%node_id% c TTL в 60 секунд и периодом обновления 30 секунд, на случай если узел отвалится. Таким образом можно всегда узнать сколько узлов сейчас находится в сети и их имена. module WS class Base NEXT_RACK = [404, {}, []].freeze def self.call (*args) instance.call (*args) end def self.instance @instance ||= self.new end def self.remote_messsage (json) user = User.get json['from'] instance.send: process, user, json if user rescue => ex user.error ({ error: ex.to_s }) end def initialize @ws_cache = {} end def call (env) return NEXT_RACK unless Faye: WebSocket.websocket?(env) ws = Faye: WebSocket.new (env, ['xmpp'], ping: 5) user = User.register (ws) ws.onmessage = lambda do |event| json = Oj.load (event.data, mode: : compat) process (user, json) end ws.onclose = lambda do |event| App.logger.info [: close, event.code, event.reason] user.unregister user = nil end ws.rack_response rescue WS: User: NotUnique => ex ws.send Oj.dump ({ action: : error, data: { error: 'not unique session' } }) ws.close ws.rack_response end private

def process (user, json) action = json['action'].to_s data = json['data'] return App.logger.info ([: message, 'Empty action']) if action.empty? return App.logger.info ([: message, «Unknown action #{json['action']}»]) unless user.respond_to? «on_#{action}» user.send «on_#{action}», data rescue => ex user.error ({ error: ex.to_s }) puts ex.to_s puts ex.backtrace end end end Данный класс отвечает за установление соединения и обработку сообщений. В методе call создается новый клиент и вешаются обработчики. Метод класса remote_messsage используется для приема внешних сообщений (из шины). Метод process — единая точка для сообщений пришедших напрямую от клиента и для сообщений пришедших по шине.Клиенты module WS class User include UserBehavior attr_reader: id class Error < StandardError; end class RoomFull < Error; end class NotFound < Error attr_reader :id def initialize(id); @id = id end def to_s; "User '@#{id}' not found" end end class NotUnique < Error; end

class << self def cache @ws_cache ||= {} end

def get (id) fail NotFound.new (id) if id.to_s.empty? @ws_cache.fetch (id) rescue KeyError WS: RemoteUser.new (id) end

def register (ws) self.new (ws) end

def unregister (ws) url = URI.parse (ws.url) id = url.path.split ('/').last get (id).unregister end end

def initialize (ws) @ws = ws register

@pingpong = EM.add_periodic_timer (5) do @ws.ping ('') do App.config.redis.expire @id, 15 if App.config.redis end end end

def unregister on_close if respond_to? : on_close App.config.active.decrement App.config.redis.del @id if App.config.redis User.cache.delete (@id) @pingpong.cancel @pingpong = nil @ws = nil @id = nil end

def send_client (from, action, data) return unless @ws data = Oj.dump ({ from: from.id, action: action.to_s, data: data }, mode: : compat) @ws.send (data) end

private def register url = URI.parse (@ws.url) @id = url.path.split ('/').last if App.config.redis App.config.redis.multi do App.config.redis.set @id, App.id App.config.redis.expire @id, 15 end App.config.sessions.increment App.config.active.increment end User.cache[@id] = self App.logger.info [: open, @ws.url, @ws.version, @ws.protocol] on_register if respond_to? : on_close self end end

class RemoteUser include UserBehavior attr_reader: id attr_reader: node def initialize (id) @id = id.to_s fail WS: User: NotFound.new (id) if @id.empty? @node = App.config.redis.get (@id).to_s fail WS: User: NotFound.new (id) if @node.empty? end def send_client (from, action, data) return if node.to_s.empty? App.logger.info ['REMOTE', self.id, from.id, action] data = Oj.dump ({ from: from.id, action: action.to_s, data: data }, mode: : compat) App.config.redis.publish node, data end end end Метод register регистрирует пользователя в хранилище, сопоставляя его ID с ID узла куда он подключен и кэширует его в локальном списке. Метод unregister напротив убирает все записи о клиенте и удаляет таймер. Таймер используется для периодической проверки состояние клиента и обновления TTL для его записи, чтобы в Redis не было мертвых душ.ID клиента получается из URL по которому был запрос на подключение. Он имеет формат ws://%hostname%/ws/%user_id% где user_id случайно сгенерированная уникальаня последовательность.Метод send_client отправляет данные уже самому клиенту.

Отдельное место занимает метод класса get. Данный метод возвращает по ID экземпляр класса WS: User либо если пользователь не найден в локальном кэше создает экземпляр класса WS: RemoteUser. При его создании проверяется есть ли такой ID в хранилище и какому узлу он принадлежит. Если ID не найдет кидается исключение.

Класс WS: RemoteUser в отличии от WS: User имеет только один метод send_client, который пересылает сформированные сообщения через шину на требуемый узел.

Таким образом, неважно где находится клиент вызов метода send_client доставит данные до адресата.

module UserBehavior module ClassMethods def register_action (action, params = {}) return App.logger.info ['register_action', «Method #{action} already defined»] if respond_to? action

block = lambda do |*args | if block_given? data, from = yield (self, *args) send_client from || self, action, data else send_client self, action, args.first end end

define_method action, &block define_method «on_#{action}» do |data| self.send action, data end if params[: passthrough]

end end

def self.included (base) base.instance_exec do extend ClassMethods register_action: message do |user, from, text| [{ to: user.id, text: text }, from] end

register_action: error, passthrough: true end end

def on_message (data) App.logger.info ['MESSAGE', id, data.to_s]

to_user_id = data['to'] to_user = WS: User.get (to_user_id) to_user.message self, data['text']

rescue WS: User: NotFound => ex error ({ error: ex.to_s }) end end Обработка самих событий вынесена в отдельный модуль UserBehavior, который расширяет предыдущие два класса методами для реакции на сообщения. Каждое сообщение имеет поля FROM, ACTION и DATA. Первое идентифицирует от кого пришло, второе определяет метод, а третья сопутствующие данные. Так для ACTION со значением «message» будет вызван метод on_message, в который будет передано значение поля DATA.Используя такой подход получилось реализовать прозрачную передачу сообщений между подключенными клиентами, при этом не важно находятся они на одном узле или на разных. Для тестирования запускал несколько экземпляров на разных портах, сообщения корректно отправлялись и получались.

Для желающих попробовать, код рабочего приложения выложил на github. Запускается просто, через rackup

PS Данное решения не является законченным, думаю есть куда его улучшить и убрать лишнее, но как отправная точка вполне сгодится.

© Habrahabr.ru