Is Reactor's FlatMap Asynchronous?


I’m new to reactive programming and I’m using Reactor through the Micronaut framework with Kotlin. I’m trying to understand the advantages of reactive programming and how to implement it using map and flatMap on Mono and Flux.

I understand the non-blocking aspect of reactive programming, but I’m unsure whether the operations on the data stream are actually asynchronous.

I’ve been reading about flatMap and understand that it asynchronously produces inner streams and then merges them into another Flux without maintaining order. The many diagrams I’ve seen make this easier to understand, but I have some basic questions when it comes to actual use cases.

Example:

fun updateDetails() {
    itemDetailsCrudRepository.getItems()
        .flatMap {
            customerRepository.save(someTransferObject.toEntity(it))
        }
}

In the above example assume itemDetailsCrudRepository.getItems() returns a Flux of a particular Entity. The flatMap operation has to save each of the items in the flux to another table. customerRepository.save() will save the item from the flux and we get the required entity through an instance of a data class someTransferObject.

Now, let’s say the getItems() query returned 10 items and we need to save 10 rows in the new table. Is the flatMap operation (the operation of saving these items into the new table) applied to each item of the flux one at a time (synchronously), or do all the saves happen at once, asynchronously?

One thing I read was that if subscribeOn(Schedulers.parallel()) is not applied, then the flatMap operation is applied to each item in the flux one at a time (synchronously). Is this information right?

Please do correct me if my basic knowledge itself is incorrect.

Solution:

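Your understanding is mostly right, and the questions are valid. Two things are easy to conflate here: which thread the pipeline runs on, and whether the inner publishers overlap.

By default, when you subscribe to a Flux or Mono, the operators in the pipeline run on the thread that calls subscribe, unless a source or operator switches threads itself. flatMap does not wait for one inner publisher to complete before subscribing to the next: it subscribes eagerly (up to 256 inner publishers at a time by default) and merges whatever they emit. So whether your 10 saves overlap depends on customerRepository.save(). If it returns a Mono backed by a non-blocking driver, the saves are in flight concurrently even without subscribeOn. If it does blocking work on the subscribing thread, each save finishes before flatMap gets to the next item, so they effectively run one at a time.

subscribeOn changes the thread on which the subscription, and therefore the upstream work, happens. Placed on the outer pipeline, subscribeOn(Schedulers.parallel()) moves the whole chain off the calling thread onto a single worker of that scheduler; it does not by itself fan the saves out across threads. Here’s how your code looks with the pipeline moved onto another scheduler: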

fun updateDetails() {
    itemDetailsCrudRepository.getItems()
        .flatMap {
            customerRepository.save(someTransferObject.toEntity(it))
        }
        // Requires: import reactor.core.scheduler.Schedulers
        .subscribeOn(Schedulers.parallel())
        .subscribe()
}

Note that this example adds a subscribe call at the end of the pipeline; nothing runs until something subscribes. It’s also worth noting that flatMap does not guarantee order, with or without subscribeOn: results are merged as the inner publishers emit them, so a save that finishes early can overtake one that started before it. To keep the source order, use concatMap, which processes one inner publisher at a time, or flatMapSequential, which runs them concurrently but emits the results in source order.
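To see the difference between these operators, here is a minimal sketch using reactor-core only. fakeSave and blockingSave are hypothetical stand-ins for a repository's save method (they are not part of Micronaut or of the code in the question), and the delays are arbitrary values chosen so that later items finish first.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.time.Duration

// Hypothetical non-blocking save: id 1 takes 300 ms, id 2 takes 200 ms, id 3 takes 100 ms,
// so later items finish first whenever the saves overlap.
fun fakeSave(id: Int): Mono<Int> =
    Mono.delay(Duration.ofMillis(100L * (4 - id))).thenReturn(id)

// Hypothetical blocking save, standing in for something like a JDBC call.
fun blockingSave(id: Int): Int {
    Thread.sleep(100)
    return id
}

// flatMap: all three inner Monos are subscribed up front; results arrive in completion order.
fun mergedOrder(): List<Int> =
    Flux.just(1, 2, 3).flatMap { fakeSave(it) }.collectList().block()!!

// concatMap: one inner Mono at a time, so source order is kept.
fun concatOrder(): List<Int> =
    Flux.just(1, 2, 3).concatMap { fakeSave(it) }.collectList().block()!!

// flatMapSequential: inner Monos overlap, but results are emitted in source order.
fun sequentialOrder(): List<Int> =
    Flux.just(1, 2, 3).flatMapSequential { fakeSave(it) }.collectList().block()!!

// Blocking work only overlaps if each inner Mono is moved onto its own scheduler thread.
fun offloadedSaves(): List<Int> =
    Flux.just(1, 2, 3)
        .flatMap { id ->
            Mono.fromCallable { blockingSave(id) }
                .subscribeOn(Schedulers.boundedElastic())
        }
        .collectList().block()!!

fun main() {
    println(mergedOrder())             // [3, 2, 1]
    println(concatOrder())             // [1, 2, 3]
    println(sequentialOrder())         // [1, 2, 3]
    println(offloadedSaves().sorted()) // [1, 2, 3]
}
```

None of the first three functions uses subscribeOn, yet the flatMap version still overlaps the saves, because Mono.delay is itself asynchronous. The last function shows one way to handle a save that really is blocking: putting subscribeOn on the inner Mono rather than on the outer pipeline. block() is used here only to collect results for the demo; it should not be called from a non-blocking thread in real code.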
