[Перевод] Стандартный Error Handler в RxJava2 или почему RxJava вызывает сбой приложения даже если реализован onError

В переводе статьи пойдёт речь об UndeliverableException в RxJava2 версии 2.0.6 и новее. Если кто-то столкнулся и не может разобраться, или совсем не слышал об этой проблеме — прошу под кат. Побудили к переводу проблемы в production после перехода с RxJava1 на RxJava2. Оригинал был написан 28 декабря 2017, но лучше узнать поздно, чем никогда.

sh49yi0mhl6-tmfir_fdvlkmcny.png

Все мы хорошие разработчики и ловим ошибки в onError, когда используем RxJava. Это значит что мы обезопасили себя от падений приложения, верно?

Нет, не верно.

Ниже мы рассмотрим пару примеров в которых приложение будет падать из-за RxJava, даже если корректно реализован onError.

Базовый обработчик ошибок в RxJava


В роли базового обработчика ошибок в RxJava используется RxJavaPlugins.onError. Он обрабатывает все ошибки, которые не удается доставить до подписчика. По умолчанию, все ошибки отправляются именно в него, поэтому могут возникать критические сбои приложения.
В примечаниях к релизу 2.0.6 данное поведение описано:

Одна из целей дизайна 2.х — отсутсвие потерянных ошибок. Иногда последовательность кончается или отменяется до того, как источник вызывает onError. В данном случае ошибке деться некуда и она направляется в RxJavaPlugins.onError


Если у RxJava нет базового обработчика ошибок — подобные ошибки будут скрыты от нас и разработчики будут находится в неведении относительно потенциальных проблем в коде.

Начиная с версии 2.0.6, RxJavaPlugins.onError пытается быть умнее и разделяет ошибки библиотеки/реализации и ситуации когда ошибку доставить невозможно. Ошибки, отнесенные к категории «багов» вызываются как есть, остальные же оборачиваются в UndeliverableException и после вызываются. Всю эту логику можно посмотеть здесь (методы onError и isBug).

Одна из основных ошибок, с которыми сталкиваются новички в RxJava — OnErrorNotImplementedException. Эта ошибка возникает, если observable вызывает ошибку, а в подписчике не реализован метод onError. Данная ошибка — пример ошибки, которая для базового обработчика ошибок RxJava является «багом» и не оборачивается в UndeliverableException.

UndeliverableException


Поскольку ошибки относящиеся к «багам» легко исправить — не будем на них останавливаться. Ошибки, которые RxJava оборачивает в UndeliverableException, интереснее, так как не всегда может быть очевидно почему же ошибка не может быть доставлена до onError.

Случаи, в которых это может произойти, зависят от того, что конкретно делают источники и подписчики. Примеры рассмотрим ниже, но в общем можно сказать, что такая ошибка возникает, если нет активного подписчика, которому может быть доставлена ошибка.

Пример с zipWith ()


Первый вариант, в котором можно вызвать UndeliverableException — оператор zipWith.

val observable1 = Observable.error(Exception())
val observable2 = Observable.error(Exception())
val zipper = BiFunction { one, two -> "$one - $two" }
observable1.zipWith(observable2, zipper)
        .subscribe(
                { System.out.println(it) },
                { it.printStackTrace() }
        )


Мы объединяем вместе два источника, каждый из которых вызывает ошибку. Чего мы ожидаем? Можем предположить, что onError будет вызван дважды, но это противоречит спецификации Reactive streams.

После единственного вызова терминального события (onError, onCompelete) требуется, чтобы никаких вызовов больше не осуществлялось


Получается, что при единственном вызове onError повторный вызов уже невозможен. Что произойдёт при возникновении в источнике второй ошибки? Она будет доставлена в RxJavaPlugins.onError.

Простой способ попасть в подобную ситуациюю — использовать zip для объединения сетевых вызовов (например, два вызова Retrofit, возвращающие Observable). Если в обоих вызовах возникает ошибка (например, нет интернет соединения) — оба источника вызовут ошибки, первая из которых попадёт в реализацию onError, а вторая будет доставлена базовому обработчику ошибок (RxJavaPlugins.onError).

Пример с ConnectableObservable без подписчиков


ConnectableObservable также может вызвать UndeliverableException. Стоит напомнить, что ConnectableObservable вызывает события независимо от наличия активных подписчиков, достаточно вызвать метод connect(). Если при отсутствии подписчиков в ConnectableObservable возникнет ошибка — она будет доставлена базовому обработчику ошибок.

Вот довольно невинный пример, который может вызвать такую ошибку:

someApi.retrofitCall() // Сетевой вызов с использованием Retrofit
    .publish()
    .connect()


Если someApi.retrofitCall() вызовет ошибку (например, нет подключения к интернету) — приложение упадет, так как сетевая ошибка будет доставлена базовому обработчику ошибок RxJava.

Этот пример кажется выдуманным, но очень легко попасть в ситуацию, когда ConnectableObservable все еще соединен (connected), но подписчиков у него нет. Я столкнулся с этим при использовании autoConnect() при вызове к API. autoConnect() автоматически не отключает Observable. Я отписывался в onStop методе Activity, но результат сетевого вызова возвращался после уничтожения Activity и приложение падало с UndeliverableException.

Обрабатываем ошибки


Итак, что же делать с этими ошибками?

Первый шаг — посмотреть на возникающие ошибки и попытаться определить что их вызывает. Идеально, если вам удастся исправить проблему у её источника, чтобы предотвратить передачу ошибки в RxJavaPlugins.onError.

Решение для примера с zipWith — взять один или оба источника и реализовать в них один из методов для перехватыва ошибок. Например, вы можете использовать onErrorReturn для передачи вместо ошибки значения по умолчанию.

Пример с ConnectableObservable исправить проще — просто убедитесь в том, что вы отсоединили Observable в момент, когда последний подписчик отписывается. autoConnect(), к примеру, имеет перегруженную реализацию, которая принимает функцию, отлавливающую момент соединения (больше можно посмотреть здесь).

Другой путь решения проблемы — подменить базовый обработчик ошибок своим собственным. Метод RxJavaPlugins.setErrorHandler(Consumer<Throwable>) поможет вам в этом. Если это подходящее для вас решение — можете перехватывать все ошибки отправленные в RxJavaPlugins.onError и обрабатывать их по своему усмотрению. Это решение может оказаться довольно сложным — помните, что RxJavaPlugins.onError получает ошибки от всех потоков (streams) RxJava в приложении.

Если вы вручную создаете свои Observable, то можете вместо emitter.onError() вызывать emitter.tryOnError(). Этот метод передает ошибку только если поток (stream) не уничтожен (terminated) и имеет подписчиков. Помните, что данный метод экспериментальный.

Мораль данной статьи в том, что вы не можете быть уверены в отсутсвии ошибок при работе с RxJava, если просто реализовали onError в подписчиках. Вы должны быть в курсе ситуаций, в которых ошибки могут оказаться недоступны для подписчиков, и убедиться, что эти ситуации обрабатываются.

© Habrahabr.ru