[Перевод] Грокаем RxJava, часть третья: Реактивность с пользой
В первой части мы прошлись по основам RxJava. Во второй части я показал вам потенциал операторов. Но, быть может, всего показанного мною всё ещё недостаточно для того, чтобы убедить вас. В таком случае я покажу вам ещё несколько полезностей RxJava, которые должны стать решающим аргументом в мою пользу.
Обработка ошибок
До настоящего момента мы полностью игнорировали такие методы Observable
, как onComplete()
и onError()
. Данные методы вызываются в момент, когда Observable
прекращает порождать новые данные — либо потому, что ему нечего больше порождать, либо потому, что произошла ошибка.
Самый первый наш Subscriber
следил за onCompleted()
и onError()
. Давайте сделаем что-нибудь полезное в этих точках:
Observable.just("Hello, world!")
.map(s -> potentialException(s))
.map(s -> anotherPotentialException(s))
.subscribe(new Subscriber() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { System.out.println("Completed!"); }
@Override
public void onError(Throwable e) { System.out.println("Ouch!"); }
});
Положим, что potentialException()
и anotherPotentialException()
могут выбрасывать исключения во время работы. Каждый Observable
завершает своё выполнение вызовом onCompleted()
или onError
. В таком случае, вывод программы будет либо строкой, за которой следует «Completed!», либо вывод будет состоять из одного-единственного «Ouch!» (потому что было выброшено исключение).
Таким образом, у нас есть несколько выводов:
onError()
вызывается вне зависимости от того, когда было выброшено исключение.
Благодаря этому, обработка ошибок становится очень простой: можно просто обрабатывать каждую возникающую ошибку в одной-единственной функции, находящейся в самом конце.- Операторы не обязаны обрабатывать исключения.
Обработка ошибок, возникающих в любом месте цепочкиObservables
становится задачейSubscriber
, т.к. каждое исключение следует напрямую вonError()
. - Вы всегда знаете, когда
Subscriber
прекратил получать новые элементы.
Знание момента завершения работы помогает вам писать более последовательный код (хотя может произойти и так, чтоObservable
никогда не завершит своё выполнение).
Я считаю подобный подход к обработке ошибок гораздо более простым, в сравнении с традиционным подходом. Если вы пишете код с функциями обратного вызова, то обработка ошибок должна происходить в каждой из них. Это не просто ведёт к тому, что ваш код начинает повторяться во многих местах, но ещё и к тому, что каждая функция обратного вызова теперь должна знать, как ей обрабатывать ошибки, то есть она становится сильно связанной с тем, кто её вызывает.
В случае с RxJava, Observable
не должен даже знать о том, что ему делать с ошибками! Это относится и к операторам: они не будут выполняться, если на каком-то из предыдущих этапов у нас произошла критическая ошибка. Вся обработка ошибок находится в Subscriber
.
Планировщики
У вас есть Android приложение, которое делает запрос к сети. Запрос может продлиться долго, поэтому вы выносите его в другой поток. Не успеете и оглянуться, как у вас есть проблемы.
Многопоточные Android приложения сложны в написании потому, что вам нужно убедиться, что вы запускаете правильный код в правильном потоке; перепутаете что-нибудь, и приложение упадёт. Классический пример — исключение, которое падает в ответ на вашу попытку модифицировать состояние View
не из главного потока.
В RxJava можно легко указать, в каком потоке должны запускаться ваши Observer
и Subscriber
, воспользовавшись, соответственно, subscribeOn()
и observeOn()
:
myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
Просто, правда? Всё, что выполняется до Subscriber
, выполняется в отдельном I/O потоке, а манипуляции с View
работают уже в главном потоке1.
Интересно здесь то, что subscribeOn()
и observeOn()
могут быть вызваны на любом Observable
, так как они всего-навсего операторы. Не нужно беспокоиться о том, что делает наш Observable()
, или следующие за ним операторы — можно просто добавить subscribeOn()
и observeOn()
в самом конце, для того, чтобы раскидать выполнение задач по нужным потокам.
Если мы пользуемся AsyncTask
, или чем-то подобным, нам нужно писать код с учётом того, какие его части должны выполняться параллельно. В случае с RxJava мы просто пишем код —, а потом указываем, где нам его выполнять2.
Подписки
Есть один момент, который я до сих пор прятал от вас. Когда вы вызываете Observable.subscribe()
, вам в ответ возвращается объект класса Subscription
, который представляет собой связь между вашими Observable
и Subscriber
:
Subscription subscription = Observable.just("Hello, World!")
.subscribe(s -> System.out.println(s));
В дальнейшем можно использовать полученный нами Subscription
для того, чтобы прекратить подписку:
subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
// Выводит "Unsubscribed=true"
Когда мы отменяем подписку, RxJava останавливает всю написанную нами цепочку, то есть, иными словами, если у вас написана разухабистая цепочка преобразований, состоящая из множества операторов, unsubscribe
остановит выполнение вне зависимости от того, какой код сейчас выполняется.3 Ничего больше не требуется.
Заключение
Помните, что эта серия статей является всего лишь введением в RxJava. Есть много интересного и сложного материала, в сравнении с которым моё введение покажется вам цветочками (не верите — почитайте про backpressure). Я бы не стал писать реактивный код повсюду, а приберёг бы его для более сложных участков кода, которые мне хотелось бы переписать в простом и понятном виде.
Изначально я думал, что трех статей будет достаточно, но многие просили меня показать какие-нибудь практические примеры применения RxJava в Android, поэтому скоро будет готова ещё одна часть. Я надеюсь, что моего введения было достаточно, чтобы убедить вас попробовать этот замечательный фреймворк. Если хотите погрузиться в изучение глубже, я рекомендую почитать официальную wiki-страничку. И помните: невозможного не существует.
Большое спасибо всем людям, помогшим мне вычитать эту статью на предмет ошибок и неточностей: Matthias Käppler, Matthew Wear, Ulysses Popple, Hamid Palo и Joel Drotos (на которого стоит взглянуть уже только из-за его бороды).
1 Это одна из причин, по которым Subscriber
стоит делать как можно более легковесными: чтобы не блокировать главный поток более необходимого.
2 Иногда, правда от использования observeOn()
и subscribeOn()
можно воздержаться. Например, даже если Observable
обещает работать долго, а Subscriber
при этом будет выполняться на I/O потоке, то нет причин перекидывать последний на новый поток.
3 В первой части я заметил, что Observable.just()
— это не то же самое, что и самописный Observable
, вызывающий onNext()
и onCompleted()
. И дело тут в подписках: в случае с Observable.just()
перед вызовом onNext()
происходит проверка того, является ли Subscriber
все ещё подписанным, или нет.