From 1095da8dfb97acc3ae24a89adf1584a299b6c50d Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 8 Sep 2021 16:59:28 +0100 Subject: [PATCH] Polishing contribution See gh-27331 --- .../core/ReactiveAdapterRegistry.java | 21 +++++----- .../core/ReactiveAdapterRegistryTests.java | 40 ++++++++++--------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index 4cbb90c0c606..cfeaffcaf734 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -24,8 +24,6 @@ import java.util.concurrent.CompletionStage; import java.util.function.Function; -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; import kotlinx.coroutines.CompletableDeferredKt; import kotlinx.coroutines.Deferred; import org.reactivestreams.Publisher; @@ -85,7 +83,7 @@ public class ReactiveAdapterRegistry { rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader); flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader); kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader); - mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Uni", classLoader); + mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader); } private final List adapters = new ArrayList<>(); @@ -427,19 +425,24 @@ void registerAdapters(ReactiveAdapterRegistry registry) { } } + private static class MutinyRegistrar { void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(Uni.class, ()-> Uni.createFrom().nothing()), - uni ->((Uni)uni).convert().toPublisher(), - publisher -> Uni.createFrom().publisher(publisher) + ReactiveTypeDescriptor.singleOptionalValue( + io.smallrye.mutiny.Uni.class, + () -> io.smallrye.mutiny.Uni.createFrom().nothing()), + uni -> ((io.smallrye.mutiny.Uni) uni).convert().toPublisher(), + publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher) ); registry.registerReactiveType( - ReactiveTypeDescriptor.multiValue(Multi.class, ()-> Multi.createFrom().empty()), - multi -> (Multi) multi, - publisher-> Multi.createFrom().publisher(publisher) + ReactiveTypeDescriptor.multiValue( + io.smallrye.mutiny.Multi.class, + () -> io.smallrye.mutiny.Multi.createFrom().empty()), + multi -> (io.smallrye.mutiny.Multi) multi, + publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher) ); } } diff --git a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java index 535891e6ff3e..579e1a463afc 100644 --- a/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java +++ b/spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java @@ -40,6 +40,9 @@ @SuppressWarnings("unchecked") class ReactiveAdapterRegistryTests { + private static final Duration ONE_SECOND = Duration.ofSeconds(1); + + private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); @@ -85,7 +88,7 @@ void toFlux() { Publisher source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence); Object target = getAdapter(Flux.class).fromPublisher(source); assertThat(target instanceof Flux).isTrue(); - assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence); + assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @Test @@ -93,7 +96,7 @@ void toMono() { Publisher source = io.reactivex.rxjava3.core.Flowable.fromArray(1, 2, 3); Object target = getAdapter(Mono.class).fromPublisher(source); assertThat(target instanceof Mono).isTrue(); - assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } @Test @@ -110,7 +113,7 @@ void fromCompletableFuture() { future.complete(1); Object target = getAdapter(CompletableFuture.class).toPublisher(future); assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } } @@ -155,7 +158,7 @@ void fromObservable() { Object source = rx.Observable.from(sequence); Object target = getAdapter(rx.Observable.class).toPublisher(source); assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence); + assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @Test @@ -163,7 +166,7 @@ void fromSingle() { Object source = rx.Single.just(1); Object target = getAdapter(rx.Single.class).toPublisher(source); assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } @Test @@ -171,7 +174,7 @@ void fromCompletable() { Object source = rx.Completable.complete(); Object target = getAdapter(rx.Completable.class).toPublisher(source); assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); - ((Mono) target).block(Duration.ofMillis(1000)); + ((Mono) target).block(ONE_SECOND); } } @@ -229,7 +232,7 @@ void fromFlowable() { Object source = io.reactivex.Flowable.fromIterable(sequence); Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source); assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence); + assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @Test @@ -238,7 +241,7 @@ void fromObservable() { Object source = io.reactivex.Observable.fromIterable(sequence); Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source); assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence); + assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @Test @@ -246,7 +249,7 @@ void fromSingle() { Object source = io.reactivex.Single.just(1); Object target = getAdapter(io.reactivex.Single.class).toPublisher(source); assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } @Test @@ -254,7 +257,7 @@ void fromCompletable() { Object source = io.reactivex.Completable.complete(); Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source); assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); - ((Mono) target).block(Duration.ofMillis(1000)); + ((Mono) target).block(ONE_SECOND); } } @@ -312,7 +315,7 @@ void fromFlowable() { Object source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence); Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).toPublisher(source); assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence); + assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @Test @@ -321,7 +324,7 @@ void fromObservable() { Object source = io.reactivex.rxjava3.core.Observable.fromIterable(sequence); Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).toPublisher(source); assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Flux) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence); + assertThat(((Flux) target).collectList().block(ONE_SECOND)).isEqualTo(sequence); } @Test @@ -329,7 +332,7 @@ void fromSingle() { Object source = io.reactivex.rxjava3.core.Single.just(1); Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).toPublisher(source); assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); - assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } @Test @@ -337,7 +340,7 @@ void fromCompletable() { Object source = io.reactivex.rxjava3.core.Completable.complete(); Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).toPublisher(source); assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue(); - ((Mono) target).block(Duration.ofMillis(1000)); + ((Mono) target).block(ONE_SECOND); } } @@ -359,7 +362,6 @@ void deferred() { } } - // SmallRye Mutiny @Nested class Mutiny { @@ -374,7 +376,7 @@ void toUni() { Publisher source = Mono.just(1); Object target = getAdapter(Uni.class).fromPublisher(source); assertThat(target).isInstanceOf(Uni.class); - assertThat(((Uni) target).await().atMost(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + assertThat(((Uni) target).await().atMost(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } @Test @@ -382,7 +384,7 @@ void fromUni() { Uni source = Uni.createFrom().item(1); Object target = getAdapter(Uni.class).toPublisher(source); assertThat(target).isInstanceOf(Mono.class); - assertThat(((Mono) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1)); + assertThat(((Mono) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1)); } @Test @@ -391,7 +393,7 @@ void toMulti() { Publisher source = Flux.fromIterable(sequence); Object target = getAdapter(Multi.class).fromPublisher(source); assertThat(target).isInstanceOf(Multi.class); - assertThat(((Multi) target).collect().asList().await().atMost(Duration.ofMillis(1000))).isEqualTo(sequence); + assertThat(((Multi) target).collect().asList().await().atMost(ONE_SECOND)).isEqualTo(sequence); } @Test @@ -400,7 +402,7 @@ void fromMulti() { Multi source = Multi.createFrom().iterable(sequence); Object target = getAdapter(Multi.class).toPublisher(source); assertThat(target).isInstanceOf(Flux.class); - assertThat(((Flux) target).blockLast(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(3)); + assertThat(((Flux) target).blockLast(ONE_SECOND)).isEqualTo(Integer.valueOf(3)); } }