I have a Flux stream. For each element processed, I want to trigger an asynchronous, non-blocking action, for example a method that returns a Mono from a database update.
I want this action to be triggered from a doOnNext block.
I don’t want to affect the Flux, its processing, or the backpressure implemented there.
Suppose the Mono-returning method to be called is
Mono<Integer> dbUpdate(Data data);
Should my Flux look like this?
public Flux<Data> processData(PollRequest request) {
    return searchService.search(request)
            .doOnNext(data -> dbUpdate(data));
}
Or should it be written as shown in a Stack Overflow example?
public Flux<Data> processData(PollRequest request) {
    return searchService.search(request)
            .doOnNext(data -> dbUpdate(data).subscribe());
}
Won’t the above cause blocking issues inside doOnNext?
Also, which scheduler is the most appropriate for this type of action?
>Solution :
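dbUpdate() will be ignored if you do not subscribe to it: a Mono is lazy, so calling the method only builds the pipeline and nothing runs until something subscribes. The following snippet doesn’t print anything, even when the Flux returned by processData() is subscribed, because the Mono returned by dbUpdate() is discarded and never gets subscribed.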
Mono<String> dbUpdate() {
    return Mono.just("db update")
            .doOnNext(System.out::println);
}

public Flux<String> processData() {
    return Flux.just("item 1", "item 2")
            .doOnNext(data -> dbUpdate());
}
Note that .subscribe() doesn’t block your thread waiting for a result. For an asynchronous source, such as a non-blocking database call, it kicks off the work and returns immediately.
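To make the difference concrete, here is a minimal sketch that runs both variants side by side. It assumes reactor-core is on the classpath. The class name DoOnNextSubscribeDemo, the updates list, and this version of dbUpdate (which takes the item and records a string instead of touching a database) are made up for illustration and are not part of the original code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DoOnNextSubscribeDemo {

    // Hypothetical stand-in for the database: records every update that actually ran.
    static final List<String> updates = new CopyOnWriteArrayList<>();

    // Hypothetical dbUpdate: lazy, does nothing until it is subscribed.
    static Mono<String> dbUpdate(String data) {
        return Mono.fromCallable(() -> "db update for " + data)
                .doOnNext(updates::add);
    }

    // The Mono is created and thrown away, so the update never runs.
    static Flux<String> withoutSubscribe() {
        return Flux.just("item 1", "item 2")
                .doOnNext(data -> dbUpdate(data));
    }

    // Fire-and-forget: each Mono is subscribed, so the update runs once per item.
    static Flux<String> withSubscribe() {
        return Flux.just("item 1", "item 2")
                .doOnNext(data -> dbUpdate(data).subscribe());
    }

    public static void main(String[] args) {
        withoutSubscribe().blockLast();
        System.out.println("without subscribe: " + updates);

        withSubscribe().blockLast();
        System.out.println("with subscribe: " + updates);
    }
}
```

The first line printed shows an empty list and the second shows one entry per item. Keep in mind that with this fire-and-forget style the main Flux neither waits for the update nor sees its errors, so a failure inside dbUpdate will not reach the subscriber of processData().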