[Перевод] Написание чата с Akka

ye3nem_8yk9i8zcdct8ppweadcs.jpeg


Создание чатов — простая и одновременно сложная задача. В этой статье представлен пошаговый туториал по реализации бэкенда для простого многоканального чата.

Сервис будет реализован как комбинация из простого REST API и приложения WebSocket. Чтобы было чуть интереснее, я решил по максимуму использовать связанные с Akka библиотеки и typed actors.

Весь приведённый в статье код доступен в репозитории GitHub.
Давайте начнём с простого вопроса:

▍ Почему именно чат?


Во-первых, чаты, наряду со стримингом видео, являются одной из самых интересных для меня тем. Во-вторых, написание чат-приложения — это, наверное, самый простой способ знакомства с протоколом WebSocket. Более того, большинство чатов в вебе работают так же, как описанный в статье. Разумеется, их масштаб и количество функций сильно больше, однако основной принцип остаётся таким же.

▍ Что такое WebSocket и почему это важно?


Если вкратце, то это протокол связи, обеспечивающий двунаправленную коммуникацию между сервером и браузером при помощи одного TCP-соединения. Благодаря этой особенности, нам не нужно постоянно запрашивать новые данные с сервера; обмен данными происходит в реальном времени в виде сообщений между задействованными сторонами. Каждое сообщение — это или двоичные данные, или текст в формате Unicode.

Протокол стандартизован в 2011 году советом IETF в виде RFC 6455. Протокол WebSocket отличается от HTTP, но оба они находятся в слое 7 модели OSI и зависят от TCP, находящегося в слое 4. С другой стороны, WebSocket по определению работает через HTTP-порты 443 и 80, а также поддерживает такие концепции HTTP, как прокси и посредники. Более того, в handshake WebSocket используется заголовок HTTP upgrade для перехода с протокола HTTP на WebSockets.

Самый большой недостаток протокола WebSocket — безопасность. WebSockets не ограничен политикой одинакового источника (same-origin policy), что сильно упрощает атаки в стиле CSRF.

▍ Используемые инструменты


Давайте начнём с описания инструментов, которые будут использоваться для реализации приложения.

  • В реализации приложения важную роль будут играть Akka — библиотеки из тулкита Akka и модель Actor. Так как типобезопасность важна, а проверка типов во время компиляции позволяет избавиться от множества проблем ещё до запуска кода, я решил использовать максимальное количество typed actors. Именно поэтому в качестве основных я выбрал akka-stream-typed и akka-actor-typed, а не их классические версии.
  • akka-http — ничего неожиданного, ведь мне нужно предоставить REST API с функциями WebSocket и использовать акторы Akka. akka-http — простейший способ добиться этого, поскольку мне не нужно будет беспокоиться об интеграции и взаимодействии разных библиотек. Кроме того, я использую akka-http-circe. Для парсинга входящих JSON я выбрал библиотеку circe и мне необходимо взаимодействие между akka-http и circe.
  • pureconfig — для загрузки файлов конфигурации и их парсинга в объекты Scala без слишком большого бойлерплейта
  • logback-classic — для логгинга


И это всё, что понадобится мне для реализации чат-приложения с WebSockets на Akka. Давайте приступим к реализации!

▍ Реализация


Я решил использовать Scala 2.13.8, потому что на момент написания статьи не все выбранные библиотеки поддерживали Scala 3.

1. Я начинаю с добавления всех необходимых зависимостей в файл проекта build.sbt.

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-actor-typed" % "2.6.19",
  "com.typesafe.akka" %% "akka-stream-typed" % "2.6.19",
  "com.typesafe.akka" %% "akka-http" % "10.2.9",
  "de.heikoseeberger" %% "akka-http-circe" % "1.39.2",
  "io.circe" %% "circe-generic" % "0.14.1",
  "com.github.pureconfig" %% "pureconfig" % "0.17.1",
  "ch.qos.logback" % "logback-classic" % "1.2.11",
)


2. Добавляю файл конфигурации приложения chatp-app.conf и два case-класса, которые в дальнейшем будут использоваться в качестве значений конфигурации.

Пример содержимого файла .conf:

http-config {
  port = 8070
  port = ${?PORT}
  host = "localhost"
  host = ${?HOST}
}


Такая нотация позволяет считывать переменную окружения PORT и если она отсутствует, то будет использоваться значение 8070. По сути, все значения из этого файла могут быть переопределены при помощи переменных окружения.

case class AppConfig(httpConfig: HttpConfig)
case class HttpConfig(port: Int, host: String) {
  val toPath = s"$host:$port"
}


3. Реализую для этого приложения runner-класс.

object ChatAppStarter {

  def main(args: Array[String]): Unit = {
    val appConfig = ConfigSource.resources("chat-app.conf").loadOrThrow[AppConfig]
    val rootBehavior = Behaviors.setup[Nothing] { context =>
      context.setLoggerName("ChatApiLogger")
      Behaviors.same
    }
    ActorSystem[Nothing](rootBehavior, "ChatApp")
  }
}


На данный момент мы будем только считывать файл конфигурации, а в дальнейшем я буду добавлять новый код.

4. Теперь я реализую актор, представляющий каждый конкретный Chat, созданный в приложении.

object Chat {

  sealed trait ChatCommand
  final case class ProcessMessage(sender: String, content: String) extends ChatCommand
  final case class AddNewUser(ref: ActorRef[String]) extends ChatCommand

  def apply(): Behavior[ChatCommand] =
    Behaviors.setup { _ =>
      var participants = List.empty[ActorRef[String]]
      val messageQueue = mutable.Queue.empty[String]
      Behaviors.receiveMessage[ChatCommand] {
        case ProcessMessage(sender, content) =>
          participants.foreach(ref => ref ! s"$sender: $content")
          Behaviors.same
        case AddNewUser(ref) =>
          participants = participants.appended(ref)
          messageQueue.foreach(m => ref ! m)
          Behaviors.same
      }
    }
}


Чат — это простой актор, обрабатывающий два типа сообщений: AddNewUser с аргументом actorRef и ProcessMessage, представляющим каждое сообщение, отправляемое между пользователем внутри чата.

  • После получения сообщения AddNewUser актор чата будет добавлять actorRef к списку участников, а затем сразу отправлять все уже переданные в чат сообщения новому пользователю.
  • После получения ProcessMessage актор просто будет добавлять содержимое сообщения в очередь сообщений и транслировать содержимое всем участникам, присутствующим в чате.


5. Я реализую актор ChatsStore, отвечающий за хранение всех чатов, существующих в приложении.

object ChatsStore {

  sealed trait StoreCommand
  final case class AddNewChat(sender: User, receiver: User, replyTo: ActorRef[Int]) extends StoreCommand
  final case class GetChatMeta(chatId: Int, userName: String, replyTo: ActorRef[Option[GetChatMetaResponse]]) extends StoreCommand

  final case class GetChatMetaResponse(userName: String, ref: ActorRef[ChatCommand])

  private var sequence = 0
  private val store = mutable.Map.empty[Int, ChatMetadata]

  private case class ChatMetadata(participants: Map[String, User], ref: ActorRef[ChatCommand]) {
    def containUserId(userId: String): Boolean =
      participants.contains(userId)
  }

  def apply(): Behavior[StoreCommand] =
    Behaviors.setup(context => {
      Behaviors.receiveMessage {
        case AddNewChat(sender, receiver, replyTo) =>
          sequence += 1
          val newChat: ActorRef[ChatCommand] = context.spawn(Chat(), s"Chat$sequence")
          val participants = Map(sender.id.toString -> sender, receiver.id.toString -> receiver)
          val metadata = ChatMetadata(participants, newChat)
          store.put(sequence, metadata)
          replyTo ! sequence
          Behaviors.same
        case GetChatMeta(chatId, userId, replyTo) =>
          val chatRef = store
            .get(chatId)
            .filter(_.containUserId(userId))
            .flatMap(meta =>
              meta.participants
                .get(userId)
                .map(user => GetChatMetaResponse(user.name, meta.ref))
            )
          replyTo ! chatRef
          Behaviors.same
      }
    })
}


Это ещё один простой актор, поддерживающий только два типа сообщений: AddNewChat и GetChatMeta. Получив сообщение AddNewChat, актор Store будет создавать новый экземпляр актора Chat с указанным списком id в качестве участников и следующее число в последовательности как id.

С другой стороны, после получения GetChatMetada хранилище попытается найти Chat с указанным id и userId. Если чат для конкретной комбинации существует, то Store вернёт его acrofRef с полученным именем пользователя.

Поскольку его актор внутреннего состояния содержит точное хранилище, представленное здесь как простое отображение из Integer во внутренний case-класс — ChatMatada вместе с последовательностью реализован как простой Integer, который используется для назначения id новым добавляемым чатам.

Кроме того, я добавлю в ChatAppStarter следующую строку для создания актора ChatsStore:

val store = context.spawn(ChatsStore(), "Store")


6. Реализую ChatService, который будет играть роль класса utils.

class ChatService(contextPath: List[String], httpConfig: HttpConfig) {

  private val apiPath = contextPath.mkString("/")

  val path: PathMatcher[Unit] = toPath(contextPath, PathMatcher(""))

  @tailrec
  private def toPath(l: List[String], pathMatcher: PathMatcher[Unit]): PathMatcher[Unit] = {
    l match {
      case x :: Nil => toPath(Nil, pathMatcher.append(x))
      case x :: tail => toPath(tail, pathMatcher.append(x / ""))
      case Nil => pathMatcher
    }
  }

  def generateChatLinks(chatId: Int, senderId: String, receiverId: String): (String, String) = {
    val chatPath = s"ws://${httpConfig.toPath}/$apiPath/chats/$chatId/messages"
    (s"$chatPath/$senderId", s"$chatPath/$receiverId")
  }

}


Здесь вы видите метод toPath, разворачивающий путь контекста из List[String] в совместимый с Akka Http PathMatcher и метод generateChatLinks, генерирующий ссылки на чаты, которые в дальнейшем будут отправляться в качестве ответов после создания чата с конкретными сочетаниями userId и id.

Кроме того, я добавлю в ChatAppStarter следующую строку для создания экземпляра ChatService:

val service = new ChatService(List("api", "v1"), appConfig.httpConfig)


7. Я реализую класс ChatApi, отвечающий за раскрытие REST API с функциями WebSocket, а также Akka Stream Flow для обработки сообщений WebSocket.

class ChatApi(service: ChatService, store: ActorRef[StoreCommand], logger: Logger)(implicit val system: ActorSystem[_]) {

  private implicit val timeout: Timeout = Timeout(2.seconds)
  private implicit val ec: ExecutionContextExecutor = system.executionContext

  val routes: Route = {
    pathPrefix(service.path / "chats") {
      concat(pathEnd {
        post {
          entity(as[StartChat]) { start =>
            val senderId = start.sender.id.toString
            val receiverId = start.receiver.id.toString
            logger.info(s"Starting new chat sender: $senderId, receiver: $receiverId")
            val eventualCreated =
              store
                .ask(ref => AddNewChat(start.sender, start.receiver, ref))
                .map(id => {
                  val chatLinks = service.generateChatLinks(id, senderId, receiverId)
                  ChatCreated(id, chatLinks._1, chatLinks._2)
                })
            onSuccess(eventualCreated) { c =>
              complete(StatusCodes.Created, c)
            }
          }
        }
      }, path(IntNumber / "messages" / Segment) { (id, userId) =>
        onSuccess(store.ask(ref => GetChatMeta(id, userId, ref))) {
          case Some(meta) => handleWebSocketMessages(websocketFlow(meta.userName, meta.ref))
          case None => complete(StatusCodes.NotFound)
        }
      })
    }
  }

  def websocketFlow(userName: String, chatActor: ActorRef[ChatCommand]): Flow[Message, Message, Any] = {
    val source: Source[TextMessage, Unit] =
      ActorSource.actorRef[String](PartialFunction.empty, PartialFunction.empty, 5, OverflowStrategy.fail)
        .map[TextMessage](TextMessage(_))
        .mapMaterializedValue(sourceRef => chatActor ! AddNewUser(sourceRef))

    val sink: Sink[Message, Future[Done]] = Sink
      .foreach[Message] {
        case tm: TextMessage =>
          chatActor ! ProcessMessage(userName, tm.getStrictText)
        case _ =>
          logger.warn(s"User with id: '{}', send unsupported message", userName)
      }

    Flow.fromSinkAndSource(sink, source)
  }
}

object ChatApi {

  case class StartChat(sender: User, receiver: User)
  case class User(id: UUID, name: String)
  case class ChatCreated(chatId: Int, senderChatLink: String, receiverChatLink: String)

  implicit val startChatDecoder: Decoder[StartChat] = deriveDecoder
  implicit val startChatEncoder: Encoder[StartChat] = deriveEncoder
  implicit val userDecoder: Decoder[User] = deriveDecoder
  implicit val userEncoder: Encoder[User] = deriveEncoder
  implicit val chatCreatedDecoder: Decoder[ChatCreated] = deriveDecoder
  implicit val chatCreatedEncoder: Encoder[ChatCreated] = deriveEncoder
}


Основная часть кода используется для раскрытия API в виде двух конечных точек. Первая — это чистая конечная точка REST по пути http://{url}/chats с функциональностью POST для раскрытия способа создания чатов.

Вторая — это смешанная конечная точка по пути http::/{url}/chats/{chatId}/messages/{userId}, запускающая канал WebSocket вместе с сообщениями процесса для конкретного чата.

Весь код из объекта-компаньона ChatApi используется для моделирования запросов и ответов с полуавтоматическим извлечением circe, которое я решил использовать вместо автоматического извлечения.

Самая важная часть кода на этом этапе — часть, отвечающая за обработку сообщений WebSocket, в частности, метод websocketFlow. Он имеет классическую сигнатуру akka-http для обработки websockets, однако реализация не так уж тривиальна.

Во-первых, я создаю источник на основе ActorRef, отвечающий за получение сообщений от акторов, обозначающих пользователей чата. После материализации каждого источника они отправляют ссылку на своего актора актору чата, для которого она запрошена. Второй элемент — это Sink, которому отправляются все сообщения от конечной точки websocket и который будет перенаправлять все сообщения заинтересованным акторам чата. Я объединил их с помощью Flow.fromSinkAndSource.

Кроме того, я добавил в  ChatAppStarter следующую строку для создания нового экземпляра ChatApi.

val api = new ChatApi(service, store, context.log)(context.system)


8. На этом этапе я добавляю объект, отвечающий за запуск HTTP-сервера:

object Server {

  def start(routes: Route, config: HttpConfig, logger: Logger)(implicit system: ActorSystem[_]): Unit = {
    import system.executionContext

    val bindingFuture = Http()
      .newServerAt(config.host, config.port)
      .bind(routes)
    bindingFuture.onComplete {
      case Success(binding) =>
        val address = binding.localAddress
        logger.info("Server online at http://{}:{}/", address.getHostString, address.getPort)
      case Failure(ex) =>
        logger.error("Failed to bind HTTP endpoint, terminating system", ex)
        system.terminate()
    }
  }
}


Здесь нет ничего особо сложного, всего лишь простой Http-сервер Akka, который запускается с сообщением.

И ещё одна последняя строка в ChatAppStarter, чтобы объединить их все:

Server.start(api.routes, appConfig.httpConfig, context.log)(context.system)


9. Добавляем ещё один файл конфигурации, отвечающий за загрузку конфигурации Akka. После загрузки начинается автоматическая отправка пинга активности соединения websockets чата.

Файл состоит из одной строки:

akka.http.server.websocket.periodic-keep-alive-max-idle = 1 second


Затем добавляем две строки в ChatAppStarter:

val akkaConfig = ConfigSource.resources("akka.conf").config().right.get -  for loading the config file 

ActorSystem[Nothing](rootBehavior, "ChatApp", akkaConfig)


для запуска всей системы акторов с загруженной конфигурацией.

Вот и всё, весь наш чат реализован. Давайте его протестируем!

▍ Тестирование


Для тестов я использую Postman и Simple Web Socket Client.

1. С помощью Postman я создаю новый чат для двух пользователей.

xay-abytc_4_cvvlx06qeasjhek.jpeg


В теле ответа я получаю Id чата и две персонализированные ссылки для пользователей, чтобы они могли присоединиться к чату и использовать его в стиле квази-hypermedia.

2. Настало время использовать их и проверить, смогут ли пользователи общаться друг с другом. Здесь нам на помощь приходит Simple Web Socket Client.

ou43dilwe0qc8a6ye6vsk5fqbes.jpeg


mlbaoccbyud7zgczzgvrtzqhpdc.jpeg


Вот и всё, система работает, и пользователи могут общаться друг с другом.

Остался последний шаг: давайте посмотрим, что можно усовершенствовать.

▍ Что можно усовершенствовать


Так как лишь создал простейшее приложение для чата, его во многом можно усовершенствовать. Ниже представлен список того, что стоит улучшить:

  • Улучшить поддержку выходящих и возвращающихся в чат пользователей — пока это реализовано не самым оптимальным образом и всё можно существенно усовершенствовать.
  • Отправка приложений — пока чат поддерживает только простые текстовые сообщения. Хотя обмен текстом — основная функция чата, пользователям нравится также отправлять фотографии и аудиофайлы.
  • Модель сообщения — подумать и переосмыслить, что конкретно нужно в вашем сообщении; возможно, также потребуются некоторые изменения в модели API.
  • Постоянное хранилище сообщений — чтобы сообщения сохранялись при перезагрузке приложения. Кроме того, может потребоваться определённый уровень шифрования.
  • Поддержка групповых чатов — пока приложение поддерживает только чат двух пользователей, поэтому логично будет в дальнейшем реализовать групповые чаты.
  • Тесты — пока тестов нет, но зачем на этом останавливаться? Тесты — это всегда хорошо.


oug5kh6sjydt9llengsiebnp40w.png

© Habrahabr.ru