From 4b7772cb3eca154aaa486ff487dca34a1b882345 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 30 Apr 2013 21:47:52 -0700 Subject: [PATCH] Observable.toFuture --- rxjava-core/src/main/java/rx/Observable.java | 51 ++++-- .../java/rx/operators/OperationToFuture.java | 167 ++++++++++++++++++ 2 files changed, 208 insertions(+), 10 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationToFuture.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 48a00638a5..5bcfa312fe 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -42,9 +42,9 @@ import rx.operators.OperationConcat; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; -import rx.operators.OperationGroupBy; import rx.operators.OperationFilter; import rx.operators.OperationFinally; +import rx.operators.OperationGroupBy; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; @@ -64,6 +64,7 @@ import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; +import rx.operators.OperationToFuture; import rx.operators.OperationToIterator; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -590,9 +591,11 @@ public void call(Object args) { /** * Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. - * - * @param subject the subject to push source elements into. - * @param result type + * + * @param subject + * the subject to push source elements into. + * @param + * result type * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. */ public ConnectableObservable multicast(Subject subject) { @@ -2005,6 +2008,19 @@ public Boolean call(T t, Integer integer) })); } + /** + * Return a Future representing a single value of the Observable. + *

+ * This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). + * + * @param that + * the source Observable + * @returna Future that expects a single item emitted by the source Observable + */ + public static Future toFuture(final Observable that) { + return OperationToFuture.toFuture(that); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. @@ -2088,11 +2104,15 @@ public static Iterable mostRecent(Observable source, T initialValue) { /** * Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. - * - * @param source the source sequence whose elements will be pushed into the specified subject. - * @param subject the subject to push source elements into. - * @param source type - * @param result type + * + * @param source + * the source sequence whose elements will be pushed into the specified subject. + * @param subject + * the subject to push source elements into. + * @param + * source type + * @param + * result type * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. */ public static ConnectableObservable multicast(Observable source, final Subject subject) { @@ -2101,7 +2121,7 @@ public static ConnectableObservable multicast(Observable source, fi /** * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. - * + * * @param that * the source Observable * @return The single element in the observable sequence. @@ -3360,6 +3380,17 @@ public Observable takeUntil(Observable other) { return takeUntil(this, other); } + /** + * Return a Future representing a single value of the Observable. + *

+ * This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). + * + * @returna Future that expects a single item emitted by the source Observable + */ + public Future toFuture() { + return toFuture(this); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationToFuture.java b/rxjava-core/src/main/java/rx/operators/OperationToFuture.java new file mode 100644 index 0000000000..89b2bc294e --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationToFuture.java @@ -0,0 +1,167 @@ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +/** + * Convert an Observable into a Future. + */ +public class OperationToFuture { + + /** + * Returns a Future that expects a single item from the observable. + * + * @param that + * an observable sequence to get a Future for. + * @param + * the type of source. + * @return the Future to retrieve a single elements from an Observable + */ + public static Future toFuture(Observable that) { + + final CountDownLatch finished = new CountDownLatch(1); + final AtomicReference value = new AtomicReference(); + final AtomicReference error = new AtomicReference(); + + final Subscription s = that.subscribe(new Observer() { + + @Override + public void onCompleted() { + finished.countDown(); + } + + @Override + public void onError(Exception e) { + error.compareAndSet(null, e); + finished.countDown(); + } + + @Override + public void onNext(T v) { + if (!value.compareAndSet(null, v)) { + // this means we received more than one value and must fail as a Future can handle only a single value + error.compareAndSet(null, new IllegalStateException("Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected.")); + finished.countDown(); + } + } + }); + + return new Future() { + + private volatile boolean cancelled = false; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (finished.getCount() > 0) { + cancelled = true; + s.unsubscribe(); + // release the latch (a race condition may have already released it by now) + finished.countDown(); + return true; + } else { + // can't cancel + return false; + } + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return finished.getCount() == 0; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + finished.await(); + return getValue(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (finished.await(timeout, unit)) { + return getValue(); + } else { + throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable."); + } + } + + private T getValue() throws ExecutionException { + if (error.get() != null) { + throw new ExecutionException("Observable onError", error.get()); + } else { + return value.get(); + } + } + + }; + + } + + @Test + public void testToFuture() throws InterruptedException, ExecutionException { + Observable obs = Observable.toObservable("one"); + Future f = toFuture(obs); + assertEquals("one", f.get()); + } + + @Test + public void testToFutureList() throws InterruptedException, ExecutionException { + Observable obs = Observable.toObservable("one", "two", "three"); + Future> f = toFuture(obs.toList()); + assertEquals("one", f.get().get(0)); + assertEquals("two", f.get().get(1)); + assertEquals("three", f.get().get(2)); + } + + @Test(expected = ExecutionException.class) + public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException { + Observable obs = Observable.toObservable("one", "two"); + Future f = toFuture(obs); + assertEquals("one", f.get()); + // we expect an exception since there are more than 1 element + } + + @Test + public void testToFutureWithException() { + Observable obs = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onError(new TestException()); + return Subscriptions.empty(); + } + }); + + Future f = toFuture(obs); + try { + f.get(); + fail("expected exception"); + } catch (Exception e) { + assertEquals(TestException.class, e.getCause().getClass()); + } + } + + private static class TestException extends RuntimeException { + private static final long serialVersionUID = 1L; + } +}