Flux does not wait for completion of elements before ‘then’

I am failing to understand the issue and I am not sure what am I doing wrong

I want to wait for flux to end and then return mono of serverResponse

I have attached the code snippet,the doOnNext will populate the categoryIdToPrintRepository

I have looked around on how to return mono after flux ends and found the ‘then’ but still the ‘then’ method is being executed way before the onNextSite is being processed, this results with the error:

java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry

What am I doing wrong?

 public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) {
        return Mono.just("start").flatMap(id ->
                Flux.fromIterable(appSettings.getSites())
                        .subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
                        .doOnNext(this::onNextSite)
                        .then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));

    }

    private void onNextSite(Integer siteId) {
        IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> {
            Optional<SiteCatalogCategoryDTO> cacheData =
                    siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
            cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> {/*do nothing already exist in cache*/},
                    () -> {
                    Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
                            .get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
                    catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
                            handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
            });
        });
    }


    private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
                                          int siteId, int catalogId) {
        if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
            Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
                    .subscribe(mapSCC -> {
                        categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
                                "Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + """ +
                                        mapSCC.getCategoryName() + "" (" + mapSCC.getCategoryId() + ")");
                        siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
                    });
    }

Answer

You are doing several things wrong you should not subscribe in your application, and you are having void methods, which should not be used in reactive programming unless in specific places.

here is some example code:

// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething() {
    Mono.just("Foo");
}

// complier error
doSomething().subscribe( ... );

Your application is a publisher the calling client, is the subscriber, thats why we return a Mono or a Flux out to the calling client, they subscribe.

You have solved it this way:

private void doSomething() {
    Mono.just("Foo").subscribe( ... );
}

doSomething();

Now you are subscribing to yourself to get things running, this is not the correct way, as mentioned before, the calling client is the subscriber, not you.

Correct way:

private Mono<String> doSomething() {
    return Mono.just("Foo");
}

// This is returned out to the calling client, they subscribe
return doSomething();

As a Mono/Flux completes, it will emit a signal, this signal will trigger the next and the next and the next in the chain.

So my opinion of what you need to do is the following:

  • Remove all subscribes, if you want to do things there are functions like, flatmap, map, doOnSuccess etc. keep the chain instact all the way out to the client.
  • Remove all void functions, make sure they return a Flux or a Mono and if you want to not return something return a Mono<Void> by using the Mono.empty() function so that the chain will be complete.

As soon as you use a Mono/Flux you need to handle the return so that others can chain on.

Update:

In order for then to trigger, you must return something, it will return when the previous mono/flux completes.

example:

private Flux<String> doSomething() {
    return Flux.just("Foo", "Bar", "FooBar")
               .doOnNext(string -> {
                   return // return something
               });
}

// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );

Leave a Reply

Your email address will not be published. Required fields are marked *