Filter a Mono<T> with a String Id matching one of a list of Flux<String>

I’m fetching a Mono<T> (where T has id and orgId fields). I also have a Flux<String> of orgIds. I need to return the Mono<T> only if T.getOrgId() is contained in the Flux.

I’m not entirely sure I understand the exact Mono/Flux flow, but so far I have tried two approaches: blocking and non-blocking.

Blocking

public Mono<T> getItemBlocking(final String orgId, final String id) {
        final Flux<String> orgs = getOrgs(orgId);
        final T t = repository.findById(id).block();
        final Boolean tInOrg = orgs.hasElement(t.getOrgId()).subscribe().isDisposed();
        return tInOrg ? repository.findById(id) : Mono.error(someError.error(id)); // Ideally would not make the request again
}

This code does not work; I receive the following exception:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4 at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)

Non-blocking

public Mono<T> getItemNonBlocking(final String orgId, final String id) {
        final Flux<String> orgs = getOrgs(orgId);
        return (Mono<T>) repository.findById(id)
                .filter(t -> orgs.hasElement(t.getOrgId()).block()) // Need to make this non blocking but I'm not able to, but need the Mono<Boolean> to be a Boolean
                .switchIfEmpty(Mono.error(someError.error(id)))
                .subscribe();
}

This code does not work either; I receive the exception:
java.lang.ClassCastException: reactor.core.publisher.LambdaMonoSubscriber incompatible with reactor.core.publisher.Mono at service.getItemNonBlocking

Any clue on what I’m doing wrong?

Solution:

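  1. You should avoid blocking. Blocking is meant for cases where you have to interact with non-reactive components, and Reactor rejects block() on its non-blocking threads (such as reactor-http-nio-4), which is the cause of the IllegalStateException.
  2. If you return a Mono, you should not subscribe to it yourself; the caller/consumer should do that. subscribe() returns a Disposable, not a Mono, which is why the cast in your second example fails with a ClassCastException.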
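To illustrate the second point, here is a minimal sketch, assuming reactor-core is on the classpath. The class name, the contains helper and the org ids are made up for the illustration. It shows that subscribe() hands back a subscription handle, so isDisposed() describes the state of that subscription rather than the Boolean emitted by hasElement, whereas keeping the Mono<Boolean> lets the value flow to whoever subscribes.

```java
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SubscribeReturnsDisposable {

    // Hypothetical helper: does a fixed list of org ids contain the wanted one?
    static Mono<Boolean> contains(String wanted) {
        return Flux.just("org-a", "org-b").hasElement(wanted);
    }

    public static void main(String[] args) {
        // subscribe() returns a Disposable (a handle used to cancel the subscription).
        // It is not the Mono and it does not carry the emitted Boolean.
        Disposable handle = contains("org-z").subscribe();
        System.out.println("subscription disposed? " + handle.isDisposed());

        // To use the result, keep composing on the Mono<Boolean> instead.
        // block() is tolerated here only because main is not a Reactor event-loop thread.
        System.out.println("org-a present? " + contains("org-a").block());
        System.out.println("org-z present? " + contains("org-z").block());
    }
}
```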

Now, you can achieve what you want by modifying your second example:

  1. Replace filter with filterWhen, which takes a predicate returning a Publisher<Boolean> (here the Mono<Boolean> from hasElement) instead of a synchronous boolean test, so no block() is needed.
  2. Return the resulting reactive pipeline directly, instead of subscribing to it.

return repository.findById(id)
        .filterWhen(t -> getOrgs(orgId).hasElement(t.getOrgId()))
        .switchIfEmpty(Mono.error(someError.error(id)));
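
For anyone who wants to try the pipeline in isolation, the following is a self-contained sketch of the same approach, assuming reactor-core on the classpath and Java 16+ for records. Item, the in-memory ITEMS map, the stubbed getOrgs and the IllegalArgumentException are hypothetical stand-ins for T, repository, the real org lookup and someError.error(id) from the question; they are not part of the original code.

```java
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrgFilterSketch {

    // Hypothetical stand-in for T.
    record Item(String id, String orgId) {}

    // Hypothetical in-memory replacement for the reactive repository.
    static final Map<String, Item> ITEMS = Map.of(
            "1", new Item("1", "org-a"),
            "2", new Item("2", "org-z"));

    static Mono<Item> findById(String id) {
        // Completes empty when the id is unknown.
        return Mono.justOrEmpty(ITEMS.get(id));
    }

    // Hypothetical org lookup: the caller's org plus one related org.
    static Flux<String> getOrgs(String orgId) {
        return Flux.just(orgId, orgId + "-sub");
    }

    static Mono<Item> getItem(String orgId, String id) {
        return findById(id)
                // Asynchronous predicate: keep the item only if its org is in the Flux.
                .filterWhen(item -> getOrgs(orgId).hasElement(item.orgId()))
                // Both "not found" and "wrong org" end up empty, hence the same error.
                .switchIfEmpty(Mono.error(
                        () -> new IllegalArgumentException("No accessible item " + id)));
    }

    public static void main(String[] args) {
        for (String id : new String[] {"1", "2", "3"}) {
            // block() is used only to print from main; a handler would return the Mono.
            String outcome = getItem("org-a", id)
                    .map(item -> "found " + item.id())
                    .onErrorReturn("denied")
                    .block();
            System.out.println(id + " -> " + outcome);
        }
    }
}
```

Note that the pipeline cannot tell a missing item from one that belongs to another organisation, since both cases reach switchIfEmpty. If the two need different errors, the not-found check would have to happen before filterWhen.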
