Performance Optimizations #104
Simple performance test to demonstrate the performance issue:

```java
package testing;

import java.util.concurrent.atomic.AtomicInteger;

import rx.observables.Observable;
import rx.observables.Observer;
import rx.util.Func1;

public class PerformanceTest {

    public static void main(String[] args) {
        PerformanceTest test = new PerformanceTest();
        Integer[] values = new Integer[100001];
        for (int i = 0; i < values.length; i++) {
            values[i] = i;
        }

        for (int i = 0; i < 100; i++) {
            System.out.println("-------------------------------");
            // test.runFunctionExecutionTest(values);
            test.runCompositionTest(values);
            test.runNonCompositionalTestWithDirectLoop(values);
            test.runNonCompositionalTestWithArrayOfFunctions(values);
        }
    }

    public void runCompositionTest(Integer[] values) {
        System.out.println("runCompositionTest");
        final AtomicInteger onNextSum = new AtomicInteger(0);
        final long start = System.nanoTime();
        MathFunction m = new MathFunction();
        // 50 levels of composition (same function so that's not the cost)
        Observable.from(values)
                .map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m)
                .map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m)
                .map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m)
                .map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m)
                .map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m).map(m)
                .subscribe(new TestObserver(onNextSum, start));
    }

    public void runNonCompositionalTestWithDirectLoop(Integer[] values) {
        System.out.println("runNonCompositionalTestWithDirectLoop");
        final AtomicInteger onNextSum = new AtomicInteger(0);
        final long start = System.nanoTime();
        final MathFunction m = new MathFunction();
        Observable.from(values).map(new Func1<Integer, Integer>() {

            @Override
            public Integer call(Integer t1) {
                // iterate the 50 times here in a loop rather than via composition
                for (int i = 0; i < 50; i++) {
                    t1 = m.call(t1);
                }
                return t1;
            }
        }).subscribe(new TestObserver(onNextSum, start));
    }

    public void runNonCompositionalTestWithArrayOfFunctions(Integer[] values) {
        System.out.println("runNonCompositionalTestWithArrayOfFunctions");
        final AtomicInteger onNextSum = new AtomicInteger(0);
        final long start = System.nanoTime();
        final MathFunction m = new MathFunction();
        final Func1[] functionCalls = new Func1<?, ?>[50];
        for (int i = 0; i < 50; i++) {
            functionCalls[i] = m;
        }
        Observable.from(values).map(new Func1<Integer, Integer>() {

            @Override
            public Integer call(Integer t1) {
                // apply the same 50 functions from an array rather than via composition
                for (Func1<Integer, Integer> f : functionCalls) {
                    t1 = f.call(t1);
                }
                return t1;
            }
        }).subscribe(new TestObserver(onNextSum, start));
    }

    private static final class TestObserver implements Observer<Integer> {

        private final AtomicInteger onNextSum;
        private final long start;

        private TestObserver(AtomicInteger onNextSum, long start) {
            this.onNextSum = onNextSum;
            this.start = start;
        }

        @Override
        public void onNext(Integer i) {
            onNextSum.addAndGet(i);
        }

        @Override
        public void onError(Exception e) {
            e.printStackTrace();
        }

        @Override
        public void onCompleted() {
            long end = System.nanoTime();
            System.out.println("Sum: " + onNextSum.get() + " Time: " + ((double) (end - start)) / 1000 / 1000 + "ms");
        }
    }

    private static class MathFunction implements Func1<Integer, Integer> {

        @Override
        public Integer call(Integer t1) {
            return t1 + 1;
        }
    }
}
```
On the existing code, the composition of 50 `map` calls takes ~370ms, whereas a single `map` call that loops over the same 50 functions takes ~23ms.
Playing with different things, I got the following numbers:
By adding chaining (collapsing the composition into a single observable with a chain of functions) and improving the handling of dynamic function calls through memoization, I get this:

I can get the composed version down to around 25-28ms if I use an ArrayList, but I need to be thread-safe here, so I am using a ConcurrentLinkedQueue, which adds a little time; it is still far faster than before. The same code now takes ~48ms instead of ~370ms.
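A minimal sketch of the collapse described above (not RxJava's actual implementation): the 50 composed map functions are held in one thread-safe ConcurrentLinkedQueue and applied in a single pass per element, so each element flows through one operator instead of 50 nested ones. The class and method names here are illustrative only.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

public class CollapsedChain {

    static int runChain(int[] values) {
        // thread-safe chain, as discussed above (an ArrayList is slightly
        // faster but unsafe under concurrent mutation)
        Queue<Function<Integer, Integer>> chain = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 50; i++) {
            chain.add(x -> x + 1); // same increment as MathFunction
        }
        int sum = 0;
        for (int v : values) {
            int t = v;
            // one pass over the chain per element, no nested operators
            for (Function<Integer, Integer> f : chain) {
                t = f.apply(t);
            }
            sum += t;
        }
        return sum;
    }

    public static void main(String[] args) {
        // each value gains 50: (0+50) + (1+50) + (2+50) = 153
        System.out.println(runChain(new int[]{0, 1, 2}));
    }
}
```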
Pretty significant performance improvements came from improved handling of function execution, in particular memoizing the logic for constructing a function, especially dynamic ones. The long if/then conditional block for handling the many different Func/Action/language function types was very expensive when executed in sequence over lots of data. This was fixed in #106. I'm still testing the potential gains from chaining.
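A hypothetical sketch of that memoization idea (the names below are invented for illustration, not RxJava's internals): the expensive type-inspection chain that decides how to invoke an arbitrary function object runs once per function class, and the result is cached, instead of re-running for every element.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MemoizedDispatch {

    interface F1 { int call(int t); }

    // cache: expensive type analysis runs once per function class
    static final ConcurrentHashMap<Class<?>, Integer> KIND = new ConcurrentHashMap<>();
    static final AtomicInteger analyses = new AtomicInteger();

    static int invoke(Object f, int arg) {
        int kind = KIND.computeIfAbsent(f.getClass(), c -> {
            analyses.incrementAndGet(); // stands in for the costly if/then chain
            if (f instanceof F1) {
                return 1;
            }
            throw new IllegalArgumentException("unsupported function type");
        });
        if (kind == 1) {
            return ((F1) f).call(arg);
        }
        throw new IllegalStateException("unknown kind " + kind);
    }

    public static void main(String[] args) {
        F1 inc = t -> t + 1;
        int sum = 0;
        for (int i = 0; i < 1000; i++) {
            sum += invoke(inc, i); // type analysis happens only on the first call
        }
        System.out.println(sum + " analyses=" + analyses.get());
    }
}
```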
While playing with chaining I found that the actual performance issues were dynamic function construction/lookup and concurrency/synchronization costs, not the depth of the stack. Here is code demonstrating that stack depth is not a performance problem; in fact, the composed version performs better than the loops:

```java
package testing;

import java.util.ArrayList;
import java.util.concurrent.Callable;

import rx.util.Func1;

public class TestChainPerformance {

    public static void main(String[] args) {
        TestChainPerformance test = new TestChainPerformance();
        Integer[] values = new Integer[100001];
        for (int i = 0; i < values.length; i++) {
            values[i] = i;
        }

        try {
            for (int i = 0; i < 100; i++) {
                System.out.println("-------------------------------");
                test.runChained(values);
                test.runComposed(values);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void runComposed(final Integer[] values) throws Exception {
        long start = System.nanoTime();
        Callable<Integer> c = null;
        // build 250 nested Callables; each level sums the array then delegates
        for (int i = 0; i < 250; i++) {
            final Callable<Integer> previousC = c;
            c = new Callable<Integer>() {

                @Override
                public Integer call() throws Exception {
                    MathFunction f = new MathFunction();
                    int sum = 0;
                    for (int v : values) {
                        sum += f.call(v);
                    }
                    if (previousC != null) {
                        sum += previousC.call();
                    }
                    return sum;
                }
            };
        }
        int sum = c.call();
        long end = System.nanoTime();
        System.out.println("Composed => Sum: " + sum + " Time: " + ((double) (end - start)) / 1000 / 1000 + "ms");
    }

    public void runChained(Integer[] values) {
        long start = System.nanoTime();
        int sum = 0;
        ArrayList<Func1<Integer, Integer>> functions = new ArrayList<Func1<Integer, Integer>>();
        for (int i = 0; i < 250; i++) {
            functions.add(new MathFunction());
        }
        for (int v : values) {
            for (Func1<Integer, Integer> f : functions) {
                sum += f.call(v);
            }
        }
        long end = System.nanoTime();
        System.out.println("Iterative => Sum: " + sum + " Time: " + ((double) (end - start)) / 1000 / 1000 + "ms");
    }

    private static class MathFunction implements Func1<Integer, Integer> {

        @Override
        public Integer call(Integer t1) {
            return t1 + 1;
        }
    }
}
```

The performance numbers on my machine are:
That is with JDK 7. JDK 6 is slower, but the difference between the two approaches is similar. Because of this, I'm abandoning the pursuit of chaining as a performance enhancement and focusing on functions and synchronization.
In pull request #106 with function memoization we went from this:

to this:

By reducing the nested synchronization (eliminating most of it), it is now running at:
Performance is now very close to what it is without Rx. I consider this overhead acceptable for now and am not spending further time on this issue.
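One of the synchronization reductions referenced in the linked commits is skipping the AtomicObserver wrapper for trusted operators. A hypothetical sketch of that idea, with invented names: an Observer is wrapped in a lock-acquiring guard only when the source is not a trusted, sequentially emitting internal operator.

```java
public class TrustedWrap {

    interface IntObserver { void onNext(int v); }

    // guard used for untrusted sources: serializes every onNext call
    static final class SynchronizedObserver implements IntObserver {
        private final IntObserver actual;
        SynchronizedObserver(IntObserver actual) { this.actual = actual; }
        @Override public synchronized void onNext(int v) { actual.onNext(v); }
    }

    // trusted operators skip the per-onNext lock entirely
    static IntObserver wrap(IntObserver o, boolean trusted) {
        return trusted ? o : new SynchronizedObserver(o);
    }

    static int drive(boolean trusted) {
        int[] sum = {0};
        IntObserver o = wrap(v -> sum[0] += v, trusted);
        for (int i = 1; i <= 100; i++) {
            o.onNext(i);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(drive(true));  // fast path, no locking
        System.out.println(drive(false)); // guarded path, same result
    }
}
```

Both paths produce the same sum; only the locking cost on the hot path differs.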
Referenced commits:
- …ation Performance optimizations for dynamic function execution. ReactiveX#104
- migrated Func1 to OperatorSubscribeFunction for internal operator implementations; do not wrap with AtomicObserver when it's a trusted operator. ReactiveX#104
- Performance optimizations for dynamic function execution. ReactiveX/RxJava#104
Very large compositions take a performance hit, as they increase the size of the stack and invoke onNext at each level of composition.

Pursue an initial round of optimizations that collapses the composition into a sequence of functions applied at subscribe() rather than at each subsequent call of onNext.

UPDATED: See the comments below for where this evolved. Chaining was shown through testing not to be the issue.
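The proposed collapse at subscribe() time can be sketched with plain java.util.function composition (an illustration of the idea, not RxJava's code; and as the update notes, testing later showed chaining itself was not the bottleneck):

```java
import java.util.function.Function;

public class ComposeAtSubscribe {

    // build the collapsed function once, at subscribe time,
    // instead of routing every onNext through `depth` operator layers
    static Function<Integer, Integer> collapse(int depth) {
        Function<Integer, Integer> collapsed = Function.identity();
        for (int i = 0; i < depth; i++) {
            collapsed = collapsed.andThen(x -> x + 1); // one map stage
        }
        return collapsed;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> f = collapse(50);
        // onNext would now be invoked once per element, applying f
        System.out.println(f.apply(0)); // 0 incremented 50 times -> 50
    }
}
```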