Skip to content

Commit

Permalink
Fix queue-and-counter race condition
Browse files Browse the repository at this point in the history
as per discussion at ReactiveX#962 (comment)
  • Loading branch information
benjchristensen committed Mar 16, 2014
1 parent 47640d5 commit 3134fab
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,27 @@ public void onNext(T t) {
}

public void doIt(Object t) {
int c = count.getAndIncrement();
if (c == 0) {
// if c == 0 then queue is empty and we can emit immediately
// --- queue ---
// optimistically try to acquire draining responsibilities without enqueueing
if (count.compareAndSet(0, 1)) {
emitNotification(t);
// try draining if anything else has been added concurrently
do {
emitNotification(queue.poll());
} while (count.decrementAndGet() > 0);
} else {
// enqueue then try to acquire draining responsibilities
if (t == null) {
queue.add(NULL_SENTINEL);
} else {
queue.add(t);
}
if (count.getAndIncrement() > 0)
return;

// reduce to same case as other branch, needing to decrement count
emitNotification(queue.poll());
}

// --- drain ---
while (count.decrementAndGet() > 0) {
emitNotification(queue.poll());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import rx.schedulers.Schedulers;

public class OperatorSerializePerformance extends AbstractPerformanceTester {
// static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams
static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams

static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream
// static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream

// static int reps = 1000; // interval streams

Expand All @@ -32,8 +32,8 @@ public static void main(String args[]) {

@Override
public void call() {
// spt.timeTwoStreams();
spt.timeSingleStream();
spt.timeTwoStreams();
// spt.timeSingleStream();
// spt.timeTwoStreamsIntervals();
}
});
Expand Down Expand Up @@ -144,11 +144,11 @@ public void call(Integer t1) {
*
* -> using queue and counter technique (SerializedObserverViaQueueAndCounter)
*
* Run: 10 - 2,857,008 ops/sec
* Run: 11 - 3,093,778 ops/sec
* Run: 12 - 4,009,758 ops/sec
* Run: 13 - 3,094,435 ops/sec
* Run: 14 - 3,166,119 ops/sec
* Run: 10 - 2,153,329 ops/sec
* Run: 11 - 5,139,837 ops/sec
* Run: 12 - 2,295,464 ops/sec
* Run: 13 - 2,327,337 ops/sec
* Run: 14 - 2,220,375 ops/sec
*
* -> using queue and lock technique (SerializedObserverViaQueueAndLock)
*
Expand Down

0 comments on commit 3134fab

Please sign in to comment.