[Из песочницы] Готовим многопоточность с core.async
Всё больше набирает популярность паттерн использования каналов при создании
многопоточных приложений. Идея не нова, ее дизайн заложен ещё в далёком 1978 году
в виде CSP.Наиболее известная реализация сейчас повсеместно используется в Golang.
Мы же в статье рассмотрим реализацию CSP в core.async для Clojure, если интересно, добро пожаловать под кат.
В статье будут рассмотрены простые и базовые практики для работы с core.async, описанного в статье будет достаточно для хорошего старта в многопоточное программирование.
В отличии от Golang где парадигма работы с потоками через каналы встроена в сам язык, core.async является просто библиотекой для Clojure, если вам импонирует другая парадигма, то выбор есть: pulsar, promesa, manifold
При этом core.async и promesa можно также использовать на стороне браузера в ClojureScript, естественно в этом случае ни о какой многопоточности говорить не приходится, так как все это добро компилируется в ES5 и исполняется в браузере, но знакомый интерфейс и удобная работа с асинхронностью может хорошо послужить.
Так что же нам дает core.async? Если объяснять на пальцах то core.async нам предоставляет диспетчеризацию через go-блоки в свой фиксированный Thread Pool состоящий из 8 тредов (размер Thread Pool можно менять через специальную опцию). При поступлении сообщения в канал, core.async сам найдет свободный поток, и передаст ему задачу, либо поставит сообщение в очередь. Кто впервые слышит про Thread Pool можно почитать хорошую заметку по паттерну Worker Thread
Пример № 1
(defonce log-chan (chan))
(defn loop-worker [msg]
(println msg))
(go-loop []
(let [msg (
В примере выше, мы создали канал log-chan
, и определили функцию loop-worker которая будет обрабатывать сообщения из канала.Затем создали go-block с бесконечным циклом, поместив туда наш loop-worker
.Теперь мы можем отправить данные в канал: (>!! log-chan "привет")
Функция loop-worker была вынесена отдельно за go-block умышленно, ради удобной ее отладки через REPL.
Само тело go-loop так как это макрос запекается где то внутри core.async, и перекомпиляция его на лету в REPL носит странный характер, поэтому обработчик проще вынести отдельно и жить спокойно.
Тут стоит заметить что никакого бесконечного цикла в привычном его понимании go-loop не делает.
После получения сообщения, происходит разовое выполнения функции обработчика, а затем go-block паркуется функцией которая будет ждать нового сообщения. Таким образом можно создавать сколько угодно много каналов и обработчиков к ним.
В пределах go-блока функция чтения из канала осуществляет парковку потока.
За пределами go блока есть возможность использовать для чтения из канала функцию которая блокирует основной поток до получения сообщения. Поведение
можно сравнить с функцией await в ES7.
Parking go блока, это термин core.async означающий, что поток освобожден, и доступен для других задач. Также существует термин blocking, который означает что поток будет непосредственно заблокирован и недоступен для новых задач до его освобождения.
В примере №1 есть изъян, если в loop-worker будет вызван Exception
, то произойдет прерывание выполнения формы, и (recur)
никогда не будет вызван, следовательно ожидание данных из канала log-chan
прекратится, исправим это в примере № 2.
Пример № 2
(defonce log-chan (chan))
(defn loop-worker [msg]
(throw (Exception. "my exception message")))
(go-loop []
(let [msg (
В этом примере мы обернули весь вызов loop-worker в форму try
, а переменная res
, будет содержать флаг, сообщающий об успешном выполнении формы или же об ошибке. Этот флаг может пригодится, например, если мы захотим закрыть канал в случае ошибки. Рабочий пример этого подхода можно посмотреть тут
Пример № 3
(let [c1 (go (
Данный пример будет ждать результата от всех асинхронных операций перечисленных в блоке let
. Эта практика очень удобна для решения проблемы callback hall в JavaScript, и очередной повод порадоваться что это можно использовать на стороне браузера в лице ClojureScript.
Пример № 4
(defn upload
"upload emulator"
[headshot c time]
(go (Thread/sleep time)
(>! c headshot)))
(let [c1 (chan) c2 (chan)]
(upload "pic1.jpg" c1 30)
(upload "pic2.jpg" c2 40)
(let [[headshot channel] (alts!! [c1 c2 (timeout 20)])]
(if headshot
(println "Sending headshot notification for" headshot)
(println "Timed out!"))))
В этом примере мы создали функцию upload эмулирующую асинхронную операцию, в данном случае загрузку файла. Последним аргументом upload, принимает время задержки в миллисекундах. С помощью функции alts!!! мы можем получить первый же результат, который нам вернет один из перечисленных в векторе каналов. В нашем векторе, последним каналом идет (timeout 20)
, этот канал нам вернет результат через 20 миллисекунд, и это будет первым значением которое будет записано в переменную headshot
и будет продолжено выполнение формы. Таким образом данный пример эмулирует установку времени на timeout, в течении которого мы будем ждать выполнения набора асинхронных операций.
Пример № 5
(def ping (chan))
(def pong (chan))
(go-loop []
(let [msg (! pong :pong)
(Thread/sleep 1000))
(recur)))
(go-loop []
(let [msg (! ping :ping)
(Thread/sleep 1000))
(recur)))
(>!! ping :ping)
Пример общения двух каналов, классический Ping-Pong.
Это был последний пример который я хотел показать. Отдельно так же стоит выделить наличие в clojure типов данных, созданных специально для записи туда информации в несколько потоков, это atom и agent, а также общую иммутабельность остальных типов, всё это очень облегчает жизнь разработчика при разработке многопоточного приложения.
Полезные ссылки:
» http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html
» https://github.com/clojure/core.async
» https://github.com/clojure/core.async/wiki/Getting-Started
» http://www.braveclojure.com/core-async/
» http://go.cognitect.com/core_async_webinar_recording