[Из песочницы] Rx. Постигаем retryWhen и repeatWhen на примерах из Android разработки
Несмотря на это, очень часто встречаю нежелание их использовать (ввиду сложного синтаксиса и непонятных диаграмм).
Приведу несколько как можно с их помощью эффективно перезапускать участки цепи и делегировать обработку перезапусков при ошибках и завершениях потока.
В примерах будет Java код с лямбдами (Retrolamda), но переписать его на Kotlin или чистую Java не составит труда.
Императивный способ перезапуска цепи
Предположим, мы используем Retrofit и загрузку начинаем в методе load (). Repository.getSomething () возвращает Single
@NonNull
private Subscription loadingSubscription = Subscriptions.unsubscribed();
private void load() {
subscription.unsubscribe();
subscription = repository
.getSomething()
.subscribe(result -> {}, err -> {});
}
private void update() {
load();
}
Из какого-нибудь листенера обновлений (e.g. PullToRefreshView) мы вызываем метод update (), который, в свою очередь, вызовет метод load (), где с нуля будет создана подписка.
Я предпочитаю ко вниманию вариант использования более реактивного, на мой взгляд, способа с вышеупомянутым оператором repeatWhen ().
Реактивный способ перезапуска цепи — repeatWhen
Создадим объект PublishSubject updateSubject, и передадим в оператор лямбду
repeatHandler → repeatHandler.flatMap (nothing → updateSubject.asObservable ())
@NonNull
private final PublishSubject updateSubject = PublishSubject.create();
private void load() {
repository
.getSomething()
.repeatWhen(repeatHandler ->
repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
.subscribe(result -> {}, err -> {});
}
Теперь для обновления загруженных данных нужно заэмитить null в updateSubject.
private void update() {
updateSubject.onNext(null);
}
Нужно помнить, что работает такой реактивный способ только с Single, который вызывает onComplete () сразу после эмита единственного элемента (будет работать и с Observable, но только после завершения потока).
Реактивный способ обработки ошибок retryWhen
Подобным образом можно обрабатывать и ошибки. Предположим, у пользователя пропала сеть, что приведет к ошибке и вызову onError () внутри Single, который возвращается методом getUser ().
В этот момент можно показать пользователю диалог с текстом «Проверьте соединение», а по нажатию кнопки OK вызвать метод retry ().
@NonNull
private final PublishSubject retrySubject = PublishSubject.create();
private void load() {
repository
.getSomething()
.doOnError(err -> showConnectionDialog())
.retryWhen(retryHandler -> retryHandler.flatMap(nothing -> retrySubject.asObservable()))
.subscribe(result -> {}, err -> {});
}
private void retry() {
retrySubject.onNext(null);
}
По вызову retrySubject.onNext (null) вся цепочка выше retryWhen () переподпишется к источнику getUser (), и повторит запрос.
При таком подходе важно помнить, что doOnError () должен находиться выше в цепочке, чем retryWhen (), поскольку последний «поглощает» ошибки до эмита repeatHandler’а.
В данном конкретном случае выигрыша по производительности не будет, а кода стало даже чуть больше, но эти примеры помогут начать мыслить реактивными паттернами.
В следующем, бессовестно притянутом за уши, примере, в методе load () мы объединяем два источника оператором combineLatest.
Первый источник — repository.getSomething () загружает что-то из сети, второй, localStorage.fetchSomethingReallyHuge (), загружает что-то тяжелое из локального хранилища.
public void load() {
Observable.combineLatest(repository.getSomething(),
localStorage.fetchSomethingReallyHuge(),
(something, hugeObject) -> new Stuff(something, hugeObject))
.subscribe(stuff -> {}, err -> {});
}
При обработке ошибки императивным способом, вызывая load () на каждую ошибку, мы будем заново подписываться на оба источника, что, в данном примере, абсолютно ненужно. При сетевой ошибке, второй источник успешно заэмитит данные, ошибка произойдет только в первом. В этом случае императивный способ будет еще и медленней.
Посмотрим, как будет выглядеть реактивный способ.
public void load() {
Observable.combineLatest(
repository.getSomething()
.retryWhen(retryHandler ->
retryHandler.flatMap(
err -> retrySubject.asObservable())),
localStorage.fetchSomethingReallyHuge()
.retryWhen(retryHandler ->
retryHandler.flatMap(
nothing -> retrySubject.asObservable())),
(something, hugeObject) -> new Stuff(something, hugeObject))
.subscribe(stuff -> {}, err -> {});
}
Прелесть такого подхода в том, что лямбда, переданная в оператор retryWhen () исполняется только после ошибки внутри источника, соответственно, если «ошибется» только один из источников, то и переподписка произойдет только на него, а оставшаяся цепочка ниже будет ожидать переисполнения.
А если ошибка произойдет внутри обоих источников, то один и тот же retryHandler сработает в двух местах.
Делегирование обработки ошибок
Следующим шагом можно делегировать обработку повторов некоему RetryManager. Перед этим еще можно немного подготовиться к переезду на Rx2 и убрать из наших потоков null объекты, которые запрещены в Rx2. Для этого можно создать класс:
public class RetryEvent {
}
Без ничего. Позже туда можно будет добавлять разные флаги, но это другая история. Интерфейс RetryManager может выглядеть как-то так:
interface RetryManager {
Observable observeRetries(@NonNull Throwable error);
}
Реализация может проверять ошибки, показывать диалоги, снэкбар, устанавливать бесшумный таймаут — всё, что душе угодно. И слушать коллбэки от всех этих UI компонентов, чтобы в последствии заэмитить RetryEvent в наш retryHandler.
Предыдущий пример с использованием такого RetryManager будет выглядеть вот так:
//pass this through constructor, DI or use singleton (but please don't)
private final RetryManager retryManager;
public void load() {
Observable.combineLatest(
repository.getSomething()
.retryWhen(retryHandler ->
retryHandler.flatMap(
err -> retryManager.observeRetries())),
localStorage.fetchSomethingReallyHuge()
.retryWhen(retryHandler ->
retryHandler.flatMap(
nothing -> retryManager.observeRetries())),
(something, hugeObject) -> new Stuff(something, hugeObject))
.subscribe(stuff -> {}, err -> {});
}
Таким нехитрым образом обработка повторов при ошибках делегирована сторонней сущности, которую можно передавать как зависимость.
Надеюсь, эти примеры окажутся кому-то полезны и соблазнят попробовать repeatWhen () и retryWhen () в своих проектах.