Skip to content

Commit

Permalink
Merge pull request ReactiveX#106 from benjchristensen/function-memoiz…
Browse files Browse the repository at this point in the history
…ation

Performance optimizations for dynamic function execution.

ReactiveX#104
  • Loading branch information
benjchristensen committed Jan 22, 2013
2 parents 960b107 + 0c5d6e6 commit 2fd22df
Show file tree
Hide file tree
Showing 19 changed files with 491 additions and 144 deletions.
93 changes: 72 additions & 21 deletions rxjava-core/src/main/java/rx/observables/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import rx.util.Func2;
import rx.util.Func3;
import rx.util.Func4;
import rx.util.FuncN;
import rx.util.Functions;

/**
Expand Down Expand Up @@ -301,7 +302,7 @@ private static void handleError(Exception e) {
* @param args
*/
private void executeCallback(final Object callback, Object... args) {
Functions.execute(callback, args);
Functions.from(callback).call(args);
}

/**
Expand Down Expand Up @@ -395,11 +396,13 @@ public static <T> Observable<T> create(Func1<Observer<T>, Subscription> func) {
* @return a Observable that, when a Observer subscribes to it, will execute the given function
*/
public static <T> Observable<T> create(final Object callback) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(callback);
return create(new Func1<Observer<T>, Subscription>() {

@Override
public Subscription call(Observer<T> t1) {
return Functions.execute(callback, t1);
return (Subscription) _f.call(t1);
}

});
Expand Down Expand Up @@ -469,11 +472,13 @@ public static <T> Observable<T> filter(Observable<T> that, Func1<T, Boolean> pre
* evaluates as true
*/
public static <T> Observable<T> filter(Observable<T> that, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return filter(that, new Func1<T, Boolean>() {

@Override
public Boolean call(T t1) {
return Functions.execute(function, t1);
return (Boolean) _f.call(t1);

}

Expand Down Expand Up @@ -597,11 +602,14 @@ public static <T, R> Observable<R> map(Observable<T> sequence, Func1<T, R> func)
* in the sequence emitted by the source Observable
*/
public static <T, R> Observable<R> map(Observable<T> sequence, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return map(sequence, new Func1<T, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T t1) {
return Functions.execute(function, t1);
return (R) _f.call(t1);
}

});
Expand Down Expand Up @@ -656,11 +664,14 @@ public static <T, R> Observable<R> mapMany(Observable<T> sequence, Func1<T, Obse
* Observables obtained from this transformation
*/
public static <T, R> Observable<R> mapMany(Observable<T> sequence, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return mapMany(sequence, new Func1<T, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T t1) {
return Functions.execute(function, t1);
return (R) _f.call(t1);
}

});
Expand Down Expand Up @@ -876,11 +887,14 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
* @return the source Observable, with its behavior modified as described
*/
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorResumeNext(that, new Func1<Exception, Observable<T>>() {

@SuppressWarnings("unchecked")
@Override
public Observable<T> call(Exception e) {
return Functions.execute(resumeFunction, e);
return (Observable<T>) _f.call(e);
}
});
}
Expand Down Expand Up @@ -999,11 +1013,14 @@ public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> ac
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(final Observable<T> sequence, final Object accumulator) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(accumulator);
return reduce(sequence, new Func2<T, T, T>() {

@SuppressWarnings("unchecked")
@Override
public T call(T t1, T t2) {
return Functions.execute(accumulator, t1, t2);
return (T) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1071,11 +1088,14 @@ public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, F
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(final Observable<T> sequence, final T initialValue, final Object accumulator) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(accumulator);
return reduce(sequence, initialValue, new Func2<T, T, T>() {

@SuppressWarnings("unchecked")
@Override
public T call(T t1, T t2) {
return Functions.execute(accumulator, t1, t2);
return (T) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1126,11 +1146,14 @@ public static <T> Observable<T> scan(Observable<T> sequence, Func2<T, T, T> accu
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public static <T> Observable<T> scan(final Observable<T> sequence, final Object accumulator) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(accumulator);
return scan(sequence, new Func2<T, T, T>() {

@SuppressWarnings("unchecked")
@Override
public T call(T t1, T t2) {
return Functions.execute(accumulator, t1, t2);
return (T) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1185,11 +1208,14 @@ public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Fun
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public static <T> Observable<T> scan(final Observable<T> sequence, final T initialValue, final Object accumulator) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(accumulator);
return scan(sequence, initialValue, new Func2<T, T, T>() {

@SuppressWarnings("unchecked")
@Override
public T call(T t1, T t2) {
return Functions.execute(accumulator, t1, t2);
return (T) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1359,11 +1385,13 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
* @return
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(sortFunction);
return OperationToObservableSortedList.toSortedList(sequence, new Func2<T, T, Integer>() {

@Override
public Integer call(T t1, T t2) {
return Functions.execute(sortFunction, t1, t2);
return (Integer) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1422,11 +1450,14 @@ public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return zip(w0, w1, new Func2<T0, T1, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T0 t0, T1 t1) {
return Functions.execute(function, t0, t1);
return (R) _f.call(t0, t1);
}

});
Expand Down Expand Up @@ -1493,11 +1524,14 @@ public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return zip(w0, w1, w2, new Func3<T0, T1, T2, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T0 t0, T1 t1, T2 t2) {
return Functions.execute(function, t0, t1, t2);
return (R) _f.call(t0, t1, t2);
}

});
Expand Down Expand Up @@ -1566,11 +1600,14 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return zip(w0, w1, w2, w3, new Func4<T0, T1, T2, T3, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
return Functions.execute(function, t0, t1, t2, t3);
return (R) _f.call(t0, t1, t2, t3);
}

});
Expand All @@ -1588,7 +1625,7 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
* @return a Observable that emits only those items in the original Observable that the filter
* evaluates as <code>true</code>
*/
public Observable<T> filter(Func1<Boolean, T> predicate) {
public Observable<T> filter(Func1<T, Boolean> predicate) {
return filter(this, predicate);
}

Expand All @@ -1605,10 +1642,12 @@ public Observable<T> filter(Func1<Boolean, T> predicate) {
* evaluates as "true"
*/
public Observable<T> filter(final Object callback) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(callback);
return filter(this, new Func1<T, Boolean>() {

public Boolean call(T t1) {
return Functions.execute(callback, t1);
return (Boolean) _f.call(t1);
}
});
}
Expand Down Expand Up @@ -1638,7 +1677,7 @@ public Observable<T> last() {
* @return a Observable that emits a sequence that is the result of applying the transformation
* closure to each item in the sequence emitted by the input Observable.
*/
public <R> Observable<R> map(Func1<R, T> func) {
public <R> Observable<R> map(Func1<T, R> func) {
return map(this, func);
}

Expand All @@ -1655,10 +1694,13 @@ public <R> Observable<R> map(Func1<R, T> func) {
* closure to each item in the sequence emitted by the input Observable.
*/
public <R> Observable<R> map(final Object callback) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(callback);
return map(this, new Func1<T, R>() {

@SuppressWarnings("unchecked")
public R call(T t1) {
return Functions.execute(callback, t1);
return (R) _f.call(t1);
}
});
}
Expand Down Expand Up @@ -1698,10 +1740,13 @@ public <R> Observable<R> mapMany(Func1<T, Observable<R>> func) {
* Observables obtained from this transformation.
*/
public <R> Observable<R> mapMany(final Object callback) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(callback);
return mapMany(this, new Func1<T, Observable<R>>() {

@SuppressWarnings("unchecked")
public Observable<R> call(T t1) {
return Functions.execute(callback, t1);
return (Observable<R>) _f.call(t1);
}
});
}
Expand Down Expand Up @@ -1771,10 +1816,13 @@ public Observable<T> onErrorResumeNext(final Func1<Exception, Observable<T>> res
* @return the original Observable with appropriately modified behavior
*/
public Observable<T> onErrorResumeNext(final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorResumeNext(this, new Func1<Exception, Observable<T>>() {

@SuppressWarnings("unchecked")
public Observable<T> call(Exception e) {
return Functions.execute(resumeFunction, e);
return (Observable<T>) _f.call(e);
}
});
}
Expand Down Expand Up @@ -1857,10 +1905,13 @@ public Observable<T> onErrorReturn(Func1<Exception, T> resumeFunction) {
* @return the original Observable with appropriately modified behavior
*/
public Observable<T> onErrorReturn(final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorReturn(this, new Func1<Exception, T>() {

@SuppressWarnings("unchecked")
public T call(Exception e) {
return Functions.execute(resumeFunction, e);
return (T) _f.call(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
/* package */final class AtomicObserver<T> implements Observer<T> {

/** Allow changing between forcing single or allowing multi-threaded execution of onNext */
private static boolean allowMultiThreaded = true;
private static boolean allowMultiThreaded = false;
static {
String v = System.getProperty("rx.onNext.multithreaded.enabled");
if (v != null) {
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Action0.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Action0 {
public interface Action0 extends Function {
public void call();
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Action1.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Action1<T1> {
public interface Action1<T1> extends Function {
public void call(T1 t1);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Action2.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Action2<T1, T2> {
public interface Action2<T1, T2> extends Function {
public void call(T1 t1, T2 t2);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Action3.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Action3<T1, T2, T3> {
public interface Action3<T1, T2, T3> extends Function {
public void call(T1 t1, T2 t2, T3 t3);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Func0.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Func0<R> {
public interface Func0<R> extends Function {
public R call();
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Func1.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Func1<T1, R> {
public interface Func1<T1, R> extends Function {
public R call(T1 t1);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Func2.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Func2<T1, T2, R> {
public interface Func2<T1, T2, R> extends Function {
public R call(T1 t1, T2 t2);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Func3.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Func3<T1, T2, T3, R> {
public interface Func3<T1, T2, T3, R> extends Function {
public R call(T1 t1, T2 t2, T3 t3);
}
Loading

0 comments on commit 2fd22df

Please sign in to comment.