Skip to content

Commit

Permalink
Merge #3939 into 3.7.1
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Nov 21, 2024
2 parents 1e98ddb + a7fe500 commit da340cd
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -716,7 +716,7 @@ public final void subscribe(CoreSubscriber<? super T> inboundSubscriber) {
return;
}

if (hasInboundClosedPrematurely(previousState)) {
if (hasInboundCancelled(previousState) ||hasInboundClosedPrematurely(previousState)) {
Operators.error(inboundSubscriber, new CancellationException("FluxSwitchOnFirst has already been cancelled"));
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,7 @@
import reactor.core.scheduler.Schedulers;
import reactor.test.MockUtils;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;
import reactor.test.publisher.TestPublisher;
import reactor.test.scheduler.VirtualTimeScheduler;
import reactor.test.util.RaceTestUtils;
Expand Down Expand Up @@ -1386,6 +1387,35 @@ public void shouldCancelSourceOnOnDownstreamTerminal() {
assertThat(testPublisher.wasCancelled()).isTrue();
}

@Test
void shouldErrorWhenResubscribingAfterCancel() {
TestPublisher<Long> testPublisher = TestPublisher.create();

AtomicReference<Flux<Long>> intercepted = new AtomicReference<>();

Flux<Flux<Long>> flux =
testPublisher.flux().switchOnFirst((s, f) -> {
intercepted.set(f);
// Due to wrapping we can avoid subscribing to f and subscribe later
// (re-subscribing is not allowed)
return Mono.just(f);
});

StepVerifier.create(flux)
.expectSubscription()
.then(() -> testPublisher.next(1L))
.thenCancel()
.verify(Duration.ofSeconds(2));

Flux<Long> switchOnFirstMain = intercepted.get();

StepVerifier.create(switchOnFirstMain,
StepVerifierOptions.create()
.scenarioName("Expect immediate onError from SwitchOnFirstMain as the outer operator has completed"))
.expectError(IllegalStateException.class)
.verify(Duration.ofSeconds(1));
}

@Test
public void scanOperator(){
Flux<Integer> parent = Flux.just(1);
Expand Down

0 comments on commit da340cd

Please sign in to comment.