Skip to content

Commit

Permalink
fix #743 Let Mono#fromCallable/Supplier map null result to empty Mono
Browse files Browse the repository at this point in the history
Also fix a few missing @nullable and javadocs

Root cause was actually fusionState not being updated to COMPLETE in
the null case of FluxSubscribeOnCallable
  • Loading branch information
simonbasle authored Sep 15, 2017
1 parent 45b3e0a commit f7a2e00
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package reactor.core.publisher;

import java.util.Objects;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
Expand All @@ -40,19 +40,22 @@ public void subscribe(CoreSubscriber<? super T> actual) {
Operators.MonoSubscriber<T, T> wrapper = new Operators.MonoSubscriber<>(actual);
actual.onSubscribe(wrapper);

T v;
try {
v = Objects.requireNonNull(callable.call(), "callable returned null");
T v = callable.call();
if (v == null) {
wrapper.onComplete();
}
else {
wrapper.complete(v);
}
}
catch (Throwable ex) {
actual.onError(Operators.onOperatorError(ex, actual.currentContext()));
return;
}

wrapper.complete(v);
}

@Override
@Nullable
public T call() throws Exception {
return callable.call();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,17 @@ public void run() {
return;
}

if (v == null) {
fusionState = COMPLETE;
actual.onComplete();
return;
}

for (; ; ) {
int s = state;
if (s == HAS_CANCELLED || s == HAS_REQUEST_HAS_VALUE || s == NO_REQUEST_HAS_VALUE) {
return;
}
if(v == null){
actual.onComplete();
return;
}
if (s == HAS_REQUEST_NO_VALUE) {
if (fusionState == NO_VALUE) {
this.value = v;
Expand Down
6 changes: 4 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ public static <T> Mono<T> from(Publisher<? extends T> source) {
}

/**
* Create a {@link Mono} producing its value using the provided {@link Callable}.
* Create a {@link Mono} producing its value using the provided {@link Callable}. If
* the Callable resolves to {@code null}, the resulting Mono completes empty.
*
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.RC1/src/docs/marble/fromcallable.png" alt="">
Expand Down Expand Up @@ -403,7 +404,8 @@ public static <T> Mono<T> fromRunnable(Runnable runnable) {
}

/**
* Create a {@link Mono}, producing its value using the provided {@link Supplier}.
* Create a {@link Mono}, producing its value using the provided {@link Supplier}. If
* the Supplier resolves to {@code null}, the resulting Mono completes empty.
*
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.0.RC1/src/docs/marble/fromsupplier.png" alt="">
Expand Down
25 changes: 13 additions & 12 deletions reactor-core/src/main/java/reactor/core/publisher/MonoCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;

import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
Expand All @@ -43,7 +44,6 @@ final class MonoCallable<T>

@Override
public void subscribe(CoreSubscriber<? super T> actual) {

Operators.MonoSubscriber<T, T>
sds = new Operators.MonoSubscriber<>(actual);

Expand All @@ -53,41 +53,42 @@ public void subscribe(CoreSubscriber<? super T> actual) {
return;
}

T t;
try {
t = Objects.requireNonNull(callable.call(), "callable returned null");
T t = callable.call();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(e, actual.currentContext()));
return;
}

sds.complete(t);
}

@Override
@Nullable
public T block() {
//duration is ignored below
return block(Duration.ZERO);
}

@Override
@Nullable
public T block(Duration m) {
try {
return Objects.requireNonNull(callable.call(),
"The callable source returned null");
return callable.call();
}
catch (Throwable e) {
if (e instanceof RuntimeException) {
throw (RuntimeException)e;
}
throw Exceptions.propagate(e);
}
}

@Override
@Nullable
public T call() throws Exception {
return Objects.requireNonNull(callable.call(),
"The callable source returned null");
return callable.call();
}
}
25 changes: 14 additions & 11 deletions reactor-core/src/main/java/reactor/core/publisher/MonoSupplier.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.concurrent.Callable;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;

Expand All @@ -41,7 +43,6 @@ final class MonoSupplier<T>

@Override
public void subscribe(CoreSubscriber<? super T> actual) {

Operators.MonoSubscriber<T, T>
sds = new Operators.MonoSubscriber<>(actual);

Expand All @@ -51,34 +52,36 @@ public void subscribe(CoreSubscriber<? super T> actual) {
return;
}

T t;
try {
t = Objects.requireNonNull(supplier.get(),
"The supplier source returned null");
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(e, actual.currentContext()));
return;
}

sds.complete(t);
}

@Override
@Nullable
public T block(Duration m) {
return Objects.requireNonNull(supplier.get(),
"The supplier source returned null");
return supplier.get();
}

@Override
@Nullable
public T block() {
//the duration is ignored above
return block(Duration.ZERO);
}

@Override
@Nullable
public T call() throws Exception {
return Objects.requireNonNull(supplier.get(),
"The supplier source returned null");
return supplier.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ public class FluxCallableTest {
public void callableReturnsNull() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();

Mono.<Integer>fromCallable(() -> null).flux()
Mono.<Integer>fromCallable(() -> null).log().flux()
.subscribe(ts);

ts.assertNoValues()
.assertNotComplete()
.assertError(NullPointerException.class);
.assertNoError()
.assertComplete();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,20 @@ protected List<Scenario<String, String>> scenarios_operatorSuccess() {
scenario(f -> f.concatMapDelayError(d -> Flux.empty(), true, 32))
.receiverEmpty(),

scenario(f -> f.concatMap(Flux::just, 1)).prefetch(1)
scenario(f -> f.concatMap(Flux::just, 1)).prefetch(1),

//scenarios with fromCallable(() -> null)
scenario(f -> f.concatMap(d -> Mono.fromCallable(() -> null)))
.receiverEmpty(),
scenario(f -> f.concatMap(d -> Mono.fromCallable(() -> null), 1))
.prefetch(1)
.receiverEmpty(),
scenario(f -> f.concatMapDelayError(d -> Mono.fromCallable(() -> null)))
.shouldHitDropErrorHookAfterTerminate(true)
.receiverEmpty(),
scenario(f -> f.concatMapDelayError(d -> Mono.fromCallable(() -> null), true, 32))
.shouldHitDropErrorHookAfterTerminate(true)
.receiverEmpty()
);
}

Expand All @@ -94,20 +107,13 @@ protected List<Scenario<String, String>> scenarios_operatorError() {
throw exception();
})),

scenario(f -> f.concatMap(d -> Mono.fromCallable(() -> null)))
,

scenario(f -> f.concatMap(d -> null))
,

scenario(f -> f.concatMap(d -> {
throw exception();
}, 1)).prefetch(1),

scenario(f -> f.concatMap(d -> Mono.fromCallable(() -> null), 1))
.prefetch(1)
,

scenario(f -> f.concatMap(d -> null, 1))
.prefetch(1)
,
Expand All @@ -117,10 +123,6 @@ protected List<Scenario<String, String>> scenarios_operatorError() {
}))
.shouldHitDropErrorHookAfterTerminate(true),

scenario(f -> f.concatMapDelayError(d -> Mono.fromCallable(() -> null)))
.shouldHitDropErrorHookAfterTerminate(true)
,

scenario(f -> f.concatMapDelayError(d -> null))
.shouldHitDropErrorHookAfterTerminate(true)
,
Expand All @@ -130,10 +132,6 @@ protected List<Scenario<String, String>> scenarios_operatorError() {
}, true, 32))
.shouldHitDropErrorHookAfterTerminate(true),

scenario(f -> f.concatMapDelayError(d -> Mono.fromCallable(() -> null), true, 32))
.shouldHitDropErrorHookAfterTerminate(true)
,

scenario(f -> f.concatMapDelayError(d -> null, true, 32))
.shouldHitDropErrorHookAfterTerminate(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,14 @@ public void failScalarMapCallableError() {
}

@Test
public void failMapCallableNullError() {
public void prematureScalarMapCallableNullComplete() {
StepVerifier.create(Mono.just(1)
.flatMapMany(f -> Mono.fromCallable(() -> null)))
.verifyError(NullPointerException.class);
.verifyComplete();
}

@Test
public void prematureScalarMapCallableNullComplete() {
public void prematureScalarMapCallableEmptyComplete() {
StepVerifier.create(Mono.just(1)
.flatMapMany(f -> Mono.empty()))
.verifyComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ public void callablePath() {
StepVerifier.create(Mono.fromCallable(() -> null)
.flux()
.publishOn(Schedulers.immediate()))
.verifyError(NullPointerException.class);
.verifyComplete();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ public void callableReturnsNull() {
.verifyComplete();
}

@Test
public void callableReturnsNull2() {
StepVerifier.create(Mono.fromCallable(() -> null)
.flux()
.subscribeOn(Schedulers.single()), 0)
.verifyComplete();
}

@Test
public void callableReturnsNull3() {
StepVerifier.create(Mono.fromCallable(() -> null)
.flux()
.subscribeOn(Schedulers.single()), 1)
.verifyComplete();
}

@Test
public void normal() {
StepVerifier.create(Mono.fromCallable(() -> 1)
Expand Down Expand Up @@ -86,6 +102,26 @@ public void callableReturnsNullFused() {
.verifyComplete();
}

@Test
public void callableReturnsNullFused2() {
StepVerifier.create(Mono.fromCallable(() -> null)
.flux()
.subscribeOn(Schedulers.single())
.doOnNext(v -> System.out.println(v)), 1)
.expectFusion(Fuseable.ASYNC)
.thenRequest(1)
.verifyComplete();
}

@Test
public void callableReturnsNullFused3() {
StepVerifier.create(Mono.fromCallable(() -> null)
.flux()
.subscribeOn(Schedulers.single()), 0)
.expectFusion(Fuseable.ASYNC)
.verifyComplete();
}

@Test
public void normalFused() {
StepVerifier.create(Mono.fromCallable(() -> 1)
Expand Down
Loading

0 comments on commit f7a2e00

Please sign in to comment.