diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index f8fdfdd9b1..82e65c46f5 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -69,6 +69,7 @@ import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; +import rx.operators.OperatorObserveOnBounded; import rx.operators.OperatorScan; import rx.operators.OperationSequenceEqual; import rx.operators.OperationSingle; @@ -5148,7 +5149,7 @@ public final ConnectableObservable multicast(Subject * * @@ -5162,25 +5163,6 @@ public final Observable observeOn(Scheduler scheduler) { return lift(new OperatorObserveOn(scheduler)); } - /** - * Move notifications to the specified {@link Scheduler} asynchronously with a buffer of a specified size. - *

- * - *

- * If the buffer fills to its maximum size... - * - * @param scheduler - * the {@link Scheduler} to notify {@link Observer}s on - * @param bufferSize - * that will be rounded up to the next power of 2 - * @return the source Observable modified so that its {@link Observer}s are notified on the specified - * {@link Scheduler} - * @see RxJava Wiki: observeOn() - */ - public final Observable observeOn(Scheduler scheduler, int bufferSize) { - return lift(new OperatorObserveOn(scheduler, bufferSize)); - } - /** * Filters the items emitted by an Observable, only emitting those of the specified type. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index 5fc9ed75e1..c963bf3783 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -15,7 +15,7 @@ */ package rx.operators; -import java.util.concurrent.Semaphore; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import rx.Observable.Operator; @@ -23,61 +23,23 @@ import rx.Scheduler.Inner; import rx.Subscriber; import rx.schedulers.ImmediateScheduler; -import rx.schedulers.TestScheduler; import rx.schedulers.TrampolineScheduler; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; import rx.util.functions.Action1; /** - * Delivers events on the specified Scheduler. - *

- * This provides backpressure by blocking the incoming onNext when there is already one in the queue. - *

- * This means that at any given time the max number of "onNext" in flight is 3: - * -> 1 being delivered on the Scheduler - * -> 1 in the queue waiting for the Scheduler - * -> 1 blocking on the queue waiting to deliver it - * - * I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler - * can loop and have something to do each time around to optimize for avoiding rescheduling when it - * can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop - * thus if the queue is empty it exits and next time something is added it will reschedule. + * Delivers events on the specified Scheduler asynchronously via an unbounded buffer. * * */ public class OperatorObserveOn implements Operator { private final Scheduler scheduler; - private final int bufferSize; /** - * * @param scheduler - * @param bufferSize - * that will be rounded up to the next power of 2 */ - public OperatorObserveOn(Scheduler scheduler, int bufferSize) { - this.scheduler = scheduler; - this.bufferSize = roundToNextPowerOfTwoIfNecessary(bufferSize); - } - public OperatorObserveOn(Scheduler scheduler) { - this(scheduler, 1); - } - - private static int roundToNextPowerOfTwoIfNecessary(int num) { - if ((num & -num) == num) { - return num; - } else { - int result = 1; - while (num != 0) - { - num >>= 1; - result <<= 1; - } - return result; - } + this.scheduler = scheduler; } @Override @@ -88,19 +50,19 @@ public Subscriber call(Subscriber child) { } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; - } else if (scheduler instanceof TestScheduler) { - // this one will deadlock as it is single-threaded and won't run the scheduled - // work until it manually advances, which it won't be able to do as it will block - return child; } else { return new ObserveOnSubscriber(child); } } - private static Object NULL_SENTINEL = new Object(); - private static Object COMPLETE_SENTINEL = new Object(); + private static class Sentinel { - private static class ErrorSentinel { + } + + private static Sentinel NULL_SENTINEL = new Sentinel(); + private static Sentinel COMPLETE_SENTINEL = new Sentinel(); + + private static class ErrorSentinel extends Sentinel { final Throwable e; ErrorSentinel(Throwable e) { @@ -113,7 +75,7 @@ private class ObserveOnSubscriber extends Subscriber { final Subscriber observer; private volatile Scheduler.Inner recursiveScheduler; - private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize); + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); final AtomicLong counter = new AtomicLong(0); public ObserveOnSubscriber(Subscriber observer) { @@ -123,62 +85,29 @@ public ObserveOnSubscriber(Subscriber observer) { @Override public void onNext(final T t) { - try { - // we want to block for natural back-pressure - // so that the producer waits for each value to be consumed - if (t == null) { - queue.addBlocking(NULL_SENTINEL); - } else { - queue.addBlocking(t); - } - schedule(); - } catch (InterruptedException e) { - if (!isUnsubscribed()) { - onError(e); - } + if (t == null) { + queue.offer(NULL_SENTINEL); + } else { + queue.offer(t); } + schedule(); } @Override public void onCompleted() { - try { - // we want to block for natural back-pressure - // so that the producer waits for each value to be consumed - queue.addBlocking(COMPLETE_SENTINEL); - schedule(); - } catch (InterruptedException e) { - onError(e); - } + queue.offer(COMPLETE_SENTINEL); + schedule(); } @Override public void onError(final Throwable e) { - try { - // we want to block for natural back-pressure - // so that the producer waits for each value to be consumed - queue.addBlocking(new ErrorSentinel(e)); - schedule(); - } catch (InterruptedException e2) { - // call directly if we can't schedule - observer.onError(e2); - } + queue.offer(new ErrorSentinel(e)); + schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { if (recursiveScheduler == null) { - // first time through, register a Subscription - // that can interrupt this thread - add(Subscriptions.create(new Action0() { - - @Override - public void call() { - // we have to interrupt the parent thread because - // it can be blocked on queue.put - queue.interrupt(); - } - - })); add(scheduler.schedule(new Action1() { @Override @@ -206,12 +135,14 @@ private void pollQueue() { do { Object v = queue.poll(); if (v != null) { - if (v == NULL_SENTINEL) { - observer.onNext(null); - } else if (v == COMPLETE_SENTINEL) { - observer.onCompleted(); - } else if (v instanceof ErrorSentinel) { - observer.onError(((ErrorSentinel) v).e); + if (v instanceof Sentinel) { + if (v == NULL_SENTINEL) { + observer.onNext(null); + } else if (v == COMPLETE_SENTINEL) { + observer.onCompleted(); + } else if (v instanceof ErrorSentinel) { + observer.onError(((ErrorSentinel) v).e); + } } else { observer.onNext((T) v); } @@ -221,110 +152,4 @@ private void pollQueue() { } - /** - * Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread). - * - * This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows - * unsubscribe behavior when this queue is being used. - * - * @param - */ - private static class InterruptibleBlockingQueue { - - private final Semaphore semaphore; - private volatile boolean interrupted = false; - - private final E[] buffer; - - private AtomicLong tail = new AtomicLong(); - private AtomicLong head = new AtomicLong(); - private final int capacity; - private final int mask; - - @SuppressWarnings("unchecked") - public InterruptibleBlockingQueue(final int size) { - this.semaphore = new Semaphore(size); - this.capacity = size; - this.mask = size - 1; - buffer = (E[]) new Object[size]; - } - - /** - * Used to unsubscribe and interrupt the producer if blocked in put() - */ - public void interrupt() { - interrupted = true; - semaphore.release(); - } - - public void addBlocking(final E e) throws InterruptedException { - if (interrupted) { - throw new InterruptedException("Interrupted by Unsubscribe"); - } - semaphore.acquire(); - if (interrupted) { - throw new InterruptedException("Interrupted by Unsubscribe"); - } - if (e == null) { - throw new IllegalArgumentException("Can not put null"); - } - - if (offer(e)) { - return; - } else { - throw new IllegalStateException("Queue is full"); - } - } - - private boolean offer(final E e) { - final long _t = tail.get(); - if (_t - head.get() == capacity) { - // queue is full - return false; - } - int index = (int) (_t & mask); - buffer[index] = e; - // move the tail forward - tail.lazySet(_t + 1); - - return true; - } - - public E poll() { - if (interrupted) { - return null; - } - final long _h = head.get(); - if (tail.get() == _h) { - // nothing available - return null; - } - int index = (int) (_h & mask); - - // fetch the item - E v = buffer[index]; - // allow GC to happen - buffer[index] = null; - // increment and signal we're done - head.lazySet(_h + 1); - if (v != null) { - semaphore.release(); - } - return v; - } - - public int size() - { - int size; - do - { - final long currentHead = head.get(); - final long currentTail = tail.get(); - size = (int) (currentTail - currentHead); - } while (size > buffer.length); - - return size; - } - - } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java new file mode 100644 index 0000000000..740f8fdad4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOnBounded.java @@ -0,0 +1,330 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Observable.Operator; +import rx.Scheduler; +import rx.Scheduler.Inner; +import rx.Subscriber; +import rx.schedulers.ImmediateScheduler; +import rx.schedulers.TestScheduler; +import rx.schedulers.TrampolineScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Delivers events on the specified Scheduler. + *

+ * This provides backpressure by blocking the incoming onNext when there is already one in the queue. + *

+ * This means that at any given time the max number of "onNext" in flight is 3: + * -> 1 being delivered on the Scheduler + * -> 1 in the queue waiting for the Scheduler + * -> 1 blocking on the queue waiting to deliver it + * + * I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler + * can loop and have something to do each time around to optimize for avoiding rescheduling when it + * can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop + * thus if the queue is empty it exits and next time something is added it will reschedule. + * + * + */ +public class OperatorObserveOnBounded implements Operator { + + private final Scheduler scheduler; + private final int bufferSize; + + /** + * + * @param scheduler + * @param bufferSize + * that will be rounded up to the next power of 2 + */ + public OperatorObserveOnBounded(Scheduler scheduler, int bufferSize) { + this.scheduler = scheduler; + this.bufferSize = roundToNextPowerOfTwoIfNecessary(bufferSize); + } + + public OperatorObserveOnBounded(Scheduler scheduler) { + this(scheduler, 1); + } + + private static int roundToNextPowerOfTwoIfNecessary(int num) { + if ((num & -num) == num) { + return num; + } else { + int result = 1; + while (num != 0) + { + num >>= 1; + result <<= 1; + } + return result; + } + } + + @Override + public Subscriber call(Subscriber child) { + if (scheduler instanceof ImmediateScheduler) { + // avoid overhead, execute directly + return child; + } else if (scheduler instanceof TrampolineScheduler) { + // avoid overhead, execute directly + return child; + } else if (scheduler instanceof TestScheduler) { + // this one will deadlock as it is single-threaded and won't run the scheduled + // work until it manually advances, which it won't be able to do as it will block + return child; + } else { + return new ObserveOnSubscriber(child); + } + } + + private static Object NULL_SENTINEL = new Object(); + private static Object COMPLETE_SENTINEL = new Object(); + + private static class ErrorSentinel { + final Throwable e; + + ErrorSentinel(Throwable e) { + this.e = e; + } + } + + /** Observe through individual queue per observer. */ + private class ObserveOnSubscriber extends Subscriber { + final Subscriber observer; + private volatile Scheduler.Inner recursiveScheduler; + + private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize); + final AtomicLong counter = new AtomicLong(0); + + public ObserveOnSubscriber(Subscriber observer) { + super(observer); + this.observer = observer; + } + + @Override + public void onNext(final T t) { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + if (t == null) { + queue.addBlocking(NULL_SENTINEL); + } else { + queue.addBlocking(t); + } + schedule(); + } catch (InterruptedException e) { + if (!isUnsubscribed()) { + onError(e); + } + } + } + + @Override + public void onCompleted() { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + queue.addBlocking(COMPLETE_SENTINEL); + schedule(); + } catch (InterruptedException e) { + onError(e); + } + } + + @Override + public void onError(final Throwable e) { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + queue.addBlocking(new ErrorSentinel(e)); + schedule(); + } catch (InterruptedException e2) { + // call directly if we can't schedule + observer.onError(e2); + } + } + + protected void schedule() { + if (counter.getAndIncrement() == 0) { + if (recursiveScheduler == null) { + // first time through, register a Subscription + // that can interrupt this thread + add(Subscriptions.create(new Action0() { + + @Override + public void call() { + // we have to interrupt the parent thread because + // it can be blocked on queue.put + queue.interrupt(); + } + + })); + add(scheduler.schedule(new Action1() { + + @Override + public void call(Inner inner) { + recursiveScheduler = inner; + pollQueue(); + } + + })); + } else { + recursiveScheduler.schedule(new Action1() { + + @Override + public void call(Inner inner) { + pollQueue(); + } + + }); + } + } + } + + @SuppressWarnings("unchecked") + private void pollQueue() { + do { + Object v = queue.poll(); + if (v != null) { + if (v == NULL_SENTINEL) { + observer.onNext(null); + } else if (v == COMPLETE_SENTINEL) { + observer.onCompleted(); + } else if (v instanceof ErrorSentinel) { + observer.onError(((ErrorSentinel) v).e); + } else { + observer.onNext((T) v); + } + } + } while (counter.decrementAndGet() > 0); + } + + } + + /** + * Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread). + * + * This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows + * unsubscribe behavior when this queue is being used. + * + * @param + */ + private static class InterruptibleBlockingQueue { + + private final Semaphore semaphore; + private volatile boolean interrupted = false; + + private final E[] buffer; + + private AtomicLong tail = new AtomicLong(); + private AtomicLong head = new AtomicLong(); + private final int capacity; + private final int mask; + + @SuppressWarnings("unchecked") + public InterruptibleBlockingQueue(final int size) { + this.semaphore = new Semaphore(size); + this.capacity = size; + this.mask = size - 1; + buffer = (E[]) new Object[size]; + } + + /** + * Used to unsubscribe and interrupt the producer if blocked in put() + */ + public void interrupt() { + interrupted = true; + semaphore.release(); + } + + public void addBlocking(final E e) throws InterruptedException { + if (interrupted) { + throw new InterruptedException("Interrupted by Unsubscribe"); + } + semaphore.acquire(); + if (interrupted) { + throw new InterruptedException("Interrupted by Unsubscribe"); + } + if (e == null) { + throw new IllegalArgumentException("Can not put null"); + } + + if (offer(e)) { + return; + } else { + throw new IllegalStateException("Queue is full"); + } + } + + private boolean offer(final E e) { + final long _t = tail.get(); + if (_t - head.get() == capacity) { + // queue is full + return false; + } + int index = (int) (_t & mask); + buffer[index] = e; + // move the tail forward + tail.lazySet(_t + 1); + + return true; + } + + public E poll() { + if (interrupted) { + return null; + } + final long _h = head.get(); + if (tail.get() == _h) { + // nothing available + return null; + } + int index = (int) (_h & mask); + + // fetch the item + E v = buffer[index]; + // allow GC to happen + buffer[index] = null; + // increment and signal we're done + head.lazySet(_h + 1); + if (v != null) { + semaphore.release(); + } + return v; + } + + public int size() + { + int size; + do + { + final long currentHead = head.get(); + final long currentTail = tail.get(); + size = (int) (currentTail - currentHead); + } while (size > buffer.length); + + return size; + } + + } +} \ No newline at end of file diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java index f14b2a25b3..4a28e85d6d 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java @@ -22,7 +22,7 @@ public static void main(String args[]) { @Override public void call() { - spt.timeObserveOn(); + spt.timeObserveOnUnbounded(); } }); } catch (Exception e) { @@ -31,6 +31,35 @@ public void call() { } + /** + * Observable.from(1L).observeOn() using ImmediateScheduler + * + * NOTE: Variance is high enough that there is obviously something else that needs to be done to make these benchmarks trustworthy. + * + * --- version 0.17.1 + * + * Run: 10 - 69,444,444 ops/sec + * Run: 11 - 38,167,938 ops/sec + * Run: 12 - 41,841,004 ops/sec + * Run: 13 - 113,636,363 ops/sec + * Run: 14 - 38,759,689 ops/sec + * + * --- version 0.16.1 + * + * Run: 10 - 42,735,042 ops/sec + * Run: 11 - 40,160,642 ops/sec + * Run: 12 - 63,694,267 ops/sec + * Run: 13 - 75,757,575 ops/sec + * Run: 14 - 43,290,043 ops/sec + * + */ + public long timeObserveOnUnbounded() { + Observable s = Observable.range(1, (int) reps).observeOn(Schedulers.immediate()); + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + return o.sum; + } + /** * Observable.from(1L).observeOn() * @@ -66,11 +95,9 @@ public void call() { * Run: 13 - 26,879,552 ops/sec * Run: 14 - 26,658,846 ops/sec * - * */ - public long timeObserveOn() { - - Observable s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread(), 16); + public long timeObserveOnBounded() { + Observable s = Observable.range(1, (int) reps).lift(new OperatorObserveOnBounded(Schedulers.newThread(), 16)); IntegerSumObserver o = new IntegerSumObserver(); s.subscribe(o); return o.sum; diff --git a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnBoundedTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnBoundedTest.java new file mode 100644 index 0000000000..260ea3ae1b --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnBoundedTest.java @@ -0,0 +1,461 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.schedulers.ImmediateScheduler; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; +import rx.schedulers.TrampolineScheduler; +import rx.subscriptions.BooleanSubscription; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class OperatorObserveOnBoundedTest { + + /** + * This is testing a no-op path since it uses Schedulers.immediate() which will not do scheduling. + */ + @Test + @SuppressWarnings("unchecked") + public void testObserveOn() { + Observer observer = mock(Observer.class); + Observable.from(1, 2, 3).lift(new OperatorObserveOnBounded(Schedulers.immediate(), 1)).subscribe(observer); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + @Test + @SuppressWarnings("unchecked") + public void testOrdering() throws InterruptedException { + Observable obs = Observable.from("one", null, "two", "three", "four"); + + Observer observer = mock(Observer.class); + + InOrder inOrder = inOrder(observer); + + final CountDownLatch completedLatch = new CountDownLatch(1); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + completedLatch.countDown(); + return null; + } + }).when(observer).onCompleted(); + + obs.lift(new OperatorObserveOnBounded(Schedulers.computation(), 1)).subscribe(observer); + + if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) { + fail("timed out waiting"); + } + + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(1)).onNext(null); + inOrder.verify(observer, times(1)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, times(1)).onNext("four"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + @SuppressWarnings("unchecked") + public void testThreadName() throws InterruptedException { + System.out.println("Main Thread: " + Thread.currentThread().getName()); + Observable obs = Observable.from("one", null, "two", "three", "four"); + + Observer observer = mock(Observer.class); + final String parentThreadName = Thread.currentThread().getName(); + + final CountDownLatch completedLatch = new CountDownLatch(1); + + // assert subscribe is on main thread + obs = obs.doOnNext(new Action1() { + + @Override + public void call(String s) { + String threadName = Thread.currentThread().getName(); + System.out.println("Source ThreadName: " + threadName + " Expected => " + parentThreadName); + assertEquals(parentThreadName, threadName); + } + + }); + + // assert observe is on new thread + obs.lift(new OperatorObserveOnBounded(Schedulers.newThread(), 1)).doOnNext(new Action1() { + + @Override + public void call(String t1) { + String threadName = Thread.currentThread().getName(); + boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler"); + System.out.println("ObserveOn ThreadName: " + threadName + " Correct => " + correctThreadName); + assertTrue(correctThreadName); + } + + }).finallyDo(new Action0() { + + @Override + public void call() { + completedLatch.countDown(); + + } + }).subscribe(observer); + + if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) { + fail("timed out waiting"); + } + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(5)).onNext(any(String.class)); + verify(observer, times(1)).onCompleted(); + } + + @Test + public void observeOnTheSameSchedulerTwice() { + Scheduler scheduler = Schedulers.immediate(); + + Observable o = Observable.from(1, 2, 3); + Observable o2 = o.lift(new OperatorObserveOnBounded(scheduler, 1)); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + + InOrder inOrder1 = inOrder(observer1); + InOrder inOrder2 = inOrder(observer2); + + o2.subscribe(observer1); + o2.subscribe(observer2); + + inOrder1.verify(observer1, times(1)).onNext(1); + inOrder1.verify(observer1, times(1)).onNext(2); + inOrder1.verify(observer1, times(1)).onNext(3); + inOrder1.verify(observer1, times(1)).onCompleted(); + verify(observer1, never()).onError(any(Throwable.class)); + inOrder1.verifyNoMoreInteractions(); + + inOrder2.verify(observer2, times(1)).onNext(1); + inOrder2.verify(observer2, times(1)).onNext(2); + inOrder2.verify(observer2, times(1)).onNext(3); + inOrder2.verify(observer2, times(1)).onCompleted(); + verify(observer2, never()).onError(any(Throwable.class)); + inOrder2.verifyNoMoreInteractions(); + } + + @Test + public void observeSameOnMultipleSchedulers() { + TestScheduler scheduler1 = new TestScheduler(); + TestScheduler scheduler2 = new TestScheduler(); + + Observable o = Observable.from(1, 2, 3); + Observable o1 = o.lift(new OperatorObserveOnBounded(scheduler1, 1)); + Observable o2 = o.lift(new OperatorObserveOnBounded(scheduler2, 1)); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + + InOrder inOrder1 = inOrder(observer1); + InOrder inOrder2 = inOrder(observer2); + + o1.subscribe(observer1); + o2.subscribe(observer2); + + scheduler1.advanceTimeBy(1, TimeUnit.SECONDS); + scheduler2.advanceTimeBy(1, TimeUnit.SECONDS); + + inOrder1.verify(observer1, times(1)).onNext(1); + inOrder1.verify(observer1, times(1)).onNext(2); + inOrder1.verify(observer1, times(1)).onNext(3); + inOrder1.verify(observer1, times(1)).onCompleted(); + verify(observer1, never()).onError(any(Throwable.class)); + inOrder1.verifyNoMoreInteractions(); + + inOrder2.verify(observer2, times(1)).onNext(1); + inOrder2.verify(observer2, times(1)).onNext(2); + inOrder2.verify(observer2, times(1)).onNext(3); + inOrder2.verify(observer2, times(1)).onCompleted(); + verify(observer2, never()).onError(any(Throwable.class)); + inOrder2.verifyNoMoreInteractions(); + } + + /** + * Confirm that running on a NewThreadScheduler uses the same thread for the entire stream + */ + @Test + public void testObserveOnWithNewThreadScheduler() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 100000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * _multiple; + } + + }).lift(new OperatorObserveOnBounded(Schedulers.newThread(), 1)) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler")); + } + + }); + } + + /** + * Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered. + */ + @Test + public void testObserveOnWithThreadPoolScheduler() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 100000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * _multiple; + } + + }).lift(new OperatorObserveOnBounded(Schedulers.computation(), 1)) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + } + + }); + } + + /** + * Attempts to confirm that when pauses exist between events, the ScheduledObserver + * does not lose or reorder any events since the scheduler will not block, but will + * be re-scheduled when it receives new events after each pause. + * + * + * This is non-deterministic in proving success, but if it ever fails (non-deterministically) + * it is a sign of potential issues as thread-races and scheduling should not affect output. + */ + @Test + public void testObserveOnOrderingConcurrency() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 10000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + if (randomIntFrom0to100() > 98) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return t1 * _multiple; + } + + }).lift(new OperatorObserveOnBounded(Schedulers.computation(), 1)) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + } + + }); + } + + @Test + public void testNonBlockingOuterWhileBlockingOnNext() throws InterruptedException { + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicLong completeTime = new AtomicLong(); + // use subscribeOn to make async, observeOn to move + Observable.range(1, 1000).subscribeOn(Schedulers.newThread()).lift(new OperatorObserveOnBounded(Schedulers.newThread(), 1)).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("onCompleted"); + completeTime.set(System.nanoTime()); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + + } + + }); + + long afterSubscribeTime = System.nanoTime(); + System.out.println("After subscribe: " + latch.getCount()); + assertEquals(1, latch.getCount()); + latch.await(); + assertTrue(completeTime.get() > afterSubscribeTime); + System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime)); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThreadAndBuffer8() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 8); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeIO() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTrampoline() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTestScheduler() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeComputation() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation(), 1); + } + + private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler, int bufferSize) throws InterruptedException { + final AtomicInteger countEmitted = new AtomicInteger(); + final AtomicInteger countTaken = new AtomicInteger(); + int value = Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + final BooleanSubscription s = BooleanSubscription.create(); + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + int i = 1; + while (!s.isUnsubscribed() && i <= 100) { + // System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i); + o.onNext(i++); + } + o.onCompleted(); + } + }); + t.setDaemon(true); + t.start(); + return s; + } + }).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + countEmitted.incrementAndGet(); + } + }).doOnCompleted(new Action0() { + + @Override + public void call() { + // System.out.println("-------- Done Emitting from Source ---------"); + } + }).lift(new OperatorObserveOnBounded(scheduler, bufferSize)).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + // System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i); + //force it to be slower than the producer + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + countTaken.incrementAndGet(); + } + }).take(10).doOnNext(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("*********** value: " + t); + } + + }).toBlockingObservable().last(); + + if (scheduler instanceof TrampolineScheduler || scheduler instanceof ImmediateScheduler || scheduler instanceof TestScheduler) { + // since there is no concurrency it will block and only emit as many as it can process + assertEquals(10, countEmitted.get()); + } else { + // the others with concurrency should not emit all 100 ... but 10 + 2 in the pipeline + // NOTE: The +2 could change if the implementation of the queue logic changes. See Javadoc at top of class. + assertEquals(11, countEmitted.get(), bufferSize); // can be up to 11 + bufferSize + } + // number received after take (but take will filter any extra) + assertEquals(10, value); + // so we also want to check the doOnNext after observeOn to see if it got unsubscribed + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + // we expect only 10 to make it through the observeOn side + assertEquals(10, countTaken.get()); + } + + private static int randomIntFrom0to100() { + // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml + long x = System.nanoTime(); + x ^= (x << 21); + x ^= (x >>> 35); + x ^= (x << 4); + return Math.abs((int) x % 100); + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 7e522eccef..f02970118c 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -346,109 +346,6 @@ public void onNext(Integer t) { System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime)); } - @Test - public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 1); - } - - @Test - public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThreadAndBuffer8() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 8); - } - - @Test - public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeIO() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io(), 1); - } - - @Test - public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTrampoline() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline(), 1); - } - - @Test - public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTestScheduler() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test(), 1); - } - - @Test - public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeComputation() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation(), 1); - } - - private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler, int bufferSize) throws InterruptedException { - final AtomicInteger countEmitted = new AtomicInteger(); - final AtomicInteger countTaken = new AtomicInteger(); - int value = Observable.create(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer o) { - final BooleanSubscription s = BooleanSubscription.create(); - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - int i = 1; - while (!s.isUnsubscribed() && i <= 100) { - // System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i); - o.onNext(i++); - } - o.onCompleted(); - } - }); - t.setDaemon(true); - t.start(); - return s; - } - }).doOnNext(new Action1() { - - @Override - public void call(Integer i) { - countEmitted.incrementAndGet(); - } - }).doOnCompleted(new Action0() { - - @Override - public void call() { - // System.out.println("-------- Done Emitting from Source ---------"); - } - }).observeOn(scheduler, bufferSize).doOnNext(new Action1() { - - @Override - public void call(Integer i) { - // System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i); - //force it to be slower than the producer - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - countTaken.incrementAndGet(); - } - }).take(10).doOnNext(new Action1() { - - @Override - public void call(Integer t) { - System.out.println("*********** value: " + t); - } - - }).toBlockingObservable().last(); - - if (scheduler instanceof TrampolineScheduler || scheduler instanceof ImmediateScheduler || scheduler instanceof TestScheduler) { - // since there is no concurrency it will block and only emit as many as it can process - assertEquals(10, countEmitted.get()); - } else { - // the others with concurrency should not emit all 100 ... but 10 + 2 in the pipeline - // NOTE: The +2 could change if the implementation of the queue logic changes. See Javadoc at top of class. - assertEquals(11, countEmitted.get(), bufferSize); // can be up to 11 + bufferSize - } - // number received after take (but take will filter any extra) - assertEquals(10, value); - // so we also want to check the doOnNext after observeOn to see if it got unsubscribed - Thread.sleep(200); // let time pass to see if the scheduler is still doing work - // we expect only 10 to make it through the observeOn side - assertEquals(10, countTaken.get()); - } private static int randomIntFrom0to100() { // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml