Асинхронная обработка Stripe событий с помощью Scala

Изображение от Andy Hermawan на Unsplash

Привет хабровчане! Мне довелось поработать со Stripe в своем проекте  dockovpn.io и я спешу поделиться свежим опытом.

Каждый стартап рано или поздно сталкивается с необходимостью обработки платежей онлайн. В эпоху SaaS, PaaS, IaaS и других «As a Service» инициатив (их число постоянно растет), многие полагаются на бизнес модель платных подписок, так как она обеспечивает возможность регулярно в автоматическом режиме собирать платежи с пользователей и дает лучшую прогнозируемость финансовых потоков.

На рынке сейчас много поставщиков продуктов для решения данной задачи, однако есть среди них один, чьи продукты обычно рассматривают в первую очередь. Я говорю о Stripe.

В этой статье я хочу предложить читателю технический взгляд на решение проблемы обработки онлайн подписок с помощью языка программирования Scala и фреймворка Akka в распределенном серверном приложении для Кубернетиса.

Обзор Архитектуры

Давайте для начала поговорим об архитектуре нашего приложения. Общая картина выглядит следующим образом. Артифактом нашего приложения будет Докер контейнер. Размещать в Кубернетисе мы его будем как Деплоймент, а в качестве интерфейса для сетвого взаимодействия выберем Сервис с типом ClusterIP. Количество реплик у нашего деплоймента будет 2. На самом деле, для того чтобы гордо называть наше приложение распределенным, это может быть любое число больше 1. Кубернетис, в свою очередь, создаст 2 экземпляра нашего приложения. Каждый контейнер будет принимать входящие http запросы от Stripe на специально сконфигурированый веб-хук. Вернее, в первую очередь, запросы будут поступать на наш Сервис, который в случайном порядке будет распределять их на все инстансы нашего приложения. Для простоты понимания, в данной статье мы опустим настройку Ingress и балансировщика нагрузок и будем считать что все запросы из-вне поступают напрямую на наш Сервис. В конечном счете, тема данный статьи не тонкости DevOps (если тема покажется читателю интересной, я сделаю отдельную публикация для этого). В общих чертах архитектура нашего приложения будет выглядеть как на изображении ниже.

Кубернетис Деплоймент с ClusterIP Сервисом

Кубернетис Деплоймент с ClusterIP Сервисом

События Stripe

Stripe  может генерировать десятки различных событий, но нам интересны лишь некоторые из них. Так как мы пишем модуль для обработки подписок, то во-первых, нам понадобятся как минимум customer.subscription.created и customer.subscription.updated. Это поможет нам различать первоначальное оформление подписки, от последующих регулярных апдейтов.Во-вторых, нам будут нужны события об успешности (или нет) оплаты. Это позволит на этапе оформления подписки не создавать ее вообще, если оплата не прошла. Теоретически, мы могли бы подписаться на customer.subscription.cancelled, но в этом случае нам пришлось бы слушать еще одно событие и потом деактивировать созданную подписку в базе данных. Таким образом, в нашем серверном приложении мы будем слушать следующие события:

  • customer.subscription.created

  • customer.subscription.updated

  • invoice.payment_succeeded

  • invoice.payment_failed

Этого должно быть достаточно, чтобы написать надежное приложение способное обрабатывать новые подписки и последующие обновления, а также справляться с разного рода неожиданностями.

Все выглядит хорошо, если предположить что события от Stripe будут приходить к нам в строгой очередности и сгруппированные по пользователям. Однако, так как Stripe не гарантирует очередность событий, то получать мы их можем в любой последовательности и мы можем оказаться в ситуации как на картинки ниже.

События Stripe в произвольной очередности

События Stripe в произвольной очередности

Как мы можем здесь наблюдать, для пользователя 1 сначала приходит customer.subscription.updated, затем customer.subscription.created, invoice.payment_failed, и завершает последовательность invoice.payment_succeeded. В это же время, для пользователя 2 мы получаем customer.subscription.updated и invoice.payment_succeeded. В первом случае мы имели дело с созданием новой подписки, а во втором — с обновлением уже существующей.

Чтобы сделать ситуацию еще более пикантной, Stripe рекомендует возвращать код 200 как можно быстрее. В идеальном случае, мы должны получить от Stripe событие, декодировать его, положить в асинхронную очередь и вернуть код 200.

К счастью, у нас имеются все инструменты, для того чтобы справиться с данной задачей.

Серверное приложение

После того как мы познакомились в общих чертах с архитектурой и имеем представление о том, чего ожидать от Stripe, самое время начать писать наше приложение.

Итак, нам понадобиться несколько библиотек:

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed" % "2.7.0",
  "com.typesafe.akka" %% "akka-cluster-typed" % "2.7.0",
  "com.lightbend.akka.management" %% "akka-management-cluster-http" % "1.2.0",
  "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.2.0",
  "com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % "1.2.0",
  "com.stripe" % "stripe-java" % "24.4.0",
)

Код ниже демонстрирует, как мы могли бы обрабатывать входящие запросы от Stripe на нашем веб-хуке:

path("stripe") {
  post {
    extractStripeSigHeader { sigHeader =>
      entity(as[String]) { payload =>
        onSuccess(stripeService.processStripeEvent(payload, sigHeader)) {
          complete(StatusCodes.OK)
        }
      }
    }
  }
}

В StripeService мы будем декодировать событие полученное от Stripe и создавать StripeMessage объекты, которые затем отправляем StripeGatewayActor актору (это асинхронное действие и управление передается вызывающему коду немедленно). Если в процессе декодирование события возникнет ошибка, то оно будет обработано библиотекой akka-http, и сервер вернет Stripe код 503.

import com.stripe.Stripe
import com.stripe.model.{Event, Invoice, Subscription}
import com.stripe.net.Webhook
import akka.actor.typed.ActorRef

def processStripeEvent(payload: String, sigHeader: String): Future[Unit] =
    Future(Webhook.constructEvent(payload, sigHeader, endpointSecret))
      .flatMap(processStripeEvent)
  
def processStripeEvent(event: Event): Future[Unit] =
  Future(makeStripeMessage(event)).map {
    case Some(value) => stripeActor ! value
    case None => println("Can't decode Stripe event")
  }
  
private def makeStripeMessage(event: Event): Option[StripeMessage] = ...

Акторы

Вследствие рапределенной (и масштабируемой) природы нашего приложения, событие Stripe может прийти на любой инстанс (напомню, что у нас их 2). Однако, нам необходимо по крайней мере принимать события в асинхронную очередь централизованно, чтобы затем распределять, перегруппировывать и анализировать их. Чтобы решить эту проблему мы воспользуемся SingletonActor паттерном из Акки, который гарантирует, что только один экземпляр актора будет создан в кластере, а остальные ноды получат прокси. Прокси могут принимать сообщения на других нодах и перенаправлять их на ноду с синглтоном.

Синглтон актор и прокси

Синглтон актор и прокси

Чтобы создать синглтон актор, мы можем воспользоваться следующим кодом:

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.cluster.typed.{ClusterSingleton, SingletonActor}

ClusterSingleton(system).init(
  SingletonActor(
    Behaviors.supervise(
      StripeGatewayActor(...)
    ).onFailure[Exception](SupervisorStrategy.restart),
    "global-stripe-gateway"
  )
)

Давайте рассмотрим StripeGatewayActor в деталях. На самом деле тут не происходит ничего особенного. Это актор верхнего уровня в архитектуре наших страйп акторов и единственное его назначение перераспределять входящие сообщения по StripeUserActor акторам основываясь на значении stripeUserId.

Behaviors.receiveMessage {
  case sc@SubscriptionCreated(StripeEvent(_, _, _, _, stripeUserId), _) =>
    getOrCreateUserActor(stripeUserId) ! sc
    Behaviors.same
  case su@SubscriptionUpdated(StripeEvent(_, _, _, _, stripeUserId), _) =>
    getOrCreateUserActor(stripeUserId) ! su
    Behaviors.same
  case ips@InvoicePaymentSucceeded(StripeEvent(_, _, _, _, stripeUserId), _) =>
    getOrCreateUserActor(stripeUserId) ! ips
    Behaviors.same
  case ipf@InvoicePaymentFailed(StripeEvent(_, _, _, _, stripeUserId)) =>
    getOrCreateUserActor(stripeUserId) ! ipf
    Behaviors.same
  case x =>
    log.info(s"Unsupported Stripe event [$x]")
    Behaviors.same
}

Как предполагает название функции getOrCreateUserActor, здесь мы проверяем есть ли нужный нам StripeUserActor актор в кеше, если нет — то создаем его и возвращаем вызывающему коду. Здесь нам необязательно использовать конкурентный кеш, так как код внутри актора гарантированно выполняется синхронно.

val userActors: mutable.Map[String, ActorRef[StripeMessage]] = mutable.Map.empty

def getOrCreateUserActor(stripeUserId: String): ActorRef[StripeMessage] =
  userActors.getOrElseUpdate(
    stripeUserId,
    ctx.spawnAnonymous(
      StripeUserActor(
        stripeUserId,
        subscriptionService,
        userService,
        newSubscriptionCallback,
        updateSubscriptionCallback
      )
    )
  )

StripeUserActor работает похожим образом. Главное отличие в том, что здесь мы перераспределяем сообщения по StripeCheckoutSessionActor акторам основываясь на значении stripeSubscriptionId. Еще мы слушаем одно дополнительное событие SubscriptionFlowCompleted, которое сигнализирует нам о том, что подписка была успешно создана или продлена и в обработчике которого мы удаляем соответсвующий StripeCheckoutSessionActor из кеша.

val stripeCheckoutSessionActors: mutable.Map[String, ActorRef[StripeMessage]] = mutable.Map.empty

Behaviors.receiveMessage {
  case sc@SubscriptionCreated(StripeEvent(_, _, _, stripeSubscriptionId, _), _) =>
    log.info(s"Get SubscriptionCreated: $stripeSubscriptionId")
    getOrCreateUserSubscriptionActor(stripeSubscriptionId) ! sc
    Behaviors.same
  case su@SubscriptionUpdated(StripeEvent(_, _, _, stripeSubscriptionId, _), _) =>
    log.info(s"Get SubscriptionUpdated: $stripeSubscriptionId")
    getOrCreateUserSubscriptionActor(stripeSubscriptionId) ! su
    Behaviors.same
  case ips@InvoicePaymentSucceeded(StripeEvent(_, _, _, stripeSubscriptionId, _), _) =>
    log.info(s"Get InvoicePaymentSucceeded: $stripeSubscriptionId")
    getOrCreateUserSubscriptionActor(stripeSubscriptionId) ! ips
    Behaviors.same
  case ipf@InvoicePaymentFailed(StripeEvent(_, _, _, stripeSubscriptionId, _)) =>
    log.info(s"Get InvoicePaymentFailed: $stripeSubscriptionId")
    getOrCreateUserSubscriptionActor(stripeSubscriptionId) ! ipf
    Behaviors.same
  case SubscriptionFlowCompleted(stripeSubscriptionId) =>
    userSubscriptionActors -= stripeSubscriptionId
    Behaviors.same
  case x =>
    log.info(s"Unsupported Stripe event [$x]")
    Behaviors.same
  }

Чекаут сессия

Перед тем как погрузиться в детали StripeCheckoutSessionAcrtor, несколько слов о Stripe чекаут сессии как таковой. Каждый раз, когда мы нажимаем на кнопку «Подписаться» в таблице цен на сайте, мы перенаправляемся на страницу чекаута Stripe. stripeSubscriptionId привязан к чекаут сессии.

Встраеваемая таблица цен Stripe

Встраеваемая таблица цен Stripe

Чекаут страница Stripe

Чекаут страница Stripe

События генерируемые Stripe отражают различные состояния, через которые приходит чекаут сессия.

Внутренние состояния чекаут сессии

Внутренние состояния чекаут сессии

Если мы закроем или забросим страницу чекаута и нажмем на кнопку «Подписаться» еще раз, то откроется новая страница с чекаутом и новый stripeSubscriptionId будет сгенерирован.

StripeCheckoutSessionActor

StripeCheckoutSessionActor отражает состояние чекаут сессии и находится внизу иерархии акторов Stripe.

Иерархия акторов Stripe

Иерархия акторов Stripe

В конструкторе поведения актора (Behaviuor) мы передаем 2 функции обратного вызова: createSubscriptionCallback и updateSubscriptionCallback. Чтобы решить какую из этих функций вызывать, нам необходимо установить существует ли запись с данным stripeSubscriptionId в нашей базе данных. Таком образом, изначальное состояние нашего StripeCheckoutSessionActor актора — это initializingBehaviour, в котором мы и обращаемся кнашей базе данных.

def initializingBehavior(buffer: StashBuffer[StripeMessage]): Behavior[StripeMessage] = {
  ctx.pipeToSelf {
    for {
      userName <- retryFuture(getUser(stripeUserId), 3)
      userSubscriptionOpt <- subscriptionService.getStripeSubscriptionOpt(stripeSubscriptionId)
    } yield (userName, userSubscriptionOpt)
  } {
    case Failure(exception) =>
      // this anomaly requires manual resolution, notification to admin must be sent
      SubscriptionFlowFailed(
        stripeSubscriptionId,
        "",
        s"Couldn't resolve userName from stripeUserId [$stripeUserId] because of ${exception.getMessage}"
      )
    case Success((user, userSubscriptionOpt)) =>
      resolvedUser = user
      StartProcessingUserSubscriptionEvents(user.userName, userSubscriptionOpt)
  }

  Behaviors.receiveMessage {
    case StartProcessingUserSubscriptionEvents(userName, userSubscriptionOpt) =>
      userSubscriptionOpt match {
        case Some(_) =>
          buffer.unstashAll(listenForUpdateFlowEventsBehavior(userName))
        case None =>
          buffer.unstashAll(listenForCreationFlowEventsBehavior(userName))
      }
    case sff: SubscriptionFlowFailed =>
      parent ! sff
      Behaviors.stopped
    case x =>
      buffer.stash(x)
      Behaviors.same
  }
}
Behaviors.withStash(25)(buffer => initializingBehavior(buffer))

В этом состоянии актор ожидает только StartProcessingUserSubscriptionEvents сообщение и накапливает все остальные сообщения, чтобы обработать их в listenForCreationFlowEventsBehavior или listenForUpdateFlowEventsBehavior состоянии, куда он переводится в зависимости от результата запроса к базе данных.

Тип подписки (создание или продление) определяет последовательность событий для активации функции обратного вызова. Но, как мы уже упоминали ранее Stripe не гарантирует порядок событий. Как быть в этой ситуации? Нам не остается ничего другого, как самим упорядочивать события. Для этого мы буем назначать событиям ранг/вес следующим образом.

  • SubscriptionCreated будет иметь ранг — 1

  • SubscriptionUpdated будет иметь ранг — 2

  • InvoicePaymentSucceeded будет иметь ранг — 3

Для работы с ранговыми событиями нам понадобится новый кейс класс OrderRankedEvent, экземпляры которого мы будем складывать в TreeSet c кастомизированнойсортировокой.

private implicit val stripeEventsOrdering: Ordering[OrderRankedEvent] =
    (x: OrderRankedEvent, y: OrderRankedEvent) => x.rank.compare(y.rank)

case class OrderRankedEvent(id: String, eventName: String, rank: Int)

val orderedEvents = mutable.TreeSet.empty[OrderRankedEvent]
Behaviors.receiveMessage {
  case SubscriptionCreated(event, _) =>
    log.info(s"Get SubscriptionCreated: $stripeSubscriptionId")
    if (orderedEventsList.exists(_.id == event.id)) {
      log.warn(s"Received duplicate event with id [${event.id}], ignoring event")
      Behaviors.same
    } else {
      val rankedEvent = OrderRankedEvent(event.id, event.eventName, 1)
      orderedEvents += rankedEvent
      arrivedEvents += rankedEvent //debug
      listenForCreationFlowEventsBehavior(userName)
    }
  
  case SubscriptionUpdated(event, stripeSubscription) =>
    log.info(s"Get SubscriptionUpdated: $stripeSubscriptionId")
    if (orderedEventsList.exists(_.id == event.id)) {
      log.warn(s"Received duplicate event with id [${event.id}], ignoring event")
      Behaviors.same
    } else {
      val rankedEvent = OrderRankedEvent(event.id, event.eventName, 2)
      orderedEvents += rankedEvent
      arrivedEvents += rankedEvent
      resolvedStripeSubscription = stripeSubscription
      listenForCreationFlowEventsBehavior(userName)
    }
  
  case ips@InvoicePaymentSucceeded(event, invoice) =>
    log.info(s"Get InvoicePaymentSucceeded: $ips")
    resolvedInvoice = invoice
    if (orderedEventsList.exists(_.id == event.id)) {
      log.warn(s"Received duplicate event with id [${event.id}], ignoring event")
      Behaviors.same
    } else {
      val rankedEvent = OrderRankedEvent(event.id, event.eventName, 3)
      orderedEvents += rankedEvent
      arrivedEvents += rankedEvent //debug
      listenForCreationFlowEventsBehavior(userName)
    }
  
  case InvoicePaymentFailed(event) =>
    log.info(s"Get InvoicePaymentFailed: $stripeSubscriptionId")
    if (orderedEventsList.exists(_.id == event.id)) {
      log.warn(s"Received duplicate event with id [${event.id}], ignoring event")
    } else {
      log.warn(s"Invoice payment failed for user [$userName] with subscription [$stripeSubscriptionId]")
    }
    Behaviors.same
  
  case x =>
    log.info(s"Unexpected Stripe event [$x]")
    Behaviors.same
}

Как можно увидеть из кода, мы логируем, но не обрабатываем InvoicePaymentFailed события, так как в рамках одной чекаут сессии может быть несколько неудачных попыток оплатить покупку (просроченная карта например) перед успешной оплатой.

В дальнейшем, мы делаем паттерн-матч накопленных OrderRankedEvent объектов с последовательностью активации для данного типа подписки и, если он совпадает, то мы отправляем в родительский StripeUserActor актор SubscriptionFlowCompleted и активируем функцию обратного вызова (колбэк).

val orderedEventsList = orderedEvents.toList

orderedEventsList match {
  case List(
    OrderRankedEvent(_, Events.SubscriptionCreated, _),
    OrderRankedEvent(_, Events.SubscriptionUpdated, _),
    OrderRankedEvent(_, Events.InvoicePaymentSucceeded, _)
  ) => 
    newSubscriptionCallback(...)
    ctx.parent ! SubscriptionFlowCompleted(stripeSubscriptionId)
    Behaviours.stopped
  ...

Для продления подписки алгоритм действий схожий, но последовательность активации немного короче.

orderedEventsList match {
    case List(
      OrderRankedEvent(_, Events.SubscriptionUpdated, _),
      OrderRankedEvent(_, Events.InvoicePaymentSucceeded, _)
    ) =>
    updateSubscriptionCallback(...)
  ...

Итак, нам удалось написать гибкое и масштабируемое приложение для обработки онлайн подписок, включая первоначальное оформление подписки и последующие обновления. События в нашей экосистеме акторов быстро передаются сверху вниз по иерархии с минимальным оверхедом и для каждой новой чекаут сессии создается новый StripeCheckoutSessionActor, который благополучно удаляется после того как процесс завершен.

P.S. S

На момент написания статьи, приведенный код был протестирован и хорошо себя зарекомендовал в продакшене. Безусловно, остается много пространства для улучшений, чем я и буду заниматься перед тем как оформить его в библиотеку, которую выложу в своем ГитХабе.

Если вам понравилась статья — подписывайтесь на меня на Хабре и в соц сетях (в профиле), а также не стесняйтесь комментировать и задавать вопросы.

© Habrahabr.ru