I’m fetching a Mono (with T containing an id and orgId fields). I also have a Flux of orgIds. I need to return the Mono if T.getOrgId() is contained in the Flux.
I’m not entirely sure to understand the exact Mono/Flux flow. But so far I have tried 2 approaches, blocking and non-blocking.
Blocking
public Mono<T> getItemBlocking(final String orgId, final String id) {
final Flux<String> orgs = getOrgs(orgId);
final T t = repository.findById(id).block();
final Boolean tInOrg = orgs.hasElement(t.getOrgId()).subscribe().isDisposed();
return tInOrg ? repository.findById(id) : Mono.error(someError.error(id)); // Ideally would not make the request again
}
The code is not working, I’m receiving the following exception:
"java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4 at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
Non-blocking
public Mono<T> getItemNonBlocking(final String orgId, final String id) {
final Flux<String> orgs = getOrgs(orgId);
return (Mono<T>) repository.findById(id)
.filter(t -> orgs.hasElement(t.getOrgId()).block()) // Need to make this non blocking but I'm not able to, but need the Mono<Boolean> to be a Boolean
.switchIfEmpty(Mono.error(someError.error(id))
.subscribe();
}
This code is not working also, I’m receiving the exception:
java.lang.ClassCastException: reactor.core.publisher.LambdaMonoSubscriber incompatible with reactor.core.publisher.Mono at service.getItemNonBlocking
Any clue on what I’m doing wrong?
>Solution :
- You should avoid blocking. Blocking is designed for cases where you have to interact with non reactive components.
- If you return a Mono, you should not subscribe yourself. It is the user/consumer that should do it.
Now, you can achieve what you want by modifying your second example:
- Change filter with filterWhen, to allow a reactive stream as operator, instead of a blocking op.
- Return the resulting reactive pipeline directly, instead of subscribing to it.
return repository.findById(id)
.filterWhen(t -> getOrgs(orgId).hasElement(t.getOrgId()))
.switchIfEmpty(Mono.error(someError.error(id));