Spring Team,
The producer below successfully sends its values to a Kafka topic.
@Bean
Supplier<Flux<Integer>> someProducer() {
    return () -> Flux.range(1, 10);
}
But how do we get the correlation ID of a produced message, as we can when using ReactiveKafkaSender? Since the Flux is subscribed to by Spring internally, is there any way to get it?
>Solution :
Currently, the binder does not support getting the complete SenderResult, only the RecordMetadata of a successful send.
Please open a bug on GitHub (spring-cloud-stream) and reference this question.
To get the RecordMetadata, you can use Supplier<Flux<Message<Integer>>> and set the senderResult header in each message to an AtomicReference<Mono<RecordMetadata>>; the binder will populate it with a Mono which you can subscribe to.
However, I can see that this is not much use without the correlation metadata.
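To illustrate the holder-in-a-header idea described above, here is a minimal plain-Java sketch. It does not use Spring or the binder. Msg and fakeBinderSend are hypothetical stand-ins for the message and for whatever the binder does internally, and a CompletableFuture<String> stands in for Mono<RecordMetadata>; the "offset-N" strings are made-up metadata. The sketch only shows the mechanics: the producer puts an empty holder into the senderResult header, the sending side fills it, and the producer reads it afterwards. Because the holder is created per message, the sketch keeps it next to the payload in a map, which might be one way to tell which result belongs to which value.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class SenderResultHolderSketch {

    // Hypothetical stand-in for a message: a payload plus a header map.
    static final class Msg {
        final int payload;
        final Map<String, Object> headers;

        Msg(int payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Hypothetical stand-in for the sending side (NOT the real binder code):
    // if the message carries a "senderResult" header holding an AtomicReference,
    // fill it with a future that completes with fake metadata for the send.
    @SuppressWarnings("unchecked")
    static void fakeBinderSend(Msg msg, long offset) {
        Object header = msg.headers.get("senderResult");
        if (header instanceof AtomicReference) {
            ((AtomicReference<CompletableFuture<String>>) header)
                    .set(CompletableFuture.completedFuture("offset-" + offset));
        }
    }

    // Sends the values 1..10 and returns payload -> fake metadata.
    static Map<Integer, String> run() {
        // Keep each holder next to its payload so results can be matched up later.
        Map<Integer, AtomicReference<CompletableFuture<String>>> holders = new LinkedHashMap<>();
        long offset = 0;
        for (int i = 1; i <= 10; i++) {
            AtomicReference<CompletableFuture<String>> holder = new AtomicReference<>();
            holders.put(i, holder);
            Map<String, Object> headers = Collections.singletonMap("senderResult", holder);
            fakeBinderSend(new Msg(i, headers), offset++);
        }

        // After sending, read each populated holder and wait for its result.
        Map<Integer, String> results = new LinkedHashMap<>();
        for (Map.Entry<Integer, AtomicReference<CompletableFuture<String>>> e : holders.entrySet()) {
            results.put(e.getKey(), e.getValue().get().join());
        }
        return results;
    }

    public static void main(String[] args) {
        for (Map.Entry<Integer, String> e : run().entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

In a real application the holder would only be populated once the binder has handled the message, so reading it immediately after building the message, as this synchronous sketch does, should not be assumed to work there.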