Webflux subscriber

I’m currently facing an issue about saving on redis inside a switchIfEmpty function. Probably is related to the fact I’m pretty new in Reactive Programming and unfortunately I struggling to find proper examples about it.

However I’ve been able to solve it but I’m pretty sure there’s a better way to do it. Here is what I wrote now :

public Mono<ResponseEntity<BaseResponse<Content>>> getContent(final String contentId, final String contentType){
    return redisRepository.findByKeyAndId(REDIS_KEY_CONTENT, contentId.toString()).cast(Content.class)
               .map(contentDTO -> ResponseEntity.status(HttpStatus.OK.value())
                                                .body(new BaseResponse<>(HttpStatus.OK.value(), HttpStatus.OK.getReasonPhrase(), contentDTO)))
               //here I have to defer, otherwise It would never wait for the findByKeyAndId 
               .switchIfEmpty(Mono.defer(() -> {
                   Mono<ResponseEntity<BaseResponse<Content>>> responseMono = contentService.getContentByIdAndType(contentId, contentType);
                   
                   //so far I understood I need to consume every stream I have, in order to actually carry out the task otherwise will be there waiting for a consumer.
                   //once I get what I need from the cmsService I need to put it in cache and till now this is the only way I've been able to do it
                   responseMono.filter(response -> response.getStatusCodeValue() == HttpStatus.OK.value())
                               .flatMap(contentResponse -> redisRepository.save(REDIS_KEY_CONTENT, contentId.toString(), contentResponse.getBody().getData()))
                                        .subscribe();
                   //then I return the Object I firstly retrived thru the cmsService
                   return responseMono;
               }
    ));
}

any clue or suggestion of better ways to do it? Thank you in advance for the help!

Answer

There’s a few things that aren’t great from a best practice perspective there, and this probably isn’t doing exactly what you think it’s doing:

  • You’re subscribing yourself, which is usually a clear sign something is wrong. Special cases aside, subscribing should generally be left to the framework. This is also the reason you need the Mono.defer() – usually, nothing would happen until the framework subscribed to your publisher at the correct time, whereas you’re managing the subscription lifecycle for that publisher yourself.
  • The framework will still subscribe to your inner publisher there, it’s just that the Mono that you’re returning doesn’t do anything with its result. So you’ll likely be calling contentService.getContentByIdAndType() twice, not just once – once when you subscribe, and once when the framework subscribes.
  • Subscribing on an inner publisher like this creates a “fire and forget” type model, which means when your reactive method returns you’ve got no clue if redis has actually saved it yet, which could cause issues if you then come to rely on that result later.
  • Unrelated to the above, but contentId is already a string, you don’t need to call toString() on it 🙂

Instead, you might consider making use of delayUntil inside your switchIfEmpty() block – this will allow you to save the value to redis if the response code is ok, delay until this has happened, and keep the original value when done. The code might look something like this (difficult to say if this is exactly correct without a complete example, but it should give you the idea):

return redisRepository
        .findByKeyAndId(REDIS_KEY_CONTENT, contentId).cast(Content.class)
        .map(contentDTO -> ResponseEntity.status(HttpStatus.OK.value()).body(new BaseResponse<>(HttpStatus.OK.value(), HttpStatus.OK.getReasonPhrase(), contentDTO)))
        .switchIfEmpty(
                contentService.getContentByIdAndType(contentId, contentType)
                        .delayUntil(response -> response.getStatusCodeValue() == HttpStatus.OK.value() ?
                                redisRepository.save(REDIS_KEY_CONTENT, contentId, contentResponse.getBody().getData()) :
                                Mono.empty())
        );

Leave a Reply

Your email address will not be published. Required fields are marked *