Pull to refresh

Spring Reactor Mono::block deadlock

Reading time3 min
Views1.6K
And where is the deadlock here?
And where is the deadlock here?

Faced with the situation when calling Mono::block leads to deadlock - in debug I can just see how it stands on Unsafe::park and no more movement at all.


Actually this is the code example with described problem.

Flux<PojoClass> pojoClassFlux = Flux.fromIterable(pojoClassSet);

pojoClassFlux
        .groupBy(PojoClass::getInteger)
        .map(pojoClassIntegerGroupedFlux -> pojoClassIntegerGroupedFlux
                .map(pojoClass -> {
                    //some mapping
                })
                .reduce(new PojoClassIntegerGroup(), (currentPojoClassIntegerGroup, newSourceFilterToColumn) -> {
                    //some reduce function
                })
        )// <-- Flux<Mono<PojoClassIntegerGroup>>
        .reduce(new PojoClassIntegerAllGroup(), (currentPojoClassIntegerAllGroup, newPojoClassIntegerGroupMono) -> {
                    PojoClassIntegerGroup newPojoClassIntegerGroup = newPojoClassIntegerGroupMono
                            .block();// <-- here is the DEADLOCK!
                    // do something with newPojoClassIntegerGroup for reduce
                }
        )
        .subscribe(//do something,
                throwable -> {
                    //throw the problem
                },
                () -> log.info("SUCCESS")
        );

So, at the point of newPojoClassIntegerGroupMono.block() of my example, I got stack of JVM awaiting for something at Unsafe::park method.

Continuous awaiting at Unsafe::park method.
Continuous awaiting at Unsafe::park method.

And it happens nothing after awaiting for some time - the debug point not moving at all. For current example it was used next version of reactor-core dependency in pom.xml:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.10.RELEASE</version>
</dependency>

Here you can find the link to registered issue at GitHub for reactor-core:

https://github.com/reactor/reactor-core/issues/1651

But how can we solve our case now?

We can use Flux::flatMap instead of Flux::map function after groupBy. And then our code example become like below - it works!

pojoClassFlux
        .groupBy(PojoClass::getInteger)
        .flatMap(pojoClassIntegerGroupedFlux -> pojoClassIntegerGroupedFlux // <-- flatMap here
                .map(pojoClass -> {
                    //some mapping
                })
                .reduce(new PojoClassIntegerGroup(), (currentPojoClassIntegerGroup, newSourceFilterToColumn) -> {
                    //some reduce function
                })
        )// <-- Flux<PojoClassIntegerGroup> here after flatMap
        .reduce(new PojoClassIntegerAllGroup(), (currentPojoClassIntegerAllGroup, newPojoClassIntegerGroup) -> {
                    //do something with the newPojoClassIntegerGroup for reduce -- no need for Mono::block!
                }
        )
        .subscribe(//do something with final PojoClassIntegerAllGroup,
                throwable -> {
                    //throw the problem
                },
                () -> log.info("SUCCESS")
        );

Using Flux::flatMap here, we will remove passing of unnecessary Mono inside of Flux at final reduce step - it will be just Flux<PojoClassIntegerGroup> - what we need!

At least if there is a need to work with the result - it can be used Mono::subscribe to set some AtomicReference field with the result of Mono, instead of Mono::block directly. See example below:

AtomicReference<MyBestPojoClass> myBestPojoClassHolder = new AtomicReference<>();
pojoClassMono.subscribe(myBestPojoClassHolder::set);

It is better not to call blocking operation in general and Mono::block particularly at all when working with Spring Reactor. This example demonstrates how it could lead to deadlock. In this case you can see how Flux::flatMap removes Mono inside of Flux<Mono<PojoClassIntegerGroup>> - and we can work directly with returned type without calling blocking method. If very needed to extract the result of Mono then try Mono::subscribe to set the field as described above in this article. If you have your own recipe of how to avoid call to block in some cases with Reactor - please feel free to write comments here.

If you have what to say about the topic and want to give negative rating - please feel free to write your comments to understand your opinion. Otherwise it is not serious and it is not clear why you vote negatively.

And of course If you like this article, please vote UP - this will support me to write more such posts with real code examples!

Tags:
Hubs:
Rating0
Comments0

Articles