Введение в систему обмена сообщениями ZeroMQ

Николя Пиёль (Nicolas Piël) опубликовал введение в технологию ZeroMQ (0MQ), позволяющую организовать быстрый асинхронный обмен сообщениями между высоконагруженными приложениями и интересную тем, что сетевое взаимодействие организовано через новый уровень сетевого стека, который может использовать в качестве транспорта TCP, PGM, IPC и т.п. API библиотеки напоминает обычные сокеты, поддерживается передача сообщений в направлениях точка-точка, издатель-подписчики, запрос-ответ, возможна параллельная рассылка. Система очень быстрая, тесты производительности показали способность обработать на обычном сервере более 8 млн. сообщений в секунду.

Ниже представлен перевод вводной статьи по ZeroMQ:

ZeroMQ - это библиотека обмена сообщениями (Messaging Queue, MQ), которая без особых усилий позволяет создавать сложные коммуникационные решения. Сначала эта программная компонента разрабатывалась как интерфейс для обмена сообщениями (messaging middleware), затем - как легкий коммуникационный протокол, основанный на TCP/IP, а в настоящее время ZeroMQ позиционируется как новая компонента в стеке сетевых протоколов.

Было не просто разобраться в ZeroMQ, даже на основании попытки сравнения MQ-систем, проделанной в компании Linder Research и, прежде всего потому, что ZeroMQ не является полноценной системой как, например RabbitMQ или ActiveMQ: полнофункциональная система, после развертывания и настройки - работает, и можно увидеть ее достоинства и недостатки. ZeroMQ - всего лишь достаточно простой программный интерфейс, позволяющий создать свою собственную MQ-систему.

Почему же нужно использовать ZeroMQ, а не просто стандартный интерфейс Berkeley-сокетов? Ответ, скорее всего, в компромиссе между сложностью реализации и высокой производительностью. Как правило, прикладная система, когда используется по назначению, работает эффективно, но любая попытка добавления функциональности или универсализации использования путем модификации базовых элементов системы приводит к ухудшению производительности. Это справедливо не только для MQ-систем. Поэтому в настоящее время принято использовать небольшие фреймворки, как это используется в web-технологиях.

Кажется, что в ZeroMQ успешно реализован компромисс между функциональностью и эффективностью и ниже перечисляются основные возможности этой библиотеки:

  • Производительность. ZeroMQ действительно работает существенно быстрее чем большинство реализаций AMQP, и это достигается:

  • Простота использования. С помощью API ZeroMQ передача сообщения действительно проще, чем при использовании сокетов, где вам нужно, например, следить за длиной сокетного буфера, а в ZeroMQ - просто инициировать отправку сообщения, а дробление (или агрегация) и отправка делается API в отдельном потоке, асинхронно с выполнением пользовательского кода. Асинхронная природа методов ZeroMQ особенно удобна для реализации механизмов событийной обработки. Немаловажным удобством в ZeroMQ является отказ от типизации сообщений передаваемых интерфейсом - сообщения никак не интерпретируются интерфейсом и являются BLOB (областью памяти). Таким образом, через ZeroMQ можно передавать что угодно, например сообщения JSON или двоичные форматированные данные типа BSON, Protocol Buffers или Thrift, не чувствуя при этом никаких неудобств.

  • Масштабируемость. Являясь низкоуровневым интерфейсом, ZeroMQ, тем не менее, предоставляет множество опций, например сокет ZeroMQ может быть подключенным к нескольким адресатам и равномерно распределять нагрузку по сети. Другая возможность - это входное мультиплексирование, когда один сокет может получать сообщения от множества отправителей. В ZeroMQ реализована децентрализованная схема обмена сообщениями. Это, в комбинации с высокой производительностью, дает возможность построения распределенных систем любой сложности.

Пример использования ZeroMQ

Приведем пример кода, использующего интерфейс ZeroMQ. В примере используется библиотека Python PyZMQ для ZeroMQ, разработанная Брайаном Грэнджером (Brian Granger).

Первый шаг - выбор транспорта из четырех вариантов, предоставляемых ZeroMQ:

  • INPROC - передача сообщений внутри процесса, между потоками.

  • IPC - передача сообщений между процессами.

  • MULTICAST - широковещательная сетевая передача сообщений с гарантированной доставкой, реализованная посредством энкапсуляции прикладных данных непосредственно в IP-пакет (pgm) или с использованием стандартного UDP-протокола (epgm).

  • TCP - стандартная однонаправленная сетевая передача данных с использованием TCP-протокола.

Второй шаг - разработка инфраструктуры распределенной системы обмена сообщениями. Это, фактически ответ на вопрос "что с чем связано". Ответ на вопрос "как связываются компоненты системы между собой" получен в предыдущем пункте. Компоненты инфраструктуры, которые всегда подключены к сети и выполняют роль серверов, должны выполнить BIND для выбранных типов соединений, а те компоненты, которые будут подключаться к этим серверам как клиенты, выполняют CONNECT, в котором также указывается тип соединения.

Для компонентов распределенной системы, которые подключаются динамически, в ZeroMQ предусмотрен другой набор примитивов:

  • QUEUE - механизм интерактивного взаимодействия, типа "запрос-ответ"

  • FORWARDER - механизм "подписки/публикации" сообщений (publish/subscribe)

  • STREAMER - механизм потокового обмена сообщениями

Для перечисленных методов ZeroMQ, и клиент, и сервер распределенной системы обмена сообщениями подключаются к агенту, у которого открыта пара сокетов на каждое соединение.

Третий шаг - разработка механизма обмена сообщениями. В ZeroMQ предусмотрены следующие типы обмена:

  • REQUEST/REPLY - двусторонняя связь между программами-абонентами распределенной MQ-системы: одна программа-клиент может взаимодействовать с одной или несколькими программами-серверами. Каждое отправленное сообщение предусматривает уведомление о доставке. Уведомление о доставке однозначно идентифицирует получателя сообщения.

  • PUBLISH/SUBSCRIBE - опубликовать сообщение для множества подписчиков. От предыдущего метода отличается тем, что программа-отправитель не получает уведомлений о получении сообщений программами-подписчиками. В данном методе, однако, тоже имеется механизм регулирования: определяется некоторое, пороговое количество сообщений, которое может оставаться в очереди не полученным подписчиком и при попытке опубликовать очередное сообщение, система сбрасывает попытку с соответствующим уведомлением отправителя.

  • UPSTREAM/DOWNSTREAM или Pipeline - этот метод используется для иерархической рассылки сообщений, DOWNSTREAM используется для рассылки вниз по иерархии, а UPSTREAM - наоборот. Программа-отправитель не получает уведомлений о доставке и также, как и в случае PUBLISH/SUBSCRIBE имеется ограничение на количество не полученных сообщений.

  • PAIR - взаимодействие только между клиентом и сервером. Данный тип взаимодействия не предполагает маршрутизации сообщений и не содержит уведомлений о доставке

Рассмотрим эти типы взаимодействия практически

RequestReply. Такой тип взаимодействия программ в сети является типичным, например так работают протоколы HTTP, POP, или IMAP - когда за запросом следует ответ. В ZeroMQ для такого обмена сообщениями клиентская программа использует сокет типа REQ. Серверная программа использует сокет REP. ZeroMQ позволяет через один сокет взаимодействовать с любым количеством парных программ.

Фрагмент кода на Python, который "слушает" TCP-порт 5000 и возвращает отправителю полученные сообщения:

    import zmq  context = zmq.Context()  socket = context.socket(zmq.REP)  socket.bind("tcp://127.0.0.1:5000")     while True:      msg = socket.recv()      print "Got", msg      socket.send(msg)  

Заметим, что этот код будет работать корректно с любым количеством клиентов - ZeroMQ самостоятельно разбирается на какой запрос кому отвечать. Клиентская программа может выглядеть так:

    import zmq  context = zmq.Context()  socket = context.socket(zmq.REQ)  socket.connect("tcp://127.0.0.1:5000")  socket.connect("tcp://127.0.0.1:6000")     for i in range(10):      msg = "msg %s" % i      socket.send(msg)      print "Sending", msg      msg_in = socket.recv()  

В примере сделано, что клиент работает с двумя серверами (TCP-порты 5000 и 6000), на которые отправляются сообщения равномерно, по одному открытому сокету REQ, вывод сообщений этой клиентской программы будет такой:

    Sending msg 0  Sending msg 1  Sending msg 2  Sending msg 3  Sending msg 4  Sending msg 5  Sending msg 6  Sending msg 7  Sending msg 8  Sending msg 9  

в то время как сервер, "слушающий" соединение по TCP-порту 5000 напечатает:

    Got msg 0  Got msg 2  Got msg 4  Got msg 6  Got msg 8  

а вывод консоли второго сервера, на порту 6000 будет

    Got msg 1  Got msg 3  Got msg 5  Got msg 7  Got msg 9  

Метод Publish subscribe стал популярным относительно недавно, его наиболее часто используют в механизмах рассылки сообщений, например XMPP или webhooks. Рассылка и получение сообщения не связаны напрямую или говоря по-другому, когда программе нужно отправить сообщение не заботясь, получат ли его - в не компьютерной тематике наиболее близко работе радиостанции: отправленное сообщение доходит только до слушателей, настроенных на волну вашей радиостанции или, возвращаясь к компьютерной тематике - программы, подписавшиеся (subscribed) на сообщения такого типа.

Важно отметить, что этот метод обмена сообщениями не связан с инфраструктурой: отправляемое на закодированный порт сообщение получат только те, кто предварительно подписался. Так же возможен способ подписки на сообщения нескольких публикующих серверов. Первый пример идентичен выше упомянутому радиовещанию (получит любой подписавшийся), тогда как второй пример можно сравнить с группой людей, которым вы что-то кричите в мегафон. Понятно, что в обоих случаях получение информации не гарантируется: услышит только тот, кто хочет услышать.

Следующий фрагмент кода показывает как создать сервер рассылки, скажем, футбольных новостей:

    import zmq  from random import choice  context = zmq.Context()  socket = context.socket(zmq.PUB)  socket.bind("tcp://127.0.0.1:5000")     countries = ['netherlands','brazil','germany','portugal']  events = ['yellow card', 'red card', 'goal', 'corner', 'foul']     while True:      msg = choice( countries ) +" "+ choice( events )      print ' ->' ,msg      socket.send( msg )  

Сервер постоянно будет "вещать" в сокет типа PUB случайными событиями:

    -> portugal corner  -> portugal yellow card  -> portugal goal  -> netherlands yellow card  -> germany yellow card  -> brazil yellow card  -> portugal goal  -> germany corner  …  

Клиент-подписчик (через сокет SUB) будет получать только "интересующую его" информацию:

    import zmq  context = zmq.Context()  socket = context.socket(zmq.SUB)  socket.connect("tcp://127.0.0.1:5000")  socket.setsockopt(zmq.SUBSCRIBE, "netherlands")  socket.setsockopt(zmq.SUBSCRIBE, "germany")     while True:      print  socket.recv()  
Вывод полученного таким клиентом-подписчиком будет таким:
    netherlands red card  netherlands goal  netherlands red card  germany foul  netherlands yellow card  germany foul  netherlands goal  netherlands corner  germany foul  netherlands corner  …  

Метод Pipeline похож на RequestReply, что вместо полудуплексного способа передачи данных (каждый запрос сопровождается ответом), организуется два независимых потока сообщений - от клиента к серверу и - наоборот (UPSTREAM и DOWNSTREAM). Данный способ передачи сообщений может быть удобным для, например, потоковой обработки: данные от клиента передаются серверу, а сервер пересылает результат дальше по цепочке. Как и в описанных ранее методах возможна организация несколько потоков данных через один сокет.

Метод Paired Sockets похож на Berkeley-сокеты: каждому отправителю соответствует один получатель и наоборот - установленное логическое соединение симметрично. Данный метод наименее применим в системах рассылки сообщений и разработан, скорее всего для полноты интерфейса. Пару программ взаимодействующую таким образом можно представить очень просто:

Клиент

    import zmq  context = zmq.Context()  socket = context.socket(zmq.PAIR)  socket.bind("tcp://127.0.0.1:5555")  

Сервер

    import zmq  context = zmq.Context()  socket = context.socket(zmq.PAIR)  socket.connect("tcp://127.0.0.1:5555")  

Дальнейшее развитие ZeroMQ

В данной статье дано лишь краткое описание возможностей ZeroMQ, однако автор надеется, что он донес до читателя идею этой библиотеки, а также то, что что описанный интерфейс возможно станет новым сетевым уровнем передачи сообщений. По меньшей мере, ZeroMQ решает много вопросов масштабируемости и портируемости кода, и только этим может быть полезен для применения.

К ZeroMQ реализовано 15 языковых интерфейсов (Ada, C, C++, Common Lisp, Erlang, Go, Haskell, Java, Lua, .NET, OOC, Perl, PHP, Python и Ruby) и, таким образом, он может служить универсальной шиной в неоднородной распределенной прикладной системе. Наверняка можно придумать и другие применения для ZeroMQ.

Полный текст статьи читайте на OpenNet