Skip to content

Commit

Permalink
Merge pull request ReactiveX#4 from benjchristensen/cleanup
Browse files Browse the repository at this point in the history
remove 'wrap' functionality
  • Loading branch information
benjchristensen committed Jan 12, 2013
2 parents eb95131 + d61c54d commit 1306f70
Showing 1 changed file with 31 additions and 57 deletions.
88 changes: 31 additions & 57 deletions rxjava-core/src/main/java/org/rx/reactive/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public Subscription subscribe(Observer<T> observer) {
* @return a Observable that, when a Observer subscribes to it, will execute the given function
*/
public static <T> Observable<T> create(Func1<Subscription, Observer<T>> func) {
return wrap(OperationToObservableFunction.toObservableFunction(func));
return OperationToObservableFunction.toObservableFunction(func);
}

/**
Expand Down Expand Up @@ -367,7 +367,7 @@ public static <T> Observable<T> empty() {
* @return a Observable object that calls <code>onError</code> when a Observer subscribes
*/
public static <T> Observable<T> error(Exception exception) {
return wrap(new ThrowObservable<T>(exception));
return new ThrowObservable<T>(exception);
}

/**
Expand Down Expand Up @@ -455,7 +455,7 @@ public static <T> Observable<T> just(T value) {
* by the source Observable
*/
public static <T> Observable<T> last(final Observable<T> that) {
return wrap(OperationLast.last(that));
return OperationLast.last(that);
}

/**
Expand All @@ -477,7 +477,7 @@ public static <T> Observable<T> last(final Observable<T> that) {
* in the sequence emitted by the source Observable
*/
public static <T, R> Observable<R> map(Observable<T> sequence, Func1<R, T> func) {
return wrap(OperationMap.map(sequence, func));
return OperationMap.map(sequence, func);
}

/**
Expand Down Expand Up @@ -532,7 +532,7 @@ public R call(T t1) {
* the Observables obtained from this transformation
*/
public static <T, R> Observable<R> mapMany(Observable<T> sequence, Func1<Observable<R>, T> func) {
return wrap(OperationMap.mapMany(sequence, func));
return OperationMap.mapMany(sequence, func);
}

/**
Expand Down Expand Up @@ -598,7 +598,7 @@ public static <T> Observable<Notification<T>> materialize(final Observable<T> se
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(List<Observable<T>> source) {
return wrap(OperationMerge.merge(source));
return OperationMerge.merge(source);
}

/**
Expand All @@ -616,7 +616,7 @@ public static <T> Observable<T> merge(List<Observable<T>> source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Observable<Observable<T>> source) {
return wrap(OperationMerge.merge(source));
return OperationMerge.merge(source);
}

/**
Expand All @@ -634,7 +634,7 @@ public static <T> Observable<T> merge(Observable<Observable<T>> source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Observable<T>... source) {
return wrap(OperationMerge.merge(source));
return OperationMerge.merge(source);
}

/**
Expand All @@ -654,7 +654,7 @@ public static <T> Observable<T> merge(Observable<T>... source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> mergeDelayError(List<Observable<T>> source) {
return wrap(OperationMergeDelayError.mergeDelayError(source));
return OperationMergeDelayError.mergeDelayError(source);
}

/**
Expand All @@ -674,7 +674,7 @@ public static <T> Observable<T> mergeDelayError(List<Observable<T>> source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> mergeDelayError(Observable<Observable<T>> source) {
return wrap(OperationMergeDelayError.mergeDelayError(source));
return OperationMergeDelayError.mergeDelayError(source);
}

/**
Expand All @@ -694,7 +694,7 @@ public static <T> Observable<T> mergeDelayError(Observable<Observable<T>> source
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> mergeDelayError(Observable<T>... source) {
return wrap(OperationMergeDelayError.mergeDelayError(source));
return OperationMergeDelayError.mergeDelayError(source);
}

/**
Expand All @@ -710,7 +710,7 @@ public static <T> Observable<T> mergeDelayError(Observable<T>... source) {
* @return a Observable that never sends any information to a Observer
*/
public static <T> Observable<T> never() {
return wrap(new NeverObservable<T>());
return new NeverObservable<T>();
}

/**
Expand Down Expand Up @@ -749,7 +749,7 @@ public static Subscription noOpSubscription() {
* @return the source Observable, with its behavior modified as described
*/
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Func1<Observable<T>, Exception> resumeFunction) {
return wrap(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction));
return OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction);
}

/**
Expand Down Expand Up @@ -812,7 +812,7 @@ public Observable<T> call(Exception e) {
* @return the source Observable, with its behavior modified as described
*/
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Observable<T> resumeSequence) {
return wrap(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence));
return OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence);
}

/**
Expand All @@ -839,7 +839,7 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
* @return the source Observable, with its behavior modified as described
*/
public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<T, Exception> resumeFunction) {
return wrap(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
return OperationOnErrorReturn.onErrorReturn(that, resumeFunction);
}

/**
Expand Down Expand Up @@ -870,7 +870,7 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<T,
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> accumulator) {
return wrap(OperationScan.scan(sequence, accumulator).last());
return OperationScan.scan(sequence, accumulator).last();
}

/**
Expand Down Expand Up @@ -941,7 +941,7 @@ public T call(T t1, T t2) {
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
return wrap(OperationScan.scan(sequence, initialValue, accumulator).last());
return OperationScan.scan(sequence, initialValue, accumulator).last();
}

/**
Expand Down Expand Up @@ -1004,7 +1004,7 @@ public T call(T t1, T t2) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public static <T> Observable<T> scan(Observable<T> sequence, Func2<T, T, T> accumulator) {
return wrap(OperationScan.scan(sequence, accumulator));
return OperationScan.scan(sequence, accumulator);
}

/**
Expand Down Expand Up @@ -1061,7 +1061,7 @@ public T call(T t1, T t2) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
return wrap(OperationScan.scan(sequence, initialValue, accumulator));
return OperationScan.scan(sequence, initialValue, accumulator);
}

/**
Expand Down Expand Up @@ -1114,7 +1114,7 @@ public T call(T t1, T t2) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229847(v=vs.103).aspx">MSDN: Observable.Skip Method</a>
*/
public static <T> Observable<T> skip(final Observable<T> items, int num) {
return wrap(OperationSkip.skip(items, num));
return OperationSkip.skip(items, num);
}

/**
Expand All @@ -1132,7 +1132,7 @@ public static <T> Observable<T> skip(final Observable<T> items, int num) {
* @return a Observable that is a chronologically well-behaved version of the source Observable
*/
public static <T> Observable<T> synchronize(Observable<T> observable) {
return wrap(OperationSynchronize.synchronize(observable));
return OperationSynchronize.synchronize(observable);
}

/**
Expand All @@ -1155,7 +1155,7 @@ public static <T> Observable<T> synchronize(Observable<T> observable) {
* Observable
*/
public static <T> Observable<T> take(final Observable<T> items, final int num) {
return wrap(OperationTake.take(items, num));
return OperationTake.take(items, num);
}

/**
Expand All @@ -1178,7 +1178,7 @@ public static <T> Observable<T> take(final Observable<T> items, final int num) {
* items emitted by the source Observable
*/
public static <T> Observable<List<T>> toList(final Observable<T> that) {
return wrap(OperationToObservableList.toObservableList(that));
return OperationToObservableList.toObservableList(that);
}

/**
Expand All @@ -1198,7 +1198,7 @@ public static <T> Observable<List<T>> toList(final Observable<T> that) {
* @return a Observable that emits each item in the source Iterable sequence
*/
public static <T> Observable<T> toObservable(Iterable<T> iterable) {
return wrap(OperationToObservableIterable.toObservableIterable(iterable));
return OperationToObservableIterable.toObservableIterable(iterable);
}

/**
Expand Down Expand Up @@ -1233,7 +1233,7 @@ public static <T> Observable<T> toObservable(T... items) {
* @return
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
return wrap(OperationToObservableSortedList.toSortedList(sequence));
return OperationToObservableSortedList.toSortedList(sequence);
}

/**
Expand All @@ -1247,7 +1247,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
* @return
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2<Integer, T, T> sortFunction) {
return wrap(OperationToObservableSortedList.toSortedList(sequence, sortFunction));
return OperationToObservableSortedList.toSortedList(sequence, sortFunction);
}

/**
Expand All @@ -1261,40 +1261,14 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
* @return
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
return wrap(OperationToObservableSortedList.toSortedList(sequence, new Func2<Integer, T, T>() {
return OperationToObservableSortedList.toSortedList(sequence, new Func2<Integer, T, T>() {

@Override
public Integer call(T t1, T t2) {
return Functions.execute(sortFunction, t1, t2);
}

}));
}

/**
* Allow wrapping responses with the <code>AbstractObservable</code> so that we have all of
* the utility methods available for subscribing.
* <p>
* This is not expected to benefit Java usage, but is intended for dynamic script which are a primary target of the Observable operations.
* <p>
* Since they are dynamic they can execute the "hidden" methods on <code>AbstractObservable</code> while appearing to only receive an <code>Observable</code> without first casting.
*
* @param o
* @return
*/
private static <T> Observable<T> wrap(final Observable<T> o) {
if (o instanceof Observable) {
// if the Observable is already an AbstractObservable, don't wrap it again.
return (Observable<T>) o;
}
return new Observable<T>() {

@Override
public Subscription subscribe(Observer<T> observer) {
return o.subscribe(observer);
}

};
});
}

/**
Expand Down Expand Up @@ -1322,7 +1296,7 @@ public Subscription subscribe(Observer<T> observer) {
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Func2<R, T0, T1> reduceFunction) {
return wrap(OperationZip.zip(w0, w1, reduceFunction));
return OperationZip.zip(w0, w1, reduceFunction);
}

/**
Expand Down Expand Up @@ -1389,7 +1363,7 @@ public R call(T0 t0, T1 t1) {
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<R, T0, T1, T2> function) {
return wrap(OperationZip.zip(w0, w1, w2, function));
return OperationZip.zip(w0, w1, w2, function);
}

/**
Expand Down Expand Up @@ -1461,7 +1435,7 @@ public R call(T0 t0, T1 t1, T2 t2) {
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<R, T0, T1, T2, T3> reduceFunction) {
return wrap(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
return OperationZip.zip(w0, w1, w2, w3, reduceFunction);
}

/**
Expand Down

0 comments on commit 1306f70

Please sign in to comment.