Skip to content
This repository has been archived by the owner on Jun 15, 2023. It is now read-only.

Commit

Permalink
Issue #13: Add utilities in PromiseHelper to deal with maps, and empt…
Browse files Browse the repository at this point in the history
…y Monos
  • Loading branch information
magnet committed Aug 20, 2017
1 parent 8d07490 commit ea280f7
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions src/main/java/io/primeval/codex/promise/PromiseHelper.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package io.primeval.codex.promise;

import java.lang.reflect.InvocationTargetException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.FailedPromisesException;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.Promises;

import io.primeval.codex.util.Procedure;
import io.primeval.common.function.FallibleFunction;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
Expand Down Expand Up @@ -53,6 +59,13 @@ public static <T> Promise<T> wrap(Callable<T> callable) {
return wrap(Function.identity(), callable);
}

public static Promise<Void> wrap(Procedure procedure) {
return wrap(Function.identity(), () -> {
procedure.call();
return null;
});
}

public static <T> Promise<T> wrap(Function<Throwable, Throwable> wrapException, Callable<T> callable) {
try {
return Promises.resolved(callable.call());
Expand Down Expand Up @@ -168,10 +181,26 @@ public static <T> Mono<T> toMono(Promise<T> promise) {
return monoP;
}

public static <T> Mono<T> toMonoOptional(Promise<Optional<T>> promise) {
MonoProcessor<T> monoP = MonoProcessor.create();
onResolve(promise, success -> {
if (success.isPresent()) {
monoP.onNext(success.get());
} else {
monoP.onComplete();
}
}, error -> monoP.onError(error));
return monoP;
}

public static <T> Mono<T> toMono(Callable<Promise<T>> promise) {
return Mono.fromCallable(promise).then(PromiseHelper::toMono);
}

public static <T> Mono<T> toMonoOptional(Callable<Promise<Optional<T>>> promise) {
return Mono.fromCallable(promise).then(PromiseHelper::toMonoOptional);
}

public static <T> Promise<T> fromMono(Mono<T> mono) {
Deferred<T> deferred = new Deferred<>();
mono.doOnTerminate((value, error) -> {
Expand All @@ -184,6 +213,29 @@ public static <T> Promise<T> fromMono(Mono<T> mono) {
return deferred.getPromise();
}

public static <T> Promise<Optional<T>> fromMonoOptional(Mono<T> mono) {
return fromMono(mono).map(Optional::ofNullable);
}

public static <T> Collector<Promise<T>, ?, Promise<List<T>>> collector() {
return Collector.<Promise<T>, ArrayList<Promise<T>>, Promise<List<T>>> of(
ArrayList::new,
ArrayList::add,
(left, right) -> {
left.addAll(right);
return left;
},
builder -> Promises.all(builder));
}

public static <T, U> Promise<Map<T, U>> allMap(Map<T, Promise<U>> input) {
Promise<List<Map.Entry<T, U>>> pms = Promises
.all(input.entrySet()
.stream().map(e -> e.getValue().map(p -> new AbstractMap.SimpleImmutableEntry<>(e.getKey(), p)))
.collect(Collectors.toList()));
return pms.map(l -> l.stream().collect(Collectors.<Map.Entry<T, U>, T, U> toMap(e -> e.getKey(), e -> e.getValue())));
}

}

final class AllSuccessful<T> implements Runnable {
Expand Down

0 comments on commit ea280f7

Please sign in to comment.