Espresso Idling Resource setup while using RxJava in Paging library v2

I am trying to write an Espresso test for a screen that uses Paging library v2 together with RxJava. Here is my data source:

class PageKeyedItemDataSource<T>(
        private val schedulerProvider: BaseSchedulerProvider,
        private val compositeDisposable: CompositeDisposable,
        private val context : Context
) : PageKeyedDataSource<Int, Character>() {

    private var isNext = true

    private val isNetworkAvailable: Observable<Boolean> =
        Observable.fromCallable { context.isNetworkAvailable() }

    override fun fetchItems(page: Int): Observable<PeopleWrapper> =
        wrapEspressoIdlingResource {
            composeObservable { useCase(query, page) }
        }

    override fun loadAfter(params: LoadParams<Int>, callback: LoadCallback<Int, Character>) {
        if (isNext) {
            _networkState.postValue(NetworkState.LOADING)

            isNetworkAvailable.flatMap { fetchItems(it, params.key) }
                .subscribe({
                    _networkState.postValue(NetworkState.LOADED)
                    //clear retry since last request succeeded
                    retry = null
                    if (it.next == null) {
                        isNext = false
                    }
                    callback.onResult(it.wrapper, params.key + 1)
                }) {
                    retry = {
                        loadAfter(params, callback)
                    }
                    initError(it)
                }.also { compositeDisposable.add(it) }
        }
    }

    override fun loadInitial(
        params: LoadInitialParams<Int>, callback: LoadInitialCallback<Int, Character>,
    ) {
        _networkState.postValue(NetworkState.LOADING)
        isNetworkAvailable.flatMap { fetchItems(it, 1) }
            .subscribe({
                _networkState.postValue(NetworkState.LOADED)
                if (it.next == null) {
                    isNext = false
                }
                callback.onResult(it.wrapper, null, 2)
            }) {
                retry = {
                    loadInitial(params, callback)
                }
                initError(it)
            }.also { compositeDisposable.add(it) }
    }
}

Here is my wrapEspressoIdlingResource:

inline fun <T> wrapEspressoIdlingResource(task: () -> Observable<T>): Observable<T> = task()
    .doOnSubscribe { EspressoIdlingResource.increment() } // App is busy until further notice
    .doFinally { EspressoIdlingResource.decrement() } // Set app as idle.
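
EspressoIdlingResource itself is not shown above. A minimal sketch of what it might look like, assuming it follows the common pattern of a singleton wrapping Espresso's CountingIdlingResource. The resource name "GLOBAL", the countingIdlingResource property, and the SearchScreenTest class are assumptions for illustration, not taken from the project. The counter only affects a test once it is registered with IdlingRegistry, which the sketch does in @Before:

```kotlin
import androidx.test.espresso.IdlingRegistry
import androidx.test.espresso.idling.CountingIdlingResource
import org.junit.After
import org.junit.Before

// Hypothetical singleton; the real project may differ.
object EspressoIdlingResource {

    private const val RESOURCE = "GLOBAL" // assumed name, used only for identification

    @JvmField
    val countingIdlingResource = CountingIdlingResource(RESOURCE)

    // Mark one unit of work as started; the app counts as busy while the counter is above zero.
    fun increment() {
        countingIdlingResource.increment()
    }

    // Mark one unit of work as finished; the guard avoids decrementing below zero.
    fun decrement() {
        if (!countingIdlingResource.isIdleNow) {
            countingIdlingResource.decrement()
        }
    }
}

// Hypothetical test class showing only the registration step.
class SearchScreenTest {

    @Before
    fun registerIdlingResource() {
        IdlingRegistry.getInstance().register(EspressoIdlingResource.countingIdlingResource)
    }

    @After
    fun unregisterIdlingResource() {
        IdlingRegistry.getInstance().unregister(EspressoIdlingResource.countingIdlingResource)
    }
}
```

Espresso treats the app as busy only while the counter is above zero, so a check that runs before the Observable has been subscribed to will not wait.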

But the test does not wait until the data is delivered from the network. If I add a Thread.sleep in the test to wait for the data, the Espresso test passes, so the problem is related to my Idling Resource setup.

I believe it could be related to the Paging library, since the same approach works fine for Observable types in other samples where I do not use the Paging library.

Full source code is available at: https://github.com/Ali-Rezaei/StarWarsSearch-Paging

What am I missing?

Answer

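I needed to set the fetch scheduler on the builder with setFetchScheduler, so that the data source loads run on my own IO scheduler instead of the builder's default executor: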

class BasePageKeyRepository<T>(
    private val scheduler: BaseSchedulerProvider,
) : PageKeyRepository<T> {

    @MainThread
    override fun getItems(): Listing<T> {

        val sourceFactory = getSourceFactory()

        val rxPagedList = RxPagedListBuilder(sourceFactory, PAGE_SIZE)
            .setFetchScheduler(scheduler.io())
            .buildObservable()

        ...
    }
}
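
For context, scheduler.io() comes from the project's BaseSchedulerProvider. A minimal sketch of what such a provider might look like, assuming RxJava 2 (the io.reactivex package). Only io() appears in the code above; the interface shape and the SchedulerProvider class are hypothetical and shown for illustration only:

```kotlin
import io.reactivex.Scheduler
import io.reactivex.schedulers.Schedulers

// Assumed shape: only io() is visible in the repository code above.
interface BaseSchedulerProvider {
    fun io(): Scheduler
}

// Hypothetical production implementation backed by RxJava's shared IO scheduler.
class SchedulerProvider : BaseSchedulerProvider {
    override fun io(): Scheduler = Schedulers.io()
}
```

Injecting the provider lets the repository hand RxPagedListBuilder the same scheduler the rest of the app uses for background work.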