[Из песочницы] Rx. Постигаем retryWhen и repeatWhen на примерах из Android разработки

habralogo.jpg
В сети очень много русско- и англоязычных статей по Rx операторам retryWhen и repeatWhen.
Несмотря на это, очень часто встречаю нежелание их использовать (ввиду сложного синтаксиса и непонятных диаграмм).

Приведу несколько как можно с их помощью эффективно перезапускать участки цепи и делегировать обработку перезапусков при ошибках и завершениях потока.

В примерах будет Java код с лямбдами (Retrolamda), но переписать его на Kotlin или чистую Java не составит труда.

Императивный способ перезапуска цепи


Предположим, мы используем Retrofit и загрузку начинаем в методе load (). Repository.getSomething () возвращает Single().
@NonNull
private Subscription loadingSubscription = Subscriptions.unsubscribed();

private void load() {
    subscription.unsubscribe();
    subscription = repository
             .getSomething()
             .subscribe(result -> {}, err -> {});
}

private void update() {
    load();
}

Из какого-нибудь листенера обновлений (e.g. PullToRefreshView) мы вызываем метод update (), который, в свою очередь, вызовет метод load (), где с нуля будет создана подписка.

Я предпочитаю ко вниманию вариант использования более реактивного, на мой взгляд, способа с вышеупомянутым оператором repeatWhen ().

Реактивный способ перезапуска цепи — repeatWhen


Создадим объект PublishSubject updateSubject, и передадим в оператор лямбду
repeatHandler → repeatHandler.flatMap (nothing → updateSubject.asObservable ())
@NonNull
private final PublishSubject updateSubject = PublishSubject.create();

private void load() {
    repository
            .getSomething()
            .repeatWhen(repeatHandler ->
                                repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
            .subscribe(result -> {}, err -> {});
}

Теперь для обновления загруженных данных нужно заэмитить null в updateSubject.
private void update() {
    updateSubject.onNext(null);
}

Нужно помнить, что работает такой реактивный способ только с Single, который вызывает onComplete () сразу после эмита единственного элемента (будет работать и с Observable, но только после завершения потока).

Реактивный способ обработки ошибок retryWhen


Подобным образом можно обрабатывать и ошибки. Предположим, у пользователя пропала сеть, что приведет к ошибке и вызову onError () внутри Single, который возвращается методом getUser ().

В этот момент можно показать пользователю диалог с текстом «Проверьте соединение», а по нажатию кнопки OK вызвать метод retry ().

@NonNull
private final PublishSubject retrySubject = PublishSubject.create();

private void load() {
    repository
            .getSomething()
            .doOnError(err -> showConnectionDialog())
            .retryWhen(retryHandler -> retryHandler.flatMap(nothing -> retrySubject.asObservable()))
            .subscribe(result -> {}, err -> {});
}

private void retry() {
    retrySubject.onNext(null);
}

По вызову retrySubject.onNext (null) вся цепочка выше retryWhen () переподпишется к источнику getUser (), и повторит запрос.

При таком подходе важно помнить, что doOnError () должен находиться выше в цепочке, чем retryWhen (), поскольку последний «поглощает» ошибки до эмита repeatHandler’а.

В данном конкретном случае выигрыша по производительности не будет, а кода стало даже чуть больше, но эти примеры помогут начать мыслить реактивными паттернами.

В следующем, бессовестно притянутом за уши, примере, в методе load () мы объединяем два источника оператором combineLatest.

Первый источник — repository.getSomething () загружает что-то из сети, второй, localStorage.fetchSomethingReallyHuge (), загружает что-то тяжелое из локального хранилища.

public void load() {
    Observable.combineLatest(repository.getSomething(),
                             localStorage.fetchSomethingReallyHuge(),
                             (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

При обработке ошибки императивным способом, вызывая load () на каждую ошибку, мы будем заново подписываться на оба источника, что, в данном примере, абсолютно ненужно. При сетевой ошибке, второй источник успешно заэмитит данные, ошибка произойдет только в первом. В этом случае императивный способ будет еще и медленней.

Посмотрим, как будет выглядеть реактивный способ.


public void load() {
    Observable.combineLatest(
            repository.getSomething()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               err -> retrySubject.asObservable())),
            localStorage.fetchSomethingReallyHuge()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               nothing -> retrySubject.asObservable())),
            (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

Прелесть такого подхода в том, что лямбда, переданная в оператор retryWhen () исполняется только после ошибки внутри источника, соответственно, если «ошибется» только один из источников, то и переподписка произойдет только на него, а оставшаяся цепочка ниже будет ожидать переисполнения.

А если ошибка произойдет внутри обоих источников, то один и тот же retryHandler сработает в двух местах.

Делегирование обработки ошибок


Следующим шагом можно делегировать обработку повторов некоему RetryManager. Перед этим еще можно немного подготовиться к переезду на Rx2 и убрать из наших потоков null объекты, которые запрещены в Rx2. Для этого можно создать класс:
public class RetryEvent {
}

Без ничего. Позже туда можно будет добавлять разные флаги, но это другая история. Интерфейс RetryManager может выглядеть как-то так:
interface RetryManager {

    Observable observeRetries(@NonNull Throwable error);

}

Реализация может проверять ошибки, показывать диалоги, снэкбар, устанавливать бесшумный таймаут — всё, что душе угодно. И слушать коллбэки от всех этих UI компонентов, чтобы в последствии заэмитить RetryEvent в наш retryHandler.

Предыдущий пример с использованием такого RetryManager будет выглядеть вот так:

//pass this through constructor, DI or use singleton (but please don't)
private final RetryManager retryManager;

public void load() {
    Observable.combineLatest(
            repository.getSomething()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               err -> retryManager.observeRetries())),
            localStorage.fetchSomethingReallyHuge()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               nothing -> retryManager.observeRetries())),
            (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}


Таким нехитрым образом обработка повторов при ошибках делегирована сторонней сущности, которую можно передавать как зависимость.

Надеюсь, эти примеры окажутся кому-то полезны и соблазнят попробовать repeatWhen () и retryWhen () в своих проектах.

Комментарии (0)

© Habrahabr.ru