Skip to content

Commit

Permalink
Removing Scheduler overloads on operators (for now)
Browse files Browse the repository at this point in the history
I have some outstanding questions on how these should be implemented (or even why we need them when the 'subscribeOn' operator is far cleaner) so want to remove them for now so they don't make it into the public incorrectly.
  • Loading branch information
benjchristensen committed Apr 5, 2013
1 parent 4597784 commit 1fa96db
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 196 deletions.
193 changes: 0 additions & 193 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -779,21 +779,6 @@ public static <T> Observable<T> empty() {
return toObservable(new ArrayList<T>());
}

/**
* Returns an Observable that returns no data to the {@link Observer} and immediately invokes its <code>onCompleted</code> method on the given {@link Scheduler}.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/empty.png">
*
* @param <T>
* the type of item emitted by the Observable
* @param {@link Scheduler} The scheduler to send the termination ({@link Observer#onCompleted()} call.
* @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s <code>onCompleted</code> method
*/
@SuppressWarnings("unchecked")
public static <T> Observable<T> empty(Scheduler scheduler) {
return (Observable<T>) empty().subscribeOn(scheduler);
}

/**
* Returns an Observable that calls <code>onError</code> when an {@link Observer} subscribes to it.
* <p>
Expand Down Expand Up @@ -877,22 +862,6 @@ public static <T> Observable<T> from(Iterable<T> iterable) {
return toObservable(iterable);
}

/**
* Converts an {@link Iterable} sequence to an Observable sequence that is subscribed to on the given {@link Scheduler}.
*
* @param iterable
* the source {@link Iterable} sequence
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @param <T>
* the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable
* @return an Observable that emits each item in the source {@link Iterable} sequence
* @see {@link #toObservable(Iterable)}
*/
public static <T> Observable<T> from(Iterable<T> iterable, Scheduler scheduler) {
return toObservable(iterable, scheduler);
}

/**
* Converts an Array to an Observable sequence.
*
Expand All @@ -907,22 +876,6 @@ public static <T> Observable<T> from(T... items) {
return toObservable(items);
}

/**
* Converts an Array to an Observable sequence that is subscribed to on the given {@link Scheduler}.
*
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @param items
* the source Array
* @param <T>
* the type of items in the Array, and the type of items emitted by the resulting Observable
* @return an Observable that emits each item in the source Array
* @see {@link #toObservable(Object...)}
*/
public static <T> Observable<T> from(Scheduler scheduler, T... items) {
return toObservable(scheduler, items);
}

/**
* Generates an observable sequence of integral numbers within a specified range.
*
Expand Down Expand Up @@ -1302,25 +1255,6 @@ public static <T> Observable<T> merge(List<Observable<T>> source) {
return create(OperationMerge.merge(source));
}

/**
* Flattens the Observable sequences from a list of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
* act like a single Observable, by using the <code>merge</code> method.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
*
* @param source
* a list of Observables that emit sequences of items
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the <code>source</code> list of Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(List<Observable<T>> source, Scheduler scheduler) {
return merge(source).subscribeOn(scheduler);
}

/**
* Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a
* Observable into one Observable sequence without any transformation. You can combine the output
Expand All @@ -1338,25 +1272,6 @@ public static <T> Observable<T> merge(Observable<Observable<T>> source) {
return create(OperationMerge.merge(source));
}

/**
* Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a
* Observable into one Observable sequence without any transformation. You can combine the output
* of multiple Observables so that they act like a single Observable, by using the <code>merge</code> method.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
*
* @param source
* an Observable that emits Observables
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the Observables emitted by the <code>source</code> Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Observable<Observable<T>> source, Scheduler scheduler) {
return merge(source).subscribeOn(scheduler);
}

/**
* Flattens the Observable sequences from a series of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
Expand All @@ -1374,25 +1289,6 @@ public static <T> Observable<T> merge(Observable<T>... source) {
return create(OperationMerge.merge(source));
}

/**
* Flattens the Observable sequences from a series of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
* act like a single Observable, by using the <code>merge</code> method.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
*
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @param source
* a series of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the <code>source</code> Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Scheduler scheduler, Observable<T>... source) {
return merge(source).subscribeOn(scheduler);
}

/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
*
Expand Down Expand Up @@ -2316,27 +2212,6 @@ public static <T> Observable<T> toObservable(Iterable<T> iterable) {
return create(OperationToObservableIterable.toObservableIterable(iterable));
}

/**
* Converts an Iterable sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}.
*
* Any object that supports the Iterable interface can be converted into an Observable that emits
* each iterable item in the object, by passing the object into the <code>toObservable</code> method.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toObservable.png">
*
* @param iterable
* the source Iterable sequence
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @param <T>
* the type of items in the iterable sequence and the type emitted by the resulting
* Observable
* @return an Observable that emits each item in the source Iterable sequence
*/
public static <T> Observable<T> toObservable(Iterable<T> iterable, Scheduler scheduler) {
return toObservable(iterable).subscribeOn(scheduler);
}

/**
* Converts an Future to an Observable sequence.
*
Expand All @@ -2356,27 +2231,6 @@ public static <T> Observable<T> toObservable(Future<T> future) {
return create(OperationToObservableFuture.toObservableFuture(future));
}

/**
* Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}.
*
* Any object that supports the {@link Future} interface can be converted into an Observable that emits
* the return value of the get() method in the object, by passing the object into the <code>toObservable</code> method.
* <p>
* This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing.
*
* @param future
* the source {@link Future}
* @param scheduler
* The {@link Scheduler} to wait for the future on.
* @param <T>
* the type of of object that the future's returns and the type emitted by the resulting
* Observable
* @return an Observable that emits the item from the source Future
*/
public static <T> Observable<T> toObservable(Future<T> future, Scheduler scheduler) {
return toObservable(future).subscribeOn(scheduler);
}

/**
* Converts an Future to an Observable sequence.
*
Expand All @@ -2401,32 +2255,6 @@ public static <T> Observable<T> toObservable(Future<T> future, long timeout, Tim
return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit));
}

/**
* Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}.
*
* Any object that supports the {@link Future} interface can be converted into an Observable that emits
* the return value of the get() method in the object, by passing the object into the <code>toObservable</code> method.
* The subscribe method on this synchronously so the Subscription returned doesn't nothing.
* <p>
* This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing.
*
* @param future
* the source {@link Future}
* @param timeout
* the maximum time to wait
* @param unit
* the time unit of the time argument
* @param scheduler
* The {@link Scheduler} to wait for the future on.
* @param <T>
* the type of of object that the future's returns and the type emitted by the resulting
* Observable
* @return an Observable that emits the item from the source Future
*/
public static <T> Observable<T> toObservable(Future<T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
return toObservable(future, timeout, unit).subscribeOn(scheduler);
}

/**
* Converts an Array sequence to an Observable sequence.
*
Expand All @@ -2446,27 +2274,6 @@ public static <T> Observable<T> toObservable(T... items) {
return toObservable(Arrays.asList(items));
}

/**
* Converts an Array sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}.
*
* An Array can be converted into an Observable that emits each item in the Array, by passing the
* Array into the <code>toObservable</code> method.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toObservable.png">
*
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @param items
* the source Array
* @param <T>
* the type of items in the Array, and the type of items emitted by the resulting
* Observable
* @return an Observable that emits each item in the source Array
*/
public static <T> Observable<T> toObservable(Scheduler scheduler, T... items) {
return toObservable(items).subscribeOn(scheduler);
}

/**
* Sort T objects by their natural order (object must implement Comparable).
* <p>
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testMergeWithImmediateScheduler1() {
Observable<Integer> o1 = Observable.<Integer> from(1, 2, 3, 4, 5);
Observable<Integer> o2 = Observable.<Integer> from(6, 7, 8, 9, 10);
@SuppressWarnings("unchecked")
Observable<String> o = Observable.<Integer> merge(Schedulers.immediate(), o1, o2).map(new Func1<Integer, String>() {
Observable<String> o = Observable.<Integer> merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1<Integer, String>() {

@Override
public String call(Integer t) {
Expand All @@ -141,7 +141,7 @@ public void testMergeWithCurrentThreadScheduler1() {
Observable<Integer> o1 = Observable.<Integer> from(1, 2, 3, 4, 5);
Observable<Integer> o2 = Observable.<Integer> from(6, 7, 8, 9, 10);
@SuppressWarnings("unchecked")
Observable<String> o = Observable.<Integer> merge(Schedulers.currentThread(), o1, o2).map(new Func1<Integer, String>() {
Observable<String> o = Observable.<Integer> merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1<Integer, String>() {

@Override
public String call(Integer t) {
Expand All @@ -167,7 +167,7 @@ public void testMergeWithScheduler1() {
Observable<Integer> o1 = Observable.<Integer> from(1, 2, 3, 4, 5);
Observable<Integer> o2 = Observable.<Integer> from(6, 7, 8, 9, 10);
@SuppressWarnings("unchecked")
Observable<String> o = Observable.<Integer> merge(Schedulers.threadPoolForComputation(), o1, o2).map(new Func1<Integer, String>() {
Observable<String> o = Observable.<Integer> merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1<Integer, String>() {

@Override
public String call(Integer t) {
Expand Down

0 comments on commit 1fa96db

Please sign in to comment.