diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchOnFirst.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchOnFirst.java index 45526259f0..77ea731930 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchOnFirst.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchOnFirst.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -716,7 +716,7 @@ public final void subscribe(CoreSubscriber inboundSubscriber) { return; } - if (hasInboundClosedPrematurely(previousState)) { + if (hasInboundCancelled(previousState) ||hasInboundClosedPrematurely(previousState)) { Operators.error(inboundSubscriber, new CancellationException("FluxSwitchOnFirst has already been cancelled")); return; } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxSwitchOnFirstTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxSwitchOnFirstTest.java index db6cffc33b..b3e5a20bf2 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxSwitchOnFirstTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxSwitchOnFirstTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,6 +45,7 @@ import reactor.core.scheduler.Schedulers; import reactor.test.MockUtils; import reactor.test.StepVerifier; +import reactor.test.StepVerifierOptions; import reactor.test.publisher.TestPublisher; import reactor.test.scheduler.VirtualTimeScheduler; import reactor.test.util.RaceTestUtils; @@ -1386,6 +1387,35 @@ public void shouldCancelSourceOnOnDownstreamTerminal() { assertThat(testPublisher.wasCancelled()).isTrue(); } + @Test + void shouldErrorWhenResubscribingAfterCancel() { + TestPublisher testPublisher = TestPublisher.create(); + + AtomicReference> intercepted = new AtomicReference<>(); + + Flux> flux = + testPublisher.flux().switchOnFirst((s, f) -> { + intercepted.set(f); + // Due to wrapping we can avoid subscribing to f and subscribe later + // (re-subscribing is not allowed) + return Mono.just(f); + }); + + StepVerifier.create(flux) + .expectSubscription() + .then(() -> testPublisher.next(1L)) + .thenCancel() + .verify(Duration.ofSeconds(2)); + + Flux switchOnFirstMain = intercepted.get(); + + StepVerifier.create(switchOnFirstMain, + StepVerifierOptions.create() + .scenarioName("Expect immediate onError from SwitchOnFirstMain as the outer operator has completed")) + .expectError(IllegalStateException.class) + .verify(Duration.ofSeconds(1)); + } + @Test public void scanOperator(){ Flux parent = Flux.just(1);