Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Ring Buffer Default Sizes (and lower for Android) #1836

Merged
merged 2 commits into from
Nov 8, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 191 additions & 1 deletion src/main/java/rx/internal/util/IndexedRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,197 @@ public final static IndexedRingBuffer getInstance() {
private final IndexSection removed = new IndexSection();
/* package for unit testing */final AtomicInteger index = new AtomicInteger();
/* package for unit testing */final AtomicInteger removedIndex = new AtomicInteger();
/* package for unit testing */static final int SIZE = 512;

// default size of ring buffer
/**
* Set at 256 ... Android defaults far smaller which likely will never hit the use cases that require the higher buffers.
* <p>
* The 10000 size test represents something that should be a rare use case (merging 10000 concurrent Observables for example)
*
* <pre> {@code
* ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*IndexedRingBufferPerf.*'
*
* 1024
*
* Benchmark (size) Mode Samples Score Score error Units
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 269292.006 6013.347 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 2217.103 163.396 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 139349.608 9397.232 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 1045.323 30.991 ops/s
*
* 512
*
* Benchmark (size) Mode Samples Score Score error Units
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 270919.870 5381.793 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 1724.436 42.287 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 141478.813 3696.030 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 719.447 75.629 ops/s
*
*
* 256
*
* Benchmark (size) Mode Samples Score Score error Units
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 272042.605 7954.982 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 1101.329 23.566 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 140479.804 6389.060 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 397.306 24.222 ops/s
*
* 128
*
* Benchmark (size) Mode Samples Score Score error Units
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 263065.312 11168.941 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 581.708 17.397 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 138051.488 4618.935 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 176.873 35.669 ops/s
*
* 32
*
* Benchmark (size) Mode Samples Score Score error Units
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 250737.473 17260.148 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 144.725 26.284 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 118832.832 9082.658 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 32.133 8.048 ops/s
*
* 8
*
* Benchmark (size) Mode Samples Score Score error Units
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 209192.847 25558.124 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 26.520 3.100 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 100200.463 1854.259 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 8.456 2.114 ops/s
*
* 2
*
* Benchmark (size) Mode Samples Score Score error Units
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 100 thrpt 5 96549.208 4427.239 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAdd 10000 thrpt 5 6.637 2.025 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 100 thrpt 5 34553.169 4904.197 ops/s
* r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove 10000 thrpt 5 2.159 0.700 ops/s
* } </pre>
*
* Impact of IndexedRingBuffer size on merge
*
* <pre> {@code
* ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'
*
* 512
*
* Benchmark (size) Mode Samples Score Score error Units
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5282500.038 530541.761 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 49327.272 6382.189 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 53.025 4.724 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 97395.148 2489.303 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.723 1.479 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4534067.250 116321.725 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 458561.098 27652.081 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 43267.381 2648.107 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5581051.672 144191.849 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 50.643 4.354 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 76437.644 959.748 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2965.306 272.928 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 5026522.098 364196.255 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 34926.819 938.612 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 33.342 1.701 ops/s
*
*
* 128
*
* Benchmark (size) Mode Samples Score Score error Units
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5144891.776 271990.561 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 53580.161 2370.204 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 53.265 2.236 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 96634.426 1417.430 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.648 0.255 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4601280.220 53157.938 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 463394.568 58612.882 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 50503.565 2394.168 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5490315.842 228654.817 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 50.661 3.385 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 74716.169 7413.642 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 3009.476 277.075 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 4953313.642 307512.126 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 35335.579 2368.377 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 37.450 0.655 ops/s
*
* 32
*
* Benchmark (size) Mode Samples Score Score error Units
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 4975957.497 365423.694 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 52141.226 5056.658 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 53.663 2.671 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 96507.893 1833.371 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.850 0.782 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4557128.302 118516.934 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 339005.037 10594.737 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 50781.535 6071.787 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5604920.068 209285.840 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 50.413 7.496 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 76098.942 558.187 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2988.137 193.255 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 5177255.256 150253.086 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 34772.490 909.967 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 34.847 0.606 ops/s
*
* 8
*
* Benchmark (size) Mode Samples Score Score error Units
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5027331.903 337986.410 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 51746.540 3585.450 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 52.682 4.026 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 96805.587 2868.112 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.598 0.290 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4390912.630 300687.310 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 458615.731 56125.958 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 49033.105 6132.936 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5090614.100 649439.778 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 48.548 3.586 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 72285.482 16820.952 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2981.576 316.140 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 4993609.293 267975.397 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 33228.972 1554.924 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 32.994 3.615 ops/s
*
*
* 2
*
* Benchmark (size) Mode Samples Score Score error Units
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5103812.234 939461.192 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 51491.116 3790.056 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 54.043 2.340 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 96575.834 13416.541 ops/s
* r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.740 0.047 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4435909.832 899133.671 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 392382.445 59814.783 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 50429.258 7489.849 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5637321.803 161838.195 ops/s
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 51.065 2.138 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 76366.764 2631.710 ops/s
* r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2978.302 296.418 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 5280829.290 1602542.493 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 35070.518 3565.672 ops/s
* r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 34.501 0.991 ops/s
*
* } </pre>
*/
static int _size = 256;
static {
// lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
if (PlatformDependent.isAndroid()) {
_size = 8;
}

// possible system property for overriding
String sizeFromProperty = System.getProperty("rx.indexed-ring-buffer.size"); // also see RxRingBuffer
if (sizeFromProperty != null) {
try {
_size = Integer.parseInt(sizeFromProperty);
} catch (Exception e) {
System.err.println("Failed to set 'rx.indexed-ring-buffer.size' with value " + sizeFromProperty + " => " + e.getMessage());
}
}
}

/* package for unit testing */static final int SIZE = _size;

/**
* This resets the arrays, nulls out references and returns it to the pool.
Expand Down
65 changes: 65 additions & 0 deletions src/main/java/rx/internal/util/PlatformDependent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.util;

import java.security.AccessController;
import java.security.PrivilegedAction;

/**
* Allow platform dependent logic such as checks for Android.
*
* Modeled after Netty with some code copy/pasted from: https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/PlatformDependent.java
*/
public final class PlatformDependent {

private static final boolean IS_ANDROID = isAndroid0();

/**
* Returns {@code true} if and only if the current platform is Android
*/
public static boolean isAndroid() {
return IS_ANDROID;
}

private static boolean isAndroid0() {
boolean android;
try {
Class.forName("android.app.Application", false, getSystemClassLoader());
android = true;
} catch (Exception e) {
// Failed to load the class uniquely available in Android.
android = false;
}

return android;
}

/**
* Return the system {@link ClassLoader}.
*/
static ClassLoader getSystemClassLoader() {
if (System.getSecurityManager() == null) {
return ClassLoader.getSystemClassLoader();
} else {
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
@Override
public ClassLoader run() {
return ClassLoader.getSystemClassLoader();
}
});
}
}
}
Loading