From 1fa96db3c04d461f8f79db19bd481f50804dd504 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 13:21:13 -0700 Subject: [PATCH] Removing Scheduler overloads on operators (for now) I have some outstanding questions on how these should be implemented (or even why we need them when the 'subscribeOn' operator is far cleaner) so want to remove them for now so they don't make it into the public incorrectly. --- rxjava-core/src/main/java/rx/Observable.java | 193 ------------------ .../java/rx/concurrency/TestSchedulers.java | 6 +- 2 files changed, 3 insertions(+), 196 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index db6a5cdf5bb..6d7f503084b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -779,21 +779,6 @@ public static Observable empty() { return toObservable(new ArrayList()); } - /** - * Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method on the given {@link Scheduler}. - *

- * - * - * @param - * the type of item emitted by the Observable - * @param {@link Scheduler} The scheduler to send the termination ({@link Observer#onCompleted()} call. - * @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s onCompleted method - */ - @SuppressWarnings("unchecked") - public static Observable empty(Scheduler scheduler) { - return (Observable) empty().subscribeOn(scheduler); - } - /** * Returns an Observable that calls onError when an {@link Observer} subscribes to it. *

@@ -877,22 +862,6 @@ public static Observable from(Iterable iterable) { return toObservable(iterable); } - /** - * Converts an {@link Iterable} sequence to an Observable sequence that is subscribed to on the given {@link Scheduler}. - * - * @param iterable - * the source {@link Iterable} sequence - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param - * the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable - * @return an Observable that emits each item in the source {@link Iterable} sequence - * @see {@link #toObservable(Iterable)} - */ - public static Observable from(Iterable iterable, Scheduler scheduler) { - return toObservable(iterable, scheduler); - } - /** * Converts an Array to an Observable sequence. * @@ -907,22 +876,6 @@ public static Observable from(T... items) { return toObservable(items); } - /** - * Converts an Array to an Observable sequence that is subscribed to on the given {@link Scheduler}. - * - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param items - * the source Array - * @param - * the type of items in the Array, and the type of items emitted by the resulting Observable - * @return an Observable that emits each item in the source Array - * @see {@link #toObservable(Object...)} - */ - public static Observable from(Scheduler scheduler, T... items) { - return toObservable(scheduler, items); - } - /** * Generates an observable sequence of integral numbers within a specified range. * @@ -1302,25 +1255,6 @@ public static Observable merge(List> source) { return create(OperationMerge.merge(source)); } - /** - * Flattens the Observable sequences from a list of Observables into one Observable sequence - * without any transformation. You can combine the output of multiple Observables so that they - * act like a single Observable, by using the merge method. - *

- * - * - * @param source - * a list of Observables that emit sequences of items - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @return an Observable that emits a sequence of elements that are the result of flattening the - * output from the source list of Observables - * @see MSDN: Observable.Merge - */ - public static Observable merge(List> source, Scheduler scheduler) { - return merge(source).subscribeOn(scheduler); - } - /** * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a * Observable into one Observable sequence without any transformation. You can combine the output @@ -1338,25 +1272,6 @@ public static Observable merge(Observable> source) { return create(OperationMerge.merge(source)); } - /** - * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a - * Observable into one Observable sequence without any transformation. You can combine the output - * of multiple Observables so that they act like a single Observable, by using the merge method. - *

- * - * - * @param source - * an Observable that emits Observables - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @return an Observable that emits a sequence of elements that are the result of flattening the - * output from the Observables emitted by the source Observable - * @see MSDN: Observable.Merge Method - */ - public static Observable merge(Observable> source, Scheduler scheduler) { - return merge(source).subscribeOn(scheduler); - } - /** * Flattens the Observable sequences from a series of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they @@ -1374,25 +1289,6 @@ public static Observable merge(Observable... source) { return create(OperationMerge.merge(source)); } - /** - * Flattens the Observable sequences from a series of Observables into one Observable sequence - * without any transformation. You can combine the output of multiple Observables so that they - * act like a single Observable, by using the merge method. - *

- * - * - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param source - * a series of Observables that emit sequences of items - * @return an Observable that emits a sequence of elements that are the result of flattening the - * output from the source Observables - * @see MSDN: Observable.Merge Method - */ - public static Observable merge(Scheduler scheduler, Observable... source) { - return merge(source).subscribeOn(scheduler); - } - /** * Returns the values from the source observable sequence until the other observable sequence produces a value. * @@ -2316,27 +2212,6 @@ public static Observable toObservable(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } - /** - * Converts an Iterable sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}. - * - * Any object that supports the Iterable interface can be converted into an Observable that emits - * each iterable item in the object, by passing the object into the toObservable method. - *

- * - * - * @param iterable - * the source Iterable sequence - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param - * the type of items in the iterable sequence and the type emitted by the resulting - * Observable - * @return an Observable that emits each item in the source Iterable sequence - */ - public static Observable toObservable(Iterable iterable, Scheduler scheduler) { - return toObservable(iterable).subscribeOn(scheduler); - } - /** * Converts an Future to an Observable sequence. * @@ -2356,27 +2231,6 @@ public static Observable toObservable(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } - /** - * Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}. - * - * Any object that supports the {@link Future} interface can be converted into an Observable that emits - * the return value of the get() method in the object, by passing the object into the toObservable method. - *

- * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. - * - * @param future - * the source {@link Future} - * @param scheduler - * The {@link Scheduler} to wait for the future on. - * @param - * the type of of object that the future's returns and the type emitted by the resulting - * Observable - * @return an Observable that emits the item from the source Future - */ - public static Observable toObservable(Future future, Scheduler scheduler) { - return toObservable(future).subscribeOn(scheduler); - } - /** * Converts an Future to an Observable sequence. * @@ -2401,32 +2255,6 @@ public static Observable toObservable(Future future, long timeout, Tim return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } - /** - * Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}. - * - * Any object that supports the {@link Future} interface can be converted into an Observable that emits - * the return value of the get() method in the object, by passing the object into the toObservable method. - * The subscribe method on this synchronously so the Subscription returned doesn't nothing. - *

- * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. - * - * @param future - * the source {@link Future} - * @param timeout - * the maximum time to wait - * @param unit - * the time unit of the time argument - * @param scheduler - * The {@link Scheduler} to wait for the future on. - * @param - * the type of of object that the future's returns and the type emitted by the resulting - * Observable - * @return an Observable that emits the item from the source Future - */ - public static Observable toObservable(Future future, long timeout, TimeUnit unit, Scheduler scheduler) { - return toObservable(future, timeout, unit).subscribeOn(scheduler); - } - /** * Converts an Array sequence to an Observable sequence. * @@ -2446,27 +2274,6 @@ public static Observable toObservable(T... items) { return toObservable(Arrays.asList(items)); } - /** - * Converts an Array sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}. - * - * An Array can be converted into an Observable that emits each item in the Array, by passing the - * Array into the toObservable method. - *

- * - * - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param items - * the source Array - * @param - * the type of items in the Array, and the type of items emitted by the resulting - * Observable - * @return an Observable that emits each item in the source Array - */ - public static Observable toObservable(Scheduler scheduler, T... items) { - return toObservable(items).subscribeOn(scheduler); - } - /** * Sort T objects by their natural order (object must implement Comparable). *

diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index bec93cfdcd8..ec247d0b95c 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -115,7 +115,7 @@ public void testMergeWithImmediateScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(Schedulers.immediate(), o1, o2).map(new Func1() { + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1() { @Override public String call(Integer t) { @@ -141,7 +141,7 @@ public void testMergeWithCurrentThreadScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(Schedulers.currentThread(), o1, o2).map(new Func1() { + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { @Override public String call(Integer t) { @@ -167,7 +167,7 @@ public void testMergeWithScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(Schedulers.threadPoolForComputation(), o1, o2).map(new Func1() { + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1() { @Override public String call(Integer t) {