Skip to content

Commit

Permalink
Merge pull request #3 from benjchristensen/static-core
Browse files Browse the repository at this point in the history
Remove subscribe(Map<String, Object>) and cleanup Functions.from
  • Loading branch information
mattrjacobs committed Aug 29, 2013
2 parents bb45652 + f8bacd4 commit 39239fa
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 84 deletions.
61 changes: 6 additions & 55 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import rx.util.OnErrorNotImplementedException;
import rx.util.Range;
import rx.util.Timestamped;
import rx.util.functions.Action;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
Expand Down Expand Up @@ -249,56 +250,6 @@ private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
return subscription.wrap(subscribe(new SafeObserver<T>(subscription, o)));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Map<String, Object> callbacks) {
if (callbacks == null) {
throw new RuntimeException("callbacks map can not be null");
}
Object _onNext = callbacks.get("onNext");
if (_onNext == null) {
throw new RuntimeException("'onNext' key must contain an implementation");
}
// lookup and memoize onNext
final FuncN onNext = Functions.from(_onNext);

/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
return protectivelyWrapAndSubscribe(new Observer() {

@Override
public void onCompleted() {
Object onComplete = callbacks.get("onCompleted");
if (onComplete != null) {
Functions.from(onComplete).call();
}
}

@Override
public void onError(Throwable e) {
handleError(e);
Object onError = callbacks.get("onError");
if (onError != null) {
Functions.from(onError).call(e);
} else {
throw new OnErrorNotImplementedException(e);
}
}

@Override
public void onNext(Object args) {
onNext.call(args);
}

});
}

public Subscription subscribe(final Map<String, Object> callbacks, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(callbacks);
}

public Subscription subscribe(final Action1<T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
Expand Down Expand Up @@ -1086,13 +1037,13 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
*
*
* @param w0
* The first source observable.
* The first source observable.
* @param w1
* The second source observable.
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* The aggregation function used to combine the source observable values.
* @return An Observable that combines the source Observables with the given combine function
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineFunction) {
Expand All @@ -1112,7 +1063,7 @@ public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Obs
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
}

/**
* Creates an Observable which produces buffers of collected values.
*
Expand Down
18 changes: 1 addition & 17 deletions rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.FuncN;
import rx.util.functions.Function;
import rx.util.functions.Functions;

/**
Expand Down Expand Up @@ -83,23 +84,6 @@ public static CompositeSubscription create(Subscription... subscriptions) {
return new CompositeSubscription(subscriptions);
}

/**
* A {@link Subscription} implemented via an anonymous function (such as closures from other languages).
*
* @return {@link Subscription}
*/
public static Subscription create(final Object unsubscribe) {
final FuncN<?> f = Functions.from(unsubscribe);
return new Subscription() {

@Override
public void unsubscribe() {
f.call();
}

};
}

/**
* A {@link Subscription} that does nothing when its unsubscribe method is called.
*/
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/functions/Action.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
* <p>
* Marker interface to allow instanceof checks.
*/
public interface Action {
public interface Action extends Function {

}
13 changes: 2 additions & 11 deletions rxjava-core/src/main/java/rx/util/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package rx.util.functions;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

public class Functions {

/**
Expand All @@ -26,17 +23,11 @@ public class Functions {
* @param function
*/
@SuppressWarnings({ "rawtypes" })
public static FuncN from(final Object function) {
public static FuncN from(final Function function) {
if (function == null) {
throw new RuntimeException("function is null. Can't send arguments to null function.");
}

/* check for typed Rx Function implementation first */
if (function instanceof Function) {
return fromFunction((Function) function);
}
// no support found
throw new RuntimeException("Unsupported closure type: " + function.getClass().getSimpleName());
return fromFunction(function);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down

0 comments on commit 39239fa

Please sign in to comment.