diff --git a/src/main/java/io/primeval/codex/promise/PromiseHelper.java b/src/main/java/io/primeval/codex/promise/PromiseHelper.java index 3ce967c..a94b007 100644 --- a/src/main/java/io/primeval/codex/promise/PromiseHelper.java +++ b/src/main/java/io/primeval/codex/promise/PromiseHelper.java @@ -1,20 +1,26 @@ package io.primeval.codex.promise; import java.lang.reflect.InvocationTargetException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.Collectors; import org.osgi.util.promise.Deferred; import org.osgi.util.promise.FailedPromisesException; import org.osgi.util.promise.Promise; import org.osgi.util.promise.Promises; +import io.primeval.codex.util.Procedure; import io.primeval.common.function.FallibleFunction; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; @@ -53,6 +59,13 @@ public static Promise wrap(Callable callable) { return wrap(Function.identity(), callable); } + public static Promise wrap(Procedure procedure) { + return wrap(Function.identity(), () -> { + procedure.call(); + return null; + }); + } + public static Promise wrap(Function wrapException, Callable callable) { try { return Promises.resolved(callable.call()); @@ -168,10 +181,26 @@ public static Mono toMono(Promise promise) { return monoP; } + public static Mono toMonoOptional(Promise> promise) { + MonoProcessor monoP = MonoProcessor.create(); + onResolve(promise, success -> { + if (success.isPresent()) { + monoP.onNext(success.get()); + } else { + monoP.onComplete(); + } + }, error -> monoP.onError(error)); + return monoP; + } + public static Mono toMono(Callable> promise) { return Mono.fromCallable(promise).then(PromiseHelper::toMono); } + public static Mono toMonoOptional(Callable>> promise) { + return Mono.fromCallable(promise).then(PromiseHelper::toMonoOptional); + } + public static Promise fromMono(Mono mono) { Deferred deferred = new Deferred<>(); mono.doOnTerminate((value, error) -> { @@ -184,6 +213,29 @@ public static Promise fromMono(Mono mono) { return deferred.getPromise(); } + public static Promise> fromMonoOptional(Mono mono) { + return fromMono(mono).map(Optional::ofNullable); + } + + public static Collector, ?, Promise>> collector() { + return Collector., ArrayList>, Promise>> of( + ArrayList::new, + ArrayList::add, + (left, right) -> { + left.addAll(right); + return left; + }, + builder -> Promises.all(builder)); + } + + public static Promise> allMap(Map> input) { + Promise>> pms = Promises + .all(input.entrySet() + .stream().map(e -> e.getValue().map(p -> new AbstractMap.SimpleImmutableEntry<>(e.getKey(), p))) + .collect(Collectors.toList())); + return pms.map(l -> l.stream().collect(Collectors., T, U> toMap(e -> e.getKey(), e -> e.getValue()))); + } + } final class AllSuccessful implements Runnable {