diff --git a/rxjava-core/src/main/java/rx/observers/SerializedObserverViaQueueAndCounter.java b/rxjava-core/src/main/java/rx/observers/SerializedObserverViaQueueAndCounter.java index 8322151b3e..f187a3c8ed 100644 --- a/rxjava-core/src/main/java/rx/observers/SerializedObserverViaQueueAndCounter.java +++ b/rxjava-core/src/main/java/rx/observers/SerializedObserverViaQueueAndCounter.java @@ -46,20 +46,27 @@ public void onNext(T t) { } public void doIt(Object t) { - int c = count.getAndIncrement(); - if (c == 0) { - // if c == 0 then queue is empty and we can emit immediately + // --- queue --- + // optimistically try to acquire draining responsibilities without enqueueing + if (count.compareAndSet(0, 1)) { emitNotification(t); - // try draining if anything else has been added concurrently - do { - emitNotification(queue.poll()); - } while (count.decrementAndGet() > 0); } else { + // enqueue then try to acquire draining responsibilities if (t == null) { queue.add(NULL_SENTINEL); } else { queue.add(t); } + if (count.getAndIncrement() > 0) + return; + + // reduce to same case as other branch, needing to decrement count + emitNotification(queue.poll()); + } + + // --- drain --- + while (count.decrementAndGet() > 0) { + emitNotification(queue.poll()); } } diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java index d0f7950ebb..ad8b1693ee 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java @@ -14,9 +14,9 @@ import rx.schedulers.Schedulers; public class OperatorSerializePerformance extends AbstractPerformanceTester { - // static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams + static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams - static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream + // static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream // static int reps = 1000; // interval streams @@ -32,8 +32,8 @@ public static void main(String args[]) { @Override public void call() { - // spt.timeTwoStreams(); - spt.timeSingleStream(); + spt.timeTwoStreams(); + // spt.timeSingleStream(); // spt.timeTwoStreamsIntervals(); } }); @@ -144,11 +144,11 @@ public void call(Integer t1) { * * -> using queue and counter technique (SerializedObserverViaQueueAndCounter) * - * Run: 10 - 2,857,008 ops/sec - * Run: 11 - 3,093,778 ops/sec - * Run: 12 - 4,009,758 ops/sec - * Run: 13 - 3,094,435 ops/sec - * Run: 14 - 3,166,119 ops/sec + * Run: 10 - 2,153,329 ops/sec + * Run: 11 - 5,139,837 ops/sec + * Run: 12 - 2,295,464 ops/sec + * Run: 13 - 2,327,337 ops/sec + * Run: 14 - 2,220,375 ops/sec * * -> using queue and lock technique (SerializedObserverViaQueueAndLock) *