[FIXED] How to handle mocked RxJava2 observable throwing exception in unit test


I have been doing TDD in Kotlin for these past few weeks now in Android using MVP. Things have been going well.

I use Mockito to mock classes but I can’t seem to get over on how to implement one of the tests I wanted to run.

The following are my tests:

  1. Call api, receive list of data, then show list. loadAllPlacesTest()
  2. Call api, receive empty data, then show list. loadEmptyPlacesTest()
  3. Call api, some exception happen on the way, then show error message. loadExceptionPlacesTest()

I have tests for #1 and #2 successfully. The problem is with #3, I’m not sure how to approach the test in code.


interface RestApiInterface {

fun getPlacesPagedObservable(
        @Header("header_access_token") accessToken: String?,
        @Query("page") page: Int?
): Observable<PlacesWrapper>

the manager class implementing the interface looks like this:

open class RestApiManager: RestApiInterface{
var api: RestApiInterface
    internal set
internal var retrofit: Retrofit
init {
    val logging = HttpLoggingInterceptor()
    // set your desired log level

    val client = okhttp3.OkHttpClient().newBuilder()
            .readTimeout(60, TimeUnit.SECONDS)
            .connectTimeout(60, TimeUnit.SECONDS)

    retrofit = Retrofit.Builder()
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())//very important for RXJAVA and retrofit
    api = retrofit.create(RestApiInterface::class.java)
override fun getPlacesPagedObservable(accessToken: String?, page: Int?): Observable<PlacesWrapper> {
    //return throw Exception("sorry2")
    return api.getPlacesPagedObservable(


Here is my unit test:

class PlacesPresenterImplTest : AndroidTest(){

lateinit var presenter:PlacesPresenterImpl
lateinit var view:PlacesView
lateinit var apiManager:RestApiManager
//lateinit var apiManager:RestApiManager


val MANY_PLACES = Arrays.asList(PlaceItem(), PlaceItem());
var EXCEPTION_PLACES = Arrays.asList(PlaceItem(), PlaceItem());

val manyPlacesWrapper = PlacesWrapper(MANY_PLACES)
var exceptionPlacesWrapper = PlacesWrapper(EXCEPTION_PLACES)
val emptyPlacesWrapper = PlacesWrapper(Collections.emptyList())

fun clear(){
fun init(){
    //MOCKS THE subscribeOn(Schedulers.io()) to use the same thread the test is being run on
    //Schedulers.trampoline() runs the test in the same thread used by the test
    RxJavaPlugins.setIoSchedulerHandler { t -> Schedulers.trampoline() }

    view = Mockito.mock<PlacesView>(PlacesView::class.java)
    apiManager = Mockito.mock(RestApiManager::class.java)
    presenter = PlacesPresenterImpl(view,context(), Bundle(), Schedulers.trampoline())
    presenter.apiManager = apiManager

    //exceptionPlacesWrapper = throw Exception(EXCEPTION_MESSAGE1);

fun loadAllPlacesTest() {
    Mockito.`when`(apiManager.getPlacesPagedObservable(Mockito.anyString(), Mockito.anyInt())).thenReturn(Observable.just(manyPlacesWrapper))

    Mockito.verify(view, Mockito.atLeastOnce()).__showLoading()
    Mockito.verify(view, Mockito.atLeastOnce())._showList()

fun loadEmptyPlacesTest() {

    Mockito.`when`(apiManager.getPlacesPagedObservable(Mockito.anyString(), Mockito.anyInt())).thenReturn(Observable.just(emptyPlacesWrapper))
    Mockito.verify(view, Mockito.atLeastOnce()).__showLoading()
    Mockito.verify(view, Mockito.atLeastOnce())._showList()

fun loadExceptionPlacesTest() {
    Mockito.`when`(apiManager.getPlacesPagedObservable(Mockito.anyString(), Mockito.anyInt())).thenThrow(Exception(EXCEPTION_MESSAGE1))
    Mockito.verify(view, Mockito.atLeastOnce()).__showLoading()
    Mockito.verify(view, Mockito.never())._showList()

This is the presenter.

   class PlacesPresenterImpl
constructor(var view: PlacesView, var context: Context, var savedInstanceState:Bundle?, var mainThread: Scheduler)
: BasePresenter(), BasePresenterInterface, PlacesPresenterInterface {

lateinit var apiManager:RestApiInterface
var placeListRequest: Disposable? = null

override fun __firstInit() {
    apiManager = RestApiManager()

override fun __init(context: Context, savedInstanceState: Bundle, view: BaseView?) {
    this.view = view as PlacesView
    if (__isFirstTimeLoad())

override fun __destroy() {

override fun __populate() {

override fun _callPlacesApi() {
    apiManager.getPlacesPagedObservable("", 0)
            .subscribe (object : DisposableObserver<PlacesWrapper>() {
                override fun onNext(placesWrapper: PlacesWrapper) {
                    placesWrapper?.let {
                        val size = placesWrapper.place?.size
                        System.out.println("Great I found " + size + " records of places.")
                        view.__showFullScreenMessage("Great I found " + size + " records of places.")

                override fun onError(e: Throwable) {
                    if (ExceptionsUtil.isNoNetworkException(e)){
                        view.__showFullScreenMessage("So sad, can not connect to network to get place list.")
                        view.__showFullScreenMessage("Oops, something went wrong. ["+e.localizedMessage+"]")


                override fun onComplete() {


private fun _getEventCompletionObserver(): DisposableObserver<String> {
    return object : DisposableObserver<String>() {
        override fun onNext(taskType: String) {
            //_log(String.format("onNext %s task", taskType))

        override fun onError(e: Throwable) {
            //_log(String.format("Dang a task timeout"))
            //Timber.e(e, "Timeout Demo exception")

        override fun onComplete() {
            //_log(String.format("task was completed"))

Problem/Questions for the loadExceptionPlacesTest()

  1. I’m not sure why the code doesn’t go to the Presenter’s onError().
    correct me if I’m wrong the following but this is what I think:
  •     a – `apiManager.getPlacesPagedObservable(“”, 0)` observable itself throws an Exception that is why the `.subscribe()` can not happen/proceed and the methods of the observer won’t get called,
  •     b – it will only go to onError() when the operations inside the observable encounters an Exception like JSONException
  1. For loadExceptionPlacesTest() I think the 1b above is the way to go to make the presenter’s onError() get called and make the test pass. Is this correct? If it is how to do it on the test. If it is not can you guys point out what I am missing or doing wrong?


I’ll leave this here for future reference and to be able to elaborate a bit more, even though I’ve answered in the comments.

What you’re trying to accomplish is to put the stream in the onError flow. Unfortunately, by mocking it like this:

                   Mockito.anyString(), Mockito.anyInt()))

You’re actually telling Mockito to setup your mock in a way that just calling apiManager.getPlacesPagedObservable(anystring, anystring) should thrown an exception.

It is indeed true that throwing an exception inside an Rx stream will cause the entire stream to stop and end up in the onError method. However, this is exactly the problem with the approach you’re using. You’re not inside the stream when the exception is thrown.

Instead what you want to do is tell Mockito that once you call apiManager.getPlacesPagedObservable(anystring, anystring) you want to return a stream that will end up in the onError. This can be easily achieved with Observable.error() like so:

               Mockito.a‌​nyString(), Mockito.anyInt()))

(It might be possible that you need to add some type information in this part here Observable.error(), you might also need to use something else instead of an observable – single, completable, etc.)

The mocking above will tell Mockito to setup your mock to return an observable that will error as soon as it’s subscribed to. This will in turn put your subscriber directly in the onError stream with the specified exception.

Answered By – Fred

Answer Checked By – Katrina (Easybugfix Volunteer)

Leave a Reply

(*) Required, Your email will not be published