diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e6a3dd4921..7c3ead37a2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -5139,8 +5139,7 @@ public final ConnectableObservable multicast(Subject * * @@ -5154,6 +5153,23 @@ public final Observable observeOn(Scheduler scheduler) { return lift(new OperatorObserveOn(scheduler)); } + /** + * Move notifications to the specified {@link Scheduler} asynchronously with a buffer of the given size. + *

+ * + * + * @param scheduler + * the {@link Scheduler} to notify {@link Observer}s on + * @param bufferSize + * that will be rounded up to the next power of 2 + * @return the source Observable modified so that its {@link Observer}s are notified on the + * specified {@link Scheduler} + * @see RxJava Wiki: observeOn() + */ + public final Observable observeOn(Scheduler scheduler, int bufferSize) { + return lift(new OperatorObserveOn(scheduler, bufferSize)); + } + /** * Filters the items emitted by an Observable, only emitting those of the specified type. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index b2e10593b5..1b818013e9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -48,9 +48,35 @@ public class OperatorObserveOn implements Operator { private final Scheduler scheduler; + private final int bufferSize; - public OperatorObserveOn(Scheduler scheduler) { + /** + * + * @param scheduler + * @param bufferSize + * that will be rounded up to the next power of 2 + */ + public OperatorObserveOn(Scheduler scheduler, int bufferSize) { this.scheduler = scheduler; + this.bufferSize = roundToNextPowerOfTwoIfNecessary(bufferSize); + } + + public OperatorObserveOn(Scheduler scheduler) { + this(scheduler, 1); + } + + private static int roundToNextPowerOfTwoIfNecessary(int num) { + if ((num & -num) == num) { + return num; + } else { + int result = 1; + while (num != 0) + { + num >>= 1; + result <<= 1; + } + return result; + } } @Override @@ -87,7 +113,7 @@ private class ObserveOnSubscriber extends Subscriber { final Subscriber observer; private volatile Scheduler.Inner recursiveScheduler; - private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(); + private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize); final AtomicLong counter = new AtomicLong(0); public ObserveOnSubscriber(Subscriber observer) { @@ -101,9 +127,9 @@ public void onNext(final T t) { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed if (t == null) { - queue.put(NULL_SENTINEL); + queue.addBlocking(NULL_SENTINEL); } else { - queue.put(t); + queue.addBlocking(t); } schedule(); } catch (InterruptedException e) { @@ -118,7 +144,7 @@ public void onCompleted() { try { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed - queue.put(COMPLETE_SENTINEL); + queue.addBlocking(COMPLETE_SENTINEL); schedule(); } catch (InterruptedException e) { onError(e); @@ -130,7 +156,7 @@ public void onError(final Throwable e) { try { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed - queue.put(new ErrorSentinel(e)); + queue.addBlocking(new ErrorSentinel(e)); schedule(); } catch (InterruptedException e2) { // call directly if we can't schedule @@ -195,37 +221,34 @@ private void pollQueue() { } - /** - * Same behavior as ArrayBlockingQueue(1) except that we can interrupt/unsubscribe it. - */ private class InterruptibleBlockingQueue { - private final Semaphore semaphore = new Semaphore(1); - private volatile Object item; + private final Semaphore semaphore; private volatile boolean interrupted = false; - public Object poll() { - if (interrupted) { - return null; - } - if (item == null) { - return null; - } - try { - return item; - } finally { - item = null; - semaphore.release(); - } + private final Object[] buffer; + + private AtomicLong tail = new AtomicLong(); + private AtomicLong head = new AtomicLong(); + private final int capacity; + private final int mask; + + public InterruptibleBlockingQueue(final int size) { + this.semaphore = new Semaphore(size); + this.capacity = size; + this.mask = size - 1; + buffer = new Object[size]; } /** - * Add an Object, blocking if an item is already in the queue. - * - * @param o - * @throws InterruptedException + * Used to unsubscribe and interrupt the producer if blocked in put() */ - public void put(Object o) throws InterruptedException { + public void interrupt() { + interrupted = true; + semaphore.release(); + } + + public void addBlocking(final Object e) throws InterruptedException { if (interrupted) { throw new InterruptedException("Interrupted by Unsubscribe"); } @@ -233,19 +256,54 @@ public void put(Object o) throws InterruptedException { if (interrupted) { throw new InterruptedException("Interrupted by Unsubscribe"); } - if (o == null) { + if (e == null) { throw new IllegalArgumentException("Can not put null"); } - item = o; + + if (offer(e)) { + return; + } else { + throw new IllegalStateException("Queue is full"); + } } - /** - * Used to unsubscribe and interrupt the producer if blocked in put() - */ - public void interrupt() { - interrupted = true; - semaphore.release(); + private boolean offer(final Object e) { + final long _t = tail.get(); + if (_t - head.get() == capacity) { + // queue is full + return false; + } + int index = (int) (_t & mask); + buffer[index] = e; + // move the tail forward + tail.lazySet(_t + 1); + + return true; } + + public Object poll() { + if (interrupted) { + return null; + } + final long _h = head.get(); + if (tail.get() == _h) { + // nothing available + return null; + } + int index = (int) (_h & mask); + + // fetch the item + Object v = buffer[index]; + // allow GC to happen + buffer[index] = null; + // increment and signal we're done + head.lazySet(_h + 1); + if (v != null) { + semaphore.release(); + } + return v; + } + } } \ No newline at end of file diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java index c25857ea54..f14b2a25b3 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java @@ -8,7 +8,7 @@ public class OperatorObserveOnPerformance extends AbstractPerformanceTester { - private static long reps = 1000000; + private static long reps = 10000; OperatorObserveOnPerformance() { super(reps); @@ -34,10 +34,43 @@ public void call() { /** * Observable.from(1L).observeOn() * + * --- version 0.17.1 => with queue size == 1 + * + * Run: 10 - 115,033 ops/sec + * Run: 11 - 118,155 ops/sec + * Run: 12 - 120,526 ops/sec + * Run: 13 - 115,035 ops/sec + * Run: 14 - 116,102 ops/sec + * + * --- version 0.17.1 => with queue size == 16 + * + * Run: 10 - 850,412 ops/sec + * Run: 11 - 711,642 ops/sec + * Run: 12 - 788,332 ops/sec + * Run: 13 - 1,064,056 ops/sec + * Run: 14 - 656,857 ops/sec + * + * --- version 0.17.1 => with queue size == 1000000 + * + * Run: 10 - 5,162,622 ops/sec + * Run: 11 - 5,271,481 ops/sec + * Run: 12 - 4,442,470 ops/sec + * Run: 13 - 5,149,330 ops/sec + * Run: 14 - 5,146,680 ops/sec + * + * --- version 0.16.1 + * + * Run: 10 - 27,098,802 ops/sec + * Run: 11 - 24,204,284 ops/sec + * Run: 12 - 27,208,663 ops/sec + * Run: 13 - 26,879,552 ops/sec + * Run: 14 - 26,658,846 ops/sec + * + * */ public long timeObserveOn() { - Observable s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread()); + Observable s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread(), 16); IntegerSumObserver o = new IntegerSumObserver(); s.subscribe(o); return o.sum; diff --git a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 5afa2729d6..7e522eccef 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -348,30 +348,35 @@ public void onNext(Integer t) { @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThreadAndBuffer8() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 8); } @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeIO() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io(), 1); } @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTrampoline() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline(), 1); } @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTestScheduler() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test(), 1); } @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeComputation() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation(), 1); } - private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler) throws InterruptedException { + private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler, int bufferSize) throws InterruptedException { final AtomicInteger countEmitted = new AtomicInteger(); final AtomicInteger countTaken = new AtomicInteger(); int value = Observable.create(new OnSubscribeFunc() { @@ -385,7 +390,7 @@ public Subscription onSubscribe(final Observer o) { public void run() { int i = 1; while (!s.isUnsubscribed() && i <= 100) { - System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i); + // System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i); o.onNext(i++); } o.onCompleted(); @@ -405,13 +410,13 @@ public void call(Integer i) { @Override public void call() { - System.out.println("-------- Done Emitting from Source ---------"); + // System.out.println("-------- Done Emitting from Source ---------"); } - }).observeOn(scheduler).doOnNext(new Action1() { + }).observeOn(scheduler, bufferSize).doOnNext(new Action1() { @Override public void call(Integer i) { - System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i); + // System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i); //force it to be slower than the producer try { Thread.sleep(10); @@ -420,7 +425,14 @@ public void call(Integer i) { } countTaken.incrementAndGet(); } - }).take(10).toBlockingObservable().last(); + }).take(10).doOnNext(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("*********** value: " + t); + } + + }).toBlockingObservable().last(); if (scheduler instanceof TrampolineScheduler || scheduler instanceof ImmediateScheduler || scheduler instanceof TestScheduler) { // since there is no concurrency it will block and only emit as many as it can process @@ -428,7 +440,7 @@ public void call(Integer i) { } else { // the others with concurrency should not emit all 100 ... but 10 + 2 in the pipeline // NOTE: The +2 could change if the implementation of the queue logic changes. See Javadoc at top of class. - assertEquals(12, countEmitted.get()); + assertEquals(11, countEmitted.get(), bufferSize); // can be up to 11 + bufferSize } // number received after take (but take will filter any extra) assertEquals(10, value);