RxJava error handling: OnErrorNotImplementedException vs UndeliverableException
I recently spent some time digging into RxJava’s error handling facilities and realized that throwables forwarded to RxJavaPlugins.setErrorHandler() are sometimes wrapped in OnErrorNotImplementedException and sometimes in UndeliverableException. I couldn’t easily figure out why and when, so I drafted a piece of code that I hope will help shed some light on the matter.
Exceptions can occur in many places in an RxJava chain but for this specific article we will cover three particular situations:
1) Upstream exceptions
Exceptions coming from upstream (i.e. from the source we are subscribing to) are forwarded to the subscriber’s onError() if present; if not present, they are wrapped in OnErrorNotImplementedException and sent to the global error handler set via RxJavaPlugins.setErrorHandler().
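A minimal sketch of the upstream case, assuming RxJava 2.x (the io.reactivex package) is on the classpath. The helper name upstreamErrorDemo and the log format are made up for illustration; the same source is subscribed to once without and once with an onError callback.

```kotlin
import io.reactivex.Observable
import io.reactivex.plugins.RxJavaPlugins

// Hypothetical helper: records where the upstream error ends up.
fun upstreamErrorDemo(): List<String> {
    val log = mutableListOf<String>()
    RxJavaPlugins.setErrorHandler { log.add("global handler: ${it.javaClass.simpleName}") }
    try {
        val source = Observable.error<String>(RuntimeException("boom"))

        // No onError callback: the error is wrapped and handed to the global handler.
        source.subscribe()

        // onError callback present: the error is delivered there, unwrapped.
        source.subscribe({}, { log.add("onError: ${it.javaClass.simpleName}") })
    } finally {
        // Restore the default handler so other code is not affected.
        RxJavaPlugins.reset()
    }
    return log
}
```

The returned log should contain one entry from the global handler (OnErrorNotImplementedException) followed by one entry from onError (the original RuntimeException).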
2) Midstream exceptions
I’m using the term ‘midstream’ here (which does not belong to RxJava’s vocabulary) solely for lack of a better term.
Exceptions thrown inside a subscriber’s onNext() are forwarded to that same subscriber’s onError() if present; if not present, they are wrapped in OnErrorNotImplementedException and sent to the global error handler set via RxJavaPlugins.setErrorHandler().
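The midstream case can be sketched the same way, again assuming RxJava 2.x. The helper name midstreamErrorDemo is hypothetical; this time the log also prints the cause's message to show that the original exception survives inside the wrapper.

```kotlin
import io.reactivex.Flowable
import io.reactivex.plugins.RxJavaPlugins

// Hypothetical helper: the source is healthy, the failure happens inside onNext.
fun midstreamErrorDemo(): List<String> {
    val log = mutableListOf<String>()
    RxJavaPlugins.setErrorHandler {
        log.add("global handler: ${it.javaClass.simpleName} caused by ${it.cause?.message}")
    }
    try {
        val source = Flowable.just("event")

        // onNext throws and there is no onError: the global handler gets a wrapped error.
        source.subscribe { throw RuntimeException("boom") }

        // onNext throws and onError is present: onError receives the exception.
        source.subscribe(
            { throw RuntimeException("boom") },
            { log.add("onError: ${it.javaClass.simpleName}") }
        )
    } finally {
        RxJavaPlugins.reset()
    }
    return log
}
```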
3) Downstream exceptions
Exceptions thrown inside a subscriber’s onComplete() or onSuccess() are not forwarded to that subscriber’s onError() even if present; they are instead wrapped in UndeliverableException and sent to the global error handler set via RxJavaPlugins.setErrorHandler().
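Exceptions thrown inside onError() cannot be re-delivered either and also end up in the global error handler. Note that in RxJava 2 the lambda-based Observable and Flowable subscribers typically report them as a CompositeException (carrying both the original and the new error) rather than as an UndeliverableException.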
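Here is a small sketch of the downstream case, assuming RxJava 2.x. The helper name downstreamErrorDemo is hypothetical. Both subscriptions supply an onError callback, yet it is never invoked because the exception is thrown while a terminal event is being delivered.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins

// Hypothetical helper: failures happen inside onSuccess and onComplete.
fun downstreamErrorDemo(): List<String> {
    val log = mutableListOf<String>()
    RxJavaPlugins.setErrorHandler { log.add("global handler: ${it.javaClass.simpleName}") }
    try {
        // onSuccess throws: onError is present but is skipped.
        Single.just("value").subscribe(
            { throw RuntimeException("boom in onSuccess") },
            { log.add("onError: ${it.javaClass.simpleName}") }
        )

        // onComplete throws: onError is present but is skipped as well.
        Observable.empty<String>().subscribe(
            {},
            { log.add("onError: ${it.javaClass.simpleName}") },
            { throw RuntimeException("boom in onComplete") }
        )
    } finally {
        RxJavaPlugins.reset()
    }
    return log
}
```

Neither onError callback shows up in the log; both exceptions reach the global handler wrapped in UndeliverableException.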
Why is 3) special?
While 1) and 2) are pretty straightforward, why is 3) different? As mentioned in this enlightening RxJava issue, these exceptions cannot be forwarded to onError() because onError(), onComplete() and onSuccess() are terminal events of a flow and therefore must always be the flow’s last emission. Since the exception originated while a terminal event was being delivered, forwarding it to onError() would mean emitting an onError event right after an onSuccess (or onComplete) event, thereby violating the definition of a terminal event (for further info check out Rx’s Observable Contract).
Caveat emptor!
This can get especially tricky when mixing Observables/Flowables and Singles/Maybes/Completables, because the former emit onNext events while the latter do not.
Let’s say you were processing an Observable’s events inside onNext and handling the related errors in onError. During a refactoring that Observable is changed into a Single, so you “move” the event processing logic from onNext to onSuccess while keeping onError as is. This triggers a problem: if the event processing logic throws an exception, it is no longer forwarded to onError but is instead wrapped in UndeliverableException and sent to the global error handler set via RxJavaPlugins.setErrorHandler().
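The refactoring scenario can be sketched as follows, assuming RxJava 2.x. The process function and the helper name refactoringPitfallDemo are hypothetical. The third subscription shows one possible mitigation, which is my own suggestion rather than something prescribed above: running the processing in a doOnSuccess operator makes a failure travel down the chain as a regular error, so onError sees it again.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins

// Hypothetical processing step that always fails, standing in for real logic.
fun process(value: String) {
    throw RuntimeException("cannot process '$value'")
}

fun refactoringPitfallDemo(): List<String> {
    val log = mutableListOf<String>()
    RxJavaPlugins.setErrorHandler { log.add("global handler: ${it.javaClass.simpleName}") }
    try {
        // Before the refactoring: processing in onNext, failures reach onError.
        Observable.just("event").subscribe(
            { process(it) },
            { log.add("Observable onError: ${it.javaClass.simpleName}") }
        )

        // After a naive refactoring: processing in onSuccess, onError is bypassed.
        Single.just("event").subscribe(
            { process(it) },
            { log.add("Single onError: ${it.javaClass.simpleName}") }
        )

        // Possible mitigation: process inside an operator, keep onSuccess trivial.
        Single.just("event")
            .doOnSuccess { process(it) }
            .subscribe({}, { log.add("doOnSuccess onError: ${it.javaClass.simpleName}") })
    } finally {
        RxJavaPlugins.reset()
    }
    return log
}
```

Only the middle subscription leaks into the global handler; the Observable version and the doOnSuccess version both deliver the RuntimeException to their onError callback.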
Too much talking, show me the code!
Here you go and thanks for reading!
package net.kjulio.rx.test

import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.exceptions.OnErrorNotImplementedException
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.functions.Consumer
import io.reactivex.plugins.RxJavaPlugins
import org.junit.Assert
import org.junit.Test

class RxErrorDeliveryTest {

    @Test
    fun upstreamErrorsAreRoutedToGlobalHandlerWhenOnErrorIsMissing() {
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Flowable.error<Any>(RuntimeException()).subscribe()
        Observable.error<Any>(RuntimeException()).subscribe()
        Single.error<Any>(RuntimeException()).subscribe()
        Maybe.error<Any>(RuntimeException()).subscribe()
        Completable.error(RuntimeException()).subscribe()

        Assert.assertEquals(5, list.size)
        list.forEach {
            Assert.assertTrue(it is OnErrorNotImplementedException)
            Assert.assertTrue(it.cause is RuntimeException)
        }
    }

    @Test
    fun upstreamErrorsAreNotRoutedToGlobalHandlerWhenOnErrorIsPresent() {
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Flowable.error<Any>(RuntimeException()).subscribe({}, {})
        Observable.error<Any>(RuntimeException()).subscribe({}, {})
        Single.error<Any>(RuntimeException()).subscribe({}, {})
        Maybe.error<Any>(RuntimeException()).subscribe({}, {})
        Completable.error(RuntimeException()).subscribe({}, {})

        Assert.assertEquals(0, list.size)
    }

    @Test
    fun downstreamErrorsAreAlwaysRoutedToGlobalHandlerRegardlessOfOnErrorPresence() {
        // They couldn't be routed to onError (when present) because the stream has already
        // completed and by contract invoking onError would represent another emission.
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Single.just("").subscribe(Consumer { throw RuntimeException() })
        Maybe.just("").subscribe { throw RuntimeException() }
        Completable.complete().subscribe { throw RuntimeException() }

        Flowable.just("").subscribe({}, {}, { throw RuntimeException() })
        Observable.just("").subscribe({}, {}, { throw RuntimeException() })
        Single.just("").subscribe({ throw RuntimeException() }, {})
        Maybe.just("").subscribe({ throw RuntimeException() }, {})
        Completable.complete().subscribe({ throw RuntimeException() }, {})

        Assert.assertEquals(8, list.size)
        list.forEach {
            Assert.assertTrue(it is UndeliverableException)
            Assert.assertTrue(it.cause is RuntimeException)
        }
    }

    @Test
    fun midstreamErrorsAreRoutedToGlobalHandlerWhenOnErrorIsMissing() {
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Flowable.just("").subscribe { throw RuntimeException() }
        Observable.just("").subscribe { throw RuntimeException() }

        Assert.assertEquals(2, list.size)
        list.forEach {
            Assert.assertTrue(it is OnErrorNotImplementedException)
            Assert.assertTrue(it.cause is RuntimeException)
        }
    }

    @Test
    fun midstreamErrorsAreNotRoutedToGlobalHandlerWhenOnErrorIsPresent() {
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Flowable.just("").subscribe({ throw RuntimeException() }, {})
        Observable.just("").subscribe({ throw RuntimeException() }, {})

        Assert.assertEquals(0, list.size)
    }
}