Skip to content

Commit

Permalink
Polishing contribution
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev authored and lxbzmy committed Mar 26, 2022
1 parent 7d72157 commit 1095da8
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import kotlinx.coroutines.CompletableDeferredKt;
import kotlinx.coroutines.Deferred;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -85,7 +83,7 @@ public class ReactiveAdapterRegistry {
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader);
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Uni", classLoader);
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader);
}

private final List<ReactiveAdapter> adapters = new ArrayList<>();
Expand Down Expand Up @@ -427,19 +425,24 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
}
}


private static class MutinyRegistrar {

void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Uni.class, ()-> Uni.createFrom().nothing()),
uni ->((Uni<?>)uni).convert().toPublisher(),
publisher -> Uni.createFrom().publisher(publisher)
ReactiveTypeDescriptor.singleOptionalValue(
io.smallrye.mutiny.Uni.class,
() -> io.smallrye.mutiny.Uni.createFrom().nothing()),
uni -> ((io.smallrye.mutiny.Uni<?>) uni).convert().toPublisher(),
publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher)
);

registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Multi.class, ()-> Multi.createFrom().empty()),
multi -> (Multi<?>) multi,
publisher-> Multi.createFrom().publisher(publisher)
ReactiveTypeDescriptor.multiValue(
io.smallrye.mutiny.Multi.class,
() -> io.smallrye.mutiny.Multi.createFrom().empty()),
multi -> (io.smallrye.mutiny.Multi<?>) multi,
publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
@SuppressWarnings("unchecked")
class ReactiveAdapterRegistryTests {

private static final Duration ONE_SECOND = Duration.ofSeconds(1);


private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();


Expand Down Expand Up @@ -85,15 +88,15 @@ void toFlux() {
Publisher<Integer> source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
Object target = getAdapter(Flux.class).fromPublisher(source);
assertThat(target instanceof Flux).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}

@Test
void toMono() {
Publisher<Integer> source = io.reactivex.rxjava3.core.Flowable.fromArray(1, 2, 3);
Object target = getAdapter(Mono.class).fromPublisher(source);
assertThat(target instanceof Mono).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}

@Test
Expand All @@ -110,7 +113,7 @@ void fromCompletableFuture() {
future.complete(1);
Object target = getAdapter(CompletableFuture.class).toPublisher(future);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
}

Expand Down Expand Up @@ -155,23 +158,23 @@ void fromObservable() {
Object source = rx.Observable.from(sequence);
Object target = getAdapter(rx.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}

@Test
void fromSingle() {
Object source = rx.Single.just(1);
Object target = getAdapter(rx.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}

@Test
void fromCompletable() {
Object source = rx.Completable.complete();
Object target = getAdapter(rx.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
((Mono<Void>) target).block(ONE_SECOND);
}
}

Expand Down Expand Up @@ -229,7 +232,7 @@ void fromFlowable() {
Object source = io.reactivex.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}

@Test
Expand All @@ -238,23 +241,23 @@ void fromObservable() {
Object source = io.reactivex.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}

@Test
void fromSingle() {
Object source = io.reactivex.Single.just(1);
Object target = getAdapter(io.reactivex.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}

@Test
void fromCompletable() {
Object source = io.reactivex.Completable.complete();
Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
((Mono<Void>) target).block(ONE_SECOND);
}
}

Expand Down Expand Up @@ -312,7 +315,7 @@ void fromFlowable() {
Object source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}

@Test
Expand All @@ -321,23 +324,23 @@ void fromObservable() {
Object source = io.reactivex.rxjava3.core.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}

@Test
void fromSingle() {
Object source = io.reactivex.rxjava3.core.Single.just(1);
Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}

@Test
void fromCompletable() {
Object source = io.reactivex.rxjava3.core.Completable.complete();
Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
((Mono<Void>) target).block(ONE_SECOND);
}
}

Expand All @@ -359,7 +362,6 @@ void deferred() {
}
}

// SmallRye Mutiny
@Nested
class Mutiny {

Expand All @@ -374,15 +376,15 @@ void toUni() {
Publisher<Integer> source = Mono.just(1);
Object target = getAdapter(Uni.class).fromPublisher(source);
assertThat(target).isInstanceOf(Uni.class);
assertThat(((Uni<Integer>) target).await().atMost(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Uni<Integer>) target).await().atMost(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}

@Test
void fromUni() {
Uni<Integer> source = Uni.createFrom().item(1);
Object target = getAdapter(Uni.class).toPublisher(source);
assertThat(target).isInstanceOf(Mono.class);
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}

@Test
Expand All @@ -391,7 +393,7 @@ void toMulti() {
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(Multi.class).fromPublisher(source);
assertThat(target).isInstanceOf(Multi.class);
assertThat(((Multi<Integer>) target).collect().asList().await().atMost(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Multi<Integer>) target).collect().asList().await().atMost(ONE_SECOND)).isEqualTo(sequence);
}

@Test
Expand All @@ -400,7 +402,7 @@ void fromMulti() {
Multi<Integer> source = Multi.createFrom().iterable(sequence);
Object target = getAdapter(Multi.class).toPublisher(source);
assertThat(target).isInstanceOf(Flux.class);
assertThat(((Flux<Integer>) target).blockLast(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(3));
assertThat(((Flux<Integer>) target).blockLast(ONE_SECOND)).isEqualTo(Integer.valueOf(3));
}

}
Expand Down

0 comments on commit 1095da8

Please sign in to comment.