WebSocket Akka HTTP на практике
Довольно продолжительное время существовала только одна достойная реализация работы с HTTP поверх Akka — spray. К этой библиотеке пару умельцев написали расширения для WebSocket,
которое было вполне понятно в использовании и проблем не возникало. Но годы шли и spray, в том или ином виде, перекочевал в Akka HTTP с реализованной поддержкой WebSocket из коробки.
Для работы с WebSocket ребята из Akka предлагают нам использовать Akka Stream, тем самым упрощая нам жизнь с потоковыми данными и, одновременно, усложняя ее. Akka Stream не так прост в понимании. Далее я попытаюсь показать базовые практические примеры использования.
Это своеобразный pipeline обработки данных, каждая итерация которого что-либо делает с данными, попадающими в него. Flow делится на 3 составляющие: Source, GraphStage, Sink.
Лучше всего это показано на диаграмме из документации
Пожалуй один из самых неэффективных способов обработки, но самый простой для понимания.
Идея его заключается в том, чтобы все входящие сообщения попадали в актор, и у него был ActorRef, который отправлял данные непосредственно клиенту.
На каждое подключение клиента мы создаем актор ClientConnectionActor. А также Source, который будет представлять из себя еще один актор, направляющий полученные сообщения во flow. После его создания через метод mapMaterializedValue мы получим на него ссылку. Кроме этого мы создаем Sink, который все сообщения будет отправлять в ClientConnectionActor.
Таким образом ClientConnectionActor будет получать все сообщения из сокета. Отправлять мы их будем через прилетевший ему ActorRef, который будет доставлять их клиенту.
Минусы: необходимо следить за побочными акторами; быть аккуратным с OverflowStrategy; для обработки всех сообщений у нас всего один актор, он, соотвественно, однопоточный, из-за чего могут последовать проблемы с производительностью.
Идея данного подхода заключается в полном ипользовании Akka Stream для достижения целей. Общий вид его сводится к построению pipeline обработки входящих сообщений клиента.
Теперь немного усложним скелет и добавим парсинг и сериализацию JSON.
Модифицируем flow
Сперва мы отсекаем все бинарные сообщения, далее парсим входящий поток в JSON, обрабатываем его и сериализуем в текст для отправки клиенту.
Есть и другой способ: можно реализовать свой filter/map унаследовав GraphStage[FlowShape[A, A]].
И напоследок сделаем так, чтобы всем подключенным пользователям каждую секунду отправлялось текущее время:
Это базовые примеры того, как можно реализовать поддержку WebSocket в своем проекте. Пакет Akka Stream большой и разннобразный, он поможет решить довольно большой пласт задач, не переживая за масштабирование и параллелизацию.
которое было вполне понятно в использовании и проблем не возникало. Но годы шли и spray, в том или ином виде, перекочевал в Akka HTTP с реализованной поддержкой WebSocket из коробки.
Для работы с WebSocket ребята из Akka предлагают нам использовать Akka Stream, тем самым упрощая нам жизнь с потоковыми данными и, одновременно, усложняя ее. Akka Stream не так прост в понимании. Далее я попытаюсь показать базовые практические примеры использования.
Коротко об Akka Stream
Это своеобразный pipeline обработки данных, каждая итерация которого что-либо делает с данными, попадающими в него. Flow делится на 3 составляющие: Source, GraphStage, Sink.
Лучше всего это показано на диаграмме из документации
Для реализации WebSocket нам потребуется реализовывать GraphStagе. Source нам предоставляет akka, это как раз и есть наш клиент с летящими от него сообщениями. А Sink — сама отправка наших сообщений клиенту.
Actor style
Пожалуй один из самых неэффективных способов обработки, но самый простой для понимания.
Идея его заключается в том, чтобы все входящие сообщения попадали в актор, и у него был ActorRef, который отправлял данные непосредственно клиенту.
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import scala.io.StdIn
object Boot extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
def flow: Flow[Message, Message, Any] = {
val client = system.actorOf(Props(classOf[ClientConnectionActor]))
val in = Sink.actorRef(client, 'sinkclose)
val out = Source.actorRef(8, OverflowStrategy.fail).mapMaterializedValue { a ⇒
client ! ('income → a)
a
}
Flow.fromSinkAndSource(in, out)
}
val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ ⇒ system.terminate())
}
class ClientConnectionActor extends Actor {
var connection: Option[ActorRef] = None
val receive: Receive = {
case ('income, a: ActorRef) ⇒ connection = Some(a); context.watch(a)
case Terminated(a) if connection.contains(a) ⇒ connection = None; context.stop(self)
case 'sinkclose ⇒ context.stop(self)
case TextMessage.Strict(t) ⇒ connection.foreach(_ ! TextMessage.Strict(s"echo $t"))
case _ ⇒ // ingone
}
override def postStop(): Unit = connection.foreach(context.stop)
}
На каждое подключение клиента мы создаем актор ClientConnectionActor. А также Source, который будет представлять из себя еще один актор, направляющий полученные сообщения во flow. После его создания через метод mapMaterializedValue мы получим на него ссылку. Кроме этого мы создаем Sink, который все сообщения будет отправлять в ClientConnectionActor.
Таким образом ClientConnectionActor будет получать все сообщения из сокета. Отправлять мы их будем через прилетевший ему ActorRef, который будет доставлять их клиенту.
Минусы: необходимо следить за побочными акторами; быть аккуратным с OverflowStrategy; для обработки всех сообщений у нас всего один актор, он, соотвественно, однопоточный, из-за чего могут последовать проблемы с производительностью.
Производный вариант с использованием ActorPublisher и ActorSubscriber мы рассматривать не будем, так как, судя по официальной документации, он в состоянии deprecated.
Flow style
Идея данного подхода заключается в полном ипользовании Akka Stream для достижения целей. Общий вид его сводится к построению pipeline обработки входящих сообщений клиента.
Скелет
В данном случае мы обрабатываем только текстовые сообщения и изменяем их. Дальше TextMessage отправляется клиенту.
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import scala.io.StdIn
object Boot extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
def flow: Flow[Message, Message, Any] = {
Flow[Message].collect {
case TextMessage.Strict(t) ⇒ t
}.map { text ⇒
TextMessage.Strict(s"echo: $text")
}
}
val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ ⇒ system.terminate())
}
В данном случае мы обрабатываем только текстовые сообщения и изменяем их. Дальше TextMessage отправляется клиенту.
Теперь немного усложним скелет и добавим парсинг и сериализацию JSON.
Классы для сериализации
trait WsIncome
trait WsOutgoing
@JsonCodec case class Say(name: String) extends WsIncome with WsOutgoing
implicit val WsIncomeDecoder: Decoder[WsIncome] = Decoder[Say].map[WsIncome](identity)
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
case s: Say ⇒ s.asJson
}
Модифицируем flow
Flow[Message]
.collect {
case tm: TextMessage ⇒ tm.textStream
}
.mapAsync(CORE_COUNT * 2 - 1)(in ⇒ in.runFold("")(_ + _).flatMap(in ⇒ Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
.collect {
case Say(name) ⇒ Say(s"hello: $name")
}
.mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))
Сперва мы отсекаем все бинарные сообщения, далее парсим входящий поток в JSON, обрабатываем его и сериализуем в текст для отправки клиенту.
Усложним конструкцию, добавив контекст для каждого клиента. В этом нам поможет statefulMapConcat.
ClientContext
class ClientContext {
@volatile var userName: Option[String] = None
}
object ClientContext {
def unapply(arg: ClientContext): Option[String] = arg.userName
}
@JsonCodec case class SetName(name: String) extends WsIncome
@JsonCodec case class Say(text: String) extends WsIncome with WsOutgoing
implicit val WsIncomeDecoder: Decoder[WsIncome] =
Decoder[Say].map[WsIncome](identity)
.or(Decoder[SetName].map[WsIncome](identity))
def flow: Flow[Message, Message, Any] = {
Flow[Message]
.collect {
case tm: TextMessage ⇒ tm.textStream
}
.mapAsync(CORE_COUNT * 2 - 1)(in ⇒ in.runFold("")(_ + _).flatMap(in ⇒ Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
.statefulMapConcat(() ⇒ {
val context = new ClientContext
m ⇒ (context → m) :: Nil
})
.mapConcat {
case (c: ClientContext, SetName(name)) ⇒
c.userName = Some(name)
Nil
case a ⇒ a :: Nil
}
.collect {
case (ClientContext(userName), Say(text)) ⇒ Say(s"$userName: $text")
case (_, Say(text)) ⇒ Say(s"unknown: $text")
}
.mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))
}
Есть и другой способ: можно реализовать свой filter/map унаследовав GraphStage[FlowShape[A, A]].
Пример (не адаптировано под предыдущий код)
В данном варианте фильтруется все сообщения до тех пор, пока не получено сообщение на авторизацию. Если авторизация пройдет успешно, сообщения проходят дальше совместно с профилем пользователя.
class AuthFilter(auth: ws.AuthMessage ⇒ Future[Option[UserProfile]])(implicit ec: ExecutionContext) extends GraphStage[FlowShape[ws.WsIncomeMessage, ws.WsContextIncomeMessage]] {
val in = Inlet[ws.WsIncomeMessage]("AuthFilter.in")
val out = Outlet[ws.WsContextIncomeMessage]("AuthFilter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
@volatile var profile: Option[UserProfile] = None
setHandler(in, new InHandler {
override def onPush(): Unit = profile match {
case Some(p) ⇒ push(out, ws.WsContextIncomeMessage(p, grab(in)))
case _ ⇒ grab(in) match {
case a: ws.AuthMessage ⇒ auth(a) onComplete {
case Success(p) ⇒
profile = p
pull(in)
case Failure(e) ⇒ fail(out, e)
}
case _ ⇒ pull(in)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}
}
В данном варианте фильтруется все сообщения до тех пор, пока не получено сообщение на авторизацию. Если авторизация пройдет успешно, сообщения проходят дальше совместно с профилем пользователя.
И напоследок сделаем так, чтобы всем подключенным пользователям каждую секунду отправлялось текущее время:
case object Tick extends WsOutgoing
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
case s: Say ⇒ s.asJson
case Tick ⇒ Json.obj("time" → DateTime.now.toIsoDateTimeString().asJson)
}
...
val broadcast = Source.tick[WsOutgoing](1.second, 1.second, Tick)
...
.collect {
case (ClientContext(userName), Say(text)) ⇒ Say(s"$userName: $text")
case (_, Say(text)) ⇒ Say(s"unknown: $text")
}
.merge(broadcast)
.mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))
Это базовые примеры того, как можно реализовать поддержку WebSocket в своем проекте. Пакет Akka Stream большой и разннобразный, он поможет решить довольно большой пласт задач, не переживая за масштабирование и параллелизацию.
PS: Используя новую для вас технологию в более-менее нагруженном проекте, не забывайте проводить нагрузочное тестирование, следить за памятью и горячими участками кода (в этом вам может помочь gatling). Всем добра.
Комментарии (1)
20 января 2017 в 08:38
0↑
↓
А это так должно быть, что в исходниках стрелки заменились на спецсимволы? Это отголоски внедрения парсера формул в редактор?