From 0607db3630f9d2fe32c46b50c5c0adfaa4bf4d0f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 8 Nov 2014 09:39:29 -0800 Subject: [PATCH 1/2] RxRingBufferSize (128 default, 16 on Android) - changing from 1024 to 128 based on perf tests - platform dependent check for Android to set to 16 to reduce memory usage --- .../rx/internal/util/PlatformDependent.java | 65 ++++++++++ .../java/rx/internal/util/RxRingBuffer.java | 118 +++++++++++++++++- 2 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 src/main/java/rx/internal/util/PlatformDependent.java diff --git a/src/main/java/rx/internal/util/PlatformDependent.java b/src/main/java/rx/internal/util/PlatformDependent.java new file mode 100644 index 0000000000..c6b7f3c28f --- /dev/null +++ b/src/main/java/rx/internal/util/PlatformDependent.java @@ -0,0 +1,65 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.util; + +import java.security.AccessController; +import java.security.PrivilegedAction; + +/** + * Allow platform dependent logic such as checks for Android. + * + * Modeled after Netty with some code copy/pasted from: https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/PlatformDependent.java + */ +public final class PlatformDependent { + + private static final boolean IS_ANDROID = isAndroid0(); + + /** + * Returns {@code true} if and only if the current platform is Android + */ + public static boolean isAndroid() { + return IS_ANDROID; + } + + private static boolean isAndroid0() { + boolean android; + try { + Class.forName("android.app.Application", false, getSystemClassLoader()); + android = true; + } catch (Exception e) { + // Failed to load the class uniquely available in Android. + android = false; + } + + return android; + } + + /** + * Return the system {@link ClassLoader}. + */ + static ClassLoader getSystemClassLoader() { + if (System.getSecurityManager() == null) { + return ClassLoader.getSystemClassLoader(); + } else { + return AccessController.doPrivileged(new PrivilegedAction() { + @Override + public ClassLoader run() { + return ClassLoader.getSystemClassLoader(); + } + }); + } + } +} diff --git a/src/main/java/rx/internal/util/RxRingBuffer.java b/src/main/java/rx/internal/util/RxRingBuffer.java index 8d18b2407a..389793c1c3 100644 --- a/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/src/main/java/rx/internal/util/RxRingBuffer.java @@ -158,7 +158,123 @@ public static RxRingBuffer getSpmcInstance() { */ public volatile Object terminalState; - public static final int SIZE = 1024; + // default size of ring buffer + /** + * 128 was chosen as the default based on the numbers below. A stream processing system may benefit from increasing to 512+. + * + *
 {@code
+     * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorObserveOnPerf.*'
+     * 
+     * 1024
+     * 
+     * Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   100642.874    24676.478    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     4095.901       90.730    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        9.797        4.982    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 15536155.489   758579.454    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   156257.341     6324.176    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      157.099        7.143    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    16864.641     1826.877    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     4269.317      169.480    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       13.393        1.047    ops/s
+     * 
+     * 512
+     * 
+     * Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5    98945.980    48050.282    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     4111.149       95.987    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       12.483        3.067    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16032469.143   620157.818    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   157997.290     5097.718    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      156.462        7.728    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    15813.984     8260.170    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     4358.334      251.609    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       13.647        0.613    ops/s
+     * 
+     * 256
+     * 
+     * Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   108489.834     2688.489    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     4526.674      728.019    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       13.372        0.457    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16435113.709   311602.627    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   157611.204    13146.108    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      158.346        2.500    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    16976.775      968.191    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     6238.210     2060.387    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       13.465        0.566    ops/s
+     * 
+     * 128
+     * 
+     * Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   106887.027    29307.913    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     6713.891      202.989    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       11.929        0.187    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16055774.724   350633.068    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   153403.821    17976.156    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      153.559       20.178    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    17172.274      236.816    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     7073.555      595.990    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       11.855        1.093    ops/s
+     * 
+     * 32
+     * 
+     * Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   106128.589    20986.201    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     6396.607       73.627    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        7.643        0.668    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16012419.447   409004.521    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   157907.001     5772.849    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      155.308       23.853    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    16927.513      606.692    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     5191.084      244.876    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5        8.288        0.217    ops/s
+     * 
+     * 16
+     * 
+     * Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   109974.741      839.064    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     4538.912      173.561    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        5.420        0.111    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16017466.785   768748.695    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   157934.065    13479.575    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      155.922       17.781    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    14903.686     3325.205    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     3784.776     1054.131    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5        5.624        0.130    ops/s
+     * 
+     * 2
+     * 
+     * Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5   112663.216      899.005    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5      899.737        9.460    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        0.999        0.100    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5 16087325.336   783206.227    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   156747.025     4880.489    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      156.645        3.810    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5    15958.711      673.895    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5      884.624       47.692    ops/s
+     * r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5        1.173        0.100    ops/s
+     * } 
+ */ + static int _size = 128; + static { + // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820) + if (PlatformDependent.isAndroid()) { + _size = 16; + } + + // possible system property for overriding + String sizeFromProperty = System.getProperty("rx.ring-buffer.size"); // also see IndexedRingBuffer + if (sizeFromProperty != null) { + try { + _size = Integer.parseInt(sizeFromProperty); + } catch (Exception e) { + System.err.println("Failed to set 'rx.buffer.size' with value " + sizeFromProperty + " => " + e.getMessage()); + } + } + } + public static final int SIZE = _size; private static ObjectPool> SPSC_POOL = new ObjectPool>() { From 3badbbface6202e0db55ab9296c99beaab11c742 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 8 Nov 2014 10:19:09 -0800 Subject: [PATCH 2/2] IndexedRingBufferSize (256 default, 8 on Android) - changing from 512 to 256 based on perf tests - platform dependent check for Android to set to 8 to reduce memory usage (use cases on Android should rarely if ever hit the use case with merge that requires the higher buffer sizes for performance) --- .../rx/internal/util/IndexedRingBuffer.java | 192 +++++++++++++++++- 1 file changed, 191 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/util/IndexedRingBuffer.java b/src/main/java/rx/internal/util/IndexedRingBuffer.java index a84f6c917a..d353eac65f 100644 --- a/src/main/java/rx/internal/util/IndexedRingBuffer.java +++ b/src/main/java/rx/internal/util/IndexedRingBuffer.java @@ -65,7 +65,197 @@ public final static IndexedRingBuffer getInstance() { private final IndexSection removed = new IndexSection(); /* package for unit testing */final AtomicInteger index = new AtomicInteger(); /* package for unit testing */final AtomicInteger removedIndex = new AtomicInteger(); - /* package for unit testing */static final int SIZE = 512; + + // default size of ring buffer + /** + * Set at 256 ... Android defaults far smaller which likely will never hit the use cases that require the higher buffers. + *

+ * The 10000 size test represents something that should be a rare use case (merging 10000 concurrent Observables for example) + * + *

 {@code
+     * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*IndexedRingBufferPerf.*'
+     * 
+     * 1024
+     * 
+     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   269292.006     6013.347    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     2217.103      163.396    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   139349.608     9397.232    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5     1045.323       30.991    ops/s
+     * 
+     * 512
+     * 
+     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   270919.870     5381.793    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1724.436       42.287    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   141478.813     3696.030    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      719.447       75.629    ops/s
+     * 
+     * 
+     * 256
+     * 
+     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   272042.605     7954.982    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1101.329       23.566    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   140479.804     6389.060    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      397.306       24.222    ops/s
+     * 
+     * 128
+     * 
+     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   263065.312    11168.941    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5      581.708       17.397    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   138051.488     4618.935    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      176.873       35.669    ops/s
+     * 
+     * 32
+     * 
+     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   250737.473    17260.148    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5      144.725       26.284    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   118832.832     9082.658    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5       32.133        8.048    ops/s
+     * 
+     * 8
+     * 
+     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   209192.847    25558.124    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5       26.520        3.100    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   100200.463     1854.259    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5        8.456        2.114    ops/s
+     * 
+     * 2
+     * 
+     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5    96549.208     4427.239    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5        6.637        2.025    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5    34553.169     4904.197    ops/s
+     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5        2.159        0.700    ops/s
+     * } 
+ * + * Impact of IndexedRingBuffer size on merge + * + *
 {@code
+     * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'
+     * 
+     * 512
+     * 
+     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5282500.038   530541.761    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    49327.272     6382.189    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.025        4.724    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    97395.148     2489.303    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.723        1.479    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4534067.250   116321.725    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458561.098    27652.081    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    43267.381     2648.107    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5581051.672   144191.849    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.643        4.354    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76437.644      959.748    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2965.306      272.928    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5026522.098   364196.255    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34926.819      938.612    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       33.342        1.701    ops/s
+     * 
+     * 
+     * 128
+     * 
+     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5144891.776   271990.561    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    53580.161     2370.204    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.265        2.236    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96634.426     1417.430    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.648        0.255    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4601280.220    53157.938    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   463394.568    58612.882    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50503.565     2394.168    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5490315.842   228654.817    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.661        3.385    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    74716.169     7413.642    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3009.476      277.075    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4953313.642   307512.126    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35335.579     2368.377    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       37.450        0.655    ops/s
+     * 
+     * 32
+     * 
+     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4975957.497   365423.694    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    52141.226     5056.658    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.663        2.671    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96507.893     1833.371    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.850        0.782    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4557128.302   118516.934    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   339005.037    10594.737    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50781.535     6071.787    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5604920.068   209285.840    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.413        7.496    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76098.942      558.187    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2988.137      193.255    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5177255.256   150253.086    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34772.490      909.967    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.847        0.606    ops/s
+     * 
+     * 8
+     * 
+     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5027331.903   337986.410    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51746.540     3585.450    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       52.682        4.026    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96805.587     2868.112    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.598        0.290    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4390912.630   300687.310    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458615.731    56125.958    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    49033.105     6132.936    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5090614.100   649439.778    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       48.548        3.586    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    72285.482    16820.952    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2981.576      316.140    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4993609.293   267975.397    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    33228.972     1554.924    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       32.994        3.615    ops/s
+     * 
+     * 
+     * 2
+     * 
+     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5103812.234   939461.192    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51491.116     3790.056    ops/s
+     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       54.043        2.340    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96575.834    13416.541    ops/s
+     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.740        0.047    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4435909.832   899133.671    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   392382.445    59814.783    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50429.258     7489.849    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5637321.803   161838.195    ops/s
+     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       51.065        2.138    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76366.764     2631.710    ops/s
+     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2978.302      296.418    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5280829.290  1602542.493    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35070.518     3565.672    ops/s
+     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.501        0.991    ops/s
+     * 
+     * } 
+ */ + static int _size = 256; + static { + // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820) + if (PlatformDependent.isAndroid()) { + _size = 8; + } + + // possible system property for overriding + String sizeFromProperty = System.getProperty("rx.indexed-ring-buffer.size"); // also see RxRingBuffer + if (sizeFromProperty != null) { + try { + _size = Integer.parseInt(sizeFromProperty); + } catch (Exception e) { + System.err.println("Failed to set 'rx.indexed-ring-buffer.size' with value " + sizeFromProperty + " => " + e.getMessage()); + } + } + } + + /* package for unit testing */static final int SIZE = _size; /** * This resets the arrays, nulls out references and returns it to the pool.