Skip middle elements of a BehaviousSubject while pending in an Android app

I have and Android app that uses Rx-Java. I have an screen that displays a list of models form the database I have a filter where the user inputs a text and the results of the list are filtered. To do so I have:

A behaviourSubject for the filter

BehaviorSubject<String> filterEmmiter = BehaviorSubject.create();

// When users inputs a text
filterEmmiter.onNext(newText);

// To get items
Flowable<String> filter = filterEmmiter.toFlowable(BackpressureStrategy.LATEST);
filter.flatMap((currentFilter)-> vacaDao.fullDataGroupedWhere(state,  stage, currentFilter));

Te problem is that the query is slower that the user inputing text, therefore I want that when a new key is pressed, if there are query that have not started discard them. That is the idea of the BackpressureStrategy.LATEST, but it is not working (all queries are called).

I’ve read in this question that the problem is that there is a buffer on the observer, however I don’t like the solution posted.

Is there anyway to solve this?

Thanks,

Stand Alone Sample

I want the following test to print only: D/TestTime: value processed:1 D/TestTime: value processed:10

    @Test
    public void someTest(){
        BehaviorSubject<Integer> filterEmmiter = BehaviorSubject.create();
        Flowable<Integer> filter = filterEmmiter.toFlowable(BackpressureStrategy.LATEST);
        final int maxint = 10;
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            for (int i = 0; i <= maxint; i++) {
                filterEmmiter.onNext(i);
            }
        }).start();

        filter.flatMap(integer -> {
            return Flowable.fromCallable(() -> {
                longOperation();
                Log.d("TestTime", "value processed:"+integer);
                return integer;
            });
        }, false, 1, 1)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
                .filter(value -> value==maxint).blockingFirst();
    }

Answer

As commented, you can use switchMap to cancel a previous query when there is a new filter text. In addition, you may want to wait for the user to pause typing before firing off the next query:

filterEmmiter
.debounce(500, TimeUnit.MILLISECONDS)
.switchMap(currentFilter -> vacaDao.fullDataGroupedWhere(state,  stage, currentFilter))
...

Edit

If you want to limit the query execution to one, even if the user keeps typing, use the maxConcurrency overload of flatMap

filterEmmiter
.toFlowable(BackpressureStrategy.LATEST)
.flatMap(currentFilter -> 
     vacaDao.fullDataGroupedWhere(state,  stage, currentFilter), 
     false,   // <-------- delayErrors
     1        // <-------- maxConcurrency
)
...

There is also a 3rd party operator flatMapLatest to accomplish this with an Observable directly.

Edit 2

From comments, you have to make your fromCallable async otherwise it will block the chain and backpressure will have no effect:

filter.flatMap(integer -> {
            return Flowable.fromCallable(() -> {
                longOperation();
                Log.d("TestTime", "value processed:"+integer);
                return integer;
            }).subscribeOn(Schedulers.io()); // <---------------------------------
        }, false, 1, 1)
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
        .filter(value -> value==maxint).blockingFirst();