How do I handle errors thrown in the Spring Integration flow below by just logging the error and continuing with the next record? I should also be able to send an email only once, regardless of whether one or more records failed. Is there an equivalent of Spring Batch's StepListener? Thanks
return IntegrationFlows.from(jdbcMessageSource(), p -> p.poller(pollerSpec()))
        .split()
        .channel(c -> c.executor(Executors.newCachedThreadPool()))
        .transform(transformer, "transform")
        .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE))
        .handle(Http.outboundGateway(url)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class)
                .requestFactory(requestFactory))
        .get();
>Solution :
I believe the closest equivalent for StepListener is a ChannelInterceptor.
See the IntegrationFlowDefinition.intercept() operator, which you can place between the endpoints in that flow.
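As a rough illustration of the interceptor idea, here is a minimal sketch of a ChannelInterceptor that only logs failures. The class name ErrorLoggingInterceptor and the use of System.err are assumptions made for this example; they are not part of the original flow.

```java
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

// Hypothetical interceptor: logs a failure and does nothing else.
public class ErrorLoggingInterceptor implements ChannelInterceptor {

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel,
                                    boolean sent, Exception ex) {
        // ex is non-null only when the send (and whatever ran inside it) threw.
        if (ex != null) {
            System.err.println("Record failed on " + channel + ": " + ex.getMessage());
        }
    }
}
```

It could then be added with .intercept(new ErrorLoggingInterceptor()), for example right before the .handle(Http.outboundGateway(url)...) step. This assumes the channel it lands on is a direct channel, so a downstream exception travels back through send(); on an executor channel the send finishes at hand-off and the interceptor would likely not see the later failure.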
However, judging by your splitter configuration, it doesn't look like you are going to lose anything: errors are just going to be logged. The c.executor() call shifts the work for each record onto a thread pool, so an error in one record cannot affect the rest of the records.
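To make the isolation point concrete, here is a plain-JDK sketch (no Spring involved) of roughly what happens when each record runs as its own task on a pool and failures are caught per task. The class IsolatedRecords, the process() method and the "negative record fails" rule are all made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class IsolatedRecords {

    // Hypothetical per-record work: fails for negative input.
    static String process(int record) {
        if (record < 0) {
            throw new IllegalArgumentException("bad record " + record);
        }
        return "ok-" + record;
    }

    // Runs every record on a pool thread. A failure is collected into
    // 'errors' and the remaining records still run to completion.
    static List<String> runBatch(List<Integer> records, List<String> errors) {
        ExecutorService pool = Executors.newCachedThreadPool();
        List<String> results = new CopyOnWriteArrayList<>();
        for (int record : records) {
            pool.execute(() -> {
                try {
                    results.add(process(record));
                } catch (RuntimeException e) {
                    errors.add(e.getMessage());
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> errors = new CopyOnWriteArrayList<>();
        List<String> results = runBatch(List.of(1, -2, 3), errors);
        // prints "2 succeeded, 1 failed"
        System.out.println(results.size() + " succeeded, " + errors.size() + " failed");
    }
}
```

In the real flow you do not write the try/catch yourself: the executor channel hands the exception to an error handler, which is why the other records keep going.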
Anyway, it sounds better for you to set a custom error channel into the headers just before that split(): .enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, myErrorChannel())).
Then you have an aggregator subscribed to this channel, where you aggregate the errors for the batch and emit a single message to send the email. This happens only once per batch, or not at all if there are no errors.
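The error-channel side might look something like the sketch below. Everything specific in it is an assumption for illustration: the channel name "batchErrorChannel", the 10 second timeout, and the final handler that prints instead of sending a real email. Since only some records fail, the group can never be released by sequence size, so this sketch never releases on arrival and relies on the group timeout instead.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessagingException;

@Configuration
public class BatchErrorFlowConfig {

    // Assumes the main flow sets the header before split(), e.g.
    // .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "batchErrorChannel"))
    @Bean
    public IntegrationFlow batchErrorFlow() {
        return IntegrationFlows.from("batchErrorChannel")
                .aggregate(a -> a
                        // Group errors by the correlation id that split() put on the failed record.
                        // Sketch only: assumes the payload is a MessagingException with a failed message.
                        .correlationStrategy(m -> ((MessagingException) m.getPayload())
                                .getFailedMessage().getHeaders()
                                .get(IntegrationMessageHeaderAccessor.CORRELATION_ID))
                        // Never release on arrival; wait for the timeout instead.
                        .releaseStrategy(group -> false)
                        .groupTimeout(10_000)
                        .sendPartialResultOnExpiry(true)
                        .expireGroupsUponCompletion(true))
                // Placeholder for the email step: one message per batch that had errors.
                .handle(m -> System.err.println("Batch had failures: " + m.getPayload()))
                .get();
    }
}
```

The timeout value is a judgment call: it has to be long enough for the slowest record of a batch to fail, otherwise one batch could produce more than one notification.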