From 9e2691729d94f00cde97efb0b39264c2f0c0b7f5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 5 Feb 2014 10:04:29 +0100 Subject: [PATCH 01/10] ObserveOn Merge from @akarnokd:OperatorRepeat2 --- rxjava-core/src/main/java/rx/Observable.java | 12 +- .../java/rx/operators/OperationObserveOn.java | 129 ------------------ .../java/rx/operators/OperatorObserveOn.java | 67 +++++++++ .../main/java/rx/operators/QueueDrain.java | 106 ++++++++++++++ ...OnTest.java => OperatorObserveOnTest.java} | 5 +- .../java/rx/operators/OperatorTakeTest.java | 15 +- 6 files changed, 195 insertions(+), 139 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationObserveOn.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java create mode 100644 rxjava-core/src/main/java/rx/operators/QueueDrain.java rename rxjava-core/src/test/java/rx/operators/{OperationObserveOnTest.java => OperatorObserveOnTest.java} (98%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 3d672326e5..4723484c8f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -49,7 +49,6 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; -import rx.operators.OperatorDoOnEach; import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; @@ -63,13 +62,11 @@ import rx.operators.OperationMergeDelayError; import rx.operators.OperationMinMax; import rx.operators.OperationMulticast; -import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallelMerge; -import rx.operators.OperatorRepeat; import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; @@ -96,18 +93,21 @@ import rx.operators.OperationToObservableFuture; import rx.operators.OperationUsing; import rx.operators.OperationWindow; -import rx.operators.OperatorSubscribeOn; -import rx.operators.OperatorZip; import rx.operators.OperatorCast; +import rx.operators.OperatorDoOnEach; import rx.operators.OperatorFromIterable; import rx.operators.OperatorGroupBy; import rx.operators.OperatorMap; import rx.operators.OperatorMerge; +import rx.operators.OperatorObserveOn; import rx.operators.OperatorParallel; +import rx.operators.OperatorRepeat; +import rx.operators.OperatorSubscribeOn; import rx.operators.OperatorTake; import rx.operators.OperatorTimestamp; import rx.operators.OperatorToObservableList; import rx.operators.OperatorToObservableSortedList; +import rx.operators.OperatorZip; import rx.operators.OperatorZipIterable; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; @@ -5151,7 +5151,7 @@ public final ConnectableObservable multicast(SubjectRxJava Wiki: observeOn() */ public final Observable observeOn(Scheduler scheduler) { - return create(OperationObserveOn.observeOn(this, scheduler)); + return lift(new OperatorObserveOn(scheduler)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java deleted file mode 100644 index c3addd9933..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; - -import rx.Notification; -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Inner; -import rx.Subscription; -import rx.schedulers.ImmediateScheduler; -import rx.schedulers.TrampolineScheduler; -import rx.subscriptions.CompositeSubscription; -import rx.util.functions.Action1; - -/** - * Asynchronously notify Observers on the specified Scheduler. - *

- * - */ -public class OperationObserveOn { - - public static OnSubscribeFunc observeOn(Observable source, Scheduler scheduler) { - return new ObserveOn(source, scheduler); - } - - private static class ObserveOn implements OnSubscribeFunc { - private final Observable source; - private final Scheduler scheduler; - - public ObserveOn(Observable source, Scheduler scheduler) { - this.source = source; - this.scheduler = scheduler; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - if (scheduler instanceof ImmediateScheduler) { - // do nothing if we request ImmediateScheduler so we don't invoke overhead - return source.subscribe(observer); - } else if (scheduler instanceof TrampolineScheduler) { - // do nothing if we request CurrentThreadScheduler so we don't invoke overhead - return source.subscribe(observer); - } else { - return new Observation(observer).init(); - } - } - - /** Observe through individual queue per observer. */ - private class Observation { - final Observer observer; - final CompositeSubscription compositeSubscription = new CompositeSubscription(); - final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - final AtomicLong counter = new AtomicLong(0); - private volatile Scheduler.Inner recursiveScheduler; - - public Observation(Observer observer) { - this.observer = observer; - } - - public Subscription init() { - compositeSubscription.add(source.materialize().subscribe(new SourceObserver())); - return compositeSubscription; - } - - private class SourceObserver implements Action1> { - - @Override - public void call(Notification e) { - queue.offer(e); - if (counter.getAndIncrement() == 0) { - if (recursiveScheduler == null) { - // compositeSubscription for the outer scheduler, recursive for inner - compositeSubscription.add(scheduler.schedule(new Action1() { - - @Override - public void call(Inner inner) { - // record innerScheduler so 'processQueue' can use it for all subsequent executions - recursiveScheduler = inner; - // once we have the innerScheduler we can start doing real work - processQueue(); - } - - })); - } else { - processQueue(); - } - } - } - - void processQueue() { - recursiveScheduler.schedule(new Action1() { - @Override - public void call(Inner inner) { - Notification not = queue.poll(); - if (not != null) { - not.accept(observer); - } - - // decrement count and if we still have work to do - // recursively schedule ourselves to process again - if (counter.decrementAndGet() > 0) { - inner.schedule(this); - } - } - }); - } - } - } - } - -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java new file mode 100644 index 0000000000..bfbf52905b --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -0,0 +1,67 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Notification; +import rx.Scheduler; +import rx.Subscriber; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Action0; + +/** + * Move the observation of events to another thread via Scheduler. + * @param the item type + */ +public class OperatorObserveOn implements Operator { + final Scheduler scheduler; + public OperatorObserveOn(Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public Subscriber call(final Subscriber t1) { + final QueueDrain qd = new QueueDrain(t1); + final CompositeSubscription csub = new CompositeSubscription(); + t1.add(csub); + return new Subscriber(t1) { + /** Dispatch the notification value. */ + void run(final Notification nt) { + qd.enqueue(new Action0() { + @Override + public void call() { + nt.accept(t1); + } + }); + qd.tryDrainAsync(scheduler, csub); + } + @Override + public void onNext(final T args) { + run(Notification.createOnNext(args)); + } + + @Override + public void onError(final Throwable e) { + run(Notification.createOnError(e)); + } + + @Override + public void onCompleted() { + run(Notification.createOnCompleted()); + } + }; + } + +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/QueueDrain.java b/rxjava-core/src/main/java/rx/operators/QueueDrain.java new file mode 100644 index 0000000000..7e85c237e0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/QueueDrain.java @@ -0,0 +1,106 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Action queue ensuring that only a single drain caller succeeds at a time. + * This class can be used to execute work without the issues of reentrancy and + * concurrency. + */ +public final class QueueDrain implements Runnable, Action0 { + /** The number of work items. */ + private final AtomicInteger wip = new AtomicInteger(); + /** The action queue. */ + private final BlockingQueue queue = new LinkedBlockingQueue(); + /** The subscription to stop the queue processing. */ + private final Subscription k; + /** + * Constructor which takes a cancellation token. + * @param k the cancellation token (aka subscription). + */ + public QueueDrain(Subscription k) { + this.k = k; + } + /** + * Enqueue an action. + * To execute any queued action, call {@link #tryDrain()} or + * submit this instance to a {@code Scheduler.schedule()} method. + * @param action the action to enqueue, not null + */ + public void enqueue(Action0 action) { + if (!k.isUnsubscribed()) { + queue.add(action); + } + } + /** + * Try draining the queue and executing the actions in it. + */ + public void tryDrain() { + if (wip.incrementAndGet() > 1 || k.isUnsubscribed()) { + return; + } + do { + queue.poll().call(); + } while (wip.decrementAndGet() > 0 && !k.isUnsubscribed()); + } + /** + * Try draining the queue on the given scheduler. + * The method ensures that only one thread is actively draining the + * queue on the given scheduler. + * @param scheduler the scheduler where the draining should happen + * @param cs the composite subscription to track the schedule + */ + public void tryDrainAsync(Scheduler scheduler, final CompositeSubscription cs) { + if (cs.isUnsubscribed() || wip.incrementAndGet() > 1) { + return; + } + // add tracking subscription only if schedule is run to avoid overfilling cs + final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + cs.add(mas); + mas.set(scheduler.schedule(new Action1() { + @Override + public void call(Scheduler.Inner o) { + if (!cs.isUnsubscribed()) { + do { + queue.poll().call(); + } while (wip.decrementAndGet() > 0 && !cs.isUnsubscribed()); + } + cs.remove(mas); + } + })); + } + @Override + public void run() { + // to help the draining of the queue on a ThreadPool/Scheduler + tryDrain(); + } + + @Override + public void call() { + tryDrain(); + } + +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java similarity index 98% rename from rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 0e3a892599..f180658722 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -18,7 +18,6 @@ import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.OperationObserveOn.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,7 +36,7 @@ import rx.util.functions.Action1; import rx.util.functions.Func1; -public class OperationObserveOnTest { +public class OperatorObserveOnTest { /** * This is testing a no-op path since it uses Schedulers.immediate() which will not do scheduling. @@ -46,7 +45,7 @@ public class OperationObserveOnTest { @SuppressWarnings("unchecked") public void testObserveOn() { Observer observer = mock(Observer.class); - Observable.create(observeOn(Observable.from(1, 2, 3), Schedulers.immediate())).subscribe(observer); + Observable.from(1, 2, 3).observeOn(Schedulers.immediate()).subscribe(observer); verify(observer, times(1)).onNext(1); verify(observer, times(1)).onNext(2); diff --git a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java index 9eb07b20ef..02e832d5d8 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java @@ -16,7 +16,6 @@ package rx.operators; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.Arrays; @@ -32,6 +31,7 @@ import rx.Observer; import rx.Subscriber; import rx.Subscription; +import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; import rx.util.functions.Func1; @@ -288,4 +288,17 @@ public void call(Subscriber op) { } }); + + @Test(timeout = 2000) + public void testTakeObserveOn() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + INFINITE_OBSERVABLE.observeOn(Schedulers.newThread()).take(1).subscribe(o); + + verify(o).onNext(1L); + verify(o, never()).onNext(2L); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } } From 446acf5cd8b533dfd4fd26848c9f33aedf0ad95e Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 6 Feb 2014 21:53:04 -0800 Subject: [PATCH 02/10] ObserveOn with Backpressure --- .../java/rx/operators/OperatorObserveOn.java | 165 ++++++++++++++---- .../main/java/rx/operators/QueueDrain.java | 106 ----------- .../java/rx/schedulers/TestScheduler.java | 12 +- .../rx/operators/OperatorObserveOnTest.java | 6 +- 4 files changed, 143 insertions(+), 146 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/QueueDrain.java diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index bfbf52905b..418d795634 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -15,53 +15,156 @@ */ package rx.operators; -import rx.Notification; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + import rx.Scheduler; +import rx.Scheduler.Inner; import rx.Subscriber; -import rx.subscriptions.CompositeSubscription; -import rx.util.functions.Action0; +import rx.schedulers.ImmediateScheduler; +import rx.schedulers.TestScheduler; +import rx.schedulers.TrampolineScheduler; +import rx.util.functions.Action1; /** - * Move the observation of events to another thread via Scheduler. - * @param the item type + * Asynchronously notify Observers on the specified Scheduler. + *

+ * */ public class OperatorObserveOn implements Operator { - final Scheduler scheduler; + + private final Scheduler scheduler; + public OperatorObserveOn(Scheduler scheduler) { this.scheduler = scheduler; } @Override - public Subscriber call(final Subscriber t1) { - final QueueDrain qd = new QueueDrain(t1); - final CompositeSubscription csub = new CompositeSubscription(); - t1.add(csub); - return new Subscriber(t1) { - /** Dispatch the notification value. */ - void run(final Notification nt) { - qd.enqueue(new Action0() { - @Override - public void call() { - nt.accept(t1); - } - }); - qd.tryDrainAsync(scheduler, csub); + public Subscriber call(Subscriber child) { + if (scheduler instanceof ImmediateScheduler) { + // avoid overhead, execute directly + return child; + } else if (scheduler instanceof TrampolineScheduler) { + // avoid overhead, execute directly + return child; + } else if (scheduler instanceof TestScheduler) { + // this one will deadlock as it is single-threaded and won't run the scheduled + // work until it manually advances, which it won't be able to do as it will block + return child; + } else { + return new ObserveOnSubscriber(child); + } + + } + + private static Object NULL_SENTINEL = new Object(); + private static Object COMPLETE_SENTINEL = new Object(); + + private static class ErrorSentinel { + final Throwable e; + + ErrorSentinel(Throwable e) { + this.e = e; + } + } + + /** Observe through individual queue per observer. */ + private class ObserveOnSubscriber extends Subscriber { + final Subscriber observer; + private volatile Scheduler.Inner recursiveScheduler; + + private final ArrayBlockingQueue queue = new ArrayBlockingQueue(1); + final AtomicLong counter = new AtomicLong(0); + + public ObserveOnSubscriber(Subscriber observer) { + super(observer); + this.observer = observer; + } + + @Override + public void onNext(final T t) { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + if (t == null) { + queue.put(NULL_SENTINEL); + } else { + queue.put(t); + } + schedule(); + } catch (InterruptedException e) { + onError(e); } - @Override - public void onNext(final T args) { - run(Notification.createOnNext(args)); + } + + @Override + public void onCompleted() { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + queue.put(COMPLETE_SENTINEL); + schedule(); + } catch (InterruptedException e) { + onError(e); } + } - @Override - public void onError(final Throwable e) { - run(Notification.createOnError(e)); + @Override + public void onError(final Throwable e) { + try { + // we want to block for natural back-pressure + // so that the producer waits for each value to be consumed + queue.put(new ErrorSentinel(e)); + schedule(); + } catch (InterruptedException e2) { + // call directly if we can't schedule + observer.onError(e2); } + } + + protected void schedule() { + if (counter.getAndIncrement() == 0) { + if (recursiveScheduler == null) { + add(scheduler.schedule(new Action1() { + + @Override + public void call(Inner inner) { + recursiveScheduler = inner; + pollQueue(); + } + + })); + } else { + recursiveScheduler.schedule(new Action1() { - @Override - public void onCompleted() { - run(Notification.createOnCompleted()); + @Override + public void call(Inner inner) { + pollQueue(); + } + + }); + } } - }; + } + + @SuppressWarnings("unchecked") + private void pollQueue() { + do { + Object v = queue.poll(); + if (v != null) { + if (v == NULL_SENTINEL) { + observer.onNext(null); + } else if (v == COMPLETE_SENTINEL) { + observer.onCompleted(); + } else if (v instanceof ErrorSentinel) { + observer.onError(((ErrorSentinel) v).e); + } else { + observer.onNext((T) v); + } + } + } while (counter.decrementAndGet() > 0); + } + } - + } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/QueueDrain.java b/rxjava-core/src/main/java/rx/operators/QueueDrain.java deleted file mode 100644 index 7e85c237e0..0000000000 --- a/rxjava-core/src/main/java/rx/operators/QueueDrain.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import rx.Scheduler; -import rx.Subscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.MultipleAssignmentSubscription; -import rx.util.functions.Action0; -import rx.util.functions.Action1; - -/** - * Action queue ensuring that only a single drain caller succeeds at a time. - * This class can be used to execute work without the issues of reentrancy and - * concurrency. - */ -public final class QueueDrain implements Runnable, Action0 { - /** The number of work items. */ - private final AtomicInteger wip = new AtomicInteger(); - /** The action queue. */ - private final BlockingQueue queue = new LinkedBlockingQueue(); - /** The subscription to stop the queue processing. */ - private final Subscription k; - /** - * Constructor which takes a cancellation token. - * @param k the cancellation token (aka subscription). - */ - public QueueDrain(Subscription k) { - this.k = k; - } - /** - * Enqueue an action. - * To execute any queued action, call {@link #tryDrain()} or - * submit this instance to a {@code Scheduler.schedule()} method. - * @param action the action to enqueue, not null - */ - public void enqueue(Action0 action) { - if (!k.isUnsubscribed()) { - queue.add(action); - } - } - /** - * Try draining the queue and executing the actions in it. - */ - public void tryDrain() { - if (wip.incrementAndGet() > 1 || k.isUnsubscribed()) { - return; - } - do { - queue.poll().call(); - } while (wip.decrementAndGet() > 0 && !k.isUnsubscribed()); - } - /** - * Try draining the queue on the given scheduler. - * The method ensures that only one thread is actively draining the - * queue on the given scheduler. - * @param scheduler the scheduler where the draining should happen - * @param cs the composite subscription to track the schedule - */ - public void tryDrainAsync(Scheduler scheduler, final CompositeSubscription cs) { - if (cs.isUnsubscribed() || wip.incrementAndGet() > 1) { - return; - } - // add tracking subscription only if schedule is run to avoid overfilling cs - final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); - cs.add(mas); - mas.set(scheduler.schedule(new Action1() { - @Override - public void call(Scheduler.Inner o) { - if (!cs.isUnsubscribed()) { - do { - queue.poll().call(); - } while (wip.decrementAndGet() > 0 && !cs.isUnsubscribed()); - } - cs.remove(mas); - } - })); - } - @Override - public void run() { - // to help the draining of the queue on a ThreadPool/Scheduler - tryDrain(); - } - - @Override - public void call() { - tryDrain(); - } - -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java index 2c10fa4efb..a858a93ccb 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java @@ -19,24 +19,22 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; import rx.util.functions.Action1; -import rx.util.functions.Func2; public class TestScheduler extends Scheduler { private final Queue queue = new PriorityQueue(11, new CompareActionsByTime()); + private static long counter = 0; private static class TimedAction { private final long time; private final Action1 action; private final Inner scheduler; + private final long count = counter++; // for differentiating tasks at same time private TimedAction(Inner scheduler, long time, Action1 action) { this.time = time; @@ -53,7 +51,11 @@ public String toString() { private static class CompareActionsByTime implements Comparator { @Override public int compare(TimedAction action1, TimedAction action2) { - return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); + if (action1.time == action2.time) { + return Long.valueOf(action1.count).compareTo(Long.valueOf(action2.count)); + } else { + return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); + } } } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index f180658722..4c975ba6db 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -30,6 +30,7 @@ import rx.Observable; import rx.Observer; +import rx.Scheduler; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; import rx.util.functions.Action0; @@ -141,7 +142,7 @@ public void call() { @Test public void observeOnTheSameSchedulerTwice() { - TestScheduler scheduler = new TestScheduler(); + Scheduler scheduler = Schedulers.immediate(); Observable o = Observable.from(1, 2, 3); Observable o2 = o.observeOn(scheduler); @@ -157,8 +158,6 @@ public void observeOnTheSameSchedulerTwice() { o2.subscribe(observer1); o2.subscribe(observer2); - scheduler.advanceTimeBy(1, TimeUnit.SECONDS); - inOrder1.verify(observer1, times(1)).onNext(1); inOrder1.verify(observer1, times(1)).onNext(2); inOrder1.verify(observer1, times(1)).onNext(3); @@ -172,7 +171,6 @@ public void observeOnTheSameSchedulerTwice() { inOrder2.verify(observer2, times(1)).onCompleted(); verify(observer2, never()).onError(any(Throwable.class)); inOrder2.verifyNoMoreInteractions(); - } @Test From ac83ed9b44d2e1ad7f066bd794b4c676c5a86adf Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 6 Feb 2014 23:06:01 -0800 Subject: [PATCH 03/10] Make Parallel use NewThread until Computation Fixed See https://github.com/Netflix/RxJava/issues/713 It was causing non-deterministic behavior, random test failures and poor performance. --- rxjava-core/src/main/java/rx/Observable.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 4723484c8f..e6a3dd4921 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -5296,7 +5296,9 @@ public final Observable onExceptionResumeNext(final Observable r * @see RxJava Wiki: parallel() */ public final Observable parallel(Func1, Observable> f) { - return lift(new OperatorParallel(f, Schedulers.computation())); + // TODO move this back to Schedulers.computation() again once that is properly using eventloops + // see https://github.com/Netflix/RxJava/issues/713 for why this was changed + return lift(new OperatorParallel(f, Schedulers.newThread())); } /** From df7bf90803a686946e09d9ec175f8ab85277e989 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 6 Feb 2014 23:08:17 -0800 Subject: [PATCH 04/10] GroupBy Test Improvement ObserveOn was the wrong mechanism for delaying behavior as it was relying on the buffering of observeOn. Now using delay() to delay the group since observeOn no longer buffers. --- .../src/test/java/rx/operators/OperatorGroupByTest.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java index c808e396bb..92b085ea0f 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java @@ -484,21 +484,16 @@ public Integer call(Integer i) { @Override public Observable call(GroupedObservable group) { if (group.getKey() == 0) { - return group.observeOn(Schedulers.newThread()).map(new Func1() { + return group.delay(100, TimeUnit.MILLISECONDS).map(new Func1() { @Override public Integer call(Integer t) { - try { - Thread.sleep(2); - } catch (InterruptedException e) { - e.printStackTrace(); - } return t * 10; } }); } else { - return group.observeOn(Schedulers.newThread()); + return group; } } }) From eea02d87634d77afa4a417f4dd0bae84baacdf77 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 6 Feb 2014 23:09:08 -0800 Subject: [PATCH 05/10] Fast Producer / Slow Consumer Backpressure Tests for ObserveOn --- .../main/java/rx/schedulers/Schedulers.java | 4 + .../rx/operators/OperatorObserveOnTest.java | 97 +++++++++++++++++++ .../rx/schedulers/AbstractSchedulerTests.java | 67 ------------- 3 files changed, 101 insertions(+), 67 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java index d9ad39aabb..bb0a75e358 100644 --- a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -183,4 +183,8 @@ public Thread newThread(Runnable r) { return result; } + + public static TestScheduler test() { + return new TestScheduler(); + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 4c975ba6db..21906ccc30 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -29,10 +29,15 @@ import org.mockito.stubbing.Answer; import rx.Observable; +import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; +import rx.Subscription; +import rx.schedulers.ImmediateScheduler; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; +import rx.schedulers.TrampolineScheduler; +import rx.subscriptions.BooleanSubscription; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; @@ -305,6 +310,98 @@ public void call(Integer t1) { }); } + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread()); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeIO() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io()); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTrampoline() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline()); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTestScheduler() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test()); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeComputation() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation()); + } + + private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler) throws InterruptedException { + final AtomicInteger countEmitted = new AtomicInteger(); + final AtomicInteger countTaken = new AtomicInteger(); + int value = Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + final BooleanSubscription s = BooleanSubscription.create(); + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + int i = 1; + while (!s.isUnsubscribed() && i <= 100) { + System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i); + o.onNext(i++); + } + o.onCompleted(); + } + }); + t.setDaemon(true); + t.start(); + return s; + } + }).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + countEmitted.incrementAndGet(); + } + }).doOnCompleted(new Action0() { + + @Override + public void call() { + System.out.println("-------- Done Emitting from Source ---------"); + } + }).observeOn(scheduler).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i); + //force it to be slower than the producer + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + countTaken.incrementAndGet(); + } + }).take(10).toBlockingObservable().last(); + + if (scheduler instanceof TrampolineScheduler || scheduler instanceof ImmediateScheduler || scheduler instanceof TestScheduler) { + // since there is no concurrency it will block and only emit as many as it can process + assertEquals(10, countEmitted.get()); + } else { + // the others with concurrency should not emit all 100 ... but 10 + 2 in the pipeline + // NOTE: The +2 could change if the implementation of the queue logic changes. See Javadoc at top of class. + assertEquals(12, countEmitted.get()); + } + // number received after take (but take will filter any extra) + assertEquals(10, value); + // so we also want to check the doOnNext after observeOn to see if it got unsubscribed + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + // we expect only 10 to make it through the observeOn side + assertEquals(10, countTaken.get()); + } + private static int randomIntFrom0to100() { // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml long x = System.nanoTime(); diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java index bfea193310..046710bfaf 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -55,73 +55,6 @@ public abstract class AbstractSchedulerTests { */ protected abstract Scheduler getScheduler(); - @Test - public final void unsubscribeWithFastProducerWithSlowConsumerCausingQueuing() throws InterruptedException { - final AtomicInteger countEmitted = new AtomicInteger(); - final AtomicInteger countTaken = new AtomicInteger(); - int value = Observable.create(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer o) { - final BooleanSubscription s = BooleanSubscription.create(); - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - int i = 1; - while (!s.isUnsubscribed() && i <= 100) { - System.out.println("onNext from fast producer: " + i); - o.onNext(i++); - } - o.onCompleted(); - } - }); - t.setDaemon(true); - t.start(); - return s; - } - }).doOnNext(new Action1() { - - @Override - public void call(Integer i) { - countEmitted.incrementAndGet(); - } - }).doOnCompleted(new Action0() { - - @Override - public void call() { - System.out.println("-------- Done Emitting from Source ---------"); - } - }).observeOn(getScheduler()).doOnNext(new Action1() { - - @Override - public void call(Integer i) { - System.out.println(">> onNext to slowConsumer pre-take: " + i); - //force it to be slower than the producer - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - countTaken.incrementAndGet(); - } - }).take(10).toBlockingObservable().last(); - - if (getScheduler() instanceof TrampolineScheduler || getScheduler() instanceof ImmediateScheduler) { - // since there is no concurrency it will block and only emit as many as it can process - assertEquals(10, countEmitted.get()); - } else { - // they will all emit because the consumer is running slow - assertEquals(100, countEmitted.get()); - } - // number received after take (but take will filter any extra) - assertEquals(10, value); - // so we also want to check the doOnNext after observeOn to see if it got unsubscribed - Thread.sleep(200); // let time pass to see if the scheduler is still doing work - // we expect only 10 to make it through the observeOn side - assertEquals(10, countTaken.get()); - } - @Test public void testNestedActions() throws InterruptedException { Scheduler scheduler = getScheduler(); From c758d1370419141528903380d8689368e1c68630 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 6 Feb 2014 23:10:30 -0800 Subject: [PATCH 06/10] Custom InterruptibleBlockingQueue for ObserveOn Since we are blocking the producer on* notifications we need to interrupt it on unsubscribe events. I need to do it on the data structure and not the thread as the thread could change for each onNext and that could have unexpected consequences. --- .../java/rx/operators/OperatorObserveOn.java | 89 ++++++++++++++++++- 1 file changed, 85 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index 418d795634..b2e10593b5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -15,7 +15,7 @@ */ package rx.operators; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; @@ -24,11 +24,25 @@ import rx.schedulers.ImmediateScheduler; import rx.schedulers.TestScheduler; import rx.schedulers.TrampolineScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Action1; /** - * Asynchronously notify Observers on the specified Scheduler. + * Delivers events on the specified Scheduler. *

+ * This provides backpressure by blocking the incoming onNext when there is already one in the queue. + *

+ * This means that at any given time the max number of "onNext" in flight is 3: + * -> 1 being delivered on the Scheduler + * -> 1 in the queue waiting for the Scheduler + * -> 1 blocking on the queue waiting to deliver it + * + * I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler + * can loop and have something to do each time around to optimize for avoiding rescheduling when it + * can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop + * thus if the queue is empty it exits and next time something is added it will reschedule. + * * */ public class OperatorObserveOn implements Operator { @@ -73,7 +87,7 @@ private class ObserveOnSubscriber extends Subscriber { final Subscriber observer; private volatile Scheduler.Inner recursiveScheduler; - private final ArrayBlockingQueue queue = new ArrayBlockingQueue(1); + private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(); final AtomicLong counter = new AtomicLong(0); public ObserveOnSubscriber(Subscriber observer) { @@ -93,7 +107,9 @@ public void onNext(final T t) { } schedule(); } catch (InterruptedException e) { - onError(e); + if (!isUnsubscribed()) { + onError(e); + } } } @@ -125,6 +141,18 @@ public void onError(final Throwable e) { protected void schedule() { if (counter.getAndIncrement() == 0) { if (recursiveScheduler == null) { + // first time through, register a Subscription + // that can interrupt this thread + add(Subscriptions.create(new Action0() { + + @Override + public void call() { + // we have to interrupt the parent thread because + // it can be blocked on queue.put + queue.interrupt(); + } + + })); add(scheduler.schedule(new Action1() { @Override @@ -167,4 +195,57 @@ private void pollQueue() { } + /** + * Same behavior as ArrayBlockingQueue(1) except that we can interrupt/unsubscribe it. + */ + private class InterruptibleBlockingQueue { + + private final Semaphore semaphore = new Semaphore(1); + private volatile Object item; + private volatile boolean interrupted = false; + + public Object poll() { + if (interrupted) { + return null; + } + if (item == null) { + return null; + } + try { + return item; + } finally { + item = null; + semaphore.release(); + } + } + + /** + * Add an Object, blocking if an item is already in the queue. + * + * @param o + * @throws InterruptedException + */ + public void put(Object o) throws InterruptedException { + if (interrupted) { + throw new InterruptedException("Interrupted by Unsubscribe"); + } + semaphore.acquire(); + if (interrupted) { + throw new InterruptedException("Interrupted by Unsubscribe"); + } + if (o == null) { + throw new IllegalArgumentException("Can not put null"); + } + item = o; + } + + /** + * Used to unsubscribe and interrupt the producer if blocked in put() + */ + public void interrupt() { + interrupted = true; + semaphore.release(); + } + } + } \ No newline at end of file From 974b4ad3c9621c94efa6c9702e59613dd1dbb5a7 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 6 Feb 2014 23:11:17 -0800 Subject: [PATCH 07/10] Changed to use SubscribeOn instead of ObserveOn for Async Behavior The ObserveOn operator is for moving where it executes, not making it async. SubscribeOn makes it async. --- .../test/java/rx/subjects/ReplaySubjectTest.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java index 14a6bbb039..3a360300f5 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -311,7 +311,8 @@ public void onNext(String v) { subject.onNext("two"); assertEquals("two", lastValueForObserver1.get()); - Subscription s2 = subject.observeOn(Schedulers.newThread()).subscribe(observer2); + // use subscribeOn to make this async otherwise we deadlock as we are using CountDownLatches + Subscription s2 = subject.subscribeOn(Schedulers.newThread()).subscribe(observer2); System.out.println("before waiting for one"); @@ -321,12 +322,23 @@ public void onNext(String v) { System.out.println("after waiting for one"); subject.onNext("three"); + + System.out.println("sent three"); + // if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet assertEquals("three", lastValueForObserver1.get()); + + System.out.println("about to send onCompleted"); + subject.onCompleted(); + System.out.println("completed subject"); + // release makeSlow.countDown(); + + System.out.println("makeSlow released"); + completed.await(); // all of them should be emitted with the last being "three" assertEquals("three", lastValueForObserver2.get()); From 10e8b78a089c062fc6b67692641b08d7c8d2e8db Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 6 Feb 2014 23:18:42 -0800 Subject: [PATCH 08/10] Test to prove non-blocking despite blocking onNext --- .../rx/operators/OperatorObserveOnTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 21906ccc30..5afa2729d6 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.junit.Test; import org.mockito.InOrder; @@ -310,6 +311,41 @@ public void call(Integer t1) { }); } + @Test + public void testNonBlockingOuterWhileBlockingOnNext() throws InterruptedException { + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicLong completeTime = new AtomicLong(); + // use subscribeOn to make async, observeOn to move + Observable.range(1, 1000).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("onCompleted"); + completeTime.set(System.nanoTime()); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + + } + + }); + + long afterSubscribeTime = System.nanoTime(); + System.out.println("After subscribe: " + latch.getCount()); + assertEquals(1, latch.getCount()); + latch.await(); + assertTrue(completeTime.get() > afterSubscribeTime); + System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime)); + } + @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException { testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread()); From e657d22cff7c2d2c2ae00ed38ee25d5a93b8995f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 8 Feb 2014 14:33:00 -0800 Subject: [PATCH 09/10] Performance Testing --- .../java/rx/ObservableCreatePerformance.java | 4 ++ .../OperatorFromIterablePerformance.java | 4 ++ .../rx/operators/OperatorMapPerformance.java | 4 ++ .../operators/OperatorMergePerformance.java | 4 ++ .../OperatorObserveOnPerformance.java | 46 +++++++++++++++++++ .../rx/operators/OperatorTakePerformance.java | 4 ++ .../rx/operators/OperatorZipPerformance.java | 5 +- .../rx/perf/AbstractPerformanceTester.java | 20 +++++--- .../CompositeSubscriptionAddRemovePerf.java | 5 ++ 9 files changed, 88 insertions(+), 8 deletions(-) create mode 100644 rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java diff --git a/rxjava-core/src/perf/java/rx/ObservableCreatePerformance.java b/rxjava-core/src/perf/java/rx/ObservableCreatePerformance.java index f55dd998bd..6e892a324b 100644 --- a/rxjava-core/src/perf/java/rx/ObservableCreatePerformance.java +++ b/rxjava-core/src/perf/java/rx/ObservableCreatePerformance.java @@ -7,6 +7,10 @@ public class ObservableCreatePerformance extends AbstractPerformanceTester { + ObservableCreatePerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final ObservableCreatePerformance spt = new ObservableCreatePerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorFromIterablePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorFromIterablePerformance.java index acb41eb576..ea3f2c18c3 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorFromIterablePerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorFromIterablePerformance.java @@ -11,6 +11,10 @@ public class OperatorFromIterablePerformance extends AbstractPerformanceTester { + OperatorFromIterablePerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorFromIterablePerformance spt = new OperatorFromIterablePerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorMapPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorMapPerformance.java index 2f73af1d0e..dc85a1d6e6 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorMapPerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorMapPerformance.java @@ -8,6 +8,10 @@ public class OperatorMapPerformance extends AbstractPerformanceTester { + OperatorMapPerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorMapPerformance spt = new OperatorMapPerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorMergePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorMergePerformance.java index de4b204ff1..99ce4deac9 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorMergePerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorMergePerformance.java @@ -9,6 +9,10 @@ public class OperatorMergePerformance extends AbstractPerformanceTester { + OperatorMergePerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorMergePerformance spt = new OperatorMergePerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java new file mode 100644 index 0000000000..c25857ea54 --- /dev/null +++ b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java @@ -0,0 +1,46 @@ +package rx.operators; + +import rx.Observable; +import rx.perf.AbstractPerformanceTester; +import rx.perf.IntegerSumObserver; +import rx.schedulers.Schedulers; +import rx.util.functions.Action0; + +public class OperatorObserveOnPerformance extends AbstractPerformanceTester { + + private static long reps = 1000000; + + OperatorObserveOnPerformance() { + super(reps); + } + + public static void main(String args[]) { + + final OperatorObserveOnPerformance spt = new OperatorObserveOnPerformance(); + try { + spt.runTest(new Action0() { + + @Override + public void call() { + spt.timeObserveOn(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + /** + * Observable.from(1L).observeOn() + * + */ + public long timeObserveOn() { + + Observable s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread()); + IntegerSumObserver o = new IntegerSumObserver(); + s.subscribe(o); + return o.sum; + } + +} \ No newline at end of file diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java index b64b3c9de1..3e8e8be702 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorTakePerformance.java @@ -7,6 +7,10 @@ public class OperatorTakePerformance extends AbstractPerformanceTester { + OperatorTakePerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorTakePerformance spt = new OperatorTakePerformance(); diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java index 2f22b81303..0d929b6a33 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorZipPerformance.java @@ -3,12 +3,15 @@ import rx.Observable; import rx.perf.AbstractPerformanceTester; import rx.perf.IntegerSumObserver; -import rx.perf.LongSumObserver; import rx.util.functions.Action0; import rx.util.functions.Func2; public class OperatorZipPerformance extends AbstractPerformanceTester { + OperatorZipPerformance() { + super(REPETITIONS); + } + public static void main(String args[]) { final OperatorZipPerformance spt = new OperatorZipPerformance(); diff --git a/rxjava-core/src/perf/java/rx/perf/AbstractPerformanceTester.java b/rxjava-core/src/perf/java/rx/perf/AbstractPerformanceTester.java index 56d92278ab..468f900f88 100644 --- a/rxjava-core/src/perf/java/rx/perf/AbstractPerformanceTester.java +++ b/rxjava-core/src/perf/java/rx/perf/AbstractPerformanceTester.java @@ -6,9 +6,15 @@ public abstract class AbstractPerformanceTester { - public static final int REPETITIONS = 5 * 1000 * 1000; + public static final long REPETITIONS = 5 * 1000 * 1000; public static final int NUM_PRODUCERS = 1; + private final long repetitions; + + protected AbstractPerformanceTester(long repetitions) { + this.repetitions = repetitions; + } + public final void runTest(Action0 action) throws InterruptedException { for (int runNum = 0; runNum < 15; runNum++) { System.gc(); @@ -19,7 +25,7 @@ public final void runTest(Action0 action) throws InterruptedException { action.call(); long duration = System.nanoTime() - start; - long opsPerSec = (REPETITIONS * NUM_PRODUCERS * 1000L * 1000L * 1000L) / duration; + long opsPerSec = (repetitions * NUM_PRODUCERS * 1000L * 1000L * 1000L) / duration; System.out.printf("Run: %d - %,d ops/sec \n", Integer.valueOf(runNum), Long.valueOf(opsPerSec)); @@ -39,14 +45,14 @@ public final void runTest(Action0 action) throws InterruptedException { */ public long baseline() { LongSumObserver o = new LongSumObserver(); - for (long l = 0; l < REPETITIONS; l++) { + for (long l = 0; l < repetitions; l++) { o.onNext(l); } o.onCompleted(); return o.sum; } - - public static Iterable ITERABLE_OF_REPETITIONS = new Iterable() { + + public Iterable ITERABLE_OF_REPETITIONS = new Iterable() { @Override public Iterator iterator() { @@ -55,7 +61,7 @@ public Iterator iterator() { @Override public boolean hasNext() { - return count <= REPETITIONS; + return count <= repetitions; } @Override @@ -71,5 +77,5 @@ public void remove() { }; }; }; - + } diff --git a/rxjava-core/src/perf/java/rx/subscriptions/CompositeSubscriptionAddRemovePerf.java b/rxjava-core/src/perf/java/rx/subscriptions/CompositeSubscriptionAddRemovePerf.java index b582e9edd0..487f34b558 100644 --- a/rxjava-core/src/perf/java/rx/subscriptions/CompositeSubscriptionAddRemovePerf.java +++ b/rxjava-core/src/perf/java/rx/subscriptions/CompositeSubscriptionAddRemovePerf.java @@ -4,6 +4,11 @@ import rx.util.functions.Action0; public class CompositeSubscriptionAddRemovePerf extends AbstractPerformanceTester { + + CompositeSubscriptionAddRemovePerf() { + super(REPETITIONS); + } + public static void main(String[] args) { final CompositeSubscriptionAddRemovePerf spt = new CompositeSubscriptionAddRemovePerf(); try { From d5e5df402bf4d18075dbf11f311793399a682e2e Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 8 Feb 2014 15:08:26 -0800 Subject: [PATCH 10/10] ObserveOn with Buffer Size --- rxjava-core/src/main/java/rx/Observable.java | 20 ++- .../java/rx/operators/OperatorObserveOn.java | 132 +++++++++++++----- .../OperatorObserveOnPerformance.java | 37 ++++- .../rx/operators/OperatorObserveOnTest.java | 36 +++-- 4 files changed, 172 insertions(+), 53 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e6a3dd4921..7c3ead37a2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -5139,8 +5139,7 @@ public final ConnectableObservable multicast(Subject * * @@ -5154,6 +5153,23 @@ public final Observable observeOn(Scheduler scheduler) { return lift(new OperatorObserveOn(scheduler)); } + /** + * Move notifications to the specified {@link Scheduler} asynchronously with a buffer of the given size. + *

+ * + * + * @param scheduler + * the {@link Scheduler} to notify {@link Observer}s on + * @param bufferSize + * that will be rounded up to the next power of 2 + * @return the source Observable modified so that its {@link Observer}s are notified on the + * specified {@link Scheduler} + * @see RxJava Wiki: observeOn() + */ + public final Observable observeOn(Scheduler scheduler, int bufferSize) { + return lift(new OperatorObserveOn(scheduler, bufferSize)); + } + /** * Filters the items emitted by an Observable, only emitting those of the specified type. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index b2e10593b5..1b818013e9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -48,9 +48,35 @@ public class OperatorObserveOn implements Operator { private final Scheduler scheduler; + private final int bufferSize; - public OperatorObserveOn(Scheduler scheduler) { + /** + * + * @param scheduler + * @param bufferSize + * that will be rounded up to the next power of 2 + */ + public OperatorObserveOn(Scheduler scheduler, int bufferSize) { this.scheduler = scheduler; + this.bufferSize = roundToNextPowerOfTwoIfNecessary(bufferSize); + } + + public OperatorObserveOn(Scheduler scheduler) { + this(scheduler, 1); + } + + private static int roundToNextPowerOfTwoIfNecessary(int num) { + if ((num & -num) == num) { + return num; + } else { + int result = 1; + while (num != 0) + { + num >>= 1; + result <<= 1; + } + return result; + } } @Override @@ -87,7 +113,7 @@ private class ObserveOnSubscriber extends Subscriber { final Subscriber observer; private volatile Scheduler.Inner recursiveScheduler; - private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(); + private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize); final AtomicLong counter = new AtomicLong(0); public ObserveOnSubscriber(Subscriber observer) { @@ -101,9 +127,9 @@ public void onNext(final T t) { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed if (t == null) { - queue.put(NULL_SENTINEL); + queue.addBlocking(NULL_SENTINEL); } else { - queue.put(t); + queue.addBlocking(t); } schedule(); } catch (InterruptedException e) { @@ -118,7 +144,7 @@ public void onCompleted() { try { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed - queue.put(COMPLETE_SENTINEL); + queue.addBlocking(COMPLETE_SENTINEL); schedule(); } catch (InterruptedException e) { onError(e); @@ -130,7 +156,7 @@ public void onError(final Throwable e) { try { // we want to block for natural back-pressure // so that the producer waits for each value to be consumed - queue.put(new ErrorSentinel(e)); + queue.addBlocking(new ErrorSentinel(e)); schedule(); } catch (InterruptedException e2) { // call directly if we can't schedule @@ -195,37 +221,34 @@ private void pollQueue() { } - /** - * Same behavior as ArrayBlockingQueue(1) except that we can interrupt/unsubscribe it. - */ private class InterruptibleBlockingQueue { - private final Semaphore semaphore = new Semaphore(1); - private volatile Object item; + private final Semaphore semaphore; private volatile boolean interrupted = false; - public Object poll() { - if (interrupted) { - return null; - } - if (item == null) { - return null; - } - try { - return item; - } finally { - item = null; - semaphore.release(); - } + private final Object[] buffer; + + private AtomicLong tail = new AtomicLong(); + private AtomicLong head = new AtomicLong(); + private final int capacity; + private final int mask; + + public InterruptibleBlockingQueue(final int size) { + this.semaphore = new Semaphore(size); + this.capacity = size; + this.mask = size - 1; + buffer = new Object[size]; } /** - * Add an Object, blocking if an item is already in the queue. - * - * @param o - * @throws InterruptedException + * Used to unsubscribe and interrupt the producer if blocked in put() */ - public void put(Object o) throws InterruptedException { + public void interrupt() { + interrupted = true; + semaphore.release(); + } + + public void addBlocking(final Object e) throws InterruptedException { if (interrupted) { throw new InterruptedException("Interrupted by Unsubscribe"); } @@ -233,19 +256,54 @@ public void put(Object o) throws InterruptedException { if (interrupted) { throw new InterruptedException("Interrupted by Unsubscribe"); } - if (o == null) { + if (e == null) { throw new IllegalArgumentException("Can not put null"); } - item = o; + + if (offer(e)) { + return; + } else { + throw new IllegalStateException("Queue is full"); + } } - /** - * Used to unsubscribe and interrupt the producer if blocked in put() - */ - public void interrupt() { - interrupted = true; - semaphore.release(); + private boolean offer(final Object e) { + final long _t = tail.get(); + if (_t - head.get() == capacity) { + // queue is full + return false; + } + int index = (int) (_t & mask); + buffer[index] = e; + // move the tail forward + tail.lazySet(_t + 1); + + return true; } + + public Object poll() { + if (interrupted) { + return null; + } + final long _h = head.get(); + if (tail.get() == _h) { + // nothing available + return null; + } + int index = (int) (_h & mask); + + // fetch the item + Object v = buffer[index]; + // allow GC to happen + buffer[index] = null; + // increment and signal we're done + head.lazySet(_h + 1); + if (v != null) { + semaphore.release(); + } + return v; + } + } } \ No newline at end of file diff --git a/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java index c25857ea54..f14b2a25b3 100644 --- a/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java +++ b/rxjava-core/src/perf/java/rx/operators/OperatorObserveOnPerformance.java @@ -8,7 +8,7 @@ public class OperatorObserveOnPerformance extends AbstractPerformanceTester { - private static long reps = 1000000; + private static long reps = 10000; OperatorObserveOnPerformance() { super(reps); @@ -34,10 +34,43 @@ public void call() { /** * Observable.from(1L).observeOn() * + * --- version 0.17.1 => with queue size == 1 + * + * Run: 10 - 115,033 ops/sec + * Run: 11 - 118,155 ops/sec + * Run: 12 - 120,526 ops/sec + * Run: 13 - 115,035 ops/sec + * Run: 14 - 116,102 ops/sec + * + * --- version 0.17.1 => with queue size == 16 + * + * Run: 10 - 850,412 ops/sec + * Run: 11 - 711,642 ops/sec + * Run: 12 - 788,332 ops/sec + * Run: 13 - 1,064,056 ops/sec + * Run: 14 - 656,857 ops/sec + * + * --- version 0.17.1 => with queue size == 1000000 + * + * Run: 10 - 5,162,622 ops/sec + * Run: 11 - 5,271,481 ops/sec + * Run: 12 - 4,442,470 ops/sec + * Run: 13 - 5,149,330 ops/sec + * Run: 14 - 5,146,680 ops/sec + * + * --- version 0.16.1 + * + * Run: 10 - 27,098,802 ops/sec + * Run: 11 - 24,204,284 ops/sec + * Run: 12 - 27,208,663 ops/sec + * Run: 13 - 26,879,552 ops/sec + * Run: 14 - 26,658,846 ops/sec + * + * */ public long timeObserveOn() { - Observable s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread()); + Observable s = Observable.range(1, (int) reps).observeOn(Schedulers.newThread(), 16); IntegerSumObserver o = new IntegerSumObserver(); s.subscribe(o); return o.sum; diff --git a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 5afa2729d6..7e522eccef 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -348,30 +348,35 @@ public void onNext(Integer t) { @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 1); + } + + @Test + public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThreadAndBuffer8() throws InterruptedException { + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread(), 8); } @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeIO() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io(), 1); } @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTrampoline() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline(), 1); } @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTestScheduler() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test(), 1); } @Test public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeComputation() throws InterruptedException { - testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation()); + testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation(), 1); } - private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler) throws InterruptedException { + private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler, int bufferSize) throws InterruptedException { final AtomicInteger countEmitted = new AtomicInteger(); final AtomicInteger countTaken = new AtomicInteger(); int value = Observable.create(new OnSubscribeFunc() { @@ -385,7 +390,7 @@ public Subscription onSubscribe(final Observer o) { public void run() { int i = 1; while (!s.isUnsubscribed() && i <= 100) { - System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i); + // System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i); o.onNext(i++); } o.onCompleted(); @@ -405,13 +410,13 @@ public void call(Integer i) { @Override public void call() { - System.out.println("-------- Done Emitting from Source ---------"); + // System.out.println("-------- Done Emitting from Source ---------"); } - }).observeOn(scheduler).doOnNext(new Action1() { + }).observeOn(scheduler, bufferSize).doOnNext(new Action1() { @Override public void call(Integer i) { - System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i); + // System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i); //force it to be slower than the producer try { Thread.sleep(10); @@ -420,7 +425,14 @@ public void call(Integer i) { } countTaken.incrementAndGet(); } - }).take(10).toBlockingObservable().last(); + }).take(10).doOnNext(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("*********** value: " + t); + } + + }).toBlockingObservable().last(); if (scheduler instanceof TrampolineScheduler || scheduler instanceof ImmediateScheduler || scheduler instanceof TestScheduler) { // since there is no concurrency it will block and only emit as many as it can process @@ -428,7 +440,7 @@ public void call(Integer i) { } else { // the others with concurrency should not emit all 100 ... but 10 + 2 in the pipeline // NOTE: The +2 could change if the implementation of the queue logic changes. See Javadoc at top of class. - assertEquals(12, countEmitted.get()); + assertEquals(11, countEmitted.get(), bufferSize); // can be up to 11 + bufferSize } // number received after take (but take will filter any extra) assertEquals(10, value);