diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 3d672326e5..7c3ead37a2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -49,7 +49,6 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; -import rx.operators.OperatorDoOnEach; import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; @@ -63,13 +62,11 @@ import rx.operators.OperationMergeDelayError; import rx.operators.OperationMinMax; import rx.operators.OperationMulticast; -import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallelMerge; -import rx.operators.OperatorRepeat; import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; @@ -96,18 +93,21 @@ import rx.operators.OperationToObservableFuture; import rx.operators.OperationUsing; import rx.operators.OperationWindow; -import rx.operators.OperatorSubscribeOn; -import rx.operators.OperatorZip; import rx.operators.OperatorCast; +import rx.operators.OperatorDoOnEach; import rx.operators.OperatorFromIterable; import rx.operators.OperatorGroupBy; import rx.operators.OperatorMap; import rx.operators.OperatorMerge; +import rx.operators.OperatorObserveOn; import rx.operators.OperatorParallel; +import rx.operators.OperatorRepeat; +import rx.operators.OperatorSubscribeOn; import rx.operators.OperatorTake; import rx.operators.OperatorTimestamp; import rx.operators.OperatorToObservableList; import rx.operators.OperatorToObservableSortedList; +import rx.operators.OperatorZip; import rx.operators.OperatorZipIterable; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; @@ -5139,8 +5139,7 @@ public final ConnectableObservable multicast(Subject * * @@ -5151,9 +5150,26 @@ public final ConnectableObservable multicast(SubjectRxJava Wiki: observeOn() */ public final Observable observeOn(Scheduler scheduler) { - return create(OperationObserveOn.observeOn(this, scheduler)); + return lift(new OperatorObserveOn(scheduler)); } + /** + * Move notifications to the specified {@link Scheduler} asynchronously with a buffer of the given size. + *

+ * + * + * @param scheduler + * the {@link Scheduler} to notify {@link Observer}s on + * @param bufferSize + * that will be rounded up to the next power of 2 + * @return the source Observable modified so that its {@link Observer}s are notified on the + * specified {@link Scheduler} + * @see RxJava Wiki: observeOn() + */ + public final Observable observeOn(Scheduler scheduler, int bufferSize) { + return lift(new OperatorObserveOn(scheduler, bufferSize)); + } + /** * Filters the items emitted by an Observable, only emitting those of the specified type. *

@@ -5296,7 +5312,9 @@ public final Observable onExceptionResumeNext(final Observable r * @see RxJava Wiki: parallel() */ public final Observable parallel(Func1, Observable> f) { - return lift(new OperatorParallel(f, Schedulers.computation())); + // TODO move this back to Schedulers.computation() again once that is properly using eventloops + // see https://github.com/Netflix/RxJava/issues/713 for why this was changed + return lift(new OperatorParallel(f, Schedulers.newThread())); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java deleted file mode 100644 index c3addd9933..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; - -import rx.Notification; -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Inner; -import rx.Subscription; -import rx.schedulers.ImmediateScheduler; -import rx.schedulers.TrampolineScheduler; -import rx.subscriptions.CompositeSubscription; -import rx.util.functions.Action1; - -/** - * Asynchronously notify Observers on the specified Scheduler. - *

- * - */ -public class OperationObserveOn { - - public static OnSubscribeFunc observeOn(Observable source, Scheduler scheduler) { - return new ObserveOn(source, scheduler); - } - - private static class ObserveOn implements OnSubscribeFunc { - private final Observable source; - private final Scheduler scheduler; - - public ObserveOn(Observable source, Scheduler scheduler) { - this.source = source; - this.scheduler = scheduler; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - if (scheduler instanceof ImmediateScheduler) { - // do nothing if we request ImmediateScheduler so we don't invoke overhead - return source.subscribe(observer); - } else if (scheduler instanceof TrampolineScheduler) { - // do nothing if we request CurrentThreadScheduler so we don't invoke overhead - return source.subscribe(observer); - } else { - return new Observation(observer).init(); - } - } - - /** Observe through individual queue per observer. */ - private class Observation { - final Observer observer; - final CompositeSubscription compositeSubscription = new CompositeSubscription(); - final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - final AtomicLong counter = new AtomicLong(0); - private volatile Scheduler.Inner recursiveScheduler; - - public Observation(Observer observer) { - this.observer = observer; - } - - public Subscription init() { - compositeSubscription.add(source.materialize().subscribe(new SourceObserver())); - return compositeSubscription; - } - - private class SourceObserver implements Action1> { - - @Override - public void call(Notification e) { - queue.offer(e); - if (counter.getAndIncrement() == 0) { - if (recursiveScheduler == null) { - // compositeSubscription for the outer scheduler, recursive for inner - compositeSubscription.add(scheduler.schedule(new Action1() { - - @Override - public void call(Inner inner) { - // record innerScheduler so 'processQueue' can use it for all subsequent executions - recursiveScheduler = inner; - // once we have the innerScheduler we can start doing real work - processQueue(); - } - - })); - } else { - processQueue(); - } - } - } - - void processQueue() { - recursiveScheduler.schedule(new Action1() { - @Override - public void call(Inner inner) { - Notification not = queue.poll(); - if (not != null) { - not.accept(observer); - } - - // decrement count and if we still have work to do - // recursively schedule ourselves to process again - if (counter.decrementAndGet() > 0) { - inner.schedule(this); - } - } - }); - } - } - } - } - -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java new file mode 100644 index 0000000000..1b818013e9 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -0,0 +1,309 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Scheduler; +import rx.Scheduler.Inner; +import rx.Subscriber; +import rx.schedulers.ImmediateScheduler; +import rx.schedulers.TestScheduler; +import rx.schedulers.TrampolineScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Delivers events on the specified Scheduler. + *

+ * This provides backpressure by blocking the incoming onNext when there is already one in the queue. + *

+ * This means that at any given time the max number of "onNext" in flight is 3: + * -> 1 being delivered on the Scheduler + * -> 1 in the queue waiting for the Scheduler + * -> 1 blocking on the queue waiting to deliver it + * + * I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler + * can loop and have something to do each time around to optimize for avoiding rescheduling when it + * can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop + * thus if the queue is empty it exits and next time something is added it will reschedule. + * + * + */ +public class OperatorObserveOn implements Operator { + + private final Scheduler scheduler; + private final int bufferSize; + + /** + * + * @param scheduler + * @param bufferSize + * that will be rounded up to the next power of 2 + */ + public OperatorObserveOn(Scheduler scheduler, int bufferSize) { + this.scheduler = scheduler; + this.bufferSize = roundToNextPowerOfTwoIfNecessary(bufferSize); + } + + public OperatorObserveOn(Scheduler scheduler) { + this(scheduler, 1); + } + + private static int roundToNextPowerOfTwoIfNecessary(int num) { + if ((num & -num) == num) { + return num; + } else { + int result = 1; + while (num != 0) + { + num >>= 1; + result <<= 1; + } + return result; + } + } + + @Override + public Subscriber call(Subscriber child) { + if (scheduler instanceof ImmediateScheduler) { + // avoid overhead, execute directly + return child; + } else if (scheduler instanceof TrampolineScheduler) { + // avoid overhead, execute directly + return child; + } else if (scheduler instanceof TestScheduler) { + // this one will deadlock as it is single-threaded and won't run the scheduled + // work until it manually advances, which it won't be able to do as it will block + return child; + } else { + return new ObserveOnSubscriber(child); + } + + } + + private static Object NULL_SENTINEL = new Object(); + private static Object COMPLETE_SENTINEL = new Object(); + + private static class ErrorSentinel { + final Throwable e; + + ErrorSentinel(Throwable e) { + this.e = e; + } + } + + /** Observe through individual queue per observer. */ + private class ObserveOnSubscriber extends Subscriber { + final Subscriber observer; + private volatile Scheduler.Inner recursiveScheduler; + + private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize); + final AtomicLong counter = new AtomicLong(0); + + public ObserveOnSubscriber(Subscriber observer) { + super(observer); + this.observer = observer; + } + + @Override + public void onNext(final T t) { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + if (t == null) { + queue.addBlocking(NULL_SENTINEL); + } else { + queue.addBlocking(t); + } + schedule(); + } catch (InterruptedException e) { + if (!isUnsubscribed()) { + onError(e); + } + } + } + + @Override + public void onCompleted() { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + queue.addBlocking(COMPLETE_SENTINEL); + schedule(); + } catch (InterruptedException e) { + onError(e); + } + } + + @Override + public void onError(final Throwable e) { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + queue.addBlocking(new ErrorSentinel(e)); + schedule(); + } catch (InterruptedException e2) { + // call directly if we can't schedule + observer.onError(e2); + } + } + + protected void schedule() { + if (counter.getAndIncrement() == 0) { + if (recursiveScheduler == null) { + // first time through, register a Subscription + // that can interrupt this thread + add(Subscriptions.create(new Action0() { + + @Override + public void call() { + // we have to interrupt the parent thread because + // it can be blocked on queue.put + queue.interrupt(); + } + + })); + add(scheduler.schedule(new Action1() { + + @Override + public void call(Inner inner) { + recursiveScheduler = inner; + pollQueue(); + } + + })); + } else { + recursiveScheduler.schedule(new Action1() { + + @Override + public void call(Inner inner) { + pollQueue(); + } + + }); + } + } + } + + @SuppressWarnings("unchecked") + private void pollQueue() { + do { + Object v = queue.poll(); + if (v != null) { + if (v == NULL_SENTINEL) { + observer.onNext(null); + } else if (v == COMPLETE_SENTINEL) { + observer.onCompleted(); + } else if (v instanceof ErrorSentinel) { + observer.onError(((ErrorSentinel) v).e); + } else { + observer.onNext((T) v); + } + } + } while (counter.decrementAndGet() > 0); + } + + } + + private class InterruptibleBlockingQueue { + + private final Semaphore semaphore; + private volatile boolean interrupted = false; + + private final Object[] buffer; + + private AtomicLong tail = new AtomicLong(); + private AtomicLong head = new AtomicLong(); + private final int capacity; + private final int mask; + + public InterruptibleBlockingQueue(final int size) { + this.semaphore = new Semaphore(size); + this.capacity = size; + this.mask = size - 1; + buffer = new Object[size]; + } + + /** + * Used to unsubscribe and interrupt the producer if blocked in put() + */ + public void interrupt() { + interrupted = true; + semaphore.release(); + } + + public void addBlocking(final Object e) throws InterruptedException { + if (interrupted) { + throw new InterruptedException("Interrupted by Unsubscribe"); + } + semaphore.acquire(); + if (interrupted) { + throw new InterruptedException("Interrupted by Unsubscribe"); + } + if (e == null) { + throw new IllegalArgumentException("Can not put null"); + } + + if (offer(e)) { + return; + } else { + throw new IllegalStateException("Queue is full"); + } + } + + private boolean offer(final Object e) { + final long _t = tail.get(); + if (_t - head.get() == capacity) { + // queue is full + return false; + } + int index = (int) (_t & mask); + buffer[index] = e; + // move the tail forward + tail.lazySet(_t + 1); + + return true; + } + + public Object poll() { + if (interrupted) { + return null; + } + final long _h = head.get(); + if (tail.get() == _h) { + // nothing available + return null; + } + int index = (int) (_h & mask); + + // fetch the item + Object v = buffer[index]; + // allow GC to happen + buffer[index] = null; + // increment and signal we're done + head.lazySet(_h + 1); + if (v != null) { + semaphore.release(); + } + return v; + } + + } + +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java index d9ad39aabb..bb0a75e358 100644 --- a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -183,4 +183,8 @@ public Thread newThread(Runnable r) { return result; } + + public static TestScheduler test() { + return new TestScheduler(); + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java index 2c10fa4efb..a858a93ccb 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java @@ -19,24 +19,22 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; import rx.util.functions.Action1; -import rx.util.functions.Func2; public class TestScheduler extends Scheduler { private final Queue queue = new PriorityQueue(11, new CompareActionsByTime()); + private static long counter = 0; private static class TimedAction { private final long time; private final Action1 action; private final Inner scheduler; + private final long count = counter++; // for differentiating tasks at same time private TimedAction(Inner scheduler, long time, Action1 action) { this.time = time; @@ -53,7 +51,11 @@ public String toString() { private static class CompareActionsByTime implements Comparator { @Override public int compare(TimedAction action1, TimedAction action2) { - return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); + if (action1.time == action2.time) { + return Long.valueOf(action1.count).compareTo(Long.valueOf(action2.count)); + } else { + return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); + } } } diff --git a/rxjava-core/src/perf/java/rx/ObservableCreatePerformance.java b/rxjava-core/src/perf/java/rx/ObservableCreatePerformance.java index f55dd998bd..6e892a324b 100644 --- a/rxjava-core/src/perf/java/rx/ObservableCreatePerformance.java +++ b/rxjava-core/src/perf/java/rx/ObservableCreatePerformance.java @@ -7,6 +7,10 @@ public class ObservableCreatePerformance extends AbstractPerformanceTester { + ObservableCreatePerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final ObservableCreatePerformance spt = new ObservableCreatePerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorFromIterablePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorFromIterablePerformance.java index acb41eb576..ea3f2c18c3 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorFromIterablePerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorFromIterablePerformance.java @@ -11,6 +11,10 @@ public class OperatorFromIterablePerformance extends AbstractPerformanceTester { + OperatorFromIterablePerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorFromIterablePerformance spt = new OperatorFromIterablePerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorMapPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorMapPerformance.java index 2f73af1d0e..dc85a1d6e6 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorMapPerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorMapPerformance.java @@ -8,6 +8,10 @@ public class OperatorMapPerformance extends AbstractPerformanceTester { + OperatorMapPerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorMapPerformance spt = new OperatorMapPerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorMergePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorMergePerformance.java index de4b204ff1..99ce4deac9 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorMergePerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorMergePerformance.java @@ -9,6 +9,10 @@ public class OperatorMergePerformance extends AbstractPerformanceTester { + OperatorMergePerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorMergePerformance spt = new OperatorMergePerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java new file mode 100644 index 0000000000..f14b2a25b3 --- /dev/null +++ b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java @@ -0,0 +1,79 @@ +package rx.operators; + +import rx.Observable; +import rx.perf.AbstractPerformanceTester; +import rx.perf.IntegerSumObserver; +import rx.schedulers.Schedulers; +import rx.util.functions.Action0; + +public class OperatorObserveOnPerformance extends AbstractPerformanceTester { + + private static long reps = 10000; + + OperatorObserveOnPerformance() { + super(reps); + } + + public static void main(String args[]) { + + final OperatorObserveOnPerformance spt = new OperatorObserveOnPerformance(); + try { + spt.runTest(new Action0() { + + @Override + public void call() { + spt.timeObserveOn(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + /** + * Observable.from(1L).observeOn() + * + * --- version 0.17.1 => with queue size == 1 + * + * Run: 10 - 115,033 ops/sec + * Run: 11 - 118,155 ops/sec + * Run: 12 - 120,526 ops/sec + * Run: 13 - 115,035 ops/sec + * Run: 14 - 116,102 ops/sec + * + * --- version 0.17.1 => with queue size == 16 + * + * Run: 10 - 850,412 ops/sec + * Run: 11 - 711,642 ops/sec + * Run: 12 - 788,332 ops/sec + * Run: 13 - 1,064,056 ops/sec + * Run: 14 - 656,857 ops/sec + * + * --- version 0.17.1 => with queue size == 1000000 + * + * Run: 10 - 5,162,622 ops/sec + * Run: 11 - 5,271,481 ops/sec + * Run: 12 - 4,442,470 ops/sec + * Run: 13 - 5,149,330 ops/sec + * Run: 14 - 5,146,680 ops/sec + * + * --- version 0.16.1 + * + * Run: 10 - 27,098,802 ops/sec + * Run: 11 - 24,204,284 ops/sec + * Run: 12 - 27,208,663 ops/sec + * Run: 13 - 26,879,552 ops/sec + * Run: 14 - 26,658,846 ops/sec + * + * + */ + public long timeObserveOn() { + + Observable s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread(), 16); + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + return o.sum; + } + +} \ No newline at end of file diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java index b64b3c9de1..3e8e8be702 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java @@ -7,6 +7,10 @@ public class OperatorTakePerformance extends AbstractPerformanceTester { + OperatorTakePerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorTakePerformance spt = new OperatorTakePerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java index 2f22b81303..0d929b6a33 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java @@ -3,12 +3,15 @@ import rx.Observable; import rx.perf.AbstractPerformanceTester; import rx.perf.IntegerSumObserver; -import rx.perf.LongSumObserver; import rx.util.functions.Action0; import rx.util.functions.Func2; public class OperatorZipPerformance extends AbstractPerformanceTester { + OperatorZipPerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorZipPerformance spt = new OperatorZipPerformance(); diff --git a/rxjava-core/src/perf/java/rx/perf/AbstractPerformanceTester.java b/rxjava-core/src/perf/java/rx/perf/AbstractPerformanceTester.java index 56d92278ab..468f900f88 100644 --- a/rxjava-core/src/perf/java/rx/perf/AbstractPerformanceTester.java +++ b/rxjava-core/src/perf/java/rx/perf/AbstractPerformanceTester.java @@ -6,9 +6,15 @@ public abstract class AbstractPerformanceTester { - public static final int REPETITIONS = 5 * 1000 * 1000; + public static final long REPETITIONS = 5 * 1000 * 1000; public static final int NUM_PRODUCERS = 1; + private final long repetitions; + + protected AbstractPerformanceTester(long repetitions) { + this.repetitions = repetitions; + } + public final void runTest(Action0 action) throws InterruptedException { for (int runNum = 0; runNum < 15; runNum++) { System.gc(); @@ -19,7 +25,7 @@ public final void runTest(Action0 action) throws InterruptedException { action.call(); long duration = System.nanoTime() - start; - long opsPerSec = (REPETITIONS * NUM_PRODUCERS * 1000L * 1000L * 1000L) / duration; + long opsPerSec = (repetitions * NUM_PRODUCERS * 1000L * 1000L * 1000L) / duration; System.out.printf("Run: %d - %,d ops/sec \n", Integer.valueOf(runNum), Long.valueOf(opsPerSec)); @@ -39,14 +45,14 @@ public final void runTest(Action0 action) throws InterruptedException { */ public long baseline() { LongSumObserver o = new LongSumObserver(); - for (long l = 0; l < REPETITIONS; l++) { + for (long l = 0; l < repetitions; l++) { o.onNext(l); } o.onCompleted(); return o.sum; } - - public static Iterable ITERABLE_OF_REPETITIONS = new Iterable() { + + public Iterable ITERABLE_OF_REPETITIONS = new Iterable() { @Override public Iterator iterator() { @@ -55,7 +61,7 @@ public Iterator iterator() { @Override public boolean hasNext() { - return count <= REPETITIONS; + return count <= repetitions; } @Override @@ -71,5 +77,5 @@ public void remove() { }; }; }; - + } diff --git a/rxjava-core/src/perf/java/rx/subscriptions/CompositeSubscriptionAddRemovePerf.java b/rxjava-core/src/perf/java/rx/subscriptions/CompositeSubscriptionAddRemovePerf.java index b582e9edd0..487f34b558 100644 --- a/rxjava-core/src/perf/java/rx/subscriptions/CompositeSubscriptionAddRemovePerf.java +++ b/rxjava-core/src/perf/java/rx/subscriptions/CompositeSubscriptionAddRemovePerf.java @@ -4,6 +4,11 @@ import rx.util.functions.Action0; public class CompositeSubscriptionAddRemovePerf extends AbstractPerformanceTester { + + CompositeSubscriptionAddRemovePerf() { + super(REPETITIONS); + } + public static void main(String[] args) { final CompositeSubscriptionAddRemovePerf spt = new CompositeSubscriptionAddRemovePerf(); try { diff --git a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java index c808e396bb..92b085ea0f 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java @@ -484,21 +484,16 @@ public Integer call(Integer i) { @Override public Observable call(GroupedObservable group) { if (group.getKey() == 0) { - return group.observeOn(Schedulers.newThread()).map(new Func1() { + return group.delay(100, TimeUnit.MILLISECONDS).map(new Func1() { @Override public Integer call(Integer t) { - try { - Thread.sleep(2); - } catch (InterruptedException e) { - e.printStackTrace(); - } return t * 10; } }); } else { - return group.observeOn(Schedulers.newThread()); + return group; } } }) diff --git a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java similarity index 63% rename from rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 0e3a892599..7e522eccef 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -18,11 +18,11 @@ import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.OperationObserveOn.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.junit.Test; import org.mockito.InOrder; @@ -30,14 +30,20 @@ import org.mockito.stubbing.Answer; import rx.Observable; +import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.schedulers.ImmediateScheduler; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; +import rx.schedulers.TrampolineScheduler; +import rx.subscriptions.BooleanSubscription; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; -public class OperationObserveOnTest { +public class OperatorObserveOnTest { /** * This is testing a no-op path since it uses Schedulers.immediate() which will not do scheduling. @@ -46,7 +52,7 @@ public class OperationObserveOnTest { @SuppressWarnings("unchecked") public void testObserveOn() { Observer observer = mock(Observer.class); - Observable.create(observeOn(Observable.from(1, 2, 3), Schedulers.immediate())).subscribe(observer); + Observable.from(1, 2, 3).observeOn(Schedulers.immediate()).subscribe(observer); verify(observer, times(1)).onNext(1); verify(observer, times(1)).onNext(2); @@ -142,7 +148,7 @@ public void call() { @Test public void observeOnTheSameSchedulerTwice() { - TestScheduler scheduler = new TestScheduler(); + Scheduler scheduler = Schedulers.immediate(); Observable o = Observable.from(1, 2, 3); Observable o2 = o.observeOn(scheduler); @@ -158,8 +164,6 @@ public void observeOnTheSameSchedulerTwice() { o2.subscribe(observer1); o2.subscribe(observer2); - scheduler.advanceTimeBy(1, TimeUnit.SECONDS); - inOrder1.verify(observer1, times(1)).onNext(1); inOrder1.verify(observer1, times(1)).onNext(2); inOrder1.verify(observer1, times(1)).onNext(3); @@ -173,7 +177,6 @@ public void observeOnTheSameSchedulerTwice() { inOrder2.verify(observer2, times(1)).onCompleted(); verify(observer2, never()).onError(any(Throwable.class)); inOrder2.verifyNoMoreInteractions(); - } @Test @@ -308,6 +311,145 @@ public void call(Integer t1) { }); } + @Test + public void testNonBlockingOuterWhileBlockingOnNext() throws InterruptedException { + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicLong completeTime = new AtomicLong(); + // use subscribeOn to make async, observeOn to move + Observable.range(1, 1000).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("onCompleted"); + completeTime.set(System.nanoTime()); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + + } + + }); + + long afterSubscribeTime = System.nanoTime(); + System.out.println("After subscribe: " + latch.getCount()); + assertEquals(1, latch.getCount()); + latch.await(); + assertTrue(completeTime.get() > afterSubscribeTime); + System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime)); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThreadAndBuffer8() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 8); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeIO() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTrampoline() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTestScheduler() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeComputation() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation(), 1); + } + + private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler, int bufferSize) throws InterruptedException { + final AtomicInteger countEmitted = new AtomicInteger(); + final AtomicInteger countTaken = new AtomicInteger(); + int value = Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + final BooleanSubscription s = BooleanSubscription.create(); + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + int i = 1; + while (!s.isUnsubscribed() && i <= 100) { + // System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i); + o.onNext(i++); + } + o.onCompleted(); + } + }); + t.setDaemon(true); + t.start(); + return s; + } + }).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + countEmitted.incrementAndGet(); + } + }).doOnCompleted(new Action0() { + + @Override + public void call() { + // System.out.println("-------- Done Emitting from Source ---------"); + } + }).observeOn(scheduler, bufferSize).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + // System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i); + //force it to be slower than the producer + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + countTaken.incrementAndGet(); + } + }).take(10).doOnNext(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("*********** value: " + t); + } + + }).toBlockingObservable().last(); + + if (scheduler instanceof TrampolineScheduler || scheduler instanceof ImmediateScheduler || scheduler instanceof TestScheduler) { + // since there is no concurrency it will block and only emit as many as it can process + assertEquals(10, countEmitted.get()); + } else { + // the others with concurrency should not emit all 100 ... but 10 + 2 in the pipeline + // NOTE: The +2 could change if the implementation of the queue logic changes. See Javadoc at top of class. + assertEquals(11, countEmitted.get(), bufferSize); // can be up to 11 + bufferSize + } + // number received after take (but take will filter any extra) + assertEquals(10, value); + // so we also want to check the doOnNext after observeOn to see if it got unsubscribed + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + // we expect only 10 to make it through the observeOn side + assertEquals(10, countTaken.get()); + } + private static int randomIntFrom0to100() { // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml long x = System.nanoTime(); diff --git a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java index 9eb07b20ef..02e832d5d8 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java @@ -16,7 +16,6 @@ package rx.operators; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.Arrays; @@ -32,6 +31,7 @@ import rx.Observer; import rx.Subscriber; import rx.Subscription; +import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; import rx.util.functions.Func1; @@ -288,4 +288,17 @@ public void call(Subscriber op) { } }); + + @Test(timeout = 2000) + public void testTakeObserveOn() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + INFINITE_OBSERVABLE.observeOn(Schedulers.newThread()).take(1).subscribe(o); + + verify(o).onNext(1L); + verify(o, never()).onNext(2L); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } } diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java index bfea193310..046710bfaf 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -55,73 +55,6 @@ public abstract class AbstractSchedulerTests { */ protected abstract Scheduler getScheduler(); - @Test - public final void unsubscribeWithFastProducerWithSlowConsumerCausingQueuing() throws InterruptedException { - final AtomicInteger countEmitted = new AtomicInteger(); - final AtomicInteger countTaken = new AtomicInteger(); - int value = Observable.create(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer o) { - final BooleanSubscription s = BooleanSubscription.create(); - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - int i = 1; - while (!s.isUnsubscribed() && i <= 100) { - System.out.println("onNext from fast producer: " + i); - o.onNext(i++); - } - o.onCompleted(); - } - }); - t.setDaemon(true); - t.start(); - return s; - } - }).doOnNext(new Action1() { - - @Override - public void call(Integer i) { - countEmitted.incrementAndGet(); - } - }).doOnCompleted(new Action0() { - - @Override - public void call() { - System.out.println("-------- Done Emitting from Source ---------"); - } - }).observeOn(getScheduler()).doOnNext(new Action1() { - - @Override - public void call(Integer i) { - System.out.println(">> onNext to slowConsumer pre-take: " + i); - //force it to be slower than the producer - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - countTaken.incrementAndGet(); - } - }).take(10).toBlockingObservable().last(); - - if (getScheduler() instanceof TrampolineScheduler || getScheduler() instanceof ImmediateScheduler) { - // since there is no concurrency it will block and only emit as many as it can process - assertEquals(10, countEmitted.get()); - } else { - // they will all emit because the consumer is running slow - assertEquals(100, countEmitted.get()); - } - // number received after take (but take will filter any extra) - assertEquals(10, value); - // so we also want to check the doOnNext after observeOn to see if it got unsubscribed - Thread.sleep(200); // let time pass to see if the scheduler is still doing work - // we expect only 10 to make it through the observeOn side - assertEquals(10, countTaken.get()); - } - @Test public void testNestedActions() throws InterruptedException { Scheduler scheduler = getScheduler(); diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java index 14a6bbb039..3a360300f5 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -311,7 +311,8 @@ public void onNext(String v) { subject.onNext("two"); assertEquals("two", lastValueForObserver1.get()); - Subscription s2 = subject.observeOn(Schedulers.newThread()).subscribe(observer2); + // use subscribeOn to make this async otherwise we deadlock as we are using CountDownLatches + Subscription s2 = subject.subscribeOn(Schedulers.newThread()).subscribe(observer2); System.out.println("before waiting for one"); @@ -321,12 +322,23 @@ public void onNext(String v) { System.out.println("after waiting for one"); subject.onNext("three"); + + System.out.println("sent three"); + // if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet assertEquals("three", lastValueForObserver1.get()); + + System.out.println("about to send onCompleted"); + subject.onCompleted(); + System.out.println("completed subject"); + // release makeSlow.countDown(); + + System.out.println("makeSlow released"); + completed.await(); // all of them should be emitted with the last being "three" assertEquals("three", lastValueForObserver2.get());