Skip to content

Commit

Permalink
fix #1056 doOnEach correctly passes itself to onSubscribe
Browse files Browse the repository at this point in the history
This commit ensures that the FluxDoOnEach DoOnEachSubscriber correctly
passes itself to actual.onSubscribe rather than its own upstream
Subscription, thus ensuring that no short-circuiting fusion is
established between the downstream and upstream.
  • Loading branch information
simonbasle committed Feb 3, 2018
1 parent 0fc43f3 commit a5647fc
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void cancel() {
@Override
public void onSubscribe(Subscription s) {
this.s = s;
actual.onSubscribe(s);
actual.onSubscribe(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

Expand All @@ -29,6 +30,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;
import reactor.test.subscriber.AssertSubscriber;
Expand Down Expand Up @@ -76,6 +78,26 @@ else if (s.isOnComplete()) {
Assert.assertEquals(2, state.intValue());
}


//see https://github.com/reactor/reactor-core/issues/1056
@Test
public void fusion() {
AtomicInteger invocationCount = new AtomicInteger();

Flux<String> sourceWithFusionAsync = Flux.just("foo")
.publishOn(Schedulers.elastic())
.flatMap(v -> Flux.just("flatMap_" + v)
.doOnEach(sig -> invocationCount.incrementAndGet())
);

StepVerifier.create(sourceWithFusionAsync)
.expectNoFusionSupport()
.expectNext("flatMap_foo")
.verifyComplete();

assertThat(invocationCount).as("doOnEach invoked").hasValue(2);
}

@Test
public void error() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Expand Down

0 comments on commit a5647fc

Please sign in to comment.