diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java new file mode 100644 index 0000000000..d8fcf88b87 --- /dev/null +++ b/src/main/java/rx/Single.java @@ -0,0 +1,1858 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package rx; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import rx.Observable.Operator; +import rx.annotations.Experimental; +import rx.exceptions.Exceptions; +import rx.exceptions.OnErrorNotImplementedException; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.functions.Func3; +import rx.functions.Func4; +import rx.functions.Func5; +import rx.functions.Func6; +import rx.functions.Func7; +import rx.functions.Func8; +import rx.functions.Func9; +import rx.internal.operators.OnSubscribeToObservableFuture; +import rx.internal.operators.OperatorMap; +import rx.internal.operators.OperatorObserveOn; +import rx.internal.operators.OperatorOnErrorReturn; +import rx.internal.operators.OperatorSubscribeOn; +import rx.internal.operators.OperatorTimeout; +import rx.internal.operators.OperatorZip; +import rx.internal.producers.SingleDelayedProducer; +import rx.observers.SafeSubscriber; +import rx.plugins.RxJavaObservableExecutionHook; +import rx.plugins.RxJavaPlugins; +import rx.schedulers.Schedulers; +import rx.subscriptions.Subscriptions; + +/** + * The Single class that implements the Reactive Pattern for a single value response. See {@link Observable} for a stream or vector of values. + *

+ * This behaves the same as an {@link Observable} except that it can only emit either a single successful value, or an error. + *

+ * Like an {@link Observable} it is lazy, can be either "hot" or "cold", synchronous or asynchronous. + *

+ * The documentation for this class makes use of marble diagrams. The following legend explains these diagrams: + *

+ * + *

+ * For more information see the ReactiveX documentation. + * + * @param + * the type of the item emitted by the Single + */ +@Experimental +public class Single { + + final Observable.OnSubscribe onSubscribe; + + /** + * Creates a Single with a Function to execute when it is subscribed to (executed). + *

+ * Note: Use {@link #create(OnExecute)} to create a Single, instead of this constructor, + * unless you specifically have a need for inheritance. + * + * @param f + * {@link OnExecute} to be executed when {@link #execute(SingleSubscriber)} or {@link #subscribe(Subscriber)} is called + */ + protected Single(final OnSubscribe f) { + // bridge between OnSubscribe (which all Operators and Observables use) and OnExecute (for Single) + this.onSubscribe = new Observable.OnSubscribe() { + + @Override + public void call(final Subscriber child) { + final SingleDelayedProducer producer = new SingleDelayedProducer(child); + child.setProducer(producer); + SingleSubscriber ss = new SingleSubscriber() { + + @Override + public void onSuccess(T value) { + producer.setValue(value); + } + + @Override + public void onError(Throwable error) { + child.onError(error); + } + + }; + child.add(ss); + f.call(ss); + } + + }; + } + + private Single(final Observable.OnSubscribe f) { + this.onSubscribe = f; + } + + private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); + + /** + * Returns a Single that will execute the specified function when a {@link SingleSubscriber} executes it or {@link Subscriber} subscribes to it. + *

+ * + *

+ * Write the function you pass to {@code create} so that it behaves as a Single: It should invoke the + * SingleSubscriber {@link SingleSubscriber#onSuccess onSuccess} and {@link SingleSubscriber#onError onError} methods appropriately. + *

+ * A well-formed Single must invoke either the SingleSubscriber's {@code onSuccess} method exactly once or + * its {@code onError} method exactly once. + *

+ *

+ *
Scheduler:
+ *
{@code create} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of the item that this Single emits + * @param f + * a function that accepts an {@code SingleSubscriber}, and invokes its {@code onSuccess} or {@code onError} methods as appropriate + * @return a Single that, when a {@link Subscriber} subscribes to it, will execute the specified function + * @see ReactiveX operators documentation: Create + */ + public final static Single create(OnSubscribe f) { + return new Single(f); // TODO need hook + } + + /** + * Invoked when Single.execute is called. + */ + public static interface OnSubscribe extends Action1> { + // cover for generics insanity + } + + /** + * Lifts a function to the current Single and returns a new Single that when subscribed to will pass + * the values of the current Single through the Operator function. + *

+ * In other words, this allows chaining TaskExecutors together on a Single for acting on the values within + * the Single. + *

{@code + * task.map(...).filter(...).lift(new OperatorA()).lift(new OperatorB(...)).subscribe() + * }

+ * If the operator you are creating is designed to act on the item emitted by a source + * Single, use {@code lift}. If your operator is designed to transform the source Single as a whole + * (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}. + *

+ *
Scheduler:
+ *
{@code lift} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param lift + * the Operator that implements the Single-operating function to be applied to the source Single + * @return a Single that is the result of applying the lifted Operator to the source Single + * @see RxJava wiki: Implementing Your Own Operators + */ + private final Single lift(final Operator lift) { + // This method is private because not sure if we want to expose the Observable.Operator in this public API rather than a Single.Operator + + return new Single(new Observable.OnSubscribe() { + @Override + public void call(Subscriber o) { + try { + final Subscriber st = hook.onLift(lift).call(o); + try { + // new Subscriber created and being subscribed with so 'onStart' it + st.onStart(); + onSubscribe.call(st); + } catch (Throwable e) { + // localized capture of errors rather than it skipping all operators + // and ending up in the try/catch of the subscribe method which then + // prevents onErrorResumeNext and other similar approaches to error handling + if (e instanceof OnErrorNotImplementedException) { + throw (OnErrorNotImplementedException) e; + } + st.onError(e); + } + } catch (Throwable e) { + if (e instanceof OnErrorNotImplementedException) { + throw (OnErrorNotImplementedException) e; + } + // if the lift function failed all we can do is pass the error to the final Subscriber + // as we don't have the operator available to us + o.onError(e); + } + } + }); + } + + /** + * Transform an Observable by applying a particular Transformer function to it. + *

+ * This method operates on the Observable itself whereas {@link #lift} operates on the Observable's + * Subscribers or Observers. + *

+ * If the operator you are creating is designed to act on the individual items emitted by a source + * Observable, use {@link #lift}. If your operator is designed to transform the source Observable as a whole + * (for instance, by applying a particular set of existing RxJava operators to it) use {@code compose}. + *

+ *
Scheduler:
+ *
{@code compose} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param transformer + * implements the function that transforms the source Observable + * @return the source Observable, transformed by the transformer function + * @see RxJava wiki: Implementing Your Own Operators + */ + @SuppressWarnings("unchecked") + public Single compose(Transformer transformer) { + return ((Transformer) transformer).call(this); + } + + /** + * Transformer function used by {@link #compose}. + * + * @warn more complete description needed + */ + public static interface Transformer extends Func1, Single> { + // cover for generics insanity + } + + private static Observable toObservable(Single t) { + // is this sufficient, or do I need to keep the outer Single and subscribe to it? + return Observable.create(t.onSubscribe); + } + + /** + * INTERNAL: Used with lift and operators. + * + * Converts the source {@code Single} into an {@code Single>} that emits the + * source Observable as its single emission. + *

+ * + *

+ *
Scheduler:
+ *
{@code nest} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return an Observable that emits a single item: the source Observable + * @see ReactiveX operators documentation: To + */ + private final Single> nest() { + return Single.just(toObservable(this)); + } + + /* ********************************************************************************************************* + * Operators Below Here + * ********************************************************************************************************* + */ + + /** + * Returns an Observable that emits the items emitted by two Tasks, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * an Single to be concatenated + * @param t2 + * an Single to be concatenated + * @return an Observable that emits items emitted by the two source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Single t1, Single t2) { + return Observable.concat(toObservable(t1), toObservable(t2)); + } + + /** + * Returns an Observable that emits the items emitted by three Tasks, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be concatenated + * @param t2 + * a Single to be concatenated + * @param t3 + * a Single to be concatenated + * @return an Observable that emits items emitted by the three source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Single t1, Single t2, Single t3) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3)); + } + + /** + * Returns an Observable that emits the items emitted by four Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be concatenated + * @param t2 + * a Single to be concatenated + * @param t3 + * a Single to be concatenated + * @param t4 + * a Single to be concatenated + * @return an Observable that emits items emitted by the four source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Single t1, Single t2, Single t3, Single t4) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4)); + } + + /** + * Returns an Observable that emits the items emitted by five Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be concatenated + * @param t2 + * a Single to be concatenated + * @param t3 + * a Single to be concatenated + * @param t4 + * a Single to be concatenated + * @param t5 + * a Single to be concatenated + * @return an Observable that emits items emitted by the five source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Single t1, Single t2, Single t3, Single t4, Single t5) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5)); + } + + /** + * Returns an Observable that emits the items emitted by six Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be concatenated + * @param t2 + * a Single to be concatenated + * @param t3 + * a Single to be concatenated + * @param t4 + * a Single to be concatenated + * @param t5 + * a Single to be concatenated + * @param t6 + * a Single to be concatenated + * @return an Observable that emits items emitted by the six source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Single t1, Single t2, Single t3, Single t4, Single t5, Single t6) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6)); + } + + /** + * Returns an Observable that emits the items emitted by seven Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be concatenated + * @param t2 + * a Single to be concatenated + * @param t3 + * a Single to be concatenated + * @param t4 + * a Single to be concatenated + * @param t5 + * a Single to be concatenated + * @param t6 + * a Single to be concatenated + * @param t7 + * a Single to be concatenated + * @return an Observable that emits items emitted by the seven source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Single t1, Single t2, Single t3, Single t4, Single t5, Single t6, Single t7) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7)); + } + + /** + * Returns an Observable that emits the items emitted by eight Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be concatenated + * @param t2 + * a Single to be concatenated + * @param t3 + * a Single to be concatenated + * @param t4 + * a Single to be concatenated + * @param t5 + * a Single to be concatenated + * @param t6 + * a Single to be concatenated + * @param t7 + * a Single to be concatenated + * @param t8 + * a Single to be concatenated + * @return an Observable that emits items emitted by the eight source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Single t1, Single t2, Single t3, Single t4, Single t5, Single t6, Single t7, Single t8) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7), toObservable(t8)); + } + + /** + * Returns an Observable that emits the items emitted by nine Observables, one after the other, without + * interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be concatenated + * @param t2 + * a Single to be concatenated + * @param t3 + * a Single to be concatenated + * @param t4 + * a Single to be concatenated + * @param t5 + * a Single to be concatenated + * @param t6 + * a Single to be concatenated + * @param t7 + * a Single to be concatenated + * @param t8 + * a Single to be concatenated + * @param t9 + * a Single to be concatenated + * @return an Observable that emits items emitted by the nine source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final static Observable concat(Single t1, Single t2, Single t3, Single t4, Single t5, Single t6, Single t7, Single t8, Single t9) { + return Observable.concat(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7), toObservable(t8), toObservable(t9)); + } + + /** + * Returns an Observable that invokes an {@link Observer}'s {@link Observer#onError onError} method when the + * Observer subscribes to it. + *

+ * + *

+ *
Scheduler:
+ *
{@code error} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param exception + * the particular Throwable to pass to {@link Observer#onError onError} + * @param + * the type of the items (ostensibly) emitted by the Observable + * @return an Observable that invokes the {@link Observer}'s {@link Observer#onError onError} method when + * the Observer subscribes to it + * @see ReactiveX operators documentation: Throw + */ + public final static Single error(final Throwable exception) { + return Single.create(new OnSubscribe() { + + @Override + public void call(SingleSubscriber te) { + te.onError(exception); + } + + }); + } + + /** + * Converts a {@link Future} into an Observable. + *

+ * + *

+ * You can convert any object that supports the {@link Future} interface into an Observable that emits the + * return value of the {@link Future#get} method of that object, by passing the object into the {@code from} method. + *

+ * Important note: This Observable is blocking; you cannot unsubscribe from it. + *

+ *
Scheduler:
+ *
{@code from} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param future + * the source {@link Future} + * @param + * the type of object that the {@link Future} returns, and also the type of item to be emitted by + * the resulting Observable + * @return an Observable that emits the item from the source {@link Future} + * @see ReactiveX operators documentation: From + */ + public final static Single from(Future future) { + return new Single(OnSubscribeToObservableFuture.toObservableFuture(future)); + } + + /** + * Converts a {@link Future} into an Observable, with a timeout on the Future. + *

+ * + *

+ * You can convert any object that supports the {@link Future} interface into an Observable that emits the + * return value of the {@link Future#get} method of that object, by passing the object into the {@code from} method. + *

+ * Important note: This Observable is blocking; you cannot unsubscribe from it. + *

+ *
Scheduler:
+ *
{@code from} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param future + * the source {@link Future} + * @param timeout + * the maximum time to wait before calling {@code get} + * @param unit + * the {@link TimeUnit} of the {@code timeout} argument + * @param + * the type of object that the {@link Future} returns, and also the type of item to be emitted by + * the resulting Observable + * @return an Observable that emits the item from the source {@link Future} + * @see ReactiveX operators documentation: From + */ + public final static Single from(Future future, long timeout, TimeUnit unit) { + return new Single(OnSubscribeToObservableFuture.toObservableFuture(future, timeout, unit)); + } + + /** + * Converts a {@link Future}, operating on a specified {@link Scheduler}, into an Observable. + *

+ * + *

+ * You can convert any object that supports the {@link Future} interface into an Observable that emits the + * return value of the {@link Future#get} method of that object, by passing the object into the {@code from} method. + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param future + * the source {@link Future} + * @param scheduler + * the {@link Scheduler} to wait for the Future on. Use a Scheduler such as {@link Schedulers#io()} that can block and wait on the Future + * @param + * the type of object that the {@link Future} returns, and also the type of item to be emitted by + * the resulting Observable + * @return an Observable that emits the item from the source {@link Future} + * @see ReactiveX operators documentation: From + */ + public final static Single from(Future future, Scheduler scheduler) { + return new Single(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler); + } + + /** + * Returns an Observable that emits a single item and then completes. + *

+ * + *

+ * To convert any object into an Observable that emits that object, pass that object into the {@code just} method. + *

+ * This is similar to the {@link #from(java.lang.Object[])} method, except that {@code from} will convert + * an {@link Iterable} object into an Observable that emits each of the items in the Iterable, one at a + * time, while the {@code just} method converts an Iterable into an Observable that emits the entire + * Iterable as a single item. + *

+ *
Scheduler:
+ *
{@code just} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param value + * the item to emit + * @param + * the type of that item + * @return an Observable that emits {@code value} as a single item and then completes + * @see ReactiveX operators documentation: Just + */ + public final static Single just(final T value) { + // TODO add similar optimization as ScalarSynchronousObservable + return Single.create(new OnSubscribe() { + + @Override + public void call(SingleSubscriber te) { + te.onSuccess(value); + } + + }); + } + + /** + * Flattens a Single that emits a Single into a single Single that emits the items emitted by + * the nested Single, without any transformation. + *

+ * + *

+ *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param source + * a Single that emits a Single + * @return a Single that emits the item that is the result of flattening the Single emitted by the {@code source} Single + * @see ReactiveX operators documentation: Merge + */ + public final static Single merge(final Single> source) { + return Single.create(new OnSubscribe() { + + @Override + public void call(final SingleSubscriber child) { + source.subscribe(new SingleSubscriber>() { + + @Override + public void onSuccess(Single innerSingle) { + innerSingle.subscribe(child); + } + + @Override + public void onError(Throwable error) { + child.onError(error); + } + + }); + } + }); + } + + /** + * Flattens two Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @param t2 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Single t1, Single t2) { + return Observable.merge(toObservable(t1), toObservable(t2)); + } + + /** + * Flattens three Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @param t2 + * a Single to be merged + * @param t3 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Single t1, Single t2, Single t3) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3)); + } + + /** + * Flattens four Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @param t2 + * a Single to be merged + * @param t3 + * a Single to be merged + * @param t4 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Single t1, Single t2, Single t3, Single t4) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4)); + } + + /** + * Flattens five Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @param t2 + * a Single to be merged + * @param t3 + * a Single to be merged + * @param t4 + * a Single to be merged + * @param t5 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Single t1, Single t2, Single t3, Single t4, Single t5) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5)); + } + + /** + * Flattens six Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @param t2 + * a Single to be merged + * @param t3 + * a Single to be merged + * @param t4 + * a Single to be merged + * @param t5 + * a Single to be merged + * @param t6 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Single t1, Single t2, Single t3, Single t4, Single t5, Single t6) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6)); + } + + /** + * Flattens seven Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @param t2 + * a Single to be merged + * @param t3 + * a Single to be merged + * @param t4 + * a Single to be merged + * @param t5 + * a Single to be merged + * @param t6 + * a Single to be merged + * @param t7 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Single t1, Single t2, Single t3, Single t4, Single t5, Single t6, Single t7) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7)); + } + + /** + * Flattens eight Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @param t2 + * a Single to be merged + * @param t3 + * a Single to be merged + * @param t4 + * a Single to be merged + * @param t5 + * a Single to be merged + * @param t6 + * a Single to be merged + * @param t7 + * a Single to be merged + * @param t8 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Single t1, Single t2, Single t3, Single t4, Single t5, Single t6, Single t7, Single t8) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7), toObservable(t8)); + } + + /** + * Flattens nine Observables into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @param t2 + * a Single to be merged + * @param t3 + * a Single to be merged + * @param t4 + * a Single to be merged + * @param t5 + * a Single to be merged + * @param t6 + * a Single to be merged + * @param t7 + * a Single to be merged + * @param t8 + * a Single to be merged + * @param t9 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final static Observable merge(Single t1, Single t2, Single t3, Single t4, Single t5, Single t6, Single t7, Single t8, Single t9) { + return Observable.merge(toObservable(t1), toObservable(t2), toObservable(t3), toObservable(t4), toObservable(t5), toObservable(t6), toObservable(t7), toObservable(t8), toObservable(t9)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * two items emitted, in sequence, by two other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by {@code o1} and the first item + * emitted by {@code o2}; the second item emitted by the new Observable will be the result of the function + * applied to the second item emitted by {@code o1} and the second item emitted by {@code o2}; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results + * in an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Single zip(Single o1, Single o2, final Func2 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * three items emitted, in sequence, by three other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by {@code o1}, the first item + * emitted by {@code o2}, and the first item emitted by {@code o3}; the second item emitted by the new + * Observable will be the result of the function applied to the second item emitted by {@code o1}, the + * second item emitted by {@code o2}, and the second item emitted by {@code o3}; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Single zip(Single o1, Single o2, Single o3, Func3 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * four items emitted, in sequence, by four other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by {@code o1}, the first item + * emitted by {@code o2}, the first item emitted by {@code o3}, and the first item emitted by {@code 04}; + * the second item emitted by the new Observable will be the result of the function applied to the second + * item emitted by each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Single zip(Single o1, Single o2, Single o3, Single o4, Func4 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * five items emitted, in sequence, by five other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by {@code o1}, the first item + * emitted by {@code o2}, the first item emitted by {@code o3}, the first item emitted by {@code o4}, and + * the first item emitted by {@code o5}; the second item emitted by the new Observable will be the result of + * the function applied to the second item emitted by each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Single zip(Single o1, Single o2, Single o3, Single o4, Single o5, Func5 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * six items emitted, in sequence, by six other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each source Observable, the + * second item emitted by the new Observable will be the result of the function applied to the second item + * emitted by each of those Observables, and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Single zip(Single o1, Single o2, Single o3, Single o4, Single o5, Single o6, + Func6 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5), toObservable(o6) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * seven items emitted, in sequence, by seven other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each source Observable, the + * second item emitted by the new Observable will be the result of the function applied to the second item + * emitted by each of those Observables, and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Single zip(Single o1, Single o2, Single o3, Single o4, Single o5, Single o6, Single o7, + Func7 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5), toObservable(o6), toObservable(o7) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * eight items emitted, in sequence, by eight other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each source Observable, the + * second item emitted by the new Observable will be the result of the function applied to the second item + * emitted by each of those Observables, and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param o8 + * an eighth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Single zip(Single o1, Single o2, Single o3, Single o4, Single o5, Single o6, Single o7, Single o8, + Func8 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5), toObservable(o6), toObservable(o7), toObservable(o8) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the results of a specified combiner function applied to combinations of + * nine items emitted, in sequence, by nine other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the new Observable + * will be the result of the function applied to the first item emitted by each source Observable, the + * second item emitted by the new Observable will be the result of the function applied to the second item + * emitted by each of those Observables, and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations of the source Observable that + * emits the fewest + * items. + *

+ *
Scheduler:
+ *
{@code zip} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param o1 + * the first source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param o8 + * an eighth source Observable + * @param o9 + * a ninth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source Observables, results in + * an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + * @see ReactiveX operators documentation: Zip + */ + public final static Single zip(Single o1, Single o2, Single o3, Single o4, Single o5, Single o6, Single o7, Single o8, + Single o9, Func9 zipFunction) { + return just(new Observable[] { toObservable(o1), toObservable(o2), toObservable(o3), toObservable(o4), toObservable(o5), toObservable(o6), toObservable(o7), toObservable(o8), toObservable(o9) }).lift(new OperatorZip(zipFunction)); + } + + /** + * Returns an Observable that emits the items emitted from the current Observable, then the next, one after + * the other, without interleaving them. + *

+ * + *

+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be concatenated after the current + * @return an Observable that emits items emitted by the two source Observables, one after the other, + * without interleaving them + * @see ReactiveX operators documentation: Concat + */ + public final Observable concatWith(Single t1) { + return concat(this, t1); + } + + /** + * Returns an Observable that emits items based on applying a function that you supply to each item emitted + * by the source Observable, where that function returns an Observable, and then merging those resulting + * Observables and emitting the results of this merger. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * a function that, when applied to an item emitted by the source Observable, returns an + * Observable + * @return an Observable that emits the result of applying the transformation function to each item emitted + * by the source Observable and merging the results of the Observables obtained from this + * transformation + * @see ReactiveX operators documentation: FlatMap + */ + public final Single flatMap(final Func1> func) { + return merge(map(func)); + } + + /** + * Returns an Observable that emits items based on applying a function that you supply to each item emitted + * by the source Observable, where that function returns an Observable, and then merging those resulting + * Observables and emitting the results of this merger. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * a function that, when applied to an item emitted by the source Observable, returns an + * Observable + * @return an Observable that emits the result of applying the transformation function to each item emitted + * by the source Observable and merging the results of the Observables obtained from this + * transformation + * @see ReactiveX operators documentation: FlatMap + */ + public final Observable flatMapObservable(Func1> func) { + return Observable.merge(toObservable(map(func))); + } + + /** + * Returns a Single that applies a specified function to the item emitted by the source Single and + * emits the result of this function applications. + *

+ * + *

+ *
Scheduler:
+ *
{@code map} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param func + * a function to apply to the item emitted by the Single + * @return a Single that emits the item from the source Single, transformed by the specified function + * @see ReactiveX operators documentation: Map + */ + public final Single map(Func1 func) { + return lift(new OperatorMap(func)); + } + + /** + * Flattens this and another Observable into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code mergeWith} method. + *

+ *
Scheduler:
+ *
{@code mergeWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param t1 + * a Single to be merged + * @return an Observable that emits all of the items emitted by the source Observables + * @see ReactiveX operators documentation: Merge + */ + public final Observable mergeWith(Single t1) { + return merge(this, t1); + } + + /** + * Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler}, + * asynchronously with an unbounded buffer. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler + * the {@link Scheduler} to notify {@link Observer}s on + * @return the source Observable modified so that its {@link Observer}s are notified on the specified {@link Scheduler} + * @see ReactiveX operators documentation: ObserveOn + * @see RxJava Threading Examples + * @see #subscribeOn + */ + public final Single observeOn(Scheduler scheduler) { + return lift(new OperatorObserveOn(scheduler)); + } + + /** + * Instructs an Observable to emit an item (returned by a specified function) rather than invoking {@link Observer#onError onError} if it encounters an error. + *

+ * + *

+ * By default, when an Observable encounters an error that prevents it from emitting the expected item to + * its {@link Observer}, the Observable invokes its Observer's {@code onError} method, and then quits + * without invoking any more of its Observer's methods. The {@code onErrorReturn} method changes this + * behavior. If you pass a function ({@code resumeFunction}) to an Observable's {@code onErrorReturn} method, if the original Observable encounters an error, instead of invoking its Observer's + * {@code onError} method, it will instead emit the return value of {@code resumeFunction}. + *

+ * You can use this to prevent errors from propagating or to supply fallback data should errors be + * encountered. + *

+ *
Scheduler:
+ *
{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param resumeFunction + * a function that returns an item that the new Observable will emit if the source Observable + * encounters an error + * @return the original Observable with appropriately modified behavior + * @see ReactiveX operators documentation: Catch + */ + public final Single onErrorReturn(Func1 resumeFunction) { + return lift(new OperatorOnErrorReturn(resumeFunction)); + } + + /** + * Subscribes to an Observable but ignore its emissions and notifications. + *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a {@link Subscription} reference can request the {@link Single} stop work. + * @throws OnErrorNotImplementedException + * if the Observable tries to call {@code onError} + * @see ReactiveX operators documentation: Subscribe + */ + public final Subscription subscribe() { + return subscribe(new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + throw new OnErrorNotImplementedException(e); + } + + @Override + public final void onNext(T args) { + // do nothing + } + + }); + } + + /** + * Subscribes to an Observable and provides a callback to handle the items it emits. + *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onNext + * the {@code Action1} you have designed to accept emissions from the Observable + * @return a {@link Subscription} reference can request the {@link Single} stop work. + * @throws IllegalArgumentException + * if {@code onNext} is null + * @throws OnErrorNotImplementedException + * if the Observable tries to call {@code onError} + * @see ReactiveX operators documentation: Subscribe + */ + public final Subscription subscribe(final Action1 onSuccess) { + if (onSuccess == null) { + throw new IllegalArgumentException("onSuccess can not be null"); + } + + return subscribe(new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + throw new OnErrorNotImplementedException(e); + } + + @Override + public final void onNext(T args) { + onSuccess.call(args); + } + + }); + } + + /** + * Subscribes to an Observable and provides callbacks to handle the items it emits and any error + * notification it issues. + *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onNext + * the {@code Action1} you have designed to accept emissions from the Observable + * @param onError + * the {@code Action1} you have designed to accept any error notification from the + * Observable + * @return a {@link Subscription} reference can request the {@link Single} stop work. + * @see ReactiveX operators documentation: Subscribe + * @throws IllegalArgumentException + * if {@code onNext} is null, or + * if {@code onError} is null + */ + public final Subscription subscribe(final Action1 onSuccess, final Action1 onError) { + if (onSuccess == null) { + throw new IllegalArgumentException("onSuccess can not be null"); + } + if (onError == null) { + throw new IllegalArgumentException("onError can not be null"); + } + + return subscribe(new Subscriber() { + + @Override + public final void onCompleted() { + // do nothing + } + + @Override + public final void onError(Throwable e) { + onError.call(e); + } + + @Override + public final void onNext(T args) { + onSuccess.call(args); + } + + }); + } + + /** + * Subscribes to an Observable and invokes {@link OnSubscribe} function without any contract protection, + * error handling, unsubscribe, or execution hooks. + *

+ * Use this only for implementing an {@link Operator} that requires nested subscriptions. For other + * purposes, use {@link #subscribe(Subscriber)} which ensures the Rx contract and other functionality. + *

+ *
Scheduler:
+ *
{@code unsafeSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param subscriber + * the Subscriber that will handle emissions and notifications from the Observable + */ + public final void unsafeSubscribe(Subscriber subscriber) { + try { + // new Subscriber so onStart it + subscriber.onStart(); + // TODO add back the hook + // hook.onSubscribeStart(this, onSubscribe).call(subscriber); + onSubscribe.call(subscriber); + hook.onSubscribeReturn(subscriber); + } catch (Throwable e) { + // special handling for certain Throwable/Error/Exception types + Exceptions.throwIfFatal(e); + // if an unhandled error occurs executing the onSubscribe we will propagate it + try { + subscriber.onError(hook.onSubscribeError(e)); + } catch (OnErrorNotImplementedException e2) { + // special handling when onError is not implemented ... we just rethrow + throw e2; + } catch (Throwable e2) { + // if this happens it means the onError itself failed (perhaps an invalid function implementation) + // so we are unable to propagate the error correctly and will just throw + RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); + // TODO could the hook be the cause of the error in the on error handling. + hook.onSubscribeError(r); + // TODO why aren't we throwing the hook's return value. + throw r; + } + } + } + + /** + * Subscribes to an Single and provides a Subscriber that implements functions to handle the item the + * Single emits and any error notification it issues. + *

+ * A typical implementation of {@code subscribe} does the following: + *

    + *
  1. It stores a reference to the Subscriber in a collection object, such as a {@code List} object.
  2. + *
  3. It returns a reference to the {@link Subscription} interface. This enables Subscribers to + * unsubscribe, that is, to stop receiving items and notifications before the Observable completes, which + * also invokes the Subscriber's {@link Subscriber#onCompleted onCompleted} method.
  4. + *

+ * An {@code Single} instance is responsible for accepting all subscriptions and notifying all + * Subscribers. Unless the documentation for a particular {@code Single} implementation indicates + * otherwise, Subscriber should make no assumptions about the order in which multiple Subscribers will + * receive their notifications. + *

+ * For more information see the + * ReactiveX documentation. + *

+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param subscriber + * the {@link Subscriber} that will handle emissions and notifications from the Observable + * @return a {@link Subscription} reference can request the {@link Single} stop work. + * @throws IllegalStateException + * if {@code subscribe} is unable to obtain an {@code OnSubscribe<>} function + * @throws IllegalArgumentException + * if the {@link Subscriber} provided as the argument to {@code subscribe} is {@code null} + * @throws OnErrorNotImplementedException + * if the {@link Subscriber}'s {@code onError} method is null + * @throws RuntimeException + * if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable} + * @see ReactiveX operators documentation: Subscribe + */ + public final Subscription subscribe(Subscriber subscriber) { + // validate and proceed + if (subscriber == null) { + throw new IllegalArgumentException("observer can not be null"); + } + if (onSubscribe == null) { + throw new IllegalStateException("onSubscribe function can not be null."); + /* + * the subscribe function can also be overridden but generally that's not the appropriate approach + * so I won't mention that in the exception + */ + } + + // new Subscriber so onStart it + subscriber.onStart(); + + /* + * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls + * to user code from within an Observer" + */ + // if not already wrapped + if (!(subscriber instanceof SafeSubscriber)) { + // assign to `observer` so we return the protected version + subscriber = new SafeSubscriber(subscriber); + } + + // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. + try { + // allow the hook to intercept and/or decorate + // TODO add back the hook + // hook.onSubscribeStart(this, onSubscribe).call(subscriber); + onSubscribe.call(subscriber); + return hook.onSubscribeReturn(subscriber); + } catch (Throwable e) { + // special handling for certain Throwable/Error/Exception types + Exceptions.throwIfFatal(e); + // if an unhandled error occurs executing the onSubscribe we will propagate it + try { + subscriber.onError(hook.onSubscribeError(e)); + } catch (OnErrorNotImplementedException e2) { + // special handling when onError is not implemented ... we just rethrow + throw e2; + } catch (Throwable e2) { + // if this happens it means the onError itself failed (perhaps an invalid function implementation) + // so we are unable to propagate the error correctly and will just throw + RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); + // TODO could the hook be the cause of the error in the on error handling. + hook.onSubscribeError(r); + // TODO why aren't we throwing the hook's return value. + throw r; + } + return Subscriptions.empty(); + } + } + + /** + * Subscribes to an Single and provides a SingleSubscriber that implements functions to handle the item the + * Single emits and any error notification it issues. + *

+ * A typical implementation of {@code subscribe} does the following: + *

    + *
  1. It stores a reference to the Subscriber in a collection object, such as a {@code List} object.
  2. + *
  3. It returns a reference to the {@link Subscription} interface. This enables Subscribers to + * unsubscribe, that is, to stop receiving items and notifications before the Observable completes, which + * also invokes the Subscriber's {@link Subscriber#onCompleted onCompleted} method.
  4. + *

+ * An {@code Single} instance is responsible for accepting all subscriptions and notifying all + * Subscribers. Unless the documentation for a particular {@code Single} implementation indicates + * otherwise, Subscriber should make no assumptions about the order in which multiple Subscribers will + * receive their notifications. + *

+ * For more information see the + * ReactiveX documentation. + *

+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param subscriber + * the {@link Subscriber} that will handle emissions and notifications from the Observable + * @return a {@link Subscription} reference can request the {@link Single} stop work. + * @throws IllegalStateException + * if {@code subscribe} is unable to obtain an {@code OnSubscribe<>} function + * @throws IllegalArgumentException + * if the {@link Subscriber} provided as the argument to {@code subscribe} is {@code null} + * @throws OnErrorNotImplementedException + * if the {@link Subscriber}'s {@code onError} method is null + * @throws RuntimeException + * if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable} + * @see ReactiveX operators documentation: Subscribe + */ + public final Subscription subscribe(final SingleSubscriber te) { + Subscriber s = new Subscriber() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + te.onError(e); + } + + @Override + public void onNext(T t) { + te.onSuccess(t); + } + + }; + te.add(s); + subscribe(s); + return s; + } + + /** + * Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler + * the {@link Scheduler} to perform subscription actions on + * @return the source Observable modified so that its subscriptions happen on the + * specified {@link Scheduler} + * @see ReactiveX operators documentation: SubscribeOn + * @see RxJava Threading Examples + * @see #observeOn + */ + public final Single subscribeOn(Scheduler scheduler) { + return nest().lift(new OperatorSubscribeOn(scheduler)); + } + + /** + * Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted + * item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, + * the resulting Observable terminates and notifies observers of a {@code TimeoutException}. + *

+ * + *

+ *
Scheduler:
+ *
This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.
+ *
+ * + * @param timeout + * maximum duration between emitted items before a timeout occurs + * @param timeUnit + * the unit of time that applies to the {@code timeout} argument. + * @return the source Observable modified to notify observers of a {@code TimeoutException} in case of a + * timeout + * @see ReactiveX operators documentation: Timeout + */ + public final Single timeout(long timeout, TimeUnit timeUnit) { + return timeout(timeout, timeUnit, null, Schedulers.computation()); + } + + /** + * Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted + * item, where this policy is governed on a specified Scheduler. If the next item isn't emitted within the + * specified timeout duration starting from its predecessor, the resulting Observable terminates and + * notifies observers of a {@code TimeoutException}. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param timeout + * maximum duration between items before a timeout occurs + * @param timeUnit + * the unit of time that applies to the {@code timeout} argument + * @param scheduler + * the Scheduler to run the timeout timers on + * @return the source Observable modified to notify observers of a {@code TimeoutException} in case of a + * timeout + * @see ReactiveX operators documentation: Timeout + */ + public final Single timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) { + return timeout(timeout, timeUnit, null, scheduler); + } + + /** + * Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted + * item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, + * the resulting Observable begins instead to mirror a fallback Observable. + *

+ * + *

+ *
Scheduler:
+ *
This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.
+ *
+ * + * @param timeout + * maximum duration between items before a timeout occurs + * @param timeUnit + * the unit of time that applies to the {@code timeout} argument + * @param other + * the fallback Observable to use in case of a timeout + * @return the source Observable modified to switch to the fallback Observable in case of a timeout + * @see ReactiveX operators documentation: Timeout + */ + public final Single timeout(long timeout, TimeUnit timeUnit, Single other) { + return timeout(timeout, timeUnit, other, Schedulers.computation()); + } + + /** + * Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted + * item using a specified Scheduler. If the next item isn't emitted within the specified timeout duration + * starting from its predecessor, the resulting Observable begins instead to mirror a fallback Observable. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param timeout + * maximum duration between items before a timeout occurs + * @param timeUnit + * the unit of time that applies to the {@code timeout} argument + * @param other + * the Observable to use as the fallback in case of a timeout + * @param scheduler + * the {@link Scheduler} to run the timeout timers on + * @return the source Observable modified so that it will switch to the fallback Observable in case of a + * timeout + * @see ReactiveX operators documentation: Timeout + */ + public final Single timeout(long timeout, TimeUnit timeUnit, Single other, Scheduler scheduler) { + if (other == null) { + other = Single. error(new TimeoutException()); + } + return lift(new OperatorTimeout(timeout, timeUnit, toObservable(other), scheduler)); + } + + /** + * Returns an Observable that emits items that are the result of applying a specified function to pairs of + * values, one each from the source Observable and another specified Observable. + *

+ * + *

+ *
Scheduler:
+ *
{@code zipWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of items emitted by the {@code other} Observable + * @param + * the type of items emitted by the resulting Observable + * @param other + * the other Observable + * @param zipFunction + * a function that combines the pairs of items from the two Observables to generate the items to + * be emitted by the resulting Observable + * @return an Observable that pairs up values from the source Observable and the {@code other} Observable + * and emits the results of {@code zipFunction} applied to these pairs + * @see ReactiveX operators documentation: Zip + */ + public final Single zipWith(Single other, Func2 zipFunction) { + return zip(this, other, zipFunction); + } + +} diff --git a/src/main/java/rx/SingleSubscriber.java b/src/main/java/rx/SingleSubscriber.java new file mode 100644 index 0000000000..01fa84a8c1 --- /dev/null +++ b/src/main/java/rx/SingleSubscriber.java @@ -0,0 +1,81 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx; + +import rx.annotations.Experimental; +import rx.internal.util.SubscriptionList; + +/** + * Provides a mechanism for receiving push-based notifications. + *

+ * After an SingleSubscriber calls an {@link Single}'s {@link Single#subscribe subscribe} method, the + * {@code Single} calls the SingleSubscriber's {@link #onSuccess} and {@link #onError} methods to provide notifications. + * A well-behaved {@code Single} will call an SingleSubscriber's {@link #onSuccess} method exactly once or + * the SingleSubscriber's {@link #onError} method exactly once. + * + * @see ReactiveX documentation: Observable + * @param + * the type of item the SingleSubscriber expects to observe + */ +@Experimental +public abstract class SingleSubscriber implements Subscription { + + private final SubscriptionList cs = new SubscriptionList(); + + /** + * Notifies the SingleSubscriber with a single item and that the {@link Single} has finished sending push-based notifications. + *

+ * The {@link Single} will not call this method if it calls {@link #onError}. + */ + public abstract void onSuccess(T value); + + /** + * Notifies the SingleSubscriber that the {@link Single} has experienced an error condition. + *

+ * If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}. + * + * @param e + * the exception encountered by the Single + */ + public abstract void onError(Throwable error); + + /** + * Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as + * unsubscribed. If the list is marked as unsubscribed, {@code add} will indicate this by + * explicitly unsubscribing the new {@code Subscription} as well. + * + * @param s + * the {@code Subscription} to add + */ + public final void add(Subscription s) { + cs.add(s); + } + + @Override + public final void unsubscribe() { + cs.unsubscribe(); + } + + /** + * Indicates whether this Subscriber has unsubscribed from its list of subscriptions. + * + * @return {@code true} if this Subscriber has unsubscribed from its subscriptions, {@code false} otherwise + */ + @Override + public final boolean isUnsubscribed() { + return cs.isUnsubscribed(); + } +} \ No newline at end of file diff --git a/src/perf/java/rx/PerfBaseline.java b/src/perf/java/rx/ObservablePerfBaseline.java similarity index 98% rename from src/perf/java/rx/PerfBaseline.java rename to src/perf/java/rx/ObservablePerfBaseline.java index fa3f4bc313..061caf6264 100644 --- a/src/perf/java/rx/PerfBaseline.java +++ b/src/perf/java/rx/ObservablePerfBaseline.java @@ -30,7 +30,7 @@ @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) -public class PerfBaseline { +public class ObservablePerfBaseline { @State(Scope.Thread) public static class Input extends InputWithIncrementingInteger { diff --git a/src/perf/java/rx/SinglePerfBaseline.java b/src/perf/java/rx/SinglePerfBaseline.java new file mode 100644 index 0000000000..e1a646cef0 --- /dev/null +++ b/src/perf/java/rx/SinglePerfBaseline.java @@ -0,0 +1,100 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx; + +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import rx.Single.OnSubscribe; +import rx.jmh.LatchedObserver; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +public class SinglePerfBaseline { + + + @Benchmark + public void singleConsumption(Input input) throws InterruptedException { + input.single.subscribe(input.newSubscriber()); + } + + @Benchmark + public void singleConsumptionUnsafe(Input input) throws InterruptedException { + input.single.unsafeSubscribe(input.newSubscriber()); + } + + @Benchmark + public void newSingleAndSubscriberEachTime(Input input) throws InterruptedException { + input.newSingle().subscribe(input.newSubscriber()); + } + + @State(Scope.Thread) + public static class Input { + public Single single; + public Blackhole bh; + + @Setup + public void setup(final Blackhole bh) { + this.bh = bh; + single = Single.just(1); + } + + public LatchedObserver newLatchedObserver() { + return new LatchedObserver(bh); + } + + public Single newSingle() { + return Single.create(new OnSubscribe() { + + @Override + public void call(SingleSubscriber t) { + t.onSuccess(1); + } + + }); + } + + public Subscriber newSubscriber() { + return new Subscriber() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + bh.consume(t); + } + + }; + } + + } +} diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java new file mode 100644 index 0000000000..778feffb3c --- /dev/null +++ b/src/test/java/rx/SingleTest.java @@ -0,0 +1,455 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package rx; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Single.OnSubscribe; +import rx.functions.Action0; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subscriptions.Subscriptions; + +public class SingleTest { + + @Test + public void testHelloWorld() { + TestSubscriber ts = new TestSubscriber(); + Single.just("Hello World!").subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("Hello World!")); + } + + @Test + public void testHelloWorld2() { + final AtomicReference v = new AtomicReference(); + Single.just("Hello World!").subscribe(new SingleSubscriber() { + + @Override + public void onSuccess(String value) { + v.set(value); + } + + @Override + public void onError(Throwable error) { + + } + + }); + assertEquals("Hello World!", v.get()); + } + + @Test + public void testMap() { + TestSubscriber ts = new TestSubscriber(); + Single.just("A") + .map(new Func1() { + + @Override + public String call(String s) { + return s + "B"; + } + + }) + .subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("AB")); + } + + @Test + public void testZip() { + TestSubscriber ts = new TestSubscriber(); + Single a = Single.just("A"); + Single b = Single.just("B"); + + Single.zip(a, b, new Func2() { + + @Override + public String call(String a, String b) { + return a + b; + } + + }) + .subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("AB")); + } + + @Test + public void testZipWith() { + TestSubscriber ts = new TestSubscriber(); + + Single.just("A").zipWith(Single.just("B"), new Func2() { + + @Override + public String call(String a, String b) { + return a + b; + } + + }) + .subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("AB")); + } + + @Test + public void testMerge() { + TestSubscriber ts = new TestSubscriber(); + Single a = Single.just("A"); + Single b = Single.just("B"); + + Single.merge(a, b).subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("A", "B")); + } + + @Test + public void testMergeWith() { + TestSubscriber ts = new TestSubscriber(); + + Single.just("A").mergeWith(Single.just("B")).subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("A", "B")); + } + + @Test + public void testCreateSuccess() { + TestSubscriber ts = new TestSubscriber(); + Single.create(new OnSubscribe() { + + @Override + public void call(SingleSubscriber s) { + s.onSuccess("Hello"); + } + + }).subscribe(ts); + ts.assertReceivedOnNext(Arrays.asList("Hello")); + } + + @Test + public void testCreateError() { + TestSubscriber ts = new TestSubscriber(); + Single.create(new OnSubscribe() { + + @Override + public void call(SingleSubscriber s) { + s.onError(new RuntimeException("fail")); + } + + }).subscribe(ts); + assertEquals(1, ts.getOnErrorEvents().size()); + } + + @Test + public void testAsync() { + TestSubscriber ts = new TestSubscriber(); + Single.just("Hello") + .subscribeOn(Schedulers.io()) + .map(new Func1() { + + @Override + public String call(String v) { + System.out.println("SubscribeOn Thread: " + Thread.currentThread()); + return v; + } + + }) + .observeOn(Schedulers.computation()) + .map(new Func1() { + + @Override + public String call(String v) { + System.out.println("ObserveOn Thread: " + Thread.currentThread()); + return v; + } + + }) + .subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList("Hello")); + } + + @Test + public void testFlatMap() { + TestSubscriber ts = new TestSubscriber(); + Single.just("Hello").flatMap(new Func1>() { + + @Override + public Single call(String s) { + return Single.just(s + " World!").subscribeOn(Schedulers.computation()); + } + + }).subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList("Hello World!")); + } + + @Test + public void testTimeout() { + TestSubscriber ts = new TestSubscriber(); + Single s = Single.create(new OnSubscribe() { + + @Override + public void call(SingleSubscriber s) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // ignore as we expect this for the test + } + s.onSuccess("success"); + } + + }).subscribeOn(Schedulers.io()); + + s.timeout(100, TimeUnit.MILLISECONDS).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertError(TimeoutException.class); + } + + @Test + public void testTimeoutWithFallback() { + TestSubscriber ts = new TestSubscriber(); + Single s = Single.create(new OnSubscribe() { + + @Override + public void call(SingleSubscriber s) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // ignore as we expect this for the test + } + s.onSuccess("success"); + } + + }).subscribeOn(Schedulers.io()); + + s.timeout(100, TimeUnit.MILLISECONDS, Single.just("hello")).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + ts.assertValue("hello"); + } + + @Test + public void testUnsubscribe() throws InterruptedException { + TestSubscriber ts = new TestSubscriber(); + final AtomicBoolean unsubscribed = new AtomicBoolean(); + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(2); + + Single s = Single.create(new OnSubscribe() { + + @Override + public void call(final SingleSubscriber s) { + final Thread t = new Thread(new Runnable() { + + @Override + public void run() { + try { + Thread.sleep(5000); + s.onSuccess("success"); + } catch (InterruptedException e) { + interrupted.set(true); + latch.countDown(); + } + } + + }); + s.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + unsubscribed.set(true); + t.interrupt(); + latch.countDown(); + } + + })); + t.start(); + } + + }); + + s.subscribe(ts); + + Thread.sleep(100); + + ts.unsubscribe(); + + if (latch.await(1000, TimeUnit.MILLISECONDS)) { + assertTrue(unsubscribed.get()); + assertTrue(interrupted.get()); + } else { + fail("timed out waiting for latch"); + } + } + + /** + * Assert that unsubscribe propagates when passing in a SingleSubscriber and not a Subscriber + */ + @Test + public void testUnsubscribe2() throws InterruptedException { + SingleSubscriber ts = new SingleSubscriber() { + + @Override + public void onSuccess(String value) { + // not interested in value + } + + @Override + public void onError(Throwable error) { + // not interested in value + } + + }; + final AtomicBoolean unsubscribed = new AtomicBoolean(); + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(2); + + Single s = Single.create(new OnSubscribe() { + + @Override + public void call(final SingleSubscriber s) { + final Thread t = new Thread(new Runnable() { + + @Override + public void run() { + try { + Thread.sleep(5000); + s.onSuccess("success"); + } catch (InterruptedException e) { + interrupted.set(true); + latch.countDown(); + } + } + + }); + s.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + unsubscribed.set(true); + t.interrupt(); + latch.countDown(); + } + + })); + t.start(); + } + + }); + + s.subscribe(ts); + + Thread.sleep(100); + + ts.unsubscribe(); + + if (latch.await(1000, TimeUnit.MILLISECONDS)) { + assertTrue(unsubscribed.get()); + assertTrue(interrupted.get()); + } else { + fail("timed out waiting for latch"); + } + } + + /** + * Assert that unsubscribe propagates when passing in a SingleSubscriber and not a Subscriber + */ + @Test + public void testUnsubscribeViaReturnedSubscription() throws InterruptedException { + final AtomicBoolean unsubscribed = new AtomicBoolean(); + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(2); + + Single s = Single.create(new OnSubscribe() { + + @Override + public void call(final SingleSubscriber s) { + final Thread t = new Thread(new Runnable() { + + @Override + public void run() { + try { + Thread.sleep(5000); + s.onSuccess("success"); + } catch (InterruptedException e) { + interrupted.set(true); + latch.countDown(); + } + } + + }); + s.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + unsubscribed.set(true); + t.interrupt(); + latch.countDown(); + } + + })); + t.start(); + } + + }); + + Subscription subscription = s.subscribe(); + + Thread.sleep(100); + + subscription.unsubscribe(); + + if (latch.await(1000, TimeUnit.MILLISECONDS)) { + assertTrue(unsubscribed.get()); + assertTrue(interrupted.get()); + } else { + fail("timed out waiting for latch"); + } + } + + @Test + public void testBackpressureAsObservable() { + Single s = Single.create(new OnSubscribe() { + + @Override + public void call(SingleSubscriber t) { + t.onSuccess("hello"); + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onStart() { + request(0); + } + }; + + s.subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(1); + + ts.assertValue("hello"); + } +}