From 2ce16400ef1d82bfd08006c4431a5816c466e6fe Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Tue, 14 Feb 2023 10:04:10 +0200 Subject: [PATCH] ensures exception is caught if inner fused for `FlatMap` #3353 --- .../reactor/core/publisher/FluxFlatMap.java | 3 +- .../core/publisher/FluxFlatMapTest.java | 40 ++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java index f3959edd33..e14d6eb1fc 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -416,6 +416,7 @@ else if (!delayError || !Exceptions.addThrowable(ERROR, this, e_)) { onError(Operators.onOperatorError(s, e_, t, ctx)); } Operators.onDiscard(t, ctx); + tryEmitScalar(null); return; } tryEmitScalar(v); diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxFlatMapTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxFlatMapTest.java index 7176da5845..5997724024 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxFlatMapTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxFlatMapTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,6 @@ import reactor.core.TestLoggerExtension; import reactor.core.publisher.FluxPeekFuseableTest.AssertQueueSubscription; import reactor.core.scheduler.Schedulers; -import reactor.test.util.LoggerUtils; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; import reactor.test.subscriber.AssertSubscriber; @@ -1869,4 +1868,41 @@ public void noWrappingOfCheckedExceptions_hide() { .expectError(NoSuchMethodException.class) .verify(); } + + static class FluxFlatMapDelayError3336Test { + + @Test + void workingFlatMapDelayError() { + Flux.just(0, 1, 2, 3) + .flatMapDelayError(integer -> { + throw new RuntimeException(); // Cancels upstream subscription after consuming one event + }, 1, 1) + .as(StepVerifier::create) + .expectError() + .verify(Duration.ofSeconds(1)); // Completes as expected + } + + @Test + void hangingFlatMapDelayError() { + Flux.just(0, 1, 2, 3) + .flatMapDelayError(integer -> { + return Flux.error(new RuntimeException()); // Does not cancel upstream subscription + }, 1, 1) + .as(StepVerifier::create) + .expectError() + .verify(Duration.ofSeconds(1)); // Triggers timeout + } + + @Test + void deoptimizedFlatMapDelayError() { + Flux.just(0, 1, 2, 3) + .flatMapDelayError(integer -> { + return Flux.error(new RuntimeException()) + .hide(); // Does not cancel upstream subscription + }, 1, 1) + .as(StepVerifier::create) + .expectError() + .verify(Duration.ofSeconds(1)); // Completes after consuming all events + } + } }