Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxRingBuffer with synchronization #2553

Merged
merged 4 commits into from
Jan 28, 2015

Conversation

akarnokd
Copy link
Member

Changed RxRingBuffer to use synchronized blocks for correctness. We are relying here upon biased locking and lock-elision. It gets pretty close to the baseline

Benchmark:

Benchmark              (size)        1.x   |    PR#2333  |    this   
1SyncStreamOfN              1  3779678,748 | 3767936,028 | 3775157,195
1SyncStreamOfN           1000    21250,675 |   18530,542 |   20759,900
1SyncStreamOfN        1000000       20,406 |      17,712 |      19,768
NAsyncStreamsOfN            1   115390,116 |  115629,480 |  113859,532
NAsyncStreamsOfN         1000        2,579 |       2,546 |       2,435
NSyncStreamsOf1             1  3543551,254 | 3602242,709 | 3539162,675
NSyncStreamsOf1           100   299166,910 |  301703,721 |  302642,458
NSyncStreamsOf1          1000    28404,751 |   28420,833 |   28030,881
NSyncStreamsOfN             1  4054571,577 | 4003156,953 | 4061124,105
NSyncStreamsOfN          1000       24,324 |      20,601 |      23,137
TwoAsyncStreamsOfN          1    85846,727 |   85682,983 |   86691,331
TwoAsyncStreamsOfN       1000     1823,137 |    1889,458 |    1761,977
reamOfNthatMergesIn1        1  3724179,351 | 3725068,220 | 3715637,985
reamOfNthatMergesIn1     1000    19051,928 |   19392,595 |   19487,059
reamOfNthatMergesIn1  1000000       18,265 |      18,069 |      18,102

Changes (in respect of 1.x):

  • using SpscArrayQueue, removed look-ahead and null check
  • using peek to check for emptyness in certain positions
  • using short-as-possible synchronization blocks

@@ -34,7 +34,7 @@
public static RxRingBuffer getSpscInstance() {
if (UnsafeAccess.isUnsafeAvailable()) {
// TODO the SpscArrayQueue isn't ready yet so using SpmcArrayQueue for now
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If SpscArrayQueue is now fixed we can remove this comment.

@benjchristensen
Copy link
Member

Here is my perf comparison building on what I did at #2333 (comment):

1.x == 1.x branch as of Jan 22nd
PR2333 == PR #2333 after rebasing onto 1.x (Jan 22nd)
PR2189 == PR #2189 after rebasing onto 1.x (Jan 22nd)
PR2553 == PR #2553 on 1.x as of Jan 28

Benchmark                                          (size)   Mode   Samples          1.x        PR2189        PR2333       PR2553
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4879535.663   4773501.735   4807008.476  4701375.614
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    43295.567     37341.214     34950.819    41735.196
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       41.255        40.202        32.236       36.977
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    99885.768    101321.745     97689.264   100134.213
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.846         4.715         4.973        4.552
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4684222.432   4747380.010   4751592.996  4704340.700
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   456736.726    468609.567    467310.110   451615.231
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    40504.652     41472.463     41146.594    41767.150
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  4993477.475   5268523.818   5414652.857  5166606.822
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       44.460        42.820        32.926       40.496
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    79546.448     76853.391     73846.697    76358.122
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3138.932      3140.582      2672.720     2283.587
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5248113.569   5136570.967   5225289.115  4471499.594
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    39001.895     39254.876     39235.506    35812.513
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.039        35.282        33.375       33.375

The degradations of this are incremental and I think worth the correctness.

Crazy to me that we can't find a non-blocking approach that performs better than the JVM doing lock-elision.

I suggest we move forward with this after cleaning up the comments in the code that are no longer valid.

Fixes in comments.
Yet another undelrying
@akarnokd
Copy link
Member Author

I suggest we move forward with this after cleaning up the comments in the code that are no longer valid.

Okay. If you feel ready, you can merge it at your will. I'll take care of #2541.

else if (null != lvElement(lElementBuffer, offset)){
return false;
}
// if (producerIndex >= producerLookAhead) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason to keep this commented code?

benjchristensen added a commit that referenced this pull request Jan 28, 2015
@benjchristensen benjchristensen merged commit dda25e6 into ReactiveX:1.x Jan 28, 2015
@akarnokd akarnokd deleted the RxRingBufferSynchronized branch January 28, 2015 20:33
This was referenced Jan 29, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants