Create an instance of org.springframework.kafka.listener.adapter.RecordFilterStrategy using SpEL

I’m having difficulty understanding how to configure the following.

Since spring-kafka 2.8.4, the @KafkaListener annotation can be configured with a filter that is applied to all incoming messages. The Javadoc for the filter attribute:

    /**
     * Set an {@link org.springframework.kafka.listener.adapter.RecordFilterStrategy} bean
     * name to override the strategy configured on the container factory. If a SpEL
     * expression is provided ({@code #{...}}), the expression can either evaluate to a
     * {@link org.springframework.kafka.listener.adapter.RecordFilterStrategy} instance or
     * a bean name.
     * @return the error handler.
     * @since 2.8.4
     */
    String filter() default "";

RecordFilterStrategy has a single method:

    /**
     * Return true if the record should be discarded.
     * @param consumerRecord the record.
     * @return true to discard.
     */
    boolean filter(ConsumerRecord<K, V> consumerRecord);
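
The "true to discard" contract is easy to get backwards, so here is a minimal plain-Java sketch of it, with no Spring or Kafka on the classpath. KeyFilter is a hypothetical stand-in that only mirrors the shape of RecordFilterStrategy, and the UPDATE constant is an assumption added for illustration (only CREATE appears in the question).

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for the question's EventType; UPDATE is an assumed extra constant.
enum EventType { CREATE, UPDATE }

// Hypothetical interface mirroring the RecordFilterStrategy contract: true means discard.
interface KeyFilter {
    boolean filter(String key);
}

final class RecordFilterSketch {

    // Discards every key that is not "CREATE" (including a null key).
    static final KeyFilter DISCARD_NON_CREATE =
            key -> !Objects.equals(key, EventType.CREATE.toString());

    // Returns the keys that survive, i.e. those for which filter(...) returns false.
    static List<String> delivered(List<String> keys, KeyFilter filter) {
        return keys.stream()
                .filter(key -> !filter.filter(key))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = List.of("CREATE", "UPDATE", "CREATE");
        System.out.println(delivered(keys, DISCARD_NON_CREATE)); // prints [CREATE, CREATE]
    }
}
```

Note the negation: a predicate written as key.equals("CREATE") would discard the CREATE records rather than keep them.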

Basically, I need to create a kind of lambda, but I don’t understand how to reference the consumerRecord variable. This is what I have tried so far:

    #{#consumerRecord.key().equals(T(com.example.kafkaconsumer.EventType).CREATE.toString())}

It fails with the exception:

    Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1011E: Method call: Attempted to call method key() on null context object

This is what I’m trying to implement using SpEL:

    @Bean
    public RecordFilterStrategy<String, Foo> recordFilterStrategy() {
        return rec -> !Objects.equals(rec.key(), EventType.CREATE.toString());
    }

>Solution :

Look at the Javadoc one more time:

    Set an {@link org.springframework.kafka.listener.adapter.RecordFilterStrategy} bean name

Since you already have a recordFilterStrategy bean, its name is all you need in the filter() attribute:

    filter = "recordFilterStrategy"

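No need to fight with complex SpEL. The expression in filter() is evaluated once, to resolve a RecordFilterStrategy instance or a bean name, not once per record. There is no consumerRecord variable at that point, which is why #consumerRecord resolves to null.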
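
For completeness, a sketch of how the bean and the listener might fit together. It needs spring-kafka 2.8.4 or later on the classpath and an already configured consumer/container factory. The topic name, listener id, class names, and the Foo and EventType stubs are assumptions for illustration, not taken from the question's project.

```java
import java.util.Objects;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.stereotype.Component;

// Hypothetical stubs standing in for the question's own types.
enum EventType { CREATE }
class Foo { }

@Configuration
class FilterConfig {

    // The bean name defaults to the method name: "recordFilterStrategy".
    @Bean
    public RecordFilterStrategy<String, Foo> recordFilterStrategy() {
        // true = discard, so only records keyed "CREATE" reach the listener
        return rec -> !Objects.equals(rec.key(), EventType.CREATE.toString());
    }
}

@Component
class FooListener {

    // "foo-events" and "fooListener" are assumed names; filter refers to the bean above by name.
    @KafkaListener(id = "fooListener", topics = "foo-events", filter = "recordFilterStrategy")
    public void listen(Foo foo) {
        // only records that were not discarded by the filter arrive here
    }
}
```

Referencing the bean by name keeps the filtering logic in ordinary Java, where it can be unit-tested without an application context.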
