RxJava error handling: OnErrorNotImplementedException vs UndeliverableException

I recently spent some time digging into RxJava’s error handling facilities and realized that throwables forwarded to RxJavaPlugins.setErrorHandler() are sometimes wrapped in OnErrorNotImplementedException and sometimes in UndeliverableException. I couldn’t easily figure out why and when, so I drafted a piece of code that I hope will shed some light on the matter for others.

Exceptions can occur in many places in an RxJava chain, but this article covers three particular situations:

1) Upstream exceptions

Exceptions coming from upstream (i.e. from the source we are subscribing to) are forwarded to the subscriber’s onError() if one is present. If not, they are wrapped in OnErrorNotImplementedException and sent to RxJavaPlugins.setErrorHandler().

2) Midstream exceptions

I’m using the term ‘midstream’ here (which does not belong to RxJava’s vocabulary) solely for lack of a better term.

Exceptions thrown inside a subscriber’s onNext() are forwarded to the corresponding onError() if one is present. If not, they are wrapped in OnErrorNotImplementedException and sent to RxJavaPlugins.setErrorHandler().
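To make the midstream case concrete, here is a minimal sketch, assuming RxJava 2.x (the io.reactivex packages) is on the classpath. The function name midstreamDemo is made up for illustration. As far as I can tell, the lambda-based subscriber also cancels the source when onNext() throws, so only the first item should be delivered.

```kotlin
import io.reactivex.Observable
import io.reactivex.plugins.RxJavaPlugins

// Hypothetical demo: returns (items seen by onNext, errors seen by onError,
// errors seen by the global handler).
fun midstreamDemo(): Triple<List<Int>, Int, Int> {
    val delivered = mutableListOf<Int>()
    val local = mutableListOf<Throwable>()
    val global = mutableListOf<Throwable>()
    RxJavaPlugins.setErrorHandler { global.add(it) }
    try {
        Observable.just(1, 2, 3).subscribe(
            // onNext fails on the very first item.
            { delivered.add(it); throw RuntimeException("boom on $it") },
            // onError is present, so the exception should land here.
            { local.add(it) }
        )
    } finally {
        // Restore the default handler so the demo leaves no global state behind.
        RxJavaPlugins.reset()
    }
    return Triple(delivered, local.size, global.size)
}

fun main() {
    val (delivered, local, global) = midstreamDemo()
    println("delivered=$delivered onError=$local globalHandler=$global")
}
```

If the onError lambda were left out, I would expect the same exception to show up in the global handler instead, wrapped in OnErrorNotImplementedException.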

3) Downstream exceptions

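Exceptions thrown inside a subscriber’s onComplete() or onSuccess() are not forwarded to the corresponding onError(), even if one is present. They are instead wrapped in UndeliverableException and sent to RxJavaPlugins.setErrorHandler(). Exceptions thrown inside onError() itself also end up in the global handler, although in RxJava 2 they appear to arrive bundled with the original error in a CompositeException rather than wrapped in UndeliverableException.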
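One way to see which wrapper shows up in which situation is to record the type of whatever reaches the global handler. The sketch below assumes RxJava 2.x; terminalCallbackDemo is a hypothetical name, and the third case reflects my reading of RxJava 2's lambda observers, where a throwing onError() reaches the handler as a CompositeException.

```kotlin
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins

// Hypothetical demo: returns the simple class name of each throwable
// that reached the global error handler, in order.
fun terminalCallbackDemo(): List<String> {
    val seen = mutableListOf<String>()
    RxJavaPlugins.setErrorHandler { seen.add(it.javaClass.simpleName) }
    try {
        // Case 1: no onError supplied, so the upstream error has nowhere to go.
        Single.error<String>(RuntimeException("upstream")).subscribe()

        // Case 2: onSuccess throws. The flow has already terminated,
        // so the (present) onError is skipped.
        Single.just("x").subscribe({ throw RuntimeException("in onSuccess") }, {})

        // Case 3: onError itself throws while handling the upstream error.
        Single.error<String>(RuntimeException("upstream"))
            .subscribe({}, { throw RuntimeException("in onError") })
    } finally {
        // Restore the default handler so the demo leaves no global state behind.
        RxJavaPlugins.reset()
    }
    return seen
}

fun main() {
    terminalCallbackDemo().forEach(::println)
}
```

I used a plain RuntimeException on purpose: RxJava 2 treats a few exception types (IllegalStateException or NullPointerException, for instance) as bugs and passes them to the handler without the UndeliverableException wrapper.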

Why is 3) special?

While 1) and 2) seem pretty straightforward, why is 3) different? As mentioned in this enlightening RxJava issue, these exceptions cannot be forwarded to onError() because onError(), onComplete() and onSuccess() are terminal events of a flow and therefore must always be the flow’s last emission. Since the exception itself originated during a terminal event, forwarding it to onError() would mean emitting an onError event right after, say, an onSuccess event, thereby violating the definition of a terminal event (for further info check out Rx’s Observable Contract).

Caveat emptor!

This can get especially tricky when mixing Observables/Flowables and Singles/Maybes/Completables, because the former emit onNext events while the latter do not.

Let’s say you were processing an Observable’s events inside onNext and handling the related errors in onError. During a refactoring that Observable is changed into a Single, so you “move” the event processing logic from onNext to onSuccess while keeping onError as is. This introduces a problem: if the event processing logic throws an exception, it is no longer forwarded to onError but is instead wrapped in UndeliverableException and sent to RxJavaPlugins.setErrorHandler().
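The refactoring trap can be sketched as follows, again assuming RxJava 2.x. The names process and refactoringDemo are hypothetical stand-ins for real code. The last subscription shows one possible way out, not the only one: do the processing inside the chain (here with doOnSuccess) so that a failure becomes an ordinary upstream error.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins

// Hypothetical processing step that always fails, standing in for real logic.
fun process(value: String): Unit = throw RuntimeException("cannot process '$value'")

// Hypothetical demo: returns (errors seen by onError, errors seen by the global handler).
fun refactoringDemo(): Pair<Int, Int> {
    val local = mutableListOf<Throwable>()
    val global = mutableListOf<Throwable>()
    RxJavaPlugins.setErrorHandler { global.add(it) }
    try {
        // Before the refactoring: a failure in onNext is delivered to onError.
        Observable.just("a").subscribe({ process(it) }, { local.add(it) })

        // After a naive refactoring: a failure in onSuccess skips onError
        // and goes to the global handler instead.
        Single.just("a").subscribe({ process(it) }, { local.add(it) })

        // One possible fix: process inside the chain, keep onSuccess trivial.
        Single.just("a").doOnSuccess { process(it) }.subscribe({}, { local.add(it) })
    } finally {
        // Restore the default handler so the demo leaves no global state behind.
        RxJavaPlugins.reset()
    }
    return Pair(local.size, global.size)
}

fun main() {
    val (local, global) = refactoringDemo()
    println("onError received $local, global handler received $global")
}
```

I picked doOnSuccess because the processing here is a side effect; when the processing produces a value, map or flatMap should route failures to onError in the same way.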

Too much talking, show me the code!

Here you go and thanks for reading!

package net.kjulio.rx.test

import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.exceptions.OnErrorNotImplementedException
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.functions.Consumer
import io.reactivex.plugins.RxJavaPlugins
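import org.junit.After
import org.junit.Assert
import org.junit.Test

class RxErrorDeliveryTest {

    // Restore the default global error handler so tests don't leak state into each other.
    @After
    fun tearDown() {
        RxJavaPlugins.reset()
    }
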
    @Test
    fun upstreamErrorsAreRoutedToGlobalHandlerWhenOnErrorIsMissing() {
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Flowable.error<Any>(RuntimeException()).subscribe()
        Observable.error<Any>(RuntimeException()).subscribe()
        Single.error<Any>(RuntimeException()).subscribe()
        Maybe.error<Any>(RuntimeException()).subscribe()
        Completable.error(RuntimeException()).subscribe()

        Assert.assertEquals(5, list.size)
        list.forEach {
            Assert.assertTrue(it is OnErrorNotImplementedException)
            Assert.assertTrue(it.cause is RuntimeException)
        }
    }

    @Test
    fun upstreamErrorsAreNotRoutedToGlobalHandlerWhenOnErrorIsPresent() {
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Flowable.error<Any>(RuntimeException()).subscribe({}, {})
        Observable.error<Any>(RuntimeException()).subscribe({}, {})
        Single.error<Any>(RuntimeException()).subscribe({}, {})
        Maybe.error<Any>(RuntimeException()).subscribe({}, {})
        Completable.error(RuntimeException()).subscribe({}, {})

        Assert.assertEquals(0, list.size)
    }

    @Test
    fun downstreamErrorsAreAlwaysRoutedToGlobalHandlerRegardlessOfOnErrorPresence() {
        // They couldn't be routed to onError (when present) because the stream has already
        // completed and by contract invoking onError would represent another emission.

        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Single.just("").subscribe(Consumer { throw RuntimeException() })
        Maybe.just("").subscribe { throw RuntimeException() }
        Completable.complete().subscribe { throw RuntimeException() }

        Flowable.just("").subscribe({}, {}, { throw RuntimeException() })
        Observable.just("").subscribe({}, {}, { throw RuntimeException() })
        Single.just("").subscribe({ throw RuntimeException() }, {})
        Maybe.just("").subscribe({ throw RuntimeException() }, {})
        Completable.complete().subscribe({ throw RuntimeException() }, {})

        Assert.assertEquals(8, list.size)
        list.forEach {
            Assert.assertTrue(it is UndeliverableException)
            Assert.assertTrue(it.cause is RuntimeException)
        }
    }

    @Test
    fun midstreamErrorsAreRoutedToGlobalHandlerWhenOnErrorIsMissing() {
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Flowable.just("").subscribe { throw RuntimeException() }
        Observable.just("").subscribe { throw RuntimeException() }

        Assert.assertEquals(2, list.size)
        list.forEach {
            Assert.assertTrue(it is OnErrorNotImplementedException)
            Assert.assertTrue(it.cause is RuntimeException)
        }
    }

    @Test
    fun midstreamErrorsAreNotRoutedToGlobalHandlerWhenOnErrorIsPresent() {
        val list = mutableListOf<Throwable>()
        RxJavaPlugins.setErrorHandler { list.add(it) }

        Flowable.just("").subscribe({ throw RuntimeException() }, {})
        Observable.just("").subscribe({ throw RuntimeException() }, {})

        Assert.assertEquals(0, list.size)
    }
}