[Перевод] Грокаем RxJava, часть четвертая: Реактивный Android

В первой, второй и третьей частях я объяснил в общих чертах устройство RxJava. Вы можете подумать: «Прекрасно, но как всё это сделать полезным для меня, как для разработчика под Android?» В заключительной части статьи я приведу некоторую информацию, практичную именно для вас.

RxAndroid


RxAndroid — это расширение RxJava, написанное специально для Android, которое включает в себя специальные обвязки вокруг RxJava, делающие вашу жизнь проще.

Во-первых, здесь есть класс AndroidSchedulers, предоставляющий готовые планировщики для потоков, специфичных для Android. Нужно запустить код на UI потоке? Без проблем — воспользуйтесь AndroidSchedulers.mainThread():

retrofitService.getImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));


Если у вас есть ваш собственный Handler, вы можете создать связанный с ним планировщик с помощью HandlerThreadScheduler1.

Во-вторых, у нас есть AndroidObservable, предоставляющий возможности по работе с жизненным циклом некоторых классов из Android SDK. В нем есть операторы bindActivity()() и bindFragment(), которые не только автоматически используют для наблюдения AndroidSchedulers.mainThread(), но ещё и перестанут порождать данные когда ваши Activity или Fragment начнут завершать свою работу (таким образом вы не попадёте впросак, попытавшись изменить их состояние тогда, когда делать этого уже нельзя).

AndroidObservable.bindActivity(this, retrofitService.getImage(url))
    .subscribeOn(Schedulers.io())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));


Ещё мне нравится AndroidObservable.fromBroadcast(), позволяющий вам создавать Observable, который работает как BroadcastReceiver. Вот так, например, можно получить уведомление в момент изменения состояния сети:

IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
    .subscribe(intent -> handleConnectivityChange(intent));


Ну и наконец, здесь есть ViewObservable, добавляющий привязки к View. Он, помимо прочего, содержит операторы ViewObservable.clicks(), если вы хотите получать уведомление всякий раз, когда происходит нажатие по View, и ViewObservable.text(), срабатывающий всякий раз когда TextView изменяет своё содержимое.

ViewObservable.clicks(mCardNameEditText, false)
    .subscribe(view -> handleClick(view));

Retrofit


Существует такая примечательная библиотека, поддерживающая RxJava, как Retrofit, популярный REST клиент для Android. Обычно, когда вы определяете в ней асинхронный метод, вы используете Callback:

@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback cb);


Но, если вы пользуетесь RxJava, вы вместо этого можете возвращать нашего друга Observable:

@GET("/user/{id}/photo")
Observable getUserPhoto(@Path("id") int id);


После этого вы можете использовать Observable как только вы пожелаете, можно будет не только получить из него данные, но и трансформировать их на лету!

Поддержка Observable, включенная в Retrofit, также упрощает комбинирование нескольких REST запросов вместе. Например, у нас есть два метода api, первый возвращает фото, а второй — его метаданные. Мы можем собрать результаты выполнения этих запросов вместе:

Observable.zip(
    service.getUserPhoto(id),
    service.getPhotoMetadata(id),
    (photo, metadata) -> createPhotoWithData(photo, metadata))
    .subscribe(photoWithData -> showPhoto(photoWithData));


Я показывал нечто похожее во второй части (используя flatMap()). Сейчас я хотел показать насколько легко собрать несколько REST запросов в один, воспользовавшись связкой RxJava+Retrofit.

Старый, медленный код


То, что Retrofit умеет возвращать Observables, здорово, но что если у вас есть другая библиотечка, которая ни сном ни духом про них не слышала? Или у вас есть какой-то старый код, который вы хотели бы изменить без особых трудозатрат так, чтобы он умел работать с Observable. Проще говоря, как вам соединить старый код с новым без того чтобы переписывать всё подряд?

Чаще всего вам будет достаточно использовать Observable.just() и Observable.from():

private Object oldMethod() { ... }

public Observable newMethod() {
    return Observable.just(oldMethod());
}


Это сработает замечательно, если oldMethod() выполняется быстро, но что если это не так? Вы заблокируете весь поток, потому что сначала будет вызван oldMethod(), а уж потом его результат будет передан в Observable.just().
Чтобы обойти эту проблему, можно воспользоваться следующим трюком (которым я пользуюсь постоянно): обернуть медленный код в Observable.defer():

private Object slowBlockingMethod() { ... }

public Observable newMethod() {
    return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}


Теперь, полученный вами Observable не будет вызывать slowBlockingMethod() до тех пор, пока вы не подпишитесь на него.

Жизненный цикл


Самую сложную часть я оставил напоследок. Как нам учитывать жизненный цикл Activity, работая с RxJava? Есть пара проблем, которые дают о себе знать снова и снова:

  1. Возобновление подписки после смены конфигурации.
    Например, вы делаете REST запрос с Retrofit, и хотите отобразить его результаты в ListView. Что если во время выполнения запроса пользователь повернет телефон? Надо бы возобновить выполнение запроса, но как?
  2. Утечки памяти, вызванные Observables, которые удерживают ссылку на Context.
    Эта проблема вызывается созданием подписки, которая каким-то образом удерживает ссылку на Context (что не так уж и сложно, если вы работаете со Views!) Если Observable не завершит свою работу вовремя, в какой-то момент вы обнаружите, что вы никак не можете освободить большое количество памяти.


К сожалению, серебряных пуль тут нет, но есть некоторые методики, которые могут упростить вашу жизнь.
С первой проблемой можно справиться, используя встроенные в RxJava механизмы кеширования, которые позволяют подписываться на/отписываться от одного и того же Observable, без повторения его работы. В частности, cache() (или replay()) продолжат выполнявшийся ранее запрос, даже если вы успели отписаться. Это означает, что вы можете продолжить работу после пересоздания Activity:

Observable request = service.getUserPhoto(id).cache();
Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));

// ...Когда Activity пересоздаётся...
sub.unsubscribe();

// ...Как только Activity была пересоздана...
request.subscribe(photo -> handleUserPhoto(photo));


Заметьте, что мы используем тот же самый закешированный request в обоих случаях; таким образом, выполняемый им запрос будет выполнен только один раз. Где вы сохраните ваш request, решать вам, но, как и в случае со всеми решениями, связанными с жизненным циклом, это должно быть место, которое переживает изменения, порожденные жизненным циклом (retained fragment, синглетон, и т.д.)
Вторая проблема решается правильным отписыванием от подписок в соответствии с жизненным циклом. Общим решением является использование CompositeSubscription для хранения всех ваших подписок, и отписывание от них всех в onDestroy() или в onDestroyView():

private CompositeSubscription mCompositeSubscription  = new CompositeSubscription();

private void doSomething() {
    mCompositeSubscription.add(
        AndroidObservable.bindActivity(this, Observable.just("Hello, World!"))
        .subscribe(s -> System.out.println(s)));
}

@Override
protected void onDestroy() {
    super.onDestroy();

    mCompositeSubscription.unsubscribe();
}


Чтобы упростить себе жизнь, вы можете создать базовую Activity/Fragment, содержащие в себе CompositeSubscription, через которую впоследствии вы будете сохранять все ваши подписки, и которая будет автоматически очищаться.

Внимание! Как только вы вызвали CompositeSubscription.unsubscribe(), этот экземпляр CompositeSubscription перестанет быть доступным для использования (то есть добавлять к нему подписки вы, конечно, сможете, но он будет тут же автоматом вызывать на них unsubscribe())! Если вы хотите в дальнейшем продолжать использовать CompositeSubscription, вам придётся создать новый экземпляр.

Для решения обеих этих проблем мы вынуждены писать дополнительный код, и потому я очень надеюсь, что в один прекрасный день к нам спустится с небес долгожданный гений, и найдёт способ всё это упростить.

Заключение?


RxJava является относительно новой технологией, а для Android так и подавно, поэтому заключения для Android пока что не будет. RxAndroid находится в стадии активной разработки, а мы (Android-программисты) до сих пор пытаемся разобраться с тем, как делать хорошо, а как — плохо; общепризнанных примеров отличного применения связки RxJava+RxAndroid пока что нет, и я бьюсь об заклад, что спустя год некоторые из советов, которые я вам тут понадавал, будут считаться довольно эксцентричными.

Пока же я нахожу, что RxJava не только упрощает процесс написания кода, но и делает его чуть более интересным. Если вы всё ещё мне не верите, давайте встретимся как-нибудь и поболтаем об этом за кружечкой пива.

Я благодарю Matthias Kay ещё раз за его неоценимую помощь в подготовке этой статьи, и призываю всех присоединиться к нему, чтобы сделать RxAndroid ещё круче!


1AndroidSchedulers.mainThread() использует внутри себя HandlerThreadScheduler.

Прим. переводчика: я благодарю пользователя Artem_zin за помощь в переводе пары мест, вызвавших у меня сложности; если бы не он и его обширные знания RxJava, застрял бы я надолго.

© Habrahabr.ru