Log the exceptions thrown in spring kafka listener Code Answer

We are using Spring kafka 2.7 non blocking retry mechanism. We want to log the error thrown while consuming the data in our @KafkaListener method.

For example: https://github.com/spring-projects/spring-kafka/blob/main/samples/sample-04/src/main/java/com/example/Application.java

In above example we can see, there is a RuntimeException has thrown. But that exception will not get logged instead we will get Seek to current after exception …..

// our configuration

 @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Object> factory
        = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }


@Bean
  public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {

    List<Class<? extends Throwable>> throwableList = Arrays.asList(IllegalArgumentException.class,
        IllegalAccessException.class);

    return RetryTopicConfigurationBuilder
        .newInstance()
        .dltHandlerMethod(XYZ.class, "xyz")
        .exponentialBackoff(delayMs, backoffMultiplier, maxIntervalInMs)
        .maxAttempts(retryAttempt)
        .notRetryOn(throwableList)
        .doNotAutoCreateRetryTopics()
        .listenerFactory(kafkaListenerContainerFactory())
        .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
        .create(template);
  }

Answer

You can use a RecordInterceptor.

@Bean
RecordInterceptor<String, String> interceptor(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
    RecordInterceptor<String, String> inter = new RecordInterceptor<String, String>() {

        @Override
        @Nullable
        public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
            return record;
        }

        @Override
        public void failure(ConsumerRecord<String, String> record, Exception exception,
                Consumer<String, String> consumer) {

            logger.error("Record failed " + ListenerUtils.recordToString(record, true), exception);
        }

    };
    factory.setRecordInterceptor(inter);
    return inter;
}

Related Posts

© No Copyrights, All Questions are retrived from public domain.
Tutorial Guruji