Rx головного мозга

6uyfbh1c38mxs7zw0bhazeuajzq.jpeg

О том, как удобно писать на Rx, какие операторы и сущности в нём есть, как делать запросы в сеть и работать с многопоточностью, не писал разве что ленивый. Кто-то рассказывает, что можно «обмазаться» Rx-ом на всех уровнях архитектуры и приложение станет реактивным, а разработка — в разы быстрее. Но как только слова доходят до дела, то встаёт множество вопросов, как подружить Rx со спецификой UI и системными API и зачем нужен Rx, кроме как для многопоточности. В этой статье я хотел бы рассказать о нашем видении разработки в реактивном стиле и на реальных примерах показать, как он упрощает жизнь.

Наверное, многие видели доклад Джейка Вортона про RxJava2. Всем, кто не смотрел, категорически советую. В нём не столь важно описание того, что умеет RxJava и какие объекты там есть. В этом докладе Вортон показал проблему нереактивных подходов к разработке — неочевидность потоков обработки данных, отсутствие чёткого и однонаправленного пути, по которому идут события, начиная от пользовательского ввода и заканчивая изменением интерфейса, которое повлекли действия пользователя.

И эта проблема очень сильно прослеживается в MVP. То, что Presenter и View имеют ссылки друг на друга, ставит крест на однонаправленности данных; начинаются споры о том, насколько View должна быть пассивной, а в это время дебаггером приходится гулять по коду от одного метода к другому, чтобы отследить цепочку действий. Эту проблему начинают решать MVVM и подобные подходы, и мы пошли по их пути.

Команда Android-разработки FunCorp вдохновилась докладом Вортона и решила попробовать написать абсолютно всё на Rx. Изначально такой цели не было, но по ходу дела мы поняли, что использование реактивного подхода в тех местах, где он кажется очень странным, сильно упрощает жизнь и делает код очевиднее.

Первое, что мы сделали, это подключили RxBinding — теперь наши View умеют в Rx. Это начало любого действия в приложении, то есть начало цепочки обработки данных.

implementation "com.jakewharton.rxbinding3:rxbinding-core:$rx_binding_version"

Мы решили, что View будет максимально пассивной и не будет самостоятельно инициировать никаких действий.

У View есть два типа методов:


  • изменения какого-либо свойства;
  • наблюдатель на событие пользовательского ввода.
class AuthFullRegistrationView @Inject constructor(): BaseArchView() {

   fun doneClick(): Observable = viewHolder.doneBtn.clicks()
   fun loginClick(): Observable = viewHolder.loginBtn.clicks()
   fun nicknameText(): Observable = viewHolder.nickname.textChanges()
   fun passwordText(): Observable = viewHolder.password.textChanges()
   fun emailText(): Observable = viewHolder.email.textChanges()

   fun setNickname(nickname: String?) {
       viewHolder.nickname.setText(nickname)
   }

   fun setNicknameError(enabled: Boolean, text: String? = null) {
       viewHolder.nicknameRegistrationView.error = text
       viewHolder.nicknameRegistrationView.isErrorEnabled = enabled
   }

   fun setEmailError(enabled: Boolean, text: String? = null) {
       viewHolder.emailRegistrationView.error = text
       viewHolder.emailRegistrationView.isErrorEnabled = enabled
   }

   fun setPasswordError(enabled: Boolean, text: String? = null) {
       viewHolder.passwordRegistrationView.error = text
       viewHolder.passwordRegistrationView.isErrorEnabled = enabled
   }
}


sqw7cninq_3fkepdpjtygw9hj6m.jpeg

Такие View становятся максимально переиспользуемыми, так как ни от кого не зависят (чего нельзя сказать о View в MVP).

Бизнес-логика и пользовательские действия соединяются в Presenter. Presenter подписывает изменение доменной модели на события от View. В onNext/OnError View обновляется исходя из итогового состояния модели. В самом простом варианте это выглядит так:

archView.loginClick()
   .subscribe { authNavigationController.goToLogin(LoginParams()) }

Сразу встаёт вопрос о том, как отписывать все эти цепочки действий при уходе с экрана. Он решается через CompositeDisposable и экстеншен-функцию.

abstract class SimpleArchPresenter {

   var args: Any? = null
   var state: Bundle? = null

   val compositeDisposable = CompositeDisposable()

   @CallSuper
   open fun bindIntents(archView: V) {
   }

   @CallSuper
   open fun unbindIntents() {
       compositeDisposable.clear()
   }
}
fun Disposable.addToDisposable(compositeDisposable: CompositeDisposable): Disposable {
   compositeDisposable.add(this)
   return this
}

Теперь наша цепочка приобретает следующий вид:

archView.loginClick()
   .subscribe { authNavigationController.goToLogin(LoginParams()) }
   .addToDisposable(compositeDisposable)

Появляются более сложные сценарии обработки. Например, регистрация пользователя:


  • клик по кнопке Sign in;
  • считываем имя пользователя и пароль из полей ввода
  • запрос к API;
  • в случае успеха переходим на следующий экран или показываем ошибку.
private fun bindRegistrationFlow(archView: AuthFullRegistrationView) {
   val handleFields = Observable
       .zip(archView.nicknameText(), archView.emailText(), archView.passwordText(),
           Function3{ nicknameText: CharSequence, emailText: CharSequence, passwordText: CharSequence ->
               FullRegistrationInteractor.RegisterFields(nicknameText, passwordText, emailText)
           })
       .take(1)

   archView.doneClick()
       .flatMap { handleFields }
       .flatMap {
           fullRegistrationInteractor.registration(it)
                   .subscribeOn(Schedulers.io())
       }
       .safeResponseSubscribe({ authNavigationController.goToVerifyNewEmail() },
                              { handleError(archView, it) })
       .addToDisposable(compositeDisposable)
}

Далее мы понимаем, что пока идёт запрос в сеть, нам нужно показать индикатор прогресса. В классическом варианте показ и скрытие диалога находятся в разных местах и разбивают цепочку действий.

У нас есть следующий экстеншен:

/**
* if we hide progress with delay and after that this observable is completed, doOnDispose not
* called and we dont reset delayed progress
*/
fun  Observable.withProgress(progress: ProgressDelegate, action: (data: E) -> Observable): Observable {
  return this.flatMap {
     action.invoke(it)
           .doOnSubscribe { progress.show() }
           .observeOn(AndroidSchedulers.mainThread())
           .doFinally {
              progress.hide()
           }
  }
}
interface ProgressDelegate {

  fun show()
  fun hide()
}

Внутрь функции передаётся Observable, который оборачивается в показ прогресса на onSubscribe и его скрытие на onFinally.

Дефолтная реализация прогресса представлена ниже.


ProgressDialogManager
class ProgressDialogManager constructor(activity: Activity, defaultMessage: CharSequence) : ProgressDelegate {

  private val progressDialog = ProgressDialog(activity)

  init {
     progressDialog.setCancelable(false)
     progressDialog.setMessage(defaultMessage)
  }

  override fun show() {
     if (progressDialog.isShowing) return
     progressDialog.show()
  }

  override fun hide() {
     if (!progressDialog.isShowing) return
     progressDialog.dismiss()
  }
}


yvghuuq11dzg4vicfu_bciifkci.jpeg

Но и также есть реализации ProgressDelegate, добавляющие в RecycleView в конец списка элемент с прогрессом:

private class StatisticsServersListProgressDelegate(private val statisticsServersAdapter: StatisticsServersAdapter) : ProgressDelegate {

  override fun show() {
     with(statisticsServersAdapter) {
        transaction {
           clear()
           add(ArchAdapterItem(PROGRESS_VIEW_TYPE.toString(),
                               SimpleProgressItemData(isFooter = false),
                               PROGRESS_VIEW_TYPE))
        }
     }
  }

  override fun hide() {
     statisticsServersAdapter.removeItem(PROGRESS_VIEW_TYPE.toString())
  }
}

inp2fjjikvdkh5obuf1aykwnvz0.jpegar_ubexgxpdebiq-2bmmz8z5zca.jpegpo5p9h-ndovwnwdcdq-kwvdae5q.jpeg

Или, например, экстеншен, добавляющий задержку на показ любой реализации ProgressDelegate, позволяющий убрать мелькания диалогов на коротких действиях.


DelayedProgressDelegate
class DelayedProgressDelegate internal constructor(private val progressDelegate: ProgressDelegate,
                                                  private val delayTimeMillis: Long = SHOW_DELAY_MILLIS) : ProgressDelegate {

  companion object {
     private const val SHOW_DELAY_MILLIS = 800L
  }

  enum class ProgressState {
     SHOW,
     HIDE
  }

  private val stateChangeListener = object : StateMachine.StateChangeListener {
     override fun onStateChanged(oldState: ProgressState, newState: ProgressState) {
        when (newState) {
           ProgressState.SHOW -> progressDelegate.show()
           ProgressState.HIDE -> progressDelegate.hide()
        }
     }
  }

  private val stateMachine = StateMachine(ProgressState.HIDE, stateChangeListener)

  override fun show() {
     stateMachine.gotoState(ProgressState.SHOW, delayTimeMillis, true)
  }

  override fun hide() {
     stateMachine.gotoState(ProgressState.HIDE, 0, true)
  }

  fun reset() {
     stateMachine.clear()
  }
}

Этот экстеншен можно использовать с абсолютно любыми реализациями диалогов.

В итоге вышеописанный пример принимает следующий вид:

private fun bindRegistrationFlow(archView: AuthFullRegistrationView) {
   val handleFields = Observable
       .zip(archView.nicknameText(), archView.emailText(), archView.passwordText(),
           Function3{ nicknameText: CharSequence, emailText: CharSequence, passwordText: CharSequence ->
               FullRegistrationInteractor.RegisterFields(nicknameText, passwordText, emailText)
           })
       .take(1)

   archView.doneClick()
       .flatMap { handleFields }
       .withProgress(progressDialogManager.delayed())  {  // flatMap -> withProgress
           fullRegistrationInteractor.registration(it)
                   .subscribeOn(Schedulers.io())
       }
       .safeResponseSubscribe({ authNavigationController.goToVerifyNewEmail(VerifyEmailParams(archView.getEmailText())) },
                              { handleError(archView, it) })
       .addToDisposable(compositeDisposable)
}

В этот момент мы поняли, что любые диалоги проще показывать через Rx.

fun createSimpleDialog(@StringRes messageId: Int, @StringRes positiveTitle: Int, @StringRes negativeTitle: Int,
                      postCreateDialogAction: (dialog: AlertDialog) -> Unit = {}): Observable {
  val result = BehaviorSubject.create()
  val dialog = AlertDialog.Builder(activity, styleId)
        .setMessage(messageId)
        .setPositiveButton(positiveTitle) { _, _ -> result.onNext(true) }
        .setNegativeButton(negativeTitle) { _, _ -> result.onNext(false) }
        .create()
  dialog.setOnDismissListener { result.onComplete() }
  return result.doOnSubscribe {
     dialog.show()
     postCreateDialogAction.invoke(dialog)
  }
        .subscribeOn(AndroidSchedulers.mainThread())
        .doOnDispose { dialog.dismiss() }
}

Вся цепочка удаления сервера принимает следующий вид:

private fun bindDeleteMenuItem(archView: CurrentServerArchView, server: ServerEntity) {
   archView.serverMenuDeleteClicks()
           .concatMap {
               alertDialogRxFactory.createSimpleDialog(R.string.delete_server_alert_message,
                                                       R.string.common_yes,
                                                       R.string.common_no)
           }
           .filter { it }
           .withProgress(progressDialogManager.delayed()) {
               deleteServerInteractor.deleteServer(server.id)
                       .subscribeOn(Schedulers.io())
           }
           .safeResponseSubscribe(onError = {
               errorDialogProvider.showModelError(it)
           })
           .addToDisposable(serverActions)
}

archView.serverMenuDeleteClicks — это клик по элементу меню, который тоже обрабатывается реактивно.


ActionMenuArchView
class ActionMenuArchView @Inject constructor(private val activity: Activity) : BaseArchView() {

  val items: List get() = menuView.menu.items()
  val menuClicks: Observable by lazy { menuView.menuClicks().share() }

  private val menuView: ActionMenuView get() = viewHolder.view as ActionMenuView

  fun inflateMenu(@MenuRes menu: Int): List {
     menuView.menu.clear()
     activity.menuInflater.inflate(menu, menuView.menu)
     return items
  }

  fun itemClicks(@IdRes itemId: Int): Observable =
        menuClicks.filter { it.itemId == itemId }
}

Для примера, показ диалогов с календарём:

fun openCalendar(activity: Activity): Observable {
   val result = BehaviorSubject.create()
   val dateCalendar = Calendar.getInstance()
   dateCalendar.timeInMillis = System.currentTimeMillis()

   val dialog = DatePickerDialog(activity,
       DatePickerDialog.OnDateSetListener { _, year, month, dayOfMonth ->
           val calendar = Calendar.getInstance()
           calendar.set(year, month, dayOfMonth)
           result.onNext(calendar.time)
           result.onComplete()
       },dateCalendar.get(Calendar.YEAR),
       dateCalendar.get(Calendar.MONTH),
       dateCalendar.get(Calendar.DAY_OF_MONTH)
   )
   dialog.setOnDismissListener { result.onComplete() }
   dialog.show()
   return result.doOnDispose { dialog.dismiss() }
}

Или выбором пола:

fun openSexChooser(activity: Activity): Observable {
   val result = BehaviorSubject.create()
   val items = arrayOf("Male", "Female")
   val builder = AlertDialog.Builder(activity)
   builder.setItems(items) { _, which ->
       result.onNext(items[which])
       result.onComplete()
   }

   val alertDialog = builder.create()
   alertDialog.setOnDismissListener { result.onComplete() }
   alertDialog.show()
   return result.doOnDispose { alertDialog.dismiss() }
}


Правильнее показывать диалог в doOnSubscribe, но никто не идеален:)

Почему это очень удобно?


  • диалог полностью отвязывается от жизненного цикла;
  • диалог полностью отвязывается от действий, которые он инициирует, и его можно легко переиспользовать;
  • диалог остаётся частью цепочки действий, а не разбивает её на «до» и «после» и не теряется контекст действия;
  • как результат предыдущего пункта — не падает читаемость кода.

Жизненный цикл

Далее можно обернуть вызовы методов жизненного цикла. Зачем? Например, чтобы повторить поведение, аналогичное LiveData, т.е. чтобы не обновлять UI между onPause и onResume.

@ActivityScope
class ActivityLifecycleDispatcher @Inject constructor() {
   private val isResumedSubject = BehaviorSubject.create()

   fun isResumed(): Observable = isResumedSubject

   fun onResumed(isResumed: Boolean) {
       isResumedSubject.onNext(isResumed)
   }
}
private fun startServers() {
  updateServersDisposable?.dispose()
  updateServersDisposable = visibleScreen()
        .flatMap { serversInteractor.load().repeatWhen { it.delay(UPDATING_PERIOD_SECONDS, TimeUnit.SECONDS) } }
        .subscribeOn(Schedulers.io())
        .subscribe()
}

private fun visibleScreen() = activityLifecycleDispatcher.isResumed().filter { it }.take(1)

Здесь мы периодически обновляем список серверов, но только если экран виден пользователю.

Таким же образом можно обрабатывать onNewIntent.

@ActivityScope
class ActivityResultDispatcher @Inject constructor() {
   private val onActivityResultSubject = PublishSubject.create()

   fun onActivityResult(requestCode: Int): Observable = onActivityResultSubject.filter { it.requestCode == requestCode }

   fun handleOnActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
       onActivityResultSubject.onNext(OnActivityResultInfo(requestCode, resultCode, data))
   }
}

data class OnActivityResultInfo constructor(val requestCode: Int, val resultCode: Int, val data: Intent?)

Например, цепочка действий при возврате в галерею после создания нового канала:

activityResultDispatcher.onActivityResult(ServerChannelsListPresenter.CREATE_CHANNEL_REQUEST_CODE)
     .filter { it.resultCode == Activity.RESULT_OK }
     .map { it.data!!.extras!!.revealNavigationParams()!! }
     .flatMap {
        getChannelInteractor.get(it.channelId)
              .take(1)
              .subscribeOn(Schedulers.io())
     }
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe {
        if (it.data != null) {
           galleryNavigator.openChannel(it.requireData)
           mainMenuVisibilityController.closeImmediately()
        }
     }
     .addToDisposable(compositeDisposable)


  • получили необходимый onActivityResult;
  • развернули параметры из интента;
  • загрузили ожидаемый канал;
  • на главном потоке открыли этот канал и скрыли меню.

Аналогично мы обрабатываем запросы разрешений, onNewIntent, остальные методы жизненного цикла, появление клавиатуры, клик по клавише громкости и прочие системные события.

Для обработки ошибок сети можно написать механизм повторения запросов при ошибках либо ожидания сети:

fun  Observable>.repeatOnNetwork(networkController: NetworkController): Observable> {
    return this
        .flatMap {
            if (it.exception is ServerException) {
                Observable.just(it)
            }
            Observable.error>(Exception())
        }
        .retryWhen {
            it.flatMap { networkController.networkState }
        }
}

Работа с плеером

RxBinding не поддерживает методы ExoPlayer, поэтому пришлось написать пару десятков экстеншенов, чтобы поддержать работу с плеером в таком же стиле:

archView.renderFirstFrame()
     .take(1)
     .subscribe { trackViewed() }
     .addToDisposable(contentDisposable)

archView.playWhenReadyState()
     .subscribe { onChangePlayWhenReadyState(it) }
     .addToDisposable(contentDisposable)

archView.videoSize()
     .take(1)
     .subscribe { archView.setSize(it.height, it.width) }
     .addToDisposable(contentDisposable)

archView.playerState()
     .distinctUntilChanged()
     .subscribe { onChangePlayerState(it, archView) }
     .addToDisposable(contentDisposable)


SimpleExoPlayer.renderFirstFrame
fun SimpleExoPlayer.renderFirstFrame(): Observable = ExoPlayerRenderFirstFrameObservable(this)

private class ExoPlayerRenderFirstFrameObservable(private val view: SimpleExoPlayer) : Observable() {

   override fun subscribeActual(observer: Observer) {
       val listener = Listener(observer)

       val disposable = object : MainThreadDisposable() {
           override fun onDispose() {
               view.removeVideoListener(listener)
           }
       }
       observer.onSubscribe(disposable)
       view.addVideoListener(listener)
   }

   private class Listener(private val observer: Observer) : VideoListener {
       override fun onRenderedFirstFrame() {
           observer.onNext(Unit)
       }
   }
}

Аналогично мы написали все остальные экстеншены для ExoPlayer.

Работа с RecycleView

Адаптер, который мы используем у себя в проектах:


ArchRecyclerViewAdapter
abstract class ArchRecyclerViewAdapter constructor(diffExecutor: Executor,
                                                  private val componentsFactory: ArchRecycleComponentsFactory) : RecyclerView.Adapter() {

  /** this items are update after diff is applied. They may be used when you items adapter is currently showing */
  val adapterItems: List> get() = differ.currentList
  /** this items are updated on [update] call. They represent the future state of this adapter and may be used as a cache for further updates*/
  var dataItems: List> = emptyList()
     private set

  private val differ = AsyncListDifferFactory(this, diffExecutor).create()

  fun update(newList: List>?) {
     dataItems = newList ?: emptyList()
     differ.submitList(newList)
  }

  @Suppress("unused")
  fun transaction(transaction: MutableList>.() -> Unit) {
     update(dataItems.toMutableList().apply(transaction))
  }

  override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ArchRecycleViewHolder {
     val view = componentsFactory.inflateView(viewType, parent)
     return componentsFactory.createViewHolder(viewType, view)!!
  }

  override fun onBindViewHolder(holder: ArchRecycleViewHolder, position: Int) {
     getBinder(holder).bind(holder, adapterItems[position].data)
  }

  override fun onViewRecycled(holder: ArchRecycleViewHolder) {
     super.onViewRecycled(holder)
     getBinder(holder).unbind(holder)
  }

  override fun onViewDetachedFromWindow(holder: ArchRecycleViewHolder) {
     super.onViewDetachedFromWindow(holder)
     getBinder(holder).detach(holder)
  }

  override fun onViewAttachedToWindow(holder: ArchRecycleViewHolder) {
     super.onViewAttachedToWindow(holder)
     getBinder(holder).attach(holder)
  }

  override fun getItemViewType(position: Int) = adapterItems[position].viewType
  override fun getItemCount() = adapterItems.size

  private fun getBinder(holder: ArchRecycleViewHolder) = componentsFactory.createViewBinder(holder.itemViewType) as ArchRecycleViewBinder
}

interface ArchRecycleViewBinder {
   fun bind(holder: V, data: D)
   fun unbind(holder: V) {}

   fun attach(holder: V) {}
   fun detach(holder: V) {}
}

Если коротко, то есть ArchRecycleViewBinder, который в методе bind связывает данные и холдер. На каждый тип элемента имеется один общий на элементы этого типа биндер. Он возвращает действия пользователя, которые инициируются внутри холдеров адаптера. View проксирует эти методы в Presenter.

class ServerMemberViewBinder @Inject constructor() : ArchRecycleViewBinder {

   private val memberClickSubject = BehaviorSubject.create()
   private val roleMenuClickSubject = BehaviorSubject.create()
   private var roleSettingsEnable: Boolean = false

   fun memberClicks(): Observable = memberClickSubject
   fun roleMenuClicks(): Observable = roleMenuClickSubject

   fun setRoleSettingsEnable(enable: Boolean) {
       roleSettingsEnable = enable
   }

   override fun bind(holder: ServerMemberViewHolder, data: ServerMemberItemData) {
       val serverMember = data.serverMember

       ViewUtils.setViewVisibility(holder.moreMenu,
                                   roleSettingsEnable && serverMember.role != ServerRole.OWNER)

       holder.itemView.clicks()
               .map { serverMember }
               .subscribe(memberClickSubject)
       holder.moreMenu.clicks()
               .map { serverMember }
               .subscribe(roleMenuClickSubject)
   }
}


f60zhc-lvyiilxdgw0qmrqcgtzm.jpeg

Вывод

В подобном стиле у нас написано ещё множество других частей приложения: обработка диплинков, работа с редактированием изображений, туториалы по ходу работы приложения, шаринг, пагинация и т.д. Самое удивительное, что порог и скорость вхождения в подобную архитектуру остаются довольно высокими. Большая часть задач решается минимальным количеством операторов, а, например, Room уже из коробки поддерживает Rx, и достаточно просто подписаться на данные, чтобы получать уведомления об их изменении. Довольно быстро разработчик перестраивается на такой стиль написания кода и начинает рассматривать любое действие в реактивном стиле, код становится читабельнее, а писать его быстрее.

Буду искренне рад любому фибдеку и комментариям об уместности использования реактивного подхода в разных слоях архитектуры приложения.

© Habrahabr.ru