From 93d4e693194973c93e2f7e6fc0f4ca72208f8c24 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 30 Dec 2013 20:30:30 -0800 Subject: [PATCH 01/15] Update Scheduler Tests for Recursion and Common Testing --- .../rx/schedulers/AbstractSchedulerTests.java | 273 ++++++++++++++++++ .../CurrentThreadSchedulerTest.java | 16 +- .../rx/schedulers/ExecutorSchedulerTests.java | 8 +- .../rx/schedulers/ImmediateSchedulerTest.java | 17 +- .../rx/schedulers/NewThreadSchedulerTest.java | 28 ++ .../schedulers/SchedulerUnsubscribeTest.java | 109 ------- 6 files changed, 331 insertions(+), 120 deletions(-) create mode 100644 rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java create mode 100644 rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java delete mode 100644 rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java new file mode 100644 index 0000000000..bbd14b9812 --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -0,0 +1,273 @@ +package rx.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.operators.SafeObservableSubscription; +import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +public abstract class AbstractSchedulerTests { + + /** + * The scheduler to test + * + * @return + */ + protected abstract Scheduler getScheduler(); + + @Test + public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch unsubscribeLatch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(); + Subscription s = getScheduler().schedule(0L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + i++; + // System.out.println("i: " + i); + if (i == 10) { + latch.countDown(); + try { + // wait for unsubscribe to finish so we are not racing it + unsubscribeLatch.await(); + } catch (InterruptedException e) { + // we expect the countDown if unsubscribe is not working + // or to be interrupted if unsubscribe is successful since + // the unsubscribe will interrupt it as it is calling Future.cancel(true) + // so we will ignore the stacktrace + } + } + + counter.incrementAndGet(); + return innerScheduler.schedule(i, this); + } + }); + + latch.await(); + s.unsubscribe(); + unsubscribeLatch.countDown(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + assertEquals(10, counter.get()); + } + + /** + * Bug report: https://github.com/Netflix/RxJava/issues/431 + */ + @Test + public void testUnSubscribeForScheduler() throws InterruptedException { + + final AtomicInteger countReceived = new AtomicInteger(); + final AtomicInteger countGenerated = new AtomicInteger(); + final SafeObservableSubscription s = new SafeObservableSubscription(); + final CountDownLatch latch = new CountDownLatch(1); + + s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS) + .map(new Func1() { + @Override + public Long call(Long aLong) { + countGenerated.incrementAndGet(); + return aLong; + } + }) + .subscribeOn(getScheduler()) + .observeOn(getScheduler()) + .subscribe(new Observer() { + @Override + public void onCompleted() { + System.out.println("--- completed"); + } + + @Override + public void onError(Throwable e) { + System.out.println("--- onError"); + } + + @Override + public void onNext(Long args) { + if (countReceived.incrementAndGet() == 2) { + s.unsubscribe(); + latch.countDown(); + } + System.out.println("==> Received " + args); + } + })); + + latch.await(1000, TimeUnit.MILLISECONDS); + + System.out.println("----------- it thinks it is finished ------------------ "); + Thread.sleep(100); + + assertEquals(2, countGenerated.get()); + } + + @Test + public void scheduleMultipleTasksOnOuterForSequentialExecution() throws InterruptedException { + final AtomicInteger countExecutions = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + final SafeObservableSubscription outerSubscription = new SafeObservableSubscription(); + final Func2 fInner = new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + countExecutions.incrementAndGet(); + i++; + System.out.println("i: " + i); + if (i == 1000) { + outerSubscription.unsubscribe(); + latch.countDown(); + } + if (i < 10000) { + return innerScheduler.schedule(i, this); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + }; + + Func2 fOuter = new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + CompositeSubscription s = new CompositeSubscription(); + s.add(innerScheduler.schedule(i, fInner)); + s.add(innerScheduler.schedule(i, fInner)); + return s; + } + }; + + outerSubscription.wrap(getScheduler().schedule(0L, fOuter)); + latch.await(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + System.out.println("Count: " + countExecutions.get()); + // we unsubscribe on first to 1000 so we hit 1999 instead of 2000 + assertEquals(1999, countExecutions.get()); + } + + @Test + public void unsubscribeWithFastProducerWithSlowConsumerCausingQueuing() throws InterruptedException { + final AtomicInteger countEmitted = new AtomicInteger(); + final AtomicInteger countTaken = new AtomicInteger(); + int value = Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + final BooleanSubscription s = BooleanSubscription.create(); + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + int i = 1; + while (!s.isUnsubscribed() && i <= 100) { + System.out.println("onNext from fast producer: " + i); + o.onNext(i++); + } + o.onCompleted(); + } + }); + t.setDaemon(true); + t.start(); + return s; + } + }).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + countEmitted.incrementAndGet(); + } + }).doOnCompleted(new Action0() { + + @Override + public void call() { + System.out.println("-------- Done Emitting from Source ---------"); + } + }).observeOn(getScheduler()).doOnNext(new Action1() { + + @Override + public void call(Integer i) { + System.out.println(">> onNext to slowConsumer pre-take: " + i); + //force it to be slower than the producer + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + countTaken.incrementAndGet(); + } + }).take(10).toBlockingObservable().last(); + + // they will all emit because the consumer is running slow + assertEquals(100, countEmitted.get()); + // number received after take (but take will filter any extra) + assertEquals(10, value); + // so we also want to check the doOnNext after observeOn to see if it got unsubscribed + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + // we expect only 10 to make it through the observeOn side + assertEquals(10, countTaken.get()); + } + + @Test(timeout = 8000) + public void recursionUsingFunc2() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + getScheduler().schedule(0L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + i++; + if (i % 100000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + } + if (i < 5000000L) { + return innerScheduler.schedule(i, this); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + }); + + latch.await(); + } + + @Test(timeout = 8000) + public void recursionUsingAction0() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + getScheduler().schedule(new Action1() { + + private long i = 0; + + @Override + public void call(Action0 self) { + i++; + if (i % 100000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + } + if (i < 5000000L) { + self.call(); + } else { + latch.countDown(); + } + } + }); + + latch.await(); + } + +} diff --git a/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java index b71d96af9e..7d7fbf4acf 100644 --- a/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,9 +22,15 @@ import org.junit.Test; import org.mockito.InOrder; +import rx.Scheduler; import rx.util.functions.Action0; -public class CurrentThreadSchedulerTest { +public class CurrentThreadSchedulerTest extends AbstractSchedulerTests { + + @Override + protected Scheduler getScheduler() { + return CurrentThreadScheduler.getInstance(); + } @Test public void testNestedActions() { diff --git a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java index a4f306e436..fa825ac915 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java @@ -28,7 +28,13 @@ import rx.util.functions.Action1; import rx.util.functions.Func2; -public class ExecutorSchedulerTests { +public class ExecutorSchedulerTests extends AbstractSchedulerTests { + + @Override + protected Scheduler getScheduler() { + // this is an implementation of ExecutorScheduler + return Schedulers.threadPoolForComputation(); + } @Test public void testThreadSafetyWhenSchedulerIsHoppingBetweenThreads() { diff --git a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java index bfece3af6c..2b81b3452b 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,9 +20,16 @@ import org.junit.Test; import org.mockito.InOrder; +import rx.Scheduler; import rx.util.functions.Action0; -public class ImmediateSchedulerTest { +public class ImmediateSchedulerTest extends AbstractSchedulerTests { + + @Override + protected Scheduler getScheduler() { + return ImmediateScheduler.getInstance(); + } + @Test public void testNestedActions() { final ImmediateScheduler scheduler = new ImmediateScheduler(); diff --git a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java new file mode 100644 index 0000000000..a0e65582b6 --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java @@ -0,0 +1,28 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.schedulers; + +import rx.Scheduler; + +public class NewThreadSchedulerTest extends AbstractSchedulerTests { + + @Override + protected Scheduler getScheduler() { + return NewThreadScheduler.getInstance(); + } + +} diff --git a/rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java b/rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java deleted file mode 100644 index ec51d7f977..0000000000 --- a/rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.schedulers; - -import static org.junit.Assert.*; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import rx.Observable; -import rx.Observer; -import rx.Scheduler; -import rx.schedulers.Schedulers; -import rx.operators.SafeObservableSubscription; -import rx.util.functions.Func1; - -public class SchedulerUnsubscribeTest { - - /** - * Bug report: https://github.com/Netflix/RxJava/issues/431 - */ - @Test - public void testUnsubscribeOfNewThread() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.newThread()); - } - - @Test - public void testUnsubscribeOfThreadPoolForIO() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.threadPoolForIO()); - } - - @Test - public void testUnsubscribeOfThreadPoolForComputation() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.threadPoolForComputation()); - } - - @Test - public void testUnsubscribeOfImmediateThread() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.immediate()); - } - - @Test - public void testUnsubscribeOfCurrentThread() throws InterruptedException { - testUnSubscribeForScheduler(Schedulers.currentThread()); - } - - public void testUnSubscribeForScheduler(Scheduler scheduler) throws InterruptedException { - - final AtomicInteger countReceived = new AtomicInteger(); - final AtomicInteger countGenerated = new AtomicInteger(); - final SafeObservableSubscription s = new SafeObservableSubscription(); - final CountDownLatch latch = new CountDownLatch(1); - - s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS) - .map(new Func1() { - @Override - public Long call(Long aLong) { - System.out.println("generated " + aLong); - countGenerated.incrementAndGet(); - return aLong; - } - }) - .subscribeOn(scheduler) - .observeOn(scheduler) - .subscribe(new Observer() { - @Override - public void onCompleted() { - System.out.println("--- completed"); - } - - @Override - public void onError(Throwable e) { - System.out.println("--- onError"); - } - - @Override - public void onNext(Long args) { - if (countReceived.incrementAndGet() == 2) { - s.unsubscribe(); - latch.countDown(); - } - System.out.println("==> Received " + args); - } - })); - - latch.await(1000, TimeUnit.MILLISECONDS); - - System.out.println("----------- it thinks it is finished ------------------ "); - Thread.sleep(100); - - assertEquals(2, countGenerated.get()); - } -} From 0977f040a403bec9f5818247a9dc29fa649b2e19 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 30 Dec 2013 20:30:43 -0800 Subject: [PATCH 02/15] Fix Deprecated Method Call --- rxjava-core/src/main/java/rx/Scheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index b01872f226..a4cc789cd4 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -165,7 +165,7 @@ public Subscription call(final Scheduler scheduler, final Func2 parentAction) { @Override public void call() { if (!parentSubscription.isUnsubscribed()) { - childSubscription.setSubscription(scheduler.schedule(parentAction, parentAction)); + childSubscription.set(scheduler.schedule(parentAction, parentAction)); } } From fdbf5efb66bd1e2ad6039c13bfa53f9d6e3373ac Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 30 Dec 2013 20:31:17 -0800 Subject: [PATCH 03/15] Memory Leak Tests NewThreadScheduler is working, the other two are not so commented out for now until fixed. --- .../schedulers/TestRecursionMemoryUsage.java | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java diff --git a/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java new file mode 100644 index 0000000000..730cde5e0b --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java @@ -0,0 +1,84 @@ +package rx.schedulers; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func2; + +/** + * Used for manual testing of memory leaks with recursive schedulers. + * + */ +public class TestRecursionMemoryUsage { + + public static void main(String args[]) { + usingFunc2(Schedulers.newThread()); + usingAction0(Schedulers.newThread()); +// +// usingFunc2(Schedulers.currentThread()); +// usingAction0(Schedulers.currentThread()); + +// usingFunc2(Schedulers.threadPoolForComputation()); +// usingAction0(Schedulers.threadPoolForComputation()); + } + + protected static void usingFunc2(final Scheduler scheduler) { + System.out.println("************ usingFunc2: " + scheduler); + Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + return scheduler.schedule(0L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + i++; + if (i % 500000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + o.onNext(i); + } + if (i == 100000000L) { + o.onCompleted(); + return Subscriptions.empty(); + } + + return innerScheduler.schedule(i, this); + } + }); + } + }).toBlockingObservable().last(); + } + + protected static void usingAction0(final Scheduler scheduler) { + System.out.println("************ usingAction0: " + scheduler); + Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer o) { + return scheduler.schedule(new Action1() { + + private long i = 0; + + @Override + public void call(Action0 self) { + i++; + if (i % 500000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + o.onNext(i); + } + if (i == 100000000L) { + o.onCompleted(); + return; + } + self.call(); + } + }); + } + }).toBlockingObservable().last(); + } +} From b1d9c1cb62517f7cf3cbb64ac2c9bc2fd55d4f50 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 30 Dec 2013 20:32:05 -0800 Subject: [PATCH 04/15] Fix Memory Leak in NewThreadScheduler Recursion - the Action0 method did not have a leak - the Func2 method on inner scheduler recursion did have a leak --- .../rx/schedulers/NewThreadScheduler.java | 46 +++++++++++++++---- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index 7e460923a6..bc6b971b18 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -25,7 +25,9 @@ import rx.Scheduler; import rx.Subscription; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func2; /** @@ -46,6 +48,7 @@ private NewThreadScheduler() { private static class EventLoopScheduler extends Scheduler { private final ExecutorService executor; + private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); private EventLoopScheduler() { executor = Executors.newFixedThreadPool(1, new ThreadFactory() { @@ -61,21 +64,30 @@ public Thread newThread(Runnable r) { @Override public Subscription schedule(T state, Func2 action) { + CompositeSubscription s = new CompositeSubscription(); final DiscardableAction discardableAction = new DiscardableAction(state, action); - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); - + s.add(discardableAction); + final Scheduler _scheduler = this; - subscription.add(Subscriptions.from(executor.submit(new Runnable() { + s.add(Subscriptions.from(executor.submit(new Runnable() { @Override public void run() { - Subscription s = discardableAction.call(_scheduler); - subscription.add(s); + discardableAction.call(_scheduler); } }))); - - return subscription; + + // replace the EventLoopScheduler child subscription with this one + childSubscription.set(s); + /* + * If `schedule` is run concurrently instead of recursively then we'd lose subscriptions as the `childSubscription` + * only remembers the last one scheduled. However, the parent subscription will shutdown the entire EventLoopScheduler + * and the ExecutorService which will terminate all outstanding tasks so this childSubscription is actually somewhat + * superfluous for stopping and cleanup ... though childSubscription does ensure exactness as can be seen by + * the `testUnSubscribeForScheduler()` unit test which fails if the `childSubscription` does not exist. + */ + + return childSubscription; } @Override @@ -103,12 +115,26 @@ public void run() { return subscription; } + private void shutdownNow() { + executor.shutdownNow(); + } + } @Override public Subscription schedule(T state, Func2 action) { - EventLoopScheduler s = new EventLoopScheduler(); - return s.schedule(state, action); + final EventLoopScheduler s = new EventLoopScheduler(); + CompositeSubscription cs = new CompositeSubscription(); + cs.add(s.schedule(state, action)); + cs.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + // shutdown the executor, all tasks queued to run and clean up resources + s.shutdownNow(); + } + })); + return cs; } @Override From 9f35594ec806672e7760873ba6837eac8f9b4454 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 30 Dec 2013 21:06:37 -0800 Subject: [PATCH 05/15] Scheduler Unit Tests - passing for all but ExecutorScheduler --- .../AbstractSchedulerConcurrencyTests.java | 159 ++++++++++ .../rx/schedulers/AbstractSchedulerTests.java | 295 +++++++++--------- .../CurrentThreadSchedulerTest.java | 122 -------- .../rx/schedulers/ExecutorSchedulerTests.java | 2 +- .../rx/schedulers/ImmediateSchedulerTest.java | 57 +--- .../rx/schedulers/NewThreadSchedulerTest.java | 2 +- 6 files changed, 306 insertions(+), 331 deletions(-) create mode 100644 rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java new file mode 100644 index 0000000000..cd8f5907a2 --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -0,0 +1,159 @@ +package rx.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.operators.SafeObservableSubscription; +import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func2; + +/** + * Base tests for schedulers that involve threads (concurrency). + * + * These can only run on Schedulers that launch threads since they expect async/concurrent behavior. + * + * The Current/Immediate schedulers will not work with these tests. + */ +public abstract class AbstractSchedulerConcurrencyTests extends AbstractSchedulerTests { + + @Test + public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch unsubscribeLatch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(); + Subscription s = getScheduler().schedule(0L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + i++; + // System.out.println("i: " + i); + if (i == 10) { + latch.countDown(); + try { + // wait for unsubscribe to finish so we are not racing it + unsubscribeLatch.await(); + } catch (InterruptedException e) { + // we expect the countDown if unsubscribe is not working + // or to be interrupted if unsubscribe is successful since + // the unsubscribe will interrupt it as it is calling Future.cancel(true) + // so we will ignore the stacktrace + } + } + + counter.incrementAndGet(); + return innerScheduler.schedule(i, this); + } + }); + + latch.await(); + s.unsubscribe(); + unsubscribeLatch.countDown(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + assertEquals(10, counter.get()); + } + + @Test + public void scheduleMultipleTasksOnOuterForSequentialExecution() throws InterruptedException { + final AtomicInteger countExecutions = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + final SafeObservableSubscription outerSubscription = new SafeObservableSubscription(); + final Func2 fInner = new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + countExecutions.incrementAndGet(); + i++; + System.out.println("i: " + i); + if (i == 1000) { + outerSubscription.unsubscribe(); + latch.countDown(); + } + if (i < 10000) { + return innerScheduler.schedule(i, this); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + }; + + Func2 fOuter = new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + CompositeSubscription s = new CompositeSubscription(); + s.add(innerScheduler.schedule(i, fInner)); + s.add(innerScheduler.schedule(i, fInner)); + return s; + } + }; + + outerSubscription.wrap(getScheduler().schedule(0L, fOuter)); + latch.await(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + System.out.println("Count: " + countExecutions.get()); + // we unsubscribe on first to 1000 so we hit 1999 instead of 2000 + assertEquals(1999, countExecutions.get()); + } + + @Test(timeout = 8000) + public void recursionUsingFunc2() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + getScheduler().schedule(0L, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Long i) { + i++; + if (i % 100000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + } + if (i < 5000000L) { + return innerScheduler.schedule(i, this); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + }); + + latch.await(); + } + + @Test(timeout = 8000) + public void recursionUsingAction0() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + getScheduler().schedule(new Action1() { + + private long i = 0; + + @Override + public void call(Action0 self) { + i++; + if (i % 100000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + } + if (i < 5000000L) { + self.call(); + } else { + latch.countDown(); + } + } + }); + + latch.await(); + } + +} diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java index bbd14b9812..921780f2b2 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -1,12 +1,14 @@ package rx.schedulers; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; +import org.mockito.InOrder; import rx.Observable; import rx.Observable.OnSubscribeFunc; @@ -15,63 +17,92 @@ import rx.Subscription; import rx.operators.SafeObservableSubscription; import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; -import rx.util.functions.Func2; +/** + * Base tests for all schedulers including Immediate/Current. + */ public abstract class AbstractSchedulerTests { /** * The scheduler to test - * - * @return */ protected abstract Scheduler getScheduler(); @Test - public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - final CountDownLatch unsubscribeLatch = new CountDownLatch(1); - final AtomicInteger counter = new AtomicInteger(); - Subscription s = getScheduler().schedule(0L, new Func2() { + public final void unsubscribeWithFastProducerWithSlowConsumerCausingQueuing() throws InterruptedException { + final AtomicInteger countEmitted = new AtomicInteger(); + final AtomicInteger countTaken = new AtomicInteger(); + int value = Observable.create(new OnSubscribeFunc() { @Override - public Subscription call(Scheduler innerScheduler, Long i) { - i++; - // System.out.println("i: " + i); - if (i == 10) { - latch.countDown(); - try { - // wait for unsubscribe to finish so we are not racing it - unsubscribeLatch.await(); - } catch (InterruptedException e) { - // we expect the countDown if unsubscribe is not working - // or to be interrupted if unsubscribe is successful since - // the unsubscribe will interrupt it as it is calling Future.cancel(true) - // so we will ignore the stacktrace + public Subscription onSubscribe(final Observer o) { + final BooleanSubscription s = BooleanSubscription.create(); + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + int i = 1; + while (!s.isUnsubscribed() && i <= 100) { + System.out.println("onNext from fast producer: " + i); + o.onNext(i++); + } + o.onCompleted(); } - } + }); + t.setDaemon(true); + t.start(); + return s; + } + }).doOnNext(new Action1() { - counter.incrementAndGet(); - return innerScheduler.schedule(i, this); + @Override + public void call(Integer i) { + countEmitted.incrementAndGet(); } - }); + }).doOnCompleted(new Action0() { + + @Override + public void call() { + System.out.println("-------- Done Emitting from Source ---------"); + } + }).observeOn(getScheduler()).doOnNext(new Action1() { - latch.await(); - s.unsubscribe(); - unsubscribeLatch.countDown(); + @Override + public void call(Integer i) { + System.out.println(">> onNext to slowConsumer pre-take: " + i); + //force it to be slower than the producer + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + countTaken.incrementAndGet(); + } + }).take(10).toBlockingObservable().last(); + + if (getScheduler() instanceof CurrentThreadScheduler || getScheduler() instanceof ImmediateScheduler) { + // since there is no concurrency it will block and only emit as many as it can process + assertEquals(10, countEmitted.get()); + } else { + // they will all emit because the consumer is running slow + assertEquals(100, countEmitted.get()); + } + // number received after take (but take will filter any extra) + assertEquals(10, value); + // so we also want to check the doOnNext after observeOn to see if it got unsubscribed Thread.sleep(200); // let time pass to see if the scheduler is still doing work - assertEquals(10, counter.get()); + // we expect only 10 to make it through the observeOn side + assertEquals(10, countTaken.get()); } /** * Bug report: https://github.com/Netflix/RxJava/issues/431 */ @Test - public void testUnSubscribeForScheduler() throws InterruptedException { + public final void testUnSubscribeForScheduler() throws InterruptedException { final AtomicInteger countReceived = new AtomicInteger(); final AtomicInteger countGenerated = new AtomicInteger(); @@ -118,156 +149,118 @@ public void onNext(Long args) { } @Test - public void scheduleMultipleTasksOnOuterForSequentialExecution() throws InterruptedException { - final AtomicInteger countExecutions = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); - final SafeObservableSubscription outerSubscription = new SafeObservableSubscription(); - final Func2 fInner = new Func2() { + public final void testNestedActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + final Action0 firstAction = new Action0() { @Override - public Subscription call(Scheduler innerScheduler, Long i) { - countExecutions.incrementAndGet(); - i++; - System.out.println("i: " + i); - if (i == 1000) { - outerSubscription.unsubscribe(); - latch.countDown(); - } - if (i < 10000) { - return innerScheduler.schedule(i, this); - } else { - latch.countDown(); - return Subscriptions.empty(); - } + public void call() { + firstStepStart.call(); + firstStepEnd.call(); } }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); - Func2 fOuter = new Func2() { - + } + }; + final Action0 thirdAction = new Action0() { @Override - public Subscription call(Scheduler innerScheduler, Long i) { - CompositeSubscription s = new CompositeSubscription(); - s.add(innerScheduler.schedule(i, fInner)); - s.add(innerScheduler.schedule(i, fInner)); - return s; + public void call() { + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); } }; - outerSubscription.wrap(getScheduler().schedule(0L, fOuter)); - latch.await(); - Thread.sleep(200); // let time pass to see if the scheduler is still doing work - System.out.println("Count: " + countExecutions.get()); - // we unsubscribe on first to 1000 so we hit 1999 instead of 2000 - assertEquals(1999, countExecutions.get()); - } + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); - @Test - public void unsubscribeWithFastProducerWithSlowConsumerCausingQueuing() throws InterruptedException { - final AtomicInteger countEmitted = new AtomicInteger(); - final AtomicInteger countTaken = new AtomicInteger(); - int value = Observable.create(new OnSubscribeFunc() { + scheduler.schedule(thirdAction); - @Override - public Subscription onSubscribe(final Observer o) { - final BooleanSubscription s = BooleanSubscription.create(); - Thread t = new Thread(new Runnable() { + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } - @Override - public void run() { - int i = 1; - while (!s.isUnsubscribed() && i <= 100) { - System.out.println("onNext from fast producer: " + i); - o.onNext(i++); - } - o.onCompleted(); - } - }); - t.setDaemon(true); - t.start(); - return s; - } - }).doOnNext(new Action1() { + @Test + public final void testSequenceOfActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - @Override - public void call(Integer i) { - countEmitted.incrementAndGet(); - } - }).doOnCompleted(new Action0() { + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); - @Override - public void call() { - System.out.println("-------- Done Emitting from Source ---------"); - } - }).observeOn(getScheduler()).doOnNext(new Action1() { + scheduler.schedule(first); + scheduler.schedule(second); - @Override - public void call(Integer i) { - System.out.println(">> onNext to slowConsumer pre-take: " + i); - //force it to be slower than the producer - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - countTaken.incrementAndGet(); - } - }).take(10).toBlockingObservable().last(); + verify(first, times(1)).call(); + verify(second, times(1)).call(); - // they will all emit because the consumer is running slow - assertEquals(100, countEmitted.get()); - // number received after take (but take will filter any extra) - assertEquals(10, value); - // so we also want to check the doOnNext after observeOn to see if it got unsubscribed - Thread.sleep(200); // let time pass to see if the scheduler is still doing work - // we expect only 10 to make it through the observeOn side - assertEquals(10, countTaken.get()); } - @Test(timeout = 8000) - public void recursionUsingFunc2() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - getScheduler().schedule(0L, new Func2() { + @Test + public final void testSequenceOfDelayedActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + scheduler.schedule(new Action0() { @Override - public Subscription call(Scheduler innerScheduler, Long i) { - i++; - if (i % 100000 == 0) { - System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); - } - if (i < 5000000L) { - return innerScheduler.schedule(i, this); - } else { - latch.countDown(); - return Subscriptions.empty(); - } + public void call() { + scheduler.schedule(first, 30, TimeUnit.MILLISECONDS); + scheduler.schedule(second, 10, TimeUnit.MILLISECONDS); } }); - latch.await(); + InOrder inOrder = inOrder(first, second); + + inOrder.verify(second, times(1)).call(); + inOrder.verify(first, times(1)).call(); + } - @Test(timeout = 8000) - public void recursionUsingAction0() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - getScheduler().schedule(new Action1() { + @Test + public final void testMixOfDelayedAndNonDelayedActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - private long i = 0; + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + final Action0 third = mock(Action0.class); + final Action0 fourth = mock(Action0.class); + scheduler.schedule(new Action0() { @Override - public void call(Action0 self) { - i++; - if (i % 100000 == 0) { - System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); - } - if (i < 5000000L) { - self.call(); - } else { - latch.countDown(); - } + public void call() { + scheduler.schedule(first); + scheduler.schedule(second, 300, TimeUnit.MILLISECONDS); + scheduler.schedule(third, 100, TimeUnit.MILLISECONDS); + scheduler.schedule(fourth); } }); - latch.await(); + InOrder inOrder = inOrder(first, second, third, fourth); + + inOrder.verify(first, times(1)).call(); + inOrder.verify(fourth, times(1)).call(); + inOrder.verify(third, times(1)).call(); + inOrder.verify(second, times(1)).call(); + } } diff --git a/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java index 7d7fbf4acf..961617bbd8 100644 --- a/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java @@ -15,15 +15,7 @@ */ package rx.schedulers; -import static org.mockito.Mockito.*; - -import java.util.concurrent.TimeUnit; - -import org.junit.Test; -import org.mockito.InOrder; - import rx.Scheduler; -import rx.util.functions.Action0; public class CurrentThreadSchedulerTest extends AbstractSchedulerTests { @@ -32,118 +24,4 @@ protected Scheduler getScheduler() { return CurrentThreadScheduler.getInstance(); } - @Test - public void testNestedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - - final Action0 firstStepStart = mock(Action0.class); - final Action0 firstStepEnd = mock(Action0.class); - - final Action0 secondStepStart = mock(Action0.class); - final Action0 secondStepEnd = mock(Action0.class); - - final Action0 thirdStepStart = mock(Action0.class); - final Action0 thirdStepEnd = mock(Action0.class); - - final Action0 firstAction = new Action0() { - @Override - public void call() { - firstStepStart.call(); - firstStepEnd.call(); - } - }; - final Action0 secondAction = new Action0() { - @Override - public void call() { - secondStepStart.call(); - scheduler.schedule(firstAction); - secondStepEnd.call(); - - } - }; - final Action0 thirdAction = new Action0() { - @Override - public void call() { - thirdStepStart.call(); - scheduler.schedule(secondAction); - thirdStepEnd.call(); - } - }; - - InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); - - scheduler.schedule(thirdAction); - - inOrder.verify(thirdStepStart, times(1)).call(); - inOrder.verify(thirdStepEnd, times(1)).call(); - inOrder.verify(secondStepStart, times(1)).call(); - inOrder.verify(secondStepEnd, times(1)).call(); - inOrder.verify(firstStepStart, times(1)).call(); - inOrder.verify(firstStepEnd, times(1)).call(); - } - - @Test - public void testSequenceOfActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - - final Action0 first = mock(Action0.class); - final Action0 second = mock(Action0.class); - - scheduler.schedule(first); - scheduler.schedule(second); - - verify(first, times(1)).call(); - verify(second, times(1)).call(); - - } - - @Test - public void testSequenceOfDelayedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - - final Action0 first = mock(Action0.class); - final Action0 second = mock(Action0.class); - - scheduler.schedule(new Action0() { - @Override - public void call() { - scheduler.schedule(first, 30, TimeUnit.MILLISECONDS); - scheduler.schedule(second, 10, TimeUnit.MILLISECONDS); - } - }); - - InOrder inOrder = inOrder(first, second); - - inOrder.verify(second, times(1)).call(); - inOrder.verify(first, times(1)).call(); - - } - - @Test - public void testMixOfDelayedAndNonDelayedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - - final Action0 first = mock(Action0.class); - final Action0 second = mock(Action0.class); - final Action0 third = mock(Action0.class); - final Action0 fourth = mock(Action0.class); - - scheduler.schedule(new Action0() { - @Override - public void call() { - scheduler.schedule(first); - scheduler.schedule(second, 300, TimeUnit.MILLISECONDS); - scheduler.schedule(third, 100, TimeUnit.MILLISECONDS); - scheduler.schedule(fourth); - } - }); - - InOrder inOrder = inOrder(first, second, third, fourth); - - inOrder.verify(first, times(1)).call(); - inOrder.verify(fourth, times(1)).call(); - inOrder.verify(third, times(1)).call(); - inOrder.verify(second, times(1)).call(); - - } } diff --git a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java index fa825ac915..d065475232 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java @@ -28,7 +28,7 @@ import rx.util.functions.Action1; import rx.util.functions.Func2; -public class ExecutorSchedulerTests extends AbstractSchedulerTests { +public class ExecutorSchedulerTests extends AbstractSchedulerConcurrencyTests { @Override protected Scheduler getScheduler() { diff --git a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java index 2b81b3452b..ff3e6c138b 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java @@ -15,68 +15,13 @@ */ package rx.schedulers; -import static org.mockito.Mockito.*; - -import org.junit.Test; -import org.mockito.InOrder; - import rx.Scheduler; -import rx.util.functions.Action0; -public class ImmediateSchedulerTest extends AbstractSchedulerTests { +public class ImmediateSchedulerTest extends AbstractSchedulerTests { @Override protected Scheduler getScheduler() { return ImmediateScheduler.getInstance(); } - @Test - public void testNestedActions() { - final ImmediateScheduler scheduler = new ImmediateScheduler(); - - final Action0 firstStepStart = mock(Action0.class); - final Action0 firstStepEnd = mock(Action0.class); - - final Action0 secondStepStart = mock(Action0.class); - final Action0 secondStepEnd = mock(Action0.class); - - final Action0 thirdStepStart = mock(Action0.class); - final Action0 thirdStepEnd = mock(Action0.class); - - final Action0 firstAction = new Action0() { - @Override - public void call() { - firstStepStart.call(); - firstStepEnd.call(); - } - }; - final Action0 secondAction = new Action0() { - @Override - public void call() { - secondStepStart.call(); - scheduler.schedule(firstAction); - secondStepEnd.call(); - - } - }; - final Action0 thirdAction = new Action0() { - @Override - public void call() { - thirdStepStart.call(); - scheduler.schedule(secondAction); - thirdStepEnd.call(); - } - }; - - InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); - - scheduler.schedule(thirdAction); - - inOrder.verify(thirdStepStart, times(1)).call(); - inOrder.verify(secondStepStart, times(1)).call(); - inOrder.verify(firstStepStart, times(1)).call(); - inOrder.verify(firstStepEnd, times(1)).call(); - inOrder.verify(secondStepEnd, times(1)).call(); - inOrder.verify(thirdStepEnd, times(1)).call(); - } } diff --git a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java index a0e65582b6..eaf5855e6c 100644 --- a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java @@ -18,7 +18,7 @@ import rx.Scheduler; -public class NewThreadSchedulerTest extends AbstractSchedulerTests { +public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @Override protected Scheduler getScheduler() { From 5b2dab3ad9b2226f5be972f6b4035f82360d1199 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 30 Dec 2013 22:35:49 -0800 Subject: [PATCH 06/15] ExecutorScheduler Memory Leak Fix - new InnerExecutorScheduler and childSubscription - improvements to unit tests --- .../java/rx/schedulers/ExecutorScheduler.java | 145 +++++++++++++++--- .../rx/schedulers/NewThreadScheduler.java | 8 + .../AbstractSchedulerConcurrencyTests.java | 71 ++++----- .../schedulers/TestRecursionMemoryUsage.java | 6 +- 4 files changed, 162 insertions(+), 68 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java index 563e609612..443056438b 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -25,6 +25,7 @@ import rx.Scheduler; import rx.Subscription; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -68,9 +69,10 @@ public void run() { @Override public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; + final InnerExecutorScheduler _scheduler = new InnerExecutorScheduler(executor); + // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + final CompositeSubscription subscription = new CompositeSubscription(discardableAction, _scheduler); if (executor instanceof ScheduledExecutorService) { // we are a ScheduledExecutorService so can do proper scheduling @@ -78,9 +80,7 @@ public Subscription schedule(final T state, final Func2 Subscription schedule(T state, Func2 action) { + CompositeSubscription s = new CompositeSubscription(); final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + s.add(discardableAction); + + final InnerExecutorScheduler _scheduler = new InnerExecutorScheduler(executor); + s.add(_scheduler); - // work to be done on a thread - Runnable r = new Runnable() { + s.add(execute(executor, new Runnable() { @Override public void run() { - Subscription s = discardableAction.call(_scheduler); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); + discardableAction.call(_scheduler); } - }; + })); + + return s; + } + /** + * Execute on the given Executor and retrieve a Subscription + * + * @param executor + * @param r + * @return + */ + private static Subscription execute(Executor executor, Runnable r) { // submit for immediate execution if (executor instanceof ExecutorService) { // we are an ExecutorService so get a Future back that supports unsubscribe Future f = ((ExecutorService) executor).submit(r); // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); + return Subscriptions.from(f); } else { // we are the lowest common denominator so can't unsubscribe once we execute executor.execute(r); + return Subscriptions.empty(); } + } - return subscription; + private static class InnerExecutorScheduler extends Scheduler implements Subscription { + + private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); + private final Executor executor; + + InnerExecutorScheduler(Executor executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(T state, Func2 action) { + if(childSubscription.isUnsubscribed()) { + return childSubscription; + } + + CompositeSubscription s = new CompositeSubscription(); + final DiscardableAction discardableAction = new DiscardableAction(state, action); + s.add(discardableAction); + + final Scheduler _scheduler = this; + + s.add(execute(executor, new Runnable() { + + @Override + public void run() { + discardableAction.call(_scheduler); + } + })); + + // replace the InnerExecutorScheduler child subscription with this one + childSubscription.set(s); + /* + * TODO: Consider what will happen if `schedule` is run concurrently instead of recursively + * and we lose subscriptions as the `childSubscription` only remembers the last one scheduled. + * + * Not obvious that this should ever happen. Can it? + * + * benjchristensen => Haven't been able to come up with a valid test case to prove this as an issue + * so it may not be. + */ + + return childSubscription; + } + + @Override + public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { + if(childSubscription.isUnsubscribed()) { + return childSubscription; + } + + CompositeSubscription s = new CompositeSubscription(); + final DiscardableAction discardableAction = new DiscardableAction(state, action); + s.add(discardableAction); + + final Scheduler _scheduler = this; + + if (executor instanceof ScheduledExecutorService) { + // we are a ScheduledExecutorService so can do proper scheduling + ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(new Runnable() { + @Override + public void run() { + // when the delay has passed we now do the work on the actual scheduler + discardableAction.call(_scheduler); + } + }, delayTime, unit); + // replace the InnerExecutorScheduler child subscription with this one + childSubscription.set(Subscriptions.from(f)); + } else { + // we are not a ScheduledExecutorService so can't directly schedule + if (delayTime == 0) { + // no delay so put on the thread-pool right now + return schedule(state, action); + } else { + // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService + // to handle the scheduling and once it's ready then execute on this Executor + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { + + @Override + public void run() { + // now execute on the real Executor (by using the other overload that schedules for immediate execution) + _scheduler.schedule(state, action); + } + }, delayTime, unit); + // replace the InnerExecutorScheduler child subscription with this one + childSubscription.set(Subscriptions.from(f)); + } + } + return childSubscription; + } + + @Override + public void unsubscribe() { + childSubscription.unsubscribe(); + } } diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index bc6b971b18..a175fe608f 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -64,6 +64,10 @@ public Thread newThread(Runnable r) { @Override public Subscription schedule(T state, Func2 action) { + if (childSubscription.isUnsubscribed()) { + return childSubscription; + } + CompositeSubscription s = new CompositeSubscription(); final DiscardableAction discardableAction = new DiscardableAction(state, action); s.add(discardableAction); @@ -92,6 +96,10 @@ public void run() { @Override public Subscription schedule(final T state, final Func2 action, final long delayTime, final TimeUnit unit) { + if (childSubscription.isUnsubscribed()) { + return childSubscription; + } + // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep // we will instead schedule the event then launch the thread after the delay has passed final Scheduler _scheduler = this; diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index cd8f5907a2..8fb00c6553 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -3,17 +3,14 @@ import static org.junit.Assert.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; import rx.Scheduler; import rx.Subscription; import rx.operators.SafeObservableSubscription; -import rx.subscriptions.BooleanSubscription; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; @@ -34,12 +31,11 @@ public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws Interrupt final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch unsubscribeLatch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); - Subscription s = getScheduler().schedule(0L, new Func2() { + Subscription s = getScheduler().schedule(1L, new Func2() { @Override public Subscription call(Scheduler innerScheduler, Long i) { - i++; - // System.out.println("i: " + i); + System.out.println("Run: " + i); if (i == 10) { latch.countDown(); try { @@ -54,7 +50,7 @@ public Subscription call(Scheduler innerScheduler, Long i) { } counter.incrementAndGet(); - return innerScheduler.schedule(i, this); + return innerScheduler.schedule(i + 1, this); } }); @@ -66,62 +62,51 @@ public Subscription call(Scheduler innerScheduler, Long i) { } @Test - public void scheduleMultipleTasksOnOuterForSequentialExecution() throws InterruptedException { - final AtomicInteger countExecutions = new AtomicInteger(); + public void testUnsubscribeRecursiveScheduleWithStateAndFunc2AndDelay() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - final SafeObservableSubscription outerSubscription = new SafeObservableSubscription(); - final Func2 fInner = new Func2() { + final CountDownLatch unsubscribeLatch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(); + Subscription s = getScheduler().schedule(1L, new Func2() { @Override public Subscription call(Scheduler innerScheduler, Long i) { - countExecutions.incrementAndGet(); - i++; - System.out.println("i: " + i); - if (i == 1000) { - outerSubscription.unsubscribe(); - latch.countDown(); - } - if (i < 10000) { - return innerScheduler.schedule(i, this); - } else { + if (i == 10) { latch.countDown(); - return Subscriptions.empty(); + try { + // wait for unsubscribe to finish so we are not racing it + unsubscribeLatch.await(); + } catch (InterruptedException e) { + // we expect the countDown if unsubscribe is not working + // or to be interrupted if unsubscribe is successful since + // the unsubscribe will interrupt it as it is calling Future.cancel(true) + // so we will ignore the stacktrace + } } - } - }; - - Func2 fOuter = new Func2() { - @Override - public Subscription call(Scheduler innerScheduler, Long i) { - CompositeSubscription s = new CompositeSubscription(); - s.add(innerScheduler.schedule(i, fInner)); - s.add(innerScheduler.schedule(i, fInner)); - return s; + counter.incrementAndGet(); + return innerScheduler.schedule(i + 1, this, 10, TimeUnit.MILLISECONDS); } - }; + }, 10, TimeUnit.MILLISECONDS); - outerSubscription.wrap(getScheduler().schedule(0L, fOuter)); latch.await(); + s.unsubscribe(); + unsubscribeLatch.countDown(); Thread.sleep(200); // let time pass to see if the scheduler is still doing work - System.out.println("Count: " + countExecutions.get()); - // we unsubscribe on first to 1000 so we hit 1999 instead of 2000 - assertEquals(1999, countExecutions.get()); + assertEquals(10, counter.get()); } @Test(timeout = 8000) public void recursionUsingFunc2() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - getScheduler().schedule(0L, new Func2() { + getScheduler().schedule(1L, new Func2() { @Override public Subscription call(Scheduler innerScheduler, Long i) { - i++; if (i % 100000 == 0) { System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); } - if (i < 5000000L) { - return innerScheduler.schedule(i, this); + if (i < 1000000L) { + return innerScheduler.schedule(i + 1, this); } else { latch.countDown(); return Subscriptions.empty(); @@ -145,7 +130,7 @@ public void call(Action0 self) { if (i % 100000 == 0) { System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); } - if (i < 5000000L) { + if (i < 1000000L) { self.call(); } else { latch.countDown(); diff --git a/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java index 730cde5e0b..294193b167 100644 --- a/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java +++ b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java @@ -19,12 +19,12 @@ public class TestRecursionMemoryUsage { public static void main(String args[]) { usingFunc2(Schedulers.newThread()); usingAction0(Schedulers.newThread()); -// + // usingFunc2(Schedulers.currentThread()); // usingAction0(Schedulers.currentThread()); -// usingFunc2(Schedulers.threadPoolForComputation()); -// usingAction0(Schedulers.threadPoolForComputation()); + usingFunc2(Schedulers.threadPoolForComputation()); + usingAction0(Schedulers.threadPoolForComputation()); } protected static void usingFunc2(final Scheduler scheduler) { From 9a94fd278693e098837dddce222a732c1a57f532 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 30 Dec 2013 23:33:55 -0800 Subject: [PATCH 07/15] CurrentThreadScheduler Memory Leak Fixed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Current/Immediate/NewThread/Executor Schedulers are passing unit tests - Current/NewThread/Executor Schedulers do not leak memory on the recursion test (Immediate can’t be used for recursion otherwise it stack overflows) --- .../rx/schedulers/CurrentThreadScheduler.java | 56 ++++-- .../AbstractSchedulerConcurrencyTests.java | 53 +++++- .../rx/schedulers/AbstractSchedulerTests.java | 170 ++++++++++++------ .../rx/schedulers/ImmediateSchedulerTest.java | 23 +++ .../schedulers/TestRecursionMemoryUsage.java | 4 +- 5 files changed, 234 insertions(+), 72 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java index d1550a2422..67d66aa4aa 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java @@ -17,10 +17,12 @@ import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.util.functions.Func2; /** @@ -28,6 +30,7 @@ */ public class CurrentThreadScheduler extends Scheduler { private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + private static final AtomicLong counter = new AtomicLong(0); public static CurrentThreadScheduler getInstance() { return INSTANCE; @@ -38,25 +41,27 @@ public static CurrentThreadScheduler getInstance() { /* package accessible for unit tests */CurrentThreadScheduler() { } - private final AtomicInteger counter = new AtomicInteger(0); - @Override public Subscription schedule(T state, Func2 action) { + // immediately move to the InnerCurrentThreadScheduler + InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler(); DiscardableAction discardableAction = new DiscardableAction(state, action); - enqueue(discardableAction, now()); - return discardableAction; + enqueue(innerScheduler, discardableAction, now()); + return innerScheduler; } @Override public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { long execTime = now() + unit.toMillis(dueTime); + // immediately move to the InnerCurrentThreadScheduler + InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler(); DiscardableAction discardableAction = new DiscardableAction(state, new SleepingAction(action, this, execTime)); - enqueue(discardableAction, execTime); + enqueue(innerScheduler, discardableAction, execTime); return discardableAction; } - private void enqueue(DiscardableAction action, long execTime) { + private static void enqueue(Scheduler scheduler, DiscardableAction action, long execTime) { PriorityQueue queue = QUEUE.get(); boolean exec = queue == null; @@ -69,19 +74,50 @@ private void enqueue(DiscardableAction action, long execTime) { if (exec) { while (!queue.isEmpty()) { - queue.poll().action.call(this); + queue.poll().action.call(scheduler); } QUEUE.set(null); } } + private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription { + private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); + + @Override + public Subscription schedule(T state, Func2 action) { + DiscardableAction discardableAction = new DiscardableAction(state, action); + childSubscription.set(discardableAction); + enqueue(this, discardableAction, now()); + return childSubscription; + } + + @Override + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + long execTime = now() + unit.toMillis(delayTime); + + DiscardableAction discardableAction = new DiscardableAction(state, action); + childSubscription.set(discardableAction); + enqueue(this, discardableAction, execTime); + return childSubscription; + } + + @Override + public void unsubscribe() { + childSubscription.unsubscribe(); + } + + } + + /** + * Use time to sort items so delayed actions are sorted to their appropriate position in the queue. + */ private static class TimedAction implements Comparable { final DiscardableAction action; final Long execTime; - final Integer count; // In case if time between enqueueing took less than 1ms + final Long count; // In case if time between enqueueing took less than 1ms - private TimedAction(DiscardableAction action, Long execTime, Integer count) { + private TimedAction(DiscardableAction action, Long execTime, Long count) { this.action = action; this.execTime = execTime; this.count = count; diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 8fb00c6553..6602d5071d 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -8,13 +8,15 @@ import org.junit.Test; +import rx.Observable; +import rx.Observer; import rx.Scheduler; import rx.Subscription; import rx.operators.SafeObservableSubscription; -import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Func1; import rx.util.functions.Func2; /** @@ -26,6 +28,55 @@ */ public abstract class AbstractSchedulerConcurrencyTests extends AbstractSchedulerTests { + /** + * Bug report: https://github.com/Netflix/RxJava/issues/431 + */ + @Test + public final void testUnSubscribeForScheduler() throws InterruptedException { + final AtomicInteger countReceived = new AtomicInteger(); + final AtomicInteger countGenerated = new AtomicInteger(); + final SafeObservableSubscription s = new SafeObservableSubscription(); + final CountDownLatch latch = new CountDownLatch(1); + + s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS) + .map(new Func1() { + @Override + public Long call(Long aLong) { + countGenerated.incrementAndGet(); + return aLong; + } + }) + .subscribeOn(getScheduler()) + .observeOn(getScheduler()) + .subscribe(new Observer() { + @Override + public void onCompleted() { + System.out.println("--- completed"); + } + + @Override + public void onError(Throwable e) { + System.out.println("--- onError"); + } + + @Override + public void onNext(Long args) { + if (countReceived.incrementAndGet() == 2) { + s.unsubscribe(); + latch.countDown(); + } + System.out.println("==> Received " + args); + } + })); + + latch.await(1000, TimeUnit.MILLISECONDS); + + System.out.println("----------- it thinks it is finished ------------------ "); + Thread.sleep(100); + + assertEquals(2, countGenerated.get()); + } + @Test public void testUnsubscribeRecursiveScheduleWithStateAndFunc2() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java index 921780f2b2..508727bc12 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -15,11 +15,11 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.operators.SafeObservableSubscription; import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; -import rx.util.functions.Func1; +import rx.util.functions.Func2; /** * Base tests for all schedulers including Immediate/Current. @@ -98,60 +98,11 @@ public void call(Integer i) { assertEquals(10, countTaken.get()); } - /** - * Bug report: https://github.com/Netflix/RxJava/issues/431 - */ @Test - public final void testUnSubscribeForScheduler() throws InterruptedException { - - final AtomicInteger countReceived = new AtomicInteger(); - final AtomicInteger countGenerated = new AtomicInteger(); - final SafeObservableSubscription s = new SafeObservableSubscription(); + public void testNestedActions() throws InterruptedException { + final Scheduler scheduler = getScheduler(); final CountDownLatch latch = new CountDownLatch(1); - s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS) - .map(new Func1() { - @Override - public Long call(Long aLong) { - countGenerated.incrementAndGet(); - return aLong; - } - }) - .subscribeOn(getScheduler()) - .observeOn(getScheduler()) - .subscribe(new Observer() { - @Override - public void onCompleted() { - System.out.println("--- completed"); - } - - @Override - public void onError(Throwable e) { - System.out.println("--- onError"); - } - - @Override - public void onNext(Long args) { - if (countReceived.incrementAndGet() == 2) { - s.unsubscribe(); - latch.countDown(); - } - System.out.println("==> Received " + args); - } - })); - - latch.await(1000, TimeUnit.MILLISECONDS); - - System.out.println("----------- it thinks it is finished ------------------ "); - Thread.sleep(100); - - assertEquals(2, countGenerated.get()); - } - - @Test - public final void testNestedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); - final Action0 firstStepStart = mock(Action0.class); final Action0 firstStepEnd = mock(Action0.class); @@ -166,6 +117,7 @@ public final void testNestedActions() { public void call() { firstStepStart.call(); firstStepEnd.call(); + latch.countDown(); } }; final Action0 secondAction = new Action0() { @@ -189,6 +141,8 @@ public void call() { InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); scheduler.schedule(thirdAction); + + latch.await(); inOrder.verify(thirdStepStart, times(1)).call(); inOrder.verify(thirdStepEnd, times(1)).call(); @@ -199,14 +153,24 @@ public void call() { } @Test - public final void testSequenceOfActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + public final void testSequenceOfActions() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final CountDownLatch latch = new CountDownLatch(1); final Action0 first = mock(Action0.class); final Action0 second = mock(Action0.class); scheduler.schedule(first); scheduler.schedule(second); + scheduler.schedule(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + }); + + latch.await(); verify(first, times(1)).call(); verify(second, times(1)).call(); @@ -214,9 +178,10 @@ public final void testSequenceOfActions() { } @Test - public final void testSequenceOfDelayedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + public void testSequenceOfDelayedActions() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final CountDownLatch latch = new CountDownLatch(1); final Action0 first = mock(Action0.class); final Action0 second = mock(Action0.class); @@ -225,9 +190,17 @@ public final void testSequenceOfDelayedActions() { public void call() { scheduler.schedule(first, 30, TimeUnit.MILLISECONDS); scheduler.schedule(second, 10, TimeUnit.MILLISECONDS); + scheduler.schedule(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + }, 40, TimeUnit.MILLISECONDS); } }); + latch.await(); InOrder inOrder = inOrder(first, second); inOrder.verify(second, times(1)).call(); @@ -236,9 +209,10 @@ public void call() { } @Test - public final void testMixOfDelayedAndNonDelayedActions() { - final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + public void testMixOfDelayedAndNonDelayedActions() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final CountDownLatch latch = new CountDownLatch(1); final Action0 first = mock(Action0.class); final Action0 second = mock(Action0.class); final Action0 third = mock(Action0.class); @@ -251,16 +225,94 @@ public void call() { scheduler.schedule(second, 300, TimeUnit.MILLISECONDS); scheduler.schedule(third, 100, TimeUnit.MILLISECONDS); scheduler.schedule(fourth); + scheduler.schedule(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + }, 400, TimeUnit.MILLISECONDS); } }); + latch.await(); InOrder inOrder = inOrder(first, second, third, fourth); inOrder.verify(first, times(1)).call(); inOrder.verify(fourth, times(1)).call(); inOrder.verify(third, times(1)).call(); inOrder.verify(second, times(1)).call(); + } + + @Test + public final void testRecursiveExecutionWithAction0() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final AtomicInteger i = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + scheduler.schedule(new Action1() { + + @Override + public void call(Action0 self) { + if (i.incrementAndGet() < 100) { + self.call(); + } else { + latch.countDown(); + } + } + }); + + latch.await(); + assertEquals(100, i.get()); + } + + @Test + public final void testRecursiveExecutionWithFunc2() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final AtomicInteger i = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + scheduler.schedule(0, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Integer state) { + i.set(state); + if (state < 100) { + return innerScheduler.schedule(state + 1, this); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + + }); + + latch.await(); + assertEquals(100, i.get()); + } + + @Test + public final void testRecursiveExecutionWithFunc2AndDelayTime() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + final AtomicInteger i = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + scheduler.schedule(0, new Func2() { + + @Override + public Subscription call(Scheduler innerScheduler, Integer state) { + i.set(state); + if (state < 100) { + return innerScheduler.schedule(state + 1, this, 5, TimeUnit.MILLISECONDS); + } else { + latch.countDown(); + return Subscriptions.empty(); + } + } + + }, 50, TimeUnit.MILLISECONDS); + latch.await(); + assertEquals(100, i.get()); } } diff --git a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java index ff3e6c138b..a00878de10 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java @@ -15,6 +15,8 @@ */ package rx.schedulers; +import org.junit.Test; + import rx.Scheduler; public class ImmediateSchedulerTest extends AbstractSchedulerTests { @@ -24,4 +26,25 @@ protected Scheduler getScheduler() { return ImmediateScheduler.getInstance(); } + @Override + @Test + public final void testNestedActions() { + // ordering of nested actions will not match other schedulers + // because there is no reordering or concurrency with ImmediateScheduler + } + + @Override + @Test + public final void testSequenceOfDelayedActions() { + // ordering of nested actions will not match other schedulers + // because there is no reordering or concurrency with ImmediateScheduler + } + + @Override + @Test + public final void testMixOfDelayedAndNonDelayedActions() { + // ordering of nested actions will not match other schedulers + // because there is no reordering or concurrency with ImmediateScheduler + } + } diff --git a/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java index 294193b167..80c36e9bd3 100644 --- a/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java +++ b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java @@ -20,8 +20,8 @@ public static void main(String args[]) { usingFunc2(Schedulers.newThread()); usingAction0(Schedulers.newThread()); -// usingFunc2(Schedulers.currentThread()); -// usingAction0(Schedulers.currentThread()); + usingFunc2(Schedulers.currentThread()); + usingAction0(Schedulers.currentThread()); usingFunc2(Schedulers.threadPoolForComputation()); usingAction0(Schedulers.threadPoolForComputation()); From 28dd5fcdb393c7dcaea76138f6e9c8fece893c49 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 31 Dec 2013 10:10:20 -0800 Subject: [PATCH 08/15] Small ObserveOn Improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - use Long instead of Int so we don’t overflow - migrate from deprecated method --- .../src/main/java/rx/operators/OperationObserveOn.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 2f569ea75f..b655d2c53e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import rx.Notification; import rx.Observable; @@ -71,7 +72,7 @@ private class Observation { final CompositeSubscription compositeSubscription = new CompositeSubscription(); final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription(); final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - final AtomicInteger counter = new AtomicInteger(0); + final AtomicLong counter = new AtomicLong(0); private volatile Scheduler recursiveScheduler; public Observation(Observer observer) { @@ -108,7 +109,7 @@ public Subscription call(Scheduler innerScheduler, T state) { } void processQueue() { - recursiveSubscription.setSubscription(recursiveScheduler.schedule(new Action1() { + recursiveSubscription.set(recursiveScheduler.schedule(new Action1() { @Override public void call(Action0 self) { Notification not = queue.poll(); From 43e3d777bb03c020eb5f9ad2aa32c4ae75111271 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 31 Dec 2013 10:37:41 -0800 Subject: [PATCH 09/15] CurrentThreadScheduler Fixes - outer/inner scheduling so nested order is correct while not deadlocking on certain nested use cases as found in previous testing --- .../rx/schedulers/CurrentThreadScheduler.java | 76 +++++++++++++------ 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java index 67d66aa4aa..7d64c301cc 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java @@ -21,8 +21,8 @@ import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.CompositeSubscription; import rx.subscriptions.MultipleAssignmentSubscription; +import rx.util.functions.Func1; import rx.util.functions.Func2; /** @@ -36,7 +36,17 @@ public static CurrentThreadScheduler getInstance() { return INSTANCE; } - private static final ThreadLocal> QUEUE = new ThreadLocal>(); + private static final ThreadLocal> QUEUE = new ThreadLocal>() { + protected java.util.PriorityQueue initialValue() { + return new PriorityQueue(); + }; + }; + + private static final ThreadLocal PROCESSING = new ThreadLocal() { + protected Boolean initialValue() { + return Boolean.FALSE; + }; + }; /* package accessible for unit tests */CurrentThreadScheduler() { } @@ -45,8 +55,8 @@ public static CurrentThreadScheduler getInstance() { public Subscription schedule(T state, Func2 action) { // immediately move to the InnerCurrentThreadScheduler InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler(); - DiscardableAction discardableAction = new DiscardableAction(state, action); - enqueue(innerScheduler, discardableAction, now()); + innerScheduler.enqueue(new DiscardableAction(state, action), now()); + enqueueFromOuter(innerScheduler, now()); return innerScheduler; } @@ -54,41 +64,48 @@ public Subscription schedule(T state, Func2 Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { long execTime = now() + unit.toMillis(dueTime); - // immediately move to the InnerCurrentThreadScheduler + // create an inner scheduler and queue it for execution InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler(); - DiscardableAction discardableAction = new DiscardableAction(state, new SleepingAction(action, this, execTime)); - enqueue(innerScheduler, discardableAction, execTime); - return discardableAction; + innerScheduler.enqueue(new DiscardableAction(state, new SleepingAction(action, this, execTime)), execTime); + enqueueFromOuter(innerScheduler, execTime); + return innerScheduler; } - private static void enqueue(Scheduler scheduler, DiscardableAction action, long execTime) { + /* + * This will accept InnerCurrentThreadScheduler instances and execute them in order they are received + * and on each of them will loop internally until each is complete. + */ + private void enqueueFromOuter(final InnerCurrentThreadScheduler innerScheduler, long execTime) { + // Note that everything here is single-threaded so we won't have race conditions PriorityQueue queue = QUEUE.get(); - boolean exec = queue == null; + queue.add(new TimedAction(new Func1() { - if (exec) { - queue = new PriorityQueue(); - QUEUE.set(queue); - } - - queue.add(new TimedAction(action, execTime, counter.incrementAndGet())); + @Override + public Subscription call(Scheduler _) { + // when the InnerCurrentThreadScheduler gets scheduled we want to process its tasks + return innerScheduler.startProcessing(); + } + }, execTime, counter.incrementAndGet())); - if (exec) { + // first time through starts the loop + if (!PROCESSING.get()) { + PROCESSING.set(Boolean.TRUE); while (!queue.isEmpty()) { - queue.poll().action.call(scheduler); + queue.poll().action.call(innerScheduler); } - - QUEUE.set(null); + PROCESSING.set(Boolean.FALSE); } } private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription { private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription(); + private final PriorityQueue innerQueue = new PriorityQueue(); @Override public Subscription schedule(T state, Func2 action) { DiscardableAction discardableAction = new DiscardableAction(state, action); childSubscription.set(discardableAction); - enqueue(this, discardableAction, now()); + enqueue(discardableAction, now()); return childSubscription; } @@ -98,10 +115,21 @@ public Subscription schedule(T state, Func2 discardableAction = new DiscardableAction(state, action); childSubscription.set(discardableAction); - enqueue(this, discardableAction, execTime); + enqueue(discardableAction, execTime); return childSubscription; } + private void enqueue(Func1 action, long execTime) { + innerQueue.add(new TimedAction(action, execTime, counter.incrementAndGet())); + } + + private Subscription startProcessing() { + while (!innerQueue.isEmpty()) { + innerQueue.poll().action.call(this); + } + return this; + } + @Override public void unsubscribe() { childSubscription.unsubscribe(); @@ -113,11 +141,11 @@ public void unsubscribe() { * Use time to sort items so delayed actions are sorted to their appropriate position in the queue. */ private static class TimedAction implements Comparable { - final DiscardableAction action; + final Func1 action; final Long execTime; final Long count; // In case if time between enqueueing took less than 1ms - private TimedAction(DiscardableAction action, Long execTime, Long count) { + private TimedAction(Func1 action, Long execTime, Long count) { this.action = action; this.execTime = execTime; this.count = count; From 2c3db738bc854ca99167859091ef0482e35a8012 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 31 Dec 2013 10:51:59 -0800 Subject: [PATCH 10/15] Fix CurrentThreadScheduler Delay Bug - introduced a bug during refactor, caught it while updating unit tests --- .../java/rx/schedulers/CurrentThreadScheduler.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java index 7d64c301cc..f570e38f55 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java @@ -55,18 +55,18 @@ protected Boolean initialValue() { public Subscription schedule(T state, Func2 action) { // immediately move to the InnerCurrentThreadScheduler InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler(); - innerScheduler.enqueue(new DiscardableAction(state, action), now()); + innerScheduler.schedule(state, action); enqueueFromOuter(innerScheduler, now()); return innerScheduler; } @Override - public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { - long execTime = now() + unit.toMillis(dueTime); + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + long execTime = now() + unit.toMillis(delayTime); // create an inner scheduler and queue it for execution InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler(); - innerScheduler.enqueue(new DiscardableAction(state, new SleepingAction(action, this, execTime)), execTime); + innerScheduler.schedule(state, action, delayTime, unit); enqueueFromOuter(innerScheduler, execTime); return innerScheduler; } @@ -113,7 +113,7 @@ public Subscription schedule(T state, Func2 Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { long execTime = now() + unit.toMillis(delayTime); - DiscardableAction discardableAction = new DiscardableAction(state, action); + DiscardableAction discardableAction = new DiscardableAction(state, new SleepingAction(action, this, execTime)); childSubscription.set(discardableAction); enqueue(discardableAction, execTime); return childSubscription; From 1f7d6dccdf953a7d5ec68e847cdf5c2f399107b8 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 31 Dec 2013 11:00:30 -0800 Subject: [PATCH 11/15] Updated Schedulers Unit Tests - merged all scheduler tests into the same package - using inheritance so that the same tests are applied to each of the different Scheduler implementations - manual test (too slow for normal execution) can be run to test memory leaks (TestRecursionMemoryUsage.java) --- .../src/test/java/rx/SchedulersTest.java | 582 ------------------ .../AbstractSchedulerConcurrencyTests.java | 134 ++++ .../rx/schedulers/AbstractSchedulerTests.java | 281 ++++++++- .../CurrentThreadSchedulerTest.java | 31 + .../rx/schedulers/ExecutorSchedulerTests.java | 75 +++ .../rx/schedulers/ImmediateSchedulerTest.java | 54 ++ .../rx/schedulers/NewThreadSchedulerTest.java | 2 +- .../schedulers/TestRecursionMemoryUsage.java | 15 + .../java/rx/schedulers/TestSchedulerTest.java | 96 +++ 9 files changed, 680 insertions(+), 590 deletions(-) delete mode 100644 rxjava-core/src/test/java/rx/SchedulersTest.java create mode 100644 rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java diff --git a/rxjava-core/src/test/java/rx/SchedulersTest.java b/rxjava-core/src/test/java/rx/SchedulersTest.java deleted file mode 100644 index 62e74f5798..0000000000 --- a/rxjava-core/src/test/java/rx/SchedulersTest.java +++ /dev/null @@ -1,582 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx; - -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.Date; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; - -import rx.Observable.OnSubscribeFunc; -import rx.schedulers.Schedulers; -import rx.schedulers.TestScheduler; -import rx.subscriptions.BooleanSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; -import rx.util.functions.Action1; -import rx.util.functions.Func1; -import rx.util.functions.Func2; - -public class SchedulersTest { - - @SuppressWarnings("unchecked") - // mocking is unchecked, unfortunately - @Test - public void testPeriodicScheduling() { - final Func1 calledOp = mock(Func1.class); - - final TestScheduler scheduler = new TestScheduler(); - Subscription subscription = scheduler.schedulePeriodically(new Action0() { - @Override - public void call() { - System.out.println(scheduler.now()); - calledOp.call(scheduler.now()); - } - }, 1, 2, TimeUnit.SECONDS); - - verify(calledOp, never()).call(anyLong()); - - InOrder inOrder = Mockito.inOrder(calledOp); - - scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, never()).call(anyLong()); - - scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, times(1)).call(1000L); - - scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, never()).call(3000L); - - scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, times(1)).call(3000L); - - scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); - inOrder.verify(calledOp, times(1)).call(5000L); - inOrder.verify(calledOp, times(1)).call(7000L); - - subscription.unsubscribe(); - scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); - inOrder.verify(calledOp, never()).call(anyLong()); - } - - @Test - public void testComputationThreadPool1() { - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.subscribeOn(Schedulers.threadPoolForComputation()).toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testIOThreadPool1() { - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().startsWith("RxIOThreadPool")); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.subscribeOn(Schedulers.threadPoolForIO()).toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testMergeWithoutScheduler1() { - - final String currentThreadName = Thread.currentThread().getName(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().equals(currentThreadName)); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testMergeWithImmediateScheduler1() { - - final String currentThreadName = Thread.currentThread().getName(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().equals(currentThreadName)); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testMergeWithCurrentThreadScheduler1() { - - final String currentThreadName = Thread.currentThread().getName(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().equals(currentThreadName)); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testMergeWithScheduler1() { - - final String currentThreadName = Thread.currentThread().getName(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1() { - - @Override - public String call(Integer t) { - assertFalse(Thread.currentThread().getName().equals(currentThreadName)); - assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - - @Test - public void testSubscribeWithScheduler1() throws InterruptedException { - - final AtomicInteger count = new AtomicInteger(); - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - - o1.subscribe(new Action1() { - - @Override - public void call(Integer t) { - System.out.println("Thread: " + Thread.currentThread().getName()); - System.out.println("t: " + t); - count.incrementAndGet(); - } - }); - - // the above should be blocking so we should see a count of 5 - assertEquals(5, count.get()); - - count.set(0); - - // now we'll subscribe with a scheduler and it should be async - - final String currentThreadName = Thread.currentThread().getName(); - - // latches for deterministically controlling the test below across threads - final CountDownLatch latch = new CountDownLatch(5); - final CountDownLatch first = new CountDownLatch(1); - - o1.subscribe(new Action1() { - - @Override - public void call(Integer t) { - try { - // we block the first one so we can assert this executes asynchronously with a count - first.await(1000, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException("The latch should have released if we are async.", e); - } - assertFalse(Thread.currentThread().getName().equals(currentThreadName)); - assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); - System.out.println("Thread: " + Thread.currentThread().getName()); - System.out.println("t: " + t); - count.incrementAndGet(); - latch.countDown(); - } - }, Schedulers.threadPoolForComputation()); - - // assert we are async - assertEquals(0, count.get()); - // release the latch so it can go forward - first.countDown(); - - // wait for all 5 responses - latch.await(); - assertEquals(5, count.get()); - } - - @Test - public void testRecursiveScheduler1() { - Observable obs = Observable.create(new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(final Observer observer) { - return Schedulers.currentThread().schedule(0, new Func2() { - @Override - public Subscription call(Scheduler scheduler, Integer i) { - if (i > 42) { - observer.onCompleted(); - return Subscriptions.empty(); - } - - observer.onNext(i); - - return scheduler.schedule(i + 1, this); - } - }); - } - }); - - final AtomicInteger lastValue = new AtomicInteger(); - obs.toBlockingObservable().forEach(new Action1() { - - @Override - public void call(Integer v) { - System.out.println("Value: " + v); - lastValue.set(v); - } - }); - - assertEquals(42, lastValue.get()); - } - - @Test - public void testRecursiveScheduler2() throws InterruptedException { - // use latches instead of Thread.sleep - final CountDownLatch latch = new CountDownLatch(10); - final CountDownLatch completionLatch = new CountDownLatch(1); - - Observable obs = Observable.create(new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(final Observer observer) { - - return Schedulers.threadPoolForComputation().schedule(new BooleanSubscription(), new Func2() { - @Override - public Subscription call(Scheduler scheduler, BooleanSubscription cancel) { - if (cancel.isUnsubscribed()) { - observer.onCompleted(); - completionLatch.countDown(); - return Subscriptions.empty(); - } - - observer.onNext(42); - latch.countDown(); - - // this will recursively schedule this task for execution again - scheduler.schedule(cancel, this); - - return cancel; - } - }); - } - }); - - final AtomicInteger count = new AtomicInteger(); - final AtomicBoolean completed = new AtomicBoolean(false); - Subscription subscribe = obs.subscribe(new Observer() { - @Override - public void onCompleted() { - System.out.println("Completed"); - completed.set(true); - } - - @Override - public void onError(Throwable e) { - System.out.println("Error"); - } - - @Override - public void onNext(Integer args) { - count.incrementAndGet(); - System.out.println(args); - } - }); - - if (!latch.await(5000, TimeUnit.MILLISECONDS)) { - fail("Timed out waiting on onNext latch"); - } - - // now unsubscribe and ensure it stops the recursive loop - subscribe.unsubscribe(); - System.out.println("unsubscribe"); - - if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) { - fail("Timed out waiting on completion latch"); - } - - // the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count - assertTrue(count.get() >= 10); - assertTrue(completed.get()); - } - - @Test - public void testSchedulingWithDueTime() throws InterruptedException { - - final CountDownLatch latch = new CountDownLatch(5); - final AtomicInteger counter = new AtomicInteger(); - - long start = System.currentTimeMillis(); - - Schedulers.threadPoolForComputation().schedule(null, new Func2() { - - @Override - public Subscription call(Scheduler scheduler, String state) { - System.out.println("doing work"); - counter.incrementAndGet(); - latch.countDown(); - if (latch.getCount() == 0) { - return Subscriptions.empty(); - } else { - return scheduler.schedule(state, this, new Date(System.currentTimeMillis() + 50)); - } - } - }, new Date(System.currentTimeMillis() + 100)); - - if (!latch.await(3000, TimeUnit.MILLISECONDS)) { - fail("didn't execute ... timed out"); - } - - long end = System.currentTimeMillis(); - - assertEquals(5, counter.get()); - if ((end - start) < 250) { - fail("it should have taken over 250ms since each step was scheduled 50ms in the future"); - } - } - - @Test - public void testConcurrentOnNextFailsValidation() throws InterruptedException { - - final int count = 10; - final CountDownLatch latch = new CountDownLatch(count); - Observable o = Observable.create(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer observer) { - for (int i = 0; i < count; i++) { - final int v = i; - new Thread(new Runnable() { - - @Override - public void run() { - observer.onNext("v: " + v); - - latch.countDown(); - } - }).start(); - } - return Subscriptions.empty(); - } - }); - - ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); - // this should call onNext concurrently - o.subscribe(observer); - - if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { - fail("timed out"); - } - - if (observer.error.get() == null) { - fail("We expected error messages due to concurrency"); - } - } - - @Test - public void testObserveOn() throws InterruptedException { - - Observable o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"); - - ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); - - o.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer); - - if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { - fail("timed out"); - } - - if (observer.error.get() != null) { - observer.error.get().printStackTrace(); - fail("Error: " + observer.error.get().getMessage()); - } - } - - @Test - public void testSubscribeOnNestedConcurrency() throws InterruptedException { - - Observable o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten") - .mapMany(new Func1>() { - - @Override - public Observable call(final String v) { - return Observable.create(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer observer) { - observer.onNext("value_after_map-" + v); - observer.onCompleted(); - return Subscriptions.empty(); - } - }).subscribeOn(Schedulers.newThread()); // subscribe on a new thread - } - }); - - ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); - - o.subscribe(observer); - - if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { - fail("timed out"); - } - - if (observer.error.get() != null) { - observer.error.get().printStackTrace(); - fail("Error: " + observer.error.get().getMessage()); - } - } - - @Test - public void testRecursion() { - TestScheduler s = new TestScheduler(); - - final AtomicInteger counter = new AtomicInteger(0); - - Subscription subscription = s.schedule(new Action1() { - - @Override - public void call(Action0 self) { - counter.incrementAndGet(); - System.out.println("counter: " + counter.get()); - self.call(); - } - - }); - subscription.unsubscribe(); - assertEquals(0, counter.get()); - } - - /** - * Used to determine if onNext is being invoked concurrently. - * - * @param - */ - private static class ConcurrentObserverValidator implements Observer { - - final AtomicInteger concurrentCounter = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); - final CountDownLatch completed = new CountDownLatch(1); - - @Override - public void onCompleted() { - completed.countDown(); - } - - @Override - public void onError(Throwable e) { - completed.countDown(); - error.set(e); - } - - @Override - public void onNext(T args) { - int count = concurrentCounter.incrementAndGet(); - System.out.println("ConcurrentObserverValidator.onNext: " + args); - if (count > 1) { - onError(new RuntimeException("we should not have concurrent execution of onNext")); - } - try { - try { - // take some time so other onNext calls could pile up (I haven't yet thought of a way to do this without sleeping) - Thread.sleep(50); - } catch (InterruptedException e) { - // ignore - } - } finally { - concurrentCounter.decrementAndGet(); - } - } - - } -} diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 6602d5071d..ef78d2ee11 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -4,15 +4,18 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import rx.Observable; +import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Scheduler; import rx.Subscription; import rx.operators.SafeObservableSubscription; +import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; @@ -192,4 +195,135 @@ public void call(Action0 self) { latch.await(); } + @Test + public void testRecursiveScheduler2() throws InterruptedException { + // use latches instead of Thread.sleep + final CountDownLatch latch = new CountDownLatch(10); + final CountDownLatch completionLatch = new CountDownLatch(1); + + Observable obs = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + + return getScheduler().schedule(null, new Func2() { + @Override + public Subscription call(Scheduler scheduler, Void v) { + observer.onNext(42); + latch.countDown(); + + // this will recursively schedule this task for execution again + scheduler.schedule(null, this); + + return Subscriptions.create(new Action0() { + + @Override + public void call() { + observer.onCompleted(); + completionLatch.countDown(); + } + + }); + } + }); + } + }); + + final AtomicInteger count = new AtomicInteger(); + final AtomicBoolean completed = new AtomicBoolean(false); + Subscription subscribe = obs.subscribe(new Observer() { + @Override + public void onCompleted() { + System.out.println("Completed"); + completed.set(true); + } + + @Override + public void onError(Throwable e) { + System.out.println("Error"); + } + + @Override + public void onNext(Integer args) { + count.incrementAndGet(); + System.out.println(args); + } + }); + + if (!latch.await(5000, TimeUnit.MILLISECONDS)) { + fail("Timed out waiting on onNext latch"); + } + + // now unsubscribe and ensure it stops the recursive loop + subscribe.unsubscribe(); + System.out.println("unsubscribe"); + + if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) { + fail("Timed out waiting on completion latch"); + } + + // the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count + assertTrue(count.get() >= 10); + assertTrue(completed.get()); + } + + @Test + public final void testSubscribeWithScheduler() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + final AtomicInteger count = new AtomicInteger(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + + o1.subscribe(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("Thread: " + Thread.currentThread().getName()); + System.out.println("t: " + t); + count.incrementAndGet(); + } + }); + + // the above should be blocking so we should see a count of 5 + assertEquals(5, count.get()); + + count.set(0); + + // now we'll subscribe with a scheduler and it should be async + + final String currentThreadName = Thread.currentThread().getName(); + + // latches for deterministically controlling the test below across threads + final CountDownLatch latch = new CountDownLatch(5); + final CountDownLatch first = new CountDownLatch(1); + + o1.subscribe(new Action1() { + + @Override + public void call(Integer t) { + try { + // we block the first one so we can assert this executes asynchronously with a count + first.await(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("The latch should have released if we are async.", e); + } + + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); + System.out.println("Thread: " + Thread.currentThread().getName()); + System.out.println("t: " + t); + count.incrementAndGet(); + latch.countDown(); + } + }, scheduler); + + // assert we are async + assertEquals(0, count.get()); + // release the latch so it can go forward + first.countDown(); + + // wait for all 5 responses + latch.await(); + assertEquals(5, count.get()); + } + } diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java index 508727bc12..e83f5936c7 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -1,14 +1,35 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.schedulers; import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.util.Arrays; +import java.util.Date; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import rx.Observable; import rx.Observable.OnSubscribeFunc; @@ -19,6 +40,7 @@ import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Func1; import rx.util.functions.Func2; /** @@ -141,7 +163,7 @@ public void call() { InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); scheduler.schedule(thirdAction); - + latch.await(); inOrder.verify(thirdStepStart, times(1)).call(); @@ -152,6 +174,37 @@ public void call() { inOrder.verify(firstStepEnd, times(1)).call(); } + @Test + public final void testNestedScheduling() { + + Observable ids = Observable.from(Arrays.asList(1, 2), getScheduler()); + + Observable m = ids.flatMap(new Func1>() { + + @Override + public Observable call(Integer id) { + return Observable.from(Arrays.asList("a-" + id, "b-" + id), getScheduler()) + .map(new Func1() { + + @Override + public String call(String s) { + return "names=>" + s; + } + }); + } + + }); + + List strings = m.toList().toBlockingObservable().last(); + + assertEquals(4, strings.size()); + assertEquals("names=>a-1", strings.get(0)); + assertEquals("names=>b-1", strings.get(1)); + assertEquals("names=>a-2", strings.get(2)); + assertEquals("names=>b-2", strings.get(3)); + } + + @SuppressWarnings("rawtypes") @Test public final void testSequenceOfActions() throws InterruptedException { final Scheduler scheduler = getScheduler(); @@ -160,15 +213,21 @@ public final void testSequenceOfActions() throws InterruptedException { final Action0 first = mock(Action0.class); final Action0 second = mock(Action0.class); - scheduler.schedule(first); - scheduler.schedule(second); - scheduler.schedule(new Action0() { + // make it wait until after the second is called + doAnswer(new Answer() { @Override - public void call() { - latch.countDown(); + public Object answer(InvocationOnMock invocation) throws Throwable { + try { + return invocation.getMock(); + } finally { + latch.countDown(); + } } - }); + }).when(second).call(); + + scheduler.schedule(first); + scheduler.schedule(second); latch.await(); @@ -315,4 +374,212 @@ public Subscription call(Scheduler innerScheduler, Integer state) { assertEquals(100, i.get()); } + @Test + public final void testRecursiveSchedulerSimple() { + final Scheduler scheduler = getScheduler(); + + Observable obs = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + return scheduler.schedule(0, new Func2() { + @Override + public Subscription call(Scheduler scheduler, Integer i) { + if (i > 42) { + observer.onCompleted(); + return Subscriptions.empty(); + } + + observer.onNext(i); + + return scheduler.schedule(i + 1, this); + } + }); + } + }); + + final AtomicInteger lastValue = new AtomicInteger(); + obs.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer v) { + System.out.println("Value: " + v); + lastValue.set(v); + } + }); + + assertEquals(42, lastValue.get()); + } + + @Test + public final void testSchedulingWithDueTime() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + final CountDownLatch latch = new CountDownLatch(5); + final AtomicInteger counter = new AtomicInteger(); + + long start = System.currentTimeMillis(); + + scheduler.schedule(null, new Func2() { + + @Override + public Subscription call(Scheduler scheduler, String state) { + System.out.println("doing work"); + counter.incrementAndGet(); + latch.countDown(); + if (latch.getCount() == 0) { + return Subscriptions.empty(); + } else { + return scheduler.schedule(state, this, new Date(scheduler.now() + 50)); + } + } + }, new Date(scheduler.now() + 100)); + + if (!latch.await(3000, TimeUnit.MILLISECONDS)) { + fail("didn't execute ... timed out"); + } + + long end = System.currentTimeMillis(); + + assertEquals(5, counter.get()); + System.out.println("Time taken: " + (end - start)); + if ((end - start) < 250) { + fail("it should have taken over 250ms since each step was scheduled 50ms in the future"); + } + } + + @Test + public final void testConcurrentOnNextFailsValidation() throws InterruptedException { + final int count = 10; + final CountDownLatch latch = new CountDownLatch(count); + Observable o = Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer observer) { + for (int i = 0; i < count; i++) { + final int v = i; + new Thread(new Runnable() { + + @Override + public void run() { + observer.onNext("v: " + v); + + latch.countDown(); + } + }).start(); + } + return Subscriptions.empty(); + } + }); + + ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); + // this should call onNext concurrently + o.subscribe(observer); + + if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { + fail("timed out"); + } + + if (observer.error.get() == null) { + fail("We expected error messages due to concurrency"); + } + } + + @Test + public final void testObserveOn() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + Observable o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"); + + ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); + + o.observeOn(scheduler).subscribe(observer); + + if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { + fail("timed out"); + } + + if (observer.error.get() != null) { + observer.error.get().printStackTrace(); + fail("Error: " + observer.error.get().getMessage()); + } + } + + @Test + public final void testSubscribeOnNestedConcurrency() throws InterruptedException { + final Scheduler scheduler = getScheduler(); + + Observable o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten") + .mergeMap(new Func1>() { + + @Override + public Observable call(final String v) { + return Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer observer) { + observer.onNext("value_after_map-" + v); + observer.onCompleted(); + return Subscriptions.empty(); + } + }).subscribeOn(scheduler); + } + }); + + ConcurrentObserverValidator observer = new ConcurrentObserverValidator(); + + o.subscribe(observer); + + if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) { + fail("timed out"); + } + + if (observer.error.get() != null) { + observer.error.get().printStackTrace(); + fail("Error: " + observer.error.get().getMessage()); + } + } + + /** + * Used to determine if onNext is being invoked concurrently. + * + * @param + */ + private static class ConcurrentObserverValidator implements Observer { + + final AtomicInteger concurrentCounter = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + final CountDownLatch completed = new CountDownLatch(1); + + @Override + public void onCompleted() { + completed.countDown(); + } + + @Override + public void onError(Throwable e) { + completed.countDown(); + error.set(e); + } + + @Override + public void onNext(T args) { + int count = concurrentCounter.incrementAndGet(); + System.out.println("ConcurrentObserverValidator.onNext: " + args); + if (count > 1) { + onError(new RuntimeException("we should not have concurrent execution of onNext")); + } + try { + try { + // take some time so other onNext calls could pile up (I haven't yet thought of a way to do this without sleeping) + Thread.sleep(50); + } catch (InterruptedException e) { + // ignore + } + } finally { + concurrentCounter.decrementAndGet(); + } + } + + } + } diff --git a/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java index 961617bbd8..59fb2fe46c 100644 --- a/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java @@ -15,7 +15,14 @@ */ package rx.schedulers; +import static org.junit.Assert.*; + +import org.junit.Test; + +import rx.Observable; import rx.Scheduler; +import rx.util.functions.Action1; +import rx.util.functions.Func1; public class CurrentThreadSchedulerTest extends AbstractSchedulerTests { @@ -24,4 +31,28 @@ protected Scheduler getScheduler() { return CurrentThreadScheduler.getInstance(); } + @Test + public final void testMergeWithCurrentThreadScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } } diff --git a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java index d065475232..c44e4ceb16 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java @@ -22,10 +22,12 @@ import org.junit.Test; +import rx.Observable; import rx.Scheduler; import rx.Subscription; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Func1; import rx.util.functions.Func2; public class ExecutorSchedulerTests extends AbstractSchedulerConcurrencyTests { @@ -89,4 +91,77 @@ public void call(Action0 self) { assertEquals(NUM, statefulMap.get("b").intValue()); assertEquals(NUM, statefulMap.get("nonThreadSafeCounter").intValue()); } + + @Test + public final void testComputationThreadPool1() { + final Scheduler scheduler = getScheduler(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.threadPoolForComputation()).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public final void testIOThreadPool1() { + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxIOThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.threadPoolForIO()).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public final void testMergeWithExecutorScheduler() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1() { + + @Override + public String call(Integer t) { + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } } diff --git a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java index a00878de10..34efd5c21d 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java @@ -15,9 +15,14 @@ */ package rx.schedulers; +import static org.junit.Assert.*; + import org.junit.Test; +import rx.Observable; import rx.Scheduler; +import rx.util.functions.Action1; +import rx.util.functions.Func1; public class ImmediateSchedulerTest extends AbstractSchedulerTests { @@ -47,4 +52,53 @@ public final void testMixOfDelayedAndNonDelayedActions() { // because there is no reordering or concurrency with ImmediateScheduler } + @Test + public final void testMergeWithoutScheduler() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public final void testMergeWithImmediateScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } } diff --git a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java index eaf5855e6c..1b82db95ca 100644 --- a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java @@ -24,5 +24,5 @@ public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { protected Scheduler getScheduler() { return NewThreadScheduler.getInstance(); } - + } diff --git a/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java index 80c36e9bd3..e2a62a76e8 100644 --- a/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java +++ b/rxjava-core/src/test/java/rx/schedulers/TestRecursionMemoryUsage.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.schedulers; import rx.Observable; diff --git a/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java new file mode 100644 index 0000000000..1d13995bef --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/TestSchedulerTest.java @@ -0,0 +1,96 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class TestSchedulerTest { + + @SuppressWarnings("unchecked") + // mocking is unchecked, unfortunately + @Test + public final void testPeriodicScheduling() { + final Func1 calledOp = mock(Func1.class); + + final TestScheduler scheduler = new TestScheduler(); + Subscription subscription = scheduler.schedulePeriodically(new Action0() { + @Override + public void call() { + System.out.println(scheduler.now()); + calledOp.call(scheduler.now()); + } + }, 1, 2, TimeUnit.SECONDS); + + verify(calledOp, never()).call(anyLong()); + + InOrder inOrder = Mockito.inOrder(calledOp); + + scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(1000L); + + scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(3000L); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(3000L); + + scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); + inOrder.verify(calledOp, times(1)).call(5000L); + inOrder.verify(calledOp, times(1)).call(7000L); + + subscription.unsubscribe(); + scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + } + + @Test + public final void testRecursion() { + TestScheduler s = new TestScheduler(); + + final AtomicInteger counter = new AtomicInteger(0); + + Subscription subscription = s.schedule(new Action1() { + + @Override + public void call(Action0 self) { + counter.incrementAndGet(); + System.out.println("counter: " + counter.get()); + self.call(); + } + + }); + subscription.unsubscribe(); + assertEquals(0, counter.get()); + } + +} From b7e6410fe9be1cb4e3d1257a13e91506e2af9f1a Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 31 Dec 2013 14:50:49 -0800 Subject: [PATCH 12/15] Missing Header --- .../AbstractSchedulerConcurrencyTests.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index ef78d2ee11..11dc5c1859 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.schedulers; import static org.junit.Assert.*; @@ -15,7 +30,6 @@ import rx.Scheduler; import rx.Subscription; import rx.operators.SafeObservableSubscription; -import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; From abe9c98b521f1fd14c6fe2e2ca42d6bca63ac256 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 1 Jan 2014 20:38:58 -0800 Subject: [PATCH 13/15] Increasing Unit Test Timeout for Slow Machines - Increasing timeout by a lot to handle slow machines such as this: https://netflixoss.ci.cloudbees.com/job/RxJava-pull-requests/629/testReport/junit/rx.schedulers/ExecutorSchedulerTests/recursionUsingFunc2/ - The timeout is only there if a deadlock or memory leak occurs (which is what this PR is fixing) so when everything is healthy it does not timeout --- .../java/rx/schedulers/AbstractSchedulerConcurrencyTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 11dc5c1859..79b455b6ed 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -163,7 +163,7 @@ public Subscription call(Scheduler innerScheduler, Long i) { assertEquals(10, counter.get()); } - @Test(timeout = 8000) + @Test(timeout = 20000) public void recursionUsingFunc2() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); getScheduler().schedule(1L, new Func2() { @@ -185,7 +185,7 @@ public Subscription call(Scheduler innerScheduler, Long i) { latch.await(); } - @Test(timeout = 8000) + @Test(timeout = 20000) public void recursionUsingAction0() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); getScheduler().schedule(new Action1() { From 9b3a838c02b433ed8fbed014a5565fcb5b800891 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 1 Jan 2014 20:53:16 -0800 Subject: [PATCH 14/15] Remove Timeout on Tests --- .../java/rx/schedulers/AbstractSchedulerConcurrencyTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 79b455b6ed..89c3d7221f 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -163,7 +163,7 @@ public Subscription call(Scheduler innerScheduler, Long i) { assertEquals(10, counter.get()); } - @Test(timeout = 20000) + @Test public void recursionUsingFunc2() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); getScheduler().schedule(1L, new Func2() { @@ -185,7 +185,7 @@ public Subscription call(Scheduler innerScheduler, Long i) { latch.await(); } - @Test(timeout = 20000) + @Test public void recursionUsingAction0() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); getScheduler().schedule(new Action1() { From 1d0d90c0e595490b0944a7eaf11e6e317c998267 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 1 Jan 2014 20:55:43 -0800 Subject: [PATCH 15/15] Remove Validation of Ordering - this test does a flatMap which uses merge which has non-deterministic ordering since the Observable.from can be on a new thread each time --- .../test/java/rx/schedulers/AbstractSchedulerTests.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java index e83f5936c7..ddf4f959a6 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -198,10 +198,11 @@ public String call(String s) { List strings = m.toList().toBlockingObservable().last(); assertEquals(4, strings.size()); - assertEquals("names=>a-1", strings.get(0)); - assertEquals("names=>b-1", strings.get(1)); - assertEquals("names=>a-2", strings.get(2)); - assertEquals("names=>b-2", strings.get(3)); + // because flatMap does a merge there is no guarantee of order + assertTrue(strings.contains("names=>a-1")); + assertTrue(strings.contains("names=>a-2")); + assertTrue(strings.contains("names=>b-1")); + assertTrue(strings.contains("names=>b-2")); } @SuppressWarnings("rawtypes")