How to get an element that caused an exception in Flux? Code Answer

Let’s say I have an array of ids: [9, 8, 7, 6].

I do some processing and one element causes to throw an exception. I want to handle this situation on my own way (let’s say log it) and let the other elements go with the flow.

How can I know which one was it? I need to have this element in my onError processing.

Flux.fromArray(myArray)
  .flatMap(element -> {
    var foo = processMyEl(element);  
    return anotherProcess(foo); // this returns Mono
  })
  .onErrorOperator(element -> handleMyError(element)) // this line is what I need
  

So, what I saw, there’s this almost nice .onErrorContinue((error, obj) -> that emits an error and an object.

But this obj is not the element that caused the exception but the object that did so. It happens inside of my processing methods and it doesn’t have to be the same type of object every time.

.onErrorReturn(...) – not really what I want

.doOnError(error -> – no information of my element

.onErrorResume(error -> – same as above

there were suggestions that I can create my own Exception and pass there the element and then retrieve it from the exception. But how should I throw the exception?

Should I go with an old way of try catch:

Flux.fromArray(myArray)
  .flatMap(el -> {
    try {
      var foo = processMyEl(el);  
      return anotherProcess(foo); // this returns Mono
    } catch (Exception e) {
      return Mono.error(new MyException(el));
     }
    })
  .onErrorOperator(error -> handleMyError(error.getElement()))

It doesn’t look well

Edit:

Not only it looks bad, but also doesn’t work. The exception is not caught at all and triggers directly doOnTerminate() and stops the whole stream

Update:

Thanks to @JEY I used .onErrorResume() inside flatMap.

I also transformed first method to be a reactive stream by Mono.defer(() -> Mono.just(processMyEl(el))).

Just as a note: using Mono.defer() allows me to use onErrorResume since Mono.just() cannot signal errors.

Final code looks like this:

Flux.fromArray(myArray)
    .flatMap(element -> Mono.defer(() -> Mono.just(processMyEl(element)))
        .onErrorResume(th -> handleMyError(element, th))
    )
    .flatMap(foo -> anotherProcess(foo)
        .onErrorResume(th -> handleMyError(foo, th)
    )

Where:

private Mono<> handleMyError(el, th) {
  // handling code
  return Mono.empty()
}

Answer

As requested by @Kamil I’ll add my comments as an answer:

You should just handle the error in the flatMap and return a Mono.empty() to discard it do something like:

Flux.fromArray(myArray)
  .flatMap(el -> anotherProcess(processMyEl(el)).onErrorResume(th -> handleError(th, el))

With handle error like:

Mono<Void> handleError(Throwable th, Object element) {
    LOG.error("An error occurred on {}", element, th);
    return Mono.empty()
}

Or if you want to do something more complex that require async:

Mono<Void> handleError(Throwable th, Object element) {
    return doSomethingThaReturnFluxOrMono(element).then();
}

Related Posts

© No Copyrights, All Questions are retrived from public domain.
Tutorial Guruji