Spring Kafka – Exception is causing Manual Immediate acknowledgment to be rolled back?

Can someone please help me understand why a message offset that is manually and immediately committed is re-processed by the KafkaListener when an exception occurs?

So I’m expecting the following behaviour:

  1. I receive an event in Kafka Listener
  2. I commit the offset
  3. An exception occurs
  4. I’m expecting that message not to be reprocessed because the offset was committed.

Is my understanding correct, or does Spring roll back the manual acknowledgment when an exception is thrown?

I have the following Listener code:

@KafkaListener(topics = {"${acknowledgement.topic}"}, containerFactory = "concurrentKafkaListenerContainerFactory")
public void onMessage(String message, Acknowledgment acknowledgment) throws InterruptedException {
    acknowledgment.acknowledge();

    throw new Exception1();
}

And the concurrentKafkaListenerContainerFactory code is:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    kafkaProperties.getConsumer().setEnableAutoCommit(false);
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
    concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return concurrentKafkaListenerContainerFactory;
}

Solution:

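The commit itself is not rolled back, but the record is still redelivered. The default error handler treats any exception as retryable by default, regardless of whether the record's offset has been committed: it seeks the consumer back to the failed record, so the next poll returns it again.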
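To make the distinction between the committed offset and the consumer's in-memory position concrete, here is a toy model in plain Java. It is only a sketch: ToyConsumer, run and maxAttempts are made-up names for illustration, and none of this is Spring Kafka's or the Kafka client's actual code.

```java
import java.util.ArrayList;
import java.util.List;

class RedeliveryDemo {

    /** Toy stand-in for a consumer (hypothetical, not the Kafka client API). */
    static class ToyConsumer {
        final List<String> log;
        long committed = 0; // where a restarted consumer would resume
        long position = 0;  // what the next poll() returns in this running consumer

        ToyConsumer(List<String> log) { this.log = log; }

        String poll() { return position < log.size() ? log.get((int) position++) : null; }
        void commit(long offset) { committed = offset; }
        void seek(long offset) { position = offset; }
    }

    /** Returns every delivery the "listener" saw; maxAttempts is an assumed retry limit. */
    static List<String> run(int maxAttempts) {
        ToyConsumer consumer = new ToyConsumer(List.of("event-0"));
        List<String> deliveries = new ArrayList<>();
        int attempts = 0;
        while (true) {
            long offset = consumer.position;
            String record = consumer.poll();
            if (record == null) {
                break; // nothing left to deliver
            }
            attempts++;
            deliveries.add(record + "@attempt" + attempts);
            try {
                consumer.commit(offset + 1); // plays the role of acknowledgment.acknowledge()
                throw new IllegalStateException("listener failed");
            } catch (IllegalStateException e) {
                if (attempts < maxAttempts) {
                    consumer.seek(offset); // retry rewinds the position; committed stays untouched
                }
                // Otherwise retries are exhausted and the record is skipped.
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // prints [event-0@attempt1, event-0@attempt2, event-0@attempt3]
        System.out.println(run(3));
    }
}
```

In this model the commit succeeds on the first attempt and is never undone; the repeated deliveries come purely from rewinding the position.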

You should either not throw an exception, or tell the DefaultErrorHandler which exception(s) should not be retried, using addNotRetryableExceptions. Its Javadoc reads:

/**
 * Add exception types to the default list. By default, the following exceptions will
 * not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link ConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried, unless {@link #defaultFalse()} has been called.
 * @param exceptionTypes the exception types.
 * @see #removeClassification(Class)
 * @see #setClassifications(Map, boolean)
 */
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
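As a rough sketch of how this could be wired into the factory from the question, assuming Spring Kafka 2.8 or later (where DefaultErrorHandler and setCommonErrorHandler are available): the class name KafkaListenerConfig and the nested Exception1 stand-in are assumptions made for illustration, and the question's own Exception1 class would be used in practice.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;

@Configuration
public class KafkaListenerConfig {

    // Stand-in for the question's Exception1; assumed here to be an unchecked exception.
    public static class Exception1 extends RuntimeException { }

    @Bean
    public DefaultErrorHandler errorHandler() {
        DefaultErrorHandler handler = new DefaultErrorHandler();
        // Records whose listener throws Exception1 go straight to recovery instead of being retried.
        handler.addNotRetryableExceptions(Exception1.class);
        return handler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory, DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Replace the container's default error handler with the customized one.
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```

With this in place, a failure with Exception1 is handed to the error handler's recoverer (which logs the failure by default) and the record is not delivered again.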