From 1ea15baf5b76d407c1aa97331c3857c95d229bd6 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 17 Sep 2013 22:39:51 -0700 Subject: [PATCH] Fix ObserveOn, NewThreadScheduler and ScheduledObserver bugs @headinthebox and I were working on some code and found differences in behavior between Rx.Net and RxJava with observeOn. This commit should fix that. --- .../rx/concurrency/NewThreadScheduler.java | 78 +++++++++--- .../java/rx/operators/OperationObserveOn.java | 7 +- .../java/rx/operators/OperationRetry.java | 25 ++-- .../java/rx/operators/ScheduledObserver.java | 112 ++++++++++++----- .../subscriptions/CompositeSubscription.java | 27 ++-- .../MultipleAssignmentSubscription.java | 45 +++++++ .../java/rx/subscriptions/Subscriptions.java | 52 ++++++-- .../src/test/java/rx/ObserveOnTests.java | 118 ++++++++++++++++++ 8 files changed, 380 insertions(+), 84 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java create mode 100644 rxjava-core/src/test/java/rx/ObserveOnTests.java diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index d8e178bc0d..c33918353b 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -15,12 +15,15 @@ */ package rx.concurrency; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; import rx.Subscription; -import rx.operators.SafeObservableSubscription; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -29,27 +32,74 @@ * Schedules work on a new thread. */ public class NewThreadScheduler extends Scheduler { - private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); + + private final static NewThreadScheduler INSTANCE = new NewThreadScheduler(); + private final static AtomicLong count = new AtomicLong(); public static NewThreadScheduler getInstance() { return INSTANCE; } - @Override - public Subscription schedule(final T state, final Func2 action) { - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - final Scheduler _scheduler = this; + private NewThreadScheduler() { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - subscription.wrap(action.call(_scheduler, state)); - } - }, "RxNewThreadScheduler"); + } - t.start(); + private static class EventLoopScheduler extends Scheduler { + private final ExecutorService executor; - return subscription; + private EventLoopScheduler() { + executor = Executors.newFixedThreadPool(1, new ThreadFactory() { + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); + } + }); + } + + @Override + public Subscription schedule(final T state, final Func2 action) { + final Scheduler _scheduler = this; + return Subscriptions.from(executor.submit(new Runnable() { + + @Override + public void run() { + action.call(_scheduler, state); + } + })); + } + + @Override + public Subscription schedule(final T state, final Func2 action, final long delayTime, final TimeUnit unit) { + // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep + // we will instead schedule the event then launch the thread after the delay has passed + final Scheduler _scheduler = this; + final CompositeSubscription subscription = new CompositeSubscription(); + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { + + @Override + public void run() { + if (!subscription.isUnsubscribed()) { + // when the delay has passed we now do the work on the actual scheduler + Subscription s = _scheduler.schedule(state, action); + // add the subscription to the CompositeSubscription so it is unsubscribed + subscription.add(s); + } + } + }, delayTime, unit); + + // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens + subscription.add(Subscriptions.create(f)); + + return subscription; + } + + } + + @Override + public Subscription schedule(final T state, final Func2 action) { + EventLoopScheduler s = new EventLoopScheduler(); + return s.schedule(state, action); } @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index f947333456..aef1cb9548 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; @@ -33,6 +34,8 @@ import rx.Subscription; import rx.concurrency.ImmediateScheduler; import rx.concurrency.Schedulers; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Func2; /** * Asynchronously notify Observers on the specified Scheduler. @@ -60,7 +63,9 @@ public Subscription onSubscribe(final Observer observer) { // do nothing if we request ImmediateScheduler so we don't invoke overhead return source.subscribe(observer); } else { - return source.subscribe(new ScheduledObserver(observer, scheduler)); + CompositeSubscription s = new CompositeSubscription(); + s.add(source.subscribe(new ScheduledObserver(s, observer, scheduler))); + return s; } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationRetry.java b/rxjava-core/src/main/java/rx/operators/OperationRetry.java index 977373cb01..0f664ee3ec 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationRetry.java +++ b/rxjava-core/src/main/java/rx/operators/OperationRetry.java @@ -26,11 +26,13 @@ import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; +import rx.util.functions.Func2; public class OperationRetry { @@ -58,17 +60,19 @@ public Retry(Observable source, int retryCount) { @Override public Subscription onSubscribe(Observer observer) { - subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer))); + MultipleAssignmentSubscription rescursiveSubscription = new MultipleAssignmentSubscription(); + subscription.add(Schedulers.currentThread().schedule(rescursiveSubscription, attemptSubscription(observer))); + subscription.add(rescursiveSubscription); return subscription; } - private Action0 attemptSubscription(final Observer observer) { - return new Action0() { + private Func2 attemptSubscription(final Observer observer) { + return new Func2() { @Override - public void call() { + public Subscription call(final Scheduler scheduler, final MultipleAssignmentSubscription rescursiveSubscription) { attempts.incrementAndGet(); - source.subscribe(new Observer() { + return source.subscribe(new Observer() { @Override public void onCompleted() { @@ -79,10 +83,8 @@ public void onCompleted() { public void onError(Throwable e) { if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !subscription.isUnsubscribed()) { // retry again - // remove the last subscription since we have completed (so as we retry we don't build up a huge list) - subscription.removeLast(); - // add the new subscription and schedule a retry - subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer))); + // add the new subscription and schedule a retry recursively + rescursiveSubscription.setSubscription(scheduler.schedule(rescursiveSubscription, attemptSubscription(observer))); } else { // give up and pass the failure observer.onError(e); @@ -96,6 +98,7 @@ public void onNext(T v) { }); } + }; } @@ -157,7 +160,7 @@ public void testRetrySuccess() { inOrder.verify(observer, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } - + @Test public void testInfiniteRetry() { int NUM_FAILURES = 20; diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java index a5eff1b852..ed1e83e299 100644 --- a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -16,21 +16,28 @@ package rx.operators; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import rx.Notification; import rx.Observer; import rx.Scheduler; -import rx.util.functions.Action0; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; +import rx.util.functions.Func2; /* package */class ScheduledObserver implements Observer { private final Observer underlying; private final Scheduler scheduler; + private final CompositeSubscription parentSubscription; + private final EventLoop eventLoop = new EventLoop(); private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - private final AtomicInteger counter = new AtomicInteger(0); + private final AtomicBoolean started = new AtomicBoolean(); - public ScheduledObserver(Observer underlying, Scheduler scheduler) { + public ScheduledObserver(CompositeSubscription s, Observer underlying, Scheduler scheduler) { + this.parentSubscription = s; this.underlying = underlying; this.scheduler = scheduler; } @@ -50,46 +57,83 @@ public void onNext(final T args) { enqueue(new Notification(args)); } + final AtomicInteger counter = new AtomicInteger(); + private void enqueue(Notification notification) { - // this must happen before 'counter' is used to provide synchronization between threads + // this must happen before synchronization between threads queue.offer(notification); - // we now use counter to atomically determine if we need to start processing or not - // it will be 0 if it's the first notification or the scheduler has finished processing work - // and we need to start doing it again - if (counter.getAndIncrement() == 0) { - processQueue(); + /** + * If the counter is currently at 0 (before incrementing with this addition) + * we will schedule the work. + */ + if (counter.getAndIncrement() <= 0) { + if (!started.get() && started.compareAndSet(false, true)) { + // first time we use the parent scheduler to start the event loop + MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription(); + parentSubscription.add(scheduler.schedule(recursiveSubscription, eventLoop)); + parentSubscription.add(recursiveSubscription); + } else { + // subsequent times we reschedule existing one + eventLoop.reschedule(); + } } } - private void processQueue() { - scheduler.schedule(new Action0() { - @Override - public void call() { - Notification not = queue.poll(); - - switch (not.getKind()) { - case OnNext: - underlying.onNext(not.getValue()); - break; - case OnError: - underlying.onError(not.getThrowable()); - break; - case OnCompleted: - underlying.onCompleted(); - break; - default: - throw new IllegalStateException("Unknown kind of notification " + not); + private class EventLoop implements Func2 { - } + volatile Scheduler _recursiveScheduler; + volatile MultipleAssignmentSubscription _recursiveSubscription; - // decrement count and if we still have work to do - // recursively schedule ourselves to process again - if (counter.decrementAndGet() > 0) { - scheduler.schedule(this); - } + public void reschedule() { + _recursiveSubscription.setSubscription(_recursiveScheduler.schedule(_recursiveSubscription, this)); + } + @Override + public Subscription call(Scheduler s, MultipleAssignmentSubscription recursiveSubscription) { + /* + * -------------------------------------------------------------------------------------- + * Set these the first time through so we can externally trigger recursive execution again + */ + if (_recursiveScheduler == null) { + _recursiveScheduler = s; + } + if (_recursiveSubscription == null) { + _recursiveSubscription = recursiveSubscription; } - }); + /* + * Back to regular flow + * -------------------------------------------------------------------------------------- + */ + + do { + Notification notification = queue.poll(); + // if we got a notification, send it + if (notification != null) { + + // if unsubscribed stop working + if (parentSubscription.isUnsubscribed()) { + return parentSubscription; + } + // process notification + + switch (notification.getKind()) { + case OnNext: + underlying.onNext(notification.getValue()); + break; + case OnError: + underlying.onError(notification.getThrowable()); + break; + case OnCompleted: + underlying.onCompleted(); + break; + default: + throw new IllegalStateException("Unknown kind of notification " + notification); + } + } + } while (counter.decrementAndGet() > 0); + + return parentSubscription; + } } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 330657cd5d..6ca5c8a699 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -42,18 +42,24 @@ public class CompositeSubscription implements Subscription { * TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach */ private AtomicBoolean unsubscribed = new AtomicBoolean(false); - private final LinkedBlockingDeque subscriptions = new LinkedBlockingDeque(); + private final ConcurrentHashMap subscriptions = new ConcurrentHashMap(); public CompositeSubscription(List subscriptions) { - this.subscriptions.addAll(subscriptions); + for (Subscription s : subscriptions) { + this.subscriptions.put(s, Boolean.TRUE); + } } public CompositeSubscription(Subscription... subscriptions) { for (Subscription s : subscriptions) { - this.subscriptions.add(s); + this.subscriptions.put(s, Boolean.TRUE); } } + public void remove(Subscription s) { + this.subscriptions.remove(s); + } + public boolean isUnsubscribed() { return unsubscribed.get(); } @@ -62,24 +68,15 @@ public synchronized void add(Subscription s) { if (unsubscribed.get()) { s.unsubscribe(); } else { - subscriptions.add(s); + subscriptions.put(s, Boolean.TRUE); } } - /** - * Remove the last Subscription that was added. - * - * @return Subscription or null if none exists - */ - public synchronized Subscription removeLast() { - return subscriptions.pollLast(); - } - @Override public synchronized void unsubscribe() { if (unsubscribed.compareAndSet(false, true)) { Collection es = null; - for (Subscription s : subscriptions) { + for (Subscription s : subscriptions.keySet()) { try { s.unsubscribe(); } catch (Throwable e) { diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java new file mode 100644 index 0000000000..2af6501425 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -0,0 +1,45 @@ +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observable; +import rx.Subscription; + +/** + * Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed. + * + * @see Rx.Net equivalent MultipleAssignmentDisposable + */ +public class MultipleAssignmentSubscription implements Subscription { + + private final AtomicBoolean unsubscribed = new AtomicBoolean(false); + private AtomicReference subscription = new AtomicReference(); + + public boolean isUnsubscribed() { + return unsubscribed.get(); + } + + @Override + public synchronized void unsubscribe() { + unsubscribed.set(true); + Subscription s = getSubscription(); + if (s != null) { + s.unsubscribe(); + } + + } + + public synchronized void setSubscription(Subscription s) { + if (unsubscribed.get()) { + s.unsubscribe(); + } else { + subscription.set(s); + } + } + + public Subscription getSubscription() { + return subscription.get(); + } + +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java index 5d52a69cb2..032a0eaece 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,7 +26,7 @@ public class Subscriptions { /** * A {@link Subscription} that does nothing. - * + * * @return {@link Subscription} */ public static Subscription empty() { @@ -35,8 +35,9 @@ public static Subscription empty() { /** * A {@link Subscription} which invokes the given {@link Action0} when unsubscribed. - * - * @param unsubscribe Action to invoke on unsubscribe. + * + * @param unsubscribe + * Action to invoke on unsubscribe. * @return {@link Subscription} */ public static Subscription create(final Action0 unsubscribe) { @@ -52,12 +53,32 @@ public void unsubscribe() { /** * A {@link Subscription} that wraps a {@link Future} and cancels it when unsubscribed. - * - * + * + * * @param f * {@link Future} * @return {@link Subscription} */ + public static Subscription from(final Future f) { + return new Subscription() { + + @Override + public void unsubscribe() { + f.cancel(true); + } + + }; + } + + /** + * A {@link Subscription} that wraps a {@link Future} and cancels it when unsubscribed. + * + * + * @param f + * {@link Future} + * @return {@link Subscription} + * @deprecated Use {@link #from(Future)} instead + */ public static Subscription create(final Future f) { return new Subscription() { @@ -71,10 +92,23 @@ public void unsubscribe() { /** * A {@link Subscription} that groups multiple Subscriptions together and unsubscribes from all of them together. - * + * + * @param subscriptions + * Subscriptions to group together + * @return {@link Subscription} + */ + + public static CompositeSubscription from(Subscription... subscriptions) { + return new CompositeSubscription(subscriptions); + } + + /** + * A {@link Subscription} that groups multiple Subscriptions together and unsubscribes from all of them together. + * * @param subscriptions * Subscriptions to group together * @return {@link Subscription} + * @deprecated Use {@link #from(Subscription...)} instead */ public static CompositeSubscription create(Subscription... subscriptions) { diff --git a/rxjava-core/src/test/java/rx/ObserveOnTests.java b/rxjava-core/src/test/java/rx/ObserveOnTests.java new file mode 100644 index 0000000000..e6e4e46b2d --- /dev/null +++ b/rxjava-core/src/test/java/rx/ObserveOnTests.java @@ -0,0 +1,118 @@ +package rx; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.concurrency.Schedulers; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class ObserveOnTests { + + /** + * Confirm that running on a NewThreadScheduler uses the same thread for the entire stream + */ + @Test + public void testObserveOnWithNewThreadScheduler() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 100000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * _multiple; + } + + }).observeOn(Schedulers.newThread()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler")); + } + + }); + } + + /** + * Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered. + */ + @Test + public void testObserveOnWithThreadPoolScheduler() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 100000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * _multiple; + } + + }).observeOn(Schedulers.threadPoolForComputation()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + } + + }); + } + + /** + * Attempts to confirm that when pauses exist between events, the ScheduledObserver + * does not lose or reorder any events since the scheduler will not block, but will + * be re-scheduled when it receives new events after each pause. + * + * + * This is non-deterministic in proving success, but if it ever fails (non-deterministically) + * it is a sign of potential issues as thread-races and scheduling should not affect output. + */ + @Test + public void testObserveOnOrderingConcurrency() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 10000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + if (randomIntFrom0to100() > 98) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return t1 * _multiple; + } + + }).observeOn(Schedulers.threadPoolForComputation()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + } + + }); + } + + private static int randomIntFrom0to100() { + // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml + long x = System.nanoTime(); + x ^= (x << 21); + x ^= (x >>> 35); + x ^= (x << 4); + return Math.abs((int) x % 100); + } + +}