You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
FlatMapDelayError does not propagate an onError signal when a callable source produces an error.
It also does not cancel the upstream subscription. This results in a hanging flux that can never complete because it
does not process any more values from the upstream publisher nor does it signal a termination to downstream subscribers.
Notably this only occurs when the error is from a callable source. When the mapping function throws the error directly
then the behavior is as expected where the backlog is consumed and the flux terminates with an error. However when the
operator is modified with onErrorConsume the error is properly swallowed and processing continues.
This does not affect concatMapDelayError as far as I have been able to test
Expected Behavior
FlatMapDelayError should cancel the upstream subscription, process the backlog, then emit an onError response
Actual Behavior
FlatMapDelayError never completes when callable source throws an error
Additionally it appears that this is caused by some optimization around the flatMap operator as there is a current test which utilizes .hide() on the error flux and completes successfully
linked here
A difference when using hide though is that the upstream subscription is still never cancelled so there is still a deviation of behavior from the thrown case as this would cause hot infinite publishers to never propagate the error and continue requesting events.
Steps to Reproduce
Below are examples for the direct throw vs callable source
@TestvoidworkingFlatMapDelayError() {
Flux.just(0, 1, 2, 3).log()
.flatMapDelayError(integer -> {
thrownewRuntimeException(); // Cancels upstream subscription after consuming one event
}, 1, 1)
.as(StepVerifier::create)
.expectError()
.verify(Duration.ofSeconds(1)); // Completes as expected
}
@TestvoidhangingFlatMapDelayError() {
Flux.just(0, 1, 2, 3).log()
.flatMapDelayError(integer -> {
returnFlux.error(newRuntimeException()); // Does not cancel upstream subscription
}, 1, 1)
.as(StepVerifier::create)
.expectError()
.verify(Duration.ofSeconds(1)); // Triggers timeout
}
@TestvoiddeoptimizedFlatMapDelayError() {
Flux.just(0, 1, 2, 3).log()
.flatMapDelayError(integer -> {
returnFlux.error(newRuntimeException()).hide(); // Does not cancel upstream subscription
}, 1, 1)
.as(StepVerifier::create)
.expectError()
.verify(Duration.ofSeconds(1)); // Completes after consuming all events
}
FlatMapDelayError does not propagate an onError signal when a callable source produces an error.
It also does not cancel the upstream subscription. This results in a hanging flux that can never complete because it
does not process any more values from the upstream publisher nor does it signal a termination to downstream subscribers.
Notably this only occurs when the error is from a callable source. When the mapping function throws the error directly
then the behavior is as expected where the backlog is consumed and the flux terminates with an error. However when the
operator is modified with onErrorConsume the error is properly swallowed and processing continues.
This does not affect concatMapDelayError as far as I have been able to test
Expected Behavior
FlatMapDelayError should cancel the upstream subscription, process the backlog, then emit an onError response
Actual Behavior
FlatMapDelayError never completes when callable source throws an error
Possible Solution
This behavior seems to have been introduced here
af0cb62#diff-3ad99cc26a2bc1707dda2203da9a667ccf3abdd52aac45ee6b2c4cea438a842fR361-R363.
However I do not know the side effects well enough to give a full solution.
Additionally it appears that this is caused by some optimization around the flatMap operator as there is a current test which utilizes .hide() on the error flux and completes successfully
linked here
reactor-core/reactor-core/src/test/java/reactor/core/publisher/FluxFlatMapTest.java
Line 604 in 912441f
A difference when using hide though is that the upstream subscription is still never cancelled so there is still a deviation of behavior from the thrown case as this would cause hot infinite publishers to never propagate the error and continue requesting events.
Steps to Reproduce
Below are examples for the direct throw vs callable source
Your Environment
netty
, ...): N/Ajava -version
): 17uname -a
): Windows 11The text was updated successfully, but these errors were encountered: