From dc7a3f8f575edc28d86daa2039715866059a574d Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 15 Oct 2013 10:46:13 -0700 Subject: [PATCH 1/2] Scheduler overload with recursive support --- rxjava-core/src/main/java/rx/Scheduler.java | 82 ++++++++++++++----- .../java/rx/concurrency/TestSchedulers.java | 22 +++++ 2 files changed, 82 insertions(+), 22 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 6fdfc286c0..cb3430c1f3 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -27,8 +27,11 @@ import org.mockito.Mockito; import rx.concurrency.TestScheduler; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; +import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -83,23 +86,23 @@ public abstract class Scheduler { * Schedules a cancelable action to be executed periodically. * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. - * - * @param state + * + * @param state * State to pass into the action. - * @param action + * @param action * The action to execute periodically. - * @param initialDelay + * @param initialDelay * Time to wait before executing the action for the first time. - * @param period + * @param period * The time interval to wait each time in between executing the action. - * @param unit + * @param unit * The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ public Subscription schedulePeriodically(T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { final long periodInNanos = unit.toNanos(period); final AtomicBoolean complete = new AtomicBoolean(); - + final Func2 recursiveAction = new Func2() { @Override public Subscription call(Scheduler scheduler, T state0) { @@ -128,7 +131,7 @@ public void call() { } }); } - + /** * Schedules a cancelable action to be executed at dueTime. * @@ -150,6 +153,40 @@ public Subscription schedule(T state, Func2 action) { + final CompositeSubscription parentSubscription = new CompositeSubscription(); + final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); + parentSubscription.add(childSubscription); + + final Func2 parentAction = new Func2() { + + @Override + public Subscription call(final Scheduler scheduler, final Func2 parentAction) { + action.call(new Action0() { + + @Override + public void call() { + if (!parentSubscription.isUnsubscribed()) { + childSubscription.setSubscription(scheduler.schedule(parentAction, parentAction)); + } + } + + }); + return childSubscription; + } + }; + + parentSubscription.add(schedule(parentAction, parentAction)); + + return parentSubscription; + } /** * Schedules an action to be executed. @@ -187,17 +224,16 @@ public Subscription call(Scheduler scheduler, Void state) { }, delayTime, unit); } - /** * Schedules an action to be executed periodically. * - * @param action + * @param action * The action to execute periodically. - * @param initialDelay + * @param initialDelay * Time to wait before executing the action for the first time. - * @param period + * @param period * The time interval to wait each time in between executing the action. - * @param unit + * @param unit * The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ @@ -230,39 +266,41 @@ public int degreeOfParallelism() { } public static class UnitTest { - @SuppressWarnings("unchecked") // mocking is unchecked, unfortunately + @SuppressWarnings("unchecked") + // mocking is unchecked, unfortunately @Test public void testPeriodicScheduling() { final Func1 calledOp = mock(Func1.class); - + final TestScheduler scheduler = new TestScheduler(); Subscription subscription = scheduler.schedulePeriodically(new Action0() { - @Override public void call() { + @Override + public void call() { System.out.println(scheduler.now()); calledOp.call(scheduler.now()); } }, 1, 2, TimeUnit.SECONDS); - + verify(calledOp, never()).call(anyLong()); InOrder inOrder = Mockito.inOrder(calledOp); - + scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); inOrder.verify(calledOp, never()).call(anyLong()); scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); inOrder.verify(calledOp, times(1)).call(1000L); - + scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); inOrder.verify(calledOp, never()).call(3000L); - + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); inOrder.verify(calledOp, times(1)).call(3000L); - + scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); inOrder.verify(calledOp, times(1)).call(5000L); inOrder.verify(calledOp, times(1)).call(7000L); - + subscription.unsubscribe(); scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); inOrder.verify(calledOp, never()).call(anyLong()); diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index 7d2851abf9..e1df828237 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -33,6 +33,7 @@ import rx.Subscription; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -473,6 +474,27 @@ public Subscription onSubscribe(final Observer observer) { fail("Error: " + observer.error.get().getMessage()); } } + + @Test + public void testRecursion() { + TestScheduler s = new TestScheduler(); + + final AtomicInteger counter = new AtomicInteger(0); + + Subscription subscription = s.schedule(new Action1() { + + @Override + public void call(Action0 self) { + counter.incrementAndGet(); + System.out.println("counter: " + counter.get()); + self.call(); + } + + }); + subscription.unsubscribe(); + assertEquals(0, counter.get()); + } + /** * Used to determine if onNext is being invoked concurrently. From 6ef2530749fe3ee3dac36ffca218e884b988721f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 15 Oct 2013 14:39:10 -0700 Subject: [PATCH 2/2] BugFix: unsubscribe was not propagating to parent Observable on merge(Observable>) --- .../java/rx/operators/OperationMerge.java | 80 ++++++++++++++++++- 1 file changed, 76 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 1152ee5637..2e5655b38f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +37,10 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; /** * Flattens a list of Observables into one Observable sequence, without any transformation. @@ -93,6 +98,7 @@ public Subscription onSubscribe(Observer> observ @Override public void unsubscribe() { + System.out.println("unsubscribe from merge"); unsubscribed = true; } @@ -125,6 +131,7 @@ private MergeObservable(Observable> sequences) } public Subscription onSubscribe(Observer actualObserver) { + CompositeSubscription completeSubscription = new CompositeSubscription(); /** * We must synchronize a merge because we subscribe to multiple sequences in parallel that will each be emitting. @@ -134,15 +141,16 @@ public Subscription onSubscribe(Observer actualObserver) { * Bug report: https://github.com/Netflix/RxJava/issues/200 */ SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription); + completeSubscription.add(subscription); SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, subscription); /** * Subscribe to the parent Observable to get to the children Observables */ - sequences.subscribe(new ParentObserver(synchronizedObserver)); + completeSubscription.add(sequences.subscribe(new ParentObserver(synchronizedObserver))); /* return our subscription to allow unsubscribing */ - return subscription; + return completeSubscription; } /** @@ -380,6 +388,70 @@ public void testUnSubscribe() { verify(stringObserver, never()).onCompleted(); } + @Test + public void testUnSubscribeObservableOfObservables() throws InterruptedException { + + final AtomicBoolean unsubscribed = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + + Observable> source = Observable.create(new OnSubscribeFunc>() { + + @Override + public Subscription onSubscribe(final Observer> observer) { + // verbose on purpose so I can track the inside of it + final Subscription s = Subscriptions.create(new Action0() { + + @Override + public void call() { + System.out.println("*** unsubscribed"); + unsubscribed.set(true); + } + + }); + + new Thread(new Runnable() { + + @Override + public void run() { + + while (!unsubscribed.get()) { + observer.onNext(Observable.from(1L, 2L)); + } + System.out.println("Done looping after unsubscribe: " + unsubscribed.get()); + observer.onCompleted(); + + // mark that the thread is finished + latch.countDown(); + } + }).start(); + + return s; + }; + + }); + + final AtomicInteger count = new AtomicInteger(); + Observable.create(merge(source)).take(6).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Long v) { + System.out.println("Value: " + v); + int c = count.incrementAndGet(); + if (c > 6) { + fail("Should be only 6"); + } + + } + }); + + latch.await(1000, TimeUnit.MILLISECONDS); + + System.out.println("unsubscribed: " + unsubscribed.get()); + + assertTrue(unsubscribed.get()); + + } + @Test public void testMergeArrayWithThreading() { final TestASynchronousObservable o1 = new TestASynchronousObservable(); @@ -453,9 +525,9 @@ public void onNext(String v) { // so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time // to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following // onNext is invoked. - + Thread.sleep(300); - + try { // in try/finally so threads are released via latch countDown even if assertion fails assertEquals(1, concurrentCounter.get()); } finally {